diff options
author | Kegsay <kegan@matrix.org> | 2020-06-26 15:34:41 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-26 15:34:41 +0100 |
commit | 1ad7219e4b6c71f64e4d44db17a6a8d729e6198a (patch) | |
tree | c13db3fd184c0c9bd7d879793be7e5aba2066121 /syncapi/storage/postgres | |
parent | 164057a3be1e666d6fb68398d616da9a8a665a18 (diff) |
Implement /sync `limited` and read timeline limit from stored filters (#1168)
* Move filter table to syncapi where it is used
* Implement /sync `limited` and read timeline limit from stored filters
We now fully handle `room.timeline.limit` filters (in-line + stored) and
return the right value for `limited` syncs.
* Update whitelist
* Default to the default timeline limit if it's unset, also strip the extra event correctly
* Update whitelist
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r-- | syncapi/storage/postgres/filter_table.go | 129 | ||||
-rw-r--r-- | syncapi/storage/postgres/output_room_events_table.go | 22 | ||||
-rw-r--r-- | syncapi/storage/postgres/syncserver.go | 5 |
3 files changed, 151 insertions, 5 deletions
diff --git a/syncapi/storage/postgres/filter_table.go b/syncapi/storage/postgres/filter_table.go new file mode 100644 index 00000000..beeb864b --- /dev/null +++ b/syncapi/storage/postgres/filter_table.go @@ -0,0 +1,129 @@ +// Copyright 2017 Jan Christian Grünhage +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + "encoding/json" + + "github.com/matrix-org/dendrite/syncapi/storage/tables" + "github.com/matrix-org/gomatrixserverlib" +) + +const filterSchema = ` +-- Stores data about filters +CREATE TABLE IF NOT EXISTS syncapi_filter ( + -- The filter + filter TEXT NOT NULL, + -- The ID + id SERIAL UNIQUE, + -- The localpart of the Matrix user ID associated to this filter + localpart TEXT NOT NULL, + + PRIMARY KEY(id, localpart) +); + +CREATE INDEX IF NOT EXISTS syncapi_filter_localpart ON syncapi_filter(localpart); +` + +const selectFilterSQL = "" + + "SELECT filter FROM syncapi_filter WHERE localpart = $1 AND id = $2" + +const selectFilterIDByContentSQL = "" + + "SELECT id FROM syncapi_filter WHERE localpart = $1 AND filter = $2" + +const insertFilterSQL = "" + + "INSERT INTO syncapi_filter (filter, id, localpart) VALUES ($1, DEFAULT, $2) RETURNING id" + +type filterStatements struct { + selectFilterStmt *sql.Stmt + selectFilterIDByContentStmt *sql.Stmt + insertFilterStmt *sql.Stmt +} + +func NewPostgresFilterTable(db *sql.DB) (tables.Filter, error) { + _, err := db.Exec(filterSchema) + if err != nil { + return nil, err + } + s := &filterStatements{} + if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil { + return nil, err + } + if s.selectFilterIDByContentStmt, err = db.Prepare(selectFilterIDByContentSQL); err != nil { + return nil, err + } + if s.insertFilterStmt, err = db.Prepare(insertFilterSQL); err != nil { + return nil, err + } + return s, nil +} + +func (s *filterStatements) SelectFilter( + ctx context.Context, localpart string, filterID string, +) (*gomatrixserverlib.Filter, error) { + // Retrieve filter from database (stored as canonical JSON) + var filterData []byte + err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData) + if err != nil { + return nil, err + } + + // Unmarshal JSON into Filter struct + var filter gomatrixserverlib.Filter + if err = json.Unmarshal(filterData, &filter); err != nil { + return nil, err + } + return &filter, nil +} + +func (s *filterStatements) InsertFilter( + ctx context.Context, filter *gomatrixserverlib.Filter, localpart string, +) (filterID string, err error) { + var existingFilterID string + + // Serialise json + filterJSON, err := json.Marshal(filter) + if err != nil { + return "", err + } + // Remove whitespaces and sort JSON data + // needed to prevent from inserting the same filter multiple times + filterJSON, err = gomatrixserverlib.CanonicalJSON(filterJSON) + if err != nil { + return "", err + } + + // Check if filter already exists in the database using its localpart and content + // + // This can result in a race condition when two clients try to insert the + // same filter and localpart at the same time, however this is not a + // problem as both calls will result in the same filterID + err = s.selectFilterIDByContentStmt.QueryRowContext(ctx, + localpart, filterJSON).Scan(&existingFilterID) + if err != nil && err != sql.ErrNoRows { + return "", err + } + // If it does, return the existing ID + if existingFilterID != "" { + return existingFilterID, err + } + + // Otherwise insert the filter and return the new ID + err = s.insertFilterStmt.QueryRowContext(ctx, filterJSON, localpart). + Scan(&filterID) + return +} diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index f01b2eab..c7c4dc63 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -301,21 +301,21 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool, -) ([]types.StreamEvent, error) { +) ([]types.StreamEvent, bool, error) { var stmt *sql.Stmt if onlySyncEvents { stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt) } else { stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt) } - rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1) if err != nil { - return nil, err + return nil, false, err } defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed") events, err := rowsToStreamEvents(rows) if err != nil { - return nil, err + return nil, false, err } if chronologicalOrder { // The events need to be returned from oldest to latest, which isn't @@ -325,7 +325,19 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( return events[i].StreamPosition < events[j].StreamPosition }) } - return events, nil + // we queried for 1 more than the limit, so if we returned one more mark limited=true + limited := false + if len(events) > limit { + limited = true + // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last. + if chronologicalOrder { + events = events[1:] + } else { + events = events[:len(events)-1] + } + } + + return events, limited, nil } // selectEarlyEvents returns the earliest events in the given room, starting diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go index 573586cc..10c1b37c 100644 --- a/syncapi/storage/postgres/syncserver.go +++ b/syncapi/storage/postgres/syncserver.go @@ -71,6 +71,10 @@ func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*S if err != nil { return nil, err } + filter, err := NewPostgresFilterTable(d.db) + if err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Invites: invites, @@ -79,6 +83,7 @@ func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*S Topology: topology, CurrentRoomState: currState, BackwardExtremities: backwardExtremities, + Filter: filter, SendToDevice: sendToDevice, SendToDeviceWriter: sqlutil.NewTransactionWriter(), EDUCache: cache.New(), |