diff options
Diffstat (limited to 'userapi/consumers/syncapi_streamevent.go')
-rw-r--r-- | userapi/consumers/syncapi_streamevent.go | 13 |
1 files changed, 5 insertions, 8 deletions
diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go index 3ac6f58d..f3b2bf27 100644 --- a/userapi/consumers/syncapi_streamevent.go +++ b/userapi/consumers/syncapi_streamevent.go @@ -29,7 +29,6 @@ import ( type OutputStreamEventConsumer struct { ctx context.Context cfg *config.UserAPI - userAPI api.UserInternalAPI rsAPI rsapi.UserRoomserverAPI jetstream nats.JetStreamContext durable string @@ -45,7 +44,6 @@ func NewOutputStreamEventConsumer( js nats.JetStreamContext, store storage.Database, pgClient pushgateway.Client, - userAPI api.UserInternalAPI, rsAPI rsapi.UserRoomserverAPI, syncProducer *producers.SyncAPI, ) *OutputStreamEventConsumer { @@ -57,7 +55,6 @@ func NewOutputStreamEventConsumer( durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"), topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputStreamEvent), pgClient: pgClient, - userAPI: userAPI, rsAPI: rsAPI, syncProducer: syncProducer, } @@ -305,7 +302,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma "event_id": event.EventID(), "room_id": event.RoomID(), "localpart": mem.Localpart, - }).Tracef("Push rule evaluation rejected the event") + }).Debugf("Push rule evaluation rejected the event") return nil } @@ -348,7 +345,7 @@ func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *goma "localpart": mem.Localpart, "num_urls": len(devicesByURLAndFormat), "num_unread": userNumUnreadNotifs, - }).Tracef("Notifying single member") + }).Debugf("Notifying single member") // Push gateways are out of our control, and we cannot risk // looking up the server on a misbehaving push gateway. Each user @@ -422,8 +419,8 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event return nil, fmt.Errorf("user %s is ignored", sender) } } - var res api.QueryPushRulesResponse - if err = s.userAPI.QueryPushRules(ctx, &api.QueryPushRulesRequest{UserID: mem.UserID}, &res); err != nil { + ruleSets, err := s.db.QueryPushRules(ctx, mem.Localpart) + if err != nil { return nil, err } @@ -434,7 +431,7 @@ func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event roomID: event.RoomID(), roomSize: roomSize, } - eval := pushrules.NewRuleSetEvaluator(ec, &res.RuleSets.Global) + eval := pushrules.NewRuleSetEvaluator(ec, &ruleSets.Global) rule, err := eval.MatchEvent(event.Event) if err != nil { return nil, err |