diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-04-27 15:29:49 +0100 |
---|---|---|
committer | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-04-27 15:29:49 +0100 |
commit | 923f789ca3174a685bd53ce5e64a5e86cabd38cb (patch) | |
tree | 77dedd2028e257e3c1c24f77e19d889189ec38ad /setup | |
parent | 103795d33a09728d7619e73014d507505ff121e2 (diff) |
Fix graceful shutdown
Diffstat (limited to 'setup')
-rw-r--r-- | setup/base/base.go | 12 | ||||
-rw-r--r-- | setup/jetstream/helpers.go | 16 |
2 files changed, 19 insertions, 9 deletions
diff --git a/setup/base/base.go b/setup/base/base.go index 03ea2ad7..e67b034a 100644 --- a/setup/base/base.go +++ b/setup/base/base.go @@ -469,14 +469,14 @@ func (b *BaseDendrite) SetupAndServeHTTP( } minwinsvc.SetOnExit(b.ProcessContext.ShutdownDendrite) - b.WaitForShutdown() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - _ = internalServ.Shutdown(ctx) - _ = externalServ.Shutdown(ctx) + <-b.ProcessContext.WaitForShutdown() + logrus.Infof("Stopping HTTP listeners") + _ = internalServ.Shutdown(context.Background()) + _ = externalServ.Shutdown(context.Background()) logrus.Infof("Stopped HTTP listeners") + + b.WaitForShutdown() } func (b *BaseDendrite) WaitForShutdown() { diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go index 78cecb6a..1c07583e 100644 --- a/setup/jetstream/helpers.go +++ b/setup/jetstream/helpers.go @@ -35,6 +35,16 @@ func JetStreamConsumer( } go func() { for { + // If the parent context has given up then there's no point in + // carrying on doing anything, so stop the listener. + select { + case <-ctx.Done(): + if err := sub.Unsubscribe(); err != nil { + logrus.WithContext(ctx).Warnf("Failed to unsubscribe %q", durable) + } + return + default: + } // The context behaviour here is surprising — we supply a context // so that we can interrupt the fetch if we want, but NATS will still // enforce its own deadline (roughly 5 seconds by default). Therefore @@ -65,18 +75,18 @@ func JetStreamConsumer( continue } msg := msgs[0] - if err = msg.InProgress(); err != nil { + if err = msg.InProgress(nats.Context(ctx)); err != nil { logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err)) sentry.CaptureException(err) continue } if f(ctx, msg) { - if err = msg.AckSync(); err != nil { + if err = msg.AckSync(nats.Context(ctx)); err != nil { logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err)) sentry.CaptureException(err) } } else { - if err = msg.Nak(); err != nil { + if err = msg.Nak(nats.Context(ctx)); err != nil { logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err)) sentry.CaptureException(err) } |