aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-05-06 12:00:42 +0100
committerGitHub <noreply@github.com>2021-05-06 12:00:42 +0100
commit1002e87b60659291af964c6d07c3a9057a3ed9b7 (patch)
tree81ca40385ac4072826e05d46c0d0a1562f13fa4e /cmd
parent464b908bd0c13854b3f6b9a17467f39e0608dc08 (diff)
Pinecone P2P demo (#1856)
* Pinecone demo * Enable multicast, fix HTTP routing * Fix multicast import * Fix build * Update Pinecone demo * Fix the keys * Tweaks * Pinecone room directory support (early) * Fix gobind-pinecone * Add pinecone listener * Fix public key value * Use AuthenticatedConnect for dial * Fix gobind-pinecone * Stop panics * Give fsAPI to keyserver * Pinecone demo fixes * Update gobind build scripts * Account creation * Tweaks * Setup tweaks * API tweaks * API tweaks * API tweaks * Port mutex * Re-enable multicast * Add ReadCopy * Update quic-go, fixes * Shutdowns fixed for iOS * Update build script * Add WebSocket support * Bug fixes * Netconn context * Fix WebSocket connectivity * Fixes to gobind API * Strip frameworks * Configurability updates * Update go.mod * Update go.mod/go.sum * Update go.mod/go.sum * Update go.mod/go.sum * Try to stay connected tto static peer * Update gobind-pinecone * Update go.mod/go.sum * Test uTP+TLS * Use HTTP/2 * Don't use HTTP/2 * Update go.mod/go.sum * Attempt to reconnect to the static peer if it drops * Stay connected to static peers more stickily * Retry room directory lookups if they fail * NewQUIC -> NewSessions * Storage updates * Don't return immediately when there's nothing to sync * Updates * Try to reconnect to static peer more * Update go.mod/go.sum * Require Go 1.14 * Update go.mod/go.sum * Update go.mod/go.sum
Diffstat (limited to 'cmd')
-rw-r--r--cmd/dendrite-demo-pinecone/conn/client.go91
-rw-r--r--cmd/dendrite-demo-pinecone/conn/ws.go81
-rw-r--r--cmd/dendrite-demo-pinecone/embed/embed_other.go9
-rw-r--r--cmd/dendrite-demo-pinecone/embed/embed_riotweb.go83
-rw-r--r--cmd/dendrite-demo-pinecone/main.go279
-rw-r--r--cmd/dendrite-demo-pinecone/rooms/rooms.go150
-rw-r--r--cmd/dendrite-demo-yggdrasil/README.md2
7 files changed, 694 insertions, 1 deletions
diff --git a/cmd/dendrite-demo-pinecone/conn/client.go b/cmd/dendrite-demo-pinecone/conn/client.go
new file mode 100644
index 00000000..bf23085d
--- /dev/null
+++ b/cmd/dendrite-demo-pinecone/conn/client.go
@@ -0,0 +1,91 @@
+package conn
+
+import (
+ "fmt"
+ "net"
+ "net/http"
+ "strings"
+
+ "github.com/gorilla/websocket"
+ "github.com/matrix-org/dendrite/setup"
+ "github.com/matrix-org/gomatrixserverlib"
+
+ pineconeRouter "github.com/matrix-org/pinecone/router"
+ pineconeSessions "github.com/matrix-org/pinecone/sessions"
+)
+
+func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) error {
+ var parent net.Conn
+ if strings.HasPrefix(peer, "ws://") || strings.HasPrefix(peer, "wss://") {
+ c, _, err := websocket.DefaultDialer.Dial(peer, nil)
+ if err != nil {
+ return fmt.Errorf("websocket.DefaultDialer.Dial: %w", err)
+ }
+ parent = WrapWebSocketConn(c)
+ } else {
+ var err error
+ parent, err = net.Dial("tcp", peer)
+ if err != nil {
+ return fmt.Errorf("net.Dial: %w", err)
+ }
+ }
+ if parent == nil {
+ return fmt.Errorf("failed to wrap connection")
+ }
+ _, err := pRouter.AuthenticatedConnect(parent, "static", pineconeRouter.PeerTypeRemote)
+ return err
+}
+
+type RoundTripper struct {
+ inner *http.Transport
+}
+
+func (y *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+ req.URL.Scheme = "http"
+ return y.inner.RoundTrip(req)
+}
+
+func CreateClient(
+ base *setup.BaseDendrite, s *pineconeSessions.Sessions,
+) *gomatrixserverlib.Client {
+ tr := &http.Transport{}
+ tr.RegisterProtocol(
+ "matrix", &RoundTripper{
+ inner: &http.Transport{
+ MaxIdleConns: 100,
+ MaxIdleConnsPerHost: 5,
+ Dial: s.Dial,
+ DialContext: s.DialContext,
+ DialTLS: s.DialTLS,
+ DialTLSContext: s.DialTLSContext,
+ },
+ },
+ )
+ return gomatrixserverlib.NewClient(
+ gomatrixserverlib.WithTransport(tr),
+ )
+}
+
+func CreateFederationClient(
+ base *setup.BaseDendrite, s *pineconeSessions.Sessions,
+) *gomatrixserverlib.FederationClient {
+ tr := &http.Transport{}
+ tr.RegisterProtocol(
+ "matrix", &RoundTripper{
+ inner: &http.Transport{
+ MaxIdleConns: 100,
+ MaxIdleConnsPerHost: 5,
+ Dial: s.Dial,
+ DialContext: s.DialContext,
+ DialTLS: s.DialTLS,
+ DialTLSContext: s.DialTLSContext,
+ },
+ },
+ )
+ return gomatrixserverlib.NewFederationClient(
+ base.Cfg.Global.ServerName,
+ base.Cfg.Global.KeyID,
+ base.Cfg.Global.PrivateKey,
+ gomatrixserverlib.WithTransport(tr),
+ )
+}
diff --git a/cmd/dendrite-demo-pinecone/conn/ws.go b/cmd/dendrite-demo-pinecone/conn/ws.go
new file mode 100644
index 00000000..ef403e29
--- /dev/null
+++ b/cmd/dendrite-demo-pinecone/conn/ws.go
@@ -0,0 +1,81 @@
+package conn
+
+import (
+ "io"
+ "net"
+ "time"
+
+ "github.com/gorilla/websocket"
+)
+
+func WrapWebSocketConn(c *websocket.Conn) *WebSocketConn {
+ return &WebSocketConn{c: c}
+}
+
+type WebSocketConn struct {
+ r io.Reader
+ c *websocket.Conn
+}
+
+func (c *WebSocketConn) Write(p []byte) (int, error) {
+ err := c.c.WriteMessage(websocket.BinaryMessage, p)
+ if err != nil {
+ return 0, err
+ }
+ return len(p), nil
+}
+
+func (c *WebSocketConn) Read(p []byte) (int, error) {
+ for {
+ if c.r == nil {
+ // Advance to next message.
+ var err error
+ _, c.r, err = c.c.NextReader()
+ if err != nil {
+ return 0, err
+ }
+ }
+ n, err := c.r.Read(p)
+ if err == io.EOF {
+ // At end of message.
+ c.r = nil
+ if n > 0 {
+ return n, nil
+ } else {
+ // No data read, continue to next message.
+ continue
+ }
+ }
+ return n, err
+ }
+}
+
+func (c *WebSocketConn) Close() error {
+ return c.c.Close()
+}
+
+func (c *WebSocketConn) LocalAddr() net.Addr {
+ return c.c.LocalAddr()
+}
+
+func (c *WebSocketConn) RemoteAddr() net.Addr {
+ return c.c.RemoteAddr()
+}
+
+func (c *WebSocketConn) SetDeadline(t time.Time) error {
+ if err := c.SetReadDeadline(t); err != nil {
+ return err
+ }
+ if err := c.SetWriteDeadline(t); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (c *WebSocketConn) SetReadDeadline(t time.Time) error {
+ return c.c.SetReadDeadline(t)
+}
+
+func (c *WebSocketConn) SetWriteDeadline(t time.Time) error {
+ return c.c.SetWriteDeadline(t)
+}
diff --git a/cmd/dendrite-demo-pinecone/embed/embed_other.go b/cmd/dendrite-demo-pinecone/embed/embed_other.go
new file mode 100644
index 00000000..59888114
--- /dev/null
+++ b/cmd/dendrite-demo-pinecone/embed/embed_other.go
@@ -0,0 +1,9 @@
+// +build !riotweb
+
+package embed
+
+import "github.com/gorilla/mux"
+
+func Embed(_ *mux.Router, _ int, _ string) {
+
+}
diff --git a/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go b/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go
new file mode 100644
index 00000000..d25745ca
--- /dev/null
+++ b/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go
@@ -0,0 +1,83 @@
+// +build riotweb
+
+package embed
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "regexp"
+
+ "github.com/gorilla/mux"
+ "github.com/tidwall/sjson"
+)
+
+// From within the Riot Web directory:
+// go run github.com/mjibson/esc -o /path/to/dendrite/internal/embed/fs_riotweb.go -private -pkg embed .
+
+var cssFile = regexp.MustCompile("\\.css$")
+var jsFile = regexp.MustCompile("\\.js$")
+
+type mimeFixingHandler struct {
+ fs http.Handler
+}
+
+func (h mimeFixingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ ruri := r.RequestURI
+ fmt.Println(ruri)
+ switch {
+ case cssFile.MatchString(ruri):
+ w.Header().Set("Content-Type", "text/css")
+ case jsFile.MatchString(ruri):
+ w.Header().Set("Content-Type", "application/javascript")
+ default:
+ }
+ h.fs.ServeHTTP(w, r)
+}
+
+func Embed(rootMux *mux.Router, listenPort int, serverName string) {
+ embeddedFS := _escFS(false)
+ embeddedServ := mimeFixingHandler{http.FileServer(embeddedFS)}
+
+ rootMux.NotFoundHandler = embeddedServ
+ rootMux.HandleFunc("/config.json", func(w http.ResponseWriter, r *http.Request) {
+ url := fmt.Sprintf("http://%s:%d", r.Header("Host"), listenPort)
+ configFile, err := embeddedFS.Open("/config.sample.json")
+ if err != nil {
+ w.WriteHeader(500)
+ io.WriteString(w, "Couldn't open the file: "+err.Error())
+ return
+ }
+ configFileInfo, err := configFile.Stat()
+ if err != nil {
+ w.WriteHeader(500)
+ io.WriteString(w, "Couldn't stat the file: "+err.Error())
+ return
+ }
+ buf := make([]byte, configFileInfo.Size())
+ n, err := configFile.Read(buf)
+ if err != nil {
+ w.WriteHeader(500)
+ io.WriteString(w, "Couldn't read the file: "+err.Error())
+ return
+ }
+ if int64(n) != configFileInfo.Size() {
+ w.WriteHeader(500)
+ io.WriteString(w, "The returned file size didn't match what we expected")
+ return
+ }
+ js, _ := sjson.SetBytes(buf, "default_server_config.m\\.homeserver.base_url", url)
+ js, _ = sjson.SetBytes(js, "default_server_config.m\\.homeserver.server_name", serverName)
+ js, _ = sjson.SetBytes(js, "brand", fmt.Sprintf("Riot %s", serverName))
+ js, _ = sjson.SetBytes(js, "disable_guests", true)
+ js, _ = sjson.SetBytes(js, "disable_3pid_login", true)
+ js, _ = sjson.DeleteBytes(js, "welcomeUserId")
+ _, _ = w.Write(js)
+ })
+
+ fmt.Println("*-------------------------------*")
+ fmt.Println("| This build includes Riot Web! |")
+ fmt.Println("*-------------------------------*")
+ fmt.Println("Point your browser to:", url)
+ fmt.Println()
+}
diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go
new file mode 100644
index 00000000..46a533f0
--- /dev/null
+++ b/cmd/dendrite-demo-pinecone/main.go
@@ -0,0 +1,279 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/tls"
+ "encoding/hex"
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "math"
+ "net"
+ "net/http"
+ "os"
+ "time"
+
+ "github.com/gorilla/mux"
+ "github.com/gorilla/websocket"
+ "github.com/matrix-org/dendrite/appservice"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
+ "github.com/matrix-org/dendrite/eduserver"
+ "github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/dendrite/federationsender"
+ "github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/httputil"
+ "github.com/matrix-org/dendrite/keyserver"
+ "github.com/matrix-org/dendrite/roomserver"
+ "github.com/matrix-org/dendrite/setup"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/userapi"
+ "github.com/matrix-org/gomatrixserverlib"
+ "go.uber.org/atomic"
+
+ pineconeMulticast "github.com/matrix-org/pinecone/multicast"
+ pineconeRouter "github.com/matrix-org/pinecone/router"
+ pineconeSessions "github.com/matrix-org/pinecone/sessions"
+ pineconeTypes "github.com/matrix-org/pinecone/types"
+
+ "github.com/sirupsen/logrus"
+)
+
+var (
+ instanceName = flag.String("name", "dendrite-p2p-pinecone", "the name of this P2P demo instance")
+ instancePort = flag.Int("port", 8008, "the port that the client API will listen on")
+ instancePeer = flag.String("peer", "", "the static Pinecone peer to connect to")
+ instanceListen = flag.String("listen", ":0", "the port Pinecone peers can connect to")
+)
+
+// nolint:gocyclo
+func main() {
+ flag.Parse()
+ internal.SetupPprof()
+
+ var pk ed25519.PublicKey
+ var sk ed25519.PrivateKey
+
+ keyfile := *instanceName + ".key"
+ if _, err := os.Stat(keyfile); os.IsNotExist(err) {
+ if pk, sk, err = ed25519.GenerateKey(nil); err != nil {
+ panic(err)
+ }
+ if err = ioutil.WriteFile(keyfile, sk, 0644); err != nil {
+ panic(err)
+ }
+ } else if err == nil {
+ if sk, err = ioutil.ReadFile(keyfile); err != nil {
+ panic(err)
+ }
+ if len(sk) != ed25519.PrivateKeySize {
+ panic("the private key is not long enough")
+ }
+ pk = sk.Public().(ed25519.PublicKey)
+ }
+
+ logger := log.New(os.Stdout, "", 0)
+ pRouter := pineconeRouter.NewRouter(logger, "dendrite", sk, pk, nil)
+
+ go func() {
+ listener, err := net.Listen("tcp", *instanceListen)
+ if err != nil {
+ panic(err)
+ }
+
+ fmt.Println("Listening on", listener.Addr())
+
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ logrus.WithError(err).Error("listener.Accept failed")
+ continue
+ }
+
+ port, err := pRouter.AuthenticatedConnect(conn, "", pineconeRouter.PeerTypeRemote)
+ if err != nil {
+ logrus.WithError(err).Error("pSwitch.AuthenticatedConnect failed")
+ continue
+ }
+
+ fmt.Println("Inbound connection", conn.RemoteAddr(), "is connected to port", port)
+ }
+ }()
+
+ pQUIC := pineconeSessions.NewSessions(logger, pRouter)
+ pMulticast := pineconeMulticast.NewMulticast(logger, pRouter)
+ pMulticast.Start()
+
+ var staticPeerAttempts atomic.Uint32
+ var connectToStaticPeer func()
+ connectToStaticPeer = func() {
+ uri := *instancePeer
+ if uri == "" {
+ return
+ }
+ if err := conn.ConnectToPeer(pRouter, uri); err != nil {
+ exp := time.Second * time.Duration(math.Exp2(float64(staticPeerAttempts.Inc())))
+ time.AfterFunc(exp, connectToStaticPeer)
+ } else {
+ staticPeerAttempts.Store(0)
+ }
+ }
+ pRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) {
+ if peertype == pineconeRouter.PeerTypeRemote && err != nil {
+ staticPeerAttempts.Store(0)
+ time.AfterFunc(time.Second, connectToStaticPeer)
+ }
+ })
+ go connectToStaticPeer()
+
+ cfg := &config.Dendrite{}
+ cfg.Defaults()
+ cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk))
+ cfg.Global.PrivateKey = sk
+ cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
+ cfg.Global.Kafka.UseNaffka = true
+ cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName))
+ cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName))
+ cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName))
+ cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName))
+ cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName))
+ cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-signingkeyserver.db", *instanceName))
+ cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName))
+ cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
+ cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
+ cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
+ if err := cfg.Derive(); err != nil {
+ panic(err)
+ }
+
+ base := setup.NewBaseDendrite(cfg, "Monolith", false)
+ defer base.Close() // nolint: errcheck
+
+ accountDB := base.CreateAccountsDB()
+ federation := conn.CreateFederationClient(base, pQUIC)
+
+ serverKeyAPI := &signing.YggdrasilKeys{}
+ keyRing := serverKeyAPI.KeyRing()
+
+ rsComponent := roomserver.NewInternalAPI(
+ base, keyRing,
+ )
+ rsAPI := rsComponent
+ fsAPI := federationsender.NewInternalAPI(
+ base, federation, rsAPI, keyRing,
+ )
+
+ keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI)
+ userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI)
+ keyAPI.SetUserAPI(userAPI)
+
+ eduInputAPI := eduserver.NewInternalAPI(
+ base, cache.New(), userAPI,
+ )
+
+ asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
+
+ rsComponent.SetFederationSenderAPI(fsAPI)
+
+ monolith := setup.Monolith{
+ Config: base.Cfg,
+ AccountDB: accountDB,
+ Client: conn.CreateClient(base, pQUIC),
+ FedClient: federation,
+ KeyRing: keyRing,
+
+ AppserviceAPI: asAPI,
+ EDUInternalAPI: eduInputAPI,
+ FederationSenderAPI: fsAPI,
+ RoomserverAPI: rsAPI,
+ UserAPI: userAPI,
+ KeyAPI: keyAPI,
+ ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation),
+ }
+ monolith.AddAllPublicRoutes(
+ base.ProcessContext,
+ base.PublicClientAPIMux,
+ base.PublicFederationAPIMux,
+ base.PublicKeyAPIMux,
+ base.PublicMediaAPIMux,
+ )
+
+ wsUpgrader := websocket.Upgrader{}
+ httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath()
+ httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux)
+ httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux)
+ httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
+ httpRouter.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
+ c, err := wsUpgrader.Upgrade(w, r, nil)
+ if err != nil {
+ logrus.WithError(err).Error("Failed to upgrade WebSocket connection")
+ return
+ }
+ conn := conn.WrapWebSocketConn(c)
+ if _, err = pRouter.AuthenticatedConnect(conn, "websocket", pineconeRouter.PeerTypeRemote); err != nil {
+ logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch")
+ }
+ })
+ embed.Embed(httpRouter, *instancePort, "Pinecone Demo")
+
+ pMux := mux.NewRouter().SkipClean(true).UseEncodedPath()
+ pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux)
+ pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux)
+
+ pHTTP := pQUIC.HTTP()
+ pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux)
+ pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux)
+
+ // Build both ends of a HTTP multiplex.
+ httpServer := &http.Server{
+ Addr: ":0",
+ TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
+ ReadTimeout: 10 * time.Second,
+ WriteTimeout: 10 * time.Second,
+ IdleTimeout: 60 * time.Second,
+ BaseContext: func(_ net.Listener) context.Context {
+ return context.Background()
+ },
+ Handler: pMux,
+ }
+
+ go func() {
+ pubkey := pRouter.PublicKey()
+ logrus.Info("Listening on ", hex.EncodeToString(pubkey[:]))
+ logrus.Fatal(httpServer.Serve(pQUIC))
+ }()
+ go func() {
+ httpBindAddr := fmt.Sprintf(":%d", *instancePort)
+ logrus.Info("Listening on ", httpBindAddr)
+ logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter))
+ }()
+ go func() {
+ logrus.Info("Sending wake-up message to known nodes")
+ req := &api.PerformBroadcastEDURequest{}
+ res := &api.PerformBroadcastEDUResponse{}
+ if err := fsAPI.PerformBroadcastEDU(context.TODO(), req, res); err != nil {
+ logrus.WithError(err).Error("Failed to send wake-up message to known nodes")
+ }
+ }()
+
+ base.WaitForShutdown()
+}
diff --git a/cmd/dendrite-demo-pinecone/rooms/rooms.go b/cmd/dendrite-demo-pinecone/rooms/rooms.go
new file mode 100644
index 00000000..002e4393
--- /dev/null
+++ b/cmd/dendrite-demo-pinecone/rooms/rooms.go
@@ -0,0 +1,150 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rooms
+
+import (
+ "context"
+ "crypto/ed25519"
+ "encoding/hex"
+ "sync"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+
+ pineconeRouter "github.com/matrix-org/pinecone/router"
+ pineconeSessions "github.com/matrix-org/pinecone/sessions"
+)
+
+const pineconeRoomAttempts = 3
+
+type PineconeRoomProvider struct {
+ r *pineconeRouter.Router
+ s *pineconeSessions.Sessions
+ fedSender api.FederationSenderInternalAPI
+ fedClient *gomatrixserverlib.FederationClient
+}
+
+func NewPineconeRoomProvider(
+ r *pineconeRouter.Router,
+ s *pineconeSessions.Sessions,
+ fedSender api.FederationSenderInternalAPI,
+ fedClient *gomatrixserverlib.FederationClient,
+) *PineconeRoomProvider {
+ p := &PineconeRoomProvider{
+ r: r,
+ s: s,
+ fedSender: fedSender,
+ fedClient: fedClient,
+ }
+ return p
+}
+
+func (p *PineconeRoomProvider) Rooms() []gomatrixserverlib.PublicRoom {
+ known := []ed25519.PublicKey{}
+ for _, k := range p.r.KnownNodes() {
+ known = append(known, k[:])
+ }
+ known = append(known, p.s.Sessions()...)
+ list := []gomatrixserverlib.ServerName{}
+ for _, k := range known {
+ if len(k) == ed25519.PublicKeySize {
+ list = append(list, gomatrixserverlib.ServerName(hex.EncodeToString(k)))
+ }
+ }
+ return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, list)
+}
+
+// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers.
+// Returns a list of public rooms.
+func bulkFetchPublicRoomsFromServers(
+ ctx context.Context, fedClient *gomatrixserverlib.FederationClient,
+ homeservers []gomatrixserverlib.ServerName,
+) (publicRooms []gomatrixserverlib.PublicRoom) {
+ limit := 200
+ // follow pipeline semantics, see https://blog.golang.org/pipelines for more info.
+ // goroutines send rooms to this channel
+ roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit))
+ // signalling channel to tell goroutines to stop sending rooms and quit
+ done := make(chan bool)
+ // signalling to say when we can close the room channel
+ var wg sync.WaitGroup
+ wg.Add(len(homeservers))
+ // concurrently query for public rooms
+ for _, hs := range homeservers {
+ go func(homeserverDomain gomatrixserverlib.ServerName) {
+ defer wg.Done()
+ util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms")
+ var fres gomatrixserverlib.RespPublicRooms
+ var err error
+ for i := 0; i < pineconeRoomAttempts; i++ {
+ fres, err = fedClient.GetPublicRooms(ctx, homeserverDomain, int(limit), "", false, "")
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn(
+ "bulkFetchPublicRoomsFromServers: failed to query hs",
+ )
+ if i == pineconeRoomAttempts-1 {
+ return
+ }
+ } else {
+ break
+ }
+ }
+ for _, room := range fres.Chunk {
+ // atomically send a room or stop
+ select {
+ case roomCh <- room:
+ case <-done:
+ util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms")
+ return
+ }
+ }
+ }(hs)
+ }
+
+ // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request.
+ // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be
+ // closed.
+ go func() {
+ wg.Wait()
+ util.GetLogger(ctx).Info("Cleaning up resources")
+ close(roomCh)
+ }()
+
+ // fan-in results with timeout. We stop when we reach the limit.
+FanIn:
+ for len(publicRooms) < int(limit) || limit == 0 {
+ // add a room or timeout
+ select {
+ case room, ok := <-roomCh:
+ if !ok {
+ util.GetLogger(ctx).Info("All homeservers have been queried, returning results.")
+ break FanIn
+ }
+ publicRooms = append(publicRooms, room)
+ case <-time.After(5 * time.Second): // we've waited long enough, let's tell the client what we got.
+ util.GetLogger(ctx).Info("Waited 5s for federated public rooms, returning early")
+ break FanIn
+ case <-ctx.Done(): // the client hung up on us, let's stop.
+ util.GetLogger(ctx).Info("Client hung up, returning early")
+ break FanIn
+ }
+ }
+ // tell goroutines to stop
+ close(done)
+
+ return publicRooms
+}
diff --git a/cmd/dendrite-demo-yggdrasil/README.md b/cmd/dendrite-demo-yggdrasil/README.md
index 148b9a58..33df7e60 100644
--- a/cmd/dendrite-demo-yggdrasil/README.md
+++ b/cmd/dendrite-demo-yggdrasil/README.md
@@ -1,6 +1,6 @@
# Yggdrasil Demo
-This is the Dendrite Yggdrasil demo! It's easy to get started - all you need is Go 1.13 or later.
+This is the Dendrite Yggdrasil demo! It's easy to get started - all you need is Go 1.14 or later.
To run the homeserver, start at the root of the Dendrite repository and run: