diff options
author | kegsay <kegan@matrix.org> | 2022-01-21 09:56:06 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-21 09:56:06 +0000 |
commit | 2c581377a5f6b243300445947fd0fd38a919873d (patch) | |
tree | 4aa9948333bbd5f64c1cf0af8294789cfae7a2e9 /keyserver/storage/postgres/key_changes_table.go | |
parent | db7d9cba8ad28c300dd66c01b9b0ce2414e8cbe0 (diff) |
Remodel how device list change IDs are created (#2098)
* Remodel how device list change IDs are created
Previously we made them using the offset Kafka supplied.
We don't run Kafka anymore, so now we make the SQL table assign
the change ID via an AUTOINCREMENTing ID. Redesign the
`keyserver_key_changes` table to have `UNIQUE(user_id)` so we
don't accumulate key changes forevermore, we now have at most 1
row per user which contains the highest change ID.
This needs a SQL migration.
* Ensure we bump the change ID on sqlite
* Actually read the DeviceChangeID not the Offset in synapi
* Add SQL migrations
* Prepare after migration; fixup dendrite-upgrade-test logging
* Use higher version numbers; fix sqlite query to increment better
* Default 0 on postgres
* fixup postgres migration on fresh dendrite instances
Diffstat (limited to 'keyserver/storage/postgres/key_changes_table.go')
-rw-r--r-- | keyserver/storage/postgres/key_changes_table.go | 51 |
1 files changed, 25 insertions, 26 deletions
diff --git a/keyserver/storage/postgres/key_changes_table.go b/keyserver/storage/postgres/key_changes_table.go index df4b47e7..20d227c2 100644 --- a/keyserver/storage/postgres/key_changes_table.go +++ b/keyserver/storage/postgres/key_changes_table.go @@ -26,27 +26,25 @@ import ( var keyChangesSchema = ` -- Stores key change information about users. Used to determine when to send updated device lists to clients. +CREATE SEQUENCE IF NOT EXISTS keyserver_key_changes_seq; CREATE TABLE IF NOT EXISTS keyserver_key_changes ( - partition BIGINT NOT NULL, - log_offset BIGINT NOT NULL, + change_id BIGINT PRIMARY KEY DEFAULT nextval('keyserver_key_changes_seq'), user_id TEXT NOT NULL, - CONSTRAINT keyserver_key_changes_unique UNIQUE (partition, log_offset) + CONSTRAINT keyserver_key_changes_unique_per_user UNIQUE (user_id) ); ` -// Replace based on partition|offset - we should never insert duplicates unless the kafka logs are wiped. -// Rather than falling over, just overwrite (though this will mean clients with an existing sync token will -// miss out on updates). TODO: Ideally we would detect when kafka logs are purged then purge this table too. +// Replace based on user ID. We don't care how many times the user's keys have changed, only that they +// have changed, hence we can just keep bumping the change ID for this user. const upsertKeyChangeSQL = "" + - "INSERT INTO keyserver_key_changes (partition, log_offset, user_id)" + - " VALUES ($1, $2, $3)" + - " ON CONFLICT ON CONSTRAINT keyserver_key_changes_unique" + - " DO UPDATE SET user_id = $3" + "INSERT INTO keyserver_key_changes (user_id)" + + " VALUES ($1)" + + " ON CONFLICT ON CONSTRAINT keyserver_key_changes_unique_per_user" + + " DO UPDATE SET change_id = nextval('keyserver_key_changes_seq')" + + " RETURNING change_id" -// select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just -// take the max offset value as the latest offset. const selectKeyChangesSQL = "" + - "SELECT user_id, MAX(log_offset) FROM keyserver_key_changes WHERE partition = $1 AND log_offset > $2 AND log_offset <= $3 GROUP BY user_id" + "SELECT user_id, change_id FROM keyserver_key_changes WHERE change_id > $1 AND change_id <= $2" type keyChangesStatements struct { db *sql.DB @@ -59,31 +57,32 @@ func NewPostgresKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) { db: db, } _, err := db.Exec(keyChangesSchema) - if err != nil { - return nil, err - } - if s.upsertKeyChangeStmt, err = db.Prepare(upsertKeyChangeSQL); err != nil { - return nil, err + return s, err +} + +func (s *keyChangesStatements) Prepare() (err error) { + if s.upsertKeyChangeStmt, err = s.db.Prepare(upsertKeyChangeSQL); err != nil { + return err } - if s.selectKeyChangesStmt, err = db.Prepare(selectKeyChangesSQL); err != nil { - return nil, err + if s.selectKeyChangesStmt, err = s.db.Prepare(selectKeyChangesSQL); err != nil { + return err } - return s, nil + return nil } -func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, partition int32, offset int64, userID string) error { - _, err := s.upsertKeyChangeStmt.ExecContext(ctx, partition, offset, userID) - return err +func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID string) (changeID int64, err error) { + err = s.upsertKeyChangeStmt.QueryRowContext(ctx, userID).Scan(&changeID) + return } func (s *keyChangesStatements) SelectKeyChanges( - ctx context.Context, partition int32, fromOffset, toOffset int64, + ctx context.Context, fromOffset, toOffset int64, ) (userIDs []string, latestOffset int64, err error) { if toOffset == sarama.OffsetNewest { toOffset = math.MaxInt64 } latestOffset = fromOffset - rows, err := s.selectKeyChangesStmt.QueryContext(ctx, partition, fromOffset, toOffset) + rows, err := s.selectKeyChangesStmt.QueryContext(ctx, fromOffset, toOffset) if err != nil { return nil, 0, err } |