diff options
Diffstat (limited to 'appservice/consumers/roomserver.go')
-rw-r--r-- | appservice/consumers/roomserver.go | 318 |
1 files changed, 189 insertions, 129 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 21b52bc3..a30944e7 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -15,14 +15,18 @@ package consumers import ( + "bytes" "context" "encoding/json" + "fmt" + "math" + "net/http" + "net/url" + "time" "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" - "github.com/matrix-org/dendrite/appservice/storage" - "github.com/matrix-org/dendrite/appservice/types" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/jetstream" @@ -33,178 +37,192 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { - ctx context.Context - jetstream nats.JetStreamContext - durable string - topic string - asDB storage.Database - rsAPI api.AppserviceRoomserverAPI - serverName string - workerStates []types.ApplicationServiceWorkerState + ctx context.Context + cfg *config.AppServiceAPI + client *http.Client + jetstream nats.JetStreamContext + topic string + rsAPI api.AppserviceRoomserverAPI +} + +type appserviceState struct { + *config.ApplicationService + backoff int } // NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call // Start() to begin consuming from room servers. func NewOutputRoomEventConsumer( process *process.ProcessContext, - cfg *config.Dendrite, + cfg *config.AppServiceAPI, + client *http.Client, js nats.JetStreamContext, - appserviceDB storage.Database, rsAPI api.AppserviceRoomserverAPI, - workerStates []types.ApplicationServiceWorkerState, ) *OutputRoomEventConsumer { return &OutputRoomEventConsumer{ - ctx: process.Context(), - jetstream: js, - durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"), - topic: cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent), - asDB: appserviceDB, - rsAPI: rsAPI, - serverName: string(cfg.Global.ServerName), - workerStates: workerStates, + ctx: process.Context(), + cfg: cfg, + client: client, + jetstream: js, + topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent), + rsAPI: rsAPI, } } // Start consuming from room servers func (s *OutputRoomEventConsumer) Start() error { - return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.durable, 1, - s.onMessage, nats.DeliverAll(), nats.ManualAck(), - ) + for _, as := range s.cfg.Derived.ApplicationServices { + appsvc := as + state := &appserviceState{ + ApplicationService: &appsvc, + } + token := jetstream.Tokenise(as.ID) + if err := jetstream.JetStreamConsumer( + s.ctx, s.jetstream, s.topic, + s.cfg.Matrix.JetStream.Durable("Appservice_"+token), + 50, // maximum number of events to send in a single transaction + func(ctx context.Context, msgs []*nats.Msg) bool { + return s.onMessage(ctx, state, msgs) + }, + nats.DeliverNew(), nats.ManualAck(), + ); err != nil { + return fmt.Errorf("failed to create %q consumer: %w", token, err) + } + } + return nil } // onMessage is called when the appservice component receives a new event from // the room server output log. -func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool { - msg := msgs[0] // Guaranteed to exist if onMessage is called - // Parse out the event JSON - var output api.OutputEvent - if err := json.Unmarshal(msg.Data, &output); err != nil { - // If the message was invalid, log it and move on to the next message in the stream - log.WithError(err).Errorf("roomserver output log: message parse failure") - return true - } - - log.WithFields(log.Fields{ - "type": output.Type, - }).Debug("Got a message in OutputRoomEventConsumer") - - events := []*gomatrixserverlib.HeaderedEvent{} - if output.Type == api.OutputTypeNewRoomEvent && output.NewRoomEvent != nil { - newEventID := output.NewRoomEvent.Event.EventID() - events = append(events, output.NewRoomEvent.Event) - if len(output.NewRoomEvent.AddsStateEventIDs) > 0 { - eventsReq := &api.QueryEventsByIDRequest{ - EventIDs: make([]string, 0, len(output.NewRoomEvent.AddsStateEventIDs)), +func (s *OutputRoomEventConsumer) onMessage( + ctx context.Context, state *appserviceState, msgs []*nats.Msg, +) bool { + log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs)) + events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs)) + for _, msg := range msgs { + // Parse out the event JSON + var output api.OutputEvent + if err := json.Unmarshal(msg.Data, &output); err != nil { + // If the message was invalid, log it and move on to the next message in the stream + log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring") + continue + } + switch output.Type { + case api.OutputTypeNewRoomEvent: + if output.NewRoomEvent == nil || !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) { + continue } - eventsRes := &api.QueryEventsByIDResponse{} - for _, eventID := range output.NewRoomEvent.AddsStateEventIDs { - if eventID != newEventID { - eventsReq.EventIDs = append(eventsReq.EventIDs, eventID) + events = append(events, output.NewRoomEvent.Event) + if len(output.NewRoomEvent.AddsStateEventIDs) > 0 { + newEventID := output.NewRoomEvent.Event.EventID() + eventsReq := &api.QueryEventsByIDRequest{ + EventIDs: make([]string, 0, len(output.NewRoomEvent.AddsStateEventIDs)), } - } - if len(eventsReq.EventIDs) > 0 { - if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil { - log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed") - return false + eventsRes := &api.QueryEventsByIDResponse{} + for _, eventID := range output.NewRoomEvent.AddsStateEventIDs { + if eventID != newEventID { + eventsReq.EventIDs = append(eventsReq.EventIDs, eventID) + } + } + if len(eventsReq.EventIDs) > 0 { + if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil { + log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed") + return false + } + events = append(events, eventsRes.Events...) } - events = append(events, eventsRes.Events...) } + + case api.OutputTypeNewInviteEvent: + if output.NewInviteEvent == nil { + continue + } + events = append(events, output.NewInviteEvent.Event) + + default: + continue } - } else if output.Type == api.OutputTypeNewInviteEvent && output.NewInviteEvent != nil { - events = append(events, output.NewInviteEvent.Event) - } else { - log.WithFields(log.Fields{ - "type": output.Type, - }).Debug("appservice OutputRoomEventConsumer ignoring event", string(msg.Data)) - return true } - // Send event to any relevant application services - if err := s.filterRoomserverEvents(context.TODO(), events); err != nil { - log.WithError(err).Errorf("roomserver output log: filter error") + // If there are no events selected for sending then we should + // ack the messages so that we don't get sent them again in the + // future. + if len(events) == 0 { return true } - return true + // Send event to any relevant application services. If we hit + // an error here, return false, so that we negatively ack. + log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events)) + return s.sendEvents(ctx, state, events) == nil } -// filterRoomserverEvents takes in events and decides whether any of them need -// to be passed on to an external application service. It does this by checking -// each namespace of each registered application service, and if there is a -// match, adds the event to the queue for events to be sent to a particular -// application service. -func (s *OutputRoomEventConsumer) filterRoomserverEvents( - ctx context.Context, +// sendEvents passes events to the appservice by using the transactions +// endpoint. It will block for the backoff period if necessary. +func (s *OutputRoomEventConsumer) sendEvents( + ctx context.Context, state *appserviceState, events []*gomatrixserverlib.HeaderedEvent, ) error { - for _, ws := range s.workerStates { - for _, event := range events { - // Check if this event is interesting to this application service - if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) { - // Queue this event to be sent off to the application service - if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil { - log.WithError(err).Warn("failed to insert incoming event into appservices database") - return err - } else { - // Tell our worker to send out new messages by updating remaining message - // count and waking them up with a broadcast - ws.NotifyNewEvents() - } - } - } + // Create the transaction body. + transaction, err := json.Marshal( + gomatrixserverlib.ApplicationServiceTransaction{ + Events: gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll), + }, + ) + if err != nil { + return err } - return nil -} + // TODO: We should probably be more intelligent and pick something not + // in the control of the event. A NATS timestamp header or something maybe. + txnID := events[0].Event.OriginServerTS() -// appserviceJoinedAtEvent returns a boolean depending on whether a given -// appservice has membership at the time a given event was created. -func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool { - // TODO: This is only checking the current room state, not the state at - // the event in question. Pretty sure this is what Synapse does too, but - // until we have a lighter way of checking the state before the event that - // doesn't involve state res, then this is probably OK. - membershipReq := &api.QueryMembershipsForRoomRequest{ - RoomID: event.RoomID(), - JoinedOnly: true, + // Send the transaction to the appservice. + // https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid + address := fmt.Sprintf("%s/transactions/%d?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken)) + req, err := http.NewRequestWithContext(ctx, "PUT", address, bytes.NewBuffer(transaction)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := s.client.Do(req) + if err != nil { + return state.backoffAndPause(err) } - membershipRes := &api.QueryMembershipsForRoomResponse{} - // XXX: This could potentially race if the state for the event is not known yet - // e.g. the event came over federation but we do not have the full state persisted. - if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil { - for _, ev := range membershipRes.JoinEvents { - var membership gomatrixserverlib.MemberContent - if err = json.Unmarshal(ev.Content, &membership); err != nil || ev.StateKey == nil { - continue - } - if appservice.IsInterestedInUserID(*ev.StateKey) { - return true - } - } - } else { - log.WithFields(log.Fields{ - "room_id": event.RoomID(), - }).WithError(err).Errorf("Unable to get membership for room") + // If the response was fine then we can clear any backoffs in place and + // report that everything was OK. Otherwise, back off for a while. + switch resp.StatusCode { + case http.StatusOK: + state.backoff = 0 + default: + return state.backoffAndPause(fmt.Errorf("received HTTP status code %d from appservice", resp.StatusCode)) } - return false + return nil +} + +// backoff pauses the calling goroutine for a 2^some backoff exponent seconds +func (s *appserviceState) backoffAndPause(err error) error { + if s.backoff < 6 { + s.backoff++ + } + duration := time.Second * time.Duration(math.Pow(2, float64(s.backoff))) + log.WithField("appservice", s.ID).WithError(err).Errorf("Unable to send transaction to appservice, backing off for %s", duration.String()) + time.Sleep(duration) + return err } // appserviceIsInterestedInEvent returns a boolean depending on whether a given // event falls within one of a given application service's namespaces. // // TODO: This should be cached, see https://github.com/matrix-org/dendrite/issues/1682 -func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool { - // No reason to queue events if they'll never be sent to the application - // service - if appservice.URL == "" { +func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice *config.ApplicationService) bool { + switch { + case appservice.URL == "": return false - } - - // Check Room ID and Sender of the event - if appservice.IsInterestedInUserID(event.Sender()) || - appservice.IsInterestedInRoomID(event.RoomID()) { + case appservice.IsInterestedInUserID(event.Sender()): + return true + case appservice.IsInterestedInRoomID(event.RoomID()): return true } @@ -225,10 +243,52 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont } } else { log.WithFields(log.Fields{ - "room_id": event.RoomID(), + "appservice": appservice.ID, + "room_id": event.RoomID(), }).WithError(err).Errorf("Unable to get aliases for room") } // Check if any of the members in the room match the appservice return s.appserviceJoinedAtEvent(ctx, event, appservice) } + +// appserviceJoinedAtEvent returns a boolean depending on whether a given +// appservice has membership at the time a given event was created. +func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice *config.ApplicationService) bool { + // TODO: This is only checking the current room state, not the state at + // the event in question. Pretty sure this is what Synapse does too, but + // until we have a lighter way of checking the state before the event that + // doesn't involve state res, then this is probably OK. + membershipReq := &api.QueryMembershipsForRoomRequest{ + RoomID: event.RoomID(), + JoinedOnly: true, + } + membershipRes := &api.QueryMembershipsForRoomResponse{} + + // XXX: This could potentially race if the state for the event is not known yet + // e.g. the event came over federation but we do not have the full state persisted. + if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil { + for _, ev := range membershipRes.JoinEvents { + switch { + case ev.StateKey == nil: + continue + case ev.Type != gomatrixserverlib.MRoomMember: + continue + } + var membership gomatrixserverlib.MemberContent + err = json.Unmarshal(ev.Content, &membership) + switch { + case err != nil: + continue + case membership.Membership == gomatrixserverlib.Join: + return true + } + } + } else { + log.WithFields(log.Fields{ + "appservice": appservice.ID, + "room_id": event.RoomID(), + }).WithError(err).Errorf("Unable to get membership for room") + } + return false +} |