aboutsummaryrefslogtreecommitdiff
path: root/keyserver
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-02-08 18:13:38 +0100
committerGitHub <noreply@github.com>2022-02-08 18:13:38 +0100
commit2771d93748380aa7dc21adca0ef690348d79f002 (patch)
tree54230605e12fb77c5d9712d23a759a80489d1d17 /keyserver
parent457a07eac5d668a0f04c273e086d321cab7ea640 (diff)
Remove OutputKeyChangeEvent consumer on keyserver (#2160)
* Remove keyserver consumer * Remove keyserver from eduserver * Directly upload device keys without eduserver * Add passing tests
Diffstat (limited to 'keyserver')
-rw-r--r--keyserver/consumers/cross_signing.go123
-rw-r--r--keyserver/internal/cross_signing.go50
-rw-r--r--keyserver/keyserver.go8
3 files changed, 23 insertions, 158 deletions
diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go
deleted file mode 100644
index aae69e96..00000000
--- a/keyserver/consumers/cross_signing.go
+++ /dev/null
@@ -1,123 +0,0 @@
-// Copyright 2021 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package consumers
-
-import (
- "context"
- "encoding/json"
-
- "github.com/matrix-org/dendrite/keyserver/api"
- "github.com/matrix-org/dendrite/keyserver/storage"
- "github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/setup/process"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/nats-io/nats.go"
- "github.com/sirupsen/logrus"
-)
-
-type OutputCrossSigningKeyUpdateConsumer struct {
- ctx context.Context
- keyDB storage.Database
- keyAPI api.KeyInternalAPI
- serverName string
- jetstream nats.JetStreamContext
- durable string
- topic string
-}
-
-func NewOutputCrossSigningKeyUpdateConsumer(
- process *process.ProcessContext,
- cfg *config.Dendrite,
- js nats.JetStreamContext,
- keyDB storage.Database,
- keyAPI api.KeyInternalAPI,
-) *OutputCrossSigningKeyUpdateConsumer {
- // The keyserver both produces and consumes on the TopicOutputKeyChangeEvent
- // topic. We will only produce events where the UserID matches our server name,
- // and we will only consume events where the UserID does NOT match our server
- // name (because the update came from a remote server).
- s := &OutputCrossSigningKeyUpdateConsumer{
- ctx: process.Context(),
- keyDB: keyDB,
- jetstream: js,
- durable: cfg.Global.JetStream.Durable("KeyServerCrossSigningConsumer"),
- topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
- keyAPI: keyAPI,
- serverName: string(cfg.Global.ServerName),
- }
-
- return s
-}
-
-func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
- return jetstream.JetStreamConsumer(
- s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
- nats.DeliverAll(), nats.ManualAck(),
- )
-}
-
-// onMessage is called in response to a message received on the
-// key change events topic from the key server.
-func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
- var m api.DeviceMessage
- if err := json.Unmarshal(msg.Data, &m); err != nil {
- logrus.WithError(err).Errorf("failed to read device message from key change topic")
- return true
- }
- if m.OutputCrossSigningKeyUpdate == nil {
- // This probably shouldn't happen but stops us from panicking if we come
- // across an update that doesn't satisfy either types.
- return true
- }
- switch m.Type {
- case api.TypeCrossSigningUpdate:
- return t.onCrossSigningMessage(m)
- default:
- return true
- }
-}
-
-func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
- output := m.CrossSigningKeyUpdate
- _, host, err := gomatrixserverlib.SplitID('@', output.UserID)
- if err != nil {
- logrus.WithError(err).Errorf("eduserver output log: user ID parse failure")
- return true
- }
- if host == gomatrixserverlib.ServerName(s.serverName) {
- // Ignore any messages that contain information about our own users, as
- // they already originated from this server.
- return true
- }
- uploadReq := &api.PerformUploadDeviceKeysRequest{
- UserID: output.UserID,
- }
- if output.MasterKey != nil {
- uploadReq.MasterKey = *output.MasterKey
- }
- if output.SelfSigningKey != nil {
- uploadReq.SelfSigningKey = *output.SelfSigningKey
- }
- uploadRes := &api.PerformUploadDeviceKeysResponse{}
- s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes)
- if uploadRes.Error != nil {
- // If the error is due to a missing or invalid parameter then we'd might
- // as well just acknowledge the message, because otherwise otherwise we'll
- // just keep getting delivered a faulty message over and over again.
- return uploadRes.Error.IsMissingParam || uploadRes.Error.IsInvalidParam
- }
- return true
-}
diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go
index 1e1871b8..527990cf 100644
--- a/keyserver/internal/cross_signing.go
+++ b/keyserver/internal/cross_signing.go
@@ -219,25 +219,23 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
}
// Finally, generate a notification that we updated the keys.
- if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer {
- update := eduserverAPI.CrossSigningKeyUpdate{
- UserID: req.UserID,
- }
- if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok {
- update.MasterKey = &mk
- }
- if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok {
- update.SelfSigningKey = &ssk
- }
- if update.MasterKey == nil && update.SelfSigningKey == nil {
- return
- }
- if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
- res.Error = &api.KeyError{
- Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
- }
- return
+ update := eduserverAPI.CrossSigningKeyUpdate{
+ UserID: req.UserID,
+ }
+ if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok {
+ update.MasterKey = &mk
+ }
+ if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok {
+ update.SelfSigningKey = &ssk
+ }
+ if update.MasterKey == nil && update.SelfSigningKey == nil {
+ return
+ }
+ if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
+ res.Error = &api.KeyError{
+ Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
}
+ return
}
}
@@ -310,16 +308,14 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
// Finally, generate a notification that we updated the signatures.
for userID := range req.Signatures {
- if _, host, err := gomatrixserverlib.SplitID('@', userID); err == nil && host == a.ThisServer {
- update := eduserverAPI.CrossSigningKeyUpdate{
- UserID: userID,
- }
- if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
- res.Error = &api.KeyError{
- Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
- }
- return
+ update := eduserverAPI.CrossSigningKeyUpdate{
+ UserID: userID,
+ }
+ if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
+ res.Error = &api.KeyError{
+ Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
}
+ return
}
}
}
diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go
index 61ccc030..bd36fd9f 100644
--- a/keyserver/keyserver.go
+++ b/keyserver/keyserver.go
@@ -18,7 +18,6 @@ import (
"github.com/gorilla/mux"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api"
- "github.com/matrix-org/dendrite/keyserver/consumers"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp"
"github.com/matrix-org/dendrite/keyserver/producers"
@@ -65,12 +64,5 @@ func NewInternalAPI(
}
}()
- keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
- base.ProcessContext, base.Cfg, js, db, ap,
- )
- if err := keyconsumer.Start(); err != nil {
- logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer")
- }
-
return ap
}