aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-09-27 15:01:34 +0200
committerGitHub <noreply@github.com>2022-09-27 15:01:34 +0200
commit249b32c4f3ee2e01e6f89435e0c7a5786d2ae3a1 (patch)
tree1fc229f6a4bafa88afd7c1db3eda3ff3a59b021e /syncapi
parentf18bce93cc3e7e5f57ebc55d309360b7f8703553 (diff)
Refactor notifications (#2688)
This PR changes the handling of notifications - removes the `StreamEvent` and `ReadUpdate` stream - listens on the `OutputRoomEvent` stream in the UserAPI to inform the SyncAPI about unread notifications - listens on the `OutputReceiptEvent` stream in the UserAPI to set receipts/update notifications - sets the `read_markers` directly from within the internal UserAPI Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/clientapi.go46
-rw-r--r--syncapi/consumers/receipts.go48
-rw-r--r--syncapi/consumers/roomserver.go17
-rw-r--r--syncapi/consumers/userapi.go5
-rw-r--r--syncapi/producers/userapi_readupdate.go62
-rw-r--r--syncapi/producers/userapi_streamevent.go60
-rw-r--r--syncapi/storage/interface.go15
-rw-r--r--syncapi/storage/postgres/notification_data_table.go36
-rw-r--r--syncapi/storage/shared/syncserver.go11
-rw-r--r--syncapi/storage/sqlite3/notification_data_table.go39
-rw-r--r--syncapi/storage/tables/interface.go2
-rw-r--r--syncapi/streams/stream_accountdata.go3
-rw-r--r--syncapi/streams/stream_notificationdata.go23
-rw-r--r--syncapi/syncapi.go14
-rw-r--r--syncapi/types/types.go23
15 files changed, 93 insertions, 311 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index f0588cab..a170a6ec 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -16,9 +16,7 @@ package consumers
import (
"context"
- "database/sql"
"encoding/json"
- "fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
@@ -31,7 +29,6 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
- "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
@@ -46,7 +43,6 @@ type OutputClientDataConsumer struct {
stream types.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
- producer *producers.UserAPIReadProducer
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
@@ -57,7 +53,6 @@ func NewOutputClientDataConsumer(
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
- producer *producers.UserAPIReadProducer,
) *OutputClientDataConsumer {
return &OutputClientDataConsumer{
ctx: process.Context(),
@@ -68,7 +63,6 @@ func NewOutputClientDataConsumer(
notifier: notifier,
stream: stream,
serverName: cfg.Matrix.ServerName,
- producer: producer,
}
}
@@ -113,15 +107,6 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.M
return false
}
- if err = s.sendReadUpdate(ctx, userID, output); err != nil {
- log.WithError(err).WithFields(logrus.Fields{
- "user_id": userID,
- "room_id": output.RoomID,
- }).Errorf("Failed to generate read update")
- sentry.CaptureException(err)
- return false
- }
-
if output.IgnoredUsers != nil {
if err := s.db.UpdateIgnoresForUser(ctx, userID, output.IgnoredUsers); err != nil {
log.WithError(err).WithFields(logrus.Fields{
@@ -136,34 +121,3 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msgs []*nats.M
return true
}
-
-func (s *OutputClientDataConsumer) sendReadUpdate(ctx context.Context, userID string, output eventutil.AccountData) error {
- if output.Type != "m.fully_read" || output.ReadMarker == nil {
- return nil
- }
- _, serverName, err := gomatrixserverlib.SplitID('@', userID)
- if err != nil {
- return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
- }
- if serverName != s.serverName {
- return nil
- }
- var readPos types.StreamPosition
- var fullyReadPos types.StreamPosition
- if output.ReadMarker.Read != "" {
- if _, readPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.Read); err != nil && err != sql.ErrNoRows {
- return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
- }
- }
- if output.ReadMarker.FullyRead != "" {
- if _, fullyReadPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.FullyRead); err != nil && err != sql.ErrNoRows {
- return fmt.Errorf("s.db.PositionInTopology (FullyRead): %w", err)
- }
- }
- if readPos > 0 || fullyReadPos > 0 {
- if err := s.producer.SendReadUpdate(userID, output.RoomID, readPos, fullyReadPos); err != nil {
- return fmt.Errorf("s.producer.SendReadUpdate: %w", err)
- }
- }
- return nil
-}
diff --git a/syncapi/consumers/receipts.go b/syncapi/consumers/receipts.go
index a18244c4..4379dd13 100644
--- a/syncapi/consumers/receipts.go
+++ b/syncapi/consumers/receipts.go
@@ -16,22 +16,19 @@ package consumers
import (
"context"
- "database/sql"
- "fmt"
"strconv"
"github.com/getsentry/sentry-go"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
- "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/nats-io/nats.go"
- "github.com/sirupsen/logrus"
- log "github.com/sirupsen/logrus"
)
// OutputReceiptEventConsumer consumes events that originated in the EDU server.
@@ -44,7 +41,6 @@ type OutputReceiptEventConsumer struct {
stream types.StreamProvider
notifier *notifier.Notifier
serverName gomatrixserverlib.ServerName
- producer *producers.UserAPIReadProducer
}
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
@@ -56,7 +52,6 @@ func NewOutputReceiptEventConsumer(
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
- producer *producers.UserAPIReadProducer,
) *OutputReceiptEventConsumer {
return &OutputReceiptEventConsumer{
ctx: process.Context(),
@@ -67,7 +62,6 @@ func NewOutputReceiptEventConsumer(
notifier: notifier,
stream: stream,
serverName: cfg.Matrix.ServerName,
- producer: producer,
}
}
@@ -111,42 +105,8 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msgs []*nats
return true
}
- if err = s.sendReadUpdate(ctx, output); err != nil {
- log.WithError(err).WithFields(logrus.Fields{
- "user_id": output.UserID,
- "room_id": output.RoomID,
- }).Errorf("Failed to generate read update")
- sentry.CaptureException(err)
- return false
- }
-
s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return true
}
-
-func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output types.OutputReceiptEvent) error {
- if output.Type != "m.read" {
- return nil
- }
- _, serverName, err := gomatrixserverlib.SplitID('@', output.UserID)
- if err != nil {
- return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
- }
- if serverName != s.serverName {
- return nil
- }
- var readPos types.StreamPosition
- if output.EventID != "" {
- if _, readPos, err = s.db.PositionInTopology(ctx, output.EventID); err != nil && err != sql.ErrNoRows {
- return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
- }
- }
- if readPos > 0 {
- if err := s.producer.SendReadUpdate(output.UserID, output.RoomID, readPos, 0); err != nil {
- return fmt.Errorf("s.producer.SendReadUpdate: %w", err)
- }
- }
- return nil
-}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 6979eb48..0964ae20 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -21,17 +21,17 @@ import (
"fmt"
"github.com/getsentry/sentry-go"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
- "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/nats-io/nats.go"
- log "github.com/sirupsen/logrus"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
@@ -46,7 +46,6 @@ type OutputRoomEventConsumer struct {
pduStream types.StreamProvider
inviteStream types.StreamProvider
notifier *notifier.Notifier
- producer *producers.UserAPIStreamEventProducer
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -59,7 +58,6 @@ func NewOutputRoomEventConsumer(
pduStream types.StreamProvider,
inviteStream types.StreamProvider,
rsAPI api.SyncRoomserverAPI,
- producer *producers.UserAPIStreamEventProducer,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
@@ -72,7 +70,6 @@ func NewOutputRoomEventConsumer(
pduStream: pduStream,
inviteStream: inviteStream,
rsAPI: rsAPI,
- producer: producer,
}
}
@@ -255,12 +252,6 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return nil
}
- if err = s.producer.SendStreamEvent(ev.RoomID(), ev, pduPos); err != nil {
- log.WithError(err).Errorf("Failed to send stream output event for event %s", ev.EventID())
- sentry.CaptureException(err)
- return err
- }
-
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
sentry.CaptureException(err)
diff --git a/syncapi/consumers/userapi.go b/syncapi/consumers/userapi.go
index 22782352..c9b96f78 100644
--- a/syncapi/consumers/userapi.go
+++ b/syncapi/consumers/userapi.go
@@ -19,6 +19,9 @@ import (
"encoding/json"
"github.com/getsentry/sentry-go"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@@ -26,8 +29,6 @@ import (
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/nats-io/nats.go"
- log "github.com/sirupsen/logrus"
)
// OutputNotificationDataConsumer consumes events that originated in
diff --git a/syncapi/producers/userapi_readupdate.go b/syncapi/producers/userapi_readupdate.go
deleted file mode 100644
index d56cab77..00000000
--- a/syncapi/producers/userapi_readupdate.go
+++ /dev/null
@@ -1,62 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package producers
-
-import (
- "encoding/json"
-
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/syncapi/types"
- "github.com/nats-io/nats.go"
- log "github.com/sirupsen/logrus"
-)
-
-// UserAPIProducer produces events for the user API server to consume
-type UserAPIReadProducer struct {
- Topic string
- JetStream nats.JetStreamContext
-}
-
-// SendData sends account data to the user API server
-func (p *UserAPIReadProducer) SendReadUpdate(userID, roomID string, readPos, fullyReadPos types.StreamPosition) error {
- m := &nats.Msg{
- Subject: p.Topic,
- Header: nats.Header{},
- }
- m.Header.Set(jetstream.UserID, userID)
- m.Header.Set(jetstream.RoomID, roomID)
-
- data := types.ReadUpdate{
- UserID: userID,
- RoomID: roomID,
- Read: readPos,
- FullyRead: fullyReadPos,
- }
- var err error
- m.Data, err = json.Marshal(data)
- if err != nil {
- return err
- }
-
- log.WithFields(log.Fields{
- "user_id": userID,
- "room_id": roomID,
- "read_pos": readPos,
- "fully_read_pos": fullyReadPos,
- }).Tracef("Producing to topic '%s'", p.Topic)
-
- _, err = p.JetStream.PublishMsg(m)
- return err
-}
diff --git a/syncapi/producers/userapi_streamevent.go b/syncapi/producers/userapi_streamevent.go
deleted file mode 100644
index 2bbd19c0..00000000
--- a/syncapi/producers/userapi_streamevent.go
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package producers
-
-import (
- "encoding/json"
-
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/nats-io/nats.go"
- log "github.com/sirupsen/logrus"
-)
-
-// UserAPIProducer produces events for the user API server to consume
-type UserAPIStreamEventProducer struct {
- Topic string
- JetStream nats.JetStreamContext
-}
-
-// SendData sends account data to the user API server
-func (p *UserAPIStreamEventProducer) SendStreamEvent(roomID string, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) error {
- m := &nats.Msg{
- Subject: p.Topic,
- Header: nats.Header{},
- }
- m.Header.Set(jetstream.RoomID, roomID)
-
- data := types.StreamedEvent{
- Event: event,
- StreamPosition: pos,
- }
- var err error
- m.Data, err = json.Marshal(data)
- if err != nil {
- return err
- }
-
- log.WithFields(log.Fields{
- "room_id": roomID,
- "event_id": event.EventID(),
- "event_type": event.Type(),
- "stream_pos": pos,
- }).Tracef("Producing to topic '%s'", p.Topic)
-
- _, err = p.JetStream.PublishMsg(m)
- return err
-}
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 0c8ba4e3..ad3be420 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -29,6 +29,7 @@ import (
type Database interface {
Presence
SharedUsers
+ Notifications
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
@@ -148,12 +149,6 @@ type Database interface {
// GetRoomReceipts gets all receipts for a given roomID
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error)
- // UpsertRoomUnreadNotificationCounts updates the notification statistics about a (user, room) key.
- UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
-
- // GetUserUnreadNotificationCounts returns statistics per room a user is interested in.
- GetUserUnreadNotificationCounts(ctx context.Context, userID string, from, to types.StreamPosition) (map[string]*eventutil.NotificationData, error)
-
SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
@@ -179,3 +174,11 @@ type SharedUsers interface {
// SharedUsers returns a subset of otherUserIDs that share a room with userID.
SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error)
}
+
+type Notifications interface {
+ // UpsertRoomUnreadNotificationCounts updates the notification statistics about a (user, room) key.
+ UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
+
+ // getUserUnreadNotificationCountsForRooms returns the unread notifications for the given rooms
+ GetUserUnreadNotificationCountsForRooms(ctx context.Context, userID string, roomIDs map[string]string) (map[string]*eventutil.NotificationData, error)
+}
diff --git a/syncapi/storage/postgres/notification_data_table.go b/syncapi/storage/postgres/notification_data_table.go
index 708c3a9b..2c7b2480 100644
--- a/syncapi/storage/postgres/notification_data_table.go
+++ b/syncapi/storage/postgres/notification_data_table.go
@@ -18,6 +18,8 @@ import (
"context"
"database/sql"
+ "github.com/lib/pq"
+
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
@@ -33,15 +35,15 @@ func NewPostgresNotificationDataTable(db *sql.DB) (tables.NotificationData, erro
r := &notificationDataStatements{}
return r, sqlutil.StatementList{
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
- {&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
+ {&r.selectUserUnreadCountsForRooms, selectUserUnreadNotificationsForRooms},
{&r.selectMaxID, selectMaxNotificationIDSQL},
}.Prepare(db)
}
type notificationDataStatements struct {
- upsertRoomUnreadCounts *sql.Stmt
- selectUserUnreadCounts *sql.Stmt
- selectMaxID *sql.Stmt
+ upsertRoomUnreadCounts *sql.Stmt
+ selectUserUnreadCountsForRooms *sql.Stmt
+ selectMaxID *sql.Stmt
}
const notificationDataSchema = `
@@ -61,12 +63,10 @@ const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_
DO UPDATE SET id = nextval('syncapi_notification_data_id_seq'), notification_count = $3, highlight_count = $4
RETURNING id`
-const selectUserUnreadNotificationCountsSQL = `SELECT
- id, room_id, notification_count, highlight_count
- FROM syncapi_notification_data
- WHERE
- user_id = $1 AND
- id BETWEEN $2 + 1 AND $3`
+const selectUserUnreadNotificationsForRooms = `SELECT room_id, notification_count, highlight_count
+ FROM syncapi_notification_data
+ WHERE user_id = $1 AND
+ room_id = ANY($2)`
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
@@ -75,20 +75,20 @@ func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context,
return
}
-func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
- rows, err := sqlutil.TxStmt(txn, r.selectUserUnreadCounts).QueryContext(ctx, userID, fromExcl, toIncl)
+func (r *notificationDataStatements) SelectUserUnreadCountsForRooms(
+ ctx context.Context, txn *sql.Tx, userID string, roomIDs []string,
+) (map[string]*eventutil.NotificationData, error) {
+ rows, err := sqlutil.TxStmt(txn, r.selectUserUnreadCountsForRooms).QueryContext(ctx, userID, pq.Array(roomIDs))
if err != nil {
return nil, err
}
- defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed")
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCountsForRooms: rows.close() failed")
roomCounts := map[string]*eventutil.NotificationData{}
+ var roomID string
+ var notificationCount, highlightCount int
for rows.Next() {
- var id types.StreamPosition
- var roomID string
- var notificationCount, highlightCount int
-
- if err = rows.Scan(&id, &roomID, &notificationCount, &highlightCount); err != nil {
+ if err = rows.Scan(&roomID, &notificationCount, &highlightCount); err != nil {
return nil, err
}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 778ad8b1..215bad3a 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -1036,8 +1036,15 @@ func (d *Database) UpsertRoomUnreadNotificationCounts(ctx context.Context, userI
return
}
-func (d *Database) GetUserUnreadNotificationCounts(ctx context.Context, userID string, from, to types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
- return d.NotificationData.SelectUserUnreadCounts(ctx, nil, userID, from, to)
+func (d *Database) GetUserUnreadNotificationCountsForRooms(ctx context.Context, userID string, rooms map[string]string) (map[string]*eventutil.NotificationData, error) {
+ roomIDs := make([]string, 0, len(rooms))
+ for roomID, membership := range rooms {
+ if membership != gomatrixserverlib.Join {
+ continue
+ }
+ roomIDs = append(roomIDs, roomID)
+ }
+ return d.NotificationData.SelectUserUnreadCountsForRooms(ctx, nil, userID, roomIDs)
}
func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
diff --git a/syncapi/storage/sqlite3/notification_data_table.go b/syncapi/storage/sqlite3/notification_data_table.go
index 66d4d438..ceff6055 100644
--- a/syncapi/storage/sqlite3/notification_data_table.go
+++ b/syncapi/storage/sqlite3/notification_data_table.go
@@ -17,6 +17,7 @@ package sqlite3
import (
"context"
"database/sql"
+ "strings"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
@@ -32,19 +33,21 @@ func NewSqliteNotificationDataTable(db *sql.DB, streamID *StreamIDStatements) (t
}
r := &notificationDataStatements{
streamIDStatements: streamID,
+ db: db,
}
return r, sqlutil.StatementList{
{&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
- {&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
{&r.selectMaxID, selectMaxNotificationIDSQL},
+ // {&r.selectUserUnreadCountsForRooms, selectUserUnreadNotificationsForRooms}, // used at runtime
}.Prepare(db)
}
type notificationDataStatements struct {
+ db *sql.DB
streamIDStatements *StreamIDStatements
upsertRoomUnreadCounts *sql.Stmt
- selectUserUnreadCounts *sql.Stmt
selectMaxID *sql.Stmt
+ //selectUserUnreadCountsForRooms *sql.Stmt
}
const notificationDataSchema = `
@@ -63,12 +66,10 @@ const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_
ON CONFLICT (user_id, room_id)
DO UPDATE SET id = $5, notification_count = $6, highlight_count = $7`
-const selectUserUnreadNotificationCountsSQL = `SELECT
- id, room_id, notification_count, highlight_count
- FROM syncapi_notification_data
- WHERE
- user_id = $1 AND
- id BETWEEN $2 + 1 AND $3`
+const selectUserUnreadNotificationsForRooms = `SELECT room_id, notification_count, highlight_count
+ FROM syncapi_notification_data
+ WHERE user_id = $1 AND
+ room_id IN ($2)`
const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
@@ -81,20 +82,26 @@ func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context,
return
}
-func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
- rows, err := sqlutil.TxStmt(txn, r.selectUserUnreadCounts).QueryContext(ctx, userID, fromExcl, toIncl)
+func (r *notificationDataStatements) SelectUserUnreadCountsForRooms(
+ ctx context.Context, txn *sql.Tx, userID string, roomIDs []string,
+) (map[string]*eventutil.NotificationData, error) {
+ params := make([]interface{}, len(roomIDs)+1)
+ params[0] = userID
+ for i := range roomIDs {
+ params[i+1] = roomIDs[i]
+ }
+ sql := strings.Replace(selectUserUnreadNotificationsForRooms, "($1)", sqlutil.QueryVariadic(len(params)), 1)
+ rows, err := r.db.QueryContext(ctx, sql, params)
if err != nil {
return nil, err
}
- defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed")
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCountsForRooms: rows.close() failed")
roomCounts := map[string]*eventutil.NotificationData{}
+ var roomID string
+ var notificationCount, highlightCount int
for rows.Next() {
- var id types.StreamPosition
- var roomID string
- var notificationCount, highlightCount int
-
- if err = rows.Scan(&id, &roomID, &notificationCount, &highlightCount); err != nil {
+ if err = rows.Scan(&roomID, &notificationCount, &highlightCount); err != nil {
return nil, err
}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 193881b4..9a873c2e 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -190,7 +190,7 @@ type Memberships interface {
type NotificationData interface {
UpsertRoomUnreadCounts(ctx context.Context, txn *sql.Tx, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
- SelectUserUnreadCounts(ctx context.Context, txn *sql.Tx, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
+ SelectUserUnreadCountsForRooms(ctx context.Context, txn *sql.Tx, userID string, roomIDs []string) (map[string]*eventutil.NotificationData, error)
SelectMaxID(ctx context.Context, txn *sql.Tx) (int64, error)
}
diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go
index 9c19b846..0297d5c2 100644
--- a/syncapi/streams/stream_accountdata.go
+++ b/syncapi/streams/stream_accountdata.go
@@ -3,9 +3,10 @@ package streams
import (
"context"
+ "github.com/matrix-org/gomatrixserverlib"
+
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
- "github.com/matrix-org/gomatrixserverlib"
)
type AccountDataStreamProvider struct {
diff --git a/syncapi/streams/stream_notificationdata.go b/syncapi/streams/stream_notificationdata.go
index 8ba9e07c..33872734 100644
--- a/syncapi/streams/stream_notificationdata.go
+++ b/syncapi/streams/stream_notificationdata.go
@@ -30,26 +30,29 @@ func (p *NotificationDataStreamProvider) CompleteSync(
func (p *NotificationDataStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
- from, to types.StreamPosition,
+ from, _ types.StreamPosition,
) types.StreamPosition {
- // We want counts for all possible rooms, so always start from zero.
- countsByRoom, err := p.DB.GetUserUnreadNotificationCounts(ctx, req.Device.UserID, from, to)
+ // Get the unread notifications for rooms in our join response.
+ // This is to ensure clients always have an unread notification section
+ // and can display the correct numbers.
+ countsByRoom, err := p.DB.GetUserUnreadNotificationCountsForRooms(ctx, req.Device.UserID, req.Rooms)
if err != nil {
- req.Log.WithError(err).Error("GetUserUnreadNotificationCounts failed")
+ req.Log.WithError(err).Error("GetUserUnreadNotificationCountsForRooms failed")
return from
}
- // We're merely decorating existing rooms. Note that the Join map
- // values are not pointers.
+ // We're merely decorating existing rooms.
for roomID, jr := range req.Response.Rooms.Join {
counts := countsByRoom[roomID]
if counts == nil {
continue
}
-
- jr.UnreadNotifications.HighlightCount = counts.UnreadHighlightCount
- jr.UnreadNotifications.NotificationCount = counts.UnreadNotificationCount
+ jr.UnreadNotifications = &types.UnreadNotifications{
+ HighlightCount: counts.UnreadHighlightCount,
+ NotificationCount: counts.UnreadNotificationCount,
+ }
req.Response.Rooms.Join[roomID] = jr
}
- return to
+
+ return p.LatestPosition(ctx)
}
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 68537bc4..f5d00f36 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -77,16 +77,6 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start presence consumer")
}
- userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
- JetStream: js,
- Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
- }
-
- userAPIReadUpdateProducer := &producers.UserAPIReadProducer{
- JetStream: js,
- Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
- }
-
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
base.ProcessContext, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, rsAPI, syncDB, notifier,
@@ -98,7 +88,7 @@ func AddPublicRoutes(
roomConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
- streams.InviteStreamProvider, rsAPI, userAPIStreamEventProducer,
+ streams.InviteStreamProvider, rsAPI,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
@@ -106,7 +96,6 @@ func AddPublicRoutes(
clientConsumer := consumers.NewOutputClientDataConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider,
- userAPIReadUpdateProducer,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
@@ -135,7 +124,6 @@ func AddPublicRoutes(
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
base.ProcessContext, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider,
- userAPIReadUpdateProducer,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index d75d53ca..3b85db4a 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -398,6 +398,11 @@ func (r *Response) IsEmpty() bool {
len(r.ToDevice.Events) == 0
}
+type UnreadNotifications struct {
+ HighlightCount int `json:"highlight_count"`
+ NotificationCount int `json:"notification_count"`
+}
+
// JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
type JoinResponse struct {
Summary struct {
@@ -419,10 +424,7 @@ type JoinResponse struct {
AccountData struct {
Events []gomatrixserverlib.ClientEvent `json:"events"`
} `json:"account_data"`
- UnreadNotifications struct {
- HighlightCount int `json:"highlight_count"`
- NotificationCount int `json:"notification_count"`
- } `json:"unread_notifications"`
+ *UnreadNotifications `json:"unread_notifications,omitempty"`
}
// NewJoinResponse creates an empty response with initialised arrays.
@@ -503,19 +505,6 @@ type Peek struct {
Deleted bool
}
-type ReadUpdate struct {
- UserID string `json:"user_id"`
- RoomID string `json:"room_id"`
- Read StreamPosition `json:"read,omitempty"`
- FullyRead StreamPosition `json:"fully_read,omitempty"`
-}
-
-// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
-type StreamedEvent struct {
- Event *gomatrixserverlib.HeaderedEvent `json:"event"`
- StreamPosition StreamPosition `json:"stream_position"`
-}
-
// OutputReceiptEvent is an entry in the receipt output kafka log
type OutputReceiptEvent struct {
UserID string `json:"user_id"`