diff options
author | devonh <devon.dmytro@gmail.com> | 2022-11-18 00:29:23 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-18 00:29:23 +0000 |
commit | a8e7ffc7ab147ebced766da8e0e1ebb1d75f846a (patch) | |
tree | 644a90bcf421b3f7a67c860484c991870e697e24 /build | |
parent | ffd8e21ce52bbf542e0e2ed74f032af6a163c56c (diff) |
Add p2p wakeup broadcast handling to pinecone demos (#2841)
Adds wakeup broadcast handling to the pinecone demos.
This will reset their blacklist status and interrupt any ongoing
federation queue backoffs currently in progress for this peer.
The end result is that any queued events will quickly be sent to the
peer if they had disconnected while attempting to send events to them.
Diffstat (limited to 'build')
-rw-r--r-- | build/gobind-pinecone/monolith.go | 35 |
1 files changed, 35 insertions, 0 deletions
diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go index adb4e40a..9100ebf0 100644 --- a/build/gobind-pinecone/monolith.go +++ b/build/gobind-pinecone/monolith.go @@ -40,6 +40,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" "github.com/matrix-org/dendrite/roomserver" @@ -58,6 +59,7 @@ import ( pineconeConnections "github.com/matrix-org/pinecone/connections" pineconeMulticast "github.com/matrix-org/pinecone/multicast" pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeEvents "github.com/matrix-org/pinecone/router/events" pineconeSessions "github.com/matrix-org/pinecone/sessions" "github.com/matrix-org/pinecone/types" @@ -295,7 +297,12 @@ func (m *DendriteMonolith) Start() { m.logger.SetOutput(BindLogger{}) logrus.SetOutput(BindLogger{}) + pineconeEventChannel := make(chan pineconeEvents.Event) m.PineconeRouter = pineconeRouter.NewRouter(logrus.WithField("pinecone", "router"), sk) + m.PineconeRouter.EnableHopLimiting() + m.PineconeRouter.EnableWakeupBroadcasts() + m.PineconeRouter.Subscribe(pineconeEventChannel) + m.PineconeQUIC = pineconeSessions.NewSessions(logrus.WithField("pinecone", "sessions"), m.PineconeRouter, []string{"matrix"}) m.PineconeMulticast = pineconeMulticast.NewMulticast(logrus.WithField("pinecone", "multicast"), m.PineconeRouter) m.PineconeManager = pineconeConnections.NewConnectionManager(m.PineconeRouter, nil) @@ -423,6 +430,34 @@ func (m *DendriteMonolith) Start() { m.logger.Fatal(err) } }() + + go func(ch <-chan pineconeEvents.Event) { + eLog := logrus.WithField("pinecone", "events") + + for event := range ch { + switch e := event.(type) { + case pineconeEvents.PeerAdded: + case pineconeEvents.PeerRemoved: + case pineconeEvents.TreeParentUpdate: + case pineconeEvents.SnakeDescUpdate: + case pineconeEvents.TreeRootAnnUpdate: + case pineconeEvents.SnakeEntryAdded: + case pineconeEvents.SnakeEntryRemoved: + case pineconeEvents.BroadcastReceived: + eLog.Info("Broadcast received from: ", e.PeerID) + + req := &api.PerformWakeupServersRequest{ + ServerNames: []gomatrixserverlib.ServerName{gomatrixserverlib.ServerName(e.PeerID)}, + } + res := &api.PerformWakeupServersResponse{} + if err := fsAPI.PerformWakeupServers(base.Context(), req, res); err != nil { + logrus.WithError(err).Error("Failed to wakeup destination", e.PeerID) + } + case pineconeEvents.BandwidthReport: + default: + } + } + }(pineconeEventChannel) } func (m *DendriteMonolith) Stop() { |