aboutsummaryrefslogtreecommitdiff
path: root/federationsender/internal/perform.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationsender/internal/perform.go')
-rw-r--r--federationsender/internal/perform.go168
1 files changed, 167 insertions, 1 deletions
diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go
index 45f33ff7..3adf8fc9 100644
--- a/federationsender/internal/perform.go
+++ b/federationsender/internal/perform.go
@@ -234,7 +234,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
// Check that the send_join response was valid.
joinCtx := perform.JoinContext(r.federation, r.keyRing)
respState, err := joinCtx.CheckSendJoinResponse(
- ctx, event, serverName, respMakeJoin, respSendJoin,
+ ctx, event, serverName, respSendJoin,
)
if err != nil {
logrus.WithFields(logrus.Fields{
@@ -266,6 +266,172 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
return nil
}
+// PerformOutboundPeekRequest implements api.FederationSenderInternalAPI
+func (r *FederationSenderInternalAPI) PerformOutboundPeek(
+ ctx context.Context,
+ request *api.PerformOutboundPeekRequest,
+ response *api.PerformOutboundPeekResponse,
+) error {
+ // Look up the supported room versions.
+ var supportedVersions []gomatrixserverlib.RoomVersion
+ for version := range version.SupportedRoomVersions() {
+ supportedVersions = append(supportedVersions, version)
+ }
+
+ // Deduplicate the server names we were provided but keep the ordering
+ // as this encodes useful information about which servers are most likely
+ // to respond.
+ seenSet := make(map[gomatrixserverlib.ServerName]bool)
+ var uniqueList []gomatrixserverlib.ServerName
+ for _, srv := range request.ServerNames {
+ if seenSet[srv] {
+ continue
+ }
+ seenSet[srv] = true
+ uniqueList = append(uniqueList, srv)
+ }
+ request.ServerNames = uniqueList
+
+ // See if there's an existing outbound peek for this room ID with
+ // one of the specified servers.
+ if peeks, err := r.db.GetOutboundPeeks(ctx, request.RoomID); err == nil {
+ for _, peek := range peeks {
+ if _, ok := seenSet[peek.ServerName]; ok {
+ return nil
+ }
+ }
+ }
+
+ // Try each server that we were provided until we land on one that
+ // successfully completes the peek
+ var lastErr error
+ for _, serverName := range request.ServerNames {
+ if err := r.performOutboundPeekUsingServer(
+ ctx,
+ request.RoomID,
+ serverName,
+ supportedVersions,
+ ); err != nil {
+ logrus.WithError(err).WithFields(logrus.Fields{
+ "server_name": serverName,
+ "room_id": request.RoomID,
+ }).Warnf("Failed to peek room through server")
+ lastErr = err
+ continue
+ }
+
+ // We're all good.
+ return nil
+ }
+
+ // If we reach here then we didn't complete a peek for some reason.
+ var httpErr gomatrix.HTTPError
+ if ok := errors.As(lastErr, &httpErr); ok {
+ httpErr.Message = string(httpErr.Contents)
+ // Clear the wrapped error, else serialising to JSON (in polylith mode) will fail
+ httpErr.WrappedError = nil
+ response.LastError = &httpErr
+ } else {
+ response.LastError = &gomatrix.HTTPError{
+ Code: 0,
+ WrappedError: nil,
+ Message: lastErr.Error(),
+ }
+ }
+
+ logrus.Errorf(
+ "failed to peek room %q through %d server(s): last error %s",
+ request.RoomID, len(request.ServerNames), lastErr,
+ )
+
+ return lastErr
+}
+
+func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer(
+ ctx context.Context,
+ roomID string,
+ serverName gomatrixserverlib.ServerName,
+ supportedVersions []gomatrixserverlib.RoomVersion,
+) error {
+ // create a unique ID for this peek.
+ // for now we just use the room ID again. In future, if we ever
+ // support concurrent peeks to the same room with different filters
+ // then we would need to disambiguate further.
+ peekID := roomID
+
+ // check whether we're peeking already to try to avoid needlessly
+ // re-peeking on the server. we don't need a transaction for this,
+ // given this is a nice-to-have.
+ outboundPeek, err := r.db.GetOutboundPeek(ctx, serverName, roomID, peekID)
+ if err != nil {
+ return err
+ }
+ renewing := false
+ if outboundPeek != nil {
+ nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
+ if nowMilli > outboundPeek.RenewedTimestamp+outboundPeek.RenewalInterval {
+ logrus.Infof("stale outbound peek to %s for %s already exists; renewing", serverName, roomID)
+ renewing = true
+ } else {
+ logrus.Infof("live outbound peek to %s for %s already exists", serverName, roomID)
+ return nil
+ }
+ }
+
+ // Try to perform an outbound /peek using the information supplied in the
+ // request.
+ respPeek, err := r.federation.Peek(
+ ctx,
+ serverName,
+ roomID,
+ peekID,
+ supportedVersions,
+ )
+ if err != nil {
+ r.statistics.ForServer(serverName).Failure()
+ return fmt.Errorf("r.federation.Peek: %w", err)
+ }
+ r.statistics.ForServer(serverName).Success()
+
+ // Work out if we support the room version that has been supplied in
+ // the peek response.
+ if respPeek.RoomVersion == "" {
+ respPeek.RoomVersion = gomatrixserverlib.RoomVersionV1
+ }
+ if _, err = respPeek.RoomVersion.EventFormat(); err != nil {
+ return fmt.Errorf("respPeek.RoomVersion.EventFormat: %w", err)
+ }
+
+ // TODO: authenticate the state returned (check its auth events etc)
+ // the equivalent of CheckSendJoinResponse()
+
+ // If we've got this far, the remote server is peeking.
+ if renewing {
+ if err = r.db.RenewOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
+ return err
+ }
+ } else {
+ if err = r.db.AddOutboundPeek(ctx, serverName, roomID, peekID, respPeek.RenewalInterval); err != nil {
+ return err
+ }
+ }
+
+ respState := respPeek.ToRespState()
+ // logrus.Warnf("got respPeek %#v", respPeek)
+ // Send the newly returned state to the roomserver to update our local view.
+ if err = roomserverAPI.SendEventWithState(
+ ctx, r.rsAPI,
+ roomserverAPI.KindNew,
+ &respState,
+ respPeek.LatestEvent.Headered(respPeek.RoomVersion),
+ nil,
+ ); err != nil {
+ return fmt.Errorf("r.producer.SendEventWithState: %w", err)
+ }
+
+ return nil
+}
+
// PerformLeaveRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformLeave(
ctx context.Context,