aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go28
1 files changed, 28 insertions, 0 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 041f9906..20a9ea42 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -166,6 +166,8 @@ const selectContextAfterEventSQL = "" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id ASC LIMIT $3"
+const selectSearchSQL = "SELECT id, event_id, headered_event_json FROM syncapi_output_room_events WHERE id > $1 AND type = ANY($2) ORDER BY id ASC LIMIT $3"
+
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
@@ -180,6 +182,7 @@ type outputRoomEventsStatements struct {
selectContextEventStmt *sql.Stmt
selectContextBeforeEventStmt *sql.Stmt
selectContextAfterEventStmt *sql.Stmt
+ selectSearchStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@@ -215,6 +218,7 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
{&s.selectContextEventStmt, selectContextEventSQL},
{&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
{&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
+ {&s.selectSearchStmt, selectSearchSQL},
}.Prepare(db)
}
@@ -632,3 +636,27 @@ func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
}
return result, rows.Err()
}
+
+func (s *outputRoomEventsStatements) ReIndex(ctx context.Context, txn *sql.Tx, limit, afterID int64, types []string) (map[int64]gomatrixserverlib.HeaderedEvent, error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectSearchStmt).QueryContext(ctx, afterID, pq.StringArray(types), limit)
+ if err != nil {
+ return nil, err
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "rows.close() failed")
+
+ var eventID string
+ var id int64
+ result := make(map[int64]gomatrixserverlib.HeaderedEvent)
+ for rows.Next() {
+ var ev gomatrixserverlib.HeaderedEvent
+ var eventBytes []byte
+ if err = rows.Scan(&id, &eventID, &eventBytes); err != nil {
+ return nil, err
+ }
+ if err = ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ return nil, err
+ }
+ result[id] = ev
+ }
+ return result, rows.Err()
+}