aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2022-09-13 09:35:45 +0200
committerGitHub <noreply@github.com>2022-09-13 09:35:45 +0200
commitc366ccdfcaf1ea820bef4744fda90aff4db1c67d (patch)
treea8a7a6134b98797926b6e232ff748d81dd4bc8aa
parent100fa9b2354efa05b4fbff3a3cb745ea7783d41c (diff)
Send-to-device consumer/producer tweaks (#2713)
Some tweaks for the send-to-device consumers/producers: - use `json.RawMessage` without marshalling it first - try further devices (if available) if we failed to `PublishMsg` in the producers - some logging changes (to better debug E2EE issues)
-rw-r--r--clientapi/producers/syncapi.go32
-rw-r--r--federationapi/producers/syncapi.go24
-rw-r--r--syncapi/consumers/sendtodevice.go15
-rw-r--r--syncapi/syncapi_test.go4
4 files changed, 35 insertions, 40 deletions
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go
index 5933ce1a..2dc0c484 100644
--- a/clientapi/producers/syncapi.go
+++ b/clientapi/producers/syncapi.go
@@ -21,12 +21,13 @@ import (
"strconv"
"time"
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/syncapi/types"
- userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
+
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
)
// SyncAPIProducer produces events for the sync API server to consume
@@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt(
func (p *SyncAPIProducer) SendToDevice(
ctx context.Context, sender, userID, deviceID, eventType string,
- message interface{},
+ message json.RawMessage,
) error {
devices := []string{}
_, domain, err := gomatrixserverlib.SplitID('@', userID)
@@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice(
devices = append(devices, deviceID)
}
- js, err := json.Marshal(message)
- if err != nil {
- return err
- }
-
log.WithFields(log.Fields{
"user_id": userID,
"num_devices": len(devices),
"type": eventType,
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
- for _, device := range devices {
+ for i, device := range devices {
ote := &types.OutputSendToDeviceEvent{
UserID: userID,
DeviceID: device,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
Sender: sender,
Type: eventType,
- Content: js,
+ Content: message,
},
}
@@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice(
log.WithError(err).Error("sendToDevice failed json.Marshal")
return err
}
- m := &nats.Msg{
- Subject: p.TopicSendToDeviceEvent,
- Data: eventJSON,
- Header: nats.Header{},
- }
+ m := nats.NewMsg(p.TopicSendToDeviceEvent)
+ m.Data = eventJSON
m.Header.Set("sender", sender)
m.Header.Set(jetstream.UserID, userID)
+
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
- log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
+ if i < len(devices)-1 {
+ log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
+ continue
+ }
+ log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
return err
}
}
diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go
index 86c8c10a..4abd3fbe 100644
--- a/federationapi/producers/syncapi.go
+++ b/federationapi/producers/syncapi.go
@@ -64,7 +64,7 @@ func (p *SyncAPIProducer) SendReceipt(
func (p *SyncAPIProducer) SendToDevice(
ctx context.Context, sender, userID, deviceID, eventType string,
- message interface{},
+ message json.RawMessage,
) error {
devices := []string{}
_, domain, err := gomatrixserverlib.SplitID('@', userID)
@@ -92,24 +92,19 @@ func (p *SyncAPIProducer) SendToDevice(
devices = append(devices, deviceID)
}
- js, err := json.Marshal(message)
- if err != nil {
- return err
- }
-
log.WithFields(log.Fields{
"user_id": userID,
"num_devices": len(devices),
"type": eventType,
}).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
- for _, device := range devices {
+ for i, device := range devices {
ote := &types.OutputSendToDeviceEvent{
UserID: userID,
DeviceID: device,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
Sender: sender,
Type: eventType,
- Content: js,
+ Content: message,
},
}
@@ -118,16 +113,17 @@ func (p *SyncAPIProducer) SendToDevice(
log.WithError(err).Error("sendToDevice failed json.Marshal")
return err
}
- m := &nats.Msg{
- Subject: p.TopicSendToDeviceEvent,
- Data: eventJSON,
- Header: nats.Header{},
- }
+ m := nats.NewMsg(p.TopicSendToDeviceEvent)
+ m.Data = eventJSON
m.Header.Set("sender", sender)
m.Header.Set(jetstream.UserID, userID)
if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
- log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
+ if i < len(devices)-1 {
+ log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices")
+ continue
+ }
+ log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices")
return err
}
}
diff --git a/syncapi/consumers/sendtodevice.go b/syncapi/consumers/sendtodevice.go
index 89b01d7e..7d6aae59 100644
--- a/syncapi/consumers/sendtodevice.go
+++ b/syncapi/consumers/sendtodevice.go
@@ -19,16 +19,17 @@ import (
"encoding/json"
"github.com/getsentry/sentry-go"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
- "github.com/nats-io/nats.go"
- log "github.com/sirupsen/logrus"
)
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
@@ -79,16 +80,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
_, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
sentry.CaptureException(err)
+ log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message")
return true
}
if domain != s.serverName {
+ log.Tracef("ignoring send-to-device event with destination %s", domain)
return true
}
var output types.OutputSendToDeviceEvent
if err = json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("output log: message parse failure")
+ log.WithError(err).Errorf("send-to-device: message parse failure")
sentry.CaptureException(err)
return true
}
@@ -105,7 +108,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
)
if err != nil {
sentry.CaptureException(err)
- log.WithError(err).Errorf("failed to store send-to-device message")
+ log.WithError(err).Errorf("send-to-device: failed to store message")
return false
}
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index c81256aa..a4985dbf 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -624,9 +624,7 @@ func testSendToDevice(t *testing.T, dbType test.DBType) {
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
for i := 0; i < tc.sendMessagesCount; i++ {
msgCounter++
- msg := map[string]string{
- "dummy": fmt.Sprintf("message %d", msgCounter),
- }
+ msg := json.RawMessage(fmt.Sprintf(`{"dummy":"message %d"}`, msgCounter))
if err := producer.SendToDevice(ctx, user.ID, user.ID, alice.ID, "m.dendrite.test", msg); err != nil {
t.Fatalf("unable to send to device message: %v", err)
}