aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-05-18 17:49:24 +0100
committerGitHub <noreply@github.com>2020-05-18 17:49:24 +0100
commit9ef30bb13b49db7120c153d42cc67859cf5ed7da (patch)
treeb8dd26c09e7bd84145ba7982c95cecefd3673f64
parent5c221f065565a995f1f21be48f96e2ebf69473ec (diff)
Tweaks to latest events updater (#1045)
* Comment out updaters a bit, add overwrite flag to latest events * Make sure we don't send fast-forwarded state changes over federation, start with empty set when overwriting * Remove redundant check for overwrite
-rw-r--r--roomserver/internal/input_events.go40
-rw-r--r--roomserver/internal/input_latest_events.go71
-rw-r--r--roomserver/internal/input_membership.go9
-rw-r--r--roomserver/types/types.go4
4 files changed, 101 insertions, 23 deletions
diff --git a/roomserver/internal/input_events.go b/roomserver/internal/input_events.go
index f4d3bb8f..f5c678ca 100644
--- a/roomserver/internal/input_events.go
+++ b/roomserver/internal/input_events.go
@@ -52,13 +52,15 @@ func processRoomEvent(
headered := input.Event
event := headered.Unwrap()
- // Check that the event passes authentication checks and work out the numeric IDs for the auth events.
+ // Check that the event passes authentication checks and work out
+ // the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs)
if err != nil {
logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event")
return
}
+ // If we don't have a transaction ID then get one.
if input.TransactionID != nil {
tdID := input.TransactionID
eventID, err = db.GetTransactionEventID(
@@ -70,17 +72,21 @@ func processRoomEvent(
}
}
- // Store the event
+ // Store the event.
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
if err != nil {
return
}
+ // For outliers we can stop after we've stored the event itself as it
+ // doesn't have any associated state to store and we don't need to
+ // notify anyone about it.
if input.Kind == api.KindOutlier {
- // For outliers we can stop after we've stored the event itself as it
- // doesn't have any associated state to store and we don't need to
- // notify anyone about it.
- logrus.WithField("event_id", event.EventID()).WithField("type", event.Type()).WithField("room", event.RoomID()).Info("Stored outlier")
+ logrus.WithFields(logrus.Fields{
+ "event_id": event.EventID(),
+ "type": event.Type(),
+ "room": event.RoomID(),
+ }).Info("Stored outlier")
return event.EventID(), nil
}
@@ -93,10 +99,21 @@ func processRoomEvent(
}
}
+ if err = updateLatestEvents(
+ ctx, // context
+ db, // roomserver database
+ ow, // output event writer
+ roomNID, // room NID to update
+ stateAtEvent, // state at event (below)
+ event, // event
+ input.SendAsServer, // send as server
+ input.TransactionID, // transaction ID
+ ); err != nil {
+ return
+ }
+
// Update the extremities of the event graph for the room
- return event.EventID(), updateLatestEvents(
- ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID,
- )
+ return event.EventID(), nil
}
func calculateAndSetState(
@@ -111,6 +128,9 @@ func calculateAndSetState(
roomState := state.NewStateResolution(db)
if input.HasState {
+ // TODO: Check here if we think we're in the room already.
+ stateAtEvent.Overwrite = true
+
// We've been told what the state at the event is so we don't need to calculate it.
// Check that those state events are in the database and store the state.
var entries []types.StateEntry
@@ -122,6 +142,8 @@ func calculateAndSetState(
return err
}
} else {
+ stateAtEvent.Overwrite = false
+
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event, roomNID); err != nil {
return err
diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input_latest_events.go
index 42be0f40..6eeeedab 100644
--- a/roomserver/internal/input_latest_events.go
+++ b/roomserver/internal/input_latest_events.go
@@ -69,10 +69,17 @@ func updateLatestEvents(
}()
u := latestEventsUpdater{
- ctx: ctx, db: db, updater: updater, ow: ow, roomNID: roomNID,
- stateAtEvent: stateAtEvent, event: event, sendAsServer: sendAsServer,
+ ctx: ctx,
+ db: db,
+ updater: updater,
+ ow: ow,
+ roomNID: roomNID,
+ stateAtEvent: stateAtEvent,
+ event: event,
+ sendAsServer: sendAsServer,
transactionID: transactionID,
}
+
if err = u.doUpdateLatestEvents(); err != nil {
return err
}
@@ -115,38 +122,65 @@ type latestEventsUpdater struct {
func (u *latestEventsUpdater) doUpdateLatestEvents() error {
prevEvents := u.event.PrevEvents()
- oldLatest := u.updater.LatestEvents()
u.lastEventIDSent = u.updater.LastEventIDSent()
u.oldStateNID = u.updater.CurrentStateSnapshotNID()
+ // If we are doing a regular event update then we will get the
+ // previous latest events to use as a part of the calculation. If
+ // we are overwriting the latest events because we have a complete
+ // state snapshot from somewhere else, e.g. a federated room join,
+ // then start with an empty set - none of the forward extremities
+ // that we knew about before matter anymore.
+ oldLatest := []types.StateAtEventAndReference{}
+ if !u.stateAtEvent.Overwrite {
+ oldLatest = u.updater.LatestEvents()
+ }
+
+ // If the event has already been written to the output log then we
+ // don't need to do anything, as we've handled it already.
hasBeenSent, err := u.updater.HasEventBeenSent(u.stateAtEvent.EventNID)
if err != nil {
return err
} else if hasBeenSent {
- // Already sent this event so we can stop processing
return nil
}
+ // Update the roomserver_previous_events table with references. This
+ // is effectively tracking the structure of the DAG.
if err = u.updater.StorePreviousEvents(u.stateAtEvent.EventNID, prevEvents); err != nil {
return err
}
+ // Get the event reference for our new event. This will be used when
+ // determining if the event is referenced by an existing event.
eventReference := u.event.EventReference()
- // Check if this event is already referenced by another event in the room.
+
+ // Check if our new event is already referenced by an existing event
+ // in the room. If it is then it isn't a latest event.
alreadyReferenced, err := u.updater.IsReferenced(eventReference)
if err != nil {
return err
}
- u.latest = calculateLatest(oldLatest, alreadyReferenced, prevEvents, types.StateAtEventAndReference{
- EventReference: eventReference,
- StateAtEvent: u.stateAtEvent,
- })
+ // Work out what the latest events are.
+ u.latest = calculateLatest(
+ oldLatest,
+ alreadyReferenced,
+ prevEvents,
+ types.StateAtEventAndReference{
+ EventReference: eventReference,
+ StateAtEvent: u.stateAtEvent,
+ },
+ )
+ // Now that we know what the latest events are, it's time to get the
+ // latest state.
if err = u.latestState(); err != nil {
return err
}
+ // If we need to generate any output events then here's where we do it.
+ // TODO: Move this!
updates, err := updateMemberships(u.ctx, u.db, u.updater, u.removed, u.added)
if err != nil {
return err
@@ -181,10 +215,15 @@ func (u *latestEventsUpdater) latestState() error {
var err error
roomState := state.NewStateResolution(u.db)
+ // Get a list of the current latest events.
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
for i := range u.latest {
latestStateAtEvents[i] = u.latest[i].StateAtEvent
}
+
+ // Takes the NIDs of the latest events and creates a state snapshot
+ // of the state after the events. The snapshot state will be resolved
+ // using the correct state resolution algorithm for the room.
u.newStateNID, err = roomState.CalculateAndStoreStateAfterEvents(
u.ctx, u.roomNID, latestStateAtEvents,
)
@@ -192,6 +231,18 @@ func (u *latestEventsUpdater) latestState() error {
return err
}
+ // If we are overwriting the state then we should make sure that we
+ // don't send anything out over federation again, it will very likely
+ // be a repeat.
+ if u.stateAtEvent.Overwrite {
+ u.sendAsServer = ""
+ }
+
+ // Now that we have a new state snapshot based on the latest events,
+ // we can compare that new snapshot to the previous one and see what
+ // has changed. This gives us one list of removed state events and
+ // another list of added ones. Replacing a value for a state-key tuple
+ // will result one removed (the old event) and one added (the new event).
u.removed, u.added, err = roomState.DifferenceBetweeenStateSnapshots(
u.ctx, u.oldStateNID, u.newStateNID,
)
@@ -199,6 +250,8 @@ func (u *latestEventsUpdater) latestState() error {
return err
}
+ // Also work out the state before the event removes and the event
+ // adds.
u.stateBeforeEventRemoves, u.stateBeforeEventAdds, err = roomState.DifferenceBetweeenStateSnapshots(
u.ctx, u.newStateNID, u.stateAtEvent.BeforeStateSnapshotNID,
)
diff --git a/roomserver/internal/input_membership.go b/roomserver/internal/input_membership.go
index a0029a28..666e7ebc 100644
--- a/roomserver/internal/input_membership.go
+++ b/roomserver/internal/input_membership.go
@@ -16,7 +16,6 @@ package internal
import (
"context"
- "errors"
"fmt"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -108,10 +107,10 @@ func updateMembership(
}
if add == nil {
- // This shouldn't happen. Returning an error here is better than panicking
- // in the membership updater functions later on.
- // TODO: Why does this happen to begin with?
- return updates, errors.New("add should not be nil")
+ // This can happen when we have rejoined a room and suddenly we have a
+ // divergence between the former state and the new one. We don't want to
+ // act on removals and apparently there are no adds, so stop here.
+ return updates, nil
}
mu, err := updater.MembershipUpdater(targetUserNID)
diff --git a/roomserver/types/types.go b/roomserver/types/types.go
index dfc112cf..da83f614 100644
--- a/roomserver/types/types.go
+++ b/roomserver/types/types.go
@@ -75,6 +75,10 @@ func (a StateEntry) LessThan(b StateEntry) bool {
// StateAtEvent is the state before and after a matrix event.
type StateAtEvent struct {
+ // Should this state overwrite the latest events and memberships of the room?
+ // This might be necessary when rejoining a federated room after a period of
+ // absence, as our state and latest events will be out of date.
+ Overwrite bool
// The state before the event.
BeforeStateSnapshotNID StateSnapshotNID
// The state entry for the event itself, allows us to calculate the state after the event.