diff options
136 files changed, 1213 insertions, 1788 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go index 782e090a..fd903b98 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -30,7 +30,7 @@ import ( "github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/appservice/workers" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/kafka" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -45,7 +45,7 @@ func AddInternalRoutes(router *mux.Router, queryAPI appserviceAPI.AppServiceQuer // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - base *setup.BaseDendrite, + base *base.BaseDendrite, userAPI userapi.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, ) appserviceAPI.AppServiceQueryAPI { diff --git a/build/docker/config/dendrite-config.yaml b/build/docker/config/dendrite-config.yaml index bc5d6699..d6357747 100644 --- a/build/docker/config/dendrite-config.yaml +++ b/build/docker/config/dendrite-config.yaml @@ -174,6 +174,11 @@ federation_api: connect: http://federation_api:7772 external_api: listen: http://0.0.0.0:8072 + database: + connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_federationapi?sslmode=disable + max_open_conns: 10 + max_idle_conns: 2 + conn_max_lifetime: -1 # List of paths to X.509 certificates to be used by the external federation listeners. # These certificates will be used to calculate the TLS fingerprints and other servers @@ -181,17 +186,6 @@ federation_api: # format. federation_certificates: [] -# Configuration for the Federation Sender. -federation_sender: - internal_api: - listen: http://0.0.0.0:7775 - connect: http://federation_sender:7775 - database: - connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_federationsender?sslmode=disable - max_open_conns: 10 - max_idle_conns: 2 - conn_max_lifetime: -1 - # How many times we will try to resend a failed transaction to a specific server. The # backoff is 2**x seconds, so 1 = 2 seconds, 2 = 4 seconds, 3 = 8 seconds etc. send_max_retries: 16 @@ -207,6 +201,22 @@ federation_sender: host: localhost port: 8080 + # Perspective keyservers to use as a backup when direct key fetches fail. This may + # be required to satisfy key requests for servers that are no longer online when + # joining some rooms. + key_perspectives: + - server_name: matrix.org + keys: + - key_id: ed25519:auto + public_key: Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw + - key_id: ed25519:a_RXGa + public_key: l8Hft5qXKn1vfHrg3p4+W8gELQVo8N13JkluMfmn2sQ + + # This option will control whether Dendrite will prefer to look up keys directly + # or whether it should try perspective servers first, using direct fetches as a + # last resort. + prefer_direct_fetch: false + # Configuration for the Key Server (for end-to-end encryption). key_server: internal_api: @@ -267,33 +277,6 @@ room_server: max_idle_conns: 2 conn_max_lifetime: -1 -# Configuration for the Server Key API (for server signing keys). -signing_key_server: - internal_api: - listen: http://0.0.0.0:7780 - connect: http://signing_key_server:7780 - database: - connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_signingkeyserver?sslmode=disable - max_open_conns: 10 - max_idle_conns: 2 - conn_max_lifetime: -1 - - # Perspective keyservers to use as a backup when direct key fetches fail. This may - # be required to satisfy key requests for servers that are no longer online when - # joining some rooms. - key_perspectives: - - server_name: matrix.org - keys: - - key_id: ed25519:auto - public_key: Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw - - key_id: ed25519:a_RXGa - public_key: l8Hft5qXKn1vfHrg3p4+W8gELQVo8N13JkluMfmn2sQ - - # This option will control whether Dendrite will prefer to look up keys directly - # or whether it should try perspective servers first, using direct fetches as a - # last resort. - prefer_direct_fetch: false - # Configuration for the Sync API. sync_api: internal_api: diff --git a/build/docker/docker-compose.polylith.yml b/build/docker/docker-compose.polylith.yml index 5174830b..9bbd6a8f 100644 --- a/build/docker/docker-compose.polylith.yml +++ b/build/docker/docker-compose.polylith.yml @@ -54,15 +54,6 @@ services: - ./config:/etc/dendrite networks: - internal - - federation_sender: - hostname: federation_sender - image: matrixdotorg/dendrite-polylith:latest - command: federationsender - volumes: - - ./config:/etc/dendrite - networks: - - internal key_server: hostname: key_server @@ -72,16 +63,7 @@ services: - ./config:/etc/dendrite networks: - internal - - signing_key_server: - hostname: signing_key_server - image: matrixdotorg/dendrite-polylith:latest - command: signingkeyserver - volumes: - - ./config:/etc/dendrite - networks: - - internal - + user_api: hostname: user_api image: matrixdotorg/dendrite-polylith:latest diff --git a/build/docker/postgres/create_db.sh b/build/docker/postgres/create_db.sh index eb77bb26..a7107e27 100755 --- a/build/docker/postgres/create_db.sh +++ b/build/docker/postgres/create_db.sh @@ -1,5 +1,5 @@ #!/bin/sh -for db in userapi_accounts userapi_devices mediaapi syncapi roomserver signingkeyserver keyserver federationsender appservice naffka; do +for db in userapi_accounts userapi_devices mediaapi syncapi roomserver keyserver federationapi appservice naffka; do createdb -U dendrite -O dendrite dendrite_$db done diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index 21dc128c..46fe24ce 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -24,12 +24,13 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/federationsender" - "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/dendrite/userapi" @@ -270,9 +271,8 @@ func (m *DendriteMonolith) Start() { cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-mediaapi.db", m.CacheDirectory, prefix)) cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-syncapi.db", m.StorageDirectory, prefix)) cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-roomserver.db", m.StorageDirectory, prefix)) - cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-signingkeyserver.db", m.StorageDirectory, prefix)) cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-keyserver.db", m.StorageDirectory, prefix)) - cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-federationsender.db", m.StorageDirectory, prefix)) + cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-federationapi.db", m.StorageDirectory, prefix)) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-appservice.db", m.StorageDirectory, prefix)) cfg.MediaAPI.BasePath = config.Path(fmt.Sprintf("%s/media", m.CacheDirectory)) cfg.MediaAPI.AbsBasePath = config.Path(fmt.Sprintf("%s/media", m.CacheDirectory)) @@ -281,7 +281,7 @@ func (m *DendriteMonolith) Start() { panic(err) } - base := setup.NewBaseDendrite(cfg, "Monolith", false) + base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck accountDB := base.CreateAccountsDB() @@ -290,12 +290,10 @@ func (m *DendriteMonolith) Start() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - rsAPI := roomserver.NewInternalAPI( - base, keyRing, - ) + rsAPI := roomserver.NewInternalAPI(base) - fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, true, + fsAPI := federationapi.NewInternalAPI( + base, federation, rsAPI, base.Caches, true, ) keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) @@ -310,7 +308,8 @@ func (m *DendriteMonolith) Start() { // The underlying roomserver implementation needs to be able to call the fedsender. // This is different to rsAPI which can be the http client which doesn't need this dependency - rsAPI.SetFederationSenderAPI(fsAPI) + rsAPI.SetFederationAPI(fsAPI) + rsAPI.SetKeyring(keyRing) monolith := setup.Monolith{ Config: base.Cfg, @@ -321,7 +320,7 @@ func (m *DendriteMonolith) Start() { AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, - FederationSenderAPI: fsAPI, + FederationAPI: fsAPI, RoomserverAPI: rsAPI, UserAPI: m.userAPI, KeyAPI: keyAPI, diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go index 693e9236..fcfb7a25 100644 --- a/build/gobind-yggdrasil/monolith.go +++ b/build/gobind-yggdrasil/monolith.go @@ -15,12 +15,13 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/federationsender" - "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" @@ -92,9 +93,8 @@ func (m *DendriteMonolith) Start() { cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-mediaapi.db", m.StorageDirectory)) cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-syncapi.db", m.StorageDirectory)) cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-roomserver.db", m.StorageDirectory)) - cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-signingkeyserver.db", m.StorageDirectory)) cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-keyserver.db", m.StorageDirectory)) - cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-federationsender.db", m.StorageDirectory)) + cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-federationapi.db", m.StorageDirectory)) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-appservice.db", m.StorageDirectory)) cfg.MediaAPI.BasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory)) cfg.MediaAPI.AbsBasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory)) @@ -102,7 +102,7 @@ func (m *DendriteMonolith) Start() { panic(err) } - base := setup.NewBaseDendrite(cfg, "Monolith", false) + base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck accountDB := base.CreateAccountsDB() @@ -111,12 +111,10 @@ func (m *DendriteMonolith) Start() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - rsAPI := roomserver.NewInternalAPI( - base, keyRing, - ) + rsAPI := roomserver.NewInternalAPI(base) - fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, true, + fsAPI := federationapi.NewInternalAPI( + base, federation, rsAPI, base.Caches, true, ) keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation) @@ -132,7 +130,8 @@ func (m *DendriteMonolith) Start() { // The underlying roomserver implementation needs to be able to call the fedsender. // This is different to rsAPI which can be the http client which doesn't need this dependency - rsAPI.SetFederationSenderAPI(fsAPI) + rsAPI.SetFederationAPI(fsAPI) + rsAPI.SetKeyring(keyRing) monolith := setup.Monolith{ Config: base.Cfg, @@ -141,12 +140,12 @@ func (m *DendriteMonolith) Start() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationSenderAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider( ygg, fsAPI, federation, ), diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 562d89d2..64a7aa5e 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -21,7 +21,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/routing" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/transactions" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -43,7 +43,7 @@ func AddPublicRoutes( eduInputAPI eduServerAPI.EDUServerInputAPI, asAPI appserviceAPI.AppServiceQueryAPI, transactionsCache *transactions.Cache, - fsAPI federationSenderAPI.FederationSenderInternalAPI, + fsAPI federationAPI.FederationInternalAPI, userAPI userapi.UserInternalAPI, keyAPI keyserverAPI.KeyInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, diff --git a/clientapi/routing/directory.go b/clientapi/routing/directory.go index 96cb0262..e408c264 100644 --- a/clientapi/routing/directory.go +++ b/clientapi/routing/directory.go @@ -20,7 +20,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -47,7 +47,7 @@ func DirectoryRoom( federation *gomatrixserverlib.FederationClient, cfg *config.ClientAPI, rsAPI roomserverAPI.RoomserverInternalAPI, - fedSenderAPI federationSenderAPI.FederationSenderInternalAPI, + fedSenderAPI federationAPI.FederationInternalAPI, ) util.JSONResponse { _, domain, err := gomatrixserverlib.SplitID('#', roomAlias) if err != nil { @@ -96,8 +96,8 @@ func DirectoryRoom( } } } else { - joinedHostsReq := federationSenderAPI.QueryJoinedHostServerNamesInRoomRequest{RoomID: res.RoomID} - var joinedHostsRes federationSenderAPI.QueryJoinedHostServerNamesInRoomResponse + joinedHostsReq := federationAPI.QueryJoinedHostServerNamesInRoomRequest{RoomID: res.RoomID} + var joinedHostsRes federationAPI.QueryJoinedHostServerNamesInRoomResponse if err = fedSenderAPI.QueryJoinedHostServerNamesInRoom(req.Context(), &joinedHostsReq, &joinedHostsRes); err != nil { util.GetLogger(req.Context()).WithError(err).Error("fedSenderAPI.QueryJoinedHostServerNamesInRoom failed") return jsonerror.InternalServerError() diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 30ecc292..813d9d16 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -27,7 +27,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/internal/transactions" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" @@ -56,7 +56,7 @@ func Setup( federation *gomatrixserverlib.FederationClient, syncProducer *producers.SyncAPIProducer, transactionsCache *transactions.Cache, - federationSender federationSenderAPI.FederationSenderInternalAPI, + federationSender federationAPI.FederationInternalAPI, keyAPI keyserverAPI.KeyInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, mscCfg *config.MSCs, diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 7606e418..6a1428d5 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -30,14 +30,13 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed" "github.com/matrix-org/dendrite/eduserver" - "github.com/matrix-org/dendrite/federationsender" + "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/mscs" - "github.com/matrix-org/dendrite/signingkeyserver" "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" @@ -50,7 +49,7 @@ import ( func createKeyDB( base *P2PDendrite, - db gomatrixserverlib.KeyDatabase, + db *gomatrixserverlib.KeyRing, ) { mdns := mDNSListener{ host: base.LibP2P, @@ -125,14 +124,13 @@ func main() { cfg.Global.PrivateKey = privKey cfg.Global.KeyID = gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", *instanceName)) cfg.Global.Kafka.UseNaffka = true - cfg.FederationSender.FederationMaxRetries = 6 + cfg.FederationAPI.FederationMaxRetries = 6 cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName)) cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName)) - cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-signingkeyserver.db", *instanceName)) - cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) + cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationapi.db", *instanceName)) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName)) @@ -151,32 +149,29 @@ func main() { userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) keyAPI.SetUserAPI(userAPI) - serverKeyAPI := signingkeyserver.NewInternalAPI( - &base.Base.Cfg.SigningKeyServer, federation, base.Base.Caches, - ) - keyRing := serverKeyAPI.KeyRing() - createKeyDB( - base, serverKeyAPI, - ) - rsAPI := roomserver.NewInternalAPI( - &base.Base, keyRing, + &base.Base, ) eduInputAPI := eduserver.NewInternalAPI( &base.Base, cache.New(), userAPI, ) asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) - fsAPI := federationsender.NewInternalAPI( - &base.Base, federation, rsAPI, keyRing, true, + fsAPI := federationapi.NewInternalAPI( + &base.Base, federation, rsAPI, base.Base.Caches, true, ) - rsAPI.SetFederationSenderAPI(fsAPI) + keyRing := fsAPI.KeyRing() + rsAPI.SetFederationAPI(fsAPI) provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI) err = provider.Start() if err != nil { panic("failed to create new public rooms provider: " + err.Error()) } + createKeyDB( + base, keyRing, + ) + monolith := setup.Monolith{ Config: base.Base.Cfg, AccountDB: accountDB, @@ -186,9 +181,8 @@ func main() { AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, - FederationSenderAPI: fsAPI, + FederationAPI: fsAPI, RoomserverAPI: rsAPI, - ServerKeyAPI: serverKeyAPI, UserAPI: userAPI, KeyAPI: keyAPI, ExtPublicRoomsProvider: provider, diff --git a/cmd/dendrite-demo-libp2p/mdnslistener.go b/cmd/dendrite-demo-libp2p/mdnslistener.go index c30aaa33..c6105e52 100644 --- a/cmd/dendrite-demo-libp2p/mdnslistener.go +++ b/cmd/dendrite-demo-libp2p/mdnslistener.go @@ -25,7 +25,7 @@ import ( ) type mDNSListener struct { - keydb gomatrixserverlib.KeyDatabase + keydb *gomatrixserverlib.KeyRing host host.Host } @@ -35,7 +35,7 @@ func (n *mDNSListener) HandlePeerFound(p peer.AddrInfo) { } if pubkey, err := p.ID.ExtractPublicKey(); err == nil { raw, _ := pubkey.Raw() - if err := n.keydb.StoreKeys( + if err := n.keydb.KeyDatabase.StoreKeys( context.Background(), map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{ { diff --git a/cmd/dendrite-demo-libp2p/p2pdendrite.go b/cmd/dendrite-demo-libp2p/p2pdendrite.go index 45eb42a9..ba1868b2 100644 --- a/cmd/dendrite-demo-libp2p/p2pdendrite.go +++ b/cmd/dendrite-demo-libp2p/p2pdendrite.go @@ -22,7 +22,6 @@ import ( pstore "github.com/libp2p/go-libp2p-core/peerstore" record "github.com/libp2p/go-libp2p-record" - "github.com/matrix-org/dendrite/setup" "github.com/libp2p/go-libp2p" circuit "github.com/libp2p/go-libp2p-circuit" @@ -34,12 +33,13 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) // P2PDendrite is a Peer-to-Peer variant of BaseDendrite. type P2PDendrite struct { - Base setup.BaseDendrite + Base base.BaseDendrite // Store our libp2p object so that we can make outgoing connections from it // later @@ -54,7 +54,7 @@ type P2PDendrite struct { // The componentName is used for logging purposes, and should be a friendly name // of the component running, e.g. SyncAPI. func NewP2PDendrite(cfg *config.Dendrite, componentName string) *P2PDendrite { - baseDendrite := setup.NewBaseDendrite(cfg, componentName, false) + baseDendrite := base.NewBaseDendrite(cfg, componentName) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/dendrite-demo-pinecone/conn/client.go b/cmd/dendrite-demo-pinecone/conn/client.go index 29436fda..40ccb9c0 100644 --- a/cmd/dendrite-demo-pinecone/conn/client.go +++ b/cmd/dendrite-demo-pinecone/conn/client.go @@ -7,7 +7,7 @@ import ( "net/http" "strings" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/gomatrixserverlib" "nhooyr.io/websocket" @@ -70,7 +70,7 @@ func createTransport(s *pineconeSessions.Sessions) *http.Transport { } func CreateClient( - base *setup.BaseDendrite, s *pineconeSessions.Sessions, + base *base.BaseDendrite, s *pineconeSessions.Sessions, ) *gomatrixserverlib.Client { return gomatrixserverlib.NewClient( gomatrixserverlib.WithTransport(createTransport(s)), @@ -78,7 +78,7 @@ func CreateClient( } func CreateFederationClient( - base *setup.BaseDendrite, s *pineconeSessions.Sessions, + base *base.BaseDendrite, s *pineconeSessions.Sessions, ) *gomatrixserverlib.FederationClient { return gomatrixserverlib.NewFederationClient( base.Cfg.Global.ServerName, diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 60b83dc7..9afa53a6 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -37,13 +37,14 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/federationsender" - "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" @@ -151,9 +152,8 @@ func main() { cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName)) cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName)) - cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-signingkeyserver.db", *instanceName)) cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName)) - cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) + cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationapi.db", *instanceName)) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) cfg.MSCs.MSCs = []string{"msc2836", "msc2946"} @@ -161,7 +161,7 @@ func main() { panic(err) } - base := setup.NewBaseDendrite(cfg, "Monolith", false) + base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck accountDB := base.CreateAccountsDB() @@ -170,12 +170,10 @@ func main() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - rsComponent := roomserver.NewInternalAPI( - base, keyRing, - ) + rsComponent := roomserver.NewInternalAPI(base) rsAPI := rsComponent - fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, true, + fsAPI := federationapi.NewInternalAPI( + base, federation, rsAPI, base.Caches, true, ) keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) @@ -188,7 +186,8 @@ func main() { asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) - rsComponent.SetFederationSenderAPI(fsAPI) + rsComponent.SetFederationAPI(fsAPI) + rsComponent.SetKeyring(keyRing) monolith := setup.Monolith{ Config: base.Cfg, @@ -199,7 +198,7 @@ func main() { AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, - FederationSenderAPI: fsAPI, + FederationAPI: fsAPI, RoomserverAPI: rsAPI, UserAPI: userAPI, KeyAPI: keyAPI, diff --git a/cmd/dendrite-demo-pinecone/rooms/rooms.go b/cmd/dendrite-demo-pinecone/rooms/rooms.go index fc387a17..5972d129 100644 --- a/cmd/dendrite-demo-pinecone/rooms/rooms.go +++ b/cmd/dendrite-demo-pinecone/rooms/rooms.go @@ -19,7 +19,7 @@ import ( "sync" "time" - "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -30,14 +30,14 @@ import ( type PineconeRoomProvider struct { r *pineconeRouter.Router s *pineconeSessions.Sessions - fedSender api.FederationSenderInternalAPI + fedSender api.FederationInternalAPI fedClient *gomatrixserverlib.FederationClient } func NewPineconeRoomProvider( r *pineconeRouter.Router, s *pineconeSessions.Sessions, - fedSender api.FederationSenderInternalAPI, + fedSender api.FederationInternalAPI, fedClient *gomatrixserverlib.FederationClient, ) *PineconeRoomProvider { p := &PineconeRoomProvider{ diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 1b5e1a51..2663aabb 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -31,13 +31,14 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/federationsender" - "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/mscs" "github.com/matrix-org/dendrite/userapi" @@ -82,9 +83,8 @@ func main() { cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName)) cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName)) - cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-signingkeyserver.db", *instanceName)) cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName)) - cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) + cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationapi.db", *instanceName)) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) cfg.MSCs.MSCs = []string{"msc2836"} @@ -93,7 +93,7 @@ func main() { panic(err) } - base := setup.NewBaseDendrite(cfg, "Monolith", false) + base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck accountDB := base.CreateAccountsDB() @@ -107,7 +107,7 @@ func main() { keyAPI.SetUserAPI(userAPI) rsComponent := roomserver.NewInternalAPI( - base, keyRing, + base, ) rsAPI := rsComponent @@ -117,11 +117,12 @@ func main() { asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) rsAPI.SetAppserviceAPI(asAPI) - fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, true, + fsAPI := federationapi.NewInternalAPI( + base, federation, rsAPI, base.Caches, true, ) - rsComponent.SetFederationSenderAPI(fsAPI) + rsComponent.SetFederationAPI(fsAPI) + rsComponent.SetKeyring(keyRing) monolith := setup.Monolith{ Config: base.Cfg, @@ -130,12 +131,12 @@ func main() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationSenderAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider( ygg, fsAPI, federation, ), diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/client.go b/cmd/dendrite-demo-yggdrasil/yggconn/client.go index c7409c21..358d3725 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/client.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/client.go @@ -4,7 +4,7 @@ import ( "net/http" "time" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/gomatrixserverlib" ) @@ -18,7 +18,7 @@ func (y *yggroundtripper) RoundTrip(req *http.Request) (*http.Response, error) { } func (n *Node) CreateClient( - base *setup.BaseDendrite, + base *base.BaseDendrite, ) *gomatrixserverlib.Client { tr := &http.Transport{} tr.RegisterProtocol( @@ -39,7 +39,7 @@ func (n *Node) CreateClient( } func (n *Node) CreateFederationClient( - base *setup.BaseDendrite, + base *base.BaseDendrite, ) *gomatrixserverlib.FederationClient { tr := &http.Transport{} tr.RegisterProtocol( diff --git a/cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go b/cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go index 0174e84d..402b86ed 100644 --- a/cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go +++ b/cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go @@ -20,19 +20,19 @@ import ( "time" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn" - "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) type YggdrasilRoomProvider struct { node *yggconn.Node - fedSender api.FederationSenderInternalAPI + fedSender api.FederationInternalAPI fedClient *gomatrixserverlib.FederationClient } func NewYggdrasilRoomProvider( - node *yggconn.Node, fedSender api.FederationSenderInternalAPI, fedClient *gomatrixserverlib.FederationClient, + node *yggconn.Node, fedSender api.FederationInternalAPI, fedClient *gomatrixserverlib.FederationClient, ) *YggdrasilRoomProvider { p := &YggdrasilRoomProvider{ node: node, diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index ec8751df..0e55e7ba 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -21,14 +21,14 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/federationsender" + "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup" + basepkg "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/mscs" - "github.com/matrix-org/dendrite/signingkeyserver" "github.com/matrix-org/dendrite/userapi" uapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" @@ -51,7 +51,7 @@ func main() { httpAddr := config.HTTPAddress("http://" + *httpBindAddr) httpsAddr := config.HTTPAddress("https://" + *httpsBindAddr) httpAPIAddr := httpAddr - + options := []basepkg.BaseDendriteOptions{} if *enableHTTPAPIs { logrus.Warnf("DANGER! The -api option is enabled, exposing internal APIs on %q!", *apiBindAddr) httpAPIAddr = config.HTTPAddress("http://" + *apiBindAddr) @@ -63,32 +63,20 @@ func main() { cfg.ClientAPI.InternalAPI.Connect = httpAPIAddr cfg.EDUServer.InternalAPI.Connect = httpAPIAddr cfg.FederationAPI.InternalAPI.Connect = httpAPIAddr - cfg.FederationSender.InternalAPI.Connect = httpAPIAddr cfg.KeyServer.InternalAPI.Connect = httpAPIAddr cfg.MediaAPI.InternalAPI.Connect = httpAPIAddr cfg.RoomServer.InternalAPI.Connect = httpAPIAddr - cfg.SigningKeyServer.InternalAPI.Connect = httpAPIAddr cfg.SyncAPI.InternalAPI.Connect = httpAPIAddr + options = append(options, basepkg.UseHTTPAPIs) } - base := setup.NewBaseDendrite(cfg, "Monolith", *enableHTTPAPIs) + base := basepkg.NewBaseDendrite(cfg, "Monolith", options...) defer base.Close() // nolint: errcheck accountDB := base.CreateAccountsDB() federation := base.CreateFederationClient() - skAPI := signingkeyserver.NewInternalAPI( - &base.Cfg.SigningKeyServer, federation, base.Caches, - ) - if base.UseHTTPAPIs { - signingkeyserver.AddInternalRoutes(base.InternalAPIMux, skAPI, base.Caches) - skAPI = base.SigningKeyServerHTTPClient() - } - keyRing := skAPI.KeyRing() - - rsImpl := roomserver.NewInternalAPI( - base, keyRing, - ) + rsImpl := roomserver.NewInternalAPI(base) // call functions directly on the impl unless running in HTTP mode rsAPI := rsImpl if base.UseHTTPAPIs { @@ -101,16 +89,18 @@ func main() { } } - fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, false, + fsAPI := federationapi.NewInternalAPI( + base, federation, rsAPI, base.Caches, false, ) if base.UseHTTPAPIs { - federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) - fsAPI = base.FederationSenderHTTPClient() + federationapi.AddInternalRoutes(base.InternalAPIMux, fsAPI) + fsAPI = base.FederationAPIHTTPClient() } + keyRing := fsAPI.KeyRing() + // The underlying roomserver implementation needs to be able to call the fedsender. // This is different to rsAPI which can be the http client which doesn't need this dependency - rsImpl.SetFederationSenderAPI(fsAPI) + rsImpl.SetFederationAPI(fsAPI) keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) @@ -148,13 +138,12 @@ func main() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationSenderAPI: fsAPI, - RoomserverAPI: rsAPI, - ServerKeyAPI: skAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, } monolith.AddAllPublicRoutes( base.ProcessContext, @@ -184,9 +173,9 @@ func main() { if *certFile != "" && *keyFile != "" { go func() { base.SetupAndServeHTTP( - setup.NoListener, // internal API - httpsAddr, // external API - certFile, keyFile, // TLS settings + basepkg.NoListener, // internal API + httpsAddr, // external API + certFile, keyFile, // TLS settings ) }() } diff --git a/cmd/dendrite-polylith-multi/main.go b/cmd/dendrite-polylith-multi/main.go index c2208ca2..edfe6cdb 100644 --- a/cmd/dendrite-polylith-multi/main.go +++ b/cmd/dendrite-polylith-multi/main.go @@ -21,13 +21,14 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-polylith-multi/personalities" "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/sirupsen/logrus" _ "github.com/mattn/go-sqlite3" ) -type entrypoint func(base *setup.BaseDendrite, cfg *config.Dendrite) +type entrypoint func(base *base.BaseDendrite, cfg *config.Dendrite) func main() { cfg := setup.ParseFlags(true) @@ -40,17 +41,15 @@ func main() { } components := map[string]entrypoint{ - "appservice": personalities.Appservice, - "clientapi": personalities.ClientAPI, - "eduserver": personalities.EDUServer, - "federationapi": personalities.FederationAPI, - "federationsender": personalities.FederationSender, - "keyserver": personalities.KeyServer, - "mediaapi": personalities.MediaAPI, - "roomserver": personalities.RoomServer, - "signingkeyserver": personalities.SigningKeyServer, - "syncapi": personalities.SyncAPI, - "userapi": personalities.UserAPI, + "appservice": personalities.Appservice, + "clientapi": personalities.ClientAPI, + "eduserver": personalities.EDUServer, + "federationapi": personalities.FederationAPI, + "keyserver": personalities.KeyServer, + "mediaapi": personalities.MediaAPI, + "roomserver": personalities.RoomServer, + "syncapi": personalities.SyncAPI, + "userapi": personalities.UserAPI, } start, ok := components[component] @@ -73,8 +72,8 @@ func main() { logrus.Infof("Starting %q component", component) - base := setup.NewBaseDendrite(cfg, component, false) // TODO - defer base.Close() // nolint: errcheck + base := base.NewBaseDendrite(cfg, component) // TODO + defer base.Close() // nolint: errcheck go start(base, cfg) base.WaitForShutdown() diff --git a/cmd/dendrite-polylith-multi/personalities/appservice.go b/cmd/dendrite-polylith-multi/personalities/appservice.go index d269b15d..4f74434a 100644 --- a/cmd/dendrite-polylith-multi/personalities/appservice.go +++ b/cmd/dendrite-polylith-multi/personalities/appservice.go @@ -16,11 +16,12 @@ package personalities import ( "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" + basepkg "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) -func Appservice(base *setup.BaseDendrite, cfg *config.Dendrite) { +func Appservice(base *base.BaseDendrite, cfg *config.Dendrite) { userAPI := base.UserAPIClient() rsAPI := base.RoomserverHTTPClient() @@ -29,7 +30,7 @@ func Appservice(base *setup.BaseDendrite, cfg *config.Dendrite) { base.SetupAndServeHTTP( base.Cfg.AppServiceAPI.InternalAPI.Listen, // internal listener - setup.NoListener, // external listener + basepkg.NoListener, // external listener nil, nil, ) } diff --git a/cmd/dendrite-polylith-multi/personalities/clientapi.go b/cmd/dendrite-polylith-multi/personalities/clientapi.go index 5e0c4354..bd9f7a10 100644 --- a/cmd/dendrite-polylith-multi/personalities/clientapi.go +++ b/cmd/dendrite-polylith-multi/personalities/clientapi.go @@ -17,17 +17,17 @@ package personalities import ( "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/internal/transactions" - "github.com/matrix-org/dendrite/setup" + basepkg "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) -func ClientAPI(base *setup.BaseDendrite, cfg *config.Dendrite) { +func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { accountDB := base.CreateAccountsDB() federation := base.CreateFederationClient() asQuery := base.AppserviceHTTPClient() rsAPI := base.RoomserverHTTPClient() - fsAPI := base.FederationSenderHTTPClient() + fsAPI := base.FederationAPIHTTPClient() eduInputAPI := base.EDUServerClient() userAPI := base.UserAPIClient() keyAPI := base.KeyServerHTTPClient() diff --git a/cmd/dendrite-polylith-multi/personalities/eduserver.go b/cmd/dendrite-polylith-multi/personalities/eduserver.go index 55b986e8..8719facb 100644 --- a/cmd/dendrite-polylith-multi/personalities/eduserver.go +++ b/cmd/dendrite-polylith-multi/personalities/eduserver.go @@ -17,17 +17,17 @@ package personalities import ( "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/setup" + basepkg "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) -func EDUServer(base *setup.BaseDendrite, cfg *config.Dendrite) { +func EDUServer(base *basepkg.BaseDendrite, cfg *config.Dendrite) { intAPI := eduserver.NewInternalAPI(base, cache.New(), base.UserAPIClient()) eduserver.AddInternalRoutes(base.InternalAPIMux, intAPI) base.SetupAndServeHTTP( base.Cfg.EDUServer.InternalAPI.Listen, // internal listener - setup.NoListener, // external listener + basepkg.NoListener, // external listener nil, nil, ) } diff --git a/cmd/dendrite-polylith-multi/personalities/federationapi.go b/cmd/dendrite-polylith-multi/personalities/federationapi.go index 5488fbf3..9b59cf45 100644 --- a/cmd/dendrite-polylith-multi/personalities/federationapi.go +++ b/cmd/dendrite-polylith-multi/personalities/federationapi.go @@ -16,16 +16,15 @@ package personalities import ( "github.com/matrix-org/dendrite/federationapi" - "github.com/matrix-org/dendrite/setup" + basepkg "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) -func FederationAPI(base *setup.BaseDendrite, cfg *config.Dendrite) { +func FederationAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { userAPI := base.UserAPIClient() federation := base.CreateFederationClient() - serverKeyAPI := base.SigningKeyServerHTTPClient() - keyRing := serverKeyAPI.KeyRing() - fsAPI := base.FederationSenderHTTPClient() + fsAPI := base.FederationAPIHTTPClient() + keyRing := fsAPI.KeyRing() rsAPI := base.RoomserverHTTPClient() keyAPI := base.KeyServerHTTPClient() diff --git a/cmd/dendrite-polylith-multi/personalities/federationsender.go b/cmd/dendrite-polylith-multi/personalities/federationsender.go deleted file mode 100644 index ca9a8add..00000000 --- a/cmd/dendrite-polylith-multi/personalities/federationsender.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package personalities - -import ( - "github.com/matrix-org/dendrite/federationsender" - "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/config" -) - -func FederationSender(base *setup.BaseDendrite, cfg *config.Dendrite) { - federation := base.CreateFederationClient() - - serverKeyAPI := base.SigningKeyServerHTTPClient() - keyRing := serverKeyAPI.KeyRing() - - rsAPI := base.RoomserverHTTPClient() - fsAPI := federationsender.NewInternalAPI( - base, federation, rsAPI, keyRing, false, - ) - federationsender.AddInternalRoutes(base.InternalAPIMux, fsAPI) - - base.SetupAndServeHTTP( - base.Cfg.FederationSender.InternalAPI.Listen, // internal listener - setup.NoListener, // external listener - nil, nil, - ) -} diff --git a/cmd/dendrite-polylith-multi/personalities/keyserver.go b/cmd/dendrite-polylith-multi/personalities/keyserver.go index 8a99d779..f8aa57b8 100644 --- a/cmd/dendrite-polylith-multi/personalities/keyserver.go +++ b/cmd/dendrite-polylith-multi/personalities/keyserver.go @@ -16,12 +16,12 @@ package personalities import ( "github.com/matrix-org/dendrite/keyserver" - "github.com/matrix-org/dendrite/setup" + basepkg "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) -func KeyServer(base *setup.BaseDendrite, cfg *config.Dendrite) { - fsAPI := base.FederationSenderHTTPClient() +func KeyServer(base *basepkg.BaseDendrite, cfg *config.Dendrite) { + fsAPI := base.FederationAPIHTTPClient() intAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI) intAPI.SetUserAPI(base.UserAPIClient()) @@ -29,7 +29,7 @@ func KeyServer(base *setup.BaseDendrite, cfg *config.Dendrite) { base.SetupAndServeHTTP( base.Cfg.KeyServer.InternalAPI.Listen, // internal listener - setup.NoListener, // external listener + basepkg.NoListener, // external listener nil, nil, ) } diff --git a/cmd/dendrite-polylith-multi/personalities/mediaapi.go b/cmd/dendrite-polylith-multi/personalities/mediaapi.go index cf3e6882..00a2d56a 100644 --- a/cmd/dendrite-polylith-multi/personalities/mediaapi.go +++ b/cmd/dendrite-polylith-multi/personalities/mediaapi.go @@ -16,11 +16,11 @@ package personalities import ( "github.com/matrix-org/dendrite/mediaapi" - "github.com/matrix-org/dendrite/setup" + basepkg "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) -func MediaAPI(base *setup.BaseDendrite, cfg *config.Dendrite) { +func MediaAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { userAPI := base.UserAPIClient() client := base.CreateClient() diff --git a/cmd/dendrite-polylith-multi/personalities/roomserver.go b/cmd/dendrite-polylith-multi/personalities/roomserver.go index 72f0f6d1..23514dbe 100644 --- a/cmd/dendrite-polylith-multi/personalities/roomserver.go +++ b/cmd/dendrite-polylith-multi/personalities/roomserver.go @@ -16,24 +16,21 @@ package personalities import ( "github.com/matrix-org/dendrite/roomserver" - "github.com/matrix-org/dendrite/setup" + basepkg "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" ) -func RoomServer(base *setup.BaseDendrite, cfg *config.Dendrite) { - serverKeyAPI := base.SigningKeyServerHTTPClient() - keyRing := serverKeyAPI.KeyRing() - +func RoomServer(base *basepkg.BaseDendrite, cfg *config.Dendrite) { asAPI := base.AppserviceHTTPClient() - fsAPI := base.FederationSenderHTTPClient() - rsAPI := roomserver.NewInternalAPI(base, keyRing) - rsAPI.SetFederationSenderAPI(fsAPI) + fsAPI := base.FederationAPIHTTPClient() + rsAPI := roomserver.NewInternalAPI(base) + rsAPI.SetFederationAPI(fsAPI) rsAPI.SetAppserviceAPI(asAPI) roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI) base.SetupAndServeHTTP( base.Cfg.RoomServer.InternalAPI.Listen, // internal listener - setup.NoListener, // external listener + basepkg.NoListener, // external listener nil, nil, ) } diff --git a/cmd/dendrite-polylith-multi/personalities/signingkeyserver.go b/cmd/dendrite-polylith-multi/personalities/signingkeyserver.go deleted file mode 100644 index 0a7fc502..00000000 --- a/cmd/dendrite-polylith-multi/personalities/signingkeyserver.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package personalities - -import ( - "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/signingkeyserver" -) - -func SigningKeyServer(base *setup.BaseDendrite, cfg *config.Dendrite) { - federation := base.CreateFederationClient() - - intAPI := signingkeyserver.NewInternalAPI(&base.Cfg.SigningKeyServer, federation, base.Caches) - signingkeyserver.AddInternalRoutes(base.InternalAPIMux, intAPI, base.Caches) - - base.SetupAndServeHTTP( - base.Cfg.SigningKeyServer.InternalAPI.Listen, - setup.NoListener, - nil, nil, - ) -} diff --git a/cmd/dendrite-polylith-multi/personalities/syncapi.go b/cmd/dendrite-polylith-multi/personalities/syncapi.go index b9b20229..6fee8419 100644 --- a/cmd/dendrite-polylith-multi/personalities/syncapi.go +++ b/cmd/dendrite-polylith-multi/personalities/syncapi.go @@ -15,12 +15,12 @@ package personalities import ( - "github.com/matrix-org/dendrite/setup" + basepkg "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi" ) -func SyncAPI(base *setup.BaseDendrite, cfg *config.Dendrite) { +func SyncAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { userAPI := base.UserAPIClient() federation := base.CreateFederationClient() diff --git a/cmd/dendrite-polylith-multi/personalities/userapi.go b/cmd/dendrite-polylith-multi/personalities/userapi.go index 6feb906d..f147cda1 100644 --- a/cmd/dendrite-polylith-multi/personalities/userapi.go +++ b/cmd/dendrite-polylith-multi/personalities/userapi.go @@ -15,12 +15,12 @@ package personalities import ( - "github.com/matrix-org/dendrite/setup" + basepkg "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi" ) -func UserAPI(base *setup.BaseDendrite, cfg *config.Dendrite) { +func UserAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { accountDB := base.CreateAccountsDB() userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, base.KeyServerHTTPClient()) @@ -29,7 +29,7 @@ func UserAPI(base *setup.BaseDendrite, cfg *config.Dendrite) { base.SetupAndServeHTTP( base.Cfg.UserAPI.InternalAPI.Listen, // internal listener - setup.NoListener, // external listener + basepkg.NoListener, // external listener nil, nil, ) } diff --git a/cmd/dendritejs-pinecone/main.go b/cmd/dendritejs-pinecone/main.go index 46e65783..a430f82a 100644 --- a/cmd/dendritejs-pinecone/main.go +++ b/cmd/dendritejs-pinecone/main.go @@ -33,11 +33,12 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/federationsender" + "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/userapi" @@ -164,10 +165,9 @@ func startup() { cfg.UserAPI.AccountDatabase.ConnectionString = "file:/idb/dendritejs_account.db" cfg.AppServiceAPI.Database.ConnectionString = "file:/idb/dendritejs_appservice.db" cfg.UserAPI.DeviceDatabase.ConnectionString = "file:/idb/dendritejs_device.db" - cfg.FederationSender.Database.ConnectionString = "file:/idb/dendritejs_fedsender.db" + cfg.FederationAPI.Database.ConnectionString = "file:/idb/dendritejs_fedsender.db" cfg.MediaAPI.Database.ConnectionString = "file:/idb/dendritejs_mediaapi.db" cfg.RoomServer.Database.ConnectionString = "file:/idb/dendritejs_roomserver.db" - cfg.SigningKeyServer.Database.ConnectionString = "file:/idb/dendritejs_signingkeyserver.db" cfg.SyncAPI.Database.ConnectionString = "file:/idb/dendritejs_syncapi.db" cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db" cfg.Global.Kafka.UseNaffka = true @@ -180,7 +180,7 @@ func startup() { if err := cfg.Derive(); err != nil { logrus.Fatalf("Failed to derive values from config: %s", err) } - base := setup.NewBaseDendrite(cfg, "Monolith", false) + base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck accountDB := base.CreateAccountsDB() @@ -192,14 +192,15 @@ func startup() { serverKeyAPI := &signing.YggdrasilKeys{} keyRing := serverKeyAPI.KeyRing() - rsAPI := roomserver.NewInternalAPI(base, keyRing) + rsAPI := roomserver.NewInternalAPI(base) eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI) asQuery := appservice.NewInternalAPI( base, userAPI, rsAPI, ) rsAPI.SetAppserviceAPI(asQuery) - fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, keyRing, true) - rsAPI.SetFederationSenderAPI(fedSenderAPI) + fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, true) + rsAPI.SetFederationAPI(fedSenderAPI) + rsAPI.SetKeyring(keyRing) monolith := setup.Monolith{ Config: base.Cfg, @@ -208,12 +209,12 @@ func startup() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asQuery, - EDUInternalAPI: eduInputAPI, - FederationSenderAPI: fedSenderAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, + AppserviceAPI: asQuery, + EDUInternalAPI: eduInputAPI, + FederationAPI: fedSenderAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, //ServerKeyAPI: serverKeyAPI, ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation), } diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 10aadb6e..9bb1f24e 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -26,7 +26,7 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" - "github.com/matrix-org/dendrite/federationsender" + "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/roomserver" @@ -168,10 +168,9 @@ func main() { cfg.UserAPI.AccountDatabase.ConnectionString = "file:/idb/dendritejs_account.db" cfg.AppServiceAPI.Database.ConnectionString = "file:/idb/dendritejs_appservice.db" cfg.UserAPI.DeviceDatabase.ConnectionString = "file:/idb/dendritejs_device.db" - cfg.FederationSender.Database.ConnectionString = "file:/idb/dendritejs_fedsender.db" + cfg.FederationAPI.Database.ConnectionString = "file:/idb/dendritejs_fedsender.db" cfg.MediaAPI.Database.ConnectionString = "file:/idb/dendritejs_mediaapi.db" cfg.RoomServer.Database.ConnectionString = "file:/idb/dendritejs_roomserver.db" - cfg.SigningKeyServer.Database.ConnectionString = "file:/idb/dendritejs_signingkeyserver.db" cfg.SyncAPI.Database.ConnectionString = "file:/idb/dendritejs_syncapi.db" cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db" cfg.Global.Kafka.UseNaffka = true @@ -188,7 +187,7 @@ func main() { if err := cfg.Derive(); err != nil { logrus.Fatalf("Failed to derive values from config: %s", err) } - base := setup.NewBaseDendrite(cfg, "Monolith", false) + base := setup.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck accountDB := base.CreateAccountsDB() @@ -205,14 +204,15 @@ func main() { KeyDatabase: fetcher, } - rsAPI := roomserver.NewInternalAPI(base, keyRing) + rsAPI := roomserver.NewInternalAPI(base) eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI) asQuery := appservice.NewInternalAPI( base, userAPI, rsAPI, ) rsAPI.SetAppserviceAPI(asQuery) - fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing, true) - rsAPI.SetFederationSenderAPI(fedSenderAPI) + fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, true) + rsAPI.SetFederationAPI(fedSenderAPI) + rsAPI.SetKeyring(keyRing) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) monolith := setup.Monolith{ diff --git a/cmd/dendritejs/publicrooms.go b/cmd/dendritejs/publicrooms.go index 19afc5bc..2e3339a4 100644 --- a/cmd/dendritejs/publicrooms.go +++ b/cmd/dendritejs/publicrooms.go @@ -22,7 +22,7 @@ import ( "sync" "time" - "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi/api" go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -31,12 +31,12 @@ import ( type libp2pPublicRoomsProvider struct { node *go_http_js_libp2p.P2pLocalNode providers []go_http_js_libp2p.PeerInfo - fedSender api.FederationSenderInternalAPI + fedSender api.FederationInternalAPI fedClient *gomatrixserverlib.FederationClient } func NewLibP2PPublicRoomsProvider( - node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI, fedClient *gomatrixserverlib.FederationClient, + node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationInternalAPI, fedClient *gomatrixserverlib.FederationClient, ) *libp2pPublicRoomsProvider { p := &libp2pPublicRoomsProvider{ node: node, diff --git a/cmd/generate-config/main.go b/cmd/generate-config/main.go index e300a1ca..a0407b73 100644 --- a/cmd/generate-config/main.go +++ b/cmd/generate-config/main.go @@ -24,12 +24,11 @@ func main() { if *dbURI != "" { cfg.Global.Kafka.Database.ConnectionString = config.DataSource(*dbURI) cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(*dbURI) - cfg.FederationSender.Database.ConnectionString = config.DataSource(*dbURI) + cfg.FederationAPI.Database.ConnectionString = config.DataSource(*dbURI) cfg.KeyServer.Database.ConnectionString = config.DataSource(*dbURI) cfg.MSCs.Database.ConnectionString = config.DataSource(*dbURI) cfg.MediaAPI.Database.ConnectionString = config.DataSource(*dbURI) cfg.RoomServer.Database.ConnectionString = config.DataSource(*dbURI) - cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(*dbURI) cfg.SyncAPI.Database.ConnectionString = config.DataSource(*dbURI) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(*dbURI) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(*dbURI) @@ -47,7 +46,7 @@ func main() { }, }, } - cfg.SigningKeyServer.KeyPerspectives = config.KeyPerspectives{ + cfg.FederationAPI.KeyPerspectives = config.KeyPerspectives{ { ServerName: "matrix.org", Keys: []config.KeyPerspectiveTrustKey{ @@ -83,11 +82,11 @@ func main() { if *defaultsForCI { cfg.AppServiceAPI.DisableTLSValidation = true cfg.ClientAPI.RateLimiting.Enabled = false - cfg.FederationSender.DisableTLSValidation = true + cfg.FederationAPI.DisableTLSValidation = true + // don't hit matrix.org when running tests!!! + cfg.FederationAPI.KeyPerspectives = config.KeyPerspectives{} cfg.MSCs.MSCs = []string{"msc2836", "msc2946", "msc2444", "msc2753"} cfg.Logging[0].Level = "trace" - // don't hit matrix.org when running tests!!! - cfg.SigningKeyServer.KeyPerspectives = config.KeyPerspectives{} cfg.UserAPI.BCryptCost = bcrypt.MinCost } diff --git a/cmd/goose/main.go b/cmd/goose/main.go index 83c97a72..8ed5cbd9 100644 --- a/cmd/goose/main.go +++ b/cmd/goose/main.go @@ -20,7 +20,7 @@ import ( const ( AppService = "appservice" - FederationSender = "federationsender" + FederationSender = "federationapi" KeyServer = "keyserver" MediaAPI = "mediaapi" RoomServer = "roomserver" diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 5f2a9de2..1204bd54 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -191,6 +191,11 @@ federation_api: connect: http://localhost:7772 external_api: listen: http://[::]:8072 + database: + connection_string: file:federationapi.db + max_open_conns: 10 + max_idle_conns: 2 + conn_max_lifetime: -1 # List of paths to X.509 certificates to be used by the external federation listeners. # These certificates will be used to calculate the TLS fingerprints and other servers @@ -198,17 +203,6 @@ federation_api: # format. federation_certificates: [] -# Configuration for the Federation Sender. -federation_sender: - internal_api: - listen: http://localhost:7775 - connect: http://localhost:7775 - database: - connection_string: file:federationsender.db - max_open_conns: 10 - max_idle_conns: 2 - conn_max_lifetime: -1 - # How many times we will try to resend a failed transaction to a specific server. The # backoff is 2**x seconds, so 1 = 2 seconds, 2 = 4 seconds, 3 = 8 seconds etc. send_max_retries: 16 @@ -224,6 +218,22 @@ federation_sender: host: localhost port: 8080 + # Perspective keyservers to use as a backup when direct key fetches fail. This may + # be required to satisfy key requests for servers that are no longer online when + # joining some rooms. + key_perspectives: + - server_name: matrix.org + keys: + - key_id: ed25519:auto + public_key: Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw + - key_id: ed25519:a_RXGa + public_key: l8Hft5qXKn1vfHrg3p4+W8gELQVo8N13JkluMfmn2sQ + + # This option will control whether Dendrite will prefer to look up keys directly + # or whether it should try perspective servers first, using direct fetches as a + # last resort. + prefer_direct_fetch: false + # Configuration for the Key Server (for end-to-end encryption). key_server: internal_api: @@ -298,33 +308,6 @@ room_server: max_idle_conns: 2 conn_max_lifetime: -1 -# Configuration for the Signing Key Server (for server signing keys). -signing_key_server: - internal_api: - listen: http://localhost:7780 - connect: http://localhost:7780 - database: - connection_string: file:signingkeyserver.db - max_open_conns: 10 - max_idle_conns: 2 - conn_max_lifetime: -1 - - # Perspective keyservers to use as a backup when direct key fetches fail. This may - # be required to satisfy key requests for servers that are no longer online when - # joining some rooms. - key_perspectives: - - server_name: matrix.org - keys: - - key_id: ed25519:auto - public_key: Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw - - key_id: ed25519:a_RXGa - public_key: l8Hft5qXKn1vfHrg3p4+W8gELQVo8N13JkluMfmn2sQ - - # This option will control whether Dendrite will prefer to look up keys directly - # or whether it should try perspective servers first, using direct fetches as a - # last resort. - prefer_direct_fetch: false - # Configuration for the Sync API. sync_api: internal_api: diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go index 7875e27f..97831f2b 100644 --- a/eduserver/eduserver.go +++ b/eduserver/eduserver.go @@ -22,7 +22,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/eduserver/input" "github.com/matrix-org/dendrite/eduserver/inthttp" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/kafka" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -37,7 +37,7 @@ func AddInternalRoutes(internalMux *mux.Router, inputAPI api.EDUServerInputAPI) // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - base *setup.BaseDendrite, + base *base.BaseDendrite, eduCache *cache.EDUCache, userAPI userapi.UserInternalAPI, ) api.EDUServerInputAPI { diff --git a/federationsender/api/api.go b/federationapi/api/api.go index 82cdf9d8..5d4eb884 100644 --- a/federationsender/api/api.go +++ b/federationapi/api/api.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" ) @@ -36,9 +36,12 @@ func (e *FederationClientError) Error() string { return fmt.Sprintf("%s - (retry_after=%s, blacklisted=%v)", e.Err, e.RetryAfter.String(), e.Blacklisted) } -// FederationSenderInternalAPI is used to query information from the federation sender. -type FederationSenderInternalAPI interface { +// FederationInternalAPI is used to query information from the federation sender. +type FederationInternalAPI interface { FederationClient + gomatrixserverlib.KeyDatabase + + KeyRing() *gomatrixserverlib.KeyRing QueryServerKeys(ctx context.Context, request *QueryServerKeysRequest, response *QueryServerKeysResponse) error @@ -114,6 +117,14 @@ type QueryServerKeysResponse struct { ServerKeys []gomatrixserverlib.ServerKeys } +type QueryPublicKeysRequest struct { + Requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp `json:"requests"` +} + +type QueryPublicKeysResponse struct { + Results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"results"` +} + type PerformDirectoryLookupRequest struct { RoomAlias string `json:"room_alias"` ServerName gomatrixserverlib.ServerName `json:"server_name"` @@ -188,3 +199,10 @@ type PerformBroadcastEDURequest struct { type PerformBroadcastEDUResponse struct { } + +type InputPublicKeysRequest struct { + Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"` +} + +type InputPublicKeysResponse struct { +} diff --git a/federationsender/consumers/eduserver.go b/federationapi/consumers/eduserver.go index 9a1ec1e2..56ec9eaf 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationapi/consumers/eduserver.go @@ -21,8 +21,8 @@ import ( "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/federationsender/queue" - "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/federationapi/queue" + "github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" @@ -46,7 +46,7 @@ type OutputEDUConsumer struct { // NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers. func NewOutputEDUConsumer( process *process.ProcessContext, - cfg *config.FederationSender, + cfg *config.FederationAPI, kafkaConsumer sarama.Consumer, queues *queue.OutgoingQueues, store storage.Database, diff --git a/federationsender/consumers/keychange.go b/federationapi/consumers/keychange.go index d5dc595c..a8ae0894 100644 --- a/federationsender/consumers/keychange.go +++ b/federationapi/consumers/keychange.go @@ -21,8 +21,8 @@ import ( "github.com/Shopify/sarama" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/dendrite/federationsender/queue" - "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/federationapi/queue" + "github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -53,7 +53,7 @@ func NewKeyChangeConsumer( c := &KeyChangeConsumer{ consumer: &internal.ContinualConsumer{ Process: process, - ComponentName: "federationsender/keychange", + ComponentName: "federationapi/keychange", Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)), Consumer: kafkaConsumer, PartitionStore: store, diff --git a/federationsender/consumers/roomserver.go b/federationapi/consumers/roomserver.go index f9c4a5c2..20b1bacb 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationapi/consumers/roomserver.go @@ -20,9 +20,9 @@ import ( "fmt" "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/federationsender/queue" - "github.com/matrix-org/dendrite/federationsender/storage" - "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/federationapi/queue" + "github.com/matrix-org/dendrite/federationapi/storage" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" @@ -33,7 +33,7 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { - cfg *config.FederationSender + cfg *config.FederationAPI rsAPI api.RoomserverInternalAPI rsConsumer *internal.ContinualConsumer db storage.Database @@ -43,7 +43,7 @@ type OutputRoomEventConsumer struct { // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers. func NewOutputRoomEventConsumer( process *process.ProcessContext, - cfg *config.FederationSender, + cfg *config.FederationAPI, kafkaConsumer sarama.Consumer, queues *queue.OutgoingQueues, store storage.Database, @@ -51,7 +51,7 @@ func NewOutputRoomEventConsumer( ) *OutputRoomEventConsumer { consumer := internal.ContinualConsumer{ Process: process, - ComponentName: "federationsender/roomserver", + ComponentName: "federationapi/roomserver", Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), Consumer: kafkaConsumer, PartitionStore: store, @@ -133,7 +133,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } // processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any) -// causing the federationsender to start sending messages to the peeking server +// causing the federationapi to start sending messages to the peeking server func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error { // FIXME: there's a race here - we should start /sending new peeked events diff --git a/federationsender/consumers/roomserver_test.go b/federationapi/consumers/roomserver_test.go index bb659b9c..bb659b9c 100644 --- a/federationsender/consumers/roomserver_test.go +++ b/federationapi/consumers/roomserver_test.go diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index c40d77a6..371b364c 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -17,17 +17,33 @@ package federationapi import ( "github.com/gorilla/mux" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/federationapi/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi/consumers" + "github.com/matrix-org/dendrite/federationapi/internal" + "github.com/matrix-org/dendrite/federationapi/inthttp" + "github.com/matrix-org/dendrite/federationapi/queue" + "github.com/matrix-org/dendrite/federationapi/statistics" + "github.com/matrix-org/dendrite/federationapi/storage" + "github.com/matrix-org/dendrite/internal/caching" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/kafka" userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/sirupsen/logrus" "github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/gomatrixserverlib" ) +// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions +// on the given input API. +func AddInternalRoutes(router *mux.Router, intAPI api.FederationInternalAPI) { + inthttp.AddRoutes(intAPI, router) +} + // AddPublicRoutes sets up and registers HTTP handlers on the base API muxes for the FederationAPI component. func AddPublicRoutes( fedRouter, keyRouter, wellKnownRouter *mux.Router, @@ -36,7 +52,7 @@ func AddPublicRoutes( federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.JSONVerifier, rsAPI roomserverAPI.RoomserverInternalAPI, - federationSenderAPI federationSenderAPI.FederationSenderInternalAPI, + federationAPI federationAPI.FederationInternalAPI, eduAPI eduserverAPI.EDUServerInputAPI, keyAPI keyserverAPI.KeyInternalAPI, mscCfg *config.MSCs, @@ -44,8 +60,70 @@ func AddPublicRoutes( ) { routing.Setup( fedRouter, keyRouter, wellKnownRouter, cfg, rsAPI, - eduAPI, federationSenderAPI, keyRing, + eduAPI, federationAPI, keyRing, federation, userAPI, keyAPI, mscCfg, servers, ) } + +// NewInternalAPI returns a concerete implementation of the internal API. Callers +// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. +func NewInternalAPI( + base *base.BaseDendrite, + federation *gomatrixserverlib.FederationClient, + rsAPI roomserverAPI.RoomserverInternalAPI, + caches *caching.Caches, + resetBlacklist bool, +) api.FederationInternalAPI { + cfg := &base.Cfg.FederationAPI + + federationDB, err := storage.NewDatabase(&cfg.Database, base.Caches) + if err != nil { + logrus.WithError(err).Panic("failed to connect to federation sender db") + } + + if resetBlacklist { + _ = federationDB.RemoveAllServersFromBlacklist() + } + + stats := &statistics.Statistics{ + DB: federationDB, + FailuresUntilBlacklist: cfg.FederationMaxRetries, + } + + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + + queues := queue.NewOutgoingQueues( + federationDB, base.ProcessContext, + cfg.Matrix.DisableFederation, + cfg.Matrix.ServerName, federation, rsAPI, stats, + &queue.SigningInfo{ + KeyID: cfg.Matrix.KeyID, + PrivateKey: cfg.Matrix.PrivateKey, + ServerName: cfg.Matrix.ServerName, + }, + ) + + rsConsumer := consumers.NewOutputRoomEventConsumer( + base.ProcessContext, cfg, consumer, queues, + federationDB, rsAPI, + ) + if err = rsConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start room server consumer") + } + + tsConsumer := consumers.NewOutputEDUConsumer( + base.ProcessContext, cfg, consumer, queues, federationDB, + ) + if err := tsConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start typing server consumer") + } + keyConsumer := consumers.NewKeyChangeConsumer( + base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationDB, rsAPI, + ) + if err := keyConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start key server consumer") + } + + return internal.NewFederationInternalAPI(federationDB, cfg, rsAPI, federation, stats, caches, queues) +} diff --git a/signingkeyserver/serverkeyapi_test.go b/federationapi/federationapi_keys_test.go index bd6119aa..a0051e19 100644 --- a/signingkeyserver/serverkeyapi_test.go +++ b/federationapi/federationapi_keys_test.go @@ -1,4 +1,4 @@ -package signingkeyserver +package federationapi import ( "bytes" @@ -13,21 +13,21 @@ import ( "testing" "time" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/dendrite/internal/caching" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/signingkeyserver/api" "github.com/matrix-org/gomatrixserverlib" ) type server struct { name gomatrixserverlib.ServerName // server name validity time.Duration // key validity duration from now - config *config.SigningKeyServer // skeleton config, from TestMain - fedconfig *config.FederationAPI // + config *config.FederationAPI // skeleton config, from TestMain fedclient *gomatrixserverlib.FederationClient // uses MockRoundTripper cache *caching.Caches // server-specific cache - api api.SigningKeyServerAPI // server-specific server key API + api api.FederationInternalAPI // server-specific server key API } func (s *server) renew() { @@ -74,11 +74,11 @@ func TestMain(m *testing.M) { cfg.Defaults() cfg.Global.ServerName = gomatrixserverlib.ServerName(s.name) cfg.Global.PrivateKey = testPriv + cfg.Global.Kafka.UseNaffka = true cfg.Global.KeyID = serverKeyID cfg.Global.KeyValidityPeriod = s.validity - cfg.SigningKeyServer.Database.ConnectionString = config.DataSource("file::memory:") - s.config = &cfg.SigningKeyServer - s.fedconfig = &cfg.FederationAPI + cfg.FederationAPI.Database.ConnectionString = config.DataSource("file::memory:") + s.config = &cfg.FederationAPI // Create a transport which redirects federation requests to // the mock round tripper. Since we're not *really* listening for @@ -93,7 +93,8 @@ func TestMain(m *testing.M) { ) // Finally, build the server key APIs. - s.api = NewInternalAPI(s.config, s.fedclient, s.cache) + sbase := base.NewBaseDendrite(cfg, "Monolith", base.NoCacheMetrics) + s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, true) } // Now that we have built our server key APIs, start the @@ -119,7 +120,7 @@ func (m *MockRoundTripper) RoundTrip(req *http.Request) (res *http.Response, err } // Get the keys and JSON-ify them. - keys := routing.LocalKeys(s.fedconfig) + keys := routing.LocalKeys(s.config) body, err := json.MarshalIndent(keys.JSON, "", " ") if err != nil { return nil, err diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go index cb4d8103..0c5ee740 100644 --- a/federationapi/federationapi_test.go +++ b/federationapi/federationapi_test.go @@ -8,7 +8,7 @@ import ( "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/internal/test" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" @@ -25,10 +25,10 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) { cfg.Global.PrivateKey = privKey cfg.Global.Kafka.UseNaffka = true cfg.Global.Kafka.Database.ConnectionString = config.DataSource("file::memory:") - cfg.FederationSender.Database.ConnectionString = config.DataSource("file::memory:") - base := setup.NewBaseDendrite(cfg, "Monolith", false) + cfg.FederationAPI.Database.ConnectionString = config.DataSource("file::memory:") + base := base.NewBaseDendrite(cfg, "Monolith", base.NoCacheMetrics) keyRing := &test.NopJSONVerifier{} - fsAPI := base.FederationSenderHTTPClient() + fsAPI := base.FederationAPIHTTPClient() // TODO: This is pretty fragile, as if anything calls anything on these nils this test will break. // Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing. federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs, nil) diff --git a/federationsender/internal/api.go b/federationapi/internal/api.go index 11032eda..73d27315 100644 --- a/federationsender/internal/api.go +++ b/federationapi/internal/api.go @@ -2,23 +2,28 @@ package internal import ( "context" + "crypto/ed25519" + "encoding/base64" "sync" "time" - "github.com/matrix-org/dendrite/federationsender/api" - "github.com/matrix-org/dendrite/federationsender/queue" - "github.com/matrix-org/dendrite/federationsender/statistics" - "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/federationapi/queue" + "github.com/matrix-org/dendrite/federationapi/statistics" + "github.com/matrix-org/dendrite/federationapi/storage" + "github.com/matrix-org/dendrite/federationapi/storage/cache" + "github.com/matrix-org/dendrite/internal/caching" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" ) -// FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI -type FederationSenderInternalAPI struct { +// FederationInternalAPI is an implementation of api.FederationInternalAPI +type FederationInternalAPI struct { db storage.Database - cfg *config.FederationSender + cfg *config.FederationAPI statistics *statistics.Statistics rsAPI roomserverAPI.RoomserverInternalAPI federation *gomatrixserverlib.FederationClient @@ -27,26 +32,79 @@ type FederationSenderInternalAPI struct { joins sync.Map // joins currently in progress } -func NewFederationSenderInternalAPI( - db storage.Database, cfg *config.FederationSender, +func NewFederationInternalAPI( + db storage.Database, cfg *config.FederationAPI, rsAPI roomserverAPI.RoomserverInternalAPI, federation *gomatrixserverlib.FederationClient, - keyRing *gomatrixserverlib.KeyRing, statistics *statistics.Statistics, + caches *caching.Caches, queues *queue.OutgoingQueues, -) *FederationSenderInternalAPI { - return &FederationSenderInternalAPI{ +) *FederationInternalAPI { + serverKeyDB, err := cache.NewKeyDatabase(db, caches) + if err != nil { + logrus.WithError(err).Panicf("failed to set up caching wrapper for server key database") + } + + keyRing := &gomatrixserverlib.KeyRing{ + KeyFetchers: []gomatrixserverlib.KeyFetcher{}, + KeyDatabase: serverKeyDB, + } + + addDirectFetcher := func() { + keyRing.KeyFetchers = append( + keyRing.KeyFetchers, + &gomatrixserverlib.DirectKeyFetcher{ + Client: federation, + }, + ) + } + + if cfg.PreferDirectFetch { + addDirectFetcher() + } else { + defer addDirectFetcher() + } + + var b64e = base64.StdEncoding.WithPadding(base64.NoPadding) + for _, ps := range cfg.KeyPerspectives { + perspective := &gomatrixserverlib.PerspectiveKeyFetcher{ + PerspectiveServerName: ps.ServerName, + PerspectiveServerKeys: map[gomatrixserverlib.KeyID]ed25519.PublicKey{}, + Client: federation, + } + + for _, key := range ps.Keys { + rawkey, err := b64e.DecodeString(key.PublicKey) + if err != nil { + logrus.WithError(err).WithFields(logrus.Fields{ + "server_name": ps.ServerName, + "public_key": key.PublicKey, + }).Warn("Couldn't parse perspective key") + continue + } + perspective.PerspectiveServerKeys[key.KeyID] = rawkey + } + + keyRing.KeyFetchers = append(keyRing.KeyFetchers, perspective) + + logrus.WithFields(logrus.Fields{ + "server_name": ps.ServerName, + "num_public_keys": len(ps.Keys), + }).Info("Enabled perspective key fetcher") + } + + return &FederationInternalAPI{ db: db, cfg: cfg, rsAPI: rsAPI, - federation: federation, keyRing: keyRing, + federation: federation, statistics: statistics, queues: queues, } } -func (a *FederationSenderInternalAPI) isBlacklistedOrBackingOff(s gomatrixserverlib.ServerName) (*statistics.ServerStatistics, error) { +func (a *FederationInternalAPI) isBlacklistedOrBackingOff(s gomatrixserverlib.ServerName) (*statistics.ServerStatistics, error) { stats := a.statistics.ForServer(s) until, blacklisted := stats.BackoffInfo() if blacklisted { @@ -81,7 +139,7 @@ func failBlacklistableError(err error, stats *statistics.ServerStatistics) (unti return } -func (a *FederationSenderInternalAPI) doRequest( +func (a *FederationInternalAPI) doRequest( s gomatrixserverlib.ServerName, request func() (interface{}, error), ) (interface{}, error) { stats, err := a.isBlacklistedOrBackingOff(s) @@ -106,7 +164,7 @@ func (a *FederationSenderInternalAPI) doRequest( return res, nil } -func (a *FederationSenderInternalAPI) GetUserDevices( +func (a *FederationInternalAPI) GetUserDevices( ctx context.Context, s gomatrixserverlib.ServerName, userID string, ) (gomatrixserverlib.RespUserDevices, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) @@ -120,7 +178,7 @@ func (a *FederationSenderInternalAPI) GetUserDevices( return ires.(gomatrixserverlib.RespUserDevices), nil } -func (a *FederationSenderInternalAPI) ClaimKeys( +func (a *FederationInternalAPI) ClaimKeys( ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string, ) (gomatrixserverlib.RespClaimKeys, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) @@ -134,7 +192,7 @@ func (a *FederationSenderInternalAPI) ClaimKeys( return ires.(gomatrixserverlib.RespClaimKeys), nil } -func (a *FederationSenderInternalAPI) QueryKeys( +func (a *FederationInternalAPI) QueryKeys( ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string, ) (gomatrixserverlib.RespQueryKeys, error) { ires, err := a.doRequest(s, func() (interface{}, error) { @@ -146,7 +204,7 @@ func (a *FederationSenderInternalAPI) QueryKeys( return ires.(gomatrixserverlib.RespQueryKeys), nil } -func (a *FederationSenderInternalAPI) Backfill( +func (a *FederationInternalAPI) Backfill( ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string, ) (res gomatrixserverlib.Transaction, err error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) @@ -160,7 +218,7 @@ func (a *FederationSenderInternalAPI) Backfill( return ires.(gomatrixserverlib.Transaction), nil } -func (a *FederationSenderInternalAPI) LookupState( +func (a *FederationInternalAPI) LookupState( ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion, ) (res gomatrixserverlib.RespState, err error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) @@ -174,7 +232,7 @@ func (a *FederationSenderInternalAPI) LookupState( return ires.(gomatrixserverlib.RespState), nil } -func (a *FederationSenderInternalAPI) LookupStateIDs( +func (a *FederationInternalAPI) LookupStateIDs( ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, ) (res gomatrixserverlib.RespStateIDs, err error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) @@ -188,7 +246,7 @@ func (a *FederationSenderInternalAPI) LookupStateIDs( return ires.(gomatrixserverlib.RespStateIDs), nil } -func (a *FederationSenderInternalAPI) GetEvent( +func (a *FederationInternalAPI) GetEvent( ctx context.Context, s gomatrixserverlib.ServerName, eventID string, ) (res gomatrixserverlib.Transaction, err error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) @@ -202,7 +260,7 @@ func (a *FederationSenderInternalAPI) GetEvent( return ires.(gomatrixserverlib.Transaction), nil } -func (a *FederationSenderInternalAPI) LookupServerKeys( +func (a *FederationInternalAPI) LookupServerKeys( ctx context.Context, s gomatrixserverlib.ServerName, keyRequests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, ) ([]gomatrixserverlib.ServerKeys, error) { ctx, cancel := context.WithTimeout(ctx, time.Minute) @@ -216,7 +274,7 @@ func (a *FederationSenderInternalAPI) LookupServerKeys( return ires.([]gomatrixserverlib.ServerKeys), nil } -func (a *FederationSenderInternalAPI) MSC2836EventRelationships( +func (a *FederationInternalAPI) MSC2836EventRelationships( ctx context.Context, s gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest, roomVersion gomatrixserverlib.RoomVersion, ) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error) { @@ -231,7 +289,7 @@ func (a *FederationSenderInternalAPI) MSC2836EventRelationships( return ires.(gomatrixserverlib.MSC2836EventRelationshipsResponse), nil } -func (a *FederationSenderInternalAPI) MSC2946Spaces( +func (a *FederationInternalAPI) MSC2946Spaces( ctx context.Context, s gomatrixserverlib.ServerName, roomID string, r gomatrixserverlib.MSC2946SpacesRequest, ) (res gomatrixserverlib.MSC2946SpacesResponse, err error) { ctx, cancel := context.WithTimeout(ctx, time.Minute) diff --git a/signingkeyserver/internal/api.go b/federationapi/internal/keys.go index f9a04a74..2b7a8219 100644 --- a/signingkeyserver/internal/api.go +++ b/federationapi/internal/keys.go @@ -6,36 +6,18 @@ import ( "fmt" "time" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/signingkeyserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" ) -type ServerKeyAPI struct { - api.SigningKeyServerAPI - - ServerName gomatrixserverlib.ServerName - ServerPublicKey ed25519.PublicKey - ServerKeyID gomatrixserverlib.KeyID - ServerKeyValidity time.Duration - OldServerKeys []config.OldVerifyKeys - - OurKeyRing gomatrixserverlib.KeyRing - FedClient gomatrixserverlib.KeyClient -} - -func (s *ServerKeyAPI) KeyRing() *gomatrixserverlib.KeyRing { +func (s *FederationInternalAPI) KeyRing() *gomatrixserverlib.KeyRing { // Return a keyring that forces requests to be proxied through the // below functions. That way we can enforce things like validity // and keeping the cache up-to-date. - return &gomatrixserverlib.KeyRing{ - KeyDatabase: s, - KeyFetchers: []gomatrixserverlib.KeyFetcher{}, - } + return s.keyRing } -func (s *ServerKeyAPI) StoreKeys( +func (s *FederationInternalAPI) StoreKeys( _ context.Context, results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, ) error { @@ -44,10 +26,10 @@ func (s *ServerKeyAPI) StoreKeys( ctx := context.Background() // Store any keys that we were given in our database. - return s.OurKeyRing.KeyDatabase.StoreKeys(ctx, results) + return s.keyRing.KeyDatabase.StoreKeys(ctx, results) } -func (s *ServerKeyAPI) FetchKeys( +func (s *FederationInternalAPI) FetchKeys( _ context.Context, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, ) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { @@ -75,7 +57,7 @@ func (s *ServerKeyAPI) FetchKeys( // For any key requests that we still have outstanding, next try to // fetch them directly. We'll go through each of the key fetchers to // ask for the remaining keys - for _, fetcher := range s.OurKeyRing.KeyFetchers { + for _, fetcher := range s.keyRing.KeyFetchers { // If there are no more keys to look up then stop. if len(requests) == 0 { break @@ -105,22 +87,22 @@ func (s *ServerKeyAPI) FetchKeys( return results, nil } -func (s *ServerKeyAPI) FetcherName() string { - return fmt.Sprintf("ServerKeyAPI (wrapping %q)", s.OurKeyRing.KeyDatabase.FetcherName()) +func (s *FederationInternalAPI) FetcherName() string { + return fmt.Sprintf("FederationInternalAPI (wrapping %q)", s.keyRing.KeyDatabase.FetcherName()) } // handleLocalKeys handles cases where the key request contains // a request for our own server keys, either current or old. -func (s *ServerKeyAPI) handleLocalKeys( +func (s *FederationInternalAPI) handleLocalKeys( _ context.Context, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, ) { for req := range requests { - if req.ServerName != s.ServerName { + if req.ServerName != s.cfg.Matrix.ServerName { continue } - if req.KeyID == s.ServerKeyID { + if req.KeyID == s.cfg.Matrix.KeyID { // We found a key request that is supposed to be for our own // keys. Remove it from the request list so we don't hit the // database or the fetchers for it. @@ -129,15 +111,15 @@ func (s *ServerKeyAPI) handleLocalKeys( // Insert our own key into the response. results[req] = gomatrixserverlib.PublicKeyLookupResult{ VerifyKey: gomatrixserverlib.VerifyKey{ - Key: gomatrixserverlib.Base64Bytes(s.ServerPublicKey), + Key: gomatrixserverlib.Base64Bytes(s.cfg.Matrix.PrivateKey.Public().(ed25519.PublicKey)), }, ExpiredTS: gomatrixserverlib.PublicKeyNotExpired, - ValidUntilTS: gomatrixserverlib.AsTimestamp(time.Now().Add(s.ServerKeyValidity)), + ValidUntilTS: gomatrixserverlib.AsTimestamp(time.Now().Add(s.cfg.Matrix.KeyValidityPeriod)), } } else { // The key request doesn't match our current key. Let's see // if it matches any of our old verify keys. - for _, oldVerifyKey := range s.OldServerKeys { + for _, oldVerifyKey := range s.cfg.Matrix.OldVerifyKeys { if req.KeyID == oldVerifyKey.KeyID { // We found a key request that is supposed to be an expired // key. @@ -162,14 +144,14 @@ func (s *ServerKeyAPI) handleLocalKeys( // handleDatabaseKeys handles cases where the key requests can be // satisfied from our local database/cache. -func (s *ServerKeyAPI) handleDatabaseKeys( +func (s *FederationInternalAPI) handleDatabaseKeys( ctx context.Context, now gomatrixserverlib.Timestamp, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, ) error { // Ask the database/cache for the keys. - dbResults, err := s.OurKeyRing.KeyDatabase.FetchKeys(ctx, requests) + dbResults, err := s.keyRing.KeyDatabase.FetchKeys(ctx, requests) if err != nil { return err } @@ -196,7 +178,7 @@ func (s *ServerKeyAPI) handleDatabaseKeys( // handleFetcherKeys handles cases where a fetcher can satisfy // the remaining requests. -func (s *ServerKeyAPI) handleFetcherKeys( +func (s *FederationInternalAPI) handleFetcherKeys( ctx context.Context, _ gomatrixserverlib.Timestamp, fetcher gomatrixserverlib.KeyFetcher, @@ -248,10 +230,10 @@ func (s *ServerKeyAPI) handleFetcherKeys( } // Store the keys from our store map. - if err = s.OurKeyRing.KeyDatabase.StoreKeys(context.Background(), storeResults); err != nil { + if err = s.keyRing.KeyDatabase.StoreKeys(context.Background(), storeResults); err != nil { logrus.WithError(err).WithFields(logrus.Fields{ "fetcher_name": fetcher.FetcherName(), - "database_name": s.OurKeyRing.KeyDatabase.FetcherName(), + "database_name": s.keyRing.KeyDatabase.FetcherName(), }).Errorf("Failed to store keys in the database") return fmt.Errorf("server key API failed to store retrieved keys: %w", err) } diff --git a/federationsender/internal/perform.go b/federationapi/internal/perform.go index 53fa974b..82d04c21 100644 --- a/federationsender/internal/perform.go +++ b/federationapi/internal/perform.go @@ -7,7 +7,7 @@ import ( "fmt" "time" - "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/gomatrix" @@ -16,8 +16,8 @@ import ( "github.com/sirupsen/logrus" ) -// PerformLeaveRequest implements api.FederationSenderInternalAPI -func (r *FederationSenderInternalAPI) PerformDirectoryLookup( +// PerformLeaveRequest implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformDirectoryLookup( ctx context.Context, request *api.PerformDirectoryLookupRequest, response *api.PerformDirectoryLookupResponse, @@ -42,8 +42,8 @@ type federatedJoin struct { RoomID string } -// PerformJoin implements api.FederationSenderInternalAPI -func (r *FederationSenderInternalAPI) PerformJoin( +// PerformJoin implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformJoin( ctx context.Context, request *api.PerformJoinRequest, response *api.PerformJoinResponse, @@ -132,7 +132,7 @@ func (r *FederationSenderInternalAPI) PerformJoin( ) } -func (r *FederationSenderInternalAPI) performJoinUsingServer( +func (r *FederationInternalAPI) performJoinUsingServer( ctx context.Context, roomID, userID string, content map[string]interface{}, @@ -263,8 +263,8 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer( return nil } -// PerformOutboundPeekRequest implements api.FederationSenderInternalAPI -func (r *FederationSenderInternalAPI) PerformOutboundPeek( +// PerformOutboundPeekRequest implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformOutboundPeek( ctx context.Context, request *api.PerformOutboundPeekRequest, response *api.PerformOutboundPeekResponse, @@ -344,7 +344,7 @@ func (r *FederationSenderInternalAPI) PerformOutboundPeek( return lastErr } -func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer( +func (r *FederationInternalAPI) performOutboundPeekUsingServer( ctx context.Context, roomID string, serverName gomatrixserverlib.ServerName, @@ -438,8 +438,8 @@ func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer( return nil } -// PerformLeaveRequest implements api.FederationSenderInternalAPI -func (r *FederationSenderInternalAPI) PerformLeave( +// PerformLeaveRequest implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformLeave( ctx context.Context, request *api.PerformLeaveRequest, response *api.PerformLeaveResponse, @@ -528,8 +528,8 @@ func (r *FederationSenderInternalAPI) PerformLeave( ) } -// PerformLeaveRequest implements api.FederationSenderInternalAPI -func (r *FederationSenderInternalAPI) PerformInvite( +// PerformLeaveRequest implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformInvite( ctx context.Context, request *api.PerformInviteRequest, response *api.PerformInviteResponse, @@ -565,8 +565,8 @@ func (r *FederationSenderInternalAPI) PerformInvite( return nil } -// PerformServersAlive implements api.FederationSenderInternalAPI -func (r *FederationSenderInternalAPI) PerformServersAlive( +// PerformServersAlive implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformServersAlive( ctx context.Context, request *api.PerformServersAliveRequest, response *api.PerformServersAliveResponse, @@ -579,8 +579,8 @@ func (r *FederationSenderInternalAPI) PerformServersAlive( return nil } -// PerformServersAlive implements api.FederationSenderInternalAPI -func (r *FederationSenderInternalAPI) PerformBroadcastEDU( +// PerformServersAlive implements api.FederationInternalAPI +func (r *FederationInternalAPI) PerformBroadcastEDU( ctx context.Context, request *api.PerformBroadcastEDURequest, response *api.PerformBroadcastEDUResponse, diff --git a/federationsender/internal/query.go b/federationapi/internal/query.go index af531f7d..bac81333 100644 --- a/federationsender/internal/query.go +++ b/federationapi/internal/query.go @@ -5,13 +5,13 @@ import ( "fmt" "time" - "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) -// QueryJoinedHostServerNamesInRoom implements api.FederationSenderInternalAPI -func (f *FederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom( +// QueryJoinedHostServerNamesInRoom implements api.FederationInternalAPI +func (f *FederationInternalAPI) QueryJoinedHostServerNamesInRoom( ctx context.Context, request *api.QueryJoinedHostServerNamesInRoomRequest, response *api.QueryJoinedHostServerNamesInRoomResponse, @@ -25,7 +25,7 @@ func (f *FederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom( return } -func (a *FederationSenderInternalAPI) fetchServerKeysDirectly(ctx context.Context, serverName gomatrixserverlib.ServerName) (*gomatrixserverlib.ServerKeys, error) { +func (a *FederationInternalAPI) fetchServerKeysDirectly(ctx context.Context, serverName gomatrixserverlib.ServerName) (*gomatrixserverlib.ServerKeys, error) { ctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() ires, err := a.doRequest(serverName, func() (interface{}, error) { @@ -38,7 +38,7 @@ func (a *FederationSenderInternalAPI) fetchServerKeysDirectly(ctx context.Contex return &sks, nil } -func (a *FederationSenderInternalAPI) fetchServerKeysFromCache( +func (a *FederationInternalAPI) fetchServerKeysFromCache( ctx context.Context, req *api.QueryServerKeysRequest, ) ([]gomatrixserverlib.ServerKeys, error) { var results []gomatrixserverlib.ServerKeys @@ -64,7 +64,7 @@ func (a *FederationSenderInternalAPI) fetchServerKeysFromCache( return results, nil } -func (a *FederationSenderInternalAPI) QueryServerKeys( +func (a *FederationInternalAPI) QueryServerKeys( ctx context.Context, req *api.QueryServerKeysRequest, res *api.QueryServerKeysResponse, ) error { // attempt to satisfy the entire request from the cache first diff --git a/federationsender/inthttp/client.go b/federationapi/inthttp/client.go index f08e610a..af6b801b 100644 --- a/federationsender/inthttp/client.go +++ b/federationapi/inthttp/client.go @@ -5,7 +5,8 @@ import ( "errors" "net/http" - "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" @@ -14,45 +15,49 @@ import ( // HTTP paths for the internal HTTP API const ( - FederationSenderQueryJoinedHostServerNamesInRoomPath = "/federationsender/queryJoinedHostServerNamesInRoom" - FederationSenderQueryServerKeysPath = "/federationsender/queryServerKeys" - - FederationSenderPerformDirectoryLookupRequestPath = "/federationsender/performDirectoryLookup" - FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest" - FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest" - FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest" - FederationSenderPerformOutboundPeekRequestPath = "/federationsender/performOutboundPeekRequest" - FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive" - FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU" - - FederationSenderGetUserDevicesPath = "/federationsender/client/getUserDevices" - FederationSenderClaimKeysPath = "/federationsender/client/claimKeys" - FederationSenderQueryKeysPath = "/federationsender/client/queryKeys" - FederationSenderBackfillPath = "/federationsender/client/backfill" - FederationSenderLookupStatePath = "/federationsender/client/lookupState" - FederationSenderLookupStateIDsPath = "/federationsender/client/lookupStateIDs" - FederationSenderGetEventPath = "/federationsender/client/getEvent" - FederationSenderLookupServerKeysPath = "/federationsender/client/lookupServerKeys" - FederationSenderEventRelationshipsPath = "/federationsender/client/msc2836eventRelationships" - FederationSenderSpacesSummaryPath = "/federationsender/client/msc2946spacesSummary" + FederationAPIQueryJoinedHostServerNamesInRoomPath = "/federationapi/queryJoinedHostServerNamesInRoom" + FederationAPIQueryServerKeysPath = "/federationapi/queryServerKeys" + + FederationAPIPerformDirectoryLookupRequestPath = "/federationapi/performDirectoryLookup" + FederationAPIPerformJoinRequestPath = "/federationapi/performJoinRequest" + FederationAPIPerformLeaveRequestPath = "/federationapi/performLeaveRequest" + FederationAPIPerformInviteRequestPath = "/federationapi/performInviteRequest" + FederationAPIPerformOutboundPeekRequestPath = "/federationapi/performOutboundPeekRequest" + FederationAPIPerformServersAlivePath = "/federationapi/performServersAlive" + FederationAPIPerformBroadcastEDUPath = "/federationapi/performBroadcastEDU" + + FederationAPIGetUserDevicesPath = "/federationapi/client/getUserDevices" + FederationAPIClaimKeysPath = "/federationapi/client/claimKeys" + FederationAPIQueryKeysPath = "/federationapi/client/queryKeys" + FederationAPIBackfillPath = "/federationapi/client/backfill" + FederationAPILookupStatePath = "/federationapi/client/lookupState" + FederationAPILookupStateIDsPath = "/federationapi/client/lookupStateIDs" + FederationAPIGetEventPath = "/federationapi/client/getEvent" + FederationAPILookupServerKeysPath = "/federationapi/client/lookupServerKeys" + FederationAPIEventRelationshipsPath = "/federationapi/client/msc2836eventRelationships" + FederationAPISpacesSummaryPath = "/federationapi/client/msc2946spacesSummary" + + FederationAPIInputPublicKeyPath = "/federationapi/inputPublicKey" + FederationAPIQueryPublicKeyPath = "/federationapi/queryPublicKey" ) -// NewFederationSenderClient creates a FederationSenderInternalAPI implemented by talking to a HTTP POST API. +// NewFederationAPIClient creates a FederationInternalAPI implemented by talking to a HTTP POST API. // If httpClient is nil an error is returned -func NewFederationSenderClient(federationSenderURL string, httpClient *http.Client) (api.FederationSenderInternalAPI, error) { +func NewFederationAPIClient(federationSenderURL string, httpClient *http.Client, cache caching.ServerKeyCache) (api.FederationInternalAPI, error) { if httpClient == nil { - return nil, errors.New("NewFederationSenderInternalAPIHTTP: httpClient is <nil>") + return nil, errors.New("NewFederationInternalAPIHTTP: httpClient is <nil>") } - return &httpFederationSenderInternalAPI{federationSenderURL, httpClient}, nil + return &httpFederationInternalAPI{federationSenderURL, httpClient, cache}, nil } -type httpFederationSenderInternalAPI struct { - federationSenderURL string - httpClient *http.Client +type httpFederationInternalAPI struct { + federationAPIURL string + httpClient *http.Client + cache caching.ServerKeyCache } // Handle an instruction to make_leave & send_leave with a remote server. -func (h *httpFederationSenderInternalAPI) PerformLeave( +func (h *httpFederationInternalAPI) PerformLeave( ctx context.Context, request *api.PerformLeaveRequest, response *api.PerformLeaveResponse, @@ -60,12 +65,12 @@ func (h *httpFederationSenderInternalAPI) PerformLeave( span, ctx := opentracing.StartSpanFromContext(ctx, "PerformLeaveRequest") defer span.Finish() - apiURL := h.federationSenderURL + FederationSenderPerformLeaveRequestPath + apiURL := h.federationAPIURL + FederationAPIPerformLeaveRequestPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } // Handle sending an invite to a remote server. -func (h *httpFederationSenderInternalAPI) PerformInvite( +func (h *httpFederationInternalAPI) PerformInvite( ctx context.Context, request *api.PerformInviteRequest, response *api.PerformInviteResponse, @@ -73,12 +78,12 @@ func (h *httpFederationSenderInternalAPI) PerformInvite( span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInviteRequest") defer span.Finish() - apiURL := h.federationSenderURL + FederationSenderPerformInviteRequestPath + apiURL := h.federationAPIURL + FederationAPIPerformInviteRequestPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } // Handle starting a peek on a remote server. -func (h *httpFederationSenderInternalAPI) PerformOutboundPeek( +func (h *httpFederationInternalAPI) PerformOutboundPeek( ctx context.Context, request *api.PerformOutboundPeekRequest, response *api.PerformOutboundPeekResponse, @@ -86,11 +91,11 @@ func (h *httpFederationSenderInternalAPI) PerformOutboundPeek( span, ctx := opentracing.StartSpanFromContext(ctx, "PerformOutboundPeekRequest") defer span.Finish() - apiURL := h.federationSenderURL + FederationSenderPerformOutboundPeekRequestPath + apiURL := h.federationAPIURL + FederationAPIPerformOutboundPeekRequestPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } -func (h *httpFederationSenderInternalAPI) PerformServersAlive( +func (h *httpFederationInternalAPI) PerformServersAlive( ctx context.Context, request *api.PerformServersAliveRequest, response *api.PerformServersAliveResponse, @@ -98,12 +103,12 @@ func (h *httpFederationSenderInternalAPI) PerformServersAlive( span, ctx := opentracing.StartSpanFromContext(ctx, "PerformServersAlive") defer span.Finish() - apiURL := h.federationSenderURL + FederationSenderPerformServersAlivePath + apiURL := h.federationAPIURL + FederationAPIPerformServersAlivePath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } -// QueryJoinedHostServerNamesInRoom implements FederationSenderInternalAPI -func (h *httpFederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom( +// QueryJoinedHostServerNamesInRoom implements FederationInternalAPI +func (h *httpFederationInternalAPI) QueryJoinedHostServerNamesInRoom( ctx context.Context, request *api.QueryJoinedHostServerNamesInRoomRequest, response *api.QueryJoinedHostServerNamesInRoomResponse, @@ -111,12 +116,12 @@ func (h *httpFederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom( span, ctx := opentracing.StartSpanFromContext(ctx, "QueryJoinedHostServerNamesInRoom") defer span.Finish() - apiURL := h.federationSenderURL + FederationSenderQueryJoinedHostServerNamesInRoomPath + apiURL := h.federationAPIURL + FederationAPIQueryJoinedHostServerNamesInRoomPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } // Handle an instruction to make_join & send_join with a remote server. -func (h *httpFederationSenderInternalAPI) PerformJoin( +func (h *httpFederationInternalAPI) PerformJoin( ctx context.Context, request *api.PerformJoinRequest, response *api.PerformJoinResponse, @@ -124,7 +129,7 @@ func (h *httpFederationSenderInternalAPI) PerformJoin( span, ctx := opentracing.StartSpanFromContext(ctx, "PerformJoinRequest") defer span.Finish() - apiURL := h.federationSenderURL + FederationSenderPerformJoinRequestPath + apiURL := h.federationAPIURL + FederationAPIPerformJoinRequestPath err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) if err != nil { response.LastError = &gomatrix.HTTPError{ @@ -136,7 +141,7 @@ func (h *httpFederationSenderInternalAPI) PerformJoin( } // Handle an instruction to make_join & send_join with a remote server. -func (h *httpFederationSenderInternalAPI) PerformDirectoryLookup( +func (h *httpFederationInternalAPI) PerformDirectoryLookup( ctx context.Context, request *api.PerformDirectoryLookupRequest, response *api.PerformDirectoryLookupResponse, @@ -144,12 +149,12 @@ func (h *httpFederationSenderInternalAPI) PerformDirectoryLookup( span, ctx := opentracing.StartSpanFromContext(ctx, "PerformDirectoryLookup") defer span.Finish() - apiURL := h.federationSenderURL + FederationSenderPerformDirectoryLookupRequestPath + apiURL := h.federationAPIURL + FederationAPIPerformDirectoryLookupRequestPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } // Handle an instruction to broadcast an EDU to all servers in rooms we are joined to. -func (h *httpFederationSenderInternalAPI) PerformBroadcastEDU( +func (h *httpFederationInternalAPI) PerformBroadcastEDU( ctx context.Context, request *api.PerformBroadcastEDURequest, response *api.PerformBroadcastEDUResponse, @@ -157,7 +162,7 @@ func (h *httpFederationSenderInternalAPI) PerformBroadcastEDU( span, ctx := opentracing.StartSpanFromContext(ctx, "PerformBroadcastEDU") defer span.Finish() - apiURL := h.federationSenderURL + FederationSenderPerformBroadcastEDUPath + apiURL := h.federationAPIURL + FederationAPIPerformBroadcastEDUPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) } @@ -168,7 +173,7 @@ type getUserDevices struct { Err *api.FederationClientError } -func (h *httpFederationSenderInternalAPI) GetUserDevices( +func (h *httpFederationInternalAPI) GetUserDevices( ctx context.Context, s gomatrixserverlib.ServerName, userID string, ) (gomatrixserverlib.RespUserDevices, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "GetUserDevices") @@ -180,7 +185,7 @@ func (h *httpFederationSenderInternalAPI) GetUserDevices( UserID: userID, } var response getUserDevices - apiURL := h.federationSenderURL + FederationSenderGetUserDevicesPath + apiURL := h.federationAPIURL + FederationAPIGetUserDevicesPath err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) if err != nil { return result, err @@ -198,7 +203,7 @@ type claimKeys struct { Err *api.FederationClientError } -func (h *httpFederationSenderInternalAPI) ClaimKeys( +func (h *httpFederationInternalAPI) ClaimKeys( ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string, ) (gomatrixserverlib.RespClaimKeys, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "ClaimKeys") @@ -210,7 +215,7 @@ func (h *httpFederationSenderInternalAPI) ClaimKeys( OneTimeKeys: oneTimeKeys, } var response claimKeys - apiURL := h.federationSenderURL + FederationSenderClaimKeysPath + apiURL := h.federationAPIURL + FederationAPIClaimKeysPath err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) if err != nil { return result, err @@ -228,7 +233,7 @@ type queryKeys struct { Err *api.FederationClientError } -func (h *httpFederationSenderInternalAPI) QueryKeys( +func (h *httpFederationInternalAPI) QueryKeys( ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string, ) (gomatrixserverlib.RespQueryKeys, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "QueryKeys") @@ -240,7 +245,7 @@ func (h *httpFederationSenderInternalAPI) QueryKeys( Keys: keys, } var response queryKeys - apiURL := h.federationSenderURL + FederationSenderQueryKeysPath + apiURL := h.federationAPIURL + FederationAPIQueryKeysPath err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) if err != nil { return result, err @@ -260,7 +265,7 @@ type backfill struct { Err *api.FederationClientError } -func (h *httpFederationSenderInternalAPI) Backfill( +func (h *httpFederationInternalAPI) Backfill( ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string, ) (gomatrixserverlib.Transaction, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "Backfill") @@ -273,7 +278,7 @@ func (h *httpFederationSenderInternalAPI) Backfill( EventIDs: eventIDs, } var response backfill - apiURL := h.federationSenderURL + FederationSenderBackfillPath + apiURL := h.federationAPIURL + FederationAPIBackfillPath err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) if err != nil { return gomatrixserverlib.Transaction{}, err @@ -293,7 +298,7 @@ type lookupState struct { Err *api.FederationClientError } -func (h *httpFederationSenderInternalAPI) LookupState( +func (h *httpFederationInternalAPI) LookupState( ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion, ) (gomatrixserverlib.RespState, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "LookupState") @@ -306,7 +311,7 @@ func (h *httpFederationSenderInternalAPI) LookupState( RoomVersion: roomVersion, } var response lookupState - apiURL := h.federationSenderURL + FederationSenderLookupStatePath + apiURL := h.federationAPIURL + FederationAPILookupStatePath err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) if err != nil { return gomatrixserverlib.RespState{}, err @@ -325,7 +330,7 @@ type lookupStateIDs struct { Err *api.FederationClientError } -func (h *httpFederationSenderInternalAPI) LookupStateIDs( +func (h *httpFederationInternalAPI) LookupStateIDs( ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, ) (gomatrixserverlib.RespStateIDs, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "LookupStateIDs") @@ -337,7 +342,7 @@ func (h *httpFederationSenderInternalAPI) LookupStateIDs( EventID: eventID, } var response lookupStateIDs - apiURL := h.federationSenderURL + FederationSenderLookupStateIDsPath + apiURL := h.federationAPIURL + FederationAPILookupStateIDsPath err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) if err != nil { return gomatrixserverlib.RespStateIDs{}, err @@ -355,7 +360,7 @@ type getEvent struct { Err *api.FederationClientError } -func (h *httpFederationSenderInternalAPI) GetEvent( +func (h *httpFederationInternalAPI) GetEvent( ctx context.Context, s gomatrixserverlib.ServerName, eventID string, ) (gomatrixserverlib.Transaction, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "GetEvent") @@ -366,7 +371,7 @@ func (h *httpFederationSenderInternalAPI) GetEvent( EventID: eventID, } var response getEvent - apiURL := h.federationSenderURL + FederationSenderGetEventPath + apiURL := h.federationAPIURL + FederationAPIGetEventPath err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) if err != nil { return gomatrixserverlib.Transaction{}, err @@ -377,13 +382,13 @@ func (h *httpFederationSenderInternalAPI) GetEvent( return *response.Res, nil } -func (h *httpFederationSenderInternalAPI) QueryServerKeys( +func (h *httpFederationInternalAPI) QueryServerKeys( ctx context.Context, req *api.QueryServerKeysRequest, res *api.QueryServerKeysResponse, ) error { span, ctx := opentracing.StartSpanFromContext(ctx, "QueryServerKeys") defer span.Finish() - apiURL := h.federationSenderURL + FederationSenderQueryServerKeysPath + apiURL := h.federationAPIURL + FederationAPIQueryServerKeysPath return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res) } @@ -394,7 +399,7 @@ type lookupServerKeys struct { Err *api.FederationClientError } -func (h *httpFederationSenderInternalAPI) LookupServerKeys( +func (h *httpFederationInternalAPI) LookupServerKeys( ctx context.Context, s gomatrixserverlib.ServerName, keyRequests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, ) ([]gomatrixserverlib.ServerKeys, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "LookupServerKeys") @@ -405,7 +410,7 @@ func (h *httpFederationSenderInternalAPI) LookupServerKeys( KeyRequests: keyRequests, } var response lookupServerKeys - apiURL := h.federationSenderURL + FederationSenderLookupServerKeysPath + apiURL := h.federationAPIURL + FederationAPILookupServerKeysPath err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) if err != nil { return []gomatrixserverlib.ServerKeys{}, err @@ -424,7 +429,7 @@ type eventRelationships struct { Err *api.FederationClientError } -func (h *httpFederationSenderInternalAPI) MSC2836EventRelationships( +func (h *httpFederationInternalAPI) MSC2836EventRelationships( ctx context.Context, s gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest, roomVersion gomatrixserverlib.RoomVersion, ) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error) { @@ -437,7 +442,7 @@ func (h *httpFederationSenderInternalAPI) MSC2836EventRelationships( RoomVer: roomVersion, } var response eventRelationships - apiURL := h.federationSenderURL + FederationSenderEventRelationshipsPath + apiURL := h.federationAPIURL + FederationAPIEventRelationshipsPath err = httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) if err != nil { return res, err @@ -456,7 +461,7 @@ type spacesReq struct { Err *api.FederationClientError } -func (h *httpFederationSenderInternalAPI) MSC2946Spaces( +func (h *httpFederationInternalAPI) MSC2946Spaces( ctx context.Context, dst gomatrixserverlib.ServerName, roomID string, r gomatrixserverlib.MSC2946SpacesRequest, ) (res gomatrixserverlib.MSC2946SpacesResponse, err error) { span, ctx := opentracing.StartSpanFromContext(ctx, "MSC2946Spaces") @@ -468,7 +473,7 @@ func (h *httpFederationSenderInternalAPI) MSC2946Spaces( RoomID: roomID, } var response spacesReq - apiURL := h.federationSenderURL + FederationSenderSpacesSummaryPath + apiURL := h.federationAPIURL + FederationAPISpacesSummaryPath err = httputil.PostJSON(ctx, span, h.httpClient, apiURL, &request, &response) if err != nil { return res, err @@ -478,3 +483,93 @@ func (h *httpFederationSenderInternalAPI) MSC2946Spaces( } return response.Res, nil } + +func (s *httpFederationInternalAPI) KeyRing() *gomatrixserverlib.KeyRing { + // This is a bit of a cheat - we tell gomatrixserverlib that this API is + // both the key database and the key fetcher. While this does have the + // rather unfortunate effect of preventing gomatrixserverlib from handling + // key fetchers directly, we can at least reimplement this behaviour on + // the other end of the API. + return &gomatrixserverlib.KeyRing{ + KeyDatabase: s, + KeyFetchers: []gomatrixserverlib.KeyFetcher{}, + } +} + +func (s *httpFederationInternalAPI) FetcherName() string { + return "httpServerKeyInternalAPI" +} + +func (s *httpFederationInternalAPI) StoreKeys( + _ context.Context, + results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, +) error { + // Run in a background context - we don't want to stop this work just + // because the caller gives up waiting. + ctx := context.Background() + request := api.InputPublicKeysRequest{ + Keys: make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult), + } + response := api.InputPublicKeysResponse{} + for req, res := range results { + request.Keys[req] = res + s.cache.StoreServerKey(req, res) + } + return s.InputPublicKeys(ctx, &request, &response) +} + +func (s *httpFederationInternalAPI) FetchKeys( + _ context.Context, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + // Run in a background context - we don't want to stop this work just + // because the caller gives up waiting. + ctx := context.Background() + result := make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) + request := api.QueryPublicKeysRequest{ + Requests: make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp), + } + response := api.QueryPublicKeysResponse{ + Results: make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult), + } + for req, ts := range requests { + if res, ok := s.cache.GetServerKey(req, ts); ok { + result[req] = res + continue + } + request.Requests[req] = ts + } + err := s.QueryPublicKeys(ctx, &request, &response) + if err != nil { + return nil, err + } + for req, res := range response.Results { + result[req] = res + s.cache.StoreServerKey(req, res) + } + return result, nil +} + +func (h *httpFederationInternalAPI) InputPublicKeys( + ctx context.Context, + request *api.InputPublicKeysRequest, + response *api.InputPublicKeysResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "InputPublicKey") + defer span.Finish() + + apiURL := h.federationAPIURL + FederationAPIInputPublicKeyPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} + +func (h *httpFederationInternalAPI) QueryPublicKeys( + ctx context.Context, + request *api.QueryPublicKeysRequest, + response *api.QueryPublicKeysResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryPublicKey") + defer span.Finish() + + apiURL := h.federationAPIURL + FederationAPIQueryPublicKeyPath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/federationsender/inthttp/server.go b/federationapi/inthttp/server.go index a7fbc4ed..7133eddd 100644 --- a/federationsender/inthttp/server.go +++ b/federationapi/inthttp/server.go @@ -5,16 +5,16 @@ import ( "net/http" "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/util" ) -// AddRoutes adds the FederationSenderInternalAPI handlers to the http.ServeMux. +// AddRoutes adds the FederationInternalAPI handlers to the http.ServeMux. // nolint:gocyclo -func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Router) { +func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) { internalAPIMux.Handle( - FederationSenderQueryJoinedHostServerNamesInRoomPath, + FederationAPIQueryJoinedHostServerNamesInRoomPath, httputil.MakeInternalAPI("QueryJoinedHostServerNamesInRoom", func(req *http.Request) util.JSONResponse { var request api.QueryJoinedHostServerNamesInRoomRequest var response api.QueryJoinedHostServerNamesInRoomResponse @@ -28,7 +28,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderPerformJoinRequestPath, + FederationAPIPerformJoinRequestPath, httputil.MakeInternalAPI("PerformJoinRequest", func(req *http.Request) util.JSONResponse { var request api.PerformJoinRequest var response api.PerformJoinResponse @@ -40,7 +40,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderPerformLeaveRequestPath, + FederationAPIPerformLeaveRequestPath, httputil.MakeInternalAPI("PerformLeaveRequest", func(req *http.Request) util.JSONResponse { var request api.PerformLeaveRequest var response api.PerformLeaveResponse @@ -54,7 +54,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderPerformInviteRequestPath, + FederationAPIPerformInviteRequestPath, httputil.MakeInternalAPI("PerformInviteRequest", func(req *http.Request) util.JSONResponse { var request api.PerformInviteRequest var response api.PerformInviteResponse @@ -68,7 +68,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderPerformDirectoryLookupRequestPath, + FederationAPIPerformDirectoryLookupRequestPath, httputil.MakeInternalAPI("PerformDirectoryLookupRequest", func(req *http.Request) util.JSONResponse { var request api.PerformDirectoryLookupRequest var response api.PerformDirectoryLookupResponse @@ -82,7 +82,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderPerformServersAlivePath, + FederationAPIPerformServersAlivePath, httputil.MakeInternalAPI("PerformServersAliveRequest", func(req *http.Request) util.JSONResponse { var request api.PerformServersAliveRequest var response api.PerformServersAliveResponse @@ -96,7 +96,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderPerformBroadcastEDUPath, + FederationAPIPerformBroadcastEDUPath, httputil.MakeInternalAPI("PerformBroadcastEDU", func(req *http.Request) util.JSONResponse { var request api.PerformBroadcastEDURequest var response api.PerformBroadcastEDUResponse @@ -110,7 +110,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderGetUserDevicesPath, + FederationAPIGetUserDevicesPath, httputil.MakeInternalAPI("GetUserDevices", func(req *http.Request) util.JSONResponse { var request getUserDevices if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -132,7 +132,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderClaimKeysPath, + FederationAPIClaimKeysPath, httputil.MakeInternalAPI("ClaimKeys", func(req *http.Request) util.JSONResponse { var request claimKeys if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -154,7 +154,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderQueryKeysPath, + FederationAPIQueryKeysPath, httputil.MakeInternalAPI("QueryKeys", func(req *http.Request) util.JSONResponse { var request queryKeys if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -176,7 +176,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderBackfillPath, + FederationAPIBackfillPath, httputil.MakeInternalAPI("Backfill", func(req *http.Request) util.JSONResponse { var request backfill if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -198,7 +198,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderLookupStatePath, + FederationAPILookupStatePath, httputil.MakeInternalAPI("LookupState", func(req *http.Request) util.JSONResponse { var request lookupState if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -220,7 +220,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderLookupStateIDsPath, + FederationAPILookupStateIDsPath, httputil.MakeInternalAPI("LookupStateIDs", func(req *http.Request) util.JSONResponse { var request lookupStateIDs if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -242,7 +242,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderGetEventPath, + FederationAPIGetEventPath, httputil.MakeInternalAPI("GetEvent", func(req *http.Request) util.JSONResponse { var request getEvent if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -264,7 +264,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderQueryServerKeysPath, + FederationAPIQueryServerKeysPath, httputil.MakeInternalAPI("QueryServerKeys", func(req *http.Request) util.JSONResponse { var request api.QueryServerKeysRequest var response api.QueryServerKeysResponse @@ -278,7 +278,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderLookupServerKeysPath, + FederationAPILookupServerKeysPath, httputil.MakeInternalAPI("LookupServerKeys", func(req *http.Request) util.JSONResponse { var request lookupServerKeys if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -300,7 +300,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderEventRelationshipsPath, + FederationAPIEventRelationshipsPath, httputil.MakeInternalAPI("MSC2836EventRelationships", func(req *http.Request) util.JSONResponse { var request eventRelationships if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -322,7 +322,7 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route }), ) internalAPIMux.Handle( - FederationSenderSpacesSummaryPath, + FederationAPISpacesSummaryPath, httputil.MakeInternalAPI("MSC2946SpacesSummary", func(req *http.Request) util.JSONResponse { var request spacesReq if err := json.NewDecoder(req.Body).Decode(&request); err != nil { @@ -343,4 +343,32 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route return util.JSONResponse{Code: http.StatusOK, JSON: request} }), ) + internalAPIMux.Handle(FederationAPIQueryPublicKeyPath, + httputil.MakeInternalAPI("queryPublicKeys", func(req *http.Request) util.JSONResponse { + request := api.QueryPublicKeysRequest{} + response := api.QueryPublicKeysResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + keys, err := intAPI.FetchKeys(req.Context(), request.Requests) + if err != nil { + return util.ErrorResponse(err) + } + response.Results = keys + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) + internalAPIMux.Handle(FederationAPIInputPublicKeyPath, + httputil.MakeInternalAPI("inputPublicKeys", func(req *http.Request) util.JSONResponse { + request := api.InputPublicKeysRequest{} + response := api.InputPublicKeysResponse{} + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := intAPI.StoreKeys(req.Context(), request.Keys); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) } diff --git a/federationsender/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index 0123fa24..1306e858 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -21,9 +21,9 @@ import ( "sync" "time" - "github.com/matrix-org/dendrite/federationsender/statistics" - "github.com/matrix-org/dendrite/federationsender/storage" - "github.com/matrix-org/dendrite/federationsender/storage/shared" + "github.com/matrix-org/dendrite/federationapi/statistics" + "github.com/matrix-org/dendrite/federationapi/storage" + "github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrix" @@ -81,7 +81,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re context.TODO(), "", // TODO: remove this, as we don't need to persist the transaction ID oq.destination, // the destination server name - receipt, // NIDs from federationsender_queue_json table + receipt, // NIDs from federationapi_queue_json table ); err != nil { logrus.WithError(err).Errorf("failed to associate PDU %q with destination %q", event.EventID(), oq.destination) return @@ -124,7 +124,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share if err := oq.db.AssociateEDUWithDestination( context.TODO(), oq.destination, // the destination server name - receipt, // NIDs from federationsender_queue_json table + receipt, // NIDs from federationapi_queue_json table ); err != nil { logrus.WithError(err).Errorf("failed to associate EDU with destination %q", oq.destination) return diff --git a/federationsender/queue/queue.go b/federationapi/queue/queue.go index a2ef6e5f..8a6ad155 100644 --- a/federationsender/queue/queue.go +++ b/federationapi/queue/queue.go @@ -22,9 +22,9 @@ import ( "sync" "time" - "github.com/matrix-org/dendrite/federationsender/statistics" - "github.com/matrix-org/dendrite/federationsender/storage" - "github.com/matrix-org/dendrite/federationsender/storage/shared" + "github.com/matrix-org/dendrite/federationapi/statistics" + "github.com/matrix-org/dendrite/federationapi/storage" + "github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" @@ -58,7 +58,7 @@ func init() { var destinationQueueTotal = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: "dendrite", - Subsystem: "federationsender", + Subsystem: "federationapi", Name: "destination_queues_total", }, ) @@ -66,7 +66,7 @@ var destinationQueueTotal = prometheus.NewGauge( var destinationQueueRunning = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: "dendrite", - Subsystem: "federationsender", + Subsystem: "federationapi", Name: "destination_queues_running", }, ) @@ -74,7 +74,7 @@ var destinationQueueRunning = prometheus.NewGauge( var destinationQueueBackingOff = prometheus.NewGauge( prometheus.GaugeOpts{ Namespace: "dendrite", - Subsystem: "federationsender", + Subsystem: "federationapi", Name: "destination_queues_backing_off", }, ) diff --git a/federationapi/routing/keys.go b/federationapi/routing/keys.go index bba3272b..49a6c558 100644 --- a/federationapi/routing/keys.go +++ b/federationapi/routing/keys.go @@ -21,7 +21,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" @@ -179,7 +179,7 @@ func localKeys(cfg *config.FederationAPI, validUntil time.Time) (*gomatrixserver func NotaryKeys( httpReq *http.Request, cfg *config.FederationAPI, - fsAPI federationSenderAPI.FederationSenderInternalAPI, + fsAPI federationAPI.FederationInternalAPI, req *gomatrixserverlib.PublicKeyNotaryLookupRequest, ) util.JSONResponse { if req == nil { @@ -203,8 +203,8 @@ func NotaryKeys( return util.ErrorResponse(err) } } else { - var resp federationSenderAPI.QueryServerKeysResponse - err := fsAPI.QueryServerKeys(httpReq.Context(), &federationSenderAPI.QueryServerKeysRequest{ + var resp federationAPI.QueryServerKeysResponse + err := fsAPI.QueryServerKeys(httpReq.Context(), &federationAPI.QueryServerKeysRequest{ ServerName: serverName, KeyIDToCriteria: kidToCriteria, }, &resp) diff --git a/federationapi/routing/peek.go b/federationapi/routing/peek.go index 8f83cb15..51132999 100644 --- a/federationapi/routing/peek.go +++ b/federationapi/routing/peek.go @@ -33,7 +33,7 @@ func Peek( roomID, peekID string, remoteVersions []gomatrixserverlib.RoomVersion, ) util.JSONResponse { - // TODO: check if we're just refreshing an existing peek by querying the federationsender + // TODO: check if we're just refreshing an existing peek by querying the federationapi verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verRes := api.QueryRoomVersionForRoomResponse{} diff --git a/federationapi/routing/query.go b/federationapi/routing/query.go index b4158f0c..47d3b2df 100644 --- a/federationapi/routing/query.go +++ b/federationapi/routing/query.go @@ -19,7 +19,7 @@ import ( "net/http" "github.com/matrix-org/dendrite/clientapi/jsonerror" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrix" @@ -33,7 +33,7 @@ func RoomAliasToID( federation *gomatrixserverlib.FederationClient, cfg *config.FederationAPI, rsAPI roomserverAPI.RoomserverInternalAPI, - senderAPI federationSenderAPI.FederationSenderInternalAPI, + senderAPI federationAPI.FederationInternalAPI, ) util.JSONResponse { roomAlias := httpReq.FormValue("room_alias") if roomAlias == "" { @@ -64,8 +64,8 @@ func RoomAliasToID( } if queryRes.RoomID != "" { - serverQueryReq := federationSenderAPI.QueryJoinedHostServerNamesInRoomRequest{RoomID: queryRes.RoomID} - var serverQueryRes federationSenderAPI.QueryJoinedHostServerNamesInRoomResponse + serverQueryReq := federationAPI.QueryJoinedHostServerNamesInRoomRequest{RoomID: queryRes.RoomID} + var serverQueryRes federationAPI.QueryJoinedHostServerNamesInRoomResponse if err = senderAPI.QueryJoinedHostServerNamesInRoom(httpReq.Context(), &serverQueryReq, &serverQueryRes); err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("senderAPI.QueryJoinedHostServerNamesInRoom failed") return jsonerror.InternalServerError() diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index 7446f1fb..04c88d95 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -21,7 +21,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/jsonerror" eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" federationAPI "github.com/matrix-org/dendrite/federationapi/api" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/httputil" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" @@ -46,7 +45,7 @@ func Setup( cfg *config.FederationAPI, rsAPI roomserverAPI.RoomserverInternalAPI, eduAPI eduserverAPI.EDUServerInputAPI, - fsAPI federationSenderAPI.FederationSenderInternalAPI, + fsAPI federationAPI.FederationInternalAPI, keys gomatrixserverlib.JSONVerifier, federation *gomatrixserverlib.FederationClient, userAPI userapi.UserInternalAPI, diff --git a/federationsender/statistics/statistics.go b/federationapi/statistics/statistics.go index b5fe7513..8bac99cb 100644 --- a/federationsender/statistics/statistics.go +++ b/federationapi/statistics/statistics.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/matrix-org/dendrite/federationsender/storage" + "github.com/matrix-org/dendrite/federationapi/storage" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" "go.uber.org/atomic" diff --git a/federationsender/statistics/statistics_test.go b/federationapi/statistics/statistics_test.go index 225350b6..225350b6 100644 --- a/federationsender/statistics/statistics_test.go +++ b/federationapi/statistics/statistics_test.go diff --git a/signingkeyserver/storage/cache/keydb.go b/federationapi/storage/cache/keydb.go index 2063dfc5..2063dfc5 100644 --- a/signingkeyserver/storage/cache/keydb.go +++ b/federationapi/storage/cache/keydb.go diff --git a/federationsender/storage/interface.go b/federationapi/storage/interface.go index 58c8a7cf..a36f5152 100644 --- a/federationsender/storage/interface.go +++ b/federationapi/storage/interface.go @@ -17,14 +17,15 @@ package storage import ( "context" - "github.com/matrix-org/dendrite/federationsender/storage/shared" - "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/federationapi/storage/shared" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/gomatrixserverlib" ) type Database interface { internal.PartitionStorer + gomatrixserverlib.KeyDatabase UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error) diff --git a/federationsender/storage/postgres/blacklist_table.go b/federationapi/storage/postgres/blacklist_table.go index eef37318..eef37318 100644 --- a/federationsender/storage/postgres/blacklist_table.go +++ b/federationapi/storage/postgres/blacklist_table.go diff --git a/federationsender/storage/postgres/deltas/2021020411080000_rooms.go b/federationapi/storage/postgres/deltas/2021020411080000_rooms.go index cc4bdadf..cc4bdadf 100644 --- a/federationsender/storage/postgres/deltas/2021020411080000_rooms.go +++ b/federationapi/storage/postgres/deltas/2021020411080000_rooms.go diff --git a/federationsender/storage/postgres/inbound_peeks_table.go b/federationapi/storage/postgres/inbound_peeks_table.go index fe35ce44..df5c6076 100644 --- a/federationsender/storage/postgres/inbound_peeks_table.go +++ b/federationapi/storage/postgres/inbound_peeks_table.go @@ -19,7 +19,7 @@ import ( "database/sql" "time" - "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" diff --git a/federationsender/storage/postgres/joined_hosts_table.go b/federationapi/storage/postgres/joined_hosts_table.go index 0c1e91ee..5c95b72a 100644 --- a/federationsender/storage/postgres/joined_hosts_table.go +++ b/federationapi/storage/postgres/joined_hosts_table.go @@ -20,7 +20,7 @@ import ( "database/sql" "github.com/lib/pq" - "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" diff --git a/federationsender/storage/postgres/notary_server_keys_json_table.go b/federationapi/storage/postgres/notary_server_keys_json_table.go index 42e58ba7..65221c08 100644 --- a/federationsender/storage/postgres/notary_server_keys_json_table.go +++ b/federationapi/storage/postgres/notary_server_keys_json_table.go @@ -18,7 +18,7 @@ import ( "context" "database/sql" - "github.com/matrix-org/dendrite/federationsender/storage/tables" + "github.com/matrix-org/dendrite/federationapi/storage/tables" "github.com/matrix-org/gomatrixserverlib" ) diff --git a/federationsender/storage/postgres/notary_server_keys_metadata_table.go b/federationapi/storage/postgres/notary_server_keys_metadata_table.go index b460dcd8..72faf480 100644 --- a/federationsender/storage/postgres/notary_server_keys_metadata_table.go +++ b/federationapi/storage/postgres/notary_server_keys_metadata_table.go @@ -20,7 +20,7 @@ import ( "encoding/json" "github.com/lib/pq" - "github.com/matrix-org/dendrite/federationsender/storage/tables" + "github.com/matrix-org/dendrite/federationapi/storage/tables" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/gomatrixserverlib" ) diff --git a/federationsender/storage/postgres/outbound_peeks_table.go b/federationapi/storage/postgres/outbound_peeks_table.go index 596b4bcc..c22d893f 100644 --- a/federationsender/storage/postgres/outbound_peeks_table.go +++ b/federationapi/storage/postgres/outbound_peeks_table.go @@ -19,7 +19,7 @@ import ( "database/sql" "time" - "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" diff --git a/federationsender/storage/postgres/queue_edus_table.go b/federationapi/storage/postgres/queue_edus_table.go index 6cac489b..6cac489b 100644 --- a/federationsender/storage/postgres/queue_edus_table.go +++ b/federationapi/storage/postgres/queue_edus_table.go diff --git a/federationsender/storage/postgres/queue_json_table.go b/federationapi/storage/postgres/queue_json_table.go index 85307374..85307374 100644 --- a/federationsender/storage/postgres/queue_json_table.go +++ b/federationapi/storage/postgres/queue_json_table.go diff --git a/federationsender/storage/postgres/queue_pdus_table.go b/federationapi/storage/postgres/queue_pdus_table.go index f9a47748..f9a47748 100644 --- a/federationsender/storage/postgres/queue_pdus_table.go +++ b/federationapi/storage/postgres/queue_pdus_table.go diff --git a/signingkeyserver/storage/postgres/server_key_table.go b/federationapi/storage/postgres/server_key_table.go index 87f1c211..16e294e0 100644 --- a/signingkeyserver/storage/postgres/server_key_table.go +++ b/federationapi/storage/postgres/server_key_table.go @@ -21,10 +21,11 @@ import ( "github.com/lib/pq" "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" ) -const serverKeysSchema = ` +const serverSigningKeysSchema = ` -- A cache of signing keys downloaded from remote servers. CREATE TABLE IF NOT EXISTS keydb_server_keys ( -- The name of the matrix server the key is for. @@ -48,39 +49,40 @@ CREATE TABLE IF NOT EXISTS keydb_server_keys ( CREATE INDEX IF NOT EXISTS keydb_server_name_and_key_id ON keydb_server_keys (server_name_and_key_id); ` -const bulkSelectServerKeysSQL = "" + +const bulkSelectServerSigningKeysSQL = "" + "SELECT server_name, server_key_id, valid_until_ts, expired_ts, " + " server_key FROM keydb_server_keys" + " WHERE server_name_and_key_id = ANY($1)" -const upsertServerKeysSQL = "" + +const upsertServerSigningKeysSQL = "" + "INSERT INTO keydb_server_keys (server_name, server_key_id," + " server_name_and_key_id, valid_until_ts, expired_ts, server_key)" + " VALUES ($1, $2, $3, $4, $5, $6)" + " ON CONFLICT ON CONSTRAINT keydb_server_keys_unique" + " DO UPDATE SET valid_until_ts = $4, expired_ts = $5, server_key = $6" -type serverKeyStatements struct { +type serverSigningKeyStatements struct { bulkSelectServerKeysStmt *sql.Stmt upsertServerKeysStmt *sql.Stmt } -func (s *serverKeyStatements) prepare(db *sql.DB) (err error) { - _, err = db.Exec(serverKeysSchema) +func NewPostgresServerSigningKeysTable(db *sql.DB) (s *serverSigningKeyStatements, err error) { + s = &serverSigningKeyStatements{} + _, err = db.Exec(serverSigningKeysSchema) if err != nil { return } - if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerKeysSQL); err != nil { + if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerSigningKeysSQL); err != nil { return } - if s.upsertServerKeysStmt, err = db.Prepare(upsertServerKeysSQL); err != nil { + if s.upsertServerKeysStmt, err = db.Prepare(upsertServerSigningKeysSQL); err != nil { return } - return + return s, nil } -func (s *serverKeyStatements) bulkSelectServerKeys( - ctx context.Context, +func (s *serverSigningKeyStatements) BulkSelectServerKeys( + ctx context.Context, txn *sql.Tx, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, ) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { var nameAndKeyIDs []string @@ -121,12 +123,13 @@ func (s *serverKeyStatements) bulkSelectServerKeys( return results, rows.Err() } -func (s *serverKeyStatements) upsertServerKeys( - ctx context.Context, +func (s *serverSigningKeyStatements) UpsertServerKeys( + ctx context.Context, txn *sql.Tx, request gomatrixserverlib.PublicKeyLookupRequest, key gomatrixserverlib.PublicKeyLookupResult, ) error { - _, err := s.upsertServerKeysStmt.ExecContext( + stmt := sqlutil.TxStmt(txn, s.upsertServerKeysStmt) + _, err := stmt.ExecContext( ctx, string(request.ServerName), string(request.KeyID), diff --git a/federationsender/storage/postgres/storage.go b/federationapi/storage/postgres/storage.go index 5507bad7..1f6afe37 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationapi/storage/postgres/storage.go @@ -19,8 +19,8 @@ import ( "database/sql" "fmt" - "github.com/matrix-org/dendrite/federationsender/storage/postgres/deltas" - "github.com/matrix-org/dendrite/federationsender/storage/shared" + "github.com/matrix-org/dendrite/federationapi/storage/postgres/deltas" + "github.com/matrix-org/dendrite/federationapi/storage/shared" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" @@ -35,7 +35,7 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache) (*Database, error) { var d Database var err error if d.db, err = sqlutil.Open(dbProperties); err != nil { @@ -78,24 +78,29 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, fmt.Errorf("NewPostgresNotaryServerKeysMetadataTable: %s", err) } + serverSigningKeys, err := NewPostgresServerSigningKeysTable(d.db) + if err != nil { + return nil, err + } m := sqlutil.NewMigrations() deltas.LoadRemoveRoomsTable(m) if err = m.RunDeltas(d.db, dbProperties); err != nil { return nil, err } d.Database = shared.Database{ - DB: d.db, - Cache: cache, - Writer: d.writer, - FederationSenderJoinedHosts: joinedHosts, - FederationSenderQueuePDUs: queuePDUs, - FederationSenderQueueEDUs: queueEDUs, - FederationSenderQueueJSON: queueJSON, - FederationSenderBlacklist: blacklist, - FederationSenderInboundPeeks: inboundPeeks, - FederationSenderOutboundPeeks: outboundPeeks, - NotaryServerKeysJSON: notaryJSON, - NotaryServerKeysMetadata: notaryMetadata, + DB: d.db, + Cache: cache, + Writer: d.writer, + FederationJoinedHosts: joinedHosts, + FederationQueuePDUs: queuePDUs, + FederationQueueEDUs: queueEDUs, + FederationQueueJSON: queueJSON, + FederationBlacklist: blacklist, + FederationInboundPeeks: inboundPeeks, + FederationOutboundPeeks: outboundPeeks, + NotaryServerKeysJSON: notaryJSON, + NotaryServerKeysMetadata: notaryMetadata, + ServerSigningKeys: serverSigningKeys, } if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { return nil, err diff --git a/federationsender/storage/shared/storage.go b/federationapi/storage/shared/storage.go index 45c9febd..ddd770e2 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationapi/storage/shared/storage.go @@ -20,26 +20,27 @@ import ( "fmt" "time" - "github.com/matrix-org/dendrite/federationsender/storage/tables" - "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/federationapi/storage/tables" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" ) type Database struct { - DB *sql.DB - Cache caching.FederationSenderCache - Writer sqlutil.Writer - FederationSenderQueuePDUs tables.FederationSenderQueuePDUs - FederationSenderQueueEDUs tables.FederationSenderQueueEDUs - FederationSenderQueueJSON tables.FederationSenderQueueJSON - FederationSenderJoinedHosts tables.FederationSenderJoinedHosts - FederationSenderBlacklist tables.FederationSenderBlacklist - FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks - FederationSenderInboundPeeks tables.FederationSenderInboundPeeks - NotaryServerKeysJSON tables.FederationSenderNotaryServerKeysJSON - NotaryServerKeysMetadata tables.FederationSenderNotaryServerKeysMetadata + DB *sql.DB + Cache caching.FederationCache + Writer sqlutil.Writer + FederationQueuePDUs tables.FederationQueuePDUs + FederationQueueEDUs tables.FederationQueueEDUs + FederationQueueJSON tables.FederationQueueJSON + FederationJoinedHosts tables.FederationJoinedHosts + FederationBlacklist tables.FederationBlacklist + FederationOutboundPeeks tables.FederationOutboundPeeks + FederationInboundPeeks tables.FederationInboundPeeks + NotaryServerKeysJSON tables.FederationNotaryServerKeysJSON + NotaryServerKeysMetadata tables.FederationNotaryServerKeysMetadata + ServerSigningKeys tables.FederationServerSigningKeys } // An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs. @@ -66,18 +67,18 @@ func (d *Database) UpdateRoom( removeHosts []string, ) (joinedHosts []types.JoinedHost, err error) { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - joinedHosts, err = d.FederationSenderJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID) + joinedHosts, err = d.FederationJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID) if err != nil { return err } for _, add := range addHosts { - err = d.FederationSenderJoinedHosts.InsertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName) + err = d.FederationJoinedHosts.InsertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName) if err != nil { return err } } - if err = d.FederationSenderJoinedHosts.DeleteJoinedHosts(ctx, txn, removeHosts); err != nil { + if err = d.FederationJoinedHosts.DeleteJoinedHosts(ctx, txn, removeHosts); err != nil { return err } return nil @@ -91,18 +92,18 @@ func (d *Database) UpdateRoom( func (d *Database) GetJoinedHosts( ctx context.Context, roomID string, ) ([]types.JoinedHost, error) { - return d.FederationSenderJoinedHosts.SelectJoinedHosts(ctx, roomID) + return d.FederationJoinedHosts.SelectJoinedHosts(ctx, roomID) } // GetAllJoinedHosts returns the currently joined hosts for // all rooms known to the federation sender. // Returns an error if something goes wrong. func (d *Database) GetAllJoinedHosts(ctx context.Context) ([]gomatrixserverlib.ServerName, error) { - return d.FederationSenderJoinedHosts.SelectAllJoinedHosts(ctx) + return d.FederationJoinedHosts.SelectAllJoinedHosts(ctx) } func (d *Database) GetJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) { - return d.FederationSenderJoinedHosts.SelectJoinedHostsForRooms(ctx, roomIDs) + return d.FederationJoinedHosts.SelectJoinedHostsForRooms(ctx, roomIDs) } // StoreJSON adds a JSON blob into the queue JSON table and returns @@ -114,7 +115,7 @@ func (d *Database) StoreJSON( var nid int64 var err error _ = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - nid, err = d.FederationSenderQueueJSON.InsertQueueJSON(ctx, txn, js) + nid, err = d.FederationQueueJSON.InsertQueueJSON(ctx, txn, js) return err }) if err != nil { @@ -132,8 +133,8 @@ func (d *Database) PurgeRoomState( // If the event is a create event then we'll delete all of the existing // data for the room. The only reason that a create event would be replayed // to us in this way is if we're about to receive the entire room state. - if err := d.FederationSenderJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil { - return fmt.Errorf("d.FederationSenderJoinedHosts.DeleteJoinedHosts: %w", err) + if err := d.FederationJoinedHosts.DeleteJoinedHostsForRoom(ctx, txn, roomID); err != nil { + return fmt.Errorf("d.FederationJoinedHosts.DeleteJoinedHosts: %w", err) } return nil }) @@ -141,64 +142,64 @@ func (d *Database) PurgeRoomState( func (d *Database) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - return d.FederationSenderBlacklist.InsertBlacklist(context.TODO(), txn, serverName) + return d.FederationBlacklist.InsertBlacklist(context.TODO(), txn, serverName) }) } func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - return d.FederationSenderBlacklist.DeleteBlacklist(context.TODO(), txn, serverName) + return d.FederationBlacklist.DeleteBlacklist(context.TODO(), txn, serverName) }) } func (d *Database) RemoveAllServersFromBlacklist() error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - return d.FederationSenderBlacklist.DeleteAllBlacklist(context.TODO(), txn) + return d.FederationBlacklist.DeleteAllBlacklist(context.TODO(), txn) }) } func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) { - return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName) + return d.FederationBlacklist.SelectBlacklist(context.TODO(), nil, serverName) } func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) + return d.FederationOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) }) } func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) + return d.FederationOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) }) } func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) { - return d.FederationSenderOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID) + return d.FederationOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID) } func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) { - return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID) + return d.FederationOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID) } func (d *Database) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - return d.FederationSenderInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) + return d.FederationInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) }) } func (d *Database) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - return d.FederationSenderInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) + return d.FederationInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval) }) } func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) { - return d.FederationSenderInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID) + return d.FederationInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID) } func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) { - return d.FederationSenderInboundPeeks.SelectInboundPeeks(ctx, nil, roomID) + return d.FederationInboundPeeks.SelectInboundPeeks(ctx, nil, roomID) } func (d *Database) UpdateNotaryKeys(ctx context.Context, serverName gomatrixserverlib.ServerName, serverKeys gomatrixserverlib.ServerKeys) error { diff --git a/federationsender/storage/shared/storage_edus.go b/federationapi/storage/shared/storage_edus.go index 86fee1a3..6e3c7e36 100644 --- a/federationsender/storage/shared/storage_edus.go +++ b/federationapi/storage/shared/storage_edus.go @@ -33,12 +33,12 @@ func (d *Database) AssociateEDUWithDestination( receipt *Receipt, ) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - if err := d.FederationSenderQueueEDUs.InsertQueueEDU( + if err := d.FederationQueueEDUs.InsertQueueEDU( ctx, // context txn, // SQL transaction "", // TODO: EDU type for coalescing serverName, // destination server name - receipt.nid, // NID from the federationsender_queue_json table + receipt.nid, // NID from the federationapi_queue_json table ); err != nil { return fmt.Errorf("InsertQueueEDU: %w", err) } @@ -58,21 +58,21 @@ func (d *Database) GetPendingEDUs( ) { edus = make(map[*Receipt]*gomatrixserverlib.EDU) err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - nids, err := d.FederationSenderQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit) + nids, err := d.FederationQueueEDUs.SelectQueueEDUs(ctx, txn, serverName, limit) if err != nil { return fmt.Errorf("SelectQueueEDUs: %w", err) } retrieve := make([]int64, 0, len(nids)) for _, nid := range nids { - if edu, ok := d.Cache.GetFederationSenderQueuedEDU(nid); ok { + if edu, ok := d.Cache.GetFederationQueuedEDU(nid); ok { edus[&Receipt{nid}] = edu } else { retrieve = append(retrieve, nid) } } - blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve) + blobs, err := d.FederationQueueJSON.SelectQueueJSON(ctx, txn, retrieve) if err != nil { return fmt.Errorf("SelectQueueJSON: %w", err) } @@ -107,24 +107,24 @@ func (d *Database) CleanEDUs( } return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - if err := d.FederationSenderQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, nids); err != nil { + if err := d.FederationQueueEDUs.DeleteQueueEDUs(ctx, txn, serverName, nids); err != nil { return err } var deleteNIDs []int64 for _, nid := range nids { - count, err := d.FederationSenderQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid) + count, err := d.FederationQueueEDUs.SelectQueueEDUReferenceJSONCount(ctx, txn, nid) if err != nil { return fmt.Errorf("SelectQueueEDUReferenceJSONCount: %w", err) } if count == 0 { deleteNIDs = append(deleteNIDs, nid) - d.Cache.EvictFederationSenderQueuedEDU(nid) + d.Cache.EvictFederationQueuedEDU(nid) } } if len(deleteNIDs) > 0 { - if err := d.FederationSenderQueueJSON.DeleteQueueJSON(ctx, txn, deleteNIDs); err != nil { + if err := d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, deleteNIDs); err != nil { return fmt.Errorf("DeleteQueueJSON: %w", err) } } @@ -139,7 +139,7 @@ func (d *Database) GetPendingEDUCount( ctx context.Context, serverName gomatrixserverlib.ServerName, ) (int64, error) { - return d.FederationSenderQueueEDUs.SelectQueueEDUCount(ctx, nil, serverName) + return d.FederationQueueEDUs.SelectQueueEDUCount(ctx, nil, serverName) } // GetPendingServerNames returns the server names that have EDUs @@ -147,5 +147,5 @@ func (d *Database) GetPendingEDUCount( func (d *Database) GetPendingEDUServerNames( ctx context.Context, ) ([]gomatrixserverlib.ServerName, error) { - return d.FederationSenderQueueEDUs.SelectQueueEDUServerNames(ctx, nil) + return d.FederationQueueEDUs.SelectQueueEDUServerNames(ctx, nil) } diff --git a/federationapi/storage/shared/storage_keys.go b/federationapi/storage/shared/storage_keys.go new file mode 100644 index 00000000..3222b122 --- /dev/null +++ b/federationapi/storage/shared/storage_keys.go @@ -0,0 +1,59 @@ +// Copyright 2017-2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + "database/sql" + + "github.com/matrix-org/gomatrixserverlib" +) + +// FetcherName implements KeyFetcher +func (d Database) FetcherName() string { + return "FederationAPIKeyDatabase" +} + +// FetchKeys implements gomatrixserverlib.KeyDatabase +func (d *Database) FetchKeys( + ctx context.Context, + requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, +) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { + return d.ServerSigningKeys.BulkSelectServerKeys(ctx, nil, requests) +} + +// StoreKeys implements gomatrixserverlib.KeyDatabase +func (d *Database) StoreKeys( + ctx context.Context, + keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, +) error { + return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + var lastErr error + for request, keys := range keyMap { + if err := d.ServerSigningKeys.UpsertServerKeys(ctx, txn, request, keys); err != nil { + // Rather than returning immediately on error we try to insert the + // remaining keys. + // Since we are inserting the keys outside of a transaction it is + // possible for some of the inserts to succeed even though some + // of the inserts have failed. + // Ensuring that we always insert all the keys we can means that + // this behaviour won't depend on the iteration order of the map. + lastErr = err + } + } + return lastErr + }) +} diff --git a/federationsender/storage/shared/storage_pdus.go b/federationapi/storage/shared/storage_pdus.go index bc298a90..5a12c388 100644 --- a/federationsender/storage/shared/storage_pdus.go +++ b/federationapi/storage/shared/storage_pdus.go @@ -34,12 +34,12 @@ func (d *Database) AssociatePDUWithDestination( receipt *Receipt, ) error { return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - if err := d.FederationSenderQueuePDUs.InsertQueuePDU( + if err := d.FederationQueuePDUs.InsertQueuePDU( ctx, // context txn, // SQL transaction transactionID, // transaction ID serverName, // destination server name - receipt.nid, // NID from the federationsender_queue_json table + receipt.nid, // NID from the federationapi_queue_json table ); err != nil { return fmt.Errorf("InsertQueuePDU: %w", err) } @@ -64,21 +64,21 @@ func (d *Database) GetPendingPDUs( // the database. events = make(map[*Receipt]*gomatrixserverlib.HeaderedEvent) err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - nids, err := d.FederationSenderQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit) + nids, err := d.FederationQueuePDUs.SelectQueuePDUs(ctx, txn, serverName, limit) if err != nil { return fmt.Errorf("SelectQueuePDUs: %w", err) } retrieve := make([]int64, 0, len(nids)) for _, nid := range nids { - if event, ok := d.Cache.GetFederationSenderQueuedPDU(nid); ok { + if event, ok := d.Cache.GetFederationQueuedPDU(nid); ok { events[&Receipt{nid}] = event } else { retrieve = append(retrieve, nid) } } - blobs, err := d.FederationSenderQueueJSON.SelectQueueJSON(ctx, txn, retrieve) + blobs, err := d.FederationQueueJSON.SelectQueueJSON(ctx, txn, retrieve) if err != nil { return fmt.Errorf("SelectQueueJSON: %w", err) } @@ -89,7 +89,7 @@ func (d *Database) GetPendingPDUs( return fmt.Errorf("json.Unmarshal: %w", err) } events[&Receipt{nid}] = &event - d.Cache.StoreFederationSenderQueuedPDU(nid, &event) + d.Cache.StoreFederationQueuedPDU(nid, &event) } return nil @@ -115,24 +115,24 @@ func (d *Database) CleanPDUs( } return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - if err := d.FederationSenderQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, nids); err != nil { + if err := d.FederationQueuePDUs.DeleteQueuePDUs(ctx, txn, serverName, nids); err != nil { return err } var deleteNIDs []int64 for _, nid := range nids { - count, err := d.FederationSenderQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid) + count, err := d.FederationQueuePDUs.SelectQueuePDUReferenceJSONCount(ctx, txn, nid) if err != nil { return fmt.Errorf("SelectQueuePDUReferenceJSONCount: %w", err) } if count == 0 { deleteNIDs = append(deleteNIDs, nid) - d.Cache.EvictFederationSenderQueuedPDU(nid) + d.Cache.EvictFederationQueuedPDU(nid) } } if len(deleteNIDs) > 0 { - if err := d.FederationSenderQueueJSON.DeleteQueueJSON(ctx, txn, deleteNIDs); err != nil { + if err := d.FederationQueueJSON.DeleteQueueJSON(ctx, txn, deleteNIDs); err != nil { return fmt.Errorf("DeleteQueueJSON: %w", err) } } @@ -147,7 +147,7 @@ func (d *Database) GetPendingPDUCount( ctx context.Context, serverName gomatrixserverlib.ServerName, ) (int64, error) { - return d.FederationSenderQueuePDUs.SelectQueuePDUCount(ctx, nil, serverName) + return d.FederationQueuePDUs.SelectQueuePDUCount(ctx, nil, serverName) } // GetPendingServerNames returns the server names that have PDUs @@ -155,5 +155,5 @@ func (d *Database) GetPendingPDUCount( func (d *Database) GetPendingPDUServerNames( ctx context.Context, ) ([]gomatrixserverlib.ServerName, error) { - return d.FederationSenderQueuePDUs.SelectQueuePDUServerNames(ctx, nil) + return d.FederationQueuePDUs.SelectQueuePDUServerNames(ctx, nil) } diff --git a/federationsender/storage/sqlite3/blacklist_table.go b/federationapi/storage/sqlite3/blacklist_table.go index 2694e630..2694e630 100644 --- a/federationsender/storage/sqlite3/blacklist_table.go +++ b/federationapi/storage/sqlite3/blacklist_table.go diff --git a/federationsender/storage/sqlite3/deltas/2021020411080000_rooms.go b/federationapi/storage/sqlite3/deltas/2021020411080000_rooms.go index cc4bdadf..cc4bdadf 100644 --- a/federationsender/storage/sqlite3/deltas/2021020411080000_rooms.go +++ b/federationapi/storage/sqlite3/deltas/2021020411080000_rooms.go diff --git a/federationsender/storage/sqlite3/inbound_peeks_table.go b/federationapi/storage/sqlite3/inbound_peeks_table.go index d5eacf9e..ad3c4a6d 100644 --- a/federationsender/storage/sqlite3/inbound_peeks_table.go +++ b/federationapi/storage/sqlite3/inbound_peeks_table.go @@ -19,7 +19,7 @@ import ( "database/sql" "time" - "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" diff --git a/federationsender/storage/sqlite3/joined_hosts_table.go b/federationapi/storage/sqlite3/joined_hosts_table.go index 4c0c1f51..e0e0f287 100644 --- a/federationsender/storage/sqlite3/joined_hosts_table.go +++ b/federationapi/storage/sqlite3/joined_hosts_table.go @@ -20,7 +20,7 @@ import ( "database/sql" "strings" - "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" diff --git a/federationsender/storage/sqlite3/notary_server_keys_json_table.go b/federationapi/storage/sqlite3/notary_server_keys_json_table.go index 6990036a..4a028fa2 100644 --- a/federationsender/storage/sqlite3/notary_server_keys_json_table.go +++ b/federationapi/storage/sqlite3/notary_server_keys_json_table.go @@ -18,7 +18,7 @@ import ( "context" "database/sql" - "github.com/matrix-org/dendrite/federationsender/storage/tables" + "github.com/matrix-org/dendrite/federationapi/storage/tables" "github.com/matrix-org/gomatrixserverlib" ) diff --git a/federationsender/storage/sqlite3/notary_server_keys_metadata_table.go b/federationapi/storage/sqlite3/notary_server_keys_metadata_table.go index a2959407..55709a96 100644 --- a/federationsender/storage/sqlite3/notary_server_keys_metadata_table.go +++ b/federationapi/storage/sqlite3/notary_server_keys_metadata_table.go @@ -21,7 +21,7 @@ import ( "fmt" "strings" - "github.com/matrix-org/dendrite/federationsender/storage/tables" + "github.com/matrix-org/dendrite/federationapi/storage/tables" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" diff --git a/federationsender/storage/sqlite3/outbound_peeks_table.go b/federationapi/storage/sqlite3/outbound_peeks_table.go index 02aefce7..e29026fa 100644 --- a/federationsender/storage/sqlite3/outbound_peeks_table.go +++ b/federationapi/storage/sqlite3/outbound_peeks_table.go @@ -19,7 +19,7 @@ import ( "database/sql" "time" - "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/gomatrixserverlib" diff --git a/federationsender/storage/sqlite3/queue_edus_table.go b/federationapi/storage/sqlite3/queue_edus_table.go index a6d60950..a6d60950 100644 --- a/federationsender/storage/sqlite3/queue_edus_table.go +++ b/federationapi/storage/sqlite3/queue_edus_table.go diff --git a/federationsender/storage/sqlite3/queue_json_table.go b/federationapi/storage/sqlite3/queue_json_table.go index 3e3f60f6..3e3f60f6 100644 --- a/federationsender/storage/sqlite3/queue_json_table.go +++ b/federationapi/storage/sqlite3/queue_json_table.go diff --git a/federationsender/storage/sqlite3/queue_pdus_table.go b/federationapi/storage/sqlite3/queue_pdus_table.go index e0fdbda5..e0fdbda5 100644 --- a/federationsender/storage/sqlite3/queue_pdus_table.go +++ b/federationapi/storage/sqlite3/queue_pdus_table.go diff --git a/signingkeyserver/storage/sqlite3/server_key_table.go b/federationapi/storage/sqlite3/server_key_table.go index 2484d636..9b89649f 100644 --- a/signingkeyserver/storage/sqlite3/server_key_table.go +++ b/federationapi/storage/sqlite3/server_key_table.go @@ -24,7 +24,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) -const serverKeysSchema = ` +const serverSigningKeysSchema = ` -- A cache of signing keys downloaded from remote servers. CREATE TABLE IF NOT EXISTS keydb_server_keys ( -- The name of the matrix server the key is for. @@ -48,43 +48,43 @@ CREATE TABLE IF NOT EXISTS keydb_server_keys ( CREATE INDEX IF NOT EXISTS keydb_server_name_and_key_id ON keydb_server_keys (server_name_and_key_id); ` -const bulkSelectServerKeysSQL = "" + +const bulkSelectServerSigningKeysSQL = "" + "SELECT server_name, server_key_id, valid_until_ts, expired_ts, " + " server_key FROM keydb_server_keys" + " WHERE server_name_and_key_id IN ($1)" -const upsertServerKeysSQL = "" + +const upsertServerSigningKeysSQL = "" + "INSERT INTO keydb_server_keys (server_name, server_key_id," + " server_name_and_key_id, valid_until_ts, expired_ts, server_key)" + " VALUES ($1, $2, $3, $4, $5, $6)" + " ON CONFLICT (server_name, server_key_id)" + " DO UPDATE SET valid_until_ts = $4, expired_ts = $5, server_key = $6" -type serverKeyStatements struct { +type serverSigningKeyStatements struct { db *sql.DB - writer sqlutil.Writer bulkSelectServerKeysStmt *sql.Stmt upsertServerKeysStmt *sql.Stmt } -func (s *serverKeyStatements) prepare(db *sql.DB, writer sqlutil.Writer) (err error) { - s.db = db - s.writer = writer - _, err = db.Exec(serverKeysSchema) +func NewSQLiteServerSigningKeysTable(db *sql.DB) (s *serverSigningKeyStatements, err error) { + s = &serverSigningKeyStatements{ + db: db, + } + _, err = db.Exec(serverSigningKeysSchema) if err != nil { return } - if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerKeysSQL); err != nil { + if s.bulkSelectServerKeysStmt, err = db.Prepare(bulkSelectServerSigningKeysSQL); err != nil { return } - if s.upsertServerKeysStmt, err = db.Prepare(upsertServerKeysSQL); err != nil { + if s.upsertServerKeysStmt, err = db.Prepare(upsertServerSigningKeysSQL); err != nil { return } - return + return s, nil } -func (s *serverKeyStatements) bulkSelectServerKeys( - ctx context.Context, +func (s *serverSigningKeyStatements) BulkSelectServerKeys( + ctx context.Context, txn *sql.Tx, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, ) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { nameAndKeyIDs := make([]string, 0, len(requests)) @@ -98,7 +98,7 @@ func (s *serverKeyStatements) bulkSelectServerKeys( } err := sqlutil.RunLimitedVariablesQuery( - ctx, bulkSelectServerKeysSQL, s.db, iKeyIDs, sqlutil.SQLite3MaxVariables, + ctx, bulkSelectServerSigningKeysSQL, s.db, iKeyIDs, sqlutil.SQLite3MaxVariables, func(rows *sql.Rows) error { for rows.Next() { var serverName string @@ -134,24 +134,22 @@ func (s *serverKeyStatements) bulkSelectServerKeys( return results, nil } -func (s *serverKeyStatements) upsertServerKeys( - ctx context.Context, +func (s *serverSigningKeyStatements) UpsertServerKeys( + ctx context.Context, txn *sql.Tx, request gomatrixserverlib.PublicKeyLookupRequest, key gomatrixserverlib.PublicKeyLookupResult, ) error { - return s.writer.Do(s.db, nil, func(txn *sql.Tx) error { - stmt := sqlutil.TxStmt(txn, s.upsertServerKeysStmt) - _, err := stmt.ExecContext( - ctx, - string(request.ServerName), - string(request.KeyID), - nameAndKeyID(request), - key.ValidUntilTS, - key.ExpiredTS, - key.Key.Encode(), - ) - return err - }) + stmt := sqlutil.TxStmt(txn, s.upsertServerKeysStmt) + _, err := stmt.ExecContext( + ctx, + string(request.ServerName), + string(request.KeyID), + nameAndKeyID(request), + key.ValidUntilTS, + key.ExpiredTS, + key.Key.Encode(), + ) + return err } func nameAndKeyID(request gomatrixserverlib.PublicKeyLookupRequest) string { diff --git a/federationsender/storage/sqlite3/storage.go b/federationapi/storage/sqlite3/storage.go index 18fa418f..0fe6df5d 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationapi/storage/sqlite3/storage.go @@ -18,8 +18,8 @@ package sqlite3 import ( "database/sql" - "github.com/matrix-org/dendrite/federationsender/storage/shared" - "github.com/matrix-org/dendrite/federationsender/storage/sqlite3/deltas" + "github.com/matrix-org/dendrite/federationapi/storage/shared" + "github.com/matrix-org/dendrite/federationapi/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" @@ -34,7 +34,7 @@ type Database struct { } // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (*Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache) (*Database, error) { var d Database var err error if d.db, err = sqlutil.Open(dbProperties); err != nil { @@ -77,24 +77,29 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } + serverSigningKeys, err := NewSQLiteServerSigningKeysTable(d.db) + if err != nil { + return nil, err + } m := sqlutil.NewMigrations() deltas.LoadRemoveRoomsTable(m) if err = m.RunDeltas(d.db, dbProperties); err != nil { return nil, err } d.Database = shared.Database{ - DB: d.db, - Cache: cache, - Writer: d.writer, - FederationSenderJoinedHosts: joinedHosts, - FederationSenderQueuePDUs: queuePDUs, - FederationSenderQueueEDUs: queueEDUs, - FederationSenderQueueJSON: queueJSON, - FederationSenderBlacklist: blacklist, - FederationSenderOutboundPeeks: outboundPeeks, - FederationSenderInboundPeeks: inboundPeeks, - NotaryServerKeysJSON: notaryKeys, - NotaryServerKeysMetadata: notaryKeysMetadata, + DB: d.db, + Cache: cache, + Writer: d.writer, + FederationJoinedHosts: joinedHosts, + FederationQueuePDUs: queuePDUs, + FederationQueueEDUs: queueEDUs, + FederationQueueJSON: queueJSON, + FederationBlacklist: blacklist, + FederationOutboundPeeks: outboundPeeks, + FederationInboundPeeks: inboundPeeks, + NotaryServerKeysJSON: notaryKeys, + NotaryServerKeysMetadata: notaryKeysMetadata, + ServerSigningKeys: serverSigningKeys, } if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil { return nil, err diff --git a/federationsender/storage/storage.go b/federationapi/storage/storage.go index 46e01f25..083f0b30 100644 --- a/federationsender/storage/storage.go +++ b/federationapi/storage/storage.go @@ -20,14 +20,14 @@ package storage import ( "fmt" - "github.com/matrix-org/dendrite/federationsender/storage/postgres" - "github.com/matrix-org/dendrite/federationsender/storage/sqlite3" + "github.com/matrix-org/dendrite/federationapi/storage/postgres" + "github.com/matrix-org/dendrite/federationapi/storage/sqlite3" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/setup/config" ) // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): return sqlite3.NewDatabase(dbProperties, cache) diff --git a/federationsender/storage/storage_wasm.go b/federationapi/storage/storage_wasm.go index bc52bd9b..455464e7 100644 --- a/federationsender/storage/storage_wasm.go +++ b/federationapi/storage/storage_wasm.go @@ -17,13 +17,13 @@ package storage import ( "fmt" - "github.com/matrix-org/dendrite/federationsender/storage/sqlite3" + "github.com/matrix-org/dendrite/federationapi/storage/sqlite3" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/setup/config" ) // NewDatabase opens a new database -func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationSenderCache) (Database, error) { +func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationCache) (Database, error) { switch { case dbProperties.ConnectionString.IsSQLite(): return sqlite3.NewDatabase(dbProperties, cache) diff --git a/federationsender/storage/tables/interface.go b/federationapi/storage/tables/interface.go index 663a4cb2..19357393 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationapi/storage/tables/interface.go @@ -18,13 +18,13 @@ import ( "context" "database/sql" - "github.com/matrix-org/dendrite/federationsender/types" + "github.com/matrix-org/dendrite/federationapi/types" "github.com/matrix-org/gomatrixserverlib" ) type NotaryID int64 -type FederationSenderQueuePDUs interface { +type FederationQueuePDUs interface { InsertQueuePDU(ctx context.Context, txn *sql.Tx, transactionID gomatrixserverlib.TransactionID, serverName gomatrixserverlib.ServerName, nid int64) error DeleteQueuePDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error SelectQueuePDUReferenceJSONCount(ctx context.Context, txn *sql.Tx, jsonNID int64) (int64, error) @@ -33,7 +33,7 @@ type FederationSenderQueuePDUs interface { SelectQueuePDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error) } -type FederationSenderQueueEDUs interface { +type FederationQueueEDUs interface { InsertQueueEDU(ctx context.Context, txn *sql.Tx, eduType string, serverName gomatrixserverlib.ServerName, nid int64) error DeleteQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, jsonNIDs []int64) error SelectQueueEDUs(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, limit int) ([]int64, error) @@ -42,13 +42,13 @@ type FederationSenderQueueEDUs interface { SelectQueueEDUServerNames(ctx context.Context, txn *sql.Tx) ([]gomatrixserverlib.ServerName, error) } -type FederationSenderQueueJSON interface { +type FederationQueueJSON interface { InsertQueueJSON(ctx context.Context, txn *sql.Tx, json string) (int64, error) DeleteQueueJSON(ctx context.Context, txn *sql.Tx, nids []int64) error SelectQueueJSON(ctx context.Context, txn *sql.Tx, jsonNIDs []int64) (map[int64][]byte, error) } -type FederationSenderJoinedHosts interface { +type FederationJoinedHosts interface { InsertJoinedHosts(ctx context.Context, txn *sql.Tx, roomID, eventID string, serverName gomatrixserverlib.ServerName) error DeleteJoinedHosts(ctx context.Context, txn *sql.Tx, eventIDs []string) error DeleteJoinedHostsForRoom(ctx context.Context, txn *sql.Tx, roomID string) error @@ -58,14 +58,14 @@ type FederationSenderJoinedHosts interface { SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) } -type FederationSenderBlacklist interface { +type FederationBlacklist interface { InsertBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error) DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error DeleteAllBlacklist(ctx context.Context, txn *sql.Tx) error } -type FederationSenderOutboundPeeks interface { +type FederationOutboundPeeks interface { InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error) @@ -74,7 +74,7 @@ type FederationSenderOutboundPeeks interface { DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) } -type FederationSenderInboundPeeks interface { +type FederationInboundPeeks interface { InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error) SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error) @@ -83,9 +83,9 @@ type FederationSenderInboundPeeks interface { DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error) } -// FederationSenderNotaryServerKeysJSON contains the byte-for-byte responses from servers which contain their keys and is signed by them. -type FederationSenderNotaryServerKeysJSON interface { - // InsertJSONResponse inserts a new response JSON. Useless on its own, needs querying via FederationSenderNotaryServerKeysMetadata +// FederationNotaryServerKeysJSON contains the byte-for-byte responses from servers which contain their keys and is signed by them. +type FederationNotaryServerKeysJSON interface { + // InsertJSONResponse inserts a new response JSON. Useless on its own, needs querying via FederationNotaryServerKeysMetadata // `validUntil` should be the value of `valid_until_ts` with the 7-day check applied from: // "Servers MUST use the lesser of this field and 7 days into the future when determining if a key is valid. // This is to avoid a situation where an attacker publishes a key which is valid for a significant amount of time @@ -93,14 +93,19 @@ type FederationSenderNotaryServerKeysJSON interface { InsertJSONResponse(ctx context.Context, txn *sql.Tx, keyQueryResponseJSON gomatrixserverlib.ServerKeys, serverName gomatrixserverlib.ServerName, validUntil gomatrixserverlib.Timestamp) (NotaryID, error) } -// FederationSenderNotaryServerKeysMetadata persists the metadata for FederationSenderNotaryServerKeysJSON -type FederationSenderNotaryServerKeysMetadata interface { +// FederationNotaryServerKeysMetadata persists the metadata for FederationNotaryServerKeysJSON +type FederationNotaryServerKeysMetadata interface { // UpsertKey updates or inserts a (server_name, key_id) tuple, pointing it via NotaryID at the the response which has the longest valid_until_ts // `newNotaryID` and `newValidUntil` should be the notary ID / valid_until which has this (server_name, key_id) tuple already, e.g one you just inserted. UpsertKey(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, keyID gomatrixserverlib.KeyID, newNotaryID NotaryID, newValidUntil gomatrixserverlib.Timestamp) (NotaryID, error) // SelectKeys returns the signed JSON objects which contain the given key IDs. This will be at most the length of `keyIDs` and at least 1 (assuming // the keys exist in the first place). If `keyIDs` is empty, the signed JSON object with the longest valid_until_ts will be returned. SelectKeys(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, keyIDs []gomatrixserverlib.KeyID) ([]gomatrixserverlib.ServerKeys, error) - // DeleteOldJSONResponses removes all responses which are not referenced in FederationSenderNotaryServerKeysMetadata + // DeleteOldJSONResponses removes all responses which are not referenced in FederationNotaryServerKeysMetadata DeleteOldJSONResponses(ctx context.Context, txn *sql.Tx) error } + +type FederationServerSigningKeys interface { + BulkSelectServerKeys(ctx context.Context, txn *sql.Tx, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) + UpsertServerKeys(ctx context.Context, txn *sql.Tx, request gomatrixserverlib.PublicKeyLookupRequest, key gomatrixserverlib.PublicKeyLookupResult) error +} diff --git a/federationsender/types/types.go b/federationapi/types/types.go index c486c05c..c486c05c 100644 --- a/federationsender/types/types.go +++ b/federationapi/types/types.go diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go deleted file mode 100644 index 0732c5d3..00000000 --- a/federationsender/federationsender.go +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package federationsender - -import ( - "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/federationsender/api" - "github.com/matrix-org/dendrite/federationsender/consumers" - "github.com/matrix-org/dendrite/federationsender/internal" - "github.com/matrix-org/dendrite/federationsender/inthttp" - "github.com/matrix-org/dendrite/federationsender/queue" - "github.com/matrix-org/dendrite/federationsender/statistics" - "github.com/matrix-org/dendrite/federationsender/storage" - roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup" - "github.com/matrix-org/dendrite/setup/kafka" - "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" -) - -// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions -// on the given input API. -func AddInternalRoutes(router *mux.Router, intAPI api.FederationSenderInternalAPI) { - inthttp.AddRoutes(intAPI, router) -} - -// NewInternalAPI returns a concerete implementation of the internal API. Callers -// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. -func NewInternalAPI( - base *setup.BaseDendrite, - federation *gomatrixserverlib.FederationClient, - rsAPI roomserverAPI.RoomserverInternalAPI, - keyRing *gomatrixserverlib.KeyRing, - resetBlacklist bool, -) api.FederationSenderInternalAPI { - cfg := &base.Cfg.FederationSender - - federationSenderDB, err := storage.NewDatabase(&cfg.Database, base.Caches) - if err != nil { - logrus.WithError(err).Panic("failed to connect to federation sender db") - } - - if resetBlacklist { - _ = federationSenderDB.RemoveAllServersFromBlacklist() - } - - stats := &statistics.Statistics{ - DB: federationSenderDB, - FailuresUntilBlacklist: cfg.FederationMaxRetries, - } - - consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) - - queues := queue.NewOutgoingQueues( - federationSenderDB, base.ProcessContext, - cfg.Matrix.DisableFederation, - cfg.Matrix.ServerName, federation, rsAPI, stats, - &queue.SigningInfo{ - KeyID: cfg.Matrix.KeyID, - PrivateKey: cfg.Matrix.PrivateKey, - ServerName: cfg.Matrix.ServerName, - }, - ) - - rsConsumer := consumers.NewOutputRoomEventConsumer( - base.ProcessContext, cfg, consumer, queues, - federationSenderDB, rsAPI, - ) - if err = rsConsumer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start room server consumer") - } - - tsConsumer := consumers.NewOutputEDUConsumer( - base.ProcessContext, cfg, consumer, queues, federationSenderDB, - ) - if err := tsConsumer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start typing server consumer") - } - keyConsumer := consumers.NewKeyChangeConsumer( - base.ProcessContext, &base.Cfg.KeyServer, consumer, queues, federationSenderDB, rsAPI, - ) - if err := keyConsumer.Start(); err != nil { - logrus.WithError(err).Panic("failed to start key server consumer") - } - - return internal.NewFederationSenderInternalAPI(federationSenderDB, cfg, rsAPI, federation, keyRing, stats, queues) -} diff --git a/internal/caching/cache_federationevents.go b/internal/caching/cache_federationevents.go index a48c11fd..d10b333a 100644 --- a/internal/caching/cache_federationevents.go +++ b/internal/caching/cache_federationevents.go @@ -12,19 +12,19 @@ const ( FederationEventCacheMutable = true // to allow use of Unset only ) -// FederationSenderCache contains the subset of functions needed for +// FederationCache contains the subset of functions needed for // a federation event cache. -type FederationSenderCache interface { - GetFederationSenderQueuedPDU(eventNID int64) (event *gomatrixserverlib.HeaderedEvent, ok bool) - StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) - EvictFederationSenderQueuedPDU(eventNID int64) +type FederationCache interface { + GetFederationQueuedPDU(eventNID int64) (event *gomatrixserverlib.HeaderedEvent, ok bool) + StoreFederationQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) + EvictFederationQueuedPDU(eventNID int64) - GetFederationSenderQueuedEDU(eventNID int64) (event *gomatrixserverlib.EDU, ok bool) - StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) - EvictFederationSenderQueuedEDU(eventNID int64) + GetFederationQueuedEDU(eventNID int64) (event *gomatrixserverlib.EDU, ok bool) + StoreFederationQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) + EvictFederationQueuedEDU(eventNID int64) } -func (c Caches) GetFederationSenderQueuedPDU(eventNID int64) (*gomatrixserverlib.HeaderedEvent, bool) { +func (c Caches) GetFederationQueuedPDU(eventNID int64) (*gomatrixserverlib.HeaderedEvent, bool) { key := fmt.Sprintf("%d", eventNID) val, found := c.FederationEvents.Get(key) if found && val != nil { @@ -35,17 +35,17 @@ func (c Caches) GetFederationSenderQueuedPDU(eventNID int64) (*gomatrixserverlib return nil, false } -func (c Caches) StoreFederationSenderQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) { +func (c Caches) StoreFederationQueuedPDU(eventNID int64, event *gomatrixserverlib.HeaderedEvent) { key := fmt.Sprintf("%d", eventNID) c.FederationEvents.Set(key, event) } -func (c Caches) EvictFederationSenderQueuedPDU(eventNID int64) { +func (c Caches) EvictFederationQueuedPDU(eventNID int64) { key := fmt.Sprintf("%d", eventNID) c.FederationEvents.Unset(key) } -func (c Caches) GetFederationSenderQueuedEDU(eventNID int64) (*gomatrixserverlib.EDU, bool) { +func (c Caches) GetFederationQueuedEDU(eventNID int64) (*gomatrixserverlib.EDU, bool) { key := fmt.Sprintf("%d", eventNID) val, found := c.FederationEvents.Get(key) if found && val != nil { @@ -56,12 +56,12 @@ func (c Caches) GetFederationSenderQueuedEDU(eventNID int64) (*gomatrixserverlib return nil, false } -func (c Caches) StoreFederationSenderQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) { +func (c Caches) StoreFederationQueuedEDU(eventNID int64, event *gomatrixserverlib.EDU) { key := fmt.Sprintf("%d", eventNID) c.FederationEvents.Set(key, event) } -func (c Caches) EvictFederationSenderQueuedEDU(eventNID int64) { +func (c Caches) EvictFederationQueuedEDU(eventNID int64) { key := fmt.Sprintf("%d", eventNID) c.FederationEvents.Unset(key) } diff --git a/internal/httputil/httpapi.go b/internal/httputil/httpapi.go index 704bdecb..1fbd77da 100644 --- a/internal/httputil/httpapi.go +++ b/internal/httputil/httpapi.go @@ -29,7 +29,7 @@ import ( "github.com/getsentry/sentry-go" "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/auth" - federationsenderAPI "github.com/matrix-org/dendrite/federationsender/api" + federationapiAPI "github.com/matrix-org/dendrite/federationapi/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -251,7 +251,7 @@ func MakeFedAPI( } type FederationWakeups struct { - FsAPI federationsenderAPI.FederationSenderInternalAPI + FsAPI federationapiAPI.FederationInternalAPI origins sync.Map } @@ -263,10 +263,10 @@ func (f *FederationWakeups) Wakeup(ctx context.Context, origin gomatrixserverlib return } } - aliveReq := federationsenderAPI.PerformServersAliveRequest{ + aliveReq := federationapiAPI.PerformServersAliveRequest{ Servers: []gomatrixserverlib.ServerName{origin}, } - aliveRes := federationsenderAPI.PerformServersAliveResponse{} + aliveRes := federationapiAPI.PerformServersAliveResponse{} if err := f.FsAPI.PerformServersAlive(ctx, &aliveReq, &aliveRes); err != nil { util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{ "origin": origin, diff --git a/internal/test/config.go b/internal/test/config.go index 7e68d6d2..6281e7f6 100644 --- a/internal/test/config.go +++ b/internal/test/config.go @@ -88,11 +88,10 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con // the table names are globally unique. But we might not want to // rely on that in the future. cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(database) - cfg.FederationSender.Database.ConnectionString = config.DataSource(database) + cfg.FederationAPI.Database.ConnectionString = config.DataSource(database) cfg.KeyServer.Database.ConnectionString = config.DataSource(database) cfg.MediaAPI.Database.ConnectionString = config.DataSource(database) cfg.RoomServer.Database.ConnectionString = config.DataSource(database) - cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(database) cfg.SyncAPI.Database.ConnectionString = config.DataSource(database) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(database) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(database) @@ -100,22 +99,18 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.AppServiceAPI.InternalAPI.Listen = assignAddress() cfg.EDUServer.InternalAPI.Listen = assignAddress() cfg.FederationAPI.InternalAPI.Listen = assignAddress() - cfg.FederationSender.InternalAPI.Listen = assignAddress() cfg.KeyServer.InternalAPI.Listen = assignAddress() cfg.MediaAPI.InternalAPI.Listen = assignAddress() cfg.RoomServer.InternalAPI.Listen = assignAddress() - cfg.SigningKeyServer.InternalAPI.Listen = assignAddress() cfg.SyncAPI.InternalAPI.Listen = assignAddress() cfg.UserAPI.InternalAPI.Listen = assignAddress() cfg.AppServiceAPI.InternalAPI.Connect = cfg.AppServiceAPI.InternalAPI.Listen cfg.EDUServer.InternalAPI.Connect = cfg.EDUServer.InternalAPI.Listen cfg.FederationAPI.InternalAPI.Connect = cfg.FederationAPI.InternalAPI.Listen - cfg.FederationSender.InternalAPI.Connect = cfg.FederationSender.InternalAPI.Listen cfg.KeyServer.InternalAPI.Connect = cfg.KeyServer.InternalAPI.Listen cfg.MediaAPI.InternalAPI.Connect = cfg.MediaAPI.InternalAPI.Listen cfg.RoomServer.InternalAPI.Connect = cfg.RoomServer.InternalAPI.Listen - cfg.SigningKeyServer.InternalAPI.Connect = cfg.SigningKeyServer.InternalAPI.Listen cfg.SyncAPI.InternalAPI.Connect = cfg.SyncAPI.InternalAPI.Listen cfg.UserAPI.InternalAPI.Connect = cfg.UserAPI.InternalAPI.Listen diff --git a/keyserver/internal/device_list_update.go b/keyserver/internal/device_list_update.go index 1f7c6e2a..1b6e2d42 100644 --- a/keyserver/internal/device_list_update.go +++ b/keyserver/internal/device_list_update.go @@ -22,7 +22,7 @@ import ( "sync" "time" - fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" + fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go index a546e94b..3e91962e 100644 --- a/keyserver/internal/internal.go +++ b/keyserver/internal/internal.go @@ -22,7 +22,7 @@ import ( "sync" "time" - fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" + fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/storage" diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 60306755..477efafd 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -16,14 +16,14 @@ package keyserver import ( "github.com/gorilla/mux" - fedsenderapi "github.com/matrix-org/dendrite/federationsender/api" + fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/keyserver/consumers" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/producers" "github.com/matrix-org/dendrite/keyserver/storage" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/kafka" "github.com/sirupsen/logrus" @@ -38,7 +38,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - base *setup.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, + base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient, ) api.KeyInternalAPI { consumer, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) diff --git a/roomserver/api/api.go b/roomserver/api/api.go index 72e406ee..1e882ca1 100644 --- a/roomserver/api/api.go +++ b/roomserver/api/api.go @@ -4,15 +4,17 @@ import ( "context" asAPI "github.com/matrix-org/dendrite/appservice/api" - fsAPI "github.com/matrix-org/dendrite/federationsender/api" + fsAPI "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/gomatrixserverlib" ) // RoomserverInputAPI is used to write events to the room server. type RoomserverInternalAPI interface { // needed to avoid chicken and egg scenario when setting up the // interdependencies between the roomserver and other input APIs - SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) + SetFederationAPI(fsAPI fsAPI.FederationInternalAPI) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) + SetKeyring(keyRing *gomatrixserverlib.KeyRing) InputRoomEvents( ctx context.Context, diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go index 1a2b9a49..cb8c471a 100644 --- a/roomserver/api/api_trace.go +++ b/roomserver/api/api_trace.go @@ -6,7 +6,8 @@ import ( "fmt" asAPI "github.com/matrix-org/dendrite/appservice/api" - fsAPI "github.com/matrix-org/dendrite/federationsender/api" + fsAPI "github.com/matrix-org/dendrite/federationapi/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" ) @@ -16,8 +17,12 @@ type RoomserverInternalAPITrace struct { Impl RoomserverInternalAPI } -func (t *RoomserverInternalAPITrace) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) { - t.Impl.SetFederationSenderAPI(fsAPI) +func (t *RoomserverInternalAPITrace) SetKeyring(keyRing *gomatrixserverlib.KeyRing) { + t.Impl.SetKeyring(keyRing) +} + +func (t *RoomserverInternalAPITrace) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI) { + t.Impl.SetFederationAPI(fsAPI) } func (t *RoomserverInternalAPITrace) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) { diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index f39b26ea..bda585cf 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -6,7 +6,7 @@ import ( "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" asAPI "github.com/matrix-org/dendrite/appservice/api" - fsAPI "github.com/matrix-org/dendrite/federationsender/api" + fsAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/api" @@ -37,7 +37,7 @@ type RoomserverInternalAPI struct { Cache caching.RoomServerCaches ServerName gomatrixserverlib.ServerName KeyRing gomatrixserverlib.JSONVerifier - fsAPI fsAPI.FederationSenderInternalAPI + fsAPI fsAPI.FederationInternalAPI asAPI asAPI.AppServiceQueryAPI OutputRoomEventTopic string // Kafka topic for new output room events PerspectiveServerNames []gomatrixserverlib.ServerName @@ -46,7 +46,7 @@ type RoomserverInternalAPI struct { func NewRoomserverAPI( cfg *config.RoomServer, roomserverDB storage.Database, producer sarama.SyncProducer, outputRoomEventTopic string, caches caching.RoomServerCaches, - keyRing gomatrixserverlib.JSONVerifier, perspectiveServerNames []gomatrixserverlib.ServerName, + perspectiveServerNames []gomatrixserverlib.ServerName, ) *RoomserverInternalAPI { serverACLs := acls.NewServerACLs(roomserverDB) a := &RoomserverInternalAPI{ @@ -55,7 +55,6 @@ func NewRoomserverAPI( Cache: caches, ServerName: cfg.Matrix.ServerName, PerspectiveServerNames: perspectiveServerNames, - KeyRing: keyRing, Queryer: &query.Queryer{ DB: roomserverDB, Cache: caches, @@ -74,11 +73,18 @@ func NewRoomserverAPI( return a } -// SetFederationSenderInputAPI passes in a federation sender input API reference -// so that we can avoid the chicken-and-egg problem of both the roomserver input API -// and the federation sender input API being interdependent. -func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) { +// SetKeyring sets the keyring to a given keyring. This is only useful for the P2P +// demos and must be called after SetFederationSenderInputAPI. +func (r *RoomserverInternalAPI) SetKeyring(keyRing *gomatrixserverlib.KeyRing) { + r.KeyRing = keyRing +} + +// SetFederationInputAPI passes in a federation input API reference so that we can +// avoid the chicken-and-egg problem of both the roomserver input API and the +// federation input API being interdependent. +func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI) { r.fsAPI = fsAPI + r.SetKeyring(fsAPI.KeyRing()) r.Inviter = &perform.Inviter{ DB: r.DB, diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index 23b1f737..8c2477de 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -18,7 +18,7 @@ import ( "context" "fmt" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/auth" @@ -38,7 +38,7 @@ const maxBackfillServers = 5 type Backfiller struct { ServerName gomatrixserverlib.ServerName DB storage.Database - FSAPI federationSenderAPI.FederationSenderInternalAPI + FSAPI federationAPI.FederationInternalAPI KeyRing gomatrixserverlib.JSONVerifier // The servers which should be preferred above other servers when backfilling @@ -224,7 +224,7 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom // backfillRequester implements gomatrixserverlib.BackfillRequester type backfillRequester struct { db storage.Database - fsAPI federationSenderAPI.FederationSenderInternalAPI + fsAPI federationAPI.FederationInternalAPI thisServer gomatrixserverlib.ServerName preferServer map[gomatrixserverlib.ServerName]bool bwExtrems map[string][]string @@ -236,7 +236,7 @@ type backfillRequester struct { } func newBackfillRequester( - db storage.Database, fsAPI federationSenderAPI.FederationSenderInternalAPI, thisServer gomatrixserverlib.ServerName, + db storage.Database, fsAPI federationAPI.FederationInternalAPI, thisServer gomatrixserverlib.ServerName, bwExtrems map[string][]string, preferServers []gomatrixserverlib.ServerName, ) *backfillRequester { preferServer := make(map[gomatrixserverlib.ServerName]bool) diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go index eb3c9727..98f5f6f9 100644 --- a/roomserver/internal/perform/perform_inbound_peek.go +++ b/roomserver/internal/perform/perform_inbound_peek.go @@ -34,7 +34,7 @@ type InboundPeeker struct { } // PerformInboundPeek handles peeking into matrix rooms, including over -// federation by talking to the federationsender. called when a remote server +// federation by talking to the federationapi. called when a remote server // initiates a /peek over federation. // // It should atomically figure out the current state of the room (for the diff --git a/roomserver/internal/perform/perform_invite.go b/roomserver/internal/perform/perform_invite.go index 024c0930..ca065468 100644 --- a/roomserver/internal/perform/perform_invite.go +++ b/roomserver/internal/perform/perform_invite.go @@ -18,7 +18,7 @@ import ( "context" "fmt" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/internal/input" @@ -33,7 +33,7 @@ import ( type Inviter struct { DB storage.Database Cfg *config.RoomServer - FSAPI federationSenderAPI.FederationSenderInternalAPI + FSAPI federationAPI.FederationInternalAPI Inputer *input.Inputer } @@ -146,12 +146,12 @@ func (r *Inviter) PerformInvite( // that the remote user doesn't exist, in which case we can give up // processing here. if req.SendAsServer != api.DoNotSendToOtherServers && !isTargetLocal { - fsReq := &federationSenderAPI.PerformInviteRequest{ + fsReq := &federationAPI.PerformInviteRequest{ RoomVersion: req.RoomVersion, Event: event, InviteRoomState: inviteState, } - fsRes := &federationSenderAPI.PerformInviteResponse{} + fsRes := &federationAPI.PerformInviteResponse{} if err = r.FSAPI.PerformInvite(ctx, fsReq, fsRes); err != nil { res.Error = &api.PerformError{ Msg: err.Error(), diff --git a/roomserver/internal/perform/perform_join.go b/roomserver/internal/perform/perform_join.go index 772c9d7d..75397eb6 100644 --- a/roomserver/internal/perform/perform_join.go +++ b/roomserver/internal/perform/perform_join.go @@ -22,7 +22,7 @@ import ( "time" "github.com/getsentry/sentry-go" - fsAPI "github.com/matrix-org/dendrite/federationsender/api" + fsAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/eventutil" rsAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/helpers" @@ -37,7 +37,7 @@ import ( type Joiner struct { ServerName gomatrixserverlib.ServerName Cfg *config.RoomServer - FSAPI fsAPI.FederationSenderInternalAPI + FSAPI fsAPI.FederationInternalAPI RSAPI rsAPI.RoomserverInternalAPI DB storage.Database @@ -45,7 +45,7 @@ type Joiner struct { Queryer *query.Queryer } -// PerformJoin handles joining matrix rooms, including over federation by talking to the federationsender. +// PerformJoin handles joining matrix rooms, including over federation by talking to the federationapi. func (r *Joiner) PerformJoin( ctx context.Context, req *rsAPI.PerformJoinRequest, diff --git a/roomserver/internal/perform/perform_leave.go b/roomserver/internal/perform/perform_leave.go index a51de546..4daeb10a 100644 --- a/roomserver/internal/perform/perform_leave.go +++ b/roomserver/internal/perform/perform_leave.go @@ -19,7 +19,7 @@ import ( "fmt" "strings" - fsAPI "github.com/matrix-org/dendrite/federationsender/api" + fsAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/internal/input" @@ -32,7 +32,7 @@ import ( type Leaver struct { Cfg *config.RoomServer DB storage.Database - FSAPI fsAPI.FederationSenderInternalAPI + FSAPI fsAPI.FederationInternalAPI Inputer *input.Inputer } diff --git a/roomserver/internal/perform/perform_peek.go b/roomserver/internal/perform/perform_peek.go index bd799667..6a2c329b 100644 --- a/roomserver/internal/perform/perform_peek.go +++ b/roomserver/internal/perform/perform_peek.go @@ -20,7 +20,7 @@ import ( "fmt" "strings" - fsAPI "github.com/matrix-org/dendrite/federationsender/api" + fsAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/storage" @@ -33,13 +33,13 @@ import ( type Peeker struct { ServerName gomatrixserverlib.ServerName Cfg *config.RoomServer - FSAPI fsAPI.FederationSenderInternalAPI + FSAPI fsAPI.FederationInternalAPI DB storage.Database Inputer *input.Inputer } -// PerformPeek handles peeking into matrix rooms, including over federation by talking to the federationsender. +// PerformPeek handles peeking into matrix rooms, including over federation by talking to the federationapi. func (r *Peeker) PerformPeek( ctx context.Context, req *api.PerformPeekRequest, diff --git a/roomserver/internal/perform/perform_unpeek.go b/roomserver/internal/perform/perform_unpeek.go index f71e0007..16b4eeae 100644 --- a/roomserver/internal/perform/perform_unpeek.go +++ b/roomserver/internal/perform/perform_unpeek.go @@ -19,7 +19,7 @@ import ( "fmt" "strings" - fsAPI "github.com/matrix-org/dendrite/federationsender/api" + fsAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/storage" @@ -30,13 +30,13 @@ import ( type Unpeeker struct { ServerName gomatrixserverlib.ServerName Cfg *config.RoomServer - FSAPI fsAPI.FederationSenderInternalAPI + FSAPI fsAPI.FederationInternalAPI DB storage.Database Inputer *input.Inputer } -// PerformPeek handles peeking into matrix rooms, including over federation by talking to the federationsender. +// PerformPeek handles peeking into matrix rooms, including over federation by talking to the federationapi. func (r *Unpeeker) PerformUnpeek( ctx context.Context, req *api.PerformUnpeekRequest, diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go index 6774d102..2afeb8b1 100644 --- a/roomserver/inthttp/client.go +++ b/roomserver/inthttp/client.go @@ -7,10 +7,11 @@ import ( "net/http" asAPI "github.com/matrix-org/dendrite/appservice/api" - fsInputAPI "github.com/matrix-org/dendrite/federationsender/api" + fsInputAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/gomatrixserverlib" "github.com/opentracing/opentracing-go" ) @@ -82,8 +83,12 @@ func NewRoomserverClient( }, nil } -// SetFederationSenderInputAPI no-ops in HTTP client mode as there is no chicken/egg scenario -func (h *httpRoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsInputAPI.FederationSenderInternalAPI) { +// SetKeyring no-ops in HTTP client mode as there is no chicken/egg scenario +func (h *httpRoomserverInternalAPI) SetKeyring(keyRing *gomatrixserverlib.KeyRing) { +} + +// SetFederationInputAPI no-ops in HTTP client mode as there is no chicken/egg scenario +func (h *httpRoomserverInternalAPI) SetFederationAPI(fsAPI fsInputAPI.FederationInternalAPI) { } // SetAppserviceAPI no-ops in HTTP client mode as there is no chicken/egg scenario diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index 396a1def..e4742100 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -22,7 +22,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/internal" "github.com/matrix-org/dendrite/roomserver/storage" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/kafka" "github.com/sirupsen/logrus" @@ -37,15 +37,14 @@ func AddInternalRoutes(router *mux.Router, intAPI api.RoomserverInternalAPI) { // NewInternalAPI returns a concerete implementation of the internal API. Callers // can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. func NewInternalAPI( - base *setup.BaseDendrite, - keyRing gomatrixserverlib.JSONVerifier, + base *base.BaseDendrite, ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) var perspectiveServerNames []gomatrixserverlib.ServerName - for _, kp := range base.Cfg.SigningKeyServer.KeyPerspectives { + for _, kp := range base.Cfg.FederationAPI.KeyPerspectives { perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) } @@ -56,6 +55,6 @@ func NewInternalAPI( return internal.NewRoomserverAPI( cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), - base.Caches, keyRing, perspectiveServerNames, + base.Caches, perspectiveServerNames, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 5c954007..8d92d8d8 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -13,11 +13,10 @@ import ( "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal/caching" - "github.com/matrix-org/dendrite/internal/test" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal" "github.com/matrix-org/dendrite/roomserver/storage" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -172,7 +171,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro if err != nil { t.Fatalf("failed to make caches: %s", err) } - base := &setup.BaseDendrite{ + base := &base.BaseDendrite{ Caches: cache, Cfg: cfg, } @@ -182,7 +181,7 @@ func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyPro } return internal.NewRoomserverAPI( &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent)), - base.Caches, &test.NopJSONVerifier{}, nil, + base.Caches, nil, ), dp } diff --git a/setup/base.go b/setup/base/base.go index d4acdbfb..9ba88bef 100644 --- a/setup/base.go +++ b/setup/base/base.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package setup +package base import ( "context" @@ -47,15 +47,13 @@ import ( asinthttp "github.com/matrix-org/dendrite/appservice/inthttp" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" - fsinthttp "github.com/matrix-org/dendrite/federationsender/inthttp" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" + federationIntHTTP "github.com/matrix-org/dendrite/federationapi/inthttp" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" keyinthttp "github.com/matrix-org/dendrite/keyserver/inthttp" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" rsinthttp "github.com/matrix-org/dendrite/roomserver/inthttp" "github.com/matrix-org/dendrite/setup/config" - skapi "github.com/matrix-org/dendrite/signingkeyserver/api" - skinthttp "github.com/matrix-org/dendrite/signingkeyserver/inthttp" userapi "github.com/matrix-org/dendrite/userapi/api" userapiinthttp "github.com/matrix-org/dendrite/userapi/inthttp" "github.com/sirupsen/logrus" @@ -89,15 +87,33 @@ type BaseDendrite struct { // KafkaProducer sarama.SyncProducer } +const NoListener = "" + const HTTPServerTimeout = time.Minute * 5 const HTTPClientTimeout = time.Second * 30 -const NoListener = "" +type BaseDendriteOptions int + +const ( + NoCacheMetrics BaseDendriteOptions = iota + UseHTTPAPIs +) // NewBaseDendrite creates a new instance to be used by a component. // The componentName is used for logging purposes, and should be a friendly name // of the compontent running, e.g. "SyncAPI" -func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs bool) *BaseDendrite { +func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...BaseDendriteOptions) *BaseDendrite { + useHTTPAPIs := false + cacheMetrics := true + for _, opt := range options { + switch opt { + case NoCacheMetrics: + cacheMetrics = false + case UseHTTPAPIs: + useHTTPAPIs = true + } + } + configErrors := &config.ConfigErrors{} cfg.Verify(configErrors, componentName == "Monolith") // TODO: better way? if len(*configErrors) > 0 { @@ -133,7 +149,7 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo } } - cache, err := caching.NewInMemoryLRUCache(true) + cache, err := caching.NewInMemoryLRUCache(cacheMetrics) if err != nil { logrus.WithError(err).Warnf("Failed to create cache") } @@ -168,10 +184,10 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, useHTTPAPIs boo }, } client := http.Client{Timeout: HTTPClientTimeout} - if cfg.FederationSender.Proxy.Enabled { + if cfg.FederationAPI.Proxy.Enabled { client.Transport = &http.Transport{Proxy: http.ProxyURL(&url.URL{ - Scheme: cfg.FederationSender.Proxy.Protocol, - Host: fmt.Sprintf("%s:%d", cfg.FederationSender.Proxy.Host, cfg.FederationSender.Proxy.Port), + Scheme: cfg.FederationAPI.Proxy.Protocol, + Host: fmt.Sprintf("%s:%d", cfg.FederationAPI.Proxy.Host, cfg.FederationAPI.Proxy.Port), })} } @@ -248,25 +264,12 @@ func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI { return e } -// FederationSenderHTTPClient returns FederationSenderInternalAPI for hitting -// the federation sender over HTTP -func (b *BaseDendrite) FederationSenderHTTPClient() federationSenderAPI.FederationSenderInternalAPI { - f, err := fsinthttp.NewFederationSenderClient(b.Cfg.FederationSenderURL(), b.apiHttpClient) - if err != nil { - logrus.WithError(err).Panic("FederationSenderHTTPClient failed", b.apiHttpClient) - } - return f -} - -// SigningKeyServerHTTPClient returns SigningKeyServer for hitting the signing key server over HTTP -func (b *BaseDendrite) SigningKeyServerHTTPClient() skapi.SigningKeyServerAPI { - f, err := skinthttp.NewSigningKeyServerClient( - b.Cfg.SigningKeyServerURL(), - b.apiHttpClient, - b.Caches, - ) +// FederationAPIHTTPClient returns FederationInternalAPI for hitting +// the federation API server over HTTP +func (b *BaseDendrite) FederationAPIHTTPClient() federationAPI.FederationInternalAPI { + f, err := federationIntHTTP.NewFederationAPIClient(b.Cfg.FederationAPIURL(), b.apiHttpClient, b.Caches) if err != nil { - logrus.WithError(err).Panic("SigningKeyServerHTTPClient failed", b.httpClient) + logrus.WithError(err).Panic("FederationAPIHTTPClient failed", b.apiHttpClient) } return f } @@ -300,7 +303,7 @@ func (b *BaseDendrite) CreateClient() *gomatrixserverlib.Client { ) } opts := []gomatrixserverlib.ClientOption{ - gomatrixserverlib.WithSkipVerify(b.Cfg.FederationSender.DisableTLSValidation), + gomatrixserverlib.WithSkipVerify(b.Cfg.FederationAPI.DisableTLSValidation), } if b.Cfg.Global.DNSCache.Enabled { opts = append(opts, gomatrixserverlib.WithDNSCache(b.DNSCache)) @@ -321,7 +324,7 @@ func (b *BaseDendrite) CreateFederationClient() *gomatrixserverlib.FederationCli } opts := []gomatrixserverlib.ClientOption{ gomatrixserverlib.WithTimeout(time.Minute * 5), - gomatrixserverlib.WithSkipVerify(b.Cfg.FederationSender.DisableTLSValidation), + gomatrixserverlib.WithSkipVerify(b.Cfg.FederationAPI.DisableTLSValidation), } if b.Cfg.Global.DNSCache.Enabled { opts = append(opts, gomatrixserverlib.WithDNSCache(b.DNSCache)) diff --git a/setup/federation.go b/setup/base/federation.go index 7e9a22b3..804b74e5 100644 --- a/setup/federation.go +++ b/setup/base/federation.go @@ -1,4 +1,4 @@ -package setup +package base import ( "context" diff --git a/setup/config/config.go b/setup/config/config.go index b9114407..44a67c2a 100644 --- a/setup/config/config.go +++ b/setup/config/config.go @@ -53,18 +53,16 @@ type Dendrite struct { // been a breaking change to the config file format. Version int `yaml:"version"` - Global Global `yaml:"global"` - AppServiceAPI AppServiceAPI `yaml:"app_service_api"` - ClientAPI ClientAPI `yaml:"client_api"` - EDUServer EDUServer `yaml:"edu_server"` - FederationAPI FederationAPI `yaml:"federation_api"` - FederationSender FederationSender `yaml:"federation_sender"` - KeyServer KeyServer `yaml:"key_server"` - MediaAPI MediaAPI `yaml:"media_api"` - RoomServer RoomServer `yaml:"room_server"` - SigningKeyServer SigningKeyServer `yaml:"signing_key_server"` - SyncAPI SyncAPI `yaml:"sync_api"` - UserAPI UserAPI `yaml:"user_api"` + Global Global `yaml:"global"` + AppServiceAPI AppServiceAPI `yaml:"app_service_api"` + ClientAPI ClientAPI `yaml:"client_api"` + EDUServer EDUServer `yaml:"edu_server"` + FederationAPI FederationAPI `yaml:"federation_api"` + KeyServer KeyServer `yaml:"key_server"` + MediaAPI MediaAPI `yaml:"media_api"` + RoomServer RoomServer `yaml:"room_server"` + SyncAPI SyncAPI `yaml:"sync_api"` + UserAPI UserAPI `yaml:"user_api"` MSCs MSCs `yaml:"mscs"` @@ -300,11 +298,9 @@ func (c *Dendrite) Defaults() { c.ClientAPI.Defaults() c.EDUServer.Defaults() c.FederationAPI.Defaults() - c.FederationSender.Defaults() c.KeyServer.Defaults() c.MediaAPI.Defaults() c.RoomServer.Defaults() - c.SigningKeyServer.Defaults() c.SyncAPI.Defaults() c.UserAPI.Defaults() c.AppServiceAPI.Defaults() @@ -319,9 +315,9 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) { } for _, c := range []verifiable{ &c.Global, &c.ClientAPI, - &c.EDUServer, &c.FederationAPI, &c.FederationSender, + &c.EDUServer, &c.FederationAPI, &c.KeyServer, &c.MediaAPI, &c.RoomServer, - &c.SigningKeyServer, &c.SyncAPI, &c.UserAPI, + &c.SyncAPI, &c.UserAPI, &c.AppServiceAPI, &c.MSCs, } { c.Verify(configErrs, isMonolith) @@ -332,11 +328,9 @@ func (c *Dendrite) Wiring() { c.ClientAPI.Matrix = &c.Global c.EDUServer.Matrix = &c.Global c.FederationAPI.Matrix = &c.Global - c.FederationSender.Matrix = &c.Global c.KeyServer.Matrix = &c.Global c.MediaAPI.Matrix = &c.Global c.RoomServer.Matrix = &c.Global - c.SigningKeyServer.Matrix = &c.Global c.SyncAPI.Matrix = &c.Global c.UserAPI.Matrix = &c.Global c.AppServiceAPI.Matrix = &c.Global @@ -493,6 +487,15 @@ func (config *Dendrite) AppServiceURL() string { return string(config.AppServiceAPI.InternalAPI.Connect) } +// FederationAPIURL returns an HTTP URL for where the federation API is listening. +func (config *Dendrite) FederationAPIURL() string { + // Hard code the federationapi to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return string(config.FederationAPI.InternalAPI.Connect) +} + // RoomServerURL returns an HTTP URL for where the roomserver is listening. func (config *Dendrite) RoomServerURL() string { // Hard code the roomserver to talk HTTP for now. @@ -520,24 +523,6 @@ func (config *Dendrite) EDUServerURL() string { return string(config.EDUServer.InternalAPI.Connect) } -// FederationSenderURL returns an HTTP URL for where the federation sender is listening. -func (config *Dendrite) FederationSenderURL() string { - // Hard code the federation sender server to talk HTTP for now. - // If we support HTTPS we need to think of a practical way to do certificate validation. - // People setting up servers shouldn't need to get a certificate valid for the public - // internet for an internal API. - return string(config.FederationSender.InternalAPI.Connect) -} - -// SigningKeyServerURL returns an HTTP URL for where the signing key server is listening. -func (config *Dendrite) SigningKeyServerURL() string { - // Hard code the signing key server to talk HTTP for now. - // If we support HTTPS we need to think of a practical way to do certificate validation. - // People setting up servers shouldn't need to get a certificate valid for the public - // internet for an internal API. - return string(config.SigningKeyServer.InternalAPI.Connect) -} - // KeyServerURL returns an HTTP URL for where the key server is listening. func (config *Dendrite) KeyServerURL() string { // Hard code the key server to talk HTTP for now. diff --git a/setup/config/config_federationapi.go b/setup/config/config_federationapi.go index 64803d95..cd5015fd 100644 --- a/setup/config/config_federationapi.go +++ b/setup/config/config_federationapi.go @@ -1,23 +1,55 @@ package config +import "github.com/matrix-org/gomatrixserverlib" + type FederationAPI struct { Matrix *Global `yaml:"-"` InternalAPI InternalAPIOptions `yaml:"internal_api"` ExternalAPI ExternalAPIOptions `yaml:"external_api"` + // The database stores information used by the federation destination queues to + // send transactions to remote servers. + Database DatabaseOptions `yaml:"database"` + // List of paths to X509 certificates used by the external federation listeners. // These are used to calculate the TLS fingerprints to publish for this server. // Other matrix servers talking to this server will expect the x509 certificate // to match one of these certificates. // The certificates should be in PEM format. FederationCertificatePaths []Path `yaml:"federation_certificates"` + + // Federation failure threshold. How many consecutive failures that we should + // tolerate when sending federation requests to a specific server. The backoff + // is 2**x seconds, so 1 = 2 seconds, 2 = 4 seconds, 3 = 8 seconds, etc. + // The default value is 16 if not specified, which is circa 18 hours. + FederationMaxRetries uint32 `yaml:"send_max_retries"` + + // FederationDisableTLSValidation disables the validation of X.509 TLS certs + // on remote federation endpoints. This is not recommended in production! + DisableTLSValidation bool `yaml:"disable_tls_validation"` + + Proxy Proxy `yaml:"proxy_outbound"` + + // Perspective keyservers, to use as a backup when direct key fetch + // requests don't succeed + KeyPerspectives KeyPerspectives `yaml:"key_perspectives"` + + // Should we prefer direct key fetches over perspective ones? + PreferDirectFetch bool `yaml:"prefer_direct_fetch"` } func (c *FederationAPI) Defaults() { c.InternalAPI.Listen = "http://localhost:7772" c.InternalAPI.Connect = "http://localhost:7772" c.ExternalAPI.Listen = "http://[::]:8072" + c.Database.Defaults(10) + c.Database.ConnectionString = "file:federationapi.db" + + c.FederationMaxRetries = 16 + c.DisableTLSValidation = false + + c.Proxy.Defaults() } func (c *FederationAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { @@ -26,6 +58,48 @@ func (c *FederationAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { if !isMonolith { checkURL(configErrs, "federation_api.external_api.listen", string(c.ExternalAPI.Listen)) } + checkNotEmpty(configErrs, "federation_api.database.connection_string", string(c.Database.ConnectionString)) // TODO: not applicable always, e.g. in demos //checkNotZero(configErrs, "federation_api.federation_certificates", int64(len(c.FederationCertificatePaths))) } + +// The config for setting a proxy to use for server->server requests +type Proxy struct { + // Is the proxy enabled? + Enabled bool `yaml:"enabled"` + // The protocol for the proxy (http / https / socks5) + Protocol string `yaml:"protocol"` + // The host where the proxy is listening + Host string `yaml:"host"` + // The port on which the proxy is listening + Port uint16 `yaml:"port"` +} + +func (c *Proxy) Defaults() { + c.Enabled = false + c.Protocol = "http" + c.Host = "localhost" + c.Port = 8080 +} + +func (c *Proxy) Verify(configErrs *ConfigErrors) { +} + +// KeyPerspectives are used to configure perspective key servers for +// retrieving server keys. +type KeyPerspectives []KeyPerspective + +type KeyPerspective struct { + // The server name of the perspective key server + ServerName gomatrixserverlib.ServerName `yaml:"server_name"` + // Server keys for the perspective user, used to verify the + // keys have been signed by the perspective server + Keys []KeyPerspectiveTrustKey `yaml:"keys"` +} + +type KeyPerspectiveTrustKey struct { + // The key ID, e.g. ed25519:auto + KeyID gomatrixserverlib.KeyID `yaml:"key_id"` + // The public key in base64 unpadded format + PublicKey string `yaml:"public_key"` +} diff --git a/setup/config/config_federationsender.go b/setup/config/config_federationsender.go deleted file mode 100644 index 67ee6356..00000000 --- a/setup/config/config_federationsender.go +++ /dev/null @@ -1,63 +0,0 @@ -package config - -type FederationSender struct { - Matrix *Global `yaml:"-"` - - InternalAPI InternalAPIOptions `yaml:"internal_api"` - - // The FederationSender database stores information used by the FederationSender - // It is only accessed by the FederationSender. - Database DatabaseOptions `yaml:"database"` - - // Federation failure threshold. How many consecutive failures that we should - // tolerate when sending federation requests to a specific server. The backoff - // is 2**x seconds, so 1 = 2 seconds, 2 = 4 seconds, 3 = 8 seconds, etc. - // The default value is 16 if not specified, which is circa 18 hours. - FederationMaxRetries uint32 `yaml:"send_max_retries"` - - // FederationDisableTLSValidation disables the validation of X.509 TLS certs - // on remote federation endpoints. This is not recommended in production! - DisableTLSValidation bool `yaml:"disable_tls_validation"` - - Proxy Proxy `yaml:"proxy_outbound"` -} - -func (c *FederationSender) Defaults() { - c.InternalAPI.Listen = "http://localhost:7775" - c.InternalAPI.Connect = "http://localhost:7775" - c.Database.Defaults(10) - c.Database.ConnectionString = "file:federationsender.db" - - c.FederationMaxRetries = 16 - c.DisableTLSValidation = false - - c.Proxy.Defaults() -} - -func (c *FederationSender) Verify(configErrs *ConfigErrors, isMonolith bool) { - checkURL(configErrs, "federation_sender.internal_api.listen", string(c.InternalAPI.Listen)) - checkURL(configErrs, "federation_sender.internal_api.connect", string(c.InternalAPI.Connect)) - checkNotEmpty(configErrs, "federation_sender.database.connection_string", string(c.Database.ConnectionString)) -} - -// The config for setting a proxy to use for server->server requests -type Proxy struct { - // Is the proxy enabled? - Enabled bool `yaml:"enabled"` - // The protocol for the proxy (http / https / socks5) - Protocol string `yaml:"protocol"` - // The host where the proxy is listening - Host string `yaml:"host"` - // The port on which the proxy is listening - Port uint16 `yaml:"port"` -} - -func (c *Proxy) Defaults() { - c.Enabled = false - c.Protocol = "http" - c.Host = "localhost" - c.Port = 8080 -} - -func (c *Proxy) Verify(configErrs *ConfigErrors) { -} diff --git a/setup/config/config_signingkeyserver.go b/setup/config/config_signingkeyserver.go deleted file mode 100644 index c192d1fc..00000000 --- a/setup/config/config_signingkeyserver.go +++ /dev/null @@ -1,52 +0,0 @@ -package config - -import "github.com/matrix-org/gomatrixserverlib" - -type SigningKeyServer struct { - Matrix *Global `yaml:"-"` - - InternalAPI InternalAPIOptions `yaml:"internal_api"` - - // The SigningKeyServer database caches the public keys of remote servers. - // It may be accessed by the FederationAPI, the ClientAPI, and the MediaAPI. - Database DatabaseOptions `yaml:"database"` - - // Perspective keyservers, to use as a backup when direct key fetch - // requests don't succeed - KeyPerspectives KeyPerspectives `yaml:"key_perspectives"` - - // Should we prefer direct key fetches over perspective ones? - PreferDirectFetch bool `yaml:"prefer_direct_fetch"` -} - -func (c *SigningKeyServer) Defaults() { - c.InternalAPI.Listen = "http://localhost:7780" - c.InternalAPI.Connect = "http://localhost:7780" - c.Database.Defaults(10) - c.Database.ConnectionString = "file:signingkeyserver.db" -} - -func (c *SigningKeyServer) Verify(configErrs *ConfigErrors, isMonolith bool) { - checkURL(configErrs, "signing_key_server.internal_api.listen", string(c.InternalAPI.Listen)) - checkURL(configErrs, "signing_key_server.internal_api.bind", string(c.InternalAPI.Connect)) - checkNotEmpty(configErrs, "signing_key_server.database.connection_string", string(c.Database.ConnectionString)) -} - -// KeyPerspectives are used to configure perspective key servers for -// retrieving server keys. -type KeyPerspectives []KeyPerspective - -type KeyPerspective struct { - // The server name of the perspective key server - ServerName gomatrixserverlib.ServerName `yaml:"server_name"` - // Server keys for the perspective user, used to verify the - // keys have been signed by the perspective server - Keys []KeyPerspectiveTrustKey `yaml:"keys"` -} - -type KeyPerspectiveTrustKey struct { - // The key ID, e.g. ed25519:auto - KeyID gomatrixserverlib.KeyID `yaml:"key_id"` - // The public key in base64 unpadded format - PublicKey string `yaml:"public_key"` -} diff --git a/setup/config/config_test.go b/setup/config/config_test.go index 5c51a363..ffe9edab 100644 --- a/setup/config/config_test.go +++ b/setup/config/config_test.go @@ -112,7 +112,7 @@ federation_sender: listen: http://localhost:7775 connect: http://localhost:7775 database: - connection_string: file:federationsender.db + connection_string: file:federationapi.db max_open_conns: 100 max_idle_conns: 2 conn_max_lifetime: -1 diff --git a/setup/monolith.go b/setup/monolith.go index a77cdd56..b076e990 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -21,14 +21,13 @@ import ( "github.com/matrix-org/dendrite/clientapi/api" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationapi" - federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" + federationAPI "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/transactions" keyAPI "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/mediaapi" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" - serverKeyAPI "github.com/matrix-org/dendrite/signingkeyserver/api" "github.com/matrix-org/dendrite/syncapi" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage/accounts" @@ -44,13 +43,12 @@ type Monolith struct { Client *gomatrixserverlib.Client FedClient *gomatrixserverlib.FederationClient - AppserviceAPI appserviceAPI.AppServiceQueryAPI - EDUInternalAPI eduServerAPI.EDUServerInputAPI - FederationSenderAPI federationSenderAPI.FederationSenderInternalAPI - RoomserverAPI roomserverAPI.RoomserverInternalAPI - ServerKeyAPI serverKeyAPI.SigningKeyServerAPI - UserAPI userapi.UserInternalAPI - KeyAPI keyAPI.KeyInternalAPI + AppserviceAPI appserviceAPI.AppServiceQueryAPI + EDUInternalAPI eduServerAPI.EDUServerInputAPI + FederationAPI federationAPI.FederationInternalAPI + RoomserverAPI roomserverAPI.RoomserverInternalAPI + UserAPI userapi.UserInternalAPI + KeyAPI keyAPI.KeyInternalAPI // Optional ExtPublicRoomsProvider api.ExtraPublicRoomsProvider @@ -62,12 +60,12 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss csMux, synapseMux, &m.Config.ClientAPI, m.AccountDB, m.FedClient, m.RoomserverAPI, m.EDUInternalAPI, m.AppserviceAPI, transactions.New(), - m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider, + m.FederationAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider, &m.Config.MSCs, ) federationapi.AddPublicRoutes( ssMux, keyMux, wkMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient, - m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI, + m.KeyRing, m.RoomserverAPI, m.FederationAPI, m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil, ) mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client) diff --git a/setup/mscs/msc2836/msc2836.go b/setup/mscs/msc2836/msc2836.go index a538299d..7e2ecfb9 100644 --- a/setup/mscs/msc2836/msc2836.go +++ b/setup/mscs/msc2836/msc2836.go @@ -28,11 +28,11 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" - fs "github.com/matrix-org/dendrite/federationsender/api" + fs "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/internal/httputil" roomserver "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -93,7 +93,7 @@ func toClientResponse(res *gomatrixserverlib.MSC2836EventRelationshipsResponse) // Enable this MSC func Enable( - base *setup.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI, + base *base.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI, userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier, ) error { db, err := NewDatabase(&base.Cfg.MSCs.Database) @@ -148,10 +148,10 @@ type reqCtx struct { // federated request args isFederatedRequest bool serverName gomatrixserverlib.ServerName - fsAPI fs.FederationSenderInternalAPI + fsAPI fs.FederationInternalAPI } -func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse { +func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI) func(*http.Request, *userapi.Device) util.JSONResponse { return func(req *http.Request, device *userapi.Device) util.JSONResponse { relation, err := NewEventRelationshipRequest(req.Body) if err != nil { @@ -183,7 +183,7 @@ func eventRelationshipHandler(db Database, rsAPI roomserver.RoomserverInternalAP } func federatedEventRelationship( - ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI, + ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI, ) util.JSONResponse { relation, err := NewEventRelationshipRequest(bytes.NewBuffer(fedReq.Content())) if err != nil { diff --git a/setup/mscs/msc2836/msc2836_test.go b/setup/mscs/msc2836/msc2836_test.go index 51fde691..5572d107 100644 --- a/setup/mscs/msc2836/msc2836_test.go +++ b/setup/mscs/msc2836/msc2836_test.go @@ -19,7 +19,7 @@ import ( "github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/internal/httputil" roomserver "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/mscs/msc2836" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -549,7 +549,7 @@ func injectEvents(t *testing.T, userAPI userapi.UserInternalAPI, rsAPI roomserve cfg.Global.ServerName = "localhost" cfg.MSCs.Database.ConnectionString = "file:msc2836_test.db" cfg.MSCs.MSCs = []string{"msc2836"} - base := &setup.BaseDendrite{ + base := &base.BaseDendrite{ Cfg: cfg, PublicClientAPIMux: mux.NewRouter().PathPrefix(httputil.PublicClientPathPrefix).Subrouter(), PublicFederationAPIMux: mux.NewRouter().PathPrefix(httputil.PublicFederationPathPrefix).Subrouter(), diff --git a/setup/mscs/msc2946/msc2946.go b/setup/mscs/msc2946/msc2946.go index 121a73fd..3824c99a 100644 --- a/setup/mscs/msc2946/msc2946.go +++ b/setup/mscs/msc2946/msc2946.go @@ -27,11 +27,11 @@ import ( "github.com/gorilla/mux" chttputil "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - fs "github.com/matrix-org/dendrite/federationsender/api" + fs "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/internal/httputil" roomserver "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -52,8 +52,8 @@ func Defaults(r *gomatrixserverlib.MSC2946SpacesRequest) { // Enable this MSC func Enable( - base *setup.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, userAPI userapi.UserInternalAPI, - fsAPI fs.FederationSenderInternalAPI, keyRing gomatrixserverlib.JSONVerifier, + base *base.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, userAPI userapi.UserInternalAPI, + fsAPI fs.FederationInternalAPI, keyRing gomatrixserverlib.JSONVerifier, ) error { db, err := NewDatabase(&base.Cfg.MSCs.Database) if err != nil { @@ -96,7 +96,7 @@ func Enable( func federatedSpacesHandler( ctx context.Context, fedReq *gomatrixserverlib.FederationRequest, roomID string, db Database, - rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI, + rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI, thisServer gomatrixserverlib.ServerName, ) util.JSONResponse { inMemoryBatchCache := make(map[string]set) @@ -128,7 +128,7 @@ func federatedSpacesHandler( } func spacesHandler( - db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationSenderInternalAPI, + db Database, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI, thisServer gomatrixserverlib.ServerName, ) func(*http.Request, *userapi.Device) util.JSONResponse { return func(req *http.Request, device *userapi.Device) util.JSONResponse { @@ -172,7 +172,7 @@ type walker struct { thisServer gomatrixserverlib.ServerName db Database rsAPI roomserver.RoomserverInternalAPI - fsAPI fs.FederationSenderInternalAPI + fsAPI fs.FederationInternalAPI ctx context.Context // user ID|device ID|batch_num => event/room IDs sent to client diff --git a/setup/mscs/msc2946/msc2946_test.go b/setup/mscs/msc2946/msc2946_test.go index c362d9fb..43d60557 100644 --- a/setup/mscs/msc2946/msc2946_test.go +++ b/setup/mscs/msc2946/msc2946_test.go @@ -30,7 +30,7 @@ import ( "github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/internal/httputil" roomserver "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/mscs/msc2946" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -415,7 +415,7 @@ func injectEvents(t *testing.T, userAPI userapi.UserInternalAPI, rsAPI roomserve cfg.Global.ServerName = "localhost" cfg.MSCs.Database.ConnectionString = "file:msc2946_test.db" cfg.MSCs.MSCs = []string{"msc2946"} - base := &setup.BaseDendrite{ + base := &base.BaseDendrite{ Cfg: cfg, PublicClientAPIMux: mux.NewRouter().PathPrefix(httputil.PublicClientPathPrefix).Subrouter(), PublicFederationAPIMux: mux.NewRouter().PathPrefix(httputil.PublicFederationPathPrefix).Subrouter(), diff --git a/setup/mscs/mscs.go b/setup/mscs/mscs.go index da02956b..10df6a15 100644 --- a/setup/mscs/mscs.go +++ b/setup/mscs/mscs.go @@ -20,13 +20,14 @@ import ( "fmt" "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/mscs/msc2836" "github.com/matrix-org/dendrite/setup/mscs/msc2946" "github.com/matrix-org/util" ) // Enable MSCs - returns an error on unknown MSCs -func Enable(base *setup.BaseDendrite, monolith *setup.Monolith) error { +func Enable(base *base.BaseDendrite, monolith *setup.Monolith) error { for _, msc := range base.Cfg.MSCs.MSCs { util.GetLogger(context.Background()).WithField("msc", msc).Info("Enabling MSC") if err := EnableMSC(base, monolith, msc); err != nil { @@ -36,12 +37,12 @@ func Enable(base *setup.BaseDendrite, monolith *setup.Monolith) error { return nil } -func EnableMSC(base *setup.BaseDendrite, monolith *setup.Monolith, msc string) error { +func EnableMSC(base *base.BaseDendrite, monolith *setup.Monolith, msc string) error { switch msc { case "msc2836": - return msc2836.Enable(base, monolith.RoomserverAPI, monolith.FederationSenderAPI, monolith.UserAPI, monolith.KeyRing) + return msc2836.Enable(base, monolith.RoomserverAPI, monolith.FederationAPI, monolith.UserAPI, monolith.KeyRing) case "msc2946": - return msc2946.Enable(base, monolith.RoomserverAPI, monolith.UserAPI, monolith.FederationSenderAPI, monolith.KeyRing) + return msc2946.Enable(base, monolith.RoomserverAPI, monolith.UserAPI, monolith.FederationAPI, monolith.KeyRing) case "msc2444": // enabled inside federationapi case "msc2753": // enabled inside clientapi default: diff --git a/signingkeyserver/api/api.go b/signingkeyserver/api/api.go deleted file mode 100644 index f053d72e..00000000 --- a/signingkeyserver/api/api.go +++ /dev/null @@ -1,40 +0,0 @@ -package api - -import ( - "context" - - "github.com/matrix-org/gomatrixserverlib" -) - -type SigningKeyServerAPI interface { - gomatrixserverlib.KeyDatabase - - KeyRing() *gomatrixserverlib.KeyRing - - InputPublicKeys( - ctx context.Context, - request *InputPublicKeysRequest, - response *InputPublicKeysResponse, - ) error - - QueryPublicKeys( - ctx context.Context, - request *QueryPublicKeysRequest, - response *QueryPublicKeysResponse, - ) error -} - -type QueryPublicKeysRequest struct { - Requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp `json:"requests"` -} - -type QueryPublicKeysResponse struct { - Results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"results"` -} - -type InputPublicKeysRequest struct { - Keys map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult `json:"keys"` -} - -type InputPublicKeysResponse struct { -} diff --git a/signingkeyserver/inthttp/client.go b/signingkeyserver/inthttp/client.go deleted file mode 100644 index 71e40b8f..00000000 --- a/signingkeyserver/inthttp/client.go +++ /dev/null @@ -1,132 +0,0 @@ -package inthttp - -import ( - "context" - "errors" - "net/http" - - "github.com/matrix-org/dendrite/internal/caching" - "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/dendrite/signingkeyserver/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/opentracing/opentracing-go" -) - -// HTTP paths for the internal HTTP APIs -const ( - ServerKeyInputPublicKeyPath = "/signingkeyserver/inputPublicKey" - ServerKeyQueryPublicKeyPath = "/signingkeyserver/queryPublicKey" -) - -// NewSigningKeyServerClient creates a SigningKeyServerAPI implemented by talking to a HTTP POST API. -// If httpClient is nil an error is returned -func NewSigningKeyServerClient( - serverKeyAPIURL string, - httpClient *http.Client, - cache caching.ServerKeyCache, -) (api.SigningKeyServerAPI, error) { - if httpClient == nil { - return nil, errors.New("NewSigningKeyServerClient: httpClient is <nil>") - } - return &httpServerKeyInternalAPI{ - serverKeyAPIURL: serverKeyAPIURL, - httpClient: httpClient, - cache: cache, - }, nil -} - -type httpServerKeyInternalAPI struct { - serverKeyAPIURL string - httpClient *http.Client - cache caching.ServerKeyCache -} - -func (s *httpServerKeyInternalAPI) KeyRing() *gomatrixserverlib.KeyRing { - // This is a bit of a cheat - we tell gomatrixserverlib that this API is - // both the key database and the key fetcher. While this does have the - // rather unfortunate effect of preventing gomatrixserverlib from handling - // key fetchers directly, we can at least reimplement this behaviour on - // the other end of the API. - return &gomatrixserverlib.KeyRing{ - KeyDatabase: s, - KeyFetchers: []gomatrixserverlib.KeyFetcher{}, - } -} - -func (s *httpServerKeyInternalAPI) FetcherName() string { - return "httpServerKeyInternalAPI" -} - -func (s *httpServerKeyInternalAPI) StoreKeys( - _ context.Context, - results map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, -) error { - // Run in a background context - we don't want to stop this work just - // because the caller gives up waiting. - ctx := context.Background() - request := api.InputPublicKeysRequest{ - Keys: make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult), - } - response := api.InputPublicKeysResponse{} - for req, res := range results { - request.Keys[req] = res - s.cache.StoreServerKey(req, res) - } - return s.InputPublicKeys(ctx, &request, &response) -} - -func (s *httpServerKeyInternalAPI) FetchKeys( - _ context.Context, - requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, -) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { - // Run in a background context - we don't want to stop this work just - // because the caller gives up waiting. - ctx := context.Background() - result := make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) - request := api.QueryPublicKeysRequest{ - Requests: make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp), - } - response := api.QueryPublicKeysResponse{ - Results: make(map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult), - } - for req, ts := range requests { - if res, ok := s.cache.GetServerKey(req, ts); ok { - result[req] = res - continue - } - request.Requests[req] = ts - } - err := s.QueryPublicKeys(ctx, &request, &response) - if err != nil { - return nil, err - } - for req, res := range response.Results { - result[req] = res - s.cache.StoreServerKey(req, res) - } - return result, nil -} - -func (h *httpServerKeyInternalAPI) InputPublicKeys( - ctx context.Context, - request *api.InputPublicKeysRequest, - response *api.InputPublicKeysResponse, -) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "InputPublicKey") - defer span.Finish() - - apiURL := h.serverKeyAPIURL + ServerKeyInputPublicKeyPath - return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) -} - -func (h *httpServerKeyInternalAPI) QueryPublicKeys( - ctx context.Context, - request *api.QueryPublicKeysRequest, - response *api.QueryPublicKeysResponse, -) error { - span, ctx := opentracing.StartSpanFromContext(ctx, "QueryPublicKey") - defer span.Finish() - - apiURL := h.serverKeyAPIURL + ServerKeyQueryPublicKeyPath - return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) -} diff --git a/signingkeyserver/inthttp/server.go b/signingkeyserver/inthttp/server.go deleted file mode 100644 index d26f7380..00000000 --- a/signingkeyserver/inthttp/server.go +++ /dev/null @@ -1,43 +0,0 @@ -package inthttp - -import ( - "encoding/json" - "net/http" - - "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/internal/caching" - "github.com/matrix-org/dendrite/internal/httputil" - "github.com/matrix-org/dendrite/signingkeyserver/api" - "github.com/matrix-org/util" -) - -func AddRoutes(s api.SigningKeyServerAPI, internalAPIMux *mux.Router, cache caching.ServerKeyCache) { - internalAPIMux.Handle(ServerKeyQueryPublicKeyPath, - httputil.MakeInternalAPI("queryPublicKeys", func(req *http.Request) util.JSONResponse { - request := api.QueryPublicKeysRequest{} - response := api.QueryPublicKeysResponse{} - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - keys, err := s.FetchKeys(req.Context(), request.Requests) - if err != nil { - return util.ErrorResponse(err) - } - response.Results = keys - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) - internalAPIMux.Handle(ServerKeyInputPublicKeyPath, - httputil.MakeInternalAPI("inputPublicKeys", func(req *http.Request) util.JSONResponse { - request := api.InputPublicKeysRequest{} - response := api.InputPublicKeysResponse{} - if err := json.NewDecoder(req.Body).Decode(&request); err != nil { - return util.MessageResponse(http.StatusBadRequest, err.Error()) - } - if err := s.StoreKeys(req.Context(), request.Keys); err != nil { - return util.ErrorResponse(err) - } - return util.JSONResponse{Code: http.StatusOK, JSON: &response} - }), - ) -} diff --git a/signingkeyserver/signingkeyserver.go b/signingkeyserver/signingkeyserver.go deleted file mode 100644 index 2b1d6751..00000000 --- a/signingkeyserver/signingkeyserver.go +++ /dev/null @@ -1,107 +0,0 @@ -package signingkeyserver - -import ( - "crypto/ed25519" - "encoding/base64" - - "github.com/gorilla/mux" - "github.com/matrix-org/dendrite/internal/caching" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/signingkeyserver/api" - "github.com/matrix-org/dendrite/signingkeyserver/internal" - "github.com/matrix-org/dendrite/signingkeyserver/inthttp" - "github.com/matrix-org/dendrite/signingkeyserver/storage" - "github.com/matrix-org/dendrite/signingkeyserver/storage/cache" - "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" -) - -// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions -// on the given input API. -func AddInternalRoutes(router *mux.Router, intAPI api.SigningKeyServerAPI, caches *caching.Caches) { - inthttp.AddRoutes(intAPI, router, caches) -} - -// NewInternalAPI returns a concerete implementation of the internal API. Callers -// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. -func NewInternalAPI( - cfg *config.SigningKeyServer, - fedClient gomatrixserverlib.KeyClient, - caches *caching.Caches, -) api.SigningKeyServerAPI { - innerDB, err := storage.NewDatabase( - &cfg.Database, - cfg.Matrix.ServerName, - cfg.Matrix.PrivateKey.Public().(ed25519.PublicKey), - cfg.Matrix.KeyID, - ) - if err != nil { - logrus.WithError(err).Panicf("failed to connect to server key database") - } - - serverKeyDB, err := cache.NewKeyDatabase(innerDB, caches) - if err != nil { - logrus.WithError(err).Panicf("failed to set up caching wrapper for server key database") - } - - internalAPI := internal.ServerKeyAPI{ - ServerName: cfg.Matrix.ServerName, - ServerPublicKey: cfg.Matrix.PrivateKey.Public().(ed25519.PublicKey), - ServerKeyID: cfg.Matrix.KeyID, - ServerKeyValidity: cfg.Matrix.KeyValidityPeriod, - OldServerKeys: cfg.Matrix.OldVerifyKeys, - FedClient: fedClient, - OurKeyRing: gomatrixserverlib.KeyRing{ - KeyFetchers: []gomatrixserverlib.KeyFetcher{}, - KeyDatabase: serverKeyDB, - }, - } - - addDirectFetcher := func() { - internalAPI.OurKeyRing.KeyFetchers = append( - internalAPI.OurKeyRing.KeyFetchers, - &gomatrixserverlib.DirectKeyFetcher{ - Client: fedClient, - }, - ) - } - - if cfg.PreferDirectFetch { - addDirectFetcher() - } else { - defer addDirectFetcher() - } - - var b64e = base64.StdEncoding.WithPadding(base64.NoPadding) - for _, ps := range cfg.KeyPerspectives { - perspective := &gomatrixserverlib.PerspectiveKeyFetcher{ - PerspectiveServerName: ps.ServerName, - PerspectiveServerKeys: map[gomatrixserverlib.KeyID]ed25519.PublicKey{}, - Client: fedClient, - } - - for _, key := range ps.Keys { - rawkey, err := b64e.DecodeString(key.PublicKey) - if err != nil { - logrus.WithError(err).WithFields(logrus.Fields{ - "server_name": ps.ServerName, - "public_key": key.PublicKey, - }).Warn("Couldn't parse perspective key") - continue - } - perspective.PerspectiveServerKeys[key.KeyID] = rawkey - } - - internalAPI.OurKeyRing.KeyFetchers = append( - internalAPI.OurKeyRing.KeyFetchers, - perspective, - ) - - logrus.WithFields(logrus.Fields{ - "server_name": ps.ServerName, - "num_public_keys": len(ps.Keys), - }).Info("Enabled perspective key fetcher") - } - - return &internalAPI -} diff --git a/signingkeyserver/storage/interface.go b/signingkeyserver/storage/interface.go deleted file mode 100644 index 3a67ac55..00000000 --- a/signingkeyserver/storage/interface.go +++ /dev/null @@ -1,13 +0,0 @@ -package storage - -import ( - "context" - - "github.com/matrix-org/gomatrixserverlib" -) - -type Database interface { - FetcherName() string - FetchKeys(ctx context.Context, requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) - StoreKeys(ctx context.Context, keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult) error -} diff --git a/signingkeyserver/storage/keydb.go b/signingkeyserver/storage/keydb.go deleted file mode 100644 index 82b6d0ad..00000000 --- a/signingkeyserver/storage/keydb.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !wasm -// +build !wasm - -package storage - -import ( - "fmt" - - "golang.org/x/crypto/ed25519" - - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/signingkeyserver/storage/postgres" - "github.com/matrix-org/dendrite/signingkeyserver/storage/sqlite3" - "github.com/matrix-org/gomatrixserverlib" -) - -// NewDatabase opens a database connection. -func NewDatabase( - dbProperties *config.DatabaseOptions, - serverName gomatrixserverlib.ServerName, - serverKey ed25519.PublicKey, - serverKeyID gomatrixserverlib.KeyID, -) (Database, error) { - switch { - case dbProperties.ConnectionString.IsSQLite(): - return sqlite3.NewDatabase(dbProperties, serverName, serverKey, serverKeyID) - case dbProperties.ConnectionString.IsPostgres(): - return postgres.NewDatabase(dbProperties, serverName, serverKey, serverKeyID) - default: - return nil, fmt.Errorf("unexpected database type") - } -} diff --git a/signingkeyserver/storage/keydb_wasm.go b/signingkeyserver/storage/keydb_wasm.go deleted file mode 100644 index b112993a..00000000 --- a/signingkeyserver/storage/keydb_wasm.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build wasm -// +build wasm - -package storage - -import ( - "fmt" - "net/url" - - "golang.org/x/crypto/ed25519" - - "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/signingkeyserver/storage/sqlite3" - "github.com/matrix-org/gomatrixserverlib" -) - -// NewDatabase opens a database connection. -func NewDatabase( - dataSourceName string, - dbProperties sqlutil.DbProperties, // nolint:unparam - serverName gomatrixserverlib.ServerName, - serverKey ed25519.PublicKey, - serverKeyID gomatrixserverlib.KeyID, -) (Database, error) { - uri, err := url.Parse(dataSourceName) - if err != nil { - return nil, err - } - switch uri.Scheme { - case "postgres": - return nil, fmt.Errorf("Cannot use postgres implementation") - case "file": - return sqlite3.NewDatabase(dataSourceName, serverName, serverKey, serverKeyID) - default: - return nil, fmt.Errorf("Cannot use postgres implementation") - } -} diff --git a/signingkeyserver/storage/postgres/keydb.go b/signingkeyserver/storage/postgres/keydb.go deleted file mode 100644 index 1b3032de..00000000 --- a/signingkeyserver/storage/postgres/keydb.go +++ /dev/null @@ -1,91 +0,0 @@ -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package postgres - -import ( - "context" - - "golang.org/x/crypto/ed25519" - - "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/gomatrixserverlib" -) - -// A Database implements gomatrixserverlib.KeyDatabase and is used to store -// the public keys for other matrix servers. -type Database struct { - statements serverKeyStatements -} - -// NewDatabase prepares a new key database. -// It creates the necessary tables if they don't already exist. -// It prepares all the SQL statements that it will use. -// Returns an error if there was a problem talking to the database. -func NewDatabase( - dbProperties *config.DatabaseOptions, - serverName gomatrixserverlib.ServerName, - serverKey ed25519.PublicKey, - serverKeyID gomatrixserverlib.KeyID, -) (*Database, error) { - db, err := sqlutil.Open(dbProperties) - if err != nil { - return nil, err - } - d := &Database{} - err = d.statements.prepare(db) - if err != nil { - return nil, err - } - return d, nil -} - -// FetcherName implements KeyFetcher -func (d Database) FetcherName() string { - return "PostgresKeyDatabase" -} - -// FetchKeys implements gomatrixserverlib.KeyDatabase -func (d *Database) FetchKeys( - ctx context.Context, - requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, -) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { - return d.statements.bulkSelectServerKeys(ctx, requests) -} - -// StoreKeys implements gomatrixserverlib.KeyDatabase -func (d *Database) StoreKeys( - ctx context.Context, - keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, -) error { - // TODO: Inserting all the keys within a single transaction may - // be more efficient since the transaction overhead can be quite - // high for a single insert statement. - var lastErr error - for request, keys := range keyMap { - if err := d.statements.upsertServerKeys(ctx, request, keys); err != nil { - // Rather than returning immediately on error we try to insert the - // remaining keys. - // Since we are inserting the keys outside of a transaction it is - // possible for some of the inserts to succeed even though some - // of the inserts have failed. - // Ensuring that we always insert all the keys we can means that - // this behaviour won't depend on the iteration order of the map. - lastErr = err - } - } - return lastErr -} diff --git a/signingkeyserver/storage/sqlite3/keydb.go b/signingkeyserver/storage/sqlite3/keydb.go deleted file mode 100644 index 1f85a09c..00000000 --- a/signingkeyserver/storage/sqlite3/keydb.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sqlite3 - -import ( - "context" - - "golang.org/x/crypto/ed25519" - - "github.com/matrix-org/dendrite/internal/sqlutil" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/gomatrixserverlib" -) - -// A Database implements gomatrixserverlib.KeyDatabase and is used to store -// the public keys for other matrix servers. -type Database struct { - writer sqlutil.Writer - statements serverKeyStatements -} - -// NewDatabase prepares a new key database. -// It creates the necessary tables if they don't already exist. -// It prepares all the SQL statements that it will use. -// Returns an error if there was a problem talking to the database. -func NewDatabase( - dbProperties *config.DatabaseOptions, - serverName gomatrixserverlib.ServerName, - serverKey ed25519.PublicKey, - serverKeyID gomatrixserverlib.KeyID, -) (*Database, error) { - db, err := sqlutil.Open(dbProperties) - if err != nil { - return nil, err - } - d := &Database{ - writer: sqlutil.NewExclusiveWriter(), - } - err = d.statements.prepare(db, d.writer) - if err != nil { - return nil, err - } - if err != nil { - return nil, err - } - return d, nil -} - -// FetcherName implements KeyFetcher -func (d Database) FetcherName() string { - return "SqliteKeyDatabase" -} - -// FetchKeys implements gomatrixserverlib.KeyDatabase -func (d *Database) FetchKeys( - ctx context.Context, - requests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp, -) (map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, error) { - return d.statements.bulkSelectServerKeys(ctx, requests) -} - -// StoreKeys implements gomatrixserverlib.KeyDatabase -func (d *Database) StoreKeys( - ctx context.Context, - keyMap map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult, -) error { - // TODO: Inserting all the keys within a single transaction may - // be more efficient since the transaction overhead can be quite - // high for a single insert statement. - var lastErr error - for request, keys := range keyMap { - if err := d.statements.upsertServerKeys(ctx, request, keys); err != nil { - // Rather than returning immediately on error we try to insert the - // remaining keys. - // Since we are inserting the keys outside of a transaction it is - // possible for some of the inserts to succeed even though some - // of the inserts have failed. - // Ensuring that we always insert all the keys we can means that - // this behaviour won't depend on the iteration order of the map. - lastErr = err - } - } - return lastErr -} |