aboutsummaryrefslogtreecommitdiff
path: root/federationapi/statistics/statistics.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi/statistics/statistics.go')
-rw-r--r--federationapi/statistics/statistics.go186
1 files changed, 157 insertions, 29 deletions
diff --git a/federationapi/statistics/statistics.go b/federationapi/statistics/statistics.go
index 0a44375c..866c0933 100644
--- a/federationapi/statistics/statistics.go
+++ b/federationapi/statistics/statistics.go
@@ -1,6 +1,7 @@
package statistics
import (
+ "context"
"math"
"math/rand"
"sync"
@@ -28,14 +29,24 @@ type Statistics struct {
// just blacklist the host altogether? The backoff is exponential,
// so the max time here to attempt is 2**failures seconds.
FailuresUntilBlacklist uint32
+
+ // How many times should we tolerate consecutive failures before we
+ // mark the destination as offline. At this point we should attempt
+ // to send messages to the user's async relay servers if we know them.
+ FailuresUntilAssumedOffline uint32
}
-func NewStatistics(db storage.Database, failuresUntilBlacklist uint32) Statistics {
+func NewStatistics(
+ db storage.Database,
+ failuresUntilBlacklist uint32,
+ failuresUntilAssumedOffline uint32,
+) Statistics {
return Statistics{
- DB: db,
- FailuresUntilBlacklist: failuresUntilBlacklist,
- backoffTimers: make(map[gomatrixserverlib.ServerName]*time.Timer),
- servers: make(map[gomatrixserverlib.ServerName]*ServerStatistics),
+ DB: db,
+ FailuresUntilBlacklist: failuresUntilBlacklist,
+ FailuresUntilAssumedOffline: failuresUntilAssumedOffline,
+ backoffTimers: make(map[gomatrixserverlib.ServerName]*time.Timer),
+ servers: make(map[gomatrixserverlib.ServerName]*ServerStatistics),
}
}
@@ -50,8 +61,9 @@ func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerS
if !found {
s.mutex.Lock()
server = &ServerStatistics{
- statistics: s,
- serverName: serverName,
+ statistics: s,
+ serverName: serverName,
+ knownRelayServers: []gomatrixserverlib.ServerName{},
}
s.servers[serverName] = server
s.mutex.Unlock()
@@ -61,24 +73,49 @@ func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerS
} else {
server.blacklisted.Store(blacklisted)
}
+ assumedOffline, err := s.DB.IsServerAssumedOffline(context.Background(), serverName)
+ if err != nil {
+ logrus.WithError(err).Errorf("Failed to get assumed offline entry %q", serverName)
+ } else {
+ server.assumedOffline.Store(assumedOffline)
+ }
+
+ knownRelayServers, err := s.DB.P2PGetRelayServersForServer(context.Background(), serverName)
+ if err != nil {
+ logrus.WithError(err).Errorf("Failed to get relay server list for %q", serverName)
+ } else {
+ server.relayMutex.Lock()
+ server.knownRelayServers = knownRelayServers
+ server.relayMutex.Unlock()
+ }
}
return server
}
+type SendMethod uint8
+
+const (
+ SendDirect SendMethod = iota
+ SendViaRelay
+)
+
// ServerStatistics contains information about our interactions with a
// remote federated host, e.g. how many times we were successful, how
// many times we failed etc. It also manages the backoff time and black-
// listing a remote host if it remains uncooperative.
type ServerStatistics struct {
- statistics *Statistics //
- serverName gomatrixserverlib.ServerName //
- blacklisted atomic.Bool // is the node blacklisted
- backoffStarted atomic.Bool // is the backoff started
- backoffUntil atomic.Value // time.Time until this backoff interval ends
- backoffCount atomic.Uint32 // number of times BackoffDuration has been called
- successCounter atomic.Uint32 // how many times have we succeeded?
- backoffNotifier func() // notifies destination queue when backoff completes
- notifierMutex sync.Mutex
+ statistics *Statistics //
+ serverName gomatrixserverlib.ServerName //
+ blacklisted atomic.Bool // is the node blacklisted
+ assumedOffline atomic.Bool // is the node assumed to be offline
+ backoffStarted atomic.Bool // is the backoff started
+ backoffUntil atomic.Value // time.Time until this backoff interval ends
+ backoffCount atomic.Uint32 // number of times BackoffDuration has been called
+ successCounter atomic.Uint32 // how many times have we succeeded?
+ backoffNotifier func() // notifies destination queue when backoff completes
+ notifierMutex sync.Mutex
+ knownRelayServers []gomatrixserverlib.ServerName
+ relayMutex sync.Mutex
}
const maxJitterMultiplier = 1.4
@@ -113,13 +150,19 @@ func (s *ServerStatistics) AssignBackoffNotifier(notifier func()) {
// attempt, which increases the sent counter and resets the idle and
// failure counters. If a host was blacklisted at this point then
// we will unblacklist it.
-func (s *ServerStatistics) Success() {
+// `relay` specifies whether the success was to the actual destination
+// or one of their relay servers.
+func (s *ServerStatistics) Success(method SendMethod) {
s.cancel()
s.backoffCount.Store(0)
- s.successCounter.Inc()
- if s.statistics.DB != nil {
- if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
- logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
+ // NOTE : Sending to the final destination vs. a relay server has
+ // slightly different semantics.
+ if method == SendDirect {
+ s.successCounter.Inc()
+ if s.blacklisted.Load() && s.statistics.DB != nil {
+ if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
+ logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
+ }
}
}
}
@@ -139,7 +182,18 @@ func (s *ServerStatistics) Failure() (time.Time, bool) {
// start a goroutine which will wait out the backoff and
// unset the backoffStarted flag when done.
if s.backoffStarted.CompareAndSwap(false, true) {
- if s.backoffCount.Inc() >= s.statistics.FailuresUntilBlacklist {
+ backoffCount := s.backoffCount.Inc()
+
+ if backoffCount >= s.statistics.FailuresUntilAssumedOffline {
+ s.assumedOffline.CompareAndSwap(false, true)
+ if s.statistics.DB != nil {
+ if err := s.statistics.DB.SetServerAssumedOffline(context.Background(), s.serverName); err != nil {
+ logrus.WithError(err).Errorf("Failed to set %q as assumed offline", s.serverName)
+ }
+ }
+ }
+
+ if backoffCount >= s.statistics.FailuresUntilBlacklist {
s.blacklisted.Store(true)
if s.statistics.DB != nil {
if err := s.statistics.DB.AddServerToBlacklist(s.serverName); err != nil {
@@ -157,13 +211,21 @@ func (s *ServerStatistics) Failure() (time.Time, bool) {
s.backoffUntil.Store(until)
s.statistics.backoffMutex.Lock()
- defer s.statistics.backoffMutex.Unlock()
s.statistics.backoffTimers[s.serverName] = time.AfterFunc(time.Until(until), s.backoffFinished)
+ s.statistics.backoffMutex.Unlock()
}
return s.backoffUntil.Load().(time.Time), false
}
+// MarkServerAlive removes the assumed offline and blacklisted statuses from this server.
+// Returns whether the server was blacklisted before this point.
+func (s *ServerStatistics) MarkServerAlive() bool {
+ s.removeAssumedOffline()
+ wasBlacklisted := s.removeBlacklist()
+ return wasBlacklisted
+}
+
// ClearBackoff stops the backoff timer for this destination if it is running
// and removes the timer from the backoffTimers map.
func (s *ServerStatistics) ClearBackoff() {
@@ -191,13 +253,13 @@ func (s *ServerStatistics) backoffFinished() {
}
// BackoffInfo returns information about the current or previous backoff.
-// Returns the last backoffUntil time and whether the server is currently blacklisted or not.
-func (s *ServerStatistics) BackoffInfo() (*time.Time, bool) {
+// Returns the last backoffUntil time.
+func (s *ServerStatistics) BackoffInfo() *time.Time {
until, ok := s.backoffUntil.Load().(time.Time)
if ok {
- return &until, s.blacklisted.Load()
+ return &until
}
- return nil, s.blacklisted.Load()
+ return nil
}
// Blacklisted returns true if the server is blacklisted and false
@@ -206,10 +268,33 @@ func (s *ServerStatistics) Blacklisted() bool {
return s.blacklisted.Load()
}
-// RemoveBlacklist removes the blacklisted status from the server.
-func (s *ServerStatistics) RemoveBlacklist() {
+// AssumedOffline returns true if the server is assumed offline and false
+// otherwise.
+func (s *ServerStatistics) AssumedOffline() bool {
+ return s.assumedOffline.Load()
+}
+
+// removeBlacklist removes the blacklisted status from the server.
+// Returns whether the server was blacklisted.
+func (s *ServerStatistics) removeBlacklist() bool {
+ var wasBlacklisted bool
+
+ if s.Blacklisted() {
+ wasBlacklisted = true
+ _ = s.statistics.DB.RemoveServerFromBlacklist(s.serverName)
+ }
s.cancel()
s.backoffCount.Store(0)
+
+ return wasBlacklisted
+}
+
+// removeAssumedOffline removes the assumed offline status from the server.
+func (s *ServerStatistics) removeAssumedOffline() {
+ if s.AssumedOffline() {
+ _ = s.statistics.DB.RemoveServerAssumedOffline(context.Background(), s.serverName)
+ }
+ s.assumedOffline.Store(false)
}
// SuccessCount returns the number of successful requests. This is
@@ -217,3 +302,46 @@ func (s *ServerStatistics) RemoveBlacklist() {
func (s *ServerStatistics) SuccessCount() uint32 {
return s.successCounter.Load()
}
+
+// KnownRelayServers returns the list of relay servers associated with this
+// server.
+func (s *ServerStatistics) KnownRelayServers() []gomatrixserverlib.ServerName {
+ s.relayMutex.Lock()
+ defer s.relayMutex.Unlock()
+ return s.knownRelayServers
+}
+
+func (s *ServerStatistics) AddRelayServers(relayServers []gomatrixserverlib.ServerName) {
+ seenSet := make(map[gomatrixserverlib.ServerName]bool)
+ uniqueList := []gomatrixserverlib.ServerName{}
+ for _, srv := range relayServers {
+ if seenSet[srv] {
+ continue
+ }
+ seenSet[srv] = true
+ uniqueList = append(uniqueList, srv)
+ }
+
+ err := s.statistics.DB.P2PAddRelayServersForServer(context.Background(), s.serverName, uniqueList)
+ if err != nil {
+ logrus.WithError(err).Errorf("Failed to add relay servers for %q. Servers: %v", s.serverName, uniqueList)
+ return
+ }
+
+ for _, newServer := range uniqueList {
+ alreadyKnown := false
+ knownRelayServers := s.KnownRelayServers()
+ for _, srv := range knownRelayServers {
+ if srv == newServer {
+ alreadyKnown = true
+ }
+ }
+ if !alreadyKnown {
+ {
+ s.relayMutex.Lock()
+ s.knownRelayServers = append(s.knownRelayServers, newServer)
+ s.relayMutex.Unlock()
+ }
+ }
+ }
+}