diff options
author | Till <2353100+S7evinK@users.noreply.github.com> | 2022-09-13 09:35:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-13 09:35:45 +0200 |
commit | c366ccdfcaf1ea820bef4744fda90aff4db1c67d (patch) | |
tree | a8a7a6134b98797926b6e232ff748d81dd4bc8aa /clientapi | |
parent | 100fa9b2354efa05b4fbff3a3cb745ea7783d41c (diff) |
Send-to-device consumer/producer tweaks (#2713)
Some tweaks for the send-to-device consumers/producers:
- use `json.RawMessage` without marshalling it first
- try further devices (if available) if we failed to `PublishMsg` in the
producers
- some logging changes (to better debug E2EE issues)
Diffstat (limited to 'clientapi')
-rw-r--r-- | clientapi/producers/syncapi.go | 32 |
1 files changed, 15 insertions, 17 deletions
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go index 5933ce1a..2dc0c484 100644 --- a/clientapi/producers/syncapi.go +++ b/clientapi/producers/syncapi.go @@ -21,12 +21,13 @@ import ( "strconv" "time" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/syncapi/types" - userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" log "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/setup/jetstream" + "github.com/matrix-org/dendrite/syncapi/types" + userapi "github.com/matrix-org/dendrite/userapi/api" ) // SyncAPIProducer produces events for the sync API server to consume @@ -61,7 +62,7 @@ func (p *SyncAPIProducer) SendReceipt( func (p *SyncAPIProducer) SendToDevice( ctx context.Context, sender, userID, deviceID, eventType string, - message interface{}, + message json.RawMessage, ) error { devices := []string{} _, domain, err := gomatrixserverlib.SplitID('@', userID) @@ -89,24 +90,19 @@ func (p *SyncAPIProducer) SendToDevice( devices = append(devices, deviceID) } - js, err := json.Marshal(message) - if err != nil { - return err - } - log.WithFields(log.Fields{ "user_id": userID, "num_devices": len(devices), "type": eventType, }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent) - for _, device := range devices { + for i, device := range devices { ote := &types.OutputSendToDeviceEvent{ UserID: userID, DeviceID: device, SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ Sender: sender, Type: eventType, - Content: js, + Content: message, }, } @@ -115,15 +111,17 @@ func (p *SyncAPIProducer) SendToDevice( log.WithError(err).Error("sendToDevice failed json.Marshal") return err } - m := &nats.Msg{ - Subject: p.TopicSendToDeviceEvent, - Data: eventJSON, - Header: nats.Header{}, - } + m := nats.NewMsg(p.TopicSendToDeviceEvent) + m.Data = eventJSON m.Header.Set("sender", sender) m.Header.Set(jetstream.UserID, userID) + if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil { - log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage") + if i < len(devices)-1 { + log.WithError(err).Warn("sendToDevice failed to PublishMsg, trying further devices") + continue + } + log.WithError(err).Error("sendToDevice failed to PublishMsg for all devices") return err } } |