aboutsummaryrefslogtreecommitdiff
path: root/userapi/consumers/syncapi_streamevent.go
diff options
context:
space:
mode:
Diffstat (limited to 'userapi/consumers/syncapi_streamevent.go')
-rw-r--r--userapi/consumers/syncapi_streamevent.go588
1 files changed, 588 insertions, 0 deletions
diff --git a/userapi/consumers/syncapi_streamevent.go b/userapi/consumers/syncapi_streamevent.go
new file mode 100644
index 00000000..11081327
--- /dev/null
+++ b/userapi/consumers/syncapi_streamevent.go
@@ -0,0 +1,588 @@
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/matrix-org/dendrite/internal/eventutil"
+ "github.com/matrix-org/dendrite/internal/pushgateway"
+ "github.com/matrix-org/dendrite/internal/pushrules"
+ rsapi "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/dendrite/userapi/producers"
+ "github.com/matrix-org/dendrite/userapi/storage"
+ "github.com/matrix-org/dendrite/userapi/storage/tables"
+ "github.com/matrix-org/dendrite/userapi/util"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+type OutputStreamEventConsumer struct {
+ ctx context.Context
+ cfg *config.UserAPI
+ userAPI api.UserInternalAPI
+ rsAPI rsapi.RoomserverInternalAPI
+ jetstream nats.JetStreamContext
+ durable string
+ db storage.Database
+ topic string
+ pgClient pushgateway.Client
+ syncProducer *producers.SyncAPI
+}
+
+func NewOutputStreamEventConsumer(
+ process *process.ProcessContext,
+ cfg *config.UserAPI,
+ js nats.JetStreamContext,
+ store storage.Database,
+ pgClient pushgateway.Client,
+ userAPI api.UserInternalAPI,
+ rsAPI rsapi.RoomserverInternalAPI,
+ syncProducer *producers.SyncAPI,
+) *OutputStreamEventConsumer {
+ return &OutputStreamEventConsumer{
+ ctx: process.Context(),
+ cfg: cfg,
+ jetstream: js,
+ db: store,
+ durable: cfg.Matrix.JetStream.Durable("UserAPISyncAPIStreamEventConsumer"),
+ topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputStreamEvent),
+ pgClient: pgClient,
+ userAPI: userAPI,
+ rsAPI: rsAPI,
+ syncProducer: syncProducer,
+ }
+}
+
+func (s *OutputStreamEventConsumer) Start() error {
+ if err := jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ ); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (s *OutputStreamEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+ var output types.StreamedEvent
+ output.Event = &gomatrixserverlib.HeaderedEvent{}
+ if err := json.Unmarshal(msg.Data, &output); err != nil {
+ log.WithError(err).Errorf("userapi consumer: message parse failure")
+ return true
+ }
+ if output.Event.Event == nil {
+ log.Errorf("userapi consumer: expected event")
+ return true
+ }
+
+ log.WithFields(log.Fields{
+ "event_id": output.Event.EventID(),
+ "event_type": output.Event.Type(),
+ "stream_pos": output.StreamPosition,
+ }).Tracef("Received message from sync API: %#v", output)
+
+ if err := s.processMessage(ctx, output.Event, int64(output.StreamPosition)); err != nil {
+ log.WithFields(log.Fields{
+ "event_id": output.Event.EventID(),
+ }).WithError(err).Errorf("userapi consumer: process room event failure")
+ }
+
+ return true
+}
+
+func (s *OutputStreamEventConsumer) processMessage(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos int64) error {
+ members, roomSize, err := s.localRoomMembers(ctx, event.RoomID())
+ if err != nil {
+ return fmt.Errorf("s.localRoomMembers: %w", err)
+ }
+
+ if event.Type() == gomatrixserverlib.MRoomMember {
+ cevent := gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatAll)
+ var member *localMembership
+ member, err = newLocalMembership(&cevent)
+ if err != nil {
+ return fmt.Errorf("newLocalMembership: %w", err)
+ }
+ if member.Membership == gomatrixserverlib.Invite && member.Domain == s.cfg.Matrix.ServerName {
+ // localRoomMembers only adds joined members. An invite
+ // should also be pushed to the target user.
+ members = append(members, member)
+ }
+ }
+
+ // TODO: run in parallel with localRoomMembers.
+ roomName, err := s.roomName(ctx, event)
+ if err != nil {
+ return fmt.Errorf("s.roomName: %w", err)
+ }
+
+ log.WithFields(log.Fields{
+ "event_id": event.EventID(),
+ "room_id": event.RoomID(),
+ "num_members": len(members),
+ "room_size": roomSize,
+ }).Tracef("Notifying members")
+
+ // Notification.UserIsTarget is a per-member field, so we
+ // cannot group all users in a single request.
+ //
+ // TODO: does it have to be set? It's not required, and
+ // removing it means we can send all notifications to
+ // e.g. Element's Push gateway in one go.
+ for _, mem := range members {
+ if err := s.notifyLocal(ctx, event, pos, mem, roomSize, roomName); err != nil {
+ log.WithFields(log.Fields{
+ "localpart": mem.Localpart,
+ }).WithError(err).Debugf("Unable to push to local user")
+ continue
+ }
+ }
+
+ return nil
+}
+
+type localMembership struct {
+ gomatrixserverlib.MemberContent
+ UserID string
+ Localpart string
+ Domain gomatrixserverlib.ServerName
+}
+
+func newLocalMembership(event *gomatrixserverlib.ClientEvent) (*localMembership, error) {
+ if event.StateKey == nil {
+ return nil, fmt.Errorf("missing state_key")
+ }
+
+ var member localMembership
+ if err := json.Unmarshal(event.Content, &member.MemberContent); err != nil {
+ return nil, err
+ }
+
+ localpart, domain, err := gomatrixserverlib.SplitID('@', *event.StateKey)
+ if err != nil {
+ return nil, err
+ }
+
+ member.UserID = *event.StateKey
+ member.Localpart = localpart
+ member.Domain = domain
+ return &member, nil
+}
+
+// localRoomMembers fetches the current local members of a room, and
+// the total number of members.
+func (s *OutputStreamEventConsumer) localRoomMembers(ctx context.Context, roomID string) ([]*localMembership, int, error) {
+ req := &rsapi.QueryMembershipsForRoomRequest{
+ RoomID: roomID,
+ JoinedOnly: true,
+ }
+ var res rsapi.QueryMembershipsForRoomResponse
+
+ // XXX: This could potentially race if the state for the event is not known yet
+ // e.g. the event came over federation but we do not have the full state persisted.
+ if err := s.rsAPI.QueryMembershipsForRoom(ctx, req, &res); err != nil {
+ return nil, 0, err
+ }
+
+ var members []*localMembership
+ var ntotal int
+ for _, event := range res.JoinEvents {
+ member, err := newLocalMembership(&event)
+ if err != nil {
+ log.WithError(err).Errorf("Parsing MemberContent")
+ continue
+ }
+ if member.Membership != gomatrixserverlib.Join {
+ continue
+ }
+ if member.Domain != s.cfg.Matrix.ServerName {
+ continue
+ }
+
+ ntotal++
+ members = append(members, member)
+ }
+
+ return members, ntotal, nil
+}
+
+// roomName returns the name in the event (if type==m.room.name), or
+// looks it up in roomserver. If there is no name,
+// m.room.canonical_alias is consulted. Returns an empty string if the
+// room has no name.
+func (s *OutputStreamEventConsumer) roomName(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) (string, error) {
+ if event.Type() == gomatrixserverlib.MRoomName {
+ name, err := unmarshalRoomName(event)
+ if err != nil {
+ return "", err
+ }
+
+ if name != "" {
+ return name, nil
+ }
+ }
+
+ req := &rsapi.QueryCurrentStateRequest{
+ RoomID: event.RoomID(),
+ StateTuples: []gomatrixserverlib.StateKeyTuple{roomNameTuple, canonicalAliasTuple},
+ }
+ var res rsapi.QueryCurrentStateResponse
+
+ if err := s.rsAPI.QueryCurrentState(ctx, req, &res); err != nil {
+ return "", nil
+ }
+
+ if eventS := res.StateEvents[roomNameTuple]; eventS != nil {
+ return unmarshalRoomName(eventS)
+ }
+
+ if event.Type() == gomatrixserverlib.MRoomCanonicalAlias {
+ alias, err := unmarshalCanonicalAlias(event)
+ if err != nil {
+ return "", err
+ }
+
+ if alias != "" {
+ return alias, nil
+ }
+ }
+
+ if event = res.StateEvents[canonicalAliasTuple]; event != nil {
+ return unmarshalCanonicalAlias(event)
+ }
+
+ return "", nil
+}
+
+var (
+ canonicalAliasTuple = gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias}
+ roomNameTuple = gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomName}
+)
+
+func unmarshalRoomName(event *gomatrixserverlib.HeaderedEvent) (string, error) {
+ var nc eventutil.NameContent
+ if err := json.Unmarshal(event.Content(), &nc); err != nil {
+ return "", fmt.Errorf("unmarshaling NameContent: %w", err)
+ }
+
+ return nc.Name, nil
+}
+
+func unmarshalCanonicalAlias(event *gomatrixserverlib.HeaderedEvent) (string, error) {
+ var cac eventutil.CanonicalAliasContent
+ if err := json.Unmarshal(event.Content(), &cac); err != nil {
+ return "", fmt.Errorf("unmarshaling CanonicalAliasContent: %w", err)
+ }
+
+ return cac.Alias, nil
+}
+
+// notifyLocal finds the right push actions for a local user, given an event.
+func (s *OutputStreamEventConsumer) notifyLocal(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, pos int64, mem *localMembership, roomSize int, roomName string) error {
+ actions, err := s.evaluatePushRules(ctx, event, mem, roomSize)
+ if err != nil {
+ return err
+ }
+ a, tweaks, err := pushrules.ActionsToTweaks(actions)
+ if err != nil {
+ return err
+ }
+ // TODO: support coalescing.
+ if a != pushrules.NotifyAction && a != pushrules.CoalesceAction {
+ log.WithFields(log.Fields{
+ "event_id": event.EventID(),
+ "room_id": event.RoomID(),
+ "localpart": mem.Localpart,
+ }).Tracef("Push rule evaluation rejected the event")
+ return nil
+ }
+
+ devicesByURLAndFormat, profileTag, err := s.localPushDevices(ctx, mem.Localpart, tweaks)
+ if err != nil {
+ return err
+ }
+
+ n := &api.Notification{
+ Actions: actions,
+ // UNSPEC: the spec doesn't say this is a ClientEvent, but the
+ // fields seem to match. room_id should be missing, which
+ // matches the behaviour of FormatSync.
+ Event: gomatrixserverlib.HeaderedToClientEvent(event, gomatrixserverlib.FormatSync),
+ // TODO: this is per-device, but it's not part of the primary
+ // key. So inserting one notification per profile tag doesn't
+ // make sense. What is this supposed to be? Sytests require it
+ // to "work", but they only use a single device.
+ ProfileTag: profileTag,
+ RoomID: event.RoomID(),
+ TS: gomatrixserverlib.AsTimestamp(time.Now()),
+ }
+ if err = s.db.InsertNotification(ctx, mem.Localpart, event.EventID(), pos, tweaks, n); err != nil {
+ return err
+ }
+
+ if err = s.syncProducer.GetAndSendNotificationData(ctx, mem.UserID, event.RoomID()); err != nil {
+ return err
+ }
+
+ // We do this after InsertNotification. Thus, this should always return >=1.
+ userNumUnreadNotifs, err := s.db.GetNotificationCount(ctx, mem.Localpart, tables.AllNotifications)
+ if err != nil {
+ return err
+ }
+
+ log.WithFields(log.Fields{
+ "event_id": event.EventID(),
+ "room_id": event.RoomID(),
+ "localpart": mem.Localpart,
+ "num_urls": len(devicesByURLAndFormat),
+ "num_unread": userNumUnreadNotifs,
+ }).Tracef("Notifying single member")
+
+ // Push gateways are out of our control, and we cannot risk
+ // looking up the server on a misbehaving push gateway. Each user
+ // receives a goroutine now that all internal API calls have been
+ // made.
+ //
+ // TODO: think about bounding this to one per user, and what
+ // ordering guarantees we must provide.
+ go func() {
+ // This background processing cannot be tied to a request.
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ var rejected []*pushgateway.Device
+ for url, fmts := range devicesByURLAndFormat {
+ for format, devices := range fmts {
+ // TODO: support "email".
+ if !strings.HasPrefix(url, "http") {
+ continue
+ }
+
+ // UNSPEC: the specification suggests there can be
+ // more than one device per request. There is at least
+ // one Sytest that expects one HTTP request per
+ // device, rather than per URL. For now, we must
+ // notify each one separately.
+ for _, dev := range devices {
+ rej, err := s.notifyHTTP(ctx, event, url, format, []*pushgateway.Device{dev}, mem.Localpart, roomName, int(userNumUnreadNotifs))
+ if err != nil {
+ log.WithFields(log.Fields{
+ "event_id": event.EventID(),
+ "localpart": mem.Localpart,
+ }).WithError(err).Errorf("Unable to notify HTTP pusher")
+ continue
+ }
+ rejected = append(rejected, rej...)
+ }
+ }
+ }
+
+ if len(rejected) > 0 {
+ s.deleteRejectedPushers(ctx, rejected, mem.Localpart)
+ }
+ }()
+
+ return nil
+}
+
+// evaluatePushRules fetches and evaluates the push rules of a local
+// user. Returns actions (including dont_notify).
+func (s *OutputStreamEventConsumer) evaluatePushRules(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, mem *localMembership, roomSize int) ([]*pushrules.Action, error) {
+ if event.Sender() == mem.UserID {
+ // SPEC: Homeservers MUST NOT notify the Push Gateway for
+ // events that the user has sent themselves.
+ return nil, nil
+ }
+
+ var res api.QueryPushRulesResponse
+ if err := s.userAPI.QueryPushRules(ctx, &api.QueryPushRulesRequest{UserID: mem.UserID}, &res); err != nil {
+ return nil, err
+ }
+
+ ec := &ruleSetEvalContext{
+ ctx: ctx,
+ rsAPI: s.rsAPI,
+ mem: mem,
+ roomID: event.RoomID(),
+ roomSize: roomSize,
+ }
+ eval := pushrules.NewRuleSetEvaluator(ec, &res.RuleSets.Global)
+ rule, err := eval.MatchEvent(event.Event)
+ if err != nil {
+ return nil, err
+ }
+ if rule == nil {
+ // SPEC: If no rules match an event, the homeserver MUST NOT
+ // notify the Push Gateway for that event.
+ return nil, err
+ }
+
+ log.WithFields(log.Fields{
+ "event_id": event.EventID(),
+ "room_id": event.RoomID(),
+ "localpart": mem.Localpart,
+ "rule_id": rule.RuleID,
+ }).Tracef("Matched a push rule")
+
+ return rule.Actions, nil
+}
+
+type ruleSetEvalContext struct {
+ ctx context.Context
+ rsAPI rsapi.RoomserverInternalAPI
+ mem *localMembership
+ roomID string
+ roomSize int
+}
+
+func (rse *ruleSetEvalContext) UserDisplayName() string { return rse.mem.DisplayName }
+
+func (rse *ruleSetEvalContext) RoomMemberCount() (int, error) { return rse.roomSize, nil }
+
+func (rse *ruleSetEvalContext) HasPowerLevel(userID, levelKey string) (bool, error) {
+ req := &rsapi.QueryLatestEventsAndStateRequest{
+ RoomID: rse.roomID,
+ StateToFetch: []gomatrixserverlib.StateKeyTuple{
+ {EventType: gomatrixserverlib.MRoomPowerLevels},
+ },
+ }
+ var res rsapi.QueryLatestEventsAndStateResponse
+ if err := rse.rsAPI.QueryLatestEventsAndState(rse.ctx, req, &res); err != nil {
+ return false, err
+ }
+ for _, ev := range res.StateEvents {
+ if ev.Type() != gomatrixserverlib.MRoomPowerLevels {
+ continue
+ }
+
+ plc, err := gomatrixserverlib.NewPowerLevelContentFromEvent(ev.Event)
+ if err != nil {
+ return false, err
+ }
+ return plc.UserLevel(userID) >= plc.NotificationLevel(levelKey), nil
+ }
+ return true, nil
+}
+
+// localPushDevices pushes to the configured devices of a local
+// user. The map keys are [url][format].
+func (s *OutputStreamEventConsumer) localPushDevices(ctx context.Context, localpart string, tweaks map[string]interface{}) (map[string]map[string][]*pushgateway.Device, string, error) {
+ pusherDevices, err := util.GetPushDevices(ctx, localpart, tweaks, s.db)
+ if err != nil {
+ return nil, "", err
+ }
+
+ var profileTag string
+ devicesByURL := make(map[string]map[string][]*pushgateway.Device, len(pusherDevices))
+ for _, pusherDevice := range pusherDevices {
+ if profileTag == "" {
+ profileTag = pusherDevice.Pusher.ProfileTag
+ }
+
+ url := pusherDevice.URL
+ if devicesByURL[url] == nil {
+ devicesByURL[url] = make(map[string][]*pushgateway.Device, 2)
+ }
+ devicesByURL[url][pusherDevice.Format] = append(devicesByURL[url][pusherDevice.Format], &pusherDevice.Device)
+ }
+
+ return devicesByURL, profileTag, nil
+}
+
+// notifyHTTP performs a notificatation to a Push Gateway.
+func (s *OutputStreamEventConsumer) notifyHTTP(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, url, format string, devices []*pushgateway.Device, localpart, roomName string, userNumUnreadNotifs int) ([]*pushgateway.Device, error) {
+ logger := log.WithFields(log.Fields{
+ "event_id": event.EventID(),
+ "url": url,
+ "localpart": localpart,
+ "num_devices": len(devices),
+ })
+
+ var req pushgateway.NotifyRequest
+ switch format {
+ case "event_id_only":
+ req = pushgateway.NotifyRequest{
+ Notification: pushgateway.Notification{
+ Counts: &pushgateway.Counts{},
+ Devices: devices,
+ EventID: event.EventID(),
+ RoomID: event.RoomID(),
+ },
+ }
+
+ default:
+ req = pushgateway.NotifyRequest{
+ Notification: pushgateway.Notification{
+ Content: event.Content(),
+ Counts: &pushgateway.Counts{
+ Unread: userNumUnreadNotifs,
+ },
+ Devices: devices,
+ EventID: event.EventID(),
+ ID: event.EventID(),
+ RoomID: event.RoomID(),
+ RoomName: roomName,
+ Sender: event.Sender(),
+ Type: event.Type(),
+ },
+ }
+ if mem, err := event.Membership(); err == nil {
+ req.Notification.Membership = mem
+ }
+ if event.StateKey() != nil && *event.StateKey() == fmt.Sprintf("@%s:%s", localpart, s.cfg.Matrix.ServerName) {
+ req.Notification.UserIsTarget = true
+ }
+ }
+
+ logger.Debugf("Notifying push gateway %s", url)
+ var res pushgateway.NotifyResponse
+ if err := s.pgClient.Notify(ctx, url, &req, &res); err != nil {
+ logger.WithError(err).Errorf("Failed to notify push gateway %s", url)
+ return nil, err
+ }
+ logger.WithField("num_rejected", len(res.Rejected)).Tracef("Push gateway result")
+
+ if len(res.Rejected) == 0 {
+ return nil, nil
+ }
+
+ devMap := make(map[string]*pushgateway.Device, len(devices))
+ for _, d := range devices {
+ devMap[d.PushKey] = d
+ }
+ rejected := make([]*pushgateway.Device, 0, len(res.Rejected))
+ for _, pushKey := range res.Rejected {
+ d := devMap[pushKey]
+ if d != nil {
+ rejected = append(rejected, d)
+ }
+ }
+
+ return rejected, nil
+}
+
+// deleteRejectedPushers deletes the pushers associated with the given devices.
+func (s *OutputStreamEventConsumer) deleteRejectedPushers(ctx context.Context, devices []*pushgateway.Device, localpart string) {
+ log.WithFields(log.Fields{
+ "localpart": localpart,
+ "app_id0": devices[0].AppID,
+ "num_devices": len(devices),
+ }).Warnf("Deleting pushers rejected by the HTTP push gateway")
+
+ for _, d := range devices {
+ if err := s.db.RemovePusher(ctx, d.AppID, d.PushKey, localpart); err != nil {
+ log.WithFields(log.Fields{
+ "localpart": localpart,
+ }).WithError(err).Errorf("Unable to delete rejected pusher")
+ }
+ }
+}