diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-10-19 14:59:13 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-19 14:59:13 +0100 |
commit | 6e63df1d9a3eadf924d518a1a02f04dfd03ad6b1 (patch) | |
tree | fdfab85a07f37c18b0545f042a8e70dedc1aa75b /syncapi | |
parent | 0974f6e2c055d8d06b5ea9c175252b22b2399fe2 (diff) |
KindOld (#1531)
* Add KindOld
* Don't process latest events/memberships for old events
* Allow federationsender to ignore duplicate key entries when LatestEventIDs is duplicated by RS output events
* Signal to downstream components if an event has become a forward extremity
* Don't exclude from sync
* Soft-fail checks on KindNew
* Don't run the latest events updater at all for KindOld
* Don't make federation sender change after all
* Kind in federation sender join
* Don't send isForwardExtremity
* Fix syncapi
* Update comments
* Fix SendEventWithState
* Update sytest-whitelist
* Generate old output events
* Sync API consumes old room events
* Update comments
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/consumers/roomserver.go | 37 |
1 files changed, 36 insertions, 1 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index ca48c830..373baea5 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -97,6 +97,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } } return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) + case api.OutputTypeOldRoomEvent: + return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent) case api.OutputTypeNewInviteEvent: return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) case api.OutputTypeRetireInviteEvent: @@ -168,7 +170,40 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( log.ErrorKey: err, "add": msg.AddsStateEventIDs, "del": msg.RemovesStateEventIDs, - }).Panicf("roomserver output log: write event failure") + }).Panicf("roomserver output log: write new event failure") + return nil + } + + if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil { + logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) + return err + } + + s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) + + return nil +} + +func (s *OutputRoomEventConsumer) onOldRoomEvent( + ctx context.Context, msg api.OutputOldRoomEvent, +) error { + ev := msg.Event + + pduPos, err := s.db.WriteEvent( + ctx, + &ev, + []gomatrixserverlib.HeaderedEvent{}, + []string{}, // adds no state + []string{}, // removes no state + nil, // no transaction + false, // not excluded from sync + ) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event": string(ev.JSON()), + log.ErrorKey: err, + }).Panicf("roomserver output log: write old event failure") return nil } |