aboutsummaryrefslogtreecommitdiff
path: root/roomserver/storage/postgres/events_table.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-04-26 13:25:57 +0100
committerGitHub <noreply@github.com>2021-04-26 13:25:57 +0100
commit5ce1fe80dea8b8cfca8712e8d584deb995bbddcc (patch)
tree1307a1edf73abf68cebd4601efec1e467dac964c /roomserver/storage/postgres/events_table.go
parentd6e9b7b307ff0d7541046ec33890d49239c7a6ca (diff)
State storage refactor (#1839)
* Hash-deduplicated state storage (and migrations) for PostgreSQL and SQLite * Refactor droomserver database setup for migrations * Fix conflict statements * Update migration names * Set a boundary for old to new block/snapshot IDs so we don't rewrite them more than once accidentally * Create sequence if not exists * Fix boundary queries * Fix boundary queries * Use Query * Break out queries a bit * More sequence tweaks * Query parameters are not playing the game * Injection escaping may not work for CREATE SEQUENCE after all * Fix snapshot sequence name * Use boundaried IDs in SQLite too * Use IFNULL for SQLite * Use COALESCE in PostgreSQL * Review comments @Kegsay
Diffstat (limited to 'roomserver/storage/postgres/events_table.go')
-rw-r--r--roomserver/storage/postgres/events_table.go60
1 files changed, 55 insertions, 5 deletions
diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go
index 0cf0bd22..88c82083 100644
--- a/roomserver/storage/postgres/events_table.go
+++ b/roomserver/storage/postgres/events_table.go
@@ -19,6 +19,7 @@ import (
"context"
"database/sql"
"fmt"
+ "sort"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
@@ -88,6 +89,16 @@ const bulkSelectStateEventByIDSQL = "" +
" WHERE event_id = ANY($1)" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
+// Bulk look up of events by event NID, optionally filtering by the event type
+// or event state key NIDs if provided. (The CARDINALITY check will return true
+// if the provided arrays are empty, ergo no filtering).
+const bulkSelectStateEventByNIDSQL = "" +
+ "SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
+ " WHERE event_nid = ANY($1)" +
+ " AND (CARDINALITY($2::bigint[]) = 0 OR event_type_nid = ANY($2))" +
+ " AND (CARDINALITY($3::bigint[]) = 0 OR event_state_key_nid = ANY($3))" +
+ " ORDER BY event_type_nid, event_state_key_nid ASC"
+
const bulkSelectStateAtEventByIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
" WHERE event_id = ANY($1)"
@@ -127,6 +138,7 @@ type eventStatements struct {
insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt
bulkSelectStateEventByIDStmt *sql.Stmt
+ bulkSelectStateEventByNIDStmt *sql.Stmt
bulkSelectStateAtEventByIDStmt *sql.Stmt
updateEventStateStmt *sql.Stmt
selectEventSentToOutputStmt *sql.Stmt
@@ -140,17 +152,19 @@ type eventStatements struct {
selectRoomNIDsForEventNIDsStmt *sql.Stmt
}
-func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
- s := &eventStatements{}
+func createEventsTable(db *sql.DB) error {
_, err := db.Exec(eventsSchema)
- if err != nil {
- return nil, err
- }
+ return err
+}
+
+func prepareEventsTable(db *sql.DB) (tables.Events, error) {
+ s := &eventStatements{}
return s, shared.StatementList{
{&s.insertEventStmt, insertEventSQL},
{&s.selectEventStmt, selectEventSQL},
{&s.bulkSelectStateEventByIDStmt, bulkSelectStateEventByIDSQL},
+ {&s.bulkSelectStateEventByNIDStmt, bulkSelectStateEventByNIDSQL},
{&s.bulkSelectStateAtEventByIDStmt, bulkSelectStateAtEventByIDSQL},
{&s.updateEventStateStmt, updateEventStateSQL},
{&s.updateEventSentToOutputStmt, updateEventSentToOutputSQL},
@@ -238,6 +252,42 @@ func (s *eventStatements) BulkSelectStateEventByID(
return results, nil
}
+// bulkSelectStateEventByNID lookups a list of state events by event NID.
+// If any of the requested events are missing from the database it returns a types.MissingEventError
+func (s *eventStatements) BulkSelectStateEventByNID(
+ ctx context.Context, eventNIDs []types.EventNID,
+ stateKeyTuples []types.StateKeyTuple,
+) ([]types.StateEntry, error) {
+ tuples := stateKeyTupleSorter(stateKeyTuples)
+ sort.Sort(tuples)
+ eventTypeNIDArray, eventStateKeyNIDArray := tuples.typesAndStateKeysAsArrays()
+ rows, err := s.bulkSelectStateEventByNIDStmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs), eventTypeNIDArray, eventStateKeyNIDArray)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectStateEventByID: rows.close() failed")
+ // We know that we will only get as many results as event IDs
+ // because of the unique constraint on event IDs.
+ // So we can allocate an array of the correct size now.
+ // We might get fewer results than IDs so we adjust the length of the slice before returning it.
+ results := make([]types.StateEntry, len(eventNIDs))
+ i := 0
+ for ; rows.Next(); i++ {
+ result := &results[i]
+ if err = rows.Scan(
+ &result.EventTypeNID,
+ &result.EventStateKeyNID,
+ &result.EventNID,
+ ); err != nil {
+ return nil, err
+ }
+ }
+ if err = rows.Err(); err != nil {
+ return nil, err
+ }
+ return results[:i], nil
+}
+
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError.
// If we do not have the state for any of the requested events it returns a types.MissingEventError.