aboutsummaryrefslogtreecommitdiff
path: root/userapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-08-31 12:21:56 +0100
committerGitHub <noreply@github.com>2022-08-31 12:21:56 +0100
commit175f65407a7f684753334022e66b8209f3db7396 (patch)
treecaebde61a179b12e8ddc161359cbb8d9682aab96 /userapi
parentba0b3adab4de7865afd467b61638437b1af39fce (diff)
Allow batching in `JetStreamConsumer` (#2686)
This allows us to receive more than one message from NATS at a time if we want.
Diffstat (limited to 'userapi')
-rw-r--r--userapi/consumers/syncapi_readupdate.go7
-rw-r--r--userapi/consumers/syncapi_streamevent.go7
2 files changed, 8 insertions, 6 deletions
diff --git a/userapi/consumers/syncapi_readupdate.go b/userapi/consumers/syncapi_readupdate.go
index 067f9333..54654f75 100644
--- a/userapi/consumers/syncapi_readupdate.go
+++ b/userapi/consumers/syncapi_readupdate.go
@@ -56,15 +56,16 @@ func NewOutputReadUpdateConsumer(
func (s *OutputReadUpdateConsumer) Start() error {
if err := jetstream.JetStreamConsumer(
- s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
- nats.DeliverAll(), nats.ManualAck(),
+ s.ctx, s.jetstream, s.topic, s.durable, 1,
+ s.onMessage, nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
return nil
}
-func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+func (s *OutputReadUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
var read types.ReadUpdate
if err := json.Unmarshal(msg.Data, &read); err != nil {
log.WithError(err).Error("userapi clientapi consumer: message parse failure")
diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go
index ec351ef7..3ac6f58d 100644
--- a/userapi/consumers/syncapi_streamevent.go
+++ b/userapi/consumers/syncapi_streamevent.go
@@ -65,15 +65,16 @@ func NewOutputStreamEventConsumer(
func (s *OutputStreamEventConsumer) Start() error {
if err := jetstream.JetStreamConsumer(
- s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
- nats.DeliverAll(), nats.ManualAck(),
+ s.ctx, s.jetstream, s.topic, s.durable, 1,
+ s.onMessage, nats.DeliverAll(), nats.ManualAck(),
); err != nil {
return err
}
return nil
}
-func (s *OutputStreamEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+func (s *OutputStreamEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
var output types.StreamedEvent
output.Event = &gomatrixserverlib.HeaderedEvent{}
if err := json.Unmarshal(msg.Data, &output); err != nil {