aboutsummaryrefslogtreecommitdiff
path: root/federationapi/consumers/roomserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi/consumers/roomserver.go')
-rw-r--r--federationapi/consumers/roomserver.go144
1 files changed, 71 insertions, 73 deletions
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index 20b1bacb..12410bb7 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -19,117 +19,115 @@ import (
"encoding/json"
"fmt"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/types"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- cfg *config.FederationAPI
- rsAPI api.RoomserverInternalAPI
- rsConsumer *internal.ContinualConsumer
- db storage.Database
- queues *queue.OutgoingQueues
+ ctx context.Context
+ cfg *config.FederationAPI
+ rsAPI api.RoomserverInternalAPI
+ jetstream nats.JetStreamContext
+ db storage.Database
+ queues *queue.OutgoingQueues
+ topic string
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.FederationAPI,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
queues *queue.OutgoingQueues,
store storage.Database,
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
- consumer := internal.ContinualConsumer{
- Process: process,
- ComponentName: "federationapi/roomserver",
- Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
- Consumer: kafkaConsumer,
- PartitionStore: store,
+ return &OutputRoomEventConsumer{
+ ctx: process.Context(),
+ cfg: cfg,
+ jetstream: js,
+ db: store,
+ queues: queues,
+ rsAPI: rsAPI,
+ topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
}
- s := &OutputRoomEventConsumer{
- cfg: cfg,
- rsConsumer: &consumer,
- db: store,
- queues: queues,
- rsAPI: rsAPI,
- }
- consumer.ProcessMessage = s.onMessage
-
- return s
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- return s.rsConsumer.Start()
+ _, err := s.jetstream.Subscribe(s.topic, s.onMessage)
+ return err
}
// onMessage is called when the federation server receives a new event from the room server output log.
// It is unsafe to call this with messages for the same room in multiple gorountines
// because updates it will likely fail with a types.EventIDMismatchError when it
// realises that it cannot update the room state using the deltas.
-func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
- // Parse out the event JSON
- var output api.OutputEvent
- if err := json.Unmarshal(msg.Value, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("roomserver output log: message parse failure")
- return nil
- }
+func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
+ jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
+ // Parse out the event JSON
+ var output api.OutputEvent
+ if err := json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("roomserver output log: message parse failure")
+ return true
+ }
- switch output.Type {
- case api.OutputTypeNewRoomEvent:
- ev := output.NewRoomEvent.Event
+ switch output.Type {
+ case api.OutputTypeNewRoomEvent:
+ ev := output.NewRoomEvent.Event
- if output.NewRoomEvent.RewritesState {
- if err := s.db.PurgeRoomState(context.TODO(), ev.RoomID()); err != nil {
- return fmt.Errorf("s.db.PurgeRoom: %w", err)
+ if output.NewRoomEvent.RewritesState {
+ if err := s.db.PurgeRoomState(s.ctx, ev.RoomID()); err != nil {
+ log.WithError(err).Errorf("roomserver output log: purge room state failure")
+ return false
+ }
}
- }
- if err := s.processMessage(*output.NewRoomEvent); err != nil {
- switch err.(type) {
- case *queue.ErrorFederationDisabled:
- log.WithField("error", output.Type).Info(
- err.Error(),
- )
- default:
- // panic rather than continue with an inconsistent database
+ if err := s.processMessage(*output.NewRoomEvent); err != nil {
+ switch err.(type) {
+ case *queue.ErrorFederationDisabled:
+ log.WithField("error", output.Type).Info(
+ err.Error(),
+ )
+ default:
+ // panic rather than continue with an inconsistent database
+ log.WithFields(log.Fields{
+ "event_id": ev.EventID(),
+ "event": string(ev.JSON()),
+ "add": output.NewRoomEvent.AddsStateEventIDs,
+ "del": output.NewRoomEvent.RemovesStateEventIDs,
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: write room event failure")
+ }
+ }
+
+ case api.OutputTypeNewInboundPeek:
+ if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "event": string(ev.JSON()),
- "add": output.NewRoomEvent.AddsStateEventIDs,
- "del": output.NewRoomEvent.RemovesStateEventIDs,
+ "event": output.NewInboundPeek,
log.ErrorKey: err,
- }).Panicf("roomserver output log: write room event failure")
+ }).Panicf("roomserver output log: remote peek event failure")
+ return false
}
- return nil
- }
- case api.OutputTypeNewInboundPeek:
- if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
- log.WithFields(log.Fields{
- "event": output.NewInboundPeek,
- log.ErrorKey: err,
- }).Panicf("roomserver output log: remote peek event failure")
- return nil
+
+ default:
+ log.WithField("type", output.Type).Debug(
+ "roomserver output log: ignoring unknown output type",
+ )
}
- default:
- log.WithField("type", output.Type).Debug(
- "roomserver output log: ignoring unknown output type",
- )
- return nil
- }
- return nil
+ return true
+ })
}
// processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any)
@@ -146,7 +144,7 @@ func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPee
//
// This is making the tests flakey.
- return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
+ return s.db.AddInboundPeek(s.ctx, orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
}
// processMessage updates the list of currently joined hosts in the room
@@ -162,7 +160,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
// TODO(#290): handle EventIDMismatchError and recover the current state by
// talking to the roomserver
oldJoinedHosts, err := s.db.UpdateRoom(
- context.TODO(),
+ s.ctx,
ore.Event.RoomID(),
ore.LastSentEventID,
ore.Event.EventID(),
@@ -255,7 +253,7 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
}
// handle peeking hosts
- inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.Event.RoomID())
+ inboundPeeks, err := s.db.GetInboundPeeks(s.ctx, ore.Event.Event.RoomID())
if err != nil {
return nil, err
}
@@ -373,7 +371,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// from the roomserver using the query API.
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse
- if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
+ if err := s.rsAPI.QueryEventsByID(s.ctx, &eventReq, &eventResp); err != nil {
return nil, err
}