diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-10-19 14:59:13 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-19 14:59:13 +0100 |
commit | 6e63df1d9a3eadf924d518a1a02f04dfd03ad6b1 (patch) | |
tree | fdfab85a07f37c18b0545f042a8e70dedc1aa75b /roomserver | |
parent | 0974f6e2c055d8d06b5ea9c175252b22b2399fe2 (diff) |
KindOld (#1531)
* Add KindOld
* Don't process latest events/memberships for old events
* Allow federationsender to ignore duplicate key entries when LatestEventIDs is duplicated by RS output events
* Signal to downstream components if an event has become a forward extremity
* Don't exclude from sync
* Soft-fail checks on KindNew
* Don't run the latest events updater at all for KindOld
* Don't make federation sender change after all
* Kind in federation sender join
* Don't send isForwardExtremity
* Fix syncapi
* Update comments
* Fix SendEventWithState
* Update sytest-whitelist
* Generate old output events
* Sync API consumes old room events
* Update comments
Diffstat (limited to 'roomserver')
-rw-r--r-- | roomserver/api/input.go | 16 | ||||
-rw-r--r-- | roomserver/api/output.go | 18 | ||||
-rw-r--r-- | roomserver/api/wrapper.go | 14 | ||||
-rw-r--r-- | roomserver/internal/input/input_events.go | 39 | ||||
-rw-r--r-- | roomserver/internal/input/input_latest_events.go | 11 | ||||
-rw-r--r-- | roomserver/roomserver_test.go | 4 |
6 files changed, 73 insertions, 29 deletions
diff --git a/roomserver/api/input.go b/roomserver/api/input.go index dd693203..e1a8afa0 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -21,17 +21,25 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) +type Kind int + const ( // KindOutlier event fall outside the contiguous event graph. // We do not have the state for these events. // These events are state events used to authenticate other events. // They can become part of the contiguous event graph via backfill. - KindOutlier = 1 + KindOutlier Kind = iota + 1 // KindNew event extend the contiguous graph going forwards. // They usually don't need state, but may include state if the // there was a new event that references an event that we don't - // have a copy of. - KindNew = 2 + // have a copy of. New events will influence the fwd extremities + // of the room and output events will be generated as a result. + KindNew + // KindOld event extend the graph backwards, or fill gaps in + // history. They may or may not include state. They will not be + // considered for forward extremities, and output events will NOT + // be generated for them. + KindOld ) // DoNotSendToOtherServers tells us not to send the event to other matrix @@ -43,7 +51,7 @@ const DoNotSendToOtherServers = "" type InputRoomEvent struct { // Whether this event is new, backfilled or an outlier. // This controls how the event is processed. - Kind int `json:"kind"` + Kind Kind `json:"kind"` // The event JSON for the event to add. Event gomatrixserverlib.HeaderedEvent `json:"event"` // List of state event IDs that authenticate this event. diff --git a/roomserver/api/output.go b/roomserver/api/output.go index d57f3b04..9cb814a4 100644 --- a/roomserver/api/output.go +++ b/roomserver/api/output.go @@ -24,6 +24,8 @@ type OutputType string const ( // OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent OutputTypeNewRoomEvent OutputType = "new_room_event" + // OutputTypeOldRoomEvent indicates that the event is an OutputOldRoomEvent + OutputTypeOldRoomEvent OutputType = "old_room_event" // OutputTypeNewInviteEvent indicates that the event is an OutputNewInviteEvent OutputTypeNewInviteEvent OutputType = "new_invite_event" // OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent @@ -58,6 +60,8 @@ type OutputEvent struct { Type OutputType `json:"type"` // The content of event with type OutputTypeNewRoomEvent NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"` + // The content of event with type OutputTypeOldRoomEvent + OldRoomEvent *OutputOldRoomEvent `json:"old_room_event,omitempty"` // The content of event with type OutputTypeNewInviteEvent NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"` // The content of event with type OutputTypeRetireInviteEvent @@ -178,6 +182,20 @@ func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent { return append(ore.AddStateEvents, ore.Event) } +// An OutputOldRoomEvent is written when the roomserver receives an old event. +// This will typically happen as a result of getting either missing events +// or backfilling. Downstream components may wish to send these events to +// clients when it is advantageous to do so, but with the consideration that +// the event is likely a historic event. +// +// Old events do not update forward extremities or the current room state, +// therefore they must not be treated as if they do. Downstream components +// should build their current room state up from OutputNewRoomEvents only. +type OutputOldRoomEvent struct { + // The Event. + Event gomatrixserverlib.HeaderedEvent `json:"event"` +} + // An OutputNewInviteEvent is written whenever an invite becomes active. // Invite events can be received outside of an existing room so have to be // tracked separately from the room events themselves. diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index a38c00df..9e821910 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -24,13 +24,14 @@ import ( // SendEvents to the roomserver The events are written with KindNew. func SendEvents( - ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent, + ctx context.Context, rsAPI RoomserverInternalAPI, + kind Kind, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, ) error { ires := make([]InputRoomEvent, len(events)) for i, event := range events { ires[i] = InputRoomEvent{ - Kind: KindNew, + Kind: kind, Event: event, AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), @@ -40,12 +41,13 @@ func SendEvents( return SendInputRoomEvents(ctx, rsAPI, ires) } -// SendEventWithState writes an event with KindNew to the roomserver +// SendEventWithState writes an event with the specified kind to the roomserver // with the state at the event as KindOutlier before it. Will not send any event that is // marked as `true` in haveEventIDs func SendEventWithState( - ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState, - event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, + ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, + state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, + haveEventIDs map[string]bool, ) error { outliers, err := state.Events() if err != nil { @@ -70,7 +72,7 @@ func SendEventWithState( } ires = append(ires, InputRoomEvent{ - Kind: KindNew, + Kind: kind, Event: event, AuthEventIDs: event.AuthEventIDs(), HasState: true, diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 11334159..67031609 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -119,7 +119,7 @@ func (r *Inputer) processRoomEvent( // We haven't calculated a state for this event yet. // Lets calculate one. err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected) - if err != nil { + if err != nil && input.Kind != api.KindOld { return "", fmt.Errorf("r.calculateAndSetState: %w", err) } } @@ -136,16 +136,31 @@ func (r *Inputer) processRoomEvent( return event.EventID(), rejectionErr } - if err = r.updateLatestEvents( - ctx, // context - roomInfo, // room info for the room being updated - stateAtEvent, // state at event (below) - event, // event - input.SendAsServer, // send as server - input.TransactionID, // transaction ID - input.HasState, // rewrites state? - ); err != nil { - return "", fmt.Errorf("r.updateLatestEvents: %w", err) + switch input.Kind { + case api.KindNew: + if err = r.updateLatestEvents( + ctx, // context + roomInfo, // room info for the room being updated + stateAtEvent, // state at event (below) + event, // event + input.SendAsServer, // send as server + input.TransactionID, // transaction ID + input.HasState, // rewrites state? + ); err != nil { + return "", fmt.Errorf("r.updateLatestEvents: %w", err) + } + case api.KindOld: + err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ + { + Type: api.OutputTypeOldRoomEvent, + OldRoomEvent: &api.OutputOldRoomEvent{ + Event: headered, + }, + }, + }) + if err != nil { + return "", fmt.Errorf("r.WriteOutputEvents (old): %w", err) + } } // processing this event resulted in an event (which may not be the one we're processing) @@ -163,7 +178,7 @@ func (r *Inputer) processRoomEvent( }, }) if err != nil { - return "", fmt.Errorf("r.WriteOutputEvents: %w", err) + return "", fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) } } diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index ca5d214d..5adcd087 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -164,8 +164,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { return fmt.Errorf("u.api.updateMemberships: %w", err) } - var update *api.OutputEvent - update, err = u.makeOutputNewRoomEvent() + update, err := u.makeOutputNewRoomEvent() if err != nil { return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err) } @@ -259,6 +258,8 @@ func (u *latestEventsUpdater) latestState() error { return nil } +// calculateLatest works out the new set of forward extremities. Returns +// true if the new event is included in those extremites, false otherwise. func (u *latestEventsUpdater) calculateLatest( oldLatest []types.StateAtEventAndReference, newEvent types.StateAtEventAndReference, @@ -326,7 +327,6 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) if err != nil { return nil, err } - for _, entry := range u.added { ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID]) } @@ -339,13 +339,14 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error) for _, entry := range u.stateBeforeEventAdds { ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID]) } + ore.SendAsServer = u.sendAsServer // include extra state events if they were added as nearly every downstream component will care about it // and we'd rather not have them all hit QueryEventsByID at the same time! if len(ore.AddsStateEventIDs) > 0 { - ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs) - if err != nil { + var err error + if ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs); err != nil { return nil, fmt.Errorf("failed to load add_state_events from db: %w", err) } } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 1b692a09..c8e60efa 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -191,7 +191,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js t.Helper() rsAPI, dp := mustCreateRoomserverAPI(t) hevents := mustLoadRawEvents(t, ver, events) - if err := api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil); err != nil { + if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil); err != nil { t.Errorf("failed to SendEvents: %s", err) } return rsAPI, dp, hevents @@ -337,7 +337,7 @@ func TestOutputRewritesState(t *testing.T) { deleteDatabase() rsAPI, producer := mustCreateRoomserverAPI(t) defer deleteDatabase() - err := api.SendEvents(context.Background(), rsAPI, originalEvents, testOrigin, nil) + err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil) if err != nil { t.Fatalf("failed to send original events: %s", err) } |