aboutsummaryrefslogtreecommitdiff
path: root/syncapi/sync/notifier_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/sync/notifier_test.go')
-rw-r--r--syncapi/sync/notifier_test.go130
1 files changed, 89 insertions, 41 deletions
diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go
index 4fa54393..904315e9 100644
--- a/syncapi/sync/notifier_test.go
+++ b/syncapi/sync/notifier_test.go
@@ -32,19 +32,40 @@ var (
randomMessageEvent gomatrixserverlib.Event
aliceInviteBobEvent gomatrixserverlib.Event
bobLeaveEvent gomatrixserverlib.Event
+ syncPositionVeryOld types.SyncPosition
+ syncPositionBefore types.SyncPosition
+ syncPositionAfter types.SyncPosition
+ syncPositionNewEDU types.SyncPosition
+ syncPositionAfter2 types.SyncPosition
)
var (
- streamPositionVeryOld = types.StreamPosition(5)
- streamPositionBefore = types.StreamPosition(11)
- streamPositionAfter = types.StreamPosition(12)
- streamPositionAfter2 = types.StreamPosition(13)
- roomID = "!test:localhost"
- alice = "@alice:localhost"
- bob = "@bob:localhost"
+ roomID = "!test:localhost"
+ alice = "@alice:localhost"
+ bob = "@bob:localhost"
)
func init() {
+ baseSyncPos := types.SyncPosition{
+ PDUPosition: 0,
+ TypingPosition: 0,
+ }
+
+ syncPositionVeryOld = baseSyncPos
+ syncPositionVeryOld.PDUPosition = 5
+
+ syncPositionBefore = baseSyncPos
+ syncPositionBefore.PDUPosition = 11
+
+ syncPositionAfter = baseSyncPos
+ syncPositionAfter.PDUPosition = 12
+
+ syncPositionNewEDU = syncPositionAfter
+ syncPositionNewEDU.TypingPosition = 1
+
+ syncPositionAfter2 = baseSyncPos
+ syncPositionAfter2.PDUPosition = 13
+
var err error
randomMessageEvent, err = gomatrixserverlib.NewEventFromTrustedJSON([]byte(`{
"type": "m.room.message",
@@ -92,19 +113,19 @@ func init() {
// Test that the current position is returned if a request is already behind.
func TestImmediateNotification(t *testing.T) {
- n := NewNotifier(streamPositionBefore)
- pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionVeryOld))
+ n := NewNotifier(syncPositionBefore)
+ pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionVeryOld))
if err != nil {
t.Fatalf("TestImmediateNotification error: %s", err)
}
- if pos != streamPositionBefore {
- t.Fatalf("TestImmediateNotification want %d, got %d", streamPositionBefore, pos)
+ if pos != syncPositionBefore {
+ t.Fatalf("TestImmediateNotification want %d, got %d", syncPositionBefore, pos)
}
}
// Test that new events to a joined room unblocks the request.
func TestNewEventAndJoinedToRoom(t *testing.T) {
- n := NewNotifier(streamPositionBefore)
+ n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@@ -112,12 +133,12 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
}
- if pos != streamPositionAfter {
- t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", streamPositionAfter, pos)
+ if pos != syncPositionAfter {
+ t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", syncPositionAfter, pos)
}
wg.Done()
}()
@@ -125,14 +146,42 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 1)
- n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
+ n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
wg.Wait()
}
// Test that an invite unblocks the request
func TestNewInviteEventForUser(t *testing.T) {
- n := NewNotifier(streamPositionBefore)
+ n := NewNotifier(syncPositionBefore)
+ n.setUsersJoinedToRooms(map[string][]string{
+ roomID: {alice, bob},
+ })
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
+ if err != nil {
+ t.Errorf("TestNewInviteEventForUser error: %s", err)
+ }
+ if pos != syncPositionAfter {
+ t.Errorf("TestNewInviteEventForUser want %d, got %d", syncPositionAfter, pos)
+ }
+ wg.Done()
+ }()
+
+ stream := n.fetchUserStream(bob, true)
+ waitForBlocking(stream, 1)
+
+ n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
+
+ wg.Wait()
+}
+
+// Test an EDU-only update wakes up the request.
+func TestEDUWakeup(t *testing.T) {
+ n := NewNotifier(syncPositionAfter)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@@ -140,12 +189,12 @@ func TestNewInviteEventForUser(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionAfter))
if err != nil {
t.Errorf("TestNewInviteEventForUser error: %s", err)
}
- if pos != streamPositionAfter {
- t.Errorf("TestNewInviteEventForUser want %d, got %d", streamPositionAfter, pos)
+ if pos != syncPositionNewEDU {
+ t.Errorf("TestNewInviteEventForUser want %d, got %d", syncPositionNewEDU, pos)
}
wg.Done()
}()
@@ -153,14 +202,14 @@ func TestNewInviteEventForUser(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 1)
- n.OnNewEvent(&aliceInviteBobEvent, "", streamPositionAfter)
+ n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
wg.Wait()
}
// Test that all blocked requests get woken up on a new event.
func TestMultipleRequestWakeup(t *testing.T) {
- n := NewNotifier(streamPositionBefore)
+ n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@@ -168,12 +217,12 @@ func TestMultipleRequestWakeup(t *testing.T) {
var wg sync.WaitGroup
wg.Add(3)
poll := func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
if err != nil {
t.Errorf("TestMultipleRequestWakeup error: %s", err)
}
- if pos != streamPositionAfter {
- t.Errorf("TestMultipleRequestWakeup want %d, got %d", streamPositionAfter, pos)
+ if pos != syncPositionAfter {
+ t.Errorf("TestMultipleRequestWakeup want %d, got %d", syncPositionAfter, pos)
}
wg.Done()
}
@@ -184,7 +233,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
stream := n.fetchUserStream(bob, true)
waitForBlocking(stream, 3)
- n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter)
+ n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
wg.Wait()
@@ -198,7 +247,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// listen as bob. Make bob leave room. Make alice send event to room.
// Make sure alice gets woken up only and not bob as well.
- n := NewNotifier(streamPositionBefore)
+ n := NewNotifier(syncPositionBefore)
n.setUsersJoinedToRooms(map[string][]string{
roomID: {alice, bob},
})
@@ -208,18 +257,18 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
// Make bob leave the room
leaveWG.Add(1)
go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionBefore))
+ pos, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionBefore))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
}
- if pos != streamPositionAfter {
- t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter, pos)
+ if pos != syncPositionAfter {
+ t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", syncPositionAfter, pos)
}
leaveWG.Done()
}()
bobStream := n.fetchUserStream(bob, true)
waitForBlocking(bobStream, 1)
- n.OnNewEvent(&bobLeaveEvent, "", streamPositionAfter)
+ n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
leaveWG.Wait()
// send an event into the room. Make sure alice gets it. Bob should not.
@@ -227,19 +276,19 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
aliceStream := n.fetchUserStream(alice, true)
aliceWG.Add(1)
go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(alice, streamPositionAfter))
+ pos, err := waitForEvents(n, newTestSyncRequest(alice, syncPositionAfter))
if err != nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
}
- if pos != streamPositionAfter2 {
- t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", streamPositionAfter2, pos)
+ if pos != syncPositionAfter2 {
+ t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", syncPositionAfter2, pos)
}
aliceWG.Done()
}()
go func() {
// this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
- _, err := waitForEvents(n, newTestSyncRequest(bob, streamPositionAfter))
+ _, err := waitForEvents(n, newTestSyncRequest(bob, syncPositionAfter))
if err == nil {
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
}
@@ -248,7 +297,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
waitForBlocking(aliceStream, 1)
waitForBlocking(bobStream, 1)
- n.OnNewEvent(&randomMessageEvent, "", streamPositionAfter2)
+ n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter2)
aliceWG.Wait()
// it's possible that at this point alice has been informed and bob is about to be informed, so wait
@@ -256,18 +305,17 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
time.Sleep(1 * time.Millisecond)
}
-// same as Notifier.WaitForEvents but with a timeout.
-func waitForEvents(n *Notifier, req syncRequest) (types.StreamPosition, error) {
+func waitForEvents(n *Notifier, req syncRequest) (types.SyncPosition, error) {
listener := n.GetListener(req)
defer listener.Close()
select {
case <-time.After(5 * time.Second):
- return types.StreamPosition(0), fmt.Errorf(
+ return types.SyncPosition{}, fmt.Errorf(
"waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since,
)
case <-listener.GetNotifyChannel(*req.since):
- p := listener.GetStreamPosition()
+ p := listener.GetSyncPosition()
return p, nil
}
}
@@ -280,7 +328,7 @@ func waitForBlocking(s *UserStream, numBlocking uint) {
}
}
-func newTestSyncRequest(userID string, since types.StreamPosition) syncRequest {
+func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest {
return syncRequest{
device: authtypes.Device{UserID: userID},
timeout: 1 * time.Minute,