aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-11-15 15:05:23 +0000
committerNeil Alexander <neilalexander@users.noreply.github.com>2022-11-15 15:05:23 +0000
commit6650712a1c0dec282b47b7ba14bc8c2e06a385d8 (patch)
tree12ca755c5c33d3489417f9355dda3f1b7983c779 /roomserver
parentf4ee3977340c84d321767d347795b1dcd05ac459 (diff)
Federation fixes for virtual hosting
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/api/input.go5
-rw-r--r--roomserver/api/perform.go2
-rw-r--r--roomserver/api/wrapper.go11
-rw-r--r--roomserver/internal/alias.go19
-rw-r--r--roomserver/internal/api.go29
-rw-r--r--roomserver/internal/input/input.go7
-rw-r--r--roomserver/internal/input/input_events.go27
-rw-r--r--roomserver/internal/input/input_missing.go17
-rw-r--r--roomserver/internal/perform/perform_admin.go25
-rw-r--r--roomserver/internal/perform/perform_backfill.go50
-rw-r--r--roomserver/internal/perform/perform_join.go24
-rw-r--r--roomserver/internal/perform/perform_leave.go14
-rw-r--r--roomserver/internal/perform/perform_upgrade.go21
-rw-r--r--roomserver/internal/query/query.go10
-rw-r--r--roomserver/roomserver_test.go2
15 files changed, 168 insertions, 95 deletions
diff --git a/roomserver/api/input.go b/roomserver/api/input.go
index 45a9ef49..88d52327 100644
--- a/roomserver/api/input.go
+++ b/roomserver/api/input.go
@@ -94,8 +94,9 @@ type TransactionID struct {
// InputRoomEventsRequest is a request to InputRoomEvents
type InputRoomEventsRequest struct {
- InputRoomEvents []InputRoomEvent `json:"input_room_events"`
- Asynchronous bool `json:"async"`
+ InputRoomEvents []InputRoomEvent `json:"input_room_events"`
+ Asynchronous bool `json:"async"`
+ VirtualHost gomatrixserverlib.ServerName `json:"virtual_host"`
}
// InputRoomEventsResponse is a response to InputRoomEvents
diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go
index 2536a4bb..e70e5ea9 100644
--- a/roomserver/api/perform.go
+++ b/roomserver/api/perform.go
@@ -148,6 +148,8 @@ type PerformBackfillRequest struct {
Limit int `json:"limit"`
// The server interested in the events.
ServerName gomatrixserverlib.ServerName `json:"server_name"`
+ // Which virtual host are we doing this for?
+ VirtualHost gomatrixserverlib.ServerName `json:"virtual_host"`
}
// PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order.
diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go
index 8b031982..252be557 100644
--- a/roomserver/api/wrapper.go
+++ b/roomserver/api/wrapper.go
@@ -26,7 +26,7 @@ import (
func SendEvents(
ctx context.Context, rsAPI InputRoomEventsAPI,
kind Kind, events []*gomatrixserverlib.HeaderedEvent,
- origin gomatrixserverlib.ServerName,
+ virtualHost, origin gomatrixserverlib.ServerName,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
async bool,
) error {
@@ -40,14 +40,15 @@ func SendEvents(
TransactionID: txnID,
}
}
- return SendInputRoomEvents(ctx, rsAPI, ires, async)
+ return SendInputRoomEvents(ctx, rsAPI, virtualHost, ires, async)
}
// SendEventWithState writes an event with the specified kind to the roomserver
// with the state at the event as KindOutlier before it. Will not send any event that is
// marked as `true` in haveEventIDs.
func SendEventWithState(
- ctx context.Context, rsAPI InputRoomEventsAPI, kind Kind,
+ ctx context.Context, rsAPI InputRoomEventsAPI,
+ virtualHost gomatrixserverlib.ServerName, kind Kind,
state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent,
origin gomatrixserverlib.ServerName, haveEventIDs map[string]bool, async bool,
) error {
@@ -85,17 +86,19 @@ func SendEventWithState(
StateEventIDs: stateEventIDs,
})
- return SendInputRoomEvents(ctx, rsAPI, ires, async)
+ return SendInputRoomEvents(ctx, rsAPI, virtualHost, ires, async)
}
// SendInputRoomEvents to the roomserver.
func SendInputRoomEvents(
ctx context.Context, rsAPI InputRoomEventsAPI,
+ virtualHost gomatrixserverlib.ServerName,
ires []InputRoomEvent, async bool,
) error {
request := InputRoomEventsRequest{
InputRoomEvents: ires,
Asynchronous: async,
+ VirtualHost: virtualHost,
}
var response InputRoomEventsResponse
if err := rsAPI.InputRoomEvents(ctx, &request, &response); err != nil {
diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go
index 175bb931..329e6af7 100644
--- a/roomserver/internal/alias.go
+++ b/roomserver/internal/alias.go
@@ -137,6 +137,11 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
request *api.RemoveRoomAliasRequest,
response *api.RemoveRoomAliasResponse,
) error {
+ _, virtualHost, err := r.Cfg.Matrix.SplitLocalID('@', request.UserID)
+ if err != nil {
+ return err
+ }
+
roomID, err := r.DB.GetRoomIDForAlias(ctx, request.Alias)
if err != nil {
return fmt.Errorf("r.DB.GetRoomIDForAlias: %w", err)
@@ -190,6 +195,16 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
sender = ev.Sender()
}
+ _, senderDomain, err := r.Cfg.Matrix.SplitLocalID('@', sender)
+ if err != nil {
+ return err
+ }
+
+ identity, err := r.Cfg.Matrix.SigningIdentityFor(senderDomain)
+ if err != nil {
+ return err
+ }
+
builder := &gomatrixserverlib.EventBuilder{
Sender: sender,
RoomID: ev.RoomID(),
@@ -211,12 +226,12 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias(
return err
}
- newEvent, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, time.Now(), &eventsNeeded, stateRes)
+ newEvent, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, identity, time.Now(), &eventsNeeded, stateRes)
if err != nil {
return err
}
- err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false)
+ err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, virtualHost, r.ServerName, r.ServerName, nil, false)
if err != nil {
return err
}
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index 1a11586a..1a362660 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -87,10 +87,10 @@ func NewRoomserverAPI(
Durable: base.Cfg.Global.JetStream.Durable("RoomserverInputConsumer"),
ServerACLs: serverACLs,
Queryer: &query.Queryer{
- DB: roomserverDB,
- Cache: base.Caches,
- ServerName: base.Cfg.Global.ServerName,
- ServerACLs: serverACLs,
+ DB: roomserverDB,
+ Cache: base.Caches,
+ IsLocalServerName: base.Cfg.Global.IsLocalServerName,
+ ServerACLs: serverACLs,
},
// perform-er structs get initialised when we have a federation sender to use
}
@@ -127,13 +127,12 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
Inputer: r.Inputer,
}
r.Joiner = &perform.Joiner{
- ServerName: r.Cfg.Matrix.ServerName,
- Cfg: r.Cfg,
- DB: r.DB,
- FSAPI: r.fsAPI,
- RSAPI: r,
- Inputer: r.Inputer,
- Queryer: r.Queryer,
+ Cfg: r.Cfg,
+ DB: r.DB,
+ FSAPI: r.fsAPI,
+ RSAPI: r,
+ Inputer: r.Inputer,
+ Queryer: r.Queryer,
}
r.Peeker = &perform.Peeker{
ServerName: r.Cfg.Matrix.ServerName,
@@ -163,10 +162,10 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio
DB: r.DB,
}
r.Backfiller = &perform.Backfiller{
- ServerName: r.ServerName,
- DB: r.DB,
- FSAPI: r.fsAPI,
- KeyRing: r.KeyRing,
+ IsLocalServerName: r.Cfg.Matrix.IsLocalServerName,
+ DB: r.DB,
+ FSAPI: r.fsAPI,
+ KeyRing: r.KeyRing,
// Perspective servers are trusted to not lie about server keys, so we will also
// prefer these servers when backfilling (assuming they are in the room) rather
// than trying random servers
diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go
index f5099ca1..e965691c 100644
--- a/roomserver/internal/input/input.go
+++ b/roomserver/internal/input/input.go
@@ -278,7 +278,11 @@ func (w *worker) _next() {
// a string, because we might want to return that to the caller if
// it was a synchronous request.
var errString string
- if err = w.r.processRoomEvent(w.r.ProcessContext.Context(), &inputRoomEvent); err != nil {
+ if err = w.r.processRoomEvent(
+ w.r.ProcessContext.Context(),
+ gomatrixserverlib.ServerName(msg.Header.Get("virtual_host")),
+ &inputRoomEvent,
+ ); err != nil {
switch err.(type) {
case types.RejectedError:
// Don't send events that were rejected to Sentry
@@ -358,6 +362,7 @@ func (r *Inputer) queueInputRoomEvents(
if replyTo != "" {
msg.Header.Set("sync", replyTo)
}
+ msg.Header.Set("virtual_host", string(request.VirtualHost))
msg.Data, err = json.Marshal(e)
if err != nil {
return nil, fmt.Errorf("json.Marshal: %w", err)
diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go
index 682aa2b1..72c285f8 100644
--- a/roomserver/internal/input/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -69,6 +69,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
// nolint:gocyclo
func (r *Inputer) processRoomEvent(
ctx context.Context,
+ virtualHost gomatrixserverlib.ServerName,
input *api.InputRoomEvent,
) error {
select {
@@ -200,7 +201,7 @@ func (r *Inputer) processRoomEvent(
isRejected := false
authEvents := gomatrixserverlib.NewAuthEvents(nil)
knownEvents := map[string]*types.Event{}
- if err = r.fetchAuthEvents(ctx, logger, roomInfo, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
+ if err = r.fetchAuthEvents(ctx, logger, roomInfo, virtualHost, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
return fmt.Errorf("r.fetchAuthEvents: %w", err)
}
@@ -265,16 +266,17 @@ func (r *Inputer) processRoomEvent(
// processRoomEvent.
if len(serverRes.ServerNames) > 0 {
missingState := missingStateReq{
- origin: input.Origin,
- inputer: r,
- db: r.DB,
- roomInfo: roomInfo,
- federation: r.FSAPI,
- keys: r.KeyRing,
- roomsMu: internal.NewMutexByRoom(),
- servers: serverRes.ServerNames,
- hadEvents: map[string]bool{},
- haveEvents: map[string]*gomatrixserverlib.Event{},
+ origin: input.Origin,
+ virtualHost: virtualHost,
+ inputer: r,
+ db: r.DB,
+ roomInfo: roomInfo,
+ federation: r.FSAPI,
+ keys: r.KeyRing,
+ roomsMu: internal.NewMutexByRoom(),
+ servers: serverRes.ServerNames,
+ hadEvents: map[string]bool{},
+ haveEvents: map[string]*gomatrixserverlib.Event{},
}
var stateSnapshot *parsedRespState
if stateSnapshot, err = missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
@@ -555,6 +557,7 @@ func (r *Inputer) fetchAuthEvents(
ctx context.Context,
logger *logrus.Entry,
roomInfo *types.RoomInfo,
+ virtualHost gomatrixserverlib.ServerName,
event *gomatrixserverlib.HeaderedEvent,
auth *gomatrixserverlib.AuthEvents,
known map[string]*types.Event,
@@ -605,7 +608,7 @@ func (r *Inputer) fetchAuthEvents(
// Request the entire auth chain for the event in question. This should
// contain all of the auth events — including ones that we already know —
// so we'll need to filter through those in the next section.
- res, err = r.FSAPI.GetEventAuth(ctx, serverName, event.RoomVersion, event.RoomID(), event.EventID())
+ res, err = r.FSAPI.GetEventAuth(ctx, virtualHost, serverName, event.RoomVersion, event.RoomID(), event.EventID())
if err != nil {
logger.WithError(err).Warnf("Failed to get event auth from federation for %q: %s", event.EventID(), err)
continue
diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go
index f788c5b3..03ac2b38 100644
--- a/roomserver/internal/input/input_missing.go
+++ b/roomserver/internal/input/input_missing.go
@@ -41,6 +41,7 @@ func (p *parsedRespState) Events() []*gomatrixserverlib.Event {
type missingStateReq struct {
log *logrus.Entry
+ virtualHost gomatrixserverlib.ServerName
origin gomatrixserverlib.ServerName
db storage.Database
roomInfo *types.RoomInfo
@@ -101,7 +102,7 @@ func (t *missingStateReq) processEventWithMissingState(
// we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled
// in the gap in the DAG
for _, newEvent := range newEvents {
- err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
+ err = t.inputer.processRoomEvent(ctx, t.virtualHost, &api.InputRoomEvent{
Kind: api.KindOld,
Event: newEvent.Headered(roomVersion),
Origin: t.origin,
@@ -157,7 +158,7 @@ func (t *missingStateReq) processEventWithMissingState(
})
}
for _, ire := range outlierRoomEvents {
- if err = t.inputer.processRoomEvent(ctx, &ire); err != nil {
+ if err = t.inputer.processRoomEvent(ctx, t.virtualHost, &ire); err != nil {
if _, ok := err.(types.RejectedError); !ok {
return fmt.Errorf("t.inputer.processRoomEvent (outlier): %w", err)
}
@@ -180,7 +181,7 @@ func (t *missingStateReq) processEventWithMissingState(
stateIDs = append(stateIDs, event.EventID())
}
- err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
+ err = t.inputer.processRoomEvent(ctx, t.virtualHost, &api.InputRoomEvent{
Kind: api.KindOld,
Event: backwardsExtremity.Headered(roomVersion),
Origin: t.origin,
@@ -199,7 +200,7 @@ func (t *missingStateReq) processEventWithMissingState(
// they will automatically fast-forward based on the room state at the
// extremity in the last step.
for _, newEvent := range newEvents {
- err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
+ err = t.inputer.processRoomEvent(ctx, t.virtualHost, &api.InputRoomEvent{
Kind: api.KindOld,
Event: newEvent.Headered(roomVersion),
Origin: t.origin,
@@ -519,7 +520,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
var missingResp *gomatrixserverlib.RespMissingEvents
for _, server := range t.servers {
var m gomatrixserverlib.RespMissingEvents
- if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
+ if m, err = t.federation.LookupMissingEvents(ctx, t.virtualHost, server, e.RoomID(), gomatrixserverlib.MissingEvents{
Limit: 20,
// The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events.
EarliestEvents: latestEvents,
@@ -635,7 +636,7 @@ func (t *missingStateReq) lookupMissingStateViaState(
span, ctx := opentracing.StartSpanFromContext(ctx, "lookupMissingStateViaState")
defer span.Finish()
- state, err := t.federation.LookupState(ctx, t.origin, roomID, eventID, roomVersion)
+ state, err := t.federation.LookupState(ctx, t.virtualHost, t.origin, roomID, eventID, roomVersion)
if err != nil {
return nil, err
}
@@ -675,7 +676,7 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
totalctx, totalcancel := context.WithTimeout(ctx, time.Minute*5)
for _, serverName := range t.servers {
reqctx, reqcancel := context.WithTimeout(totalctx, time.Second*20)
- stateIDs, err = t.federation.LookupStateIDs(reqctx, serverName, roomID, eventID)
+ stateIDs, err = t.federation.LookupStateIDs(reqctx, t.virtualHost, serverName, roomID, eventID)
reqcancel()
if err == nil {
break
@@ -855,7 +856,7 @@ func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixs
for _, serverName := range t.servers {
reqctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
- txn, err := t.federation.GetEvent(reqctx, serverName, missingEventID)
+ txn, err := t.federation.GetEvent(reqctx, t.virtualHost, serverName, missingEventID)
if err != nil || len(txn.PDUs) == 0 {
t.log.WithError(err).WithField("missing_event_id", missingEventID).Warn("Failed to get missing /event for event ID")
if errors.Is(err, context.DeadlineExceeded) {
diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go
index 0406fc8f..d42f4e45 100644
--- a/roomserver/internal/perform/perform_admin.go
+++ b/roomserver/internal/perform/perform_admin.go
@@ -139,7 +139,12 @@ func (r *Admin) PerformAdminEvacuateRoom(
return nil
}
- event, err := eventutil.BuildEvent(ctx, fledglingEvent, r.Cfg.Matrix, time.Now(), &eventsNeeded, latestRes)
+ identity, err := r.Cfg.Matrix.SigningIdentityFor(senderDomain)
+ if err != nil {
+ continue
+ }
+
+ event, err := eventutil.BuildEvent(ctx, fledglingEvent, r.Cfg.Matrix, identity, time.Now(), &eventsNeeded, latestRes)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
@@ -242,6 +247,15 @@ func (r *Admin) PerformAdminDownloadState(
req *api.PerformAdminDownloadStateRequest,
res *api.PerformAdminDownloadStateResponse,
) error {
+ _, senderDomain, err := r.Cfg.Matrix.SplitLocalID('@', req.UserID)
+ if err != nil {
+ res.Error = &api.PerformError{
+ Code: api.PerformErrorBadRequest,
+ Msg: fmt.Sprintf("r.Cfg.Matrix.SplitLocalID: %s", err),
+ }
+ return nil
+ }
+
roomInfo, err := r.DB.RoomInfo(ctx, req.RoomID)
if err != nil {
res.Error = &api.PerformError{
@@ -273,7 +287,7 @@ func (r *Admin) PerformAdminDownloadState(
for _, fwdExtremity := range fwdExtremities {
var state gomatrixserverlib.RespState
- state, err = r.Inputer.FSAPI.LookupState(ctx, req.ServerName, req.RoomID, fwdExtremity.EventID, roomInfo.RoomVersion)
+ state, err = r.Inputer.FSAPI.LookupState(ctx, r.Inputer.ServerName, req.ServerName, req.RoomID, fwdExtremity.EventID, roomInfo.RoomVersion)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
@@ -331,7 +345,12 @@ func (r *Admin) PerformAdminDownloadState(
Depth: depth,
}
- ev, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, time.Now(), &eventsNeeded, queryRes)
+ identity, err := r.Cfg.Matrix.SigningIdentityFor(senderDomain)
+ if err != nil {
+ return err
+ }
+
+ ev, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, identity, time.Now(), &eventsNeeded, queryRes)
if err != nil {
res.Error = &api.PerformError{
Code: api.PerformErrorBadRequest,
diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go
index 57e121ea..069f017a 100644
--- a/roomserver/internal/perform/perform_backfill.go
+++ b/roomserver/internal/perform/perform_backfill.go
@@ -37,10 +37,10 @@ import (
const maxBackfillServers = 5
type Backfiller struct {
- ServerName gomatrixserverlib.ServerName
- DB storage.Database
- FSAPI federationAPI.RoomserverFederationAPI
- KeyRing gomatrixserverlib.JSONVerifier
+ IsLocalServerName func(gomatrixserverlib.ServerName) bool
+ DB storage.Database
+ FSAPI federationAPI.RoomserverFederationAPI
+ KeyRing gomatrixserverlib.JSONVerifier
// The servers which should be preferred above other servers when backfilling
PreferServers []gomatrixserverlib.ServerName
@@ -55,7 +55,7 @@ func (r *Backfiller) PerformBackfill(
// if we are requesting the backfill then we need to do a federation hit
// TODO: we could be more sensible and fetch as many events we already have then request the rest
// which is what the syncapi does already.
- if request.ServerName == r.ServerName {
+ if r.IsLocalServerName(request.ServerName) {
return r.backfillViaFederation(ctx, request, response)
}
// someone else is requesting the backfill, try to service their request.
@@ -112,16 +112,18 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
if info == nil || info.IsStub() {
return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID)
}
- requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities, r.PreferServers)
+ requester := newBackfillRequester(r.DB, r.FSAPI, req.VirtualHost, r.IsLocalServerName, req.BackwardsExtremities, r.PreferServers)
// Request 100 items regardless of what the query asks for.
// We don't want to go much higher than this.
// We can't honour exactly the limit as some sytests rely on requesting more for tests to pass
// (so we don't need to hit /state_ids which the test has no listener for)
// Specifically the test "Outbound federation can backfill events"
events, err := gomatrixserverlib.RequestBackfill(
- ctx, requester,
- r.KeyRing, req.RoomID, info.RoomVersion, req.PrevEventIDs(), 100)
+ ctx, req.VirtualHost, requester,
+ r.KeyRing, req.RoomID, info.RoomVersion, req.PrevEventIDs(), 100,
+ )
if err != nil {
+ logrus.WithError(err).Errorf("gomatrixserverlib.RequestBackfill failed")
return err
}
logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events))
@@ -144,7 +146,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
var entries []types.StateEntry
if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs, true); err != nil {
// attempt to fetch the missing events
- r.fetchAndStoreMissingEvents(ctx, info.RoomVersion, requester, stateIDs)
+ r.fetchAndStoreMissingEvents(ctx, info.RoomVersion, requester, stateIDs, req.VirtualHost)
// try again
entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs, true)
if err != nil {
@@ -173,7 +175,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
// fetchAndStoreMissingEvents does a best-effort fetch and store of missing events specified in stateIDs. Returns no error as it is just
// best effort.
func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gomatrixserverlib.RoomVersion,
- backfillRequester *backfillRequester, stateIDs []string) {
+ backfillRequester *backfillRequester, stateIDs []string, virtualHost gomatrixserverlib.ServerName) {
servers := backfillRequester.servers
@@ -198,7 +200,7 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom
continue // already found
}
logger := util.GetLogger(ctx).WithField("server", srv).WithField("event_id", id)
- res, err := r.FSAPI.GetEvent(ctx, srv, id)
+ res, err := r.FSAPI.GetEvent(ctx, virtualHost, srv, id)
if err != nil {
logger.WithError(err).Warn("failed to get event from server")
continue
@@ -241,11 +243,12 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom
// backfillRequester implements gomatrixserverlib.BackfillRequester
type backfillRequester struct {
- db storage.Database
- fsAPI federationAPI.RoomserverFederationAPI
- thisServer gomatrixserverlib.ServerName
- preferServer map[gomatrixserverlib.ServerName]bool
- bwExtrems map[string][]string
+ db storage.Database
+ fsAPI federationAPI.RoomserverFederationAPI
+ virtualHost gomatrixserverlib.ServerName
+ isLocalServerName func(gomatrixserverlib.ServerName) bool
+ preferServer map[gomatrixserverlib.ServerName]bool
+ bwExtrems map[string][]string
// per-request state
servers []gomatrixserverlib.ServerName
@@ -255,7 +258,9 @@ type backfillRequester struct {
}
func newBackfillRequester(
- db storage.Database, fsAPI federationAPI.RoomserverFederationAPI, thisServer gomatrixserverlib.ServerName,
+ db storage.Database, fsAPI federationAPI.RoomserverFederationAPI,
+ virtualHost gomatrixserverlib.ServerName,
+ isLocalServerName func(gomatrixserverlib.ServerName) bool,
bwExtrems map[string][]string, preferServers []gomatrixserverlib.ServerName,
) *backfillRequester {
preferServer := make(map[gomatrixserverlib.ServerName]bool)
@@ -265,7 +270,8 @@ func newBackfillRequester(
return &backfillRequester{
db: db,
fsAPI: fsAPI,
- thisServer: thisServer,
+ virtualHost: virtualHost,
+ isLocalServerName: isLocalServerName,
eventIDToBeforeStateIDs: make(map[string][]string),
eventIDMap: make(map[string]*gomatrixserverlib.Event),
bwExtrems: bwExtrems,
@@ -450,7 +456,7 @@ FindSuccessor:
}
// possibly return all joined servers depending on history visiblity
- memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.thisServer)
+ memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.virtualHost)
b.historyVisiblity = visibility
if err != nil {
logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules")
@@ -477,7 +483,7 @@ FindSuccessor:
}
var servers []gomatrixserverlib.ServerName
for server := range serverSet {
- if server == b.thisServer {
+ if b.isLocalServerName(server) {
continue
}
if b.preferServer[server] { // insert at the front
@@ -496,10 +502,10 @@ FindSuccessor:
// Backfill performs a backfill request to the given server.
// https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
-func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string,
+func (b *backfillRequester) Backfill(ctx context.Context, origin, server gomatrixserverlib.ServerName, roomID string,
limit int, fromEventIDs []string) (gomatrixserverlib.Transaction, error) {
- tx, err := b.fsAPI.Backfill(ctx, server, roomID, limit, fromEventIDs)
+ tx, err := b.fsAPI.Backfill(ctx, origin, server, roomID, limit, fromEventIDs)
return tx, err
}
diff --git a/roomserver/internal/perform/perform_join.go b/roomserver/internal/perform/perform_join.go
index 9d596ab3..4de008c6 100644
--- a/roomserver/internal/perform/perform_join.go
+++ b/roomserver/internal/perform/perform_join.go
@@ -39,11 +39,10 @@ import (
)
type Joiner struct {
- ServerName gomatrixserverlib.ServerName
- Cfg *config.RoomServer
- FSAPI fsAPI.RoomserverFederationAPI
- RSAPI rsAPI.RoomserverInternalAPI
- DB storage.Database
+ Cfg *config.RoomServer
+ FSAPI fsAPI.RoomserverFederationAPI
+ RSAPI rsAPI.RoomserverInternalAPI
+ DB storage.Database
Inputer *input.Inputer
Queryer *query.Queryer
@@ -197,7 +196,7 @@ func (r *Joiner) performJoinRoomByID(
// Prepare the template for the join event.
userID := req.UserID
- _, userDomain, err := gomatrixserverlib.SplitID('@', userID)
+ _, userDomain, err := r.Cfg.Matrix.SplitLocalID('@', userID)
if err != nil {
return "", "", &rsAPI.PerformError{
Code: rsAPI.PerformErrorBadRequest,
@@ -283,7 +282,7 @@ func (r *Joiner) performJoinRoomByID(
// locally on the homeserver.
// TODO: Check what happens if the room exists on the server
// but everyone has since left. I suspect it does the wrong thing.
- event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, &eb)
+ event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, userDomain, &eb)
switch err {
case nil:
@@ -410,7 +409,9 @@ func (r *Joiner) populateAuthorisedViaUserForRestrictedJoin(
}
func buildEvent(
- ctx context.Context, db storage.Database, cfg *config.Global, builder *gomatrixserverlib.EventBuilder,
+ ctx context.Context, db storage.Database, cfg *config.Global,
+ senderDomain gomatrixserverlib.ServerName,
+ builder *gomatrixserverlib.EventBuilder,
) (*gomatrixserverlib.HeaderedEvent, *rsAPI.QueryLatestEventsAndStateResponse, error) {
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
if err != nil {
@@ -438,7 +439,12 @@ func buildEvent(
}
}
- ev, err := eventutil.BuildEvent(ctx, builder, cfg, time.Now(), &eventsNeeded, &queryRes)
+ identity, err := cfg.SigningIdentityFor(senderDomain)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ ev, err := eventutil.BuildEvent(ctx, builder, cfg, identity, time.Now(), &eventsNeeded, &queryRes)
if err != nil {
return nil, nil, err
}
diff --git a/roomserver/internal/perform/perform_leave.go b/roomserver/internal/perform/perform_leave.go
index 49e4b479..fa998e3e 100644
--- a/roomserver/internal/perform/perform_leave.go
+++ b/roomserver/internal/perform/perform_leave.go
@@ -162,21 +162,21 @@ func (r *Leaver) performLeaveRoomByID(
return nil, fmt.Errorf("eb.SetUnsigned: %w", err)
}
+ // Get the sender domain.
+ _, senderDomain, serr := r.Cfg.Matrix.SplitLocalID('@', eb.Sender)
+ if serr != nil {
+ return nil, fmt.Errorf("sender %q is invalid", eb.Sender)
+ }
+
// We know that the user is in the room at this point so let's build
// a leave event.
// TODO: Check what happens if the room exists on the server
// but everyone has since left. I suspect it does the wrong thing.
- event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, &eb)
+ event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, senderDomain, &eb)
if err != nil {
return nil, fmt.Errorf("eventutil.BuildEvent: %w", err)
}
- // Get the sender domain.
- _, senderDomain, serr := gomatrixserverlib.SplitID('@', event.Sender())
- if serr != nil {
- return nil, fmt.Errorf("sender %q is invalid", event.Sender())
- }
-
// Give our leave event to the roomserver input stream. The
// roomserver will process the membership change and notify
// downstream automatically.
diff --git a/roomserver/internal/perform/perform_upgrade.go b/roomserver/internal/perform/perform_upgrade.go
index 38abe323..02a19911 100644
--- a/roomserver/internal/perform/perform_upgrade.go
+++ b/roomserver/internal/perform/perform_upgrade.go
@@ -60,7 +60,7 @@ func (r *Upgrader) performRoomUpgrade(
) (string, *api.PerformError) {
roomID := req.RoomID
userID := req.UserID
- _, userDomain, err := gomatrixserverlib.SplitID('@', userID)
+ _, userDomain, err := r.Cfg.Matrix.SplitLocalID('@', userID)
if err != nil {
return "", &api.PerformError{
Code: api.PerformErrorNotAllowed,
@@ -558,7 +558,7 @@ func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, user
SendAsServer: api.DoNotSendToOtherServers,
})
}
- if err = api.SendInputRoomEvents(ctx, r.URSAPI, inputs, false); err != nil {
+ if err = api.SendInputRoomEvents(ctx, r.URSAPI, userDomain, inputs, false); err != nil {
return &api.PerformError{
Msg: fmt.Sprintf("Failed to send new room %q to roomserver: %s", newRoomID, err),
}
@@ -595,8 +595,21 @@ func (r *Upgrader) makeHeaderedEvent(ctx context.Context, evTime time.Time, user
Msg: fmt.Sprintf("Failed to set new %q event content: %s", builder.Type, err),
}
}
+ // Get the sender domain.
+ _, senderDomain, serr := r.Cfg.Matrix.SplitLocalID('@', builder.Sender)
+ if serr != nil {
+ return nil, &api.PerformError{
+ Msg: fmt.Sprintf("Failed to split user ID %q: %s", builder.Sender, err),
+ }
+ }
+ identity, err := r.Cfg.Matrix.SigningIdentityFor(senderDomain)
+ if err != nil {
+ return nil, &api.PerformError{
+ Msg: fmt.Sprintf("Failed to get signing identity for %q: %s", senderDomain, err),
+ }
+ }
var queryRes api.QueryLatestEventsAndStateResponse
- headeredEvent, err := eventutil.QueryAndBuildEvent(ctx, &builder, r.Cfg.Matrix, evTime, r.URSAPI, &queryRes)
+ headeredEvent, err := eventutil.QueryAndBuildEvent(ctx, &builder, r.Cfg.Matrix, identity, evTime, r.URSAPI, &queryRes)
if err == eventutil.ErrRoomNoExists {
return nil, &api.PerformError{
Code: api.PerformErrorNoRoom,
@@ -686,7 +699,7 @@ func (r *Upgrader) sendHeaderedEvent(
Origin: serverName,
SendAsServer: sendAsServer,
})
- if err := api.SendInputRoomEvents(ctx, r.URSAPI, inputs, false); err != nil {
+ if err := api.SendInputRoomEvents(ctx, r.URSAPI, serverName, inputs, false); err != nil {
return &api.PerformError{
Msg: fmt.Sprintf("Failed to send new %q event to roomserver: %s", headeredEvent.Type(), err),
}
diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go
index 8850e5c4..d8456fb4 100644
--- a/roomserver/internal/query/query.go
+++ b/roomserver/internal/query/query.go
@@ -37,10 +37,10 @@ import (
)
type Queryer struct {
- DB storage.Database
- Cache caching.RoomServerCaches
- ServerName gomatrixserverlib.ServerName
- ServerACLs *acls.ServerACLs
+ DB storage.Database
+ Cache caching.RoomServerCaches
+ IsLocalServerName func(gomatrixserverlib.ServerName) bool
+ ServerACLs *acls.ServerACLs
}
// QueryLatestEventsAndState implements api.RoomserverInternalAPI
@@ -392,7 +392,7 @@ func (r *Queryer) QueryServerJoinedToRoom(
}
response.RoomExists = true
- if request.ServerName == r.ServerName || request.ServerName == "" {
+ if r.IsLocalServerName(request.ServerName) || request.ServerName == "" {
response.IsInRoom, err = r.DB.GetLocalServerInRoom(ctx, info.RoomNID)
if err != nil {
return fmt.Errorf("r.DB.GetLocalServerInRoom: %w", err)
diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go
index 4e98af85..24b5515e 100644
--- a/roomserver/roomserver_test.go
+++ b/roomserver/roomserver_test.go
@@ -44,7 +44,7 @@ func Test_SharedUsers(t *testing.T) {
// SetFederationAPI starts the room event input consumer
rsAPI.SetFederationAPI(nil, nil)
// Create the room
- if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", nil, false); err != nil {
+ if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil {
t.Fatalf("failed to send events: %v", err)
}