diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-09-01 09:20:40 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-01 09:20:40 +0100 |
commit | ad6b902b8462adb568d799c69a74b60d69574d0c (patch) | |
tree | 9037eb130a47c25cb320116758baa6ee265e89b6 /setup/jetstream | |
parent | 175f65407a7f684753334022e66b8209f3db7396 (diff) |
Refactor appservices component (#2687)
This PR refactors the app services component. It makes the following changes:
* Each appservice now gets its own NATS JetStream consumer
* The appservice database is now removed entirely, since we just use JetStream as a data source instead
* The entire component is now much simpler and we deleted lots of lines of code 💅
The result is that it should be much lighter and hopefully much more performant.
Diffstat (limited to 'setup/jetstream')
-rw-r--r-- | setup/jetstream/helpers.go | 35 | ||||
-rw-r--r-- | setup/jetstream/nats.go | 1 |
2 files changed, 17 insertions, 19 deletions
diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index f47637c6..1ec860b0 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -34,14 +34,6 @@ func JetStreamConsumer( } }() - // If the batch size is greater than 1, we will want to acknowledge all - // received messages in the batch. Below we will send an acknowledgement - // for the most recent message in the batch and AckAll will ensure that - // all messages that came before it are also acknowledged implicitly. - if batch > 1 { - opts = append(opts, nats.AckAll()) - } - name := durable + "Pull" sub, err := js.PullSubscribe(subj, name, opts...) if err != nil { @@ -89,21 +81,26 @@ func JetStreamConsumer( if len(msgs) < 1 { continue } - msg := msgs[len(msgs)-1] // most recent message, in case of AckAll - if err = msg.InProgress(nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) - sentry.CaptureException(err) - continue + for _, msg := range msgs { + if err = msg.InProgress(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) + sentry.CaptureException(err) + continue + } } if f(ctx, msgs) { - if err = msg.AckSync(nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) - sentry.CaptureException(err) + for _, msg := range msgs { + if err = msg.AckSync(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) + sentry.CaptureException(err) + } } } else { - if err = msg.Nak(nats.Context(ctx)); err != nil { - logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) - sentry.CaptureException(err) + for _, msg := range msgs { + if err = msg.Nak(nats.Context(ctx)); err != nil { + logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) + sentry.CaptureException(err) + } } } } diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go index 051d55a3..3660e91e 100644 --- a/setup/jetstream/nats.go +++ b/setup/jetstream/nats.go @@ -183,6 +183,7 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc OutputReceiptEvent: {"SyncAPIEDUServerReceiptConsumer", "FederationAPIEDUServerConsumer"}, OutputSendToDeviceEvent: {"SyncAPIEDUServerSendToDeviceConsumer", "FederationAPIEDUServerConsumer"}, OutputTypingEvent: {"SyncAPIEDUServerTypingConsumer", "FederationAPIEDUServerConsumer"}, + OutputRoomEvent: {"AppserviceRoomserverConsumer"}, } { streamName := cfg.Matrix.JetStream.Prefixed(stream) for _, consumer := range consumers { |