aboutsummaryrefslogtreecommitdiff
path: root/clientapi
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-09-13 09:35:45 +0200
committerGitHub <noreply@github.com>2022-09-13 09:35:45 +0200
commitc366ccdfcaf1ea820bef4744fda90aff4db1c67d (patch)
treea8a7a6134b98797926b6e232ff748d81dd4bc8aa /clientapi
parent100fa9b2354efa05b4fbff3a3cb745ea7783d41c (diff)
Send-to-device consumer/producer tweaks (#2713)
Some tweaks for the send-to-device consumers/producers: - use `json.RawMessage` without marshalling it first - try further devices (if available) if we failed to `PublishMsg` in the producers - some logging changes (to better debug E2EE issues)
Diffstat (limited to 'clientapi')
-rw-r--r--clientapi/producers/syncapi.go32
1 files changed, 15 insertions, 17 deletions
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go
index 5933ce1a..2dc0c484 100644
--- a/clientapi/producers/syncapi.go
+++ b/clientapi/producers/syncapi.go
@@ -21,12 +21,13 @@ import (
"strconv"
"time"
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/syncapi/types"
- userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
+
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
)
// SyncAPIProducer produces events for the sync API server to consume
@@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt(
func (p *SyncAPIProducer) SendToDevice(
ctx context.Context, sender, userID, deviceID, eventType string,
- message interface{},
+ message json.RawMessage,
) error {
devices := []string{}
_, domain, err := gomatrixserverlib.SplitID('@', userID)
@@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice(
devices = append(devices, deviceID)
}
- js, err := json.Marshal(message)
- if err != nil {
- return err
- }
-
log.WithFields(log.Fields{
"user_id": userID,
"num_devices": len(devices),
"type": eventType,
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
- for _, device := range devices {
+ for i, device := range devices {
ote := &types.OutputSendToDeviceEvent{
UserID: userID,
DeviceID: device,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
Sender: sender,
Type: eventType,
- Content: js,
+ Content: message,
},
}
@@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice(
log.WithError(err).Error("sendToDevice failed json.Marshal")
return err
}
- m := &nats.Msg{
- Subject: p.TopicSendToDeviceEvent,
- Data: eventJSON,
- Header: nats.Header{},
- }
+ m := nats.NewMsg(p.TopicSendToDeviceEvent)
+ m.Data = eventJSON
m.Header.Set("sender", sender)
m.Header.Set(jetstream.UserID, userID)
+
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
- log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
+ if i < len(devices)-1 {
+ log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
+ continue
+ }
+ log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
return err
}
}