aboutsummaryrefslogtreecommitdiff
path: root/cmd/dendritejs
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-03 12:59:00 +0100
committerGitHub <noreply@github.com>2020-07-03 12:59:00 +0100
commit6c4b8185d7f9b4f66cc673fd13c448dff53472c0 (patch)
tree4c544d155e8c55df7d18f45beb15e473a11f8d0a /cmd/dendritejs
parent1773fd84b7634a1655e78ee5fe31e6235ed6240c (diff)
Implement ExtraPublicRoomsProvider for p2p demos (#1180)
* Change API and rename to ExtraPublicRoomsProvider * Make dendritejs work again * Maybe make libp2p demo work again * Linting
Diffstat (limited to 'cmd/dendritejs')
-rw-r--r--cmd/dendritejs/main.go2
-rw-r--r--cmd/dendritejs/publicrooms.go87
2 files changed, 86 insertions, 3 deletions
diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go
index 6e2bdafe..1443bc18 100644
--- a/cmd/dendritejs/main.go
+++ b/cmd/dendritejs/main.go
@@ -211,7 +211,7 @@ func main() {
)
fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing)
rsAPI.SetFederationSenderAPI(fedSenderAPI)
- p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI)
+ p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
diff --git a/cmd/dendritejs/publicrooms.go b/cmd/dendritejs/publicrooms.go
index 5032bc15..a4623ba3 100644
--- a/cmd/dendritejs/publicrooms.go
+++ b/cmd/dendritejs/publicrooms.go
@@ -18,22 +18,29 @@ package main
import (
"context"
+ "sync"
+ "time"
"github.com/matrix-org/dendrite/federationsender/api"
go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
)
type libp2pPublicRoomsProvider struct {
node *go_http_js_libp2p.P2pLocalNode
providers []go_http_js_libp2p.PeerInfo
fedSender api.FederationSenderInternalAPI
+ fedClient *gomatrixserverlib.FederationClient
}
-func NewLibP2PPublicRoomsProvider(node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI) *libp2pPublicRoomsProvider {
+func NewLibP2PPublicRoomsProvider(
+ node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI, fedClient *gomatrixserverlib.FederationClient,
+) *libp2pPublicRoomsProvider {
p := &libp2pPublicRoomsProvider{
node: node,
fedSender: fedSender,
+ fedClient: fedClient,
}
node.RegisterFoundProviders(p.foundProviders)
return p
@@ -62,10 +69,86 @@ func (p *libp2pPublicRoomsProvider) foundProviders(peerInfos []go_http_js_libp2p
p.providers = peerInfos
}
-func (p *libp2pPublicRoomsProvider) Homeservers() []string {
+func (p *libp2pPublicRoomsProvider) Rooms() []gomatrixserverlib.PublicRoom {
+ return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, p.homeservers())
+}
+
+func (p *libp2pPublicRoomsProvider) homeservers() []string {
result := make([]string, len(p.providers))
for i := range p.providers {
result[i] = p.providers[i].Id
}
return result
}
+
+// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers.
+// Returns a list of public rooms.
+func bulkFetchPublicRoomsFromServers(
+ ctx context.Context, fedClient *gomatrixserverlib.FederationClient, homeservers []string,
+) (publicRooms []gomatrixserverlib.PublicRoom) {
+ limit := 200
+ // follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
+ // goroutines send rooms to this channel
+ roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit))
+ // signalling channel to tell goroutines to stop sending rooms and quit
+ done := make(chan bool)
+ // signalling to say when we can close the room channel
+ var wg sync.WaitGroup
+ wg.Add(len(homeservers))
+ // concurrently query for public rooms
+ for _, hs := range homeservers {
+ go func(homeserverDomain string) {
+ defer wg.Done()
+ util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
+ fres, err := fedClient.GetPublicRooms(ctx, gomatrixserverlib.ServerName(homeserverDomain), int(limit), "", false, "")
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn(
+ "bulkFetchPublicRoomsFromServers: failed to query hs",
+ )
+ return
+ }
+ for _, room := range fres.Chunk {
+ // atomically send a room or stop
+ select {
+ case roomCh <- room:
+ case <-done:
+ util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
+ return
+ }
+ }
+ }(hs)
+ }
+
+ // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request.
+ // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be
+ // closed.
+ go func() {
+ wg.Wait()
+ util.GetLogger(ctx).Info("Cleaning up resources")
+ close(roomCh)
+ }()
+
+ // fan-in results with timeout. We stop when we reach the limit.
+FanIn:
+ for len(publicRooms) < int(limit) || limit == 0 {
+ // add a room or timeout
+ select {
+ case room, ok := <-roomCh:
+ if !ok {
+ util.GetLogger(ctx).Info("All homeservers have been queried, returning results.")
+ break FanIn
+ }
+ publicRooms = append(publicRooms, room)
+ case <-time.After(15 * time.Second): // we've waited long enough, let's tell the client what we got.
+ util.GetLogger(ctx).Info("Waited 15s for federated public rooms, returning early")
+ break FanIn
+ case <-ctx.Done(): // the client hung up on us, let's stop.
+ util.GetLogger(ctx).Info("Client hung up, returning early")
+ break FanIn
+ }
+ }
+ // tell goroutines to stop
+ close(done)
+
+ return publicRooms
+}