diff options
author | S7evinK <2353100+S7evinK@users.noreply.github.com> | 2022-02-08 18:13:38 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-02-08 18:13:38 +0100 |
commit | 2771d93748380aa7dc21adca0ef690348d79f002 (patch) | |
tree | 54230605e12fb77c5d9712d23a759a80489d1d17 /keyserver | |
parent | 457a07eac5d668a0f04c273e086d321cab7ea640 (diff) |
Remove OutputKeyChangeEvent consumer on keyserver (#2160)
* Remove keyserver consumer
* Remove keyserver from eduserver
* Directly upload device keys without eduserver
* Add passing tests
Diffstat (limited to 'keyserver')
-rw-r--r-- | keyserver/consumers/cross_signing.go | 123 | ||||
-rw-r--r-- | keyserver/internal/cross_signing.go | 50 | ||||
-rw-r--r-- | keyserver/keyserver.go | 8 |
3 files changed, 23 insertions, 158 deletions
diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go deleted file mode 100644 index aae69e96..00000000 --- a/keyserver/consumers/cross_signing.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2021 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package consumers - -import ( - "context" - "encoding/json" - - "github.com/matrix-org/dendrite/keyserver/api" - "github.com/matrix-org/dendrite/keyserver/storage" - "github.com/matrix-org/dendrite/setup/config" - "github.com/matrix-org/dendrite/setup/jetstream" - "github.com/matrix-org/dendrite/setup/process" - "github.com/matrix-org/gomatrixserverlib" - "github.com/nats-io/nats.go" - "github.com/sirupsen/logrus" -) - -type OutputCrossSigningKeyUpdateConsumer struct { - ctx context.Context - keyDB storage.Database - keyAPI api.KeyInternalAPI - serverName string - jetstream nats.JetStreamContext - durable string - topic string -} - -func NewOutputCrossSigningKeyUpdateConsumer( - process *process.ProcessContext, - cfg *config.Dendrite, - js nats.JetStreamContext, - keyDB storage.Database, - keyAPI api.KeyInternalAPI, -) *OutputCrossSigningKeyUpdateConsumer { - // The keyserver both produces and consumes on the TopicOutputKeyChangeEvent - // topic. We will only produce events where the UserID matches our server name, - // and we will only consume events where the UserID does NOT match our server - // name (because the update came from a remote server). - s := &OutputCrossSigningKeyUpdateConsumer{ - ctx: process.Context(), - keyDB: keyDB, - jetstream: js, - durable: cfg.Global.JetStream.Durable("KeyServerCrossSigningConsumer"), - topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent), - keyAPI: keyAPI, - serverName: string(cfg.Global.ServerName), - } - - return s -} - -func (s *OutputCrossSigningKeyUpdateConsumer) Start() error { - return jetstream.JetStreamConsumer( - s.ctx, s.jetstream, s.topic, s.durable, s.onMessage, - nats.DeliverAll(), nats.ManualAck(), - ) -} - -// onMessage is called in response to a message received on the -// key change events topic from the key server. -func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool { - var m api.DeviceMessage - if err := json.Unmarshal(msg.Data, &m); err != nil { - logrus.WithError(err).Errorf("failed to read device message from key change topic") - return true - } - if m.OutputCrossSigningKeyUpdate == nil { - // This probably shouldn't happen but stops us from panicking if we come - // across an update that doesn't satisfy either types. - return true - } - switch m.Type { - case api.TypeCrossSigningUpdate: - return t.onCrossSigningMessage(m) - default: - return true - } -} - -func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool { - output := m.CrossSigningKeyUpdate - _, host, err := gomatrixserverlib.SplitID('@', output.UserID) - if err != nil { - logrus.WithError(err).Errorf("eduserver output log: user ID parse failure") - return true - } - if host == gomatrixserverlib.ServerName(s.serverName) { - // Ignore any messages that contain information about our own users, as - // they already originated from this server. - return true - } - uploadReq := &api.PerformUploadDeviceKeysRequest{ - UserID: output.UserID, - } - if output.MasterKey != nil { - uploadReq.MasterKey = *output.MasterKey - } - if output.SelfSigningKey != nil { - uploadReq.SelfSigningKey = *output.SelfSigningKey - } - uploadRes := &api.PerformUploadDeviceKeysResponse{} - s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes) - if uploadRes.Error != nil { - // If the error is due to a missing or invalid parameter then we'd might - // as well just acknowledge the message, because otherwise otherwise we'll - // just keep getting delivered a faulty message over and over again. - return uploadRes.Error.IsMissingParam || uploadRes.Error.IsInvalidParam - } - return true -} diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go index 1e1871b8..527990cf 100644 --- a/keyserver/internal/cross_signing.go +++ b/keyserver/internal/cross_signing.go @@ -219,25 +219,23 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P } // Finally, generate a notification that we updated the keys. - if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer { - update := eduserverAPI.CrossSigningKeyUpdate{ - UserID: req.UserID, - } - if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { - update.MasterKey = &mk - } - if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok { - update.SelfSigningKey = &ssk - } - if update.MasterKey == nil && update.SelfSigningKey == nil { - return - } - if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), - } - return + update := eduserverAPI.CrossSigningKeyUpdate{ + UserID: req.UserID, + } + if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok { + update.MasterKey = &mk + } + if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok { + update.SelfSigningKey = &ssk + } + if update.MasterKey == nil && update.SelfSigningKey == nil { + return + } + if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), } + return } } @@ -310,16 +308,14 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req // Finally, generate a notification that we updated the signatures. for userID := range req.Signatures { - if _, host, err := gomatrixserverlib.SplitID('@', userID); err == nil && host == a.ThisServer { - update := eduserverAPI.CrossSigningKeyUpdate{ - UserID: userID, - } - if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { - res.Error = &api.KeyError{ - Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), - } - return + update := eduserverAPI.CrossSigningKeyUpdate{ + UserID: userID, + } + if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil { + res.Error = &api.KeyError{ + Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err), } + return } } } diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go index 61ccc030..bd36fd9f 100644 --- a/keyserver/keyserver.go +++ b/keyserver/keyserver.go @@ -18,7 +18,6 @@ import ( "github.com/gorilla/mux" fedsenderapi "github.com/matrix-org/dendrite/federationapi/api" "github.com/matrix-org/dendrite/keyserver/api" - "github.com/matrix-org/dendrite/keyserver/consumers" "github.com/matrix-org/dendrite/keyserver/internal" "github.com/matrix-org/dendrite/keyserver/inthttp" "github.com/matrix-org/dendrite/keyserver/producers" @@ -65,12 +64,5 @@ func NewInternalAPI( } }() - keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer( - base.ProcessContext, base.Cfg, js, db, ap, - ) - if err := keyconsumer.Start(); err != nil { - logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer") - } - return ap } |