aboutsummaryrefslogtreecommitdiff
path: root/internal
diff options
context:
space:
mode:
authordevonh <devon.dmytro@gmail.com>2023-01-23 17:55:12 +0000
committerGitHub <noreply@github.com>2023-01-23 17:55:12 +0000
commit5b73592f5a4dddf64184fcbe33f4c1835c656480 (patch)
treeb6dac51b6be7a1e591f24881ee1bfae1b92088e9 /internal
parent48fa869fa3578741d1d5775d30f24f6b097ab995 (diff)
Initial Store & Forward Implementation (#2917)
This adds store & forward relays into dendrite for p2p. A few things have changed: - new relay api serves new http endpoints for s&f federation - updated outbound federation queueing which will attempt to forward using s&f if appropriate - database entries to track s&f relays for other nodes
Diffstat (limited to 'internal')
-rw-r--r--internal/log.go2
-rw-r--r--internal/log_unix.go4
-rw-r--r--internal/transactionrequest.go356
-rw-r--r--internal/transactionrequest_test.go820
4 files changed, 1180 insertions, 2 deletions
diff --git a/internal/log.go b/internal/log.go
index da6e2041..9e8656c5 100644
--- a/internal/log.go
+++ b/internal/log.go
@@ -101,6 +101,8 @@ func SetupPprof() {
// SetupStdLogging configures the logging format to standard output. Typically, it is called when the config is not yet loaded.
func SetupStdLogging() {
+ levelLogAddedMu.Lock()
+ defer levelLogAddedMu.Unlock()
logrus.SetReportCaller(true)
logrus.SetFormatter(&utcFormatter{
&logrus.TextFormatter{
diff --git a/internal/log_unix.go b/internal/log_unix.go
index 8f34c320..85942704 100644
--- a/internal/log_unix.go
+++ b/internal/log_unix.go
@@ -32,6 +32,8 @@ import (
// If something fails here it means that the logging was improperly configured,
// so we just exit with the error
func SetupHookLogging(hooks []config.LogrusHook, componentName string) {
+ levelLogAddedMu.Lock()
+ defer levelLogAddedMu.Unlock()
for _, hook := range hooks {
// Check we received a proper logging level
level, err := logrus.ParseLevel(hook.Level)
@@ -85,8 +87,6 @@ func checkSyslogHookParams(params map[string]interface{}) {
}
func setupStdLogHook(level logrus.Level) {
- levelLogAddedMu.Lock()
- defer levelLogAddedMu.Unlock()
if stdLevelLogAdded[level] {
return
}
diff --git a/internal/transactionrequest.go b/internal/transactionrequest.go
new file mode 100644
index 00000000..95673fc1
--- /dev/null
+++ b/internal/transactionrequest.go
@@ -0,0 +1,356 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ "github.com/getsentry/sentry-go"
+ "github.com/matrix-org/dendrite/clientapi/jsonerror"
+ "github.com/matrix-org/dendrite/federationapi/producers"
+ "github.com/matrix-org/dendrite/federationapi/types"
+ keyapi "github.com/matrix-org/dendrite/keyserver/api"
+ "github.com/matrix-org/dendrite/roomserver/api"
+ syncTypes "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
+)
+
+var (
+ PDUCountTotal = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "dendrite",
+ Subsystem: "federationapi",
+ Name: "recv_pdus",
+ Help: "Number of incoming PDUs from remote servers with labels for success",
+ },
+ []string{"status"}, // 'success' or 'total'
+ )
+ EDUCountTotal = prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: "dendrite",
+ Subsystem: "federationapi",
+ Name: "recv_edus",
+ Help: "Number of incoming EDUs from remote servers",
+ },
+ )
+)
+
+type TxnReq struct {
+ gomatrixserverlib.Transaction
+ rsAPI api.FederationRoomserverAPI
+ keyAPI keyapi.FederationKeyAPI
+ ourServerName gomatrixserverlib.ServerName
+ keys gomatrixserverlib.JSONVerifier
+ roomsMu *MutexByRoom
+ producer *producers.SyncAPIProducer
+ inboundPresenceEnabled bool
+}
+
+func NewTxnReq(
+ rsAPI api.FederationRoomserverAPI,
+ keyAPI keyapi.FederationKeyAPI,
+ ourServerName gomatrixserverlib.ServerName,
+ keys gomatrixserverlib.JSONVerifier,
+ roomsMu *MutexByRoom,
+ producer *producers.SyncAPIProducer,
+ inboundPresenceEnabled bool,
+ pdus []json.RawMessage,
+ edus []gomatrixserverlib.EDU,
+ origin gomatrixserverlib.ServerName,
+ transactionID gomatrixserverlib.TransactionID,
+ destination gomatrixserverlib.ServerName,
+) TxnReq {
+ t := TxnReq{
+ rsAPI: rsAPI,
+ keyAPI: keyAPI,
+ ourServerName: ourServerName,
+ keys: keys,
+ roomsMu: roomsMu,
+ producer: producer,
+ inboundPresenceEnabled: inboundPresenceEnabled,
+ }
+
+ t.PDUs = pdus
+ t.EDUs = edus
+ t.Origin = origin
+ t.TransactionID = transactionID
+ t.Destination = destination
+
+ return t
+}
+
+func (t *TxnReq) ProcessTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ if t.producer != nil {
+ t.processEDUs(ctx)
+ }
+ }()
+
+ results := make(map[string]gomatrixserverlib.PDUResult)
+ roomVersions := make(map[string]gomatrixserverlib.RoomVersion)
+ getRoomVersion := func(roomID string) gomatrixserverlib.RoomVersion {
+ if v, ok := roomVersions[roomID]; ok {
+ return v
+ }
+ verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
+ verRes := api.QueryRoomVersionForRoomResponse{}
+ if err := t.rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to query room version for room", verReq.RoomID)
+ return ""
+ }
+ roomVersions[roomID] = verRes.RoomVersion
+ return verRes.RoomVersion
+ }
+
+ for _, pdu := range t.PDUs {
+ PDUCountTotal.WithLabelValues("total").Inc()
+ var header struct {
+ RoomID string `json:"room_id"`
+ }
+ if err := json.Unmarshal(pdu, &header); err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Transaction: Failed to extract room ID from event")
+ // We don't know the event ID at this point so we can't return the
+ // failure in the PDU results
+ continue
+ }
+ roomVersion := getRoomVersion(header.RoomID)
+ event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
+ if err != nil {
+ if _, ok := err.(gomatrixserverlib.BadJSONError); ok {
+ // Room version 6 states that homeservers should strictly enforce canonical JSON
+ // on PDUs.
+ //
+ // This enforces that the entire transaction is rejected if a single bad PDU is
+ // sent. It is unclear if this is the correct behaviour or not.
+ //
+ // See https://github.com/matrix-org/synapse/issues/7543
+ return nil, &util.JSONResponse{
+ Code: 400,
+ JSON: jsonerror.BadJSON("PDU contains bad JSON"),
+ }
+ }
+ util.GetLogger(ctx).WithError(err).Debugf("Transaction: Failed to parse event JSON of event %s", string(pdu))
+ continue
+ }
+ if event.Type() == gomatrixserverlib.MRoomCreate && event.StateKeyEquals("") {
+ continue
+ }
+ if api.IsServerBannedFromRoom(ctx, t.rsAPI, event.RoomID(), t.Origin) {
+ results[event.EventID()] = gomatrixserverlib.PDUResult{
+ Error: "Forbidden by server ACLs",
+ }
+ continue
+ }
+ if err = event.VerifyEventSignatures(ctx, t.keys); err != nil {
+ util.GetLogger(ctx).WithError(err).Debugf("Transaction: Couldn't validate signature of event %q", event.EventID())
+ results[event.EventID()] = gomatrixserverlib.PDUResult{
+ Error: err.Error(),
+ }
+ continue
+ }
+
+ // pass the event to the roomserver which will do auth checks
+ // If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
+ // discarded by the caller of this function
+ if err = api.SendEvents(
+ ctx,
+ t.rsAPI,
+ api.KindNew,
+ []*gomatrixserverlib.HeaderedEvent{
+ event.Headered(roomVersion),
+ },
+ t.Destination,
+ t.Origin,
+ api.DoNotSendToOtherServers,
+ nil,
+ true,
+ ); err != nil {
+ util.GetLogger(ctx).WithError(err).Errorf("Transaction: Couldn't submit event %q to input queue: %s", event.EventID(), err)
+ results[event.EventID()] = gomatrixserverlib.PDUResult{
+ Error: err.Error(),
+ }
+ continue
+ }
+
+ results[event.EventID()] = gomatrixserverlib.PDUResult{}
+ PDUCountTotal.WithLabelValues("success").Inc()
+ }
+
+ wg.Wait()
+ return &gomatrixserverlib.RespSend{PDUs: results}, nil
+}
+
+// nolint:gocyclo
+func (t *TxnReq) processEDUs(ctx context.Context) {
+ for _, e := range t.EDUs {
+ EDUCountTotal.Inc()
+ switch e.Type {
+ case gomatrixserverlib.MTyping:
+ // https://matrix.org/docs/spec/server_server/latest#typing-notifications
+ var typingPayload struct {
+ RoomID string `json:"room_id"`
+ UserID string `json:"user_id"`
+ Typing bool `json:"typing"`
+ }
+ if err := json.Unmarshal(e.Content, &typingPayload); err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal typing event")
+ continue
+ }
+ if _, serverName, err := gomatrixserverlib.SplitID('@', typingPayload.UserID); err != nil {
+ continue
+ } else if serverName == t.ourServerName {
+ continue
+ } else if serverName != t.Origin {
+ continue
+ }
+ if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
+ util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to JetStream")
+ }
+ case gomatrixserverlib.MDirectToDevice:
+ // https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
+ var directPayload gomatrixserverlib.ToDeviceMessage
+ if err := json.Unmarshal(e.Content, &directPayload); err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal send-to-device events")
+ continue
+ }
+ if _, serverName, err := gomatrixserverlib.SplitID('@', directPayload.Sender); err != nil {
+ continue
+ } else if serverName == t.ourServerName {
+ continue
+ } else if serverName != t.Origin {
+ continue
+ }
+ for userID, byUser := range directPayload.Messages {
+ for deviceID, message := range byUser {
+ // TODO: check that the user and the device actually exist here
+ if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
+ sentry.CaptureException(err)
+ util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
+ "sender": directPayload.Sender,
+ "user_id": userID,
+ "device_id": deviceID,
+ }).Error("Failed to send send-to-device event to JetStream")
+ }
+ }
+ }
+ case gomatrixserverlib.MDeviceListUpdate:
+ if err := t.producer.SendDeviceListUpdate(ctx, e.Content, t.Origin); err != nil {
+ sentry.CaptureException(err)
+ util.GetLogger(ctx).WithError(err).Error("failed to InputDeviceListUpdate")
+ }
+ case gomatrixserverlib.MReceipt:
+ // https://matrix.org/docs/spec/server_server/r0.1.4#receipts
+ payload := map[string]types.FederationReceiptMRead{}
+
+ if err := json.Unmarshal(e.Content, &payload); err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal receipt event")
+ continue
+ }
+
+ for roomID, receipt := range payload {
+ for userID, mread := range receipt.User {
+ _, domain, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ util.GetLogger(ctx).WithError(err).Debug("Failed to split domain from receipt event sender")
+ continue
+ }
+ if t.Origin != domain {
+ util.GetLogger(ctx).Debugf("Dropping receipt event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
+ continue
+ }
+ if err := t.processReceiptEvent(ctx, userID, roomID, "m.read", mread.Data.TS, mread.EventIDs); err != nil {
+ util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
+ "sender": t.Origin,
+ "user_id": userID,
+ "room_id": roomID,
+ "events": mread.EventIDs,
+ }).Error("Failed to send receipt event to JetStream")
+ continue
+ }
+ }
+ }
+ case types.MSigningKeyUpdate:
+ if err := t.producer.SendSigningKeyUpdate(ctx, e.Content, t.Origin); err != nil {
+ sentry.CaptureException(err)
+ logrus.WithError(err).Errorf("Failed to process signing key update")
+ }
+ case gomatrixserverlib.MPresence:
+ if t.inboundPresenceEnabled {
+ if err := t.processPresence(ctx, e); err != nil {
+ logrus.WithError(err).Errorf("Failed to process presence update")
+ }
+ }
+ default:
+ util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
+ }
+ }
+}
+
+// processPresence handles m.receipt events
+func (t *TxnReq) processPresence(ctx context.Context, e gomatrixserverlib.EDU) error {
+ payload := types.Presence{}
+ if err := json.Unmarshal(e.Content, &payload); err != nil {
+ return err
+ }
+ for _, content := range payload.Push {
+ if _, serverName, err := gomatrixserverlib.SplitID('@', content.UserID); err != nil {
+ continue
+ } else if serverName == t.ourServerName {
+ continue
+ } else if serverName != t.Origin {
+ continue
+ }
+ presence, ok := syncTypes.PresenceFromString(content.Presence)
+ if !ok {
+ continue
+ }
+ if err := t.producer.SendPresence(ctx, content.UserID, presence, content.StatusMsg, content.LastActiveAgo); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// processReceiptEvent sends receipt events to JetStream
+func (t *TxnReq) processReceiptEvent(ctx context.Context,
+ userID, roomID, receiptType string,
+ timestamp gomatrixserverlib.Timestamp,
+ eventIDs []string,
+) error {
+ if _, serverName, err := gomatrixserverlib.SplitID('@', userID); err != nil {
+ return nil
+ } else if serverName == t.ourServerName {
+ return nil
+ } else if serverName != t.Origin {
+ return nil
+ }
+ // store every event
+ for _, eventID := range eventIDs {
+ if err := t.producer.SendReceipt(ctx, userID, roomID, eventID, receiptType, timestamp); err != nil {
+ return fmt.Errorf("unable to set receipt event: %w", err)
+ }
+ }
+
+ return nil
+}
diff --git a/internal/transactionrequest_test.go b/internal/transactionrequest_test.go
new file mode 100644
index 00000000..dd1bd350
--- /dev/null
+++ b/internal/transactionrequest_test.go
@@ -0,0 +1,820 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/matrix-org/dendrite/federationapi/producers"
+ keyAPI "github.com/matrix-org/dendrite/keyserver/api"
+ rsAPI "github.com/matrix-org/dendrite/roomserver/api"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/dendrite/test"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ "github.com/stretchr/testify/assert"
+ "go.uber.org/atomic"
+ "gotest.tools/v3/poll"
+)
+
+const (
+ testOrigin = gomatrixserverlib.ServerName("kaer.morhen")
+ testDestination = gomatrixserverlib.ServerName("white.orchard")
+)
+
+var (
+ invalidSignatures = json.RawMessage(`{"auth_events":["$x4MKEPRSF6OGlo0qpnsP3BfSmYX5HhVlykOsQH3ECyg","$BcEcbZnlFLB5rxSNSZNBn6fO3jU/TKAJ79wfKyCQLiU"],"content":{"body":"Test Message"},"depth":3917,"hashes":{"sha256":"cNAWtlHIegrji0mMA6x1rhpYCccY8W1NsWZqSpJFhjs"},"origin":"localhost","origin_server_ts":0,"prev_events":["$4GDB0bVjkWwS3G4noUZCq5oLWzpBYpwzdMcf7gj24CI"],"room_id":"!roomid:localishhost","sender":"@userid:localhost","signatures":{"localhost":{"ed2559:auto":"NKym6Kcy3u9mGUr21Hjfe3h7DfDilDhN5PqztT0QZ4NTZ+8Y7owseLolQVXp+TvNjecvzdDywsXXVvGiaQiWAQ"}},"type":"m.room.member"}`)
+ testData = []json.RawMessage{
+ []byte(`{"auth_events":[],"content":{"creator":"@userid:kaer.morhen"},"depth":0,"event_id":"$0ok8ynDp7kjc95e3:kaer.morhen","hashes":{"sha256":"17kPoH+h0Dk4Omn7Sus0qMb6+oGcf+CZFEgDhv7UKWs"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"jP4a04f5/F10Pw95FPpdCyKAO44JOwUQ/MZOOeA/RTU1Dn+AHPMzGSaZnuGjRr/xQuADt+I3ctb5ZQfLKNzHDw"}},"state_key":"","type":"m.room.create"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"content":{"membership":"join"},"depth":1,"event_id":"$LEwEu0kxrtu5fOiS:kaer.morhen","hashes":{"sha256":"B7M88PhXf3vd1LaFtjQutFu4x/w7fHD28XKZ4sAsJTo"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"p2vqmuJn7ZBRImctSaKbXCAxCcBlIjPH9JHte1ouIUGy84gpu4eLipOvSBCLL26hXfC0Zrm4WUto6Hr+ohdrCg"}},"state_key":"@userid:kaer.morhen","type":"m.room.member"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"join_rule":"public"},"depth":2,"event_id":"$SMHlqUrNhhBBRLeN:kaer.morhen","hashes":{"sha256":"vIuJQvmMjrGxshAkj1SXe0C4RqvMbv4ZADDw9pFCWqQ"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"hBMsb3Qppo3RaqqAl4JyTgaiWEbW5hlckATky6PrHun+F3YM203TzG7w9clwuQU5F5pZoB1a6nw+to0hN90FAw"}},"state_key":"","type":"m.room.join_rules"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"history_visibility":"shared"},"depth":3,"event_id":"$6F1yGIbO0J7TM93h:kaer.morhen","hashes":{"sha256":"Mr23GKSlZW7UCCYLgOWawI2Sg6KIoMjUWO2TDenuOgw"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$SMHlqUrNhhBBRLeN:kaer.morhen",{"sha256":"SylzE8U02I+6eyEHgL+FlU0L5YdqrVp8OOlxKS9VQW0"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sHLKrFI3hKGrEJfpMVZSDS3LvLasQsy50CTsOwru9XTVxgRsPo6wozNtRVjxo1J3Rk18RC9JppovmQ5VR5EcDw"}},"state_key":"","type":"m.room.history_visibility"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"ban":50,"events":null,"events_default":0,"invite":0,"kick":50,"redact":50,"state_default":50,"users":null,"users_default":0},"depth":4,"event_id":"$UKNe10XzYzG0TeA9:kaer.morhen","hashes":{"sha256":"ngbP3yja9U5dlckKerUs/fSOhtKxZMCVvsfhPURSS28"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$6F1yGIbO0J7TM93h:kaer.morhen",{"sha256":"A4CucrKSoWX4IaJXhq02mBg1sxIyZEftbC+5p3fZAvk"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"zOmwlP01QL3yFchzuR9WHvogOoBZA3oVtNIF3lM0ZfDnqlSYZB9sns27G/4HVq0k7alaK7ZE3oGoCrVnMkPNCw"}},"state_key":"","type":"m.room.power_levels"}`),
+ // messages
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":5,"event_id":"$gl2T9l3qm0kUbiIJ:kaer.morhen","hashes":{"sha256":"Qx3nRMHLDPSL5hBAzuX84FiSSP0K0Kju2iFoBWH4Za8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$UKNe10XzYzG0TeA9:kaer.morhen",{"sha256":"KtSRyMjt0ZSjsv2koixTRCxIRCGoOp6QrKscsW97XRo"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sqDgv3EG7ml5VREzmT9aZeBpS4gAPNIaIeJOwqjDhY0GPU/BcpX5wY4R7hYLrNe5cChgV+eFy/GWm1Zfg5FfDg"}},"type":"m.room.message"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":6,"event_id":"$MYSbs8m4rEbsCWXD:kaer.morhen","hashes":{"sha256":"kgbYM7v4Ud2YaBsjBTolM4ySg6rHcJNYI6nWhMSdFUA"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$gl2T9l3qm0kUbiIJ:kaer.morhen",{"sha256":"C/rD04h9wGxRdN2G/IBfrgoE1UovzLZ+uskwaKZ37/Q"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"x0UoKh968jj/F5l1/R7Ew0T6CTKuew3PLNHASNxqck/bkNe8yYQiDHXRr+kZxObeqPZZTpaF1+EI+bLU9W8GDQ"}},"type":"m.room.message"}`),
+ []byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":7,"event_id":"$N5x9WJkl9ClPrAEg:kaer.morhen","hashes":{"sha256":"FWM8oz4yquTunRZ67qlW2gzPDzdWfBP6RPHXhK1I/x8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$MYSbs8m4rEbsCWXD:kaer.morhen",{"sha256":"fatqgW+SE8mb2wFn3UN+drmluoD4UJ/EcSrL6Ur9q1M"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"Y+LX/xcyufoXMOIoqQBNOzy6lZfUGB1ffgXIrSugk6obMiyAsiRejHQN/pciZXsHKxMJLYRFAz4zSJoS/LGPAA"}},"type":"m.room.message"}`),
+ }
+ testEvent = []byte(`{"auth_events":["$x4MKEPRSF6OGlo0qpnsP3BfSmYX5HhVlykOsQH3ECyg","$BcEcbZnlFLB5rxSNSZNBn6fO3jU/TKAJ79wfKyCQLiU"],"content":{"body":"Test Message"},"depth":3917,"hashes":{"sha256":"cNAWtlHIegrji0mMA6x1rhpYCccY8W1NsWZqSpJFhjs"},"origin":"localhost","origin_server_ts":0,"prev_events":["$4GDB0bVjkWwS3G4noUZCq5oLWzpBYpwzdMcf7gj24CI"],"room_id":"!roomid:localhost","sender":"@userid:localhost","signatures":{"localhost":{"ed25519:auto":"NKym6Kcy3u9mGUr21Hjfe3h7DfDilDhN5PqztT0QZ4NTZ+8Y7owseLolQVXp+TvNjecvzdDywsXXVvGiuQiWAQ"}},"type":"m.room.message"}`)
+ testRoomVersion = gomatrixserverlib.RoomVersionV1
+ testEvents = []*gomatrixserverlib.HeaderedEvent{}
+ testStateEvents = make(map[gomatrixserverlib.StateKeyTuple]*gomatrixserverlib.HeaderedEvent)
+)
+
+type FakeRsAPI struct {
+ rsAPI.RoomserverInternalAPI
+ shouldFailQuery bool
+ bannedFromRoom bool
+ shouldEventsFail bool
+}
+
+func (r *FakeRsAPI) QueryRoomVersionForRoom(
+ ctx context.Context,
+ req *rsAPI.QueryRoomVersionForRoomRequest,
+ res *rsAPI.QueryRoomVersionForRoomResponse,
+) error {
+ if r.shouldFailQuery {
+ return fmt.Errorf("Failure")
+ }
+ res.RoomVersion = gomatrixserverlib.RoomVersionV10
+ return nil
+}
+
+func (r *FakeRsAPI) QueryServerBannedFromRoom(
+ ctx context.Context,
+ req *rsAPI.QueryServerBannedFromRoomRequest,
+ res *rsAPI.QueryServerBannedFromRoomResponse,
+) error {
+ if r.bannedFromRoom {
+ res.Banned = true
+ } else {
+ res.Banned = false
+ }
+ return nil
+}
+
+func (r *FakeRsAPI) InputRoomEvents(
+ ctx context.Context,
+ req *rsAPI.InputRoomEventsRequest,
+ res *rsAPI.InputRoomEventsResponse,
+) error {
+ if r.shouldEventsFail {
+ return fmt.Errorf("Failure")
+ }
+ return nil
+}
+
+func TestEmptyTransactionRequest(t *testing.T) {
+ txn := NewTxnReq(&FakeRsAPI{}, nil, "ourserver", nil, nil, nil, false, []json.RawMessage{}, []gomatrixserverlib.EDU{}, "", "", "")
+ txnRes, jsonRes := txn.ProcessTransaction(context.Background())
+
+ assert.Nil(t, jsonRes)
+ assert.Zero(t, len(txnRes.PDUs))
+}
+
+func TestProcessTransactionRequestPDU(t *testing.T) {
+ keyRing := &test.NopJSONVerifier{}
+ txn := NewTxnReq(&FakeRsAPI{}, nil, "ourserver", keyRing, nil, nil, false, []json.RawMessage{testEvent}, []gomatrixserverlib.EDU{}, "", "", "")
+ txnRes, jsonRes := txn.ProcessTransaction(context.Background())
+
+ assert.Nil(t, jsonRes)
+ assert.Equal(t, 1, len(txnRes.PDUs))
+ for _, result := range txnRes.PDUs {
+ assert.Empty(t, result.Error)
+ }
+}
+
+func TestProcessTransactionRequestPDUs(t *testing.T) {
+ keyRing := &test.NopJSONVerifier{}
+ txn := NewTxnReq(&FakeRsAPI{}, nil, "ourserver", keyRing, nil, nil, false, append(testData, testEvent), []gomatrixserverlib.EDU{}, "", "", "")
+ txnRes, jsonRes := txn.ProcessTransaction(context.Background())
+
+ assert.Nil(t, jsonRes)
+ assert.Equal(t, 1, len(txnRes.PDUs))
+ for _, result := range txnRes.PDUs {
+ assert.Empty(t, result.Error)
+ }
+}
+
+func TestProcessTransactionRequestBadPDU(t *testing.T) {
+ pdu := json.RawMessage("{\"room_id\":\"asdf\"}")
+ pdu2 := json.RawMessage("\"roomid\":\"asdf\"")
+ keyRing := &test.NopJSONVerifier{}
+ txn := NewTxnReq(&FakeRsAPI{}, nil, "ourserver", keyRing, nil, nil, false, []json.RawMessage{pdu, pdu2, testEvent}, []gomatrixserverlib.EDU{}, "", "", "")
+ txnRes, jsonRes := txn.ProcessTransaction(context.Background())
+
+ assert.Nil(t, jsonRes)
+ assert.Equal(t, 1, len(txnRes.PDUs))
+ for _, result := range txnRes.PDUs {
+ assert.Empty(t, result.Error)
+ }
+}
+
+func TestProcessTransactionRequestPDUQueryFailure(t *testing.T) {
+ keyRing := &test.NopJSONVerifier{}
+ txn := NewTxnReq(&FakeRsAPI{shouldFailQuery: true}, nil, "ourserver", keyRing, nil, nil, false, []json.RawMessage{testEvent}, []gomatrixserverlib.EDU{}, "", "", "")
+ txnRes, jsonRes := txn.ProcessTransaction(context.Background())
+
+ assert.Nil(t, jsonRes)
+ assert.Zero(t, len(txnRes.PDUs))
+}
+
+func TestProcessTransactionRequestPDUBannedFromRoom(t *testing.T) {
+ keyRing := &test.NopJSONVerifier{}
+ txn := NewTxnReq(&FakeRsAPI{bannedFromRoom: true}, nil, "ourserver", keyRing, nil, nil, false, []json.RawMessage{testEvent}, []gomatrixserverlib.EDU{}, "", "", "")
+ txnRes, jsonRes := txn.ProcessTransaction(context.Background())
+
+ assert.Nil(t, jsonRes)
+ assert.Equal(t, 1, len(txnRes.PDUs))
+ for _, result := range txnRes.PDUs {
+ assert.NotEmpty(t, result.Error)
+ }
+}
+
+func TestProcessTransactionRequestPDUInvalidSignature(t *testing.T) {
+ keyRing := &test.NopJSONVerifier{}
+ txn := NewTxnReq(&FakeRsAPI{}, nil, "ourserver", keyRing, nil, nil, false, []json.RawMessage{invalidSignatures}, []gomatrixserverlib.EDU{}, "", "", "")
+ txnRes, jsonRes := txn.ProcessTransaction(context.Background())
+
+ assert.Nil(t, jsonRes)
+ assert.Equal(t, 1, len(txnRes.PDUs))
+ for _, result := range txnRes.PDUs {
+ assert.NotEmpty(t, result.Error)
+ }
+}
+
+func TestProcessTransactionRequestPDUSendFail(t *testing.T) {
+ keyRing := &test.NopJSONVerifier{}
+ txn := NewTxnReq(&FakeRsAPI{shouldEventsFail: true}, nil, "ourserver", keyRing, nil, nil, false, []json.RawMessage{testEvent}, []gomatrixserverlib.EDU{}, "", "", "")
+ txnRes, jsonRes := txn.ProcessTransaction(context.Background())
+
+ assert.Nil(t, jsonRes)
+ assert.Equal(t, 1, len(txnRes.PDUs))
+ for _, result := range txnRes.PDUs {
+ assert.NotEmpty(t, result.Error)
+ }
+}
+
+func createTransactionWithEDU(ctx *process.ProcessContext, edus []gomatrixserverlib.EDU) (TxnReq, nats.JetStreamContext, *config.Dendrite) {
+ cfg := &config.Dendrite{}
+ cfg.Defaults(config.DefaultOpts{
+ Generate: true,
+ Monolithic: true,
+ })
+ cfg.Global.JetStream.InMemory = true
+ natsInstance := &jetstream.NATSInstance{}
+ js, _ := natsInstance.Prepare(ctx, &cfg.Global.JetStream)
+ producer := &producers.SyncAPIProducer{
+ JetStream: js,
+ TopicReceiptEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+ TopicSendToDeviceEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ TopicTypingEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ TopicPresenceEvent: cfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent),
+ TopicDeviceListUpdate: cfg.Global.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
+ TopicSigningKeyUpdate: cfg.Global.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
+ Config: &cfg.FederationAPI,
+ UserAPI: nil,
+ }
+ keyRing := &test.NopJSONVerifier{}
+ txn := NewTxnReq(&FakeRsAPI{}, nil, "ourserver", keyRing, nil, producer, true, []json.RawMessage{}, edus, "kaer.morhen", "", "ourserver")
+ return txn, js, cfg
+}
+
+func TestProcessTransactionRequestEDUTyping(t *testing.T) {
+ var err error
+ roomID := "!roomid:kaer.morhen"
+ userID := "@userid:kaer.morhen"
+ typing := true
+ edu := gomatrixserverlib.EDU{Type: "m.typing"}
+ if edu.Content, err = json.Marshal(map[string]interface{}{
+ "room_id": roomID,
+ "user_id": userID,
+ "typing": typing,
+ }); err != nil {
+ t.Errorf("failed to marshal EDU JSON")
+ }
+ badEDU := gomatrixserverlib.EDU{Type: "m.typing"}
+ badEDU.Content = gomatrixserverlib.RawJSON("badjson")
+ edus := []gomatrixserverlib.EDU{badEDU, edu}
+
+ ctx := process.NewProcessContext()
+ defer ctx.ShutdownDendrite()
+ txn, js, cfg := createTransactionWithEDU(ctx, edus)
+ received := atomic.NewBool(false)
+ onMessage := func(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
+ room := msg.Header.Get(jetstream.RoomID)
+ assert.Equal(t, roomID, room)
+ user := msg.Header.Get(jetstream.UserID)
+ assert.Equal(t, userID, user)
+ typ, parseErr := strconv.ParseBool(msg.Header.Get("typing"))
+ if parseErr != nil {
+ return true
+ }
+ assert.Equal(t, typing, typ)
+
+ received.Store(true)
+ return true
+ }
+ err = jetstream.JetStreamConsumer(
+ ctx.Context(), js, cfg.Global.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ cfg.Global.JetStream.Durable("TestTypingConsumer"), 1,
+ onMessage, nats.DeliverAll(), nats.ManualAck(),
+ )
+ assert.Nil(t, err)
+
+ txnRes, jsonRes := txn.ProcessTransaction(ctx.Context())
+ assert.Nil(t, jsonRes)
+ assert.Zero(t, len(txnRes.PDUs))
+
+ check := func(log poll.LogT) poll.Result {
+ if received.Load() {
+ return poll.Success()
+ }
+ return poll.Continue("waiting for events to be processed")
+ }
+ poll.WaitOn(t, check, poll.WithTimeout(2*time.Second), poll.WithDelay(10*time.Millisecond))
+}
+
+func TestProcessTransactionRequestEDUToDevice(t *testing.T) {
+ var err error
+ sender := "@userid:kaer.morhen"
+ messageID := "$x4MKEPRSF6OGlo0qpnsP3BfSmYX5HhVlykOsQH3ECyg"
+ msgType := "m.dendrite.test"
+ edu := gomatrixserverlib.EDU{Type: "m.direct_to_device"}
+ if edu.Content, err = json.Marshal(map[string]interface{}{
+ "sender": sender,
+ "type": msgType,
+ "message_id": messageID,
+ "messages": map[string]interface{}{
+ "@alice:example.org": map[string]interface{}{
+ "IWHQUZUIAH": map[string]interface{}{
+ "algorithm": "m.megolm.v1.aes-sha2",
+ "room_id": "!Cuyf34gef24t:localhost",
+ "session_id": "X3lUlvLELLYxeTx4yOVu6UDpasGEVO0Jbu+QFnm0cKQ",
+ "session_key": "AgAAAADxKHa9uFxcXzwYoNueL5Xqi69IkD4sni8LlfJL7qNBEY...",
+ },
+ },
+ },
+ }); err != nil {
+ t.Errorf("failed to marshal EDU JSON")
+ }
+ badEDU := gomatrixserverlib.EDU{Type: "m.direct_to_device"}
+ badEDU.Content = gomatrixserverlib.RawJSON("badjson")
+ edus := []gomatrixserverlib.EDU{badEDU, edu}
+
+ ctx := process.NewProcessContext()
+ defer ctx.ShutdownDendrite()
+ txn, js, cfg := createTransactionWithEDU(ctx, edus)
+ received := atomic.NewBool(false)
+ onMessage := func(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
+
+ var output types.OutputSendToDeviceEvent
+ if err = json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ println(err.Error())
+ return true
+ }
+ assert.Equal(t, sender, output.Sender)
+ assert.Equal(t, msgType, output.Type)
+
+ received.Store(true)
+ return true
+ }
+ err = jetstream.JetStreamConsumer(
+ ctx.Context(), js, cfg.Global.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ cfg.Global.JetStream.Durable("TestToDevice"), 1,
+ onMessage, nats.DeliverAll(), nats.ManualAck(),
+ )
+ assert.Nil(t, err)
+
+ txnRes, jsonRes := txn.ProcessTransaction(ctx.Context())
+ assert.Nil(t, jsonRes)
+ assert.Zero(t, len(txnRes.PDUs))
+
+ check := func(log poll.LogT) poll.Result {
+ if received.Load() {
+ return poll.Success()
+ }
+ return poll.Continue("waiting for events to be processed")
+ }
+ poll.WaitOn(t, check, poll.WithTimeout(2*time.Second), poll.WithDelay(10*time.Millisecond))
+}
+
+func TestProcessTransactionRequestEDUDeviceListUpdate(t *testing.T) {
+ var err error
+ deviceID := "QBUAZIFURK"
+ userID := "@john:example.com"
+ edu := gomatrixserverlib.EDU{Type: "m.device_list_update"}
+ if edu.Content, err = json.Marshal(map[string]interface{}{
+ "device_display_name": "Mobile",
+ "device_id": deviceID,
+ "key": "value",
+ "keys": map[string]interface{}{
+ "algorithms": []string{
+ "m.olm.v1.curve25519-aes-sha2",
+ "m.megolm.v1.aes-sha2",
+ },
+ "device_id": "JLAFKJWSCS",
+ "keys": map[string]interface{}{
+ "curve25519:JLAFKJWSCS": "3C5BFWi2Y8MaVvjM8M22DBmh24PmgR0nPvJOIArzgyI",
+ "ed25519:JLAFKJWSCS": "lEuiRJBit0IG6nUf5pUzWTUEsRVVe/HJkoKuEww9ULI",
+ },
+ "signatures": map[string]interface{}{
+ "@alice:example.com": map[string]interface{}{
+ "ed25519:JLAFKJWSCS": "dSO80A01XiigH3uBiDVx/EjzaoycHcjq9lfQX0uWsqxl2giMIiSPR8a4d291W1ihKJL/a+myXS367WT6NAIcBA",
+ },
+ },
+ "user_id": "@alice:example.com",
+ },
+ "prev_id": []int{
+ 5,
+ },
+ "stream_id": 6,
+ "user_id": userID,
+ }); err != nil {
+ t.Errorf("failed to marshal EDU JSON")
+ }
+ badEDU := gomatrixserverlib.EDU{Type: "m.device_list_update"}
+ badEDU.Content = gomatrixserverlib.RawJSON("badjson")
+ edus := []gomatrixserverlib.EDU{badEDU, edu}
+
+ ctx := process.NewProcessContext()
+ defer ctx.ShutdownDendrite()
+ txn, js, cfg := createTransactionWithEDU(ctx, edus)
+ received := atomic.NewBool(false)
+ onMessage := func(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
+
+ var output gomatrixserverlib.DeviceListUpdateEvent
+ if err = json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ println(err.Error())
+ return true
+ }
+ assert.Equal(t, userID, output.UserID)
+ assert.Equal(t, deviceID, output.DeviceID)
+
+ received.Store(true)
+ return true
+ }
+ err = jetstream.JetStreamConsumer(
+ ctx.Context(), js, cfg.Global.JetStream.Prefixed(jetstream.InputDeviceListUpdate),
+ cfg.Global.JetStream.Durable("TestDeviceListUpdate"), 1,
+ onMessage, nats.DeliverAll(), nats.ManualAck(),
+ )
+ assert.Nil(t, err)
+
+ txnRes, jsonRes := txn.ProcessTransaction(ctx.Context())
+ assert.Nil(t, jsonRes)
+ assert.Zero(t, len(txnRes.PDUs))
+
+ check := func(log poll.LogT) poll.Result {
+ if received.Load() {
+ return poll.Success()
+ }
+ return poll.Continue("waiting for events to be processed")
+ }
+ poll.WaitOn(t, check, poll.WithTimeout(2*time.Second), poll.WithDelay(10*time.Millisecond))
+}
+
+func TestProcessTransactionRequestEDUReceipt(t *testing.T) {
+ var err error
+ roomID := "!some_room:example.org"
+ edu := gomatrixserverlib.EDU{Type: "m.receipt"}
+ if edu.Content, err = json.Marshal(map[string]interface{}{
+ roomID: map[string]interface{}{
+ "m.read": map[string]interface{}{
+ "@john:kaer.morhen": map[string]interface{}{
+ "data": map[string]interface{}{
+ "ts": 1533358089009,
+ },
+ "event_ids": []string{
+ "$read_this_event:matrix.org",
+ },
+ },
+ },
+ },
+ }); err != nil {
+ t.Errorf("failed to marshal EDU JSON")
+ }
+ badEDU := gomatrixserverlib.EDU{Type: "m.receipt"}
+ badEDU.Content = gomatrixserverlib.RawJSON("badjson")
+ badUser := gomatrixserverlib.EDU{Type: "m.receipt"}
+ if badUser.Content, err = json.Marshal(map[string]interface{}{
+ roomID: map[string]interface{}{
+ "m.read": map[string]interface{}{
+ "johnkaer.morhen": map[string]interface{}{
+ "data": map[string]interface{}{
+ "ts": 1533358089009,
+ },
+ "event_ids": []string{
+ "$read_this_event:matrix.org",
+ },
+ },
+ },
+ },
+ }); err != nil {
+ t.Errorf("failed to marshal EDU JSON")
+ }
+ badDomain := gomatrixserverlib.EDU{Type: "m.receipt"}
+ if badDomain.Content, err = json.Marshal(map[string]interface{}{
+ roomID: map[string]interface{}{
+ "m.read": map[string]interface{}{
+ "@john:bad.domain": map[string]interface{}{
+ "data": map[string]interface{}{
+ "ts": 1533358089009,
+ },
+ "event_ids": []string{
+ "$read_this_event:matrix.org",
+ },
+ },
+ },
+ },
+ }); err != nil {
+ t.Errorf("failed to marshal EDU JSON")
+ }
+ edus := []gomatrixserverlib.EDU{badEDU, badUser, edu}
+
+ ctx := process.NewProcessContext()
+ defer ctx.ShutdownDendrite()
+ txn, js, cfg := createTransactionWithEDU(ctx, edus)
+ received := atomic.NewBool(false)
+ onMessage := func(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
+
+ var output types.OutputReceiptEvent
+ output.RoomID = msg.Header.Get(jetstream.RoomID)
+ assert.Equal(t, roomID, output.RoomID)
+
+ received.Store(true)
+ return true
+ }
+ err = jetstream.JetStreamConsumer(
+ ctx.Context(), js, cfg.Global.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+ cfg.Global.JetStream.Durable("TestReceipt"), 1,
+ onMessage, nats.DeliverAll(), nats.ManualAck(),
+ )
+ assert.Nil(t, err)
+
+ txnRes, jsonRes := txn.ProcessTransaction(ctx.Context())
+ assert.Nil(t, jsonRes)
+ assert.Zero(t, len(txnRes.PDUs))
+
+ check := func(log poll.LogT) poll.Result {
+ if received.Load() {
+ return poll.Success()
+ }
+ return poll.Continue("waiting for events to be processed")
+ }
+ poll.WaitOn(t, check, poll.WithTimeout(2*time.Second), poll.WithDelay(10*time.Millisecond))
+}
+
+func TestProcessTransactionRequestEDUSigningKeyUpdate(t *testing.T) {
+ var err error
+ edu := gomatrixserverlib.EDU{Type: "m.signing_key_update"}
+ if edu.Content, err = json.Marshal(map[string]interface{}{}); err != nil {
+ t.Errorf("failed to marshal EDU JSON")
+ }
+ badEDU := gomatrixserverlib.EDU{Type: "m.signing_key_update"}
+ badEDU.Content = gomatrixserverlib.RawJSON("badjson")
+ edus := []gomatrixserverlib.EDU{badEDU, edu}
+
+ ctx := process.NewProcessContext()
+ defer ctx.ShutdownDendrite()
+ txn, js, cfg := createTransactionWithEDU(ctx, edus)
+ received := atomic.NewBool(false)
+ onMessage := func(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
+
+ var output keyAPI.CrossSigningKeyUpdate
+ if err = json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ println(err.Error())
+ return true
+ }
+
+ received.Store(true)
+ return true
+ }
+ err = jetstream.JetStreamConsumer(
+ ctx.Context(), js, cfg.Global.JetStream.Prefixed(jetstream.InputSigningKeyUpdate),
+ cfg.Global.JetStream.Durable("TestSigningKeyUpdate"), 1,
+ onMessage, nats.DeliverAll(), nats.ManualAck(),
+ )
+ assert.Nil(t, err)
+
+ txnRes, jsonRes := txn.ProcessTransaction(ctx.Context())
+ assert.Nil(t, jsonRes)
+ assert.Zero(t, len(txnRes.PDUs))
+
+ check := func(log poll.LogT) poll.Result {
+ if received.Load() {
+ return poll.Success()
+ }
+ return poll.Continue("waiting for events to be processed")
+ }
+ poll.WaitOn(t, check, poll.WithTimeout(2*time.Second), poll.WithDelay(10*time.Millisecond))
+}
+
+func TestProcessTransactionRequestEDUPresence(t *testing.T) {
+ var err error
+ userID := "@john:kaer.morhen"
+ presence := "online"
+ edu := gomatrixserverlib.EDU{Type: "m.presence"}
+ if edu.Content, err = json.Marshal(map[string]interface{}{
+ "push": []map[string]interface{}{{
+ "currently_active": true,
+ "last_active_ago": 5000,
+ "presence": presence,
+ "status_msg": "Making cupcakes",
+ "user_id": userID,
+ }},
+ }); err != nil {
+ t.Errorf("failed to marshal EDU JSON")
+ }
+ badEDU := gomatrixserverlib.EDU{Type: "m.presence"}
+ badEDU.Content = gomatrixserverlib.RawJSON("badjson")
+ edus := []gomatrixserverlib.EDU{badEDU, edu}
+
+ ctx := process.NewProcessContext()
+ defer ctx.ShutdownDendrite()
+ txn, js, cfg := createTransactionWithEDU(ctx, edus)
+ received := atomic.NewBool(false)
+ onMessage := func(ctx context.Context, msgs []*nats.Msg) bool {
+ msg := msgs[0] // Guaranteed to exist if onMessage is called
+
+ userIDRes := msg.Header.Get(jetstream.UserID)
+ presenceRes := msg.Header.Get("presence")
+ assert.Equal(t, userID, userIDRes)
+ assert.Equal(t, presence, presenceRes)
+
+ received.Store(true)
+ return true
+ }
+ err = jetstream.JetStreamConsumer(
+ ctx.Context(), js, cfg.Global.JetStream.Prefixed(jetstream.OutputPresenceEvent),
+ cfg.Global.JetStream.Durable("TestPresence"), 1,
+ onMessage, nats.DeliverAll(), nats.ManualAck(),
+ )
+ assert.Nil(t, err)
+
+ txnRes, jsonRes := txn.ProcessTransaction(ctx.Context())
+ assert.Nil(t, jsonRes)
+ assert.Zero(t, len(txnRes.PDUs))
+
+ check := func(log poll.LogT) poll.Result {
+ if received.Load() {
+ return poll.Success()
+ }
+ return poll.Continue("waiting for events to be processed")
+ }
+ poll.WaitOn(t, check, poll.WithTimeout(2*time.Second), poll.WithDelay(10*time.Millisecond))
+}
+
+func TestProcessTransactionRequestEDUUnhandled(t *testing.T) {
+ var err error
+ edu := gomatrixserverlib.EDU{Type: "m.unhandled"}
+ if edu.Content, err = json.Marshal(map[string]interface{}{}); err != nil {
+ t.Errorf("failed to marshal EDU JSON")
+ }
+
+ ctx := process.NewProcessContext()
+ defer ctx.ShutdownDendrite()
+ txn, _, _ := createTransactionWithEDU(ctx, []gomatrixserverlib.EDU{edu})
+ txnRes, jsonRes := txn.ProcessTransaction(ctx.Context())
+
+ assert.Nil(t, jsonRes)
+ assert.Zero(t, len(txnRes.PDUs))
+}
+
+func init() {
+ for _, j := range testData {
+ e, err := gomatrixserverlib.NewEventFromTrustedJSON(j, false, testRoomVersion)
+ if err != nil {
+ panic("cannot load test data: " + err.Error())
+ }
+ h := e.Headered(testRoomVersion)
+ testEvents = append(testEvents, h)
+ if e.StateKey() != nil {
+ testStateEvents[gomatrixserverlib.StateKeyTuple{
+ EventType: e.Type(),
+ StateKey: *e.StateKey(),
+ }] = h
+ }
+ }
+}
+
+type testRoomserverAPI struct {
+ rsAPI.RoomserverInternalAPITrace
+ inputRoomEvents []rsAPI.InputRoomEvent
+ queryStateAfterEvents func(*rsAPI.QueryStateAfterEventsRequest) rsAPI.QueryStateAfterEventsResponse
+ queryEventsByID func(req *rsAPI.QueryEventsByIDRequest) rsAPI.QueryEventsByIDResponse
+ queryLatestEventsAndState func(*rsAPI.QueryLatestEventsAndStateRequest) rsAPI.QueryLatestEventsAndStateResponse
+}
+
+func (t *testRoomserverAPI) InputRoomEvents(
+ ctx context.Context,
+ request *rsAPI.InputRoomEventsRequest,
+ response *rsAPI.InputRoomEventsResponse,
+) error {
+ t.inputRoomEvents = append(t.inputRoomEvents, request.InputRoomEvents...)
+ for _, ire := range request.InputRoomEvents {
+ fmt.Println("InputRoomEvents: ", ire.Event.EventID())
+ }
+ return nil
+}
+
+// Query the latest events and state for a room from the room server.
+func (t *testRoomserverAPI) QueryLatestEventsAndState(
+ ctx context.Context,
+ request *rsAPI.QueryLatestEventsAndStateRequest,
+ response *rsAPI.QueryLatestEventsAndStateResponse,
+) error {
+ r := t.queryLatestEventsAndState(request)
+ response.RoomExists = r.RoomExists
+ response.RoomVersion = testRoomVersion
+ response.LatestEvents = r.LatestEvents
+ response.StateEvents = r.StateEvents
+ response.Depth = r.Depth
+ return nil
+}
+
+// Query the state after a list of events in a room from the room server.
+func (t *testRoomserverAPI) QueryStateAfterEvents(
+ ctx context.Context,
+ request *rsAPI.QueryStateAfterEventsRequest,
+ response *rsAPI.QueryStateAfterEventsResponse,
+) error {
+ response.RoomVersion = testRoomVersion
+ res := t.queryStateAfterEvents(request)
+ response.PrevEventsExist = res.PrevEventsExist
+ response.RoomExists = res.RoomExists
+ response.StateEvents = res.StateEvents
+ return nil
+}
+
+// Query a list of events by event ID.
+func (t *testRoomserverAPI) QueryEventsByID(
+ ctx context.Context,
+ request *rsAPI.QueryEventsByIDRequest,
+ response *rsAPI.QueryEventsByIDResponse,
+) error {
+ res := t.queryEventsByID(request)
+ response.Events = res.Events
+ return nil
+}
+
+// Query if a server is joined to a room
+func (t *testRoomserverAPI) QueryServerJoinedToRoom(
+ ctx context.Context,
+ request *rsAPI.QueryServerJoinedToRoomRequest,
+ response *rsAPI.QueryServerJoinedToRoomResponse,
+) error {
+ response.RoomExists = true
+ response.IsInRoom = true
+ return nil
+}
+
+// Asks for the room version for a given room.
+func (t *testRoomserverAPI) QueryRoomVersionForRoom(
+ ctx context.Context,
+ request *rsAPI.QueryRoomVersionForRoomRequest,
+ response *rsAPI.QueryRoomVersionForRoomResponse,
+) error {
+ response.RoomVersion = testRoomVersion
+ return nil
+}
+
+func (t *testRoomserverAPI) QueryServerBannedFromRoom(
+ ctx context.Context, req *rsAPI.QueryServerBannedFromRoomRequest, res *rsAPI.QueryServerBannedFromRoomResponse,
+) error {
+ res.Banned = false
+ return nil
+}
+
+func mustCreateTransaction(rsAPI rsAPI.FederationRoomserverAPI, pdus []json.RawMessage) *TxnReq {
+ t := NewTxnReq(
+ rsAPI,
+ nil,
+ "",
+ &test.NopJSONVerifier{},
+ NewMutexByRoom(),
+ nil,
+ false,
+ pdus,
+ nil,
+ testOrigin,
+ gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano())),
+ testDestination)
+ t.PDUs = pdus
+ t.Origin = testOrigin
+ t.TransactionID = gomatrixserverlib.TransactionID(fmt.Sprintf("%d", time.Now().UnixNano()))
+ t.Destination = testDestination
+ return &t
+}
+
+func mustProcessTransaction(t *testing.T, txn *TxnReq, pdusWithErrors []string) {
+ res, err := txn.ProcessTransaction(context.Background())
+ if err != nil {
+ t.Errorf("txn.processTransaction returned an error: %v", err)
+ return
+ }
+ if len(res.PDUs) != len(txn.PDUs) {
+ t.Errorf("txn.processTransaction did not return results for all PDUs, got %d want %d", len(res.PDUs), len(txn.PDUs))
+ return
+ }
+NextPDU:
+ for eventID, result := range res.PDUs {
+ if result.Error == "" {
+ continue
+ }
+ for _, eventIDWantError := range pdusWithErrors {
+ if eventID == eventIDWantError {
+ break NextPDU
+ }
+ }
+ t.Errorf("txn.processTransaction PDU %s returned an error %s", eventID, result.Error)
+ }
+}
+
+func assertInputRoomEvents(t *testing.T, got []rsAPI.InputRoomEvent, want []*gomatrixserverlib.HeaderedEvent) {
+ for _, g := range got {
+ fmt.Println("GOT ", g.Event.EventID())
+ }
+ if len(got) != len(want) {
+ t.Errorf("wrong number of InputRoomEvents: got %d want %d", len(got), len(want))
+ return
+ }
+ for i := range got {
+ if got[i].Event.EventID() != want[i].EventID() {
+ t.Errorf("InputRoomEvents[%d] got %s want %s", i, got[i].Event.EventID(), want[i].EventID())
+ }
+ }
+}
+
+// The purpose of this test is to check that receiving an event over federation for which we have the prev_events works correctly, and passes it on
+// to the roomserver. It's the most basic test possible.
+func TestBasicTransaction(t *testing.T) {
+ rsAPI := &testRoomserverAPI{}
+ pdus := []json.RawMessage{
+ testData[len(testData)-1], // a message event
+ }
+ txn := mustCreateTransaction(rsAPI, pdus)
+ mustProcessTransaction(t, txn, nil)
+ assertInputRoomEvents(t, rsAPI.inputRoomEvents, []*gomatrixserverlib.HeaderedEvent{testEvents[len(testEvents)-1]})
+}
+
+// The purpose of this test is to check that if the event received fails auth checks the event is still sent to the roomserver
+// as it does the auth check.
+func TestTransactionFailAuthChecks(t *testing.T) {
+ rsAPI := &testRoomserverAPI{}
+ pdus := []json.RawMessage{
+ testData[len(testData)-1], // a message event
+ }
+ txn := mustCreateTransaction(rsAPI, pdus)
+ mustProcessTransaction(t, txn, []string{})
+ // expect message to be sent to the roomserver
+ assertInputRoomEvents(t, rsAPI.inputRoomEvents, []*gomatrixserverlib.HeaderedEvent{testEvents[len(testEvents)-1]})
+}