aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres/output_room_events_table.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/postgres/output_room_events_table.go')
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go160
1 files changed, 121 insertions, 39 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 0075fc8d..59fb99aa 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -19,18 +19,17 @@ import (
"context"
"database/sql"
"encoding/json"
+ "fmt"
"sort"
+ "github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
-
- "github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib"
-
- "github.com/matrix-org/dendrite/internal/sqlutil"
)
const outputRoomEventsSchema = `
@@ -44,7 +43,7 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
-- This isn't a problem for us since we just want to order by this field.
id BIGINT PRIMARY KEY DEFAULT nextval('syncapi_stream_id'),
-- The event ID for the event
- event_id TEXT NOT NULL CONSTRAINT syncapi_event_id_idx UNIQUE,
+ event_id TEXT NOT NULL CONSTRAINT syncapi_output_room_event_id_idx UNIQUE,
-- The 'room_id' key for the event.
room_id TEXT NOT NULL,
-- The headered JSON for the event, containing potentially additional metadata such as
@@ -79,13 +78,16 @@ CREATE INDEX IF NOT EXISTS syncapi_output_room_events_room_id_idx ON syncapi_out
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_exclude_from_sync_idx ON syncapi_output_room_events (exclude_from_sync);
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_add_state_ids_idx ON syncapi_output_room_events ((add_state_ids IS NOT NULL));
CREATE INDEX IF NOT EXISTS syncapi_output_room_events_remove_state_ids_idx ON syncapi_output_room_events ((remove_state_ids IS NOT NULL));
+CREATE INDEX IF NOT EXISTS syncapi_output_room_events_recent_events_idx ON syncapi_output_room_events (room_id, exclude_from_sync, id, sender, type);
+
+
`
const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" +
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync, history_visibility" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
- "ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
+ "ON CONFLICT ON CONSTRAINT syncapi_output_room_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
"RETURNING id"
const selectEventsSQL = "" +
@@ -109,14 +111,29 @@ const selectRecentEventsSQL = "" +
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
" ORDER BY id DESC LIMIT $8"
-const selectRecentEventsForSyncSQL = "" +
- "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
- " WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
- " AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
- " AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
- " AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
- " AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
- " ORDER BY id DESC LIMIT $8"
+// selectRecentEventsForSyncSQL contains an optimization to get the recent events for a list of rooms, using a LATERAL JOIN
+// The sub select inside LATERAL () is executed for all room_ids it gets as a parameter $1
+const selectRecentEventsForSyncSQL = `
+WITH room_ids AS (
+ SELECT unnest($1::text[]) AS room_id
+)
+SELECT x.*
+FROM room_ids,
+ LATERAL (
+ SELECT room_id, event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility
+ FROM syncapi_output_room_events recent_events
+ WHERE
+ recent_events.room_id = room_ids.room_id
+ AND recent_events.exclude_from_sync = FALSE
+ AND id > $2 AND id <= $3
+ AND ( $4::text[] IS NULL OR sender = ANY($4) )
+ AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )
+ AND ( $6::text[] IS NULL OR type LIKE ANY($6) )
+ AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )
+ ORDER BY recent_events.id DESC
+ LIMIT $8
+ ) AS x
+`
const selectEarlyEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_visibility FROM syncapi_output_room_events" +
@@ -207,12 +224,30 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
return nil, err
}
+ migrationName := "syncapi: rename dupe index (output_room_events)"
+
+ var cName string
+ err = db.QueryRowContext(context.Background(), "select constraint_name from information_schema.table_constraints where table_name = 'syncapi_output_room_events' AND constraint_name = 'syncapi_event_id_idx'").Scan(&cName)
+ switch err {
+ case sql.ErrNoRows: // migration was already executed, as the index was renamed
+ if err = sqlutil.InsertMigration(context.Background(), db, migrationName); err != nil {
+ return nil, fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
+ }
+ case nil:
+ default:
+ return nil, err
+ }
+
m := sqlutil.NewMigrator(db)
m.AddMigrations(
sqlutil.Migration{
Version: "syncapi: add history visibility column (output_room_events)",
Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
},
+ sqlutil.Migration{
+ Version: migrationName,
+ Up: deltas.UpRenameOutputRoomEventsIndex,
+ },
)
err = m.Up(context.Background())
if err != nil {
@@ -398,9 +433,9 @@ func (s *outputRoomEventsStatements) InsertEvent(
// from sync.
func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
+ roomIDs []string, ra types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
chronologicalOrder bool, onlySyncEvents bool,
-) ([]types.StreamEvent, bool, error) {
+) (map[string]types.RecentEvents, error) {
var stmt *sql.Stmt
if onlySyncEvents {
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
@@ -408,8 +443,9 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
}
senders, notSenders := getSendersRoomEventFilter(eventFilter)
+
rows, err := stmt.QueryContext(
- ctx, roomID, r.Low(), r.High(),
+ ctx, pq.StringArray(roomIDs), ra.Low(), ra.High(),
pq.StringArray(senders),
pq.StringArray(notSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
@@ -417,34 +453,80 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
eventFilter.Limit+1,
)
if err != nil {
- return nil, false, err
+ return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRecentEvents: rows.close() failed")
- events, err := rowsToStreamEvents(rows)
- if err != nil {
- return nil, false, err
- }
- if chronologicalOrder {
- // The events need to be returned from oldest to latest, which isn't
- // necessary the way the SQL query returns them, so a sort is necessary to
- // ensure the events are in the right order in the slice.
- sort.SliceStable(events, func(i int, j int) bool {
- return events[i].StreamPosition < events[j].StreamPosition
+
+ result := make(map[string]types.RecentEvents)
+
+ for rows.Next() {
+ var (
+ roomID string
+ eventID string
+ streamPos types.StreamPosition
+ eventBytes []byte
+ excludeFromSync bool
+ sessionID *int64
+ txnID *string
+ transactionID *api.TransactionID
+ historyVisibility gomatrixserverlib.HistoryVisibility
+ )
+ if err := rows.Scan(&roomID, &eventID, &streamPos, &eventBytes, &sessionID, &excludeFromSync, &txnID, &historyVisibility); err != nil {
+ return nil, err
+ }
+ // TODO: Handle redacted events
+ var ev gomatrixserverlib.HeaderedEvent
+ if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
+ return nil, err
+ }
+
+ if sessionID != nil && txnID != nil {
+ transactionID = &api.TransactionID{
+ SessionID: *sessionID,
+ TransactionID: *txnID,
+ }
+ }
+
+ r := result[roomID]
+
+ ev.Visibility = historyVisibility
+ r.Events = append(r.Events, types.StreamEvent{
+ HeaderedEvent: &ev,
+ StreamPosition: streamPos,
+ TransactionID: transactionID,
+ ExcludeFromSync: excludeFromSync,
})
+
+ result[roomID] = r
}
- // we queried for 1 more than the limit, so if we returned one more mark limited=true
- limited := false
- if len(events) > eventFilter.Limit {
- limited = true
- // re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
- if chronologicalOrder {
- events = events[1:]
- } else {
- events = events[:len(events)-1]
+
+ if chronologicalOrder {
+ for roomID, evs := range result {
+ // The events need to be returned from oldest to latest, which isn't
+ // necessary the way the SQL query returns them, so a sort is necessary to
+ // ensure the events are in the right order in the slice.
+ sort.SliceStable(evs.Events, func(i int, j int) bool {
+ return evs.Events[i].StreamPosition < evs.Events[j].StreamPosition
+ })
+
+ if len(evs.Events) > eventFilter.Limit {
+ evs.Limited = true
+ evs.Events = evs.Events[1:]
+ }
+
+ result[roomID] = evs
}
- }
+ } else {
+ for roomID, evs := range result {
+ if len(evs.Events) > eventFilter.Limit {
+ evs.Limited = true
+ evs.Events = evs.Events[:len(evs.Events)-1]
+ }
- return events, limited, nil
+ result[roomID] = evs
+ }
+ }
+ return result, rows.Err()
}
// selectEarlyEvents returns the earliest events in the given room, starting