aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-11-09 08:43:27 +0100
committerGitHub <noreply@github.com>2023-11-09 08:43:27 +0100
commit7863a405a5f41acd2e40b40ec288eebe781eac1a (patch)
tree1d94d328f0981ca1b9c77d828c242f474e54147c
parent699f5ca8c1f73ff7e4b70f0f9273ffcb1c195cdc (diff)
Use `IsBlacklistedOrBackingOff` to determine if we should try to fetch devices (#3254)
Use `IsBlacklistedOrBackingOff` from the federation API to check if we should fetch devices. To reduce back pressure, we now only queue retrying servers if there's space in the channel.
-rw-r--r--appservice/appservice_test.go11
-rw-r--r--build/dendritejs-pinecone/main.go4
-rw-r--r--build/gobind-yggdrasil/monolith.go2
-rw-r--r--clientapi/admin_test.go20
-rw-r--r--clientapi/clientapi_test.go31
-rw-r--r--clientapi/routing/joinroom_test.go7
-rw-r--r--clientapi/routing/login_test.go2
-rw-r--r--clientapi/routing/register_test.go6
-rw-r--r--cmd/dendrite-demo-pinecone/monolith/monolith.go2
-rw-r--r--cmd/dendrite-demo-yggdrasil/main.go9
-rw-r--r--cmd/dendrite/main.go2
-rw-r--r--federationapi/federationapi.go3
-rw-r--r--federationapi/internal/api.go4
-rw-r--r--federationapi/routing/profile_test.go8
-rw-r--r--federationapi/routing/query_test.go8
-rw-r--r--federationapi/routing/send_test.go8
-rw-r--r--roomserver/roomserver_test.go11
-rw-r--r--userapi/internal/device_list_update.go80
-rw-r--r--userapi/internal/device_list_update_test.go79
-rw-r--r--userapi/userapi.go5
20 files changed, 212 insertions, 90 deletions
diff --git a/appservice/appservice_test.go b/appservice/appservice_test.go
index bbdeb47d..eca63371 100644
--- a/appservice/appservice_test.go
+++ b/appservice/appservice_test.go
@@ -14,6 +14,7 @@ import (
"testing"
"time"
+ "github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/stretchr/testify/assert"
"github.com/matrix-org/dendrite/appservice"
@@ -32,6 +33,10 @@ import (
"github.com/matrix-org/dendrite/test/testrig"
)
+var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
+ return &statistics.ServerStatistics{}, nil
+}
+
func TestAppserviceInternalAPI(t *testing.T) {
// Set expected results
@@ -144,7 +149,7 @@ func TestAppserviceInternalAPI(t *testing.T) {
cm := sqlutil.NewConnectionManager(ctx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(ctx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(ctx, cfg, &natsInstance, usrAPI, rsAPI)
runCases(t, asAPI)
@@ -239,7 +244,7 @@ func TestAppserviceInternalAPI_UnixSocket_Simple(t *testing.T) {
cm := sqlutil.NewConnectionManager(ctx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(ctx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ usrAPI := userapi.NewInternalAPI(ctx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(ctx, cfg, &natsInstance, usrAPI, rsAPI)
t.Run("UserIDExists", func(t *testing.T) {
@@ -378,7 +383,7 @@ func TestRoomserverConsumerOneInvite(t *testing.T) {
// Create required internal APIs
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics)
+ usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// start the consumer
appservice.NewInternalAPI(processCtx, cfg, natsInstance, usrAPI, rsAPI)
diff --git a/build/dendritejs-pinecone/main.go b/build/dendritejs-pinecone/main.go
index d3d5f59f..6acc93c7 100644
--- a/build/dendritejs-pinecone/main.go
+++ b/build/dendritejs-pinecone/main.go
@@ -191,13 +191,13 @@ func startup() {
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics)
+ fedSenderAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics, fedSenderAPI.IsBlacklistedOrBackingOff)
asQuery := appservice.NewInternalAPI(
processCtx, cfg, &natsInstance, userAPI, rsAPI,
)
rsAPI.SetAppserviceAPI(asQuery)
- fedSenderAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true)
rsAPI.SetFederationAPI(fedSenderAPI, keyRing)
monolith := setup.Monolith{
diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go
index 791ad261..2b227d37 100644
--- a/build/gobind-yggdrasil/monolith.go
+++ b/build/gobind-yggdrasil/monolith.go
@@ -216,7 +216,7 @@ func (m *DendriteMonolith) Start() {
processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true,
)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics, fsAPI.IsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
diff --git a/clientapi/admin_test.go b/clientapi/admin_test.go
index b228dd93..f0e5f004 100644
--- a/clientapi/admin_test.go
+++ b/clientapi/admin_test.go
@@ -45,7 +45,7 @@ func TestAdminCreateToken(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
accessTokens := map[*test.User]userDevice{
aliceAdmin: {},
@@ -196,7 +196,7 @@ func TestAdminListRegistrationTokens(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
accessTokens := map[*test.User]userDevice{
aliceAdmin: {},
@@ -314,7 +314,7 @@ func TestAdminGetRegistrationToken(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
accessTokens := map[*test.User]userDevice{
aliceAdmin: {},
@@ -415,7 +415,7 @@ func TestAdminDeleteRegistrationToken(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
accessTokens := map[*test.User]userDevice{
aliceAdmin: {},
@@ -509,7 +509,7 @@ func TestAdminUpdateRegistrationToken(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
accessTokens := map[*test.User]userDevice{
aliceAdmin: {},
@@ -693,7 +693,7 @@ func TestAdminResetPassword(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
// Needed for changing the password/login
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the userAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -791,7 +791,7 @@ func TestPurgeRoom(t *testing.T) {
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(fsAPI, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, userAPI, rsAPI, caches, caching.DisableMetrics)
// Create the room
@@ -863,7 +863,7 @@ func TestAdminEvacuateRoom(t *testing.T) {
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(fsAPI, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// Create the room
if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", api.DoNotSendToOtherServers, nil, false); err != nil {
@@ -964,7 +964,7 @@ func TestAdminEvacuateUser(t *testing.T) {
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, basepkg.CreateFederationClient(cfg, nil), rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(fsAPI, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// Create the room
if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", api.DoNotSendToOtherServers, nil, false); err != nil {
@@ -1055,7 +1055,7 @@ func TestAdminMarkAsStale(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
diff --git a/clientapi/clientapi_test.go b/clientapi/clientapi_test.go
index 2bb15fba..2ff4b650 100644
--- a/clientapi/clientapi_test.go
+++ b/clientapi/clientapi_test.go
@@ -17,6 +17,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/clientapi/threepid"
+ "github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushrules"
@@ -49,6 +50,10 @@ type userDevice struct {
password string
}
+var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
+ return &statistics.ServerStatistics{}, nil
+}
+
func TestGetPutDevices(t *testing.T) {
alice := test.NewUser(t)
bob := test.NewUser(t)
@@ -121,7 +126,7 @@ func TestGetPutDevices(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -170,7 +175,7 @@ func TestDeleteDevice(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI/ for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -275,7 +280,7 @@ func TestDeleteDevices(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI/ for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -442,7 +447,7 @@ func TestSetDisplayname(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
asPI := appservice.NewInternalAPI(processCtx, cfg, natsInstance, userAPI, rsAPI)
AddPublicRoutes(processCtx, routers, cfg, natsInstance, base.CreateFederationClient(cfg, nil), rsAPI, asPI, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -554,7 +559,7 @@ func TestSetAvatarURL(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
asPI := appservice.NewInternalAPI(processCtx, cfg, natsInstance, userAPI, rsAPI)
AddPublicRoutes(processCtx, routers, cfg, natsInstance, base.CreateFederationClient(cfg, nil), rsAPI, asPI, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -632,7 +637,7 @@ func TestTyping(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
// Needed to create accounts
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -716,7 +721,7 @@ func TestMembership(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
// Needed to create accounts
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
rsAPI.SetUserAPI(userAPI)
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -955,7 +960,7 @@ func TestCapabilities(t *testing.T) {
// Needed to create accounts
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -1002,7 +1007,7 @@ func TestTurnserver(t *testing.T) {
// Needed to create accounts
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
//rsAPI.SetUserAPI(userAPI)
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -1100,7 +1105,7 @@ func Test3PID(t *testing.T) {
// Needed to create accounts
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI/userAPI for this test, so nil for other APIs etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -1276,7 +1281,7 @@ func TestPushRules(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -1663,7 +1668,7 @@ func TestKeys(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
@@ -2125,7 +2130,7 @@ func TestKeyBackup(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the rsAPI for this test, so nil for other APIs/caches etc.
AddPublicRoutes(processCtx, routers, cfg, &natsInstance, nil, rsAPI, nil, nil, nil, userAPI, nil, nil, caching.DisableMetrics)
diff --git a/clientapi/routing/joinroom_test.go b/clientapi/routing/joinroom_test.go
index be3fb429..bd854efa 100644
--- a/clientapi/routing/joinroom_test.go
+++ b/clientapi/routing/joinroom_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"time"
+ "github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/jetstream"
@@ -21,6 +22,10 @@ import (
uapi "github.com/matrix-org/dendrite/userapi/api"
)
+var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
+ return &statistics.ServerStatistics{}, nil
+}
+
func TestJoinRoomByIDOrAlias(t *testing.T) {
alice := test.NewUser(t)
bob := test.NewUser(t)
@@ -36,7 +41,7 @@ func TestJoinRoomByIDOrAlias(t *testing.T) {
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil) // creates the rs.Inputer etc
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
// Create the users in the userapi
diff --git a/clientapi/routing/login_test.go b/clientapi/routing/login_test.go
index 4c4fc353..1c628b19 100644
--- a/clientapi/routing/login_test.go
+++ b/clientapi/routing/login_test.go
@@ -49,7 +49,7 @@ func TestLogin(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
// Needed for /login
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
// We mostly need the userAPI for this test, so nil for other APIs/caches etc.
Setup(routers, cfg, nil, nil, userAPI, nil, nil, nil, nil, nil, nil, nil, caching.DisableMetrics)
diff --git a/clientapi/routing/register_test.go b/clientapi/routing/register_test.go
index 69b29e9c..98455f80 100644
--- a/clientapi/routing/register_test.go
+++ b/clientapi/routing/register_test.go
@@ -416,7 +416,7 @@ func Test_register(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
@@ -596,7 +596,7 @@ func TestRegisterUserWithDisplayName(t *testing.T) {
cm := sqlutil.NewConnectionManager(processCtx, cfg.Global.DatabaseOptions)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
deviceName, deviceID := "deviceName", "deviceID"
expectedDisplayName := "DisplayName"
response := completeRegistration(
@@ -637,7 +637,7 @@ func TestRegisterAdminUsingSharedSecret(t *testing.T) {
caches := caching.NewRistrettoCache(128*1024*1024, time.Hour, caching.DisableMetrics)
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
expectedDisplayName := "rabbit"
jsonStr := []byte(`{"admin":true,"mac":"24dca3bba410e43fe64b9b5c28306693bf3baa9f","nonce":"759f047f312b99ff428b21d581256f8592b8976e58bc1b543972dc6147e529a79657605b52d7becd160ff5137f3de11975684319187e06901955f79e5a6c5a79","password":"wonderland","username":"alice","displayname":"rabbit"}`)
diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go
index 5222d0b8..d9f44b5c 100644
--- a/cmd/dendrite-demo-pinecone/monolith/monolith.go
+++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go
@@ -145,7 +145,7 @@ func (p *P2PMonolith) SetupDendrite(
)
rsAPI.SetFederationAPI(fsAPI, keyRing)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, enableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, enableMetrics, fsAPI.IsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go
index b0768753..1e518634 100644
--- a/cmd/dendrite-demo-yggdrasil/main.go
+++ b/cmd/dendrite-demo-yggdrasil/main.go
@@ -213,14 +213,15 @@ func main() {
natsInstance := jetstream.NATSInstance{}
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.EnableMetrics)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics)
-
- asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
- rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationapi.NewInternalAPI(
processCtx, cfg, cm, &natsInstance, federation, rsAPI, caches, keyRing, true,
)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federation, caching.EnableMetrics, fsAPI.IsBlacklistedOrBackingOff)
+
+ asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
+ rsAPI.SetAppserviceAPI(asAPI)
+
rsAPI.SetFederationAPI(fsAPI, keyRing)
monolith := setup.Monolith{
diff --git a/cmd/dendrite/main.go b/cmd/dendrite/main.go
index f38263c6..5234b750 100644
--- a/cmd/dendrite/main.go
+++ b/cmd/dendrite/main.go
@@ -162,7 +162,7 @@ func main() {
// dependency. Other components also need updating after their dependencies are up.
rsAPI.SetFederationAPI(fsAPI, keyRing)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federationClient, caching.EnableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, federationClient, caching.EnableMetrics, fsAPI.IsBlacklistedOrBackingOff)
asAPI := appservice.NewInternalAPI(processCtx, cfg, &natsInstance, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go
index e148199f..e2524f66 100644
--- a/federationapi/federationapi.go
+++ b/federationapi/federationapi.go
@@ -24,7 +24,6 @@ import (
"github.com/matrix-org/gomatrixserverlib/fclient"
"github.com/sirupsen/logrus"
- "github.com/matrix-org/dendrite/federationapi/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers"
"github.com/matrix-org/dendrite/federationapi/internal"
@@ -102,7 +101,7 @@ func NewInternalAPI(
caches *caching.Caches,
keyRing *gomatrixserverlib.KeyRing,
resetBlacklist bool,
-) api.FederationInternalAPI {
+) *internal.FederationInternalAPI {
cfg := &dendriteCfg.FederationAPI
federationDB, err := storage.NewDatabase(processContext.Context(), cm, &cfg.Database, caches, dendriteCfg.Global.IsLocalServerName)
diff --git a/federationapi/internal/api.go b/federationapi/internal/api.go
index 3e6f3956..67388a10 100644
--- a/federationapi/internal/api.go
+++ b/federationapi/internal/api.go
@@ -112,7 +112,7 @@ func NewFederationInternalAPI(
}
}
-func (a *FederationInternalAPI) isBlacklistedOrBackingOff(s spec.ServerName) (*statistics.ServerStatistics, error) {
+func (a *FederationInternalAPI) IsBlacklistedOrBackingOff(s spec.ServerName) (*statistics.ServerStatistics, error) {
stats := a.statistics.ForServer(s)
if stats.Blacklisted() {
return stats, &api.FederationClientError{
@@ -151,7 +151,7 @@ func failBlacklistableError(err error, stats *statistics.ServerStatistics) (unti
func (a *FederationInternalAPI) doRequestIfNotBackingOffOrBlacklisted(
s spec.ServerName, request func() (interface{}, error),
) (interface{}, error) {
- stats, err := a.isBlacklistedOrBackingOff(s)
+ stats, err := a.IsBlacklistedOrBackingOff(s)
if err != nil {
return nil, err
}
diff --git a/federationapi/routing/profile_test.go b/federationapi/routing/profile_test.go
index a31b206c..ba13e07f 100644
--- a/federationapi/routing/profile_test.go
+++ b/federationapi/routing/profile_test.go
@@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
fedAPI "github.com/matrix-org/dendrite/federationapi"
- fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
@@ -67,11 +66,8 @@ func TestHandleQueryProfile(t *testing.T) {
keyRing := serverKeyAPI.KeyRing()
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, &fedClient, nil, nil, keyRing, true)
userapi := fakeUserAPI{}
- r, ok := fedapi.(*fedInternal.FederationInternalAPI)
- if !ok {
- panic("This is a programming error.")
- }
- routing.Setup(routers, cfg, nil, r, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
+
+ routing.Setup(routers, cfg, nil, fedapi, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
handler := fedMux.Get(routing.QueryProfileRouteName).GetHandler().ServeHTTP
_, sk, _ := ed25519.GenerateKey(nil)
diff --git a/federationapi/routing/query_test.go b/federationapi/routing/query_test.go
index bb14ab03..fd0894d1 100644
--- a/federationapi/routing/query_test.go
+++ b/federationapi/routing/query_test.go
@@ -25,7 +25,6 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
fedAPI "github.com/matrix-org/dendrite/federationapi"
- fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
@@ -65,11 +64,8 @@ func TestHandleQueryDirectory(t *testing.T) {
keyRing := serverKeyAPI.KeyRing()
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, &fedClient, nil, nil, keyRing, true)
userapi := fakeUserAPI{}
- r, ok := fedapi.(*fedInternal.FederationInternalAPI)
- if !ok {
- panic("This is a programming error.")
- }
- routing.Setup(routers, cfg, nil, r, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
+
+ routing.Setup(routers, cfg, nil, fedapi, keyRing, &fedClient, &userapi, &cfg.MSCs, nil, caching.DisableMetrics)
handler := fedMux.Get(routing.QueryDirectoryRouteName).GetHandler().ServeHTTP
_, sk, _ := ed25519.GenerateKey(nil)
diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go
index f629479d..ff4f7bd0 100644
--- a/federationapi/routing/send_test.go
+++ b/federationapi/routing/send_test.go
@@ -23,7 +23,6 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
fedAPI "github.com/matrix-org/dendrite/federationapi"
- fedInternal "github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
@@ -62,11 +61,8 @@ func TestHandleSend(t *testing.T) {
fedapi := fedAPI.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, nil, nil, nil, true)
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
- r, ok := fedapi.(*fedInternal.FederationInternalAPI)
- if !ok {
- panic("This is a programming error.")
- }
- routing.Setup(routers, cfg, nil, r, keyRing, nil, nil, &cfg.MSCs, nil, caching.DisableMetrics)
+
+ routing.Setup(routers, cfg, nil, fedapi, keyRing, nil, nil, &cfg.MSCs, nil, caching.DisableMetrics)
handler := fedMux.Get(routing.SendRouteName).GetHandler().ServeHTTP
_, sk, _ := ed25519.GenerateKey(nil)
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
index 22d27ba0..218a0d8a 100644
--- a/roomserver/roomserver_test.go
+++ b/roomserver/roomserver_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"time"
+ "github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/httputil"
@@ -34,6 +35,10 @@ import (
"github.com/matrix-org/dendrite/test/testrig"
)
+var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
+ return &statistics.ServerStatistics{}, nil
+}
+
type FakeQuerier struct {
api.QuerySenderIDAPI
}
@@ -58,7 +63,7 @@ func TestUsers(t *testing.T) {
})
t.Run("kick users", func(t *testing.T) {
- usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ usrAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
rsAPI.SetUserAPI(usrAPI)
testKickUsers(t, rsAPI, usrAPI)
})
@@ -258,7 +263,7 @@ func TestPurgeRoom(t *testing.T) {
fsAPI := federationapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, nil, rsAPI, caches, nil, true)
rsAPI.SetFederationAPI(fsAPI, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, fsAPI.IsBlacklistedOrBackingOff)
syncapi.AddPublicRoutes(processCtx, routers, cfg, cm, &natsInstance, userAPI, rsAPI, caches, caching.DisableMetrics)
// Create the room
@@ -1050,7 +1055,7 @@ func TestUpgrade(t *testing.T) {
rsAPI := roomserver.NewInternalAPI(processCtx, cfg, cm, &natsInstance, caches, caching.DisableMetrics)
rsAPI.SetFederationAPI(nil, nil)
- userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics)
+ userAPI := userapi.NewInternalAPI(processCtx, cfg, cm, &natsInstance, rsAPI, nil, caching.DisableMetrics, testIsBlacklistedOrBackingOff)
rsAPI.SetUserAPI(userAPI)
for _, tc := range testCases {
diff --git a/userapi/internal/device_list_update.go b/userapi/internal/device_list_update.go
index a4d28188..b4063516 100644
--- a/userapi/internal/device_list_update.go
+++ b/userapi/internal/device_list_update.go
@@ -25,6 +25,7 @@ import (
"sync"
"time"
+ "github.com/matrix-org/dendrite/federationapi/statistics"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib/fclient"
"github.com/matrix-org/gomatrixserverlib/spec"
@@ -108,6 +109,8 @@ type DeviceListUpdater struct {
userIDToChan map[string]chan bool
userIDToChanMu *sync.Mutex
rsAPI rsapi.KeyserverRoomserverAPI
+
+ isBlacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error)
}
// DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater.
@@ -167,25 +170,28 @@ func NewDeviceListUpdater(
process *process.ProcessContext, db DeviceListUpdaterDatabase,
api DeviceListUpdaterAPI, producer KeyChangeProducer,
fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
- rsAPI rsapi.KeyserverRoomserverAPI, thisServer spec.ServerName,
+ rsAPI rsapi.KeyserverRoomserverAPI,
+ thisServer spec.ServerName,
enableMetrics bool,
+ isBlacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error),
) *DeviceListUpdater {
if enableMetrics {
prometheus.MustRegister(deviceListUpdaterBackpressure, deviceListUpdaterServersRetrying)
}
return &DeviceListUpdater{
- process: process,
- userIDToMutex: make(map[string]*sync.Mutex),
- mu: &sync.Mutex{},
- db: db,
- api: api,
- producer: producer,
- fedClient: fedClient,
- thisServer: thisServer,
- workerChans: make([]chan spec.ServerName, numWorkers),
- userIDToChan: make(map[string]chan bool),
- userIDToChanMu: &sync.Mutex{},
- rsAPI: rsAPI,
+ process: process,
+ userIDToMutex: make(map[string]*sync.Mutex),
+ mu: &sync.Mutex{},
+ db: db,
+ api: api,
+ producer: producer,
+ fedClient: fedClient,
+ thisServer: thisServer,
+ workerChans: make([]chan spec.ServerName, numWorkers),
+ userIDToChan: make(map[string]chan bool),
+ userIDToChanMu: &sync.Mutex{},
+ rsAPI: rsAPI,
+ isBlacklistedOrBackingOffFn: isBlacklistedOrBackingOffFn,
}
}
@@ -362,13 +368,22 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
if err != nil {
return
}
+ _, err = u.isBlacklistedOrBackingOffFn(remoteServer)
+ var federationClientError *fedsenderapi.FederationClientError
+ if errors.As(err, &federationClientError) {
+ if federationClientError.Blacklisted {
+ return
+ }
+ }
+
hash := fnv.New32a()
_, _ = hash.Write([]byte(remoteServer))
index := int(int64(hash.Sum32()) % int64(len(u.workerChans)))
ch := u.assignChannel(userID)
+ // Since workerChans are buffered, we only increment here and let the worker
+ // decrement it once it is done processing.
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc()
- defer deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Dec()
u.workerChans[index] <- remoteServer
select {
case <-ch:
@@ -405,24 +420,38 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
go func() {
var serversToRetry []spec.ServerName
for {
- serversToRetry = serversToRetry[:0] // reuse memory
- time.Sleep(time.Second)
+ // nuke serversToRetry by re-slicing it to be "empty".
+ // The capacity of the slice is unchanged, which ensures we can reuse the memory.
+ serversToRetry = serversToRetry[:0]
+
+ deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
+ time.Sleep(time.Second * 2)
+
+ // -2, so we have space for incoming device list updates over federation
+ maxServers := (cap(ch) - len(ch)) - 2
+ if maxServers <= 0 {
+ continue
+ }
+
retriesMu.Lock()
now := time.Now()
for srv, retryAt := range retries {
if now.After(retryAt) {
serversToRetry = append(serversToRetry, srv)
+ if maxServers == len(serversToRetry) {
+ break
+ }
}
}
+
for _, srv := range serversToRetry {
delete(retries, srv)
}
- deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
retriesMu.Unlock()
+
for _, srv := range serversToRetry {
deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc()
ch <- srv
- deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
}
}
}()
@@ -430,8 +459,18 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
retriesMu.Lock()
_, exists := retries[serverName]
retriesMu.Unlock()
- if exists {
- // Don't retry a server that we're already waiting for.
+
+ // If the serverName is coming from retries, maybe it was
+ // blacklisted in the meantime.
+ _, err := u.isBlacklistedOrBackingOffFn(serverName)
+ var federationClientError *fedsenderapi.FederationClientError
+ // unwrap errors and check for FederationClientError, if found, federationClientError will be not nil
+ errors.As(err, &federationClientError)
+ isBlacklisted := federationClientError != nil && federationClientError.Blacklisted
+
+ // Don't retry a server that we're already waiting for or is blacklisted by now.
+ if exists || isBlacklisted {
+ deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
continue
}
waitTime, shouldRetry := u.processServer(serverName)
@@ -442,6 +481,7 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
}
retriesMu.Unlock()
}
+ deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
}
}
diff --git a/userapi/internal/device_list_update_test.go b/userapi/internal/device_list_update_test.go
index 14a49bc5..a2f1869d 100644
--- a/userapi/internal/device_list_update_test.go
+++ b/userapi/internal/device_list_update_test.go
@@ -27,6 +27,8 @@ import (
"testing"
"time"
+ api2 "github.com/matrix-org/dendrite/federationapi/api"
+ "github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
@@ -129,6 +131,10 @@ type mockDeviceListUpdaterAPI struct {
func (d *mockDeviceListUpdaterAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) {
}
+var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
+ return &statistics.ServerStatistics{}, nil
+}
+
type roundTripper struct {
fn func(*http.Request) (*http.Response, error)
}
@@ -162,7 +168,7 @@ func TestUpdateHavePrevID(t *testing.T) {
}
ap := &mockDeviceListUpdaterAPI{}
producer := &mockKeyChangeProducer{}
- updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics)
+ updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
event := gomatrixserverlib.DeviceListUpdateEvent{
DeviceDisplayName: "Foo Bar",
Deleted: false,
@@ -234,7 +240,7 @@ func TestUpdateNoPrevID(t *testing.T) {
`)),
}, nil
})
- updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics)
+ updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
if err := updater.Start(); err != nil {
t.Fatalf("failed to start updater: %s", err)
}
@@ -304,7 +310,7 @@ func TestDebounce(t *testing.T) {
close(incomingFedReq)
return <-fedCh, nil
})
- updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics)
+ updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
if err := updater.Start(); err != nil {
t.Fatalf("failed to start updater: %s", err)
}
@@ -407,7 +413,7 @@ func TestDeviceListUpdater_CleanUp(t *testing.T) {
updater := NewDeviceListUpdater(processCtx, db, nil,
nil, nil,
- 0, rsAPI, "test", caching.DisableMetrics)
+ 0, rsAPI, "test", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
if err := updater.CleanUp(); err != nil {
t.Error(err)
}
@@ -475,3 +481,68 @@ func Test_dedupeStateList(t *testing.T) {
})
}
}
+
+func TestDeviceListUpdaterIgnoreBlacklisted(t *testing.T) {
+ unreachableServer := spec.ServerName("notlocalhost")
+
+ updater := DeviceListUpdater{
+ workerChans: make([]chan spec.ServerName, 1),
+ isBlacklistedOrBackingOffFn: func(s spec.ServerName) (*statistics.ServerStatistics, error) {
+ switch s {
+ case unreachableServer:
+ return nil, &api2.FederationClientError{Blacklisted: true}
+ }
+ return nil, nil
+ },
+ mu: &sync.Mutex{},
+ userIDToChanMu: &sync.Mutex{},
+ userIDToChan: make(map[string]chan bool),
+ userIDToMutex: make(map[string]*sync.Mutex),
+ }
+ workerCh := make(chan spec.ServerName)
+ defer close(workerCh)
+ updater.workerChans[0] = workerCh
+
+ // happy case
+ alice := "@alice:localhost"
+ aliceCh := updater.assignChannel(alice)
+ defer updater.clearChannel(alice)
+
+ // failing case
+ bob := "@bob:" + unreachableServer
+ bobCh := updater.assignChannel(string(bob))
+ defer updater.clearChannel(string(bob))
+
+ expectedServers := map[spec.ServerName]struct{}{
+ "localhost": {},
+ }
+ unexpectedServers := make(map[spec.ServerName]struct{})
+
+ go func() {
+ for serverName := range workerCh {
+ switch serverName {
+ case "localhost":
+ delete(expectedServers, serverName)
+ aliceCh <- true // unblock notifyWorkers
+ case unreachableServer: // this should not happen as it is "filtered" away by the blacklist
+ unexpectedServers[serverName] = struct{}{}
+ bobCh <- true
+ default:
+ unexpectedServers[serverName] = struct{}{}
+ }
+ }
+ }()
+
+ // alice is not blacklisted
+ updater.notifyWorkers(alice)
+ // bob is blacklisted
+ updater.notifyWorkers(string(bob))
+
+ for server := range expectedServers {
+ t.Errorf("Server still in expectedServers map: %s", server)
+ }
+
+ for server := range unexpectedServers {
+ t.Errorf("unexpected server in result: %s", server)
+ }
+}
diff --git a/userapi/userapi.go b/userapi/userapi.go
index 34bf119a..a1c9b94a 100644
--- a/userapi/userapi.go
+++ b/userapi/userapi.go
@@ -18,10 +18,12 @@ import (
"time"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
+ "github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/gomatrixserverlib/spec"
"github.com/sirupsen/logrus"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
@@ -47,6 +49,7 @@ func NewInternalAPI(
rsAPI rsapi.UserRoomserverAPI,
fedClient fedsenderapi.KeyserverFederationAPI,
enableMetrics bool,
+ blacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error),
) *internal.UserInternalAPI {
js, _ := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
appServices := dendriteCfg.Derived.ApplicationServices
@@ -100,7 +103,7 @@ func NewInternalAPI(
FedClient: fedClient,
}
- updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics)
+ updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics, blacklistedOrBackingOffFn)
userAPI.Updater = updater
// Remove users which we don't share a room with anymore
if err := updater.CleanUp(); err != nil {