aboutsummaryrefslogtreecommitdiff
path: root/userapi/util/notify.go
blob: 45d37525c1c93b5adb8658b8e6d32f8f7099c2ef (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package util

import (
	"context"
	"strings"
	"time"

	"github.com/matrix-org/dendrite/internal/pushgateway"
	"github.com/matrix-org/dendrite/userapi/storage"
	"github.com/matrix-org/dendrite/userapi/storage/tables"
	"github.com/matrix-org/gomatrixserverlib/spec"
	log "github.com/sirupsen/logrus"
)

// NotifyUserCountsAsync sends notifications to a local user's
// notification destinations. UserDatabase lookups run synchronously, but
// a single goroutine is started when talking to the Push
// gateways. There is no way to know when the background goroutine has
// finished.
func NotifyUserCountsAsync(ctx context.Context, pgClient pushgateway.Client, localpart string, serverName spec.ServerName, db storage.UserDatabase) error {
	pusherDevices, err := GetPushDevices(ctx, localpart, serverName, nil, db)
	if err != nil {
		return err
	}

	if len(pusherDevices) == 0 {
		return nil
	}

	userNumUnreadNotifs, err := db.GetNotificationCount(ctx, localpart, serverName, tables.AllNotifications)
	if err != nil {
		return err
	}

	log.WithFields(log.Fields{
		"localpart": localpart,
		"app_id0":   pusherDevices[0].Device.AppID,
		"pushkey":   pusherDevices[0].Device.PushKey,
	}).Tracef("Notifying HTTP push gateway about notification counts")

	// TODO: think about bounding this to one per user, and what
	// ordering guarantees we must provide.
	go func() {
		// This background processing cannot be tied to a request.
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		// TODO: we could batch all devices with the same URL, but
		// Sytest requires consumers/roomserver.go to do it
		// one-by-one, so we do the same here.
		for _, pusherDevice := range pusherDevices {
			// TODO: support "email".
			if !strings.HasPrefix(pusherDevice.URL, "http") {
				continue
			}

			req := pushgateway.NotifyRequest{
				Notification: pushgateway.Notification{
					Counts: &pushgateway.Counts{
						Unread: int(userNumUnreadNotifs),
					},
					Devices: []*pushgateway.Device{&pusherDevice.Device},
				},
			}
			if err := pgClient.Notify(ctx, pusherDevice.URL, &req, &pushgateway.NotifyResponse{}); err != nil {
				log.WithFields(log.Fields{
					"localpart": localpart,
					"app_id0":   pusherDevice.Device.AppID,
					"pushkey":   pusherDevice.Device.PushKey,
				}).WithError(err).Error("HTTP push gateway request failed")
				return
			}
		}
	}()

	return nil
}