diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-07-03 14:28:43 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-03 14:28:43 +0100 |
commit | 3a28ddfb7a01999f4681333e7c8526b2a430f357 (patch) | |
tree | 1c8486a8867fced55279849f8891ea985ee6a666 /cmd | |
parent | 3797c38ec86977e92a06679d8cc0642ed6ce6b63 (diff) |
Yggdrasil demo initial public room directory (#1181)
* Don't return null to public directory request
* Initial support for finding public rooms in Yggdrasil demo (incomplete)
* Increase QUIC idle time to 15 minutes
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/dendrite-demo-yggdrasil/main.go | 4 | ||||
-rw-r--r-- | cmd/dendrite-demo-yggdrasil/yggconn/node.go | 29 | ||||
-rw-r--r-- | cmd/dendrite-demo-yggdrasil/yggconn/session.go | 4 | ||||
-rw-r--r-- | cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go | 120 |
4 files changed, 156 insertions, 1 deletions
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 7a527d87..d3e9e549 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -27,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms" "github.com/matrix-org/dendrite/currentstateserver" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" @@ -131,6 +132,9 @@ func main() { UserAPI: userAPI, StateAPI: stateAPI, //ServerKeyAPI: serverKeyAPI, + ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider( + ygg, fsAPI, federation, + ), } monolith.AddAllPublicRoutes(base.PublicAPIMux) diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go index eb176493..da184fd5 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go @@ -31,6 +31,7 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert" + "github.com/matrix-org/gomatrixserverlib" yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin" yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" @@ -140,7 +141,7 @@ func Setup(instanceName, instancePeer, storageDirectory string) (*Node, error) { MaxIncomingStreams: 0, MaxIncomingUniStreams: 0, KeepAlive: true, - MaxIdleTimeout: time.Second * 120, + MaxIdleTimeout: time.Second * 900, HandshakeTimeout: time.Second * 30, } @@ -183,3 +184,29 @@ func (n *Node) SigningPrivateKey() ed25519.PrivateKey { func (n *Node) PeerCount() int { return len(n.core.GetPeers()) - 1 } + +func (n *Node) KnownNodes() []gomatrixserverlib.ServerName { + nodemap := map[string]struct{}{} + /* + for _, peer := range n.core.GetSwitchPeers() { + nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{} + } + */ + n.sessions.Range(func(_, v interface{}) bool { + session, ok := v.(quic.Session) + if !ok { + return true + } + if len(session.ConnectionState().PeerCertificates) != 1 { + return true + } + subjectName := session.ConnectionState().PeerCertificates[0].Subject.CommonName + nodemap[subjectName] = struct{}{} + return true + }) + var nodes []gomatrixserverlib.ServerName + for node := range nodemap { + nodes = append(nodes, gomatrixserverlib.ServerName(node)) + } + return nodes +} diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/session.go b/cmd/dendrite-demo-yggdrasil/yggconn/session.go index 857b2cc9..01cec813 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/session.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/session.go @@ -20,6 +20,7 @@ import ( "crypto/rsa" "crypto/tls" "crypto/x509" + "crypto/x509/pkix" "encoding/hex" "encoding/pem" "errors" @@ -127,6 +128,9 @@ func (n *Node) generateTLSConfig() *tls.Config { panic(err) } template := x509.Certificate{ + Subject: pkix.Name{ + CommonName: n.DerivedServerName(), + }, SerialNumber: big.NewInt(1), NotAfter: time.Now().Add(time.Hour * 24 * 365), DNSNames: []string{n.DerivedSessionName()}, diff --git a/cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go b/cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go new file mode 100644 index 00000000..0174e84d --- /dev/null +++ b/cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go @@ -0,0 +1,120 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package yggrooms + +import ( + "context" + "sync" + "time" + + "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn" + "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +type YggdrasilRoomProvider struct { + node *yggconn.Node + fedSender api.FederationSenderInternalAPI + fedClient *gomatrixserverlib.FederationClient +} + +func NewYggdrasilRoomProvider( + node *yggconn.Node, fedSender api.FederationSenderInternalAPI, fedClient *gomatrixserverlib.FederationClient, +) *YggdrasilRoomProvider { + p := &YggdrasilRoomProvider{ + node: node, + fedSender: fedSender, + fedClient: fedClient, + } + return p +} + +func (p *YggdrasilRoomProvider) Rooms() []gomatrixserverlib.PublicRoom { + return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, p.node.KnownNodes()) +} + +// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers. +// Returns a list of public rooms. +func bulkFetchPublicRoomsFromServers( + ctx context.Context, fedClient *gomatrixserverlib.FederationClient, + homeservers []gomatrixserverlib.ServerName, +) (publicRooms []gomatrixserverlib.PublicRoom) { + limit := 200 + // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. + // goroutines send rooms to this channel + roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit)) + // signalling channel to tell goroutines to stop sending rooms and quit + done := make(chan bool) + // signalling to say when we can close the room channel + var wg sync.WaitGroup + wg.Add(len(homeservers)) + // concurrently query for public rooms + for _, hs := range homeservers { + go func(homeserverDomain gomatrixserverlib.ServerName) { + defer wg.Done() + util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms") + fres, err := fedClient.GetPublicRooms(ctx, homeserverDomain, int(limit), "", false, "") + if err != nil { + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchPublicRoomsFromServers: failed to query hs", + ) + return + } + for _, room := range fres.Chunk { + // atomically send a room or stop + select { + case roomCh <- room: + case <-done: + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms") + return + } + } + }(hs) + } + + // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request. + // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be + // closed. + go func() { + wg.Wait() + util.GetLogger(ctx).Info("Cleaning up resources") + close(roomCh) + }() + + // fan-in results with timeout. We stop when we reach the limit. +FanIn: + for len(publicRooms) < int(limit) || limit == 0 { + // add a room or timeout + select { + case room, ok := <-roomCh: + if !ok { + util.GetLogger(ctx).Info("All homeservers have been queried, returning results.") + break FanIn + } + publicRooms = append(publicRooms, room) + case <-time.After(5 * time.Second): // we've waited long enough, let's tell the client what we got. + util.GetLogger(ctx).Info("Waited 5s for federated public rooms, returning early") + break FanIn + case <-ctx.Done(): // the client hung up on us, let's stop. + util.GetLogger(ctx).Info("Client hung up, returning early") + break FanIn + } + } + // tell goroutines to stop + close(done) + + return publicRooms +} |