aboutsummaryrefslogtreecommitdiff
path: root/roomserver
diff options
context:
space:
mode:
Diffstat (limited to 'roomserver')
-rw-r--r--roomserver/api/api.go6
-rw-r--r--roomserver/api/api_trace.go10
-rw-r--r--roomserver/api/output.go17
-rw-r--r--roomserver/api/perform.go22
-rw-r--r--roomserver/api/query.go2
-rw-r--r--roomserver/api/wrapper.go2
-rw-r--r--roomserver/internal/api.go5
-rw-r--r--roomserver/internal/perform/perform_inbound_peek.go129
-rw-r--r--roomserver/internal/perform/perform_peek.go23
-rw-r--r--roomserver/internal/query/query.go14
-rw-r--r--roomserver/internal/query/query_test.go4
-rw-r--r--roomserver/inthttp/client.go29
-rw-r--r--roomserver/inthttp/server.go13
13 files changed, 255 insertions, 21 deletions
diff --git a/roomserver/api/api.go b/roomserver/api/api.go
index cedd6193..72e406ee 100644
--- a/roomserver/api/api.go
+++ b/roomserver/api/api.go
@@ -56,6 +56,12 @@ type RoomserverInternalAPI interface {
res *PerformPublishResponse,
)
+ PerformInboundPeek(
+ ctx context.Context,
+ req *PerformInboundPeekRequest,
+ res *PerformInboundPeekResponse,
+ ) error
+
QueryPublishedRooms(
ctx context.Context,
req *QueryPublishedRoomsRequest,
diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go
index 40745975..1a2b9a49 100644
--- a/roomserver/api/api_trace.go
+++ b/roomserver/api/api_trace.go
@@ -88,6 +88,16 @@ func (t *RoomserverInternalAPITrace) PerformPublish(
util.GetLogger(ctx).Infof("PerformPublish req=%+v res=%+v", js(req), js(res))
}
+func (t *RoomserverInternalAPITrace) PerformInboundPeek(
+ ctx context.Context,
+ req *PerformInboundPeekRequest,
+ res *PerformInboundPeekResponse,
+) error {
+ err := t.Impl.PerformInboundPeek(ctx, req, res)
+ util.GetLogger(ctx).Infof("PerformInboundPeek req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
func (t *RoomserverInternalAPITrace) QueryPublishedRooms(
ctx context.Context,
req *QueryPublishedRoomsRequest,
diff --git a/roomserver/api/output.go b/roomserver/api/output.go
index 2993813c..d60d1cc8 100644
--- a/roomserver/api/output.go
+++ b/roomserver/api/output.go
@@ -51,6 +51,8 @@ const (
// OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek
OutputTypeNewPeek OutputType = "new_peek"
+ // OutputTypeNewInboundPeek indicates that the kafka event is an OutputNewInboundPeek
+ OutputTypeNewInboundPeek OutputType = "new_inbound_peek"
// OutputTypeRetirePeek indicates that the kafka event is an OutputRetirePeek
OutputTypeRetirePeek OutputType = "retire_peek"
)
@@ -72,6 +74,8 @@ type OutputEvent struct {
RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"`
// The content of event with type OutputTypeNewPeek
NewPeek *OutputNewPeek `json:"new_peek,omitempty"`
+ // The content of event with type OutputTypeNewInboundPeek
+ NewInboundPeek *OutputNewInboundPeek `json:"new_inbound_peek,omitempty"`
// The content of event with type OutputTypeRetirePeek
RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"`
}
@@ -245,6 +249,19 @@ type OutputNewPeek struct {
DeviceID string
}
+// An OutputNewInboundPeek is written whenever a server starts peeking into a room
+type OutputNewInboundPeek struct {
+ RoomID string
+ PeekID string
+ // the event ID at which the peek begins (so we can avoid
+ // a race between tracking the state returned by /peek and emitting subsequent
+ // peeked events)
+ LatestEventID string
+ ServerName gomatrixserverlib.ServerName
+ // how often we told the peeking server to renew the peek
+ RenewalInterval int64
+}
+
// An OutputRetirePeek is written whenever a user stops peeking into a room.
type OutputRetirePeek struct {
RoomID string
diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go
index ae2d6d97..51cbcb1a 100644
--- a/roomserver/api/perform.go
+++ b/roomserver/api/perform.go
@@ -172,6 +172,28 @@ type PerformPublishResponse struct {
Error *PerformError
}
+type PerformInboundPeekRequest struct {
+ UserID string `json:"user_id"`
+ RoomID string `json:"room_id"`
+ PeekID string `json:"peek_id"`
+ ServerName gomatrixserverlib.ServerName `json:"server_name"`
+ RenewalInterval int64 `json:"renewal_interval"`
+}
+
+type PerformInboundPeekResponse struct {
+ // Does the room exist on this roomserver?
+ // If the room doesn't exist this will be false and StateEvents will be empty.
+ RoomExists bool `json:"room_exists"`
+ // The room version of the room.
+ RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
+ // The current state and auth chain events.
+ // The lists will be in an arbitrary order.
+ StateEvents []*gomatrixserverlib.HeaderedEvent `json:"state_events"`
+ AuthChainEvents []*gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"`
+ // The event at which this state was captured
+ LatestEvent *gomatrixserverlib.HeaderedEvent `json:"latest_event"`
+}
+
// PerformForgetRequest is a request to PerformForget
type PerformForgetRequest struct {
RoomID string `json:"room_id"`
diff --git a/roomserver/api/query.go b/roomserver/api/query.go
index 43e562a9..43bbfd16 100644
--- a/roomserver/api/query.go
+++ b/roomserver/api/query.go
@@ -221,7 +221,7 @@ type QueryStateAndAuthChainRequest struct {
// The room ID to query the state in.
RoomID string `json:"room_id"`
// The list of prev events for the event. Used to calculate the state at
- // the event
+ // the event.
PrevEventIDs []string `json:"prev_event_ids"`
// The list of auth events for the event. Used to calculate the auth chain
AuthEventIDs []string `json:"auth_event_ids"`
diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go
index 7779dbde..a6ef735c 100644
--- a/roomserver/api/wrapper.go
+++ b/roomserver/api/wrapper.go
@@ -43,7 +43,7 @@ func SendEvents(
// SendEventWithState writes an event with the specified kind to the roomserver
// with the state at the event as KindOutlier before it. Will not send any event that is
-// marked as `true` in haveEventIDs
+// marked as `true` in haveEventIDs.
func SendEventWithState(
ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind,
state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent,
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index 91caa0bd..e10bdb46 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -24,6 +24,7 @@ type RoomserverInternalAPI struct {
*perform.Inviter
*perform.Joiner
*perform.Peeker
+ *perform.InboundPeeker
*perform.Unpeeker
*perform.Leaver
*perform.Publisher
@@ -97,6 +98,10 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
+ r.InboundPeeker = &perform.InboundPeeker{
+ DB: r.DB,
+ Inputer: r.Inputer,
+ }
r.Unpeeker = &perform.Unpeeker{
ServerName: r.Cfg.Matrix.ServerName,
Cfg: r.Cfg,
diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go
new file mode 100644
index 00000000..eb3c9727
--- /dev/null
+++ b/roomserver/internal/perform/perform_inbound_peek.go
@@ -0,0 +1,129 @@
+// Copyright 2020 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package perform
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/internal/helpers"
+ "github.com/matrix-org/dendrite/roomserver/internal/input"
+ "github.com/matrix-org/dendrite/roomserver/internal/query"
+ "github.com/matrix-org/dendrite/roomserver/state"
+ "github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+)
+
+type InboundPeeker struct {
+ DB storage.Database
+ Inputer *input.Inputer
+}
+
+// PerformInboundPeek handles peeking into matrix rooms, including over
+// federation by talking to the federationsender. called when a remote server
+// initiates a /peek over federation.
+//
+// It should atomically figure out the current state of the room (for the
+// response to /peek) while adding the new inbound peek to the kafka stream so the
+// fed sender can start sending peeked events without a race between the state
+// snapshot and the stream of peeked events.
+func (r *InboundPeeker) PerformInboundPeek(
+ ctx context.Context,
+ request *api.PerformInboundPeekRequest,
+ response *api.PerformInboundPeekResponse,
+) error {
+ info, err := r.DB.RoomInfo(ctx, request.RoomID)
+ if err != nil {
+ return err
+ }
+ if info == nil || info.IsStub {
+ return nil
+ }
+ response.RoomExists = true
+ response.RoomVersion = info.RoomVersion
+
+ var stateEvents []*gomatrixserverlib.Event
+
+ var currentStateSnapshotNID types.StateSnapshotNID
+ latestEventRefs, currentStateSnapshotNID, _, err :=
+ r.DB.LatestEventIDs(ctx, info.RoomNID)
+ if err != nil {
+ return err
+ }
+ latestEvents, err := r.DB.EventsFromIDs(ctx, []string{latestEventRefs[0].EventID})
+ if err != nil {
+ return err
+ }
+ var sortedLatestEvents []*gomatrixserverlib.Event
+ for _, ev := range latestEvents {
+ sortedLatestEvents = append(sortedLatestEvents, ev.Event)
+ }
+ sortedLatestEvents = gomatrixserverlib.ReverseTopologicalOrdering(
+ sortedLatestEvents,
+ gomatrixserverlib.TopologicalOrderByPrevEvents,
+ )
+ response.LatestEvent = sortedLatestEvents[0].Headered(info.RoomVersion)
+
+ // XXX: do we actually need to do a state resolution here?
+ roomState := state.NewStateResolution(r.DB, *info)
+
+ var stateEntries []types.StateEntry
+ stateEntries, err = roomState.LoadStateAtSnapshot(
+ ctx, currentStateSnapshotNID,
+ )
+ if err != nil {
+ return err
+ }
+ stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries)
+ if err != nil {
+ return err
+ }
+
+ // get the auth event IDs for the current state events
+ var authEventIDs []string
+ for _, se := range stateEvents {
+ authEventIDs = append(authEventIDs, se.AuthEventIDs()...)
+ }
+ authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe
+
+ authEvents, err := query.GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
+ if err != nil {
+ return err
+ }
+
+ for _, event := range stateEvents {
+ response.StateEvents = append(response.StateEvents, event.Headered(info.RoomVersion))
+ }
+
+ for _, event := range authEvents {
+ response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
+ }
+
+ err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{
+ {
+ Type: api.OutputTypeNewInboundPeek,
+ NewInboundPeek: &api.OutputNewInboundPeek{
+ RoomID: request.RoomID,
+ PeekID: request.PeekID,
+ LatestEventID: latestEvents[0].EventID(),
+ ServerName: request.ServerName,
+ RenewalInterval: request.RenewalInterval,
+ },
+ },
+ })
+ return err
+}
diff --git a/roomserver/internal/perform/perform_peek.go b/roomserver/internal/perform/perform_peek.go
index 2f4694c8..443276cd 100644
--- a/roomserver/internal/perform/perform_peek.go
+++ b/roomserver/internal/perform/perform_peek.go
@@ -151,11 +151,28 @@ func (r *Peeker) performPeekRoomByID(
}
}
- // If the server name in the room ID isn't ours then it's a
- // possible candidate for finding the room via federation. Add
- // it to the list of servers to try.
+ // handle federated peeks
+ // FIXME: don't create an outbound peek if we already have one going.
if domain != r.Cfg.Matrix.ServerName {
+ // If the server name in the room ID isn't ours then it's a
+ // possible candidate for finding the room via federation. Add
+ // it to the list of servers to try.
req.ServerNames = append(req.ServerNames, domain)
+
+ // Try peeking by all of the supplied server names.
+ fedReq := fsAPI.PerformOutboundPeekRequest{
+ RoomID: req.RoomIDOrAlias, // the room ID to try and peek
+ ServerNames: req.ServerNames, // the servers to try peeking via
+ }
+ fedRes := fsAPI.PerformOutboundPeekResponse{}
+ _ = r.FSAPI.PerformOutboundPeek(ctx, &fedReq, &fedRes)
+ if fedRes.LastError != nil {
+ return "", &api.PerformError{
+ Code: api.PerformErrRemote,
+ Msg: fedRes.LastError.Message,
+ RemoteCode: fedRes.LastError.Code,
+ }
+ }
}
// If this room isn't world_readable, we reject.
diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go
index 7346c7a7..2a361641 100644
--- a/roomserver/internal/query/query.go
+++ b/roomserver/internal/query/query.go
@@ -107,7 +107,7 @@ func (r *Queryer) QueryStateAfterEvents(
}
authEventIDs = util.UniqueStrings(authEventIDs)
- authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
+ authEvents, err := GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
if err != nil {
return fmt.Errorf("getAuthChain: %w", err)
}
@@ -447,10 +447,12 @@ func (r *Queryer) QueryStateAndAuthChain(
response.RoomExists = true
response.RoomVersion = info.RoomVersion
- stateEvents, err := r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs)
+ var stateEvents []*gomatrixserverlib.Event
+ stateEvents, err = r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs)
if err != nil {
return err
}
+
response.PrevEventsExist = true
// add the auth event IDs for the current state events too
@@ -461,7 +463,7 @@ func (r *Queryer) QueryStateAndAuthChain(
}
authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe
- authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
+ authEvents, err := GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
if err != nil {
return err
}
@@ -510,11 +512,11 @@ func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomIn
type eventsFromIDs func(context.Context, []string) ([]types.Event, error)
-// getAuthChain fetches the auth chain for the given auth events. An auth chain
+// GetAuthChain fetches the auth chain for the given auth events. An auth chain
// is the list of all events that are referenced in the auth_events section, and
// all their auth_events, recursively. The returned set of events contain the
// given events. Will *not* error if we don't have all auth events.
-func getAuthChain(
+func GetAuthChain(
ctx context.Context, fn eventsFromIDs, authEventIDs []string,
) ([]*gomatrixserverlib.Event, error) {
// List of event IDs to fetch. On each pass, these events will be requested
@@ -718,7 +720,7 @@ func (r *Queryer) QueryServerBannedFromRoom(ctx context.Context, req *api.QueryS
}
func (r *Queryer) QueryAuthChain(ctx context.Context, req *api.QueryAuthChainRequest, res *api.QueryAuthChainResponse) error {
- chain, err := getAuthChain(ctx, r.DB.EventsFromIDs, req.EventIDs)
+ chain, err := GetAuthChain(ctx, r.DB.EventsFromIDs, req.EventIDs)
if err != nil {
return err
}
diff --git a/roomserver/internal/query/query_test.go b/roomserver/internal/query/query_test.go
index 4e761d8e..ba5bb9f5 100644
--- a/roomserver/internal/query/query_test.go
+++ b/roomserver/internal/query/query_test.go
@@ -106,7 +106,7 @@ func TestGetAuthChainSingle(t *testing.T) {
t.Fatalf("Failed to add events to db: %v", err)
}
- result, err := getAuthChain(context.TODO(), db.EventsFromIDs, []string{"e"})
+ result, err := GetAuthChain(context.TODO(), db.EventsFromIDs, []string{"e"})
if err != nil {
t.Fatalf("getAuthChain failed: %v", err)
}
@@ -139,7 +139,7 @@ func TestGetAuthChainMultiple(t *testing.T) {
t.Fatalf("Failed to add events to db: %v", err)
}
- result, err := getAuthChain(context.TODO(), db.EventsFromIDs, []string{"e", "f"})
+ result, err := GetAuthChain(context.TODO(), db.EventsFromIDs, []string{"e", "f"})
if err != nil {
t.Fatalf("getAuthChain failed: %v", err)
}
diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go
index cac813ff..6774d102 100644
--- a/roomserver/inthttp/client.go
+++ b/roomserver/inthttp/client.go
@@ -26,14 +26,15 @@ const (
RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents"
// Perform operations
- RoomserverPerformInvitePath = "/roomserver/performInvite"
- RoomserverPerformPeekPath = "/roomserver/performPeek"
- RoomserverPerformUnpeekPath = "/roomserver/performUnpeek"
- RoomserverPerformJoinPath = "/roomserver/performJoin"
- RoomserverPerformLeavePath = "/roomserver/performLeave"
- RoomserverPerformBackfillPath = "/roomserver/performBackfill"
- RoomserverPerformPublishPath = "/roomserver/performPublish"
- RoomserverPerformForgetPath = "/roomserver/performForget"
+ RoomserverPerformInvitePath = "/roomserver/performInvite"
+ RoomserverPerformPeekPath = "/roomserver/performPeek"
+ RoomserverPerformUnpeekPath = "/roomserver/performUnpeek"
+ RoomserverPerformJoinPath = "/roomserver/performJoin"
+ RoomserverPerformLeavePath = "/roomserver/performLeave"
+ RoomserverPerformBackfillPath = "/roomserver/performBackfill"
+ RoomserverPerformPublishPath = "/roomserver/performPublish"
+ RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek"
+ RoomserverPerformForgetPath = "/roomserver/performForget"
// Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@@ -216,6 +217,18 @@ func (h *httpRoomserverInternalAPI) PerformPeek(
}
}
+func (h *httpRoomserverInternalAPI) PerformInboundPeek(
+ ctx context.Context,
+ request *api.PerformInboundPeekRequest,
+ response *api.PerformInboundPeekResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInboundPeek")
+ defer span.Finish()
+
+ apiURL := h.roomserverURL + RoomserverPerformInboundPeekPath
+ return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
+
func (h *httpRoomserverInternalAPI) PerformUnpeek(
ctx context.Context,
request *api.PerformUnpeekRequest,
diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go
index f9c8ef9f..bf319262 100644
--- a/roomserver/inthttp/server.go
+++ b/roomserver/inthttp/server.go
@@ -72,6 +72,19 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
+ internalAPIMux.Handle(RoomserverPerformInboundPeekPath,
+ httputil.MakeInternalAPI("performInboundPeek", func(req *http.Request) util.JSONResponse {
+ var request api.PerformInboundPeekRequest
+ var response api.PerformInboundPeekResponse
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ if err := r.PerformInboundPeek(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
internalAPIMux.Handle(RoomserverPerformPeekPath,
httputil.MakeInternalAPI("performUnpeek", func(req *http.Request) util.JSONResponse {
var request api.PerformUnpeekRequest