diff options
Diffstat (limited to 'appservice/consumers')
-rw-r--r-- | appservice/consumers/roomserver.go | 42 |
1 files changed, 38 insertions, 4 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 5cbffa35..2ad7f68f 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -85,9 +85,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { } if output.Type != api.OutputTypeNewRoomEvent { - log.WithField("type", output.Type).Debug( - "roomserver output log: ignoring unknown output type", - ) return nil } @@ -114,6 +111,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( // Queue this event to be sent off to the application service if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil { log.WithError(err).Warn("failed to insert incoming event into appservices database") + return err } else { // Tell our worker to send out new messages by updating remaining message // count and waking them up with a broadcast @@ -126,8 +124,43 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( return nil } +// appserviceJoinedAtEvent returns a boolean depending on whether a given +// appservice has membership at the time a given event was created. +func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool { + // TODO: This is only checking the current room state, not the state at + // the event in question. Pretty sure this is what Synapse does too, but + // until we have a lighter way of checking the state before the event that + // doesn't involve state res, then this is probably OK. + membershipReq := &api.QueryMembershipsForRoomRequest{ + RoomID: event.RoomID(), + JoinedOnly: true, + } + membershipRes := &api.QueryMembershipsForRoomResponse{} + + // XXX: This could potentially race if the state for the event is not known yet + // e.g. the event came over federation but we do not have the full state persisted. + if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil { + for _, ev := range membershipRes.JoinEvents { + var membership gomatrixserverlib.MemberContent + if err = json.Unmarshal(ev.Content, &membership); err != nil || ev.StateKey == nil { + continue + } + if appservice.IsInterestedInUserID(*ev.StateKey) { + return true + } + } + } else { + log.WithFields(log.Fields{ + "room_id": event.RoomID(), + }).WithError(err).Errorf("Unable to get membership for room") + } + return false +} + // appserviceIsInterestedInEvent returns a boolean depending on whether a given // event falls within one of a given application service's namespaces. +// +// TODO: This should be cached, see https://github.com/matrix-org/dendrite/issues/1682 func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool { // No reason to queue events if they'll never be sent to the application // service @@ -162,5 +195,6 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont }).WithError(err).Errorf("Unable to get aliases for room") } - return false + // Check if any of the members in the room match the appservice + return s.appserviceJoinedAtEvent(ctx, event, appservice) } |