diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2021-05-06 12:00:42 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-06 12:00:42 +0100 |
commit | 1002e87b60659291af964c6d07c3a9057a3ed9b7 (patch) | |
tree | 81ca40385ac4072826e05d46c0d0a1562f13fa4e /cmd | |
parent | 464b908bd0c13854b3f6b9a17467f39e0608dc08 (diff) |
Pinecone P2P demo (#1856)
* Pinecone demo
* Enable multicast, fix HTTP routing
* Fix multicast import
* Fix build
* Update Pinecone demo
* Fix the keys
* Tweaks
* Pinecone room directory support (early)
* Fix gobind-pinecone
* Add pinecone listener
* Fix public key value
* Use AuthenticatedConnect for dial
* Fix gobind-pinecone
* Stop panics
* Give fsAPI to keyserver
* Pinecone demo fixes
* Update gobind build scripts
* Account creation
* Tweaks
* Setup tweaks
* API tweaks
* API tweaks
* API tweaks
* Port mutex
* Re-enable multicast
* Add ReadCopy
* Update quic-go, fixes
* Shutdowns fixed for iOS
* Update build script
* Add WebSocket support
* Bug fixes
* Netconn context
* Fix WebSocket connectivity
* Fixes to gobind API
* Strip frameworks
* Configurability updates
* Update go.mod
* Update go.mod/go.sum
* Update go.mod/go.sum
* Update go.mod/go.sum
* Try to stay connected tto static peer
* Update gobind-pinecone
* Update go.mod/go.sum
* Test uTP+TLS
* Use HTTP/2
* Don't use HTTP/2
* Update go.mod/go.sum
* Attempt to reconnect to the static peer if it drops
* Stay connected to static peers more stickily
* Retry room directory lookups if they fail
* NewQUIC -> NewSessions
* Storage updates
* Don't return immediately when there's nothing to sync
* Updates
* Try to reconnect to static peer more
* Update go.mod/go.sum
* Require Go 1.14
* Update go.mod/go.sum
* Update go.mod/go.sum
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/dendrite-demo-pinecone/conn/client.go | 91 | ||||
-rw-r--r-- | cmd/dendrite-demo-pinecone/conn/ws.go | 81 | ||||
-rw-r--r-- | cmd/dendrite-demo-pinecone/embed/embed_other.go | 9 | ||||
-rw-r--r-- | cmd/dendrite-demo-pinecone/embed/embed_riotweb.go | 83 | ||||
-rw-r--r-- | cmd/dendrite-demo-pinecone/main.go | 279 | ||||
-rw-r--r-- | cmd/dendrite-demo-pinecone/rooms/rooms.go | 150 | ||||
-rw-r--r-- | cmd/dendrite-demo-yggdrasil/README.md | 2 |
7 files changed, 694 insertions, 1 deletions
diff --git a/cmd/dendrite-demo-pinecone/conn/client.go b/cmd/dendrite-demo-pinecone/conn/client.go new file mode 100644 index 00000000..bf23085d --- /dev/null +++ b/cmd/dendrite-demo-pinecone/conn/client.go @@ -0,0 +1,91 @@ +package conn + +import ( + "fmt" + "net" + "net/http" + "strings" + + "github.com/gorilla/websocket" + "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/gomatrixserverlib" + + pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeSessions "github.com/matrix-org/pinecone/sessions" +) + +func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) error { + var parent net.Conn + if strings.HasPrefix(peer, "ws://") || strings.HasPrefix(peer, "wss://") { + c, _, err := websocket.DefaultDialer.Dial(peer, nil) + if err != nil { + return fmt.Errorf("websocket.DefaultDialer.Dial: %w", err) + } + parent = WrapWebSocketConn(c) + } else { + var err error + parent, err = net.Dial("tcp", peer) + if err != nil { + return fmt.Errorf("net.Dial: %w", err) + } + } + if parent == nil { + return fmt.Errorf("failed to wrap connection") + } + _, err := pRouter.AuthenticatedConnect(parent, "static", pineconeRouter.PeerTypeRemote) + return err +} + +type RoundTripper struct { + inner *http.Transport +} + +func (y *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + req.URL.Scheme = "http" + return y.inner.RoundTrip(req) +} + +func CreateClient( + base *setup.BaseDendrite, s *pineconeSessions.Sessions, +) *gomatrixserverlib.Client { + tr := &http.Transport{} + tr.RegisterProtocol( + "matrix", &RoundTripper{ + inner: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 5, + Dial: s.Dial, + DialContext: s.DialContext, + DialTLS: s.DialTLS, + DialTLSContext: s.DialTLSContext, + }, + }, + ) + return gomatrixserverlib.NewClient( + gomatrixserverlib.WithTransport(tr), + ) +} + +func CreateFederationClient( + base *setup.BaseDendrite, s *pineconeSessions.Sessions, +) *gomatrixserverlib.FederationClient { + tr := &http.Transport{} + tr.RegisterProtocol( + "matrix", &RoundTripper{ + inner: &http.Transport{ + MaxIdleConns: 100, + MaxIdleConnsPerHost: 5, + Dial: s.Dial, + DialContext: s.DialContext, + DialTLS: s.DialTLS, + DialTLSContext: s.DialTLSContext, + }, + }, + ) + return gomatrixserverlib.NewFederationClient( + base.Cfg.Global.ServerName, + base.Cfg.Global.KeyID, + base.Cfg.Global.PrivateKey, + gomatrixserverlib.WithTransport(tr), + ) +} diff --git a/cmd/dendrite-demo-pinecone/conn/ws.go b/cmd/dendrite-demo-pinecone/conn/ws.go new file mode 100644 index 00000000..ef403e29 --- /dev/null +++ b/cmd/dendrite-demo-pinecone/conn/ws.go @@ -0,0 +1,81 @@ +package conn + +import ( + "io" + "net" + "time" + + "github.com/gorilla/websocket" +) + +func WrapWebSocketConn(c *websocket.Conn) *WebSocketConn { + return &WebSocketConn{c: c} +} + +type WebSocketConn struct { + r io.Reader + c *websocket.Conn +} + +func (c *WebSocketConn) Write(p []byte) (int, error) { + err := c.c.WriteMessage(websocket.BinaryMessage, p) + if err != nil { + return 0, err + } + return len(p), nil +} + +func (c *WebSocketConn) Read(p []byte) (int, error) { + for { + if c.r == nil { + // Advance to next message. + var err error + _, c.r, err = c.c.NextReader() + if err != nil { + return 0, err + } + } + n, err := c.r.Read(p) + if err == io.EOF { + // At end of message. + c.r = nil + if n > 0 { + return n, nil + } else { + // No data read, continue to next message. + continue + } + } + return n, err + } +} + +func (c *WebSocketConn) Close() error { + return c.c.Close() +} + +func (c *WebSocketConn) LocalAddr() net.Addr { + return c.c.LocalAddr() +} + +func (c *WebSocketConn) RemoteAddr() net.Addr { + return c.c.RemoteAddr() +} + +func (c *WebSocketConn) SetDeadline(t time.Time) error { + if err := c.SetReadDeadline(t); err != nil { + return err + } + if err := c.SetWriteDeadline(t); err != nil { + return err + } + return nil +} + +func (c *WebSocketConn) SetReadDeadline(t time.Time) error { + return c.c.SetReadDeadline(t) +} + +func (c *WebSocketConn) SetWriteDeadline(t time.Time) error { + return c.c.SetWriteDeadline(t) +} diff --git a/cmd/dendrite-demo-pinecone/embed/embed_other.go b/cmd/dendrite-demo-pinecone/embed/embed_other.go new file mode 100644 index 00000000..59888114 --- /dev/null +++ b/cmd/dendrite-demo-pinecone/embed/embed_other.go @@ -0,0 +1,9 @@ +// +build !riotweb + +package embed + +import "github.com/gorilla/mux" + +func Embed(_ *mux.Router, _ int, _ string) { + +} diff --git a/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go b/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go new file mode 100644 index 00000000..d25745ca --- /dev/null +++ b/cmd/dendrite-demo-pinecone/embed/embed_riotweb.go @@ -0,0 +1,83 @@ +// +build riotweb + +package embed + +import ( + "fmt" + "io" + "net/http" + "regexp" + + "github.com/gorilla/mux" + "github.com/tidwall/sjson" +) + +// From within the Riot Web directory: +// go run github.com/mjibson/esc -o /path/to/dendrite/internal/embed/fs_riotweb.go -private -pkg embed . + +var cssFile = regexp.MustCompile("\\.css$") +var jsFile = regexp.MustCompile("\\.js$") + +type mimeFixingHandler struct { + fs http.Handler +} + +func (h mimeFixingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ruri := r.RequestURI + fmt.Println(ruri) + switch { + case cssFile.MatchString(ruri): + w.Header().Set("Content-Type", "text/css") + case jsFile.MatchString(ruri): + w.Header().Set("Content-Type", "application/javascript") + default: + } + h.fs.ServeHTTP(w, r) +} + +func Embed(rootMux *mux.Router, listenPort int, serverName string) { + embeddedFS := _escFS(false) + embeddedServ := mimeFixingHandler{http.FileServer(embeddedFS)} + + rootMux.NotFoundHandler = embeddedServ + rootMux.HandleFunc("/config.json", func(w http.ResponseWriter, r *http.Request) { + url := fmt.Sprintf("http://%s:%d", r.Header("Host"), listenPort) + configFile, err := embeddedFS.Open("/config.sample.json") + if err != nil { + w.WriteHeader(500) + io.WriteString(w, "Couldn't open the file: "+err.Error()) + return + } + configFileInfo, err := configFile.Stat() + if err != nil { + w.WriteHeader(500) + io.WriteString(w, "Couldn't stat the file: "+err.Error()) + return + } + buf := make([]byte, configFileInfo.Size()) + n, err := configFile.Read(buf) + if err != nil { + w.WriteHeader(500) + io.WriteString(w, "Couldn't read the file: "+err.Error()) + return + } + if int64(n) != configFileInfo.Size() { + w.WriteHeader(500) + io.WriteString(w, "The returned file size didn't match what we expected") + return + } + js, _ := sjson.SetBytes(buf, "default_server_config.m\\.homeserver.base_url", url) + js, _ = sjson.SetBytes(js, "default_server_config.m\\.homeserver.server_name", serverName) + js, _ = sjson.SetBytes(js, "brand", fmt.Sprintf("Riot %s", serverName)) + js, _ = sjson.SetBytes(js, "disable_guests", true) + js, _ = sjson.SetBytes(js, "disable_3pid_login", true) + js, _ = sjson.DeleteBytes(js, "welcomeUserId") + _, _ = w.Write(js) + }) + + fmt.Println("*-------------------------------*") + fmt.Println("| This build includes Riot Web! |") + fmt.Println("*-------------------------------*") + fmt.Println("Point your browser to:", url) + fmt.Println() +} diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go new file mode 100644 index 00000000..46a533f0 --- /dev/null +++ b/cmd/dendrite-demo-pinecone/main.go @@ -0,0 +1,279 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "crypto/ed25519" + "crypto/tls" + "encoding/hex" + "flag" + "fmt" + "io/ioutil" + "log" + "math" + "net" + "net/http" + "os" + "time" + + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + "github.com/matrix-org/dendrite/appservice" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/embed" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/federationsender" + "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/roomserver" + "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/userapi" + "github.com/matrix-org/gomatrixserverlib" + "go.uber.org/atomic" + + pineconeMulticast "github.com/matrix-org/pinecone/multicast" + pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeSessions "github.com/matrix-org/pinecone/sessions" + pineconeTypes "github.com/matrix-org/pinecone/types" + + "github.com/sirupsen/logrus" +) + +var ( + instanceName = flag.String("name", "dendrite-p2p-pinecone", "the name of this P2P demo instance") + instancePort = flag.Int("port", 8008, "the port that the client API will listen on") + instancePeer = flag.String("peer", "", "the static Pinecone peer to connect to") + instanceListen = flag.String("listen", ":0", "the port Pinecone peers can connect to") +) + +// nolint:gocyclo +func main() { + flag.Parse() + internal.SetupPprof() + + var pk ed25519.PublicKey + var sk ed25519.PrivateKey + + keyfile := *instanceName + ".key" + if _, err := os.Stat(keyfile); os.IsNotExist(err) { + if pk, sk, err = ed25519.GenerateKey(nil); err != nil { + panic(err) + } + if err = ioutil.WriteFile(keyfile, sk, 0644); err != nil { + panic(err) + } + } else if err == nil { + if sk, err = ioutil.ReadFile(keyfile); err != nil { + panic(err) + } + if len(sk) != ed25519.PrivateKeySize { + panic("the private key is not long enough") + } + pk = sk.Public().(ed25519.PublicKey) + } + + logger := log.New(os.Stdout, "", 0) + pRouter := pineconeRouter.NewRouter(logger, "dendrite", sk, pk, nil) + + go func() { + listener, err := net.Listen("tcp", *instanceListen) + if err != nil { + panic(err) + } + + fmt.Println("Listening on", listener.Addr()) + + for { + conn, err := listener.Accept() + if err != nil { + logrus.WithError(err).Error("listener.Accept failed") + continue + } + + port, err := pRouter.AuthenticatedConnect(conn, "", pineconeRouter.PeerTypeRemote) + if err != nil { + logrus.WithError(err).Error("pSwitch.AuthenticatedConnect failed") + continue + } + + fmt.Println("Inbound connection", conn.RemoteAddr(), "is connected to port", port) + } + }() + + pQUIC := pineconeSessions.NewSessions(logger, pRouter) + pMulticast := pineconeMulticast.NewMulticast(logger, pRouter) + pMulticast.Start() + + var staticPeerAttempts atomic.Uint32 + var connectToStaticPeer func() + connectToStaticPeer = func() { + uri := *instancePeer + if uri == "" { + return + } + if err := conn.ConnectToPeer(pRouter, uri); err != nil { + exp := time.Second * time.Duration(math.Exp2(float64(staticPeerAttempts.Inc()))) + time.AfterFunc(exp, connectToStaticPeer) + } else { + staticPeerAttempts.Store(0) + } + } + pRouter.SetDisconnectedCallback(func(port pineconeTypes.SwitchPortID, public pineconeTypes.PublicKey, peertype int, err error) { + if peertype == pineconeRouter.PeerTypeRemote && err != nil { + staticPeerAttempts.Store(0) + time.AfterFunc(time.Second, connectToStaticPeer) + } + }) + go connectToStaticPeer() + + cfg := &config.Dendrite{} + cfg.Defaults() + cfg.Global.ServerName = gomatrixserverlib.ServerName(hex.EncodeToString(pk)) + cfg.Global.PrivateKey = sk + cfg.Global.KeyID = gomatrixserverlib.KeyID(signing.KeyID) + cfg.Global.Kafka.UseNaffka = true + cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) + cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) + cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) + cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName)) + cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName)) + cfg.SigningKeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-signingkeyserver.db", *instanceName)) + cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName)) + cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) + cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) + cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) + if err := cfg.Derive(); err != nil { + panic(err) + } + + base := setup.NewBaseDendrite(cfg, "Monolith", false) + defer base.Close() // nolint: errcheck + + accountDB := base.CreateAccountsDB() + federation := conn.CreateFederationClient(base, pQUIC) + + serverKeyAPI := &signing.YggdrasilKeys{} + keyRing := serverKeyAPI.KeyRing() + + rsComponent := roomserver.NewInternalAPI( + base, keyRing, + ) + rsAPI := rsComponent + fsAPI := federationsender.NewInternalAPI( + base, federation, rsAPI, keyRing, + ) + + keyAPI := keyserver.NewInternalAPI(&base.Cfg.KeyServer, fsAPI) + userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, nil, keyAPI) + keyAPI.SetUserAPI(userAPI) + + eduInputAPI := eduserver.NewInternalAPI( + base, cache.New(), userAPI, + ) + + asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI) + + rsComponent.SetFederationSenderAPI(fsAPI) + + monolith := setup.Monolith{ + Config: base.Cfg, + AccountDB: accountDB, + Client: conn.CreateClient(base, pQUIC), + FedClient: federation, + KeyRing: keyRing, + + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationSenderAPI: fsAPI, + RoomserverAPI: rsAPI, + UserAPI: userAPI, + KeyAPI: keyAPI, + ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pQUIC, fsAPI, federation), + } + monolith.AddAllPublicRoutes( + base.ProcessContext, + base.PublicClientAPIMux, + base.PublicFederationAPIMux, + base.PublicKeyAPIMux, + base.PublicMediaAPIMux, + ) + + wsUpgrader := websocket.Upgrader{} + httpRouter := mux.NewRouter().SkipClean(true).UseEncodedPath() + httpRouter.PathPrefix(httputil.InternalPathPrefix).Handler(base.InternalAPIMux) + httpRouter.PathPrefix(httputil.PublicClientPathPrefix).Handler(base.PublicClientAPIMux) + httpRouter.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) + httpRouter.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + c, err := wsUpgrader.Upgrade(w, r, nil) + if err != nil { + logrus.WithError(err).Error("Failed to upgrade WebSocket connection") + return + } + conn := conn.WrapWebSocketConn(c) + if _, err = pRouter.AuthenticatedConnect(conn, "websocket", pineconeRouter.PeerTypeRemote); err != nil { + logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch") + } + }) + embed.Embed(httpRouter, *instancePort, "Pinecone Demo") + + pMux := mux.NewRouter().SkipClean(true).UseEncodedPath() + pMux.PathPrefix(httputil.PublicFederationPathPrefix).Handler(base.PublicFederationAPIMux) + pMux.PathPrefix(httputil.PublicMediaPathPrefix).Handler(base.PublicMediaAPIMux) + + pHTTP := pQUIC.HTTP() + pHTTP.Mux().Handle(httputil.PublicFederationPathPrefix, pMux) + pHTTP.Mux().Handle(httputil.PublicMediaPathPrefix, pMux) + + // Build both ends of a HTTP multiplex. + httpServer := &http.Server{ + Addr: ":0", + TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 60 * time.Second, + BaseContext: func(_ net.Listener) context.Context { + return context.Background() + }, + Handler: pMux, + } + + go func() { + pubkey := pRouter.PublicKey() + logrus.Info("Listening on ", hex.EncodeToString(pubkey[:])) + logrus.Fatal(httpServer.Serve(pQUIC)) + }() + go func() { + httpBindAddr := fmt.Sprintf(":%d", *instancePort) + logrus.Info("Listening on ", httpBindAddr) + logrus.Fatal(http.ListenAndServe(httpBindAddr, httpRouter)) + }() + go func() { + logrus.Info("Sending wake-up message to known nodes") + req := &api.PerformBroadcastEDURequest{} + res := &api.PerformBroadcastEDUResponse{} + if err := fsAPI.PerformBroadcastEDU(context.TODO(), req, res); err != nil { + logrus.WithError(err).Error("Failed to send wake-up message to known nodes") + } + }() + + base.WaitForShutdown() +} diff --git a/cmd/dendrite-demo-pinecone/rooms/rooms.go b/cmd/dendrite-demo-pinecone/rooms/rooms.go new file mode 100644 index 00000000..002e4393 --- /dev/null +++ b/cmd/dendrite-demo-pinecone/rooms/rooms.go @@ -0,0 +1,150 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rooms + +import ( + "context" + "crypto/ed25519" + "encoding/hex" + "sync" + "time" + + "github.com/matrix-org/dendrite/federationsender/api" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + + pineconeRouter "github.com/matrix-org/pinecone/router" + pineconeSessions "github.com/matrix-org/pinecone/sessions" +) + +const pineconeRoomAttempts = 3 + +type PineconeRoomProvider struct { + r *pineconeRouter.Router + s *pineconeSessions.Sessions + fedSender api.FederationSenderInternalAPI + fedClient *gomatrixserverlib.FederationClient +} + +func NewPineconeRoomProvider( + r *pineconeRouter.Router, + s *pineconeSessions.Sessions, + fedSender api.FederationSenderInternalAPI, + fedClient *gomatrixserverlib.FederationClient, +) *PineconeRoomProvider { + p := &PineconeRoomProvider{ + r: r, + s: s, + fedSender: fedSender, + fedClient: fedClient, + } + return p +} + +func (p *PineconeRoomProvider) Rooms() []gomatrixserverlib.PublicRoom { + known := []ed25519.PublicKey{} + for _, k := range p.r.KnownNodes() { + known = append(known, k[:]) + } + known = append(known, p.s.Sessions()...) + list := []gomatrixserverlib.ServerName{} + for _, k := range known { + if len(k) == ed25519.PublicKeySize { + list = append(list, gomatrixserverlib.ServerName(hex.EncodeToString(k))) + } + } + return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, list) +} + +// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers. +// Returns a list of public rooms. +func bulkFetchPublicRoomsFromServers( + ctx context.Context, fedClient *gomatrixserverlib.FederationClient, + homeservers []gomatrixserverlib.ServerName, +) (publicRooms []gomatrixserverlib.PublicRoom) { + limit := 200 + // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. + // goroutines send rooms to this channel + roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit)) + // signalling channel to tell goroutines to stop sending rooms and quit + done := make(chan bool) + // signalling to say when we can close the room channel + var wg sync.WaitGroup + wg.Add(len(homeservers)) + // concurrently query for public rooms + for _, hs := range homeservers { + go func(homeserverDomain gomatrixserverlib.ServerName) { + defer wg.Done() + util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms") + var fres gomatrixserverlib.RespPublicRooms + var err error + for i := 0; i < pineconeRoomAttempts; i++ { + fres, err = fedClient.GetPublicRooms(ctx, homeserverDomain, int(limit), "", false, "") + if err != nil { + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchPublicRoomsFromServers: failed to query hs", + ) + if i == pineconeRoomAttempts-1 { + return + } + } else { + break + } + } + for _, room := range fres.Chunk { + // atomically send a room or stop + select { + case roomCh <- room: + case <-done: + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms") + return + } + } + }(hs) + } + + // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request. + // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be + // closed. + go func() { + wg.Wait() + util.GetLogger(ctx).Info("Cleaning up resources") + close(roomCh) + }() + + // fan-in results with timeout. We stop when we reach the limit. +FanIn: + for len(publicRooms) < int(limit) || limit == 0 { + // add a room or timeout + select { + case room, ok := <-roomCh: + if !ok { + util.GetLogger(ctx).Info("All homeservers have been queried, returning results.") + break FanIn + } + publicRooms = append(publicRooms, room) + case <-time.After(5 * time.Second): // we've waited long enough, let's tell the client what we got. + util.GetLogger(ctx).Info("Waited 5s for federated public rooms, returning early") + break FanIn + case <-ctx.Done(): // the client hung up on us, let's stop. + util.GetLogger(ctx).Info("Client hung up, returning early") + break FanIn + } + } + // tell goroutines to stop + close(done) + + return publicRooms +} diff --git a/cmd/dendrite-demo-yggdrasil/README.md b/cmd/dendrite-demo-yggdrasil/README.md index 148b9a58..33df7e60 100644 --- a/cmd/dendrite-demo-yggdrasil/README.md +++ b/cmd/dendrite-demo-yggdrasil/README.md @@ -1,6 +1,6 @@ # Yggdrasil Demo -This is the Dendrite Yggdrasil demo! It's easy to get started - all you need is Go 1.13 or later. +This is the Dendrite Yggdrasil demo! It's easy to get started - all you need is Go 1.14 or later. To run the homeserver, start at the root of the Dendrite repository and run: |