diff options
author | Kegsay <kegan@matrix.org> | 2020-09-02 17:13:15 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-02 17:13:15 +0100 |
commit | 9d9e854fe042cd2c83cf694d6b3e4c8e7046cde1 (patch) | |
tree | 75f9247df1d00b140c4249ef14b711b2839806bd | |
parent | f06637435b2124c89dfdd96cd723f54cc7055602 (diff) |
Add Queryer and Inputer and factor out more RSAPI stuff (#1382)
* Add Queryer and use embedded structs
* Add Inputer and factor out more RS API stuff
This neatly splits up the RS API based on the functionality it provides,
whilst providing a useful place for code sharing via the `helpers` package.
21 files changed, 292 insertions, 185 deletions
diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index 5d635c01..cba19a24 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -270,7 +270,7 @@ func buildMembershipEvent( return nil, err } - return eventutil.BuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil) + return eventutil.QueryAndBuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil) } // loadProfile lookups the profile of a given user from the database and returns diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index faf92451..4c7895bd 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -375,7 +375,7 @@ func buildMembershipEvents( return nil, err } - event, err := eventutil.BuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil) + event, err := eventutil.QueryAndBuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil) if err != nil { return nil, err } diff --git a/clientapi/routing/redaction.go b/clientapi/routing/redaction.go index bb526513..a825da64 100644 --- a/clientapi/routing/redaction.go +++ b/clientapi/routing/redaction.go @@ -115,7 +115,7 @@ func SendRedaction( } var queryRes api.QueryLatestEventsAndStateResponse - e, err := eventutil.BuildEvent(req.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes) + e, err := eventutil.QueryAndBuildEvent(req.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes) if err == eventutil.ErrRoomNoExists { return util.JSONResponse{ Code: http.StatusNotFound, diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index 9cf517cf..a25979ea 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -158,7 +158,7 @@ func generateSendEvent( } var queryRes api.QueryLatestEventsAndStateResponse - e, err := eventutil.BuildEvent(req.Context(), &builder, cfg.Matrix, evTime, rsAPI, &queryRes) + e, err := eventutil.QueryAndBuildEvent(req.Context(), &builder, cfg.Matrix, evTime, rsAPI, &queryRes) if err == eventutil.ErrRoomNoExists { return nil, &util.JSONResponse{ Code: http.StatusNotFound, diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go index f1d54a47..2ffb6bb0 100644 --- a/clientapi/threepid/invites.go +++ b/clientapi/threepid/invites.go @@ -354,7 +354,7 @@ func emit3PIDInviteEvent( } queryRes := api.QueryLatestEventsAndStateResponse{} - event, err := eventutil.BuildEvent(ctx, builder, cfg.Matrix, evTime, rsAPI, &queryRes) + event, err := eventutil.QueryAndBuildEvent(ctx, builder, cfg.Matrix, evTime, rsAPI, &queryRes) if err != nil { return err } diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index ffdadd52..6cac1245 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -95,7 +95,7 @@ func MakeJoin( queryRes := api.QueryLatestEventsAndStateResponse{ RoomVersion: verRes.RoomVersion, } - event, err := eventutil.BuildEvent(httpReq.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes) + event, err := eventutil.QueryAndBuildEvent(httpReq.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes) if err == eventutil.ErrRoomNoExists { return util.JSONResponse{ Code: http.StatusNotFound, diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go index d2fbfc71..51162344 100644 --- a/federationapi/routing/leave.go +++ b/federationapi/routing/leave.go @@ -61,7 +61,7 @@ func MakeLeave( } var queryRes api.QueryLatestEventsAndStateResponse - event, err := eventutil.BuildEvent(httpReq.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes) + event, err := eventutil.QueryAndBuildEvent(httpReq.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes) if err == eventutil.ErrRoomNoExists { return util.JSONResponse{ Code: http.StatusNotFound, diff --git a/internal/eventutil/events.go b/internal/eventutil/events.go index 35c7f33d..0b878961 100644 --- a/internal/eventutil/events.go +++ b/internal/eventutil/events.go @@ -30,13 +30,13 @@ import ( // doesn't exist var ErrRoomNoExists = errors.New("Room does not exist") -// BuildEvent builds a Matrix event using the event builder and roomserver query +// QueryAndBuildEvent builds a Matrix event using the event builder and roomserver query // API client provided. If also fills roomserver query API response (if provided) // in case the function calling FillBuilder needs to use it. // Returns ErrRoomNoExists if the state of the room could not be retrieved because // the room doesn't exist // Returns an error if something else went wrong -func BuildEvent( +func QueryAndBuildEvent( ctx context.Context, builder *gomatrixserverlib.EventBuilder, cfg *config.Global, evTime time.Time, rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse, @@ -45,11 +45,25 @@ func BuildEvent( queryRes = &api.QueryLatestEventsAndStateResponse{} } - ver, err := AddPrevEventsToEvent(ctx, builder, rsAPI, queryRes) + eventsNeeded, err := queryRequiredEventsForBuilder(ctx, builder, rsAPI, queryRes) if err != nil { // This can pass through a ErrRoomNoExists to the caller return nil, err } + return BuildEvent(ctx, builder, cfg, evTime, eventsNeeded, queryRes) +} + +// BuildEvent builds a Matrix event from the builder and QueryLatestEventsAndStateResponse +// provided. +func BuildEvent( + ctx context.Context, + builder *gomatrixserverlib.EventBuilder, cfg *config.Global, evTime time.Time, + eventsNeeded *gomatrixserverlib.StateNeeded, queryRes *api.QueryLatestEventsAndStateResponse, +) (*gomatrixserverlib.HeaderedEvent, error) { + err := addPrevEventsToEvent(builder, eventsNeeded, queryRes) + if err != nil { + return nil, err + } event, err := builder.Build( evTime, cfg.ServerName, cfg.KeyID, @@ -59,23 +73,23 @@ func BuildEvent( return nil, err } - h := event.Headered(ver) + h := event.Headered(queryRes.RoomVersion) return &h, nil } -// AddPrevEventsToEvent fills out the prev_events and auth_events fields in builder -func AddPrevEventsToEvent( +// queryRequiredEventsForBuilder queries the roomserver for auth/prev events needed for this builder. +func queryRequiredEventsForBuilder( ctx context.Context, builder *gomatrixserverlib.EventBuilder, rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse, -) (gomatrixserverlib.RoomVersion, error) { +) (*gomatrixserverlib.StateNeeded, error) { eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder) if err != nil { - return "", fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err) + return nil, fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err) } if len(eventsNeeded.Tuples()) == 0 { - return "", errors.New("expecting state tuples for event builder, got none") + return nil, errors.New("expecting state tuples for event builder, got none") } // Ask the roomserver for information about this room @@ -83,17 +97,22 @@ func AddPrevEventsToEvent( RoomID: builder.RoomID, StateToFetch: eventsNeeded.Tuples(), } - if err = rsAPI.QueryLatestEventsAndState(ctx, &queryReq, queryRes); err != nil { - return "", fmt.Errorf("rsAPI.QueryLatestEventsAndState: %w", err) - } + return &eventsNeeded, rsAPI.QueryLatestEventsAndState(ctx, &queryReq, queryRes) +} +// addPrevEventsToEvent fills out the prev_events and auth_events fields in builder +func addPrevEventsToEvent( + builder *gomatrixserverlib.EventBuilder, + eventsNeeded *gomatrixserverlib.StateNeeded, + queryRes *api.QueryLatestEventsAndStateResponse, +) error { if !queryRes.RoomExists { - return "", ErrRoomNoExists + return ErrRoomNoExists } eventFormat, err := queryRes.RoomVersion.EventFormat() if err != nil { - return "", fmt.Errorf("queryRes.RoomVersion.EventFormat: %w", err) + return fmt.Errorf("queryRes.RoomVersion.EventFormat: %w", err) } builder.Depth = queryRes.Depth @@ -103,13 +122,13 @@ func AddPrevEventsToEvent( for i := range queryRes.StateEvents { err = authEvents.AddEvent(&queryRes.StateEvents[i].Event) if err != nil { - return "", fmt.Errorf("authEvents.AddEvent: %w", err) + return fmt.Errorf("authEvents.AddEvent: %w", err) } } refs, err := eventsNeeded.AuthEventReferences(&authEvents) if err != nil { - return "", fmt.Errorf("eventsNeeded.AuthEventReferences: %w", err) + return fmt.Errorf("eventsNeeded.AuthEventReferences: %w", err) } truncAuth, truncPrev := truncateAuthAndPrevEvents(refs, queryRes.LatestEvents) @@ -129,7 +148,7 @@ func AddPrevEventsToEvent( builder.PrevEvents = v2PrevRefs } - return queryRes.RoomVersion, nil + return nil } // truncateAuthAndPrevEvents limits the number of events we add into diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go index 8ac1bdda..93c0be77 100644 --- a/roomserver/internal/api.go +++ b/roomserver/internal/api.go @@ -2,20 +2,28 @@ package internal import ( "context" - "sync" "github.com/Shopify/sarama" fsAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/internal/perform" + "github.com/matrix-org/dendrite/roomserver/internal/query" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" ) // RoomserverInternalAPI is an implementation of api.RoomserverInternalAPI type RoomserverInternalAPI struct { + *input.Inputer + *query.Queryer + *perform.Inviter + *perform.Joiner + *perform.Leaver + *perform.Publisher + *perform.Backfiller DB storage.Database Cfg *config.RoomServer Producer sarama.SyncProducer @@ -24,12 +32,6 @@ type RoomserverInternalAPI struct { KeyRing gomatrixserverlib.JSONVerifier fsAPI fsAPI.FederationSenderInternalAPI OutputRoomEventTopic string // Kafka topic for new output room events - Inviter *perform.Inviter - Joiner *perform.Joiner - Leaver *perform.Leaver - Publisher *perform.Publisher - Backfiller *perform.Backfiller - mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent } func NewRoomserverAPI( @@ -38,13 +40,21 @@ func NewRoomserverAPI( keyRing gomatrixserverlib.JSONVerifier, ) *RoomserverInternalAPI { a := &RoomserverInternalAPI{ - DB: roomserverDB, - Cfg: cfg, - Producer: producer, - Cache: caches, - ServerName: cfg.Matrix.ServerName, - KeyRing: keyRing, - OutputRoomEventTopic: outputRoomEventTopic, + DB: roomserverDB, + Cfg: cfg, + Cache: caches, + ServerName: cfg.Matrix.ServerName, + KeyRing: keyRing, + Queryer: &query.Queryer{ + DB: roomserverDB, + Cache: caches, + }, + Inputer: &input.Inputer{ + DB: roomserverDB, + OutputRoomEventTopic: outputRoomEventTopic, + Producer: producer, + ServerName: cfg.Matrix.ServerName, + }, // perform-er structs get initialised when we have a federation sender to use } return a @@ -57,23 +67,23 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen r.fsAPI = fsAPI r.Inviter = &perform.Inviter{ - DB: r.DB, - Cfg: r.Cfg, - FSAPI: r.fsAPI, - RSAPI: r, + DB: r.DB, + Cfg: r.Cfg, + FSAPI: r.fsAPI, + Inputer: r.Inputer, } r.Joiner = &perform.Joiner{ ServerName: r.Cfg.Matrix.ServerName, Cfg: r.Cfg, DB: r.DB, FSAPI: r.fsAPI, - RSAPI: r, + Inputer: r.Inputer, } r.Leaver = &perform.Leaver{ - Cfg: r.Cfg, - DB: r.DB, - FSAPI: r.fsAPI, - RSAPI: r, + Cfg: r.Cfg, + DB: r.DB, + FSAPI: r.fsAPI, + Inputer: r.Inputer, } r.Publisher = &perform.Publisher{ DB: r.DB, @@ -101,14 +111,6 @@ func (r *RoomserverInternalAPI) PerformInvite( return r.WriteOutputEvents(req.Event.RoomID(), outputEvents) } -func (r *RoomserverInternalAPI) PerformJoin( - ctx context.Context, - req *api.PerformJoinRequest, - res *api.PerformJoinResponse, -) { - r.Joiner.PerformJoin(ctx, req, res) -} - func (r *RoomserverInternalAPI) PerformLeave( ctx context.Context, req *api.PerformLeaveRequest, @@ -123,20 +125,3 @@ func (r *RoomserverInternalAPI) PerformLeave( } return r.WriteOutputEvents(req.RoomID, outputEvents) } - -func (r *RoomserverInternalAPI) PerformPublish( - ctx context.Context, - req *api.PerformPublishRequest, - res *api.PerformPublishResponse, -) { - r.Publisher.PerformPublish(ctx, req, res) -} - -// Query a given amount (or less) of events prior to a given set of events. -func (r *RoomserverInternalAPI) PerformBackfill( - ctx context.Context, - request *api.PerformBackfillRequest, - response *api.PerformBackfillResponse, -) error { - return r.Backfiller.PerformBackfill(ctx, request, response) -} diff --git a/roomserver/internal/helpers/helpers.go b/roomserver/internal/helpers/helpers.go index d7bb40af..b7e6ce86 100644 --- a/roomserver/internal/helpers/helpers.go +++ b/roomserver/internal/helpers/helpers.go @@ -324,3 +324,56 @@ BFSLoop: return resultNIDs, err } + +func QueryLatestEventsAndState( + ctx context.Context, db storage.Database, + request *api.QueryLatestEventsAndStateRequest, + response *api.QueryLatestEventsAndStateResponse, +) error { + roomInfo, err := db.RoomInfo(ctx, request.RoomID) + if err != nil { + return err + } + if roomInfo == nil || roomInfo.IsStub { + response.RoomExists = false + return nil + } + + roomState := state.NewStateResolution(db, *roomInfo) + response.RoomExists = true + response.RoomVersion = roomInfo.RoomVersion + + var currentStateSnapshotNID types.StateSnapshotNID + response.LatestEvents, currentStateSnapshotNID, response.Depth, err = + db.LatestEventIDs(ctx, roomInfo.RoomNID) + if err != nil { + return err + } + + var stateEntries []types.StateEntry + if len(request.StateToFetch) == 0 { + // Look up all room state. + stateEntries, err = roomState.LoadStateAtSnapshot( + ctx, currentStateSnapshotNID, + ) + } else { + // Look up the current state for the requested tuples. + stateEntries, err = roomState.LoadStateAtSnapshotForStringTuples( + ctx, currentStateSnapshotNID, request.StateToFetch, + ) + } + if err != nil { + return err + } + + stateEvents, err := LoadStateEvents(ctx, db, stateEntries) + if err != nil { + return err + } + + for _, event := range stateEvents { + response.StateEvents = append(response.StateEvents, event.Headered(roomInfo.RoomVersion)) + } + + return nil +} diff --git a/roomserver/internal/input.go b/roomserver/internal/input/input.go index dbf67b79..87bdc5db 100644 --- a/roomserver/internal/input.go +++ b/roomserver/internal/input/input.go @@ -13,7 +13,7 @@ // limitations under the License. // Package input contains the code processes new room events -package internal +package input import ( "context" @@ -22,11 +22,22 @@ import ( "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/roomserver/storage" + "github.com/matrix-org/gomatrixserverlib" log "github.com/sirupsen/logrus" ) +type Inputer struct { + DB storage.Database + Producer sarama.SyncProducer + ServerName gomatrixserverlib.ServerName + OutputRoomEventTopic string + + mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent +} + // WriteOutputEvents implements OutputRoomEventWriter -func (r *RoomserverInternalAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { +func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { messages := make([]*sarama.ProducerMessage, len(updates)) for i := range updates { value, err := json.Marshal(updates[i]) @@ -58,7 +69,7 @@ func (r *RoomserverInternalAPI) WriteOutputEvents(roomID string, updates []api.O } // InputRoomEvents implements api.RoomserverInternalAPI -func (r *RoomserverInternalAPI) InputRoomEvents( +func (r *Inputer) InputRoomEvents( ctx context.Context, request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, diff --git a/roomserver/internal/input_events.go b/roomserver/internal/input/input_events.go index edc8b416..69f51f4b 100644 --- a/roomserver/internal/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal +package input import ( "context" @@ -36,7 +36,7 @@ import ( // state deltas when sending to kafka streams // TODO: Break up function - we should probably do transaction ID checks before calling this. // nolint:gocyclo -func (r *RoomserverInternalAPI) processRoomEvent( +func (r *Inputer) processRoomEvent( ctx context.Context, input api.InputRoomEvent, ) (eventID string, err error) { @@ -141,7 +141,7 @@ func (r *RoomserverInternalAPI) processRoomEvent( return event.EventID(), nil } -func (r *RoomserverInternalAPI) calculateAndSetState( +func (r *Inputer) calculateAndSetState( ctx context.Context, input api.InputRoomEvent, roomInfo types.RoomInfo, diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index d5e38e7a..67a7d8a4 100644 --- a/roomserver/internal/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal +package input import ( "bytes" @@ -47,7 +47,7 @@ import ( // 7 <----- latest // // Can only be called once at a time -func (r *RoomserverInternalAPI) updateLatestEvents( +func (r *Inputer) updateLatestEvents( ctx context.Context, roomInfo *types.RoomInfo, stateAtEvent types.StateAtEvent, @@ -87,7 +87,7 @@ func (r *RoomserverInternalAPI) updateLatestEvents( // when there are so many variables to pass around. type latestEventsUpdater struct { ctx context.Context - api *RoomserverInternalAPI + api *Inputer updater *shared.LatestEventsUpdater roomInfo *types.RoomInfo stateAtEvent types.StateAtEvent diff --git a/roomserver/internal/input_membership.go b/roomserver/internal/input/input_membership.go index 57a94596..8befcd64 100644 --- a/roomserver/internal/input_membership.go +++ b/roomserver/internal/input/input_membership.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal +package input import ( "context" @@ -29,7 +29,7 @@ import ( // user affected by a change in the current state of the room. // Returns a list of output events to write to the kafka log to inform the // consumers about the invites added or retired by the change in current state. -func (r *RoomserverInternalAPI) updateMemberships( +func (r *Inputer) updateMemberships( ctx context.Context, updater *shared.LatestEventsUpdater, removed, added []types.StateEntry, @@ -78,7 +78,7 @@ func (r *RoomserverInternalAPI) updateMemberships( return updates, nil } -func (r *RoomserverInternalAPI) updateMembership( +func (r *Inputer) updateMembership( updater *shared.LatestEventsUpdater, targetUserNID types.EventStateKeyNID, remove, add *gomatrixserverlib.Event, @@ -133,11 +133,11 @@ func (r *RoomserverInternalAPI) updateMembership( } } -func (r *RoomserverInternalAPI) isLocalTarget(event *gomatrixserverlib.Event) bool { +func (r *Inputer) isLocalTarget(event *gomatrixserverlib.Event) bool { isTargetLocalUser := false if statekey := event.StateKey(); statekey != nil { _, domain, _ := gomatrixserverlib.SplitID('@', *statekey) - isTargetLocalUser = domain == r.Cfg.Matrix.ServerName + isTargetLocalUser = domain == r.ServerName } return isTargetLocalUser } diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go index d345e9c7..668c8078 100644 --- a/roomserver/internal/perform/perform_backfill.go +++ b/roomserver/internal/perform/perform_backfill.go @@ -1,3 +1,17 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package perform import ( diff --git a/roomserver/internal/perform/perform_invite.go b/roomserver/internal/perform/perform_invite.go index 7320388e..e06ad062 100644 --- a/roomserver/internal/perform/perform_invite.go +++ b/roomserver/internal/perform/perform_invite.go @@ -1,3 +1,17 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package perform import ( @@ -8,6 +22,7 @@ import ( "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/helpers" + "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/state" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/types" @@ -16,12 +31,10 @@ import ( ) type Inviter struct { - DB storage.Database - Cfg *config.RoomServer - FSAPI federationSenderAPI.FederationSenderInternalAPI - - // TODO FIXME: Remove this - RSAPI api.RoomserverInternalAPI + DB storage.Database + Cfg *config.RoomServer + FSAPI federationSenderAPI.FederationSenderInternalAPI + Inputer *input.Inputer } // nolint:gocyclo @@ -170,7 +183,7 @@ func (r *Inviter) PerformInvite( }, } inputRes := &api.InputRoomEventsResponse{} - if err = r.RSAPI.InputRoomEvents(context.Background(), inputReq, inputRes); err != nil { + if err = r.Inputer.InputRoomEvents(context.Background(), inputReq, inputRes); err != nil { return nil, fmt.Errorf("r.InputRoomEvents: %w", err) } } else { diff --git a/roomserver/internal/perform/perform_join.go b/roomserver/internal/perform/perform_join.go index c8e6e8e6..3d194227 100644 --- a/roomserver/internal/perform/perform_join.go +++ b/roomserver/internal/perform/perform_join.go @@ -1,3 +1,17 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package perform import ( @@ -12,6 +26,7 @@ import ( "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/helpers" + "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" @@ -23,8 +38,7 @@ type Joiner struct { FSAPI fsAPI.FederationSenderInternalAPI DB storage.Database - // TODO FIXME: Remove this - RSAPI api.RoomserverInternalAPI + Inputer *input.Inputer } // PerformJoin handles joining matrix rooms, including over federation by talking to the federationsender. @@ -201,15 +215,7 @@ func (r *Joiner) performJoinRoomByID( // locally on the homeserver. // TODO: Check what happens if the room exists on the server // but everyone has since left. I suspect it does the wrong thing. - buildRes := api.QueryLatestEventsAndStateResponse{} - event, err := eventutil.BuildEvent( - ctx, // the request context - &eb, // the template join event - r.Cfg.Matrix, // the server configuration - time.Now(), // the event timestamp to use - r.RSAPI, // the roomserver API to use - &buildRes, // the query response - ) + event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, &eb) switch err { case nil: @@ -241,7 +247,7 @@ func (r *Joiner) performJoinRoomByID( }, } inputRes := api.InputRoomEventsResponse{} - if err = r.RSAPI.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil { + if err = r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil { var notAllowed *gomatrixserverlib.NotAllowed if errors.As(err, ¬Allowed) { return "", &api.PerformError{ @@ -306,3 +312,31 @@ func (r *Joiner) performFederatedJoinRoomByID( } return nil } + +func buildEvent( + ctx context.Context, db storage.Database, cfg *config.Global, builder *gomatrixserverlib.EventBuilder, +) (*gomatrixserverlib.HeaderedEvent, *api.QueryLatestEventsAndStateResponse, error) { + eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder) + if err != nil { + return nil, nil, fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err) + } + + if len(eventsNeeded.Tuples()) == 0 { + return nil, nil, errors.New("expecting state tuples for event builder, got none") + } + + var queryRes api.QueryLatestEventsAndStateResponse + err = helpers.QueryLatestEventsAndState(ctx, db, &api.QueryLatestEventsAndStateRequest{ + RoomID: builder.RoomID, + StateToFetch: eventsNeeded.Tuples(), + }, &queryRes) + if err != nil { + return nil, nil, fmt.Errorf("QueryLatestEventsAndState: %w", err) + } + + ev, err := eventutil.BuildEvent(ctx, builder, cfg, time.Now(), &eventsNeeded, &queryRes) + if err != nil { + return nil, nil, err + } + return ev, &queryRes, nil +} diff --git a/roomserver/internal/perform/perform_leave.go b/roomserver/internal/perform/perform_leave.go index b4053eed..aaa3b5b1 100644 --- a/roomserver/internal/perform/perform_leave.go +++ b/roomserver/internal/perform/perform_leave.go @@ -1,16 +1,29 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package perform import ( "context" "fmt" "strings" - "time" fsAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" - "github.com/matrix-org/dendrite/internal/eventutil" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/helpers" + "github.com/matrix-org/dendrite/roomserver/internal/input" "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/gomatrixserverlib" ) @@ -20,8 +33,7 @@ type Leaver struct { DB storage.Database FSAPI fsAPI.FederationSenderInternalAPI - // TODO FIXME: Remove this - RSAPI api.RoomserverInternalAPI + Inputer *input.Inputer } // WriteOutputEvents implements OutputRoomEventWriter @@ -67,7 +79,7 @@ func (r *Leaver) performLeaveRoomByID( }, } latestRes := api.QueryLatestEventsAndStateResponse{} - if err = r.RSAPI.QueryLatestEventsAndState(ctx, &latestReq, &latestRes); err != nil { + if err = helpers.QueryLatestEventsAndState(ctx, r.DB, &latestReq, &latestRes); err != nil { return nil, err } if !latestRes.RoomExists { @@ -108,15 +120,7 @@ func (r *Leaver) performLeaveRoomByID( // a leave event. // TODO: Check what happens if the room exists on the server // but everyone has since left. I suspect it does the wrong thing. - buildRes := api.QueryLatestEventsAndStateResponse{} - event, err := eventutil.BuildEvent( - ctx, // the request context - &eb, // the template leave event - r.Cfg.Matrix, // the server configuration - time.Now(), // the event timestamp to use - r.RSAPI, // the roomserver API to use - &buildRes, // the query response - ) + event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, &eb) if err != nil { return nil, fmt.Errorf("eventutil.BuildEvent: %w", err) } @@ -135,7 +139,7 @@ func (r *Leaver) performLeaveRoomByID( }, } inputRes := api.InputRoomEventsResponse{} - if err = r.RSAPI.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil { + if err = r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil { return nil, fmt.Errorf("r.InputRoomEvents: %w", err) } diff --git a/roomserver/internal/perform/perform_publish.go b/roomserver/internal/perform/perform_publish.go index aab282f3..6ff42ac1 100644 --- a/roomserver/internal/perform/perform_publish.go +++ b/roomserver/internal/perform/perform_publish.go @@ -1,3 +1,17 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package perform import ( diff --git a/roomserver/internal/query.go b/roomserver/internal/query/query.go index 26b22c74..b2799aef 100644 --- a/roomserver/internal/query.go +++ b/roomserver/internal/query/query.go @@ -1,6 +1,4 @@ -// Copyright 2017 Vector Creations Ltd -// Copyright 2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,15 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal +package query import ( "context" "fmt" + "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/internal/helpers" "github.com/matrix-org/dendrite/roomserver/state" + "github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/gomatrixserverlib" @@ -30,62 +30,22 @@ import ( "github.com/sirupsen/logrus" ) +type Queryer struct { + DB storage.Database + Cache caching.RoomServerCaches +} + // QueryLatestEventsAndState implements api.RoomserverInternalAPI -func (r *RoomserverInternalAPI) QueryLatestEventsAndState( +func (r *Queryer) QueryLatestEventsAndState( ctx context.Context, request *api.QueryLatestEventsAndStateRequest, response *api.QueryLatestEventsAndStateResponse, ) error { - roomInfo, err := r.DB.RoomInfo(ctx, request.RoomID) - if err != nil { - return err - } - if roomInfo == nil || roomInfo.IsStub { - response.RoomExists = false - return nil - } - - roomState := state.NewStateResolution(r.DB, *roomInfo) - response.RoomExists = true - response.RoomVersion = roomInfo.RoomVersion - - var currentStateSnapshotNID types.StateSnapshotNID - response.LatestEvents, currentStateSnapshotNID, response.Depth, err = - r.DB.LatestEventIDs(ctx, roomInfo.RoomNID) - if err != nil { - return err - } - - var stateEntries []types.StateEntry - if len(request.StateToFetch) == 0 { - // Look up all room state. - stateEntries, err = roomState.LoadStateAtSnapshot( - ctx, currentStateSnapshotNID, - ) - } else { - // Look up the current state for the requested tuples. - stateEntries, err = roomState.LoadStateAtSnapshotForStringTuples( - ctx, currentStateSnapshotNID, request.StateToFetch, - ) - } - if err != nil { - return err - } - - stateEvents, err := helpers.LoadStateEvents(ctx, r.DB, stateEntries) - if err != nil { - return err - } - - for _, event := range stateEvents { - response.StateEvents = append(response.StateEvents, event.Headered(roomInfo.RoomVersion)) - } - - return nil + return helpers.QueryLatestEventsAndState(ctx, r.DB, request, response) } // QueryStateAfterEvents implements api.RoomserverInternalAPI -func (r *RoomserverInternalAPI) QueryStateAfterEvents( +func (r *Queryer) QueryStateAfterEvents( ctx context.Context, request *api.QueryStateAfterEventsRequest, response *api.QueryStateAfterEventsResponse, @@ -134,7 +94,7 @@ func (r *RoomserverInternalAPI) QueryStateAfterEvents( } // QueryEventsByID implements api.RoomserverInternalAPI -func (r *RoomserverInternalAPI) QueryEventsByID( +func (r *Queryer) QueryEventsByID( ctx context.Context, request *api.QueryEventsByIDRequest, response *api.QueryEventsByIDResponse, @@ -167,7 +127,7 @@ func (r *RoomserverInternalAPI) QueryEventsByID( } // QueryMembershipForUser implements api.RoomserverInternalAPI -func (r *RoomserverInternalAPI) QueryMembershipForUser( +func (r *Queryer) QueryMembershipForUser( ctx context.Context, request *api.QueryMembershipForUserRequest, response *api.QueryMembershipForUserResponse, @@ -204,7 +164,7 @@ func (r *RoomserverInternalAPI) QueryMembershipForUser( } // QueryMembershipsForRoom implements api.RoomserverInternalAPI -func (r *RoomserverInternalAPI) QueryMembershipsForRoom( +func (r *Queryer) QueryMembershipsForRoom( ctx context.Context, request *api.QueryMembershipsForRoomRequest, response *api.QueryMembershipsForRoomResponse, @@ -260,7 +220,7 @@ func (r *RoomserverInternalAPI) QueryMembershipsForRoom( } // QueryServerAllowedToSeeEvent implements api.RoomserverInternalAPI -func (r *RoomserverInternalAPI) QueryServerAllowedToSeeEvent( +func (r *Queryer) QueryServerAllowedToSeeEvent( ctx context.Context, request *api.QueryServerAllowedToSeeEventRequest, response *api.QueryServerAllowedToSeeEventResponse, @@ -293,7 +253,7 @@ func (r *RoomserverInternalAPI) QueryServerAllowedToSeeEvent( // QueryMissingEvents implements api.RoomserverInternalAPI // nolint:gocyclo -func (r *RoomserverInternalAPI) QueryMissingEvents( +func (r *Queryer) QueryMissingEvents( ctx context.Context, request *api.QueryMissingEventsRequest, response *api.QueryMissingEventsResponse, @@ -352,7 +312,7 @@ func (r *RoomserverInternalAPI) QueryMissingEvents( } // QueryStateAndAuthChain implements api.RoomserverInternalAPI -func (r *RoomserverInternalAPI) QueryStateAndAuthChain( +func (r *Queryer) QueryStateAndAuthChain( ctx context.Context, request *api.QueryStateAndAuthChainRequest, response *api.QueryStateAndAuthChainResponse, @@ -405,7 +365,7 @@ func (r *RoomserverInternalAPI) QueryStateAndAuthChain( return err } -func (r *RoomserverInternalAPI) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomInfo, eventIDs []string) ([]gomatrixserverlib.Event, error) { +func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomInfo, eventIDs []string) ([]gomatrixserverlib.Event, error) { roomState := state.NewStateResolution(r.DB, roomInfo) prevStates, err := r.DB.StateAtEventIDs(ctx, eventIDs) if err != nil { @@ -482,7 +442,7 @@ func getAuthChain( } // QueryRoomVersionCapabilities implements api.RoomserverInternalAPI -func (r *RoomserverInternalAPI) QueryRoomVersionCapabilities( +func (r *Queryer) QueryRoomVersionCapabilities( ctx context.Context, request *api.QueryRoomVersionCapabilitiesRequest, response *api.QueryRoomVersionCapabilitiesResponse, @@ -500,7 +460,7 @@ func (r *RoomserverInternalAPI) QueryRoomVersionCapabilities( } // QueryRoomVersionCapabilities implements api.RoomserverInternalAPI -func (r *RoomserverInternalAPI) QueryRoomVersionForRoom( +func (r *Queryer) QueryRoomVersionForRoom( ctx context.Context, request *api.QueryRoomVersionForRoomRequest, response *api.QueryRoomVersionForRoomResponse, @@ -522,7 +482,7 @@ func (r *RoomserverInternalAPI) QueryRoomVersionForRoom( return nil } -func (r *RoomserverInternalAPI) roomVersion(roomID string) (gomatrixserverlib.RoomVersion, error) { +func (r *Queryer) roomVersion(roomID string) (gomatrixserverlib.RoomVersion, error) { var res api.QueryRoomVersionForRoomResponse err := r.QueryRoomVersionForRoom(context.Background(), &api.QueryRoomVersionForRoomRequest{ RoomID: roomID, @@ -530,7 +490,7 @@ func (r *RoomserverInternalAPI) roomVersion(roomID string) (gomatrixserverlib.Ro return res.RoomVersion, err } -func (r *RoomserverInternalAPI) QueryPublishedRooms( +func (r *Queryer) QueryPublishedRooms( ctx context.Context, req *api.QueryPublishedRoomsRequest, res *api.QueryPublishedRoomsResponse, diff --git a/roomserver/internal/query_test.go b/roomserver/internal/query/query_test.go index 92e00832..b4cb99b8 100644 --- a/roomserver/internal/query_test.go +++ b/roomserver/internal/query/query_test.go @@ -1,4 +1,4 @@ -// Copyright 2017 Vector Creations Ltd +// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal +package query import ( "context" |