1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
package util
import (
"context"
"strings"
"time"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/dendrite/userapi/storage/tables"
"github.com/matrix-org/gomatrixserverlib/spec"
log "github.com/sirupsen/logrus"
)
// NotifyUserCountsAsync sends notifications to a local user's
// notification destinations. UserDatabase lookups run synchronously, but
// a single goroutine is started when talking to the Push
// gateways. There is no way to know when the background goroutine has
// finished.
func NotifyUserCountsAsync(ctx context.Context, pgClient pushgateway.Client, localpart string, serverName spec.ServerName, db storage.UserDatabase) error {
pusherDevices, err := GetPushDevices(ctx, localpart, serverName, nil, db)
if err != nil {
return err
}
if len(pusherDevices) == 0 {
return nil
}
userNumUnreadNotifs, err := db.GetNotificationCount(ctx, localpart, serverName, tables.AllNotifications)
if err != nil {
return err
}
log.WithFields(log.Fields{
"localpart": localpart,
"app_id0": pusherDevices[0].Device.AppID,
"pushkey": pusherDevices[0].Device.PushKey,
}).Tracef("Notifying HTTP push gateway about notification counts")
// TODO: think about bounding this to one per user, and what
// ordering guarantees we must provide.
go func() {
// This background processing cannot be tied to a request.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// TODO: we could batch all devices with the same URL, but
// Sytest requires consumers/roomserver.go to do it
// one-by-one, so we do the same here.
for _, pusherDevice := range pusherDevices {
// TODO: support "email".
if !strings.HasPrefix(pusherDevice.URL, "http") {
continue
}
req := pushgateway.NotifyRequest{
Notification: pushgateway.Notification{
Counts: &pushgateway.Counts{
Unread: int(userNumUnreadNotifs),
},
Devices: []*pushgateway.Device{&pusherDevice.Device},
},
}
if err := pgClient.Notify(ctx, pusherDevice.URL, &req, &pushgateway.NotifyResponse{}); err != nil {
log.WithFields(log.Fields{
"localpart": localpart,
"app_id0": pusherDevice.Device.AppID,
"pushkey": pusherDevice.Device.PushKey,
}).WithError(err).Error("HTTP push gateway request failed")
return
}
}
}()
return nil
}
|