aboutsummaryrefslogtreecommitdiff
path: root/build
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-06-14 13:13:07 +0100
committerNeil Alexander <neilalexander@users.noreply.github.com>2021-06-14 13:13:07 +0100
commitbd9dec8e066ebf9af6b25b733c8b4fecd67dddcf (patch)
tree35a45e3854c98ea8df739f8ea3941de0598c9c8b /build
parent2c9a390fa67aa80bd8cfffb02b20739d0b1807d8 (diff)
Pinecone demo updates
Diffstat (limited to 'build')
-rw-r--r--build/gobind-pinecone/monolith.go75
1 files changed, 41 insertions, 34 deletions
diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go
index c15707e5..09af80f6 100644
--- a/build/gobind-pinecone/monolith.go
+++ b/build/gobind-pinecone/monolith.go
@@ -10,7 +10,6 @@ import (
"io"
"io/ioutil"
"log"
- "math"
"net"
"net/http"
"os"
@@ -37,15 +36,14 @@ import (
userapiAPI "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
- "go.uber.org/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
pineconeMulticast "github.com/matrix-org/pinecone/multicast"
+ "github.com/matrix-org/pinecone/router"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
"github.com/matrix-org/pinecone/types"
- pineconeTypes "github.com/matrix-org/pinecone/types"
_ "golang.org/x/mobile/bind"
)
@@ -57,19 +55,19 @@ const (
)
type DendriteMonolith struct {
- logger logrus.Logger
- PineconeRouter *pineconeRouter.Router
- PineconeMulticast *pineconeMulticast.Multicast
- PineconeQUIC *pineconeSessions.Sessions
- StorageDirectory string
- CacheDirectory string
- staticPeerURI string
- staticPeerMutex sync.RWMutex
- staticPeerAttempts atomic.Uint32
- listener net.Listener
- httpServer *http.Server
- processContext *process.ProcessContext
- userAPI userapiAPI.UserInternalAPI
+ logger logrus.Logger
+ PineconeRouter *pineconeRouter.Router
+ PineconeMulticast *pineconeMulticast.Multicast
+ PineconeQUIC *pineconeSessions.Sessions
+ StorageDirectory string
+ CacheDirectory string
+ staticPeerURI string
+ staticPeerMutex sync.RWMutex
+ staticPeerAttempt chan struct{}
+ listener net.Listener
+ httpServer *http.Server
+ processContext *process.ProcessContext
+ userAPI userapiAPI.UserInternalAPI
}
func (m *DendriteMonolith) BaseURL() string {
@@ -99,7 +97,9 @@ func (m *DendriteMonolith) SetStaticPeer(uri string) {
m.staticPeerMutex.Unlock()
m.DisconnectType(pineconeRouter.PeerTypeRemote)
if uri != "" {
- go m.staticPeerConnect()
+ go func() {
+ m.staticPeerAttempt <- struct{}{}
+ }()
}
}
@@ -195,17 +195,27 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e
}
func (m *DendriteMonolith) staticPeerConnect() {
- m.staticPeerMutex.RLock()
- uri := m.staticPeerURI
- m.staticPeerMutex.RUnlock()
- if uri == "" {
- return
+ attempt := func() {
+ if m.PineconeRouter.PeerCount(router.PeerTypeRemote) == 0 {
+ m.staticPeerMutex.RLock()
+ uri := m.staticPeerURI
+ m.staticPeerMutex.RUnlock()
+ if uri == "" {
+ return
+ }
+ if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil {
+ logrus.WithError(err).Error("Failed to connect to static peer")
+ }
+ }
}
- if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil {
- exp := time.Second * time.Duration(math.Exp2(float64(m.staticPeerAttempts.Inc())))
- time.AfterFunc(exp, m.staticPeerConnect)
- } else {
- m.staticPeerAttempts.Store(0)
+ for {
+ select {
+ case <-m.processContext.Context().Done():
+ case <-m.staticPeerAttempt:
+ attempt()
+ case <-time.After(time.Second * 5):
+ attempt()
+ }
}
}
@@ -248,13 +258,6 @@ func (m *DendriteMonolith) Start() {
m.PineconeQUIC = pineconeSessions.NewSessions(logger, m.PineconeRouter)
m.PineconeMulticast = pineconeMulticast.NewMulticast(logger, m.PineconeRouter)
- m.PineconeRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) {
- if peertype == pineconeRouter.PeerTypeRemote {
- m.staticPeerAttempts.Store(0)
- time.AfterFunc(time.Second, m.staticPeerConnect)
- }
- })
-
prefix := hex.EncodeToString(pk)
cfg := &config.Dendrite{}
cfg.Defaults()
@@ -359,8 +362,12 @@ func (m *DendriteMonolith) Start() {
},
Handler: h2c.NewHandler(pMux, h2s),
}
+
m.processContext = base.ProcessContext
+ m.staticPeerAttempt = make(chan struct{}, 1)
+ go m.staticPeerConnect()
+
go func() {
m.logger.Info("Listening on ", cfg.Global.ServerName)
m.logger.Fatal(m.httpServer.Serve(m.PineconeQUIC))