aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-09-02 15:26:30 +0100
committerGitHub <noreply@github.com>2020-09-02 15:26:30 +0100
commit096191ca240776031370e99b93732557972ba92a (patch)
tree79f4247556a3dbd01fc8dca2fac68f752800a6d7
parente473320e733484b1cc6da0588fd2ccf4affb3d24 (diff)
Use federation sender for backfill/getting missing events (#1379)
* Use federation sender for backfill and getting missing events * Fix internal URL paths * Update go.mod/go.sum for matrix-org/gomatrixserverlib#218 * Add missing server implementations in HTTP interface
-rw-r--r--build/gobind/monolith.go2
-rw-r--r--cmd/dendrite-demo-libp2p/main.go2
-rw-r--r--cmd/dendrite-demo-yggdrasil/main.go2
-rw-r--r--cmd/dendrite-monolith-server/main.go2
-rw-r--r--cmd/dendrite-room-server/main.go3
-rw-r--r--cmd/dendritejs/main.go2
-rw-r--r--federationsender/api/api.go3
-rw-r--r--federationsender/internal/api.go48
-rw-r--r--federationsender/inthttp/client.go130
-rw-r--r--federationsender/inthttp/server.go88
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rw-r--r--roomserver/internal/api.go8
-rw-r--r--roomserver/internal/perform/perform_backfill.go23
-rw-r--r--roomserver/roomserver.go3
-rw-r--r--roomserver/roomserver_test.go2
16 files changed, 295 insertions, 29 deletions
diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go
index 59535c7b..725c9c07 100644
--- a/build/gobind/monolith.go
+++ b/build/gobind/monolith.go
@@ -120,7 +120,7 @@ func (m *DendriteMonolith) Start() {
keyAPI.SetUserAPI(userAPI)
rsAPI := roomserver.NewInternalAPI(
- base, keyRing, federation,
+ base, keyRing,
)
eduInputAPI := eduserver.NewInternalAPI(
diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go
index e2d23e89..d4f0cee0 100644
--- a/cmd/dendrite-demo-libp2p/main.go
+++ b/cmd/dendrite-demo-libp2p/main.go
@@ -155,7 +155,7 @@ func main() {
stateAPI := currentstateserver.NewInternalAPI(&base.Base.Cfg.CurrentStateServer, base.Base.KafkaConsumer)
rsAPI := roomserver.NewInternalAPI(
- &base.Base, keyRing, federation,
+ &base.Base, keyRing,
)
eduInputAPI := eduserver.NewInternalAPI(
&base.Base, cache.New(), userAPI,
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go
index 26999ebe..fcf3d4c5 100644
--- a/cmd/dendrite-demo-yggdrasil/main.go
+++ b/cmd/dendrite-demo-yggdrasil/main.go
@@ -104,7 +104,7 @@ func main() {
keyAPI.SetUserAPI(userAPI)
rsComponent := roomserver.NewInternalAPI(
- base, keyRing, federation,
+ base, keyRing,
)
rsAPI := rsComponent
diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go
index 81511746..717b21a9 100644
--- a/cmd/dendrite-monolith-server/main.go
+++ b/cmd/dendrite-monolith-server/main.go
@@ -81,7 +81,7 @@ func main() {
keyRing := serverKeyAPI.KeyRing()
rsImpl := roomserver.NewInternalAPI(
- base, keyRing, federation,
+ base, keyRing,
)
// call functions directly on the impl unless running in HTTP mode
rsAPI := rsImpl
diff --git a/cmd/dendrite-room-server/main.go b/cmd/dendrite-room-server/main.go
index 0d587e6e..08ad34bf 100644
--- a/cmd/dendrite-room-server/main.go
+++ b/cmd/dendrite-room-server/main.go
@@ -23,13 +23,12 @@ func main() {
cfg := setup.ParseFlags(false)
base := setup.NewBaseDendrite(cfg, "RoomServerAPI", true)
defer base.Close() // nolint: errcheck
- federation := base.CreateFederationClient()
serverKeyAPI := base.ServerKeyAPIClient()
keyRing := serverKeyAPI.KeyRing()
fsAPI := base.FederationSenderHTTPClient()
- rsAPI := roomserver.NewInternalAPI(base, keyRing, federation)
+ rsAPI := roomserver.NewInternalAPI(base, keyRing)
rsAPI.SetFederationSenderAPI(fsAPI)
roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI)
diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go
index c95eb3fc..aeca7094 100644
--- a/cmd/dendritejs/main.go
+++ b/cmd/dendritejs/main.go
@@ -205,7 +205,7 @@ func main() {
}
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
- rsAPI := roomserver.NewInternalAPI(base, keyRing, federation)
+ rsAPI := roomserver.NewInternalAPI(base, keyRing)
eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
asQuery := appservice.NewInternalAPI(
base, userAPI, rsAPI,
diff --git a/federationsender/api/api.go b/federationsender/api/api.go
index cea0010d..655d1d10 100644
--- a/federationsender/api/api.go
+++ b/federationsender/api/api.go
@@ -14,9 +14,12 @@ import (
// implements as proxy calls, with built-in backoff/retries/etc. Errors returned from functions in
// this interface are of type FederationClientError
type FederationClient interface {
+ gomatrixserverlib.BackfillClient
+ gomatrixserverlib.FederatedStateClient
GetUserDevices(ctx context.Context, s gomatrixserverlib.ServerName, userID string) (res gomatrixserverlib.RespUserDevices, err error)
ClaimKeys(ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string) (res gomatrixserverlib.RespClaimKeys, err error)
QueryKeys(ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string) (res gomatrixserverlib.RespQueryKeys, err error)
+ GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error)
}
// FederationClientError is returned from FederationClient methods in the event of a problem.
diff --git a/federationsender/internal/api.go b/federationsender/internal/api.go
index 6b5f4c34..61663be3 100644
--- a/federationsender/internal/api.go
+++ b/federationsender/internal/api.go
@@ -136,3 +136,51 @@ func (a *FederationSenderInternalAPI) QueryKeys(
}
return ires.(gomatrixserverlib.RespQueryKeys), nil
}
+
+func (a *FederationSenderInternalAPI) Backfill(
+ ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string,
+) (res gomatrixserverlib.Transaction, err error) {
+ ires, err := a.doRequest(s, func() (interface{}, error) {
+ return a.federation.Backfill(ctx, s, roomID, limit, eventIDs)
+ })
+ if err != nil {
+ return gomatrixserverlib.Transaction{}, err
+ }
+ return ires.(gomatrixserverlib.Transaction), nil
+}
+
+func (a *FederationSenderInternalAPI) LookupState(
+ ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
+) (res gomatrixserverlib.RespState, err error) {
+ ires, err := a.doRequest(s, func() (interface{}, error) {
+ return a.federation.LookupState(ctx, s, roomID, eventID, roomVersion)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespState{}, err
+ }
+ return ires.(gomatrixserverlib.RespState), nil
+}
+
+func (a *FederationSenderInternalAPI) LookupStateIDs(
+ ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string,
+) (res gomatrixserverlib.RespStateIDs, err error) {
+ ires, err := a.doRequest(s, func() (interface{}, error) {
+ return a.federation.LookupStateIDs(ctx, s, roomID, eventID)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespStateIDs{}, err
+ }
+ return ires.(gomatrixserverlib.RespStateIDs), nil
+}
+
+func (a *FederationSenderInternalAPI) GetEvent(
+ ctx context.Context, s gomatrixserverlib.ServerName, eventID string,
+) (res gomatrixserverlib.Transaction, err error) {
+ ires, err := a.doRequest(s, func() (interface{}, error) {
+ return a.federation.GetEvent(ctx, s, eventID)
+ })
+ if err != nil {
+ return gomatrixserverlib.Transaction{}, err
+ }
+ return ires.(gomatrixserverlib.Transaction), nil
+}
diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go
index 79e220c3..5bfe6089 100644
--- a/federationsender/inthttp/client.go
+++ b/federationsender/inthttp/client.go
@@ -26,6 +26,10 @@ const (
FederationSenderGetUserDevicesPath = "/federationsender/client/getUserDevices"
FederationSenderClaimKeysPath = "/federationsender/client/claimKeys"
FederationSenderQueryKeysPath = "/federationsender/client/queryKeys"
+ FederationSenderBackfillPath = "/federationsender/client/backfill"
+ FederationSenderLookupStatePath = "/federationsender/client/lookupState"
+ FederationSenderLookupStateIDsPath = "/federationsender/client/lookupStateIDs"
+ FederationSenderGetEventPath = "/federationsender/client/getEvent"
)
// NewFederationSenderClient creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API.
@@ -228,3 +232,129 @@ func (h *httpFederationSenderInternalAPI) QueryKeys(
}
return *response.Res, nil
}
+
+type backfill struct {
+ S gomatrixserverlib.ServerName
+ RoomID string
+ Limit int
+ EventIDs []string
+ Res *gomatrixserverlib.Transaction
+ Err *api.FederationClientError
+}
+
+func (h *httpFederationSenderInternalAPI) Backfill(
+ ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string,
+) (gomatrixserverlib.Transaction, error) {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "Backfill")
+ defer span.Finish()
+
+ request := backfill{
+ S: s,
+ RoomID: roomID,
+ Limit: limit,
+ EventIDs: eventIDs,
+ }
+ var response backfill
+ apiURL := h.federationSenderURL + FederationSenderBackfillPath
+ err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
+ if err != nil {
+ return gomatrixserverlib.Transaction{}, err
+ }
+ if response.Err != nil {
+ return gomatrixserverlib.Transaction{}, response.Err
+ }
+ return *response.Res, nil
+}
+
+type lookupState struct {
+ S gomatrixserverlib.ServerName
+ RoomID string
+ EventID string
+ RoomVersion gomatrixserverlib.RoomVersion
+ Res *gomatrixserverlib.RespState
+ Err *api.FederationClientError
+}
+
+func (h *httpFederationSenderInternalAPI) LookupState(
+ ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
+) (gomatrixserverlib.RespState, error) {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "LookupState")
+ defer span.Finish()
+
+ request := lookupState{
+ S: s,
+ RoomID: roomID,
+ EventID: eventID,
+ RoomVersion: roomVersion,
+ }
+ var response lookupState
+ apiURL := h.federationSenderURL + FederationSenderLookupStatePath
+ err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
+ if err != nil {
+ return gomatrixserverlib.RespState{}, err
+ }
+ if response.Err != nil {
+ return gomatrixserverlib.RespState{}, response.Err
+ }
+ return *response.Res, nil
+}
+
+type lookupStateIDs struct {
+ S gomatrixserverlib.ServerName
+ RoomID string
+ EventID string
+ Res *gomatrixserverlib.RespStateIDs
+ Err *api.FederationClientError
+}
+
+func (h *httpFederationSenderInternalAPI) LookupStateIDs(
+ ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string,
+) (gomatrixserverlib.RespStateIDs, error) {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "LookupStateIDs")
+ defer span.Finish()
+
+ request := lookupStateIDs{
+ S: s,
+ RoomID: roomID,
+ EventID: eventID,
+ }
+ var response lookupStateIDs
+ apiURL := h.federationSenderURL + FederationSenderLookupStateIDsPath
+ err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
+ if err != nil {
+ return gomatrixserverlib.RespStateIDs{}, err
+ }
+ if response.Err != nil {
+ return gomatrixserverlib.RespStateIDs{}, response.Err
+ }
+ return *response.Res, nil
+}
+
+type getEvent struct {
+ S gomatrixserverlib.ServerName
+ EventID string
+ Res *gomatrixserverlib.Transaction
+ Err *api.FederationClientError
+}
+
+func (h *httpFederationSenderInternalAPI) GetEvent(
+ ctx context.Context, s gomatrixserverlib.ServerName, eventID string,
+) (gomatrixserverlib.Transaction, error) {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "GetEvent")
+ defer span.Finish()
+
+ request := getEvent{
+ S: s,
+ EventID: eventID,
+ }
+ var response getEvent
+ apiURL := h.federationSenderURL + FederationSenderGetEventPath
+ err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response)
+ if err != nil {
+ return gomatrixserverlib.Transaction{}, err
+ }
+ if response.Err != nil {
+ return gomatrixserverlib.Transaction{}, response.Err
+ }
+ return *response.Res, nil
+}
diff --git a/federationsender/inthttp/server.go b/federationsender/inthttp/server.go
index b1825576..dfbff1c0 100644
--- a/federationsender/inthttp/server.go
+++ b/federationsender/inthttp/server.go
@@ -175,4 +175,92 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route
return util.JSONResponse{Code: http.StatusOK, JSON: request}
}),
)
+ internalAPIMux.Handle(
+ FederationSenderBackfillPath,
+ httputil.MakeInternalAPI("Backfill", func(req *http.Request) util.JSONResponse {
+ var request backfill
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ res, err := intAPI.Backfill(req.Context(), request.S, request.RoomID, request.Limit, request.EventIDs)
+ if err != nil {
+ ferr, ok := err.(*api.FederationClientError)
+ if ok {
+ request.Err = ferr
+ } else {
+ request.Err = &api.FederationClientError{
+ Err: err.Error(),
+ }
+ }
+ }
+ request.Res = &res
+ return util.JSONResponse{Code: http.StatusOK, JSON: request}
+ }),
+ )
+ internalAPIMux.Handle(
+ FederationSenderLookupStatePath,
+ httputil.MakeInternalAPI("LookupState", func(req *http.Request) util.JSONResponse {
+ var request lookupState
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ res, err := intAPI.LookupState(req.Context(), request.S, request.RoomID, request.EventID, request.RoomVersion)
+ if err != nil {
+ ferr, ok := err.(*api.FederationClientError)
+ if ok {
+ request.Err = ferr
+ } else {
+ request.Err = &api.FederationClientError{
+ Err: err.Error(),
+ }
+ }
+ }
+ request.Res = &res
+ return util.JSONResponse{Code: http.StatusOK, JSON: request}
+ }),
+ )
+ internalAPIMux.Handle(
+ FederationSenderLookupStateIDsPath,
+ httputil.MakeInternalAPI("LookupStateIDs", func(req *http.Request) util.JSONResponse {
+ var request lookupStateIDs
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ res, err := intAPI.LookupStateIDs(req.Context(), request.S, request.RoomID, request.EventID)
+ if err != nil {
+ ferr, ok := err.(*api.FederationClientError)
+ if ok {
+ request.Err = ferr
+ } else {
+ request.Err = &api.FederationClientError{
+ Err: err.Error(),
+ }
+ }
+ }
+ request.Res = &res
+ return util.JSONResponse{Code: http.StatusOK, JSON: request}
+ }),
+ )
+ internalAPIMux.Handle(
+ FederationSenderGetEventPath,
+ httputil.MakeInternalAPI("GetEvent", func(req *http.Request) util.JSONResponse {
+ var request getEvent
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ res, err := intAPI.GetEvent(req.Context(), request.S, request.EventID)
+ if err != nil {
+ ferr, ok := err.(*api.FederationClientError)
+ if ok {
+ request.Err = ferr
+ } else {
+ request.Err = &api.FederationClientError{
+ Err: err.Error(),
+ }
+ }
+ }
+ request.Res = &res
+ return util.JSONResponse{Code: http.StatusOK, JSON: request}
+ }),
+ )
}
diff --git a/go.mod b/go.mod
index c6906805..3a9fef9f 100644
--- a/go.mod
+++ b/go.mod
@@ -21,7 +21,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
- github.com/matrix-org/gomatrixserverlib v0.0.0-20200817100842-9d02141812f2
+ github.com/matrix-org/gomatrixserverlib v0.0.0-20200902135805-f7a5b5e89750
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.2
diff --git a/go.sum b/go.sum
index 332ae05f..33b4f591 100644
--- a/go.sum
+++ b/go.sum
@@ -567,8 +567,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
-github.com/matrix-org/gomatrixserverlib v0.0.0-20200817100842-9d02141812f2 h1:9wKwfd5KDcXuqZ7/kAaYe0QM4DGM+2awjjvXQtrDa6k=
-github.com/matrix-org/gomatrixserverlib v0.0.0-20200817100842-9d02141812f2/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20200902135805-f7a5b5e89750 h1:k5vsLfpylXHOXgN51N0QNbak9i+4bT33Puk/ZJgcdDw=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20200902135805-f7a5b5e89750/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4=
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index 1897f7a5..8ac1bdda 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -22,7 +22,7 @@ type RoomserverInternalAPI struct {
Cache caching.RoomServerCaches
ServerName gomatrixserverlib.ServerName
KeyRing gomatrixserverlib.JSONVerifier
- FedClient *gomatrixserverlib.FederationClient
+ fsAPI fsAPI.FederationSenderInternalAPI
OutputRoomEventTopic string // Kafka topic for new output room events
Inviter *perform.Inviter
Joiner *perform.Joiner
@@ -30,12 +30,11 @@ type RoomserverInternalAPI struct {
Publisher *perform.Publisher
Backfiller *perform.Backfiller
mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent
- fsAPI fsAPI.FederationSenderInternalAPI
}
func NewRoomserverAPI(
cfg *config.RoomServer, roomserverDB storage.Database, producer sarama.SyncProducer,
- outputRoomEventTopic string, caches caching.RoomServerCaches, fedClient *gomatrixserverlib.FederationClient,
+ outputRoomEventTopic string, caches caching.RoomServerCaches,
keyRing gomatrixserverlib.JSONVerifier,
) *RoomserverInternalAPI {
a := &RoomserverInternalAPI{
@@ -45,7 +44,6 @@ func NewRoomserverAPI(
Cache: caches,
ServerName: cfg.Matrix.ServerName,
KeyRing: keyRing,
- FedClient: fedClient,
OutputRoomEventTopic: outputRoomEventTopic,
// perform-er structs get initialised when we have a federation sender to use
}
@@ -83,7 +81,7 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
r.Backfiller = &perform.Backfiller{
ServerName: r.ServerName,
DB: r.DB,
- FedClient: r.FedClient,
+ FSAPI: r.fsAPI,
KeyRing: r.KeyRing,
}
}
diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go
index ebb66ef4..d345e9c7 100644
--- a/roomserver/internal/perform/perform_backfill.go
+++ b/roomserver/internal/perform/perform_backfill.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
+ federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/auth"
@@ -18,7 +19,7 @@ import (
type Backfiller struct {
ServerName gomatrixserverlib.ServerName
DB storage.Database
- FedClient *gomatrixserverlib.FederationClient
+ FSAPI federationSenderAPI.FederationSenderInternalAPI
KeyRing gomatrixserverlib.JSONVerifier
}
@@ -81,7 +82,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
if info == nil || info.IsStub {
return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID)
}
- requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName, req.BackwardsExtremities)
+ requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities)
// Request 100 items regardless of what the query asks for.
// We don't want to go much higher than this.
// We can't honour exactly the limit as some sytests rely on requesting more for tests to pass
@@ -166,7 +167,7 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom
continue // already found
}
logger := util.GetLogger(ctx).WithField("server", srv).WithField("event_id", id)
- res, err := r.FedClient.GetEvent(ctx, srv, id)
+ res, err := r.FSAPI.GetEvent(ctx, srv, id)
if err != nil {
logger.WithError(err).Warn("failed to get event from server")
continue
@@ -201,7 +202,7 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom
// backfillRequester implements gomatrixserverlib.BackfillRequester
type backfillRequester struct {
db storage.Database
- fedClient *gomatrixserverlib.FederationClient
+ fsAPI federationSenderAPI.FederationSenderInternalAPI
thisServer gomatrixserverlib.ServerName
bwExtrems map[string][]string
@@ -211,10 +212,10 @@ type backfillRequester struct {
eventIDMap map[string]gomatrixserverlib.Event
}
-func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester {
+func newBackfillRequester(db storage.Database, fsAPI federationSenderAPI.FederationSenderInternalAPI, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester {
return &backfillRequester{
db: db,
- fedClient: fedClient,
+ fsAPI: fsAPI,
thisServer: thisServer,
eventIDToBeforeStateIDs: make(map[string][]string),
eventIDMap: make(map[string]gomatrixserverlib.Event),
@@ -258,7 +259,7 @@ FederationHit:
logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event")
for _, srv := range b.servers { // hit any valid server
c := gomatrixserverlib.FederatedStateProvider{
- FedClient: b.fedClient,
+ FedClient: b.fsAPI,
RememberAuthEvents: false,
Server: srv,
}
@@ -331,7 +332,7 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr
}
c := gomatrixserverlib.FederatedStateProvider{
- FedClient: b.fedClient,
+ FedClient: b.fsAPI,
RememberAuthEvents: false,
Server: b.servers[0],
}
@@ -430,10 +431,10 @@ FindSuccessor:
// Backfill performs a backfill request to the given server.
// https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string,
- fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) {
+ limit int, fromEventIDs []string) (gomatrixserverlib.Transaction, error) {
- tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs)
- return &tx, err
+ tx, err := b.fsAPI.Backfill(ctx, server, roomID, limit, fromEventIDs)
+ return tx, err
}
func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) {
diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go
index a428ad57..2eabf450 100644
--- a/roomserver/roomserver.go
+++ b/roomserver/roomserver.go
@@ -38,7 +38,6 @@ func AddInternalRoutes(router *mux.Router, intAPI api.RoomserverInternalAPI) {
func NewInternalAPI(
base *setup.BaseDendrite,
keyRing gomatrixserverlib.JSONVerifier,
- fedClient *gomatrixserverlib.FederationClient,
) api.RoomserverInternalAPI {
cfg := &base.Cfg.RoomServer
@@ -49,6 +48,6 @@ func NewInternalAPI(
return internal.NewRoomserverAPI(
cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
- base.Caches, fedClient, keyRing,
+ base.Caches, keyRing,
)
}
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
index bcd9afb3..0deb7acb 100644
--- a/roomserver/roomserver_test.go
+++ b/roomserver/roomserver_test.go
@@ -112,7 +112,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js
Cfg: cfg,
}
- rsAPI := NewInternalAPI(base, &test.NopJSONVerifier{}, nil)
+ rsAPI := NewInternalAPI(base, &test.NopJSONVerifier{})
hevents := mustLoadEvents(t, ver, events)
_, err = api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil)
if err != nil {