aboutsummaryrefslogtreecommitdiff
path: root/syncapi/sync
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-01-08 16:59:06 +0000
committerGitHub <noreply@github.com>2021-01-08 16:59:06 +0000
commitb5a8935042dfb358f4176bc1ca46d0b8ebd62615 (patch)
treeb3b0409cc184f6392eb0230f2127d10d0752b4af /syncapi/sync
parent56a7839aedfdf849193bf25c0e2fcd8f4a0146d8 (diff)
Sync refactor — Part 1 (#1688)
* It's half-alive * Wakeups largely working * Other tweaks, typing works * Fix bugs, add receipt stream * Delete notifier, other tweaks * Dedupe a bit, add a template for the invite stream * Clean up, add templates for other streams * Don't leak channels * Bring forward some more PDU logic, clean up other places * Add some more wakeups * Use addRoomDeltaToResponse * Log tweaks, typing fixed? * Fix timed out syncs * Don't reset next batch position on timeout * Add account data stream/position * End of day * Fix complete sync for receipt, typing * Streams package * Clean up a bit * Complete sync send-to-device * Don't drop errors * More lightweight notifications * Fix typing positions * Don't advance position on remove again unless needed * Device list updates * Advance account data position * Use limit for incremental sync * Limit fixes, amongst other things * Remove some fmt.Println * Tweaks * Re-add notifier * Fix invite position * Fixes * Notify account data without advancing PDU position in notifier * Apply account data position * Get initial position for account data * Fix position update * Fix complete sync positions * Review comments @Kegsay * Room consumer parameters
Diffstat (limited to 'syncapi/sync')
-rw-r--r--syncapi/sync/notifier.go467
-rw-r--r--syncapi/sync/notifier_test.go374
-rw-r--r--syncapi/sync/request.go47
-rw-r--r--syncapi/sync/requestpool.go384
-rw-r--r--syncapi/sync/userstream.go162
5 files changed, 140 insertions, 1294 deletions
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
deleted file mode 100644
index 66460a8d..00000000
--- a/syncapi/sync/notifier.go
+++ /dev/null
@@ -1,467 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package sync
-
-import (
- "context"
- "sync"
- "time"
-
- "github.com/matrix-org/dendrite/syncapi/storage"
- "github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
- log "github.com/sirupsen/logrus"
-)
-
-// Notifier will wake up sleeping requests when there is some new data.
-// It does not tell requests what that data is, only the sync position which
-// they can use to get at it. This is done to prevent races whereby we tell the caller
-// the event, but the token has already advanced by the time they fetch it, resulting
-// in missed events.
-type Notifier struct {
- // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
- roomIDToJoinedUsers map[string]userIDSet
- // A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
- roomIDToPeekingDevices map[string]peekingDeviceSet
- // Protects currPos and userStreams.
- streamLock *sync.Mutex
- // The latest sync position
- currPos types.StreamingToken
- // A map of user_id => device_id => UserStream which can be used to wake a given user's /sync request.
- userDeviceStreams map[string]map[string]*UserDeviceStream
- // The last time we cleaned out stale entries from the userStreams map
- lastCleanUpTime time.Time
-}
-
-// NewNotifier creates a new notifier set to the given sync position.
-// In order for this to be of any use, the Notifier needs to be told all rooms and
-// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
-func NewNotifier(pos types.StreamingToken) *Notifier {
- return &Notifier{
- currPos: pos,
- roomIDToJoinedUsers: make(map[string]userIDSet),
- roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
- userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
- streamLock: &sync.Mutex{},
- lastCleanUpTime: time.Now(),
- }
-}
-
-// OnNewEvent is called when a new event is received from the room server. Must only be
-// called from a single goroutine, to avoid races between updates which could set the
-// current sync position incorrectly.
-// Chooses which user sync streams to update by a provided *gomatrixserverlib.Event
-// (based on the users in the event's room),
-// a roomID directly, or a list of user IDs, prioritised by parameter ordering.
-// posUpdate contains the latest position(s) for one or more types of events.
-// If a position in posUpdate is 0, it means no updates are available of that type.
-// Typically a consumer supplies a posUpdate with the latest sync position for the
-// event type it handles, leaving other fields as 0.
-func (n *Notifier) OnNewEvent(
- ev *gomatrixserverlib.HeaderedEvent, roomID string, userIDs []string,
- posUpdate types.StreamingToken,
-) {
- // update the current position then notify relevant /sync streams.
- // This needs to be done PRIOR to waking up users as they will read this value.
- n.streamLock.Lock()
- defer n.streamLock.Unlock()
-
- n.currPos.ApplyUpdates(posUpdate)
- n.removeEmptyUserStreams()
-
- if ev != nil {
- // Map this event's room_id to a list of joined users, and wake them up.
- usersToNotify := n.joinedUsers(ev.RoomID())
- // Map this event's room_id to a list of peeking devices, and wake them up.
- peekingDevicesToNotify := n.PeekingDevices(ev.RoomID())
- // If this is an invite, also add in the invitee to this list.
- if ev.Type() == "m.room.member" && ev.StateKey() != nil {
- targetUserID := *ev.StateKey()
- membership, err := ev.Membership()
- if err != nil {
- log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
- "Notifier.OnNewEvent: Failed to unmarshal member event",
- )
- } else {
- // Keep the joined user map up-to-date
- switch membership {
- case gomatrixserverlib.Invite:
- usersToNotify = append(usersToNotify, targetUserID)
- case gomatrixserverlib.Join:
- // Manually append the new user's ID so they get notified
- // along all members in the room
- usersToNotify = append(usersToNotify, targetUserID)
- n.addJoinedUser(ev.RoomID(), targetUserID)
- case gomatrixserverlib.Leave:
- fallthrough
- case gomatrixserverlib.Ban:
- n.removeJoinedUser(ev.RoomID(), targetUserID)
- }
- }
- }
-
- n.wakeupUsers(usersToNotify, peekingDevicesToNotify, n.currPos)
- } else if roomID != "" {
- n.wakeupUsers(n.joinedUsers(roomID), n.PeekingDevices(roomID), n.currPos)
- } else if len(userIDs) > 0 {
- n.wakeupUsers(userIDs, nil, n.currPos)
- } else {
- log.WithFields(log.Fields{
- "posUpdate": posUpdate.String,
- }).Warn("Notifier.OnNewEvent called but caller supplied no user to wake up")
- }
-}
-
-func (n *Notifier) OnNewPeek(
- roomID, userID, deviceID string,
-) {
- n.streamLock.Lock()
- defer n.streamLock.Unlock()
-
- n.addPeekingDevice(roomID, userID, deviceID)
-
- // we don't wake up devices here given the roomserver consumer will do this shortly afterwards
- // by calling OnNewEvent.
-}
-
-func (n *Notifier) OnRetirePeek(
- roomID, userID, deviceID string,
-) {
- n.streamLock.Lock()
- defer n.streamLock.Unlock()
-
- n.removePeekingDevice(roomID, userID, deviceID)
-
- // we don't wake up devices here given the roomserver consumer will do this shortly afterwards
- // by calling OnRetireEvent.
-}
-
-func (n *Notifier) OnNewSendToDevice(
- userID string, deviceIDs []string,
- posUpdate types.StreamingToken,
-) {
- n.streamLock.Lock()
- defer n.streamLock.Unlock()
-
- n.currPos.ApplyUpdates(posUpdate)
- n.wakeupUserDevice(userID, deviceIDs, n.currPos)
-}
-
-// OnNewReceipt updates the current position
-func (n *Notifier) OnNewTyping(
- roomID string,
- posUpdate types.StreamingToken,
-) {
- n.streamLock.Lock()
- defer n.streamLock.Unlock()
-
- n.currPos.ApplyUpdates(posUpdate)
- n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
-}
-
-// OnNewReceipt updates the current position
-func (n *Notifier) OnNewReceipt(
- roomID string,
- posUpdate types.StreamingToken,
-) {
- n.streamLock.Lock()
- defer n.streamLock.Unlock()
-
- n.currPos.ApplyUpdates(posUpdate)
- n.wakeupUsers(n.joinedUsers(roomID), nil, n.currPos)
-}
-
-func (n *Notifier) OnNewKeyChange(
- posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
-) {
- n.streamLock.Lock()
- defer n.streamLock.Unlock()
-
- n.currPos.ApplyUpdates(posUpdate)
- n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
-}
-
-func (n *Notifier) OnNewInvite(
- posUpdate types.StreamingToken, wakeUserID string,
-) {
- n.streamLock.Lock()
- defer n.streamLock.Unlock()
-
- n.currPos.ApplyUpdates(posUpdate)
- n.wakeupUsers([]string{wakeUserID}, nil, n.currPos)
-}
-
-// GetListener returns a UserStreamListener that can be used to wait for
-// updates for a user. Must be closed.
-// notify for anything before sincePos
-func (n *Notifier) GetListener(req syncRequest) UserDeviceStreamListener {
- // Do what synapse does: https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/notifier.py#L298
- // - Bucket request into a lookup map keyed off a list of joined room IDs and separately a user ID
- // - Incoming events wake requests for a matching room ID
- // - Incoming events wake requests for a matching user ID (needed for invites)
-
- // TODO: v1 /events 'peeking' has an 'explicit room ID' which is also tracked,
- // but given we don't do /events, let's pretend it doesn't exist.
-
- n.streamLock.Lock()
- defer n.streamLock.Unlock()
-
- n.removeEmptyUserStreams()
-
- return n.fetchUserDeviceStream(req.device.UserID, req.device.ID, true).GetListener(req.ctx)
-}
-
-// Load the membership states required to notify users correctly.
-func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
- roomToUsers, err := db.AllJoinedUsersInRooms(ctx)
- if err != nil {
- return err
- }
- n.setUsersJoinedToRooms(roomToUsers)
-
- roomToPeekingDevices, err := db.AllPeekingDevicesInRooms(ctx)
- if err != nil {
- return err
- }
- n.setPeekingDevices(roomToPeekingDevices)
-
- return nil
-}
-
-// CurrentPosition returns the current sync position
-func (n *Notifier) CurrentPosition() types.StreamingToken {
- n.streamLock.Lock()
- defer n.streamLock.Unlock()
-
- return n.currPos
-}
-
-// setUsersJoinedToRooms marks the given users as 'joined' to the given rooms, such that new events from
-// these rooms will wake the given users /sync requests. This should be called prior to ANY calls to
-// OnNewEvent (eg on startup) to prevent racing.
-func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
- // This is just the bulk form of addJoinedUser
- for roomID, userIDs := range roomIDToUserIDs {
- if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
- n.roomIDToJoinedUsers[roomID] = make(userIDSet)
- }
- for _, userID := range userIDs {
- n.roomIDToJoinedUsers[roomID].add(userID)
- }
- }
-}
-
-// setPeekingDevices marks the given devices as peeking in the given rooms, such that new events from
-// these rooms will wake the given devices' /sync requests. This should be called prior to ANY calls to
-// OnNewEvent (eg on startup) to prevent racing.
-func (n *Notifier) setPeekingDevices(roomIDToPeekingDevices map[string][]types.PeekingDevice) {
- // This is just the bulk form of addPeekingDevice
- for roomID, peekingDevices := range roomIDToPeekingDevices {
- if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
- n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
- }
- for _, peekingDevice := range peekingDevices {
- n.roomIDToPeekingDevices[roomID].add(peekingDevice)
- }
- }
-}
-
-// wakeupUsers will wake up the sync strems for all of the devices for all of the
-// specified user IDs, and also the specified peekingDevices
-func (n *Notifier) wakeupUsers(userIDs []string, peekingDevices []types.PeekingDevice, newPos types.StreamingToken) {
- for _, userID := range userIDs {
- for _, stream := range n.fetchUserStreams(userID) {
- if stream == nil {
- continue
- }
- stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
- }
- }
-
- for _, peekingDevice := range peekingDevices {
- // TODO: don't bother waking up for devices whose users we already woke up
- if stream := n.fetchUserDeviceStream(peekingDevice.UserID, peekingDevice.DeviceID, false); stream != nil {
- stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
- }
- }
-}
-
-// wakeupUserDevice will wake up the sync stream for a specific user device. Other
-// device streams will be left alone.
-// nolint:unused
-func (n *Notifier) wakeupUserDevice(userID string, deviceIDs []string, newPos types.StreamingToken) {
- for _, deviceID := range deviceIDs {
- if stream := n.fetchUserDeviceStream(userID, deviceID, false); stream != nil {
- stream.Broadcast(newPos) // wake up all goroutines Wait()ing on this stream
- }
- }
-}
-
-// fetchUserDeviceStream retrieves a stream unique to the given device. If makeIfNotExists is true,
-// a stream will be made for this device if one doesn't exist and it will be returned. This
-// function does not wait for data to be available on the stream.
-// NB: Callers should have locked the mutex before calling this function.
-func (n *Notifier) fetchUserDeviceStream(userID, deviceID string, makeIfNotExists bool) *UserDeviceStream {
- _, ok := n.userDeviceStreams[userID]
- if !ok {
- if !makeIfNotExists {
- return nil
- }
- n.userDeviceStreams[userID] = map[string]*UserDeviceStream{}
- }
- stream, ok := n.userDeviceStreams[userID][deviceID]
- if !ok {
- if !makeIfNotExists {
- return nil
- }
- // TODO: Unbounded growth of streams (1 per user)
- if stream = NewUserDeviceStream(userID, deviceID, n.currPos); stream != nil {
- n.userDeviceStreams[userID][deviceID] = stream
- }
- }
- return stream
-}
-
-// fetchUserStreams retrieves all streams for the given user. If makeIfNotExists is true,
-// a stream will be made for this user if one doesn't exist and it will be returned. This
-// function does not wait for data to be available on the stream.
-// NB: Callers should have locked the mutex before calling this function.
-func (n *Notifier) fetchUserStreams(userID string) []*UserDeviceStream {
- user, ok := n.userDeviceStreams[userID]
- if !ok {
- return []*UserDeviceStream{}
- }
- streams := []*UserDeviceStream{}
- for _, stream := range user {
- streams = append(streams, stream)
- }
- return streams
-}
-
-// Not thread-safe: must be called on the OnNewEvent goroutine only
-func (n *Notifier) addJoinedUser(roomID, userID string) {
- if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
- n.roomIDToJoinedUsers[roomID] = make(userIDSet)
- }
- n.roomIDToJoinedUsers[roomID].add(userID)
-}
-
-// Not thread-safe: must be called on the OnNewEvent goroutine only
-func (n *Notifier) removeJoinedUser(roomID, userID string) {
- if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
- n.roomIDToJoinedUsers[roomID] = make(userIDSet)
- }
- n.roomIDToJoinedUsers[roomID].remove(userID)
-}
-
-// Not thread-safe: must be called on the OnNewEvent goroutine only
-func (n *Notifier) joinedUsers(roomID string) (userIDs []string) {
- if _, ok := n.roomIDToJoinedUsers[roomID]; !ok {
- return
- }
- return n.roomIDToJoinedUsers[roomID].values()
-}
-
-// Not thread-safe: must be called on the OnNewEvent goroutine only
-func (n *Notifier) addPeekingDevice(roomID, userID, deviceID string) {
- if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
- n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
- }
- n.roomIDToPeekingDevices[roomID].add(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
-}
-
-// Not thread-safe: must be called on the OnNewEvent goroutine only
-// nolint:unused
-func (n *Notifier) removePeekingDevice(roomID, userID, deviceID string) {
- if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
- n.roomIDToPeekingDevices[roomID] = make(peekingDeviceSet)
- }
- // XXX: is this going to work as a key?
- n.roomIDToPeekingDevices[roomID].remove(types.PeekingDevice{UserID: userID, DeviceID: deviceID})
-}
-
-// Not thread-safe: must be called on the OnNewEvent goroutine only
-func (n *Notifier) PeekingDevices(roomID string) (peekingDevices []types.PeekingDevice) {
- if _, ok := n.roomIDToPeekingDevices[roomID]; !ok {
- return
- }
- return n.roomIDToPeekingDevices[roomID].values()
-}
-
-// removeEmptyUserStreams iterates through the user stream map and removes any
-// that have been empty for a certain amount of time. This is a crude way of
-// ensuring that the userStreams map doesn't grow forver.
-// This should be called when the notifier gets called for whatever reason,
-// the function itself is responsible for ensuring it doesn't iterate too
-// often.
-// NB: Callers should have locked the mutex before calling this function.
-func (n *Notifier) removeEmptyUserStreams() {
- // Only clean up now and again
- now := time.Now()
- if n.lastCleanUpTime.Add(time.Minute).After(now) {
- return
- }
- n.lastCleanUpTime = now
-
- deleteBefore := now.Add(-5 * time.Minute)
- for user, byUser := range n.userDeviceStreams {
- for device, stream := range byUser {
- if stream.TimeOfLastNonEmpty().Before(deleteBefore) {
- delete(n.userDeviceStreams[user], device)
- }
- if len(n.userDeviceStreams[user]) == 0 {
- delete(n.userDeviceStreams, user)
- }
- }
- }
-}
-
-// A string set, mainly existing for improving clarity of structs in this file.
-type userIDSet map[string]bool
-
-func (s userIDSet) add(str string) {
- s[str] = true
-}
-
-func (s userIDSet) remove(str string) {
- delete(s, str)
-}
-
-func (s userIDSet) values() (vals []string) {
- for str := range s {
- vals = append(vals, str)
- }
- return
-}
-
-// A set of PeekingDevices, similar to userIDSet
-
-type peekingDeviceSet map[types.PeekingDevice]bool
-
-func (s peekingDeviceSet) add(d types.PeekingDevice) {
- s[d] = true
-}
-
-// nolint:unused
-func (s peekingDeviceSet) remove(d types.PeekingDevice) {
- delete(s, d)
-}
-
-func (s peekingDeviceSet) values() (vals []types.PeekingDevice) {
- for d := range s {
- vals = append(vals, d)
- }
- return
-}
diff --git a/syncapi/sync/notifier_test.go b/syncapi/sync/notifier_test.go
deleted file mode 100644
index d24da463..00000000
--- a/syncapi/sync/notifier_test.go
+++ /dev/null
@@ -1,374 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package sync
-
-import (
- "context"
- "encoding/json"
- "fmt"
- "sync"
- "testing"
- "time"
-
- "github.com/matrix-org/dendrite/syncapi/types"
- userapi "github.com/matrix-org/dendrite/userapi/api"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
-)
-
-var (
- randomMessageEvent gomatrixserverlib.HeaderedEvent
- aliceInviteBobEvent gomatrixserverlib.HeaderedEvent
- bobLeaveEvent gomatrixserverlib.HeaderedEvent
- syncPositionVeryOld = types.StreamingToken{PDUPosition: 5}
- syncPositionBefore = types.StreamingToken{PDUPosition: 11}
- syncPositionAfter = types.StreamingToken{PDUPosition: 12}
- //syncPositionNewEDU = types.NewStreamToken(syncPositionAfter.PDUPosition, 1, 0, 0, nil)
- syncPositionAfter2 = types.StreamingToken{PDUPosition: 13}
-)
-
-var (
- roomID = "!test:localhost"
- alice = "@alice:localhost"
- aliceDev = "alicedevice"
- bob = "@bob:localhost"
- bobDev = "bobdev"
-)
-
-func init() {
- var err error
- err = json.Unmarshal([]byte(`{
- "_room_version": "1",
- "type": "m.room.message",
- "content": {
- "body": "Hello World",
- "msgtype": "m.text"
- },
- "sender": "@noone:localhost",
- "room_id": "`+roomID+`",
- "origin": "localhost",
- "origin_server_ts": 12345,
- "event_id": "$randomMessageEvent:localhost"
- }`), &randomMessageEvent)
- if err != nil {
- panic(err)
- }
- err = json.Unmarshal([]byte(`{
- "_room_version": "1",
- "type": "m.room.member",
- "state_key": "`+bob+`",
- "content": {
- "membership": "invite"
- },
- "sender": "`+alice+`",
- "room_id": "`+roomID+`",
- "origin": "localhost",
- "origin_server_ts": 12345,
- "event_id": "$aliceInviteBobEvent:localhost"
- }`), &aliceInviteBobEvent)
- if err != nil {
- panic(err)
- }
- err = json.Unmarshal([]byte(`{
- "_room_version": "1",
- "type": "m.room.member",
- "state_key": "`+bob+`",
- "content": {
- "membership": "leave"
- },
- "sender": "`+bob+`",
- "room_id": "`+roomID+`",
- "origin": "localhost",
- "origin_server_ts": 12345,
- "event_id": "$bobLeaveEvent:localhost"
- }`), &bobLeaveEvent)
- if err != nil {
- panic(err)
- }
-}
-
-func mustEqualPositions(t *testing.T, got, want types.StreamingToken) {
- if got.String() != want.String() {
- t.Fatalf("mustEqualPositions got %s want %s", got.String(), want.String())
- }
-}
-
-// Test that the current position is returned if a request is already behind.
-func TestImmediateNotification(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
- pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionVeryOld))
- if err != nil {
- t.Fatalf("TestImmediateNotification error: %s", err)
- }
- mustEqualPositions(t, pos, syncPositionBefore)
-}
-
-// Test that new events to a joined room unblocks the request.
-func TestNewEventAndJoinedToRoom(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
- n.setUsersJoinedToRooms(map[string][]string{
- roomID: {alice, bob},
- })
-
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
- if err != nil {
- t.Errorf("TestNewEventAndJoinedToRoom error: %w", err)
- }
- mustEqualPositions(t, pos, syncPositionAfter)
- wg.Done()
- }()
-
- stream := lockedFetchUserStream(n, bob, bobDev)
- waitForBlocking(stream, 1)
-
- n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
-
- wg.Wait()
-}
-
-func TestCorrectStream(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
- stream := lockedFetchUserStream(n, bob, bobDev)
- if stream.UserID != bob {
- t.Fatalf("expected user %q, got %q", bob, stream.UserID)
- }
- if stream.DeviceID != bobDev {
- t.Fatalf("expected device %q, got %q", bobDev, stream.DeviceID)
- }
-}
-
-func TestCorrectStreamWakeup(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
- awoken := make(chan string)
-
- streamone := lockedFetchUserStream(n, alice, "one")
- streamtwo := lockedFetchUserStream(n, alice, "two")
-
- go func() {
- select {
- case <-streamone.signalChannel:
- awoken <- "one"
- case <-streamtwo.signalChannel:
- awoken <- "two"
- }
- }()
-
- time.Sleep(1 * time.Second)
-
- wake := "two"
- n.wakeupUserDevice(alice, []string{wake}, syncPositionAfter)
-
- if result := <-awoken; result != wake {
- t.Fatalf("expected to wake %q, got %q", wake, result)
- }
-}
-
-// Test that an invite unblocks the request
-func TestNewInviteEventForUser(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
- n.setUsersJoinedToRooms(map[string][]string{
- roomID: {alice, bob},
- })
-
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
- if err != nil {
- t.Errorf("TestNewInviteEventForUser error: %w", err)
- }
- mustEqualPositions(t, pos, syncPositionAfter)
- wg.Done()
- }()
-
- stream := lockedFetchUserStream(n, bob, bobDev)
- waitForBlocking(stream, 1)
-
- n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionAfter)
-
- wg.Wait()
-}
-
-// Test an EDU-only update wakes up the request.
-// TODO: Fix this test, invites wake up with an incremented
-// PDU position, not EDU position
-/*
-func TestEDUWakeup(t *testing.T) {
- n := NewNotifier(syncPositionAfter)
- n.setUsersJoinedToRooms(map[string][]string{
- roomID: {alice, bob},
- })
-
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter))
- if err != nil {
- t.Errorf("TestNewInviteEventForUser error: %w", err)
- }
- mustEqualPositions(t, pos, syncPositionNewEDU)
- wg.Done()
- }()
-
- stream := lockedFetchUserStream(n, bob, bobDev)
- waitForBlocking(stream, 1)
-
- n.OnNewEvent(&aliceInviteBobEvent, "", nil, syncPositionNewEDU)
-
- wg.Wait()
-}
-*/
-
-// Test that all blocked requests get woken up on a new event.
-func TestMultipleRequestWakeup(t *testing.T) {
- n := NewNotifier(syncPositionBefore)
- n.setUsersJoinedToRooms(map[string][]string{
- roomID: {alice, bob},
- })
-
- var wg sync.WaitGroup
- wg.Add(3)
- poll := func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
- if err != nil {
- t.Errorf("TestMultipleRequestWakeup error: %w", err)
- }
- mustEqualPositions(t, pos, syncPositionAfter)
- wg.Done()
- }
- go poll()
- go poll()
- go poll()
-
- stream := lockedFetchUserStream(n, bob, bobDev)
- waitForBlocking(stream, 3)
-
- n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter)
-
- wg.Wait()
-
- numWaiting := stream.NumWaiting()
- if numWaiting != 0 {
- t.Errorf("TestMultipleRequestWakeup NumWaiting() want 0, got %d", numWaiting)
- }
-}
-
-// Test that you stop getting woken up when you leave a room.
-func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
- // listen as bob. Make bob leave room. Make alice send event to room.
- // Make sure alice gets woken up only and not bob as well.
- n := NewNotifier(syncPositionBefore)
- n.setUsersJoinedToRooms(map[string][]string{
- roomID: {alice, bob},
- })
-
- var leaveWG sync.WaitGroup
-
- // Make bob leave the room
- leaveWG.Add(1)
- go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionBefore))
- if err != nil {
- t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err)
- }
- mustEqualPositions(t, pos, syncPositionAfter)
- leaveWG.Done()
- }()
- bobStream := lockedFetchUserStream(n, bob, bobDev)
- waitForBlocking(bobStream, 1)
- n.OnNewEvent(&bobLeaveEvent, "", nil, syncPositionAfter)
- leaveWG.Wait()
-
- // send an event into the room. Make sure alice gets it. Bob should not.
- var aliceWG sync.WaitGroup
- aliceStream := lockedFetchUserStream(n, alice, aliceDev)
- aliceWG.Add(1)
- go func() {
- pos, err := waitForEvents(n, newTestSyncRequest(alice, aliceDev, syncPositionAfter))
- if err != nil {
- t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %w", err)
- }
- mustEqualPositions(t, pos, syncPositionAfter2)
- aliceWG.Done()
- }()
-
- go func() {
- // this should timeout with an error (but the main goroutine won't wait for the timeout explicitly)
- _, err := waitForEvents(n, newTestSyncRequest(bob, bobDev, syncPositionAfter))
- if err == nil {
- t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom expect error but got nil")
- }
- }()
-
- waitForBlocking(aliceStream, 1)
- waitForBlocking(bobStream, 1)
-
- n.OnNewEvent(&randomMessageEvent, "", nil, syncPositionAfter2)
- aliceWG.Wait()
-
- // it's possible that at this point alice has been informed and bob is about to be informed, so wait
- // for a fraction of a second to account for this race
- time.Sleep(1 * time.Millisecond)
-}
-
-func waitForEvents(n *Notifier, req syncRequest) (types.StreamingToken, error) {
- listener := n.GetListener(req)
- defer listener.Close()
-
- select {
- case <-time.After(5 * time.Second):
- return types.StreamingToken{}, fmt.Errorf(
- "waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since,
- )
- case <-listener.GetNotifyChannel(req.since):
- p := listener.GetSyncPosition()
- return p, nil
- }
-}
-
-// Wait until something is Wait()ing on the user stream.
-func waitForBlocking(s *UserDeviceStream, numBlocking uint) {
- for numBlocking != s.NumWaiting() {
- // This is horrible but I don't want to add a signalling mechanism JUST for testing.
- time.Sleep(1 * time.Microsecond)
- }
-}
-
-// lockedFetchUserStream invokes Notifier.fetchUserStream, respecting Notifier.streamLock.
-// A new stream is made if it doesn't exist already.
-func lockedFetchUserStream(n *Notifier, userID, deviceID string) *UserDeviceStream {
- n.streamLock.Lock()
- defer n.streamLock.Unlock()
-
- return n.fetchUserDeviceStream(userID, deviceID, true)
-}
-
-func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) syncRequest {
- return syncRequest{
- device: userapi.Device{
- UserID: userID,
- ID: deviceID,
- },
- timeout: 1 * time.Minute,
- since: since,
- wantFullState: false,
- limit: DefaultTimelineLimit,
- log: util.GetLogger(context.TODO()),
- ctx: context.TODO(),
- }
-}
diff --git a/syncapi/sync/request.go b/syncapi/sync/request.go
index f2f2894b..5f89ffc3 100644
--- a/syncapi/sync/request.go
+++ b/syncapi/sync/request.go
@@ -15,7 +15,6 @@
package sync
import (
- "context"
"encoding/json"
"net/http"
"strconv"
@@ -26,7 +25,7 @@ import (
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
- log "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus"
)
const defaultSyncTimeout = time.Duration(0)
@@ -40,18 +39,7 @@ type filter struct {
} `json:"room"`
}
-// syncRequest represents a /sync request, with sensible defaults/sanity checks applied.
-type syncRequest struct {
- ctx context.Context
- device userapi.Device
- limit int
- timeout time.Duration
- since types.StreamingToken // nil means that no since token was supplied
- wantFullState bool
- log *log.Entry
-}
-
-func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*syncRequest, error) {
+func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.SyncRequest, error) {
timeout := getTimeout(req.URL.Query().Get("timeout"))
fullState := req.URL.Query().Get("full_state")
wantFullState := fullState != "" && fullState != "false"
@@ -87,15 +75,30 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
}
}
}
+
+ filter := gomatrixserverlib.DefaultEventFilter()
+ filter.Limit = timelineLimit
// TODO: Additional query params: set_presence, filter
- return &syncRequest{
- ctx: req.Context(),
- device: device,
- timeout: timeout,
- since: since,
- wantFullState: wantFullState,
- limit: timelineLimit,
- log: util.GetLogger(req.Context()),
+
+ logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{
+ "user_id": device.UserID,
+ "device_id": device.ID,
+ "since": since,
+ "timeout": timeout,
+ "limit": timelineLimit,
+ })
+
+ return &types.SyncRequest{
+ Context: req.Context(), //
+ Log: logger, //
+ Device: &device, //
+ Response: types.NewResponse(), // Populated by all streams
+ Filter: filter, //
+ Since: since, //
+ Timeout: timeout, //
+ Limit: timelineLimit, //
+ Rooms: make(map[string]string), // Populated by the PDU stream
+ WantFullState: wantFullState, //
}, nil
}
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index 0751487a..384fc25c 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -17,8 +17,6 @@
package sync
import (
- "context"
- "fmt"
"net"
"net/http"
"strings"
@@ -30,13 +28,13 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/syncapi/internal"
+ "github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
- log "github.com/sirupsen/logrus"
)
// RequestPool manages HTTP long-poll connections for /sync
@@ -44,19 +42,30 @@ type RequestPool struct {
db storage.Database
cfg *config.SyncAPI
userAPI userapi.UserInternalAPI
- Notifier *Notifier
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
lastseen sync.Map
+ streams *streams.Streams
+ Notifier *notifier.Notifier
}
// NewRequestPool makes a new RequestPool
func NewRequestPool(
- db storage.Database, cfg *config.SyncAPI, n *Notifier,
+ db storage.Database, cfg *config.SyncAPI,
userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
+ streams *streams.Streams, notifier *notifier.Notifier,
) *RequestPool {
- rp := &RequestPool{db, cfg, userAPI, n, keyAPI, rsAPI, sync.Map{}}
+ rp := &RequestPool{
+ db: db,
+ cfg: cfg,
+ userAPI: userAPI,
+ keyAPI: keyAPI,
+ rsAPI: rsAPI,
+ lastseen: sync.Map{},
+ streams: streams,
+ Notifier: notifier,
+ }
go rp.cleanLastSeen()
return rp
}
@@ -128,8 +137,6 @@ var waitingSyncRequests = prometheus.NewGauge(
// called in a dedicated goroutine for this request. This function will block the goroutine
// until a response is ready, or it times out.
func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
- var syncData *types.Response
-
// Extract values from request
syncReq, err := newSyncRequest(req, *device, rp.db)
if err != nil {
@@ -139,89 +146,109 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
}
}
- logger := util.GetLogger(req.Context()).WithFields(log.Fields{
- "user_id": device.UserID,
- "device_id": device.ID,
- "since": syncReq.since,
- "timeout": syncReq.timeout,
- "limit": syncReq.limit,
- })
-
activeSyncRequests.Inc()
defer activeSyncRequests.Dec()
rp.updateLastSeen(req, device)
- currPos := rp.Notifier.CurrentPosition()
-
- if rp.shouldReturnImmediately(syncReq) {
- syncData, err = rp.currentSyncForUser(*syncReq, currPos)
- if err != nil {
- logger.WithError(err).Error("rp.currentSyncForUser failed")
- return jsonerror.InternalServerError()
- }
- logger.WithField("next", syncData.NextBatch).Info("Responding immediately")
- return util.JSONResponse{
- Code: http.StatusOK,
- JSON: syncData,
- }
- }
-
waitingSyncRequests.Inc()
defer waitingSyncRequests.Dec()
- // Otherwise, we wait for the notifier to tell us if something *may* have
- // happened. We loop in case it turns out that nothing did happen.
+ currentPos := rp.Notifier.CurrentPosition()
- timer := time.NewTimer(syncReq.timeout) // case of timeout=0 is handled above
- defer timer.Stop()
+ if !rp.shouldReturnImmediately(syncReq) {
+ timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
+ defer timer.Stop()
- userStreamListener := rp.Notifier.GetListener(*syncReq)
- defer userStreamListener.Close()
+ userStreamListener := rp.Notifier.GetListener(*syncReq)
+ defer userStreamListener.Close()
- // We need the loop in case userStreamListener wakes up even if there isn't
- // anything to send down. In this case, we'll jump out of the select but
- // don't want to send anything back until we get some actual content to
- // respond with, so we skip the return an go back to waiting for content to
- // be sent down or the request timing out.
- var hasTimedOut bool
- sincePos := syncReq.since
- for {
- select {
- // Wait for notifier to wake us up
- case <-userStreamListener.GetNotifyChannel(sincePos):
- currPos = userStreamListener.GetSyncPosition()
- // Or for timeout to expire
- case <-timer.C:
- // We just need to ensure we get out of the select after reaching the
- // timeout, but there's nothing specific we want to do in this case
- // apart from that, so we do nothing except stating we're timing out
- // and need to respond.
- hasTimedOut = true
- // Or for the request to be cancelled
- case <-req.Context().Done():
- logger.WithError(err).Error("request cancelled")
- return jsonerror.InternalServerError()
+ giveup := func() util.JSONResponse {
+ syncReq.Response.NextBatch = syncReq.Since
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: syncReq.Response,
+ }
}
- // Note that we don't time out during calculation of sync
- // response. This ensures that we don't waste the hard work
- // of calculating the sync only to get timed out before we
- // can respond
- syncData, err = rp.currentSyncForUser(*syncReq, currPos)
- if err != nil {
- logger.WithError(err).Error("rp.currentSyncForUser failed")
- return jsonerror.InternalServerError()
+ select {
+ case <-syncReq.Context.Done(): // Caller gave up
+ return giveup()
+
+ case <-timer.C: // Timeout reached
+ return giveup()
+
+ case <-userStreamListener.GetNotifyChannel(syncReq.Since):
+ syncReq.Log.Debugln("Responding to sync after wake-up")
+ currentPos.ApplyUpdates(userStreamListener.GetSyncPosition())
}
+ } else {
+ syncReq.Log.Debugln("Responding to sync immediately")
+ }
- if !syncData.IsEmpty() || hasTimedOut {
- logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding")
- return util.JSONResponse{
- Code: http.StatusOK,
- JSON: syncData,
- }
+ if syncReq.Since.IsEmpty() {
+ // Complete sync
+ syncReq.Response.NextBatch = types.StreamingToken{
+ PDUPosition: rp.streams.PDUStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ TypingPosition: rp.streams.TypingStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ ReceiptPosition: rp.streams.ReceiptStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ InvitePosition: rp.streams.InviteStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ AccountDataPosition: rp.streams.AccountDataStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ DeviceListPosition: rp.streams.DeviceListStreamProvider.CompleteSync(
+ syncReq.Context, syncReq,
+ ),
+ }
+ } else {
+ // Incremental sync
+ syncReq.Response.NextBatch = types.StreamingToken{
+ PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.PDUPosition, currentPos.PDUPosition,
+ ),
+ TypingPosition: rp.streams.TypingStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.TypingPosition, currentPos.TypingPosition,
+ ),
+ ReceiptPosition: rp.streams.ReceiptStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.ReceiptPosition, currentPos.ReceiptPosition,
+ ),
+ InvitePosition: rp.streams.InviteStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.InvitePosition, currentPos.InvitePosition,
+ ),
+ SendToDevicePosition: rp.streams.SendToDeviceStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.SendToDevicePosition, currentPos.SendToDevicePosition,
+ ),
+ AccountDataPosition: rp.streams.AccountDataStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.AccountDataPosition, currentPos.AccountDataPosition,
+ ),
+ DeviceListPosition: rp.streams.DeviceListStreamProvider.IncrementalSync(
+ syncReq.Context, syncReq,
+ syncReq.Since.DeviceListPosition, currentPos.DeviceListPosition,
+ ),
}
}
+
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: syncReq.Response,
+ }
}
func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *userapi.Device) util.JSONResponse {
@@ -247,18 +274,18 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use
JSON: jsonerror.InvalidArgumentValue("bad 'to' value"),
}
}
- // work out room joins/leaves
- res, err := rp.db.IncrementalSync(
- req.Context(), types.NewResponse(), *device, fromToken, toToken, 10, false,
- )
+ syncReq, err := newSyncRequest(req, *device, rp.db)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("Failed to IncrementalSync")
+ util.GetLogger(req.Context()).WithError(err).Error("newSyncRequest failed")
return jsonerror.InternalServerError()
}
-
- res, err = rp.appendDeviceLists(res, device.UserID, fromToken, toToken)
+ rp.streams.PDUStreamProvider.IncrementalSync(req.Context(), syncReq, fromToken.PDUPosition, toToken.PDUPosition)
+ _, _, err = internal.DeviceListCatchup(
+ req.Context(), rp.keyAPI, rp.rsAPI, syncReq.Device.UserID,
+ syncReq.Response, fromToken.DeviceListPosition, toToken.DeviceListPosition,
+ )
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("Failed to appendDeviceLists info")
+ util.GetLogger(req.Context()).WithError(err).Error("Failed to DeviceListCatchup info")
return jsonerror.InternalServerError()
}
return util.JSONResponse{
@@ -267,199 +294,18 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use
Changed []string `json:"changed"`
Left []string `json:"left"`
}{
- Changed: res.DeviceLists.Changed,
- Left: res.DeviceLists.Left,
+ Changed: syncReq.Response.DeviceLists.Changed,
+ Left: syncReq.Response.DeviceLists.Left,
},
}
}
-// nolint:gocyclo
-func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.StreamingToken) (*types.Response, error) {
- res := types.NewResponse()
-
- // See if we have any new tasks to do for the send-to-device messaging.
- lastPos, events, updates, deletions, err := rp.db.SendToDeviceUpdatesForSync(req.ctx, req.device.UserID, req.device.ID, req.since)
- if err != nil {
- return nil, fmt.Errorf("rp.db.SendToDeviceUpdatesForSync: %w", err)
- }
-
- // TODO: handle ignored users
- if req.since.IsEmpty() {
- res, err = rp.db.CompleteSync(req.ctx, res, req.device, req.limit)
- if err != nil {
- return res, fmt.Errorf("rp.db.CompleteSync: %w", err)
- }
- } else {
- res, err = rp.db.IncrementalSync(req.ctx, res, req.device, req.since, latestPos, req.limit, req.wantFullState)
- if err != nil {
- return res, fmt.Errorf("rp.db.IncrementalSync: %w", err)
- }
- }
-
- accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
- res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition, &accountDataFilter)
- if err != nil {
- return res, fmt.Errorf("rp.appendAccountData: %w", err)
- }
- res, err = rp.appendDeviceLists(res, req.device.UserID, req.since, latestPos)
- if err != nil {
- return res, fmt.Errorf("rp.appendDeviceLists: %w", err)
- }
- err = internal.DeviceOTKCounts(req.ctx, rp.keyAPI, req.device.UserID, req.device.ID, res)
- if err != nil {
- return res, fmt.Errorf("internal.DeviceOTKCounts: %w", err)
- }
-
- // Before we return the sync response, make sure that we take action on
- // any send-to-device database updates or deletions that we need to do.
- // Then add the updates into the sync response.
- if len(updates) > 0 || len(deletions) > 0 {
- // Handle the updates and deletions in the database.
- err = rp.db.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.since)
- if err != nil {
- return res, fmt.Errorf("rp.db.CleanSendToDeviceUpdates: %w", err)
- }
- }
- if len(events) > 0 {
- // Add the updates into the sync response.
- for _, event := range events {
- res.ToDevice.Events = append(res.ToDevice.Events, event.SendToDeviceEvent)
- }
- }
-
- res.NextBatch.SendToDevicePosition = lastPos
- return res, err
-}
-
-func (rp *RequestPool) appendDeviceLists(
- data *types.Response, userID string, since, to types.StreamingToken,
-) (*types.Response, error) {
- _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, userID, data, since, to)
- if err != nil {
- return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
- }
-
- return data, nil
-}
-
-// nolint:gocyclo
-func (rp *RequestPool) appendAccountData(
- data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
- accountDataFilter *gomatrixserverlib.EventFilter,
-) (*types.Response, error) {
- // TODO: Account data doesn't have a sync position of its own, meaning that
- // account data might be sent multiple time to the client if multiple account
- // data keys were set between two message. This isn't a huge issue since the
- // duplicate data doesn't represent a huge quantity of data, but an optimisation
- // here would be making sure each data is sent only once to the client.
- if req.since.IsEmpty() {
- // If this is the initial sync, we don't need to check if a data has
- // already been sent. Instead, we send the whole batch.
- dataReq := &userapi.QueryAccountDataRequest{
- UserID: userID,
- }
- dataRes := &userapi.QueryAccountDataResponse{}
- if err := rp.userAPI.QueryAccountData(req.ctx, dataReq, dataRes); err != nil {
- return nil, err
- }
- for datatype, databody := range dataRes.GlobalAccountData {
- data.AccountData.Events = append(
- data.AccountData.Events,
- gomatrixserverlib.ClientEvent{
- Type: datatype,
- Content: gomatrixserverlib.RawJSON(databody),
- },
- )
- }
- for r, j := range data.Rooms.Join {
- for datatype, databody := range dataRes.RoomAccountData[r] {
- j.AccountData.Events = append(
- j.AccountData.Events,
- gomatrixserverlib.ClientEvent{
- Type: datatype,
- Content: gomatrixserverlib.RawJSON(databody),
- },
- )
- data.Rooms.Join[r] = j
- }
- }
- return data, nil
- }
-
- r := types.Range{
- From: req.since.PDUPosition,
- To: currentPos,
- }
- // If both positions are the same, it means that the data was saved after the
- // latest room event. In that case, we need to decrement the old position as
- // results are exclusive of Low.
- if r.Low() == r.High() {
- r.From--
- }
-
- // Sync is not initial, get all account data since the latest sync
- dataTypes, err := rp.db.GetAccountDataInRange(
- req.ctx, userID, r, accountDataFilter,
- )
- if err != nil {
- return nil, fmt.Errorf("rp.db.GetAccountDataInRange: %w", err)
- }
-
- if len(dataTypes) == 0 {
- // TODO: this fixes the sytest but is it the right thing to do?
- dataTypes[""] = []string{"m.push_rules"}
- }
-
- // Iterate over the rooms
- for roomID, dataTypes := range dataTypes {
- // Request the missing data from the database
- for _, dataType := range dataTypes {
- dataReq := userapi.QueryAccountDataRequest{
- UserID: userID,
- RoomID: roomID,
- DataType: dataType,
- }
- dataRes := userapi.QueryAccountDataResponse{}
- err = rp.userAPI.QueryAccountData(req.ctx, &dataReq, &dataRes)
- if err != nil {
- continue
- }
- if roomID == "" {
- if globalData, ok := dataRes.GlobalAccountData[dataType]; ok {
- data.AccountData.Events = append(
- data.AccountData.Events,
- gomatrixserverlib.ClientEvent{
- Type: dataType,
- Content: gomatrixserverlib.RawJSON(globalData),
- },
- )
- }
- } else {
- if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
- joinData := data.Rooms.Join[roomID]
- joinData.AccountData.Events = append(
- joinData.AccountData.Events,
- gomatrixserverlib.ClientEvent{
- Type: dataType,
- Content: gomatrixserverlib.RawJSON(roomData),
- },
- )
- data.Rooms.Join[roomID] = joinData
- }
- }
- }
- }
-
- return data, nil
-}
-
// shouldReturnImmediately returns whether the /sync request is an initial sync,
// or timeout=0, or full_state=true, in any of the cases the request should
// return immediately.
-func (rp *RequestPool) shouldReturnImmediately(syncReq *syncRequest) bool {
- if syncReq.since.IsEmpty() || syncReq.timeout == 0 || syncReq.wantFullState {
+func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest) bool {
+ if syncReq.Since.IsEmpty() || syncReq.Timeout == 0 || syncReq.WantFullState {
return true
}
- waiting, werr := rp.db.SendToDeviceUpdatesWaiting(context.TODO(), syncReq.device.UserID, syncReq.device.ID)
- return werr == nil && waiting
+ return false
}
diff --git a/syncapi/sync/userstream.go b/syncapi/sync/userstream.go
deleted file mode 100644
index ff9a4d00..00000000
--- a/syncapi/sync/userstream.go
+++ /dev/null
@@ -1,162 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package sync
-
-import (
- "context"
- "runtime"
- "sync"
- "time"
-
- "github.com/matrix-org/dendrite/syncapi/types"
-)
-
-// UserDeviceStream represents a communication mechanism between the /sync request goroutine
-// and the underlying sync server goroutines.
-// Goroutines can get a UserStreamListener to wait for updates, and can Broadcast()
-// updates.
-type UserDeviceStream struct {
- UserID string
- DeviceID string
- // The lock that protects changes to this struct
- lock sync.Mutex
- // Closed when there is an update.
- signalChannel chan struct{}
- // The last sync position that there may have been an update for the user
- pos types.StreamingToken
- // The last time when we had some listeners waiting
- timeOfLastChannel time.Time
- // The number of listeners waiting
- numWaiting uint
-}
-
-// UserDeviceStreamListener allows a sync request to wait for updates for a user.
-type UserDeviceStreamListener struct {
- userStream *UserDeviceStream
-
- // Whether the stream has been closed
- hasClosed bool
-}
-
-// NewUserDeviceStream creates a new user stream
-func NewUserDeviceStream(userID, deviceID string, currPos types.StreamingToken) *UserDeviceStream {
- return &UserDeviceStream{
- UserID: userID,
- DeviceID: deviceID,
- timeOfLastChannel: time.Now(),
- pos: currPos,
- signalChannel: make(chan struct{}),
- }
-}
-
-// GetListener returns UserStreamListener that a sync request can use to wait
-// for new updates with.
-// UserStreamListener must be closed
-func (s *UserDeviceStream) GetListener(ctx context.Context) UserDeviceStreamListener {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- s.numWaiting++ // We decrement when UserStreamListener is closed
-
- listener := UserDeviceStreamListener{
- userStream: s,
- }
-
- // Lets be a bit paranoid here and check that Close() is being called
- runtime.SetFinalizer(&listener, func(l *UserDeviceStreamListener) {
- if !l.hasClosed {
- l.Close()
- }
- })
-
- return listener
-}
-
-// Broadcast a new sync position for this user.
-func (s *UserDeviceStream) Broadcast(pos types.StreamingToken) {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- s.pos = pos
-
- close(s.signalChannel)
-
- s.signalChannel = make(chan struct{})
-}
-
-// NumWaiting returns the number of goroutines waiting for waiting for updates.
-// Used for metrics and testing.
-func (s *UserDeviceStream) NumWaiting() uint {
- s.lock.Lock()
- defer s.lock.Unlock()
- return s.numWaiting
-}
-
-// TimeOfLastNonEmpty returns the last time that the number of waiting listeners
-// was non-empty, may be time.Now() if number of waiting listeners is currently
-// non-empty.
-func (s *UserDeviceStream) TimeOfLastNonEmpty() time.Time {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- if s.numWaiting > 0 {
- return time.Now()
- }
-
- return s.timeOfLastChannel
-}
-
-// GetSyncPosition returns last sync position which the UserStream was
-// notified about
-func (s *UserDeviceStreamListener) GetSyncPosition() types.StreamingToken {
- s.userStream.lock.Lock()
- defer s.userStream.lock.Unlock()
-
- return s.userStream.pos
-}
-
-// GetNotifyChannel returns a channel that is closed when there may be an
-// update for the user.
-// sincePos specifies from which point we want to be notified about. If there
-// has already been an update after sincePos we'll return a closed channel
-// immediately.
-func (s *UserDeviceStreamListener) GetNotifyChannel(sincePos types.StreamingToken) <-chan struct{} {
- s.userStream.lock.Lock()
- defer s.userStream.lock.Unlock()
-
- if s.userStream.pos.IsAfter(sincePos) {
- // If the listener is behind, i.e. missed a potential update, then we
- // want them to wake up immediately. We do this by returning a new
- // closed stream, which returns immediately when selected.
- closedChannel := make(chan struct{})
- close(closedChannel)
- return closedChannel
- }
-
- return s.userStream.signalChannel
-}
-
-// Close cleans up resources used
-func (s *UserDeviceStreamListener) Close() {
- s.userStream.lock.Lock()
- defer s.userStream.lock.Unlock()
-
- if !s.hasClosed {
- s.userStream.numWaiting--
- s.userStream.timeOfLastChannel = time.Now()
- }
-
- s.hasClosed = true
-}