diff options
author | Kegsay <kegan@matrix.org> | 2020-07-03 12:59:00 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-03 12:59:00 +0100 |
commit | 6c4b8185d7f9b4f66cc673fd13c448dff53472c0 (patch) | |
tree | 4c544d155e8c55df7d18f45beb15e473a11f8d0a /cmd/dendritejs | |
parent | 1773fd84b7634a1655e78ee5fe31e6235ed6240c (diff) |
Implement ExtraPublicRoomsProvider for p2p demos (#1180)
* Change API and rename to ExtraPublicRoomsProvider
* Make dendritejs work again
* Maybe make libp2p demo work again
* Linting
Diffstat (limited to 'cmd/dendritejs')
-rw-r--r-- | cmd/dendritejs/main.go | 2 | ||||
-rw-r--r-- | cmd/dendritejs/publicrooms.go | 87 |
2 files changed, 86 insertions, 3 deletions
diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 6e2bdafe..1443bc18 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -211,7 +211,7 @@ func main() { ) fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing) rsAPI.SetFederationSenderAPI(fedSenderAPI) - p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI) + p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) diff --git a/cmd/dendritejs/publicrooms.go b/cmd/dendritejs/publicrooms.go index 5032bc15..a4623ba3 100644 --- a/cmd/dendritejs/publicrooms.go +++ b/cmd/dendritejs/publicrooms.go @@ -18,22 +18,29 @@ package main import ( "context" + "sync" + "time" "github.com/matrix-org/dendrite/federationsender/api" go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" ) type libp2pPublicRoomsProvider struct { node *go_http_js_libp2p.P2pLocalNode providers []go_http_js_libp2p.PeerInfo fedSender api.FederationSenderInternalAPI + fedClient *gomatrixserverlib.FederationClient } -func NewLibP2PPublicRoomsProvider(node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI) *libp2pPublicRoomsProvider { +func NewLibP2PPublicRoomsProvider( + node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI, fedClient *gomatrixserverlib.FederationClient, +) *libp2pPublicRoomsProvider { p := &libp2pPublicRoomsProvider{ node: node, fedSender: fedSender, + fedClient: fedClient, } node.RegisterFoundProviders(p.foundProviders) return p @@ -62,10 +69,86 @@ func (p *libp2pPublicRoomsProvider) foundProviders(peerInfos []go_http_js_libp2p p.providers = peerInfos } -func (p *libp2pPublicRoomsProvider) Homeservers() []string { +func (p *libp2pPublicRoomsProvider) Rooms() []gomatrixserverlib.PublicRoom { + return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, p.homeservers()) +} + +func (p *libp2pPublicRoomsProvider) homeservers() []string { result := make([]string, len(p.providers)) for i := range p.providers { result[i] = p.providers[i].Id } return result } + +// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers. +// Returns a list of public rooms. +func bulkFetchPublicRoomsFromServers( + ctx context.Context, fedClient *gomatrixserverlib.FederationClient, homeservers []string, +) (publicRooms []gomatrixserverlib.PublicRoom) { + limit := 200 + // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. + // goroutines send rooms to this channel + roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit)) + // signalling channel to tell goroutines to stop sending rooms and quit + done := make(chan bool) + // signalling to say when we can close the room channel + var wg sync.WaitGroup + wg.Add(len(homeservers)) + // concurrently query for public rooms + for _, hs := range homeservers { + go func(homeserverDomain string) { + defer wg.Done() + util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms") + fres, err := fedClient.GetPublicRooms(ctx, gomatrixserverlib.ServerName(homeserverDomain), int(limit), "", false, "") + if err != nil { + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchPublicRoomsFromServers: failed to query hs", + ) + return + } + for _, room := range fres.Chunk { + // atomically send a room or stop + select { + case roomCh <- room: + case <-done: + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms") + return + } + } + }(hs) + } + + // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request. + // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be + // closed. + go func() { + wg.Wait() + util.GetLogger(ctx).Info("Cleaning up resources") + close(roomCh) + }() + + // fan-in results with timeout. We stop when we reach the limit. +FanIn: + for len(publicRooms) < int(limit) || limit == 0 { + // add a room or timeout + select { + case room, ok := <-roomCh: + if !ok { + util.GetLogger(ctx).Info("All homeservers have been queried, returning results.") + break FanIn + } + publicRooms = append(publicRooms, room) + case <-time.After(15 * time.Second): // we've waited long enough, let's tell the client what we got. + util.GetLogger(ctx).Info("Waited 15s for federated public rooms, returning early") + break FanIn + case <-ctx.Done(): // the client hung up on us, let's stop. + util.GetLogger(ctx).Info("Client hung up, returning early") + break FanIn + } + } + // tell goroutines to stop + close(done) + + return publicRooms +} |