aboutsummaryrefslogtreecommitdiff
path: root/build/gobind-pinecone
diff options
context:
space:
mode:
authorDevon Hudson <devonhudson@librem.one>2023-02-01 13:41:27 -0700
committerDevon Hudson <devonhudson@librem.one>2023-02-01 13:41:38 -0700
commit048e35026c5396594d26cef6a2412c00ffe6f35f (patch)
treec060b8cf2c4df65cba87839aba9f490a335c5d28 /build/gobind-pinecone
parentdbc2869cbd36b069f4a0745557320c5fe1daf0f7 (diff)
Refactor common pinecone demo code to remove major duplication
Diffstat (limited to 'build/gobind-pinecone')
-rw-r--r--build/gobind-pinecone/monolith.go226
1 files changed, 27 insertions, 199 deletions
diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go
index cca92151..597c9135 100644
--- a/build/gobind-pinecone/monolith.go
+++ b/build/gobind-pinecone/monolith.go
@@ -18,47 +18,25 @@ import (
"context"
"crypto/ed25519"
"crypto/rand"
- "crypto/tls"
"encoding/hex"
"fmt"
"net"
- "net/http"
"path/filepath"
"strings"
- "time"
- "github.com/gorilla/mux"
- "github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conduit"
- "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/monolith"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/relay"
- "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
- "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
- "github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api"
- "github.com/matrix-org/dendrite/federationapi/producers"
- "github.com/matrix-org/dendrite/internal/httputil"
- "github.com/matrix-org/dendrite/keyserver"
- "github.com/matrix-org/dendrite/relayapi"
- relayServerAPI "github.com/matrix-org/dendrite/relayapi/api"
- "github.com/matrix-org/dendrite/roomserver"
- "github.com/matrix-org/dendrite/setup"
- "github.com/matrix-org/dendrite/setup/base"
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/userapi"
userapiAPI "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/pinecone/types"
"github.com/sirupsen/logrus"
- "golang.org/x/net/http2"
- "golang.org/x/net/http2/h2c"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
pineconeRouter "github.com/matrix-org/pinecone/router"
- pineconeEvents "github.com/matrix-org/pinecone/router/events"
_ "golang.org/x/mobile/bind"
)
@@ -70,17 +48,17 @@ const (
PeerTypeBonjour = pineconeRouter.PeerTypeBonjour
)
+// Re-export Conduit in this package for bindings.
+type Conduit struct {
+ conduit.Conduit
+}
+
type DendriteMonolith struct {
logger logrus.Logger
- baseDendrite *base.BaseDendrite
p2pMonolith monolith.P2PMonolith
StorageDirectory string
+ CacheDirectory string
listener net.Listener
- httpServer *http.Server
- userAPI userapiAPI.UserInternalAPI
- federationAPI api.FederationInternalAPI
- relayAPI relayServerAPI.RelayInternalAPI
- relayRetriever relay.RelayServerRetriever
}
func (m *DendriteMonolith) PublicKey() string {
@@ -88,7 +66,7 @@ func (m *DendriteMonolith) PublicKey() string {
}
func (m *DendriteMonolith) BaseURL() string {
- return fmt.Sprintf("http://%s", m.listener.Addr().String())
+ return fmt.Sprintf("http://%s", m.p2pMonolith.Addr())
}
func (m *DendriteMonolith) PeerCount(peertype int) int {
@@ -96,7 +74,7 @@ func (m *DendriteMonolith) PeerCount(peertype int) int {
}
func (m *DendriteMonolith) SessionCount() int {
- return len(m.p2pMonolith.Sessions.Protocol("matrix").Sessions())
+ return len(m.p2pMonolith.Sessions.Protocol(monolith.SessionProtocol).Sessions())
}
type InterfaceInfo struct {
@@ -202,13 +180,13 @@ func (m *DendriteMonolith) SetRelayServers(nodeID string, uris string) {
if string(nodeKey) == m.PublicKey() {
logrus.Infof("Setting own relay servers to: %v", relays)
- m.relayRetriever.SetRelayServers(relays)
+ m.p2pMonolith.RelayRetriever.SetRelayServers(relays)
} else {
relay.UpdateNodeRelayServers(
gomatrixserverlib.ServerName(nodeKey),
relays,
- m.baseDendrite.Context(),
- m.federationAPI,
+ m.p2pMonolith.BaseDendrite.Context(),
+ m.p2pMonolith.GetFederationAPI(),
)
}
}
@@ -222,7 +200,7 @@ func (m *DendriteMonolith) GetRelayServers(nodeID string) string {
relaysString := ""
if string(nodeKey) == m.PublicKey() {
- relays := m.relayRetriever.GetRelayServers()
+ relays := m.p2pMonolith.RelayRetriever.GetRelayServers()
for i, relay := range relays {
if i != 0 {
@@ -234,7 +212,7 @@ func (m *DendriteMonolith) GetRelayServers(nodeID string) string {
} else {
request := api.P2PQueryRelayServersRequest{Server: gomatrixserverlib.ServerName(nodeKey)}
response := api.P2PQueryRelayServersResponse{}
- err := m.federationAPI.P2PQueryRelayServers(m.baseDendrite.Context(), &request, &response)
+ err := m.p2pMonolith.GetFederationAPI().P2PQueryRelayServers(m.p2pMonolith.BaseDendrite.Context(), &request, &response)
if err != nil {
logrus.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error())
return ""
@@ -253,11 +231,11 @@ func (m *DendriteMonolith) GetRelayServers(nodeID string) string {
}
func (m *DendriteMonolith) RelayingEnabled() bool {
- return m.relayAPI.RelayingEnabled()
+ return m.p2pMonolith.GetRelayAPI().RelayingEnabled()
}
func (m *DendriteMonolith) SetRelayingEnabled(enabled bool) {
- m.relayAPI.SetRelayingEnabled(enabled)
+ m.p2pMonolith.GetRelayAPI().SetRelayingEnabled(enabled)
}
func (m *DendriteMonolith) DisconnectType(peertype int) {
@@ -280,9 +258,9 @@ func (m *DendriteMonolith) DisconnectPort(port int) {
m.p2pMonolith.Router.Disconnect(types.SwitchPortID(port), nil)
}
-func (m *DendriteMonolith) Conduit(zone string, peertype int) (*conduit.Conduit, error) {
+func (m *DendriteMonolith) Conduit(zone string, peertype int) (*Conduit, error) {
l, r := net.Pipe()
- newConduit := conduit.NewConduit(r, 0)
+ newConduit := Conduit{conduit.NewConduit(r, 0)}
go func() {
logrus.Errorf("Attempting authenticated connect")
var port types.SwitchPortID
@@ -316,7 +294,7 @@ func (m *DendriteMonolith) RegisterUser(localpart, password string) (string, err
Password: password,
}
userRes := &userapiAPI.PerformAccountCreationResponse{}
- if err := m.userAPI.PerformAccountCreation(context.Background(), userReq, userRes); err != nil {
+ if err := m.p2pMonolith.GetUserAPI().PerformAccountCreation(context.Background(), userReq, userRes); err != nil {
return userID, fmt.Errorf("userAPI.PerformAccountCreation: %w", err)
}
return userID, nil
@@ -334,7 +312,7 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e
AccessToken: hex.EncodeToString(accessTokenBytes[:n]),
}
loginRes := &userapiAPI.PerformDeviceCreationResponse{}
- if err := m.userAPI.PerformDeviceCreation(context.Background(), loginReq, loginRes); err != nil {
+ if err := m.p2pMonolith.GetUserAPI().PerformDeviceCreation(context.Background(), loginReq, loginRes); err != nil {
return "", fmt.Errorf("userAPI.PerformDeviceCreation: %w", err)
}
if !loginRes.DeviceCreated {
@@ -348,12 +326,6 @@ func (m *DendriteMonolith) Start() {
oldKeyfile := filepath.Join(m.StorageDirectory, "p2p.key")
sk, pk := monolith.GetOrCreateKey(keyfile, oldKeyfile)
- var err error
- m.listener, err = net.Listen("tcp", "localhost:65432")
- if err != nil {
- panic(err)
- }
-
m.logger = logrus.Logger{
Out: BindLogger{},
}
@@ -364,164 +336,20 @@ func (m *DendriteMonolith) Start() {
m.p2pMonolith.SetupPinecone(sk)
prefix := hex.EncodeToString(pk)
- cfg := monolith.GenerateDefaultConfig(sk, m.StorageDirectory, prefix)
+ cfg := monolith.GenerateDefaultConfig(sk, m.StorageDirectory, m.CacheDirectory, prefix)
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
cfg.Global.JetStream.InMemory = false
- base := base.NewBaseDendrite(cfg, "Monolith", base.DisableMetrics)
- m.baseDendrite = base
- base.ConfigureAdminEndpoints()
-
- federation := conn.CreateFederationClient(base, m.p2pMonolith.Sessions)
-
- serverKeyAPI := &signing.YggdrasilKeys{}
- keyRing := serverKeyAPI.KeyRing()
-
- rsAPI := roomserver.NewInternalAPI(base)
-
- m.federationAPI = federationapi.NewInternalAPI(
- base, federation, rsAPI, base.Caches, keyRing, true,
- )
-
- keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, m.federationAPI, rsAPI)
- m.userAPI = userapi.NewInternalAPI(base, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient())
- keyAPI.SetUserAPI(m.userAPI)
-
- asAPI := appservice.NewInternalAPI(base, m.userAPI, rsAPI)
-
- // The underlying roomserver implementation needs to be able to call the fedsender.
- // This is different to rsAPI which can be the http client which doesn't need this dependency
- rsAPI.SetFederationAPI(m.federationAPI, keyRing)
-
- userProvider := users.NewPineconeUserProvider(m.p2pMonolith.Router, m.p2pMonolith.Sessions, m.userAPI, federation)
- roomProvider := rooms.NewPineconeRoomProvider(m.p2pMonolith.Router, m.p2pMonolith.Sessions, m.federationAPI, federation)
-
- js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
- producer := &producers.SyncAPIProducer{
- JetStream: js,
- TopicReceiptEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent),
- TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
- TopicTypingEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputTypingEvent),
- TopicPresenceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent),
- TopicDeviceListUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
- TopicSigningKeyUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
- Config: &base.Cfg.FederationAPI,
- UserAPI: m.userAPI,
- }
- m.relayAPI = relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer, false)
-
- monolith := setup.Monolith{
- Config: base.Cfg,
- Client: conn.CreateClient(base, m.p2pMonolith.Sessions),
- FedClient: federation,
- KeyRing: keyRing,
-
- AppserviceAPI: asAPI,
- FederationAPI: m.federationAPI,
- RoomserverAPI: rsAPI,
- UserAPI: m.userAPI,
- KeyAPI: keyAPI,
- RelayAPI: m.relayAPI,
- ExtPublicRoomsProvider: roomProvider,
- ExtUserDirectoryProvider: userProvider,
- }
- monolith.AddAllPublicRoutes(base)
-
- httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
- httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux)
- httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux)
- httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
- httpRouter.PathPrefix(httputil.DendriteAdminPathPrefix).Handler(base.DendriteAdminMux)
- httpRouter.PathPrefix(httputil.SynapseAdminPathPrefix).Handler(base.SynapseAdminMux)
- httpRouter.HandleFunc("/pinecone", m.p2pMonolith.Router.ManholeHandler)
-
- pMux := mux.NewRouter().SkipClean(true).UseEncodedPath()
- pMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles)
- pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux)
- pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
-
- pHTTP := m.p2pMonolith.Sessions.Protocol("matrix").HTTP()
- pHTTP.Mux().Handle(users.PublicURL, pMux)
- pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux)
- pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux)
-
- // Build both ends of a HTTP multiplex.
- h2s := &http2.Server{}
- m.httpServer = &http.Server{
- Addr: ":0",
- TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
- ReadTimeout: 10 * time.Second,
- WriteTimeout: 10 * time.Second,
- IdleTimeout: 30 * time.Second,
- BaseContext: func(_ net.Listener) context.Context {
- return context.Background()
- },
- Handler: h2c.NewHandler(pMux, h2s),
- }
-
- go func() {
- m.logger.Info("Listening on ", cfg.Global.ServerName)
-
- switch m.httpServer.Serve(m.p2pMonolith.Sessions.Protocol("matrix")) {
- case net.ErrClosed, http.ErrServerClosed:
- m.logger.Info("Stopped listening on ", cfg.Global.ServerName)
- default:
- m.logger.Error("Stopped listening on ", cfg.Global.ServerName)
- }
- }()
- go func() {
- logrus.Info("Listening on ", m.listener.Addr())
+ enableRelaying := false
+ enableMetrics := true
+ enableWebsockets := true
+ m.p2pMonolith.SetupDendrite(cfg, 65432, enableRelaying, enableMetrics, enableWebsockets)
- switch http.Serve(m.listener, httpRouter) {
- case net.ErrClosed, http.ErrServerClosed:
- m.logger.Info("Stopped listening on ", cfg.Global.ServerName)
- default:
- m.logger.Error("Stopped listening on ", cfg.Global.ServerName)
- }
- }()
-
- stopRelayServerSync := make(chan bool)
- eLog := logrus.WithField("pinecone", "events")
- m.relayRetriever = relay.NewRelayServerRetriever(
- context.Background(),
- gomatrixserverlib.ServerName(m.p2pMonolith.Router.PublicKey().String()),
- m.federationAPI,
- monolith.RelayAPI,
- stopRelayServerSync,
- )
- m.relayRetriever.InitializeRelayServers(eLog)
-
- go func(ch <-chan pineconeEvents.Event) {
- for event := range ch {
- switch e := event.(type) {
- case pineconeEvents.PeerAdded:
- m.relayRetriever.StartSync()
- case pineconeEvents.PeerRemoved:
- if m.relayRetriever.IsRunning() && m.p2pMonolith.Router.TotalPeerCount() == 0 {
- stopRelayServerSync <- true
- }
- case pineconeEvents.BroadcastReceived:
- // eLog.Info("Broadcast received from: ", e.PeerID)
-
- req := &api.PerformWakeupServersRequest{
- ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
- }
- res := &api.PerformWakeupServersResponse{}
- if err := m.federationAPI.PerformWakeupServers(base.Context(), req, res); err != nil {
- eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
- }
- default:
- }
- }
- }(m.p2pMonolith.EventChannel)
+ useTCPListener := false
+ m.p2pMonolith.StartMonolith(useTCPListener)
}
func (m *DendriteMonolith) Stop() {
- _ = m.baseDendrite.Close()
- m.baseDendrite.WaitForShutdown()
- _ = m.listener.Close()
- m.p2pMonolith.Multicast.Stop()
- _ = m.p2pMonolith.Sessions.Close()
- _ = m.p2pMonolith.Router.Close()
+ m.p2pMonolith.Stop()
}