diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-02-02 13:32:48 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-02 13:32:48 +0000 |
commit | c773b038bb1432f2265759ddf1da5e98b9bda525 (patch) | |
tree | 357c7dac69f4f87dda61bbaf004ae3351d5277d8 /appservice/consumers | |
parent | 2dee706f9ef2de70516dbc993dcfc8ec6f7fdd52 (diff) |
Use pull consumers (#2140)
* Pull consumers
* Pull consumers
* Only nuke consumers if they are push consumers
* Clean up old consumers
* Better error handling
* Update comments
Diffstat (limited to 'appservice/consumers')
-rw-r--r-- | appservice/consumers/roomserver.go | 48 |
1 files changed, 24 insertions, 24 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 8aea5c34..7b59e370 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -34,7 +34,7 @@ import ( type OutputRoomEventConsumer struct { ctx context.Context jetstream nats.JetStreamContext - durable nats.SubOpt + durable string topic string asDB storage.Database rsAPI api.RoomserverInternalAPI @@ -66,37 +66,37 @@ func NewOutputRoomEventConsumer( // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - _, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable) - return err + return jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, + nats.DeliverAll(), nats.ManualAck(), + ) } // onMessage is called when the appservice component receives a new event from // the room server output log. -func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { - jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool { - // Parse out the event JSON - var output api.OutputEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return true - } - - if output.Type != api.OutputTypeNewRoomEvent { - return true - } +func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithError(err).Errorf("roomserver output log: message parse failure") + return true + } - events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} - events = append(events, output.NewRoomEvent.AddStateEvents...) + if output.Type != api.OutputTypeNewRoomEvent { + return true + } - // Send event to any relevant application services - if err := s.filterRoomserverEvents(context.TODO(), events); err != nil { - log.WithError(err).Errorf("roomserver output log: filter error") - return true - } + events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event} + events = append(events, output.NewRoomEvent.AddStateEvents...) + // Send event to any relevant application services + if err := s.filterRoomserverEvents(context.TODO(), events); err != nil { + log.WithError(err).Errorf("roomserver output log: filter error") return true - }) + } + + return true } // filterRoomserverEvents takes in events and decides whether any of them need |