aboutsummaryrefslogtreecommitdiff
path: root/syncapi/sync
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/sync')
-rw-r--r--syncapi/sync/notifier.go1
-rw-r--r--syncapi/sync/notifier_test.go21
2 files changed, 16 insertions, 6 deletions
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
index 30ac3a2e..14bc2efb 100644
--- a/syncapi/sync/notifier.go
+++ b/syncapi/sync/notifier.go
@@ -185,6 +185,7 @@ func (n *Notifier) wakeupUsers(userIDs []string, newPos types.SyncPosition) {
// fetchUserStream retrieves a stream unique to the given user. If makeIfNotExists is true,
// a stream will be made for this user if one doesn't exist and it will be returned. This
// function does not wait for data to be available on the stream.
+// NB: Callers should have locked the mutex before calling this function.
func (n *Notifier) fetchUserStream(userID string, makeIfNotExists bool) *UserStream {
stream, ok := n.userStreams[userID]
if !ok && makeIfNotExists {
diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go
index 904315e9..808e07cc 100644
--- a/syncapi/sync/notifier_test.go
+++ b/syncapi/sync/notifier_test.go
@@ -143,7 +143,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
wg.Done()
}()
- stream := n.fetchUserStream(bob, true)
+ stream := lockedFetchUserStream(n, bob)
waitForBlocking(stream, 1)
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
@@ -171,7 +171,7 @@ func TestNewInviteEventForUser(t *testing.T) {
wg.Done()
}()
- stream := n.fetchUserStream(bob, true)
+ stream := lockedFetchUserStream(n, bob)
waitForBlocking(stream, 1)
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
@@ -199,7 +199,7 @@ func TestEDUWakeup(t *testing.T) {
wg.Done()
}()
- stream := n.fetchUserStream(bob, true)
+ stream := lockedFetchUserStream(n, bob)
waitForBlocking(stream, 1)
n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
@@ -230,7 +230,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
go poll()
go poll()
- stream := n.fetchUserStream(bob, true)
+ stream := lockedFetchUserStream(n, bob)
waitForBlocking(stream, 3)
n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
@@ -266,14 +266,14 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
}
leaveWG.Done()
}()
- bobStream := n.fetchUserStream(bob, true)
+ bobStream := lockedFetchUserStream(n, bob)
waitForBlocking(bobStream, 1)
n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
leaveWG.Wait()
// send an event into the room. Make sure alice gets it. Bob should not.
var aliceWG sync.WaitGroup
- aliceStream := n.fetchUserStream(alice, true)
+ aliceStream := lockedFetchUserStream(n, alice)
aliceWG.Add(1)
go func() {
pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter))
@@ -328,6 +328,15 @@ func waitForBlocking(s *UserStream, numBlocking uint) {
}
}
+// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock.
+// A new stream is made if it doesn't exist already.
+func lockedFetchUserStream(n *Notifier, userID string) *UserStream {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+
+ return n.fetchUserStream(userID, true)
+}
+
func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest {
return syncRequest{
device: authtypes.Device{UserID: userID},