aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-11-25 09:46:26 +0000
committerNeil Alexander <neilalexander@users.noreply.github.com>2021-11-25 09:46:26 +0000
commit9bc1c36ff6fbf8de130c7131b67b7a7ce9e31e1e (patch)
tree5e1af77a9ede087f949d1ba9959a5cb961c7cd40
parent25dcf801806bbca4ac76060f595591881b67de32 (diff)
Support connecting to multiple Pinecone static peers in the P2P demos (supply a comma-separated list)
-rw-r--r--build/gobind-pinecone/monolith.go47
-rw-r--r--cmd/dendrite-demo-pinecone/conn/client.go7
-rw-r--r--cmd/dendrite-demo-pinecone/main.go35
3 files changed, 62 insertions, 27 deletions
diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go
index 47ca20db..cd2809e4 100644
--- a/build/gobind-pinecone/monolith.go
+++ b/build/gobind-pinecone/monolith.go
@@ -13,6 +13,7 @@ import (
"net"
"net/http"
"os"
+ "strings"
"sync"
"time"
@@ -75,7 +76,7 @@ func (m *DendriteMonolith) BaseURL() string {
}
func (m *DendriteMonolith) PeerCount(peertype int) int {
- return m.PineconeRouter.PeerCount(peertype)
+ return m.PineconeRouter.PeerCount(pineconeRouter.ConnectionPeerType(peertype))
}
func (m *DendriteMonolith) SessionCount() int {
@@ -87,15 +88,15 @@ func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) {
m.PineconeMulticast.Start()
} else {
m.PineconeMulticast.Stop()
- m.DisconnectType(pineconeRouter.PeerTypeMulticast)
+ m.DisconnectType(int(pineconeRouter.PeerTypeMulticast))
}
}
func (m *DendriteMonolith) SetStaticPeer(uri string) {
m.staticPeerMutex.Lock()
- m.staticPeerURI = uri
+ m.staticPeerURI = strings.TrimSpace(uri)
m.staticPeerMutex.Unlock()
- m.DisconnectType(pineconeRouter.PeerTypeRemote)
+ m.DisconnectType(int(pineconeRouter.PeerTypeRemote))
if uri != "" {
go func() {
m.staticPeerAttempt <- struct{}{}
@@ -105,7 +106,7 @@ func (m *DendriteMonolith) SetStaticPeer(uri string) {
func (m *DendriteMonolith) DisconnectType(peertype int) {
for _, p := range m.PineconeRouter.Peers() {
- if peertype == p.PeerType {
+ if int(peertype) == p.PeerType {
m.PineconeRouter.Disconnect(types.SwitchPortID(p.Port), nil)
}
}
@@ -133,7 +134,11 @@ func (m *DendriteMonolith) Conduit(zone string, peertype int) (*Conduit, error)
for i := 1; i <= 10; i++ {
logrus.Errorf("Attempting authenticated connect (attempt %d)", i)
var err error
- conduit.port, err = m.PineconeRouter.AuthenticatedConnect(l, zone, peertype, true)
+ conduit.port, err = m.PineconeRouter.Connect(
+ l,
+ pineconeRouter.ConnectionZone(zone),
+ pineconeRouter.ConnectionPeerType(peertype),
+ )
switch err {
case io.ErrClosedPipe:
logrus.Errorf("Authenticated connect failed due to closed pipe (attempt %d)", i)
@@ -195,16 +200,28 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e
}
func (m *DendriteMonolith) staticPeerConnect() {
+ connected := map[string]bool{} // URI -> connected?
attempt := func() {
- if m.PineconeRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 {
- m.staticPeerMutex.RLock()
- uri := m.staticPeerURI
- m.staticPeerMutex.RUnlock()
- if uri == "" {
- return
- }
- if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil {
- logrus.WithError(err).Error("Failed to connect to static peer")
+ m.staticPeerMutex.RLock()
+ uri := m.staticPeerURI
+ m.staticPeerMutex.RUnlock()
+ if uri == "" {
+ return
+ }
+ for k := range connected {
+ connected[k] = false
+ }
+ for _, uri := range strings.Split(uri, ",") {
+ connected[strings.TrimSpace(uri)] = false
+ }
+ for _, info := range m.PineconeRouter.Peers() {
+ connected[info.URI] = true
+ }
+ for k, online := range connected {
+ if !online {
+ if err := conn.ConnectToPeer(m.PineconeRouter, k); err != nil {
+ logrus.WithError(err).Error("Failed to connect to static peer")
+ }
}
}
}
diff --git a/cmd/dendrite-demo-pinecone/conn/client.go b/cmd/dendrite-demo-pinecone/conn/client.go
index 40ccb9c0..14b648a3 100644
--- a/cmd/dendrite-demo-pinecone/conn/client.go
+++ b/cmd/dendrite-demo-pinecone/conn/client.go
@@ -34,7 +34,12 @@ func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) error {
if parent == nil {
return fmt.Errorf("failed to wrap connection")
}
- _, err := pRouter.AuthenticatedConnect(parent, "static", pineconeRouter.PeerTypeRemote, true)
+ _, err := pRouter.Connect(
+ parent,
+ pineconeRouter.ConnectionZone("static"),
+ pineconeRouter.PeerTypeRemote,
+ pineconeRouter.ConnectionURI(peer),
+ )
return err
}
diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go
index 2b56ef35..8ed3f349 100644
--- a/cmd/dendrite-demo-pinecone/main.go
+++ b/cmd/dendrite-demo-pinecone/main.go
@@ -26,6 +26,7 @@ import (
"net"
"net/http"
"os"
+ "strings"
"time"
"github.com/gorilla/mux"
@@ -61,7 +62,7 @@ import (
var (
instanceName = flag.String("name", "dendrite-p2p-pinecone", "the name of this P2P demo instance")
instancePort = flag.Int("port", 8008, "the port that the client API will listen on")
- instancePeer = flag.String("peer", "", "the static Pinecone peer to connect to")
+ instancePeer = flag.String("peer", "", "the static Pinecone peers to connect to, comma separated-list")
instanceListen = flag.String("listen", ":0", "the port Pinecone peers can connect to")
)
@@ -109,9 +110,9 @@ func main() {
continue
}
- port, err := pRouter.AuthenticatedConnect(conn, "", pineconeRouter.PeerTypeRemote, true)
+ port, err := pRouter.Connect(conn, pineconeRouter.PeerTypeRemote)
if err != nil {
- logrus.WithError(err).Error("pSwitch.AuthenticatedConnect failed")
+ logrus.WithError(err).Error("pSwitch.Connect failed")
continue
}
@@ -124,14 +125,22 @@ func main() {
pMulticast.Start()
connectToStaticPeer := func() {
+ connected := map[string]bool{} // URI -> connected?
+ for _, uri := range strings.Split(*instancePeer, ",") {
+ connected[strings.TrimSpace(uri)] = false
+ }
attempt := func() {
- if pRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 {
- uri := *instancePeer
- if uri == "" {
- return
- }
- if err := conn.ConnectToPeer(pRouter, uri); err != nil {
- logrus.WithError(err).Error("Failed to connect to static peer")
+ for k := range connected {
+ connected[k] = false
+ }
+ for _, info := range pRouter.Peers() {
+ connected[info.URI] = true
+ }
+ for k, online := range connected {
+ if !online {
+ if err := conn.ConnectToPeer(pRouter, k); err != nil {
+ logrus.WithError(err).Error("Failed to connect to static peer")
+ }
}
}
}
@@ -230,7 +239,11 @@ func main() {
return
}
conn := conn.WrapWebSocketConn(c)
- if _, err = pRouter.AuthenticatedConnect(conn, "websocket", pineconeRouter.PeerTypeRemote, true); err != nil {
+ if _, err = pRouter.Connect(
+ conn,
+ pineconeRouter.ConnectionZone("websocket"),
+ pineconeRouter.PeerTypeRemote,
+ ); err != nil {
logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch")
}
})