aboutsummaryrefslogtreecommitdiff
path: root/keyserver
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-07-23 16:41:36 +0100
committerGitHub <noreply@github.com>2020-07-23 16:41:36 +0100
commit98f2f09bb46f8bd126214f7874065d6b311bdeba (patch)
tree89108a41fe96e5d0a6dbd4f1d5e91c0b5263fc2e /keyserver
parent7b862384a779f067f07ffeb2151856f89d372732 (diff)
keyserver: produce key change events (#1218)
* Produce kafka events when keys are added * Consume key changes in syncapi with TODO markers for handling them and catching up * unbreak tests * Linting
Diffstat (limited to 'keyserver')
-rw-r--r--keyserver/internal/internal.go25
-rw-r--r--keyserver/keyserver.go11
-rw-r--r--keyserver/producers/keychange.go57
3 files changed, 89 insertions, 4 deletions
diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go
index 174a72dc..d3a6d4ba 100644
--- a/keyserver/internal/internal.go
+++ b/keyserver/internal/internal.go
@@ -23,6 +23,7 @@ import (
"time"
"github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/keyserver/producers"
"github.com/matrix-org/dendrite/keyserver/storage"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -36,6 +37,7 @@ type KeyInternalAPI struct {
ThisServer gomatrixserverlib.ServerName
FedClient *gomatrixserverlib.FederationClient
UserAPI userapi.UserInternalAPI
+ Producer *producers.KeyChange
}
func (a *KeyInternalAPI) PerformUploadKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
@@ -290,7 +292,10 @@ func (a *KeyInternalAPI) uploadDeviceKeys(ctx context.Context, req *api.PerformU
}
return
}
- a.emitDeviceKeyChanges(existingKeys, keysToStore)
+ err := a.emitDeviceKeyChanges(existingKeys, keysToStore)
+ if err != nil {
+ util.GetLogger(ctx).Errorf("Failed to emitDeviceKeyChanges: %s", err)
+ }
}
func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
@@ -332,6 +337,20 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
}
-func (a *KeyInternalAPI) emitDeviceKeyChanges(existing, new []api.DeviceKeys) {
- // TODO
+func (a *KeyInternalAPI) emitDeviceKeyChanges(existing, new []api.DeviceKeys) error {
+ // find keys in new that are not in existing
+ var keysAdded []api.DeviceKeys
+ for _, newKey := range new {
+ exists := false
+ for _, existingKey := range existing {
+ if bytes.Equal(existingKey.KeyJSON, newKey.KeyJSON) {
+ exists = true
+ break
+ }
+ }
+ if !exists {
+ keysAdded = append(keysAdded, newKey)
+ }
+ }
+ return a.Producer.ProduceKeyChanges(keysAdded)
}
diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go
index 2e1ddb6c..47c6a8c3 100644
--- a/keyserver/keyserver.go
+++ b/keyserver/keyserver.go
@@ -15,11 +15,13 @@
package keyserver
import (
+ "github.com/Shopify/sarama"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp"
+ "github.com/matrix-org/dendrite/keyserver/producers"
"github.com/matrix-org/dendrite/keyserver/storage"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -34,7 +36,9 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
// NewInternalAPI returns a concerete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
-func NewInternalAPI(cfg *config.Dendrite, fedClient *gomatrixserverlib.FederationClient, userAPI userapi.UserInternalAPI) api.KeyInternalAPI {
+func NewInternalAPI(
+ cfg *config.Dendrite, fedClient *gomatrixserverlib.FederationClient, userAPI userapi.UserInternalAPI, producer sarama.SyncProducer,
+) api.KeyInternalAPI {
db, err := storage.NewDatabase(
string(cfg.Database.E2EKey),
cfg.DbProperties(),
@@ -42,10 +46,15 @@ func NewInternalAPI(cfg *config.Dendrite, fedClient *gomatrixserverlib.Federatio
if err != nil {
logrus.WithError(err).Panicf("failed to connect to key server database")
}
+ keyChangeProducer := &producers.KeyChange{
+ Topic: string(cfg.Kafka.Topics.OutputKeyChangeEvent),
+ Producer: producer,
+ }
return &internal.KeyInternalAPI{
DB: db,
ThisServer: cfg.Matrix.ServerName,
FedClient: fedClient,
UserAPI: userAPI,
+ Producer: keyChangeProducer,
}
}
diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go
new file mode 100644
index 00000000..6683a936
--- /dev/null
+++ b/keyserver/producers/keychange.go
@@ -0,0 +1,57 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "encoding/json"
+
+ "github.com/Shopify/sarama"
+ "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/sirupsen/logrus"
+)
+
+// KeyChange produces key change events for the sync API and federation sender to consume
+type KeyChange struct {
+ Topic string
+ Producer sarama.SyncProducer
+}
+
+// ProduceKeyChanges creates new change events for each key
+func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceKeys) error {
+ for _, key := range keys {
+ var m sarama.ProducerMessage
+
+ value, err := json.Marshal(key)
+ if err != nil {
+ return err
+ }
+
+ m.Topic = string(p.Topic)
+ m.Key = sarama.StringEncoder(key.UserID)
+ m.Value = sarama.ByteEncoder(value)
+
+ partition, offset, err := p.Producer.SendMessage(&m)
+ if err != nil {
+ return err
+ }
+ logrus.WithFields(logrus.Fields{
+ "user_id": key.UserID,
+ "device_id": key.DeviceID,
+ "partition": partition,
+ "offset": offset,
+ }).Infof("Produced to key change topic '%s'", p.Topic)
+ }
+ return nil
+}