aboutsummaryrefslogtreecommitdiff
path: root/userapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-09-27 15:01:34 +0200
committerGitHub <noreply@github.com>2022-09-27 15:01:34 +0200
commit249b32c4f3ee2e01e6f89435e0c7a5786d2ae3a1 (patch)
tree1fc229f6a4bafa88afd7c1db3eda3ff3a59b021e /userapi
parentf18bce93cc3e7e5f57ebc55d309360b7f8703553 (diff)
Refactor notifications (#2688)
This PR changes the handling of notifications - removes the `StreamEvent` and `ReadUpdate` stream - listens on the `OutputRoomEvent` stream in the UserAPI to inform the SyncAPI about unread notifications - listens on the `OutputReceiptEvent` stream in the UserAPI to set receipts/update notifications - sets the `read_markers` directly from within the internal UserAPI Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'userapi')
-rw-r--r--userapi/consumers/clientapi.go127
-rw-r--r--userapi/consumers/roomserver.go (renamed from userapi/consumers/syncapi_streamevent.go)76
-rw-r--r--userapi/consumers/roomserver_test.go (renamed from userapi/consumers/syncapi_streamevent_test.go)2
-rw-r--r--userapi/consumers/syncapi_readupdate.go137
-rw-r--r--userapi/internal/api.go46
-rw-r--r--userapi/producers/syncapi.go7
-rw-r--r--userapi/storage/interface.go6
-rw-r--r--userapi/storage/postgres/notifications_table.go51
-rw-r--r--userapi/storage/postgres/pusher_table.go5
-rw-r--r--userapi/storage/shared/storage.go6
-rw-r--r--userapi/storage/sqlite3/notifications_table.go51
-rw-r--r--userapi/storage/sqlite3/pusher_table.go5
-rw-r--r--userapi/storage/storage_test.go11
-rw-r--r--userapi/storage/tables/interface.go6
-rw-r--r--userapi/userapi.go11
15 files changed, 271 insertions, 276 deletions
diff --git a/userapi/consumers/clientapi.go b/userapi/consumers/clientapi.go
new file mode 100644
index 00000000..c220d35c
--- /dev/null
+++ b/userapi/consumers/clientapi.go
@@ -0,0 +1,127 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+
+ "github.com/matrix-org/dendrite/internal/pushgateway"
+ "github.com/matrix-org/dendrite/userapi/storage"
+
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/userapi/producers"
+ "github.com/matrix-org/dendrite/userapi/util"
+)
+
+// OutputReceiptEventConsumer consumes events that originated in the clientAPI.
+type OutputReceiptEventConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ serverName gomatrixserverlib.ServerName
+ syncProducer *producers.SyncAPI
+ pgClient pushgateway.Client
+}
+
+// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
+// Call Start() to begin consuming from the EDU server.
+func NewOutputReceiptEventConsumer(
+ process *process.ProcessContext,
+ cfg *config.UserAPI,
+ js nats.JetStreamContext,
+ store storage.Database,
+ syncProducer *producers.SyncAPI,
+ pgClient pushgateway.Client,
+) *OutputReceiptEventConsumer {
+ return &OutputReceiptEventConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+ durable: cfg.Matrix.JetStream.Durable("UserAPIReceiptConsumer"),
+ db: store,
+ serverName: cfg.Matrix.ServerName,
+ syncProducer: syncProducer,
+ pgClient: pgClient,
+ }
+}
+
+// Start consuming receipts events.
+func (s *OutputReceiptEventConsumer) Start() error {
+ return jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic, s.durable, 1,
+ s.onMessage, nats.DeliverAll(), nats.ManualAck(),
+ )
+}
+
+func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
+
+ userID := msg.Header.Get(jetstream.UserID)
+ roomID := msg.Header.Get(jetstream.RoomID)
+ readPos := msg.Header.Get(jetstream.EventID)
+ evType := msg.Header.Get("type")
+
+ if readPos == "" || evType != "m.read" {
+ return true
+ }
+
+ log := log.WithFields(log.Fields{
+ "room_id": roomID,
+ "user_id": userID,
+ })
+
+ localpart, domain, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ log.WithError(err).Error("userapi clientapi consumer: SplitID failure")
+ return true
+ }
+ if domain != s.serverName {
+ return true
+ }
+
+ metadata, err := msg.Metadata()
+ if err != nil {
+ return false
+ }
+
+ updated, err := s.db.SetNotificationsRead(ctx, localpart, roomID, uint64(gomatrixserverlib.AsTimestamp(metadata.Timestamp)), true)
+ if err != nil {
+ log.WithError(err).Error("userapi EDU consumer")
+ return false
+ }
+
+ if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, roomID); err != nil {
+ log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed")
+ return false
+ }
+
+ if !updated {
+ return true
+ }
+ if err = util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
+ log.WithError(err).Error("userapi EDU consumer: NotifyUserCounts failed")
+ return false
+ }
+
+ return true
+}
diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/roomserver.go
index f3b2bf27..952de98f 100644
--- a/userapi/consumers/syncapi_streamevent.go
+++ b/userapi/consumers/roomserver.go
@@ -26,7 +26,7 @@ import (
"github.com/matrix-org/dendrite/userapi/util"
)
-type OutputStreamEventConsumer struct {
+type OutputRoomEventConsumer struct {
ctx context.Context
cfg *config.UserAPI
rsAPI rsapi.UserRoomserverAPI
@@ -38,7 +38,7 @@ type OutputStreamEventConsumer struct {
syncProducer *producers.SyncAPI
}
-func NewOutputStreamEventConsumer(
+func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.UserAPI,
js nats.JetStreamContext,
@@ -46,21 +46,21 @@ func NewOutputStreamEventConsumer(
pgClient pushgateway.Client,
rsAPI rsapi.UserRoomserverAPI,
syncProducer *producers.SyncAPI,
-) *OutputStreamEventConsumer {
- return &OutputStreamEventConsumer{
+) *OutputRoomEventConsumer {
+ return &OutputRoomEventConsumer{
ctx: process.Context(),
cfg: cfg,
jetstream: js,
db: store,
- durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
+ durable: cfg.Matrix.JetStream.Durable("UserAPIRoomServerConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
pgClient: pgClient,
rsAPI: rsAPI,
syncProducer: syncProducer,
}
}
-func (s *OutputStreamEventConsumer) Start() error {
+func (s *OutputRoomEventConsumer) Start() error {
if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, 1,
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
@@ -70,35 +70,43 @@ func (s *OutputStreamEventConsumer) Start() error {
return nil
}
-func (s *OutputStreamEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
- var output types.StreamedEvent
- output.Event = &gomatrixserverlib.HeaderedEvent{}
+ var output rsapi.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
- log.WithError(err).Errorf("userapi consumer: message parse failure")
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("roomserver output log: message parse failure")
return true
}
- if output.Event.Event == nil {
+ if output.Type != rsapi.OutputTypeNewRoomEvent {
+ return true
+ }
+ event := output.NewRoomEvent.Event
+ if event == nil {
log.Errorf("userapi consumer: expected event")
return true
}
log.WithFields(log.Fields{
- "event_id": output.Event.EventID(),
- "event_type": output.Event.Type(),
- "stream_pos": output.StreamPosition,
- }).Tracef("Received message from sync API: %#v", output)
+ "event_id": event.EventID(),
+ "event_type": event.Type(),
+ }).Tracef("Received message from roomserver: %#v", output)
+
+ metadata, err := msg.Metadata()
+ if err != nil {
+ return true
+ }
- if err := s.processMessage(ctx, output.Event, int64(output.StreamPosition)); err != nil {
+ if err := s.processMessage(ctx, event, uint64(gomatrixserverlib.AsTimestamp(metadata.Timestamp))); err != nil {
log.WithFields(log.Fields{
- "event_id": output.Event.EventID(),
+ "event_id": event.EventID(),
}).WithError(err).Errorf("userapi consumer: process room event failure")
}
return true
}
-func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos int64) error {
+func (s *OutputRoomEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, streamPos uint64) error {
members, roomSize, err := s.localRoomMembers(ctx, event.RoomID())
if err != nil {
return fmt.Errorf("s.localRoomMembers: %w", err)
@@ -138,10 +146,10 @@ func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *g
// removing it means we can send all notifications to
// e.g. Element's Push gateway in one go.
for _, mem := range members {
- if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil {
+ if err := s.notifyLocal(ctx, event, mem, roomSize, roomName, streamPos); err != nil {
log.WithFields(log.Fields{
"localpart": mem.Localpart,
- }).WithError(err).Debugf("Unable to push to local user")
+ }).WithError(err).Error("Unable to push to local user")
continue
}
}
@@ -179,7 +187,7 @@ func newLocalMembership(event *gomatrixserverlib.ClientEvent) (*localMembership,
// localRoomMembers fetches the current local members of a room, and
// the total number of members.
-func (s *OutputStreamEventConsumer) localRoomMembers(ctx context.Context, roomID string) ([]*localMembership, int, error) {
+func (s *OutputRoomEventConsumer) localRoomMembers(ctx context.Context, roomID string) ([]*localMembership, int, error) {
req := &rsapi.QueryMembershipsForRoomRequest{
RoomID: roomID,
JoinedOnly: true,
@@ -219,7 +227,7 @@ func (s *OutputStreamEventConsumer) localRoomMembers(ctx context.Context, roomID
// looks it up in roomserver. If there is no name,
// m.room.canonical_alias is consulted. Returns an empty string if the
// room has no name.
-func (s *OutputStreamEventConsumer) roomName(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) (string, error) {
+func (s *OutputRoomEventConsumer) roomName(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) (string, error) {
if event.Type() == gomatrixserverlib.MRoomName {
name, err := unmarshalRoomName(event)
if err != nil {
@@ -287,7 +295,7 @@ func unmarshalCanonicalAlias(event *gomatrixserverlib.HeaderedEvent) (string, er
}
// notifyLocal finds the right push actions for a local user, given an event.
-func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos int64, mem *localMembership, roomSize int, roomName string) error {
+func (s *OutputRoomEventConsumer) notifyLocal(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int, roomName string, streamPos uint64) error {
actions, err := s.evaluatePushRules(ctx, event, mem, roomSize)
if err != nil {
return err
@@ -302,7 +310,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
"event_id": event.EventID(),
"room_id": event.RoomID(),
"localpart": mem.Localpart,
- }).Debugf("Push rule evaluation rejected the event")
+ }).Tracef("Push rule evaluation rejected the event")
return nil
}
@@ -325,7 +333,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
RoomID: event.RoomID(),
TS: gomatrixserverlib.AsTimestamp(time.Now()),
}
- if err = s.db.InsertNotification(ctx, mem.Localpart, event.EventID(), pos, tweaks, n); err != nil {
+ if err = s.db.InsertNotification(ctx, mem.Localpart, event.EventID(), streamPos, tweaks, n); err != nil {
return err
}
@@ -345,7 +353,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
"localpart": mem.Localpart,
"num_urls": len(devicesByURLAndFormat),
"num_unread": userNumUnreadNotifs,
- }).Debugf("Notifying single member")
+ }).Trace("Notifying single member")
// Push gateways are out of our control, and we cannot risk
// looking up the server on a misbehaving push gateway. Each user
@@ -396,7 +404,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
// evaluatePushRules fetches and evaluates the push rules of a local
// user. Returns actions (including dont_notify).
-func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) {
+func (s *OutputRoomEventConsumer) evaluatePushRules(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) {
if event.Sender() == mem.UserID {
// SPEC: Homeservers MUST NOT notify the Push Gateway for
// events that the user has sent themselves.
@@ -447,7 +455,7 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event
"room_id": event.RoomID(),
"localpart": mem.Localpart,
"rule_id": rule.RuleID,
- }).Tracef("Matched a push rule")
+ }).Trace("Matched a push rule")
return rule.Actions, nil
}
@@ -491,7 +499,7 @@ func (rse *ruleSetEvalContext) HasPowerLevel(userID, levelKey string) (bool, err
// localPushDevices pushes to the configured devices of a local
// user. The map keys are [url][format].
-func (s *OutputStreamEventConsumer) localPushDevices(ctx context.Context, localpart string, tweaks map[string]interface{}) (map[string]map[string][]*pushgateway.Device, string, error) {
+func (s *OutputRoomEventConsumer) localPushDevices(ctx context.Context, localpart string, tweaks map[string]interface{}) (map[string]map[string][]*pushgateway.Device, string, error) {
pusherDevices, err := util.GetPushDevices(ctx, localpart, tweaks, s.db)
if err != nil {
return nil, "", err
@@ -515,7 +523,7 @@ func (s *OutputStreamEventConsumer) localPushDevices(ctx context.Context, localp
}
// notifyHTTP performs a notificatation to a Push Gateway.
-func (s *OutputStreamEventConsumer) notifyHTTP(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, url, format string, devices []*pushgateway.Device, localpart, roomName string, userNumUnreadNotifs int) ([]*pushgateway.Device, error) {
+func (s *OutputRoomEventConsumer) notifyHTTP(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, url, format string, devices []*pushgateway.Device, localpart, roomName string, userNumUnreadNotifs int) ([]*pushgateway.Device, error) {
logger := log.WithFields(log.Fields{
"event_id": event.EventID(),
"url": url,
@@ -561,13 +569,13 @@ func (s *OutputStreamEventConsumer) notifyHTTP(ctx context.Context, event *gomat
}
}
- logger.Debugf("Notifying push gateway %s", url)
+ logger.Tracef("Notifying push gateway %s", url)
var res pushgateway.NotifyResponse
if err := s.pgClient.Notify(ctx, url, &req, &res); err != nil {
logger.WithError(err).Errorf("Failed to notify push gateway %s", url)
return nil, err
}
- logger.WithField("num_rejected", len(res.Rejected)).Tracef("Push gateway result")
+ logger.WithField("num_rejected", len(res.Rejected)).Trace("Push gateway result")
if len(res.Rejected) == 0 {
return nil, nil
@@ -589,7 +597,7 @@ func (s *OutputStreamEventConsumer) notifyHTTP(ctx context.Context, event *gomat
}
// deleteRejectedPushers deletes the pushers associated with the given devices.
-func (s *OutputStreamEventConsumer) deleteRejectedPushers(ctx context.Context, devices []*pushgateway.Device, localpart string) {
+func (s *OutputRoomEventConsumer) deleteRejectedPushers(ctx context.Context, devices []*pushgateway.Device, localpart string) {
log.WithFields(log.Fields{
"localpart": localpart,
"app_id0": devices[0].AppID,
diff --git a/userapi/consumers/syncapi_streamevent_test.go b/userapi/consumers/roomserver_test.go
index 48ea0fe1..3bbeb439 100644
--- a/userapi/consumers/syncapi_streamevent_test.go
+++ b/userapi/consumers/roomserver_test.go
@@ -40,7 +40,7 @@ func Test_evaluatePushRules(t *testing.T) {
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
db, close := mustCreateDatabase(t, dbType)
defer close()
- consumer := OutputStreamEventConsumer{db: db}
+ consumer := OutputRoomEventConsumer{db: db}
testCases := []struct {
name string
diff --git a/userapi/consumers/syncapi_readupdate.go b/userapi/consumers/syncapi_readupdate.go
deleted file mode 100644
index 54654f75..00000000
--- a/userapi/consumers/syncapi_readupdate.go
+++ /dev/null
@@ -1,137 +0,0 @@
-package consumers
-
-import (
- "context"
- "encoding/json"
-
- "github.com/matrix-org/dendrite/internal/pushgateway"
- "github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/setup/process"
- "github.com/matrix-org/dendrite/syncapi/types"
- uapi "github.com/matrix-org/dendrite/userapi/api"
- "github.com/matrix-org/dendrite/userapi/producers"
- "github.com/matrix-org/dendrite/userapi/storage"
- "github.com/matrix-org/dendrite/userapi/util"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/nats-io/nats.go"
- log "github.com/sirupsen/logrus"
-)
-
-type OutputReadUpdateConsumer struct {
- ctx context.Context
- cfg *config.UserAPI
- jetstream nats.JetStreamContext
- durable string
- db storage.Database
- pgClient pushgateway.Client
- ServerName gomatrixserverlib.ServerName
- topic string
- userAPI uapi.UserInternalAPI
- syncProducer *producers.SyncAPI
-}
-
-func NewOutputReadUpdateConsumer(
- process *process.ProcessContext,
- cfg *config.UserAPI,
- js nats.JetStreamContext,
- store storage.Database,
- pgClient pushgateway.Client,
- userAPI uapi.UserInternalAPI,
- syncProducer *producers.SyncAPI,
-) *OutputReadUpdateConsumer {
- return &OutputReadUpdateConsumer{
- ctx: process.Context(),
- cfg: cfg,
- jetstream: js,
- db: store,
- ServerName: cfg.Matrix.ServerName,
- durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"),
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
- pgClient: pgClient,
- userAPI: userAPI,
- syncProducer: syncProducer,
- }
-}
-
-func (s *OutputReadUpdateConsumer) Start() error {
- if err := jetstream.JetStreamConsumer(
- s.ctx, s.jetstream, s.topic, s.durable, 1,
- s.onMessage, nats.DeliverAll(), nats.ManualAck(),
- ); err != nil {
- return err
- }
- return nil
-}
-
-func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
- msg := msgs[0] // Guaranteed to exist if onMessage is called
- var read types.ReadUpdate
- if err := json.Unmarshal(msg.Data, &read); err != nil {
- log.WithError(err).Error("userapi clientapi consumer: message parse failure")
- return true
- }
- if read.FullyRead == 0 && read.Read == 0 {
- return true
- }
-
- userID := string(msg.Header.Get(jetstream.UserID))
- roomID := string(msg.Header.Get(jetstream.RoomID))
-
- localpart, domain, err := gomatrixserverlib.SplitID('@', userID)
- if err != nil {
- log.WithError(err).Error("userapi clientapi consumer: SplitID failure")
- return true
- }
- if domain != s.ServerName {
- log.Error("userapi clientapi consumer: not a local user")
- return true
- }
-
- log := log.WithFields(log.Fields{
- "room_id": roomID,
- "user_id": userID,
- })
- log.Tracef("Received read update from sync API: %#v", read)
-
- if read.Read > 0 {
- updated, err := s.db.SetNotificationsRead(ctx, localpart, roomID, int64(read.Read), true)
- if err != nil {
- log.WithError(err).Error("userapi EDU consumer")
- return false
- }
-
- if updated {
- if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, roomID); err != nil {
- log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed")
- return false
- }
- if err = util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
- log.WithError(err).Error("userapi EDU consumer: NotifyUserCounts failed")
- return false
- }
- }
- }
-
- if read.FullyRead > 0 {
- deleted, err := s.db.DeleteNotificationsUpTo(ctx, localpart, roomID, int64(read.FullyRead))
- if err != nil {
- log.WithError(err).Errorf("userapi clientapi consumer: DeleteNotificationsUpTo failed")
- return false
- }
-
- if deleted {
- if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
- log.WithError(err).Error("userapi clientapi consumer: NotifyUserCounts failed")
- return false
- }
-
- if err := s.syncProducer.GetAndSendNotificationData(ctx, userID, read.RoomID); err != nil {
- log.WithError(err).Errorf("userapi clientapi consumer: GetAndSendNotificationData failed")
- return false
- }
- }
- }
-
- return true
-}
diff --git a/userapi/internal/api.go b/userapi/internal/api.go
index dcbb7361..3e761a88 100644
--- a/userapi/internal/api.go
+++ b/userapi/internal/api.go
@@ -30,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
@@ -39,6 +40,7 @@ import (
"github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/storage/tables"
+ userapiUtil "github.com/matrix-org/dendrite/userapi/util"
)
type UserInternalAPI struct {
@@ -51,6 +53,7 @@ type UserInternalAPI struct {
AppServices []config.ApplicationService
KeyAPI keyapi.UserKeyAPI
RSAPI rsapi.UserRoomserverAPI
+ PgClient pushgateway.Client
}
func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error {
@@ -73,6 +76,11 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc
ignoredUsers = &synctypes.IgnoredUsers{}
_ = json.Unmarshal(req.AccountData, ignoredUsers)
}
+ if req.DataType == "m.fully_read" {
+ if err := a.setFullyRead(ctx, req); err != nil {
+ return err
+ }
+ }
if err := a.SyncProducer.SendAccountData(req.UserID, eventutil.AccountData{
RoomID: req.RoomID,
Type: req.DataType,
@@ -84,6 +92,44 @@ func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAc
return nil
}
+func (a *UserInternalAPI) setFullyRead(ctx context.Context, req *api.InputAccountDataRequest) error {
+ var output eventutil.ReadMarkerJSON
+
+ if err := json.Unmarshal(req.AccountData, &output); err != nil {
+ return err
+ }
+ localpart, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
+ if err != nil {
+ logrus.WithError(err).Error("UserInternalAPI.setFullyRead: SplitID failure")
+ return nil
+ }
+ if domain != a.ServerName {
+ return nil
+ }
+
+ deleted, err := a.DB.DeleteNotificationsUpTo(ctx, localpart, req.RoomID, uint64(gomatrixserverlib.AsTimestamp(time.Now())))
+ if err != nil {
+ logrus.WithError(err).Errorf("UserInternalAPI.setFullyRead: DeleteNotificationsUpTo failed")
+ return err
+ }
+
+ if err = a.SyncProducer.GetAndSendNotificationData(ctx, req.UserID, req.RoomID); err != nil {
+ logrus.WithError(err).Error("UserInternalAPI.setFullyRead: GetAndSendNotificationData failed")
+ return err
+ }
+
+ // nothing changed, no need to notify the push gateway
+ if !deleted {
+ return nil
+ }
+
+ if err = userapiUtil.NotifyUserCountsAsync(ctx, a.PgClient, localpart, a.DB); err != nil {
+ logrus.WithError(err).Error("UserInternalAPI.setFullyRead: NotifyUserCounts failed")
+ return err
+ }
+ return nil
+}
+
func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.PerformAccountCreationRequest, res *api.PerformAccountCreationResponse) error {
acc, err := a.DB.CreateAccount(ctx, req.Localpart, req.Password, req.AppServiceID, req.AccountType)
if err != nil {
diff --git a/userapi/producers/syncapi.go b/userapi/producers/syncapi.go
index 27cfc284..f556ea35 100644
--- a/userapi/producers/syncapi.go
+++ b/userapi/producers/syncapi.go
@@ -4,12 +4,13 @@ import (
"context"
"encoding/json"
- "github.com/matrix-org/dendrite/internal/eventutil"
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
+
+ "github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/userapi/storage"
)
type JetStreamPublisher interface {
diff --git a/userapi/storage/interface.go b/userapi/storage/interface.go
index fbac463e..02efe7af 100644
--- a/userapi/storage/interface.go
+++ b/userapi/storage/interface.go
@@ -119,9 +119,9 @@ type ThreePID interface {
}
type Notification interface {
- InsertNotification(ctx context.Context, localpart, eventID string, pos int64, tweaks map[string]interface{}, n *api.Notification) error
- DeleteNotificationsUpTo(ctx context.Context, localpart, roomID string, pos int64) (affected bool, err error)
- SetNotificationsRead(ctx context.Context, localpart, roomID string, pos int64, read bool) (affected bool, err error)
+ InsertNotification(ctx context.Context, localpart, eventID string, pos uint64, tweaks map[string]interface{}, n *api.Notification) error
+ DeleteNotificationsUpTo(ctx context.Context, localpart, roomID string, pos uint64) (affected bool, err error)
+ SetNotificationsRead(ctx context.Context, localpart, roomID string, pos uint64, read bool) (affected bool, err error)
GetNotifications(ctx context.Context, localpart string, fromID int64, limit int, filter tables.NotificationFilter) ([]*api.Notification, int64, error)
GetNotificationCount(ctx context.Context, localpart string, filter tables.NotificationFilter) (int64, error)
GetRoomNotificationCounts(ctx context.Context, localpart, roomID string) (total int64, highlight int64, _ error)
diff --git a/userapi/storage/postgres/notifications_table.go b/userapi/storage/postgres/notifications_table.go
index a27c1125..24a30b2f 100644
--- a/userapi/storage/postgres/notifications_table.go
+++ b/userapi/storage/postgres/notifications_table.go
@@ -20,12 +20,13 @@ import (
"encoding/json"
"time"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables"
- "github.com/matrix-org/gomatrixserverlib"
- log "github.com/sirupsen/logrus"
)
type notificationsStatements struct {
@@ -110,7 +111,7 @@ func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error
}
// Insert inserts a notification into the database.
-func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
+func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos uint64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS
nn := *n
// Clears out fields that have their own columns to (1) shrink the
@@ -126,7 +127,7 @@ func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, local
}
// DeleteUpTo deletes all previous notifications, up to and including the event.
-func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) {
+func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos uint64) (affected bool, _ error) {
res, err := sqlutil.TxStmt(txn, s.deleteUpToStmt).ExecContext(ctx, localpart, roomID, pos)
if err != nil {
return false, err
@@ -140,7 +141,7 @@ func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, l
}
// UpdateRead updates the "read" value for an event.
-func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) {
+func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos uint64, v bool) (affected bool, _ error) {
res, err := sqlutil.TxStmt(txn, s.updateReadStmt).ExecContext(ctx, v, localpart, roomID, pos)
if err != nil {
return false, err
@@ -196,40 +197,12 @@ func (s *notificationsStatements) Select(ctx context.Context, txn *sql.Tx, local
return notifs, maxID, rows.Err()
}
-func (s *notificationsStatements) SelectCount(ctx context.Context, txn *sql.Tx, localpart string, filter tables.NotificationFilter) (int64, error) {
- rows, err := sqlutil.TxStmt(txn, s.selectCountStmt).QueryContext(ctx, localpart, uint32(filter))
-
- if err != nil {
- return 0, err
- }
- defer internal.CloseAndLogIfError(ctx, rows, "notifications.Select: rows.Close() failed")
-
- if rows.Next() {
- var count int64
- if err := rows.Scan(&count); err != nil {
- return 0, err
- }
-
- return count, nil
- }
- return 0, rows.Err()
+func (s *notificationsStatements) SelectCount(ctx context.Context, txn *sql.Tx, localpart string, filter tables.NotificationFilter) (count int64, err error) {
+ err = sqlutil.TxStmt(txn, s.selectCountStmt).QueryRowContext(ctx, localpart, uint32(filter)).Scan(&count)
+ return
}
-func (s *notificationsStatements) SelectRoomCounts(ctx context.Context, txn *sql.Tx, localpart, roomID string) (total int64, highlight int64, _ error) {
- rows, err := sqlutil.TxStmt(txn, s.selectRoomCountsStmt).QueryContext(ctx, localpart, roomID)
-
- if err != nil {
- return 0, 0, err
- }
- defer internal.CloseAndLogIfError(ctx, rows, "notifications.Select: rows.Close() failed")
-
- if rows.Next() {
- var total, highlight int64
- if err := rows.Scan(&total, &highlight); err != nil {
- return 0, 0, err
- }
-
- return total, highlight, nil
- }
- return 0, 0, rows.Err()
+func (s *notificationsStatements) SelectRoomCounts(ctx context.Context, txn *sql.Tx, localpart, roomID string) (total int64, highlight int64, err error) {
+ err = sqlutil.TxStmt(txn, s.selectRoomCountsStmt).QueryRowContext(ctx, localpart, roomID).Scan(&total, &highlight)
+ return
}
diff --git a/userapi/storage/postgres/pusher_table.go b/userapi/storage/postgres/pusher_table.go
index 2eb379ae..6fb714fb 100644
--- a/userapi/storage/postgres/pusher_table.go
+++ b/userapi/storage/postgres/pusher_table.go
@@ -19,11 +19,12 @@ import (
"database/sql"
"encoding/json"
+ "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables"
- "github.com/sirupsen/logrus"
)
// See https://matrix.org/docs/spec/client_server/r0.6.1#get-matrix-client-r0-pushers
@@ -136,7 +137,7 @@ func (s *pushersStatements) SelectPushers(
pushers = append(pushers, pusher)
}
- logrus.Debugf("Database returned %d pushers", len(pushers))
+ logrus.Tracef("Database returned %d pushers", len(pushers))
return pushers, rows.Err()
}
diff --git a/userapi/storage/shared/storage.go b/userapi/storage/shared/storage.go
index e32a442d..3ff299f1 100644
--- a/userapi/storage/shared/storage.go
+++ b/userapi/storage/shared/storage.go
@@ -700,13 +700,13 @@ func (d *Database) GetLoginTokenDataByToken(ctx context.Context, token string) (
return d.LoginTokens.SelectLoginToken(ctx, token)
}
-func (d *Database) InsertNotification(ctx context.Context, localpart, eventID string, pos int64, tweaks map[string]interface{}, n *api.Notification) error {
+func (d *Database) InsertNotification(ctx context.Context, localpart, eventID string, pos uint64, tweaks map[string]interface{}, n *api.Notification) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
return d.Notifications.Insert(ctx, txn, localpart, eventID, pos, pushrules.BoolTweakOr(tweaks, pushrules.HighlightTweak, false), n)
})
}
-func (d *Database) DeleteNotificationsUpTo(ctx context.Context, localpart, roomID string, pos int64) (affected bool, err error) {
+func (d *Database) DeleteNotificationsUpTo(ctx context.Context, localpart, roomID string, pos uint64) (affected bool, err error) {
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
affected, err = d.Notifications.DeleteUpTo(ctx, txn, localpart, roomID, pos)
return err
@@ -714,7 +714,7 @@ func (d *Database) DeleteNotificationsUpTo(ctx context.Context, localpart, roomI
return
}
-func (d *Database) SetNotificationsRead(ctx context.Context, localpart, roomID string, pos int64, b bool) (affected bool, err error) {
+func (d *Database) SetNotificationsRead(ctx context.Context, localpart, roomID string, pos uint64, b bool) (affected bool, err error) {
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
affected, err = d.Notifications.UpdateRead(ctx, txn, localpart, roomID, pos, b)
return err
diff --git a/userapi/storage/sqlite3/notifications_table.go b/userapi/storage/sqlite3/notifications_table.go
index df826025..a35ec7be 100644
--- a/userapi/storage/sqlite3/notifications_table.go
+++ b/userapi/storage/sqlite3/notifications_table.go
@@ -20,12 +20,13 @@ import (
"encoding/json"
"time"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables"
- "github.com/matrix-org/gomatrixserverlib"
- log "github.com/sirupsen/logrus"
)
type notificationsStatements struct {
@@ -110,7 +111,7 @@ func (s *notificationsStatements) Clean(ctx context.Context, txn *sql.Tx) error
}
// Insert inserts a notification into the database.
-func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error {
+func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos uint64, highlight bool, n *api.Notification) error {
roomID, tsMS := n.RoomID, n.TS
nn := *n
// Clears out fields that have their own columns to (1) shrink the
@@ -126,7 +127,7 @@ func (s *notificationsStatements) Insert(ctx context.Context, txn *sql.Tx, local
}
// DeleteUpTo deletes all previous notifications, up to and including the event.
-func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error) {
+func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos uint64) (affected bool, _ error) {
res, err := sqlutil.TxStmt(txn, s.deleteUpToStmt).ExecContext(ctx, localpart, roomID, pos)
if err != nil {
return false, err
@@ -140,7 +141,7 @@ func (s *notificationsStatements) DeleteUpTo(ctx context.Context, txn *sql.Tx, l
}
// UpdateRead updates the "read" value for an event.
-func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error) {
+func (s *notificationsStatements) UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos uint64, v bool) (affected bool, _ error) {
res, err := sqlutil.TxStmt(txn, s.updateReadStmt).ExecContext(ctx, v, localpart, roomID, pos)
if err != nil {
return false, err
@@ -196,40 +197,12 @@ func (s *notificationsStatements) Select(ctx context.Context, txn *sql.Tx, local
return notifs, maxID, rows.Err()
}
-func (s *notificationsStatements) SelectCount(ctx context.Context, txn *sql.Tx, localpart string, filter tables.NotificationFilter) (int64, error) {
- rows, err := sqlutil.TxStmt(txn, s.selectCountStmt).QueryContext(ctx, localpart, uint32(filter))
-
- if err != nil {
- return 0, err
- }
- defer internal.CloseAndLogIfError(ctx, rows, "notifications.Select: rows.Close() failed")
-
- if rows.Next() {
- var count int64
- if err := rows.Scan(&count); err != nil {
- return 0, err
- }
-
- return count, nil
- }
- return 0, rows.Err()
+func (s *notificationsStatements) SelectCount(ctx context.Context, txn *sql.Tx, localpart string, filter tables.NotificationFilter) (count int64, err error) {
+ err = sqlutil.TxStmt(txn, s.selectCountStmt).QueryRowContext(ctx, localpart, uint32(filter)).Scan(&count)
+ return
}
-func (s *notificationsStatements) SelectRoomCounts(ctx context.Context, txn *sql.Tx, localpart, roomID string) (total int64, highlight int64, _ error) {
- rows, err := sqlutil.TxStmt(txn, s.selectRoomCountsStmt).QueryContext(ctx, localpart, roomID)
-
- if err != nil {
- return 0, 0, err
- }
- defer internal.CloseAndLogIfError(ctx, rows, "notifications.Select: rows.Close() failed")
-
- if rows.Next() {
- var total, highlight int64
- if err := rows.Scan(&total, &highlight); err != nil {
- return 0, 0, err
- }
-
- return total, highlight, nil
- }
- return 0, 0, rows.Err()
+func (s *notificationsStatements) SelectRoomCounts(ctx context.Context, txn *sql.Tx, localpart, roomID string) (total int64, highlight int64, err error) {
+ err = sqlutil.TxStmt(txn, s.selectRoomCountsStmt).QueryRowContext(ctx, localpart, roomID).Scan(&total, &highlight)
+ return
}
diff --git a/userapi/storage/sqlite3/pusher_table.go b/userapi/storage/sqlite3/pusher_table.go
index dba97c3d..4de0a9f0 100644
--- a/userapi/storage/sqlite3/pusher_table.go
+++ b/userapi/storage/sqlite3/pusher_table.go
@@ -19,11 +19,12 @@ import (
"database/sql"
"encoding/json"
+ "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/tables"
- "github.com/sirupsen/logrus"
)
// See https://matrix.org/docs/spec/client_server/r0.6.1#get-matrix-client-r0-pushers
@@ -136,7 +137,7 @@ func (s *pushersStatements) SelectPushers(
pushers = append(pushers, pusher)
}
- logrus.Debugf("Database returned %d pushers", len(pushers))
+ logrus.Tracef("Database returned %d pushers", len(pushers))
return pushers, rows.Err()
}
diff --git a/userapi/storage/storage_test.go b/userapi/storage/storage_test.go
index a2609733..ca7c1bfd 100644
--- a/userapi/storage/storage_test.go
+++ b/userapi/storage/storage_test.go
@@ -7,6 +7,11 @@ import (
"testing"
"time"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/stretchr/testify/assert"
+ "golang.org/x/crypto/bcrypt"
+
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/internal/pushrules"
"github.com/matrix-org/dendrite/setup/config"
@@ -14,10 +19,6 @@ import (
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/storage/tables"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
- "github.com/stretchr/testify/assert"
- "golang.org/x/crypto/bcrypt"
)
const loginTokenLifetime = time.Minute
@@ -513,7 +514,7 @@ func Test_Notification(t *testing.T) {
RoomID: roomID,
TS: gomatrixserverlib.AsTimestamp(ts),
}
- err = db.InsertNotification(ctx, aliceLocalpart, eventID, int64(i+1), nil, notification)
+ err = db.InsertNotification(ctx, aliceLocalpart, eventID, uint64(i+1), nil, notification)
assert.NoError(t, err, "unable to insert notification")
}
diff --git a/userapi/storage/tables/interface.go b/userapi/storage/tables/interface.go
index 2fe95567..cc428799 100644
--- a/userapi/storage/tables/interface.go
+++ b/userapi/storage/tables/interface.go
@@ -105,9 +105,9 @@ type PusherTable interface {
type NotificationTable interface {
Clean(ctx context.Context, txn *sql.Tx) error
- Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos int64, highlight bool, n *api.Notification) error
- DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64) (affected bool, _ error)
- UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos int64, v bool) (affected bool, _ error)
+ Insert(ctx context.Context, txn *sql.Tx, localpart, eventID string, pos uint64, highlight bool, n *api.Notification) error
+ DeleteUpTo(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos uint64) (affected bool, _ error)
+ UpdateRead(ctx context.Context, txn *sql.Tx, localpart, roomID string, pos uint64, v bool) (affected bool, _ error)
Select(ctx context.Context, txn *sql.Tx, localpart string, fromID int64, limit int, filter NotificationFilter) ([]*api.Notification, int64, error)
SelectCount(ctx context.Context, txn *sql.Tx, localpart string, filter NotificationFilter) (int64, error)
SelectRoomCounts(ctx context.Context, txn *sql.Tx, localpart, roomID string) (total int64, highlight int64, _ error)
diff --git a/userapi/userapi.go b/userapi/userapi.go
index 23855a89..d26b4e19 100644
--- a/userapi/userapi.go
+++ b/userapi/userapi.go
@@ -81,16 +81,17 @@ func NewInternalAPI(
KeyAPI: keyAPI,
RSAPI: rsAPI,
DisableTLSValidation: cfg.PushGatewayDisableTLSValidation,
+ PgClient: pgClient,
}
- readConsumer := consumers.NewOutputReadUpdateConsumer(
- base.ProcessContext, cfg, js, db, pgClient, userAPI, syncProducer,
+ receiptConsumer := consumers.NewOutputReceiptEventConsumer(
+ base.ProcessContext, cfg, js, db, syncProducer, pgClient,
)
- if err := readConsumer.Start(); err != nil {
- logrus.WithError(err).Panic("failed to start user API read update consumer")
+ if err := receiptConsumer.Start(); err != nil {
+ logrus.WithError(err).Panic("failed to start user API receipt consumer")
}
- eventConsumer := consumers.NewOutputStreamEventConsumer(
+ eventConsumer := consumers.NewOutputRoomEventConsumer(
base.ProcessContext, cfg, js, db, pgClient, rsAPI, syncProducer,
)
if err := eventConsumer.Start(); err != nil {