aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream
diff options
context:
space:
mode:
Diffstat (limited to 'setup/jetstream')
-rw-r--r--setup/jetstream/helpers.go25
1 files changed, 20 insertions, 5 deletions
diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go
index 1c07583e..f47637c6 100644
--- a/setup/jetstream/helpers.go
+++ b/setup/jetstream/helpers.go
@@ -9,9 +9,16 @@ import (
"github.com/sirupsen/logrus"
)
+// JetStreamConsumer starts a durable consumer on the given subject with the
+// given durable name. The function will be called when one or more messages
+// is available, up to the maximum batch size specified. If the batch is set to
+// 1 then messages will be delivered one at a time. If the function is called,
+// the messages array is guaranteed to be at least 1 in size. Any provided NATS
+// options will be passed through to the pull subscriber creation. The consumer
+// will continue to run until the context expires, at which point it will stop.
func JetStreamConsumer(
- ctx context.Context, js nats.JetStreamContext, subj, durable string,
- f func(ctx context.Context, msg *nats.Msg) bool,
+ ctx context.Context, js nats.JetStreamContext, subj, durable string, batch int,
+ f func(ctx context.Context, msgs []*nats.Msg) bool,
opts ...nats.SubOpt,
) error {
defer func() {
@@ -27,6 +34,14 @@ func JetStreamConsumer(
}
}()
+ // If the batch size is greater than 1, we will want to acknowledge all
+ // received messages in the batch. Below we will send an acknowledgement
+ // for the most recent message in the batch and AckAll will ensure that
+ // all messages that came before it are also acknowledged implicitly.
+ if batch > 1 {
+ opts = append(opts, nats.AckAll())
+ }
+
name := durable + "Pull"
sub, err := js.PullSubscribe(subj, name, opts...)
if err != nil {
@@ -50,7 +65,7 @@ func JetStreamConsumer(
// enforce its own deadline (roughly 5 seconds by default). Therefore
// it is our responsibility to check whether our context expired or
// not when a context error is returned. Footguns. Footguns everywhere.
- msgs, err := sub.Fetch(1, nats.Context(ctx))
+ msgs, err := sub.Fetch(batch, nats.Context(ctx))
if err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
// Work out whether it was the JetStream context that expired
@@ -74,13 +89,13 @@ func JetStreamConsumer(
if len(msgs) < 1 {
continue
}
- msg := msgs[0]
+ msg := msgs[len(msgs)-1] // most recent message, in case of AckAll
if err = msg.InProgress(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
sentry.CaptureException(err)
continue
}
- if f(ctx, msg) {
+ if f(ctx, msgs) {
if err = msg.AckSync(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
sentry.CaptureException(err)