aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--keyserver/api/api.go17
-rw-r--r--keyserver/internal/internal.go11
-rw-r--r--keyserver/inthttp/client.go18
-rw-r--r--keyserver/inthttp/server.go11
-rw-r--r--syncapi/consumers/keychange.go42
-rw-r--r--syncapi/consumers/keychange_test.go42
6 files changed, 119 insertions, 22 deletions
diff --git a/keyserver/api/api.go b/keyserver/api/api.go
index d42fb60c..406a252d 100644
--- a/keyserver/api/api.go
+++ b/keyserver/api/api.go
@@ -26,6 +26,7 @@ type KeyInternalAPI interface {
// PerformClaimKeys claims one-time keys for use in pre-key messages
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
+ QueryKeyChanges(ctx context.Context, req *QueryKeyChangesRequest, res *QueryKeyChangesResponse)
}
// KeyError is returned if there was a problem performing/querying the server
@@ -131,3 +132,19 @@ type QueryKeysResponse struct {
// Set if there was a fatal error processing this query
Error *KeyError
}
+
+type QueryKeyChangesRequest struct {
+ // The partition which had key events sent to
+ Partition int32
+ // The offset of the last received key event, or sarama.OffsetOldest if this is from the beginning
+ Offset int64
+}
+
+type QueryKeyChangesResponse struct {
+ // The set of users who have had their keys change.
+ UserIDs []string
+ // The latest offset represented in this response.
+ Offset int64
+ // Set if there was a problem handling the request.
+ Error *KeyError
+}
diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go
index d3a6d4ba..240a5640 100644
--- a/keyserver/internal/internal.go
+++ b/keyserver/internal/internal.go
@@ -40,6 +40,17 @@ type KeyInternalAPI struct {
Producer *producers.KeyChange
}
+func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
+ userIDs, latest, err := a.DB.KeyChanges(ctx, req.Partition, req.Offset)
+ if err != nil {
+ res.Error = &api.KeyError{
+ Err: err.Error(),
+ }
+ }
+ res.Offset = latest
+ res.UserIDs = userIDs
+}
+
func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
res.KeyErrors = make(map[string]map[string]*api.KeyError)
a.uploadDeviceKeys(ctx, req, res)
diff --git a/keyserver/inthttp/client.go b/keyserver/inthttp/client.go
index 4c0f1e53..cd9cf70d 100644
--- a/keyserver/inthttp/client.go
+++ b/keyserver/inthttp/client.go
@@ -29,6 +29,7 @@ const (
PerformUploadKeysPath = "/keyserver/performUploadKeys"
PerformClaimKeysPath = "/keyserver/performClaimKeys"
QueryKeysPath = "/keyserver/queryKeys"
+ QueryKeyChangesPath = "/keyserver/queryKeyChanges"
)
// NewKeyServerClient creates a KeyInternalAPI implemented by talking to a HTTP POST API.
@@ -101,3 +102,20 @@ func (h *httpKeyInternalAPI) QueryKeys(
}
}
}
+
+func (h *httpKeyInternalAPI) QueryKeyChanges(
+ ctx context.Context,
+ request *api.QueryKeyChangesRequest,
+ response *api.QueryKeyChangesResponse,
+) {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "QueryKeyChanges")
+ defer span.Finish()
+
+ apiURL := h.apiURL + QueryKeyChangesPath
+ err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+ if err != nil {
+ response.Error = &api.KeyError{
+ Err: err.Error(),
+ }
+ }
+}
diff --git a/keyserver/inthttp/server.go b/keyserver/inthttp/server.go
index ec78b613..f3d2882c 100644
--- a/keyserver/inthttp/server.go
+++ b/keyserver/inthttp/server.go
@@ -58,4 +58,15 @@ func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
+ internalAPIMux.Handle(QueryKeyChangesPath,
+ httputil.MakeInternalAPI("queryKeyChanges", func(req *http.Request) util.JSONResponse {
+ request := api.QueryKeyChangesRequest{}
+ response := api.QueryKeyChangesResponse{}
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ s.QueryKeyChanges(req.Context(), &request, &response)
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
}
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 4a1c7309..78aff601 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -26,16 +26,17 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
log "github.com/sirupsen/logrus"
)
// OutputKeyChangeEventConsumer consumes events that originated in the key server.
type OutputKeyChangeEventConsumer struct {
- keyChangeConsumer *internal.ContinualConsumer
- db storage.Database
- serverName gomatrixserverlib.ServerName // our server name
- currentStateAPI currentstateAPI.CurrentStateInternalAPI
- // keyAPI api.KeyInternalAPI
+ keyChangeConsumer *internal.ContinualConsumer
+ db storage.Database
+ serverName gomatrixserverlib.ServerName // our server name
+ currentStateAPI currentstateAPI.CurrentStateInternalAPI
+ keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
}
@@ -46,6 +47,7 @@ func NewOutputKeyChangeEventConsumer(
serverName gomatrixserverlib.ServerName,
topic string,
kafkaConsumer sarama.Consumer,
+ keyAPI api.KeyInternalAPI,
currentStateAPI currentstateAPI.CurrentStateInternalAPI,
store storage.Database,
) *OutputKeyChangeEventConsumer {
@@ -60,6 +62,7 @@ func NewOutputKeyChangeEventConsumer(
keyChangeConsumer: &consumer,
db: store,
serverName: serverName,
+ keyAPI: keyAPI,
currentStateAPI: currentStateAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
@@ -115,21 +118,44 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
// be already filled in with join/leave information.
func (s *OutputKeyChangeEventConsumer) Catchup(
ctx context.Context, userID string, res *types.Response, tok types.StreamingToken,
-) (hasNew bool, err error) {
+) (newTok *types.StreamingToken, hasNew bool, err error) {
// Track users who we didn't track before but now do by virtue of sharing a room with them, or not.
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := s.trackChangedUsers(ctx, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
- return false, err
+ return nil, false, err
}
res.DeviceLists.Changed = changed
res.DeviceLists.Left = left
hasNew = len(changed) > 0 || len(left) > 0
}
- // TODO: now also track users who we already share rooms with but who have updated their devices between the two tokens
+ // now also track users who we already share rooms with but who have updated their devices between the two tokens
+ // TODO: Extract partition/offset from sync token
+ var partition int32
+ var offset int64
+ var queryRes api.QueryKeyChangesResponse
+ s.keyAPI.QueryKeyChanges(ctx, &api.QueryKeyChangesRequest{
+ Partition: partition,
+ Offset: offset,
+ }, &queryRes)
+ if queryRes.Error != nil {
+ // don't fail the catchup because we may have got useful information by tracking membership
+ util.GetLogger(ctx).WithError(queryRes.Error).Error("QueryKeyChanges failed")
+ } else {
+ // TODO: Make a new streaming token using the new offset
+ userSet := make(map[string]bool)
+ for _, userID := range res.DeviceLists.Changed {
+ userSet[userID] = true
+ }
+ for _, userID := range queryRes.UserIDs {
+ if !userSet[userID] {
+ res.DeviceLists.Changed = append(res.DeviceLists.Changed, userID)
+ }
+ }
+ }
return
}
diff --git a/syncapi/consumers/keychange_test.go b/syncapi/consumers/keychange_test.go
index 7322e208..f8e96570 100644
--- a/syncapi/consumers/keychange_test.go
+++ b/syncapi/consumers/keychange_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"github.com/matrix-org/dendrite/currentstateserver/api"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -15,6 +16,19 @@ var (
syncingUser = "@alice:localhost"
)
+type mockKeyAPI struct{}
+
+func (k *mockKeyAPI) PerformUploadKeys(ctx context.Context, req *keyapi.PerformUploadKeysRequest, res *keyapi.PerformUploadKeysResponse) {
+}
+
+// PerformClaimKeys claims one-time keys for use in pre-key messages
+func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) {
+}
+func (k *mockKeyAPI) QueryKeys(ctx context.Context, req *keyapi.QueryKeysRequest, res *keyapi.QueryKeysResponse) {
+}
+func (k *mockKeyAPI) QueryKeyChanges(ctx context.Context, req *keyapi.QueryKeyChangesRequest, res *keyapi.QueryKeyChangesResponse) {
+}
+
type mockCurrentStateAPI struct {
roomIDToJoinedMembers map[string][]string
}
@@ -144,7 +158,7 @@ func leaveResponseWithRooms(syncResponse *types.Response, userID string, roomIDs
func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
newShareUser := "@bill:localhost"
newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNewUser:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser},
"!another:room": {syncingUser},
@@ -153,7 +167,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
- hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -167,7 +181,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
removeUser := "@bill:localhost"
newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareLeftUser:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {removeUser},
"!another:room": {syncingUser},
@@ -176,7 +190,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -190,7 +204,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
existingUser := "@bob:localhost"
newlyJoinedRoom := "!TestKeyChangeCatchupOnJoinShareNoNewUsers:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, existingUser},
"!another:room": {syncingUser, existingUser},
@@ -199,7 +213,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
- hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -212,7 +226,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
existingUser := "@bob:localhost"
newlyLeftRoom := "!TestKeyChangeCatchupOnLeaveShareNoUsers:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyLeftRoom: {existingUser},
"!another:room": {syncingUser, existingUser},
@@ -221,7 +235,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
syncResponse := types.NewResponse()
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -234,7 +248,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
existingUser := "@bob1:localhost"
roomID := "!TestKeyChangeCatchupNoNewJoinsButMessages:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {syncingUser, existingUser},
},
@@ -280,7 +294,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
jr.Timeline.Events = roomTimelineEvents
syncResponse.Rooms.Join[roomID] = jr
- hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -297,7 +311,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
newlyLeftUser2 := "@debra:localhost"
newlyJoinedRoom := "!join:bar"
newlyLeftRoom := "!left:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
@@ -308,7 +322,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
- hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@@ -333,7 +347,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
newShareUser := "@berta:localhost"
newShareUser2 := "@bobby:localhost"
roomID := "!join:bar"
- consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockCurrentStateAPI{
+ consumer := NewOutputKeyChangeEventConsumer(gomatrixserverlib.ServerName("localhost"), "some_topic", nil, &mockKeyAPI{}, &mockCurrentStateAPI{
roomIDToJoinedMembers: map[string][]string{
roomID: {newShareUser, newShareUser2},
"!another:room": {syncingUser},
@@ -393,7 +407,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
lr.Timeline.Events = roomEvents
syncResponse.Rooms.Leave[roomID] = lr
- hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
+ _, hasNew, err := consumer.Catchup(context.Background(), syncingUser, syncResponse, types.NewStreamToken(0, 0))
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}