author    | Till <2353100+S7evinK@users.noreply.github.com> | 2023-11-09 08:43:27 +0100
committer | GitHub <noreply@github.com>                     | 2023-11-09 08:43:27 +0100
commit    | 7863a405a5f41acd2e40b40ec288eebe781eac1a (patch)
tree      | 1d94d328f0981ca1b9c77d828c242f474e54147c /userapi
parent    | 699f5ca8c1f73ff7e4b70f0f9273ffcb1c195cdc (diff)
Use `IsBlacklistedOrBackingOff` to determine if we should try to fetch devices (#3254)
Use `IsBlacklistedOrBackingOff` from the federation API to check if we
should fetch devices.
To reduce back pressure, we now only queue retrying servers if there's
space in the channel.
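
In code terms, the change boils down to two small patterns, sketched below as a self-contained Go program. The type names (`ServerStatistics`, `FederationClientError`) and helper functions here are simplified stand-ins for illustration, not Dendrite's actual API; the real signatures appear in the diff that follows.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the Dendrite types referenced by this commit.
type ServerStatistics struct{}

type FederationClientError struct {
	Blacklisted bool
}

func (e *FederationClientError) Error() string { return "federation client error" }

// shouldFetchDevices mirrors the first idea: ask the federation API whether
// the server is blacklisted before attempting to fetch its device lists.
func shouldFetchDevices(
	server string,
	isBlacklistedOrBackingOffFn func(s string) (*ServerStatistics, error),
) bool {
	_, err := isBlacklistedOrBackingOffFn(server)
	var fedErr *FederationClientError
	if errors.As(err, &fedErr) && fedErr.Blacklisted {
		return false
	}
	return true
}

// queueRetries mirrors the second idea: only queue as many retrying servers
// as the worker channel has free space for, keeping headroom for incoming
// device list updates over federation.
func queueRetries(ch chan string, serversToRetry []string) {
	maxServers := (cap(ch) - len(ch)) - 2 // keep 2 slots free for live updates
	if maxServers <= 0 {
		return
	}
	if len(serversToRetry) > maxServers {
		serversToRetry = serversToRetry[:maxServers]
	}
	for _, srv := range serversToRetry {
		ch <- srv // never blocks: we stayed within the free capacity
	}
}

func main() {
	isBlacklisted := func(s string) (*ServerStatistics, error) {
		if s == "bad.example" {
			return nil, &FederationClientError{Blacklisted: true}
		}
		return &ServerStatistics{}, nil
	}
	fmt.Println(shouldFetchDevices("good.example", isBlacklisted)) // true
	fmt.Println(shouldFetchDevices("bad.example", isBlacklisted))  // false

	ch := make(chan string, 4)
	queueRetries(ch, []string{"a.example", "b.example", "c.example"})
	fmt.Println(len(ch)) // 2: only cap(ch)-len(ch)-2 = 2 slots were used
}
```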
Diffstat (limited to 'userapi')
-rw-r--r-- | userapi/internal/device_list_update.go      | 80
-rw-r--r-- | userapi/internal/device_list_update_test.go | 79
-rw-r--r-- | userapi/userapi.go                          |  5
3 files changed, 139 insertions, 25 deletions
diff --git a/userapi/internal/device_list_update.go b/userapi/internal/device_list_update.go
index a4d28188..b4063516 100644
--- a/userapi/internal/device_list_update.go
+++ b/userapi/internal/device_list_update.go
@@ -25,6 +25,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/matrix-org/dendrite/federationapi/statistics"
 	rsapi "github.com/matrix-org/dendrite/roomserver/api"
 	"github.com/matrix-org/gomatrixserverlib/fclient"
 	"github.com/matrix-org/gomatrixserverlib/spec"
@@ -108,6 +109,8 @@ type DeviceListUpdater struct {
 	userIDToChan   map[string]chan bool
 	userIDToChanMu *sync.Mutex
 	rsAPI          rsapi.KeyserverRoomserverAPI
+
+	isBlacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error)
 }
 
 // DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater.
@@ -167,25 +170,28 @@ func NewDeviceListUpdater(
 	process *process.ProcessContext, db DeviceListUpdaterDatabase,
 	api DeviceListUpdaterAPI, producer KeyChangeProducer,
 	fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
-	rsAPI rsapi.KeyserverRoomserverAPI, thisServer spec.ServerName,
+	rsAPI rsapi.KeyserverRoomserverAPI,
+	thisServer spec.ServerName,
 	enableMetrics bool,
+	isBlacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error),
 ) *DeviceListUpdater {
 	if enableMetrics {
 		prometheus.MustRegister(deviceListUpdaterBackpressure, deviceListUpdaterServersRetrying)
 	}
 	return &DeviceListUpdater{
-		process:        process,
-		userIDToMutex:  make(map[string]*sync.Mutex),
-		mu:             &sync.Mutex{},
-		db:             db,
-		api:            api,
-		producer:       producer,
-		fedClient:      fedClient,
-		thisServer:     thisServer,
-		workerChans:    make([]chan spec.ServerName, numWorkers),
-		userIDToChan:   make(map[string]chan bool),
-		userIDToChanMu: &sync.Mutex{},
-		rsAPI:          rsAPI,
+		process:                     process,
+		userIDToMutex:               make(map[string]*sync.Mutex),
+		mu:                          &sync.Mutex{},
+		db:                          db,
+		api:                         api,
+		producer:                    producer,
+		fedClient:                   fedClient,
+		thisServer:                  thisServer,
+		workerChans:                 make([]chan spec.ServerName, numWorkers),
+		userIDToChan:                make(map[string]chan bool),
+		userIDToChanMu:              &sync.Mutex{},
+		rsAPI:                       rsAPI,
+		isBlacklistedOrBackingOffFn: isBlacklistedOrBackingOffFn,
 	}
 }
@@ -362,13 +368,22 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
 	if err != nil {
 		return
 	}
+	_, err = u.isBlacklistedOrBackingOffFn(remoteServer)
+	var federationClientError *fedsenderapi.FederationClientError
+	if errors.As(err, &federationClientError) {
+		if federationClientError.Blacklisted {
+			return
+		}
+	}
+
 	hash := fnv.New32a()
 	_, _ = hash.Write([]byte(remoteServer))
 	index := int(int64(hash.Sum32()) % int64(len(u.workerChans)))
 
 	ch := u.assignChannel(userID)
+	// Since workerChans are buffered, we only increment here and let the worker
+	// decrement it once it is done processing.
 	deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc()
-	defer deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Dec()
 	u.workerChans[index] <- remoteServer
 	select {
 	case <-ch:
@@ -405,24 +420,38 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
 	go func() {
 		var serversToRetry []spec.ServerName
 		for {
-			serversToRetry = serversToRetry[:0] // reuse memory
-			time.Sleep(time.Second)
+			// nuke serversToRetry by re-slicing it to be "empty".
+			// The capacity of the slice is unchanged, which ensures we can reuse the memory.
+			serversToRetry = serversToRetry[:0]
+
+			deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
+			time.Sleep(time.Second * 2)
+
+			// -2, so we have space for incoming device list updates over federation
+			maxServers := (cap(ch) - len(ch)) - 2
+			if maxServers <= 0 {
+				continue
+			}
+
 			retriesMu.Lock()
 			now := time.Now()
 			for srv, retryAt := range retries {
 				if now.After(retryAt) {
 					serversToRetry = append(serversToRetry, srv)
+					if maxServers == len(serversToRetry) {
+						break
+					}
 				}
 			}
+
 			for _, srv := range serversToRetry {
 				delete(retries, srv)
 			}
-			deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
 			retriesMu.Unlock()
+
 			for _, srv := range serversToRetry {
 				deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc()
 				ch <- srv
-				deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
 			}
 		}
 	}()
@@ -430,8 +459,18 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
 		retriesMu.Lock()
 		_, exists := retries[serverName]
 		retriesMu.Unlock()
-		if exists {
-			// Don't retry a server that we're already waiting for.
+
+		// If the serverName is coming from retries, maybe it was
+		// blacklisted in the meantime.
+		_, err := u.isBlacklistedOrBackingOffFn(serverName)
+		var federationClientError *fedsenderapi.FederationClientError
+		// unwrap errors and check for FederationClientError, if found, federationClientError will be not nil
+		errors.As(err, &federationClientError)
+		isBlacklisted := federationClientError != nil && federationClientError.Blacklisted
+
+		// Don't retry a server that we're already waiting for or is blacklisted by now.
+		if exists || isBlacklisted {
+			deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
 			continue
 		}
 		waitTime, shouldRetry := u.processServer(serverName)
@@ -442,6 +481,7 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
 			}
 			retriesMu.Unlock()
 		}
+		deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
 	}
 }
diff --git a/userapi/internal/device_list_update_test.go b/userapi/internal/device_list_update_test.go
index 14a49bc5..a2f1869d 100644
--- a/userapi/internal/device_list_update_test.go
+++ b/userapi/internal/device_list_update_test.go
@@ -27,6 +27,8 @@ import (
 	"testing"
 	"time"
 
+	api2 "github.com/matrix-org/dendrite/federationapi/api"
+	"github.com/matrix-org/dendrite/federationapi/statistics"
 	"github.com/matrix-org/dendrite/internal/caching"
 	"github.com/matrix-org/dendrite/internal/sqlutil"
 	"github.com/matrix-org/gomatrixserverlib"
@@ -129,6 +131,10 @@ type mockDeviceListUpdaterAPI struct {
 func (d *mockDeviceListUpdaterAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) {
 }
 
+var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
+	return &statistics.ServerStatistics{}, nil
+}
+
 type roundTripper struct {
 	fn func(*http.Request) (*http.Response, error)
 }
@@ -162,7 +168,7 @@ func TestUpdateHavePrevID(t *testing.T) {
 	}
 	ap := &mockDeviceListUpdaterAPI{}
 	producer := &mockKeyChangeProducer{}
-	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics)
+	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
 	event := gomatrixserverlib.DeviceListUpdateEvent{
 		DeviceDisplayName: "Foo Bar",
 		Deleted:           false,
@@ -234,7 +240,7 @@ func TestUpdateNoPrevID(t *testing.T) {
 			`)),
 		}, nil
 	})
-	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics)
+	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
 	if err := updater.Start(); err != nil {
 		t.Fatalf("failed to start updater: %s", err)
 	}
@@ -304,7 +310,7 @@ func TestDebounce(t *testing.T) {
 		close(incomingFedReq)
 		return <-fedCh, nil
 	})
-	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics)
+	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
 	if err := updater.Start(); err != nil {
 		t.Fatalf("failed to start updater: %s", err)
 	}
@@ -407,7 +413,7 @@ func TestDeviceListUpdater_CleanUp(t *testing.T) {
 	updater := NewDeviceListUpdater(processCtx, db, nil,
 		nil, nil,
-		0, rsAPI, "test", caching.DisableMetrics)
+		0, rsAPI, "test", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
 	if err := updater.CleanUp(); err != nil {
 		t.Error(err)
 	}
@@ -475,3 +481,68 @@ func Test_dedupeStateList(t *testing.T) {
 		})
 	}
 }
+
+func TestDeviceListUpdaterIgnoreBlacklisted(t *testing.T) {
+	unreachableServer := spec.ServerName("notlocalhost")
+
+	updater := DeviceListUpdater{
+		workerChans: make([]chan spec.ServerName, 1),
+		isBlacklistedOrBackingOffFn: func(s spec.ServerName) (*statistics.ServerStatistics, error) {
+			switch s {
+			case unreachableServer:
+				return nil, &api2.FederationClientError{Blacklisted: true}
+			}
+			return nil, nil
+		},
+		mu:             &sync.Mutex{},
+		userIDToChanMu: &sync.Mutex{},
+		userIDToChan:   make(map[string]chan bool),
+		userIDToMutex:  make(map[string]*sync.Mutex),
+	}
+	workerCh := make(chan spec.ServerName)
+	defer close(workerCh)
+	updater.workerChans[0] = workerCh
+
+	// happy case
+	alice := "@alice:localhost"
+	aliceCh := updater.assignChannel(alice)
+	defer updater.clearChannel(alice)
+
+	// failing case
+	bob := "@bob:" + unreachableServer
+	bobCh := updater.assignChannel(string(bob))
+	defer updater.clearChannel(string(bob))
+
+	expectedServers := map[spec.ServerName]struct{}{
+		"localhost": {},
+	}
+	unexpectedServers := make(map[spec.ServerName]struct{})
+
+	go func() {
+		for serverName := range workerCh {
+			switch serverName {
+			case "localhost":
+				delete(expectedServers, serverName)
+				aliceCh <- true // unblock notifyWorkers
+			case unreachableServer: // this should not happen as it is "filtered" away by the blacklist
+				unexpectedServers[serverName] = struct{}{}
+				bobCh <- true
+			default:
+				unexpectedServers[serverName] = struct{}{}
+			}
+		}
+	}()
+
+	// alice is not blacklisted
+	updater.notifyWorkers(alice)
+	// bob is blacklisted
+	updater.notifyWorkers(string(bob))
+
+	for server := range expectedServers {
+		t.Errorf("Server still in expectedServers map: %s", server)
+	}
+
+	for server := range unexpectedServers {
+		t.Errorf("unexpected server in result: %s", server)
+	}
+}
diff --git a/userapi/userapi.go b/userapi/userapi.go
index 34bf119a..a1c9b94a 100644
--- a/userapi/userapi.go
+++ b/userapi/userapi.go
@@ -18,10 +18,12 @@ import (
 	"time"
 
 	fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
+	"github.com/matrix-org/dendrite/federationapi/statistics"
 	"github.com/matrix-org/dendrite/internal/pushgateway"
 	"github.com/matrix-org/dendrite/internal/sqlutil"
 	"github.com/matrix-org/dendrite/setup/config"
 	"github.com/matrix-org/dendrite/setup/process"
+	"github.com/matrix-org/gomatrixserverlib/spec"
 	"github.com/sirupsen/logrus"
 
 	rsapi "github.com/matrix-org/dendrite/roomserver/api"
@@ -47,6 +49,7 @@ func NewInternalAPI(
 	rsAPI rsapi.UserRoomserverAPI,
 	fedClient fedsenderapi.KeyserverFederationAPI,
 	enableMetrics bool,
+	blacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error),
 ) *internal.UserInternalAPI {
 	js, _ := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
 	appServices := dendriteCfg.Derived.ApplicationServices
@@ -100,7 +103,7 @@ func NewInternalAPI(
 		FedClient: fedClient,
 	}
 
-	updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics)
+	updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics, blacklistedOrBackingOffFn)
 	userAPI.Updater = updater
 	// Remove users which we don't share a room with anymore
 	if err := updater.CleanUp(); err != nil {