diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-07-15 16:25:26 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-15 16:25:26 +0100 |
commit | 90bf01d8b107375d45aee9de79c1760a0363b189 (patch) | |
tree | e6220a07f788cbf27d9d13f10fd2d442b9b980f6 /syncapi/storage | |
parent | 69c86295f70b6756913df0bba37e30e0b3a6070e (diff) |
Use sync API database in `filterSharedUsers` (#2572)
* Add function to the sync API storage package for filtering shared users
* Use the database instead of asking the RS API
* Fix unit tests
* Fix map handling in `filterSharedUsers`
Diffstat (limited to 'syncapi/storage')
-rw-r--r-- | syncapi/storage/interface.go | 7 | ||||
-rw-r--r-- | syncapi/storage/postgres/current_room_state_table.go | 30 | ||||
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 4 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/current_room_state_table.go | 34 | ||||
-rw-r--r-- | syncapi/storage/tables/interface.go | 2 |
5 files changed, 76 insertions, 1 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 5a036d88..05542603 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -27,6 +27,8 @@ import ( type Database interface { Presence + SharedUsers + MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error) @@ -165,3 +167,8 @@ type Presence interface { PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) } + +type SharedUsers interface { + // SharedUsers returns a subset of otherUserIDs that share a room with userID. + SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) +} diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 8ee387b3..c4667baf 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -107,6 +107,11 @@ const selectEventsWithEventIDsSQL = "" + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + " FROM syncapi_current_room_state WHERE event_id = ANY($1)" +const selectSharedUsersSQL = "" + + "SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" + + " SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" + + ") AND state_key = ANY($2) AND membership='join';" + type currentRoomStateStatements struct { upsertRoomStateStmt *sql.Stmt deleteRoomStateByEventIDStmt *sql.Stmt @@ -118,6 +123,7 @@ type currentRoomStateStatements struct { selectJoinedUsersInRoomStmt *sql.Stmt selectEventsWithEventIDsStmt *sql.Stmt selectStateEventStmt *sql.Stmt + selectSharedUsersStmt *sql.Stmt } func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) { @@ -156,6 +162,9 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil { return nil, err } + if s.selectSharedUsersStmt, err = db.Prepare(selectSharedUsersSQL); err != nil { + return nil, err + } return s, nil } @@ -379,3 +388,24 @@ func (s *currentRoomStateStatements) SelectStateEvent( } return &ev, err } + +func (s *currentRoomStateStatements) SelectSharedUsers( + ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string, +) ([]string, error) { + stmt := sqlutil.TxStmt(txn, s.selectSharedUsersStmt) + rows, err := stmt.QueryContext(ctx, userID, otherUserIDs) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectSharedUsersStmt: rows.close() failed") + + var stateKey string + result := make([]string, 0, len(otherUserIDs)) + for rows.Next() { + if err := rows.Scan(&stateKey); err != nil { + return nil, err + } + result = append(result, stateKey) + } + return result, rows.Err() +} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 76114aff..d1c5e2d1 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -176,6 +176,10 @@ func (d *Database) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]t return d.Peeks.SelectPeekingDevices(ctx) } +func (d *Database) SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) { + return d.CurrentRoomState.SelectSharedUsers(ctx, nil, userID, otherUserIDs) +} + func (d *Database) GetStateEvent( ctx context.Context, roomID, evType, stateKey string, ) (*gomatrixserverlib.HeaderedEvent, error) { diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index f0a1c7bb..376c3a3d 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -91,6 +91,11 @@ const selectEventsWithEventIDsSQL = "" + "SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" + " FROM syncapi_current_room_state WHERE event_id IN ($1)" +const selectSharedUsersSQL = "" + + "SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" + + " SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" + + ") AND state_key IN ($2) AND membership='join';" + type currentRoomStateStatements struct { db *sql.DB streamIDStatements *StreamIDStatements @@ -100,8 +105,9 @@ type currentRoomStateStatements struct { selectRoomIDsWithMembershipStmt *sql.Stmt selectRoomIDsWithAnyMembershipStmt *sql.Stmt selectJoinedUsersStmt *sql.Stmt - //selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic + //selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic selectStateEventStmt *sql.Stmt + //selectSharedUsersSQL *sql.Stmt - prepared at runtime due to variadic } func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (tables.CurrentRoomState, error) { @@ -396,3 +402,29 @@ func (s *currentRoomStateStatements) SelectStateEvent( } return &ev, err } + +func (s *currentRoomStateStatements) SelectSharedUsers( + ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string, +) ([]string, error) { + query := strings.Replace(selectSharedUsersSQL, "($2)", sqlutil.QueryVariadicOffset(len(otherUserIDs), 1), 1) + stmt, err := s.db.Prepare(query) + if err != nil { + return nil, fmt.Errorf("SelectSharedUsers s.db.Prepare: %w", err) + } + defer internal.CloseAndLogIfError(ctx, stmt, "SelectSharedUsers: stmt.close() failed") + rows, err := sqlutil.TxStmt(txn, stmt).QueryContext(ctx, userID, otherUserIDs) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectSharedUsersStmt: rows.close() failed") + + var stateKey string + result := make([]string, 0, len(otherUserIDs)) + for rows.Next() { + if err := rows.Scan(&stateKey); err != nil { + return nil, err + } + result = append(result, stateKey) + } + return result, rows.Err() +} diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index ccdebfdb..08568d9a 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -104,6 +104,8 @@ type CurrentRoomState interface { SelectJoinedUsers(ctx context.Context) (map[string][]string, error) // SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room. SelectJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error) + // SelectSharedUsers returns a subset of otherUserIDs that share a room with userID. + SelectSharedUsers(ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string) ([]string, error) } // BackwardsExtremities keeps track of backwards extremities for a room. |