aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/clientapi.go77
-rw-r--r--syncapi/consumers/eduserver_receipts.go71
-rw-r--r--syncapi/consumers/roomserver.go10
-rw-r--r--syncapi/consumers/userapi.go110
-rw-r--r--syncapi/notifier/notifier.go11
-rw-r--r--syncapi/notifier/notifier_test.go2
-rw-r--r--syncapi/producers/userapi_readupdate.go62
-rw-r--r--syncapi/producers/userapi_streamevent.go60
-rw-r--r--syncapi/storage/interface.go8
-rw-r--r--syncapi/storage/postgres/notification_data_table.go108
-rw-r--r--syncapi/storage/postgres/syncserver.go5
-rw-r--r--syncapi/storage/shared/syncserver.go21
-rw-r--r--syncapi/storage/sqlite3/notification_data_table.go108
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go6
-rw-r--r--syncapi/storage/sqlite3/syncserver.go5
-rw-r--r--syncapi/storage/tables/interface.go7
-rw-r--r--syncapi/streams/stream_notificationdata.go55
-rw-r--r--syncapi/streams/streams.go34
-rw-r--r--syncapi/sync/requestpool.go9
-rw-r--r--syncapi/syncapi.go24
-rw-r--r--syncapi/types/types.go61
-rw-r--r--syncapi/types/types_test.go8
22 files changed, 795 insertions, 67 deletions
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index c3650085..f01afce6 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -17,6 +17,7 @@ package consumers
import (
"context"
"encoding/json"
+ "fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal/eventutil"
@@ -24,21 +25,26 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
// OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- topic string
- db storage.Database
- stream types.StreamProvider
- notifier *notifier.Notifier
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ stream types.StreamProvider
+ notifier *notifier.Notifier
+ serverName gomatrixserverlib.ServerName
+ producer *producers.UserAPIReadProducer
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
@@ -49,15 +55,18 @@ func NewOutputClientDataConsumer(
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
+ producer *producers.UserAPIReadProducer,
) *OutputClientDataConsumer {
return &OutputClientDataConsumer{
- ctx: process.Context(),
- jetstream: js,
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
- db: store,
- notifier: notifier,
- stream: stream,
+ ctx: process.Context(),
+ jetstream: js,
+ topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
+ db: store,
+ notifier: notifier,
+ stream: stream,
+ serverName: cfg.Matrix.ServerName,
+ producer: producer,
}
}
@@ -100,8 +109,48 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg)
}).Panicf("could not save account data")
}
+ if err = s.sendReadUpdate(ctx, userID, output); err != nil {
+ log.WithError(err).WithFields(logrus.Fields{
+ "user_id": userID,
+ "room_id": output.RoomID,
+ }).Errorf("Failed to generate read update")
+ sentry.CaptureException(err)
+ return false
+ }
+
s.stream.Advance(streamPos)
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})
return true
}
+
+func (s *OutputClientDataConsumer) sendReadUpdate(ctx context.Context, userID string, output eventutil.AccountData) error {
+ if output.Type != "m.fully_read" || output.ReadMarker == nil {
+ return nil
+ }
+ _, serverName, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
+ }
+ if serverName != s.serverName {
+ return nil
+ }
+ var readPos types.StreamPosition
+ var fullyReadPos types.StreamPosition
+ if output.ReadMarker.Read != "" {
+ if _, readPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.Read); err != nil {
+ return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
+ }
+ }
+ if output.ReadMarker.FullyRead != "" {
+ if _, fullyReadPos, err = s.db.PositionInTopology(ctx, output.ReadMarker.FullyRead); err != nil {
+ return fmt.Errorf("s.db.PositionInTopology (FullyRead): %w", err)
+ }
+ }
+ if readPos > 0 || fullyReadPos > 0 {
+ if err := s.producer.SendReadUpdate(userID, output.RoomID, readPos, fullyReadPos); err != nil {
+ return fmt.Errorf("s.producer.SendReadUpdate: %w", err)
+ }
+ }
+ return nil
+}
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go
index 392840ec..88158344 100644
--- a/syncapi/consumers/eduserver_receipts.go
+++ b/syncapi/consumers/eduserver_receipts.go
@@ -17,6 +17,7 @@ package consumers
import (
"context"
"encoding/json"
+ "fmt"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api"
@@ -24,21 +25,26 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
// OutputReceiptEventConsumer consumes events that originated in the EDU server.
type OutputReceiptEventConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- topic string
- db storage.Database
- stream types.StreamProvider
- notifier *notifier.Notifier
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ stream types.StreamProvider
+ notifier *notifier.Notifier
+ serverName gomatrixserverlib.ServerName
+ producer *producers.UserAPIReadProducer
}
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
@@ -50,15 +56,18 @@ func NewOutputReceiptEventConsumer(
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
+ producer *producers.UserAPIReadProducer,
) *OutputReceiptEventConsumer {
return &OutputReceiptEventConsumer{
- ctx: process.Context(),
- jetstream: js,
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
- db: store,
- notifier: notifier,
- stream: stream,
+ ctx: process.Context(),
+ jetstream: js,
+ topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
+ db: store,
+ notifier: notifier,
+ stream: stream,
+ serverName: cfg.Matrix.ServerName,
+ producer: producer,
}
}
@@ -92,8 +101,42 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Ms
return true
}
+ if err = s.sendReadUpdate(ctx, output); err != nil {
+ log.WithError(err).WithFields(logrus.Fields{
+ "user_id": output.UserID,
+ "room_id": output.RoomID,
+ }).Errorf("Failed to generate read update")
+ sentry.CaptureException(err)
+ return false
+ }
+
s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return true
}
+
+func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output api.OutputReceiptEvent) error {
+ if output.Type != "m.read" {
+ return nil
+ }
+ _, serverName, err := gomatrixserverlib.SplitID('@', output.UserID)
+ if err != nil {
+ return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
+ }
+ if serverName != s.serverName {
+ return nil
+ }
+ var readPos types.StreamPosition
+ if output.EventID != "" {
+ if _, readPos, err = s.db.PositionInTopology(ctx, output.EventID); err != nil {
+ return fmt.Errorf("s.db.PositionInTopology (Read): %w", err)
+ }
+ }
+ if readPos > 0 {
+ if err := s.producer.SendReadUpdate(output.UserID, output.RoomID, readPos, 0); err != nil {
+ return fmt.Errorf("s.producer.SendReadUpdate: %w", err)
+ }
+ }
+ return nil
+}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 15485bb3..159657f9 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@@ -45,6 +46,7 @@ type OutputRoomEventConsumer struct {
pduStream types.StreamProvider
inviteStream types.StreamProvider
notifier *notifier.Notifier
+ producer *producers.UserAPIStreamEventProducer
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -57,6 +59,7 @@ func NewOutputRoomEventConsumer(
pduStream types.StreamProvider,
inviteStream types.StreamProvider,
rsAPI api.RoomserverInternalAPI,
+ producer *producers.UserAPIStreamEventProducer,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
@@ -69,6 +72,7 @@ func NewOutputRoomEventConsumer(
pduStream: pduStream,
inviteStream: inviteStream,
rsAPI: rsAPI,
+ producer: producer,
}
}
@@ -194,6 +198,12 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
return nil
}
+ if err = s.producer.SendStreamEvent(ev.RoomID(), ev, pduPos); err != nil {
+ log.WithError(err).Errorf("Failed to send stream output event for event %s", ev.EventID())
+ sentry.CaptureException(err)
+ return err
+ }
+
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
sentry.CaptureException(err)
diff --git a/syncapi/consumers/userapi.go b/syncapi/consumers/userapi.go
new file mode 100644
index 00000000..a3b2dd53
--- /dev/null
+++ b/syncapi/consumers/userapi.go
@@ -0,0 +1,110 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/getsentry/sentry-go"
+ "github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+// OutputNotificationDataConsumer consumes events that originated in
+// the Push server.
+type OutputNotificationDataConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ notifier *notifier.Notifier
+ stream types.StreamProvider
+}
+
+// NewOutputNotificationDataConsumer creates a new consumer. Call
+// Start() to begin consuming.
+func NewOutputNotificationDataConsumer(
+ process *process.ProcessContext,
+ cfg *config.SyncAPI,
+ js nats.JetStreamContext,
+ store storage.Database,
+ notifier *notifier.Notifier,
+ stream types.StreamProvider,
+) *OutputNotificationDataConsumer {
+ s := &OutputNotificationDataConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.Durable("SyncAPINotificationDataConsumer"),
+ topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData),
+ db: store,
+ notifier: notifier,
+ stream: stream,
+ }
+ return s
+}
+
+// Start starts consumption.
+func (s *OutputNotificationDataConsumer) Start() error {
+ return jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ )
+}
+
+// onMessage is called when the Sync server receives a new event from
+// the push server. It is not safe for this function to be called from
+// multiple goroutines, or else the sync stream position may race and
+// be incorrectly calculated.
+func (s *OutputNotificationDataConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+ userID := string(msg.Header.Get(jetstream.UserID))
+
+ // Parse out the event JSON
+ var data eventutil.NotificationData
+ if err := json.Unmarshal(msg.Data, &data); err != nil {
+ sentry.CaptureException(err)
+ log.WithField("user_id", userID).WithError(err).Error("user API consumer: message parse failure")
+ return true
+ }
+
+ streamPos, err := s.db.UpsertRoomUnreadNotificationCounts(ctx, userID, data.RoomID, data.UnreadNotificationCount, data.UnreadHighlightCount)
+ if err != nil {
+ sentry.CaptureException(err)
+ log.WithFields(log.Fields{
+ "user_id": userID,
+ "room_id": data.RoomID,
+ }).WithError(err).Error("Could not save notification counts")
+ return false
+ }
+
+ s.stream.Advance(streamPos)
+ s.notifier.OnNewNotificationData(userID, types.StreamingToken{NotificationDataPosition: streamPos})
+
+ log.WithFields(log.Fields{
+ "user_id": userID,
+ "room_id": data.RoomID,
+ "streamPos": streamPos,
+ }).Trace("Received notification data from user API")
+
+ return true
+}
diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go
index d853cc0e..6a641e6f 100644
--- a/syncapi/notifier/notifier.go
+++ b/syncapi/notifier/notifier.go
@@ -217,6 +217,17 @@ func (n *Notifier) OnNewInvite(
n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
}
+func (n *Notifier) OnNewNotificationData(
+ userID string,
+ posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers([]string{userID}, nil, n.currPos)
+}
+
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// notify for anything before sincePos
diff --git a/syncapi/notifier/notifier_test.go b/syncapi/notifier/notifier_test.go
index c6d3df7e..60403d5d 100644
--- a/syncapi/notifier/notifier_test.go
+++ b/syncapi/notifier/notifier_test.go
@@ -219,7 +219,7 @@ func TestEDUWakeup(t *testing.T) {
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter))
if err != nil {
- t.Errorf("TestNewInviteEventForUser error: %w", err)
+ t.Errorf("TestNewInviteEventForUser error: %v", err)
}
mustEqualPositions(t, pos, syncPositionNewEDU)
wg.Done()
diff --git a/syncapi/producers/userapi_readupdate.go b/syncapi/producers/userapi_readupdate.go
new file mode 100644
index 00000000..d56cab77
--- /dev/null
+++ b/syncapi/producers/userapi_readupdate.go
@@ -0,0 +1,62 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+// UserAPIProducer produces events for the user API server to consume
+type UserAPIReadProducer struct {
+ Topic string
+ JetStream nats.JetStreamContext
+}
+
+// SendData sends account data to the user API server
+func (p *UserAPIReadProducer) SendReadUpdate(userID, roomID string, readPos, fullyReadPos types.StreamPosition) error {
+ m := &nats.Msg{
+ Subject: p.Topic,
+ Header: nats.Header{},
+ }
+ m.Header.Set(jetstream.UserID, userID)
+ m.Header.Set(jetstream.RoomID, roomID)
+
+ data := types.ReadUpdate{
+ UserID: userID,
+ RoomID: roomID,
+ Read: readPos,
+ FullyRead: fullyReadPos,
+ }
+ var err error
+ m.Data, err = json.Marshal(data)
+ if err != nil {
+ return err
+ }
+
+ log.WithFields(log.Fields{
+ "user_id": userID,
+ "room_id": roomID,
+ "read_pos": readPos,
+ "fully_read_pos": fullyReadPos,
+ }).Tracef("Producing to topic '%s'", p.Topic)
+
+ _, err = p.JetStream.PublishMsg(m)
+ return err
+}
diff --git a/syncapi/producers/userapi_streamevent.go b/syncapi/producers/userapi_streamevent.go
new file mode 100644
index 00000000..2bbd19c0
--- /dev/null
+++ b/syncapi/producers/userapi_streamevent.go
@@ -0,0 +1,60 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+// UserAPIProducer produces events for the user API server to consume
+type UserAPIStreamEventProducer struct {
+ Topic string
+ JetStream nats.JetStreamContext
+}
+
+// SendData sends account data to the user API server
+func (p *UserAPIStreamEventProducer) SendStreamEvent(roomID string, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) error {
+ m := &nats.Msg{
+ Subject: p.Topic,
+ Header: nats.Header{},
+ }
+ m.Header.Set(jetstream.RoomID, roomID)
+
+ data := types.StreamedEvent{
+ Event: event,
+ StreamPosition: pos,
+ }
+ var err error
+ m.Data, err = json.Marshal(data)
+ if err != nil {
+ return err
+ }
+
+ log.WithFields(log.Fields{
+ "room_id": roomID,
+ "event_id": event.EventID(),
+ "event_type": event.Type(),
+ "stream_pos": pos,
+ }).Tracef("Producing to topic '%s'", p.Topic)
+
+ _, err = p.JetStream.PublishMsg(m)
+ return err
+}
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 126bc865..e4476633 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -18,6 +18,7 @@ import (
"context"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -31,6 +32,7 @@ type Database interface {
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error)
+ MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error)
CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
@@ -138,6 +140,12 @@ type Database interface {
// GetRoomReceipts gets all receipts for a given roomID
GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
+ // UpsertRoomUnreadNotificationCounts updates the notification statistics about a (user, room) key.
+ UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
+
+ // GetUserUnreadNotificationCounts returns statistics per room a user is interested in.
+ GetUserUnreadNotificationCounts(ctx context.Context, userID string, from, to types.StreamPosition) (map[string]*eventutil.NotificationData, error)
+
SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error)
SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error)
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
diff --git a/syncapi/storage/postgres/notification_data_table.go b/syncapi/storage/postgres/notification_data_table.go
new file mode 100644
index 00000000..f3fc4451
--- /dev/null
+++ b/syncapi/storage/postgres/notification_data_table.go
@@ -0,0 +1,108 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+func NewPostgresNotificationDataTable(db *sql.DB) (tables.NotificationData, error) {
+ _, err := db.Exec(notificationDataSchema)
+ if err != nil {
+ return nil, err
+ }
+ r := &notificationDataStatements{}
+ return r, sqlutil.StatementList{
+ {&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
+ {&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
+ {&r.selectMaxID, selectMaxNotificationIDSQL},
+ }.Prepare(db)
+}
+
+type notificationDataStatements struct {
+ upsertRoomUnreadCounts *sql.Stmt
+ selectUserUnreadCounts *sql.Stmt
+ selectMaxID *sql.Stmt
+}
+
+const notificationDataSchema = `
+CREATE TABLE IF NOT EXISTS syncapi_notification_data (
+ id BIGSERIAL PRIMARY KEY,
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ notification_count BIGINT NOT NULL DEFAULT 0,
+ highlight_count BIGINT NOT NULL DEFAULT 0,
+ CONSTRAINT syncapi_notification_data_unique UNIQUE (user_id, room_id)
+);`
+
+const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_data
+ (user_id, room_id, notification_count, highlight_count)
+ VALUES ($1, $2, $3, $4)
+ ON CONFLICT (user_id, room_id)
+ DO UPDATE SET notification_count = $3, highlight_count = $4
+ RETURNING id`
+
+const selectUserUnreadNotificationCountsSQL = `SELECT
+ id, room_id, notification_count, highlight_count
+ FROM syncapi_notification_data
+ WHERE
+ user_id = $1 AND
+ id BETWEEN $2 + 1 AND $3`
+
+const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
+
+func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
+ err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
+ return
+}
+
+func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
+ rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed")
+
+ roomCounts := map[string]*eventutil.NotificationData{}
+ for rows.Next() {
+ var id types.StreamPosition
+ var roomID string
+ var notificationCount, highlightCount int
+
+ if err = rows.Scan(&id, &roomID, &notificationCount, &highlightCount); err != nil {
+ return nil, err
+ }
+
+ roomCounts[roomID] = &eventutil.NotificationData{
+ RoomID: roomID,
+ UnreadNotificationCount: notificationCount,
+ UnreadHighlightCount: highlightCount,
+ }
+ }
+ return roomCounts, rows.Err()
+}
+
+func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) {
+ var id int64
+ err := r.selectMaxID.QueryRowContext(ctx).Scan(&id)
+ return id, err
+}
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 6f4e7749..60fe5b54 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -90,6 +90,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil {
return nil, err
}
+ notificationData, err := NewPostgresNotificationDataTable(d.db)
+ if err != nil {
+ return nil, err
+ }
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@@ -110,6 +114,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
SendToDevice: sendToDevice,
Receipts: receipts,
Memberships: memberships,
+ NotificationData: notificationData,
}
return &d, nil
}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 819851b3..87d7c6df 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -48,6 +48,7 @@ type Database struct {
Filter tables.Filter
Receipts tables.Receipts
Memberships tables.Memberships
+ NotificationData tables.NotificationData
}
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
@@ -102,6 +103,14 @@ func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.S
return types.StreamPosition(id), nil
}
+func (d *Database) MaxStreamPositionForNotificationData(ctx context.Context) (types.StreamPosition, error) {
+ id, err := d.NotificationData.SelectMaxID(ctx)
+ if err != nil {
+ return 0, fmt.Errorf("d.NotificationData.SelectMaxID: %w", err)
+ }
+ return types.StreamPosition(id), nil
+}
+
func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) {
return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart, excludeEventIDs)
}
@@ -956,6 +965,18 @@ func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, stream
return receipts, err
}
+func (d *Database) UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
+ err = d.Writer.Do(nil, nil, func(_ *sql.Tx) error {
+ pos, err = d.NotificationData.UpsertRoomUnreadCounts(ctx, userID, roomID, notificationCount, highlightCount)
+ return err
+ })
+ return
+}
+
+func (d *Database) GetUserUnreadNotificationCounts(ctx context.Context, userID string, from, to types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
+ return d.NotificationData.SelectUserUnreadCounts(ctx, userID, from, to)
+}
+
func (s *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
}
diff --git a/syncapi/storage/sqlite3/notification_data_table.go b/syncapi/storage/sqlite3/notification_data_table.go
new file mode 100644
index 00000000..4b3f074d
--- /dev/null
+++ b/syncapi/storage/sqlite3/notification_data_table.go
@@ -0,0 +1,108 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+func NewSqliteNotificationDataTable(db *sql.DB) (tables.NotificationData, error) {
+ _, err := db.Exec(notificationDataSchema)
+ if err != nil {
+ return nil, err
+ }
+ r := &notificationDataStatements{}
+ return r, sqlutil.StatementList{
+ {&r.upsertRoomUnreadCounts, upsertRoomUnreadNotificationCountsSQL},
+ {&r.selectUserUnreadCounts, selectUserUnreadNotificationCountsSQL},
+ {&r.selectMaxID, selectMaxNotificationIDSQL},
+ }.Prepare(db)
+}
+
+type notificationDataStatements struct {
+ upsertRoomUnreadCounts *sql.Stmt
+ selectUserUnreadCounts *sql.Stmt
+ selectMaxID *sql.Stmt
+}
+
+const notificationDataSchema = `
+CREATE TABLE IF NOT EXISTS syncapi_notification_data (
+ id INTEGER PRIMARY KEY,
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ notification_count BIGINT NOT NULL DEFAULT 0,
+ highlight_count BIGINT NOT NULL DEFAULT 0,
+ CONSTRAINT syncapi_notifications_unique UNIQUE (user_id, room_id)
+);`
+
+const upsertRoomUnreadNotificationCountsSQL = `INSERT INTO syncapi_notification_data
+ (user_id, room_id, notification_count, highlight_count)
+ VALUES ($1, $2, $3, $4)
+ ON CONFLICT (user_id, room_id)
+ DO UPDATE SET notification_count = $3, highlight_count = $4
+ RETURNING id`
+
+const selectUserUnreadNotificationCountsSQL = `SELECT
+ id, room_id, notification_count, highlight_count
+ FROM syncapi_notification_data
+ WHERE
+ user_id = $1 AND
+ id BETWEEN $2 + 1 AND $3`
+
+const selectMaxNotificationIDSQL = `SELECT CASE COUNT(*) WHEN 0 THEN 0 ELSE MAX(id) END FROM syncapi_notification_data`
+
+func (r *notificationDataStatements) UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (pos types.StreamPosition, err error) {
+ err = r.upsertRoomUnreadCounts.QueryRowContext(ctx, userID, roomID, notificationCount, highlightCount).Scan(&pos)
+ return
+}
+
+func (r *notificationDataStatements) SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error) {
+ rows, err := r.selectUserUnreadCounts.QueryContext(ctx, userID, fromExcl, toIncl)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectUserUnreadCounts: rows.close() failed")
+
+ roomCounts := map[string]*eventutil.NotificationData{}
+ for rows.Next() {
+ var id types.StreamPosition
+ var roomID string
+ var notificationCount, highlightCount int
+
+ if err = rows.Scan(&id, &roomID, &notificationCount, &highlightCount); err != nil {
+ return nil, err
+ }
+
+ roomCounts[roomID] = &eventutil.NotificationData{
+ RoomID: roomID,
+ UnreadNotificationCount: notificationCount,
+ UnreadHighlightCount: highlightCount,
+ }
+ }
+ return roomCounts, rows.Err()
+}
+
+func (r *notificationDataStatements) SelectMaxID(ctx context.Context) (int64, error) {
+ var id int64
+ err := r.selectMaxID.QueryRowContext(ctx).Scan(&id)
+ return id, err
+}
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index 581ee692..1b256f91 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -62,16 +62,19 @@ const selectEventsSQL = "" +
const selectRecentEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3"
+
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectRecentEventsForSyncSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE"
+
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3"
+
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectMaxEventIDSQL = "" +
@@ -85,6 +88,7 @@ const selectStateInRangeSQL = "" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2)" +
" AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))"
+
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const deleteEventsForRoomSQL = "" +
@@ -95,10 +99,12 @@ const selectContextEventSQL = "" +
const selectContextBeforeEventSQL = "" +
"SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2"
+
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectContextAfterEventSQL = "" +
"SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2"
+
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
type outputRoomEventsStatements struct {
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index 706d43f8..f5ae9fdd 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err != nil {
return err
}
+ notificationData, err := NewSqliteNotificationDataTable(d.db)
+ if err != nil {
+ return err
+ }
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@@ -120,6 +124,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
SendToDevice: sendToDevice,
Receipts: receipts,
Memberships: memberships,
+ NotificationData: notificationData,
}
return nil
}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 1d807ee6..1ebb4265 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -19,6 +19,7 @@ import (
"database/sql"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@@ -171,3 +172,9 @@ type Memberships interface {
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
SelectMembership(ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string) (eventID string, streamPos, topologyPos types.StreamPosition, err error)
}
+
+type NotificationData interface {
+ UpsertRoomUnreadCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
+ SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
+ SelectMaxID(ctx context.Context) (int64, error)
+}
diff --git a/syncapi/streams/stream_notificationdata.go b/syncapi/streams/stream_notificationdata.go
new file mode 100644
index 00000000..8ba9e07c
--- /dev/null
+++ b/syncapi/streams/stream_notificationdata.go
@@ -0,0 +1,55 @@
+package streams
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+type NotificationDataStreamProvider struct {
+ StreamProvider
+}
+
+func (p *NotificationDataStreamProvider) Setup() {
+ p.StreamProvider.Setup()
+
+ id, err := p.DB.MaxStreamPositionForNotificationData(context.Background())
+ if err != nil {
+ panic(err)
+ }
+ p.latest = id
+}
+
+func (p *NotificationDataStreamProvider) CompleteSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+) types.StreamPosition {
+ return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+}
+
+func (p *NotificationDataStreamProvider) IncrementalSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+ from, to types.StreamPosition,
+) types.StreamPosition {
+ // We want counts for all possible rooms, so always start from zero.
+ countsByRoom, err := p.DB.GetUserUnreadNotificationCounts(ctx, req.Device.UserID, from, to)
+ if err != nil {
+ req.Log.WithError(err).Error("GetUserUnreadNotificationCounts failed")
+ return from
+ }
+
+ // We're merely decorating existing rooms. Note that the Join map
+ // values are not pointers.
+ for roomID, jr := range req.Response.Rooms.Join {
+ counts := countsByRoom[roomID]
+ if counts == nil {
+ continue
+ }
+
+ jr.UnreadNotifications.HighlightCount = counts.UnreadHighlightCount
+ jr.UnreadNotifications.NotificationCount = counts.UnreadNotificationCount
+ req.Response.Rooms.Join[roomID] = jr
+ }
+ return to
+}
diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go
index c71095af..17951acb 100644
--- a/syncapi/streams/streams.go
+++ b/syncapi/streams/streams.go
@@ -12,13 +12,14 @@ import (
)
type Streams struct {
- PDUStreamProvider types.StreamProvider
- TypingStreamProvider types.StreamProvider
- ReceiptStreamProvider types.StreamProvider
- InviteStreamProvider types.StreamProvider
- SendToDeviceStreamProvider types.StreamProvider
- AccountDataStreamProvider types.StreamProvider
- DeviceListStreamProvider types.StreamProvider
+ PDUStreamProvider types.StreamProvider
+ TypingStreamProvider types.StreamProvider
+ ReceiptStreamProvider types.StreamProvider
+ InviteStreamProvider types.StreamProvider
+ SendToDeviceStreamProvider types.StreamProvider
+ AccountDataStreamProvider types.StreamProvider
+ DeviceListStreamProvider types.StreamProvider
+ NotificationDataStreamProvider types.StreamProvider
}
func NewSyncStreamProviders(
@@ -47,6 +48,9 @@ func NewSyncStreamProviders(
StreamProvider: StreamProvider{DB: d},
userAPI: userAPI,
},
+ NotificationDataStreamProvider: &NotificationDataStreamProvider{
+ StreamProvider: StreamProvider{DB: d},
+ },
DeviceListStreamProvider: &DeviceListStreamProvider{
StreamProvider: StreamProvider{DB: d},
rsAPI: rsAPI,
@@ -60,6 +64,7 @@ func NewSyncStreamProviders(
streams.InviteStreamProvider.Setup()
streams.SendToDeviceStreamProvider.Setup()
streams.AccountDataStreamProvider.Setup()
+ streams.NotificationDataStreamProvider.Setup()
streams.DeviceListStreamProvider.Setup()
return streams
@@ -67,12 +72,13 @@ func NewSyncStreamProviders(
func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
return types.StreamingToken{
- PDUPosition: s.PDUStreamProvider.LatestPosition(ctx),
- TypingPosition: s.TypingStreamProvider.LatestPosition(ctx),
- ReceiptPosition: s.ReceiptStreamProvider.LatestPosition(ctx),
- InvitePosition: s.InviteStreamProvider.LatestPosition(ctx),
- SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx),
- AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
- DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
+ PDUPosition: s.PDUStreamProvider.LatestPosition(ctx),
+ TypingPosition: s.TypingStreamProvider.LatestPosition(ctx),
+ ReceiptPosition: s.ReceiptStreamProvider.LatestPosition(ctx),
+ InvitePosition: s.InviteStreamProvider.LatestPosition(ctx),
+ SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx),
+ AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
+ NotificationDataPosition: s.NotificationDataStreamProvider.LatestPosition(ctx),
+ DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
}
}
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index ca35951a..2c9920d1 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -189,7 +189,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
}
} else {
- syncReq.Log.Debugln("Responding to sync immediately")
+ syncReq.Log.WithField("currentPos", currentPos).Debugln("Responding to sync immediately")
}
if syncReq.Since.IsEmpty() {
@@ -213,6 +213,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
+ NotificationDataPosition: rp.streams.NotificationDataStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
@@ -244,6 +247,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
syncReq.Context, syncReq,
syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
),
+ NotificationDataPosition: rp.streams.NotificationDataStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.NotificationDataPosition, currentPos.NotificationDataPosition,
+ ),
DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 72462459..cb9890ff 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/consumers"
"github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/producers"
"github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
@@ -64,6 +65,18 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
+ userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
+ JetStream: js,
+ Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent),
+ }
+
+ userAPIReadUpdateProducer := &producers.UserAPIReadProducer{
+ JetStream: js,
+ Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate),
+ }
+
+ _ = userAPIReadUpdateProducer
+
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier,
@@ -75,7 +88,7 @@ func AddPublicRoutes(
roomConsumer := consumers.NewOutputRoomEventConsumer(
process, cfg, js, syncDB, notifier, streams.PDUStreamProvider,
- streams.InviteStreamProvider, rsAPI,
+ streams.InviteStreamProvider, rsAPI, userAPIStreamEventProducer,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
@@ -83,11 +96,19 @@ func AddPublicRoutes(
clientConsumer := consumers.NewOutputClientDataConsumer(
process, cfg, js, syncDB, notifier, streams.AccountDataStreamProvider,
+ userAPIReadUpdateProducer,
)
if err = clientConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start client data consumer")
}
+ notificationConsumer := consumers.NewOutputNotificationDataConsumer(
+ process, cfg, js, syncDB, notifier, streams.NotificationDataStreamProvider,
+ )
+ if err = notificationConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start notification data consumer")
+ }
+
typingConsumer := consumers.NewOutputTypingEventConsumer(
process, cfg, js, syncDB, eduCache, notifier, streams.TypingStreamProvider,
)
@@ -104,6 +125,7 @@ func AddPublicRoutes(
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
process, cfg, js, syncDB, notifier, streams.ReceiptStreamProvider,
+ userAPIReadUpdateProducer,
)
if err = receiptConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start receipts consumer")
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index c2e8ed01..4150e6c9 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -95,13 +95,14 @@ const (
)
type StreamingToken struct {
- PDUPosition StreamPosition
- TypingPosition StreamPosition
- ReceiptPosition StreamPosition
- SendToDevicePosition StreamPosition
- InvitePosition StreamPosition
- AccountDataPosition StreamPosition
- DeviceListPosition StreamPosition
+ PDUPosition StreamPosition
+ TypingPosition StreamPosition
+ ReceiptPosition StreamPosition
+ SendToDevicePosition StreamPosition
+ InvitePosition StreamPosition
+ AccountDataPosition StreamPosition
+ DeviceListPosition StreamPosition
+ NotificationDataPosition StreamPosition
}
// This will be used as a fallback by json.Marshal.
@@ -117,10 +118,11 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
func (t StreamingToken) String() string {
posStr := fmt.Sprintf(
- "s%d_%d_%d_%d_%d_%d_%d",
+ "s%d_%d_%d_%d_%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition,
- t.InvitePosition, t.AccountDataPosition, t.DeviceListPosition,
+ t.InvitePosition, t.AccountDataPosition,
+ t.DeviceListPosition, t.NotificationDataPosition,
)
return posStr
}
@@ -142,12 +144,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true
case t.DeviceListPosition > other.DeviceListPosition:
return true
+ case t.NotificationDataPosition > other.NotificationDataPosition:
+ return true
}
return false
}
func (t *StreamingToken) IsEmpty() bool {
- return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition == 0
+ return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition == 0
}
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@@ -185,6 +189,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
if other.DeviceListPosition > t.DeviceListPosition {
t.DeviceListPosition = other.DeviceListPosition
}
+ if other.NotificationDataPosition > t.NotificationDataPosition {
+ t.NotificationDataPosition = other.NotificationDataPosition
+ }
}
type TopologyToken struct {
@@ -277,7 +284,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
// s478_0_0_0_0_13.dl-0-2 but we have now removed partitioned stream positions
tok = strings.Split(tok, ".")[0]
parts := strings.Split(tok[1:], "_")
- var positions [7]StreamPosition
+ var positions [8]StreamPosition
for i, p := range parts {
if i >= len(positions) {
break
@@ -291,13 +298,14 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
positions[i] = StreamPosition(pos)
}
token = StreamingToken{
- PDUPosition: positions[0],
- TypingPosition: positions[1],
- ReceiptPosition: positions[2],
- SendToDevicePosition: positions[3],
- InvitePosition: positions[4],
- AccountDataPosition: positions[5],
- DeviceListPosition: positions[6],
+ PDUPosition: positions[0],
+ TypingPosition: positions[1],
+ ReceiptPosition: positions[2],
+ SendToDevicePosition: positions[3],
+ InvitePosition: positions[4],
+ AccountDataPosition: positions[5],
+ DeviceListPosition: positions[6],
+ NotificationDataPosition: positions[7],
}
return token, nil
}
@@ -383,6 +391,10 @@ type JoinResponse struct {
AccountData struct {
Events []gomatrixserverlib.ClientEvent `json:"events"`
} `json:"account_data"`
+ UnreadNotifications struct {
+ HighlightCount int `json:"highlight_count"`
+ NotificationCount int `json:"notification_count"`
+ } `json:"unread_notifications"`
}
// NewJoinResponse creates an empty response with initialised arrays.
@@ -462,3 +474,16 @@ type Peek struct {
New bool
Deleted bool
}
+
+type ReadUpdate struct {
+ UserID string `json:"user_id"`
+ RoomID string `json:"room_id"`
+ Read StreamPosition `json:"read,omitempty"`
+ FullyRead StreamPosition `json:"fully_read,omitempty"`
+}
+
+// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
+type StreamedEvent struct {
+ Event *gomatrixserverlib.HeaderedEvent `json:"event"`
+ StreamPosition StreamPosition `json:"stream_position"`
+}
diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go
index cda178b3..ff78bfb9 100644
--- a/syncapi/types/types_test.go
+++ b/syncapi/types/types_test.go
@@ -9,10 +9,10 @@ import (
func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{
- "s4_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0}.String(),
- "s3_1_0_0_0_0_2": StreamingToken{3, 1, 0, 0, 0, 0, 2}.String(),
- "s3_1_2_3_5_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0}.String(),
- "t3_1": TopologyToken{3, 1}.String(),
+ "s4_0_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0}.String(),
+ "s3_1_0_0_0_0_2_0": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0}.String(),
+ "s3_1_2_3_5_0_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0}.String(),
+ "t3_1": TopologyToken{3, 1}.String(),
}
for a, b := range shouldPass {