aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-10-20 11:42:54 +0100
committerGitHub <noreply@github.com>2020-10-20 11:42:54 +0100
commiteb86e2b336e8feaf54eb7fe688c896aa11bd5329 (patch)
tree5a23caffab7290c0f6fd7065d91a6943fd61c0b8
parent92982a402ff553fe6d9179859a96ec33e2f09929 (diff)
Fix sqlite locking bugs present on sytest (#1543)
* Fix sqite locking bugs present on sytest Comments do the explaining. * Fix deadlock in sqlite mode Caused by starting a writer whilst within a writer * Only complain about invalid state deltas for non-overwrite events * Do not re-process outlier unnecessarily
-rw-r--r--roomserver/internal/input/input_events.go24
-rw-r--r--roomserver/internal/input/input_latest_events.go2
-rw-r--r--roomserver/storage/shared/latest_events_updater.go14
-rw-r--r--roomserver/storage/shared/storage.go25
4 files changed, 52 insertions, 13 deletions
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index 67031609..6a5d9d26 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -17,6 +17,7 @@
package input
import (
+ "bytes"
"context"
"fmt"
@@ -26,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
@@ -44,6 +46,28 @@ func (r *Inputer) processRoomEvent(
headered := input.Event
event := headered.Unwrap()
+ // if we have already got this event then do not process it again, if the input kind is an outlier.
+ // Outliers contain no extra information which may warrant a re-processing.
+ if input.Kind == api.KindOutlier {
+ evs, err := r.DB.EventsFromIDs(ctx, []string{event.EventID()})
+ if err == nil && len(evs) == 1 {
+ // check hash matches if we're on early room versions where the event ID was a random string
+ idFormat, err := headered.RoomVersion.EventIDFormat()
+ if err == nil {
+ switch idFormat {
+ case gomatrixserverlib.EventIDFormatV1:
+ if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
+ util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
+ return event.EventID(), nil
+ }
+ default:
+ util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
+ return event.EventID(), nil
+ }
+ }
+ }
+ }
+
// Check that the event passes authentication checks and work out
// the numeric IDs for the auth events.
isRejected := false
diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go
index 5adcd087..f76b0a0b 100644
--- a/roomserver/internal/input/input_latest_events.go
+++ b/roomserver/internal/input/input_latest_events.go
@@ -233,7 +233,7 @@ func (u *latestEventsUpdater) latestState() error {
if err != nil {
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
}
- if len(u.removed) > len(u.added) {
+ if !u.stateAtEvent.Overwrite && len(u.removed) > len(u.added) {
// This really shouldn't happen.
// TODO: What is ultimately the best way to handle this situation?
logrus.Errorf(
diff --git a/roomserver/storage/shared/latest_events_updater.go b/roomserver/storage/shared/latest_events_updater.go
index b316f639..8825dc46 100644
--- a/roomserver/storage/shared/latest_events_updater.go
+++ b/roomserver/storage/shared/latest_events_updater.go
@@ -70,16 +70,14 @@ func (u *LatestEventsUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID {
return u.currentStateSnapshotNID
}
-// StorePreviousEvents implements types.RoomRecentEventsUpdater
+// StorePreviousEvents implements types.RoomRecentEventsUpdater - This must be called from a Writer
func (u *LatestEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error {
- return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
- for _, ref := range previousEventReferences {
- if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, txn, ref.EventID, ref.EventSHA256, eventNID); err != nil {
- return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err)
- }
+ for _, ref := range previousEventReferences {
+ if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil {
+ return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err)
}
- return nil
- })
+ }
+ return nil
}
// IsReferenced implements types.RoomRecentEventsUpdater
diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go
index f2be8b3c..51dcb888 100644
--- a/roomserver/storage/shared/storage.go
+++ b/roomserver/storage/shared/storage.go
@@ -492,15 +492,32 @@ func (d *Database) StoreEvent(
if roomInfo == nil && len(prevEvents) > 0 {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
}
+ // Create an updater - NB: on sqlite this WILL create a txn as we are directly calling the shared DB form of
+ // GetLatestEventsForUpdate - not via the SQLiteDatabase form which has `nil` txns. This
+ // function only does SELECTs though so the created txn (at this point) is just a read txn like
+ // any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater
+ // to do writes however then this will need to go inside `Writer.Do`.
updater, err = d.GetLatestEventsForUpdate(ctx, *roomInfo)
if err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err)
}
- if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
- return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err)
+ // Ensure that we atomically store prev events AND commit them. If we don't wrap StorePreviousEvents
+ // and EndTransaction in a writer then it's possible for a new write txn to be made between the two
+ // function calls which will then fail with 'database is locked'. This new write txn would HAVE to be
+ // something like SetRoomAlias/RemoveRoomAlias as normal input events are already done sequentially due to
+ // SupportsConcurrentRoomInputs() == false on sqlite, though this does not apply to setting room aliases
+ // as they don't go via InputRoomEvents
+ err = d.Writer.Do(d.DB, updater.txn, func(txn *sql.Tx) error {
+ if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
+ return fmt.Errorf("updater.StorePreviousEvents: %w", err)
+ }
+ succeeded := true
+ err = sqlutil.EndTransaction(updater, &succeeded)
+ return err
+ })
+ if err != nil {
+ return 0, types.StateAtEvent{}, nil, "", err
}
- succeeded := true
- err = sqlutil.EndTransaction(updater, &succeeded)
}
return roomNID, types.StateAtEvent{