aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/presence.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/presence.go')
-rw-r--r--syncapi/consumers/presence.go158
1 files changed, 158 insertions, 0 deletions
diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go
new file mode 100644
index 00000000..b198b229
--- /dev/null
+++ b/syncapi/consumers/presence.go
@@ -0,0 +1,158 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "strconv"
+
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
+)
+
+// OutputTypingEventConsumer consumes events that originated in the EDU server.
+type PresenceConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ nats *nats.Conn
+ durable string
+ requestTopic string
+ presenceTopic string
+ db storage.Database
+ stream types.StreamProvider
+ notifier *notifier.Notifier
+ deviceAPI api.UserDeviceAPI
+ cfg *config.SyncAPI
+}
+
+// NewPresenceConsumer creates a new PresenceConsumer.
+// Call Start() to begin consuming events.
+func NewPresenceConsumer(
+ process *process.ProcessContext,
+ cfg *config.SyncAPI,
+ js nats.JetStreamContext,
+ nats *nats.Conn,
+ db storage.Database,
+ notifier *notifier.Notifier,
+ stream types.StreamProvider,
+ deviceAPI api.UserDeviceAPI,
+) *PresenceConsumer {
+ return &PresenceConsumer{
+ ctx: process.Context(),
+ nats: nats,
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIPresenceConsumer"),
+ presenceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
+ requestTopic: cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence),
+ db: db,
+ notifier: notifier,
+ stream: stream,
+ deviceAPI: deviceAPI,
+ cfg: cfg,
+ }
+}
+
+// Start consuming typing events.
+func (s *PresenceConsumer) Start() error {
+ // Normal NATS subscription, used by Request/Reply
+ _, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) {
+ userID := msg.Header.Get(jetstream.UserID)
+ presence, err := s.db.GetPresence(context.Background(), userID)
+ m := &nats.Msg{
+ Header: nats.Header{},
+ }
+ if err != nil {
+ m.Header.Set("error", err.Error())
+ if err = msg.RespondMsg(m); err != nil {
+ logrus.WithError(err).Error("Unable to respond to messages")
+ }
+ return
+ }
+
+ deviceRes := api.QueryDevicesResponse{}
+ if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil {
+ m.Header.Set("error", err.Error())
+ if err = msg.RespondMsg(m); err != nil {
+ logrus.WithError(err).Error("Unable to respond to messages")
+ }
+ return
+ }
+
+ for i := range deviceRes.Devices {
+ if int64(presence.LastActiveTS) < deviceRes.Devices[i].LastSeenTS {
+ presence.LastActiveTS = gomatrixserverlib.Timestamp(deviceRes.Devices[i].LastSeenTS)
+ }
+ }
+
+ m.Header.Set(jetstream.UserID, presence.UserID)
+ m.Header.Set("presence", presence.ClientFields.Presence)
+ m.Header.Set("status_msg", *presence.ClientFields.StatusMsg)
+ m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS)))
+
+ if err = msg.RespondMsg(m); err != nil {
+ logrus.WithError(err).Error("Unable to respond to messages")
+ return
+ }
+ })
+ if err != nil {
+ return err
+ }
+ if !s.cfg.Matrix.Presence.EnableInbound && !s.cfg.Matrix.Presence.EnableOutbound {
+ return nil
+ }
+ return jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage,
+ nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
+ )
+}
+
+func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+ userID := msg.Header.Get(jetstream.UserID)
+ presence := msg.Header.Get("presence")
+ timestamp := msg.Header.Get("last_active_ts")
+ fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync"))
+
+ logrus.Debugf("syncAPI received presence event: %+v", msg.Header)
+
+ ts, err := strconv.Atoi(timestamp)
+ if err != nil {
+ return true
+ }
+
+ var statusMsg *string = nil
+ if data, ok := msg.Header["status_msg"]; ok && len(data) > 0 {
+ newMsg := msg.Header.Get("status_msg")
+ statusMsg = &newMsg
+ }
+ // OK is already checked, so no need to do it again
+ p, _ := types.PresenceFromString(presence)
+ pos, err := s.db.UpdatePresence(ctx, userID, p, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
+ if err != nil {
+ return true
+ }
+
+ s.stream.Advance(pos)
+ s.notifier.OnNewPresence(types.StreamingToken{PresencePosition: pos}, userID)
+
+ return true
+}