aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-12-03 11:11:46 +0000
committerGitHub <noreply@github.com>2020-12-03 11:11:46 +0000
commitbe7d8595be0533207f8942b129c16f3844550712 (patch)
tree7203824174cbec3f2bf33b4f62bfe25083de1b97 /syncapi
parent2b03d24358aeac14ba7c8c63e35012d6e91c1509 (diff)
Peeking updates (#1607)
* Add unpeek * Don't allow peeks into encrypted rooms * Fix send tests * Update consumers
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go22
-rw-r--r--syncapi/storage/interface.go3
-rw-r--r--syncapi/storage/shared/syncserver.go17
-rw-r--r--syncapi/sync/notifier.go12
4 files changed, 54 insertions, 0 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 4d453f13..11d75a68 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -105,6 +105,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
case api.OutputTypeNewPeek:
return s.onNewPeek(context.TODO(), *output.NewPeek)
+ case api.OutputTypeRetirePeek:
+ return s.onRetirePeek(context.TODO(), *output.RetirePeek)
case api.OutputTypeRedactedEvent:
return s.onRedactEvent(context.TODO(), *output.RedactedEvent)
default:
@@ -309,6 +311,26 @@ func (s *OutputRoomEventConsumer) onNewPeek(
return nil
}
+func (s *OutputRoomEventConsumer) onRetirePeek(
+ ctx context.Context, msg api.OutputRetirePeek,
+) error {
+ sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
+ if err != nil {
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: write peek failure")
+ return nil
+ }
+ // tell the notifier about the new peek so it knows to wake up new devices
+ s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID)
+
+ // we need to wake up the users who might need to now be peeking into this room,
+ // so we send in a dummy event to trigger a wakeup
+ s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, nil))
+ return nil
+}
+
func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) {
if event.StateKey() == nil {
return event, nil
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index eaa0f64f..456ca1b1 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -91,6 +91,9 @@ type Database interface {
// AddPeek adds a new peek to our DB for a given room by a given user's device.
// Returns an error if there was a problem communicating with the database.
AddPeek(ctx context.Context, RoomID, UserID, DeviceID string) (types.StreamPosition, error)
+ // DeletePeek removes an existing peek from the database for a given room by a user's device.
+ // Returns an error if there was a problem communicating with the database.
+ DeletePeek(ctx context.Context, roomID, userID, deviceID string) (sp types.StreamPosition, err error)
// DeletePeek deletes all peeks for a given room by a given user
// Returns an error if there was a problem communicating with the database.
DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error)
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index fd8ca041..6c35a765 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -178,6 +178,23 @@ func (d *Database) AddPeek(
return
}
+// DeletePeeks tracks the fact that a user has stopped peeking from the specified
+// device. If the peeks was successfully deleted this returns the stream ID it was
+// stored at. Returns an error if there was a problem communicating with the database.
+func (d *Database) DeletePeek(
+ ctx context.Context, roomID, userID, deviceID string,
+) (sp types.StreamPosition, err error) {
+ err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ sp, err = d.Peeks.DeletePeek(ctx, txn, roomID, userID, deviceID)
+ return err
+ })
+ if err == sql.ErrNoRows {
+ sp = 0
+ err = nil
+ }
+ return
+}
+
// DeletePeeks tracks the fact that a user has stopped peeking from all devices
// If the peeks was successfully deleted this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
index daa3a1d8..1d8cd624 100644
--- a/syncapi/sync/notifier.go
+++ b/syncapi/sync/notifier.go
@@ -137,6 +137,18 @@ func (n *Notifier) OnNewPeek(
// by calling OnNewEvent.
}
+func (n *Notifier) OnRetirePeek(
+ roomID, userID, deviceID string,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.removePeekingDevice(roomID, userID, deviceID)
+
+ // we don't wake up devices here given the roomserver consumer will do this shortly afterwards
+ // by calling OnRetireEvent.
+}
+
func (n *Notifier) OnNewSendToDevice(
userID string, deviceIDs []string,
posUpdate types.StreamingToken,