| author | devonh <devon.dmytro@gmail.com> | 2023-01-23 17:55:12 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-01-23 17:55:12 +0000 |
| commit | 5b73592f5a4dddf64184fcbe33f4c1835c656480 (patch) | |
| tree | b6dac51b6be7a1e591f24881ee1bfae1b92088e9 /cmd | |
| parent | 48fa869fa3578741d1d5775d30f24f6b097ab995 (diff) | |
Initial Store & Forward Implementation (#2917)
This adds store & forward relays into dendrite for p2p.
A few things have changed:
- new relay api serves new http endpoints for s&f federation
- updated outbound federation queueing which will attempt to forward using s&f if appropriate
- database entries to track s&f relays for other nodes
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/dendrite-demo-pinecone/ARCHITECTURE.md | 59
-rw-r--r-- | cmd/dendrite-demo-pinecone/README.md | 39
-rw-r--r-- | cmd/dendrite-demo-pinecone/main.go | 125
3 files changed, 215 insertions(+), 8 deletions(-)
diff --git a/cmd/dendrite-demo-pinecone/ARCHITECTURE.md b/cmd/dendrite-demo-pinecone/ARCHITECTURE.md
new file mode 100644
index 00000000..1b094105
--- /dev/null
+++ b/cmd/dendrite-demo-pinecone/ARCHITECTURE.md
@@ -0,0 +1,59 @@

## Relay Server Architecture

Relay servers function similarly to physical mail drop boxes. A node can have many associated relay servers. Matrix events can be sent to them instead of to the destination node, and the destination node will eventually retrieve them from the relay server.
Nodes that want to send events to an offline node need to know which relay servers are associated with their intended destination.
Currently this is configured manually in the dendrite database. In the future this information could be configurable in the app and shared automatically via other means.

Currently events are sent as complete Matrix transactions.
Transactions include a list of PDUs (which contain, among other things, lists of authorization events, previous events, and signatures), a list of EDUs, and other information about the transaction.
No additional information is sent along with the transaction beyond what is typically added during Matrix federation today.
In the future this will probably need to change in order to handle more complex room state resolution during p2p usage.
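For orientation, this is roughly the shape of the transaction that a relay stores and later forwards. The struct below is a simplified illustration built from the federation fields named above (`origin`, `origin_server_ts`, `pdus`, `edus`); the authoritative Go definition lives in gomatrixserverlib, not here.

```go
package relay

import "encoding/json"

// Transaction is a trimmed-down sketch of a Matrix federation
// transaction, limited to the fields discussed in this document.
type Transaction struct {
    Origin         string            `json:"origin"`           // name of the sending server
    OriginServerTS int64             `json:"origin_server_ts"` // milliseconds since the Unix epoch
    PDUs           []json.RawMessage `json:"pdus"`             // room events, each carrying auth/prev event lists and signatures
    EDUs           []json.RawMessage `json:"edus"`             // ephemeral events such as typing notifications and receipts
}
```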
### Message Flow

```
                                            0 +--------------------+
 +----------------------------------------+   |     P2P Node A     |
 |              Relay Server              |   |  +--------+        |
 |                                        |   |  | Client |        |
 |                 +--------------------+ |   |  +--------+        |
 |                 |  Relay Server API  | |   |      |             |
 |                 |                    | |   |      V             |
 |  .--------.  2  |  +-------------+   | | 1 |  +------------+    |
 | |`--------`| <-----|  Forwarder  |<-----------| Homeserver |    |
 | | Database |    |  +-------------+   | |   |  +------------+    |
 | `----------`    |                    | |   +--------------------+
 |     ^           |                    | |
 |     |  4        |  +-------------+   | |
 |     `--------------|  Retriever  |<------. +--------------------+
 |                 |  +-------------+   | | | |     P2P Node B     |
 |                 |                    | | | |  +--------+        |
 |                 +--------------------+ | | |  | Client |        |
 |                                        | | |  +--------+        |
 +----------------------------------------+ | |      |             |
                                            | |      V             |
                                          3 | |  +------------+    |
                                            `----| Homeserver |    |
                                              |  +------------+    |
                                              +--------------------+
```

- 0: This relay server is currently acting on behalf of `P2P Node B` only. It will only receive, and later forward, events that are destined for `P2P Node B`.
- 1: When `P2P Node A` fails to send directly to `P2P Node B` (after a configurable number of attempts), it checks for any known relay servers associated with `P2P Node B` and sends to all of them.
  - If sending to any of the relay servers succeeds, the transaction is considered successfully sent.
- 2: The relay server `forwarder` stores the transaction JSON in its database and marks it as destined for `P2P Node B`.
- 3: When `P2P Node B` comes back online, it queries all of its relay servers for any missed messages.
- 4: The relay server `retriever` looks up any transactions in its database that are destined for `P2P Node B` and returns them one at a time.

For now, it is important that we don't rule out a hybrid approach with both sender-side and recipient-side relay servers. Both approaches make sense, and which one gives the better experience depends on the use case.

#### Sender-Side Relay Servers

If we are operating truly ad-hoc, and the sender doesn't know when or where the recipient will be able to pick up messages, then a sender-designated relay server gives events the best chance of making their way to the destination. But to achieve this, the relay must either rely on p2p presence broadcasts to know when to attempt forwarding (which implies a fairly small network), or keep periodically attempting to forward to the destination, which generates a lot of extra traffic on the network.

#### Recipient-Side Relay Servers

If we have agreed on a static relay server before going off and doing other things, or if we are talking about more global p2p federation, then a recipient-designated relay server cuts down on redundant traffic: it sits idle until the recipient pulls events from it.
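Reduced to code, the flow above is just a mailbox per destination: step 2 appends to it, step 4 drains it one transaction at a time. The sketch below is a minimal in-memory illustration of that idea; the types are invented here and are not dendrite's actual relay database layer.

```go
package relay

import "sync"

// DropBox is a minimal in-memory stand-in for the relay server's
// database. Transactions are held as opaque JSON, keyed by the
// destination server name they are destined for.
type DropBox struct {
    mu     sync.Mutex
    queues map[string][][]byte
}

func NewDropBox() *DropBox {
    return &DropBox{queues: map[string][][]byte{}}
}

// Store is the forwarder's half (step 2): persist a transaction and
// mark it as destined for the given node.
func (d *DropBox) Store(destination string, txnJSON []byte) {
    d.mu.Lock()
    defer d.mu.Unlock()
    d.queues[destination] = append(d.queues[destination], txnJSON)
}

// Next is the retriever's half (step 4): hand back stored transactions
// one at a time, in arrival order. ok is false once the mailbox is empty.
func (d *DropBox) Next(destination string) (txnJSON []byte, ok bool) {
    d.mu.Lock()
    defer d.mu.Unlock()
    q := d.queues[destination]
    if len(q) == 0 {
        return nil, false
    }
    d.queues[destination] = q[1:]
    return q[0], true
}
```

A real relay additionally has to survive restarts and only discard a transaction once the recipient has fetched it, which is why the commit backs this with a database table rather than memory.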
diff --git a/cmd/dendrite-demo-pinecone/README.md b/cmd/dendrite-demo-pinecone/README.md
index d6dd9590..5cacd092 100644
--- a/cmd/dendrite-demo-pinecone/README.md
+++ b/cmd/dendrite-demo-pinecone/README.md
@@ -24,3 +24,42 @@

Then point your favourite Matrix client to the homeserver URL `http://localhost:8008`. If your peering connection is operational then you should see a `Connected TCP:` line in the log output. If not then try a different peer. Once logged in, you should be able to open the room directory or join a room by its ID.

## Store & Forward Relays

To test the store & forward relay functionality, you need a minimum of 3 instances: one instance acting as the relay, and two instances as the users trying to communicate. You can then send messages between the two nodes and watch the relay being used whenever the receiving node is offline.

### Launching the Nodes

Relay server:
```
go run cmd/dendrite-demo-pinecone/main.go -dir relay/ -listen "[::]:49000"
```

Node 1:
```
go run cmd/dendrite-demo-pinecone/main.go -dir node-1/ -peer "[::]:49000" -port 8007
```

Node 2:
```
go run cmd/dendrite-demo-pinecone/main.go -dir node-2/ -peer "[::]:49000" -port 8009
```

### Database Setup

At the moment, the database must be configured manually. For both `Node 1` and `Node 2`, add the following entries to their respective `relay_server` table in the federationapi database (one way to script these inserts is shown after this README diff):
```
server_name: {node_1_public_key}, relay_server_name: {relay_public_key}
server_name: {node_2_public_key}, relay_server_name: {relay_public_key}
```

After editing the database you will need to relaunch the nodes for the changes to be picked up by dendrite.

### Testing

Now you can run two separate instances of Element and connect one to each of `Node 1` and `Node 2`. You can shut down one of the nodes and continue sending messages: if you wait long enough, the message will be sent to the relay server instead, and you can see this in the log output of the relay server.
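To make the database step concrete, here is one way the two rows could be inserted for `Node 1` with a short Go program. This is a sketch, not part of the commit: the table and column names follow the README's description, the SQLite file name is assumed to follow the `file:{dir}/{name}-federationapi.db` pattern used in `main.go` (check which file actually appears in `node-1/`), and the key placeholders must be replaced with the hex-encoded public keys the nodes print at startup.

```go
package main

import (
    "database/sql"
    "log"

    _ "github.com/mattn/go-sqlite3" // SQLite driver
)

func main() {
    // Placeholders: substitute the hex-encoded public keys printed by
    // the nodes on startup.
    node1Key := "{node_1_public_key}"
    node2Key := "{node_2_public_key}"
    relayKey := "{relay_public_key}"

    // Assumed file name; adjust to the federationapi database that
    // actually appears in the node-1/ directory.
    db, err := sql.Open("sqlite3", "node-1/dendrite-p2p-pinecone-federationapi.db")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Table and column names as described in the README above.
    for _, row := range [][2]string{{node1Key, relayKey}, {node2Key, relayKey}} {
        if _, err := db.Exec(
            "INSERT INTO relay_server (server_name, relay_server_name) VALUES (?, ?)",
            row[0], row[1],
        ); err != nil {
            log.Fatal(err)
        }
    }
}
```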
diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go
index 3f627b41..a813c37a 100644
--- a/cmd/dendrite-demo-pinecone/main.go
+++ b/cmd/dendrite-demo-pinecone/main.go
@@ -38,16 +38,21 @@ import (
 	"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
 	"github.com/matrix-org/dendrite/federationapi"
 	"github.com/matrix-org/dendrite/federationapi/api"
+	"github.com/matrix-org/dendrite/federationapi/producers"
 	"github.com/matrix-org/dendrite/internal"
 	"github.com/matrix-org/dendrite/internal/httputil"
 	"github.com/matrix-org/dendrite/keyserver"
+	"github.com/matrix-org/dendrite/relayapi"
+	relayServerAPI "github.com/matrix-org/dendrite/relayapi/api"
 	"github.com/matrix-org/dendrite/roomserver"
 	"github.com/matrix-org/dendrite/setup"
 	"github.com/matrix-org/dendrite/setup/base"
 	"github.com/matrix-org/dendrite/setup/config"
+	"github.com/matrix-org/dendrite/setup/jetstream"
 	"github.com/matrix-org/dendrite/test"
 	"github.com/matrix-org/dendrite/userapi"
 	"github.com/matrix-org/gomatrixserverlib"
+	"go.uber.org/atomic"
 
 	pineconeConnections "github.com/matrix-org/pinecone/connections"
 	pineconeMulticast "github.com/matrix-org/pinecone/multicast"
@@ -66,6 +71,8 @@ var (
 	instanceDir = flag.String("dir", ".", "the directory to store the databases in (if --config not specified)")
 )
 
+// How long to wait between attempts to sync with this node's relay servers.
+const relayServerRetryInterval = time.Second * 30
+
 // nolint:gocyclo
 func main() {
 	flag.Parse()
@@ -139,6 +146,7 @@ func main() {
 	cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", filepath.Join(*instanceDir, *instanceName)))
 	cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", filepath.Join(*instanceDir, *instanceName)))
 	cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationapi.db", filepath.Join(*instanceDir, *instanceName)))
+	cfg.RelayAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-relayapi.db", filepath.Join(*instanceDir, *instanceName)))
 	cfg.MSCs.MSCs = []string{"msc2836", "msc2946"}
 	cfg.MSCs.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mscs.db", filepath.Join(*instanceDir, *instanceName)))
 	cfg.ClientAPI.RegistrationDisabled = false
@@ -224,6 +232,20 @@ func main() {
 	userProvider := users.NewPineconeUserProvider(pRouter, pQUIC, userAPI, federation)
 	roomProvider := rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation)
 
+	// The relay API reuses the federation API's JetStream producer so that
+	// EDUs retrieved from relay servers flow into the sync stream as usual.
+	js, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream)
+	producer := &producers.SyncAPIProducer{
+		JetStream:              js,
+		TopicReceiptEvent:      base.Cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+		TopicSendToDeviceEvent: base.Cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+		TopicTypingEvent:       base.Cfg.Global.JetStream.Prefixed(jetstream.OutputTypingEvent),
+		TopicPresenceEvent:     base.Cfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent),
+		TopicDeviceListUpdate:  base.Cfg.Global.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
+		TopicSigningKeyUpdate:  base.Cfg.Global.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
+		Config:                 &base.Cfg.FederationAPI,
+		UserAPI:                userAPI,
+	}
+	relayAPI := relayapi.NewRelayInternalAPI(base, federation, rsAPI, keyRing, producer)
+
 	monolith := setup.Monolith{
 		Config:        base.Cfg,
 		Client:        conn.CreateClient(base, pQUIC),
 		FedClient:     federation,
 		KeyRing:       keyRing,
 		RoomserverAPI: rsAPI,
 		UserAPI:       userAPI,
 		KeyAPI:        keyAPI,
+		RelayAPI:      relayAPI,
 		ExtPublicRoomsProvider:   roomProvider,
 		ExtUserDirectoryProvider: userProvider,
 	}
@@ -305,27 +328,38 @@ func main() {
 	go func(ch <-chan pineconeEvents.Event) {
 		eLog := logrus.WithField("pinecone", "events")
+		relayServerSyncRunning := atomic.NewBool(false)
+		stopRelayServerSync := make(chan bool)
+
+		m := RelayServerRetriever{
+			Context:             context.Background(),
+			ServerName:          gomatrixserverlib.ServerName(pRouter.PublicKey().String()),
+			FederationAPI:       fsAPI,
+			RelayServersQueried: make(map[gomatrixserverlib.ServerName]bool),
+			RelayAPI:            monolith.RelayAPI,
+		}
+		m.InitializeRelayServers(eLog)
 
 		for event := range ch {
 			switch e := event.(type) {
 			case pineconeEvents.PeerAdded:
+				// Start at most one background sync with our relay servers:
+				// set the flag before launching so further PeerAdded events
+				// are no-ops; the goroutine clears it again when it exits.
+				if !relayServerSyncRunning.Load() {
+					relayServerSyncRunning.Store(true)
+					go m.syncRelayServers(stopRelayServerSync, relayServerSyncRunning)
+				}
 			case pineconeEvents.PeerRemoved:
-			case pineconeEvents.TreeParentUpdate:
-			case pineconeEvents.SnakeDescUpdate:
-			case pineconeEvents.TreeRootAnnUpdate:
-			case pineconeEvents.SnakeEntryAdded:
-			case pineconeEvents.SnakeEntryRemoved:
+				// Stop syncing once the last peer connection is gone.
+				if relayServerSyncRunning.Load() && pRouter.TotalPeerCount() == 0 {
+					stopRelayServerSync <- true
+				}
 			case pineconeEvents.BroadcastReceived:
-				eLog.Info("Broadcast received from: ", e.PeerID)
+				// eLog.Info("Broadcast received from: ", e.PeerID)
 
 				req := &api.PerformWakeupServersRequest{
 					ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)},
 				}
 				res := &api.PerformWakeupServersResponse{}
 				if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil {
-					logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID)
+					eLog.WithError(err).Error("Failed to wakeup destination", e.PeerID)
 				}
-			case pineconeEvents.BandwidthReport:
 			default:
 			}
 		}
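The `PeerAdded`/`PeerRemoved` handling above follows a common start/stop pattern: at most one background sync goroutine, guarded by an atomic flag that is set before launch and cleared on exit, with a channel to stop it when the last peer disconnects. In isolation (names invented, events simulated by plain function calls), the pattern looks like this:

```go
package main

import (
    "fmt"
    "time"

    "go.uber.org/atomic"
)

// startWorker launches work at most once at a time: the flag is set
// before the goroutine starts and cleared when it exits, so a second
// "peer added" event while the worker runs is a no-op. The caller is
// assumed to invoke startWorker from a single event loop, as main.go does.
func startWorker(running *atomic.Bool, stop <-chan bool) {
    if running.Load() {
        return
    }
    running.Store(true)
    go func() {
        defer running.Store(false)
        for {
            select {
            case <-stop:
                return
            case <-time.After(time.Second):
                fmt.Println("retrying...")
            }
        }
    }()
}

func main() {
    running := atomic.NewBool(false)
    stop := make(chan bool)

    startWorker(running, stop) // "peer added"
    startWorker(running, stop) // duplicate event: ignored
    time.Sleep(2500 * time.Millisecond)
    stop <- true // "last peer removed"
    time.Sleep(100 * time.Millisecond)
    fmt.Println("worker running:", running.Load())
}
```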
@@ -333,3 +367,78 @@ func main() {
 
 	base.WaitForShutdown()
 }
+
+type RelayServerRetriever struct {
+	Context             context.Context
+	ServerName          gomatrixserverlib.ServerName
+	FederationAPI       api.FederationInternalAPI
+	RelayServersQueried map[gomatrixserverlib.ServerName]bool
+	RelayAPI            relayServerAPI.RelayInternalAPI
+}
+
+// InitializeRelayServers loads this node's known relay servers from the
+// federationapi database and marks them all as not yet queried.
+func (m *RelayServerRetriever) InitializeRelayServers(eLog *logrus.Entry) {
+	request := api.P2PQueryRelayServersRequest{Server: m.ServerName}
+	response := api.P2PQueryRelayServersResponse{}
+	if err := m.FederationAPI.P2PQueryRelayServers(m.Context, &request, &response); err != nil {
+		eLog.Warnf("Failed obtaining list of this node's relay servers: %s", err.Error())
+	}
+	for _, server := range response.RelayServers {
+		m.RelayServersQueried[server] = false
+	}
+
+	eLog.Infof("Registered relay servers: %v", response.RelayServers)
+}
+
+// syncRelayServers keeps querying any relay servers that have not yet been
+// reached, retrying every relayServerRetryInterval, until every server has
+// been queried successfully or a stop is requested.
+func (m *RelayServerRetriever) syncRelayServers(stop <-chan bool, running *atomic.Bool) {
+	defer running.Store(false)
+
+	t := time.NewTimer(relayServerRetryInterval)
+	for {
+		relayServersToQuery := []gomatrixserverlib.ServerName{}
+		for server, complete := range m.RelayServersQueried {
+			if !complete {
+				relayServersToQuery = append(relayServersToQuery, server)
+			}
+		}
+		if len(relayServersToQuery) == 0 {
+			// All relay servers have been synced.
+			return
+		}
+		m.queryRelayServers(relayServersToQuery)
+		t.Reset(relayServerRetryInterval)
+
+		select {
+		case <-stop:
+			// We have been asked to stop syncing; drain the timer and return.
+			if !t.Stop() {
+				<-t.C
+			}
+			return
+		case <-t.C:
+			// The timer has expired. Continue to the next loop iteration.
+		}
+	}
+}
+
+// queryRelayServers asks each given relay server for any transactions it is
+// holding for this node, and marks the servers that respond successfully so
+// they are not queried again.
+func (m *RelayServerRetriever) queryRelayServers(relayServers []gomatrixserverlib.ServerName) {
+	logrus.Info("Querying relay servers for any available transactions")
+	userID, err := gomatrixserverlib.NewUserID("@user:"+string(m.ServerName), false)
+	if err != nil {
+		logrus.WithError(err).Error("Failed to construct userID for relay server sync")
+		return
+	}
+	for _, server := range relayServers {
+		if err := m.RelayAPI.PerformRelayServerSync(context.Background(), *userID, server); err != nil {
+			logrus.Errorf("Failed querying relay server: %s", err.Error())
+			continue
+		}
+		m.RelayServersQueried[server] = true
+		// TODO: What happens if the relay receives new messages after this
+		// point? Should we keep checking with it, or should it try to contact
+		// us, e.g. with a "new_async_events" message that marks it as needing
+		// to be queried again? And if we miss that message, perhaps we should
+		// re-query after a certain period of time as a backup.
+	}
+}
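The TODO above asks what should happen when a relay receives new transactions after a node has already synced. As a purely illustrative answer (nothing below exists in this commit), one backup strategy is to let a successful query go stale after a bounded interval and re-mark the server as pending, so the retry loop queries it again:

```go
package relaysync

import (
    "sync"
    "time"
)

// staleAfter is a hypothetical knob: how long a successful relay sync
// is trusted before the server is queried again as a backup.
const staleAfter = 5 * time.Minute

// staleTracker re-marks relay servers as pending once their last
// successful query is older than staleAfter. The queried map plays the
// same role as RelayServersQueried in RelayServerRetriever above.
type staleTracker struct {
    mu          sync.Mutex
    queried     map[string]bool
    lastQueried map[string]time.Time
}

func (s *staleTracker) markSuccess(server string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.queried[server] = true
    s.lastQueried[server] = time.Now()
}

// expireStale would run periodically (e.g. on the same timer as
// syncRelayServers) so that a missed "new events" notification is
// eventually papered over by a fresh query.
func (s *staleTracker) expireStale(now time.Time) {
    s.mu.Lock()
    defer s.mu.Unlock()
    for server, t := range s.lastQueried {
        if s.queried[server] && now.Sub(t) > staleAfter {
            s.queried[server] = false
        }
    }
}
```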