aboutsummaryrefslogtreecommitdiff
path: root/appservice/consumers/roomserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'appservice/consumers/roomserver.go')
-rw-r--r--appservice/consumers/roomserver.go7
1 files changed, 4 insertions, 3 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index d567408b..21b52bc3 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -68,14 +68,15 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
- s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
- nats.DeliverAll(), nats.ManualAck(),
+ s.ctx, s.jetstream, s.topic, s.durable, 1,
+ s.onMessage, nats.DeliverAll(), nats.ManualAck(),
)
}
// onMessage is called when the appservice component receives a new event from
// the room server output log.
-func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {