aboutsummaryrefslogtreecommitdiff
path: root/appservice
diff options
context:
space:
mode:
authorWill Hunt <will@half-shot.uk>2021-03-03 16:27:44 +0000
committerGitHub <noreply@github.com>2021-03-03 16:27:44 +0000
commita2773922d2fe40e6d95d73f532640702709ab526 (patch)
treeaaf2f63073eb840655d6b244d0c85b534d7b91c3 /appservice
parentd15836e260130f85edd5d9a104e5304f001d2681 (diff)
Send events to appservice based on room membership (#1680)
* Check membership of room * Use QueryStateAfterEventsResponse * Fix complexity * Changes that I made a long time ago * Rename to appserviceJoinedAtEvent * Check membership in GetMemberships * Update QueryMembershipsForRoom * Tweaks in client API * Update appserviceJoinedAtEvent * Comments * Try QueryMembershipForUser instead * Undo some changes to client API that shouldn't be needed * More /event tweaks * Refactor /event bit * Go back to QueryMembershipsForRoom because appservices are hard * Fix bugs in onMessage * Add comments Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'appservice')
-rw-r--r--appservice/consumers/roomserver.go42
-rw-r--r--appservice/workers/transaction_scheduler.go2
2 files changed, 39 insertions, 5 deletions
diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go
index 5cbffa35..2ad7f68f 100644
--- a/appservice/consumers/roomserver.go
+++ b/appservice/consumers/roomserver.go
@@ -85,9 +85,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
if output.Type != api.OutputTypeNewRoomEvent {
- log.WithField("type", output.Type).Debug(
- "roomserver output log: ignoring unknown output type",
- )
return nil
}
@@ -114,6 +111,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
// Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database")
+ return err
} else {
// Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast
@@ -126,8 +124,43 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents(
return nil
}
+// appserviceJoinedAtEvent returns a boolean depending on whether a given
+// appservice has membership at the time a given event was created.
+func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
+ // TODO: This is only checking the current room state, not the state at
+ // the event in question. Pretty sure this is what Synapse does too, but
+ // until we have a lighter way of checking the state before the event that
+ // doesn't involve state res, then this is probably OK.
+ membershipReq := &api.QueryMembershipsForRoomRequest{
+ RoomID: event.RoomID(),
+ JoinedOnly: true,
+ }
+ membershipRes := &api.QueryMembershipsForRoomResponse{}
+
+ // XXX: This could potentially race if the state for the event is not known yet
+ // e.g. the event came over federation but we do not have the full state persisted.
+ if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil {
+ for _, ev := range membershipRes.JoinEvents {
+ var membership gomatrixserverlib.MemberContent
+ if err = json.Unmarshal(ev.Content, &membership); err != nil || ev.StateKey == nil {
+ continue
+ }
+ if appservice.IsInterestedInUserID(*ev.StateKey) {
+ return true
+ }
+ }
+ } else {
+ log.WithFields(log.Fields{
+ "room_id": event.RoomID(),
+ }).WithError(err).Errorf("Unable to get membership for room")
+ }
+ return false
+}
+
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
// event falls within one of a given application service's namespaces.
+//
+// TODO: This should be cached, see https://github.com/matrix-org/dendrite/issues/1682
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
// No reason to queue events if they'll never be sent to the application
// service
@@ -162,5 +195,6 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont
}).WithError(err).Errorf("Unable to get aliases for room")
}
- return false
+ // Check if any of the members in the room match the appservice
+ return s.appserviceJoinedAtEvent(ctx, event, appservice)
}
diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go
index 6528fc1b..45748c21 100644
--- a/appservice/workers/transaction_scheduler.go
+++ b/appservice/workers/transaction_scheduler.go
@@ -62,7 +62,7 @@ func SetupTransactionWorkers(
func worker(db storage.Database, ws types.ApplicationServiceWorkerState) {
log.WithFields(log.Fields{
"appservice": ws.AppService.ID,
- }).Info("starting application service")
+ }).Info("Starting application service")
ctx := context.Background()
// Create a HTTP client for sending requests to app services