diff options
Diffstat (limited to 'syncapi/streams/stream_accountdata.go')
-rw-r--r-- | syncapi/streams/stream_accountdata.go | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go index 0297d5c2..3f2f7d13 100644 --- a/syncapi/streams/stream_accountdata.go +++ b/syncapi/streams/stream_accountdata.go @@ -5,22 +5,25 @@ import ( "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" ) type AccountDataStreamProvider struct { - StreamProvider + DefaultStreamProvider userAPI userapi.SyncUserAPI } -func (p *AccountDataStreamProvider) Setup() { - p.StreamProvider.Setup() +func (p *AccountDataStreamProvider) Setup( + ctx context.Context, snapshot storage.DatabaseTransaction, +) { + p.DefaultStreamProvider.Setup(ctx, snapshot) p.latestMutex.Lock() defer p.latestMutex.Unlock() - id, err := p.DB.MaxStreamPositionForAccountData(context.Background()) + id, err := snapshot.MaxStreamPositionForAccountData(ctx) if err != nil { panic(err) } @@ -29,13 +32,15 @@ func (p *AccountDataStreamProvider) Setup() { func (p *AccountDataStreamProvider) CompleteSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, ) types.StreamPosition { - return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) + return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx)) } func (p *AccountDataStreamProvider) IncrementalSync( ctx context.Context, + snapshot storage.DatabaseTransaction, req *types.SyncRequest, from, to types.StreamPosition, ) types.StreamPosition { @@ -44,7 +49,7 @@ func (p *AccountDataStreamProvider) IncrementalSync( To: to, } - dataTypes, pos, err := p.DB.GetAccountDataInRange( + dataTypes, pos, err := snapshot.GetAccountDataInRange( ctx, req.Device.UserID, r, &req.Filter.AccountData, ) if err != nil { |