diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-04-26 10:53:17 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-26 10:53:17 +0200 |
commit | e8be2b234f616c8422372665c845d9a7a1af245f (patch) | |
tree | e7b058caf5ffebf9a7c589a0c45997e1b2b89490 /syncapi | |
parent | feac9db43fc459f1efa10424dfc96f8d54b55c64 (diff) |
Add heroes to the room summary (#2373)
* Implement room summary heroes
* Add passing tests
* Move MembershipCount to addRoomSummary
* Add comments, close Statement
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/storage/interface.go | 1 | ||||
-rw-r--r-- | syncapi/storage/postgres/memberships_table.go | 38 | ||||
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 4 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/memberships_table.go | 51 | ||||
-rw-r--r-- | syncapi/storage/tables/interface.go | 1 | ||||
-rw-r--r-- | syncapi/streams/stream_pdu.go | 56 | ||||
-rw-r--r-- | syncapi/streams/streams.go | 1 |
7 files changed, 127 insertions, 25 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 14cb08a5..0fea88da 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -39,6 +39,7 @@ type Database interface { GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error) + GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go index 39fa656c..8c049977 100644 --- a/syncapi/storage/postgres/memberships_table.go +++ b/syncapi/storage/postgres/memberships_table.go @@ -19,6 +19,8 @@ import ( "database/sql" "fmt" + "github.com/lib/pq" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" @@ -61,9 +63,13 @@ const selectMembershipCountSQL = "" + " SELECT DISTINCT ON (room_id, user_id) room_id, user_id, membership FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 ORDER BY room_id, user_id, stream_pos DESC" + ") t WHERE t.membership = $3" +const selectHeroesSQL = "" + + "SELECT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership = ANY($3) LIMIT 5" + type membershipsStatements struct { upsertMembershipStmt *sql.Stmt selectMembershipCountStmt *sql.Stmt + selectHeroesStmt *sql.Stmt } func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) { @@ -72,13 +78,11 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) { if err != nil { return nil, err } - if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil { - return nil, err - } - if s.selectMembershipCountStmt, err = db.Prepare(selectMembershipCountSQL); err != nil { - return nil, err - } - return s, nil + return s, sqlutil.StatementList{ + {&s.upsertMembershipStmt, upsertMembershipSQL}, + {&s.selectMembershipCountStmt, selectMembershipCountSQL}, + {&s.selectHeroesStmt, selectHeroesSQL}, + }.Prepare(db) } func (s *membershipsStatements) UpsertMembership( @@ -108,3 +112,23 @@ func (s *membershipsStatements) SelectMembershipCount( err = stmt.QueryRowContext(ctx, roomID, pos, membership).Scan(&count) return } + +func (s *membershipsStatements) SelectHeroes( + ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string, +) (heroes []string, err error) { + stmt := sqlutil.TxStmt(txn, s.selectHeroesStmt) + var rows *sql.Rows + rows, err = stmt.QueryContext(ctx, roomID, userID, pq.StringArray(memberships)) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectHeroes: rows.close() failed") + var hero string + for rows.Next() { + if err = rows.Scan(&hero); err != nil { + return + } + heroes = append(heroes, hero) + } + return heroes, rows.Err() +} diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 2143fd67..3c431db4 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -124,6 +124,10 @@ func (d *Database) MembershipCount(ctx context.Context, roomID, membership strin return d.Memberships.SelectMembershipCount(ctx, nil, roomID, membership, pos) } +func (d *Database) GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error) { + return d.Memberships.SelectHeroes(ctx, nil, roomID, userID, memberships) +} + func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) { return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents) } diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go index 9f3530cc..e4daa99c 100644 --- a/syncapi/storage/sqlite3/memberships_table.go +++ b/syncapi/storage/sqlite3/memberships_table.go @@ -18,7 +18,9 @@ import ( "context" "database/sql" "fmt" + "strings" + "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" @@ -61,10 +63,14 @@ const selectMembershipCountSQL = "" + " SELECT * FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 GROUP BY user_id HAVING(max(stream_pos))" + ") t WHERE t.membership = $3" +const selectHeroesSQL = "" + + "SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership IN ($3) LIMIT 5" + type membershipsStatements struct { db *sql.DB upsertMembershipStmt *sql.Stmt selectMembershipCountStmt *sql.Stmt + //selectHeroesStmt *sql.Stmt - prepared at runtime due to variadic } func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) { @@ -75,13 +81,11 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) { if err != nil { return nil, err } - if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil { - return nil, err - } - if s.selectMembershipCountStmt, err = db.Prepare(selectMembershipCountSQL); err != nil { - return nil, err - } - return s, nil + return s, sqlutil.StatementList{ + {&s.upsertMembershipStmt, upsertMembershipSQL}, + {&s.selectMembershipCountStmt, selectMembershipCountSQL}, + // {&s.selectHeroesStmt, selectHeroesSQL}, - prepared at runtime due to variadic + }.Prepare(db) } func (s *membershipsStatements) UpsertMembership( @@ -111,3 +115,36 @@ func (s *membershipsStatements) SelectMembershipCount( err = stmt.QueryRowContext(ctx, roomID, pos, membership).Scan(&count) return } + +func (s *membershipsStatements) SelectHeroes( + ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string, +) (heroes []string, err error) { + stmtSQL := strings.Replace(selectHeroesSQL, "($3)", sqlutil.QueryVariadicOffset(len(memberships), 2), 1) + stmt, err := s.db.PrepareContext(ctx, stmtSQL) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, stmt, "SelectHeroes: stmt.close() failed") + params := []interface{}{ + roomID, userID, + } + for _, membership := range memberships { + params = append(params, membership) + } + + stmt = sqlutil.TxStmt(txn, stmt) + var rows *sql.Rows + rows, err = stmt.QueryContext(ctx, params...) + if err != nil { + return + } + defer internal.CloseAndLogIfError(ctx, rows, "SelectHeroes: rows.close() failed") + var hero string + for rows.Next() { + if err = rows.Scan(&hero); err != nil { + return + } + heroes = append(heroes, hero) + } + return heroes, rows.Err() +} diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 993e2022..ac713dd5 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -170,6 +170,7 @@ type Receipts interface { type Memberships interface { UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error) + SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error) } type NotificationData interface { diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index df5fb8e0..0d033095 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -4,13 +4,16 @@ import ( "context" "database/sql" "fmt" + "sort" "sync" "time" "github.com/matrix-org/dendrite/internal/caching" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" + "github.com/tidwall/gjson" "go.uber.org/atomic" ) @@ -30,6 +33,7 @@ type PDUStreamProvider struct { workers atomic.Int32 // userID+deviceID -> lazy loading cache lazyLoadCache *caching.LazyLoadCache + rsAPI roomserverAPI.RoomserverInternalAPI } func (p *PDUStreamProvider) worker() { @@ -290,16 +294,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( } } - // Work out how many members are in the room. - joinedCount, _ := p.DB.MembershipCount(ctx, delta.RoomID, gomatrixserverlib.Join, latestPosition) - invitedCount, _ := p.DB.MembershipCount(ctx, delta.RoomID, gomatrixserverlib.Invite, latestPosition) - switch delta.Membership { case gomatrixserverlib.Join: jr := types.NewJoinResponse() if hasMembershipChange { - jr.Summary.JoinedMemberCount = &joinedCount - jr.Summary.InvitedMemberCount = &invitedCount + p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition) } jr.Timeline.PrevBatch = &prevBatch jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) @@ -332,6 +331,45 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return latestPosition, nil } +func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) { + // Work out how many members are in the room. + joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition) + invitedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition) + + jr.Summary.JoinedMemberCount = &joinedCount + jr.Summary.InvitedMemberCount = &invitedCount + + fetchStates := []gomatrixserverlib.StateKeyTuple{ + {EventType: gomatrixserverlib.MRoomName}, + {EventType: gomatrixserverlib.MRoomCanonicalAlias}, + } + // Check if the room has a name or a canonical alias + latestState := &roomserverAPI.QueryLatestEventsAndStateResponse{} + err := p.rsAPI.QueryLatestEventsAndState(ctx, &roomserverAPI.QueryLatestEventsAndStateRequest{StateToFetch: fetchStates, RoomID: roomID}, latestState) + if err != nil { + return + } + // Check if the room has a name or canonical alias, if so, return. + for _, ev := range latestState.StateEvents { + switch ev.Type() { + case gomatrixserverlib.MRoomName: + if gjson.GetBytes(ev.Content(), "name").Str != "" { + return + } + case gomatrixserverlib.MRoomCanonicalAlias: + if gjson.GetBytes(ev.Content(), "alias").Str != "" { + return + } + } + } + heroes, err := p.DB.GetRoomHeroes(ctx, roomID, userID, []string{"join", "invite"}) + if err != nil { + return + } + sort.Strings(heroes) + jr.Summary.Heroes = heroes +} + func (p *PDUStreamProvider) getJoinResponseForCompleteSync( ctx context.Context, roomID string, @@ -416,9 +454,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( prevBatch.Decrement() } - // Work out how many members are in the room. - joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, r.From) - invitedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, r.From) + p.addRoomSummary(ctx, jr, roomID, device.UserID, r.From) // We don't include a device here as we don't need to send down // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for: @@ -439,8 +475,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync( } } - jr.Summary.JoinedMemberCount = &joinedCount - jr.Summary.InvitedMemberCount = &invitedCount jr.Timeline.PrevBatch = prevBatch jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = limited diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index d3195b78..a18a0cc4 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -33,6 +33,7 @@ func NewSyncStreamProviders( PDUStreamProvider: &PDUStreamProvider{ StreamProvider: StreamProvider{DB: d}, lazyLoadCache: lazyLoadCache, + rsAPI: rsAPI, }, TypingStreamProvider: &TypingStreamProvider{ StreamProvider: StreamProvider{DB: d}, |