aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-02-08 18:13:38 +0100
committerGitHub <noreply@github.com>2022-02-08 18:13:38 +0100
commit2771d93748380aa7dc21adca0ef690348d79f002 (patch)
tree54230605e12fb77c5d9712d23a759a80489d1d17
parent457a07eac5d668a0f04c273e086d321cab7ea640 (diff)
Remove OutputKeyChangeEvent consumer on keyserver (#2160)
* Remove keyserver consumer * Remove keyserver from eduserver * Directly upload device keys without eduserver * Add passing tests
-rw-r--r--eduserver/api/input.go6
-rw-r--r--eduserver/eduserver.go1
-rw-r--r--eduserver/input/input.go31
-rw-r--r--eduserver/inthttp/client.go20
-rw-r--r--eduserver/inthttp/server.go13
-rw-r--r--federationapi/consumers/roomserver.go1
-rw-r--r--federationapi/routing/send.go44
-rw-r--r--keyserver/consumers/cross_signing.go123
-rw-r--r--keyserver/internal/cross_signing.go50
-rw-r--r--keyserver/keyserver.go8
-rw-r--r--syncapi/internal/keychange.go2
-rw-r--r--sytest-whitelist1
12 files changed, 59 insertions, 241 deletions
diff --git a/eduserver/api/input.go b/eduserver/api/input.go
index 2fa253f4..2aab107b 100644
--- a/eduserver/api/input.go
+++ b/eduserver/api/input.go
@@ -100,10 +100,4 @@ type EDUServerInputAPI interface {
request *InputReceiptEventRequest,
response *InputReceiptEventResponse,
) error
-
- InputCrossSigningKeyUpdate(
- ctx context.Context,
- request *InputCrossSigningKeyUpdateRequest,
- response *InputCrossSigningKeyUpdateResponse,
- ) error
}
diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go
index febcf286..9b7e2165 100644
--- a/eduserver/eduserver.go
+++ b/eduserver/eduserver.go
@@ -51,7 +51,6 @@ func NewInternalAPI(
OutputTypingEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
- OutputKeyChangeEventTopic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
ServerName: cfg.Matrix.ServerName,
}
}
diff --git a/eduserver/input/input.go b/eduserver/input/input.go
index 4f8ab3e3..e58f0dd3 100644
--- a/eduserver/input/input.go
+++ b/eduserver/input/input.go
@@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache"
- keyapi "github.com/matrix-org/dendrite/keyserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
@@ -40,8 +39,6 @@ type EDUServerInputAPI struct {
OutputSendToDeviceEventTopic string
// The kafka topic to output new receipt events to
OutputReceiptEventTopic string
- // The kafka topic to output new key change events to
- OutputKeyChangeEventTopic string
// kafka producer
JetStream nats.JetStreamContext
// Internal user query API
@@ -80,34 +77,6 @@ func (t *EDUServerInputAPI) InputSendToDeviceEvent(
return t.sendToDeviceEvent(ise)
}
-// InputCrossSigningKeyUpdate implements api.EDUServerInputAPI
-func (t *EDUServerInputAPI) InputCrossSigningKeyUpdate(
- ctx context.Context,
- request *api.InputCrossSigningKeyUpdateRequest,
- response *api.InputCrossSigningKeyUpdateResponse,
-) error {
- eventJSON, err := json.Marshal(&keyapi.DeviceMessage{
- Type: keyapi.TypeCrossSigningUpdate,
- OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{
- CrossSigningKeyUpdate: request.CrossSigningKeyUpdate,
- },
- })
- if err != nil {
- return err
- }
-
- logrus.WithFields(logrus.Fields{
- "user_id": request.UserID,
- }).Tracef("Producing to topic '%s'", t.OutputKeyChangeEventTopic)
-
- _, err = t.JetStream.PublishMsg(&nats.Msg{
- Subject: t.OutputKeyChangeEventTopic,
- Header: nats.Header{},
- Data: eventJSON,
- })
- return err
-}
-
func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
ev := &api.TypingEvent{
Type: gomatrixserverlib.MTyping,
diff --git a/eduserver/inthttp/client.go b/eduserver/inthttp/client.go
index 9a6f483c..0690ed82 100644
--- a/eduserver/inthttp/client.go
+++ b/eduserver/inthttp/client.go
@@ -12,10 +12,9 @@ import (
// HTTP paths for the internal HTTP APIs
const (
- EDUServerInputTypingEventPath = "/eduserver/input"
- EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
- EDUServerInputReceiptEventPath = "/eduserver/receipt"
- EDUServerInputCrossSigningKeyUpdatePath = "/eduserver/crossSigningKeyUpdate"
+ EDUServerInputTypingEventPath = "/eduserver/input"
+ EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
+ EDUServerInputReceiptEventPath = "/eduserver/receipt"
)
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
@@ -69,16 +68,3 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent(
apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
-
-// InputCrossSigningKeyUpdate implements EDUServerInputAPI
-func (h *httpEDUServerInputAPI) InputCrossSigningKeyUpdate(
- ctx context.Context,
- request *api.InputCrossSigningKeyUpdateRequest,
- response *api.InputCrossSigningKeyUpdateResponse,
-) error {
- span, ctx := opentracing.StartSpanFromContext(ctx, "InputCrossSigningKeyUpdate")
- defer span.Finish()
-
- apiURL := h.eduServerURL + EDUServerInputCrossSigningKeyUpdatePath
- return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
-}
diff --git a/eduserver/inthttp/server.go b/eduserver/inthttp/server.go
index a50ca84f..a3494375 100644
--- a/eduserver/inthttp/server.go
+++ b/eduserver/inthttp/server.go
@@ -51,17 +51,4 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
- internalAPIMux.Handle(EDUServerInputCrossSigningKeyUpdatePath,
- httputil.MakeInternalAPI("inputCrossSigningKeyUpdate", func(req *http.Request) util.JSONResponse {
- var request api.InputCrossSigningKeyUpdateRequest
- var response api.InputCrossSigningKeyUpdateResponse
- if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
- return util.MessageResponse(http.StatusBadRequest, err.Error())
- }
- if err := t.InputCrossSigningKeyUpdate(req.Context(), &request, &response); err != nil {
- return util.ErrorResponse(err)
- }
- return util.JSONResponse{Code: http.StatusOK, JSON: &response}
- }),
- )
}
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index e9862000..60066bb2 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"fmt"
-
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/types"
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index 524fd510..dd4fe13a 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -382,20 +382,8 @@ func (t *txnReq) processEDUs(ctx context.Context) {
}
}
case eduserverAPI.MSigningKeyUpdate:
- var updatePayload eduserverAPI.CrossSigningKeyUpdate
- if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
- util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
- "user_id": updatePayload.UserID,
- }).Debug("Failed to send signing key update to edu server")
- continue
- }
- inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{
- CrossSigningKeyUpdate: updatePayload,
- }
- inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{}
- if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil {
- util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal cross-signing update")
- continue
+ if err := t.processSigningKeyUpdate(ctx, e); err != nil {
+ logrus.WithError(err).Errorf("Failed to process signing key update")
}
default:
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
@@ -403,6 +391,34 @@ func (t *txnReq) processEDUs(ctx context.Context) {
}
}
+func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error {
+ var updatePayload eduserverAPI.CrossSigningKeyUpdate
+ if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
+ util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
+ "user_id": updatePayload.UserID,
+ }).Debug("Failed to unmarshal signing key update")
+ return err
+ }
+
+ keys := gomatrixserverlib.CrossSigningKeys{}
+ if updatePayload.MasterKey != nil {
+ keys.MasterKey = *updatePayload.MasterKey
+ }
+ if updatePayload.SelfSigningKey != nil {
+ keys.SelfSigningKey = *updatePayload.SelfSigningKey
+ }
+ uploadReq := &keyapi.PerformUploadDeviceKeysRequest{
+ CrossSigningKeys: keys,
+ UserID: updatePayload.UserID,
+ }
+ uploadRes := &keyapi.PerformUploadDeviceKeysResponse{}
+ t.keyAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes)
+ if uploadRes.Error != nil {
+ return uploadRes.Error
+ }
+ return nil
+}
+
// processReceiptEvent sends receipt events to the edu server
func (t *txnReq) processReceiptEvent(ctx context.Context,
userID, roomID, receiptType string,
diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go
deleted file mode 100644
index aae69e96..00000000
--- a/keyserver/consumers/cross_signing.go
+++ /dev/null
@@ -1,123 +0,0 @@
-// Copyright 2021 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package consumers
-
-import (
- "context"
- "encoding/json"
-
- "github.com/matrix-org/dendrite/keyserver/api"
- "github.com/matrix-org/dendrite/keyserver/storage"
- "github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/setup/process"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/nats-io/nats.go"
- "github.com/sirupsen/logrus"
-)
-
-type OutputCrossSigningKeyUpdateConsumer struct {
- ctx context.Context
- keyDB storage.Database
- keyAPI api.KeyInternalAPI
- serverName string
- jetstream nats.JetStreamContext
- durable string
- topic string
-}
-
-func NewOutputCrossSigningKeyUpdateConsumer(
- process *process.ProcessContext,
- cfg *config.Dendrite,
- js nats.JetStreamContext,
- keyDB storage.Database,
- keyAPI api.KeyInternalAPI,
-) *OutputCrossSigningKeyUpdateConsumer {
- // The keyserver both produces and consumes on the TopicOutputKeyChangeEvent
- // topic. We will only produce events where the UserID matches our server name,
- // and we will only consume events where the UserID does NOT match our server
- // name (because the update came from a remote server).
- s := &OutputCrossSigningKeyUpdateConsumer{
- ctx: process.Context(),
- keyDB: keyDB,
- jetstream: js,
- durable: cfg.Global.JetStream.Durable("KeyServerCrossSigningConsumer"),
- topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
- keyAPI: keyAPI,
- serverName: string(cfg.Global.ServerName),
- }
-
- return s
-}
-
-func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
- return jetstream.JetStreamConsumer(
- s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
- nats.DeliverAll(), nats.ManualAck(),
- )
-}
-
-// onMessage is called in response to a message received on the
-// key change events topic from the key server.
-func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
- var m api.DeviceMessage
- if err := json.Unmarshal(msg.Data, &m); err != nil {
- logrus.WithError(err).Errorf("failed to read device message from key change topic")
- return true
- }
- if m.OutputCrossSigningKeyUpdate == nil {
- // This probably shouldn't happen but stops us from panicking if we come
- // across an update that doesn't satisfy either types.
- return true
- }
- switch m.Type {
- case api.TypeCrossSigningUpdate:
- return t.onCrossSigningMessage(m)
- default:
- return true
- }
-}
-
-func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
- output := m.CrossSigningKeyUpdate
- _, host, err := gomatrixserverlib.SplitID('@', output.UserID)
- if err != nil {
- logrus.WithError(err).Errorf("eduserver output log: user ID parse failure")
- return true
- }
- if host == gomatrixserverlib.ServerName(s.serverName) {
- // Ignore any messages that contain information about our own users, as
- // they already originated from this server.
- return true
- }
- uploadReq := &api.PerformUploadDeviceKeysRequest{
- UserID: output.UserID,
- }
- if output.MasterKey != nil {
- uploadReq.MasterKey = *output.MasterKey
- }
- if output.SelfSigningKey != nil {
- uploadReq.SelfSigningKey = *output.SelfSigningKey
- }
- uploadRes := &api.PerformUploadDeviceKeysResponse{}
- s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes)
- if uploadRes.Error != nil {
- // If the error is due to a missing or invalid parameter then we'd might
- // as well just acknowledge the message, because otherwise otherwise we'll
- // just keep getting delivered a faulty message over and over again.
- return uploadRes.Error.IsMissingParam || uploadRes.Error.IsInvalidParam
- }
- return true
-}
diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go
index 1e1871b8..527990cf 100644
--- a/keyserver/internal/cross_signing.go
+++ b/keyserver/internal/cross_signing.go
@@ -219,25 +219,23 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
}
// Finally, generate a notification that we updated the keys.
- if _, host, err := gomatrixserverlib.SplitID('@', req.UserID); err == nil && host == a.ThisServer {
- update := eduserverAPI.CrossSigningKeyUpdate{
- UserID: req.UserID,
- }
- if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok {
- update.MasterKey = &mk
- }
- if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok {
- update.SelfSigningKey = &ssk
- }
- if update.MasterKey == nil && update.SelfSigningKey == nil {
- return
- }
- if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
- res.Error = &api.KeyError{
- Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
- }
- return
+ update := eduserverAPI.CrossSigningKeyUpdate{
+ UserID: req.UserID,
+ }
+ if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok {
+ update.MasterKey = &mk
+ }
+ if ssk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeSelfSigning]; ok {
+ update.SelfSigningKey = &ssk
+ }
+ if update.MasterKey == nil && update.SelfSigningKey == nil {
+ return
+ }
+ if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
+ res.Error = &api.KeyError{
+ Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
}
+ return
}
}
@@ -310,16 +308,14 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
// Finally, generate a notification that we updated the signatures.
for userID := range req.Signatures {
- if _, host, err := gomatrixserverlib.SplitID('@', userID); err == nil && host == a.ThisServer {
- update := eduserverAPI.CrossSigningKeyUpdate{
- UserID: userID,
- }
- if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
- res.Error = &api.KeyError{
- Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
- }
- return
+ update := eduserverAPI.CrossSigningKeyUpdate{
+ UserID: userID,
+ }
+ if err := a.Producer.ProduceSigningKeyUpdate(update); err != nil {
+ res.Error = &api.KeyError{
+ Err: fmt.Sprintf("a.Producer.ProduceSigningKeyUpdate: %s", err),
}
+ return
}
}
}
diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go
index 61ccc030..bd36fd9f 100644
--- a/keyserver/keyserver.go
+++ b/keyserver/keyserver.go
@@ -18,7 +18,6 @@ import (
"github.com/gorilla/mux"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api"
- "github.com/matrix-org/dendrite/keyserver/consumers"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp"
"github.com/matrix-org/dendrite/keyserver/producers"
@@ -65,12 +64,5 @@ func NewInternalAPI(
}
}()
- keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
- base.ProcessContext, base.Cfg, js, db, ap,
- )
- if err := keyconsumer.Start(); err != nil {
- logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer")
- }
-
return ap
}
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index fa1064b7..37a9e2d3 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -282,6 +282,8 @@ func membershipEvents(res *types.Response) (joinUserIDs, leaveUserIDs []string)
if ev.Type == gomatrixserverlib.MRoomMember && ev.StateKey != nil {
if strings.Contains(string(ev.Content), `"join"`) {
joinUserIDs = append(joinUserIDs, *ev.StateKey)
+ } else if strings.Contains(string(ev.Content), `"invite"`) {
+ joinUserIDs = append(joinUserIDs, *ev.StateKey)
} else if strings.Contains(string(ev.Content), `"leave"`) {
leaveUserIDs = append(leaveUserIDs, *ev.StateKey)
} else if strings.Contains(string(ev.Content), `"ban"`) {
diff --git a/sytest-whitelist b/sytest-whitelist
index 7d26c610..c6ce1daa 100644
--- a/sytest-whitelist
+++ b/sytest-whitelist
@@ -590,3 +590,4 @@ Can reject invites over federation for rooms with version 9
Can receive redactions from regular users over federation in room version 9
Forward extremities remain so even after the next events are populated as outliers
If a device list update goes missing, the server resyncs on the next one
+uploading self-signing key notifies over federation