diff options
Diffstat (limited to 'setup/jetstream')
-rw-r--r-- | setup/jetstream/helpers.go | 25 |
1 files changed, 20 insertions, 5 deletions
diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index 1c07583e..f47637c6 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -9,9 +9,16 @@ import ( "github.com/sirupsen/logrus" ) +// JetStreamConsumer starts a durable consumer on the given subject with the +// given durable name. The function will be called when one or more messages +// is available, up to the maximum batch size specified. If the batch is set to +// 1 then messages will be delivered one at a time. If the function is called, +// the messages array is guaranteed to be at least 1 in size. Any provided NATS +// options will be passed through to the pull subscriber creation. The consumer +// will continue to run until the context expires, at which point it will stop. func JetStreamConsumer( - ctx context.Context, js nats.JetStreamContext, subj, durable string, - f func(ctx context.Context, msg *nats.Msg) bool, + ctx context.Context, js nats.JetStreamContext, subj, durable string, batch int, + f func(ctx context.Context, msgs []*nats.Msg) bool, opts ...nats.SubOpt, ) error { defer func() { @@ -27,6 +34,14 @@ func JetStreamConsumer( } }() + // If the batch size is greater than 1, we will want to acknowledge all + // received messages in the batch. Below we will send an acknowledgement + // for the most recent message in the batch and AckAll will ensure that + // all messages that came before it are also acknowledged implicitly. + if batch > 1 { + opts = append(opts, nats.AckAll()) + } + name := durable + "Pull" sub, err := js.PullSubscribe(subj, name, opts...) if err != nil { @@ -50,7 +65,7 @@ func JetStreamConsumer( // enforce its own deadline (roughly 5 seconds by default). Therefore // it is our responsibility to check whether our context expired or // not when a context error is returned. Footguns. Footguns everywhere. - msgs, err := sub.Fetch(1, nats.Context(ctx)) + msgs, err := sub.Fetch(batch, nats.Context(ctx)) if err != nil { if err == context.Canceled || err == context.DeadlineExceeded { // Work out whether it was the JetStream context that expired @@ -74,13 +89,13 @@ func JetStreamConsumer( if len(msgs) < 1 { continue } - msg := msgs[0] + msg := msgs[len(msgs)-1] // most recent message, in case of AckAll if err = msg.InProgress(nats.Context(ctx)); err != nil { logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) sentry.CaptureException(err) continue } - if f(ctx, msg) { + if f(ctx, msgs) { if err = msg.AckSync(nats.Context(ctx)); err != nil { logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) sentry.CaptureException(err) |