diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-03-28 16:25:26 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-28 16:25:26 +0100 |
commit | 7972915806348847ecd9a9b8a1b1ff0609cb883c (patch) | |
tree | dfbf719e8229dc4faeef133b150b2d7a6dc7eac8 /cmd | |
parent | 0692be44d91a42945dde0eab3e2f7481cc2e0896 (diff) |
User directory for nearby Pinecone peers (P2P demo) (#2311)
* User directory for nearby Pinecone peers
* Fix mux routing
* Use config to determine which server notices user to exclude
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/dendrite-demo-pinecone/main.go | 21 | ||||
-rw-r--r-- | cmd/dendrite-demo-pinecone/users/users.go | 145 | ||||
-rw-r--r-- | cmd/dendrite-polylith-multi/personalities/clientapi.go | 2 |
3 files changed, 160 insertions, 8 deletions
diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go index 45f18698..87054dc8 100644 --- a/cmd/dendrite-demo-pinecone/main.go +++ b/cmd/dendrite-demo-pinecone/main.go @@ -35,6 +35,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed" "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" @@ -198,6 +199,9 @@ func main() { rsComponent.SetFederationAPI(fsAPI, keyRing) + userProvider := users.NewPineconeUserProvider(pRouter, pQUIC, userAPI, federation) + roomProvider := rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation) + monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -205,13 +209,14 @@ func main() { FedClient: federation, KeyRing: keyRing, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationAPI: fsAPI, - RoomserverAPI: rsAPI, - UserAPI: userAPI, - KeyAPI: keyAPI, - ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation), + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, + ExtPublicRoomsProvider: roomProvider, + ExtUserDirectoryProvider: userProvider, } monolith.AddAllPublicRoutes( base.ProcessContext, @@ -250,10 +255,12 @@ func main() { embed.Embed(httpRouter, *instancePort, "Pinecone Demo") pMux := mux.NewRouter().SkipClean(true).UseEncodedPath() + pMux.PathPrefix(users.PublicURL).HandlerFunc(userProvider.FederatedUserProfiles) pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux) pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) pHTTP := pQUIC.HTTP() + pHTTP.Mux().Handle(users.PublicURL, pMux) pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) diff --git a/cmd/dendrite-demo-pinecone/users/users.go b/cmd/dendrite-demo-pinecone/users/users.go new file mode 100644 index 00000000..ffbd27ee --- /dev/null +++ b/cmd/dendrite-demo-pinecone/users/users.go @@ -0,0 +1,145 @@ +package users + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "github.com/matrix-org/dendrite/clientapi/auth/authtypes" + clienthttputil "github.com/matrix-org/dendrite/clientapi/httputil" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + + pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeSessions "github.com/matrix-org/pinecone/sessions" +) + +type PineconeUserProvider struct { + r *pineconeRouter.Router + s *pineconeSessions.Sessions + userAPI userapi.UserProfileAPI + fedClient *gomatrixserverlib.FederationClient +} + +const PublicURL = "/_matrix/p2p/profiles" + +func NewPineconeUserProvider( + r *pineconeRouter.Router, + s *pineconeSessions.Sessions, + userAPI userapi.UserProfileAPI, + fedClient *gomatrixserverlib.FederationClient, +) *PineconeUserProvider { + p := &PineconeUserProvider{ + r: r, + s: s, + userAPI: userAPI, + fedClient: fedClient, + } + return p +} + +func (p *PineconeUserProvider) FederatedUserProfiles(w http.ResponseWriter, r *http.Request) { + req := &userapi.QuerySearchProfilesRequest{Limit: 25} + res := &userapi.QuerySearchProfilesResponse{} + if err := clienthttputil.UnmarshalJSONRequest(r, &req); err != nil { + w.WriteHeader(400) + return + } + if err := p.userAPI.QuerySearchProfiles(r.Context(), req, res); err != nil { + w.WriteHeader(400) + return + } + j, err := json.Marshal(res) + if err != nil { + w.WriteHeader(400) + return + } + w.WriteHeader(200) + _, _ = w.Write(j) +} + +func (p *PineconeUserProvider) QuerySearchProfiles(ctx context.Context, req *userapi.QuerySearchProfilesRequest, res *userapi.QuerySearchProfilesResponse) error { + list := map[string]struct{}{} + for _, k := range p.r.Peers() { + list[k.PublicKey] = struct{}{} + } + res.Profiles = bulkFetchUserDirectoriesFromServers(context.Background(), req, p.fedClient, list) + return nil +} + +// bulkFetchUserDirectoriesFromServers fetches users from the list of homeservers. +// Returns a list of user profiles. +func bulkFetchUserDirectoriesFromServers( + ctx context.Context, req *userapi.QuerySearchProfilesRequest, + fedClient *gomatrixserverlib.FederationClient, + homeservers map[string]struct{}, +) (profiles []authtypes.Profile) { + jsonBody, err := json.Marshal(req) + if err != nil { + return nil + } + + limit := 200 + // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. + // goroutines send rooms to this channel + profileCh := make(chan authtypes.Profile, int(limit)) + // signalling channel to tell goroutines to stop sending rooms and quit + done := make(chan bool) + // signalling to say when we can close the room channel + var wg sync.WaitGroup + wg.Add(len(homeservers)) + // concurrently query for public rooms + reqctx, reqcancel := context.WithTimeout(ctx, time.Second*5) + for hs := range homeservers { + go func(homeserverDomain string) { + defer wg.Done() + util.GetLogger(reqctx).WithField("hs", homeserverDomain).Info("Querying HS for users") + + jsonBodyReader := bytes.NewBuffer(jsonBody) + httpReq, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("matrix://%s%s", homeserverDomain, PublicURL), jsonBodyReader) + if err != nil { + util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchUserDirectoriesFromServers: failed to create request", + ) + } + res := &userapi.QuerySearchProfilesResponse{} + if err = fedClient.DoRequestAndParseResponse(reqctx, httpReq, res); err != nil { + util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchUserDirectoriesFromServers: failed to query hs", + ) + return + } + for _, profile := range res.Profiles { + profile.ServerName = homeserverDomain + // atomically send a room or stop + select { + case profileCh <- profile: + case <-done: + case <-reqctx.Done(): + util.GetLogger(reqctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending profiles") + return + } + } + }(hs) + } + + select { + case <-time.After(5 * time.Second): + default: + wg.Wait() + } + reqcancel() + close(done) + close(profileCh) + + for profile := range profileCh { + profiles = append(profiles, profile) + } + + return profiles +} diff --git a/cmd/dendrite-polylith-multi/personalities/clientapi.go b/cmd/dendrite-polylith-multi/personalities/clientapi.go index a2036de3..978d8b0a 100644 --- a/cmd/dendrite-polylith-multi/personalities/clientapi.go +++ b/cmd/dendrite-polylith-multi/personalities/clientapi.go @@ -33,7 +33,7 @@ func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) { clientapi.AddPublicRoutes( base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI, - federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, + federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, userAPI, keyAPI, nil, &cfg.MSCs, ) |