aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/peeking.md21
-rw-r--r--federationapi/routing/peek.go102
-rw-r--r--federationapi/routing/routing.go38
-rw-r--r--federationsender/api/api.go16
-rw-r--r--federationsender/consumers/roomserver.go40
-rw-r--r--federationsender/internal/perform.go168
-rw-r--r--federationsender/internal/perform/join.go15
-rw-r--r--federationsender/inthttp/client.go14
-rw-r--r--federationsender/storage/interface.go11
-rw-r--r--federationsender/storage/postgres/inbound_peeks_table.go176
-rw-r--r--federationsender/storage/postgres/outbound_peeks_table.go176
-rw-r--r--federationsender/storage/postgres/storage.go28
-rw-r--r--federationsender/storage/shared/storage.go60
-rw-r--r--federationsender/storage/sqlite3/inbound_peeks_table.go176
-rw-r--r--federationsender/storage/sqlite3/outbound_peeks_table.go176
-rw-r--r--federationsender/storage/sqlite3/storage.go28
-rw-r--r--federationsender/storage/tables/interface.go18
-rw-r--r--federationsender/types/types.go20
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rw-r--r--roomserver/api/api.go6
-rw-r--r--roomserver/api/api_trace.go10
-rw-r--r--roomserver/api/output.go17
-rw-r--r--roomserver/api/perform.go22
-rw-r--r--roomserver/api/query.go2
-rw-r--r--roomserver/api/wrapper.go2
-rw-r--r--roomserver/internal/api.go5
-rw-r--r--roomserver/internal/perform/perform_inbound_peek.go129
-rw-r--r--roomserver/internal/perform/perform_peek.go23
-rw-r--r--roomserver/internal/query/query.go14
-rw-r--r--roomserver/internal/query/query_test.go4
-rw-r--r--roomserver/inthttp/client.go29
-rw-r--r--roomserver/inthttp/server.go13
-rw-r--r--syncapi/types/types.go2
34 files changed, 1501 insertions, 66 deletions
diff --git a/docs/peeking.md b/docs/peeking.md
index 78bd6f79..60f35907 100644
--- a/docs/peeking.md
+++ b/docs/peeking.md
@@ -1,19 +1,26 @@
## Peeking
-Peeking is implemented as per [MSC2753](https://github.com/matrix-org/matrix-doc/pull/2753).
+Local peeking is implemented as per [MSC2753](https://github.com/matrix-org/matrix-doc/pull/2753).
Implementationwise, this means:
* Users call `/peek` and `/unpeek` on the clientapi from a given device.
* The clientapi delegates these via HTTP to the roomserver, which coordinates peeking in general for a given room
* The roomserver writes an NewPeek event into the kafka log headed to the syncserver
- * The syncserver tracks the existence of the local peek in its DB, and then starts waking up the peeking devices for the room in question, putting it in the `peek` section of the /sync response.
+ * The syncserver tracks the existence of the local peek in the syncapi_peeks table in its DB, and then starts waking up the peeking devices for the room in question, putting it in the `peek` section of the /sync response.
-Questions (given this is [my](https://github.com/ara4n) first time hacking on Dendrite):
- * The whole clientapi -> roomserver -> syncapi flow to initiate a peek seems very indirect. Is there a reason not to just let syncapi itself host the implementation of `/peek`?
+Peeking over federation is implemented as per [MSC2444](https://github.com/matrix-org/matrix-doc/pull/2444).
-In future, peeking over federation will be added as per [MSC2444](https://github.com/matrix-org/matrix-doc/pull/2444).
- * The `roomserver` will kick the `federationsender` much as it does for a federated `/join` in order to trigger a federated `/peek`
- * The `federationsender` tracks the existence of the remote peek in question
+For requests to peek our rooms ("inbound peeks"):
+ * Remote servers call `/peek` on federationapi
+ * The federationapi queries the federationsender to check if this is renewing an inbound peek or not.
+ * If not, it hits the PerformInboundPeek on the roomserver to ask it for the current state of the room.
+ * The roomserver atomically (in theory) adds a NewInboundPeek to its kafka stream to tell the federationserver to start peeking.
+ * The federationsender receives the event, tracks the inbound peek in the federationsender_inbound_peeks table, and starts sending events to the peeking server.
+ * The federationsender evicts stale inbound peeks which haven't been renewed.
+
+For peeking into other server's rooms ("outbound peeks"):
+ * The `roomserver` will kick the `federationsender` much as it does for a federated `/join` in order to trigger a federated outbound `/peek`
+ * The `federationsender` tracks the existence of the outbound peek in in its federationsender_outbound_peeks table.
* The `federationsender` regularly renews the remote peek as long as there are still peeking devices syncing for it.
* TBD: how do we tell if there are no devices currently syncing for a given peeked room? The syncserver needs to tell the roomserver
somehow who then needs to warn the federationsender. \ No newline at end of file
diff --git a/federationapi/routing/peek.go b/federationapi/routing/peek.go
new file mode 100644
index 00000000..8f83cb15
--- /dev/null
+++ b/federationapi/routing/peek.go
@@ -0,0 +1,102 @@
+// Copyright 2020 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package routing
+
+import (
+ "net/http"
+
+ "github.com/matrix-org/dendrite/clientapi/jsonerror"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+)
+
+// Peek implements the SS /peek API, handling inbound peeks
+func Peek(
+ httpReq *http.Request,
+ request *gomatrixserverlib.FederationRequest,
+ cfg *config.FederationAPI,
+ rsAPI api.RoomserverInternalAPI,
+ roomID, peekID string,
+ remoteVersions []gomatrixserverlib.RoomVersion,
+) util.JSONResponse {
+ // TODO: check if we're just refreshing an existing peek by querying the federationsender
+
+ verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
+ verRes := api.QueryRoomVersionForRoomResponse{}
+ if err := rsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
+ return util.JSONResponse{
+ Code: http.StatusInternalServerError,
+ JSON: jsonerror.InternalServerError(),
+ }
+ }
+
+ // Check that the room that the peeking server is trying to peek is actually
+ // one of the room versions that they listed in their supported ?ver= in
+ // the peek URL.
+ remoteSupportsVersion := false
+ for _, v := range remoteVersions {
+ if v == verRes.RoomVersion {
+ remoteSupportsVersion = true
+ break
+ }
+ }
+ // If it isn't, stop trying to peek the room.
+ if !remoteSupportsVersion {
+ return util.JSONResponse{
+ Code: http.StatusBadRequest,
+ JSON: jsonerror.IncompatibleRoomVersion(verRes.RoomVersion),
+ }
+ }
+
+ // TODO: Check history visibility
+
+ // tell the peeking server to renew every hour
+ renewalInterval := int64(60 * 60 * 1000 * 1000)
+
+ var response api.PerformInboundPeekResponse
+ err := rsAPI.PerformInboundPeek(
+ httpReq.Context(),
+ &api.PerformInboundPeekRequest{
+ RoomID: roomID,
+ PeekID: peekID,
+ ServerName: request.Origin(),
+ RenewalInterval: renewalInterval,
+ },
+ &response,
+ )
+ if err != nil {
+ resErr := util.ErrorResponse(err)
+ return resErr
+ }
+
+ if !response.RoomExists {
+ return util.JSONResponse{Code: http.StatusNotFound, JSON: nil}
+ }
+
+ respPeek := gomatrixserverlib.RespPeek{
+ StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents),
+ AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents),
+ RoomVersion: response.RoomVersion,
+ LatestEvent: response.LatestEvent.Unwrap(),
+ RenewalInterval: renewalInterval,
+ }
+
+ return util.JSONResponse{
+ Code: http.StatusOK,
+ JSON: respPeek,
+ }
+}
diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go
index c957e26d..7e76a143 100644
--- a/federationapi/routing/routing.go
+++ b/federationapi/routing/routing.go
@@ -229,7 +229,37 @@ func Setup(
},
)).Methods(http.MethodGet)
- v1fedmux.Handle("/make_join/{roomID}/{eventID}", httputil.MakeFedAPI(
+ v1fedmux.Handle("/peek/{roomID}/{peekID}", httputil.MakeFedAPI(
+ "federation_peek", cfg.Matrix.ServerName, keys, wakeup,
+ func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
+ if roomserverAPI.IsServerBannedFromRoom(httpReq.Context(), rsAPI, vars["roomID"], request.Origin()) {
+ return util.JSONResponse{
+ Code: http.StatusForbidden,
+ JSON: jsonerror.Forbidden("Forbidden by server ACLs"),
+ }
+ }
+ roomID := vars["roomID"]
+ peekID := vars["peekID"]
+ queryVars := httpReq.URL.Query()
+ remoteVersions := []gomatrixserverlib.RoomVersion{}
+ if vers, ok := queryVars["ver"]; ok {
+ // The remote side supplied a ?ver= so use that to build up the list
+ // of supported room versions
+ for _, v := range vers {
+ remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersion(v))
+ }
+ } else {
+ // The remote side didn't supply a ?ver= so just assume that they only
+ // support room version 1
+ remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersionV1)
+ }
+ return Peek(
+ httpReq, request, cfg, rsAPI, roomID, peekID, remoteVersions,
+ )
+ },
+ )).Methods(http.MethodPut, http.MethodDelete)
+
+ v1fedmux.Handle("/make_join/{roomID}/{userID}", httputil.MakeFedAPI(
"federation_make_join", cfg.Matrix.ServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
if roomserverAPI.IsServerBannedFromRoom(httpReq.Context(), rsAPI, vars["roomID"], request.Origin()) {
@@ -239,11 +269,11 @@ func Setup(
}
}
roomID := vars["roomID"]
- eventID := vars["eventID"]
+ userID := vars["userID"]
queryVars := httpReq.URL.Query()
remoteVersions := []gomatrixserverlib.RoomVersion{}
if vers, ok := queryVars["ver"]; ok {
- // The remote side supplied a ?=ver so use that to build up the list
+ // The remote side supplied a ?ver= so use that to build up the list
// of supported room versions
for _, v := range vers {
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersion(v))
@@ -255,7 +285,7 @@ func Setup(
remoteVersions = append(remoteVersions, gomatrixserverlib.RoomVersionV1)
}
return MakeJoin(
- httpReq, request, cfg, rsAPI, roomID, eventID, remoteVersions,
+ httpReq, request, cfg, rsAPI, roomID, userID, remoteVersions,
)
},
)).Methods(http.MethodGet)
diff --git a/federationsender/api/api.go b/federationsender/api/api.go
index dfc2dd8a..a9ebedaf 100644
--- a/federationsender/api/api.go
+++ b/federationsender/api/api.go
@@ -62,6 +62,12 @@ type FederationSenderInternalAPI interface {
request *PerformJoinRequest,
response *PerformJoinResponse,
)
+ // Handle an instruction to peek a room on a remote server.
+ PerformOutboundPeek(
+ ctx context.Context,
+ request *PerformOutboundPeekRequest,
+ response *PerformOutboundPeekResponse,
+ ) error
// Handle an instruction to make_leave & send_leave with a remote server.
PerformLeave(
ctx context.Context,
@@ -111,6 +117,16 @@ type PerformJoinResponse struct {
LastError *gomatrix.HTTPError
}
+type PerformOutboundPeekRequest struct {
+ RoomID string `json:"room_id"`
+ // The sorted list of servers to try. Servers will be tried sequentially, after de-duplication.
+ ServerNames types.ServerNames `json:"server_names"`
+}
+
+type PerformOutboundPeekResponse struct {
+ LastError *gomatrix.HTTPError
+}
+
type PerformLeaveRequest struct {
RoomID string `json:"room_id"`
UserID string `json:"user_id"`
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index 513919c6..846468fa 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -111,6 +111,14 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
return nil
}
+ case api.OutputTypeNewInboundPeek:
+ if err := s.processInboundPeek(*output.NewInboundPeek); err != nil {
+ log.WithFields(log.Fields{
+ "event": output.NewInboundPeek,
+ log.ErrorKey: err,
+ }).Panicf("roomserver output log: remote peek event failure")
+ return nil
+ }
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
@@ -121,6 +129,23 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
+// processInboundPeek starts tracking a new federated inbound peek (replacing the existing one if any)
+// causing the federationsender to start sending messages to the peeking server
+func (s *OutputRoomEventConsumer) processInboundPeek(orp api.OutputNewInboundPeek) error {
+
+ // FIXME: there's a race here - we should start /sending new peeked events
+ // atomically after the orp.LatestEventID to ensure there are no gaps between
+ // the peek beginning and the send stream beginning.
+ //
+ // We probably need to track orp.LatestEventID on the inbound peek, but it's
+ // unclear how we then use that to prevent the race when we start the send
+ // stream.
+ //
+ // This is making the tests flakey.
+
+ return s.db.AddInboundPeek(context.TODO(), orp.ServerName, orp.RoomID, orp.PeekID, orp.RenewalInterval)
+}
+
// processMessage updates the list of currently joined hosts in the room
// and then sends the event to the hosts that were joined before the event.
func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) error {
@@ -164,6 +189,10 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
return err
}
+ // TODO: do housekeeping to evict unrenewed peeking hosts
+
+ // TODO: implement query to let the fedapi check whether a given peek is live or not
+
// Send the event.
return s.queues.SendEvent(
ore.Event, gomatrixserverlib.ServerName(ore.SendAsServer), joinedHostsAtEvent,
@@ -171,7 +200,7 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
}
// joinedHostsAtEvent works out a list of matrix servers that were joined to
-// the room at the event.
+// the room at the event (including peeking ones)
// It is important to use the state at the event for sending messages because:
// 1) We shouldn't send messages to servers that weren't in the room.
// 2) If a server is kicked from the rooms it should still be told about the
@@ -222,6 +251,15 @@ func (s *OutputRoomEventConsumer) joinedHostsAtEvent(
joined[joinedHost.ServerName] = true
}
+ // handle peeking hosts
+ inboundPeeks, err := s.db.GetInboundPeeks(context.TODO(), ore.Event.Event.RoomID())
+ if err != nil {
+ return nil, err
+ }
+ for _, inboundPeek := range inboundPeeks {
+ joined[inboundPeek.ServerName] = true
+ }
+
var result []gomatrixserverlib.ServerName
for serverName, include := range joined {
if include {
diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go
index 45f33ff7..3adf8fc9 100644
--- a/federationsender/internal/perform.go
+++ b/federationsender/internal/perform.go
@@ -234,7 +234,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
// Check that the send_join response was valid.
joinCtx := perform.JoinContext(r.federation, r.keyRing)
respState, err := joinCtx.CheckSendJoinResponse(
- ctx, event, serverName, respMakeJoin, respSendJoin,
+ ctx, event, serverName, respSendJoin,
)
if err != nil {
logrus.WithFields(logrus.Fields{
@@ -266,6 +266,172 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
return nil
}
+// PerformOutboundPeekRequest implements api.FederationSenderInternalAPI
+func (r *FederationSenderInternalAPI) PerformOutboundPeek(
+ ctx context.Context,
+ request *api.PerformOutboundPeekRequest,
+ response *api.PerformOutboundPeekResponse,
+) error {
+ // Look up the supported room versions.
+ var supportedVersions []gomatrixserverlib.RoomVersion
+ for version := range version.SupportedRoomVersions() {
+ supportedVersions = append(supportedVersions, version)
+ }
+
+ // Deduplicate the server names we were provided but keep the ordering
+ // as this encodes useful information about which servers are most likely
+ // to respond.
+ seenSet := make(map[gomatrixserverlib.ServerName]bool)
+ var uniqueList []gomatrixserverlib.ServerName
+ for _, srv := range request.ServerNames {
+ if seenSet[srv] {
+ continue
+ }
+ seenSet[srv] = true
+ uniqueList = append(uniqueList, srv)
+ }
+ request.ServerNames = uniqueList
+
+ // See if there's an existing outbound peek for this room ID with
+ // one of the specified servers.
+ if peeks, err := r.db.GetOutboundPeeks(ctx, request.RoomID); err == nil {
+ for _, peek := range peeks {
+ if _, ok := seenSet[peek.ServerName]; ok {
+ return nil
+ }
+ }
+ }
+
+ // Try each server that we were provided until we land on one that
+ // successfully completes the peek
+ var lastErr error
+ for _, serverName := range request.ServerNames {
+ if err := r.performOutboundPeekUsingServer(
+ ctx,
+ request.RoomID,
+ serverName,
+ supportedVersions,
+ ); err != nil {
+ logrus.WithError(err).WithFields(logrus.Fields{
+ "server_name": serverName,
+ "room_id": request.RoomID,
+ }).Warnf("Failed to peek room through server")
+ lastErr = err
+ continue
+ }
+
+ // We're all good.
+ return nil
+ }
+
+ // If we reach here then we didn't complete a peek for some reason.
+ var httpErr gomatrix.HTTPError
+ if ok := errors.As(lastErr, &httpErr); ok {
+ httpErr.Message = string(httpErr.Contents)
+ // Clear the wrapped error, else serialising to JSON (in polylith mode) will fail
+ httpErr.WrappedError = nil
+ response.LastError = &httpErr
+ } else {
+ response.LastError = &gomatrix.HTTPError{
+ Code: 0,
+ WrappedError: nil,
+ Message: lastErr.Error(),
+ }
+ }
+
+ logrus.Errorf(
+ "failed to peek room %q through %d server(s): last error %s",
+ request.RoomID, len(request.ServerNames), lastErr,
+ )
+
+ return lastErr
+}
+
+func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer(
+ ctx context.Context,
+ roomID string,
+ serverName gomatrixserverlib.ServerName,
+ supportedVersions []gomatrixserverlib.RoomVersion,
+) error {
+ // create a unique ID for this peek.
+ // for now we just use the room ID again. In future, if we ever
+ // support concurrent peeks to the same room with different filters
+ // then we would need to disambiguate further.
+ peekID := roomID
+
+ // check whether we're peeking already to try to avoid needlessly
+ // re-peeking on the server. we don't need a transaction for this,
+ // given this is a nice-to-have.
+ outboundPeek, err := r.db.GetOutboundPeek(ctx, serverName, roomID, peekID)
+ if err != nil {
+ return err
+ }
+ renewing := false
+ if outboundPeek != nil {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ if nowMilli > outboundPeek.RenewedTimestamp+outboundPeek.RenewalInterval {
+ logrus.Infof("stale outbound peek to %s for %s already exists; renewing", serverName, roomID)
+ renewing = true
+ } else {
+ logrus.Infof("live outbound peek to %s for %s already exists", serverName, roomID)
+ return nil
+ }
+ }
+
+ // Try to perform an outbound /peek using the information supplied in the
+ // request.
+ respPeek, err := r.federation.Peek(
+ ctx,
+ serverName,
+ roomID,
+ peekID,
+ supportedVersions,
+ )
+ if err != nil {
+ r.statistics.ForServer(serverName).Failure()
+ return fmt.Errorf("r.federation.Peek: %w", err)
+ }
+ r.statistics.ForServer(serverName).Success()
+
+ // Work out if we support the room version that has been supplied in
+ // the peek response.
+ if respPeek.RoomVersion == "" {
+ respPeek.RoomVersion = gomatrixserverlib.RoomVersionV1
+ }
+ if _, err = respPeek.RoomVersion.EventFormat(); err != nil {
+ return fmt.Errorf("respPeek.RoomVersion.EventFormat: %w", err)
+ }
+
+ // TODO: authenticate the state returned (check its auth events etc)
+ // the equivalent of CheckSendJoinResponse()
+
+ // If we've got this far, the remote server is peeking.
+ if renewing {
+ if err = r.db.RenewOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
+ return err
+ }
+ } else {
+ if err = r.db.AddOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
+ return err
+ }
+ }
+
+ respState := respPeek.ToRespState()
+ // logrus.Warnf("got respPeek %#v", respPeek)
+ // Send the newly returned state to the roomserver to update our local view.
+ if err = roomserverAPI.SendEventWithState(
+ ctx, r.rsAPI,
+ roomserverAPI.KindNew,
+ &respState,
+ respPeek.LatestEvent.Headered(respPeek.RoomVersion),
+ nil,
+ ); err != nil {
+ return fmt.Errorf("r.producer.SendEventWithState: %w", err)
+ }
+
+ return nil
+}
+
// PerformLeaveRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformLeave(
ctx context.Context,
diff --git a/federationsender/internal/perform/join.go b/federationsender/internal/perform/join.go
index 2fa3d4bf..c23f6fa3 100644
--- a/federationsender/internal/perform/join.go
+++ b/federationsender/internal/perform/join.go
@@ -1,3 +1,17 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
package perform
import (
@@ -28,7 +42,6 @@ func (r joinContext) CheckSendJoinResponse(
ctx context.Context,
event *gomatrixserverlib.Event,
server gomatrixserverlib.ServerName,
- respMakeJoin gomatrixserverlib.RespMakeJoin,
respSendJoin gomatrixserverlib.RespSendJoin,
) (*gomatrixserverlib.RespState, error) {
// A list of events that we have retried, if they were not included in
diff --git a/federationsender/inthttp/client.go b/federationsender/inthttp/client.go
index 81d3fc51..3f86a2d0 100644
--- a/federationsender/inthttp/client.go
+++ b/federationsender/inthttp/client.go
@@ -20,6 +20,7 @@ const (
FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest"
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest"
+ FederationSenderPerformOutboundPeekRequestPath = "/federationsender/performOutboundPeekRequest"
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU"
@@ -76,6 +77,19 @@ func (h *httpFederationSenderInternalAPI) PerformInvite(
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
+// Handle starting a peek on a remote server.
+func (h *httpFederationSenderInternalAPI) PerformOutboundPeek(
+ ctx context.Context,
+ request *api.PerformOutboundPeekRequest,
+ response *api.PerformOutboundPeekResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "PerformOutboundPeekRequest")
+ defer span.Finish()
+
+ apiURL := h.federationSenderURL + FederationSenderPerformOutboundPeekRequestPath
+ return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
+
func (h *httpFederationSenderInternalAPI) PerformServersAlive(
ctx context.Context,
request *api.PerformServersAliveRequest,
diff --git a/federationsender/storage/interface.go b/federationsender/storage/interface.go
index 03d616f1..b8361304 100644
--- a/federationsender/storage/interface.go
+++ b/federationsender/storage/interface.go
@@ -51,7 +51,18 @@ type Database interface {
GetPendingPDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
GetPendingEDUServerNames(ctx context.Context) ([]gomatrixserverlib.ServerName, error)
+ // these don't have contexts passed in as we want things to happen regardless of the request context
AddServerToBlacklist(serverName gomatrixserverlib.ServerName) error
RemoveServerFromBlacklist(serverName gomatrixserverlib.ServerName) error
IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error)
+
+ AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error)
+ GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error)
+
+ AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error
+ GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error)
+ GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error)
}
diff --git a/federationsender/storage/postgres/inbound_peeks_table.go b/federationsender/storage/postgres/inbound_peeks_table.go
new file mode 100644
index 00000000..fe35ce44
--- /dev/null
+++ b/federationsender/storage/postgres/inbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const inboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts BIGINT NOT NULL,
+ renewed_ts BIGINT NOT NULL,
+ renewal_interval BIGINT NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertInboundPeekSQL = "" +
+ "INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectInboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectInboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+const renewInboundPeekSQL = "" +
+ "UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteInboundPeekSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteInboundPeeksSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+type inboundPeeksStatements struct {
+ db *sql.DB
+ insertInboundPeekStmt *sql.Stmt
+ selectInboundPeekStmt *sql.Stmt
+ selectInboundPeeksStmt *sql.Stmt
+ renewInboundPeekStmt *sql.Stmt
+ deleteInboundPeekStmt *sql.Stmt
+ deleteInboundPeeksStmt *sql.Stmt
+}
+
+func NewPostgresInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) {
+ s = &inboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(inboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *inboundPeeksStatements) InsertInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *inboundPeeksStatements) RenewInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.InboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID)
+ inboundPeek := types.InboundPeek{}
+ err := row.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &inboundPeek, nil
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (inboundPeeks []types.InboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ inboundPeek := types.InboundPeek{}
+ if err = rows.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ inboundPeeks = append(inboundPeeks, inboundPeek)
+ }
+
+ return inboundPeeks, rows.Err()
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/postgres/outbound_peeks_table.go b/federationsender/storage/postgres/outbound_peeks_table.go
new file mode 100644
index 00000000..596b4bcc
--- /dev/null
+++ b/federationsender/storage/postgres/outbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package postgres
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const outboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_outbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts BIGINT NOT NULL,
+ renewed_ts BIGINT NOT NULL,
+ renewal_interval BIGINT NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertOutboundPeekSQL = "" +
+ "INSERT INTO federationsender_outbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectOutboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectOutboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+const renewOutboundPeekSQL = "" +
+ "UPDATE federationsender_outbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteOutboundPeekSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteOutboundPeeksSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+type outboundPeeksStatements struct {
+ db *sql.DB
+ insertOutboundPeekStmt *sql.Stmt
+ selectOutboundPeekStmt *sql.Stmt
+ selectOutboundPeeksStmt *sql.Stmt
+ renewOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeeksStmt *sql.Stmt
+}
+
+func NewPostgresOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err error) {
+ s = &outboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(outboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertOutboundPeekStmt, err = db.Prepare(insertOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeekStmt, err = db.Prepare(selectOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeeksStmt, err = db.Prepare(selectOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewOutboundPeekStmt, err = db.Prepare(renewOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeeksStmt, err = db.Prepare(deleteOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeekStmt, err = db.Prepare(deleteOutboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *outboundPeeksStatements) InsertOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *outboundPeeksStatements) RenewOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.OutboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryRowContext(ctx, roomID)
+ outboundPeek := types.OutboundPeek{}
+ err := row.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &outboundPeek, nil
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (outboundPeeks []types.OutboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectOutboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ outboundPeek := types.OutboundPeek{}
+ if err = rows.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ outboundPeeks = append(outboundPeeks, outboundPeek)
+ }
+
+ return outboundPeeks, rows.Err()
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go
index 75b54bbc..b9827ca1 100644
--- a/federationsender/storage/postgres/storage.go
+++ b/federationsender/storage/postgres/storage.go
@@ -64,16 +64,26 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil {
return nil, err
}
+ inboundPeeks, err := NewPostgresInboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
+ outboundPeeks, err := NewPostgresOutboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
- DB: d.db,
- Cache: cache,
- Writer: d.writer,
- FederationSenderJoinedHosts: joinedHosts,
- FederationSenderQueuePDUs: queuePDUs,
- FederationSenderQueueEDUs: queueEDUs,
- FederationSenderQueueJSON: queueJSON,
- FederationSenderRooms: rooms,
- FederationSenderBlacklist: blacklist,
+ DB: d.db,
+ Cache: cache,
+ Writer: d.writer,
+ FederationSenderJoinedHosts: joinedHosts,
+ FederationSenderQueuePDUs: queuePDUs,
+ FederationSenderQueueEDUs: queueEDUs,
+ FederationSenderQueueJSON: queueJSON,
+ FederationSenderRooms: rooms,
+ FederationSenderBlacklist: blacklist,
+ FederationSenderInboundPeeks: inboundPeeks,
+ FederationSenderOutboundPeeks: outboundPeeks,
}
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err
diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go
index fbf84c70..4c949042 100644
--- a/federationsender/storage/shared/storage.go
+++ b/federationsender/storage/shared/storage.go
@@ -27,15 +27,17 @@ import (
)
type Database struct {
- DB *sql.DB
- Cache caching.FederationSenderCache
- Writer sqlutil.Writer
- FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
- FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
- FederationSenderQueueJSON tables.FederationSenderQueueJSON
- FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
- FederationSenderRooms tables.FederationSenderRooms
- FederationSenderBlacklist tables.FederationSenderBlacklist
+ DB *sql.DB
+ Cache caching.FederationSenderCache
+ Writer sqlutil.Writer
+ FederationSenderQueuePDUs tables.FederationSenderQueuePDUs
+ FederationSenderQueueEDUs tables.FederationSenderQueueEDUs
+ FederationSenderQueueJSON tables.FederationSenderQueueJSON
+ FederationSenderJoinedHosts tables.FederationSenderJoinedHosts
+ FederationSenderRooms tables.FederationSenderRooms
+ FederationSenderBlacklist tables.FederationSenderBlacklist
+ FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks
+ FederationSenderInboundPeeks tables.FederationSenderInboundPeeks
}
// An Receipt contains the NIDs of a call to GetNextTransactionPDUs/EDUs.
@@ -173,3 +175,43 @@ func (d *Database) RemoveServerFromBlacklist(serverName gomatrixserverlib.Server
func (d *Database) IsServerBlacklisted(serverName gomatrixserverlib.ServerName) (bool, error) {
return d.FederationSenderBlacklist.SelectBlacklist(context.TODO(), nil, serverName)
}
+
+func (d *Database) AddOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderOutboundPeeks.InsertOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) RenewOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderOutboundPeeks.RenewOutboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) GetOutboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.OutboundPeek, error) {
+ return d.FederationSenderOutboundPeeks.SelectOutboundPeek(ctx, nil, serverName, roomID, peekID)
+}
+
+func (d *Database) GetOutboundPeeks(ctx context.Context, roomID string) ([]types.OutboundPeek, error) {
+ return d.FederationSenderOutboundPeeks.SelectOutboundPeeks(ctx, nil, roomID)
+}
+
+func (d *Database) AddInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderInboundPeeks.InsertInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) RenewInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) error {
+ return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
+ return d.FederationSenderInboundPeeks.RenewInboundPeek(ctx, txn, serverName, roomID, peekID, renewalInterval)
+ })
+}
+
+func (d *Database) GetInboundPeek(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID, peekID string) (*types.InboundPeek, error) {
+ return d.FederationSenderInboundPeeks.SelectInboundPeek(ctx, nil, serverName, roomID, peekID)
+}
+
+func (d *Database) GetInboundPeeks(ctx context.Context, roomID string) ([]types.InboundPeek, error) {
+ return d.FederationSenderInboundPeeks.SelectInboundPeeks(ctx, nil, roomID)
+}
diff --git a/federationsender/storage/sqlite3/inbound_peeks_table.go b/federationsender/storage/sqlite3/inbound_peeks_table.go
new file mode 100644
index 00000000..d5eacf9e
--- /dev/null
+++ b/federationsender/storage/sqlite3/inbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const inboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_inbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts INTEGER NOT NULL,
+ renewed_ts INTEGER NOT NULL,
+ renewal_interval INTEGER NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertInboundPeekSQL = "" +
+ "INSERT INTO federationsender_inbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectInboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectInboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+const renewInboundPeekSQL = "" +
+ "UPDATE federationsender_inbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteInboundPeekSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteInboundPeeksSQL = "" +
+ "DELETE FROM federationsender_inbound_peeks WHERE room_id = $1"
+
+type inboundPeeksStatements struct {
+ db *sql.DB
+ insertInboundPeekStmt *sql.Stmt
+ selectInboundPeekStmt *sql.Stmt
+ selectInboundPeeksStmt *sql.Stmt
+ renewInboundPeekStmt *sql.Stmt
+ deleteInboundPeekStmt *sql.Stmt
+ deleteInboundPeeksStmt *sql.Stmt
+}
+
+func NewSQLiteInboundPeeksTable(db *sql.DB) (s *inboundPeeksStatements, err error) {
+ s = &inboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(inboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertInboundPeekStmt, err = db.Prepare(insertInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeekStmt, err = db.Prepare(selectInboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectInboundPeeksStmt, err = db.Prepare(selectInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewInboundPeekStmt, err = db.Prepare(renewInboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeeksStmt, err = db.Prepare(deleteInboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteInboundPeekStmt, err = db.Prepare(deleteInboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *inboundPeeksStatements) InsertInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertInboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *inboundPeeksStatements) RenewInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewInboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.InboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryRowContext(ctx, roomID)
+ inboundPeek := types.InboundPeek{}
+ err := row.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &inboundPeek, nil
+}
+
+func (s *inboundPeeksStatements) SelectInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (inboundPeeks []types.InboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectInboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectInboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ inboundPeek := types.InboundPeek{}
+ if err = rows.Scan(
+ &inboundPeek.RoomID,
+ &inboundPeek.ServerName,
+ &inboundPeek.PeekID,
+ &inboundPeek.CreationTimestamp,
+ &inboundPeek.RenewedTimestamp,
+ &inboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ inboundPeeks = append(inboundPeeks, inboundPeek)
+ }
+
+ return inboundPeeks, rows.Err()
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *inboundPeeksStatements) DeleteInboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteInboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/sqlite3/outbound_peeks_table.go b/federationsender/storage/sqlite3/outbound_peeks_table.go
new file mode 100644
index 00000000..02aefce7
--- /dev/null
+++ b/federationsender/storage/sqlite3/outbound_peeks_table.go
@@ -0,0 +1,176 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlite3
+
+import (
+ "context"
+ "database/sql"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationsender/types"
+ "github.com/matrix-org/dendrite/internal"
+ "github.com/matrix-org/dendrite/internal/sqlutil"
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+const outboundPeeksSchema = `
+CREATE TABLE IF NOT EXISTS federationsender_outbound_peeks (
+ room_id TEXT NOT NULL,
+ server_name TEXT NOT NULL,
+ peek_id TEXT NOT NULL,
+ creation_ts INTEGER NOT NULL,
+ renewed_ts INTEGER NOT NULL,
+ renewal_interval INTEGER NOT NULL,
+ UNIQUE (room_id, server_name, peek_id)
+);
+`
+
+const insertOutboundPeekSQL = "" +
+ "INSERT INTO federationsender_outbound_peeks (room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval) VALUES ($1, $2, $3, $4, $5, $6)"
+
+const selectOutboundPeekSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2 and peek_id = $3"
+
+const selectOutboundPeeksSQL = "" +
+ "SELECT room_id, server_name, peek_id, creation_ts, renewed_ts, renewal_interval FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+const renewOutboundPeekSQL = "" +
+ "UPDATE federationsender_outbound_peeks SET renewed_ts=$1, renewal_interval=$2 WHERE room_id = $3 and server_name = $4 and peek_id = $5"
+
+const deleteOutboundPeekSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1 and server_name = $2"
+
+const deleteOutboundPeeksSQL = "" +
+ "DELETE FROM federationsender_outbound_peeks WHERE room_id = $1"
+
+type outboundPeeksStatements struct {
+ db *sql.DB
+ insertOutboundPeekStmt *sql.Stmt
+ selectOutboundPeekStmt *sql.Stmt
+ selectOutboundPeeksStmt *sql.Stmt
+ renewOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeekStmt *sql.Stmt
+ deleteOutboundPeeksStmt *sql.Stmt
+}
+
+func NewSQLiteOutboundPeeksTable(db *sql.DB) (s *outboundPeeksStatements, err error) {
+ s = &outboundPeeksStatements{
+ db: db,
+ }
+ _, err = db.Exec(outboundPeeksSchema)
+ if err != nil {
+ return
+ }
+
+ if s.insertOutboundPeekStmt, err = db.Prepare(insertOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeekStmt, err = db.Prepare(selectOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.selectOutboundPeeksStmt, err = db.Prepare(selectOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.renewOutboundPeekStmt, err = db.Prepare(renewOutboundPeekSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeeksStmt, err = db.Prepare(deleteOutboundPeeksSQL); err != nil {
+ return
+ }
+ if s.deleteOutboundPeekStmt, err = db.Prepare(deleteOutboundPeekSQL); err != nil {
+ return
+ }
+ return
+}
+
+func (s *outboundPeeksStatements) InsertOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ stmt := sqlutil.TxStmt(txn, s.insertOutboundPeekStmt)
+ _, err = stmt.ExecContext(ctx, roomID, serverName, peekID, nowMilli, nowMilli, renewalInterval)
+ return
+}
+
+func (s *outboundPeeksStatements) RenewOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64,
+) (err error) {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ _, err = sqlutil.TxStmt(txn, s.renewOutboundPeekStmt).ExecContext(ctx, nowMilli, renewalInterval, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (*types.OutboundPeek, error) {
+ row := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryRowContext(ctx, roomID)
+ outboundPeek := types.OutboundPeek{}
+ err := row.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ )
+ if err == sql.ErrNoRows {
+ return nil, nil
+ }
+ if err != nil {
+ return nil, err
+ }
+ return &outboundPeek, nil
+}
+
+func (s *outboundPeeksStatements) SelectOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (outboundPeeks []types.OutboundPeek, err error) {
+ rows, err := sqlutil.TxStmt(txn, s.selectOutboundPeeksStmt).QueryContext(ctx, roomID)
+ if err != nil {
+ return
+ }
+ defer internal.CloseAndLogIfError(ctx, rows, "SelectOutboundPeeks: rows.close() failed")
+
+ for rows.Next() {
+ outboundPeek := types.OutboundPeek{}
+ if err = rows.Scan(
+ &outboundPeek.RoomID,
+ &outboundPeek.ServerName,
+ &outboundPeek.PeekID,
+ &outboundPeek.CreationTimestamp,
+ &outboundPeek.RenewedTimestamp,
+ &outboundPeek.RenewalInterval,
+ ); err != nil {
+ return
+ }
+ outboundPeeks = append(outboundPeeks, outboundPeek)
+ }
+
+ return outboundPeeks, rows.Err()
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeek(
+ ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeekStmt).ExecContext(ctx, roomID, serverName, peekID)
+ return
+}
+
+func (s *outboundPeeksStatements) DeleteOutboundPeeks(
+ ctx context.Context, txn *sql.Tx, roomID string,
+) (err error) {
+ _, err = sqlutil.TxStmt(txn, s.deleteOutboundPeeksStmt).ExecContext(ctx, roomID)
+ return
+}
diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go
index e66d7690..2b135858 100644
--- a/federationsender/storage/sqlite3/storage.go
+++ b/federationsender/storage/sqlite3/storage.go
@@ -66,16 +66,26 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS
if err != nil {
return nil, err
}
+ outboundPeeks, err := NewSQLiteOutboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
+ inboundPeeks, err := NewSQLiteInboundPeeksTable(d.db)
+ if err != nil {
+ return nil, err
+ }
d.Database = shared.Database{
- DB: d.db,
- Cache: cache,
- Writer: d.writer,
- FederationSenderJoinedHosts: joinedHosts,
- FederationSenderQueuePDUs: queuePDUs,
- FederationSenderQueueEDUs: queueEDUs,
- FederationSenderQueueJSON: queueJSON,
- FederationSenderRooms: rooms,
- FederationSenderBlacklist: blacklist,
+ DB: d.db,
+ Cache: cache,
+ Writer: d.writer,
+ FederationSenderJoinedHosts: joinedHosts,
+ FederationSenderQueuePDUs: queuePDUs,
+ FederationSenderQueueEDUs: queueEDUs,
+ FederationSenderQueueJSON: queueJSON,
+ FederationSenderRooms: rooms,
+ FederationSenderBlacklist: blacklist,
+ FederationSenderOutboundPeeks: outboundPeeks,
+ FederationSenderInboundPeeks: inboundPeeks,
}
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "federationsender"); err != nil {
return nil, err
diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go
index 69e952de..22fd5554 100644
--- a/federationsender/storage/tables/interface.go
+++ b/federationsender/storage/tables/interface.go
@@ -67,3 +67,21 @@ type FederationSenderBlacklist interface {
SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error)
DeleteBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error
}
+
+type FederationSenderOutboundPeeks interface {
+ InsertOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ RenewOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ SelectOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (outboundPeek *types.OutboundPeek, err error)
+ SelectOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (outboundPeeks []types.OutboundPeek, err error)
+ DeleteOutboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
+ DeleteOutboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
+}
+
+type FederationSenderInboundPeeks interface {
+ InsertInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ RenewInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string, renewalInterval int64) (err error)
+ SelectInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (inboundPeek *types.InboundPeek, err error)
+ SelectInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (inboundPeeks []types.InboundPeek, err error)
+ DeleteInboundPeek(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName, roomID, peekID string) (err error)
+ DeleteInboundPeeks(ctx context.Context, txn *sql.Tx, roomID string) (err error)
+}
diff --git a/federationsender/types/types.go b/federationsender/types/types.go
index 398d3267..90da310c 100644
--- a/federationsender/types/types.go
+++ b/federationsender/types/types.go
@@ -49,3 +49,23 @@ func (e EventIDMismatchError) Error() string {
e.DatabaseID, e.RoomServerID,
)
}
+
+// tracks peeks we're performing on another server over federation
+type OutboundPeek struct {
+ PeekID string
+ RoomID string
+ ServerName gomatrixserverlib.ServerName
+ CreationTimestamp int64
+ RenewedTimestamp int64
+ RenewalInterval int64
+}
+
+// tracks peeks other servers are performing on us over federation
+type InboundPeek struct {
+ PeekID string
+ RoomID string
+ ServerName gomatrixserverlib.ServerName
+ CreationTimestamp int64
+ RenewedTimestamp int64
+ RenewalInterval int64
+}
diff --git a/go.mod b/go.mod
index a0fee103..bfe605fd 100644
--- a/go.mod
+++ b/go.mod
@@ -41,7 +41,7 @@ require (
go.uber.org/atomic v1.6.0
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
golang.org/x/net v0.0.0-20200528225125-3c3fba18258b
- golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78 // indirect
+ golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
gopkg.in/h2non/bimg.v1 v1.1.4
gopkg.in/yaml.v2 v2.3.0
)
diff --git a/go.sum b/go.sum
index 7accf175..f411660f 100644
--- a/go.sum
+++ b/go.sum
@@ -995,8 +995,8 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78 h1:nVuTkr9L6Bq62qpUqKo/RnZCFfzDBL0bYo6w9OJUqZY=
-golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
+golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
diff --git a/roomserver/api/api.go b/roomserver/api/api.go
index cedd6193..72e406ee 100644
--- a/roomserver/api/api.go
+++ b/roomserver/api/api.go
@@ -56,6 +56,12 @@ type RoomserverInternalAPI interface {
res *PerformPublishResponse,
)
+ PerformInboundPeek(
+ ctx context.Context,
+ req *PerformInboundPeekRequest,
+ res *PerformInboundPeekResponse,
+ ) error
+
QueryPublishedRooms(
ctx context.Context,
req *QueryPublishedRoomsRequest,
diff --git a/roomserver/api/api_trace.go b/roomserver/api/api_trace.go
index 40745975..1a2b9a49 100644
--- a/roomserver/api/api_trace.go
+++ b/roomserver/api/api_trace.go
@@ -88,6 +88,16 @@ func (t *RoomserverInternalAPITrace) PerformPublish(
util.GetLogger(ctx).Infof("PerformPublish req=%+v res=%+v", js(req), js(res))
}
+func (t *RoomserverInternalAPITrace) PerformInboundPeek(
+ ctx context.Context,
+ req *PerformInboundPeekRequest,
+ res *PerformInboundPeekResponse,
+) error {
+ err := t.Impl.PerformInboundPeek(ctx, req, res)
+ util.GetLogger(ctx).Infof("PerformInboundPeek req=%+v res=%+v", js(req), js(res))
+ return err
+}
+
func (t *RoomserverInternalAPITrace) QueryPublishedRooms(
ctx context.Context,
req *QueryPublishedRoomsRequest,
diff --git a/roomserver/api/output.go b/roomserver/api/output.go
index 2993813c..d60d1cc8 100644
--- a/roomserver/api/output.go
+++ b/roomserver/api/output.go
@@ -51,6 +51,8 @@ const (
// OutputTypeNewPeek indicates that the kafka event is an OutputNewPeek
OutputTypeNewPeek OutputType = "new_peek"
+ // OutputTypeNewInboundPeek indicates that the kafka event is an OutputNewInboundPeek
+ OutputTypeNewInboundPeek OutputType = "new_inbound_peek"
// OutputTypeRetirePeek indicates that the kafka event is an OutputRetirePeek
OutputTypeRetirePeek OutputType = "retire_peek"
)
@@ -72,6 +74,8 @@ type OutputEvent struct {
RedactedEvent *OutputRedactedEvent `json:"redacted_event,omitempty"`
// The content of event with type OutputTypeNewPeek
NewPeek *OutputNewPeek `json:"new_peek,omitempty"`
+ // The content of event with type OutputTypeNewInboundPeek
+ NewInboundPeek *OutputNewInboundPeek `json:"new_inbound_peek,omitempty"`
// The content of event with type OutputTypeRetirePeek
RetirePeek *OutputRetirePeek `json:"retire_peek,omitempty"`
}
@@ -245,6 +249,19 @@ type OutputNewPeek struct {
DeviceID string
}
+// An OutputNewInboundPeek is written whenever a server starts peeking into a room
+type OutputNewInboundPeek struct {
+ RoomID string
+ PeekID string
+ // the event ID at which the peek begins (so we can avoid
+ // a race between tracking the state returned by /peek and emitting subsequent
+ // peeked events)
+ LatestEventID string
+ ServerName gomatrixserverlib.ServerName
+ // how often we told the peeking server to renew the peek
+ RenewalInterval int64
+}
+
// An OutputRetirePeek is written whenever a user stops peeking into a room.
type OutputRetirePeek struct {
RoomID string
diff --git a/roomserver/api/perform.go b/roomserver/api/perform.go
index ae2d6d97..51cbcb1a 100644
--- a/roomserver/api/perform.go
+++ b/roomserver/api/perform.go
@@ -172,6 +172,28 @@ type PerformPublishResponse struct {
Error *PerformError
}
+type PerformInboundPeekRequest struct {
+ UserID string `json:"user_id"`
+ RoomID string `json:"room_id"`
+ PeekID string `json:"peek_id"`
+ ServerName gomatrixserverlib.ServerName `json:"server_name"`
+ RenewalInterval int64 `json:"renewal_interval"`
+}
+
+type PerformInboundPeekResponse struct {
+ // Does the room exist on this roomserver?
+ // If the room doesn't exist this will be false and StateEvents will be empty.
+ RoomExists bool `json:"room_exists"`
+ // The room version of the room.
+ RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
+ // The current state and auth chain events.
+ // The lists will be in an arbitrary order.
+ StateEvents []*gomatrixserverlib.HeaderedEvent `json:"state_events"`
+ AuthChainEvents []*gomatrixserverlib.HeaderedEvent `json:"auth_chain_events"`
+ // The event at which this state was captured
+ LatestEvent *gomatrixserverlib.HeaderedEvent `json:"latest_event"`
+}
+
// PerformForgetRequest is a request to PerformForget
type PerformForgetRequest struct {
RoomID string `json:"room_id"`
diff --git a/roomserver/api/query.go b/roomserver/api/query.go
index 43e562a9..43bbfd16 100644
--- a/roomserver/api/query.go
+++ b/roomserver/api/query.go
@@ -221,7 +221,7 @@ type QueryStateAndAuthChainRequest struct {
// The room ID to query the state in.
RoomID string `json:"room_id"`
// The list of prev events for the event. Used to calculate the state at
- // the event
+ // the event.
PrevEventIDs []string `json:"prev_event_ids"`
// The list of auth events for the event. Used to calculate the auth chain
AuthEventIDs []string `json:"auth_event_ids"`
diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go
index 7779dbde..a6ef735c 100644
--- a/roomserver/api/wrapper.go
+++ b/roomserver/api/wrapper.go
@@ -43,7 +43,7 @@ func SendEvents(
// SendEventWithState writes an event with the specified kind to the roomserver
// with the state at the event as KindOutlier before it. Will not send any event that is
-// marked as `true` in haveEventIDs
+// marked as `true` in haveEventIDs.
func SendEventWithState(
ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind,
state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent,
diff --git a/roomserver/internal/api.go b/roomserver/internal/api.go
index 91caa0bd..e10bdb46 100644
--- a/roomserver/internal/api.go
+++ b/roomserver/internal/api.go
@@ -24,6 +24,7 @@ type RoomserverInternalAPI struct {
*perform.Inviter
*perform.Joiner
*perform.Peeker
+ *perform.InboundPeeker
*perform.Unpeeker
*perform.Leaver
*perform.Publisher
@@ -97,6 +98,10 @@ func (r *RoomserverInternalAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSen
FSAPI: r.fsAPI,
Inputer: r.Inputer,
}
+ r.InboundPeeker = &perform.InboundPeeker{
+ DB: r.DB,
+ Inputer: r.Inputer,
+ }
r.Unpeeker = &perform.Unpeeker{
ServerName: r.Cfg.Matrix.ServerName,
Cfg: r.Cfg,
diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go
new file mode 100644
index 00000000..eb3c9727
--- /dev/null
+++ b/roomserver/internal/perform/perform_inbound_peek.go
@@ -0,0 +1,129 @@
+// Copyright 2020 New Vector Ltd
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package perform
+
+import (
+ "context"
+
+ "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/roomserver/internal/helpers"
+ "github.com/matrix-org/dendrite/roomserver/internal/input"
+ "github.com/matrix-org/dendrite/roomserver/internal/query"
+ "github.com/matrix-org/dendrite/roomserver/state"
+ "github.com/matrix-org/dendrite/roomserver/storage"
+ "github.com/matrix-org/dendrite/roomserver/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+)
+
+type InboundPeeker struct {
+ DB storage.Database
+ Inputer *input.Inputer
+}
+
+// PerformInboundPeek handles peeking into matrix rooms, including over
+// federation by talking to the federationsender. called when a remote server
+// initiates a /peek over federation.
+//
+// It should atomically figure out the current state of the room (for the
+// response to /peek) while adding the new inbound peek to the kafka stream so the
+// fed sender can start sending peeked events without a race between the state
+// snapshot and the stream of peeked events.
+func (r *InboundPeeker) PerformInboundPeek(
+ ctx context.Context,
+ request *api.PerformInboundPeekRequest,
+ response *api.PerformInboundPeekResponse,
+) error {
+ info, err := r.DB.RoomInfo(ctx, request.RoomID)
+ if err != nil {
+ return err
+ }
+ if info == nil || info.IsStub {
+ return nil
+ }
+ response.RoomExists = true
+ response.RoomVersion = info.RoomVersion
+
+ var stateEvents []*gomatrixserverlib.Event
+
+ var currentStateSnapshotNID types.StateSnapshotNID
+ latestEventRefs, currentStateSnapshotNID, _, err :=
+ r.DB.LatestEventIDs(ctx, info.RoomNID)
+ if err != nil {
+ return err
+ }
+ latestEvents, err := r.DB.EventsFromIDs(ctx, []string{latestEventRefs[0].EventID})
+ if err != nil {
+ return err
+ }
+ var sortedLatestEvents []*gomatrixserverlib.Event
+ for _, ev := range latestEvents {
+ sortedLatestEvents = append(sortedLatestEvents, ev.Event)
+ }
+ sortedLatestEvents = gomatrixserverlib.ReverseTopologicalOrdering(
+ sortedLatestEvents,
+ gomatrixserverlib.TopologicalOrderByPrevEvents,
+ )
+ response.LatestEvent = sortedLatestEvents[0].Headered(info.RoomVersion)
+
+ // XXX: do we actually need to do a state resolution here?
+ roomState := state.NewStateResolution(r.DB, *info)
+
+ var stateEntries []types.StateEntry
+ stateEntries, err = roomState.LoadStateAtSnapshot(
+ ctx, currentStateSnapshotNID,
+ )
+ if err != nil {
+ return err
+ }
+ stateEvents, err = helpers.LoadStateEvents(ctx, r.DB, stateEntries)
+ if err != nil {
+ return err
+ }
+
+ // get the auth event IDs for the current state events
+ var authEventIDs []string
+ for _, se := range stateEvents {
+ authEventIDs = append(authEventIDs, se.AuthEventIDs()...)
+ }
+ authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe
+
+ authEvents, err := query.GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
+ if err != nil {
+ return err
+ }
+
+ for _, event := range stateEvents {
+ response.StateEvents = append(response.StateEvents, event.Headered(info.RoomVersion))
+ }
+
+ for _, event := range authEvents {
+ response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(info.RoomVersion))
+ }
+
+ err = r.Inputer.WriteOutputEvents(request.RoomID, []api.OutputEvent{
+ {
+ Type: api.OutputTypeNewInboundPeek,
+ NewInboundPeek: &api.OutputNewInboundPeek{
+ RoomID: request.RoomID,
+ PeekID: request.PeekID,
+ LatestEventID: latestEvents[0].EventID(),
+ ServerName: request.ServerName,
+ RenewalInterval: request.RenewalInterval,
+ },
+ },
+ })
+ return err
+}
diff --git a/roomserver/internal/perform/perform_peek.go b/roomserver/internal/perform/perform_peek.go
index 2f4694c8..443276cd 100644
--- a/roomserver/internal/perform/perform_peek.go
+++ b/roomserver/internal/perform/perform_peek.go
@@ -151,11 +151,28 @@ func (r *Peeker) performPeekRoomByID(
}
}
- // If the server name in the room ID isn't ours then it's a
- // possible candidate for finding the room via federation. Add
- // it to the list of servers to try.
+ // handle federated peeks
+ // FIXME: don't create an outbound peek if we already have one going.
if domain != r.Cfg.Matrix.ServerName {
+ // If the server name in the room ID isn't ours then it's a
+ // possible candidate for finding the room via federation. Add
+ // it to the list of servers to try.
req.ServerNames = append(req.ServerNames, domain)
+
+ // Try peeking by all of the supplied server names.
+ fedReq := fsAPI.PerformOutboundPeekRequest{
+ RoomID: req.RoomIDOrAlias, // the room ID to try and peek
+ ServerNames: req.ServerNames, // the servers to try peeking via
+ }
+ fedRes := fsAPI.PerformOutboundPeekResponse{}
+ _ = r.FSAPI.PerformOutboundPeek(ctx, &fedReq, &fedRes)
+ if fedRes.LastError != nil {
+ return "", &api.PerformError{
+ Code: api.PerformErrRemote,
+ Msg: fedRes.LastError.Message,
+ RemoteCode: fedRes.LastError.Code,
+ }
+ }
}
// If this room isn't world_readable, we reject.
diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go
index 7346c7a7..2a361641 100644
--- a/roomserver/internal/query/query.go
+++ b/roomserver/internal/query/query.go
@@ -107,7 +107,7 @@ func (r *Queryer) QueryStateAfterEvents(
}
authEventIDs = util.UniqueStrings(authEventIDs)
- authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
+ authEvents, err := GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
if err != nil {
return fmt.Errorf("getAuthChain: %w", err)
}
@@ -447,10 +447,12 @@ func (r *Queryer) QueryStateAndAuthChain(
response.RoomExists = true
response.RoomVersion = info.RoomVersion
- stateEvents, err := r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs)
+ var stateEvents []*gomatrixserverlib.Event
+ stateEvents, err = r.loadStateAtEventIDs(ctx, *info, request.PrevEventIDs)
if err != nil {
return err
}
+
response.PrevEventsExist = true
// add the auth event IDs for the current state events too
@@ -461,7 +463,7 @@ func (r *Queryer) QueryStateAndAuthChain(
}
authEventIDs = util.UniqueStrings(authEventIDs) // de-dupe
- authEvents, err := getAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
+ authEvents, err := GetAuthChain(ctx, r.DB.EventsFromIDs, authEventIDs)
if err != nil {
return err
}
@@ -510,11 +512,11 @@ func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomIn
type eventsFromIDs func(context.Context, []string) ([]types.Event, error)
-// getAuthChain fetches the auth chain for the given auth events. An auth chain
+// GetAuthChain fetches the auth chain for the given auth events. An auth chain
// is the list of all events that are referenced in the auth_events section, and
// all their auth_events, recursively. The returned set of events contain the
// given events. Will *not* error if we don't have all auth events.
-func getAuthChain(
+func GetAuthChain(
ctx context.Context, fn eventsFromIDs, authEventIDs []string,
) ([]*gomatrixserverlib.Event, error) {
// List of event IDs to fetch. On each pass, these events will be requested
@@ -718,7 +720,7 @@ func (r *Queryer) QueryServerBannedFromRoom(ctx context.Context, req *api.QueryS
}
func (r *Queryer) QueryAuthChain(ctx context.Context, req *api.QueryAuthChainRequest, res *api.QueryAuthChainResponse) error {
- chain, err := getAuthChain(ctx, r.DB.EventsFromIDs, req.EventIDs)
+ chain, err := GetAuthChain(ctx, r.DB.EventsFromIDs, req.EventIDs)
if err != nil {
return err
}
diff --git a/roomserver/internal/query/query_test.go b/roomserver/internal/query/query_test.go
index 4e761d8e..ba5bb9f5 100644
--- a/roomserver/internal/query/query_test.go
+++ b/roomserver/internal/query/query_test.go
@@ -106,7 +106,7 @@ func TestGetAuthChainSingle(t *testing.T) {
t.Fatalf("Failed to add events to db: %v", err)
}
- result, err := getAuthChain(context.TODO(), db.EventsFromIDs, []string{"e"})
+ result, err := GetAuthChain(context.TODO(), db.EventsFromIDs, []string{"e"})
if err != nil {
t.Fatalf("getAuthChain failed: %v", err)
}
@@ -139,7 +139,7 @@ func TestGetAuthChainMultiple(t *testing.T) {
t.Fatalf("Failed to add events to db: %v", err)
}
- result, err := getAuthChain(context.TODO(), db.EventsFromIDs, []string{"e", "f"})
+ result, err := GetAuthChain(context.TODO(), db.EventsFromIDs, []string{"e", "f"})
if err != nil {
t.Fatalf("getAuthChain failed: %v", err)
}
diff --git a/roomserver/inthttp/client.go b/roomserver/inthttp/client.go
index cac813ff..6774d102 100644
--- a/roomserver/inthttp/client.go
+++ b/roomserver/inthttp/client.go
@@ -26,14 +26,15 @@ const (
RoomserverInputRoomEventsPath = "/roomserver/inputRoomEvents"
// Perform operations
- RoomserverPerformInvitePath = "/roomserver/performInvite"
- RoomserverPerformPeekPath = "/roomserver/performPeek"
- RoomserverPerformUnpeekPath = "/roomserver/performUnpeek"
- RoomserverPerformJoinPath = "/roomserver/performJoin"
- RoomserverPerformLeavePath = "/roomserver/performLeave"
- RoomserverPerformBackfillPath = "/roomserver/performBackfill"
- RoomserverPerformPublishPath = "/roomserver/performPublish"
- RoomserverPerformForgetPath = "/roomserver/performForget"
+ RoomserverPerformInvitePath = "/roomserver/performInvite"
+ RoomserverPerformPeekPath = "/roomserver/performPeek"
+ RoomserverPerformUnpeekPath = "/roomserver/performUnpeek"
+ RoomserverPerformJoinPath = "/roomserver/performJoin"
+ RoomserverPerformLeavePath = "/roomserver/performLeave"
+ RoomserverPerformBackfillPath = "/roomserver/performBackfill"
+ RoomserverPerformPublishPath = "/roomserver/performPublish"
+ RoomserverPerformInboundPeekPath = "/roomserver/performInboundPeek"
+ RoomserverPerformForgetPath = "/roomserver/performForget"
// Query operations
RoomserverQueryLatestEventsAndStatePath = "/roomserver/queryLatestEventsAndState"
@@ -216,6 +217,18 @@ func (h *httpRoomserverInternalAPI) PerformPeek(
}
}
+func (h *httpRoomserverInternalAPI) PerformInboundPeek(
+ ctx context.Context,
+ request *api.PerformInboundPeekRequest,
+ response *api.PerformInboundPeekResponse,
+) error {
+ span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInboundPeek")
+ defer span.Finish()
+
+ apiURL := h.roomserverURL + RoomserverPerformInboundPeekPath
+ return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
+}
+
func (h *httpRoomserverInternalAPI) PerformUnpeek(
ctx context.Context,
request *api.PerformUnpeekRequest,
diff --git a/roomserver/inthttp/server.go b/roomserver/inthttp/server.go
index f9c8ef9f..bf319262 100644
--- a/roomserver/inthttp/server.go
+++ b/roomserver/inthttp/server.go
@@ -72,6 +72,19 @@ func AddRoutes(r api.RoomserverInternalAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
+ internalAPIMux.Handle(RoomserverPerformInboundPeekPath,
+ httputil.MakeInternalAPI("performInboundPeek", func(req *http.Request) util.JSONResponse {
+ var request api.PerformInboundPeekRequest
+ var response api.PerformInboundPeekResponse
+ if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
+ return util.MessageResponse(http.StatusBadRequest, err.Error())
+ }
+ if err := r.PerformInboundPeek(req.Context(), &request, &response); err != nil {
+ return util.ErrorResponse(err)
+ }
+ return util.JSONResponse{Code: http.StatusOK, JSON: &response}
+ }),
+ )
internalAPIMux.Handle(RoomserverPerformPeekPath,
httputil.MakeInternalAPI("performUnpeek", func(req *http.Request) util.JSONResponse {
var request api.PerformUnpeekRequest
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 4ccc8a48..49fa1a16 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -372,7 +372,7 @@ type Response struct {
Leave map[string]LeaveResponse `json:"leave"`
} `json:"rooms"`
ToDevice struct {
- Events []gomatrixserverlib.SendToDeviceEvent `json:"events,omitempty"`
+ Events []gomatrixserverlib.SendToDeviceEvent `json:"events"`
} `json:"to_device"`
DeviceLists struct {
Changed []string `json:"changed,omitempty"`