aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams/stream_presence.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/streams/stream_presence.go')
-rw-r--r--syncapi/streams/stream_presence.go22
1 files changed, 15 insertions, 7 deletions
diff --git a/syncapi/streams/stream_presence.go b/syncapi/streams/stream_presence.go
index 15db4d30..81cea7d5 100644
--- a/syncapi/streams/stream_presence.go
+++ b/syncapi/streams/stream_presence.go
@@ -23,20 +23,26 @@ import (
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/syncapi/notifier"
+ "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type PresenceStreamProvider struct {
- StreamProvider
+ DefaultStreamProvider
// cache contains previously sent presence updates to avoid unneeded updates
cache sync.Map
notifier *notifier.Notifier
}
-func (p *PresenceStreamProvider) Setup() {
- p.StreamProvider.Setup()
+func (p *PresenceStreamProvider) Setup(
+ ctx context.Context, snapshot storage.DatabaseTransaction,
+) {
+ p.DefaultStreamProvider.Setup(ctx, snapshot)
- id, err := p.DB.MaxStreamPositionForPresence(context.Background())
+ p.latestMutex.Lock()
+ defer p.latestMutex.Unlock()
+
+ id, err := snapshot.MaxStreamPositionForPresence(ctx)
if err != nil {
panic(err)
}
@@ -45,18 +51,20 @@ func (p *PresenceStreamProvider) Setup() {
func (p *PresenceStreamProvider) CompleteSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
) types.StreamPosition {
- return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+ return p.IncrementalSync(ctx, snapshot, req, 0, p.LatestPosition(ctx))
}
func (p *PresenceStreamProvider) IncrementalSync(
ctx context.Context,
+ snapshot storage.DatabaseTransaction,
req *types.SyncRequest,
from, to types.StreamPosition,
) types.StreamPosition {
// We pull out a larger number than the filter asks for, since we're filtering out events later
- presences, err := p.DB.PresenceAfter(ctx, from, gomatrixserverlib.EventFilter{Limit: 1000})
+ presences, err := snapshot.PresenceAfter(ctx, from, gomatrixserverlib.EventFilter{Limit: 1000})
if err != nil {
req.Log.WithError(err).Error("p.DB.PresenceAfter failed")
return from
@@ -84,7 +92,7 @@ func (p *PresenceStreamProvider) IncrementalSync(
}
// Bear in mind that this might return nil, but at least populating
// a nil means that there's a map entry so we won't repeat this call.
- presences[roomUsers[i]], err = p.DB.GetPresence(ctx, roomUsers[i])
+ presences[roomUsers[i]], err = snapshot.GetPresence(ctx, roomUsers[i])
if err != nil {
req.Log.WithError(err).Error("unable to query presence for user")
return from