aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-10-19 14:59:13 +0100
committerGitHub <noreply@github.com>2020-10-19 14:59:13 +0100
commit6e63df1d9a3eadf924d518a1a02f04dfd03ad6b1 (patch)
treefdfab85a07f37c18b0545f042a8e70dedc1aa75b
parent0974f6e2c055d8d06b5ea9c175252b22b2399fe2 (diff)
KindOld (#1531)
* Add KindOld * Don't process latest events/memberships for old events * Allow federationsender to ignore duplicate key entries when LatestEventIDs is duplicated by RS output events * Signal to downstream components if an event has become a forward extremity * Don't exclude from sync * Soft-fail checks on KindNew * Don't run the latest events updater at all for KindOld * Don't make federation sender change after all * Kind in federation sender join * Don't send isForwardExtremity * Fix syncapi * Update comments * Fix SendEventWithState * Update sytest-whitelist * Generate old output events * Sync API consumes old room events * Update comments
-rw-r--r--clientapi/routing/createroom.go1
-rw-r--r--clientapi/routing/membership.go1
-rw-r--r--clientapi/routing/profile.go4
-rw-r--r--clientapi/routing/redaction.go2
-rw-r--r--clientapi/routing/sendevent.go1
-rw-r--r--clientapi/threepid/invites.go1
-rw-r--r--federationapi/routing/join.go1
-rw-r--r--federationapi/routing/leave.go1
-rw-r--r--federationapi/routing/send.go3
-rw-r--r--federationapi/routing/threepid.go3
-rw-r--r--federationsender/internal/perform.go1
-rw-r--r--roomserver/api/input.go16
-rw-r--r--roomserver/api/output.go18
-rw-r--r--roomserver/api/wrapper.go14
-rw-r--r--roomserver/internal/input/input_events.go39
-rw-r--r--roomserver/internal/input/input_latest_events.go11
-rw-r--r--roomserver/roomserver_test.go4
-rw-r--r--syncapi/consumers/roomserver.go37
-rw-r--r--sytest-whitelist1
19 files changed, 125 insertions, 34 deletions
diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go
index 9655339c..cff3c981 100644
--- a/clientapi/routing/createroom.go
+++ b/clientapi/routing/createroom.go
@@ -344,6 +344,7 @@ func createRoom(
if err = roomserverAPI.SendEventWithState(
req.Context(),
rsAPI,
+ roomserverAPI.KindNew,
&gomatrixserverlib.RespState{
StateEvents: accumulated,
AuthEvents: accumulated,
diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go
index 88cb2364..fe079557 100644
--- a/clientapi/routing/membership.go
+++ b/clientapi/routing/membership.go
@@ -76,6 +76,7 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us
if err = roomserverAPI.SendEvents(
ctx, rsAPI,
+ api.KindNew,
[]gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)},
cfg.Matrix.ServerName,
nil,
diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go
index 60669a0c..bbe35fac 100644
--- a/clientapi/routing/profile.go
+++ b/clientapi/routing/profile.go
@@ -170,7 +170,7 @@ func SetAvatarURL(
return jsonerror.InternalServerError()
}
- if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
+ if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@@ -288,7 +288,7 @@ func SetDisplayName(
return jsonerror.InternalServerError()
}
- if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
+ if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
diff --git a/clientapi/routing/redaction.go b/clientapi/routing/redaction.go
index 9701685e..266c0aff 100644
--- a/clientapi/routing/redaction.go
+++ b/clientapi/routing/redaction.go
@@ -121,7 +121,7 @@ func SendRedaction(
JSON: jsonerror.NotFound("Room does not exist"),
}
}
- if err = roomserverAPI.SendEvents(context.Background(), rsAPI, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil); err != nil {
+ if err = roomserverAPI.SendEvents(context.Background(), rsAPI, api.KindNew, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents")
return jsonerror.InternalServerError()
}
diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go
index 9744a564..1303663f 100644
--- a/clientapi/routing/sendevent.go
+++ b/clientapi/routing/sendevent.go
@@ -92,6 +92,7 @@ func SendEvent(
// event ID in case of duplicate transaction is discarded
if err := api.SendEvents(
req.Context(), rsAPI,
+ api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(verRes.RoomVersion),
},
diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go
index b9575a28..272d3407 100644
--- a/clientapi/threepid/invites.go
+++ b/clientapi/threepid/invites.go
@@ -361,6 +361,7 @@ func emit3PIDInviteEvent(
return api.SendEvents(
ctx, rsAPI,
+ api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
(*event).Headered(queryRes.RoomVersion),
},
diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go
index c637116f..12f20536 100644
--- a/federationapi/routing/join.go
+++ b/federationapi/routing/join.go
@@ -290,6 +290,7 @@ func SendJoin(
if !alreadyJoined {
if err = api.SendEvents(
httpReq.Context(), rsAPI,
+ api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
event.Headered(stateAndAuthChainResponse.RoomVersion),
},
diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go
index e16dfcc2..fb81d931 100644
--- a/federationapi/routing/leave.go
+++ b/federationapi/routing/leave.go
@@ -256,6 +256,7 @@ func SendLeave(
// the room, so set SendAsServer to cfg.Matrix.ServerName
if err = api.SendEvents(
httpReq.Context(), rsAPI,
+ api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
event.Headered(verRes.RoomVersion),
},
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index 783fdc3b..76dc3a2e 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -403,6 +403,7 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event) er
return api.SendEvents(
context.Background(),
t.rsAPI,
+ api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(stateResp.RoomVersion),
},
@@ -586,6 +587,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
err = api.SendEventWithState(
context.Background(),
t.rsAPI,
+ api.KindOld,
resolvedState,
backwardsExtremity.Headered(roomVersion),
t.haveEventIDs(),
@@ -605,6 +607,7 @@ func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixser
if err = api.SendEvents(
context.Background(),
t.rsAPI,
+ api.KindOld,
append(headeredNewEvents, e.Headered(roomVersion)),
api.DoNotSendToOtherServers,
nil,
diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go
index ec6cc148..4db5273a 100644
--- a/federationapi/routing/threepid.go
+++ b/federationapi/routing/threepid.go
@@ -89,7 +89,7 @@ func CreateInvitesFrom3PIDInvites(
}
// Send all the events
- if err := api.SendEvents(req.Context(), rsAPI, evs, cfg.Matrix.ServerName, nil); err != nil {
+ if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@@ -174,6 +174,7 @@ func ExchangeThirdPartyInvite(
// Send the event to the roomserver
if err = api.SendEvents(
httpReq.Context(), rsAPI,
+ api.KindNew,
[]gomatrixserverlib.HeaderedEvent{
signedEvent.Event.Headered(verRes.RoomVersion),
},
diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go
index 254883e6..3904ab85 100644
--- a/federationsender/internal/perform.go
+++ b/federationsender/internal/perform.go
@@ -248,6 +248,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
// returned state to the roomserver to update our local view.
if err = roomserverAPI.SendEventWithState(
ctx, r.rsAPI,
+ roomserverAPI.KindNew,
respState,
event.Headered(respMakeJoin.RoomVersion),
nil,
diff --git a/roomserver/api/input.go b/roomserver/api/input.go
index dd693203..e1a8afa0 100644
--- a/roomserver/api/input.go
+++ b/roomserver/api/input.go
@@ -21,17 +21,25 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
+type Kind int
+
const (
// KindOutlier event fall outside the contiguous event graph.
// We do not have the state for these events.
// These events are state events used to authenticate other events.
// They can become part of the contiguous event graph via backfill.
- KindOutlier = 1
+ KindOutlier Kind = iota + 1
// KindNew event extend the contiguous graph going forwards.
// They usually don't need state, but may include state if the
// there was a new event that references an event that we don't
- // have a copy of.
- KindNew = 2
+ // have a copy of. New events will influence the fwd extremities
+ // of the room and output events will be generated as a result.
+ KindNew
+ // KindOld event extend the graph backwards, or fill gaps in
+ // history. They may or may not include state. They will not be
+ // considered for forward extremities, and output events will NOT
+ // be generated for them.
+ KindOld
)
// DoNotSendToOtherServers tells us not to send the event to other matrix
@@ -43,7 +51,7 @@ const DoNotSendToOtherServers = ""
type InputRoomEvent struct {
// Whether this event is new, backfilled or an outlier.
// This controls how the event is processed.
- Kind int `json:"kind"`
+ Kind Kind `json:"kind"`
// The event JSON for the event to add.
Event gomatrixserverlib.HeaderedEvent `json:"event"`
// List of state event IDs that authenticate this event.
diff --git a/roomserver/api/output.go b/roomserver/api/output.go
index d57f3b04..9cb814a4 100644
--- a/roomserver/api/output.go
+++ b/roomserver/api/output.go
@@ -24,6 +24,8 @@ type OutputType string
const (
// OutputTypeNewRoomEvent indicates that the event is an OutputNewRoomEvent
OutputTypeNewRoomEvent OutputType = "new_room_event"
+ // OutputTypeOldRoomEvent indicates that the event is an OutputOldRoomEvent
+ OutputTypeOldRoomEvent OutputType = "old_room_event"
// OutputTypeNewInviteEvent indicates that the event is an OutputNewInviteEvent
OutputTypeNewInviteEvent OutputType = "new_invite_event"
// OutputTypeRetireInviteEvent indicates that the event is an OutputRetireInviteEvent
@@ -58,6 +60,8 @@ type OutputEvent struct {
Type OutputType `json:"type"`
// The content of event with type OutputTypeNewRoomEvent
NewRoomEvent *OutputNewRoomEvent `json:"new_room_event,omitempty"`
+ // The content of event with type OutputTypeOldRoomEvent
+ OldRoomEvent *OutputOldRoomEvent `json:"old_room_event,omitempty"`
// The content of event with type OutputTypeNewInviteEvent
NewInviteEvent *OutputNewInviteEvent `json:"new_invite_event,omitempty"`
// The content of event with type OutputTypeRetireInviteEvent
@@ -178,6 +182,20 @@ func (ore *OutputNewRoomEvent) AddsState() []gomatrixserverlib.HeaderedEvent {
return append(ore.AddStateEvents, ore.Event)
}
+// An OutputOldRoomEvent is written when the roomserver receives an old event.
+// This will typically happen as a result of getting either missing events
+// or backfilling. Downstream components may wish to send these events to
+// clients when it is advantageous to do so, but with the consideration that
+// the event is likely a historic event.
+//
+// Old events do not update forward extremities or the current room state,
+// therefore they must not be treated as if they do. Downstream components
+// should build their current room state up from OutputNewRoomEvents only.
+type OutputOldRoomEvent struct {
+ // The Event.
+ Event gomatrixserverlib.HeaderedEvent `json:"event"`
+}
+
// An OutputNewInviteEvent is written whenever an invite becomes active.
// Invite events can be received outside of an existing room so have to be
// tracked separately from the room events themselves.
diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go
index a38c00df..9e821910 100644
--- a/roomserver/api/wrapper.go
+++ b/roomserver/api/wrapper.go
@@ -24,13 +24,14 @@ import (
// SendEvents to the roomserver The events are written with KindNew.
func SendEvents(
- ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent,
+ ctx context.Context, rsAPI RoomserverInternalAPI,
+ kind Kind, events []gomatrixserverlib.HeaderedEvent,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
) error {
ires := make([]InputRoomEvent, len(events))
for i, event := range events {
ires[i] = InputRoomEvent{
- Kind: KindNew,
+ Kind: kind,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
SendAsServer: string(sendAsServer),
@@ -40,12 +41,13 @@ func SendEvents(
return SendInputRoomEvents(ctx, rsAPI, ires)
}
-// SendEventWithState writes an event with KindNew to the roomserver
+// SendEventWithState writes an event with the specified kind to the roomserver
// with the state at the event as KindOutlier before it. Will not send any event that is
// marked as `true` in haveEventIDs
func SendEventWithState(
- ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState,
- event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool,
+ ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind,
+ state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent,
+ haveEventIDs map[string]bool,
) error {
outliers, err := state.Events()
if err != nil {
@@ -70,7 +72,7 @@ func SendEventWithState(
}
ires = append(ires, InputRoomEvent{
- Kind: KindNew,
+ Kind: kind,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
HasState: true,
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index 11334159..67031609 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -119,7 +119,7 @@ func (r *Inputer) processRoomEvent(
// We haven't calculated a state for this event yet.
// Lets calculate one.
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected)
- if err != nil {
+ if err != nil && input.Kind != api.KindOld {
return "", fmt.Errorf("r.calculateAndSetState: %w", err)
}
}
@@ -136,16 +136,31 @@ func (r *Inputer) processRoomEvent(
return event.EventID(), rejectionErr
}
- if err = r.updateLatestEvents(
- ctx, // context
- roomInfo, // room info for the room being updated
- stateAtEvent, // state at event (below)
- event, // event
- input.SendAsServer, // send as server
- input.TransactionID, // transaction ID
- input.HasState, // rewrites state?
- ); err != nil {
- return "", fmt.Errorf("r.updateLatestEvents: %w", err)
+ switch input.Kind {
+ case api.KindNew:
+ if err = r.updateLatestEvents(
+ ctx, // context
+ roomInfo, // room info for the room being updated
+ stateAtEvent, // state at event (below)
+ event, // event
+ input.SendAsServer, // send as server
+ input.TransactionID, // transaction ID
+ input.HasState, // rewrites state?
+ ); err != nil {
+ return "", fmt.Errorf("r.updateLatestEvents: %w", err)
+ }
+ case api.KindOld:
+ err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
+ {
+ Type: api.OutputTypeOldRoomEvent,
+ OldRoomEvent: &api.OutputOldRoomEvent{
+ Event: headered,
+ },
+ },
+ })
+ if err != nil {
+ return "", fmt.Errorf("r.WriteOutputEvents (old): %w", err)
+ }
}
// processing this event resulted in an event (which may not be the one we're processing)
@@ -163,7 +178,7 @@ func (r *Inputer) processRoomEvent(
},
})
if err != nil {
- return "", fmt.Errorf("r.WriteOutputEvents: %w", err)
+ return "", fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
}
}
diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go
index ca5d214d..5adcd087 100644
--- a/roomserver/internal/input/input_latest_events.go
+++ b/roomserver/internal/input/input_latest_events.go
@@ -164,8 +164,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
return fmt.Errorf("u.api.updateMemberships: %w", err)
}
- var update *api.OutputEvent
- update, err = u.makeOutputNewRoomEvent()
+ update, err := u.makeOutputNewRoomEvent()
if err != nil {
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
}
@@ -259,6 +258,8 @@ func (u *latestEventsUpdater) latestState() error {
return nil
}
+// calculateLatest works out the new set of forward extremities. Returns
+// true if the new event is included in those extremites, false otherwise.
func (u *latestEventsUpdater) calculateLatest(
oldLatest []types.StateAtEventAndReference,
newEvent types.StateAtEventAndReference,
@@ -326,7 +327,6 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
if err != nil {
return nil, err
}
-
for _, entry := range u.added {
ore.AddsStateEventIDs = append(ore.AddsStateEventIDs, eventIDMap[entry.EventNID])
}
@@ -339,13 +339,14 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
for _, entry := range u.stateBeforeEventAdds {
ore.StateBeforeAddsEventIDs = append(ore.StateBeforeAddsEventIDs, eventIDMap[entry.EventNID])
}
+
ore.SendAsServer = u.sendAsServer
// include extra state events if they were added as nearly every downstream component will care about it
// and we'd rather not have them all hit QueryEventsByID at the same time!
if len(ore.AddsStateEventIDs) > 0 {
- ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs)
- if err != nil {
+ var err error
+ if ore.AddStateEvents, err = u.extraEventsForIDs(u.roomInfo.RoomVersion, ore.AddsStateEventIDs); err != nil {
return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
}
}
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
index 1b692a09..c8e60efa 100644
--- a/roomserver/roomserver_test.go
+++ b/roomserver/roomserver_test.go
@@ -191,7 +191,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js
t.Helper()
rsAPI, dp := mustCreateRoomserverAPI(t)
hevents := mustLoadRawEvents(t, ver, events)
- if err := api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil); err != nil {
+ if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil); err != nil {
t.Errorf("failed to SendEvents: %s", err)
}
return rsAPI, dp, hevents
@@ -337,7 +337,7 @@ func TestOutputRewritesState(t *testing.T) {
deleteDatabase()
rsAPI, producer := mustCreateRoomserverAPI(t)
defer deleteDatabase()
- err := api.SendEvents(context.Background(), rsAPI, originalEvents, testOrigin, nil)
+ err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil)
if err != nil {
t.Fatalf("failed to send original events: %s", err)
}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index ca48c830..373baea5 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -97,6 +97,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
}
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
+ case api.OutputTypeOldRoomEvent:
+ return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
@@ -168,7 +170,40 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
log.ErrorKey: err,
"add": msg.AddsStateEventIDs,
"del": msg.RemovesStateEventIDs,
- }).Panicf("roomserver output log: write event failure")
+ }).Panicf("roomserver output log: write new event failure")
+ return nil
+ }
+
+ if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil {
+ logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
+ return err
+ }
+
+ s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
+
+ return nil
+}
+
+func (s *OutputRoomEventConsumer) onOldRoomEvent(
+ ctx context.Context, msg api.OutputOldRoomEvent,
+) error {
+ ev := msg.Event
+
+ pduPos, err := s.db.WriteEvent(
+ ctx,
+ &ev,
+ []gomatrixserverlib.HeaderedEvent{},
+ []string{}, // adds no state
+ []string{}, // removes no state
+ nil, // no transaction
+ false, // not excluded from sync
+ )
+ if err != nil {
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event": string(ev.JSON()),
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: write old event failure")
return nil
}
diff --git a/sytest-whitelist b/sytest-whitelist
index 2ba0a88b..2d032331 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -484,3 +484,4 @@ Users cannot kick users who have already left a room
A prev_batch token from incremental sync can be used in the v1 messages API
Event with an invalid signature in the send_join response should not cause room join to fail
Inbound federation rejects typing notifications from wrong remote
+Should not be able to take over the room by pretending there is no PL event