aboutsummaryrefslogtreecommitdiff
path: root/clientapi/producers
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-03-29 14:14:35 +0200
committerGitHub <noreply@github.com>2022-03-29 14:14:35 +0200
commit49dc49b232432d52c082646cc6f778593f4cb8b4 (patch)
treee809deedcae2127e930b0d382c863561b61fd9d2 /clientapi/producers
parent7972915806348847ecd9a9b8a1b1ff0609cb883c (diff)
Remove eduserver (#2306)
* Move receipt sending to own JetStream producer * Move SendToDevice to producer * Remove most parts of the EDU server * Fix SendToDevice & copyrights * Move structs, cleanup EDU Server traces * Use HeadersOnly subscription * Missing file * Fix linter issues * Move consumers to own files * Rename durable consumer; Consumer cleanup * Docs/config cleanup
Diffstat (limited to 'clientapi/producers')
-rw-r--r--clientapi/producers/syncapi.go124
1 files changed, 120 insertions, 4 deletions
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go
index 9ab90391..2dee04e3 100644
--- a/clientapi/producers/syncapi.go
+++ b/clientapi/producers/syncapi.go
@@ -15,24 +15,34 @@
package producers
import (
+ "context"
"encoding/json"
+ "strconv"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// SyncAPIProducer produces events for the sync API server to consume
type SyncAPIProducer struct {
- Topic string
- JetStream nats.JetStreamContext
+ TopicClientData string
+ TopicReceiptEvent string
+ TopicSendToDeviceEvent string
+ TopicTypingEvent string
+ JetStream nats.JetStreamContext
+ ServerName gomatrixserverlib.ServerName
+ UserAPI userapi.UserInternalAPI
}
// SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON) error {
m := &nats.Msg{
- Subject: p.Topic,
+ Subject: p.TopicClientData,
Header: nats.Header{},
}
m.Header.Set(jetstream.UserID, userID)
@@ -52,8 +62,114 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string
"user_id": userID,
"room_id": roomID,
"data_type": dataType,
- }).Tracef("Producing to topic '%s'", p.Topic)
+ }).Tracef("Producing to topic '%s'", p.TopicClientData)
_, err = p.JetStream.PublishMsg(m)
return err
}
+
+func (p *SyncAPIProducer) SendReceipt(
+ ctx context.Context,
+ userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp,
+) error {
+ m := &nats.Msg{
+ Subject: p.TopicReceiptEvent,
+ Header: nats.Header{},
+ }
+ m.Header.Set(jetstream.UserID, userID)
+ m.Header.Set(jetstream.RoomID, roomID)
+ m.Header.Set(jetstream.EventID, eventID)
+ m.Header.Set("type", receiptType)
+ m.Header.Set("timestamp", strconv.Itoa(int(timestamp)))
+
+ log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent)
+ _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
+ return err
+}
+
+func (p *SyncAPIProducer) SendToDevice(
+ ctx context.Context, sender, userID, deviceID, eventType string,
+ message interface{},
+) error {
+ devices := []string{}
+ _, domain, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ return err
+ }
+
+ // If the event is targeted locally then we want to expand the wildcard
+ // out into individual device IDs so that we can send them to each respective
+ // device. If the event isn't targeted locally then we can't expand the
+ // wildcard as we don't know about the remote devices, so instead we leave it
+ // as-is, so that the federation sender can send it on with the wildcard intact.
+ if domain == p.ServerName && deviceID == "*" {
+ var res userapi.QueryDevicesResponse
+ err = p.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{
+ UserID: userID,
+ }, &res)
+ if err != nil {
+ return err
+ }
+ for _, dev := range res.Devices {
+ devices = append(devices, dev.ID)
+ }
+ } else {
+ devices = append(devices, deviceID)
+ }
+
+ js, err := json.Marshal(message)
+ if err != nil {
+ return err
+ }
+
+ log.WithFields(log.Fields{
+ "user_id": userID,
+ "num_devices": len(devices),
+ "type": eventType,
+ }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
+ for _, device := range devices {
+ ote := &types.OutputSendToDeviceEvent{
+ UserID: userID,
+ DeviceID: device,
+ SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
+ Sender: sender,
+ Type: eventType,
+ Content: js,
+ },
+ }
+
+ eventJSON, err := json.Marshal(ote)
+ if err != nil {
+ log.WithError(err).Error("sendToDevice failed json.Marshal")
+ return err
+ }
+ m := &nats.Msg{
+ Subject: p.TopicSendToDeviceEvent,
+ Data: eventJSON,
+ Header: nats.Header{},
+ }
+ m.Header.Set("sender", sender)
+ m.Header.Set(jetstream.UserID, userID)
+ if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
+ log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
+ return err
+ }
+ }
+ return nil
+}
+
+func (p *SyncAPIProducer) SendTyping(
+ ctx context.Context, userID, roomID string, typing bool, timeoutMS int64,
+) error {
+ m := &nats.Msg{
+ Subject: p.TopicTypingEvent,
+ Header: nats.Header{},
+ }
+ m.Header.Set(jetstream.UserID, userID)
+ m.Header.Set(jetstream.RoomID, roomID)
+ m.Header.Set("typing", strconv.FormatBool(typing))
+ m.Header.Set("timeout_ms", strconv.Itoa(int(timeoutMS)))
+
+ _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
+ return err
+}