aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-04-27 15:29:49 +0100
committerNeil Alexander <neilalexander@users.noreply.github.com>2022-04-27 15:29:49 +0100
commit923f789ca3174a685bd53ce5e64a5e86cabd38cb (patch)
tree77dedd2028e257e3c1c24f77e19d889189ec38ad /setup/jetstream
parent103795d33a09728d7619e73014d507505ff121e2 (diff)
Fix graceful shutdown
Diffstat (limited to 'setup/jetstream')
-rw-r--r--setup/jetstream/helpers.go16
1 files changed, 13 insertions, 3 deletions
diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go
index 78cecb6a..1c07583e 100644
--- a/setup/jetstream/helpers.go
+++ b/setup/jetstream/helpers.go
@@ -35,6 +35,16 @@ func JetStreamConsumer(
}
go func() {
for {
+ // If the parent context has given up then there's no point in
+ // carrying on doing anything, so stop the listener.
+ select {
+ case <-ctx.Done():
+ if err := sub.Unsubscribe(); err != nil {
+ logrus.WithContext(ctx).Warnf("Failed to unsubscribe %q", durable)
+ }
+ return
+ default:
+ }
// The context behaviour here is surprising — we supply a context
// so that we can interrupt the fetch if we want, but NATS will still
// enforce its own deadline (roughly 5 seconds by default). Therefore
@@ -65,18 +75,18 @@ func JetStreamConsumer(
continue
}
msg := msgs[0]
- if err = msg.InProgress(); err != nil {
+ if err = msg.InProgress(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
sentry.CaptureException(err)
continue
}
if f(ctx, msg) {
- if err = msg.AckSync(); err != nil {
+ if err = msg.AckSync(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
sentry.CaptureException(err)
}
} else {
- if err = msg.Nak(); err != nil {
+ if err = msg.Nak(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
sentry.CaptureException(err)
}