diff options
Diffstat (limited to 'syncapi/consumers')
-rw-r--r-- | syncapi/consumers/clientapi.go | 2 | ||||
-rw-r--r-- | syncapi/consumers/receipts.go (renamed from syncapi/consumers/eduserver_receipts.go) | 24 | ||||
-rw-r--r-- | syncapi/consumers/sendtodevice.go (renamed from syncapi/consumers/eduserver_sendtodevice.go) | 26 | ||||
-rw-r--r-- | syncapi/consumers/typing.go (renamed from syncapi/consumers/eduserver_typing.go) | 51 |
4 files changed, 57 insertions, 46 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 40c1cd3d..c28da460 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -62,7 +62,7 @@ func NewOutputClientDataConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), - durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"), db: store, notifier: notifier, stream: stream, diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/receipts.go index ab79998e..6bb0747f 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/receipts.go @@ -17,11 +17,10 @@ package consumers import ( "context" "database/sql" - "encoding/json" "fmt" + "strconv" "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" @@ -63,7 +62,7 @@ func NewOutputReceiptEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"), db: store, notifier: notifier, stream: stream, @@ -72,7 +71,7 @@ func NewOutputReceiptEventConsumer( } } -// Start consuming from EDU api +// Start consuming receipts events. func (s *OutputReceiptEventConsumer) Start() error { return jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, @@ -81,14 +80,23 @@ func (s *OutputReceiptEventConsumer) Start() error { } func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var output api.OutputReceiptEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { + output := types.OutputReceiptEvent{ + UserID: msg.Header.Get(jetstream.UserID), + RoomID: msg.Header.Get(jetstream.RoomID), + EventID: msg.Header.Get(jetstream.EventID), + Type: msg.Header.Get("type"), + } + + timestamp, err := strconv.Atoi(msg.Header.Get("timestamp")) + if err != nil { // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") + log.WithError(err).Errorf("output log: message parse failure") sentry.CaptureException(err) return true } + output.Timestamp = gomatrixserverlib.Timestamp(timestamp) + streamPos, err := s.db.StoreReceipt( s.ctx, output.RoomID, @@ -117,7 +125,7 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Ms return true } -func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output api.OutputReceiptEvent) error { +func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output types.OutputReceiptEvent) error { if output.Type != "m.read" { return nil } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/sendtodevice.go index bdbe7735..0b9153fc 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/sendtodevice.go @@ -19,7 +19,6 @@ import ( "encoding/json" "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" @@ -58,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"), db: store, serverName: cfg.Matrix.ServerName, notifier: notifier, @@ -66,7 +65,7 @@ func NewOutputSendToDeviceEventConsumer( } } -// Start consuming from EDU api +// Start consuming send-to-device events. func (s *OutputSendToDeviceEventConsumer) Start() error { return jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, @@ -75,15 +74,8 @@ func (s *OutputSendToDeviceEventConsumer) Start() error { } func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var output api.OutputSendToDeviceEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) - return true - } - - _, domain, err := gomatrixserverlib.SplitID('@', output.UserID) + userID := msg.Header.Get(jetstream.UserID) + _, domain, err := gomatrixserverlib.SplitID('@', userID) if err != nil { sentry.CaptureException(err) return true @@ -92,12 +84,20 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *na return true } + var output types.OutputSendToDeviceEvent + if err = json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("output log: message parse failure") + sentry.CaptureException(err) + return true + } + util.GetLogger(context.TODO()).WithFields(log.Fields{ "sender": output.Sender, "user_id": output.UserID, "device_id": output.DeviceID, "event_type": output.Type, - }).Info("sync API received send-to-device event from EDU server") + }).Debugf("sync API received send-to-device event from the clientapi/federationsender") streamPos, err := s.db.StoreNewSendForDeviceMessage( s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent, diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/typing.go index c2828c7f..48e484ec 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/typing.go @@ -16,16 +16,14 @@ package consumers import ( "context" - "encoding/json" + "strconv" + "time" - "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" - "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" @@ -37,7 +35,7 @@ type OutputTypingEventConsumer struct { jetstream nats.JetStreamContext durable string topic string - eduCache *cache.EDUCache + eduCache *caching.EDUCache stream types.StreamProvider notifier *notifier.Notifier } @@ -48,8 +46,7 @@ func NewOutputTypingEventConsumer( process *process.ProcessContext, cfg *config.SyncAPI, js nats.JetStreamContext, - store storage.Database, - eduCache *cache.EDUCache, + eduCache *caching.EDUCache, notifier *notifier.Notifier, stream types.StreamProvider, ) *OutputTypingEventConsumer { @@ -57,14 +54,14 @@ func NewOutputTypingEventConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPITypingConsumer"), eduCache: eduCache, notifier: notifier, stream: stream, } } -// Start consuming from EDU api +// Start consuming typing events. func (s *OutputTypingEventConsumer) Start() error { return jetstream.JetStreamConsumer( s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, @@ -73,34 +70,40 @@ func (s *OutputTypingEventConsumer) Start() error { } func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var output api.OutputTypingEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) + roomID := msg.Header.Get(jetstream.RoomID) + userID := msg.Header.Get(jetstream.UserID) + typing, err := strconv.ParseBool(msg.Header.Get("typing")) + if err != nil { + log.WithError(err).Errorf("output log: typing parse failure") + return true + } + timeout, err := strconv.Atoi(msg.Header.Get("timeout_ms")) + if err != nil { + log.WithError(err).Errorf("output log: timeout_ms parse failure") return true } log.WithFields(log.Fields{ - "room_id": output.Event.RoomID, - "user_id": output.Event.UserID, - "typing": output.Event.Typing, - }).Debug("received data from EDU server") + "room_id": roomID, + "user_id": userID, + "typing": typing, + "timeout": timeout, + }).Debug("syncapi received EDU data from client api") var typingPos types.StreamPosition - typingEvent := output.Event - if typingEvent.Typing { + if typing { + expiry := time.Now().Add(time.Duration(timeout) * time.Millisecond) typingPos = types.StreamPosition( - s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime), + s.eduCache.AddTypingUser(userID, roomID, &expiry), ) } else { typingPos = types.StreamPosition( - s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID), + s.eduCache.RemoveUser(userID, roomID), ) } s.stream.Advance(typingPos) - s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) + s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: typingPos}) return true } |