aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
Diffstat (limited to 'appservice')
-rw-r--r--appservice/appservice.go6
-rw-r--r--appservice/consumers/roomserver.go82
-rw-r--r--appservice/storage/interface.go2
3 files changed, 45 insertions, 45 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go
index 5f16c10b..924a609e 100644
--- a/appservice/appservice.go
+++ b/appservice/appservice.go
@@ -32,7 +32,7 @@ import (
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/kafka"
+ "github.com/matrix-org/dendrite/setup/jetstream"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
)
@@ -58,7 +58,7 @@ func NewInternalAPI(
},
},
}
- consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka)
+ js, _, _ := jetstream.Prepare(&base.Cfg.Global.JetStream)
// Create a connection to the appservice postgres DB
appserviceDB, err := storage.NewDatabase(&base.Cfg.AppServiceAPI.Database)
@@ -97,7 +97,7 @@ func NewInternalAPI(
// We can't add ASes at runtime so this is safe to do.
if len(workerStates) > 0 {
consumer := consumers.NewOutputRoomEventConsumer(
- base.ProcessContext, base.Cfg, consumer, appserviceDB,
+ base.ProcessContext, base.Cfg, js, appserviceDB,
rsAPI, workerStates,
)
if err := consumer.Start(); err != nil {
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 2ad7f68f..139b5724 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -20,23 +20,25 @@ import (
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
- "github.com/Shopify/sarama"
log "github.com/sirupsen/logrus"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- roomServerConsumer *internal.ContinualConsumer
- asDB storage.Database
- rsAPI api.RoomserverInternalAPI
- serverName string
- workerStates []types.ApplicationServiceWorkerState
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ topic string
+ asDB storage.Database
+ rsAPI api.RoomserverInternalAPI
+ serverName string
+ workerStates []types.ApplicationServiceWorkerState
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
@@ -44,55 +46,55 @@ type OutputRoomEventConsumer struct {
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
appserviceDB storage.Database,
rsAPI api.RoomserverInternalAPI,
workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
- consumer := internal.ContinualConsumer{
- Process: process,
- ComponentName: "appservice/roomserver",
- Topic: cfg.Global.Kafka.TopicFor(config.TopicOutputRoomEvent),
- Consumer: kafkaConsumer,
- PartitionStore: appserviceDB,
+ return &OutputRoomEventConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ topic: cfg.Global.JetStream.TopicFor(jetstream.OutputRoomEvent),
+ asDB: appserviceDB,
+ rsAPI: rsAPI,
+ serverName: string(cfg.Global.ServerName),
+ workerStates: workerStates,
}
- s := &OutputRoomEventConsumer{
- roomServerConsumer: &consumer,
- asDB: appserviceDB,
- rsAPI: rsAPI,
- serverName: string(cfg.Global.ServerName),
- workerStates: workerStates,
- }
- consumer.ProcessMessage = s.onMessage
-
- return s
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- return s.roomServerConsumer.Start()
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ return err
}
// onMessage is called when the appservice component receives a new event from
// the room server output log.
-func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
- // Parse out the event JSON
- var output api.OutputEvent
- if err := json.Unmarshal(msg.Value, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("roomserver output log: message parse failure")
- return nil
- }
+func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
+ jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
+ // Parse out the event JSON
+ var output api.OutputEvent
+ if err := json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("roomserver output log: message parse failure")
+ return true
+ }
- if output.Type != api.OutputTypeNewRoomEvent {
- return nil
- }
+ if output.Type != api.OutputTypeNewRoomEvent {
+ return true
+ }
+
+ events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
+ events = append(events, output.NewRoomEvent.AddStateEvents...)
- events := []*gomatrixserverlib.HeaderedEvent{output.NewRoomEvent.Event}
- events = append(events, output.NewRoomEvent.AddStateEvents...)
+ // Send event to any relevant application services
+ if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
+ log.WithError(err).Errorf("roomserver output log: filter error")
+ return true
+ }
- // Send event to any relevant application services
- return s.filterRoomserverEvents(context.TODO(), events)
+ return true
+ })
}
// filterRoomserverEvents takes in events and decides whether any of them need
diff --git a/appservice/storage/interface.go b/appservice/storage/interface.go
index 735e2f90..25d35af6 100644
--- a/appservice/storage/interface.go
+++ b/appservice/storage/interface.go
@@ -17,12 +17,10 @@ package storage
import (
"context"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
- internal.PartitionStorer
StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) error
GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error)
CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error)