diff options
Diffstat (limited to 'cmd/dendrite-demo-pinecone/main.go')
-rw-r--r-- | cmd/dendrite-demo-pinecone/main.go | 112 |
1 files changed, 15 insertions, 97 deletions
diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 3f63ab65..abffbb67 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -33,6 +33,7 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/relay" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" @@ -43,7 +44,6 @@ import ( "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/relayapi" - relayServerAPI "github.com/matrix-org/dendrite/relayapi/api" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/base" @@ -52,7 +52,6 @@ import ( "github.com/matrix-org/dendrite/test" "github.com/matrix-org/dendrite/userapi" "github.com/matrix-org/gomatrixserverlib" - "go.uber.org/atomic" pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeMulticast "github.com/matrix-org/pinecone/multicast" @@ -72,8 +71,6 @@ var ( instanceRelayingEnabled = flag.Bool("relay", false, "whether to enable store & forward relaying for other nodes") ) -const relayServerRetryInterval = time.Second * 30 - // nolint:gocyclo func main() { flag.Parse() @@ -328,28 +325,24 @@ func main() { logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter)) }() - go func(ch <-chan pineconeEvents.Event) { - eLog := logrus.WithField("pinecone", "events") - relayServerSyncRunning := atomic.NewBool(false) - stopRelayServerSync := make(chan bool) - - m := RelayServerRetriever{ - Context: context.Background(), - ServerName: gomatrixserverlib.ServerName(pRouter.PublicKey().String()), - FederationAPI: fsAPI, - RelayServersQueried: make(map[gomatrixserverlib.ServerName]bool), - RelayAPI: monolith.RelayAPI, - } - m.InitializeRelayServers(eLog) + stopRelayServerSync := make(chan bool) + eLog := logrus.WithField("pinecone", "events") + relayRetriever := relay.NewRelayServerRetriever( + context.Background(), + gomatrixserverlib.ServerName(pRouter.PublicKey().String()), + monolith.FederationAPI, + monolith.RelayAPI, + stopRelayServerSync, + ) + relayRetriever.InitializeRelayServers(eLog) + go func(ch <-chan pineconeEvents.Event) { for event := range ch { switch e := event.(type) { case pineconeEvents.PeerAdded: - if !relayServerSyncRunning.Load() { - go m.syncRelayServers(stopRelayServerSync, *relayServerSyncRunning) - } + relayRetriever.StartSync() case pineconeEvents.PeerRemoved: - if relayServerSyncRunning.Load() && pRouter.TotalPeerCount() == 0 { + if relayRetriever.IsRunning() && pRouter.TotalPeerCount() == 0 { stopRelayServerSync <- true } case pineconeEvents.BroadcastReceived: @@ -359,7 +352,7 @@ func main() { ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, } res := &api.PerformWakeupServersResponse{} - if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil { + if err := monolith.FederationAPI.PerformWakeupServers(base.Context(), req, res); err != nil { eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID) } default: @@ -369,78 +362,3 @@ func main() { base.WaitForShutdown() } - -type RelayServerRetriever struct { - Context context.Context - ServerName gomatrixserverlib.ServerName - FederationAPI api.FederationInternalAPI - RelayServersQueried map[gomatrixserverlib.ServerName]bool - RelayAPI relayServerAPI.RelayInternalAPI -} - -func (m *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) { - request := api.P2PQueryRelayServersRequest{Server: gomatrixserverlib.ServerName(m.ServerName)} - response := api.P2PQueryRelayServersResponse{} - err := m.FederationAPI.P2PQueryRelayServers(m.Context, &request, &response) - if err != nil { - eLog.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error()) - } - for _, server := range response.RelayServers { - m.RelayServersQueried[server] = false - } - - eLog.Infof("Registered relay servers: %v", response.RelayServers) -} - -func (m *RelayServerRetriever) syncRelayServers(stop <-chan bool, running atomic.Bool) { - defer running.Store(false) - - t := time.NewTimer(relayServerRetryInterval) - for { - relayServersToQuery := []gomatrixserverlib.ServerName{} - for server, complete := range m.RelayServersQueried { - if !complete { - relayServersToQuery = append(relayServersToQuery, server) - } - } - if len(relayServersToQuery) == 0 { - // All relay servers have been synced. - return - } - m.queryRelayServers(relayServersToQuery) - t.Reset(relayServerRetryInterval) - - select { - case <-stop: - // We have been asked to stop syncing, drain the timer and return. - if !t.Stop() { - <-t.C - } - return - case <-t.C: - // The timer has expired. Continue to the next loop iteration. - } - } -} - -func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) { - logrus.Info("querying relay servers for any available transactions") - for _, server := range relayServers { - userID, err := gomatrixserverlib.NewUserID("@user:"+string(m.ServerName), false) - if err != nil { - return - } - err = m.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server) - if err == nil { - m.RelayServersQueried[server] = true - // TODO : What happens if your relay receives new messages after this point? - // Should you continue to check with them, or should they try and contact you? - // They could send a "new_async_events" message your way maybe? - // Then you could mark them as needing to be queried again. - // What if you miss this message? - // Maybe you should try querying them again after a certain period of time as a backup? - } else { - logrus.Errorf("Failed querying relay server: %s", err.Error()) - } - } -} |