aboutsummaryrefslogtreecommitdiff
path: root/syncapi/storage/shared/syncserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/storage/shared/syncserver.go')
-rw-r--r--syncapi/storage/shared/syncserver.go68
1 files changed, 39 insertions, 29 deletions
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 71a42003..128aaa5b 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -492,6 +492,7 @@ func (d *Database) syncPositionTx(
PDUPosition: types.StreamPosition(maxEventID),
TypingPosition: types.StreamPosition(d.EDUCache.GetLatestSyncPosition()),
ReceiptPosition: types.StreamPosition(maxReceiptID),
+ InvitePosition: types.StreamPosition(maxInviteID),
}
return
}
@@ -543,11 +544,6 @@ func (d *Database) addPDUDeltaToResponse(
}
}
- // TODO: This should be done in getStateDeltas
- if err = d.addInvitesToResponse(ctx, txn, device.UserID, r, res); err != nil {
- return nil, fmt.Errorf("d.addInvitesToResponse: %w", err)
- }
-
succeeded = true
return joinedRoomIDs, nil
}
@@ -583,6 +579,7 @@ func (d *Database) addTypingDeltaToResponse(
res.Rooms.Join[roomID] = jr
}
}
+ res.NextBatch.TypingPosition = types.StreamPosition(d.EDUCache.GetLatestSyncPosition())
return nil
}
@@ -593,7 +590,7 @@ func (d *Database) addReceiptDeltaToResponse(
joinedRoomIDs []string,
res *types.Response,
) error {
- receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition)
+ lastPos, receipts, err := d.Receipts.SelectRoomReceiptsAfter(context.TODO(), joinedRoomIDs, since.ReceiptPosition)
if err != nil {
return fmt.Errorf("unable to select receipts for rooms: %w", err)
}
@@ -638,6 +635,7 @@ func (d *Database) addReceiptDeltaToResponse(
res.Rooms.Join[roomID] = jr
}
+ res.NextBatch.ReceiptPosition = lastPos
return nil
}
@@ -691,8 +689,7 @@ func (d *Database) IncrementalSync(
numRecentEventsPerRoom int,
wantFullState bool,
) (*types.Response, error) {
- nextBatchPos := fromPos.WithUpdates(toPos)
- res.NextBatch = nextBatchPos.String()
+ res.NextBatch = fromPos.WithUpdates(toPos)
var joinedRoomIDs []string
var err error
@@ -725,6 +722,14 @@ func (d *Database) IncrementalSync(
return nil, fmt.Errorf("d.addEDUDeltaToResponse: %w", err)
}
+ ir := types.Range{
+ From: fromPos.InvitePosition,
+ To: toPos.InvitePosition,
+ }
+ if err = d.addInvitesToResponse(ctx, nil, device.UserID, ir, res); err != nil {
+ return nil, fmt.Errorf("d.addInvitesToResponse: %w", err)
+ }
+
return res, nil
}
@@ -783,8 +788,12 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
From: 0,
To: toPos.PDUPosition,
}
+ ir := types.Range{
+ From: 0,
+ To: toPos.InvitePosition,
+ }
- res.NextBatch = toPos.String()
+ res.NextBatch.ApplyUpdates(toPos)
// Extract room state and recent events for all rooms the user is joined to.
joinedRoomIDs, err = d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
@@ -824,7 +833,7 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
}
}
- if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil {
+ if err = d.addInvitesToResponse(ctx, txn, userID, ir, res); err != nil {
return
}
@@ -884,19 +893,18 @@ func (d *Database) getJoinResponseForCompleteSync(
// Retrieve the backward topology position, i.e. the position of the
// oldest event in the room's topology.
- var prevBatchStr string
+ var prevBatch *types.TopologyToken
if len(recentStreamEvents) > 0 {
var backwardTopologyPos, backwardStreamPos types.StreamPosition
backwardTopologyPos, backwardStreamPos, err = d.Topology.SelectPositionInTopology(ctx, txn, recentStreamEvents[0].EventID())
if err != nil {
return
}
- prevBatch := types.TopologyToken{
+ prevBatch = &types.TopologyToken{
Depth: backwardTopologyPos,
PDUPosition: backwardStreamPos,
}
prevBatch.Decrement()
- prevBatchStr = prevBatch.String()
}
// We don't include a device here as we don't need to send down
@@ -905,7 +913,7 @@ func (d *Database) getJoinResponseForCompleteSync(
recentEvents := d.StreamEventsToEvents(&device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr = types.NewJoinResponse()
- jr.Timeline.PrevBatch = prevBatchStr
+ jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
@@ -1033,7 +1041,7 @@ func (d *Database) addRoomDeltaToResponse(
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
- jr.Timeline.PrevBatch = prevBatch.String()
+ jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
@@ -1041,7 +1049,7 @@ func (d *Database) addRoomDeltaToResponse(
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
- jr.Timeline.PrevBatch = prevBatch.String()
+ jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
@@ -1052,7 +1060,7 @@ func (d *Database) addRoomDeltaToResponse(
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
- lr.Timeline.PrevBatch = prevBatch.String()
+ lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
@@ -1373,39 +1381,40 @@ func (d *Database) SendToDeviceUpdatesWaiting(
}
func (d *Database) StoreNewSendForDeviceMessage(
- ctx context.Context, streamPos types.StreamPosition, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent,
-) (types.StreamPosition, error) {
+ ctx context.Context, userID, deviceID string, event gomatrixserverlib.SendToDeviceEvent,
+) (newPos types.StreamPosition, err error) {
j, err := json.Marshal(event)
if err != nil {
- return streamPos, err
+ return 0, err
}
// Delegate the database write task to the SendToDeviceWriter. It'll guarantee
// that we don't lock the table for writes in more than one place.
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
- return d.SendToDevice.InsertSendToDeviceMessage(
+ newPos, err = d.SendToDevice.InsertSendToDeviceMessage(
ctx, txn, userID, deviceID, string(j),
)
+ return err
})
if err != nil {
- return streamPos, err
+ return 0, err
}
- return streamPos, nil
+ return 0, nil
}
func (d *Database) SendToDeviceUpdatesForSync(
ctx context.Context,
userID, deviceID string,
token types.StreamingToken,
-) ([]types.SendToDeviceEvent, []types.SendToDeviceNID, []types.SendToDeviceNID, error) {
+) (types.StreamPosition, []types.SendToDeviceEvent, []types.SendToDeviceNID, []types.SendToDeviceNID, error) {
// First of all, get our send-to-device updates for this user.
- events, err := d.SendToDevice.SelectSendToDeviceMessages(ctx, nil, userID, deviceID)
+ lastPos, events, err := d.SendToDevice.SelectSendToDeviceMessages(ctx, nil, userID, deviceID)
if err != nil {
- return nil, nil, nil, fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err)
+ return 0, nil, nil, nil, fmt.Errorf("d.SendToDevice.SelectSendToDeviceMessages: %w", err)
}
// If there's nothing to do then stop here.
if len(events) == 0 {
- return nil, nil, nil, nil
+ return 0, nil, nil, nil, nil
}
// Work out whether we need to update any of the database entries.
@@ -1432,7 +1441,7 @@ func (d *Database) SendToDeviceUpdatesForSync(
}
}
- return toReturn, toUpdate, toDelete, nil
+ return lastPos, toReturn, toUpdate, toDelete, nil
}
func (d *Database) CleanSendToDeviceUpdates(
@@ -1519,5 +1528,6 @@ func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId
}
func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) {
- return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
+ _, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
+ return receipts, err
}