aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-04-17 11:25:33 +0200
committerGitHub <noreply@github.com>2023-04-17 10:25:33 +0100
commit9fa39263c0a4a8d349c8715f6ba30cae30b1b73a (patch)
tree5ff7275881e4896dfc39859aacf213394b355aa4 /syncapi
parentf66862958d1fca18cedf65999bb86273e81ad28f (diff)
Add sync API db tests (#3043)
Co-authored-by: kegsay <kegan@matrix.org>
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go44
-rw-r--r--syncapi/storage/shared/storage_consumer_test.go103
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go57
-rw-r--r--syncapi/storage/tables/interface.go2
-rw-r--r--syncapi/storage/tables/output_room_events_test.go50
5 files changed, 160 insertions, 96 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 3900ac3a..8ee5098c 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -136,15 +136,6 @@ FROM room_ids,
) AS x
`
-const selectEarlyEventsSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
- " WHERE room_id = $1 AND id > $2 AND id <= $3" +
- " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
- " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
- " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
- " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
- " ORDER BY id ASC LIMIT $8"
-
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
@@ -206,7 +197,6 @@ type outputRoomEventsStatements struct {
selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectRecentEventsForSyncStmt *sql.Stmt
- selectEarlyEventsStmt *sql.Stmt
selectStateInRangeFilteredStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
@@ -262,7 +252,6 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.selectMaxEventIDStmt, selectMaxEventIDSQL},
{&s.selectRecentEventsStmt, selectRecentEventsSQL},
{&s.selectRecentEventsForSyncStmt, selectRecentEventsForSyncSQL},
- {&s.selectEarlyEventsStmt, selectEarlyEventsSQL},
{&s.selectStateInRangeFilteredStmt, selectStateInRangeFilteredSQL},
{&s.selectStateInRangeStmt, selectStateInRangeSQL},
{&s.updateEventJSONStmt, updateEventJSONSQL},
@@ -530,39 +519,6 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return result, rows.Err()
}
-// selectEarlyEvents returns the earliest events in the given room, starting
-// from a given position, up to a maximum of 'limit'.
-func (s *outputRoomEventsStatements) SelectEarlyEvents(
- ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, eventFilter *synctypes.RoomEventFilter,
-) ([]types.StreamEvent, error) {
- senders, notSenders := getSendersRoomEventFilter(eventFilter)
- stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt)
- rows, err := stmt.QueryContext(
- ctx, roomID, r.Low(), r.High(),
- pq.StringArray(senders),
- pq.StringArray(notSenders),
- pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
- pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)),
- eventFilter.Limit,
- )
- if err != nil {
- return nil, err
- }
- defer internal.CloseAndLogIfError(ctx, rows, "selectEarlyEvents: rows.close() failed")
- events, err := rowsToStreamEvents(rows)
- if err != nil {
- return nil, err
- }
- // The events need to be returned from oldest to latest, which isn't
- // necessarily the way the SQL query returns them, so a sort is necessary to
- // ensure the events are in the right order in the slice.
- sort.SliceStable(events, func(i int, j int) bool {
- return events[i].StreamPosition < events[j].StreamPosition
- })
- return events, nil
-}
-
// selectEvents returns the events for the given event IDs. If an event is
// missing from the database, it will be omitted.
func (s *outputRoomEventsStatements) SelectEvents(
diff --git a/syncapi/storage/shared/storage_consumer_test.go b/syncapi/storage/shared/storage_consumer_test.go
new file mode 100644
index 00000000..e5f734c9
--- /dev/null
+++ b/syncapi/storage/shared/storage_consumer_test.go
@@ -0,0 +1,103 @@
+package shared_test
+
+import (
+ "context"
+ "reflect"
+ "testing"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/synctypes"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/test"
+ "github.com/matrix-org/dendrite/test/testrig"
+)
+
+func newSyncDB(t *testing.T, dbType test.DBType) (storage.Database, func()) {
+ t.Helper()
+
+ cfg, processCtx, closeDB := testrig.CreateConfig(t, dbType)
+ cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
+ syncDB, err := storage.NewSyncServerDatasource(processCtx.Context(), cm, &cfg.SyncAPI.Database)
+ if err != nil {
+ t.Fatalf("failed to create sync DB: %s", err)
+ }
+
+ return syncDB, closeDB
+}
+
+func TestFilterTable(t *testing.T) {
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ tab, closeDB := newSyncDB(t, dbType)
+ defer closeDB()
+
+ // initially create a filter
+ filter := &synctypes.Filter{}
+ filterID, err := tab.PutFilter(context.Background(), "alice", filter)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // create the same filter again, we should receive the existing filter
+ secondFilterID, err := tab.PutFilter(context.Background(), "alice", filter)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if secondFilterID != filterID {
+ t.Fatalf("expected second filter to be the same as the first: %s vs %s", filterID, secondFilterID)
+ }
+
+ // query the filter again
+ targetFilter := &synctypes.Filter{}
+ if err = tab.GetFilter(context.Background(), targetFilter, "alice", filterID); err != nil {
+ t.Fatal(err)
+ }
+
+ if !reflect.DeepEqual(filter, targetFilter) {
+ t.Fatalf("%#v vs %#v", filter, targetFilter)
+ }
+
+ // query non-existent filter
+ if err = tab.GetFilter(context.Background(), targetFilter, "bob", filterID); err == nil {
+ t.Fatalf("expected filter to not exist, but it does exist: %v", targetFilter)
+ }
+ })
+}
+
+func TestIgnores(t *testing.T) {
+ alice := test.NewUser(t)
+ bob := test.NewUser(t)
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ syncDB, closeDB := newSyncDB(t, dbType)
+ defer closeDB()
+
+ tab, err := syncDB.NewDatabaseTransaction(context.Background())
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer tab.Rollback() // nolint: errcheck
+
+ ignoredUsers := &types.IgnoredUsers{List: map[string]interface{}{
+ bob.ID: "",
+ }}
+ if err = tab.UpdateIgnoresForUser(context.Background(), alice.ID, ignoredUsers); err != nil {
+ t.Fatal(err)
+ }
+
+ gotIgnoredUsers, err := tab.IgnoresForUser(context.Background(), alice.ID)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // verify the ignored users matches those we stored
+ if !reflect.DeepEqual(gotIgnoredUsers, ignoredUsers) {
+ t.Fatalf("%#v vs %#v", gotIgnoredUsers, ignoredUsers)
+ }
+
+ // Bob doesn't have any ignored users, so should receive sql.ErrNoRows
+ if _, err = tab.IgnoresForUser(context.Background(), bob.ID); err == nil {
+ t.Fatalf("expected an error but got none")
+ }
+ })
+}
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index 33ca687d..b5b6ea88 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -29,7 +29,6 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
-
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/internal/sqlutil"
@@ -82,12 +81,6 @@ const selectRecentEventsForSyncSQL = "" +
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
-const selectEarlyEventsSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
- " WHERE room_id = $1 AND id > $2 AND id <= $3"
-
-// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
-
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
@@ -119,7 +112,7 @@ const selectContextAfterEventSQL = "" +
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
-const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE type IN ($1) AND id > $2 LIMIT $3 ORDER BY id ASC"
+const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type IN ($2)"
const purgeEventsSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
@@ -430,42 +423,6 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
return result, nil
}
-func (s *outputRoomEventsStatements) SelectEarlyEvents(
- ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, eventFilter *synctypes.RoomEventFilter,
-) ([]types.StreamEvent, error) {
- stmt, params, err := prepareWithFilters(
- s.db, txn, selectEarlyEventsSQL,
- []interface{}{
- roomID, r.Low(), r.High(),
- },
- eventFilter.Senders, eventFilter.NotSenders,
- eventFilter.Types, eventFilter.NotTypes,
- nil, eventFilter.ContainsURL, eventFilter.Limit, FilterOrderAsc,
- )
- if err != nil {
- return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
- }
- defer internal.CloseAndLogIfError(ctx, stmt, "SelectEarlyEvents: stmt.close() failed")
-
- rows, err := stmt.QueryContext(ctx, params...)
- if err != nil {
- return nil, err
- }
- defer internal.CloseAndLogIfError(ctx, rows, "selectEarlyEvents: rows.close() failed")
- events, err := rowsToStreamEvents(rows)
- if err != nil {
- return nil, err
- }
- // The events need to be returned from oldest to latest, which isn't
- // necessarily the way the SQL query returns them, so a sort is necessary to
- // ensure the events are in the right order in the slice.
- sort.SliceStable(events, func(i int, j int) bool {
- return events[i].StreamPosition < events[j].StreamPosition
- })
- return events, nil
-}
-
// selectEvents returns the events for the given event IDs. If an event is
// missing from the database, it will be omitted.
func (s *outputRoomEventsStatements) SelectEvents(
@@ -686,18 +643,18 @@ func (s *outputRoomEventsStatements) PurgeEvents(
}
func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
- params := make([]interface{}, len(types))
+ params := make([]interface{}, len(types)+1)
+ params[0] = afterID
for i := range types {
- params[i] = types[i]
+ params[i+1] = types[i]
}
- params = append(params, afterID)
- params = append(params, limit)
- selectSQL := strings.Replace(selectSearchSQL, "($1)", sqlutil.QueryVariadic(len(types)), 1)
- stmt, err := s.db.Prepare(selectSQL)
+ selectSQL := strings.Replace(selectSearchSQL, "($2)", sqlutil.QueryVariadicOffset(len(types), 1), 1)
+ stmt, params, err := prepareWithFilters(s.db, txn, selectSQL, params, nil, nil, nil, nil, nil, nil, int(limit), FilterOrderAsc)
if err != nil {
return nil, err
}
+
defer internal.CloseAndLogIfError(ctx, stmt, "selectEvents: stmt.close() failed")
rows, err := sqlutil.TxStmt(txn, stmt).QueryContext(ctx, params...)
if err != nil {
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index b710e60a..94c9c402 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -68,8 +68,6 @@ type Events interface {
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomIDs []string, r types.Range, eventFilter *synctypes.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) (map[string]types.RecentEvents, error)
- // SelectEarlyEvents returns the earliest events in the given room.
- SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *synctypes.RoomEventFilter) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string, filter *synctypes.RoomEventFilter, preserveOrder bool) ([]types.StreamEvent, error)
UpdateEventJSON(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent) error
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
diff --git a/syncapi/storage/tables/output_room_events_test.go b/syncapi/storage/tables/output_room_events_test.go
index c0d45111..69587800 100644
--- a/syncapi/storage/tables/output_room_events_test.go
+++ b/syncapi/storage/tables/output_room_events_test.go
@@ -104,3 +104,53 @@ func TestOutputRoomEventsTable(t *testing.T) {
}
})
}
+
+func TestReindex(t *testing.T) {
+ ctx := context.Background()
+ alice := test.NewUser(t)
+ room := test.NewRoom(t, alice)
+
+ room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomName, map[string]interface{}{
+ "name": "my new room name",
+ }, test.WithStateKey(""))
+
+ room.CreateAndInsert(t, alice, gomatrixserverlib.MRoomTopic, map[string]interface{}{
+ "topic": "my new room topic",
+ }, test.WithStateKey(""))
+
+ room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{
+ "msgbody": "my room message",
+ "type": "m.text",
+ })
+
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ tab, db, close := newOutputRoomEventsTable(t, dbType)
+ defer close()
+ err := sqlutil.WithTransaction(db, func(txn *sql.Tx) error {
+ for _, ev := range room.Events() {
+ _, err := tab.InsertEvent(ctx, txn, ev, nil, nil, nil, false, gomatrixserverlib.HistoryVisibilityShared)
+ if err != nil {
+ return fmt.Errorf("failed to InsertEvent: %s", err)
+ }
+ }
+
+ return nil
+ })
+ if err != nil {
+ t.Fatalf("err: %s", err)
+ }
+
+ events, err := tab.ReIndex(ctx, nil, 10, 0, []string{
+ gomatrixserverlib.MRoomName,
+ gomatrixserverlib.MRoomTopic,
+ "m.room.message"})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ wantEventCount := 3
+ if len(events) != wantEventCount {
+ t.Fatalf("expected %d events, got %d", wantEventCount, len(events))
+ }
+ })
+}