aboutsummaryrefslogtreecommitdiff
path: root/clientapi
diff options
context:
space:
mode:
authorS7evinK <tfaelligen@gmail.com>2022-01-05 18:44:49 +0100
committerGitHub <noreply@github.com>2022-01-05 17:44:49 +0000
commit161f14517669410d3e8207dc41eea5c9695f7e17 (patch)
tree20db8ed83d92c688206242f84880ff2e35a1d5eb /clientapi
parenta47b12dc7d692e0ddd4aaa0801dafc9bb462aad9 (diff)
Add NATS JetStream support (#1866)
* Add NATS JetStream support Update shopify/sarama * Fix addresses * Don't change Addresses in Defaults * Update saramajetstream * Add missing error check Keep typing events for at least one minute * Use all configured NATS addresses * Update saramajetstream * Try setting up with NATS * Make sure NATS uses own persistent directory (TODO: make this configurable) * Update go.mod/go.sum * Jetstream package * Various other refactoring * Build fixes * Config tweaks, make random jetstream storage path for CI * Disable interest policies * Try to sane default on jetstream base path * Try to use in-memory for CI * Restore storage/retention * Update nats.go dependency * Adapt changes to config * Remove unneeded TopicFor * Dep update * Revert "Remove unneeded TopicFor" This reverts commit f5a4e4a339b6f94ec215778dca22204adaa893d1. * Revert changes made to streams * Fix build problems * Update nats-server * Update go.mod/go.sum * Roomserver input API queuing using NATS * Fix topic naming * Prometheus metrics * More refactoring to remove saramajetstream * Add missing topic * Don't try to populate map that doesn't exist * Roomserver output topic * Update go.mod/go.sum * Message acknowledgements * Ack tweaks * Try to resume transaction re-sends * Try to resume transaction re-sends * Update to matrix-org/gomatrixserverlib@91dadfb * Remove internal.PartitionStorer from components that don't consume keychanges * Try to reduce re-allocations a bit in resolveConflictsV2 * Tweak delivery options on RS input * Publish send-to-device messages into correct JetStream subject * Async and sync roomserver input * Update dendrite-config.yaml * Remove roomserver tests for now (they need rewriting) * Remove roomserver test again (was merged back in) * Update documentation * Docker updates * More Docker updates * Update Docker readme again * Fix lint issues * Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset) * Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that * Go 1.16 instead of Go 1.13 for upgrade tests and Complement * Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that" This reverts commit 368675283fc44501f227639811bdb16dd5deef8c. * Don't report any errors on `/send` to see what fun that creates * Fix panics on closed channel sends * Enforce state key matches sender * Do the same for leave * Various tweaks to make tests happier Squashed commit of the following: commit 13f9028e7a63662759ce7c55504a9d2423058668 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 15:47:14 2022 +0000 Do the same for leave commit e6be7f05c349fafbdddfe818337a17a60c867be1 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 15:33:42 2022 +0000 Enforce state key matches sender commit 85ede6d64bf10ce9b91cdd6d80f87350ee55242f Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 14:07:04 2022 +0000 Fix panics on closed channel sends commit 9755494a98bed62450f8001d8128e40481d27e15 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 13:38:22 2022 +0000 Don't report any errors on `/send` to see what fun that creates commit 3bb4f87b5dd56882febb4db5621db484c8789b7c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 13:00:26 2022 +0000 Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that" This reverts commit 368675283fc44501f227639811bdb16dd5deef8c. commit fe2673ed7be9559eaca134424e403a4faca100b0 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 12:09:34 2022 +0000 Go 1.16 instead of Go 1.13 for upgrade tests and Complement commit 368675283fc44501f227639811bdb16dd5deef8c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 11:51:45 2022 +0000 Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that commit b028dfc08577bcf52e6cb498026e15fa5d46d07c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 10:29:08 2022 +0000 Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset) * Merge in NATS Server v2.6.6 and nats.go v1.13 into the in-process connection fork * Add `jetstream.WithJetStreamMessage` to make ack/nak-ing less messy, use process context in consumers * Fix consumer component name in federation API * Add comment explaining where streams are defined * Tweaks to roomserver input with comments * Finish that sentence that I apparently forgot to finish in INSTALL.md * Bump version number of config to 2 * Add comments around asynchronous sends to roomserver in processEventWithMissingState * More useful error message when the config version does not match * Set version in generate-config * Fix version in config.Defaults Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'clientapi')
-rw-r--r--clientapi/clientapi.go8
-rw-r--r--clientapi/producers/syncapi.go21
-rw-r--r--clientapi/routing/createroom.go1
-rw-r--r--clientapi/routing/membership.go1
-rw-r--r--clientapi/routing/profile.go4
-rw-r--r--clientapi/routing/redaction.go2
-rw-r--r--clientapi/routing/sendevent.go1
-rw-r--r--clientapi/threepid/invites.go1
8 files changed, 23 insertions, 16 deletions
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index 64a7aa5e..7c772125 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -26,7 +26,7 @@ import (
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/kafka"
+ "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/gomatrixserverlib"
@@ -49,11 +49,11 @@ func AddPublicRoutes(
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
) {
- _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
+ js, _, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{
- Producer: producer,
- Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData),
+ JetStream: js,
+ Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
}
routing.Setup(
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go
index 6ab8eef2..bd6af5f1 100644
--- a/clientapi/producers/syncapi.go
+++ b/clientapi/producers/syncapi.go
@@ -17,39 +17,42 @@ package producers
import (
"encoding/json"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// SyncAPIProducer produces events for the sync API server to consume
type SyncAPIProducer struct {
- Topic string
- Producer sarama.SyncProducer
+ Topic string
+ JetStream nats.JetStreamContext
}
// SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string) error {
- var m sarama.ProducerMessage
+ m := &nats.Msg{
+ Subject: p.Topic,
+ Header: nats.Header{},
+ }
+ m.Header.Set(jetstream.UserID, userID)
data := eventutil.AccountData{
RoomID: roomID,
Type: dataType,
}
- value, err := json.Marshal(data)
+ var err error
+ m.Data, err = json.Marshal(data)
if err != nil {
return err
}
- m.Topic = string(p.Topic)
- m.Key = sarama.StringEncoder(userID)
- m.Value = sarama.ByteEncoder(value)
log.WithFields(log.Fields{
"user_id": userID,
"room_id": roomID,
"data_type": dataType,
}).Infof("Producing to topic '%s'", p.Topic)
- _, _, err = p.Producer.SendMessage(&m)
+ _, err = p.JetStream.PublishMsg(m)
return err
}
diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go
index 8f96c3d3..85331192 100644
--- a/clientapi/routing/createroom.go
+++ b/clientapi/routing/createroom.go
@@ -463,6 +463,7 @@ func createRoom(
},
ev.Headered(roomVersion),
nil,
+ false,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEventWithState failed")
return jsonerror.InternalServerError()
diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go
index 33fb3883..7ddb827e 100644
--- a/clientapi/routing/membership.go
+++ b/clientapi/routing/membership.go
@@ -110,6 +110,7 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us
[]*gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)},
cfg.Matrix.ServerName,
nil,
+ false,
); err != nil {
util.GetLogger(ctx).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go
index 7bea35e5..9de1869b 100644
--- a/clientapi/routing/profile.go
+++ b/clientapi/routing/profile.go
@@ -169,7 +169,7 @@ func SetAvatarURL(
return jsonerror.InternalServerError()
}
- if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
+ if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@@ -286,7 +286,7 @@ func SetDisplayName(
return jsonerror.InternalServerError()
}
- if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
+ if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
diff --git a/clientapi/routing/redaction.go b/clientapi/routing/redaction.go
index c25ca4ef..8492236b 100644
--- a/clientapi/routing/redaction.go
+++ b/clientapi/routing/redaction.go
@@ -120,7 +120,7 @@ func SendRedaction(
JSON: jsonerror.NotFound("Room does not exist"),
}
}
- if err = roomserverAPI.SendEvents(context.Background(), rsAPI, roomserverAPI.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil); err != nil {
+ if err = roomserverAPI.SendEvents(context.Background(), rsAPI, roomserverAPI.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents")
return jsonerror.InternalServerError()
}
diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go
index 204d2592..f0498312 100644
--- a/clientapi/routing/sendevent.go
+++ b/clientapi/routing/sendevent.go
@@ -122,6 +122,7 @@ func SendEvent(
},
cfg.Matrix.ServerName,
txnAndSessionID,
+ false,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go
index 53cd6b8c..985cf00c 100644
--- a/clientapi/threepid/invites.go
+++ b/clientapi/threepid/invites.go
@@ -367,5 +367,6 @@ func emit3PIDInviteEvent(
},
cfg.Matrix.ServerName,
nil,
+ false,
)
}