aboutsummaryrefslogtreecommitdiff
path: root/appservice/consumers/roomserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'appservice/consumers/roomserver.go')
-rw-r--r--appservice/consumers/roomserver.go318
1 files changed, 189 insertions, 129 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 21b52bc3..a30944e7 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -15,14 +15,18 @@
package consumers
import (
+ "bytes"
"context"
"encoding/json"
+ "fmt"
+ "math"
+ "net/http"
+ "net/url"
+ "time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
- "github.com/matrix-org/dendrite/appservice/storage"
- "github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@@ -33,178 +37,192 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- topic string
- asDB storage.Database
- rsAPI api.AppserviceRoomserverAPI
- serverName string
- workerStates []types.ApplicationServiceWorkerState
+ ctx context.Context
+ cfg *config.AppServiceAPI
+ client *http.Client
+ jetstream nats.JetStreamContext
+ topic string
+ rsAPI api.AppserviceRoomserverAPI
+}
+
+type appserviceState struct {
+ *config.ApplicationService
+ backoff int
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
// Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
- cfg *config.Dendrite,
+ cfg *config.AppServiceAPI,
+ client *http.Client,
js nats.JetStreamContext,
- appserviceDB storage.Database,
rsAPI api.AppserviceRoomserverAPI,
- workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
- ctx: process.Context(),
- jetstream: js,
- durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"),
- topic: cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
- asDB: appserviceDB,
- rsAPI: rsAPI,
- serverName: string(cfg.Global.ServerName),
- workerStates: workerStates,
+ ctx: process.Context(),
+ cfg: cfg,
+ client: client,
+ jetstream: js,
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
+ rsAPI: rsAPI,
}
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- return jetstream.JetStreamConsumer(
- s.ctx, s.jetstream, s.topic, s.durable, 1,
- s.onMessage, nats.DeliverAll(), nats.ManualAck(),
- )
+ for _, as := range s.cfg.Derived.ApplicationServices {
+ appsvc := as
+ state := &appserviceState{
+ ApplicationService: &appsvc,
+ }
+ token := jetstream.Tokenise(as.ID)
+ if err := jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic,
+ s.cfg.Matrix.JetStream.Durable("Appservice_"+token),
+ 50, // maximum number of events to send in a single transaction
+ func(ctx context.Context, msgs []*nats.Msg) bool {
+ return s.onMessage(ctx, state, msgs)
+ },
+ nats.DeliverNew(), nats.ManualAck(),
+ ); err != nil {
+ return fmt.Errorf("failed to create %q consumer: %w", token, err)
+ }
+ }
+ return nil
}
// onMessage is called when the appservice component receives a new event from
// the room server output log.
-func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
- msg := msgs[0] // Guaranteed to exist if onMessage is called
- // Parse out the event JSON
- var output api.OutputEvent
- if err := json.Unmarshal(msg.Data, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("roomserver output log: message parse failure")
- return true
- }
-
- log.WithFields(log.Fields{
- "type": output.Type,
- }).Debug("Got a message in OutputRoomEventConsumer")
-
- events := []*gomatrixserverlib.HeaderedEvent{}
- if output.Type == api.OutputTypeNewRoomEvent && output.NewRoomEvent != nil {
- newEventID := output.NewRoomEvent.Event.EventID()
- events = append(events, output.NewRoomEvent.Event)
- if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
- eventsReq := &api.QueryEventsByIDRequest{
- EventIDs: make([]string, 0, len(output.NewRoomEvent.AddsStateEventIDs)),
+func (s *OutputRoomEventConsumer) onMessage(
+ ctx context.Context, state *appserviceState, msgs []*nats.Msg,
+) bool {
+ log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs))
+ events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs))
+ for _, msg := range msgs {
+ // Parse out the event JSON
+ var output api.OutputEvent
+ if err := json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring")
+ continue
+ }
+ switch output.Type {
+ case api.OutputTypeNewRoomEvent:
+ if output.NewRoomEvent == nil || !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) {
+ continue
}
- eventsRes := &api.QueryEventsByIDResponse{}
- for _, eventID := range output.NewRoomEvent.AddsStateEventIDs {
- if eventID != newEventID {
- eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
+ events = append(events, output.NewRoomEvent.Event)
+ if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
+ newEventID := output.NewRoomEvent.Event.EventID()
+ eventsReq := &api.QueryEventsByIDRequest{
+ EventIDs: make([]string, 0, len(output.NewRoomEvent.AddsStateEventIDs)),
}
- }
- if len(eventsReq.EventIDs) > 0 {
- if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
- log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed")
- return false
+ eventsRes := &api.QueryEventsByIDResponse{}
+ for _, eventID := range output.NewRoomEvent.AddsStateEventIDs {
+ if eventID != newEventID {
+ eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
+ }
+ }
+ if len(eventsReq.EventIDs) > 0 {
+ if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
+ log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed")
+ return false
+ }
+ events = append(events, eventsRes.Events...)
}
- events = append(events, eventsRes.Events...)
}
+
+ case api.OutputTypeNewInviteEvent:
+ if output.NewInviteEvent == nil {
+ continue
+ }
+ events = append(events, output.NewInviteEvent.Event)
+
+ default:
+ continue
}
- } else if output.Type == api.OutputTypeNewInviteEvent && output.NewInviteEvent != nil {
- events = append(events, output.NewInviteEvent.Event)
- } else {
- log.WithFields(log.Fields{
- "type": output.Type,
- }).Debug("appservice OutputRoomEventConsumer ignoring event", string(msg.Data))
- return true
}
- // Send event to any relevant application services
- if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
- log.WithError(err).Errorf("roomserver output log: filter error")
+ // If there are no events selected for sending then we should
+ // ack the messages so that we don't get sent them again in the
+ // future.
+ if len(events) == 0 {
return true
}
- return true
+ // Send event to any relevant application services. If we hit
+ // an error here, return false, so that we negatively ack.
+ log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
+ return s.sendEvents(ctx, state, events) == nil
}
-// filterRoomserverEvents takes in events and decides whether any of them need
-// to be passed on to an external application service. It does this by checking
-// each namespace of each registered application service, and if there is a
-// match, adds the event to the queue for events to be sent to a particular
-// application service.
-func (s *OutputRoomEventConsumer) filterRoomserverEvents(
- ctx context.Context,
+// sendEvents passes events to the appservice by using the transactions
+// endpoint. It will block for the backoff period if necessary.
+func (s *OutputRoomEventConsumer) sendEvents(
+ ctx context.Context, state *appserviceState,
events []*gomatrixserverlib.HeaderedEvent,
) error {
- for _, ws := range s.workerStates {
- for _, event := range events {
- // Check if this event is interesting to this application service
- if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
- // Queue this event to be sent off to the application service
- if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil {
- log.WithError(err).Warn("failed to insert incoming event into appservices database")
- return err
- } else {
- // Tell our worker to send out new messages by updating remaining message
- // count and waking them up with a broadcast
- ws.NotifyNewEvents()
- }
- }
- }
+ // Create the transaction body.
+ transaction, err := json.Marshal(
+ gomatrixserverlib.ApplicationServiceTransaction{
+ Events: gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll),
+ },
+ )
+ if err != nil {
+ return err
}
- return nil
-}
+ // TODO: We should probably be more intelligent and pick something not
+ // in the control of the event. A NATS timestamp header or something maybe.
+ txnID := events[0].Event.OriginServerTS()
-// appserviceJoinedAtEvent returns a boolean depending on whether a given
-// appservice has membership at the time a given event was created.
-func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
- // TODO: This is only checking the current room state, not the state at
- // the event in question. Pretty sure this is what Synapse does too, but
- // until we have a lighter way of checking the state before the event that
- // doesn't involve state res, then this is probably OK.
- membershipReq := &api.QueryMembershipsForRoomRequest{
- RoomID: event.RoomID(),
- JoinedOnly: true,
+ // Send the transaction to the appservice.
+ // https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
+ address := fmt.Sprintf("%s/transactions/%d?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken))
+ req, err := http.NewRequestWithContext(ctx, "PUT", address, bytes.NewBuffer(transaction))
+ if err != nil {
+ return err
+ }
+ req.Header.Set("Content-Type", "application/json")
+ resp, err := s.client.Do(req)
+ if err != nil {
+ return state.backoffAndPause(err)
}
- membershipRes := &api.QueryMembershipsForRoomResponse{}
- // XXX: This could potentially race if the state for the event is not known yet
- // e.g. the event came over federation but we do not have the full state persisted.
- if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil {
- for _, ev := range membershipRes.JoinEvents {
- var membership gomatrixserverlib.MemberContent
- if err = json.Unmarshal(ev.Content, &membership); err != nil || ev.StateKey == nil {
- continue
- }
- if appservice.IsInterestedInUserID(*ev.StateKey) {
- return true
- }
- }
- } else {
- log.WithFields(log.Fields{
- "room_id": event.RoomID(),
- }).WithError(err).Errorf("Unable to get membership for room")
+ // If the response was fine then we can clear any backoffs in place and
+ // report that everything was OK. Otherwise, back off for a while.
+ switch resp.StatusCode {
+ case http.StatusOK:
+ state.backoff = 0
+ default:
+ return state.backoffAndPause(fmt.Errorf("received HTTP status code %d from appservice", resp.StatusCode))
}
- return false
+ return nil
+}
+
+// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
+func (s *appserviceState) backoffAndPause(err error) error {
+ if s.backoff < 6 {
+ s.backoff++
+ }
+ duration := time.Second * time.Duration(math.Pow(2, float64(s.backoff)))
+ log.WithField("appservice", s.ID).WithError(err).Errorf("Unable to send transaction to appservice, backing off for %s", duration.String())
+ time.Sleep(duration)
+ return err
}
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
// event falls within one of a given application service's namespaces.
//
// TODO: This should be cached, see https://github.com/matrix-org/dendrite/issues/1682
-func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
- // No reason to queue events if they'll never be sent to the application
- // service
- if appservice.URL == "" {
+func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice *config.ApplicationService) bool {
+ switch {
+ case appservice.URL == "":
return false
- }
-
- // Check Room ID and Sender of the event
- if appservice.IsInterestedInUserID(event.Sender()) ||
- appservice.IsInterestedInRoomID(event.RoomID()) {
+ case appservice.IsInterestedInUserID(event.Sender()):
+ return true
+ case appservice.IsInterestedInRoomID(event.RoomID()):
return true
}
@@ -225,10 +243,52 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont
}
} else {
log.WithFields(log.Fields{
- "room_id": event.RoomID(),
+ "appservice": appservice.ID,
+ "room_id": event.RoomID(),
}).WithError(err).Errorf("Unable to get aliases for room")
}
// Check if any of the members in the room match the appservice
return s.appserviceJoinedAtEvent(ctx, event, appservice)
}
+
+// appserviceJoinedAtEvent returns a boolean depending on whether a given
+// appservice has membership at the time a given event was created.
+func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice *config.ApplicationService) bool {
+ // TODO: This is only checking the current room state, not the state at
+ // the event in question. Pretty sure this is what Synapse does too, but
+ // until we have a lighter way of checking the state before the event that
+ // doesn't involve state res, then this is probably OK.
+ membershipReq := &api.QueryMembershipsForRoomRequest{
+ RoomID: event.RoomID(),
+ JoinedOnly: true,
+ }
+ membershipRes := &api.QueryMembershipsForRoomResponse{}
+
+ // XXX: This could potentially race if the state for the event is not known yet
+ // e.g. the event came over federation but we do not have the full state persisted.
+ if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil {
+ for _, ev := range membershipRes.JoinEvents {
+ switch {
+ case ev.StateKey == nil:
+ continue
+ case ev.Type != gomatrixserverlib.MRoomMember:
+ continue
+ }
+ var membership gomatrixserverlib.MemberContent
+ err = json.Unmarshal(ev.Content, &membership)
+ switch {
+ case err != nil:
+ continue
+ case membership.Membership == gomatrixserverlib.Join:
+ return true
+ }
+ }
+ } else {
+ log.WithFields(log.Fields{
+ "appservice": appservice.ID,
+ "room_id": event.RoomID(),
+ }).WithError(err).Errorf("Unable to get membership for room")
+ }
+ return false
+}