diff options
Diffstat (limited to 'roomserver/storage/postgres/state_block_table.go')
-rw-r--r-- | roomserver/storage/postgres/state_block_table.go | 214 |
1 files changed, 48 insertions, 166 deletions
diff --git a/roomserver/storage/postgres/state_block_table.go b/roomserver/storage/postgres/state_block_table.go index d618686f..4523d18b 100644 --- a/roomserver/storage/postgres/state_block_table.go +++ b/roomserver/storage/postgres/state_block_table.go @@ -41,141 +41,88 @@ const stateDataSchema = ` -- which in turn makes it easier to merge state data blocks. CREATE SEQUENCE IF NOT EXISTS roomserver_state_block_nid_seq; CREATE TABLE IF NOT EXISTS roomserver_state_block ( - -- Local numeric ID for this state data. - state_block_nid bigint NOT NULL, - event_type_nid bigint NOT NULL, - event_state_key_nid bigint NOT NULL, - event_nid bigint NOT NULL, - UNIQUE (state_block_nid, event_type_nid, event_state_key_nid) + -- The state snapshot NID that identifies this snapshot. + state_block_nid bigint PRIMARY KEY DEFAULT nextval('roomserver_state_block_nid_seq'), + -- The hash of the state block, which is used to enforce uniqueness. The hash is + -- generated in Dendrite and passed through to the database, as a btree index over + -- this column is cheap and fits within the maximum index size. + state_block_hash BYTEA UNIQUE, + -- The event NIDs contained within the state block. + event_nids bigint[] NOT NULL ); ` +// Insert a new state block. If we conflict on the hash column then +// we must perform an update so that the RETURNING statement returns the +// ID of the row that we conflicted with, so that we can then refer to +// the original block. const insertStateDataSQL = "" + - "INSERT INTO roomserver_state_block (state_block_nid, event_type_nid, event_state_key_nid, event_nid)" + - " VALUES ($1, $2, $3, $4)" + "INSERT INTO roomserver_state_block (state_block_hash, event_nids)" + + " VALUES ($1, $2)" + + " ON CONFLICT (state_block_hash) DO UPDATE SET event_nids=$2" + + " RETURNING state_block_nid" -const selectNextStateBlockNIDSQL = "" + - "SELECT nextval('roomserver_state_block_nid_seq')" - -// Bulk state lookup by numeric state block ID. -// Sort by the state_block_nid, event_type_nid, event_state_key_nid -// This means that all the entries for a given state_block_nid will appear -// together in the list and those entries will sorted by event_type_nid -// and event_state_key_nid. This property makes it easier to merge two -// state data blocks together. const bulkSelectStateBlockEntriesSQL = "" + - "SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" + - " FROM roomserver_state_block WHERE state_block_nid = ANY($1)" + - " ORDER BY state_block_nid, event_type_nid, event_state_key_nid" - -// Bulk state lookup by numeric state block ID. -// Filters the rows in each block to the requested types and state keys. -// We would like to restrict to particular type state key pairs but we are -// restricted by the query language to pull the cross product of a list -// of types and a list state_keys. So we have to filter the result in the -// application to restrict it to the list of event types and state keys we -// actually wanted. -const bulkSelectFilteredStateBlockEntriesSQL = "" + - "SELECT state_block_nid, event_type_nid, event_state_key_nid, event_nid" + - " FROM roomserver_state_block WHERE state_block_nid = ANY($1)" + - " AND event_type_nid = ANY($2) AND event_state_key_nid = ANY($3)" + - " ORDER BY state_block_nid, event_type_nid, event_state_key_nid" + "SELECT state_block_nid, event_nids" + + " FROM roomserver_state_block WHERE state_block_nid = ANY($1)" type stateBlockStatements struct { - insertStateDataStmt *sql.Stmt - selectNextStateBlockNIDStmt *sql.Stmt - bulkSelectStateBlockEntriesStmt *sql.Stmt - bulkSelectFilteredStateBlockEntriesStmt *sql.Stmt + insertStateDataStmt *sql.Stmt + bulkSelectStateBlockEntriesStmt *sql.Stmt } -func NewPostgresStateBlockTable(db *sql.DB) (tables.StateBlock, error) { - s := &stateBlockStatements{} +func createStateBlockTable(db *sql.DB) error { _, err := db.Exec(stateDataSchema) - if err != nil { - return nil, err - } + return err +} + +func prepareStateBlockTable(db *sql.DB) (tables.StateBlock, error) { + s := &stateBlockStatements{} return s, shared.StatementList{ {&s.insertStateDataStmt, insertStateDataSQL}, - {&s.selectNextStateBlockNIDStmt, selectNextStateBlockNIDSQL}, {&s.bulkSelectStateBlockEntriesStmt, bulkSelectStateBlockEntriesSQL}, - {&s.bulkSelectFilteredStateBlockEntriesStmt, bulkSelectFilteredStateBlockEntriesSQL}, }.Prepare(db) } func (s *stateBlockStatements) BulkInsertStateData( ctx context.Context, txn *sql.Tx, - entries []types.StateEntry, -) (types.StateBlockNID, error) { - stateBlockNID, err := s.selectNextStateBlockNID(ctx) - if err != nil { - return 0, err - } - for _, entry := range entries { - _, err := s.insertStateDataStmt.ExecContext( - ctx, - int64(stateBlockNID), - int64(entry.EventTypeNID), - int64(entry.EventStateKeyNID), - int64(entry.EventNID), - ) - if err != nil { - return 0, err - } + entries types.StateEntries, +) (id types.StateBlockNID, err error) { + entries = entries[:util.SortAndUnique(entries)] + var nids types.EventNIDs + for _, e := range entries { + nids = append(nids, e.EventNID) } - return stateBlockNID, nil -} - -func (s *stateBlockStatements) selectNextStateBlockNID( - ctx context.Context, -) (types.StateBlockNID, error) { - var stateBlockNID int64 - err := s.selectNextStateBlockNIDStmt.QueryRowContext(ctx).Scan(&stateBlockNID) - return types.StateBlockNID(stateBlockNID), err + err = s.insertStateDataStmt.QueryRowContext( + ctx, nids.Hash(), eventNIDsAsArray(nids), + ).Scan(&id) + return } func (s *stateBlockStatements) BulkSelectStateBlockEntries( - ctx context.Context, stateBlockNIDs []types.StateBlockNID, -) ([]types.StateEntryList, error) { - nids := make([]int64, len(stateBlockNIDs)) - for i := range stateBlockNIDs { - nids[i] = int64(stateBlockNIDs[i]) - } - rows, err := s.bulkSelectStateBlockEntriesStmt.QueryContext(ctx, pq.Int64Array(nids)) + ctx context.Context, stateBlockNIDs types.StateBlockNIDs, +) ([][]types.EventNID, error) { + rows, err := s.bulkSelectStateBlockEntriesStmt.QueryContext(ctx, stateBlockNIDsAsArray(stateBlockNIDs)) if err != nil { return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateBlockEntries: rows.close() failed") - results := make([]types.StateEntryList, len(stateBlockNIDs)) - // current is a pointer to the StateEntryList to append the state entries to. - var current *types.StateEntryList + results := make([][]types.EventNID, len(stateBlockNIDs)) i := 0 - for rows.Next() { - var ( - stateBlockNID int64 - eventTypeNID int64 - eventStateKeyNID int64 - eventNID int64 - entry types.StateEntry - ) - if err = rows.Scan( - &stateBlockNID, &eventTypeNID, &eventStateKeyNID, &eventNID, - ); err != nil { + for ; rows.Next(); i++ { + var stateBlockNID types.StateBlockNID + var result pq.Int64Array + if err = rows.Scan(&stateBlockNID, &result); err != nil { return nil, err } - entry.EventTypeNID = types.EventTypeNID(eventTypeNID) - entry.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID) - entry.EventNID = types.EventNID(eventNID) - if current == nil || types.StateBlockNID(stateBlockNID) != current.StateBlockNID { - // The state entry row is for a different state data block to the current one. - // So we start appending to the next entry in the list. - current = &results[i] - current.StateBlockNID = types.StateBlockNID(stateBlockNID) - i++ + r := []types.EventNID{} + for _, e := range result { + r = append(r, types.EventNID(e)) } - current.StateEntries = append(current.StateEntries, entry) + results[i] = r } if err = rows.Err(); err != nil { return nil, err @@ -186,71 +133,6 @@ func (s *stateBlockStatements) BulkSelectStateBlockEntries( return results, err } -func (s *stateBlockStatements) BulkSelectFilteredStateBlockEntries( - ctx context.Context, - stateBlockNIDs []types.StateBlockNID, - stateKeyTuples []types.StateKeyTuple, -) ([]types.StateEntryList, error) { - tuples := stateKeyTupleSorter(stateKeyTuples) - // Sort the tuples so that we can run binary search against them as we filter the rows returned by the db. - sort.Sort(tuples) - - eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays() - rows, err := s.bulkSelectFilteredStateBlockEntriesStmt.QueryContext( - ctx, - stateBlockNIDsAsArray(stateBlockNIDs), - eventTypeNIDArray, - eventStateKeyNIDArray, - ) - if err != nil { - return nil, err - } - defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectFilteredStateBlockEntries: rows.close() failed") - - var results []types.StateEntryList - var current types.StateEntryList - for rows.Next() { - var ( - stateBlockNID int64 - eventTypeNID int64 - eventStateKeyNID int64 - eventNID int64 - entry types.StateEntry - ) - if err := rows.Scan( - &stateBlockNID, &eventTypeNID, &eventStateKeyNID, &eventNID, - ); err != nil { - return nil, err - } - entry.EventTypeNID = types.EventTypeNID(eventTypeNID) - entry.EventStateKeyNID = types.EventStateKeyNID(eventStateKeyNID) - entry.EventNID = types.EventNID(eventNID) - - // We can use binary search here because we sorted the tuples earlier - if !tuples.contains(entry.StateKeyTuple) { - // The select will return the cross product of types and state keys. - // So we need to check if type of the entry is in the list. - continue - } - - if types.StateBlockNID(stateBlockNID) != current.StateBlockNID { - // The state entry row is for a different state data block to the current one. - // So we append the current entry to the results and start adding to a new one. - // The first time through the loop current will be empty. - if current.StateEntries != nil { - results = append(results, current) - } - current = types.StateEntryList{StateBlockNID: types.StateBlockNID(stateBlockNID)} - } - current.StateEntries = append(current.StateEntries, entry) - } - // Add the last entry to the list if it is not empty. - if current.StateEntries != nil { - results = append(results, current) - } - return results, rows.Err() -} - func stateBlockNIDsAsArray(stateBlockNIDs []types.StateBlockNID) pq.Int64Array { nids := make([]int64, len(stateBlockNIDs)) for i := range stateBlockNIDs { |