diff options
author | Devon Hudson <devonhudson@librem.one> | 2023-01-31 15:51:08 -0700 |
---|---|---|
committer | Devon Hudson <devonhudson@librem.one> | 2023-02-01 13:41:36 -0700 |
commit | be43b9c0eae5fd45e38f954c23cfadbdfa77cb32 (patch) | |
tree | fd7b4a61f21f63512d40ecea818d53e7d73995e1 /build | |
parent | 4738fe656fc0975b5faa4fcdf6c11a776b9563a0 (diff) |
Refactor common relay sync struct to remove duplication
Diffstat (limited to 'build')
-rw-r--r-- | build/gobind-pinecone/monolith.go | 218 | ||||
-rw-r--r-- | build/gobind-pinecone/monolith_test.go | 69 |
2 files changed, 15 insertions, 272 deletions
diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index a219621b..f44eed89 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -36,6 +36,7 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi/userutil" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/relay" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" @@ -70,11 +71,10 @@ import ( ) const ( - PeerTypeRemote = pineconeRouter.PeerTypeRemote - PeerTypeMulticast = pineconeRouter.PeerTypeMulticast - PeerTypeBluetooth = pineconeRouter.PeerTypeBluetooth - PeerTypeBonjour = pineconeRouter.PeerTypeBonjour - relayServerRetryInterval = time.Second * 30 + PeerTypeRemote = pineconeRouter.PeerTypeRemote + PeerTypeMulticast = pineconeRouter.PeerTypeMulticast + PeerTypeBluetooth = pineconeRouter.PeerTypeBluetooth + PeerTypeBonjour = pineconeRouter.PeerTypeBonjour ) type DendriteMonolith struct { @@ -91,7 +91,7 @@ type DendriteMonolith struct { userAPI userapiAPI.UserInternalAPI federationAPI api.FederationInternalAPI relayAPI relayServerAPI.RelayInternalAPI - relayRetriever RelayServerRetriever + relayRetriever relay.RelayServerRetriever } func (m *DendriteMonolith) PublicKey() string { @@ -189,57 +189,6 @@ func getServerKeyFromString(nodeID string) (gomatrixserverlib.ServerName, error) return nodeKey, nil } -func updateNodeRelayServers( - node gomatrixserverlib.ServerName, - relays []gomatrixserverlib.ServerName, - ctx context.Context, - fedAPI api.FederationInternalAPI, -) { - // Get the current relay list - request := api.P2PQueryRelayServersRequest{Server: node} - response := api.P2PQueryRelayServersResponse{} - err := fedAPI.P2PQueryRelayServers(ctx, &request, &response) - if err != nil { - logrus.Warnf("Failed obtaining list of relay servers for %s: %s", node, err.Error()) - } - - // Remove old, non-matching relays - var serversToRemove []gomatrixserverlib.ServerName - for _, existingServer := range response.RelayServers { - shouldRemove := true - for _, newServer := range relays { - if newServer == existingServer { - shouldRemove = false - break - } - } - - if shouldRemove { - serversToRemove = append(serversToRemove, existingServer) - } - } - removeRequest := api.P2PRemoveRelayServersRequest{ - Server: node, - RelayServers: serversToRemove, - } - removeResponse := api.P2PRemoveRelayServersResponse{} - err = fedAPI.P2PRemoveRelayServers(ctx, &removeRequest, &removeResponse) - if err != nil { - logrus.Warnf("Failed removing old relay servers for %s: %s", node, err.Error()) - } - - // Add new relays - addRequest := api.P2PAddRelayServersRequest{ - Server: node, - RelayServers: relays, - } - addResponse := api.P2PAddRelayServersResponse{} - err = fedAPI.P2PAddRelayServers(ctx, &addRequest, &addResponse) - if err != nil { - logrus.Warnf("Failed adding relay servers for %s: %s", node, err.Error()) - } -} - func (m *DendriteMonolith) SetRelayServers(nodeID string, uris string) { relays := []gomatrixserverlib.ServerName{} for _, uri := range strings.Split(uris, ",") { @@ -266,7 +215,7 @@ func (m *DendriteMonolith) SetRelayServers(nodeID string, uris string) { logrus.Infof("Setting own relay servers to: %v", relays) m.relayRetriever.SetRelayServers(relays) } else { - updateNodeRelayServers( + relay.UpdateNodeRelayServers( gomatrixserverlib.ServerName(nodeKey), relays, m.baseDendrite.Context(), @@ -610,27 +559,23 @@ func (m *DendriteMonolith) Start() { }() stopRelayServerSync := make(chan bool) - eLog := logrus.WithField("pinecone", "events") - m.relayRetriever = RelayServerRetriever{ - Context: context.Background(), - ServerName: gomatrixserverlib.ServerName(m.PineconeRouter.PublicKey().String()), - FederationAPI: m.federationAPI, - relayServersQueried: make(map[gomatrixserverlib.ServerName]bool), - RelayAPI: monolith.RelayAPI, - running: *atomic.NewBool(false), - quit: stopRelayServerSync, - } + m.relayRetriever = relay.NewRelayServerRetriever( + context.Background(), + gomatrixserverlib.ServerName(m.PineconeRouter.PublicKey().String()), + m.federationAPI, + monolith.RelayAPI, + stopRelayServerSync, + ) m.relayRetriever.InitializeRelayServers(eLog) go func(ch <-chan pineconeEvents.Event) { - for event := range ch { switch e := event.(type) { case pineconeEvents.PeerAdded: m.relayRetriever.StartSync() case pineconeEvents.PeerRemoved: - if m.relayRetriever.running.Load() && m.PineconeRouter.TotalPeerCount() == 0 { + if m.relayRetriever.IsRunning() && m.PineconeRouter.TotalPeerCount() == 0 { stopRelayServerSync <- true } case pineconeEvents.BroadcastReceived: @@ -658,139 +603,6 @@ func (m *DendriteMonolith) Stop() { _ = m.PineconeRouter.Close() } -type RelayServerRetriever struct { - Context context.Context - ServerName gomatrixserverlib.ServerName - FederationAPI api.FederationInternalAPI - RelayAPI relayServerAPI.RelayInternalAPI - relayServersQueried map[gomatrixserverlib.ServerName]bool - queriedServersMutex sync.Mutex - running atomic.Bool - quit <-chan bool -} - -func (r *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) { - request := api.P2PQueryRelayServersRequest{Server: gomatrixserverlib.ServerName(r.ServerName)} - response := api.P2PQueryRelayServersResponse{} - err := r.FederationAPI.P2PQueryRelayServers(r.Context, &request, &response) - if err != nil { - eLog.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error()) - } - - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - for _, server := range response.RelayServers { - r.relayServersQueried[server] = false - } - - eLog.Infof("Registered relay servers: %v", response.RelayServers) -} - -func (r *RelayServerRetriever) SetRelayServers(servers []gomatrixserverlib.ServerName) { - updateNodeRelayServers(r.ServerName, servers, r.Context, r.FederationAPI) - - // Replace list of servers to sync with and mark them all as unsynced. - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - r.relayServersQueried = make(map[gomatrixserverlib.ServerName]bool) - for _, server := range servers { - r.relayServersQueried[server] = false - } - - r.StartSync() -} - -func (r *RelayServerRetriever) GetRelayServers() []gomatrixserverlib.ServerName { - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - relayServers := []gomatrixserverlib.ServerName{} - for server := range r.relayServersQueried { - relayServers = append(relayServers, server) - } - - return relayServers -} - -func (r *RelayServerRetriever) StartSync() { - if !r.running.Load() { - logrus.Info("Starting relay server sync") - go r.SyncRelayServers(r.quit) - } -} - -func (r *RelayServerRetriever) SyncRelayServers(stop <-chan bool) { - defer r.running.Store(false) - - t := time.NewTimer(relayServerRetryInterval) - for { - relayServersToQuery := []gomatrixserverlib.ServerName{} - func() { - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - for server, complete := range r.relayServersQueried { - if !complete { - relayServersToQuery = append(relayServersToQuery, server) - } - } - }() - if len(relayServersToQuery) == 0 { - // All relay servers have been synced. - logrus.Info("Finished syncing with all known relays") - return - } - r.queryRelayServers(relayServersToQuery) - t.Reset(relayServerRetryInterval) - - select { - case <-stop: - if !t.Stop() { - <-t.C - } - return - case <-t.C: - } - } -} - -func (r *RelayServerRetriever) GetQueriedServerStatus() map[gomatrixserverlib.ServerName]bool { - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - - result := map[gomatrixserverlib.ServerName]bool{} - for server, queried := range r.relayServersQueried { - result[server] = queried - } - return result -} - -func (r *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) { - logrus.Info("Querying relay servers for any available transactions") - for _, server := range relayServers { - userID, err := gomatrixserverlib.NewUserID("@user:"+string(r.ServerName), false) - if err != nil { - return - } - - logrus.Infof("Syncing with relay: %s", string(server)) - err = r.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server) - if err == nil { - func() { - r.queriedServersMutex.Lock() - defer r.queriedServersMutex.Unlock() - r.relayServersQueried[server] = true - }() - // TODO : What happens if your relay receives new messages after this point? - // Should you continue to check with them, or should they try and contact you? - // They could send a "new_async_events" message your way maybe? - // Then you could mark them as needing to be queried again. - // What if you miss this message? - // Maybe you should try querying them again after a certain period of time as a backup? - } else { - logrus.Errorf("Failed querying relay server: %s", err.Error()) - } - } -} - const MaxFrameSize = types.MaxFrameSize type Conduit struct { diff --git a/build/gobind-pinecone/monolith_test.go b/build/gobind-pinecone/monolith_test.go index 3c8873e0..f6bf2ef0 100644 --- a/build/gobind-pinecone/monolith_test.go +++ b/build/gobind-pinecone/monolith_test.go @@ -15,19 +15,13 @@ package gobind import ( - "context" "fmt" "net" "strings" "testing" - "time" - "github.com/matrix-org/dendrite/federationapi/api" - relayServerAPI "github.com/matrix-org/dendrite/relayapi/api" "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" - "gotest.tools/v3/poll" ) var TestBuf = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} @@ -128,69 +122,6 @@ func TestConduitReadCopyFails(t *testing.T) { assert.Error(t, err) } -var testRelayServers = []gomatrixserverlib.ServerName{"relay1", "relay2"} - -type FakeFedAPI struct { - api.FederationInternalAPI -} - -func (f *FakeFedAPI) P2PQueryRelayServers(ctx context.Context, req *api.P2PQueryRelayServersRequest, res *api.P2PQueryRelayServersResponse) error { - res.RelayServers = testRelayServers - return nil -} - -type FakeRelayAPI struct { - relayServerAPI.RelayInternalAPI -} - -func (r *FakeRelayAPI) PerformRelayServerSync(ctx context.Context, userID gomatrixserverlib.UserID, relayServer gomatrixserverlib.ServerName) error { - return nil -} - -func TestRelayRetrieverInitialization(t *testing.T) { - retriever := RelayServerRetriever{ - Context: context.Background(), - ServerName: "server", - relayServersQueried: make(map[gomatrixserverlib.ServerName]bool), - FederationAPI: &FakeFedAPI{}, - RelayAPI: &FakeRelayAPI{}, - } - - retriever.InitializeRelayServers(logrus.WithField("test", "relay")) - relayServers := retriever.GetQueriedServerStatus() - assert.Equal(t, 2, len(relayServers)) -} - -func TestRelayRetrieverSync(t *testing.T) { - retriever := RelayServerRetriever{ - Context: context.Background(), - ServerName: "server", - relayServersQueried: make(map[gomatrixserverlib.ServerName]bool), - FederationAPI: &FakeFedAPI{}, - RelayAPI: &FakeRelayAPI{}, - } - - retriever.InitializeRelayServers(logrus.WithField("test", "relay")) - relayServers := retriever.GetQueriedServerStatus() - assert.Equal(t, 2, len(relayServers)) - - stopRelayServerSync := make(chan bool) - go retriever.SyncRelayServers(stopRelayServerSync) - - check := func(log poll.LogT) poll.Result { - relayServers := retriever.GetQueriedServerStatus() - for _, queried := range relayServers { - if !queried { - return poll.Continue("waiting for all servers to be queried") - } - } - - stopRelayServerSync <- true - return poll.Success() - } - poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond)) -} - func TestMonolithStarts(t *testing.T) { monolith := DendriteMonolith{} monolith.Start() |