From 6d25bd6ca57f518404000c47d69bcbfadb4fd2ef Mon Sep 17 00:00:00 2001 From: kegsay Date: Fri, 8 Apr 2022 17:53:24 +0100 Subject: syncapi: add more tests; fix more bugs (#2338) * syncapi: add more tests; fix more bugs bugfixes: - The postgres impl of TopologyTable.SelectEventIDsInRange did not use the provided txn - The postgres impl of EventsTable.SelectEvents did not preserve the ordering of the input event IDs in the output events slice - The sqlite impl of EventsTable.SelectEvents did not use a bulk `IN ($1)` query. Added tests: - `TestGetEventsInRangeWithTopologyToken` - `TestOutputRoomEventsTable` - `TestTopologyTable` * -p 1 for now --- .../storage/postgres/output_room_events_table.go | 22 ++++++++++++++++++++-- .../postgres/output_room_events_topology_table.go | 4 ++-- 2 files changed, 22 insertions(+), 4 deletions(-) (limited to 'syncapi/storage/postgres') diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 14af6a94..a30e220b 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -427,7 +427,7 @@ func (s *outputRoomEventsStatements) SelectEarlyEvents( // selectEvents returns the events for the given event IDs. If an event is // missing from the database, it will be omitted. func (s *outputRoomEventsStatements) SelectEvents( - ctx context.Context, txn *sql.Tx, eventIDs []string, + ctx context.Context, txn *sql.Tx, eventIDs []string, preserveOrder bool, ) ([]types.StreamEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectEventsStmt) rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs)) @@ -435,7 +435,25 @@ func (s *outputRoomEventsStatements) SelectEvents( return nil, err } defer internal.CloseAndLogIfError(ctx, rows, "selectEvents: rows.close() failed") - return rowsToStreamEvents(rows) + streamEvents, err := rowsToStreamEvents(rows) + if err != nil { + return nil, err + } + if preserveOrder { + eventMap := make(map[string]types.StreamEvent) + for _, ev := range streamEvents { + eventMap[ev.EventID()] = ev + } + var returnEvents []types.StreamEvent + for _, eventID := range eventIDs { + ev, ok := eventMap[eventID] + if ok { + returnEvents = append(returnEvents, ev) + } + } + return returnEvents, nil + } + return streamEvents, nil } func (s *outputRoomEventsStatements) DeleteEventsForRoom( diff --git a/syncapi/storage/postgres/output_room_events_topology_table.go b/syncapi/storage/postgres/output_room_events_topology_table.go index 626386ba..90b3b008 100644 --- a/syncapi/storage/postgres/output_room_events_topology_table.go +++ b/syncapi/storage/postgres/output_room_events_topology_table.go @@ -148,9 +148,9 @@ func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange( // is requested or not. var stmt *sql.Stmt if chronologicalOrder { - stmt = s.selectEventIDsInRangeASCStmt + stmt = sqlutil.TxStmt(txn, s.selectEventIDsInRangeASCStmt) } else { - stmt = s.selectEventIDsInRangeDESCStmt + stmt = sqlutil.TxStmt(txn, s.selectEventIDsInRangeDESCStmt) } // Query the event IDs. -- cgit v1.2.3