aboutsummaryrefslogtreecommitdiff
path: root/userapi
diff options
context:
space:
mode:
author	Till <2353100+S7evinK@users.noreply.github.com>	2023-10-31 16:39:45 +0100
committer	GitHub <noreply@github.com>	2023-10-31 16:39:45 +0100
commitda7bca0224760a4fe0e10876a9c11da333513a29 (patch)
tree87ce9477ac5256c5ccfbadd0dfe989fe4cb15a3e /userapi
parent32f7c4b166c5e74ef34973a1d6a30e5e2d75c3ed (diff)
Some tweaks for the device list updater (#3251)
This makes the following changes:
- Adds two new metrics observing the usage of the `DeviceListUpdater` workers
- Makes the number of workers configurable
- Adds a 30s timeout for DB requests when receiving a device list update over federation
Diffstat (limited to 'userapi')
-rw-r--r--	userapi/consumers/devicelistupdate.go	6
-rw-r--r--	userapi/internal/device_list_update.go	33
-rw-r--r--	userapi/internal/device_list_update_test.go	9
-rw-r--r--	userapi/userapi.go	3
4 files changed, 43 insertions(+), 8 deletions(-)
diff --git a/userapi/consumers/devicelistupdate.go b/userapi/consumers/devicelistupdate.go
index 3389bb80..b3ccb573 100644
--- a/userapi/consumers/devicelistupdate.go
+++ b/userapi/consumers/devicelistupdate.go
@@ -17,6 +17,7 @@ package consumers
import (
"context"
"encoding/json"
+ "time"
"github.com/matrix-org/dendrite/userapi/internal"
"github.com/matrix-org/gomatrixserverlib"
@@ -82,7 +83,10 @@ func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.M
return true
}
- err := t.updater.Update(ctx, m)
+ timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30)
+ defer cancel()
+
+ err := t.updater.Update(timeoutCtx, m)
if err != nil {
logrus.WithFields(logrus.Fields{
"user_id": m.UserID,
diff --git a/userapi/internal/device_list_update.go b/userapi/internal/device_list_update.go
index 2f33589f..a4d28188 100644
--- a/userapi/internal/device_list_update.go
+++ b/userapi/internal/device_list_update.go
@@ -21,6 +21,7 @@ import (
"fmt"
"hash/fnv"
"net"
+ "strconv"
"sync"
"time"
@@ -142,13 +143,36 @@ type KeyChangeProducer interface {
ProduceKeyChanges(keys []api.DeviceMessage) error
}
+var deviceListUpdaterBackpressure = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "dendrite",
+ Subsystem: "keyserver",
+ Name: "worker_backpressure",
+ Help: "How many device list updater requests are queued",
+ },
+ []string{"worker_id"},
+)
+var deviceListUpdaterServersRetrying = prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Namespace: "dendrite",
+ Subsystem: "keyserver",
+ Name: "worker_servers_retrying",
+ Help: "How many servers are queued for retry",
+ },
+ []string{"worker_id"},
+)
+
// NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale.
func NewDeviceListUpdater(
process *process.ProcessContext, db DeviceListUpdaterDatabase,
api DeviceListUpdaterAPI, producer KeyChangeProducer,
fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
rsAPI rsapi.KeyserverRoomserverAPI, thisServer spec.ServerName,
+ enableMetrics bool,
) *DeviceListUpdater {
+ if enableMetrics {
+ prometheus.MustRegister(deviceListUpdaterBackpressure, deviceListUpdaterServersRetrying)
+ }
return &DeviceListUpdater{
process: process,
userIDToMutex: make(map[string]*sync.Mutex),
@@ -173,7 +197,7 @@ func (u *DeviceListUpdater) Start() error {
// to stop (in this transaction) until key requests can be made.
ch := make(chan spec.ServerName, 10)
u.workerChans[i] = ch
- go u.worker(ch)
+ go u.worker(ch, i)
}
staleLists, err := u.db.StaleDeviceLists(u.process.Context(), []spec.ServerName{})
@@ -343,6 +367,8 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
index := int(int64(hash.Sum32()) % int64(len(u.workerChans)))
ch := u.assignChannel(userID)
+ deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc()
+ defer deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Dec()
u.workerChans[index] <- remoteServer
select {
case <-ch:
@@ -372,7 +398,7 @@ func (u *DeviceListUpdater) clearChannel(userID string) {
}
}
-func (u *DeviceListUpdater) worker(ch chan spec.ServerName) {
+func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
retries := make(map[spec.ServerName]time.Time)
retriesMu := &sync.Mutex{}
// restarter goroutine which will inject failed servers into ch when it is time
@@ -391,9 +417,12 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName) {
for _, srv := range serversToRetry {
delete(retries, srv)
}
+ deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
retriesMu.Unlock()
for _, srv := range serversToRetry {
+ deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc()
ch <- srv
+ deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
}
}
}()
diff --git a/userapi/internal/device_list_update_test.go b/userapi/internal/device_list_update_test.go
index 38fd8b58..14a49bc5 100644
--- a/userapi/internal/device_list_update_test.go
+++ b/userapi/internal/device_list_update_test.go
@@ -27,6 +27,7 @@ import (
"testing"
"time"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/fclient"
@@ -161,7 +162,7 @@ func TestUpdateHavePrevID(t *testing.T) {
}
ap := &mockDeviceListUpdaterAPI{}
producer := &mockKeyChangeProducer{}
- updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost")
+ updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics)
event := gomatrixserverlib.DeviceListUpdateEvent{
DeviceDisplayName: "Foo Bar",
Deleted: false,
@@ -233,7 +234,7 @@ func TestUpdateNoPrevID(t *testing.T) {
`)),
}, nil
})
- updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test")
+ updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics)
if err := updater.Start(); err != nil {
t.Fatalf("failed to start updater: %s", err)
}
@@ -303,7 +304,7 @@ func TestDebounce(t *testing.T) {
close(incomingFedReq)
return <-fedCh, nil
})
- updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost")
+ updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics)
if err := updater.Start(); err != nil {
t.Fatalf("failed to start updater: %s", err)
}
@@ -406,7 +407,7 @@ func TestDeviceListUpdater_CleanUp(t *testing.T) {
updater := NewDeviceListUpdater(processCtx, db, nil,
nil, nil,
- 0, rsAPI, "test")
+ 0, rsAPI, "test", caching.DisableMetrics)
if err := updater.CleanUp(); err != nil {
t.Error(err)
}
diff --git a/userapi/userapi.go b/userapi/userapi.go
index 6b6dac88..34bf119a 100644
--- a/userapi/userapi.go
+++ b/userapi/userapi.go
@@ -46,6 +46,7 @@ func NewInternalAPI(
natsInstance *jetstream.NATSInstance,
rsAPI rsapi.UserRoomserverAPI,
fedClient fedsenderapi.KeyserverFederationAPI,
+ enableMetrics bool,
) *internal.UserInternalAPI {
js, _ := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
appServices := dendriteCfg.Derived.ApplicationServices
@@ -99,7 +100,7 @@ func NewInternalAPI(
FedClient: fedClient,
}
- updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, 8, rsAPI, dendriteCfg.Global.ServerName) // 8 workers TODO: configurable
+ updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics)
userAPI.Updater = updater
// Remove users which we don't share a room with anymore
if err := updater.CleanUp(); err != nil {