aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-09-01 09:20:40 +0100
committerGitHub <noreply@github.com>2022-09-01 09:20:40 +0100
commitad6b902b8462adb568d799c69a74b60d69574d0c (patch)
tree9037eb130a47c25cb320116758baa6ee265e89b6 /setup/jetstream
parent175f65407a7f684753334022e66b8209f3db7396 (diff)
Refactor appservices component (#2687)
This PR refactors the app services component. It makes the following changes: * Each appservice now gets its own NATS JetStream consumer * The appservice database is now removed entirely, since we just use JetStream as a data source instead * The entire component is now much simpler and we deleted lots of lines of code 💅 The result is that it should be much lighter and hopefully much more performant.
Diffstat (limited to 'setup/jetstream')
-rw-r--r--setup/jetstream/helpers.go35
-rw-r--r--setup/jetstream/nats.go1
2 files changed, 17 insertions, 19 deletions
diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go
index f47637c6..1ec860b0 100644
--- a/setup/jetstream/helpers.go
+++ b/setup/jetstream/helpers.go
@@ -34,14 +34,6 @@ func JetStreamConsumer(
}
}()
- // If the batch size is greater than 1, we will want to acknowledge all
- // received messages in the batch. Below we will send an acknowledgement
- // for the most recent message in the batch and AckAll will ensure that
- // all messages that came before it are also acknowledged implicitly.
- if batch > 1 {
- opts = append(opts, nats.AckAll())
- }
-
name := durable + "Pull"
sub, err := js.PullSubscribe(subj, name, opts...)
if err != nil {
@@ -89,21 +81,26 @@ func JetStreamConsumer(
if len(msgs) < 1 {
continue
}
- msg := msgs[len(msgs)-1] // most recent message, in case of AckAll
- if err = msg.InProgress(nats.Context(ctx)); err != nil {
- logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
- sentry.CaptureException(err)
- continue
+ for _, msg := range msgs {
+ if err = msg.InProgress(nats.Context(ctx)); err != nil {
+ logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
+ sentry.CaptureException(err)
+ continue
+ }
}
if f(ctx, msgs) {
- if err = msg.AckSync(nats.Context(ctx)); err != nil {
- logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
- sentry.CaptureException(err)
+ for _, msg := range msgs {
+ if err = msg.AckSync(nats.Context(ctx)); err != nil {
+ logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
+ sentry.CaptureException(err)
+ }
}
} else {
- if err = msg.Nak(nats.Context(ctx)); err != nil {
- logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
- sentry.CaptureException(err)
+ for _, msg := range msgs {
+ if err = msg.Nak(nats.Context(ctx)); err != nil {
+ logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
+ sentry.CaptureException(err)
+ }
}
}
}
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 051d55a3..3660e91e 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -183,6 +183,7 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
OutputReceiptEvent: {"SyncAPIEDUServerReceiptConsumer", "FederationAPIEDUServerConsumer"},
OutputSendToDeviceEvent: {"SyncAPIEDUServerSendToDeviceConsumer", "FederationAPIEDUServerConsumer"},
OutputTypingEvent: {"SyncAPIEDUServerTypingConsumer", "FederationAPIEDUServerConsumer"},
+ OutputRoomEvent: {"AppserviceRoomserverConsumer"},
} {
streamName := cfg.Matrix.JetStream.Prefixed(stream)
for _, consumer := range consumers {