diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2021-05-06 12:00:42 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-06 12:00:42 +0100 |
commit | 1002e87b60659291af964c6d07c3a9057a3ed9b7 (patch) | |
tree | 81ca40385ac4072826e05d46c0d0a1562f13fa4e /build/gobind-yggdrasil/monolith.go | |
parent | 464b908bd0c13854b3f6b9a17467f39e0608dc08 (diff) |
Pinecone P2P demo (#1856)
* Pinecone demo
* Enable multicast, fix HTTP routing
* Fix multicast import
* Fix build
* Update Pinecone demo
* Fix the keys
* Tweaks
* Pinecone room directory support (early)
* Fix gobind-pinecone
* Add pinecone listener
* Fix public key value
* Use AuthenticatedConnect for dial
* Fix gobind-pinecone
* Stop panics
* Give fsAPI to keyserver
* Pinecone demo fixes
* Update gobind build scripts
* Account creation
* Tweaks
* Setup tweaks
* API tweaks
* API tweaks
* API tweaks
* Port mutex
* Re-enable multicast
* Add ReadCopy
* Update quic-go, fixes
* Shutdowns fixed for iOS
* Update build script
* Add WebSocket support
* Bug fixes
* Netconn context
* Fix WebSocket connectivity
* Fixes to gobind API
* Strip frameworks
* Configurability updates
* Update go.mod
* Update go.mod/go.sum
* Update go.mod/go.sum
* Update go.mod/go.sum
* Try to stay connected tto static peer
* Update gobind-pinecone
* Update go.mod/go.sum
* Test uTP+TLS
* Use HTTP/2
* Don't use HTTP/2
* Update go.mod/go.sum
* Attempt to reconnect to the static peer if it drops
* Stay connected to static peers more stickily
* Retry room directory lookups if they fail
* NewQUIC -> NewSessions
* Storage updates
* Don't return immediately when there's nothing to sync
* Updates
* Try to reconnect to static peer more
* Update go.mod/go.sum
* Require Go 1.14
* Update go.mod/go.sum
* Update go.mod/go.sum
Diffstat (limited to 'build/gobind-yggdrasil/monolith.go')
-rw-r--r-- | build/gobind-yggdrasil/monolith.go | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go new file mode 100644 index 00000000..332d156b --- /dev/null +++ b/build/gobind-yggdrasil/monolith.go @@ -0,0 +1,221 @@ +package gobind + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "time" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/appservice" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/federationsender" + "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/roomserver" + "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/userapi" + "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" +) + +type DendriteMonolith struct { + logger logrus.Logger + YggdrasilNode *yggconn.Node + StorageDirectory string + listener net.Listener + httpServer *http.Server +} + +func (m *DendriteMonolith) BaseURL() string { + return fmt.Sprintf("http://%s", m.listener.Addr().String()) +} + +func (m *DendriteMonolith) PeerCount() int { + return m.YggdrasilNode.PeerCount() +} + +func (m *DendriteMonolith) SessionCount() int { + return m.YggdrasilNode.SessionCount() +} + +func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) { + m.YggdrasilNode.SetMulticastEnabled(enabled) +} + +func (m *DendriteMonolith) SetStaticPeer(uri string) error { + return m.YggdrasilNode.SetStaticPeer(uri) +} + +func (m *DendriteMonolith) DisconnectNonMulticastPeers() { + m.YggdrasilNode.DisconnectNonMulticastPeers() +} + +func (m *DendriteMonolith) DisconnectMulticastPeers() { + m.YggdrasilNode.DisconnectMulticastPeers() +} + +func (m *DendriteMonolith) Start() { + m.logger = logrus.Logger{ + Out: BindLogger{}, + } + m.logger.SetOutput(BindLogger{}) + logrus.SetOutput(BindLogger{}) + + var err error + m.listener, err = net.Listen("tcp", "localhost:65432") + if err != nil { + panic(err) + } + + ygg, err := yggconn.Setup("dendrite", m.StorageDirectory) + if err != nil { + panic(err) + } + m.YggdrasilNode = ygg + + cfg := &config.Dendrite{} + cfg.Defaults() + cfg.Global.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName()) + cfg.Global.PrivateKey = ygg.SigningPrivateKey() + cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) + cfg.Global.Kafka.UseNaffka = true + cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-naffka.db", m.StorageDirectory)) + cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-account.db", m.StorageDirectory)) + cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-device.db", m.StorageDirectory)) + cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-mediaapi.db", m.StorageDirectory)) + cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-syncapi.db", m.StorageDirectory)) + cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-roomserver.db", m.StorageDirectory)) + cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-signingkeyserver.db", m.StorageDirectory)) + cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-keyserver.db", m.StorageDirectory)) + cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-federationsender.db", m.StorageDirectory)) + cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-appservice.db", m.StorageDirectory)) + cfg.MediaAPI.BasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory)) + cfg.MediaAPI.AbsBasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory)) + if err = cfg.Derive(); err != nil { + panic(err) + } + + base := setup.NewBaseDendrite(cfg, "Monolith", false) + defer base.Close() // nolint: errcheck + + accountDB := base.CreateAccountsDB() + federation := ygg.CreateFederationClient(base) + + serverKeyAPI := &signing.YggdrasilKeys{} + keyRing := serverKeyAPI.KeyRing() + + rsAPI := roomserver.NewInternalAPI( + base, keyRing, + ) + + fsAPI := federationsender.NewInternalAPI( + base, federation, rsAPI, keyRing, + ) + + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, federation) + userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI) + keyAPI.SetUserAPI(userAPI) + + eduInputAPI := eduserver.NewInternalAPI( + base, cache.New(), userAPI, + ) + + asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) + rsAPI.SetAppserviceAPI(asAPI) + + ygg.SetSessionFunc(func(address string) { + req := &api.PerformServersAliveRequest{ + Servers: []gomatrixserverlib.ServerName{ + gomatrixserverlib.ServerName(address), + }, + } + res := &api.PerformServersAliveResponse{} + if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil { + logrus.WithError(err).Error("Failed to send wake-up message to newly connected node") + } + }) + + // The underlying roomserver implementation needs to be able to call the fedsender. + // This is different to rsAPI which can be the http client which doesn't need this dependency + rsAPI.SetFederationSenderAPI(fsAPI) + + monolith := setup.Monolith{ + Config: base.Cfg, + AccountDB: accountDB, + Client: ygg.CreateClient(base), + FedClient: federation, + KeyRing: keyRing, + + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationSenderAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, + ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider( + ygg, fsAPI, federation, + ), + } + monolith.AddAllPublicRoutes( + base.ProcessContext, + base.PublicClientAPIMux, + base.PublicFederationAPIMux, + base.PublicKeyAPIMux, + base.PublicMediaAPIMux, + ) + + httpRouter := mux.NewRouter() + httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux) + httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux) + httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) + + yggRouter := mux.NewRouter() + yggRouter.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux) + yggRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) + + // Build both ends of a HTTP multiplex. + m.httpServer = &http.Server{ + Addr: ":0", + TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 30 * time.Second, + BaseContext: func(_ net.Listener) context.Context { + return context.Background() + }, + Handler: yggRouter, + } + + go func() { + m.logger.Info("Listening on ", ygg.DerivedServerName()) + m.logger.Fatal(m.httpServer.Serve(ygg)) + }() + go func() { + logrus.Info("Listening on ", m.listener.Addr()) + logrus.Fatal(http.Serve(m.listener, httpRouter)) + }() + go func() { + logrus.Info("Sending wake-up message to known nodes") + req := &api.PerformBroadcastEDURequest{} + res := &api.PerformBroadcastEDUResponse{} + if err := fsAPI.PerformBroadcastEDU(context.TODO(), req, res); err != nil { + logrus.WithError(err).Error("Failed to send wake-up message to known nodes") + } + }() +} + +func (m *DendriteMonolith) Suspend() { + m.logger.Info("Suspending monolith") + if err := m.httpServer.Close(); err != nil { + m.logger.Warn("Error stopping HTTP server:", err) + } +} |