diff options
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/dendrite-demo-yggdrasil/main.go | 22 | ||||
-rw-r--r-- | cmd/dendrite-demo-yggdrasil/yggconn/client.go | 12 | ||||
-rw-r--r-- | cmd/dendrite-demo-yggdrasil/yggconn/node.go | 54 | ||||
-rw-r--r-- | cmd/dendrite-demo-yggdrasil/yggconn/session.go | 194 |
4 files changed, 214 insertions, 68 deletions
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 8f6b0eaf..81bf994b 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -72,7 +72,7 @@ func main() { cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName()) cfg.Matrix.PrivateKey = ygg.SigningPrivateKey() cfg.Matrix.KeyID = gomatrixserverlib.KeyID(signing.KeyID) - cfg.Matrix.FederationMaxRetries = 6 + cfg.Matrix.FederationMaxRetries = 8 cfg.Kafka.UseNaffka = true cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput" cfg.Kafka.Topics.OutputClientData = "clientapiOutput" @@ -83,7 +83,7 @@ func main() { cfg.Database.SyncAPI = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName)) cfg.Database.RoomServer = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName)) cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName)) - cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName)) + cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName)) cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName)) @@ -122,6 +122,18 @@ func main() { base, federation, rsAPI, stateAPI, keyRing, ) + ygg.SetSessionFunc(func(address string) { + req := &api.PerformServersAliveRequest{ + Servers: []gomatrixserverlib.ServerName{ + gomatrixserverlib.ServerName(address), + }, + } + res := &api.PerformServersAliveResponse{} + if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil { + logrus.WithError(err).Error("Failed to send wake-up message to newly connected node") + } + }) + rsComponent.SetFederationSenderAPI(fsAPI) embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo") @@ -162,9 +174,9 @@ func main() { httpServer := &http.Server{ Addr: ":0", TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, - ReadTimeout: 15 * time.Second, - WriteTimeout: 45 * time.Second, - IdleTimeout: 60 * time.Second, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 30 * time.Second, BaseContext: func(_ net.Listener) context.Context { return context.Background() }, diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/client.go b/cmd/dendrite-demo-yggdrasil/yggconn/client.go index c5b3eb72..56afe264 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/client.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/client.go @@ -24,9 +24,11 @@ func (n *Node) CreateClient( tr.RegisterProtocol( "matrix", &yggroundtripper{ inner: &http.Transport{ - TLSHandshakeTimeout: 20 * time.Second, + MaxIdleConns: -1, + MaxIdleConnsPerHost: -1, + TLSHandshakeTimeout: 10 * time.Second, ResponseHeaderTimeout: 10 * time.Second, - IdleConnTimeout: 60 * time.Second, + IdleConnTimeout: 30 * time.Second, DialContext: n.DialerContext, }, }, @@ -41,9 +43,11 @@ func (n *Node) CreateFederationClient( tr.RegisterProtocol( "matrix", &yggroundtripper{ inner: &http.Transport{ - TLSHandshakeTimeout: 20 * time.Second, + MaxIdleConns: -1, + MaxIdleConnsPerHost: -1, + TLSHandshakeTimeout: 10 * time.Second, ResponseHeaderTimeout: 10 * time.Second, - IdleConnTimeout: 60 * time.Second, + IdleConnTimeout: 30 * time.Second, DialContext: n.DialerContext, TLSClientConfig: n.tlsConfig, }, diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go index 2ed15b3a..9b123aa6 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go @@ -32,6 +32,7 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert" "github.com/matrix-org/gomatrixserverlib" + "go.uber.org/atomic" yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config" yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast" @@ -41,17 +42,20 @@ import ( ) type Node struct { - core *yggdrasil.Core - config *yggdrasilconfig.NodeConfig - state *yggdrasilconfig.NodeState - multicast *yggdrasilmulticast.Multicast - log *gologme.Logger - listener quic.Listener - tlsConfig *tls.Config - quicConfig *quic.Config - sessions sync.Map // string -> quic.Session - incoming chan QUICStream - NewSession func(remote gomatrixserverlib.ServerName) + core *yggdrasil.Core + config *yggdrasilconfig.NodeConfig + state *yggdrasilconfig.NodeState + multicast *yggdrasilmulticast.Multicast + log *gologme.Logger + listener quic.Listener + tlsConfig *tls.Config + quicConfig *quic.Config + sessions sync.Map // string -> *session + sessionCount atomic.Uint32 + sessionFunc func(address string) + coords sync.Map // string -> yggdrasil.Coords + incoming chan QUICStream + NewSession func(remote gomatrixserverlib.ServerName) } func (n *Node) Dialer(_, address string) (net.Conn, error) { @@ -90,6 +94,19 @@ func Setup(instanceName, storageDirectory string) (*Node, error) { } } + n.core.SetCoordChangeCallback(func(old, new yggdrasil.Coords) { + fmt.Println("COORDINATE CHANGE!") + fmt.Println("Old:", old) + fmt.Println("New:", new) + n.sessions.Range(func(k, v interface{}) bool { + if s, ok := v.(*session); ok { + fmt.Println("Killing session", k) + s.kill() + } + return true + }) + }) + n.config.Peers = []string{} n.config.AdminListen = "none" n.config.MulticastInterfaces = []string{} @@ -124,8 +141,9 @@ func Setup(instanceName, storageDirectory string) (*Node, error) { MaxIncomingUniStreams: 0, KeepAlive: true, MaxIdleTimeout: time.Minute * 30, - HandshakeTimeout: time.Second * 30, + HandshakeTimeout: time.Second * 15, } + copy(n.quicConfig.StatelessResetKey, n.EncryptionPublicKey()) n.log.Println("Public curve25519:", n.core.EncryptionPublicKey()) n.log.Println("Public ed25519:", n.core.SigningPublicKey()) @@ -173,17 +191,25 @@ func (n *Node) SigningPrivateKey() ed25519.PrivateKey { return ed25519.PrivateKey(privBytes) } +func (n *Node) SetSessionFunc(f func(address string)) { + n.sessionFunc = f +} + func (n *Node) PeerCount() int { return len(n.core.GetPeers()) - 1 } +func (n *Node) SessionCount() int { + return int(n.sessionCount.Load()) +} + func (n *Node) KnownNodes() []gomatrixserverlib.ServerName { nodemap := map[string]struct{}{ - "b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": struct{}{}, + "b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": {}, } /* for _, peer := range n.core.GetSwitchPeers() { - nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{} + nodemap[hex.EncodeToString(peer.PublicKey[:])] = struct{}{} } */ n.sessions.Range(func(_, v interface{}) bool { diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/session.go b/cmd/dendrite-demo-yggdrasil/yggconn/session.go index 0d231f6d..0cf524d9 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/session.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/session.go @@ -31,8 +31,32 @@ import ( "github.com/lucas-clemente/quic-go" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" ) +type session struct { + node *Node + session quic.Session + address string + context context.Context + cancel context.CancelFunc +} + +func (n *Node) newSession(sess quic.Session, address string) *session { + ctx, cancel := context.WithCancel(context.TODO()) + return &session{ + node: n, + session: sess, + address: address, + context: ctx, + cancel: cancel, + } +} + +func (s *session) kill() { + s.cancel() +} + func (n *Node) listenFromYgg() { var err error n.listener, err = quic.Listen( @@ -55,22 +79,31 @@ func (n *Node) listenFromYgg() { _ = session.CloseWithError(0, "expected a peer certificate") continue } - address := session.ConnectionState().PeerCertificates[0].Subject.CommonName + address := session.ConnectionState().PeerCertificates[0].DNSNames[0] n.log.Infoln("Accepted connection from", address) - go n.listenFromQUIC(session, address) + go n.newSession(session, address).listenFromQUIC() + go n.sessionFunc(address) } } -func (n *Node) listenFromQUIC(session quic.Session, address string) { - n.sessions.Store(address, session) - defer n.sessions.Delete(address) +func (s *session) listenFromQUIC() { + if existing, ok := s.node.sessions.Load(s.address); ok { + if existingSession, ok := existing.(*session); ok { + fmt.Println("Killing existing session to replace", s.address) + existingSession.kill() + } + } + s.node.sessionCount.Inc() + s.node.sessions.Store(s.address, s) + defer s.node.sessions.Delete(s.address) + defer s.node.sessionCount.Dec() for { - st, err := session.AcceptStream(context.TODO()) + st, err := s.session.AcceptStream(s.context) if err != nil { - n.log.Println("session.AcceptStream:", err) + s.node.log.Println("session.AcceptStream:", err) return } - n.incoming <- QUICStream{st, session} + s.node.incoming <- QUICStream{st, s.session} } } @@ -95,53 +128,124 @@ func (n *Node) Dial(network, address string) (net.Conn, error) { } // Implements http.Transport.DialContext +// nolint:gocyclo func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) { s, ok1 := n.sessions.Load(address) - session, ok2 := s.(quic.Session) - if !ok1 || !ok2 || (ok1 && ok2 && session.ConnectionState().HandshakeComplete) { - dest, err := hex.DecodeString(address) - if err != nil { - return nil, err - } - if len(dest) != crypto.BoxPubKeyLen { - return nil, errors.New("invalid key length supplied") - } - var pubKey crypto.BoxPubKey - copy(pubKey[:], dest) - nodeID := crypto.GetNodeID(&pubKey) - nodeMask := &crypto.NodeID{} - for i := range nodeMask { - nodeMask[i] = 0xFF + session, ok2 := s.(*session) + if !ok1 || !ok2 { + // First of all, check if we think we know the coords of this + // node. If we do then we'll try to dial to it directly. This + // will either succeed or fail. + if v, ok := n.coords.Load(address); ok { + coords, ok := v.(yggdrasil.Coords) + if !ok { + n.coords.Delete(address) + return nil, errors.New("should have found yggdrasil.Coords but didn't") + } + n.log.Infof("Coords %s for %q cached, trying to dial", coords.String(), address) + var err error + // We think we know the coords. Try to dial the node. + if session, err = n.tryDial(address, coords); err != nil { + // We thought we knew the coords but it didn't result + // in a successful dial. Nuke them from the cache. + n.coords.Delete(address) + n.log.Infof("Cached coords %s for %q failed", coords.String(), address) + } } - fmt.Println("Resolving coords") - coords, err := n.core.Resolve(nodeID, nodeMask) - if err != nil { - return nil, fmt.Errorf("n.core.Resolve: %w", err) - } - fmt.Println("Found coords:", coords) - fmt.Println("Dialling") - - session, err = quic.Dial( - n.core, // yggdrasil.PacketConn - coords, // dial address - address, // dial SNI - n.tlsConfig, // TLS config - n.quicConfig, // QUIC config - ) - if err != nil { - n.log.Println("n.dialer.DialContext:", err) - return nil, err + // We either don't know the coords for the node, or we failed + // to dial it before, in which case try to resolve the coords. + if _, ok := n.coords.Load(address); !ok { + var coords yggdrasil.Coords + var err error + + // First look and see if the node is something that we already + // know about from our direct switch peers. + for _, peer := range n.core.GetSwitchPeers() { + if peer.PublicKey.String() == address { + coords = peer.Coords + n.log.Infof("%q is a direct peer, coords are %s", address, coords.String()) + n.coords.Store(address, coords) + break + } + } + + // If it isn' a node that we know directly then try to search + // the network. + if coords == nil { + n.log.Infof("Searching for coords for %q", address) + dest, derr := hex.DecodeString(address) + if derr != nil { + return nil, derr + } + if len(dest) != crypto.BoxPubKeyLen { + return nil, errors.New("invalid key length supplied") + } + var pubKey crypto.BoxPubKey + copy(pubKey[:], dest) + nodeID := crypto.GetNodeID(&pubKey) + nodeMask := &crypto.NodeID{} + for i := range nodeMask { + nodeMask[i] = 0xFF + } + + fmt.Println("Resolving coords") + coords, err = n.core.Resolve(nodeID, nodeMask) + if err != nil { + return nil, fmt.Errorf("n.core.Resolve: %w", err) + } + fmt.Println("Found coords:", coords) + n.coords.Store(address, coords) + } + + // We now know the coords in theory. Let's try dialling the + // node again. + if session, err = n.tryDial(address, coords); err != nil { + return nil, fmt.Errorf("n.tryDial: %w", err) + } } - fmt.Println("Dial OK") - go n.listenFromQUIC(session, address) } - st, err := session.OpenStream() + + if session == nil { + return nil, fmt.Errorf("should have found session but didn't") + } + + st, err := session.session.OpenStream() if err != nil { n.log.Println("session.OpenStream:", err) + _ = session.session.CloseWithError(0, "expected to be able to open session") return nil, err } - return QUICStream{st, session}, nil + return QUICStream{st, session.session}, nil +} + +func (n *Node) tryDial(address string, coords yggdrasil.Coords) (*session, error) { + quicSession, err := quic.Dial( + n.core, // yggdrasil.PacketConn + coords, // dial address + address, // dial SNI + n.tlsConfig, // TLS config + n.quicConfig, // QUIC config + ) + if err != nil { + return nil, err + } + if len(quicSession.ConnectionState().PeerCertificates) != 1 { + _ = quicSession.CloseWithError(0, "expected a peer certificate") + return nil, errors.New("didn't receive a peer certificate") + } + if len(quicSession.ConnectionState().PeerCertificates[0].DNSNames) != 1 { + _ = quicSession.CloseWithError(0, "expected a DNS name") + return nil, errors.New("didn't receive a DNS name") + } + if gotAddress := quicSession.ConnectionState().PeerCertificates[0].DNSNames[0]; address != gotAddress { + _ = quicSession.CloseWithError(0, "you aren't the host I was hoping for") + return nil, fmt.Errorf("expected %q but dialled %q", address, gotAddress) + } + session := n.newSession(quicSession, address) + go session.listenFromQUIC() + go n.sessionFunc(address) + return session, nil } func (n *Node) generateTLSConfig() *tls.Config { |