diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2020-12-03 11:11:46 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-12-03 11:11:46 +0000 |
commit | be7d8595be0533207f8942b129c16f3844550712 (patch) | |
tree | 7203824174cbec3f2bf33b4f62bfe25083de1b97 /syncapi | |
parent | 2b03d24358aeac14ba7c8c63e35012d6e91c1509 (diff) |
Peeking updates (#1607)
* Add unpeek
* Don't allow peeks into encrypted rooms
* Fix send tests
* Update consumers
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/consumers/roomserver.go | 22 | ||||
-rw-r--r-- | syncapi/storage/interface.go | 3 | ||||
-rw-r--r-- | syncapi/storage/shared/syncserver.go | 17 | ||||
-rw-r--r-- | syncapi/sync/notifier.go | 12 |
4 files changed, 54 insertions, 0 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index 4d453f13..11d75a68 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -105,6 +105,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent) case api.OutputTypeNewPeek: return s.onNewPeek(context.TODO(), *output.NewPeek) + case api.OutputTypeRetirePeek: + return s.onRetirePeek(context.TODO(), *output.RetirePeek) case api.OutputTypeRedactedEvent: return s.onRedactEvent(context.TODO(), *output.RedactedEvent) default: @@ -309,6 +311,26 @@ func (s *OutputRoomEventConsumer) onNewPeek( return nil } +func (s *OutputRoomEventConsumer) onRetirePeek( + ctx context.Context, msg api.OutputRetirePeek, +) error { + sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID) + if err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + log.ErrorKey: err, + }).Panicf("roomserver output log: write peek failure") + return nil + } + // tell the notifier about the new peek so it knows to wake up new devices + s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID) + + // we need to wake up the users who might need to now be peeking into this room, + // so we send in a dummy event to trigger a wakeup + s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, nil)) + return nil +} + func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) { if event.StateKey() == nil { return event, nil diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index eaa0f64f..456ca1b1 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -91,6 +91,9 @@ type Database interface { // AddPeek adds a new peek to our DB for a given room by a given user's device. // Returns an error if there was a problem communicating with the database. AddPeek(ctx context.Context, RoomID, UserID, DeviceID string) (types.StreamPosition, error) + // DeletePeek removes an existing peek from the database for a given room by a user's device. + // Returns an error if there was a problem communicating with the database. + DeletePeek(ctx context.Context, roomID, userID, deviceID string) (sp types.StreamPosition, err error) // DeletePeek deletes all peeks for a given room by a given user // Returns an error if there was a problem communicating with the database. DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error) diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index fd8ca041..6c35a765 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -178,6 +178,23 @@ func (d *Database) AddPeek( return } +// DeletePeeks tracks the fact that a user has stopped peeking from the specified +// device. If the peeks was successfully deleted this returns the stream ID it was +// stored at. Returns an error if there was a problem communicating with the database. +func (d *Database) DeletePeek( + ctx context.Context, roomID, userID, deviceID string, +) (sp types.StreamPosition, err error) { + err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { + sp, err = d.Peeks.DeletePeek(ctx, txn, roomID, userID, deviceID) + return err + }) + if err == sql.ErrNoRows { + sp = 0 + err = nil + } + return +} + // DeletePeeks tracks the fact that a user has stopped peeking from all devices // If the peeks was successfully deleted this returns the stream ID it was stored at. // Returns an error if there was a problem communicating with the database. diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index daa3a1d8..1d8cd624 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -137,6 +137,18 @@ func (n *Notifier) OnNewPeek( // by calling OnNewEvent. } +func (n *Notifier) OnRetirePeek( + roomID, userID, deviceID string, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + + n.removePeekingDevice(roomID, userID, deviceID) + + // we don't wake up devices here given the roomserver consumer will do this shortly afterwards + // by calling OnRetireEvent. +} + func (n *Notifier) OnNewSendToDevice( userID string, deviceIDs []string, posUpdate types.StreamingToken, |