aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-08-06 16:00:42 +0100
committerGitHub <noreply@github.com>2020-08-06 16:00:42 +0100
commitb7491aae03324b722042ab685013003ad483d401 (patch)
tree487c556480e4df66e29214bbee291c5e168cb912 /cmd
parent642f9cb964b20f52133e11c52e40733f7bc07320 (diff)
Yggdrasil demo updates (#1241)
* PerformServersAlive in PerformBroadcastEDU * Don't double-pointer * More reliable QUIC session handling * Direct peer lookup, other tweaks * Tweaks * Try to wake up queues on incoming QUIC session * Set session callbak on gobind build * Fix incoming session storage * Stateless reset, other tweaks * Reset sessions when coordinates change * Disable HTTP connection reuse, tweak timeouts
Diffstat (limited to 'cmd')
-rw-r--r--cmd/dendrite-demo-yggdrasil/main.go22
-rw-r--r--cmd/dendrite-demo-yggdrasil/yggconn/client.go12
-rw-r--r--cmd/dendrite-demo-yggdrasil/yggconn/node.go54
-rw-r--r--cmd/dendrite-demo-yggdrasil/yggconn/session.go194
4 files changed, 214 insertions, 68 deletions
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go
index 8f6b0eaf..81bf994b 100644
--- a/cmd/dendrite-demo-yggdrasil/main.go
+++ b/cmd/dendrite-demo-yggdrasil/main.go
@@ -72,7 +72,7 @@ func main() {
cfg.Matrix.ServerName = gomatrixserverlib.ServerName(ygg.DerivedServerName())
cfg.Matrix.PrivateKey = ygg.SigningPrivateKey()
cfg.Matrix.KeyID = gomatrixserverlib.KeyID(signing.KeyID)
- cfg.Matrix.FederationMaxRetries = 6
+ cfg.Matrix.FederationMaxRetries = 8
cfg.Kafka.UseNaffka = true
cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput"
cfg.Kafka.Topics.OutputClientData = "clientapiOutput"
@@ -83,7 +83,7 @@ func main() {
cfg.Database.SyncAPI = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName))
cfg.Database.RoomServer = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName))
cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName))
- cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName))
+ cfg.Database.E2EKey = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName))
cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.Database.CurrentState = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName))
@@ -122,6 +122,18 @@ func main() {
base, federation, rsAPI, stateAPI, keyRing,
)
+ ygg.SetSessionFunc(func(address string) {
+ req := &api.PerformServersAliveRequest{
+ Servers: []gomatrixserverlib.ServerName{
+ gomatrixserverlib.ServerName(address),
+ },
+ }
+ res := &api.PerformServersAliveResponse{}
+ if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil {
+ logrus.WithError(err).Error("Failed to send wake-up message to newly connected node")
+ }
+ })
+
rsComponent.SetFederationSenderAPI(fsAPI)
embed.Embed(base.BaseMux, *instancePort, "Yggdrasil Demo")
@@ -162,9 +174,9 @@ func main() {
httpServer := &http.Server{
Addr: ":0",
TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){},
- ReadTimeout: 15 * time.Second,
- WriteTimeout: 45 * time.Second,
- IdleTimeout: 60 * time.Second,
+ ReadTimeout: 10 * time.Second,
+ WriteTimeout: 10 * time.Second,
+ IdleTimeout: 30 * time.Second,
BaseContext: func(_ net.Listener) context.Context {
return context.Background()
},
diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/client.go b/cmd/dendrite-demo-yggdrasil/yggconn/client.go
index c5b3eb72..56afe264 100644
--- a/cmd/dendrite-demo-yggdrasil/yggconn/client.go
+++ b/cmd/dendrite-demo-yggdrasil/yggconn/client.go
@@ -24,9 +24,11 @@ func (n *Node) CreateClient(
tr.RegisterProtocol(
"matrix", &yggroundtripper{
inner: &http.Transport{
- TLSHandshakeTimeout: 20 * time.Second,
+ MaxIdleConns: -1,
+ MaxIdleConnsPerHost: -1,
+ TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
- IdleConnTimeout: 60 * time.Second,
+ IdleConnTimeout: 30 * time.Second,
DialContext: n.DialerContext,
},
},
@@ -41,9 +43,11 @@ func (n *Node) CreateFederationClient(
tr.RegisterProtocol(
"matrix", &yggroundtripper{
inner: &http.Transport{
- TLSHandshakeTimeout: 20 * time.Second,
+ MaxIdleConns: -1,
+ MaxIdleConnsPerHost: -1,
+ TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
- IdleConnTimeout: 60 * time.Second,
+ IdleConnTimeout: 30 * time.Second,
DialContext: n.DialerContext,
TLSClientConfig: n.tlsConfig,
},
diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go
index 2ed15b3a..9b123aa6 100644
--- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go
+++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go
@@ -32,6 +32,7 @@ import (
"github.com/lucas-clemente/quic-go"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert"
"github.com/matrix-org/gomatrixserverlib"
+ "go.uber.org/atomic"
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
@@ -41,17 +42,20 @@ import (
)
type Node struct {
- core *yggdrasil.Core
- config *yggdrasilconfig.NodeConfig
- state *yggdrasilconfig.NodeState
- multicast *yggdrasilmulticast.Multicast
- log *gologme.Logger
- listener quic.Listener
- tlsConfig *tls.Config
- quicConfig *quic.Config
- sessions sync.Map // string -> quic.Session
- incoming chan QUICStream
- NewSession func(remote gomatrixserverlib.ServerName)
+ core *yggdrasil.Core
+ config *yggdrasilconfig.NodeConfig
+ state *yggdrasilconfig.NodeState
+ multicast *yggdrasilmulticast.Multicast
+ log *gologme.Logger
+ listener quic.Listener
+ tlsConfig *tls.Config
+ quicConfig *quic.Config
+ sessions sync.Map // string -> *session
+ sessionCount atomic.Uint32
+ sessionFunc func(address string)
+ coords sync.Map // string -> yggdrasil.Coords
+ incoming chan QUICStream
+ NewSession func(remote gomatrixserverlib.ServerName)
}
func (n *Node) Dialer(_, address string) (net.Conn, error) {
@@ -90,6 +94,19 @@ func Setup(instanceName, storageDirectory string) (*Node, error) {
}
}
+ n.core.SetCoordChangeCallback(func(old, new yggdrasil.Coords) {
+ fmt.Println("COORDINATE CHANGE!")
+ fmt.Println("Old:", old)
+ fmt.Println("New:", new)
+ n.sessions.Range(func(k, v interface{}) bool {
+ if s, ok := v.(*session); ok {
+ fmt.Println("Killing session", k)
+ s.kill()
+ }
+ return true
+ })
+ })
+
n.config.Peers = []string{}
n.config.AdminListen = "none"
n.config.MulticastInterfaces = []string{}
@@ -124,8 +141,9 @@ func Setup(instanceName, storageDirectory string) (*Node, error) {
MaxIncomingUniStreams: 0,
KeepAlive: true,
MaxIdleTimeout: time.Minute * 30,
- HandshakeTimeout: time.Second * 30,
+ HandshakeTimeout: time.Second * 15,
}
+ copy(n.quicConfig.StatelessResetKey, n.EncryptionPublicKey())
n.log.Println("Public curve25519:", n.core.EncryptionPublicKey())
n.log.Println("Public ed25519:", n.core.SigningPublicKey())
@@ -173,17 +191,25 @@ func (n *Node) SigningPrivateKey() ed25519.PrivateKey {
return ed25519.PrivateKey(privBytes)
}
+func (n *Node) SetSessionFunc(f func(address string)) {
+ n.sessionFunc = f
+}
+
func (n *Node) PeerCount() int {
return len(n.core.GetPeers()) - 1
}
+func (n *Node) SessionCount() int {
+ return int(n.sessionCount.Load())
+}
+
func (n *Node) KnownNodes() []gomatrixserverlib.ServerName {
nodemap := map[string]struct{}{
- "b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": struct{}{},
+ "b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": {},
}
/*
for _, peer := range n.core.GetSwitchPeers() {
- nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{}
+ nodemap[hex.EncodeToString(peer.PublicKey[:])] = struct{}{}
}
*/
n.sessions.Range(func(_, v interface{}) bool {
diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/session.go b/cmd/dendrite-demo-yggdrasil/yggconn/session.go
index 0d231f6d..0cf524d9 100644
--- a/cmd/dendrite-demo-yggdrasil/yggconn/session.go
+++ b/cmd/dendrite-demo-yggdrasil/yggconn/session.go
@@ -31,8 +31,32 @@ import (
"github.com/lucas-clemente/quic-go"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto"
+ "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
)
+type session struct {
+ node *Node
+ session quic.Session
+ address string
+ context context.Context
+ cancel context.CancelFunc
+}
+
+func (n *Node) newSession(sess quic.Session, address string) *session {
+ ctx, cancel := context.WithCancel(context.TODO())
+ return &session{
+ node: n,
+ session: sess,
+ address: address,
+ context: ctx,
+ cancel: cancel,
+ }
+}
+
+func (s *session) kill() {
+ s.cancel()
+}
+
func (n *Node) listenFromYgg() {
var err error
n.listener, err = quic.Listen(
@@ -55,22 +79,31 @@ func (n *Node) listenFromYgg() {
_ = session.CloseWithError(0, "expected a peer certificate")
continue
}
- address := session.ConnectionState().PeerCertificates[0].Subject.CommonName
+ address := session.ConnectionState().PeerCertificates[0].DNSNames[0]
n.log.Infoln("Accepted connection from", address)
- go n.listenFromQUIC(session, address)
+ go n.newSession(session, address).listenFromQUIC()
+ go n.sessionFunc(address)
}
}
-func (n *Node) listenFromQUIC(session quic.Session, address string) {
- n.sessions.Store(address, session)
- defer n.sessions.Delete(address)
+func (s *session) listenFromQUIC() {
+ if existing, ok := s.node.sessions.Load(s.address); ok {
+ if existingSession, ok := existing.(*session); ok {
+ fmt.Println("Killing existing session to replace", s.address)
+ existingSession.kill()
+ }
+ }
+ s.node.sessionCount.Inc()
+ s.node.sessions.Store(s.address, s)
+ defer s.node.sessions.Delete(s.address)
+ defer s.node.sessionCount.Dec()
for {
- st, err := session.AcceptStream(context.TODO())
+ st, err := s.session.AcceptStream(s.context)
if err != nil {
- n.log.Println("session.AcceptStream:", err)
+ s.node.log.Println("session.AcceptStream:", err)
return
}
- n.incoming <- QUICStream{st, session}
+ s.node.incoming <- QUICStream{st, s.session}
}
}
@@ -95,53 +128,124 @@ func (n *Node) Dial(network, address string) (net.Conn, error) {
}
// Implements http.Transport.DialContext
+// nolint:gocyclo
func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
s, ok1 := n.sessions.Load(address)
- session, ok2 := s.(quic.Session)
- if !ok1 || !ok2 || (ok1 && ok2 && session.ConnectionState().HandshakeComplete) {
- dest, err := hex.DecodeString(address)
- if err != nil {
- return nil, err
- }
- if len(dest) != crypto.BoxPubKeyLen {
- return nil, errors.New("invalid key length supplied")
- }
- var pubKey crypto.BoxPubKey
- copy(pubKey[:], dest)
- nodeID := crypto.GetNodeID(&pubKey)
- nodeMask := &crypto.NodeID{}
- for i := range nodeMask {
- nodeMask[i] = 0xFF
+ session, ok2 := s.(*session)
+ if !ok1 || !ok2 {
+ // First of all, check if we think we know the coords of this
+ // node. If we do then we'll try to dial to it directly. This
+ // will either succeed or fail.
+ if v, ok := n.coords.Load(address); ok {
+ coords, ok := v.(yggdrasil.Coords)
+ if !ok {
+ n.coords.Delete(address)
+ return nil, errors.New("should have found yggdrasil.Coords but didn't")
+ }
+ n.log.Infof("Coords %s for %q cached, trying to dial", coords.String(), address)
+ var err error
+ // We think we know the coords. Try to dial the node.
+ if session, err = n.tryDial(address, coords); err != nil {
+ // We thought we knew the coords but it didn't result
+ // in a successful dial. Nuke them from the cache.
+ n.coords.Delete(address)
+ n.log.Infof("Cached coords %s for %q failed", coords.String(), address)
+ }
}
- fmt.Println("Resolving coords")
- coords, err := n.core.Resolve(nodeID, nodeMask)
- if err != nil {
- return nil, fmt.Errorf("n.core.Resolve: %w", err)
- }
- fmt.Println("Found coords:", coords)
- fmt.Println("Dialling")
-
- session, err = quic.Dial(
- n.core, // yggdrasil.PacketConn
- coords, // dial address
- address, // dial SNI
- n.tlsConfig, // TLS config
- n.quicConfig, // QUIC config
- )
- if err != nil {
- n.log.Println("n.dialer.DialContext:", err)
- return nil, err
+ // We either don't know the coords for the node, or we failed
+ // to dial it before, in which case try to resolve the coords.
+ if _, ok := n.coords.Load(address); !ok {
+ var coords yggdrasil.Coords
+ var err error
+
+ // First look and see if the node is something that we already
+ // know about from our direct switch peers.
+ for _, peer := range n.core.GetSwitchPeers() {
+ if peer.PublicKey.String() == address {
+ coords = peer.Coords
+ n.log.Infof("%q is a direct peer, coords are %s", address, coords.String())
+ n.coords.Store(address, coords)
+ break
+ }
+ }
+
+ // If it isn' a node that we know directly then try to search
+ // the network.
+ if coords == nil {
+ n.log.Infof("Searching for coords for %q", address)
+ dest, derr := hex.DecodeString(address)
+ if derr != nil {
+ return nil, derr
+ }
+ if len(dest) != crypto.BoxPubKeyLen {
+ return nil, errors.New("invalid key length supplied")
+ }
+ var pubKey crypto.BoxPubKey
+ copy(pubKey[:], dest)
+ nodeID := crypto.GetNodeID(&pubKey)
+ nodeMask := &crypto.NodeID{}
+ for i := range nodeMask {
+ nodeMask[i] = 0xFF
+ }
+
+ fmt.Println("Resolving coords")
+ coords, err = n.core.Resolve(nodeID, nodeMask)
+ if err != nil {
+ return nil, fmt.Errorf("n.core.Resolve: %w", err)
+ }
+ fmt.Println("Found coords:", coords)
+ n.coords.Store(address, coords)
+ }
+
+ // We now know the coords in theory. Let's try dialling the
+ // node again.
+ if session, err = n.tryDial(address, coords); err != nil {
+ return nil, fmt.Errorf("n.tryDial: %w", err)
+ }
}
- fmt.Println("Dial OK")
- go n.listenFromQUIC(session, address)
}
- st, err := session.OpenStream()
+
+ if session == nil {
+ return nil, fmt.Errorf("should have found session but didn't")
+ }
+
+ st, err := session.session.OpenStream()
if err != nil {
n.log.Println("session.OpenStream:", err)
+ _ = session.session.CloseWithError(0, "expected to be able to open session")
return nil, err
}
- return QUICStream{st, session}, nil
+ return QUICStream{st, session.session}, nil
+}
+
+func (n *Node) tryDial(address string, coords yggdrasil.Coords) (*session, error) {
+ quicSession, err := quic.Dial(
+ n.core, // yggdrasil.PacketConn
+ coords, // dial address
+ address, // dial SNI
+ n.tlsConfig, // TLS config
+ n.quicConfig, // QUIC config
+ )
+ if err != nil {
+ return nil, err
+ }
+ if len(quicSession.ConnectionState().PeerCertificates) != 1 {
+ _ = quicSession.CloseWithError(0, "expected a peer certificate")
+ return nil, errors.New("didn't receive a peer certificate")
+ }
+ if len(quicSession.ConnectionState().PeerCertificates[0].DNSNames) != 1 {
+ _ = quicSession.CloseWithError(0, "expected a DNS name")
+ return nil, errors.New("didn't receive a DNS name")
+ }
+ if gotAddress := quicSession.ConnectionState().PeerCertificates[0].DNSNames[0]; address != gotAddress {
+ _ = quicSession.CloseWithError(0, "you aren't the host I was hoping for")
+ return nil, fmt.Errorf("expected %q but dialled %q", address, gotAddress)
+ }
+ session := n.newSession(quicSession, address)
+ go session.listenFromQUIC()
+ go n.sessionFunc(address)
+ return session, nil
}
func (n *Node) generateTLSConfig() *tls.Config {