diff options
Diffstat (limited to 'federationapi/queue/destinationqueue.go')
-rw-r--r-- | federationapi/queue/destinationqueue.go | 13 |
1 files changed, 8 insertions, 5 deletions
diff --git a/federationapi/queue/destinationqueue.go b/federationapi/queue/destinationqueue.go index a5f8c03b..74794040 100644 --- a/federationapi/queue/destinationqueue.go +++ b/federationapi/queue/destinationqueue.go @@ -78,7 +78,7 @@ func (oq *destinationQueue) sendEvent(event *gomatrixserverlib.HeaderedEvent, re // this destination queue. We'll then be able to retrieve the PDU // later. if err := oq.db.AssociatePDUWithDestination( - context.TODO(), + oq.process.Context(), "", // TODO: remove this, as we don't need to persist the transaction ID oq.destination, // the destination server name receipt, // NIDs from federationapi_queue_json table @@ -122,7 +122,7 @@ func (oq *destinationQueue) sendEDU(event *gomatrixserverlib.EDU, receipt *share // this destination queue. We'll then be able to retrieve the PDU // later. if err := oq.db.AssociateEDUWithDestination( - context.TODO(), + oq.process.Context(), oq.destination, // the destination server name receipt, // NIDs from federationapi_queue_json table event.Type, @@ -177,7 +177,7 @@ func (oq *destinationQueue) getPendingFromDatabase() { // Check to see if there's anything to do for this server // in the database. retrieved := false - ctx := context.Background() + ctx := oq.process.Context() oq.pendingMutex.Lock() defer oq.pendingMutex.Unlock() @@ -271,6 +271,9 @@ func (oq *destinationQueue) backgroundSend() { // restarted automatically the next time we have an event to // send. return + case <-oq.process.Context().Done(): + // The parent process is shutting down, so stop. + return } // If we are backing off this server then wait for the @@ -420,13 +423,13 @@ func (oq *destinationQueue) nextTransaction( // Clean up the transaction in the database. if pduReceipts != nil { //logrus.Infof("Cleaning PDUs %q", pduReceipt.String()) - if err = oq.db.CleanPDUs(context.Background(), oq.destination, pduReceipts); err != nil { + if err = oq.db.CleanPDUs(oq.process.Context(), oq.destination, pduReceipts); err != nil { logrus.WithError(err).Errorf("Failed to clean PDUs for server %q", t.Destination) } } if eduReceipts != nil { //logrus.Infof("Cleaning EDUs %q", eduReceipt.String()) - if err = oq.db.CleanEDUs(context.Background(), oq.destination, eduReceipts); err != nil { + if err = oq.db.CleanEDUs(oq.process.Context(), oq.destination, eduReceipts); err != nil { logrus.WithError(err).Errorf("Failed to clean EDUs for server %q", t.Destination) } } |