diff options
author | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-11-15 15:05:23 +0000 |
---|---|---|
committer | Neil Alexander <neilalexander@users.noreply.github.com> | 2022-11-15 15:05:23 +0000 |
commit | 6650712a1c0dec282b47b7ba14bc8c2e06a385d8 (patch) | |
tree | 12ca755c5c33d3489417f9355dda3f1b7983c779 /roomserver | |
parent | f4ee3977340c84d321767d347795b1dcd05ac459 (diff) |
Federation fixes for virtual hosting
Diffstat (limited to 'roomserver')
-rw-r--r-- | roomserver/api/input.go | 5 | ||||
-rw-r--r-- | roomserver/api/perform.go | 2 | ||||
-rw-r--r-- | roomserver/api/wrapper.go | 11 | ||||
-rw-r--r-- | roomserver/internal/alias.go | 19 | ||||
-rw-r--r-- | roomserver/internal/api.go | 29 | ||||
-rw-r--r-- | roomserver/internal/input/input.go | 7 | ||||
-rw-r--r-- | roomserver/internal/input/input_events.go | 27 | ||||
-rw-r--r-- | roomserver/internal/input/input_missing.go | 17 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_admin.go | 25 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_backfill.go | 50 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_join.go | 24 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_leave.go | 14 | ||||
-rw-r--r-- | roomserver/internal/perform/perform_upgrade.go | 21 | ||||
-rw-r--r-- | roomserver/internal/query/query.go | 10 | ||||
-rw-r--r-- | roomserver/roomserver_test.go | 2 |
15 files changed, 168 insertions, 95 deletions
diff --git a/roomserver/api/input.go b/roomserver/api/input.go index 45a9ef49..88d52327 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -94,8 +94,9 @@ type TransactionID struct { // InputRoomEventsRequest is a request to InputRoomEvents type InputRoomEventsRequest struct { - InputRoomEvents []InputRoomEvent `json:"input_room_events"` - Asynchronous bool `json:"async"` + InputRoomEvents []InputRoomEvent `json:"input_room_events"` + Asynchronous bool `json:"async"` + VirtualHost gomatrixserverlib.ServerName `json:"virtual_host"` } // InputRoomEventsResponse is a response to InputRoomEvents diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go index 2536a4bb..e70e5ea9 100644 --- a/roomserver/api/perform.go +++ b/roomserver/api/perform.go @@ -148,6 +148,8 @@ type PerformBackfillRequest struct { Limit int `json:"limit"` // The server interested in the events. ServerName gomatrixserverlib.ServerName `json:"server_name"` + // Which virtual host are we doing this for? + VirtualHost gomatrixserverlib.ServerName `json:"virtual_host"` } // PrevEventIDs returns the prev_event IDs of all backwards extremities, de-duplicated in a lexicographically sorted order. diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index 8b031982..252be557 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -26,7 +26,7 @@ import ( func SendEvents( ctx context.Context, rsAPI InputRoomEventsAPI, kind Kind, events []*gomatrixserverlib.HeaderedEvent, - origin gomatrixserverlib.ServerName, + virtualHost, origin gomatrixserverlib.ServerName, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, async bool, ) error { @@ -40,14 +40,15 @@ func SendEvents( TransactionID: txnID, } } - return SendInputRoomEvents(ctx, rsAPI, ires, async) + return SendInputRoomEvents(ctx, rsAPI, virtualHost, ires, async) } // SendEventWithState writes an event with the specified kind to the roomserver // with the state at the event as KindOutlier before it. Will not send any event that is // marked as `true` in haveEventIDs. func SendEventWithState( - ctx context.Context, rsAPI InputRoomEventsAPI, kind Kind, + ctx context.Context, rsAPI InputRoomEventsAPI, + virtualHost gomatrixserverlib.ServerName, kind Kind, state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent, origin gomatrixserverlib.ServerName, haveEventIDs map[string]bool, async bool, ) error { @@ -85,17 +86,19 @@ func SendEventWithState( StateEventIDs: stateEventIDs, }) - return SendInputRoomEvents(ctx, rsAPI, ires, async) + return SendInputRoomEvents(ctx, rsAPI, virtualHost, ires, async) } // SendInputRoomEvents to the roomserver. func SendInputRoomEvents( ctx context.Context, rsAPI InputRoomEventsAPI, + virtualHost gomatrixserverlib.ServerName, ires []InputRoomEvent, async bool, ) error { request := InputRoomEventsRequest{ InputRoomEvents: ires, Asynchronous: async, + VirtualHost: virtualHost, } var response InputRoomEventsResponse if err := rsAPI.InputRoomEvents(ctx, &request, &response); err != nil { diff --git a/roomserver/internal/alias.go b/roomserver/internal/alias.go index 175bb931..329e6af7 100644 --- a/roomserver/internal/alias.go +++ b/roomserver/internal/alias.go @@ -137,6 +137,11 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( request *api.RemoveRoomAliasRequest, response *api.RemoveRoomAliasResponse, ) error { + _, virtualHost, err := r.Cfg.Matrix.SplitLocalID('@', request.UserID) + if err != nil { + return err + } + roomID, err := r.DB.GetRoomIDForAlias(ctx, request.Alias) if err != nil { return fmt.Errorf("r.DB.GetRoomIDForAlias: %w", err) @@ -190,6 +195,16 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( sender = ev.Sender() } + _, senderDomain, err := r.Cfg.Matrix.SplitLocalID('@', sender) + if err != nil { + return err + } + + identity, err := r.Cfg.Matrix.SigningIdentityFor(senderDomain) + if err != nil { + return err + } + builder := &gomatrixserverlib.EventBuilder{ Sender: sender, RoomID: ev.RoomID(), @@ -211,12 +226,12 @@ func (r *RoomserverInternalAPI) RemoveRoomAlias( return err } - newEvent, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, time.Now(), &eventsNeeded, stateRes) + newEvent, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, identity, time.Now(), &eventsNeeded, stateRes) if err != nil { return err } - err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, r.ServerName, r.ServerName, nil, false) + err = api.SendEvents(ctx, r, api.KindNew, []*gomatrixserverlib.HeaderedEvent{newEvent}, virtualHost, r.ServerName, r.ServerName, nil, false) if err != nil { return err } diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 1a11586a..1a362660 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -87,10 +87,10 @@ func NewRoomserverAPI( Durable: base.Cfg.Global.JetStream.Durable("RoomserverInputConsumer"), ServerACLs: serverACLs, Queryer: &query.Queryer{ - DB: roomserverDB, - Cache: base.Caches, - ServerName: base.Cfg.Global.ServerName, - ServerACLs: serverACLs, + DB: roomserverDB, + Cache: base.Caches, + IsLocalServerName: base.Cfg.Global.IsLocalServerName, + ServerACLs: serverACLs, }, // perform-er structs get initialised when we have a federation sender to use } @@ -127,13 +127,12 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio Inputer: r.Inputer, } r.Joiner = &perform.Joiner{ - ServerName: r.Cfg.Matrix.ServerName, - Cfg: r.Cfg, - DB: r.DB, - FSAPI: r.fsAPI, - RSAPI: r, - Inputer: r.Inputer, - Queryer: r.Queryer, + Cfg: r.Cfg, + DB: r.DB, + FSAPI: r.fsAPI, + RSAPI: r, + Inputer: r.Inputer, + Queryer: r.Queryer, } r.Peeker = &perform.Peeker{ ServerName: r.Cfg.Matrix.ServerName, @@ -163,10 +162,10 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.RoomserverFederatio DB: r.DB, } r.Backfiller = &perform.Backfiller{ - ServerName: r.ServerName, - DB: r.DB, - FSAPI: r.fsAPI, - KeyRing: r.KeyRing, + IsLocalServerName: r.Cfg.Matrix.IsLocalServerName, + DB: r.DB, + FSAPI: r.fsAPI, + KeyRing: r.KeyRing, // Perspective servers are trusted to not lie about server keys, so we will also // prefer these servers when backfilling (assuming they are in the room) rather // than trying random servers diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index f5099ca1..e965691c 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -278,7 +278,11 @@ func (w *worker) _next() { // a string, because we might want to return that to the caller if // it was a synchronous request. var errString string - if err = w.r.processRoomEvent(w.r.ProcessContext.Context(), &inputRoomEvent); err != nil { + if err = w.r.processRoomEvent( + w.r.ProcessContext.Context(), + gomatrixserverlib.ServerName(msg.Header.Get("virtual_host")), + &inputRoomEvent, + ); err != nil { switch err.(type) { case types.RejectedError: // Don't send events that were rejected to Sentry @@ -358,6 +362,7 @@ func (r *Inputer) queueInputRoomEvents( if replyTo != "" { msg.Header.Set("sync", replyTo) } + msg.Header.Set("virtual_host", string(request.VirtualHost)) msg.Data, err = json.Marshal(e) if err != nil { return nil, fmt.Errorf("json.Marshal: %w", err) diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 682aa2b1..72c285f8 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -69,6 +69,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec( // nolint:gocyclo func (r *Inputer) processRoomEvent( ctx context.Context, + virtualHost gomatrixserverlib.ServerName, input *api.InputRoomEvent, ) error { select { @@ -200,7 +201,7 @@ func (r *Inputer) processRoomEvent( isRejected := false authEvents := gomatrixserverlib.NewAuthEvents(nil) knownEvents := map[string]*types.Event{} - if err = r.fetchAuthEvents(ctx, logger, roomInfo, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil { + if err = r.fetchAuthEvents(ctx, logger, roomInfo, virtualHost, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil { return fmt.Errorf("r.fetchAuthEvents: %w", err) } @@ -265,16 +266,17 @@ func (r *Inputer) processRoomEvent( // processRoomEvent. if len(serverRes.ServerNames) > 0 { missingState := missingStateReq{ - origin: input.Origin, - inputer: r, - db: r.DB, - roomInfo: roomInfo, - federation: r.FSAPI, - keys: r.KeyRing, - roomsMu: internal.NewMutexByRoom(), - servers: serverRes.ServerNames, - hadEvents: map[string]bool{}, - haveEvents: map[string]*gomatrixserverlib.Event{}, + origin: input.Origin, + virtualHost: virtualHost, + inputer: r, + db: r.DB, + roomInfo: roomInfo, + federation: r.FSAPI, + keys: r.KeyRing, + roomsMu: internal.NewMutexByRoom(), + servers: serverRes.ServerNames, + hadEvents: map[string]bool{}, + haveEvents: map[string]*gomatrixserverlib.Event{}, } var stateSnapshot *parsedRespState if stateSnapshot, err = missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil { @@ -555,6 +557,7 @@ func (r *Inputer) fetchAuthEvents( ctx context.Context, logger *logrus.Entry, roomInfo *types.RoomInfo, + virtualHost gomatrixserverlib.ServerName, event *gomatrixserverlib.HeaderedEvent, auth *gomatrixserverlib.AuthEvents, known map[string]*types.Event, @@ -605,7 +608,7 @@ func (r *Inputer) fetchAuthEvents( // Request the entire auth chain for the event in question. This should // contain all of the auth events — including ones that we already know — // so we'll need to filter through those in the next section. - res, err = r.FSAPI.GetEventAuth(ctx, serverName, event.RoomVersion, event.RoomID(), event.EventID()) + res, err = r.FSAPI.GetEventAuth(ctx, virtualHost, serverName, event.RoomVersion, event.RoomID(), event.EventID()) if err != nil { logger.WithError(err).Warnf("Failed to get event auth from federation for %q: %s", event.EventID(), err) continue diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index f788c5b3..03ac2b38 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -41,6 +41,7 @@ func (p *parsedRespState) Events() []*gomatrixserverlib.Event { type missingStateReq struct { log *logrus.Entry + virtualHost gomatrixserverlib.ServerName origin gomatrixserverlib.ServerName db storage.Database roomInfo *types.RoomInfo @@ -101,7 +102,7 @@ func (t *missingStateReq) processEventWithMissingState( // we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled // in the gap in the DAG for _, newEvent := range newEvents { - err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{ + err = t.inputer.processRoomEvent(ctx, t.virtualHost, &api.InputRoomEvent{ Kind: api.KindOld, Event: newEvent.Headered(roomVersion), Origin: t.origin, @@ -157,7 +158,7 @@ func (t *missingStateReq) processEventWithMissingState( }) } for _, ire := range outlierRoomEvents { - if err = t.inputer.processRoomEvent(ctx, &ire); err != nil { + if err = t.inputer.processRoomEvent(ctx, t.virtualHost, &ire); err != nil { if _, ok := err.(types.RejectedError); !ok { return fmt.Errorf("t.inputer.processRoomEvent (outlier): %w", err) } @@ -180,7 +181,7 @@ func (t *missingStateReq) processEventWithMissingState( stateIDs = append(stateIDs, event.EventID()) } - err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{ + err = t.inputer.processRoomEvent(ctx, t.virtualHost, &api.InputRoomEvent{ Kind: api.KindOld, Event: backwardsExtremity.Headered(roomVersion), Origin: t.origin, @@ -199,7 +200,7 @@ func (t *missingStateReq) processEventWithMissingState( // they will automatically fast-forward based on the room state at the // extremity in the last step. for _, newEvent := range newEvents { - err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{ + err = t.inputer.processRoomEvent(ctx, t.virtualHost, &api.InputRoomEvent{ Kind: api.KindOld, Event: newEvent.Headered(roomVersion), Origin: t.origin, @@ -519,7 +520,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve var missingResp *gomatrixserverlib.RespMissingEvents for _, server := range t.servers { var m gomatrixserverlib.RespMissingEvents - if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{ + if m, err = t.federation.LookupMissingEvents(ctx, t.virtualHost, server, e.RoomID(), gomatrixserverlib.MissingEvents{ Limit: 20, // The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events. EarliestEvents: latestEvents, @@ -635,7 +636,7 @@ func (t *missingStateReq) lookupMissingStateViaState( span, ctx := opentracing.StartSpanFromContext(ctx, "lookupMissingStateViaState") defer span.Finish() - state, err := t.federation.LookupState(ctx, t.origin, roomID, eventID, roomVersion) + state, err := t.federation.LookupState(ctx, t.virtualHost, t.origin, roomID, eventID, roomVersion) if err != nil { return nil, err } @@ -675,7 +676,7 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo totalctx, totalcancel := context.WithTimeout(ctx, time.Minute*5) for _, serverName := range t.servers { reqctx, reqcancel := context.WithTimeout(totalctx, time.Second*20) - stateIDs, err = t.federation.LookupStateIDs(reqctx, serverName, roomID, eventID) + stateIDs, err = t.federation.LookupStateIDs(reqctx, t.virtualHost, serverName, roomID, eventID) reqcancel() if err == nil { break @@ -855,7 +856,7 @@ func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixs for _, serverName := range t.servers { reqctx, cancel := context.WithTimeout(ctx, time.Second*30) defer cancel() - txn, err := t.federation.GetEvent(reqctx, serverName, missingEventID) + txn, err := t.federation.GetEvent(reqctx, t.virtualHost, serverName, missingEventID) if err != nil || len(txn.PDUs) == 0 { t.log.WithError(err).WithField("missing_event_id", missingEventID).Warn("Failed to get missing /event for event ID") if errors.Is(err, context.DeadlineExceeded) { diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go index 0406fc8f..d42f4e45 100644 --- a/roomserver/internal/perform/perform_admin.go +++ b/roomserver/internal/perform/perform_admin.go @@ -139,7 +139,12 @@ func (r *Admin) PerformAdminEvacuateRoom( return nil } - event, err := eventutil.BuildEvent(ctx, fledglingEvent, r.Cfg.Matrix, time.Now(), &eventsNeeded, latestRes) + identity, err := r.Cfg.Matrix.SigningIdentityFor(senderDomain) + if err != nil { + continue + } + + event, err := eventutil.BuildEvent(ctx, fledglingEvent, r.Cfg.Matrix, identity, time.Now(), &eventsNeeded, latestRes) if err != nil { res.Error = &api.PerformError{ Code: api.PerformErrorBadRequest, @@ -242,6 +247,15 @@ func (r *Admin) PerformAdminDownloadState( req *api.PerformAdminDownloadStateRequest, res *api.PerformAdminDownloadStateResponse, ) error { + _, senderDomain, err := r.Cfg.Matrix.SplitLocalID('@', req.UserID) + if err != nil { + res.Error = &api.PerformError{ + Code: api.PerformErrorBadRequest, + Msg: fmt.Sprintf("r.Cfg.Matrix.SplitLocalID: %s", err), + } + return nil + } + roomInfo, err := r.DB.RoomInfo(ctx, req.RoomID) if err != nil { res.Error = &api.PerformError{ @@ -273,7 +287,7 @@ func (r *Admin) PerformAdminDownloadState( for _, fwdExtremity := range fwdExtremities { var state gomatrixserverlib.RespState - state, err = r.Inputer.FSAPI.LookupState(ctx, req.ServerName, req.RoomID, fwdExtremity.EventID, roomInfo.RoomVersion) + state, err = r.Inputer.FSAPI.LookupState(ctx, r.Inputer.ServerName, req.ServerName, req.RoomID, fwdExtremity.EventID, roomInfo.RoomVersion) if err != nil { res.Error = &api.PerformError{ Code: api.PerformErrorBadRequest, @@ -331,7 +345,12 @@ func (r *Admin) PerformAdminDownloadState( Depth: depth, } - ev, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, time.Now(), &eventsNeeded, queryRes) + identity, err := r.Cfg.Matrix.SigningIdentityFor(senderDomain) + if err != nil { + return err + } + + ev, err := eventutil.BuildEvent(ctx, builder, r.Cfg.Matrix, identity, time.Now(), &eventsNeeded, queryRes) if err != nil { res.Error = &api.PerformError{ Code: api.PerformErrorBadRequest, diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index 57e121ea..069f017a 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -37,10 +37,10 @@ import ( const maxBackfillServers = 5 type Backfiller struct { - ServerName gomatrixserverlib.ServerName - DB storage.Database - FSAPI federationAPI.RoomserverFederationAPI - KeyRing gomatrixserverlib.JSONVerifier + IsLocalServerName func(gomatrixserverlib.ServerName) bool + DB storage.Database + FSAPI federationAPI.RoomserverFederationAPI + KeyRing gomatrixserverlib.JSONVerifier // The servers which should be preferred above other servers when backfilling PreferServers []gomatrixserverlib.ServerName @@ -55,7 +55,7 @@ func (r *Backfiller) PerformBackfill( // if we are requesting the backfill then we need to do a federation hit // TODO: we could be more sensible and fetch as many events we already have then request the rest // which is what the syncapi does already. - if request.ServerName == r.ServerName { + if r.IsLocalServerName(request.ServerName) { return r.backfillViaFederation(ctx, request, response) } // someone else is requesting the backfill, try to service their request. @@ -112,16 +112,18 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform if info == nil || info.IsStub() { return fmt.Errorf("backfillViaFederation: missing room info for room %s", req.RoomID) } - requester := newBackfillRequester(r.DB, r.FSAPI, r.ServerName, req.BackwardsExtremities, r.PreferServers) + requester := newBackfillRequester(r.DB, r.FSAPI, req.VirtualHost, r.IsLocalServerName, req.BackwardsExtremities, r.PreferServers) // Request 100 items regardless of what the query asks for. // We don't want to go much higher than this. // We can't honour exactly the limit as some sytests rely on requesting more for tests to pass // (so we don't need to hit /state_ids which the test has no listener for) // Specifically the test "Outbound federation can backfill events" events, err := gomatrixserverlib.RequestBackfill( - ctx, requester, - r.KeyRing, req.RoomID, info.RoomVersion, req.PrevEventIDs(), 100) + ctx, req.VirtualHost, requester, + r.KeyRing, req.RoomID, info.RoomVersion, req.PrevEventIDs(), 100, + ) if err != nil { + logrus.WithError(err).Errorf("gomatrixserverlib.RequestBackfill failed") return err } logrus.WithField("room_id", req.RoomID).Infof("backfilled %d events", len(events)) @@ -144,7 +146,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform var entries []types.StateEntry if entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs, true); err != nil { // attempt to fetch the missing events - r.fetchAndStoreMissingEvents(ctx, info.RoomVersion, requester, stateIDs) + r.fetchAndStoreMissingEvents(ctx, info.RoomVersion, requester, stateIDs, req.VirtualHost) // try again entries, err = r.DB.StateEntriesForEventIDs(ctx, stateIDs, true) if err != nil { @@ -173,7 +175,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform // fetchAndStoreMissingEvents does a best-effort fetch and store of missing events specified in stateIDs. Returns no error as it is just // best effort. func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gomatrixserverlib.RoomVersion, - backfillRequester *backfillRequester, stateIDs []string) { + backfillRequester *backfillRequester, stateIDs []string, virtualHost gomatrixserverlib.ServerName) { servers := backfillRequester.servers @@ -198,7 +200,7 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom continue // already found } logger := util.GetLogger(ctx).WithField("server", srv).WithField("event_id", id) - res, err := r.FSAPI.GetEvent(ctx, srv, id) + res, err := r.FSAPI.GetEvent(ctx, virtualHost, srv, id) if err != nil { logger.WithError(err).Warn("failed to get event from server") continue @@ -241,11 +243,12 @@ func (r *Backfiller) fetchAndStoreMissingEvents(ctx context.Context, roomVer gom // backfillRequester implements gomatrixserverlib.BackfillRequester type backfillRequester struct { - db storage.Database - fsAPI federationAPI.RoomserverFederationAPI - thisServer gomatrixserverlib.ServerName - preferServer map[gomatrixserverlib.ServerName]bool - bwExtrems map[string][]string + db storage.Database + fsAPI federationAPI.RoomserverFederationAPI + virtualHost gomatrixserverlib.ServerName + isLocalServerName func(gomatrixserverlib.ServerName) bool + preferServer map[gomatrixserverlib.ServerName]bool + bwExtrems map[string][]string // per-request state servers []gomatrixserverlib.ServerName @@ -255,7 +258,9 @@ type backfillRequester struct { } func newBackfillRequester( - db storage.Database, fsAPI federationAPI.RoomserverFederationAPI, thisServer gomatrixserverlib.ServerName, + db storage.Database, fsAPI federationAPI.RoomserverFederationAPI, + virtualHost gomatrixserverlib.ServerName, + isLocalServerName func(gomatrixserverlib.ServerName) bool, bwExtrems map[string][]string, preferServers []gomatrixserverlib.ServerName, ) *backfillRequester { preferServer := make(map[gomatrixserverlib.ServerName]bool) @@ -265,7 +270,8 @@ func newBackfillRequester( return &backfillRequester{ db: db, fsAPI: fsAPI, - thisServer: thisServer, + virtualHost: virtualHost, + isLocalServerName: isLocalServerName, eventIDToBeforeStateIDs: make(map[string][]string), eventIDMap: make(map[string]*gomatrixserverlib.Event), bwExtrems: bwExtrems, @@ -450,7 +456,7 @@ FindSuccessor: } // possibly return all joined servers depending on history visiblity - memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.thisServer) + memberEventsFromVis, visibility, err := joinEventsFromHistoryVisibility(ctx, b.db, roomID, stateEntries, b.virtualHost) b.historyVisiblity = visibility if err != nil { logrus.WithError(err).Error("ServersAtEvent: failed calculate servers from history visibility rules") @@ -477,7 +483,7 @@ FindSuccessor: } var servers []gomatrixserverlib.ServerName for server := range serverSet { - if server == b.thisServer { + if b.isLocalServerName(server) { continue } if b.preferServer[server] { // insert at the front @@ -496,10 +502,10 @@ FindSuccessor: // Backfill performs a backfill request to the given server. // https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid -func (b *backfillRequester) Backfill(ctx context.Context, server gomatrixserverlib.ServerName, roomID string, +func (b *backfillRequester) Backfill(ctx context.Context, origin, server gomatrixserverlib.ServerName, roomID string, limit int, fromEventIDs []string) (gomatrixserverlib.Transaction, error) { - tx, err := b.fsAPI.Backfill(ctx, server, roomID, limit, fromEventIDs) + tx, err := b.fsAPI.Backfill(ctx, origin, server, roomID, limit, fromEventIDs) return tx, err } diff --git a/roomserver/internal/perform/perform_join.go b/roomserver/internal/perform/perform_join.go index 9d596ab3..4de008c6 100644 --- a/roomserver/internal/perform/perform_join.go +++ b/roomserver/internal/perform/perform_join.go @@ -39,11 +39,10 @@ import ( ) type Joiner struct { - ServerName gomatrixserverlib.ServerName - Cfg *config.RoomServer - FSAPI fsAPI.RoomserverFederationAPI - RSAPI rsAPI.RoomserverInternalAPI - DB storage.Database + Cfg *config.RoomServer + FSAPI fsAPI.RoomserverFederationAPI + RSAPI rsAPI.RoomserverInternalAPI + DB storage.Database Inputer *input.Inputer Queryer *query.Queryer @@ -197,7 +196,7 @@ func (r *Joiner) performJoinRoomByID( // Prepare the template for the join event. userID := req.UserID - _, userDomain, err := gomatrixserverlib.SplitID('@', userID) + _, userDomain, err := r.Cfg.Matrix.SplitLocalID('@', userID) if err != nil { return "", "", &rsAPI.PerformError{ Code: rsAPI.PerformErrorBadRequest, @@ -283,7 +282,7 @@ func (r *Joiner) performJoinRoomByID( // locally on the homeserver. // TODO: Check what happens if the room exists on the server // but everyone has since left. I suspect it does the wrong thing. - event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, &eb) + event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, userDomain, &eb) switch err { case nil: @@ -410,7 +409,9 @@ func (r *Joiner) populateAuthorisedViaUserForRestrictedJoin( } func buildEvent( - ctx context.Context, db storage.Database, cfg *config.Global, builder *gomatrixserverlib.EventBuilder, + ctx context.Context, db storage.Database, cfg *config.Global, + senderDomain gomatrixserverlib.ServerName, + builder *gomatrixserverlib.EventBuilder, ) (*gomatrixserverlib.HeaderedEvent, *rsAPI.QueryLatestEventsAndStateResponse, error) { eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder) if err != nil { @@ -438,7 +439,12 @@ func buildEvent( } } - ev, err := eventutil.BuildEvent(ctx, builder, cfg, time.Now(), &eventsNeeded, &queryRes) + identity, err := cfg.SigningIdentityFor(senderDomain) + if err != nil { + return nil, nil, err + } + + ev, err := eventutil.BuildEvent(ctx, builder, cfg, identity, time.Now(), &eventsNeeded, &queryRes) if err != nil { return nil, nil, err } diff --git a/roomserver/internal/perform/perform_leave.go b/roomserver/internal/perform/perform_leave.go index 49e4b479..fa998e3e 100644 --- a/roomserver/internal/perform/perform_leave.go +++ b/roomserver/internal/perform/perform_leave.go @@ -162,21 +162,21 @@ func (r *Leaver) performLeaveRoomByID( return nil, fmt.Errorf("eb.SetUnsigned: %w", err) } + // Get the sender domain. + _, senderDomain, serr := r.Cfg.Matrix.SplitLocalID('@', eb.Sender) + if serr != nil { + return nil, fmt.Errorf("sender %q is invalid", eb.Sender) + } + // We know that the user is in the room at this point so let's build // a leave event. // TODO: Check what happens if the room exists on the server // but everyone has since left. I suspect it does the wrong thing. - event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, &eb) + event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, senderDomain, &eb) if err != nil { return nil, fmt.Errorf("eventutil.BuildEvent: %w", err) } - // Get the sender domain. - _, senderDomain, serr := gomatrixserverlib.SplitID('@', event.Sender()) - if serr != nil { - return nil, fmt.Errorf("sender %q is invalid", event.Sender()) - } - // Give our leave event to the roomserver input stream. The // roomserver will process the membership change and notify // downstream automatically. diff --git a/roomserver/internal/perform/perform_upgrade.go b/roomserver/internal/perform/perform_upgrade.go index 38abe323..02a19911 100644 --- a/roomserver/internal/perform/perform_upgrade.go +++ b/roomserver/internal/perform/perform_upgrade.go @@ -60,7 +60,7 @@ func (r *Upgrader) performRoomUpgrade( ) (string, *api.PerformError) { roomID := req.RoomID userID := req.UserID - _, userDomain, err := gomatrixserverlib.SplitID('@', userID) + _, userDomain, err := r.Cfg.Matrix.SplitLocalID('@', userID) if err != nil { return "", &api.PerformError{ Code: api.PerformErrorNotAllowed, @@ -558,7 +558,7 @@ func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, user SendAsServer: api.DoNotSendToOtherServers, }) } - if err = api.SendInputRoomEvents(ctx, r.URSAPI, inputs, false); err != nil { + if err = api.SendInputRoomEvents(ctx, r.URSAPI, userDomain, inputs, false); err != nil { return &api.PerformError{ Msg: fmt.Sprintf("Failed to send new room %q to roomserver: %s", newRoomID, err), } @@ -595,8 +595,21 @@ func (r *Upgrader) makeHeaderedEvent(ctx context.Context, evTime time.Time, user Msg: fmt.Sprintf("Failed to set new %q event content: %s", builder.Type, err), } } + // Get the sender domain. + _, senderDomain, serr := r.Cfg.Matrix.SplitLocalID('@', builder.Sender) + if serr != nil { + return nil, &api.PerformError{ + Msg: fmt.Sprintf("Failed to split user ID %q: %s", builder.Sender, err), + } + } + identity, err := r.Cfg.Matrix.SigningIdentityFor(senderDomain) + if err != nil { + return nil, &api.PerformError{ + Msg: fmt.Sprintf("Failed to get signing identity for %q: %s", senderDomain, err), + } + } var queryRes api.QueryLatestEventsAndStateResponse - headeredEvent, err := eventutil.QueryAndBuildEvent(ctx, &builder, r.Cfg.Matrix, evTime, r.URSAPI, &queryRes) + headeredEvent, err := eventutil.QueryAndBuildEvent(ctx, &builder, r.Cfg.Matrix, identity, evTime, r.URSAPI, &queryRes) if err == eventutil.ErrRoomNoExists { return nil, &api.PerformError{ Code: api.PerformErrorNoRoom, @@ -686,7 +699,7 @@ func (r *Upgrader) sendHeaderedEvent( Origin: serverName, SendAsServer: sendAsServer, }) - if err := api.SendInputRoomEvents(ctx, r.URSAPI, inputs, false); err != nil { + if err := api.SendInputRoomEvents(ctx, r.URSAPI, serverName, inputs, false); err != nil { return &api.PerformError{ Msg: fmt.Sprintf("Failed to send new %q event to roomserver: %s", headeredEvent.Type(), err), } diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go index 8850e5c4..d8456fb4 100644 --- a/roomserver/internal/query/query.go +++ b/roomserver/internal/query/query.go @@ -37,10 +37,10 @@ import ( ) type Queryer struct { - DB storage.Database - Cache caching.RoomServerCaches - ServerName gomatrixserverlib.ServerName - ServerACLs *acls.ServerACLs + DB storage.Database + Cache caching.RoomServerCaches + IsLocalServerName func(gomatrixserverlib.ServerName) bool + ServerACLs *acls.ServerACLs } // QueryLatestEventsAndState implements api.RoomserverInternalAPI @@ -392,7 +392,7 @@ func (r *Queryer) QueryServerJoinedToRoom( } response.RoomExists = true - if request.ServerName == r.ServerName || request.ServerName == "" { + if r.IsLocalServerName(request.ServerName) || request.ServerName == "" { response.IsInRoom, err = r.DB.GetLocalServerInRoom(ctx, info.RoomNID) if err != nil { return fmt.Errorf("r.DB.GetLocalServerInRoom: %w", err) diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 4e98af85..24b5515e 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -44,7 +44,7 @@ func Test_SharedUsers(t *testing.T) { // SetFederationAPI starts the room event input consumer rsAPI.SetFederationAPI(nil, nil) // Create the room - if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", nil, false); err != nil { + if err := api.SendEvents(ctx, rsAPI, api.KindNew, room.Events(), "test", "test", "test", nil, false); err != nil { t.Fatalf("failed to send events: %v", err) } |