diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-09-02 15:26:30 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-02 15:26:30 +0100 |
commit | 096191ca240776031370e99b93732557972ba92a (patch) | |
tree | 79f4247556a3dbd01fc8dca2fac68f752800a6d7 /roomserver | |
parent | e473320e733484b1cc6da0588fd2ccf4affb3d24 (diff) |
Use federation sender for backfill/getting missing events (#1379)
* Use federation sender for backfill and getting missing events
* Fix internal URL paths
* Update go.mod/go.sum for matrix-org/gomatrixserverlib#218
* Add missing server implementations in HTTP interface
Diffstat (limited to 'roomserver')
-rw-r--r-- | roomserver/internal/api.go | 8 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_backfill.go | 23 | ||||
-rw-r--r-- | roomserver/roomserver.go | 3 | ||||
-rw-r--r-- | roomserver/roomserver_test.go | 2 |
4 files changed, 17 insertions, 19 deletions
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 1897f7a5..8ac1bdda 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -22,7 +22,7 @@ type RoomserverInternalAPI struct { Cache caching.RoomServerCaches ServerName gomatrixserverlib.ServerName KeyRing gomatrixserverlib.JSONVerifier - FedClient *gomatrixserverlib.FederationClient + fsAPI fsAPI.FederationSenderInternalAPI OutputRoomEventTopic string // Kafka topic for new output room events Inviter *perform.Inviter Joiner *perform.Joiner @@ -30,12 +30,11 @@ type RoomserverInternalAPI struct { Publisher *perform.Publisher Backfiller *perform.Backfiller mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent - fsAPI fsAPI.FederationSenderInternalAPI } func NewRoomserverAPI( cfg *config.RoomServer, roomserverDB storage.Database, producer sarama.SyncProducer, - outputRoomEventTopic string, caches caching.RoomServerCaches, fedClient *gomatrixserverlib.FederationClient, + outputRoomEventTopic string, caches caching.RoomServerCaches, keyRing gomatrixserverlib.JSONVerifier, ) *RoomserverInternalAPI { a := &RoomserverInternalAPI{ @@ -45,7 +44,6 @@ func NewRoomserverAPI( Cache: caches, ServerName: cfg.Matrix.ServerName, KeyRing: keyRing, - FedClient: fedClient, OutputRoomEventTopic: outputRoomEventTopic, // perform-er structs get initialised when we have a federation sender to use } @@ -83,7 +81,7 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen r.Backfiller = &perform.Backfiller{ ServerName: r.ServerName, DB: r.DB, - FedClient: r.FedClient, + FSAPI: r.fsAPI, KeyRing: r.KeyRing, } } diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index ebb66ef4..d345e9c7 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/auth" @@ -18,7 +19,7 @@ import ( type Backfiller struct { ServerName gomatrixserverlib.ServerName DB storage.Database - FedClient *gomatrixserverlib.FederationClient + FSAPI federationSenderAPI.FederationSenderInternalAPI KeyRing gomatrixserverlib.JSONVerifier } @@ -81,7 +82,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform if info == nil || info.IsStub { return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID) } - requester := newBackfillRequester(r.DB, r.FedClient, r.ServerName, req.BackwardsExtremities) + requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities) // Request 100 items regardless of what the query asks for. // We don't want to go much higher than this. // We can't honour exactly the limit as some sytests rely on requesting more for tests to pass @@ -166,7 +167,7 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom continue // already found } logger := util.GetLogger(ctx).WithField("server", srv).WithField("event_id", id) - res, err := r.FedClient.GetEvent(ctx, srv, id) + res, err := r.FSAPI.GetEvent(ctx, srv, id) if err != nil { logger.WithError(err).Warn("failed to get event from server") continue @@ -201,7 +202,7 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom // backfillRequester implements gomatrixserverlib.BackfillRequester type backfillRequester struct { db storage.Database - fedClient *gomatrixserverlib.FederationClient + fsAPI federationSenderAPI.FederationSenderInternalAPI thisServer gomatrixserverlib.ServerName bwExtrems map[string][]string @@ -211,10 +212,10 @@ type backfillRequester struct { eventIDMap map[string]gomatrixserverlib.Event } -func newBackfillRequester(db storage.Database, fedClient *gomatrixserverlib.FederationClient, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester { +func newBackfillRequester(db storage.Database, fsAPI federationSenderAPI.FederationSenderInternalAPI, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string) *backfillRequester { return &backfillRequester{ db: db, - fedClient: fedClient, + fsAPI: fsAPI, thisServer: thisServer, eventIDToBeforeStateIDs: make(map[string][]string), eventIDMap: make(map[string]gomatrixserverlib.Event), @@ -258,7 +259,7 @@ FederationHit: logrus.WithField("event_id", targetEvent.EventID()).Info("Requesting /state_ids at event") for _, srv := range b.servers { // hit any valid server c := gomatrixserverlib.FederatedStateProvider{ - FedClient: b.fedClient, + FedClient: b.fsAPI, RememberAuthEvents: false, Server: srv, } @@ -331,7 +332,7 @@ func (b *backfillRequester) StateBeforeEvent(ctx context.Context, roomVer gomatr } c := gomatrixserverlib.FederatedStateProvider{ - FedClient: b.fedClient, + FedClient: b.fsAPI, RememberAuthEvents: false, Server: b.servers[0], } @@ -430,10 +431,10 @@ FindSuccessor: // Backfill performs a backfill request to the given server. // https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string, - fromEventIDs []string, limit int) (*gomatrixserverlib.Transaction, error) { + limit int, fromEventIDs []string) (gomatrixserverlib.Transaction, error) { - tx, err := b.fedClient.Backfill(ctx, server, roomID, limit, fromEventIDs) - return &tx, err + tx, err := b.fsAPI.Backfill(ctx, server, roomID, limit, fromEventIDs) + return tx, err } func (b *backfillRequester) ProvideEvents(roomVer gomatrixserverlib.RoomVersion, eventIDs []string) ([]gomatrixserverlib.Event, error) { diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index a428ad57..2eabf450 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -38,7 +38,6 @@ func AddInternalRoutes(router *mux.Router, intAPI api.RoomserverInternalAPI) { func NewInternalAPI( base *setup.BaseDendrite, keyRing gomatrixserverlib.JSONVerifier, - fedClient *gomatrixserverlib.FederationClient, ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer @@ -49,6 +48,6 @@ func NewInternalAPI( return internal.NewRoomserverAPI( cfg, roomserverDB, base.KafkaProducer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), - base.Caches, fedClient, keyRing, + base.Caches, keyRing, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index bcd9afb3..0deb7acb 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -112,7 +112,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js Cfg: cfg, } - rsAPI := NewInternalAPI(base, &test.NopJSONVerifier{}, nil) + rsAPI := NewInternalAPI(base, &test.NopJSONVerifier{}) hevents := mustLoadEvents(t, ver, events) _, err = api.SendEvents(ctx, rsAPI, hevents, testOrigin, nil) if err != nil { |