diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2021-04-26 13:25:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-26 13:25:57 +0100 |
commit | 5ce1fe80dea8b8cfca8712e8d584deb995bbddcc (patch) | |
tree | 1307a1edf73abf68cebd4601efec1e467dac964c /roomserver/storage/postgres/events_table.go | |
parent | d6e9b7b307ff0d7541046ec33890d49239c7a6ca (diff) |
State storage refactor (#1839)
* Hash-deduplicated state storage (and migrations) for PostgreSQL and SQLite
* Refactor droomserver database setup for migrations
* Fix conflict statements
* Update migration names
* Set a boundary for old to new block/snapshot IDs so we don't rewrite them more than once accidentally
* Create sequence if not exists
* Fix boundary queries
* Fix boundary queries
* Use Query
* Break out queries a bit
* More sequence tweaks
* Query parameters are not playing the game
* Injection escaping may not work for CREATE SEQUENCE after all
* Fix snapshot sequence name
* Use boundaried IDs in SQLite too
* Use IFNULL for SQLite
* Use COALESCE in PostgreSQL
* Review comments @Kegsay
Diffstat (limited to 'roomserver/storage/postgres/events_table.go')
-rw-r--r-- | roomserver/storage/postgres/events_table.go | 60 |
1 files changed, 55 insertions, 5 deletions
diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index 0cf0bd22..88c82083 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -19,6 +19,7 @@ import ( "context" "database/sql" "fmt" + "sort" "github.com/lib/pq" "github.com/matrix-org/dendrite/internal" @@ -88,6 +89,16 @@ const bulkSelectStateEventByIDSQL = "" + " WHERE event_id = ANY($1)" + " ORDER BY event_type_nid, event_state_key_nid ASC" +// Bulk look up of events by event NID, optionally filtering by the event type +// or event state key NIDs if provided. (The CARDINALITY check will return true +// if the provided arrays are empty, ergo no filtering). +const bulkSelectStateEventByNIDSQL = "" + + "SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" + + " WHERE event_nid = ANY($1)" + + " AND (CARDINALITY($2::bigint[]) = 0 OR event_type_nid = ANY($2))" + + " AND (CARDINALITY($3::bigint[]) = 0 OR event_state_key_nid = ANY($3))" + + " ORDER BY event_type_nid, event_state_key_nid ASC" + const bulkSelectStateAtEventByIDSQL = "" + "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" + " WHERE event_id = ANY($1)" @@ -127,6 +138,7 @@ type eventStatements struct { insertEventStmt *sql.Stmt selectEventStmt *sql.Stmt bulkSelectStateEventByIDStmt *sql.Stmt + bulkSelectStateEventByNIDStmt *sql.Stmt bulkSelectStateAtEventByIDStmt *sql.Stmt updateEventStateStmt *sql.Stmt selectEventSentToOutputStmt *sql.Stmt @@ -140,17 +152,19 @@ type eventStatements struct { selectRoomNIDsForEventNIDsStmt *sql.Stmt } -func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { - s := &eventStatements{} +func createEventsTable(db *sql.DB) error { _, err := db.Exec(eventsSchema) - if err != nil { - return nil, err - } + return err +} + +func prepareEventsTable(db *sql.DB) (tables.Events, error) { + s := &eventStatements{} return s, shared.StatementList{ {&s.insertEventStmt, insertEventSQL}, {&s.selectEventStmt, selectEventSQL}, {&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL}, + {&s.bulkSelectStateEventByNIDStmt, bulkSelectStateEventByNIDSQL}, {&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL}, {&s.updateEventStateStmt, updateEventStateSQL}, {&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL}, @@ -238,6 +252,42 @@ func (s *eventStatements) BulkSelectStateEventByID( return results, nil } +// bulkSelectStateEventByNID lookups a list of state events by event NID. +// If any of the requested events are missing from the database it returns a types.MissingEventError +func (s *eventStatements) BulkSelectStateEventByNID( + ctx context.Context, eventNIDs []types.EventNID, + stateKeyTuples []types.StateKeyTuple, +) ([]types.StateEntry, error) { + tuples := stateKeyTupleSorter(stateKeyTuples) + sort.Sort(tuples) + eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays() + rows, err := s.bulkSelectStateEventByNIDStmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs), eventTypeNIDArray, eventStateKeyNIDArray) + if err != nil { + return nil, err + } + defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateEventByID: rows.close() failed") + // We know that we will only get as many results as event IDs + // because of the unique constraint on event IDs. + // So we can allocate an array of the correct size now. + // We might get fewer results than IDs so we adjust the length of the slice before returning it. + results := make([]types.StateEntry, len(eventNIDs)) + i := 0 + for ; rows.Next(); i++ { + result := &results[i] + if err = rows.Scan( + &result.EventTypeNID, + &result.EventStateKeyNID, + &result.EventNID, + ); err != nil { + return nil, err + } + } + if err = rows.Err(); err != nil { + return nil, err + } + return results[:i], nil +} + // bulkSelectStateAtEventByID lookups the state at a list of events by event ID. // If any of the requested events are missing from the database it returns a types.MissingEventError. // If we do not have the state for any of the requested events it returns a types.MissingEventError. |