diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-09-30 12:48:10 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-30 12:48:10 +0100 |
commit | 6348486a1365c7469a498101f5035a9b6bd16d22 (patch) | |
tree | d8a5ba572c5fc4fdec383802de5fac3a5e13c24d /syncapi/streams/stream_sendtodevice.go | |
parent | 8a82f100460dc5ca7bd39ae2345c251d6622c494 (diff) |
Transactional isolation for `/sync` (#2745)
This should transactional snapshot isolation for `/sync` etc requests.
For now we don't use repeatable read due to some odd test failures with
invites.
Diffstat (limited to 'syncapi/streams/stream_sendtodevice.go')
-rw-r--r-- | syncapi/streams/stream_sendtodevice.go | 20 |
1 files changed, 14 insertions, 6 deletions
diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go index 31c6187c..00b67cc4 100644 --- a/syncapi/streams/stream_sendtodevice.go +++ b/syncapi/streams/stream_sendtodevice.go @@ -3,17 +3,23 @@ package streams import ( "context" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" ) type SendToDeviceStreamProvider struct { - StreamProvider + DefaultStreamProvider } -func (p *SendToDeviceStreamProvider) Setup() { - p.StreamProvider.Setup() +func (p *SendToDeviceStreamProvider) Setup( + ctx context.Context, snapshot storage.DatabaseTransaction, +) { + p.DefaultStreamProvider.Setup(ctx, snapshot) - id, err := p.DB.MaxStreamPositionForSendToDeviceMessages(context.Background()) + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := snapshot.MaxStreamPositionForSendToDeviceMessages(ctx) if err != nil { panic(err) } @@ -22,18 +28,20 @@ func (p *SendToDeviceStreamProvider) Setup() { func (p *SendToDeviceStreamProvider) CompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { - return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) + return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) } func (p *SendToDeviceStreamProvider) IncrementalSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { // See if we have any new tasks to do for the send-to-device messaging. - lastPos, events, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, from, to) + lastPos, events, err := snapshot.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, from, to) if err != nil { req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed") return from |