aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorS7evinK <2353100+S7evinK@users.noreply.github.com>2022-03-29 14:14:35 +0200
committerGitHub <noreply@github.com>2022-03-29 14:14:35 +0200
commit49dc49b232432d52c082646cc6f778593f4cb8b4 (patch)
treee809deedcae2127e930b0d382c863561b61fd9d2
parent7972915806348847ecd9a9b8a1b1ff0609cb883c (diff)
Remove eduserver (#2306)
* Move receipt sending to own JetStream producer * Move SendToDevice to producer * Remove most parts of the EDU server * Fix SendToDevice & copyrights * Move structs, cleanup EDU Server traces * Use HeadersOnly subscription * Missing file * Fix linter issues * Move consumers to own files * Rename durable consumer; Consumer cleanup * Docs/config cleanup
-rw-r--r--build/docker/config/dendrite.yaml6
-rw-r--r--build/docker/docker-compose.polylith.yml12
-rw-r--r--build/gobind-pinecone/monolith.go7
-rw-r--r--build/gobind-yggdrasil/monolith.go17
-rw-r--r--clientapi/clientapi.go13
-rw-r--r--clientapi/producers/syncapi.go124
-rw-r--r--clientapi/routing/account_data.go5
-rw-r--r--clientapi/routing/receipt.go11
-rw-r--r--clientapi/routing/routing.go12
-rw-r--r--clientapi/routing/sendtodevice.go10
-rw-r--r--clientapi/routing/sendtyping.go11
-rw-r--r--cmd/dendrite-demo-libp2p/main.go7
-rw-r--r--cmd/dendrite-demo-pinecone/main.go7
-rw-r--r--cmd/dendrite-demo-yggdrasil/main.go17
-rw-r--r--cmd/dendrite-monolith-server/main.go21
-rw-r--r--cmd/dendrite-polylith-multi/main.go1
-rw-r--r--cmd/dendrite-polylith-multi/personalities/clientapi.go3
-rw-r--r--cmd/dendrite-polylith-multi/personalities/eduserver.go33
-rw-r--r--cmd/dendrite-polylith-multi/personalities/federationapi.go4
-rw-r--r--cmd/dendritejs-pinecone/main.go14
-rw-r--r--cmd/dendritejs/main.go4
-rw-r--r--dendrite-config.yaml6
-rw-r--r--docs/INSTALL.md8
-rw-r--r--eduserver/api/input.go103
-rw-r--r--eduserver/api/output.go57
-rw-r--r--eduserver/api/types.go59
-rw-r--r--eduserver/api/wrapper.go88
-rw-r--r--eduserver/eduserver.go56
-rw-r--r--eduserver/input/input.go198
-rw-r--r--eduserver/inthttp/client.go70
-rw-r--r--eduserver/inthttp/server.go54
-rw-r--r--federationapi/consumers/eduserver.go257
-rw-r--r--federationapi/consumers/keychange.go4
-rw-r--r--federationapi/consumers/receipts.go141
-rw-r--r--federationapi/consumers/sendtodevice.go125
-rw-r--r--federationapi/consumers/typing.go119
-rw-r--r--federationapi/federationapi.go41
-rw-r--r--federationapi/federationapi_test.go2
-rw-r--r--federationapi/producers/syncapi.go144
-rw-r--r--federationapi/routing/routing.go6
-rw-r--r--federationapi/routing/send.go39
-rw-r--r--federationapi/routing/send_test.go40
-rw-r--r--federationapi/types/types.go15
-rw-r--r--internal/caching/cache_typing.go (renamed from eduserver/cache/cache.go)6
-rw-r--r--internal/caching/cache_typing_test.go (renamed from eduserver/cache/cache_test.go)6
-rw-r--r--internal/test/config.go2
-rw-r--r--keyserver/api/api.go18
-rw-r--r--keyserver/internal/cross_signing.go5
-rw-r--r--keyserver/producers/keychange.go5
-rw-r--r--setup/base/base.go11
-rw-r--r--setup/config/config.go15
-rw-r--r--setup/config/config_eduserver.go17
-rw-r--r--setup/config/config_test.go4
-rw-r--r--setup/jetstream/nats.go21
-rw-r--r--setup/jetstream/streams.go5
-rw-r--r--setup/monolith.go18
-rw-r--r--syncapi/consumers/clientapi.go2
-rw-r--r--syncapi/consumers/receipts.go (renamed from syncapi/consumers/eduserver_receipts.go)24
-rw-r--r--syncapi/consumers/sendtodevice.go (renamed from syncapi/consumers/eduserver_sendtodevice.go)26
-rw-r--r--syncapi/consumers/typing.go (renamed from syncapi/consumers/eduserver_typing.go)51
-rw-r--r--syncapi/storage/interface.go5
-rw-r--r--syncapi/storage/postgres/receipt_table.go7
-rw-r--r--syncapi/storage/shared/syncserver.go5
-rw-r--r--syncapi/storage/sqlite3/receipt_table.go7
-rw-r--r--syncapi/storage/tables/interface.go3
-rw-r--r--syncapi/streams/stream_receipt.go19
-rw-r--r--syncapi/streams/stream_typing.go4
-rw-r--r--syncapi/streams/streams.go4
-rw-r--r--syncapi/syncapi.go6
-rw-r--r--syncapi/types/types.go18
70 files changed, 931 insertions, 1354 deletions
diff --git a/build/docker/config/dendrite.yaml b/build/docker/config/dendrite.yaml
index c01ab62e..f3d37303 100644
--- a/build/docker/config/dendrite.yaml
+++ b/build/docker/config/dendrite.yaml
@@ -160,12 +160,6 @@ client_api:
threshold: 5
cooloff_ms: 500
-# Configuration for the EDU server.
-edu_server:
- internal_api:
- listen: http://0.0.0.0:7778
- connect: http://edu_server:7778
-
# Configuration for the Federation API.
federation_api:
internal_api:
diff --git a/build/docker/docker-compose.polylith.yml b/build/docker/docker-compose.polylith.yml
index 207d0451..de0ab0aa 100644
--- a/build/docker/docker-compose.polylith.yml
+++ b/build/docker/docker-compose.polylith.yml
@@ -84,18 +84,6 @@ services:
- internal
restart: unless-stopped
- edu_server:
- hostname: edu_server
- image: matrixdotorg/dendrite-polylith:latest
- command: eduserver
- volumes:
- - ./config:/etc/dendrite
- depends_on:
- - jetstream
- networks:
- - internal
- restart: unless-stopped
-
federation_api:
hostname: federation_api
image: matrixdotorg/dendrite-polylith:latest
diff --git a/build/gobind-pinecone/monolith.go b/build/gobind-pinecone/monolith.go
index 346c5d1e..efc21f59 100644
--- a/build/gobind-pinecone/monolith.go
+++ b/build/gobind-pinecone/monolith.go
@@ -24,8 +24,6 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
- "github.com/matrix-org/dendrite/eduserver"
- "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/httputil"
@@ -317,10 +315,6 @@ func (m *DendriteMonolith) Start() {
m.userAPI = userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient())
keyAPI.SetUserAPI(m.userAPI)
- eduInputAPI := eduserver.NewInternalAPI(
- base, cache.New(), m.userAPI,
- )
-
asAPI := appservice.NewInternalAPI(base, m.userAPI, rsAPI)
// The underlying roomserver implementation needs to be able to call the fedsender.
@@ -338,7 +332,6 @@ func (m *DendriteMonolith) Start() {
KeyRing: keyRing,
AppserviceAPI: asAPI,
- EDUInternalAPI: eduInputAPI,
FederationAPI: fsAPI,
RoomserverAPI: rsAPI,
UserAPI: m.userAPI,
diff --git a/build/gobind-yggdrasil/monolith.go b/build/gobind-yggdrasil/monolith.go
index a2c9bcff..87dcad2e 100644
--- a/build/gobind-yggdrasil/monolith.go
+++ b/build/gobind-yggdrasil/monolith.go
@@ -13,8 +13,6 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms"
- "github.com/matrix-org/dendrite/eduserver"
- "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/httputil"
@@ -123,10 +121,6 @@ func (m *DendriteMonolith) Start() {
userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI, rsAPI, base.PushGatewayHTTPClient())
keyAPI.SetUserAPI(userAPI)
- eduInputAPI := eduserver.NewInternalAPI(
- base, cache.New(), userAPI,
- )
-
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
@@ -141,12 +135,11 @@ func (m *DendriteMonolith) Start() {
FedClient: federation,
KeyRing: keyRing,
- AppserviceAPI: asAPI,
- EDUInternalAPI: eduInputAPI,
- FederationAPI: fsAPI,
- RoomserverAPI: rsAPI,
- UserAPI: userAPI,
- KeyAPI: keyAPI,
+ AppserviceAPI: asAPI,
+ FederationAPI: fsAPI,
+ RoomserverAPI: rsAPI,
+ UserAPI: userAPI,
+ KeyAPI: keyAPI,
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
ygg, fsAPI, federation,
),
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index d0ef368d..d4b417a3 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -20,7 +20,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/api"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/routing"
- eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/transactions"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
@@ -40,7 +39,6 @@ func AddPublicRoutes(
cfg *config.ClientAPI,
federation *gomatrixserverlib.FederationClient,
rsAPI roomserverAPI.RoomserverInternalAPI,
- eduInputAPI eduServerAPI.EDUServerInputAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
transactionsCache *transactions.Cache,
fsAPI federationAPI.FederationInternalAPI,
@@ -53,12 +51,17 @@ func AddPublicRoutes(
js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
syncProducer := &producers.SyncAPIProducer{
- JetStream: js,
- Topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
+ JetStream: js,
+ TopicClientData: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
+ TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+ TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ UserAPI: userAPI,
+ ServerName: cfg.Matrix.ServerName,
}
routing.Setup(
- router, synapseAdminRouter, cfg, eduInputAPI, rsAPI, asAPI,
+ router, synapseAdminRouter, cfg, rsAPI, asAPI,
userAPI, userDirectoryProvider, federation,
syncProducer, transactionsCache, fsAPI, keyAPI,
extRoomsProvider, mscCfg,
diff --git a/clientapi/producers/syncapi.go b/clientapi/producers/syncapi.go
index 9ab90391..2dee04e3 100644
--- a/clientapi/producers/syncapi.go
+++ b/clientapi/producers/syncapi.go
@@ -15,24 +15,34 @@
package producers
import (
+ "context"
"encoding/json"
+ "strconv"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// SyncAPIProducer produces events for the sync API server to consume
type SyncAPIProducer struct {
- Topic string
- JetStream nats.JetStreamContext
+ TopicClientData string
+ TopicReceiptEvent string
+ TopicSendToDeviceEvent string
+ TopicTypingEvent string
+ JetStream nats.JetStreamContext
+ ServerName gomatrixserverlib.ServerName
+ UserAPI userapi.UserInternalAPI
}
// SendData sends account data to the sync API server
func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string, readMarker *eventutil.ReadMarkerJSON) error {
m := &nats.Msg{
- Subject: p.Topic,
+ Subject: p.TopicClientData,
Header: nats.Header{},
}
m.Header.Set(jetstream.UserID, userID)
@@ -52,8 +62,114 @@ func (p *SyncAPIProducer) SendData(userID string, roomID string, dataType string
"user_id": userID,
"room_id": roomID,
"data_type": dataType,
- }).Tracef("Producing to topic '%s'", p.Topic)
+ }).Tracef("Producing to topic '%s'", p.TopicClientData)
_, err = p.JetStream.PublishMsg(m)
return err
}
+
+func (p *SyncAPIProducer) SendReceipt(
+ ctx context.Context,
+ userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp,
+) error {
+ m := &nats.Msg{
+ Subject: p.TopicReceiptEvent,
+ Header: nats.Header{},
+ }
+ m.Header.Set(jetstream.UserID, userID)
+ m.Header.Set(jetstream.RoomID, roomID)
+ m.Header.Set(jetstream.EventID, eventID)
+ m.Header.Set("type", receiptType)
+ m.Header.Set("timestamp", strconv.Itoa(int(timestamp)))
+
+ log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent)
+ _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
+ return err
+}
+
+func (p *SyncAPIProducer) SendToDevice(
+ ctx context.Context, sender, userID, deviceID, eventType string,
+ message interface{},
+) error {
+ devices := []string{}
+ _, domain, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ return err
+ }
+
+ // If the event is targeted locally then we want to expand the wildcard
+ // out into individual device IDs so that we can send them to each respective
+ // device. If the event isn't targeted locally then we can't expand the
+ // wildcard as we don't know about the remote devices, so instead we leave it
+ // as-is, so that the federation sender can send it on with the wildcard intact.
+ if domain == p.ServerName && deviceID == "*" {
+ var res userapi.QueryDevicesResponse
+ err = p.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{
+ UserID: userID,
+ }, &res)
+ if err != nil {
+ return err
+ }
+ for _, dev := range res.Devices {
+ devices = append(devices, dev.ID)
+ }
+ } else {
+ devices = append(devices, deviceID)
+ }
+
+ js, err := json.Marshal(message)
+ if err != nil {
+ return err
+ }
+
+ log.WithFields(log.Fields{
+ "user_id": userID,
+ "num_devices": len(devices),
+ "type": eventType,
+ }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
+ for _, device := range devices {
+ ote := &types.OutputSendToDeviceEvent{
+ UserID: userID,
+ DeviceID: device,
+ SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
+ Sender: sender,
+ Type: eventType,
+ Content: js,
+ },
+ }
+
+ eventJSON, err := json.Marshal(ote)
+ if err != nil {
+ log.WithError(err).Error("sendToDevice failed json.Marshal")
+ return err
+ }
+ m := &nats.Msg{
+ Subject: p.TopicSendToDeviceEvent,
+ Data: eventJSON,
+ Header: nats.Header{},
+ }
+ m.Header.Set("sender", sender)
+ m.Header.Set(jetstream.UserID, userID)
+ if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
+ log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
+ return err
+ }
+ }
+ return nil
+}
+
+func (p *SyncAPIProducer) SendTyping(
+ ctx context.Context, userID, roomID string, typing bool, timeoutMS int64,
+) error {
+ m := &nats.Msg{
+ Subject: p.TopicTypingEvent,
+ Header: nats.Header{},
+ }
+ m.Header.Set(jetstream.UserID, userID)
+ m.Header.Set(jetstream.RoomID, roomID)
+ m.Header.Set("typing", strconv.FormatBool(typing))
+ m.Header.Set("timeout_ms", strconv.Itoa(int(timeoutMS)))
+
+ _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
+ return err
+}
diff --git a/clientapi/routing/account_data.go b/clientapi/routing/account_data.go
index d8e98269..873ffaf5 100644
--- a/clientapi/routing/account_data.go
+++ b/clientapi/routing/account_data.go
@@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
- eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/eventutil"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/userapi/api"
@@ -146,7 +145,7 @@ type fullyReadEvent struct {
// SaveReadMarker implements POST /rooms/{roomId}/read_markers
func SaveReadMarker(
req *http.Request,
- userAPI api.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, eduAPI eduserverAPI.EDUServerInputAPI,
+ userAPI api.UserInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
syncProducer *producers.SyncAPIProducer, device *api.Device, roomID string,
) util.JSONResponse {
// Verify that the user is a member of this room
@@ -192,7 +191,7 @@ func SaveReadMarker(
// Handle the read receipt that may be included in the read marker
if r.Read != "" {
- return SetReceipt(req, eduAPI, device, roomID, "m.read", r.Read)
+ return SetReceipt(req, syncProducer, device, roomID, "m.read", r.Read)
}
return util.JSONResponse{
diff --git a/clientapi/routing/receipt.go b/clientapi/routing/receipt.go
index fe8fe765..0f9b1b4f 100644
--- a/clientapi/routing/receipt.go
+++ b/clientapi/routing/receipt.go
@@ -19,21 +19,20 @@ import (
"net/http"
"time"
+ "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/dendrite/eduserver/api"
-
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
)
-func SetReceipt(req *http.Request, eduAPI api.EDUServerInputAPI, device *userapi.Device, roomId, receiptType, eventId string) util.JSONResponse {
+func SetReceipt(req *http.Request, syncProducer *producers.SyncAPIProducer, device *userapi.Device, roomID, receiptType, eventID string) util.JSONResponse {
timestamp := gomatrixserverlib.AsTimestamp(time.Now())
logrus.WithFields(logrus.Fields{
- "roomId": roomId,
+ "roomID": roomID,
"receiptType": receiptType,
- "eventId": eventId,
+ "eventID": eventID,
"userId": device.UserID,
"timestamp": timestamp,
}).Debug("Setting receipt")
@@ -43,7 +42,7 @@ func SetReceipt(req *http.Request, eduAPI api.EDUServerInputAPI, device *userapi
return util.MessageResponse(400, fmt.Sprintf("receipt type must be m.read not '%s'", receiptType))
}
- if err := api.SendReceipt(req.Context(), eduAPI, device.UserID, roomId, eventId, receiptType, timestamp); err != nil {
+ if err := syncProducer.SendReceipt(req.Context(), device.UserID, roomID, eventID, receiptType, timestamp); err != nil {
return util.ErrorResponse(err)
}
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index 218087bb..8afaba56 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -26,7 +26,6 @@ import (
clientutil "github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
- eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/transactions"
@@ -47,7 +46,6 @@ import (
// nolint: gocyclo
func Setup(
publicAPIMux, synapseAdminRouter *mux.Router, cfg *config.ClientAPI,
- eduAPI eduServerAPI.EDUServerInputAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
userAPI userapi.UserInternalAPI,
@@ -467,7 +465,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SendTyping(req, device, vars["roomID"], vars["userID"], eduAPI, rsAPI)
+ return SendTyping(req, device, vars["roomID"], vars["userID"], rsAPI, syncProducer)
}),
).Methods(http.MethodPut, http.MethodOptions)
v3mux.Handle("/rooms/{roomID}/redact/{eventID}",
@@ -496,7 +494,7 @@ func Setup(
return util.ErrorResponse(err)
}
txnID := vars["txnID"]
- return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID)
+ return SendToDevice(req, device, syncProducer, transactionsCache, vars["eventType"], &txnID)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -510,7 +508,7 @@ func Setup(
return util.ErrorResponse(err)
}
txnID := vars["txnID"]
- return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID)
+ return SendToDevice(req, device, syncProducer, transactionsCache, vars["eventType"], &txnID)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -942,7 +940,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SaveReadMarker(req, userAPI, rsAPI, eduAPI, syncProducer, device, vars["roomID"])
+ return SaveReadMarker(req, userAPI, rsAPI, syncProducer, device, vars["roomID"])
}),
).Methods(http.MethodPost, http.MethodOptions)
@@ -1297,7 +1295,7 @@ func Setup(
return util.ErrorResponse(err)
}
- return SetReceipt(req, eduAPI, device, vars["roomId"], vars["receiptType"], vars["eventId"])
+ return SetReceipt(req, syncProducer, device, vars["roomId"], vars["receiptType"], vars["eventId"])
}),
).Methods(http.MethodPost, http.MethodOptions)
}
diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go
index 768e8e0e..4a5f0888 100644
--- a/clientapi/routing/sendtodevice.go
+++ b/clientapi/routing/sendtodevice.go
@@ -18,17 +18,17 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/transactions"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
)
// SendToDevice handles PUT /_matrix/client/r0/sendToDevice/{eventType}/{txnId}
-// sends the device events to the EDU Server
+// sends the device events to the syncapi & federationsender
func SendToDevice(
req *http.Request, device *userapi.Device,
- eduAPI api.EDUServerInputAPI,
+ syncProducer *producers.SyncAPIProducer,
txnCache *transactions.Cache,
eventType string, txnID *string,
) util.JSONResponse {
@@ -48,8 +48,8 @@ func SendToDevice(
for userID, byUser := range httpReq.Messages {
for deviceID, message := range byUser {
- if err := api.SendToDevice(
- req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message,
+ if err := syncProducer.SendToDevice(
+ req.Context(), device.UserID, userID, deviceID, eventType, message,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed")
return jsonerror.InternalServerError()
diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go
index abd2061a..6a27ee61 100644
--- a/clientapi/routing/sendtyping.go
+++ b/clientapi/routing/sendtyping.go
@@ -17,7 +17,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/eduserver/api"
+ "github.com/matrix-org/dendrite/clientapi/producers"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/util"
@@ -32,9 +32,8 @@ type typingContentJSON struct {
// sends the typing events to client API typingProducer
func SendTyping(
req *http.Request, device *userapi.Device, roomID string,
- userID string,
- eduAPI api.EDUServerInputAPI,
- rsAPI roomserverAPI.RoomserverInternalAPI,
+ userID string, rsAPI roomserverAPI.RoomserverInternalAPI,
+ syncProducer *producers.SyncAPIProducer,
) util.JSONResponse {
if device.UserID != userID {
return util.JSONResponse{
@@ -56,9 +55,7 @@ func SendTyping(
return *resErr
}
- if err := api.SendTyping(
- req.Context(), eduAPI, userID, roomID, r.Typing, r.Timeout,
- ); err != nil {
+ if err := syncProducer.SendTyping(req.Context(), userID, roomID, r.Typing, r.Timeout); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed")
return jsonerror.InternalServerError()
}
diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go
index 8ce64191..26c8eb85 100644
--- a/cmd/dendrite-demo-libp2p/main.go
+++ b/cmd/dendrite-demo-libp2p/main.go
@@ -29,7 +29,6 @@ import (
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed"
- "github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver"
@@ -40,8 +39,6 @@ import (
"github.com/matrix-org/dendrite/userapi"
"github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/dendrite/eduserver/cache"
-
"github.com/sirupsen/logrus"
_ "github.com/mattn/go-sqlite3"
@@ -152,9 +149,6 @@ func main() {
userAPI := userapi.NewInternalAPI(&base.Base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.Base.PushGatewayHTTPClient())
keyAPI.SetUserAPI(userAPI)
- eduInputAPI := eduserver.NewInternalAPI(
- &base.Base, cache.New(), userAPI,
- )
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationapi.NewInternalAPI(
@@ -180,7 +174,6 @@ func main() {
KeyRing: keyRing,
AppserviceAPI: asAPI,
- EDUInternalAPI: eduInputAPI,
FederationAPI: fsAPI,
RoomserverAPI: rsAPI,
UserAPI: userAPI,
diff --git a/cmd/dendrite-demo-pinecone/main.go b/cmd/dendrite-demo-pinecone/main.go
index 87054dc8..a6abf06e 100644
--- a/cmd/dendrite-demo-pinecone/main.go
+++ b/cmd/dendrite-demo-pinecone/main.go
@@ -37,8 +37,6 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/users"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
- "github.com/matrix-org/dendrite/eduserver"
- "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal"
@@ -191,10 +189,6 @@ func main() {
userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient())
keyAPI.SetUserAPI(userAPI)
- eduInputAPI := eduserver.NewInternalAPI(
- base, cache.New(), userAPI,
- )
-
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
rsComponent.SetFederationAPI(fsAPI, keyRing)
@@ -210,7 +204,6 @@ func main() {
KeyRing: keyRing,
AppserviceAPI: asAPI,
- EDUInternalAPI: eduInputAPI,
FederationAPI: fsAPI,
RoomserverAPI: rsAPI,
UserAPI: userAPI,
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go
index b7e30ba2..b840eb2b 100644
--- a/cmd/dendrite-demo-yggdrasil/main.go
+++ b/cmd/dendrite-demo-yggdrasil/main.go
@@ -32,8 +32,6 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms"
- "github.com/matrix-org/dendrite/eduserver"
- "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal"
@@ -120,10 +118,6 @@ func main() {
userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient())
keyAPI.SetUserAPI(userAPI)
- eduInputAPI := eduserver.NewInternalAPI(
- base, cache.New(), userAPI,
- )
-
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationapi.NewInternalAPI(
@@ -139,12 +133,11 @@ func main() {
FedClient: federation,
KeyRing: keyRing,
- AppserviceAPI: asAPI,
- EDUInternalAPI: eduInputAPI,
- FederationAPI: fsAPI,
- RoomserverAPI: rsAPI,
- UserAPI: userAPI,
- KeyAPI: keyAPI,
+ AppserviceAPI: asAPI,
+ FederationAPI: fsAPI,
+ RoomserverAPI: rsAPI,
+ UserAPI: userAPI,
+ KeyAPI: keyAPI,
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
ygg, fsAPI, federation,
),
diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go
index 3b952504..1443ab5b 100644
--- a/cmd/dendrite-monolith-server/main.go
+++ b/cmd/dendrite-monolith-server/main.go
@@ -19,8 +19,6 @@ import (
"os"
"github.com/matrix-org/dendrite/appservice"
- "github.com/matrix-org/dendrite/eduserver"
- "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/keyserver"
"github.com/matrix-org/dendrite/roomserver"
@@ -61,7 +59,6 @@ func main() {
// itself.
cfg.AppServiceAPI.InternalAPI.Connect = httpAPIAddr
cfg.ClientAPI.InternalAPI.Connect = httpAPIAddr
- cfg.EDUServer.InternalAPI.Connect = httpAPIAddr
cfg.FederationAPI.InternalAPI.Connect = httpAPIAddr
cfg.KeyServer.InternalAPI.Connect = httpAPIAddr
cfg.MediaAPI.InternalAPI.Connect = httpAPIAddr
@@ -136,14 +133,6 @@ func main() {
rsImpl.SetUserAPI(userAPI)
keyImpl.SetUserAPI(userAPI)
- eduInputAPI := eduserver.NewInternalAPI(
- base, cache.New(), userAPI,
- )
- if base.UseHTTPAPIs {
- eduserver.AddInternalRoutes(base.InternalAPIMux, eduInputAPI)
- eduInputAPI = base.EDUServerClient()
- }
-
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,
@@ -151,12 +140,10 @@ func main() {
FedClient: federation,
KeyRing: keyRing,
- AppserviceAPI: asAPI,
- EDUInternalAPI: eduInputAPI,
- FederationAPI: fsAPI,
- RoomserverAPI: rsAPI,
- UserAPI: userAPI,
- KeyAPI: keyAPI,
+ AppserviceAPI: asAPI, FederationAPI: fsAPI,
+ RoomserverAPI: rsAPI,
+ UserAPI: userAPI,
+ KeyAPI: keyAPI,
}
monolith.AddAllPublicRoutes(
base.ProcessContext,
diff --git a/cmd/dendrite-polylith-multi/main.go b/cmd/dendrite-polylith-multi/main.go
index edfe6cdb..6226cc32 100644
--- a/cmd/dendrite-polylith-multi/main.go
+++ b/cmd/dendrite-polylith-multi/main.go
@@ -43,7 +43,6 @@ func main() {
components := map[string]entrypoint{
"appservice": personalities.Appservice,
"clientapi": personalities.ClientAPI,
- "eduserver": personalities.EDUServer,
"federationapi": personalities.FederationAPI,
"keyserver": personalities.KeyServer,
"mediaapi": personalities.MediaAPI,
diff --git a/cmd/dendrite-polylith-multi/personalities/clientapi.go b/cmd/dendrite-polylith-multi/personalities/clientapi.go
index 978d8b0a..1e509f88 100644
--- a/cmd/dendrite-polylith-multi/personalities/clientapi.go
+++ b/cmd/dendrite-polylith-multi/personalities/clientapi.go
@@ -27,13 +27,12 @@ func ClientAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
asQuery := base.AppserviceHTTPClient()
rsAPI := base.RoomserverHTTPClient()
fsAPI := base.FederationAPIHTTPClient()
- eduInputAPI := base.EDUServerClient()
userAPI := base.UserAPIClient()
keyAPI := base.KeyServerHTTPClient()
clientapi.AddPublicRoutes(
base.ProcessContext, base.PublicClientAPIMux, base.SynapseAdminMux, &base.Cfg.ClientAPI,
- federation, rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, userAPI,
+ federation, rsAPI, asQuery, transactions.New(), fsAPI, userAPI, userAPI,
keyAPI, nil, &cfg.MSCs,
)
diff --git a/cmd/dendrite-polylith-multi/personalities/eduserver.go b/cmd/dendrite-polylith-multi/personalities/eduserver.go
deleted file mode 100644
index 8719facb..00000000
--- a/cmd/dendrite-polylith-multi/personalities/eduserver.go
+++ /dev/null
@@ -1,33 +0,0 @@
-// Copyright 2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package personalities
-
-import (
- "github.com/matrix-org/dendrite/eduserver"
- "github.com/matrix-org/dendrite/eduserver/cache"
- basepkg "github.com/matrix-org/dendrite/setup/base"
- "github.com/matrix-org/dendrite/setup/config"
-)
-
-func EDUServer(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
- intAPI := eduserver.NewInternalAPI(base, cache.New(), base.UserAPIClient())
- eduserver.AddInternalRoutes(base.InternalAPIMux, intAPI)
-
- base.SetupAndServeHTTP(
- base.Cfg.EDUServer.InternalAPI.Listen, // internal listener
- basepkg.NoListener, // external listener
- nil, nil,
- )
-}
diff --git a/cmd/dendrite-polylith-multi/personalities/federationapi.go b/cmd/dendrite-polylith-multi/personalities/federationapi.go
index 44357d66..b82577ce 100644
--- a/cmd/dendrite-polylith-multi/personalities/federationapi.go
+++ b/cmd/dendrite-polylith-multi/personalities/federationapi.go
@@ -29,9 +29,9 @@ func FederationAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
keyRing := fsAPI.KeyRing()
federationapi.AddPublicRoutes(
- base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux,
+ base.ProcessContext, base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux,
&base.Cfg.FederationAPI, userAPI, federation, keyRing,
- rsAPI, fsAPI, base.EDUServerClient(), keyAPI,
+ rsAPI, fsAPI, keyAPI,
&base.Cfg.MSCs, nil,
)
diff --git a/cmd/dendritejs-pinecone/main.go b/cmd/dendritejs-pinecone/main.go
index 407081f5..05cdf29e 100644
--- a/cmd/dendritejs-pinecone/main.go
+++ b/cmd/dendritejs-pinecone/main.go
@@ -31,8 +31,6 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/conn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-pinecone/rooms"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
- "github.com/matrix-org/dendrite/eduserver"
- "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver"
@@ -193,7 +191,6 @@ func startup() {
userAPI := userapi.NewInternalAPI(base, accountDB, &cfg.UserAPI, nil, keyAPI, rsAPI, base.PushGatewayHTTPClient())
keyAPI.SetUserAPI(userAPI)
- eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
asQuery := appservice.NewInternalAPI(
base, userAPI, rsAPI,
)
@@ -208,12 +205,11 @@ func startup() {
FedClient: federation,
KeyRing: keyRing,
- AppserviceAPI: asQuery,
- EDUInternalAPI: eduInputAPI,
- FederationAPI: fedSenderAPI,
- RoomserverAPI: rsAPI,
- UserAPI: userAPI,
- KeyAPI: keyAPI,
+ AppserviceAPI: asQuery,
+ FederationAPI: fedSenderAPI,
+ RoomserverAPI: rsAPI,
+ UserAPI: userAPI,
+ KeyAPI: keyAPI,
//ServerKeyAPI: serverKeyAPI,
ExtPublicRoomsProvider: rooms.NewPineconeRoomProvider(pRouter, pSessions, fedSenderAPI, federation),
}
diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go
index 37cbb12d..05e0f0ad 100644
--- a/cmd/dendritejs/main.go
+++ b/cmd/dendritejs/main.go
@@ -24,8 +24,6 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/appservice"
- "github.com/matrix-org/dendrite/eduserver"
- "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/keyserver"
@@ -203,7 +201,6 @@ func main() {
}
rsAPI := roomserver.NewInternalAPI(base)
- eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
asQuery := appservice.NewInternalAPI(
base, userAPI, rsAPI,
)
@@ -222,7 +219,6 @@ func main() {
KeyRing: &keyRing,
AppserviceAPI: asQuery,
- EDUInternalAPI: eduInputAPI,
FederationSenderAPI: fedSenderAPI,
RoomserverAPI: rsAPI,
UserAPI: userAPI,
diff --git a/dendrite-config.yaml b/dendrite-config.yaml
index 0236851c..6e2bc7be 100644
--- a/dendrite-config.yaml
+++ b/dendrite-config.yaml
@@ -187,12 +187,6 @@ client_api:
threshold: 5
cooloff_ms: 500
-# Configuration for the EDU server.
-edu_server:
- internal_api:
- listen: http://localhost:7778 # Only used in polylith deployments
- connect: http://localhost:7778 # Only used in polylith deployments
-
# Configuration for the Federation API.
federation_api:
internal_api:
diff --git a/docs/INSTALL.md b/docs/INSTALL.md
index a7b2e67f..523c5c7d 100644
--- a/docs/INSTALL.md
+++ b/docs/INSTALL.md
@@ -263,14 +263,6 @@ This manages end-to-end encryption keys for users.
./bin/dendrite-polylith-multi --config=dendrite.yaml keyserver
```
-#### EDU server
-
-This manages processing EDUs such as typing, send-to-device events and presence. Clients do not talk to
-
-```bash
-./bin/dendrite-polylith-multi --config=dendrite.yaml eduserver
-```
-
#### User server
This manages user accounts, device access tokens and user account data,
diff --git a/eduserver/api/input.go b/eduserver/api/input.go
deleted file mode 100644
index 2aab107b..00000000
--- a/eduserver/api/input.go
+++ /dev/null
@@ -1,103 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-// Copyright 2017-2018 New Vector Ltd
-// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// Package api provides the types that are used to communicate with the typing server.
-package api
-
-import (
- "context"
-
- "github.com/matrix-org/gomatrixserverlib"
-)
-
-// InputTypingEvent is an event for notifying the typing server about typing updates.
-type InputTypingEvent struct {
- // UserID of the user to update typing status.
- UserID string `json:"user_id"`
- // RoomID of the room the user is typing (or has stopped).
- RoomID string `json:"room_id"`
- // Typing is true if the user is typing, false if they have stopped.
- Typing bool `json:"typing"`
- // Timeout is the interval in milliseconds for which the user should be marked as typing.
- TimeoutMS int64 `json:"timeout"`
- // OriginServerTS when the server received the update.
- OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"`
-}
-
-type InputSendToDeviceEvent struct {
- UserID string `json:"user_id"`
- DeviceID string `json:"device_id"`
- gomatrixserverlib.SendToDeviceEvent
-}
-
-// InputTypingEventRequest is a request to EDUServerInputAPI
-type InputTypingEventRequest struct {
- InputTypingEvent InputTypingEvent `json:"input_typing_event"`
-}
-
-// InputTypingEventResponse is a response to InputTypingEvents
-type InputTypingEventResponse struct{}
-
-// InputSendToDeviceEventRequest is a request to EDUServerInputAPI
-type InputSendToDeviceEventRequest struct {
- InputSendToDeviceEvent InputSendToDeviceEvent `json:"input_send_to_device_event"`
-}
-
-// InputSendToDeviceEventResponse is a response to InputSendToDeviceEventRequest
-type InputSendToDeviceEventResponse struct{}
-
-type InputReceiptEvent struct {
- UserID string `json:"user_id"`
- RoomID string `json:"room_id"`
- EventID string `json:"event_id"`
- Type string `json:"type"`
- Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
-}
-
-// InputReceiptEventRequest is a request to EDUServerInputAPI
-type InputReceiptEventRequest struct {
- InputReceiptEvent InputReceiptEvent `json:"input_receipt_event"`
-}
-
-// InputReceiptEventResponse is a response to InputReceiptEventRequest
-type InputReceiptEventResponse struct{}
-
-type InputCrossSigningKeyUpdateRequest struct {
- CrossSigningKeyUpdate `json:"signing_keys"`
-}
-
-type InputCrossSigningKeyUpdateResponse struct{}
-
-// EDUServerInputAPI is used to write events to the typing server.
-type EDUServerInputAPI interface {
- InputTypingEvent(
- ctx context.Context,
- request *InputTypingEventRequest,
- response *InputTypingEventResponse,
- ) error
-
- InputSendToDeviceEvent(
- ctx context.Context,
- request *InputSendToDeviceEventRequest,
- response *InputSendToDeviceEventResponse,
- ) error
-
- InputReceiptEvent(
- ctx context.Context,
- request *InputReceiptEventRequest,
- response *InputReceiptEventResponse,
- ) error
-}
diff --git a/eduserver/api/output.go b/eduserver/api/output.go
deleted file mode 100644
index c6de4e01..00000000
--- a/eduserver/api/output.go
+++ /dev/null
@@ -1,57 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-// Copyright 2017-2018 New Vector Ltd
-// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package api
-
-import (
- "time"
-
- "github.com/matrix-org/gomatrixserverlib"
-)
-
-// OutputTypingEvent is an entry in typing server output kafka log.
-// This contains the event with extra fields used to create 'm.typing' event
-// in clientapi & federation.
-type OutputTypingEvent struct {
- // The Event for the typing edu event.
- Event TypingEvent `json:"event"`
- // ExpireTime is the interval after which the user should no longer be
- // considered typing. Only available if Event.Typing is true.
- ExpireTime *time.Time
-}
-
-// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log.
-// This contains the full event content, along with the user ID and device ID
-// to which it is destined.
-type OutputSendToDeviceEvent struct {
- UserID string `json:"user_id"`
- DeviceID string `json:"device_id"`
- gomatrixserverlib.SendToDeviceEvent
-}
-
-// OutputReceiptEvent is an entry in the receipt output kafka log
-type OutputReceiptEvent struct {
- UserID string `json:"user_id"`
- RoomID string `json:"room_id"`
- EventID string `json:"event_id"`
- Type string `json:"type"`
- Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
-}
-
-// OutputCrossSigningKeyUpdate is an entry in the signing key update output kafka log
-type OutputCrossSigningKeyUpdate struct {
- CrossSigningKeyUpdate `json:"signing_keys"`
-}
diff --git a/eduserver/api/types.go b/eduserver/api/types.go
deleted file mode 100644
index a207580f..00000000
--- a/eduserver/api/types.go
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright 2021 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package api
-
-import "github.com/matrix-org/gomatrixserverlib"
-
-const (
- MSigningKeyUpdate = "m.signing_key_update"
-)
-
-type TypingEvent struct {
- Type string `json:"type"`
- RoomID string `json:"room_id"`
- UserID string `json:"user_id"`
- Typing bool `json:"typing"`
-}
-
-type ReceiptEvent struct {
- UserID string `json:"user_id"`
- RoomID string `json:"room_id"`
- EventID string `json:"event_id"`
- Type string `json:"type"`
- Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
-}
-
-type FederationReceiptMRead struct {
- User map[string]FederationReceiptData `json:"m.read"`
-}
-
-type FederationReceiptData struct {
- Data ReceiptTS `json:"data"`
- EventIDs []string `json:"event_ids"`
-}
-
-type ReceiptMRead struct {
- User map[string]ReceiptTS `json:"m.read"`
-}
-
-type ReceiptTS struct {
- TS gomatrixserverlib.Timestamp `json:"ts"`
-}
-
-type CrossSigningKeyUpdate struct {
- MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"`
- SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"`
- UserID string `json:"user_id"`
-}
diff --git a/eduserver/api/wrapper.go b/eduserver/api/wrapper.go
deleted file mode 100644
index 7907f4d3..00000000
--- a/eduserver/api/wrapper.go
+++ /dev/null
@@ -1,88 +0,0 @@
-// Copyright 2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package api
-
-import (
- "context"
- "encoding/json"
- "time"
-
- "github.com/matrix-org/gomatrixserverlib"
-)
-
-// SendTyping sends a typing event to EDU server
-func SendTyping(
- ctx context.Context, eduAPI EDUServerInputAPI, userID, roomID string,
- typing bool, timeoutMS int64,
-) error {
- requestData := InputTypingEvent{
- UserID: userID,
- RoomID: roomID,
- Typing: typing,
- TimeoutMS: timeoutMS,
- OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
- }
-
- var response InputTypingEventResponse
- err := eduAPI.InputTypingEvent(
- ctx, &InputTypingEventRequest{InputTypingEvent: requestData}, &response,
- )
-
- return err
-}
-
-// SendToDevice sends a typing event to EDU server
-func SendToDevice(
- ctx context.Context, eduAPI EDUServerInputAPI, sender, userID, deviceID, eventType string,
- message interface{},
-) error {
- js, err := json.Marshal(message)
- if err != nil {
- return err
- }
- requestData := InputSendToDeviceEvent{
- UserID: userID,
- DeviceID: deviceID,
- SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
- Sender: sender,
- Type: eventType,
- Content: js,
- },
- }
- request := InputSendToDeviceEventRequest{
- InputSendToDeviceEvent: requestData,
- }
- response := InputSendToDeviceEventResponse{}
- return eduAPI.InputSendToDeviceEvent(ctx, &request, &response)
-}
-
-// SendReceipt sends a receipt event to EDU Server
-func SendReceipt(
- ctx context.Context,
- eduAPI EDUServerInputAPI, userID, roomID, eventID, receiptType string,
- timestamp gomatrixserverlib.Timestamp,
-) error {
- request := InputReceiptEventRequest{
- InputReceiptEvent: InputReceiptEvent{
- UserID: userID,
- RoomID: roomID,
- EventID: eventID,
- Type: receiptType,
- Timestamp: timestamp,
- },
- }
- response := InputReceiptEventResponse{}
- return eduAPI.InputReceiptEvent(ctx, &request, &response)
-}
diff --git a/eduserver/eduserver.go b/eduserver/eduserver.go
deleted file mode 100644
index 91208a40..00000000
--- a/eduserver/eduserver.go
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-// Copyright 2017-2018 New Vector Ltd
-// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package eduserver
-
-import (
- "github.com/gorilla/mux"
- "github.com/matrix-org/dendrite/eduserver/api"
- "github.com/matrix-org/dendrite/eduserver/cache"
- "github.com/matrix-org/dendrite/eduserver/input"
- "github.com/matrix-org/dendrite/eduserver/inthttp"
- "github.com/matrix-org/dendrite/setup/base"
- "github.com/matrix-org/dendrite/setup/jetstream"
- userapi "github.com/matrix-org/dendrite/userapi/api"
-)
-
-// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
-// on the given input API.
-func AddInternalRoutes(internalMux *mux.Router, inputAPI api.EDUServerInputAPI) {
- inthttp.AddRoutes(inputAPI, internalMux)
-}
-
-// NewInternalAPI returns a concerete implementation of the internal API. Callers
-// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
-func NewInternalAPI(
- base *base.BaseDendrite,
- eduCache *cache.EDUCache,
- userAPI userapi.UserInternalAPI,
-) api.EDUServerInputAPI {
- cfg := &base.Cfg.EDUServer
-
- js, _ := jetstream.Prepare(base.ProcessContext, &cfg.Matrix.JetStream)
-
- return &input.EDUServerInputAPI{
- Cache: eduCache,
- UserAPI: userAPI,
- JetStream: js,
- OutputTypingEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
- OutputSendToDeviceEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
- OutputReceiptEventTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
- ServerName: cfg.Matrix.ServerName,
- }
-}
diff --git a/eduserver/input/input.go b/eduserver/input/input.go
deleted file mode 100644
index e58f0dd3..00000000
--- a/eduserver/input/input.go
+++ /dev/null
@@ -1,198 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-// Copyright 2017-2018 New Vector Ltd
-// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package input
-
-import (
- "context"
- "encoding/json"
- "time"
-
- "github.com/matrix-org/dendrite/eduserver/api"
- "github.com/matrix-org/dendrite/eduserver/cache"
- userapi "github.com/matrix-org/dendrite/userapi/api"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/nats-io/nats.go"
- "github.com/sirupsen/logrus"
-)
-
-// EDUServerInputAPI implements api.EDUServerInputAPI
-type EDUServerInputAPI struct {
- // Cache to store the current typing members in each room.
- Cache *cache.EDUCache
- // The kafka topic to output new typing events to.
- OutputTypingEventTopic string
- // The kafka topic to output new send to device events to.
- OutputSendToDeviceEventTopic string
- // The kafka topic to output new receipt events to
- OutputReceiptEventTopic string
- // kafka producer
- JetStream nats.JetStreamContext
- // Internal user query API
- UserAPI userapi.UserInternalAPI
- // our server name
- ServerName gomatrixserverlib.ServerName
-}
-
-// InputTypingEvent implements api.EDUServerInputAPI
-func (t *EDUServerInputAPI) InputTypingEvent(
- ctx context.Context,
- request *api.InputTypingEventRequest,
- response *api.InputTypingEventResponse,
-) error {
- ite := &request.InputTypingEvent
- if ite.Typing {
- // user is typing, update our current state of users typing.
- expireTime := ite.OriginServerTS.Time().Add(
- time.Duration(ite.TimeoutMS) * time.Millisecond,
- )
- t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime)
- } else {
- t.Cache.RemoveUser(ite.UserID, ite.RoomID)
- }
-
- return t.sendTypingEvent(ite)
-}
-
-// InputTypingEvent implements api.EDUServerInputAPI
-func (t *EDUServerInputAPI) InputSendToDeviceEvent(
- ctx context.Context,
- request *api.InputSendToDeviceEventRequest,
- response *api.InputSendToDeviceEventResponse,
-) error {
- ise := &request.InputSendToDeviceEvent
- return t.sendToDeviceEvent(ise)
-}
-
-func (t *EDUServerInputAPI) sendTypingEvent(ite *api.InputTypingEvent) error {
- ev := &api.TypingEvent{
- Type: gomatrixserverlib.MTyping,
- RoomID: ite.RoomID,
- UserID: ite.UserID,
- Typing: ite.Typing,
- }
- ote := &api.OutputTypingEvent{
- Event: *ev,
- }
-
- if ev.Typing {
- expireTime := ite.OriginServerTS.Time().Add(
- time.Duration(ite.TimeoutMS) * time.Millisecond,
- )
- ote.ExpireTime = &expireTime
- }
-
- eventJSON, err := json.Marshal(ote)
- if err != nil {
- return err
- }
- logrus.WithFields(logrus.Fields{
- "room_id": ite.RoomID,
- "user_id": ite.UserID,
- "typing": ite.Typing,
- }).Tracef("Producing to topic '%s'", t.OutputTypingEventTopic)
-
- _, err = t.JetStream.PublishMsg(&nats.Msg{
- Subject: t.OutputTypingEventTopic,
- Header: nats.Header{},
- Data: eventJSON,
- })
- return err
-}
-
-func (t *EDUServerInputAPI) sendToDeviceEvent(ise *api.InputSendToDeviceEvent) error {
- devices := []string{}
- _, domain, err := gomatrixserverlib.SplitID('@', ise.UserID)
- if err != nil {
- return err
- }
-
- // If the event is targeted locally then we want to expand the wildcard
- // out into individual device IDs so that we can send them to each respective
- // device. If the event isn't targeted locally then we can't expand the
- // wildcard as we don't know about the remote devices, so instead we leave it
- // as-is, so that the federation sender can send it on with the wildcard intact.
- if domain == t.ServerName && ise.DeviceID == "*" {
- var res userapi.QueryDevicesResponse
- err = t.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{
- UserID: ise.UserID,
- }, &res)
- if err != nil {
- return err
- }
- for _, dev := range res.Devices {
- devices = append(devices, dev.ID)
- }
- } else {
- devices = append(devices, ise.DeviceID)
- }
-
- logrus.WithFields(logrus.Fields{
- "user_id": ise.UserID,
- "num_devices": len(devices),
- "type": ise.Type,
- }).Tracef("Producing to topic '%s'", t.OutputSendToDeviceEventTopic)
- for _, device := range devices {
- ote := &api.OutputSendToDeviceEvent{
- UserID: ise.UserID,
- DeviceID: device,
- SendToDeviceEvent: ise.SendToDeviceEvent,
- }
-
- eventJSON, err := json.Marshal(ote)
- if err != nil {
- logrus.WithError(err).Error("sendToDevice failed json.Marshal")
- return err
- }
-
- if _, err = t.JetStream.PublishMsg(&nats.Msg{
- Subject: t.OutputSendToDeviceEventTopic,
- Data: eventJSON,
- }); err != nil {
- logrus.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
- return err
- }
- }
-
- return nil
-}
-
-// InputReceiptEvent implements api.EDUServerInputAPI
-// TODO: Intelligently batch requests sent by the same user (e.g wait a few milliseconds before emitting output events)
-func (t *EDUServerInputAPI) InputReceiptEvent(
- ctx context.Context,
- request *api.InputReceiptEventRequest,
- response *api.InputReceiptEventResponse,
-) error {
- logrus.WithFields(logrus.Fields{}).Tracef("Producing to topic '%s'", t.OutputReceiptEventTopic)
- output := &api.OutputReceiptEvent{
- UserID: request.InputReceiptEvent.UserID,
- RoomID: request.InputReceiptEvent.RoomID,
- EventID: request.InputReceiptEvent.EventID,
- Type: request.InputReceiptEvent.Type,
- Timestamp: request.InputReceiptEvent.Timestamp,
- }
- js, err := json.Marshal(output)
- if err != nil {
- return err
- }
-
- _, err = t.JetStream.PublishMsg(&nats.Msg{
- Subject: t.OutputReceiptEventTopic,
- Data: js,
- })
- return err
-}
diff --git a/eduserver/inthttp/client.go b/eduserver/inthttp/client.go
deleted file mode 100644
index 0690ed82..00000000
--- a/eduserver/inthttp/client.go
+++ /dev/null
@@ -1,70 +0,0 @@
-package inthttp
-
-import (
- "context"
- "errors"
- "net/http"
-
- "github.com/matrix-org/dendrite/eduserver/api"
- "github.com/matrix-org/dendrite/internal/httputil"
- "github.com/opentracing/opentracing-go"
-)
-
-// HTTP paths for the internal HTTP APIs
-const (
- EDUServerInputTypingEventPath = "/eduserver/input"
- EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
- EDUServerInputReceiptEventPath = "/eduserver/receipt"
-)
-
-// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
-func NewEDUServerClient(eduServerURL string, httpClient *http.Client) (api.EDUServerInputAPI, error) {
- if httpClient == nil {
- return nil, errors.New("NewEDUServerClient: httpClient is <nil>")
- }
- return &httpEDUServerInputAPI{eduServerURL, httpClient}, nil
-}
-
-type httpEDUServerInputAPI struct {
- eduServerURL string
- httpClient *http.Client
-}
-
-// InputTypingEvent implements EDUServerInputAPI
-func (h *httpEDUServerInputAPI) InputTypingEvent(
- ctx context.Context,
- request *api.InputTypingEventRequest,
- response *api.InputTypingEventResponse,
-) error {
- span, ctx := opentracing.StartSpanFromContext(ctx, "InputTypingEvent")
- defer span.Finish()
-
- apiURL := h.eduServerURL + EDUServerInputTypingEventPath
- return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
-}
-
-// InputSendToDeviceEvent implements EDUServerInputAPI
-func (h *httpEDUServerInputAPI) InputSendToDeviceEvent(
- ctx context.Context,
- request *api.InputSendToDeviceEventRequest,
- response *api.InputSendToDeviceEventResponse,
-) error {
- span, ctx := opentracing.StartSpanFromContext(ctx, "InputSendToDeviceEvent")
- defer span.Finish()
-
- apiURL := h.eduServerURL + EDUServerInputSendToDeviceEventPath
- return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
-}
-
-// InputSendToDeviceEvent implements EDUServerInputAPI
-func (h *httpEDUServerInputAPI) InputReceiptEvent(
- ctx context.Context,
- request *api.InputReceiptEventRequest,
- response *api.InputReceiptEventResponse,
-) error {
- span, ctx := opentracing.StartSpanFromContext(ctx, "InputReceiptEventPath")
- defer span.Finish()
-
- apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
- return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
-}
diff --git a/eduserver/inthttp/server.go b/eduserver/inthttp/server.go
deleted file mode 100644
index a3494375..00000000
--- a/eduserver/inthttp/server.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package inthttp
-
-import (
- "encoding/json"
- "net/http"
-
- "github.com/gorilla/mux"
- "github.com/matrix-org/dendrite/eduserver/api"
- "github.com/matrix-org/dendrite/internal/httputil"
- "github.com/matrix-org/util"
-)
-
-// AddRoutes adds the EDUServerInputAPI handlers to the http.ServeMux.
-func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
- internalAPIMux.Handle(EDUServerInputTypingEventPath,
- httputil.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse {
- var request api.InputTypingEventRequest
- var response api.InputTypingEventResponse
- if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
- return util.MessageResponse(http.StatusBadRequest, err.Error())
- }
- if err := t.InputTypingEvent(req.Context(), &request, &response); err != nil {
- return util.ErrorResponse(err)
- }
- return util.JSONResponse{Code: http.StatusOK, JSON: &response}
- }),
- )
- internalAPIMux.Handle(EDUServerInputSendToDeviceEventPath,
- httputil.MakeInternalAPI("inputSendToDeviceEvents", func(req *http.Request) util.JSONResponse {
- var request api.InputSendToDeviceEventRequest
- var response api.InputSendToDeviceEventResponse
- if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
- return util.MessageResponse(http.StatusBadRequest, err.Error())
- }
- if err := t.InputSendToDeviceEvent(req.Context(), &request, &response); err != nil {
- return util.ErrorResponse(err)
- }
- return util.JSONResponse{Code: http.StatusOK, JSON: &response}
- }),
- )
- internalAPIMux.Handle(EDUServerInputReceiptEventPath,
- httputil.MakeInternalAPI("inputReceiptEvent", func(req *http.Request) util.JSONResponse {
- var request api.InputReceiptEventRequest
- var response api.InputReceiptEventResponse
- if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
- return util.MessageResponse(http.StatusBadRequest, err.Error())
- }
- if err := t.InputReceiptEvent(req.Context(), &request, &response); err != nil {
- return util.ErrorResponse(err)
- }
- return util.JSONResponse{Code: http.StatusOK, JSON: &response}
- }),
- )
-}
diff --git a/federationapi/consumers/eduserver.go b/federationapi/consumers/eduserver.go
deleted file mode 100644
index e14e60f4..00000000
--- a/federationapi/consumers/eduserver.go
+++ /dev/null
@@ -1,257 +0,0 @@
-// Copyright 2020 The Matrix.org Foundation C.I.C.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package consumers
-
-import (
- "context"
- "encoding/json"
-
- "github.com/matrix-org/dendrite/eduserver/api"
- "github.com/matrix-org/dendrite/federationapi/queue"
- "github.com/matrix-org/dendrite/federationapi/storage"
- "github.com/matrix-org/dendrite/setup/config"
- "github.com/matrix-org/dendrite/setup/jetstream"
- "github.com/matrix-org/dendrite/setup/process"
- "github.com/matrix-org/gomatrixserverlib"
- "github.com/matrix-org/util"
- "github.com/nats-io/nats.go"
- log "github.com/sirupsen/logrus"
-)
-
-// OutputEDUConsumer consumes events that originate in EDU server.
-type OutputEDUConsumer struct {
- ctx context.Context
- jetstream nats.JetStreamContext
- durable string
- db storage.Database
- queues *queue.OutgoingQueues
- ServerName gomatrixserverlib.ServerName
- typingTopic string
- sendToDeviceTopic string
- receiptTopic string
-}
-
-// NewOutputEDUConsumer creates a new OutputEDUConsumer. Call Start() to begin consuming from EDU servers.
-func NewOutputEDUConsumer(
- process *process.ProcessContext,
- cfg *config.FederationAPI,
- js nats.JetStreamContext,
- queues *queue.OutgoingQueues,
- store storage.Database,
-) *OutputEDUConsumer {
- return &OutputEDUConsumer{
- ctx: process.Context(),
- jetstream: js,
- queues: queues,
- db: store,
- ServerName: cfg.Matrix.ServerName,
- durable: cfg.Matrix.JetStream.Durable("FederationAPIEDUServerConsumer"),
- typingTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
- sendToDeviceTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
- receiptTopic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
- }
-}
-
-// Start consuming from EDU servers
-func (t *OutputEDUConsumer) Start() error {
- if err := jetstream.JetStreamConsumer(
- t.ctx, t.jetstream, t.typingTopic, t.durable, t.onTypingEvent,
- nats.DeliverAll(), nats.ManualAck(),
- ); err != nil {
- return err
- }
- if err := jetstream.JetStreamConsumer(
- t.ctx, t.jetstream, t.sendToDeviceTopic, t.durable, t.onSendToDeviceEvent,
- nats.DeliverAll(), nats.ManualAck(),
- ); err != nil {
- return err
- }
- if err := jetstream.JetStreamConsumer(
- t.ctx, t.jetstream, t.receiptTopic, t.durable, t.onReceiptEvent,
- nats.DeliverAll(), nats.ManualAck(),
- ); err != nil {
- return err
- }
- return nil
-}
-
-// onSendToDeviceEvent is called in response to a message received on the
-// send-to-device events topic from the EDU server.
-func (t *OutputEDUConsumer) onSendToDeviceEvent(ctx context.Context, msg *nats.Msg) bool {
- // Extract the send-to-device event from msg.
- var ote api.OutputSendToDeviceEvent
- if err := json.Unmarshal(msg.Data, &ote); err != nil {
- log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)")
- return true
- }
-
- // only send send-to-device events which originated from us
- _, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender)
- if err != nil {
- log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender")
- return true
- }
- if originServerName != t.ServerName {
- log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere")
- return true
- }
-
- _, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
- if err != nil {
- log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination")
- return true
- }
-
- // Pack the EDU and marshal it
- edu := &gomatrixserverlib.EDU{
- Type: gomatrixserverlib.MDirectToDevice,
- Origin: string(t.ServerName),
- }
- tdm := gomatrixserverlib.ToDeviceMessage{
- Sender: ote.Sender,
- Type: ote.Type,
- MessageID: util.RandomString(32),
- Messages: map[string]map[string]json.RawMessage{
- ote.UserID: {
- ote.DeviceID: ote.Content,
- },
- },
- }
- if edu.Content, err = json.Marshal(tdm); err != nil {
- log.WithError(err).Error("failed to marshal EDU JSON")
- return true
- }
-
- log.Debugf("Sending send-to-device message into %q destination queue", destServerName)
- if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
- log.WithError(err).Error("failed to send EDU")
- return false
- }
-
- return true
-}
-
-// onTypingEvent is called in response to a message received on the typing
-// events topic from the EDU server.
-func (t *OutputEDUConsumer) onTypingEvent(ctx context.Context, msg *nats.Msg) bool {
- // Extract the typing event from msg.
- var ote api.OutputTypingEvent
- if err := json.Unmarshal(msg.Data, &ote); err != nil {
- // Skip this msg but continue processing messages.
- log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)")
- _ = msg.Ack()
- return true
- }
-
- // only send typing events which originated from us
- _, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
- if err != nil {
- log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
- _ = msg.Ack()
- return true
- }
- if typingServerName != t.ServerName {
- return true
- }
-
- joined, err := t.db.GetJoinedHosts(ctx, ote.Event.RoomID)
- if err != nil {
- log.WithError(err).WithField("room_id", ote.Event.RoomID).Error("failed to get joined hosts for room")
- return false
- }
-
- names := make([]gomatrixserverlib.ServerName, len(joined))
- for i := range joined {
- names[i] = joined[i].ServerName
- }
-
- edu := &gomatrixserverlib.EDU{Type: ote.Event.Type}
- if edu.Content, err = json.Marshal(map[string]interface{}{
- "room_id": ote.Event.RoomID,
- "user_id": ote.Event.UserID,
- "typing": ote.Event.Typing,
- }); err != nil {
- log.WithError(err).Error("failed to marshal EDU JSON")
- return true
- }
-
- if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
- log.WithError(err).Error("failed to send EDU")
- return false
- }
-
- return true
-}
-
-// onReceiptEvent is called in response to a message received on the receipt
-// events topic from the EDU server.
-func (t *OutputEDUConsumer) onReceiptEvent(ctx context.Context, msg *nats.Msg) bool {
- // Extract the typing event from msg.
- var receipt api.OutputReceiptEvent
- if err := json.Unmarshal(msg.Data, &receipt); err != nil {
- // Skip this msg but continue processing messages.
- log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)")
- return true
- }
-
- // only send receipt events which originated from us
- _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
- if err != nil {
- log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
- return true
- }
- if receiptServerName != t.ServerName {
- return true
- }
-
- joined, err := t.db.GetJoinedHosts(ctx, receipt.RoomID)
- if err != nil {
- log.WithError(err).WithField("room_id", receipt.RoomID).Error("failed to get joined hosts for room")
- return false
- }
-
- names := make([]gomatrixserverlib.ServerName, len(joined))
- for i := range joined {
- names[i] = joined[i].ServerName
- }
-
- content := map[string]api.FederationReceiptMRead{}
- content[receipt.RoomID] = api.FederationReceiptMRead{
- User: map[string]api.FederationReceiptData{
- receipt.UserID: {
- Data: api.ReceiptTS{
- TS: receipt.Timestamp,
- },
- EventIDs: []string{receipt.EventID},
- },
- },
- }
-
- edu := &gomatrixserverlib.EDU{
- Type: gomatrixserverlib.MReceipt,
- Origin: string(t.ServerName),
- }
- if edu.Content, err = json.Marshal(content); err != nil {
- log.WithError(err).Error("failed to marshal EDU JSON")
- return true
- }
-
- if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
- log.WithError(err).Error("failed to send EDU")
- return false
- }
-
- return true
-}
diff --git a/federationapi/consumers/keychange.go b/federationapi/consumers/keychange.go
index 94e45435..0ece18e9 100644
--- a/federationapi/consumers/keychange.go
+++ b/federationapi/consumers/keychange.go
@@ -18,9 +18,9 @@ import (
"context"
"encoding/json"
- eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/storage"
+ "github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
@@ -190,7 +190,7 @@ func (t *KeyChangeConsumer) onCrossSigningMessage(m api.DeviceMessage) bool {
// Pack the EDU and marshal it
edu := &gomatrixserverlib.EDU{
- Type: eduserverAPI.MSigningKeyUpdate,
+ Type: types.MSigningKeyUpdate,
Origin: string(t.serverName),
}
if edu.Content, err = json.Marshal(output); err != nil {
diff --git a/federationapi/consumers/receipts.go b/federationapi/consumers/receipts.go
new file mode 100644
index 00000000..9300451e
--- /dev/null
+++ b/federationapi/consumers/receipts.go
@@ -0,0 +1,141 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+ "strconv"
+
+ "github.com/getsentry/sentry-go"
+ "github.com/matrix-org/dendrite/federationapi/queue"
+ "github.com/matrix-org/dendrite/federationapi/storage"
+ fedTypes "github.com/matrix-org/dendrite/federationapi/types"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ syncTypes "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+// OutputReceiptConsumer consumes events that originate in the clientapi.
+type OutputReceiptConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ db storage.Database
+ queues *queue.OutgoingQueues
+ ServerName gomatrixserverlib.ServerName
+ topic string
+}
+
+// NewOutputReceiptConsumer creates a new OutputReceiptConsumer. Call Start() to begin consuming typing events.
+func NewOutputReceiptConsumer(
+ process *process.ProcessContext,
+ cfg *config.FederationAPI,
+ js nats.JetStreamContext,
+ queues *queue.OutgoingQueues,
+ store storage.Database,
+) *OutputReceiptConsumer {
+ return &OutputReceiptConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ queues: queues,
+ db: store,
+ ServerName: cfg.Matrix.ServerName,
+ durable: cfg.Matrix.JetStream.Durable("FederationAPIReceiptConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+ }
+}
+
+// Start consuming from the clientapi
+func (t *OutputReceiptConsumer) Start() error {
+ return jetstream.JetStreamConsumer(
+ t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
+ nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
+ )
+}
+
+// onMessage is called in response to a message received on the receipt
+// events topic from the client api.
+func (t *OutputReceiptConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+ receipt := syncTypes.OutputReceiptEvent{
+ UserID: msg.Header.Get(jetstream.UserID),
+ RoomID: msg.Header.Get(jetstream.RoomID),
+ EventID: msg.Header.Get(jetstream.EventID),
+ Type: msg.Header.Get("type"),
+ }
+
+ // only send receipt events which originated from us
+ _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID)
+ if err != nil {
+ log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender")
+ return true
+ }
+ if receiptServerName != t.ServerName {
+ return true
+ }
+
+ timestamp, err := strconv.Atoi(msg.Header.Get("timestamp"))
+ if err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("EDU output log: message parse failure")
+ sentry.CaptureException(err)
+ return true
+ }
+
+ receipt.Timestamp = gomatrixserverlib.Timestamp(timestamp)
+
+ joined, err := t.db.GetJoinedHosts(ctx, receipt.RoomID)
+ if err != nil {
+ log.WithError(err).WithField("room_id", receipt.RoomID).Error("failed to get joined hosts for room")
+ return false
+ }
+
+ names := make([]gomatrixserverlib.ServerName, len(joined))
+ for i := range joined {
+ names[i] = joined[i].ServerName
+ }
+
+ content := map[string]fedTypes.FederationReceiptMRead{}
+ content[receipt.RoomID] = fedTypes.FederationReceiptMRead{
+ User: map[string]fedTypes.FederationReceiptData{
+ receipt.UserID: {
+ Data: fedTypes.ReceiptTS{
+ TS: receipt.Timestamp,
+ },
+ EventIDs: []string{receipt.EventID},
+ },
+ },
+ }
+
+ edu := &gomatrixserverlib.EDU{
+ Type: gomatrixserverlib.MReceipt,
+ Origin: string(t.ServerName),
+ }
+ if edu.Content, err = json.Marshal(content); err != nil {
+ log.WithError(err).Error("failed to marshal EDU JSON")
+ return true
+ }
+
+ if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
+ log.WithError(err).Error("failed to send EDU")
+ return false
+ }
+
+ return true
+}
diff --git a/federationapi/consumers/sendtodevice.go b/federationapi/consumers/sendtodevice.go
new file mode 100644
index 00000000..84c9f620
--- /dev/null
+++ b/federationapi/consumers/sendtodevice.go
@@ -0,0 +1,125 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+
+ "github.com/matrix-org/dendrite/federationapi/queue"
+ "github.com/matrix-org/dendrite/federationapi/storage"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ syncTypes "github.com/matrix-org/dendrite/syncapi/types"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/matrix-org/util"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+// OutputSendToDeviceConsumer consumes events that originate in the clientapi.
+type OutputSendToDeviceConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ db storage.Database
+ queues *queue.OutgoingQueues
+ ServerName gomatrixserverlib.ServerName
+ topic string
+}
+
+// NewOutputSendToDeviceConsumer creates a new OutputSendToDeviceConsumer. Call Start() to begin consuming send-to-device events.
+func NewOutputSendToDeviceConsumer(
+ process *process.ProcessContext,
+ cfg *config.FederationAPI,
+ js nats.JetStreamContext,
+ queues *queue.OutgoingQueues,
+ store storage.Database,
+) *OutputSendToDeviceConsumer {
+ return &OutputSendToDeviceConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ queues: queues,
+ db: store,
+ ServerName: cfg.Matrix.ServerName,
+ durable: cfg.Matrix.JetStream.Durable("FederationAPIESendToDeviceConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ }
+}
+
+// Start consuming from the client api
+func (t *OutputSendToDeviceConsumer) Start() error {
+ return jetstream.JetStreamConsumer(
+ t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
+ nats.DeliverAll(), nats.ManualAck(),
+ )
+}
+
+// onMessage is called in response to a message received on the
+// send-to-device events topic from the client api.
+func (t *OutputSendToDeviceConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+ // only send send-to-device events which originated from us
+ sender := msg.Header.Get("sender")
+ _, originServerName, err := gomatrixserverlib.SplitID('@', sender)
+ if err != nil {
+ log.WithError(err).WithField("user_id", sender).Error("Failed to extract domain from send-to-device sender")
+ return true
+ }
+ if originServerName != t.ServerName {
+ log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere")
+ return true
+ }
+ // Extract the send-to-device event from msg.
+ var ote syncTypes.OutputSendToDeviceEvent
+ if err = json.Unmarshal(msg.Data, &ote); err != nil {
+ log.WithError(err).Errorf("output log: message parse failed (expected send-to-device)")
+ return true
+ }
+
+ _, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID)
+ if err != nil {
+ log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination")
+ return true
+ }
+
+ // Pack the EDU and marshal it
+ edu := &gomatrixserverlib.EDU{
+ Type: gomatrixserverlib.MDirectToDevice,
+ Origin: string(t.ServerName),
+ }
+ tdm := gomatrixserverlib.ToDeviceMessage{
+ Sender: ote.Sender,
+ Type: ote.Type,
+ MessageID: util.RandomString(32),
+ Messages: map[string]map[string]json.RawMessage{
+ ote.UserID: {
+ ote.DeviceID: ote.Content,
+ },
+ },
+ }
+ if edu.Content, err = json.Marshal(tdm); err != nil {
+ log.WithError(err).Error("failed to marshal EDU JSON")
+ return true
+ }
+
+ log.Debugf("Sending send-to-device message into %q destination queue", destServerName)
+ if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil {
+ log.WithError(err).Error("failed to send EDU")
+ return false
+ }
+
+ return true
+}
diff --git a/federationapi/consumers/typing.go b/federationapi/consumers/typing.go
new file mode 100644
index 00000000..428e1a86
--- /dev/null
+++ b/federationapi/consumers/typing.go
@@ -0,0 +1,119 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumers
+
+import (
+ "context"
+ "encoding/json"
+ "strconv"
+
+ "github.com/matrix-org/dendrite/federationapi/queue"
+ "github.com/matrix-org/dendrite/federationapi/storage"
+ "github.com/matrix-org/dendrite/setup/config"
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+// OutputTypingConsumer consumes events that originate in the clientapi.
+type OutputTypingConsumer struct {
+ ctx context.Context
+ jetstream nats.JetStreamContext
+ durable string
+ db storage.Database
+ queues *queue.OutgoingQueues
+ ServerName gomatrixserverlib.ServerName
+ topic string
+}
+
+// NewOutputTypingConsumer creates a new OutputTypingConsumer. Call Start() to begin consuming typing events.
+func NewOutputTypingConsumer(
+ process *process.ProcessContext,
+ cfg *config.FederationAPI,
+ js nats.JetStreamContext,
+ queues *queue.OutgoingQueues,
+ store storage.Database,
+) *OutputTypingConsumer {
+ return &OutputTypingConsumer{
+ ctx: process.Context(),
+ jetstream: js,
+ queues: queues,
+ db: store,
+ ServerName: cfg.Matrix.ServerName,
+ durable: cfg.Matrix.JetStream.Durable("FederationAPITypingConsumer"),
+ topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ }
+}
+
+// Start consuming from the clientapi
+func (t *OutputTypingConsumer) Start() error {
+ return jetstream.JetStreamConsumer(
+ t.ctx, t.jetstream, t.topic, t.durable, t.onMessage,
+ nats.DeliverAll(), nats.ManualAck(), nats.HeadersOnly(),
+ )
+}
+
+// onMessage is called in response to a message received on the typing
+// events topic from the client api.
+func (t *OutputTypingConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
+ // Extract the typing event from msg.
+ roomID := msg.Header.Get(jetstream.RoomID)
+ userID := msg.Header.Get(jetstream.UserID)
+ typing, err := strconv.ParseBool(msg.Header.Get("typing"))
+ if err != nil {
+ log.WithError(err).Errorf("EDU output log: typing parse failure")
+ return true
+ }
+
+ // only send typing events which originated from us
+ _, typingServerName, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ log.WithError(err).WithField("user_id", userID).Error("Failed to extract domain from typing sender")
+ _ = msg.Ack()
+ return true
+ }
+ if typingServerName != t.ServerName {
+ return true
+ }
+
+ joined, err := t.db.GetJoinedHosts(ctx, roomID)
+ if err != nil {
+ log.WithError(err).WithField("room_id", roomID).Error("failed to get joined hosts for room")
+ return false
+ }
+
+ names := make([]gomatrixserverlib.ServerName, len(joined))
+ for i := range joined {
+ names[i] = joined[i].ServerName
+ }
+
+ edu := &gomatrixserverlib.EDU{Type: "m.typing"}
+ if edu.Content, err = json.Marshal(map[string]interface{}{
+ "room_id": roomID,
+ "user_id": userID,
+ "typing": typing,
+ }); err != nil {
+ log.WithError(err).Error("failed to marshal EDU JSON")
+ return true
+ }
+ if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil {
+ log.WithError(err).Error("failed to send EDU")
+ return false
+ }
+
+ return true
+}
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go
index b7f93ecb..8a0ce8e3 100644
--- a/federationapi/federationapi.go
+++ b/federationapi/federationapi.go
@@ -16,12 +16,12 @@ package federationapi
import (
"github.com/gorilla/mux"
- eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/federationapi/consumers"
"github.com/matrix-org/dendrite/federationapi/internal"
"github.com/matrix-org/dendrite/federationapi/inthttp"
+ "github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/federationapi/queue"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage"
@@ -31,6 +31,7 @@ import (
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/setup/process"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/sirupsen/logrus"
@@ -46,6 +47,7 @@ func AddInternalRoutes(router *mux.Router, intAPI api.FederationInternalAPI) {
// AddPublicRoutes sets up and registers HTTP handlers on the base API muxes for the FederationAPI component.
func AddPublicRoutes(
+ process *process.ProcessContext,
fedRouter, keyRouter, wellKnownRouter *mux.Router,
cfg *config.FederationAPI,
userAPI userapi.UserInternalAPI,
@@ -53,16 +55,26 @@ func AddPublicRoutes(
keyRing gomatrixserverlib.JSONVerifier,
rsAPI roomserverAPI.RoomserverInternalAPI,
federationAPI federationAPI.FederationInternalAPI,
- eduAPI eduserverAPI.EDUServerInputAPI,
keyAPI keyserverAPI.KeyInternalAPI,
mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider,
) {
+
+ js, _ := jetstream.Prepare(process, &cfg.Matrix.JetStream)
+ producer := &producers.SyncAPIProducer{
+ JetStream: js,
+ TopicReceiptEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
+ TopicSendToDeviceEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
+ TopicTypingEvent: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
+ ServerName: cfg.Matrix.ServerName,
+ UserAPI: userAPI,
+ }
+
routing.Setup(
fedRouter, keyRouter, wellKnownRouter, cfg, rsAPI,
- eduAPI, federationAPI, keyRing,
+ federationAPI, keyRing,
federation, userAPI, keyAPI, mscCfg,
- servers,
+ servers, producer,
)
}
@@ -112,17 +124,28 @@ func NewInternalAPI(
if err = rsConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start room server consumer")
}
-
- tsConsumer := consumers.NewOutputEDUConsumer(
+ tsConsumer := consumers.NewOutputSendToDeviceConsumer(
+ base.ProcessContext, cfg, js, queues, federationDB,
+ )
+ if err = tsConsumer.Start(); err != nil {
+ logrus.WithError(err).Panic("failed to start send-to-device consumer")
+ }
+ receiptConsumer := consumers.NewOutputReceiptConsumer(
+ base.ProcessContext, cfg, js, queues, federationDB,
+ )
+ if err = receiptConsumer.Start(); err != nil {
+ logrus.WithError(err).Panic("failed to start receipt consumer")
+ }
+ typingConsumer := consumers.NewOutputTypingConsumer(
base.ProcessContext, cfg, js, queues, federationDB,
)
- if err := tsConsumer.Start(); err != nil {
- logrus.WithError(err).Panic("failed to start typing server consumer")
+ if err = typingConsumer.Start(); err != nil {
+ logrus.WithError(err).Panic("failed to start typing consumer")
}
keyConsumer := consumers.NewKeyChangeConsumer(
base.ProcessContext, &base.Cfg.KeyServer, js, queues, federationDB, rsAPI,
)
- if err := keyConsumer.Start(); err != nil {
+ if err = keyConsumer.Start(); err != nil {
logrus.WithError(err).Panic("failed to start key server consumer")
}
diff --git a/federationapi/federationapi_test.go b/federationapi/federationapi_test.go
index c660f12e..833359c1 100644
--- a/federationapi/federationapi_test.go
+++ b/federationapi/federationapi_test.go
@@ -30,7 +30,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
fsAPI := base.FederationAPIHTTPClient()
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
// Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing.
- federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, &cfg.MSCs, nil)
+ federationapi.AddPublicRoutes(base.ProcessContext, base.PublicFederationAPIMux, base.PublicKeyAPIMux, base.PublicWellKnownAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, &cfg.MSCs, nil)
baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true)
defer cancel()
serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://"))
diff --git a/federationapi/producers/syncapi.go b/federationapi/producers/syncapi.go
new file mode 100644
index 00000000..24acb126
--- /dev/null
+++ b/federationapi/producers/syncapi.go
@@ -0,0 +1,144 @@
+// Copyright 2022 The Matrix.org Foundation C.I.C.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package producers
+
+import (
+ "context"
+ "encoding/json"
+ "strconv"
+
+ "github.com/matrix-org/dendrite/setup/jetstream"
+ "github.com/matrix-org/dendrite/syncapi/types"
+ userapi "github.com/matrix-org/dendrite/userapi/api"
+ "github.com/matrix-org/gomatrixserverlib"
+ "github.com/nats-io/nats.go"
+ log "github.com/sirupsen/logrus"
+)
+
+// SyncAPIProducer produces events for the sync API server to consume
+type SyncAPIProducer struct {
+ TopicReceiptEvent string
+ TopicSendToDeviceEvent string
+ TopicTypingEvent string
+ JetStream nats.JetStreamContext
+ ServerName gomatrixserverlib.ServerName
+ UserAPI userapi.UserInternalAPI
+}
+
+func (p *SyncAPIProducer) SendReceipt(
+ ctx context.Context,
+ userID, roomID, eventID, receiptType string, timestamp gomatrixserverlib.Timestamp,
+) error {
+ m := &nats.Msg{
+ Subject: p.TopicReceiptEvent,
+ Header: nats.Header{},
+ }
+ m.Header.Set(jetstream.UserID, userID)
+ m.Header.Set(jetstream.RoomID, roomID)
+ m.Header.Set(jetstream.EventID, eventID)
+ m.Header.Set("type", receiptType)
+ m.Header.Set("timestamp", strconv.Itoa(int(timestamp)))
+
+ log.WithFields(log.Fields{}).Tracef("Producing to topic '%s'", p.TopicReceiptEvent)
+ _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
+ return err
+}
+
+func (p *SyncAPIProducer) SendToDevice(
+ ctx context.Context, sender, userID, deviceID, eventType string,
+ message interface{},
+) error {
+ devices := []string{}
+ _, domain, err := gomatrixserverlib.SplitID('@', userID)
+ if err != nil {
+ return err
+ }
+
+ // If the event is targeted locally then we want to expand the wildcard
+ // out into individual device IDs so that we can send them to each respective
+ // device. If the event isn't targeted locally then we can't expand the
+ // wildcard as we don't know about the remote devices, so instead we leave it
+ // as-is, so that the federation sender can send it on with the wildcard intact.
+ if domain == p.ServerName && deviceID == "*" {
+ var res userapi.QueryDevicesResponse
+ err = p.UserAPI.QueryDevices(context.TODO(), &userapi.QueryDevicesRequest{
+ UserID: userID,
+ }, &res)
+ if err != nil {
+ return err
+ }
+ for _, dev := range res.Devices {
+ devices = append(devices, dev.ID)
+ }
+ } else {
+ devices = append(devices, deviceID)
+ }
+
+ js, err := json.Marshal(message)
+ if err != nil {
+ return err
+ }
+
+ log.WithFields(log.Fields{
+ "user_id": userID,
+ "num_devices": len(devices),
+ "type": eventType,
+ }).Tracef("Producing to topic '%s'", p.TopicSendToDeviceEvent)
+ for _, device := range devices {
+ ote := &types.OutputSendToDeviceEvent{
+ UserID: userID,
+ DeviceID: device,
+ SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
+ Sender: sender,
+ Type: eventType,
+ Content: js,
+ },
+ }
+
+ eventJSON, err := json.Marshal(ote)
+ if err != nil {
+ log.WithError(err).Error("sendToDevice failed json.Marshal")
+ return err
+ }
+ m := &nats.Msg{
+ Subject: p.TopicSendToDeviceEvent,
+ Data: eventJSON,
+ Header: nats.Header{},
+ }
+ m.Header.Set("sender", sender)
+ m.Header.Set(jetstream.UserID, userID)
+
+ if _, err = p.JetStream.PublishMsg(m, nats.Context(ctx)); err != nil {
+ log.WithError(err).Error("sendToDevice failed t.Producer.SendMessage")
+ return err
+ }
+ }
+ return nil
+}
+
+func (p *SyncAPIProducer) SendTyping(
+ ctx context.Context, userID, roomID string, typing bool, timeoutMS int64,
+) error {
+ m := &nats.Msg{
+ Subject: p.TopicTypingEvent,
+ Header: nats.Header{},
+ }
+ m.Header.Set(jetstream.UserID, userID)
+ m.Header.Set(jetstream.RoomID, roomID)
+ m.Header.Set("typing", strconv.FormatBool(typing))
+ m.Header.Set("timeout_ms", strconv.Itoa(int(timeoutMS)))
+ _, err := p.JetStream.PublishMsg(m, nats.Context(ctx))
+ return err
+}
diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go
index 04c88d95..9e5cdb28 100644
--- a/federationapi/routing/routing.go
+++ b/federationapi/routing/routing.go
@@ -19,8 +19,8 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
+ "github.com/matrix-org/dendrite/federationapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/httputil"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
@@ -44,7 +44,6 @@ func Setup(
fedMux, keyMux, wkMux *mux.Router,
cfg *config.FederationAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
- eduAPI eduserverAPI.EDUServerInputAPI,
fsAPI federationAPI.FederationInternalAPI,
keys gomatrixserverlib.JSONVerifier,
federation *gomatrixserverlib.FederationClient,
@@ -52,6 +51,7 @@ func Setup(
keyAPI keyserverAPI.KeyInternalAPI,
mscCfg *config.MSCs,
servers federationAPI.ServersInRoomProvider,
+ producer *producers.SyncAPIProducer,
) {
v2keysmux := keyMux.PathPrefix("/v2").Subrouter()
v1fedmux := fedMux.PathPrefix("/v1").Subrouter()
@@ -116,7 +116,7 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
- cfg, rsAPI, eduAPI, keyAPI, keys, federation, mu, servers,
+ cfg, rsAPI, keyAPI, keys, federation, mu, servers, producer,
)
},
)).Methods(http.MethodPut, http.MethodOptions)
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index 745e36de..eacc76db 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -23,8 +23,9 @@ import (
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
+ "github.com/matrix-org/dendrite/federationapi/producers"
+ "github.com/matrix-org/dendrite/federationapi/types"
"github.com/matrix-org/dendrite/internal"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -87,12 +88,12 @@ func Send(
txnID gomatrixserverlib.TransactionID,
cfg *config.FederationAPI,
rsAPI api.RoomserverInternalAPI,
- eduAPI eduserverAPI.EDUServerInputAPI,
keyAPI keyapi.KeyInternalAPI,
keys gomatrixserverlib.JSONVerifier,
federation *gomatrixserverlib.FederationClient,
mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider,
+ producer *producers.SyncAPIProducer,
) util.JSONResponse {
// First we should check if this origin has already submitted this
// txn ID to us. If they have and the txnIDs map contains an entry,
@@ -127,12 +128,12 @@ func Send(
t := txnReq{
rsAPI: rsAPI,
- eduAPI: eduAPI,
keys: keys,
federation: federation,
servers: servers,
keyAPI: keyAPI,
roomsMu: mu,
+ producer: producer,
}
var txnEvents struct {
@@ -185,12 +186,12 @@ func Send(
type txnReq struct {
gomatrixserverlib.Transaction
rsAPI api.RoomserverInternalAPI
- eduAPI eduserverAPI.EDUServerInputAPI
keyAPI keyapi.KeyInternalAPI
keys gomatrixserverlib.JSONVerifier
federation txnFederationClient
roomsMu *internal.MutexByRoom
servers federationAPI.ServersInRoomProvider
+ producer *producers.SyncAPIProducer
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
@@ -329,8 +330,8 @@ func (t *txnReq) processEDUs(ctx context.Context) {
util.GetLogger(ctx).Debugf("Dropping typing event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
continue
}
- if err := eduserverAPI.SendTyping(ctx, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
- util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to edu server")
+ if err := t.producer.SendTyping(ctx, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
+ util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to JetStream")
}
case gomatrixserverlib.MDirectToDevice:
// https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
@@ -342,12 +343,12 @@ func (t *txnReq) processEDUs(ctx context.Context) {
for userID, byUser := range directPayload.Messages {
for deviceID, message := range byUser {
// TODO: check that the user and the device actually exist here
- if err := eduserverAPI.SendToDevice(ctx, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
+ if err := t.producer.SendToDevice(ctx, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"sender": directPayload.Sender,
"user_id": userID,
"device_id": deviceID,
- }).Error("Failed to send send-to-device event to edu server")
+ }).Error("Failed to send send-to-device event to JetStream")
}
}
}
@@ -355,7 +356,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
t.processDeviceListUpdate(ctx, e)
case gomatrixserverlib.MReceipt:
// https://matrix.org/docs/spec/server_server/r0.1.4#receipts
- payload := map[string]eduserverAPI.FederationReceiptMRead{}
+ payload := map[string]types.FederationReceiptMRead{}
if err := json.Unmarshal(e.Content, &payload); err != nil {
util.GetLogger(ctx).WithError(err).Debug("Failed to unmarshal receipt event")
@@ -379,12 +380,12 @@ func (t *txnReq) processEDUs(ctx context.Context) {
"user_id": userID,
"room_id": roomID,
"events": mread.EventIDs,
- }).Error("Failed to send receipt event to edu server")
+ }).Error("Failed to send receipt event to JetStream")
continue
}
}
}
- case eduserverAPI.MSigningKeyUpdate:
+ case types.MSigningKeyUpdate:
if err := t.processSigningKeyUpdate(ctx, e); err != nil {
logrus.WithError(err).Errorf("Failed to process signing key update")
}
@@ -395,7 +396,7 @@ func (t *txnReq) processEDUs(ctx context.Context) {
}
func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverlib.EDU) error {
- var updatePayload eduserverAPI.CrossSigningKeyUpdate
+ var updatePayload keyapi.CrossSigningKeyUpdate
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"user_id": updatePayload.UserID,
@@ -422,7 +423,7 @@ func (t *txnReq) processSigningKeyUpdate(ctx context.Context, e gomatrixserverli
return nil
}
-// processReceiptEvent sends receipt events to the edu server
+// processReceiptEvent sends receipt events to JetStream
func (t *txnReq) processReceiptEvent(ctx context.Context,
userID, roomID, receiptType string,
timestamp gomatrixserverlib.Timestamp,
@@ -430,17 +431,7 @@ func (t *txnReq) processReceiptEvent(ctx context.Context,
) error {
// store every event
for _, eventID := range eventIDs {
- req := eduserverAPI.InputReceiptEventRequest{
- InputReceiptEvent: eduserverAPI.InputReceiptEvent{
- UserID: userID,
- RoomID: roomID,
- EventID: eventID,
- Type: receiptType,
- Timestamp: timestamp,
- },
- }
- resp := eduserverAPI.InputReceiptEventResponse{}
- if err := t.eduAPI.InputReceiptEvent(ctx, &req, &resp); err != nil {
+ if err := t.producer.SendReceipt(ctx, userID, roomID, eventID, receiptType, timestamp); err != nil {
return fmt.Errorf("unable to set receipt event: %w", err)
}
}
diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go
index 4280643e..8d2d8504 100644
--- a/federationapi/routing/send_test.go
+++ b/federationapi/routing/send_test.go
@@ -7,7 +7,6 @@ import (
"testing"
"time"
- eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/test"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -53,44 +52,6 @@ func init() {
}
}
-type testEDUProducer struct {
- // this producer keeps track of calls to InputTypingEvent
- invocations []eduAPI.InputTypingEventRequest
-}
-
-func (p *testEDUProducer) InputTypingEvent(
- ctx context.Context,
- request *eduAPI.InputTypingEventRequest,
- response *eduAPI.InputTypingEventResponse,
-) error {
- p.invocations = append(p.invocations, *request)
- return nil
-}
-
-func (p *testEDUProducer) InputSendToDeviceEvent(
- ctx context.Context,
- request *eduAPI.InputSendToDeviceEventRequest,
- response *eduAPI.InputSendToDeviceEventResponse,
-) error {
- return nil
-}
-
-func (o *testEDUProducer) InputReceiptEvent(
- ctx context.Context,
- request *eduAPI.InputReceiptEventRequest,
- response *eduAPI.InputReceiptEventResponse,
-) error {
- return nil
-}
-
-func (o *testEDUProducer) InputCrossSigningKeyUpdate(
- ctx context.Context,
- request *eduAPI.InputCrossSigningKeyUpdateRequest,
- response *eduAPI.InputCrossSigningKeyUpdateResponse,
-) error {
- return nil
-}
-
type testRoomserverAPI struct {
api.RoomserverInternalAPITrace
inputRoomEvents []api.InputRoomEvent
@@ -225,7 +186,6 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserver
func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq {
t := &txnReq{
rsAPI: rsAPI,
- eduAPI: &testEDUProducer{},
keys: &test.NopJSONVerifier{},
federation: fedClient,
roomsMu: internal.NewMutexByRoom(),
diff --git a/federationapi/types/types.go b/federationapi/types/types.go
index c486c05c..a28a80b2 100644
--- a/federationapi/types/types.go
+++ b/federationapi/types/types.go
@@ -18,6 +18,8 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
+const MSigningKeyUpdate = "m.signing_key_update" // TODO: move to gomatrixserverlib
+
// A JoinedHost is a server that is joined to a matrix room.
type JoinedHost struct {
// The MemberEventID of a m.room.member join event.
@@ -51,3 +53,16 @@ type InboundPeek struct {
RenewedTimestamp int64
RenewalInterval int64
}
+
+type FederationReceiptMRead struct {
+ User map[string]FederationReceiptData `json:"m.read"`
+}
+
+type FederationReceiptData struct {
+ Data ReceiptTS `json:"data"`
+ EventIDs []string `json:"event_ids"`
+}
+
+type ReceiptTS struct {
+ TS gomatrixserverlib.Timestamp `json:"ts"`
+}
diff --git a/eduserver/cache/cache.go b/internal/caching/cache_typing.go
index f637d7c9..bd6a5fc1 100644
--- a/eduserver/cache/cache.go
+++ b/internal/caching/cache_typing.go
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package cache
+package caching
import (
"sync"
@@ -53,8 +53,8 @@ func (t *EDUCache) newRoomData() *roomData {
}
}
-// New returns a new EDUCache initialised for use.
-func New() *EDUCache {
+// NewTypingCache returns a new EDUCache initialised for use.
+func NewTypingCache() *EDUCache {
return &EDUCache{data: make(map[string]*roomData)}
}
diff --git a/eduserver/cache/cache_test.go b/internal/caching/cache_typing_test.go
index c7d01879..c03d89bc 100644
--- a/eduserver/cache/cache_test.go
+++ b/internal/caching/cache_typing_test.go
@@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package cache
+package caching
import (
"testing"
@@ -24,9 +24,9 @@ import (
)
func TestEDUCache(t *testing.T) {
- tCache := New()
+ tCache := NewTypingCache()
if tCache == nil {
- t.Fatal("New failed")
+ t.Fatal("NewTypingCache failed")
}
t.Run("AddTypingUser", func(t *testing.T) {
diff --git a/internal/test/config.go b/internal/test/config.go
index 0372fb9c..0b0e897b 100644
--- a/internal/test/config.go
+++ b/internal/test/config.go
@@ -97,7 +97,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(database)
cfg.AppServiceAPI.InternalAPI.Listen = assignAddress()
- cfg.EDUServer.InternalAPI.Listen = assignAddress()
cfg.FederationAPI.InternalAPI.Listen = assignAddress()
cfg.KeyServer.InternalAPI.Listen = assignAddress()
cfg.MediaAPI.InternalAPI.Listen = assignAddress()
@@ -106,7 +105,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.UserAPI.InternalAPI.Listen = assignAddress()
cfg.AppServiceAPI.InternalAPI.Connect = cfg.AppServiceAPI.InternalAPI.Listen
- cfg.EDUServer.InternalAPI.Connect = cfg.EDUServer.InternalAPI.Listen
cfg.FederationAPI.InternalAPI.Connect = cfg.FederationAPI.InternalAPI.Listen
cfg.KeyServer.InternalAPI.Connect = cfg.KeyServer.InternalAPI.Listen
cfg.MediaAPI.InternalAPI.Connect = cfg.MediaAPI.InternalAPI.Listen
diff --git a/keyserver/api/api.go b/keyserver/api/api.go
index d361c622..429617b1 100644
--- a/keyserver/api/api.go
+++ b/keyserver/api/api.go
@@ -21,7 +21,6 @@ import (
"strings"
"time"
- eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/keyserver/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -66,14 +65,25 @@ const (
// DeviceMessage represents the message produced into Kafka by the key server.
type DeviceMessage struct {
- Type DeviceMessageType `json:"Type,omitempty"`
- *DeviceKeys `json:"DeviceKeys,omitempty"`
- *eduapi.OutputCrossSigningKeyUpdate `json:"CrossSigningKeyUpdate,omitempty"`
+ Type DeviceMessageType `json:"Type,omitempty"`
+ *DeviceKeys `json:"DeviceKeys,omitempty"`
+ *OutputCrossSigningKeyUpdate `json:"CrossSigningKeyUpdate,omitempty"`
// A monotonically increasing number which represents device changes for this user.
StreamID int64
DeviceChangeID int64
}
+// OutputCrossSigningKeyUpdate is an entry in the signing key update output kafka log
+type OutputCrossSigningKeyUpdate struct {
+ CrossSigningKeyUpdate `json:"signing_keys"`
+}
+
+type CrossSigningKeyUpdate struct {
+ MasterKey *gomatrixserverlib.CrossSigningKey `json:"master_key,omitempty"`
+ SelfSigningKey *gomatrixserverlib.CrossSigningKey `json:"self_signing_key,omitempty"`
+ UserID string `json:"user_id"`
+}
+
// DeviceKeysEqual returns true if the device keys updates contain the
// same display name and key JSON. This will return false if either of
// the updates is not a device keys update, or if the user ID/device ID
diff --git a/keyserver/internal/cross_signing.go b/keyserver/internal/cross_signing.go
index 5124f37e..0d083b4b 100644
--- a/keyserver/internal/cross_signing.go
+++ b/keyserver/internal/cross_signing.go
@@ -22,7 +22,6 @@ import (
"fmt"
"strings"
- eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/types"
"github.com/matrix-org/gomatrixserverlib"
@@ -246,7 +245,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.P
}
// Finally, generate a notification that we updated the keys.
- update := eduserverAPI.CrossSigningKeyUpdate{
+ update := api.CrossSigningKeyUpdate{
UserID: req.UserID,
}
if mk, ok := byPurpose[gomatrixserverlib.CrossSigningKeyPurposeMaster]; ok {
@@ -337,7 +336,7 @@ func (a *KeyInternalAPI) PerformUploadDeviceSignatures(ctx context.Context, req
for userID := range req.Signatures {
masterKey := queryRes.MasterKeys[userID]
selfSigningKey := queryRes.SelfSigningKeys[userID]
- update := eduserverAPI.CrossSigningKeyUpdate{
+ update := api.CrossSigningKeyUpdate{
UserID: userID,
MasterKey: &masterKey,
SelfSigningKey: &selfSigningKey,
diff --git a/keyserver/producers/keychange.go b/keyserver/producers/keychange.go
index 9e1c4c64..f86c3417 100644
--- a/keyserver/producers/keychange.go
+++ b/keyserver/producers/keychange.go
@@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
- eduapi "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/keyserver/storage"
"github.com/matrix-org/dendrite/setup/jetstream"
@@ -70,10 +69,10 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
return nil
}
-func (p *KeyChange) ProduceSigningKeyUpdate(key eduapi.CrossSigningKeyUpdate) error {
+func (p *KeyChange) ProduceSigningKeyUpdate(key api.CrossSigningKeyUpdate) error {
output := &api.DeviceMessage{
Type: api.TypeCrossSigningUpdate,
- OutputCrossSigningKeyUpdate: &eduapi.OutputCrossSigningKeyUpdate{
+ OutputCrossSigningKeyUpdate: &api.OutputCrossSigningKeyUpdate{
CrossSigningKeyUpdate: key,
},
}
diff --git a/setup/base/base.go b/setup/base/base.go
index 6135e080..43d613b0 100644
--- a/setup/base/base.go
+++ b/setup/base/base.go
@@ -45,8 +45,6 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
asinthttp "github.com/matrix-org/dendrite/appservice/inthttp"
- eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
- eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
federationIntHTTP "github.com/matrix-org/dendrite/federationapi/inthttp"
keyserverAPI "github.com/matrix-org/dendrite/keyserver/api"
@@ -247,15 +245,6 @@ func (b *BaseDendrite) UserAPIClient() userapi.UserInternalAPI {
return userAPI
}
-// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP
-func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI {
- e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.apiHttpClient)
- if err != nil {
- logrus.WithError(err).Panic("EDUServerClient failed", b.apiHttpClient)
- }
- return e
-}
-
// FederationAPIHTTPClient returns FederationInternalAPI for hitting
// the federation API server over HTTP
func (b *BaseDendrite) FederationAPIHTTPClient() federationAPI.FederationInternalAPI {
diff --git a/setup/config/config.go b/setup/config/config.go
index eb371a54..e03518e2 100644
--- a/setup/config/config.go
+++ b/setup/config/config.go
@@ -56,7 +56,6 @@ type Dendrite struct {
Global Global `yaml:"global"`
AppServiceAPI AppServiceAPI `yaml:"app_service_api"`
ClientAPI ClientAPI `yaml:"client_api"`
- EDUServer EDUServer `yaml:"edu_server"`
FederationAPI FederationAPI `yaml:"federation_api"`
KeyServer KeyServer `yaml:"key_server"`
MediaAPI MediaAPI `yaml:"media_api"`
@@ -296,7 +295,6 @@ func (c *Dendrite) Defaults(generate bool) {
c.Global.Defaults(generate)
c.ClientAPI.Defaults(generate)
- c.EDUServer.Defaults(generate)
c.FederationAPI.Defaults(generate)
c.KeyServer.Defaults(generate)
c.MediaAPI.Defaults(generate)
@@ -314,8 +312,7 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) {
Verify(configErrs *ConfigErrors, isMonolith bool)
}
for _, c := range []verifiable{
- &c.Global, &c.ClientAPI,
- &c.EDUServer, &c.FederationAPI,
+ &c.Global, &c.ClientAPI, &c.FederationAPI,
&c.KeyServer, &c.MediaAPI, &c.RoomServer,
&c.SyncAPI, &c.UserAPI,
&c.AppServiceAPI, &c.MSCs,
@@ -327,7 +324,6 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) {
func (c *Dendrite) Wiring() {
c.Global.JetStream.Matrix = &c.Global
c.ClientAPI.Matrix = &c.Global
- c.EDUServer.Matrix = &c.Global
c.FederationAPI.Matrix = &c.Global
c.KeyServer.Matrix = &c.Global
c.MediaAPI.Matrix = &c.Global
@@ -519,15 +515,6 @@ func (config *Dendrite) UserAPIURL() string {
return string(config.UserAPI.InternalAPI.Connect)
}
-// EDUServerURL returns an HTTP URL for where the EDU server is listening.
-func (config *Dendrite) EDUServerURL() string {
- // Hard code the EDU server to talk HTTP for now.
- // If we support HTTPS we need to think of a practical way to do certificate validation.
- // People setting up servers shouldn't need to get a certificate valid for the public
- // internet for an internal API.
- return string(config.EDUServer.InternalAPI.Connect)
-}
-
// KeyServerURL returns an HTTP URL for where the key server is listening.
func (config *Dendrite) KeyServerURL() string {
// Hard code the key server to talk HTTP for now.
diff --git a/setup/config/config_eduserver.go b/setup/config/config_eduserver.go
deleted file mode 100644
index e7ed36aa..00000000
--- a/setup/config/config_eduserver.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package config
-
-type EDUServer struct {
- Matrix *Global `yaml:"-"`
-
- InternalAPI InternalAPIOptions `yaml:"internal_api"`
-}
-
-func (c *EDUServer) Defaults(generate bool) {
- c.InternalAPI.Listen = "http://localhost:7778"
- c.InternalAPI.Connect = "http://localhost:7778"
-}
-
-func (c *EDUServer) Verify(configErrs *ConfigErrors, isMonolith bool) {
- checkURL(configErrs, "edu_server.internal_api.listen", string(c.InternalAPI.Listen))
- checkURL(configErrs, "edu_server.internal_api.connect", string(c.InternalAPI.Connect))
-}
diff --git a/setup/config/config_test.go b/setup/config/config_test.go
index e6f0a493..46e973fa 100644
--- a/setup/config/config_test.go
+++ b/setup/config/config_test.go
@@ -101,10 +101,6 @@ current_state_server:
max_open_conns: 100
max_idle_conns: 2
conn_max_lifetime: -1
-edu_server:
- internal_api:
- listen: http://localhost:7778
- connect: http://localhost:7778
federation_api:
internal_api:
listen: http://localhost:7772
diff --git a/setup/jetstream/nats.go b/setup/jetstream/nats.go
index 328cf915..4e4fe7a2 100644
--- a/setup/jetstream/nats.go
+++ b/setup/jetstream/nats.go
@@ -157,5 +157,26 @@ func setupNATS(process *process.ProcessContext, cfg *config.JetStream, nc *natsc
}
}
+ // Clean up old consumers so that interest-based consumers do the
+ // right thing.
+ for stream, consumers := range map[string][]string{
+ OutputClientData: {"SyncAPIClientAPIConsumer"},
+ OutputReceiptEvent: {"SyncAPIEDUServerReceiptConsumer", "FederationAPIEDUServerConsumer"},
+ OutputSendToDeviceEvent: {"SyncAPIEDUServerSendToDeviceConsumer", "FederationAPIEDUServerConsumer"},
+ OutputTypingEvent: {"SyncAPIEDUServerTypingConsumer", "FederationAPIEDUServerConsumer"},
+ } {
+ streamName := cfg.Matrix.JetStream.Prefixed(stream)
+ for _, consumer := range consumers {
+ consumerName := cfg.Matrix.JetStream.Prefixed(consumer) + "Pull"
+ consumerInfo, err := s.ConsumerInfo(streamName, consumerName)
+ if err != nil || consumerInfo == nil {
+ continue
+ }
+ if err = s.DeleteConsumer(streamName, consumerName); err != nil {
+ logrus.WithError(err).Errorf("Unable to clean up old consumer %q for stream %q", consumer, stream)
+ }
+ }
+ }
+
return s, nc
}
diff --git a/setup/jetstream/streams.go b/setup/jetstream/streams.go
index aa979924..5f0d37fd 100644
--- a/setup/jetstream/streams.go
+++ b/setup/jetstream/streams.go
@@ -9,8 +9,9 @@ import (
)
const (
- UserID = "user_id"
- RoomID = "room_id"
+ UserID = "user_id"
+ RoomID = "room_id"
+ EventID = "event_id"
)
var (
diff --git a/setup/monolith.go b/setup/monolith.go
index cf6872f9..32f1a649 100644
--- a/setup/monolith.go
+++ b/setup/monolith.go
@@ -19,7 +19,6 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/api"
- eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi"
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
"github.com/matrix-org/dendrite/internal/transactions"
@@ -43,12 +42,11 @@ type Monolith struct {
Client *gomatrixserverlib.Client
FedClient *gomatrixserverlib.FederationClient
- AppserviceAPI appserviceAPI.AppServiceQueryAPI
- EDUInternalAPI eduServerAPI.EDUServerInputAPI
- FederationAPI federationAPI.FederationInternalAPI
- RoomserverAPI roomserverAPI.RoomserverInternalAPI
- UserAPI userapi.UserInternalAPI
- KeyAPI keyAPI.KeyInternalAPI
+ AppserviceAPI appserviceAPI.AppServiceQueryAPI
+ FederationAPI federationAPI.FederationInternalAPI
+ RoomserverAPI roomserverAPI.RoomserverInternalAPI
+ UserAPI userapi.UserInternalAPI
+ KeyAPI keyAPI.KeyInternalAPI
// Optional
ExtPublicRoomsProvider api.ExtraPublicRoomsProvider
@@ -64,14 +62,14 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss
clientapi.AddPublicRoutes(
process, csMux, synapseMux, &m.Config.ClientAPI,
m.FedClient, m.RoomserverAPI,
- m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
+ m.AppserviceAPI, transactions.New(),
m.FederationAPI, m.UserAPI, userDirectoryProvider, m.KeyAPI,
m.ExtPublicRoomsProvider, &m.Config.MSCs,
)
federationapi.AddPublicRoutes(
- ssMux, keyMux, wkMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
+ process, ssMux, keyMux, wkMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.FederationAPI,
- m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil,
+ m.KeyAPI, &m.Config.MSCs, nil,
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, &m.Config.ClientAPI.RateLimiting, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go
index 40c1cd3d..c28da460 100644
--- a/syncapi/consumers/clientapi.go
+++ b/syncapi/consumers/clientapi.go
@@ -62,7 +62,7 @@ func NewOutputClientDataConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputClientData),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIAccountDataConsumer"),
db: store,
notifier: notifier,
stream: stream,
diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/receipts.go
index ab79998e..6bb0747f 100644
--- a/syncapi/consumers/eduserver_receipts.go
+++ b/syncapi/consumers/receipts.go
@@ -17,11 +17,10 @@ package consumers
import (
"context"
"database/sql"
- "encoding/json"
"fmt"
+ "strconv"
"github.com/getsentry/sentry-go"
- "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@@ -63,7 +62,7 @@ func NewOutputReceiptEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputReceiptEvent),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPIReceiptConsumer"),
db: store,
notifier: notifier,
stream: stream,
@@ -72,7 +71,7 @@ func NewOutputReceiptEventConsumer(
}
}
-// Start consuming from EDU api
+// Start consuming receipts events.
func (s *OutputReceiptEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
@@ -81,14 +80,23 @@ func (s *OutputReceiptEventConsumer) Start() error {
}
func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
- var output api.OutputReceiptEvent
- if err := json.Unmarshal(msg.Data, &output); err != nil {
+ output := types.OutputReceiptEvent{
+ UserID: msg.Header.Get(jetstream.UserID),
+ RoomID: msg.Header.Get(jetstream.RoomID),
+ EventID: msg.Header.Get(jetstream.EventID),
+ Type: msg.Header.Get("type"),
+ }
+
+ timestamp, err := strconv.Atoi(msg.Header.Get("timestamp"))
+ if err != nil {
// If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("EDU server output log: message parse failure")
+ log.WithError(err).Errorf("output log: message parse failure")
sentry.CaptureException(err)
return true
}
+ output.Timestamp = gomatrixserverlib.Timestamp(timestamp)
+
streamPos, err := s.db.StoreReceipt(
s.ctx,
output.RoomID,
@@ -117,7 +125,7 @@ func (s *OutputReceiptEventConsumer) onMessage(ctx context.Context, msg *nats.Ms
return true
}
-func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output api.OutputReceiptEvent) error {
+func (s *OutputReceiptEventConsumer) sendReadUpdate(ctx context.Context, output types.OutputReceiptEvent) error {
if output.Type != "m.read" {
return nil
}
diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/sendtodevice.go
index bdbe7735..0b9153fc 100644
--- a/syncapi/consumers/eduserver_sendtodevice.go
+++ b/syncapi/consumers/sendtodevice.go
@@ -19,7 +19,6 @@ import (
"encoding/json"
"github.com/getsentry/sentry-go"
- "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@@ -58,7 +57,7 @@ func NewOutputSendToDeviceEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputSendToDeviceEvent),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPISendToDeviceConsumer"),
db: store,
serverName: cfg.Matrix.ServerName,
notifier: notifier,
@@ -66,7 +65,7 @@ func NewOutputSendToDeviceEventConsumer(
}
}
-// Start consuming from EDU api
+// Start consuming send-to-device events.
func (s *OutputSendToDeviceEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
@@ -75,15 +74,8 @@ func (s *OutputSendToDeviceEventConsumer) Start() error {
}
func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
- var output api.OutputSendToDeviceEvent
- if err := json.Unmarshal(msg.Data, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("EDU server output log: message parse failure")
- sentry.CaptureException(err)
- return true
- }
-
- _, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
+ userID := msg.Header.Get(jetstream.UserID)
+ _, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
sentry.CaptureException(err)
return true
@@ -92,12 +84,20 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(ctx context.Context, msg *na
return true
}
+ var output types.OutputSendToDeviceEvent
+ if err = json.Unmarshal(msg.Data, &output); err != nil {
+ // If the message was invalid, log it and move on to the next message in the stream
+ log.WithError(err).Errorf("output log: message parse failure")
+ sentry.CaptureException(err)
+ return true
+ }
+
util.GetLogger(context.TODO()).WithFields(log.Fields{
"sender": output.Sender,
"user_id": output.UserID,
"device_id": output.DeviceID,
"event_type": output.Type,
- }).Info("sync API received send-to-device event from EDU server")
+ }).Debugf("sync API received send-to-device event from the clientapi/federationsender")
streamPos, err := s.db.StoreNewSendForDeviceMessage(
s.ctx, output.UserID, output.DeviceID, output.SendToDeviceEvent,
diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/typing.go
index c2828c7f..48e484ec 100644
--- a/syncapi/consumers/eduserver_typing.go
+++ b/syncapi/consumers/typing.go
@@ -16,16 +16,14 @@ package consumers
import (
"context"
- "encoding/json"
+ "strconv"
+ "time"
- "github.com/getsentry/sentry-go"
- "github.com/matrix-org/dendrite/eduserver/api"
- "github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
- "github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
@@ -37,7 +35,7 @@ type OutputTypingEventConsumer struct {
jetstream nats.JetStreamContext
durable string
topic string
- eduCache *cache.EDUCache
+ eduCache *caching.EDUCache
stream types.StreamProvider
notifier *notifier.Notifier
}
@@ -48,8 +46,7 @@ func NewOutputTypingEventConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
js nats.JetStreamContext,
- store storage.Database,
- eduCache *cache.EDUCache,
+ eduCache *caching.EDUCache,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputTypingEventConsumer {
@@ -57,14 +54,14 @@ func NewOutputTypingEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputTypingEvent),
- durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"),
+ durable: cfg.Matrix.JetStream.Durable("SyncAPITypingConsumer"),
eduCache: eduCache,
notifier: notifier,
stream: stream,
}
}
-// Start consuming from EDU api
+// Start consuming typing events.
func (s *OutputTypingEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, s.onMessage,
@@ -73,34 +70,40 @@ func (s *OutputTypingEventConsumer) Start() error {
}
func (s *OutputTypingEventConsumer) onMessage(ctx context.Context, msg *nats.Msg) bool {
- var output api.OutputTypingEvent
- if err := json.Unmarshal(msg.Data, &output); err != nil {
- // If the message was invalid, log it and move on to the next message in the stream
- log.WithError(err).Errorf("EDU server output log: message parse failure")
- sentry.CaptureException(err)
+ roomID := msg.Header.Get(jetstream.RoomID)
+ userID := msg.Header.Get(jetstream.UserID)
+ typing, err := strconv.ParseBool(msg.Header.Get("typing"))
+ if err != nil {
+ log.WithError(err).Errorf("output log: typing parse failure")
+ return true
+ }
+ timeout, err := strconv.Atoi(msg.Header.Get("timeout_ms"))
+ if err != nil {
+ log.WithError(err).Errorf("output log: timeout_ms parse failure")
return true
}
log.WithFields(log.Fields{
- "room_id": output.Event.RoomID,
- "user_id": output.Event.UserID,
- "typing": output.Event.Typing,
- }).Debug("received data from EDU server")
+ "room_id": roomID,
+ "user_id": userID,
+ "typing": typing,
+ "timeout": timeout,
+ }).Debug("syncapi received EDU data from client api")
var typingPos types.StreamPosition
- typingEvent := output.Event
- if typingEvent.Typing {
+ if typing {
+ expiry := time.Now().Add(time.Duration(timeout) * time.Millisecond)
typingPos = types.StreamPosition(
- s.eduCache.AddTypingUser(typingEvent.UserID, typingEvent.RoomID, output.ExpireTime),
+ s.eduCache.AddTypingUser(userID, roomID, &expiry),
)
} else {
typingPos = types.StreamPosition(
- s.eduCache.RemoveUser(typingEvent.UserID, typingEvent.RoomID),
+ s.eduCache.RemoveUser(userID, roomID),
)
}
s.stream.Advance(typingPos)
- s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
+ s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: typingPos})
return true
}
diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go
index b6ac5be1..647fffad 100644
--- a/syncapi/storage/interface.go
+++ b/syncapi/storage/interface.go
@@ -17,7 +17,6 @@ package storage
import (
"context"
- eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -46,7 +45,7 @@ type Database interface {
InviteEventsInRange(ctx context.Context, targetUserID string, r types.Range) (map[string]*gomatrixserverlib.HeaderedEvent, map[string]*gomatrixserverlib.HeaderedEvent, error)
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
- RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
+ RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error)
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
@@ -136,7 +135,7 @@ type Database interface {
// StoreReceipt stores new receipt events
StoreReceipt(ctx context.Context, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
// GetRoomReceipts gets all receipts for a given roomID
- GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error)
+ GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error)
// UpsertRoomUnreadNotificationCounts updates the notification statistics about a (user, room) key.
UpsertRoomUnreadNotificationCounts(ctx context.Context, userID, roomID string, notificationCount, highlightCount int) (types.StreamPosition, error)
diff --git a/syncapi/storage/postgres/receipt_table.go b/syncapi/storage/postgres/receipt_table.go
index 474d0c02..2a42ffd7 100644
--- a/syncapi/storage/postgres/receipt_table.go
+++ b/syncapi/storage/postgres/receipt_table.go
@@ -21,7 +21,6 @@ import (
"github.com/lib/pq"
- "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
@@ -95,16 +94,16 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
return
}
-func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
+func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) {
var lastPos types.StreamPosition
rows, err := r.selectRoomReceipts.QueryContext(ctx, pq.Array(roomIDs), streamPos)
if err != nil {
return 0, nil, fmt.Errorf("unable to query room receipts: %w", err)
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
- var res []api.OutputReceiptEvent
+ var res []types.OutputReceiptEvent
for rows.Next() {
- r := api.OutputReceiptEvent{}
+ r := types.OutputReceiptEvent{}
var id types.StreamPosition
err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
if err != nil {
diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go
index 9a2dc0d4..349e4452 100644
--- a/syncapi/storage/shared/syncserver.go
+++ b/syncapi/storage/shared/syncserver.go
@@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
- eduAPI "github.com/matrix-org/dendrite/eduserver/api"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/internal/eventutil"
@@ -135,7 +134,7 @@ func (d *Database) PeeksInRange(ctx context.Context, userID, deviceID string, r
return d.Peeks.SelectPeeksInRange(ctx, nil, userID, deviceID, r)
}
-func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) {
+func (d *Database) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) {
return d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
}
@@ -972,7 +971,7 @@ func (d *Database) StoreReceipt(ctx context.Context, roomId, receiptType, userId
return
}
-func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]eduAPI.OutputReceiptEvent, error) {
+func (d *Database) GetRoomReceipts(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) ([]types.OutputReceiptEvent, error) {
_, receipts, err := d.Receipts.SelectRoomReceiptsAfter(ctx, roomIDs, streamPos)
return receipts, err
}
diff --git a/syncapi/storage/sqlite3/receipt_table.go b/syncapi/storage/sqlite3/receipt_table.go
index 9111a39f..dea05771 100644
--- a/syncapi/storage/sqlite3/receipt_table.go
+++ b/syncapi/storage/sqlite3/receipt_table.go
@@ -20,7 +20,6 @@ import (
"fmt"
"strings"
- "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
@@ -99,7 +98,7 @@ func (r *receiptStatements) UpsertReceipt(ctx context.Context, txn *sql.Tx, room
}
// SelectRoomReceiptsAfter select all receipts for a given room after a specific timestamp
-func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []api.OutputReceiptEvent, error) {
+func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error) {
selectSQL := strings.Replace(selectRoomReceipts, "($2)", sqlutil.QueryVariadicOffset(len(roomIDs), 1), 1)
var lastPos types.StreamPosition
params := make([]interface{}, len(roomIDs)+1)
@@ -112,9 +111,9 @@ func (r *receiptStatements) SelectRoomReceiptsAfter(ctx context.Context, roomIDs
return 0, nil, fmt.Errorf("unable to query room receipts: %w", err)
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectRoomReceiptsAfter: rows.close() failed")
- var res []api.OutputReceiptEvent
+ var res []types.OutputReceiptEvent
for rows.Next() {
- r := api.OutputReceiptEvent{}
+ r := types.OutputReceiptEvent{}
var id types.StreamPosition
err = rows.Scan(&id, &r.RoomID, &r.Type, &r.UserID, &r.EventID, &r.Timestamp)
if err != nil {
diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go
index 640b7dc3..ba0076e2 100644
--- a/syncapi/storage/tables/interface.go
+++ b/syncapi/storage/tables/interface.go
@@ -18,7 +18,6 @@ import (
"context"
"database/sql"
- eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
@@ -168,7 +167,7 @@ type Filter interface {
type Receipts interface {
UpsertReceipt(ctx context.Context, txn *sql.Tx, roomId, receiptType, userId, eventId string, timestamp gomatrixserverlib.Timestamp) (pos types.StreamPosition, err error)
- SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
+ SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []types.OutputReceiptEvent, error)
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
}
diff --git a/syncapi/streams/stream_receipt.go b/syncapi/streams/stream_receipt.go
index 35ffd3a1..680f8cd8 100644
--- a/syncapi/streams/stream_receipt.go
+++ b/syncapi/streams/stream_receipt.go
@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
- eduAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -53,7 +52,7 @@ func (p *ReceiptStreamProvider) IncrementalSync(
}
// Group receipts by room, so we can create one ClientEvent for every room
- receiptsByRoom := make(map[string][]eduAPI.OutputReceiptEvent)
+ receiptsByRoom := make(map[string][]types.OutputReceiptEvent)
for _, receipt := range receipts {
receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
}
@@ -68,15 +67,15 @@ func (p *ReceiptStreamProvider) IncrementalSync(
Type: gomatrixserverlib.MReceipt,
RoomID: roomID,
}
- content := make(map[string]eduAPI.ReceiptMRead)
+ content := make(map[string]ReceiptMRead)
for _, receipt := range receipts {
read, ok := content[receipt.EventID]
if !ok {
- read = eduAPI.ReceiptMRead{
- User: make(map[string]eduAPI.ReceiptTS),
+ read = ReceiptMRead{
+ User: make(map[string]ReceiptTS),
}
}
- read.User[receipt.UserID] = eduAPI.ReceiptTS{TS: receipt.Timestamp}
+ read.User[receipt.UserID] = ReceiptTS{TS: receipt.Timestamp}
content[receipt.EventID] = read
}
ev.Content, err = json.Marshal(content)
@@ -91,3 +90,11 @@ func (p *ReceiptStreamProvider) IncrementalSync(
return lastPos
}
+
+type ReceiptMRead struct {
+ User map[string]ReceiptTS `json:"m.read"`
+}
+
+type ReceiptTS struct {
+ TS gomatrixserverlib.Timestamp `json:"ts"`
+}
diff --git a/syncapi/streams/stream_typing.go b/syncapi/streams/stream_typing.go
index 1e7a46bd..e46cd447 100644
--- a/syncapi/streams/stream_typing.go
+++ b/syncapi/streams/stream_typing.go
@@ -4,14 +4,14 @@ import (
"context"
"encoding/json"
- "github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type TypingStreamProvider struct {
StreamProvider
- EDUCache *cache.EDUCache
+ EDUCache *caching.EDUCache
}
func (p *TypingStreamProvider) CompleteSync(
diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go
index 17951acb..b2273aad 100644
--- a/syncapi/streams/streams.go
+++ b/syncapi/streams/streams.go
@@ -3,7 +3,7 @@ package streams
import (
"context"
- "github.com/matrix-org/dendrite/eduserver/cache"
+ "github.com/matrix-org/dendrite/internal/caching"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage"
@@ -25,7 +25,7 @@ type Streams struct {
func NewSyncStreamProviders(
d storage.Database, userAPI userapi.UserInternalAPI,
rsAPI rsapi.RoomserverInternalAPI, keyAPI keyapi.KeyInternalAPI,
- eduCache *cache.EDUCache,
+ eduCache *caching.EDUCache,
) *Streams {
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index ed8118bf..b579467a 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -18,9 +18,9 @@ import (
"context"
"github.com/gorilla/mux"
+ "github.com/matrix-org/dendrite/internal/caching"
"github.com/sirupsen/logrus"
- "github.com/matrix-org/dendrite/eduserver/cache"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
@@ -56,7 +56,7 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to connect to sync db")
}
- eduCache := cache.New()
+ eduCache := caching.NewTypingCache()
streams := streams.NewSyncStreamProviders(syncDB, userAPI, rsAPI, keyAPI, eduCache)
notifier := notifier.NewNotifier(streams.Latest(context.Background()))
if err = notifier.Load(context.Background(), syncDB); err != nil {
@@ -110,7 +110,7 @@ func AddPublicRoutes(
}
typingConsumer := consumers.NewOutputTypingEventConsumer(
- process, cfg, js, syncDB, eduCache, notifier, streams.TypingStreamProvider,
+ process, cfg, js, eduCache, notifier, streams.TypingStreamProvider,
)
if err = typingConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start typing consumer")
diff --git a/syncapi/types/types.go b/syncapi/types/types.go
index 4150e6c9..f964b80b 100644
--- a/syncapi/types/types.go
+++ b/syncapi/types/types.go
@@ -487,3 +487,21 @@ type StreamedEvent struct {
Event *gomatrixserverlib.HeaderedEvent `json:"event"`
StreamPosition StreamPosition `json:"stream_position"`
}
+
+// OutputReceiptEvent is an entry in the receipt output kafka log
+type OutputReceiptEvent struct {
+ UserID string `json:"user_id"`
+ RoomID string `json:"room_id"`
+ EventID string `json:"event_id"`
+ Type string `json:"type"`
+ Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
+}
+
+// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log.
+// This contains the full event content, along with the user ID and device ID
+// to which it is destined.
+type OutputSendToDeviceEvent struct {
+ UserID string `json:"user_id"`
+ DeviceID string `json:"device_id"`
+ gomatrixserverlib.SendToDeviceEvent
+}