aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-09-02 17:13:15 +0100
committerGitHub <noreply@github.com>2020-09-02 17:13:15 +0100
commit9d9e854fe042cd2c83cf694d6b3e4c8e7046cde1 (patch)
tree75f9247df1d00b140c4249ef14b711b2839806bd
parentf06637435b2124c89dfdd96cd723f54cc7055602 (diff)
Add Queryer and Inputer and factor out more RSAPI stuff (#1382)
* Add Queryer and use embedded structs * Add Inputer and factor out more RS API stuff This neatly splits up the RS API based on the functionality it provides, whilst providing a useful place for code sharing via the `helpers` package.
-rw-r--r--clientapi/routing/membership.go2
-rw-r--r--clientapi/routing/profile.go2
-rw-r--r--clientapi/routing/redaction.go2
-rw-r--r--clientapi/routing/sendevent.go2
-rw-r--r--clientapi/threepid/invites.go2
-rw-r--r--federationapi/routing/join.go2
-rw-r--r--federationapi/routing/leave.go2
-rw-r--r--internal/eventutil/events.go53
-rw-r--r--roomserver/internal/api.go81
-rw-r--r--roomserver/internal/helpers/helpers.go53
-rw-r--r--roomserver/internal/input/input.go (renamed from roomserver/internal/input.go)17
-rw-r--r--roomserver/internal/input/input_events.go (renamed from roomserver/internal/input_events.go)6
-rw-r--r--roomserver/internal/input/input_latest_events.go (renamed from roomserver/internal/input_latest_events.go)6
-rw-r--r--roomserver/internal/input/input_membership.go (renamed from roomserver/internal/input_membership.go)10
-rw-r--r--roomserver/internal/perform/perform_backfill.go14
-rw-r--r--roomserver/internal/perform/perform_invite.go27
-rw-r--r--roomserver/internal/perform/perform_join.go58
-rw-r--r--roomserver/internal/perform/perform_leave.go34
-rw-r--r--roomserver/internal/perform/perform_publish.go14
-rw-r--r--roomserver/internal/query/query.go (renamed from roomserver/internal/query.go)86
-rw-r--r--roomserver/internal/query/query_test.go (renamed from roomserver/internal/query_test.go)4
21 files changed, 292 insertions, 185 deletions
diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go
index 5d635c01..cba19a24 100644
--- a/clientapi/routing/membership.go
+++ b/clientapi/routing/membership.go
@@ -270,7 +270,7 @@ func buildMembershipEvent(
return nil, err
}
- return eventutil.BuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil)
+ return eventutil.QueryAndBuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil)
}
// loadProfile lookups the profile of a given user from the database and returns
diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go
index faf92451..4c7895bd 100644
--- a/clientapi/routing/profile.go
+++ b/clientapi/routing/profile.go
@@ -375,7 +375,7 @@ func buildMembershipEvents(
return nil, err
}
- event, err := eventutil.BuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil)
+ event, err := eventutil.QueryAndBuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil)
if err != nil {
return nil, err
}
diff --git a/clientapi/routing/redaction.go b/clientapi/routing/redaction.go
index bb526513..a825da64 100644
--- a/clientapi/routing/redaction.go
+++ b/clientapi/routing/redaction.go
@@ -115,7 +115,7 @@ func SendRedaction(
}
var queryRes api.QueryLatestEventsAndStateResponse
- e, err := eventutil.BuildEvent(req.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes)
+ e, err := eventutil.QueryAndBuildEvent(req.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes)
if err == eventutil.ErrRoomNoExists {
return util.JSONResponse{
Code: http.StatusNotFound,
diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go
index 9cf517cf..a25979ea 100644
--- a/clientapi/routing/sendevent.go
+++ b/clientapi/routing/sendevent.go
@@ -158,7 +158,7 @@ func generateSendEvent(
}
var queryRes api.QueryLatestEventsAndStateResponse
- e, err := eventutil.BuildEvent(req.Context(), &builder, cfg.Matrix, evTime, rsAPI, &queryRes)
+ e, err := eventutil.QueryAndBuildEvent(req.Context(), &builder, cfg.Matrix, evTime, rsAPI, &queryRes)
if err == eventutil.ErrRoomNoExists {
return nil, &util.JSONResponse{
Code: http.StatusNotFound,
diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go
index f1d54a47..2ffb6bb0 100644
--- a/clientapi/threepid/invites.go
+++ b/clientapi/threepid/invites.go
@@ -354,7 +354,7 @@ func emit3PIDInviteEvent(
}
queryRes := api.QueryLatestEventsAndStateResponse{}
- event, err := eventutil.BuildEvent(ctx, builder, cfg.Matrix, evTime, rsAPI, &queryRes)
+ event, err := eventutil.QueryAndBuildEvent(ctx, builder, cfg.Matrix, evTime, rsAPI, &queryRes)
if err != nil {
return err
}
diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go
index ffdadd52..6cac1245 100644
--- a/federationapi/routing/join.go
+++ b/federationapi/routing/join.go
@@ -95,7 +95,7 @@ func MakeJoin(
queryRes := api.QueryLatestEventsAndStateResponse{
RoomVersion: verRes.RoomVersion,
}
- event, err := eventutil.BuildEvent(httpReq.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes)
+ event, err := eventutil.QueryAndBuildEvent(httpReq.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes)
if err == eventutil.ErrRoomNoExists {
return util.JSONResponse{
Code: http.StatusNotFound,
diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go
index d2fbfc71..51162344 100644
--- a/federationapi/routing/leave.go
+++ b/federationapi/routing/leave.go
@@ -61,7 +61,7 @@ func MakeLeave(
}
var queryRes api.QueryLatestEventsAndStateResponse
- event, err := eventutil.BuildEvent(httpReq.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes)
+ event, err := eventutil.QueryAndBuildEvent(httpReq.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes)
if err == eventutil.ErrRoomNoExists {
return util.JSONResponse{
Code: http.StatusNotFound,
diff --git a/internal/eventutil/events.go b/internal/eventutil/events.go
index 35c7f33d..0b878961 100644
--- a/internal/eventutil/events.go
+++ b/internal/eventutil/events.go
@@ -30,13 +30,13 @@ import (
// doesn't exist
var ErrRoomNoExists = errors.New("Room does not exist")
-// BuildEvent builds a Matrix event using the event builder and roomserver query
+// QueryAndBuildEvent builds a Matrix event using the event builder and roomserver query
// API client provided. If also fills roomserver query API response (if provided)
// in case the function calling FillBuilder needs to use it.
// Returns ErrRoomNoExists if the state of the room could not be retrieved because
// the room doesn't exist
// Returns an error if something else went wrong
-func BuildEvent(
+func QueryAndBuildEvent(
ctx context.Context,
builder *gomatrixserverlib.EventBuilder, cfg *config.Global, evTime time.Time,
rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse,
@@ -45,11 +45,25 @@ func BuildEvent(
queryRes = &api.QueryLatestEventsAndStateResponse{}
}
- ver, err := AddPrevEventsToEvent(ctx, builder, rsAPI, queryRes)
+ eventsNeeded, err := queryRequiredEventsForBuilder(ctx, builder, rsAPI, queryRes)
if err != nil {
// This can pass through a ErrRoomNoExists to the caller
return nil, err
}
+ return BuildEvent(ctx, builder, cfg, evTime, eventsNeeded, queryRes)
+}
+
+// BuildEvent builds a Matrix event from the builder and QueryLatestEventsAndStateResponse
+// provided.
+func BuildEvent(
+ ctx context.Context,
+ builder *gomatrixserverlib.EventBuilder, cfg *config.Global, evTime time.Time,
+ eventsNeeded *gomatrixserverlib.StateNeeded, queryRes *api.QueryLatestEventsAndStateResponse,
+) (*gomatrixserverlib.HeaderedEvent, error) {
+ err := addPrevEventsToEvent(builder, eventsNeeded, queryRes)
+ if err != nil {
+ return nil, err
+ }
event, err := builder.Build(
evTime, cfg.ServerName, cfg.KeyID,
@@ -59,23 +73,23 @@ func BuildEvent(
return nil, err
}
- h := event.Headered(ver)
+ h := event.Headered(queryRes.RoomVersion)
return &h, nil
}
-// AddPrevEventsToEvent fills out the prev_events and auth_events fields in builder
-func AddPrevEventsToEvent(
+// queryRequiredEventsForBuilder queries the roomserver for auth/prev events needed for this builder.
+func queryRequiredEventsForBuilder(
ctx context.Context,
builder *gomatrixserverlib.EventBuilder,
rsAPI api.RoomserverInternalAPI, queryRes *api.QueryLatestEventsAndStateResponse,
-) (gomatrixserverlib.RoomVersion, error) {
+) (*gomatrixserverlib.StateNeeded, error) {
eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
if err != nil {
- return "", fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err)
+ return nil, fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err)
}
if len(eventsNeeded.Tuples()) == 0 {
- return "", errors.New("expecting state tuples for event builder, got none")
+ return nil, errors.New("expecting state tuples for event builder, got none")
}
// Ask the roomserver for information about this room
@@ -83,17 +97,22 @@ func AddPrevEventsToEvent(
RoomID: builder.RoomID,
StateToFetch: eventsNeeded.Tuples(),
}
- if err = rsAPI.QueryLatestEventsAndState(ctx, &queryReq, queryRes); err != nil {
- return "", fmt.Errorf("rsAPI.QueryLatestEventsAndState: %w", err)
- }
+ return &eventsNeeded, rsAPI.QueryLatestEventsAndState(ctx, &queryReq, queryRes)
+}
+// addPrevEventsToEvent fills out the prev_events and auth_events fields in builder
+func addPrevEventsToEvent(
+ builder *gomatrixserverlib.EventBuilder,
+ eventsNeeded *gomatrixserverlib.StateNeeded,
+ queryRes *api.QueryLatestEventsAndStateResponse,
+) error {
if !queryRes.RoomExists {
- return "", ErrRoomNoExists
+ return ErrRoomNoExists
}
eventFormat, err := queryRes.RoomVersion.EventFormat()
if err != nil {
- return "", fmt.Errorf("queryRes.RoomVersion.EventFormat: %w", err)
+ return fmt.Errorf("queryRes.RoomVersion.EventFormat: %w", err)
}
builder.Depth = queryRes.Depth
@@ -103,13 +122,13 @@ func AddPrevEventsToEvent(
for i := range queryRes.StateEvents {
err = authEvents.AddEvent(&queryRes.StateEvents[i].Event)
if err != nil {
- return "", fmt.Errorf("authEvents.AddEvent: %w", err)
+ return fmt.Errorf("authEvents.AddEvent: %w", err)
}
}
refs, err := eventsNeeded.AuthEventReferences(&authEvents)
if err != nil {
- return "", fmt.Errorf("eventsNeeded.AuthEventReferences: %w", err)
+ return fmt.Errorf("eventsNeeded.AuthEventReferences: %w", err)
}
truncAuth, truncPrev := truncateAuthAndPrevEvents(refs, queryRes.LatestEvents)
@@ -129,7 +148,7 @@ func AddPrevEventsToEvent(
builder.PrevEvents = v2PrevRefs
}
- return queryRes.RoomVersion, nil
+ return nil
}
// truncateAuthAndPrevEvents limits the number of events we add into
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index 8ac1bdda..93c0be77 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -2,20 +2,28 @@ package internal
import (
"context"
- "sync"
"github.com/Shopify/sarama"
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/internal/perform"
+ "github.com/matrix-org/dendrite/roomserver/internal/query"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib"
)
// RoomserverInternalAPI is an implementation of api.RoomserverInternalAPI
type RoomserverInternalAPI struct {
+ *input.Inputer
+ *query.Queryer
+ *perform.Inviter
+ *perform.Joiner
+ *perform.Leaver
+ *perform.Publisher
+ *perform.Backfiller
DB storage.Database
Cfg *config.RoomServer
Producer sarama.SyncProducer
@@ -24,12 +32,6 @@ type RoomserverInternalAPI struct {
KeyRing gomatrixserverlib.JSONVerifier
fsAPI fsAPI.FederationSenderInternalAPI
OutputRoomEventTopic string // Kafka topic for new output room events
- Inviter *perform.Inviter
- Joiner *perform.Joiner
- Leaver *perform.Leaver
- Publisher *perform.Publisher
- Backfiller *perform.Backfiller
- mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent
}
func NewRoomserverAPI(
@@ -38,13 +40,21 @@ func NewRoomserverAPI(
keyRing gomatrixserverlib.JSONVerifier,
) *RoomserverInternalAPI {
a := &RoomserverInternalAPI{
- DB: roomserverDB,
- Cfg: cfg,
- Producer: producer,
- Cache: caches,
- ServerName: cfg.Matrix.ServerName,
- KeyRing: keyRing,
- OutputRoomEventTopic: outputRoomEventTopic,
+ DB: roomserverDB,
+ Cfg: cfg,
+ Cache: caches,
+ ServerName: cfg.Matrix.ServerName,
+ KeyRing: keyRing,
+ Queryer: &query.Queryer{
+ DB: roomserverDB,
+ Cache: caches,
+ },
+ Inputer: &input.Inputer{
+ DB: roomserverDB,
+ OutputRoomEventTopic: outputRoomEventTopic,
+ Producer: producer,
+ ServerName: cfg.Matrix.ServerName,
+ },
// perform-er structs get initialised when we have a federation sender to use
}
return a
@@ -57,23 +67,23 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
r.fsAPI = fsAPI
r.Inviter = &perform.Inviter{
- DB: r.DB,
- Cfg: r.Cfg,
- FSAPI: r.fsAPI,
- RSAPI: r,
+ DB: r.DB,
+ Cfg: r.Cfg,
+ FSAPI: r.fsAPI,
+ Inputer: r.Inputer,
}
r.Joiner = &perform.Joiner{
ServerName: r.Cfg.Matrix.ServerName,
Cfg: r.Cfg,
DB: r.DB,
FSAPI: r.fsAPI,
- RSAPI: r,
+ Inputer: r.Inputer,
}
r.Leaver = &perform.Leaver{
- Cfg: r.Cfg,
- DB: r.DB,
- FSAPI: r.fsAPI,
- RSAPI: r,
+ Cfg: r.Cfg,
+ DB: r.DB,
+ FSAPI: r.fsAPI,
+ Inputer: r.Inputer,
}
r.Publisher = &perform.Publisher{
DB: r.DB,
@@ -101,14 +111,6 @@ func (r *RoomserverInternalAPI) PerformInvite(
return r.WriteOutputEvents(req.Event.RoomID(), outputEvents)
}
-func (r *RoomserverInternalAPI) PerformJoin(
- ctx context.Context,
- req *api.PerformJoinRequest,
- res *api.PerformJoinResponse,
-) {
- r.Joiner.PerformJoin(ctx, req, res)
-}
-
func (r *RoomserverInternalAPI) PerformLeave(
ctx context.Context,
req *api.PerformLeaveRequest,
@@ -123,20 +125,3 @@ func (r *RoomserverInternalAPI) PerformLeave(
}
return r.WriteOutputEvents(req.RoomID, outputEvents)
}
-
-func (r *RoomserverInternalAPI) PerformPublish(
- ctx context.Context,
- req *api.PerformPublishRequest,
- res *api.PerformPublishResponse,
-) {
- r.Publisher.PerformPublish(ctx, req, res)
-}
-
-// Query a given amount (or less) of events prior to a given set of events.
-func (r *RoomserverInternalAPI) PerformBackfill(
- ctx context.Context,
- request *api.PerformBackfillRequest,
- response *api.PerformBackfillResponse,
-) error {
- return r.Backfiller.PerformBackfill(ctx, request, response)
-}
diff --git a/roomserver/internal/helpers/helpers.go b/roomserver/internal/helpers/helpers.go
index d7bb40af..b7e6ce86 100644
--- a/roomserver/internal/helpers/helpers.go
+++ b/roomserver/internal/helpers/helpers.go
@@ -324,3 +324,56 @@ BFSLoop:
return resultNIDs, err
}
+
+func QueryLatestEventsAndState(
+ ctx context.Context, db storage.Database,
+ request *api.QueryLatestEventsAndStateRequest,
+ response *api.QueryLatestEventsAndStateResponse,
+) error {
+ roomInfo, err := db.RoomInfo(ctx, request.RoomID)
+ if err != nil {
+ return err
+ }
+ if roomInfo == nil || roomInfo.IsStub {
+ response.RoomExists = false
+ return nil
+ }
+
+ roomState := state.NewStateResolution(db, *roomInfo)
+ response.RoomExists = true
+ response.RoomVersion = roomInfo.RoomVersion
+
+ var currentStateSnapshotNID types.StateSnapshotNID
+ response.LatestEvents, currentStateSnapshotNID, response.Depth, err =
+ db.LatestEventIDs(ctx, roomInfo.RoomNID)
+ if err != nil {
+ return err
+ }
+
+ var stateEntries []types.StateEntry
+ if len(request.StateToFetch) == 0 {
+ // Look up all room state.
+ stateEntries, err = roomState.LoadStateAtSnapshot(
+ ctx, currentStateSnapshotNID,
+ )
+ } else {
+ // Look up the current state for the requested tuples.
+ stateEntries, err = roomState.LoadStateAtSnapshotForStringTuples(
+ ctx, currentStateSnapshotNID, request.StateToFetch,
+ )
+ }
+ if err != nil {
+ return err
+ }
+
+ stateEvents, err := LoadStateEvents(ctx, db, stateEntries)
+ if err != nil {
+ return err
+ }
+
+ for _, event := range stateEvents {
+ response.StateEvents = append(response.StateEvents, event.Headered(roomInfo.RoomVersion))
+ }
+
+ return nil
+}
diff --git a/roomserver/internal/input.go b/roomserver/internal/input/input.go
index dbf67b79..87bdc5db 100644
--- a/roomserver/internal/input.go
+++ b/roomserver/internal/input/input.go
@@ -13,7 +13,7 @@
// limitations under the License.
// Package input contains the code processes new room events
-package internal
+package input
import (
"context"
@@ -22,11 +22,22 @@ import (
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
+type Inputer struct {
+ DB storage.Database
+ Producer sarama.SyncProducer
+ ServerName gomatrixserverlib.ServerName
+ OutputRoomEventTopic string
+
+ mutexes sync.Map // room ID -> *sync.Mutex, protects calls to processRoomEvent
+}
+
// WriteOutputEvents implements OutputRoomEventWriter
-func (r *RoomserverInternalAPI) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
+func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
messages := make([]*sarama.ProducerMessage, len(updates))
for i := range updates {
value, err := json.Marshal(updates[i])
@@ -58,7 +69,7 @@ func (r *RoomserverInternalAPI) WriteOutputEvents(roomID string, updates []api.O
}
// InputRoomEvents implements api.RoomserverInternalAPI
-func (r *RoomserverInternalAPI) InputRoomEvents(
+func (r *Inputer) InputRoomEvents(
ctx context.Context,
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
diff --git a/roomserver/internal/input_events.go b/roomserver/internal/input/input_events.go
index edc8b416..69f51f4b 100644
--- a/roomserver/internal/input_events.go
+++ b/roomserver/internal/input/input_events.go
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package internal
+package input
import (
"context"
@@ -36,7 +36,7 @@ import (
// state deltas when sending to kafka streams
// TODO: Break up function - we should probably do transaction ID checks before calling this.
// nolint:gocyclo
-func (r *RoomserverInternalAPI) processRoomEvent(
+func (r *Inputer) processRoomEvent(
ctx context.Context,
input api.InputRoomEvent,
) (eventID string, err error) {
@@ -141,7 +141,7 @@ func (r *RoomserverInternalAPI) processRoomEvent(
return event.EventID(), nil
}
-func (r *RoomserverInternalAPI) calculateAndSetState(
+func (r *Inputer) calculateAndSetState(
ctx context.Context,
input api.InputRoomEvent,
roomInfo types.RoomInfo,
diff --git a/roomserver/internal/input_latest_events.go b/roomserver/internal/input/input_latest_events.go
index d5e38e7a..67a7d8a4 100644
--- a/roomserver/internal/input_latest_events.go
+++ b/roomserver/internal/input/input_latest_events.go
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package internal
+package input
import (
"bytes"
@@ -47,7 +47,7 @@ import (
// 7 <----- latest
//
// Can only be called once at a time
-func (r *RoomserverInternalAPI) updateLatestEvents(
+func (r *Inputer) updateLatestEvents(
ctx context.Context,
roomInfo *types.RoomInfo,
stateAtEvent types.StateAtEvent,
@@ -87,7 +87,7 @@ func (r *RoomserverInternalAPI) updateLatestEvents(
// when there are so many variables to pass around.
type latestEventsUpdater struct {
ctx context.Context
- api *RoomserverInternalAPI
+ api *Inputer
updater *shared.LatestEventsUpdater
roomInfo *types.RoomInfo
stateAtEvent types.StateAtEvent
diff --git a/roomserver/internal/input_membership.go b/roomserver/internal/input/input_membership.go
index 57a94596..8befcd64 100644
--- a/roomserver/internal/input_membership.go
+++ b/roomserver/internal/input/input_membership.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package internal
+package input
import (
"context"
@@ -29,7 +29,7 @@ import (
// user affected by a change in the current state of the room.
// Returns a list of output events to write to the kafka log to inform the
// consumers about the invites added or retired by the change in current state.
-func (r *RoomserverInternalAPI) updateMemberships(
+func (r *Inputer) updateMemberships(
ctx context.Context,
updater *shared.LatestEventsUpdater,
removed, added []types.StateEntry,
@@ -78,7 +78,7 @@ func (r *RoomserverInternalAPI) updateMemberships(
return updates, nil
}
-func (r *RoomserverInternalAPI) updateMembership(
+func (r *Inputer) updateMembership(
updater *shared.LatestEventsUpdater,
targetUserNID types.EventStateKeyNID,
remove, add *gomatrixserverlib.Event,
@@ -133,11 +133,11 @@ func (r *RoomserverInternalAPI) updateMembership(
}
}
-func (r *RoomserverInternalAPI) isLocalTarget(event *gomatrixserverlib.Event) bool {
+func (r *Inputer) isLocalTarget(event *gomatrixserverlib.Event) bool {
isTargetLocalUser := false
if statekey := event.StateKey(); statekey != nil {
_, domain, _ := gomatrixserverlib.SplitID('@', *statekey)
- isTargetLocalUser = domain == r.Cfg.Matrix.ServerName
+ isTargetLocalUser = domain == r.ServerName
}
return isTargetLocalUser
}
diff --git a/roomserver/internal/perform/perform_backfill.go b/roomserver/internal/perform/perform_backfill.go
index d345e9c7..668c8078 100644
--- a/roomserver/internal/perform/perform_backfill.go
+++ b/roomserver/internal/perform/perform_backfill.go
@@ -1,3 +1,17 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
package perform
import (
diff --git a/roomserver/internal/perform/perform_invite.go b/roomserver/internal/perform/perform_invite.go
index 7320388e..e06ad062 100644
--- a/roomserver/internal/perform/perform_invite.go
+++ b/roomserver/internal/perform/perform_invite.go
@@ -1,3 +1,17 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
package perform
import (
@@ -8,6 +22,7 @@ import (
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
+ "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
@@ -16,12 +31,10 @@ import (
)
type Inviter struct {
- DB storage.Database
- Cfg *config.RoomServer
- FSAPI federationSenderAPI.FederationSenderInternalAPI
-
- // TODO FIXME: Remove this
- RSAPI api.RoomserverInternalAPI
+ DB storage.Database
+ Cfg *config.RoomServer
+ FSAPI federationSenderAPI.FederationSenderInternalAPI
+ Inputer *input.Inputer
}
// nolint:gocyclo
@@ -170,7 +183,7 @@ func (r *Inviter) PerformInvite(
},
}
inputRes := &api.InputRoomEventsResponse{}
- if err = r.RSAPI.InputRoomEvents(context.Background(), inputReq, inputRes); err != nil {
+ if err = r.Inputer.InputRoomEvents(context.Background(), inputReq, inputRes); err != nil {
return nil, fmt.Errorf("r.InputRoomEvents: %w", err)
}
} else {
diff --git a/roomserver/internal/perform/perform_join.go b/roomserver/internal/perform/perform_join.go
index c8e6e8e6..3d194227 100644
--- a/roomserver/internal/perform/perform_join.go
+++ b/roomserver/internal/perform/perform_join.go
@@ -1,3 +1,17 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
package perform
import (
@@ -12,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
+ "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
@@ -23,8 +38,7 @@ type Joiner struct {
FSAPI fsAPI.FederationSenderInternalAPI
DB storage.Database
- // TODO FIXME: Remove this
- RSAPI api.RoomserverInternalAPI
+ Inputer *input.Inputer
}
// PerformJoin handles joining matrix rooms, including over federation by talking to the federationsender.
@@ -201,15 +215,7 @@ func (r *Joiner) performJoinRoomByID(
// locally on the homeserver.
// TODO: Check what happens if the room exists on the server
// but everyone has since left. I suspect it does the wrong thing.
- buildRes := api.QueryLatestEventsAndStateResponse{}
- event, err := eventutil.BuildEvent(
- ctx, // the request context
- &eb, // the template join event
- r.Cfg.Matrix, // the server configuration
- time.Now(), // the event timestamp to use
- r.RSAPI, // the roomserver API to use
- &buildRes, // the query response
- )
+ event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, &eb)
switch err {
case nil:
@@ -241,7 +247,7 @@ func (r *Joiner) performJoinRoomByID(
},
}
inputRes := api.InputRoomEventsResponse{}
- if err = r.RSAPI.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
+ if err = r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
var notAllowed *gomatrixserverlib.NotAllowed
if errors.As(err, &notAllowed) {
return "", &api.PerformError{
@@ -306,3 +312,31 @@ func (r *Joiner) performFederatedJoinRoomByID(
}
return nil
}
+
+func buildEvent(
+ ctx context.Context, db storage.Database, cfg *config.Global, builder *gomatrixserverlib.EventBuilder,
+) (*gomatrixserverlib.HeaderedEvent, *api.QueryLatestEventsAndStateResponse, error) {
+ eventsNeeded, err := gomatrixserverlib.StateNeededForEventBuilder(builder)
+ if err != nil {
+ return nil, nil, fmt.Errorf("gomatrixserverlib.StateNeededForEventBuilder: %w", err)
+ }
+
+ if len(eventsNeeded.Tuples()) == 0 {
+ return nil, nil, errors.New("expecting state tuples for event builder, got none")
+ }
+
+ var queryRes api.QueryLatestEventsAndStateResponse
+ err = helpers.QueryLatestEventsAndState(ctx, db, &api.QueryLatestEventsAndStateRequest{
+ RoomID: builder.RoomID,
+ StateToFetch: eventsNeeded.Tuples(),
+ }, &queryRes)
+ if err != nil {
+ return nil, nil, fmt.Errorf("QueryLatestEventsAndState: %w", err)
+ }
+
+ ev, err := eventutil.BuildEvent(ctx, builder, cfg, time.Now(), &eventsNeeded, &queryRes)
+ if err != nil {
+ return nil, nil, err
+ }
+ return ev, &queryRes, nil
+}
diff --git a/roomserver/internal/perform/perform_leave.go b/roomserver/internal/perform/perform_leave.go
index b4053eed..aaa3b5b1 100644
--- a/roomserver/internal/perform/perform_leave.go
+++ b/roomserver/internal/perform/perform_leave.go
@@ -1,16 +1,29 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
package perform
import (
"context"
"fmt"
"strings"
- "time"
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
- "github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
+ "github.com/matrix-org/dendrite/roomserver/internal/input"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -20,8 +33,7 @@ type Leaver struct {
DB storage.Database
FSAPI fsAPI.FederationSenderInternalAPI
- // TODO FIXME: Remove this
- RSAPI api.RoomserverInternalAPI
+ Inputer *input.Inputer
}
// WriteOutputEvents implements OutputRoomEventWriter
@@ -67,7 +79,7 @@ func (r *Leaver) performLeaveRoomByID(
},
}
latestRes := api.QueryLatestEventsAndStateResponse{}
- if err = r.RSAPI.QueryLatestEventsAndState(ctx, &latestReq, &latestRes); err != nil {
+ if err = helpers.QueryLatestEventsAndState(ctx, r.DB, &latestReq, &latestRes); err != nil {
return nil, err
}
if !latestRes.RoomExists {
@@ -108,15 +120,7 @@ func (r *Leaver) performLeaveRoomByID(
// a leave event.
// TODO: Check what happens if the room exists on the server
// but everyone has since left. I suspect it does the wrong thing.
- buildRes := api.QueryLatestEventsAndStateResponse{}
- event, err := eventutil.BuildEvent(
- ctx, // the request context
- &eb, // the template leave event
- r.Cfg.Matrix, // the server configuration
- time.Now(), // the event timestamp to use
- r.RSAPI, // the roomserver API to use
- &buildRes, // the query response
- )
+ event, buildRes, err := buildEvent(ctx, r.DB, r.Cfg.Matrix, &eb)
if err != nil {
return nil, fmt.Errorf("eventutil.BuildEvent: %w", err)
}
@@ -135,7 +139,7 @@ func (r *Leaver) performLeaveRoomByID(
},
}
inputRes := api.InputRoomEventsResponse{}
- if err = r.RSAPI.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
+ if err = r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
return nil, fmt.Errorf("r.InputRoomEvents: %w", err)
}
diff --git a/roomserver/internal/perform/perform_publish.go b/roomserver/internal/perform/perform_publish.go
index aab282f3..6ff42ac1 100644
--- a/roomserver/internal/perform/perform_publish.go
+++ b/roomserver/internal/perform/perform_publish.go
@@ -1,3 +1,17 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
package perform
import (
diff --git a/roomserver/internal/query.go b/roomserver/internal/query/query.go
index 26b22c74..b2799aef 100644
--- a/roomserver/internal/query.go
+++ b/roomserver/internal/query/query.go
@@ -1,6 +1,4 @@
-// Copyright 2017 Vector Creations Ltd
-// Copyright 2018 New Vector Ltd
-// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
+// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -14,15 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package internal
+package query
import (
"context"
"fmt"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
"github.com/matrix-org/dendrite/roomserver/state"
+ "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib"
@@ -30,62 +30,22 @@ import (
"github.com/sirupsen/logrus"
)
+type Queryer struct {
+ DB storage.Database
+ Cache caching.RoomServerCaches
+}
+
// QueryLatestEventsAndState implements api.RoomserverInternalAPI
-func (r *RoomserverInternalAPI) QueryLatestEventsAndState(
+func (r *Queryer) QueryLatestEventsAndState(
ctx context.Context,
request *api.QueryLatestEventsAndStateRequest,
response *api.QueryLatestEventsAndStateResponse,
) error {
- roomInfo, err := r.DB.RoomInfo(ctx, request.RoomID)
- if err != nil {
- return err
- }
- if roomInfo == nil || roomInfo.IsStub {
- response.RoomExists = false
- return nil
- }
-
- roomState := state.NewStateResolution(r.DB, *roomInfo)
- response.RoomExists = true
- response.RoomVersion = roomInfo.RoomVersion
-
- var currentStateSnapshotNID types.StateSnapshotNID
- response.LatestEvents, currentStateSnapshotNID, response.Depth, err =
- r.DB.LatestEventIDs(ctx, roomInfo.RoomNID)
- if err != nil {
- return err
- }
-
- var stateEntries []types.StateEntry
- if len(request.StateToFetch) == 0 {
- // Look up all room state.
- stateEntries, err = roomState.LoadStateAtSnapshot(
- ctx, currentStateSnapshotNID,
- )
- } else {
- // Look up the current state for the requested tuples.
- stateEntries, err = roomState.LoadStateAtSnapshotForStringTuples(
- ctx, currentStateSnapshotNID, request.StateToFetch,
- )
- }
- if err != nil {
- return err
- }
-
- stateEvents, err := helpers.LoadStateEvents(ctx, r.DB, stateEntries)
- if err != nil {
- return err
- }
-
- for _, event := range stateEvents {
- response.StateEvents = append(response.StateEvents, event.Headered(roomInfo.RoomVersion))
- }
-
- return nil
+ return helpers.QueryLatestEventsAndState(ctx, r.DB, request, response)
}
// QueryStateAfterEvents implements api.RoomserverInternalAPI
-func (r *RoomserverInternalAPI) QueryStateAfterEvents(
+func (r *Queryer) QueryStateAfterEvents(
ctx context.Context,
request *api.QueryStateAfterEventsRequest,
response *api.QueryStateAfterEventsResponse,
@@ -134,7 +94,7 @@ func (r *RoomserverInternalAPI) QueryStateAfterEvents(
}
// QueryEventsByID implements api.RoomserverInternalAPI
-func (r *RoomserverInternalAPI) QueryEventsByID(
+func (r *Queryer) QueryEventsByID(
ctx context.Context,
request *api.QueryEventsByIDRequest,
response *api.QueryEventsByIDResponse,
@@ -167,7 +127,7 @@ func (r *RoomserverInternalAPI) QueryEventsByID(
}
// QueryMembershipForUser implements api.RoomserverInternalAPI
-func (r *RoomserverInternalAPI) QueryMembershipForUser(
+func (r *Queryer) QueryMembershipForUser(
ctx context.Context,
request *api.QueryMembershipForUserRequest,
response *api.QueryMembershipForUserResponse,
@@ -204,7 +164,7 @@ func (r *RoomserverInternalAPI) QueryMembershipForUser(
}
// QueryMembershipsForRoom implements api.RoomserverInternalAPI
-func (r *RoomserverInternalAPI) QueryMembershipsForRoom(
+func (r *Queryer) QueryMembershipsForRoom(
ctx context.Context,
request *api.QueryMembershipsForRoomRequest,
response *api.QueryMembershipsForRoomResponse,
@@ -260,7 +220,7 @@ func (r *RoomserverInternalAPI) QueryMembershipsForRoom(
}
// QueryServerAllowedToSeeEvent implements api.RoomserverInternalAPI
-func (r *RoomserverInternalAPI) QueryServerAllowedToSeeEvent(
+func (r *Queryer) QueryServerAllowedToSeeEvent(
ctx context.Context,
request *api.QueryServerAllowedToSeeEventRequest,
response *api.QueryServerAllowedToSeeEventResponse,
@@ -293,7 +253,7 @@ func (r *RoomserverInternalAPI) QueryServerAllowedToSeeEvent(
// QueryMissingEvents implements api.RoomserverInternalAPI
// nolint:gocyclo
-func (r *RoomserverInternalAPI) QueryMissingEvents(
+func (r *Queryer) QueryMissingEvents(
ctx context.Context,
request *api.QueryMissingEventsRequest,
response *api.QueryMissingEventsResponse,
@@ -352,7 +312,7 @@ func (r *RoomserverInternalAPI) QueryMissingEvents(
}
// QueryStateAndAuthChain implements api.RoomserverInternalAPI
-func (r *RoomserverInternalAPI) QueryStateAndAuthChain(
+func (r *Queryer) QueryStateAndAuthChain(
ctx context.Context,
request *api.QueryStateAndAuthChainRequest,
response *api.QueryStateAndAuthChainResponse,
@@ -405,7 +365,7 @@ func (r *RoomserverInternalAPI) QueryStateAndAuthChain(
return err
}
-func (r *RoomserverInternalAPI) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomInfo, eventIDs []string) ([]gomatrixserverlib.Event, error) {
+func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomInfo, eventIDs []string) ([]gomatrixserverlib.Event, error) {
roomState := state.NewStateResolution(r.DB, roomInfo)
prevStates, err := r.DB.StateAtEventIDs(ctx, eventIDs)
if err != nil {
@@ -482,7 +442,7 @@ func getAuthChain(
}
// QueryRoomVersionCapabilities implements api.RoomserverInternalAPI
-func (r *RoomserverInternalAPI) QueryRoomVersionCapabilities(
+func (r *Queryer) QueryRoomVersionCapabilities(
ctx context.Context,
request *api.QueryRoomVersionCapabilitiesRequest,
response *api.QueryRoomVersionCapabilitiesResponse,
@@ -500,7 +460,7 @@ func (r *RoomserverInternalAPI) QueryRoomVersionCapabilities(
}
// QueryRoomVersionCapabilities implements api.RoomserverInternalAPI
-func (r *RoomserverInternalAPI) QueryRoomVersionForRoom(
+func (r *Queryer) QueryRoomVersionForRoom(
ctx context.Context,
request *api.QueryRoomVersionForRoomRequest,
response *api.QueryRoomVersionForRoomResponse,
@@ -522,7 +482,7 @@ func (r *RoomserverInternalAPI) QueryRoomVersionForRoom(
return nil
}
-func (r *RoomserverInternalAPI) roomVersion(roomID string) (gomatrixserverlib.RoomVersion, error) {
+func (r *Queryer) roomVersion(roomID string) (gomatrixserverlib.RoomVersion, error) {
var res api.QueryRoomVersionForRoomResponse
err := r.QueryRoomVersionForRoom(context.Background(), &api.QueryRoomVersionForRoomRequest{
RoomID: roomID,
@@ -530,7 +490,7 @@ func (r *RoomserverInternalAPI) roomVersion(roomID string) (gomatrixserverlib.Ro
return res.RoomVersion, err
}
-func (r *RoomserverInternalAPI) QueryPublishedRooms(
+func (r *Queryer) QueryPublishedRooms(
ctx context.Context,
req *api.QueryPublishedRoomsRequest,
res *api.QueryPublishedRoomsResponse,
diff --git a/roomserver/internal/query_test.go b/roomserver/internal/query/query_test.go
index 92e00832..b4cb99b8 100644
--- a/roomserver/internal/query_test.go
+++ b/roomserver/internal/query/query_test.go
@@ -1,4 +1,4 @@
-// Copyright 2017 Vector Creations Ltd
+// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package internal
+package query
import (
"context"