aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/shared/syncserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/shared/syncserver.go')
-rw-r--r--syncapi/storage/shared/syncserver.go57
1 files changed, 34 insertions, 23 deletions
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index e03a6b9f..be1f9c7a 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -54,17 +54,22 @@ func (d *Database) GetEventsInStreamingRange(
roomID string, limit int,
backwardOrdering bool,
) (events []types.StreamEvent, err error) {
+ r := types.Range{
+ From: from.PDUPosition(),
+ To: to.PDUPosition(),
+ Backwards: backwardOrdering,
+ }
if backwardOrdering {
// When using backward ordering, we want the most recent events first.
if events, err = d.OutputEvents.SelectRecentEvents(
- ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false,
+ ctx, nil, roomID, r, limit, false, false,
); err != nil {
return
}
} else {
// When using forward ordering, we want the least recent events first.
if events, err = d.OutputEvents.SelectEarlyEvents(
- ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit,
+ ctx, nil, roomID, r, limit,
); err != nil {
return
}
@@ -167,10 +172,10 @@ func (d *Database) RetireInviteEvent(
// If no data is retrieved, returns an empty map
// If there was an issue with the retrieval, returns an error
func (d *Database) GetAccountDataInRange(
- ctx context.Context, userID string, oldPos, newPos types.StreamPosition,
+ ctx context.Context, userID string, r types.Range,
accountDataFilterPart *gomatrixserverlib.EventFilter,
) (map[string][]string, error) {
- return d.AccountData.SelectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart)
+ return d.AccountData.SelectAccountDataInRange(ctx, userID, r, accountDataFilterPart)
}
// UpsertAccountData keeps track of new or updated account data, by saving the type
@@ -417,7 +422,7 @@ func (d *Database) syncPositionTx(
func (d *Database) addPDUDeltaToResponse(
ctx context.Context,
device authtypes.Device,
- fromPos, toPos types.StreamPosition,
+ r types.Range,
numRecentEventsPerRoom int,
wantFullState bool,
res *types.Response,
@@ -443,11 +448,11 @@ func (d *Database) addPDUDeltaToResponse(
var deltas []stateDelta
if !wantFullState {
deltas, joinedRoomIDs, err = d.getStateDeltas(
- ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
+ ctx, &device, txn, r, device.UserID, &stateFilter,
)
} else {
deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync(
- ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter,
+ ctx, &device, txn, r, device.UserID, &stateFilter,
)
}
if err != nil {
@@ -455,14 +460,14 @@ func (d *Database) addPDUDeltaToResponse(
}
for _, delta := range deltas {
- err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res)
+ err = d.addRoomDeltaToResponse(ctx, &device, txn, r, delta, numRecentEventsPerRoom, res)
if err != nil {
return nil, err
}
}
// TODO: This should be done in getStateDeltas
- if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil {
+ if err = d.addInvitesToResponse(ctx, txn, device.UserID, r, res); err != nil {
return nil, err
}
@@ -534,8 +539,12 @@ func (d *Database) IncrementalSync(
var joinedRoomIDs []string
var err error
if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState {
+ r := types.Range{
+ From: fromPos.PDUPosition(),
+ To: toPos.PDUPosition(),
+ }
joinedRoomIDs, err = d.addPDUDeltaToResponse(
- ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res,
+ ctx, device, r, numRecentEventsPerRoom, wantFullState, res,
)
} else {
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(
@@ -589,6 +598,10 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
if err != nil {
return
}
+ r := types.Range{
+ From: 0,
+ To: toPos.PDUPosition(),
+ }
res = types.NewResponse(toPos)
@@ -611,8 +624,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
recentStreamEvents, err = d.OutputEvents.SelectRecentEvents(
- ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(),
- numRecentEventsPerRoom, true, true,
+ ctx, txn, roomID, r, numRecentEventsPerRoom, true, true,
)
if err != nil {
return
@@ -644,7 +656,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
res.Rooms.Join[roomID] = *jr
}
- if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil {
+ if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil {
return
}
@@ -686,11 +698,11 @@ var txReadOnlySnapshot = sql.TxOptions{
func (d *Database) addInvitesToResponse(
ctx context.Context, txn *sql.Tx,
userID string,
- fromPos, toPos types.StreamPosition,
+ r types.Range,
res *types.Response,
) error {
invites, err := d.Invites.SelectInviteEventsInRange(
- ctx, txn, userID, fromPos, toPos,
+ ctx, txn, userID, r,
)
if err != nil {
return err
@@ -726,12 +738,11 @@ func (d *Database) addRoomDeltaToResponse(
ctx context.Context,
device *authtypes.Device,
txn *sql.Tx,
- fromPos, toPos types.StreamPosition,
+ r types.Range,
delta stateDelta,
numRecentEventsPerRoom int,
res *types.Response,
) error {
- endPos := toPos
if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
@@ -739,10 +750,10 @@ func (d *Database) addRoomDeltaToResponse(
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
- endPos = delta.membershipPos
+ r.To = delta.membershipPos
}
recentStreamEvents, err := d.OutputEvents.SelectRecentEvents(
- ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos),
+ ctx, txn, delta.roomID, r,
numRecentEventsPerRoom, true, true,
)
if err != nil {
@@ -872,7 +883,7 @@ func (d *Database) fetchMissingStateEvents(
// A list of joined room IDs is also returned in case the caller needs it.
func (d *Database) getStateDeltas(
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
- fromPos, toPos types.StreamPosition, userID string,
+ r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]stateDelta, []string, error) {
// Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821
@@ -886,7 +897,7 @@ func (d *Database) getStateDeltas(
var deltas []stateDelta
// get all the state events ever between these two positions
- stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
+ stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
if err != nil {
return nil, nil, err
}
@@ -947,7 +958,7 @@ func (d *Database) getStateDeltas(
// updates for other rooms.
func (d *Database) getStateDeltasForFullStateSync(
ctx context.Context, device *authtypes.Device, txn *sql.Tx,
- fromPos, toPos types.StreamPosition, userID string,
+ r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]stateDelta, []string, error) {
joinedRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
@@ -972,7 +983,7 @@ func (d *Database) getStateDeltasForFullStateSync(
}
// Get all the state events ever between these two positions
- stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter)
+ stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
if err != nil {
return nil, nil, err
}