aboutsummaryrefslogtreecommitdiff
path: root/cmd/dendrite-demo-pinecone/users/users.go
blob: ffbd27ee9fb351d3049d6f83d005b48981dd1d96 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package users

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
	clienthttputil "github.com/matrix-org/dendrite/clientapi/httputil"
	userapi "github.com/matrix-org/dendrite/userapi/api"
	"github.com/matrix-org/gomatrixserverlib"
	"github.com/matrix-org/util"

	pineconeRouter "github.com/matrix-org/pinecone/router"
	pineconeSessions "github.com/matrix-org/pinecone/sessions"
)

// PineconeUserProvider searches user profiles across Pinecone peers. It both
// serves profile searches against the local user API (FederatedUserProfiles)
// and fans a search out to every peer known to the router
// (QuerySearchProfiles).
type PineconeUserProvider struct {
	// r is the Pinecone router; its Peers() list supplies the servers to query.
	r         *pineconeRouter.Router
	// s is the Pinecone sessions handle supplied at construction time.
	// NOTE(review): not referenced by any method visible in this file — confirm
	// whether it is still needed.
	s         *pineconeSessions.Sessions
	// userAPI answers profile searches for users known to this node.
	userAPI   userapi.UserProfileAPI
	// fedClient performs the outgoing profile-search requests to peers.
	fedClient *gomatrixserverlib.FederationClient
}

// PublicURL is the URL path that bulkFetchUserDirectoriesFromServers requests
// on each peer to search its user directory.
// NOTE(review): presumably FederatedUserProfiles is mounted at this path by
// the caller that wires up the HTTP routes — confirm against the router setup.
const PublicURL = "/_matrix/p2p/profiles"

// NewPineconeUserProvider builds a PineconeUserProvider from the given Pinecone
// router and sessions, the local user profile API and the federation client
// used to reach peers. The arguments are stored as-is; no work is started.
func NewPineconeUserProvider(
	r *pineconeRouter.Router,
	s *pineconeSessions.Sessions,
	userAPI userapi.UserProfileAPI,
	fedClient *gomatrixserverlib.FederationClient,
) *PineconeUserProvider {
	return &PineconeUserProvider{
		fedClient: fedClient,
		userAPI:   userAPI,
		s:         s,
		r:         r,
	}
}

