aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--build/gobind-pinecone/monolith.go57
-rw-r--r--cmd/dendrite-demo-pinecone/main.go40
-rw-r--r--cmd/dendritejs-pinecone/main.go20
3 files changed, 16 insertions, 101 deletions
diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go
index 60859949..9cc94d65 100644
--- a/build/gobind-pinecone/monolith.go
+++ b/build/gobind-pinecone/monolith.go
@@ -52,6 +52,7 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
+ pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
@@ -71,11 +72,9 @@ type DendriteMonolith struct {
PineconeRouter *pineconeRouter.Router
PineconeMulticast *pineconeMulticast.Multicast
PineconeQUIC *pineconeSessions.Sessions
+ PineconeManager *pineconeConnections.ConnectionManager
StorageDirectory string
CacheDirectory string
- staticPeerURI string
- staticPeerMutex sync.RWMutex
- staticPeerAttempt chan struct{}
listener net.Listener
httpServer *http.Server
processContext *process.ProcessContext
@@ -104,15 +103,8 @@ func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) {
}
func (m *DendriteMonolith) SetStaticPeer(uri string) {
- m.staticPeerMutex.Lock()
- m.staticPeerURI = strings.TrimSpace(uri)
- m.staticPeerMutex.Unlock()
- m.DisconnectType(int(pineconeRouter.PeerTypeRemote))
- if uri != "" {
- go func() {
- m.staticPeerAttempt <- struct{}{}
- }()
- }
+ m.PineconeManager.RemovePeers()
+ m.PineconeManager.AddPeer(strings.TrimSpace(uri))
}
func (m *DendriteMonolith) DisconnectType(peertype int) {
@@ -210,43 +202,6 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e
return loginRes.Device.AccessToken, nil
}
-func (m *DendriteMonolith) staticPeerConnect() {
- connected := map[string]bool{} // URI -> connected?
- attempt := func() {
- m.staticPeerMutex.RLock()
- uri := m.staticPeerURI
- m.staticPeerMutex.RUnlock()
- if uri == "" {
- return
- }
- for k := range connected {
- delete(connected, k)
- }
- for _, uri := range strings.Split(uri, ",") {
- connected[strings.TrimSpace(uri)] = false
- }
- for _, info := range m.PineconeRouter.Peers() {
- connected[info.URI] = true
- }
- for k, online := range connected {
- if !online {
- if err := conn.ConnectToPeer(m.PineconeRouter, k); err != nil {
- logrus.WithError(err).Error("Failed to connect to static peer")
- }
- }
- }
- }
- for {
- select {
- case <-m.processContext.Context().Done():
- case <-m.staticPeerAttempt:
- attempt()
- case <-time.After(time.Second * 5):
- attempt()
- }
- }
-}
-
// nolint:gocyclo
func (m *DendriteMonolith) Start() {
var err error
@@ -284,6 +239,7 @@ func (m *DendriteMonolith) Start() {
m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"})
m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter)
+ m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter)
prefix := hex.EncodeToString(pk)
cfg := &config.Dendrite{}
@@ -392,9 +348,6 @@ func (m *DendriteMonolith) Start() {
m.processContext = base.ProcessContext
- m.staticPeerAttempt = make(chan struct{}, 1)
- go m.staticPeerConnect()
-
go func() {
m.logger.Info("Listening on ", cfg.Global.ServerName)
m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC.Protocol("matrix")))
diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go
index a3d3ed17..dd1ab369 100644
--- a/cmd/dendrite-demo-pinecone/main.go
+++ b/cmd/dendrite-demo-pinecone/main.go
@@ -25,7 +25,6 @@ import (
"net"
"net/http"
"os"
- "strings"
"time"
"github.com/gorilla/mux"
@@ -47,6 +46,7 @@ import (
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
+ pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
@@ -90,6 +90,13 @@ func main() {
}
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
+ pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
+ pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
+ pManager := pineconeConnections.NewConnectionManager(pRouter)
+ pMulticast.Start()
+ if instancePeer != nil && *instancePeer != "" {
+ pManager.AddPeer(*instancePeer)
+ }
go func() {
listener, err := net.Listen("tcp", *instanceListen)
@@ -119,36 +126,6 @@ func main() {
}
}()
- pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
- pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter)
- pMulticast.Start()
-
- connectToStaticPeer := func() {
- connected := map[string]bool{} // URI -> connected?
- for _, uri := range strings.Split(*instancePeer, ",") {
- connected[strings.TrimSpace(uri)] = false
- }
- attempt := func() {
- for k := range connected {
- connected[k] = false
- }
- for _, info := range pRouter.Peers() {
- connected[info.URI] = true
- }
- for k, online := range connected {
- if !online {
- if err := conn.ConnectToPeer(pRouter, k); err != nil {
- logrus.WithError(err).Error("Failed to connect to static peer")
- }
- }
- }
- }
- for {
- attempt()
- time.Sleep(time.Second * 5)
- }
- }
-
cfg := &config.Dendrite{}
cfg.Defaults(true)
cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
@@ -268,7 +245,6 @@ func main() {
Handler: pMux,
}
- go connectToStaticPeer()
go func() {
pubkey := pRouter.PublicKey()
logrus.Info("Listening on ", hex.EncodeToString(pubkey[:]))
diff --git a/cmd/dendritejs-pinecone/main.go b/cmd/dendritejs-pinecone/main.go
index ba9edf23..211b3e13 100644
--- a/cmd/dendritejs-pinecone/main.go
+++ b/cmd/dendritejs-pinecone/main.go
@@ -22,7 +22,6 @@ import (
"encoding/hex"
"fmt"
"syscall/js"
- "time"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/appservice"
@@ -44,6 +43,7 @@ import (
_ "github.com/matrix-org/go-sqlite3-js"
+ pineconeConnections "github.com/matrix-org/pinecone/connections"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
)
@@ -154,6 +154,8 @@ func startup() {
pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk, false)
pSessions := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"})
+ pManager := pineconeConnections.NewConnectionManager(pRouter)
+ pManager.AddPeer("wss://pinecone.matrix.org/public")
cfg := &config.Dendrite{}
cfg.Defaults(true)
@@ -237,20 +239,4 @@ func startup() {
}
s.ListenAndServe("fetch")
}()
-
- // Connect to the static peer
- go func() {
- for {
- if pRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 {
- if err := conn.ConnectToPeer(pRouter, publicPeer); err != nil {
- logrus.WithError(err).Error("Failed to connect to static peer")
- }
- }
- select {
- case <-base.ProcessContext.Context().Done():
- return
- case <-time.After(time.Second * 5):
- }
- }
- }()
}