diff options
author | S7evinK <2353100+S7evinK@users.noreply.github.com> | 2022-03-29 14:14:35 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-29 14:14:35 +0200 |
commit | 49dc49b232432d52c082646cc6f778593f4cb8b4 (patch) | |
tree | e809deedcae2127e930b0d382c863561b61fd9d2 /setup/jetstream | |
parent | 7972915806348847ecd9a9b8a1b1ff0609cb883c (diff) |
Remove eduserver (#2306)
* Move receipt sending to own JetStream producer
* Move SendToDevice to producer
* Remove most parts of the EDU server
* Fix SendToDevice & copyrights
* Move structs, cleanup EDU Server traces
* Use HeadersOnly subscription
* Missing file
* Fix linter issues
* Move consumers to own files
* Rename durable consumer; Consumer cleanup
* Docs/config cleanup
Diffstat (limited to 'setup/jetstream')
-rw-r--r-- | setup/jetstream/nats.go | 21 | ||||
-rw-r--r-- | setup/jetstream/streams.go | 5 |
2 files changed, 24 insertions, 2 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 328cf915..4e4fe7a2 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -157,5 +157,26 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc } } + // Clean up old consumers so that interest-based consumers do the + // right thing. + for stream, consumers := range map[string][]string{ + OutputClientData: {"SyncAPIClientAPIConsumer"}, + OutputReceiptEvent: {"SyncAPIEDUServerReceiptConsumer", "FederationAPIEDUServerConsumer"}, + OutputSendToDeviceEvent: {"SyncAPIEDUServerSendToDeviceConsumer", "FederationAPIEDUServerConsumer"}, + OutputTypingEvent: {"SyncAPIEDUServerTypingConsumer", "FederationAPIEDUServerConsumer"}, + } { + streamName := cfg.Matrix.JetStream.Prefixed(stream) + for _, consumer := range consumers { + consumerName := cfg.Matrix.JetStream.Prefixed(consumer) + "Pull" + consumerInfo, err := s.ConsumerInfo(streamName, consumerName) + if err != nil || consumerInfo == nil { + continue + } + if err = s.DeleteConsumer(streamName, consumerName); err != nil { + logrus.WithError(err).Errorf("Unable to clean up old consumer %q for stream %q", consumer, stream) + } + } + } + return s, nc } diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go index aa979924..5f0d37fd 100644 --- a/setup/jetstream/streams.go +++ b/setup/jetstream/streams.go @@ -9,8 +9,9 @@ import ( ) const ( - UserID = "user_id" - RoomID = "room_id" + UserID = "user_id" + RoomID = "room_id" + EventID = "event_id" ) var ( |