aboutsummaryrefslogtreecommitdiff
path: root/keyserver
diff options
context:
space:
mode:
Diffstat (limited to 'keyserver')
-rw-r--r--keyserver/api/api.go2
-rw-r--r--keyserver/consumers/cross_signing.go62
-rw-r--r--keyserver/keyserver.go4
-rw-r--r--keyserver/storage/interface.go5
-rw-r--r--keyserver/storage/postgres/key_changes_table.go5
-rw-r--r--keyserver/storage/sqlite3/key_changes_table.go5
-rw-r--r--keyserver/storage/storage_test.go6
-rw-r--r--keyserver/storage/tables/interface.go2
-rw-r--r--keyserver/types/storage.go13
9 files changed, 52 insertions, 52 deletions
diff --git a/keyserver/api/api.go b/keyserver/api/api.go
index 0eea2f0f..3933961c 100644
--- a/keyserver/api/api.go
+++ b/keyserver/api/api.go
@@ -228,7 +228,7 @@ type QueryKeyChangesRequest struct {
// The offset of the last received key event, or sarama.OffsetOldest if this is from the beginning
Offset int64
// The inclusive offset where to track key changes up to. Messages with this offset are included in the response.
- // Use sarama.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing).
+ // Use types.OffsetNewest if the offset is unknown (then check the response Offset to avoid racing).
ToOffset int64
}
diff --git a/keyserver/consumers/cross_signing.go b/keyserver/consumers/cross_signing.go
index 4b2bd4a9..a533006f 100644
--- a/keyserver/consumers/cross_signing.go
+++ b/keyserver/consumers/cross_signing.go
@@ -18,29 +18,30 @@ import (
"context"
"encoding/json"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
-
- "github.com/Shopify/sarama"
)
type OutputCrossSigningKeyUpdateConsumer struct {
- eduServerConsumer *internal.ContinualConsumer
- keyDB storage.Database
- keyAPI api.KeyInternalAPI
- serverName string
+ ctx context.Context
+ keyDB storage.Database
+ keyAPI api.KeyInternalAPI
+ serverName string
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
}
func NewOutputCrossSigningKeyUpdateConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
keyDB storage.Database,
keyAPI api.KeyInternalAPI,
) *OutputCrossSigningKeyUpdateConsumer {
@@ -48,60 +49,58 @@ func NewOutputCrossSigningKeyUpdateConsumer(
// topic. We will only produce events where the UserID matches our server name,
// and we will only consume events where the UserID does NOT match our server
// name (because the update came from a remote server).
- consumer := internal.ContinualConsumer{
- Process: process,
- ComponentName: "keyserver/keyserver",
- Topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
- Consumer: kafkaConsumer,
- PartitionStore: keyDB,
- }
s := &OutputCrossSigningKeyUpdateConsumer{
- eduServerConsumer: &consumer,
- keyDB: keyDB,
- keyAPI: keyAPI,
- serverName: string(cfg.Global.ServerName),
+ ctx: process.Context(),
+ keyDB: keyDB,
+ jetstream: js,
+ durable: cfg.Global.JetStream.Durable("KeyServerCrossSigningConsumer"),
+ topic: cfg.Global.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
+ keyAPI: keyAPI,
+ serverName: string(cfg.Global.ServerName),
}
- consumer.ProcessMessage = s.onMessage
return s
}
func (s *OutputCrossSigningKeyUpdateConsumer) Start() error {
- return s.eduServerConsumer.Start()
+ return jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ )
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
-func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var m api.DeviceMessage
- if err := json.Unmarshal(msg.Value, &m); err != nil {
+ if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
- return nil
+ return true
}
if m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
- return nil
+ return true
}
switch m.Type {
case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m)
default:
- return nil
+ return true
}
}
-func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
+func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
output := m.CrossSigningKeyUpdate
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
logrus.WithError(err).Errorf("eduserver output log: user ID parse failure")
- return nil
+ return true
}
if host == gomatrixserverlib.ServerName(s.serverName) {
// Ignore any messages that contain information about our own users, as
// they already originated from this server.
- return nil
+ return true
}
uploadReq := &api.PerformUploadDeviceKeysRequest{
UserID: output.UserID,
@@ -114,5 +113,8 @@ func (s *OutputCrossSigningKeyUpdateConsumer) onCrossSigningMessage(m api.Device
}
uploadRes := &api.PerformUploadDeviceKeysResponse{}
s.keyAPI.PerformUploadDeviceKeys(context.TODO(), uploadReq, uploadRes)
- return uploadRes.Error
+ if uploadRes.Error != nil {
+ return false
+ }
+ return true
}
diff --git a/keyserver/keyserver.go b/keyserver/keyserver.go
index 8cc50ea0..61ccc030 100644
--- a/keyserver/keyserver.go
+++ b/keyserver/keyserver.go
@@ -40,7 +40,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.KeyInternalAPI) {
func NewInternalAPI(
base *base.BaseDendrite, cfg *config.KeyServer, fedClient fedsenderapi.FederationClient,
) api.KeyInternalAPI {
- js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
+ js := jetstream.Prepare(&cfg.Matrix.JetStream)
db, err := storage.NewDatabase(&cfg.Database)
if err != nil {
@@ -66,7 +66,7 @@ func NewInternalAPI(
}()
keyconsumer := consumers.NewOutputCrossSigningKeyUpdateConsumer(
- base.ProcessContext, base.Cfg, consumer, db, ap,
+ base.ProcessContext, base.Cfg, js, db, ap,
)
if err := keyconsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start keyserver EDU server consumer")
diff --git a/keyserver/storage/interface.go b/keyserver/storage/interface.go
index 87feae47..0110860e 100644
--- a/keyserver/storage/interface.go
+++ b/keyserver/storage/interface.go
@@ -18,15 +18,12 @@ import (
"context"
"encoding/json"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
- internal.PartitionStorer
-
// ExistingOneTimeKeys returns a map of keyIDWithAlgorithm to key JSON for the given parameters. If no keys exist with this combination
// of user/device/key/algorithm 4-uple then it is omitted from the map. Returns an error when failing to communicate with the database.
ExistingOneTimeKeys(ctx context.Context, userID, deviceID string, keyIDsWithAlgorithms []string) (map[string]json.RawMessage, error)
@@ -71,7 +68,7 @@ type Database interface {
StoreKeyChange(ctx context.Context, userID string) (int64, error)
// KeyChanges returns a list of user IDs who have modified their keys from the offset given (exclusive) to the offset given (inclusive).
- // A to offset of sarama.OffsetNewest means no upper limit.
+ // A to offset of types.OffsetNewest means no upper limit.
// Returns the offset of the latest key change.
KeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)
diff --git a/keyserver/storage/postgres/key_changes_table.go b/keyserver/storage/postgres/key_changes_table.go
index 20d227c2..f93a94bd 100644
--- a/keyserver/storage/postgres/key_changes_table.go
+++ b/keyserver/storage/postgres/key_changes_table.go
@@ -17,9 +17,7 @@ package postgres
import (
"context"
"database/sql"
- "math"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@@ -78,9 +76,6 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID strin
func (s *keyChangesStatements) SelectKeyChanges(
ctx context.Context, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
- if toOffset == sarama.OffsetNewest {
- toOffset = math.MaxInt64
- }
latestOffset = fromOffset
rows, err := s.selectKeyChangesStmt.QueryContext(ctx, fromOffset, toOffset)
if err != nil {
diff --git a/keyserver/storage/sqlite3/key_changes_table.go b/keyserver/storage/sqlite3/key_changes_table.go
index d43c15ca..e035e8c9 100644
--- a/keyserver/storage/sqlite3/key_changes_table.go
+++ b/keyserver/storage/sqlite3/key_changes_table.go
@@ -17,9 +17,7 @@ package sqlite3
import (
"context"
"database/sql"
- "math"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/storage/tables"
)
@@ -76,9 +74,6 @@ func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID strin
func (s *keyChangesStatements) SelectKeyChanges(
ctx context.Context, fromOffset, toOffset int64,
) (userIDs []string, latestOffset int64, err error) {
- if toOffset == sarama.OffsetNewest {
- toOffset = math.MaxInt64
- }
latestOffset = fromOffset
rows, err := s.selectKeyChangesStmt.QueryContext(ctx, fromOffset, toOffset)
if err != nil {
diff --git a/keyserver/storage/storage_test.go b/keyserver/storage/storage_test.go
index 2f8cf809..c4c99d8c 100644
--- a/keyserver/storage/storage_test.go
+++ b/keyserver/storage/storage_test.go
@@ -9,8 +9,8 @@ import (
"reflect"
"testing"
- "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/keyserver/types"
"github.com/matrix-org/dendrite/setup/config"
)
@@ -50,7 +50,7 @@ func TestKeyChanges(t *testing.T) {
MustNotError(t, err)
deviceChangeIDC, err := db.StoreKeyChange(ctx, "@charlie:localhost")
MustNotError(t, err)
- userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, sarama.OffsetNewest)
+ userIDs, latest, err := db.KeyChanges(ctx, deviceChangeIDB, types.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
@@ -74,7 +74,7 @@ func TestKeyChangesNoDupes(t *testing.T) {
}
deviceChangeID, err := db.StoreKeyChange(ctx, "@alice:localhost")
MustNotError(t, err)
- userIDs, latest, err := db.KeyChanges(ctx, 0, sarama.OffsetNewest)
+ userIDs, latest, err := db.KeyChanges(ctx, 0, types.OffsetNewest)
if err != nil {
t.Fatalf("Failed to KeyChanges: %s", err)
}
diff --git a/keyserver/storage/tables/interface.go b/keyserver/storage/tables/interface.go
index 0d94c94c..e44757e1 100644
--- a/keyserver/storage/tables/interface.go
+++ b/keyserver/storage/tables/interface.go
@@ -46,7 +46,7 @@ type DeviceKeys interface {
type KeyChanges interface {
InsertKeyChange(ctx context.Context, userID string) (int64, error)
// SelectKeyChanges returns the set (de-duplicated) of users who have changed their keys between the two offsets.
- // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of sarama.OffsetNewest means no upper offset.
+ // Results are exclusive of fromOffset and inclusive of toOffset. A toOffset of types.OffsetNewest means no upper offset.
SelectKeyChanges(ctx context.Context, fromOffset, toOffset int64) (userIDs []string, latestOffset int64, err error)
Prepare() error
diff --git a/keyserver/types/storage.go b/keyserver/types/storage.go
index 3480ec65..7fb90454 100644
--- a/keyserver/types/storage.go
+++ b/keyserver/types/storage.go
@@ -14,7 +14,18 @@
package types
-import "github.com/matrix-org/gomatrixserverlib"
+import (
+ "math"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const (
+ // OffsetNewest tells e.g. the database to get the most current data
+ OffsetNewest int64 = math.MaxInt64
+ // OffsetOldest tells e.g. the database to get the oldest data
+ OffsetOldest int64 = 0
+)
// KeyTypePurposeToInt maps a purpose to an integer, which is used in the
// database to reduce the amount of space taken up by this column.