aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorS7evinK <tfaelligen@gmail.com>2022-01-05 18:44:49 +0100
committerGitHub <noreply@github.com>2022-01-05 17:44:49 +0000
commit161f14517669410d3e8207dc41eea5c9695f7e17 (patch)
tree20db8ed83d92c688206242f84880ff2e35a1d5eb /roomserver
parenta47b12dc7d692e0ddd4aaa0801dafc9bb462aad9 (diff)
Add NATS JetStream support (#1866)
* Add NATS JetStream support Update shopify/sarama * Fix addresses * Don't change Addresses in Defaults * Update saramajetstream * Add missing error check Keep typing events for at least one minute * Use all configured NATS addresses * Update saramajetstream * Try setting up with NATS * Make sure NATS uses own persistent directory (TODO: make this configurable) * Update go.mod/go.sum * Jetstream package * Various other refactoring * Build fixes * Config tweaks, make random jetstream storage path for CI * Disable interest policies * Try to sane default on jetstream base path * Try to use in-memory for CI * Restore storage/retention * Update nats.go dependency * Adapt changes to config * Remove unneeded TopicFor * Dep update * Revert "Remove unneeded TopicFor" This reverts commit f5a4e4a339b6f94ec215778dca22204adaa893d1. * Revert changes made to streams * Fix build problems * Update nats-server * Update go.mod/go.sum * Roomserver input API queuing using NATS * Fix topic naming * Prometheus metrics * More refactoring to remove saramajetstream * Add missing topic * Don't try to populate map that doesn't exist * Roomserver output topic * Update go.mod/go.sum * Message acknowledgements * Ack tweaks * Try to resume transaction re-sends * Try to resume transaction re-sends * Update to matrix-org/gomatrixserverlib@91dadfb * Remove internal.PartitionStorer from components that don't consume keychanges * Try to reduce re-allocations a bit in resolveConflictsV2 * Tweak delivery options on RS input * Publish send-to-device messages into correct JetStream subject * Async and sync roomserver input * Update dendrite-config.yaml * Remove roomserver tests for now (they need rewriting) * Remove roomserver test again (was merged back in) * Update documentation * Docker updates * More Docker updates * Update Docker readme again * Fix lint issues * Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset) * Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that * Go 1.16 instead of Go 1.13 for upgrade tests and Complement * Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that" This reverts commit 368675283fc44501f227639811bdb16dd5deef8c. * Don't report any errors on `/send` to see what fun that creates * Fix panics on closed channel sends * Enforce state key matches sender * Do the same for leave * Various tweaks to make tests happier Squashed commit of the following: commit 13f9028e7a63662759ce7c55504a9d2423058668 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 15:47:14 2022 +0000 Do the same for leave commit e6be7f05c349fafbdddfe818337a17a60c867be1 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 15:33:42 2022 +0000 Enforce state key matches sender commit 85ede6d64bf10ce9b91cdd6d80f87350ee55242f Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 14:07:04 2022 +0000 Fix panics on closed channel sends commit 9755494a98bed62450f8001d8128e40481d27e15 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 13:38:22 2022 +0000 Don't report any errors on `/send` to see what fun that creates commit 3bb4f87b5dd56882febb4db5621db484c8789b7c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 13:00:26 2022 +0000 Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that" This reverts commit 368675283fc44501f227639811bdb16dd5deef8c. commit fe2673ed7be9559eaca134424e403a4faca100b0 Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 12:09:34 2022 +0000 Go 1.16 instead of Go 1.13 for upgrade tests and Complement commit 368675283fc44501f227639811bdb16dd5deef8c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 11:51:45 2022 +0000 Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that commit b028dfc08577bcf52e6cb498026e15fa5d46d07c Author: Neil Alexander <neilalexander@users.noreply.github.com> Date: Tue Jan 4 10:29:08 2022 +0000 Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset) * Merge in NATS Server v2.6.6 and nats.go v1.13 into the in-process connection fork * Add `jetstream.WithJetStreamMessage` to make ack/nak-ing less messy, use process context in consumers * Fix consumer component name in federation API * Add comment explaining where streams are defined * Tweaks to roomserver input with comments * Finish that sentence that I apparently forgot to finish in INSTALL.md * Bump version number of config to 2 * Add comments around asynchronous sends to roomserver in processEventWithMissingState * More useful error message when the config version does not match * Set version in generate-config * Fix version in config.Defaults Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/api/input.go1
-rw-r--r--roomserver/api/wrapper.go15
-rw-r--r--roomserver/internal/api.go17
-rw-r--r--roomserver/internal/input/input.go250
-rw-r--r--roomserver/internal/input/input_events.go28
-rw-r--r--roomserver/internal/input/input_fifo.go64
-rw-r--r--roomserver/roomserver.go11
-rw-r--r--roomserver/roomserver_test.go407
8 files changed, 163 insertions, 630 deletions
diff --git a/roomserver/api/input.go b/roomserver/api/input.go
index 8e6e4ac7..a537e64e 100644
--- a/roomserver/api/input.go
+++ b/roomserver/api/input.go
@@ -86,6 +86,7 @@ type TransactionID struct {
// InputRoomEventsRequest is a request to InputRoomEvents
type InputRoomEventsRequest struct {
InputRoomEvents []InputRoomEvent `json:"input_room_events"`
+ Asynchronous bool `json:"async"`
}
// InputRoomEventsResponse is a response to InputRoomEvents
diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go
index de66df80..cdb186c0 100644
--- a/roomserver/api/wrapper.go
+++ b/roomserver/api/wrapper.go
@@ -27,6 +27,7 @@ func SendEvents(
ctx context.Context, rsAPI RoomserverInternalAPI,
kind Kind, events []*gomatrixserverlib.HeaderedEvent,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
+ async bool,
) error {
ires := make([]InputRoomEvent, len(events))
for i, event := range events {
@@ -38,7 +39,7 @@ func SendEvents(
TransactionID: txnID,
}
}
- return SendInputRoomEvents(ctx, rsAPI, ires)
+ return SendInputRoomEvents(ctx, rsAPI, ires, async)
}
// SendEventWithState writes an event with the specified kind to the roomserver
@@ -47,7 +48,7 @@ func SendEvents(
func SendEventWithState(
ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind,
state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent,
- haveEventIDs map[string]bool,
+ haveEventIDs map[string]bool, async bool,
) error {
outliers, err := state.Events()
if err != nil {
@@ -79,14 +80,18 @@ func SendEventWithState(
StateEventIDs: stateEventIDs,
})
- return SendInputRoomEvents(ctx, rsAPI, ires)
+ return SendInputRoomEvents(ctx, rsAPI, ires, async)
}
// SendInputRoomEvents to the roomserver.
func SendInputRoomEvents(
- ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent,
+ ctx context.Context, rsAPI RoomserverInternalAPI,
+ ires []InputRoomEvent, async bool,
) error {
- request := InputRoomEventsRequest{InputRoomEvents: ires}
+ request := InputRoomEventsRequest{
+ InputRoomEvents: ires,
+ Asynchronous: async,
+ }
var response InputRoomEventsResponse
rsAPI.InputRoomEvents(ctx, &request, &response)
return response.Err()
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index 67bbc7ab..5cfe68da 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -3,7 +3,6 @@ package internal
import (
"context"
- "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
asAPI "github.com/matrix-org/dendrite/appservice/api"
fsAPI "github.com/matrix-org/dendrite/federationapi/api"
@@ -16,6 +15,8 @@ import (
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
)
// RoomserverInternalAPI is an implementation of api.RoomserverInternalAPI
@@ -33,19 +34,19 @@ type RoomserverInternalAPI struct {
*perform.Forgetter
DB storage.Database
Cfg *config.RoomServer
- Producer sarama.SyncProducer
Cache caching.RoomServerCaches
ServerName gomatrixserverlib.ServerName
KeyRing gomatrixserverlib.JSONVerifier
fsAPI fsAPI.FederationInternalAPI
asAPI asAPI.AppServiceQueryAPI
- OutputRoomEventTopic string // Kafka topic for new output room events
+ InputRoomEventTopic string // JetStream topic for new input room events
+ OutputRoomEventTopic string // JetStream topic for new output room events
PerspectiveServerNames []gomatrixserverlib.ServerName
}
func NewRoomserverAPI(
- cfg *config.RoomServer, roomserverDB storage.Database, producer sarama.SyncProducer,
- outputRoomEventTopic string, caches caching.RoomServerCaches,
+ cfg *config.RoomServer, roomserverDB storage.Database, consumer nats.JetStreamContext,
+ inputRoomEventTopic, outputRoomEventTopic string, caches caching.RoomServerCaches,
perspectiveServerNames []gomatrixserverlib.ServerName,
) *RoomserverInternalAPI {
serverACLs := acls.NewServerACLs(roomserverDB)
@@ -63,13 +64,17 @@ func NewRoomserverAPI(
},
Inputer: &input.Inputer{
DB: roomserverDB,
+ InputRoomEventTopic: inputRoomEventTopic,
OutputRoomEventTopic: outputRoomEventTopic,
- Producer: producer,
+ JetStream: consumer,
ServerName: cfg.Matrix.ServerName,
ACLs: serverACLs,
},
// perform-er structs get initialised when we have a federation sender to use
}
+ if err := a.Inputer.Start(); err != nil {
+ logrus.WithError(err).Panic("failed to start roomserver input API")
+ }
return a
}
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index de40e133..1eab6780 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -19,19 +19,19 @@ import (
"context"
"encoding/json"
"sync"
- "time"
- "github.com/Shopify/sarama"
+ "github.com/Arceliar/phony"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal/hooks"
"github.com/matrix-org/dendrite/roomserver/acls"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
- "go.uber.org/atomic"
)
var keyContentFields = map[string]string{
@@ -42,105 +42,161 @@ var keyContentFields = map[string]string{
type Inputer struct {
DB storage.Database
- Producer sarama.SyncProducer
+ JetStream nats.JetStreamContext
ServerName gomatrixserverlib.ServerName
ACLs *acls.ServerACLs
+ InputRoomEventTopic string
OutputRoomEventTopic string
- workers sync.Map // room ID -> *inputWorker
+ workers sync.Map // room ID -> *phony.Inbox
}
-type inputTask struct {
- ctx context.Context
- event *api.InputRoomEvent
- wg *sync.WaitGroup
- err error // written back by worker, only safe to read when all tasks are done
-}
-
-type inputWorker struct {
- r *Inputer
- running atomic.Bool
- input *fifoQueue
+// onMessage is called when a new event arrives in the roomserver input stream.
+func (r *Inputer) Start() error {
+ _, err := r.JetStream.Subscribe(
+ r.InputRoomEventTopic,
+ // We specifically don't use jetstream.WithJetStreamMessage here because we
+ // queue the task off to a room-specific queue and the ACK needs to be sent
+ // later, possibly with an error response to the inputter if synchronous.
+ func(msg *nats.Msg) {
+ roomID := msg.Header.Get("room_id")
+ defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
+ var inputRoomEvent api.InputRoomEvent
+ if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
+ _ = msg.Term()
+ return
+ }
+ inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
+ inbox.(*phony.Inbox).Act(nil, func() {
+ if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
+ sentry.CaptureException(err)
+ } else {
+ hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event)
+ }
+ _ = msg.Ack()
+ })
+ },
+ // NATS wants to acknowledge automatically by default when the message is
+ // read from the stream, but we want to override that behaviour by making
+ // sure that we only acknowledge when we're happy we've done everything we
+ // can. This ensures we retry things when it makes sense to do so.
+ nats.ManualAck(),
+ // NATS will try to redeliver things to us automatically if we don't ack
+ // or nak them within a certain amount of time. This stops that from
+ // happening, so we don't end up doing a lot of unnecessary duplicate work.
+ nats.MaxDeliver(0),
+ )
+ return err
}
-// Guarded by a CAS on w.running
-func (w *inputWorker) start() {
- defer w.running.Store(false)
- for {
- select {
- case <-w.input.wait():
- task, ok := w.input.pop()
- if !ok {
- continue
+// InputRoomEvents implements api.RoomserverInternalAPI
+func (r *Inputer) InputRoomEvents(
+ ctx context.Context,
+ request *api.InputRoomEventsRequest,
+ response *api.InputRoomEventsResponse,
+) {
+ if request.Asynchronous {
+ var err error
+ for _, e := range request.InputRoomEvents {
+ msg := &nats.Msg{
+ Subject: r.InputRoomEventTopic,
+ Header: nats.Header{},
+ }
+ roomID := e.Event.RoomID()
+ msg.Header.Set("room_id", roomID)
+ msg.Data, err = json.Marshal(e)
+ if err != nil {
+ response.ErrMsg = err.Error()
+ return
}
- roomserverInputBackpressure.With(prometheus.Labels{
- "room_id": task.event.Event.RoomID(),
- }).Dec()
- hooks.Run(hooks.KindNewEventReceived, task.event.Event)
- _, task.err = w.r.processRoomEvent(task.ctx, task.event)
- if task.err == nil {
- hooks.Run(hooks.KindNewEventPersisted, task.event.Event)
- } else {
- sentry.CaptureException(task.err)
+ if _, err = r.JetStream.PublishMsg(msg); err != nil {
+ return
+ }
+ roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
+ }
+ } else {
+ responses := make(chan error, len(request.InputRoomEvents))
+ defer close(responses)
+ for _, e := range request.InputRoomEvents {
+ inputRoomEvent := e
+ inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{})
+ inbox.(*phony.Inbox).Act(nil, func() {
+ err := r.processRoomEvent(context.TODO(), &inputRoomEvent)
+ if err != nil {
+ sentry.CaptureException(err)
+ } else {
+ hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event)
+ }
+ select {
+ case <-ctx.Done():
+ default:
+ responses <- err
+ }
+ })
+ }
+ for i := 0; i < len(request.InputRoomEvents); i++ {
+ select {
+ case <-ctx.Done():
+ return
+ case err := <-responses:
+ if err != nil {
+ response.ErrMsg = err.Error()
+ return
+ }
}
- task.wg.Done()
- case <-time.After(time.Second * 5):
- return
}
}
}
// WriteOutputEvents implements OutputRoomEventWriter
func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
- messages := make([]*sarama.ProducerMessage, len(updates))
- for i := range updates {
- value, err := json.Marshal(updates[i])
+ var err error
+ for _, update := range updates {
+ msg := &nats.Msg{
+ Subject: r.OutputRoomEventTopic,
+ Header: nats.Header{},
+ }
+ msg.Header.Set(jetstream.RoomID, roomID)
+ msg.Data, err = json.Marshal(update)
if err != nil {
return err
}
logger := log.WithFields(log.Fields{
"room_id": roomID,
- "type": updates[i].Type,
+ "type": update.Type,
})
- if updates[i].NewRoomEvent != nil {
- eventType := updates[i].NewRoomEvent.Event.Type()
+ if update.NewRoomEvent != nil {
+ eventType := update.NewRoomEvent.Event.Type()
logger = logger.WithFields(log.Fields{
"event_type": eventType,
- "event_id": updates[i].NewRoomEvent.Event.EventID(),
- "adds_state": len(updates[i].NewRoomEvent.AddsStateEventIDs),
- "removes_state": len(updates[i].NewRoomEvent.RemovesStateEventIDs),
- "send_as_server": updates[i].NewRoomEvent.SendAsServer,
- "sender": updates[i].NewRoomEvent.Event.Sender(),
+ "event_id": update.NewRoomEvent.Event.EventID(),
+ "adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
+ "removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
+ "send_as_server": update.NewRoomEvent.SendAsServer,
+ "sender": update.NewRoomEvent.Event.Sender(),
})
- if updates[i].NewRoomEvent.Event.StateKey() != nil {
- logger = logger.WithField("state_key", *updates[i].NewRoomEvent.Event.StateKey())
+ if update.NewRoomEvent.Event.StateKey() != nil {
+ logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
}
contentKey := keyContentFields[eventType]
if contentKey != "" {
- value := gjson.GetBytes(updates[i].NewRoomEvent.Event.Content(), contentKey)
+ value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
if value.Exists() {
logger = logger.WithField("content_value", value.String())
}
}
- if eventType == "m.room.server_acl" && updates[i].NewRoomEvent.Event.StateKeyEquals("") {
- ev := updates[i].NewRoomEvent.Event.Unwrap()
+ if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
+ ev := update.NewRoomEvent.Event.Unwrap()
defer r.ACLs.OnServerACLUpdate(ev)
}
}
- logger.Infof("Producing to topic '%s'", r.OutputRoomEventTopic)
- messages[i] = &sarama.ProducerMessage{
- Topic: r.OutputRoomEventTopic,
- Key: sarama.StringEncoder(roomID),
- Value: sarama.ByteEncoder(value),
- }
- }
- errs := r.Producer.SendMessages(messages)
- if errs != nil {
- for _, err := range errs.(sarama.ProducerErrors) {
- log.WithError(err).WithField("message_bytes", err.Msg.Value.Length()).Error("Write to kafka failed")
+ logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic)
+ if _, err := r.JetStream.PublishMsg(msg); err != nil {
+ logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err)
+ return err
}
}
- return errs
+ return nil
}
func init() {
@@ -156,67 +212,3 @@ var roomserverInputBackpressure = prometheus.NewGaugeVec(
},
[]string{"room_id"},
)
-
-// InputRoomEvents implements api.RoomserverInternalAPI
-func (r *Inputer) InputRoomEvents(
- _ context.Context,
- request *api.InputRoomEventsRequest,
- response *api.InputRoomEventsResponse,
-) {
- // Create a wait group. Each task that we dispatch will call Done on
- // this wait group so that we know when all of our events have been
- // processed.
- wg := &sync.WaitGroup{}
- wg.Add(len(request.InputRoomEvents))
- tasks := make([]*inputTask, len(request.InputRoomEvents))
-
- for i, e := range request.InputRoomEvents {
- // Work out if we are running per-room workers or if we're just doing
- // it on a global basis (e.g. SQLite).
- roomID := "global"
- if r.DB.SupportsConcurrentRoomInputs() {
- roomID = e.Event.RoomID()
- }
-
- // Look up the worker, or create it if it doesn't exist. This channel
- // is buffered to reduce the chance that we'll be blocked by another
- // room - the channel will be quite small as it's just pointer types.
- w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
- r: r,
- input: newFIFOQueue(),
- })
- worker := w.(*inputWorker)
-
- // Create a task. This contains the input event and a reference to
- // the wait group, so that the worker can notify us when this specific
- // task has been finished.
- tasks[i] = &inputTask{
- ctx: context.Background(),
- event: &request.InputRoomEvents[i],
- wg: wg,
- }
-
- // Send the task to the worker.
- if worker.running.CAS(false, true) {
- go worker.start()
- }
- worker.input.push(tasks[i])
- roomserverInputBackpressure.With(prometheus.Labels{
- "room_id": roomID,
- }).Inc()
- }
-
- // Wait for all of the workers to return results about our tasks.
- wg.Wait()
-
- // If any of the tasks returned an error, we should probably report
- // that back to the caller.
- for _, task := range tasks {
- if task.err != nil {
- response.ErrMsg = task.err.Error()
- _, rejected := task.err.(*gomatrixserverlib.NotAllowed)
- response.NotAllowed = rejected
- return
- }
- }
-}
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index fc712f47..791f7f30 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -62,7 +62,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
func (r *Inputer) processRoomEvent(
ctx context.Context,
input *api.InputRoomEvent,
-) (eventID string, err error) {
+) (err error) {
// Measure how long it takes to process this event.
started := time.Now()
defer func() {
@@ -88,11 +88,11 @@ func (r *Inputer) processRoomEvent(
case gomatrixserverlib.EventIDFormatV1:
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
- return event.EventID(), nil
+ return nil
}
default:
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
- return event.EventID(), nil
+ return nil
}
}
}
@@ -124,14 +124,14 @@ func (r *Inputer) processRoomEvent(
// Store the event.
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
if err != nil {
- return "", fmt.Errorf("r.DB.StoreEvent: %w", err)
+ return fmt.Errorf("r.DB.StoreEvent: %w", err)
}
// if storing this event results in it being redacted then do so.
if !isRejected && redactedEventID == event.EventID() {
r, rerr := eventutil.RedactEvent(redactionEvent, event)
if rerr != nil {
- return "", fmt.Errorf("eventutil.RedactEvent: %w", rerr)
+ return fmt.Errorf("eventutil.RedactEvent: %w", rerr)
}
event = r
}
@@ -146,15 +146,15 @@ func (r *Inputer) processRoomEvent(
"room": event.RoomID(),
"sender": event.Sender(),
}).Debug("Stored outlier")
- return event.EventID(), nil
+ return nil
}
roomInfo, err := r.DB.RoomInfo(ctx, event.RoomID())
if err != nil {
- return "", fmt.Errorf("r.DB.RoomInfo: %w", err)
+ return fmt.Errorf("r.DB.RoomInfo: %w", err)
}
if roomInfo == nil {
- return "", fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID())
+ return fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID())
}
if stateAtEvent.BeforeStateSnapshotNID == 0 {
@@ -162,7 +162,7 @@ func (r *Inputer) processRoomEvent(
// Lets calculate one.
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected)
if err != nil && input.Kind != api.KindOld {
- return "", fmt.Errorf("r.calculateAndSetState: %w", err)
+ return fmt.Errorf("r.calculateAndSetState: %w", err)
}
}
@@ -175,7 +175,7 @@ func (r *Inputer) processRoomEvent(
"soft_fail": softfail,
"sender": event.Sender(),
}).Debug("Stored rejected event")
- return event.EventID(), rejectionErr
+ return rejectionErr
}
switch input.Kind {
@@ -189,7 +189,7 @@ func (r *Inputer) processRoomEvent(
input.TransactionID, // transaction ID
input.HasState, // rewrites state?
); err != nil {
- return "", fmt.Errorf("r.updateLatestEvents: %w", err)
+ return fmt.Errorf("r.updateLatestEvents: %w", err)
}
case api.KindOld:
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
@@ -201,7 +201,7 @@ func (r *Inputer) processRoomEvent(
},
})
if err != nil {
- return "", fmt.Errorf("r.WriteOutputEvents (old): %w", err)
+ return fmt.Errorf("r.WriteOutputEvents (old): %w", err)
}
}
@@ -220,12 +220,12 @@ func (r *Inputer) processRoomEvent(
},
})
if err != nil {
- return "", fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
+ return fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
}
}
// Update the extremities of the event graph for the room
- return event.EventID(), nil
+ return nil
}
func (r *Inputer) calculateAndSetState(
diff --git a/roomserver/internal/input/input_fifo.go b/roomserver/internal/input/input_fifo.go
deleted file mode 100644
index 694b1724..00000000
--- a/roomserver/internal/input/input_fifo.go
+++ /dev/null
@@ -1,64 +0,0 @@
-package input
-
-import (
- "sync"
-)
-
-type fifoQueue struct {
- tasks []*inputTask
- count int
- mutex sync.Mutex
- notifs chan struct{}
-}
-
-func newFIFOQueue() *fifoQueue {
- q := &fifoQueue{
- notifs: make(chan struct{}, 1),
- }
- return q
-}
-
-func (q *fifoQueue) push(frame *inputTask) {
- q.mutex.Lock()
- defer q.mutex.Unlock()
- q.tasks = append(q.tasks, frame)
- q.count++
- select {
- case q.notifs <- struct{}{}:
- default:
- }
-}
-
-// pop returns the first item of the queue, if there is one.
-// The second return value will indicate if a task was returned.
-// You must check this value, even after calling wait().
-func (q *fifoQueue) pop() (*inputTask, bool) {
- q.mutex.Lock()
- defer q.mutex.Unlock()
- if q.count == 0 {
- return nil, false
- }
- frame := q.tasks[0]
- q.tasks[0] = nil
- q.tasks = q.tasks[1:]
- q.count--
- if q.count == 0 {
- // Force a GC of the underlying array, since it might have
- // grown significantly if the queue was hammered for some reason
- q.tasks = nil
- }
- return frame, true
-}
-
-// wait returns a channel which can be used to detect when an
-// item is waiting in the queue.
-func (q *fifoQueue) wait() <-chan struct{} {
- q.mutex.Lock()
- defer q.mutex.Unlock()
- if q.count > 0 && len(q.notifs) == 0 {
- ch := make(chan struct{})
- close(ch)
- return ch
- }
- return q.notifs
-}
diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go
index e4742100..669957be 100644
--- a/roomserver/roomserver.go
+++ b/roomserver/roomserver.go
@@ -23,8 +23,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/internal"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/setup/base"
- "github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/kafka"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/sirupsen/logrus"
)
@@ -41,8 +40,6 @@ func NewInternalAPI(
) api.RoomserverInternalAPI {
cfg := &base.Cfg.RoomServer
- _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka)
-
var perspectiveServerNames []gomatrixserverlib.ServerName
for _, kp := range base.Cfg.FederationAPI.KeyPerspectives {
perspectiveServerNames = append(perspectiveServerNames, kp.ServerName)
@@ -53,8 +50,12 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to room server db")
}
+ js, _, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
+
return internal.NewRoomserverAPI(
- cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
+ cfg, roomserverDB, js,
+ cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent),
+ cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames,
)
}
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
deleted file mode 100644
index 40e8e92d..00000000
--- a/roomserver/roomserver_test.go
+++ /dev/null
@@ -1,407 +0,0 @@
-package roomserver
-
-import (
- "bytes"
- "context"
- "crypto/ed25519"
- "encoding/json"
- "fmt"
- "os"
- "reflect"
- "testing"
- "time"
-
- "github.com/Shopify/sarama"
- "github.com/matrix-org/dendrite/internal/caching"
- "github.com/matrix-org/dendrite/roomserver/api"
- "github.com/matrix-org/dendrite/roomserver/internal"
- "github.com/matrix-org/dendrite/roomserver/storage"
- "github.com/matrix-org/dendrite/setup/base"
- "github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/sirupsen/logrus"
-)
-
-const (
- testOrigin = gomatrixserverlib.ServerName("kaer.morhen")
- // we have to use an on-disk DB because we open multiple connections due to the *Updater structs.
- // Using :memory: results in a brand new DB for each open connection, and sharing memory via
- // ?cache=shared just allows read-only sharing, so writes to the database on other connections are lost.
- roomserverDBFileURI = "file:roomserver_test.db"
- roomserverDBFilePath = "./roomserver_test.db"
-)
-
-var (
- ctx = context.Background()
-)
-
-type dummyProducer struct {
- topic string
- producedMessages []*api.OutputEvent
-}
-
-// SendMessage produces a given message, and returns only when it either has
-// succeeded or failed to produce. It will return the partition and the offset
-// of the produced message, or an error if the message failed to produce.
-func (p *dummyProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
- if msg.Topic != p.topic {
- return 0, 0, nil
- }
- be := msg.Value.(sarama.ByteEncoder)
- b := json.RawMessage(be)
- fmt.Println("SENDING >>>>>>>> ", string(b))
- var out api.OutputEvent
- err = json.Unmarshal(b, &out)
- if err != nil {
- return 0, 0, err
- }
- p.producedMessages = append(p.producedMessages, &out)
- return 0, 0, nil
-}
-
-// SendMessages produces a given set of messages, and returns only when all
-// messages in the set have either succeeded or failed. Note that messages
-// can succeed and fail individually; if some succeed and some fail,
-// SendMessages will return an error.
-func (p *dummyProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
- for _, m := range msgs {
- p.SendMessage(m)
- }
- return nil
-}
-
-// Close shuts down the producer and waits for any buffered messages to be
-// flushed. You must call this function before a producer object passes out of
-// scope, as it may otherwise leak memory. You must call this before calling
-// Close on the underlying client.
-func (p *dummyProducer) Close() error {
- return nil
-}
-
-func deleteDatabase() {
- err := os.Remove(roomserverDBFilePath)
- if err != nil {
- fmt.Printf("failed to delete database %s: %s\n", roomserverDBFilePath, err)
- }
-}
-
-type fledglingEvent struct {
- Type string
- StateKey *string
- Content interface{}
- Sender string
- RoomID string
-}
-
-func mustCreateEvents(t *testing.T, roomVer gomatrixserverlib.RoomVersion, events []fledglingEvent) (result []*gomatrixserverlib.HeaderedEvent) {
- t.Helper()
- depth := int64(1)
- seed := make([]byte, ed25519.SeedSize) // zero seed
- key := ed25519.NewKeyFromSeed(seed)
- var prevs []string
- roomState := make(map[gomatrixserverlib.StateKeyTuple]string) // state -> event ID
- for _, ev := range events {
- eb := gomatrixserverlib.EventBuilder{
- Sender: ev.Sender,
- Depth: depth,
- Type: ev.Type,
- StateKey: ev.StateKey,
- RoomID: ev.RoomID,
- PrevEvents: prevs,
- }
- err := eb.SetContent(ev.Content)
- if err != nil {
- t.Fatalf("mustCreateEvent: failed to marshal event content %+v", ev.Content)
- }
- stateNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(&eb)
- if err != nil {
- t.Fatalf("mustCreateEvent: failed to work out auth_events : %s", err)
- }
- var authEvents []string
- for _, tuple := range stateNeeded.Tuples() {
- eventID := roomState[tuple]
- if eventID != "" {
- authEvents = append(authEvents, eventID)
- }
- }
- eb.AuthEvents = authEvents
- signedEvent, err := eb.Build(time.Now(), testOrigin, "ed25519:test", key, roomVer)
- if err != nil {
- t.Fatalf("mustCreateEvent: failed to sign event: %s", err)
- }
- depth++
- prevs = []string{signedEvent.EventID()}
- if ev.StateKey != nil {
- roomState[gomatrixserverlib.StateKeyTuple{
- EventType: ev.Type,
- StateKey: *ev.StateKey,
- }] = signedEvent.EventID()
- }
- result = append(result, signedEvent.Headered(roomVer))
- }
- return
-}
-
-func mustLoadRawEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) []*gomatrixserverlib.HeaderedEvent {
- t.Helper()
- hs := make([]*gomatrixserverlib.HeaderedEvent, len(events))
- for i := range events {
- e, err := gomatrixserverlib.NewEventFromTrustedJSON(events[i], false, ver)
- if err != nil {
- t.Fatalf("cannot load test data: " + err.Error())
- }
- hs[i] = e.Headered(ver)
- }
- return hs
-}
-
-func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyProducer) {
- t.Helper()
- cfg := &config.Dendrite{}
- cfg.Defaults(true)
- cfg.Global.ServerName = testOrigin
- cfg.Global.Kafka.UseNaffka = true
- cfg.RoomServer.Database = config.DatabaseOptions{
- ConnectionString: roomserverDBFileURI,
- }
- dp := &dummyProducer{
- topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
- }
- cache, err := caching.NewInMemoryLRUCache(false)
- if err != nil {
- t.Fatalf("failed to make caches: %s", err)
- }
- base := &base.BaseDendrite{
- Caches: cache,
- Cfg: cfg,
- }
- roomserverDB, err := storage.Open(&cfg.RoomServer.Database, base.Caches)
- if err != nil {
- logrus.WithError(err).Panicf("failed to connect to room server db")
- }
- return internal.NewRoomserverAPI(
- &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent)),
- base.Caches, nil,
- ), dp
-}
-
-func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) (api.RoomserverInternalAPI, *dummyProducer, []*gomatrixserverlib.HeaderedEvent) {
- t.Helper()
- rsAPI, dp := mustCreateRoomserverAPI(t)
- hevents := mustLoadRawEvents(t, ver, events)
- if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil); err != nil {
- t.Errorf("failed to SendEvents: %s", err)
- }
- return rsAPI, dp, hevents
-}
-
-func TestOutputRedactedEvent(t *testing.T) {
- redactionEvents := []json.RawMessage{
- // create event
- []byte(`{"auth_events":[],"content":{"creator":"@userid:kaer.morhen"},"depth":0,"event_id":"$N4us6vqqq3RjvpKd:kaer.morhen","hashes":{"sha256":"WTdrCn/YsiounXcJPsLP8xT0ZjHiO5Ov0NvXYmK2onE"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"9+5JcpaN5b5KlHYHGp6r+GoNDH98lbfzGYwjfxensa5C5D/bDACaYnMDLnhwsHOE5nxgI+jT/GV271pz6PMSBQ"}},"state_key":"","type":"m.room.create"}`),
- // join event
- []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}]],"content":{"membership":"join"},"depth":1,"event_id":"$6sUiGPQ0a3tqYGKo:kaer.morhen","hashes":{"sha256":"eYVBC7RO+FlxRyW1aXYf/ad4Dzi7T93tArdGw3r4RwQ"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"tiDBTPFa53YMfHiupX3vSRE/ZcCiCjmGt7gDpIpDpwZapeays5Vqqcqb7KiywrDldpTkrrdJBAw2jXcq6ZyhDw"}},"state_key":"@userid:kaer.morhen","type":"m.room.member"}`),
- // room name
- []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"name":"My Room Name"},"depth":2,"event_id":"$VC1zZ9YWwuUbSNHD:kaer.morhen","hashes":{"sha256":"bpqTkfLx6KHzWz7/wwpsXnXwJWEGW14aV63ffexzDFg"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"mhJZ3X4bAKrF/T0mtPf1K2Tmls0h6xGY1IPDpJ/SScQBqDlu3HQR2BPa7emqj5bViyLTWVNh+ZCpzx/6STTrAg"}},"state_key":"","type":"m.room.name"}`),
- // redact room name
- []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"reason":"Spamming"},"depth":3,"event_id":"$tJI0pE3b8u9UMYpT:kaer.morhen","hashes":{"sha256":"/3TStqa5SQqYaEtl7ajEvSRvu6d12MMKfICUzrBpd2Q"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$VC1zZ9YWwuUbSNHD:kaer.morhen",{"sha256":"+l8cNa7syvm0EF7CAmQRlYknLEMjivnI4FLhB/TUBEY"}]],"redacts":"$VC1zZ9YWwuUbSNHD:kaer.morhen","room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"QBOh+amf0vTJbm6+9VwAcR9uJviBIor2KON0Y7+EyQx5YbUZEzW1HPeJxarLIHBcxMzgOVzjuM+StzjbUgDzAg"}},"type":"m.room.redaction"}`),
- // message
- []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"body":"Test Message"},"depth":4,"event_id":"$o8KHsgSIYbJrddnd:kaer.morhen","hashes":{"sha256":"IE/rGVlKOpiGWeIo887g1CK1drYqcWDZhL6THZHkJ1c"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$tJI0pE3b8u9UMYpT:kaer.morhen",{"sha256":"zvmwyXuDox7jpA16JRH6Fc1zbfQht2tpkBbMTUOi3Jw"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"/3z+pJjiJXWhwfqIEzmNksvBHCoXTktK/y0rRuWJXw6i1+ygRG/suDCKhFuuz6gPapRmEMPVILi2mJqHHXPKAg"}},"type":"m.room.message"}`),
- // redact previous message
- []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"reason":"Spamming more"},"depth":5,"event_id":"$UpsE8belb2gJItJG:kaer.morhen","hashes":{"sha256":"zU8PWJOld/I7OtjdpltFSKC+DMNm2ZyEXAHcprsafD0"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$o8KHsgSIYbJrddnd:kaer.morhen",{"sha256":"UgjMuCFXH4warIjKuwlRq9zZ6dSJrZWCd+CkqtgLSHM"}]],"redacts":"$o8KHsgSIYbJrddnd:kaer.morhen","room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"zxFGr/7aGOzqOEN6zRNrBpFkkMnfGFPbCteYL33wC+PycBPIK+2WRa5qlAR2+lcLiK3HjIzwRYkKNsVFTqvRAw"}},"type":"m.room.redaction"}`),
- }
- var redactedOutputs []api.OutputEvent
- deleteDatabase()
- _, producer, hevents := mustSendEvents(t, gomatrixserverlib.RoomVersionV1, redactionEvents)
- defer deleteDatabase()
- for _, msg := range producer.producedMessages {
- if msg.Type == api.OutputTypeRedactedEvent {
- redactedOutputs = append(redactedOutputs, *msg)
- }
- }
- wantRedactedOutputs := []api.OutputEvent{
- {
- Type: api.OutputTypeRedactedEvent,
- RedactedEvent: &api.OutputRedactedEvent{
- RedactedEventID: hevents[2].EventID(),
- RedactedBecause: hevents[3],
- },
- },
- {
- Type: api.OutputTypeRedactedEvent,
- RedactedEvent: &api.OutputRedactedEvent{
- RedactedEventID: hevents[4].EventID(),
- RedactedBecause: hevents[5],
- },
- },
- }
- t.Logf("redactedOutputs: %+v", redactedOutputs)
- if len(wantRedactedOutputs) != len(redactedOutputs) {
- t.Fatalf("Got %d redacted events, want %d", len(redactedOutputs), len(wantRedactedOutputs))
- }
- for i := 0; i < len(wantRedactedOutputs); i++ {
- if !reflect.DeepEqual(*redactedOutputs[i].RedactedEvent, *wantRedactedOutputs[i].RedactedEvent) {
- t.Errorf("OutputRedactionEvent %d: wrong event got:\n%+v want:\n%+v", i+1, redactedOutputs[i].RedactedEvent, wantRedactedOutputs[i].RedactedEvent)
- }
- }
-}
-
-// This tests that rewriting state works correctly.
-// This creates a small room with a create/join/name state, then replays it
-// with a new room name. We expect the output events to contain the original events,
-// followed by a single OutputNewRoomEvent with RewritesState set to true with the
-// rewritten state events (with the 2nd room name).
-func TestOutputRewritesState(t *testing.T) {
- roomID := "!foo:" + string(testOrigin)
- alice := "@alice:" + string(testOrigin)
- emptyKey := ""
- originalEvents := mustCreateEvents(t, gomatrixserverlib.RoomVersionV6, []fledglingEvent{
- {
- RoomID: roomID,
- Sender: alice,
- Content: map[string]interface{}{
- "creator": alice,
- "room_version": "6",
- },
- StateKey: &emptyKey,
- Type: gomatrixserverlib.MRoomCreate,
- },
- {
- RoomID: roomID,
- Sender: alice,
- Content: map[string]interface{}{
- "membership": "join",
- },
- StateKey: &alice,
- Type: gomatrixserverlib.MRoomMember,
- },
- {
- RoomID: roomID,
- Sender: alice,
- Content: map[string]interface{}{
- "body": "hello world",
- },
- StateKey: nil,
- Type: "m.room.message",
- },
- {
- RoomID: roomID,
- Sender: alice,
- Content: map[string]interface{}{
- "name": "Room Name",
- },
- StateKey: &emptyKey,
- Type: "m.room.name",
- },
- })
- rewriteEvents := mustCreateEvents(t, gomatrixserverlib.RoomVersionV6, []fledglingEvent{
- {
- RoomID: roomID,
- Sender: alice,
- Content: map[string]interface{}{
- "creator": alice,
- },
- StateKey: &emptyKey,
- Type: gomatrixserverlib.MRoomCreate,
- },
- {
- RoomID: roomID,
- Sender: alice,
- Content: map[string]interface{}{
- "membership": "join",
- },
- StateKey: &alice,
- Type: gomatrixserverlib.MRoomMember,
- },
- {
- RoomID: roomID,
- Sender: alice,
- Content: map[string]interface{}{
- "name": "Room Name 2",
- },
- StateKey: &emptyKey,
- Type: "m.room.name",
- },
- {
- RoomID: roomID,
- Sender: alice,
- Content: map[string]interface{}{
- "body": "hello world 2",
- },
- StateKey: nil,
- Type: "m.room.message",
- },
- })
- deleteDatabase()
- rsAPI, producer := mustCreateRoomserverAPI(t)
- defer deleteDatabase()
- err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil)
- if err != nil {
- t.Fatalf("failed to send original events: %s", err)
- }
- // assert we got them produced, this is just a sanity check and isn't the intention of this test
- if len(producer.producedMessages) != len(originalEvents) {
- t.Fatalf("SendEvents didn't result in same number of produced output events: got %d want %d", len(producer.producedMessages), len(originalEvents))
- }
- producer.producedMessages = nil // we aren't actually interested in these events, just the rewrite ones
-
- var inputEvents []api.InputRoomEvent
- // slowly build up the state IDs again, we're basically telling the roomserver what to store as a snapshot
- var stateIDs []string
- // skip the last event, we'll use this to tie together the rewrite as the KindNew event
- for i := 0; i < len(rewriteEvents)-1; i++ {
- ev := rewriteEvents[i]
- inputEvents = append(inputEvents, api.InputRoomEvent{
- Kind: api.KindOutlier,
- Event: ev,
- AuthEventIDs: ev.AuthEventIDs(),
- HasState: true,
- StateEventIDs: stateIDs,
- })
- if ev.StateKey() != nil {
- stateIDs = append(stateIDs, ev.EventID())
- }
- }
- lastEv := rewriteEvents[len(rewriteEvents)-1]
- inputEvents = append(inputEvents, api.InputRoomEvent{
- Kind: api.KindNew,
- Event: lastEv,
- AuthEventIDs: lastEv.AuthEventIDs(),
- HasState: true,
- StateEventIDs: stateIDs,
- })
- if err := api.SendInputRoomEvents(context.Background(), rsAPI, inputEvents); err != nil {
- t.Fatalf("SendInputRoomEvents returned error for rewrite events: %s", err)
- }
- // we should just have one output event with the entire state of the room in it
- if len(producer.producedMessages) != 1 {
- t.Fatalf("Rewritten events got output, want only 1 got %d", len(producer.producedMessages))
- }
- outputEvent := producer.producedMessages[len(producer.producedMessages)-1]
- if !outputEvent.NewRoomEvent.RewritesState {
- t.Errorf("RewritesState flag not set on output event")
- }
- if !reflect.DeepEqual(stateIDs, outputEvent.NewRoomEvent.AddsStateEventIDs) {
- t.Errorf("Output event is missing room state event IDs, got %v want %v", outputEvent.NewRoomEvent.AddsStateEventIDs, stateIDs)
- }
- if !bytes.Equal(outputEvent.NewRoomEvent.Event.JSON(), lastEv.JSON()) {
- t.Errorf(
- "Output event isn't the latest KindNew event:\ngot %s\nwant %s",
- string(outputEvent.NewRoomEvent.Event.JSON()),
- string(lastEv.JSON()),
- )
- }
- if len(outputEvent.NewRoomEvent.AddStateEvents) != len(stateIDs) {
- t.Errorf("Output event is missing room state events themselves, got %d want %d", len(outputEvent.NewRoomEvent.AddStateEvents), len(stateIDs))
- }
- // make sure the state got overwritten, check the room name
- hasRoomName := false
- for _, ev := range outputEvent.NewRoomEvent.AddStateEvents {
- if ev.Type() == "m.room.name" {
- hasRoomName = string(ev.Content()) == `{"name":"Room Name 2"}`
- }
- }
- if !hasRoomName {
- t.Errorf("Output event did not overwrite room state")
- }
-}