aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-03-28 16:25:26 +0100
committerGitHub <noreply@github.com>2022-03-28 16:25:26 +0100
commit7972915806348847ecd9a9b8a1b1ff0609cb883c (patch)
treedfbf719e8229dc4faeef133b150b2d7a6dc7eac8 /cmd
parent0692be44d91a42945dde0eab3e2f7481cc2e0896 (diff)
User directory for nearby Pinecone peers (P2P demo) (#2311)
* User directory for nearby Pinecone peers * Fix mux routing * Use config to determine which server notices user to exclude
Diffstat (limited to 'cmd')
-rw-r--r--cmd/dendrite-demo-pinecone/main.go21
-rw-r--r--cmd/dendrite-demo-pinecone/users/users.go145
-rw-r--r--cmd/dendrite-polylith-multi/personalities/clientapi.go2
3 files changed, 160 insertions, 8 deletions
diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go
index 45f18698..87054dc8 100644
--- a/cmd/dendrite-demo-pinecone/main.go
+++ b/cmd/dendrite-demo-pinecone/main.go
@@ -35,6 +35,7 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
@@ -198,6 +199,9 @@ func main() {
rsComponent.SetFederationAPI(fsAPI, keyRing)
+ userProvider := users.NewPineconeUserProvider(pRouter, pQUIC, userAPI, federation)
+ roomProvider := rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation)
+
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
@@ -205,13 +209,14 @@ func main() {
FedClient: federation,
KeyRing: keyRing,
- AppserviceAPI: asAPI,
- EDUInternalAPI: eduInputAPI,
- FederationAPI: fsAPI,
- RoomserverAPI: rsAPI,
- UserAPI: userAPI,
- KeyAPI: keyAPI,
- ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation),
+ AppserviceAPI: asAPI,
+ EDUInternalAPI: eduInputAPI,
+ FederationAPI: fsAPI,
+ RoomserverAPI: rsAPI,
+ UserAPI: userAPI,
+ KeyAPI: keyAPI,
+ ExtPublicRoomsProvider: roomProvider,
+ ExtUserDirectoryProvider: userProvider,
}
monolith.AddAllPublicRoutes(
base.ProcessContext,
@@ -250,10 +255,12 @@ func main() {
embed.Embed(httpRouter, *instancePort, "Pinecone Demo")
pMux := mux.NewRouter().SkipClean(true).UseEncodedPath()
+ pMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles)
pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux)
pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
pHTTP := pQUIC.HTTP()
+ pHTTP.Mux().Handle(users.PublicURL, pMux)
pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux)
pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux)
diff --git a/cmd/dendrite-demo-pinecone/users/users.go b/cmd/dendrite-demo-pinecone/users/users.go
new file mode 100644
index 00000000..ffbd27ee
--- /dev/null
+++ b/cmd/dendrite-demo-pinecone/users/users.go
@@ -0,0 +1,145 @@
+package users
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/matrix-org/dendrite/clientapi/auth/authtypes"
+ clienthttputil "github.com/matrix-org/dendrite/clientapi/httputil"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+
+ pineconeRouter "github.com/matrix-org/pinecone/router"
+ pineconeSessions "github.com/matrix-org/pinecone/sessions"
+)
+
+type PineconeUserProvider struct {
+ r *pineconeRouter.Router
+ s *pineconeSessions.Sessions
+ userAPI userapi.UserProfileAPI
+ fedClient *gomatrixserverlib.FederationClient
+}
+
+const PublicURL = "/_matrix/p2p/profiles"
+
+func NewPineconeUserProvider(
+ r *pineconeRouter.Router,
+ s *pineconeSessions.Sessions,
+ userAPI userapi.UserProfileAPI,
+ fedClient *gomatrixserverlib.FederationClient,
+) *PineconeUserProvider {
+ p := &PineconeUserProvider{
+ r: r,
+ s: s,
+ userAPI: userAPI,
+ fedClient: fedClient,
+ }
+ return p
+}
+
+func (p *PineconeUserProvider) FederatedUserProfiles(w http.ResponseWriter, r *http.Request) {
+ req := &userapi.QuerySearchProfilesRequest{Limit: 25}
+ res := &userapi.QuerySearchProfilesResponse{}
+ if err := clienthttputil.UnmarshalJSONRequest(r, &req); err != nil {
+ w.WriteHeader(400)
+ return
+ }
+ if err := p.userAPI.QuerySearchProfiles(r.Context(), req, res); err != nil {
+ w.WriteHeader(400)
+ return
+ }
+ j, err := json.Marshal(res)
+ if err != nil {
+ w.WriteHeader(400)
+ return
+ }
+ w.WriteHeader(200)
+ _, _ = w.Write(j)
+}
+
+func (p *PineconeUserProvider) QuerySearchProfiles(ctx context.Context, req *userapi.QuerySearchProfilesRequest, res *userapi.QuerySearchProfilesResponse) error {
+ list := map[string]struct{}{}
+ for _, k := range p.r.Peers() {
+ list[k.PublicKey] = struct{}{}
+ }
+ res.Profiles = bulkFetchUserDirectoriesFromServers(context.Background(), req, p.fedClient, list)
+ return nil
+}
+
+// bulkFetchUserDirectoriesFromServers fetches users from the list of homeservers.
+// Returns a list of user profiles.
+func bulkFetchUserDirectoriesFromServers(
+ ctx context.Context, req *userapi.QuerySearchProfilesRequest,
+ fedClient *gomatrixserverlib.FederationClient,
+ homeservers map[string]struct{},
+) (profiles []authtypes.Profile) {
+ jsonBody, err := json.Marshal(req)
+ if err != nil {
+ return nil
+ }
+
+ limit := 200
+ // follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
+ // goroutines send rooms to this channel
+ profileCh := make(chan authtypes.Profile, int(limit))
+ // signalling channel to tell goroutines to stop sending rooms and quit
+ done := make(chan bool)
+ // signalling to say when we can close the room channel
+ var wg sync.WaitGroup
+ wg.Add(len(homeservers))
+ // concurrently query for public rooms
+ reqctx, reqcancel := context.WithTimeout(ctx, time.Second*5)
+ for hs := range homeservers {
+ go func(homeserverDomain string) {
+ defer wg.Done()
+ util.GetLogger(reqctx).WithField("hs", homeserverDomain).Info("Querying HS for users")
+
+ jsonBodyReader := bytes.NewBuffer(jsonBody)
+ httpReq, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("matrix://%s%s", homeserverDomain, PublicURL), jsonBodyReader)
+ if err != nil {
+ util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn(
+ "bulkFetchUserDirectoriesFromServers: failed to create request",
+ )
+ }
+ res := &userapi.QuerySearchProfilesResponse{}
+ if err = fedClient.DoRequestAndParseResponse(reqctx, httpReq, res); err != nil {
+ util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn(
+ "bulkFetchUserDirectoriesFromServers: failed to query hs",
+ )
+ return
+ }
+ for _, profile := range res.Profiles {
+ profile.ServerName = homeserverDomain
+ // atomically send a room or stop
+ select {
+ case profileCh <- profile:
+ case <-done:
+ case <-reqctx.Done():
+ util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending profiles")
+ return
+ }
+ }
+ }(hs)
+ }
+
+ select {
+ case <-time.After(5 * time.Second):
+ default:
+ wg.Wait()
+ }
+ reqcancel()
+ close(done)
+ close(profileCh)
+
+ for profile := range profileCh {
+ profiles = append(profiles, profile)
+ }
+
+ return profiles
+}
diff --git a/cmd/dendrite-polylith-multi/personalities/clientapi.go b/cmd/dendrite-polylith-multi/personalities/clientapi.go
index a2036de3..978d8b0a 100644
--- a/cmd/dendrite-polylith-multi/personalities/clientapi.go
+++ b/cmd/dendrite-polylith-multi/personalities/clientapi.go
@@ -33,7 +33,7 @@ func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
clientapi.AddPublicRoutes(
base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI,
- federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI,
+ federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, userAPI,
keyAPI, nil, &cfg.MSCs,
)