aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/roomserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/roomserver.go')
-rw-r--r--syncapi/consumers/roomserver.go37
1 files changed, 36 insertions, 1 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index ca48c830..373baea5 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -97,6 +97,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
}
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
+ case api.OutputTypeOldRoomEvent:
+ return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
@@ -168,7 +170,40 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
log.ErrorKey: err,
"add": msg.AddsStateEventIDs,
"del": msg.RemovesStateEventIDs,
- }).Panicf("roomserver output log: write event failure")
+ }).Panicf("roomserver output log: write new event failure")
+ return nil
+ }
+
+ if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil {
+ logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
+ return err
+ }
+
+ s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
+
+ return nil
+}
+
+func (s *OutputRoomEventConsumer) onOldRoomEvent(
+ ctx context.Context, msg api.OutputOldRoomEvent,
+) error {
+ ev := msg.Event
+
+ pduPos, err := s.db.WriteEvent(
+ ctx,
+ &ev,
+ []gomatrixserverlib.HeaderedEvent{},
+ []string{}, // adds no state
+ []string{}, // removes no state
+ nil, // no transaction
+ false, // not excluded from sync
+ )
+ if err != nil {
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event": string(ev.JSON()),
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: write old event failure")
return nil
}