diff options
author | Kegsay <kegan@matrix.org> | 2020-10-06 18:09:02 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-06 18:09:02 +0100 |
commit | 0f7e707f399e7f633c58f4e1a5aedc0e45f90241 (patch) | |
tree | 3e6722fb086d738d80d140acfdc11433b666d19a | |
parent | 4feff8e8d9efd36b5d202ba219af997a8313866a (diff) |
Optimise servers to backfill from (#1485)
- Prefer perspective servers if they are in the room.
- Limit the number of backfill servers to 5 to avoid taking too long.
-rw-r--r-- | roomserver/internal/api.go | 34 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_backfill.go | 39 | ||||
-rw-r--r-- | roomserver/roomserver.go | 7 |
3 files changed, 58 insertions, 22 deletions
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 8dc1a170..ee4e4ec9 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -26,28 +26,30 @@ type RoomserverInternalAPI struct { *perform.Leaver *perform.Publisher *perform.Backfiller - DB storage.Database - Cfg *config.RoomServer - Producer sarama.SyncProducer - Cache caching.RoomServerCaches - ServerName gomatrixserverlib.ServerName - KeyRing gomatrixserverlib.JSONVerifier - fsAPI fsAPI.FederationSenderInternalAPI - OutputRoomEventTopic string // Kafka topic for new output room events + DB storage.Database + Cfg *config.RoomServer + Producer sarama.SyncProducer + Cache caching.RoomServerCaches + ServerName gomatrixserverlib.ServerName + KeyRing gomatrixserverlib.JSONVerifier + fsAPI fsAPI.FederationSenderInternalAPI + OutputRoomEventTopic string // Kafka topic for new output room events + PerspectiveServerNames []gomatrixserverlib.ServerName } func NewRoomserverAPI( cfg *config.RoomServer, roomserverDB storage.Database, producer sarama.SyncProducer, outputRoomEventTopic string, caches caching.RoomServerCaches, - keyRing gomatrixserverlib.JSONVerifier, + keyRing gomatrixserverlib.JSONVerifier, perspectiveServerNames []gomatrixserverlib.ServerName, ) *RoomserverInternalAPI { serverACLs := acls.NewServerACLs(roomserverDB) a := &RoomserverInternalAPI{ - DB: roomserverDB, - Cfg: cfg, - Cache: caches, - ServerName: cfg.Matrix.ServerName, - KeyRing: keyRing, + DB: roomserverDB, + Cfg: cfg, + Cache: caches, + ServerName: cfg.Matrix.ServerName, + PerspectiveServerNames: perspectiveServerNames, + KeyRing: keyRing, Queryer: &query.Queryer{ DB: roomserverDB, Cache: caches, @@ -105,6 +107,10 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen DB: r.DB, FSAPI: r.fsAPI, KeyRing: r.KeyRing, + // Perspective servers are trusted to not lie about server keys, so we will also + // prefer these servers when backfilling (assuming they are in the room) rather + // than trying random servers + PreferServers: r.PerspectiveServerNames, } } diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index f6091994..d90ac8fc 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -30,11 +30,19 @@ import ( "github.com/sirupsen/logrus" ) +// the max number of servers to backfill from per request. If this is too low we may fail to backfill when +// we could've from another server. If this is too high we may take far too long to successfully backfill +// as we try dead servers. +const maxBackfillServers = 5 + type Backfiller struct { ServerName gomatrixserverlib.ServerName DB storage.Database FSAPI federationSenderAPI.FederationSenderInternalAPI KeyRing gomatrixserverlib.JSONVerifier + + // The servers which should be preferred above other servers when backfilling + PreferServers []gomatrixserverlib.ServerName } // PerformBackfill implements api.RoomServerQueryAPI @@ -96,7 +104,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform if info == nil || info.IsStub { return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID) } - requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities) + requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities, r.PreferServers) // Request 100 items regardless of what the query asks for. // We don't want to go much higher than this. // We can't honour exactly the limit as some sytests rely on requesting more for tests to pass @@ -215,10 +223,11 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom // backfillRequester implements gomatrixserverlib.BackfillRequester type backfillRequester struct { - db storage.Database - fsAPI federationSenderAPI.FederationSenderInternalAPI - thisServer gomatrixserverlib.ServerName - bwExtrems map[string][]string + db storage.Database + fsAPI federationSenderAPI.FederationSenderInternalAPI + thisServer gomatrixserverlib.ServerName + preferServer map[gomatrixserverlib.ServerName]bool + bwExtrems map[string][]string // per-request state servers []gomatrixserverlib.ServerName @@ -226,7 +235,14 @@ type backfillRequester struct { eventIDMap map[string]gomatrixserverlib.Event } -func newBackfillRequester(db storage.Database, fsAPI federationSenderAPI.FederationSenderInternalAPI, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester { +func newBackfillRequester( + db storage.Database, fsAPI federationSenderAPI.FederationSenderInternalAPI, thisServer gomatrixserverlib.ServerName, + bwExtrems map[string][]string, preferServers []gomatrixserverlib.ServerName, +) *backfillRequester { + preferServer := make(map[gomatrixserverlib.ServerName]bool) + for _, p := range preferServers { + preferServer[p] = true + } return &backfillRequester{ db: db, fsAPI: fsAPI, @@ -234,6 +250,7 @@ func newBackfillRequester(db storage.Database, fsAPI federationSenderAPI.Federat eventIDToBeforeStateIDs: make(map[string][]string), eventIDMap: make(map[string]gomatrixserverlib.Event), bwExtrems: bwExtrems, + preferServer: preferServer, } } @@ -436,8 +453,16 @@ FindSuccessor: if server == b.thisServer { continue } - servers = append(servers, server) + if b.preferServer[server] { // insert at the front + servers = append([]gomatrixserverlib.ServerName{server}, servers...) + } else { // insert at the back + servers = append(servers, server) + } + } + if len(servers) > maxBackfillServers { + servers = servers[:maxBackfillServers] } + b.servers = servers return servers } diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 2eabf450..98a86e5b 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -41,6 +41,11 @@ func NewInternalAPI( ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer + var perspectiveServerNames []gomatrixserverlib.ServerName + for _, kp := range base.Cfg.ServerKeyAPI.KeyPerspectives { + perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) + } + roomserverDB, err := storage.Open(&cfg.Database, base.Caches) if err != nil { logrus.WithError(err).Panicf("failed to connect to room server db") @@ -48,6 +53,6 @@ func NewInternalAPI( return internal.NewRoomserverAPI( cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), - base.Caches, keyRing, + base.Caches, keyRing, perspectiveServerNames, ) } |