diff options
author | Kegsay <kegan@matrix.org> | 2020-07-30 14:52:21 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-30 14:52:21 +0100 |
commit | a2174d3294841dbdf201bde76de3ffc44399fcbc (patch) | |
tree | 64f5c0d7dee038960941a937dd84631d53c1e9a9 /keyserver/storage | |
parent | 9355fb5ac8c911bdbde6dcc0f279f716d8a8f60b (diff) |
Implement /keys/changes (#1232)
* Implement /keys/changes
And refactor QueryKeyChanges to accept a `to` offset.
* Unbreak tests
* Sort keys when serialising log tokens
Diffstat (limited to 'keyserver/storage')
-rw-r--r-- | keyserver/storage/interface.go | 5 | ||||
-rw-r--r-- | keyserver/storage/postgres/key_changes_table.go | 11 | ||||
-rw-r--r-- | keyserver/storage/shared/storage.go | 4 | ||||
-rw-r--r-- | keyserver/storage/sqlite3/key_changes_table.go | 11 | ||||
-rw-r--r-- | keyserver/storage/storage_test.go | 26 | ||||
-rw-r--r-- | keyserver/storage/tables/interface.go | 4 |
6 files changed, 48 insertions, 13 deletions
diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go index f4787790..7a4fce6f 100644 --- a/keyserver/storage/interface.go +++ b/keyserver/storage/interface.go @@ -48,7 +48,8 @@ type Database interface { // their keys in some way. StoreKeyChange(ctx context.Context, partition int32, offset int64, userID string) error - // KeyChanges returns a list of user IDs who have modified their keys from the offset given. + // KeyChanges returns a list of user IDs who have modified their keys from the offset given (exclusive) to the offset given (inclusive). + // A to offset of sarama.OffsetNewest means no upper limit. // Returns the offset of the latest key change. - KeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error) + KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) } diff --git a/keyserver/storage/postgres/key_changes_table.go b/keyserver/storage/postgres/key_changes_table.go index 9d259f9f..d7f0991a 100644 --- a/keyserver/storage/postgres/key_changes_table.go +++ b/keyserver/storage/postgres/key_changes_table.go @@ -17,7 +17,9 @@ package postgres import ( "context" "database/sql" + "math" + "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/storage/tables" ) @@ -44,7 +46,7 @@ const upsertKeyChangeSQL = "" + // select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just // take the max offset value as the latest offset. const selectKeyChangesSQL = "" + - "SELECT user_id, MAX(log_offset) FROM keyserver_key_changes WHERE partition = $1 AND log_offset > $2 GROUP BY user_id" + "SELECT user_id, MAX(log_offset) FROM keyserver_key_changes WHERE partition = $1 AND log_offset > $2 AND log_offset <= $3 GROUP BY user_id" type keyChangesStatements struct { db *sql.DB @@ -75,9 +77,12 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition in } func (s *keyChangesStatements) SelectKeyChanges( - ctx context.Context, partition int32, fromOffset int64, + ctx context.Context, partition int32, fromOffset, toOffset int64, ) (userIDs []string, latestOffset int64, err error) { - rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset) + if toOffset == sarama.OffsetNewest { + toOffset = math.MaxInt64 + } + rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset) if err != nil { return nil, 0, err } diff --git a/keyserver/storage/shared/storage.go b/keyserver/storage/shared/storage.go index 537a5f7b..8c2534f5 100644 --- a/keyserver/storage/shared/storage.go +++ b/keyserver/storage/shared/storage.go @@ -78,6 +78,6 @@ func (d *Database) StoreKeyChange(ctx context.Context, partition int32, offset i return d.KeyChangesTable.InsertKeyChange(ctx, partition, offset, userID) } -func (d *Database) KeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error) { - return d.KeyChangesTable.SelectKeyChanges(ctx, partition, fromOffset) +func (d *Database) KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) { + return d.KeyChangesTable.SelectKeyChanges(ctx, partition, fromOffset, toOffset) } diff --git a/keyserver/storage/sqlite3/key_changes_table.go b/keyserver/storage/sqlite3/key_changes_table.go index b830214d..32721eae 100644 --- a/keyserver/storage/sqlite3/key_changes_table.go +++ b/keyserver/storage/sqlite3/key_changes_table.go @@ -17,7 +17,9 @@ package sqlite3 import ( "context" "database/sql" + "math" + "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/storage/tables" ) @@ -45,7 +47,7 @@ const upsertKeyChangeSQL = "" + // select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just // take the max offset value as the latest offset. const selectKeyChangesSQL = "" + - "SELECT user_id, MAX(offset) FROM keyserver_key_changes WHERE partition = $1 AND offset > $2 GROUP BY user_id" + "SELECT user_id, MAX(offset) FROM keyserver_key_changes WHERE partition = $1 AND offset > $2 AND offset <= $3 GROUP BY user_id" type keyChangesStatements struct { db *sql.DB @@ -76,9 +78,12 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition in } func (s *keyChangesStatements) SelectKeyChanges( - ctx context.Context, partition int32, fromOffset int64, + ctx context.Context, partition int32, fromOffset, toOffset int64, ) (userIDs []string, latestOffset int64, err error) { - rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset) + if toOffset == sarama.OffsetNewest { + toOffset = math.MaxInt64 + } + rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset) if err != nil { return nil, 0, err } diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go index 88972478..66f6930f 100644 --- a/keyserver/storage/storage_test.go +++ b/keyserver/storage/storage_test.go @@ -4,6 +4,8 @@ import ( "context" "reflect" "testing" + + "github.com/Shopify/sarama" ) var ctx = context.Background() @@ -24,7 +26,7 @@ func TestKeyChanges(t *testing.T) { MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost")) MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@bob:localhost")) MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@charlie:localhost")) - userIDs, latest, err := db.KeyChanges(ctx, 0, 1) + userIDs, latest, err := db.KeyChanges(ctx, 0, 1, sarama.OffsetNewest) if err != nil { t.Fatalf("Failed to KeyChanges: %s", err) } @@ -44,7 +46,7 @@ func TestKeyChangesNoDupes(t *testing.T) { MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost")) MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@alice:localhost")) MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@alice:localhost")) - userIDs, latest, err := db.KeyChanges(ctx, 0, 0) + userIDs, latest, err := db.KeyChanges(ctx, 0, 0, sarama.OffsetNewest) if err != nil { t.Fatalf("Failed to KeyChanges: %s", err) } @@ -55,3 +57,23 @@ func TestKeyChangesNoDupes(t *testing.T) { t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs) } } + +func TestKeyChangesUpperLimit(t *testing.T) { + db, err := NewDatabase("file::memory:", nil) + if err != nil { + t.Fatalf("Failed to NewDatabase: %s", err) + } + MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost")) + MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@bob:localhost")) + MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@charlie:localhost")) + userIDs, latest, err := db.KeyChanges(ctx, 0, 0, 1) + if err != nil { + t.Fatalf("Failed to KeyChanges: %s", err) + } + if latest != 1 { + t.Fatalf("KeyChanges: got latest=%d want 1", latest) + } + if !reflect.DeepEqual(userIDs, []string{"@bob:localhost"}) { + t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs) + } +} diff --git a/keyserver/storage/tables/interface.go b/keyserver/storage/tables/interface.go index 824b9f0f..8b89283f 100644 --- a/keyserver/storage/tables/interface.go +++ b/keyserver/storage/tables/interface.go @@ -38,5 +38,7 @@ type DeviceKeys interface { type KeyChanges interface { InsertKeyChange(ctx context.Context, partition int32, offset int64, userID string) error - SelectKeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error) + // SelectKeyChanges returns the set (de-duplicated) of users who have changed their keys between the two offsets. + // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of sarama.OffsetNewest means no upper offset. + SelectKeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) } |