aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-02-21 17:12:22 +0100
committerGitHub <noreply@github.com>2022-02-21 17:12:22 +0100
commitcf525d1f619cc65df244c20ec0f220ace22ae2bd (patch)
tree40e72b7e7da95956892d45e1613ad7eae206d147 /syncapi/storage/postgres
parent280e9b19a195e3ce19f0fa5bc0e94bb09e397a23 (diff)
Implement `/context` (#2207)
* Add QueryEventsAfter * Add /context * Make all tests pass on sqlite * Add queries to get the events for /context requests * Move /context to the syncapi * Revert "Add QueryEventsAfter" This reverts commit 440a771d10632622e8c65d35fe90f0804bc98862. * Simplify getting the required events * Apply RoomEventFilter when getting events * Add passing tests * Remove logging * Remove unused SQL statements Update comments & add TODO
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go142
1 files changed, 114 insertions, 28 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 44de02c9..d4cc4f3f 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -130,6 +130,25 @@ const selectStateInRangeSQL = "" +
const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
+const selectContextEventSQL = "" +
+ "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND event_id = $2"
+
+const selectContextBeforeEventSQL = "" +
+ "SELECT headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id < $2" +
+ " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
+ " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
+ " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
+ " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
+ " ORDER BY id DESC LIMIT $3"
+
+const selectContextAfterEventSQL = "" +
+ "SELECT id, headered_event_json FROM syncapi_output_room_events WHERE room_id = $1 AND id > $2" +
+ " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
+ " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
+ " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
+ " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
+ " ORDER BY id ASC LIMIT $3"
+
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
@@ -140,6 +159,9 @@ type outputRoomEventsStatements struct {
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
+ selectContextEventStmt *sql.Stmt
+ selectContextBeforeEventStmt *sql.Stmt
+ selectContextAfterEventStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@@ -148,34 +170,20 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
if err != nil {
return nil, err
}
- if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
- return nil, err
- }
- if s.selectEventsStmt, err = db.Prepare(selectEventsSQL); err != nil {
- return nil, err
- }
- if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
- return nil, err
- }
- if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
- return nil, err
- }
- if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil {
- return nil, err
- }
- if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
- return nil, err
- }
- if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
- return nil, err
- }
- if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
- return nil, err
- }
- if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
- return nil, err
- }
- return s, nil
+ return s, sqlutil.StatementList{
+ {&s.insertEventStmt, insertEventSQL},
+ {&s.selectEventsStmt, selectEventsSQL},
+ {&s.selectMaxEventIDStmt, selectMaxEventIDSQL},
+ {&s.selectRecentEventsStmt, selectRecentEventsSQL},
+ {&s.selectRecentEventsForSyncStmt, selectRecentEventsForSyncSQL},
+ {&s.selectEarlyEventsStmt, selectEarlyEventsSQL},
+ {&s.selectStateInRangeStmt, selectStateInRangeSQL},
+ {&s.updateEventJSONStmt, updateEventJSONSQL},
+ {&s.deleteEventsForRoomStmt, deleteEventsForRoomSQL},
+ {&s.selectContextEventStmt, selectContextEventSQL},
+ {&s.selectContextBeforeEventStmt, selectContextBeforeEventSQL},
+ {&s.selectContextAfterEventStmt, selectContextAfterEventSQL},
+ }.Prepare(db)
}
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
@@ -436,6 +444,84 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err
}
+func (s *outputRoomEventsStatements) SelectContextEvent(ctx context.Context, txn *sql.Tx, roomID, eventID string) (id int, evt gomatrixserverlib.HeaderedEvent, err error) {
+ row := sqlutil.TxStmt(txn, s.selectContextEventStmt).QueryRowContext(ctx, roomID, eventID)
+
+ var eventAsString string
+ if err = row.Scan(&id, &eventAsString); err != nil {
+ return 0, evt, err
+ }
+
+ if err = json.Unmarshal([]byte(eventAsString), &evt); err != nil {
+ return 0, evt, err
+ }
+ return id, evt, nil
+}
+
+func (s *outputRoomEventsStatements) SelectContextBeforeEvent(
+ ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
+) (evts []*gomatrixserverlib.HeaderedEvent, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectContextBeforeEventStmt).QueryContext(
+ ctx, roomID, id, filter.Limit,
+ pq.StringArray(filter.Senders),
+ pq.StringArray(filter.NotSenders),
+ pq.StringArray(filterConvertTypeWildcardToSQL(filter.Types)),
+ pq.StringArray(filterConvertTypeWildcardToSQL(filter.NotTypes)),
+ )
+ if err != nil {
+ return
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var (
+ eventBytes []byte
+ evt *gomatrixserverlib.HeaderedEvent
+ )
+ if err = rows.Scan(&eventBytes); err != nil {
+ return evts, err
+ }
+ if err = json.Unmarshal(eventBytes, &evt); err != nil {
+ return evts, err
+ }
+ evts = append(evts, evt)
+ }
+
+ return evts, rows.Err()
+}
+
+func (s *outputRoomEventsStatements) SelectContextAfterEvent(
+ ctx context.Context, txn *sql.Tx, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter,
+) (lastID int, evts []*gomatrixserverlib.HeaderedEvent, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectContextAfterEventStmt).QueryContext(
+ ctx, roomID, id, filter.Limit,
+ pq.StringArray(filter.Senders),
+ pq.StringArray(filter.NotSenders),
+ pq.StringArray(filterConvertTypeWildcardToSQL(filter.Types)),
+ pq.StringArray(filterConvertTypeWildcardToSQL(filter.NotTypes)),
+ )
+ if err != nil {
+ return
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var (
+ eventBytes []byte
+ evt *gomatrixserverlib.HeaderedEvent
+ )
+ if err = rows.Scan(&lastID, &eventBytes); err != nil {
+ return 0, evts, err
+ }
+ if err = json.Unmarshal(eventBytes, &evt); err != nil {
+ return 0, evts, err
+ }
+ evts = append(evts, evt)
+ }
+
+ return lastID, evts, rows.Err()
+}
+
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent
for rows.Next() {