aboutsummaryrefslogtreecommitdiff
path: root/roomserver/internal/input
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-10-21 15:37:07 +0100
committerGitHub <noreply@github.com>2020-10-21 15:37:07 +0100
commit534f9a9eb69ad35c7492c18c65cdbd55892bf930 (patch)
tree2eb0608e2084fc60599d719ff631ba4a412b1c63 /roomserver/internal/input
parente4f3f38f35ad14a581bed481352a6c1efeb56115 (diff)
Refactor forward extremities (#1556)
* Add resolve-state helper * Tweaks * Refactor forward extremities, again * Tweaks * Minor optimisation * Make path a bit clearer * Only process state/membership if forward extremities have changed * Usage comments in resolve-state
Diffstat (limited to 'roomserver/internal/input')
-rw-r--r--roomserver/internal/input/input_latest_events.go121
1 files changed, 73 insertions, 48 deletions
diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go
index 5631959b..a86e901d 100644
--- a/roomserver/internal/input/input_latest_events.go
+++ b/roomserver/internal/input/input_latest_events.go
@@ -17,7 +17,6 @@
package input
import (
- "bytes"
"context"
"fmt"
@@ -28,7 +27,6 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
- "github.com/sirupsen/logrus"
)
// updateLatestEvents updates the list of latest events for this room in the database and writes the
@@ -141,27 +139,30 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// Work out what the latest events are. This will include the new
// event if it is not already referenced.
- if err := u.calculateLatest(
- oldLatest,
+ extremitiesChanged, err := u.calculateLatest(
+ oldLatest, &u.event,
types.StateAtEventAndReference{
EventReference: u.event.EventReference(),
StateAtEvent: u.stateAtEvent,
},
- ); err != nil {
+ )
+ if err != nil {
return fmt.Errorf("u.calculateLatest: %w", err)
}
// Now that we know what the latest events are, it's time to get the
// latest state.
- if err := u.latestState(); err != nil {
- return fmt.Errorf("u.latestState: %w", err)
- }
+ var updates []api.OutputEvent
+ if extremitiesChanged {
+ if err = u.latestState(); err != nil {
+ return fmt.Errorf("u.latestState: %w", err)
+ }
- // If we need to generate any output events then here's where we do it.
- // TODO: Move this!
- updates, err := u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added)
- if err != nil {
- return fmt.Errorf("u.api.updateMemberships: %w", err)
+ // If we need to generate any output events then here's where we do it.
+ // TODO: Move this!
+ if updates, err = u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added); err != nil {
+ return fmt.Errorf("u.api.updateMemberships: %w", err)
+ }
}
update, err := u.makeOutputNewRoomEvent()
@@ -250,50 +251,74 @@ func (u *latestEventsUpdater) latestState() error {
// true if the new event is included in those extremites, false otherwise.
func (u *latestEventsUpdater) calculateLatest(
oldLatest []types.StateAtEventAndReference,
- newEvent types.StateAtEventAndReference,
-) error {
- var newLatest []types.StateAtEventAndReference
-
- // First of all, let's see if any of the existing forward extremities
- // now have entries in the previous events table. If they do then we
- // will no longer include them as forward extremities.
- for _, l := range oldLatest {
- referenced, err := u.updater.IsReferenced(l.EventReference)
- if err != nil {
- logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", l.EventID)
- return fmt.Errorf("u.updater.IsReferenced (old): %w", err)
- } else if !referenced {
- newLatest = append(newLatest, l)
+ newEvent *gomatrixserverlib.Event,
+ newStateAndRef types.StateAtEventAndReference,
+) (bool, error) {
+ // First of all, get a list of all of the events in our current
+ // set of forward extremities.
+ existingRefs := make(map[string]*types.StateAtEventAndReference)
+ existingNIDs := make([]types.EventNID, len(oldLatest))
+ for i, old := range oldLatest {
+ existingRefs[old.EventID] = &oldLatest[i]
+ existingNIDs[i] = old.EventNID
+ }
+
+ // Look up the old extremity events. This allows us to find their
+ // prev events.
+ events, err := u.api.DB.Events(u.ctx, existingNIDs)
+ if err != nil {
+ return false, fmt.Errorf("u.api.DB.Events: %w", err)
+ }
+
+ // Make a list of all of the prev events as referenced by all of
+ // the current forward extremities.
+ existingPrevs := make(map[string]struct{})
+ for _, old := range events {
+ for _, prevEventID := range old.PrevEventIDs() {
+ existingPrevs[prevEventID] = struct{}{}
}
}
- // Then check and see if our new event is already included in that set.
- // This ordinarily won't happen but it covers the edge-case that we've
- // already seen this event before and it's a forward extremity, so rather
- // than adding a duplicate, we'll just return the set as complete.
- for _, l := range newLatest {
- if l.EventReference.EventID == newEvent.EventReference.EventID && bytes.Equal(l.EventReference.EventSHA256, newEvent.EventReference.EventSHA256) {
- // We've already referenced this new event so we can just return
- // the newly completed extremities at this point.
- u.latest = newLatest
- return nil
+ // If the "new" event is already referenced by a forward extremity
+ // then do nothing - it's not a candidate to be a new extremity if
+ // it has been referenced.
+ if _, ok := existingPrevs[newEvent.EventID()]; ok {
+ return false, nil
+ }
+
+ // If the "new" event is already a forward extremity then stop, as
+ // nothing changes.
+ for _, event := range events {
+ if event.EventID() == newEvent.EventID() {
+ return false, nil
}
}
- // At this point we've processed the old extremities, and we've checked
- // that our new event isn't already in that set. Therefore now we can
- // check if our *new* event is a forward extremity, and if it is, add
- // it in.
- referenced, err := u.updater.IsReferenced(newEvent.EventReference)
- if err != nil {
- logrus.WithError(err).Errorf("Failed to retrieve event reference for %q", newEvent.EventReference.EventID)
- return fmt.Errorf("u.updater.IsReferenced (new): %w", err)
- } else if !referenced || len(newLatest) == 0 {
- newLatest = append(newLatest, newEvent)
+ // Include our new event in the extremities.
+ newLatest := []types.StateAtEventAndReference{newStateAndRef}
+
+ // Then run through and see if the other extremities are still valid.
+ // If our new event references them then they are no longer good
+ // candidates.
+ for _, prevEventID := range newEvent.PrevEventIDs() {
+ delete(existingRefs, prevEventID)
+ }
+
+ // Ensure that we don't add any candidate forward extremities from
+ // the old set that are, themselves, referenced by the old set of
+ // forward extremities. This shouldn't happen but guards against
+ // the possibility anyway.
+ for prevEventID := range existingPrevs {
+ delete(existingRefs, prevEventID)
+ }
+
+ // Then re-add any old extremities that are still valid after all.
+ for _, old := range existingRefs {
+ newLatest = append(newLatest, *old)
}
u.latest = newLatest
- return nil
+ return true, nil
}
func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) {