aboutsummaryrefslogtreecommitdiff
path: root/clientapi
diff options
context:
space:
mode:
Diffstat (limited to 'clientapi')
-rw-r--r--clientapi/clientapi.go8
-rw-r--r--clientapi/producers/syncapi.go21
-rw-r--r--clientapi/routing/createroom.go1
-rw-r--r--clientapi/routing/membership.go1
-rw-r--r--clientapi/routing/profile.go4
-rw-r--r--clientapi/routing/redaction.go2
-rw-r--r--clientapi/routing/sendevent.go1
-rw-r--r--clientapi/threepid/invites.go1
8 files changed, 23 insertions, 16 deletions
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index 64a7aa5e..7c772125 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -26,7 +26,7 @@ import (
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/kafka"
+ "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/gomatrixserverlib"
@@ -49,11 +49,11 @@ func AddPublicRoutes(
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
) {
- _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
+ js, _, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{
- Producer: producer,
- Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData),
+ JetStream: js,
+ Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
}
routing.Setup(
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go
index 6ab8eef2..bd6af5f1 100644
--- a/clientapi/producers/syncapi.go
+++ b/clientapi/producers/syncapi.go
@@ -17,39 +17,42 @@ package producers
import (
"encoding/json"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// SyncAPIProducer produces events for the sync API server to consume
type SyncAPIProducer struct {
- Topic string
- Producer sarama.SyncProducer
+ Topic string
+ JetStream nats.JetStreamContext
}
// SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error {
- var m sarama.ProducerMessage
+ m := &nats.Msg{
+ Subject: p.Topic,
+ Header: nats.Header{},
+ }
+ m.Header.Set(jetstream.UserID, userID)
data := eventutil.AccountData{
RoomID: roomID,
Type: dataType,
}
- value, err := json.Marshal(data)
+ var err error
+ m.Data, err = json.Marshal(data)
if err != nil {
return err
}
- m.Topic = string(p.Topic)
- m.Key = sarama.StringEncoder(userID)
- m.Value = sarama.ByteEncoder(value)
log.WithFields(log.Fields{
"user_id": userID,
"room_id": roomID,
"data_type": dataType,
}).Infof("Producing to topic '%s'", p.Topic)
- _, _, err = p.Producer.SendMessage(&m)
+ _, err = p.JetStream.PublishMsg(m)
return err
}
diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go
index 8f96c3d3..85331192 100644
--- a/clientapi/routing/createroom.go
+++ b/clientapi/routing/createroom.go
@@ -463,6 +463,7 @@ func createRoom(
},
ev.Headered(roomVersion),
nil,
+ false,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEventWithState failed")
return jsonerror.InternalServerError()
diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go
index 33fb3883..7ddb827e 100644
--- a/clientapi/routing/membership.go
+++ b/clientapi/routing/membership.go
@@ -110,6 +110,7 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us
[]*gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)},
cfg.Matrix.ServerName,
nil,
+ false,
); err != nil {
util.GetLogger(ctx).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go
index 7bea35e5..9de1869b 100644
--- a/clientapi/routing/profile.go
+++ b/clientapi/routing/profile.go
@@ -169,7 +169,7 @@ func SetAvatarURL(
return jsonerror.InternalServerError()
}
- if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
+ if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@@ -286,7 +286,7 @@ func SetDisplayName(
return jsonerror.InternalServerError()
}
- if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
+ if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
diff --git a/clientapi/routing/redaction.go b/clientapi/routing/redaction.go
index c25ca4ef..8492236b 100644
--- a/clientapi/routing/redaction.go
+++ b/clientapi/routing/redaction.go
@@ -120,7 +120,7 @@ func SendRedaction(
JSON: jsonerror.NotFound("Room does not exist"),
}
}
- if err = roomserverAPI.SendEvents(context.Background(), rsAPI, roomserverAPI.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil); err != nil {
+ if err = roomserverAPI.SendEvents(context.Background(), rsAPI, roomserverAPI.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents")
return jsonerror.InternalServerError()
}
diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go
index 204d2592..f0498312 100644
--- a/clientapi/routing/sendevent.go
+++ b/clientapi/routing/sendevent.go
@@ -122,6 +122,7 @@ func SendEvent(
},
cfg.Matrix.ServerName,
txnAndSessionID,
+ false,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go
index 53cd6b8c..985cf00c 100644
--- a/clientapi/threepid/invites.go
+++ b/clientapi/threepid/invites.go
@@ -367,5 +367,6 @@ func emit3PIDInviteEvent(
},
cfg.Matrix.ServerName,
nil,
+ false,
)
}