aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/keychange.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/keychange.go')
-rw-r--r--syncapi/consumers/keychange.go191
1 files changed, 16 insertions, 175 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 78aff601..35978be7 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -23,10 +23,11 @@ import (
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
+ syncinternal "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
+ syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
)
@@ -39,6 +40,7 @@ type OutputKeyChangeEventConsumer struct {
keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
+ notifier *syncapi.Notifier
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@@ -47,6 +49,7 @@ func NewOutputKeyChangeEventConsumer(
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
+ n *syncapi.Notifier,
keyAPI api.KeyInternalAPI,
currentStateAPI currentstateAPI.CurrentStateInternalAPI,
store storage.Database,
@@ -66,6 +69,7 @@ func NewOutputKeyChangeEventConsumer(
currentStateAPI: currentStateAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
+ notifier: n,
}
consumer.ProcessMessage = s.onMessage
@@ -110,59 +114,22 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
return err
}
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
- return nil
-}
-
-// Catchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
-// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
-// be already filled in with join/leave information.
-func (s *OutputKeyChangeEventConsumer) Catchup(
- ctx context.Context, userID string, res *types.Response, tok types.StreamingToken,
-) (newTok *types.StreamingToken, hasNew bool, err error) {
- // Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
- newlyJoinedRooms := joinedRooms(res, userID)
- newlyLeftRooms := leftRooms(res)
- if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
- changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms)
- if err != nil {
- return nil, false, err
- }
- res.DeviceLists.Changed = changed
- res.DeviceLists.Left = left
- hasNew = len(changed) > 0 || len(left) > 0
- }
-
- // now also track users who we already share rooms with but who have updated their devices between the two tokens
- // TODO: Extract partition/offset from sync token
- var partition int32
- var offset int64
- var queryRes api.QueryKeyChangesResponse
- s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
- Partition: partition,
- Offset: offset,
- }, &queryRes)
- if queryRes.Error != nil {
- // don't fail the catchup because we may have got useful information by tracking membership
- util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
- } else {
- // TODO: Make a new streaming token using the new offset
- userSet := make(map[string]bool)
- for _, userID := range res.DeviceLists.Changed {
- userSet[userID] = true
- }
- for _, userID := range queryRes.UserIDs {
- if !userSet[userID] {
- res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
- }
- }
+ posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
+ syncinternal.DeviceListLogName: &types.LogPosition{
+ Offset: msg.Offset,
+ Partition: msg.Partition,
+ },
+ })
+ for userID := range queryRes.UserIDsToCount {
+ s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
}
- return
+ return nil
}
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
// users keys:
- changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil)
+ changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
if err != nil {
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
return
@@ -175,7 +142,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are no longer sharing any rooms with and notify them about the leaving user
- _, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()})
+ _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
if err != nil {
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
return
@@ -186,129 +153,3 @@ func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.Header
}
}
-
-// nolint:gocyclo
-func (s *OutputKeyChangeEventConsumer) trackChangedUsers(
- ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string,
-) (changed, left []string, err error) {
- // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
-
- // Leave algorithm:
- // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
- // - Get users in newly left room. - QueryCurrentState
- // - Loop set of users and decrement by 1 for each user in newly left room.
- // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
- var queryRes currentstateAPI.QuerySharedUsersResponse
- err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
- UserID: userID,
- IncludeRoomIDs: newlyLeftRooms,
- }, &queryRes)
- if err != nil {
- return nil, nil, err
- }
- var stateRes currentstateAPI.QueryBulkStateContentResponse
- err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
- RoomIDs: newlyLeftRooms,
- StateTuples: []gomatrixserverlib.StateKeyTuple{
- {
- EventType: gomatrixserverlib.MRoomMember,
- StateKey: "*",
- },
- },
- AllowWildcards: true,
- }, &stateRes)
- if err != nil {
- return nil, nil, err
- }
- for _, state := range stateRes.Rooms {
- for tuple, membership := range state {
- if membership != gomatrixserverlib.Join {
- continue
- }
- queryRes.UserIDsToCount[tuple.StateKey]--
- }
- }
- for userID, count := range queryRes.UserIDsToCount {
- if count <= 0 {
- left = append(left, userID) // left is returned
- }
- }
-
- // Join algorithm:
- // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
- // - Get users in newly joined room - QueryCurrentState
- // - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
- // - If yes: then they already shared a room in common, do nothing.
- // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
- err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
- UserID: userID,
- ExcludeRoomIDs: newlyJoinedRooms,
- }, &queryRes)
- if err != nil {
- return nil, left, err
- }
- err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
- RoomIDs: newlyJoinedRooms,
- StateTuples: []gomatrixserverlib.StateKeyTuple{
- {
- EventType: gomatrixserverlib.MRoomMember,
- StateKey: "*",
- },
- },
- AllowWildcards: true,
- }, &stateRes)
- if err != nil {
- return nil, left, err
- }
- for _, state := range stateRes.Rooms {
- for tuple, membership := range state {
- if membership != gomatrixserverlib.Join {
- continue
- }
- // new user who we weren't previously sharing rooms with
- if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
- changed = append(changed, tuple.StateKey) // changed is returned
- }
- }
- }
- return changed, left, nil
-}
-
-func joinedRooms(res *types.Response, userID string) []string {
- var roomIDs []string
- for roomID, join := range res.Rooms.Join {
- // we would expect to see our join event somewhere if we newly joined the room.
- // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
- newlyJoined := membershipEventPresent(join.State.Events, userID)
- if newlyJoined {
- roomIDs = append(roomIDs, roomID)
- continue
- }
- newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
- if newlyJoined {
- roomIDs = append(roomIDs, roomID)
- }
- }
- return roomIDs
-}
-
-func leftRooms(res *types.Response) []string {
- roomIDs := make([]string, len(res.Rooms.Leave))
- i := 0
- for roomID := range res.Rooms.Leave {
- roomIDs[i] = roomID
- i++
- }
- return roomIDs
-}
-
-func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
- for _, ev := range events {
- // it's enough to know that we have our member event here, don't need to check membership content
- // as it's implied by being in the respective section of the sync response.
- if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
- return true
- }
- }
- return false
-}