aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--roomserver/api/query.go9
-rw-r--r--roomserver/internal/query/query.go26
-rw-r--r--roomserver/storage/interface.go7
-rw-r--r--roomserver/storage/postgres/events_table.go3
-rw-r--r--roomserver/storage/postgres/state_snapshot_table.go69
-rw-r--r--roomserver/storage/shared/storage.go6
-rw-r--r--roomserver/storage/sqlite3/state_snapshot_table.go5
-rw-r--r--roomserver/storage/tables/interface.go4
-rw-r--r--roomserver/storage/tables/state_snapshot_table_test.go2
-rw-r--r--syncapi/internal/history_visibility.go27
-rw-r--r--syncapi/routing/context.go67
-rw-r--r--syncapi/routing/messages.go44
-rw-r--r--syncapi/storage/interface.go2
-rw-r--r--syncapi/storage/postgres/current_room_state_table.go9
-rw-r--r--syncapi/storage/postgres/deltas/20230201152200_rename_index.go29
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go160
-rw-r--r--syncapi/storage/shared/storage_sync.go54
-rw-r--r--syncapi/storage/shared/storage_sync_test.go72
-rw-r--r--syncapi/storage/sqlite3/account_data_table.go4
-rw-r--r--syncapi/storage/sqlite3/current_room_state_table.go9
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go88
-rw-r--r--syncapi/storage/storage_test.go73
-rw-r--r--syncapi/storage/tables/current_room_state_test.go40
-rw-r--r--syncapi/storage/tables/interface.go2
-rw-r--r--syncapi/streams/stream_pdu.go64
-rw-r--r--syncapi/syncapi_test.go173
-rw-r--r--syncapi/types/types.go5
27 files changed, 835 insertions, 218 deletions
diff --git a/roomserver/api/query.go b/roomserver/api/query.go
index 76f8298c..4ef548e1 100644
--- a/roomserver/api/query.go
+++ b/roomserver/api/query.go
@@ -433,7 +433,7 @@ func (r *QueryCurrentStateResponse) UnmarshalJSON(data []byte) error {
return nil
}
-// QueryMembershipAtEventRequest requests the membership events for a user
+// QueryMembershipAtEventRequest requests the membership event for a user
// for a list of eventIDs.
type QueryMembershipAtEventRequest struct {
RoomID string
@@ -443,9 +443,10 @@ type QueryMembershipAtEventRequest struct {
// QueryMembershipAtEventResponse is the response to QueryMembershipAtEventRequest.
type QueryMembershipAtEventResponse struct {
- // Memberships is a map from eventID to a list of events (if any). Events that
- // do not have known state will return an empty array here.
- Memberships map[string][]*gomatrixserverlib.HeaderedEvent `json:"memberships"`
+ // Membership is a map from eventID to membership event. Events that
+ // do not have known state will return a nil event, resulting in a "leave" membership
+ // when calculating history visibility.
+ Membership map[string]*gomatrixserverlib.HeaderedEvent `json:"membership"`
}
// QueryLeftUsersRequest is a request to calculate users that we (the server) don't share a
diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go
index 69d841dd..1083bb23 100644
--- a/roomserver/internal/query/query.go
+++ b/roomserver/internal/query/query.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"
+ "github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
@@ -216,7 +217,8 @@ func (r *Queryer) QueryMembershipAtEvent(
request *api.QueryMembershipAtEventRequest,
response *api.QueryMembershipAtEventResponse,
) error {
- response.Memberships = make(map[string][]*gomatrixserverlib.HeaderedEvent)
+ response.Membership = make(map[string]*gomatrixserverlib.HeaderedEvent)
+
info, err := r.DB.RoomInfo(ctx, request.RoomID)
if err != nil {
return fmt.Errorf("unable to get roomInfo: %w", err)
@@ -234,7 +236,17 @@ func (r *Queryer) QueryMembershipAtEvent(
return fmt.Errorf("requested stateKeyNID for %s was not found", request.UserID)
}
- stateEntries, err := helpers.MembershipAtEvent(ctx, r.DB, info, request.EventIDs, stateKeyNIDs[request.UserID])
+ response.Membership, err = r.DB.GetMembershipForHistoryVisibility(ctx, stateKeyNIDs[request.UserID], info, request.EventIDs...)
+ switch err {
+ case nil:
+ return nil
+ case tables.OptimisationNotSupportedError: // fallthrough, slow way of getting the membership events for each event
+ default:
+ return err
+ }
+
+ response.Membership = make(map[string]*gomatrixserverlib.HeaderedEvent)
+ stateEntries, err := helpers.MembershipAtEvent(ctx, r.DB, nil, request.EventIDs, stateKeyNIDs[request.UserID])
if err != nil {
return fmt.Errorf("unable to get state before event: %w", err)
}
@@ -258,7 +270,7 @@ func (r *Queryer) QueryMembershipAtEvent(
for _, eventID := range request.EventIDs {
stateEntry, ok := stateEntries[eventID]
if !ok || len(stateEntry) == 0 {
- response.Memberships[eventID] = []*gomatrixserverlib.HeaderedEvent{}
+ response.Membership[eventID] = nil
continue
}
@@ -275,15 +287,15 @@ func (r *Queryer) QueryMembershipAtEvent(
return fmt.Errorf("unable to get memberships at state: %w", err)
}
- res := make([]*gomatrixserverlib.HeaderedEvent, 0, len(memberships))
-
+ // Iterate over all membership events we got. Given we only query the membership for
+ // one user and assuming this user only ever has one membership event associated to
+ // a given event, overwrite any other existing membership events.
for i := range memberships {
ev := memberships[i]
if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(request.UserID) {
- res = append(res, ev.Headered(info.RoomVersion))
+ response.Membership[eventID] = ev.Event.Headered(info.RoomVersion)
}
}
- response.Memberships[eventID] = res
}
return nil
diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go
index e0b9c56b..99891512 100644
--- a/roomserver/storage/interface.go
+++ b/roomserver/storage/interface.go
@@ -175,4 +175,11 @@ type Database interface {
GetLeftUsers(ctx context.Context, userIDs []string) ([]string, error)
PurgeRoom(ctx context.Context, roomID string) error
UpgradeRoom(ctx context.Context, oldRoomID, newRoomID, eventSender string) error
+
+ // GetMembershipForHistoryVisibility queries the membership events for the given eventIDs.
+ // Returns a map from (input) eventID -> membership event. If no membership event is found, returns an empty event, resulting in
+ // a membership of "leave" when calculating history visibility.
+ GetMembershipForHistoryVisibility(
+ ctx context.Context, userNID types.EventStateKeyNID, info *types.RoomInfo, eventIDs ...string,
+ ) (map[string]*gomatrixserverlib.HeaderedEvent, error)
}
diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go
index 9b5ed6ed..f4a21c8a 100644
--- a/roomserver/storage/postgres/events_table.go
+++ b/roomserver/storage/postgres/events_table.go
@@ -69,6 +69,9 @@ CREATE TABLE IF NOT EXISTS roomserver_events (
auth_event_nids BIGINT[] NOT NULL,
is_rejected BOOLEAN NOT NULL DEFAULT FALSE
);
+
+-- Create an index which helps in resolving membership events (event_type_nid = 5) - (used for history visibility)
+CREATE INDEX IF NOT EXISTS roomserver_events_memberships_idx ON roomserver_events (room_nid, event_state_key_nid) WHERE (event_type_nid = 5);
`
const insertEventSQL = "" +
diff --git a/roomserver/storage/postgres/state_snapshot_table.go b/roomserver/storage/postgres/state_snapshot_table.go
index a00c026f..0e83cfc2 100644
--- a/roomserver/storage/postgres/state_snapshot_table.go
+++ b/roomserver/storage/postgres/state_snapshot_table.go
@@ -21,10 +21,10 @@ import (
"fmt"
"github.com/lib/pq"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/matrix-org/dendrite/internal/sqlutil"
- "github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
)
@@ -99,10 +99,26 @@ const bulkSelectStateForHistoryVisibilitySQL = `
AND (event_type_nid = 7 OR event_state_key LIKE '%:' || $2);
`
+// bulkSelectMembershipForHistoryVisibilitySQL is an optimization to get membership events for a specific user for defined set of events.
+// Returns the event_id of the event we want the membership event for, the event_id of the membership event and the membership event JSON.
+const bulkSelectMembershipForHistoryVisibilitySQL = `
+SELECT re.event_id, re2.event_id, rej.event_json
+FROM roomserver_events re
+LEFT JOIN roomserver_state_snapshots rss on re.state_snapshot_nid = rss.state_snapshot_nid
+CROSS JOIN unnest(rss.state_block_nids) AS blocks(block_nid)
+LEFT JOIN roomserver_state_block rsb ON rsb.state_block_nid = blocks.block_nid
+CROSS JOIN unnest(rsb.event_nids) AS rsb2(event_nid)
+JOIN roomserver_events re2 ON re2.room_nid = $3 AND re2.event_type_nid = 5 AND re2.event_nid = rsb2.event_nid AND re2.event_state_key_nid = $1
+LEFT JOIN roomserver_event_json rej ON rej.event_nid = re2.event_nid
+WHERE re.event_id = ANY($2)
+
+`
+
type stateSnapshotStatements struct {
- insertStateStmt *sql.Stmt
- bulkSelectStateBlockNIDsStmt *sql.Stmt
- bulkSelectStateForHistoryVisibilityStmt *sql.Stmt
+ insertStateStmt *sql.Stmt
+ bulkSelectStateBlockNIDsStmt *sql.Stmt
+ bulkSelectStateForHistoryVisibilityStmt *sql.Stmt
+ bulktSelectMembershipForHistoryVisibilityStmt *sql.Stmt
}
func CreateStateSnapshotTable(db *sql.DB) error {
@@ -110,13 +126,14 @@ func CreateStateSnapshotTable(db *sql.DB) error {
return err
}
-func PrepareStateSnapshotTable(db *sql.DB) (tables.StateSnapshot, error) {
+func PrepareStateSnapshotTable(db *sql.DB) (*stateSnapshotStatements, error) {
s := &stateSnapshotStatements{}
return s, sqlutil.StatementList{
{&s.insertStateStmt, insertStateSQL},
{&s.bulkSelectStateBlockNIDsStmt, bulkSelectStateBlockNIDsSQL},
{&s.bulkSelectStateForHistoryVisibilityStmt, bulkSelectStateForHistoryVisibilitySQL},
+ {&s.bulktSelectMembershipForHistoryVisibilityStmt, bulkSelectMembershipForHistoryVisibilitySQL},
}.Prepare(db)
}
@@ -185,3 +202,45 @@ func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility(
}
return results, rows.Err()
}
+
+func (s *stateSnapshotStatements) BulkSelectMembershipForHistoryVisibility(
+ ctx context.Context, txn *sql.Tx, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string,
+) (map[string]*gomatrixserverlib.HeaderedEvent, error) {
+ stmt := sqlutil.TxStmt(txn, s.bulktSelectMembershipForHistoryVisibilityStmt)
+ rows, err := stmt.QueryContext(ctx, userNID, pq.Array(eventIDs), roomInfo.RoomNID)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close() // nolint: errcheck
+ result := make(map[string]*gomatrixserverlib.HeaderedEvent, len(eventIDs))
+ var evJson []byte
+ var eventID string
+ var membershipEventID string
+
+ knownEvents := make(map[string]*gomatrixserverlib.HeaderedEvent, len(eventIDs))
+
+ for rows.Next() {
+ if err = rows.Scan(&eventID, &membershipEventID, &evJson); err != nil {
+ return nil, err
+ }
+ if len(evJson) == 0 {
+ result[eventID] = &gomatrixserverlib.HeaderedEvent{}
+ continue
+ }
+ // If we already know this event, don't try to marshal the json again
+ if ev, ok := knownEvents[membershipEventID]; ok {
+ result[eventID] = ev
+ continue
+ }
+ event, err := gomatrixserverlib.NewEventFromTrustedJSON(evJson, false, roomInfo.RoomVersion)
+ if err != nil {
+ result[eventID] = &gomatrixserverlib.HeaderedEvent{}
+ // not fatal
+ continue
+ }
+ he := event.Headered(roomInfo.RoomVersion)
+ result[eventID] = he
+ knownEvents[membershipEventID] = he
+ }
+ return result, rows.Err()
+}
diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go
index 654b078d..f8672496 100644
--- a/roomserver/storage/shared/storage.go
+++ b/roomserver/storage/shared/storage.go
@@ -51,6 +51,12 @@ func (d *Database) SupportsConcurrentRoomInputs() bool {
return true
}
+func (d *Database) GetMembershipForHistoryVisibility(
+ ctx context.Context, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string,
+) (map[string]*gomatrixserverlib.HeaderedEvent, error) {
+ return d.StateSnapshotTable.BulkSelectMembershipForHistoryVisibility(ctx, nil, userNID, roomInfo, eventIDs...)
+}
+
func (d *Database) EventTypeNIDs(
ctx context.Context, eventTypes []string,
) (map[string]types.EventTypeNID, error) {
diff --git a/roomserver/storage/sqlite3/state_snapshot_table.go b/roomserver/storage/sqlite3/state_snapshot_table.go
index 930ad14d..e57e1a4b 100644
--- a/roomserver/storage/sqlite3/state_snapshot_table.go
+++ b/roomserver/storage/sqlite3/state_snapshot_table.go
@@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@@ -152,6 +153,10 @@ func (s *stateSnapshotStatements) BulkSelectStateForHistoryVisibility(
return nil, tables.OptimisationNotSupportedError
}
+func (s *stateSnapshotStatements) BulkSelectMembershipForHistoryVisibility(ctx context.Context, txn *sql.Tx, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string) (map[string]*gomatrixserverlib.HeaderedEvent, error) {
+ return nil, tables.OptimisationNotSupportedError
+}
+
func (s *stateSnapshotStatements) selectStateBlockNIDsForRoomNID(
ctx context.Context, txn *sql.Tx, roomNID types.RoomNID,
) ([]types.StateBlockNID, error) {
diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go
index 64145f83..c7f1064d 100644
--- a/roomserver/storage/tables/interface.go
+++ b/roomserver/storage/tables/interface.go
@@ -91,6 +91,10 @@ type StateSnapshot interface {
// which users are in a room faster than having to load the entire room state. In the
// case of SQLite, this will return tables.OptimisationNotSupportedError.
BulkSelectStateForHistoryVisibility(ctx context.Context, txn *sql.Tx, stateSnapshotNID types.StateSnapshotNID, domain string) ([]types.EventNID, error)
+
+ BulkSelectMembershipForHistoryVisibility(
+ ctx context.Context, txn *sql.Tx, userNID types.EventStateKeyNID, roomInfo *types.RoomInfo, eventIDs ...string,
+ ) (map[string]*gomatrixserverlib.HeaderedEvent, error)
}
type StateBlock interface {
diff --git a/roomserver/storage/tables/state_snapshot_table_test.go b/roomserver/storage/tables/state_snapshot_table_test.go
index b2e59377..c7c991b2 100644
--- a/roomserver/storage/tables/state_snapshot_table_test.go
+++ b/roomserver/storage/tables/state_snapshot_table_test.go
@@ -29,6 +29,8 @@ func mustCreateStateSnapshotTable(t *testing.T, dbType test.DBType) (tab tables.
assert.NoError(t, err)
err = postgres.CreateEventsTable(db)
assert.NoError(t, err)
+ err = postgres.CreateEventJSONTable(db)
+ assert.NoError(t, err)
err = postgres.CreateStateBlockTable(db)
assert.NoError(t, err)
// ... and then the snapshot table itself
diff --git a/syncapi/internal/history_visibility.go b/syncapi/internal/history_visibility.go
index 71d7ddd1..87c59e03 100644
--- a/syncapi/internal/history_visibility.go
+++ b/syncapi/internal/history_visibility.go
@@ -121,10 +121,7 @@ func ApplyHistoryVisibilityFilter(
// Get the mapping from eventID -> eventVisibility
eventsFiltered := make([]*gomatrixserverlib.HeaderedEvent, 0, len(events))
- visibilities, err := visibilityForEvents(ctx, rsAPI, events, userID, events[0].RoomID())
- if err != nil {
- return eventsFiltered, err
- }
+ visibilities := visibilityForEvents(ctx, rsAPI, events, userID, events[0].RoomID())
for _, ev := range events {
evVis := visibilities[ev.EventID()]
evVis.membershipCurrent = membershipCurrent
@@ -175,7 +172,7 @@ func visibilityForEvents(
rsAPI api.SyncRoomserverAPI,
events []*gomatrixserverlib.HeaderedEvent,
userID, roomID string,
-) (map[string]eventVisibility, error) {
+) map[string]eventVisibility {
eventIDs := make([]string, len(events))
for i := range events {
eventIDs[i] = events[i].EventID()
@@ -185,6 +182,7 @@ func visibilityForEvents(
// get the membership events for all eventIDs
membershipResp := &api.QueryMembershipAtEventResponse{}
+
err := rsAPI.QueryMembershipAtEvent(ctx, &api.QueryMembershipAtEventRequest{
RoomID: roomID,
EventIDs: eventIDs,
@@ -201,19 +199,20 @@ func visibilityForEvents(
membershipAtEvent: gomatrixserverlib.Leave, // default to leave, to not expose events by accident
visibility: event.Visibility,
}
- membershipEvs, ok := membershipResp.Memberships[eventID]
- if !ok {
+ ev, ok := membershipResp.Membership[eventID]
+ if !ok || ev == nil {
result[eventID] = vis
continue
}
- for _, ev := range membershipEvs {
- membership, err := ev.Membership()
- if err != nil {
- return result, err
- }
- vis.membershipAtEvent = membership
+
+ membership, err := ev.Membership()
+ if err != nil {
+ result[eventID] = vis
+ continue
}
+ vis.membershipAtEvent = membership
+
result[eventID] = vis
}
- return result, nil
+ return result
}
diff --git a/syncapi/routing/context.go b/syncapi/routing/context.go
index 095a868c..76f00367 100644
--- a/syncapi/routing/context.go
+++ b/syncapi/routing/context.go
@@ -67,6 +67,8 @@ func Context(
errMsg = "unable to parse filter"
case *strconv.NumError:
errMsg = "unable to parse limit"
+ default:
+ errMsg = err.Error()
}
return util.JSONResponse{
Code: http.StatusBadRequest,
@@ -167,7 +169,18 @@ func Context(
eventsBeforeClient := gomatrixserverlib.HeaderedToClientEvents(eventsBeforeFiltered, gomatrixserverlib.FormatAll)
eventsAfterClient := gomatrixserverlib.HeaderedToClientEvents(eventsAfterFiltered, gomatrixserverlib.FormatAll)
- newState := applyLazyLoadMembers(device, filter, eventsAfterClient, eventsBeforeClient, state, lazyLoadCache)
+
+ newState := state
+ if filter.LazyLoadMembers {
+ allEvents := append(eventsBeforeFiltered, eventsAfterFiltered...)
+ allEvents = append(allEvents, &requestedEvent)
+ evs := gomatrixserverlib.HeaderedToClientEvents(allEvents, gomatrixserverlib.FormatAll)
+ newState, err = applyLazyLoadMembers(ctx, device, snapshot, roomID, evs, lazyLoadCache)
+ if err != nil {
+ logrus.WithError(err).Error("unable to load membership events")
+ return jsonerror.InternalServerError()
+ }
+ }
ev := gomatrixserverlib.HeaderedToClientEvent(&requestedEvent, gomatrixserverlib.FormatAll)
response := ContextRespsonse{
@@ -244,41 +257,43 @@ func getStartEnd(ctx context.Context, snapshot storage.DatabaseTransaction, star
}
func applyLazyLoadMembers(
+ ctx context.Context,
device *userapi.Device,
- filter *gomatrixserverlib.RoomEventFilter,
- eventsAfter, eventsBefore []gomatrixserverlib.ClientEvent,
- state []*gomatrixserverlib.HeaderedEvent,
+ snapshot storage.DatabaseTransaction,
+ roomID string,
+ events []gomatrixserverlib.ClientEvent,
lazyLoadCache caching.LazyLoadCache,
-) []*gomatrixserverlib.HeaderedEvent {
- if filter == nil || !filter.LazyLoadMembers {
- return state
- }
- allEvents := append(eventsBefore, eventsAfter...)
- x := make(map[string]struct{})
+) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ eventSenders := make(map[string]struct{})
// get members who actually send an event
- for _, e := range allEvents {
+ for _, e := range events {
// Don't add membership events the client should already know about
if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, e.RoomID, e.Sender); cached {
continue
}
- x[e.Sender] = struct{}{}
+ eventSenders[e.Sender] = struct{}{}
}
- newState := []*gomatrixserverlib.HeaderedEvent{}
- membershipEvents := []*gomatrixserverlib.HeaderedEvent{}
- for _, event := range state {
- if event.Type() != gomatrixserverlib.MRoomMember {
- newState = append(newState, event)
- } else {
- // did the user send an event?
- if _, ok := x[event.Sender()]; ok {
- membershipEvents = append(membershipEvents, event)
- lazyLoadCache.StoreLazyLoadedUser(device, event.RoomID(), event.Sender(), event.EventID())
- }
- }
+ wantUsers := make([]string, 0, len(eventSenders))
+ for userID := range eventSenders {
+ wantUsers = append(wantUsers, userID)
+ }
+
+ // Query missing membership events
+ filter := gomatrixserverlib.DefaultStateFilter()
+ filter.Senders = &wantUsers
+ filter.Types = &[]string{gomatrixserverlib.MRoomMember}
+ memberships, err := snapshot.GetStateEventsForRoom(ctx, roomID, &filter)
+ if err != nil {
+ return nil, err
}
- // Add the membershipEvents to the end of the list, to make Sytest happy
- return append(newState, membershipEvents...)
+
+ // cache the membership events
+ for _, membership := range memberships {
+ lazyLoadCache.StoreLazyLoadedUser(device, roomID, *membership.StateKey(), membership.EventID())
+ }
+
+ return memberships, nil
}
func parseRoomEventFilter(req *http.Request) (*gomatrixserverlib.RoomEventFilter, error) {
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 4a01ec35..02d8fcc7 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -64,6 +64,7 @@ type messagesResp struct {
// OnIncomingMessagesRequest implements the /messages endpoint from the
// client-server API.
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
+// nolint:gocyclo
func OnIncomingMessagesRequest(
req *http.Request, db storage.Database, roomID string, device *userapi.Device,
rsAPI api.SyncRoomserverAPI,
@@ -246,7 +247,14 @@ func OnIncomingMessagesRequest(
Start: start.String(),
End: end.String(),
}
- res.applyLazyLoadMembers(req.Context(), snapshot, roomID, device, filter.LazyLoadMembers, lazyLoadCache)
+ if filter.LazyLoadMembers {
+ membershipEvents, err := applyLazyLoadMembers(req.Context(), device, snapshot, roomID, clientEvents, lazyLoadCache)
+ if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("failed to apply lazy loading")
+ return jsonerror.InternalServerError()
+ }
+ res.State = append(res.State, gomatrixserverlib.HeaderedToClientEvents(membershipEvents, gomatrixserverlib.FormatAll)...)
+ }
// If we didn't return any events, set the end to an empty string, so it will be omitted
// in the response JSON.
@@ -265,40 +273,6 @@ func OnIncomingMessagesRequest(
}
}
-// applyLazyLoadMembers loads membership events for users returned in Chunk, if the filter has
-// LazyLoadMembers enabled.
-func (m *messagesResp) applyLazyLoadMembers(
- ctx context.Context,
- db storage.DatabaseTransaction,
- roomID string,
- device *userapi.Device,
- lazyLoad bool,
- lazyLoadCache caching.LazyLoadCache,
-) {
- if !lazyLoad {
- return
- }
- membershipToUser := make(map[string]*gomatrixserverlib.HeaderedEvent)
- for _, evt := range m.Chunk {
- // Don't add membership events the client should already know about
- if _, cached := lazyLoadCache.IsLazyLoadedUserCached(device, roomID, evt.Sender); cached {
- continue
- }
- membership, err := db.GetStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, evt.Sender)
- if err != nil {
- util.GetLogger(ctx).WithError(err).Error("failed to get membership event for user")
- continue
- }
- if membership != nil {
- membershipToUser[evt.Sender] = membership
- lazyLoadCache.StoreLazyLoadedUser(device, roomID, evt.Sender, membership.EventID())
- }
- }
- for _, evt := range membershipToUser {
- m.State = append(m.State, gomatrixserverlib.HeaderedToClientEvent(evt, gomatrixserverlib.FormatAll))
- }
-}
-
func getMembershipForUser(ctx context.Context, roomID, userID string, rsAPI api.SyncRoomserverAPI) (resp api.QueryMembershipForUserResponse, err error) {
req := api.QueryMembershipForUserRequest{
RoomID: roomID,
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index a7a127e3..04c2020a 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -46,7 +46,7 @@ type DatabaseTransaction interface {
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
GetRoomSummary(ctx context.Context, roomID, userID string) (summary *types.Summary, err error)
- RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
+ RecentEvents(ctx context.Context, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
GetBackwardTopologyPos(ctx context.Context, events []*gomatrixserverlib.HeaderedEvent) (types.TopologyToken, error)
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go
index 3caafa14..0d607b7c 100644
--- a/syncapi/storage/postgres/current_room_state_table.go
+++ b/syncapi/storage/postgres/current_room_state_table.go
@@ -275,6 +275,15 @@ func (s *currentRoomStateStatements) SelectCurrentState(
) ([]*gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
senders, notSenders := getSendersStateFilterFilter(stateFilter)
+ // We're going to query members later, so remove them from this request
+ if stateFilter.LazyLoadMembers && !stateFilter.IncludeRedundantMembers {
+ notTypes := &[]string{gomatrixserverlib.MRoomMember}
+ if stateFilter.NotTypes != nil {
+ *stateFilter.NotTypes = append(*stateFilter.NotTypes, gomatrixserverlib.MRoomMember)
+ } else {
+ stateFilter.NotTypes = notTypes
+ }
+ }
rows, err := stmt.QueryContext(ctx, roomID,
pq.StringArray(senders),
pq.StringArray(notSenders),
diff --git a/syncapi/storage/postgres/deltas/20230201152200_rename_index.go b/syncapi/storage/postgres/deltas/20230201152200_rename_index.go
new file mode 100644
index 00000000..5a0ec505
--- /dev/null
+++ b/syncapi/storage/postgres/deltas/20230201152200_rename_index.go
@@ -0,0 +1,29 @@
+// Copyright 2023 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package deltas
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+)
+
+func UpRenameOutputRoomEventsIndex(ctx context.Context, tx *sql.Tx) error {
+ _, err := tx.ExecContext(ctx, `ALTER TABLE syncapi_output_room_events RENAME CONSTRAINT syncapi_event_id_idx TO syncapi_output_room_event_id_idx;`)
+ if err != nil {
+ return fmt.Errorf("failed to execute upgrade: %w", err)
+ }
+ return nil
+}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 0075fc8d..59fb99aa 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -19,18 +19,17 @@ import (
"context"
"database/sql"
"encoding/json"
+ "fmt"
"sort"
+ "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
-
- "github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
-
- "github.com/matrix-org/dendrite/internal/sqlutil"
)
const outputRoomEventsSchema = `
@@ -44,7 +43,7 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-- This isn't a problem for us since we just want to order by this field.
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
-- The event ID for the event
- event_id TEXT NOT NULL CONSTRAINT syncapi_event_id_idx UNIQUE,
+ event_id TEXT NOT NULL CONSTRAINT syncapi_output_room_event_id_idx UNIQUE,
-- The 'room_id' key for the event.
room_id TEXT NOT NULL,
-- The headered JSON for the event, containing potentially additional metadata such as
@@ -79,13 +78,16 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_room_id_idx ON syncapi_out
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON syncapi_output_room_events (exclude_from_sync);
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_add_state_ids_idx ON syncapi_output_room_events ((add_state_ids IS NOT NULL));
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_remove_state_ids_idx ON syncapi_output_room_events ((remove_state_ids IS NOT NULL));
+CREATE INDEX IF NOT EXISTS syncapi_output_room_events_recent_events_idx ON syncapi_output_room_events (room_id, exclude_from_sync, id, sender, type);
+
+
`
const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" +
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
- "ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
+ "ON CONFLICT ON CONSTRAINT syncapi_output_room_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
"RETURNING id"
const selectEventsSQL = "" +
@@ -109,14 +111,29 @@ const selectRecentEventsSQL = "" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id DESC LIMIT $8"
-const selectRecentEventsForSyncSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
- " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
- " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
- " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
- " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
- " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
- " ORDER BY id DESC LIMIT $8"
+// selectRecentEventsForSyncSQL contains an optimization to get the recent events for a list of rooms, using a LATERAL JOIN
+// The sub select inside LATERAL () is executed for all room_ids it gets as a parameter $1
+const selectRecentEventsForSyncSQL = `
+WITH room_ids AS (
+ SELECT unnest($1::text[]) AS room_id
+)
+SELECT x.*
+FROM room_ids,
+ LATERAL (
+ SELECT room_id, event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility
+ FROM syncapi_output_room_events recent_events
+ WHERE
+ recent_events.room_id = room_ids.room_id
+ AND recent_events.exclude_from_sync = FALSE
+ AND id > $2 AND id <= $3
+ AND ( $4::text[] IS NULL OR sender = ANY($4) )
+ AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )
+ AND ( $6::text[] IS NULL OR type LIKE ANY($6) )
+ AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )
+ ORDER BY recent_events.id DESC
+ LIMIT $8
+ ) AS x
+`
const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
@@ -207,12 +224,30 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
return nil, err
}
+ migrationName := "syncapi: rename dupe index (output_room_events)"
+
+ var cName string
+ err = db.QueryRowContext(context.Background(), "select constraint_name from information_schema.table_constraints where table_name = 'syncapi_output_room_events' AND constraint_name = 'syncapi_event_id_idx'").Scan(&cName)
+ switch err {
+ case sql.ErrNoRows: // migration was already executed, as the index was renamed
+ if err = sqlutil.InsertMigration(context.Background(), db, migrationName); err != nil {
+ return nil, fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
+ }
+ case nil:
+ default:
+ return nil, err
+ }
+
m := sqlutil.NewMigrator(db)
m.AddMigrations(
sqlutil.Migration{
Version: "syncapi: add history visibility column (output_room_events)",
Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
},
+ sqlutil.Migration{
+ Version: migrationName,
+ Up: deltas.UpRenameOutputRoomEventsIndex,
+ },
)
err = m.Up(context.Background())
if err != nil {
@@ -398,9 +433,9 @@ func (s *outputRoomEventsStatements) InsertEvent(
// from sync.
func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
+ roomIDs []string, ra types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
chronologicalOrder bool, onlySyncEvents bool,
-) ([]types.StreamEvent, bool, error) {
+) (map[string]types.RecentEvents, error) {
var stmt *sql.Stmt
if onlySyncEvents {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
@@ -408,8 +443,9 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
}
senders, notSenders := getSendersRoomEventFilter(eventFilter)
+
rows, err := stmt.QueryContext(
- ctx, roomID, r.Low(), r.High(),
+ ctx, pq.StringArray(roomIDs), ra.Low(), ra.High(),
pq.StringArray(senders),
pq.StringArray(notSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
@@ -417,34 +453,80 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
eventFilter.Limit+1,
)
if err != nil {
- return nil, false, err
+ return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
- events, err := rowsToStreamEvents(rows)
- if err != nil {
- return nil, false, err
- }
- if chronologicalOrder {
- // The events need to be returned from oldest to latest, which isn't
- // necessary the way the SQL query returns them, so a sort is necessary to
- // ensure the events are in the right order in the slice.
- sort.SliceStable(events, func(i int, j int) bool {
- return events[i].StreamPosition < events[j].StreamPosition
+
+ result := make(map[string]types.RecentEvents)
+
+ for rows.Next() {
+ var (
+ roomID string
+ eventID string
+ streamPos types.StreamPosition
+ eventBytes []byte
+ excludeFromSync bool
+ sessionID *int64
+ txnID *string
+ transactionID *api.TransactionID
+ historyVisibility gomatrixserverlib.HistoryVisibility
+ )
+ if err := rows.Scan(&roomID, &eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
+ return nil, err
+ }
+ // TODO: Handle redacted events
+ var ev gomatrixserverlib.HeaderedEvent
+ if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ return nil, err
+ }
+
+ if sessionID != nil && txnID != nil {
+ transactionID = &api.TransactionID{
+ SessionID: *sessionID,
+ TransactionID: *txnID,
+ }
+ }
+
+ r := result[roomID]
+
+ ev.Visibility = historyVisibility
+ r.Events = append(r.Events, types.StreamEvent{
+ HeaderedEvent: &ev,
+ StreamPosition: streamPos,
+ TransactionID: transactionID,
+ ExcludeFromSync: excludeFromSync,
})
+
+ result[roomID] = r
}
- // we queried for 1 more than the limit, so if we returned one more mark limited=true
- limited := false
- if len(events) > eventFilter.Limit {
- limited = true
- // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
- if chronologicalOrder {
- events = events[1:]
- } else {
- events = events[:len(events)-1]
+
+ if chronologicalOrder {
+ for roomID, evs := range result {
+ // The events need to be returned from oldest to latest, which isn't
+ // necessary the way the SQL query returns them, so a sort is necessary to
+ // ensure the events are in the right order in the slice.
+ sort.SliceStable(evs.Events, func(i int, j int) bool {
+ return evs.Events[i].StreamPosition < evs.Events[j].StreamPosition
+ })
+
+ if len(evs.Events) > eventFilter.Limit {
+ evs.Limited = true
+ evs.Events = evs.Events[1:]
+ }
+
+ result[roomID] = evs
}
- }
+ } else {
+ for roomID, evs := range result {
+ if len(evs.Events) > eventFilter.Limit {
+ evs.Limited = true
+ evs.Events = evs.Events[:len(evs.Events)-1]
+ }
- return events, limited, nil
+ result[roomID] = evs
+ }
+ }
+ return result, rows.Err()
}
// selectEarlyEvents returns the earliest events in the given room, starting
diff --git a/syncapi/storage/shared/storage_sync.go b/syncapi/storage/shared/storage_sync.go
index 8385b95a..931bc9e2 100644
--- a/syncapi/storage/shared/storage_sync.go
+++ b/syncapi/storage/shared/storage_sync.go
@@ -151,8 +151,8 @@ func (d *DatabaseTransaction) GetRoomSummary(ctx context.Context, roomID, userID
return summary, nil
}
-func (d *DatabaseTransaction) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
- return d.OutputEvents.SelectRecentEvents(ctx, d.txn, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents)
+func (d *DatabaseTransaction) RecentEvents(ctx context.Context, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error) {
+ return d.OutputEvents.SelectRecentEvents(ctx, d.txn, roomIDs, r, eventFilter, chronologicalOrder, onlySyncEvents)
}
func (d *DatabaseTransaction) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) {
@@ -370,19 +370,25 @@ func (d *DatabaseTransaction) GetStateDeltas(
}
// get all the state events ever (i.e. for all available rooms) between these two positions
- stateNeededFiltered, eventMapFiltered, err := d.OutputEvents.SelectStateInRange(ctx, d.txn, r, stateFilter, allRoomIDs)
- if err != nil {
- if err == sql.ErrNoRows {
- return nil, nil, nil
+ stateFiltered := state
+ // avoid hitting the database if the result would be the same as above
+ if !isStatefilterEmpty(stateFilter) {
+ var stateNeededFiltered map[string]map[string]bool
+ var eventMapFiltered map[string]types.StreamEvent
+ stateNeededFiltered, eventMapFiltered, err = d.OutputEvents.SelectStateInRange(ctx, d.txn, r, stateFilter, allRoomIDs)
+ if err != nil {
+ if err == sql.ErrNoRows {
+ return nil, nil, nil
+ }
+ return nil, nil, err
}
- return nil, nil, err
- }
- stateFiltered, err := d.fetchStateEvents(ctx, d.txn, stateNeededFiltered, eventMapFiltered)
- if err != nil {
- if err == sql.ErrNoRows {
- return nil, nil, nil
+ stateFiltered, err = d.fetchStateEvents(ctx, d.txn, stateNeededFiltered, eventMapFiltered)
+ if err != nil {
+ if err == sql.ErrNoRows {
+ return nil, nil, nil
+ }
+ return nil, nil, err
}
- return nil, nil, err
}
// find out which rooms this user is peeking, if any.
@@ -701,6 +707,28 @@ func (d *DatabaseTransaction) MaxStreamPositionForRelations(ctx context.Context)
return types.StreamPosition(id), err
}
+func isStatefilterEmpty(filter *gomatrixserverlib.StateFilter) bool {
+ if filter == nil {
+ return true
+ }
+ switch {
+ case filter.NotTypes != nil && len(*filter.NotTypes) > 0:
+ return false
+ case filter.Types != nil && len(*filter.Types) > 0:
+ return false
+ case filter.Senders != nil && len(*filter.Senders) > 0:
+ return false
+ case filter.NotSenders != nil && len(*filter.NotSenders) > 0:
+ return false
+ case filter.NotRooms != nil && len(*filter.NotRooms) > 0:
+ return false
+ case filter.ContainsURL != nil:
+ return false
+ default:
+ return true
+ }
+}
+
func (d *DatabaseTransaction) RelationsFor(ctx context.Context, roomID, eventID, relType, eventType string, from, to types.StreamPosition, backwards bool, limit int) (
events []types.StreamEvent, prevBatch, nextBatch string, err error,
) {
diff --git a/syncapi/storage/shared/storage_sync_test.go b/syncapi/storage/shared/storage_sync_test.go
new file mode 100644
index 00000000..c56720db
--- /dev/null
+++ b/syncapi/storage/shared/storage_sync_test.go
@@ -0,0 +1,72 @@
+package shared
+
+import (
+ "testing"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+func Test_isStatefilterEmpty(t *testing.T) {
+ filterSet := []string{"a"}
+ boolValue := false
+
+ tests := []struct {
+ name string
+ filter *gomatrixserverlib.StateFilter
+ want bool
+ }{
+ {
+ name: "nil filter is empty",
+ filter: nil,
+ want: true,
+ },
+ {
+ name: "Empty filter is empty",
+ filter: &gomatrixserverlib.StateFilter{},
+ want: true,
+ },
+ {
+ name: "NotTypes is set",
+ filter: &gomatrixserverlib.StateFilter{
+ NotTypes: &filterSet,
+ },
+ },
+ {
+ name: "Types is set",
+ filter: &gomatrixserverlib.StateFilter{
+ Types: &filterSet,
+ },
+ },
+ {
+ name: "Senders is set",
+ filter: &gomatrixserverlib.StateFilter{
+ Senders: &filterSet,
+ },
+ },
+ {
+ name: "NotSenders is set",
+ filter: &gomatrixserverlib.StateFilter{
+ NotSenders: &filterSet,
+ },
+ },
+ {
+ name: "NotRooms is set",
+ filter: &gomatrixserverlib.StateFilter{
+ NotRooms: &filterSet,
+ },
+ },
+ {
+ name: "ContainsURL is set",
+ filter: &gomatrixserverlib.StateFilter{
+ ContainsURL: &boolValue,
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := isStatefilterEmpty(tt.filter); got != tt.want {
+ t.Errorf("isStatefilterEmpty() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go
index d8967113..de0e72db 100644
--- a/syncapi/storage/sqlite3/account_data_table.go
+++ b/syncapi/storage/sqlite3/account_data_table.go
@@ -105,7 +105,9 @@ func (s *accountDataStatements) SelectAccountDataInRange(
filter.Senders, filter.NotSenders,
filter.Types, filter.NotTypes,
[]string{}, nil, filter.Limit, FilterOrderAsc)
-
+ if err != nil {
+ return
+ }
rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return
diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go
index 6bc1b267..35b746c5 100644
--- a/syncapi/storage/sqlite3/current_room_state_table.go
+++ b/syncapi/storage/sqlite3/current_room_state_table.go
@@ -267,6 +267,15 @@ func (s *currentRoomStateStatements) SelectCurrentState(
stateFilter *gomatrixserverlib.StateFilter,
excludeEventIDs []string,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
+ // We're going to query members later, so remove them from this request
+ if stateFilter.LazyLoadMembers && !stateFilter.IncludeRedundantMembers {
+ notTypes := &[]string{gomatrixserverlib.MRoomMember}
+ if stateFilter.NotTypes != nil {
+ *stateFilter.NotTypes = append(*stateFilter.NotTypes, gomatrixserverlib.MRoomMember)
+ } else {
+ stateFilter.NotTypes = notTypes
+ }
+ }
stmt, params, err := prepareWithFilters(
s.db, txn, selectCurrentStateSQL,
[]interface{}{
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index db708c08..23bc68a4 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -368,9 +368,9 @@ func (s *outputRoomEventsStatements) InsertEvent(
func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
+ roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
chronologicalOrder bool, onlySyncEvents bool,
-) ([]types.StreamEvent, bool, error) {
+) (map[string]types.RecentEvents, error) {
var query string
if onlySyncEvents {
query = selectRecentEventsForSyncSQL
@@ -378,49 +378,55 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
query = selectRecentEventsSQL
}
- stmt, params, err := prepareWithFilters(
- s.db, txn, query,
- []interface{}{
- roomID, r.Low(), r.High(),
- },
- eventFilter.Senders, eventFilter.NotSenders,
- eventFilter.Types, eventFilter.NotTypes,
- nil, eventFilter.ContainsURL, eventFilter.Limit+1, FilterOrderDesc,
- )
- if err != nil {
- return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err)
- }
- defer internal.CloseAndLogIfError(ctx, stmt, "selectRecentEvents: stmt.close() failed")
+ result := make(map[string]types.RecentEvents, len(roomIDs))
+ for _, roomID := range roomIDs {
+ stmt, params, err := prepareWithFilters(
+ s.db, txn, query,
+ []interface{}{
+ roomID, r.Low(), r.High(),
+ },
+ eventFilter.Senders, eventFilter.NotSenders,
+ eventFilter.Types, eventFilter.NotTypes,
+ nil, eventFilter.ContainsURL, eventFilter.Limit+1, FilterOrderDesc,
+ )
+ if err != nil {
+ return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
+ }
+ defer internal.CloseAndLogIfError(ctx, stmt, "selectRecentEvents: stmt.close() failed")
- rows, err := stmt.QueryContext(ctx, params...)
- if err != nil {
- return nil, false, err
- }
- defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
- events, err := rowsToStreamEvents(rows)
- if err != nil {
- return nil, false, err
- }
- if chronologicalOrder {
- // The events need to be returned from oldest to latest, which isn't
- // necessary the way the SQL query returns them, so a sort is necessary to
- // ensure the events are in the right order in the slice.
- sort.SliceStable(events, func(i int, j int) bool {
- return events[i].StreamPosition < events[j].StreamPosition
- })
- }
- // we queried for 1 more than the limit, so if we returned one more mark limited=true
- limited := false
- if len(events) > eventFilter.Limit {
- limited = true
- // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
+ rows, err := stmt.QueryContext(ctx, params...)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
+ events, err := rowsToStreamEvents(rows)
+ if err != nil {
+ return nil, err
+ }
if chronologicalOrder {
- events = events[1:]
- } else {
- events = events[:len(events)-1]
+ // The events need to be returned from oldest to latest, which isn't
+ // necessary the way the SQL query returns them, so a sort is necessary to
+ // ensure the events are in the right order in the slice.
+ sort.SliceStable(events, func(i int, j int) bool {
+ return events[i].StreamPosition < events[j].StreamPosition
+ })
}
+ res := types.RecentEvents{}
+ // we queried for 1 more than the limit, so if we returned one more mark limited=true
+ if len(events) > eventFilter.Limit {
+ res.Limited = true
+ // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
+ if chronologicalOrder {
+ events = events[1:]
+ } else {
+ events = events[:len(events)-1]
+ }
+ }
+ res.Events = events
+ result[roomID] = res
}
- return events, limited, nil
+
+ return result, nil
}
func (s *outputRoomEventsStatements) SelectEarlyEvents(
diff --git a/syncapi/storage/storage_test.go b/syncapi/storage/storage_test.go
index e65367d8..05d498bc 100644
--- a/syncapi/storage/storage_test.go
+++ b/syncapi/storage/storage_test.go
@@ -156,12 +156,12 @@ func TestRecentEventsPDU(t *testing.T) {
tc := testCases[i]
t.Run(tc.Name, func(st *testing.T) {
var filter gomatrixserverlib.RoomEventFilter
- var gotEvents []types.StreamEvent
+ var gotEvents map[string]types.RecentEvents
var limited bool
filter.Limit = tc.Limit
WithSnapshot(t, db, func(snapshot storage.DatabaseTransaction) {
var err error
- gotEvents, limited, err = snapshot.RecentEvents(ctx, r.ID, types.Range{
+ gotEvents, err = snapshot.RecentEvents(ctx, []string{r.ID}, types.Range{
From: tc.From,
To: tc.To,
}, &filter, !tc.ReverseOrder, true)
@@ -169,15 +169,18 @@ func TestRecentEventsPDU(t *testing.T) {
st.Fatalf("failed to do sync: %s", err)
}
})
+ streamEvents := gotEvents[r.ID]
+ limited = streamEvents.Limited
if limited != tc.WantLimited {
st.Errorf("got limited=%v want %v", limited, tc.WantLimited)
}
- if len(gotEvents) != len(tc.WantEvents) {
+ if len(streamEvents.Events) != len(tc.WantEvents) {
st.Errorf("got %d events, want %d", len(gotEvents), len(tc.WantEvents))
}
- for j := range gotEvents {
- if !reflect.DeepEqual(gotEvents[j].JSON(), tc.WantEvents[j].JSON()) {
- st.Errorf("event %d got %s want %s", j, string(gotEvents[j].JSON()), string(tc.WantEvents[j].JSON()))
+
+ for j := range streamEvents.Events {
+ if !reflect.DeepEqual(streamEvents.Events[j].JSON(), tc.WantEvents[j].JSON()) {
+ st.Errorf("event %d got %s want %s", j, string(streamEvents.Events[j].JSON()), string(tc.WantEvents[j].JSON()))
}
}
})
@@ -923,3 +926,61 @@ func TestRoomSummary(t *testing.T) {
}
})
}
+
+func TestRecentEvents(t *testing.T) {
+ alice := test.NewUser(t)
+ room1 := test.NewRoom(t, alice)
+ room2 := test.NewRoom(t, alice)
+ roomIDs := []string{room1.ID, room2.ID}
+ rooms := map[string]*test.Room{
+ room1.ID: room1,
+ room2.ID: room2,
+ }
+
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ filter := gomatrixserverlib.DefaultRoomEventFilter()
+ db, close, closeBase := MustCreateDatabase(t, dbType)
+ t.Cleanup(func() {
+ close()
+ closeBase()
+ })
+
+ MustWriteEvents(t, db, room1.Events())
+ MustWriteEvents(t, db, room2.Events())
+
+ transaction, err := db.NewDatabaseTransaction(ctx)
+ assert.NoError(t, err)
+ defer transaction.Rollback()
+
+ // get all recent events from 0 to 100 (we only created 5 events, so we should get 5 back)
+ roomEvs, err := transaction.RecentEvents(ctx, roomIDs, types.Range{From: 0, To: 100}, &filter, true, true)
+ assert.NoError(t, err)
+ assert.Equal(t, len(roomEvs), 2, "unexpected recent events response")
+ for _, recentEvents := range roomEvs {
+ assert.Equal(t, 5, len(recentEvents.Events), "unexpected recent events for room")
+ }
+
+ // update the filter to only return one event
+ filter.Limit = 1
+ roomEvs, err = transaction.RecentEvents(ctx, roomIDs, types.Range{From: 0, To: 100}, &filter, true, true)
+ assert.NoError(t, err)
+ assert.Equal(t, len(roomEvs), 2, "unexpected recent events response")
+ for roomID, recentEvents := range roomEvs {
+ origEvents := rooms[roomID].Events()
+ assert.Equal(t, true, recentEvents.Limited, "expected events to be limited")
+ assert.Equal(t, 1, len(recentEvents.Events), "unexpected recent events for room")
+ assert.Equal(t, origEvents[len(origEvents)-1].EventID(), recentEvents.Events[0].EventID())
+ }
+
+ // not chronologically ordered still returns the events in order (given ORDER BY id DESC)
+ roomEvs, err = transaction.RecentEvents(ctx, roomIDs, types.Range{From: 0, To: 100}, &filter, false, true)
+ assert.NoError(t, err)
+ assert.Equal(t, len(roomEvs), 2, "unexpected recent events response")
+ for roomID, recentEvents := range roomEvs {
+ origEvents := rooms[roomID].Events()
+ assert.Equal(t, true, recentEvents.Limited, "expected events to be limited")
+ assert.Equal(t, 1, len(recentEvents.Events), "unexpected recent events for room")
+ assert.Equal(t, origEvents[len(origEvents)-1].EventID(), recentEvents.Events[0].EventID())
+ }
+ })
+}
diff --git a/syncapi/storage/tables/current_room_state_test.go b/syncapi/storage/tables/current_room_state_test.go
index 23287c50..c7af4f97 100644
--- a/syncapi/storage/tables/current_room_state_test.go
+++ b/syncapi/storage/tables/current_room_state_test.go
@@ -13,6 +13,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test"
+ "github.com/matrix-org/gomatrixserverlib"
)
func newCurrentRoomStateTable(t *testing.T, dbType test.DBType) (tables.CurrentRoomState, *sql.DB, func()) {
@@ -79,6 +80,9 @@ func TestCurrentRoomStateTable(t *testing.T) {
return fmt.Errorf("SelectEventsWithEventIDs\nexpected id %q not returned", id)
}
}
+
+ testCurrentState(t, ctx, txn, tab, room)
+
return nil
})
if err != nil {
@@ -86,3 +90,39 @@ func TestCurrentRoomStateTable(t *testing.T) {
}
})
}
+
+func testCurrentState(t *testing.T, ctx context.Context, txn *sql.Tx, tab tables.CurrentRoomState, room *test.Room) {
+ t.Run("test currentState", func(t *testing.T) {
+ // returns the complete state of the room with a default filter
+ filter := gomatrixserverlib.DefaultStateFilter()
+ evs, err := tab.SelectCurrentState(ctx, txn, room.ID, &filter, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ expectCount := 5
+ if gotCount := len(evs); gotCount != expectCount {
+ t.Fatalf("expected %d state events, got %d", expectCount, gotCount)
+ }
+ // When lazy loading, we expect no membership event, so only 4 events
+ filter.LazyLoadMembers = true
+ expectCount = 4
+ evs, err = tab.SelectCurrentState(ctx, txn, room.ID, &filter, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if gotCount := len(evs); gotCount != expectCount {
+ t.Fatalf("expected %d state events, got %d", expectCount, gotCount)
+ }
+ // same as above, but with existing NotTypes defined
+ notTypes := []string{gomatrixserverlib.MRoomMember}
+ filter.NotTypes = &notTypes
+ evs, err = tab.SelectCurrentState(ctx, txn, room.ID, &filter, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if gotCount := len(evs); gotCount != expectCount {
+ t.Fatalf("expected %d state events, got %d", expectCount, gotCount)
+ }
+ })
+
+}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 145e197c..727d6bf2 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -66,7 +66,7 @@ type Events interface {
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
- SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
+ SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
// SelectEarlyEvents returns the earliest events in the given room.
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *gomatrixserverlib.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error)
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 44013e37..6af25c02 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -82,19 +82,24 @@ func (p *PDUStreamProvider) CompleteSync(
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}
- // Invalidate the lazyLoadCache, otherwise we end up with missing displaynames/avatars
- // TODO: This might be inefficient, when joined to many and/or large rooms.
+ recentEvents, err := snapshot.RecentEvents(ctx, joinedRoomIDs, r, &eventFilter, true, true)
+ if err != nil {
+ return from
+ }
+ // Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
+ events := recentEvents[roomID]
+ // Invalidate the lazyLoadCache, otherwise we end up with missing displaynames/avatars
+ // TODO: This might be inefficient, when joined to many and/or large rooms.
joinedUsers := p.notifier.JoinedUsers(roomID)
for _, sharedUser := range joinedUsers {
p.lazyLoadCache.InvalidateLazyLoadedUser(req.Device, roomID, sharedUser)
}
- }
- // Build up a /sync response. Add joined rooms.
- for _, roomID := range joinedRoomIDs {
+ // get the join response for each room
jr, jerr := p.getJoinResponseForCompleteSync(
- ctx, snapshot, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, false,
+ ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, false,
+ events.Events, events.Limited,
)
if jerr != nil {
req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
@@ -113,11 +118,25 @@ func (p *PDUStreamProvider) CompleteSync(
req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
return from
}
- for _, peek := range peeks {
- if !peek.Deleted {
+ if len(peeks) > 0 {
+ peekRooms := make([]string, 0, len(peeks))
+ for _, peek := range peeks {
+ if !peek.Deleted {
+ peekRooms = append(peekRooms, peek.RoomID)
+ }
+ }
+
+ recentEvents, err = snapshot.RecentEvents(ctx, peekRooms, r, &eventFilter, true, true)
+ if err != nil {
+ return from
+ }
+
+ for _, roomID := range peekRooms {
var jr *types.JoinResponse
+ events := recentEvents[roomID]
jr, err = p.getJoinResponseForCompleteSync(
- ctx, snapshot, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, true,
+ ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, true,
+ events.Events, events.Limited,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@@ -126,7 +145,7 @@ func (p *PDUStreamProvider) CompleteSync(
}
continue
}
- req.Response.Rooms.Peek[peek.RoomID] = jr
+ req.Response.Rooms.Peek[roomID] = jr
}
}
@@ -227,7 +246,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
stateFilter *gomatrixserverlib.StateFilter,
req *types.SyncRequest,
) (types.StreamPosition, error) {
-
+ var err error
originalLimit := eventFilter.Limit
// If we're going backwards, grep at least X events, this is mostly to satisfy Sytest
if r.Backwards && originalLimit < recentEventBackwardsLimit {
@@ -238,8 +257,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
- recentStreamEvents, limited, err := snapshot.RecentEvents(
- ctx, delta.RoomID, r,
+ dbEvents, err := snapshot.RecentEvents(
+ ctx, []string{delta.RoomID}, r,
eventFilter, true, true,
)
if err != nil {
@@ -248,6 +267,10 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
return r.From, fmt.Errorf("p.DB.RecentEvents: %w", err)
}
+
+ recentStreamEvents := dbEvents[delta.RoomID].Events
+ limited := dbEvents[delta.RoomID].Limited
+
recentEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
snapshot.StreamEventsToEvents(device, recentStreamEvents),
gomatrixserverlib.TopologicalOrderByPrevEvents,
@@ -420,7 +443,7 @@ func applyHistoryVisibilityFilter(
"room_id": roomID,
"before": len(recentEvents),
"after": len(events),
- }).Trace("Applied history visibility (sync)")
+ }).Debugf("Applied history visibility (sync)")
return events, nil
}
@@ -428,25 +451,16 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context,
snapshot storage.DatabaseTransaction,
roomID string,
- r types.Range,
stateFilter *gomatrixserverlib.StateFilter,
- eventFilter *gomatrixserverlib.RoomEventFilter,
wantFullState bool,
device *userapi.Device,
isPeek bool,
+ recentStreamEvents []types.StreamEvent,
+ limited bool,
) (jr *types.JoinResponse, err error) {
jr = types.NewJoinResponse()
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
- recentStreamEvents, limited, err := snapshot.RecentEvents(
- ctx, roomID, r, eventFilter, true, true,
- )
- if err != nil {
- if err == sql.ErrNoRows {
- return jr, nil
- }
- return
- }
// Work our way through the timeline events and pick out the event IDs
// of any state events that appear in the timeline. We'll specifically
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index 666a872f..643c3026 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -10,6 +10,7 @@ import (
"testing"
"time"
+ "github.com/matrix-org/dendrite/syncapi/routing"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
@@ -448,6 +449,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", fmt.Sprintf("/_matrix/client/v3/rooms/%s/messages", room.ID), test.WithQueryParams(map[string]string{
"access_token": bobDev.AccessToken,
"dir": "b",
+ "filter": `{"lazy_load_members":true}`, // check that lazy loading doesn't break history visibility
})))
if w.Code != 200 {
t.Logf("%s", w.Body.String())
@@ -905,6 +907,177 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
}
}
+func TestContext(t *testing.T) {
+ test.WithAllDatabases(t, testContext)
+}
+
+func testContext(t *testing.T, dbType test.DBType) {
+
+ tests := []struct {
+ name string
+ roomID string
+ eventID string
+ params map[string]string
+ wantError bool
+ wantStateLength int
+ wantBeforeLength int
+ wantAfterLength int
+ }{
+ {
+ name: "invalid filter",
+ params: map[string]string{
+ "filter": "{",
+ },
+ wantError: true,
+ },
+ {
+ name: "invalid limit",
+ params: map[string]string{
+ "limit": "abc",
+ },
+ wantError: true,
+ },
+ {
+ name: "high limit",
+ params: map[string]string{
+ "limit": "100000",
+ },
+ },
+ {
+ name: "fine limit",
+ params: map[string]string{
+ "limit": "10",
+ },
+ },
+ {
+ name: "last event without lazy loading",
+ wantStateLength: 5,
+ },
+ {
+ name: "last event with lazy loading",
+ params: map[string]string{
+ "filter": `{"lazy_load_members":true}`,
+ },
+ wantStateLength: 1,
+ },
+ {
+ name: "invalid room",
+ roomID: "!doesnotexist",
+ wantError: true,
+ },
+ {
+ name: "invalid eventID",
+ eventID: "$doesnotexist",
+ wantError: true,
+ },
+ {
+ name: "state is limited",
+ params: map[string]string{
+ "limit": "1",
+ },
+ wantStateLength: 1,
+ },
+ {
+ name: "events are not limited",
+ wantBeforeLength: 7,
+ },
+ {
+ name: "all events are limited",
+ params: map[string]string{
+ "limit": "1",
+ },
+ wantStateLength: 1,
+ wantBeforeLength: 1,
+ wantAfterLength: 1,
+ },
+ }
+
+ user := test.NewUser(t)
+ alice := userapi.Device{
+ ID: "ALICEID",
+ UserID: user.ID,
+ AccessToken: "ALICE_BEARER_TOKEN",
+ DisplayName: "Alice",
+ AccountType: userapi.AccountTypeUser,
+ }
+
+ base, baseClose := testrig.CreateBaseDendrite(t, dbType)
+ defer baseClose()
+
+ // Use an actual roomserver for this
+ rsAPI := roomserver.NewInternalAPI(base)
+ rsAPI.SetFederationAPI(nil, nil)
+
+ AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{alice}}, rsAPI, &syncKeyAPI{})
+
+ room := test.NewRoom(t, user)
+
+ room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world 1!"})
+ room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world 2!"})
+ thirdMsg := room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world3!"})
+ room.CreateAndInsert(t, user, "m.room.message", map[string]interface{}{"body": "hello world4!"})
+
+ if err := api.SendEvents(context.Background(), rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
+ t.Fatalf("failed to send events: %v", err)
+ }
+
+ jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
+ defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream)
+
+ syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool {
+ // wait for the last sent eventID to come down sync
+ path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, thirdMsg.EventID())
+ return gjson.Get(syncBody, path).Exists()
+ })
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ params := map[string]string{
+ "access_token": alice.AccessToken,
+ }
+ w := httptest.NewRecorder()
+ // test overrides
+ roomID := room.ID
+ if tc.roomID != "" {
+ roomID = tc.roomID
+ }
+ eventID := thirdMsg.EventID()
+ if tc.eventID != "" {
+ eventID = tc.eventID
+ }
+ requestPath := fmt.Sprintf("/_matrix/client/v3/rooms/%s/context/%s", roomID, eventID)
+ if tc.params != nil {
+ for k, v := range tc.params {
+ params[k] = v
+ }
+ }
+ base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", requestPath, test.WithQueryParams(params)))
+
+ if tc.wantError && w.Code == 200 {
+ t.Fatalf("Expected an error, but got none")
+ }
+ t.Log(w.Body.String())
+ resp := routing.ContextRespsonse{}
+ if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
+ t.Fatal(err)
+ }
+ if tc.wantStateLength > 0 && tc.wantStateLength != len(resp.State) {
+ t.Fatalf("expected %d state events, got %d", tc.wantStateLength, len(resp.State))
+ }
+ if tc.wantBeforeLength > 0 && tc.wantBeforeLength != len(resp.EventsBefore) {
+ t.Fatalf("expected %d before events, got %d", tc.wantBeforeLength, len(resp.EventsBefore))
+ }
+ if tc.wantAfterLength > 0 && tc.wantAfterLength != len(resp.EventsAfter) {
+ t.Fatalf("expected %d after events, got %d", tc.wantAfterLength, len(resp.EventsAfter))
+ }
+
+ if !tc.wantError && resp.Event.EventID != eventID {
+ t.Fatalf("unexpected eventID %s, expected %s", resp.Event.EventID, eventID)
+ }
+ })
+ }
+}
+
func syncUntil(t *testing.T,
base *base.BaseDendrite, accessToken string,
skip bool,
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 9fbadc06..6495dd53 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -63,6 +63,11 @@ type StreamEvent struct {
ExcludeFromSync bool
}
+type RecentEvents struct {
+ Limited bool
+ Events []StreamEvent
+}
+
// Range represents a range between two stream positions.
type Range struct {
// From is the position the client has already received.