aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--clientapi/producers/eduserver.go6
-rw-r--r--cmd/dendrite-federation-api-server/main.go7
-rw-r--r--cmd/dendrite-monolith-server/main.go4
-rw-r--r--cmd/dendritejs/main.go4
-rw-r--r--eduserver/api/input.go4
-rw-r--r--eduserver/input/input.go4
-rw-r--r--federationapi/federationapi.go3
-rw-r--r--federationapi/routing/routing.go3
-rw-r--r--federationapi/routing/send.go54
-rw-r--r--federationsender/consumers/eduserver.go11
10 files changed, 75 insertions, 25 deletions
diff --git a/clientapi/producers/eduserver.go b/clientapi/producers/eduserver.go
index 14414ec6..30c40fb7 100644
--- a/clientapi/producers/eduserver.go
+++ b/clientapi/producers/eduserver.go
@@ -20,7 +20,7 @@ import (
"github.com/matrix-org/gomatrixserverlib"
)
-// EDUServerProducer produces events for the typing server to consume
+// EDUServerProducer produces events for the EDU server to consume
type EDUServerProducer struct {
InputAPI api.EDUServerInputAPI
}
@@ -35,13 +35,13 @@ func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer {
// SendTyping sends a typing event to EDU server
func (p *EDUServerProducer) SendTyping(
ctx context.Context, userID, roomID string,
- typing bool, timeout int64,
+ typing bool, timeoutMS int64,
) error {
requestData := api.InputTypingEvent{
UserID: userID,
RoomID: roomID,
Typing: typing,
- Timeout: timeout,
+ TimeoutMS: timeoutMS,
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
}
diff --git a/cmd/dendrite-federation-api-server/main.go b/cmd/dendrite-federation-api-server/main.go
index 367f5dc0..d18926a6 100644
--- a/cmd/dendrite-federation-api-server/main.go
+++ b/cmd/dendrite-federation-api-server/main.go
@@ -15,8 +15,11 @@
package main
import (
+ "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
+ "github.com/matrix-org/dendrite/eduserver"
+ "github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationapi"
)
@@ -34,10 +37,12 @@ func main() {
alias, input, query := base.CreateHTTPRoomserverAPIs()
asQuery := base.CreateHTTPAppServiceAPIs()
+ eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New())
+ eduProducer := producers.NewEDUServerProducer(eduInputAPI)
federationapi.SetupFederationAPIComponent(
base, accountDB, deviceDB, federation, &keyRing,
- alias, input, query, asQuery, federationSender,
+ alias, input, query, asQuery, federationSender, eduProducer,
)
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI))
diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go
index 0aceef02..9f6531ed 100644
--- a/cmd/dendrite-monolith-server/main.go
+++ b/cmd/dendrite-monolith-server/main.go
@@ -20,6 +20,7 @@ import (
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
+ "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/keydb"
@@ -67,7 +68,8 @@ func main() {
federation, &keyRing, alias, input, query,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
)
- federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI)
+ eduProducer := producers.NewEDUServerProducer(eduInputAPI)
+ federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, nil)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)
diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go
index 0f72dc1e..05802725 100644
--- a/cmd/dendritejs/main.go
+++ b/cmd/dendritejs/main.go
@@ -23,6 +23,7 @@ import (
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/clientapi"
+ "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/common/config"
@@ -133,7 +134,8 @@ func main() {
federation, &keyRing, alias, input, query,
eduInputAPI, asQuery, transactions.New(), fedSenderAPI,
)
- federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI)
+ eduProducer := producers.NewEDUServerProducer(eduInputAPI)
+ federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer)
mediaapi.SetupMediaAPIComponent(base, deviceDB)
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, p2pPublicRoomProvider)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg)
diff --git a/eduserver/api/input.go b/eduserver/api/input.go
index c95acaf1..ad3f1ed5 100644
--- a/eduserver/api/input.go
+++ b/eduserver/api/input.go
@@ -30,8 +30,8 @@ type InputTypingEvent struct {
RoomID string `json:"room_id"`
// Typing is true if the user is typing, false if they have stopped.
Typing bool `json:"typing"`
- // Timeout is the interval for which the user should be marked as typing.
- Timeout int64 `json:"timeout"`
+ // Timeout is the interval in milliseconds for which the user should be marked as typing.
+ TimeoutMS int64 `json:"timeout"`
// OriginServerTS when the server received the update.
OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"`
}
diff --git a/eduserver/input/input.go b/eduserver/input/input.go
index e0cc6922..84590945 100644
--- a/eduserver/input/input.go
+++ b/eduserver/input/input.go
@@ -46,7 +46,7 @@ func (t *EDUServerInputAPI) InputTypingEvent(
if ite.Typing {
// user is typing, update our current state of users typing.
expireTime := ite.OriginServerTS.Time().Add(
- time.Duration(ite.Timeout) * time.Millisecond,
+ time.Duration(ite.TimeoutMS) * time.Millisecond,
)
t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime)
} else {
@@ -69,7 +69,7 @@ func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error {
if ev.Typing {
expireTime := ite.OriginServerTS.Time().Add(
- time.Duration(ite.Timeout) * time.Millisecond,
+ time.Duration(ite.TimeoutMS) * time.Millisecond,
)
ote.ExpireTime = &expireTime
}
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go
index 90db95b3..ed96322b 100644
--- a/federationapi/federationapi.go
+++ b/federationapi/federationapi.go
@@ -41,12 +41,13 @@ func SetupFederationAPIComponent(
queryAPI roomserverAPI.RoomserverQueryAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
federationSenderAPI federationSenderAPI.FederationSenderQueryAPI,
+ eduProducer *producers.EDUServerProducer,
) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI)
routing.Setup(
base.APIMux, base.Cfg, queryAPI, aliasAPI, asAPI,
- roomserverProducer, federationSenderAPI, *keyRing,
+ roomserverProducer, eduProducer, federationSenderAPI, *keyRing,
federation, accountsDB, deviceDB,
)
}
diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go
index b5c8e53d..9ac53576 100644
--- a/federationapi/routing/routing.go
+++ b/federationapi/routing/routing.go
@@ -48,6 +48,7 @@ func Setup(
aliasAPI roomserverAPI.RoomserverAliasAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
producer *producers.RoomserverProducer,
+ eduProducer *producers.EDUServerProducer,
federationSenderAPI federationSenderAPI.FederationSenderQueryAPI,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
@@ -79,7 +80,7 @@ func Setup(
}
return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
- cfg, query, producer, keys, federation,
+ cfg, query, producer, eduProducer, keys, federation,
)
},
)).Methods(http.MethodPut, http.MethodOptions)
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index 4c92c7e5..1013a44c 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -36,20 +36,22 @@ func Send(
cfg *config.Dendrite,
query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
+ eduProducer *producers.EDUServerProducer,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
) util.JSONResponse {
t := txnReq{
- context: httpReq.Context(),
- query: query,
- producer: producer,
- keys: keys,
- federation: federation,
+ context: httpReq.Context(),
+ query: query,
+ producer: producer,
+ eduProducer: eduProducer,
+ keys: keys,
+ federation: federation,
}
var txnEvents struct {
- PDUs []json.RawMessage `json:"pdus"`
- EDUs []json.RawMessage `json:"edus"`
+ PDUs []json.RawMessage `json:"pdus"`
+ EDUs []gomatrixserverlib.EDU `json:"edus"`
}
if err := json.Unmarshal(request.Content(), &txnEvents); err != nil {
@@ -59,7 +61,9 @@ func Send(
}
}
+ // TODO: Really we should have a function to convert FederationRequest to txnReq
t.PDUs = txnEvents.PDUs
+ t.EDUs = txnEvents.EDUs
t.Origin = request.Origin()
t.TransactionID = txnID
t.Destination = cfg.Matrix.ServerName
@@ -80,11 +84,12 @@ func Send(
type txnReq struct {
gomatrixserverlib.Transaction
- context context.Context
- query api.RoomserverQueryAPI
- producer *producers.RoomserverProducer
- keys gomatrixserverlib.KeyRing
- federation *gomatrixserverlib.FederationClient
+ context context.Context
+ query api.RoomserverQueryAPI
+ producer *producers.RoomserverProducer
+ eduProducer *producers.EDUServerProducer
+ keys gomatrixserverlib.KeyRing
+ federation *gomatrixserverlib.FederationClient
}
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
@@ -152,7 +157,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
}
}
- // TODO: Process the EDUs.
+ t.processEDUs(t.EDUs)
util.GetLogger(t.context).Infof("Processed %d PDUs from transaction %q", len(results), t.TransactionID)
return &gomatrixserverlib.RespSend{PDUs: results}, nil
}
@@ -163,6 +168,29 @@ type unknownRoomError struct {
func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) }
+func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
+ for _, e := range edus {
+ switch e.Type {
+ case gomatrixserverlib.MTyping:
+ // https://matrix.org/docs/spec/server_server/latest#typing-notifications
+ var typingPayload struct {
+ RoomID string `json:"room_id"`
+ UserID string `json:"user_id"`
+ Typing bool `json:"typing"`
+ }
+ if err := json.Unmarshal(e.Content, &typingPayload); err != nil {
+ util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event")
+ continue
+ }
+ if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
+ util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server")
+ }
+ default:
+ util.GetLogger(t.context).WithField("type", e.Type).Warn("unhandled edu")
+ }
+ }
+}
+
func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
prevEventIDs := e.PrevEventIDs()
diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go
index ba45db7f..4d2445f3 100644
--- a/federationsender/consumers/eduserver.go
+++ b/federationsender/consumers/eduserver.go
@@ -73,6 +73,17 @@ func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
return nil
}
+ // only send typing events which originated from us
+ _, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID)
+ if err != nil {
+ log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender")
+ return nil
+ }
+ if typingServerName != t.ServerName {
+ log.WithField("other_server", typingServerName).Info("Suppressing typing notif: originated elsewhere")
+ return nil
+ }
+
joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID)
if err != nil {
return err