aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-03-29 14:14:35 +0200
committerGitHub <noreply@github.com>2022-03-29 14:14:35 +0200
commit49dc49b232432d52c082646cc6f778593f4cb8b4 (patch)
treee809deedcae2127e930b0d382c863561b61fd9d2 /setup/jetstream
parent7972915806348847ecd9a9b8a1b1ff0609cb883c (diff)
Remove eduserver (#2306)
* Move receipt sending to own JetStream producer * Move SendToDevice to producer * Remove most parts of the EDU server * Fix SendToDevice & copyrights * Move structs, cleanup EDU Server traces * Use HeadersOnly subscription * Missing file * Fix linter issues * Move consumers to own files * Rename durable consumer; Consumer cleanup * Docs/config cleanup
Diffstat (limited to 'setup/jetstream')
-rw-r--r--setup/jetstream/nats.go21
-rw-r--r--setup/jetstream/streams.go5
2 files changed, 24 insertions, 2 deletions
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 328cf915..4e4fe7a2 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -157,5 +157,26 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
}
}
+ // Clean up old consumers so that interest-based consumers do the
+ // right thing.
+ for stream, consumers := range map[string][]string{
+ OutputClientData: {"SyncAPIClientAPIConsumer"},
+ OutputReceiptEvent: {"SyncAPIEDUServerReceiptConsumer", "FederationAPIEDUServerConsumer"},
+ OutputSendToDeviceEvent: {"SyncAPIEDUServerSendToDeviceConsumer", "FederationAPIEDUServerConsumer"},
+ OutputTypingEvent: {"SyncAPIEDUServerTypingConsumer", "FederationAPIEDUServerConsumer"},
+ } {
+ streamName := cfg.Matrix.JetStream.Prefixed(stream)
+ for _, consumer := range consumers {
+ consumerName := cfg.Matrix.JetStream.Prefixed(consumer) + "Pull"
+ consumerInfo, err := s.ConsumerInfo(streamName, consumerName)
+ if err != nil || consumerInfo == nil {
+ continue
+ }
+ if err = s.DeleteConsumer(streamName, consumerName); err != nil {
+ logrus.WithError(err).Errorf("Unable to clean up old consumer %q for stream %q", consumer, stream)
+ }
+ }
+ }
+
return s, nc
}
diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go
index aa979924..5f0d37fd 100644
--- a/setup/jetstream/streams.go
+++ b/setup/jetstream/streams.go
@@ -9,8 +9,9 @@ import (
)
const (
- UserID = "user_id"
- RoomID = "room_id"
+ UserID = "user_id"
+ RoomID = "room_id"
+ EventID = "event_id"
)
var (