aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--appservice/consumers/roomserver.go2
-rw-r--r--clientapi/clientapi.go2
-rw-r--r--eduserver/eduserver.go6
-rw-r--r--federationapi/consumers/eduserver.go6
-rw-r--r--federationapi/consumers/keychange.go4
-rw-r--r--federationapi/consumers/roomserver.go2
-rw-r--r--keyserver/keyserver.go2
-rw-r--r--roomserver/internal/api.go1
-rw-r--r--roomserver/internal/input/input.go349
-rw-r--r--roomserver/roomserver.go4
-rw-r--r--setup/config/config_jetstream.go4
-rw-r--r--setup/jetstream/nats.go31
-rw-r--r--setup/jetstream/streams.go14
-rw-r--r--syncapi/consumers/clientapi.go2
-rw-r--r--syncapi/consumers/eduserver_receipts.go2
-rw-r--r--syncapi/consumers/eduserver_sendtodevice.go2
-rw-r--r--syncapi/consumers/eduserver_typing.go2
-rw-r--r--syncapi/consumers/roomserver.go2
-rw-r--r--syncapi/consumers/userapi.go2
-rw-r--r--syncapi/syncapi.go6
-rw-r--r--userapi/consumers/syncapi_readupdate.go2
-rw-r--r--userapi/consumers/syncapi_streamevent.go2
-rw-r--r--userapi/userapi.go4
23 files changed, 323 insertions, 130 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 9d723bed..01790722 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -56,7 +56,7 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(),
jetstream: js,
durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"),
- topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
+ topic: cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
asDB: appserviceDB,
rsAPI: rsAPI,
serverName: string(cfg.Global.ServerName),
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index 75184d3b..4550343c 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -55,7 +55,7 @@ func AddPublicRoutes(
syncProducer := &producers.SyncAPIProducer{
JetStream: js,
- Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
+ Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
}
routing.Setup(
diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go
index 9fe8704c..91208a40 100644
--- a/eduserver/eduserver.go
+++ b/eduserver/eduserver.go
@@ -48,9 +48,9 @@ func NewInternalAPI(
Cache: eduCache,
UserAPI: userAPI,
JetStream: js,
- OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
- OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
- OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
+ OutputTypingEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ OutputReceiptEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
ServerName: cfg.Matrix.ServerName,
}
}
diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go
index 1f81fa25..e14e60f4 100644
--- a/federationapi/consumers/eduserver.go
+++ b/federationapi/consumers/eduserver.go
@@ -58,9 +58,9 @@ func NewOutputEDUConsumer(
db: store,
ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"),
- typingTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
- sendToDeviceTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
- receiptTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
+ typingTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ sendToDeviceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ receiptTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
}
}
diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go
index 33d716d2..94e45435 100644
--- a/federationapi/consumers/keychange.go
+++ b/federationapi/consumers/keychange.go
@@ -55,8 +55,8 @@ func NewKeyChangeConsumer(
return &KeyChangeConsumer{
ctx: process.Context(),
jetstream: js,
- durable: cfg.Matrix.JetStream.TopicFor("FederationAPIKeyChangeConsumer"),
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
+ durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
queues: queues,
db: store,
serverName: cfg.Matrix.ServerName,
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index 989f7cf4..ff2c8e5d 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -61,7 +61,7 @@ func NewOutputRoomEventConsumer(
queues: queues,
rsAPI: rsAPI,
durable: cfg.Matrix.JetStream.Durable("FederationAPIRoomServerConsumer"),
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
}
}
diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go
index cf66bd38..c557dfba 100644
--- a/keyserver/keyserver.go
+++ b/keyserver/keyserver.go
@@ -46,7 +46,7 @@ func NewInternalAPI(
logrus.WithError(err).Panicf("failed to connect to key server database")
}
keyChangeProducer := &producers.KeyChange{
- Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
+ Topic: string(cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent)),
JetStream: js,
DB: db,
}
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index 91001e41..f96cefcb 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -90,6 +90,7 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
r.KeyRing = keyRing
r.Inputer = &input.Inputer{
+ Cfg: r.Cfg,
ProcessContext: r.ProcessContext,
DB: r.DB,
InputRoomEventTopic: r.InputRoomEventTopic,
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index c6e35461..6a8ae6d0 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -19,6 +19,7 @@ import (
"context"
"encoding/json"
"errors"
+ "fmt"
"sync"
"time"
@@ -29,6 +30,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
@@ -45,7 +47,35 @@ var keyContentFields = map[string]string{
"m.room.member": "membership",
}
+// Inputer is responsible for consuming from the roomserver input
+// streams and processing the events. All input events are queued
+// into a single NATS stream and the order is preserved strictly.
+// The `room_id` message header will contain the room ID which will
+// be used to assign the pending event to a per-room worker.
+//
+// The input API maintains an ephemeral headers-only consumer. It
+// will speed through the stream working out which room IDs are
+// pending and create durable consumers for them. The durable
+// consumer will then be used for each room worker goroutine to
+// fetch events one by one and process them. Each room having a
+// durable consumer of its own means there is no head-of-line
+// blocking between rooms. Filtering ensures that each durable
+// consumer only receives events for the room it is interested in.
+//
+// The ephemeral consumer closely tracks the newest events. The
+// per-room durable consumers will only progress through the stream
+// as events are processed.
+//
+// A BC * -> positions of each consumer (* = ephemeral)
+// ⌄ ⌄⌄ ⌄
+// ABAABCAABCAA -> newest (letter = subject for each message)
+//
+// In this example, A is still processing an event but has two
+// pending events to process afterwards. Both B and C are caught
+// up, so they will do nothing until a new event comes in for B
+// or C.
type Inputer struct {
+ Cfg *config.RoomServer
ProcessContext *process.ProcessContext
DB storage.Database
NATSClient *nats.Conn
@@ -57,147 +87,275 @@ type Inputer struct {
ACLs *acls.ServerACLs
InputRoomEventTopic string
OutputRoomEventTopic string
- workers sync.Map // room ID -> *phony.Inbox
+ workers sync.Map // room ID -> *worker
Queryer *query.Queryer
}
-func (r *Inputer) workerForRoom(roomID string) *phony.Inbox {
- inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
- return inbox.(*phony.Inbox)
+type worker struct {
+ phony.Inbox
+ sync.Mutex
+ r *Inputer
+ roomID string
+ subscription *nats.Subscription
}
-// eventsInProgress is an in-memory map to keep a track of which events we have
-// queued up for processing. If we get a redelivery from NATS and we still have
-// the queued up item then we won't do anything with the redelivered message. If
-// we've restarted Dendrite and now this map is empty then it means that we will
-// reload pending work from NATS.
-var eventsInProgress sync.Map
+func (r *Inputer) startWorkerForRoom(roomID string) {
+ v, loaded := r.workers.LoadOrStore(roomID, &worker{
+ r: r,
+ roomID: roomID,
+ })
+ w := v.(*worker)
+ w.Lock()
+ defer w.Unlock()
+ if !loaded || w.subscription == nil {
+ consumer := r.Cfg.Matrix.JetStream.Prefixed("RoomInput" + jetstream.Tokenise(w.roomID))
+ subject := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(w.roomID))
-// onMessage is called when a new event arrives in the roomserver input stream.
-func (r *Inputer) Start() error {
- _, err := r.JetStream.Subscribe(
- r.InputRoomEventTopic,
- // We specifically don't use jetstream.WithJetStreamMessage here because we
- // queue the task off to a room-specific queue and the ACK needs to be sent
- // later, possibly with an error response to the inputter if synchronous.
- func(msg *nats.Msg) {
- roomID := msg.Header.Get("room_id")
- var inputRoomEvent api.InputRoomEvent
- if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
- _ = msg.Term()
- return
- }
+ // Create the consumer. We do this as a specific step rather than
+ // letting PullSubscribe create it for us because we need the consumer
+ // to outlive the subscription. If we do it this way, we can Bind in the
+ // next step, and when we Unsubscribe, the consumer continues to live. If
+ // we leave PullSubscribe to create the durable consumer, Unsubscribe will
+ // delete it because it thinks it "owns" it, which in turn breaks the
+ // interest-based retention storage policy.
+ // If the durable consumer already exists, this is effectively a no-op.
+ // Another interesting tid-bit here: the ACK policy is set to "all" so that
+ // if we acknowledge a message, we also acknowledge everything that comes
+ // before it. This is necessary because otherwise our consumer will never
+ // acknowledge things we filtered out for other subjects and therefore they
+ // will linger around forever.
+ if _, err := w.r.JetStream.AddConsumer(
+ r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
+ &nats.ConsumerConfig{
+ Durable: consumer,
+ AckPolicy: nats.AckAllPolicy,
+ DeliverPolicy: nats.DeliverAllPolicy,
+ FilterSubject: subject,
+ AckWait: MaximumMissingProcessingTime + (time.Second * 10),
+ },
+ ); err != nil {
+ logrus.WithError(err).Errorf("Failed to create consumer for room %q", w.roomID)
+ return
+ }
- _ = msg.InProgress()
- index := roomID + "\000" + inputRoomEvent.Event.EventID()
- if _, ok := eventsInProgress.LoadOrStore(index, struct{}{}); ok {
- // We're already waiting to deal with this event, so there's no
- // point in queuing it up again. We've notified NATS that we're
- // working on the message still, so that will have deferred the
- // redelivery by a bit.
- return
- }
+ // Bind to our durable consumer. We want to receive all messages waiting
+ // for this subject and we want to manually acknowledge them, so that we
+ // can ensure they are only cleaned up when we are done processing them.
+ sub, err := w.r.JetStream.PullSubscribe(
+ subject, consumer,
+ nats.ManualAck(),
+ nats.DeliverAll(),
+ nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
+ nats.Bind(r.InputRoomEventTopic, consumer),
+ )
+ if err != nil {
+ logrus.WithError(err).Errorf("Failed to subscribe to stream for room %q", w.roomID)
+ return
+ }
- roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
- r.workerForRoom(roomID).Act(nil, func() {
- _ = msg.InProgress() // resets the acknowledgement wait timer
- defer eventsInProgress.Delete(index)
- defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
- var errString string
- if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil {
- if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
- sentry.CaptureException(err)
- }
- logrus.WithError(err).WithFields(logrus.Fields{
- "room_id": roomID,
- "event_id": inputRoomEvent.Event.EventID(),
- "type": inputRoomEvent.Event.Type(),
- }).Warn("Roomserver failed to process async event")
- _ = msg.Term()
- errString = err.Error()
- } else {
- _ = msg.Ack()
- }
- if replyTo := msg.Header.Get("sync"); replyTo != "" {
- if err := r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
- logrus.WithError(err).WithFields(logrus.Fields{
- "room_id": roomID,
- "event_id": inputRoomEvent.Event.EventID(),
- "type": inputRoomEvent.Event.Type(),
- }).Warn("Roomserver failed to respond for sync event")
- }
- }
- })
+ // Go and start pulling messages off the queue.
+ w.subscription = sub
+ w.Act(nil, w._next)
+ }
+}
+
+// Start creates an ephemeral non-durable consumer on the roomserver
+// input topic. It is configured to deliver us headers only because we
+// don't actually care about the contents of the message at this point,
+// we only care about the `room_id` field. Once a message arrives, we
+// will look to see if we have a worker for that room which has its
+// own consumer. If we don't, we'll start one.
+func (r *Inputer) Start() error {
+ _, err := r.JetStream.Subscribe(
+ "", // This is blank because we specified it in BindStream.
+ func(m *nats.Msg) {
+ roomID := m.Header.Get(jetstream.RoomID)
+ r.startWorkerForRoom(roomID)
+ _ = m.Ack()
},
- // NATS wants to acknowledge automatically by default when the message is
- // read from the stream, but we want to override that behaviour by making
- // sure that we only acknowledge when we're happy we've done everything we
- // can. This ensures we retry things when it makes sense to do so.
- nats.ManualAck(),
- // Use a durable named consumer.
- r.Durable,
- // If we've missed things in the stream, e.g. we restarted, then replay
- // all of the queued messages that were waiting for us.
+ nats.HeadersOnly(),
nats.DeliverAll(),
- // Ensure that NATS doesn't try to resend us something that wasn't done
- // within the period of time that we might still be processing it.
- nats.AckWait(MaximumMissingProcessingTime+(time.Second*10)),
- // It is recommended to disable this for pull consumers as per the docs:
- // https://docs.nats.io/nats-concepts/jetstream/consumers#note-about-push-and-pull-consumers
- nats.MaxAckPending(-1),
+ nats.AckAll(),
+ nats.BindStream(r.InputRoomEventTopic),
)
return err
}
-// InputRoomEvents implements api.RoomserverInternalAPI
-func (r *Inputer) InputRoomEvents(
+// _next is called by the worker for the room. It must only be called
+// by the actor embedded into the worker.
+func (w *worker) _next() {
+ // Look up what the next event is that's waiting to be processed.
+ ctx, cancel := context.WithTimeout(w.r.ProcessContext.Context(), time.Minute)
+ defer cancel()
+ msgs, err := w.subscription.Fetch(1, nats.Context(ctx))
+ switch err {
+ case nil:
+ // Make sure that once we're done here, we queue up another call
+ // to _next in the inbox.
+ defer w.Act(nil, w._next)
+
+ // If no error was reported, but we didn't get exactly one message,
+ // then skip over this and try again on the next iteration.
+ if len(msgs) != 1 {
+ return
+ }
+
+ case context.DeadlineExceeded:
+ // The context exceeded, so we've been waiting for more than a
+ // minute for activity in this room. At this point we will shut
+ // down the subscriber to free up resources. It'll get started
+ // again if new activity happens.
+ if err = w.subscription.Unsubscribe(); err != nil {
+ logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
+ }
+ w.Lock()
+ w.subscription = nil
+ w.Unlock()
+ return
+
+ default:
+ // Something went wrong while trying to fetch the next event
+ // from the queue. In which case, we'll shut down the subscriber
+ // and wait to be notified about new room activity again. Maybe
+ // the problem will be corrected by then.
+ logrus.WithError(err).Errorf("Failed to get next stream message for room %q", w.roomID)
+ if err = w.subscription.Unsubscribe(); err != nil {
+ logrus.WithError(err).Errorf("Failed to unsubscribe to stream for room %q", w.roomID)
+ }
+ w.Lock()
+ w.subscription = nil
+ w.Unlock()
+ return
+ }
+
+ // Try to unmarshal the input room event. If the JSON unmarshalling
+ // fails then we'll terminate the message — this notifies NATS that
+ // we are done with the message and never want to see it again.
+ msg := msgs[0]
+ var inputRoomEvent api.InputRoomEvent
+ if err = json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
+ _ = msg.Term()
+ return
+ }
+
+ roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Inc()
+ defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": w.roomID}).Dec()
+
+ // Process the room event. If something goes wrong then we'll tell
+ // NATS to terminate the message. We'll store the error result as
+ // a string, because we might want to return that to the caller if
+ // it was a synchronous request.
+ var errString string
+ if err = w.r.processRoomEvent(w.r.ProcessContext.Context(), &inputRoomEvent); err != nil {
+ if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
+ sentry.CaptureException(err)
+ }
+ logrus.WithError(err).WithFields(logrus.Fields{
+ "room_id": w.roomID,
+ "event_id": inputRoomEvent.Event.EventID(),
+ "type": inputRoomEvent.Event.Type(),
+ }).Warn("Roomserver failed to process async event")
+ _ = msg.Term()
+ errString = err.Error()
+ } else {
+ _ = msg.Ack()
+ }
+
+ // If it was a synchronous input request then the "sync" field
+ // will be present in the message. That means that someone is
+ // waiting for a response. The temporary inbox name is present in
+ // that field, so send back the error string (if any). If there
+ // was no error then we'll return a blank message, which means
+ // that everything was OK.
+ if replyTo := msg.Header.Get("sync"); replyTo != "" {
+ if err = w.r.NATSClient.Publish(replyTo, []byte(errString)); err != nil {
+ logrus.WithError(err).WithFields(logrus.Fields{
+ "room_id": w.roomID,
+ "event_id": inputRoomEvent.Event.EventID(),
+ "type": inputRoomEvent.Event.Type(),
+ }).Warn("Roomserver failed to respond for sync event")
+ }
+ }
+}
+
+// queueInputRoomEvents queues events into the roomserver input
+// stream in NATS.
+func (r *Inputer) queueInputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest,
- response *api.InputRoomEventsResponse,
-) {
+) (replySub *nats.Subscription, err error) {
+ // If the request is synchronous then we need to create a
+ // temporary inbox to wait for responses on, and then create
+ // a subscription to it. If it's asynchronous then we won't
+ // bother, so these values will remain empty.
var replyTo string
- var replySub *nats.Subscription
if !request.Asynchronous {
- var err error
replyTo = nats.NewInbox()
replySub, err = r.NATSClient.SubscribeSync(replyTo)
if err != nil {
- response.ErrMsg = err.Error()
- return
+ return nil, fmt.Errorf("r.NATSClient.SubscribeSync: %w", err)
+ }
+ if replySub == nil {
+ // This shouldn't ever happen, but it doesn't hurt to check
+ // because we can potentially avoid a nil pointer panic later
+ // if it did for some reason.
+ return nil, fmt.Errorf("expected a subscription to the temporary inbox")
}
}
- var err error
+ // For each event, marshal the input room event and then
+ // send it into the input queue.
for _, e := range request.InputRoomEvents {
+ roomID := e.Event.RoomID()
+ subj := r.Cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEventSubj(roomID))
msg := &nats.Msg{
- Subject: r.InputRoomEventTopic,
+ Subject: subj,
Header: nats.Header{},
- Reply: replyTo,
}
- roomID := e.Event.RoomID()
msg.Header.Set("room_id", roomID)
if replyTo != "" {
msg.Header.Set("sync", replyTo)
}
msg.Data, err = json.Marshal(e)
if err != nil {
- response.ErrMsg = err.Error()
- return
+ return nil, fmt.Errorf("json.Marshal: %w", err)
}
- if _, err = r.JetStream.PublishMsg(msg); err != nil {
+ if _, err = r.JetStream.PublishMsg(msg, nats.Context(ctx)); err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"room_id": roomID,
"event_id": e.Event.EventID(),
+ "subj": subj,
}).Error("Roomserver failed to queue async event")
- return
+ return nil, fmt.Errorf("r.JetStream.PublishMsg: %w", err)
}
}
+ return
+}
- if request.Asynchronous || replySub == nil {
+// InputRoomEvents implements api.RoomserverInternalAPI
+func (r *Inputer) InputRoomEvents(
+ ctx context.Context,
+ request *api.InputRoomEventsRequest,
+ response *api.InputRoomEventsResponse,
+) {
+ // Queue up the event into the roomserver.
+ replySub, err := r.queueInputRoomEvents(ctx, request)
+ if err != nil {
+ response.ErrMsg = err.Error()
return
}
+ // If we aren't waiting for synchronous responses then we can
+ // give up here, there is nothing further to do.
+ if replySub == nil {
+ return
+ }
+
+ // Otherwise, we'll want to sit and wait for the responses
+ // from the roomserver. There will be one response for every
+ // input we submitted. The last error value we receive will
+ // be the one returned as the error string.
defer replySub.Drain() // nolint:errcheck
for i := 0; i < len(request.InputRoomEvents); i++ {
msg, err := replySub.NextMsgWithContext(ctx)
@@ -207,7 +365,6 @@ func (r *Inputer) InputRoomEvents(
}
if len(msg.Data) > 0 {
response.ErrMsg = string(msg.Data)
- return
}
}
}
diff --git a/roomserver/roomserver.go b/roomserver/roomserver.go
index 896773ba..36e3c526 100644
--- a/roomserver/roomserver.go
+++ b/roomserver/roomserver.go
@@ -54,8 +54,8 @@ func NewInternalAPI(
return internal.NewRoomserverAPI(
base.ProcessContext, cfg, roomserverDB, js, nc,
- cfg.Matrix.JetStream.TopicFor(jetstream.InputRoomEvent),
- cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
+ cfg.Matrix.JetStream.Prefixed(jetstream.InputRoomEvent),
+ cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
base.Caches, perspectiveServerNames,
)
}
diff --git a/setup/config/config_jetstream.go b/setup/config/config_jetstream.go
index 9271cd8b..b6a93d39 100644
--- a/setup/config/config_jetstream.go
+++ b/setup/config/config_jetstream.go
@@ -19,12 +19,12 @@ type JetStream struct {
InMemory bool `yaml:"in_memory"`
}
-func (c *JetStream) TopicFor(name string) string {
+func (c *JetStream) Prefixed(name string) string {
return fmt.Sprintf("%s%s", c.TopicPrefix, name)
}
func (c *JetStream) Durable(name string) string {
- return c.TopicFor(name)
+ return c.Prefixed(name)
}
func (c *JetStream) Defaults(generate bool) {
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 43cc0331..748c191b 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -1,6 +1,7 @@
package jetstream
import (
+ "reflect"
"strings"
"sync"
"time"
@@ -75,14 +76,35 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
}
for _, stream := range streams { // streams are defined in streams.go
- name := cfg.TopicFor(stream.Name)
+ name := cfg.Prefixed(stream.Name)
info, err := s.StreamInfo(name)
if err != nil && err != natsclient.ErrStreamNotFound {
logrus.WithError(err).Fatal("Unable to get stream info")
}
+ subjects := stream.Subjects
+ if len(subjects) == 0 {
+ // By default we want each stream to listen for the subjects
+ // that are either an exact match for the stream name, or where
+ // the first part of the subject is the stream name. ">" is a
+ // wildcard in NATS for one or more subject tokens. In the case
+ // that the stream is called "Foo", this will match any message
+ // with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc.
+ subjects = []string{name, name + ".>"}
+ }
+ if info != nil {
+ switch {
+ case !reflect.DeepEqual(info.Config.Subjects, subjects):
+ fallthrough
+ case info.Config.Retention != stream.Retention:
+ fallthrough
+ case info.Config.Storage != stream.Storage:
+ if err = s.DeleteStream(name); err != nil {
+ logrus.WithError(err).Fatal("Unable to delete stream")
+ }
+ info = nil
+ }
+ }
if info == nil {
- stream.Subjects = []string{name}
-
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {
@@ -93,8 +115,9 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (natsclient.JetStream
// array, otherwise we end up with namespaces on namespaces.
namespaced := *stream
namespaced.Name = name
+ namespaced.Subjects = subjects
if _, err = s.AddStream(&namespaced); err != nil {
- logrus.WithError(err).WithField("stream", name).Fatal("Unable to add stream")
+ logrus.WithError(err).WithField("stream", name).WithField("subjects", subjects).Fatal("Unable to add stream")
}
}
}
diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go
index aa3e95cb..aa979924 100644
--- a/setup/jetstream/streams.go
+++ b/setup/jetstream/streams.go
@@ -1,6 +1,8 @@
package jetstream
import (
+ "fmt"
+ "regexp"
"time"
"github.com/nats-io/nats.go"
@@ -24,10 +26,20 @@ var (
OutputReadUpdate = "OutputReadUpdate"
)
+var safeCharacters = regexp.MustCompile("[^A-Za-z0-9$]+")
+
+func Tokenise(str string) string {
+ return safeCharacters.ReplaceAllString(str, "_")
+}
+
+func InputRoomEventSubj(roomID string) string {
+ return fmt.Sprintf("%s.%s", InputRoomEvent, Tokenise(roomID))
+}
+
var streams = []*nats.StreamConfig{
{
Name: InputRoomEvent,
- Retention: nats.WorkQueuePolicy,
+ Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index fcb7b5b1..40c1cd3d 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -61,7 +61,7 @@ func NewOutputClientDataConsumer(
return &OutputClientDataConsumer{
ctx: process.Context(),
jetstream: js,
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
db: store,
notifier: notifier,
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go
index 4e4c61c6..ab79998e 100644
--- a/syncapi/consumers/eduserver_receipts.go
+++ b/syncapi/consumers/eduserver_receipts.go
@@ -62,7 +62,7 @@ func NewOutputReceiptEventConsumer(
return &OutputReceiptEventConsumer{
ctx: process.Context(),
jetstream: js,
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
db: store,
notifier: notifier,
diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go
index b0beef06..bdbe7735 100644
--- a/syncapi/consumers/eduserver_sendtodevice.go
+++ b/syncapi/consumers/eduserver_sendtodevice.go
@@ -57,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer(
return &OutputSendToDeviceEventConsumer{
ctx: process.Context(),
jetstream: js,
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"),
db: store,
serverName: cfg.Matrix.ServerName,
diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go
index cae5df8a..c2828c7f 100644
--- a/syncapi/consumers/eduserver_typing.go
+++ b/syncapi/consumers/eduserver_typing.go
@@ -56,7 +56,7 @@ func NewOutputTypingEventConsumer(
return &OutputTypingEventConsumer{
ctx: process.Context(),
jetstream: js,
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"),
eduCache: eduCache,
notifier: notifier,
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 640c505c..5bdc0fad 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -65,7 +65,7 @@ func NewOutputRoomEventConsumer(
ctx: process.Context(),
cfg: cfg,
jetstream: js,
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"),
db: store,
notifier: notifier,
diff --git a/syncapi/consumers/userapi.go b/syncapi/consumers/userapi.go
index a3b2dd53..010fa7c8 100644
--- a/syncapi/consumers/userapi.go
+++ b/syncapi/consumers/userapi.go
@@ -56,7 +56,7 @@ func NewOutputNotificationDataConsumer(
ctx: process.Context(),
jetstream: js,
durable: cfg.Matrix.JetStream.Durable("SyncAPINotificationDataConsumer"),
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData),
db: store,
notifier: notifier,
stream: stream,
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index f1f82722..ed8118bf 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -67,18 +67,18 @@ func AddPublicRoutes(
userAPIStreamEventProducer := &producers.UserAPIStreamEventProducer{
JetStream: js,
- Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent),
+ Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
}
userAPIReadUpdateProducer := &producers.UserAPIReadProducer{
JetStream: js,
- Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate),
+ Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
}
_ = userAPIReadUpdateProducer
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
- process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
+ process, cfg, cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
js, keyAPI, rsAPI, syncDB, notifier,
streams.DeviceListStreamProvider,
)
diff --git a/userapi/consumers/syncapi_readupdate.go b/userapi/consumers/syncapi_readupdate.go
index 2e58020b..067f9333 100644
--- a/userapi/consumers/syncapi_readupdate.go
+++ b/userapi/consumers/syncapi_readupdate.go
@@ -47,7 +47,7 @@ func NewOutputReadUpdateConsumer(
db: store,
ServerName: cfg.Matrix.ServerName,
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIReadUpdateConsumer"),
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReadUpdate),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReadUpdate),
pgClient: pgClient,
userAPI: userAPI,
syncProducer: syncProducer,
diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go
index 11081327..da3cd393 100644
--- a/userapi/consumers/syncapi_streamevent.go
+++ b/userapi/consumers/syncapi_streamevent.go
@@ -54,7 +54,7 @@ func NewOutputStreamEventConsumer(
jetstream: js,
db: store,
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
- topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
pgClient: pgClient,
userAPI: userAPI,
rsAPI: rsAPI,
diff --git a/userapi/userapi.go b/userapi/userapi.go
index 97bdf7b2..e91ce3a7 100644
--- a/userapi/userapi.go
+++ b/userapi/userapi.go
@@ -54,8 +54,8 @@ func NewInternalAPI(
// it's handled by clientapi, and hence uses its topic. When user
// API handles it for all account data, we can remove it from
// here.
- cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
- cfg.Matrix.JetStream.TopicFor(jetstream.OutputNotificationData),
+ cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
+ cfg.Matrix.JetStream.Prefixed(jetstream.OutputNotificationData),
)
userAPI := &internal.UserInternalAPI{