diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-04-07 17:08:15 +0100 |
---|---|---|
committer | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-04-07 17:08:15 +0100 |
commit | 7902859bc1df63665ac329f8b47457a437589a87 (patch) | |
tree | 159403bf62570725f98e5fa322e63d0ca4588c20 | |
parent | 87b1ed1338fa610bc550fdac4ab4b17202b46cc1 (diff) |
Fix lock contention in sync notifier
-rw-r--r-- | syncapi/notifier/notifier.go | 210 | ||||
-rw-r--r-- | syncapi/notifier/notifier_test.go | 8 |
2 files changed, 105 insertions, 113 deletions
diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go index 44f13338..443744b6 100644 --- a/syncapi/notifier/notifier.go +++ b/syncapi/notifier/notifier.go @@ -25,28 +25,28 @@ import ( log "github.com/sirupsen/logrus" ) +// NOTE: ALL FUNCTIONS IN THIS FILE PREFIXED WITH _ ARE NOT THREAD-SAFE +// AND MUST ONLY BE CALLED WHEN THE NOTIFIER LOCK IS HELD! + // Notifier will wake up sleeping requests when there is some new data. // It does not tell requests what that data is, only the sync position which // they can use to get at it. This is done to prevent races whereby we tell the caller // the event, but the token has already advanced by the time they fetch it, resulting // in missed events. type Notifier struct { + lock *sync.RWMutex // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine roomIDToJoinedUsers map[string]userIDSet // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine roomIDToPeekingDevices map[string]peekingDeviceSet - // Protects currPos and userStreams. - streamLock *sync.Mutex // The latest sync position currPos types.StreamingToken // A map of user_id => device_id => UserStream which can be used to wake a given user's /sync request. userDeviceStreams map[string]map[string]*UserDeviceStream // The last time we cleaned out stale entries from the userStreams map lastCleanUpTime time.Time - // Protects roomIDToJoinedUsers and roomIDToPeekingDevices - mapLock *sync.RWMutex // This map is reused to prevent allocations and GC pressure in SharedUsers. - _sharedUsers map[string]struct{} + _sharedUserMap map[string]struct{} } // NewNotifier creates a new notifier set to the given sync position. @@ -57,18 +57,17 @@ func NewNotifier() *Notifier { roomIDToJoinedUsers: make(map[string]userIDSet), roomIDToPeekingDevices: make(map[string]peekingDeviceSet), userDeviceStreams: make(map[string]map[string]*UserDeviceStream), - streamLock: &sync.Mutex{}, - mapLock: &sync.RWMutex{}, + lock: &sync.RWMutex{}, lastCleanUpTime: time.Now(), - _sharedUsers: map[string]struct{}{}, + _sharedUserMap: map[string]struct{}{}, } } // SetCurrentPosition sets the current streaming positions. // This must be called directly after NewNotifier and initialising the streams. func (n *Notifier) SetCurrentPosition(currPos types.StreamingToken) { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() n.currPos = currPos } @@ -89,17 +88,16 @@ func (n *Notifier) OnNewEvent( ) { // update the current position then notify relevant /sync streams. // This needs to be done PRIOR to waking up users as they will read this value. - n.streamLock.Lock() - defer n.streamLock.Unlock() - + n.lock.Lock() + defer n.lock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.removeEmptyUserStreams() + n._removeEmptyUserStreams() if ev != nil { // Map this event's room_id to a list of joined users, and wake them up. - usersToNotify := n.JoinedUsers(ev.RoomID()) + usersToNotify := n._joinedUsers(ev.RoomID()) // Map this event's room_id to a list of peeking devices, and wake them up. - peekingDevicesToNotify := n.PeekingDevices(ev.RoomID()) + peekingDevicesToNotify := n._peekingDevices(ev.RoomID()) // If this is an invite, also add in the invitee to this list. if ev.Type() == "m.room.member" && ev.StateKey() != nil { targetUserID := *ev.StateKey() @@ -117,20 +115,20 @@ func (n *Notifier) OnNewEvent( // Manually append the new user's ID so they get notified // along all members in the room usersToNotify = append(usersToNotify, targetUserID) - n.addJoinedUser(ev.RoomID(), targetUserID) + n._addJoinedUser(ev.RoomID(), targetUserID) case gomatrixserverlib.Leave: fallthrough case gomatrixserverlib.Ban: - n.removeJoinedUser(ev.RoomID(), targetUserID) + n._removeJoinedUser(ev.RoomID(), targetUserID) } } } - n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos) + n._wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos) } else if roomID != "" { - n.wakeupUsers(n.JoinedUsers(roomID), n.PeekingDevices(roomID), n.currPos) + n._wakeupUsers(n._joinedUsers(roomID), n._peekingDevices(roomID), n.currPos) } else if len(userIDs) > 0 { - n.wakeupUsers(userIDs, nil, n.currPos) + n._wakeupUsers(userIDs, nil, n.currPos) } else { log.WithFields(log.Fields{ "posUpdate": posUpdate.String, @@ -141,22 +139,22 @@ func (n *Notifier) OnNewEvent( func (n *Notifier) OnNewAccountData( userID string, posUpdate types.StreamingToken, ) { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers([]string{userID}, nil, posUpdate) + n._wakeupUsers([]string{userID}, nil, posUpdate) } func (n *Notifier) OnNewPeek( roomID, userID, deviceID string, posUpdate types.StreamingToken, ) { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.addPeekingDevice(roomID, userID, deviceID) + n._addPeekingDevice(roomID, userID, deviceID) // we don't wake up devices here given the roomserver consumer will do this shortly afterwards // by calling OnNewEvent. @@ -166,11 +164,11 @@ func (n *Notifier) OnRetirePeek( roomID, userID, deviceID string, posUpdate types.StreamingToken, ) { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.removePeekingDevice(roomID, userID, deviceID) + n._removePeekingDevice(roomID, userID, deviceID) // we don't wake up devices here given the roomserver consumer will do this shortly afterwards // by calling OnRetireEvent. @@ -180,11 +178,11 @@ func (n *Notifier) OnNewSendToDevice( userID string, deviceIDs []string, posUpdate types.StreamingToken, ) { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.wakeupUserDevice(userID, deviceIDs, n.currPos) + n._wakeupUserDevice(userID, deviceIDs, n.currPos) } // OnNewReceipt updates the current position @@ -192,11 +190,11 @@ func (n *Notifier) OnNewTyping( roomID string, posUpdate types.StreamingToken, ) { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers(n.JoinedUsers(roomID), nil, n.currPos) + n._wakeupUsers(n._joinedUsers(roomID), nil, n.currPos) } // OnNewReceipt updates the current position @@ -204,80 +202,84 @@ func (n *Notifier) OnNewReceipt( roomID string, posUpdate types.StreamingToken, ) { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers(n.JoinedUsers(roomID), nil, n.currPos) + n._wakeupUsers(n._joinedUsers(roomID), nil, n.currPos) } func (n *Notifier) OnNewKeyChange( posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string, ) { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) + n._wakeupUsers([]string{wakeUserID}, nil, n.currPos) } func (n *Notifier) OnNewInvite( posUpdate types.StreamingToken, wakeUserID string, ) { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) + n._wakeupUsers([]string{wakeUserID}, nil, n.currPos) } func (n *Notifier) OnNewNotificationData( userID string, posUpdate types.StreamingToken, ) { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers([]string{userID}, nil, n.currPos) + n._wakeupUsers([]string{userID}, nil, n.currPos) } func (n *Notifier) OnNewPresence( posUpdate types.StreamingToken, userID string, ) { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() n.currPos.ApplyUpdates(posUpdate) - sharedUsers := n.SharedUsers(userID) + sharedUsers := n._sharedUsers(userID) sharedUsers = append(sharedUsers, userID) - n.wakeupUsers(sharedUsers, nil, n.currPos) + n._wakeupUsers(sharedUsers, nil, n.currPos) } func (n *Notifier) SharedUsers(userID string) []string { - n.mapLock.RLock() - defer n.mapLock.RUnlock() - n._sharedUsers[userID] = struct{}{} + n.lock.RLock() + defer n.lock.RUnlock() + return n._sharedUsers(userID) +} + +func (n *Notifier) _sharedUsers(userID string) []string { + n._sharedUserMap[userID] = struct{}{} for roomID, users := range n.roomIDToJoinedUsers { if _, ok := users[userID]; !ok { continue } - for _, userID := range n.JoinedUsers(roomID) { - n._sharedUsers[userID] = struct{}{} + for _, userID := range n._joinedUsers(roomID) { + n._sharedUserMap[userID] = struct{}{} } } - sharedUsers := make([]string, 0, len(n._sharedUsers)+1) - for userID := range n._sharedUsers { + sharedUsers := make([]string, 0, len(n._sharedUserMap)+1) + for userID := range n._sharedUserMap { sharedUsers = append(sharedUsers, userID) - delete(n._sharedUsers, userID) + delete(n._sharedUserMap, userID) } return sharedUsers } func (n *Notifier) IsSharedUser(userA, userB string) bool { - n.mapLock.RLock() - defer n.mapLock.RUnlock() + n.lock.RLock() + defer n.lock.RUnlock() var okA, okB bool for _, users := range n.roomIDToJoinedUsers { _, okA = users[userA] @@ -301,18 +303,18 @@ func (n *Notifier) GetListener(req types.SyncRequest) UserDeviceStreamListener { // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked, // but given we don't do /events, let's pretend it doesn't exist. - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() - n.removeEmptyUserStreams() + n._removeEmptyUserStreams() - return n.fetchUserDeviceStream(req.Device.UserID, req.Device.ID, true).GetListener(req.Context) + return n._fetchUserDeviceStream(req.Device.UserID, req.Device.ID, true).GetListener(req.Context) } // Load the membership states required to notify users correctly. func (n *Notifier) Load(ctx context.Context, db storage.Database) error { - n.mapLock.Lock() - defer n.mapLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() roomToUsers, err := db.AllJoinedUsersInRooms(ctx) if err != nil { return err @@ -330,8 +332,8 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error { // CurrentPosition returns the current sync position func (n *Notifier) CurrentPosition() types.StreamingToken { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.RLock() + defer n.lock.RUnlock() return n.currPos } @@ -366,11 +368,11 @@ func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.P } } -// wakeupUsers will wake up the sync strems for all of the devices for all of the +// _wakeupUsers will wake up the sync strems for all of the devices for all of the // specified user IDs, and also the specified peekingDevices -func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) { +func (n *Notifier) _wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) { for _, userID := range userIDs { - for _, stream := range n.fetchUserStreams(userID) { + for _, stream := range n._fetchUserStreams(userID) { if stream == nil { continue } @@ -380,28 +382,27 @@ func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingD for _, peekingDevice := range peekingDevices { // TODO: don't bother waking up for devices whose users we already woke up - if stream := n.fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil { + if stream := n._fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil { stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream } } } -// wakeupUserDevice will wake up the sync stream for a specific user device. Other +// _wakeupUserDevice will wake up the sync stream for a specific user device. Other // device streams will be left alone. // nolint:unused -func (n *Notifier) wakeupUserDevice(userID string, deviceIDs []string, newPos types.StreamingToken) { +func (n *Notifier) _wakeupUserDevice(userID string, deviceIDs []string, newPos types.StreamingToken) { for _, deviceID := range deviceIDs { - if stream := n.fetchUserDeviceStream(userID, deviceID, false); stream != nil { + if stream := n._fetchUserDeviceStream(userID, deviceID, false); stream != nil { stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream } } } -// fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true, +// _fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true, // a stream will be made for this device if one doesn't exist and it will be returned. This // function does not wait for data to be available on the stream. -// NB: Callers should have locked the mutex before calling this function. -func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream { +func (n *Notifier) _fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream { _, ok := n.userDeviceStreams[userID] if !ok { if !makeIfNotExists { @@ -422,11 +423,10 @@ func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExist return stream } -// fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true, +// _fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true, // a stream will be made for this user if one doesn't exist and it will be returned. This // function does not wait for data to be available on the stream. -// NB: Callers should have locked the mutex before calling this function. -func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream { +func (n *Notifier) _fetchUserStreams(userID string) []*UserDeviceStream { user, ok := n.userDeviceStreams[userID] if !ok { return []*UserDeviceStream{} @@ -438,51 +438,41 @@ func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream { return streams } -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) addJoinedUser(roomID, userID string) { - n.mapLock.Lock() - defer n.mapLock.Unlock() +func (n *Notifier) _addJoinedUser(roomID, userID string) { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { n.roomIDToJoinedUsers[roomID] = make(userIDSet) } n.roomIDToJoinedUsers[roomID].add(userID) } -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) removeJoinedUser(roomID, userID string) { - n.mapLock.Lock() - defer n.mapLock.Unlock() +func (n *Notifier) _removeJoinedUser(roomID, userID string) { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { n.roomIDToJoinedUsers[roomID] = make(userIDSet) } n.roomIDToJoinedUsers[roomID].remove(userID) } -// Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) JoinedUsers(roomID string) (userIDs []string) { - n.mapLock.RLock() - defer n.mapLock.RUnlock() + n.lock.RLock() + defer n.lock.RUnlock() + return n._joinedUsers(roomID) +} + +func (n *Notifier) _joinedUsers(roomID string) (userIDs []string) { if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { return } return n.roomIDToJoinedUsers[roomID].values() } -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) { - n.mapLock.Lock() - defer n.mapLock.Unlock() +func (n *Notifier) _addPeekingDevice(roomID, userID, deviceID string) { if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) } n.roomIDToPeekingDevices[roomID].add(types.PeekingDevice{UserID: userID, DeviceID: deviceID}) } -// Not thread-safe: must be called on the OnNewEvent goroutine only -// nolint:unused -func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) { - n.mapLock.Lock() - defer n.mapLock.Unlock() +func (n *Notifier) _removePeekingDevice(roomID, userID, deviceID string) { if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) } @@ -490,24 +480,26 @@ func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) { n.roomIDToPeekingDevices[roomID].remove(types.PeekingDevice{UserID: userID, DeviceID: deviceID}) } -// Not thread-safe: must be called on the OnNewEvent goroutine only func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) { - n.mapLock.RLock() - defer n.mapLock.RUnlock() + n.lock.RLock() + defer n.lock.RUnlock() + return n._peekingDevices(roomID) +} + +func (n *Notifier) _peekingDevices(roomID string) (peekingDevices []types.PeekingDevice) { if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { return } return n.roomIDToPeekingDevices[roomID].values() } -// removeEmptyUserStreams iterates through the user stream map and removes any +// _removeEmptyUserStreams iterates through the user stream map and removes any // that have been empty for a certain amount of time. This is a crude way of // ensuring that the userStreams map doesn't grow forver. // This should be called when the notifier gets called for whatever reason, // the function itself is responsible for ensuring it doesn't iterate too // often. -// NB: Callers should have locked the mutex before calling this function. -func (n *Notifier) removeEmptyUserStreams() { +func (n *Notifier) _removeEmptyUserStreams() { // Only clean up now and again now := time.Now() if n.lastCleanUpTime.Add(time.Minute).After(now) { diff --git a/syncapi/notifier/notifier_test.go b/syncapi/notifier/notifier_test.go index 8b305586..2273ad76 100644 --- a/syncapi/notifier/notifier_test.go +++ b/syncapi/notifier/notifier_test.go @@ -175,7 +175,7 @@ func TestCorrectStreamWakeup(t *testing.T) { time.Sleep(1 * time.Second) wake := "two" - n.wakeupUserDevice(alice, []string{wake}, syncPositionAfter) + n._wakeupUserDevice(alice, []string{wake}, syncPositionAfter) if result := <-awoken; result != wake { t.Fatalf("expected to wake %q, got %q", wake, result) @@ -359,10 +359,10 @@ func waitForBlocking(s *UserDeviceStream, numBlocking uint) { // lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock. // A new stream is made if it doesn't exist already. func lockedFetchUserStream(n *Notifier, userID, deviceID string) *UserDeviceStream { - n.streamLock.Lock() - defer n.streamLock.Unlock() + n.lock.Lock() + defer n.lock.Unlock() - return n.fetchUserDeviceStream(userID, deviceID, true) + return n._fetchUserDeviceStream(userID, deviceID, true) } func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) types.SyncRequest { |