aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres/output_room_events_table.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/postgres/output_room_events_table.go')
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go22
1 files changed, 20 insertions, 2 deletions
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 14af6a94..a30e220b 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -427,7 +427,7 @@ func (s *outputRoomEventsStatements) SelectEarlyEvents(
// selectEvents returns the events for the given event IDs. If an event is
// missing from the database, it will be omitted.
func (s *outputRoomEventsStatements) SelectEvents(
- ctx context.Context, txn *sql.Tx, eventIDs []string,
+ ctx context.Context, txn *sql.Tx, eventIDs []string, preserveOrder bool,
) ([]types.StreamEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectEventsStmt)
rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
@@ -435,7 +435,25 @@ func (s *outputRoomEventsStatements) SelectEvents(
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectEvents: rows.close() failed")
- return rowsToStreamEvents(rows)
+ streamEvents, err := rowsToStreamEvents(rows)
+ if err != nil {
+ return nil, err
+ }
+ if preserveOrder {
+ eventMap := make(map[string]types.StreamEvent)
+ for _, ev := range streamEvents {
+ eventMap[ev.EventID()] = ev
+ }
+ var returnEvents []types.StreamEvent
+ for _, eventID := range eventIDs {
+ ev, ok := eventMap[eventID]
+ if ok {
+ returnEvents = append(returnEvents, ev)
+ }
+ }
+ return returnEvents, nil
+ }
+ return streamEvents, nil
}
func (s *outputRoomEventsStatements) DeleteEventsForRoom(