aboutsummaryrefslogtreecommitdiff
path: root/federationsender/queue/queue.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-07-20 16:55:20 +0100
committerGitHub <noreply@github.com>2020-07-20 16:55:20 +0100
commit11a39fe3b5afeadafd2c842951462d9332ebecca (patch)
tree88c3601666ecd6e891c64875f29e1f8ce22eac34 /federationsender/queue/queue.go
parentf3c482b078c2fbb6d6d7a41be44c2e841d70d8b8 (diff)
Deduplicate FS database, EDU persistence table (#1207)
* Deduplicate FS database, add some EDU persistence groundwork * Extend TransactionWriter to use optional existing transaction, use that for FS SQLite database writes * Fix build due to bad keyserver import * Working EDU persistence * gocyclo, unsurprisingly * Remove unused * Update copyright notices
Diffstat (limited to 'federationsender/queue/queue.go')
-rw-r--r--federationsender/queue/queue.go33
1 files changed, 27 insertions, 6 deletions
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index 46c9fddb..e488a34a 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -61,12 +61,23 @@ func NewOutgoingQueues(
queues: map[gomatrixserverlib.ServerName]*destinationQueue{},
}
// Look up which servers we have pending items for and then rehydrate those queues.
- if serverNames, err := db.GetPendingServerNames(context.Background()); err == nil {
- for _, serverName := range serverNames {
- queues.getQueue(serverName).wakeQueueIfNeeded()
+ serverNames := map[gomatrixserverlib.ServerName]struct{}{}
+ if names, err := db.GetPendingPDUServerNames(context.Background()); err == nil {
+ for _, serverName := range names {
+ serverNames[serverName] = struct{}{}
}
} else {
- log.WithError(err).Error("Failed to get server names for destination queue hydration")
+ log.WithError(err).Error("Failed to get PDU server names for destination queue hydration")
+ }
+ if names, err := db.GetPendingEDUServerNames(context.Background()); err == nil {
+ for _, serverName := range names {
+ serverNames[serverName] = struct{}{}
+ }
+ } else {
+ log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
+ }
+ for serverName := range serverNames {
+ queues.getQueue(serverName).wakeQueueIfNeeded()
}
return queues
}
@@ -91,9 +102,9 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
destination: destination,
client: oqs.client,
statistics: oqs.statistics.ForServer(destination),
- incomingEDUs: make(chan *gomatrixserverlib.EDU, 128),
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
notifyPDUs: make(chan bool, 1),
+ notifyEDUs: make(chan bool, 1),
interruptBackoff: make(chan bool),
signing: oqs.signing,
}
@@ -196,8 +207,18 @@ func (oqs *OutgoingQueues) SendEDU(
}).Info("Sending EDU event")
}
+ ephemeralJSON, err := json.Marshal(e)
+ if err != nil {
+ return fmt.Errorf("json.Marshal: %w", err)
+ }
+
+ nid, err := oqs.db.StoreJSON(context.TODO(), string(ephemeralJSON))
+ if err != nil {
+ return fmt.Errorf("sendevent: oqs.db.StoreJSON: %w", err)
+ }
+
for _, destination := range destinations {
- oqs.getQueue(destination).sendEDU(e)
+ oqs.getQueue(destination).sendEDU(nid)
}
return nil