aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres/output_room_events_table.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/postgres/output_room_events_table.go')
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go36
1 files changed, 19 insertions, 17 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 0b53dfa9..5870bfd5 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -22,6 +22,7 @@ import (
"sort"
"github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/lib/pq"
@@ -120,39 +121,40 @@ type outputRoomEventsStatements struct {
selectStateInRangeStmt *sql.Stmt
}
-func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
- _, err = db.Exec(outputRoomEventsSchema)
+func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
+ s := &outputRoomEventsStatements{}
+ _, err := db.Exec(outputRoomEventsSchema)
if err != nil {
- return
+ return nil, err
}
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
- return
+ return nil, err
}
if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
- return
+ return nil, err
}
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
- return
+ return nil, err
}
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
- return
+ return nil, err
}
if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil {
- return
+ return nil, err
}
if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
- return
+ return nil, err
}
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
- return
+ return nil, err
}
- return
+ return s, nil
}
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
-func (s *outputRoomEventsStatements) selectStateInRange(
+func (s *outputRoomEventsStatements) SelectStateInRange(
ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
stateFilter *gomatrixserverlib.StateFilter,
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
@@ -233,7 +235,7 @@ func (s *outputRoomEventsStatements) selectStateInRange(
// MaxID returns the ID of the last inserted event in this table. 'txn' is optional. If it is not supplied,
// then this function should only ever be used at startup, as it will race with inserting events if it is
// done afterwards. If there are no inserted events, 0 is returned.
-func (s *outputRoomEventsStatements) selectMaxEventID(
+func (s *outputRoomEventsStatements) SelectMaxEventID(
ctx context.Context, txn *sql.Tx,
) (id int64, err error) {
var nullableID sql.NullInt64
@@ -247,7 +249,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID(
// InsertEvent into the output_room_events table. addState and removeState are an optional list of state event IDs. Returns the position
// of the inserted event.
-func (s *outputRoomEventsStatements) insertEvent(
+func (s *outputRoomEventsStatements) InsertEvent(
ctx context.Context, txn *sql.Tx,
event *gomatrixserverlib.HeaderedEvent, addState, removeState []string,
transactionID *api.TransactionID, excludeFromSync bool,
@@ -294,7 +296,7 @@ func (s *outputRoomEventsStatements) insertEvent(
// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude
// from sync.
-func (s *outputRoomEventsStatements) selectRecentEvents(
+func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int,
chronologicalOrder bool, onlySyncEvents bool,
@@ -327,7 +329,7 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
// selectEarlyEvents returns the earliest events in the given room, starting
// from a given position, up to a maximum of 'limit'.
-func (s *outputRoomEventsStatements) selectEarlyEvents(
+func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int,
) ([]types.StreamEvent, error) {
@@ -352,7 +354,7 @@ func (s *outputRoomEventsStatements) selectEarlyEvents(
// selectEvents returns the events for the given event IDs. If an event is
// missing from the database, it will be omitted.
-func (s *outputRoomEventsStatements) selectEvents(
+func (s *outputRoomEventsStatements) SelectEvents(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]types.StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectEventsStmt)