1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
package users
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
clienthttputil "github.com/matrix-org/dendrite/clientapi/httputil"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
pineconeRouter "github.com/matrix-org/pinecone/router"
pineconeSessions "github.com/matrix-org/pinecone/sessions"
)
// PineconeUserProvider searches for user profiles across the homeservers that
// are currently visible as peers of the local Pinecone router.
type PineconeUserProvider struct {
// r is the Pinecone router; its peer list determines which servers are queried.
r *pineconeRouter.Router
// s holds the Pinecone sessions supplied at construction time.
s *pineconeSessions.Sessions
// userAPI answers profile searches received from remote peers.
userAPI userapi.UserProfileAPI
// fedClient is used to send profile search requests to peers.
fedClient *gomatrixserverlib.FederationClient
}
// PublicURL is the request path appended to a peer's "matrix://<server>" address
// when asking that peer to search its user profiles.
const PublicURL = "/_matrix/p2p/profiles"
// NewPineconeUserProvider returns a PineconeUserProvider wired up with the
// given Pinecone router and sessions, the user profile API and the federation
// client used to reach peers.
func NewPineconeUserProvider(
	r *pineconeRouter.Router,
	s *pineconeSessions.Sessions,
	userAPI userapi.UserProfileAPI,
	fedClient *gomatrixserverlib.FederationClient,
) *PineconeUserProvider {
	return &PineconeUserProvider{
		fedClient: fedClient,
		userAPI:   userAPI,
		s:         s,
		r:         r,
	}
}
// FederatedUserProfiles is the HTTP handler that serves profile searches sent
// by remote peers. The request body is decoded into a QuerySearchProfilesRequest
// (defaulting Limit to 25), the search is run against the local user API and
// the response is written back as JSON.
//
// Status codes:
//   - 400 when the request body cannot be decoded,
//   - 500 when the local search or the response encoding fails,
//   - 200 with a JSON body on success.
func (p *PineconeUserProvider) FederatedUserProfiles(w http.ResponseWriter, r *http.Request) {
	req := &userapi.QuerySearchProfilesRequest{Limit: 25}
	res := &userapi.QuerySearchProfilesResponse{}

	// Decode into req itself rather than &req: decoding a JSON `null` through a
	// pointer-to-pointer would reset req to nil and lose the defaults above.
	if err := clienthttputil.UnmarshalJSONRequest(r, req); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Failures past this point are not the client's fault, so report them as
	// server errors instead of 400.
	if err := p.userAPI.QuerySearchProfiles(r.Context(), req, res); err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	j, err := json.Marshal(res)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	// A write error means the peer went away; there is nothing left to report to.
	_, _ = w.Write(j)
}
// QuerySearchProfiles runs the profile search in req against every peer
// currently reported by the Pinecone router and stores the merged results in
// res.Profiles. Peers are de-duplicated by public key before being queried.
//
// ctx is forwarded to the outgoing requests, so cancelling it aborts the
// search. The returned error is always nil; peers that fail are skipped.
func (p *PineconeUserProvider) QuerySearchProfiles(ctx context.Context, req *userapi.QuerySearchProfilesRequest, res *userapi.QuerySearchProfilesResponse) error {
	peers := p.r.Peers()
	list := make(map[string]struct{}, len(peers))
	for _, k := range peers {
		list[k.PublicKey] = struct{}{}
	}
	res.Profiles = bulkFetchUserDirectoriesFromServers(ctx, req, p.fedClient, list)
	return nil
}
// bulkFetchUserDirectoriesFromServers sends the profile search req to every
// homeserver in homeservers concurrently and returns the profiles they report.
//
// Each returned profile has its ServerName set to the homeserver it came from.
// Servers that cannot be queried are logged and skipped. All requests share a
// single 5 second deadline derived from ctx, and at most 200 profiles are
// returned. The result is nil if req cannot be encoded or no profiles arrive.
func bulkFetchUserDirectoriesFromServers(
	ctx context.Context, req *userapi.QuerySearchProfilesRequest,
	fedClient *gomatrixserverlib.FederationClient,
	homeservers map[string]struct{},
) (profiles []authtypes.Profile) {
	jsonBody, err := json.Marshal(req)
	if err != nil {
		return nil
	}

	// Upper bound on the number of profiles collected across all servers.
	const maxProfiles = 200

	// One deadline bounds every request and every pending channel send, so no
	// goroutine can outlive it. Cancelling on return releases any stragglers.
	reqctx, reqcancel := context.WithTimeout(ctx, time.Second*5)
	defer reqcancel()

	// Follows pipeline semantics, see https://blog.golang.org/pipelines:
	// workers send profiles here and the channel is closed once all are done.
	profileCh := make(chan authtypes.Profile, maxProfiles)

	var wg sync.WaitGroup
	wg.Add(len(homeservers))
	for hs := range homeservers {
		go func(homeserverDomain string) {
			defer wg.Done()
			logger := util.GetLogger(reqctx).WithField("hs", homeserverDomain)
			logger.Info("Querying HS for users")

			httpReq, err := http.NewRequestWithContext(
				reqctx, http.MethodGet,
				fmt.Sprintf("matrix://%s%s", homeserverDomain, PublicURL),
				bytes.NewReader(jsonBody),
			)
			if err != nil {
				logger.WithError(err).Warn(
					"bulkFetchUserDirectoriesFromServers: failed to create request",
				)
				// Without a request there is nothing to send; carrying on
				// would hand a nil request to the federation client.
				return
			}

			res := &userapi.QuerySearchProfilesResponse{}
			if err = fedClient.DoRequestAndParseResponse(reqctx, httpReq, res); err != nil {
				logger.WithError(err).Warn(
					"bulkFetchUserDirectoriesFromServers: failed to query hs",
				)
				return
			}

			for _, profile := range res.Profiles {
				profile.ServerName = homeserverDomain
				// Send the profile, or give up once the deadline passes or
				// the collector has stopped.
				select {
				case profileCh <- profile:
				case <-reqctx.Done():
					logger.WithError(reqctx.Err()).Info("Interrupted whilst sending profiles")
					return
				}
			}
		}(hs)
	}

	// Only close the channel after every sender has returned, so a send can
	// never hit a closed channel.
	go func() {
		wg.Wait()
		close(profileCh)
	}()

	// Drain while the workers are still running so a full buffer never stalls
	// them until the deadline.
	for profile := range profileCh {
		profiles = append(profiles, profile)
		if len(profiles) >= maxProfiles {
			// The deferred cancel unblocks any workers still trying to send.
			break
		}
	}
	return profiles
}
|