aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-11-15 15:05:23 +0000
committerNeil Alexander <neilalexander@users.noreply.github.com>2022-11-15 15:05:23 +0000
commit6650712a1c0dec282b47b7ba14bc8c2e06a385d8 (patch)
tree12ca755c5c33d3489417f9355dda3f1b7983c779 /syncapi
parentf4ee3977340c84d321767d347795b1dcd05ac459 (diff)
Federation fixes for virtual hosting
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/keychange.go35
-rw-r--r--syncapi/consumers/receipts.go30
-rw-r--r--syncapi/consumers/roomserver.go10
-rw-r--r--syncapi/consumers/sendtodevice.go40
-rw-r--r--syncapi/routing/messages.go1
-rw-r--r--syncapi/syncapi_test.go4
6 files changed, 55 insertions, 65 deletions
diff --git a/syncapi/consumers/keychange.go b/syncapi/consumers/keychange.go
index 96ebba7e..92f08150 100644
--- a/syncapi/consumers/keychange.go
+++ b/syncapi/consumers/keychange.go
@@ -28,22 +28,20 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/streams"
"github.com/matrix-org/dendrite/syncapi/types"
- "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
)
// OutputKeyChangeEventConsumer consumes events that originated in the key server.
type OutputKeyChangeEventConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- topic string
- db storage.Database
- notifier *notifier.Notifier
- stream streams.StreamProvider
- serverName gomatrixserverlib.ServerName // our server name
- rsAPI roomserverAPI.SyncRoomserverAPI
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ notifier *notifier.Notifier
+ stream streams.StreamProvider
+ rsAPI roomserverAPI.SyncRoomserverAPI
}
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
@@ -59,15 +57,14 @@ func NewOutputKeyChangeEventConsumer(
stream streams.StreamProvider,
) *OutputKeyChangeEventConsumer {
s := &OutputKeyChangeEventConsumer{
- ctx: process.Context(),
- jetstream: js,
- durable: cfg.Matrix.JetStream.Durable("SyncAPIKeyChangeConsumer"),
- topic: topic,
- db: store,
- serverName: cfg.Matrix.ServerName,
- rsAPI: rsAPI,
- notifier: notifier,
- stream: stream,
+ ctx: process.Context(),
+ jetstream: js,
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIKeyChangeConsumer"),
+ topic: topic,
+ db: store,
+ rsAPI: rsAPI,
+ notifier: notifier,
+ stream: stream,
}
return s
diff --git a/syncapi/consumers/receipts.go b/syncapi/consumers/receipts.go
index 8aaa6573..e39d43f9 100644
--- a/syncapi/consumers/receipts.go
+++ b/syncapi/consumers/receipts.go
@@ -34,14 +34,13 @@ import (
// OutputReceiptEventConsumer consumes events that originated in the EDU server.
type OutputReceiptEventConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- topic string
- db storage.Database
- stream streams.StreamProvider
- notifier *notifier.Notifier
- serverName gomatrixserverlib.ServerName
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ stream streams.StreamProvider
+ notifier *notifier.Notifier
}
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
@@ -55,14 +54,13 @@ func NewOutputReceiptEventConsumer(
stream streams.StreamProvider,
) *OutputReceiptEventConsumer {
return &OutputReceiptEventConsumer{
- ctx: process.Context(),
- jetstream: js,
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"),
- db: store,
- notifier: notifier,
- stream: stream,
- serverName: cfg.Matrix.ServerName,
+ ctx: process.Context(),
+ jetstream: js,
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"),
+ db: store,
+ notifier: notifier,
+ stream: stream,
}
}
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index f767615c..1b67f568 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -364,11 +364,7 @@ func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gom
// TODO: check that it's a join and not a profile change (means unmarshalling prev_content)
if membership == gomatrixserverlib.Join {
// check it's a local join
- _, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
- if err != nil {
- return sp, fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
- }
- if domain != s.cfg.Matrix.ServerName {
+ if _, _, err := s.cfg.Matrix.SplitLocalID('@', *ev.StateKey()); err != nil {
return sp, nil
}
@@ -390,9 +386,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
if msg.Event.StateKey() == nil {
return
}
- if _, serverName, err := gomatrixserverlib.SplitID('@', *msg.Event.StateKey()); err != nil {
- return
- } else if serverName != s.cfg.Matrix.ServerName {
+ if _, _, err := s.cfg.Matrix.SplitLocalID('@', *msg.Event.StateKey()); err != nil {
return
}
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
diff --git a/syncapi/consumers/sendtodevice.go b/syncapi/consumers/sendtodevice.go
index 49d84cca..356e8326 100644
--- a/syncapi/consumers/sendtodevice.go
+++ b/syncapi/consumers/sendtodevice.go
@@ -37,15 +37,15 @@ import (
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
type OutputSendToDeviceEventConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- topic string
- db storage.Database
- keyAPI keyapi.SyncKeyAPI
- serverName gomatrixserverlib.ServerName // our server name
- stream streams.StreamProvider
- notifier *notifier.Notifier
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ topic string
+ db storage.Database
+ keyAPI keyapi.SyncKeyAPI
+ isLocalServerName func(gomatrixserverlib.ServerName) bool
+ stream streams.StreamProvider
+ notifier *notifier.Notifier
}
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
@@ -60,15 +60,15 @@ func NewOutputSendToDeviceEventConsumer(
stream streams.StreamProvider,
) *OutputSendToDeviceEventConsumer {
return &OutputSendToDeviceEventConsumer{
- ctx: process.Context(),
- jetstream: js,
- topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
- durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"),
- db: store,
- keyAPI: keyAPI,
- serverName: cfg.Matrix.ServerName,
- notifier: notifier,
- stream: stream,
+ ctx: process.Context(),
+ jetstream: js,
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"),
+ db: store,
+ keyAPI: keyAPI,
+ isLocalServerName: cfg.Matrix.IsLocalServerName,
+ notifier: notifier,
+ stream: stream,
}
}
@@ -89,7 +89,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
log.WithError(err).Errorf("send-to-device: failed to split user id, dropping message")
return true
}
- if domain != s.serverName {
+ if !s.isLocalServerName(domain) {
log.Tracef("ignoring send-to-device event with destination %s", domain)
return true
}
@@ -114,7 +114,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msgs []
if output.Type == "m.room_key_request" {
requestingDeviceID := gjson.GetBytes(output.SendToDeviceEvent.Content, "requesting_device_id").Str
_, senderDomain, _ := gomatrixserverlib.SplitID('@', output.Sender)
- if requestingDeviceID != "" && senderDomain != s.serverName {
+ if requestingDeviceID != "" && !s.isLocalServerName(senderDomain) {
// Mark the requesting device as stale, if we don't know about it.
if err = s.keyAPI.PerformMarkAsStaleIfNeeded(ctx, &keyapi.PerformMarkAsStaleRequest{
UserID: output.Sender, Domain: senderDomain, DeviceID: requestingDeviceID,
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index 86cf8e73..0d740ebf 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -528,6 +528,7 @@ func (r *messagesReq) backfill(roomID string, backwardsExtremities map[string][]
BackwardsExtremities: backwardsExtremities,
Limit: limit,
ServerName: r.cfg.Matrix.ServerName,
+ VirtualHost: r.device.UserDomain(),
}, &res)
if err != nil {
return nil, fmt.Errorf("PerformBackfill failed: %w", err)
diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go
index a4985dbf..48327448 100644
--- a/syncapi/syncapi_test.go
+++ b/syncapi/syncapi_test.go
@@ -433,7 +433,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
beforeJoinBody := fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)
beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": beforeJoinBody})
eventsToSend := append(room.Events(), beforeJoinEv)
- if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
+ if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
syncUntil(t, base, aliceDev.AccessToken, false,
@@ -472,7 +472,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) {
eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv)
- if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil {
+ if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}
syncUntil(t, base, aliceDev.AccessToken, false,