aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-01-19 18:00:42 +0000
committerGitHub <noreply@github.com>2021-01-19 18:00:42 +0000
commitb70238f2d5579876d834ec393f178161599a2fa7 (patch)
tree7b39deefc186142a9a95142c9fe6331b4fa4a32f /syncapi/storage/postgres
parent80aa9aa8b053655683cbdae1aeccb083166bc714 (diff)
Basic sync filtering (#1721)
* Add some filtering (postgres only for now) * Fix build error * Try to use request filter * Use default filter as a template when retrieving from the database * Remove unused strut * Update sytest-whitelist * Add filtering to SelectEarlyEvents * Fix Postgres selectEarlyEvents query * Attempt filtering on SQLite * Test limit, set field for limit/order in prepareWithFilters * Remove debug logging, add comments * Tweaks, debug logging * Separate SQLite stream IDs * Fix filtering in current state table * Fix lock issues * More tweaks * Current state requires room ID * Review comments
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/filter_table.go2
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go42
2 files changed, 35 insertions, 9 deletions
diff --git a/syncapi/storage/postgres/filter_table.go b/syncapi/storage/postgres/filter_table.go
index beeb864b..dfd3d696 100644
--- a/syncapi/storage/postgres/filter_table.go
+++ b/syncapi/storage/postgres/filter_table.go
@@ -83,7 +83,7 @@ func (s *filterStatements) SelectFilter(
}
// Unmarshal JSON into Filter struct
- var filter gomatrixserverlib.Filter
+ filter := gomatrixserverlib.DefaultFilter()
if err = json.Unmarshal(filterData, &filter); err != nil {
return nil, err
}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index f4bbebd2..28668de0 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -84,17 +84,29 @@ const selectEventsSQL = "" +
const selectRecentEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
- " ORDER BY id DESC LIMIT $4"
+ " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
+ " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
+ " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
+ " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
+ " ORDER BY id DESC LIMIT $8"
const selectRecentEventsForSyncSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
- " ORDER BY id DESC LIMIT $4"
+ " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
+ " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
+ " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
+ " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
+ " ORDER BY id DESC LIMIT $8"
const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
- " ORDER BY id ASC LIMIT $4"
+ " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
+ " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
+ " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
+ " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
+ " ORDER BY id ASC LIMIT $8"
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
@@ -322,7 +334,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
// from sync.
func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, limit int,
+ roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
chronologicalOrder bool, onlySyncEvents bool,
) ([]types.StreamEvent, bool, error) {
var stmt *sql.Stmt
@@ -331,7 +343,14 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
} else {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
}
- rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1)
+ rows, err := stmt.QueryContext(
+ ctx, roomID, r.Low(), r.High(),
+ pq.StringArray(eventFilter.Senders),
+ pq.StringArray(eventFilter.NotSenders),
+ pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
+ pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)),
+ eventFilter.Limit+1,
+ )
if err != nil {
return nil, false, err
}
@@ -350,7 +369,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
}
// we queried for 1 more than the limit, so if we returned one more mark limited=true
limited := false
- if len(events) > limit {
+ if len(events) > eventFilter.Limit {
limited = true
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
if chronologicalOrder {
@@ -367,10 +386,17 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
// from a given position, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, limit int,
+ roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
) ([]types.StreamEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt)
- rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
+ rows, err := stmt.QueryContext(
+ ctx, roomID, r.Low(), r.High(),
+ pq.StringArray(eventFilter.Senders),
+ pq.StringArray(eventFilter.NotSenders),
+ pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
+ pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)),
+ eventFilter.Limit,
+ )
if err != nil {
return nil, err
}