aboutsummaryrefslogtreecommitdiff
path: root/federationapi/queue/queue_test.go
diff options
context:
space:
mode:
authordevonh <devon.dmytro@gmail.com>2023-01-23 17:55:12 +0000
committerGitHub <noreply@github.com>2023-01-23 17:55:12 +0000
commit5b73592f5a4dddf64184fcbe33f4c1835c656480 (patch)
treeb6dac51b6be7a1e591f24881ee1bfae1b92088e9 /federationapi/queue/queue_test.go
parent48fa869fa3578741d1d5775d30f24f6b097ab995 (diff)
Initial Store & Forward Implementation (#2917)
This adds store & forward relays into dendrite for p2p. A few things have changed: - new relay api serves new http endpoints for s&f federation - updated outbound federation queueing which will attempt to forward using s&f if appropriate - database entries to track s&f relays for other nodes
Diffstat (limited to 'federationapi/queue/queue_test.go')
-rw-r--r--federationapi/queue/queue_test.go436
1 files changed, 189 insertions, 247 deletions
diff --git a/federationapi/queue/queue_test.go b/federationapi/queue/queue_test.go
index c317edc2..36e2ccbc 100644
--- a/federationapi/queue/queue_test.go
+++ b/federationapi/queue/queue_test.go
@@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
- "sync"
"testing"
"time"
@@ -26,13 +25,11 @@ import (
"gotest.tools/v3/poll"
"github.com/matrix-org/gomatrixserverlib"
- "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage"
- "github.com/matrix-org/dendrite/federationapi/storage/shared"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
@@ -57,7 +54,7 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType, realDatabase
}
} else {
// Fake Database
- db := createDatabase()
+ db := test.NewInMemoryFederationDatabase()
b := struct {
ProcessContext *process.ProcessContext
}{ProcessContext: process.NewProcessContext()}
@@ -65,220 +62,6 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType, realDatabase
}
}
-func createDatabase() storage.Database {
- return &fakeDatabase{
- pendingPDUServers: make(map[gomatrixserverlib.ServerName]struct{}),
- pendingEDUServers: make(map[gomatrixserverlib.ServerName]struct{}),
- blacklistedServers: make(map[gomatrixserverlib.ServerName]struct{}),
- pendingPDUs: make(map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent),
- pendingEDUs: make(map[*shared.Receipt]*gomatrixserverlib.EDU),
- associatedPDUs: make(map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}),
- associatedEDUs: make(map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}),
- }
-}
-
-type fakeDatabase struct {
- storage.Database
- dbMutex sync.Mutex
- pendingPDUServers map[gomatrixserverlib.ServerName]struct{}
- pendingEDUServers map[gomatrixserverlib.ServerName]struct{}
- blacklistedServers map[gomatrixserverlib.ServerName]struct{}
- pendingPDUs map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent
- pendingEDUs map[*shared.Receipt]*gomatrixserverlib.EDU
- associatedPDUs map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}
- associatedEDUs map[gomatrixserverlib.ServerName]map[*shared.Receipt]struct{}
-}
-
-var nidMutex sync.Mutex
-var nid = int64(0)
-
-func (d *fakeDatabase) StoreJSON(ctx context.Context, js string) (*shared.Receipt, error) {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- var event gomatrixserverlib.HeaderedEvent
- if err := json.Unmarshal([]byte(js), &event); err == nil {
- nidMutex.Lock()
- defer nidMutex.Unlock()
- nid++
- receipt := shared.NewReceipt(nid)
- d.pendingPDUs[&receipt] = &event
- return &receipt, nil
- }
-
- var edu gomatrixserverlib.EDU
- if err := json.Unmarshal([]byte(js), &edu); err == nil {
- nidMutex.Lock()
- defer nidMutex.Unlock()
- nid++
- receipt := shared.NewReceipt(nid)
- d.pendingEDUs[&receipt] = &edu
- return &receipt, nil
- }
-
- return nil, errors.New("Failed to determine type of json to store")
-}
-
-func (d *fakeDatabase) GetPendingPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (pdus map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent, err error) {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- pduCount := 0
- pdus = make(map[*shared.Receipt]*gomatrixserverlib.HeaderedEvent)
- if receipts, ok := d.associatedPDUs[serverName]; ok {
- for receipt := range receipts {
- if event, ok := d.pendingPDUs[receipt]; ok {
- pdus[receipt] = event
- pduCount++
- if pduCount == limit {
- break
- }
- }
- }
- }
- return pdus, nil
-}
-
-func (d *fakeDatabase) GetPendingEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, limit int) (edus map[*shared.Receipt]*gomatrixserverlib.EDU, err error) {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- eduCount := 0
- edus = make(map[*shared.Receipt]*gomatrixserverlib.EDU)
- if receipts, ok := d.associatedEDUs[serverName]; ok {
- for receipt := range receipts {
- if event, ok := d.pendingEDUs[receipt]; ok {
- edus[receipt] = event
- eduCount++
- if eduCount == limit {
- break
- }
- }
- }
- }
- return edus, nil
-}
-
-func (d *fakeDatabase) AssociatePDUWithDestinations(ctx context.Context, destinations map[gomatrixserverlib.ServerName]struct{}, receipt *shared.Receipt) error {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- if _, ok := d.pendingPDUs[receipt]; ok {
- for destination := range destinations {
- if _, ok := d.associatedPDUs[destination]; !ok {
- d.associatedPDUs[destination] = make(map[*shared.Receipt]struct{})
- }
- d.associatedPDUs[destination][receipt] = struct{}{}
- }
-
- return nil
- } else {
- return errors.New("PDU doesn't exist")
- }
-}
-
-func (d *fakeDatabase) AssociateEDUWithDestinations(ctx context.Context, destinations map[gomatrixserverlib.ServerName]struct{}, receipt *shared.Receipt, eduType string, expireEDUTypes map[string]time.Duration) error {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- if _, ok := d.pendingEDUs[receipt]; ok {
- for destination := range destinations {
- if _, ok := d.associatedEDUs[destination]; !ok {
- d.associatedEDUs[destination] = make(map[*shared.Receipt]struct{})
- }
- d.associatedEDUs[destination][receipt] = struct{}{}
- }
-
- return nil
- } else {
- return errors.New("EDU doesn't exist")
- }
-}
-
-func (d *fakeDatabase) CleanPDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- if pdus, ok := d.associatedPDUs[serverName]; ok {
- for _, receipt := range receipts {
- delete(pdus, receipt)
- }
- }
-
- return nil
-}
-
-func (d *fakeDatabase) CleanEDUs(ctx context.Context, serverName gomatrixserverlib.ServerName, receipts []*shared.Receipt) error {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- if edus, ok := d.associatedEDUs[serverName]; ok {
- for _, receipt := range receipts {
- delete(edus, receipt)
- }
- }
-
- return nil
-}
-
-func (d *fakeDatabase) GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- servers := []gomatrixserverlib.ServerName{}
- for server := range d.pendingPDUServers {
- servers = append(servers, server)
- }
- return servers, nil
-}
-
-func (d *fakeDatabase) GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error) {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- servers := []gomatrixserverlib.ServerName{}
- for server := range d.pendingEDUServers {
- servers = append(servers, server)
- }
- return servers, nil
-}
-
-func (d *fakeDatabase) AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- d.blacklistedServers[serverName] = struct{}{}
- return nil
-}
-
-func (d *fakeDatabase) RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- delete(d.blacklistedServers, serverName)
- return nil
-}
-
-func (d *fakeDatabase) RemoveAllServersFromBlacklist() error {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- d.blacklistedServers = make(map[gomatrixserverlib.ServerName]struct{})
- return nil
-}
-
-func (d *fakeDatabase) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
- d.dbMutex.Lock()
- defer d.dbMutex.Unlock()
-
- isBlacklisted := false
- if _, ok := d.blacklistedServers[serverName]; ok {
- isBlacklisted = true
- }
-
- return isBlacklisted, nil
-}
-
type stubFederationRoomServerAPI struct {
rsapi.FederationRoomserverAPI
}
@@ -290,8 +73,10 @@ func (r *stubFederationRoomServerAPI) QueryServerBannedFromRoom(ctx context.Cont
type stubFederationClient struct {
api.FederationClient
- shouldTxSucceed bool
- txCount atomic.Uint32
+ shouldTxSucceed bool
+ shouldTxRelaySucceed bool
+ txCount atomic.Uint32
+ txRelayCount atomic.Uint32
}
func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixserverlib.Transaction) (res gomatrixserverlib.RespSend, err error) {
@@ -304,6 +89,16 @@ func (f *stubFederationClient) SendTransaction(ctx context.Context, t gomatrixse
return gomatrixserverlib.RespSend{}, result
}
+func (f *stubFederationClient) P2PSendTransactionToRelay(ctx context.Context, u gomatrixserverlib.UserID, t gomatrixserverlib.Transaction, forwardingServer gomatrixserverlib.ServerName) (res gomatrixserverlib.EmptyResp, err error) {
+ var result error
+ if !f.shouldTxRelaySucceed {
+ result = fmt.Errorf("relay transaction failed")
+ }
+
+ f.txRelayCount.Add(1)
+ return gomatrixserverlib.EmptyResp{}, result
+}
+
func mustCreatePDU(t *testing.T) *gomatrixserverlib.HeaderedEvent {
t.Helper()
content := `{"type":"m.room.message"}`
@@ -319,15 +114,18 @@ func mustCreateEDU(t *testing.T) *gomatrixserverlib.EDU {
return &gomatrixserverlib.EDU{Type: gomatrixserverlib.MTyping}
}
-func testSetup(failuresUntilBlacklist uint32, shouldTxSucceed bool, t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *stubFederationClient, *OutgoingQueues, *process.ProcessContext, func()) {
+func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32, shouldTxSucceed bool, shouldTxRelaySucceed bool, t *testing.T, dbType test.DBType, realDatabase bool) (storage.Database, *stubFederationClient, *OutgoingQueues, *process.ProcessContext, func()) {
db, processContext, close := mustCreateFederationDatabase(t, dbType, realDatabase)
fc := &stubFederationClient{
- shouldTxSucceed: shouldTxSucceed,
- txCount: *atomic.NewUint32(0),
+ shouldTxSucceed: shouldTxSucceed,
+ shouldTxRelaySucceed: shouldTxRelaySucceed,
+ txCount: *atomic.NewUint32(0),
+ txRelayCount: *atomic.NewUint32(0),
}
rs := &stubFederationRoomServerAPI{}
- stats := statistics.NewStatistics(db, failuresUntilBlacklist)
+
+ stats := statistics.NewStatistics(db, failuresUntilBlacklist, failuresUntilAssumedOffline)
signingInfo := []*gomatrixserverlib.SigningIdentity{
{
KeyID: "ed21019:auto",
@@ -344,7 +142,7 @@ func TestSendPDUOnSuccessRemovedFromDB(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -373,7 +171,7 @@ func TestSendEDUOnSuccessRemovedFromDB(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -402,7 +200,7 @@ func TestSendPDUOnFailStoredInDB(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -432,7 +230,7 @@ func TestSendEDUOnFailStoredInDB(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -462,7 +260,7 @@ func TestSendPDUAgainDoesntInterruptBackoff(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -513,7 +311,7 @@ func TestSendEDUAgainDoesntInterruptBackoff(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -564,7 +362,7 @@ func TestSendPDUMultipleFailuresBlacklisted(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(2)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -596,7 +394,7 @@ func TestSendEDUMultipleFailuresBlacklisted(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(2)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -628,7 +426,7 @@ func TestSendPDUBlacklistedWithPriorExternalFailure(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(2)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -662,7 +460,7 @@ func TestSendEDUBlacklistedWithPriorExternalFailure(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(2)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -696,7 +494,7 @@ func TestRetryServerSendsPDUSuccessfully(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(1)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -730,8 +528,8 @@ func TestRetryServerSendsPDUSuccessfully(t *testing.T) {
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
fc.shouldTxSucceed = true
- db.RemoveServerFromBlacklist(destination)
- queues.RetryServer(destination)
+ wasBlacklisted := dest.statistics.MarkServerAlive()
+ queues.RetryServer(destination, wasBlacklisted)
checkRetry := func(log poll.LogT) poll.Result {
data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
@@ -747,7 +545,7 @@ func TestRetryServerSendsEDUSuccessfully(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(1)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -781,8 +579,8 @@ func TestRetryServerSendsEDUSuccessfully(t *testing.T) {
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
fc.shouldTxSucceed = true
- db.RemoveServerFromBlacklist(destination)
- queues.RetryServer(destination)
+ wasBlacklisted := dest.statistics.MarkServerAlive()
+ queues.RetryServer(destination, wasBlacklisted)
checkRetry := func(log poll.LogT) poll.Result {
data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
assert.NoError(t, dbErr)
@@ -801,7 +599,7 @@ func TestSendPDUBatches(t *testing.T) {
// test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
// db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -845,7 +643,7 @@ func TestSendEDUBatches(t *testing.T) {
// test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
// db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -889,7 +687,7 @@ func TestSendPDUAndEDUBatches(t *testing.T) {
// test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
// db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, dbType, true)
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -940,7 +738,7 @@ func TestExternalFailureBackoffDoesntStartQueue(t *testing.T) {
t.Parallel()
failuresUntilBlacklist := uint32(16)
destination := gomatrixserverlib.ServerName("remotehost")
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, true, t, test.DBTypeSQLite, false)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, true, false, t, test.DBTypeSQLite, false)
defer close()
defer func() {
pc.ShutdownDendrite()
@@ -978,7 +776,7 @@ func TestQueueInteractsWithRealDatabasePDUAndEDU(t *testing.T) {
destination := gomatrixserverlib.ServerName("remotehost")
destinations := map[gomatrixserverlib.ServerName]struct{}{destination: {}}
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
- db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, false, t, dbType, true)
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilBlacklist+1, false, false, t, dbType, true)
// NOTE : These defers aren't called if go test is killed so the dbs may not get cleaned up.
defer close()
defer func() {
@@ -1023,8 +821,8 @@ func TestQueueInteractsWithRealDatabasePDUAndEDU(t *testing.T) {
poll.WaitOn(t, checkBlacklisted, poll.WithTimeout(10*time.Second), poll.WithDelay(100*time.Millisecond))
fc.shouldTxSucceed = true
- db.RemoveServerFromBlacklist(destination)
- queues.RetryServer(destination)
+ wasBlacklisted := dest.statistics.MarkServerAlive()
+ queues.RetryServer(destination, wasBlacklisted)
checkRetry := func(log poll.LogT) poll.Result {
pduData, dbErrPDU := db.GetPendingPDUs(pc.Context(), destination, 200)
assert.NoError(t, dbErrPDU)
@@ -1038,3 +836,147 @@ func TestQueueInteractsWithRealDatabasePDUAndEDU(t *testing.T) {
poll.WaitOn(t, checkRetry, poll.WithTimeout(10*time.Second), poll.WithDelay(100*time.Millisecond))
})
}
+
+func TestSendPDUMultipleFailuresAssumedOffline(t *testing.T) {
+ t.Parallel()
+ failuresUntilBlacklist := uint32(7)
+ failuresUntilAssumedOffline := uint32(2)
+ destination := gomatrixserverlib.ServerName("remotehost")
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, false, t, test.DBTypeSQLite, false)
+ defer close()
+ defer func() {
+ pc.ShutdownDendrite()
+ <-pc.WaitForShutdown()
+ }()
+
+ ev := mustCreatePDU(t)
+ err := queues.SendEvent(ev, "localhost", []gomatrixserverlib.ServerName{destination})
+ assert.NoError(t, err)
+
+ check := func(log poll.LogT) poll.Result {
+ if fc.txCount.Load() == failuresUntilAssumedOffline {
+ data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
+ assert.NoError(t, dbErr)
+ if len(data) == 1 {
+ if val, _ := db.IsServerAssumedOffline(context.Background(), destination); val {
+ return poll.Success()
+ }
+ return poll.Continue("waiting for server to be assumed offline")
+ }
+ return poll.Continue("waiting for event to be added to database. Currently present PDU: %d", len(data))
+ }
+ return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
+ }
+ poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
+}
+
+func TestSendEDUMultipleFailuresAssumedOffline(t *testing.T) {
+ t.Parallel()
+ failuresUntilBlacklist := uint32(7)
+ failuresUntilAssumedOffline := uint32(2)
+ destination := gomatrixserverlib.ServerName("remotehost")
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, false, t, test.DBTypeSQLite, false)
+ defer close()
+ defer func() {
+ pc.ShutdownDendrite()
+ <-pc.WaitForShutdown()
+ }()
+
+ ev := mustCreateEDU(t)
+ err := queues.SendEDU(ev, "localhost", []gomatrixserverlib.ServerName{destination})
+ assert.NoError(t, err)
+
+ check := func(log poll.LogT) poll.Result {
+ if fc.txCount.Load() == failuresUntilAssumedOffline {
+ data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
+ assert.NoError(t, dbErr)
+ if len(data) == 1 {
+ if val, _ := db.IsServerAssumedOffline(context.Background(), destination); val {
+ return poll.Success()
+ }
+ return poll.Continue("waiting for server to be assumed offline")
+ }
+ return poll.Continue("waiting for event to be added to database. Currently present EDU: %d", len(data))
+ }
+ return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
+ }
+ poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
+}
+
+func TestSendPDUOnRelaySuccessRemovedFromDB(t *testing.T) {
+ t.Parallel()
+ failuresUntilBlacklist := uint32(16)
+ failuresUntilAssumedOffline := uint32(1)
+ destination := gomatrixserverlib.ServerName("remotehost")
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, true, t, test.DBTypeSQLite, false)
+ defer close()
+ defer func() {
+ pc.ShutdownDendrite()
+ <-pc.WaitForShutdown()
+ }()
+
+ relayServers := []gomatrixserverlib.ServerName{"relayserver"}
+ queues.statistics.ForServer(destination).AddRelayServers(relayServers)
+
+ ev := mustCreatePDU(t)
+ err := queues.SendEvent(ev, "localhost", []gomatrixserverlib.ServerName{destination})
+ assert.NoError(t, err)
+
+ check := func(log poll.LogT) poll.Result {
+ if fc.txCount.Load() == 1 {
+ if fc.txRelayCount.Load() == 1 {
+ data, dbErr := db.GetPendingPDUs(pc.Context(), destination, 100)
+ assert.NoError(t, dbErr)
+ if len(data) == 0 {
+ return poll.Success()
+ }
+ return poll.Continue("waiting for event to be removed from database. Currently present PDU: %d", len(data))
+ }
+ return poll.Continue("waiting for more relay send attempts before checking database. Currently %d", fc.txRelayCount.Load())
+ }
+ return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
+ }
+ poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
+
+ assumedOffline, _ := db.IsServerAssumedOffline(context.Background(), destination)
+ assert.Equal(t, true, assumedOffline)
+}
+
+func TestSendEDUOnRelaySuccessRemovedFromDB(t *testing.T) {
+ t.Parallel()
+ failuresUntilBlacklist := uint32(16)
+ failuresUntilAssumedOffline := uint32(1)
+ destination := gomatrixserverlib.ServerName("remotehost")
+ db, fc, queues, pc, close := testSetup(failuresUntilBlacklist, failuresUntilAssumedOffline, false, true, t, test.DBTypeSQLite, false)
+ defer close()
+ defer func() {
+ pc.ShutdownDendrite()
+ <-pc.WaitForShutdown()
+ }()
+
+ relayServers := []gomatrixserverlib.ServerName{"relayserver"}
+ queues.statistics.ForServer(destination).AddRelayServers(relayServers)
+
+ ev := mustCreateEDU(t)
+ err := queues.SendEDU(ev, "localhost", []gomatrixserverlib.ServerName{destination})
+ assert.NoError(t, err)
+
+ check := func(log poll.LogT) poll.Result {
+ if fc.txCount.Load() == 1 {
+ if fc.txRelayCount.Load() == 1 {
+ data, dbErr := db.GetPendingEDUs(pc.Context(), destination, 100)
+ assert.NoError(t, dbErr)
+ if len(data) == 0 {
+ return poll.Success()
+ }
+ return poll.Continue("waiting for event to be removed from database. Currently present EDU: %d", len(data))
+ }
+ return poll.Continue("waiting for more relay send attempts before checking database. Currently %d", fc.txRelayCount.Load())
+ }
+ return poll.Continue("waiting for more send attempts before checking database. Currently %d", fc.txCount.Load())
+ }
+ poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
+
+ assumedOffline, _ := db.IsServerAssumedOffline(context.Background(), destination)
+ assert.Equal(t, true, assumedOffline)
+}