aboutsummaryrefslogtreecommitdiff
path: root/federationapi/consumers/keychange.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi/consumers/keychange.go')
-rw-r--r--federationapi/consumers/keychange.go16
1 files changed, 11 insertions, 5 deletions
diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go
index a8ae0894..8231fcf4 100644
--- a/federationapi/consumers/keychange.go
+++ b/federationapi/consumers/keychange.go
@@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
@@ -34,6 +35,7 @@ import (
// KeyChangeConsumer consumes events that originate in key server.
type KeyChangeConsumer struct {
+ ctx context.Context
consumer *internal.ContinualConsumer
db storage.Database
queues *queue.OutgoingQueues
@@ -51,10 +53,11 @@ func NewKeyChangeConsumer(
rsAPI roomserverAPI.RoomserverInternalAPI,
) *KeyChangeConsumer {
c := &KeyChangeConsumer{
+ ctx: process.Context(),
consumer: &internal.ContinualConsumer{
Process: process,
ComponentName: "federationapi/keychange",
- Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
+ Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputKeyChangeEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
},
@@ -100,6 +103,9 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
+ if m.DeviceKeys == nil {
+ return nil
+ }
logger := logrus.WithField("user_id", m.UserID)
// only send key change events which originated from us
@@ -113,7 +119,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
}
var queryRes roomserverAPI.QueryRoomsForUserResponse
- err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{
+ err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{
UserID: m.UserID,
WantMembership: "join",
}, &queryRes)
@@ -122,7 +128,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
return nil
}
// send this key change to all servers who share rooms with this user.
- destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs)
+ destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs)
if err != nil {
logger.WithError(err).Error("failed to calculate joined hosts for rooms user is in")
return nil
@@ -165,7 +171,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
logger := logrus.WithField("user_id", output.UserID)
var queryRes roomserverAPI.QueryRoomsForUserResponse
- err = t.rsAPI.QueryRoomsForUser(context.Background(), &roomserverAPI.QueryRoomsForUserRequest{
+ err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{
UserID: output.UserID,
WantMembership: "join",
}, &queryRes)
@@ -174,7 +180,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) error {
return nil
}
// send this key change to all servers who share rooms with this user.
- destinations, err := t.db.GetJoinedHostsForRooms(context.Background(), queryRes.RoomIDs)
+ destinations, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs)
if err != nil {
logger.WithError(err).Error("fedsender key change consumer: failed to calculate joined hosts for rooms user is in")
return nil