aboutsummaryrefslogtreecommitdiff
path: root/keyserver
diff options
context:
space:
mode:
Diffstat (limited to 'keyserver')
-rw-r--r--keyserver/internal/device_list_update.go29
-rw-r--r--keyserver/internal/device_list_update_test.go86
-rw-r--r--keyserver/keyserver.go12
-rw-r--r--keyserver/keyserver_test.go29
-rw-r--r--keyserver/storage/interface.go5
-rw-r--r--keyserver/storage/postgres/stale_device_lists.go33
-rw-r--r--keyserver/storage/shared/storage.go10
-rw-r--r--keyserver/storage/sqlite3/stale_device_lists.go44
-rw-r--r--keyserver/storage/tables/interface.go1
-rw-r--r--keyserver/storage/tables/stale_device_lists_test.go94
10 files changed, 317 insertions, 26 deletions
diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go
index 8ff9dfc3..c7bf8da5 100644
--- a/keyserver/internal/device_list_update.go
+++ b/keyserver/internal/device_list_update.go
@@ -24,6 +24,8 @@ import (
"sync"
"time"
+ rsapi "github.com/matrix-org/dendrite/roomserver/api"
+
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@@ -102,6 +104,7 @@ type DeviceListUpdater struct {
// block on or timeout via a select.
userIDToChan map[string]chan bool
userIDToChanMu *sync.Mutex
+ rsAPI rsapi.KeyserverRoomserverAPI
}
// DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater.
@@ -124,6 +127,8 @@ type DeviceListUpdaterDatabase interface {
// DeviceKeysJSON populates the KeyJSON for the given keys. If any proided `keys` have a `KeyJSON` or `StreamID` already then it will be replaced.
DeviceKeysJSON(ctx context.Context, keys []api.DeviceMessage) error
+
+ DeleteStaleDeviceLists(ctx context.Context, userIDs []string) error
}
type DeviceListUpdaterAPI interface {
@@ -140,7 +145,7 @@ func NewDeviceListUpdater(
process *process.ProcessContext, db DeviceListUpdaterDatabase,
api DeviceListUpdaterAPI, producer KeyChangeProducer,
fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
- thisServer gomatrixserverlib.ServerName,
+ rsAPI rsapi.KeyserverRoomserverAPI, thisServer gomatrixserverlib.ServerName,
) *DeviceListUpdater {
return &DeviceListUpdater{
process: process,
@@ -154,6 +159,7 @@ func NewDeviceListUpdater(
workerChans: make([]chan gomatrixserverlib.ServerName, numWorkers),
userIDToChan: make(map[string]chan bool),
userIDToChanMu: &sync.Mutex{},
+ rsAPI: rsAPI,
}
}
@@ -168,7 +174,7 @@ func (u *DeviceListUpdater) Start() error {
go u.worker(ch)
}
- staleLists, err := u.db.StaleDeviceLists(context.Background(), []gomatrixserverlib.ServerName{})
+ staleLists, err := u.db.StaleDeviceLists(u.process.Context(), []gomatrixserverlib.ServerName{})
if err != nil {
return err
}
@@ -186,6 +192,25 @@ func (u *DeviceListUpdater) Start() error {
return nil
}
+// CleanUp removes stale device entries for users we don't share a room with anymore
+func (u *DeviceListUpdater) CleanUp() error {
+ staleUsers, err := u.db.StaleDeviceLists(u.process.Context(), []gomatrixserverlib.ServerName{})
+ if err != nil {
+ return err
+ }
+
+ res := rsapi.QueryLeftUsersResponse{}
+ if err = u.rsAPI.QueryLeftUsers(u.process.Context(), &rsapi.QueryLeftUsersRequest{StaleDeviceListUsers: staleUsers}, &res); err != nil {
+ return err
+ }
+
+ if len(res.LeftUsers) == 0 {
+ return nil
+ }
+ logrus.Debugf("Deleting %d stale device list entries", len(res.LeftUsers))
+ return u.db.DeleteStaleDeviceLists(u.process.Context(), res.LeftUsers)
+}
+
func (u *DeviceListUpdater) mutex(userID string) *sync.Mutex {
u.mu.Lock()
defer u.mu.Unlock()
diff --git a/keyserver/internal/device_list_update_test.go b/keyserver/internal/device_list_update_test.go
index a374c951..60a2c2f3 100644
--- a/keyserver/internal/device_list_update_test.go
+++ b/keyserver/internal/device_list_update_test.go
@@ -30,7 +30,12 @@ import (
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/keyserver/storage"
+ roomserver "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/test"
+ "github.com/matrix-org/dendrite/test/testrig"
)
var (
@@ -53,6 +58,10 @@ type mockDeviceListUpdaterDatabase struct {
mu sync.Mutex // protect staleUsers
}
+func (d *mockDeviceListUpdaterDatabase) DeleteStaleDeviceLists(ctx context.Context, userIDs []string) error {
+ return nil
+}
+
// StaleDeviceLists returns a list of user IDs ending with the domains provided who have stale device lists.
// If no domains are given, all user IDs with stale device lists are returned.
func (d *mockDeviceListUpdaterDatabase) StaleDeviceLists(ctx context.Context, domains []gomatrixserverlib.ServerName) ([]string, error) {
@@ -153,7 +162,7 @@ func TestUpdateHavePrevID(t *testing.T) {
}
ap := &mockDeviceListUpdaterAPI{}
producer := &mockKeyChangeProducer{}
- updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, "localhost")
+ updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost")
event := gomatrixserverlib.DeviceListUpdateEvent{
DeviceDisplayName: "Foo Bar",
Deleted: false,
@@ -225,7 +234,7 @@ func TestUpdateNoPrevID(t *testing.T) {
`)),
}, nil
})
- updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, "example.test")
+ updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test")
if err := updater.Start(); err != nil {
t.Fatalf("failed to start updater: %s", err)
}
@@ -239,6 +248,7 @@ func TestUpdateNoPrevID(t *testing.T) {
UserID: remoteUserID,
}
err := updater.Update(ctx, event)
+
if err != nil {
t.Fatalf("Update returned an error: %s", err)
}
@@ -294,7 +304,7 @@ func TestDebounce(t *testing.T) {
close(incomingFedReq)
return <-fedCh, nil
})
- updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, "localhost")
+ updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost")
if err := updater.Start(); err != nil {
t.Fatalf("failed to start updater: %s", err)
}
@@ -349,3 +359,73 @@ func TestDebounce(t *testing.T) {
t.Errorf("user %s is marked as stale", userID)
}
}
+
+func mustCreateKeyserverDB(t *testing.T, dbType test.DBType) (storage.Database, func()) {
+ t.Helper()
+
+ base, _, _ := testrig.Base(nil)
+ connStr, clearDB := test.PrepareDBConnectionString(t, dbType)
+ db, err := storage.NewDatabase(base, &config.DatabaseOptions{ConnectionString: config.DataSource(connStr)})
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ return db, clearDB
+}
+
+type mockKeyserverRoomserverAPI struct {
+ leftUsers []string
+}
+
+func (m *mockKeyserverRoomserverAPI) QueryLeftUsers(ctx context.Context, req *roomserver.QueryLeftUsersRequest, res *roomserver.QueryLeftUsersResponse) error {
+ res.LeftUsers = m.leftUsers
+ return nil
+}
+
+func TestDeviceListUpdater_CleanUp(t *testing.T) {
+ processCtx := process.NewProcessContext()
+
+ alice := test.NewUser(t)
+ bob := test.NewUser(t)
+
+ // Bob is not joined to any of our rooms
+ rsAPI := &mockKeyserverRoomserverAPI{leftUsers: []string{bob.ID}}
+
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ db, clearDB := mustCreateKeyserverDB(t, dbType)
+ defer clearDB()
+
+ // This should not get deleted
+ if err := db.MarkDeviceListStale(processCtx.Context(), alice.ID, true); err != nil {
+ t.Error(err)
+ }
+
+ // this one should get deleted
+ if err := db.MarkDeviceListStale(processCtx.Context(), bob.ID, true); err != nil {
+ t.Error(err)
+ }
+
+ updater := NewDeviceListUpdater(processCtx, db, nil,
+ nil, nil,
+ 0, rsAPI, "test")
+ if err := updater.CleanUp(); err != nil {
+ t.Error(err)
+ }
+
+ // check that we still have Alice in our stale list
+ staleUsers, err := db.StaleDeviceLists(ctx, []gomatrixserverlib.ServerName{"test"})
+ if err != nil {
+ t.Error(err)
+ }
+
+ // There should only be Alice
+ wantCount := 1
+ if count := len(staleUsers); count != wantCount {
+ t.Fatalf("expected there to be %d stale device lists, got %d", wantCount, count)
+ }
+
+ if staleUsers[0] != alice.ID {
+ t.Fatalf("unexpected stale device list user: %s, want %s", staleUsers[0], alice.ID)
+ }
+ })
+}
diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go
index 5360c06f..27557677 100644
--- a/keyserver/keyserver.go
+++ b/keyserver/keyserver.go
@@ -18,6 +18,8 @@ import (
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
+ rsapi "github.com/matrix-org/dendrite/roomserver/api"
+
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/consumers"
@@ -40,6 +42,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI, enableMetr
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.KeyserverFederationAPI,
+ rsAPI rsapi.KeyserverRoomserverAPI,
) api.KeyInternalAPI {
js, _ := base.NATS.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
@@ -47,6 +50,7 @@ func NewInternalAPI(
if err != nil {
logrus.WithError(err).Panicf("failed to connect to key server database")
}
+
keyChangeProducer := &producers.KeyChange{
Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent)),
JetStream: js,
@@ -58,8 +62,14 @@ func NewInternalAPI(
FedClient: fedClient,
Producer: keyChangeProducer,
}
- updater := internal.NewDeviceListUpdater(base.ProcessContext, db, ap, keyChangeProducer, fedClient, 8, cfg.Matrix.ServerName) // 8 workers TODO: configurable
+ updater := internal.NewDeviceListUpdater(base.ProcessContext, db, ap, keyChangeProducer, fedClient, 8, rsAPI, cfg.Matrix.ServerName) // 8 workers TODO: configurable
ap.Updater = updater
+
+ // Remove users which we don't share a room with anymore
+ if err := updater.CleanUp(); err != nil {
+ logrus.WithError(err).Error("failed to cleanup stale device lists")
+ }
+
go func() {
if err := updater.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start device list updater")
diff --git a/keyserver/keyserver_test.go b/keyserver/keyserver_test.go
new file mode 100644
index 00000000..159b280f
--- /dev/null
+++ b/keyserver/keyserver_test.go
@@ -0,0 +1,29 @@
+package keyserver
+
+import (
+ "context"
+ "testing"
+
+ roomserver "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/test"
+ "github.com/matrix-org/dendrite/test/testrig"
+)
+
+type mockKeyserverRoomserverAPI struct {
+ leftUsers []string
+}
+
+func (m *mockKeyserverRoomserverAPI) QueryLeftUsers(ctx context.Context, req *roomserver.QueryLeftUsersRequest, res *roomserver.QueryLeftUsersResponse) error {
+ res.LeftUsers = m.leftUsers
+ return nil
+}
+
+// Merely tests that we can create an internal keyserver API
+func Test_NewInternalAPI(t *testing.T) {
+ rsAPI := &mockKeyserverRoomserverAPI{}
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ base, closeBase := testrig.CreateBaseDendrite(t, dbType)
+ defer closeBase()
+ _ = NewInternalAPI(base, &base.Cfg.KeyServer, nil, rsAPI)
+ })
+}
diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go
index 242e16a0..c6a8f44c 100644
--- a/keyserver/storage/interface.go
+++ b/keyserver/storage/interface.go
@@ -85,4 +85,9 @@ type Database interface {
StoreCrossSigningKeysForUser(ctx context.Context, userID string, keyMap types.CrossSigningKeyMap) error
StoreCrossSigningSigsForTarget(ctx context.Context, originUserID string, originKeyID gomatrixserverlib.KeyID, targetUserID string, targetKeyID gomatrixserverlib.KeyID, signature gomatrixserverlib.Base64Bytes) error
+
+ DeleteStaleDeviceLists(
+ ctx context.Context,
+ userIDs []string,
+ ) error
}
diff --git a/keyserver/storage/postgres/stale_device_lists.go b/keyserver/storage/postgres/stale_device_lists.go
index d0fe50d0..248ddfb4 100644
--- a/keyserver/storage/postgres/stale_device_lists.go
+++ b/keyserver/storage/postgres/stale_device_lists.go
@@ -19,6 +19,10 @@ import (
"database/sql"
"time"
+ "github.com/lib/pq"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
"github.com/matrix-org/gomatrixserverlib"
@@ -48,10 +52,14 @@ const selectStaleDeviceListsWithDomainsSQL = "" +
const selectStaleDeviceListsSQL = "" +
"SELECT user_id FROM keyserver_stale_device_lists WHERE is_stale = $1 ORDER BY ts_added_secs DESC"
+const deleteStaleDevicesSQL = "" +
+ "DELETE FROM keyserver_stale_device_lists WHERE user_id = ANY($1)"
+
type staleDeviceListsStatements struct {
upsertStaleDeviceListStmt *sql.Stmt
selectStaleDeviceListsWithDomainsStmt *sql.Stmt
selectStaleDeviceListsStmt *sql.Stmt
+ deleteStaleDeviceListsStmt *sql.Stmt
}
func NewPostgresStaleDeviceListsTable(db *sql.DB) (tables.StaleDeviceLists, error) {
@@ -60,16 +68,12 @@ func NewPostgresStaleDeviceListsTable(db *sql.DB) (tables.StaleDeviceLists, erro
if err != nil {
return nil, err
}
- if s.upsertStaleDeviceListStmt, err = db.Prepare(upsertStaleDeviceListSQL); err != nil {
- return nil, err
- }
- if s.selectStaleDeviceListsStmt, err = db.Prepare(selectStaleDeviceListsSQL); err != nil {
- return nil, err
- }
- if s.selectStaleDeviceListsWithDomainsStmt, err = db.Prepare(selectStaleDeviceListsWithDomainsSQL); err != nil {
- return nil, err
- }
- return s, nil
+ return s, sqlutil.StatementList{
+ {&s.upsertStaleDeviceListStmt, upsertStaleDeviceListSQL},
+ {&s.selectStaleDeviceListsStmt, selectStaleDeviceListsSQL},
+ {&s.selectStaleDeviceListsWithDomainsStmt, selectStaleDeviceListsWithDomainsSQL},
+ {&s.deleteStaleDeviceListsStmt, deleteStaleDevicesSQL},
+ }.Prepare(db)
}
func (s *staleDeviceListsStatements) InsertStaleDeviceList(ctx context.Context, userID string, isStale bool) error {
@@ -105,6 +109,15 @@ func (s *staleDeviceListsStatements) SelectUserIDsWithStaleDeviceLists(ctx conte
return result, nil
}
+// DeleteStaleDeviceLists removes users from stale device lists
+func (s *staleDeviceListsStatements) DeleteStaleDeviceLists(
+ ctx context.Context, txn *sql.Tx, userIDs []string,
+) error {
+ stmt := sqlutil.TxStmt(txn, s.deleteStaleDeviceListsStmt)
+ _, err := stmt.ExecContext(ctx, pq.Array(userIDs))
+ return err
+}
+
func rowsToUserIDs(ctx context.Context, rows *sql.Rows) (result []string, err error) {
defer internal.CloseAndLogIfError(ctx, rows, "closing rowsToUserIDs failed")
for rows.Next() {
diff --git a/keyserver/storage/shared/storage.go b/keyserver/storage/shared/storage.go
index 5beeed0f..54dd6ddc 100644
--- a/keyserver/storage/shared/storage.go
+++ b/keyserver/storage/shared/storage.go
@@ -249,3 +249,13 @@ func (d *Database) StoreCrossSigningSigsForTarget(
return nil
})
}
+
+// DeleteStaleDeviceLists deletes stale device list entries for users we don't share a room with anymore.
+func (d *Database) DeleteStaleDeviceLists(
+ ctx context.Context,
+ userIDs []string,
+) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.StaleDeviceListsTable.DeleteStaleDeviceLists(ctx, txn, userIDs)
+ })
+}
diff --git a/keyserver/storage/sqlite3/stale_device_lists.go b/keyserver/storage/sqlite3/stale_device_lists.go
index 1e08b266..fd76a6e3 100644
--- a/keyserver/storage/sqlite3/stale_device_lists.go
+++ b/keyserver/storage/sqlite3/stale_device_lists.go
@@ -17,8 +17,11 @@ package sqlite3
import (
"context"
"database/sql"
+ "strings"
"time"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
"github.com/matrix-org/gomatrixserverlib"
@@ -48,11 +51,15 @@ const selectStaleDeviceListsWithDomainsSQL = "" +
const selectStaleDeviceListsSQL = "" +
"SELECT user_id FROM keyserver_stale_device_lists WHERE is_stale = $1 ORDER BY ts_added_secs DESC"
+const deleteStaleDevicesSQL = "" +
+ "DELETE FROM keyserver_stale_device_lists WHERE user_id IN ($1)"
+
type staleDeviceListsStatements struct {
db *sql.DB
upsertStaleDeviceListStmt *sql.Stmt
selectStaleDeviceListsWithDomainsStmt *sql.Stmt
selectStaleDeviceListsStmt *sql.Stmt
+ // deleteStaleDeviceListsStmt *sql.Stmt // Prepared at runtime
}
func NewSqliteStaleDeviceListsTable(db *sql.DB) (tables.StaleDeviceLists, error) {
@@ -63,16 +70,12 @@ func NewSqliteStaleDeviceListsTable(db *sql.DB) (tables.StaleDeviceLists, error)
if err != nil {
return nil, err
}
- if s.upsertStaleDeviceListStmt, err = db.Prepare(upsertStaleDeviceListSQL); err != nil {
- return nil, err
- }
- if s.selectStaleDeviceListsStmt, err = db.Prepare(selectStaleDeviceListsSQL); err != nil {
- return nil, err
- }
- if s.selectStaleDeviceListsWithDomainsStmt, err = db.Prepare(selectStaleDeviceListsWithDomainsSQL); err != nil {
- return nil, err
- }
- return s, nil
+ return s, sqlutil.StatementList{
+ {&s.upsertStaleDeviceListStmt, upsertStaleDeviceListSQL},
+ {&s.selectStaleDeviceListsStmt, selectStaleDeviceListsSQL},
+ {&s.selectStaleDeviceListsWithDomainsStmt, selectStaleDeviceListsWithDomainsSQL},
+ // { &s.deleteStaleDeviceListsStmt, deleteStaleDevicesSQL}, // Prepared at runtime
+ }.Prepare(db)
}
func (s *staleDeviceListsStatements) InsertStaleDeviceList(ctx context.Context, userID string, isStale bool) error {
@@ -108,6 +111,27 @@ func (s *staleDeviceListsStatements) SelectUserIDsWithStaleDeviceLists(ctx conte
return result, nil
}
+// DeleteStaleDeviceLists removes users from stale device lists
+func (s *staleDeviceListsStatements) DeleteStaleDeviceLists(
+ ctx context.Context, txn *sql.Tx, userIDs []string,
+) error {
+ qry := strings.Replace(deleteStaleDevicesSQL, "($1)", sqlutil.QueryVariadic(len(userIDs)), 1)
+ stmt, err := s.db.Prepare(qry)
+ if err != nil {
+ return err
+ }
+ defer internal.CloseAndLogIfError(ctx, stmt, "DeleteStaleDeviceLists: stmt.Close failed")
+ stmt = sqlutil.TxStmt(txn, stmt)
+
+ params := make([]any, len(userIDs))
+ for i := range userIDs {
+ params[i] = userIDs[i]
+ }
+
+ _, err = stmt.ExecContext(ctx, params...)
+ return err
+}
+
func rowsToUserIDs(ctx context.Context, rows *sql.Rows) (result []string, err error) {
defer internal.CloseAndLogIfError(ctx, rows, "closing rowsToUserIDs failed")
for rows.Next() {
diff --git a/keyserver/storage/tables/interface.go b/keyserver/storage/tables/interface.go
index 37a010a7..24da1125 100644
--- a/keyserver/storage/tables/interface.go
+++ b/keyserver/storage/tables/interface.go
@@ -56,6 +56,7 @@ type KeyChanges interface {
type StaleDeviceLists interface {
InsertStaleDeviceList(ctx context.Context, userID string, isStale bool) error
SelectUserIDsWithStaleDeviceLists(ctx context.Context, domains []gomatrixserverlib.ServerName) ([]string, error)
+ DeleteStaleDeviceLists(ctx context.Context, txn *sql.Tx, userIDs []string) error
}
type CrossSigningKeys interface {
diff --git a/keyserver/storage/tables/stale_device_lists_test.go b/keyserver/storage/tables/stale_device_lists_test.go
new file mode 100644
index 00000000..76d3badd
--- /dev/null
+++ b/keyserver/storage/tables/stale_device_lists_test.go
@@ -0,0 +1,94 @@
+package tables_test
+
+import (
+ "context"
+ "testing"
+
+ "github.com/matrix-org/gomatrixserverlib"
+
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/keyserver/storage/sqlite3"
+ "github.com/matrix-org/dendrite/setup/config"
+
+ "github.com/matrix-org/dendrite/keyserver/storage/postgres"
+ "github.com/matrix-org/dendrite/keyserver/storage/tables"
+ "github.com/matrix-org/dendrite/test"
+)
+
+func mustCreateTable(t *testing.T, dbType test.DBType) (tab tables.StaleDeviceLists, close func()) {
+ connStr, close := test.PrepareDBConnectionString(t, dbType)
+ db, err := sqlutil.Open(&config.DatabaseOptions{
+ ConnectionString: config.DataSource(connStr),
+ }, nil)
+ if err != nil {
+ t.Fatalf("failed to open database: %s", err)
+ }
+ switch dbType {
+ case test.DBTypePostgres:
+ tab, err = postgres.NewPostgresStaleDeviceListsTable(db)
+ case test.DBTypeSQLite:
+ tab, err = sqlite3.NewSqliteStaleDeviceListsTable(db)
+ }
+ if err != nil {
+ t.Fatalf("failed to create new table: %s", err)
+ }
+ return tab, close
+}
+
+func TestStaleDeviceLists(t *testing.T) {
+ alice := test.NewUser(t)
+ bob := test.NewUser(t)
+ charlie := "@charlie:localhost"
+ ctx := context.Background()
+
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ tab, closeDB := mustCreateTable(t, dbType)
+ defer closeDB()
+
+ if err := tab.InsertStaleDeviceList(ctx, alice.ID, true); err != nil {
+ t.Fatalf("failed to insert stale device: %s", err)
+ }
+ if err := tab.InsertStaleDeviceList(ctx, bob.ID, true); err != nil {
+ t.Fatalf("failed to insert stale device: %s", err)
+ }
+ if err := tab.InsertStaleDeviceList(ctx, charlie, true); err != nil {
+ t.Fatalf("failed to insert stale device: %s", err)
+ }
+
+ // Query one server
+ wantStaleUsers := []string{alice.ID, bob.ID}
+ gotStaleUsers, err := tab.SelectUserIDsWithStaleDeviceLists(ctx, []gomatrixserverlib.ServerName{"test"})
+ if err != nil {
+ t.Fatalf("failed to query stale device lists: %s", err)
+ }
+ if !test.UnsortedStringSliceEqual(wantStaleUsers, gotStaleUsers) {
+ t.Fatalf("expected stale users %v, got %v", wantStaleUsers, gotStaleUsers)
+ }
+
+ // Query all servers
+ wantStaleUsers = []string{alice.ID, bob.ID, charlie}
+ gotStaleUsers, err = tab.SelectUserIDsWithStaleDeviceLists(ctx, []gomatrixserverlib.ServerName{})
+ if err != nil {
+ t.Fatalf("failed to query stale device lists: %s", err)
+ }
+ if !test.UnsortedStringSliceEqual(wantStaleUsers, gotStaleUsers) {
+ t.Fatalf("expected stale users %v, got %v", wantStaleUsers, gotStaleUsers)
+ }
+
+ // Delete stale devices
+ deleteUsers := []string{alice.ID, bob.ID}
+ if err = tab.DeleteStaleDeviceLists(ctx, nil, deleteUsers); err != nil {
+ t.Fatalf("failed to delete stale device lists: %s", err)
+ }
+
+ // Verify we don't get anything back after deleting
+ gotStaleUsers, err = tab.SelectUserIDsWithStaleDeviceLists(ctx, []gomatrixserverlib.ServerName{"test"})
+ if err != nil {
+ t.Fatalf("failed to query stale device lists: %s", err)
+ }
+
+ if gotCount := len(gotStaleUsers); gotCount > 0 {
+ t.Fatalf("expected no stale users, got %d", gotCount)
+ }
+ })
+}