diff options
Diffstat (limited to 'syncapi/consumers/clientapi.go')
-rw-r--r-- | syncapi/consumers/clientapi.go | 77 |
1 files changed, 63 insertions, 14 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index c3650085..f01afce6 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -17,6 +17,7 @@ package consumers import ( "context" "encoding/json" + "fmt" "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/internal/eventutil" @@ -24,21 +25,26 @@ import ( "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" + "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) // OutputClientDataConsumer consumes events that originated in the client API server. type OutputClientDataConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - topic string - db storage.Database - stream types.StreamProvider - notifier *notifier.Notifier + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + db storage.Database + stream types.StreamProvider + notifier *notifier.Notifier + serverName gomatrixserverlib.ServerName + producer *producers.UserAPIReadProducer } // NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. @@ -49,15 +55,18 @@ func NewOutputClientDataConsumer( store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, + producer *producers.UserAPIReadProducer, ) *OutputClientDataConsumer { return &OutputClientDataConsumer{ - ctx: process.Context(), - jetstream: js, - topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), - durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), - db: store, - notifier: notifier, - stream: stream, + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData), + durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"), + db: store, + notifier: notifier, + stream: stream, + serverName: cfg.Matrix.ServerName, + producer: producer, } } @@ -100,8 +109,48 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) }).Panicf("could not save account data") } + if err = s.sendReadUpdate(ctx, userID, output); err != nil { + log.WithError(err).WithFields(logrus.Fields{ + "user_id": userID, + "room_id": output.RoomID, + }).Errorf("Failed to generate read update") + sentry.CaptureException(err) + return false + } + s.stream.Advance(streamPos) s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos}) return true } + +func (s *OutputClientDataConsumer) sendReadUpdate(ctx context.Context, userID string, output eventutil.AccountData) error { + if output.Type != "m.fully_read" || output.ReadMarker == nil { + return nil + } + _, serverName, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + return fmt.Errorf("gomatrixserverlib.SplitID: %w", err) + } + if serverName != s.serverName { + return nil + } + var readPos types.StreamPosition + var fullyReadPos types.StreamPosition + if output.ReadMarker.Read != "" { + if _, readPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.Read); err != nil { + return fmt.Errorf("s.db.PositionInTopology (Read): %w", err) + } + } + if output.ReadMarker.FullyRead != "" { + if _, fullyReadPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.FullyRead); err != nil { + return fmt.Errorf("s.db.PositionInTopology (FullyRead): %w", err) + } + } + if readPos > 0 || fullyReadPos > 0 { + if err := s.producer.SendReadUpdate(userID, output.RoomID, readPos, fullyReadPos); err != nil { + return fmt.Errorf("s.producer.SendReadUpdate: %w", err) + } + } + return nil +} |