aboutsummaryrefslogtreecommitdiff
path: root/setup
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-04-27 15:29:49 +0100
committerNeil Alexander <neilalexander@users.noreply.github.com>2022-04-27 15:29:49 +0100
commit923f789ca3174a685bd53ce5e64a5e86cabd38cb (patch)
tree77dedd2028e257e3c1c24f77e19d889189ec38ad /setup
parent103795d33a09728d7619e73014d507505ff121e2 (diff)
Fix graceful shutdown
Diffstat (limited to 'setup')
-rw-r--r--setup/base/base.go12
-rw-r--r--setup/jetstream/helpers.go16
2 files changed, 19 insertions, 9 deletions
diff --git a/setup/base/base.go b/setup/base/base.go
index 03ea2ad7..e67b034a 100644
--- a/setup/base/base.go
+++ b/setup/base/base.go
@@ -469,14 +469,14 @@ func (b *BaseDendrite) SetupAndServeHTTP(
}
minwinsvc.SetOnExit(b.ProcessContext.ShutdownDendrite)
- b.WaitForShutdown()
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- _ = internalServ.Shutdown(ctx)
- _ = externalServ.Shutdown(ctx)
+ <-b.ProcessContext.WaitForShutdown()
+ logrus.Infof("Stopping HTTP listeners")
+ _ = internalServ.Shutdown(context.Background())
+ _ = externalServ.Shutdown(context.Background())
logrus.Infof("Stopped HTTP listeners")
+
+ b.WaitForShutdown()
}
func (b *BaseDendrite) WaitForShutdown() {
diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go
index 78cecb6a..1c07583e 100644
--- a/setup/jetstream/helpers.go
+++ b/setup/jetstream/helpers.go
@@ -35,6 +35,16 @@ func JetStreamConsumer(
}
go func() {
for {
+ // If the parent context has given up then there's no point in
+ // carrying on doing anything, so stop the listener.
+ select {
+ case <-ctx.Done():
+ if err := sub.Unsubscribe(); err != nil {
+ logrus.WithContext(ctx).Warnf("Failed to unsubscribe %q", durable)
+ }
+ return
+ default:
+ }
// The context behaviour here is surprising — we supply a context
// so that we can interrupt the fetch if we want, but NATS will still
// enforce its own deadline (roughly 5 seconds by default). Therefore
@@ -65,18 +75,18 @@ func JetStreamConsumer(
continue
}
msg := msgs[0]
- if err = msg.InProgress(); err != nil {
+ if err = msg.InProgress(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
sentry.CaptureException(err)
continue
}
if f(ctx, msg) {
- if err = msg.AckSync(); err != nil {
+ if err = msg.AckSync(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.AckSync: %w", err))
sentry.CaptureException(err)
}
} else {
- if err = msg.Nak(); err != nil {
+ if err = msg.Nak(nats.Context(ctx)); err != nil {
logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
sentry.CaptureException(err)
}