aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-06-26 15:34:41 +0100
committerGitHub <noreply@github.com>2020-06-26 15:34:41 +0100
commit1ad7219e4b6c71f64e4d44db17a6a8d729e6198a (patch)
treec13db3fd184c0c9bd7d879793be7e5aba2066121 /syncapi/storage/postgres
parent164057a3be1e666d6fb68398d616da9a8a665a18 (diff)
Implement /sync `limited` and read timeline limit from stored filters (#1168)
* Move filter table to syncapi where it is used * Implement /sync `limited` and read timeline limit from stored filters We now fully handle `room.timeline.limit` filters (in-line + stored) and return the right value for `limited` syncs. * Update whitelist * Default to the default timeline limit if it's unset, also strip the extra event correctly * Update whitelist
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/filter_table.go129
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go22
-rw-r--r--syncapi/storage/postgres/syncserver.go5
3 files changed, 151 insertions, 5 deletions
diff --git a/syncapi/storage/postgres/filter_table.go b/syncapi/storage/postgres/filter_table.go
new file mode 100644
index 00000000..beeb864b
--- /dev/null
+++ b/syncapi/storage/postgres/filter_table.go
@@ -0,0 +1,129 @@
+// Copyright 2017 Jan Christian Grünhage
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const filterSchema = `
+-- Stores data about filters
+CREATE TABLE IF NOT EXISTS syncapi_filter (
+ -- The filter
+ filter TEXT NOT NULL,
+ -- The ID
+ id SERIAL UNIQUE,
+ -- The localpart of the Matrix user ID associated to this filter
+ localpart TEXT NOT NULL,
+
+ PRIMARY KEY(id, localpart)
+);
+
+CREATE INDEX IF NOT EXISTS syncapi_filter_localpart ON syncapi_filter(localpart);
+`
+
+const selectFilterSQL = "" +
+ "SELECT filter FROM syncapi_filter WHERE localpart = $1 AND id = $2"
+
+const selectFilterIDByContentSQL = "" +
+ "SELECT id FROM syncapi_filter WHERE localpart = $1 AND filter = $2"
+
+const insertFilterSQL = "" +
+ "INSERT INTO syncapi_filter (filter, id, localpart) VALUES ($1, DEFAULT, $2) RETURNING id"
+
+type filterStatements struct {
+ selectFilterStmt *sql.Stmt
+ selectFilterIDByContentStmt *sql.Stmt
+ insertFilterStmt *sql.Stmt
+}
+
+func NewPostgresFilterTable(db *sql.DB) (tables.Filter, error) {
+ _, err := db.Exec(filterSchema)
+ if err != nil {
+ return nil, err
+ }
+ s := &filterStatements{}
+ if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil {
+ return nil, err
+ }
+ if s.selectFilterIDByContentStmt, err = db.Prepare(selectFilterIDByContentSQL); err != nil {
+ return nil, err
+ }
+ if s.insertFilterStmt, err = db.Prepare(insertFilterSQL); err != nil {
+ return nil, err
+ }
+ return s, nil
+}
+
+func (s *filterStatements) SelectFilter(
+ ctx context.Context, localpart string, filterID string,
+) (*gomatrixserverlib.Filter, error) {
+ // Retrieve filter from database (stored as canonical JSON)
+ var filterData []byte
+ err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData)
+ if err != nil {
+ return nil, err
+ }
+
+ // Unmarshal JSON into Filter struct
+ var filter gomatrixserverlib.Filter
+ if err = json.Unmarshal(filterData, &filter); err != nil {
+ return nil, err
+ }
+ return &filter, nil
+}
+
+func (s *filterStatements) InsertFilter(
+ ctx context.Context, filter *gomatrixserverlib.Filter, localpart string,
+) (filterID string, err error) {
+ var existingFilterID string
+
+ // Serialise json
+ filterJSON, err := json.Marshal(filter)
+ if err != nil {
+ return "", err
+ }
+ // Remove whitespaces and sort JSON data
+ // needed to prevent from inserting the same filter multiple times
+ filterJSON, err = gomatrixserverlib.CanonicalJSON(filterJSON)
+ if err != nil {
+ return "", err
+ }
+
+ // Check if filter already exists in the database using its localpart and content
+ //
+ // This can result in a race condition when two clients try to insert the
+ // same filter and localpart at the same time, however this is not a
+ // problem as both calls will result in the same filterID
+ err = s.selectFilterIDByContentStmt.QueryRowContext(ctx,
+ localpart, filterJSON).Scan(&existingFilterID)
+ if err != nil && err != sql.ErrNoRows {
+ return "", err
+ }
+ // If it does, return the existing ID
+ if existingFilterID != "" {
+ return existingFilterID, err
+ }
+
+ // Otherwise insert the filter and return the new ID
+ err = s.insertFilterStmt.QueryRowContext(ctx, filterJSON, localpart).
+ Scan(&filterID)
+ return
+}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index f01b2eab..c7c4dc63 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -301,21 +301,21 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
roomID string, r types.Range, limit int,
chronologicalOrder bool, onlySyncEvents bool,
-) ([]types.StreamEvent, error) {
+) ([]types.StreamEvent, bool, error) {
var stmt *sql.Stmt
if onlySyncEvents {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
} else {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
}
- rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
+ rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1)
if err != nil {
- return nil, err
+ return nil, false, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
events, err := rowsToStreamEvents(rows)
if err != nil {
- return nil, err
+ return nil, false, err
}
if chronologicalOrder {
// The events need to be returned from oldest to latest, which isn't
@@ -325,7 +325,19 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return events[i].StreamPosition < events[j].StreamPosition
})
}
- return events, nil
+ // we queried for 1 more than the limit, so if we returned one more mark limited=true
+ limited := false
+ if len(events) > limit {
+ limited = true
+ // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
+ if chronologicalOrder {
+ events = events[1:]
+ } else {
+ events = events[:len(events)-1]
+ }
+ }
+
+ return events, limited, nil
}
// selectEarlyEvents returns the earliest events in the given room, starting
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 573586cc..10c1b37c 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -71,6 +71,10 @@ func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*S
if err != nil {
return nil, err
}
+ filter, err := NewPostgresFilterTable(d.db)
+ if err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
DB: d.db,
Invites: invites,
@@ -79,6 +83,7 @@ func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*S
Topology: topology,
CurrentRoomState: currState,
BackwardExtremities: backwardExtremities,
+ Filter: filter,
SendToDevice: sendToDevice,
SendToDeviceWriter: sqlutil.NewTransactionWriter(),
EDUCache: cache.New(),