aboutsummaryrefslogtreecommitdiff
path: root/keyserver/storage
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-30 14:52:21 +0100
committerGitHub <noreply@github.com>2020-07-30 14:52:21 +0100
commita2174d3294841dbdf201bde76de3ffc44399fcbc (patch)
tree64f5c0d7dee038960941a937dd84631d53c1e9a9 /keyserver/storage
parent9355fb5ac8c911bdbde6dcc0f279f716d8a8f60b (diff)
Implement /keys/changes (#1232)
* Implement /keys/changes And refactor QueryKeyChanges to accept a `to` offset. * Unbreak tests * Sort keys when serialising log tokens
Diffstat (limited to 'keyserver/storage')
-rw-r--r--keyserver/storage/interface.go5
-rw-r--r--keyserver/storage/postgres/key_changes_table.go11
-rw-r--r--keyserver/storage/shared/storage.go4
-rw-r--r--keyserver/storage/sqlite3/key_changes_table.go11
-rw-r--r--keyserver/storage/storage_test.go26
-rw-r--r--keyserver/storage/tables/interface.go4
6 files changed, 48 insertions, 13 deletions
diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go
index f4787790..7a4fce6f 100644
--- a/keyserver/storage/interface.go
+++ b/keyserver/storage/interface.go
@@ -48,7 +48,8 @@ type Database interface {
// their keys in some way.
StoreKeyChange(ctx context.Context, partition int32, offset int64, userID string) error
- // KeyChanges returns a list of user IDs who have modified their keys from the offset given.
+ // KeyChanges returns a list of user IDs who have modified their keys from the offset given (exclusive) to the offset given (inclusive).
+ // A to offset of sarama.OffsetNewest means no upper limit.
// Returns the offset of the latest key change.
- KeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error)
+ KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)
}
diff --git a/keyserver/storage/postgres/key_changes_table.go b/keyserver/storage/postgres/key_changes_table.go
index 9d259f9f..d7f0991a 100644
--- a/keyserver/storage/postgres/key_changes_table.go
+++ b/keyserver/storage/postgres/key_changes_table.go
@@ -17,7 +17,9 @@ package postgres
import (
"context"
"database/sql"
+ "math"
+ "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@@ -44,7 +46,7 @@ const upsertKeyChangeSQL = "" +
// select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just
// take the max offset value as the latest offset.
const selectKeyChangesSQL = "" +
- "SELECT user_id, MAX(log_offset) FROM keyserver_key_changes WHERE partition = $1 AND log_offset > $2 GROUP BY user_id"
+ "SELECT user_id, MAX(log_offset) FROM keyserver_key_changes WHERE partition = $1 AND log_offset > $2 AND log_offset <= $3 GROUP BY user_id"
type keyChangesStatements struct {
db *sql.DB
@@ -75,9 +77,12 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition in
}
func (s *keyChangesStatements) SelectKeyChanges(
- ctx context.Context, partition int32, fromOffset int64,
+ ctx context.Context, partition int32, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
- rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset)
+ if toOffset == sarama.OffsetNewest {
+ toOffset = math.MaxInt64
+ }
+ rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset)
if err != nil {
return nil, 0, err
}
diff --git a/keyserver/storage/shared/storage.go b/keyserver/storage/shared/storage.go
index 537a5f7b..8c2534f5 100644
--- a/keyserver/storage/shared/storage.go
+++ b/keyserver/storage/shared/storage.go
@@ -78,6 +78,6 @@ func (d *Database) StoreKeyChange(ctx context.Context, partition int32, offset i
return d.KeyChangesTable.InsertKeyChange(ctx, partition, offset, userID)
}
-func (d *Database) KeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error) {
- return d.KeyChangesTable.SelectKeyChanges(ctx, partition, fromOffset)
+func (d *Database) KeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error) {
+ return d.KeyChangesTable.SelectKeyChanges(ctx, partition, fromOffset, toOffset)
}
diff --git a/keyserver/storage/sqlite3/key_changes_table.go b/keyserver/storage/sqlite3/key_changes_table.go
index b830214d..32721eae 100644
--- a/keyserver/storage/sqlite3/key_changes_table.go
+++ b/keyserver/storage/sqlite3/key_changes_table.go
@@ -17,7 +17,9 @@ package sqlite3
import (
"context"
"database/sql"
+ "math"
+ "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@@ -45,7 +47,7 @@ const upsertKeyChangeSQL = "" +
// select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just
// take the max offset value as the latest offset.
const selectKeyChangesSQL = "" +
- "SELECT user_id, MAX(offset) FROM keyserver_key_changes WHERE partition = $1 AND offset > $2 GROUP BY user_id"
+ "SELECT user_id, MAX(offset) FROM keyserver_key_changes WHERE partition = $1 AND offset > $2 AND offset <= $3 GROUP BY user_id"
type keyChangesStatements struct {
db *sql.DB
@@ -76,9 +78,12 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition in
}
func (s *keyChangesStatements) SelectKeyChanges(
- ctx context.Context, partition int32, fromOffset int64,
+ ctx context.Context, partition int32, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
- rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset)
+ if toOffset == sarama.OffsetNewest {
+ toOffset = math.MaxInt64
+ }
+ rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset)
if err != nil {
return nil, 0, err
}
diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go
index 88972478..66f6930f 100644
--- a/keyserver/storage/storage_test.go
+++ b/keyserver/storage/storage_test.go
@@ -4,6 +4,8 @@ import (
"context"
"reflect"
"testing"
+
+ "github.com/Shopify/sarama"
)
var ctx = context.Background()
@@ -24,7 +26,7 @@ func TestKeyChanges(t *testing.T) {
MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@bob:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@charlie:localhost"))
- userIDs, latest, err := db.KeyChanges(ctx, 0, 1)
+ userIDs, latest, err := db.KeyChanges(ctx, 0, 1, sarama.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
@@ -44,7 +46,7 @@ func TestKeyChangesNoDupes(t *testing.T) {
MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@alice:localhost"))
MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@alice:localhost"))
- userIDs, latest, err := db.KeyChanges(ctx, 0, 0)
+ userIDs, latest, err := db.KeyChanges(ctx, 0, 0, sarama.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
@@ -55,3 +57,23 @@ func TestKeyChangesNoDupes(t *testing.T) {
t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
}
}
+
+func TestKeyChangesUpperLimit(t *testing.T) {
+ db, err := NewDatabase("file::memory:", nil)
+ if err != nil {
+ t.Fatalf("Failed to NewDatabase: %s", err)
+ }
+ MustNotError(t, db.StoreKeyChange(ctx, 0, 0, "@alice:localhost"))
+ MustNotError(t, db.StoreKeyChange(ctx, 0, 1, "@bob:localhost"))
+ MustNotError(t, db.StoreKeyChange(ctx, 0, 2, "@charlie:localhost"))
+ userIDs, latest, err := db.KeyChanges(ctx, 0, 0, 1)
+ if err != nil {
+ t.Fatalf("Failed to KeyChanges: %s", err)
+ }
+ if latest != 1 {
+ t.Fatalf("KeyChanges: got latest=%d want 1", latest)
+ }
+ if !reflect.DeepEqual(userIDs, []string{"@bob:localhost"}) {
+ t.Fatalf("KeyChanges: wrong user_ids: %v", userIDs)
+ }
+}
diff --git a/keyserver/storage/tables/interface.go b/keyserver/storage/tables/interface.go
index 824b9f0f..8b89283f 100644
--- a/keyserver/storage/tables/interface.go
+++ b/keyserver/storage/tables/interface.go
@@ -38,5 +38,7 @@ type DeviceKeys interface {
type KeyChanges interface {
InsertKeyChange(ctx context.Context, partition int32, offset int64, userID string) error
- SelectKeyChanges(ctx context.Context, partition int32, fromOffset int64) (userIDs []string, latestOffset int64, err error)
+ // SelectKeyChanges returns the set (de-duplicated) of users who have changed their keys between the two offsets.
+ // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of sarama.OffsetNewest means no upper offset.
+ SelectKeyChanges(ctx context.Context, partition int32, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)
}