diff options
author | S7evinK <tfaelligen@gmail.com> | 2022-01-05 18:44:49 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-05 17:44:49 +0000 |
commit | 161f14517669410d3e8207dc41eea5c9695f7e17 (patch) | |
tree | 20db8ed83d92c688206242f84880ff2e35a1d5eb /roomserver | |
parent | a47b12dc7d692e0ddd4aaa0801dafc9bb462aad9 (diff) |
Add NATS JetStream support (#1866)
* Add NATS JetStream support
Update shopify/sarama
* Fix addresses
* Don't change Addresses in Defaults
* Update saramajetstream
* Add missing error check
Keep typing events for at least one minute
* Use all configured NATS addresses
* Update saramajetstream
* Try setting up with NATS
* Make sure NATS uses own persistent directory (TODO: make this configurable)
* Update go.mod/go.sum
* Jetstream package
* Various other refactoring
* Build fixes
* Config tweaks, make random jetstream storage path for CI
* Disable interest policies
* Try to sane default on jetstream base path
* Try to use in-memory for CI
* Restore storage/retention
* Update nats.go dependency
* Adapt changes to config
* Remove unneeded TopicFor
* Dep update
* Revert "Remove unneeded TopicFor"
This reverts commit f5a4e4a339b6f94ec215778dca22204adaa893d1.
* Revert changes made to streams
* Fix build problems
* Update nats-server
* Update go.mod/go.sum
* Roomserver input API queuing using NATS
* Fix topic naming
* Prometheus metrics
* More refactoring to remove saramajetstream
* Add missing topic
* Don't try to populate map that doesn't exist
* Roomserver output topic
* Update go.mod/go.sum
* Message acknowledgements
* Ack tweaks
* Try to resume transaction re-sends
* Try to resume transaction re-sends
* Update to matrix-org/gomatrixserverlib@91dadfb
* Remove internal.PartitionStorer from components that don't consume keychanges
* Try to reduce re-allocations a bit in resolveConflictsV2
* Tweak delivery options on RS input
* Publish send-to-device messages into correct JetStream subject
* Async and sync roomserver input
* Update dendrite-config.yaml
* Remove roomserver tests for now (they need rewriting)
* Remove roomserver test again (was merged back in)
* Update documentation
* Docker updates
* More Docker updates
* Update Docker readme again
* Fix lint issues
* Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset)
* Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that
* Go 1.16 instead of Go 1.13 for upgrade tests and Complement
* Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that"
This reverts commit 368675283fc44501f227639811bdb16dd5deef8c.
* Don't report any errors on `/send` to see what fun that creates
* Fix panics on closed channel sends
* Enforce state key matches sender
* Do the same for leave
* Various tweaks to make tests happier
Squashed commit of the following:
commit 13f9028e7a63662759ce7c55504a9d2423058668
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 15:47:14 2022 +0000
Do the same for leave
commit e6be7f05c349fafbdddfe818337a17a60c867be1
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 15:33:42 2022 +0000
Enforce state key matches sender
commit 85ede6d64bf10ce9b91cdd6d80f87350ee55242f
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 14:07:04 2022 +0000
Fix panics on closed channel sends
commit 9755494a98bed62450f8001d8128e40481d27e15
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 13:38:22 2022 +0000
Don't report any errors on `/send` to see what fun that creates
commit 3bb4f87b5dd56882febb4db5621db484c8789b7c
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 13:00:26 2022 +0000
Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that"
This reverts commit 368675283fc44501f227639811bdb16dd5deef8c.
commit fe2673ed7be9559eaca134424e403a4faca100b0
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 12:09:34 2022 +0000
Go 1.16 instead of Go 1.13 for upgrade tests and Complement
commit 368675283fc44501f227639811bdb16dd5deef8c
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 11:51:45 2022 +0000
Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that
commit b028dfc08577bcf52e6cb498026e15fa5d46d07c
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 10:29:08 2022 +0000
Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset)
* Merge in NATS Server v2.6.6 and nats.go v1.13 into the in-process connection fork
* Add `jetstream.WithJetStreamMessage` to make ack/nak-ing less messy, use process context in consumers
* Fix consumer component name in federation API
* Add comment explaining where streams are defined
* Tweaks to roomserver input with comments
* Finish that sentence that I apparently forgot to finish in INSTALL.md
* Bump version number of config to 2
* Add comments around asynchronous sends to roomserver in processEventWithMissingState
* More useful error message when the config version does not match
* Set version in generate-config
* Fix version in config.Defaults
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'roomserver')
-rw-r--r-- | roomserver/api/input.go | 1 | ||||
-rw-r--r-- | roomserver/api/wrapper.go | 15 | ||||
-rw-r--r-- | roomserver/internal/api.go | 17 | ||||
-rw-r--r-- | roomserver/internal/input/input.go | 250 | ||||
-rw-r--r-- | roomserver/internal/input/input_events.go | 28 | ||||
-rw-r--r-- | roomserver/internal/input/input_fifo.go | 64 | ||||
-rw-r--r-- | roomserver/roomserver.go | 11 | ||||
-rw-r--r-- | roomserver/roomserver_test.go | 407 |
8 files changed, 163 insertions, 630 deletions
diff --git a/roomserver/api/input.go b/roomserver/api/input.go index 8e6e4ac7..a537e64e 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -86,6 +86,7 @@ type TransactionID struct { // InputRoomEventsRequest is a request to InputRoomEvents type InputRoomEventsRequest struct { InputRoomEvents []InputRoomEvent `json:"input_room_events"` + Asynchronous bool `json:"async"` } // InputRoomEventsResponse is a response to InputRoomEvents diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index de66df80..cdb186c0 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -27,6 +27,7 @@ func SendEvents( ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, events []*gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, + async bool, ) error { ires := make([]InputRoomEvent, len(events)) for i, event := range events { @@ -38,7 +39,7 @@ func SendEvents( TransactionID: txnID, } } - return SendInputRoomEvents(ctx, rsAPI, ires) + return SendInputRoomEvents(ctx, rsAPI, ires, async) } // SendEventWithState writes an event with the specified kind to the roomserver @@ -47,7 +48,7 @@ func SendEvents( func SendEventWithState( ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent, - haveEventIDs map[string]bool, + haveEventIDs map[string]bool, async bool, ) error { outliers, err := state.Events() if err != nil { @@ -79,14 +80,18 @@ func SendEventWithState( StateEventIDs: stateEventIDs, }) - return SendInputRoomEvents(ctx, rsAPI, ires) + return SendInputRoomEvents(ctx, rsAPI, ires, async) } // SendInputRoomEvents to the roomserver. func SendInputRoomEvents( - ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent, + ctx context.Context, rsAPI RoomserverInternalAPI, + ires []InputRoomEvent, async bool, ) error { - request := InputRoomEventsRequest{InputRoomEvents: ires} + request := InputRoomEventsRequest{ + InputRoomEvents: ires, + Asynchronous: async, + } var response InputRoomEventsResponse rsAPI.InputRoomEvents(ctx, &request, &response) return response.Err() diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 67bbc7ab..5cfe68da 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -3,7 +3,6 @@ package internal import ( "context" - "github.com/Shopify/sarama" "github.com/getsentry/sentry-go" asAPI "github.com/matrix-org/dendrite/appservice/api" fsAPI "github.com/matrix-org/dendrite/federationapi/api" @@ -16,6 +15,8 @@ import ( "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" ) // RoomserverInternalAPI is an implementation of api.RoomserverInternalAPI @@ -33,19 +34,19 @@ type RoomserverInternalAPI struct { *perform.Forgetter DB storage.Database Cfg *config.RoomServer - Producer sarama.SyncProducer Cache caching.RoomServerCaches ServerName gomatrixserverlib.ServerName KeyRing gomatrixserverlib.JSONVerifier fsAPI fsAPI.FederationInternalAPI asAPI asAPI.AppServiceQueryAPI - OutputRoomEventTopic string // Kafka topic for new output room events + InputRoomEventTopic string // JetStream topic for new input room events + OutputRoomEventTopic string // JetStream topic for new output room events PerspectiveServerNames []gomatrixserverlib.ServerName } func NewRoomserverAPI( - cfg *config.RoomServer, roomserverDB storage.Database, producer sarama.SyncProducer, - outputRoomEventTopic string, caches caching.RoomServerCaches, + cfg *config.RoomServer, roomserverDB storage.Database, consumer nats.JetStreamContext, + inputRoomEventTopic, outputRoomEventTopic string, caches caching.RoomServerCaches, perspectiveServerNames []gomatrixserverlib.ServerName, ) *RoomserverInternalAPI { serverACLs := acls.NewServerACLs(roomserverDB) @@ -63,13 +64,17 @@ func NewRoomserverAPI( }, Inputer: &input.Inputer{ DB: roomserverDB, + InputRoomEventTopic: inputRoomEventTopic, OutputRoomEventTopic: outputRoomEventTopic, - Producer: producer, + JetStream: consumer, ServerName: cfg.Matrix.ServerName, ACLs: serverACLs, }, // perform-er structs get initialised when we have a federation sender to use } + if err := a.Inputer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start roomserver input API") + } return a } diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index de40e133..1eab6780 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -19,19 +19,19 @@ import ( "context" "encoding/json" "sync" - "time" - "github.com/Shopify/sarama" + "github.com/Arceliar/phony" "github.com/getsentry/sentry-go" "github.com/matrix-org/dendrite/internal/hooks" "github.com/matrix-org/dendrite/roomserver/acls" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" - "go.uber.org/atomic" ) var keyContentFields = map[string]string{ @@ -42,105 +42,161 @@ var keyContentFields = map[string]string{ type Inputer struct { DB storage.Database - Producer sarama.SyncProducer + JetStream nats.JetStreamContext ServerName gomatrixserverlib.ServerName ACLs *acls.ServerACLs + InputRoomEventTopic string OutputRoomEventTopic string - workers sync.Map // room ID -> *inputWorker + workers sync.Map // room ID -> *phony.Inbox } -type inputTask struct { - ctx context.Context - event *api.InputRoomEvent - wg *sync.WaitGroup - err error // written back by worker, only safe to read when all tasks are done -} - -type inputWorker struct { - r *Inputer - running atomic.Bool - input *fifoQueue +// onMessage is called when a new event arrives in the roomserver input stream. +func (r *Inputer) Start() error { + _, err := r.JetStream.Subscribe( + r.InputRoomEventTopic, + // We specifically don't use jetstream.WithJetStreamMessage here because we + // queue the task off to a room-specific queue and the ACK needs to be sent + // later, possibly with an error response to the inputter if synchronous. + func(msg *nats.Msg) { + roomID := msg.Header.Get("room_id") + defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() + var inputRoomEvent api.InputRoomEvent + if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil { + _ = msg.Term() + return + } + inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{}) + inbox.(*phony.Inbox).Act(nil, func() { + if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil { + sentry.CaptureException(err) + } else { + hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event) + } + _ = msg.Ack() + }) + }, + // NATS wants to acknowledge automatically by default when the message is + // read from the stream, but we want to override that behaviour by making + // sure that we only acknowledge when we're happy we've done everything we + // can. This ensures we retry things when it makes sense to do so. + nats.ManualAck(), + // NATS will try to redeliver things to us automatically if we don't ack + // or nak them within a certain amount of time. This stops that from + // happening, so we don't end up doing a lot of unnecessary duplicate work. + nats.MaxDeliver(0), + ) + return err } -// Guarded by a CAS on w.running -func (w *inputWorker) start() { - defer w.running.Store(false) - for { - select { - case <-w.input.wait(): - task, ok := w.input.pop() - if !ok { - continue +// InputRoomEvents implements api.RoomserverInternalAPI +func (r *Inputer) InputRoomEvents( + ctx context.Context, + request *api.InputRoomEventsRequest, + response *api.InputRoomEventsResponse, +) { + if request.Asynchronous { + var err error + for _, e := range request.InputRoomEvents { + msg := &nats.Msg{ + Subject: r.InputRoomEventTopic, + Header: nats.Header{}, + } + roomID := e.Event.RoomID() + msg.Header.Set("room_id", roomID) + msg.Data, err = json.Marshal(e) + if err != nil { + response.ErrMsg = err.Error() + return } - roomserverInputBackpressure.With(prometheus.Labels{ - "room_id": task.event.Event.RoomID(), - }).Dec() - hooks.Run(hooks.KindNewEventReceived, task.event.Event) - _, task.err = w.r.processRoomEvent(task.ctx, task.event) - if task.err == nil { - hooks.Run(hooks.KindNewEventPersisted, task.event.Event) - } else { - sentry.CaptureException(task.err) + if _, err = r.JetStream.PublishMsg(msg); err != nil { + return + } + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() + } + } else { + responses := make(chan error, len(request.InputRoomEvents)) + defer close(responses) + for _, e := range request.InputRoomEvents { + inputRoomEvent := e + inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{}) + inbox.(*phony.Inbox).Act(nil, func() { + err := r.processRoomEvent(context.TODO(), &inputRoomEvent) + if err != nil { + sentry.CaptureException(err) + } else { + hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event) + } + select { + case <-ctx.Done(): + default: + responses <- err + } + }) + } + for i := 0; i < len(request.InputRoomEvents); i++ { + select { + case <-ctx.Done(): + return + case err := <-responses: + if err != nil { + response.ErrMsg = err.Error() + return + } } - task.wg.Done() - case <-time.After(time.Second * 5): - return } } } // WriteOutputEvents implements OutputRoomEventWriter func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { - messages := make([]*sarama.ProducerMessage, len(updates)) - for i := range updates { - value, err := json.Marshal(updates[i]) + var err error + for _, update := range updates { + msg := &nats.Msg{ + Subject: r.OutputRoomEventTopic, + Header: nats.Header{}, + } + msg.Header.Set(jetstream.RoomID, roomID) + msg.Data, err = json.Marshal(update) if err != nil { return err } logger := log.WithFields(log.Fields{ "room_id": roomID, - "type": updates[i].Type, + "type": update.Type, }) - if updates[i].NewRoomEvent != nil { - eventType := updates[i].NewRoomEvent.Event.Type() + if update.NewRoomEvent != nil { + eventType := update.NewRoomEvent.Event.Type() logger = logger.WithFields(log.Fields{ "event_type": eventType, - "event_id": updates[i].NewRoomEvent.Event.EventID(), - "adds_state": len(updates[i].NewRoomEvent.AddsStateEventIDs), - "removes_state": len(updates[i].NewRoomEvent.RemovesStateEventIDs), - "send_as_server": updates[i].NewRoomEvent.SendAsServer, - "sender": updates[i].NewRoomEvent.Event.Sender(), + "event_id": update.NewRoomEvent.Event.EventID(), + "adds_state": len(update.NewRoomEvent.AddsStateEventIDs), + "removes_state": len(update.NewRoomEvent.RemovesStateEventIDs), + "send_as_server": update.NewRoomEvent.SendAsServer, + "sender": update.NewRoomEvent.Event.Sender(), }) - if updates[i].NewRoomEvent.Event.StateKey() != nil { - logger = logger.WithField("state_key", *updates[i].NewRoomEvent.Event.StateKey()) + if update.NewRoomEvent.Event.StateKey() != nil { + logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey()) } contentKey := keyContentFields[eventType] if contentKey != "" { - value := gjson.GetBytes(updates[i].NewRoomEvent.Event.Content(), contentKey) + value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey) if value.Exists() { logger = logger.WithField("content_value", value.String()) } } - if eventType == "m.room.server_acl" && updates[i].NewRoomEvent.Event.StateKeyEquals("") { - ev := updates[i].NewRoomEvent.Event.Unwrap() + if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") { + ev := update.NewRoomEvent.Event.Unwrap() defer r.ACLs.OnServerACLUpdate(ev) } } - logger.Infof("Producing to topic '%s'", r.OutputRoomEventTopic) - messages[i] = &sarama.ProducerMessage{ - Topic: r.OutputRoomEventTopic, - Key: sarama.StringEncoder(roomID), - Value: sarama.ByteEncoder(value), - } - } - errs := r.Producer.SendMessages(messages) - if errs != nil { - for _, err := range errs.(sarama.ProducerErrors) { - log.WithError(err).WithField("message_bytes", err.Msg.Value.Length()).Error("Write to kafka failed") + logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic) + if _, err := r.JetStream.PublishMsg(msg); err != nil { + logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err) + return err } } - return errs + return nil } func init() { @@ -156,67 +212,3 @@ var roomserverInputBackpressure = prometheus.NewGaugeVec( }, []string{"room_id"}, ) - -// InputRoomEvents implements api.RoomserverInternalAPI -func (r *Inputer) InputRoomEvents( - _ context.Context, - request *api.InputRoomEventsRequest, - response *api.InputRoomEventsResponse, -) { - // Create a wait group. Each task that we dispatch will call Done on - // this wait group so that we know when all of our events have been - // processed. - wg := &sync.WaitGroup{} - wg.Add(len(request.InputRoomEvents)) - tasks := make([]*inputTask, len(request.InputRoomEvents)) - - for i, e := range request.InputRoomEvents { - // Work out if we are running per-room workers or if we're just doing - // it on a global basis (e.g. SQLite). - roomID := "global" - if r.DB.SupportsConcurrentRoomInputs() { - roomID = e.Event.RoomID() - } - - // Look up the worker, or create it if it doesn't exist. This channel - // is buffered to reduce the chance that we'll be blocked by another - // room - the channel will be quite small as it's just pointer types. - w, _ := r.workers.LoadOrStore(roomID, &inputWorker{ - r: r, - input: newFIFOQueue(), - }) - worker := w.(*inputWorker) - - // Create a task. This contains the input event and a reference to - // the wait group, so that the worker can notify us when this specific - // task has been finished. - tasks[i] = &inputTask{ - ctx: context.Background(), - event: &request.InputRoomEvents[i], - wg: wg, - } - - // Send the task to the worker. - if worker.running.CAS(false, true) { - go worker.start() - } - worker.input.push(tasks[i]) - roomserverInputBackpressure.With(prometheus.Labels{ - "room_id": roomID, - }).Inc() - } - - // Wait for all of the workers to return results about our tasks. - wg.Wait() - - // If any of the tasks returned an error, we should probably report - // that back to the caller. - for _, task := range tasks { - if task.err != nil { - response.ErrMsg = task.err.Error() - _, rejected := task.err.(*gomatrixserverlib.NotAllowed) - response.NotAllowed = rejected - return - } - } -} diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index fc712f47..791f7f30 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -62,7 +62,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec( func (r *Inputer) processRoomEvent( ctx context.Context, input *api.InputRoomEvent, -) (eventID string, err error) { +) (err error) { // Measure how long it takes to process this event. started := time.Now() defer func() { @@ -88,11 +88,11 @@ func (r *Inputer) processRoomEvent( case gomatrixserverlib.EventIDFormatV1: if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) { util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring") - return event.EventID(), nil + return nil } default: util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring") - return event.EventID(), nil + return nil } } } @@ -124,14 +124,14 @@ func (r *Inputer) processRoomEvent( // Store the event. _, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected) if err != nil { - return "", fmt.Errorf("r.DB.StoreEvent: %w", err) + return fmt.Errorf("r.DB.StoreEvent: %w", err) } // if storing this event results in it being redacted then do so. if !isRejected && redactedEventID == event.EventID() { r, rerr := eventutil.RedactEvent(redactionEvent, event) if rerr != nil { - return "", fmt.Errorf("eventutil.RedactEvent: %w", rerr) + return fmt.Errorf("eventutil.RedactEvent: %w", rerr) } event = r } @@ -146,15 +146,15 @@ func (r *Inputer) processRoomEvent( "room": event.RoomID(), "sender": event.Sender(), }).Debug("Stored outlier") - return event.EventID(), nil + return nil } roomInfo, err := r.DB.RoomInfo(ctx, event.RoomID()) if err != nil { - return "", fmt.Errorf("r.DB.RoomInfo: %w", err) + return fmt.Errorf("r.DB.RoomInfo: %w", err) } if roomInfo == nil { - return "", fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID()) + return fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID()) } if stateAtEvent.BeforeStateSnapshotNID == 0 { @@ -162,7 +162,7 @@ func (r *Inputer) processRoomEvent( // Lets calculate one. err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected) if err != nil && input.Kind != api.KindOld { - return "", fmt.Errorf("r.calculateAndSetState: %w", err) + return fmt.Errorf("r.calculateAndSetState: %w", err) } } @@ -175,7 +175,7 @@ func (r *Inputer) processRoomEvent( "soft_fail": softfail, "sender": event.Sender(), }).Debug("Stored rejected event") - return event.EventID(), rejectionErr + return rejectionErr } switch input.Kind { @@ -189,7 +189,7 @@ func (r *Inputer) processRoomEvent( input.TransactionID, // transaction ID input.HasState, // rewrites state? ); err != nil { - return "", fmt.Errorf("r.updateLatestEvents: %w", err) + return fmt.Errorf("r.updateLatestEvents: %w", err) } case api.KindOld: err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{ @@ -201,7 +201,7 @@ func (r *Inputer) processRoomEvent( }, }) if err != nil { - return "", fmt.Errorf("r.WriteOutputEvents (old): %w", err) + return fmt.Errorf("r.WriteOutputEvents (old): %w", err) } } @@ -220,12 +220,12 @@ func (r *Inputer) processRoomEvent( }, }) if err != nil { - return "", fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) + return fmt.Errorf("r.WriteOutputEvents (redactions): %w", err) } } // Update the extremities of the event graph for the room - return event.EventID(), nil + return nil } func (r *Inputer) calculateAndSetState( diff --git a/roomserver/internal/input/input_fifo.go b/roomserver/internal/input/input_fifo.go deleted file mode 100644 index 694b1724..00000000 --- a/roomserver/internal/input/input_fifo.go +++ /dev/null @@ -1,64 +0,0 @@ -package input - -import ( - "sync" -) - -type fifoQueue struct { - tasks []*inputTask - count int - mutex sync.Mutex - notifs chan struct{} -} - -func newFIFOQueue() *fifoQueue { - q := &fifoQueue{ - notifs: make(chan struct{}, 1), - } - return q -} - -func (q *fifoQueue) push(frame *inputTask) { - q.mutex.Lock() - defer q.mutex.Unlock() - q.tasks = append(q.tasks, frame) - q.count++ - select { - case q.notifs <- struct{}{}: - default: - } -} - -// pop returns the first item of the queue, if there is one. -// The second return value will indicate if a task was returned. -// You must check this value, even after calling wait(). -func (q *fifoQueue) pop() (*inputTask, bool) { - q.mutex.Lock() - defer q.mutex.Unlock() - if q.count == 0 { - return nil, false - } - frame := q.tasks[0] - q.tasks[0] = nil - q.tasks = q.tasks[1:] - q.count-- - if q.count == 0 { - // Force a GC of the underlying array, since it might have - // grown significantly if the queue was hammered for some reason - q.tasks = nil - } - return frame, true -} - -// wait returns a channel which can be used to detect when an -// item is waiting in the queue. -func (q *fifoQueue) wait() <-chan struct{} { - q.mutex.Lock() - defer q.mutex.Unlock() - if q.count > 0 && len(q.notifs) == 0 { - ch := make(chan struct{}) - close(ch) - return ch - } - return q.notifs -} diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go index e4742100..669957be 100644 --- a/roomserver/roomserver.go +++ b/roomserver/roomserver.go @@ -23,8 +23,7 @@ import ( "github.com/matrix-org/dendrite/roomserver/internal" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/setup/base" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/sirupsen/logrus" ) @@ -41,8 +40,6 @@ func NewInternalAPI( ) api.RoomserverInternalAPI { cfg := &base.Cfg.RoomServer - _, producer := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) - var perspectiveServerNames []gomatrixserverlib.ServerName for _, kp := range base.Cfg.FederationAPI.KeyPerspectives { perspectiveServerNames = append(perspectiveServerNames, kp.ServerName) @@ -53,8 +50,12 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to room server db") } + js, _, _ := jetstream.Prepare(&cfg.Matrix.JetStream) + return internal.NewRoomserverAPI( - cfg, roomserverDB, producer, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + cfg, roomserverDB, js, + cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent), + cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent), base.Caches, perspectiveServerNames, ) } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go deleted file mode 100644 index 40e8e92d..00000000 --- a/roomserver/roomserver_test.go +++ /dev/null @@ -1,407 +0,0 @@ -package roomserver - -import ( - "bytes" - "context" - "crypto/ed25519" - "encoding/json" - "fmt" - "os" - "reflect" - "testing" - "time" - - "github.com/Shopify/sarama" - "github.com/matrix-org/dendrite/internal/caching" - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/dendrite/roomserver/internal" - "github.com/matrix-org/dendrite/roomserver/storage" - "github.com/matrix-org/dendrite/setup/base" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/gomatrixserverlib" - "github.com/sirupsen/logrus" -) - -const ( - testOrigin = gomatrixserverlib.ServerName("kaer.morhen") - // we have to use an on-disk DB because we open multiple connections due to the *Updater structs. - // Using :memory: results in a brand new DB for each open connection, and sharing memory via - // ?cache=shared just allows read-only sharing, so writes to the database on other connections are lost. - roomserverDBFileURI = "file:roomserver_test.db" - roomserverDBFilePath = "./roomserver_test.db" -) - -var ( - ctx = context.Background() -) - -type dummyProducer struct { - topic string - producedMessages []*api.OutputEvent -} - -// SendMessage produces a given message, and returns only when it either has -// succeeded or failed to produce. It will return the partition and the offset -// of the produced message, or an error if the message failed to produce. -func (p *dummyProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { - if msg.Topic != p.topic { - return 0, 0, nil - } - be := msg.Value.(sarama.ByteEncoder) - b := json.RawMessage(be) - fmt.Println("SENDING >>>>>>>> ", string(b)) - var out api.OutputEvent - err = json.Unmarshal(b, &out) - if err != nil { - return 0, 0, err - } - p.producedMessages = append(p.producedMessages, &out) - return 0, 0, nil -} - -// SendMessages produces a given set of messages, and returns only when all -// messages in the set have either succeeded or failed. Note that messages -// can succeed and fail individually; if some succeed and some fail, -// SendMessages will return an error. -func (p *dummyProducer) SendMessages(msgs []*sarama.ProducerMessage) error { - for _, m := range msgs { - p.SendMessage(m) - } - return nil -} - -// Close shuts down the producer and waits for any buffered messages to be -// flushed. You must call this function before a producer object passes out of -// scope, as it may otherwise leak memory. You must call this before calling -// Close on the underlying client. -func (p *dummyProducer) Close() error { - return nil -} - -func deleteDatabase() { - err := os.Remove(roomserverDBFilePath) - if err != nil { - fmt.Printf("failed to delete database %s: %s\n", roomserverDBFilePath, err) - } -} - -type fledglingEvent struct { - Type string - StateKey *string - Content interface{} - Sender string - RoomID string -} - -func mustCreateEvents(t *testing.T, roomVer gomatrixserverlib.RoomVersion, events []fledglingEvent) (result []*gomatrixserverlib.HeaderedEvent) { - t.Helper() - depth := int64(1) - seed := make([]byte, ed25519.SeedSize) // zero seed - key := ed25519.NewKeyFromSeed(seed) - var prevs []string - roomState := make(map[gomatrixserverlib.StateKeyTuple]string) // state -> event ID - for _, ev := range events { - eb := gomatrixserverlib.EventBuilder{ - Sender: ev.Sender, - Depth: depth, - Type: ev.Type, - StateKey: ev.StateKey, - RoomID: ev.RoomID, - PrevEvents: prevs, - } - err := eb.SetContent(ev.Content) - if err != nil { - t.Fatalf("mustCreateEvent: failed to marshal event content %+v", ev.Content) - } - stateNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(&eb) - if err != nil { - t.Fatalf("mustCreateEvent: failed to work out auth_events : %s", err) - } - var authEvents []string - for _, tuple := range stateNeeded.Tuples() { - eventID := roomState[tuple] - if eventID != "" { - authEvents = append(authEvents, eventID) - } - } - eb.AuthEvents = authEvents - signedEvent, err := eb.Build(time.Now(), testOrigin, "ed25519:test", key, roomVer) - if err != nil { - t.Fatalf("mustCreateEvent: failed to sign event: %s", err) - } - depth++ - prevs = []string{signedEvent.EventID()} - if ev.StateKey != nil { - roomState[gomatrixserverlib.StateKeyTuple{ - EventType: ev.Type, - StateKey: *ev.StateKey, - }] = signedEvent.EventID() - } - result = append(result, signedEvent.Headered(roomVer)) - } - return -} - -func mustLoadRawEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) []*gomatrixserverlib.HeaderedEvent { - t.Helper() - hs := make([]*gomatrixserverlib.HeaderedEvent, len(events)) - for i := range events { - e, err := gomatrixserverlib.NewEventFromTrustedJSON(events[i], false, ver) - if err != nil { - t.Fatalf("cannot load test data: " + err.Error()) - } - hs[i] = e.Headered(ver) - } - return hs -} - -func mustCreateRoomserverAPI(t *testing.T) (api.RoomserverInternalAPI, *dummyProducer) { - t.Helper() - cfg := &config.Dendrite{} - cfg.Defaults(true) - cfg.Global.ServerName = testOrigin - cfg.Global.Kafka.UseNaffka = true - cfg.RoomServer.Database = config.DatabaseOptions{ - ConnectionString: roomserverDBFileURI, - } - dp := &dummyProducer{ - topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), - } - cache, err := caching.NewInMemoryLRUCache(false) - if err != nil { - t.Fatalf("failed to make caches: %s", err) - } - base := &base.BaseDendrite{ - Caches: cache, - Cfg: cfg, - } - roomserverDB, err := storage.Open(&cfg.RoomServer.Database, base.Caches) - if err != nil { - logrus.WithError(err).Panicf("failed to connect to room server db") - } - return internal.NewRoomserverAPI( - &cfg.RoomServer, roomserverDB, dp, string(cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent)), - base.Caches, nil, - ), dp -} - -func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []json.RawMessage) (api.RoomserverInternalAPI, *dummyProducer, []*gomatrixserverlib.HeaderedEvent) { - t.Helper() - rsAPI, dp := mustCreateRoomserverAPI(t) - hevents := mustLoadRawEvents(t, ver, events) - if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil); err != nil { - t.Errorf("failed to SendEvents: %s", err) - } - return rsAPI, dp, hevents -} - -func TestOutputRedactedEvent(t *testing.T) { - redactionEvents := []json.RawMessage{ - // create event - []byte(`{"auth_events":[],"content":{"creator":"@userid:kaer.morhen"},"depth":0,"event_id":"$N4us6vqqq3RjvpKd:kaer.morhen","hashes":{"sha256":"WTdrCn/YsiounXcJPsLP8xT0ZjHiO5Ov0NvXYmK2onE"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"9+5JcpaN5b5KlHYHGp6r+GoNDH98lbfzGYwjfxensa5C5D/bDACaYnMDLnhwsHOE5nxgI+jT/GV271pz6PMSBQ"}},"state_key":"","type":"m.room.create"}`), - // join event - []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}]],"content":{"membership":"join"},"depth":1,"event_id":"$6sUiGPQ0a3tqYGKo:kaer.morhen","hashes":{"sha256":"eYVBC7RO+FlxRyW1aXYf/ad4Dzi7T93tArdGw3r4RwQ"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"tiDBTPFa53YMfHiupX3vSRE/ZcCiCjmGt7gDpIpDpwZapeays5Vqqcqb7KiywrDldpTkrrdJBAw2jXcq6ZyhDw"}},"state_key":"@userid:kaer.morhen","type":"m.room.member"}`), - // room name - []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"name":"My Room Name"},"depth":2,"event_id":"$VC1zZ9YWwuUbSNHD:kaer.morhen","hashes":{"sha256":"bpqTkfLx6KHzWz7/wwpsXnXwJWEGW14aV63ffexzDFg"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"mhJZ3X4bAKrF/T0mtPf1K2Tmls0h6xGY1IPDpJ/SScQBqDlu3HQR2BPa7emqj5bViyLTWVNh+ZCpzx/6STTrAg"}},"state_key":"","type":"m.room.name"}`), - // redact room name - []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"reason":"Spamming"},"depth":3,"event_id":"$tJI0pE3b8u9UMYpT:kaer.morhen","hashes":{"sha256":"/3TStqa5SQqYaEtl7ajEvSRvu6d12MMKfICUzrBpd2Q"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$VC1zZ9YWwuUbSNHD:kaer.morhen",{"sha256":"+l8cNa7syvm0EF7CAmQRlYknLEMjivnI4FLhB/TUBEY"}]],"redacts":"$VC1zZ9YWwuUbSNHD:kaer.morhen","room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"QBOh+amf0vTJbm6+9VwAcR9uJviBIor2KON0Y7+EyQx5YbUZEzW1HPeJxarLIHBcxMzgOVzjuM+StzjbUgDzAg"}},"type":"m.room.redaction"}`), - // message - []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"body":"Test Message"},"depth":4,"event_id":"$o8KHsgSIYbJrddnd:kaer.morhen","hashes":{"sha256":"IE/rGVlKOpiGWeIo887g1CK1drYqcWDZhL6THZHkJ1c"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$tJI0pE3b8u9UMYpT:kaer.morhen",{"sha256":"zvmwyXuDox7jpA16JRH6Fc1zbfQht2tpkBbMTUOi3Jw"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"/3z+pJjiJXWhwfqIEzmNksvBHCoXTktK/y0rRuWJXw6i1+ygRG/suDCKhFuuz6gPapRmEMPVILi2mJqHHXPKAg"}},"type":"m.room.message"}`), - // redact previous message - []byte(`{"auth_events":[["$N4us6vqqq3RjvpKd:kaer.morhen",{"sha256":"SylirfgfXFhscZL7p10NmOa1nFFEckiwz0lAideQMIM"}],["$6sUiGPQ0a3tqYGKo:kaer.morhen",{"sha256":"IS4HSMqpqVUGh1Z3qgC99YcaizjCoO4yFhYYe8j53IE"}]],"content":{"reason":"Spamming more"},"depth":5,"event_id":"$UpsE8belb2gJItJG:kaer.morhen","hashes":{"sha256":"zU8PWJOld/I7OtjdpltFSKC+DMNm2ZyEXAHcprsafD0"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$o8KHsgSIYbJrddnd:kaer.morhen",{"sha256":"UgjMuCFXH4warIjKuwlRq9zZ6dSJrZWCd+CkqtgLSHM"}]],"redacts":"$o8KHsgSIYbJrddnd:kaer.morhen","room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"zxFGr/7aGOzqOEN6zRNrBpFkkMnfGFPbCteYL33wC+PycBPIK+2WRa5qlAR2+lcLiK3HjIzwRYkKNsVFTqvRAw"}},"type":"m.room.redaction"}`), - } - var redactedOutputs []api.OutputEvent - deleteDatabase() - _, producer, hevents := mustSendEvents(t, gomatrixserverlib.RoomVersionV1, redactionEvents) - defer deleteDatabase() - for _, msg := range producer.producedMessages { - if msg.Type == api.OutputTypeRedactedEvent { - redactedOutputs = append(redactedOutputs, *msg) - } - } - wantRedactedOutputs := []api.OutputEvent{ - { - Type: api.OutputTypeRedactedEvent, - RedactedEvent: &api.OutputRedactedEvent{ - RedactedEventID: hevents[2].EventID(), - RedactedBecause: hevents[3], - }, - }, - { - Type: api.OutputTypeRedactedEvent, - RedactedEvent: &api.OutputRedactedEvent{ - RedactedEventID: hevents[4].EventID(), - RedactedBecause: hevents[5], - }, - }, - } - t.Logf("redactedOutputs: %+v", redactedOutputs) - if len(wantRedactedOutputs) != len(redactedOutputs) { - t.Fatalf("Got %d redacted events, want %d", len(redactedOutputs), len(wantRedactedOutputs)) - } - for i := 0; i < len(wantRedactedOutputs); i++ { - if !reflect.DeepEqual(*redactedOutputs[i].RedactedEvent, *wantRedactedOutputs[i].RedactedEvent) { - t.Errorf("OutputRedactionEvent %d: wrong event got:\n%+v want:\n%+v", i+1, redactedOutputs[i].RedactedEvent, wantRedactedOutputs[i].RedactedEvent) - } - } -} - -// This tests that rewriting state works correctly. -// This creates a small room with a create/join/name state, then replays it -// with a new room name. We expect the output events to contain the original events, -// followed by a single OutputNewRoomEvent with RewritesState set to true with the -// rewritten state events (with the 2nd room name). -func TestOutputRewritesState(t *testing.T) { - roomID := "!foo:" + string(testOrigin) - alice := "@alice:" + string(testOrigin) - emptyKey := "" - originalEvents := mustCreateEvents(t, gomatrixserverlib.RoomVersionV6, []fledglingEvent{ - { - RoomID: roomID, - Sender: alice, - Content: map[string]interface{}{ - "creator": alice, - "room_version": "6", - }, - StateKey: &emptyKey, - Type: gomatrixserverlib.MRoomCreate, - }, - { - RoomID: roomID, - Sender: alice, - Content: map[string]interface{}{ - "membership": "join", - }, - StateKey: &alice, - Type: gomatrixserverlib.MRoomMember, - }, - { - RoomID: roomID, - Sender: alice, - Content: map[string]interface{}{ - "body": "hello world", - }, - StateKey: nil, - Type: "m.room.message", - }, - { - RoomID: roomID, - Sender: alice, - Content: map[string]interface{}{ - "name": "Room Name", - }, - StateKey: &emptyKey, - Type: "m.room.name", - }, - }) - rewriteEvents := mustCreateEvents(t, gomatrixserverlib.RoomVersionV6, []fledglingEvent{ - { - RoomID: roomID, - Sender: alice, - Content: map[string]interface{}{ - "creator": alice, - }, - StateKey: &emptyKey, - Type: gomatrixserverlib.MRoomCreate, - }, - { - RoomID: roomID, - Sender: alice, - Content: map[string]interface{}{ - "membership": "join", - }, - StateKey: &alice, - Type: gomatrixserverlib.MRoomMember, - }, - { - RoomID: roomID, - Sender: alice, - Content: map[string]interface{}{ - "name": "Room Name 2", - }, - StateKey: &emptyKey, - Type: "m.room.name", - }, - { - RoomID: roomID, - Sender: alice, - Content: map[string]interface{}{ - "body": "hello world 2", - }, - StateKey: nil, - Type: "m.room.message", - }, - }) - deleteDatabase() - rsAPI, producer := mustCreateRoomserverAPI(t) - defer deleteDatabase() - err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil) - if err != nil { - t.Fatalf("failed to send original events: %s", err) - } - // assert we got them produced, this is just a sanity check and isn't the intention of this test - if len(producer.producedMessages) != len(originalEvents) { - t.Fatalf("SendEvents didn't result in same number of produced output events: got %d want %d", len(producer.producedMessages), len(originalEvents)) - } - producer.producedMessages = nil // we aren't actually interested in these events, just the rewrite ones - - var inputEvents []api.InputRoomEvent - // slowly build up the state IDs again, we're basically telling the roomserver what to store as a snapshot - var stateIDs []string - // skip the last event, we'll use this to tie together the rewrite as the KindNew event - for i := 0; i < len(rewriteEvents)-1; i++ { - ev := rewriteEvents[i] - inputEvents = append(inputEvents, api.InputRoomEvent{ - Kind: api.KindOutlier, - Event: ev, - AuthEventIDs: ev.AuthEventIDs(), - HasState: true, - StateEventIDs: stateIDs, - }) - if ev.StateKey() != nil { - stateIDs = append(stateIDs, ev.EventID()) - } - } - lastEv := rewriteEvents[len(rewriteEvents)-1] - inputEvents = append(inputEvents, api.InputRoomEvent{ - Kind: api.KindNew, - Event: lastEv, - AuthEventIDs: lastEv.AuthEventIDs(), - HasState: true, - StateEventIDs: stateIDs, - }) - if err := api.SendInputRoomEvents(context.Background(), rsAPI, inputEvents); err != nil { - t.Fatalf("SendInputRoomEvents returned error for rewrite events: %s", err) - } - // we should just have one output event with the entire state of the room in it - if len(producer.producedMessages) != 1 { - t.Fatalf("Rewritten events got output, want only 1 got %d", len(producer.producedMessages)) - } - outputEvent := producer.producedMessages[len(producer.producedMessages)-1] - if !outputEvent.NewRoomEvent.RewritesState { - t.Errorf("RewritesState flag not set on output event") - } - if !reflect.DeepEqual(stateIDs, outputEvent.NewRoomEvent.AddsStateEventIDs) { - t.Errorf("Output event is missing room state event IDs, got %v want %v", outputEvent.NewRoomEvent.AddsStateEventIDs, stateIDs) - } - if !bytes.Equal(outputEvent.NewRoomEvent.Event.JSON(), lastEv.JSON()) { - t.Errorf( - "Output event isn't the latest KindNew event:\ngot %s\nwant %s", - string(outputEvent.NewRoomEvent.Event.JSON()), - string(lastEv.JSON()), - ) - } - if len(outputEvent.NewRoomEvent.AddStateEvents) != len(stateIDs) { - t.Errorf("Output event is missing room state events themselves, got %d want %d", len(outputEvent.NewRoomEvent.AddStateEvents), len(stateIDs)) - } - // make sure the state got overwritten, check the room name - hasRoomName := false - for _, ev := range outputEvent.NewRoomEvent.AddStateEvents { - if ev.Type() == "m.room.name" { - hasRoomName = string(ev.Content()) == `{"name":"Room Name 2"}` - } - } - if !hasRoomName { - t.Errorf("Output event did not overwrite room state") - } -} |