aboutsummaryrefslogtreecommitdiff
path: root/setup/jetstream
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-02-02 13:32:48 +0000
committerGitHub <noreply@github.com>2022-02-02 13:32:48 +0000
commitc773b038bb1432f2265759ddf1da5e98b9bda525 (patch)
tree357c7dac69f4f87dda61bbaf004ae3351d5277d8 /setup/jetstream
parent2dee706f9ef2de70516dbc993dcfc8ec6f7fdd52 (diff)
Use pull consumers (#2140)
* Pull consumers * Pull consumers * Only nuke consumers if they are push consumers * Clean up old consumers * Better error handling * Update comments
Diffstat (limited to 'setup/jetstream')
-rw-r--r--setup/jetstream/helpers.go85
1 files changed, 77 insertions, 8 deletions
diff --git a/setup/jetstream/helpers.go b/setup/jetstream/helpers.go
index 1891b96b..544b5f0c 100644
--- a/setup/jetstream/helpers.go
+++ b/setup/jetstream/helpers.go
@@ -1,12 +1,81 @@
package jetstream
-import "github.com/nats-io/nats.go"
-
-func WithJetStreamMessage(msg *nats.Msg, f func(msg *nats.Msg) bool) {
- _ = msg.InProgress()
- if f(msg) {
- _ = msg.Ack()
- } else {
- _ = msg.Nak()
+import (
+ "context"
+ "fmt"
+
+ "github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
+)
+
+func JetStreamConsumer(
+ ctx context.Context, js nats.JetStreamContext, subj, durable string,
+ f func(ctx context.Context, msg *nats.Msg) bool,
+ opts ...nats.SubOpt,
+) error {
+ defer func() {
+ // If there are existing consumers from before they were pull
+ // consumers, we need to clean up the old push consumers. However,
+ // in order to not affect the interest-based policies, we need to
+ // do this *after* creating the new pull consumers, which have
+ // "Pull" suffixed to their name.
+ if _, err := js.ConsumerInfo(subj, durable); err == nil {
+ if err := js.DeleteConsumer(subj, durable); err != nil {
+ logrus.WithContext(ctx).Warnf("Failed to clean up old consumer %q", durable)
+ }
+ }
+ }()
+
+ name := durable + "Pull"
+ sub, err := js.PullSubscribe(subj, name, opts...)
+ if err != nil {
+ return fmt.Errorf("nats.SubscribeSync: %w", err)
}
+ go func() {
+ for {
+ // The context behaviour here is surprising — we supply a context
+ // so that we can interrupt the fetch if we want, but NATS will still
+ // enforce its own deadline (roughly 5 seconds by default). Therefore
+ // it is our responsibility to check whether our context expired or
+ // not when a context error is returned. Footguns. Footguns everywhere.
+ msgs, err := sub.Fetch(1, nats.Context(ctx))
+ if err != nil {
+ if err == context.Canceled || err == context.DeadlineExceeded {
+ // Work out whether it was the JetStream context that expired
+ // or whether it was our supplied context.
+ select {
+ case <-ctx.Done():
+ // The supplied context expired, so we want to stop the
+ // consumer altogether.
+ return
+ default:
+ // The JetStream context expired, so the fetch probably
+ // just timed out and we should try again.
+ continue
+ }
+ } else {
+ // Something else went wrong, so we'll panic.
+ logrus.WithContext(ctx).WithField("subject", subj).Fatal(err)
+ }
+ }
+ if len(msgs) < 1 {
+ continue
+ }
+ msg := msgs[0]
+ if err = msg.InProgress(); err != nil {
+ logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.InProgress: %w", err))
+ continue
+ }
+ if f(ctx, msg) {
+ if err = msg.Ack(); err != nil {
+ logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Ack: %w", err))
+ }
+ } else {
+ if err = msg.Nak(); err != nil {
+ logrus.WithContext(ctx).WithField("subject", subj).Warn(fmt.Errorf("msg.Nak: %w", err))
+ }
+ }
+ }
+ }()
+ return nil
}