aboutsummaryrefslogtreecommitdiff
path: root/syncapi/notifier/notifier.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/notifier/notifier.go')
-rw-r--r--syncapi/notifier/notifier.go45
1 files changed, 27 insertions, 18 deletions
diff --git a/syncapi/notifier/notifier.go b/syncapi/notifier/notifier.go
index f7645685..4ee7c860 100644
--- a/syncapi/notifier/notifier.go
+++ b/syncapi/notifier/notifier.go
@@ -20,6 +20,7 @@ import (
"time"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/dendrite/roomserver/api"
rstypes "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -36,7 +37,8 @@ import (
// the event, but the token has already advanced by the time they fetch it, resulting
// in missed events.
type Notifier struct {
- lock *sync.RWMutex
+ lock *sync.RWMutex
+ rsAPI api.SyncRoomserverAPI
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
roomIDToJoinedUsers map[string]*userIDSet
// A map of RoomID => Set<UserID> : Must only be accessed by the OnNewEvent goroutine
@@ -55,8 +57,9 @@ type Notifier struct {
// NewNotifier creates a new notifier set to the given sync position.
// In order for this to be of any use, the Notifier needs to be told all rooms and
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
-func NewNotifier() *Notifier {
+func NewNotifier(rsAPI api.SyncRoomserverAPI) *Notifier {
return &Notifier{
+ rsAPI: rsAPI,
roomIDToJoinedUsers: make(map[string]*userIDSet),
roomIDToPeekingDevices: make(map[string]peekingDeviceSet),
userDeviceStreams: make(map[string]map[string]*UserDeviceStream),
@@ -104,26 +107,32 @@ func (n *Notifier) OnNewEvent(
peekingDevicesToNotify := n._peekingDevices(ev.RoomID())
// If this is an invite, also add in the invitee to this list.
if ev.Type() == "m.room.member" && ev.StateKey() != nil {
- targetUserID := *ev.StateKey()
- membership, err := ev.Membership()
+ targetUserID, err := n.rsAPI.QueryUserIDForSender(context.Background(), ev.RoomID(), spec.SenderID(*ev.StateKey()))
if err != nil {
log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
- "Notifier.OnNewEvent: Failed to unmarshal member event",
+ "Notifier.OnNewEvent: Failed to find the userID for this event",
)
} else {
- // Keep the joined user map up-to-date
- switch membership {
- case spec.Invite:
- usersToNotify = append(usersToNotify, targetUserID)
- case spec.Join:
- // Manually append the new user's ID so they get notified
- // along all members in the room
- usersToNotify = append(usersToNotify, targetUserID)
- n._addJoinedUser(ev.RoomID(), targetUserID)
- case spec.Leave:
- fallthrough
- case spec.Ban:
- n._removeJoinedUser(ev.RoomID(), targetUserID)
+ membership, err := ev.Membership()
+ if err != nil {
+ log.WithError(err).WithField("event_id", ev.EventID()).Errorf(
+ "Notifier.OnNewEvent: Failed to unmarshal member event",
+ )
+ } else {
+ // Keep the joined user map up-to-date
+ switch membership {
+ case spec.Invite:
+ usersToNotify = append(usersToNotify, targetUserID.String())
+ case spec.Join:
+ // Manually append the new user's ID so they get notified
+ // along all members in the room
+ usersToNotify = append(usersToNotify, targetUserID.String())
+ n._addJoinedUser(ev.RoomID(), targetUserID.String())
+ case spec.Leave:
+ fallthrough
+ case spec.Ban:
+ n._removeJoinedUser(ev.RoomID(), targetUserID.String())
+ }
}
}
}