diff options
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/consumers/clientapi.go | 98 | ||||
-rw-r--r-- | syncapi/consumers/eduserver_receipts.go | 92 | ||||
-rw-r--r-- | syncapi/consumers/eduserver_sendtodevice.go | 134 | ||||
-rw-r--r-- | syncapi/consumers/eduserver_typing.go | 106 | ||||
-rw-r--r-- | syncapi/consumers/keychange.go | 9 | ||||
-rw-r--r-- | syncapi/consumers/roomserver.go | 138 | ||||
-rw-r--r-- | syncapi/notifier/notifier_test.go | 10 | ||||
-rw-r--r-- | syncapi/syncapi.go | 19 |
8 files changed, 291 insertions, 315 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index a166ae14..85710cdd 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -18,90 +18,88 @@ import ( "context" "encoding/json" - "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) // OutputClientDataConsumer consumes events that originated in the client API server. type OutputClientDataConsumer struct { - clientAPIConsumer *internal.ContinualConsumer - db storage.Database - stream types.StreamProvider - notifier *notifier.Notifier + ctx context.Context + jetstream nats.JetStreamContext + topic string + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier } // NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. func NewOutputClientDataConsumer( process *process.ProcessContext, cfg *config.SyncAPI, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, ) *OutputClientDataConsumer { - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "syncapi/clientapi", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)), - Consumer: kafkaConsumer, - PartitionStore: store, + return &OutputClientDataConsumer{ + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + db: store, + notifier: notifier, + stream: stream, } - s := &OutputClientDataConsumer{ - clientAPIConsumer: &consumer, - db: store, - notifier: notifier, - stream: stream, - } - consumer.ProcessMessage = s.onMessage - - return s } // Start consuming from room servers func (s *OutputClientDataConsumer) Start() error { - return s.clientAPIConsumer.Start() + _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + return err } // onMessage is called when the sync server receives a new event from the client API server output log. // It is not safe for this function to be called from multiple goroutines, or else the // sync stream position may race and be incorrectly calculated. -func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error { - // Parse out the event JSON - var output eventutil.AccountData - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("client API server output log: message parse failure") - sentry.CaptureException(err) - return nil - } +func (s *OutputClientDataConsumer) onMessage(msg *nats.Msg) { + jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool { + // Parse out the event JSON + userID := msg.Header.Get(jetstream.UserID) + var output eventutil.AccountData + if err := json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("client API server output log: message parse failure") + sentry.CaptureException(err) + return true + } - log.WithFields(log.Fields{ - "type": output.Type, - "room_id": output.RoomID, - }).Info("received data from client API server") - - streamPos, err := s.db.UpsertAccountData( - context.TODO(), string(msg.Key), output.RoomID, output.Type, - ) - if err != nil { - sentry.CaptureException(err) log.WithFields(log.Fields{ - "type": output.Type, - "room_id": output.RoomID, - log.ErrorKey: err, - }).Panicf("could not save account data") - } + "type": output.Type, + "room_id": output.RoomID, + }).Info("received data from client API server") + + streamPos, err := s.db.UpsertAccountData( + s.ctx, userID, output.RoomID, output.Type, + ) + if err != nil { + sentry.CaptureException(err) + log.WithFields(log.Fields{ + "type": output.Type, + "room_id": output.RoomID, + log.ErrorKey: err, + }).Panicf("could not save account data") + } - s.stream.Advance(streamPos) - s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos}) + s.stream.Advance(streamPos) + s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos}) - return nil + return true + }) } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index 668f945b..582e1d64 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -18,24 +18,26 @@ import ( "context" "encoding/json" - "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) // OutputReceiptEventConsumer consumes events that originated in the EDU server. type OutputReceiptEventConsumer struct { - receiptConsumer *internal.ContinualConsumer - db storage.Database - stream types.StreamProvider - notifier *notifier.Notifier + ctx context.Context + jetstream nats.JetStreamContext + topic string + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier } // NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer. @@ -43,61 +45,53 @@ type OutputReceiptEventConsumer struct { func NewOutputReceiptEventConsumer( process *process.ProcessContext, cfg *config.SyncAPI, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, ) *OutputReceiptEventConsumer { - - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "syncapi/eduserver/receipt", - Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent), - Consumer: kafkaConsumer, - PartitionStore: store, - } - - s := &OutputReceiptEventConsumer{ - receiptConsumer: &consumer, - db: store, - notifier: notifier, - stream: stream, + return &OutputReceiptEventConsumer{ + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent), + db: store, + notifier: notifier, + stream: stream, } - - consumer.ProcessMessage = s.onMessage - - return s } // Start consuming from EDU api func (s *OutputReceiptEventConsumer) Start() error { - return s.receiptConsumer.Start() + _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + return err } -func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - var output api.OutputReceiptEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) - return nil - } +func (s *OutputReceiptEventConsumer) onMessage(msg *nats.Msg) { + jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool { + var output api.OutputReceiptEvent + if err := json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("EDU server output log: message parse failure") + sentry.CaptureException(err) + return true + } - streamPos, err := s.db.StoreReceipt( - context.TODO(), - output.RoomID, - output.Type, - output.UserID, - output.EventID, - output.Timestamp, - ) - if err != nil { - sentry.CaptureException(err) - return err - } + streamPos, err := s.db.StoreReceipt( + s.ctx, + output.RoomID, + output.Type, + output.UserID, + output.EventID, + output.Timestamp, + ) + if err != nil { + sentry.CaptureException(err) + return true + } - s.stream.Advance(streamPos) - s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) + s.stream.Advance(streamPos) + s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) - return nil + return true + }) } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 5e626aef..6579c303 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -18,27 +18,29 @@ import ( "context" "encoding/json" - "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) // OutputSendToDeviceEventConsumer consumes events that originated in the EDU server. type OutputSendToDeviceEventConsumer struct { - sendToDeviceConsumer *internal.ContinualConsumer - db storage.Database - serverName gomatrixserverlib.ServerName // our server name - stream types.StreamProvider - notifier *notifier.Notifier + ctx context.Context + jetstream nats.JetStreamContext + topic string + db storage.Database + serverName gomatrixserverlib.ServerName // our server name + stream types.StreamProvider + notifier *notifier.Notifier } // NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer. @@ -46,78 +48,70 @@ type OutputSendToDeviceEventConsumer struct { func NewOutputSendToDeviceEventConsumer( process *process.ProcessContext, cfg *config.SyncAPI, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, ) *OutputSendToDeviceEventConsumer { - - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "syncapi/eduserver/sendtodevice", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)), - Consumer: kafkaConsumer, - PartitionStore: store, - } - - s := &OutputSendToDeviceEventConsumer{ - sendToDeviceConsumer: &consumer, - db: store, - serverName: cfg.Matrix.ServerName, - notifier: notifier, - stream: stream, + return &OutputSendToDeviceEventConsumer{ + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent), + db: store, + serverName: cfg.Matrix.ServerName, + notifier: notifier, + stream: stream, } - - consumer.ProcessMessage = s.onMessage - - return s } // Start consuming from EDU api func (s *OutputSendToDeviceEventConsumer) Start() error { - return s.sendToDeviceConsumer.Start() + _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + return err } -func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - var output api.OutputSendToDeviceEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) - return err - } - - _, domain, err := gomatrixserverlib.SplitID('@', output.UserID) - if err != nil { - sentry.CaptureException(err) - return err - } - if domain != s.serverName { - return nil - } - - util.GetLogger(context.TODO()).WithFields(log.Fields{ - "sender": output.Sender, - "user_id": output.UserID, - "device_id": output.DeviceID, - "event_type": output.Type, - }).Info("sync API received send-to-device event from EDU server") - - streamPos, err := s.db.StoreNewSendForDeviceMessage( - context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent, - ) - if err != nil { - sentry.CaptureException(err) - log.WithError(err).Errorf("failed to store send-to-device message") - return err - } - - s.stream.Advance(streamPos) - s.notifier.OnNewSendToDevice( - output.UserID, - []string{output.DeviceID}, - types.StreamingToken{SendToDevicePosition: streamPos}, - ) - - return nil +func (s *OutputSendToDeviceEventConsumer) onMessage(msg *nats.Msg) { + jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool { + var output api.OutputSendToDeviceEvent + if err := json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("EDU server output log: message parse failure") + sentry.CaptureException(err) + return true + } + + _, domain, err := gomatrixserverlib.SplitID('@', output.UserID) + if err != nil { + sentry.CaptureException(err) + return true + } + if domain != s.serverName { + return true + } + + util.GetLogger(context.TODO()).WithFields(log.Fields{ + "sender": output.Sender, + "user_id": output.UserID, + "device_id": output.DeviceID, + "event_type": output.Type, + }).Info("sync API received send-to-device event from EDU server") + + streamPos, err := s.db.StoreNewSendForDeviceMessage( + s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent, + ) + if err != nil { + sentry.CaptureException(err) + log.WithError(err).Errorf("failed to store send-to-device message") + return false + } + + s.stream.Advance(streamPos) + s.notifier.OnNewSendToDevice( + output.UserID, + []string{output.DeviceID}, + types.StreamingToken{SendToDevicePosition: streamPos}, + ) + + return true + }) } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index 8d06e3ca..487befe8 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -15,27 +15,30 @@ package consumers import ( + "context" "encoding/json" - "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) // OutputTypingEventConsumer consumes events that originated in the EDU server. type OutputTypingEventConsumer struct { - typingConsumer *internal.ContinualConsumer - eduCache *cache.EDUCache - stream types.StreamProvider - notifier *notifier.Notifier + ctx context.Context + jetstream nats.JetStreamContext + topic string + eduCache *cache.EDUCache + stream types.StreamProvider + notifier *notifier.Notifier } // NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. @@ -43,72 +46,59 @@ type OutputTypingEventConsumer struct { func NewOutputTypingEventConsumer( process *process.ProcessContext, cfg *config.SyncAPI, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, store storage.Database, eduCache *cache.EDUCache, notifier *notifier.Notifier, stream types.StreamProvider, ) *OutputTypingEventConsumer { - - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "syncapi/eduserver/typing", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)), - Consumer: kafkaConsumer, - PartitionStore: store, - } - - s := &OutputTypingEventConsumer{ - typingConsumer: &consumer, - eduCache: eduCache, - notifier: notifier, - stream: stream, + return &OutputTypingEventConsumer{ + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent), + eduCache: eduCache, + notifier: notifier, + stream: stream, } - - consumer.ProcessMessage = s.onMessage - - return s } // Start consuming from EDU api func (s *OutputTypingEventConsumer) Start() error { - s.eduCache.SetTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) { - pos := types.StreamPosition(latestSyncPosition) - s.stream.Advance(pos) - s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: pos}) - }) - return s.typingConsumer.Start() + _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + return err } -func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - var output api.OutputTypingEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) - return nil - } +func (s *OutputTypingEventConsumer) onMessage(msg *nats.Msg) { + jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool { + var output api.OutputTypingEvent + if err := json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("EDU server output log: message parse failure") + sentry.CaptureException(err) + return true + } - log.WithFields(log.Fields{ - "room_id": output.Event.RoomID, - "user_id": output.Event.UserID, - "typing": output.Event.Typing, - }).Debug("received data from EDU server") + log.WithFields(log.Fields{ + "room_id": output.Event.RoomID, + "user_id": output.Event.UserID, + "typing": output.Event.Typing, + }).Debug("received data from EDU server") - var typingPos types.StreamPosition - typingEvent := output.Event - if typingEvent.Typing { - typingPos = types.StreamPosition( - s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime), - ) - } else { - typingPos = types.StreamPosition( - s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID), - ) - } + var typingPos types.StreamPosition + typingEvent := output.Event + if typingEvent.Typing { + typingPos = types.StreamPosition( + s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime), + ) + } else { + typingPos = types.StreamPosition( + s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID), + ) + } - s.stream.Advance(typingPos) - s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) + s.stream.Advance(typingPos) + s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) - return nil + return true + }) } diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index dfedc640..76b143d8 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -34,6 +34,7 @@ import ( // OutputKeyChangeEventConsumer consumes events that originated in the key server. type OutputKeyChangeEventConsumer struct { + ctx context.Context keyChangeConsumer *internal.ContinualConsumer db storage.Database notifier *notifier.Notifier @@ -68,6 +69,7 @@ func NewOutputKeyChangeEventConsumer( } s := &OutputKeyChangeEventConsumer{ + ctx: process.Context(), keyChangeConsumer: &consumer, db: store, serverName: serverName, @@ -125,10 +127,13 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er } func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64, partition int32) error { + if m.DeviceKeys == nil { + return nil + } output := m.DeviceKeys // work out who we need to notify about the new key var queryRes roomserverAPI.QuerySharedUsersResponse - err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{ + err := s.rsAPI.QuerySharedUsers(s.ctx, &roomserverAPI.QuerySharedUsersRequest{ UserID: output.UserID, }, &queryRes) if err != nil { @@ -155,7 +160,7 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage output := m.CrossSigningKeyUpdate // work out who we need to notify about the new key var queryRes roomserverAPI.QuerySharedUsersResponse - err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{ + err := s.rsAPI.QuerySharedUsers(s.ctx, &roomserverAPI.QuerySharedUsersRequest{ UserID: output.UserID, }, &queryRes) if err != nil { diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 0d6f528a..5b008e3d 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -19,24 +19,26 @@ import ( "encoding/json" "fmt" - "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" ) // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { + ctx context.Context cfg *config.SyncAPI rsAPI api.RoomserverInternalAPI - rsConsumer *internal.ContinualConsumer + jetstream nats.JetStreamContext + topic string db storage.Database pduStream types.StreamProvider inviteStream types.StreamProvider @@ -47,83 +49,83 @@ type OutputRoomEventConsumer struct { func NewOutputRoomEventConsumer( process *process.ProcessContext, cfg *config.SyncAPI, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, store storage.Database, notifier *notifier.Notifier, pduStream types.StreamProvider, inviteStream types.StreamProvider, rsAPI api.RoomserverInternalAPI, ) *OutputRoomEventConsumer { - - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "syncapi/roomserver", - Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), - Consumer: kafkaConsumer, - PartitionStore: store, - } - s := &OutputRoomEventConsumer{ + return &OutputRoomEventConsumer{ + ctx: process.Context(), cfg: cfg, - rsConsumer: &consumer, + jetstream: js, + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), db: store, notifier: notifier, pduStream: pduStream, inviteStream: inviteStream, rsAPI: rsAPI, } - consumer.ProcessMessage = s.onMessage - - return s } // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - return s.rsConsumer.Start() + _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + return err } // onMessage is called when the sync server receives a new event from the room server output log. // It is not safe for this function to be called from multiple goroutines, or else the // sync stream position may race and be incorrectly calculated. -func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - // Parse out the event JSON - var output api.OutputEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } +func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { + jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool { + // Parse out the event JSON + var err error + var output api.OutputEvent + if err = json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return true + } - switch output.Type { - case api.OutputTypeNewRoomEvent: - // Ignore redaction events. We will add them to the database when they are - // validated (when we receive OutputTypeRedactedEvent) - event := output.NewRoomEvent.Event - if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil { - // in the special case where the event redacts itself, just pass the message through because - // we will never see the other part of the pair - if event.Redacts() != event.EventID() { - return nil + switch output.Type { + case api.OutputTypeNewRoomEvent: + // Ignore redaction events. We will add them to the database when they are + // validated (when we receive OutputTypeRedactedEvent) + event := output.NewRoomEvent.Event + if event.Type() == gomatrixserverlib.MRoomRedaction && event.StateKey() == nil { + // in the special case where the event redacts itself, just pass the message through because + // we will never see the other part of the pair + if event.Redacts() != event.EventID() { + return true + } } + err = s.onNewRoomEvent(s.ctx, *output.NewRoomEvent) + case api.OutputTypeOldRoomEvent: + err = s.onOldRoomEvent(s.ctx, *output.OldRoomEvent) + case api.OutputTypeNewInviteEvent: + s.onNewInviteEvent(s.ctx, *output.NewInviteEvent) + case api.OutputTypeRetireInviteEvent: + s.onRetireInviteEvent(s.ctx, *output.RetireInviteEvent) + case api.OutputTypeNewPeek: + s.onNewPeek(s.ctx, *output.NewPeek) + case api.OutputTypeRetirePeek: + s.onRetirePeek(s.ctx, *output.RetirePeek) + case api.OutputTypeRedactedEvent: + err = s.onRedactEvent(s.ctx, *output.RedactedEvent) + default: + log.WithField("type", output.Type).Debug( + "roomserver output log: ignoring unknown output type", + ) } - return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent) - case api.OutputTypeOldRoomEvent: - return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent) - case api.OutputTypeNewInviteEvent: - return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent) - case api.OutputTypeRetireInviteEvent: - return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) - case api.OutputTypeNewPeek: - return s.onNewPeek(context.TODO(), *output.NewPeek) - case api.OutputTypeRetirePeek: - return s.onRetirePeek(context.TODO(), *output.RetirePeek) - case api.OutputTypeRedactedEvent: - return s.onRedactEvent(context.TODO(), *output.RedactedEvent) - default: - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) - return nil - } + if err != nil { + log.WithError(err).Error("roomserver output log: failed to process event") + return false + } + + return true + }) } func (s *OutputRoomEventConsumer) onRedactEvent( @@ -275,12 +277,12 @@ func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gom func (s *OutputRoomEventConsumer) onNewInviteEvent( ctx context.Context, msg api.OutputNewInviteEvent, -) error { +) { if msg.Event.StateKey() == nil { log.WithFields(log.Fields{ "event": string(msg.Event.JSON()), }).Panicf("roomserver output log: invite has no state key") - return nil + return } pduPos, err := s.db.AddInviteEvent(ctx, msg.Event) if err != nil { @@ -292,18 +294,16 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent( "pdupos": pduPos, log.ErrorKey: err, }).Panicf("roomserver output log: write invite failure") - return nil + return } s.inviteStream.Advance(pduPos) s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey()) - - return nil } func (s *OutputRoomEventConsumer) onRetireInviteEvent( ctx context.Context, msg api.OutputRetireInviteEvent, -) error { +) { pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID) if err != nil { sentry.CaptureException(err) @@ -312,19 +312,17 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent( "event_id": msg.EventID, log.ErrorKey: err, }).Panicf("roomserver output log: remove invite failure") - return nil + return } // Notify any active sync requests that the invite has been retired. s.inviteStream.Advance(pduPos) s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID) - - return nil } func (s *OutputRoomEventConsumer) onNewPeek( ctx context.Context, msg api.OutputNewPeek, -) error { +) { sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID) if err != nil { sentry.CaptureException(err) @@ -332,7 +330,7 @@ func (s *OutputRoomEventConsumer) onNewPeek( log.WithFields(log.Fields{ log.ErrorKey: err, }).Panicf("roomserver output log: write peek failure") - return nil + return } // tell the notifier about the new peek so it knows to wake up new devices @@ -340,20 +338,18 @@ func (s *OutputRoomEventConsumer) onNewPeek( // index as PDUs, but we should fix this s.pduStream.Advance(sp) s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) - - return nil } func (s *OutputRoomEventConsumer) onRetirePeek( ctx context.Context, msg api.OutputRetirePeek, -) error { +) { sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID) if err != nil { // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ log.ErrorKey: err, }).Panicf("roomserver output log: write peek failure") - return nil + return } // tell the notifier about the new peek so it knows to wake up new devices @@ -361,8 +357,6 @@ func (s *OutputRoomEventConsumer) onRetirePeek( // index as PDUs, but we should fix this s.pduStream.Advance(sp) s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp}) - - return nil } func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) { diff --git a/syncapi/notifier/notifier_test.go b/syncapi/notifier/notifier_test.go index 1401fc67..c6d3df7e 100644 --- a/syncapi/notifier/notifier_test.go +++ b/syncapi/notifier/notifier_test.go @@ -127,7 +127,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) { go func() { pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) if err != nil { - t.Errorf("TestNewEventAndJoinedToRoom error: %w", err) + t.Errorf("TestNewEventAndJoinedToRoom error: %s", err) } mustEqualPositions(t, pos, syncPositionAfter) wg.Done() @@ -190,7 +190,7 @@ func TestNewInviteEventForUser(t *testing.T) { go func() { pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) if err != nil { - t.Errorf("TestNewInviteEventForUser error: %w", err) + t.Errorf("TestNewInviteEventForUser error: %s", err) } mustEqualPositions(t, pos, syncPositionAfter) wg.Done() @@ -246,7 +246,7 @@ func TestMultipleRequestWakeup(t *testing.T) { poll := func() { pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) if err != nil { - t.Errorf("TestMultipleRequestWakeup error: %w", err) + t.Errorf("TestMultipleRequestWakeup error: %s", err) } mustEqualPositions(t, pos, syncPositionAfter) wg.Done() @@ -284,7 +284,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { go func() { pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) if err != nil { - t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err) + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err) } mustEqualPositions(t, pos, syncPositionAfter) leaveWG.Done() @@ -301,7 +301,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { go func() { pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionAfter)) if err != nil { - t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err) + t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err) } mustEqualPositions(t, pos, syncPositionAfter2) aliceWG.Done() diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index 84c7140c..39bc233a 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -24,7 +24,7 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -48,7 +48,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, cfg *config.SyncAPI, ) { - consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream) syncDB, err := storage.NewSyncServerDatasource(&cfg.Database) if err != nil { @@ -65,15 +65,16 @@ func AddPublicRoutes( requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier) keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( - process, cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), - consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider, + process, cfg.Matrix.ServerName, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), + consumer, keyAPI, rsAPI, syncDB, notifier, + streams.DeviceListStreamProvider, ) if err = keyChangeConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start key change consumer") } roomConsumer := consumers.NewOutputRoomEventConsumer( - process, cfg, consumer, syncDB, notifier, streams.PDUStreamProvider, + process, cfg, js, syncDB, notifier, streams.PDUStreamProvider, streams.InviteStreamProvider, rsAPI, ) if err = roomConsumer.Start(); err != nil { @@ -81,28 +82,28 @@ func AddPublicRoutes( } clientConsumer := consumers.NewOutputClientDataConsumer( - process, cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider, + process, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider, ) if err = clientConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start client data consumer") } typingConsumer := consumers.NewOutputTypingEventConsumer( - process, cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider, + process, cfg, js, syncDB, eduCache, notifier, streams.TypingStreamProvider, ) if err = typingConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start typing consumer") } sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer( - process, cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider, + process, cfg, js, syncDB, notifier, streams.SendToDeviceStreamProvider, ) if err = sendToDeviceConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start send-to-device consumer") } receiptConsumer := consumers.NewOutputReceiptEventConsumer( - process, cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider, + process, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider, ) if err = receiptConsumer.Start(); err != nil { logrus.WithError(err).Panicf("failed to start receipts consumer") |