aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-04-28 18:53:28 +0200
committerGitHub <noreply@github.com>2022-04-28 18:53:28 +0200
commit2a5b8e0306a283aa8ca64822d59d71479ffba59a (patch)
treece014c70033cafc9afdb6db5959ba4d8672bd7df /syncapi
parentc6ea2c9ff26ca6ae4c799db08a3f72c6b4d99256 (diff)
Only load members of newly joined rooms (#2389)
* Only load members of newly joined rooms * Comment that the query is prepared at runtime Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/notifier/notifier.go14
-rw-r--r--syncapi/storage/interface.go3
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go34
-rw-r--r--syncapi/storage/shared/syncserver.go4
-rw-r--r--syncapi/storage/sqlite3/current_room_state_table.go49
-rw-r--r--syncapi/storage/tables/interface.go2
-rw-r--r--syncapi/streams/stream_presence.go5
7 files changed, 102 insertions, 9 deletions
diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go
index 82834239..87f0d86d 100644
--- a/syncapi/notifier/notifier.go
+++ b/syncapi/notifier/notifier.go
@@ -333,6 +333,20 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
return nil
}
+// LoadRooms loads the membership states required to notify users correctly.
+func (n *Notifier) LoadRooms(ctx context.Context, db storage.Database, roomIDs []string) error {
+ n.lock.Lock()
+ defer n.lock.Unlock()
+
+ roomToUsers, err := db.AllJoinedUsersInRoom(ctx, roomIDs)
+ if err != nil {
+ return err
+ }
+ n.setUsersJoinedToRooms(roomToUsers)
+
+ return nil
+}
+
// CurrentPosition returns the current sync position
func (n *Notifier) CurrentPosition() types.StreamingToken {
n.lock.RLock()
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 48697859..5a036d88 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -52,6 +52,9 @@ type Database interface {
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
+ // AllJoinedUsersInRoom returns a map of room ID to a list of all joined user IDs for a given room.
+ AllJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error)
+
// AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices.
AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error)
// Events lookups a list of event by their event ID.
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
index fe68788d..8ee387b3 100644
--- a/syncapi/storage/postgres/current_room_state_table.go
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -93,6 +93,9 @@ const selectCurrentStateSQL = "" +
const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
+const selectJoinedUsersInRoomSQL = "" +
+ "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join' AND room_id = ANY($1)"
+
const selectStateEventSQL = "" +
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
@@ -112,6 +115,7 @@ type currentRoomStateStatements struct {
selectRoomIDsWithAnyMembershipStmt *sql.Stmt
selectCurrentStateStmt *sql.Stmt
selectJoinedUsersStmt *sql.Stmt
+ selectJoinedUsersInRoomStmt *sql.Stmt
selectEventsWithEventIDsStmt *sql.Stmt
selectStateEventStmt *sql.Stmt
}
@@ -143,6 +147,9 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
return nil, err
}
+ if s.selectJoinedUsersInRoomStmt, err = db.Prepare(selectJoinedUsersInRoomSQL); err != nil {
+ return nil, err
+ }
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
return nil, err
}
@@ -163,9 +170,32 @@ func (s *currentRoomStateStatements) SelectJoinedUsers(
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed")
result := make(map[string][]string)
+ var roomID string
+ var userID string
+ for rows.Next() {
+ if err := rows.Scan(&roomID, &userID); err != nil {
+ return nil, err
+ }
+ users := result[roomID]
+ users = append(users, userID)
+ result[roomID] = users
+ }
+ return result, rows.Err()
+}
+
+// SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room.
+func (s *currentRoomStateStatements) SelectJoinedUsersInRoom(
+ ctx context.Context, roomIDs []string,
+) (map[string][]string, error) {
+ rows, err := s.selectJoinedUsersInRoomStmt.QueryContext(ctx, pq.StringArray(roomIDs))
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed")
+
+ result := make(map[string][]string)
+ var userID, roomID string
for rows.Next() {
- var roomID string
- var userID string
if err := rows.Scan(&roomID, &userID); err != nil {
return nil, err
}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index b7d2d3a2..ec5edd35 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -168,6 +168,10 @@ func (d *Database) AllJoinedUsersInRooms(ctx context.Context) (map[string][]stri
return d.CurrentRoomState.SelectJoinedUsers(ctx)
}
+func (d *Database) AllJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error) {
+ return d.CurrentRoomState.SelectJoinedUsersInRoom(ctx, roomIDs)
+}
+
func (d *Database) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]types.PeekingDevice, error) {
return d.Peeks.SelectPeekingDevices(ctx)
}
diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go
index ccda005c..f0a1c7bb 100644
--- a/syncapi/storage/sqlite3/current_room_state_table.go
+++ b/syncapi/storage/sqlite3/current_room_state_table.go
@@ -77,6 +77,9 @@ const selectCurrentStateSQL = "" +
const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
+const selectJoinedUsersInRoomSQL = "" +
+ "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join' AND room_id IN ($1)"
+
const selectStateEventSQL = "" +
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
@@ -97,7 +100,8 @@ type currentRoomStateStatements struct {
selectRoomIDsWithMembershipStmt *sql.Stmt
selectRoomIDsWithAnyMembershipStmt *sql.Stmt
selectJoinedUsersStmt *sql.Stmt
- selectStateEventStmt *sql.Stmt
+ //selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic
+ selectStateEventStmt *sql.Stmt
}
func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (tables.CurrentRoomState, error) {
@@ -127,13 +131,16 @@ func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (t
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
return nil, err
}
+ //if s.selectJoinedUsersInRoomStmt, err = db.Prepare(selectJoinedUsersInRoomSQL); err != nil {
+ // return nil, err
+ //}
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
return nil, err
}
return s, nil
}
-// JoinedMemberLists returns a map of room ID to a list of joined user IDs.
+// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
func (s *currentRoomStateStatements) SelectJoinedUsers(
ctx context.Context,
) (map[string][]string, error) {
@@ -144,9 +151,9 @@ func (s *currentRoomStateStatements) SelectJoinedUsers(
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed")
result := make(map[string][]string)
+ var roomID string
+ var userID string
for rows.Next() {
- var roomID string
- var userID string
if err := rows.Scan(&roomID, &userID); err != nil {
return nil, err
}
@@ -157,6 +164,40 @@ func (s *currentRoomStateStatements) SelectJoinedUsers(
return result, nil
}
+// SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room.
+func (s *currentRoomStateStatements) SelectJoinedUsersInRoom(
+ ctx context.Context, roomIDs []string,
+) (map[string][]string, error) {
+ query := strings.Replace(selectJoinedUsersInRoomSQL, "($1)", sqlutil.QueryVariadic(len(roomIDs)), 1)
+ params := make([]interface{}, 0, len(roomIDs))
+ for _, roomID := range roomIDs {
+ params = append(params, roomID)
+ }
+ stmt, err := s.db.Prepare(query)
+ if err != nil {
+ return nil, fmt.Errorf("SelectJoinedUsersInRoom s.db.Prepare: %w", err)
+ }
+ defer internal.CloseAndLogIfError(ctx, stmt, "SelectJoinedUsersInRoom: stmt.close() failed")
+
+ rows, err := stmt.QueryContext(ctx, params...)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectJoinedUsersInRoom: rows.close() failed")
+
+ result := make(map[string][]string)
+ var userID, roomID string
+ for rows.Next() {
+ if err := rows.Scan(&roomID, &userID); err != nil {
+ return nil, err
+ }
+ users := result[roomID]
+ users = append(users, userID)
+ result[roomID] = users
+ }
+ return result, rows.Err()
+}
+
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
ctx context.Context,
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 6fdd9483..ccdebfdb 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -102,6 +102,8 @@ type CurrentRoomState interface {
SelectRoomIDsWithAnyMembership(ctx context.Context, txn *sql.Tx, userID string) (map[string]string, error)
// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
SelectJoinedUsers(ctx context.Context) (map[string][]string, error)
+ // SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room.
+ SelectJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error)
}
// BackwardsExtremities keeps track of backwards extremities for a room.
diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go
index 675a7a17..a84d1987 100644
--- a/syncapi/streams/stream_presence.go
+++ b/syncapi/streams/stream_presence.go
@@ -67,9 +67,8 @@ func (p *PresenceStreamProvider) IncrementalSync(
// add newly joined rooms user presences
newlyJoined := joinedRooms(req.Response, req.Device.UserID)
if len(newlyJoined) > 0 {
- // TODO: This refreshes all lists and is quite expensive
- // The notifier should update the lists itself
- if err = p.notifier.Load(ctx, p.DB); err != nil {
+ // TODO: Check if this is working better than before.
+ if err = p.notifier.LoadRooms(ctx, p.DB, newlyJoined); err != nil {
req.Log.WithError(err).Error("unable to refresh notifier lists")
return from
}