aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-01-08 16:59:06 +0000
committerGitHub <noreply@github.com>2021-01-08 16:59:06 +0000
commitb5a8935042dfb358f4176bc1ca46d0b8ebd62615 (patch)
treeb3b0409cc184f6392eb0230f2127d10d0752b4af /syncapi/streams
parent56a7839aedfdf849193bf25c0e2fcd8f4a0146d8 (diff)
Sync refactor — Part 1 (#1688)
* It's half-alive * Wakeups largely working * Other tweaks, typing works * Fix bugs, add receipt stream * Delete notifier, other tweaks * Dedupe a bit, add a template for the invite stream * Clean up, add templates for other streams * Don't leak channels * Bring forward some more PDU logic, clean up other places * Add some more wakeups * Use addRoomDeltaToResponse * Log tweaks, typing fixed? * Fix timed out syncs * Don't reset next batch position on timeout * Add account data stream/position * End of day * Fix complete sync for receipt, typing * Streams package * Clean up a bit * Complete sync send-to-device * Don't drop errors * More lightweight notifications * Fix typing positions * Don't advance position on remove again unless needed * Device list updates * Advance account data position * Use limit for incremental sync * Limit fixes, amongst other things * Remove some fmt.Println * Tweaks * Re-add notifier * Fix invite position * Fixes * Notify account data without advancing PDU position in notifier * Apply account data position * Get initial position for account data * Fix position update * Fix complete sync positions * Review comments @Kegsay * Room consumer parameters
Diffstat (limited to 'syncapi/streams')
-rw-r--r--syncapi/streams/stream_accountdata.go132
-rw-r--r--syncapi/streams/stream_devicelist.go43
-rw-r--r--syncapi/streams/stream_invite.go64
-rw-r--r--syncapi/streams/stream_pdu.go305
-rw-r--r--syncapi/streams/stream_receipt.go91
-rw-r--r--syncapi/streams/stream_sendtodevice.go51
-rw-r--r--syncapi/streams/stream_typing.go57
-rw-r--r--syncapi/streams/streams.go78
-rw-r--r--syncapi/streams/template_pstream.go38
-rw-r--r--syncapi/streams/template_stream.go38
10 files changed, 897 insertions, 0 deletions
diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go
new file mode 100644
index 00000000..aa7f0937
--- /dev/null
+++ b/syncapi/streams/stream_accountdata.go
@@ -0,0 +1,132 @@
+package streams
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type AccountDataStreamProvider struct {
+ StreamProvider
+ userAPI userapi.UserInternalAPI
+}
+
+func (p *AccountDataStreamProvider) Setup() {
+ p.StreamProvider.Setup()
+
+ p.latestMutex.Lock()
+ defer p.latestMutex.Unlock()
+
+ id, err := p.DB.MaxStreamPositionForAccountData(context.Background())
+ if err != nil {
+ panic(err)
+ }
+ p.latest = id
+}
+
+func (p *AccountDataStreamProvider) CompleteSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+) types.StreamPosition {
+ dataReq := &userapi.QueryAccountDataRequest{
+ UserID: req.Device.UserID,
+ }
+ dataRes := &userapi.QueryAccountDataResponse{}
+ if err := p.userAPI.QueryAccountData(ctx, dataReq, dataRes); err != nil {
+ req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed")
+ return p.LatestPosition(ctx)
+ }
+ for datatype, databody := range dataRes.GlobalAccountData {
+ req.Response.AccountData.Events = append(
+ req.Response.AccountData.Events,
+ gomatrixserverlib.ClientEvent{
+ Type: datatype,
+ Content: gomatrixserverlib.RawJSON(databody),
+ },
+ )
+ }
+ for r, j := range req.Response.Rooms.Join {
+ for datatype, databody := range dataRes.RoomAccountData[r] {
+ j.AccountData.Events = append(
+ j.AccountData.Events,
+ gomatrixserverlib.ClientEvent{
+ Type: datatype,
+ Content: gomatrixserverlib.RawJSON(databody),
+ },
+ )
+ req.Response.Rooms.Join[r] = j
+ }
+ }
+
+ return p.LatestPosition(ctx)
+}
+
+func (p *AccountDataStreamProvider) IncrementalSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+ from, to types.StreamPosition,
+) types.StreamPosition {
+ r := types.Range{
+ From: from,
+ To: to,
+ }
+ accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
+
+ dataTypes, err := p.DB.GetAccountDataInRange(
+ ctx, req.Device.UserID, r, &accountDataFilter,
+ )
+ if err != nil {
+ req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed")
+ return from
+ }
+
+ if len(dataTypes) == 0 {
+ // TODO: this fixes the sytest but is it the right thing to do?
+ dataTypes[""] = []string{"m.push_rules"}
+ }
+
+ // Iterate over the rooms
+ for roomID, dataTypes := range dataTypes {
+ // Request the missing data from the database
+ for _, dataType := range dataTypes {
+ dataReq := userapi.QueryAccountDataRequest{
+ UserID: req.Device.UserID,
+ RoomID: roomID,
+ DataType: dataType,
+ }
+ dataRes := userapi.QueryAccountDataResponse{}
+ err = p.userAPI.QueryAccountData(ctx, &dataReq, &dataRes)
+ if err != nil {
+ req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed")
+ continue
+ }
+ if roomID == "" {
+ if globalData, ok := dataRes.GlobalAccountData[dataType]; ok {
+ req.Response.AccountData.Events = append(
+ req.Response.AccountData.Events,
+ gomatrixserverlib.ClientEvent{
+ Type: dataType,
+ Content: gomatrixserverlib.RawJSON(globalData),
+ },
+ )
+ }
+ } else {
+ if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
+ joinData := req.Response.Rooms.Join[roomID]
+ joinData.AccountData.Events = append(
+ joinData.AccountData.Events,
+ gomatrixserverlib.ClientEvent{
+ Type: dataType,
+ Content: gomatrixserverlib.RawJSON(roomData),
+ },
+ )
+ req.Response.Rooms.Join[roomID] = joinData
+ }
+ }
+ }
+ }
+
+ return to
+}
diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go
new file mode 100644
index 00000000..c43d50a4
--- /dev/null
+++ b/syncapi/streams/stream_devicelist.go
@@ -0,0 +1,43 @@
+package streams
+
+import (
+ "context"
+
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/internal"
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+type DeviceListStreamProvider struct {
+ PartitionedStreamProvider
+ rsAPI api.RoomserverInternalAPI
+ keyAPI keyapi.KeyInternalAPI
+}
+
+func (p *DeviceListStreamProvider) CompleteSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+) types.LogPosition {
+ return p.IncrementalSync(ctx, req, types.LogPosition{}, p.LatestPosition(ctx))
+}
+
+func (p *DeviceListStreamProvider) IncrementalSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+ from, to types.LogPosition,
+) types.LogPosition {
+ var err error
+ to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to)
+ if err != nil {
+ req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
+ return from
+ }
+ err = internal.DeviceOTKCounts(req.Context, p.keyAPI, req.Device.UserID, req.Device.ID, req.Response)
+ if err != nil {
+ req.Log.WithError(err).Error("internal.DeviceListCatchup failed")
+ return from
+ }
+
+ return to
+}
diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go
new file mode 100644
index 00000000..10a0dda8
--- /dev/null
+++ b/syncapi/streams/stream_invite.go
@@ -0,0 +1,64 @@
+package streams
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+type InviteStreamProvider struct {
+ StreamProvider
+}
+
+func (p *InviteStreamProvider) Setup() {
+ p.StreamProvider.Setup()
+
+ p.latestMutex.Lock()
+ defer p.latestMutex.Unlock()
+
+ id, err := p.DB.MaxStreamPositionForInvites(context.Background())
+ if err != nil {
+ panic(err)
+ }
+ p.latest = id
+}
+
+func (p *InviteStreamProvider) CompleteSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+) types.StreamPosition {
+ return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+}
+
+func (p *InviteStreamProvider) IncrementalSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+ from, to types.StreamPosition,
+) types.StreamPosition {
+ r := types.Range{
+ From: from,
+ To: to,
+ }
+
+ invites, retiredInvites, err := p.DB.InviteEventsInRange(
+ ctx, req.Device.UserID, r,
+ )
+ if err != nil {
+ req.Log.WithError(err).Error("p.DB.InviteEventsInRange failed")
+ return from
+ }
+
+ for roomID, inviteEvent := range invites {
+ ir := types.NewInviteResponse(inviteEvent)
+ req.Response.Rooms.Invite[roomID] = *ir
+ }
+
+ for roomID := range retiredInvites {
+ if _, ok := req.Response.Rooms.Join[roomID]; !ok {
+ lr := types.NewLeaveResponse()
+ req.Response.Rooms.Leave[roomID] = *lr
+ }
+ }
+
+ return to
+}
diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go
new file mode 100644
index 00000000..016c182e
--- /dev/null
+++ b/syncapi/streams/stream_pdu.go
@@ -0,0 +1,305 @@
+package streams
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type PDUStreamProvider struct {
+ StreamProvider
+}
+
+func (p *PDUStreamProvider) Setup() {
+ p.StreamProvider.Setup()
+
+ p.latestMutex.Lock()
+ defer p.latestMutex.Unlock()
+
+ id, err := p.DB.MaxStreamPositionForPDUs(context.Background())
+ if err != nil {
+ panic(err)
+ }
+ p.latest = id
+}
+
+func (p *PDUStreamProvider) CompleteSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+) types.StreamPosition {
+ from := types.StreamPosition(0)
+ to := p.LatestPosition(ctx)
+
+ // Get the current sync position which we will base the sync response on.
+ // For complete syncs, we want to start at the most recent events and work
+ // backwards, so that we show the most recent events in the room.
+ r := types.Range{
+ From: to,
+ To: 0,
+ Backwards: true,
+ }
+
+ // Extract room state and recent events for all rooms the user is joined to.
+ joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
+ if err != nil {
+ req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
+ return from
+ }
+
+ stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
+
+ // Build up a /sync response. Add joined rooms.
+ for _, roomID := range joinedRoomIDs {
+ var jr *types.JoinResponse
+ jr, err = p.getJoinResponseForCompleteSync(
+ ctx, roomID, r, &stateFilter, req.Limit, req.Device,
+ )
+ if err != nil {
+ req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
+ return from
+ }
+ req.Response.Rooms.Join[roomID] = *jr
+ req.Rooms[roomID] = gomatrixserverlib.Join
+ }
+
+ // Add peeked rooms.
+ peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
+ if err != nil {
+ req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
+ return from
+ }
+ for _, peek := range peeks {
+ if !peek.Deleted {
+ var jr *types.JoinResponse
+ jr, err = p.getJoinResponseForCompleteSync(
+ ctx, peek.RoomID, r, &stateFilter, req.Limit, req.Device,
+ )
+ if err != nil {
+ req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
+ return from
+ }
+ req.Response.Rooms.Peek[peek.RoomID] = *jr
+ }
+ }
+
+ return to
+}
+
+// nolint:gocyclo
+func (p *PDUStreamProvider) IncrementalSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+ from, to types.StreamPosition,
+) (newPos types.StreamPosition) {
+ r := types.Range{
+ From: from,
+ To: to,
+ Backwards: from > to,
+ }
+ newPos = to
+
+ var err error
+ var stateDeltas []types.StateDelta
+ var joinedRooms []string
+
+ // TODO: use filter provided in request
+ stateFilter := gomatrixserverlib.DefaultStateFilter()
+
+ if req.WantFullState {
+ if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
+ req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed")
+ return
+ }
+ } else {
+ if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
+ req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
+ return
+ }
+ }
+
+ for _, roomID := range joinedRooms {
+ req.Rooms[roomID] = gomatrixserverlib.Join
+ }
+
+ for _, delta := range stateDeltas {
+ if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, req.Limit, req.Response); err != nil {
+ req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
+ return newPos
+ }
+ }
+
+ return r.To
+}
+
+func (p *PDUStreamProvider) addRoomDeltaToResponse(
+ ctx context.Context,
+ device *userapi.Device,
+ r types.Range,
+ delta types.StateDelta,
+ numRecentEventsPerRoom int,
+ res *types.Response,
+) error {
+ if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
+ // make sure we don't leak recent events after the leave event.
+ // TODO: History visibility makes this somewhat complex to handle correctly. For example:
+ // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
+ // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
+ // in a single /sync request
+ // This is all "okay" assuming history_visibility == "shared" which it is by default.
+ r.To = delta.MembershipPos
+ }
+ recentStreamEvents, limited, err := p.DB.RecentEvents(
+ ctx, delta.RoomID, r,
+ numRecentEventsPerRoom, true, true,
+ )
+ if err != nil {
+ return err
+ }
+ recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
+ delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
+ prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
+ if err != nil {
+ return err
+ }
+
+ // XXX: should we ever get this far if we have no recent events or state in this room?
+ // in practice we do for peeks, but possibly not joins?
+ if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
+ return nil
+ }
+
+ switch delta.Membership {
+ case gomatrixserverlib.Join:
+ jr := types.NewJoinResponse()
+
+ jr.Timeline.PrevBatch = &prevBatch
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Limited = limited
+ jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Join[delta.RoomID] = *jr
+ case gomatrixserverlib.Peek:
+ jr := types.NewJoinResponse()
+
+ jr.Timeline.PrevBatch = &prevBatch
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Limited = limited
+ jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Peek[delta.RoomID] = *jr
+ case gomatrixserverlib.Leave:
+ fallthrough // transitions to leave are the same as ban
+ case gomatrixserverlib.Ban:
+ // TODO: recentEvents may contain events that this user is not allowed to see because they are
+ // no longer in the room.
+ lr := types.NewLeaveResponse()
+ lr.Timeline.PrevBatch = &prevBatch
+ lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+ lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
+ res.Rooms.Leave[delta.RoomID] = *lr
+ }
+
+ return nil
+}
+
+func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
+ ctx context.Context,
+ roomID string,
+ r types.Range,
+ stateFilter *gomatrixserverlib.StateFilter,
+ numRecentEventsPerRoom int, device *userapi.Device,
+) (jr *types.JoinResponse, err error) {
+ var stateEvents []*gomatrixserverlib.HeaderedEvent
+ stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter)
+ if err != nil {
+ return
+ }
+ // TODO: When filters are added, we may need to call this multiple times to get enough events.
+ // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
+ var recentStreamEvents []types.StreamEvent
+ var limited bool
+ recentStreamEvents, limited, err = p.DB.RecentEvents(
+ ctx, roomID, r, numRecentEventsPerRoom, true, true,
+ )
+ if err != nil {
+ return
+ }
+
+ // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
+ // user shouldn't see, we check the recent events and remove any prior to the join event of the user
+ // which is equiv to history_visibility: joined
+ joinEventIndex := -1
+ for i := len(recentStreamEvents) - 1; i >= 0; i-- {
+ ev := recentStreamEvents[i]
+ if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
+ membership, _ := ev.Membership()
+ if membership == "join" {
+ joinEventIndex = i
+ if i > 0 {
+ // the create event happens before the first join, so we should cut it at that point instead
+ if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
+ joinEventIndex = i - 1
+ break
+ }
+ }
+ break
+ }
+ }
+ }
+ if joinEventIndex != -1 {
+ // cut all events earlier than the join (but not the join itself)
+ recentStreamEvents = recentStreamEvents[joinEventIndex:]
+ limited = false // so clients know not to try to backpaginate
+ }
+
+ // Retrieve the backward topology position, i.e. the position of the
+ // oldest event in the room's topology.
+ var prevBatch *types.TopologyToken
+ if len(recentStreamEvents) > 0 {
+ var backwardTopologyPos, backwardStreamPos types.StreamPosition
+ backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
+ if err != nil {
+ return
+ }
+ prevBatch = &types.TopologyToken{
+ Depth: backwardTopologyPos,
+ PDUPosition: backwardStreamPos,
+ }
+ prevBatch.Decrement()
+ }
+
+ // We don't include a device here as we don't need to send down
+ // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
+ // "Can sync a room with a message with a transaction id" - which does a complete sync to check.
+ recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
+ stateEvents = removeDuplicates(stateEvents, recentEvents)
+ jr = types.NewJoinResponse()
+ jr.Timeline.PrevBatch = prevBatch
+ jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+ jr.Timeline.Limited = limited
+ jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
+ return jr, nil
+}
+
+func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
+ for _, recentEv := range recentEvents {
+ if recentEv.StateKey() == nil {
+ continue // not a state event
+ }
+ // TODO: This is a linear scan over all the current state events in this room. This will
+ // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
+ // then do a binary search to find matching events, similar to what roomserver does.
+ for j := 0; j < len(stateEvents); j++ {
+ if stateEvents[j].EventID() == recentEv.EventID() {
+ // overwrite the element to remove with the last element then pop the last element.
+ // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
+ // (we don't care about the order of stateEvents)
+ stateEvents[j] = stateEvents[len(stateEvents)-1]
+ stateEvents = stateEvents[:len(stateEvents)-1]
+ break // there shouldn't be multiple events with the same event ID
+ }
+ }
+ }
+ return stateEvents
+}
diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go
new file mode 100644
index 00000000..259d07bd
--- /dev/null
+++ b/syncapi/streams/stream_receipt.go
@@ -0,0 +1,91 @@
+package streams
+
+import (
+ "context"
+ "encoding/json"
+
+ eduAPI "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type ReceiptStreamProvider struct {
+ StreamProvider
+}
+
+func (p *ReceiptStreamProvider) Setup() {
+ p.StreamProvider.Setup()
+
+ id, err := p.DB.MaxStreamPositionForReceipts(context.Background())
+ if err != nil {
+ panic(err)
+ }
+ p.latest = id
+}
+
+func (p *ReceiptStreamProvider) CompleteSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+) types.StreamPosition {
+ return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+}
+
+func (p *ReceiptStreamProvider) IncrementalSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+ from, to types.StreamPosition,
+) types.StreamPosition {
+ var joinedRooms []string
+ for roomID, membership := range req.Rooms {
+ if membership == gomatrixserverlib.Join {
+ joinedRooms = append(joinedRooms, roomID)
+ }
+ }
+
+ lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from)
+ if err != nil {
+ req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed")
+ return from
+ }
+
+ if len(receipts) == 0 || lastPos == 0 {
+ return to
+ }
+
+ // Group receipts by room, so we can create one ClientEvent for every room
+ receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent)
+ for _, receipt := range receipts {
+ receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
+ }
+
+ for roomID, receipts := range receiptsByRoom {
+ jr := req.Response.Rooms.Join[roomID]
+ var ok bool
+
+ ev := gomatrixserverlib.ClientEvent{
+ Type: gomatrixserverlib.MReceipt,
+ RoomID: roomID,
+ }
+ content := make(map[string]eduAPI.ReceiptMRead)
+ for _, receipt := range receipts {
+ var read eduAPI.ReceiptMRead
+ if read, ok = content[receipt.EventID]; !ok {
+ read = eduAPI.ReceiptMRead{
+ User: make(map[string]eduAPI.ReceiptTS),
+ }
+ }
+ read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp}
+ content[receipt.EventID] = read
+ }
+ ev.Content, err = json.Marshal(content)
+ if err != nil {
+ req.Log.WithError(err).Error("json.Marshal failed")
+ return from
+ }
+
+ jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
+ req.Response.Rooms.Join[roomID] = jr
+ }
+
+ return lastPos
+}
diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go
new file mode 100644
index 00000000..804f525d
--- /dev/null
+++ b/syncapi/streams/stream_sendtodevice.go
@@ -0,0 +1,51 @@
+package streams
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+type SendToDeviceStreamProvider struct {
+ StreamProvider
+}
+
+func (p *SendToDeviceStreamProvider) CompleteSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+) types.StreamPosition {
+ return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+}
+
+func (p *SendToDeviceStreamProvider) IncrementalSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+ from, to types.StreamPosition,
+) types.StreamPosition {
+ // See if we have any new tasks to do for the send-to-device messaging.
+ lastPos, events, updates, deletions, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, req.Since)
+ if err != nil {
+ req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed")
+ return from
+ }
+
+ // Before we return the sync response, make sure that we take action on
+ // any send-to-device database updates or deletions that we need to do.
+ // Then add the updates into the sync response.
+ if len(updates) > 0 || len(deletions) > 0 {
+ // Handle the updates and deletions in the database.
+ err = p.DB.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.Since)
+ if err != nil {
+ req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed")
+ return from
+ }
+ }
+ if len(events) > 0 {
+ // Add the updates into the sync response.
+ for _, event := range events {
+ req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent)
+ }
+ }
+
+ return lastPos
+}
diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go
new file mode 100644
index 00000000..60d5acf4
--- /dev/null
+++ b/syncapi/streams/stream_typing.go
@@ -0,0 +1,57 @@
+package streams
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type TypingStreamProvider struct {
+ StreamProvider
+ EDUCache *cache.EDUCache
+}
+
+func (p *TypingStreamProvider) CompleteSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+) types.StreamPosition {
+ return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx))
+}
+
+func (p *TypingStreamProvider) IncrementalSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+ from, to types.StreamPosition,
+) types.StreamPosition {
+ var err error
+ for roomID, membership := range req.Rooms {
+ if membership != gomatrixserverlib.Join {
+ continue
+ }
+
+ jr := req.Response.Rooms.Join[roomID]
+
+ if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter(
+ roomID, int64(from),
+ ); updated {
+ ev := gomatrixserverlib.ClientEvent{
+ Type: gomatrixserverlib.MTyping,
+ }
+ ev.Content, err = json.Marshal(map[string]interface{}{
+ "user_ids": users,
+ })
+ if err != nil {
+ req.Log.WithError(err).Error("json.Marshal failed")
+ return from
+ }
+
+ jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev)
+ req.Response.Rooms.Join[roomID] = jr
+ }
+ }
+
+ return to
+}
diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go
new file mode 100644
index 00000000..ba4118df
--- /dev/null
+++ b/syncapi/streams/streams.go
@@ -0,0 +1,78 @@
+package streams
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/eduserver/cache"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ rsapi "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+)
+
+type Streams struct {
+ PDUStreamProvider types.StreamProvider
+ TypingStreamProvider types.StreamProvider
+ ReceiptStreamProvider types.StreamProvider
+ InviteStreamProvider types.StreamProvider
+ SendToDeviceStreamProvider types.StreamProvider
+ AccountDataStreamProvider types.StreamProvider
+ DeviceListStreamProvider types.PartitionedStreamProvider
+}
+
+func NewSyncStreamProviders(
+ d storage.Database, userAPI userapi.UserInternalAPI,
+ rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
+ eduCache *cache.EDUCache,
+) *Streams {
+ streams := &Streams{
+ PDUStreamProvider: &PDUStreamProvider{
+ StreamProvider: StreamProvider{DB: d},
+ },
+ TypingStreamProvider: &TypingStreamProvider{
+ StreamProvider: StreamProvider{DB: d},
+ EDUCache: eduCache,
+ },
+ ReceiptStreamProvider: &ReceiptStreamProvider{
+ StreamProvider: StreamProvider{DB: d},
+ },
+ InviteStreamProvider: &InviteStreamProvider{
+ StreamProvider: StreamProvider{DB: d},
+ },
+ SendToDeviceStreamProvider: &SendToDeviceStreamProvider{
+ StreamProvider: StreamProvider{DB: d},
+ },
+ AccountDataStreamProvider: &AccountDataStreamProvider{
+ StreamProvider: StreamProvider{DB: d},
+ userAPI: userAPI,
+ },
+ DeviceListStreamProvider: &DeviceListStreamProvider{
+ PartitionedStreamProvider: PartitionedStreamProvider{DB: d},
+ rsAPI: rsAPI,
+ keyAPI: keyAPI,
+ },
+ }
+
+ streams.PDUStreamProvider.Setup()
+ streams.TypingStreamProvider.Setup()
+ streams.ReceiptStreamProvider.Setup()
+ streams.InviteStreamProvider.Setup()
+ streams.SendToDeviceStreamProvider.Setup()
+ streams.AccountDataStreamProvider.Setup()
+ streams.DeviceListStreamProvider.Setup()
+
+ return streams
+}
+
+func (s *Streams) Latest(ctx context.Context) types.StreamingToken {
+ return types.StreamingToken{
+ PDUPosition: s.PDUStreamProvider.LatestPosition(ctx),
+ TypingPosition: s.TypingStreamProvider.LatestPosition(ctx),
+ ReceiptPosition: s.PDUStreamProvider.LatestPosition(ctx),
+ InvitePosition: s.InviteStreamProvider.LatestPosition(ctx),
+ SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx),
+ AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx),
+ DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx),
+ }
+}
diff --git a/syncapi/streams/template_pstream.go b/syncapi/streams/template_pstream.go
new file mode 100644
index 00000000..265e22a2
--- /dev/null
+++ b/syncapi/streams/template_pstream.go
@@ -0,0 +1,38 @@
+package streams
+
+import (
+ "context"
+ "sync"
+
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+type PartitionedStreamProvider struct {
+ DB storage.Database
+ latest types.LogPosition
+ latestMutex sync.RWMutex
+}
+
+func (p *PartitionedStreamProvider) Setup() {
+}
+
+func (p *PartitionedStreamProvider) Advance(
+ latest types.LogPosition,
+) {
+ p.latestMutex.Lock()
+ defer p.latestMutex.Unlock()
+
+ if latest.IsAfter(&p.latest) {
+ p.latest = latest
+ }
+}
+
+func (p *PartitionedStreamProvider) LatestPosition(
+ ctx context.Context,
+) types.LogPosition {
+ p.latestMutex.RLock()
+ defer p.latestMutex.RUnlock()
+
+ return p.latest
+}
diff --git a/syncapi/streams/template_stream.go b/syncapi/streams/template_stream.go
new file mode 100644
index 00000000..15074cc1
--- /dev/null
+++ b/syncapi/streams/template_stream.go
@@ -0,0 +1,38 @@
+package streams
+
+import (
+ "context"
+ "sync"
+
+ "github.com/matrix-org/dendrite/syncapi/storage"
+ "github.com/matrix-org/dendrite/syncapi/types"
+)
+
+type StreamProvider struct {
+ DB storage.Database
+ latest types.StreamPosition
+ latestMutex sync.RWMutex
+}
+
+func (p *StreamProvider) Setup() {
+}
+
+func (p *StreamProvider) Advance(
+ latest types.StreamPosition,
+) {
+ p.latestMutex.Lock()
+ defer p.latestMutex.Unlock()
+
+ if latest > p.latest {
+ p.latest = latest
+ }
+}
+
+func (p *StreamProvider) LatestPosition(
+ ctx context.Context,
+) types.StreamPosition {
+ p.latestMutex.RLock()
+ defer p.latestMutex.RUnlock()
+
+ return p.latest
+}