aboutsummaryrefslogtreecommitdiff
path: root/keyserver/internal
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-23 16:41:36 +0100
committerGitHub <noreply@github.com>2020-07-23 16:41:36 +0100
commit98f2f09bb46f8bd126214f7874065d6b311bdeba (patch)
tree89108a41fe96e5d0a6dbd4f1d5e91c0b5263fc2e /keyserver/internal
parent7b862384a779f067f07ffeb2151856f89d372732 (diff)
keyserver: produce key change events (#1218)
* Produce kafka events when keys are added * Consume key changes in syncapi with TODO markers for handling them and catching up * unbreak tests * Linting
Diffstat (limited to 'keyserver/internal')
-rw-r--r--keyserver/internal/internal.go25
1 files changed, 22 insertions, 3 deletions
diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go
index 174a72dc..d3a6d4ba 100644
--- a/keyserver/internal/internal.go
+++ b/keyserver/internal/internal.go
@@ -23,6 +23,7 @@ import (
"time"
"github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/keyserver/producers"
"github.com/matrix-org/dendrite/keyserver/storage"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -36,6 +37,7 @@ type KeyInternalAPI struct {
ThisServer gomatrixserverlib.ServerName
FedClient *gomatrixserverlib.FederationClient
UserAPI userapi.UserInternalAPI
+ Producer *producers.KeyChange
}
func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
@@ -290,7 +292,10 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU
}
return
}
- a.emitDeviceKeyChanges(existingKeys, keysToStore)
+ err := a.emitDeviceKeyChanges(existingKeys, keysToStore)
+ if err != nil {
+ util.GetLogger(ctx).Errorf("Failed to emitDeviceKeyChanges: %s", err)
+ }
}
func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
@@ -332,6 +337,20 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
}
-func (a *KeyInternalAPI) emitDeviceKeyChanges(existing, new []api.DeviceKeys) {
- // TODO
+func (a *KeyInternalAPI) emitDeviceKeyChanges(existing, new []api.DeviceKeys) error {
+ // find keys in new that are not in existing
+ var keysAdded []api.DeviceKeys
+ for _, newKey := range new {
+ exists := false
+ for _, existingKey := range existing {
+ if bytes.Equal(existingKey.KeyJSON, newKey.KeyJSON) {
+ exists = true
+ break
+ }
+ }
+ if !exists {
+ keysAdded = append(keysAdded, newKey)
+ }
+ }
+ return a.Producer.ProduceKeyChanges(keysAdded)
}