aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-09-27 15:01:34 +0200
committerGitHub <noreply@github.com>2022-09-27 15:01:34 +0200
commit249b32c4f3ee2e01e6f89435e0c7a5786d2ae3a1 (patch)
tree1fc229f6a4bafa88afd7c1db3eda3ff3a59b021e /syncapi/consumers
parentf18bce93cc3e7e5f57ebc55d309360b7f8703553 (diff)
Refactor notifications (#2688)
This PR changes the handling of notifications - removes the `StreamEvent` and `ReadUpdate` stream - listens on the `OutputRoomEvent` stream in the UserAPI to inform the SyncAPI about unread notifications - listens on the `OutputReceiptEvent` stream in the UserAPI to set receipts/update notifications - sets the `read_markers` directly from within the internal UserAPI Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi/consumers')
-rw-r--r--syncapi/consumers/clientapi.go46
-rw-r--r--syncapi/consumers/receipts.go48
-rw-r--r--syncapi/consumers/roomserver.go17
-rw-r--r--syncapi/consumers/userapi.go5
4 files changed, 11 insertions, 105 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index f0588cab..a170a6ec 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -16,9 +16,7 @@ package consumers
import (
"context"
- "database/sql"
"encoding/json"
- "fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
@@ -31,7 +29,6 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
- "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -46,7 +43,6 @@ type OutputClientDataConsumer struct {
stream types.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
- producer *producers.UserAPIReadProducer
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
@@ -57,7 +53,6 @@ func NewOutputClientDataConsumer(
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
- producer *producers.UserAPIReadProducer,
) *OutputClientDataConsumer {
return &OutputClientDataConsumer{
ctx: process.Context(),
@@ -68,7 +63,6 @@ func NewOutputClientDataConsumer(
notifier: notifier,
stream: stream,
serverName: cfg.Matrix.ServerName,
- producer: producer,
}
}
@@ -113,15 +107,6 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.M
return false
}
- if err = s.sendReadUpdate(ctx, userID, output); err != nil {
- log.WithError(err).WithFields(logrus.Fields{
- "user_id": userID,
- "room_id": output.RoomID,
- }).Errorf("Failed to generate read update")
- sentry.CaptureException(err)
- return false
- }
-
if output.IgnoredUsers != nil {
if err := s.db.UpdateIgnoresForUser(ctx, userID, output.IgnoredUsers); err != nil {
log.WithError(err).WithFields(logrus.Fields{
@@ -136,34 +121,3 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.M
return true
}
-
-func (s *OutputClientDataConsumer) sendReadUpdate(ctx context.Context, userID string, output eventutil.AccountData) error {
- if output.Type != "m.fully_read" || output.ReadMarker == nil {
- return nil
- }
- _, serverName, err := gomatrixserverlib.SplitID('@', userID)
- if err != nil {
- return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
- }
- if serverName != s.serverName {
- return nil
- }
- var readPos types.StreamPosition
- var fullyReadPos types.StreamPosition
- if output.ReadMarker.Read != "" {
- if _, readPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.Read); err != nil && err != sql.ErrNoRows {
- return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
- }
- }
- if output.ReadMarker.FullyRead != "" {
- if _, fullyReadPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.FullyRead); err != nil && err != sql.ErrNoRows {
- return fmt.Errorf("s.db.PositionInTopology (FullyRead): %w", err)
- }
- }
- if readPos > 0 || fullyReadPos > 0 {
- if err := s.producer.SendReadUpdate(userID, output.RoomID, readPos, fullyReadPos); err != nil {
- return fmt.Errorf("s.producer.SendReadUpdate: %w", err)
- }
- }
- return nil
-}
diff --git a/syncapi/consumers/receipts.go b/syncapi/consumers/receipts.go
index a18244c4..4379dd13 100644
--- a/syncapi/consumers/receipts.go
+++ b/syncapi/consumers/receipts.go
@@ -16,22 +16,19 @@ package consumers
import (
"context"
- "database/sql"
- "fmt"
"strconv"
"github.com/getsentry/sentry-go"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
- "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/nats-io/nats.go"
- "github.com/sirupsen/logrus"
- log "github.com/sirupsen/logrus"
)
// OutputReceiptEventConsumer consumes events that originated in the EDU server.
@@ -44,7 +41,6 @@ type OutputReceiptEventConsumer struct {
stream types.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
- producer *producers.UserAPIReadProducer
}
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
@@ -56,7 +52,6 @@ func NewOutputReceiptEventConsumer(
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
- producer *producers.UserAPIReadProducer,
) *OutputReceiptEventConsumer {
return &OutputReceiptEventConsumer{
ctx: process.Context(),
@@ -67,7 +62,6 @@ func NewOutputReceiptEventConsumer(
notifier: notifier,
stream: stream,
serverName: cfg.Matrix.ServerName,
- producer: producer,
}
}
@@ -111,42 +105,8 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msgs []*nats
return true
}
- if err = s.sendReadUpdate(ctx, output); err != nil {
- log.WithError(err).WithFields(logrus.Fields{
- "user_id": output.UserID,
- "room_id": output.RoomID,
- }).Errorf("Failed to generate read update")
- sentry.CaptureException(err)
- return false
- }
-
s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return true
}
-
-func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output types.OutputReceiptEvent) error {
- if output.Type != "m.read" {
- return nil
- }
- _, serverName, err := gomatrixserverlib.SplitID('@', output.UserID)
- if err != nil {
- return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
- }
- if serverName != s.serverName {
- return nil
- }
- var readPos types.StreamPosition
- if output.EventID != "" {
- if _, readPos, err = s.db.PositionInTopology(ctx, output.EventID); err != nil && err != sql.ErrNoRows {
- return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
- }
- }
- if readPos > 0 {
- if err := s.producer.SendReadUpdate(output.UserID, output.RoomID, readPos, 0); err != nil {
- return fmt.Errorf("s.producer.SendReadUpdate: %w", err)
- }
- }
- return nil
-}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 6979eb48..0964ae20 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -21,17 +21,17 @@ import (
"fmt"
"github.com/getsentry/sentry-go"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
- "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/nats-io/nats.go"
- log "github.com/sirupsen/logrus"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
@@ -46,7 +46,6 @@ type OutputRoomEventConsumer struct {
pduStream types.StreamProvider
inviteStream types.StreamProvider
notifier *notifier.Notifier
- producer *producers.UserAPIStreamEventProducer
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -59,7 +58,6 @@ func NewOutputRoomEventConsumer(
pduStream types.StreamProvider,
inviteStream types.StreamProvider,
rsAPI api.SyncRoomserverAPI,
- producer *producers.UserAPIStreamEventProducer,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
@@ -72,7 +70,6 @@ func NewOutputRoomEventConsumer(
pduStream: pduStream,
inviteStream: inviteStream,
rsAPI: rsAPI,
- producer: producer,
}
}
@@ -255,12 +252,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return nil
}
- if err = s.producer.SendStreamEvent(ev.RoomID(), ev, pduPos); err != nil {
- log.WithError(err).Errorf("Failed to send stream output event for event %s", ev.EventID())
- sentry.CaptureException(err)
- return err
- }
-
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
sentry.CaptureException(err)
diff --git a/syncapi/consumers/userapi.go b/syncapi/consumers/userapi.go
index 22782352..c9b96f78 100644
--- a/syncapi/consumers/userapi.go
+++ b/syncapi/consumers/userapi.go
@@ -19,6 +19,9 @@ import (
"encoding/json"
"github.com/getsentry/sentry-go"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@@ -26,8 +29,6 @@ import (
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/nats-io/nats.go"
- log "github.com/sirupsen/logrus"
)
// OutputNotificationDataConsumer consumes events that originated in