aboutsummaryrefslogtreecommitdiff
path: root/syncapi/consumers/keychange.go
diff options
context:
space:
mode:
Diffstat (limited to 'syncapi/consumers/keychange.go')
-rw-r--r--syncapi/consumers/keychange.go83
1 files changed, 40 insertions, 43 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 97685cc0..e806f76e 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -18,84 +18,81 @@ import (
"context"
"encoding/json"
- "github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
// OutputKeyChangeEventConsumer consumes events that originated in the key server.
type OutputKeyChangeEventConsumer struct {
- ctx context.Context
- keyChangeConsumer *internal.ContinualConsumer
- db storage.Database
- notifier *notifier.Notifier
- stream types.StreamProvider
- serverName gomatrixserverlib.ServerName // our server name
- rsAPI roomserverAPI.RoomserverInternalAPI
- keyAPI api.KeyInternalAPI
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ notifier *notifier.Notifier
+ stream types.StreamProvider
+ serverName gomatrixserverlib.ServerName // our server name
+ rsAPI roomserverAPI.RoomserverInternalAPI
+ keyAPI api.KeyInternalAPI
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
// Call Start() to begin consuming from the key server.
func NewOutputKeyChangeEventConsumer(
process *process.ProcessContext,
- serverName gomatrixserverlib.ServerName,
+ cfg *config.SyncAPI,
topic string,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
keyAPI api.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputKeyChangeEventConsumer {
-
- consumer := internal.ContinualConsumer{
- Process: process,
- ComponentName: "syncapi/keychange",
- Topic: topic,
- Consumer: kafkaConsumer,
- PartitionStore: store,
- }
-
s := &OutputKeyChangeEventConsumer{
- ctx: process.Context(),
- keyChangeConsumer: &consumer,
- db: store,
- serverName: serverName,
- keyAPI: keyAPI,
- rsAPI: rsAPI,
- notifier: notifier,
- stream: stream,
+ ctx: process.Context(),
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIKeyChangeConsumer"),
+ topic: topic,
+ db: store,
+ serverName: cfg.Matrix.ServerName,
+ keyAPI: keyAPI,
+ rsAPI: rsAPI,
+ notifier: notifier,
+ stream: stream,
}
- consumer.ProcessMessage = s.onMessage
-
return s
}
// Start consuming from the key server
func (s *OutputKeyChangeEventConsumer) Start() error {
- return s.keyChangeConsumer.Start()
+ return jetstream.JetStreamConsumer(
+ s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ )
}
-func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+func (s *OutputKeyChangeEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var m api.DeviceMessage
- if err := json.Unmarshal(msg.Value, &m); err != nil {
+ if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
- return nil
+ return true
}
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
- return nil
+ return true
}
switch m.Type {
case api.TypeCrossSigningUpdate:
@@ -107,9 +104,9 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}
}
-func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, deviceChangeID int64) error {
+func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, deviceChangeID int64) bool {
if m.DeviceKeys == nil {
- return nil
+ return true
}
output := m.DeviceKeys
// work out who we need to notify about the new key
@@ -120,7 +117,7 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d
if err != nil {
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err)
- return err
+ return true
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
@@ -131,10 +128,10 @@ func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, d
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
- return nil
+ return true
}
-func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, deviceChangeID int64) error {
+func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage, deviceChangeID int64) bool {
output := m.CrossSigningKeyUpdate
// work out who we need to notify about the new key
var queryRes roomserverAPI.QuerySharedUsersResponse
@@ -144,7 +141,7 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage
if err != nil {
logrus.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
sentry.CaptureException(err)
- return err
+ return true
}
// make sure we get our own key updates too!
queryRes.UserIDsToCount[output.UserID] = 1
@@ -155,5 +152,5 @@ func (s *OutputKeyChangeEventConsumer) onCrossSigningMessage(m api.DeviceMessage
s.notifier.OnNewKeyChange(types.StreamingToken{DeviceListPosition: posUpdate}, userID, output.UserID)
}
- return nil
+ return true
}