aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-01-19 18:00:42 +0000
committerGitHub <noreply@github.com>2021-01-19 18:00:42 +0000
commitb70238f2d5579876d834ec393f178161599a2fa7 (patch)
tree7b39deefc186142a9a95142c9fe6331b4fa4a32f /syncapi
parent80aa9aa8b053655683cbdae1aeccb083166bc714 (diff)
Basic sync filtering (#1721)
* Add some filtering (postgres only for now) * Fix build error * Try to use request filter * Use default filter as a template when retrieving from the database * Remove unused strut * Update sytest-whitelist * Add filtering to SelectEarlyEvents * Fix Postgres selectEarlyEvents query * Attempt filtering on SQLite * Test limit, set field for limit/order in prepareWithFilters * Remove debug logging, add comments * Tweaks, debug logging * Separate SQLite stream IDs * Fix filtering in current state table * Fix lock issues * More tweaks * Current state requires room ID * Review comments
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/notifier/notifier_test.go1
-rw-r--r--syncapi/routing/messages.go5
-rw-r--r--syncapi/storage/interface.go4
-rw-r--r--syncapi/storage/postgres/filter_table.go2
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go42
-rw-r--r--syncapi/storage/shared/syncserver.go10
-rw-r--r--syncapi/storage/sqlite3/account_data_table.go2
-rw-r--r--syncapi/storage/sqlite3/current_room_state_table.go37
-rw-r--r--syncapi/storage/sqlite3/filter_table.go2
-rw-r--r--syncapi/storage/sqlite3/filtering.go76
-rw-r--r--syncapi/storage/sqlite3/invites_table.go4
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go148
-rw-r--r--syncapi/storage/sqlite3/peeks_table.go6
-rw-r--r--syncapi/storage/sqlite3/stream_id_table.go26
-rw-r--r--syncapi/storage/tables/interface.go4
-rw-r--r--syncapi/streams/stream_pdu.go22
-rw-r--r--syncapi/sync/request.go38
-rw-r--r--syncapi/types/provider.go3
18 files changed, 273 insertions, 159 deletions
diff --git a/syncapi/notifier/notifier_test.go b/syncapi/notifier/notifier_test.go
index 8b9425e3..1401fc67 100644
--- a/syncapi/notifier/notifier_test.go
+++ b/syncapi/notifier/notifier_test.go
@@ -367,7 +367,6 @@ func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) typ
Timeout: 1 * time.Minute,
Since: since,
WantFullState: false,
- Limit: 20,
Log: util.GetLogger(context.TODO()),
Context: context.TODO(),
}
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index e294c880..ba739148 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -235,12 +235,15 @@ func (r *messagesReq) retrieveEvents() (
clientEvents []gomatrixserverlib.ClientEvent, start,
end types.TopologyToken, err error,
) {
+ eventFilter := gomatrixserverlib.DefaultRoomEventFilter()
+ eventFilter.Limit = r.limit
+
// Retrieve the events from the local database.
var streamEvents []types.StreamEvent
if r.fromStream != nil {
toStream := r.to.StreamToken()
streamEvents, err = r.db.GetEventsInStreamingRange(
- r.ctx, r.fromStream, &toStream, r.roomID, r.limit, r.backwardOrdering,
+ r.ctx, r.fromStream, &toStream, r.roomID, &eventFilter, r.backwardOrdering,
)
} else {
streamEvents, err = r.db.GetEventsInTopologicalRange(
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index a51ab4e0..22d80161 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -40,7 +40,7 @@ type Database interface {
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
- RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
+ RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error)
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
@@ -105,7 +105,7 @@ type Database interface {
// Returns an error if there was a problem communicating with the database.
DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error)
// GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit.
- GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
+ GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, eventFilter *gomatrixserverlib.RoomEventFilter, backwardOrdering bool) (events []types.StreamEvent, err error)
// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit.
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
// EventPositionInTopology returns the depth and stream position of the given event.
diff --git a/syncapi/storage/postgres/filter_table.go b/syncapi/storage/postgres/filter_table.go
index beeb864b..dfd3d696 100644
--- a/syncapi/storage/postgres/filter_table.go
+++ b/syncapi/storage/postgres/filter_table.go
@@ -83,7 +83,7 @@ func (s *filterStatements) SelectFilter(
}
// Unmarshal JSON into Filter struct
- var filter gomatrixserverlib.Filter
+ filter := gomatrixserverlib.DefaultFilter()
if err = json.Unmarshal(filterData, &filter); err != nil {
return nil, err
}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index f4bbebd2..28668de0 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -84,17 +84,29 @@ const selectEventsSQL = "" +
const selectRecentEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
- " ORDER BY id DESC LIMIT $4"
+ " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
+ " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
+ " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
+ " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
+ " ORDER BY id DESC LIMIT $8"
const selectRecentEventsForSyncSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
- " ORDER BY id DESC LIMIT $4"
+ " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
+ " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
+ " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
+ " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
+ " ORDER BY id DESC LIMIT $8"
const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
- " ORDER BY id ASC LIMIT $4"
+ " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
+ " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
+ " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
+ " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
+ " ORDER BY id ASC LIMIT $8"
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
@@ -322,7 +334,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
// from sync.
func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, limit int,
+ roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
chronologicalOrder bool, onlySyncEvents bool,
) ([]types.StreamEvent, bool, error) {
var stmt *sql.Stmt
@@ -331,7 +343,14 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
} else {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
}
- rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1)
+ rows, err := stmt.QueryContext(
+ ctx, roomID, r.Low(), r.High(),
+ pq.StringArray(eventFilter.Senders),
+ pq.StringArray(eventFilter.NotSenders),
+ pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
+ pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)),
+ eventFilter.Limit+1,
+ )
if err != nil {
return nil, false, err
}
@@ -350,7 +369,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
}
// we queried for 1 more than the limit, so if we returned one more mark limited=true
limited := false
- if len(events) > limit {
+ if len(events) > eventFilter.Limit {
limited = true
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
if chronologicalOrder {
@@ -367,10 +386,17 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
// from a given position, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, limit int,
+ roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
) ([]types.StreamEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt)
- rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
+ rows, err := stmt.QueryContext(
+ ctx, roomID, r.Low(), r.High(),
+ pq.StringArray(eventFilter.Senders),
+ pq.StringArray(eventFilter.NotSenders),
+ pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
+ pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)),
+ eventFilter.Limit,
+ )
if err != nil {
return nil, err
}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 5b06aabc..9df07693 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -110,8 +110,8 @@ func (d *Database) RoomIDsWithMembership(ctx context.Context, userID string, mem
return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, membership)
}
-func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
- return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, limit, chronologicalOrder, onlySyncEvents)
+func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
+ return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents)
}
func (d *Database) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) {
@@ -151,7 +151,7 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixse
func (d *Database) GetEventsInStreamingRange(
ctx context.Context,
from, to *types.StreamingToken,
- roomID string, limit int,
+ roomID string, eventFilter *gomatrixserverlib.RoomEventFilter,
backwardOrdering bool,
) (events []types.StreamEvent, err error) {
r := types.Range{
@@ -162,14 +162,14 @@ func (d *Database) GetEventsInStreamingRange(
if backwardOrdering {
// When using backward ordering, we want the most recent events first.
if events, _, err = d.OutputEvents.SelectRecentEvents(
- ctx, nil, roomID, r, limit, false, false,
+ ctx, nil, roomID, r, eventFilter, false, false,
); err != nil {
return
}
} else {
// When using forward ordering, we want the least recent events first.
if events, err = d.OutputEvents.SelectEarlyEvents(
- ctx, nil, roomID, r, limit,
+ ctx, nil, roomID, r, eventFilter,
); err != nil {
return
}
diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go
index 4bcc06ed..1c65cb6a 100644
--- a/syncapi/storage/sqlite3/account_data_table.go
+++ b/syncapi/storage/sqlite3/account_data_table.go
@@ -82,7 +82,7 @@ func (s *accountDataStatements) InsertAccountData(
ctx context.Context, txn *sql.Tx,
userID, roomID, dataType string,
) (pos types.StreamPosition, err error) {
- pos, err = s.streamIDStatements.nextStreamID(ctx, txn)
+ pos, err = s.streamIDStatements.nextAccountDataID(ctx, txn)
if err != nil {
return
}
diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go
index ac659057..55ed27a4 100644
--- a/syncapi/storage/sqlite3/current_room_state_table.go
+++ b/syncapi/storage/sqlite3/current_room_state_table.go
@@ -19,6 +19,7 @@ import (
"context"
"database/sql"
"encoding/json"
+ "fmt"
"strings"
"github.com/matrix-org/dendrite/internal"
@@ -66,13 +67,8 @@ const selectRoomIDsWithMembershipSQL = "" +
"SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
const selectCurrentStateSQL = "" +
- "SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" +
- " AND ( $2 IS NULL OR sender IN ($2) )" +
- " AND ( $3 IS NULL OR NOT(sender IN ($3)) )" +
- " AND ( $4 IS NULL OR type IN ($4) )" +
- " AND ( $5 IS NULL OR NOT(type IN ($5)) )" +
- " AND ( $6 IS NULL OR contains_url = $6 )" +
- " LIMIT $7"
+ "SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1"
+ // WHEN, ORDER BY and LIMIT will be added by prepareWithFilter
const selectJoinedUsersSQL = "" +
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
@@ -95,7 +91,6 @@ type currentRoomStateStatements struct {
deleteRoomStateByEventIDStmt *sql.Stmt
DeleteRoomStateForRoomStmt *sql.Stmt
selectRoomIDsWithMembershipStmt *sql.Stmt
- selectCurrentStateStmt *sql.Stmt
selectJoinedUsersStmt *sql.Stmt
selectStateEventStmt *sql.Stmt
}
@@ -121,9 +116,6 @@ func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *streamIDStatements) (t
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
return nil, err
}
- if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
- return nil, err
- }
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
return nil, err
}
@@ -185,18 +177,23 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
// CurrentState returns all the current state events for the given room.
func (s *currentRoomStateStatements) SelectCurrentState(
ctx context.Context, txn *sql.Tx, roomID string,
- stateFilterPart *gomatrixserverlib.StateFilter,
+ stateFilter *gomatrixserverlib.StateFilter,
) ([]*gomatrixserverlib.HeaderedEvent, error) {
- stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
- rows, err := stmt.QueryContext(ctx, roomID,
- nil, // FIXME: pq.StringArray(stateFilterPart.Senders),
- nil, // FIXME: pq.StringArray(stateFilterPart.NotSenders),
- nil, // FIXME: pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
- nil, // FIXME: pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
- stateFilterPart.ContainsURL,
- stateFilterPart.Limit,
+ stmt, params, err := prepareWithFilters(
+ s.db, txn, selectCurrentStateSQL,
+ []interface{}{
+ roomID,
+ },
+ stateFilter.Senders, stateFilter.NotSenders,
+ stateFilter.Types, stateFilter.NotTypes,
+ stateFilter.Limit, FilterOrderNone,
)
if err != nil {
+ return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
+ }
+
+ rows, err := stmt.QueryContext(ctx, params...)
+ if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectCurrentState: rows.close() failed")
diff --git a/syncapi/storage/sqlite3/filter_table.go b/syncapi/storage/sqlite3/filter_table.go
index 3092bcd7..0cfebef2 100644
--- a/syncapi/storage/sqlite3/filter_table.go
+++ b/syncapi/storage/sqlite3/filter_table.go
@@ -87,7 +87,7 @@ func (s *filterStatements) SelectFilter(
}
// Unmarshal JSON into Filter struct
- var filter gomatrixserverlib.Filter
+ filter := gomatrixserverlib.DefaultFilter()
if err = json.Unmarshal(filterData, &filter); err != nil {
return nil, err
}
diff --git a/syncapi/storage/sqlite3/filtering.go b/syncapi/storage/sqlite3/filtering.go
new file mode 100644
index 00000000..0faf5297
--- /dev/null
+++ b/syncapi/storage/sqlite3/filtering.go
@@ -0,0 +1,76 @@
+package sqlite3
+
+import (
+ "database/sql"
+ "fmt"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+)
+
+type FilterOrder int
+
+const (
+ FilterOrderNone = iota
+ FilterOrderAsc
+ FilterOrderDesc
+)
+
+// prepareWithFilters returns a prepared statement with the
+// relevant filters included. It also includes an []interface{}
+// list of all the relevant parameters to pass straight to
+// QueryContext, QueryRowContext etc.
+// We don't take the filter object directly here because the
+// fields might come from either a StateFilter or an EventFilter,
+// and it's easier just to have the caller extract the relevant
+// parts.
+func prepareWithFilters(
+ db *sql.DB, txn *sql.Tx, query string, params []interface{},
+ senders, notsenders, types, nottypes []string,
+ limit int, order FilterOrder,
+) (*sql.Stmt, []interface{}, error) {
+ offset := len(params)
+ if count := len(senders); count > 0 {
+ query += " AND sender IN " + sqlutil.QueryVariadicOffset(count, offset)
+ for _, v := range senders {
+ params, offset = append(params, v), offset+1
+ }
+ }
+ if count := len(notsenders); count > 0 {
+ query += " AND sender NOT IN " + sqlutil.QueryVariadicOffset(count, offset)
+ for _, v := range notsenders {
+ params, offset = append(params, v), offset+1
+ }
+ }
+ if count := len(types); count > 0 {
+ query += " AND type IN " + sqlutil.QueryVariadicOffset(count, offset)
+ for _, v := range types {
+ params, offset = append(params, v), offset+1
+ }
+ }
+ if count := len(nottypes); count > 0 {
+ query += " AND type NOT IN " + sqlutil.QueryVariadicOffset(count, offset)
+ for _, v := range nottypes {
+ params, offset = append(params, v), offset+1
+ }
+ }
+ switch order {
+ case FilterOrderAsc:
+ query += " ORDER BY id ASC"
+ case FilterOrderDesc:
+ query += " ORDER BY id DESC"
+ }
+ query += fmt.Sprintf(" LIMIT $%d", offset+1)
+ params = append(params, limit)
+
+ var stmt *sql.Stmt
+ var err error
+ if txn != nil {
+ stmt, err = txn.Prepare(query)
+ } else {
+ stmt, err = db.Prepare(query)
+ }
+ if err != nil {
+ return nil, nil, fmt.Errorf("s.db.Prepare: %w", err)
+ }
+ return stmt, params, nil
+}
diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go
index f9dcfdbc..7498fd68 100644
--- a/syncapi/storage/sqlite3/invites_table.go
+++ b/syncapi/storage/sqlite3/invites_table.go
@@ -93,7 +93,7 @@ func NewSqliteInvitesTable(db *sql.DB, streamID *streamIDStatements) (tables.Inv
func (s *inviteEventsStatements) InsertInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
) (streamPos types.StreamPosition, err error) {
- streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
+ streamPos, err = s.streamIDStatements.nextInviteID(ctx, txn)
if err != nil {
return
}
@@ -119,7 +119,7 @@ func (s *inviteEventsStatements) InsertInviteEvent(
func (s *inviteEventsStatements) DeleteInviteEvent(
ctx context.Context, txn *sql.Tx, inviteEventID string,
) (types.StreamPosition, error) {
- streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn)
+ streamPos, err := s.streamIDStatements.nextInviteID(ctx, txn)
if err != nil {
return streamPos, err
}
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index edbd36fb..019aba8b 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -19,6 +19,7 @@ import (
"context"
"database/sql"
"encoding/json"
+ "fmt"
"sort"
"github.com/matrix-org/dendrite/internal"
@@ -60,18 +61,18 @@ const selectEventsSQL = "" +
const selectRecentEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
- " WHERE room_id = $1 AND id > $2 AND id <= $3" +
- " ORDER BY id DESC LIMIT $4"
+ " WHERE room_id = $1 AND id > $2 AND id <= $3"
+ // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectRecentEventsForSyncSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
- " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
- " ORDER BY id DESC LIMIT $4"
+ " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE"
+ // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
- " WHERE room_id = $1 AND id > $2 AND id <= $3" +
- " ORDER BY id ASC LIMIT $4"
+ " WHERE room_id = $1 AND id > $2 AND id <= $3"
+ // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
@@ -79,45 +80,24 @@ const selectMaxEventIDSQL = "" +
const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
-// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
-/*
- $1 = oldPos,
- $2 = newPos,
- $3 = pq.StringArray(stateFilterPart.Senders),
- $4 = pq.StringArray(stateFilterPart.NotSenders),
- $5 = pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
- $6 = pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
- $7 = stateFilterPart.ContainsURL,
- $8 = stateFilterPart.Limit,
-*/
const selectStateInRangeSQL = "" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" +
- " WHERE (id > $1 AND id <= $2)" + // old/new pos
- " AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
- /* " AND ( $3 IS NULL OR sender IN ($3) )" + // sender
- " AND ( $4 IS NULL OR NOT(sender IN ($4)) )" + // not sender
- " AND ( $5 IS NULL OR type IN ($5) )" + // type
- " AND ( $6 IS NULL OR NOT(type IN ($6)) )" + // not type
- " AND ( $7 IS NULL OR contains_url = $7)" + // contains URL? */
- " ORDER BY id ASC" +
- " LIMIT $8" // limit
+ " WHERE (id > $1 AND id <= $2)" +
+ " AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))"
+ // WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
type outputRoomEventsStatements struct {
- db *sql.DB
- streamIDStatements *streamIDStatements
- insertEventStmt *sql.Stmt
- selectEventsStmt *sql.Stmt
- selectMaxEventIDStmt *sql.Stmt
- selectRecentEventsStmt *sql.Stmt
- selectRecentEventsForSyncStmt *sql.Stmt
- selectEarlyEventsStmt *sql.Stmt
- selectStateInRangeStmt *sql.Stmt
- updateEventJSONStmt *sql.Stmt
- deleteEventsForRoomStmt *sql.Stmt
+ db *sql.DB
+ streamIDStatements *streamIDStatements
+ insertEventStmt *sql.Stmt
+ selectEventsStmt *sql.Stmt
+ selectMaxEventIDStmt *sql.Stmt
+ updateEventJSONStmt *sql.Stmt
+ deleteEventsForRoomStmt *sql.Stmt
}
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
@@ -138,18 +118,6 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
return nil, err
}
- if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
- return nil, err
- }
- if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil {
- return nil, err
- }
- if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
- return nil, err
- }
- if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
- return nil, err
- }
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
return nil, err
}
@@ -173,20 +141,23 @@ func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) SelectStateInRange(
ctx context.Context, txn *sql.Tx, r types.Range,
- stateFilterPart *gomatrixserverlib.StateFilter,
+ stateFilter *gomatrixserverlib.StateFilter,
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
- stmt := sqlutil.TxStmt(txn, s.selectStateInRangeStmt)
-
- rows, err := stmt.QueryContext(
- ctx, r.Low(), r.High(),
- /*pq.StringArray(stateFilterPart.Senders),
- pq.StringArray(stateFilterPart.NotSenders),
- pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
- pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
- stateFilterPart.ContainsURL,*/
- stateFilterPart.Limit,
+ stmt, params, err := prepareWithFilters(
+ s.db, txn, selectStateInRangeSQL,
+ []interface{}{
+ r.Low(), r.High(),
+ },
+ stateFilter.Senders, stateFilter.NotSenders,
+ stateFilter.Types, stateFilter.NotTypes,
+ stateFilter.Limit, FilterOrderAsc,
)
if err != nil {
+ return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err)
+ }
+
+ rows, err := stmt.QueryContext(ctx, params...)
+ if err != nil {
return nil, nil, err
}
defer rows.Close() // nolint: errcheck
@@ -298,16 +269,21 @@ func (s *outputRoomEventsStatements) InsertEvent(
return 0, err
}
- addStateJSON, err := json.Marshal(addState)
+ var addStateJSON, removeStateJSON []byte
+ if len(addState) > 0 {
+ addStateJSON, err = json.Marshal(addState)
+ }
if err != nil {
- return 0, err
+ return 0, fmt.Errorf("json.Marshal(addState): %w", err)
+ }
+ if len(removeState) > 0 {
+ removeStateJSON, err = json.Marshal(removeState)
}
- removeStateJSON, err := json.Marshal(removeState)
if err != nil {
- return 0, err
+ return 0, fmt.Errorf("json.Marshal(removeState): %w", err)
}
- streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn)
+ streamPos, err := s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil {
return 0, err
}
@@ -333,17 +309,30 @@ func (s *outputRoomEventsStatements) InsertEvent(
func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, limit int,
+ roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
chronologicalOrder bool, onlySyncEvents bool,
) ([]types.StreamEvent, bool, error) {
- var stmt *sql.Stmt
+ var query string
if onlySyncEvents {
- stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
+ query = selectRecentEventsForSyncSQL
} else {
- stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
+ query = selectRecentEventsSQL
}
- rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1)
+ stmt, params, err := prepareWithFilters(
+ s.db, txn, query,
+ []interface{}{
+ roomID, r.Low(), r.High(),
+ },
+ eventFilter.Senders, eventFilter.NotSenders,
+ eventFilter.Types, eventFilter.NotTypes,
+ eventFilter.Limit+1, FilterOrderDesc,
+ )
+ if err != nil {
+ return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err)
+ }
+
+ rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return nil, false, err
}
@@ -362,7 +351,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
}
// we queried for 1 more than the limit, so if we returned one more mark limited=true
limited := false
- if len(events) > limit {
+ if len(events) > eventFilter.Limit {
limited = true
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
if chronologicalOrder {
@@ -376,10 +365,21 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, limit int,
+ roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
) ([]types.StreamEvent, error) {
- stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt)
- rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
+ stmt, params, err := prepareWithFilters(
+ s.db, txn, selectEarlyEventsSQL,
+ []interface{}{
+ roomID, r.Low(), r.High(),
+ },
+ eventFilter.Senders, eventFilter.NotSenders,
+ eventFilter.Types, eventFilter.NotTypes,
+ eventFilter.Limit, FilterOrderAsc,
+ )
+ if err != nil {
+ return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
+ }
+ rows, err := stmt.QueryContext(ctx, params...)
if err != nil {
return nil, err
}
diff --git a/syncapi/storage/sqlite3/peeks_table.go b/syncapi/storage/sqlite3/peeks_table.go
index d755e28c..c93c8205 100644
--- a/syncapi/storage/sqlite3/peeks_table.go
+++ b/syncapi/storage/sqlite3/peeks_table.go
@@ -108,7 +108,7 @@ func NewSqlitePeeksTable(db *sql.DB, streamID *streamIDStatements) (tables.Peeks
func (s *peekStatements) InsertPeek(
ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string,
) (streamPos types.StreamPosition, err error) {
- streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
+ streamPos, err = s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil {
return
}
@@ -120,7 +120,7 @@ func (s *peekStatements) InsertPeek(
func (s *peekStatements) DeletePeek(
ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string,
) (streamPos types.StreamPosition, err error) {
- streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
+ streamPos, err = s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil {
return
}
@@ -131,7 +131,7 @@ func (s *peekStatements) DeletePeek(
func (s *peekStatements) DeletePeeks(
ctx context.Context, txn *sql.Tx, roomID, userID string,
) (types.StreamPosition, error) {
- streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn)
+ streamPos, err := s.streamIDStatements.nextPDUID(ctx, txn)
if err != nil {
return 0, err
}
diff --git a/syncapi/storage/sqlite3/stream_id_table.go b/syncapi/storage/sqlite3/stream_id_table.go
index f73be422..b614271d 100644
--- a/syncapi/storage/sqlite3/stream_id_table.go
+++ b/syncapi/storage/sqlite3/stream_id_table.go
@@ -20,6 +20,10 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("global", 0)
ON CONFLICT DO NOTHING;
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("receipt", 0)
ON CONFLICT DO NOTHING;
+INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("accountdata", 0)
+ ON CONFLICT DO NOTHING;
+INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0)
+ ON CONFLICT DO NOTHING;
`
const increaseStreamIDStmt = "" +
@@ -49,7 +53,7 @@ func (s *streamIDStatements) prepare(db *sql.DB) (err error) {
return
}
-func (s *streamIDStatements) nextStreamID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
+func (s *streamIDStatements) nextPDUID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
if _, err = increaseStmt.ExecContext(ctx, "global"); err != nil {
@@ -68,3 +72,23 @@ func (s *streamIDStatements) nextReceiptID(ctx context.Context, txn *sql.Tx) (po
err = selectStmt.QueryRowContext(ctx, "receipt").Scan(&pos)
return
}
+
+func (s *streamIDStatements) nextInviteID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
+ increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
+ selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
+ if _, err = increaseStmt.ExecContext(ctx, "invite"); err != nil {
+ return
+ }
+ err = selectStmt.QueryRowContext(ctx, "invite").Scan(&pos)
+ return
+}
+
+func (s *streamIDStatements) nextAccountDataID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
+ increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
+ selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
+ if _, err = increaseStmt.ExecContext(ctx, "accountdata"); err != nil {
+ return
+ }
+ err = selectStmt.QueryRowContext(ctx, "accountdata").Scan(&pos)
+ return
+}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index fca88824..73967677 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -56,9 +56,9 @@ type Events interface {
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
- SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
+ SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
// SelectEarlyEvents returns the earliest events in the given room.
- SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error)
+ SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index 483be575..d6d7ff44 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -48,13 +48,14 @@ func (p *PDUStreamProvider) CompleteSync(
return from
}
- stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
+ stateFilter := req.Filter.Room.State
+ eventFilter := req.Filter.Room.Timeline
// Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
- ctx, roomID, r, &stateFilter, req.Limit, req.Device,
+ ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@@ -74,7 +75,7 @@ func (p *PDUStreamProvider) CompleteSync(
if !peek.Deleted {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
- ctx, peek.RoomID, r, &stateFilter, req.Limit, req.Device,
+ ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@@ -104,8 +105,8 @@ func (p *PDUStreamProvider) IncrementalSync(
var stateDeltas []types.StateDelta
var joinedRooms []string
- // TODO: use filter provided in request
- stateFilter := gomatrixserverlib.DefaultStateFilter()
+ stateFilter := req.Filter.Room.State
+ eventFilter := req.Filter.Room.Timeline
if req.WantFullState {
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
@@ -124,7 +125,7 @@ func (p *PDUStreamProvider) IncrementalSync(
}
for _, delta := range stateDeltas {
- if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, req.Limit, req.Response); err != nil {
+ if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
return newPos
}
@@ -138,7 +139,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
device *userapi.Device,
r types.Range,
delta types.StateDelta,
- numRecentEventsPerRoom int,
+ eventFilter *gomatrixserverlib.RoomEventFilter,
res *types.Response,
) error {
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
@@ -152,7 +153,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
recentStreamEvents, limited, err := p.DB.RecentEvents(
ctx, delta.RoomID, r,
- numRecentEventsPerRoom, true, true,
+ eventFilter, true, true,
)
if err != nil {
return err
@@ -209,7 +210,8 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
roomID string,
r types.Range,
stateFilter *gomatrixserverlib.StateFilter,
- numRecentEventsPerRoom int, device *userapi.Device,
+ eventFilter *gomatrixserverlib.RoomEventFilter,
+ device *userapi.Device,
) (jr *types.JoinResponse, err error) {
var stateEvents []*gomatrixserverlib.HeaderedEvent
stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter)
@@ -221,7 +223,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
var recentStreamEvents []types.StreamEvent
var limited bool
recentStreamEvents, limited, err = p.DB.RecentEvents(
- ctx, roomID, r, numRecentEventsPerRoom, true, true,
+ ctx, roomID, r, eventFilter, true, true,
)
if err != nil {
return
diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go
index 5f89ffc3..09a62e3d 100644
--- a/syncapi/sync/request.go
+++ b/syncapi/sync/request.go
@@ -16,6 +16,7 @@ package sync
import (
"encoding/json"
+ "fmt"
"net/http"
"strconv"
"time"
@@ -31,14 +32,6 @@ import (
const defaultSyncTimeout = time.Duration(0)
const DefaultTimelineLimit = 20
-type filter struct {
- Room struct {
- Timeline struct {
- Limit *int `json:"limit"`
- } `json:"timeline"`
- } `json:"room"`
-}
-
func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.SyncRequest, error) {
timeout := getTimeout(req.URL.Query().Get("timeout"))
fullState := req.URL.Query().Get("full_state")
@@ -51,41 +44,37 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
return nil, err
}
}
- timelineLimit := DefaultTimelineLimit
// TODO: read from stored filters too
+ filter := gomatrixserverlib.DefaultFilter()
filterQuery := req.URL.Query().Get("filter")
if filterQuery != "" {
if filterQuery[0] == '{' {
- // attempt to parse the timeline limit at least
- var f filter
- err := json.Unmarshal([]byte(filterQuery), &f)
- if err == nil && f.Room.Timeline.Limit != nil {
- timelineLimit = *f.Room.Timeline.Limit
+ // Parse the filter from the query string
+ if err := json.Unmarshal([]byte(filterQuery), &filter); err != nil {
+ return nil, fmt.Errorf("json.Unmarshal: %w", err)
}
} else {
- // attempt to load the filter ID
+ // Try to load the filter from the database
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed")
- return nil, err
+ return nil, fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
- f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery)
- if err == nil {
- timelineLimit = f.Room.Timeline.Limit
+ if f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery); err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("syncDB.GetFilter failed")
+ return nil, fmt.Errorf("syncDB.GetFilter: %w", err)
+ } else {
+ filter = *f
}
}
}
- filter := gomatrixserverlib.DefaultEventFilter()
- filter.Limit = timelineLimit
- // TODO: Additional query params: set_presence, filter
-
logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{
"user_id": device.UserID,
"device_id": device.ID,
"since": since,
"timeout": timeout,
- "limit": timelineLimit,
+ "limit": filter.Room.Timeline.Limit,
})
return &types.SyncRequest{
@@ -96,7 +85,6 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
Filter: filter, //
Since: since, //
Timeout: timeout, //
- Limit: timelineLimit, //
Rooms: make(map[string]string), // Populated by the PDU stream
WantFullState: wantFullState, //
}, nil
diff --git a/syncapi/types/provider.go b/syncapi/types/provider.go
index 24b453a8..93ed1266 100644
--- a/syncapi/types/provider.go
+++ b/syncapi/types/provider.go
@@ -14,9 +14,8 @@ type SyncRequest struct {
Log *logrus.Entry
Device *userapi.Device
Response *Response
- Filter gomatrixserverlib.EventFilter
+ Filter gomatrixserverlib.Filter
Since StreamingToken
- Limit int
Timeout time.Duration
WantFullState bool