diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-07-20 16:55:20 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-20 16:55:20 +0100 |
commit | 11a39fe3b5afeadafd2c842951462d9332ebecca (patch) | |
tree | 88c3601666ecd6e891c64875f29e1f8ce22eac34 /federationsender/queue/queue.go | |
parent | f3c482b078c2fbb6d6d7a41be44c2e841d70d8b8 (diff) |
Deduplicate FS database, EDU persistence table (#1207)
* Deduplicate FS database, add some EDU persistence groundwork
* Extend TransactionWriter to use optional existing transaction, use that for FS SQLite database writes
* Fix build due to bad keyserver import
* Working EDU persistence
* gocyclo, unsurprisingly
* Remove unused
* Update copyright notices
Diffstat (limited to 'federationsender/queue/queue.go')
-rw-r--r-- | federationsender/queue/queue.go | 33 |
1 files changed, 27 insertions, 6 deletions
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go index 46c9fddb..e488a34a 100644 --- a/federationsender/queue/queue.go +++ b/federationsender/queue/queue.go @@ -61,12 +61,23 @@ func NewOutgoingQueues( queues: map[gomatrixserverlib.ServerName]*destinationQueue{}, } // Look up which servers we have pending items for and then rehydrate those queues. - if serverNames, err := db.GetPendingServerNames(context.Background()); err == nil { - for _, serverName := range serverNames { - queues.getQueue(serverName).wakeQueueIfNeeded() + serverNames := map[gomatrixserverlib.ServerName]struct{}{} + if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} } } else { - log.WithError(err).Error("Failed to get server names for destination queue hydration") + log.WithError(err).Error("Failed to get PDU server names for destination queue hydration") + } + if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil { + for _, serverName := range names { + serverNames[serverName] = struct{}{} + } + } else { + log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") + } + for serverName := range serverNames { + queues.getQueue(serverName).wakeQueueIfNeeded() } return queues } @@ -91,9 +102,9 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d destination: destination, client: oqs.client, statistics: oqs.statistics.ForServer(destination), - incomingEDUs: make(chan *gomatrixserverlib.EDU, 128), incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128), notifyPDUs: make(chan bool, 1), + notifyEDUs: make(chan bool, 1), interruptBackoff: make(chan bool), signing: oqs.signing, } @@ -196,8 +207,18 @@ func (oqs *OutgoingQueues) SendEDU( }).Info("Sending EDU event") } + ephemeralJSON, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("json.Marshal: %w", err) + } + + nid, err := oqs.db.StoreJSON(context.TODO(), string(ephemeralJSON)) + if err != nil { + return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err) + } + for _, destination := range destinations { - oqs.getQueue(destination).sendEDU(e) + oqs.getQueue(destination).sendEDU(nid) } return nil |