diff options
Diffstat (limited to 'syncapi/storage/shared/syncserver.go')
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 57 |
1 files changed, 34 insertions, 23 deletions
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index e03a6b9f..be1f9c7a 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -54,17 +54,22 @@ func (d *Database) GetEventsInStreamingRange( roomID string, limit int, backwardOrdering bool, ) (events []types.StreamEvent, err error) { + r := types.Range{ + From: from.PDUPosition(), + To: to.PDUPosition(), + Backwards: backwardOrdering, + } if backwardOrdering { // When using backward ordering, we want the most recent events first. if events, err = d.OutputEvents.SelectRecentEvents( - ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false, + ctx, nil, roomID, r, limit, false, false, ); err != nil { return } } else { // When using forward ordering, we want the least recent events first. if events, err = d.OutputEvents.SelectEarlyEvents( - ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit, + ctx, nil, roomID, r, limit, ); err != nil { return } @@ -167,10 +172,10 @@ func (d *Database) RetireInviteEvent( // If no data is retrieved, returns an empty map // If there was an issue with the retrieval, returns an error func (d *Database) GetAccountDataInRange( - ctx context.Context, userID string, oldPos, newPos types.StreamPosition, + ctx context.Context, userID string, r types.Range, accountDataFilterPart *gomatrixserverlib.EventFilter, ) (map[string][]string, error) { - return d.AccountData.SelectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart) + return d.AccountData.SelectAccountDataInRange(ctx, userID, r, accountDataFilterPart) } // UpsertAccountData keeps track of new or updated account data, by saving the type @@ -417,7 +422,7 @@ func (d *Database) syncPositionTx( func (d *Database) addPDUDeltaToResponse( ctx context.Context, device authtypes.Device, - fromPos, toPos types.StreamPosition, + r types.Range, numRecentEventsPerRoom int, wantFullState bool, res *types.Response, @@ -443,11 +448,11 @@ func (d *Database) addPDUDeltaToResponse( var deltas []stateDelta if !wantFullState { deltas, joinedRoomIDs, err = d.getStateDeltas( - ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter, + ctx, &device, txn, r, device.UserID, &stateFilter, ) } else { deltas, joinedRoomIDs, err = d.getStateDeltasForFullStateSync( - ctx, &device, txn, fromPos, toPos, device.UserID, &stateFilter, + ctx, &device, txn, r, device.UserID, &stateFilter, ) } if err != nil { @@ -455,14 +460,14 @@ func (d *Database) addPDUDeltaToResponse( } for _, delta := range deltas { - err = d.addRoomDeltaToResponse(ctx, &device, txn, fromPos, toPos, delta, numRecentEventsPerRoom, res) + err = d.addRoomDeltaToResponse(ctx, &device, txn, r, delta, numRecentEventsPerRoom, res) if err != nil { return nil, err } } // TODO: This should be done in getStateDeltas - if err = d.addInvitesToResponse(ctx, txn, device.UserID, fromPos, toPos, res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, device.UserID, r, res); err != nil { return nil, err } @@ -534,8 +539,12 @@ func (d *Database) IncrementalSync( var joinedRoomIDs []string var err error if fromPos.PDUPosition() != toPos.PDUPosition() || wantFullState { + r := types.Range{ + From: fromPos.PDUPosition(), + To: toPos.PDUPosition(), + } joinedRoomIDs, err = d.addPDUDeltaToResponse( - ctx, device, fromPos.PDUPosition(), toPos.PDUPosition(), numRecentEventsPerRoom, wantFullState, res, + ctx, device, r, numRecentEventsPerRoom, wantFullState, res, ) } else { joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership( @@ -589,6 +598,10 @@ func (d *Database) getResponseWithPDUsForCompleteSync( if err != nil { return } + r := types.Range{ + From: 0, + To: toPos.PDUPosition(), + } res = types.NewResponse(toPos) @@ -611,8 +624,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent recentStreamEvents, err = d.OutputEvents.SelectRecentEvents( - ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(), - numRecentEventsPerRoom, true, true, + ctx, txn, roomID, r, numRecentEventsPerRoom, true, true, ) if err != nil { return @@ -644,7 +656,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync( res.Rooms.Join[roomID] = *jr } - if err = d.addInvitesToResponse(ctx, txn, userID, 0, toPos.PDUPosition(), res); err != nil { + if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil { return } @@ -686,11 +698,11 @@ var txReadOnlySnapshot = sql.TxOptions{ func (d *Database) addInvitesToResponse( ctx context.Context, txn *sql.Tx, userID string, - fromPos, toPos types.StreamPosition, + r types.Range, res *types.Response, ) error { invites, err := d.Invites.SelectInviteEventsInRange( - ctx, txn, userID, fromPos, toPos, + ctx, txn, userID, r, ) if err != nil { return err @@ -726,12 +738,11 @@ func (d *Database) addRoomDeltaToResponse( ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, + r types.Range, delta stateDelta, numRecentEventsPerRoom int, res *types.Response, ) error { - endPos := toPos if delta.membershipPos > 0 && delta.membership == gomatrixserverlib.Leave { // make sure we don't leak recent events after the leave event. // TODO: History visibility makes this somewhat complex to handle correctly. For example: @@ -739,10 +750,10 @@ func (d *Database) addRoomDeltaToResponse( // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave // in a single /sync request // This is all "okay" assuming history_visibility == "shared" which it is by default. - endPos = delta.membershipPos + r.To = delta.membershipPos } recentStreamEvents, err := d.OutputEvents.SelectRecentEvents( - ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos), + ctx, txn, delta.roomID, r, numRecentEventsPerRoom, true, true, ) if err != nil { @@ -872,7 +883,7 @@ func (d *Database) fetchMissingStateEvents( // A list of joined room IDs is also returned in case the caller needs it. func (d *Database) getStateDeltas( ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, + r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, ) ([]stateDelta, []string, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 @@ -886,7 +897,7 @@ func (d *Database) getStateDeltas( var deltas []stateDelta // get all the state events ever between these two positions - stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) + stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) if err != nil { return nil, nil, err } @@ -947,7 +958,7 @@ func (d *Database) getStateDeltas( // updates for other rooms. func (d *Database) getStateDeltasForFullStateSync( ctx context.Context, device *authtypes.Device, txn *sql.Tx, - fromPos, toPos types.StreamPosition, userID string, + r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter, ) ([]stateDelta, []string, error) { joinedRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join) @@ -972,7 +983,7 @@ func (d *Database) getStateDeltasForFullStateSync( } // Get all the state events ever between these two positions - stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilter) + stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) if err != nil { return nil, nil, err } |