aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-06-26 15:34:41 +0100
committerGitHub <noreply@github.com>2020-06-26 15:34:41 +0100
commit1ad7219e4b6c71f64e4d44db17a6a8d729e6198a (patch)
treec13db3fd184c0c9bd7d879793be7e5aba2066121 /syncapi/storage
parent164057a3be1e666d6fb68398d616da9a8a665a18 (diff)
Implement /sync `limited` and read timeline limit from stored filters (#1168)
* Move filter table to syncapi where it is used * Implement /sync `limited` and read timeline limit from stored filters We now fully handle `room.timeline.limit` filters (in-line + stored) and return the right value for `limited` syncs. * Update whitelist * Default to the default timeline limit if it's unset, also strip the extra event correctly * Update whitelist
Diffstat (limited to 'syncapi/storage')
-rw-r--r--syncapi/storage/interface.go8
-rw-r--r--syncapi/storage/postgres/filter_table.go129
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go22
-rw-r--r--syncapi/storage/postgres/syncserver.go5
-rw-r--r--syncapi/storage/shared/syncserver.go24
-rw-r--r--syncapi/storage/sqlite3/filter_table.go137
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go21
-rw-r--r--syncapi/storage/sqlite3/syncserver.go5
-rw-r--r--syncapi/storage/tables/interface.go9
9 files changed, 343 insertions, 17 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index c693326b..c4dae4d0 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -128,4 +128,12 @@ type Database interface {
CleanSendToDeviceUpdates(ctx context.Context, toUpdate, toDelete []types.SendToDeviceNID, token types.StreamingToken) (err error)
// SendToDeviceUpdatesWaiting returns true if there are send-to-device updates waiting to be sent.
SendToDeviceUpdatesWaiting(ctx context.Context, userID, deviceID string) (bool, error)
+ // GetFilter looks up the filter associated with a given local user and filter ID.
+ // Returns a filter structure. Otherwise returns an error if no such filter exists
+ // or if there was an error talking to the database.
+ GetFilter(ctx context.Context, localpart string, filterID string) (*gomatrixserverlib.Filter, error)
+ // PutFilter puts the passed filter into the database.
+ // Returns the filterID as a string. Otherwise returns an error if something
+ // goes wrong.
+ PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error)
}
diff --git a/syncapi/storage/postgres/filter_table.go b/syncapi/storage/postgres/filter_table.go
new file mode 100644
index 00000000..beeb864b
--- /dev/null
+++ b/syncapi/storage/postgres/filter_table.go
@@ -0,0 +1,129 @@
+// Copyright 2017 Jan Christian Grünhage
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const filterSchema = `
+-- Stores data about filters
+CREATE TABLE IF NOT EXISTS syncapi_filter (
+ -- The filter
+ filter TEXT NOT NULL,
+ -- The ID
+ id SERIAL UNIQUE,
+ -- The localpart of the Matrix user ID associated to this filter
+ localpart TEXT NOT NULL,
+
+ PRIMARY KEY(id, localpart)
+);
+
+CREATE INDEX IF NOT EXISTS syncapi_filter_localpart ON syncapi_filter(localpart);
+`
+
+const selectFilterSQL = "" +
+ "SELECT filter FROM syncapi_filter WHERE localpart = $1 AND id = $2"
+
+const selectFilterIDByContentSQL = "" +
+ "SELECT id FROM syncapi_filter WHERE localpart = $1 AND filter = $2"
+
+const insertFilterSQL = "" +
+ "INSERT INTO syncapi_filter (filter, id, localpart) VALUES ($1, DEFAULT, $2) RETURNING id"
+
+type filterStatements struct {
+ selectFilterStmt *sql.Stmt
+ selectFilterIDByContentStmt *sql.Stmt
+ insertFilterStmt *sql.Stmt
+}
+
+func NewPostgresFilterTable(db *sql.DB) (tables.Filter, error) {
+ _, err := db.Exec(filterSchema)
+ if err != nil {
+ return nil, err
+ }
+ s := &filterStatements{}
+ if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil {
+ return nil, err
+ }
+ if s.selectFilterIDByContentStmt, err = db.Prepare(selectFilterIDByContentSQL); err != nil {
+ return nil, err
+ }
+ if s.insertFilterStmt, err = db.Prepare(insertFilterSQL); err != nil {
+ return nil, err
+ }
+ return s, nil
+}
+
+func (s *filterStatements) SelectFilter(
+ ctx context.Context, localpart string, filterID string,
+) (*gomatrixserverlib.Filter, error) {
+ // Retrieve filter from database (stored as canonical JSON)
+ var filterData []byte
+ err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData)
+ if err != nil {
+ return nil, err
+ }
+
+ // Unmarshal JSON into Filter struct
+ var filter gomatrixserverlib.Filter
+ if err = json.Unmarshal(filterData, &filter); err != nil {
+ return nil, err
+ }
+ return &filter, nil
+}
+
+func (s *filterStatements) InsertFilter(
+ ctx context.Context, filter *gomatrixserverlib.Filter, localpart string,
+) (filterID string, err error) {
+ var existingFilterID string
+
+ // Serialise json
+ filterJSON, err := json.Marshal(filter)
+ if err != nil {
+ return "", err
+ }
+ // Remove whitespaces and sort JSON data
+ // needed to prevent from inserting the same filter multiple times
+ filterJSON, err = gomatrixserverlib.CanonicalJSON(filterJSON)
+ if err != nil {
+ return "", err
+ }
+
+ // Check if filter already exists in the database using its localpart and content
+ //
+ // This can result in a race condition when two clients try to insert the
+ // same filter and localpart at the same time, however this is not a
+ // problem as both calls will result in the same filterID
+ err = s.selectFilterIDByContentStmt.QueryRowContext(ctx,
+ localpart, filterJSON).Scan(&existingFilterID)
+ if err != nil && err != sql.ErrNoRows {
+ return "", err
+ }
+ // If it does, return the existing ID
+ if existingFilterID != "" {
+ return existingFilterID, err
+ }
+
+ // Otherwise insert the filter and return the new ID
+ err = s.insertFilterStmt.QueryRowContext(ctx, filterJSON, localpart).
+ Scan(&filterID)
+ return
+}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index f01b2eab..c7c4dc63 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -301,21 +301,21 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
roomID string, r types.Range, limit int,
chronologicalOrder bool, onlySyncEvents bool,
-) ([]types.StreamEvent, error) {
+) ([]types.StreamEvent, bool, error) {
var stmt *sql.Stmt
if onlySyncEvents {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
} else {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
}
- rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
+ rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1)
if err != nil {
- return nil, err
+ return nil, false, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
events, err := rowsToStreamEvents(rows)
if err != nil {
- return nil, err
+ return nil, false, err
}
if chronologicalOrder {
// The events need to be returned from oldest to latest, which isn't
@@ -325,7 +325,19 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return events[i].StreamPosition < events[j].StreamPosition
})
}
- return events, nil
+ // we queried for 1 more than the limit, so if we returned one more mark limited=true
+ limited := false
+ if len(events) > limit {
+ limited = true
+ // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
+ if chronologicalOrder {
+ events = events[1:]
+ } else {
+ events = events[:len(events)-1]
+ }
+ }
+
+ return events, limited, nil
}
// selectEarlyEvents returns the earliest events in the given room, starting
diff --git a/syncapi/storage/postgres/syncserver.go b/syncapi/storage/postgres/syncserver.go
index 573586cc..10c1b37c 100644
--- a/syncapi/storage/postgres/syncserver.go
+++ b/syncapi/storage/postgres/syncserver.go
@@ -71,6 +71,10 @@ func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*S
if err != nil {
return nil, err
}
+ filter, err := NewPostgresFilterTable(d.db)
+ if err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
DB: d.db,
Invites: invites,
@@ -79,6 +83,7 @@ func NewDatabase(dbDataSourceName string, dbProperties sqlutil.DbProperties) (*S
Topology: topology,
CurrentRoomState: currState,
BackwardExtremities: backwardExtremities,
+ Filter: filter,
SendToDevice: sendToDevice,
SendToDeviceWriter: sqlutil.NewTransactionWriter(),
EDUCache: cache.New(),
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index f84dc341..01362ddd 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -43,6 +43,7 @@ type Database struct {
CurrentRoomState tables.CurrentRoomState
BackwardExtremities tables.BackwardsExtremities
SendToDevice tables.SendToDevice
+ Filter tables.Filter
SendToDeviceWriter *sqlutil.TransactionWriter
EDUCache *cache.EDUCache
}
@@ -78,7 +79,7 @@ func (d *Database) GetEventsInStreamingRange(
}
if backwardOrdering {
// When using backward ordering, we want the most recent events first.
- if events, err = d.OutputEvents.SelectRecentEvents(
+ if events, _, err = d.OutputEvents.SelectRecentEvents(
ctx, nil, roomID, r, limit, false, false,
); err != nil {
return
@@ -545,6 +546,18 @@ func (d *Database) addEDUDeltaToResponse(
return
}
+func (d *Database) GetFilter(
+ ctx context.Context, localpart string, filterID string,
+) (*gomatrixserverlib.Filter, error) {
+ return d.Filter.SelectFilter(ctx, localpart, filterID)
+}
+
+func (d *Database) PutFilter(
+ ctx context.Context, localpart string, filter *gomatrixserverlib.Filter,
+) (string, error) {
+ return d.Filter.InsertFilter(ctx, filter, localpart)
+}
+
func (d *Database) IncrementalSync(
ctx context.Context, res *types.Response,
device userapi.Device,
@@ -642,7 +655,8 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
- recentStreamEvents, err = d.OutputEvents.SelectRecentEvents(
+ var limited bool
+ recentStreamEvents, limited, err = d.OutputEvents.SelectRecentEvents(
ctx, txn, roomID, r, numRecentEventsPerRoom, true, true,
)
if err != nil {
@@ -670,7 +684,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = prevBatchStr
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = true
+ jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
@@ -776,7 +790,7 @@ func (d *Database) addRoomDeltaToResponse(
// This is all "okay" assuming history_visibility == "shared" which it is by default.
r.To = delta.membershipPos
}
- recentStreamEvents, err := d.OutputEvents.SelectRecentEvents(
+ recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents(
ctx, txn, delta.roomID, r,
numRecentEventsPerRoom, true, true,
)
@@ -796,7 +810,7 @@ func (d *Database) addRoomDeltaToResponse(
jr.Timeline.PrevBatch = prevBatch.String()
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
- jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+ jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case gomatrixserverlib.Leave:
diff --git a/syncapi/storage/sqlite3/filter_table.go b/syncapi/storage/sqlite3/filter_table.go
new file mode 100644
index 00000000..8b26759d
--- /dev/null
+++ b/syncapi/storage/sqlite3/filter_table.go
@@ -0,0 +1,137 @@
+// Copyright 2017 Jan Christian Grünhage
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "fmt"
+
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const filterSchema = `
+-- Stores data about filters
+CREATE TABLE IF NOT EXISTS syncapi_filter (
+ -- The filter
+ filter TEXT NOT NULL,
+ -- The ID
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ -- The localpart of the Matrix user ID associated to this filter
+ localpart TEXT NOT NULL,
+
+ UNIQUE (id, localpart)
+);
+
+CREATE INDEX IF NOT EXISTS syncapi_filter_localpart ON syncapi_filter(localpart);
+`
+
+const selectFilterSQL = "" +
+ "SELECT filter FROM syncapi_filter WHERE localpart = $1 AND id = $2"
+
+const selectFilterIDByContentSQL = "" +
+ "SELECT id FROM syncapi_filter WHERE localpart = $1 AND filter = $2"
+
+const insertFilterSQL = "" +
+ "INSERT INTO syncapi_filter (filter, localpart) VALUES ($1, $2)"
+
+type filterStatements struct {
+ selectFilterStmt *sql.Stmt
+ selectFilterIDByContentStmt *sql.Stmt
+ insertFilterStmt *sql.Stmt
+}
+
+func NewSqliteFilterTable(db *sql.DB) (tables.Filter, error) {
+ _, err := db.Exec(filterSchema)
+ if err != nil {
+ return nil, err
+ }
+ s := &filterStatements{}
+ if s.selectFilterStmt, err = db.Prepare(selectFilterSQL); err != nil {
+ return nil, err
+ }
+ if s.selectFilterIDByContentStmt, err = db.Prepare(selectFilterIDByContentSQL); err != nil {
+ return nil, err
+ }
+ if s.insertFilterStmt, err = db.Prepare(insertFilterSQL); err != nil {
+ return nil, err
+ }
+ return s, nil
+}
+
+func (s *filterStatements) SelectFilter(
+ ctx context.Context, localpart string, filterID string,
+) (*gomatrixserverlib.Filter, error) {
+ // Retrieve filter from database (stored as canonical JSON)
+ var filterData []byte
+ err := s.selectFilterStmt.QueryRowContext(ctx, localpart, filterID).Scan(&filterData)
+ if err != nil {
+ return nil, err
+ }
+
+ // Unmarshal JSON into Filter struct
+ var filter gomatrixserverlib.Filter
+ if err = json.Unmarshal(filterData, &filter); err != nil {
+ return nil, err
+ }
+ return &filter, nil
+}
+
+func (s *filterStatements) InsertFilter(
+ ctx context.Context, filter *gomatrixserverlib.Filter, localpart string,
+) (filterID string, err error) {
+ var existingFilterID string
+
+ // Serialise json
+ filterJSON, err := json.Marshal(filter)
+ if err != nil {
+ return "", err
+ }
+ // Remove whitespaces and sort JSON data
+ // needed to prevent from inserting the same filter multiple times
+ filterJSON, err = gomatrixserverlib.CanonicalJSON(filterJSON)
+ if err != nil {
+ return "", err
+ }
+
+ // Check if filter already exists in the database using its localpart and content
+ //
+ // This can result in a race condition when two clients try to insert the
+ // same filter and localpart at the same time, however this is not a
+ // problem as both calls will result in the same filterID
+ err = s.selectFilterIDByContentStmt.QueryRowContext(ctx,
+ localpart, filterJSON).Scan(&existingFilterID)
+ if err != nil && err != sql.ErrNoRows {
+ return "", err
+ }
+ // If it does, return the existing ID
+ if existingFilterID != "" {
+ return existingFilterID, err
+ }
+
+ // Otherwise insert the filter and return the new ID
+ res, err := s.insertFilterStmt.ExecContext(ctx, filterJSON, localpart)
+ if err != nil {
+ return "", err
+ }
+ rowid, err := res.LastInsertId()
+ if err != nil {
+ return "", err
+ }
+ filterID = fmt.Sprintf("%d", rowid)
+ return
+}
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index 367ab3c9..0c909cc4 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -311,7 +311,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
roomID string, r types.Range, limit int,
chronologicalOrder bool, onlySyncEvents bool,
-) ([]types.StreamEvent, error) {
+) ([]types.StreamEvent, bool, error) {
var stmt *sql.Stmt
if onlySyncEvents {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
@@ -319,14 +319,14 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
}
- rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
+ rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1)
if err != nil {
- return nil, err
+ return nil, false, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
events, err := rowsToStreamEvents(rows)
if err != nil {
- return nil, err
+ return nil, false, err
}
if chronologicalOrder {
// The events need to be returned from oldest to latest, which isn't
@@ -336,7 +336,18 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return events[i].StreamPosition < events[j].StreamPosition
})
}
- return events, nil
+ // we queried for 1 more than the limit, so if we returned one more mark limited=true
+ limited := false
+ if len(events) > limit {
+ limited = true
+ // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
+ if chronologicalOrder {
+ events = events[1:]
+ } else {
+ events = events[:len(events)-1]
+ }
+ }
+ return events, limited, nil
}
func (s *outputRoomEventsStatements) SelectEarlyEvents(
diff --git a/syncapi/storage/sqlite3/syncserver.go b/syncapi/storage/sqlite3/syncserver.go
index 51cdbe32..c85db5a4 100644
--- a/syncapi/storage/sqlite3/syncserver.go
+++ b/syncapi/storage/sqlite3/syncserver.go
@@ -87,6 +87,10 @@ func (d *SyncServerDatasource) prepare() (err error) {
if err != nil {
return err
}
+ filter, err := NewSqliteFilterTable(d.db)
+ if err != nil {
+ return err
+ }
d.Database = shared.Database{
DB: d.db,
Invites: invites,
@@ -95,6 +99,7 @@ func (d *SyncServerDatasource) prepare() (err error) {
BackwardExtremities: bwExtrem,
CurrentRoomState: roomState,
Topology: topology,
+ Filter: filter,
SendToDevice: sendToDevice,
SendToDeviceWriter: sqlutil.NewTransactionWriter(),
EDUCache: cache.New(),
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 246dc695..4ac0be4e 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -44,8 +44,8 @@ type Events interface {
InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error)
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
- // Returns up to `limit` events.
- SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error)
+ // Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
+ SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
// SelectEarlyEvents returns the earliest events in the given room.
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
@@ -133,3 +133,8 @@ type SendToDevice interface {
DeleteSendToDeviceMessages(ctx context.Context, txn *sql.Tx, nids []types.SendToDeviceNID) (err error)
CountSendToDeviceMessages(ctx context.Context, txn *sql.Tx, userID, deviceID string) (count int, err error)
}
+
+type Filter interface {
+ SelectFilter(ctx context.Context, localpart string, filterID string) (*gomatrixserverlib.Filter, error)
+ InsertFilter(ctx context.Context, filter *gomatrixserverlib.Filter, localpart string) (filterID string, err error)
+}