aboutsummaryrefslogtreecommitdiff
path: root/userapi/consumers/syncapi_streamevent.go
diff options
context:
space:
mode:
Diffstat (limited to 'userapi/consumers/syncapi_streamevent.go')
-rw-r--r--userapi/consumers/syncapi_streamevent.go13
1 files changed, 5 insertions, 8 deletions
diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go
index 3ac6f58d..f3b2bf27 100644
--- a/userapi/consumers/syncapi_streamevent.go
+++ b/userapi/consumers/syncapi_streamevent.go
@@ -29,7 +29,6 @@ import (
type OutputStreamEventConsumer struct {
ctx context.Context
cfg *config.UserAPI
- userAPI api.UserInternalAPI
rsAPI rsapi.UserRoomserverAPI
jetstream nats.JetStreamContext
durable string
@@ -45,7 +44,6 @@ func NewOutputStreamEventConsumer(
js nats.JetStreamContext,
store storage.Database,
pgClient pushgateway.Client,
- userAPI api.UserInternalAPI,
rsAPI rsapi.UserRoomserverAPI,
syncProducer *producers.SyncAPI,
) *OutputStreamEventConsumer {
@@ -57,7 +55,6 @@ func NewOutputStreamEventConsumer(
durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent),
pgClient: pgClient,
- userAPI: userAPI,
rsAPI: rsAPI,
syncProducer: syncProducer,
}
@@ -305,7 +302,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
"event_id": event.EventID(),
"room_id": event.RoomID(),
"localpart": mem.Localpart,
- }).Tracef("Push rule evaluation rejected the event")
+ }).Debugf("Push rule evaluation rejected the event")
return nil
}
@@ -348,7 +345,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma
"localpart": mem.Localpart,
"num_urls": len(devicesByURLAndFormat),
"num_unread": userNumUnreadNotifs,
- }).Tracef("Notifying single member")
+ }).Debugf("Notifying single member")
// Push gateways are out of our control, and we cannot risk
// looking up the server on a misbehaving push gateway. Each user
@@ -422,8 +419,8 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event
return nil, fmt.Errorf("user %s is ignored", sender)
}
}
- var res api.QueryPushRulesResponse
- if err = s.userAPI.QueryPushRules(ctx, &api.QueryPushRulesRequest{UserID: mem.UserID}, &res); err != nil {
+ ruleSets, err := s.db.QueryPushRules(ctx, mem.Localpart)
+ if err != nil {
return nil, err
}
@@ -434,7 +431,7 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event
roomID: event.RoomID(),
roomSize: roomSize,
}
- eval := pushrules.NewRuleSetEvaluator(ec, &res.RuleSets.Global)
+ eval := pushrules.NewRuleSetEvaluator(ec, &ruleSets.Global)
rule, err := eval.MatchEvent(event.Event)
if err != nil {
return nil, err