diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-08-31 12:21:56 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-31 12:21:56 +0100 |
commit | 175f65407a7f684753334022e66b8209f3db7396 (patch) | |
tree | caebde61a179b12e8ddc161359cbb8d9682aab96 /keyserver | |
parent | ba0b3adab4de7865afd467b61638437b1af39fce (diff) |
Allow batching in `JetStreamConsumer` (#2686)
This allows us to receive more than one message from NATS at a time if we want.
Diffstat (limited to 'keyserver')
-rw-r--r-- | keyserver/consumers/devicelistupdate.go | 7 |
1 files changed, 4 insertions, 3 deletions
diff --git a/keyserver/consumers/devicelistupdate.go b/keyserver/consumers/devicelistupdate.go index f4f24628..d15f9426 100644 --- a/keyserver/consumers/devicelistupdate.go +++ b/keyserver/consumers/devicelistupdate.go @@ -55,14 +55,15 @@ func NewDeviceListUpdateConsumer( // Start consuming from key servers func (t *DeviceListUpdateConsumer) Start() error { return jetstream.JetStreamConsumer( - t.ctx, t.jetstream, t.topic, t.durable, t.onMessage, - nats.DeliverAll(), nats.ManualAck(), + t.ctx, t.jetstream, t.topic, t.durable, 1, + t.onMessage, nats.DeliverAll(), nats.ManualAck(), ) } // onMessage is called in response to a message received on the // key change events topic from the key server. -func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { +func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { + msg := msgs[0] // Guaranteed to exist if onMessage is called var m gomatrixserverlib.DeviceListUpdateEvent if err := json.Unmarshal(msg.Data, &m); err != nil { logrus.WithError(err).Errorf("Failed to read from device list update input topic") |