diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-10-21 15:37:07 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-21 15:37:07 +0100 |
commit | 534f9a9eb69ad35c7492c18c65cdbd55892bf930 (patch) | |
tree | 2eb0608e2084fc60599d719ff631ba4a412b1c63 /roomserver/internal/input | |
parent | e4f3f38f35ad14a581bed481352a6c1efeb56115 (diff) |
Refactor forward extremities (#1556)
* Add resolve-state helper
* Tweaks
* Refactor forward extremities, again
* Tweaks
* Minor optimisation
* Make path a bit clearer
* Only process state/membership if forward extremities have changed
* Usage comments in resolve-state
Diffstat (limited to 'roomserver/internal/input')
-rw-r--r-- | roomserver/internal/input/input_latest_events.go | 121 |
1 files changed, 73 insertions, 48 deletions
diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 5631959b..a86e901d 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -17,7 +17,6 @@ package input import ( - "bytes" "context" "fmt" @@ -28,7 +27,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - "github.com/sirupsen/logrus" ) // updateLatestEvents updates the list of latest events for this room in the database and writes the @@ -141,27 +139,30 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { // Work out what the latest events are. This will include the new // event if it is not already referenced. - if err := u.calculateLatest( - oldLatest, + extremitiesChanged, err := u.calculateLatest( + oldLatest, &u.event, types.StateAtEventAndReference{ EventReference: u.event.EventReference(), StateAtEvent: u.stateAtEvent, }, - ); err != nil { + ) + if err != nil { return fmt.Errorf("u.calculateLatest: %w", err) } // Now that we know what the latest events are, it's time to get the // latest state. - if err := u.latestState(); err != nil { - return fmt.Errorf("u.latestState: %w", err) - } + var updates []api.OutputEvent + if extremitiesChanged { + if err = u.latestState(); err != nil { + return fmt.Errorf("u.latestState: %w", err) + } - // If we need to generate any output events then here's where we do it. - // TODO: Move this! - updates, err := u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added) - if err != nil { - return fmt.Errorf("u.api.updateMemberships: %w", err) + // If we need to generate any output events then here's where we do it. + // TODO: Move this! + if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil { + return fmt.Errorf("u.api.updateMemberships: %w", err) + } } update, err := u.makeOutputNewRoomEvent() @@ -250,50 +251,74 @@ func (u *latestEventsUpdater) latestState() error { // true if the new event is included in those extremites, false otherwise. func (u *latestEventsUpdater) calculateLatest( oldLatest []types.StateAtEventAndReference, - newEvent types.StateAtEventAndReference, -) error { - var newLatest []types.StateAtEventAndReference - - // First of all, let's see if any of the existing forward extremities - // now have entries in the previous events table. If they do then we - // will no longer include them as forward extremities. - for _, l := range oldLatest { - referenced, err := u.updater.IsReferenced(l.EventReference) - if err != nil { - logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID) - return fmt.Errorf("u.updater.IsReferenced (old): %w", err) - } else if !referenced { - newLatest = append(newLatest, l) + newEvent *gomatrixserverlib.Event, + newStateAndRef types.StateAtEventAndReference, +) (bool, error) { + // First of all, get a list of all of the events in our current + // set of forward extremities. + existingRefs := make(map[string]*types.StateAtEventAndReference) + existingNIDs := make([]types.EventNID, len(oldLatest)) + for i, old := range oldLatest { + existingRefs[old.EventID] = &oldLatest[i] + existingNIDs[i] = old.EventNID + } + + // Look up the old extremity events. This allows us to find their + // prev events. + events, err := u.api.DB.Events(u.ctx, existingNIDs) + if err != nil { + return false, fmt.Errorf("u.api.DB.Events: %w", err) + } + + // Make a list of all of the prev events as referenced by all of + // the current forward extremities. + existingPrevs := make(map[string]struct{}) + for _, old := range events { + for _, prevEventID := range old.PrevEventIDs() { + existingPrevs[prevEventID] = struct{}{} } } - // Then check and see if our new event is already included in that set. - // This ordinarily won't happen but it covers the edge-case that we've - // already seen this event before and it's a forward extremity, so rather - // than adding a duplicate, we'll just return the set as complete. - for _, l := range newLatest { - if l.EventReference.EventID == newEvent.EventReference.EventID && bytes.Equal(l.EventReference.EventSHA256, newEvent.EventReference.EventSHA256) { - // We've already referenced this new event so we can just return - // the newly completed extremities at this point. - u.latest = newLatest - return nil + // If the "new" event is already referenced by a forward extremity + // then do nothing - it's not a candidate to be a new extremity if + // it has been referenced. + if _, ok := existingPrevs[newEvent.EventID()]; ok { + return false, nil + } + + // If the "new" event is already a forward extremity then stop, as + // nothing changes. + for _, event := range events { + if event.EventID() == newEvent.EventID() { + return false, nil } } - // At this point we've processed the old extremities, and we've checked - // that our new event isn't already in that set. Therefore now we can - // check if our *new* event is a forward extremity, and if it is, add - // it in. - referenced, err := u.updater.IsReferenced(newEvent.EventReference) - if err != nil { - logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID) - return fmt.Errorf("u.updater.IsReferenced (new): %w", err) - } else if !referenced || len(newLatest) == 0 { - newLatest = append(newLatest, newEvent) + // Include our new event in the extremities. + newLatest := []types.StateAtEventAndReference{newStateAndRef} + + // Then run through and see if the other extremities are still valid. + // If our new event references them then they are no longer good + // candidates. + for _, prevEventID := range newEvent.PrevEventIDs() { + delete(existingRefs, prevEventID) + } + + // Ensure that we don't add any candidate forward extremities from + // the old set that are, themselves, referenced by the old set of + // forward extremities. This shouldn't happen but guards against + // the possibility anyway. + for prevEventID := range existingPrevs { + delete(existingRefs, prevEventID) + } + + // Then re-add any old extremities that are still valid after all. + for _, old := range existingRefs { + newLatest = append(newLatest, *old) } u.latest = newLatest - return nil + return true, nil } func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) { |