aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-10-19 14:59:13 +0100
committerGitHub <noreply@github.com>2020-10-19 14:59:13 +0100
commit6e63df1d9a3eadf924d518a1a02f04dfd03ad6b1 (patch)
treefdfab85a07f37c18b0545f042a8e70dedc1aa75b /roomserver
parent0974f6e2c055d8d06b5ea9c175252b22b2399fe2 (diff)
KindOld (#1531)
* Add KindOld * Don't process latest events/memberships for old events * Allow federationsender to ignore duplicate key entries when LatestEventIDs is duplicated by RS output events * Signal to downstream components if an event has become a forward extremity * Don't exclude from sync * Soft-fail checks on KindNew * Don't run the latest events updater at all for KindOld * Don't make federation sender change after all * Kind in federation sender join * Don't send isForwardExtremity * Fix syncapi * Update comments * Fix SendEventWithState * Update sytest-whitelist * Generate old output events * Sync API consumes old room events * Update comments
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/api/input.go16
-rw-r--r--roomserver/api/output.go18
-rw-r--r--roomserver/api/wrapper.go14
-rw-r--r--roomserver/internal/input/input_events.go39
-rw-r--r--roomserver/internal/input/input_latest_events.go11
-rw-r--r--roomserver/roomserver_test.go4
6 files changed, 73 insertions, 29 deletions
diff --git a/roomserver/api/input.go b/roomserver/api/input.go
index dd693203..e1a8afa0 100644
--- a/roomserver/api/input.go
+++ b/roomserver/api/input.go
@@ -21,17 +21,25 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
+type Kind int
+
const (
// KindOutlier event fall outside the contiguous event graph.
// We do not have the state for these events.
// These events are state events used to authenticate other events.
// They can become part of the contiguous event graph via backfill.
- KindOutlier = 1
+ KindOutlier Kind = iota + 1
// KindNew event extend the contiguous graph going forwards.
// They usually don't need state, but may include state if the
// there was a new event that references an event that we don't
- // have a copy of.
- KindNew = 2
+ // have a copy of. New events will influence the fwd extremities
+ // of the room and output events will be generated as a result.
+ KindNew
+ // KindOld event extend the graph backwards, or fill gaps in
+ // history. They may or may not include state. They will not be
+ // considered for forward extremities, and output events will NOT
+ // be generated for them.
+ KindOld
)
// DoNotSendToOtherServers tells us not to send the event to other matrix
@@ -43,7 +51,7 @@ const DoNotSendToOtherServers = ""
type InputRoomEvent struct {
// Whether this event is new, backfilled or an outlier.
// This controls how the event is processed.
- Kind int `json:"kind"`
+ Kind Kind `json:"kind"`
// The event JSON for the event to add.
Event gomatrixserverlib.HeaderedEvent `json:"event"`
// List of state event IDs that authenticate this event.
diff --git a/roomserver/api/output.go b/roomserver/api/output.go
index d57f3b04..9cb814a4 100644
--- a/roomserver/api/output.go
+++ b/roomserver/api/output.go
@@ -24,6 +24,8 @@ type OutputType string
const (
// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
OutputTypeNewRoomEvent OutputType = "new_room_event"
+ // OutputTypeOldRoomEvent indicates that the event is an OutputOldRoomEvent
+ OutputTypeOldRoomEvent OutputType = "old_room_event"
// OutputTypeNewInviteEvent indicates that the event is an OutputNewInviteEvent
OutputTypeNewInviteEvent OutputType = "new_invite_event"
// OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent
@@ -58,6 +60,8 @@ type OutputEvent struct {
Type OutputType `json:"type"`
// The content of event with type OutputTypeNewRoomEvent
NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"`
+ // The content of event with type OutputTypeOldRoomEvent
+ OldRoomEvent *OutputOldRoomEvent `json:"old_room_event,omitempty"`
// The content of event with type OutputTypeNewInviteEvent
NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"`
// The content of event with type OutputTypeRetireInviteEvent
@@ -178,6 +182,20 @@ func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent {
return append(ore.AddStateEvents, ore.Event)
}
+// An OutputOldRoomEvent is written when the roomserver receives an old event.
+// This will typically happen as a result of getting either missing events
+// or backfilling. Downstream components may wish to send these events to
+// clients when it is advantageous to do so, but with the consideration that
+// the event is likely a historic event.
+//
+// Old events do not update forward extremities or the current room state,
+// therefore they must not be treated as if they do. Downstream components
+// should build their current room state up from OutputNewRoomEvents only.
+type OutputOldRoomEvent struct {
+ // The Event.
+ Event gomatrixserverlib.HeaderedEvent `json:"event"`
+}
+
// An OutputNewInviteEvent is written whenever an invite becomes active.
// Invite events can be received outside of an existing room so have to be
// tracked separately from the room events themselves.
diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go
index a38c00df..9e821910 100644
--- a/roomserver/api/wrapper.go
+++ b/roomserver/api/wrapper.go
@@ -24,13 +24,14 @@ import (
// SendEvents to the roomserver The events are written with KindNew.
func SendEvents(
- ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, rsAPI RoomserverInternalAPI,
+ kind Kind, events []gomatrixserverlib.HeaderedEvent,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
) error {
ires := make([]InputRoomEvent, len(events))
for i, event := range events {
ires[i] = InputRoomEvent{
- Kind: KindNew,
+ Kind: kind,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
SendAsServer: string(sendAsServer),
@@ -40,12 +41,13 @@ func SendEvents(
return SendInputRoomEvents(ctx, rsAPI, ires)
}
-// SendEventWithState writes an event with KindNew to the roomserver
+// SendEventWithState writes an event with the specified kind to the roomserver
// with the state at the event as KindOutlier before it. Will not send any event that is
// marked as `true` in haveEventIDs
func SendEventWithState(
- ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState,
- event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool,
+ ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind,
+ state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent,
+ haveEventIDs map[string]bool,
) error {
outliers, err := state.Events()
if err != nil {
@@ -70,7 +72,7 @@ func SendEventWithState(
}
ires = append(ires, InputRoomEvent{
- Kind: KindNew,
+ Kind: kind,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
HasState: true,
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index 11334159..67031609 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -119,7 +119,7 @@ func (r *Inputer) processRoomEvent(
// We haven't calculated a state for this event yet.
// Lets calculate one.
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected)
- if err != nil {
+ if err != nil && input.Kind != api.KindOld {
return "", fmt.Errorf("r.calculateAndSetState: %w", err)
}
}
@@ -136,16 +136,31 @@ func (r *Inputer) processRoomEvent(
return event.EventID(), rejectionErr
}
- if err = r.updateLatestEvents(
- ctx, // context
- roomInfo, // room info for the room being updated
- stateAtEvent, // state at event (below)
- event, // event
- input.SendAsServer, // send as server
- input.TransactionID, // transaction ID
- input.HasState, // rewrites state?
- ); err != nil {
- return "", fmt.Errorf("r.updateLatestEvents: %w", err)
+ switch input.Kind {
+ case api.KindNew:
+ if err = r.updateLatestEvents(
+ ctx, // context
+ roomInfo, // room info for the room being updated
+ stateAtEvent, // state at event (below)
+ event, // event
+ input.SendAsServer, // send as server
+ input.TransactionID, // transaction ID
+ input.HasState, // rewrites state?
+ ); err != nil {
+ return "", fmt.Errorf("r.updateLatestEvents: %w", err)
+ }
+ case api.KindOld:
+ err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
+ {
+ Type: api.OutputTypeOldRoomEvent,
+ OldRoomEvent: &api.OutputOldRoomEvent{
+ Event: headered,
+ },
+ },
+ })
+ if err != nil {
+ return "", fmt.Errorf("r.WriteOutputEvents (old): %w", err)
+ }
}
// processing this event resulted in an event (which may not be the one we're processing)
@@ -163,7 +178,7 @@ func (r *Inputer) processRoomEvent(
},
})
if err != nil {
- return "", fmt.Errorf("r.WriteOutputEvents: %w", err)
+ return "", fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
}
}
diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go
index ca5d214d..5adcd087 100644
--- a/roomserver/internal/input/input_latest_events.go
+++ b/roomserver/internal/input/input_latest_events.go
@@ -164,8 +164,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
return fmt.Errorf("u.api.updateMemberships: %w", err)
}
- var update *api.OutputEvent
- update, err = u.makeOutputNewRoomEvent()
+ update, err := u.makeOutputNewRoomEvent()
if err != nil {
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
}
@@ -259,6 +258,8 @@ func (u *latestEventsUpdater) latestState() error {
return nil
}
+// calculateLatest works out the new set of forward extremities. Returns
+// true if the new event is included in those extremites, false otherwise.
func (u *latestEventsUpdater) calculateLatest(
oldLatest []types.StateAtEventAndReference,
newEvent types.StateAtEventAndReference,
@@ -326,7 +327,6 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
if err != nil {
return nil, err
}
-
for _, entry := range u.added {
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
}
@@ -339,13 +339,14 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
for _, entry := range u.stateBeforeEventAdds {
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
}
+
ore.SendAsServer = u.sendAsServer
// include extra state events if they were added as nearly every downstream component will care about it
// and we'd rather not have them all hit QueryEventsByID at the same time!
if len(ore.AddsStateEventIDs) > 0 {
- ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs)
- if err != nil {
+ var err error
+ if ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs); err != nil {
return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
}
}
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
index 1b692a09..c8e60efa 100644
--- a/roomserver/roomserver_test.go
+++ b/roomserver/roomserver_test.go
@@ -191,7 +191,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js
t.Helper()
rsAPI, dp := mustCreateRoomserverAPI(t)
hevents := mustLoadRawEvents(t, ver, events)
- if err := api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil); err != nil {
+ if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil); err != nil {
t.Errorf("failed to SendEvents: %s", err)
}
return rsAPI, dp, hevents
@@ -337,7 +337,7 @@ func TestOutputRewritesState(t *testing.T) {
deleteDatabase()
rsAPI, producer := mustCreateRoomserverAPI(t)
defer deleteDatabase()
- err := api.SendEvents(context.Background(), rsAPI, originalEvents, testOrigin, nil)
+ err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil)
if err != nil {
t.Fatalf("failed to send original events: %s", err)
}