diff options
Diffstat (limited to 'syncapi/storage/postgres/output_room_events_table.go')
-rw-r--r-- | syncapi/storage/postgres/output_room_events_table.go | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 0b53dfa9..5870bfd5 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -22,6 +22,7 @@ import ( "sort" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/lib/pq" @@ -120,39 +121,40 @@ type outputRoomEventsStatements struct { selectStateInRangeStmt *sql.Stmt } -func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(outputRoomEventsSchema) +func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { + s := &outputRoomEventsStatements{} + _, err := db.Exec(outputRoomEventsSchema) if err != nil { - return + return nil, err } if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { - return + return nil, err } if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil { - return + return nil, err } if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil { - return + return nil, err } if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil { - return + return nil, err } if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil { - return + return nil, err } if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil { - return + return nil, err } if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil { - return + return nil, err } - return + return s, nil } // selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos. // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. -func (s *outputRoomEventsStatements) selectStateInRange( +func (s *outputRoomEventsStatements) SelectStateInRange( ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, stateFilter *gomatrixserverlib.StateFilter, ) (map[string]map[string]bool, map[string]types.StreamEvent, error) { @@ -233,7 +235,7 @@ func (s *outputRoomEventsStatements) selectStateInRange( // MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied, // then this function should only ever be used at startup, as it will race with inserting events if it is // done afterwards. If there are no inserted events, 0 is returned. -func (s *outputRoomEventsStatements) selectMaxEventID( +func (s *outputRoomEventsStatements) SelectMaxEventID( ctx context.Context, txn *sql.Tx, ) (id int64, err error) { var nullableID sql.NullInt64 @@ -247,7 +249,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID( // InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position // of the inserted event. -func (s *outputRoomEventsStatements) insertEvent( +func (s *outputRoomEventsStatements) InsertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool, @@ -294,7 +296,7 @@ func (s *outputRoomEventsStatements) insertEvent( // selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'. // If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude // from sync. -func (s *outputRoomEventsStatements) selectRecentEvents( +func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, chronologicalOrder bool, onlySyncEvents bool, @@ -327,7 +329,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents( // selectEarlyEvents returns the earliest events in the given room, starting // from a given position, up to a maximum of 'limit'. -func (s *outputRoomEventsStatements) selectEarlyEvents( +func (s *outputRoomEventsStatements) SelectEarlyEvents( ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, ) ([]types.StreamEvent, error) { @@ -352,7 +354,7 @@ func (s *outputRoomEventsStatements) selectEarlyEvents( // selectEvents returns the events for the given event IDs. If an event is // missing from the database, it will be omitted. -func (s *outputRoomEventsStatements) selectEvents( +func (s *outputRoomEventsStatements) SelectEvents( ctx context.Context, txn *sql.Tx, eventIDs []string, ) ([]types.StreamEvent, error) { stmt := common.TxStmt(txn, s.selectEventsStmt) |