diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2021-01-08 16:59:06 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-08 16:59:06 +0000 |
commit | b5a8935042dfb358f4176bc1ca46d0b8ebd62615 (patch) | |
tree | b3b0409cc184f6392eb0230f2127d10d0752b4af /syncapi/streams | |
parent | 56a7839aedfdf849193bf25c0e2fcd8f4a0146d8 (diff) |
Sync refactor — Part 1 (#1688)
* It's half-alive
* Wakeups largely working
* Other tweaks, typing works
* Fix bugs, add receipt stream
* Delete notifier, other tweaks
* Dedupe a bit, add a template for the invite stream
* Clean up, add templates for other streams
* Don't leak channels
* Bring forward some more PDU logic, clean up other places
* Add some more wakeups
* Use addRoomDeltaToResponse
* Log tweaks, typing fixed?
* Fix timed out syncs
* Don't reset next batch position on timeout
* Add account data stream/position
* End of day
* Fix complete sync for receipt, typing
* Streams package
* Clean up a bit
* Complete sync send-to-device
* Don't drop errors
* More lightweight notifications
* Fix typing positions
* Don't advance position on remove again unless needed
* Device list updates
* Advance account data position
* Use limit for incremental sync
* Limit fixes, amongst other things
* Remove some fmt.Println
* Tweaks
* Re-add notifier
* Fix invite position
* Fixes
* Notify account data without advancing PDU position in notifier
* Apply account data position
* Get initial position for account data
* Fix position update
* Fix complete sync positions
* Review comments @Kegsay
* Room consumer parameters
Diffstat (limited to 'syncapi/streams')
-rw-r--r-- | syncapi/streams/stream_accountdata.go | 132 | ||||
-rw-r--r-- | syncapi/streams/stream_devicelist.go | 43 | ||||
-rw-r--r-- | syncapi/streams/stream_invite.go | 64 | ||||
-rw-r--r-- | syncapi/streams/stream_pdu.go | 305 | ||||
-rw-r--r-- | syncapi/streams/stream_receipt.go | 91 | ||||
-rw-r--r-- | syncapi/streams/stream_sendtodevice.go | 51 | ||||
-rw-r--r-- | syncapi/streams/stream_typing.go | 57 | ||||
-rw-r--r-- | syncapi/streams/streams.go | 78 | ||||
-rw-r--r-- | syncapi/streams/template_pstream.go | 38 | ||||
-rw-r--r-- | syncapi/streams/template_stream.go | 38 |
10 files changed, 897 insertions, 0 deletions
diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go new file mode 100644 index 00000000..aa7f0937 --- /dev/null +++ b/syncapi/streams/stream_accountdata.go @@ -0,0 +1,132 @@ +package streams + +import ( + "context" + + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" +) + +type AccountDataStreamProvider struct { + StreamProvider + userAPI userapi.UserInternalAPI +} + +func (p *AccountDataStreamProvider) Setup() { + p.StreamProvider.Setup() + + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := p.DB.MaxStreamPositionForAccountData(context.Background()) + if err != nil { + panic(err) + } + p.latest = id +} + +func (p *AccountDataStreamProvider) CompleteSync( + ctx context.Context, + req *types.SyncRequest, +) types.StreamPosition { + dataReq := &userapi.QueryAccountDataRequest{ + UserID: req.Device.UserID, + } + dataRes := &userapi.QueryAccountDataResponse{} + if err := p.userAPI.QueryAccountData(ctx, dataReq, dataRes); err != nil { + req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed") + return p.LatestPosition(ctx) + } + for datatype, databody := range dataRes.GlobalAccountData { + req.Response.AccountData.Events = append( + req.Response.AccountData.Events, + gomatrixserverlib.ClientEvent{ + Type: datatype, + Content: gomatrixserverlib.RawJSON(databody), + }, + ) + } + for r, j := range req.Response.Rooms.Join { + for datatype, databody := range dataRes.RoomAccountData[r] { + j.AccountData.Events = append( + j.AccountData.Events, + gomatrixserverlib.ClientEvent{ + Type: datatype, + Content: gomatrixserverlib.RawJSON(databody), + }, + ) + req.Response.Rooms.Join[r] = j + } + } + + return p.LatestPosition(ctx) +} + +func (p *AccountDataStreamProvider) IncrementalSync( + ctx context.Context, + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { + r := types.Range{ + From: from, + To: to, + } + accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead + + dataTypes, err := p.DB.GetAccountDataInRange( + ctx, req.Device.UserID, r, &accountDataFilter, + ) + if err != nil { + req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed") + return from + } + + if len(dataTypes) == 0 { + // TODO: this fixes the sytest but is it the right thing to do? + dataTypes[""] = []string{"m.push_rules"} + } + + // Iterate over the rooms + for roomID, dataTypes := range dataTypes { + // Request the missing data from the database + for _, dataType := range dataTypes { + dataReq := userapi.QueryAccountDataRequest{ + UserID: req.Device.UserID, + RoomID: roomID, + DataType: dataType, + } + dataRes := userapi.QueryAccountDataResponse{} + err = p.userAPI.QueryAccountData(ctx, &dataReq, &dataRes) + if err != nil { + req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed") + continue + } + if roomID == "" { + if globalData, ok := dataRes.GlobalAccountData[dataType]; ok { + req.Response.AccountData.Events = append( + req.Response.AccountData.Events, + gomatrixserverlib.ClientEvent{ + Type: dataType, + Content: gomatrixserverlib.RawJSON(globalData), + }, + ) + } + } else { + if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok { + joinData := req.Response.Rooms.Join[roomID] + joinData.AccountData.Events = append( + joinData.AccountData.Events, + gomatrixserverlib.ClientEvent{ + Type: dataType, + Content: gomatrixserverlib.RawJSON(roomData), + }, + ) + req.Response.Rooms.Join[roomID] = joinData + } + } + } + } + + return to +} diff --git a/syncapi/streams/stream_devicelist.go b/syncapi/streams/stream_devicelist.go new file mode 100644 index 00000000..c43d50a4 --- /dev/null +++ b/syncapi/streams/stream_devicelist.go @@ -0,0 +1,43 @@ +package streams + +import ( + "context" + + keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/internal" + "github.com/matrix-org/dendrite/syncapi/types" +) + +type DeviceListStreamProvider struct { + PartitionedStreamProvider + rsAPI api.RoomserverInternalAPI + keyAPI keyapi.KeyInternalAPI +} + +func (p *DeviceListStreamProvider) CompleteSync( + ctx context.Context, + req *types.SyncRequest, +) types.LogPosition { + return p.IncrementalSync(ctx, req, types.LogPosition{}, p.LatestPosition(ctx)) +} + +func (p *DeviceListStreamProvider) IncrementalSync( + ctx context.Context, + req *types.SyncRequest, + from, to types.LogPosition, +) types.LogPosition { + var err error + to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to) + if err != nil { + req.Log.WithError(err).Error("internal.DeviceListCatchup failed") + return from + } + err = internal.DeviceOTKCounts(req.Context, p.keyAPI, req.Device.UserID, req.Device.ID, req.Response) + if err != nil { + req.Log.WithError(err).Error("internal.DeviceListCatchup failed") + return from + } + + return to +} diff --git a/syncapi/streams/stream_invite.go b/syncapi/streams/stream_invite.go new file mode 100644 index 00000000..10a0dda8 --- /dev/null +++ b/syncapi/streams/stream_invite.go @@ -0,0 +1,64 @@ +package streams + +import ( + "context" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +type InviteStreamProvider struct { + StreamProvider +} + +func (p *InviteStreamProvider) Setup() { + p.StreamProvider.Setup() + + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := p.DB.MaxStreamPositionForInvites(context.Background()) + if err != nil { + panic(err) + } + p.latest = id +} + +func (p *InviteStreamProvider) CompleteSync( + ctx context.Context, + req *types.SyncRequest, +) types.StreamPosition { + return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) +} + +func (p *InviteStreamProvider) IncrementalSync( + ctx context.Context, + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { + r := types.Range{ + From: from, + To: to, + } + + invites, retiredInvites, err := p.DB.InviteEventsInRange( + ctx, req.Device.UserID, r, + ) + if err != nil { + req.Log.WithError(err).Error("p.DB.InviteEventsInRange failed") + return from + } + + for roomID, inviteEvent := range invites { + ir := types.NewInviteResponse(inviteEvent) + req.Response.Rooms.Invite[roomID] = *ir + } + + for roomID := range retiredInvites { + if _, ok := req.Response.Rooms.Join[roomID]; !ok { + lr := types.NewLeaveResponse() + req.Response.Rooms.Leave[roomID] = *lr + } + } + + return to +} diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go new file mode 100644 index 00000000..016c182e --- /dev/null +++ b/syncapi/streams/stream_pdu.go @@ -0,0 +1,305 @@ +package streams + +import ( + "context" + + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" +) + +type PDUStreamProvider struct { + StreamProvider +} + +func (p *PDUStreamProvider) Setup() { + p.StreamProvider.Setup() + + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + id, err := p.DB.MaxStreamPositionForPDUs(context.Background()) + if err != nil { + panic(err) + } + p.latest = id +} + +func (p *PDUStreamProvider) CompleteSync( + ctx context.Context, + req *types.SyncRequest, +) types.StreamPosition { + from := types.StreamPosition(0) + to := p.LatestPosition(ctx) + + // Get the current sync position which we will base the sync response on. + // For complete syncs, we want to start at the most recent events and work + // backwards, so that we show the most recent events in the room. + r := types.Range{ + From: to, + To: 0, + Backwards: true, + } + + // Extract room state and recent events for all rooms the user is joined to. + joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join) + if err != nil { + req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed") + return from + } + + stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request + + // Build up a /sync response. Add joined rooms. + for _, roomID := range joinedRoomIDs { + var jr *types.JoinResponse + jr, err = p.getJoinResponseForCompleteSync( + ctx, roomID, r, &stateFilter, req.Limit, req.Device, + ) + if err != nil { + req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") + return from + } + req.Response.Rooms.Join[roomID] = *jr + req.Rooms[roomID] = gomatrixserverlib.Join + } + + // Add peeked rooms. + peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r) + if err != nil { + req.Log.WithError(err).Error("p.DB.PeeksInRange failed") + return from + } + for _, peek := range peeks { + if !peek.Deleted { + var jr *types.JoinResponse + jr, err = p.getJoinResponseForCompleteSync( + ctx, peek.RoomID, r, &stateFilter, req.Limit, req.Device, + ) + if err != nil { + req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") + return from + } + req.Response.Rooms.Peek[peek.RoomID] = *jr + } + } + + return to +} + +// nolint:gocyclo +func (p *PDUStreamProvider) IncrementalSync( + ctx context.Context, + req *types.SyncRequest, + from, to types.StreamPosition, +) (newPos types.StreamPosition) { + r := types.Range{ + From: from, + To: to, + Backwards: from > to, + } + newPos = to + + var err error + var stateDeltas []types.StateDelta + var joinedRooms []string + + // TODO: use filter provided in request + stateFilter := gomatrixserverlib.DefaultStateFilter() + + if req.WantFullState { + if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { + req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed") + return + } + } else { + if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { + req.Log.WithError(err).Error("p.DB.GetStateDeltas failed") + return + } + } + + for _, roomID := range joinedRooms { + req.Rooms[roomID] = gomatrixserverlib.Join + } + + for _, delta := range stateDeltas { + if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, req.Limit, req.Response); err != nil { + req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed") + return newPos + } + } + + return r.To +} + +func (p *PDUStreamProvider) addRoomDeltaToResponse( + ctx context.Context, + device *userapi.Device, + r types.Range, + delta types.StateDelta, + numRecentEventsPerRoom int, + res *types.Response, +) error { + if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave { + // make sure we don't leak recent events after the leave event. + // TODO: History visibility makes this somewhat complex to handle correctly. For example: + // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join). + // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave + // in a single /sync request + // This is all "okay" assuming history_visibility == "shared" which it is by default. + r.To = delta.MembershipPos + } + recentStreamEvents, limited, err := p.DB.RecentEvents( + ctx, delta.RoomID, r, + numRecentEventsPerRoom, true, true, + ) + if err != nil { + return err + } + recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) + delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back + prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents) + if err != nil { + return err + } + + // XXX: should we ever get this far if we have no recent events or state in this room? + // in practice we do for peeks, but possibly not joins? + if len(recentEvents) == 0 && len(delta.StateEvents) == 0 { + return nil + } + + switch delta.Membership { + case gomatrixserverlib.Join: + jr := types.NewJoinResponse() + + jr.Timeline.PrevBatch = &prevBatch + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = limited + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Join[delta.RoomID] = *jr + case gomatrixserverlib.Peek: + jr := types.NewJoinResponse() + + jr.Timeline.PrevBatch = &prevBatch + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = limited + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Peek[delta.RoomID] = *jr + case gomatrixserverlib.Leave: + fallthrough // transitions to leave are the same as ban + case gomatrixserverlib.Ban: + // TODO: recentEvents may contain events that this user is not allowed to see because they are + // no longer in the room. + lr := types.NewLeaveResponse() + lr.Timeline.PrevBatch = &prevBatch + lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync) + res.Rooms.Leave[delta.RoomID] = *lr + } + + return nil +} + +func (p *PDUStreamProvider) getJoinResponseForCompleteSync( + ctx context.Context, + roomID string, + r types.Range, + stateFilter *gomatrixserverlib.StateFilter, + numRecentEventsPerRoom int, device *userapi.Device, +) (jr *types.JoinResponse, err error) { + var stateEvents []*gomatrixserverlib.HeaderedEvent + stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter) + if err != nil { + return + } + // TODO: When filters are added, we may need to call this multiple times to get enough events. + // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 + var recentStreamEvents []types.StreamEvent + var limited bool + recentStreamEvents, limited, err = p.DB.RecentEvents( + ctx, roomID, r, numRecentEventsPerRoom, true, true, + ) + if err != nil { + return + } + + // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the + // user shouldn't see, we check the recent events and remove any prior to the join event of the user + // which is equiv to history_visibility: joined + joinEventIndex := -1 + for i := len(recentStreamEvents) - 1; i >= 0; i-- { + ev := recentStreamEvents[i] + if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) { + membership, _ := ev.Membership() + if membership == "join" { + joinEventIndex = i + if i > 0 { + // the create event happens before the first join, so we should cut it at that point instead + if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") { + joinEventIndex = i - 1 + break + } + } + break + } + } + } + if joinEventIndex != -1 { + // cut all events earlier than the join (but not the join itself) + recentStreamEvents = recentStreamEvents[joinEventIndex:] + limited = false // so clients know not to try to backpaginate + } + + // Retrieve the backward topology position, i.e. the position of the + // oldest event in the room's topology. + var prevBatch *types.TopologyToken + if len(recentStreamEvents) > 0 { + var backwardTopologyPos, backwardStreamPos types.StreamPosition + backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID()) + if err != nil { + return + } + prevBatch = &types.TopologyToken{ + Depth: backwardTopologyPos, + PDUPosition: backwardStreamPos, + } + prevBatch.Decrement() + } + + // We don't include a device here as we don't need to send down + // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for: + // "Can sync a room with a message with a transaction id" - which does a complete sync to check. + recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) + stateEvents = removeDuplicates(stateEvents, recentEvents) + jr = types.NewJoinResponse() + jr.Timeline.PrevBatch = prevBatch + jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.Limited = limited + jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync) + return jr, nil +} + +func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent { + for _, recentEv := range recentEvents { + if recentEv.StateKey() == nil { + continue // not a state event + } + // TODO: This is a linear scan over all the current state events in this room. This will + // be slow for big rooms. We should instead sort the state events by event ID (ORDER BY) + // then do a binary search to find matching events, similar to what roomserver does. + for j := 0; j < len(stateEvents); j++ { + if stateEvents[j].EventID() == recentEv.EventID() { + // overwrite the element to remove with the last element then pop the last element. + // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering + // (we don't care about the order of stateEvents) + stateEvents[j] = stateEvents[len(stateEvents)-1] + stateEvents = stateEvents[:len(stateEvents)-1] + break // there shouldn't be multiple events with the same event ID + } + } + } + return stateEvents +} diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go new file mode 100644 index 00000000..259d07bd --- /dev/null +++ b/syncapi/streams/stream_receipt.go @@ -0,0 +1,91 @@ +package streams + +import ( + "context" + "encoding/json" + + eduAPI "github.com/matrix-org/dendrite/eduserver/api" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +type ReceiptStreamProvider struct { + StreamProvider +} + +func (p *ReceiptStreamProvider) Setup() { + p.StreamProvider.Setup() + + id, err := p.DB.MaxStreamPositionForReceipts(context.Background()) + if err != nil { + panic(err) + } + p.latest = id +} + +func (p *ReceiptStreamProvider) CompleteSync( + ctx context.Context, + req *types.SyncRequest, +) types.StreamPosition { + return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) +} + +func (p *ReceiptStreamProvider) IncrementalSync( + ctx context.Context, + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { + var joinedRooms []string + for roomID, membership := range req.Rooms { + if membership == gomatrixserverlib.Join { + joinedRooms = append(joinedRooms, roomID) + } + } + + lastPos, receipts, err := p.DB.RoomReceiptsAfter(ctx, joinedRooms, from) + if err != nil { + req.Log.WithError(err).Error("p.DB.RoomReceiptsAfter failed") + return from + } + + if len(receipts) == 0 || lastPos == 0 { + return to + } + + // Group receipts by room, so we can create one ClientEvent for every room + receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent) + for _, receipt := range receipts { + receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt) + } + + for roomID, receipts := range receiptsByRoom { + jr := req.Response.Rooms.Join[roomID] + var ok bool + + ev := gomatrixserverlib.ClientEvent{ + Type: gomatrixserverlib.MReceipt, + RoomID: roomID, + } + content := make(map[string]eduAPI.ReceiptMRead) + for _, receipt := range receipts { + var read eduAPI.ReceiptMRead + if read, ok = content[receipt.EventID]; !ok { + read = eduAPI.ReceiptMRead{ + User: make(map[string]eduAPI.ReceiptTS), + } + } + read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp} + content[receipt.EventID] = read + } + ev.Content, err = json.Marshal(content) + if err != nil { + req.Log.WithError(err).Error("json.Marshal failed") + return from + } + + jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) + req.Response.Rooms.Join[roomID] = jr + } + + return lastPos +} diff --git a/syncapi/streams/stream_sendtodevice.go b/syncapi/streams/stream_sendtodevice.go new file mode 100644 index 00000000..804f525d --- /dev/null +++ b/syncapi/streams/stream_sendtodevice.go @@ -0,0 +1,51 @@ +package streams + +import ( + "context" + + "github.com/matrix-org/dendrite/syncapi/types" +) + +type SendToDeviceStreamProvider struct { + StreamProvider +} + +func (p *SendToDeviceStreamProvider) CompleteSync( + ctx context.Context, + req *types.SyncRequest, +) types.StreamPosition { + return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) +} + +func (p *SendToDeviceStreamProvider) IncrementalSync( + ctx context.Context, + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { + // See if we have any new tasks to do for the send-to-device messaging. + lastPos, events, updates, deletions, err := p.DB.SendToDeviceUpdatesForSync(req.Context, req.Device.UserID, req.Device.ID, req.Since) + if err != nil { + req.Log.WithError(err).Error("p.DB.SendToDeviceUpdatesForSync failed") + return from + } + + // Before we return the sync response, make sure that we take action on + // any send-to-device database updates or deletions that we need to do. + // Then add the updates into the sync response. + if len(updates) > 0 || len(deletions) > 0 { + // Handle the updates and deletions in the database. + err = p.DB.CleanSendToDeviceUpdates(context.Background(), updates, deletions, req.Since) + if err != nil { + req.Log.WithError(err).Error("p.DB.CleanSendToDeviceUpdates failed") + return from + } + } + if len(events) > 0 { + // Add the updates into the sync response. + for _, event := range events { + req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent) + } + } + + return lastPos +} diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go new file mode 100644 index 00000000..60d5acf4 --- /dev/null +++ b/syncapi/streams/stream_typing.go @@ -0,0 +1,57 @@ +package streams + +import ( + "context" + "encoding/json" + + "github.com/matrix-org/dendrite/eduserver/cache" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +type TypingStreamProvider struct { + StreamProvider + EDUCache *cache.EDUCache +} + +func (p *TypingStreamProvider) CompleteSync( + ctx context.Context, + req *types.SyncRequest, +) types.StreamPosition { + return p.IncrementalSync(ctx, req, 0, p.LatestPosition(ctx)) +} + +func (p *TypingStreamProvider) IncrementalSync( + ctx context.Context, + req *types.SyncRequest, + from, to types.StreamPosition, +) types.StreamPosition { + var err error + for roomID, membership := range req.Rooms { + if membership != gomatrixserverlib.Join { + continue + } + + jr := req.Response.Rooms.Join[roomID] + + if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter( + roomID, int64(from), + ); updated { + ev := gomatrixserverlib.ClientEvent{ + Type: gomatrixserverlib.MTyping, + } + ev.Content, err = json.Marshal(map[string]interface{}{ + "user_ids": users, + }) + if err != nil { + req.Log.WithError(err).Error("json.Marshal failed") + return from + } + + jr.Ephemeral.Events = append(jr.Ephemeral.Events, ev) + req.Response.Rooms.Join[roomID] = jr + } + } + + return to +} diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go new file mode 100644 index 00000000..ba4118df --- /dev/null +++ b/syncapi/streams/streams.go @@ -0,0 +1,78 @@ +package streams + +import ( + "context" + + "github.com/matrix-org/dendrite/eduserver/cache" + keyapi "github.com/matrix-org/dendrite/keyserver/api" + rsapi "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" +) + +type Streams struct { + PDUStreamProvider types.StreamProvider + TypingStreamProvider types.StreamProvider + ReceiptStreamProvider types.StreamProvider + InviteStreamProvider types.StreamProvider + SendToDeviceStreamProvider types.StreamProvider + AccountDataStreamProvider types.StreamProvider + DeviceListStreamProvider types.PartitionedStreamProvider +} + +func NewSyncStreamProviders( + d storage.Database, userAPI userapi.UserInternalAPI, + rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI, + eduCache *cache.EDUCache, +) *Streams { + streams := &Streams{ + PDUStreamProvider: &PDUStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + }, + TypingStreamProvider: &TypingStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + EDUCache: eduCache, + }, + ReceiptStreamProvider: &ReceiptStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + }, + InviteStreamProvider: &InviteStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + }, + SendToDeviceStreamProvider: &SendToDeviceStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + }, + AccountDataStreamProvider: &AccountDataStreamProvider{ + StreamProvider: StreamProvider{DB: d}, + userAPI: userAPI, + }, + DeviceListStreamProvider: &DeviceListStreamProvider{ + PartitionedStreamProvider: PartitionedStreamProvider{DB: d}, + rsAPI: rsAPI, + keyAPI: keyAPI, + }, + } + + streams.PDUStreamProvider.Setup() + streams.TypingStreamProvider.Setup() + streams.ReceiptStreamProvider.Setup() + streams.InviteStreamProvider.Setup() + streams.SendToDeviceStreamProvider.Setup() + streams.AccountDataStreamProvider.Setup() + streams.DeviceListStreamProvider.Setup() + + return streams +} + +func (s *Streams) Latest(ctx context.Context) types.StreamingToken { + return types.StreamingToken{ + PDUPosition: s.PDUStreamProvider.LatestPosition(ctx), + TypingPosition: s.TypingStreamProvider.LatestPosition(ctx), + ReceiptPosition: s.PDUStreamProvider.LatestPosition(ctx), + InvitePosition: s.InviteStreamProvider.LatestPosition(ctx), + SendToDevicePosition: s.SendToDeviceStreamProvider.LatestPosition(ctx), + AccountDataPosition: s.AccountDataStreamProvider.LatestPosition(ctx), + DeviceListPosition: s.DeviceListStreamProvider.LatestPosition(ctx), + } +} diff --git a/syncapi/streams/template_pstream.go b/syncapi/streams/template_pstream.go new file mode 100644 index 00000000..265e22a2 --- /dev/null +++ b/syncapi/streams/template_pstream.go @@ -0,0 +1,38 @@ +package streams + +import ( + "context" + "sync" + + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" +) + +type PartitionedStreamProvider struct { + DB storage.Database + latest types.LogPosition + latestMutex sync.RWMutex +} + +func (p *PartitionedStreamProvider) Setup() { +} + +func (p *PartitionedStreamProvider) Advance( + latest types.LogPosition, +) { + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + if latest.IsAfter(&p.latest) { + p.latest = latest + } +} + +func (p *PartitionedStreamProvider) LatestPosition( + ctx context.Context, +) types.LogPosition { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + + return p.latest +} diff --git a/syncapi/streams/template_stream.go b/syncapi/streams/template_stream.go new file mode 100644 index 00000000..15074cc1 --- /dev/null +++ b/syncapi/streams/template_stream.go @@ -0,0 +1,38 @@ +package streams + +import ( + "context" + "sync" + + "github.com/matrix-org/dendrite/syncapi/storage" + "github.com/matrix-org/dendrite/syncapi/types" +) + +type StreamProvider struct { + DB storage.Database + latest types.StreamPosition + latestMutex sync.RWMutex +} + +func (p *StreamProvider) Setup() { +} + +func (p *StreamProvider) Advance( + latest types.StreamPosition, +) { + p.latestMutex.Lock() + defer p.latestMutex.Unlock() + + if latest > p.latest { + p.latest = latest + } +} + +func (p *StreamProvider) LatestPosition( + ctx context.Context, +) types.StreamPosition { + p.latestMutex.RLock() + defer p.latestMutex.RUnlock() + + return p.latest +} |