diff options
Diffstat (limited to 'cmd/dendrite-demo-pinecone/monolith/monolith.go')
-rw-r--r-- | cmd/dendrite-demo-pinecone/monolith/monolith.go | 83 |
1 files changed, 58 insertions, 25 deletions
diff --git a/cmd/dendrite-demo-pinecone/monolith/monolith.go b/cmd/dendrite-demo-pinecone/monolith/monolith.go index 27720369..e8a29e41 100644 --- a/cmd/dendrite-demo-pinecone/monolith/monolith.go +++ b/cmd/dendrite-demo-pinecone/monolith/monolith.go @@ -68,12 +68,14 @@ type P2PMonolith struct { EventChannel chan pineconeEvents.Event RelayRetriever relay.RelayServerRetriever - dendrite setup.Monolith - port int - httpMux *mux.Router - pineconeMux *mux.Router - listener net.Listener - httpListenAddr string + dendrite setup.Monolith + port int + httpMux *mux.Router + pineconeMux *mux.Router + httpServer *http.Server + listener net.Listener + httpListenAddr string + stopHandlingEvents chan bool } func GenerateDefaultConfig(sk ed25519.PrivateKey, storageDir string, cacheDir string, dbPrefix string) *config.Dendrite { @@ -199,8 +201,10 @@ func (p *P2PMonolith) StartMonolith() { } func (p *P2PMonolith) Stop() { + logrus.Info("Stopping monolith") _ = p.BaseDendrite.Close() p.WaitForShutdown() + logrus.Info("Stopped monolith") } func (p *P2PMonolith) WaitForShutdown() { @@ -209,6 +213,16 @@ func (p *P2PMonolith) WaitForShutdown() { } func (p *P2PMonolith) closeAllResources() { + logrus.Info("Closing monolith resources") + if p.httpServer != nil { + p.httpServer.Shutdown(context.Background()) + } + + select { + case p.stopHandlingEvents <- true: + default: + } + if p.listener != nil { _ = p.listener.Close() } @@ -224,6 +238,7 @@ func (p *P2PMonolith) closeAllResources() { if p.Router != nil { _ = p.Router.Close() } + logrus.Info("Monolith resources closed") } func (p *P2PMonolith) Addr() string { @@ -280,7 +295,7 @@ func (p *P2PMonolith) setupHttpServers(userProvider *users.PineconeUserProvider, func (p *P2PMonolith) startHTTPServers() { go func() { // Build both ends of a HTTP multiplex. - httpServer := &http.Server{ + p.httpServer = &http.Server{ Addr: ":0", TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, ReadTimeout: 10 * time.Second, @@ -296,12 +311,13 @@ func (p *P2PMonolith) startHTTPServers() { pubkeyString := hex.EncodeToString(pubkey[:]) logrus.Info("Listening on ", pubkeyString) - switch httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) { + switch p.httpServer.Serve(p.Sessions.Protocol(SessionProtocol)) { case net.ErrClosed, http.ErrServerClosed: logrus.Info("Stopped listening on ", pubkeyString) default: logrus.Error("Stopped listening on ", pubkeyString) } + logrus.Info("Stopped goroutine listening on ", pubkeyString) }() p.httpListenAddr = fmt.Sprintf(":%d", p.port) @@ -313,10 +329,12 @@ func (p *P2PMonolith) startHTTPServers() { default: logrus.Error("Stopped listening on ", p.httpListenAddr) } + logrus.Info("Stopped goroutine listening on ", p.httpListenAddr) }() } func (p *P2PMonolith) startEventHandler() { + p.stopHandlingEvents = make(chan bool) stopRelayServerSync := make(chan bool) eLog := logrus.WithField("pinecone", "events") p.RelayRetriever = relay.NewRelayServerRetriever( @@ -329,25 +347,40 @@ func (p *P2PMonolith) startEventHandler() { p.RelayRetriever.InitializeRelayServers(eLog) go func(ch <-chan pineconeEvents.Event) { - for event := range ch { - switch e := event.(type) { - case pineconeEvents.PeerAdded: - p.RelayRetriever.StartSync() - case pineconeEvents.PeerRemoved: - if p.RelayRetriever.IsRunning() && p.Router.TotalPeerCount() == 0 { - stopRelayServerSync <- true + for { + select { + case event := <-ch: + switch e := event.(type) { + case pineconeEvents.PeerAdded: + p.RelayRetriever.StartSync() + case pineconeEvents.PeerRemoved: + if p.RelayRetriever.IsRunning() && p.Router.TotalPeerCount() == 0 { + // NOTE: Don't block on channel + select { + case stopRelayServerSync <- true: + default: + } + } + case pineconeEvents.BroadcastReceived: + // eLog.Info("Broadcast received from: ", e.PeerID) + + req := &federationAPI.PerformWakeupServersRequest{ + ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, + } + res := &federationAPI.PerformWakeupServersResponse{} + if err := p.dendrite.FederationAPI.PerformWakeupServers(p.BaseDendrite.Context(), req, res); err != nil { + eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) + } } - case pineconeEvents.BroadcastReceived: - // eLog.Info("Broadcast received from: ", e.PeerID) - - req := &federationAPI.PerformWakeupServersRequest{ - ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, - } - res := &federationAPI.PerformWakeupServersResponse{} - if err := p.dendrite.FederationAPI.PerformWakeupServers(p.BaseDendrite.Context(), req, res); err != nil { - eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) + case <-p.stopHandlingEvents: + logrus.Info("Stopping processing pinecone events") + // NOTE: Don't block on channel + select { + case stopRelayServerSync <- true: + default: } - default: + logrus.Info("Stopped processing pinecone events") + return } } }(p.EventChannel) |