aboutsummaryrefslogtreecommitdiff
path: root/userapi/userapi.go
diff options
context:
space:
mode:
Diffstat (limited to 'userapi/userapi.go')
-rw-r--r--userapi/userapi.go57
1 files changed, 43 insertions, 14 deletions
diff --git a/userapi/userapi.go b/userapi/userapi.go
index 4a5793ab..2382e951 100644
--- a/userapi/userapi.go
+++ b/userapi/userapi.go
@@ -18,11 +18,17 @@ import (
"time"
"github.com/gorilla/mux"
+ "github.com/matrix-org/dendrite/internal/pushgateway"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ rsapi "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/dendrite/userapi/consumers"
"github.com/matrix-org/dendrite/userapi/internal"
"github.com/matrix-org/dendrite/userapi/inthttp"
+ "github.com/matrix-org/dendrite/userapi/producers"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/sirupsen/logrus"
)
@@ -36,26 +42,49 @@ func AddInternalRoutes(router *mux.Router, intAPI api.UserInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
- accountDB storage.Database, cfg *config.UserAPI, appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI,
+ base *base.BaseDendrite, db storage.Database, cfg *config.UserAPI,
+ appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI,
+ rsAPI rsapi.RoomserverInternalAPI, pgClient pushgateway.Client,
) api.UserInternalAPI {
db, err := storage.NewDatabase(&cfg.AccountDatabase, cfg.Matrix.ServerName, cfg.BCryptCost, int64(api.DefaultLoginTokenLifetime*time.Millisecond), api.DefaultLoginTokenLifetime)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to device db")
}
- return newInternalAPI(db, cfg, appServices, keyAPI)
-}
+ js := jetstream.Prepare(&cfg.Matrix.JetStream)
-func newInternalAPI(
- db storage.Database,
- cfg *config.UserAPI,
- appServices []config.ApplicationService,
- keyAPI keyapi.KeyInternalAPI,
-) api.UserInternalAPI {
- return &internal.UserInternalAPI{
- DB: db,
- ServerName: cfg.Matrix.ServerName,
- AppServices: appServices,
- KeyAPI: keyAPI,
+ syncProducer := producers.NewSyncAPI(
+ db, js,
+ // TODO: user API should handle syncs for account data. Right now,
+ // it's handled by clientapi, and hence uses its topic. When user
+ // API handles it for all account data, we can remove it from
+ // here.
+ cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
+ cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData),
+ )
+
+ userAPI := &internal.UserInternalAPI{
+ DB: db,
+ SyncProducer: syncProducer,
+ ServerName: cfg.Matrix.ServerName,
+ AppServices: appServices,
+ KeyAPI: keyAPI,
+ DisableTLSValidation: cfg.PushGatewayDisableTLSValidation,
+ }
+
+ readConsumer := consumers.NewOutputReadUpdateConsumer(
+ base.ProcessContext, cfg, js, db, pgClient, userAPI, syncProducer,
+ )
+ if err := readConsumer.Start(); err != nil {
+ logrus.WithError(err).Panic("failed to start user API read update consumer")
+ }
+
+ eventConsumer := consumers.NewOutputStreamEventConsumer(
+ base.ProcessContext, cfg, js, db, pgClient, userAPI, rsAPI, syncProducer,
+ )
+ if err := eventConsumer.Start(); err != nil {
+ logrus.WithError(err).Panic("failed to start user API streamed event consumer")
}
+
+ return userAPI
}