aboutsummaryrefslogtreecommitdiff
path: root/federationapi/consumers
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-02-04 14:08:13 +0100
committerGitHub <noreply@github.com>2022-02-04 13:08:13 +0000
commit9de7efa0b095f40457f0e348632c77326dcb4a42 (patch)
treec543823b7d4ce25371b975a88bc5270c3a9d7352 /federationapi/consumers
parent532f445c4e31396fc3aa4f52e0e078cd499bc39a (diff)
Remove sarama/saramajetstream dependencies (#2138)
* Remove dependency on saramajetstream & sarama Signed-off-by: Till Faelligen <tfaelligen@gmail.com> * Remove internal.ContinualConsumer from federationapi * Remove internal.ContinualConsumer from syncapi * Remove internal.ContinualConsumer from keyserver * Move to new Prepare function * Remove saramajetstream & sarama dependency * Delete unneeded file * Remove duplicate import * Log error instead of silently irgnoring it * Move `OffsetNewest` and `OffsetOldest` into keyserver types, change them to be more sane values * Fix comments Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Diffstat (limited to 'federationapi/consumers')
-rw-r--r--federationapi/consumers/keychange.go78
-rw-r--r--federationapi/consumers/roomserver.go5
2 files changed, 42 insertions, 41 deletions
diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go
index 6a737d0a..1ec9f4c1 100644
--- a/federationapi/consumers/keychange.go
+++ b/federationapi/consumers/keychange.go
@@ -17,80 +17,73 @@ package consumers
import (
"context"
"encoding/json"
- "fmt"
- "github.com/Shopify/sarama"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage"
- "github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
// KeyChangeConsumer consumes events that originate in key server.
type KeyChangeConsumer struct {
ctx context.Context
- consumer *internal.ContinualConsumer
+ jetstream nats.JetStreamContext
+ durable string
db storage.Database
queues *queue.OutgoingQueues
serverName gomatrixserverlib.ServerName
rsAPI roomserverAPI.RoomserverInternalAPI
+ topic string
}
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
func NewKeyChangeConsumer(
process *process.ProcessContext,
cfg *config.KeyServer,
- kafkaConsumer sarama.Consumer,
+ js nats.JetStreamContext,
queues *queue.OutgoingQueues,
store storage.Database,
rsAPI roomserverAPI.RoomserverInternalAPI,
) *KeyChangeConsumer {
- c := &KeyChangeConsumer{
- ctx: process.Context(),
- consumer: &internal.ContinualConsumer{
- Process: process,
- ComponentName: "federationapi/keychange",
- Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
- Consumer: kafkaConsumer,
- PartitionStore: store,
- },
+ return &KeyChangeConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.TopicFor("FederationAPIKeyChangeConsumer"),
+ topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent),
queues: queues,
db: store,
serverName: cfg.Matrix.ServerName,
rsAPI: rsAPI,
}
- c.consumer.ProcessMessage = c.onMessage
-
- return c
}
// Start consuming from key servers
func (t *KeyChangeConsumer) Start() error {
- if err := t.consumer.Start(); err != nil {
- return fmt.Errorf("t.consumer.Start: %w", err)
- }
- return nil
+ return jetstream.JetStreamConsumer(
+ t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ )
}
// onMessage is called in response to a message received on the
// key change events topic from the key server.
-func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
+func (t *KeyChangeConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
var m api.DeviceMessage
- if err := json.Unmarshal(msg.Value, &m); err != nil {
+ if err := json.Unmarshal(msg.Data, &m); err != nil {
logrus.WithError(err).Errorf("failed to read device message from key change topic")
- return nil
+ return true
}
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
- return nil
+ return true
}
switch m.Type {
case api.TypeCrossSigningUpdate:
@@ -102,9 +95,9 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
}
-func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
+func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
if m.DeviceKeys == nil {
- return nil
+ return true
}
logger := logrus.WithField("user_id", m.UserID)
@@ -112,10 +105,10 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
_, originServerName, err := gomatrixserverlib.SplitID('@', m.UserID)
if err != nil {
logger.WithError(err).Error("Failed to extract domain from key change event")
- return nil
+ return true
}
if originServerName != t.serverName {
- return nil
+ return true
}
var queryRes roomserverAPI.QueryRoomsForUserResponse
@@ -125,13 +118,13 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
}, &queryRes)
if err != nil {
logger.WithError(err).Error("failed to calculate joined rooms for user")
- return nil
+ return true
}
// send this key change to all servers who share rooms with this user.
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
if err != nil {
logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in")
- return nil
+ return true
}
// Pack the EDU and marshal it
@@ -149,24 +142,26 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
Keys: m.KeyJSON,
}
if edu.Content, err = json.Marshal(event); err != nil {
- return err
+ logger.WithError(err).Error("failed to marshal EDU JSON")
+ return true
}
- logrus.Infof("Sending device list update message to %q", destinations)
- return t.queues.SendEDU(edu, t.serverName, destinations)
+ logger.Infof("Sending device list update message to %q", destinations)
+ err = t.queues.SendEDU(edu, t.serverName, destinations)
+ return err == nil
}
-func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
+func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
output := m.CrossSigningKeyUpdate
_, host, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
- return nil
+ return true
}
if host != gomatrixserverlib.ServerName(t.serverName) {
// Ignore any messages that didn't originate locally, otherwise we'll
// end up parroting information we received from other servers.
- return nil
+ return true
}
logger := logrus.WithField("user_id", output.UserID)
@@ -177,13 +172,13 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
}, &queryRes)
if err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined rooms for user")
- return nil
+ return true
}
// send this key change to all servers who share rooms with this user.
destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
if err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
- return nil
+ return true
}
// Pack the EDU and marshal it
@@ -193,11 +188,12 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
}
if edu.Content, err = json.Marshal(output); err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to marshal output, dropping")
- return nil
+ return true
}
logger.Infof("Sending cross-signing update message to %q", destinations)
- return t.queues.SendEDU(edu, t.serverName, destinations)
+ err = t.queues.SendEDU(edu, t.serverName, destinations)
+ return err == nil
}
func prevID(streamID int) []int {
diff --git a/federationapi/consumers/roomserver.go b/federationapi/consumers/roomserver.go
index ac29f930..e9862000 100644
--- a/federationapi/consumers/roomserver.go
+++ b/federationapi/consumers/roomserver.go
@@ -114,6 +114,11 @@ func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msg *nats.Msg)
}
}
+ case api.OutputTypeNewInviteEvent:
+ log.WithField("type", output.Type).Debug(
+ "received new invite, send device keys",
+ )
+
case api.OutputTypeNewInboundPeek:
if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
log.WithFields(log.Fields{