From 49dc49b232432d52c082646cc6f778593f4cb8b4 Mon Sep 17 00:00:00 2001 From: S7evinK <2353100+S7evinK@users.noreply.github.com> Date: Tue, 29 Mar 2022 14:14:35 +0200 Subject: Remove eduserver (#2306) * Move receipt sending to own JetStream producer * Move SendToDevice to producer * Remove most parts of the EDU server * Fix SendToDevice & copyrights * Move structs, cleanup EDU Server traces * Use HeadersOnly subscription * Missing file * Fix linter issues * Move consumers to own files * Rename durable consumer; Consumer cleanup * Docs/config cleanup --- syncapi/consumers/clientapi.go | 2 +- syncapi/consumers/eduserver_receipts.go | 143 -------------------------- syncapi/consumers/eduserver_sendtodevice.go | 119 ---------------------- syncapi/consumers/eduserver_typing.go | 106 ------------------- syncapi/consumers/receipts.go | 151 ++++++++++++++++++++++++++++ syncapi/consumers/sendtodevice.go | 119 ++++++++++++++++++++++ syncapi/consumers/typing.go | 109 ++++++++++++++++++++ 7 files changed, 380 insertions(+), 369 deletions(-) delete mode 100644 syncapi/consumers/eduserver_receipts.go delete mode 100644 syncapi/consumers/eduserver_sendtodevice.go delete mode 100644 syncapi/consumers/eduserver_typing.go create mode 100644 syncapi/consumers/receipts.go create mode 100644 syncapi/consumers/sendtodevice.go create mode 100644 syncapi/consumers/typing.go (limited to 'syncapi/consumers') diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index 40c1cd3d..c28da460 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -62,7 +62,7 @@ func NewOutputClientDataConsumer( ctx: process.Context(), jetstream: js, topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData), - durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), + durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"), db: store, notifier: notifier, stream: stream, diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go deleted file mode 100644 index ab79998e..00000000 --- a/syncapi/consumers/eduserver_receipts.go +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "database/sql" - "encoding/json" - "fmt" - - "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" - "github.com/matrix-org/dendrite/syncapi/notifier" - "github.com/matrix-org/dendrite/syncapi/producers" - "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - "github.com/sirupsen/logrus" - log "github.com/sirupsen/logrus" -) - -// OutputReceiptEventConsumer consumes events that originated in the EDU server. -type OutputReceiptEventConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - topic string - db storage.Database - stream types.StreamProvider - notifier *notifier.Notifier - serverName gomatrixserverlib.ServerName - producer *producers.UserAPIReadProducer -} - -// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer. -// Call Start() to begin consuming from the EDU server. -func NewOutputReceiptEventConsumer( - process *process.ProcessContext, - cfg *config.SyncAPI, - js nats.JetStreamContext, - store storage.Database, - notifier *notifier.Notifier, - stream types.StreamProvider, - producer *producers.UserAPIReadProducer, -) *OutputReceiptEventConsumer { - return &OutputReceiptEventConsumer{ - ctx: process.Context(), - jetstream: js, - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"), - db: store, - notifier: notifier, - stream: stream, - serverName: cfg.Matrix.ServerName, - producer: producer, - } -} - -// Start consuming from EDU api -func (s *OutputReceiptEventConsumer) Start() error { - return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, - nats.DeliverAll(), nats.ManualAck(), - ) -} - -func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var output api.OutputReceiptEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) - return true - } - - streamPos, err := s.db.StoreReceipt( - s.ctx, - output.RoomID, - output.Type, - output.UserID, - output.EventID, - output.Timestamp, - ) - if err != nil { - sentry.CaptureException(err) - return true - } - - if err = s.sendReadUpdate(ctx, output); err != nil { - log.WithError(err).WithFields(logrus.Fields{ - "user_id": output.UserID, - "room_id": output.RoomID, - }).Errorf("Failed to generate read update") - sentry.CaptureException(err) - return false - } - - s.stream.Advance(streamPos) - s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) - - return true -} - -func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output api.OutputReceiptEvent) error { - if output.Type != "m.read" { - return nil - } - _, serverName, err := gomatrixserverlib.SplitID('@', output.UserID) - if err != nil { - return fmt.Errorf("gomatrixserverlib.SplitID: %w", err) - } - if serverName != s.serverName { - return nil - } - var readPos types.StreamPosition - if output.EventID != "" { - if _, readPos, err = s.db.PositionInTopology(ctx, output.EventID); err != nil && err != sql.ErrNoRows { - return fmt.Errorf("s.db.PositionInTopology (Read): %w", err) - } - } - if readPos > 0 { - if err := s.producer.SendReadUpdate(output.UserID, output.RoomID, readPos, 0); err != nil { - return fmt.Errorf("s.producer.SendReadUpdate: %w", err) - } - } - return nil -} diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go deleted file mode 100644 index bdbe7735..00000000 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "encoding/json" - - "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" - "github.com/matrix-org/dendrite/syncapi/notifier" - "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" - "github.com/nats-io/nats.go" - log "github.com/sirupsen/logrus" -) - -// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server. -type OutputSendToDeviceEventConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - topic string - db storage.Database - serverName gomatrixserverlib.ServerName // our server name - stream types.StreamProvider - notifier *notifier.Notifier -} - -// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer. -// Call Start() to begin consuming from the EDU server. -func NewOutputSendToDeviceEventConsumer( - process *process.ProcessContext, - cfg *config.SyncAPI, - js nats.JetStreamContext, - store storage.Database, - notifier *notifier.Notifier, - stream types.StreamProvider, -) *OutputSendToDeviceEventConsumer { - return &OutputSendToDeviceEventConsumer{ - ctx: process.Context(), - jetstream: js, - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"), - db: store, - serverName: cfg.Matrix.ServerName, - notifier: notifier, - stream: stream, - } -} - -// Start consuming from EDU api -func (s *OutputSendToDeviceEventConsumer) Start() error { - return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, - nats.DeliverAll(), nats.ManualAck(), - ) -} - -func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var output api.OutputSendToDeviceEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) - return true - } - - _, domain, err := gomatrixserverlib.SplitID('@', output.UserID) - if err != nil { - sentry.CaptureException(err) - return true - } - if domain != s.serverName { - return true - } - - util.GetLogger(context.TODO()).WithFields(log.Fields{ - "sender": output.Sender, - "user_id": output.UserID, - "device_id": output.DeviceID, - "event_type": output.Type, - }).Info("sync API received send-to-device event from EDU server") - - streamPos, err := s.db.StoreNewSendForDeviceMessage( - s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent, - ) - if err != nil { - sentry.CaptureException(err) - log.WithError(err).Errorf("failed to store send-to-device message") - return false - } - - s.stream.Advance(streamPos) - s.notifier.OnNewSendToDevice( - output.UserID, - []string{output.DeviceID}, - types.StreamingToken{SendToDevicePosition: streamPos}, - ) - - return true -} diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go deleted file mode 100644 index c2828c7f..00000000 --- a/syncapi/consumers/eduserver_typing.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2019 Alex Chen -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "encoding/json" - - "github.com/getsentry/sentry-go" - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" - "github.com/matrix-org/dendrite/syncapi/notifier" - "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/nats-io/nats.go" - log "github.com/sirupsen/logrus" -) - -// OutputTypingEventConsumer consumes events that originated in the EDU server. -type OutputTypingEventConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - topic string - eduCache *cache.EDUCache - stream types.StreamProvider - notifier *notifier.Notifier -} - -// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. -// Call Start() to begin consuming from the EDU server. -func NewOutputTypingEventConsumer( - process *process.ProcessContext, - cfg *config.SyncAPI, - js nats.JetStreamContext, - store storage.Database, - eduCache *cache.EDUCache, - notifier *notifier.Notifier, - stream types.StreamProvider, -) *OutputTypingEventConsumer { - return &OutputTypingEventConsumer{ - ctx: process.Context(), - jetstream: js, - topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), - durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"), - eduCache: eduCache, - notifier: notifier, - stream: stream, - } -} - -// Start consuming from EDU api -func (s *OutputTypingEventConsumer) Start() error { - return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, - nats.DeliverAll(), nats.ManualAck(), - ) -} - -func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var output api.OutputTypingEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("EDU server output log: message parse failure") - sentry.CaptureException(err) - return true - } - - log.WithFields(log.Fields{ - "room_id": output.Event.RoomID, - "user_id": output.Event.UserID, - "typing": output.Event.Typing, - }).Debug("received data from EDU server") - - var typingPos types.StreamPosition - typingEvent := output.Event - if typingEvent.Typing { - typingPos = types.StreamPosition( - s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime), - ) - } else { - typingPos = types.StreamPosition( - s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID), - ) - } - - s.stream.Advance(typingPos) - s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) - - return true -} diff --git a/syncapi/consumers/receipts.go b/syncapi/consumers/receipts.go new file mode 100644 index 00000000..6bb0747f --- /dev/null +++ b/syncapi/consumers/receipts.go @@ -0,0 +1,151 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "database/sql" + "fmt" + "strconv" + + "github.com/getsentry/sentry-go" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" + log "github.com/sirupsen/logrus" +) + +// OutputReceiptEventConsumer consumes events that originated in the EDU server. +type OutputReceiptEventConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier + serverName gomatrixserverlib.ServerName + producer *producers.UserAPIReadProducer +} + +// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer. +// Call Start() to begin consuming from the EDU server. +func NewOutputReceiptEventConsumer( + process *process.ProcessContext, + cfg *config.SyncAPI, + js nats.JetStreamContext, + store storage.Database, + notifier *notifier.Notifier, + stream types.StreamProvider, + producer *producers.UserAPIReadProducer, +) *OutputReceiptEventConsumer { + return &OutputReceiptEventConsumer{ + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"), + db: store, + notifier: notifier, + stream: stream, + serverName: cfg.Matrix.ServerName, + producer: producer, + } +} + +// Start consuming receipts events. +func (s *OutputReceiptEventConsumer) Start() error { + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) +} + +func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + output := types.OutputReceiptEvent{ + UserID: msg.Header.Get(jetstream.UserID), + RoomID: msg.Header.Get(jetstream.RoomID), + EventID: msg.Header.Get(jetstream.EventID), + Type: msg.Header.Get("type"), + } + + timestamp, err := strconv.Atoi(msg.Header.Get("timestamp")) + if err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("output log: message parse failure") + sentry.CaptureException(err) + return true + } + + output.Timestamp = gomatrixserverlib.Timestamp(timestamp) + + streamPos, err := s.db.StoreReceipt( + s.ctx, + output.RoomID, + output.Type, + output.UserID, + output.EventID, + output.Timestamp, + ) + if err != nil { + sentry.CaptureException(err) + return true + } + + if err = s.sendReadUpdate(ctx, output); err != nil { + log.WithError(err).WithFields(logrus.Fields{ + "user_id": output.UserID, + "room_id": output.RoomID, + }).Errorf("Failed to generate read update") + sentry.CaptureException(err) + return false + } + + s.stream.Advance(streamPos) + s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) + + return true +} + +func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output types.OutputReceiptEvent) error { + if output.Type != "m.read" { + return nil + } + _, serverName, err := gomatrixserverlib.SplitID('@', output.UserID) + if err != nil { + return fmt.Errorf("gomatrixserverlib.SplitID: %w", err) + } + if serverName != s.serverName { + return nil + } + var readPos types.StreamPosition + if output.EventID != "" { + if _, readPos, err = s.db.PositionInTopology(ctx, output.EventID); err != nil && err != sql.ErrNoRows { + return fmt.Errorf("s.db.PositionInTopology (Read): %w", err) + } + } + if readPos > 0 { + if err := s.producer.SendReadUpdate(output.UserID, output.RoomID, readPos, 0); err != nil { + return fmt.Errorf("s.producer.SendReadUpdate: %w", err) + } + } + return nil +} diff --git a/syncapi/consumers/sendtodevice.go b/syncapi/consumers/sendtodevice.go new file mode 100644 index 00000000..0b9153fc --- /dev/null +++ b/syncapi/consumers/sendtodevice.go @@ -0,0 +1,119 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "encoding/json" + + "github.com/getsentry/sentry-go" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server. +type OutputSendToDeviceEventConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + db storage.Database + serverName gomatrixserverlib.ServerName // our server name + stream types.StreamProvider + notifier *notifier.Notifier +} + +// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer. +// Call Start() to begin consuming from the EDU server. +func NewOutputSendToDeviceEventConsumer( + process *process.ProcessContext, + cfg *config.SyncAPI, + js nats.JetStreamContext, + store storage.Database, + notifier *notifier.Notifier, + stream types.StreamProvider, +) *OutputSendToDeviceEventConsumer { + return &OutputSendToDeviceEventConsumer{ + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"), + db: store, + serverName: cfg.Matrix.ServerName, + notifier: notifier, + stream: stream, + } +} + +// Start consuming send-to-device events. +func (s *OutputSendToDeviceEventConsumer) Start() error { + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) +} + +func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + userID := msg.Header.Get(jetstream.UserID) + _, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + sentry.CaptureException(err) + return true + } + if domain != s.serverName { + return true + } + + var output types.OutputSendToDeviceEvent + if err = json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("output log: message parse failure") + sentry.CaptureException(err) + return true + } + + util.GetLogger(context.TODO()).WithFields(log.Fields{ + "sender": output.Sender, + "user_id": output.UserID, + "device_id": output.DeviceID, + "event_type": output.Type, + }).Debugf("sync API received send-to-device event from the clientapi/federationsender") + + streamPos, err := s.db.StoreNewSendForDeviceMessage( + s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent, + ) + if err != nil { + sentry.CaptureException(err) + log.WithError(err).Errorf("failed to store send-to-device message") + return false + } + + s.stream.Advance(streamPos) + s.notifier.OnNewSendToDevice( + output.UserID, + []string{output.DeviceID}, + types.StreamingToken{SendToDevicePosition: streamPos}, + ) + + return true +} diff --git a/syncapi/consumers/typing.go b/syncapi/consumers/typing.go new file mode 100644 index 00000000..48e484ec --- /dev/null +++ b/syncapi/consumers/typing.go @@ -0,0 +1,109 @@ +// Copyright 2019 Alex Chen +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + "strconv" + "time" + + "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" +) + +// OutputTypingEventConsumer consumes events that originated in the EDU server. +type OutputTypingEventConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + eduCache *caching.EDUCache + stream types.StreamProvider + notifier *notifier.Notifier +} + +// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer. +// Call Start() to begin consuming from the EDU server. +func NewOutputTypingEventConsumer( + process *process.ProcessContext, + cfg *config.SyncAPI, + js nats.JetStreamContext, + eduCache *caching.EDUCache, + notifier *notifier.Notifier, + stream types.StreamProvider, +) *OutputTypingEventConsumer { + return &OutputTypingEventConsumer{ + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent), + durable: cfg.Matrix.JetStream.Durable("SyncAPITypingConsumer"), + eduCache: eduCache, + notifier: notifier, + stream: stream, + } +} + +// Start consuming typing events. +func (s *OutputTypingEventConsumer) Start() error { + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) +} + +func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + roomID := msg.Header.Get(jetstream.RoomID) + userID := msg.Header.Get(jetstream.UserID) + typing, err := strconv.ParseBool(msg.Header.Get("typing")) + if err != nil { + log.WithError(err).Errorf("output log: typing parse failure") + return true + } + timeout, err := strconv.Atoi(msg.Header.Get("timeout_ms")) + if err != nil { + log.WithError(err).Errorf("output log: timeout_ms parse failure") + return true + } + + log.WithFields(log.Fields{ + "room_id": roomID, + "user_id": userID, + "typing": typing, + "timeout": timeout, + }).Debug("syncapi received EDU data from client api") + + var typingPos types.StreamPosition + if typing { + expiry := time.Now().Add(time.Duration(timeout) * time.Millisecond) + typingPos = types.StreamPosition( + s.eduCache.AddTypingUser(userID, roomID, &expiry), + ) + } else { + typingPos = types.StreamPosition( + s.eduCache.RemoveUser(userID, roomID), + ) + } + + s.stream.Advance(typingPos) + s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: typingPos}) + + return true +} -- cgit v1.2.3