aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/postgres
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-05-15 09:41:12 +0100
committerGitHub <noreply@github.com>2020-05-15 09:41:12 +0100
commit2b5052eccfca49e736b39a9879ebde2ab196f6da (patch)
treeebd43e4ea84cf403a0c1530833a2751faa2d255d /syncapi/storage/postgres
parent419ff150d41a3d0de25f0e8e66baf36948bcfbc1 (diff)
Add Range (#1037)
* Add Range * Use Range
Diffstat (limited to 'syncapi/storage/postgres')
-rw-r--r--syncapi/storage/postgres/account_data_table.go11
-rw-r--r--syncapi/storage/postgres/invites_table.go4
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go16
3 files changed, 12 insertions, 19 deletions
diff --git a/syncapi/storage/postgres/account_data_table.go b/syncapi/storage/postgres/account_data_table.go
index 58fb2198..a5e0c121 100644
--- a/syncapi/storage/postgres/account_data_table.go
+++ b/syncapi/storage/postgres/account_data_table.go
@@ -100,19 +100,12 @@ func (s *accountDataStatements) InsertAccountData(
func (s *accountDataStatements) SelectAccountDataInRange(
ctx context.Context,
userID string,
- oldPos, newPos types.StreamPosition,
+ r types.Range,
accountDataEventFilter *gomatrixserverlib.EventFilter,
) (data map[string][]string, err error) {
data = make(map[string][]string)
- // If both positions are the same, it means that the data was saved after the
- // latest room event. In that case, we need to decrement the old position as
- // it would prevent the SQL request from returning anything.
- if oldPos == newPos {
- oldPos--
- }
-
- rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos,
+ rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High(),
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.NotTypes)),
accountDataEventFilter.Limit,
diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go
index 78ca4d6d..01f2e7f4 100644
--- a/syncapi/storage/postgres/invites_table.go
+++ b/syncapi/storage/postgres/invites_table.go
@@ -117,10 +117,10 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// selectInviteEventsInRange returns a map of room ID to invite event for the
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
- ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
) (map[string]gomatrixserverlib.HeaderedEvent, error) {
stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt)
- rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos)
+ rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
if err != nil {
return nil, err
}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 5870bfd5..5020d1e7 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -155,13 +155,13 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) SelectStateInRange(
- ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, r types.Range,
stateFilter *gomatrixserverlib.StateFilter,
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
rows, err := stmt.QueryContext(
- ctx, oldPos, newPos,
+ ctx, r.Low(), r.High(),
pq.StringArray(stateFilter.Senders),
pq.StringArray(stateFilter.NotSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
@@ -198,8 +198,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
// since it'll just mark the event as not being needed.
if len(addIDs) < len(delIDs) {
log.WithFields(log.Fields{
- "since": oldPos,
- "current": newPos,
+ "since": r.From,
+ "current": r.To,
"adds": addIDs,
"dels": delIDs,
}).Warn("StateBetween: ignoring deleted state")
@@ -298,7 +298,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
// from sync.
func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, fromPos, toPos types.StreamPosition, limit int,
+ roomID string, r types.Range, limit int,
chronologicalOrder bool, onlySyncEvents bool,
) ([]types.StreamEvent, error) {
var stmt *sql.Stmt
@@ -307,7 +307,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
} else {
stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
}
- rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
+ rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
if err != nil {
return nil, err
}
@@ -331,10 +331,10 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
// from a given position, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, fromPos, toPos types.StreamPosition, limit int,
+ roomID string, r types.Range, limit int,
) ([]types.StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectEarlyEventsStmt)
- rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
+ rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
if err != nil {
return nil, err
}