diff options
author | Kegsay <kegan@matrix.org> | 2020-06-26 15:34:41 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-26 15:34:41 +0100 |
commit | 1ad7219e4b6c71f64e4d44db17a6a8d729e6198a (patch) | |
tree | c13db3fd184c0c9bd7d879793be7e5aba2066121 /syncapi/storage | |
parent | 164057a3be1e666d6fb68398d616da9a8a665a18 (diff) |
Implement /sync `limited` and read timeline limit from stored filters (#1168)
* Move filter table to syncapi where it is used
* Implement /sync `limited` and read timeline limit from stored filters
We now fully handle `room.timeline.limit` filters (in-line + stored) and
return the right value for `limited` syncs.
* Update whitelist
* Default to the default timeline limit if it's unset, also strip the extra event correctly
* Update whitelist
Diffstat (limited to 'syncapi/storage')
-rw-r--r-- | syncapi/storage/interface.go | 8 | ||||
-rw-r--r-- | syncapi/storage/postgres/filter_table.go | 129 | ||||
-rw-r--r-- | syncapi/storage/postgres/output_room_events_table.go | 22 | ||||
-rw-r--r-- | syncapi/storage/postgres/syncserver.go | 5 | ||||
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 24 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/filter_table.go | 137 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/output_room_events_table.go | 21 | ||||
-rw-r--r-- | syncapi/storage/sqlite3/syncserver.go | 5 | ||||
-rw-r--r-- | syncapi/storage/tables/interface.go | 9 |
9 files changed, 343 insertions, 17 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index c693326b..c4dae4d0 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -128,4 +128,12 @@ type Database interface { CleanSendToDeviceUpdates(ctx context.Context, toUpdate, toDelete []types.SendToDeviceNID, token types.StreamingToken) (err error) // SendToDeviceUpdatesWaiting returns true if there are send-to-device updates waiting to be sent. SendToDeviceUpdatesWaiting(ctx context.Context, userID, deviceID string) (bool, error) + // GetFilter looks up the filter associated with a given local user and filter ID. + // Returns a filter structure. Otherwise returns an error if no such filter exists + // or if there was an error talking to the database. + GetFilter(ctx context.Context, localpart string, filterID string) (*gomatrixserverlib.Filter, error) + // PutFilter puts the passed filter into the database. + // Returns the filterID as a string. Otherwise returns an error if something + // goes wrong. + PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error) } diff --git a/syncapi/storage/postgres/filter_table.go b/syncapi/storage/postgres/filter_table.go new file mode 100644 index 00000000..beeb864b --- /dev/null +++ b/syncapi/storage/postgres/filter_table.go @@ -0,0 +1,129 @@ +// Copyright 2017 Jan Christian Grünhage +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + + "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/matrix-org/gomatrixserverlib" +) + +const filterSchema = ` +-- Stores data about filters +CREATE TABLE IF NOT EXISTS syncapi_filter ( + -- The filter + filter TEXT NOT NULL, + -- The ID + id SERIAL UNIQUE, + -- The localpart of the Matrix user ID associated to this filter + localpart TEXT NOT NULL, + + PRIMARY KEY(id, localpart) +); + +CREATE INDEX IF NOT EXISTS syncapi_filter_localpart ON syncapi_filter(localpart); +` + +const selectFilterSQL = "" + + "SELECT filter FROM syncapi_filter WHERE localpart = $1 AND id = $2" + +const selectFilterIDByContentSQL = "" + + "SELECT id FROM syncapi_filter WHERE localpart = $1 AND filter = $2" + +const insertFilterSQL = "" + + "INSERT INTO syncapi_filter (filter, id, localpart) VALUES ($1, DEFAULT, $2) RETURNING id" + +type filterStatements struct { + selectFilterStmt *sql.Stmt + selectFilterIDByContentStmt *sql.Stmt + insertFilterStmt *sql.Stmt +} + +func NewPostgresFilterTable(db *sql.DB) (tables.Filter, error) { + _, err := db.Exec(filterSchema) + if err != nil { + return nil, err + } + s := &filterStatements{} + if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil { + return nil, err + } + if s.selectFilterIDByContentStmt, err = db.Prepare(selectFilterIDByContentSQL); err != nil { + return nil, err + } + if s.insertFilterStmt, err = db.Prepare(insertFilterSQL); err != nil { + return nil, err + } + return s, nil +} + +func (s *filterStatements) SelectFilter( + ctx context.Context, localpart string, filterID string, +) (*gomatrixserverlib.Filter, error) { + // Retrieve filter from database (stored as canonical JSON) + var filterData []byte + err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData) + if err != nil { + return nil, err + } + + // Unmarshal JSON into Filter struct + var filter gomatrixserverlib.Filter + if err = json.Unmarshal(filterData, &filter); err != nil { + return nil, err + } + return &filter, nil +} + +func (s *filterStatements) InsertFilter( + ctx context.Context, filter *gomatrixserverlib.Filter, localpart string, +) (filterID string, err error) { + var existingFilterID string + + // Serialise json + filterJSON, err := json.Marshal(filter) + if err != nil { + return "", err + } + // Remove whitespaces and sort JSON data + // needed to prevent from inserting the same filter multiple times + filterJSON, err = gomatrixserverlib.CanonicalJSON(filterJSON) + if err != nil { + return "", err + } + + // Check if filter already exists in the database using its localpart and content + // + // This can result in a race condition when two clients try to insert the + // same filter and localpart at the same time, however this is not a + // problem as both calls will result in the same filterID + err = s.selectFilterIDByContentStmt.QueryRowContext(ctx, + localpart, filterJSON).Scan(&existingFilterID) + if err != nil && err != sql.ErrNoRows { + return "", err + } + // If it does, return the existing ID + if existingFilterID != "" { + return existingFilterID, err + } + + // Otherwise insert the filter and return the new ID + err = s.insertFilterStmt.QueryRowContext(ctx, filterJSON, localpart). + Scan(&filterID) + return +} diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index f01b2eab..c7c4dc63 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -301,21 +301,21 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool, -) ([]types.StreamEvent, error) { +) ([]types.StreamEvent, bool, error) { var stmt *sql.Stmt if onlySyncEvents { stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt) } else { stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) } - rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1) if err != nil { - return nil, err + return nil, false, err } defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed") events, err := rowsToStreamEvents(rows) if err != nil { - return nil, err + return nil, false, err } if chronologicalOrder { // The events need to be returned from oldest to latest, which isn't @@ -325,7 +325,19 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( return events[i].StreamPosition < events[j].StreamPosition }) } - return events, nil + // we queried for 1 more than the limit, so if we returned one more mark limited=true + limited := false + if len(events) > limit { + limited = true + // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last. + if chronologicalOrder { + events = events[1:] + } else { + events = events[:len(events)-1] + } + } + + return events, limited, nil } // selectEarlyEvents returns the earliest events in the given room, starting diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 573586cc..10c1b37c 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -71,6 +71,10 @@ func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*S if err != nil { return nil, err } + filter, err := NewPostgresFilterTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Invites: invites, @@ -79,6 +83,7 @@ func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*S Topology: topology, CurrentRoomState: currState, BackwardExtremities: backwardExtremities, + Filter: filter, SendToDevice: sendToDevice, SendToDeviceWriter: sqlutil.NewTransactionWriter(), EDUCache: cache.New(), diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index f84dc341..01362ddd 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -43,6 +43,7 @@ type Database struct { CurrentRoomState tables.CurrentRoomState BackwardExtremities tables.BackwardsExtremities SendToDevice tables.SendToDevice + Filter tables.Filter SendToDeviceWriter *sqlutil.TransactionWriter EDUCache *cache.EDUCache } @@ -78,7 +79,7 @@ func (d *Database) GetEventsInStreamingRange( } if backwardOrdering { // When using backward ordering, we want the most recent events first. - if events, err = d.OutputEvents.SelectRecentEvents( + if events, _, err = d.OutputEvents.SelectRecentEvents( ctx, nil, roomID, r, limit, false, false, ); err != nil { return @@ -545,6 +546,18 @@ func (d *Database) addEDUDeltaToResponse( return } +func (d *Database) GetFilter( + ctx context.Context, localpart string, filterID string, +) (*gomatrixserverlib.Filter, error) { + return d.Filter.SelectFilter(ctx, localpart, filterID) +} + +func (d *Database) PutFilter( + ctx context.Context, localpart string, filter *gomatrixserverlib.Filter, +) (string, error) { + return d.Filter.InsertFilter(ctx, filter, localpart) +} + func (d *Database) IncrementalSync( ctx context.Context, res *types.Response, device userapi.Device, @@ -642,7 +655,8 @@ func (d *Database) getResponseWithPDUsForCompleteSync( // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent - recentStreamEvents, err = d.OutputEvents.SelectRecentEvents( + var limited bool + recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents( ctx, txn, roomID, r, numRecentEventsPerRoom, true, true, ) if err != nil { @@ -670,7 +684,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( jr := types.NewJoinResponse() jr.Timeline.PrevBatch = prevBatchStr jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = true + jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[roomID] = *jr } @@ -776,7 +790,7 @@ func (d *Database) addRoomDeltaToResponse( // This is all "okay" assuming history_visibility == "shared" which it is by default. r.To = delta.membershipPos } - recentStreamEvents, err := d.OutputEvents.SelectRecentEvents( + recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents( ctx, txn, delta.roomID, r, numRecentEventsPerRoom, true, true, ) @@ -796,7 +810,7 @@ func (d *Database) addRoomDeltaToResponse( jr.Timeline.PrevBatch = prevBatch.String() jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.Timeline.Limited = limited jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.roomID] = *jr case gomatrixserverlib.Leave: diff --git a/syncapi/storage/sqlite3/filter_table.go b/syncapi/storage/sqlite3/filter_table.go new file mode 100644 index 00000000..8b26759d --- /dev/null +++ b/syncapi/storage/sqlite3/filter_table.go @@ -0,0 +1,137 @@ +// Copyright 2017 Jan Christian Grünhage +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + + "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/matrix-org/gomatrixserverlib" +) + +const filterSchema = ` +-- Stores data about filters +CREATE TABLE IF NOT EXISTS syncapi_filter ( + -- The filter + filter TEXT NOT NULL, + -- The ID + id INTEGER PRIMARY KEY AUTOINCREMENT, + -- The localpart of the Matrix user ID associated to this filter + localpart TEXT NOT NULL, + + UNIQUE (id, localpart) +); + +CREATE INDEX IF NOT EXISTS syncapi_filter_localpart ON syncapi_filter(localpart); +` + +const selectFilterSQL = "" + + "SELECT filter FROM syncapi_filter WHERE localpart = $1 AND id = $2" + +const selectFilterIDByContentSQL = "" + + "SELECT id FROM syncapi_filter WHERE localpart = $1 AND filter = $2" + +const insertFilterSQL = "" + + "INSERT INTO syncapi_filter (filter, localpart) VALUES ($1, $2)" + +type filterStatements struct { + selectFilterStmt *sql.Stmt + selectFilterIDByContentStmt *sql.Stmt + insertFilterStmt *sql.Stmt +} + +func NewSqliteFilterTable(db *sql.DB) (tables.Filter, error) { + _, err := db.Exec(filterSchema) + if err != nil { + return nil, err + } + s := &filterStatements{} + if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil { + return nil, err + } + if s.selectFilterIDByContentStmt, err = db.Prepare(selectFilterIDByContentSQL); err != nil { + return nil, err + } + if s.insertFilterStmt, err = db.Prepare(insertFilterSQL); err != nil { + return nil, err + } + return s, nil +} + +func (s *filterStatements) SelectFilter( + ctx context.Context, localpart string, filterID string, +) (*gomatrixserverlib.Filter, error) { + // Retrieve filter from database (stored as canonical JSON) + var filterData []byte + err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData) + if err != nil { + return nil, err + } + + // Unmarshal JSON into Filter struct + var filter gomatrixserverlib.Filter + if err = json.Unmarshal(filterData, &filter); err != nil { + return nil, err + } + return &filter, nil +} + +func (s *filterStatements) InsertFilter( + ctx context.Context, filter *gomatrixserverlib.Filter, localpart string, +) (filterID string, err error) { + var existingFilterID string + + // Serialise json + filterJSON, err := json.Marshal(filter) + if err != nil { + return "", err + } + // Remove whitespaces and sort JSON data + // needed to prevent from inserting the same filter multiple times + filterJSON, err = gomatrixserverlib.CanonicalJSON(filterJSON) + if err != nil { + return "", err + } + + // Check if filter already exists in the database using its localpart and content + // + // This can result in a race condition when two clients try to insert the + // same filter and localpart at the same time, however this is not a + // problem as both calls will result in the same filterID + err = s.selectFilterIDByContentStmt.QueryRowContext(ctx, + localpart, filterJSON).Scan(&existingFilterID) + if err != nil && err != sql.ErrNoRows { + return "", err + } + // If it does, return the existing ID + if existingFilterID != "" { + return existingFilterID, err + } + + // Otherwise insert the filter and return the new ID + res, err := s.insertFilterStmt.ExecContext(ctx, filterJSON, localpart) + if err != nil { + return "", err + } + rowid, err := res.LastInsertId() + if err != nil { + return "", err + } + filterID = fmt.Sprintf("%d", rowid) + return +} diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 367ab3c9..0c909cc4 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -311,7 +311,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool, -) ([]types.StreamEvent, error) { +) ([]types.StreamEvent, bool, error) { var stmt *sql.Stmt if onlySyncEvents { stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt) @@ -319,14 +319,14 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) } - rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1) if err != nil { - return nil, err + return nil, false, err } defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed") events, err := rowsToStreamEvents(rows) if err != nil { - return nil, err + return nil, false, err } if chronologicalOrder { // The events need to be returned from oldest to latest, which isn't @@ -336,7 +336,18 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( return events[i].StreamPosition < events[j].StreamPosition }) } - return events, nil + // we queried for 1 more than the limit, so if we returned one more mark limited=true + limited := false + if len(events) > limit { + limited = true + // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last. + if chronologicalOrder { + events = events[1:] + } else { + events = events[:len(events)-1] + } + } + return events, limited, nil } func (s *outputRoomEventsStatements) SelectEarlyEvents( diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go index 51cdbe32..c85db5a4 100644 --- a/syncapi/storage/sqlite3/syncserver.go +++ b/syncapi/storage/sqlite3/syncserver.go @@ -87,6 +87,10 @@ func (d *SyncServerDatasource) prepare() (err error) { if err != nil { return err } + filter, err := NewSqliteFilterTable(d.db) + if err != nil { + return err + } d.Database = shared.Database{ DB: d.db, Invites: invites, @@ -95,6 +99,7 @@ func (d *SyncServerDatasource) prepare() (err error) { BackwardExtremities: bwExtrem, CurrentRoomState: roomState, Topology: topology, + Filter: filter, SendToDevice: sendToDevice, SendToDeviceWriter: sqlutil.NewTransactionWriter(), EDUCache: cache.New(), diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 246dc695..4ac0be4e 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -44,8 +44,8 @@ type Events interface { InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error) // SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync. - // Returns up to `limit` events. - SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error) + // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`. + SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) // SelectEarlyEvents returns the earliest events in the given room. SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error) SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error) @@ -133,3 +133,8 @@ type SendToDevice interface { DeleteSendToDeviceMessages(ctx context.Context, txn *sql.Tx, nids []types.SendToDeviceNID) (err error) CountSendToDeviceMessages(ctx context.Context, txn *sql.Tx, userID, deviceID string) (count int, err error) } + +type Filter interface { + SelectFilter(ctx context.Context, localpart string, filterID string) (*gomatrixserverlib.Filter, error) + InsertFilter(ctx context.Context, filter *gomatrixserverlib.Filter, localpart string) (filterID string, err error) +} |