aboutsummaryrefslogtreecommitdiff
path: root/federationapi/statistics
diff options
context:
space:
mode:
authordevonh <devon.dmytro@gmail.com>2023-01-23 17:55:12 +0000
committerGitHub <noreply@github.com>2023-01-23 17:55:12 +0000
commit5b73592f5a4dddf64184fcbe33f4c1835c656480 (patch)
treeb6dac51b6be7a1e591f24881ee1bfae1b92088e9 /federationapi/statistics
parent48fa869fa3578741d1d5775d30f24f6b097ab995 (diff)
Initial Store & Forward Implementation (#2917)
This adds store & forward (s&f) relays into dendrite for p2p. A few things have changed:
- new relay API serves new HTTP endpoints for s&f federation
- updated outbound federation queueing, which will attempt to forward using s&f if appropriate
- database entries to track s&f relays for other nodes
Diffstat (limited to 'federationapi/statistics')
-rw-r--r--federationapi/statistics/statistics.go186
-rw-r--r--federationapi/statistics/statistics_test.go58
2 files changed, 209 insertions, 35 deletions
diff --git a/federationapi/statistics/statistics.go b/federationapi/statistics/statistics.go
index 0a44375c..866c0933 100644
--- a/federationapi/statistics/statistics.go
+++ b/federationapi/statistics/statistics.go
@@ -1,6 +1,7 @@
package statistics
import (
+ "context"
"math"
"math/rand"
"sync"
@@ -28,14 +29,24 @@ type Statistics struct {
// just blacklist the host altogether? The backoff is exponential,
// so the max time here to attempt is 2**failures seconds.
FailuresUntilBlacklist uint32
+
+ // How many times should we tolerate consecutive failures before we
+ // mark the destination as offline. At this point we should attempt
+ // to send messages to the user's async relay servers if we know them.
+ FailuresUntilAssumedOffline uint32
}
-func NewStatistics(db storage.Database, failuresUntilBlacklist uint32) Statistics {
+func NewStatistics(
+ db storage.Database,
+ failuresUntilBlacklist uint32,
+ failuresUntilAssumedOffline uint32,
+) Statistics {
return Statistics{
- DB: db,
- FailuresUntilBlacklist: failuresUntilBlacklist,
- backoffTimers: make(map[gomatrixserverlib.ServerName]*time.Timer),
- servers: make(map[gomatrixserverlib.ServerName]*ServerStatistics),
+ DB: db,
+ FailuresUntilBlacklist: failuresUntilBlacklist,
+ FailuresUntilAssumedOffline: failuresUntilAssumedOffline,
+ backoffTimers: make(map[gomatrixserverlib.ServerName]*time.Timer),
+ servers: make(map[gomatrixserverlib.ServerName]*ServerStatistics),
}
}
@@ -50,8 +61,9 @@ func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerS
if !found {
s.mutex.Lock()
server = &ServerStatistics{
- statistics: s,
- serverName: serverName,
+ statistics: s,
+ serverName: serverName,
+ knownRelayServers: []gomatrixserverlib.ServerName{},
}
s.servers[serverName] = server
s.mutex.Unlock()
@@ -61,24 +73,49 @@ func (s *Statistics) ForServer(serverName gomatrixserverlib.ServerName) *ServerS
} else {
server.blacklisted.Store(blacklisted)
}
+ assumedOffline, err := s.DB.IsServerAssumedOffline(context.Background(), serverName)
+ if err != nil {
+ logrus.WithError(err).Errorf("Failed to get assumed offline entry %q", serverName)
+ } else {
+ server.assumedOffline.Store(assumedOffline)
+ }
+
+ knownRelayServers, err := s.DB.P2PGetRelayServersForServer(context.Background(), serverName)
+ if err != nil {
+ logrus.WithError(err).Errorf("Failed to get relay server list for %q", serverName)
+ } else {
+ server.relayMutex.Lock()
+ server.knownRelayServers = knownRelayServers
+ server.relayMutex.Unlock()
+ }
}
return server
}
+type SendMethod uint8
+
+const (
+ SendDirect SendMethod = iota
+ SendViaRelay
+)
+
// ServerStatistics contains information about our interactions with a
// remote federated host, e.g. how many times we were successful, how
// many times we failed etc. It also manages the backoff time and black-
// listing a remote host if it remains uncooperative.
type ServerStatistics struct {
- statistics *Statistics //
- serverName gomatrixserverlib.ServerName //
- blacklisted atomic.Bool // is the node blacklisted
- backoffStarted atomic.Bool // is the backoff started
- backoffUntil atomic.Value // time.Time until this backoff interval ends
- backoffCount atomic.Uint32 // number of times BackoffDuration has been called
- successCounter atomic.Uint32 // how many times have we succeeded?
- backoffNotifier func() // notifies destination queue when backoff completes
- notifierMutex sync.Mutex
+ statistics *Statistics //
+ serverName gomatrixserverlib.ServerName //
+ blacklisted atomic.Bool // is the node blacklisted
+ assumedOffline atomic.Bool // is the node assumed to be offline
+ backoffStarted atomic.Bool // is the backoff started
+ backoffUntil atomic.Value // time.Time until this backoff interval ends
+ backoffCount atomic.Uint32 // number of times BackoffDuration has been called
+ successCounter atomic.Uint32 // how many times have we succeeded?
+ backoffNotifier func() // notifies destination queue when backoff completes
+ notifierMutex sync.Mutex
+ knownRelayServers []gomatrixserverlib.ServerName
+ relayMutex sync.Mutex
}
const maxJitterMultiplier = 1.4
@@ -113,13 +150,19 @@ func (s *ServerStatistics) AssignBackoffNotifier(notifier func()) {
// attempt, which increases the sent counter and resets the idle and
// failure counters. If a host was blacklisted at this point then
// we will unblacklist it.
-func (s *ServerStatistics) Success() {
+// `method` specifies whether the success was to the actual destination
+// or one of their relay servers.
+func (s *ServerStatistics) Success(method SendMethod) {
s.cancel()
s.backoffCount.Store(0)
- s.successCounter.Inc()
- if s.statistics.DB != nil {
- if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
- logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
+ // NOTE : Sending to the final destination vs. a relay server has
+ // slightly different semantics.
+ if method == SendDirect {
+ s.successCounter.Inc()
+ if s.blacklisted.Load() && s.statistics.DB != nil {
+ if err := s.statistics.DB.RemoveServerFromBlacklist(s.serverName); err != nil {
+ logrus.WithError(err).Errorf("Failed to remove %q from blacklist", s.serverName)
+ }
}
}
}
@@ -139,7 +182,18 @@ func (s *ServerStatistics) Failure() (time.Time, bool) {
// start a goroutine which will wait out the backoff and
// unset the backoffStarted flag when done.
if s.backoffStarted.CompareAndSwap(false, true) {
- if s.backoffCount.Inc() >= s.statistics.FailuresUntilBlacklist {
+ backoffCount := s.backoffCount.Inc()
+
+ if backoffCount >= s.statistics.FailuresUntilAssumedOffline {
+ s.assumedOffline.CompareAndSwap(false, true)
+ if s.statistics.DB != nil {
+ if err := s.statistics.DB.SetServerAssumedOffline(context.Background(), s.serverName); err != nil {
+ logrus.WithError(err).Errorf("Failed to set %q as assumed offline", s.serverName)
+ }
+ }
+ }
+
+ if backoffCount >= s.statistics.FailuresUntilBlacklist {
s.blacklisted.Store(true)
if s.statistics.DB != nil {
if err := s.statistics.DB.AddServerToBlacklist(s.serverName); err != nil {
@@ -157,13 +211,21 @@ func (s *ServerStatistics) Failure() (time.Time, bool) {
s.backoffUntil.Store(until)
s.statistics.backoffMutex.Lock()
- defer s.statistics.backoffMutex.Unlock()
s.statistics.backoffTimers[s.serverName] = time.AfterFunc(time.Until(until), s.backoffFinished)
+ s.statistics.backoffMutex.Unlock()
}
return s.backoffUntil.Load().(time.Time), false
}
+// MarkServerAlive removes the assumed offline and blacklisted statuses from this server.
+// Returns whether the server was blacklisted before this point.
+func (s *ServerStatistics) MarkServerAlive() bool {
+ s.removeAssumedOffline()
+ wasBlacklisted := s.removeBlacklist()
+ return wasBlacklisted
+}
+
// ClearBackoff stops the backoff timer for this destination if it is running
// and removes the timer from the backoffTimers map.
func (s *ServerStatistics) ClearBackoff() {
@@ -191,13 +253,13 @@ func (s *ServerStatistics) backoffFinished() {
}
// BackoffInfo returns information about the current or previous backoff.
-// Returns the last backoffUntil time and whether the server is currently blacklisted or not.
-func (s *ServerStatistics) BackoffInfo() (*time.Time, bool) {
+// Returns the last backoffUntil time.
+func (s *ServerStatistics) BackoffInfo() *time.Time {
until, ok := s.backoffUntil.Load().(time.Time)
if ok {
- return &until, s.blacklisted.Load()
+ return &until
}
- return nil, s.blacklisted.Load()
+ return nil
}
// Blacklisted returns true if the server is blacklisted and false
@@ -206,10 +268,33 @@ func (s *ServerStatistics) Blacklisted() bool {
return s.blacklisted.Load()
}
-// RemoveBlacklist removes the blacklisted status from the server.
-func (s *ServerStatistics) RemoveBlacklist() {
+// AssumedOffline returns true if the server is assumed offline and false
+// otherwise.
+func (s *ServerStatistics) AssumedOffline() bool {
+ return s.assumedOffline.Load()
+}
+
+// removeBlacklist removes the blacklisted status from the server.
+// Returns whether the server was blacklisted.
+func (s *ServerStatistics) removeBlacklist() bool {
+ var wasBlacklisted bool
+
+ if s.Blacklisted() {
+ wasBlacklisted = true
+ _ = s.statistics.DB.RemoveServerFromBlacklist(s.serverName)
+ }
s.cancel()
s.backoffCount.Store(0)
+
+ return wasBlacklisted
+}
+
+// removeAssumedOffline removes the assumed offline status from the server.
+func (s *ServerStatistics) removeAssumedOffline() {
+ if s.AssumedOffline() {
+ _ = s.statistics.DB.RemoveServerAssumedOffline(context.Background(), s.serverName)
+ }
+ s.assumedOffline.Store(false)
}
// SuccessCount returns the number of successful requests. This is
@@ -217,3 +302,46 @@ func (s *ServerStatistics) RemoveBlacklist() {
func (s *ServerStatistics) SuccessCount() uint32 {
return s.successCounter.Load()
}
+
+// KnownRelayServers returns the list of relay servers associated with this
+// server.
+func (s *ServerStatistics) KnownRelayServers() []gomatrixserverlib.ServerName {
+ s.relayMutex.Lock()
+ defer s.relayMutex.Unlock()
+ return s.knownRelayServers
+}
+
+func (s *ServerStatistics) AddRelayServers(relayServers []gomatrixserverlib.ServerName) {
+ seenSet := make(map[gomatrixserverlib.ServerName]bool)
+ uniqueList := []gomatrixserverlib.ServerName{}
+ for _, srv := range relayServers {
+ if seenSet[srv] {
+ continue
+ }
+ seenSet[srv] = true
+ uniqueList = append(uniqueList, srv)
+ }
+
+ err := s.statistics.DB.P2PAddRelayServersForServer(context.Background(), s.serverName, uniqueList)
+ if err != nil {
+ logrus.WithError(err).Errorf("Failed to add relay servers for %q. Servers: %v", s.serverName, uniqueList)
+ return
+ }
+
+ for _, newServer := range uniqueList {
+ alreadyKnown := false
+ knownRelayServers := s.KnownRelayServers()
+ for _, srv := range knownRelayServers {
+ if srv == newServer {
+ alreadyKnown = true
+ }
+ }
+ if !alreadyKnown {
+ {
+ s.relayMutex.Lock()
+ s.knownRelayServers = append(s.knownRelayServers, newServer)
+ s.relayMutex.Unlock()
+ }
+ }
+ }
+}
diff --git a/federationapi/statistics/statistics_test.go b/federationapi/statistics/statistics_test.go
index 6aa997f4..183b9aa0 100644
--- a/federationapi/statistics/statistics_test.go
+++ b/federationapi/statistics/statistics_test.go
@@ -4,17 +4,26 @@ import (
"math"
"testing"
"time"
+
+ "github.com/matrix-org/dendrite/test"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/stretchr/testify/assert"
+)
+
+const (
+ FailuresUntilAssumedOffline = 3
+ FailuresUntilBlacklist = 8
)
func TestBackoff(t *testing.T) {
- stats := NewStatistics(nil, 7)
+ stats := NewStatistics(nil, FailuresUntilBlacklist, FailuresUntilAssumedOffline)
server := ServerStatistics{
statistics: &stats,
serverName: "test.com",
}
// Start by checking that counting successes works.
- server.Success()
+ server.Success(SendDirect)
if successes := server.SuccessCount(); successes != 1 {
t.Fatalf("Expected success count 1, got %d", successes)
}
@@ -31,9 +40,8 @@ func TestBackoff(t *testing.T) {
// side effects since a backoff is already in progress. If it does
// then we'll fail.
until, blacklisted := server.Failure()
-
- // Get the duration.
- _, blacklist := server.BackoffInfo()
+ blacklist := server.Blacklisted()
+ assumedOffline := server.AssumedOffline()
duration := time.Until(until)
// Unset the backoff, or otherwise our next call will think that
@@ -41,16 +49,43 @@ func TestBackoff(t *testing.T) {
server.cancel()
server.backoffStarted.Store(false)
+ if i >= stats.FailuresUntilAssumedOffline {
+ if !assumedOffline {
+ t.Fatalf("Backoff %d should have resulted in assuming the destination was offline but didn't", i)
+ }
+ }
+
+ // Check if we should be assumed offline by now.
+ if i >= stats.FailuresUntilAssumedOffline {
+ if !assumedOffline {
+ t.Fatalf("Backoff %d should have resulted in assumed offline but didn't", i)
+ } else {
+ t.Logf("Backoff %d is assumed offline as expected", i)
+ }
+ } else {
+ if assumedOffline {
+ t.Fatalf("Backoff %d should not have resulted in assumed offline but did", i)
+ } else {
+ t.Logf("Backoff %d is not assumed offline as expected", i)
+ }
+ }
+
// Check if we should be blacklisted by now.
if i >= stats.FailuresUntilBlacklist {
if !blacklist {
t.Fatalf("Backoff %d should have resulted in blacklist but didn't", i)
} else if blacklist != blacklisted {
- t.Fatalf("BackoffInfo and Failure returned different blacklist values")
+ t.Fatalf("Blacklisted and Failure returned different blacklist values")
} else {
t.Logf("Backoff %d is blacklisted as expected", i)
continue
}
+ } else {
+ if blacklist {
+ t.Fatalf("Backoff %d should not have resulted in blacklist but did", i)
+ } else {
+ t.Logf("Backoff %d is not blacklisted as expected", i)
+ }
}
// Check if the duration is what we expect.
@@ -69,3 +104,14 @@ func TestBackoff(t *testing.T) {
}
}
}
+
+func TestRelayServersListing(t *testing.T) {
+ stats := NewStatistics(test.NewInMemoryFederationDatabase(), FailuresUntilBlacklist, FailuresUntilAssumedOffline)
+ server := ServerStatistics{statistics: &stats}
+ server.AddRelayServers([]gomatrixserverlib.ServerName{"relay1", "relay1", "relay2"})
+ relayServers := server.KnownRelayServers()
+ assert.Equal(t, []gomatrixserverlib.ServerName{"relay1", "relay2"}, relayServers)
+ server.AddRelayServers([]gomatrixserverlib.ServerName{"relay1", "relay1", "relay2"})
+ relayServers = server.KnownRelayServers()
+ assert.Equal(t, []gomatrixserverlib.ServerName{"relay1", "relay2"}, relayServers)
+}