aboutsummaryrefslogtreecommitdiff
path: root/federationapi/routing/send.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationapi/routing/send.go')
-rw-r--r--federationapi/routing/send.go339
1 files changed, 14 insertions, 325 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index a146d85b..67b513c9 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -17,26 +17,20 @@ package routing
import (
"context"
"encoding/json"
- "fmt"
"net/http"
"sync"
"time"
- "github.com/getsentry/sentry-go"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/producers"
- "github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
- syncTypes "github.com/matrix-org/dendrite/syncapi/types"
)
const (
@@ -56,26 +50,6 @@ const (
MetricsWorkMissingPrevEvents = "missing_prev_events"
)
-var (
- pduCountTotal = prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Namespace: "dendrite",
- Subsystem: "federationapi",
- Name: "recv_pdus",
- Help: "Number of incoming PDUs from remote servers with labels for success",
- },
- []string{"status"}, // 'success' or 'total'
- )
- eduCountTotal = prometheus.NewCounter(
- prometheus.CounterOpts{
- Namespace: "dendrite",
- Subsystem: "federationapi",
- Name: "recv_edus",
- Help: "Number of incoming EDUs from remote servers",
- },
- )
-)
-
var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse
// Send implements /_matrix/federation/v1/send/{txnID}
@@ -123,18 +97,6 @@ func Send(
defer close(ch)
defer inFlightTxnsPerOrigin.Delete(index)
- t := txnReq{
- rsAPI: rsAPI,
- keys: keys,
- ourServerName: cfg.Matrix.ServerName,
- federation: federation,
- servers: servers,
- keyAPI: keyAPI,
- roomsMu: mu,
- producer: producer,
- inboundPresenceEnabled: cfg.Matrix.Presence.EnableInbound,
- }
-
var txnEvents struct {
PDUs []json.RawMessage `json:"pdus"`
EDUs []gomatrixserverlib.EDU `json:"edus"`
@@ -155,16 +117,23 @@ func Send(
}
}
- // TODO: Really we should have a function to convert FederationRequest to txnReq
- t.PDUs = txnEvents.PDUs
- t.EDUs = txnEvents.EDUs
- t.Origin = request.Origin()
- t.TransactionID = txnID
- t.Destination = cfg.Matrix.ServerName
+ t := internal.NewTxnReq(
+ rsAPI,
+ keyAPI,
+ cfg.Matrix.ServerName,
+ keys,
+ mu,
+ producer,
+ cfg.Matrix.Presence.EnableInbound,
+ txnEvents.PDUs,
+ txnEvents.EDUs,
+ request.Origin(),
+ txnID,
+ cfg.Matrix.ServerName)
util.GetLogger(httpReq.Context()).Debugf("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
- resp, jsonErr := t.processTransaction(httpReq.Context())
+ resp, jsonErr := t.ProcessTransaction(httpReq.Context())
if jsonErr != nil {
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
return *jsonErr
@@ -181,283 +150,3 @@ func Send(
ch <- res
return res
}
-
-type txnReq struct {
- gomatrixserverlib.Transaction
- rsAPI api.FederationRoomserverAPI
- keyAPI keyapi.FederationKeyAPI
- ourServerName gomatrixserverlib.ServerName
- keys gomatrixserverlib.JSONVerifier
- federation txnFederationClient
- roomsMu *internal.MutexByRoom
- servers federationAPI.ServersInRoomProvider
- producer *producers.SyncAPIProducer
- inboundPresenceEnabled bool
-}
-
-// A subset of FederationClient functionality that txn requires. Useful for testing.
-type txnFederationClient interface {
- LookupState(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
- res gomatrixserverlib.RespState, err error,
- )
- LookupStateIDs(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error)
- GetEvent(ctx context.Context, origin, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error)
- LookupMissingEvents(ctx context.Context, origin, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents,
- roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error)
-}
-
-func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- t.processEDUs(ctx)
- }()
-
- results := make(map[string]gomatrixserverlib.PDUResult)
- roomVersions := make(map[string]gomatrixserverlib.RoomVersion)
- getRoomVersion := func(roomID string) gomatrixserverlib.RoomVersion {
- if v, ok := roomVersions[roomID]; ok {
- return v
- }
- verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
- verRes := api.QueryRoomVersionForRoomResponse{}
- if err := t.rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
- util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to query room version for room", verReq.RoomID)
- return ""
- }
- roomVersions[roomID] = verRes.RoomVersion
- return verRes.RoomVersion
- }
-
- for _, pdu := range t.PDUs {
- pduCountTotal.WithLabelValues("total").Inc()
- var header struct {
- RoomID string `json:"room_id"`
- }
- if err := json.Unmarshal(pdu, &header); err != nil {
- util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to extract room ID from event")
- // We don't know the event ID at this point so we can't return the
- // failure in the PDU results
- continue
- }
- roomVersion := getRoomVersion(header.RoomID)
- event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
- if err != nil {
- if _, ok := err.(gomatrixserverlib.BadJSONError); ok {
- // Room version 6 states that homeservers should strictly enforce canonical JSON
- // on PDUs.
- //
- // This enforces that the entire transaction is rejected if a single bad PDU is
- // sent. It is unclear if this is the correct behaviour or not.
- //
- // See https://github.com/matrix-org/synapse/issues/7543
- return nil, &util.JSONResponse{
- Code: 400,
- JSON: jsonerror.BadJSON("PDU contains bad JSON"),
- }
- }
- util.GetLogger(ctx).WithError(err).Debugf("Transaction: Failed to parse event JSON of event %s", string(pdu))
- continue
- }
- if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") {
- continue
- }
- if api.IsServerBannedFromRoom(ctx, t.rsAPI, event.RoomID(), t.Origin) {
- results[event.EventID()] = gomatrixserverlib.PDUResult{
- Error: "Forbidden by server ACLs",
- }
- continue
- }
- if err = event.VerifyEventSignatures(ctx, t.keys); err != nil {
- util.GetLogger(ctx).WithError(err).Debugf("Transaction: Couldn't validate signature of event %q", event.EventID())
- results[event.EventID()] = gomatrixserverlib.PDUResult{
- Error: err.Error(),
- }
- continue
- }
-
- // pass the event to the roomserver which will do auth checks
- // If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
- // discarded by the caller of this function
- if err = api.SendEvents(
- ctx,
- t.rsAPI,
- api.KindNew,
- []*gomatrixserverlib.HeaderedEvent{
- event.Headered(roomVersion),
- },
- t.Destination,
- t.Origin,
- api.DoNotSendToOtherServers,
- nil,
- true,
- ); err != nil {
- util.GetLogger(ctx).WithError(err).Errorf("Transaction: Couldn't submit event %q to input queue: %s", event.EventID(), err)
- results[event.EventID()] = gomatrixserverlib.PDUResult{
- Error: err.Error(),
- }
- continue
- }
-
- results[event.EventID()] = gomatrixserverlib.PDUResult{}
- pduCountTotal.WithLabelValues("success").Inc()
- }
-
- wg.Wait()
- return &gomatrixserverlib.RespSend{PDUs: results}, nil
-}
-
-// nolint:gocyclo
-func (t *txnReq) processEDUs(ctx context.Context) {
- for _, e := range t.EDUs {
- eduCountTotal.Inc()
- switch e.Type {
- case gomatrixserverlib.MTyping:
- // https://matrix.org/docs/spec/server_server/latest#typing-notifications
- var typingPayload struct {
- RoomID string `json:"room_id"`
- UserID string `json:"user_id"`
- Typing bool `json:"typing"`
- }
- if err := json.Unmarshal(e.Content, &typingPayload); err != nil {
- util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal typing event")
- continue
- }
- if _, serverName, err := gomatrixserverlib.SplitID('@', typingPayload.UserID); err != nil {
- continue
- } else if serverName == t.ourServerName {
- continue
- } else if serverName != t.Origin {
- continue
- }
- if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
- util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to JetStream")
- }
- case gomatrixserverlib.MDirectToDevice:
- // https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
- var directPayload gomatrixserverlib.ToDeviceMessage
- if err := json.Unmarshal(e.Content, &directPayload); err != nil {
- util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal send-to-device events")
- continue
- }
- if _, serverName, err := gomatrixserverlib.SplitID('@', directPayload.Sender); err != nil {
- continue
- } else if serverName == t.ourServerName {
- continue
- } else if serverName != t.Origin {
- continue
- }
- for userID, byUser := range directPayload.Messages {
- for deviceID, message := range byUser {
- // TODO: check that the user and the device actually exist here
- if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
- sentry.CaptureException(err)
- util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
- "sender": directPayload.Sender,
- "user_id": userID,
- "device_id": deviceID,
- }).Error("Failed to send send-to-device event to JetStream")
- }
- }
- }
- case gomatrixserverlib.MDeviceListUpdate:
- if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil {
- sentry.CaptureException(err)
- util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
- }
- case gomatrixserverlib.MReceipt:
- // https://matrix.org/docs/spec/server_server/r0.1.4#receipts
- payload := map[string]types.FederationReceiptMRead{}
-
- if err := json.Unmarshal(e.Content, &payload); err != nil {
- util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal receipt event")
- continue
- }
-
- for roomID, receipt := range payload {
- for userID, mread := range receipt.User {
- _, domain, err := gomatrixserverlib.SplitID('@', userID)
- if err != nil {
- util.GetLogger(ctx).WithError(err).Debug("Failed to split domain from receipt event sender")
- continue
- }
- if t.Origin != domain {
- util.GetLogger(ctx).Debugf("Dropping receipt event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
- continue
- }
- if err := t.processReceiptEvent(ctx, userID, roomID, "m.read", mread.Data.TS, mread.EventIDs); err != nil {
- util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
- "sender": t.Origin,
- "user_id": userID,
- "room_id": roomID,
- "events": mread.EventIDs,
- }).Error("Failed to send receipt event to JetStream")
- continue
- }
- }
- }
- case types.MSigningKeyUpdate:
- if err := t.producer.SendSigningKeyUpdate(ctx, e.Content, t.Origin); err != nil {
- sentry.CaptureException(err)
- logrus.WithError(err).Errorf("Failed to process signing key update")
- }
- case gomatrixserverlib.MPresence:
- if t.inboundPresenceEnabled {
- if err := t.processPresence(ctx, e); err != nil {
- logrus.WithError(err).Errorf("Failed to process presence update")
- }
- }
- default:
- util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
- }
- }
-}
-
-// processPresence handles m.receipt events
-func (t *txnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error {
- payload := types.Presence{}
- if err := json.Unmarshal(e.Content, &payload); err != nil {
- return err
- }
- for _, content := range payload.Push {
- if _, serverName, err := gomatrixserverlib.SplitID('@', content.UserID); err != nil {
- continue
- } else if serverName == t.ourServerName {
- continue
- } else if serverName != t.Origin {
- continue
- }
- presence, ok := syncTypes.PresenceFromString(content.Presence)
- if !ok {
- continue
- }
- if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil {
- return err
- }
- }
- return nil
-}
-
-// processReceiptEvent sends receipt events to JetStream
-func (t *txnReq) processReceiptEvent(ctx context.Context,
- userID, roomID, receiptType string,
- timestamp gomatrixserverlib.Timestamp,
- eventIDs []string,
-) error {
- if _, serverName, err := gomatrixserverlib.SplitID('@', userID); err != nil {
- return nil
- } else if serverName == t.ourServerName {
- return nil
- } else if serverName != t.Origin {
- return nil
- }
- // store every event
- for _, eventID := range eventIDs {
- if err := t.producer.SendReceipt(ctx, userID, roomID, eventID, receiptType, timestamp); err != nil {
- return fmt.Errorf("unable to set receipt event: %w", err)
- }
- }
-
- return nil
-}