aboutsummaryrefslogtreecommitdiff
path: root/cmd/dendrite-demo-yggdrasil
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-07-02 17:43:07 +0100
committerGitHub <noreply@github.com>2020-07-02 17:43:07 +0100
commit38caf8e5b7623c090f8949076b57d769e42011ad (patch)
tree49d5434f09945bb7ea9313b153808f1a1abc6204 /cmd/dendrite-demo-yggdrasil
parent9c1f38621c4d787761092bc841e06ca424fbbf35 (diff)
Yggdrasil+QUIC demo, federation sender tweaks (#1177)
* Initial QUIC work * Update Yggdrasil demo * Make sure that the federation sender knows how many pending events are in the database when the worker starts * QUIC tunables * pprof * Don't spin * Set build info for Yggdrasil
Diffstat (limited to 'cmd/dendrite-demo-yggdrasil')
-rw-r--r--cmd/dendrite-demo-yggdrasil/main.go2
-rw-r--r--cmd/dendrite-demo-yggdrasil/yggconn/node.go63
-rw-r--r--cmd/dendrite-demo-yggdrasil/yggconn/session.go116
-rw-r--r--cmd/dendrite-demo-yggdrasil/yggconn/stream.go20
4 files changed, 127 insertions, 74 deletions
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go
index 7476d686..7a527d87 100644
--- a/cmd/dendrite-demo-yggdrasil/main.go
+++ b/cmd/dendrite-demo-yggdrasil/main.go
@@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
+ "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/setup"
@@ -50,6 +51,7 @@ var (
// nolint:gocyclo
func main() {
flag.Parse()
+ internal.SetupPprof()
ygg, err := yggconn.Setup(*instanceName, *instancePeer, ".")
if err != nil {
diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go
index 18d207a9..eb176493 100644
--- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go
+++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go
@@ -17,6 +17,7 @@ package yggconn
import (
"context"
"crypto/ed25519"
+ "crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
@@ -26,10 +27,11 @@ import (
"os"
"strings"
"sync"
+ "time"
+ "github.com/lucas-clemente/quic-go"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/convert"
- "github.com/libp2p/go-yamux"
yggdrasiladmin "github.com/yggdrasil-network/yggdrasil-go/src/admin"
yggdrasilconfig "github.com/yggdrasil-network/yggdrasil-go/src/config"
yggdrasilmulticast "github.com/yggdrasil-network/yggdrasil-go/src/multicast"
@@ -39,16 +41,26 @@ import (
)
type Node struct {
- core *yggdrasil.Core
- config *yggdrasilconfig.NodeConfig
- state *yggdrasilconfig.NodeState
- admin *yggdrasiladmin.AdminSocket
- multicast *yggdrasilmulticast.Multicast
- log *gologme.Logger
- listener *yggdrasil.Listener
- dialer *yggdrasil.Dialer
- sessions sync.Map // string -> yamux.Session
- incoming chan *yamux.Stream
+ core *yggdrasil.Core
+ config *yggdrasilconfig.NodeConfig
+ state *yggdrasilconfig.NodeState
+ admin *yggdrasiladmin.AdminSocket
+ multicast *yggdrasilmulticast.Multicast
+ log *gologme.Logger
+ packetConn *yggdrasil.PacketConn
+ listener quic.Listener
+ tlsConfig *tls.Config
+ quicConfig *quic.Config
+ sessions sync.Map // string -> quic.Session
+ incoming chan QUICStream
+}
+
+func (n *Node) BuildName() string {
+ return "dendrite"
+}
+
+func (n *Node) BuildVersion() string {
+ return "dev"
}
func (n *Node) Dialer(_, address string) (net.Conn, error) {
@@ -74,8 +86,9 @@ func Setup(instanceName, instancePeer, storageDirectory string) (*Node, error) {
admin: &yggdrasiladmin.AdminSocket{},
multicast: &yggdrasilmulticast.Multicast{},
log: gologme.New(os.Stdout, "YGG ", log.Flags()),
- incoming: make(chan *yamux.Stream),
+ incoming: make(chan QUICStream),
}
+ n.core.SetBuildInfo(n)
yggfile := fmt.Sprintf("%s/%s-yggdrasil.conf", storageDirectory, instanceName)
if _, err := os.Stat(yggfile); !os.IsNotExist(err) {
@@ -114,29 +127,21 @@ func Setup(instanceName, instancePeer, storageDirectory string) (*Node, error) {
panic(err)
}
}
- /*
- if err = n.admin.Init(n.core, n.state, n.log, nil); err != nil {
- panic(err)
- }
- if err = n.admin.Start(); err != nil {
- panic(err)
- }
- */
if err = n.multicast.Init(n.core, n.state, n.log, nil); err != nil {
panic(err)
}
if err = n.multicast.Start(); err != nil {
panic(err)
}
- //n.admin.SetupAdminHandlers(n.admin)
- //n.multicast.SetupAdminHandlers(n.admin)
- n.listener, err = n.core.ConnListen()
- if err != nil {
- panic(err)
- }
- n.dialer, err = n.core.ConnDialer()
- if err != nil {
- panic(err)
+
+ n.packetConn = n.core.PacketConn()
+ n.tlsConfig = n.generateTLSConfig()
+ n.quicConfig = &quic.Config{
+ MaxIncomingStreams: 0,
+ MaxIncomingUniStreams: 0,
+ KeepAlive: true,
+ MaxIdleTimeout: time.Second * 120,
+ HandshakeTimeout: time.Second * 30,
}
n.log.Println("Public curve25519:", n.core.EncryptionPublicKey())
diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/session.go b/cmd/dendrite-demo-yggdrasil/yggconn/session.go
index c50b6b73..857b2cc9 100644
--- a/cmd/dendrite-demo-yggdrasil/yggconn/session.go
+++ b/cmd/dendrite-demo-yggdrasil/yggconn/session.go
@@ -16,60 +16,52 @@ package yggconn
import (
"context"
+ "crypto/rand"
+ "crypto/rsa"
+ "crypto/tls"
+ "crypto/x509"
+ "encoding/hex"
+ "encoding/pem"
+ "errors"
+ "math/big"
"net"
- "strings"
"time"
- "github.com/libp2p/go-yamux"
+ "github.com/lucas-clemente/quic-go"
+ "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
)
-func (n *Node) yamuxConfig() *yamux.Config {
- cfg := yamux.DefaultConfig()
- cfg.EnableKeepAlive = false
- cfg.ConnectionWriteTimeout = time.Second * 15
- cfg.MaxMessageSize = 65535
- cfg.ReadBufSize = 655350
- return cfg
-}
-
func (n *Node) listenFromYgg() {
+ var err error
+ n.listener, err = quic.Listen(
+ n.packetConn, // yggdrasil.PacketConn
+ n.tlsConfig, // TLS config
+ n.quicConfig, // QUIC config
+ )
+ if err != nil {
+ panic(err)
+ }
+
for {
- conn, err := n.listener.Accept()
+ session, err := n.listener.Accept(context.TODO())
if err != nil {
n.log.Println("n.listener.Accept:", err)
return
}
- var session *yamux.Session
- // If the remote address is lower than ours then we'll be the
- // server. Otherwse we'll be the client.
- if strings.Compare(conn.RemoteAddr().String(), n.DerivedSessionName()) < 0 {
- session, err = yamux.Server(conn, n.yamuxConfig())
- } else {
- session, err = yamux.Client(conn, n.yamuxConfig())
- }
- if err != nil {
- return
- }
- go n.listenFromYggConn(session)
+ go n.listenFromQUIC(session)
}
}
-func (n *Node) listenFromYggConn(session *yamux.Session) {
+func (n *Node) listenFromQUIC(session quic.Session) {
n.sessions.Store(session.RemoteAddr().String(), session)
defer n.sessions.Delete(session.RemoteAddr())
- defer func() {
- if err := session.Close(); err != nil {
- n.log.Println("session.Close:", err)
- }
- }()
-
for {
- st, err := session.AcceptStream()
+ st, err := session.AcceptStream(context.TODO())
if err != nil {
n.log.Println("session.AcceptStream:", err)
return
}
- n.incoming <- st
+ n.incoming <- QUICStream{st, session}
}
}
@@ -96,29 +88,63 @@ func (n *Node) Dial(network, address string) (net.Conn, error) {
// Implements http.Transport.DialContext
func (n *Node) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
s, ok1 := n.sessions.Load(address)
- session, ok2 := s.(*yamux.Session)
- if !ok1 || !ok2 || (ok1 && ok2 && session.IsClosed()) {
- conn, err := n.dialer.DialContext(ctx, network, address)
+ session, ok2 := s.(quic.Session)
+ if !ok1 || !ok2 || (ok1 && ok2 && session.ConnectionState().HandshakeComplete) {
+ dest, err := hex.DecodeString(address)
if err != nil {
- n.log.Println("n.dialer.DialContext:", err)
return nil, err
}
- // If the remote address is lower than ours then we will be the
- // server. Otherwise we'll be the client.
- if strings.Compare(conn.RemoteAddr().String(), n.DerivedSessionName()) < 0 {
- session, err = yamux.Server(conn, n.yamuxConfig())
- } else {
- session, err = yamux.Client(conn, n.yamuxConfig())
+ if len(dest) != crypto.BoxPubKeyLen {
+ return nil, errors.New("invalid key length supplied")
}
+ var pubKey crypto.BoxPubKey
+ copy(pubKey[:], dest)
+
+ session, err = quic.Dial(
+ n.packetConn, // yggdrasil.PacketConn
+ &pubKey, // dial address
+ address, // dial SNI
+ n.tlsConfig, // TLS config
+ n.quicConfig, // QUIC config
+ )
if err != nil {
+ n.log.Println("n.dialer.DialContext:", err)
return nil, err
}
- go n.listenFromYggConn(session)
+ go n.listenFromQUIC(session)
}
st, err := session.OpenStream()
if err != nil {
n.log.Println("session.OpenStream:", err)
return nil, err
}
- return st, nil
+ return QUICStream{st, session}, nil
+}
+
+func (n *Node) generateTLSConfig() *tls.Config {
+ key, err := rsa.GenerateKey(rand.Reader, 1024)
+ if err != nil {
+ panic(err)
+ }
+ template := x509.Certificate{
+ SerialNumber: big.NewInt(1),
+ NotAfter: time.Now().Add(time.Hour * 24 * 365),
+ DNSNames: []string{n.DerivedSessionName()},
+ }
+ certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key)
+ if err != nil {
+ panic(err)
+ }
+ keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)})
+ certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})
+
+ tlsCert, err := tls.X509KeyPair(certPEM, keyPEM)
+ if err != nil {
+ panic(err)
+ }
+ return &tls.Config{
+ Certificates: []tls.Certificate{tlsCert},
+ NextProtos: []string{"quic-matrix-ygg"},
+ InsecureSkipVerify: true,
+ }
}
diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/stream.go b/cmd/dendrite-demo-yggdrasil/yggconn/stream.go
new file mode 100644
index 00000000..dac7447e
--- /dev/null
+++ b/cmd/dendrite-demo-yggdrasil/yggconn/stream.go
@@ -0,0 +1,20 @@
+package yggconn
+
+import (
+ "net"
+
+ "github.com/lucas-clemente/quic-go"
+)
+
+type QUICStream struct {
+ quic.Stream
+ session quic.Session
+}
+
+func (s QUICStream) LocalAddr() net.Addr {
+ return s.session.LocalAddr()
+}
+
+func (s QUICStream) RemoteAddr() net.Addr {
+ return s.session.RemoteAddr()
+}