diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-04-06 13:11:19 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-06 13:11:19 +0200 |
commit | e5e3350ce168a192dfc6b6b654276d5cffbdbf0f (patch) | |
tree | 738e3bd364da85767de8c487d3d9a851ab804e8b /federationapi/routing | |
parent | 16e2d243fc8f3d433a9d7f428e6f782065dc5e89 (diff) |
Add presence module V2 (#2312)
* Syncapi presence
* Clientapi http presence handler
* Why is this here?
* Missing files
* FederationAPI presence implementation
* Add new presence stream
* Pinecone update
* Pinecone update
* Add passing tests
* Make linter happy
* Add presence producer
* Add presence config option
* Set user to unavailable after x minutes
* Only set currently_active if online
Avoid unneeded presence updates when syncing
* Tweaks
* Query devices for last_active_ts
Fixes & tweaks
* Export SharedUsers/SharedUsers
* Presence stream in MemoryStorage
* Remove status_msg_nil
* Fix sytest crashes
* Make presence types const and use stringer for it
* Change options to allow inbound/outbound presence
* Fix option & typo
* Update configs
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'federationapi/routing')
-rw-r--r-- | federationapi/routing/send.go | 56 |
1 files changed, 42 insertions, 14 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index eacc76db..1bba632b 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -30,6 +30,7 @@ import ( keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + syncTypes "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" @@ -127,13 +128,14 @@ func Send( defer inFlightTxnsPerOrigin.Delete(index) t := txnReq{ - rsAPI: rsAPI, - keys: keys, - federation: federation, - servers: servers, - keyAPI: keyAPI, - roomsMu: mu, - producer: producer, + rsAPI: rsAPI, + keys: keys, + federation: federation, + servers: servers, + keyAPI: keyAPI, + roomsMu: mu, + producer: producer, + inboundPresenceEnabled: cfg.Matrix.Presence.EnableInbound, } var txnEvents struct { @@ -185,13 +187,14 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - rsAPI api.RoomserverInternalAPI - keyAPI keyapi.KeyInternalAPI - keys gomatrixserverlib.JSONVerifier - federation txnFederationClient - roomsMu *internal.MutexByRoom - servers federationAPI.ServersInRoomProvider - producer *producers.SyncAPIProducer + rsAPI api.RoomserverInternalAPI + keyAPI keyapi.KeyInternalAPI + keys gomatrixserverlib.JSONVerifier + federation txnFederationClient + roomsMu *internal.MutexByRoom + servers federationAPI.ServersInRoomProvider + producer *producers.SyncAPIProducer + inboundPresenceEnabled bool } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -389,12 +392,37 @@ func (t *txnReq) processEDUs(ctx context.Context) { if err := t.processSigningKeyUpdate(ctx, e); err != nil { logrus.WithError(err).Errorf("Failed to process signing key update") } + case gomatrixserverlib.MPresence: + if t.inboundPresenceEnabled { + if err := t.processPresence(ctx, e); err != nil { + logrus.WithError(err).Errorf("Failed to process presence update") + } + } default: util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU") } } } +// processPresence handles m.receipt events +func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error { + payload := types.Presence{} + if err := json.Unmarshal(e.Content, &payload); err != nil { + return err + } + for _, content := range payload.Push { + presence, ok := syncTypes.PresenceFromString(content.Presence) + if !ok { + logrus.Warnf("invalid presence '%s', skipping.", content.Presence) + continue + } + if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil { + return err + } + } + return nil +} + func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error { var updatePayload keyapi.CrossSigningKeyUpdate if err := json.Unmarshal(e.Content, &updatePayload); err != nil { |