aboutsummaryrefslogtreecommitdiff
path: root/internal/transactionrequest.go
diff options
context:
space:
mode:
authordevonh <devon.dmytro@gmail.com>2023-01-23 17:55:12 +0000
committerGitHub <noreply@github.com>2023-01-23 17:55:12 +0000
commit5b73592f5a4dddf64184fcbe33f4c1835c656480 (patch)
treeb6dac51b6be7a1e591f24881ee1bfae1b92088e9 /internal/transactionrequest.go
parent48fa869fa3578741d1d5775d30f24f6b097ab995 (diff)
Initial Store & Forward Implementation (#2917)
This adds store & forward relays into dendrite for p2p. A few things have changed: - new relay api serves new http endpoints for s&f federation - updated outbound federation queueing which will attempt to forward using s&f if appropriate - database entries to track s&f relays for other nodes
Diffstat (limited to 'internal/transactionrequest.go')
-rw-r--r--internal/transactionrequest.go356
1 files changed, 356 insertions, 0 deletions
diff --git a/internal/transactionrequest.go b/internal/transactionrequest.go
new file mode 100644
index 00000000..95673fc1
--- /dev/null
+++ b/internal/transactionrequest.go
@@ -0,0 +1,356 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ "github.com/getsentry/sentry-go"
+ "github.com/matrix-org/dendrite/clientapi/jsonerror"
+ "github.com/matrix-org/dendrite/federationapi/producers"
+ "github.com/matrix-org/dendrite/federationapi/types"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ syncTypes "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+)
+
+var (
+ PDUCountTotal = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "dendrite",
+ Subsystem: "federationapi",
+ Name: "recv_pdus",
+ Help: "Number of incoming PDUs from remote servers with labels for success",
+ },
+ []string{"status"}, // 'success' or 'total'
+ )
+ EDUCountTotal = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "dendrite",
+ Subsystem: "federationapi",
+ Name: "recv_edus",
+ Help: "Number of incoming EDUs from remote servers",
+ },
+ )
+)
+
+type TxnReq struct {
+ gomatrixserverlib.Transaction
+ rsAPI api.FederationRoomserverAPI
+ keyAPI keyapi.FederationKeyAPI
+ ourServerName gomatrixserverlib.ServerName
+ keys gomatrixserverlib.JSONVerifier
+ roomsMu *MutexByRoom
+ producer *producers.SyncAPIProducer
+ inboundPresenceEnabled bool
+}
+
+func NewTxnReq(
+ rsAPI api.FederationRoomserverAPI,
+ keyAPI keyapi.FederationKeyAPI,
+ ourServerName gomatrixserverlib.ServerName,
+ keys gomatrixserverlib.JSONVerifier,
+ roomsMu *MutexByRoom,
+ producer *producers.SyncAPIProducer,
+ inboundPresenceEnabled bool,
+ pdus []json.RawMessage,
+ edus []gomatrixserverlib.EDU,
+ origin gomatrixserverlib.ServerName,
+ transactionID gomatrixserverlib.TransactionID,
+ destination gomatrixserverlib.ServerName,
+) TxnReq {
+ t := TxnReq{
+ rsAPI: rsAPI,
+ keyAPI: keyAPI,
+ ourServerName: ourServerName,
+ keys: keys,
+ roomsMu: roomsMu,
+ producer: producer,
+ inboundPresenceEnabled: inboundPresenceEnabled,
+ }
+
+ t.PDUs = pdus
+ t.EDUs = edus
+ t.Origin = origin
+ t.TransactionID = transactionID
+ t.Destination = destination
+
+ return t
+}
+
+func (t *TxnReq) ProcessTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if t.producer != nil {
+ t.processEDUs(ctx)
+ }
+ }()
+
+ results := make(map[string]gomatrixserverlib.PDUResult)
+ roomVersions := make(map[string]gomatrixserverlib.RoomVersion)
+ getRoomVersion := func(roomID string) gomatrixserverlib.RoomVersion {
+ if v, ok := roomVersions[roomID]; ok {
+ return v
+ }
+ verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
+ verRes := api.QueryRoomVersionForRoomResponse{}
+ if err := t.rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to query room version for room", verReq.RoomID)
+ return ""
+ }
+ roomVersions[roomID] = verRes.RoomVersion
+ return verRes.RoomVersion
+ }
+
+ for _, pdu := range t.PDUs {
+ PDUCountTotal.WithLabelValues("total").Inc()
+ var header struct {
+ RoomID string `json:"room_id"`
+ }
+ if err := json.Unmarshal(pdu, &header); err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to extract room ID from event")
+ // We don't know the event ID at this point so we can't return the
+ // failure in the PDU results
+ continue
+ }
+ roomVersion := getRoomVersion(header.RoomID)
+ event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
+ if err != nil {
+ if _, ok := err.(gomatrixserverlib.BadJSONError); ok {
+ // Room version 6 states that homeservers should strictly enforce canonical JSON
+ // on PDUs.
+ //
+ // This enforces that the entire transaction is rejected if a single bad PDU is
+ // sent. It is unclear if this is the correct behaviour or not.
+ //
+ // See https://github.com/matrix-org/synapse/issues/7543
+ return nil, &util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.BadJSON("PDU contains bad JSON"),
+ }
+ }
+ util.GetLogger(ctx).WithError(err).Debugf("Transaction: Failed to parse event JSON of event %s", string(pdu))
+ continue
+ }
+ if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") {
+ continue
+ }
+ if api.IsServerBannedFromRoom(ctx, t.rsAPI, event.RoomID(), t.Origin) {
+ results[event.EventID()] = gomatrixserverlib.PDUResult{
+ Error: "Forbidden by server ACLs",
+ }
+ continue
+ }
+ if err = event.VerifyEventSignatures(ctx, t.keys); err != nil {
+ util.GetLogger(ctx).WithError(err).Debugf("Transaction: Couldn't validate signature of event %q", event.EventID())
+ results[event.EventID()] = gomatrixserverlib.PDUResult{
+ Error: err.Error(),
+ }
+ continue
+ }
+
+ // pass the event to the roomserver which will do auth checks
+ // If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
+ // discarded by the caller of this function
+ if err = api.SendEvents(
+ ctx,
+ t.rsAPI,
+ api.KindNew,
+ []*gomatrixserverlib.HeaderedEvent{
+ event.Headered(roomVersion),
+ },
+ t.Destination,
+ t.Origin,
+ api.DoNotSendToOtherServers,
+ nil,
+ true,
+ ); err != nil {
+ util.GetLogger(ctx).WithError(err).Errorf("Transaction: Couldn't submit event %q to input queue: %s", event.EventID(), err)
+ results[event.EventID()] = gomatrixserverlib.PDUResult{
+ Error: err.Error(),
+ }
+ continue
+ }
+
+ results[event.EventID()] = gomatrixserverlib.PDUResult{}
+ PDUCountTotal.WithLabelValues("success").Inc()
+ }
+
+ wg.Wait()
+ return &gomatrixserverlib.RespSend{PDUs: results}, nil
+}
+
+// nolint:gocyclo
+func (t *TxnReq) processEDUs(ctx context.Context) {
+ for _, e := range t.EDUs {
+ EDUCountTotal.Inc()
+ switch e.Type {
+ case gomatrixserverlib.MTyping:
+ // https://matrix.org/docs/spec/server_server/latest#typing-notifications
+ var typingPayload struct {
+ RoomID string `json:"room_id"`
+ UserID string `json:"user_id"`
+ Typing bool `json:"typing"`
+ }
+ if err := json.Unmarshal(e.Content, &typingPayload); err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal typing event")
+ continue
+ }
+ if _, serverName, err := gomatrixserverlib.SplitID('@', typingPayload.UserID); err != nil {
+ continue
+ } else if serverName == t.ourServerName {
+ continue
+ } else if serverName != t.Origin {
+ continue
+ }
+ if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
+ util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to JetStream")
+ }
+ case gomatrixserverlib.MDirectToDevice:
+ // https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
+ var directPayload gomatrixserverlib.ToDeviceMessage
+ if err := json.Unmarshal(e.Content, &directPayload); err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal send-to-device events")
+ continue
+ }
+ if _, serverName, err := gomatrixserverlib.SplitID('@', directPayload.Sender); err != nil {
+ continue
+ } else if serverName == t.ourServerName {
+ continue
+ } else if serverName != t.Origin {
+ continue
+ }
+ for userID, byUser := range directPayload.Messages {
+ for deviceID, message := range byUser {
+ // TODO: check that the user and the device actually exist here
+ if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
+ sentry.CaptureException(err)
+ util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
+ "sender": directPayload.Sender,
+ "user_id": userID,
+ "device_id": deviceID,
+ }).Error("Failed to send send-to-device event to JetStream")
+ }
+ }
+ }
+ case gomatrixserverlib.MDeviceListUpdate:
+ if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil {
+ sentry.CaptureException(err)
+ util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
+ }
+ case gomatrixserverlib.MReceipt:
+ // https://matrix.org/docs/spec/server_server/r0.1.4#receipts
+ payload := map[string]types.FederationReceiptMRead{}
+
+ if err := json.Unmarshal(e.Content, &payload); err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal receipt event")
+ continue
+ }
+
+ for roomID, receipt := range payload {
+ for userID, mread := range receipt.User {
+ _, domain, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Failed to split domain from receipt event sender")
+ continue
+ }
+ if t.Origin != domain {
+ util.GetLogger(ctx).Debugf("Dropping receipt event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
+ continue
+ }
+ if err := t.processReceiptEvent(ctx, userID, roomID, "m.read", mread.Data.TS, mread.EventIDs); err != nil {
+ util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
+ "sender": t.Origin,
+ "user_id": userID,
+ "room_id": roomID,
+ "events": mread.EventIDs,
+ }).Error("Failed to send receipt event to JetStream")
+ continue
+ }
+ }
+ }
+ case types.MSigningKeyUpdate:
+ if err := t.producer.SendSigningKeyUpdate(ctx, e.Content, t.Origin); err != nil {
+ sentry.CaptureException(err)
+ logrus.WithError(err).Errorf("Failed to process signing key update")
+ }
+ case gomatrixserverlib.MPresence:
+ if t.inboundPresenceEnabled {
+ if err := t.processPresence(ctx, e); err != nil {
+ logrus.WithError(err).Errorf("Failed to process presence update")
+ }
+ }
+ default:
+ util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
+ }
+ }
+}
+
+// processPresence handles m.receipt events
+func (t *TxnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error {
+ payload := types.Presence{}
+ if err := json.Unmarshal(e.Content, &payload); err != nil {
+ return err
+ }
+ for _, content := range payload.Push {
+ if _, serverName, err := gomatrixserverlib.SplitID('@', content.UserID); err != nil {
+ continue
+ } else if serverName == t.ourServerName {
+ continue
+ } else if serverName != t.Origin {
+ continue
+ }
+ presence, ok := syncTypes.PresenceFromString(content.Presence)
+ if !ok {
+ continue
+ }
+ if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// processReceiptEvent sends receipt events to JetStream
+func (t *TxnReq) processReceiptEvent(ctx context.Context,
+ userID, roomID, receiptType string,
+ timestamp gomatrixserverlib.Timestamp,
+ eventIDs []string,
+) error {
+ if _, serverName, err := gomatrixserverlib.SplitID('@', userID); err != nil {
+ return nil
+ } else if serverName == t.ourServerName {
+ return nil
+ } else if serverName != t.Origin {
+ return nil
+ }
+ // store every event
+ for _, eventID := range eventIDs {
+ if err := t.producer.SendReceipt(ctx, userID, roomID, eventID, receiptType, timestamp); err != nil {
+ return fmt.Errorf("unable to set receipt event: %w", err)
+ }
+ }
+
+ return nil
+}