aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--syncapi/storage/interface.go2
-rw-r--r--syncapi/storage/postgres/account_data_table.go11
-rw-r--r--syncapi/storage/postgres/invites_table.go4
-rw-r--r--syncapi/storage/postgres/output_room_events_table.go16
-rw-r--r--syncapi/storage/shared/syncserver.go57
-rw-r--r--syncapi/storage/sqlite3/account_data_table.go11
-rw-r--r--syncapi/storage/sqlite3/invites_table.go4
-rw-r--r--syncapi/storage/sqlite3/output_room_events_table.go16
-rw-r--r--syncapi/storage/tables/interface.go18
-rw-r--r--syncapi/sync/requestpool.go15
-rw-r--r--syncapi/types/types.go28
11 files changed, 108 insertions, 74 deletions
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 22716789..eba008b3 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -64,7 +64,7 @@ type Database interface {
// Returns a map following the format data[roomID] = []dataTypes
// If no data is retrieved, returns an empty map
// If there was an issue with the retrieval, returns an error
- GetAccountDataInRange(ctx context.Context, userID string, oldPos, newPos types.StreamPosition, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error)
+ GetAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter) (map[string][]string, error)
// UpsertAccountData keeps track of new or updated account data, by saving the type
// of the new/updated data, and the user ID and room ID the data is related to (empty)
// room ID means the data isn't specific to any room)
diff --git a/syncapi/storage/postgres/account_data_table.go b/syncapi/storage/postgres/account_data_table.go
index 58fb2198..a5e0c121 100644
--- a/syncapi/storage/postgres/account_data_table.go
+++ b/syncapi/storage/postgres/account_data_table.go
@@ -100,19 +100,12 @@ func (s *accountDataStatements) InsertAccountData(
func (s *accountDataStatements) SelectAccountDataInRange(
ctx context.Context,
userID string,
- oldPos, newPos types.StreamPosition,
+ r types.Range,
accountDataEventFilter *gomatrixserverlib.EventFilter,
) (data map[string][]string, err error) {
data = make(map[string][]string)
- // If both positions are the same, it means that the data was saved after the
- // latest room event. In that case, we need to decrement the old position as
- // it would prevent the SQL request from returning anything.
- if oldPos == newPos {
- oldPos--
- }
-
- rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos,
+ rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High(),
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.Types)),
pq.StringArray(filterConvertTypeWildcardToSQL(accountDataEventFilter.NotTypes)),
accountDataEventFilter.Limit,
diff --git a/syncapi/storage/postgres/invites_table.go b/syncapi/storage/postgres/invites_table.go
index 78ca4d6d..01f2e7f4 100644
--- a/syncapi/storage/postgres/invites_table.go
+++ b/syncapi/storage/postgres/invites_table.go
@@ -117,10 +117,10 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// selectInviteEventsInRange returns a map of room ID to invite event for the
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
- ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
) (map[string]gomatrixserverlib.HeaderedEvent, error) {
stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt)
- rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos)
+ rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
if err != nil {
return nil, err
}
diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go
index 5870bfd5..5020d1e7 100644
--- a/syncapi/storage/postgres/output_room_events_table.go
+++ b/syncapi/storage/postgres/output_room_events_table.go
@@ -155,13 +155,13 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) SelectStateInRange(
- ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, r types.Range,
stateFilter *gomatrixserverlib.StateFilter,
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
rows, err := stmt.QueryContext(
- ctx, oldPos, newPos,
+ ctx, r.Low(), r.High(),
pq.StringArray(stateFilter.Senders),
pq.StringArray(stateFilter.NotSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
@@ -198,8 +198,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
// since it'll just mark the event as not being needed.
if len(addIDs) < len(delIDs) {
log.WithFields(log.Fields{
- "since": oldPos,
- "current": newPos,
+ "since": r.From,
+ "current": r.To,
"adds": addIDs,
"dels": delIDs,
}).Warn("StateBetween: ignoring deleted state")
@@ -298,7 +298,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
// from sync.
func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, fromPos, toPos types.StreamPosition, limit int,
+ roomID string, r types.Range, limit int,
chronologicalOrder bool, onlySyncEvents bool,
) ([]types.StreamEvent, error) {
var stmt *sql.Stmt
@@ -307,7 +307,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
} else {
stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
}
- rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
+ rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
if err != nil {
return nil, err
}
@@ -331,10 +331,10 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
// from a given position, up to a maximum of 'limit'.
func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, fromPos, toPos types.StreamPosition, limit int,
+ roomID string, r types.Range, limit int,
) ([]types.StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectEarlyEventsStmt)
- rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
+ rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
if err != nil {
return nil, err
}
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index e03a6b9f..be1f9c7a 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -54,17 +54,22 @@ func (d *Database) GetEventsInStreamingRange(
roomID string, limit int,
backwardOrdering bool,
) (events []types.StreamEvent, err error) {
+ r := types.Range{
+ From: from.PDUPosition(),
+ To: to.PDUPosition(),
+ Backwards: backwardOrdering,
+ }
if backwardOrdering {
// When using backward ordering, we want the most recent events first.
if events, err = d.OutputEvents.SelectRecentEvents(
- ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false,
+ ctx, nil, roomID, r, limit, false, false,
); err != nil {
return
}
} else {
// When using forward ordering, we want the least recent events first.
if events, err = d.OutputEvents.SelectEarlyEvents(
- ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit,
+ ctx, nil, roomID, r, limit,
); err != nil {
return
}
@@ -167,10 +172,10 @@ func (d *Database) RetireInviteEvent(
// If no data is retrieved, returns an empty map
// If there was an issue with the retrieval, returns an error
func (d *Database) GetAccountDataInRange(
- ctx context.Context, userID string, oldPos, newPos types.StreamPosition,
+ ctx context.Context, userID string, r types.Range,
accountDataFilterPart *gomatrixserverlib.EventFilter,
) (map[string][]string, error) {
- return d.AccountData.SelectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart)
+ return d.AccountData.SelectAccountDataInRange(ctx, userID, r, accountDataFilterPart)
}
// UpsertAccountData keeps track of new or updated account data, by saving the type
@@ -417,7 +422,7 @@ func (d *Database) syncPositionTx(
func (d *Database) addPDUDeltaToResponse(
ctx context.Context,
device authtypes.Device,
- fromPos, toPos types.StreamPosition,
+ r types.Range,
numRecentEventsPerRoom int,
wantFullState bool,
res *types.Response,
@@ -443,11 +448,11 @@ func (d *Database) addPDUDeltaToResponse(
var deltas []stateDelta
if !wantFullState {
deltas, joinedRoomIDs, err = d.getStateDeltas(
- ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
+ ctx, &device, txn, r, device.UserID, &stateFilter,
)
} else {
deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
- ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
+ ctx, &device, txn, r, device.UserID, &stateFilter,
)
}
if err != nil {
@@ -455,14 +460,14 @@ func (d *Database) addPDUDeltaToResponse(
}
for _, delta := range deltas {
- err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
+ err = d.addRoomDeltaToResponse(ctx, &device, txn, r, delta, numRecentEventsPerRoom, res)
if err != nil {
return nil, err
}
}
// TODO: This should be done in getStateDeltas
- if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
+ if err = d.addInvitesToResponse(ctx, txn, device.UserID, r, res); err != nil {
return nil, err
}
@@ -534,8 +539,12 @@ func (d *Database) IncrementalSync(
var joinedRoomIDs []string
var err error
if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState {
+ r := types.Range{
+ From: fromPos.PDUPosition(),
+ To: toPos.PDUPosition(),
+ }
joinedRoomIDs, err = d.addPDUDeltaToResponse(
- ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res,
+ ctx, device, r, numRecentEventsPerRoom, wantFullState, res,
)
} else {
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(
@@ -589,6 +598,10 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
if err != nil {
return
}
+ r := types.Range{
+ From: 0,
+ To: toPos.PDUPosition(),
+ }
res = types.NewResponse(toPos)
@@ -611,8 +624,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
recentStreamEvents, err = d.OutputEvents.SelectRecentEvents(
- ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(),
- numRecentEventsPerRoom, true, true,
+ ctx, txn, roomID, r, numRecentEventsPerRoom, true, true,
)
if err != nil {
return
@@ -644,7 +656,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
res.Rooms.Join[roomID] = *jr
}
- if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
+ if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil {
return
}
@@ -686,11 +698,11 @@ var txReadOnlySnapshot = sql.TxOptions{
func (d *Database) addInvitesToResponse(
ctx context.Context, txn *sql.Tx,
userID string,
- fromPos, toPos types.StreamPosition,
+ r types.Range,
res *types.Response,
) error {
invites, err := d.Invites.SelectInviteEventsInRange(
- ctx, txn, userID, fromPos, toPos,
+ ctx, txn, userID, r,
)
if err != nil {
return err
@@ -726,12 +738,11 @@ func (d *Database) addRoomDeltaToResponse(
ctx context.Context,
device *authtypes.Device,
txn *sql.Tx,
- fromPos, toPos types.StreamPosition,
+ r types.Range,
delta stateDelta,
numRecentEventsPerRoom int,
res *types.Response,
) error {
- endPos := toPos
if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
@@ -739,10 +750,10 @@ func (d *Database) addRoomDeltaToResponse(
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
- endPos = delta.membershipPos
+ r.To = delta.membershipPos
}
recentStreamEvents, err := d.OutputEvents.SelectRecentEvents(
- ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos),
+ ctx, txn, delta.roomID, r,
numRecentEventsPerRoom, true, true,
)
if err != nil {
@@ -872,7 +883,7 @@ func (d *Database) fetchMissingStateEvents(
// A list of joined room IDs is also returned in case the caller needs it.
func (d *Database) getStateDeltas(
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
- fromPos, toPos types.StreamPosition, userID string,
+ r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]stateDelta, []string, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
@@ -886,7 +897,7 @@ func (d *Database) getStateDeltas(
var deltas []stateDelta
// get all the state events ever between these two positions
- stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
+ stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
if err != nil {
return nil, nil, err
}
@@ -947,7 +958,7 @@ func (d *Database) getStateDeltas(
// updates for other rooms.
func (d *Database) getStateDeltasForFullStateSync(
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
- fromPos, toPos types.StreamPosition, userID string,
+ r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]stateDelta, []string, error) {
joinedRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
@@ -972,7 +983,7 @@ func (d *Database) getStateDeltasForFullStateSync(
}
// Get all the state events ever between these two positions
- stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
+ stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
if err != nil {
return nil, nil, err
}
diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go
index e5f2417b..435a399c 100644
--- a/syncapi/storage/sqlite3/account_data_table.go
+++ b/syncapi/storage/sqlite3/account_data_table.go
@@ -91,19 +91,12 @@ func (s *accountDataStatements) InsertAccountData(
func (s *accountDataStatements) SelectAccountDataInRange(
ctx context.Context,
userID string,
- oldPos, newPos types.StreamPosition,
+ r types.Range,
accountDataFilterPart *gomatrixserverlib.EventFilter,
) (data map[string][]string, err error) {
data = make(map[string][]string)
- // If both positions are the same, it means that the data was saved after the
- // latest room event. In that case, we need to decrement the old position as
- // it would prevent the SQL request from returning anything.
- if oldPos == newPos {
- oldPos--
- }
-
- rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, oldPos, newPos)
+ rows, err := s.selectAccountDataInRangeStmt.QueryContext(ctx, userID, r.Low(), r.High())
if err != nil {
return
}
diff --git a/syncapi/storage/sqlite3/invites_table.go b/syncapi/storage/sqlite3/invites_table.go
index 26b3a316..9cb184f2 100644
--- a/syncapi/storage/sqlite3/invites_table.go
+++ b/syncapi/storage/sqlite3/invites_table.go
@@ -121,10 +121,10 @@ func (s *inviteEventsStatements) DeleteInviteEvent(
// selectInviteEventsInRange returns a map of room ID to invite event for the
// active invites for the target user ID in the supplied range.
func (s *inviteEventsStatements) SelectInviteEventsInRange(
- ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range,
) (map[string]gomatrixserverlib.HeaderedEvent, error) {
stmt := common.TxStmt(txn, s.selectInviteEventsInRangeStmt)
- rows, err := stmt.QueryContext(ctx, targetUserID, startPos, endPos)
+ rows, err := stmt.QueryContext(ctx, targetUserID, r.Low(), r.High())
if err != nil {
return nil, err
}
diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go
index c1647a14..c89e007e 100644
--- a/syncapi/storage/sqlite3/output_room_events_table.go
+++ b/syncapi/storage/sqlite3/output_room_events_table.go
@@ -146,13 +146,13 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.
func (s *outputRoomEventsStatements) SelectStateInRange(
- ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition,
+ ctx context.Context, txn *sql.Tx, r types.Range,
stateFilterPart *gomatrixserverlib.StateFilter,
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectStateInRangeStmt)
rows, err := stmt.QueryContext(
- ctx, oldPos, newPos,
+ ctx, r.Low(), r.High(),
/*pq.StringArray(stateFilterPart.Senders),
pq.StringArray(stateFilterPart.NotSenders),
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
@@ -195,8 +195,8 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
// since it'll just mark the event as not being needed.
if len(addIDs) < len(delIDs) {
log.WithFields(log.Fields{
- "since": oldPos,
- "current": newPos,
+ "since": r.From,
+ "current": r.To,
"adds": addIDsJSON,
"dels": delIDsJSON,
}).Warn("StateBetween: ignoring deleted state")
@@ -308,7 +308,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
func (s *outputRoomEventsStatements) SelectRecentEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, fromPos, toPos types.StreamPosition, limit int,
+ roomID string, r types.Range, limit int,
chronologicalOrder bool, onlySyncEvents bool,
) ([]types.StreamEvent, error) {
var stmt *sql.Stmt
@@ -318,7 +318,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
}
- rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
+ rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
if err != nil {
return nil, err
}
@@ -340,10 +340,10 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
func (s *outputRoomEventsStatements) SelectEarlyEvents(
ctx context.Context, txn *sql.Tx,
- roomID string, fromPos, toPos types.StreamPosition, limit int,
+ roomID string, r types.Range, limit int,
) ([]types.StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectEarlyEventsStmt)
- rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
+ rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
if err != nil {
return nil, err
}
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 1e5351b5..f31bdc2e 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -11,29 +11,29 @@ import (
type AccountData interface {
InsertAccountData(ctx context.Context, txn *sql.Tx, userID, roomID, dataType string) (pos types.StreamPosition, err error)
- // SelectAccountDataInRange returns a map of room ID to a list of `dataType`. The range is exclusive of `lowPos` and inclusive of `hiPos`.
- SelectAccountDataInRange(ctx context.Context, userID string, lowPos, hiPos types.StreamPosition, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error)
+ // SelectAccountDataInRange returns a map of room ID to a list of `dataType`.
+ SelectAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error)
SelectMaxAccountDataID(ctx context.Context, txn *sql.Tx) (id int64, err error)
}
type Invites interface {
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
DeleteInviteEvent(ctx context.Context, inviteEventID string) error
- // SelectInviteEventsInRange returns a map of room ID to invite events. The range is exclusive of `startPos` and inclusive of `endPos`.
- SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, startPos, endPos types.StreamPosition) (map[string]gomatrixserverlib.HeaderedEvent, error)
+ // SelectInviteEventsInRange returns a map of room ID to invite events.
+ SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (map[string]gomatrixserverlib.HeaderedEvent, error)
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
}
type Events interface {
- SelectStateInRange(ctx context.Context, txn *sql.Tx, oldPos, newPos types.StreamPosition, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error)
+ SelectStateInRange(ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error)
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error)
- // SelectRecentEvents returns events between the two stream positions: exclusive of `fromPos` and inclusive of `toPos`.
+ // SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
// Returns up to `limit` events.
- SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error)
- // SelectEarlyEvents returns the earliest events in the given room, exclusive of `fromPos` and inclusive of `toPos`.
- SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]types.StreamEvent, error)
+ SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error)
+ // SelectEarlyEvents returns the earliest events in the given room.
+ SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
}
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 126e76f5..6e0f44e9 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -184,11 +184,20 @@ func (rp *RequestPool) appendAccountData(
return data, nil
}
+ r := types.Range{
+ From: req.since.PDUPosition(),
+ To: currentPos,
+ }
+ // If both positions are the same, it means that the data was saved after the
+ // latest room event. In that case, we need to decrement the old position as
+ // results are exclusive of Low.
+ if r.Low() == r.High() {
+ r.From--
+ }
+
// Sync is not initial, get all account data since the latest sync
dataTypes, err := rp.db.GetAccountDataInRange(
- req.ctx, userID,
- types.StreamPosition(req.since.PDUPosition()), types.StreamPosition(currentPos),
- accountDataFilter,
+ req.ctx, userID, r, accountDataFilter,
)
if err != nil {
return nil, err
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 8a79ccd4..caa1b3ad 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -47,6 +47,34 @@ type StreamEvent struct {
ExcludeFromSync bool
}
+// Range represents a range between two stream positions.
+type Range struct {
+ // From is the position the client has already received.
+ From StreamPosition
+ // To is the position the client is going towards.
+ To StreamPosition
+ // True if the client is going backwards
+ Backwards bool
+}
+
+// Low returns the low number of the range.
+// This represents the position the client already has and hence is exclusive.
+func (r *Range) Low() StreamPosition {
+ if !r.Backwards {
+ return r.From
+ }
+ return r.To
+}
+
+// High returns the high number of the range
+// This represents the position the client is going towards and hence is inclusive.
+func (r *Range) High() StreamPosition {
+ if !r.Backwards {
+ return r.To
+ }
+ return r.From
+}
+
// SyncTokenType represents the type of a sync token.
// It can be either "s" (representing a position in the whole stream of events)
// or "t" (representing a position in a room's topology/depth).