// FederatedUserProfiles is the HTTP handler that answers a profile search
// using the local user API.
//
// The request body is decoded into a userapi.QuerySearchProfilesRequest whose
// Limit is pre-set to 25, so that value applies when the body does not
// override it. On success the handler replies 200 with the JSON encoding of
// the userapi.QuerySearchProfilesResponse.
//
// Status codes:
//   - 400 when the request body cannot be decoded.
//   - 500 when the local user API fails or the response cannot be encoded.
func (p *PineconeUserProvider) FederatedUserProfiles(w http.ResponseWriter, r *http.Request) {
	req := &userapi.QuerySearchProfilesRequest{Limit: 25}
	res := &userapi.QuerySearchProfilesResponse{}
	// Pass req itself, not &req. Assuming UnmarshalJSONRequest decodes with
	// encoding/json, a pointer-to-pointer target lets a literal `null` body
	// reset req to nil, and that nil would then be handed to the user API.
	if err := clienthttputil.UnmarshalJSONRequest(r, req); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	if err := p.userAPI.QuerySearchProfiles(r.Context(), req, res); err != nil {
		// The request was well-formed; the failure is on our side.
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	j, err := json.Marshal(res)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	// Best effort: the status line is already sent, so a failed write (for
	// example the peer hanging up) cannot be reported to the client.
	_, _ = w.Write(j)
}

// QuerySearchProfiles runs the profile search in req against every peer
// reported by the Pinecone router and stores the combined results in
// res.Profiles.
//
// Each peer's public key is used as the server name to query. Peers are
// de-duplicated by public key before the fan-out. ctx is passed on to the
// outgoing requests, so cancelling it abandons the search early. Failures of
// individual peers are handled by the fetcher, so this method always returns
// nil.
func (p *PineconeUserProvider) QuerySearchProfiles(ctx context.Context, req *userapi.QuerySearchProfilesRequest, res *userapi.QuerySearchProfilesResponse) error {
	peers := p.r.Peers()
	servers := make(map[string]struct{}, len(peers))
	for _, peer := range peers {
		servers[peer.PublicKey] = struct{}{}
	}
	// Use the caller's context rather than context.Background() so that
	// cancellation, deadlines and any context-scoped logger propagate.
	res.Profiles = bulkFetchUserDirectoriesFromServers(ctx, req, p.fedClient, servers)
	return nil
}

// bulkFetchUserDirectoriesFromServers sends the profile search in req to every
// server in homeservers concurrently and returns all profiles received.
//
// Parameters:
//   - ctx: parent context; the whole fan-out is additionally bounded by a
//     five second timeout derived from it.
//   - req: the search request, JSON-encoded once and sent as the body of each
//     outgoing request.
//   - fedClient: client used to perform the requests.
//   - homeservers: set of server names to query (the map keys).
//
// Each returned profile has ServerName set to the server that supplied it.
// A server that cannot be queried is logged and skipped. The result is nil
// when req cannot be encoded or when no profiles were received. The order of
// the returned profiles is not deterministic.
func bulkFetchUserDirectoriesFromServers(
	ctx context.Context, req *userapi.QuerySearchProfilesRequest,
	fedClient *gomatrixserverlib.FederationClient,
	homeservers map[string]struct{},
) (profiles []authtypes.Profile) {
	const (
		// queryTimeout bounds the entire fan-out, not each request.
		queryTimeout = 5 * time.Second
		// profileBuffer lets senders run ahead of the collector a little.
		profileBuffer = 200
	)

	jsonBody, err := json.Marshal(req)
	if err != nil {
		return nil
	}

	reqctx, reqcancel := context.WithTimeout(ctx, queryTimeout)
	defer reqcancel()

	// Follow pipeline semantics, see https://blog.golang.org/pipelines for
	// more info: one goroutine per homeserver sends profiles to profileCh and
	// the calling goroutine collects them.
	profileCh := make(chan authtypes.Profile, profileBuffer)
	var wg sync.WaitGroup
	wg.Add(len(homeservers))
	for hs := range homeservers {
		go func(homeserverDomain string) {
			defer wg.Done()
			logger := util.GetLogger(reqctx).WithField("hs", homeserverDomain)
			logger.Info("Querying HS for users")

			httpReq, err := http.NewRequestWithContext(
				reqctx, "GET",
				fmt.Sprintf("matrix://%s%s", homeserverDomain, PublicURL),
				bytes.NewReader(jsonBody),
			)
			if err != nil {
				logger.WithError(err).Warn(
					"bulkFetchUserDirectoriesFromServers: failed to create request",
				)
				// Without a request there is nothing to send; continuing
				// would pass a nil *http.Request to the client.
				return
			}
			res := &userapi.QuerySearchProfilesResponse{}
			if err = fedClient.DoRequestAndParseResponse(reqctx, httpReq, res); err != nil {
				logger.WithError(err).Warn(
					"bulkFetchUserDirectoriesFromServers: failed to query hs",
				)
				return
			}
			for _, profile := range res.Profiles {
				profile.ServerName = homeserverDomain
				// Send a profile, or give up once the deadline has passed or
				// the caller cancelled.
				select {
				case profileCh <- profile:
				case <-reqctx.Done():
					logger.WithError(reqctx.Err()).Info("Interrupted whilst sending profiles")
					return
				}
			}
		}(hs)
	}

	// Close the channel only after every sender has exited, so that no send
	// can hit a closed channel and the collector loop below terminates.
	go func() {
		wg.Wait()
		close(profileCh)
	}()

	// Collect while the senders are still running. Draining only after they
	// finish would stall them as soon as the buffer filled up.
	for profile := range profileCh {
		profiles = append(profiles, profile)
	}

	return profiles
}