aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/receipts.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/receipts.go')
-rw-r--r--syncapi/consumers/receipts.go151
1 files changed, 151 insertions, 0 deletions
diff --git a/syncapi/consumers/receipts.go b/syncapi/consumers/receipts.go
new file mode 100644
index 00000000..6bb0747f
--- /dev/null
+++ b/syncapi/consumers/receipts.go
@@ -0,0 +1,151 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "strconv"
+
+ "github.com/getsentry/sentry-go"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/producers"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
+ log "github.com/sirupsen/logrus"
+)
+
+// OutputReceiptEventConsumer consumes events that originated in the EDU server.
+type OutputReceiptEventConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ stream types.StreamProvider
+ notifier *notifier.Notifier
+ serverName gomatrixserverlib.ServerName
+ producer *producers.UserAPIReadProducer
+}
+
+// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
+// Call Start() to begin consuming from the EDU server.
+func NewOutputReceiptEventConsumer(
+ process *process.ProcessContext,
+ cfg *config.SyncAPI,
+ js nats.JetStreamContext,
+ store storage.Database,
+ notifier *notifier.Notifier,
+ stream types.StreamProvider,
+ producer *producers.UserAPIReadProducer,
+) *OutputReceiptEventConsumer {
+ return &OutputReceiptEventConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"),
+ db: store,
+ notifier: notifier,
+ stream: stream,
+ serverName: cfg.Matrix.ServerName,
+ producer: producer,
+ }
+}
+
+// Start consuming receipts events.
+func (s *OutputReceiptEventConsumer) Start() error {
+ return jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ )
+}
+
+func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+ output := types.OutputReceiptEvent{
+ UserID: msg.Header.Get(jetstream.UserID),
+ RoomID: msg.Header.Get(jetstream.RoomID),
+ EventID: msg.Header.Get(jetstream.EventID),
+ Type: msg.Header.Get("type"),
+ }
+
+ timestamp, err := strconv.Atoi(msg.Header.Get("timestamp"))
+ if err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("output log: message parse failure")
+ sentry.CaptureException(err)
+ return true
+ }
+
+ output.Timestamp = gomatrixserverlib.Timestamp(timestamp)
+
+ streamPos, err := s.db.StoreReceipt(
+ s.ctx,
+ output.RoomID,
+ output.Type,
+ output.UserID,
+ output.EventID,
+ output.Timestamp,
+ )
+ if err != nil {
+ sentry.CaptureException(err)
+ return true
+ }
+
+ if err = s.sendReadUpdate(ctx, output); err != nil {
+ log.WithError(err).WithFields(logrus.Fields{
+ "user_id": output.UserID,
+ "room_id": output.RoomID,
+ }).Errorf("Failed to generate read update")
+ sentry.CaptureException(err)
+ return false
+ }
+
+ s.stream.Advance(streamPos)
+ s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
+
+ return true
+}
+
+func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output types.OutputReceiptEvent) error {
+ if output.Type != "m.read" {
+ return nil
+ }
+ _, serverName, err := gomatrixserverlib.SplitID('@', output.UserID)
+ if err != nil {
+ return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
+ }
+ if serverName != s.serverName {
+ return nil
+ }
+ var readPos types.StreamPosition
+ if output.EventID != "" {
+ if _, readPos, err = s.db.PositionInTopology(ctx, output.EventID); err != nil && err != sql.ErrNoRows {
+ return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
+ }
+ }
+ if readPos > 0 {
+ if err := s.producer.SendReadUpdate(output.UserID, output.RoomID, readPos, 0); err != nil {
+ return fmt.Errorf("s.producer.SendReadUpdate: %w", err)
+ }
+ }
+ return nil
+}