aboutsummaryrefslogtreecommitdiff
path: root/federationapi/queue
diff options
context:
space:
mode:
authordevonh <devon.dmytro@gmail.com>2023-01-28 23:27:53 +0000
committerGitHub <noreply@github.com>2023-01-28 23:27:53 +0000
commit63df85db6d5bc528a784dc52e550fc64385c5f67 (patch)
tree80da0f2cbcf9f4473974e600f90f20aed9803707 /federationapi/queue
parent2debabf0f09bb6e55063bbaa00dfb77090789abc (diff)
Relay integration to pinecone demos (#2955)
This extends the dendrite monolith for pinecone to integrate the s&f features into the mobile apps. Also makes a few tweaks to federation queueing/statistics to make some edge cases more robust.
Diffstat (limited to 'federationapi/queue')
-rw-r--r--federationapi/queue/destinationqueue.go63
-rw-r--r--federationapi/queue/queue_test.go4
2 files changed, 41 insertions, 26 deletions
diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go
index 51350916..12e6db9f 100644
--- a/federationapi/queue/destinationqueue.go
+++ b/federationapi/queue/destinationqueue.go
@@ -410,34 +410,49 @@ func (oq *destinationQueue) nextTransaction(
defer cancel()
relayServers := oq.statistics.KnownRelayServers()
- if oq.statistics.AssumedOffline() && len(relayServers) > 0 {
- sendMethod = statistics.SendViaRelay
- relaySuccess := false
- logrus.Infof("Sending to relay servers: %v", relayServers)
- // TODO : how to pass through actual userID here?!?!?!?!
- userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false)
- if userErr != nil {
- return userErr, sendMethod
- }
+ hasRelayServers := len(relayServers) > 0
+ shouldSendToRelays := oq.statistics.AssumedOffline() && hasRelayServers
+ if !shouldSendToRelays {
+ sendMethod = statistics.SendDirect
+ _, err = oq.client.SendTransaction(ctx, t)
+ } else {
+ // Try sending directly to the destination first in case they came back online.
+ sendMethod = statistics.SendDirect
+ _, err = oq.client.SendTransaction(ctx, t)
+ if err != nil {
+ // The destination is still offline, try sending to relays.
+ sendMethod = statistics.SendViaRelay
+ relaySuccess := false
+ logrus.Infof("Sending %q to relay servers: %v", t.TransactionID, relayServers)
+ // TODO : how to pass through actual userID here?!?!?!?!
+ userID, userErr := gomatrixserverlib.NewUserID("@user:"+string(oq.destination), false)
+ if userErr != nil {
+ return userErr, sendMethod
+ }
- // Attempt sending to each known relay server.
- for _, relayServer := range relayServers {
- _, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer)
- if relayErr != nil {
- err = relayErr
- } else {
- // If sending to one of the relay servers succeeds, consider the send successful.
- relaySuccess = true
+ // Attempt sending to each known relay server.
+ for _, relayServer := range relayServers {
+ _, relayErr := oq.client.P2PSendTransactionToRelay(ctx, *userID, t, relayServer)
+ if relayErr != nil {
+ err = relayErr
+ } else {
+ // If sending to one of the relay servers succeeds, consider the send successful.
+ relaySuccess = true
+
+ // TODO : what about if the dest comes back online but can't see their relay?
+ // How do I sync with the dest in that case?
+ // Should change the database to have a "relay success" flag on events and if
+ // I see the node back online, maybe directly send through the backlog of events
+ // with "relay success"... could lead to duplicate events, but only those that
+ // I sent. And will lead to a much more consistent experience.
+ }
}
- }
- // Clear the error if sending to any of the relay servers succeeded.
- if relaySuccess {
- err = nil
+ // Clear the error if sending to any of the relay servers succeeded.
+ if relaySuccess {
+ err = nil
+ }
}
- } else {
- sendMethod = statistics.SendDirect
- _, err = oq.client.SendTransaction(ctx, t)
}
switch errResponse := err.(type) {
case nil:
diff --git a/federationapi/queue/queue_test.go b/federationapi/queue/queue_test.go
index 36e2ccbc..bccfb342 100644
--- a/federationapi/queue/queue_test.go
+++ b/federationapi/queue/queue_test.go
@@ -923,7 +923,7 @@ func TestSendPDUOnRelaySuccessRemovedFromDB(t *testing.T) {
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
- if fc.txCount.Load() == 1 {
+ if fc.txCount.Load() >= 1 {
if fc.txRelayCount.Load() == 1 {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
@@ -962,7 +962,7 @@ func TestSendEDUOnRelaySuccessRemovedFromDB(t *testing.T) {
assert.NoError(t, err)
check := func(log poll.LogT) poll.Result {
- if fc.txCount.Load() == 1 {
+ if fc.txCount.Load() >= 1 {
if fc.txRelayCount.Load() == 1 {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)