aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-02-04 14:08:13 +0100
committerGitHub <noreply@github.com>2022-02-04 13:08:13 +0000
commit9de7efa0b095f40457f0e348632c77326dcb4a42 (patch)
treec543823b7d4ce25371b975a88bc5270c3a9d7352 /syncapi
parent532f445c4e31396fc3aa4f52e0e078cd499bc39a (diff)
Remove sarama/saramajetstream dependencies (#2138)
* Remove dependency on saramajetstream & sarama Signed-off-by: Till Faelligen <tfaelligen@gmail.com> * Remove internal.ContinualConsumer from federationapi * Remove internal.ContinualConsumer from syncapi * Remove internal.ContinualConsumer from keyserver * Move to new Prepare function * Remove saramajetstream & sarama dependency * Delete unneeded file * Remove duplicate import * Log error instead of silently irgnoring it * Move `OffsetNewest` and `OffsetOldest` into keyserver types, change them to be more sane values * Fix comments Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/keychange.go83
-rw-r--r--syncapi/internal/keychange.go6
-rw-r--r--syncapi/storage/interface.go3
-rw-r--r--syncapi/syncapi.go6
4 files changed, 46 insertions, 52 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 97685cc0..e806f76e 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -18,84 +18,81 @@ import (
"context"
"encoding/json"
- "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
// OutputKeyChangeEventConsumer consumes events that originated in the key server.
type OutputKeyChangeEventConsumer struct {
- ctx context.Context
- keyChangeConsumer *internal.ContinualConsumer
- db storage.Database
- notifier *notifier.Notifier
- stream types.StreamProvider
- serverName gomatrixserverlib.ServerName // our server name
- rsAPI roomserverAPI.RoomserverInternalAPI
- keyAPI api.KeyInternalAPI
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ notifier *notifier.Notifier
+ stream types.StreamProvider
+ serverName gomatrixserverlib.ServerName // our server name
+ rsAPI roomserverAPI.RoomserverInternalAPI
+ keyAPI api.KeyInternalAPI
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
// Call Start() to begin consuming from the key server.
func NewOutputKeyChangeEventConsumer(
process *process.ProcessContext,
- serverName gomatrixserverlib.ServerName,
+ cfg *config.SyncAPI,
topic string,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
keyAPI api.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputKeyChangeEventConsumer {
-
- consumer := internal.ContinualConsumer{
- Process: process,
- ComponentName: "syncapi/keychange",
- Topic: topic,
- Consumer: kafkaConsumer,
- PartitionStore: store,
- }
-
s := &OutputKeyChangeEventConsumer{
- ctx: process.Context(),
- keyChangeConsumer: &consumer,
- db: store,
- serverName: serverName,
- keyAPI: keyAPI,
- rsAPI: rsAPI,
- notifier: notifier,
- stream: stream,
+ ctx: process.Context(),
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIKeyChangeConsumer"),
+ topic: topic,
+ db: store,
+ serverName: cfg.Matrix.ServerName,
+ keyAPI: keyAPI,
+ rsAPI: rsAPI,
+ notifier: notifier,
+ stream: stream,
}
- consumer.ProcessMessage = s.onMessage
-
return s
}
// Start consuming from the key server
func (s *OutputKeyChangeEventConsumer) Start() error {
- return s.keyChangeConsumer.Start()
+ return jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ )
}
-func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+func (s *OutputKeyChangeEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var m api.DeviceMessage
- if err := json.Unmarshal(msg.Value, &m); err != nil {
+ if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
- return nil
+ return true
}
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
- return nil
+ return true
}
switch m.Type {
case api.TypeCrossSigningUpdate:
@@ -107,9 +104,9 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}
}
-func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, deviceChangeID int64) error {
+func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, deviceChangeID int64) bool {
if m.DeviceKeys == nil {
- return nil
+ return true
}
output := m.DeviceKeys
// work out who we need to notify about the new key
@@ -120,7 +117,7 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d
if err != nil {
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err)
- return err
+ return true
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
@@ -131,10 +128,10 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
- return nil
+ return true
}
-func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, deviceChangeID int64) error {
+func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, deviceChangeID int64) bool {
output := m.CrossSigningKeyUpdate
// work out who we need to notify about the new key
var queryRes roomserverAPI.QuerySharedUsersResponse
@@ -144,7 +141,7 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage
if err != nil {
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err)
- return err
+ return true
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
@@ -155,5 +152,5 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
- return nil
+ return true
}
diff --git a/syncapi/internal/keychange.go b/syncapi/internal/keychange.go
index 41efd4a0..fa1064b7 100644
--- a/syncapi/internal/keychange.go
+++ b/syncapi/internal/keychange.go
@@ -18,8 +18,8 @@ import (
"context"
"strings"
- "github.com/Shopify/sarama"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ keytypes "github.com/matrix-org/dendrite/keyserver/types"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
@@ -64,8 +64,8 @@ func DeviceListCatchup(
}
// now also track users who we already share rooms with but who have updated their devices between the two tokens
- offset := sarama.OffsetOldest
- toOffset := sarama.OffsetNewest
+ offset := keytypes.OffsetOldest
+ toOffset := keytypes.OffsetNewest
if to > 0 && to > from {
toOffset = int64(to)
}
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index 9cff4cad..b464ad9c 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -19,7 +19,6 @@ import (
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
@@ -27,8 +26,6 @@ import (
)
type Database interface {
- internal.PartitionStorer
-
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 39bc233a..72462459 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -48,7 +48,7 @@ func AddPublicRoutes(
federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI,
) {
- js, consumer, _ := jetstream.Prepare(&cfg.Matrix.JetStream)
+ js := jetstream.Prepare(&cfg.Matrix.JetStream)
syncDB, err := storage.NewSyncServerDatasource(&cfg.Database)
if err != nil {
@@ -65,8 +65,8 @@ func AddPublicRoutes(
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
- process, cfg.Matrix.ServerName, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
- consumer, keyAPI, rsAPI, syncDB, notifier,
+ process, cfg, cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
+ js, keyAPI, rsAPI, syncDB, notifier,
streams.DeviceListStreamProvider,
)
if err = keyChangeConsumer.Start(); err != nil {