diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-09-27 15:01:34 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-27 15:01:34 +0200 |
commit | 249b32c4f3ee2e01e6f89435e0c7a5786d2ae3a1 (patch) | |
tree | 1fc229f6a4bafa88afd7c1db3eda3ff3a59b021e /syncapi/consumers/clientapi.go | |
parent | f18bce93cc3e7e5f57ebc55d309360b7f8703553 (diff) |
Refactor notifications (#2688)
This PR changes the handling of notifications
- removes the `StreamEvent` and `ReadUpdate` stream
- listens on the `OutputRoomEvent` stream in the UserAPI to inform the
SyncAPI about unread notifications
- listens on the `OutputReceiptEvent` stream in the UserAPI to set
receipts/update notifications
- sets the `read_markers` directly from within the internal UserAPI
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi/consumers/clientapi.go')
-rw-r--r-- | syncapi/consumers/clientapi.go | 46 |
1 files changed, 0 insertions, 46 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index f0588cab..a170a6ec 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -16,9 +16,7 @@ package consumers import ( "context" - "database/sql" "encoding/json" - "fmt" "github.com/getsentry/sentry-go" "github.com/matrix-org/gomatrixserverlib" @@ -31,7 +29,6 @@ import ( "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/syncapi/notifier" - "github.com/matrix-org/dendrite/syncapi/producers" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" ) @@ -46,7 +43,6 @@ type OutputClientDataConsumer struct { stream types.StreamProvider notifier *notifier.Notifier serverName gomatrixserverlib.ServerName - producer *producers.UserAPIReadProducer } // NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers. @@ -57,7 +53,6 @@ func NewOutputClientDataConsumer( store storage.Database, notifier *notifier.Notifier, stream types.StreamProvider, - producer *producers.UserAPIReadProducer, ) *OutputClientDataConsumer { return &OutputClientDataConsumer{ ctx: process.Context(), @@ -68,7 +63,6 @@ func NewOutputClientDataConsumer( notifier: notifier, stream: stream, serverName: cfg.Matrix.ServerName, - producer: producer, } } @@ -113,15 +107,6 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.M return false } - if err = s.sendReadUpdate(ctx, userID, output); err != nil { - log.WithError(err).WithFields(logrus.Fields{ - "user_id": userID, - "room_id": output.RoomID, - }).Errorf("Failed to generate read update") - sentry.CaptureException(err) - return false - } - if output.IgnoredUsers != nil { if err := s.db.UpdateIgnoresForUser(ctx, userID, output.IgnoredUsers); err != nil { log.WithError(err).WithFields(logrus.Fields{ @@ -136,34 +121,3 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.M return true } - -func (s *OutputClientDataConsumer) sendReadUpdate(ctx context.Context, userID string, output eventutil.AccountData) error { - if output.Type != "m.fully_read" || output.ReadMarker == nil { - return nil - } - _, serverName, err := gomatrixserverlib.SplitID('@', userID) - if err != nil { - return fmt.Errorf("gomatrixserverlib.SplitID: %w", err) - } - if serverName != s.serverName { - return nil - } - var readPos types.StreamPosition - var fullyReadPos types.StreamPosition - if output.ReadMarker.Read != "" { - if _, readPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.Read); err != nil && err != sql.ErrNoRows { - return fmt.Errorf("s.db.PositionInTopology (Read): %w", err) - } - } - if output.ReadMarker.FullyRead != "" { - if _, fullyReadPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.FullyRead); err != nil && err != sql.ErrNoRows { - return fmt.Errorf("s.db.PositionInTopology (FullyRead): %w", err) - } - } - if readPos > 0 || fullyReadPos > 0 { - if err := s.producer.SendReadUpdate(userID, output.RoomID, readPos, fullyReadPos); err != nil { - return fmt.Errorf("s.producer.SendReadUpdate: %w", err) - } - } - return nil -} |