From 2b5052eccfca49e736b39a9879ebde2ab196f6da Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 15 May 2020 09:41:12 +0100 Subject: Add Range (#1037) * Add Range * Use Range --- syncapi/storage/postgres/account_data_table.go | 11 ++--------- syncapi/storage/postgres/invites_table.go | 4 ++-- syncapi/storage/postgres/output_room_events_table.go | 16 ++++++++-------- 3 files changed, 12 insertions(+), 19 deletions(-) (limited to 'syncapi/storage/postgres') diff --git a/syncapi/storage/postgres/account_data_table.go b/syncapi/storage/postgres/account_data_table.go index 58fb2198..a5e0c121 100644 --- a/syncapi/storage/postgres/account_data_table.go +++ b/syncapi/storage/postgres/account_data_table.go @@ -100,19 +100,12 @@ func (s *accountDataStatements) InsertAccountData( func (s *accountDataStatements) SelectAccountDataInRange( ctx context.Context, userID string, - oldPos, newPos types.StreamPosition, + r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter, ) (data map[string][]string, err error) { data = make(map[string][]string) - // If both positions are the same, it means that the data was saved after the - // latest room event. In that case, we need to decrement the old position as - // it would prevent the SQL request from returning anything. - if oldPos == newPos { - oldPos-- - } - - rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos, + rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High(), pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.Types)), pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.NotTypes)), accountDataEventFilter.Limit, diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go index 78ca4d6d..01f2e7f4 100644 --- a/syncapi/storage/postgres/invites_table.go +++ b/syncapi/storage/postgres/invites_table.go @@ -117,10 +117,10 @@ func (s *inviteEventsStatements) DeleteInviteEvent( // selectInviteEventsInRange returns a map of room ID to invite event for the // active invites for the target user ID in the supplied range. func (s *inviteEventsStatements) SelectInviteEventsInRange( - ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition, + ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range, ) (map[string]gomatrixserverlib.HeaderedEvent, error) { stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt) - rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos) + rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High()) if err != nil { return nil, err } diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 5870bfd5..5020d1e7 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -155,13 +155,13 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { // Results are bucketed based on the room ID. If the same state is overwritten multiple times between the // two positions, only the most recent state is returned. func (s *outputRoomEventsStatements) SelectStateInRange( - ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, + ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter, ) (map[string]map[string]bool, map[string]types.StreamEvent, error) { stmt := common.TxStmt(txn, s.selectStateInRangeStmt) rows, err := stmt.QueryContext( - ctx, oldPos, newPos, + ctx, r.Low(), r.High(), pq.StringArray(stateFilter.Senders), pq.StringArray(stateFilter.NotSenders), pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)), @@ -198,8 +198,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange( // since it'll just mark the event as not being needed. if len(addIDs) < len(delIDs) { log.WithFields(log.Fields{ - "since": oldPos, - "current": newPos, + "since": r.From, + "current": r.To, "adds": addIDs, "dels": delIDs, }).Warn("StateBetween: ignoring deleted state") @@ -298,7 +298,7 @@ func (s *outputRoomEventsStatements) InsertEvent( // from sync. func (s *outputRoomEventsStatements) SelectRecentEvents( ctx context.Context, txn *sql.Tx, - roomID string, fromPos, toPos types.StreamPosition, limit int, + roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool, ) ([]types.StreamEvent, error) { var stmt *sql.Stmt @@ -307,7 +307,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( } else { stmt = common.TxStmt(txn, s.selectRecentEventsStmt) } - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) if err != nil { return nil, err } @@ -331,10 +331,10 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( // from a given position, up to a maximum of 'limit'. func (s *outputRoomEventsStatements) SelectEarlyEvents( ctx context.Context, txn *sql.Tx, - roomID string, fromPos, toPos types.StreamPosition, limit int, + roomID string, r types.Range, limit int, ) ([]types.StreamEvent, error) { stmt := common.TxStmt(txn, s.selectEarlyEventsStmt) - rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit) + rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit) if err != nil { return nil, err } -- cgit v1.2.3