aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-07-03 14:28:43 +0100
committerGitHub <noreply@github.com>2020-07-03 14:28:43 +0100
commit3a28ddfb7a01999f4681333e7c8526b2a430f357 (patch)
tree1c8486a8867fced55279849f8891ea985ee6a666 /cmd
parent3797c38ec86977e92a06679d8cc0642ed6ce6b63 (diff)
Yggdrasil demo initial public room directory (#1181)
* Don't return null to public directory request * Initial support for finding public rooms in Yggdrasil demo (incomplete) * Increase QUIC idle time to 15 minutes
Diffstat (limited to 'cmd')
-rw-r--r--cmd/dendrite-demo-yggdrasil/main.go4
-rw-r--r--cmd/dendrite-demo-yggdrasil/yggconn/node.go29
-rw-r--r--cmd/dendrite-demo-yggdrasil/yggconn/session.go4
-rw-r--r--cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go120
4 files changed, 156 insertions, 1 deletions
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go
index 7a527d87..d3e9e549 100644
--- a/cmd/dendrite-demo-yggdrasil/main.go
+++ b/cmd/dendrite-demo-yggdrasil/main.go
@@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms"
"github.com/matrix-org/dendrite/currentstateserver"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
@@ -131,6 +132,9 @@ func main() {
UserAPI: userAPI,
StateAPI: stateAPI,
//ServerKeyAPI: serverKeyAPI,
+ ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
+ ygg, fsAPI, federation,
+ ),
}
monolith.AddAllPublicRoutes(base.PublicAPIMux)
diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go
index eb176493..da184fd5 100644
--- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go
+++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go
@@ -31,6 +31,7 @@ import (
"github.com/lucas-clemente/quic-go"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert"
+ "github.com/matrix-org/gomatrixserverlib"
yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin"
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
@@ -140,7 +141,7 @@ func Setup(instanceName, instancePeer, storageDirectory string) (*Node, error) {
MaxIncomingStreams: 0,
MaxIncomingUniStreams: 0,
KeepAlive: true,
- MaxIdleTimeout: time.Second * 120,
+ MaxIdleTimeout: time.Second * 900,
HandshakeTimeout: time.Second * 30,
}
@@ -183,3 +184,29 @@ func (n *Node) SigningPrivateKey() ed25519.PrivateKey {
func (n *Node) PeerCount() int {
return len(n.core.GetPeers()) - 1
}
+
+func (n *Node) KnownNodes() []gomatrixserverlib.ServerName {
+ nodemap := map[string]struct{}{}
+ /*
+ for _, peer := range n.core.GetSwitchPeers() {
+ nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{}
+ }
+ */
+ n.sessions.Range(func(_, v interface{}) bool {
+ session, ok := v.(quic.Session)
+ if !ok {
+ return true
+ }
+ if len(session.ConnectionState().PeerCertificates) != 1 {
+ return true
+ }
+ subjectName := session.ConnectionState().PeerCertificates[0].Subject.CommonName
+ nodemap[subjectName] = struct{}{}
+ return true
+ })
+ var nodes []gomatrixserverlib.ServerName
+ for node := range nodemap {
+ nodes = append(nodes, gomatrixserverlib.ServerName(node))
+ }
+ return nodes
+}
diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/session.go b/cmd/dendrite-demo-yggdrasil/yggconn/session.go
index 857b2cc9..01cec813 100644
--- a/cmd/dendrite-demo-yggdrasil/yggconn/session.go
+++ b/cmd/dendrite-demo-yggdrasil/yggconn/session.go
@@ -20,6 +20,7 @@ import (
"crypto/rsa"
"crypto/tls"
"crypto/x509"
+ "crypto/x509/pkix"
"encoding/hex"
"encoding/pem"
"errors"
@@ -127,6 +128,9 @@ func (n *Node) generateTLSConfig() *tls.Config {
panic(err)
}
template := x509.Certificate{
+ Subject: pkix.Name{
+ CommonName: n.DerivedServerName(),
+ },
SerialNumber: big.NewInt(1),
NotAfter: time.Now().Add(time.Hour * 24 * 365),
DNSNames: []string{n.DerivedSessionName()},
diff --git a/cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go b/cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go
new file mode 100644
index 00000000..0174e84d
--- /dev/null
+++ b/cmd/dendrite-demo-yggdrasil/yggrooms/yggrooms.go
@@ -0,0 +1,120 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package yggrooms
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
+ "github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+)
+
+type YggdrasilRoomProvider struct {
+ node *yggconn.Node
+ fedSender api.FederationSenderInternalAPI
+ fedClient *gomatrixserverlib.FederationClient
+}
+
+func NewYggdrasilRoomProvider(
+ node *yggconn.Node, fedSender api.FederationSenderInternalAPI, fedClient *gomatrixserverlib.FederationClient,
+) *YggdrasilRoomProvider {
+ p := &YggdrasilRoomProvider{
+ node: node,
+ fedSender: fedSender,
+ fedClient: fedClient,
+ }
+ return p
+}
+
+func (p *YggdrasilRoomProvider) Rooms() []gomatrixserverlib.PublicRoom {
+ return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, p.node.KnownNodes())
+}
+
+// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers.
+// Returns a list of public rooms.
+func bulkFetchPublicRoomsFromServers(
+ ctx context.Context, fedClient *gomatrixserverlib.FederationClient,
+ homeservers []gomatrixserverlib.ServerName,
+) (publicRooms []gomatrixserverlib.PublicRoom) {
+ limit := 200
+ // follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
+ // goroutines send rooms to this channel
+ roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit))
+ // signalling channel to tell goroutines to stop sending rooms and quit
+ done := make(chan bool)
+ // signalling to say when we can close the room channel
+ var wg sync.WaitGroup
+ wg.Add(len(homeservers))
+ // concurrently query for public rooms
+ for _, hs := range homeservers {
+ go func(homeserverDomain gomatrixserverlib.ServerName) {
+ defer wg.Done()
+ util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
+ fres, err := fedClient.GetPublicRooms(ctx, homeserverDomain, int(limit), "", false, "")
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn(
+ "bulkFetchPublicRoomsFromServers: failed to query hs",
+ )
+ return
+ }
+ for _, room := range fres.Chunk {
+ // atomically send a room or stop
+ select {
+ case roomCh <- room:
+ case <-done:
+ util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
+ return
+ }
+ }
+ }(hs)
+ }
+
+ // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request.
+ // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be
+ // closed.
+ go func() {
+ wg.Wait()
+ util.GetLogger(ctx).Info("Cleaning up resources")
+ close(roomCh)
+ }()
+
+ // fan-in results with timeout. We stop when we reach the limit.
+FanIn:
+ for len(publicRooms) < int(limit) || limit == 0 {
+ // add a room or timeout
+ select {
+ case room, ok := <-roomCh:
+ if !ok {
+ util.GetLogger(ctx).Info("All homeservers have been queried, returning results.")
+ break FanIn
+ }
+ publicRooms = append(publicRooms, room)
+ case <-time.After(5 * time.Second): // we've waited long enough, let's tell the client what we got.
+ util.GetLogger(ctx).Info("Waited 5s for federated public rooms, returning early")
+ break FanIn
+ case <-ctx.Done(): // the client hung up on us, let's stop.
+ util.GetLogger(ctx).Info("Client hung up, returning early")
+ break FanIn
+ }
+ }
+ // tell goroutines to stop
+ close(done)
+
+ return publicRooms
+}