aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorDevon Hudson <devonhudson@librem.one>2023-02-01 13:41:27 -0700
committerDevon Hudson <devonhudson@librem.one>2023-02-01 13:41:38 -0700
commit048e35026c5396594d26cef6a2412c00ffe6f35f (patch)
treec060b8cf2c4df65cba87839aba9f490a335c5d28 /cmd
parentdbc2869cbd36b069f4a0745557320c5fe1daf0f7 (diff)
Refactor common pinecone demo code to remove major duplication
Diffstat (limited to 'cmd')
-rw-r--r--cmd/dendrite-demo-pinecone/main.go196
-rw-r--r--cmd/dendrite-demo-pinecone/monolith/monolith.go321
2 files changed, 320 insertions, 197 deletions
diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go
index ffb7ace1..0b76188b 100644
--- a/cmd/dendrite-demo-pinecone/main.go
+++ b/cmd/dendrite-demo-pinecone/main.go
@@ -15,48 +15,24 @@
package main
import (
- "context"
"crypto/ed25519"
- "crypto/tls"
"encoding/hex"
"flag"
"fmt"
"net"
- "net/http"
"os"
"path/filepath"
"strings"
- "time"
- "github.com/gorilla/mux"
- "github.com/gorilla/websocket"
- "github.com/matrix-org/dendrite/appservice"
- "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
- "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/monolith"
- "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/relay"
- "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
- "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
- "github.com/matrix-org/dendrite/federationapi"
- "github.com/matrix-org/dendrite/federationapi/api"
- "github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/internal"
- "github.com/matrix-org/dendrite/internal/httputil"
- "github.com/matrix-org/dendrite/keyserver"
- "github.com/matrix-org/dendrite/relayapi"
- "github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup"
- "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
pineconeRouter "github.com/matrix-org/pinecone/router"
- pineconeEvents "github.com/matrix-org/pinecone/router/events"
-
- "github.com/sirupsen/logrus"
)
var (
@@ -95,16 +71,12 @@ func main() {
keyfile := filepath.Join(*instanceDir, *instanceName) + ".pem"
oldKeyfile := *instanceName + ".key"
sk, pk = monolith.GetOrCreateKey(keyfile, oldKeyfile)
- cfg = monolith.GenerateDefaultConfig(sk, *instanceDir, *instanceName)
+ cfg = monolith.GenerateDefaultConfig(sk, *instanceDir, *instanceDir, *instanceName)
}
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
- base := base.NewBaseDendrite(cfg, "Monolith")
- base.ConfigureAdminEndpoints()
- defer base.Close() // nolint: errcheck
-
p2pMonolith := monolith.P2PMonolith{}
p2pMonolith.SetupPinecone(sk)
p2pMonolith.Multicast.Start()
@@ -115,6 +87,14 @@ func main() {
}
}
+ enableMetrics := true
+ enableWebsockets := true
+ p2pMonolith.SetupDendrite(cfg, *instancePort, *instanceRelayingEnabled, enableMetrics, enableWebsockets)
+
+ useTCPListener := false
+ p2pMonolith.StartMonolith(useTCPListener)
+ p2pMonolith.WaitForShutdown()
+
go func() {
listener, err := net.Listen("tcp", *instanceListen)
if err != nil {
@@ -142,160 +122,4 @@ func main() {
fmt.Println("Inbound connection", conn.RemoteAddr(), "is connected to port", port)
}
}()
-
- // TODO : factor this dendrite setup out to a common place
- federation := conn.CreateFederationClient(base, p2pMonolith.Sessions)
-
- serverKeyAPI := &signing.YggdrasilKeys{}
- keyRing := serverKeyAPI.KeyRing()
-
- rsComponent := roomserver.NewInternalAPI(base)
- rsAPI := rsComponent
- fsAPI := federationapi.NewInternalAPI(
- base, federation, rsAPI, base.Caches, keyRing, true,
- )
-
- keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI, rsComponent)
- userAPI := userapi.NewInternalAPI(base, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient())
- keyAPI.SetUserAPI(userAPI)
-
- asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
-
- rsComponent.SetFederationAPI(fsAPI, keyRing)
-
- userProvider := users.NewPineconeUserProvider(p2pMonolith.Router, p2pMonolith.Sessions, userAPI, federation)
- roomProvider := rooms.NewPineconeRoomProvider(p2pMonolith.Router, p2pMonolith.Sessions, fsAPI, federation)
-
- js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- producer := &producers.SyncAPIProducer{
- JetStream: js,
- TopicReceiptEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent),
- TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
- TopicTypingEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputTypingEvent),
- TopicPresenceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent),
- TopicDeviceListUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
- TopicSigningKeyUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
- Config: &base.Cfg.FederationAPI,
- UserAPI: userAPI,
- }
- relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer, *instanceRelayingEnabled)
- logrus.Infof("Relaying enabled: %v", relayAPI.RelayingEnabled())
-
- m := setup.Monolith{
- Config: base.Cfg,
- Client: conn.CreateClient(base, p2pMonolith.Sessions),
- FedClient: federation,
- KeyRing: keyRing,
-
- AppserviceAPI: asAPI,
- FederationAPI: fsAPI,
- RoomserverAPI: rsAPI,
- UserAPI: userAPI,
- KeyAPI: keyAPI,
- RelayAPI: relayAPI,
- ExtPublicRoomsProvider: roomProvider,
- ExtUserDirectoryProvider: userProvider,
- }
- m.AddAllPublicRoutes(base)
-
- wsUpgrader := websocket.Upgrader{
- CheckOrigin: func(_ *http.Request) bool {
- return true
- },
- }
- httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
- httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux)
- httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux)
- httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
- httpRouter.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(base.DendriteAdminMux)
- httpRouter.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(base.SynapseAdminMux)
- httpRouter.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
- c, err := wsUpgrader.Upgrade(w, r, nil)
- if err != nil {
- logrus.WithError(err).Error("Failed to upgrade WebSocket connection")
- return
- }
- conn := conn.WrapWebSocketConn(c)
- if _, err = p2pMonolith.Router.Connect(
- conn,
- pineconeRouter.ConnectionZone("websocket"),
- pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote),
- ); err != nil {
- logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch")
- }
- })
- httpRouter.HandleFunc("/pinecone", p2pMonolith.Router.ManholeHandler)
- embed.Embed(httpRouter, *instancePort, "Pinecone Demo")
-
- pMux := mux.NewRouter().SkipClean(true).UseEncodedPath()
- pMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles)
- pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux)
- pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
-
- pHTTP := p2pMonolith.Sessions.Protocol("matrix").HTTP()
- pHTTP.Mux().Handle(users.PublicURL, pMux)
- pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux)
- pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux)
-
- // Build both ends of a HTTP multiplex.
- httpServer := &http.Server{
- Addr: ":0",
- TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
- ReadTimeout: 10 * time.Second,
- WriteTimeout: 10 * time.Second,
- IdleTimeout: 60 * time.Second,
- BaseContext: func(_ net.Listener) context.Context {
- return context.Background()
- },
- Handler: pMux,
- }
-
- // TODO : factor these funcs out to a common place
- go func() {
- pubkey := p2pMonolith.Router.PublicKey()
- logrus.Info("Listening on ", hex.EncodeToString(pubkey[:]))
- logrus.Fatal(httpServer.Serve(p2pMonolith.Sessions.Protocol("matrix")))
- }()
- go func() {
- httpBindAddr := fmt.Sprintf(":%d", *instancePort)
- logrus.Info("Listening on ", httpBindAddr)
- logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter))
- }()
-
- stopRelayServerSync := make(chan bool)
- eLog := logrus.WithField("pinecone", "events")
- relayRetriever := relay.NewRelayServerRetriever(
- context.Background(),
- gomatrixserverlib.ServerName(p2pMonolith.Router.PublicKey().String()),
- m.FederationAPI,
- m.RelayAPI,
- stopRelayServerSync,
- )
- relayRetriever.InitializeRelayServers(eLog)
-
- go func(ch <-chan pineconeEvents.Event) {
- for event := range ch {
- switch e := event.(type) {
- case pineconeEvents.PeerAdded:
- relayRetriever.StartSync()
- case pineconeEvents.PeerRemoved:
- if relayRetriever.IsRunning() && p2pMonolith.Router.TotalPeerCount() == 0 {
- stopRelayServerSync <- true
- }
- case pineconeEvents.BroadcastReceived:
- // eLog.Info("Broadcast received from: ", e.PeerID)
-
- req := &api.PerformWakeupServersRequest{
- ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
- }
- res := &api.PerformWakeupServersResponse{}
- if err := m.FederationAPI.PerformWakeupServers(base.Context(), req, res); err != nil {
- eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
- }
- default:
- }
- }
- }(p2pMonolith.EventChannel)
-
- base.WaitForShutdown()
}
diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go
index d3f0562b..61e71e87 100644
--- a/cmd/dendrite-demo-pinecone/monolith/monolith.go
+++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go
@@ -15,12 +15,43 @@
package monolith
import (
+ "context"
"crypto/ed25519"
+ "crypto/tls"
+ "encoding/hex"
"fmt"
+ "net"
+ "net/http"
"path/filepath"
+ "time"
+ "github.com/gorilla/mux"
+ "github.com/gorilla/websocket"
+ "github.com/matrix-org/dendrite/appservice"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/relay"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
+ "github.com/matrix-org/dendrite/federationapi"
+ federationAPI "github.com/matrix-org/dendrite/federationapi/api"
+ "github.com/matrix-org/dendrite/federationapi/producers"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/matrix-org/dendrite/keyserver"
+ "github.com/matrix-org/dendrite/relayapi"
+ relayAPI "github.com/matrix-org/dendrite/relayapi/api"
+ "github.com/matrix-org/dendrite/roomserver"
+ "github.com/matrix-org/dendrite/setup"
+ "github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/userapi"
+ userAPI "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
+ "golang.org/x/net/http2"
+ "golang.org/x/net/http2/h2c"
pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
@@ -29,22 +60,33 @@ import (
pineconeSessions "github.com/matrix-org/pinecone/sessions"
)
+const SessionProtocol = "matrix"
+
type P2PMonolith struct {
- Sessions *pineconeSessions.Sessions
- Multicast *pineconeMulticast.Multicast
- ConnManager *pineconeConnections.ConnectionManager
- Router *pineconeRouter.Router
- EventChannel chan pineconeEvents.Event
+ BaseDendrite *base.BaseDendrite
+ Sessions *pineconeSessions.Sessions
+ Multicast *pineconeMulticast.Multicast
+ ConnManager *pineconeConnections.ConnectionManager
+ Router *pineconeRouter.Router
+ EventChannel chan pineconeEvents.Event
+ RelayRetriever relay.RelayServerRetriever
+
+ dendrite setup.Monolith
+ port int
+ httpMux *mux.Router
+ pineconeMux *mux.Router
+ listener net.Listener
+ httpListenAddr string
}
-func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, dbPrefix string) *config.Dendrite {
+func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, cacheDir string, dbPrefix string) *config.Dendrite {
cfg := config.Dendrite{}
cfg.Defaults(config.DefaultOpts{
Generate: true,
Monolithic: true,
})
cfg.Global.PrivateKey = sk
- cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", filepath.Join(storageDir, dbPrefix)))
+ cfg.Global.JetStream.StoragePath = config.Path(fmt.Sprintf("%s/", filepath.Join(cacheDir, dbPrefix)))
cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", filepath.Join(storageDir, dbPrefix)))
cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", filepath.Join(storageDir, dbPrefix)))
cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", filepath.Join(storageDir, dbPrefix)))
@@ -56,10 +98,10 @@ func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, dbPrefix st
cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", filepath.Join(storageDir, dbPrefix)))
cfg.ClientAPI.RegistrationDisabled = false
cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true
- cfg.MediaAPI.BasePath = config.Path(filepath.Join(storageDir, "media"))
- cfg.MediaAPI.AbsBasePath = config.Path(filepath.Join(storageDir, "media"))
+ cfg.MediaAPI.BasePath = config.Path(filepath.Join(cacheDir, "media"))
+ cfg.MediaAPI.AbsBasePath = config.Path(filepath.Join(cacheDir, "media"))
cfg.SyncAPI.Fulltext.Enabled = true
- cfg.SyncAPI.Fulltext.IndexPath = config.Path(filepath.Join(storageDir, "search"))
+ cfg.SyncAPI.Fulltext.IndexPath = config.Path(filepath.Join(cacheDir, "search"))
if err := cfg.Derive(); err != nil {
panic(err)
}
@@ -74,7 +116,264 @@ func (p *P2PMonolith) SetupPinecone(sk ed25519.PrivateKey) {
p.Router.EnableWakeupBroadcasts()
p.Router.Subscribe(p.EventChannel)
- p.Sessions = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), p.Router, []string{"matrix"})
+ p.Sessions = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), p.Router, []string{SessionProtocol})
p.Multicast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), p.Router)
p.ConnManager = pineconeConnections.NewConnectionManager(p.Router, nil)
}
+
+func (p *P2PMonolith) SetupDendrite(cfg *config.Dendrite, port int, enableRelaying bool, enableMetrics bool, enableWebsockets bool) {
+ if enableMetrics {
+ p.BaseDendrite = base.NewBaseDendrite(cfg, "Monolith")
+ } else {
+ p.BaseDendrite = base.NewBaseDendrite(cfg, "Monolith", base.DisableMetrics)
+ }
+ p.port = port
+ p.BaseDendrite.ConfigureAdminEndpoints()
+
+ federation := conn.CreateFederationClient(p.BaseDendrite, p.Sessions)
+
+ serverKeyAPI := &signing.YggdrasilKeys{}
+ keyRing := serverKeyAPI.KeyRing()
+
+ rsComponent := roomserver.NewInternalAPI(p.BaseDendrite)
+ rsAPI := rsComponent
+ fsAPI := federationapi.NewInternalAPI(
+ p.BaseDendrite, federation, rsAPI, p.BaseDendrite.Caches, keyRing, true,
+ )
+
+ keyAPI := keyserver.NewInternalAPI(p.BaseDendrite, &p.BaseDendrite.Cfg.KeyServer, fsAPI, rsComponent)
+ userAPI := userapi.NewInternalAPI(p.BaseDendrite, &cfg.UserAPI, nil, keyAPI, rsAPI, p.BaseDendrite.PushGatewayHTTPClient())
+ keyAPI.SetUserAPI(userAPI)
+
+ asAPI := appservice.NewInternalAPI(p.BaseDendrite, userAPI, rsAPI)
+
+ rsComponent.SetFederationAPI(fsAPI, keyRing)
+
+ userProvider := users.NewPineconeUserProvider(p.Router, p.Sessions, userAPI, federation)
+ roomProvider := rooms.NewPineconeRoomProvider(p.Router, p.Sessions, fsAPI, federation)
+
+ js, _ := p.BaseDendrite.NATS.Prepare(p.BaseDendrite.ProcessContext, &p.BaseDendrite.Cfg.Global.JetStream)
+ producer := &producers.SyncAPIProducer{
+ JetStream: js,
+ TopicReceiptEvent: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+ TopicSendToDeviceEvent: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ TopicTypingEvent: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ TopicPresenceEvent: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent),
+ TopicDeviceListUpdate: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
+ TopicSigningKeyUpdate: p.BaseDendrite.Cfg.Global.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
+ Config: &p.BaseDendrite.Cfg.FederationAPI,
+ UserAPI: userAPI,
+ }
+ relayAPI := relayapi.NewRelayInternalAPI(p.BaseDendrite, federation, rsAPI, keyRing, producer, enableRelaying)
+ logrus.Infof("Relaying enabled: %v", relayAPI.RelayingEnabled())
+
+ p.dendrite = setup.Monolith{
+ Config: p.BaseDendrite.Cfg,
+ Client: conn.CreateClient(p.BaseDendrite, p.Sessions),
+ FedClient: federation,
+ KeyRing: keyRing,
+
+ AppserviceAPI: asAPI,
+ FederationAPI: fsAPI,
+ RoomserverAPI: rsAPI,
+ UserAPI: userAPI,
+ KeyAPI: keyAPI,
+ RelayAPI: relayAPI,
+ ExtPublicRoomsProvider: roomProvider,
+ ExtUserDirectoryProvider: userProvider,
+ }
+ p.dendrite.AddAllPublicRoutes(p.BaseDendrite)
+
+ p.setupHttpServers(userProvider, enableWebsockets)
+}
+
+func (p *P2PMonolith) GetFederationAPI() federationAPI.FederationInternalAPI {
+ return p.dendrite.FederationAPI
+}
+
+func (p *P2PMonolith) GetRelayAPI() relayAPI.RelayInternalAPI {
+ return p.dendrite.RelayAPI
+}
+
+func (p *P2PMonolith) GetUserAPI() userAPI.UserInternalAPI {
+ return p.dendrite.UserAPI
+}
+
+func (p *P2PMonolith) StartMonolith(useTCPListener bool) {
+ p.startHTTPServers(useTCPListener)
+ p.startEventHandler()
+}
+
+func (p *P2PMonolith) Stop() {
+ _ = p.BaseDendrite.Close()
+ p.WaitForShutdown()
+}
+
+func (p *P2PMonolith) WaitForShutdown() {
+ p.BaseDendrite.WaitForShutdown()
+ p.closeAllResources()
+}
+
+func (p *P2PMonolith) closeAllResources() {
+ if p.listener != nil {
+ _ = p.listener.Close()
+ }
+
+ if p.Multicast != nil {
+ p.Multicast.Stop()
+ }
+
+ if p.Sessions != nil {
+ _ = p.Sessions.Close()
+ }
+
+ if p.Router != nil {
+ _ = p.Router.Close()
+ }
+}
+
+func (p *P2PMonolith) Addr() string {
+ return p.httpListenAddr
+}
+
+func (p *P2PMonolith) setupHttpServers(userProvider *users.PineconeUserProvider, enableWebsockets bool) {
+ p.httpMux = mux.NewRouter().SkipClean(true).UseEncodedPath()
+ p.httpMux.PathPrefix(httputil.InternalPathPrefix).Handler(p.BaseDendrite.InternalAPIMux)
+ p.httpMux.PathPrefix(httputil.PublicClientPathPrefix).Handler(p.BaseDendrite.PublicClientAPIMux)
+ p.httpMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(p.BaseDendrite.PublicMediaAPIMux)
+ p.httpMux.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(p.BaseDendrite.DendriteAdminMux)
+ p.httpMux.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(p.BaseDendrite.SynapseAdminMux)
+
+ if enableWebsockets {
+ wsUpgrader := websocket.Upgrader{
+ CheckOrigin: func(_ *http.Request) bool {
+ return true
+ },
+ }
+ p.httpMux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
+ c, err := wsUpgrader.Upgrade(w, r, nil)
+ if err != nil {
+ logrus.WithError(err).Error("Failed to upgrade WebSocket connection")
+ return
+ }
+ conn := conn.WrapWebSocketConn(c)
+ if _, err = p.Router.Connect(
+ conn,
+ pineconeRouter.ConnectionZone("websocket"),
+ pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote),
+ ); err != nil {
+ logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch")
+ }
+ })
+ }
+
+ p.httpMux.HandleFunc("/pinecone", p.Router.ManholeHandler)
+
+ if enableWebsockets {
+ embed.Embed(p.httpMux, p.port, "Pinecone Demo")
+ }
+
+ p.pineconeMux = mux.NewRouter().SkipClean(true).UseEncodedPath()
+ p.pineconeMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles)
+ p.pineconeMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(p.BaseDendrite.PublicFederationAPIMux)
+ p.pineconeMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(p.BaseDendrite.PublicMediaAPIMux)
+
+ pHTTP := p.Sessions.Protocol(SessionProtocol).HTTP()
+ pHTTP.Mux().Handle(users.PublicURL, p.pineconeMux)
+ pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, p.pineconeMux)
+ pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, p.pineconeMux)
+}
+
+func (p *P2PMonolith) startHTTPServers(useTCPListener bool) {
+ var handler http.Handler
+ var httpServeFunc func() error
+ if useTCPListener {
+ var err error
+ p.httpListenAddr = "localhost:" + fmt.Sprint(p.port)
+ p.listener, err = net.Listen("tcp", p.httpListenAddr)
+ if err != nil {
+ panic(err)
+ }
+
+ h2s := &http2.Server{}
+ handler = h2c.NewHandler(p.pineconeMux, h2s)
+ httpServeFunc = func() error { return http.Serve(p.listener, p.httpMux) }
+ } else {
+ handler = p.pineconeMux
+ p.httpListenAddr = fmt.Sprintf(":%d", p.port)
+ httpServeFunc = func() error { return http.ListenAndServe(p.httpListenAddr, p.httpMux) }
+ }
+
+ go func() {
+ // Build both ends of a HTTP multiplex.
+ httpServer := &http.Server{
+ Addr: ":0",
+ TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
+ ReadTimeout: 10 * time.Second,
+ WriteTimeout: 10 * time.Second,
+ IdleTimeout: 30 * time.Second,
+ BaseContext: func(_ net.Listener) context.Context {
+ return context.Background()
+ },
+ Handler: handler,
+ }
+
+ pubkey := p.Router.PublicKey()
+ pubkeyString := hex.EncodeToString(pubkey[:])
+ logrus.Info("Listening on ", pubkeyString)
+
+ switch httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) {
+ case net.ErrClosed, http.ErrServerClosed:
+ logrus.Info("Stopped listening on ", pubkeyString)
+ default:
+ logrus.Error("Stopped listening on ", pubkeyString)
+ }
+ }()
+
+ go func() {
+ logrus.Info("Listening on ", p.httpListenAddr)
+ switch httpServeFunc() {
+ case net.ErrClosed, http.ErrServerClosed:
+ logrus.Info("Stopped listening on ", p.httpListenAddr)
+ default:
+ logrus.Error("Stopped listening on ", p.httpListenAddr)
+ }
+ }()
+}
+
+func (p *P2PMonolith) startEventHandler() {
+ stopRelayServerSync := make(chan bool)
+ eLog := logrus.WithField("pinecone", "events")
+ p.RelayRetriever = relay.NewRelayServerRetriever(
+ context.Background(),
+ gomatrixserverlib.ServerName(p.Router.PublicKey().String()),
+ p.dendrite.FederationAPI,
+ p.dendrite.RelayAPI,
+ stopRelayServerSync,
+ )
+ p.RelayRetriever.InitializeRelayServers(eLog)
+
+ go func(ch <-chan pineconeEvents.Event) {
+ for event := range ch {
+ switch e := event.(type) {
+ case pineconeEvents.PeerAdded:
+ p.RelayRetriever.StartSync()
+ case pineconeEvents.PeerRemoved:
+ if p.RelayRetriever.IsRunning() && p.Router.TotalPeerCount() == 0 {
+ stopRelayServerSync <- true
+ }
+ case pineconeEvents.BroadcastReceived:
+ // eLog.Info("Broadcast received from: ", e.PeerID)
+
+ req := &federationAPI.PerformWakeupServersRequest{
+ ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
+ }
+ res := &federationAPI.PerformWakeupServersResponse{}
+ if err := p.dendrite.FederationAPI.PerformWakeupServers(p.BaseDendrite.Context(), req, res); err != nil {
+ eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
+ }
+ default:
+ }
+ }
+ }(p.EventChannel)
+}