aboutsummaryrefslogtreecommitdiff
path: root/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-02 17:11:33 +0100
committerGitHub <noreply@github.com>2020-07-02 17:11:33 +0100
commit9c1f38621c4d787761092bc841e06ca424fbbf35 (patch)
treec7663e2f1f0af6414d1d66567265f5158360d8f2 /cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go
parent4c1e6597c0ea82f5390b73f35036db58e65542cc (diff)
Remove publicroomsapi (#1176)
* Remove all of publicroomsapi * Remove references to publicroomsapi * Remove doc references to publicroomsapi
Diffstat (limited to 'cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go')
-rw-r--r--cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go179
1 files changed, 0 insertions, 179 deletions
diff --git a/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go b/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go
deleted file mode 100644
index cf642eb3..00000000
--- a/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go
+++ /dev/null
@@ -1,179 +0,0 @@
-// Copyright 2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package postgreswithpubsub
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/matrix-org/dendrite/publicroomsapi/storage/postgres"
- "github.com/matrix-org/gomatrixserverlib"
-
- pubsub "github.com/libp2p/go-libp2p-pubsub"
-)
-
-const MaintenanceInterval = time.Second * 10
-
-type discoveredRoom struct {
- time time.Time
- room gomatrixserverlib.PublicRoom
-}
-
-// PublicRoomsServerDatabase represents a public rooms server database.
-type PublicRoomsServerDatabase struct {
- postgres.PublicRoomsServerDatabase //
- pubsub *pubsub.PubSub //
- subscription *pubsub.Subscription //
- foundRooms map[string]discoveredRoom // additional rooms we have learned about from the DHT
- foundRoomsMutex sync.RWMutex // protects foundRooms
- maintenanceTimer *time.Timer //
- roomsAdvertised atomic.Value // stores int
-}
-
-// NewPublicRoomsServerDatabase creates a new public rooms server database.
-func NewPublicRoomsServerDatabase(dataSourceName string, pubsub *pubsub.PubSub, localServerName gomatrixserverlib.ServerName) (*PublicRoomsServerDatabase, error) {
- pg, err := postgres.NewPublicRoomsServerDatabase(dataSourceName, nil, localServerName)
- if err != nil {
- return nil, err
- }
- provider := PublicRoomsServerDatabase{
- pubsub: pubsub,
- PublicRoomsServerDatabase: *pg,
- foundRooms: make(map[string]discoveredRoom),
- }
- if topic, err := pubsub.Join("/matrix/publicRooms"); err != nil {
- return nil, err
- } else if sub, err := topic.Subscribe(); err == nil {
- provider.subscription = sub
- go provider.MaintenanceTimer()
- go provider.FindRooms()
- provider.roomsAdvertised.Store(0)
- return &provider, nil
- } else {
- return nil, err
- }
-}
-
-func (d *PublicRoomsServerDatabase) GetRoomVisibility(ctx context.Context, roomID string) (bool, error) {
- return d.PublicRoomsServerDatabase.GetRoomVisibility(ctx, roomID)
-}
-
-func (d *PublicRoomsServerDatabase) SetRoomVisibility(ctx context.Context, visible bool, roomID string) error {
- d.MaintenanceTimer()
- return d.PublicRoomsServerDatabase.SetRoomVisibility(ctx, visible, roomID)
-}
-
-func (d *PublicRoomsServerDatabase) CountPublicRooms(ctx context.Context) (int64, error) {
- d.foundRoomsMutex.RLock()
- defer d.foundRoomsMutex.RUnlock()
- return int64(len(d.foundRooms)), nil
-}
-
-func (d *PublicRoomsServerDatabase) GetPublicRooms(ctx context.Context, offset int64, limit int16, filter string) ([]gomatrixserverlib.PublicRoom, error) {
- var rooms []gomatrixserverlib.PublicRoom
- if filter == "__local__" {
- if r, err := d.PublicRoomsServerDatabase.GetPublicRooms(ctx, offset, limit, ""); err == nil {
- rooms = append(rooms, r...)
- } else {
- return []gomatrixserverlib.PublicRoom{}, err
- }
- } else {
- d.foundRoomsMutex.RLock()
- defer d.foundRoomsMutex.RUnlock()
- for _, room := range d.foundRooms {
- rooms = append(rooms, room.room)
- }
- }
- return rooms, nil
-}
-
-func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, eventsToRemove []gomatrixserverlib.Event) error {
- return d.PublicRoomsServerDatabase.UpdateRoomFromEvents(ctx, eventsToAdd, eventsToRemove)
-}
-
-func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(ctx context.Context, event gomatrixserverlib.Event) error {
- return d.PublicRoomsServerDatabase.UpdateRoomFromEvent(ctx, event)
-}
-
-func (d *PublicRoomsServerDatabase) MaintenanceTimer() {
- if d.maintenanceTimer != nil && !d.maintenanceTimer.Stop() {
- <-d.maintenanceTimer.C
- }
- d.Interval()
-}
-
-func (d *PublicRoomsServerDatabase) Interval() {
- d.foundRoomsMutex.Lock()
- for k, v := range d.foundRooms {
- if time.Since(v.time) > time.Minute {
- delete(d.foundRooms, k)
- }
- }
- d.foundRoomsMutex.Unlock()
- if err := d.AdvertiseRooms(); err != nil {
- fmt.Println("Failed to advertise room in DHT:", err)
- }
- d.foundRoomsMutex.RLock()
- defer d.foundRoomsMutex.RUnlock()
- fmt.Println("Found", len(d.foundRooms), "room(s), advertised", d.roomsAdvertised.Load(), "room(s)")
- d.maintenanceTimer = time.AfterFunc(MaintenanceInterval, d.Interval)
-}
-
-func (d *PublicRoomsServerDatabase) AdvertiseRooms() error {
- dbCtx, dbCancel := context.WithTimeout(context.Background(), 3*time.Second)
- _ = dbCancel
- ourRooms, err := d.GetPublicRooms(dbCtx, 0, 1024, "__local__")
- if err != nil {
- return err
- }
- advertised := 0
- for _, room := range ourRooms {
- if j, err := json.Marshal(room); err == nil {
- if topic, err := d.pubsub.Join("/matrix/publicRooms"); err != nil {
- fmt.Println("Failed to subscribe to topic:", err)
- } else if err := topic.Publish(context.TODO(), j); err != nil {
- fmt.Println("Failed to publish public room:", err)
- } else {
- advertised++
- }
- }
- }
-
- d.roomsAdvertised.Store(advertised)
- return nil
-}
-
-func (d *PublicRoomsServerDatabase) FindRooms() {
- for {
- msg, err := d.subscription.Next(context.Background())
- if err != nil {
- continue
- }
- received := discoveredRoom{
- time: time.Now(),
- }
- if err := json.Unmarshal(msg.Data, &received.room); err != nil {
- fmt.Println("Unmarshal error:", err)
- continue
- }
- d.foundRoomsMutex.Lock()
- d.foundRooms[received.room.RoomID] = received
- d.foundRoomsMutex.Unlock()
- }
-}