diff options
author | Kegsay <kegan@matrix.org> | 2020-07-02 17:11:33 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-02 17:11:33 +0100 |
commit | 9c1f38621c4d787761092bc841e06ca424fbbf35 (patch) | |
tree | c7663e2f1f0af6414d1d66567265f5158360d8f2 /cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go | |
parent | 4c1e6597c0ea82f5390b73f35036db58e65542cc (diff) |
Remove publicroomsapi (#1176)
* Remove all of publicroomsapi
* Remove references to publicroomsapi
* Remove doc references to publicroomsapi
Diffstat (limited to 'cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go')
-rw-r--r-- | cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go | 179 |
1 files changed, 0 insertions, 179 deletions
diff --git a/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go b/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go deleted file mode 100644 index cf642eb3..00000000 --- a/cmd/dendrite-demo-libp2p/storage/postgreswithpubsub/storage.go +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright 2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package postgreswithpubsub - -import ( - "context" - "encoding/json" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/matrix-org/dendrite/publicroomsapi/storage/postgres" - "github.com/matrix-org/gomatrixserverlib" - - pubsub "github.com/libp2p/go-libp2p-pubsub" -) - -const MaintenanceInterval = time.Second * 10 - -type discoveredRoom struct { - time time.Time - room gomatrixserverlib.PublicRoom -} - -// PublicRoomsServerDatabase represents a public rooms server database. -type PublicRoomsServerDatabase struct { - postgres.PublicRoomsServerDatabase // - pubsub *pubsub.PubSub // - subscription *pubsub.Subscription // - foundRooms map[string]discoveredRoom // additional rooms we have learned about from the DHT - foundRoomsMutex sync.RWMutex // protects foundRooms - maintenanceTimer *time.Timer // - roomsAdvertised atomic.Value // stores int -} - -// NewPublicRoomsServerDatabase creates a new public rooms server database. -func NewPublicRoomsServerDatabase(dataSourceName string, pubsub *pubsub.PubSub, localServerName gomatrixserverlib.ServerName) (*PublicRoomsServerDatabase, error) { - pg, err := postgres.NewPublicRoomsServerDatabase(dataSourceName, nil, localServerName) - if err != nil { - return nil, err - } - provider := PublicRoomsServerDatabase{ - pubsub: pubsub, - PublicRoomsServerDatabase: *pg, - foundRooms: make(map[string]discoveredRoom), - } - if topic, err := pubsub.Join("/matrix/publicRooms"); err != nil { - return nil, err - } else if sub, err := topic.Subscribe(); err == nil { - provider.subscription = sub - go provider.MaintenanceTimer() - go provider.FindRooms() - provider.roomsAdvertised.Store(0) - return &provider, nil - } else { - return nil, err - } -} - -func (d *PublicRoomsServerDatabase) GetRoomVisibility(ctx context.Context, roomID string) (bool, error) { - return d.PublicRoomsServerDatabase.GetRoomVisibility(ctx, roomID) -} - -func (d *PublicRoomsServerDatabase) SetRoomVisibility(ctx context.Context, visible bool, roomID string) error { - d.MaintenanceTimer() - return d.PublicRoomsServerDatabase.SetRoomVisibility(ctx, visible, roomID) -} - -func (d *PublicRoomsServerDatabase) CountPublicRooms(ctx context.Context) (int64, error) { - d.foundRoomsMutex.RLock() - defer d.foundRoomsMutex.RUnlock() - return int64(len(d.foundRooms)), nil -} - -func (d *PublicRoomsServerDatabase) GetPublicRooms(ctx context.Context, offset int64, limit int16, filter string) ([]gomatrixserverlib.PublicRoom, error) { - var rooms []gomatrixserverlib.PublicRoom - if filter == "__local__" { - if r, err := d.PublicRoomsServerDatabase.GetPublicRooms(ctx, offset, limit, ""); err == nil { - rooms = append(rooms, r...) - } else { - return []gomatrixserverlib.PublicRoom{}, err - } - } else { - d.foundRoomsMutex.RLock() - defer d.foundRoomsMutex.RUnlock() - for _, room := range d.foundRooms { - rooms = append(rooms, room.room) - } - } - return rooms, nil -} - -func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, eventsToRemove []gomatrixserverlib.Event) error { - return d.PublicRoomsServerDatabase.UpdateRoomFromEvents(ctx, eventsToAdd, eventsToRemove) -} - -func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(ctx context.Context, event gomatrixserverlib.Event) error { - return d.PublicRoomsServerDatabase.UpdateRoomFromEvent(ctx, event) -} - -func (d *PublicRoomsServerDatabase) MaintenanceTimer() { - if d.maintenanceTimer != nil && !d.maintenanceTimer.Stop() { - <-d.maintenanceTimer.C - } - d.Interval() -} - -func (d *PublicRoomsServerDatabase) Interval() { - d.foundRoomsMutex.Lock() - for k, v := range d.foundRooms { - if time.Since(v.time) > time.Minute { - delete(d.foundRooms, k) - } - } - d.foundRoomsMutex.Unlock() - if err := d.AdvertiseRooms(); err != nil { - fmt.Println("Failed to advertise room in DHT:", err) - } - d.foundRoomsMutex.RLock() - defer d.foundRoomsMutex.RUnlock() - fmt.Println("Found", len(d.foundRooms), "room(s), advertised", d.roomsAdvertised.Load(), "room(s)") - d.maintenanceTimer = time.AfterFunc(MaintenanceInterval, d.Interval) -} - -func (d *PublicRoomsServerDatabase) AdvertiseRooms() error { - dbCtx, dbCancel := context.WithTimeout(context.Background(), 3*time.Second) - _ = dbCancel - ourRooms, err := d.GetPublicRooms(dbCtx, 0, 1024, "__local__") - if err != nil { - return err - } - advertised := 0 - for _, room := range ourRooms { - if j, err := json.Marshal(room); err == nil { - if topic, err := d.pubsub.Join("/matrix/publicRooms"); err != nil { - fmt.Println("Failed to subscribe to topic:", err) - } else if err := topic.Publish(context.TODO(), j); err != nil { - fmt.Println("Failed to publish public room:", err) - } else { - advertised++ - } - } - } - - d.roomsAdvertised.Store(advertised) - return nil -} - -func (d *PublicRoomsServerDatabase) FindRooms() { - for { - msg, err := d.subscription.Next(context.Background()) - if err != nil { - continue - } - received := discoveredRoom{ - time: time.Now(), - } - if err := json.Unmarshal(msg.Data, &received.room); err != nil { - fmt.Println("Unmarshal error:", err) - continue - } - d.foundRoomsMutex.Lock() - d.foundRooms[received.room.RoomID] = received - d.foundRoomsMutex.Unlock() - } -} |