aboutsummaryrefslogtreecommitdiff
path: root/federationapi/statistics/statistics.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi/statistics/statistics.go')
-rw-r--r--federationapi/statistics/statistics.go131
1 files changed, 87 insertions, 44 deletions
diff --git a/federationapi/statistics/statistics.go b/federationapi/statistics/statistics.go
index db6d5c73..2ba99112 100644
--- a/federationapi/statistics/statistics.go
+++ b/federationapi/statistics/statistics.go
@@ -2,6 +2,7 @@ package statistics
import (
"math"
+ "math/rand"
"sync"
"time"
@@ -20,12 +21,23 @@ type Statistics struct {
servers map[gomatrixserverlib.ServerName]*ServerStatistics
mutex sync.RWMutex
+ backoffTimers map[gomatrixserverlib.ServerName]*time.Timer
+ backoffMutex sync.RWMutex
+
// How many times should we tolerate consecutive failures before we
// just blacklist the host altogether? The backoff is exponential,
// so the max time here to attempt is 2**failures seconds.
FailuresUntilBlacklist uint32
}
+func NewStatistics(db storage.Database, failuresUntilBlacklist uint32) Statistics {
+ return Statistics{
+ DB: db,
+ FailuresUntilBlacklist: failuresUntilBlacklist,
+ backoffTimers: make(map[gomatrixserverlib.ServerName]*time.Timer),
+ }
+}
+
// ForServer returns server statistics for the given server name. If it
// does not exist, it will create empty statistics and return those.
func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerStatistics {
@@ -45,7 +57,6 @@ func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerS
server = &ServerStatistics{
statistics: s,
serverName: serverName,
- interrupt: make(chan struct{}),
}
s.servers[serverName] = server
s.mutex.Unlock()
@@ -64,29 +75,43 @@ func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerS
// many times we failed etc. It also manages the backoff time and black-
// listing a remote host if it remains uncooperative.
type ServerStatistics struct {
- statistics *Statistics //
- serverName gomatrixserverlib.ServerName //
- blacklisted atomic.Bool // is the node blacklisted
- backoffStarted atomic.Bool // is the backoff started
- backoffUntil atomic.Value // time.Time until this backoff interval ends
- backoffCount atomic.Uint32 // number of times BackoffDuration has been called
- interrupt chan struct{} // interrupts the backoff goroutine
- successCounter atomic.Uint32 // how many times have we succeeded?
+ statistics *Statistics //
+ serverName gomatrixserverlib.ServerName //
+ blacklisted atomic.Bool // is the node blacklisted
+ backoffStarted atomic.Bool // is the backoff started
+ backoffUntil atomic.Value // time.Time until this backoff interval ends
+ backoffCount atomic.Uint32 // number of times BackoffDuration has been called
+ successCounter atomic.Uint32 // how many times have we succeeded?
+ backoffNotifier func() // notifies destination queue when backoff completes
+ notifierMutex sync.Mutex
}
+const maxJitterMultiplier = 1.4
+const minJitterMultiplier = 0.8
+
// duration returns how long the next backoff interval should be.
func (s *ServerStatistics) duration(count uint32) time.Duration {
- return time.Second * time.Duration(math.Exp2(float64(count)))
+ // Add some jitter to minimise the chance of having multiple backoffs
+ // ending at the same time.
+ jitter := rand.Float64()*(maxJitterMultiplier-minJitterMultiplier) + minJitterMultiplier
+ duration := time.Millisecond * time.Duration(math.Exp2(float64(count))*jitter*1000)
+ return duration
}
// cancel will interrupt the currently active backoff.
func (s *ServerStatistics) cancel() {
s.blacklisted.Store(false)
s.backoffUntil.Store(time.Time{})
- select {
- case s.interrupt <- struct{}{}:
- default:
- }
+
+ s.ClearBackoff()
+}
+
+// AssignBackoffNotifier configures the channel to send to when
+// a backoff completes.
+func (s *ServerStatistics) AssignBackoffNotifier(notifier func()) {
+ s.notifierMutex.Lock()
+ defer s.notifierMutex.Unlock()
+ s.backoffNotifier = notifier
}
// Success updates the server statistics with a new successful
@@ -95,8 +120,8 @@ func (s *ServerStatistics) cancel() {
// we will unblacklist it.
func (s *ServerStatistics) Success() {
s.cancel()
- s.successCounter.Inc()
s.backoffCount.Store(0)
+ s.successCounter.Inc()
if s.statistics.DB != nil {
if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
@@ -105,13 +130,17 @@ func (s *ServerStatistics) Success() {
}
// Failure marks a failure and starts backing off if needed.
-// The next call to BackoffIfRequired will do the right thing
-// after this. It will return the time that the current failure
+// It will return the time that the current failure
// will result in backoff waiting until, and a bool signalling
// whether we have blacklisted and therefore to give up.
func (s *ServerStatistics) Failure() (time.Time, bool) {
+ // Return immediately if we have blacklisted this node.
+ if s.blacklisted.Load() {
+ return time.Time{}, true
+ }
+
// If we aren't already backing off, this call will start
- // a new backoff period. Increase the failure counter and
+ // a new backoff period, increase the failure counter and
// start a goroutine which will wait out the backoff and
// unset the backoffStarted flag when done.
if s.backoffStarted.CompareAndSwap(false, true) {
@@ -122,40 +151,48 @@ func (s *ServerStatistics) Failure() (time.Time, bool) {
logrus.WithError(err).Errorf("Failed to add %q to blacklist", s.serverName)
}
}
+ s.ClearBackoff()
return time.Time{}, true
}
- go func() {
- until, ok := s.backoffUntil.Load().(time.Time)
- if ok && !until.IsZero() {
- select {
- case <-time.After(time.Until(until)):
- case <-s.interrupt:
- }
- s.backoffStarted.Store(false)
- }
- }()
- }
+ // We're starting a new back off so work out what the next interval
+ // will be.
+ count := s.backoffCount.Load()
+ until := time.Now().Add(s.duration(count))
+ s.backoffUntil.Store(until)
- // Check if we have blacklisted this node.
- if s.blacklisted.Load() {
- return time.Now(), true
+ s.statistics.backoffMutex.Lock()
+ defer s.statistics.backoffMutex.Unlock()
+ s.statistics.backoffTimers[s.serverName] = time.AfterFunc(time.Until(until), s.backoffFinished)
}
- // If we're already backing off and we haven't yet surpassed
- // the deadline then return that. Repeated calls to Failure
- // within a single backoff interval will have no side effects.
- if until, ok := s.backoffUntil.Load().(time.Time); ok && !time.Now().After(until) {
- return until, false
+ return s.backoffUntil.Load().(time.Time), false
+}
+
+// ClearBackoff stops the backoff timer for this destination if it is running
+// and removes the timer from the backoffTimers map.
+func (s *ServerStatistics) ClearBackoff() {
+ // If the timer is still running then stop it so it's memory is cleaned up sooner.
+ s.statistics.backoffMutex.Lock()
+ defer s.statistics.backoffMutex.Unlock()
+ if timer, ok := s.statistics.backoffTimers[s.serverName]; ok {
+ timer.Stop()
}
+ delete(s.statistics.backoffTimers, s.serverName)
+
+ s.backoffStarted.Store(false)
+}
+
+// backoffFinished will clear the previous backoff and notify the destination queue.
+func (s *ServerStatistics) backoffFinished() {
+ s.ClearBackoff()
- // We're either backing off and have passed the deadline, or
- // we aren't backing off, so work out what the next interval
- // will be.
- count := s.backoffCount.Load()
- until := time.Now().Add(s.duration(count))
- s.backoffUntil.Store(until)
- return until, false
+ // Notify the destinationQueue if one is currently running.
+ s.notifierMutex.Lock()
+ defer s.notifierMutex.Unlock()
+ if s.backoffNotifier != nil {
+ s.backoffNotifier()
+ }
}
// BackoffInfo returns information about the current or previous backoff.
@@ -174,6 +211,12 @@ func (s *ServerStatistics) Blacklisted() bool {
return s.blacklisted.Load()
}
+// RemoveBlacklist removes the blacklisted status from the server.
+func (s *ServerStatistics) RemoveBlacklist() {
+ s.cancel()
+ s.backoffCount.Store(0)
+}
+
// SuccessCount returns the number of successful requests. This is
// usually useful in constructing transaction IDs.
func (s *ServerStatistics) SuccessCount() uint32 {