aboutsummaryrefslogtreecommitdiff
path: root/userapi/consumers
diff options
context:
space:
mode:
authorTill <2353100+S7evinK@users.noreply.github.com>2023-02-20 14:58:03 +0100
committerGitHub <noreply@github.com>2023-02-20 14:58:03 +0100
commit4594233f89f8531fca8f696ab0ece36909130c2a (patch)
tree18d3c451041423022e15ba5fcc4a778806ff94dc /userapi/consumers
parentbd6f0c14e56af71d83d703b7c91b8cf829ca560f (diff)
Merge keyserver & userapi (#2972)
As discussed yesterday, a first draft of merging the keyserver and the userapi.
Diffstat (limited to 'userapi/consumers')
-rw-r--r--userapi/consumers/clientapi.go4
-rw-r--r--userapi/consumers/devicelistupdate.go95
-rw-r--r--userapi/consumers/roomserver.go4
-rw-r--r--userapi/consumers/roomserver_test.go4
-rw-r--r--userapi/consumers/signingkeyupdate.go111
5 files changed, 212 insertions, 6 deletions
diff --git a/userapi/consumers/clientapi.go b/userapi/consumers/clientapi.go
index 42ae72e7..51bd2753 100644
--- a/userapi/consumers/clientapi.go
+++ b/userapi/consumers/clientapi.go
@@ -37,7 +37,7 @@ type OutputReceiptEventConsumer struct {
jetstream nats.JetStreamContext
durable string
topic string
- db storage.Database
+ db storage.UserDatabase
serverName gomatrixserverlib.ServerName
syncProducer *producers.SyncAPI
pgClient pushgateway.Client
@@ -49,7 +49,7 @@ func NewOutputReceiptEventConsumer(
process *process.ProcessContext,
cfg *config.UserAPI,
js nats.JetStreamContext,
- store storage.Database,
+ store storage.UserDatabase,
syncProducer *producers.SyncAPI,
pgClient pushgateway.Client,
) *OutputReceiptEventConsumer {
diff --git a/userapi/consumers/devicelistupdate.go b/userapi/consumers/devicelistupdate.go
new file mode 100644
index 00000000..a65889fc
--- /dev/null
+++ b/userapi/consumers/devicelistupdate.go
@@ -0,0 +1,95 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/userapi/internal"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
+
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+)
+
+// DeviceListUpdateConsumer consumes device list updates that came in over federation.
+type DeviceListUpdateConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ updater *internal.DeviceListUpdater
+ isLocalServerName func(gomatrixserverlib.ServerName) bool
+}
+
+// NewDeviceListUpdateConsumer creates a new DeviceListConsumer. Call Start() to begin consuming from key servers.
+func NewDeviceListUpdateConsumer(
+ process *process.ProcessContext,
+ cfg *config.UserAPI,
+ js nats.JetStreamContext,
+ updater *internal.DeviceListUpdater,
+) *DeviceListUpdateConsumer {
+ return &DeviceListUpdateConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.Prefixed("KeyServerInputDeviceListConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
+ updater: updater,
+ isLocalServerName: cfg.Matrix.IsLocalServerName,
+ }
+}
+
+// Start consuming from key servers
+func (t *DeviceListUpdateConsumer) Start() error {
+ return jetstream.JetStreamConsumer(
+ t.ctx, t.jetstream, t.topic, t.durable, 1,
+ t.onMessage, nats.DeliverAll(), nats.ManualAck(),
+ )
+}
+
+// onMessage is called in response to a message received on the
+// key change events topic from the key server.
+func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
+ var m gomatrixserverlib.DeviceListUpdateEvent
+ if err := json.Unmarshal(msg.Data, &m); err != nil {
+ logrus.WithError(err).Errorf("Failed to read from device list update input topic")
+ return true
+ }
+ origin := gomatrixserverlib.ServerName(msg.Header.Get("origin"))
+ if _, serverName, err := gomatrixserverlib.SplitID('@', m.UserID); err != nil {
+ return true
+ } else if t.isLocalServerName(serverName) {
+ return true
+ } else if serverName != origin {
+ return true
+ }
+
+ err := t.updater.Update(ctx, m)
+ if err != nil {
+ logrus.WithFields(logrus.Fields{
+ "user_id": m.UserID,
+ "device_id": m.DeviceID,
+ "stream_id": m.StreamID,
+ "prev_id": m.PrevID,
+ }).WithError(err).Errorf("Failed to update device list")
+ return false
+ }
+ return true
+}
diff --git a/userapi/consumers/roomserver.go b/userapi/consumers/roomserver.go
index 3ce5af62..47d33095 100644
--- a/userapi/consumers/roomserver.go
+++ b/userapi/consumers/roomserver.go
@@ -38,7 +38,7 @@ type OutputRoomEventConsumer struct {
rsAPI rsapi.UserRoomserverAPI
jetstream nats.JetStreamContext
durable string
- db storage.Database
+ db storage.UserDatabase
topic string
pgClient pushgateway.Client
syncProducer *producers.SyncAPI
@@ -53,7 +53,7 @@ func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.UserAPI,
js nats.JetStreamContext,
- store storage.Database,
+ store storage.UserDatabase,
pgClient pushgateway.Client,
rsAPI rsapi.UserRoomserverAPI,
syncProducer *producers.SyncAPI,
diff --git a/userapi/consumers/roomserver_test.go b/userapi/consumers/roomserver_test.go
index 39f4aab4..bc5ae652 100644
--- a/userapi/consumers/roomserver_test.go
+++ b/userapi/consumers/roomserver_test.go
@@ -18,11 +18,11 @@ import (
userAPITypes "github.com/matrix-org/dendrite/userapi/types"
)
-func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
+func mustCreateDatabase(t *testing.T, dbType test.DBType) (storage.UserDatabase, func()) {
base, baseclose := testrig.CreateBaseDendrite(t, dbType)
t.Helper()
connStr, close := test.PrepareDBConnectionString(t, dbType)
- db, err := storage.NewUserAPIDatabase(base, &config.DatabaseOptions{
+ db, err := storage.NewUserDatabase(base, &config.DatabaseOptions{
ConnectionString: config.DataSource(connStr),
}, "", 4, 0, 0, "")
if err != nil {
diff --git a/userapi/consumers/signingkeyupdate.go b/userapi/consumers/signingkeyupdate.go
new file mode 100644
index 00000000..f4ff017d
--- /dev/null
+++ b/userapi/consumers/signingkeyupdate.go
@@ -0,0 +1,111 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ "github.com/sirupsen/logrus"
+
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/userapi/api"
+)
+
+// SigningKeyUpdateConsumer consumes signing key updates that came in over federation.
+type SigningKeyUpdateConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ userAPI api.UploadDeviceKeysAPI
+ cfg *config.UserAPI
+ isLocalServerName func(gomatrixserverlib.ServerName) bool
+}
+
+// NewSigningKeyUpdateConsumer creates a new SigningKeyUpdateConsumer. Call Start() to begin consuming from key servers.
+func NewSigningKeyUpdateConsumer(
+ process *process.ProcessContext,
+ cfg *config.UserAPI,
+ js nats.JetStreamContext,
+ userAPI api.UploadDeviceKeysAPI,
+) *SigningKeyUpdateConsumer {
+ return &SigningKeyUpdateConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.Prefixed("KeyServerSigningKeyConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
+ userAPI: userAPI,
+ cfg: cfg,
+ isLocalServerName: cfg.Matrix.IsLocalServerName,
+ }
+}
+
+// Start consuming from key servers
+func (t *SigningKeyUpdateConsumer) Start() error {
+ return jetstream.JetStreamConsumer(
+ t.ctx, t.jetstream, t.topic, t.durable, 1,
+ t.onMessage, nats.DeliverAll(), nats.ManualAck(),
+ )
+}
+
+// onMessage is called in response to a message received on the
+// signing key update events topic from the key server.
+func (t *SigningKeyUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
+ var updatePayload api.CrossSigningKeyUpdate
+ if err := json.Unmarshal(msg.Data, &updatePayload); err != nil {
+ logrus.WithError(err).Errorf("Failed to read from signing key update input topic")
+ return true
+ }
+ origin := gomatrixserverlib.ServerName(msg.Header.Get("origin"))
+ if _, serverName, err := gomatrixserverlib.SplitID('@', updatePayload.UserID); err != nil {
+ logrus.WithError(err).Error("failed to split user id")
+ return true
+ } else if t.isLocalServerName(serverName) {
+ logrus.Warn("dropping device key update from ourself")
+ return true
+ } else if serverName != origin {
+ logrus.Warnf("dropping device key update, %s != %s", serverName, origin)
+ return true
+ }
+
+ keys := gomatrixserverlib.CrossSigningKeys{}
+ if updatePayload.MasterKey != nil {
+ keys.MasterKey = *updatePayload.MasterKey
+ }
+ if updatePayload.SelfSigningKey != nil {
+ keys.SelfSigningKey = *updatePayload.SelfSigningKey
+ }
+ uploadReq := &api.PerformUploadDeviceKeysRequest{
+ CrossSigningKeys: keys,
+ UserID: updatePayload.UserID,
+ }
+ uploadRes := &api.PerformUploadDeviceKeysResponse{}
+ if err := t.userAPI.PerformUploadDeviceKeys(ctx, uploadReq, uploadRes); err != nil {
+ logrus.WithError(err).Error("failed to upload device keys")
+ return false
+ }
+ if uploadRes.Error != nil {
+ logrus.WithError(uploadRes.Error).Error("failed to upload device keys")
+ return true
+ }
+
+ return true
+}