diff options
author | S7evinK <tfaelligen@gmail.com> | 2022-01-05 18:44:49 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-05 17:44:49 +0000 |
commit | 161f14517669410d3e8207dc41eea5c9695f7e17 (patch) | |
tree | 20db8ed83d92c688206242f84880ff2e35a1d5eb /appservice | |
parent | a47b12dc7d692e0ddd4aaa0801dafc9bb462aad9 (diff) |
Add NATS JetStream support (#1866)
* Add NATS JetStream support
Update shopify/sarama
* Fix addresses
* Don't change Addresses in Defaults
* Update saramajetstream
* Add missing error check
Keep typing events for at least one minute
* Use all configured NATS addresses
* Update saramajetstream
* Try setting up with NATS
* Make sure NATS uses own persistent directory (TODO: make this configurable)
* Update go.mod/go.sum
* Jetstream package
* Various other refactoring
* Build fixes
* Config tweaks, make random jetstream storage path for CI
* Disable interest policies
* Try to sane default on jetstream base path
* Try to use in-memory for CI
* Restore storage/retention
* Update nats.go dependency
* Adapt changes to config
* Remove unneeded TopicFor
* Dep update
* Revert "Remove unneeded TopicFor"
This reverts commit f5a4e4a339b6f94ec215778dca22204adaa893d1.
* Revert changes made to streams
* Fix build problems
* Update nats-server
* Update go.mod/go.sum
* Roomserver input API queuing using NATS
* Fix topic naming
* Prometheus metrics
* More refactoring to remove saramajetstream
* Add missing topic
* Don't try to populate map that doesn't exist
* Roomserver output topic
* Update go.mod/go.sum
* Message acknowledgements
* Ack tweaks
* Try to resume transaction re-sends
* Try to resume transaction re-sends
* Update to matrix-org/gomatrixserverlib@91dadfb
* Remove internal.PartitionStorer from components that don't consume keychanges
* Try to reduce re-allocations a bit in resolveConflictsV2
* Tweak delivery options on RS input
* Publish send-to-device messages into correct JetStream subject
* Async and sync roomserver input
* Update dendrite-config.yaml
* Remove roomserver tests for now (they need rewriting)
* Remove roomserver test again (was merged back in)
* Update documentation
* Docker updates
* More Docker updates
* Update Docker readme again
* Fix lint issues
* Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset)
* Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that
* Go 1.16 instead of Go 1.13 for upgrade tests and Complement
* Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that"
This reverts commit 368675283fc44501f227639811bdb16dd5deef8c.
* Don't report any errors on `/send` to see what fun that creates
* Fix panics on closed channel sends
* Enforce state key matches sender
* Do the same for leave
* Various tweaks to make tests happier
Squashed commit of the following:
commit 13f9028e7a63662759ce7c55504a9d2423058668
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 15:47:14 2022 +0000
Do the same for leave
commit e6be7f05c349fafbdddfe818337a17a60c867be1
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 15:33:42 2022 +0000
Enforce state key matches sender
commit 85ede6d64bf10ce9b91cdd6d80f87350ee55242f
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 14:07:04 2022 +0000
Fix panics on closed channel sends
commit 9755494a98bed62450f8001d8128e40481d27e15
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 13:38:22 2022 +0000
Don't report any errors on `/send` to see what fun that creates
commit 3bb4f87b5dd56882febb4db5621db484c8789b7c
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 13:00:26 2022 +0000
Revert "Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that"
This reverts commit 368675283fc44501f227639811bdb16dd5deef8c.
commit fe2673ed7be9559eaca134424e403a4faca100b0
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 12:09:34 2022 +0000
Go 1.16 instead of Go 1.13 for upgrade tests and Complement
commit 368675283fc44501f227639811bdb16dd5deef8c
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 11:51:45 2022 +0000
Don't report event rejection errors via `/send`, since apparently this is upsetting tests that don't expect that
commit b028dfc08577bcf52e6cb498026e15fa5d46d07c
Author: Neil Alexander <neilalexander@users.noreply.github.com>
Date: Tue Jan 4 10:29:08 2022 +0000
Send final event in `processEvent` synchronously (since this might stop Sytest from being so upset)
* Merge in NATS Server v2.6.6 and nats.go v1.13 into the in-process connection fork
* Add `jetstream.WithJetStreamMessage` to make ack/nak-ing less messy, use process context in consumers
* Fix consumer component name in federation API
* Add comment explaining where streams are defined
* Tweaks to roomserver input with comments
* Finish that sentence that I apparently forgot to finish in INSTALL.md
* Bump version number of config to 2
* Add comments around asynchronous sends to roomserver in processEventWithMissingState
* More useful error message when the config version does not match
* Set version in generate-config
* Fix version in config.Defaults
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'appservice')
-rw-r--r-- | appservice/appservice.go | 6 | ||||
-rw-r--r-- | appservice/consumers/roomserver.go | 82 | ||||
-rw-r--r-- | appservice/storage/interface.go | 2 |
3 files changed, 45 insertions, 45 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go index 5f16c10b..924a609e 100644 --- a/appservice/appservice.go +++ b/appservice/appservice.go @@ -32,7 +32,7 @@ import ( roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/base" "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/kafka" + "github.com/matrix-org/dendrite/setup/jetstream" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/sirupsen/logrus" ) @@ -58,7 +58,7 @@ func NewInternalAPI( }, }, } - consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka) + js, _, _ := jetstream.Prepare(&base.Cfg.Global.JetStream) // Create a connection to the appservice postgres DB appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database) @@ -97,7 +97,7 @@ func NewInternalAPI( // We can't add ASes at runtime so this is safe to do. if len(workerStates) > 0 { consumer := consumers.NewOutputRoomEventConsumer( - base.ProcessContext, base.Cfg, consumer, appserviceDB, + base.ProcessContext, base.Cfg, js, appserviceDB, rsAPI, workerStates, ) if err := consumer.Start(); err != nil { diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 2ad7f68f..139b5724 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -20,23 +20,25 @@ import ( "github.com/matrix-org/dendrite/appservice/storage" "github.com/matrix-org/dendrite/appservice/types" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/jetstream" "github.com/matrix-org/dendrite/setup/process" "github.com/matrix-org/gomatrixserverlib" + "github.com/nats-io/nats.go" - "github.com/Shopify/sarama" log "github.com/sirupsen/logrus" ) // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { - roomServerConsumer *internal.ContinualConsumer - asDB storage.Database - rsAPI api.RoomserverInternalAPI - serverName string - workerStates []types.ApplicationServiceWorkerState + ctx context.Context + jetstream nats.JetStreamContext + topic string + asDB storage.Database + rsAPI api.RoomserverInternalAPI + serverName string + workerStates []types.ApplicationServiceWorkerState } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call @@ -44,55 +46,55 @@ type OutputRoomEventConsumer struct { func NewOutputRoomEventConsumer( process *process.ProcessContext, cfg *config.Dendrite, - kafkaConsumer sarama.Consumer, + js nats.JetStreamContext, appserviceDB storage.Database, rsAPI api.RoomserverInternalAPI, workerStates []types.ApplicationServiceWorkerState, ) *OutputRoomEventConsumer { - consumer := internal.ContinualConsumer{ - Process: process, - ComponentName: "appservice/roomserver", - Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent), - Consumer: kafkaConsumer, - PartitionStore: appserviceDB, + return &OutputRoomEventConsumer{ + ctx: process.Context(), + jetstream: js, + topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent), + asDB: appserviceDB, + rsAPI: rsAPI, + serverName: string(cfg.Global.ServerName), + workerStates: workerStates, } - s := &OutputRoomEventConsumer{ - roomServerConsumer: &consumer, - asDB: appserviceDB, - rsAPI: rsAPI, - serverName: string(cfg.Global.ServerName), - workerStates: workerStates, - } - consumer.ProcessMessage = s.onMessage - - return s } // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - return s.roomServerConsumer.Start() + _, err := s.jetstream.Subscribe(s.topic, s.onMessage) + return err } // onMessage is called when the appservice component receives a new event from // the room server output log. -func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { - // Parse out the event JSON - var output api.OutputEvent - if err := json.Unmarshal(msg.Value, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return nil - } +func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { + jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return true + } - if output.Type != api.OutputTypeNewRoomEvent { - return nil - } + if output.Type != api.OutputTypeNewRoomEvent { + return true + } + + events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} + events = append(events, output.NewRoomEvent.AddStateEvents...) - events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} - events = append(events, output.NewRoomEvent.AddStateEvents...) + // Send event to any relevant application services + if err := s.filterRoomserverEvents(context.TODO(), events); err != nil { + log.WithError(err).Errorf("roomserver output log: filter error") + return true + } - // Send event to any relevant application services - return s.filterRoomserverEvents(context.TODO(), events) + return true + }) } // filterRoomserverEvents takes in events and decides whether any of them need diff --git a/appservice/storage/interface.go b/appservice/storage/interface.go index 735e2f90..25d35af6 100644 --- a/appservice/storage/interface.go +++ b/appservice/storage/interface.go @@ -17,12 +17,10 @@ package storage import ( "context" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/gomatrixserverlib" ) type Database interface { - internal.PartitionStorer StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) error GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error) |