aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--build/gobind/monolith.go6
-rw-r--r--clientapi/routing/device.go28
-rw-r--r--clientapi/routing/routing.go4
-rw-r--r--cmd/dendrite-demo-libp2p/main.go6
-rw-r--r--cmd/dendrite-demo-yggdrasil/main.go6
-rw-r--r--cmd/dendrite-key-server/main.go3
-rw-r--r--cmd/dendrite-monolith-server/main.go5
-rw-r--r--cmd/dendrite-user-api-server/main.go2
-rw-r--r--cmd/dendritejs/main.go6
-rw-r--r--keyserver/api/api.go4
-rw-r--r--keyserver/internal/internal.go13
-rw-r--r--keyserver/inthttp/client.go5
-rw-r--r--keyserver/keyserver.go4
-rw-r--r--keyserver/producers/keychange.go9
-rw-r--r--keyserver/storage/interface.go3
-rw-r--r--syncapi/consumers/roomserver.go23
-rw-r--r--syncapi/internal/keychange_test.go3
-rw-r--r--syncapi/syncapi.go18
-rw-r--r--sytest-whitelist1
-rw-r--r--userapi/api/api.go10
-rw-r--r--userapi/internal/api.go39
-rw-r--r--userapi/inthttp/client.go13
-rw-r--r--userapi/inthttp/server.go13
-rw-r--r--userapi/userapi.go4
-rw-r--r--userapi/userapi_test.go2
25 files changed, 183 insertions, 47 deletions
diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go
index 5d0d11fd..e2ff79c3 100644
--- a/build/gobind/monolith.go
+++ b/build/gobind/monolith.go
@@ -118,7 +118,9 @@ func (m *DendriteMonolith) Start() {
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
- userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices)
+ keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, base.KafkaProducer)
+ userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices, keyAPI)
+ keyAPI.SetUserAPI(userAPI)
rsAPI := roomserver.NewInternalAPI(
base, keyRing, federation,
@@ -156,7 +158,7 @@ func (m *DendriteMonolith) Start() {
RoomserverAPI: rsAPI,
UserAPI: userAPI,
StateAPI: stateAPI,
- KeyAPI: keyserver.NewInternalAPI(base.Cfg, federation, userAPI, base.KafkaProducer),
+ KeyAPI: keyAPI,
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
ygg, fsAPI, federation,
),
diff --git a/clientapi/routing/device.go b/clientapi/routing/device.go
index 01310400..11c6c782 100644
--- a/clientapi/routing/device.go
+++ b/clientapi/routing/device.go
@@ -165,7 +165,7 @@ func UpdateDeviceByID(
// DeleteDeviceById handles DELETE requests to /devices/{deviceId}
func DeleteDeviceById(
- req *http.Request, userInteractiveAuth *auth.UserInteractive, deviceDB devices.Database, device *api.Device,
+ req *http.Request, userInteractiveAuth *auth.UserInteractive, userAPI api.UserInternalAPI, device *api.Device,
deviceID string,
) util.JSONResponse {
ctx := req.Context()
@@ -197,8 +197,12 @@ func DeleteDeviceById(
}
}
- if err := deviceDB.RemoveDevice(ctx, deviceID, localpart); err != nil {
- util.GetLogger(ctx).WithError(err).Error("deviceDB.RemoveDevice failed")
+ var res api.PerformDeviceDeletionResponse
+ if err := userAPI.PerformDeviceDeletion(ctx, &api.PerformDeviceDeletionRequest{
+ UserID: device.UserID,
+ DeviceIDs: []string{deviceID},
+ }, &res); err != nil {
+ util.GetLogger(ctx).WithError(err).Error("userAPI.PerformDeviceDeletion failed")
return jsonerror.InternalServerError()
}
@@ -210,26 +214,24 @@ func DeleteDeviceById(
// DeleteDevices handles POST requests to /delete_devices
func DeleteDevices(
- req *http.Request, deviceDB devices.Database, device *api.Device,
+ req *http.Request, userAPI api.UserInternalAPI, device *api.Device,
) util.JSONResponse {
- localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
- if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed")
- return jsonerror.InternalServerError()
- }
-
ctx := req.Context()
payload := devicesDeleteJSON{}
if err := json.NewDecoder(req.Body).Decode(&payload); err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("json.NewDecoder.Decode failed")
+ util.GetLogger(ctx).WithError(err).Error("json.NewDecoder.Decode failed")
return jsonerror.InternalServerError()
}
defer req.Body.Close() // nolint: errcheck
- if err := deviceDB.RemoveDevices(ctx, localpart, payload.Devices); err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("deviceDB.RemoveDevices failed")
+ var res api.PerformDeviceDeletionResponse
+ if err := userAPI.PerformDeviceDeletion(ctx, &api.PerformDeviceDeletionRequest{
+ UserID: device.UserID,
+ DeviceIDs: payload.Devices,
+ }, &res); err != nil {
+ util.GetLogger(ctx).WithError(err).Error("userAPI.PerformDeviceDeletion failed")
return jsonerror.InternalServerError()
}
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index ebb141ef..6c40db86 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -654,13 +654,13 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return DeleteDeviceById(req, userInteractiveAuth, deviceDB, device, vars["deviceID"])
+ return DeleteDeviceById(req, userInteractiveAuth, userAPI, device, vars["deviceID"])
}),
).Methods(http.MethodDelete, http.MethodOptions)
r0mux.Handle("/delete_devices",
httputil.MakeAuthAPI("delete_devices", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
- return DeleteDevices(req, deviceDB, device)
+ return DeleteDevices(req, userAPI, device)
}),
).Methods(http.MethodPost, http.MethodOptions)
diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go
index ed731018..4e774acc 100644
--- a/cmd/dendrite-demo-libp2p/main.go
+++ b/cmd/dendrite-demo-libp2p/main.go
@@ -141,7 +141,9 @@ func main() {
accountDB := base.Base.CreateAccountsDB()
deviceDB := base.Base.CreateDeviceDB()
federation := createFederationClient(base)
- userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil)
+ keyAPI := keyserver.NewInternalAPI(base.Base.Cfg, federation, base.Base.KafkaProducer)
+ userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil, keyAPI)
+ keyAPI.SetUserAPI(userAPI)
serverKeyAPI := serverkeyapi.NewInternalAPI(
base.Base.Cfg, federation, base.Base.Caches,
@@ -186,7 +188,7 @@ func main() {
ServerKeyAPI: serverKeyAPI,
StateAPI: stateAPI,
UserAPI: userAPI,
- KeyAPI: keyserver.NewInternalAPI(base.Base.Cfg, federation, userAPI, base.Base.KafkaProducer),
+ KeyAPI: keyAPI,
ExtPublicRoomsProvider: provider,
}
monolith.AddAllPublicRoutes(base.Base.PublicAPIMux)
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go
index 5fd76102..0655a2a3 100644
--- a/cmd/dendrite-demo-yggdrasil/main.go
+++ b/cmd/dendrite-demo-yggdrasil/main.go
@@ -103,7 +103,9 @@ func main() {
serverKeyAPI := &signing.YggdrasilKeys{}
keyRing := serverKeyAPI.KeyRing()
- userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil)
+ keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, base.KafkaProducer)
+ userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil, keyAPI)
+ keyAPI.SetUserAPI(userAPI)
rsComponent := roomserver.NewInternalAPI(
base, keyRing, federation,
@@ -142,7 +144,7 @@ func main() {
RoomserverAPI: rsAPI,
UserAPI: userAPI,
StateAPI: stateAPI,
- KeyAPI: keyserver.NewInternalAPI(base.Cfg, federation, userAPI, base.KafkaProducer),
+ KeyAPI: keyAPI,
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
ygg, fsAPI, federation,
diff --git a/cmd/dendrite-key-server/main.go b/cmd/dendrite-key-server/main.go
index d58d475a..94ea819f 100644
--- a/cmd/dendrite-key-server/main.go
+++ b/cmd/dendrite-key-server/main.go
@@ -24,7 +24,8 @@ func main() {
base := setup.NewBaseDendrite(cfg, "KeyServer", true)
defer base.Close() // nolint: errcheck
- intAPI := keyserver.NewInternalAPI(base.Cfg, base.CreateFederationClient(), base.UserAPIClient(), base.KafkaProducer)
+ intAPI := keyserver.NewInternalAPI(base.Cfg, base.CreateFederationClient(), base.KafkaProducer)
+ intAPI.SetUserAPI(base.UserAPIClient())
keyserver.AddInternalRoutes(base.InternalAPIMux, intAPI)
diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go
index b312579c..bce5fce0 100644
--- a/cmd/dendrite-monolith-server/main.go
+++ b/cmd/dendrite-monolith-server/main.go
@@ -76,7 +76,9 @@ func main() {
serverKeyAPI = base.ServerKeyAPIClient()
}
keyRing := serverKeyAPI.KeyRing()
- userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices)
+ keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, base.KafkaProducer)
+ userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices, keyAPI)
+ keyAPI.SetUserAPI(userAPI)
rsImpl := roomserver.NewInternalAPI(
base, keyRing, federation,
@@ -119,7 +121,6 @@ func main() {
rsImpl.SetFederationSenderAPI(fsAPI)
stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer)
- keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, userAPI, base.KafkaProducer)
monolith := setup.Monolith{
Config: base.Cfg,
diff --git a/cmd/dendrite-user-api-server/main.go b/cmd/dendrite-user-api-server/main.go
index 4257da3f..e6d61da1 100644
--- a/cmd/dendrite-user-api-server/main.go
+++ b/cmd/dendrite-user-api-server/main.go
@@ -27,7 +27,7 @@ func main() {
accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB()
- userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices)
+ userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, cfg.Derived.ApplicationServices, base.KeyServerHTTPClient())
userapi.AddInternalRoutes(base.InternalAPIMux, userAPI)
diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go
index 70e5279c..0df53e06 100644
--- a/cmd/dendritejs/main.go
+++ b/cmd/dendritejs/main.go
@@ -196,7 +196,9 @@ func main() {
accountDB := base.CreateAccountsDB()
deviceDB := base.CreateDeviceDB()
federation := createFederationClient(cfg, node)
- userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil)
+ keyAPI := keyserver.NewInternalAPI(base.Cfg, federation, base.KafkaProducer)
+ userAPI := userapi.NewInternalAPI(accountDB, deviceDB, cfg.Matrix.ServerName, nil, keyAPI)
+ keyAPI.SetUserAPI(userAPI)
fetcher := &libp2pKeyFetcher{}
keyRing := gomatrixserverlib.KeyRing{
@@ -233,7 +235,7 @@ func main() {
RoomserverAPI: rsAPI,
StateAPI: stateAPI,
UserAPI: userAPI,
- KeyAPI: keyserver.NewInternalAPI(base.Cfg, federation, userAPI, base.KafkaProducer),
+ KeyAPI: keyAPI,
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: p2pPublicRoomProvider,
}
diff --git a/keyserver/api/api.go b/keyserver/api/api.go
index 98bcd944..6795498f 100644
--- a/keyserver/api/api.go
+++ b/keyserver/api/api.go
@@ -19,9 +19,13 @@ import (
"encoding/json"
"strings"
"time"
+
+ userapi "github.com/matrix-org/dendrite/userapi/api"
)
type KeyInternalAPI interface {
+ // SetUserAPI assigns a user API to query when extracting device names.
+ SetUserAPI(i userapi.UserInternalAPI)
PerformUploadKeys(ctx context.Context, req *PerformUploadKeysRequest, res *PerformUploadKeysResponse)
// PerformClaimKeys claims one-time keys for use in pre-key messages
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go
index 70371353..480d1084 100644
--- a/keyserver/internal/internal.go
+++ b/keyserver/internal/internal.go
@@ -40,6 +40,10 @@ type KeyInternalAPI struct {
Producer *producers.KeyChange
}
+func (a *KeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) {
+ a.UserAPI = i
+}
+
func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
if req.Partition < 0 {
req.Partition = a.Producer.DefaultPartition()
@@ -272,6 +276,10 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU
var keysToStore []api.DeviceKeys
// assert that the user ID / device ID are not lying for each key
for _, key := range req.DeviceKeys {
+ if len(key.KeyJSON) == 0 {
+ keysToStore = append(keysToStore, key)
+ continue // deleted keys don't need sanity checking
+ }
gotUserID := gjson.GetBytes(key.KeyJSON, "user_id").Str
gotDeviceID := gjson.GetBytes(key.KeyJSON, "device_id").Str
if gotUserID == key.UserID && gotDeviceID == key.DeviceID {
@@ -286,6 +294,7 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU
),
})
}
+
// get existing device keys so we can check for changes
existingKeys := make([]api.DeviceKeys, len(keysToStore))
for i := range keysToStore {
@@ -358,7 +367,9 @@ func (a *KeyInternalAPI) emitDeviceKeyChanges(existing, new []api.DeviceKeys) er
for _, newKey := range new {
exists := false
for _, existingKey := range existing {
- if bytes.Equal(existingKey.KeyJSON, newKey.KeyJSON) {
+ // Do not treat the absence of keys as equal, or else we will not emit key changes
+ // when users delete devices which never had a key to begin with as both KeyJSONs are nil.
+ if bytes.Equal(existingKey.KeyJSON, newKey.KeyJSON) && len(existingKey.KeyJSON) > 0 {
exists = true
break
}
diff --git a/keyserver/inthttp/client.go b/keyserver/inthttp/client.go
index cd9cf70d..3f9690b5 100644
--- a/keyserver/inthttp/client.go
+++ b/keyserver/inthttp/client.go
@@ -21,6 +21,7 @@ import (
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver/api"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/opentracing/opentracing-go"
)
@@ -52,6 +53,10 @@ type httpKeyInternalAPI struct {
httpClient *http.Client
}
+func (h *httpKeyInternalAPI) SetUserAPI(i userapi.UserInternalAPI) {
+ // no-op: doesn't need it
+}
+
func (h *httpKeyInternalAPI) PerformClaimKeys(
ctx context.Context,
request *api.PerformClaimKeysRequest,
diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go
index c748d7ce..36bedf34 100644
--- a/keyserver/keyserver.go
+++ b/keyserver/keyserver.go
@@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/keyserver/inthttp"
"github.com/matrix-org/dendrite/keyserver/producers"
"github.com/matrix-org/dendrite/keyserver/storage"
- userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
)
@@ -37,7 +36,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(
- cfg *config.Dendrite, fedClient *gomatrixserverlib.FederationClient, userAPI userapi.UserInternalAPI, producer sarama.SyncProducer,
+ cfg *config.Dendrite, fedClient *gomatrixserverlib.FederationClient, producer sarama.SyncProducer,
) api.KeyInternalAPI {
db, err := storage.NewDatabase(
string(cfg.Database.E2EKey),
@@ -55,7 +54,6 @@ func NewInternalAPI(
DB: db,
ThisServer: cfg.Matrix.ServerName,
FedClient: fedClient,
- UserAPI: userAPI,
Producer: keyChangeProducer,
}
}
diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go
index c51d9f55..6035b67b 100644
--- a/keyserver/producers/keychange.go
+++ b/keyserver/producers/keychange.go
@@ -63,10 +63,11 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceKeys) error {
return err
}
logrus.WithFields(logrus.Fields{
- "user_id": key.UserID,
- "device_id": key.DeviceID,
- "partition": partition,
- "offset": offset,
+ "user_id": key.UserID,
+ "device_id": key.DeviceID,
+ "partition": partition,
+ "offset": offset,
+ "len_key_bytes": len(key.KeyJSON),
}).Infof("Produced to key change topic '%s'", p.Topic)
}
return nil
diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go
index 7a4fce6f..fade7522 100644
--- a/keyserver/storage/interface.go
+++ b/keyserver/storage/interface.go
@@ -32,7 +32,8 @@ type Database interface {
// DeviceKeysJSON populates the KeyJSON for the given keys. If any proided `keys` have a `KeyJSON` already then it will be replaced.
DeviceKeysJSON(ctx context.Context, keys []api.DeviceKeys) error
- // StoreDeviceKeys persists the given keys. Keys with the same user ID and device ID will be replaced.
+ // StoreDeviceKeys persists the given keys. Keys with the same user ID and device ID will be replaced. An empty KeyJSON removes the key
+ // for this (user, device).
// Returns an error if there was a problem storing the keys.
StoreDeviceKeys(ctx context.Context, keys []api.DeviceKeys) error
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index da4a5366..f8cdcd5c 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -35,6 +35,7 @@ type OutputRoomEventConsumer struct {
rsConsumer *internal.ContinualConsumer
db storage.Database
notifier *sync.Notifier
+ keyChanges *OutputKeyChangeEventConsumer
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -44,6 +45,7 @@ func NewOutputRoomEventConsumer(
n *sync.Notifier,
store storage.Database,
rsAPI api.RoomserverInternalAPI,
+ keyChanges *OutputKeyChangeEventConsumer,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
@@ -56,6 +58,7 @@ func NewOutputRoomEventConsumer(
db: store,
notifier: n,
rsAPI: rsAPI,
+ keyChanges: keyChanges,
}
consumer.ProcessMessage = s.onMessage
@@ -160,9 +163,29 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
+ s.notifyKeyChanges(&ev)
+
return nil
}
+func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.HeaderedEvent) {
+ if ev.Type() != gomatrixserverlib.MRoomMember || ev.StateKey() == nil {
+ return
+ }
+ membership, err := ev.Membership()
+ if err != nil {
+ return
+ }
+ switch membership {
+ case gomatrixserverlib.Join:
+ s.keyChanges.OnJoinEvent(ev)
+ case gomatrixserverlib.Ban:
+ fallthrough
+ case gomatrixserverlib.Leave:
+ s.keyChanges.OnLeaveEvent(ev)
+ }
+}
+
func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
) error {
diff --git a/syncapi/internal/keychange_test.go b/syncapi/internal/keychange_test.go
index 3f18696c..2c3d154d 100644
--- a/syncapi/internal/keychange_test.go
+++ b/syncapi/internal/keychange_test.go
@@ -10,6 +10,7 @@ import (
"github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -29,6 +30,8 @@ type mockKeyAPI struct{}
func (k *mockKeyAPI) PerformUploadKeys(ctx context.Context, req *keyapi.PerformUploadKeysRequest, res *keyapi.PerformUploadKeysResponse) {
}
+func (k *mockKeyAPI) SetUserAPI(i userapi.UserInternalAPI) {}
+
// PerformClaimKeys claims one-time keys for use in pre-key messages
func (k *mockKeyAPI) PerformClaimKeys(ctx context.Context, req *keyapi.PerformClaimKeysRequest, res *keyapi.PerformClaimKeysResponse) {
}
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 754cd502..5198d59b 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -64,8 +64,16 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI)
+ keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
+ cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent),
+ consumer, notifier, keyAPI, currentStateAPI, syncDB,
+ )
+ if err = keyChangeConsumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start key change consumer")
+ }
+
roomConsumer := consumers.NewOutputRoomEventConsumer(
- cfg, consumer, notifier, syncDB, rsAPI,
+ cfg, consumer, notifier, syncDB, rsAPI, keyChangeConsumer,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
@@ -92,13 +100,5 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
}
- keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
- cfg.Matrix.ServerName, string(cfg.Kafka.Topics.OutputKeyChangeEvent),
- consumer, notifier, keyAPI, currentStateAPI, syncDB,
- )
- if err = keyChangeConsumer.Start(); err != nil {
- logrus.WithError(err).Panicf("failed to start key change consumer")
- }
-
routing.Setup(router, requestPool, syncDB, userAPI, federation, rsAPI, cfg)
}
diff --git a/sytest-whitelist b/sytest-whitelist
index 341df8a9..03baf4d4 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -129,6 +129,7 @@ Can claim one time key using POST
Can claim remote one time key using POST
Local device key changes appear in v2 /sync
Local device key changes appear in /keys/changes
+Local delete device changes appear in v2 /sync
Get left notifs for other users in sync and /keys/changes when user leaves
Can add account data
Can add account data to room
diff --git a/userapi/api/api.go b/userapi/api/api.go
index 5791403f..5c964c4f 100644
--- a/userapi/api/api.go
+++ b/userapi/api/api.go
@@ -27,6 +27,7 @@ type UserInternalAPI interface {
InputAccountData(ctx context.Context, req *InputAccountDataRequest, res *InputAccountDataResponse) error
PerformAccountCreation(ctx context.Context, req *PerformAccountCreationRequest, res *PerformAccountCreationResponse) error
PerformDeviceCreation(ctx context.Context, req *PerformDeviceCreationRequest, res *PerformDeviceCreationResponse) error
+ PerformDeviceDeletion(ctx context.Context, req *PerformDeviceDeletionRequest, res *PerformDeviceDeletionResponse) error
QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error
@@ -47,6 +48,15 @@ type InputAccountDataRequest struct {
type InputAccountDataResponse struct {
}
+type PerformDeviceDeletionRequest struct {
+ UserID string
+ // The devices to delete
+ DeviceIDs []string
+}
+
+type PerformDeviceDeletionResponse struct {
+}
+
// QueryDeviceInfosRequest is the request to QueryDeviceInfos
type QueryDeviceInfosRequest struct {
DeviceIDs []string
diff --git a/userapi/internal/api.go b/userapi/internal/api.go
index 5b154196..738023dd 100644
--- a/userapi/internal/api.go
+++ b/userapi/internal/api.go
@@ -25,10 +25,12 @@ import (
"github.com/matrix-org/dendrite/clientapi/userutil"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/storage/accounts"
"github.com/matrix-org/dendrite/userapi/storage/devices"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
)
type UserInternalAPI struct {
@@ -37,6 +39,7 @@ type UserInternalAPI struct {
ServerName gomatrixserverlib.ServerName
// AppServices is the list of all registered AS
AppServices []config.ApplicationService
+ KeyAPI keyapi.KeyInternalAPI
}
func (a *UserInternalAPI) InputAccountData(ctx context.Context, req *api.InputAccountDataRequest, res *api.InputAccountDataResponse) error {
@@ -104,6 +107,42 @@ func (a *UserInternalAPI) PerformDeviceCreation(ctx context.Context, req *api.Pe
return nil
}
+func (a *UserInternalAPI) PerformDeviceDeletion(ctx context.Context, req *api.PerformDeviceDeletionRequest, res *api.PerformDeviceDeletionResponse) error {
+ util.GetLogger(ctx).WithField("user_id", req.UserID).WithField("devices", req.DeviceIDs).Info("PerformDeviceDeletion")
+ local, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
+ if err != nil {
+ return err
+ }
+ if domain != a.ServerName {
+ return fmt.Errorf("cannot PerformDeviceDeletion of remote users: got %s want %s", domain, a.ServerName)
+ }
+ err = a.DeviceDB.RemoveDevices(ctx, local, req.DeviceIDs)
+ if err != nil {
+ return err
+ }
+ // create empty device keys and upload them to delete what was once there and trigger device list changes
+ deviceKeys := make([]keyapi.DeviceKeys, len(req.DeviceIDs))
+ for i, did := range req.DeviceIDs {
+ deviceKeys[i] = keyapi.DeviceKeys{
+ UserID: req.UserID,
+ DeviceID: did,
+ KeyJSON: nil,
+ }
+ }
+
+ var uploadRes keyapi.PerformUploadKeysResponse
+ a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{
+ DeviceKeys: deviceKeys,
+ }, &uploadRes)
+ if uploadRes.Error != nil {
+ return fmt.Errorf("Failed to delete device keys: %v", uploadRes.Error)
+ }
+ if len(uploadRes.KeyErrors) > 0 {
+ return fmt.Errorf("Failed to delete device keys, key errors: %+v", uploadRes.KeyErrors)
+ }
+ return nil
+}
+
func (a *UserInternalAPI) QueryProfile(ctx context.Context, req *api.QueryProfileRequest, res *api.QueryProfileResponse) error {
local, domain, err := gomatrixserverlib.SplitID('@', req.UserID)
if err != nil {
diff --git a/userapi/inthttp/client.go b/userapi/inthttp/client.go
index 3e1ac066..47e2110f 100644
--- a/userapi/inthttp/client.go
+++ b/userapi/inthttp/client.go
@@ -30,6 +30,7 @@ const (
PerformDeviceCreationPath = "/userapi/performDeviceCreation"
PerformAccountCreationPath = "/userapi/performAccountCreation"
+ PerformDeviceDeletionPath = "/userapi/performDeviceDeletion"
QueryProfilePath = "/userapi/queryProfile"
QueryAccessTokenPath = "/userapi/queryAccessToken"
@@ -91,6 +92,18 @@ func (h *httpUserInternalAPI) PerformDeviceCreation(
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
+func (h *httpUserInternalAPI) PerformDeviceDeletion(
+ ctx context.Context,
+ request *api.PerformDeviceDeletionRequest,
+ response *api.PerformDeviceDeletionResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "PerformDeviceDeletion")
+ defer span.Finish()
+
+ apiURL := h.apiURL + PerformDeviceDeletionPath
+ return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
+
func (h *httpUserInternalAPI) QueryProfile(
ctx context.Context,
request *api.QueryProfileRequest,
diff --git a/userapi/inthttp/server.go b/userapi/inthttp/server.go
index d29f4d44..ebb9bf4e 100644
--- a/userapi/inthttp/server.go
+++ b/userapi/inthttp/server.go
@@ -52,6 +52,19 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
+ internalAPIMux.Handle(PerformDeviceDeletionPath,
+ httputil.MakeInternalAPI("performDeviceDeletion", func(req *http.Request) util.JSONResponse {
+ request := api.PerformDeviceDeletionRequest{}
+ response := api.PerformDeviceDeletionResponse{}
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ if err := s.PerformDeviceDeletion(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
internalAPIMux.Handle(QueryProfilePath,
httputil.MakeInternalAPI("queryProfile", func(req *http.Request) util.JSONResponse {
request := api.QueryProfileRequest{}
diff --git a/userapi/userapi.go b/userapi/userapi.go
index 7aadec06..c4ab90ba 100644
--- a/userapi/userapi.go
+++ b/userapi/userapi.go
@@ -17,6 +17,7 @@ package userapi
import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/config"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/userapi/internal"
"github.com/matrix-org/dendrite/userapi/inthttp"
@@ -34,12 +35,13 @@ func AddInternalRoutes(router *mux.Router, intAPI api.UserInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(accountDB accounts.Database, deviceDB devices.Database,
- serverName gomatrixserverlib.ServerName, appServices []config.ApplicationService) api.UserInternalAPI {
+ serverName gomatrixserverlib.ServerName, appServices []config.ApplicationService, keyAPI keyapi.KeyInternalAPI) api.UserInternalAPI {
return &internal.UserInternalAPI{
AccountDB: accountDB,
DeviceDB: deviceDB,
ServerName: serverName,
AppServices: appServices,
+ KeyAPI: keyAPI,
}
}
diff --git a/userapi/userapi_test.go b/userapi/userapi_test.go
index 163b10ec..dab1ec71 100644
--- a/userapi/userapi_test.go
+++ b/userapi/userapi_test.go
@@ -32,7 +32,7 @@ func MustMakeInternalAPI(t *testing.T) (api.UserInternalAPI, accounts.Database,
t.Fatalf("failed to create device DB: %s", err)
}
- return userapi.NewInternalAPI(accountDB, deviceDB, serverName, nil), accountDB, deviceDB
+ return userapi.NewInternalAPI(accountDB, deviceDB, serverName, nil, nil), accountDB, deviceDB
}
func TestQueryProfile(t *testing.T) {