diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-09-02 15:26:30 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-02 15:26:30 +0100 |
commit | 096191ca240776031370e99b93732557972ba92a (patch) | |
tree | 79f4247556a3dbd01fc8dca2fac68f752800a6d7 | |
parent | e473320e733484b1cc6da0588fd2ccf4affb3d24 (diff) |
Use federation sender for backfill/getting missing events (#1379)
* Use federation sender for backfill and getting missing events
* Fix internal URL paths
* Update go.mod/go.sum for matrix-org/gomatrixserverlib#218
* Add missing server implementations in HTTP interface
-rw-r--r-- | build/gobind/monolith.go | 2 | ||||
-rw-r--r-- | cmd/dendrite-demo-libp2p/main.go | 2 | ||||
-rw-r--r-- | cmd/dendrite-demo-yggdrasil/main.go | 2 | ||||
-rw-r--r-- | cmd/dendrite-monolith-server/main.go | 2 | ||||
-rw-r--r-- | cmd/dendrite-room-server/main.go | 3 | ||||
-rw-r--r-- | cmd/dendritejs/main.go | 2 | ||||
-rw-r--r-- | federationsender/api/api.go | 3 | ||||
-rw-r--r-- | federationsender/internal/api.go | 48 | ||||
-rw-r--r-- | federationsender/inthttp/client.go | 130 | ||||
-rw-r--r-- | federationsender/inthttp/server.go | 88 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 4 | ||||
-rw-r--r-- | roomserver/internal/api.go | 8 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_backfill.go | 23 | ||||
-rw-r--r-- | roomserver/roomserver.go | 3 | ||||
-rw-r--r-- | roomserver/roomserver_test.go | 2 |
16 files changed, 295 insertions, 29 deletions
diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index 59535c7b..725c9c07 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -120,7 +120,7 @@ func (m *DendriteMonolith) Start() { keyAPI.SetUserAPI(userAPI) rsAPI := roomserver.NewInternalAPI( - base, keyRing, federation, + base, keyRing, ) eduInputAPI := eduserver.NewInternalAPI( diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index e2d23e89..d4f0cee0 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -155,7 +155,7 @@ func main() { stateAPI := currentstateserver.NewInternalAPI(&base.Base.Cfg.CurrentStateServer, base.Base.KafkaConsumer) rsAPI := roomserver.NewInternalAPI( - &base.Base, keyRing, federation, + &base.Base, keyRing, ) eduInputAPI := eduserver.NewInternalAPI( &base.Base, cache.New(), userAPI, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 26999ebe..fcf3d4c5 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -104,7 +104,7 @@ func main() { keyAPI.SetUserAPI(userAPI) rsComponent := roomserver.NewInternalAPI( - base, keyRing, federation, + base, keyRing, ) rsAPI := rsComponent diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 81511746..717b21a9 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -81,7 +81,7 @@ func main() { keyRing := serverKeyAPI.KeyRing() rsImpl := roomserver.NewInternalAPI( - base, keyRing, federation, + base, keyRing, ) // call functions directly on the impl unless running in HTTP mode rsAPI := rsImpl diff --git a/cmd/dendrite-room-server/main.go b/cmd/dendrite-room-server/main.go index 0d587e6e..08ad34bf 100644 --- a/cmd/dendrite-room-server/main.go +++ b/cmd/dendrite-room-server/main.go @@ -23,13 +23,12 @@ func main() { cfg := setup.ParseFlags(false) base := setup.NewBaseDendrite(cfg, "RoomServerAPI", true) defer base.Close() // nolint: errcheck - federation := base.CreateFederationClient() serverKeyAPI := base.ServerKeyAPIClient() keyRing := serverKeyAPI.KeyRing() fsAPI := base.FederationSenderHTTPClient() - rsAPI := roomserver.NewInternalAPI(base, keyRing, federation) + rsAPI := roomserver.NewInternalAPI(base, keyRing) rsAPI.SetFederationSenderAPI(fsAPI) roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index c95eb3fc..aeca7094 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -205,7 +205,7 @@ func main() { } stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer) - rsAPI := roomserver.NewInternalAPI(base, keyRing, federation) + rsAPI := roomserver.NewInternalAPI(base, keyRing) eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI) asQuery := appservice.NewInternalAPI( base, userAPI, rsAPI, diff --git a/federationsender/api/api.go b/federationsender/api/api.go index cea0010d..655d1d10 100644 --- a/federationsender/api/api.go +++ b/federationsender/api/api.go @@ -14,9 +14,12 @@ import ( // implements as proxy calls, with built-in backoff/retries/etc. Errors returned from functions in // this interface are of type FederationClientError type FederationClient interface { + gomatrixserverlib.BackfillClient + gomatrixserverlib.FederatedStateClient GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (res gomatrixserverlib.RespUserDevices, err error) ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (res gomatrixserverlib.RespClaimKeys, err error) QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (res gomatrixserverlib.RespQueryKeys, err error) + GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error) } // FederationClientError is returned from FederationClient methods in the event of a problem. diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go index 6b5f4c34..61663be3 100644 --- a/federationsender/internal/api.go +++ b/federationsender/internal/api.go @@ -136,3 +136,51 @@ func (a *FederationSenderInternalAPI) QueryKeys( } return ires.(gomatrixserverlib.RespQueryKeys), nil } + +func (a *FederationSenderInternalAPI) Backfill( + ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string, +) (res gomatrixserverlib.Transaction, err error) { + ires, err := a.doRequest(s, func() (interface{}, error) { + return a.federation.Backfill(ctx, s, roomID, limit, eventIDs) + }) + if err != nil { + return gomatrixserverlib.Transaction{}, err + } + return ires.(gomatrixserverlib.Transaction), nil +} + +func (a *FederationSenderInternalAPI) LookupState( + ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion, +) (res gomatrixserverlib.RespState, err error) { + ires, err := a.doRequest(s, func() (interface{}, error) { + return a.federation.LookupState(ctx, s, roomID, eventID, roomVersion) + }) + if err != nil { + return gomatrixserverlib.RespState{}, err + } + return ires.(gomatrixserverlib.RespState), nil +} + +func (a *FederationSenderInternalAPI) LookupStateIDs( + ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, +) (res gomatrixserverlib.RespStateIDs, err error) { + ires, err := a.doRequest(s, func() (interface{}, error) { + return a.federation.LookupStateIDs(ctx, s, roomID, eventID) + }) + if err != nil { + return gomatrixserverlib.RespStateIDs{}, err + } + return ires.(gomatrixserverlib.RespStateIDs), nil +} + +func (a *FederationSenderInternalAPI) GetEvent( + ctx context.Context, s gomatrixserverlib.ServerName, eventID string, +) (res gomatrixserverlib.Transaction, err error) { + ires, err := a.doRequest(s, func() (interface{}, error) { + return a.federation.GetEvent(ctx, s, eventID) + }) + if err != nil { + return gomatrixserverlib.Transaction{}, err + } + return ires.(gomatrixserverlib.Transaction), nil +} diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go index 79e220c3..5bfe6089 100644 --- a/federationsender/inthttp/client.go +++ b/federationsender/inthttp/client.go @@ -26,6 +26,10 @@ const ( FederationSenderGetUserDevicesPath = "/federationsender/client/getUserDevices" FederationSenderClaimKeysPath = "/federationsender/client/claimKeys" FederationSenderQueryKeysPath = "/federationsender/client/queryKeys" + FederationSenderBackfillPath = "/federationsender/client/backfill" + FederationSenderLookupStatePath = "/federationsender/client/lookupState" + FederationSenderLookupStateIDsPath = "/federationsender/client/lookupStateIDs" + FederationSenderGetEventPath = "/federationsender/client/getEvent" ) // NewFederationSenderClient creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API. @@ -228,3 +232,129 @@ func (h *httpFederationSenderInternalAPI) QueryKeys( } return *response.Res, nil } + +type backfill struct { + S gomatrixserverlib.ServerName + RoomID string + Limit int + EventIDs []string + Res *gomatrixserverlib.Transaction + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) Backfill( + ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string, +) (gomatrixserverlib.Transaction, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "Backfill") + defer span.Finish() + + request := backfill{ + S: s, + RoomID: roomID, + Limit: limit, + EventIDs: eventIDs, + } + var response backfill + apiURL := h.federationSenderURL + FederationSenderBackfillPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) + if err != nil { + return gomatrixserverlib.Transaction{}, err + } + if response.Err != nil { + return gomatrixserverlib.Transaction{}, response.Err + } + return *response.Res, nil +} + +type lookupState struct { + S gomatrixserverlib.ServerName + RoomID string + EventID string + RoomVersion gomatrixserverlib.RoomVersion + Res *gomatrixserverlib.RespState + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) LookupState( + ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion, +) (gomatrixserverlib.RespState, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "LookupState") + defer span.Finish() + + request := lookupState{ + S: s, + RoomID: roomID, + EventID: eventID, + RoomVersion: roomVersion, + } + var response lookupState + apiURL := h.federationSenderURL + FederationSenderLookupStatePath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) + if err != nil { + return gomatrixserverlib.RespState{}, err + } + if response.Err != nil { + return gomatrixserverlib.RespState{}, response.Err + } + return *response.Res, nil +} + +type lookupStateIDs struct { + S gomatrixserverlib.ServerName + RoomID string + EventID string + Res *gomatrixserverlib.RespStateIDs + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) LookupStateIDs( + ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, +) (gomatrixserverlib.RespStateIDs, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "LookupStateIDs") + defer span.Finish() + + request := lookupStateIDs{ + S: s, + RoomID: roomID, + EventID: eventID, + } + var response lookupStateIDs + apiURL := h.federationSenderURL + FederationSenderLookupStateIDsPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) + if err != nil { + return gomatrixserverlib.RespStateIDs{}, err + } + if response.Err != nil { + return gomatrixserverlib.RespStateIDs{}, response.Err + } + return *response.Res, nil +} + +type getEvent struct { + S gomatrixserverlib.ServerName + EventID string + Res *gomatrixserverlib.Transaction + Err *api.FederationClientError +} + +func (h *httpFederationSenderInternalAPI) GetEvent( + ctx context.Context, s gomatrixserverlib.ServerName, eventID string, +) (gomatrixserverlib.Transaction, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "GetEvent") + defer span.Finish() + + request := getEvent{ + S: s, + EventID: eventID, + } + var response getEvent + apiURL := h.federationSenderURL + FederationSenderGetEventPath + err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) + if err != nil { + return gomatrixserverlib.Transaction{}, err + } + if response.Err != nil { + return gomatrixserverlib.Transaction{}, response.Err + } + return *response.Res, nil +} diff --git a/federationsender/inthttp/server.go b/federationsender/inthttp/server.go index b1825576..dfbff1c0 100644 --- a/federationsender/inthttp/server.go +++ b/federationsender/inthttp/server.go @@ -175,4 +175,92 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route return util.JSONResponse{Code: http.StatusOK, JSON: request} }), ) + internalAPIMux.Handle( + FederationSenderBackfillPath, + httputil.MakeInternalAPI("Backfill", func(req *http.Request) util.JSONResponse { + var request backfill + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.Backfill(req.Context(), request.S, request.RoomID, request.Limit, request.EventIDs) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) + internalAPIMux.Handle( + FederationSenderLookupStatePath, + httputil.MakeInternalAPI("LookupState", func(req *http.Request) util.JSONResponse { + var request lookupState + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.LookupState(req.Context(), request.S, request.RoomID, request.EventID, request.RoomVersion) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) + internalAPIMux.Handle( + FederationSenderLookupStateIDsPath, + httputil.MakeInternalAPI("LookupStateIDs", func(req *http.Request) util.JSONResponse { + var request lookupStateIDs + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.LookupStateIDs(req.Context(), request.S, request.RoomID, request.EventID) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) + internalAPIMux.Handle( + FederationSenderGetEventPath, + httputil.MakeInternalAPI("GetEvent", func(req *http.Request) util.JSONResponse { + var request getEvent + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + res, err := intAPI.GetEvent(req.Context(), request.S, request.EventID) + if err != nil { + ferr, ok := err.(*api.FederationClientError) + if ok { + request.Err = ferr + } else { + request.Err = &api.FederationClientError{ + Err: err.Error(), + } + } + } + request.Res = &res + return util.JSONResponse{Code: http.StatusOK, JSON: request} + }), + ) } @@ -21,7 +21,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd - github.com/matrix-org/gomatrixserverlib v0.0.0-20200817100842-9d02141812f2 + github.com/matrix-org/gomatrixserverlib v0.0.0-20200902135805-f7a5b5e89750 github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/mattn/go-sqlite3 v1.14.2 @@ -567,8 +567,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg= github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200817100842-9d02141812f2 h1:9wKwfd5KDcXuqZ7/kAaYe0QM4DGM+2awjjvXQtrDa6k= -github.com/matrix-org/gomatrixserverlib v0.0.0-20200817100842-9d02141812f2/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200902135805-f7a5b5e89750 h1:k5vsLfpylXHOXgN51N0QNbak9i+4bT33Puk/ZJgcdDw= +github.com/matrix-org/gomatrixserverlib v0.0.0-20200902135805-f7a5b5e89750/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU= github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4= github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo= diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 1897f7a5..8ac1bdda 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -22,7 +22,7 @@ type RoomserverInternalAPI struct { Cache caching.RoomServerCaches ServerName gomatrixserverlib.ServerName KeyRing gomatrixserverlib.JSONVerifier - FedClient *gomatrixserverlib.FederationClient + fsAPI fsAPI.FederationSenderInternalAPI OutputRoomEventTopic string // Kafka topic for new output room events Inviter *perform.Inviter Joiner *perform.Joiner @@ -30,12 +30,11 @@ type RoomserverInternalAPI struct { Publisher *perform.Publisher Backfiller *perform.Backfiller mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent - fsAPI fsAPI.FederationSenderInternalAPI } func NewRoomserverAPI( cfg *config.RoomServer, roomserverDB storage.Database, producer sarama.SyncProducer, - outputRoomEventTopic string, caches caching.RoomServerCaches, fedClient *gomatrixserverlib.FederationClient, + outputRoomEventTopic string, caches caching.RoomServerCaches, keyRing gomatrixserverlib.JSONVerifier, ) *RoomserverInternalAPI { a := &RoomserverInternalAPI{ @@ -45,7 +44,6 @@ func NewRoomserverAPI( Cache: caches, ServerName: cfg.Matrix.ServerName, KeyRing: keyRing, - FedClient: fedClient, OutputRoomEventTopic: outputRoomEventTopic, // perform-er structs get initialised when we have a federation sender to use } @@ -83,7 +81,7 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen r.Backfiller = &perform.Backfiller{ ServerName: r.ServerName, DB: r.DB, - FedClient: r.FedClient, + FSAPI: r.fsAPI, KeyRing: r.KeyRing, } } diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index ebb66ef4..d345e9c7 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/auth" @@ -18,7 +19,7 @@ import ( type Backfiller struct { ServerName gomatrixserverlib.ServerName DB storage.Database - FedClient *gomatrixserverlib.FederationClient + FSAPI federationSenderAPI.FederationSenderInternalAPI KeyRing gomatrixserverlib.JSONVerifier } @@ -81,7 +82,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform if info == nil || info.IsStub { return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID) } - requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName, req.BackwardsExtremities) + requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities) // Request 100 items regardless of what the query asks for. // We don't want to go much higher than this. // We can't honour exactly the limit as some sytests rely on requesting more for tests to pass @@ -166,7 +167,7 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom continue // already found } logger := util.GetLogger(ctx).WithField("server", srv).WithField("event_id", id) - res, err := r.FedClient.GetEvent(ctx, srv, id) + res, err := r.FSAPI.GetEvent(ctx, srv, id) if err != nil { logger.WithError(err).Warn("failed to get event from server") continue @@ -201,7 +202,7 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom // backfillRequester implements gomatrixserverlib.BackfillRequester type backfillRequester struct { db storage.Database - fedClient *gomatrixserverlib.FederationClient + fsAPI federationSenderAPI.FederationSenderInternalAPI thisServer gomatrixserverlib.ServerName bwExtrems map[string][]string @@ -211,10 +212,10 @@ type backfillRequester struct { eventIDMap map[string]gomatrixserverlib.Event } -func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester { +func newBackfillRequester(db storage.Database, fsAPI federationSenderAPI.FederationSenderInternalAPI, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester { return &backfillRequester{ db: db, - fedClient: fedClient, + fsAPI: fsAPI, thisServer: thisServer, eventIDToBeforeStateIDs: make(map[string][]string), eventIDMap: make(map[string]gomatrixserverlib.Event), @@ -258,7 +259,7 @@ FederationHit: logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event") for _, srv := range b.servers { // hit any valid server c := gomatrixserverlib.FederatedStateProvider{ - FedClient: b.fedClient, + FedClient: b.fsAPI, RememberAuthEvents: false, Server: srv, } @@ -331,7 +332,7 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr } c := gomatrixserverlib.FederatedStateProvider{ - FedClient: b.fedClient, + FedClient: b.fsAPI, RememberAuthEvents: false, Server: b.servers[0], } @@ -430,10 +431,10 @@ FindSuccessor: // Backfill performs a backfill request to the given server. // https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string, - fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) { + limit int, fromEventIDs []string) (gomatrixserverlib.Transaction, error) { - tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs) - return &tx, err + tx, err := b.fsAPI.Backfill(ctx, server, roomID, limit, fromEventIDs) + return tx, err } func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) { diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index a428ad57..2eabf450 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -38,7 +38,6 @@ func AddInternalRoutes(router *mux.Router, intAPI api.RoomserverInternalAPI) { func NewInternalAPI( base *setup.BaseDendrite, keyRing gomatrixserverlib.JSONVerifier, - fedClient *gomatrixserverlib.FederationClient, ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer @@ -49,6 +48,6 @@ func NewInternalAPI( return internal.NewRoomserverAPI( cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), - base.Caches, fedClient, keyRing, + base.Caches, keyRing, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index bcd9afb3..0deb7acb 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -112,7 +112,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js Cfg: cfg, } - rsAPI := NewInternalAPI(base, &test.NopJSONVerifier{}, nil) + rsAPI := NewInternalAPI(base, &test.NopJSONVerifier{}) hevents := mustLoadEvents(t, ver, events) _, err = api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil) if err != nil { |