aboutsummaryrefslogtreecommitdiff
path: root/userapi/producers/syncapi.go
blob: 51eaa9856d393ac07a26bb0d085918731b22fa0d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package producers

import (
	"context"
	"encoding/json"

	"github.com/matrix-org/gomatrixserverlib"
	"github.com/nats-io/nats.go"
	log "github.com/sirupsen/logrus"

	"github.com/matrix-org/dendrite/internal/eventutil"
	"github.com/matrix-org/dendrite/setup/jetstream"
	"github.com/matrix-org/dendrite/userapi/storage"
)

type JetStreamPublisher interface {
	PublishMsg(*nats.Msg, ...nats.PubOpt) (*nats.PubAck, error)
}

// SyncAPI produces messages for the Sync API server to consume.
type SyncAPI struct {
	db                    storage.Database
	producer              JetStreamPublisher
	clientDataTopic       string
	notificationDataTopic string
}

func NewSyncAPI(db storage.Database, js JetStreamPublisher, clientDataTopic string, notificationDataTopic string) *SyncAPI {
	return &SyncAPI{
		db:                    db,
		producer:              js,
		clientDataTopic:       clientDataTopic,
		notificationDataTopic: notificationDataTopic,
	}
}

// SendAccountData sends account data to the Sync API server.
func (p *SyncAPI) SendAccountData(userID string, data eventutil.AccountData) error {
	m := &nats.Msg{
		Subject: p.clientDataTopic,
		Header:  nats.Header{},
	}
	m.Header.Set(jetstream.UserID, userID)

	var err error
	m.Data, err = json.Marshal(data)
	if err != nil {
		return err
	}

	log.WithFields(log.Fields{
		"user_id":   userID,
		"room_id":   data.RoomID,
		"data_type": data.Type,
	}).Tracef("Producing to topic '%s'", p.clientDataTopic)

	_, err = p.producer.PublishMsg(m)
	return err
}

// GetAndSendNotificationData reads the database and sends data about unread
// notifications to the Sync API server.
func (p *SyncAPI) GetAndSendNotificationData(ctx context.Context, userID, roomID string) error {
	localpart, domain, err := gomatrixserverlib.SplitID('@', userID)
	if err != nil {
		return err
	}

	ntotal, nhighlight, err := p.db.GetRoomNotificationCounts(ctx, localpart, domain, roomID)
	if err != nil {
		return err
	}

	return p.sendNotificationData(userID, &eventutil.NotificationData{
		RoomID:                  roomID,
		UnreadHighlightCount:    int(nhighlight),
		UnreadNotificationCount: int(ntotal),
	})
}

// sendNotificationData sends data about unread notifications to the Sync API server.
func (p *SyncAPI) sendNotificationData(userID string, data *eventutil.NotificationData) error {
	m := &nats.Msg{
		Subject: p.notificationDataTopic,
		Header:  nats.Header{},
	}
	m.Header.Set(jetstream.UserID, userID)

	var err error
	m.Data, err = json.Marshal(data)
	if err != nil {
		return err
	}

	log.WithFields(log.Fields{
		"user_id": userID,
		"room_id": data.RoomID,
	}).Tracef("Producing to topic '%s'", p.clientDataTopic)

	_, err = p.producer.PublishMsg(m)
	return err
}