aboutsummaryrefslogtreecommitdiff
path: root/userapi/consumers/syncapi_readupdate.go
blob: 067f93330e3567c76f25910036fbd5a90f8bdada (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package consumers

import (
	"context"
	"encoding/json"

	"github.com/matrix-org/dendrite/internal/pushgateway"
	"github.com/matrix-org/dendrite/setup/config"
	"github.com/matrix-org/dendrite/setup/jetstream"
	"github.com/matrix-org/dendrite/setup/process"
	"github.com/matrix-org/dendrite/syncapi/types"
	uapi "github.com/matrix-org/dendrite/userapi/api"
	"github.com/matrix-org/dendrite/userapi/producers"
	"github.com/matrix-org/dendrite/userapi/storage"
	"github.com/matrix-org/dendrite/userapi/util"
	"github.com/matrix-org/gomatrixserverlib"
	"github.com/nats-io/nats.go"
	log "github.com/sirupsen/logrus"
)

type OutputReadUpdateConsumer struct {
	ctx          context.Context
	cfg          *config.UserAPI
	jetstream    nats.JetStreamContext
	durable      string
	db           storage.Database
	pgClient     pushgateway.Client
	ServerName   gomatrixserverlib.ServerName
	topic        string
	userAPI      uapi.UserInternalAPI
	syncProducer *producers.SyncAPI
}

func NewOutputReadUpdateConsumer(
	process *process.ProcessContext,
	cfg *config.UserAPI,
	js nats.JetStreamContext,
	store storage.Database,
	pgClient pushgateway.Client,
	userAPI uapi.UserInternalAPI,
	syncProducer *producers.SyncAPI,
) *OutputReadUpdateConsumer {
	return &OutputReadUpdateConsumer{
		ctx:          process.Context(),
		cfg:          cfg,
		jetstream:    js,
		db:           store,
		ServerName:   cfg.Matrix.ServerName,
		durable:      cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"),
		topic:        cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
		pgClient:     pgClient,
		userAPI:      userAPI,
		syncProducer: syncProducer,
	}
}

func (s *OutputReadUpdateConsumer) Start() error {
	if err := jetstream.JetStreamConsumer(
		s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
		nats.DeliverAll(), nats.ManualAck(),
	); err != nil {
		return err
	}
	return nil
}

func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
	var read types.ReadUpdate
	if err := json.Unmarshal(msg.Data, &read); err != nil {
		log.WithError(err).Error("userapi clientapi consumer: message parse failure")
		return true
	}
	if read.FullyRead == 0 && read.Read == 0 {
		return true
	}

	userID := string(msg.Header.Get(jetstream.UserID))
	roomID := string(msg.Header.Get(jetstream.RoomID))

	localpart, domain, err := gomatrixserverlib.SplitID('@', userID)
	if err != nil {
		log.WithError(err).Error("userapi clientapi consumer: SplitID failure")
		return true
	}
	if domain != s.ServerName {
		log.Error("userapi clientapi consumer: not a local user")
		return true
	}

	log := log.WithFields(log.Fields{
		"room_id": roomID,
		"user_id": userID,
	})
	log.Tracef("Received read update from sync API: %#v", read)

	if read.Read > 0 {
		updated, err := s.db.SetNotificationsRead(ctx, localpart, roomID, int64(read.Read), true)
		if err != nil {
			log.WithError(err).Error("userapi EDU consumer")
			return false
		}

		if updated {
			if err = s.syncProducer.GetAndSendNotificationData(ctx, userID, roomID); err != nil {
				log.WithError(err).Error("userapi EDU consumer: GetAndSendNotificationData failed")
				return false
			}
			if err = util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
				log.WithError(err).Error("userapi EDU consumer: NotifyUserCounts failed")
				return false
			}
		}
	}

	if read.FullyRead > 0 {
		deleted, err := s.db.DeleteNotificationsUpTo(ctx, localpart, roomID, int64(read.FullyRead))
		if err != nil {
			log.WithError(err).Errorf("userapi clientapi consumer: DeleteNotificationsUpTo failed")
			return false
		}

		if deleted {
			if err := util.NotifyUserCountsAsync(ctx, s.pgClient, localpart, s.db); err != nil {
				log.WithError(err).Error("userapi clientapi consumer: NotifyUserCounts failed")
				return false
			}

			if err := s.syncProducer.GetAndSendNotificationData(ctx, userID, read.RoomID); err != nil {
				log.WithError(err).Errorf("userapi clientapi consumer: GetAndSendNotificationData failed")
				return false
			}
		}
	}

	return true
}