aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDevon Hudson <devonhudson@librem.one>2023-01-29 12:26:16 -0700
committerDevon Hudson <devonhudson@librem.one>2023-01-29 12:26:16 -0700
commit0f998e3af3be06a1f0626de8cb74413c5da310f4 (patch)
tree34b439398155566574445a23ec8974a2a543556c
parent63df85db6d5bc528a784dc52e550fc64385c5f67 (diff)
Add pinecone demo toggle for dis/enabling relaying for other nodes
-rw-r--r--build/gobind-pinecone/monolith.go13
-rw-r--r--cmd/dendrite-demo-pinecone/main.go2
-rw-r--r--relayapi/api/api.go6
-rw-r--r--relayapi/internal/api.go6
-rw-r--r--relayapi/internal/perform.go14
-rw-r--r--relayapi/internal/perform_test.go6
-rw-r--r--relayapi/relayapi.go2
-rw-r--r--relayapi/relayapi_test.go52
-rw-r--r--relayapi/routing/relaytxn.go4
-rw-r--r--relayapi/routing/relaytxn_test.go8
-rw-r--r--relayapi/routing/routing.go17
-rw-r--r--relayapi/routing/sendrelay.go4
-rw-r--r--relayapi/routing/sendrelay_test.go10
13 files changed, 119 insertions, 25 deletions
diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go
index 5e8e5875..a219621b 100644
--- a/build/gobind-pinecone/monolith.go
+++ b/build/gobind-pinecone/monolith.go
@@ -90,6 +90,7 @@ type DendriteMonolith struct {
httpServer *http.Server
userAPI userapiAPI.UserInternalAPI
federationAPI api.FederationInternalAPI
+ relayAPI relayServerAPI.RelayInternalAPI
relayRetriever RelayServerRetriever
}
@@ -313,6 +314,14 @@ func (m *DendriteMonolith) GetRelayServers(nodeID string) string {
return relaysString
}
+func (m *DendriteMonolith) RelayingEnabled() bool {
+ return m.relayAPI.RelayingEnabled()
+}
+
+func (m *DendriteMonolith) SetRelayingEnabled(enabled bool) {
+ m.relayAPI.SetRelayingEnabled(enabled)
+}
+
func (m *DendriteMonolith) DisconnectType(peertype int) {
for _, p := range m.PineconeRouter.Peers() {
if int(peertype) == p.PeerType {
@@ -528,7 +537,7 @@ func (m *DendriteMonolith) Start() {
Config: &base.Cfg.FederationAPI,
UserAPI: m.userAPI,
}
- relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer)
+ m.relayAPI = relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer, false)
monolith := setup.Monolith{
Config: base.Cfg,
@@ -541,7 +550,7 @@ func (m *DendriteMonolith) Start() {
RoomserverAPI: rsAPI,
UserAPI: m.userAPI,
KeyAPI: keyAPI,
- RelayAPI: relayAPI,
+ RelayAPI: m.relayAPI,
ExtPublicRoomsProvider: roomProvider,
ExtUserDirectoryProvider: userProvider,
}
diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go
index a813c37a..83655b69 100644
--- a/cmd/dendrite-demo-pinecone/main.go
+++ b/cmd/dendrite-demo-pinecone/main.go
@@ -244,7 +244,7 @@ func main() {
Config: &base.Cfg.FederationAPI,
UserAPI: userAPI,
}
- relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer)
+ relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer, true)
monolith := setup.Monolith{
Config: base.Cfg,
diff --git a/relayapi/api/api.go b/relayapi/api/api.go
index 9db39322..9b4b62e5 100644
--- a/relayapi/api/api.go
+++ b/relayapi/api/api.go
@@ -30,6 +30,12 @@ type RelayInternalAPI interface {
userID gomatrixserverlib.UserID,
relayServer gomatrixserverlib.ServerName,
) error
+
+ // Tells the relayapi whether or not it should act as a relay server for external servers.
+ SetRelayingEnabled(bool)
+
+ // Obtain whether the relayapi is currently configured to act as a relay server for external servers.
+ RelayingEnabled() bool
}
// RelayServerAPI exposes the store & query transaction functionality of a relay server.
diff --git a/relayapi/internal/api.go b/relayapi/internal/api.go
index 3ff8c2ad..3a5762fb 100644
--- a/relayapi/internal/api.go
+++ b/relayapi/internal/api.go
@@ -15,6 +15,8 @@
package internal
import (
+ "sync"
+
fedAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/relayapi/storage"
@@ -30,6 +32,8 @@ type RelayInternalAPI struct {
producer *producers.SyncAPIProducer
presenceEnabledInbound bool
serverName gomatrixserverlib.ServerName
+ relayingEnabledMutex sync.Mutex
+ relayingEnabled bool
}
func NewRelayInternalAPI(
@@ -40,6 +44,7 @@ func NewRelayInternalAPI(
producer *producers.SyncAPIProducer,
presenceEnabledInbound bool,
serverName gomatrixserverlib.ServerName,
+ relayingEnabled bool,
) *RelayInternalAPI {
return &RelayInternalAPI{
db: db,
@@ -49,5 +54,6 @@ func NewRelayInternalAPI(
producer: producer,
presenceEnabledInbound: presenceEnabledInbound,
serverName: serverName,
+ relayingEnabled: relayingEnabled,
}
}
diff --git a/relayapi/internal/perform.go b/relayapi/internal/perform.go
index 59429933..c00d0d0e 100644
--- a/relayapi/internal/perform.go
+++ b/relayapi/internal/perform.go
@@ -24,6 +24,20 @@ import (
"github.com/sirupsen/logrus"
)
+// SetRelayingEnabled implements api.RelayInternalAPI
+func (r *RelayInternalAPI) SetRelayingEnabled(enabled bool) {
+ r.relayingEnabledMutex.Lock()
+ defer r.relayingEnabledMutex.Unlock()
+ r.relayingEnabled = enabled
+}
+
+// RelayingEnabled implements api.RelayInternalAPI
+func (r *RelayInternalAPI) RelayingEnabled() bool {
+ r.relayingEnabledMutex.Lock()
+ defer r.relayingEnabledMutex.Unlock()
+ return r.relayingEnabled
+}
+
// PerformRelayServerSync implements api.RelayInternalAPI
func (r *RelayInternalAPI) PerformRelayServerSync(
ctx context.Context,
diff --git a/relayapi/internal/perform_test.go b/relayapi/internal/perform_test.go
index fb71b7d0..5673b199 100644
--- a/relayapi/internal/perform_test.go
+++ b/relayapi/internal/perform_test.go
@@ -72,7 +72,7 @@ func TestPerformRelayServerSync(t *testing.T) {
fedClient := &testFedClient{}
relayAPI := NewRelayInternalAPI(
- &db, fedClient, nil, nil, nil, false, "",
+ &db, fedClient, nil, nil, nil, false, "", true,
)
err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay"))
@@ -92,7 +92,7 @@ func TestPerformRelayServerSyncFedError(t *testing.T) {
fedClient := &testFedClient{shouldFail: true}
relayAPI := NewRelayInternalAPI(
- &db, fedClient, nil, nil, nil, false, "",
+ &db, fedClient, nil, nil, nil, false, "", true,
)
err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay"))
@@ -112,7 +112,7 @@ func TestPerformRelayServerSyncRunsUntilQueueEmpty(t *testing.T) {
fedClient := &testFedClient{queueDepth: 2}
relayAPI := NewRelayInternalAPI(
- &db, fedClient, nil, nil, nil, false, "",
+ &db, fedClient, nil, nil, nil, false, "", true,
)
err = relayAPI.PerformRelayServerSync(context.Background(), *userID, gomatrixserverlib.ServerName("relay"))
diff --git a/relayapi/relayapi.go b/relayapi/relayapi.go
index f9f9d4ff..200a1814 100644
--- a/relayapi/relayapi.go
+++ b/relayapi/relayapi.go
@@ -54,6 +54,7 @@ func NewRelayInternalAPI(
rsAPI rsAPI.RoomserverInternalAPI,
keyRing *gomatrixserverlib.KeyRing,
producer *producers.SyncAPIProducer,
+ relayingEnabled bool,
) api.RelayInternalAPI {
cfg := &base.Cfg.RelayAPI
@@ -70,5 +71,6 @@ func NewRelayInternalAPI(
producer,
base.Cfg.Global.Presence.EnableInbound,
base.Cfg.Global.ServerName,
+ relayingEnabled,
)
}
diff --git a/relayapi/relayapi_test.go b/relayapi/relayapi_test.go
index dfa06811..f1b3262a 100644
--- a/relayapi/relayapi_test.go
+++ b/relayapi/relayapi_test.go
@@ -36,7 +36,7 @@ func TestCreateNewRelayInternalAPI(t *testing.T) {
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
- relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil)
+ relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true)
assert.NotNil(t, relayAPI)
})
}
@@ -52,7 +52,7 @@ func TestCreateRelayInternalInvalidDatabasePanics(t *testing.T) {
defer close()
assert.Panics(t, func() {
- relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil)
+ relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true)
})
})
}
@@ -108,7 +108,7 @@ func TestCreateRelayPublicRoutes(t *testing.T) {
base, close := testrig.CreateBaseDendrite(t, dbType)
defer close()
- relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil)
+ relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, true)
assert.NotNil(t, relayAPI)
serverKeyAPI := &signing.YggdrasilKeys{}
@@ -116,10 +116,9 @@ func TestCreateRelayPublicRoutes(t *testing.T) {
relayapi.AddPublicRoutes(base, keyRing, relayAPI)
testCases := []struct {
- name string
- req *http.Request
- wantCode int
- wantJoinedRooms []string
+ name string
+ req *http.Request
+ wantCode int
}{
{
name: "relay_txn invalid user id",
@@ -152,3 +151,42 @@ func TestCreateRelayPublicRoutes(t *testing.T) {
}
})
}
+
+func TestDisableRelayPublicRoutes(t *testing.T) {
+ test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
+ base, close := testrig.CreateBaseDendrite(t, dbType)
+ defer close()
+
+ relayAPI := relayapi.NewRelayInternalAPI(base, nil, nil, nil, nil, false)
+ assert.NotNil(t, relayAPI)
+
+ serverKeyAPI := &signing.YggdrasilKeys{}
+ keyRing := serverKeyAPI.KeyRing()
+ relayapi.AddPublicRoutes(base, keyRing, relayAPI)
+
+ testCases := []struct {
+ name string
+ req *http.Request
+ wantCode int
+ }{
+ {
+ name: "relay_txn valid user id",
+ req: createGetRelayTxnHTTPRequest(base.Cfg.Global.ServerName, "@user:local"),
+ wantCode: 404,
+ },
+ {
+ name: "send_relay valid user id",
+ req: createSendRelayTxnHTTPRequest(base.Cfg.Global.ServerName, "123", "@user:local"),
+ wantCode: 404,
+ },
+ }
+
+ for _, tc := range testCases {
+ w := httptest.NewRecorder()
+ base.PublicFederationAPIMux.ServeHTTP(w, tc.req)
+ if w.Code != tc.wantCode {
+ t.Fatalf("%s: got HTTP %d want %d", tc.name, w.Code, tc.wantCode)
+ }
+ }
+ })
+}
diff --git a/relayapi/routing/relaytxn.go b/relayapi/routing/relaytxn.go
index 1b11b0ec..76241a33 100644
--- a/relayapi/routing/relaytxn.go
+++ b/relayapi/routing/relaytxn.go
@@ -31,7 +31,7 @@ type RelayTransactionResponse struct {
EntriesQueued bool `json:"entries_queued"`
}
-// GetTransactionFromRelay implements /_matrix/federation/v1/relay_txn/{userID}
+// GetTransactionFromRelay implements GET /_matrix/federation/v1/relay_txn/{userID}
// This endpoint can be extracted into a separate relay server service.
func GetTransactionFromRelay(
httpReq *http.Request,
@@ -39,7 +39,7 @@ func GetTransactionFromRelay(
relayAPI api.RelayInternalAPI,
userID gomatrixserverlib.UserID,
) util.JSONResponse {
- logrus.Infof("Handling relay_txn for %s", userID.Raw())
+ logrus.Infof("Processing relay_txn for %s", userID.Raw())
previousEntry := gomatrixserverlib.RelayEntry{}
if err := json.Unmarshal(fedReq.Content(), &previousEntry); err != nil {
diff --git a/relayapi/routing/relaytxn_test.go b/relayapi/routing/relaytxn_test.go
index a47fdb19..3cb4c8ad 100644
--- a/relayapi/routing/relaytxn_test.go
+++ b/relayapi/routing/relaytxn_test.go
@@ -57,7 +57,7 @@ func TestGetEmptyDatabaseReturnsNothing(t *testing.T) {
assert.NoError(t, err, "Failed to store transaction")
relayAPI := internal.NewRelayInternalAPI(
- &db, nil, nil, nil, nil, false, "",
+ &db, nil, nil, nil, nil, false, "", true,
)
request := createQuery(*userID, gomatrixserverlib.RelayEntry{})
@@ -90,7 +90,7 @@ func TestGetInvalidPrevEntryFails(t *testing.T) {
assert.NoError(t, err, "Failed to store transaction")
relayAPI := internal.NewRelayInternalAPI(
- &db, nil, nil, nil, nil, false, "",
+ &db, nil, nil, nil, nil, false, "", true,
)
request := createQuery(*userID, gomatrixserverlib.RelayEntry{EntryID: -1})
@@ -123,7 +123,7 @@ func TestGetReturnsSavedTransaction(t *testing.T) {
assert.NoError(t, err, "Failed to associate transaction with user")
relayAPI := internal.NewRelayInternalAPI(
- &db, nil, nil, nil, nil, false, "",
+ &db, nil, nil, nil, nil, false, "", true,
)
request := createQuery(*userID, gomatrixserverlib.RelayEntry{})
@@ -186,7 +186,7 @@ func TestGetReturnsMultipleSavedTransactions(t *testing.T) {
assert.NoError(t, err, "Failed to associate transaction with user")
relayAPI := internal.NewRelayInternalAPI(
- &db, nil, nil, nil, nil, false, "",
+ &db, nil, nil, nil, nil, false, "", true,
)
request := createQuery(*userID, gomatrixserverlib.RelayEntry{})
diff --git a/relayapi/routing/routing.go b/relayapi/routing/routing.go
index 6df0cdc5..c908e950 100644
--- a/relayapi/routing/routing.go
+++ b/relayapi/routing/routing.go
@@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
+ "github.com/sirupsen/logrus"
)
// Setup registers HTTP handlers with the given ServeMux.
@@ -48,6 +49,14 @@ func Setup(
v1fedmux.Handle("/send_relay/{txnID}/{userID}", MakeRelayAPI(
"send_relay_transaction", "", cfg.Matrix.IsLocalServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
+ logrus.Infof("Handling send_relay from: %s", request.Origin())
+ if !relayAPI.RelayingEnabled() {
+ logrus.Warnf("Dropping send_relay from: %s", request.Origin())
+ return util.JSONResponse{
+ Code: http.StatusNotFound,
+ }
+ }
+
userID, err := gomatrixserverlib.NewUserID(vars["userID"], false)
if err != nil {
return util.JSONResponse{
@@ -65,6 +74,14 @@ func Setup(
v1fedmux.Handle("/relay_txn/{userID}", MakeRelayAPI(
"get_relay_transaction", "", cfg.Matrix.IsLocalServerName, keys,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
+ logrus.Infof("Handling relay_txn from: %s", request.Origin())
+ if !relayAPI.RelayingEnabled() {
+ logrus.Warnf("Dropping relay_txn from: %s", request.Origin())
+ return util.JSONResponse{
+ Code: http.StatusNotFound,
+ }
+ }
+
userID, err := gomatrixserverlib.NewUserID(vars["userID"], false)
if err != nil {
return util.JSONResponse{
diff --git a/relayapi/routing/sendrelay.go b/relayapi/routing/sendrelay.go
index a7027f29..85a722c8 100644
--- a/relayapi/routing/sendrelay.go
+++ b/relayapi/routing/sendrelay.go
@@ -25,7 +25,7 @@ import (
"github.com/sirupsen/logrus"
)
-// SendTransactionToRelay implements PUT /_matrix/federation/v1/relay_txn/{txnID}/{userID}
+// SendTransactionToRelay implements PUT /_matrix/federation/v1/send_relay/{txnID}/{userID}
// This endpoint can be extracted into a separate relay server service.
func SendTransactionToRelay(
httpReq *http.Request,
@@ -34,6 +34,8 @@ func SendTransactionToRelay(
txnID gomatrixserverlib.TransactionID,
userID gomatrixserverlib.UserID,
) util.JSONResponse {
+ logrus.Infof("Processing send_relay for %s", userID.Raw())
+
var txnEvents struct {
PDUs []json.RawMessage `json:"pdus"`
EDUs []gomatrixserverlib.EDU `json:"edus"`
diff --git a/relayapi/routing/sendrelay_test.go b/relayapi/routing/sendrelay_test.go
index d9ed7500..66594c47 100644
--- a/relayapi/routing/sendrelay_test.go
+++ b/relayapi/routing/sendrelay_test.go
@@ -72,7 +72,7 @@ func TestForwardEmptyReturnsOk(t *testing.T) {
request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, txn)
relayAPI := internal.NewRelayInternalAPI(
- &db, nil, nil, nil, nil, false, "",
+ &db, nil, nil, nil, nil, false, "", true,
)
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
@@ -101,7 +101,7 @@ func TestForwardBadJSONReturnsError(t *testing.T) {
request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content)
relayAPI := internal.NewRelayInternalAPI(
- &db, nil, nil, nil, nil, false, "",
+ &db, nil, nil, nil, nil, false, "", true,
)
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
@@ -135,7 +135,7 @@ func TestForwardTooManyPDUsReturnsError(t *testing.T) {
request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content)
relayAPI := internal.NewRelayInternalAPI(
- &db, nil, nil, nil, nil, false, "",
+ &db, nil, nil, nil, nil, false, "", true,
)
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
@@ -169,7 +169,7 @@ func TestForwardTooManyEDUsReturnsError(t *testing.T) {
request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, content)
relayAPI := internal.NewRelayInternalAPI(
- &db, nil, nil, nil, nil, false, "",
+ &db, nil, nil, nil, nil, false, "", true,
)
response := routing.SendTransactionToRelay(httpReq, &request, relayAPI, "1", *userID)
@@ -192,7 +192,7 @@ func TestUniqueTransactionStoredInDatabase(t *testing.T) {
request := createFederationRequest(*userID, txn.TransactionID, txn.Origin, txn.Destination, txn)
relayAPI := internal.NewRelayInternalAPI(
- &db, nil, nil, nil, nil, false, "",
+ &db, nil, nil, nil, nil, false, "", true,
)
response := routing.SendTransactionToRelay(