diff options
author | Kegsay <kegan@matrix.org> | 2020-07-03 12:59:00 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-03 12:59:00 +0100 |
commit | 6c4b8185d7f9b4f66cc673fd13c448dff53472c0 (patch) | |
tree | 4c544d155e8c55df7d18f45beb15e473a11f8d0a /clientapi | |
parent | 1773fd84b7634a1655e78ee5fe31e6235ed6240c (diff) |
Implement ExtraPublicRoomsProvider for p2p demos (#1180)
* Change API and rename to ExtraPublicRoomsProvider
* Make dendritejs work again
* Maybe make libp2p demo work again
* Linting
Diffstat (limited to 'clientapi')
-rw-r--r-- | clientapi/api/api.go | 11 | ||||
-rw-r--r-- | clientapi/clientapi.go | 2 | ||||
-rw-r--r-- | clientapi/routing/directory_public.go | 294 | ||||
-rw-r--r-- | clientapi/routing/directory_public_test.go | 14 | ||||
-rw-r--r-- | clientapi/routing/routing.go | 8 |
5 files changed, 100 insertions, 229 deletions
diff --git a/clientapi/api/api.go b/clientapi/api/api.go index dae462c0..d96b032f 100644 --- a/clientapi/api/api.go +++ b/clientapi/api/api.go @@ -14,9 +14,10 @@ package api -type ExternalPublicRoomsProvider interface { - // The list of homeserver domains to query. These servers will receive a request - // via this API: https://matrix.org/docs/spec/server_server/latest#public-room-directory - // This will be called -on demand- by clients, so cache appropriately! - Homeservers() []string +import "github.com/matrix-org/gomatrixserverlib" + +// ExtraPublicRoomsProvider provides a way to inject extra published rooms into /publicRooms requests. +type ExtraPublicRoomsProvider interface { + // Rooms returns the extra rooms. This is called on-demand by clients, so cache appropriately. + Rooms() []gomatrixserverlib.PublicRoom } diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index bbce6dcc..029a73da 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -48,7 +48,7 @@ func AddPublicRoutes( transactionsCache *transactions.Cache, fsAPI federationSenderAPI.FederationSenderInternalAPI, userAPI userapi.UserInternalAPI, - extRoomsProvider api.ExternalPublicRoomsProvider, + extRoomsProvider api.ExtraPublicRoomsProvider, ) { syncProducer := &producers.SyncAPIProducer{ Producer: producer, diff --git a/clientapi/routing/directory_public.go b/clientapi/routing/directory_public.go index 64600cb4..925c1b8a 100644 --- a/clientapi/routing/directory_public.go +++ b/clientapi/routing/directory_public.go @@ -22,7 +22,6 @@ import ( "strconv" "strings" "sync" - "time" "github.com/matrix-org/dendrite/clientapi/api" "github.com/matrix-org/dendrite/clientapi/httputil" @@ -33,6 +32,11 @@ import ( "github.com/matrix-org/util" ) +var ( + cacheMu sync.Mutex + publicRoomsCache []gomatrixserverlib.PublicRoom +) + type PublicRoomReq struct { Since string `json:"since,omitempty"` Limit int16 `json:"limit,omitempty"` @@ -46,13 +50,15 @@ type filter struct { // GetPostPublicRooms implements GET and POST /publicRooms func GetPostPublicRooms( req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, + extRoomsProvider api.ExtraPublicRoomsProvider, ) util.JSONResponse { var request PublicRoomReq if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil { return *fillErr } - response, err := publicRooms(req.Context(), request, rsAPI, stateAPI) + response, err := publicRooms(req.Context(), request, rsAPI, stateAPI, extRoomsProvider) if err != nil { + util.GetLogger(req.Context()).WithError(err).Errorf("failed to work out public rooms") return jsonerror.InternalServerError() } return util.JSONResponse{ @@ -61,146 +67,9 @@ func GetPostPublicRooms( } } -// GetPostPublicRoomsWithExternal is the same as GetPostPublicRooms but also mixes in public rooms from the provider supplied. -func GetPostPublicRoomsWithExternal( - req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, - fedClient *gomatrixserverlib.FederationClient, extRoomsProvider api.ExternalPublicRoomsProvider, -) util.JSONResponse { - var request PublicRoomReq - if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil { - return *fillErr - } - response, err := publicRooms(req.Context(), request, rsAPI, stateAPI) - if err != nil { - return jsonerror.InternalServerError() - } - - if request.Since != "" { - // TODO: handle pagination tokens sensibly rather than ignoring them. - // ignore paginated requests since we don't handle them yet over federation. - // Only the initial request will contain federated rooms. - return util.JSONResponse{ - Code: http.StatusOK, - JSON: response, - } - } - - // If we have already hit the limit on the number of rooms, bail. - var limit int - if request.Limit > 0 { - limit = int(request.Limit) - len(response.Chunk) - if limit <= 0 { - return util.JSONResponse{ - Code: http.StatusOK, - JSON: response, - } - } - } - - // downcasting `limit` is safe as we know it isn't bigger than request.Limit which is int16 - fedRooms := bulkFetchPublicRoomsFromServers(req.Context(), fedClient, extRoomsProvider.Homeservers(), int16(limit)) - response.Chunk = append(response.Chunk, fedRooms...) - - // de-duplicate rooms with the same room ID. We can join the room via any of these aliases as we know these servers - // are alive and well, so we arbitrarily pick one (purposefully shuffling them to spread the load a bit) - var publicRooms []gomatrixserverlib.PublicRoom - haveRoomIDs := make(map[string]bool) - rand.Shuffle(len(response.Chunk), func(i, j int) { - response.Chunk[i], response.Chunk[j] = response.Chunk[j], response.Chunk[i] - }) - for _, r := range response.Chunk { - if haveRoomIDs[r.RoomID] { - continue - } - haveRoomIDs[r.RoomID] = true - publicRooms = append(publicRooms, r) - } - // sort by member count - sort.SliceStable(publicRooms, func(i, j int) bool { - return publicRooms[i].JoinedMembersCount > publicRooms[j].JoinedMembersCount - }) - - response.Chunk = publicRooms - - return util.JSONResponse{ - Code: http.StatusOK, - JSON: response, - } -} - -// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers. -// Returns a list of public rooms up to the limit specified. -func bulkFetchPublicRoomsFromServers( - ctx context.Context, fedClient *gomatrixserverlib.FederationClient, homeservers []string, limit int16, -) (publicRooms []gomatrixserverlib.PublicRoom) { - // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. - // goroutines send rooms to this channel - roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit)) - // signalling channel to tell goroutines to stop sending rooms and quit - done := make(chan bool) - // signalling to say when we can close the room channel - var wg sync.WaitGroup - wg.Add(len(homeservers)) - // concurrently query for public rooms - for _, hs := range homeservers { - go func(homeserverDomain string) { - defer wg.Done() - util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms") - fres, err := fedClient.GetPublicRooms(ctx, gomatrixserverlib.ServerName(homeserverDomain), int(limit), "", false, "") - if err != nil { - util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn( - "bulkFetchPublicRoomsFromServers: failed to query hs", - ) - return - } - for _, room := range fres.Chunk { - // atomically send a room or stop - select { - case roomCh <- room: - case <-done: - util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms") - return - } - } - }(hs) - } - - // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request. - // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be - // closed. - go func() { - wg.Wait() - util.GetLogger(ctx).Info("Cleaning up resources") - close(roomCh) - }() - - // fan-in results with timeout. We stop when we reach the limit. -FanIn: - for len(publicRooms) < int(limit) || limit == 0 { - // add a room or timeout - select { - case room, ok := <-roomCh: - if !ok { - util.GetLogger(ctx).Info("All homeservers have been queried, returning results.") - break FanIn - } - publicRooms = append(publicRooms, room) - case <-time.After(15 * time.Second): // we've waited long enough, let's tell the client what we got. - util.GetLogger(ctx).Info("Waited 15s for federated public rooms, returning early") - break FanIn - case <-ctx.Done(): // the client hung up on us, let's stop. - util.GetLogger(ctx).Info("Client hung up, returning early") - break FanIn - } - } - // tell goroutines to stop - close(done) - - return publicRooms -} - func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, - stateAPI currentstateAPI.CurrentStateInternalAPI) (*gomatrixserverlib.RespPublicRooms, error) { + stateAPI currentstateAPI.CurrentStateInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, +) (*gomatrixserverlib.RespPublicRooms, error) { var response gomatrixserverlib.RespPublicRooms var limit int16 @@ -216,23 +85,25 @@ func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI util.GetLogger(ctx).WithError(err).Error("strconv.ParseInt failed") return nil, err } + err = nil - var queryRes roomserverAPI.QueryPublishedRoomsResponse - err = rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes) - if err != nil { - util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") - return nil, err + var rooms []gomatrixserverlib.PublicRoom + if request.Since == "" { + rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider, stateAPI) + } else { + rooms = getPublicRoomsFromCache() } - response.TotalRoomCountEstimate = len(queryRes.RoomIDs) - roomIDs, prev, next := sliceInto(queryRes.RoomIDs, offset, limit) + response.TotalRoomCountEstimate = len(rooms) + + chunk, prev, next := sliceInto(rooms, offset, limit) if prev >= 0 { response.PrevBatch = "T" + strconv.Itoa(prev) } if next >= 0 { response.NextBatch = "T" + strconv.Itoa(next) } - response.Chunk, err = fillInRooms(ctx, roomIDs, stateAPI) + response.Chunk = chunk return &response, err } @@ -273,71 +144,6 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO return nil } -// due to lots of switches -// nolint:gocyclo -func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI.CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { - avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""} - nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""} - canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""} - topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""} - guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""} - visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""} - joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""} - - var stateRes currentstateAPI.QueryBulkStateContentResponse - err := stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ - RoomIDs: roomIDs, - AllowWildcards: true, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple, - {EventType: gomatrixserverlib.MRoomMember, StateKey: "*"}, - }, - }, &stateRes) - if err != nil { - util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed") - return nil, err - } - chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs)) - i := 0 - for roomID, data := range stateRes.Rooms { - pub := gomatrixserverlib.PublicRoom{ - RoomID: roomID, - } - joinCount := 0 - var joinRule, guestAccess string - for tuple, contentVal := range data { - if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" { - joinCount++ - continue - } - switch tuple { - case avatarTuple: - pub.AvatarURL = contentVal - case nameTuple: - pub.Name = contentVal - case topicTuple: - pub.Topic = contentVal - case canonicalTuple: - pub.CanonicalAlias = contentVal - case visibilityTuple: - pub.WorldReadable = contentVal == "world_readable" - // need both of these to determine whether guests can join - case joinRuleTuple: - joinRule = contentVal - case guestTuple: - guestAccess = contentVal - } - } - if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" { - pub.GuestCanJoin = true - } - pub.JoinedMembersCount = joinCount - chunk[i] = pub - i++ - } - return chunk, nil -} - // sliceInto returns a subslice of `slice` which honours the since/limit values given. // // 0 1 2 3 4 5 6 index @@ -348,7 +154,7 @@ func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI // limit=3&since=6 => G (prev='3', next='') // // A value of '-1' for prev/next indicates no position. -func sliceInto(slice []string, since int64, limit int16) (subset []string, prev, next int) { +func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (subset []gomatrixserverlib.PublicRoom, prev, next int) { prev = -1 next = -1 @@ -371,3 +177,61 @@ func sliceInto(slice []string, since int64, limit int16) (subset []string, prev, subset = slice[since:nextIndex] return } + +func refreshPublicRoomCache( + ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, + stateAPI currentstateAPI.CurrentStateInternalAPI, +) []gomatrixserverlib.PublicRoom { + cacheMu.Lock() + defer cacheMu.Unlock() + var extraRooms []gomatrixserverlib.PublicRoom + if extRoomsProvider != nil { + extraRooms = extRoomsProvider.Rooms() + } + + var queryRes roomserverAPI.QueryPublishedRoomsResponse + err := rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") + return publicRoomsCache + } + pubRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, stateAPI) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed") + return publicRoomsCache + } + publicRoomsCache = []gomatrixserverlib.PublicRoom{} + publicRoomsCache = append(publicRoomsCache, pubRooms...) + publicRoomsCache = append(publicRoomsCache, extraRooms...) + publicRoomsCache = dedupeAndShuffle(publicRoomsCache) + + // sort by total joined member count (big to small) + sort.SliceStable(publicRoomsCache, func(i, j int) bool { + return publicRoomsCache[i].JoinedMembersCount > publicRoomsCache[j].JoinedMembersCount + }) + return publicRoomsCache +} + +func getPublicRoomsFromCache() []gomatrixserverlib.PublicRoom { + cacheMu.Lock() + defer cacheMu.Unlock() + return publicRoomsCache +} + +func dedupeAndShuffle(in []gomatrixserverlib.PublicRoom) []gomatrixserverlib.PublicRoom { + // de-duplicate rooms with the same room ID. We can join the room via any of these aliases as we know these servers + // are alive and well, so we arbitrarily pick one (purposefully shuffling them to spread the load a bit) + var publicRooms []gomatrixserverlib.PublicRoom + haveRoomIDs := make(map[string]bool) + rand.Shuffle(len(in), func(i, j int) { + in[i], in[j] = in[j], in[i] + }) + for _, r := range in { + if haveRoomIDs[r.RoomID] { + continue + } + haveRoomIDs[r.RoomID] = true + publicRooms = append(publicRooms, r) + } + return publicRooms +} diff --git a/clientapi/routing/directory_public_test.go b/clientapi/routing/directory_public_test.go index f2a1d551..bb3912b8 100644 --- a/clientapi/routing/directory_public_test.go +++ b/clientapi/routing/directory_public_test.go @@ -3,16 +3,26 @@ package routing import ( "reflect" "testing" + + "github.com/matrix-org/gomatrixserverlib" ) +func pubRoom(name string) gomatrixserverlib.PublicRoom { + return gomatrixserverlib.PublicRoom{ + Name: name, + } +} + func TestSliceInto(t *testing.T) { - slice := []string{"a", "b", "c", "d", "e", "f", "g"} + slice := []gomatrixserverlib.PublicRoom{ + pubRoom("a"), pubRoom("b"), pubRoom("c"), pubRoom("d"), pubRoom("e"), pubRoom("f"), pubRoom("g"), + } limit := int16(3) testCases := []struct { since int64 wantPrev int wantNext int - wantSubset []string + wantSubset []gomatrixserverlib.PublicRoom }{ { since: 0, diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 85c5c0d9..754fbca8 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -61,7 +61,7 @@ func Setup( transactionsCache *transactions.Cache, federationSender federationSenderAPI.FederationSenderInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, - extRoomsProvider api.ExternalPublicRoomsProvider, + extRoomsProvider api.ExtraPublicRoomsProvider, ) { publicAPIMux.Handle("/client/versions", @@ -313,11 +313,7 @@ func Setup( ).Methods(http.MethodPut, http.MethodOptions) r0mux.Handle("/publicRooms", httputil.MakeExternalAPI("public_rooms", func(req *http.Request) util.JSONResponse { - /* TODO: - if extRoomsProvider != nil { - return GetPostPublicRoomsWithExternal(req, stateAPI, fedClient, extRoomsProvider) - } */ - return GetPostPublicRooms(req, rsAPI, stateAPI) + return GetPostPublicRooms(req, rsAPI, stateAPI, extRoomsProvider) }), ).Methods(http.MethodGet, http.MethodPost, http.MethodOptions) |