diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2021-01-08 16:59:06 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-08 16:59:06 +0000 |
commit | b5a8935042dfb358f4176bc1ca46d0b8ebd62615 (patch) | |
tree | b3b0409cc184f6392eb0230f2127d10d0752b4af /syncapi/sync | |
parent | 56a7839aedfdf849193bf25c0e2fcd8f4a0146d8 (diff) |
Sync refactor — Part 1 (#1688)
* It's half-alive
* Wakeups largely working
* Other tweaks, typing works
* Fix bugs, add receipt stream
* Delete notifier, other tweaks
* Dedupe a bit, add a template for the invite stream
* Clean up, add templates for other streams
* Don't leak channels
* Bring forward some more PDU logic, clean up other places
* Add some more wakeups
* Use addRoomDeltaToResponse
* Log tweaks, typing fixed?
* Fix timed out syncs
* Don't reset next batch position on timeout
* Add account data stream/position
* End of day
* Fix complete sync for receipt, typing
* Streams package
* Clean up a bit
* Complete sync send-to-device
* Don't drop errors
* More lightweight notifications
* Fix typing positions
* Don't advance position on remove again unless needed
* Device list updates
* Advance account data position
* Use limit for incremental sync
* Limit fixes, amongst other things
* Remove some fmt.Println
* Tweaks
* Re-add notifier
* Fix invite position
* Fixes
* Notify account data without advancing PDU position in notifier
* Apply account data position
* Get initial position for account data
* Fix position update
* Fix complete sync positions
* Review comments @Kegsay
* Room consumer parameters
Diffstat (limited to 'syncapi/sync')
-rw-r--r-- | syncapi/sync/notifier.go | 467 | ||||
-rw-r--r-- | syncapi/sync/notifier_test.go | 374 | ||||
-rw-r--r-- | syncapi/sync/request.go | 47 | ||||
-rw-r--r-- | syncapi/sync/requestpool.go | 384 | ||||
-rw-r--r-- | syncapi/sync/userstream.go | 162 |
5 files changed, 140 insertions, 1294 deletions
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go deleted file mode 100644 index 66460a8d..00000000 --- a/syncapi/sync/notifier.go +++ /dev/null @@ -1,467 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sync - -import ( - "context" - "sync" - "time" - - "github.com/matrix-org/dendrite/syncapi/storage" - "github.com/matrix-org/dendrite/syncapi/types" - "github.com/matrix-org/gomatrixserverlib" - log "github.com/sirupsen/logrus" -) - -// Notifier will wake up sleeping requests when there is some new data. -// It does not tell requests what that data is, only the sync position which -// they can use to get at it. This is done to prevent races whereby we tell the caller -// the event, but the token has already advanced by the time they fetch it, resulting -// in missed events. -type Notifier struct { - // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine - roomIDToJoinedUsers map[string]userIDSet - // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine - roomIDToPeekingDevices map[string]peekingDeviceSet - // Protects currPos and userStreams. - streamLock *sync.Mutex - // The latest sync position - currPos types.StreamingToken - // A map of user_id => device_id => UserStream which can be used to wake a given user's /sync request. - userDeviceStreams map[string]map[string]*UserDeviceStream - // The last time we cleaned out stale entries from the userStreams map - lastCleanUpTime time.Time -} - -// NewNotifier creates a new notifier set to the given sync position. -// In order for this to be of any use, the Notifier needs to be told all rooms and -// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase). -func NewNotifier(pos types.StreamingToken) *Notifier { - return &Notifier{ - currPos: pos, - roomIDToJoinedUsers: make(map[string]userIDSet), - roomIDToPeekingDevices: make(map[string]peekingDeviceSet), - userDeviceStreams: make(map[string]map[string]*UserDeviceStream), - streamLock: &sync.Mutex{}, - lastCleanUpTime: time.Now(), - } -} - -// OnNewEvent is called when a new event is received from the room server. Must only be -// called from a single goroutine, to avoid races between updates which could set the -// current sync position incorrectly. -// Chooses which user sync streams to update by a provided *gomatrixserverlib.Event -// (based on the users in the event's room), -// a roomID directly, or a list of user IDs, prioritised by parameter ordering. -// posUpdate contains the latest position(s) for one or more types of events. -// If a position in posUpdate is 0, it means no updates are available of that type. -// Typically a consumer supplies a posUpdate with the latest sync position for the -// event type it handles, leaving other fields as 0. -func (n *Notifier) OnNewEvent( - ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string, - posUpdate types.StreamingToken, -) { - // update the current position then notify relevant /sync streams. - // This needs to be done PRIOR to waking up users as they will read this value. - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.removeEmptyUserStreams() - - if ev != nil { - // Map this event's room_id to a list of joined users, and wake them up. - usersToNotify := n.joinedUsers(ev.RoomID()) - // Map this event's room_id to a list of peeking devices, and wake them up. - peekingDevicesToNotify := n.PeekingDevices(ev.RoomID()) - // If this is an invite, also add in the invitee to this list. - if ev.Type() == "m.room.member" && ev.StateKey() != nil { - targetUserID := *ev.StateKey() - membership, err := ev.Membership() - if err != nil { - log.WithError(err).WithField("event_id", ev.EventID()).Errorf( - "Notifier.OnNewEvent: Failed to unmarshal member event", - ) - } else { - // Keep the joined user map up-to-date - switch membership { - case gomatrixserverlib.Invite: - usersToNotify = append(usersToNotify, targetUserID) - case gomatrixserverlib.Join: - // Manually append the new user's ID so they get notified - // along all members in the room - usersToNotify = append(usersToNotify, targetUserID) - n.addJoinedUser(ev.RoomID(), targetUserID) - case gomatrixserverlib.Leave: - fallthrough - case gomatrixserverlib.Ban: - n.removeJoinedUser(ev.RoomID(), targetUserID) - } - } - } - - n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos) - } else if roomID != "" { - n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos) - } else if len(userIDs) > 0 { - n.wakeupUsers(userIDs, nil, n.currPos) - } else { - log.WithFields(log.Fields{ - "posUpdate": posUpdate.String, - }).Warn("Notifier.OnNewEvent called but caller supplied no user to wake up") - } -} - -func (n *Notifier) OnNewPeek( - roomID, userID, deviceID string, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.addPeekingDevice(roomID, userID, deviceID) - - // we don't wake up devices here given the roomserver consumer will do this shortly afterwards - // by calling OnNewEvent. -} - -func (n *Notifier) OnRetirePeek( - roomID, userID, deviceID string, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.removePeekingDevice(roomID, userID, deviceID) - - // we don't wake up devices here given the roomserver consumer will do this shortly afterwards - // by calling OnRetireEvent. -} - -func (n *Notifier) OnNewSendToDevice( - userID string, deviceIDs []string, - posUpdate types.StreamingToken, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.wakeupUserDevice(userID, deviceIDs, n.currPos) -} - -// OnNewReceipt updates the current position -func (n *Notifier) OnNewTyping( - roomID string, - posUpdate types.StreamingToken, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos) -} - -// OnNewReceipt updates the current position -func (n *Notifier) OnNewReceipt( - roomID string, - posUpdate types.StreamingToken, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos) -} - -func (n *Notifier) OnNewKeyChange( - posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) -} - -func (n *Notifier) OnNewInvite( - posUpdate types.StreamingToken, wakeUserID string, -) { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.currPos.ApplyUpdates(posUpdate) - n.wakeupUsers([]string{wakeUserID}, nil, n.currPos) -} - -// GetListener returns a UserStreamListener that can be used to wait for -// updates for a user. Must be closed. -// notify for anything before sincePos -func (n *Notifier) GetListener(req syncRequest) UserDeviceStreamListener { - // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298 - // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID - // - Incoming events wake requests for a matching room ID - // - Incoming events wake requests for a matching user ID (needed for invites) - - // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked, - // but given we don't do /events, let's pretend it doesn't exist. - - n.streamLock.Lock() - defer n.streamLock.Unlock() - - n.removeEmptyUserStreams() - - return n.fetchUserDeviceStream(req.device.UserID, req.device.ID, true).GetListener(req.ctx) -} - -// Load the membership states required to notify users correctly. -func (n *Notifier) Load(ctx context.Context, db storage.Database) error { - roomToUsers, err := db.AllJoinedUsersInRooms(ctx) - if err != nil { - return err - } - n.setUsersJoinedToRooms(roomToUsers) - - roomToPeekingDevices, err := db.AllPeekingDevicesInRooms(ctx) - if err != nil { - return err - } - n.setPeekingDevices(roomToPeekingDevices) - - return nil -} - -// CurrentPosition returns the current sync position -func (n *Notifier) CurrentPosition() types.StreamingToken { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - return n.currPos -} - -// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from -// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to -// OnNewEvent (eg on startup) to prevent racing. -func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) { - // This is just the bulk form of addJoinedUser - for roomID, userIDs := range roomIDToUserIDs { - if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - n.roomIDToJoinedUsers[roomID] = make(userIDSet) - } - for _, userID := range userIDs { - n.roomIDToJoinedUsers[roomID].add(userID) - } - } -} - -// setPeekingDevices marks the given devices as peeking in the given rooms, such that new events from -// these rooms will wake the given devices' /sync requests. This should be called prior to ANY calls to -// OnNewEvent (eg on startup) to prevent racing. -func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.PeekingDevice) { - // This is just the bulk form of addPeekingDevice - for roomID, peekingDevices := range roomIDToPeekingDevices { - if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { - n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) - } - for _, peekingDevice := range peekingDevices { - n.roomIDToPeekingDevices[roomID].add(peekingDevice) - } - } -} - -// wakeupUsers will wake up the sync strems for all of the devices for all of the -// specified user IDs, and also the specified peekingDevices -func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) { - for _, userID := range userIDs { - for _, stream := range n.fetchUserStreams(userID) { - if stream == nil { - continue - } - stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream - } - } - - for _, peekingDevice := range peekingDevices { - // TODO: don't bother waking up for devices whose users we already woke up - if stream := n.fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil { - stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream - } - } -} - -// wakeupUserDevice will wake up the sync stream for a specific user device. Other -// device streams will be left alone. -// nolint:unused -func (n *Notifier) wakeupUserDevice(userID string, deviceIDs []string, newPos types.StreamingToken) { - for _, deviceID := range deviceIDs { - if stream := n.fetchUserDeviceStream(userID, deviceID, false); stream != nil { - stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream - } - } -} - -// fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true, -// a stream will be made for this device if one doesn't exist and it will be returned. This -// function does not wait for data to be available on the stream. -// NB: Callers should have locked the mutex before calling this function. -func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream { - _, ok := n.userDeviceStreams[userID] - if !ok { - if !makeIfNotExists { - return nil - } - n.userDeviceStreams[userID] = map[string]*UserDeviceStream{} - } - stream, ok := n.userDeviceStreams[userID][deviceID] - if !ok { - if !makeIfNotExists { - return nil - } - // TODO: Unbounded growth of streams (1 per user) - if stream = NewUserDeviceStream(userID, deviceID, n.currPos); stream != nil { - n.userDeviceStreams[userID][deviceID] = stream - } - } - return stream -} - -// fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true, -// a stream will be made for this user if one doesn't exist and it will be returned. This -// function does not wait for data to be available on the stream. -// NB: Callers should have locked the mutex before calling this function. -func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream { - user, ok := n.userDeviceStreams[userID] - if !ok { - return []*UserDeviceStream{} - } - streams := []*UserDeviceStream{} - for _, stream := range user { - streams = append(streams, stream) - } - return streams -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) addJoinedUser(roomID, userID string) { - if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - n.roomIDToJoinedUsers[roomID] = make(userIDSet) - } - n.roomIDToJoinedUsers[roomID].add(userID) -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) removeJoinedUser(roomID, userID string) { - if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - n.roomIDToJoinedUsers[roomID] = make(userIDSet) - } - n.roomIDToJoinedUsers[roomID].remove(userID) -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) joinedUsers(roomID string) (userIDs []string) { - if _, ok := n.roomIDToJoinedUsers[roomID]; !ok { - return - } - return n.roomIDToJoinedUsers[roomID].values() -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) { - if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { - n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) - } - n.roomIDToPeekingDevices[roomID].add(types.PeekingDevice{UserID: userID, DeviceID: deviceID}) -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -// nolint:unused -func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) { - if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { - n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet) - } - // XXX: is this going to work as a key? - n.roomIDToPeekingDevices[roomID].remove(types.PeekingDevice{UserID: userID, DeviceID: deviceID}) -} - -// Not thread-safe: must be called on the OnNewEvent goroutine only -func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) { - if _, ok := n.roomIDToPeekingDevices[roomID]; !ok { - return - } - return n.roomIDToPeekingDevices[roomID].values() -} - -// removeEmptyUserStreams iterates through the user stream map and removes any -// that have been empty for a certain amount of time. This is a crude way of -// ensuring that the userStreams map doesn't grow forver. -// This should be called when the notifier gets called for whatever reason, -// the function itself is responsible for ensuring it doesn't iterate too -// often. -// NB: Callers should have locked the mutex before calling this function. -func (n *Notifier) removeEmptyUserStreams() { - // Only clean up now and again - now := time.Now() - if n.lastCleanUpTime.Add(time.Minute).After(now) { - return - } - n.lastCleanUpTime = now - - deleteBefore := now.Add(-5 * time.Minute) - for user, byUser := range n.userDeviceStreams { - for device, stream := range byUser { - if stream.TimeOfLastNonEmpty().Before(deleteBefore) { - delete(n.userDeviceStreams[user], device) - } - if len(n.userDeviceStreams[user]) == 0 { - delete(n.userDeviceStreams, user) - } - } - } -} - -// A string set, mainly existing for improving clarity of structs in this file. -type userIDSet map[string]bool - -func (s userIDSet) add(str string) { - s[str] = true -} - -func (s userIDSet) remove(str string) { - delete(s, str) -} - -func (s userIDSet) values() (vals []string) { - for str := range s { - vals = append(vals, str) - } - return -} - -// A set of PeekingDevices, similar to userIDSet - -type peekingDeviceSet map[types.PeekingDevice]bool - -func (s peekingDeviceSet) add(d types.PeekingDevice) { - s[d] = true -} - -// nolint:unused -func (s peekingDeviceSet) remove(d types.PeekingDevice) { - delete(s, d) -} - -func (s peekingDeviceSet) values() (vals []types.PeekingDevice) { - for d := range s { - vals = append(vals, d) - } - return -} diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go deleted file mode 100644 index d24da463..00000000 --- a/syncapi/sync/notifier_test.go +++ /dev/null @@ -1,374 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sync - -import ( - "context" - "encoding/json" - "fmt" - "sync" - "testing" - "time" - - "github.com/matrix-org/dendrite/syncapi/types" - userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" -) - -var ( - randomMessageEvent gomatrixserverlib.HeaderedEvent - aliceInviteBobEvent gomatrixserverlib.HeaderedEvent - bobLeaveEvent gomatrixserverlib.HeaderedEvent - syncPositionVeryOld = types.StreamingToken{PDUPosition: 5} - syncPositionBefore = types.StreamingToken{PDUPosition: 11} - syncPositionAfter = types.StreamingToken{PDUPosition: 12} - //syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition, 1, 0, 0, nil) - syncPositionAfter2 = types.StreamingToken{PDUPosition: 13} -) - -var ( - roomID = "!test:localhost" - alice = "@alice:localhost" - aliceDev = "alicedevice" - bob = "@bob:localhost" - bobDev = "bobdev" -) - -func init() { - var err error - err = json.Unmarshal([]byte(`{ - "_room_version": "1", - "type": "m.room.message", - "content": { - "body": "Hello World", - "msgtype": "m.text" - }, - "sender": "@noone:localhost", - "room_id": "`+roomID+`", - "origin": "localhost", - "origin_server_ts": 12345, - "event_id": "$randomMessageEvent:localhost" - }`), &randomMessageEvent) - if err != nil { - panic(err) - } - err = json.Unmarshal([]byte(`{ - "_room_version": "1", - "type": "m.room.member", - "state_key": "`+bob+`", - "content": { - "membership": "invite" - }, - "sender": "`+alice+`", - "room_id": "`+roomID+`", - "origin": "localhost", - "origin_server_ts": 12345, - "event_id": "$aliceInviteBobEvent:localhost" - }`), &aliceInviteBobEvent) - if err != nil { - panic(err) - } - err = json.Unmarshal([]byte(`{ - "_room_version": "1", - "type": "m.room.member", - "state_key": "`+bob+`", - "content": { - "membership": "leave" - }, - "sender": "`+bob+`", - "room_id": "`+roomID+`", - "origin": "localhost", - "origin_server_ts": 12345, - "event_id": "$bobLeaveEvent:localhost" - }`), &bobLeaveEvent) - if err != nil { - panic(err) - } -} - -func mustEqualPositions(t *testing.T, got, want types.StreamingToken) { - if got.String() != want.String() { - t.Fatalf("mustEqualPositions got %s want %s", got.String(), want.String()) - } -} - -// Test that the current position is returned if a request is already behind. -func TestImmediateNotification(t *testing.T) { - n := NewNotifier(syncPositionBefore) - pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionVeryOld)) - if err != nil { - t.Fatalf("TestImmediateNotification error: %s", err) - } - mustEqualPositions(t, pos, syncPositionBefore) -} - -// Test that new events to a joined room unblocks the request. -func TestNewEventAndJoinedToRoom(t *testing.T) { - n := NewNotifier(syncPositionBefore) - n.setUsersJoinedToRooms(map[string][]string{ - roomID: {alice, bob}, - }) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) - if err != nil { - t.Errorf("TestNewEventAndJoinedToRoom error: %w", err) - } - mustEqualPositions(t, pos, syncPositionAfter) - wg.Done() - }() - - stream := lockedFetchUserStream(n, bob, bobDev) - waitForBlocking(stream, 1) - - n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter) - - wg.Wait() -} - -func TestCorrectStream(t *testing.T) { - n := NewNotifier(syncPositionBefore) - stream := lockedFetchUserStream(n, bob, bobDev) - if stream.UserID != bob { - t.Fatalf("expected user %q, got %q", bob, stream.UserID) - } - if stream.DeviceID != bobDev { - t.Fatalf("expected device %q, got %q", bobDev, stream.DeviceID) - } -} - -func TestCorrectStreamWakeup(t *testing.T) { - n := NewNotifier(syncPositionBefore) - awoken := make(chan string) - - streamone := lockedFetchUserStream(n, alice, "one") - streamtwo := lockedFetchUserStream(n, alice, "two") - - go func() { - select { - case <-streamone.signalChannel: - awoken <- "one" - case <-streamtwo.signalChannel: - awoken <- "two" - } - }() - - time.Sleep(1 * time.Second) - - wake := "two" - n.wakeupUserDevice(alice, []string{wake}, syncPositionAfter) - - if result := <-awoken; result != wake { - t.Fatalf("expected to wake %q, got %q", wake, result) - } -} - -// Test that an invite unblocks the request -func TestNewInviteEventForUser(t *testing.T) { - n := NewNotifier(syncPositionBefore) - n.setUsersJoinedToRooms(map[string][]string{ - roomID: {alice, bob}, - }) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) - if err != nil { - t.Errorf("TestNewInviteEventForUser error: %w", err) - } - mustEqualPositions(t, pos, syncPositionAfter) - wg.Done() - }() - - stream := lockedFetchUserStream(n, bob, bobDev) - waitForBlocking(stream, 1) - - n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter) - - wg.Wait() -} - -// Test an EDU-only update wakes up the request. -// TODO: Fix this test, invites wake up with an incremented -// PDU position, not EDU position -/* -func TestEDUWakeup(t *testing.T) { - n := NewNotifier(syncPositionAfter) - n.setUsersJoinedToRooms(map[string][]string{ - roomID: {alice, bob}, - }) - - var wg sync.WaitGroup - wg.Add(1) - go func() { - pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter)) - if err != nil { - t.Errorf("TestNewInviteEventForUser error: %w", err) - } - mustEqualPositions(t, pos, syncPositionNewEDU) - wg.Done() - }() - - stream := lockedFetchUserStream(n, bob, bobDev) - waitForBlocking(stream, 1) - - n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU) - - wg.Wait() -} -*/ - -// Test that all blocked requests get woken up on a new event. -func TestMultipleRequestWakeup(t *testing.T) { - n := NewNotifier(syncPositionBefore) - n.setUsersJoinedToRooms(map[string][]string{ - roomID: {alice, bob}, - }) - - var wg sync.WaitGroup - wg.Add(3) - poll := func() { - pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) - if err != nil { - t.Errorf("TestMultipleRequestWakeup error: %w", err) - } - mustEqualPositions(t, pos, syncPositionAfter) - wg.Done() - } - go poll() - go poll() - go poll() - - stream := lockedFetchUserStream(n, bob, bobDev) - waitForBlocking(stream, 3) - - n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter) - - wg.Wait() - - numWaiting := stream.NumWaiting() - if numWaiting != 0 { - t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting) - } -} - -// Test that you stop getting woken up when you leave a room. -func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) { - // listen as bob. Make bob leave room. Make alice send event to room. - // Make sure alice gets woken up only and not bob as well. - n := NewNotifier(syncPositionBefore) - n.setUsersJoinedToRooms(map[string][]string{ - roomID: {alice, bob}, - }) - - var leaveWG sync.WaitGroup - - // Make bob leave the room - leaveWG.Add(1) - go func() { - pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore)) - if err != nil { - t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err) - } - mustEqualPositions(t, pos, syncPositionAfter) - leaveWG.Done() - }() - bobStream := lockedFetchUserStream(n, bob, bobDev) - waitForBlocking(bobStream, 1) - n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter) - leaveWG.Wait() - - // send an event into the room. Make sure alice gets it. Bob should not. - var aliceWG sync.WaitGroup - aliceStream := lockedFetchUserStream(n, alice, aliceDev) - aliceWG.Add(1) - go func() { - pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionAfter)) - if err != nil { - t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err) - } - mustEqualPositions(t, pos, syncPositionAfter2) - aliceWG.Done() - }() - - go func() { - // this should timeout with an error (but the main goroutine won't wait for the timeout explicitly) - _, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter)) - if err == nil { - t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil") - } - }() - - waitForBlocking(aliceStream, 1) - waitForBlocking(bobStream, 1) - - n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter2) - aliceWG.Wait() - - // it's possible that at this point alice has been informed and bob is about to be informed, so wait - // for a fraction of a second to account for this race - time.Sleep(1 * time.Millisecond) -} - -func waitForEvents(n *Notifier, req syncRequest) (types.StreamingToken, error) { - listener := n.GetListener(req) - defer listener.Close() - - select { - case <-time.After(5 * time.Second): - return types.StreamingToken{}, fmt.Errorf( - "waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since, - ) - case <-listener.GetNotifyChannel(req.since): - p := listener.GetSyncPosition() - return p, nil - } -} - -// Wait until something is Wait()ing on the user stream. -func waitForBlocking(s *UserDeviceStream, numBlocking uint) { - for numBlocking != s.NumWaiting() { - // This is horrible but I don't want to add a signalling mechanism JUST for testing. - time.Sleep(1 * time.Microsecond) - } -} - -// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock. -// A new stream is made if it doesn't exist already. -func lockedFetchUserStream(n *Notifier, userID, deviceID string) *UserDeviceStream { - n.streamLock.Lock() - defer n.streamLock.Unlock() - - return n.fetchUserDeviceStream(userID, deviceID, true) -} - -func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) syncRequest { - return syncRequest{ - device: userapi.Device{ - UserID: userID, - ID: deviceID, - }, - timeout: 1 * time.Minute, - since: since, - wantFullState: false, - limit: DefaultTimelineLimit, - log: util.GetLogger(context.TODO()), - ctx: context.TODO(), - } -} diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go index f2f2894b..5f89ffc3 100644 --- a/syncapi/sync/request.go +++ b/syncapi/sync/request.go @@ -15,7 +15,6 @@ package sync import ( - "context" "encoding/json" "net/http" "strconv" @@ -26,7 +25,7 @@ import ( userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" ) const defaultSyncTimeout = time.Duration(0) @@ -40,18 +39,7 @@ type filter struct { } `json:"room"` } -// syncRequest represents a /sync request, with sensible defaults/sanity checks applied. -type syncRequest struct { - ctx context.Context - device userapi.Device - limit int - timeout time.Duration - since types.StreamingToken // nil means that no since token was supplied - wantFullState bool - log *log.Entry -} - -func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*syncRequest, error) { +func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.SyncRequest, error) { timeout := getTimeout(req.URL.Query().Get("timeout")) fullState := req.URL.Query().Get("full_state") wantFullState := fullState != "" && fullState != "false" @@ -87,15 +75,30 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat } } } + + filter := gomatrixserverlib.DefaultEventFilter() + filter.Limit = timelineLimit // TODO: Additional query params: set_presence, filter - return &syncRequest{ - ctx: req.Context(), - device: device, - timeout: timeout, - since: since, - wantFullState: wantFullState, - limit: timelineLimit, - log: util.GetLogger(req.Context()), + + logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{ + "user_id": device.UserID, + "device_id": device.ID, + "since": since, + "timeout": timeout, + "limit": timelineLimit, + }) + + return &types.SyncRequest{ + Context: req.Context(), // + Log: logger, // + Device: &device, // + Response: types.NewResponse(), // Populated by all streams + Filter: filter, // + Since: since, // + Timeout: timeout, // + Limit: timelineLimit, // + Rooms: make(map[string]string), // Populated by the PDU stream + WantFullState: wantFullState, // }, nil } diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 0751487a..384fc25c 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -17,8 +17,6 @@ package sync import ( - "context" - "fmt" "net" "net/http" "strings" @@ -30,13 +28,13 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/syncapi/internal" + "github.com/matrix-org/dendrite/syncapi/notifier" "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/streams" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" - "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" ) // RequestPool manages HTTP long-poll connections for /sync @@ -44,19 +42,30 @@ type RequestPool struct { db storage.Database cfg *config.SyncAPI userAPI userapi.UserInternalAPI - Notifier *Notifier keyAPI keyapi.KeyInternalAPI rsAPI roomserverAPI.RoomserverInternalAPI lastseen sync.Map + streams *streams.Streams + Notifier *notifier.Notifier } // NewRequestPool makes a new RequestPool func NewRequestPool( - db storage.Database, cfg *config.SyncAPI, n *Notifier, + db storage.Database, cfg *config.SyncAPI, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, + streams *streams.Streams, notifier *notifier.Notifier, ) *RequestPool { - rp := &RequestPool{db, cfg, userAPI, n, keyAPI, rsAPI, sync.Map{}} + rp := &RequestPool{ + db: db, + cfg: cfg, + userAPI: userAPI, + keyAPI: keyAPI, + rsAPI: rsAPI, + lastseen: sync.Map{}, + streams: streams, + Notifier: notifier, + } go rp.cleanLastSeen() return rp } @@ -128,8 +137,6 @@ var waitingSyncRequests = prometheus.NewGauge( // called in a dedicated goroutine for this request. This function will block the goroutine // until a response is ready, or it times out. func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.Device) util.JSONResponse { - var syncData *types.Response - // Extract values from request syncReq, err := newSyncRequest(req, *device, rp.db) if err != nil { @@ -139,89 +146,109 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. } } - logger := util.GetLogger(req.Context()).WithFields(log.Fields{ - "user_id": device.UserID, - "device_id": device.ID, - "since": syncReq.since, - "timeout": syncReq.timeout, - "limit": syncReq.limit, - }) - activeSyncRequests.Inc() defer activeSyncRequests.Dec() rp.updateLastSeen(req, device) - currPos := rp.Notifier.CurrentPosition() - - if rp.shouldReturnImmediately(syncReq) { - syncData, err = rp.currentSyncForUser(*syncReq, currPos) - if err != nil { - logger.WithError(err).Error("rp.currentSyncForUser failed") - return jsonerror.InternalServerError() - } - logger.WithField("next", syncData.NextBatch).Info("Responding immediately") - return util.JSONResponse{ - Code: http.StatusOK, - JSON: syncData, - } - } - waitingSyncRequests.Inc() defer waitingSyncRequests.Dec() - // Otherwise, we wait for the notifier to tell us if something *may* have - // happened. We loop in case it turns out that nothing did happen. + currentPos := rp.Notifier.CurrentPosition() - timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above - defer timer.Stop() + if !rp.shouldReturnImmediately(syncReq) { + timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above + defer timer.Stop() - userStreamListener := rp.Notifier.GetListener(*syncReq) - defer userStreamListener.Close() + userStreamListener := rp.Notifier.GetListener(*syncReq) + defer userStreamListener.Close() - // We need the loop in case userStreamListener wakes up even if there isn't - // anything to send down. In this case, we'll jump out of the select but - // don't want to send anything back until we get some actual content to - // respond with, so we skip the return an go back to waiting for content to - // be sent down or the request timing out. - var hasTimedOut bool - sincePos := syncReq.since - for { - select { - // Wait for notifier to wake us up - case <-userStreamListener.GetNotifyChannel(sincePos): - currPos = userStreamListener.GetSyncPosition() - // Or for timeout to expire - case <-timer.C: - // We just need to ensure we get out of the select after reaching the - // timeout, but there's nothing specific we want to do in this case - // apart from that, so we do nothing except stating we're timing out - // and need to respond. - hasTimedOut = true - // Or for the request to be cancelled - case <-req.Context().Done(): - logger.WithError(err).Error("request cancelled") - return jsonerror.InternalServerError() + giveup := func() util.JSONResponse { + syncReq.Response.NextBatch = syncReq.Since + return util.JSONResponse{ + Code: http.StatusOK, + JSON: syncReq.Response, + } } - // Note that we don't time out during calculation of sync - // response. This ensures that we don't waste the hard work - // of calculating the sync only to get timed out before we - // can respond - syncData, err = rp.currentSyncForUser(*syncReq, currPos) - if err != nil { - logger.WithError(err).Error("rp.currentSyncForUser failed") - return jsonerror.InternalServerError() + select { + case <-syncReq.Context.Done(): // Caller gave up + return giveup() + + case <-timer.C: // Timeout reached + return giveup() + + case <-userStreamListener.GetNotifyChannel(syncReq.Since): + syncReq.Log.Debugln("Responding to sync after wake-up") + currentPos.ApplyUpdates(userStreamListener.GetSyncPosition()) } + } else { + syncReq.Log.Debugln("Responding to sync immediately") + } - if !syncData.IsEmpty() || hasTimedOut { - logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding") - return util.JSONResponse{ - Code: http.StatusOK, - JSON: syncData, - } + if syncReq.Since.IsEmpty() { + // Complete sync + syncReq.Response.NextBatch = types.StreamingToken{ + PDUPosition: rp.streams.PDUStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + TypingPosition: rp.streams.TypingStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + InvitePosition: rp.streams.InviteStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync( + syncReq.Context, syncReq, + ), + } + } else { + // Incremental sync + syncReq.Response.NextBatch = types.StreamingToken{ + PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.PDUPosition, currentPos.PDUPosition, + ), + TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.TypingPosition, currentPos.TypingPosition, + ), + ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition, + ), + InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.InvitePosition, currentPos.InvitePosition, + ), + SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition, + ), + AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition, + ), + DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync( + syncReq.Context, syncReq, + syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition, + ), } } + + return util.JSONResponse{ + Code: http.StatusOK, + JSON: syncReq.Response, + } } func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse { @@ -247,18 +274,18 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use JSON: jsonerror.InvalidArgumentValue("bad 'to' value"), } } - // work out room joins/leaves - res, err := rp.db.IncrementalSync( - req.Context(), types.NewResponse(), *device, fromToken, toToken, 10, false, - ) + syncReq, err := newSyncRequest(req, *device, rp.db) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync") + util.GetLogger(req.Context()).WithError(err).Error("newSyncRequest failed") return jsonerror.InternalServerError() } - - res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken) + rp.streams.PDUStreamProvider.IncrementalSync(req.Context(), syncReq, fromToken.PDUPosition, toToken.PDUPosition) + _, _, err = internal.DeviceListCatchup( + req.Context(), rp.keyAPI, rp.rsAPI, syncReq.Device.UserID, + syncReq.Response, fromToken.DeviceListPosition, toToken.DeviceListPosition, + ) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info") + util.GetLogger(req.Context()).WithError(err).Error("Failed to DeviceListCatchup info") return jsonerror.InternalServerError() } return util.JSONResponse{ @@ -267,199 +294,18 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use Changed []string `json:"changed"` Left []string `json:"left"` }{ - Changed: res.DeviceLists.Changed, - Left: res.DeviceLists.Left, + Changed: syncReq.Response.DeviceLists.Changed, + Left: syncReq.Response.DeviceLists.Left, }, } } -// nolint:gocyclo -func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (*types.Response, error) { - res := types.NewResponse() - - // See if we have any new tasks to do for the send-to-device messaging. - lastPos, events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, req.since) - if err != nil { - return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err) - } - - // TODO: handle ignored users - if req.since.IsEmpty() { - res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit) - if err != nil { - return res, fmt.Errorf("rp.db.CompleteSync: %w", err) - } - } else { - res, err = rp.db.IncrementalSync(req.ctx, res, req.device, req.since, latestPos, req.limit, req.wantFullState) - if err != nil { - return res, fmt.Errorf("rp.db.IncrementalSync: %w", err) - } - } - - accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead - res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition, &accountDataFilter) - if err != nil { - return res, fmt.Errorf("rp.appendAccountData: %w", err) - } - res, err = rp.appendDeviceLists(res, req.device.UserID, req.since, latestPos) - if err != nil { - return res, fmt.Errorf("rp.appendDeviceLists: %w", err) - } - err = internal.DeviceOTKCounts(req.ctx, rp.keyAPI, req.device.UserID, req.device.ID, res) - if err != nil { - return res, fmt.Errorf("internal.DeviceOTKCounts: %w", err) - } - - // Before we return the sync response, make sure that we take action on - // any send-to-device database updates or deletions that we need to do. - // Then add the updates into the sync response. - if len(updates) > 0 || len(deletions) > 0 { - // Handle the updates and deletions in the database. - err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.since) - if err != nil { - return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err) - } - } - if len(events) > 0 { - // Add the updates into the sync response. - for _, event := range events { - res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent) - } - } - - res.NextBatch.SendToDevicePosition = lastPos - return res, err -} - -func (rp *RequestPool) appendDeviceLists( - data *types.Response, userID string, since, to types.StreamingToken, -) (*types.Response, error) { - _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, userID, data, since, to) - if err != nil { - return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err) - } - - return data, nil -} - -// nolint:gocyclo -func (rp *RequestPool) appendAccountData( - data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, - accountDataFilter *gomatrixserverlib.EventFilter, -) (*types.Response, error) { - // TODO: Account data doesn't have a sync position of its own, meaning that - // account data might be sent multiple time to the client if multiple account - // data keys were set between two message. This isn't a huge issue since the - // duplicate data doesn't represent a huge quantity of data, but an optimisation - // here would be making sure each data is sent only once to the client. - if req.since.IsEmpty() { - // If this is the initial sync, we don't need to check if a data has - // already been sent. Instead, we send the whole batch. - dataReq := &userapi.QueryAccountDataRequest{ - UserID: userID, - } - dataRes := &userapi.QueryAccountDataResponse{} - if err := rp.userAPI.QueryAccountData(req.ctx, dataReq, dataRes); err != nil { - return nil, err - } - for datatype, databody := range dataRes.GlobalAccountData { - data.AccountData.Events = append( - data.AccountData.Events, - gomatrixserverlib.ClientEvent{ - Type: datatype, - Content: gomatrixserverlib.RawJSON(databody), - }, - ) - } - for r, j := range data.Rooms.Join { - for datatype, databody := range dataRes.RoomAccountData[r] { - j.AccountData.Events = append( - j.AccountData.Events, - gomatrixserverlib.ClientEvent{ - Type: datatype, - Content: gomatrixserverlib.RawJSON(databody), - }, - ) - data.Rooms.Join[r] = j - } - } - return data, nil - } - - r := types.Range{ - From: req.since.PDUPosition, - To: currentPos, - } - // If both positions are the same, it means that the data was saved after the - // latest room event. In that case, we need to decrement the old position as - // results are exclusive of Low. - if r.Low() == r.High() { - r.From-- - } - - // Sync is not initial, get all account data since the latest sync - dataTypes, err := rp.db.GetAccountDataInRange( - req.ctx, userID, r, accountDataFilter, - ) - if err != nil { - return nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err) - } - - if len(dataTypes) == 0 { - // TODO: this fixes the sytest but is it the right thing to do? - dataTypes[""] = []string{"m.push_rules"} - } - - // Iterate over the rooms - for roomID, dataTypes := range dataTypes { - // Request the missing data from the database - for _, dataType := range dataTypes { - dataReq := userapi.QueryAccountDataRequest{ - UserID: userID, - RoomID: roomID, - DataType: dataType, - } - dataRes := userapi.QueryAccountDataResponse{} - err = rp.userAPI.QueryAccountData(req.ctx, &dataReq, &dataRes) - if err != nil { - continue - } - if roomID == "" { - if globalData, ok := dataRes.GlobalAccountData[dataType]; ok { - data.AccountData.Events = append( - data.AccountData.Events, - gomatrixserverlib.ClientEvent{ - Type: dataType, - Content: gomatrixserverlib.RawJSON(globalData), - }, - ) - } - } else { - if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok { - joinData := data.Rooms.Join[roomID] - joinData.AccountData.Events = append( - joinData.AccountData.Events, - gomatrixserverlib.ClientEvent{ - Type: dataType, - Content: gomatrixserverlib.RawJSON(roomData), - }, - ) - data.Rooms.Join[roomID] = joinData - } - } - } - } - - return data, nil -} - // shouldReturnImmediately returns whether the /sync request is an initial sync, // or timeout=0, or full_state=true, in any of the cases the request should // return immediately. -func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool { - if syncReq.since.IsEmpty() || syncReq.timeout == 0 || syncReq.wantFullState { +func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest) bool { + if syncReq.Since.IsEmpty() || syncReq.Timeout == 0 || syncReq.WantFullState { return true } - waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID) - return werr == nil && waiting + return false } diff --git a/syncapi/sync/userstream.go b/syncapi/sync/userstream.go deleted file mode 100644 index ff9a4d00..00000000 --- a/syncapi/sync/userstream.go +++ /dev/null @@ -1,162 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sync - -import ( - "context" - "runtime" - "sync" - "time" - - "github.com/matrix-org/dendrite/syncapi/types" -) - -// UserDeviceStream represents a communication mechanism between the /sync request goroutine -// and the underlying sync server goroutines. -// Goroutines can get a UserStreamListener to wait for updates, and can Broadcast() -// updates. -type UserDeviceStream struct { - UserID string - DeviceID string - // The lock that protects changes to this struct - lock sync.Mutex - // Closed when there is an update. - signalChannel chan struct{} - // The last sync position that there may have been an update for the user - pos types.StreamingToken - // The last time when we had some listeners waiting - timeOfLastChannel time.Time - // The number of listeners waiting - numWaiting uint -} - -// UserDeviceStreamListener allows a sync request to wait for updates for a user. -type UserDeviceStreamListener struct { - userStream *UserDeviceStream - - // Whether the stream has been closed - hasClosed bool -} - -// NewUserDeviceStream creates a new user stream -func NewUserDeviceStream(userID, deviceID string, currPos types.StreamingToken) *UserDeviceStream { - return &UserDeviceStream{ - UserID: userID, - DeviceID: deviceID, - timeOfLastChannel: time.Now(), - pos: currPos, - signalChannel: make(chan struct{}), - } -} - -// GetListener returns UserStreamListener that a sync request can use to wait -// for new updates with. -// UserStreamListener must be closed -func (s *UserDeviceStream) GetListener(ctx context.Context) UserDeviceStreamListener { - s.lock.Lock() - defer s.lock.Unlock() - - s.numWaiting++ // We decrement when UserStreamListener is closed - - listener := UserDeviceStreamListener{ - userStream: s, - } - - // Lets be a bit paranoid here and check that Close() is being called - runtime.SetFinalizer(&listener, func(l *UserDeviceStreamListener) { - if !l.hasClosed { - l.Close() - } - }) - - return listener -} - -// Broadcast a new sync position for this user. -func (s *UserDeviceStream) Broadcast(pos types.StreamingToken) { - s.lock.Lock() - defer s.lock.Unlock() - - s.pos = pos - - close(s.signalChannel) - - s.signalChannel = make(chan struct{}) -} - -// NumWaiting returns the number of goroutines waiting for waiting for updates. -// Used for metrics and testing. -func (s *UserDeviceStream) NumWaiting() uint { - s.lock.Lock() - defer s.lock.Unlock() - return s.numWaiting -} - -// TimeOfLastNonEmpty returns the last time that the number of waiting listeners -// was non-empty, may be time.Now() if number of waiting listeners is currently -// non-empty. -func (s *UserDeviceStream) TimeOfLastNonEmpty() time.Time { - s.lock.Lock() - defer s.lock.Unlock() - - if s.numWaiting > 0 { - return time.Now() - } - - return s.timeOfLastChannel -} - -// GetSyncPosition returns last sync position which the UserStream was -// notified about -func (s *UserDeviceStreamListener) GetSyncPosition() types.StreamingToken { - s.userStream.lock.Lock() - defer s.userStream.lock.Unlock() - - return s.userStream.pos -} - -// GetNotifyChannel returns a channel that is closed when there may be an -// update for the user. -// sincePos specifies from which point we want to be notified about. If there -// has already been an update after sincePos we'll return a closed channel -// immediately. -func (s *UserDeviceStreamListener) GetNotifyChannel(sincePos types.StreamingToken) <-chan struct{} { - s.userStream.lock.Lock() - defer s.userStream.lock.Unlock() - - if s.userStream.pos.IsAfter(sincePos) { - // If the listener is behind, i.e. missed a potential update, then we - // want them to wake up immediately. We do this by returning a new - // closed stream, which returns immediately when selected. - closedChannel := make(chan struct{}) - close(closedChannel) - return closedChannel - } - - return s.userStream.signalChannel -} - -// Close cleans up resources used -func (s *UserDeviceStreamListener) Close() { - s.userStream.lock.Lock() - defer s.userStream.lock.Unlock() - - if !s.hasClosed { - s.userStream.numWaiting-- - s.userStream.timeOfLastChannel = time.Now() - } - - s.hasClosed = true -} |