aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--federationsender/queue/destinationqueue.go6
-rw-r--r--federationsender/queue/queue.go33
-rw-r--r--syncapi/sync/requestpool.go31
3 files changed, 70 insertions, 0 deletions
diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go
index 31eeaebc..c8b0bf65 100644
--- a/federationsender/queue/destinationqueue.go
+++ b/federationsender/queue/destinationqueue.go
@@ -242,6 +242,8 @@ func (oq *destinationQueue) backgroundSend() {
if !oq.running.CAS(false, true) {
return
}
+ destinationQueueRunning.Inc()
+ defer destinationQueueRunning.Dec()
defer oq.running.Store(false)
// Mark the queue as overflowed, so we will consult the database
@@ -295,10 +297,14 @@ func (oq *destinationQueue) backgroundSend() {
// time.
duration := time.Until(*until)
log.Warnf("Backing off %q for %s", oq.destination, duration)
+ oq.backingOff.Store(true)
+ destinationQueueBackingOff.Inc()
select {
case <-time.After(duration):
case <-oq.interruptBackoff:
}
+ destinationQueueBackingOff.Dec()
+ oq.backingOff.Store(false)
}
// Work out which PDUs/EDUs to include in the next transaction.
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index da30e4de..8054856e 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/federationsender/storage/shared"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
@@ -45,6 +46,37 @@ type OutgoingQueues struct {
queues map[gomatrixserverlib.ServerName]*destinationQueue
}
+func init() {
+ prometheus.MustRegister(
+ destinationQueueTotal, destinationQueueRunning,
+ destinationQueueBackingOff,
+ )
+}
+
+var destinationQueueTotal = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: "dendrite",
+ Subsystem: "federationsender",
+ Name: "destination_queues_total",
+ },
+)
+
+var destinationQueueRunning = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: "dendrite",
+ Subsystem: "federationsender",
+ Name: "destination_queues_running",
+ },
+)
+
+var destinationQueueBackingOff = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: "dendrite",
+ Subsystem: "federationsender",
+ Name: "destination_queues_backing_off",
+ },
+)
+
// NewOutgoingQueues makes a new OutgoingQueues
func NewOutgoingQueues(
db storage.Database,
@@ -116,6 +148,7 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
defer oqs.queuesMutex.Unlock()
oq := oqs.queues[destination]
if oq == nil {
+ destinationQueueTotal.Inc()
oq = &destinationQueue{
db: oqs.db,
rsAPI: oqs.rsAPI,
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index a4eec467..32dfb2d6 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -35,6 +35,7 @@ import (
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
+ "github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)
@@ -99,6 +100,30 @@ func (rp *RequestPool) updateLastSeen(req *http.Request, device *userapi.Device)
rp.lastseen.Store(device.UserID+device.ID, time.Now())
}
+func init() {
+ prometheus.MustRegister(
+ activeSyncRequests, waitingSyncRequests,
+ )
+}
+
+var activeSyncRequests = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: "dendrite",
+ Subsystem: "syncapi",
+ Name: "active_sync_requests",
+ Help: "The number of sync requests that are active right now",
+ },
+)
+
+var waitingSyncRequests = prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Namespace: "dendrite",
+ Subsystem: "syncapi",
+ Name: "waiting_sync_requests",
+ Help: "The number of sync requests that are waiting to be woken by a notifier",
+ },
+)
+
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
// called in a dedicated goroutine for this request. This function will block the goroutine
// until a response is ready, or it times out.
@@ -122,6 +147,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
"limit": syncReq.limit,
})
+ activeSyncRequests.Inc()
+ defer activeSyncRequests.Dec()
+
rp.updateLastSeen(req, device)
currPos := rp.notifier.CurrentPosition()
@@ -139,6 +167,9 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
}
+ waitingSyncRequests.Inc()
+ defer waitingSyncRequests.Dec()
+
// Otherwise, we wait for the notifier to tell us if something *may* have
// happened. We loop in case it turns out that nothing did happen.