aboutsummaryrefslogtreecommitdiff
path: root/federationapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-08-31 12:21:56 +0100
committerGitHub <noreply@github.com>2022-08-31 12:21:56 +0100
commit175f65407a7f684753334022e66b8209f3db7396 (patch)
treecaebde61a179b12e8ddc161359cbb8d9682aab96 /federationapi
parentba0b3adab4de7865afd467b61638437b1af39fce (diff)
Allow batching in `JetStreamConsumer` (#2686)
This allows us to receive more than one message from NATS at a time if we want.
Diffstat (limited to 'federationapi')
-rw-r--r--federationapi/consumers/keychange.go7
-rw-r--r--federationapi/consumers/presence.go5
-rw-r--r--federationapi/consumers/receipts.go5
-rw-r--r--federationapi/consumers/roomserver.go7
-rw-r--r--federationapi/consumers/sendtodevice.go7
-rw-r--r--federationapi/consumers/typing.go5
6 files changed, 21 insertions, 15 deletions
diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go
index 6d3cf0e4..f3314bc9 100644
--- a/federationapi/consumers/keychange.go
+++ b/federationapi/consumers/keychange.go
@@ -67,14 +67,15 @@ func NewKeyChangeConsumer(
// Start consuming from key servers
func (t *KeyChangeConsumer) Start() error {
return jetstream.JetStreamConsumer(
- t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
- nats.DeliverAll(), nats.ManualAck(),
+ t.ctx, t.jetstream, t.topic, t.durable, 1,
+ t.onMessage, nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
-func (t *KeyChangeConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+func (t *KeyChangeConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
var m api.DeviceMessage
if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
diff --git a/federationapi/consumers/presence.go b/federationapi/consumers/presence.go
index a65d2aa0..e76103cd 100644
--- a/federationapi/consumers/presence.go
+++ b/federationapi/consumers/presence.go
@@ -69,14 +69,15 @@ func (t *OutputPresenceConsumer) Start() error {
return nil
}
return jetstream.JetStreamConsumer(
- t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
+ t.ctx, t.jetstream, t.topic, t.durable, 1, t.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
)
}
// onMessage is called in response to a message received on the presence
// events topic from the client api.
-func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
// only send presence events which originated from us
userID := msg.Header.Get(jetstream.UserID)
_, serverName, err := gomatrixserverlib.SplitID('@', userID)
diff --git a/federationapi/consumers/receipts.go b/federationapi/consumers/receipts.go
index 2c9d79bc..366cb264 100644
--- a/federationapi/consumers/receipts.go
+++ b/federationapi/consumers/receipts.go
@@ -65,14 +65,15 @@ func NewOutputReceiptConsumer(
// Start consuming from the clientapi
func (t *OutputReceiptConsumer) Start() error {
return jetstream.JetStreamConsumer(
- t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
+ t.ctx, t.jetstream, t.topic, t.durable, 1, t.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
)
}
// onMessage is called in response to a message received on the receipt
// events topic from the client api.
-func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
receipt := syncTypes.OutputReceiptEvent{
UserID: msg.Header.Get(jetstream.UserID),
RoomID: msg.Header.Get(jetstream.RoomID),
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index 2622ecb3..349b50b0 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -68,8 +68,8 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
- s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
- nats.DeliverAll(), nats.ManualAck(),
+ s.ctx, s.jetstream, s.topic, s.durable, 1,
+ s.onMessage, nats.DeliverAll(), nats.ManualAck(),
)
}
@@ -77,7 +77,8 @@ func (s *OutputRoomEventConsumer) Start() error {
// It is unsafe to call this with messages for the same room in multiple gorountines
// because updates it will likely fail with a types.EventIDMismatchError when it
// realises that it cannot update the room state using the deltas.
-func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
diff --git a/federationapi/consumers/sendtodevice.go b/federationapi/consumers/sendtodevice.go
index f99a895e..e44bad72 100644
--- a/federationapi/consumers/sendtodevice.go
+++ b/federationapi/consumers/sendtodevice.go
@@ -63,14 +63,15 @@ func NewOutputSendToDeviceConsumer(
// Start consuming from the client api
func (t *OutputSendToDeviceConsumer) Start() error {
return jetstream.JetStreamConsumer(
- t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
- nats.DeliverAll(), nats.ManualAck(),
+ t.ctx, t.jetstream, t.topic, t.durable, 1,
+ t.onMessage, nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called in response to a message received on the
// send-to-device events topic from the client api.
-func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
// only send send-to-device events which originated from us
sender := msg.Header.Get("sender")
_, originServerName, err := gomatrixserverlib.SplitID('@', sender)
diff --git a/federationapi/consumers/typing.go b/federationapi/consumers/typing.go
index 428e1a86..9c737913 100644
--- a/federationapi/consumers/typing.go
+++ b/federationapi/consumers/typing.go
@@ -62,14 +62,15 @@ func NewOutputTypingConsumer(
// Start consuming from the clientapi
func (t *OutputTypingConsumer) Start() error {
return jetstream.JetStreamConsumer(
- t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
+ t.ctx, t.jetstream, t.topic, t.durable, 1, t.onMessage,
nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
)
}
// onMessage is called in response to a message received on the typing
// events topic from the client api.
-func (t *OutputTypingConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
// Extract the typing event from msg.
roomID := msg.Header.Get(jetstream.RoomID)
userID := msg.Header.Get(jetstream.UserID)