diff options
author | Kegsay <kegan@matrix.org> | 2020-07-30 11:15:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-30 11:15:46 +0100 |
commit | 9355fb5ac8c911bdbde6dcc0f279f716d8a8f60b (patch) | |
tree | c6a155f7b65e25c8ab4abaf7413fbb0531a02b12 /syncapi | |
parent | 0fdd4f14d123e76bd3d0368947d3aab84a787946 (diff) |
Hook up device list updates to the sync notifier (#1231)
* WIP hooking up key changes
* Fix import cycle, get tests passing and binary compiling
* Linting and update whitelist
Diffstat (limited to 'syncapi')
-rw-r--r-- | syncapi/consumers/keychange.go | 191 | ||||
-rw-r--r-- | syncapi/internal/keychange.go | 219 | ||||
-rw-r--r-- | syncapi/internal/keychange_test.go (renamed from syncapi/consumers/keychange_test.go) | 93 | ||||
-rw-r--r-- | syncapi/sync/notifier.go | 10 | ||||
-rw-r--r-- | syncapi/sync/requestpool.go | 32 | ||||
-rw-r--r-- | syncapi/syncapi.go | 14 | ||||
-rw-r--r-- | syncapi/types/types.go | 4 |
7 files changed, 335 insertions, 228 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go index 78aff601..35978be7 100644 --- a/syncapi/consumers/keychange.go +++ b/syncapi/consumers/keychange.go @@ -23,10 +23,11 @@ import ( currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/api" + syncinternal "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" + syncapi "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/util" log "github.com/sirupsen/logrus" ) @@ -39,6 +40,7 @@ type OutputKeyChangeEventConsumer struct { keyAPI api.KeyInternalAPI partitionToOffset map[int32]int64 partitionToOffsetMu sync.Mutex + notifier *syncapi.Notifier } // NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer. @@ -47,6 +49,7 @@ func NewOutputKeyChangeEventConsumer( serverName gomatrixserverlib.ServerName, topic string, kafkaConsumer sarama.Consumer, + n *syncapi.Notifier, keyAPI api.KeyInternalAPI, currentStateAPI currentstateAPI.CurrentStateInternalAPI, store storage.Database, @@ -66,6 +69,7 @@ func NewOutputKeyChangeEventConsumer( currentStateAPI: currentStateAPI, partitionToOffset: make(map[int32]int64), partitionToOffsetMu: sync.Mutex{}, + notifier: n, } consumer.ProcessMessage = s.onMessage @@ -110,59 +114,22 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er return err } // TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams - return nil -} - -// Catchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response -// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST -// be already filled in with join/leave information. -func (s *OutputKeyChangeEventConsumer) Catchup( - ctx context.Context, userID string, res *types.Response, tok types.StreamingToken, -) (newTok *types.StreamingToken, hasNew bool, err error) { - // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. - newlyJoinedRooms := joinedRooms(res, userID) - newlyLeftRooms := leftRooms(res) - if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { - changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms) - if err != nil { - return nil, false, err - } - res.DeviceLists.Changed = changed - res.DeviceLists.Left = left - hasNew = len(changed) > 0 || len(left) > 0 - } - - // now also track users who we already share rooms with but who have updated their devices between the two tokens - // TODO: Extract partition/offset from sync token - var partition int32 - var offset int64 - var queryRes api.QueryKeyChangesResponse - s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ - Partition: partition, - Offset: offset, - }, &queryRes) - if queryRes.Error != nil { - // don't fail the catchup because we may have got useful information by tracking membership - util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") - } else { - // TODO: Make a new streaming token using the new offset - userSet := make(map[string]bool) - for _, userID := range res.DeviceLists.Changed { - userSet[userID] = true - } - for _, userID := range queryRes.UserIDs { - if !userSet[userID] { - res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID) - } - } + posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{ + syncinternal.DeviceListLogName: &types.LogPosition{ + Offset: msg.Offset, + Partition: msg.Partition, + }, + }) + for userID := range queryRes.UserIDsToCount { + s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID) } - return + return nil } func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are now sharing rooms with which we previously were not and notify them about the joining // users keys: - changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil) + changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil) if err != nil { log.WithError(err).Error("OnJoinEvent: failed to work out changed users") return @@ -175,7 +142,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) { // work out who we are no longer sharing any rooms with and notify them about the leaving user - _, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()}) + _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()}) if err != nil { log.WithError(err).Error("OnLeaveEvent: failed to work out left users") return @@ -186,129 +153,3 @@ func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.Header } } - -// nolint:gocyclo -func (s *OutputKeyChangeEventConsumer) trackChangedUsers( - ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string, -) (changed, left []string, err error) { - // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. - - // Leave algorithm: - // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'. - // - Get users in newly left room. - QueryCurrentState - // - Loop set of users and decrement by 1 for each user in newly left room. - // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync. - var queryRes currentstateAPI.QuerySharedUsersResponse - err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ - UserID: userID, - IncludeRoomIDs: newlyLeftRooms, - }, &queryRes) - if err != nil { - return nil, nil, err - } - var stateRes currentstateAPI.QueryBulkStateContentResponse - err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ - RoomIDs: newlyLeftRooms, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - { - EventType: gomatrixserverlib.MRoomMember, - StateKey: "*", - }, - }, - AllowWildcards: true, - }, &stateRes) - if err != nil { - return nil, nil, err - } - for _, state := range stateRes.Rooms { - for tuple, membership := range state { - if membership != gomatrixserverlib.Join { - continue - } - queryRes.UserIDsToCount[tuple.StateKey]-- - } - } - for userID, count := range queryRes.UserIDsToCount { - if count <= 0 { - left = append(left, userID) // left is returned - } - } - - // Join algorithm: - // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'. - // - Get users in newly joined room - QueryCurrentState - // - Loop set of users in newly joined room, do they appear in the set of users prior to joining? - // - If yes: then they already shared a room in common, do nothing. - // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]' - err = s.currentStateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ - UserID: userID, - ExcludeRoomIDs: newlyJoinedRooms, - }, &queryRes) - if err != nil { - return nil, left, err - } - err = s.currentStateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ - RoomIDs: newlyJoinedRooms, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - { - EventType: gomatrixserverlib.MRoomMember, - StateKey: "*", - }, - }, - AllowWildcards: true, - }, &stateRes) - if err != nil { - return nil, left, err - } - for _, state := range stateRes.Rooms { - for tuple, membership := range state { - if membership != gomatrixserverlib.Join { - continue - } - // new user who we weren't previously sharing rooms with - if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok { - changed = append(changed, tuple.StateKey) // changed is returned - } - } - } - return changed, left, nil -} - -func joinedRooms(res *types.Response, userID string) []string { - var roomIDs []string - for roomID, join := range res.Rooms.Join { - // we would expect to see our join event somewhere if we newly joined the room. - // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'. - newlyJoined := membershipEventPresent(join.State.Events, userID) - if newlyJoined { - roomIDs = append(roomIDs, roomID) - continue - } - newlyJoined = membershipEventPresent(join.Timeline.Events, userID) - if newlyJoined { - roomIDs = append(roomIDs, roomID) - } - } - return roomIDs -} - -func leftRooms(res *types.Response) []string { - roomIDs := make([]string, len(res.Rooms.Leave)) - i := 0 - for roomID := range res.Rooms.Leave { - roomIDs[i] = roomID - i++ - } - return roomIDs -} - -func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool { - for _, ev := range events { - // it's enough to know that we have our member event here, don't need to check membership content - // as it's implied by being in the respective section of the sync response. - if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID { - return true - } - } - return false -} diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go new file mode 100644 index 00000000..b594cc62 --- /dev/null +++ b/syncapi/internal/keychange.go @@ -0,0 +1,219 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + + "github.com/Shopify/sarama" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" + "github.com/matrix-org/dendrite/keyserver/api" + keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +const DeviceListLogName = "dl" + +// DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response +// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST +// be already filled in with join/leave information. +func DeviceListCatchup( + ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, + userID string, res *types.Response, tok types.StreamingToken, +) (newTok *types.StreamingToken, hasNew bool, err error) { + // Track users who we didn't track before but now do by virtue of sharing a room with them, or not. + newlyJoinedRooms := joinedRooms(res, userID) + newlyLeftRooms := leftRooms(res) + if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 { + changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms) + if err != nil { + return nil, false, err + } + res.DeviceLists.Changed = changed + res.DeviceLists.Left = left + hasNew = len(changed) > 0 || len(left) > 0 + } + + // now also track users who we already share rooms with but who have updated their devices between the two tokens + + var partition int32 + var offset int64 + // Extract partition/offset from sync token + // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make. + logOffset := tok.Log(DeviceListLogName) + if logOffset != nil { + partition = logOffset.Partition + offset = logOffset.Offset + } else { + partition = -1 + offset = sarama.OffsetOldest + } + var queryRes api.QueryKeyChangesResponse + keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{ + Partition: partition, + Offset: offset, + }, &queryRes) + if queryRes.Error != nil { + // don't fail the catchup because we may have got useful information by tracking membership + util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed") + return + } + userSet := make(map[string]bool) + for _, userID := range res.DeviceLists.Changed { + userSet[userID] = true + } + for _, userID := range queryRes.UserIDs { + if !userSet[userID] { + res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID) + hasNew = true + } + } + // Make a new streaming token using the new offset + tok.SetLog(DeviceListLogName, &types.LogPosition{ + Offset: queryRes.Offset, + Partition: queryRes.Partition, + }) + newTok = &tok + return +} + +// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response. +// nolint:gocyclo +func TrackChangedUsers( + ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string, +) (changed, left []string, err error) { + // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users. + + // Leave algorithm: + // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'. + // - Get users in newly left room. - QueryCurrentState + // - Loop set of users and decrement by 1 for each user in newly left room. + // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync. + var queryRes currentstateAPI.QuerySharedUsersResponse + err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + UserID: userID, + IncludeRoomIDs: newlyLeftRooms, + }, &queryRes) + if err != nil { + return nil, nil, err + } + var stateRes currentstateAPI.QueryBulkStateContentResponse + err = stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + RoomIDs: newlyLeftRooms, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomMember, + StateKey: "*", + }, + }, + AllowWildcards: true, + }, &stateRes) + if err != nil { + return nil, nil, err + } + for _, state := range stateRes.Rooms { + for tuple, membership := range state { + if membership != gomatrixserverlib.Join { + continue + } + queryRes.UserIDsToCount[tuple.StateKey]-- + } + } + for userID, count := range queryRes.UserIDsToCount { + if count <= 0 { + left = append(left, userID) // left is returned + } + } + + // Join algorithm: + // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'. + // - Get users in newly joined room - QueryCurrentState + // - Loop set of users in newly joined room, do they appear in the set of users prior to joining? + // - If yes: then they already shared a room in common, do nothing. + // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]' + err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{ + UserID: userID, + ExcludeRoomIDs: newlyJoinedRooms, + }, &queryRes) + if err != nil { + return nil, left, err + } + err = stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ + RoomIDs: newlyJoinedRooms, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomMember, + StateKey: "*", + }, + }, + AllowWildcards: true, + }, &stateRes) + if err != nil { + return nil, left, err + } + for _, state := range stateRes.Rooms { + for tuple, membership := range state { + if membership != gomatrixserverlib.Join { + continue + } + // new user who we weren't previously sharing rooms with + if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok { + changed = append(changed, tuple.StateKey) // changed is returned + } + } + } + return changed, left, nil +} + +func joinedRooms(res *types.Response, userID string) []string { + var roomIDs []string + for roomID, join := range res.Rooms.Join { + // we would expect to see our join event somewhere if we newly joined the room. + // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'. + newlyJoined := membershipEventPresent(join.State.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + continue + } + newlyJoined = membershipEventPresent(join.Timeline.Events, userID) + if newlyJoined { + roomIDs = append(roomIDs, roomID) + } + } + return roomIDs +} + +func leftRooms(res *types.Response) []string { + roomIDs := make([]string, len(res.Rooms.Leave)) + i := 0 + for roomID := range res.Rooms.Leave { + roomIDs[i] = roomID + i++ + } + return roomIDs +} + +func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool { + for _, ev := range events { + // it's enough to know that we have our member event here, don't need to check membership content + // as it's implied by being in the respective section of the sync response. + if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID { + return true + } + } + return false +} diff --git a/syncapi/consumers/keychange_test.go b/syncapi/internal/keychange_test.go index 3ecb3f58..d0d27e44 100644 --- a/syncapi/consumers/keychange_test.go +++ b/syncapi/internal/keychange_test.go @@ -1,4 +1,4 @@ -package consumers +package internal import ( "context" @@ -159,18 +159,17 @@ func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { newShareUser := "@bill:localhost" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser}, "!another:room": {syncingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: true, @@ -182,18 +181,17 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { removeUser := "@bill:localhost" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {removeUser}, "!another:room": {syncingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: true, @@ -205,16 +203,15 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) { func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { existingUser := "@bob:localhost" newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -227,18 +224,17 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) { func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { existingUser := "@bob:localhost" newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyLeftRoom: {existingUser}, "!another:room": {syncingUser, existingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: false, @@ -249,11 +245,6 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) { func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { existingUser := "@bob1:localhost" roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ - roomIDToJoinedMembers: map[string][]string{ - roomID: {syncingUser, existingUser}, - }, - }, nil) syncResponse := types.NewResponse() empty := "" roomStateEvents := []gomatrixserverlib.ClientEvent{ @@ -295,9 +286,13 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) { jr.Timeline.Events = roomTimelineEvents syncResponse.Rooms.Join[roomID] = jr - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + roomID: {syncingUser, existingUser}, + }, + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: false, @@ -312,18 +307,17 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) { newlyLeftUser2 := "@debra:localhost" newlyJoinedRoom := "!join:bar" newlyLeftRoom := "!left:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ + syncResponse := types.NewResponse() + syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) + syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) + + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ roomIDToJoinedMembers: map[string][]string{ newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2}, newlyLeftRoom: {newlyLeftUser, newlyLeftUser2}, "!another:room": {syncingUser}, }, - }, nil) - syncResponse := types.NewResponse() - syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom}) - syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom}) - - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + }, syncingUser, syncResponse, emptyToken) if err != nil { t.Fatalf("Catchup returned an error: %s", err) } @@ -348,12 +342,6 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { newShareUser := "@berta:localhost" newShareUser2 := "@bobby:localhost" roomID := "!join:bar" - consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{ - roomIDToJoinedMembers: map[string][]string{ - roomID: {newShareUser, newShareUser2}, - "!another:room": {syncingUser}, - }, - }, nil) syncResponse := types.NewResponse() roomEvents := []gomatrixserverlib.ClientEvent{ { @@ -408,9 +396,14 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) { lr.Timeline.Events = roomEvents syncResponse.Rooms.Leave[roomID] = lr - _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken) + _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{ + roomIDToJoinedMembers: map[string][]string{ + roomID: {newShareUser, newShareUser2}, + "!another:room": {syncingUser}, + }, + }, syncingUser, syncResponse, emptyToken) if err != nil { - t.Fatalf("Catchup returned an error: %s", err) + t.Fatalf("DeviceListCatchup returned an error: %s", err) } assertCatchup(t, hasNew, syncResponse, wantCatchup{ hasNew: true, diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go index 325e7535..df23a2f4 100644 --- a/syncapi/sync/notifier.go +++ b/syncapi/sync/notifier.go @@ -132,6 +132,16 @@ func (n *Notifier) OnNewSendToDevice( n.wakeupUserDevice(userID, deviceIDs, latestPos) } +func (n *Notifier) OnNewKeyChange( + posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string, +) { + n.streamLock.Lock() + defer n.streamLock.Unlock() + latestPos := n.currPos.WithUpdates(posUpdate) + n.currPos = latestPos + n.wakeupUsers([]string{wakeUserID}, latestPos) +} + // GetListener returns a UserStreamListener that can be used to wait for // updates for a user. Must be closed. // notify for anything before sincePos diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index bf6a9e01..754d6983 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -22,6 +22,9 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" + keyapi "github.com/matrix-org/dendrite/keyserver/api" + "github.com/matrix-org/dendrite/syncapi/internal" "github.com/matrix-org/dendrite/syncapi/storage" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" @@ -35,11 +38,16 @@ type RequestPool struct { db storage.Database userAPI userapi.UserInternalAPI notifier *Notifier + keyAPI keyapi.KeyInternalAPI + stateAPI currentstateAPI.CurrentStateInternalAPI } // NewRequestPool makes a new RequestPool -func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI) *RequestPool { - return &RequestPool{db, userAPI, n} +func NewRequestPool( + db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI, + stateAPI currentstateAPI.CurrentStateInternalAPI, +) *RequestPool { + return &RequestPool{db, userAPI, n, keyAPI, stateAPI} } // OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be @@ -164,6 +172,10 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea if err != nil { return } + res, err = rp.appendDeviceLists(res, req.device.UserID, since) + if err != nil { + return + } // Before we return the sync response, make sure that we take action on // any send-to-device database updates or deletions that we need to do. @@ -192,6 +204,22 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea return } +func (rp *RequestPool) appendDeviceLists( + data *types.Response, userID string, since types.StreamingToken, +) (*types.Response, error) { + // TODO: Currently this code will race which may result in duplicates but not missing data. + // This happens because, whilst we are told the range to fetch here (since / latest) the + // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then + // returns the latest position with which the response has authority on). We'd need to tweak + // the API to expose a "to" value to fix this. + _, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since) + if err != nil { + return nil, err + } + + return data, nil +} + // nolint:gocyclo func (rp *RequestPool) appendAccountData( data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition, diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go index caf91e27..754cd502 100644 --- a/syncapi/syncapi.go +++ b/syncapi/syncapi.go @@ -21,7 +21,9 @@ import ( "github.com/gorilla/mux" "github.com/sirupsen/logrus" + currentstateapi "github.com/matrix-org/dendrite/currentstateserver/api" "github.com/matrix-org/dendrite/internal/config" + keyapi "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/roomserver/api" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -39,6 +41,8 @@ func AddPublicRoutes( consumer sarama.Consumer, userAPI userapi.UserInternalAPI, rsAPI api.RoomserverInternalAPI, + keyAPI keyapi.KeyInternalAPI, + currentStateAPI currentstateapi.CurrentStateInternalAPI, federation *gomatrixserverlib.FederationClient, cfg *config.Dendrite, ) { @@ -58,7 +62,7 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start notifier") } - requestPool := sync.NewRequestPool(syncDB, notifier, userAPI) + requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI) roomConsumer := consumers.NewOutputRoomEventConsumer( cfg, consumer, notifier, syncDB, rsAPI, @@ -88,5 +92,13 @@ func AddPublicRoutes( logrus.WithError(err).Panicf("failed to start send-to-device consumer") } + keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer( + cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent), + consumer, notifier, keyAPI, currentStateAPI, syncDB, + ) + if err = keyChangeConsumer.Start(); err != nil { + logrus.WithError(err).Panicf("failed to start key change consumer") + } + routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg) } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 7bba8e52..f20c73bf 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -110,6 +110,10 @@ type StreamingToken struct { logs map[string]*LogPosition } +func (t *StreamingToken) SetLog(name string, lp *LogPosition) { + t.logs[name] = lp +} + func (t *StreamingToken) Log(name string) *LogPosition { l, ok := t.logs[name] if !ok { |