aboutsummaryrefslogtreecommitdiff
path: root/federationapi
diff options
context:
space:
mode:
authorS7evinK <tfaelligen@gmail.com>2022-01-05 18:44:49 +0100
committerGitHub <noreply@github.com>2022-01-05 17:44:49 +0000
commit161f14517669410d3e8207dc41eea5c9695f7e17 (patch)
tree20db8ed83d92c688206242f84880ff2e35a1d5eb /federationapi
parenta47b12dc7d692e0ddd4aaa0801dafc9bb462aad9 (diff)
Add NATS JetStream support (#1866)
* Add NATS JetStream support Update shopify/sarama * Fix addresses * Don't change Addresses in Defaults * Update saramajetstream * Add missing error check Keep typing events for at least one minute * Use all configured NATS addresses * Update saramajetstream * Try setting up with NATS * Make sure NATS uses own persistent directory (TODO: make this configurable) * Update go.mod/go.sum * Jetstream package * Various other refactoring * Build fixes * Config tweaks, make random jetstream storage path for CI * Disable interest policies * Try to sane default on jetstream base path * Try to use in-memory for CI * Restore storage/retention * Update nats.go dependency * Adapt changes to config * Remove unneeded TopicFor * Dep update * Revert "Remove unneeded TopicFor" This reverts commit f5a4e4a339b6f94ec215778dca22204adaa893d1. * Revert changes made to streams * Fix build problems * Update nats-server * Update go.mod/go.sum * Roomserver input API queuing using NATS * Fix topic naming * Prometheus metrics * More refactoring to remove saramajetstream * Add missing topic * Don't try to populate map that doesn't exist * Roomserver output topic * Update go.mod/go.sum * Message acknowledgements * Ack tweaks * Try to resume transaction re-sends * Try to resume transaction re-sends * Update to matrix-org/gomatrixserverlib@91dadfb * Remove internal.PartitionStorer from components that don't consume keychanges * Try to reduce re-allocations a bit in resolveConflictsV2 * Tweak delivery options on RS input * Publish send-to-device messages into correct JetStream subject * Async and sync roomserver input * Update dendrite-config.yaml * Remove roomserver tests for now (they need rewriting) * Remove roomserver test again (was merged back in) * Update documentation * Docker updates * More Docker updates * Update Docker readme again * Fix lint issues * Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset) * Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that * Go 1.16 instead of Go 1.13 for upgrade tests and Complement * Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that" This reverts commit 368675283fc44501f227639811bdb16dd5deef8c. * Don't report any errors on `/send` to see what fun that creates * Fix panics on closed channel sends * Enforce state key matches sender * Do the same for leave * Various tweaks to make tests happier Squashed commit of the following: commit 13f9028e7a63662759ce7c55504a9d2423058668 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 15:47:14 2022 +0000 Do the same for leave commit e6be7f05c349fafbdddfe818337a17a60c867be1 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 15:33:42 2022 +0000 Enforce state key matches sender commit 85ede6d64bf10ce9b91cdd6d80f87350ee55242f Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 14:07:04 2022 +0000 Fix panics on closed channel sends commit 9755494a98bed62450f8001d8128e40481d27e15 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 13:38:22 2022 +0000 Don't report any errors on `/send` to see what fun that creates commit 3bb4f87b5dd56882febb4db5621db484c8789b7c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 13:00:26 2022 +0000 Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that" This reverts commit 368675283fc44501f227639811bdb16dd5deef8c. commit fe2673ed7be9559eaca134424e403a4faca100b0 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 12:09:34 2022 +0000 Go 1.16 instead of Go 1.13 for upgrade tests and Complement commit 368675283fc44501f227639811bdb16dd5deef8c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 11:51:45 2022 +0000 Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that commit b028dfc08577bcf52e6cb498026e15fa5d46d07c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 10:29:08 2022 +0000 Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset) * Merge in NATS Server v2.6.6 and nats.go v1.13 into the in-process connection fork * Add `jetstream.WithJetStreamMessage` to make ack/nak-ing less messy, use process context in consumers * Fix consumer component name in federation API * Add comment explaining where streams are defined * Tweaks to roomserver input with comments * Finish that sentence that I apparently forgot to finish in INSTALL.md * Bump version number of config to 2 * Add comments around asynchronous sends to roomserver in processEventWithMissingState * More useful error message when the config version does not match * Set version in generate-config * Fix version in config.Defaults Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'federationapi')
-rw-r--r--federationapi/consumers/eduserver.go361
-rw-r--r--federationapi/consumers/keychange.go16
-rw-r--r--federationapi/consumers/roomserver.go144
-rw-r--r--federationapi/federationapi.go8
-rw-r--r--federationapi/federationapi_keys_test.go2
-rw-r--r--federationapi/federationapi_test.go5
-rw-r--r--federationapi/internal/perform.go4
-rw-r--r--federationapi/routing/join.go6
-rw-r--r--federationapi/routing/leave.go10
-rw-r--r--federationapi/routing/send.go10
-rw-r--r--federationapi/routing/threepid.go3
11 files changed, 298 insertions, 271 deletions
diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go
index 56ec9eaf..9e52acef 100644
--- a/federationapi/consumers/eduserver.go
+++ b/federationapi/consumers/eduserver.go
@@ -17,233 +17,236 @@ package consumers
import (
"context"
"encoding/json"
- "fmt"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
+ "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputEDUConsumer consumes events that originate in EDU server.
type OutputEDUConsumer struct {
- typingConsumer *internal.ContinualConsumer
- sendToDeviceConsumer *internal.ContinualConsumer
- receiptConsumer *internal.ContinualConsumer
- db storage.Database
- queues *queue.OutgoingQueues
- ServerName gomatrixserverlib.ServerName
- TypingTopic string
- SendToDeviceTopic string
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ db storage.Database
+ queues *queue.OutgoingQueues
+ ServerName gomatrixserverlib.ServerName
+ typingTopic string
+ sendToDeviceTopic string
+ receiptTopic string
}
// NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
func NewOutputEDUConsumer(
process *process.ProcessContext,
cfg *config.FederationAPI,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
queues *queue.OutgoingQueues,
store storage.Database,
) *OutputEDUConsumer {
- c := &OutputEDUConsumer{
- typingConsumer: &internal.ContinualConsumer{
- Process: process,
- ComponentName: "eduserver/typing",
- Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
- Consumer: kafkaConsumer,
- PartitionStore: store,
- },
- sendToDeviceConsumer: &internal.ContinualConsumer{
- Process: process,
- ComponentName: "eduserver/sendtodevice",
- Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
- Consumer: kafkaConsumer,
- PartitionStore: store,
- },
- receiptConsumer: &internal.ContinualConsumer{
- Process: process,
- ComponentName: "eduserver/receipt",
- Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
- Consumer: kafkaConsumer,
- PartitionStore: store,
- },
+ return &OutputEDUConsumer{
+ ctx: process.Context(),
+ jetstream: js,
queues: queues,
db: store,
ServerName: cfg.Matrix.ServerName,
- TypingTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
- SendToDeviceTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
+ typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
+ sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
+ receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
}
- c.typingConsumer.ProcessMessage = c.onTypingEvent
- c.sendToDeviceConsumer.ProcessMessage = c.onSendToDeviceEvent
- c.receiptConsumer.ProcessMessage = c.onReceiptEvent
-
- return c
}
// Start consuming from EDU servers
func (t *OutputEDUConsumer) Start() error {
- if err := t.typingConsumer.Start(); err != nil {
- return fmt.Errorf("t.typingConsumer.Start: %w", err)
+ if _, err := t.jetstream.Subscribe(t.typingTopic, t.onTypingEvent); err != nil {
+ return err
}
- if err := t.sendToDeviceConsumer.Start(); err != nil {
- return fmt.Errorf("t.sendToDeviceConsumer.Start: %w", err)
+ if _, err := t.jetstream.Subscribe(t.sendToDeviceTopic, t.onSendToDeviceEvent); err != nil {
+ return err
}
- if err := t.receiptConsumer.Start(); err != nil {
- return fmt.Errorf("t.receiptConsumer.Start: %w", err)
+ if _, err := t.jetstream.Subscribe(t.receiptTopic, t.onReceiptEvent); err != nil {
+ return err
}
return nil
}
// onSendToDeviceEvent is called in response to a message received on the
// send-to-device events topic from the EDU server.
-func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *sarama.ConsumerMessage) error {
+func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) {
// Extract the send-to-device event from msg.
- var ote api.OutputSendToDeviceEvent
- if err := json.Unmarshal(msg.Value, &ote); err != nil {
- log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
- return nil
- }
-
- // only send send-to-device events which originated from us
- _, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender)
- if err != nil {
- log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender")
- return nil
- }
- if originServerName != t.ServerName {
- log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere")
- return nil
- }
-
- _, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
- if err != nil {
- log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination")
- return nil
- }
-
- // Pack the EDU and marshal it
- edu := &gomatrixserverlib.EDU{
- Type: gomatrixserverlib.MDirectToDevice,
- Origin: string(t.ServerName),
- }
- tdm := gomatrixserverlib.ToDeviceMessage{
- Sender: ote.Sender,
- Type: ote.Type,
- MessageID: util.RandomString(32),
- Messages: map[string]map[string]json.RawMessage{
- ote.UserID: {
- ote.DeviceID: ote.Content,
+ jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
+ var ote api.OutputSendToDeviceEvent
+ if err := json.Unmarshal(msg.Data, &ote); err != nil {
+ log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
+ return true
+ }
+
+ // only send send-to-device events which originated from us
+ _, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender)
+ if err != nil {
+ log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender")
+ return true
+ }
+ if originServerName != t.ServerName {
+ log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere")
+ return true
+ }
+
+ _, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
+ if err != nil {
+ log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination")
+ return true
+ }
+
+ // Pack the EDU and marshal it
+ edu := &gomatrixserverlib.EDU{
+ Type: gomatrixserverlib.MDirectToDevice,
+ Origin: string(t.ServerName),
+ }
+ tdm := gomatrixserverlib.ToDeviceMessage{
+ Sender: ote.Sender,
+ Type: ote.Type,
+ MessageID: util.RandomString(32),
+ Messages: map[string]map[string]json.RawMessage{
+ ote.UserID: {
+ ote.DeviceID: ote.Content,
+ },
},
- },
- }
- if edu.Content, err = json.Marshal(tdm); err != nil {
- return err
- }
-
- log.Infof("Sending send-to-device message into %q destination queue", destServerName)
- return t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName})
+ }
+ if edu.Content, err = json.Marshal(tdm); err != nil {
+ log.WithError(err).Error("failed to marshal EDU JSON")
+ return true
+ }
+
+ log.Infof("Sending send-to-device message into %q destination queue", destServerName)
+ if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
+ log.WithError(err).Error("failed to send EDU")
+ return false
+ }
+
+ return true
+ })
}
// onTypingEvent is called in response to a message received on the typing
// events topic from the EDU server.
-func (t *OutputEDUConsumer) onTypingEvent(msg *sarama.ConsumerMessage) error {
- // Extract the typing event from msg.
- var ote api.OutputTypingEvent
- if err := json.Unmarshal(msg.Value, &ote); err != nil {
- // Skip this msg but continue processing messages.
- log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)")
- return nil
- }
-
- // only send typing events which originated from us
- _, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
- if err != nil {
- log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
- return nil
- }
- if typingServerName != t.ServerName {
- log.WithField("other_server", typingServerName).Info("Suppressing typing notif: originated elsewhere")
- return nil
- }
-
- joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID)
- if err != nil {
- return err
- }
-
- names := make([]gomatrixserverlib.ServerName, len(joined))
- for i := range joined {
- names[i] = joined[i].ServerName
- }
-
- edu := &gomatrixserverlib.EDU{Type: ote.Event.Type}
- if edu.Content, err = json.Marshal(map[string]interface{}{
- "room_id": ote.Event.RoomID,
- "user_id": ote.Event.UserID,
- "typing": ote.Event.Typing,
- }); err != nil {
- return err
- }
-
- return t.queues.SendEDU(edu, t.ServerName, names)
+func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) {
+ jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
+ // Extract the typing event from msg.
+ var ote api.OutputTypingEvent
+ if err := json.Unmarshal(msg.Data, &ote); err != nil {
+ // Skip this msg but continue processing messages.
+ log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)")
+ _ = msg.Ack()
+ return true
+ }
+
+ // only send typing events which originated from us
+ _, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
+ if err != nil {
+ log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
+ _ = msg.Ack()
+ return true
+ }
+ if typingServerName != t.ServerName {
+ return true
+ }
+
+ joined, err := t.db.GetJoinedHosts(t.ctx, ote.Event.RoomID)
+ if err != nil {
+ log.WithError(err).WithField("room_id", ote.Event.RoomID).Error("failed to get joined hosts for room")
+ return false
+ }
+
+ names := make([]gomatrixserverlib.ServerName, len(joined))
+ for i := range joined {
+ names[i] = joined[i].ServerName
+ }
+
+ edu := &gomatrixserverlib.EDU{Type: ote.Event.Type}
+ if edu.Content, err = json.Marshal(map[string]interface{}{
+ "room_id": ote.Event.RoomID,
+ "user_id": ote.Event.UserID,
+ "typing": ote.Event.Typing,
+ }); err != nil {
+ log.WithError(err).Error("failed to marshal EDU JSON")
+ return true
+ }
+
+ if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
+ log.WithError(err).Error("failed to send EDU")
+ return false
+ }
+
+ return true
+ })
}
// onReceiptEvent is called in response to a message received on the receipt
// events topic from the EDU server.
-func (t *OutputEDUConsumer) onReceiptEvent(msg *sarama.ConsumerMessage) error {
- // Extract the typing event from msg.
- var receipt api.OutputReceiptEvent
- if err := json.Unmarshal(msg.Value, &receipt); err != nil {
- // Skip this msg but continue processing messages.
- log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)")
- return nil
- }
-
- // only send receipt events which originated from us
- _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
- if err != nil {
- log.WithError(err).WithField("user_id", receipt.UserID).Error("Failed to extract domain from receipt sender")
- return nil
- }
- if receiptServerName != t.ServerName {
- return nil // don't log, very spammy as it logs for each remote receipt
- }
-
- joined, err := t.db.GetJoinedHosts(context.TODO(), receipt.RoomID)
- if err != nil {
- return err
- }
-
- names := make([]gomatrixserverlib.ServerName, len(joined))
- for i := range joined {
- names[i] = joined[i].ServerName
- }
-
- content := map[string]api.FederationReceiptMRead{}
- content[receipt.RoomID] = api.FederationReceiptMRead{
- User: map[string]api.FederationReceiptData{
- receipt.UserID: {
- Data: api.ReceiptTS{
- TS: receipt.Timestamp,
+func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) {
+ jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
+ // Extract the typing event from msg.
+ var receipt api.OutputReceiptEvent
+ if err := json.Unmarshal(msg.Data, &receipt); err != nil {
+ // Skip this msg but continue processing messages.
+ log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)")
+ return true
+ }
+
+ // only send receipt events which originated from us
+ _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
+ if err != nil {
+ log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
+ return true
+ }
+ if receiptServerName != t.ServerName {
+ return true
+ }
+
+ joined, err := t.db.GetJoinedHosts(t.ctx, receipt.RoomID)
+ if err != nil {
+ log.WithError(err).WithField("room_id", receipt.RoomID).Error("failed to get joined hosts for room")
+ return false
+ }
+
+ names := make([]gomatrixserverlib.ServerName, len(joined))
+ for i := range joined {
+ names[i] = joined[i].ServerName
+ }
+
+ content := map[string]api.FederationReceiptMRead{}
+ content[receipt.RoomID] = api.FederationReceiptMRead{
+ User: map[string]api.FederationReceiptData{
+ receipt.UserID: {
+ Data: api.ReceiptTS{
+ TS: receipt.Timestamp,
+ },
+ EventIDs: []string{receipt.EventID},
},
- EventIDs: []string{receipt.EventID},
},
- },
- }
-
- edu := &gomatrixserverlib.EDU{
- Type: gomatrixserverlib.MReceipt,
- Origin: string(t.ServerName),
- }
- if edu.Content, err = json.Marshal(content); err != nil {
- return err
- }
-
- return t.queues.SendEDU(edu, t.ServerName, names)
+ }
+
+ edu := &gomatrixserverlib.EDU{
+ Type: gomatrixserverlib.MReceipt,
+ Origin: string(t.ServerName),
+ }
+ if edu.Content, err = json.Marshal(content); err != nil {
+ log.WithError(err).Error("failed to marshal EDU JSON")
+ return true
+ }
+
+ if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
+ log.WithError(err).Error("failed to send EDU")
+ return false
+ }
+
+ return true
+ })
}
diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go
index a8ae0894..8231fcf4 100644
--- a/federationapi/consumers/keychange.go
+++ b/federationapi/consumers/keychange.go
@@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
@@ -34,6 +35,7 @@ import (
// KeyChangeConsumer consumes events that originate in key server.
type KeyChangeConsumer struct {
+ ctx context.Context
consumer *internal.ContinualConsumer
db storage.Database
queues *queue.OutgoingQueues
@@ -51,10 +53,11 @@ func NewKeyChangeConsumer(
rsAPI roomserverAPI.RoomserverInternalAPI,
) *KeyChangeConsumer {
c := &KeyChangeConsumer{
+ ctx: process.Context(),
consumer: &internal.ContinualConsumer{
Process: process,
ComponentName: "federationapi/keychange",
- Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
+ Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
},
@@ -100,6 +103,9 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
+ if m.DeviceKeys == nil {
+ return nil
+ }
logger := logrus.WithField("user_id", m.UserID)
// only send key change events which originated from us
@@ -113,7 +119,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
}
var queryRes roomserverAPI.QueryRoomsForUserResponse
- err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{
+ err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{
UserID: m.UserID,
WantMembership: "join",
}, &queryRes)
@@ -122,7 +128,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
return nil
}
// send this key change to all servers who share rooms with this user.
- destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs)
+ destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs)
if err != nil {
logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in")
return nil
@@ -165,7 +171,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
logger := logrus.WithField("user_id", output.UserID)
var queryRes roomserverAPI.QueryRoomsForUserResponse
- err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{
+ err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{
UserID: output.UserID,
WantMembership: "join",
}, &queryRes)
@@ -174,7 +180,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
return nil
}
// send this key change to all servers who share rooms with this user.
- destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs)
+ destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs)
if err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
return nil
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index 20b1bacb..12410bb7 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -19,117 +19,115 @@ import (
"encoding/json"
"fmt"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/types"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- cfg *config.FederationAPI
- rsAPI api.RoomserverInternalAPI
- rsConsumer *internal.ContinualConsumer
- db storage.Database
- queues *queue.OutgoingQueues
+ ctx context.Context
+ cfg *config.FederationAPI
+ rsAPI api.RoomserverInternalAPI
+ jetstream nats.JetStreamContext
+ db storage.Database
+ queues *queue.OutgoingQueues
+ topic string
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.FederationAPI,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
queues *queue.OutgoingQueues,
store storage.Database,
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
- consumer := internal.ContinualConsumer{
- Process: process,
- ComponentName: "federationapi/roomserver",
- Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
- Consumer: kafkaConsumer,
- PartitionStore: store,
+ return &OutputRoomEventConsumer{
+ ctx: process.Context(),
+ cfg: cfg,
+ jetstream: js,
+ db: store,
+ queues: queues,
+ rsAPI: rsAPI,
+ topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
}
- s := &OutputRoomEventConsumer{
- cfg: cfg,
- rsConsumer: &consumer,
- db: store,
- queues: queues,
- rsAPI: rsAPI,
- }
- consumer.ProcessMessage = s.onMessage
-
- return s
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- return s.rsConsumer.Start()
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ return err
}
// onMessage is called when the federation server receives a new event from the room server output log.
// It is unsafe to call this with messages for the same room in multiple gorountines
// because updates it will likely fail with a types.EventIDMismatchError when it
// realises that it cannot update the room state using the deltas.
-func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
- // Parse out the event JSON
- var output api.OutputEvent
- if err := json.Unmarshal(msg.Value, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("roomserver output log: message parse failure")
- return nil
- }
+func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
+ jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
+ // Parse out the event JSON
+ var output api.OutputEvent
+ if err := json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("roomserver output log: message parse failure")
+ return true
+ }
- switch output.Type {
- case api.OutputTypeNewRoomEvent:
- ev := output.NewRoomEvent.Event
+ switch output.Type {
+ case api.OutputTypeNewRoomEvent:
+ ev := output.NewRoomEvent.Event
- if output.NewRoomEvent.RewritesState {
- if err := s.db.PurgeRoomState(context.TODO(), ev.RoomID()); err != nil {
- return fmt.Errorf("s.db.PurgeRoom: %w", err)
+ if output.NewRoomEvent.RewritesState {
+ if err := s.db.PurgeRoomState(s.ctx, ev.RoomID()); err != nil {
+ log.WithError(err).Errorf("roomserver output log: purge room state failure")
+ return false
+ }
}
- }
- if err := s.processMessage(*output.NewRoomEvent); err != nil {
- switch err.(type) {
- case *queue.ErrorFederationDisabled:
- log.WithField("error", output.Type).Info(
- err.Error(),
- )
- default:
- // panic rather than continue with an inconsistent database
+ if err := s.processMessage(*output.NewRoomEvent); err != nil {
+ switch err.(type) {
+ case *queue.ErrorFederationDisabled:
+ log.WithField("error", output.Type).Info(
+ err.Error(),
+ )
+ default:
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event_id": ev.EventID(),
+ "event": string(ev.JSON()),
+ "add": output.NewRoomEvent.AddsStateEventIDs,
+ "del": output.NewRoomEvent.RemovesStateEventIDs,
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: write room event failure")
+ }
+ }
+
+ case api.OutputTypeNewInboundPeek:
+ if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "event": string(ev.JSON()),
- "add": output.NewRoomEvent.AddsStateEventIDs,
- "del": output.NewRoomEvent.RemovesStateEventIDs,
+ "event": output.NewInboundPeek,
log.ErrorKey: err,
- }).Panicf("roomserver output log: write room event failure")
+ }).Panicf("roomserver output log: remote peek event failure")
+ return false
}
- return nil
- }
- case api.OutputTypeNewInboundPeek:
- if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
- log.WithFields(log.Fields{
- "event": output.NewInboundPeek,
- log.ErrorKey: err,
- }).Panicf("roomserver output log: remote peek event failure")
- return nil
+
+ default:
+ log.WithField("type", output.Type).Debug(
+ "roomserver output log: ignoring unknown output type",
+ )
}
- default:
- log.WithField("type", output.Type).Debug(
- "roomserver output log: ignoring unknown output type",
- )
- return nil
- }
- return nil
+ return true
+ })
}
// processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any)
@@ -146,7 +144,7 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
//
// This is making the tests flakey.
- return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
+ return s.db.AddInboundPeek(s.ctx, orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
}
// processMessage updates the list of currently joined hosts in the room
@@ -162,7 +160,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
// TODO(#290): handle EventIDMismatchError and recover the current state by
// talking to the roomserver
oldJoinedHosts, err := s.db.UpdateRoom(
- context.TODO(),
+ s.ctx,
ore.Event.RoomID(),
ore.LastSentEventID,
ore.Event.EventID(),
@@ -255,7 +253,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
}
// handle peeking hosts
- inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.Event.RoomID())
+ inboundPeeks, err := s.db.GetInboundPeeks(s.ctx, ore.Event.Event.RoomID())
if err != nil {
return nil, err
}
@@ -373,7 +371,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// from the roomserver using the query API.
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse
- if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
+ if err := s.rsAPI.QueryEventsByID(s.ctx, &eventReq, &eventResp); err != nil {
return nil, err
}
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go
index 02c4cfdb..0b181606 100644
--- a/federationapi/federationapi.go
+++ b/federationapi/federationapi.go
@@ -30,7 +30,7 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/kafka"
+ "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
@@ -92,7 +92,7 @@ func NewInternalAPI(
FailuresUntilBlacklist: cfg.FederationMaxRetries,
}
- consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
+ js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
queues := queue.NewOutgoingQueues(
federationDB, base.ProcessContext,
@@ -106,7 +106,7 @@ func NewInternalAPI(
)
rsConsumer := consumers.NewOutputRoomEventConsumer(
- base.ProcessContext, cfg, consumer, queues,
+ base.ProcessContext, cfg, js, queues,
federationDB, rsAPI,
)
if err = rsConsumer.Start(); err != nil {
@@ -114,7 +114,7 @@ func NewInternalAPI(
}
tsConsumer := consumers.NewOutputEDUConsumer(
- base.ProcessContext, cfg, consumer, queues, federationDB,
+ base.ProcessContext, cfg, js, queues, federationDB,
)
if err := tsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start typing server consumer")
diff --git a/federationapi/federationapi_keys_test.go b/federationapi/federationapi_keys_test.go
index 9e6c47cd..b9503963 100644
--- a/federationapi/federationapi_keys_test.go
+++ b/federationapi/federationapi_keys_test.go
@@ -74,7 +74,7 @@ func TestMain(m *testing.M) {
cfg.Defaults(true)
cfg.Global.ServerName = gomatrixserverlib.ServerName(s.name)
cfg.Global.PrivateKey = testPriv
- cfg.Global.Kafka.UseNaffka = true
+ cfg.Global.JetStream.InMemory = true
cfg.Global.KeyID = serverKeyID
cfg.Global.KeyValidityPeriod = s.validity
cfg.FederationAPI.Database.ConnectionString = config.DataSource("file::memory:")
diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go
index 8b5bdd03..c660f12e 100644
--- a/federationapi/federationapi_test.go
+++ b/federationapi/federationapi_test.go
@@ -23,10 +23,9 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
cfg.Global.KeyID = gomatrixserverlib.KeyID("ed25519:auto")
cfg.Global.ServerName = gomatrixserverlib.ServerName("localhost")
cfg.Global.PrivateKey = privKey
- cfg.Global.Kafka.UseNaffka = true
- cfg.Global.Kafka.Database.ConnectionString = config.DataSource("file::memory:")
+ cfg.Global.JetStream.InMemory = true
cfg.FederationAPI.Database.ConnectionString = config.DataSource("file::memory:")
- base := base.NewBaseDendrite(cfg, "Monolith", base.NoCacheMetrics)
+ base := base.NewBaseDendrite(cfg, "Monolith")
keyRing := &test.NopJSONVerifier{}
fsAPI := base.FederationAPIHTTPClient()
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
diff --git a/federationapi/internal/perform.go b/federationapi/internal/perform.go
index 82d04c21..b6c35842 100644
--- a/federationapi/internal/perform.go
+++ b/federationapi/internal/perform.go
@@ -249,7 +249,7 @@ func (r *FederationInternalAPI) performJoinUsingServer(
roomserverAPI.KindNew,
respState,
event.Headered(respMakeJoin.RoomVersion),
- nil,
+ nil, false,
); err != nil {
logrus.WithFields(logrus.Fields{
"room_id": roomID,
@@ -430,7 +430,7 @@ func (r *FederationInternalAPI) performOutboundPeekUsingServer(
roomserverAPI.KindNew,
&respState,
respPeek.LatestEvent.Headered(respPeek.RoomVersion),
- nil,
+ nil, false,
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
}
diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go
index f0e1ae0d..7310a305 100644
--- a/federationapi/routing/join.go
+++ b/federationapi/routing/join.go
@@ -194,6 +194,12 @@ func SendJoin(
JSON: jsonerror.BadJSON("No state key was provided in the join event."),
}
}
+ if !event.StateKeyEquals(event.Sender()) {
+ return util.JSONResponse{
+ Code: http.StatusBadRequest,
+ JSON: jsonerror.BadJSON("Event state key must match the event sender."),
+ }
+ }
// Check that the room ID is correct.
if event.RoomID() != roomID {
diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go
index 38f4ca76..6312adfa 100644
--- a/federationapi/routing/leave.go
+++ b/federationapi/routing/leave.go
@@ -175,10 +175,16 @@ func SendLeave(
}
}
- if event.StateKey() == nil {
+ if event.StateKey() == nil || event.StateKeyEquals("") {
return util.JSONResponse{
Code: http.StatusBadRequest,
- JSON: jsonerror.InvalidArgumentValue("missing state_key"),
+ JSON: jsonerror.BadJSON("No state key was provided in the leave event."),
+ }
+ }
+ if !event.StateKeyEquals(event.Sender()) {
+ return util.JSONResponse{
+ Code: http.StatusBadRequest,
+ JSON: jsonerror.BadJSON("Event state key must match the event sender."),
}
}
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index 4b5f0d66..fad23a5c 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -371,7 +371,7 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
for _, task := range tasks {
if task.err != nil {
results[task.event.EventID()] = gomatrixserverlib.PDUResult{
- Error: task.err.Error(),
+ // Error: task.err.Error(), TODO: this upsets tests if uncommented
}
} else {
results[task.event.EventID()] = gomatrixserverlib.PDUResult{}
@@ -692,6 +692,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
},
api.DoNotSendToOtherServers,
nil,
+ false,
)
}
@@ -734,6 +735,7 @@ withNextEvent:
SendAsServer: api.DoNotSendToOtherServers,
},
},
+ false,
); err != nil {
return fmt.Errorf("api.SendEvents: %w", err)
}
@@ -882,6 +884,9 @@ func (t *txnReq) processEventWithMissingState(
resolvedState,
backwardsExtremity.Headered(roomVersion),
hadEvents,
+ // Send the events to the roomserver asynchronously, so they will be
+ // processed when the roomserver is able, without blocking here.
+ true,
)
if err != nil {
return fmt.Errorf("api.SendEventWithState: %w", err)
@@ -902,6 +907,9 @@ func (t *txnReq) processEventWithMissingState(
append(headeredNewEvents, e.Headered(roomVersion)),
api.DoNotSendToOtherServers,
nil,
+ // Send the events to the roomserver asynchronously, so they will be
+ // processed when the roomserver is able, without blocking here.
+ true,
); err != nil {
return fmt.Errorf("api.SendEvents: %w", err)
}
diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go
index 5ba28881..fb919a59 100644
--- a/federationapi/routing/threepid.go
+++ b/federationapi/routing/threepid.go
@@ -89,7 +89,7 @@ func CreateInvitesFrom3PIDInvites(
}
// Send all the events
- if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil); err != nil {
+ if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@@ -180,6 +180,7 @@ func ExchangeThirdPartyInvite(
},
cfg.Matrix.ServerName,
nil,
+ false,
); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()