aboutsummaryrefslogtreecommitdiff
path: root/federationapi/routing
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi/routing')
-rw-r--r--federationapi/routing/send.go56
1 files changed, 42 insertions, 14 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index eacc76db..1bba632b 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -30,6 +30,7 @@ import (
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ syncTypes "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
@@ -127,13 +128,14 @@ func Send(
defer inFlightTxnsPerOrigin.Delete(index)
t := txnReq{
- rsAPI: rsAPI,
- keys: keys,
- federation: federation,
- servers: servers,
- keyAPI: keyAPI,
- roomsMu: mu,
- producer: producer,
+ rsAPI: rsAPI,
+ keys: keys,
+ federation: federation,
+ servers: servers,
+ keyAPI: keyAPI,
+ roomsMu: mu,
+ producer: producer,
+ inboundPresenceEnabled: cfg.Matrix.Presence.EnableInbound,
}
var txnEvents struct {
@@ -185,13 +187,14 @@ func Send(
type txnReq struct {
gomatrixserverlib.Transaction
- rsAPI api.RoomserverInternalAPI
- keyAPI keyapi.KeyInternalAPI
- keys gomatrixserverlib.JSONVerifier
- federation txnFederationClient
- roomsMu *internal.MutexByRoom
- servers federationAPI.ServersInRoomProvider
- producer *producers.SyncAPIProducer
+ rsAPI api.RoomserverInternalAPI
+ keyAPI keyapi.KeyInternalAPI
+ keys gomatrixserverlib.JSONVerifier
+ federation txnFederationClient
+ roomsMu *internal.MutexByRoom
+ servers federationAPI.ServersInRoomProvider
+ producer *producers.SyncAPIProducer
+ inboundPresenceEnabled bool
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
@@ -389,12 +392,37 @@ func (t *txnReq) processEDUs(ctx context.Context) {
if err := t.processSigningKeyUpdate(ctx, e); err != nil {
logrus.WithError(err).Errorf("Failed to process signing key update")
}
+ case gomatrixserverlib.MPresence:
+ if t.inboundPresenceEnabled {
+ if err := t.processPresence(ctx, e); err != nil {
+ logrus.WithError(err).Errorf("Failed to process presence update")
+ }
+ }
default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
}
}
}
+// processPresence handles m.receipt events
+func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error {
+ payload := types.Presence{}
+ if err := json.Unmarshal(e.Content, &payload); err != nil {
+ return err
+ }
+ for _, content := range payload.Push {
+ presence, ok := syncTypes.PresenceFromString(content.Presence)
+ if !ok {
+ logrus.Warnf("invalid presence '%s', skipping.", content.Presence)
+ continue
+ }
+ if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error {
var updatePayload keyapi.CrossSigningKeyUpdate
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {