diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2021-01-08 16:59:06 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-08 16:59:06 +0000 |
commit | b5a8935042dfb358f4176bc1ca46d0b8ebd62615 (patch) | |
tree | b3b0409cc184f6392eb0230f2127d10d0752b4af /syncapi/notifier/userstream.go | |
parent | 56a7839aedfdf849193bf25c0e2fcd8f4a0146d8 (diff) |
Sync refactor — Part 1 (#1688)
* It's half-alive
* Wakeups largely working
* Other tweaks, typing works
* Fix bugs, add receipt stream
* Delete notifier, other tweaks
* Dedupe a bit, add a template for the invite stream
* Clean up, add templates for other streams
* Don't leak channels
* Bring forward some more PDU logic, clean up other places
* Add some more wakeups
* Use addRoomDeltaToResponse
* Log tweaks, typing fixed?
* Fix timed out syncs
* Don't reset next batch position on timeout
* Add account data stream/position
* End of day
* Fix complete sync for receipt, typing
* Streams package
* Clean up a bit
* Complete sync send-to-device
* Don't drop errors
* More lightweight notifications
* Fix typing positions
* Don't advance position on remove again unless needed
* Device list updates
* Advance account data position
* Use limit for incremental sync
* Limit fixes, amongst other things
* Remove some fmt.Println
* Tweaks
* Re-add notifier
* Fix invite position
* Fixes
* Notify account data without advancing PDU position in notifier
* Apply account data position
* Get initial position for account data
* Fix position update
* Fix complete sync positions
* Review comments @Kegsay
* Room consumer parameters
Diffstat (limited to 'syncapi/notifier/userstream.go')
-rw-r--r-- | syncapi/notifier/userstream.go | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/syncapi/notifier/userstream.go b/syncapi/notifier/userstream.go new file mode 100644 index 00000000..720185d5 --- /dev/null +++ b/syncapi/notifier/userstream.go @@ -0,0 +1,162 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package notifier + +import ( + "context" + "runtime" + "sync" + "time" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +// UserDeviceStream represents a communication mechanism between the /sync request goroutine +// and the underlying sync server goroutines. +// Goroutines can get a UserStreamListener to wait for updates, and can Broadcast() +// updates. +type UserDeviceStream struct { + UserID string + DeviceID string + // The lock that protects changes to this struct + lock sync.Mutex + // Closed when there is an update. + signalChannel chan struct{} + // The last sync position that there may have been an update for the user + pos types.StreamingToken + // The last time when we had some listeners waiting + timeOfLastChannel time.Time + // The number of listeners waiting + numWaiting uint +} + +// UserDeviceStreamListener allows a sync request to wait for updates for a user. +type UserDeviceStreamListener struct { + userStream *UserDeviceStream + + // Whether the stream has been closed + hasClosed bool +} + +// NewUserDeviceStream creates a new user stream +func NewUserDeviceStream(userID, deviceID string, currPos types.StreamingToken) *UserDeviceStream { + return &UserDeviceStream{ + UserID: userID, + DeviceID: deviceID, + timeOfLastChannel: time.Now(), + pos: currPos, + signalChannel: make(chan struct{}), + } +} + +// GetListener returns UserStreamListener that a sync request can use to wait +// for new updates with. +// UserStreamListener must be closed +func (s *UserDeviceStream) GetListener(ctx context.Context) UserDeviceStreamListener { + s.lock.Lock() + defer s.lock.Unlock() + + s.numWaiting++ // We decrement when UserStreamListener is closed + + listener := UserDeviceStreamListener{ + userStream: s, + } + + // Lets be a bit paranoid here and check that Close() is being called + runtime.SetFinalizer(&listener, func(l *UserDeviceStreamListener) { + if !l.hasClosed { + l.Close() + } + }) + + return listener +} + +// Broadcast a new sync position for this user. +func (s *UserDeviceStream) Broadcast(pos types.StreamingToken) { + s.lock.Lock() + defer s.lock.Unlock() + + s.pos = pos + + close(s.signalChannel) + + s.signalChannel = make(chan struct{}) +} + +// NumWaiting returns the number of goroutines waiting for waiting for updates. +// Used for metrics and testing. +func (s *UserDeviceStream) NumWaiting() uint { + s.lock.Lock() + defer s.lock.Unlock() + return s.numWaiting +} + +// TimeOfLastNonEmpty returns the last time that the number of waiting listeners +// was non-empty, may be time.Now() if number of waiting listeners is currently +// non-empty. +func (s *UserDeviceStream) TimeOfLastNonEmpty() time.Time { + s.lock.Lock() + defer s.lock.Unlock() + + if s.numWaiting > 0 { + return time.Now() + } + + return s.timeOfLastChannel +} + +// GetSyncPosition returns last sync position which the UserStream was +// notified about +func (s *UserDeviceStreamListener) GetSyncPosition() types.StreamingToken { + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() + + return s.userStream.pos +} + +// GetNotifyChannel returns a channel that is closed when there may be an +// update for the user. +// sincePos specifies from which point we want to be notified about. If there +// has already been an update after sincePos we'll return a closed channel +// immediately. +func (s *UserDeviceStreamListener) GetNotifyChannel(sincePos types.StreamingToken) <-chan struct{} { + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() + + if s.userStream.pos.IsAfter(sincePos) { + // If the listener is behind, i.e. missed a potential update, then we + // want them to wake up immediately. We do this by returning a new + // closed stream, which returns immediately when selected. + closedChannel := make(chan struct{}) + close(closedChannel) + return closedChannel + } + + return s.userStream.signalChannel +} + +// Close cleans up resources used +func (s *UserDeviceStreamListener) Close() { + s.userStream.lock.Lock() + defer s.userStream.lock.Unlock() + + if !s.hasClosed { + s.userStream.numWaiting-- + s.userStream.timeOfLastChannel = time.Now() + } + + s.hasClosed = true +} |