aboutsummaryrefslogtreecommitdiff
path: root/federationapi/queue
diff options
context:
space:
mode:
authorkegsay <kegan@matrix.org>2022-05-17 13:23:35 +0100
committerGitHub <noreply@github.com>2022-05-17 13:23:35 +0100
commit6de29c1cd23d218f04d2e570932db8967d6adc4f (patch)
treeb95fa478ef9ecd2c21963868a3626063bdff7cbc /federationapi/queue
parentcd82460513d5abf04e56c01667d56499d4c354be (diff)
bugfix: E2EE device keys could sometimes not be sent to remote servers (#2466)
* Fix flakey sytest 'Local device key changes get to remote servers' * Debug logs * Remove internal/test and use /test only Remove a lot of ancient code too. * Use FederationRoomserverAPI in more places * Use more interfaces in federationapi; begin adding regression test * Linting * Add regression test * Unbreak tests * ALL THE LOGS * Fix a race condition which could cause events to not be sent to servers If a new room event which rewrites state arrives, we remove all joined hosts then re-calculate them. This wasn't done in a transaction so for a brief period we would have no joined hosts. During this interim, key change events which arrive would not be sent to destination servers. This would sporadically fail on sytest. * Unbreak new tests * Linting
Diffstat (limited to 'federationapi/queue')
-rw-r--r--federationapi/queue/destinationqueue.go31
-rw-r--r--federationapi/queue/queue.go9
2 files changed, 21 insertions, 19 deletions
diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go
index 74794040..b6edec5d 100644
--- a/federationapi/queue/destinationqueue.go
+++ b/federationapi/queue/destinationqueue.go
@@ -21,6 +21,7 @@ import (
"sync"
"time"
+ fedapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/storage/shared"
@@ -49,21 +50,21 @@ type destinationQueue struct {
db storage.Database
process *process.ProcessContext
signing *SigningInfo
- rsAPI api.RoomserverInternalAPI
- client *gomatrixserverlib.FederationClient // federation client
- origin gomatrixserverlib.ServerName // origin of requests
- destination gomatrixserverlib.ServerName // destination of requests
- running atomic.Bool // is the queue worker running?
- backingOff atomic.Bool // true if we're backing off
- overflowed atomic.Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more
- statistics *statistics.ServerStatistics // statistics about this remote server
- transactionIDMutex sync.Mutex // protects transactionID
- transactionID gomatrixserverlib.TransactionID // last transaction ID if retrying, or "" if last txn was successful
- notify chan struct{} // interrupts idle wait pending PDUs/EDUs
- pendingPDUs []*queuedPDU // PDUs waiting to be sent
- pendingEDUs []*queuedEDU // EDUs waiting to be sent
- pendingMutex sync.RWMutex // protects pendingPDUs and pendingEDUs
- interruptBackoff chan bool // interrupts backoff
+ rsAPI api.FederationRoomserverAPI
+ client fedapi.FederationClient // federation client
+ origin gomatrixserverlib.ServerName // origin of requests
+ destination gomatrixserverlib.ServerName // destination of requests
+ running atomic.Bool // is the queue worker running?
+ backingOff atomic.Bool // true if we're backing off
+ overflowed atomic.Bool // the queues exceed maxPDUsInMemory/maxEDUsInMemory, so we should consult the database for more
+ statistics *statistics.ServerStatistics // statistics about this remote server
+ transactionIDMutex sync.Mutex // protects transactionID
+ transactionID gomatrixserverlib.TransactionID // last transaction ID if retrying, or "" if last txn was successful
+ notify chan struct{} // interrupts idle wait pending PDUs/EDUs
+ pendingPDUs []*queuedPDU // PDUs waiting to be sent
+ pendingEDUs []*queuedEDU // EDUs waiting to be sent
+ pendingMutex sync.RWMutex // protects pendingPDUs and pendingEDUs
+ interruptBackoff chan bool // interrupts backoff
}
// Send event adds the event to the pending queue for the destination.
diff --git a/federationapi/queue/queue.go b/federationapi/queue/queue.go
index d152886f..4c25c4ce 100644
--- a/federationapi/queue/queue.go
+++ b/federationapi/queue/queue.go
@@ -26,6 +26,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
+ fedapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/storage/shared"
@@ -39,9 +40,9 @@ type OutgoingQueues struct {
db storage.Database
process *process.ProcessContext
disabled bool
- rsAPI api.RoomserverInternalAPI
+ rsAPI api.FederationRoomserverAPI
origin gomatrixserverlib.ServerName
- client *gomatrixserverlib.FederationClient
+ client fedapi.FederationClient
statistics *statistics.Statistics
signing *SigningInfo
queuesMutex sync.Mutex // protects the below
@@ -85,8 +86,8 @@ func NewOutgoingQueues(
process *process.ProcessContext,
disabled bool,
origin gomatrixserverlib.ServerName,
- client *gomatrixserverlib.FederationClient,
- rsAPI api.RoomserverInternalAPI,
+ client fedapi.FederationClient,
+ rsAPI api.FederationRoomserverAPI,
statistics *statistics.Statistics,
signing *SigningInfo,
) *OutgoingQueues {