aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/clientapi.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/clientapi.go')
-rw-r--r--syncapi/consumers/clientapi.go77
1 files changed, 63 insertions, 14 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index c3650085..f01afce6 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -17,6 +17,7 @@ package consumers
import (
"context"
"encoding/json"
+ "fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal/eventutil"
@@ -24,21 +25,26 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
// OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- topic string
- db storage.Database
- stream types.StreamProvider
- notifier *notifier.Notifier
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ stream types.StreamProvider
+ notifier *notifier.Notifier
+ serverName gomatrixserverlib.ServerName
+ producer *producers.UserAPIReadProducer
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
@@ -49,15 +55,18 @@ func NewOutputClientDataConsumer(
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
+ producer *producers.UserAPIReadProducer,
) *OutputClientDataConsumer {
return &OutputClientDataConsumer{
- ctx: process.Context(),
- jetstream: js,
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
- db: store,
- notifier: notifier,
- stream: stream,
+ ctx: process.Context(),
+ jetstream: js,
+ topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
+ db: store,
+ notifier: notifier,
+ stream: stream,
+ serverName: cfg.Matrix.ServerName,
+ producer: producer,
}
}
@@ -100,8 +109,48 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg)
}).Panicf("could not save account data")
}
+ if err = s.sendReadUpdate(ctx, userID, output); err != nil {
+ log.WithError(err).WithFields(logrus.Fields{
+ "user_id": userID,
+ "room_id": output.RoomID,
+ }).Errorf("Failed to generate read update")
+ sentry.CaptureException(err)
+ return false
+ }
+
s.stream.Advance(streamPos)
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})
return true
}
+
+func (s *OutputClientDataConsumer) sendReadUpdate(ctx context.Context, userID string, output eventutil.AccountData) error {
+ if output.Type != "m.fully_read" || output.ReadMarker == nil {
+ return nil
+ }
+ _, serverName, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
+ }
+ if serverName != s.serverName {
+ return nil
+ }
+ var readPos types.StreamPosition
+ var fullyReadPos types.StreamPosition
+ if output.ReadMarker.Read != "" {
+ if _, readPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.Read); err != nil {
+ return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
+ }
+ }
+ if output.ReadMarker.FullyRead != "" {
+ if _, fullyReadPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.FullyRead); err != nil {
+ return fmt.Errorf("s.db.PositionInTopology (FullyRead): %w", err)
+ }
+ }
+ if readPos > 0 || fullyReadPos > 0 {
+ if err := s.producer.SendReadUpdate(userID, output.RoomID, readPos, fullyReadPos); err != nil {
+ return fmt.Errorf("s.producer.SendReadUpdate: %w", err)
+ }
+ }
+ return nil
+}