diff options
Diffstat (limited to 'syncapi/consumers/keychange.go')
-rw-r--r-- | syncapi/consumers/keychange.go | 191 |
1 files changed, 16 insertions, 175 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 78aff601..35978be7 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -23,10 +23,11 @@ import ( currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" + syncinternal "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" + syncapi "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" log "github.com/sirupsen/logrus" ) @@ -39,6 +40,7 @@ type OutputKeyChangeEventConsumer struct { keyAPI api.KeyInternalAPI partitionToOffset map[int32]int64 partitionToOffsetMu sync.Mutex + notifier *syncapi.Notifier } // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. @@ -47,6 +49,7 @@ func NewOutputKeyChangeEventConsumer( serverName gomatrixserverlib.ServerName, topic string, kafkaConsumer sarama.Consumer, + n *syncapi.Notifier, keyAPI api.KeyInternalAPI, currentStateAPI currentstateAPI.CurrentStateInternalAPI, store storage.Database, @@ -66,6 +69,7 @@ func NewOutputKeyChangeEventConsumer( currentStateAPI: currentStateAPI, partitionToOffset: make(map[int32]int64), partitionToOffsetMu: sync.Mutex{}, + notifier: n, } consumer.ProcessMessage = s.onMessage @@ -110,59 +114,22 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er return err } // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams - return nil -} - -// Catchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response -// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST -// be already filled in with join/leave information. -func (s *OutputKeyChangeEventConsumer) Catchup( - ctx context.Context, userID string, res *types.Response, tok types.StreamingToken, -) (newTok *types.StreamingToken, hasNew bool, err error) { - // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. - newlyJoinedRooms := joinedRooms(res, userID) - newlyLeftRooms := leftRooms(res) - if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { - changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms) - if err != nil { - return nil, false, err - } - res.DeviceLists.Changed = changed - res.DeviceLists.Left = left - hasNew = len(changed) > 0 || len(left) > 0 - } - - // now also track users who we already share rooms with but who have updated their devices between the two tokens - // TODO: Extract partition/offset from sync token - var partition int32 - var offset int64 - var queryRes api.QueryKeyChangesResponse - s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ - Partition: partition, - Offset: offset, - }, &queryRes) - if queryRes.Error != nil { - // don't fail the catchup because we may have got useful information by tracking membership - util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") - } else { - // TODO: Make a new streaming token using the new offset - userSet := make(map[string]bool) - for _, userID := range res.DeviceLists.Changed { - userSet[userID] = true - } - for _, userID := range queryRes.UserIDs { - if !userSet[userID] { - res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID) - } - } + posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{ + syncinternal.DeviceListLogName: &types.LogPosition{ + Offset: msg.Offset, + Partition: msg.Partition, + }, + }) + for userID := range queryRes.UserIDsToCount { + s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID) } - return + return nil } func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are now sharing rooms with which we previously were not and notify them about the joining // users keys: - changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil) + changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) if err != nil { log.WithError(err).Error("OnJoinEvent: failed to work out changed users") return @@ -175,7 +142,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are no longer sharing any rooms with and notify them about the leaving user - _, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()}) + _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) if err != nil { log.WithError(err).Error("OnLeaveEvent: failed to work out left users") return @@ -186,129 +153,3 @@ func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.Header } } - -// nolint:gocyclo -func (s *OutputKeyChangeEventConsumer) trackChangedUsers( - ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string, -) (changed, left []string, err error) { - // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. - - // Leave algorithm: - // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'. - // - Get users in newly left room. - QueryCurrentState - // - Loop set of users and decrement by 1 for each user in newly left room. - // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync. - var queryRes currentstateAPI.QuerySharedUsersResponse - err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ - UserID: userID, - IncludeRoomIDs: newlyLeftRooms, - }, &queryRes) - if err != nil { - return nil, nil, err - } - var stateRes currentstateAPI.QueryBulkStateContentResponse - err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ - RoomIDs: newlyLeftRooms, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - { - EventType: gomatrixserverlib.MRoomMember, - StateKey: "*", - }, - }, - AllowWildcards: true, - }, &stateRes) - if err != nil { - return nil, nil, err - } - for _, state := range stateRes.Rooms { - for tuple, membership := range state { - if membership != gomatrixserverlib.Join { - continue - } - queryRes.UserIDsToCount[tuple.StateKey]-- - } - } - for userID, count := range queryRes.UserIDsToCount { - if count <= 0 { - left = append(left, userID) // left is returned - } - } - - // Join algorithm: - // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'. - // - Get users in newly joined room - QueryCurrentState - // - Loop set of users in newly joined room, do they appear in the set of users prior to joining? - // - If yes: then they already shared a room in common, do nothing. - // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]' - err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ - UserID: userID, - ExcludeRoomIDs: newlyJoinedRooms, - }, &queryRes) - if err != nil { - return nil, left, err - } - err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ - RoomIDs: newlyJoinedRooms, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - { - EventType: gomatrixserverlib.MRoomMember, - StateKey: "*", - }, - }, - AllowWildcards: true, - }, &stateRes) - if err != nil { - return nil, left, err - } - for _, state := range stateRes.Rooms { - for tuple, membership := range state { - if membership != gomatrixserverlib.Join { - continue - } - // new user who we weren't previously sharing rooms with - if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok { - changed = append(changed, tuple.StateKey) // changed is returned - } - } - } - return changed, left, nil -} - -func joinedRooms(res *types.Response, userID string) []string { - var roomIDs []string - for roomID, join := range res.Rooms.Join { - // we would expect to see our join event somewhere if we newly joined the room. - // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'. - newlyJoined := membershipEventPresent(join.State.Events, userID) - if newlyJoined { - roomIDs = append(roomIDs, roomID) - continue - } - newlyJoined = membershipEventPresent(join.Timeline.Events, userID) - if newlyJoined { - roomIDs = append(roomIDs, roomID) - } - } - return roomIDs -} - -func leftRooms(res *types.Response) []string { - roomIDs := make([]string, len(res.Rooms.Leave)) - i := 0 - for roomID := range res.Rooms.Leave { - roomIDs[i] = roomID - i++ - } - return roomIDs -} - -func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool { - for _, ev := range events { - // it's enough to know that we have our member event here, don't need to check membership content - // as it's implied by being in the respective section of the sync response. - if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID { - return true - } - } - return false -} |