aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cmd/dendrite-sync-api-server/main.go4
-rw-r--r--internal/setup/monolith.go3
-rw-r--r--keyserver/api/api.go2
-rw-r--r--keyserver/internal/internal.go4
-rw-r--r--keyserver/producers/keychange.go9
-rw-r--r--syncapi/consumers/keychange.go191
-rw-r--r--syncapi/internal/keychange.go219
-rw-r--r--syncapi/internal/keychange_test.go (renamed from syncapi/consumers/keychange_test.go)93
-rw-r--r--syncapi/sync/notifier.go10
-rw-r--r--syncapi/sync/requestpool.go32
-rw-r--r--syncapi/syncapi.go14
-rw-r--r--syncapi/types/types.go4
-rw-r--r--sytest-whitelist1
13 files changed, 356 insertions, 230 deletions
diff --git a/cmd/dendrite-sync-api-server/main.go b/cmd/dendrite-sync-api-server/main.go
index d67395fb..0761a1d1 100644
--- a/cmd/dendrite-sync-api-server/main.go
+++ b/cmd/dendrite-sync-api-server/main.go
@@ -29,7 +29,9 @@ func main() {
rsAPI := base.RoomserverHTTPClient()
- syncapi.AddPublicRoutes(base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, federation, cfg)
+ syncapi.AddPublicRoutes(
+ base.PublicAPIMux, base.KafkaConsumer, userAPI, rsAPI, base.KeyServerHTTPClient(), base.CurrentStateAPIClient(),
+ federation, cfg)
base.SetupAndServeHTTP(string(base.Cfg.Bind.SyncAPI), string(base.Cfg.Listen.SyncAPI))
diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go
index 1f6d9a76..f33f97ee 100644
--- a/internal/setup/monolith.go
+++ b/internal/setup/monolith.go
@@ -77,6 +77,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
)
mediaapi.AddPublicRoutes(publicMux, m.Config, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
- publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI, m.FedClient, m.Config,
+ publicMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI,
+ m.KeyAPI, m.StateAPI, m.FedClient, m.Config,
)
}
diff --git a/keyserver/api/api.go b/keyserver/api/api.go
index 406a252d..c9afb09c 100644
--- a/keyserver/api/api.go
+++ b/keyserver/api/api.go
@@ -143,6 +143,8 @@ type QueryKeyChangesRequest struct {
type QueryKeyChangesResponse struct {
// The set of users who have had their keys change.
UserIDs []string
+ // The partition being served - useful if the partition is unknown at request time
+ Partition int32
// The latest offset represented in this response.
Offset int64
// Set if there was a problem handling the request.
diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go
index 240a5640..9a41e44f 100644
--- a/keyserver/internal/internal.go
+++ b/keyserver/internal/internal.go
@@ -41,6 +41,9 @@ type KeyInternalAPI struct {
}
func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
+ if req.Partition < 0 {
+ req.Partition = a.Producer.DefaultPartition()
+ }
userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset)
if err != nil {
res.Error = &api.KeyError{
@@ -48,6 +51,7 @@ func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyC
}
}
res.Offset = latest
+ res.Partition = req.Partition
res.UserIDs = userIDs
}
diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go
index d59dd200..c51d9f55 100644
--- a/keyserver/producers/keychange.go
+++ b/keyserver/producers/keychange.go
@@ -31,6 +31,15 @@ type KeyChange struct {
DB storage.Database
}
+// DefaultPartition returns the default partition this process is sending key changes to.
+// NB: A keyserver MUST send key changes to only 1 partition or else query operations will
+// become inconsistent. Partitions can be sharded (e.g by hash of user ID of key change) but
+// then all keyservers must be queried to calculate the entire set of key changes between
+// two sync tokens.
+func (p *KeyChange) DefaultPartition() int32 {
+ return 0
+}
+
// ProduceKeyChanges creates new change events for each key
func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceKeys) error {
for _, key := range keys {
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 78aff601..35978be7 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -23,10 +23,11 @@ import (
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
+ syncinternal "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
+ syncapi "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
)
@@ -39,6 +40,7 @@ type OutputKeyChangeEventConsumer struct {
keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
+ notifier *syncapi.Notifier
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@@ -47,6 +49,7 @@ func NewOutputKeyChangeEventConsumer(
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
+ n *syncapi.Notifier,
keyAPI api.KeyInternalAPI,
currentStateAPI currentstateAPI.CurrentStateInternalAPI,
store storage.Database,
@@ -66,6 +69,7 @@ func NewOutputKeyChangeEventConsumer(
currentStateAPI: currentStateAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
+ notifier: n,
}
consumer.ProcessMessage = s.onMessage
@@ -110,59 +114,22 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
return err
}
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
- return nil
-}
-
-// Catchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
-// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
-// be already filled in with join/leave information.
-func (s *OutputKeyChangeEventConsumer) Catchup(
- ctx context.Context, userID string, res *types.Response, tok types.StreamingToken,
-) (newTok *types.StreamingToken, hasNew bool, err error) {
- // Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
- newlyJoinedRooms := joinedRooms(res, userID)
- newlyLeftRooms := leftRooms(res)
- if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
- changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms)
- if err != nil {
- return nil, false, err
- }
- res.DeviceLists.Changed = changed
- res.DeviceLists.Left = left
- hasNew = len(changed) > 0 || len(left) > 0
- }
-
- // now also track users who we already share rooms with but who have updated their devices between the two tokens
- // TODO: Extract partition/offset from sync token
- var partition int32
- var offset int64
- var queryRes api.QueryKeyChangesResponse
- s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
- Partition: partition,
- Offset: offset,
- }, &queryRes)
- if queryRes.Error != nil {
- // don't fail the catchup because we may have got useful information by tracking membership
- util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
- } else {
- // TODO: Make a new streaming token using the new offset
- userSet := make(map[string]bool)
- for _, userID := range res.DeviceLists.Changed {
- userSet[userID] = true
- }
- for _, userID := range queryRes.UserIDs {
- if !userSet[userID] {
- res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
- }
- }
+ posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
+ syncinternal.DeviceListLogName: &types.LogPosition{
+ Offset: msg.Offset,
+ Partition: msg.Partition,
+ },
+ })
+ for userID := range queryRes.UserIDsToCount {
+ s.notifier.OnNewKeyChange(posUpdate, userID, output.UserID)
}
- return
+ return nil
}
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
// users keys:
- changed, _, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), []string{ev.RoomID()}, nil)
+ changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
if err != nil {
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
return
@@ -175,7 +142,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are no longer sharing any rooms with and notify them about the leaving user
- _, left, err := s.trackChangedUsers(context.Background(), *ev.StateKey(), nil, []string{ev.RoomID()})
+ _, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
if err != nil {
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
return
@@ -186,129 +153,3 @@ func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.Header
}
}
-
-// nolint:gocyclo
-func (s *OutputKeyChangeEventConsumer) trackChangedUsers(
- ctx context.Context, userID string, newlyJoinedRooms, newlyLeftRooms []string,
-) (changed, left []string, err error) {
- // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
-
- // Leave algorithm:
- // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
- // - Get users in newly left room. - QueryCurrentState
- // - Loop set of users and decrement by 1 for each user in newly left room.
- // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
- var queryRes currentstateAPI.QuerySharedUsersResponse
- err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
- UserID: userID,
- IncludeRoomIDs: newlyLeftRooms,
- }, &queryRes)
- if err != nil {
- return nil, nil, err
- }
- var stateRes currentstateAPI.QueryBulkStateContentResponse
- err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
- RoomIDs: newlyLeftRooms,
- StateTuples: []gomatrixserverlib.StateKeyTuple{
- {
- EventType: gomatrixserverlib.MRoomMember,
- StateKey: "*",
- },
- },
- AllowWildcards: true,
- }, &stateRes)
- if err != nil {
- return nil, nil, err
- }
- for _, state := range stateRes.Rooms {
- for tuple, membership := range state {
- if membership != gomatrixserverlib.Join {
- continue
- }
- queryRes.UserIDsToCount[tuple.StateKey]--
- }
- }
- for userID, count := range queryRes.UserIDsToCount {
- if count <= 0 {
- left = append(left, userID) // left is returned
- }
- }
-
- // Join algorithm:
- // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
- // - Get users in newly joined room - QueryCurrentState
- // - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
- // - If yes: then they already shared a room in common, do nothing.
- // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
- err = s.currentStateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
- UserID: userID,
- ExcludeRoomIDs: newlyJoinedRooms,
- }, &queryRes)
- if err != nil {
- return nil, left, err
- }
- err = s.currentStateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
- RoomIDs: newlyJoinedRooms,
- StateTuples: []gomatrixserverlib.StateKeyTuple{
- {
- EventType: gomatrixserverlib.MRoomMember,
- StateKey: "*",
- },
- },
- AllowWildcards: true,
- }, &stateRes)
- if err != nil {
- return nil, left, err
- }
- for _, state := range stateRes.Rooms {
- for tuple, membership := range state {
- if membership != gomatrixserverlib.Join {
- continue
- }
- // new user who we weren't previously sharing rooms with
- if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
- changed = append(changed, tuple.StateKey) // changed is returned
- }
- }
- }
- return changed, left, nil
-}
-
-func joinedRooms(res *types.Response, userID string) []string {
- var roomIDs []string
- for roomID, join := range res.Rooms.Join {
- // we would expect to see our join event somewhere if we newly joined the room.
- // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
- newlyJoined := membershipEventPresent(join.State.Events, userID)
- if newlyJoined {
- roomIDs = append(roomIDs, roomID)
- continue
- }
- newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
- if newlyJoined {
- roomIDs = append(roomIDs, roomID)
- }
- }
- return roomIDs
-}
-
-func leftRooms(res *types.Response) []string {
- roomIDs := make([]string, len(res.Rooms.Leave))
- i := 0
- for roomID := range res.Rooms.Leave {
- roomIDs[i] = roomID
- i++
- }
- return roomIDs
-}
-
-func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
- for _, ev := range events {
- // it's enough to know that we have our member event here, don't need to check membership content
- // as it's implied by being in the respective section of the sync response.
- if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
- return true
- }
- }
- return false
-}
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
new file mode 100644
index 00000000..b594cc62
--- /dev/null
+++ b/syncapi/internal/keychange.go
@@ -0,0 +1,219 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "context"
+
+ "github.com/Shopify/sarama"
+ currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
+ "github.com/matrix-org/dendrite/keyserver/api"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+)
+
+const DeviceListLogName = "dl"
+
+// DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
+// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
+// be already filled in with join/leave information.
+func DeviceListCatchup(
+ ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
+ userID string, res *types.Response, tok types.StreamingToken,
+) (newTok *types.StreamingToken, hasNew bool, err error) {
+ // Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
+ newlyJoinedRooms := joinedRooms(res, userID)
+ newlyLeftRooms := leftRooms(res)
+ if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
+ changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
+ if err != nil {
+ return nil, false, err
+ }
+ res.DeviceLists.Changed = changed
+ res.DeviceLists.Left = left
+ hasNew = len(changed) > 0 || len(left) > 0
+ }
+
+ // now also track users who we already share rooms with but who have updated their devices between the two tokens
+
+ var partition int32
+ var offset int64
+ // Extract partition/offset from sync token
+ // TODO: In a world where keyserver is sharded there will be multiple partitions and hence multiple QueryKeyChanges to make.
+ logOffset := tok.Log(DeviceListLogName)
+ if logOffset != nil {
+ partition = logOffset.Partition
+ offset = logOffset.Offset
+ } else {
+ partition = -1
+ offset = sarama.OffsetOldest
+ }
+ var queryRes api.QueryKeyChangesResponse
+ keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
+ Partition: partition,
+ Offset: offset,
+ }, &queryRes)
+ if queryRes.Error != nil {
+ // don't fail the catchup because we may have got useful information by tracking membership
+ util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
+ return
+ }
+ userSet := make(map[string]bool)
+ for _, userID := range res.DeviceLists.Changed {
+ userSet[userID] = true
+ }
+ for _, userID := range queryRes.UserIDs {
+ if !userSet[userID] {
+ res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
+ hasNew = true
+ }
+ }
+ // Make a new streaming token using the new offset
+ tok.SetLog(DeviceListLogName, &types.LogPosition{
+ Offset: queryRes.Offset,
+ Partition: queryRes.Partition,
+ })
+ newTok = &tok
+ return
+}
+
+// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
+// nolint:gocyclo
+func TrackChangedUsers(
+ ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
+) (changed, left []string, err error) {
+ // process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
+
+ // Leave algorithm:
+ // - Get set of users and number of times they appear in rooms prior to leave. - QuerySharedUsersRequest with 'IncludeRoomID'.
+ // - Get users in newly left room. - QueryCurrentState
+ // - Loop set of users and decrement by 1 for each user in newly left room.
+ // - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
+ var queryRes currentstateAPI.QuerySharedUsersResponse
+ err = stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
+ UserID: userID,
+ IncludeRoomIDs: newlyLeftRooms,
+ }, &queryRes)
+ if err != nil {
+ return nil, nil, err
+ }
+ var stateRes currentstateAPI.QueryBulkStateContentResponse
+ err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
+ RoomIDs: newlyLeftRooms,
+ StateTuples: []gomatrixserverlib.StateKeyTuple{
+ {
+ EventType: gomatrixserverlib.MRoomMember,
+ StateKey: "*",
+ },
+ },
+ AllowWildcards: true,
+ }, &stateRes)
+ if err != nil {
+ return nil, nil, err
+ }
+ for _, state := range stateRes.Rooms {
+ for tuple, membership := range state {
+ if membership != gomatrixserverlib.Join {
+ continue
+ }
+ queryRes.UserIDsToCount[tuple.StateKey]--
+ }
+ }
+ for userID, count := range queryRes.UserIDsToCount {
+ if count <= 0 {
+ left = append(left, userID) // left is returned
+ }
+ }
+
+ // Join algorithm:
+ // - Get the set of all joined users prior to joining room - QuerySharedUsersRequest with 'ExcludeRoomID'.
+ // - Get users in newly joined room - QueryCurrentState
+ // - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
+ // - If yes: then they already shared a room in common, do nothing.
+ // - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
+ err = stateAPI.QuerySharedUsers(ctx, &currentstateAPI.QuerySharedUsersRequest{
+ UserID: userID,
+ ExcludeRoomIDs: newlyJoinedRooms,
+ }, &queryRes)
+ if err != nil {
+ return nil, left, err
+ }
+ err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
+ RoomIDs: newlyJoinedRooms,
+ StateTuples: []gomatrixserverlib.StateKeyTuple{
+ {
+ EventType: gomatrixserverlib.MRoomMember,
+ StateKey: "*",
+ },
+ },
+ AllowWildcards: true,
+ }, &stateRes)
+ if err != nil {
+ return nil, left, err
+ }
+ for _, state := range stateRes.Rooms {
+ for tuple, membership := range state {
+ if membership != gomatrixserverlib.Join {
+ continue
+ }
+ // new user who we weren't previously sharing rooms with
+ if _, ok := queryRes.UserIDsToCount[tuple.StateKey]; !ok {
+ changed = append(changed, tuple.StateKey) // changed is returned
+ }
+ }
+ }
+ return changed, left, nil
+}
+
+func joinedRooms(res *types.Response, userID string) []string {
+ var roomIDs []string
+ for roomID, join := range res.Rooms.Join {
+ // we would expect to see our join event somewhere if we newly joined the room.
+ // Normal events get put in the join section so it's not enough to know the room ID is present in 'join'.
+ newlyJoined := membershipEventPresent(join.State.Events, userID)
+ if newlyJoined {
+ roomIDs = append(roomIDs, roomID)
+ continue
+ }
+ newlyJoined = membershipEventPresent(join.Timeline.Events, userID)
+ if newlyJoined {
+ roomIDs = append(roomIDs, roomID)
+ }
+ }
+ return roomIDs
+}
+
+func leftRooms(res *types.Response) []string {
+ roomIDs := make([]string, len(res.Rooms.Leave))
+ i := 0
+ for roomID := range res.Rooms.Leave {
+ roomIDs[i] = roomID
+ i++
+ }
+ return roomIDs
+}
+
+func membershipEventPresent(events []gomatrixserverlib.ClientEvent, userID string) bool {
+ for _, ev := range events {
+ // it's enough to know that we have our member event here, don't need to check membership content
+ // as it's implied by being in the respective section of the sync response.
+ if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil && *ev.StateKey == userID {
+ return true
+ }
+ }
+ return false
+}
diff --git a/syncapi/consumers/keychange_test.go b/syncapi/internal/keychange_test.go
index 3ecb3f58..d0d27e44 100644
--- a/syncapi/consumers/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -1,4 +1,4 @@
-package consumers
+package internal
import (
"context"
@@ -159,18 +159,17 @@ func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs
func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
newShareUser := "@bill:localhost"
newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
+ syncResponse := types.NewResponse()
+ syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
+
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser},
"!another:room": {syncingUser},
},
- }, nil)
- syncResponse := types.NewResponse()
- syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
-
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
- t.Fatalf("Catchup returned an error: %s", err)
+ t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true,
@@ -182,18 +181,17 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
removeUser := "@bill:localhost"
newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
+ syncResponse := types.NewResponse()
+ syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
+
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {removeUser},
"!another:room": {syncingUser},
},
- }, nil)
- syncResponse := types.NewResponse()
- syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
-
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
- t.Fatalf("Catchup returned an error: %s", err)
+ t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true,
@@ -205,16 +203,15 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
existingUser := "@bob:localhost"
newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
+ syncResponse := types.NewResponse()
+ syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
+
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, existingUser},
"!another:room": {syncingUser, existingUser},
},
- }, nil)
- syncResponse := types.NewResponse()
- syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
-
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -227,18 +224,17 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
existingUser := "@bob:localhost"
newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
+ syncResponse := types.NewResponse()
+ syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
+
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {existingUser},
"!another:room": {syncingUser, existingUser},
},
- }, nil)
- syncResponse := types.NewResponse()
- syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
-
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
- t.Fatalf("Catchup returned an error: %s", err)
+ t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: false,
@@ -249,11 +245,6 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
existingUser := "@bob1:localhost"
roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
- roomIDToJoinedMembers: map[string][]string{
- roomID: {syncingUser, existingUser},
- },
- }, nil)
syncResponse := types.NewResponse()
empty := ""
roomStateEvents := []gomatrixserverlib.ClientEvent{
@@ -295,9 +286,13 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
jr.Timeline.Events = roomTimelineEvents
syncResponse.Rooms.Join[roomID] = jr
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ roomIDToJoinedMembers: map[string][]string{
+ roomID: {syncingUser, existingUser},
+ },
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
- t.Fatalf("Catchup returned an error: %s", err)
+ t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: false,
@@ -312,18 +307,17 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
newlyLeftUser2 := "@debra:localhost"
newlyJoinedRoom := "!join:bar"
newlyLeftRoom := "!left:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
+ syncResponse := types.NewResponse()
+ syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
+ syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
+
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
"!another:room": {syncingUser},
},
- }, nil)
- syncResponse := types.NewResponse()
- syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
- syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
-
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -348,12 +342,6 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
newShareUser := "@berta:localhost"
newShareUser2 := "@bobby:localhost"
roomID := "!join:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
- roomIDToJoinedMembers: map[string][]string{
- roomID: {newShareUser, newShareUser2},
- "!another:room": {syncingUser},
- },
- }, nil)
syncResponse := types.NewResponse()
roomEvents := []gomatrixserverlib.ClientEvent{
{
@@ -408,9 +396,14 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
lr.Timeline.Events = roomEvents
syncResponse.Rooms.Leave[roomID] = lr
- _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, emptyToken)
+ _, hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
+ roomIDToJoinedMembers: map[string][]string{
+ roomID: {newShareUser, newShareUser2},
+ "!another:room": {syncingUser},
+ },
+ }, syncingUser, syncResponse, emptyToken)
if err != nil {
- t.Fatalf("Catchup returned an error: %s", err)
+ t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
assertCatchup(t, hasNew, syncResponse, wantCatchup{
hasNew: true,
diff --git a/syncapi/sync/notifier.go b/syncapi/sync/notifier.go
index 325e7535..df23a2f4 100644
--- a/syncapi/sync/notifier.go
+++ b/syncapi/sync/notifier.go
@@ -132,6 +132,16 @@ func (n *Notifier) OnNewSendToDevice(
n.wakeupUserDevice(userID, deviceIDs, latestPos)
}
+func (n *Notifier) OnNewKeyChange(
+ posUpdate types.StreamingToken, wakeUserID, keyChangeUserID string,
+) {
+ n.streamLock.Lock()
+ defer n.streamLock.Unlock()
+ latestPos := n.currPos.WithUpdates(posUpdate)
+ n.currPos = latestPos
+ n.wakeupUsers([]string{wakeUserID}, latestPos)
+}
+
// GetListener returns a UserStreamListener that can be used to wait for
// updates for a user. Must be closed.
// notify for anything before sincePos
diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go
index bf6a9e01..754d6983 100644
--- a/syncapi/sync/requestpool.go
+++ b/syncapi/sync/requestpool.go
@@ -22,6 +22,9 @@ import (
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
+ currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/syncapi/internal"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -35,11 +38,16 @@ type RequestPool struct {
db storage.Database
userAPI userapi.UserInternalAPI
notifier *Notifier
+ keyAPI keyapi.KeyInternalAPI
+ stateAPI currentstateAPI.CurrentStateInternalAPI
}
// NewRequestPool makes a new RequestPool
-func NewRequestPool(db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI) *RequestPool {
- return &RequestPool{db, userAPI, n}
+func NewRequestPool(
+ db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
+ stateAPI currentstateAPI.CurrentStateInternalAPI,
+) *RequestPool {
+ return &RequestPool{db, userAPI, n, keyAPI, stateAPI}
}
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
@@ -164,6 +172,10 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
if err != nil {
return
}
+ res, err = rp.appendDeviceLists(res, req.device.UserID, since)
+ if err != nil {
+ return
+ }
// Before we return the sync response, make sure that we take action on
// any send-to-device database updates or deletions that we need to do.
@@ -192,6 +204,22 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
return
}
+func (rp *RequestPool) appendDeviceLists(
+ data *types.Response, userID string, since types.StreamingToken,
+) (*types.Response, error) {
+ // TODO: Currently this code will race which may result in duplicates but not missing data.
+ // This happens because, whilst we are told the range to fetch here (since / latest) the
+ // QueryKeyChanges API only exposes a "from" value (on purpose to avoid racing, which then
+ // returns the latest position with which the response has authority on). We'd need to tweak
+ // the API to expose a "to" value to fix this.
+ _, _, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since)
+ if err != nil {
+ return nil, err
+ }
+
+ return data, nil
+}
+
// nolint:gocyclo
func (rp *RequestPool) appendAccountData(
data *types.Response, userID string, req syncRequest, currentPos types.StreamPosition,
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index caf91e27..754cd502 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -21,7 +21,9 @@ import (
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
+ currentstateapi "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/config"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -39,6 +41,8 @@ func AddPublicRoutes(
consumer sarama.Consumer,
userAPI userapi.UserInternalAPI,
rsAPI api.RoomserverInternalAPI,
+ keyAPI keyapi.KeyInternalAPI,
+ currentStateAPI currentstateapi.CurrentStateInternalAPI,
federation *gomatrixserverlib.FederationClient,
cfg *config.Dendrite,
) {
@@ -58,7 +62,7 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start notifier")
}
- requestPool := sync.NewRequestPool(syncDB, notifier, userAPI)
+ requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI)
roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg, consumer, notifier, syncDB, rsAPI,
@@ -88,5 +92,13 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
+ keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
+ cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent),
+ consumer, notifier, keyAPI, currentStateAPI, syncDB,
+ )
+ if err = keyChangeConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start key change consumer")
+ }
+
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 7bba8e52..f20c73bf 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -110,6 +110,10 @@ type StreamingToken struct {
logs map[string]*LogPosition
}
+func (t *StreamingToken) SetLog(name string, lp *LogPosition) {
+ t.logs[name] = lp
+}
+
func (t *StreamingToken) Log(name string) *LogPosition {
l, ok := t.logs[name]
if !ok {
diff --git a/sytest-whitelist b/sytest-whitelist
index 388f95e0..26922df4 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -127,6 +127,7 @@ Can query specific device keys using POST
query for user with no keys returns empty key dict
Can claim one time key using POST
Can claim remote one time key using POST
+Local device key changes appear in v2 /sync
Can add account data
Can add account data to room
Can get account data without syncing