diff options
author | devonh <devon.dmytro@gmail.com> | 2022-11-18 00:29:23 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-18 00:29:23 +0000 |
commit | a8e7ffc7ab147ebced766da8e0e1ebb1d75f846a (patch) | |
tree | 644a90bcf421b3f7a67c860484c991870e697e24 /cmd | |
parent | ffd8e21ce52bbf542e0e2ed74f032af6a163c56c (diff) |
Add p2p wakeup broadcast handling to pinecone demos (#2841)
Adds wakeup broadcast handling to the pinecone demos.
This will reset their blacklist status and interrupt any ongoing
federation queue backoffs currently in progress for this peer.
The end result is that any queued events will quickly be sent to the
peer if they had disconnected while attempting to send events to them.
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/dendrite-demo-pinecone/main.go | 35 |
1 files changed, 35 insertions, 0 deletions
diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 6c719a1e..421b17d5 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -37,6 +37,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" @@ -51,6 +52,7 @@ import ( pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeEvents "github.com/matrix-org/pinecone/router/events" pineconeSessions "github.com/matrix-org/pinecone/sessions" "github.com/sirupsen/logrus" @@ -155,7 +157,12 @@ func main() { base := base.NewBaseDendrite(cfg, "Monolith") defer base.Close() // nolint: errcheck + pineconeEventChannel := make(chan pineconeEvents.Event) pRouter := pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) + pRouter.EnableHopLimiting() + pRouter.EnableWakeupBroadcasts() + pRouter.Subscribe(pineconeEventChannel) + pQUIC := pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), pRouter, []string{"matrix"}) pMulticast := pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), pRouter) pManager := pineconeConnections.NewConnectionManager(pRouter, nil) @@ -293,5 +300,33 @@ func main() { logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter)) }() + go func(ch <-chan pineconeEvents.Event) { + eLog := logrus.WithField("pinecone", "events") + + for event := range ch { + switch e := event.(type) { + case pineconeEvents.PeerAdded: + case pineconeEvents.PeerRemoved: + case pineconeEvents.TreeParentUpdate: + case pineconeEvents.SnakeDescUpdate: + case pineconeEvents.TreeRootAnnUpdate: + case pineconeEvents.SnakeEntryAdded: + case pineconeEvents.SnakeEntryRemoved: + case pineconeEvents.BroadcastReceived: + eLog.Info("Broadcast received from: ", e.PeerID) + + req := &api.PerformWakeupServersRequest{ + ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, + } + res := &api.PerformWakeupServersResponse{} + if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil { + logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID) + } + case pineconeEvents.BandwidthReport: + default: + } + } + }(pineconeEventChannel) + base.WaitForShutdown() } |