aboutsummaryrefslogtreecommitdiff
path: root/clientapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-03 12:59:00 +0100
committerGitHub <noreply@github.com>2020-07-03 12:59:00 +0100
commit6c4b8185d7f9b4f66cc673fd13c448dff53472c0 (patch)
tree4c544d155e8c55df7d18f45beb15e473a11f8d0a /clientapi
parent1773fd84b7634a1655e78ee5fe31e6235ed6240c (diff)
Implement ExtraPublicRoomsProvider for p2p demos (#1180)
* Change API and rename to ExtraPublicRoomsProvider * Make dendritejs work again * Maybe make libp2p demo work again * Linting
Diffstat (limited to 'clientapi')
-rw-r--r--clientapi/api/api.go11
-rw-r--r--clientapi/clientapi.go2
-rw-r--r--clientapi/routing/directory_public.go294
-rw-r--r--clientapi/routing/directory_public_test.go14
-rw-r--r--clientapi/routing/routing.go8
5 files changed, 100 insertions, 229 deletions
diff --git a/clientapi/api/api.go b/clientapi/api/api.go
index dae462c0..d96b032f 100644
--- a/clientapi/api/api.go
+++ b/clientapi/api/api.go
@@ -14,9 +14,10 @@
package api
-type ExternalPublicRoomsProvider interface {
- // The list of homeserver domains to query. These servers will receive a request
- // via this API: https://matrix.org/docs/spec/server_server/latest#public-room-directory
- // This will be called -on demand- by clients, so cache appropriately!
- Homeservers() []string
+import "github.com/matrix-org/gomatrixserverlib"
+
+// ExtraPublicRoomsProvider provides a way to inject extra published rooms into /publicRooms requests.
+type ExtraPublicRoomsProvider interface {
+ // Rooms returns the extra rooms. This is called on-demand by clients, so cache appropriately.
+ Rooms() []gomatrixserverlib.PublicRoom
}
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index bbce6dcc..029a73da 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -48,7 +48,7 @@ func AddPublicRoutes(
transactionsCache *transactions.Cache,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
userAPI userapi.UserInternalAPI,
- extRoomsProvider api.ExternalPublicRoomsProvider,
+ extRoomsProvider api.ExtraPublicRoomsProvider,
) {
syncProducer := &producers.SyncAPIProducer{
Producer: producer,
diff --git a/clientapi/routing/directory_public.go b/clientapi/routing/directory_public.go
index 64600cb4..925c1b8a 100644
--- a/clientapi/routing/directory_public.go
+++ b/clientapi/routing/directory_public.go
@@ -22,7 +22,6 @@ import (
"strconv"
"strings"
"sync"
- "time"
"github.com/matrix-org/dendrite/clientapi/api"
"github.com/matrix-org/dendrite/clientapi/httputil"
@@ -33,6 +32,11 @@ import (
"github.com/matrix-org/util"
)
+var (
+ cacheMu sync.Mutex
+ publicRoomsCache []gomatrixserverlib.PublicRoom
+)
+
type PublicRoomReq struct {
Since string `json:"since,omitempty"`
Limit int16 `json:"limit,omitempty"`
@@ -46,13 +50,15 @@ type filter struct {
// GetPostPublicRooms implements GET and POST /publicRooms
func GetPostPublicRooms(
req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
+ extRoomsProvider api.ExtraPublicRoomsProvider,
) util.JSONResponse {
var request PublicRoomReq
if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil {
return *fillErr
}
- response, err := publicRooms(req.Context(), request, rsAPI, stateAPI)
+ response, err := publicRooms(req.Context(), request, rsAPI, stateAPI, extRoomsProvider)
if err != nil {
+ util.GetLogger(req.Context()).WithError(err).Errorf("failed to work out public rooms")
return jsonerror.InternalServerError()
}
return util.JSONResponse{
@@ -61,146 +67,9 @@ func GetPostPublicRooms(
}
}
-// GetPostPublicRoomsWithExternal is the same as GetPostPublicRooms but also mixes in public rooms from the provider supplied.
-func GetPostPublicRoomsWithExternal(
- req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
- fedClient *gomatrixserverlib.FederationClient, extRoomsProvider api.ExternalPublicRoomsProvider,
-) util.JSONResponse {
- var request PublicRoomReq
- if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil {
- return *fillErr
- }
- response, err := publicRooms(req.Context(), request, rsAPI, stateAPI)
- if err != nil {
- return jsonerror.InternalServerError()
- }
-
- if request.Since != "" {
- // TODO: handle pagination tokens sensibly rather than ignoring them.
- // ignore paginated requests since we don't handle them yet over federation.
- // Only the initial request will contain federated rooms.
- return util.JSONResponse{
- Code: http.StatusOK,
- JSON: response,
- }
- }
-
- // If we have already hit the limit on the number of rooms, bail.
- var limit int
- if request.Limit > 0 {
- limit = int(request.Limit) - len(response.Chunk)
- if limit <= 0 {
- return util.JSONResponse{
- Code: http.StatusOK,
- JSON: response,
- }
- }
- }
-
- // downcasting `limit` is safe as we know it isn't bigger than request.Limit which is int16
- fedRooms := bulkFetchPublicRoomsFromServers(req.Context(), fedClient, extRoomsProvider.Homeservers(), int16(limit))
- response.Chunk = append(response.Chunk, fedRooms...)
-
- // de-duplicate rooms with the same room ID. We can join the room via any of these aliases as we know these servers
- // are alive and well, so we arbitrarily pick one (purposefully shuffling them to spread the load a bit)
- var publicRooms []gomatrixserverlib.PublicRoom
- haveRoomIDs := make(map[string]bool)
- rand.Shuffle(len(response.Chunk), func(i, j int) {
- response.Chunk[i], response.Chunk[j] = response.Chunk[j], response.Chunk[i]
- })
- for _, r := range response.Chunk {
- if haveRoomIDs[r.RoomID] {
- continue
- }
- haveRoomIDs[r.RoomID] = true
- publicRooms = append(publicRooms, r)
- }
- // sort by member count
- sort.SliceStable(publicRooms, func(i, j int) bool {
- return publicRooms[i].JoinedMembersCount > publicRooms[j].JoinedMembersCount
- })
-
- response.Chunk = publicRooms
-
- return util.JSONResponse{
- Code: http.StatusOK,
- JSON: response,
- }
-}
-
-// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers.
-// Returns a list of public rooms up to the limit specified.
-func bulkFetchPublicRoomsFromServers(
- ctx context.Context, fedClient *gomatrixserverlib.FederationClient, homeservers []string, limit int16,
-) (publicRooms []gomatrixserverlib.PublicRoom) {
- // follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
- // goroutines send rooms to this channel
- roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit))
- // signalling channel to tell goroutines to stop sending rooms and quit
- done := make(chan bool)
- // signalling to say when we can close the room channel
- var wg sync.WaitGroup
- wg.Add(len(homeservers))
- // concurrently query for public rooms
- for _, hs := range homeservers {
- go func(homeserverDomain string) {
- defer wg.Done()
- util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
- fres, err := fedClient.GetPublicRooms(ctx, gomatrixserverlib.ServerName(homeserverDomain), int(limit), "", false, "")
- if err != nil {
- util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn(
- "bulkFetchPublicRoomsFromServers: failed to query hs",
- )
- return
- }
- for _, room := range fres.Chunk {
- // atomically send a room or stop
- select {
- case roomCh <- room:
- case <-done:
- util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
- return
- }
- }
- }(hs)
- }
-
- // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request.
- // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be
- // closed.
- go func() {
- wg.Wait()
- util.GetLogger(ctx).Info("Cleaning up resources")
- close(roomCh)
- }()
-
- // fan-in results with timeout. We stop when we reach the limit.
-FanIn:
- for len(publicRooms) < int(limit) || limit == 0 {
- // add a room or timeout
- select {
- case room, ok := <-roomCh:
- if !ok {
- util.GetLogger(ctx).Info("All homeservers have been queried, returning results.")
- break FanIn
- }
- publicRooms = append(publicRooms, room)
- case <-time.After(15 * time.Second): // we've waited long enough, let's tell the client what we got.
- util.GetLogger(ctx).Info("Waited 15s for federated public rooms, returning early")
- break FanIn
- case <-ctx.Done(): // the client hung up on us, let's stop.
- util.GetLogger(ctx).Info("Client hung up, returning early")
- break FanIn
- }
- }
- // tell goroutines to stop
- close(done)
-
- return publicRooms
-}
-
func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI,
- stateAPI currentstateAPI.CurrentStateInternalAPI) (*gomatrixserverlib.RespPublicRooms, error) {
+ stateAPI currentstateAPI.CurrentStateInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
+) (*gomatrixserverlib.RespPublicRooms, error) {
var response gomatrixserverlib.RespPublicRooms
var limit int16
@@ -216,23 +85,25 @@ func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI
util.GetLogger(ctx).WithError(err).Error("strconv.ParseInt failed")
return nil, err
}
+ err = nil
- var queryRes roomserverAPI.QueryPublishedRoomsResponse
- err = rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes)
- if err != nil {
- util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
- return nil, err
+ var rooms []gomatrixserverlib.PublicRoom
+ if request.Since == "" {
+ rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider, stateAPI)
+ } else {
+ rooms = getPublicRoomsFromCache()
}
- response.TotalRoomCountEstimate = len(queryRes.RoomIDs)
- roomIDs, prev, next := sliceInto(queryRes.RoomIDs, offset, limit)
+ response.TotalRoomCountEstimate = len(rooms)
+
+ chunk, prev, next := sliceInto(rooms, offset, limit)
if prev >= 0 {
response.PrevBatch = "T" + strconv.Itoa(prev)
}
if next >= 0 {
response.NextBatch = "T" + strconv.Itoa(next)
}
- response.Chunk, err = fillInRooms(ctx, roomIDs, stateAPI)
+ response.Chunk = chunk
return &response, err
}
@@ -273,71 +144,6 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO
return nil
}
-// due to lots of switches
-// nolint:gocyclo
-func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI.CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
- avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
- nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}
- canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""}
- topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""}
- guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""}
- visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}
- joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}
-
- var stateRes currentstateAPI.QueryBulkStateContentResponse
- err := stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
- RoomIDs: roomIDs,
- AllowWildcards: true,
- StateTuples: []gomatrixserverlib.StateKeyTuple{
- nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple,
- {EventType: gomatrixserverlib.MRoomMember, StateKey: "*"},
- },
- }, &stateRes)
- if err != nil {
- util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed")
- return nil, err
- }
- chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs))
- i := 0
- for roomID, data := range stateRes.Rooms {
- pub := gomatrixserverlib.PublicRoom{
- RoomID: roomID,
- }
- joinCount := 0
- var joinRule, guestAccess string
- for tuple, contentVal := range data {
- if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" {
- joinCount++
- continue
- }
- switch tuple {
- case avatarTuple:
- pub.AvatarURL = contentVal
- case nameTuple:
- pub.Name = contentVal
- case topicTuple:
- pub.Topic = contentVal
- case canonicalTuple:
- pub.CanonicalAlias = contentVal
- case visibilityTuple:
- pub.WorldReadable = contentVal == "world_readable"
- // need both of these to determine whether guests can join
- case joinRuleTuple:
- joinRule = contentVal
- case guestTuple:
- guestAccess = contentVal
- }
- }
- if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" {
- pub.GuestCanJoin = true
- }
- pub.JoinedMembersCount = joinCount
- chunk[i] = pub
- i++
- }
- return chunk, nil
-}
-
// sliceInto returns a subslice of `slice` which honours the since/limit values given.
//
// 0 1 2 3 4 5 6 index
@@ -348,7 +154,7 @@ func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI
// limit=3&since=6 => G (prev='3', next='')
//
// A value of '-1' for prev/next indicates no position.
-func sliceInto(slice []string, since int64, limit int16) (subset []string, prev, next int) {
+func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (subset []gomatrixserverlib.PublicRoom, prev, next int) {
prev = -1
next = -1
@@ -371,3 +177,61 @@ func sliceInto(slice []string, since int64, limit int16) (subset []string, prev,
subset = slice[since:nextIndex]
return
}
+
+func refreshPublicRoomCache(
+ ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
+ stateAPI currentstateAPI.CurrentStateInternalAPI,
+) []gomatrixserverlib.PublicRoom {
+ cacheMu.Lock()
+ defer cacheMu.Unlock()
+ var extraRooms []gomatrixserverlib.PublicRoom
+ if extRoomsProvider != nil {
+ extraRooms = extRoomsProvider.Rooms()
+ }
+
+ var queryRes roomserverAPI.QueryPublishedRoomsResponse
+ err := rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes)
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
+ return publicRoomsCache
+ }
+ pubRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, stateAPI)
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed")
+ return publicRoomsCache
+ }
+ publicRoomsCache = []gomatrixserverlib.PublicRoom{}
+ publicRoomsCache = append(publicRoomsCache, pubRooms...)
+ publicRoomsCache = append(publicRoomsCache, extraRooms...)
+ publicRoomsCache = dedupeAndShuffle(publicRoomsCache)
+
+ // sort by total joined member count (big to small)
+ sort.SliceStable(publicRoomsCache, func(i, j int) bool {
+ return publicRoomsCache[i].JoinedMembersCount > publicRoomsCache[j].JoinedMembersCount
+ })
+ return publicRoomsCache
+}
+
+func getPublicRoomsFromCache() []gomatrixserverlib.PublicRoom {
+ cacheMu.Lock()
+ defer cacheMu.Unlock()
+ return publicRoomsCache
+}
+
+func dedupeAndShuffle(in []gomatrixserverlib.PublicRoom) []gomatrixserverlib.PublicRoom {
+ // de-duplicate rooms with the same room ID. We can join the room via any of these aliases as we know these servers
+ // are alive and well, so we arbitrarily pick one (purposefully shuffling them to spread the load a bit)
+ var publicRooms []gomatrixserverlib.PublicRoom
+ haveRoomIDs := make(map[string]bool)
+ rand.Shuffle(len(in), func(i, j int) {
+ in[i], in[j] = in[j], in[i]
+ })
+ for _, r := range in {
+ if haveRoomIDs[r.RoomID] {
+ continue
+ }
+ haveRoomIDs[r.RoomID] = true
+ publicRooms = append(publicRooms, r)
+ }
+ return publicRooms
+}
diff --git a/clientapi/routing/directory_public_test.go b/clientapi/routing/directory_public_test.go
index f2a1d551..bb3912b8 100644
--- a/clientapi/routing/directory_public_test.go
+++ b/clientapi/routing/directory_public_test.go
@@ -3,16 +3,26 @@ package routing
import (
"reflect"
"testing"
+
+ "github.com/matrix-org/gomatrixserverlib"
)
+func pubRoom(name string) gomatrixserverlib.PublicRoom {
+ return gomatrixserverlib.PublicRoom{
+ Name: name,
+ }
+}
+
func TestSliceInto(t *testing.T) {
- slice := []string{"a", "b", "c", "d", "e", "f", "g"}
+ slice := []gomatrixserverlib.PublicRoom{
+ pubRoom("a"), pubRoom("b"), pubRoom("c"), pubRoom("d"), pubRoom("e"), pubRoom("f"), pubRoom("g"),
+ }
limit := int16(3)
testCases := []struct {
since int64
wantPrev int
wantNext int
- wantSubset []string
+ wantSubset []gomatrixserverlib.PublicRoom
}{
{
since: 0,
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index 85c5c0d9..754fbca8 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -61,7 +61,7 @@ func Setup(
transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
- extRoomsProvider api.ExternalPublicRoomsProvider,
+ extRoomsProvider api.ExtraPublicRoomsProvider,
) {
publicAPIMux.Handle("/client/versions",
@@ -313,11 +313,7 @@ func Setup(
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/publicRooms",
httputil.MakeExternalAPI("public_rooms", func(req *http.Request) util.JSONResponse {
- /* TODO:
- if extRoomsProvider != nil {
- return GetPostPublicRoomsWithExternal(req, stateAPI, fedClient, extRoomsProvider)
- } */
- return GetPostPublicRooms(req, rsAPI, stateAPI)
+ return GetPostPublicRooms(req, rsAPI, stateAPI, extRoomsProvider)
}),
).Methods(http.MethodGet, http.MethodPost, http.MethodOptions)