aboutsummaryrefslogtreecommitdiff
path: root/appservice/consumers/roomserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'appservice/consumers/roomserver.go')
-rw-r--r--appservice/consumers/roomserver.go82
1 files changed, 42 insertions, 40 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 2ad7f68f..139b5724 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -20,23 +20,25 @@ import (
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
- "github.com/Shopify/sarama"
log "github.com/sirupsen/logrus"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- roomServerConsumer *internal.ContinualConsumer
- asDB storage.Database
- rsAPI api.RoomserverInternalAPI
- serverName string
- workerStates []types.ApplicationServiceWorkerState
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ topic string
+ asDB storage.Database
+ rsAPI api.RoomserverInternalAPI
+ serverName string
+ workerStates []types.ApplicationServiceWorkerState
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
@@ -44,55 +46,55 @@ type OutputRoomEventConsumer struct {
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
appserviceDB storage.Database,
rsAPI api.RoomserverInternalAPI,
workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
- consumer := internal.ContinualConsumer{
- Process: process,
- ComponentName: "appservice/roomserver",
- Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
- Consumer: kafkaConsumer,
- PartitionStore: appserviceDB,
+ return &OutputRoomEventConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
+ asDB: appserviceDB,
+ rsAPI: rsAPI,
+ serverName: string(cfg.Global.ServerName),
+ workerStates: workerStates,
}
- s := &OutputRoomEventConsumer{
- roomServerConsumer: &consumer,
- asDB: appserviceDB,
- rsAPI: rsAPI,
- serverName: string(cfg.Global.ServerName),
- workerStates: workerStates,
- }
- consumer.ProcessMessage = s.onMessage
-
- return s
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- return s.roomServerConsumer.Start()
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ return err
}
// onMessage is called when the appservice component receives a new event from
// the room server output log.
-func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
- // Parse out the event JSON
- var output api.OutputEvent
- if err := json.Unmarshal(msg.Value, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("roomserver output log: message parse failure")
- return nil
- }
+func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
+ jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
+ // Parse out the event JSON
+ var output api.OutputEvent
+ if err := json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("roomserver output log: message parse failure")
+ return true
+ }
- if output.Type != api.OutputTypeNewRoomEvent {
- return nil
- }
+ if output.Type != api.OutputTypeNewRoomEvent {
+ return true
+ }
+
+ events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
+ events = append(events, output.NewRoomEvent.AddStateEvents...)
- events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
- events = append(events, output.NewRoomEvent.AddStateEvents...)
+ // Send event to any relevant application services
+ if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
+ log.WithError(err).Errorf("roomserver output log: filter error")
+ return true
+ }
- // Send event to any relevant application services
- return s.filterRoomserverEvents(context.TODO(), events)
+ return true
+ })
}
// filterRoomserverEvents takes in events and decides whether any of them need