aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-04-26 10:53:17 +0200
committerGitHub <noreply@github.com>2022-04-26 10:53:17 +0200
commite8be2b234f616c8422372665c845d9a7a1af245f (patch)
treee7b058caf5ffebf9a7c589a0c45997e1b2b89490 /syncapi
parentfeac9db43fc459f1efa10424dfc96f8d54b55c64 (diff)
Add heroes to the room summary (#2373)
* Implement room summary heroes * Add passing tests * Move MembershipCount to addRoomSummary * Add comments, close Statement
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/storage/interface.go1
-rw-r--r--syncapi/storage/postgres/memberships_table.go38
-rw-r--r--syncapi/storage/shared/syncserver.go4
-rw-r--r--syncapi/storage/sqlite3/memberships_table.go51
-rw-r--r--syncapi/storage/tables/interface.go1
-rw-r--r--syncapi/streams/stream_pdu.go56
-rw-r--r--syncapi/streams/streams.go1
7 files changed, 127 insertions, 25 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 14cb08a5..0fea88da 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -39,6 +39,7 @@ type Database interface {
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
+ GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error)
RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
diff --git a/syncapi/storage/postgres/memberships_table.go b/syncapi/storage/postgres/memberships_table.go
index 39fa656c..8c049977 100644
--- a/syncapi/storage/postgres/memberships_table.go
+++ b/syncapi/storage/postgres/memberships_table.go
@@ -19,6 +19,8 @@ import (
"database/sql"
"fmt"
+ "github.com/lib/pq"
+ "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -61,9 +63,13 @@ const selectMembershipCountSQL = "" +
" SELECT DISTINCT ON (room_id, user_id) room_id, user_id, membership FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 ORDER BY room_id, user_id, stream_pos DESC" +
") t WHERE t.membership = $3"
+const selectHeroesSQL = "" +
+ "SELECT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership = ANY($3) LIMIT 5"
+
type membershipsStatements struct {
upsertMembershipStmt *sql.Stmt
selectMembershipCountStmt *sql.Stmt
+ selectHeroesStmt *sql.Stmt
}
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
@@ -72,13 +78,11 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
if err != nil {
return nil, err
}
- if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
- return nil, err
- }
- if s.selectMembershipCountStmt, err = db.Prepare(selectMembershipCountSQL); err != nil {
- return nil, err
- }
- return s, nil
+ return s, sqlutil.StatementList{
+ {&s.upsertMembershipStmt, upsertMembershipSQL},
+ {&s.selectMembershipCountStmt, selectMembershipCountSQL},
+ {&s.selectHeroesStmt, selectHeroesSQL},
+ }.Prepare(db)
}
func (s *membershipsStatements) UpsertMembership(
@@ -108,3 +112,23 @@ func (s *membershipsStatements) SelectMembershipCount(
err = stmt.QueryRowContext(ctx, roomID, pos, membership).Scan(&count)
return
}
+
+func (s *membershipsStatements) SelectHeroes(
+ ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string,
+) (heroes []string, err error) {
+ stmt := sqlutil.TxStmt(txn, s.selectHeroesStmt)
+ var rows *sql.Rows
+ rows, err = stmt.QueryContext(ctx, roomID, userID, pq.StringArray(memberships))
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectHeroes: rows.close() failed")
+ var hero string
+ for rows.Next() {
+ if err = rows.Scan(&hero); err != nil {
+ return
+ }
+ heroes = append(heroes, hero)
+ }
+ return heroes, rows.Err()
+}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 2143fd67..3c431db4 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -124,6 +124,10 @@ func (d *Database) MembershipCount(ctx context.Context, roomID, membership strin
return d.Memberships.SelectMembershipCount(ctx, nil, roomID, membership, pos)
}
+func (d *Database) GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error) {
+ return d.Memberships.SelectHeroes(ctx, nil, roomID, userID, memberships)
+}
+
func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents)
}
diff --git a/syncapi/storage/sqlite3/memberships_table.go b/syncapi/storage/sqlite3/memberships_table.go
index 9f3530cc..e4daa99c 100644
--- a/syncapi/storage/sqlite3/memberships_table.go
+++ b/syncapi/storage/sqlite3/memberships_table.go
@@ -18,7 +18,9 @@ import (
"context"
"database/sql"
"fmt"
+ "strings"
+ "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -61,10 +63,14 @@ const selectMembershipCountSQL = "" +
" SELECT * FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 GROUP BY user_id HAVING(max(stream_pos))" +
") t WHERE t.membership = $3"
+const selectHeroesSQL = "" +
+ "SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership IN ($3) LIMIT 5"
+
type membershipsStatements struct {
db *sql.DB
upsertMembershipStmt *sql.Stmt
selectMembershipCountStmt *sql.Stmt
+ //selectHeroesStmt *sql.Stmt - prepared at runtime due to variadic
}
func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
@@ -75,13 +81,11 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
if err != nil {
return nil, err
}
- if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
- return nil, err
- }
- if s.selectMembershipCountStmt, err = db.Prepare(selectMembershipCountSQL); err != nil {
- return nil, err
- }
- return s, nil
+ return s, sqlutil.StatementList{
+ {&s.upsertMembershipStmt, upsertMembershipSQL},
+ {&s.selectMembershipCountStmt, selectMembershipCountSQL},
+ // {&s.selectHeroesStmt, selectHeroesSQL}, - prepared at runtime due to variadic
+ }.Prepare(db)
}
func (s *membershipsStatements) UpsertMembership(
@@ -111,3 +115,36 @@ func (s *membershipsStatements) SelectMembershipCount(
err = stmt.QueryRowContext(ctx, roomID, pos, membership).Scan(&count)
return
}
+
+func (s *membershipsStatements) SelectHeroes(
+ ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string,
+) (heroes []string, err error) {
+ stmtSQL := strings.Replace(selectHeroesSQL, "($3)", sqlutil.QueryVariadicOffset(len(memberships), 2), 1)
+ stmt, err := s.db.PrepareContext(ctx, stmtSQL)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, stmt, "SelectHeroes: stmt.close() failed")
+ params := []interface{}{
+ roomID, userID,
+ }
+ for _, membership := range memberships {
+ params = append(params, membership)
+ }
+
+ stmt = sqlutil.TxStmt(txn, stmt)
+ var rows *sql.Rows
+ rows, err = stmt.QueryContext(ctx, params...)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectHeroes: rows.close() failed")
+ var hero string
+ for rows.Next() {
+ if err = rows.Scan(&hero); err != nil {
+ return
+ }
+ heroes = append(heroes, hero)
+ }
+ return heroes, rows.Err()
+}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 993e2022..ac713dd5 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -170,6 +170,7 @@ type Receipts interface {
type Memberships interface {
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
+ SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error)
}
type NotificationData interface {
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index df5fb8e0..0d033095 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -4,13 +4,16 @@ import (
"context"
"database/sql"
"fmt"
+ "sort"
"sync"
"time"
"github.com/matrix-org/dendrite/internal/caching"
+ roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/tidwall/gjson"
"go.uber.org/atomic"
)
@@ -30,6 +33,7 @@ type PDUStreamProvider struct {
workers atomic.Int32
// userID+deviceID -> lazy loading cache
lazyLoadCache *caching.LazyLoadCache
+ rsAPI roomserverAPI.RoomserverInternalAPI
}
func (p *PDUStreamProvider) worker() {
@@ -290,16 +294,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
- // Work out how many members are in the room.
- joinedCount, _ := p.DB.MembershipCount(ctx, delta.RoomID, gomatrixserverlib.Join, latestPosition)
- invitedCount, _ := p.DB.MembershipCount(ctx, delta.RoomID, gomatrixserverlib.Invite, latestPosition)
-
switch delta.Membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
if hasMembershipChange {
- jr.Summary.JoinedMemberCount = &joinedCount
- jr.Summary.InvitedMemberCount = &invitedCount
+ p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition)
}
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
@@ -332,6 +331,45 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return latestPosition, nil
}
+func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) {
+ // Work out how many members are in the room.
+ joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition)
+ invitedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition)
+
+ jr.Summary.JoinedMemberCount = &joinedCount
+ jr.Summary.InvitedMemberCount = &invitedCount
+
+ fetchStates := []gomatrixserverlib.StateKeyTuple{
+ {EventType: gomatrixserverlib.MRoomName},
+ {EventType: gomatrixserverlib.MRoomCanonicalAlias},
+ }
+ // Check if the room has a name or a canonical alias
+ latestState := &roomserverAPI.QueryLatestEventsAndStateResponse{}
+ err := p.rsAPI.QueryLatestEventsAndState(ctx, &roomserverAPI.QueryLatestEventsAndStateRequest{StateToFetch: fetchStates, RoomID: roomID}, latestState)
+ if err != nil {
+ return
+ }
+ // Check if the room has a name or canonical alias, if so, return.
+ for _, ev := range latestState.StateEvents {
+ switch ev.Type() {
+ case gomatrixserverlib.MRoomName:
+ if gjson.GetBytes(ev.Content(), "name").Str != "" {
+ return
+ }
+ case gomatrixserverlib.MRoomCanonicalAlias:
+ if gjson.GetBytes(ev.Content(), "alias").Str != "" {
+ return
+ }
+ }
+ }
+ heroes, err := p.DB.GetRoomHeroes(ctx, roomID, userID, []string{"join", "invite"})
+ if err != nil {
+ return
+ }
+ sort.Strings(heroes)
+ jr.Summary.Heroes = heroes
+}
+
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context,
roomID string,
@@ -416,9 +454,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
prevBatch.Decrement()
}
- // Work out how many members are in the room.
- joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, r.From)
- invitedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, r.From)
+ p.addRoomSummary(ctx, jr, roomID, device.UserID, r.From)
// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
@@ -439,8 +475,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
}
- jr.Summary.JoinedMemberCount = &joinedCount
- jr.Summary.InvitedMemberCount = &invitedCount
jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go
index d3195b78..a18a0cc4 100644
--- a/syncapi/streams/streams.go
+++ b/syncapi/streams/streams.go
@@ -33,6 +33,7 @@ func NewSyncStreamProviders(
PDUStreamProvider: &PDUStreamProvider{
StreamProvider: StreamProvider{DB: d},
lazyLoadCache: lazyLoadCache,
+ rsAPI: rsAPI,
},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},