aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorDevon Hudson <devonhudson@librem.one>2023-02-24 15:41:47 -0700
committerDevon Hudson <devonhudson@librem.one>2023-02-24 15:41:47 -0700
commitb28406c7d01550c6e26e5cf365332c56ff95fbf2 (patch)
tree8ef0db074002283b9a762d18b72f0cf52be8ef1b /cmd
parent3d31b131fc6b66e25b09c41204ea71ed4a47dcc6 (diff)
Tweaks to pinecone demo to shutdown more cleanly
Diffstat (limited to 'cmd')
-rw-r--r--cmd/dendrite-demo-pinecone/monolith/monolith.go83
-rw-r--r--cmd/dendrite-demo-pinecone/relay/retriever.go5
-rw-r--r--cmd/dendrite-demo-pinecone/relay/retriever_test.go4
3 files changed, 63 insertions, 29 deletions
diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go
index 27720369..e8a29e41 100644
--- a/cmd/dendrite-demo-pinecone/monolith/monolith.go
+++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go
@@ -68,12 +68,14 @@ type P2PMonolith struct {
EventChannel chan pineconeEvents.Event
RelayRetriever relay.RelayServerRetriever
- dendrite setup.Monolith
- port int
- httpMux *mux.Router
- pineconeMux *mux.Router
- listener net.Listener
- httpListenAddr string
+ dendrite setup.Monolith
+ port int
+ httpMux *mux.Router
+ pineconeMux *mux.Router
+ httpServer *http.Server
+ listener net.Listener
+ httpListenAddr string
+ stopHandlingEvents chan bool
}
func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, cacheDir string, dbPrefix string) *config.Dendrite {
@@ -199,8 +201,10 @@ func (p *P2PMonolith) StartMonolith() {
}
func (p *P2PMonolith) Stop() {
+ logrus.Info("Stopping monolith")
_ = p.BaseDendrite.Close()
p.WaitForShutdown()
+ logrus.Info("Stopped monolith")
}
func (p *P2PMonolith) WaitForShutdown() {
@@ -209,6 +213,16 @@ func (p *P2PMonolith) WaitForShutdown() {
}
func (p *P2PMonolith) closeAllResources() {
+ logrus.Info("Closing monolith resources")
+ if p.httpServer != nil {
+ p.httpServer.Shutdown(context.Background())
+ }
+
+ select {
+ case p.stopHandlingEvents <- true:
+ default:
+ }
+
if p.listener != nil {
_ = p.listener.Close()
}
@@ -224,6 +238,7 @@ func (p *P2PMonolith) closeAllResources() {
if p.Router != nil {
_ = p.Router.Close()
}
+ logrus.Info("Monolith resources closed")
}
func (p *P2PMonolith) Addr() string {
@@ -280,7 +295,7 @@ func (p *P2PMonolith) setupHttpServers(userProvider *users.PineconeUserProvider,
func (p *P2PMonolith) startHTTPServers() {
go func() {
// Build both ends of a HTTP multiplex.
- httpServer := &http.Server{
+ p.httpServer = &http.Server{
Addr: ":0",
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
ReadTimeout: 10 * time.Second,
@@ -296,12 +311,13 @@ func (p *P2PMonolith) startHTTPServers() {
pubkeyString := hex.EncodeToString(pubkey[:])
logrus.Info("Listening on ", pubkeyString)
- switch httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) {
+ switch p.httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) {
case net.ErrClosed, http.ErrServerClosed:
logrus.Info("Stopped listening on ", pubkeyString)
default:
logrus.Error("Stopped listening on ", pubkeyString)
}
+ logrus.Info("Stopped goroutine listening on ", pubkeyString)
}()
p.httpListenAddr = fmt.Sprintf(":%d", p.port)
@@ -313,10 +329,12 @@ func (p *P2PMonolith) startHTTPServers() {
default:
logrus.Error("Stopped listening on ", p.httpListenAddr)
}
+ logrus.Info("Stopped goroutine listening on ", p.httpListenAddr)
}()
}
func (p *P2PMonolith) startEventHandler() {
+ p.stopHandlingEvents = make(chan bool)
stopRelayServerSync := make(chan bool)
eLog := logrus.WithField("pinecone", "events")
p.RelayRetriever = relay.NewRelayServerRetriever(
@@ -329,25 +347,40 @@ func (p *P2PMonolith) startEventHandler() {
p.RelayRetriever.InitializeRelayServers(eLog)
go func(ch <-chan pineconeEvents.Event) {
- for event := range ch {
- switch e := event.(type) {
- case pineconeEvents.PeerAdded:
- p.RelayRetriever.StartSync()
- case pineconeEvents.PeerRemoved:
- if p.RelayRetriever.IsRunning() && p.Router.TotalPeerCount() == 0 {
- stopRelayServerSync <- true
+ for {
+ select {
+ case event := <-ch:
+ switch e := event.(type) {
+ case pineconeEvents.PeerAdded:
+ p.RelayRetriever.StartSync()
+ case pineconeEvents.PeerRemoved:
+ if p.RelayRetriever.IsRunning() && p.Router.TotalPeerCount() == 0 {
+ // NOTE: Don't block on channel
+ select {
+ case stopRelayServerSync <- true:
+ default:
+ }
+ }
+ case pineconeEvents.BroadcastReceived:
+ // eLog.Info("Broadcast received from: ", e.PeerID)
+
+ req := &federationAPI.PerformWakeupServersRequest{
+ ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
+ }
+ res := &federationAPI.PerformWakeupServersResponse{}
+ if err := p.dendrite.FederationAPI.PerformWakeupServers(p.BaseDendrite.Context(), req, res); err != nil {
+ eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
+ }
}
- case pineconeEvents.BroadcastReceived:
- // eLog.Info("Broadcast received from: ", e.PeerID)
-
- req := &federationAPI.PerformWakeupServersRequest{
- ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
- }
- res := &federationAPI.PerformWakeupServersResponse{}
- if err := p.dendrite.FederationAPI.PerformWakeupServers(p.BaseDendrite.Context(), req, res); err != nil {
- eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
+ case <-p.stopHandlingEvents:
+ logrus.Info("Stopping processing pinecone events")
+ // NOTE: Don't block on channel
+ select {
+ case stopRelayServerSync <- true:
+ default:
}
- default:
+ logrus.Info("Stopped processing pinecone events")
+ return
}
}
}(p.EventChannel)
diff --git a/cmd/dendrite-demo-pinecone/relay/retriever.go b/cmd/dendrite-demo-pinecone/relay/retriever.go
index 1b5c617e..6b34f641 100644
--- a/cmd/dendrite-demo-pinecone/relay/retriever.go
+++ b/cmd/dendrite-demo-pinecone/relay/retriever.go
@@ -38,7 +38,7 @@ type RelayServerRetriever struct {
relayServersQueried map[gomatrixserverlib.ServerName]bool
queriedServersMutex sync.Mutex
running atomic.Bool
- quit <-chan bool
+ quit chan bool
}
func NewRelayServerRetriever(
@@ -46,7 +46,7 @@ func NewRelayServerRetriever(
serverName gomatrixserverlib.ServerName,
federationAPI federationAPI.FederationInternalAPI,
relayAPI relayServerAPI.RelayInternalAPI,
- quit <-chan bool,
+ quit chan bool,
) RelayServerRetriever {
return RelayServerRetriever{
ctx: ctx,
@@ -151,6 +151,7 @@ func (r *RelayServerRetriever) SyncRelayServers(stop <-chan bool) {
if !t.Stop() {
<-t.C
}
+ logrus.Info("Stopped relay server retriever")
return
case <-t.C:
}
diff --git a/cmd/dendrite-demo-pinecone/relay/retriever_test.go b/cmd/dendrite-demo-pinecone/relay/retriever_test.go
index 8f86a377..6c4c3a52 100644
--- a/cmd/dendrite-demo-pinecone/relay/retriever_test.go
+++ b/cmd/dendrite-demo-pinecone/relay/retriever_test.go
@@ -60,7 +60,7 @@ func TestRelayRetrieverInitialization(t *testing.T) {
"server",
&FakeFedAPI{},
&FakeRelayAPI{},
- make(<-chan bool),
+ make(chan bool),
)
retriever.InitializeRelayServers(logrus.WithField("test", "relay"))
@@ -74,7 +74,7 @@ func TestRelayRetrieverSync(t *testing.T) {
"server",
&FakeFedAPI{},
&FakeRelayAPI{},
- make(<-chan bool),
+ make(chan bool),
)
retriever.InitializeRelayServers(logrus.WithField("test", "relay"))