aboutsummaryrefslogtreecommitdiff
path: root/syncapi/streams/stream_accountdata.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-01-08 16:59:06 +0000
committerGitHub <noreply@github.com>2021-01-08 16:59:06 +0000
commitb5a8935042dfb358f4176bc1ca46d0b8ebd62615 (patch)
treeb3b0409cc184f6392eb0230f2127d10d0752b4af /syncapi/streams/stream_accountdata.go
parent56a7839aedfdf849193bf25c0e2fcd8f4a0146d8 (diff)
Sync refactor — Part 1 (#1688)
* It's half-alive * Wakeups largely working * Other tweaks, typing works * Fix bugs, add receipt stream * Delete notifier, other tweaks * Dedupe a bit, add a template for the invite stream * Clean up, add templates for other streams * Don't leak channels * Bring forward some more PDU logic, clean up other places * Add some more wakeups * Use addRoomDeltaToResponse * Log tweaks, typing fixed? * Fix timed out syncs * Don't reset next batch position on timeout * Add account data stream/position * End of day * Fix complete sync for receipt, typing * Streams package * Clean up a bit * Complete sync send-to-device * Don't drop errors * More lightweight notifications * Fix typing positions * Don't advance position on remove again unless needed * Device list updates * Advance account data position * Use limit for incremental sync * Limit fixes, amongst other things * Remove some fmt.Println * Tweaks * Re-add notifier * Fix invite position * Fixes * Notify account data without advancing PDU position in notifier * Apply account data position * Get initial position for account data * Fix position update * Fix complete sync positions * Review comments @Kegsay * Room consumer parameters
Diffstat (limited to 'syncapi/streams/stream_accountdata.go')
-rw-r--r--syncapi/streams/stream_accountdata.go132
1 files changed, 132 insertions, 0 deletions
diff --git a/syncapi/streams/stream_accountdata.go b/syncapi/streams/stream_accountdata.go
new file mode 100644
index 00000000..aa7f0937
--- /dev/null
+++ b/syncapi/streams/stream_accountdata.go
@@ -0,0 +1,132 @@
+package streams
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+type AccountDataStreamProvider struct {
+ StreamProvider
+ userAPI userapi.UserInternalAPI
+}
+
+func (p *AccountDataStreamProvider) Setup() {
+ p.StreamProvider.Setup()
+
+ p.latestMutex.Lock()
+ defer p.latestMutex.Unlock()
+
+ id, err := p.DB.MaxStreamPositionForAccountData(context.Background())
+ if err != nil {
+ panic(err)
+ }
+ p.latest = id
+}
+
+func (p *AccountDataStreamProvider) CompleteSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+) types.StreamPosition {
+ dataReq := &userapi.QueryAccountDataRequest{
+ UserID: req.Device.UserID,
+ }
+ dataRes := &userapi.QueryAccountDataResponse{}
+ if err := p.userAPI.QueryAccountData(ctx, dataReq, dataRes); err != nil {
+ req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed")
+ return p.LatestPosition(ctx)
+ }
+ for datatype, databody := range dataRes.GlobalAccountData {
+ req.Response.AccountData.Events = append(
+ req.Response.AccountData.Events,
+ gomatrixserverlib.ClientEvent{
+ Type: datatype,
+ Content: gomatrixserverlib.RawJSON(databody),
+ },
+ )
+ }
+ for r, j := range req.Response.Rooms.Join {
+ for datatype, databody := range dataRes.RoomAccountData[r] {
+ j.AccountData.Events = append(
+ j.AccountData.Events,
+ gomatrixserverlib.ClientEvent{
+ Type: datatype,
+ Content: gomatrixserverlib.RawJSON(databody),
+ },
+ )
+ req.Response.Rooms.Join[r] = j
+ }
+ }
+
+ return p.LatestPosition(ctx)
+}
+
+func (p *AccountDataStreamProvider) IncrementalSync(
+ ctx context.Context,
+ req *types.SyncRequest,
+ from, to types.StreamPosition,
+) types.StreamPosition {
+ r := types.Range{
+ From: from,
+ To: to,
+ }
+ accountDataFilter := gomatrixserverlib.DefaultEventFilter() // TODO: use filter provided in req instead
+
+ dataTypes, err := p.DB.GetAccountDataInRange(
+ ctx, req.Device.UserID, r, &accountDataFilter,
+ )
+ if err != nil {
+ req.Log.WithError(err).Error("p.DB.GetAccountDataInRange failed")
+ return from
+ }
+
+ if len(dataTypes) == 0 {
+ // TODO: this fixes the sytest but is it the right thing to do?
+ dataTypes[""] = []string{"m.push_rules"}
+ }
+
+ // Iterate over the rooms
+ for roomID, dataTypes := range dataTypes {
+ // Request the missing data from the database
+ for _, dataType := range dataTypes {
+ dataReq := userapi.QueryAccountDataRequest{
+ UserID: req.Device.UserID,
+ RoomID: roomID,
+ DataType: dataType,
+ }
+ dataRes := userapi.QueryAccountDataResponse{}
+ err = p.userAPI.QueryAccountData(ctx, &dataReq, &dataRes)
+ if err != nil {
+ req.Log.WithError(err).Error("p.userAPI.QueryAccountData failed")
+ continue
+ }
+ if roomID == "" {
+ if globalData, ok := dataRes.GlobalAccountData[dataType]; ok {
+ req.Response.AccountData.Events = append(
+ req.Response.AccountData.Events,
+ gomatrixserverlib.ClientEvent{
+ Type: dataType,
+ Content: gomatrixserverlib.RawJSON(globalData),
+ },
+ )
+ }
+ } else {
+ if roomData, ok := dataRes.RoomAccountData[roomID][dataType]; ok {
+ joinData := req.Response.Rooms.Join[roomID]
+ joinData.AccountData.Events = append(
+ joinData.AccountData.Events,
+ gomatrixserverlib.ClientEvent{
+ Type: dataType,
+ Content: gomatrixserverlib.RawJSON(roomData),
+ },
+ )
+ req.Response.Rooms.Join[roomID] = joinData
+ }
+ }
+ }
+ }
+
+ return to
+}