aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--syncapi/consumers/presence.go9
-rw-r--r--syncapi/storage/postgres/presence_table.go3
-rw-r--r--syncapi/storage/sqlite3/presence_table.go3
-rw-r--r--syncapi/streams/stream_presence.go12
-rw-r--r--syncapi/sync/requestpool.go23
-rw-r--r--sytest-whitelist2
6 files changed, 33 insertions, 19 deletions
diff --git a/syncapi/consumers/presence.go b/syncapi/consumers/presence.go
index b198b229..6bcca48f 100644
--- a/syncapi/consumers/presence.go
+++ b/syncapi/consumers/presence.go
@@ -88,6 +88,11 @@ func (s *PresenceConsumer) Start() error {
}
return
}
+ if presence == nil {
+ presence = &types.PresenceInternal{
+ UserID: userID,
+ }
+ }
deviceRes := api.QueryDevicesResponse{}
if err = s.deviceAPI.QueryDevices(s.ctx, &api.QueryDevicesRequest{UserID: userID}, &deviceRes); err != nil {
@@ -106,7 +111,9 @@ func (s *PresenceConsumer) Start() error {
m.Header.Set(jetstream.UserID, presence.UserID)
m.Header.Set("presence", presence.ClientFields.Presence)
- m.Header.Set("status_msg", *presence.ClientFields.StatusMsg)
+ if presence.ClientFields.StatusMsg != nil {
+ m.Header.Set("status_msg", *presence.ClientFields.StatusMsg)
+ }
m.Header.Set("last_active_ts", strconv.Itoa(int(presence.LastActiveTS)))
if err = msg.RespondMsg(m); err != nil {
diff --git a/syncapi/storage/postgres/presence_table.go b/syncapi/storage/postgres/presence_table.go
index 49336c4e..9f1e37f7 100644
--- a/syncapi/storage/postgres/presence_table.go
+++ b/syncapi/storage/postgres/presence_table.go
@@ -127,6 +127,9 @@ func (p *presenceStatements) GetPresenceForUser(
}
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
result.ClientFields.Presence = result.Presence.String()
return result, err
}
diff --git a/syncapi/storage/sqlite3/presence_table.go b/syncapi/storage/sqlite3/presence_table.go
index 00b16458..177a01bf 100644
--- a/syncapi/storage/sqlite3/presence_table.go
+++ b/syncapi/storage/sqlite3/presence_table.go
@@ -142,6 +142,9 @@ func (p *presenceStatements) GetPresenceForUser(
}
stmt := sqlutil.TxStmt(txn, p.selectPresenceForUsersStmt)
err := stmt.QueryRowContext(ctx, userID).Scan(&result.Presence, &result.ClientFields.StatusMsg, &result.LastActiveTS)
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
result.ClientFields.Presence = result.Presence.String()
return result, err
}
diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go
index 9a6c5c13..614b88d4 100644
--- a/syncapi/streams/stream_presence.go
+++ b/syncapi/streams/stream_presence.go
@@ -16,7 +16,6 @@ package streams
import (
"context"
- "database/sql"
"encoding/json"
"sync"
@@ -80,11 +79,10 @@ func (p *PresenceStreamProvider) IncrementalSync(
if _, ok := presences[roomUsers[i]]; ok {
continue
}
+ // Bear in mind that this might return nil, but at least populating
+ // a nil means that there's a map entry so we won't repeat this call.
presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i])
if err != nil {
- if err == sql.ErrNoRows {
- continue
- }
req.Log.WithError(err).Error("unable to query presence for user")
return from
}
@@ -93,8 +91,10 @@ func (p *PresenceStreamProvider) IncrementalSync(
}
lastPos := to
- for i := range presences {
- presence := presences[i]
+ for _, presence := range presences {
+ if presence == nil {
+ continue
+ }
// Ignore users we don't share a room with
if req.Device.UserID != presence.UserID && !p.notifier.IsSharedUser(req.Device.UserID, presence.UserID) {
continue
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 70334099..76d550a6 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -127,14 +127,23 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
if !ok { // this should almost never happen
return
}
+
newPresence := types.PresenceInternal{
- ClientFields: types.PresenceClientResponse{
- Presence: presenceID.String(),
- },
Presence: presenceID,
UserID: userID,
LastActiveTS: gomatrixserverlib.AsTimestamp(time.Now()),
}
+
+ // ensure we also send the current status_msg to federated servers and not nil
+ dbPresence, err := db.GetPresence(context.Background(), userID)
+ if err != nil && err != sql.ErrNoRows {
+ return
+ }
+ if dbPresence != nil {
+ newPresence.ClientFields = dbPresence.ClientFields
+ }
+ newPresence.ClientFields.Presence = presenceID.String()
+
defer rp.presence.Store(userID, newPresence)
// avoid spamming presence updates when syncing
existingPresence, ok := rp.presence.LoadOrStore(userID, newPresence)
@@ -145,13 +154,7 @@ func (rp *RequestPool) updatePresence(db storage.Presence, presence string, user
}
}
- // ensure we also send the current status_msg to federated servers and not nil
- dbPresence, err := db.GetPresence(context.Background(), userID)
- if err != nil && err != sql.ErrNoRows {
- return
- }
-
- if err := rp.producer.SendPresence(userID, presenceID, dbPresence.ClientFields.StatusMsg); err != nil {
+ if err := rp.producer.SendPresence(userID, presenceID, newPresence.ClientFields.StatusMsg); err != nil {
logrus.WithError(err).Error("Unable to publish presence message from sync")
return
}
diff --git a/sytest-whitelist b/sytest-whitelist
index c9829606..6af8d89f 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -681,8 +681,6 @@ GET /presence/:user_id/status fetches initial status
PUT /presence/:user_id/status updates my presence
Presence change reports an event to myself
Existing members see new members' presence
-#Existing members see new member's presence
-Newly joined room includes presence in incremental sync
Get presence for newly joined members in incremental sync
User sees their own presence in a sync
User sees updates to presence from other users in the incremental sync.