diff options
author | Hilmar Gústafsson <LiHRaM@users.noreply.github.com> | 2020-04-14 17:15:59 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-14 16:15:59 +0100 |
commit | 73d2f59e303fa998f997c483bb6843bf77e069e5 (patch) | |
tree | a10ca5e3976f4ec1f3c1a87472dc475b10ad6af7 /cmd | |
parent | 48303d06cb91d19582f776af32ee22e5d820f031 (diff) |
WIP: Add libp2p-go (#956)
* Add libp2p-go
* Some tweaks, tidying up
(cherry picked from commit 1a5bb121f8121c4f68a27abbf25a9a35a1b7c63e)
* Move p2p dockerfile
(cherry picked from commit 8d3bf44ea1bf37f950034e73bcdc315afdabe79a)
* Remove containsBackwardsExtremity
* Fix some linter errors, update some libp2p packages/calls, other tidying up
* Add -port for dendrite-p2p-demo
* Use instance name as key ID
* Remove P2P demo docker stuff, no longer needed now that we have SQLite
* Remove Dockerfile-p2p too
* Remove p2p logic from dendrite-monolith-server
* Inject publicRoomsDB in publicroomsapi
Inject publicRoomsDB instead of switching on base.libP2P.
See: https://github.com/matrix-org/dendrite/pull/956/files?file-filters%5B%5D=.go#r406276914
* Fix lint warning
* Extract mDNSListener from base.go
* Extract CreateFederationClient into demo
* Create P2PDendrite from BaseDendrite
Extract logic specific to P2PDendrite from base.go
* Set base.go to upstream/master
* Move pubsub to demo cmd
* Move PostgreswithDHT to cmd
* Remove unstable features
* Add copyrights
* Move libp2pvalidator into p2pdendrite
* Rename dendrite-p2p-demo -> dendrite-demo-libp2p
* Update copyrights
* go mod tidy
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'cmd')
-rw-r--r-- | cmd/dendrite-demo-libp2p/main.go | 203 | ||||
-rw-r--r-- | cmd/dendrite-demo-libp2p/mdnslistener.go | 63 | ||||
-rw-r--r-- | cmd/dendrite-demo-libp2p/p2pdendrite.go | 126 | ||||
-rw-r--r-- | cmd/dendrite-demo-libp2p/storage/postgreswithdht/storage.go | 164 | ||||
-rw-r--r-- | cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go | 179 | ||||
-rw-r--r-- | cmd/dendrite-demo-libp2p/storage/storage.go | 61 | ||||
-rw-r--r-- | cmd/dendrite-monolith-server/main.go | 7 | ||||
-rw-r--r-- | cmd/dendrite-public-rooms-api-server/main.go | 9 | ||||
-rw-r--r-- | cmd/dendritejs/main.go | 7 |
9 files changed, 815 insertions, 4 deletions
diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go new file mode 100644 index 00000000..df3b48ad --- /dev/null +++ b/cmd/dendrite-demo-libp2p/main.go @@ -0,0 +1,203 @@ +// Copyright 2017 Vector Creations Ltd +// Copyright 2018 New Vector Ltd +// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "crypto/ed25519" + "flag" + "fmt" + "io/ioutil" + "net/http" + "os" + "time" + + gostream "github.com/libp2p/go-libp2p-gostream" + p2phttp "github.com/libp2p/go-libp2p-http" + p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery" + "github.com/matrix-org/dendrite/appservice" + "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/common/config" + "github.com/matrix-org/dendrite/common/keydb" + "github.com/matrix-org/dendrite/common/transactions" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/federationapi" + "github.com/matrix-org/dendrite/federationsender" + "github.com/matrix-org/dendrite/mediaapi" + "github.com/matrix-org/dendrite/publicroomsapi" + "github.com/matrix-org/dendrite/roomserver" + "github.com/matrix-org/dendrite/syncapi" + "github.com/matrix-org/gomatrixserverlib" + + "github.com/matrix-org/dendrite/eduserver/cache" + + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" +) + +func createKeyDB( + base *P2PDendrite, +) keydb.Database { + db, err := keydb.NewDatabase( + string(base.Base.Cfg.Database.ServerKey), + base.Base.Cfg.Matrix.ServerName, + base.Base.Cfg.Matrix.PrivateKey.Public().(ed25519.PublicKey), + base.Base.Cfg.Matrix.KeyID, + ) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to keys db") + } + mdns := mDNSListener{ + host: base.LibP2P, + keydb: db, + } + serv, err := p2pdisc.NewMdnsService( + base.LibP2PContext, + base.LibP2P, + time.Second*10, + "_matrix-dendrite-p2p._tcp", + ) + if err != nil { + panic(err) + } + serv.RegisterNotifee(&mdns) + return db +} + +func createFederationClient( + base *P2PDendrite, +) *gomatrixserverlib.FederationClient { + fmt.Println("Running in libp2p federation mode") + fmt.Println("Warning: Federation with non-libp2p homeservers will not work in this mode yet!") + tr := &http.Transport{} + tr.RegisterProtocol( + "matrix", + p2phttp.NewTransport(base.LibP2P, p2phttp.ProtocolOption("/matrix")), + ) + return gomatrixserverlib.NewFederationClientWithTransport( + base.Base.Cfg.Matrix.ServerName, base.Base.Cfg.Matrix.KeyID, base.Base.Cfg.Matrix.PrivateKey, tr, + ) +} + +func main() { + instanceName := flag.String("name", "dendrite-p2p", "the name of this P2P demo instance") + instancePort := flag.Int("port", 8080, "the port that the client API will listen on") + flag.Parse() + + filename := fmt.Sprintf("%s-private.key", *instanceName) + _, err := os.Stat(filename) + var privKey ed25519.PrivateKey + if os.IsNotExist(err) { + _, privKey, _ = ed25519.GenerateKey(nil) + if err = ioutil.WriteFile(filename, privKey, 0600); err != nil { + fmt.Printf("Couldn't write private key to file '%s': %s\n", filename, err) + } + } else { + privKey, err = ioutil.ReadFile(filename) + if err != nil { + fmt.Printf("Couldn't read private key from file '%s': %s\n", filename, err) + _, privKey, _ = ed25519.GenerateKey(nil) + } + } + + cfg := config.Dendrite{} + cfg.Matrix.ServerName = "p2p" + cfg.Matrix.PrivateKey = privKey + cfg.Matrix.KeyID = gomatrixserverlib.KeyID(fmt.Sprintf("ed25519:%s", *instanceName)) + cfg.Kafka.UseNaffka = true + cfg.Kafka.Topics.OutputRoomEvent = "roomserverOutput" + cfg.Kafka.Topics.OutputClientData = "clientapiOutput" + cfg.Kafka.Topics.OutputTypingEvent = "typingServerOutput" + cfg.Kafka.Topics.UserUpdates = "userUpdates" + cfg.Database.Account = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) + cfg.Database.Device = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) + cfg.Database.MediaAPI = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) + cfg.Database.SyncAPI = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName)) + cfg.Database.RoomServer = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName)) + cfg.Database.ServerKey = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName)) + cfg.Database.FederationSender = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName)) + cfg.Database.AppService = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName)) + cfg.Database.PublicRoomsAPI = config.DataSource(fmt.Sprintf("file:%s-publicroomsa.db", *instanceName)) + cfg.Database.Naffka = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName)) + if err = cfg.Derive(); err != nil { + panic(err) + } + + base := NewP2PDendrite(&cfg, "Monolith") + defer base.Base.Close() // nolint: errcheck + + accountDB := base.Base.CreateAccountsDB() + deviceDB := base.Base.CreateDeviceDB() + keyDB := createKeyDB(base) + federation := createFederationClient(base) + keyRing := keydb.CreateKeyRing(federation.Client, keyDB) + + alias, input, query := roomserver.SetupRoomServerComponent(&base.Base) + eduInputAPI := eduserver.SetupEDUServerComponent(&base.Base, cache.New()) + asQuery := appservice.SetupAppServiceAPIComponent( + &base.Base, accountDB, deviceDB, federation, alias, query, transactions.New(), + ) + fedSenderAPI := federationsender.SetupFederationSenderComponent(&base.Base, federation, query) + + clientapi.SetupClientAPIComponent( + &base.Base, deviceDB, accountDB, + federation, &keyRing, alias, input, query, + eduInputAPI, asQuery, transactions.New(), fedSenderAPI, + ) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) + federationapi.SetupFederationAPIComponent(&base.Base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) + mediaapi.SetupMediaAPIComponent(&base.Base, deviceDB) + publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to public rooms db") + } + publicroomsapi.SetupPublicRoomsAPIComponent(&base.Base, deviceDB, publicRoomsDB, query, federation, nil) // Check this later + syncapi.SetupSyncAPIComponent(&base.Base, deviceDB, accountDB, query, federation, &cfg) + + httpHandler := common.WrapHandlerInCORS(base.Base.APIMux) + + // Set up the API endpoints we handle. /metrics is for prometheus, and is + // not wrapped by CORS, while everything else is + http.Handle("/metrics", promhttp.Handler()) + http.Handle("/", httpHandler) + + // Expose the matrix APIs directly rather than putting them under a /api path. + go func() { + httpBindAddr := fmt.Sprintf(":%d", *instancePort) + logrus.Info("Listening on ", httpBindAddr) + logrus.Fatal(http.ListenAndServe(httpBindAddr, nil)) + }() + // Expose the matrix APIs also via libp2p + if base.LibP2P != nil { + go func() { + logrus.Info("Listening on libp2p host ID ", base.LibP2P.ID()) + listener, err := gostream.Listen(base.LibP2P, "/matrix") + if err != nil { + panic(err) + } + defer func() { + logrus.Fatal(listener.Close()) + }() + logrus.Fatal(http.Serve(listener, nil)) + }() + } + + // We want to block forever to let the HTTP and HTTPS handler serve the APIs + select {} +} diff --git a/cmd/dendrite-demo-libp2p/mdnslistener.go b/cmd/dendrite-demo-libp2p/mdnslistener.go new file mode 100644 index 00000000..3fefbec2 --- /dev/null +++ b/cmd/dendrite-demo-libp2p/mdnslistener.go @@ -0,0 +1,63 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "fmt" + "math" + + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/matrix-org/dendrite/common/keydb" + "github.com/matrix-org/gomatrixserverlib" +) + +type mDNSListener struct { + keydb keydb.Database + host host.Host +} + +func (n *mDNSListener) HandlePeerFound(p peer.AddrInfo) { + if err := n.host.Connect(context.Background(), p); err != nil { + fmt.Println("Error adding peer", p.ID.String(), "via mDNS:", err) + } + if pubkey, err := p.ID.ExtractPublicKey(); err == nil { + raw, _ := pubkey.Raw() + if err := n.keydb.StoreKeys( + context.Background(), + map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.PublicKeyLookupResult{ + { + ServerName: gomatrixserverlib.ServerName(p.ID.String()), + KeyID: "ed25519:p2pdemo", + }: { + VerifyKey: gomatrixserverlib.VerifyKey{ + Key: gomatrixserverlib.Base64String(raw), + }, + ValidUntilTS: math.MaxUint64 >> 1, + ExpiredTS: gomatrixserverlib.PublicKeyNotExpired, + }, + }, + ); err != nil { + fmt.Println("Failed to store keys:", err) + } + } + fmt.Println("Discovered", len(n.host.Peerstore().Peers())-1, "other libp2p peer(s):") + for _, peer := range n.host.Peerstore().Peers() { + if peer != n.host.ID() { + fmt.Println("-", peer) + } + } +} diff --git a/cmd/dendrite-demo-libp2p/p2pdendrite.go b/cmd/dendrite-demo-libp2p/p2pdendrite.go new file mode 100644 index 00000000..a9db3b39 --- /dev/null +++ b/cmd/dendrite-demo-libp2p/p2pdendrite.go @@ -0,0 +1,126 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "fmt" + + "errors" + + pstore "github.com/libp2p/go-libp2p-core/peerstore" + record "github.com/libp2p/go-libp2p-record" + "github.com/matrix-org/dendrite/common/basecomponent" + + "github.com/libp2p/go-libp2p" + circuit "github.com/libp2p/go-libp2p-circuit" + crypto "github.com/libp2p/go-libp2p-core/crypto" + routing "github.com/libp2p/go-libp2p-core/routing" + + host "github.com/libp2p/go-libp2p-core/host" + dht "github.com/libp2p/go-libp2p-kad-dht" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/matrix-org/gomatrixserverlib" + + "github.com/matrix-org/dendrite/common/config" +) + +// P2PDendrite is a Peer-to-Peer variant of BaseDendrite. +type P2PDendrite struct { + Base basecomponent.BaseDendrite + + // Store our libp2p object so that we can make outgoing connections from it + // later + LibP2P host.Host + LibP2PContext context.Context + LibP2PCancel context.CancelFunc + LibP2PDHT *dht.IpfsDHT + LibP2PPubsub *pubsub.PubSub +} + +// NewP2PDendrite creates a new instance to be used by a component. +// The componentName is used for logging purposes, and should be a friendly name +// of the component running, e.g. SyncAPI. +func NewP2PDendrite(cfg *config.Dendrite, componentName string) *P2PDendrite { + baseDendrite := basecomponent.NewBaseDendrite(cfg, componentName) + + ctx, cancel := context.WithCancel(context.Background()) + + privKey, err := crypto.UnmarshalEd25519PrivateKey(cfg.Matrix.PrivateKey[:]) + if err != nil { + panic(err) + } + + //defaultIP6ListenAddr, _ := multiaddr.NewMultiaddr("/ip6/::/tcp/0") + var libp2pdht *dht.IpfsDHT + libp2p, err := libp2p.New(ctx, + libp2p.Identity(privKey), + libp2p.DefaultListenAddrs, + //libp2p.ListenAddrs(defaultIP6ListenAddr), + libp2p.DefaultTransports, + libp2p.Routing(func(h host.Host) (r routing.PeerRouting, err error) { + libp2pdht, err = dht.New(ctx, h) + if err != nil { + return nil, err + } + libp2pdht.Validator = libP2PValidator{} + r = libp2pdht + return + }), + libp2p.EnableAutoRelay(), + libp2p.EnableRelay(circuit.OptHop), + ) + if err != nil { + panic(err) + } + + libp2ppubsub, err := pubsub.NewFloodSub(context.Background(), libp2p, []pubsub.Option{ + pubsub.WithMessageSigning(true), + }...) + if err != nil { + panic(err) + } + + fmt.Println("Our public key:", privKey.GetPublic()) + fmt.Println("Our node ID:", libp2p.ID()) + fmt.Println("Our addresses:", libp2p.Addrs()) + + cfg.Matrix.ServerName = gomatrixserverlib.ServerName(libp2p.ID().String()) + + return &P2PDendrite{ + Base: *baseDendrite, + LibP2P: libp2p, + LibP2PContext: ctx, + LibP2PCancel: cancel, + LibP2PDHT: libp2pdht, + LibP2PPubsub: libp2ppubsub, + } +} + +type libP2PValidator struct { + KeyBook pstore.KeyBook +} + +func (v libP2PValidator) Validate(key string, value []byte) error { + ns, _, err := record.SplitKey(key) + if err != nil || ns != "matrix" { + return errors.New("not Matrix path") + } + return nil +} + +func (v libP2PValidator) Select(k string, vals [][]byte) (int, error) { + return 0, nil +} diff --git a/cmd/dendrite-demo-libp2p/storage/postgreswithdht/storage.go b/cmd/dendrite-demo-libp2p/storage/postgreswithdht/storage.go new file mode 100644 index 00000000..819469ee --- /dev/null +++ b/cmd/dendrite-demo-libp2p/storage/postgreswithdht/storage.go @@ -0,0 +1,164 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgreswithdht + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/matrix-org/dendrite/publicroomsapi/storage/postgres" + "github.com/matrix-org/gomatrixserverlib" + + dht "github.com/libp2p/go-libp2p-kad-dht" +) + +const DHTInterval = time.Second * 10 + +// PublicRoomsServerDatabase represents a public rooms server database. +type PublicRoomsServerDatabase struct { + dht *dht.IpfsDHT + postgres.PublicRoomsServerDatabase + ourRoomsContext context.Context // our current value in the DHT + ourRoomsCancel context.CancelFunc // cancel when we want to expire our value + foundRooms map[string]gomatrixserverlib.PublicRoom // additional rooms we have learned about from the DHT + foundRoomsMutex sync.RWMutex // protects foundRooms + maintenanceTimer *time.Timer // + roomsAdvertised atomic.Value // stores int + roomsDiscovered atomic.Value // stores int +} + +// NewPublicRoomsServerDatabase creates a new public rooms server database. +func NewPublicRoomsServerDatabase(dataSourceName string, dht *dht.IpfsDHT) (*PublicRoomsServerDatabase, error) { + pg, err := postgres.NewPublicRoomsServerDatabase(dataSourceName) + if err != nil { + return nil, err + } + provider := PublicRoomsServerDatabase{ + dht: dht, + PublicRoomsServerDatabase: *pg, + } + go provider.ResetDHTMaintenance() + provider.roomsAdvertised.Store(0) + provider.roomsDiscovered.Store(0) + return &provider, nil +} + +func (d *PublicRoomsServerDatabase) GetRoomVisibility(ctx context.Context, roomID string) (bool, error) { + return d.PublicRoomsServerDatabase.GetRoomVisibility(ctx, roomID) +} + +func (d *PublicRoomsServerDatabase) SetRoomVisibility(ctx context.Context, visible bool, roomID string) error { + d.ResetDHTMaintenance() + return d.PublicRoomsServerDatabase.SetRoomVisibility(ctx, visible, roomID) +} + +func (d *PublicRoomsServerDatabase) CountPublicRooms(ctx context.Context) (int64, error) { + count, err := d.PublicRoomsServerDatabase.CountPublicRooms(ctx) + if err != nil { + return 0, err + } + d.foundRoomsMutex.RLock() + defer d.foundRoomsMutex.RUnlock() + return count + int64(len(d.foundRooms)), nil +} + +func (d *PublicRoomsServerDatabase) GetPublicRooms(ctx context.Context, offset int64, limit int16, filter string) ([]gomatrixserverlib.PublicRoom, error) { + realfilter := filter + if realfilter == "__local__" { + realfilter = "" + } + rooms, err := d.PublicRoomsServerDatabase.GetPublicRooms(ctx, offset, limit, realfilter) + if err != nil { + return []gomatrixserverlib.PublicRoom{}, err + } + if filter != "__local__" { + d.foundRoomsMutex.RLock() + defer d.foundRoomsMutex.RUnlock() + for _, room := range d.foundRooms { + rooms = append(rooms, room) + } + } + return rooms, nil +} + +func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, eventsToRemove []gomatrixserverlib.Event) error { + return d.PublicRoomsServerDatabase.UpdateRoomFromEvents(ctx, eventsToAdd, eventsToRemove) +} + +func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(ctx context.Context, event gomatrixserverlib.Event) error { + return d.PublicRoomsServerDatabase.UpdateRoomFromEvent(ctx, event) +} + +func (d *PublicRoomsServerDatabase) ResetDHTMaintenance() { + if d.maintenanceTimer != nil && !d.maintenanceTimer.Stop() { + <-d.maintenanceTimer.C + } + d.Interval() +} + +func (d *PublicRoomsServerDatabase) Interval() { + if err := d.AdvertiseRoomsIntoDHT(); err != nil { + // fmt.Println("Failed to advertise room in DHT:", err) + } + if err := d.FindRoomsInDHT(); err != nil { + // fmt.Println("Failed to find rooms in DHT:", err) + } + fmt.Println("Found", d.roomsDiscovered.Load(), "room(s), advertised", d.roomsAdvertised.Load(), "room(s)") + d.maintenanceTimer = time.AfterFunc(DHTInterval, d.Interval) +} + +func (d *PublicRoomsServerDatabase) AdvertiseRoomsIntoDHT() error { + dbCtx, dbCancel := context.WithTimeout(context.Background(), 3*time.Second) + _ = dbCancel + ourRooms, err := d.GetPublicRooms(dbCtx, 0, 1024, "__local__") + if err != nil { + return err + } + if j, err := json.Marshal(ourRooms); err == nil { + d.roomsAdvertised.Store(len(ourRooms)) + d.ourRoomsContext, d.ourRoomsCancel = context.WithCancel(context.Background()) + if err := d.dht.PutValue(d.ourRoomsContext, "/matrix/publicRooms", j); err != nil { + return err + } + } + return nil +} + +func (d *PublicRoomsServerDatabase) FindRoomsInDHT() error { + d.foundRoomsMutex.Lock() + searchCtx, searchCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer searchCancel() + defer d.foundRoomsMutex.Unlock() + results, err := d.dht.GetValues(searchCtx, "/matrix/publicRooms", 1024) + if err != nil { + return err + } + d.foundRooms = make(map[string]gomatrixserverlib.PublicRoom) + for _, result := range results { + var received []gomatrixserverlib.PublicRoom + if err := json.Unmarshal(result.Val, &received); err != nil { + return err + } + for _, room := range received { + d.foundRooms[room.RoomID] = room + } + } + d.roomsDiscovered.Store(len(d.foundRooms)) + return nil +} diff --git a/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go b/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go new file mode 100644 index 00000000..66119224 --- /dev/null +++ b/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go @@ -0,0 +1,179 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgreswithpubsub + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/matrix-org/dendrite/publicroomsapi/storage/postgres" + "github.com/matrix-org/gomatrixserverlib" + + pubsub "github.com/libp2p/go-libp2p-pubsub" +) + +const MaintenanceInterval = time.Second * 10 + +type discoveredRoom struct { + time time.Time + room gomatrixserverlib.PublicRoom +} + +// PublicRoomsServerDatabase represents a public rooms server database. +type PublicRoomsServerDatabase struct { + postgres.PublicRoomsServerDatabase // + pubsub *pubsub.PubSub // + subscription *pubsub.Subscription // + foundRooms map[string]discoveredRoom // additional rooms we have learned about from the DHT + foundRoomsMutex sync.RWMutex // protects foundRooms + maintenanceTimer *time.Timer // + roomsAdvertised atomic.Value // stores int +} + +// NewPublicRoomsServerDatabase creates a new public rooms server database. +func NewPublicRoomsServerDatabase(dataSourceName string, pubsub *pubsub.PubSub) (*PublicRoomsServerDatabase, error) { + pg, err := postgres.NewPublicRoomsServerDatabase(dataSourceName) + if err != nil { + return nil, err + } + provider := PublicRoomsServerDatabase{ + pubsub: pubsub, + PublicRoomsServerDatabase: *pg, + foundRooms: make(map[string]discoveredRoom), + } + if topic, err := pubsub.Join("/matrix/publicRooms"); err != nil { + return nil, err + } else if sub, err := topic.Subscribe(); err == nil { + provider.subscription = sub + go provider.MaintenanceTimer() + go provider.FindRooms() + provider.roomsAdvertised.Store(0) + return &provider, nil + } else { + return nil, err + } +} + +func (d *PublicRoomsServerDatabase) GetRoomVisibility(ctx context.Context, roomID string) (bool, error) { + return d.PublicRoomsServerDatabase.GetRoomVisibility(ctx, roomID) +} + +func (d *PublicRoomsServerDatabase) SetRoomVisibility(ctx context.Context, visible bool, roomID string) error { + d.MaintenanceTimer() + return d.PublicRoomsServerDatabase.SetRoomVisibility(ctx, visible, roomID) +} + +func (d *PublicRoomsServerDatabase) CountPublicRooms(ctx context.Context) (int64, error) { + d.foundRoomsMutex.RLock() + defer d.foundRoomsMutex.RUnlock() + return int64(len(d.foundRooms)), nil +} + +func (d *PublicRoomsServerDatabase) GetPublicRooms(ctx context.Context, offset int64, limit int16, filter string) ([]gomatrixserverlib.PublicRoom, error) { + var rooms []gomatrixserverlib.PublicRoom + if filter == "__local__" { + if r, err := d.PublicRoomsServerDatabase.GetPublicRooms(ctx, offset, limit, ""); err == nil { + rooms = append(rooms, r...) + } else { + return []gomatrixserverlib.PublicRoom{}, err + } + } else { + d.foundRoomsMutex.RLock() + defer d.foundRoomsMutex.RUnlock() + for _, room := range d.foundRooms { + rooms = append(rooms, room.room) + } + } + return rooms, nil +} + +func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, eventsToRemove []gomatrixserverlib.Event) error { + return d.PublicRoomsServerDatabase.UpdateRoomFromEvents(ctx, eventsToAdd, eventsToRemove) +} + +func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(ctx context.Context, event gomatrixserverlib.Event) error { + return d.PublicRoomsServerDatabase.UpdateRoomFromEvent(ctx, event) +} + +func (d *PublicRoomsServerDatabase) MaintenanceTimer() { + if d.maintenanceTimer != nil && !d.maintenanceTimer.Stop() { + <-d.maintenanceTimer.C + } + d.Interval() +} + +func (d *PublicRoomsServerDatabase) Interval() { + d.foundRoomsMutex.Lock() + for k, v := range d.foundRooms { + if time.Since(v.time) > time.Minute { + delete(d.foundRooms, k) + } + } + d.foundRoomsMutex.Unlock() + if err := d.AdvertiseRooms(); err != nil { + fmt.Println("Failed to advertise room in DHT:", err) + } + d.foundRoomsMutex.RLock() + defer d.foundRoomsMutex.RUnlock() + fmt.Println("Found", len(d.foundRooms), "room(s), advertised", d.roomsAdvertised.Load(), "room(s)") + d.maintenanceTimer = time.AfterFunc(MaintenanceInterval, d.Interval) +} + +func (d *PublicRoomsServerDatabase) AdvertiseRooms() error { + dbCtx, dbCancel := context.WithTimeout(context.Background(), 3*time.Second) + _ = dbCancel + ourRooms, err := d.GetPublicRooms(dbCtx, 0, 1024, "__local__") + if err != nil { + return err + } + advertised := 0 + for _, room := range ourRooms { + if j, err := json.Marshal(room); err == nil { + if topic, err := d.pubsub.Join("/matrix/publicRooms"); err != nil { + fmt.Println("Failed to subscribe to topic:", err) + } else if err := topic.Publish(context.TODO(), j); err != nil { + fmt.Println("Failed to publish public room:", err) + } else { + advertised++ + } + } + } + + d.roomsAdvertised.Store(advertised) + return nil +} + +func (d *PublicRoomsServerDatabase) FindRooms() { + for { + msg, err := d.subscription.Next(context.Background()) + if err != nil { + continue + } + received := discoveredRoom{ + time: time.Now(), + } + if err := json.Unmarshal(msg.Data, &received.room); err != nil { + fmt.Println("Unmarshal error:", err) + continue + } + d.foundRoomsMutex.Lock() + d.foundRooms[received.room.RoomID] = received + d.foundRoomsMutex.Unlock() + } +} diff --git a/cmd/dendrite-demo-libp2p/storage/storage.go b/cmd/dendrite-demo-libp2p/storage/storage.go new file mode 100644 index 00000000..668edbaa --- /dev/null +++ b/cmd/dendrite-demo-libp2p/storage/storage.go @@ -0,0 +1,61 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "net/url" + + dht "github.com/libp2p/go-libp2p-kad-dht" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage/postgreswithdht" + "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub" + "github.com/matrix-org/dendrite/publicroomsapi/storage" + "github.com/matrix-org/dendrite/publicroomsapi/storage/sqlite3" +) + +const schemePostgres = "postgres" +const schemeFile = "file" + +// NewPublicRoomsServerDatabase opens a database connection. +func NewPublicRoomsServerDatabaseWithDHT(dataSourceName string, dht *dht.IpfsDHT) (storage.Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { + return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht) + } + switch uri.Scheme { + case schemePostgres: + return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht) + case schemeFile: + return sqlite3.NewPublicRoomsServerDatabase(dataSourceName) + default: + return postgreswithdht.NewPublicRoomsServerDatabase(dataSourceName, dht) + } +} + +// NewPublicRoomsServerDatabase opens a database connection. +func NewPublicRoomsServerDatabaseWithPubSub(dataSourceName string, pubsub *pubsub.PubSub) (storage.Database, error) { + uri, err := url.Parse(dataSourceName) + if err != nil { + return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub) + } + switch uri.Scheme { + case schemePostgres: + return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub) + case schemeFile: + return sqlite3.NewPublicRoomsServerDatabase(dataSourceName) + default: + return postgreswithpubsub.NewPublicRoomsServerDatabase(dataSourceName, pubsub) + } +} diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index c71d956b..d47d2e1b 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -31,6 +31,7 @@ import ( "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/publicroomsapi" + "github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/syncapi" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -71,7 +72,11 @@ func main() { eduProducer := producers.NewEDUServerProducer(eduInputAPI) federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) mediaapi.SetupMediaAPIComponent(base, deviceDB) - publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, nil) + publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI)) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to public rooms db") + } + publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, publicRoomsDB, query, federation, nil) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) httpHandler := common.WrapHandlerInCORS(base.APIMux) diff --git a/cmd/dendrite-public-rooms-api-server/main.go b/cmd/dendrite-public-rooms-api-server/main.go index 6b7eac7d..f6a782f6 100644 --- a/cmd/dendrite-public-rooms-api-server/main.go +++ b/cmd/dendrite-public-rooms-api-server/main.go @@ -17,6 +17,8 @@ package main import ( "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/publicroomsapi" + "github.com/matrix-org/dendrite/publicroomsapi/storage" + "github.com/sirupsen/logrus" ) func main() { @@ -27,8 +29,11 @@ func main() { deviceDB := base.CreateDeviceDB() _, _, query := base.CreateHTTPRoomserverAPIs() - - publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, nil, nil) + publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI)) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to public rooms db") + } + publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, publicRoomsDB, query, nil, nil) base.SetupAndServeHTTP(string(base.Cfg.Bind.PublicRoomsAPI), string(base.Cfg.Listen.PublicRoomsAPI)) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 05802725..9bd8f2ee 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -34,6 +34,7 @@ import ( "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/publicroomsapi" + "github.com/matrix-org/dendrite/publicroomsapi/storage" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/syncapi" "github.com/matrix-org/go-http-js-libp2p/go_http_js_libp2p" @@ -137,7 +138,11 @@ func main() { eduProducer := producers.NewEDUServerProducer(eduInputAPI) federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) mediaapi.SetupMediaAPIComponent(base, deviceDB) - publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, p2pPublicRoomProvider) + publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI)) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to public rooms db") + } + publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, publicRoomsDB, query, federation, p2pPublicRoomProvider) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) httpHandler := common.WrapHandlerInCORS(base.APIMux) |