aboutsummaryrefslogtreecommitdiff
path: root/build
diff options
context:
space:
mode:
authordevonh <devon.dmytro@gmail.com>2023-01-23 17:55:12 +0000
committerGitHub <noreply@github.com>2023-01-23 17:55:12 +0000
commit5b73592f5a4dddf64184fcbe33f4c1835c656480 (patch)
treeb6dac51b6be7a1e591f24881ee1bfae1b92088e9 /build
parent48fa869fa3578741d1d5775d30f24f6b097ab995 (diff)
Initial Store & Forward Implementation (#2917)
This adds store & forward relays into Dendrite for P2P. A few things have changed:
- The new relay API serves new HTTP endpoints for store & forward federation.
- Outbound federation queueing has been updated to attempt forwarding via store & forward when appropriate.
- Database entries now track store & forward relays for other nodes.
Diffstat (limited to 'build')
-rw-r--r--build/gobind-pinecone/monolith.go201
-rw-r--r--build/gobind-pinecone/monolith_test.go198
2 files changed, 361 insertions, 38 deletions
diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go
index b8f8111d..ff61ea6c 100644
--- a/build/gobind-pinecone/monolith.go
+++ b/build/gobind-pinecone/monolith.go
@@ -41,13 +41,16 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api"
+ "github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver"
+ "github.com/matrix-org/dendrite/relayapi"
+ relayServerAPI "github.com/matrix-org/dendrite/relayapi/api"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/setup"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/test"
"github.com/matrix-org/dendrite/userapi"
userapiAPI "github.com/matrix-org/dendrite/userapi/api"
@@ -67,24 +70,27 @@ import (
)
const (
- PeerTypeRemote = pineconeRouter.PeerTypeRemote
- PeerTypeMulticast = pineconeRouter.PeerTypeMulticast
- PeerTypeBluetooth = pineconeRouter.PeerTypeBluetooth
- PeerTypeBonjour = pineconeRouter.PeerTypeBonjour
+ PeerTypeRemote = pineconeRouter.PeerTypeRemote
+ PeerTypeMulticast = pineconeRouter.PeerTypeMulticast
+ PeerTypeBluetooth = pineconeRouter.PeerTypeBluetooth
+ PeerTypeBonjour = pineconeRouter.PeerTypeBonjour
+ relayServerRetryInterval = time.Second * 30
)
type DendriteMonolith struct {
- logger logrus.Logger
- PineconeRouter *pineconeRouter.Router
- PineconeMulticast *pineconeMulticast.Multicast
- PineconeQUIC *pineconeSessions.Sessions
- PineconeManager *pineconeConnections.ConnectionManager
- StorageDirectory string
- CacheDirectory string
- listener net.Listener
- httpServer *http.Server
- processContext *process.ProcessContext
- userAPI userapiAPI.UserInternalAPI
+ logger logrus.Logger
+ baseDendrite *base.BaseDendrite
+ PineconeRouter *pineconeRouter.Router
+ PineconeMulticast *pineconeMulticast.Multicast
+ PineconeQUIC *pineconeSessions.Sessions
+ PineconeManager *pineconeConnections.ConnectionManager
+ StorageDirectory string
+ CacheDirectory string
+ listener net.Listener
+ httpServer *http.Server
+ userAPI userapiAPI.UserInternalAPI
+ federationAPI api.FederationInternalAPI
+ relayServersQueried map[gomatrixserverlib.ServerName]bool
}
func (m *DendriteMonolith) PublicKey() string {
@@ -326,6 +332,7 @@ func (m *DendriteMonolith) Start() {
cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", filepath.Join(m.StorageDirectory, prefix)))
cfg.MediaAPI.BasePath = config.Path(filepath.Join(m.CacheDirectory, "media"))
cfg.MediaAPI.AbsBasePath = config.Path(filepath.Join(m.CacheDirectory, "media"))
+ cfg.RelayAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-relayapi.db", filepath.Join(m.StorageDirectory, prefix)))
cfg.MSCs.MSCs = []string{"msc2836", "msc2946"}
cfg.ClientAPI.RegistrationDisabled = false
cfg.ClientAPI.OpenRegistrationWithoutVerificationEnabled = true
@@ -335,9 +342,9 @@ func (m *DendriteMonolith) Start() {
panic(err)
}
- base := base.NewBaseDendrite(cfg, "Monolith")
+ base := base.NewBaseDendrite(cfg, "Monolith", base.DisableMetrics)
+ m.baseDendrite = base
base.ConfigureAdminEndpoints()
- defer base.Close() // nolint: errcheck
federation := conn.CreateFederationClient(base, m.PineconeQUIC)
@@ -346,11 +353,11 @@ func (m *DendriteMonolith) Start() {
rsAPI := roomserver.NewInternalAPI(base)
- fsAPI := federationapi.NewInternalAPI(
+ m.federationAPI = federationapi.NewInternalAPI(
base, federation, rsAPI, base.Caches, keyRing, true,
)
- keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI, rsAPI)
+ keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, m.federationAPI, rsAPI)
m.userAPI = userapi.NewInternalAPI(base, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient())
keyAPI.SetUserAPI(m.userAPI)
@@ -358,10 +365,24 @@ func (m *DendriteMonolith) Start() {
// The underlying roomserver implementation needs to be able to call the fedsender.
// This is different to rsAPI which can be the http client which doesn't need this dependency
- rsAPI.SetFederationAPI(fsAPI, keyRing)
+ rsAPI.SetFederationAPI(m.federationAPI, keyRing)
userProvider := users.NewPineconeUserProvider(m.PineconeRouter, m.PineconeQUIC, m.userAPI, federation)
- roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, fsAPI, federation)
+ roomProvider := rooms.NewPineconeRoomProvider(m.PineconeRouter, m.PineconeQUIC, m.federationAPI, federation)
+
+ js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
+ producer := &producers.SyncAPIProducer{
+ JetStream: js,
+ TopicReceiptEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+ TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ TopicTypingEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ TopicPresenceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent),
+ TopicDeviceListUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
+ TopicSigningKeyUpdate: base.Cfg.Global.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
+ Config: &base.Cfg.FederationAPI,
+ UserAPI: m.userAPI,
+ }
+ relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer)
monolith := setup.Monolith{
Config: base.Cfg,
@@ -370,10 +391,11 @@ func (m *DendriteMonolith) Start() {
KeyRing: keyRing,
AppserviceAPI: asAPI,
- FederationAPI: fsAPI,
+ FederationAPI: m.federationAPI,
RoomserverAPI: rsAPI,
UserAPI: m.userAPI,
KeyAPI: keyAPI,
+ RelayAPI: relayAPI,
ExtPublicRoomsProvider: roomProvider,
ExtUserDirectoryProvider: userProvider,
}
@@ -411,8 +433,6 @@ func (m *DendriteMonolith) Start() {
Handler: h2c.NewHandler(pMux, h2s),
}
- m.processContext = base.ProcessContext
-
go func() {
m.logger.Info("Listening on ", cfg.Global.ServerName)
@@ -420,7 +440,7 @@ func (m *DendriteMonolith) Start() {
case net.ErrClosed, http.ErrServerClosed:
m.logger.Info("Stopped listening on ", cfg.Global.ServerName)
default:
- m.logger.Fatal(err)
+ m.logger.Error("Stopped listening on ", cfg.Global.ServerName)
}
}()
go func() {
@@ -430,33 +450,44 @@ func (m *DendriteMonolith) Start() {
case net.ErrClosed, http.ErrServerClosed:
m.logger.Info("Stopped listening on ", cfg.Global.ServerName)
default:
- m.logger.Fatal(err)
+ m.logger.Error("Stopped listening on ", cfg.Global.ServerName)
}
}()
go func(ch <-chan pineconeEvents.Event) {
eLog := logrus.WithField("pinecone", "events")
+ stopRelayServerSync := make(chan bool)
+
+ relayRetriever := RelayServerRetriever{
+ Context: context.Background(),
+ ServerName: gomatrixserverlib.ServerName(m.PineconeRouter.PublicKey().String()),
+ FederationAPI: m.federationAPI,
+ relayServersQueried: make(map[gomatrixserverlib.ServerName]bool),
+ RelayAPI: monolith.RelayAPI,
+ running: *atomic.NewBool(false),
+ }
+ relayRetriever.InitializeRelayServers(eLog)
for event := range ch {
switch e := event.(type) {
case pineconeEvents.PeerAdded:
+ if !relayRetriever.running.Load() {
+ go relayRetriever.SyncRelayServers(stopRelayServerSync)
+ }
case pineconeEvents.PeerRemoved:
- case pineconeEvents.TreeParentUpdate:
- case pineconeEvents.SnakeDescUpdate:
- case pineconeEvents.TreeRootAnnUpdate:
- case pineconeEvents.SnakeEntryAdded:
- case pineconeEvents.SnakeEntryRemoved:
+ if relayRetriever.running.Load() && m.PineconeRouter.TotalPeerCount() == 0 {
+ stopRelayServerSync <- true
+ }
case pineconeEvents.BroadcastReceived:
- eLog.Info("Broadcast received from: ", e.PeerID)
+ // eLog.Info("Broadcast received from: ", e.PeerID)
req := &api.PerformWakeupServersRequest{
ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
}
res := &api.PerformWakeupServersResponse{}
- if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil {
- logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID)
+ if err := m.federationAPI.PerformWakeupServers(base.Context(), req, res); err != nil {
+ eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
}
- case pineconeEvents.BandwidthReport:
default:
}
}
@@ -464,12 +495,106 @@ func (m *DendriteMonolith) Start() {
}
func (m *DendriteMonolith) Stop() {
- m.processContext.ShutdownDendrite()
+ m.baseDendrite.Close()
+ m.baseDendrite.WaitForShutdown()
_ = m.listener.Close()
m.PineconeMulticast.Stop()
_ = m.PineconeQUIC.Close()
_ = m.PineconeRouter.Close()
- m.processContext.WaitForComponentsToFinish()
+}
+
+type RelayServerRetriever struct {
+ Context context.Context
+ ServerName gomatrixserverlib.ServerName
+ FederationAPI api.FederationInternalAPI
+ RelayAPI relayServerAPI.RelayInternalAPI
+ relayServersQueried map[gomatrixserverlib.ServerName]bool
+ queriedServersMutex sync.Mutex
+ running atomic.Bool
+}
+
+func (m *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) {
+ request := api.P2PQueryRelayServersRequest{Server: gomatrixserverlib.ServerName(m.ServerName)}
+ response := api.P2PQueryRelayServersResponse{}
+ err := m.FederationAPI.P2PQueryRelayServers(m.Context, &request, &response)
+ if err != nil {
+ eLog.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error())
+ }
+ for _, server := range response.RelayServers {
+ m.relayServersQueried[server] = false
+ }
+
+ eLog.Infof("Registered relay servers: %v", response.RelayServers)
+}
+
+func (m *RelayServerRetriever) SyncRelayServers(stop <-chan bool) {
+ defer m.running.Store(false)
+
+ t := time.NewTimer(relayServerRetryInterval)
+ for {
+ relayServersToQuery := []gomatrixserverlib.ServerName{}
+ func() {
+ m.queriedServersMutex.Lock()
+ defer m.queriedServersMutex.Unlock()
+ for server, complete := range m.relayServersQueried {
+ if !complete {
+ relayServersToQuery = append(relayServersToQuery, server)
+ }
+ }
+ }()
+ if len(relayServersToQuery) == 0 {
+ // All relay servers have been synced.
+ return
+ }
+ m.queryRelayServers(relayServersToQuery)
+ t.Reset(relayServerRetryInterval)
+
+ select {
+ case <-stop:
+ if !t.Stop() {
+ <-t.C
+ }
+ return
+ case <-t.C:
+ }
+ }
+}
+
+func (m *RelayServerRetriever) GetQueriedServerStatus() map[gomatrixserverlib.ServerName]bool {
+ m.queriedServersMutex.Lock()
+ defer m.queriedServersMutex.Unlock()
+
+ result := map[gomatrixserverlib.ServerName]bool{}
+ for server, queried := range m.relayServersQueried {
+ result[server] = queried
+ }
+ return result
+}
+
+func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) {
+ logrus.Info("querying relay servers for any available transactions")
+ for _, server := range relayServers {
+ userID, err := gomatrixserverlib.NewUserID("@user:"+string(m.ServerName), false)
+ if err != nil {
+ return
+ }
+ err = m.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server)
+ if err == nil {
+ func() {
+ m.queriedServersMutex.Lock()
+ defer m.queriedServersMutex.Unlock()
+ m.relayServersQueried[server] = true
+ }()
+ // TODO : What happens if your relay receives new messages after this point?
+ // Should you continue to check with them, or should they try and contact you?
+ // They could send a "new_async_events" message your way maybe?
+ // Then you could mark them as needing to be queried again.
+ // What if you miss this message?
+ // Maybe you should try querying them again after a certain period of time as a backup?
+ } else {
+ logrus.Errorf("Failed querying relay server: %s", err.Error())
+ }
+ }
}
const MaxFrameSize = types.MaxFrameSize
diff --git a/build/gobind-pinecone/monolith_test.go b/build/gobind-pinecone/monolith_test.go
new file mode 100644
index 00000000..edcf22bb
--- /dev/null
+++ b/build/gobind-pinecone/monolith_test.go
@@ -0,0 +1,198 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package gobind
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "testing"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationapi/api"
+ relayServerAPI "github.com/matrix-org/dendrite/relayapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/assert"
+ "gotest.tools/v3/poll"
+)
+
+var TestBuf = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+
+type TestNetConn struct {
+ net.Conn
+ shouldFail bool
+}
+
+func (t *TestNetConn) Read(b []byte) (int, error) {
+ if t.shouldFail {
+ return 0, fmt.Errorf("Failed")
+ } else {
+ n := copy(b, TestBuf)
+ return n, nil
+ }
+}
+
+func (t *TestNetConn) Write(b []byte) (int, error) {
+ if t.shouldFail {
+ return 0, fmt.Errorf("Failed")
+ } else {
+ return len(b), nil
+ }
+}
+
+func (t *TestNetConn) Close() error {
+ if t.shouldFail {
+ return fmt.Errorf("Failed")
+ } else {
+ return nil
+ }
+}
+
+func TestConduitStoresPort(t *testing.T) {
+ conduit := Conduit{port: 7}
+ assert.Equal(t, 7, conduit.Port())
+}
+
+func TestConduitRead(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ b := make([]byte, len(TestBuf))
+ bytes, err := conduit.Read(b)
+ assert.NoError(t, err)
+ assert.Equal(t, len(TestBuf), bytes)
+ assert.Equal(t, TestBuf, b)
+}
+
+func TestConduitReadCopy(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ result, err := conduit.ReadCopy()
+ assert.NoError(t, err)
+ assert.Equal(t, TestBuf, result)
+}
+
+func TestConduitWrite(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ bytes, err := conduit.Write(TestBuf)
+ assert.NoError(t, err)
+ assert.Equal(t, len(TestBuf), bytes)
+}
+
+func TestConduitClose(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ err := conduit.Close()
+ assert.NoError(t, err)
+ assert.True(t, conduit.closed.Load())
+}
+
+func TestConduitReadClosed(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ err := conduit.Close()
+ assert.NoError(t, err)
+ b := make([]byte, len(TestBuf))
+ _, err = conduit.Read(b)
+ assert.Error(t, err)
+}
+
+func TestConduitReadCopyClosed(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ err := conduit.Close()
+ assert.NoError(t, err)
+ _, err = conduit.ReadCopy()
+ assert.Error(t, err)
+}
+
+func TestConduitWriteClosed(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{}}
+ err := conduit.Close()
+ assert.NoError(t, err)
+ _, err = conduit.Write(TestBuf)
+ assert.Error(t, err)
+}
+
+func TestConduitReadCopyFails(t *testing.T) {
+ conduit := Conduit{conn: &TestNetConn{shouldFail: true}}
+ _, err := conduit.ReadCopy()
+ assert.Error(t, err)
+}
+
+var testRelayServers = []gomatrixserverlib.ServerName{"relay1", "relay2"}
+
+type FakeFedAPI struct {
+ api.FederationInternalAPI
+}
+
+func (f *FakeFedAPI) P2PQueryRelayServers(ctx context.Context, req *api.P2PQueryRelayServersRequest, res *api.P2PQueryRelayServersResponse) error {
+ res.RelayServers = testRelayServers
+ return nil
+}
+
+type FakeRelayAPI struct {
+ relayServerAPI.RelayInternalAPI
+}
+
+func (r *FakeRelayAPI) PerformRelayServerSync(ctx context.Context, userID gomatrixserverlib.UserID, relayServer gomatrixserverlib.ServerName) error {
+ return nil
+}
+
+func TestRelayRetrieverInitialization(t *testing.T) {
+ retriever := RelayServerRetriever{
+ Context: context.Background(),
+ ServerName: "server",
+ relayServersQueried: make(map[gomatrixserverlib.ServerName]bool),
+ FederationAPI: &FakeFedAPI{},
+ RelayAPI: &FakeRelayAPI{},
+ }
+
+ retriever.InitializeRelayServers(logrus.WithField("test", "relay"))
+ relayServers := retriever.GetQueriedServerStatus()
+ assert.Equal(t, 2, len(relayServers))
+}
+
+func TestRelayRetrieverSync(t *testing.T) {
+ retriever := RelayServerRetriever{
+ Context: context.Background(),
+ ServerName: "server",
+ relayServersQueried: make(map[gomatrixserverlib.ServerName]bool),
+ FederationAPI: &FakeFedAPI{},
+ RelayAPI: &FakeRelayAPI{},
+ }
+
+ retriever.InitializeRelayServers(logrus.WithField("test", "relay"))
+ relayServers := retriever.GetQueriedServerStatus()
+ assert.Equal(t, 2, len(relayServers))
+
+ stopRelayServerSync := make(chan bool)
+ go retriever.SyncRelayServers(stopRelayServerSync)
+
+ check := func(log poll.LogT) poll.Result {
+ relayServers := retriever.GetQueriedServerStatus()
+ for _, queried := range relayServers {
+ if !queried {
+ return poll.Continue("waiting for all servers to be queried")
+ }
+ }
+
+ stopRelayServerSync <- true
+ return poll.Success()
+ }
+ poll.WaitOn(t, check, poll.WithTimeout(5*time.Second), poll.WithDelay(100*time.Millisecond))
+}
+
+func TestMonolithStarts(t *testing.T) {
+ monolith := DendriteMonolith{}
+ monolith.Start()
+ monolith.PublicKey()
+ monolith.Stop()
+}