aboutsummaryrefslogtreecommitdiff
path: root/federationsender/federationsender.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationsender/federationsender.go')
-rw-r--r--federationsender/federationsender.go8
1 files changed, 8 insertions, 0 deletions
diff --git a/federationsender/federationsender.go b/federationsender/federationsender.go
index 9e14f6ec..fbf506aa 100644
--- a/federationsender/federationsender.go
+++ b/federationsender/federationsender.go
@@ -16,6 +16,7 @@ package federationsender
import (
"github.com/gorilla/mux"
+ stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/consumers"
"github.com/matrix-org/dendrite/federationsender/internal"
@@ -41,6 +42,7 @@ func NewInternalAPI(
base *setup.BaseDendrite,
federation *gomatrixserverlib.FederationClient,
rsAPI roomserverAPI.RoomserverInternalAPI,
+ stateAPI stateapi.CurrentStateInternalAPI,
keyRing *gomatrixserverlib.KeyRing,
) api.FederationSenderInternalAPI {
federationSenderDB, err := storage.NewDatabase(string(base.Cfg.Database.FederationSender), base.Cfg.DbProperties())
@@ -76,6 +78,12 @@ func NewInternalAPI(
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
}
+ keyConsumer := consumers.NewKeyChangeConsumer(
+ base.Cfg, base.KafkaConsumer, queues, federationSenderDB, stateAPI,
+ )
+ if err := keyConsumer.Start(); err != nil {
+ logrus.WithError(err).Panic("failed to start key server consumer")
+ }
return internal.NewFederationSenderInternalAPI(federationSenderDB, base.Cfg, rsAPI, federation, keyRing, stats, queues)
}