diff options
author | devonh <devon.dmytro@gmail.com> | 2023-01-28 23:27:53 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-28 23:27:53 +0000 |
commit | 63df85db6d5bc528a784dc52e550fc64385c5f67 (patch) | |
tree | 80da0f2cbcf9f4473974e600f90f20aed9803707 /federationapi/queue | |
parent | 2debabf0f09bb6e55063bbaa00dfb77090789abc (diff) |
Relay integration to pinecone demos (#2955)
This extends the dendrite monolith for pinecone to integrate the s&f
features into the mobile apps.
Also makes a few tweaks to federation queueing/statistics to make some
edge cases more robust.
Diffstat (limited to 'federationapi/queue')
-rw-r--r-- | federationapi/queue/destinationqueue.go | 63 | ||||
-rw-r--r-- | federationapi/queue/queue_test.go | 4 |
2 files changed, 41 insertions, 26 deletions
diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index 51350916..12e6db9f 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -410,34 +410,49 @@ func (oq *destinationQueue) nextTransaction( defer cancel() relayServers := oq.statistics.KnownRelayServers() - if oq.statistics.AssumedOffline() && len(relayServers) > 0 { - sendMethod = statistics.SendViaRelay - relaySuccess := false - logrus.Infof("Sending to relay servers: %v", relayServers) - // TODO : how to pass through actual userID here?!?!?!?! - userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false) - if userErr != nil { - return userErr, sendMethod - } + hasRelayServers := len(relayServers) > 0 + shouldSendToRelays := oq.statistics.AssumedOffline() && hasRelayServers + if !shouldSendToRelays { + sendMethod = statistics.SendDirect + _, err = oq.client.SendTransaction(ctx, t) + } else { + // Try sending directly to the destination first in case they came back online. + sendMethod = statistics.SendDirect + _, err = oq.client.SendTransaction(ctx, t) + if err != nil { + // The destination is still offline, try sending to relays. + sendMethod = statistics.SendViaRelay + relaySuccess := false + logrus.Infof("Sending %q to relay servers: %v", t.TransactionID, relayServers) + // TODO : how to pass through actual userID here?!?!?!?! + userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false) + if userErr != nil { + return userErr, sendMethod + } - // Attempt sending to each known relay server. - for _, relayServer := range relayServers { - _, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer) - if relayErr != nil { - err = relayErr - } else { - // If sending to one of the relay servers succeeds, consider the send successful. - relaySuccess = true + // Attempt sending to each known relay server. + for _, relayServer := range relayServers { + _, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer) + if relayErr != nil { + err = relayErr + } else { + // If sending to one of the relay servers succeeds, consider the send successful. + relaySuccess = true + + // TODO : what about if the dest comes back online but can't see their relay? + // How do I sync with the dest in that case? + // Should change the database to have a "relay success" flag on events and if + // I see the node back online, maybe directly send through the backlog of events + // with "relay success"... could lead to duplicate events, but only those that + // I sent. And will lead to a much more consistent experience. + } } - } - // Clear the error if sending to any of the relay servers succeeded. - if relaySuccess { - err = nil + // Clear the error if sending to any of the relay servers succeeded. + if relaySuccess { + err = nil + } } - } else { - sendMethod = statistics.SendDirect - _, err = oq.client.SendTransaction(ctx, t) } switch errResponse := err.(type) { case nil: diff --git a/federationapi/queue/queue_test.go b/federationapi/queue/queue_test.go index 36e2ccbc..bccfb342 100644 --- a/federationapi/queue/queue_test.go +++ b/federationapi/queue/queue_test.go @@ -923,7 +923,7 @@ func TestSendPDUOnRelaySuccessRemovedFromDB(t *testing.T) { assert.NoError(t, err) check := func(log poll.LogT) poll.Result { - if fc.txCount.Load() == 1 { + if fc.txCount.Load() >= 1 { if fc.txRelayCount.Load() == 1 { data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100) assert.NoError(t, dbErr) @@ -962,7 +962,7 @@ func TestSendEDUOnRelaySuccessRemovedFromDB(t *testing.T) { assert.NoError(t, err) check := func(log poll.LogT) poll.Result { - if fc.txCount.Load() == 1 { + if fc.txCount.Load() >= 1 { if fc.txRelayCount.Load() == 1 { data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100) assert.NoError(t, dbErr) |