aboutsummaryrefslogtreecommitdiff
path: root/federationapi/consumers/roomserver.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-02-02 13:32:48 +0000
committerGitHub <noreply@github.com>2022-02-02 13:32:48 +0000
commitc773b038bb1432f2265759ddf1da5e98b9bda525 (patch)
tree357c7dac69f4f87dda61bbaf004ae3351d5277d8 /federationapi/consumers/roomserver.go
parent2dee706f9ef2de70516dbc993dcfc8ec6f7fdd52 (diff)
Use pull consumers (#2140)
* Pull consumers * Pull consumers * Only nuke consumers if they are push consumers * Clean up old consumers * Better error handling * Update comments
Diffstat (limited to 'federationapi/consumers/roomserver.go')
-rw-r--r--federationapi/consumers/roomserver.go100
1 files changed, 48 insertions, 52 deletions
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index 25ea7827..ac29f930 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -37,7 +37,7 @@ type OutputRoomEventConsumer struct {
cfg *config.FederationAPI
rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext
- durable nats.SubOpt
+ durable string
db storage.Database
queues *queue.OutgoingQueues
topic string
@@ -66,74 +66,70 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- _, err := s.jetstream.Subscribe(
- s.topic, s.onMessage, s.durable,
- nats.DeliverAll(),
- nats.ManualAck(),
+ return jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
)
- return err
}
// onMessage is called when the federation server receives a new event from the room server output log.
// It is unsafe to call this with messages for the same room in multiple gorountines
// because updates it will likely fail with a types.EventIDMismatchError when it
// realises that it cannot update the room state using the deltas.
-func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
- jetstream.WithJetStreamMessage(msg, func(msg *nats.Msg) bool {
- // Parse out the event JSON
- var output api.OutputEvent
- if err := json.Unmarshal(msg.Data, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("roomserver output log: message parse failure")
- return true
- }
+func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+ // Parse out the event JSON
+ var output api.OutputEvent
+ if err := json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("roomserver output log: message parse failure")
+ return true
+ }
- switch output.Type {
- case api.OutputTypeNewRoomEvent:
- ev := output.NewRoomEvent.Event
+ switch output.Type {
+ case api.OutputTypeNewRoomEvent:
+ ev := output.NewRoomEvent.Event
- if output.NewRoomEvent.RewritesState {
- if err := s.db.PurgeRoomState(s.ctx, ev.RoomID()); err != nil {
- log.WithError(err).Errorf("roomserver output log: purge room state failure")
- return false
- }
- }
-
- if err := s.processMessage(*output.NewRoomEvent); err != nil {
- switch err.(type) {
- case *queue.ErrorFederationDisabled:
- log.WithField("error", output.Type).Info(
- err.Error(),
- )
- default:
- // panic rather than continue with an inconsistent database
- log.WithFields(log.Fields{
- "event_id": ev.EventID(),
- "event": string(ev.JSON()),
- "add": output.NewRoomEvent.AddsStateEventIDs,
- "del": output.NewRoomEvent.RemovesStateEventIDs,
- log.ErrorKey: err,
- }).Panicf("roomserver output log: write room event failure")
- }
+ if output.NewRoomEvent.RewritesState {
+ if err := s.db.PurgeRoomState(s.ctx, ev.RoomID()); err != nil {
+ log.WithError(err).Errorf("roomserver output log: purge room state failure")
+ return false
}
+ }
- case api.OutputTypeNewInboundPeek:
- if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
+ if err := s.processMessage(*output.NewRoomEvent); err != nil {
+ switch err.(type) {
+ case *queue.ErrorFederationDisabled:
+ log.WithField("error", output.Type).Info(
+ err.Error(),
+ )
+ default:
+ // panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
- "event": output.NewInboundPeek,
+ "event_id": ev.EventID(),
+ "event": string(ev.JSON()),
+ "add": output.NewRoomEvent.AddsStateEventIDs,
+ "del": output.NewRoomEvent.RemovesStateEventIDs,
log.ErrorKey: err,
- }).Panicf("roomserver output log: remote peek event failure")
- return false
+ }).Panicf("roomserver output log: write room event failure")
}
+ }
- default:
- log.WithField("type", output.Type).Debug(
- "roomserver output log: ignoring unknown output type",
- )
+ case api.OutputTypeNewInboundPeek:
+ if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
+ log.WithFields(log.Fields{
+ "event": output.NewInboundPeek,
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: remote peek event failure")
+ return false
}
- return true
- })
+ default:
+ log.WithField("type", output.Type).Debug(
+ "roomserver output log: ignoring unknown output type",
+ )
+ }
+
+ return true
}
// processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any)