aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-30 11:15:46 +0100
committerGitHub <noreply@github.com>2020-07-30 11:15:46 +0100
commit9355fb5ac8c911bdbde6dcc0f279f716d8a8f60b (patch)
treec6a155f7b65e25c8ab4abaf7413fbb0531a02b12 /syncapi
parent0fdd4f14d123e76bd3d0368947d3aab84a787946 (diff)
Hook up device list updates to the sync notifier (#1231)
* WIP hooking up key changes * Fix import cycle, get tests passing and binary compiling * Linting and update whitelist
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/keychange.go191
-rw-r--r--syncapi/internal/keychange.go219
-rw-r--r--syncapi/internal/keychange_test.go (renamed from syncapi/consumers/keychange_test.go)93
-rw-r--r--syncapi/sync/notifier.go10
-rw-r--r--syncapi/sync/requestpool.go32
-rw-r--r--syncapi/syncapi.go14
-rw-r--r--syncapi/types/types.go4
7 files changed, 335 insertions, 228 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 78aff601..35978be7 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -23,10 +23,11 @@ import (
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
+ syncinternal "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
+ syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
)
@@ -39,6 +40,7 @@ type OutputKeyChangeEventConsumer struct {
keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
+ notifier *syncapi.Notifier
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@@ -47,6 +49,7 @@ func NewOutputKeyChangeEventConsumer(
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
+ n *syncapi.Notifier,
keyAPI api.KeyInternalAPI,
currentStateAPI currentstateAPI.CurrentStateInternalAPI,
store storage.Database,
@@ -66,6 +69,7 @@ func NewOutputKeyChangeEventConsumer(
currentStateAPI: currentStateAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
+ notifier: n,
}
consumer.ProcessMessage = s.onMessage
@@ -110,59 +114,22 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
return err
}
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
- return nil
-}
-
-// Catchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
-// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
-// be already filled in with join/leave information.
-func (s *OutputKeyChangeEventConsumer) Catchup(
- ctx context.Context, userID string, res *types.Response, tok types.StreamingToken,
-) (newTok *types.StreamingToken, hasNew bool, err error) {
- // Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
- newlyJoinedRooms := joinedRooms(res, userID)
- newlyLeftRooms := leftRooms(res)
- if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
- changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms)
- if err != nil {
- return nil, false, err
- }
- res.DeviceLists.Changed = changed
- res.DeviceLists.Left = left
- hasNew = len(changed) > 0 || len(left) > 0
- }
-
- // now also track users who we already share rooms with but who have updated their devices between the two tokens
- // TODO: Extract partition/offset from sync token
- var partition int32
- var offset int64
- var queryRes api.QueryKeyChangesResponse
- s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
- Partition: partition,
- Offset: offset,
- }, &queryRes)
- if queryRes.Error != nil {
- // don't fail the catchup because we may have got useful information by tracking membership
- util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
- } else {
- // TODO: Make a new streaming token using the new offset
- userSet := make(map[string]bool)
- for _, userID := range res.DeviceLists.Changed {
- userSet[userID] = true
- }
- for _, userID := range queryRes.UserIDs {
- if !userSet[userID] {
- res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
- }
- }
+ posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
+ syncinternal.DeviceListLogName: &types.LogPosition{
+ Offset: msg.Offset,
+ Partition: msg.Partition,
+ },
+ })
+ for userID := range queryRes.UserIDsToCount {
+ s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
}
- return
+ return nil
}
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
// users keys:
- changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil)
+ changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
if err != nil {
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
return
@@ -175,7 +142,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are no longer sharing any rooms with and notify them about the leaving user
- _, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()})
+ _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
if err != nil {
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
return
@@ -186,129 +153,3 @@ func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.Header
}
}
-
-// nolint:gocyclo
-func (s *OutputKeyChangeEventConsumer) trackChangedUsers(
- ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string,
-) (changed, left []string, err error) {
- // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
-
- // Leave algorithm:
- // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
- // - Get users in newly left room. - QueryCurrentState
- // - Loop set of users and decrement by 1 for each user in newly left room.
- // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
- var queryRes currentstateAPI.QuerySharedUsersResponse
- err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
- UserID: userID,
- IncludeRoomIDs: newlyLeftRooms,
- }, &queryRes)
- if err != nil {
- return nil, nil, err
- }
- var stateRes currentstateAPI.QueryBulkStateContentResponse
- err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
- RoomIDs: newlyLeftRooms,
- StateTuples: []gomatrixserverlib.StateKeyTuple{
- {
- EventType: gomatrixserverlib.MRoomMember,
- StateKey: "*",
- },
- },
- AllowWildcards: true,
- }, &stateRes)
- if err != nil {
- return nil, nil, err
- }
- for _, state := range stateRes.Rooms {
- for tuple, membership := range state {
- if membership != gomatrixserverlib.Join {
- continue
- }
- queryRes.UserIDsToCount[tuple.StateKey]--
- }
- }
- for userID, count := range queryRes.UserIDsToCount {
- if count <= 0 {
- left = append(left, userID) // left is returned
- }
- }
-
- // Join algorithm:
- // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
- // - Get users in newly joined room - QueryCurrentState
- // - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
- // - If yes: then they already shared a room in common, do nothing.
- // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
- err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
- UserID: userID,
- ExcludeRoomIDs: newlyJoinedRooms,
- }, &queryRes)
- if err != nil {
- return nil, left, err
- }
- err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
- RoomIDs: newlyJoinedRooms,
- StateTuples: []gomatrixserverlib.StateKeyTuple{
- {
- EventType: gomatrixserverlib.MRoomMember,
- StateKey: "*",
- },
- },
- AllowWildcards: true,
- }, &stateRes)
- if err != nil {
- return nil, left, err
- }
- for _, state := range stateRes.Rooms {
- for tuple, membership := range state {
- if membership != gomatrixserverlib.Join {
- continue
- }
- // new user who we weren't previously sharing rooms with
- if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
- changed = append(changed, tuple.StateKey) // changed is returned
- }
- }
- }
- return changed, left, nil
-}
-
-func joinedRooms(res *types.Response, userID string) []string {
- var roomIDs []string
- for roomID, join := range res.Rooms.Join {
- // we would expect to see our join event somewhere if we newly joined the room.
- // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
- newlyJoined := membershipEventPresent(join.State.Events, userID)
- if newlyJoined {
- roomIDs = append(roomIDs, roomID)
- continue
- }
- newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
- if newlyJoined {
- roomIDs = append(roomIDs, roomID)
- }
- }
- return roomIDs
-}
-
-func leftRooms(res *types.Response) []string {
- roomIDs := make([]string, len(res.Rooms.Leave))
- i := 0
- for roomID := range res.Rooms.Leave {
- roomIDs[i] = roomID
- i++
- }
- return roomIDs
-}
-
-func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
- for _, ev := range events {
- // it's enough to know that we have our member event here, don't need to check membership content
- // as it's implied by being in the respective section of the sync response.
- if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
- return true
- }
- }
- return false
-}
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
new file mode 100644
index 00000000..b594cc62
--- /dev/null
+++ b/syncapi/internal/keychange.go
@@ -0,0 +1,219 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "context"
+
+ "github.com/Shopify/sarama"
+ currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
+ "github.com/matrix-org/dendrite/keyserver/api"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+)
+
+const DeviceListLogName = "dl"
+
+// DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
+// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
+// be already filled in with join/leave information.
+func DeviceListCatchup(
+ ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
+ userID string, res *types.Response, tok types.StreamingToken,
+) (newTok *types.StreamingToken, hasNew bool, err error) {
+ // Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
+ newlyJoinedRooms := joinedRooms(res, userID)
+ newlyLeftRooms := leftRooms(res)
+ if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
+ changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
+ if err != nil {
+ return nil, false, err
+ }
+ res.DeviceLists.Changed = changed
+ res.DeviceLists.Left = left
+ hasNew = len(changed) > 0 || len(left) > 0
+ }
+
+ // now also track users who we already share rooms with but who have updated their devices between the two tokens
+
+ var partition int32
+ var offset int64
+ // Extract partition/offset from sync token
+ // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
+ logOffset := tok.Log(DeviceListLogName)
+ if logOffset != nil {
+ partition = logOffset.Partition
+ offset = logOffset.Offset
+ } else {
+ partition = -1
+ offset = sarama.OffsetOldest
+ }
+ var queryRes api.QueryKeyChangesResponse
+ keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
+ Partition: partition,
+ Offset: offset,
+ }, &queryRes)
+ if queryRes.Error != nil {
+ // don't fail the catchup because we may have got useful information by tracking membership
+ util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
+ return
+ }
+ userSet := make(map[string]bool)
+ for _, userID := range res.DeviceLists.Changed {
+ userSet[userID] = true
+ }
+ for _, userID := range queryRes.UserIDs {
+ if !userSet[userID] {
+ res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
+ hasNew = true
+ }
+ }
+ // Make a new streaming token using the new offset
+ tok.SetLog(DeviceListLogName, &types.LogPosition{
+ Offset: queryRes.Offset,
+ Partition: queryRes.Partition,
+ })
+ newTok = &tok
+ return
+}
+
+// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
+// nolint:gocyclo
+func TrackChangedUsers(
+ ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
+) (changed, left []string, err error) {
+ // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
+
+ // Leave algorithm:
+ // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
+ // - Get users in newly left room. - QueryCurrentState
+ // - Loop set of users and decrement by 1 for each user in newly left room.
+ // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
+ var queryRes currentstateAPI.QuerySharedUsersResponse
+ err = stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
+ UserID: userID,
+ IncludeRoomIDs: newlyLeftRooms,
+ }, &queryRes)
+ if err != nil {
+ return nil, nil, err
+ }
+ var stateRes currentstateAPI.QueryBulkStateContentResponse
+ err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
+ RoomIDs: newlyLeftRooms,
+ StateTuples: []gomatrixserverlib.StateKeyTuple{
+ {
+ EventType: gomatrixserverlib.MRoomMember,
+ StateKey: "*",
+ },
+ },
+ AllowWildcards: true,
+ }, &stateRes)
+ if err != nil {
+ return nil, nil, err
+ }
+ for _, state := range stateRes.Rooms {
+ for tuple, membership := range state {
+ if membership != gomatrixserverlib.Join {
+ continue
+ }
+ queryRes.UserIDsToCount[tuple.StateKey]--
+ }
+ }
+ for userID, count := range queryRes.UserIDsToCount {
+ if count <= 0 {
+ left = append(left, userID) // left is returned
+ }
+ }
+
+ // Join algorithm:
+ // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
+ // - Get users in newly joined room - QueryCurrentState
+ // - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
+ // - If yes: then they already shared a room in common, do nothing.
+ // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
+ err = stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
+ UserID: userID,
+ ExcludeRoomIDs: newlyJoinedRooms,
+ }, &queryRes)
+ if err != nil {
+ return nil, left, err
+ }
+ err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
+ RoomIDs: newlyJoinedRooms,
+ StateTuples: []gomatrixserverlib.StateKeyTuple{
+ {
+ EventType: gomatrixserverlib.MRoomMember,
+ StateKey: "*",
+ },
+ },
+ AllowWildcards: true,
+ }, &stateRes)
+ if err != nil {
+ return nil, left, err
+ }
+ for _, state := range stateRes.Rooms {
+ for tuple, membership := range state {
+ if membership != gomatrixserverlib.Join {
+ continue
+ }
+ // new user who we weren't previously sharing rooms with
+ if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
+ changed = append(changed, tuple.StateKey) // changed is returned
+ }
+ }
+ }
+ return changed, left, nil
+}
+
+func joinedRooms(res *types.Response, userID string) []string {
+ var roomIDs []string
+ for roomID, join := range res.Rooms.Join {
+ // we would expect to see our join event somewhere if we newly joined the room.
+ // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
+ newlyJoined := membershipEventPresent(join.State.Events, userID)
+ if newlyJoined {
+ roomIDs = append(roomIDs, roomID)
+ continue
+ }
+ newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
+ if newlyJoined {
+ roomIDs = append(roomIDs, roomID)
+ }
+ }
+ return roomIDs
+}
+
+func leftRooms(res *types.Response) []string {
+ roomIDs := make([]string, len(res.Rooms.Leave))
+ i := 0
+ for roomID := range res.Rooms.Leave {
+ roomIDs[i] = roomID
+ i++
+ }
+ return roomIDs
+}
+
+func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
+ for _, ev := range events {
+ // it's enough to know that we have our member event here, don't need to check membership content
+ // as it's implied by being in the respective section of the sync response.
+ if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
+ return true
+ }
+ }
+ return false
+}
diff --git a/syncapi/consumers/keychange_test.go b/syncapi/internal/keychange_test.go
index 3ecb3f58..d0d27e44 100644
--- a/syncapi/consumers/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -1,4 +1,4 @@
-package consumers
+package internal
import (
"context"
@@ -159,18 +159,17 @@ func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs
func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
newShareUser := "@bill:localhost"
newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
+ syncResponse := types.NewResponse()
+ syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
+
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser},
"!another:room": {syncingUser},
},
- }, nil)
- syncResponse := types.NewResponse()
- syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
-
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
- t.Fatalf("Catchup returned an error: %s", err)
+ t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true,
@@ -182,18 +181,17 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
removeUser := "@bill:localhost"
newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
+ syncResponse := types.NewResponse()
+ syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
+
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {removeUser},
"!another:room": {syncingUser},
},
- }, nil)
- syncResponse := types.NewResponse()
- syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
-
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
- t.Fatalf("Catchup returned an error: %s", err)
+ t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true,
@@ -205,16 +203,15 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
existingUser := "@bob:localhost"
newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
+ syncResponse := types.NewResponse()
+ syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
+
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, existingUser},
"!another:room": {syncingUser, existingUser},
},
- }, nil)
- syncResponse := types.NewResponse()
- syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
-
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -227,18 +224,17 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
existingUser := "@bob:localhost"
newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
+ syncResponse := types.NewResponse()
+ syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
+
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {existingUser},
"!another:room": {syncingUser, existingUser},
},
- }, nil)
- syncResponse := types.NewResponse()
- syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
-
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
- t.Fatalf("Catchup returned an error: %s", err)
+ t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: false,
@@ -249,11 +245,6 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
existingUser := "@bob1:localhost"
roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
- roomIDToJoinedMembers: map[string][]string{
- roomID: {syncingUser, existingUser},
- },
- }, nil)
syncResponse := types.NewResponse()
empty := ""
roomStateEvents := []gomatrixserverlib.ClientEvent{
@@ -295,9 +286,13 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
jr.Timeline.Events = roomTimelineEvents
syncResponse.Rooms.Join[roomID] = jr
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ roomIDToJoinedMembers: map[string][]string{
+ roomID: {syncingUser, existingUser},
+ },
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
- t.Fatalf("Catchup returned an error: %s", err)
+ t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: false,
@@ -312,18 +307,17 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
newlyLeftUser2 := "@debra:localhost"
newlyJoinedRoom := "!join:bar"
newlyLeftRoom := "!left:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
+ syncResponse := types.NewResponse()
+ syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
+ syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
+
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
"!another:room": {syncingUser},
},
- }, nil)
- syncResponse := types.NewResponse()
- syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
- syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
-
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -348,12 +342,6 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
newShareUser := "@berta:localhost"
newShareUser2 := "@bobby:localhost"
roomID := "!join:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
- roomIDToJoinedMembers: map[string][]string{
- roomID: {newShareUser, newShareUser2},
- "!another:room": {syncingUser},
- },
- }, nil)
syncResponse := types.NewResponse()
roomEvents := []gomatrixserverlib.ClientEvent{
{
@@ -408,9 +396,14 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
lr.Timeline.Events = roomEvents
syncResponse.Rooms.Leave[roomID] = lr
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ roomIDToJoinedMembers: map[string][]string{
+ roomID: {newShareUser, newShareUser2},
+ "!another:room": {syncingUser},
+ },
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
- t.Fatalf("Catchup returned an error: %s", err)
+ t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true,
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
index 325e7535..df23a2f4 100644
--- a/syncapi/sync/notifier.go
+++ b/syncapi/sync/notifier.go
@@ -132,6 +132,16 @@ func (n *Notifier) OnNewSendToDevice(
n.wakeupUserDevice(userID, deviceIDs, latestPos)
}
+func (n *Notifier) OnNewKeyChange(
+ posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+ latestPos := n.currPos.WithUpdates(posUpdate)
+ n.currPos = latestPos
+ n.wakeupUsers([]string{wakeUserID}, latestPos)
+}
+
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// notify for anything before sincePos
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index bf6a9e01..754d6983 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -22,6 +22,9 @@ import (
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
+ currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -35,11 +38,16 @@ type RequestPool struct {
db storage.Database
userAPI userapi.UserInternalAPI
notifier *Notifier
+ keyAPI keyapi.KeyInternalAPI
+ stateAPI currentstateAPI.CurrentStateInternalAPI
}
// NewRequestPool makes a new RequestPool
-func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI) *RequestPool {
- return &RequestPool{db, userAPI, n}
+func NewRequestPool(
+ db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
+ stateAPI currentstateAPI.CurrentStateInternalAPI,
+) *RequestPool {
+ return &RequestPool{db, userAPI, n, keyAPI, stateAPI}
}
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
@@ -164,6 +172,10 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
if err != nil {
return
}
+ res, err = rp.appendDeviceLists(res, req.device.UserID, since)
+ if err != nil {
+ return
+ }
// Before we return the sync response, make sure that we take action on
// any send-to-device database updates or deletions that we need to do.
@@ -192,6 +204,22 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
return
}
+func (rp *RequestPool) appendDeviceLists(
+ data *types.Response, userID string, since types.StreamingToken,
+) (*types.Response, error) {
+ // TODO: Currently this code will race which may result in duplicates but not missing data.
+ // This happens because, whilst we are told the range to fetch here (since / latest) the
+ // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then
+ // returns the latest position with which the response has authority on). We'd need to tweak
+ // the API to expose a "to" value to fix this.
+ _, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since)
+ if err != nil {
+ return nil, err
+ }
+
+ return data, nil
+}
+
// nolint:gocyclo
func (rp *RequestPool) appendAccountData(
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index caf91e27..754cd502 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -21,7 +21,9 @@ import (
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
+ currentstateapi "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/config"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -39,6 +41,8 @@ func AddPublicRoutes(
consumer sarama.Consumer,
userAPI userapi.UserInternalAPI,
rsAPI api.RoomserverInternalAPI,
+ keyAPI keyapi.KeyInternalAPI,
+ currentStateAPI currentstateapi.CurrentStateInternalAPI,
federation *gomatrixserverlib.FederationClient,
cfg *config.Dendrite,
) {
@@ -58,7 +62,7 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start notifier")
}
- requestPool := sync.NewRequestPool(syncDB, notifier, userAPI)
+ requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI)
roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, notifier, syncDB, rsAPI,
@@ -88,5 +92,13 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
+ keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
+ cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent),
+ consumer, notifier, keyAPI, currentStateAPI, syncDB,
+ )
+ if err = keyChangeConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start key change consumer")
+ }
+
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 7bba8e52..f20c73bf 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -110,6 +110,10 @@ type StreamingToken struct {
logs map[string]*LogPosition
}
+func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
+ t.logs[name] = lp
+}
+
func (t *StreamingToken) Log(name string) *LogPosition {
l, ok := t.logs[name]
if !ok {