aboutsummaryrefslogtreecommitdiff
path: root/eduserver/input/input.go
diff options
context:
space:
mode:
Diffstat (limited to 'eduserver/input/input.go')
-rw-r--r--eduserver/input/input.go51
1 files changed, 21 insertions, 30 deletions
diff --git a/eduserver/input/input.go b/eduserver/input/input.go
index bdc24374..e7501a90 100644
--- a/eduserver/input/input.go
+++ b/eduserver/input/input.go
@@ -21,12 +21,12 @@ import (
"encoding/json"
"time"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
@@ -43,7 +43,7 @@ type EDUServerInputAPI struct {
// The kafka topic to output new key change events to
OutputKeyChangeEventTopic string
// kafka producer
- Producer sarama.SyncProducer
+ JetStream nats.JetStreamContext
// Internal user query API
UserAPI userapi.UserInternalAPI
// our server name
@@ -100,13 +100,11 @@ func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate(
"user_id": request.UserID,
}).Infof("Producing to topic '%s'", t.OutputKeyChangeEventTopic)
- m := &sarama.ProducerMessage{
- Topic: string(t.OutputKeyChangeEventTopic),
- Key: sarama.StringEncoder(request.UserID),
- Value: sarama.ByteEncoder(eventJSON),
- }
-
- _, _, err = t.Producer.SendMessage(m)
+ _, err = t.JetStream.PublishMsg(&nats.Msg{
+ Subject: t.OutputKeyChangeEventTopic,
+ Header: nats.Header{},
+ Data: eventJSON,
+ })
return err
}
@@ -138,13 +136,11 @@ func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
"typing": ite.Typing,
}).Infof("Producing to topic '%s'", t.OutputTypingEventTopic)
- m := &sarama.ProducerMessage{
- Topic: string(t.OutputTypingEventTopic),
- Key: sarama.StringEncoder(ite.RoomID),
- Value: sarama.ByteEncoder(eventJSON),
- }
-
- _, _, err = t.Producer.SendMessage(m)
+ _, err = t.JetStream.PublishMsg(&nats.Msg{
+ Subject: t.OutputTypingEventTopic,
+ Header: nats.Header{},
+ Data: eventJSON,
+ })
return err
}
@@ -193,14 +189,10 @@ func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) e
return err
}
- m := &sarama.ProducerMessage{
- Topic: string(t.OutputSendToDeviceEventTopic),
- Key: sarama.StringEncoder(ote.UserID),
- Value: sarama.ByteEncoder(eventJSON),
- }
-
- _, _, err = t.Producer.SendMessage(m)
- if err != nil {
+ if _, err = t.JetStream.PublishMsg(&nats.Msg{
+ Subject: t.OutputSendToDeviceEventTopic,
+ Data: eventJSON,
+ }); err != nil {
logrus.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
return err
}
@@ -228,11 +220,10 @@ func (t *EDUServerInputAPI) InputReceiptEvent(
if err != nil {
return err
}
- m := &sarama.ProducerMessage{
- Topic: t.OutputReceiptEventTopic,
- Key: sarama.StringEncoder(request.InputReceiptEvent.RoomID + ":" + request.InputReceiptEvent.UserID),
- Value: sarama.ByteEncoder(js),
- }
- _, _, err = t.Producer.SendMessage(m)
+
+ _, err = t.JetStream.PublishMsg(&nats.Msg{
+ Subject: t.OutputReceiptEventTopic,
+ Data: js,
+ })
return err
}