aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-10-19 14:59:13 +0100
committerGitHub <noreply@github.com>2020-10-19 14:59:13 +0100
commit6e63df1d9a3eadf924d518a1a02f04dfd03ad6b1 (patch)
treefdfab85a07f37c18b0545f042a8e70dedc1aa75b /syncapi
parent0974f6e2c055d8d06b5ea9c175252b22b2399fe2 (diff)
KindOld (#1531)
* Add KindOld * Don't process latest events/memberships for old events * Allow federationsender to ignore duplicate key entries when LatestEventIDs is duplicated by RS output events * Signal to downstream components if an event has become a forward extremity * Don't exclude from sync * Soft-fail checks on KindNew * Don't run the latest events updater at all for KindOld * Don't make federation sender change after all * Kind in federation sender join * Don't send isForwardExtremity * Fix syncapi * Update comments * Fix SendEventWithState * Update sytest-whitelist * Generate old output events * Sync API consumes old room events * Update comments
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go37
1 files changed, 36 insertions, 1 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index ca48c830..373baea5 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -97,6 +97,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
}
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
+ case api.OutputTypeOldRoomEvent:
+ return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
@@ -168,7 +170,40 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
log.ErrorKey: err,
"add": msg.AddsStateEventIDs,
"del": msg.RemovesStateEventIDs,
- }).Panicf("roomserver output log: write event failure")
+ }).Panicf("roomserver output log: write new event failure")
+ return nil
+ }
+
+ if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil {
+ logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
+ return err
+ }
+
+ s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
+
+ return nil
+}
+
+func (s *OutputRoomEventConsumer) onOldRoomEvent(
+ ctx context.Context, msg api.OutputOldRoomEvent,
+) error {
+ ev := msg.Event
+
+ pduPos, err := s.db.WriteEvent(
+ ctx,
+ &ev,
+ []gomatrixserverlib.HeaderedEvent{},
+ []string{}, // adds no state
+ []string{}, // removes no state
+ nil, // no transaction
+ false, // not excluded from sync
+ )
+ if err != nil {
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event": string(ev.JSON()),
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: write old event failure")
return nil
}