aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-01-29 16:32:54 +0000
committerGitHub <noreply@github.com>2021-01-29 16:32:54 +0000
commit62a325ded8d6d4fc72553179da81b509179cc342 (patch)
treecc81204a77ce325d4af9ff77b745f4319374f263 /syncapi
parent6d1c6f29e05a11830d45f5a229578e91fc012d4b (diff)
Complete sync performance (#1741)
* Parallelise PDU stream fetching for complete sync * Fixes * Fixes * Worker queue * Workers * Don't populate device list changes on complete sync * Don't fast-forward typing notifications either on complete sync * Revert "Don't fast-forward typing notifications either on complete sync" This reverts commit 01471f78431cdd840915111f71bd2b5176e584a8. * Comments
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/streams/stream_devicelist.go2
-rw-r--r--syncapi/streams/stream_pdu.go71
2 files changed, 61 insertions, 12 deletions
diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go
index c43d50a4..9ea9d088 100644
--- a/syncapi/streams/stream_devicelist.go
+++ b/syncapi/streams/stream_devicelist.go
@@ -19,7 +19,7 @@ func (p *DeviceListStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.LogPosition {
- return p.IncrementalSync(ctx, req, types.LogPosition{}, p.LatestPosition(ctx))
+ return p.LatestPosition(ctx)
}
func (p *DeviceListStreamProvider) IncrementalSync(
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
index d6d7ff44..ae38dc30 100644
--- a/syncapi/streams/stream_pdu.go
+++ b/syncapi/streams/stream_pdu.go
@@ -2,18 +2,54 @@ package streams
import (
"context"
+ "sync"
+ "time"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
+ "go.uber.org/atomic"
)
+// The max number of per-room goroutines to have running.
+// Too high and this will consume lots of CPU, too low and complete
+// sync responses will take longer to process.
+const PDU_STREAM_WORKERS = 256
+
+// The maximum number of tasks that can be queued in total before
+// backpressure will build up and the rests will start to block.
+const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8
+
type PDUStreamProvider struct {
StreamProvider
+
+ tasks chan func()
+ workers atomic.Int32
+}
+
+func (p *PDUStreamProvider) worker() {
+ defer p.workers.Dec()
+ for {
+ select {
+ case f := <-p.tasks:
+ f()
+ case <-time.After(time.Second * 10):
+ return
+ }
+ }
+}
+
+func (p *PDUStreamProvider) queue(f func()) {
+ if p.workers.Load() < PDU_STREAM_WORKERS {
+ p.workers.Inc()
+ go p.worker()
+ }
+ p.tasks <- f
}
func (p *PDUStreamProvider) Setup() {
p.StreamProvider.Setup()
+ p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE)
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
@@ -52,19 +88,32 @@ func (p *PDUStreamProvider) CompleteSync(
eventFilter := req.Filter.Room.Timeline
// Build up a /sync response. Add joined rooms.
- for _, roomID := range joinedRoomIDs {
- var jr *types.JoinResponse
- jr, err = p.getJoinResponseForCompleteSync(
- ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
- )
- if err != nil {
- req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
- return from
- }
- req.Response.Rooms.Join[roomID] = *jr
- req.Rooms[roomID] = gomatrixserverlib.Join
+ var reqMutex sync.Mutex
+ var reqWaitGroup sync.WaitGroup
+ reqWaitGroup.Add(len(joinedRoomIDs))
+ for _, room := range joinedRoomIDs {
+ roomID := room
+ p.queue(func() {
+ defer reqWaitGroup.Done()
+
+ var jr *types.JoinResponse
+ jr, err = p.getJoinResponseForCompleteSync(
+ ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
+ )
+ if err != nil {
+ req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
+ return
+ }
+
+ reqMutex.Lock()
+ defer reqMutex.Unlock()
+ req.Response.Rooms.Join[roomID] = *jr
+ req.Rooms[roomID] = gomatrixserverlib.Join
+ })
}
+ reqWaitGroup.Wait()
+
// Add peeked rooms.
peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
if err != nil {