aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/clientapi.go
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-09-27 15:01:34 +0200
committerGitHub <noreply@github.com>2022-09-27 15:01:34 +0200
commit249b32c4f3ee2e01e6f89435e0c7a5786d2ae3a1 (patch)
tree1fc229f6a4bafa88afd7c1db3eda3ff3a59b021e /syncapi/consumers/clientapi.go
parentf18bce93cc3e7e5f57ebc55d309360b7f8703553 (diff)
Refactor notifications (#2688)
This PR changes the handling of notifications - removes the `StreamEvent` and `ReadUpdate` stream - listens on the `OutputRoomEvent` stream in the UserAPI to inform the SyncAPI about unread notifications - listens on the `OutputReceiptEvent` stream in the UserAPI to set receipts/update notifications - sets the `read_markers` directly from within the internal UserAPI Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi/consumers/clientapi.go')
-rw-r--r--syncapi/consumers/clientapi.go46
1 files changed, 0 insertions, 46 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index f0588cab..a170a6ec 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -16,9 +16,7 @@ package consumers
import (
"context"
- "database/sql"
"encoding/json"
- "fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
@@ -31,7 +29,6 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
- "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -46,7 +43,6 @@ type OutputClientDataConsumer struct {
stream types.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
- producer *producers.UserAPIReadProducer
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
@@ -57,7 +53,6 @@ func NewOutputClientDataConsumer(
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
- producer *producers.UserAPIReadProducer,
) *OutputClientDataConsumer {
return &OutputClientDataConsumer{
ctx: process.Context(),
@@ -68,7 +63,6 @@ func NewOutputClientDataConsumer(
notifier: notifier,
stream: stream,
serverName: cfg.Matrix.ServerName,
- producer: producer,
}
}
@@ -113,15 +107,6 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.M
return false
}
- if err = s.sendReadUpdate(ctx, userID, output); err != nil {
- log.WithError(err).WithFields(logrus.Fields{
- "user_id": userID,
- "room_id": output.RoomID,
- }).Errorf("Failed to generate read update")
- sentry.CaptureException(err)
- return false
- }
-
if output.IgnoredUsers != nil {
if err := s.db.UpdateIgnoresForUser(ctx, userID, output.IgnoredUsers); err != nil {
log.WithError(err).WithFields(logrus.Fields{
@@ -136,34 +121,3 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.M
return true
}
-
-func (s *OutputClientDataConsumer) sendReadUpdate(ctx context.Context, userID string, output eventutil.AccountData) error {
- if output.Type != "m.fully_read" || output.ReadMarker == nil {
- return nil
- }
- _, serverName, err := gomatrixserverlib.SplitID('@', userID)
- if err != nil {
- return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
- }
- if serverName != s.serverName {
- return nil
- }
- var readPos types.StreamPosition
- var fullyReadPos types.StreamPosition
- if output.ReadMarker.Read != "" {
- if _, readPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.Read); err != nil && err != sql.ErrNoRows {
- return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
- }
- }
- if output.ReadMarker.FullyRead != "" {
- if _, fullyReadPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.FullyRead); err != nil && err != sql.ErrNoRows {
- return fmt.Errorf("s.db.PositionInTopology (FullyRead): %w", err)
- }
- }
- if readPos > 0 || fullyReadPos > 0 {
- if err := s.producer.SendReadUpdate(userID, output.RoomID, readPos, fullyReadPos); err != nil {
- return fmt.Errorf("s.producer.SendReadUpdate: %w", err)
- }
- }
- return nil
-}