aboutsummaryrefslogtreecommitdiff
path: root/keyserver/storage/postgres/key_changes_table.go
diff options
context:
space:
mode:
Diffstat (limited to 'keyserver/storage/postgres/key_changes_table.go')
-rw-r--r--keyserver/storage/postgres/key_changes_table.go51
1 files changed, 25 insertions, 26 deletions
diff --git a/keyserver/storage/postgres/key_changes_table.go b/keyserver/storage/postgres/key_changes_table.go
index df4b47e7..20d227c2 100644
--- a/keyserver/storage/postgres/key_changes_table.go
+++ b/keyserver/storage/postgres/key_changes_table.go
@@ -26,27 +26,25 @@ import (
var keyChangesSchema = `
-- Stores key change information about users. Used to determine when to send updated device lists to clients.
+CREATE SEQUENCE IF NOT EXISTS keyserver_key_changes_seq;
CREATE TABLE IF NOT EXISTS keyserver_key_changes (
- partition BIGINT NOT NULL,
- log_offset BIGINT NOT NULL,
+ change_id BIGINT PRIMARY KEY DEFAULT nextval('keyserver_key_changes_seq'),
user_id TEXT NOT NULL,
- CONSTRAINT keyserver_key_changes_unique UNIQUE (partition, log_offset)
+ CONSTRAINT keyserver_key_changes_unique_per_user UNIQUE (user_id)
);
`
-// Replace based on partition|offset - we should never insert duplicates unless the kafka logs are wiped.
-// Rather than falling over, just overwrite (though this will mean clients with an existing sync token will
-// miss out on updates). TODO: Ideally we would detect when kafka logs are purged then purge this table too.
+// Replace based on user ID. We don't care how many times the user's keys have changed, only that they
+// have changed, hence we can just keep bumping the change ID for this user.
const upsertKeyChangeSQL = "" +
- "INSERT INTO keyserver_key_changes (partition, log_offset, user_id)" +
- " VALUES ($1, $2, $3)" +
- " ON CONFLICT ON CONSTRAINT keyserver_key_changes_unique" +
- " DO UPDATE SET user_id = $3"
+ "INSERT INTO keyserver_key_changes (user_id)" +
+ " VALUES ($1)" +
+ " ON CONFLICT ON CONSTRAINT keyserver_key_changes_unique_per_user" +
+ " DO UPDATE SET change_id = nextval('keyserver_key_changes_seq')" +
+ " RETURNING change_id"
-// select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just
-// take the max offset value as the latest offset.
const selectKeyChangesSQL = "" +
- "SELECT user_id, MAX(log_offset) FROM keyserver_key_changes WHERE partition = $1 AND log_offset > $2 AND log_offset <= $3 GROUP BY user_id"
+ "SELECT user_id, change_id FROM keyserver_key_changes WHERE change_id > $1 AND change_id <= $2"
type keyChangesStatements struct {
db *sql.DB
@@ -59,31 +57,32 @@ func NewPostgresKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) {
db: db,
}
_, err := db.Exec(keyChangesSchema)
- if err != nil {
- return nil, err
- }
- if s.upsertKeyChangeStmt, err = db.Prepare(upsertKeyChangeSQL); err != nil {
- return nil, err
+ return s, err
+}
+
+func (s *keyChangesStatements) Prepare() (err error) {
+ if s.upsertKeyChangeStmt, err = s.db.Prepare(upsertKeyChangeSQL); err != nil {
+ return err
}
- if s.selectKeyChangesStmt, err = db.Prepare(selectKeyChangesSQL); err != nil {
- return nil, err
+ if s.selectKeyChangesStmt, err = s.db.Prepare(selectKeyChangesSQL); err != nil {
+ return err
}
- return s, nil
+ return nil
}
-func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition int32, offset int64, userID string) error {
- _, err := s.upsertKeyChangeStmt.ExecContext(ctx, partition, offset, userID)
- return err
+func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID string) (changeID int64, err error) {
+ err = s.upsertKeyChangeStmt.QueryRowContext(ctx, userID).Scan(&changeID)
+ return
}
func (s *keyChangesStatements) SelectKeyChanges(
- ctx context.Context, partition int32, fromOffset, toOffset int64,
+ ctx context.Context, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
if toOffset == sarama.OffsetNewest {
toOffset = math.MaxInt64
}
latestOffset = fromOffset
- rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset)
+ rows, err := s.selectKeyChangesStmt.QueryContext(ctx, fromOffset, toOffset)
if err != nil {
return nil, 0, err
}