aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--build/docker/config/dendrite.yaml11
-rw-r--r--clientapi/clientapi.go5
-rw-r--r--clientapi/producers/syncapi.go18
-rw-r--r--clientapi/routing/presence.go138
-rw-r--r--clientapi/routing/routing.go36
-rw-r--r--cmd/generate-config/main.go4
-rw-r--r--dendrite-config.yaml7
-rw-r--r--federationapi/consumers/presence.go143
-rw-r--r--federationapi/federationapi.go7
-rw-r--r--federationapi/producers/syncapi.go19
-rw-r--r--federationapi/routing/send.go56
-rw-r--r--federationapi/types/types.go12
-rw-r--r--setup/config/config_global.go11
-rw-r--r--setup/jetstream/streams.go8
-rw-r--r--syncapi/consumers/presence.go158
-rw-r--r--syncapi/notifier/notifier.go58
-rw-r--r--syncapi/notifier/notifier_test.go21
-rw-r--r--syncapi/producers/federationapi_presence.go48
-rw-r--r--syncapi/storage/interface.go8
-rw-r--r--syncapi/storage/postgres/presence_table.go162
-rw-r--r--syncapi/storage/postgres/syncserver.go5
-rw-r--r--syncapi/storage/shared/syncserver.go17
-rw-r--r--syncapi/storage/sqlite3/presence_table.go177
-rw-r--r--syncapi/storage/sqlite3/stream_id_table.go8
-rw-r--r--syncapi/storage/sqlite3/syncserver.go5
-rw-r--r--syncapi/storage/tables/interface.go7
-rw-r--r--syncapi/streams/stream_presence.go179
-rw-r--r--syncapi/streams/streams.go10
-rw-r--r--syncapi/sync/requestpool.go88
-rw-r--r--syncapi/sync/requestpool_test.go128
-rw-r--r--syncapi/syncapi.go25
-rw-r--r--syncapi/types/presence.go75
-rw-r--r--syncapi/types/presence_string.go26
-rw-r--r--syncapi/types/presence_test.go42
-rw-r--r--syncapi/types/types.go14
-rw-r--r--syncapi/types/types_test.go8
-rw-r--r--sytest-whitelist14
-rw-r--r--userapi/api/api.go14
38 files changed, 1706 insertions, 66 deletions
diff --git a/build/docker/config/dendrite.yaml b/build/docker/config/dendrite.yaml
index 25cbd6d8..e3a0316d 100644
--- a/build/docker/config/dendrite.yaml
+++ b/build/docker/config/dendrite.yaml
@@ -62,6 +62,17 @@ global:
- matrix.org
- vector.im
+ # Disables federation. Dendrite will not be able to make any outbound HTTP requests
+ # to other servers and the federation API will not be exposed.
+ disable_federation: false
+
+ # Configures the handling of presence events.
+ presence:
+ # Whether inbound presence events are allowed, e.g. receiving presence events from other servers
+ enable_inbound: false
+ # Whether outbound presence events are allowed, e.g. sending presence events to other servers
+ enable_outbound: false
+
# Configuration for NATS JetStream
jetstream:
# A list of NATS Server addresses to connect to. If none are specified, an
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index d4b417a3..e2f8d3f3 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -48,7 +48,7 @@ func AddPublicRoutes(
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
) {
- js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
+ js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{
JetStream: js,
@@ -56,6 +56,7 @@ func AddPublicRoutes(
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
UserAPI: userAPI,
ServerName: cfg.Matrix.ServerName,
}
@@ -64,6 +65,6 @@ func AddPublicRoutes(
router, synapseAdminRouter, cfg, rsAPI, asAPI,
userAPI, userDirectoryProvider, federation,
syncProducer, transactionsCache, fsAPI, keyAPI,
- extRoomsProvider, mscCfg,
+ extRoomsProvider, mscCfg, natsClient,
)
}
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go
index 2dee04e3..6e869c31 100644
--- a/clientapi/producers/syncapi.go
+++ b/clientapi/producers/syncapi.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
+ "time"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream"
@@ -34,6 +35,7 @@ type SyncAPIProducer struct {
TopicReceiptEvent string
TopicSendToDeviceEvent string
TopicTypingEvent string
+ TopicPresenceEvent string
JetStream nats.JetStreamContext
ServerName gomatrixserverlib.ServerName
UserAPI userapi.UserInternalAPI
@@ -173,3 +175,19 @@ func (p *SyncAPIProducer) SendTyping(
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}
+
+func (p *SyncAPIProducer) SendPresence(
+ ctx context.Context, userID string, presence types.Presence, statusMsg *string,
+) error {
+ m := nats.NewMsg(p.TopicPresenceEvent)
+ m.Header.Set(jetstream.UserID, userID)
+ m.Header.Set("presence", presence.String())
+ if statusMsg != nil {
+ m.Header.Set("status_msg", *statusMsg)
+ }
+
+ m.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now()))))
+
+ _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
+ return err
+}
diff --git a/clientapi/routing/presence.go b/clientapi/routing/presence.go
new file mode 100644
index 00000000..63fbb75e
--- /dev/null
+++ b/clientapi/routing/presence.go
@@ -0,0 +1,138 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package routing
+
+import (
+ "fmt"
+ "net/http"
+ "strconv"
+ "time"
+
+ "github.com/matrix-org/dendrite/clientapi/httputil"
+ "github.com/matrix-org/dendrite/clientapi/jsonerror"
+ "github.com/matrix-org/dendrite/clientapi/producers"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+type presenceReq struct {
+ Presence string `json:"presence"`
+ StatusMsg *string `json:"status_msg,omitempty"`
+}
+
+func SetPresence(
+ req *http.Request,
+ cfg *config.ClientAPI,
+ device *api.Device,
+ producer *producers.SyncAPIProducer,
+ userID string,
+) util.JSONResponse {
+ if !cfg.Matrix.Presence.EnableOutbound {
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: struct{}{},
+ }
+ }
+ if device.UserID != userID {
+ return util.JSONResponse{
+ Code: http.StatusForbidden,
+ JSON: jsonerror.Forbidden("Unable to set presence for other user."),
+ }
+ }
+ var presence presenceReq
+ parseErr := httputil.UnmarshalJSONRequest(req, &presence)
+ if parseErr != nil {
+ return *parseErr
+ }
+
+ presenceStatus, ok := types.PresenceFromString(presence.Presence)
+ if !ok {
+ return util.JSONResponse{
+ Code: http.StatusBadRequest,
+ JSON: jsonerror.Unknown(fmt.Sprintf("Unknown presence '%s'.", presence.Presence)),
+ }
+ }
+
+ err := producer.SendPresence(req.Context(), userID, presenceStatus, presence.StatusMsg)
+ if err != nil {
+ log.WithError(err).Errorf("failed to update presence")
+ return util.JSONResponse{
+ Code: http.StatusInternalServerError,
+ JSON: jsonerror.InternalServerError(),
+ }
+ }
+
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: struct{}{},
+ }
+}
+
+func GetPresence(
+ req *http.Request,
+ device *api.Device,
+ natsClient *nats.Conn,
+ presenceTopic string,
+ userID string,
+) util.JSONResponse {
+ msg := nats.NewMsg(presenceTopic)
+ msg.Header.Set(jetstream.UserID, userID)
+
+ presence, err := natsClient.RequestMsg(msg, time.Second*10)
+ if err != nil {
+ log.WithError(err).Errorf("unable to get presence")
+ return util.JSONResponse{
+ Code: http.StatusInternalServerError,
+ JSON: jsonerror.InternalServerError(),
+ }
+ }
+
+ statusMsg := presence.Header.Get("status_msg")
+ e := presence.Header.Get("error")
+ if e != "" {
+ log.Errorf("received error msg from nats: %s", e)
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: types.PresenceClientResponse{
+ Presence: types.PresenceUnavailable.String(),
+ },
+ }
+ }
+ lastActive, err := strconv.Atoi(presence.Header.Get("last_active_ts"))
+ if err != nil {
+ return util.JSONResponse{
+ Code: http.StatusInternalServerError,
+ JSON: jsonerror.InternalServerError(),
+ }
+ }
+
+ p := types.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(lastActive)}
+ currentlyActive := p.CurrentlyActive()
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: types.PresenceClientResponse{
+ CurrentlyActive: &currentlyActive,
+ LastActiveAgo: p.LastActiveAgo(),
+ Presence: presence.Header.Get("presence"),
+ StatusMsg: &statusMsg,
+ },
+ }
+}
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index db860dcd..32e83187 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -32,9 +32,11 @@ import (
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
+ "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
@@ -56,7 +58,7 @@ func Setup(
federationSender federationAPI.FederationInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
- mscCfg *config.MSCs,
+ mscCfg *config.MSCs, natsClient *nats.Conn,
) {
rateLimits := httputil.NewRateLimits(&cfg.RateLimiting)
userInteractiveAuth := auth.NewUserInteractive(userAPI, cfg)
@@ -779,20 +781,6 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
- // Element logs get flooded unless this is handled
- v3mux.Handle("/presence/{userID}/status",
- httputil.MakeExternalAPI("presence", func(req *http.Request) util.JSONResponse {
- if r := rateLimits.Limit(req); r != nil {
- return *r
- }
- // TODO: Set presence (probably the responsibility of a presence server not clientapi)
- return util.JSONResponse{
- Code: http.StatusOK,
- JSON: struct{}{},
- }
- }),
- ).Methods(http.MethodPut, http.MethodOptions)
-
v3mux.Handle("/voip/turnServer",
httputil.MakeAuthAPI("turn_server", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
@@ -1308,4 +1296,22 @@ func Setup(
return SetReceipt(req, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"])
}),
).Methods(http.MethodPost, http.MethodOptions)
+ v3mux.Handle("/presence/{userId}/status",
+ httputil.MakeAuthAPI("set_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
+ vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
+ if err != nil {
+ return util.ErrorResponse(err)
+ }
+ return SetPresence(req, cfg, device, syncProducer, vars["userId"])
+ }),
+ ).Methods(http.MethodPut, http.MethodOptions)
+ v3mux.Handle("/presence/{userId}/status",
+ httputil.MakeAuthAPI("get_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
+ vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
+ if err != nil {
+ return util.ErrorResponse(err)
+ }
+ return GetPresence(req, device, natsClient, cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence), vars["userId"])
+ }),
+ ).Methods(http.MethodGet, http.MethodOptions)
}
diff --git a/cmd/generate-config/main.go b/cmd/generate-config/main.go
index ba5a87a7..24085afa 100644
--- a/cmd/generate-config/main.go
+++ b/cmd/generate-config/main.go
@@ -91,6 +91,10 @@ func main() {
cfg.UserAPI.BCryptCost = bcrypt.MinCost
cfg.Global.JetStream.InMemory = true
cfg.ClientAPI.RegistrationSharedSecret = "complement"
+ cfg.Global.Presence = config.PresenceOptions{
+ EnableInbound: true,
+ EnableOutbound: true,
+ }
}
j, err := yaml.Marshal(cfg)
diff --git a/dendrite-config.yaml b/dendrite-config.yaml
index 8b4c820a..47f08c4f 100644
--- a/dendrite-config.yaml
+++ b/dendrite-config.yaml
@@ -68,6 +68,13 @@ global:
# to other servers and the federation API will not be exposed.
disable_federation: false
+ # Configures the handling of presence events.
+ presence:
+ # Whether inbound presence events are allowed, e.g. receiving presence events from other servers
+ enable_inbound: false
+ # Whether outbound presence events are allowed, e.g. sending presence events to other servers
+ enable_outbound: false
+
# Server notices allows server admins to send messages to all users.
server_notices:
enabled: false
diff --git a/federationapi/consumers/presence.go b/federationapi/consumers/presence.go
new file mode 100644
index 00000000..bfce1b28
--- /dev/null
+++ b/federationapi/consumers/presence.go
@@ -0,0 +1,143 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+ "strconv"
+
+ "github.com/matrix-org/dendrite/federationapi/queue"
+ "github.com/matrix-org/dendrite/federationapi/storage"
+ fedTypes "github.com/matrix-org/dendrite/federationapi/types"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+// OutputReceiptConsumer consumes events that originate in the clientapi.
+type OutputPresenceConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ db storage.Database
+ queues *queue.OutgoingQueues
+ ServerName gomatrixserverlib.ServerName
+ topic string
+ outboundPresenceEnabled bool
+}
+
+// NewOutputPresenceConsumer creates a new OutputPresenceConsumer. Call Start() to begin consuming events.
+func NewOutputPresenceConsumer(
+ process *process.ProcessContext,
+ cfg *config.FederationAPI,
+ js nats.JetStreamContext,
+ queues *queue.OutgoingQueues,
+ store storage.Database,
+) *OutputPresenceConsumer {
+ return &OutputPresenceConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ queues: queues,
+ db: store,
+ ServerName: cfg.Matrix.ServerName,
+ durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
+ outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound,
+ }
+}
+
+// Start consuming from the clientapi
+func (t *OutputPresenceConsumer) Start() error {
+ if !t.outboundPresenceEnabled {
+ return nil
+ }
+ return jetstream.JetStreamConsumer(
+ t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
+ nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
+ )
+}
+
+// onMessage is called in response to a message received on the presence
+// events topic from the client api.
+func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+ // only send presence events which originated from us
+ userID := msg.Header.Get(jetstream.UserID)
+ _, serverName, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ log.WithError(err).WithField("user_id", userID).Error("failed to extract domain from receipt sender")
+ return true
+ }
+ if serverName != t.ServerName {
+ return true
+ }
+
+ presence := msg.Header.Get("presence")
+
+ ts, err := strconv.Atoi(msg.Header.Get("last_active_ts"))
+ if err != nil {
+ return true
+ }
+
+ joined, err := t.db.GetAllJoinedHosts(ctx)
+ if err != nil {
+ log.WithError(err).Error("failed to get joined hosts")
+ return true
+ }
+ if len(joined) == 0 {
+ return true
+ }
+
+ var statusMsg *string = nil
+ if data, ok := msg.Header["status_msg"]; ok && len(data) > 0 {
+ status := msg.Header.Get("status_msg")
+ statusMsg = &status
+ }
+
+ p := types.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(ts)}
+
+ content := fedTypes.Presence{
+ Push: []fedTypes.PresenceContent{
+ {
+ CurrentlyActive: p.CurrentlyActive(),
+ LastActiveAgo: p.LastActiveAgo(),
+ Presence: presence,
+ StatusMsg: statusMsg,
+ UserID: userID,
+ },
+ },
+ }
+
+ edu := &gomatrixserverlib.EDU{
+ Type: gomatrixserverlib.MPresence,
+ Origin: string(t.ServerName),
+ }
+ if edu.Content, err = json.Marshal(content); err != nil {
+ log.WithError(err).Error("failed to marshal EDU JSON")
+ return true
+ }
+
+ log.Debugf("sending presence EDU to %d servers", len(joined))
+ if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil {
+ log.WithError(err).Error("failed to send EDU")
+ return false
+ }
+
+ return true
+}
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go
index 8a0ce8e3..5bfe237a 100644
--- a/federationapi/federationapi.go
+++ b/federationapi/federationapi.go
@@ -66,6 +66,7 @@ func AddPublicRoutes(
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
ServerName: cfg.Matrix.ServerName,
UserAPI: userAPI,
}
@@ -149,5 +150,11 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start key server consumer")
}
+ presenceConsumer := consumers.NewOutputPresenceConsumer(
+ base.ProcessContext, cfg, js, queues, federationDB,
+ )
+ if err = presenceConsumer.Start(); err != nil {
+ logrus.WithError(err).Panic("failed to start presence consumer")
+ }
return internal.NewFederationInternalAPI(federationDB, cfg, rsAPI, federation, stats, caches, queues, keyRing)
}
diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go
index 24acb126..49415003 100644
--- a/federationapi/producers/syncapi.go
+++ b/federationapi/producers/syncapi.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
+ "time"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -32,6 +33,7 @@ type SyncAPIProducer struct {
TopicReceiptEvent string
TopicSendToDeviceEvent string
TopicTypingEvent string
+ TopicPresenceEvent string
JetStream nats.JetStreamContext
ServerName gomatrixserverlib.ServerName
UserAPI userapi.UserInternalAPI
@@ -142,3 +144,20 @@ func (p *SyncAPIProducer) SendTyping(
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}
+
+func (p *SyncAPIProducer) SendPresence(
+ ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveAgo int64,
+) error {
+ m := nats.NewMsg(p.TopicPresenceEvent)
+ m.Header.Set(jetstream.UserID, userID)
+ m.Header.Set("presence", presence.String())
+ if statusMsg != nil {
+ m.Header.Set("status_msg", *statusMsg)
+ }
+ lastActiveTS := gomatrixserverlib.AsTimestamp(time.Now().Add(-(time.Duration(lastActiveAgo) * time.Millisecond)))
+
+ m.Header.Set("last_active_ts", strconv.Itoa(int(lastActiveTS)))
+ log.Debugf("Sending presence to syncAPI: %+v", m.Header)
+ _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
+ return err
+}
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index eacc76db..1bba632b 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -30,6 +30,7 @@ import (
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ syncTypes "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
@@ -127,13 +128,14 @@ func Send(
defer inFlightTxnsPerOrigin.Delete(index)
t := txnReq{
- rsAPI: rsAPI,
- keys: keys,
- federation: federation,
- servers: servers,
- keyAPI: keyAPI,
- roomsMu: mu,
- producer: producer,
+ rsAPI: rsAPI,
+ keys: keys,
+ federation: federation,
+ servers: servers,
+ keyAPI: keyAPI,
+ roomsMu: mu,
+ producer: producer,
+ inboundPresenceEnabled: cfg.Matrix.Presence.EnableInbound,
}
var txnEvents struct {
@@ -185,13 +187,14 @@ func Send(
type txnReq struct {
gomatrixserverlib.Transaction
- rsAPI api.RoomserverInternalAPI
- keyAPI keyapi.KeyInternalAPI
- keys gomatrixserverlib.JSONVerifier
- federation txnFederationClient
- roomsMu *internal.MutexByRoom
- servers federationAPI.ServersInRoomProvider
- producer *producers.SyncAPIProducer
+ rsAPI api.RoomserverInternalAPI
+ keyAPI keyapi.KeyInternalAPI
+ keys gomatrixserverlib.JSONVerifier
+ federation txnFederationClient
+ roomsMu *internal.MutexByRoom
+ servers federationAPI.ServersInRoomProvider
+ producer *producers.SyncAPIProducer
+ inboundPresenceEnabled bool
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
@@ -389,12 +392,37 @@ func (t *txnReq) processEDUs(ctx context.Context) {
if err := t.processSigningKeyUpdate(ctx, e); err != nil {
logrus.WithError(err).Errorf("Failed to process signing key update")
}
+ case gomatrixserverlib.MPresence:
+ if t.inboundPresenceEnabled {
+ if err := t.processPresence(ctx, e); err != nil {
+ logrus.WithError(err).Errorf("Failed to process presence update")
+ }
+ }
default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
}
}
}
+// processPresence handles m.receipt events
+func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error {
+ payload := types.Presence{}
+ if err := json.Unmarshal(e.Content, &payload); err != nil {
+ return err
+ }
+ for _, content := range payload.Push {
+ presence, ok := syncTypes.PresenceFromString(content.Presence)
+ if !ok {
+ logrus.Warnf("invalid presence '%s', skipping.", content.Presence)
+ continue
+ }
+ if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error {
var updatePayload keyapi.CrossSigningKeyUpdate
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
diff --git a/federationapi/types/types.go b/federationapi/types/types.go
index a28a80b2..5821000c 100644
--- a/federationapi/types/types.go
+++ b/federationapi/types/types.go
@@ -66,3 +66,15 @@ type FederationReceiptData struct {
type ReceiptTS struct {
TS gomatrixserverlib.Timestamp `json:"ts"`
}
+
+type Presence struct {
+ Push []PresenceContent `json:"push"`
+}
+
+type PresenceContent struct {
+ CurrentlyActive bool `json:"currently_active,omitempty"`
+ LastActiveAgo int64 `json:"last_active_ago"`
+ Presence string `json:"presence"`
+ StatusMsg *string `json:"status_msg,omitempty"`
+ UserID string `json:"user_id"`
+}
diff --git a/setup/config/config_global.go b/setup/config/config_global.go
index b947f207..c1650f07 100644
--- a/setup/config/config_global.go
+++ b/setup/config/config_global.go
@@ -41,6 +41,9 @@ type Global struct {
// to other servers and the federation API will not be exposed.
DisableFederation bool `yaml:"disable_federation"`
+ // Configures the handling of presence events.
+ Presence PresenceOptions `yaml:"presence"`
+
// List of domains that the server will trust as identity servers to
// verify third-party identifiers.
// Defaults to an empty array.
@@ -225,3 +228,11 @@ func (c *DNSCacheOptions) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkPositive(configErrs, "cache_size", int64(c.CacheSize))
checkPositive(configErrs, "cache_lifetime", int64(c.CacheLifetime))
}
+
+// PresenceOptions defines possible configurations for presence events.
+type PresenceOptions struct {
+ // Whether inbound presence events are allowed
+ EnableInbound bool `yaml:"enable_inbound"`
+ // Whether outbound presence events are allowed
+ EnableOutbound bool `yaml:"enable_outbound"`
+}
diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go
index 5f0d37fd..6594e694 100644
--- a/setup/jetstream/streams.go
+++ b/setup/jetstream/streams.go
@@ -25,6 +25,8 @@ var (
OutputReceiptEvent = "OutputReceiptEvent"
OutputStreamEvent = "OutputStreamEvent"
OutputReadUpdate = "OutputReadUpdate"
+ RequestPresence = "GetPresence"
+ OutputPresenceEvent = "OutputPresenceEvent"
)
var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+")
@@ -89,4 +91,10 @@ var streams = []*nats.StreamConfig{
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
+ {
+ Name: OutputPresenceEvent,
+ Retention: nats.InterestPolicy,
+ Storage: nats.MemoryStorage,
+ MaxAge: time.Minute * 5,
+ },
}
diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go
new file mode 100644
index 00000000..b198b229
--- /dev/null
+++ b/syncapi/consumers/presence.go
@@ -0,0 +1,158 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "strconv"
+
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
+)
+
+// OutputTypingEventConsumer consumes events that originated in the EDU server.
+type PresenceConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ nats *nats.Conn
+ durable string
+ requestTopic string
+ presenceTopic string
+ db storage.Database
+ stream types.StreamProvider
+ notifier *notifier.Notifier
+ deviceAPI api.UserDeviceAPI
+ cfg *config.SyncAPI
+}
+
+// NewPresenceConsumer creates a new PresenceConsumer.
+// Call Start() to begin consuming events.
+func NewPresenceConsumer(
+ process *process.ProcessContext,
+ cfg *config.SyncAPI,
+ js nats.JetStreamContext,
+ nats *nats.Conn,
+ db storage.Database,
+ notifier *notifier.Notifier,
+ stream types.StreamProvider,
+ deviceAPI api.UserDeviceAPI,
+) *PresenceConsumer {
+ return &PresenceConsumer{
+ ctx: process.Context(),
+ nats: nats,
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIPresenceConsumer"),
+ presenceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
+ requestTopic: cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence),
+ db: db,
+ notifier: notifier,
+ stream: stream,
+ deviceAPI: deviceAPI,
+ cfg: cfg,
+ }
+}
+
+// Start consuming typing events.
+func (s *PresenceConsumer) Start() error {
+ // Normal NATS subscription, used by Request/Reply
+ _, err := s.nats.Subscribe(s.requestTopic, func(msg *nats.Msg) {
+ userID := msg.Header.Get(jetstream.UserID)
+ presence, err := s.db.GetPresence(context.Background(), userID)
+ m := &nats.Msg{
+ Header: nats.Header{},
+ }
+ if err != nil {
+ m.Header.Set("error", err.Error())
+ if err = msg.RespondMsg(m); err != nil {
+ logrus.WithError(err).Error("Unable to respond to messages")
+ }
+ return
+ }
+
+ deviceRes := api.QueryDevicesResponse{}
+ if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil {
+ m.Header.Set("error", err.Error())
+ if err = msg.RespondMsg(m); err != nil {
+ logrus.WithError(err).Error("Unable to respond to messages")
+ }
+ return
+ }
+
+ for i := range deviceRes.Devices {
+ if int64(presence.LastActiveTS) < deviceRes.Devices[i].LastSeenTS {
+ presence.LastActiveTS = gomatrixserverlib.Timestamp(deviceRes.Devices[i].LastSeenTS)
+ }
+ }
+
+ m.Header.Set(jetstream.UserID, presence.UserID)
+ m.Header.Set("presence", presence.ClientFields.Presence)
+ m.Header.Set("status_msg", *presence.ClientFields.StatusMsg)
+ m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS)))
+
+ if err = msg.RespondMsg(m); err != nil {
+ logrus.WithError(err).Error("Unable to respond to messages")
+ return
+ }
+ })
+ if err != nil {
+ return err
+ }
+ if !s.cfg.Matrix.Presence.EnableInbound && !s.cfg.Matrix.Presence.EnableOutbound {
+ return nil
+ }
+ return jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.presenceTopic, s.durable, s.onMessage,
+ nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
+ )
+}
+
+func (s *PresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+ userID := msg.Header.Get(jetstream.UserID)
+ presence := msg.Header.Get("presence")
+ timestamp := msg.Header.Get("last_active_ts")
+ fromSync, _ := strconv.ParseBool(msg.Header.Get("from_sync"))
+
+ logrus.Debugf("syncAPI received presence event: %+v", msg.Header)
+
+ ts, err := strconv.Atoi(timestamp)
+ if err != nil {
+ return true
+ }
+
+ var statusMsg *string = nil
+ if data, ok := msg.Header["status_msg"]; ok && len(data) > 0 {
+ newMsg := msg.Header.Get("status_msg")
+ statusMsg = &newMsg
+ }
+ // OK is already checked, so no need to do it again
+ p, _ := types.PresenceFromString(presence)
+ pos, err := s.db.UpdatePresence(ctx, userID, p, statusMsg, gomatrixserverlib.Timestamp(ts), fromSync)
+ if err != nil {
+ return true
+ }
+
+ s.stream.Advance(pos)
+ s.notifier.OnNewPresence(types.StreamingToken{PresencePosition: pos}, userID)
+
+ return true
+}
diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go
index 6a641e6f..d2b79b63 100644
--- a/syncapi/notifier/notifier.go
+++ b/syncapi/notifier/notifier.go
@@ -43,22 +43,30 @@ type Notifier struct {
userDeviceStreams map[string]map[string]*UserDeviceStream
// The last time we cleaned out stale entries from the userStreams map
lastCleanUpTime time.Time
+ // Protects roomIDToJoinedUsers and roomIDToPeekingDevices
+ mapLock *sync.RWMutex
}
// NewNotifier creates a new notifier set to the given sync position.
// In order for this to be of any use, the Notifier needs to be told all rooms and
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
-func NewNotifier(currPos types.StreamingToken) *Notifier {
+func NewNotifier() *Notifier {
return &Notifier{
- currPos: currPos,
roomIDToJoinedUsers: make(map[string]userIDSet),
roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
streamLock: &sync.Mutex{},
+ mapLock: &sync.RWMutex{},
lastCleanUpTime: time.Now(),
}
}
+// SetCurrentPosition sets the current streaming positions.
+// This must be called directly after NewNotifier and initialising the streams.
+func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) {
+ n.currPos = currPos
+}
+
// OnNewEvent is called when a new event is received from the room server. Must only be
// called from a single goroutine, to avoid races between updates which could set the
// current sync position incorrectly.
@@ -83,7 +91,7 @@ func (n *Notifier) OnNewEvent(
if ev != nil {
// Map this event's room_id to a list of joined users, and wake them up.
- usersToNotify := n.joinedUsers(ev.RoomID())
+ usersToNotify := n.JoinedUsers(ev.RoomID())
// Map this event's room_id to a list of peeking devices, and wake them up.
peekingDevicesToNotify := n.PeekingDevices(ev.RoomID())
// If this is an invite, also add in the invitee to this list.
@@ -114,7 +122,7 @@ func (n *Notifier) OnNewEvent(
n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos)
} else if roomID != "" {
- n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
+ n.wakeupUsers(n.JoinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
} else if len(userIDs) > 0 {
n.wakeupUsers(userIDs, nil, n.currPos)
} else {
@@ -182,7 +190,7 @@ func (n *Notifier) OnNewTyping(
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
- n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
+ n.wakeupUsers(n.JoinedUsers(roomID), nil, n.currPos)
}
// OnNewReceipt updates the current position
@@ -194,7 +202,7 @@ func (n *Notifier) OnNewReceipt(
defer n.streamLock.Unlock()
n.currPos.ApplyUpdates(posUpdate)
- n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
+ n.wakeupUsers(n.JoinedUsers(roomID), nil, n.currPos)
}
func (n *Notifier) OnNewKeyChange(
@@ -228,6 +236,28 @@ func (n *Notifier) OnNewNotificationData(
n.wakeupUsers([]string{userID}, nil, n.currPos)
}
+func (n *Notifier) OnNewPresence(
+ posUpdate types.StreamingToken, userID string,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ sharedUsers := n.SharedUsers(userID)
+ sharedUsers = append(sharedUsers, userID)
+
+ n.wakeupUsers(sharedUsers, nil, n.currPos)
+}
+
+func (n *Notifier) SharedUsers(userID string) (sharedUsers []string) {
+ for roomID, users := range n.roomIDToJoinedUsers {
+ if _, ok := users[userID]; ok {
+ sharedUsers = append(sharedUsers, n.JoinedUsers(roomID)...)
+ }
+ }
+ return sharedUsers
+}
+
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// notify for anything before sincePos
@@ -250,6 +280,8 @@ func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener {
// Load the membership states required to notify users correctly.
func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
+ n.mapLock.Lock()
+ defer n.mapLock.Unlock()
roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
if err != nil {
return err
@@ -377,6 +409,8 @@ func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream {
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) addJoinedUser(roomID, userID string) {
+ n.mapLock.Lock()
+ defer n.mapLock.Unlock()
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
}
@@ -385,6 +419,8 @@ func (n *Notifier) addJoinedUser(roomID, userID string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) removeJoinedUser(roomID, userID string) {
+ n.mapLock.Lock()
+ defer n.mapLock.Unlock()
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
n.roomIDToJoinedUsers[roomID] = make(userIDSet)
}
@@ -392,7 +428,9 @@ func (n *Notifier) removeJoinedUser(roomID, userID string) {
}
// Not thread-safe: must be called on the OnNewEvent goroutine only
-func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
+func (n *Notifier) JoinedUsers(roomID string) (userIDs []string) {
+ n.mapLock.RLock()
+ defer n.mapLock.RUnlock()
if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
return
}
@@ -401,6 +439,8 @@ func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
+ n.mapLock.Lock()
+ defer n.mapLock.Unlock()
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
}
@@ -410,6 +450,8 @@ func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only
// nolint:unused
func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
+ n.mapLock.Lock()
+ defer n.mapLock.Unlock()
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
}
@@ -419,6 +461,8 @@ func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
// Not thread-safe: must be called on the OnNewEvent goroutine only
func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) {
+ n.mapLock.RLock()
+ defer n.mapLock.RUnlock()
if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
return
}
diff --git a/syncapi/notifier/notifier_test.go b/syncapi/notifier/notifier_test.go
index 60403d5d..8b305586 100644
--- a/syncapi/notifier/notifier_test.go
+++ b/syncapi/notifier/notifier_test.go
@@ -107,7 +107,8 @@ func mustEqualPositions(t *testing.T, got, want types.StreamingToken) {
// Test that the current position is returned if a request is already behind.
func TestImmediateNotification(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
+ n := NewNotifier()
+ n.SetCurrentPosition(syncPositionBefore)
pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionVeryOld))
if err != nil {
t.Fatalf("TestImmediateNotification error: %s", err)
@@ -117,7 +118,8 @@ func TestImmediateNotification(t *testing.T) {
// Test that new events to a joined room unblocks the request.
func TestNewEventAndJoinedToRoom(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
+ n := NewNotifier()
+ n.SetCurrentPosition(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@@ -142,7 +144,8 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
}
func TestCorrectStream(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
+ n := NewNotifier()
+ n.SetCurrentPosition(syncPositionBefore)
stream := lockedFetchUserStream(n, bob, bobDev)
if stream.UserID != bob {
t.Fatalf("expected user %q, got %q", bob, stream.UserID)
@@ -153,7 +156,8 @@ func TestCorrectStream(t *testing.T) {
}
func TestCorrectStreamWakeup(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
+ n := NewNotifier()
+ n.SetCurrentPosition(syncPositionBefore)
awoken := make(chan string)
streamone := lockedFetchUserStream(n, alice, "one")
@@ -180,7 +184,8 @@ func TestCorrectStreamWakeup(t *testing.T) {
// Test that an invite unblocks the request
func TestNewInviteEventForUser(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
+ n := NewNotifier()
+ n.SetCurrentPosition(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@@ -236,7 +241,8 @@ func TestEDUWakeup(t *testing.T) {
// Test that all blocked requests get woken up on a new event.
func TestMultipleRequestWakeup(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
+ n := NewNotifier()
+ n.SetCurrentPosition(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@@ -272,7 +278,8 @@ func TestMultipleRequestWakeup(t *testing.T) {
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// listen as bob. Make bob leave room. Make alice send event to room.
// Make sure alice gets woken up only and not bob as well.
- n := NewNotifier(syncPositionBefore)
+ n := NewNotifier()
+ n.SetCurrentPosition(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
diff --git a/syncapi/producers/federationapi_presence.go b/syncapi/producers/federationapi_presence.go
new file mode 100644
index 00000000..dc03457e
--- /dev/null
+++ b/syncapi/producers/federationapi_presence.go
@@ -0,0 +1,48 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "strconv"
+ "time"
+
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+)
+
+// FederationAPIPresenceProducer produces events for the federation API server to consume
+type FederationAPIPresenceProducer struct {
+ Topic string
+ JetStream nats.JetStreamContext
+}
+
+func (f *FederationAPIPresenceProducer) SendPresence(
+ userID string, presence types.Presence, statusMsg *string,
+) error {
+ msg := nats.NewMsg(f.Topic)
+ msg.Header.Set(jetstream.UserID, userID)
+ msg.Header.Set("presence", presence.String())
+ msg.Header.Set("from_sync", "true") // only update last_active_ts and presence
+ msg.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now()))))
+
+ if statusMsg != nil {
+ msg.Header.Set("status_msg", *statusMsg)
+ }
+
+ _, err := f.JetStream.PublishMsg(msg)
+ return err
+}
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 03313ec6..0b3ab235 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -26,6 +26,7 @@ import (
)
type Database interface {
+ Presence
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
@@ -150,3 +151,10 @@ type Database interface {
StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error)
}
+
+type Presence interface {
+ UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error)
+ GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error)
+ PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error)
+ MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
+}
diff --git a/syncapi/storage/postgres/presence_table.go b/syncapi/storage/postgres/presence_table.go
new file mode 100644
index 00000000..49336c4e
--- /dev/null
+++ b/syncapi/storage/postgres/presence_table.go
@@ -0,0 +1,162 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const presenceSchema = `
+CREATE SEQUENCE IF NOT EXISTS syncapi_presence_id;
+-- Stores data about presence
+CREATE TABLE IF NOT EXISTS syncapi_presence (
+ -- The ID
+ id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_presence_id'),
+ -- The Matrix user ID
+ user_id TEXT NOT NULL,
+ -- The actual presence
+ presence INT NOT NULL,
+ -- The status message
+ status_msg TEXT,
+ -- The last time an action was received by this user
+ last_active_ts BIGINT NOT NULL,
+ CONSTRAINT presence_presences_unique UNIQUE (user_id)
+);
+CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id);
+`
+
+const upsertPresenceSQL = "" +
+ "INSERT INTO syncapi_presence AS p" +
+ " (user_id, presence, status_msg, last_active_ts)" +
+ " VALUES ($1, $2, $3, $4)" +
+ " ON CONFLICT (user_id)" +
+ " DO UPDATE SET id = nextval('syncapi_presence_id')," +
+ " presence = $2, status_msg = COALESCE($3, p.status_msg), last_active_ts = $4" +
+ " RETURNING id"
+
+const upsertPresenceFromSyncSQL = "" +
+ "INSERT INTO syncapi_presence AS p" +
+ " (user_id, presence, last_active_ts)" +
+ " VALUES ($1, $2, $3)" +
+ " ON CONFLICT (user_id)" +
+ " DO UPDATE SET id = nextval('syncapi_presence_id')," +
+ " presence = $2, last_active_ts = $3" +
+ " RETURNING id"
+
+const selectPresenceForUserSQL = "" +
+ "SELECT presence, status_msg, last_active_ts" +
+ " FROM syncapi_presence" +
+ " WHERE user_id = $1 LIMIT 1"
+
+const selectMaxPresenceSQL = "" +
+ "SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
+
+const selectPresenceAfter = "" +
+ " SELECT id, user_id, presence, status_msg, last_active_ts" +
+ " FROM syncapi_presence" +
+ " WHERE id > $1"
+
+type presenceStatements struct {
+ upsertPresenceStmt *sql.Stmt
+ upsertPresenceFromSyncStmt *sql.Stmt
+ selectPresenceForUsersStmt *sql.Stmt
+ selectMaxPresenceStmt *sql.Stmt
+ selectPresenceAfterStmt *sql.Stmt
+}
+
+func NewPostgresPresenceTable(db *sql.DB) (*presenceStatements, error) {
+ _, err := db.Exec(presenceSchema)
+ if err != nil {
+ return nil, err
+ }
+ s := &presenceStatements{}
+ return s, sqlutil.StatementList{
+ {&s.upsertPresenceStmt, upsertPresenceSQL},
+ {&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL},
+ {&s.selectPresenceForUsersStmt, selectPresenceForUserSQL},
+ {&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
+ {&s.selectPresenceAfterStmt, selectPresenceAfter},
+ }.Prepare(db)
+}
+
+// UpsertPresence creates/updates a presence status.
+func (p *presenceStatements) UpsertPresence(
+ ctx context.Context,
+ txn *sql.Tx,
+ userID string,
+ statusMsg *string,
+ presence types.Presence,
+ lastActiveTS gomatrixserverlib.Timestamp,
+ fromSync bool,
+) (pos types.StreamPosition, err error) {
+ if fromSync {
+ stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
+ err = stmt.QueryRowContext(ctx, userID, presence, lastActiveTS).Scan(&pos)
+ } else {
+ stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
+ err = stmt.QueryRowContext(ctx, userID, presence, statusMsg, lastActiveTS).Scan(&pos)
+ }
+ return
+}
+
+// GetPresenceForUser returns the current presence of a user.
+func (p *presenceStatements) GetPresenceForUser(
+ ctx context.Context, txn *sql.Tx,
+ userID string,
+) (*types.PresenceInternal, error) {
+ result := &types.PresenceInternal{
+ UserID: userID,
+ }
+ stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
+ err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
+ result.ClientFields.Presence = result.Presence.String()
+ return result, err
+}
+
+func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
+ stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt)
+ err = stmt.QueryRowContext(ctx).Scan(&pos)
+ return
+}
+
+// GetPresenceAfter returns the changes presences after a given stream id
+func (p *presenceStatements) GetPresenceAfter(
+ ctx context.Context, txn *sql.Tx,
+ after types.StreamPosition,
+) (presences map[string]*types.PresenceInternal, err error) {
+ presences = make(map[string]*types.PresenceInternal)
+ stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
+
+ rows, err := stmt.QueryContext(ctx, after)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
+ for rows.Next() {
+ qryRes := &types.PresenceInternal{}
+ if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
+ return nil, err
+ }
+ qryRes.ClientFields.Presence = qryRes.Presence.String()
+ presences[qryRes.UserID] = qryRes
+ }
+ return presences, rows.Err()
+}
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 4e4b5c0b..54445c7e 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -90,6 +90,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
if err != nil {
return nil, err
}
+ presence, err := NewPostgresPresenceTable(d.db)
+ if err != nil {
+ return nil, err
+ }
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@@ -111,6 +115,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
Receipts: receipts,
Memberships: memberships,
NotificationData: notificationData,
+ Presence: presence,
}
return &d, nil
}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index de43678d..7c4786fc 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -48,6 +48,7 @@ type Database struct {
Receipts tables.Receipts
Memberships tables.Memberships
NotificationData tables.NotificationData
+ Presence tables.Presence
}
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
@@ -1002,3 +1003,19 @@ func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID
func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
}
+
+func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
+ return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
+}
+
+func (s *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
+ return s.Presence.GetPresenceForUser(ctx, nil, userID)
+}
+
+func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) {
+ return s.Presence.GetPresenceAfter(ctx, nil, after)
+}
+
+func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
+ return s.Presence.GetMaxPresenceID(ctx, nil)
+}
diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go
new file mode 100644
index 00000000..e7b78a70
--- /dev/null
+++ b/syncapi/storage/sqlite3/presence_table.go
@@ -0,0 +1,177 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const presenceSchema = `
+-- Stores data about presence
+CREATE TABLE IF NOT EXISTS syncapi_presence (
+ -- The ID
+ id BIGINT NOT NULL,
+ -- The Matrix user ID
+ user_id TEXT NOT NULL,
+ -- The actual presence
+ presence INT NOT NULL,
+ -- The status message
+ status_msg TEXT,
+ -- The last time an action was received by this user
+ last_active_ts BIGINT NOT NULL,
+ CONSTRAINT presence_presences_unique UNIQUE (user_id)
+);
+CREATE INDEX IF NOT EXISTS syncapi_presence_user_id ON syncapi_presence(user_id);
+`
+
+const upsertPresenceSQL = "" +
+ "INSERT INTO syncapi_presence AS p" +
+ " (id, user_id, presence, status_msg, last_active_ts)" +
+ " VALUES ($1, $2, $3, $4, $5)" +
+ " ON CONFLICT (user_id)" +
+ " DO UPDATE SET id = $6, " +
+ " presence = $7, status_msg = COALESCE($8, p.status_msg), last_active_ts = $9" +
+ " RETURNING id"
+
+const upsertPresenceFromSyncSQL = "" +
+ "INSERT INTO syncapi_presence AS p" +
+ " (id, user_id, presence, last_active_ts)" +
+ " VALUES ($1, $2, $3, $4)" +
+ " ON CONFLICT (user_id)" +
+ " DO UPDATE SET id = $5, " +
+ " presence = $6, last_active_ts = $7" +
+ " RETURNING id"
+
+const selectPresenceForUserSQL = "" +
+ "SELECT presence, status_msg, last_active_ts" +
+ " FROM syncapi_presence" +
+ " WHERE user_id = $1 LIMIT 1"
+
+const selectMaxPresenceSQL = "" +
+ "SELECT COALESCE(MAX(id), 0) FROM syncapi_presence"
+
+const selectPresenceAfter = "" +
+ " SELECT id, user_id, presence, status_msg, last_active_ts" +
+ " FROM syncapi_presence" +
+ " WHERE id > $1"
+
+type presenceStatements struct {
+ db *sql.DB
+ streamIDStatements *streamIDStatements
+ upsertPresenceStmt *sql.Stmt
+ upsertPresenceFromSyncStmt *sql.Stmt
+ selectPresenceForUsersStmt *sql.Stmt
+ selectMaxPresenceStmt *sql.Stmt
+ selectPresenceAfterStmt *sql.Stmt
+}
+
+func NewSqlitePresenceTable(db *sql.DB, streamID *streamIDStatements) (*presenceStatements, error) {
+ _, err := db.Exec(presenceSchema)
+ if err != nil {
+ return nil, err
+ }
+ s := &presenceStatements{
+ db: db,
+ streamIDStatements: streamID,
+ }
+ return s, sqlutil.StatementList{
+ {&s.upsertPresenceStmt, upsertPresenceSQL},
+ {&s.upsertPresenceFromSyncStmt, upsertPresenceFromSyncSQL},
+ {&s.selectPresenceForUsersStmt, selectPresenceForUserSQL},
+ {&s.selectMaxPresenceStmt, selectMaxPresenceSQL},
+ {&s.selectPresenceAfterStmt, selectPresenceAfter},
+ }.Prepare(db)
+}
+
+// UpsertPresence creates/updates a presence status.
+func (p *presenceStatements) UpsertPresence(
+ ctx context.Context,
+ txn *sql.Tx,
+ userID string,
+ statusMsg *string,
+ presence types.Presence,
+ lastActiveTS gomatrixserverlib.Timestamp,
+ fromSync bool,
+) (pos types.StreamPosition, err error) {
+ pos, err = p.streamIDStatements.nextPresenceID(ctx, txn)
+ if err != nil {
+ return pos, err
+ }
+
+ if fromSync {
+ stmt := sqlutil.TxStmt(txn, p.upsertPresenceFromSyncStmt)
+ err = stmt.QueryRowContext(ctx,
+ pos, userID, presence,
+ lastActiveTS, pos,
+ presence, lastActiveTS).Scan(&pos)
+ } else {
+ stmt := sqlutil.TxStmt(txn, p.upsertPresenceStmt)
+ err = stmt.QueryRowContext(ctx,
+ pos, userID, presence,
+ statusMsg, lastActiveTS, pos,
+ presence, statusMsg, lastActiveTS).Scan(&pos)
+ }
+ return
+}
+
+// GetPresenceForUser returns the current presence of a user.
+func (p *presenceStatements) GetPresenceForUser(
+ ctx context.Context, txn *sql.Tx,
+ userID string,
+) (*types.PresenceInternal, error) {
+ result := &types.PresenceInternal{
+ UserID: userID,
+ }
+ stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
+ err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
+ result.ClientFields.Presence = result.Presence.String()
+ return result, err
+}
+
+func (p *presenceStatements) GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
+ stmt := sqlutil.TxStmt(txn, p.selectMaxPresenceStmt)
+ err = stmt.QueryRowContext(ctx).Scan(&pos)
+ return
+}
+
+// GetPresenceAfter returns the changes presences after a given stream id
+func (p *presenceStatements) GetPresenceAfter(
+ ctx context.Context, txn *sql.Tx,
+ after types.StreamPosition,
+) (presences map[string]*types.PresenceInternal, err error) {
+ presences = make(map[string]*types.PresenceInternal)
+ stmt := sqlutil.TxStmt(txn, p.selectPresenceAfterStmt)
+
+ rows, err := stmt.QueryContext(ctx, after)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "GetPresenceAfter: failed to close rows")
+ for rows.Next() {
+ qryRes := &types.PresenceInternal{}
+ if err := rows.Scan(&qryRes.StreamPos, &qryRes.UserID, &qryRes.Presence, &qryRes.ClientFields.StatusMsg, &qryRes.LastActiveTS); err != nil {
+ return nil, err
+ }
+ qryRes.ClientFields.Presence = qryRes.Presence.String()
+ presences[qryRes.UserID] = qryRes
+ }
+ return presences, rows.Err()
+}
diff --git a/syncapi/storage/sqlite3/stream_id_table.go b/syncapi/storage/sqlite3/stream_id_table.go
index 2be3ae93..faa2c41f 100644
--- a/syncapi/storage/sqlite3/stream_id_table.go
+++ b/syncapi/storage/sqlite3/stream_id_table.go
@@ -24,6 +24,8 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("accountdata", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0)
ON CONFLICT DO NOTHING;
+INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("presence", 0)
+ ON CONFLICT DO NOTHING;
`
const increaseStreamIDStmt = "" +
@@ -70,3 +72,9 @@ func (s *streamIDStatements) nextAccountDataID(ctx context.Context, txn *sql.Tx)
err = increaseStmt.QueryRowContext(ctx, "accountdata").Scan(&pos)
return
}
+
+func (s *streamIDStatements) nextPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
+ increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
+ err = increaseStmt.QueryRowContext(ctx, "presence").Scan(&pos)
+ return
+}
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index cb7e3b46..cb2c3169 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
if err != nil {
return err
}
+ presence, err := NewSqlitePresenceTable(d.db, &d.streamID)
+ if err != nil {
+ return err
+ }
m := sqlutil.NewMigrations()
deltas.LoadFixSequences(m)
deltas.LoadRemoveSendToDeviceSentColumn(m)
@@ -121,6 +125,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
Receipts: receipts,
Memberships: memberships,
NotificationData: notificationData,
+ Presence: presence,
}
return nil
}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 2c29888d..ef0587bb 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -182,3 +182,10 @@ type NotificationData interface {
SelectUserUnreadCounts(ctx context.Context, userID string, fromExcl, toIncl types.StreamPosition) (map[string]*eventutil.NotificationData, error)
SelectMaxID(ctx context.Context) (int64, error)
}
+
+type Presence interface {
+ UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence types.Presence, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
+ GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
+ GetMaxPresenceID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error)
+ GetPresenceAfter(ctx context.Context, txn *sql.Tx, after types.StreamPosition) (presences map[string]*types.PresenceInternal, err error)
+}
diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go
new file mode 100644
index 00000000..a24edad5
--- /dev/null
+++ b/syncapi/streams/stream_presence.go
@@ -0,0 +1,179 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package streams
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "sync"
+
+ "github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type PresenceStreamProvider struct {
+ StreamProvider
+ // cache contains previously sent presence updates to avoid unneeded updates
+ cache sync.Map
+ notifier *notifier.Notifier
+}
+
+func (p *PresenceStreamProvider) Setup() {
+ p.StreamProvider.Setup()
+
+ id, err := p.DB.MaxStreamPositionForPresence(context.Background())
+ if err != nil {
+ panic(err)
+ }
+ p.latest = id
+}
+
+func (p *PresenceStreamProvider) CompleteSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+) types.StreamPosition {
+ return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+}
+
+func (p *PresenceStreamProvider) IncrementalSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+ from, to types.StreamPosition,
+) types.StreamPosition {
+ presences, err := p.DB.PresenceAfter(ctx, from)
+ if err != nil {
+ req.Log.WithError(err).Error("p.DB.PresenceAfter failed")
+ return from
+ }
+
+ if len(presences) == 0 {
+ return to
+ }
+
+ // get all joined users
+ // TODO: SharedUsers might get out of sync
+ sharedUsers := p.notifier.SharedUsers(req.Device.UserID)
+
+ sharedUsersMap := map[string]bool{
+ req.Device.UserID: true,
+ }
+ // convert array to a map for easier checking if a user exists
+ for i := range sharedUsers {
+ sharedUsersMap[sharedUsers[i]] = true
+ }
+
+ // add newly joined rooms user presences
+ newlyJoined := joinedRooms(req.Response, req.Device.UserID)
+ if len(newlyJoined) > 0 {
+ // TODO: This refreshes all lists and is quite expensive
+ // The notifier should update the lists itself
+ if err = p.notifier.Load(ctx, p.DB); err != nil {
+ req.Log.WithError(err).Error("unable to refresh notifier lists")
+ return from
+ }
+ for _, roomID := range newlyJoined {
+ roomUsers := p.notifier.JoinedUsers(roomID)
+ for i := range roomUsers {
+ sharedUsersMap[roomUsers[i]] = true
+ // we already got a presence from this user
+ if _, ok := presences[roomUsers[i]]; ok {
+ continue
+ }
+ presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i])
+ if err != nil {
+ if err == sql.ErrNoRows {
+ continue
+ }
+ req.Log.WithError(err).Error("unable to query presence for user")
+ return from
+ }
+ }
+ }
+ }
+
+ lastPos := to
+ for i := range presences {
+ presence := presences[i]
+ // Ignore users we don't share a room with
+ if !sharedUsersMap[presence.UserID] {
+ continue
+ }
+ cacheKey := req.Device.UserID + req.Device.ID + presence.UserID
+ pres, ok := p.cache.Load(cacheKey)
+ if ok {
+ // skip already sent presence
+ prevPresence := pres.(*types.PresenceInternal)
+ currentlyActive := prevPresence.CurrentlyActive()
+ skip := prevPresence.Equals(presence) && currentlyActive && req.Device.UserID != presence.UserID
+ if skip {
+ req.Log.Debugf("Skipping presence, no change (%s)", presence.UserID)
+ continue
+ }
+ }
+ presence.ClientFields.LastActiveAgo = presence.LastActiveAgo()
+ if presence.ClientFields.Presence == "online" {
+ currentlyActive := presence.CurrentlyActive()
+ presence.ClientFields.CurrentlyActive = &currentlyActive
+ }
+
+ content, err := json.Marshal(presence.ClientFields)
+ if err != nil {
+ return from
+ }
+
+ req.Response.Presence.Events = append(req.Response.Presence.Events, gomatrixserverlib.ClientEvent{
+ Content: content,
+ Sender: presence.UserID,
+ Type: gomatrixserverlib.MPresence,
+ })
+ if presence.StreamPos > lastPos {
+ lastPos = presence.StreamPos
+ }
+ p.cache.Store(cacheKey, presence)
+ }
+
+ return lastPos
+}
+
+func joinedRooms(res *types.Response, userID string) []string {
+ var roomIDs []string
+ for roomID, join := range res.Rooms.Join {
+ // we would expect to see our join event somewhere if we newly joined the room.
+ // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
+ newlyJoined := membershipEventPresent(join.State.Events, userID)
+ if newlyJoined {
+ roomIDs = append(roomIDs, roomID)
+ continue
+ }
+ newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
+ if newlyJoined {
+ roomIDs = append(roomIDs, roomID)
+ }
+ }
+ return roomIDs
+}
+
+func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
+ for _, ev := range events {
+ // it's enough to know that we have our member event here, don't need to check membership content
+ // as it's implied by being in the respective section of the sync response.
+ if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
+ return true
+ }
+ }
+ return false
+}
diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go
index b2273aad..07322388 100644
--- a/syncapi/streams/streams.go
+++ b/syncapi/streams/streams.go
@@ -6,6 +6,7 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -20,12 +21,13 @@ type Streams struct {
AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.StreamProvider
NotificationDataStreamProvider types.StreamProvider
+ PresenceStreamProvider types.StreamProvider
}
func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
- eduCache *caching.EDUCache,
+ eduCache *caching.EDUCache, notifier *notifier.Notifier,
) *Streams {
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
@@ -56,6 +58,10 @@ func NewSyncStreamProviders(
rsAPI: rsAPI,
keyAPI: keyAPI,
},
+ PresenceStreamProvider: &PresenceStreamProvider{
+ StreamProvider: StreamProvider{DB: d},
+ notifier: notifier,
+ },
}
streams.PDUStreamProvider.Setup()
@@ -66,6 +72,7 @@ func NewSyncStreamProviders(
streams.AccountDataStreamProvider.Setup()
streams.NotificationDataStreamProvider.Setup()
streams.DeviceListStreamProvider.Setup()
+ streams.PresenceStreamProvider.Setup()
return streams
}
@@ -80,5 +87,6 @@ func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
NotificationDataPosition: s.NotificationDataStreamProvider.LatestPosition(ctx),
DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
+ PresencePosition: s.PresenceStreamProvider.LatestPosition(ctx),
}
}
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 2c9920d1..cf667337 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -17,6 +17,8 @@
package sync
import (
+ "context"
+ "database/sql"
"net"
"net/http"
"strings"
@@ -33,8 +35,10 @@ import (
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
// RequestPool manages HTTP long-poll connections for /sync
@@ -44,9 +48,15 @@ type RequestPool struct {
userAPI userapi.UserInternalAPI
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
- lastseen sync.Map
+ lastseen *sync.Map
+ presence *sync.Map
streams *streams.Streams
Notifier *notifier.Notifier
+ producer PresencePublisher
+}
+
+type PresencePublisher interface {
+ SendPresence(userID string, presence types.Presence, statusMsg *string) error
}
// NewRequestPool makes a new RequestPool
@@ -55,6 +65,7 @@ func NewRequestPool(
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
streams *streams.Streams, notifier *notifier.Notifier,
+ producer PresencePublisher,
) *RequestPool {
rp := &RequestPool{
db: db,
@@ -62,11 +73,14 @@ func NewRequestPool(
userAPI: userAPI,
keyAPI: keyAPI,
rsAPI: rsAPI,
- lastseen: sync.Map{},
+ lastseen: &sync.Map{},
+ presence: &sync.Map{},
streams: streams,
Notifier: notifier,
+ producer: producer,
}
go rp.cleanLastSeen()
+ go rp.cleanPresence(db, time.Minute*5)
return rp
}
@@ -80,6 +94,68 @@ func (rp *RequestPool) cleanLastSeen() {
}
}
+func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Duration) {
+ if !rp.cfg.Matrix.Presence.EnableOutbound {
+ return
+ }
+ for {
+ rp.presence.Range(func(key interface{}, v interface{}) bool {
+ p := v.(types.PresenceInternal)
+ if time.Since(p.LastActiveTS.Time()) > cleanupTime {
+ rp.updatePresence(db, types.PresenceUnavailable.String(), p.UserID)
+ rp.presence.Delete(key)
+ }
+ return true
+ })
+ time.Sleep(cleanupTime)
+ }
+}
+
+// updatePresence sends presence updates to the SyncAPI and FederationAPI
+func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) {
+ if !rp.cfg.Matrix.Presence.EnableOutbound {
+ return
+ }
+ if presence == "" {
+ presence = types.PresenceOnline.String()
+ }
+
+ presenceID, ok := types.PresenceFromString(presence)
+ if !ok { // this should almost never happen
+ logrus.Errorf("unknown presence '%s'", presence)
+ return
+ }
+
+ newPresence := types.PresenceInternal{
+ ClientFields: types.PresenceClientResponse{
+ Presence: presenceID.String(),
+ },
+ Presence: presenceID,
+ UserID: userID,
+ LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
+ }
+ defer rp.presence.Store(userID, newPresence)
+ // avoid spamming presence updates when syncing
+ existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
+ if ok {
+ p := existingPresence.(types.PresenceInternal)
+ if p.ClientFields.Presence == newPresence.ClientFields.Presence {
+ return
+ }
+ }
+
+ // ensure we also send the current status_msg to federated servers and not nil
+ dbPresence, err := db.GetPresence(context.Background(), userID)
+ if err != nil && err != sql.ErrNoRows {
+ return
+ }
+
+ if err := rp.producer.SendPresence(userID, presenceID, dbPresence.ClientFields.StatusMsg); err != nil {
+ logrus.WithError(err).Error("Unable to publish presence message from sync")
+ return
+ }
+}
+
func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) {
if _, ok := rp.lastseen.LoadOrStore(device.UserID+device.ID, struct{}{}); ok {
return
@@ -156,6 +232,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
defer activeSyncRequests.Dec()
rp.updateLastSeen(req, device)
+ rp.updatePresence(rp.db, req.FormValue("set_presence"), device.UserID)
waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec()
@@ -219,6 +296,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
+ PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
}
} else {
// Incremental sync
@@ -255,6 +335,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
),
+ PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.PresencePosition, currentPos.PresencePosition,
+ ),
}
}
diff --git a/syncapi/sync/requestpool_test.go b/syncapi/sync/requestpool_test.go
new file mode 100644
index 00000000..a8008994
--- /dev/null
+++ b/syncapi/sync/requestpool_test.go
@@ -0,0 +1,128 @@
+package sync
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type dummyPublisher struct {
+ count int
+}
+
+func (d *dummyPublisher) SendPresence(userID string, presence types.Presence, statusMsg *string) error {
+ d.count++
+ return nil
+}
+
+type dummyDB struct{}
+
+func (d dummyDB) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
+ return 0, nil
+}
+
+func (d dummyDB) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
+ return &types.PresenceInternal{}, nil
+}
+
+func (d dummyDB) PresenceAfter(ctx context.Context, after types.StreamPosition) (map[string]*types.PresenceInternal, error) {
+ return map[string]*types.PresenceInternal{}, nil
+}
+
+func (d dummyDB) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
+ return 0, nil
+}
+
+func TestRequestPool_updatePresence(t *testing.T) {
+ type args struct {
+ presence string
+ userID string
+ sleep time.Duration
+ }
+ publisher := &dummyPublisher{}
+ syncMap := sync.Map{}
+
+ tests := []struct {
+ name string
+ args args
+ wantIncrease bool
+ }{
+ {
+ name: "new presence is published",
+ wantIncrease: true,
+ args: args{
+ userID: "dummy",
+ },
+ },
+ {
+ name: "presence not published, no change",
+ args: args{
+ userID: "dummy",
+ },
+ },
+ {
+ name: "new presence is published dummy2",
+ wantIncrease: true,
+ args: args{
+ userID: "dummy2",
+ presence: "online",
+ },
+ },
+ {
+ name: "different presence is published dummy2",
+ wantIncrease: true,
+ args: args{
+ userID: "dummy2",
+ presence: "unavailable",
+ },
+ },
+ {
+ name: "same presence is not published dummy2",
+ args: args{
+ userID: "dummy2",
+ presence: "unavailable",
+ sleep: time.Millisecond * 150,
+ },
+ },
+ {
+ name: "same presence is published after being deleted",
+ wantIncrease: true,
+ args: args{
+ userID: "dummy2",
+ presence: "unavailable",
+ },
+ },
+ }
+ rp := &RequestPool{
+ presence: &syncMap,
+ producer: publisher,
+ cfg: &config.SyncAPI{
+ Matrix: &config.Global{
+ JetStream: config.JetStream{
+ TopicPrefix: "Dendrite",
+ },
+ Presence: config.PresenceOptions{
+ EnableInbound: true,
+ EnableOutbound: true,
+ },
+ },
+ },
+ }
+ db := dummyDB{}
+ go rp.cleanPresence(db, time.Millisecond*50)
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ beforeCount := publisher.count
+ rp.updatePresence(db, tt.args.presence, tt.args.userID)
+ if tt.wantIncrease && publisher.count <= beforeCount {
+ t.Fatalf("expected count to increase: %d <= %d", publisher.count, beforeCount)
+ }
+ time.Sleep(tt.args.sleep)
+ })
+ }
+}
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index b579467a..384121a8 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -49,7 +49,7 @@ func AddPublicRoutes(
federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI,
) {
- js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
+ js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {
@@ -57,13 +57,19 @@ func AddPublicRoutes(
}
eduCache := caching.NewTypingCache()
- streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache)
- notifier := notifier.NewNotifier(streams.Latest(context.Background()))
+ notifier := notifier.NewNotifier()
+ streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache, notifier)
+ notifier.SetCurrentPosition(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
logrus.WithError(err).Panicf("failed to load notifier ")
}
- requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
+ federationPresenceProducer := &producers.FederationAPIPresenceProducer{
+ Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
+ JetStream: js,
+ }
+
+ requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier, federationPresenceProducer)
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js,
@@ -75,8 +81,6 @@ func AddPublicRoutes(
Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
}
- _ = userAPIReadUpdateProducer
-
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier,
@@ -131,5 +135,14 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start receipts consumer")
}
+ presenceConsumer := consumers.NewPresenceConsumer(
+ process, cfg, js, natsClient, syncDB,
+ notifier, streams.PresenceStreamProvider,
+ userAPI,
+ )
+ if err = presenceConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start presence consumer")
+ }
+
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}
diff --git a/syncapi/types/presence.go b/syncapi/types/presence.go
new file mode 100644
index 00000000..40aa29cf
--- /dev/null
+++ b/syncapi/types/presence.go
@@ -0,0 +1,75 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package types
+
+import (
+ "strings"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+//go:generate stringer -type=Presence -linecomment
+type Presence uint8
+
+const (
+ PresenceUnavailable Presence = iota + 1 // unavailable
+ PresenceOnline // online
+ PresenceOffline // offline
+)
+
+// PresenceFromString returns the integer representation of the given input presence.
+// Returns false for ok, if input is not a valid presence value.
+func PresenceFromString(input string) (p Presence, ok bool) {
+ for i := 0; i < len(_Presence_index)-1; i++ {
+ l, r := _Presence_index[i], _Presence_index[i+1]
+ if strings.EqualFold(input, _Presence_name[l:r]) {
+ return Presence(i + 1), true
+ }
+ }
+ return 0, false
+}
+
+type PresenceInternal struct {
+ ClientFields PresenceClientResponse
+ StreamPos StreamPosition `json:"-"`
+ UserID string `json:"-"`
+ LastActiveTS gomatrixserverlib.Timestamp `json:"-"`
+ Presence Presence `json:"-"`
+}
+
+// Equals compares p1 with p2.
+func (p1 *PresenceInternal) Equals(p2 *PresenceInternal) bool {
+ return p1.ClientFields.Presence == p2.ClientFields.Presence &&
+ p1.ClientFields.StatusMsg == p2.ClientFields.StatusMsg &&
+ p1.UserID == p2.UserID
+}
+
+// CurrentlyActive returns the current active state.
+func (p *PresenceInternal) CurrentlyActive() bool {
+ return time.Since(p.LastActiveTS.Time()).Minutes() < 5
+}
+
+// LastActiveAgo returns the time since the LastActiveTS in milliseconds.
+func (p *PresenceInternal) LastActiveAgo() int64 {
+ return time.Since(p.LastActiveTS.Time()).Milliseconds()
+}
+
+type PresenceClientResponse struct {
+ CurrentlyActive *bool `json:"currently_active,omitempty"`
+ LastActiveAgo int64 `json:"last_active_ago,omitempty"`
+ Presence string `json:"presence"`
+ StatusMsg *string `json:"status_msg,omitempty"`
+}
diff --git a/syncapi/types/presence_string.go b/syncapi/types/presence_string.go
new file mode 100644
index 00000000..467b463b
--- /dev/null
+++ b/syncapi/types/presence_string.go
@@ -0,0 +1,26 @@
+// Code generated by "stringer -type=Presence -linecomment"; DO NOT EDIT.
+
+package types
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[PresenceUnavailable-1]
+ _ = x[PresenceOnline-2]
+ _ = x[PresenceOffline-3]
+}
+
+const _Presence_name = "unavailableonlineoffline"
+
+var _Presence_index = [...]uint8{0, 11, 17, 24}
+
+func (i Presence) String() string {
+ i -= 1
+ if i >= Presence(len(_Presence_index)-1) {
+ return "Presence(" + strconv.FormatInt(int64(i+1), 10) + ")"
+ }
+ return _Presence_name[_Presence_index[i]:_Presence_index[i+1]]
+}
diff --git a/syncapi/types/presence_test.go b/syncapi/types/presence_test.go
new file mode 100644
index 00000000..dbc201c5
--- /dev/null
+++ b/syncapi/types/presence_test.go
@@ -0,0 +1,42 @@
+package types
+
+import "testing"
+
+func TestPresenceFromString(t *testing.T) {
+ tests := []struct {
+ name string
+ input string
+ wantStatus Presence
+ wantOk bool
+ }{
+ {
+ name: "presence unavailable",
+ input: "unavailable",
+ wantStatus: PresenceUnavailable,
+ wantOk: true,
+ },
+ {
+ name: "presence online",
+ input: "OnLINE",
+ wantStatus: PresenceOnline,
+ wantOk: true,
+ },
+ {
+ name: "unknown presence",
+ input: "unknown",
+ wantStatus: 0,
+ wantOk: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, got1 := PresenceFromString(tt.input)
+ if got != tt.wantStatus {
+ t.Errorf("PresenceFromString() got = %v, want %v", got, tt.wantStatus)
+ }
+ if got1 != tt.wantOk {
+ t.Errorf("PresenceFromString() got1 = %v, want %v", got1, tt.wantOk)
+ }
+ })
+ }
+}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index d0efa1bb..d21203b5 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -103,6 +103,7 @@ type StreamingToken struct {
AccountDataPosition StreamPosition
DeviceListPosition StreamPosition
NotificationDataPosition StreamPosition
+ PresencePosition StreamPosition
}
// This will be used as a fallback by json.Marshal.
@@ -118,11 +119,12 @@ func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
func (t StreamingToken) String() string {
posStr := fmt.Sprintf(
- "s%d_%d_%d_%d_%d_%d_%d_%d",
+ "s%d_%d_%d_%d_%d_%d_%d_%d_%d",
t.PDUPosition, t.TypingPosition,
t.ReceiptPosition, t.SendToDevicePosition,
t.InvitePosition, t.AccountDataPosition,
t.DeviceListPosition, t.NotificationDataPosition,
+ t.PresencePosition,
)
return posStr
}
@@ -146,12 +148,14 @@ func (t *StreamingToken) IsAfter(other StreamingToken) bool {
return true
case t.NotificationDataPosition > other.NotificationDataPosition:
return true
+ case t.PresencePosition > other.PresencePosition:
+ return true
}
return false
}
func (t *StreamingToken) IsEmpty() bool {
- return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition == 0
+ return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition+t.PresencePosition == 0
}
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
@@ -192,6 +196,9 @@ func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
if other.NotificationDataPosition > t.NotificationDataPosition {
t.NotificationDataPosition = other.NotificationDataPosition
}
+ if other.PresencePosition > t.PresencePosition {
+ t.PresencePosition = other.PresencePosition
+ }
}
type TopologyToken struct {
@@ -284,7 +291,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
// s478_0_0_0_0_13.dl-0-2 but we have now removed partitioned stream positions
tok = strings.Split(tok, ".")[0]
parts := strings.Split(tok[1:], "_")
- var positions [8]StreamPosition
+ var positions [9]StreamPosition
for i, p := range parts {
if i >= len(positions) {
break
@@ -306,6 +313,7 @@ func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
AccountDataPosition: positions[5],
DeviceListPosition: positions[6],
NotificationDataPosition: positions[7],
+ PresencePosition: positions[8],
}
return token, nil
}
diff --git a/syncapi/types/types_test.go b/syncapi/types/types_test.go
index ff78bfb9..19fcfc15 100644
--- a/syncapi/types/types_test.go
+++ b/syncapi/types/types_test.go
@@ -9,10 +9,10 @@ import (
func TestSyncTokens(t *testing.T) {
shouldPass := map[string]string{
- "s4_0_0_0_0_0_0_0": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0}.String(),
- "s3_1_0_0_0_0_2_0": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0}.String(),
- "s3_1_2_3_5_0_0_0": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0}.String(),
- "t3_1": TopologyToken{3, 1}.String(),
+ "s4_0_0_0_0_0_0_0_3": StreamingToken{4, 0, 0, 0, 0, 0, 0, 0, 3}.String(),
+ "s3_1_0_0_0_0_2_0_5": StreamingToken{3, 1, 0, 0, 0, 0, 2, 0, 5}.String(),
+ "s3_1_2_3_5_0_0_0_6": StreamingToken{3, 1, 2, 3, 5, 0, 0, 0, 6}.String(),
+ "t3_1": TopologyToken{3, 1}.String(),
}
for a, b := range shouldPass {
diff --git a/sytest-whitelist b/sytest-whitelist
index 38a057da..69fa19c6 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -679,6 +679,20 @@ remote user has push rules copied to upgraded room
/upgrade of a bogus room fails gracefully
Cannot send tombstone event that points to the same room
Room summary counts change when membership changes
+GET /presence/:user_id/status fetches initial status
+PUT /presence/:user_id/status updates my presence
+Presence change reports an event to myself
+Existing members see new members' presence
+#Existing members see new member's presence
+Newly joined room includes presence in incremental sync
+Get presence for newly joined members in incremental sync
+User sees their own presence in a sync
+User sees updates to presence from other users in the incremental sync.
+Presence changes are reported to local room members
+Presence changes are also reported to remote room members
+Presence changes to UNAVAILABLE are reported to local room members
+Presence changes to UNAVAILABLE are reported to remote room members
+New federated private chats get full presence information (SYN-115)
/upgrade copies >100 power levels to the new room
Room state after a rejected message event is the same as before
Room state after a rejected state event is the same as before \ No newline at end of file
diff --git a/userapi/api/api.go b/userapi/api/api.go
index a9544f00..b86774d1 100644
--- a/userapi/api/api.go
+++ b/userapi/api/api.go
@@ -31,12 +31,10 @@ type UserInternalAPI interface {
UserRegisterAPI
UserAccountAPI
UserThreePIDAPI
+ UserDeviceAPI
InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error
- PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error
- PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error
- PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error
PerformOpenIDTokenCreation(ctx context.Context, req *PerformOpenIDTokenCreationRequest, res *PerformOpenIDTokenCreationResponse) error
PerformKeyBackup(ctx context.Context, req *PerformKeyBackupRequest, res *PerformKeyBackupResponse) error
PerformPusherSet(ctx context.Context, req *PerformPusherSetRequest, res *struct{}) error
@@ -45,15 +43,21 @@ type UserInternalAPI interface {
QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse)
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
- QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error
QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error
- QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error
QueryOpenIDToken(ctx context.Context, req *QueryOpenIDTokenRequest, res *QueryOpenIDTokenResponse) error
QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error
QueryPushRules(ctx context.Context, req *QueryPushRulesRequest, res *QueryPushRulesResponse) error
QueryNotifications(ctx context.Context, req *QueryNotificationsRequest, res *QueryNotificationsResponse) error
}
+type UserDeviceAPI interface {
+ PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error
+ PerformLastSeenUpdate(ctx context.Context, req *PerformLastSeenUpdateRequest, res *PerformLastSeenUpdateResponse) error
+ PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error
+ QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error
+ QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error
+}
+
type UserDirectoryProvider interface {
QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error
}