aboutsummaryrefslogtreecommitdiff
path: root/clientapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-04-06 13:11:19 +0200
committerGitHub <noreply@github.com>2022-04-06 13:11:19 +0200
commite5e3350ce168a192dfc6b6b654276d5cffbdbf0f (patch)
tree738e3bd364da85767de8c487d3d9a851ab804e8b /clientapi
parent16e2d243fc8f3d433a9d7f428e6f782065dc5e89 (diff)
Add presence module V2 (#2312)
* Syncapi presence * Clientapi http presence handler * Why is this here? * Missing files * FederationAPI presence implementation * Add new presence stream * Pinecone update * Pinecone update * Add passing tests * Make linter happy * Add presence producer * Add presence config option * Set user to unavailable after x minutes * Only set currently_active if online Avoid unneeded presence updates when syncing * Tweaks * Query devices for last_active_ts Fixes & tweaks * Export SharedUsers/SharedUsers * Presence stream in MemoryStorage * Remove status_msg_nil * Fix sytest crashes * Make presence types const and use stringer for it * Change options to allow inbound/outbound presence * Fix option & typo * Update configs Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'clientapi')
-rw-r--r--clientapi/clientapi.go5
-rw-r--r--clientapi/producers/syncapi.go18
-rw-r--r--clientapi/routing/presence.go138
-rw-r--r--clientapi/routing/routing.go36
4 files changed, 180 insertions, 17 deletions
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index d4b417a3..e2f8d3f3 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -48,7 +48,7 @@ func AddPublicRoutes(
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
) {
- js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
+ js, natsClient := jetstream.Prepare(process, &cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{
JetStream: js,
@@ -56,6 +56,7 @@ func AddPublicRoutes(
TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ TopicPresenceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
UserAPI: userAPI,
ServerName: cfg.Matrix.ServerName,
}
@@ -64,6 +65,6 @@ func AddPublicRoutes(
router, synapseAdminRouter, cfg, rsAPI, asAPI,
userAPI, userDirectoryProvider, federation,
syncProducer, transactionsCache, fsAPI, keyAPI,
- extRoomsProvider, mscCfg,
+ extRoomsProvider, mscCfg, natsClient,
)
}
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go
index 2dee04e3..6e869c31 100644
--- a/clientapi/producers/syncapi.go
+++ b/clientapi/producers/syncapi.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
+ "time"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream"
@@ -34,6 +35,7 @@ type SyncAPIProducer struct {
TopicReceiptEvent string
TopicSendToDeviceEvent string
TopicTypingEvent string
+ TopicPresenceEvent string
JetStream nats.JetStreamContext
ServerName gomatrixserverlib.ServerName
UserAPI userapi.UserInternalAPI
@@ -173,3 +175,19 @@ func (p *SyncAPIProducer) SendTyping(
_, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
return err
}
+
+func (p *SyncAPIProducer) SendPresence(
+ ctx context.Context, userID string, presence types.Presence, statusMsg *string,
+) error {
+ m := nats.NewMsg(p.TopicPresenceEvent)
+ m.Header.Set(jetstream.UserID, userID)
+ m.Header.Set("presence", presence.String())
+ if statusMsg != nil {
+ m.Header.Set("status_msg", *statusMsg)
+ }
+
+ m.Header.Set("last_active_ts", strconv.Itoa(int(gomatrixserverlib.AsTimestamp(time.Now()))))
+
+ _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
+ return err
+}
diff --git a/clientapi/routing/presence.go b/clientapi/routing/presence.go
new file mode 100644
index 00000000..63fbb75e
--- /dev/null
+++ b/clientapi/routing/presence.go
@@ -0,0 +1,138 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package routing
+
+import (
+ "fmt"
+ "net/http"
+ "strconv"
+ "time"
+
+ "github.com/matrix-org/dendrite/clientapi/httputil"
+ "github.com/matrix-org/dendrite/clientapi/jsonerror"
+ "github.com/matrix-org/dendrite/clientapi/producers"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+type presenceReq struct {
+ Presence string `json:"presence"`
+ StatusMsg *string `json:"status_msg,omitempty"`
+}
+
+func SetPresence(
+ req *http.Request,
+ cfg *config.ClientAPI,
+ device *api.Device,
+ producer *producers.SyncAPIProducer,
+ userID string,
+) util.JSONResponse {
+ if !cfg.Matrix.Presence.EnableOutbound {
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: struct{}{},
+ }
+ }
+ if device.UserID != userID {
+ return util.JSONResponse{
+ Code: http.StatusForbidden,
+ JSON: jsonerror.Forbidden("Unable to set presence for other user."),
+ }
+ }
+ var presence presenceReq
+ parseErr := httputil.UnmarshalJSONRequest(req, &presence)
+ if parseErr != nil {
+ return *parseErr
+ }
+
+ presenceStatus, ok := types.PresenceFromString(presence.Presence)
+ if !ok {
+ return util.JSONResponse{
+ Code: http.StatusBadRequest,
+ JSON: jsonerror.Unknown(fmt.Sprintf("Unknown presence '%s'.", presence.Presence)),
+ }
+ }
+
+ err := producer.SendPresence(req.Context(), userID, presenceStatus, presence.StatusMsg)
+ if err != nil {
+ log.WithError(err).Errorf("failed to update presence")
+ return util.JSONResponse{
+ Code: http.StatusInternalServerError,
+ JSON: jsonerror.InternalServerError(),
+ }
+ }
+
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: struct{}{},
+ }
+}
+
+func GetPresence(
+ req *http.Request,
+ device *api.Device,
+ natsClient *nats.Conn,
+ presenceTopic string,
+ userID string,
+) util.JSONResponse {
+ msg := nats.NewMsg(presenceTopic)
+ msg.Header.Set(jetstream.UserID, userID)
+
+ presence, err := natsClient.RequestMsg(msg, time.Second*10)
+ if err != nil {
+ log.WithError(err).Errorf("unable to get presence")
+ return util.JSONResponse{
+ Code: http.StatusInternalServerError,
+ JSON: jsonerror.InternalServerError(),
+ }
+ }
+
+ statusMsg := presence.Header.Get("status_msg")
+ e := presence.Header.Get("error")
+ if e != "" {
+ log.Errorf("received error msg from nats: %s", e)
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: types.PresenceClientResponse{
+ Presence: types.PresenceUnavailable.String(),
+ },
+ }
+ }
+ lastActive, err := strconv.Atoi(presence.Header.Get("last_active_ts"))
+ if err != nil {
+ return util.JSONResponse{
+ Code: http.StatusInternalServerError,
+ JSON: jsonerror.InternalServerError(),
+ }
+ }
+
+ p := types.PresenceInternal{LastActiveTS: gomatrixserverlib.Timestamp(lastActive)}
+ currentlyActive := p.CurrentlyActive()
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: types.PresenceClientResponse{
+ CurrentlyActive: &currentlyActive,
+ LastActiveAgo: p.LastActiveAgo(),
+ Presence: presence.Header.Get("presence"),
+ StatusMsg: &statusMsg,
+ },
+ }
+}
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index db860dcd..32e83187 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -32,9 +32,11 @@ import (
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
+ "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
@@ -56,7 +58,7 @@ func Setup(
federationSender federationAPI.FederationInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
- mscCfg *config.MSCs,
+ mscCfg *config.MSCs, natsClient *nats.Conn,
) {
rateLimits := httputil.NewRateLimits(&cfg.RateLimiting)
userInteractiveAuth := auth.NewUserInteractive(userAPI, cfg)
@@ -779,20 +781,6 @@ func Setup(
}),
).Methods(http.MethodPost, http.MethodOptions)
- // Element logs get flooded unless this is handled
- v3mux.Handle("/presence/{userID}/status",
- httputil.MakeExternalAPI("presence", func(req *http.Request) util.JSONResponse {
- if r := rateLimits.Limit(req); r != nil {
- return *r
- }
- // TODO: Set presence (probably the responsibility of a presence server not clientapi)
- return util.JSONResponse{
- Code: http.StatusOK,
- JSON: struct{}{},
- }
- }),
- ).Methods(http.MethodPut, http.MethodOptions)
-
v3mux.Handle("/voip/turnServer",
httputil.MakeAuthAPI("turn_server", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
@@ -1308,4 +1296,22 @@ func Setup(
return SetReceipt(req, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"])
}),
).Methods(http.MethodPost, http.MethodOptions)
+ v3mux.Handle("/presence/{userId}/status",
+ httputil.MakeAuthAPI("set_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
+ vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
+ if err != nil {
+ return util.ErrorResponse(err)
+ }
+ return SetPresence(req, cfg, device, syncProducer, vars["userId"])
+ }),
+ ).Methods(http.MethodPut, http.MethodOptions)
+ v3mux.Handle("/presence/{userId}/status",
+ httputil.MakeAuthAPI("get_presence", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
+ vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
+ if err != nil {
+ return util.ErrorResponse(err)
+ }
+ return GetPresence(req, device, natsClient, cfg.Matrix.JetStream.Prefixed(jetstream.RequestPresence), vars["userId"])
+ }),
+ ).Methods(http.MethodGet, http.MethodOptions)
}