aboutsummaryrefslogtreecommitdiff
path: root/cmd
diff options
context:
space:
mode:
authorHilmar Gústafsson <LiHRaM@users.noreply.github.com>2020-04-14 17:15:59 +0200
committerGitHub <noreply@github.com>2020-04-14 16:15:59 +0100
commit73d2f59e303fa998f997c483bb6843bf77e069e5 (patch)
treea10ca5e3976f4ec1f3c1a87472dc475b10ad6af7 /cmd
parent48303d06cb91d19582f776af32ee22e5d820f031 (diff)
WIP: Add libp2p-go (#956)
* Add libp2p-go * Some tweaks, tidying up (cherry picked from commit 1a5bb121f8121c4f68a27abbf25a9a35a1b7c63e) * Move p2p dockerfile (cherry picked from commit 8d3bf44ea1bf37f950034e73bcdc315afdabe79a) * Remove containsBackwardsExtremity * Fix some linter errors, update some libp2p packages/calls, other tidying up * Add -port for dendrite-p2p-demo * Use instance name as key ID * Remove P2P demo docker stuff, no longer needed now that we have SQLite * Remove Dockerfile-p2p too * Remove p2p logic from dendrite-monolith-server * Inject publicRoomsDB in publicroomsapi Inject publicRoomsDB instead of switching on base.libP2P. See: https://github.com/matrix-org/dendrite/pull/956/files?file-filters%5B%5D=.go#r406276914 * Fix lint warning * Extract mDNSListener from base.go * Extract CreateFederationClient into demo * Create P2PDendrite from BaseDendrite Extract logic specific to P2PDendrite from base.go * Set base.go to upstream/master * Move pubsub to demo cmd * Move PostgreswithDHT to cmd * Remove unstable features * Add copyrights * Move libp2pvalidator into p2pdendrite * Rename dendrite-p2p-demo -> dendrite-demo-libp2p * Update copyrights * go mod tidy Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'cmd')
-rw-r--r--cmd/dendrite-demo-libp2p/main.go203
-rw-r--r--cmd/dendrite-demo-libp2p/mdnslistener.go63
-rw-r--r--cmd/dendrite-demo-libp2p/p2pdendrite.go126
-rw-r--r--cmd/dendrite-demo-libp2p/storage/postgreswithdht/storage.go164
-rw-r--r--cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go179
-rw-r--r--cmd/dendrite-demo-libp2p/storage/storage.go61
-rw-r--r--cmd/dendrite-monolith-server/main.go7
-rw-r--r--cmd/dendrite-public-rooms-api-server/main.go9
-rw-r--r--cmd/dendritejs/main.go7
9 files changed, 815 insertions, 4 deletions
diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go
new file mode 100644
index 00000000..df3b48ad
--- /dev/null
+++ b/cmd/dendrite-demo-libp2p/main.go
@@ -0,0 +1,203 @@
+// Copyright 2017 Vector Creations Ltd
+// Copyright 2018 New Vector Ltd
+// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "crypto/ed25519"
+ "flag"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "time"
+
+ gostream "github.com/libp2p/go-libp2p-gostream"
+ p2phttp "github.com/libp2p/go-libp2p-http"
+ p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
+ "github.com/matrix-org/dendrite/appservice"
+ "github.com/matrix-org/dendrite/clientapi"
+ "github.com/matrix-org/dendrite/clientapi/producers"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage"
+ "github.com/matrix-org/dendrite/common"
+ "github.com/matrix-org/dendrite/common/config"
+ "github.com/matrix-org/dendrite/common/keydb"
+ "github.com/matrix-org/dendrite/common/transactions"
+ "github.com/matrix-org/dendrite/eduserver"
+ "github.com/matrix-org/dendrite/federationapi"
+ "github.com/matrix-org/dendrite/federationsender"
+ "github.com/matrix-org/dendrite/mediaapi"
+ "github.com/matrix-org/dendrite/publicroomsapi"
+ "github.com/matrix-org/dendrite/roomserver"
+ "github.com/matrix-org/dendrite/syncapi"
+ "github.com/matrix-org/gomatrixserverlib"
+
+ "github.com/matrix-org/dendrite/eduserver/cache"
+
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/sirupsen/logrus"
+)
+
+func createKeyDB(
+ base *P2PDendrite,
+) keydb.Database {
+ db, err := keydb.NewDatabase(
+ string(base.Base.Cfg.Database.ServerKey),
+ base.Base.Cfg.Matrix.ServerName,
+ base.Base.Cfg.Matrix.PrivateKey.Public().(ed25519.PublicKey),
+ base.Base.Cfg.Matrix.KeyID,
+ )
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to keys db")
+ }
+ mdns := mDNSListener{
+ host: base.LibP2P,
+ keydb: db,
+ }
+ serv, err := p2pdisc.NewMdnsService(
+ base.LibP2PContext,
+ base.LibP2P,
+ time.Second*10,
+ "_matrix-dendrite-p2p._tcp",
+ )
+ if err != nil {
+ panic(err)
+ }
+ serv.RegisterNotifee(&mdns)
+ return db
+}
+
+func createFederationClient(
+ base *P2PDendrite,
+) *gomatrixserverlib.FederationClient {
+ fmt.Println("Running in libp2p federation mode")
+ fmt.Println("Warning: Federation with non-libp2p homeservers will not work in this mode yet!")
+ tr := &http.Transport{}
+ tr.RegisterProtocol(
+ "matrix",
+ p2phttp.NewTransport(base.LibP2P, p2phttp.ProtocolOption("/matrix")),
+ )
+ return gomatrixserverlib.NewFederationClientWithTransport(
+ base.Base.Cfg.Matrix.ServerName, base.Base.Cfg.Matrix.KeyID, base.Base.Cfg.Matrix.PrivateKey, tr,
+ )
+}
+
+func main() {
+ instanceName := flag.String("name", "dendrite-p2p", "the name of this P2P demo instance")
+ instancePort := flag.Int("port", 8080, "the port that the client API will listen on")
+ flag.Parse()
+
+ filename := fmt.Sprintf("%s-private.key", *instanceName)
+ _, err := os.Stat(filename)
+ var privKey ed25519.PrivateKey
+ if os.IsNotExist(err) {
+ _, privKey, _ = ed25519.GenerateKey(nil)
+ if err = ioutil.WriteFile(filename, privKey, 0600); err != nil {
+ fmt.Printf("Couldn't write private key to file '%s': %s\n", filename, err)
+ }
+ } else {
+ privKey, err = ioutil.ReadFile(filename)
+ if err != nil {
+ fmt.Printf("Couldn't read private key from file '%s': %s\n", filename, err)
+ _, privKey, _ = ed25519.GenerateKey(nil)
+ }
+ }
+
+ cfg := config.Dendrite{}
+ cfg.Matrix.ServerName = "p2p"
+ cfg.Matrix.PrivateKey = privKey
+ cfg.Matrix.KeyID = gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", *instanceName))
+ cfg.Kafka.UseNaffka = true
+ cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput"
+ cfg.Kafka.Topics.OutputClientData = "clientapiOutput"
+ cfg.Kafka.Topics.OutputTypingEvent = "typingServerOutput"
+ cfg.Kafka.Topics.UserUpdates = "userUpdates"
+ cfg.Database.Account = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName))
+ cfg.Database.Device = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName))
+ cfg.Database.MediaAPI = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName))
+ cfg.Database.SyncAPI = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName))
+ cfg.Database.RoomServer = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName))
+ cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName))
+ cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
+ cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
+ cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicroomsa.db", *instanceName))
+ cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
+ if err = cfg.Derive(); err != nil {
+ panic(err)
+ }
+
+ base := NewP2PDendrite(&cfg, "Monolith")
+ defer base.Base.Close() // nolint: errcheck
+
+ accountDB := base.Base.CreateAccountsDB()
+ deviceDB := base.Base.CreateDeviceDB()
+ keyDB := createKeyDB(base)
+ federation := createFederationClient(base)
+ keyRing := keydb.CreateKeyRing(federation.Client, keyDB)
+
+ alias, input, query := roomserver.SetupRoomServerComponent(&base.Base)
+ eduInputAPI := eduserver.SetupEDUServerComponent(&base.Base, cache.New())
+ asQuery := appservice.SetupAppServiceAPIComponent(
+ &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(),
+ )
+ fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query)
+
+ clientapi.SetupClientAPIComponent(
+ &base.Base, deviceDB, accountDB,
+ federation, &keyRing, alias, input, query,
+ eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
+ )
+ eduProducer := producers.NewEDUServerProducer(eduInputAPI)
+ federationapi.SetupFederationAPIComponent(&base.Base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
+ mediaapi.SetupMediaAPIComponent(&base.Base, deviceDB)
+ publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub)
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to public rooms db")
+ }
+ publicroomsapi.SetupPublicRoomsAPIComponent(&base.Base, deviceDB, publicRoomsDB, query, federation, nil) // Check this later
+ syncapi.SetupSyncAPIComponent(&base.Base, deviceDB, accountDB, query, federation, &cfg)
+
+ httpHandler := common.WrapHandlerInCORS(base.Base.APIMux)
+
+ // Set up the API endpoints we handle. /metrics is for prometheus, and is
+ // not wrapped by CORS, while everything else is
+ http.Handle("/metrics", promhttp.Handler())
+ http.Handle("/", httpHandler)
+
+ // Expose the matrix APIs directly rather than putting them under a /api path.
+ go func() {
+ httpBindAddr := fmt.Sprintf(":%d", *instancePort)
+ logrus.Info("Listening on ", httpBindAddr)
+ logrus.Fatal(http.ListenAndServe(httpBindAddr, nil))
+ }()
+ // Expose the matrix APIs also via libp2p
+ if base.LibP2P != nil {
+ go func() {
+ logrus.Info("Listening on libp2p host ID ", base.LibP2P.ID())
+ listener, err := gostream.Listen(base.LibP2P, "/matrix")
+ if err != nil {
+ panic(err)
+ }
+ defer func() {
+ logrus.Fatal(listener.Close())
+ }()
+ logrus.Fatal(http.Serve(listener, nil))
+ }()
+ }
+
+ // We want to block forever to let the HTTP and HTTPS handler serve the APIs
+ select {}
+}
diff --git a/cmd/dendrite-demo-libp2p/mdnslistener.go b/cmd/dendrite-demo-libp2p/mdnslistener.go
new file mode 100644
index 00000000..3fefbec2
--- /dev/null
+++ b/cmd/dendrite-demo-libp2p/mdnslistener.go
@@ -0,0 +1,63 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "math"
+
+ "github.com/libp2p/go-libp2p-core/host"
+ "github.com/libp2p/go-libp2p-core/peer"
+ "github.com/matrix-org/dendrite/common/keydb"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type mDNSListener struct {
+ keydb keydb.Database
+ host host.Host
+}
+
+func (n *mDNSListener) HandlePeerFound(p peer.AddrInfo) {
+ if err := n.host.Connect(context.Background(), p); err != nil {
+ fmt.Println("Error adding peer", p.ID.String(), "via mDNS:", err)
+ }
+ if pubkey, err := p.ID.ExtractPublicKey(); err == nil {
+ raw, _ := pubkey.Raw()
+ if err := n.keydb.StoreKeys(
+ context.Background(),
+ map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{
+ {
+ ServerName: gomatrixserverlib.ServerName(p.ID.String()),
+ KeyID: "ed25519:p2pdemo",
+ }: {
+ VerifyKey: gomatrixserverlib.VerifyKey{
+ Key: gomatrixserverlib.Base64String(raw),
+ },
+ ValidUntilTS: math.MaxUint64 >> 1,
+ ExpiredTS: gomatrixserverlib.PublicKeyNotExpired,
+ },
+ },
+ ); err != nil {
+ fmt.Println("Failed to store keys:", err)
+ }
+ }
+ fmt.Println("Discovered", len(n.host.Peerstore().Peers())-1, "other libp2p peer(s):")
+ for _, peer := range n.host.Peerstore().Peers() {
+ if peer != n.host.ID() {
+ fmt.Println("-", peer)
+ }
+ }
+}
diff --git a/cmd/dendrite-demo-libp2p/p2pdendrite.go b/cmd/dendrite-demo-libp2p/p2pdendrite.go
new file mode 100644
index 00000000..a9db3b39
--- /dev/null
+++ b/cmd/dendrite-demo-libp2p/p2pdendrite.go
@@ -0,0 +1,126 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "context"
+ "fmt"
+
+ "errors"
+
+ pstore "github.com/libp2p/go-libp2p-core/peerstore"
+ record "github.com/libp2p/go-libp2p-record"
+ "github.com/matrix-org/dendrite/common/basecomponent"
+
+ "github.com/libp2p/go-libp2p"
+ circuit "github.com/libp2p/go-libp2p-circuit"
+ crypto "github.com/libp2p/go-libp2p-core/crypto"
+ routing "github.com/libp2p/go-libp2p-core/routing"
+
+ host "github.com/libp2p/go-libp2p-core/host"
+ dht "github.com/libp2p/go-libp2p-kad-dht"
+ pubsub "github.com/libp2p/go-libp2p-pubsub"
+ "github.com/matrix-org/gomatrixserverlib"
+
+ "github.com/matrix-org/dendrite/common/config"
+)
+
+// P2PDendrite is a Peer-to-Peer variant of BaseDendrite.
+type P2PDendrite struct {
+ Base basecomponent.BaseDendrite
+
+ // Store our libp2p object so that we can make outgoing connections from it
+ // later
+ LibP2P host.Host
+ LibP2PContext context.Context
+ LibP2PCancel context.CancelFunc
+ LibP2PDHT *dht.IpfsDHT
+ LibP2PPubsub *pubsub.PubSub
+}
+
+// NewP2PDendrite creates a new instance to be used by a component.
+// The componentName is used for logging purposes, and should be a friendly name
+// of the component running, e.g. SyncAPI.
+func NewP2PDendrite(cfg *config.Dendrite, componentName string) *P2PDendrite {
+ baseDendrite := basecomponent.NewBaseDendrite(cfg, componentName)
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ privKey, err := crypto.UnmarshalEd25519PrivateKey(cfg.Matrix.PrivateKey[:])
+ if err != nil {
+ panic(err)
+ }
+
+ //defaultIP6ListenAddr, _ := multiaddr.NewMultiaddr("/ip6/::/tcp/0")
+ var libp2pdht *dht.IpfsDHT
+ libp2p, err := libp2p.New(ctx,
+ libp2p.Identity(privKey),
+ libp2p.DefaultListenAddrs,
+ //libp2p.ListenAddrs(defaultIP6ListenAddr),
+ libp2p.DefaultTransports,
+ libp2p.Routing(func(h host.Host) (r routing.PeerRouting, err error) {
+ libp2pdht, err = dht.New(ctx, h)
+ if err != nil {
+ return nil, err
+ }
+ libp2pdht.Validator = libP2PValidator{}
+ r = libp2pdht
+ return
+ }),
+ libp2p.EnableAutoRelay(),
+ libp2p.EnableRelay(circuit.OptHop),
+ )
+ if err != nil {
+ panic(err)
+ }
+
+ libp2ppubsub, err := pubsub.NewFloodSub(context.Background(), libp2p, []pubsub.Option{
+ pubsub.WithMessageSigning(true),
+ }...)
+ if err != nil {
+ panic(err)
+ }
+
+ fmt.Println("Our public key:", privKey.GetPublic())
+ fmt.Println("Our node ID:", libp2p.ID())
+ fmt.Println("Our addresses:", libp2p.Addrs())
+
+ cfg.Matrix.ServerName = gomatrixserverlib.ServerName(libp2p.ID().String())
+
+ return &P2PDendrite{
+ Base: *baseDendrite,
+ LibP2P: libp2p,
+ LibP2PContext: ctx,
+ LibP2PCancel: cancel,
+ LibP2PDHT: libp2pdht,
+ LibP2PPubsub: libp2ppubsub,
+ }
+}
+
+type libP2PValidator struct {
+ KeyBook pstore.KeyBook
+}
+
+func (v libP2PValidator) Validate(key string, value []byte) error {
+ ns, _, err := record.SplitKey(key)
+ if err != nil || ns != "matrix" {
+ return errors.New("not Matrix path")
+ }
+ return nil
+}
+
+func (v libP2PValidator) Select(k string, vals [][]byte) (int, error) {
+ return 0, nil
+}
diff --git a/cmd/dendrite-demo-libp2p/storage/postgreswithdht/storage.go b/cmd/dendrite-demo-libp2p/storage/postgreswithdht/storage.go
new file mode 100644
index 00000000..819469ee
--- /dev/null
+++ b/cmd/dendrite-demo-libp2p/storage/postgreswithdht/storage.go
@@ -0,0 +1,164 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgreswithdht
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/matrix-org/dendrite/publicroomsapi/storage/postgres"
+ "github.com/matrix-org/gomatrixserverlib"
+
+ dht "github.com/libp2p/go-libp2p-kad-dht"
+)
+
+const DHTInterval = time.Second * 10
+
+// PublicRoomsServerDatabase represents a public rooms server database.
+type PublicRoomsServerDatabase struct {
+ dht *dht.IpfsDHT
+ postgres.PublicRoomsServerDatabase
+ ourRoomsContext context.Context // our current value in the DHT
+ ourRoomsCancel context.CancelFunc // cancel when we want to expire our value
+ foundRooms map[string]gomatrixserverlib.PublicRoom // additional rooms we have learned about from the DHT
+ foundRoomsMutex sync.RWMutex // protects foundRooms
+ maintenanceTimer *time.Timer //
+ roomsAdvertised atomic.Value // stores int
+ roomsDiscovered atomic.Value // stores int
+}
+
+// NewPublicRoomsServerDatabase creates a new public rooms server database.
+func NewPublicRoomsServerDatabase(dataSourceName string, dht *dht.IpfsDHT) (*PublicRoomsServerDatabase, error) {
+ pg, err := postgres.NewPublicRoomsServerDatabase(dataSourceName)
+ if err != nil {
+ return nil, err
+ }
+ provider := PublicRoomsServerDatabase{
+ dht: dht,
+ PublicRoomsServerDatabase: *pg,
+ }
+ go provider.ResetDHTMaintenance()
+ provider.roomsAdvertised.Store(0)
+ provider.roomsDiscovered.Store(0)
+ return &provider, nil
+}
+
+func (d *PublicRoomsServerDatabase) GetRoomVisibility(ctx context.Context, roomID string) (bool, error) {
+ return d.PublicRoomsServerDatabase.GetRoomVisibility(ctx, roomID)
+}
+
+func (d *PublicRoomsServerDatabase) SetRoomVisibility(ctx context.Context, visible bool, roomID string) error {
+ d.ResetDHTMaintenance()
+ return d.PublicRoomsServerDatabase.SetRoomVisibility(ctx, visible, roomID)
+}
+
+func (d *PublicRoomsServerDatabase) CountPublicRooms(ctx context.Context) (int64, error) {
+ count, err := d.PublicRoomsServerDatabase.CountPublicRooms(ctx)
+ if err != nil {
+ return 0, err
+ }
+ d.foundRoomsMutex.RLock()
+ defer d.foundRoomsMutex.RUnlock()
+ return count + int64(len(d.foundRooms)), nil
+}
+
+func (d *PublicRoomsServerDatabase) GetPublicRooms(ctx context.Context, offset int64, limit int16, filter string) ([]gomatrixserverlib.PublicRoom, error) {
+ realfilter := filter
+ if realfilter == "__local__" {
+ realfilter = ""
+ }
+ rooms, err := d.PublicRoomsServerDatabase.GetPublicRooms(ctx, offset, limit, realfilter)
+ if err != nil {
+ return []gomatrixserverlib.PublicRoom{}, err
+ }
+ if filter != "__local__" {
+ d.foundRoomsMutex.RLock()
+ defer d.foundRoomsMutex.RUnlock()
+ for _, room := range d.foundRooms {
+ rooms = append(rooms, room)
+ }
+ }
+ return rooms, nil
+}
+
+func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, eventsToRemove []gomatrixserverlib.Event) error {
+ return d.PublicRoomsServerDatabase.UpdateRoomFromEvents(ctx, eventsToAdd, eventsToRemove)
+}
+
+func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(ctx context.Context, event gomatrixserverlib.Event) error {
+ return d.PublicRoomsServerDatabase.UpdateRoomFromEvent(ctx, event)
+}
+
+func (d *PublicRoomsServerDatabase) ResetDHTMaintenance() {
+ if d.maintenanceTimer != nil && !d.maintenanceTimer.Stop() {
+ <-d.maintenanceTimer.C
+ }
+ d.Interval()
+}
+
+func (d *PublicRoomsServerDatabase) Interval() {
+ if err := d.AdvertiseRoomsIntoDHT(); err != nil {
+ // fmt.Println("Failed to advertise room in DHT:", err)
+ }
+ if err := d.FindRoomsInDHT(); err != nil {
+ // fmt.Println("Failed to find rooms in DHT:", err)
+ }
+ fmt.Println("Found", d.roomsDiscovered.Load(), "room(s), advertised", d.roomsAdvertised.Load(), "room(s)")
+ d.maintenanceTimer = time.AfterFunc(DHTInterval, d.Interval)
+}
+
+func (d *PublicRoomsServerDatabase) AdvertiseRoomsIntoDHT() error {
+ dbCtx, dbCancel := context.WithTimeout(context.Background(), 3*time.Second)
+ _ = dbCancel
+ ourRooms, err := d.GetPublicRooms(dbCtx, 0, 1024, "__local__")
+ if err != nil {
+ return err
+ }
+ if j, err := json.Marshal(ourRooms); err == nil {
+ d.roomsAdvertised.Store(len(ourRooms))
+ d.ourRoomsContext, d.ourRoomsCancel = context.WithCancel(context.Background())
+ if err := d.dht.PutValue(d.ourRoomsContext, "/matrix/publicRooms", j); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (d *PublicRoomsServerDatabase) FindRoomsInDHT() error {
+ d.foundRoomsMutex.Lock()
+ searchCtx, searchCancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer searchCancel()
+ defer d.foundRoomsMutex.Unlock()
+ results, err := d.dht.GetValues(searchCtx, "/matrix/publicRooms", 1024)
+ if err != nil {
+ return err
+ }
+ d.foundRooms = make(map[string]gomatrixserverlib.PublicRoom)
+ for _, result := range results {
+ var received []gomatrixserverlib.PublicRoom
+ if err := json.Unmarshal(result.Val, &received); err != nil {
+ return err
+ }
+ for _, room := range received {
+ d.foundRooms[room.RoomID] = room
+ }
+ }
+ d.roomsDiscovered.Store(len(d.foundRooms))
+ return nil
+}
diff --git a/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go b/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go
new file mode 100644
index 00000000..66119224
--- /dev/null
+++ b/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go
@@ -0,0 +1,179 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgreswithpubsub
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/matrix-org/dendrite/publicroomsapi/storage/postgres"
+ "github.com/matrix-org/gomatrixserverlib"
+
+ pubsub "github.com/libp2p/go-libp2p-pubsub"
+)
+
+const MaintenanceInterval = time.Second * 10
+
+type discoveredRoom struct {
+ time time.Time
+ room gomatrixserverlib.PublicRoom
+}
+
+// PublicRoomsServerDatabase represents a public rooms server database.
+type PublicRoomsServerDatabase struct {
+ postgres.PublicRoomsServerDatabase //
+ pubsub *pubsub.PubSub //
+ subscription *pubsub.Subscription //
+ foundRooms map[string]discoveredRoom // additional rooms we have learned about from the DHT
+ foundRoomsMutex sync.RWMutex // protects foundRooms
+ maintenanceTimer *time.Timer //
+ roomsAdvertised atomic.Value // stores int
+}
+
+// NewPublicRoomsServerDatabase creates a new public rooms server database.
+func NewPublicRoomsServerDatabase(dataSourceName string, pubsub *pubsub.PubSub) (*PublicRoomsServerDatabase, error) {
+ pg, err := postgres.NewPublicRoomsServerDatabase(dataSourceName)
+ if err != nil {
+ return nil, err
+ }
+ provider := PublicRoomsServerDatabase{
+ pubsub: pubsub,
+ PublicRoomsServerDatabase: *pg,
+ foundRooms: make(map[string]discoveredRoom),
+ }
+ if topic, err := pubsub.Join("/matrix/publicRooms"); err != nil {
+ return nil, err
+ } else if sub, err := topic.Subscribe(); err == nil {
+ provider.subscription = sub
+ go provider.MaintenanceTimer()
+ go provider.FindRooms()
+ provider.roomsAdvertised.Store(0)
+ return &provider, nil
+ } else {
+ return nil, err
+ }
+}
+
+func (d *PublicRoomsServerDatabase) GetRoomVisibility(ctx context.Context, roomID string) (bool, error) {
+ return d.PublicRoomsServerDatabase.GetRoomVisibility(ctx, roomID)
+}
+
+func (d *PublicRoomsServerDatabase) SetRoomVisibility(ctx context.Context, visible bool, roomID string) error {
+ d.MaintenanceTimer()
+ return d.PublicRoomsServerDatabase.SetRoomVisibility(ctx, visible, roomID)
+}
+
+func (d *PublicRoomsServerDatabase) CountPublicRooms(ctx context.Context) (int64, error) {
+ d.foundRoomsMutex.RLock()
+ defer d.foundRoomsMutex.RUnlock()
+ return int64(len(d.foundRooms)), nil
+}
+
+func (d *PublicRoomsServerDatabase) GetPublicRooms(ctx context.Context, offset int64, limit int16, filter string) ([]gomatrixserverlib.PublicRoom, error) {
+ var rooms []gomatrixserverlib.PublicRoom
+ if filter == "__local__" {
+ if r, err := d.PublicRoomsServerDatabase.GetPublicRooms(ctx, offset, limit, ""); err == nil {
+ rooms = append(rooms, r...)
+ } else {
+ return []gomatrixserverlib.PublicRoom{}, err
+ }
+ } else {
+ d.foundRoomsMutex.RLock()
+ defer d.foundRoomsMutex.RUnlock()
+ for _, room := range d.foundRooms {
+ rooms = append(rooms, room.room)
+ }
+ }
+ return rooms, nil
+}
+
+func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, eventsToRemove []gomatrixserverlib.Event) error {
+ return d.PublicRoomsServerDatabase.UpdateRoomFromEvents(ctx, eventsToAdd, eventsToRemove)
+}
+
+func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(ctx context.Context, event gomatrixserverlib.Event) error {
+ return d.PublicRoomsServerDatabase.UpdateRoomFromEvent(ctx, event)
+}
+
+func (d *PublicRoomsServerDatabase) MaintenanceTimer() {
+ if d.maintenanceTimer != nil && !d.maintenanceTimer.Stop() {
+ <-d.maintenanceTimer.C
+ }
+ d.Interval()
+}
+
+func (d *PublicRoomsServerDatabase) Interval() {
+ d.foundRoomsMutex.Lock()
+ for k, v := range d.foundRooms {
+ if time.Since(v.time) > time.Minute {
+ delete(d.foundRooms, k)
+ }
+ }
+ d.foundRoomsMutex.Unlock()
+ if err := d.AdvertiseRooms(); err != nil {
+ fmt.Println("Failed to advertise room in DHT:", err)
+ }
+ d.foundRoomsMutex.RLock()
+ defer d.foundRoomsMutex.RUnlock()
+ fmt.Println("Found", len(d.foundRooms), "room(s), advertised", d.roomsAdvertised.Load(), "room(s)")
+ d.maintenanceTimer = time.AfterFunc(MaintenanceInterval, d.Interval)
+}
+
+func (d *PublicRoomsServerDatabase) AdvertiseRooms() error {
+ dbCtx, dbCancel := context.WithTimeout(context.Background(), 3*time.Second)
+ _ = dbCancel
+ ourRooms, err := d.GetPublicRooms(dbCtx, 0, 1024, "__local__")
+ if err != nil {
+ return err
+ }
+ advertised := 0
+ for _, room := range ourRooms {
+ if j, err := json.Marshal(room); err == nil {
+ if topic, err := d.pubsub.Join("/matrix/publicRooms"); err != nil {
+ fmt.Println("Failed to subscribe to topic:", err)
+ } else if err := topic.Publish(context.TODO(), j); err != nil {
+ fmt.Println("Failed to publish public room:", err)
+ } else {
+ advertised++
+ }
+ }
+ }
+
+ d.roomsAdvertised.Store(advertised)
+ return nil
+}
+
+func (d *PublicRoomsServerDatabase) FindRooms() {
+ for {
+ msg, err := d.subscription.Next(context.Background())
+ if err != nil {
+ continue
+ }
+ received := discoveredRoom{
+ time: time.Now(),
+ }
+ if err := json.Unmarshal(msg.Data, &received.room); err != nil {
+ fmt.Println("Unmarshal error:", err)
+ continue
+ }
+ d.foundRoomsMutex.Lock()
+ d.foundRooms[received.room.RoomID] = received
+ d.foundRoomsMutex.Unlock()
+ }
+}
diff --git a/cmd/dendrite-demo-libp2p/storage/storage.go b/cmd/dendrite-demo-libp2p/storage/storage.go
new file mode 100644
index 00000000..668edbaa
--- /dev/null
+++ b/cmd/dendrite-demo-libp2p/storage/storage.go
@@ -0,0 +1,61 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storage
+
+import (
+ "net/url"
+
+ dht "github.com/libp2p/go-libp2p-kad-dht"
+ pubsub "github.com/libp2p/go-libp2p-pubsub"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage/postgreswithdht"
+ "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub"
+ "github.com/matrix-org/dendrite/publicroomsapi/storage"
+ "github.com/matrix-org/dendrite/publicroomsapi/storage/sqlite3"
+)
+
+const schemePostgres = "postgres"
+const schemeFile = "file"
+
+// NewPublicRoomsServerDatabase opens a database connection.
+func NewPublicRoomsServerDatabaseWithDHT(dataSourceName string, dht *dht.IpfsDHT) (storage.Database, error) {
+ uri, err := url.Parse(dataSourceName)
+ if err != nil {
+ return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht)
+ }
+ switch uri.Scheme {
+ case schemePostgres:
+ return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht)
+ case schemeFile:
+ return sqlite3.NewPublicRoomsServerDatabase(dataSourceName)
+ default:
+ return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht)
+ }
+}
+
+// NewPublicRoomsServerDatabase opens a database connection.
+func NewPublicRoomsServerDatabaseWithPubSub(dataSourceName string, pubsub *pubsub.PubSub) (storage.Database, error) {
+ uri, err := url.Parse(dataSourceName)
+ if err != nil {
+ return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub)
+ }
+ switch uri.Scheme {
+ case schemePostgres:
+ return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub)
+ case schemeFile:
+ return sqlite3.NewPublicRoomsServerDatabase(dataSourceName)
+ default:
+ return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub)
+ }
+}
diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go
index c71d956b..d47d2e1b 100644
--- a/cmd/dendrite-monolith-server/main.go
+++ b/cmd/dendrite-monolith-server/main.go
@@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
+ "github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -71,7 +72,11 @@ func main() {
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
- publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, nil)
+ publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to public rooms db")
+ }
+ publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, publicRoomsDB, query, federation, nil)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)
httpHandler := common.WrapHandlerInCORS(base.APIMux)
diff --git a/cmd/dendrite-public-rooms-api-server/main.go b/cmd/dendrite-public-rooms-api-server/main.go
index 6b7eac7d..f6a782f6 100644
--- a/cmd/dendrite-public-rooms-api-server/main.go
+++ b/cmd/dendrite-public-rooms-api-server/main.go
@@ -17,6 +17,8 @@ package main
import (
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/publicroomsapi"
+ "github.com/matrix-org/dendrite/publicroomsapi/storage"
+ "github.com/sirupsen/logrus"
)
func main() {
@@ -27,8 +29,11 @@ func main() {
deviceDB := base.CreateDeviceDB()
_, _, query := base.CreateHTTPRoomserverAPIs()
-
- publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, nil, nil)
+ publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to public rooms db")
+ }
+ publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, publicRoomsDB, query, nil, nil)
base.SetupAndServeHTTP(string(base.Cfg.Bind.PublicRoomsAPI), string(base.Cfg.Listen.PublicRoomsAPI))
diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go
index 05802725..9bd8f2ee 100644
--- a/cmd/dendritejs/main.go
+++ b/cmd/dendritejs/main.go
@@ -34,6 +34,7 @@ import (
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/mediaapi"
"github.com/matrix-org/dendrite/publicroomsapi"
+ "github.com/matrix-org/dendrite/publicroomsapi/storage"
"github.com/matrix-org/dendrite/roomserver"
"github.com/matrix-org/dendrite/syncapi"
"github.com/matrix-org/go-http-js-libp2p/go_http_js_libp2p"
@@ -137,7 +138,11 @@ func main() {
eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
- publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, p2pPublicRoomProvider)
+ publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI))
+ if err != nil {
+ logrus.WithError(err).Panicf("failed to connect to public rooms db")
+ }
+ publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, publicRoomsDB, query, federation, p2pPublicRoomProvider)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)
httpHandler := common.WrapHandlerInCORS(base.APIMux)