diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-05-18 17:49:24 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-18 17:49:24 +0100 |
commit | 9ef30bb13b49db7120c153d42cc67859cf5ed7da (patch) | |
tree | b8dd26c09e7bd84145ba7982c95cecefd3673f64 | |
parent | 5c221f065565a995f1f21be48f96e2ebf69473ec (diff) |
Tweaks to latest events updater (#1045)
* Comment out updaters a bit, add overwrite flag to latest events
* Make sure we don't send fast-forwarded state changes over federation, start with empty set when overwriting
* Remove redundant check for overwrite
-rw-r--r-- | roomserver/internal/input_events.go | 40 | ||||
-rw-r--r-- | roomserver/internal/input_latest_events.go | 71 | ||||
-rw-r--r-- | roomserver/internal/input_membership.go | 9 | ||||
-rw-r--r-- | roomserver/types/types.go | 4 |
4 files changed, 101 insertions, 23 deletions
diff --git a/roomserver/internal/input_events.go b/roomserver/internal/input_events.go index f4d3bb8f..f5c678ca 100644 --- a/roomserver/internal/input_events.go +++ b/roomserver/internal/input_events.go @@ -52,13 +52,15 @@ func processRoomEvent( headered := input.Event event := headered.Unwrap() - // Check that the event passes authentication checks and work out the numeric IDs for the auth events. + // Check that the event passes authentication checks and work out + // the numeric IDs for the auth events. authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs) if err != nil { logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event") return } + // If we don't have a transaction ID then get one. if input.TransactionID != nil { tdID := input.TransactionID eventID, err = db.GetTransactionEventID( @@ -70,17 +72,21 @@ func processRoomEvent( } } - // Store the event + // Store the event. roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs) if err != nil { return } + // For outliers we can stop after we've stored the event itself as it + // doesn't have any associated state to store and we don't need to + // notify anyone about it. if input.Kind == api.KindOutlier { - // For outliers we can stop after we've stored the event itself as it - // doesn't have any associated state to store and we don't need to - // notify anyone about it. - logrus.WithField("event_id", event.EventID()).WithField("type", event.Type()).WithField("room", event.RoomID()).Info("Stored outlier") + logrus.WithFields(logrus.Fields{ + "event_id": event.EventID(), + "type": event.Type(), + "room": event.RoomID(), + }).Info("Stored outlier") return event.EventID(), nil } @@ -93,10 +99,21 @@ func processRoomEvent( } } + if err = updateLatestEvents( + ctx, // context + db, // roomserver database + ow, // output event writer + roomNID, // room NID to update + stateAtEvent, // state at event (below) + event, // event + input.SendAsServer, // send as server + input.TransactionID, // transaction ID + ); err != nil { + return + } + // Update the extremities of the event graph for the room - return event.EventID(), updateLatestEvents( - ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID, - ) + return event.EventID(), nil } func calculateAndSetState( @@ -111,6 +128,9 @@ func calculateAndSetState( roomState := state.NewStateResolution(db) if input.HasState { + // TODO: Check here if we think we're in the room already. + stateAtEvent.Overwrite = true + // We've been told what the state at the event is so we don't need to calculate it. // Check that those state events are in the database and store the state. var entries []types.StateEntry @@ -122,6 +142,8 @@ func calculateAndSetState( return err } } else { + stateAtEvent.Overwrite = false + // We haven't been told what the state at the event is so we need to calculate it from the prev_events if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event, roomNID); err != nil { return err diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input_latest_events.go index 42be0f40..6eeeedab 100644 --- a/roomserver/internal/input_latest_events.go +++ b/roomserver/internal/input_latest_events.go @@ -69,10 +69,17 @@ func updateLatestEvents( }() u := latestEventsUpdater{ - ctx: ctx, db: db, updater: updater, ow: ow, roomNID: roomNID, - stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer, + ctx: ctx, + db: db, + updater: updater, + ow: ow, + roomNID: roomNID, + stateAtEvent: stateAtEvent, + event: event, + sendAsServer: sendAsServer, transactionID: transactionID, } + if err = u.doUpdateLatestEvents(); err != nil { return err } @@ -115,38 +122,65 @@ type latestEventsUpdater struct { func (u *latestEventsUpdater) doUpdateLatestEvents() error { prevEvents := u.event.PrevEvents() - oldLatest := u.updater.LatestEvents() u.lastEventIDSent = u.updater.LastEventIDSent() u.oldStateNID = u.updater.CurrentStateSnapshotNID() + // If we are doing a regular event update then we will get the + // previous latest events to use as a part of the calculation. If + // we are overwriting the latest events because we have a complete + // state snapshot from somewhere else, e.g. a federated room join, + // then start with an empty set - none of the forward extremities + // that we knew about before matter anymore. + oldLatest := []types.StateAtEventAndReference{} + if !u.stateAtEvent.Overwrite { + oldLatest = u.updater.LatestEvents() + } + + // If the event has already been written to the output log then we + // don't need to do anything, as we've handled it already. hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID) if err != nil { return err } else if hasBeenSent { - // Already sent this event so we can stop processing return nil } + // Update the roomserver_previous_events table with references. This + // is effectively tracking the structure of the DAG. if err = u.updater.StorePreviousEvents(u.stateAtEvent.EventNID, prevEvents); err != nil { return err } + // Get the event reference for our new event. This will be used when + // determining if the event is referenced by an existing event. eventReference := u.event.EventReference() - // Check if this event is already referenced by another event in the room. + + // Check if our new event is already referenced by an existing event + // in the room. If it is then it isn't a latest event. alreadyReferenced, err := u.updater.IsReferenced(eventReference) if err != nil { return err } - u.latest = calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{ - EventReference: eventReference, - StateAtEvent: u.stateAtEvent, - }) + // Work out what the latest events are. + u.latest = calculateLatest( + oldLatest, + alreadyReferenced, + prevEvents, + types.StateAtEventAndReference{ + EventReference: eventReference, + StateAtEvent: u.stateAtEvent, + }, + ) + // Now that we know what the latest events are, it's time to get the + // latest state. if err = u.latestState(); err != nil { return err } + // If we need to generate any output events then here's where we do it. + // TODO: Move this! updates, err := updateMemberships(u.ctx, u.db, u.updater, u.removed, u.added) if err != nil { return err @@ -181,10 +215,15 @@ func (u *latestEventsUpdater) latestState() error { var err error roomState := state.NewStateResolution(u.db) + // Get a list of the current latest events. latestStateAtEvents := make([]types.StateAtEvent, len(u.latest)) for i := range u.latest { latestStateAtEvents[i] = u.latest[i].StateAtEvent } + + // Takes the NIDs of the latest events and creates a state snapshot + // of the state after the events. The snapshot state will be resolved + // using the correct state resolution algorithm for the room. u.newStateNID, err = roomState.CalculateAndStoreStateAfterEvents( u.ctx, u.roomNID, latestStateAtEvents, ) @@ -192,6 +231,18 @@ func (u *latestEventsUpdater) latestState() error { return err } + // If we are overwriting the state then we should make sure that we + // don't send anything out over federation again, it will very likely + // be a repeat. + if u.stateAtEvent.Overwrite { + u.sendAsServer = "" + } + + // Now that we have a new state snapshot based on the latest events, + // we can compare that new snapshot to the previous one and see what + // has changed. This gives us one list of removed state events and + // another list of added ones. Replacing a value for a state-key tuple + // will result one removed (the old event) and one added (the new event). u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots( u.ctx, u.oldStateNID, u.newStateNID, ) @@ -199,6 +250,8 @@ func (u *latestEventsUpdater) latestState() error { return err } + // Also work out the state before the event removes and the event + // adds. u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = roomState.DifferenceBetweeenStateSnapshots( u.ctx, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID, ) diff --git a/roomserver/internal/input_membership.go b/roomserver/internal/input_membership.go index a0029a28..666e7ebc 100644 --- a/roomserver/internal/input_membership.go +++ b/roomserver/internal/input_membership.go @@ -16,7 +16,6 @@ package internal import ( "context" - "errors" "fmt" "github.com/matrix-org/dendrite/roomserver/api" @@ -108,10 +107,10 @@ func updateMembership( } if add == nil { - // This shouldn't happen. Returning an error here is better than panicking - // in the membership updater functions later on. - // TODO: Why does this happen to begin with? - return updates, errors.New("add should not be nil") + // This can happen when we have rejoined a room and suddenly we have a + // divergence between the former state and the new one. We don't want to + // act on removals and apparently there are no adds, so stop here. + return updates, nil } mu, err := updater.MembershipUpdater(targetUserNID) diff --git a/roomserver/types/types.go b/roomserver/types/types.go index dfc112cf..da83f614 100644 --- a/roomserver/types/types.go +++ b/roomserver/types/types.go @@ -75,6 +75,10 @@ func (a StateEntry) LessThan(b StateEntry) bool { // StateAtEvent is the state before and after a matrix event. type StateAtEvent struct { + // Should this state overwrite the latest events and memberships of the room? + // This might be necessary when rejoining a federated room after a period of + // absence, as our state and latest events will be out of date. + Overwrite bool // The state before the event. BeforeStateSnapshotNID StateSnapshotNID // The state entry for the event itself, allows us to calculate the state after the event. |