aboutsummaryrefslogtreecommitdiff
path: root/syncapi/sync/requestpool.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/sync/requestpool.go')
-rw-r--r--syncapi/sync/requestpool.go88
1 files changed, 86 insertions, 2 deletions
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 2c9920d1..cf667337 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -17,6 +17,8 @@
package sync
import (
+ "context"
+ "database/sql"
"net"
"net/http"
"strings"
@@ -33,8 +35,10 @@ import (
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
// RequestPool manages HTTP long-poll connections for /sync
@@ -44,9 +48,15 @@ type RequestPool struct {
userAPI userapi.UserInternalAPI
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
- lastseen sync.Map
+ lastseen *sync.Map
+ presence *sync.Map
streams *streams.Streams
Notifier *notifier.Notifier
+ producer PresencePublisher
+}
+
+type PresencePublisher interface {
+ SendPresence(userID string, presence types.Presence, statusMsg *string) error
}
// NewRequestPool makes a new RequestPool
@@ -55,6 +65,7 @@ func NewRequestPool(
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
streams *streams.Streams, notifier *notifier.Notifier,
+ producer PresencePublisher,
) *RequestPool {
rp := &RequestPool{
db: db,
@@ -62,11 +73,14 @@ func NewRequestPool(
userAPI: userAPI,
keyAPI: keyAPI,
rsAPI: rsAPI,
- lastseen: sync.Map{},
+ lastseen: &sync.Map{},
+ presence: &sync.Map{},
streams: streams,
Notifier: notifier,
+ producer: producer,
}
go rp.cleanLastSeen()
+ go rp.cleanPresence(db, time.Minute*5)
return rp
}
@@ -80,6 +94,68 @@ func (rp *RequestPool) cleanLastSeen() {
}
}
+func (rp *RequestPool) cleanPresence(db storage.Presence, cleanupTime time.Duration) {
+ if !rp.cfg.Matrix.Presence.EnableOutbound {
+ return
+ }
+ for {
+ rp.presence.Range(func(key interface{}, v interface{}) bool {
+ p := v.(types.PresenceInternal)
+ if time.Since(p.LastActiveTS.Time()) > cleanupTime {
+ rp.updatePresence(db, types.PresenceUnavailable.String(), p.UserID)
+ rp.presence.Delete(key)
+ }
+ return true
+ })
+ time.Sleep(cleanupTime)
+ }
+}
+
+// updatePresence sends presence updates to the SyncAPI and FederationAPI
+func (rp *RequestPool) updatePresence(db storage.Presence, presence string, userID string) {
+ if !rp.cfg.Matrix.Presence.EnableOutbound {
+ return
+ }
+ if presence == "" {
+ presence = types.PresenceOnline.String()
+ }
+
+ presenceID, ok := types.PresenceFromString(presence)
+ if !ok { // this should almost never happen
+ logrus.Errorf("unknown presence '%s'", presence)
+ return
+ }
+
+ newPresence := types.PresenceInternal{
+ ClientFields: types.PresenceClientResponse{
+ Presence: presenceID.String(),
+ },
+ Presence: presenceID,
+ UserID: userID,
+ LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
+ }
+ defer rp.presence.Store(userID, newPresence)
+ // avoid spamming presence updates when syncing
+ existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
+ if ok {
+ p := existingPresence.(types.PresenceInternal)
+ if p.ClientFields.Presence == newPresence.ClientFields.Presence {
+ return
+ }
+ }
+
+ // ensure we also send the current status_msg to federated servers and not nil
+ dbPresence, err := db.GetPresence(context.Background(), userID)
+ if err != nil && err != sql.ErrNoRows {
+ return
+ }
+
+ if err := rp.producer.SendPresence(userID, presenceID, dbPresence.ClientFields.StatusMsg); err != nil {
+ logrus.WithError(err).Error("Unable to publish presence message from sync")
+ return
+ }
+}
+
func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device) {
if _, ok := rp.lastseen.LoadOrStore(device.UserID+device.ID, struct{}{}); ok {
return
@@ -156,6 +232,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
defer activeSyncRequests.Dec()
rp.updateLastSeen(req, device)
+ rp.updatePresence(rp.db, req.FormValue("set_presence"), device.UserID)
waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec()
@@ -219,6 +296,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
syncReq.Context, syncReq,
),
+ PresencePosition: rp.streams.PresenceStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
}
} else {
// Incremental sync
@@ -255,6 +335,10 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
syncReq.Context, syncReq,
syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
),
+ PresencePosition: rp.streams.PresenceStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.PresencePosition, currentPos.PresencePosition,
+ ),
}
}