aboutsummaryrefslogtreecommitdiff
path: root/keyserver
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-06-15 14:27:07 +0100
committerGitHub <noreply@github.com>2022-06-15 14:27:07 +0100
commit7120eb6bc943af6f725b0c61cfd110330f04064a (patch)
treea7e278689e44b36eeefe01cd7b8ecb247119c940 /keyserver
parent1b90cc95367947fa00616b4426d0c894b33c9862 (diff)
Add `InputDeviceListUpdate` to the keyserver, remove old input API (#2536)
* Add `InputDeviceListUpdate` to the keyserver, remove old input API * Fix copyright * Log more information when a device list update fails
Diffstat (limited to 'keyserver')
-rw-r--r--keyserver/api/api.go10
-rw-r--r--keyserver/consumers/devicelistupdate.go82
-rw-r--r--keyserver/internal/internal.go11
-rw-r--r--keyserver/inthttp/client.go14
-rw-r--r--keyserver/inthttp/server.go11
-rw-r--r--keyserver/keyserver.go10
6 files changed, 91 insertions, 47 deletions
diff --git a/keyserver/api/api.go b/keyserver/api/api.go
index 140f0356..c0a1eedb 100644
--- a/keyserver/api/api.go
+++ b/keyserver/api/api.go
@@ -62,8 +62,6 @@ type FederationKeyAPI interface {
QueryKeys(ctx context.Context, req *QueryKeysRequest, res *QueryKeysResponse)
QuerySignatures(ctx context.Context, req *QuerySignaturesRequest, res *QuerySignaturesResponse)
QueryDeviceMessages(ctx context.Context, req *QueryDeviceMessagesRequest, res *QueryDeviceMessagesResponse)
- // InputDeviceListUpdate from a federated server EDU
- InputDeviceListUpdate(ctx context.Context, req *InputDeviceListUpdateRequest, res *InputDeviceListUpdateResponse)
PerformUploadDeviceKeys(ctx context.Context, req *PerformUploadDeviceKeysRequest, res *PerformUploadDeviceKeysResponse)
PerformClaimKeys(ctx context.Context, req *PerformClaimKeysRequest, res *PerformClaimKeysResponse)
}
@@ -337,11 +335,3 @@ type QuerySignaturesResponse struct {
// The request error, if any
Error *KeyError
}
-
-type InputDeviceListUpdateRequest struct {
- Event gomatrixserverlib.DeviceListUpdateEvent
-}
-
-type InputDeviceListUpdateResponse struct {
- Error *KeyError
-}
diff --git a/keyserver/consumers/devicelistupdate.go b/keyserver/consumers/devicelistupdate.go
new file mode 100644
index 00000000..f4f24628
--- /dev/null
+++ b/keyserver/consumers/devicelistupdate.go
@@ -0,0 +1,82 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/keyserver/internal"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
+)
+
+// DeviceListUpdateConsumer consumes device list updates that came in over federation.
+type DeviceListUpdateConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ updater *internal.DeviceListUpdater
+}
+
+// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers.
+func NewDeviceListUpdateConsumer(
+ process *process.ProcessContext,
+ cfg *config.KeyServer,
+ js nats.JetStreamContext,
+ updater *internal.DeviceListUpdater,
+) *DeviceListUpdateConsumer {
+ return &DeviceListUpdateConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
+ updater: updater,
+ }
+}
+
+// Start consuming from key servers
+func (t *DeviceListUpdateConsumer) Start() error {
+ return jetstream.JetStreamConsumer(
+ t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ )
+}
+
+// onMessage is called in response to a message received on the
+// key change events topic from the key server.
+func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+ var m gomatrixserverlib.DeviceListUpdateEvent
+ if err := json.Unmarshal(msg.Data, &m); err != nil {
+ logrus.WithError(err).Errorf("Failed to read from device list update input topic")
+ return true
+ }
+ err := t.updater.Update(ctx, m)
+ if err != nil {
+ logrus.WithFields(logrus.Fields{
+ "user_id": m.UserID,
+ "device_id": m.DeviceID,
+ "stream_id": m.StreamID,
+ "prev_id": m.PrevID,
+ }).WithError(err).Errorf("Failed to update device list")
+ return false
+ }
+ return true
+}
diff --git a/keyserver/internal/internal.go b/keyserver/internal/internal.go
index f8d0d69c..c146b2aa 100644
--- a/keyserver/internal/internal.go
+++ b/keyserver/internal/internal.go
@@ -47,17 +47,6 @@ func (a *KeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) {
a.UserAPI = i
}
-func (a *KeyInternalAPI) InputDeviceListUpdate(
- ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse,
-) {
- err := a.Updater.Update(ctx, req.Event)
- if err != nil {
- res.Error = &api.KeyError{
- Err: fmt.Sprintf("failed to update device list: %s", err),
- }
- }
-}
-
func (a *KeyInternalAPI) QueryKeyChanges(ctx context.Context, req *api.QueryKeyChangesRequest, res *api.QueryKeyChangesResponse) {
userIDs, latest, err := a.DB.KeyChanges(ctx, req.Offset, req.ToOffset)
if err != nil {
diff --git a/keyserver/inthttp/client.go b/keyserver/inthttp/client.go
index abce8158..dac61d1e 100644
--- a/keyserver/inthttp/client.go
+++ b/keyserver/inthttp/client.go
@@ -63,20 +63,6 @@ type httpKeyInternalAPI struct {
func (h *httpKeyInternalAPI) SetUserAPI(i userapi.KeyserverUserAPI) {
// no-op: doesn't need it
}
-func (h *httpKeyInternalAPI) InputDeviceListUpdate(
- ctx context.Context, req *api.InputDeviceListUpdateRequest, res *api.InputDeviceListUpdateResponse,
-) {
- span, ctx := opentracing.StartSpanFromContext(ctx, "InputDeviceListUpdate")
- defer span.Finish()
-
- apiURL := h.apiURL + InputDeviceListUpdatePath
- err := httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
- if err != nil {
- res.Error = &api.KeyError{
- Err: err.Error(),
- }
- }
-}
func (h *httpKeyInternalAPI) PerformClaimKeys(
ctx context.Context,
diff --git a/keyserver/inthttp/server.go b/keyserver/inthttp/server.go
index 8d557a76..5bf5976a 100644
--- a/keyserver/inthttp/server.go
+++ b/keyserver/inthttp/server.go
@@ -25,17 +25,6 @@ import (
)
func AddRoutes(internalAPIMux *mux.Router, s api.KeyInternalAPI) {
- internalAPIMux.Handle(InputDeviceListUpdatePath,
- httputil.MakeInternalAPI("inputDeviceListUpdate", func(req *http.Request) util.JSONResponse {
- request := api.InputDeviceListUpdateRequest{}
- response := api.InputDeviceListUpdateResponse{}
- if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
- return util.MessageResponse(http.StatusBadRequest, err.Error())
- }
- s.InputDeviceListUpdate(req.Context(), &request, &response)
- return util.JSONResponse{Code: http.StatusOK, JSON: &response}
- }),
- )
internalAPIMux.Handle(PerformClaimKeysPath,
httputil.MakeInternalAPI("performClaimKeys", func(req *http.Request) util.JSONResponse {
request := api.PerformClaimKeysRequest{}
diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go
index 3ffd3ba1..cd506f98 100644
--- a/keyserver/keyserver.go
+++ b/keyserver/keyserver.go
@@ -18,6 +18,7 @@ import (
"github.com/gorilla/mux"
fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/keyserver/consumers"
"github.com/matrix-org/dendrite/keyserver/internal"
"github.com/matrix-org/dendrite/keyserver/inthttp"
"github.com/matrix-org/dendrite/keyserver/producers"
@@ -59,10 +60,17 @@ func NewInternalAPI(
updater := internal.NewDeviceListUpdater(db, ap, keyChangeProducer, fedClient, 8) // 8 workers TODO: configurable
ap.Updater = updater
go func() {
- if err := updater.Start(); err != nil {
+ if err = updater.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start device list updater")
}
}()
+ dlConsumer := consumers.NewDeviceListUpdateConsumer(
+ base.ProcessContext, cfg, js, updater,
+ )
+ if err = dlConsumer.Start(); err != nil {
+ logrus.WithError(err).Panic("failed to start device list consumer")
+ }
+
return ap
}