diff options
Diffstat (limited to 'syncapi/consumers/roomserver.go')
-rw-r--r-- | syncapi/consumers/roomserver.go | 138 |
1 files changed, 66 insertions, 72 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 0d6f528a..5b008e3d 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -19,24 +19,26 @@ import ( "encoding/json" "fmt" - "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { + ctx context.Context cfg *config.SyncAPI rsAPI api.RoomserverInternalAPI - rsConsumer *internal.ContinualConsumer + jetstream nats.JetStreamContext + topic string db storage.Database pduStream types.StreamProvider inviteStream types.StreamProvider @@ -47,83 +49,83 @@ type OutputRoomEventConsumer struct { func NewOutputRoomEventConsumer( process *process.ProcessContext, cfg *config.SyncAPI, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, store storage.Database, notifier *notifier.Notifier, pduStream types.StreamProvider, inviteStream types.StreamProvider, rsAPI api.RoomserverInternalAPI, ) *OutputRoomEventConsumer { - - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "syncapi/roomserver", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), - Consumer: kafkaConsumer, - PartitionStore: store, - } - s := &OutputRoomEventConsumer{ + return &OutputRoomEventConsumer{ + ctx: process.Context(), cfg: cfg, - rsConsumer: &consumer, + jetstream: js, + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), db: store, notifier: notifier, pduStream: pduStream, inviteStream: inviteStream, rsAPI: rsAPI, } - consumer.ProcessMessage = s.onMessage - - return s } // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - return s.rsConsumer.Start() + _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + return err } // onMessage is called when the sync server receives a new event from the room server output log. // It is not safe for this function to be called from multiple goroutines, or else the // sync stream position may race and be incorrectly calculated. -func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - // Parse out the event JSON - var output api.OutputEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } +func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { + jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool { + // Parse out the event JSON + var err error + var output api.OutputEvent + if err = json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return true + } - switch output.Type { - case api.OutputTypeNewRoomEvent: - // Ignore redaction events. We will add them to the database when they are - // validated (when we receive OutputTypeRedactedEvent) - event := output.NewRoomEvent.Event - if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil { - // in the special case where the event redacts itself, just pass the message through because - // we will never see the other part of the pair - if event.Redacts() != event.EventID() { - return nil + switch output.Type { + case api.OutputTypeNewRoomEvent: + // Ignore redaction events. We will add them to the database when they are + // validated (when we receive OutputTypeRedactedEvent) + event := output.NewRoomEvent.Event + if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil { + // in the special case where the event redacts itself, just pass the message through because + // we will never see the other part of the pair + if event.Redacts() != event.EventID() { + return true + } } + err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent) + case api.OutputTypeOldRoomEvent: + err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent) + case api.OutputTypeNewInviteEvent: + s.onNewInviteEvent(s.ctx, *output.NewInviteEvent) + case api.OutputTypeRetireInviteEvent: + s.onRetireInviteEvent(s.ctx, *output.RetireInviteEvent) + case api.OutputTypeNewPeek: + s.onNewPeek(s.ctx, *output.NewPeek) + case api.OutputTypeRetirePeek: + s.onRetirePeek(s.ctx, *output.RetirePeek) + case api.OutputTypeRedactedEvent: + err = s.onRedactEvent(s.ctx, *output.RedactedEvent) + default: + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) } - return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) - case api.OutputTypeOldRoomEvent: - return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent) - case api.OutputTypeNewInviteEvent: - return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) - case api.OutputTypeRetireInviteEvent: - return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) - case api.OutputTypeNewPeek: - return s.onNewPeek(context.TODO(), *output.NewPeek) - case api.OutputTypeRetirePeek: - return s.onRetirePeek(context.TODO(), *output.RetirePeek) - case api.OutputTypeRedactedEvent: - return s.onRedactEvent(context.TODO(), *output.RedactedEvent) - default: - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) - return nil - } + if err != nil { + log.WithError(err).Error("roomserver output log: failed to process event") + return false + } + + return true + }) } func (s *OutputRoomEventConsumer) onRedactEvent( @@ -275,12 +277,12 @@ func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gom func (s *OutputRoomEventConsumer) onNewInviteEvent( ctx context.Context, msg api.OutputNewInviteEvent, -) error { +) { if msg.Event.StateKey() == nil { log.WithFields(log.Fields{ "event": string(msg.Event.JSON()), }).Panicf("roomserver output log: invite has no state key") - return nil + return } pduPos, err := s.db.AddInviteEvent(ctx, msg.Event) if err != nil { @@ -292,18 +294,16 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( "pdupos": pduPos, log.ErrorKey: err, }).Panicf("roomserver output log: write invite failure") - return nil + return } s.inviteStream.Advance(pduPos) s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) - - return nil } func (s *OutputRoomEventConsumer) onRetireInviteEvent( ctx context.Context, msg api.OutputRetireInviteEvent, -) error { +) { pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID) if err != nil { sentry.CaptureException(err) @@ -312,19 +312,17 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( "event_id": msg.EventID, log.ErrorKey: err, }).Panicf("roomserver output log: remove invite failure") - return nil + return } // Notify any active sync requests that the invite has been retired. s.inviteStream.Advance(pduPos) s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) - - return nil } func (s *OutputRoomEventConsumer) onNewPeek( ctx context.Context, msg api.OutputNewPeek, -) error { +) { sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID) if err != nil { sentry.CaptureException(err) @@ -332,7 +330,7 @@ func (s *OutputRoomEventConsumer) onNewPeek( log.WithFields(log.Fields{ log.ErrorKey: err, }).Panicf("roomserver output log: write peek failure") - return nil + return } // tell the notifier about the new peek so it knows to wake up new devices @@ -340,20 +338,18 @@ func (s *OutputRoomEventConsumer) onNewPeek( // index as PDUs, but we should fix this s.pduStream.Advance(sp) s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) - - return nil } func (s *OutputRoomEventConsumer) onRetirePeek( ctx context.Context, msg api.OutputRetirePeek, -) error { +) { sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID) if err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ log.ErrorKey: err, }).Panicf("roomserver output log: write peek failure") - return nil + return } // tell the notifier about the new peek so it knows to wake up new devices @@ -361,8 +357,6 @@ func (s *OutputRoomEventConsumer) onRetirePeek( // index as PDUs, but we should fix this s.pduStream.Advance(sp) s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) - - return nil } func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) { |