diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-09-27 15:01:34 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-27 15:01:34 +0200 |
commit | 249b32c4f3ee2e01e6f89435e0c7a5786d2ae3a1 (patch) | |
tree | 1fc229f6a4bafa88afd7c1db3eda3ff3a59b021e /userapi/consumers/clientapi.go | |
parent | f18bce93cc3e7e5f57ebc55d309360b7f8703553 (diff) |
Refactor notifications (#2688)
This PR changes the handling of notifications
- removes the `StreamEvent` and `ReadUpdate` stream
- listens on the `OutputRoomEvent` stream in the UserAPI to inform the
SyncAPI about unread notifications
- listens on the `OutputReceiptEvent` stream in the UserAPI to set
receipts/update notifications
- sets the `read_markers` directly from within the internal UserAPI
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'userapi/consumers/clientapi.go')
-rw-r--r-- | userapi/consumers/clientapi.go | 127 |
1 files changed, 127 insertions, 0 deletions
diff --git a/userapi/consumers/clientapi.go b/userapi/consumers/clientapi.go new file mode 100644 index 00000000..c220d35c --- /dev/null +++ b/userapi/consumers/clientapi.go @@ -0,0 +1,127 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + + "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + log "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/internal/pushgateway" + "github.com/matrix-org/dendrite/userapi/storage" + + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/setup/process" + "github.com/matrix-org/dendrite/userapi/producers" + "github.com/matrix-org/dendrite/userapi/util" +) + +// OutputReceiptEventConsumer consumes events that originated in the clientAPI. +type OutputReceiptEventConsumer struct { + ctx context.Context + jetstream nats.JetStreamContext + durable string + topic string + db storage.Database + serverName gomatrixserverlib.ServerName + syncProducer *producers.SyncAPI + pgClient pushgateway.Client +} + +// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer. +// Call Start() to begin consuming from the EDU server. +func NewOutputReceiptEventConsumer( + process *process.ProcessContext, + cfg *config.UserAPI, + js nats.JetStreamContext, + store storage.Database, + syncProducer *producers.SyncAPI, + pgClient pushgateway.Client, +) *OutputReceiptEventConsumer { + return &OutputReceiptEventConsumer{ + ctx: process.Context(), + jetstream: js, + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent), + durable: cfg.Matrix.JetStream.Durable("UserAPIReceiptConsumer"), + db: store, + serverName: cfg.Matrix.ServerName, + syncProducer: syncProducer, + pgClient: pgClient, + } +} + +// Start consuming receipts events. +func (s *OutputReceiptEventConsumer) Start() error { + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, 1, + s.onMessage, nats.DeliverAll(), nats.ManualAck(), + ) +} + +func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { + msg := msgs[0] // Guaranteed to exist if onMessage is called + + userID := msg.Header.Get(jetstream.UserID) + roomID := msg.Header.Get(jetstream.RoomID) + readPos := msg.Header.Get(jetstream.EventID) + evType := msg.Header.Get("type") + + if readPos == "" || evType != "m.read" { + return true + } + + log := log.WithFields(log.Fields{ + "room_id": roomID, + "user_id": userID, + }) + + localpart, domain, err := gomatrixserverlib.SplitID('@', userID) + if err != nil { + log.WithError(err).Error("userapi clientapi consumer: SplitID failure") + return true + } + if domain != s.serverName { + return true + } + + metadata, err := msg.Metadata() + if err != nil { + return false + } + + updated, err := s.db.SetNotificationsRead(ctx, localpart, roomID, uint64(gomatrixserverlib.AsTimestamp(metadata.Timestamp)), true) + if err != nil { + log.WithError(err).Error("userapi EDU consumer") + return false + } + + if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, roomID); err != nil { + log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed") + return false + } + + if !updated { + return true + } + if err = util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil { + log.WithError(err).Error("userapi EDU consumer: NotifyUserCounts failed") + return false + } + + return true +} |