aboutsummaryrefslogtreecommitdiff
path: root/federationapi/consumers
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-10-26 14:52:33 +0100
committerNeil Alexander <neilalexander@users.noreply.github.com>2022-10-26 14:52:33 +0100
commit5298dd1133948606172944b7e5ec3805ccc72644 (patch)
tree2506042156aaafa3727ff6261e20c19768c088ea /federationapi/consumers
parentf6dea712d2e9c71f6ebe61f90e45a142852432e8 (diff)
Update federation API consumers
Diffstat (limited to 'federationapi/consumers')
-rw-r--r--federationapi/consumers/keychange.go44
-rw-r--r--federationapi/consumers/presence.go10
-rw-r--r--federationapi/consumers/receipts.go34
-rw-r--r--federationapi/consumers/sendtodevice.go36
-rw-r--r--federationapi/consumers/typing.go32
5 files changed, 78 insertions, 78 deletions
diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go
index 67dfdc1d..7d1ae0f8 100644
--- a/federationapi/consumers/keychange.go
+++ b/federationapi/consumers/keychange.go
@@ -35,14 +35,14 @@ import (
// KeyChangeConsumer consumes events that originate in key server.
type KeyChangeConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- db storage.Database
- queues *queue.OutgoingQueues
- serverName gomatrixserverlib.ServerName
- rsAPI roomserverAPI.FederationRoomserverAPI
- topic string
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ db storage.Database
+ queues *queue.OutgoingQueues
+ isLocalServerName func(gomatrixserverlib.ServerName) bool
+ rsAPI roomserverAPI.FederationRoomserverAPI
+ topic string
}
// NewKeyChangeConsumer creates a new KeyChangeConsumer. Call Start() to begin consuming from key servers.
@@ -55,14 +55,14 @@ func NewKeyChangeConsumer(
rsAPI roomserverAPI.FederationRoomserverAPI,
) *KeyChangeConsumer {
return &KeyChangeConsumer{
- ctx: process.Context(),
- jetstream: js,
- durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"),
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
- queues: queues,
- db: store,
- serverName: cfg.Matrix.ServerName,
- rsAPI: rsAPI,
+ ctx: process.Context(),
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.Prefixed("FederationAPIKeyChangeConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputKeyChangeEvent),
+ queues: queues,
+ db: store,
+ isLocalServerName: cfg.Matrix.IsLocalServerName,
+ rsAPI: rsAPI,
}
}
@@ -112,7 +112,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
logger.WithError(err).Error("Failed to extract domain from key change event")
return true
}
- if originServerName != t.serverName {
+ if !t.isLocalServerName(originServerName) {
return true
}
@@ -141,7 +141,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
// Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MDeviceListUpdate,
- Origin: string(t.serverName),
+ Origin: string(originServerName),
}
event := gomatrixserverlib.DeviceListUpdateEvent{
UserID: m.UserID,
@@ -159,7 +159,7 @@ func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) bool {
}
logger.Debugf("Sending device list update message to %q", destinations)
- err = t.queues.SendEDU(edu, t.serverName, destinations)
+ err = t.queues.SendEDU(edu, originServerName, destinations)
return err == nil
}
@@ -171,7 +171,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
logrus.WithError(err).Errorf("fedsender key change consumer: user ID parse failure")
return true
}
- if host != gomatrixserverlib.ServerName(t.serverName) {
+ if !t.isLocalServerName(host) {
// Ignore any messages that didn't originate locally, otherwise we'll
// end up parroting information we received from other servers.
return true
@@ -203,7 +203,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
// Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{
Type: types.MSigningKeyUpdate,
- Origin: string(t.serverName),
+ Origin: string(host),
}
if edu.Content, err = json.Marshal(output); err != nil {
sentry.CaptureException(err)
@@ -212,7 +212,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
}
logger.Debugf("Sending cross-signing update message to %q", destinations)
- err = t.queues.SendEDU(edu, t.serverName, destinations)
+ err = t.queues.SendEDU(edu, host, destinations)
return err == nil
}
diff --git a/federationapi/consumers/presence.go b/federationapi/consumers/presence.go
index e76103cd..3445d34a 100644
--- a/federationapi/consumers/presence.go
+++ b/federationapi/consumers/presence.go
@@ -38,7 +38,7 @@ type OutputPresenceConsumer struct {
durable string
db storage.Database
queues *queue.OutgoingQueues
- ServerName gomatrixserverlib.ServerName
+ isLocalServerName func(gomatrixserverlib.ServerName) bool
topic string
outboundPresenceEnabled bool
}
@@ -56,7 +56,7 @@ func NewOutputPresenceConsumer(
jetstream: js,
queues: queues,
db: store,
- ServerName: cfg.Matrix.ServerName,
+ isLocalServerName: cfg.Matrix.IsLocalServerName,
durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"),
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound,
@@ -85,7 +85,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
log.WithError(err).WithField("user_id", userID).Error("failed to extract domain from receipt sender")
return true
}
- if serverName != t.ServerName {
+ if !t.isLocalServerName(serverName) {
return true
}
@@ -127,7 +127,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MPresence,
- Origin: string(t.ServerName),
+ Origin: string(serverName),
}
if edu.Content, err = json.Marshal(content); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON")
@@ -135,7 +135,7 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
}
log.Tracef("sending presence EDU to %d servers", len(joined))
- if err = t.queues.SendEDU(edu, t.ServerName, joined); err != nil {
+ if err = t.queues.SendEDU(edu, serverName, joined); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}
diff --git a/federationapi/consumers/receipts.go b/federationapi/consumers/receipts.go
index 75827cb6..200c06e6 100644
--- a/federationapi/consumers/receipts.go
+++ b/federationapi/consumers/receipts.go
@@ -34,13 +34,13 @@ import (
// OutputReceiptConsumer consumes events that originate in the clientapi.
type OutputReceiptConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- db storage.Database
- queues *queue.OutgoingQueues
- ServerName gomatrixserverlib.ServerName
- topic string
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ db storage.Database
+ queues *queue.OutgoingQueues
+ isLocalServerName func(gomatrixserverlib.ServerName) bool
+ topic string
}
// NewOutputReceiptConsumer creates a new OutputReceiptConsumer. Call Start() to begin consuming typing events.
@@ -52,13 +52,13 @@ func NewOutputReceiptConsumer(
store storage.Database,
) *OutputReceiptConsumer {
return &OutputReceiptConsumer{
- ctx: process.Context(),
- jetstream: js,
- queues: queues,
- db: store,
- ServerName: cfg.Matrix.ServerName,
- durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"),
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+ ctx: process.Context(),
+ jetstream: js,
+ queues: queues,
+ db: store,
+ isLocalServerName: cfg.Matrix.IsLocalServerName,
+ durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
}
}
@@ -95,7 +95,7 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
return true
}
- if receiptServerName != t.ServerName {
+ if !t.isLocalServerName(receiptServerName) {
return true
}
@@ -134,14 +134,14 @@ func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MReceipt,
- Origin: string(t.ServerName),
+ Origin: string(receiptServerName),
}
if edu.Content, err = json.Marshal(content); err != nil {
log.WithError(err).Error("failed to marshal EDU JSON")
return true
}
- if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
+ if err := t.queues.SendEDU(edu, receiptServerName, names); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}
diff --git a/federationapi/consumers/sendtodevice.go b/federationapi/consumers/sendtodevice.go
index 9aec22a3..9620d161 100644
--- a/federationapi/consumers/sendtodevice.go
+++ b/federationapi/consumers/sendtodevice.go
@@ -34,13 +34,13 @@ import (
// OutputSendToDeviceConsumer consumes events that originate in the clientapi.
type OutputSendToDeviceConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- db storage.Database
- queues *queue.OutgoingQueues
- ServerName gomatrixserverlib.ServerName
- topic string
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ db storage.Database
+ queues *queue.OutgoingQueues
+ isLocalServerName func(gomatrixserverlib.ServerName) bool
+ topic string
}
// NewOutputSendToDeviceConsumer creates a new OutputSendToDeviceConsumer. Call Start() to begin consuming send-to-device events.
@@ -52,13 +52,13 @@ func NewOutputSendToDeviceConsumer(
store storage.Database,
) *OutputSendToDeviceConsumer {
return &OutputSendToDeviceConsumer{
- ctx: process.Context(),
- jetstream: js,
- queues: queues,
- db: store,
- ServerName: cfg.Matrix.ServerName,
- durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"),
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ ctx: process.Context(),
+ jetstream: js,
+ queues: queues,
+ db: store,
+ isLocalServerName: cfg.Matrix.IsLocalServerName,
+ durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
}
}
@@ -82,7 +82,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender")
return true
}
- if originServerName != t.ServerName {
+ if !t.isLocalServerName(originServerName) {
return true
}
// Extract the send-to-device event from msg.
@@ -101,14 +101,14 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
}
// The SyncAPI is already handling sendToDevice for the local server
- if destServerName == t.ServerName {
+ if t.isLocalServerName(destServerName) {
return true
}
// Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{
Type: gomatrixserverlib.MDirectToDevice,
- Origin: string(t.ServerName),
+ Origin: string(originServerName),
}
tdm := gomatrixserverlib.ToDeviceMessage{
Sender: ote.Sender,
@@ -127,7 +127,7 @@ func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msgs []*nats
}
log.Debugf("Sending send-to-device message into %q destination queue", destServerName)
- if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
+ if err := t.queues.SendEDU(edu, originServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}
diff --git a/federationapi/consumers/typing.go b/federationapi/consumers/typing.go
index 9c737913..c66f9751 100644
--- a/federationapi/consumers/typing.go
+++ b/federationapi/consumers/typing.go
@@ -31,13 +31,13 @@ import (
// OutputTypingConsumer consumes events that originate in the clientapi.
type OutputTypingConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- db storage.Database
- queues *queue.OutgoingQueues
- ServerName gomatrixserverlib.ServerName
- topic string
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ db storage.Database
+ queues *queue.OutgoingQueues
+ isLocalServerName func(gomatrixserverlib.ServerName) bool
+ topic string
}
// NewOutputTypingConsumer creates a new OutputTypingConsumer. Call Start() to begin consuming typing events.
@@ -49,13 +49,13 @@ func NewOutputTypingConsumer(
store storage.Database,
) *OutputTypingConsumer {
return &OutputTypingConsumer{
- ctx: process.Context(),
- jetstream: js,
- queues: queues,
- db: store,
- ServerName: cfg.Matrix.ServerName,
- durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"),
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ ctx: process.Context(),
+ jetstream: js,
+ queues: queues,
+ db: store,
+ isLocalServerName: cfg.Matrix.IsLocalServerName,
+ durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
}
}
@@ -87,7 +87,7 @@ func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
_ = msg.Ack()
return true
}
- if typingServerName != t.ServerName {
+ if !t.isLocalServerName(typingServerName) {
return true
}
@@ -111,7 +111,7 @@ func (t *OutputTypingConsumer) onMessage(ctx context.Context, msgs []*nats.Msg)
log.WithError(err).Error("failed to marshal EDU JSON")
return true
}
- if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
+ if err := t.queues.SendEDU(edu, typingServerName, names); err != nil {
log.WithError(err).Error("failed to send EDU")
return false
}