aboutsummaryrefslogtreecommitdiff
path: root/syncapi/notifier/notifier.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/notifier/notifier.go')
-rw-r--r--syncapi/notifier/notifier.go481
1 files changed, 481 insertions, 0 deletions
diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go
new file mode 100644
index 00000000..d853cc0e
--- /dev/null
+++ b/syncapi/notifier/notifier.go
@@ -0,0 +1,481 @@
+// Copyright 2017 Vector Creations Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package notifier
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ log "github.com/sirupsen/logrus"
+)
+
+// Notifier will wake up sleeping requests when there is some new data.
+// It does not tell requests what that data is, only the sync position which
+// they can use to get at it. This is done to prevent races whereby we tell the caller
+// the event, but the token has already advanced by the time they fetch it, resulting
+// in missed events.
+type Notifier struct {
+ // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
+ roomIDToJoinedUsers map[string]userIDSet
+ // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
+ roomIDToPeekingDevices map[string]peekingDeviceSet
+ // Protects currPos and userStreams.
+ streamLock *sync.Mutex
+ // The latest sync position
+ currPos types.StreamingToken
+ // A map of user_id => device_id => UserStream which can be used to wake a given user's /sync request.
+ userDeviceStreams map[string]map[string]*UserDeviceStream
+ // The last time we cleaned out stale entries from the userStreams map
+ lastCleanUpTime time.Time
+}
+
+// NewNotifier creates a new notifier set to the given sync position.
+// In order for this to be of any use, the Notifier needs to be told all rooms and
+// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
+func NewNotifier(currPos types.StreamingToken) *Notifier {
+ return &Notifier{
+ currPos: currPos,
+ roomIDToJoinedUsers: make(map[string]userIDSet),
+ roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
+ userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
+ streamLock: &sync.Mutex{},
+ lastCleanUpTime: time.Now(),
+ }
+}
+
+// OnNewEvent is called when a new event is received from the room server. Must only be
+// called from a single goroutine, to avoid races between updates which could set the
+// current sync position incorrectly.
+// Chooses which user sync streams to update by a provided *gomatrixserverlib.Event
+// (based on the users in the event's room),
+// a roomID directly, or a list of user IDs, prioritised by parameter ordering.
+// posUpdate contains the latest position(s) for one or more types of events.
+// If a position in posUpdate is 0, it means no updates are available of that type.
+// Typically a consumer supplies a posUpdate with the latest sync position for the
+// event type it handles, leaving other fields as 0.
+func (n *Notifier) OnNewEvent(
+ ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string,
+ posUpdate types.StreamingToken,
+) {
+ // update the current position then notify relevant /sync streams.
+ // This needs to be done PRIOR to waking up users as they will read this value.
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.removeEmptyUserStreams()
+
+ if ev != nil {
+ // Map this event's room_id to a list of joined users, and wake them up.
+ usersToNotify := n.joinedUsers(ev.RoomID())
+ // Map this event's room_id to a list of peeking devices, and wake them up.
+ peekingDevicesToNotify := n.PeekingDevices(ev.RoomID())
+ // If this is an invite, also add in the invitee to this list.
+ if ev.Type() == "m.room.member" && ev.StateKey() != nil {
+ targetUserID := *ev.StateKey()
+ membership, err := ev.Membership()
+ if err != nil {
+ log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
+ "Notifier.OnNewEvent: Failed to unmarshal member event",
+ )
+ } else {
+ // Keep the joined user map up-to-date
+ switch membership {
+ case gomatrixserverlib.Invite:
+ usersToNotify = append(usersToNotify, targetUserID)
+ case gomatrixserverlib.Join:
+ // Manually append the new user's ID so they get notified
+ // along all members in the room
+ usersToNotify = append(usersToNotify, targetUserID)
+ n.addJoinedUser(ev.RoomID(), targetUserID)
+ case gomatrixserverlib.Leave:
+ fallthrough
+ case gomatrixserverlib.Ban:
+ n.removeJoinedUser(ev.RoomID(), targetUserID)
+ }
+ }
+ }
+
+ n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos)
+ } else if roomID != "" {
+ n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
+ } else if len(userIDs) > 0 {
+ n.wakeupUsers(userIDs, nil, n.currPos)
+ } else {
+ log.WithFields(log.Fields{
+ "posUpdate": posUpdate.String,
+ }).Warn("Notifier.OnNewEvent called but caller supplied no user to wake up")
+ }
+}
+
+func (n *Notifier) OnNewAccountData(
+ userID string, posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers([]string{userID}, nil, posUpdate)
+}
+
+func (n *Notifier) OnNewPeek(
+ roomID, userID, deviceID string,
+ posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.addPeekingDevice(roomID, userID, deviceID)
+
+ // we don't wake up devices here given the roomserver consumer will do this shortly afterwards
+ // by calling OnNewEvent.
+}
+
+func (n *Notifier) OnRetirePeek(
+ roomID, userID, deviceID string,
+ posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.removePeekingDevice(roomID, userID, deviceID)
+
+ // we don't wake up devices here given the roomserver consumer will do this shortly afterwards
+ // by calling OnRetireEvent.
+}
+
+func (n *Notifier) OnNewSendToDevice(
+ userID string, deviceIDs []string,
+ posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUserDevice(userID, deviceIDs, n.currPos)
+}
+
+// OnNewReceipt updates the current position
+func (n *Notifier) OnNewTyping(
+ roomID string,
+ posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
+}
+
+// OnNewReceipt updates the current position
+func (n *Notifier) OnNewReceipt(
+ roomID string,
+ posUpdate types.StreamingToken,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
+}
+
+func (n *Notifier) OnNewKeyChange(
+ posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
+}
+
+func (n *Notifier) OnNewInvite(
+ posUpdate types.StreamingToken, wakeUserID string,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.currPos.ApplyUpdates(posUpdate)
+ n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
+}
+
+// GetListener returns a UserStreamListener that can be used to wait for
+// updates for a user. Must be closed.
+// notify for anything before sincePos
+func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener {
+ // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
+ // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
+ // - Incoming events wake requests for a matching room ID
+ // - Incoming events wake requests for a matching user ID (needed for invites)
+
+ // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
+ // but given we don't do /events, let's pretend it doesn't exist.
+
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ n.removeEmptyUserStreams()
+
+ return n.fetchUserDeviceStream(req.Device.UserID, req.Device.ID, true).GetListener(req.Context)
+}
+
+// Load the membership states required to notify users correctly.
+func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
+ roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
+ if err != nil {
+ return err
+ }
+ n.setUsersJoinedToRooms(roomToUsers)
+
+ roomToPeekingDevices, err := db.AllPeekingDevicesInRooms(ctx)
+ if err != nil {
+ return err
+ }
+ n.setPeekingDevices(roomToPeekingDevices)
+
+ return nil
+}
+
+// CurrentPosition returns the current sync position
+func (n *Notifier) CurrentPosition() types.StreamingToken {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ return n.currPos
+}
+
+// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
+// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
+// OnNewEvent (eg on startup) to prevent racing.
+func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
+ // This is just the bulk form of addJoinedUser
+ for roomID, userIDs := range roomIDToUserIDs {
+ if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+ n.roomIDToJoinedUsers[roomID] = make(userIDSet)
+ }
+ for _, userID := range userIDs {
+ n.roomIDToJoinedUsers[roomID].add(userID)
+ }
+ }
+}
+
+// setPeekingDevices marks the given devices as peeking in the given rooms, such that new events from
+// these rooms will wake the given devices' /sync requests. This should be called prior to ANY calls to
+// OnNewEvent (eg on startup) to prevent racing.
+func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.PeekingDevice) {
+ // This is just the bulk form of addPeekingDevice
+ for roomID, peekingDevices := range roomIDToPeekingDevices {
+ if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
+ n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
+ }
+ for _, peekingDevice := range peekingDevices {
+ n.roomIDToPeekingDevices[roomID].add(peekingDevice)
+ }
+ }
+}
+
+// wakeupUsers will wake up the sync strems for all of the devices for all of the
+// specified user IDs, and also the specified peekingDevices
+func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) {
+ for _, userID := range userIDs {
+ for _, stream := range n.fetchUserStreams(userID) {
+ if stream == nil {
+ continue
+ }
+ stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
+ }
+ }
+
+ for _, peekingDevice := range peekingDevices {
+ // TODO: don't bother waking up for devices whose users we already woke up
+ if stream := n.fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil {
+ stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
+ }
+ }
+}
+
+// wakeupUserDevice will wake up the sync stream for a specific user device. Other
+// device streams will be left alone.
+// nolint:unused
+func (n *Notifier) wakeupUserDevice(userID string, deviceIDs []string, newPos types.StreamingToken) {
+ for _, deviceID := range deviceIDs {
+ if stream := n.fetchUserDeviceStream(userID, deviceID, false); stream != nil {
+ stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
+ }
+ }
+}
+
+// fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true,
+// a stream will be made for this device if one doesn't exist and it will be returned. This
+// function does not wait for data to be available on the stream.
+// NB: Callers should have locked the mutex before calling this function.
+func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream {
+ _, ok := n.userDeviceStreams[userID]
+ if !ok {
+ if !makeIfNotExists {
+ return nil
+ }
+ n.userDeviceStreams[userID] = map[string]*UserDeviceStream{}
+ }
+ stream, ok := n.userDeviceStreams[userID][deviceID]
+ if !ok {
+ if !makeIfNotExists {
+ return nil
+ }
+ // TODO: Unbounded growth of streams (1 per user)
+ if stream = NewUserDeviceStream(userID, deviceID, n.currPos); stream != nil {
+ n.userDeviceStreams[userID][deviceID] = stream
+ }
+ }
+ return stream
+}
+
+// fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true,
+// a stream will be made for this user if one doesn't exist and it will be returned. This
+// function does not wait for data to be available on the stream.
+// NB: Callers should have locked the mutex before calling this function.
+func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream {
+ user, ok := n.userDeviceStreams[userID]
+ if !ok {
+ return []*UserDeviceStream{}
+ }
+ streams := []*UserDeviceStream{}
+ for _, stream := range user {
+ streams = append(streams, stream)
+ }
+ return streams
+}
+
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+func (n *Notifier) addJoinedUser(roomID, userID string) {
+ if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+ n.roomIDToJoinedUsers[roomID] = make(userIDSet)
+ }
+ n.roomIDToJoinedUsers[roomID].add(userID)
+}
+
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+func (n *Notifier) removeJoinedUser(roomID, userID string) {
+ if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+ n.roomIDToJoinedUsers[roomID] = make(userIDSet)
+ }
+ n.roomIDToJoinedUsers[roomID].remove(userID)
+}
+
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
+ if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
+ return
+ }
+ return n.roomIDToJoinedUsers[roomID].values()
+}
+
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
+ if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
+ n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
+ }
+ n.roomIDToPeekingDevices[roomID].add(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
+}
+
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+// nolint:unused
+func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
+ if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
+ n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
+ }
+ // XXX: is this going to work as a key?
+ n.roomIDToPeekingDevices[roomID].remove(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
+}
+
+// Not thread-safe: must be called on the OnNewEvent goroutine only
+func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) {
+ if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
+ return
+ }
+ return n.roomIDToPeekingDevices[roomID].values()
+}
+
+// removeEmptyUserStreams iterates through the user stream map and removes any
+// that have been empty for a certain amount of time. This is a crude way of
+// ensuring that the userStreams map doesn't grow forver.
+// This should be called when the notifier gets called for whatever reason,
+// the function itself is responsible for ensuring it doesn't iterate too
+// often.
+// NB: Callers should have locked the mutex before calling this function.
+func (n *Notifier) removeEmptyUserStreams() {
+ // Only clean up now and again
+ now := time.Now()
+ if n.lastCleanUpTime.Add(time.Minute).After(now) {
+ return
+ }
+ n.lastCleanUpTime = now
+
+ deleteBefore := now.Add(-5 * time.Minute)
+ for user, byUser := range n.userDeviceStreams {
+ for device, stream := range byUser {
+ if stream.TimeOfLastNonEmpty().Before(deleteBefore) {
+ delete(n.userDeviceStreams[user], device)
+ }
+ if len(n.userDeviceStreams[user]) == 0 {
+ delete(n.userDeviceStreams, user)
+ }
+ }
+ }
+}
+
+// A string set, mainly existing for improving clarity of structs in this file.
+type userIDSet map[string]bool
+
+func (s userIDSet) add(str string) {
+ s[str] = true
+}
+
+func (s userIDSet) remove(str string) {
+ delete(s, str)
+}
+
+func (s userIDSet) values() (vals []string) {
+ for str := range s {
+ vals = append(vals, str)
+ }
+ return
+}
+
+// A set of PeekingDevices, similar to userIDSet
+
+type peekingDeviceSet map[types.PeekingDevice]bool
+
+func (s peekingDeviceSet) add(d types.PeekingDevice) {
+ s[d] = true
+}
+
+// nolint:unused
+func (s peekingDeviceSet) remove(d types.PeekingDevice) {
+ delete(s, d)
+}
+
+func (s peekingDeviceSet) values() (vals []types.PeekingDevice) {
+ for d := range s {
+ vals = append(vals, d)
+ }
+ return
+}