aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-06-10 12:17:54 +0100
committerGitHub <noreply@github.com>2020-06-10 12:17:54 +0100
commitb7187a9a354530c1846c2a97be701ea484f91c2c (patch)
tree6d14903a444fa8bca964247dbe85ae8d9709d4c8
parentd9d6f4568ce891ae0ae9d2a3449974d3777bd21d (diff)
Remove clientapi producers which aren't actually producers (#1111)
* Remove clientapi producers which aren't actually producers They are actually just convenience wrappers around the internal APIs for roomserver/eduserver. Move their logic to their respective `api` packages and call them directly. * Remove TODO * unbreak ygg
-rw-r--r--clientapi/clientapi.go7
-rw-r--r--clientapi/routing/createroom.go17
-rw-r--r--clientapi/routing/membership.go17
-rw-r--r--clientapi/routing/profile.go15
-rw-r--r--clientapi/routing/routing.go26
-rw-r--r--clientapi/routing/sendevent.go8
-rw-r--r--clientapi/routing/sendtodevice.go8
-rw-r--r--clientapi/routing/sendtyping.go8
-rw-r--r--clientapi/threepid/invites.go11
-rw-r--r--cmd/dendrite-demo-libp2p/main.go3
-rw-r--r--cmd/dendrite-demo-yggdrasil/main.go3
-rw-r--r--cmd/dendrite-federation-api-server/main.go5
-rw-r--r--cmd/dendrite-monolith-server/main.go3
-rw-r--r--cmd/dendritejs/main.go3
-rw-r--r--eduserver/api/wrapper.go (renamed from clientapi/producers/eduserver.go)41
-rw-r--r--federationapi/federationapi.go10
-rw-r--r--federationapi/routing/invite.go8
-rw-r--r--federationapi/routing/join.go8
-rw-r--r--federationapi/routing/leave.go9
-rw-r--r--federationapi/routing/routing.go19
-rw-r--r--federationapi/routing/send.go41
-rw-r--r--federationapi/routing/send_test.go16
-rw-r--r--federationapi/routing/threepid.go14
-rw-r--r--internal/setup/monolith.go5
-rw-r--r--roomserver/api/wrapper.go (renamed from clientapi/producers/roomserver.go)81
25 files changed, 160 insertions, 226 deletions
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index 90db9eea..2780f367 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -48,9 +48,6 @@ func AddPublicRoutes(
transactionsCache *transactions.Cache,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
) {
- roomserverProducer := producers.NewRoomserverProducer(rsAPI)
- eduProducer := producers.NewEDUServerProducer(eduInputAPI)
-
syncProducer := &producers.SyncAPIProducer{
Producer: producer,
Topic: string(cfg.Kafka.Topics.OutputClientData),
@@ -64,8 +61,8 @@ func AddPublicRoutes(
}
routing.Setup(
- router, cfg, roomserverProducer, rsAPI, asAPI,
+ router, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, deviceDB, federation, *keyRing,
- syncProducer, eduProducer, transactionsCache, fsAPI,
+ syncProducer, transactionsCache, fsAPI,
)
}
diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go
index 1b4ff184..89f49d35 100644
--- a/clientapi/routing/createroom.go
+++ b/clientapi/routing/createroom.go
@@ -29,7 +29,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/threepid"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@@ -137,21 +136,21 @@ type fledglingEvent struct {
// CreateRoom implements /createRoom
func CreateRoom(
req *http.Request, device *authtypes.Device,
- cfg *config.Dendrite, producer *producers.RoomserverProducer,
+ cfg *config.Dendrite,
accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
// TODO (#267): Check room ID doesn't clash with an existing one, and we
// probably shouldn't be using pseudo-random strings, maybe GUIDs?
roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
- return createRoom(req, device, cfg, roomID, producer, accountDB, rsAPI, asAPI)
+ return createRoom(req, device, cfg, roomID, accountDB, rsAPI, asAPI)
}
// createRoom implements /createRoom
// nolint: gocyclo
func createRoom(
req *http.Request, device *authtypes.Device,
- cfg *config.Dendrite, roomID string, producer *producers.RoomserverProducer,
+ cfg *config.Dendrite, roomID string,
accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
@@ -344,9 +343,9 @@ func createRoom(
}
// send events to the room server
- _, err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil)
+ _, err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
+ util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@@ -404,14 +403,14 @@ func createRoom(
}
}
// Send the invite event to the roomserver.
- if err = producer.SendInvite(
- req.Context(),
+ if err = roomserverAPI.SendInvite(
+ req.Context(), rsAPI,
inviteEvent.Headered(roomVersion),
strippedState, // invite room state
cfg.Matrix.ServerName, // send as server
nil, // transaction ID
); err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
+ util.GetLogger(req.Context()).WithError(err).Error("SendInvite failed")
return jsonerror.InternalServerError()
}
}
diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go
index 74d92a05..0b2c2bc5 100644
--- a/clientapi/routing/membership.go
+++ b/clientapi/routing/membership.go
@@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/threepid"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@@ -46,7 +45,6 @@ func SendMembership(
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
roomID string, membership string, cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI,
- producer *producers.RoomserverProducer,
) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
@@ -71,7 +69,7 @@ func SendMembership(
}
inviteStored, jsonErrResp := checkAndProcessThreepid(
- req, device, &body, cfg, rsAPI, accountDB, producer,
+ req, device, &body, cfg, rsAPI, accountDB,
membership, roomID, evTime,
)
if jsonErrResp != nil {
@@ -112,8 +110,8 @@ func SendMembership(
switch membership {
case gomatrixserverlib.Invite:
// Invites need to be handled specially
- err = producer.SendInvite(
- req.Context(),
+ err = roomserverAPI.SendInvite(
+ req.Context(), rsAPI,
event.Headered(verRes.RoomVersion),
nil, // ask the roomserver to draw up invite room state for us
cfg.Matrix.ServerName,
@@ -130,14 +128,14 @@ func SendMembership(
}{roomID}
fallthrough
default:
- _, err = producer.SendEvents(
- req.Context(),
+ _, err = roomserverAPI.SendEvents(
+ req.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)},
cfg.Matrix.ServerName,
nil,
)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
+ util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
}
@@ -252,13 +250,12 @@ func checkAndProcessThreepid(
cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverInternalAPI,
accountDB accounts.Database,
- producer *producers.RoomserverProducer,
membership, roomID string,
evTime time.Time,
) (inviteStored bool, errRes *util.JSONResponse) {
inviteStored, err := threepid.CheckAndProcessInvite(
- req.Context(), device, body, cfg, rsAPI, accountDB, producer,
+ req.Context(), device, body, cfg, rsAPI, accountDB,
membership, roomID, evTime,
)
if err == threepid.ErrMissingParameter {
diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go
index c0fe32a3..642a7288 100644
--- a/clientapi/routing/profile.go
+++ b/clientapi/routing/profile.go
@@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -94,8 +93,7 @@ func GetAvatarURL(
// nolint:gocyclo
func SetAvatarURL(
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
- userID string, cfg *config.Dendrite,
- rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
+ userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse {
if userID != device.UserID {
return util.JSONResponse{
@@ -167,8 +165,8 @@ func SetAvatarURL(
return jsonerror.InternalServerError()
}
- if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed")
+ if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@@ -209,8 +207,7 @@ func GetDisplayName(
// nolint:gocyclo
func SetDisplayName(
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
- userID string, cfg *config.Dendrite,
- rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
+ userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse {
if userID != device.UserID {
return util.JSONResponse{
@@ -282,8 +279,8 @@ func SetDisplayName(
return jsonerror.InternalServerError()
}
- if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed")
+ if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index eb558205..02470775 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
+ eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@@ -48,7 +49,7 @@ const pathPrefixUnstable = "/client/unstable"
// nolint: gocyclo
func Setup(
publicAPIMux *mux.Router, cfg *config.Dendrite,
- producer *producers.RoomserverProducer,
+ eduAPI eduServerAPI.EDUServerInputAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
accountDB accounts.Database,
@@ -56,7 +57,6 @@ func Setup(
federation *gomatrixserverlib.FederationClient,
keyRing gomatrixserverlib.KeyRing,
syncProducer *producers.SyncAPIProducer,
- eduProducer *producers.EDUServerProducer,
transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderInternalAPI,
) {
@@ -89,7 +89,7 @@ func Setup(
r0mux.Handle("/createRoom",
internal.MakeAuthAPI("createRoom", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
- return CreateRoom(req, device, cfg, producer, accountDB, rsAPI, asAPI)
+ return CreateRoom(req, device, cfg, accountDB, rsAPI, asAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/join/{roomIDOrAlias}",
@@ -125,7 +125,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI, producer)
+ return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}",
@@ -134,7 +134,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, producer, nil)
+ return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
@@ -145,7 +145,7 @@ func Setup(
}
txnID := vars["txnID"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID,
- nil, cfg, rsAPI, producer, transactionsCache)
+ nil, cfg, rsAPI, transactionsCache)
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/event/{eventID}",
@@ -194,7 +194,7 @@ func Setup(
if strings.HasSuffix(eventType, "/") {
eventType = eventType[:len(eventType)-1]
}
- return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, producer, nil)
+ return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -205,7 +205,7 @@ func Setup(
return util.ErrorResponse(err)
}
stateKey := vars["stateKey"]
- return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, producer, nil)
+ return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -269,7 +269,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduProducer)
+ return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -280,7 +280,7 @@ func Setup(
return util.ErrorResponse(err)
}
txnID := vars["txnID"]
- return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID)
+ return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -294,7 +294,7 @@ func Setup(
return util.ErrorResponse(err)
}
txnID := vars["txnID"]
- return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID)
+ return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -386,7 +386,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, producer, rsAPI)
+ return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
@@ -408,7 +408,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SetDisplayName(req, accountDB, device, vars["userID"], cfg, producer, rsAPI)
+ return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go
index a8f8893b..5d5507e8 100644
--- a/clientapi/routing/sendevent.go
+++ b/clientapi/routing/sendevent.go
@@ -20,7 +20,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions"
@@ -46,7 +45,6 @@ func SendEvent(
roomID, eventType string, txnID, stateKey *string,
cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI,
- producer *producers.RoomserverProducer,
txnCache *transactions.Cache,
) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
@@ -80,8 +78,8 @@ func SendEvent(
// pass the new event to the roomserver and receive the correct event ID
// event ID in case of duplicate transaction is discarded
- eventID, err := producer.SendEvents(
- req.Context(),
+ eventID, err := api.SendEvents(
+ req.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(verRes.RoomVersion),
},
@@ -89,7 +87,7 @@ func SendEvent(
txnAndSessionID,
)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
+ util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
util.GetLogger(req.Context()).WithFields(logrus.Fields{
diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go
index 5d3060d7..dc0a6572 100644
--- a/clientapi/routing/sendtodevice.go
+++ b/clientapi/routing/sendtodevice.go
@@ -19,7 +19,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
+ "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/transactions"
"github.com/matrix-org/util"
)
@@ -28,7 +28,7 @@ import (
// sends the device events to the EDU Server
func SendToDevice(
req *http.Request, device *authtypes.Device,
- eduProducer *producers.EDUServerProducer,
+ eduAPI api.EDUServerInputAPI,
txnCache *transactions.Cache,
eventType string, txnID *string,
) util.JSONResponse {
@@ -48,8 +48,8 @@ func SendToDevice(
for userID, byUser := range httpReq.Messages {
for deviceID, message := range byUser {
- if err := eduProducer.SendToDevice(
- req.Context(), device.UserID, userID, deviceID, eventType, message,
+ if err := api.SendToDevice(
+ req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed")
return jsonerror.InternalServerError()
diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go
index ffaa0e66..2eae1658 100644
--- a/clientapi/routing/sendtyping.go
+++ b/clientapi/routing/sendtyping.go
@@ -20,8 +20,8 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/userutil"
+ "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/util"
)
@@ -35,7 +35,7 @@ type typingContentJSON struct {
func SendTyping(
req *http.Request, device *authtypes.Device, roomID string,
userID string, accountDB accounts.Database,
- eduProducer *producers.EDUServerProducer,
+ eduAPI api.EDUServerInputAPI,
) util.JSONResponse {
if device.UserID != userID {
return util.JSONResponse{
@@ -69,8 +69,8 @@ func SendTyping(
return *resErr
}
- if err = eduProducer.SendTyping(
- req.Context(), userID, roomID, r.Typing, r.Timeout,
+ if err = api.SendTyping(
+ req.Context(), eduAPI, userID, roomID, r.Typing, r.Timeout,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed")
return jsonerror.InternalServerError()
diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go
index 8f173bf8..b0df0dd4 100644
--- a/clientapi/threepid/invites.go
+++ b/clientapi/threepid/invites.go
@@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -88,7 +87,7 @@ func CheckAndProcessInvite(
ctx context.Context,
device *authtypes.Device, body *MembershipRequest, cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI, db accounts.Database,
- producer *producers.RoomserverProducer, membership string, roomID string,
+ membership string, roomID string,
evTime time.Time,
) (inviteStoredOnIDServer bool, err error) {
if membership != gomatrixserverlib.Invite || (body.Address == "" && body.IDServer == "" && body.Medium == "") {
@@ -112,7 +111,7 @@ func CheckAndProcessInvite(
// "m.room.third_party_invite" have to be emitted from the data in
// storeInviteRes.
err = emit3PIDInviteEvent(
- ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, producer, evTime,
+ ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, evTime,
)
inviteStoredOnIDServer = err == nil
@@ -331,7 +330,7 @@ func emit3PIDInviteEvent(
ctx context.Context,
body *MembershipRequest, res *idServerStoreInviteResponse,
device *authtypes.Device, roomID string, cfg *config.Dendrite,
- rsAPI api.RoomserverInternalAPI, producer *producers.RoomserverProducer,
+ rsAPI api.RoomserverInternalAPI,
evTime time.Time,
) error {
builder := &gomatrixserverlib.EventBuilder{
@@ -359,8 +358,8 @@ func emit3PIDInviteEvent(
return err
}
- _, err = producer.SendEvents(
- ctx,
+ _, err = api.SendEvents(
+ ctx, rsAPI,
[]gomatrixserverlib.HeaderedEvent{
(*event).Headered(queryRes.RoomVersion),
},
diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go
index ba1af148..b7cbcdc9 100644
--- a/cmd/dendrite-demo-libp2p/main.go
+++ b/cmd/dendrite-demo-libp2p/main.go
@@ -29,7 +29,6 @@ import (
p2phttp "github.com/libp2p/go-libp2p-http"
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
"github.com/matrix-org/dendrite/appservice"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/federationsender"
@@ -149,7 +148,6 @@ func main() {
&base.Base, federation, rsAPI, keyRing,
)
rsAPI.SetFederationSenderAPI(fsAPI)
- eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
@@ -166,7 +164,6 @@ func main() {
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
- EDUProducer: eduProducer,
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI,
diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go
index e5192c41..62c56129 100644
--- a/cmd/dendrite-demo-yggdrasil/main.go
+++ b/cmd/dendrite-demo-yggdrasil/main.go
@@ -25,7 +25,6 @@ import (
"time"
"github.com/matrix-org/dendrite/appservice"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
@@ -153,7 +152,6 @@ func main() {
rsComponent.SetFederationSenderAPI(fsAPI)
- eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
@@ -170,7 +168,6 @@ func main() {
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
- EDUProducer: eduProducer,
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI,
diff --git a/cmd/dendrite-federation-api-server/main.go b/cmd/dendrite-federation-api-server/main.go
index 7481299e..fcdc000c 100644
--- a/cmd/dendrite-federation-api-server/main.go
+++ b/cmd/dendrite-federation-api-server/main.go
@@ -15,7 +15,6 @@
package main
import (
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/federationapi"
"github.com/matrix-org/dendrite/internal/basecomponent"
)
@@ -33,12 +32,10 @@ func main() {
fsAPI := base.FederationSenderHTTPClient()
rsAPI := base.RoomserverHTTPClient()
asAPI := base.AppserviceHTTPClient()
- // TODO: this isn't a producer
- eduProducer := producers.NewEDUServerProducer(base.EDUServerClient())
federationapi.AddPublicRoutes(
base.PublicAPIMux, base.Cfg, accountDB, deviceDB, federation, keyRing,
- rsAPI, asAPI, fsAPI, eduProducer,
+ rsAPI, asAPI, fsAPI, base.EDUServerClient(),
)
base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI))
diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go
index dda3ade5..195a1ac5 100644
--- a/cmd/dendrite-monolith-server/main.go
+++ b/cmd/dendrite-monolith-server/main.go
@@ -19,7 +19,6 @@ import (
"net/http"
"github.com/matrix-org/dendrite/appservice"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
@@ -105,7 +104,6 @@ func main() {
}
rsComponent.SetFederationSenderAPI(fsAPI)
- eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
@@ -122,7 +120,6 @@ func main() {
AppserviceAPI: asAPI,
EDUInternalAPI: eduInputAPI,
- EDUProducer: eduProducer,
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI,
diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go
index 86ae3368..0512ee5c 100644
--- a/cmd/dendritejs/main.go
+++ b/cmd/dendritejs/main.go
@@ -23,7 +23,6 @@ import (
"syscall/js"
"github.com/matrix-org/dendrite/appservice"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
@@ -209,7 +208,6 @@ func main() {
rsAPI.SetFederationSenderAPI(fedSenderAPI)
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI)
- eduProducer := producers.NewEDUServerProducer(eduInputAPI)
publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), cfg.Matrix.ServerName)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to public rooms db")
@@ -226,7 +224,6 @@ func main() {
AppserviceAPI: asQuery,
EDUInternalAPI: eduInputAPI,
- EDUProducer: eduProducer,
FederationSenderAPI: fedSenderAPI,
RoomserverAPI: rsAPI,
//ServerKeyAPI: serverKeyAPI,
diff --git a/clientapi/producers/eduserver.go b/eduserver/api/wrapper.go
index 102c1fad..c2c4596d 100644
--- a/clientapi/producers/eduserver.go
+++ b/eduserver/api/wrapper.go
@@ -1,3 +1,5 @@
+// Copyright 2020 The Matrix.org Foundation C.I.C.
+//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -10,35 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package producers
+package api
import (
"context"
"encoding/json"
"time"
- "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
-// EDUServerProducer produces events for the EDU server to consume
-type EDUServerProducer struct {
- InputAPI api.EDUServerInputAPI
-}
-
-// NewEDUServerProducer creates a new EDUServerProducer
-func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer {
- return &EDUServerProducer{
- InputAPI: inputAPI,
- }
-}
-
// SendTyping sends a typing event to EDU server
-func (p *EDUServerProducer) SendTyping(
- ctx context.Context, userID, roomID string,
+func SendTyping(
+ ctx context.Context, eduAPI EDUServerInputAPI, userID, roomID string,
typing bool, timeoutMS int64,
) error {
- requestData := api.InputTypingEvent{
+ requestData := InputTypingEvent{
UserID: userID,
RoomID: roomID,
Typing: typing,
@@ -46,24 +35,24 @@ func (p *EDUServerProducer) SendTyping(
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
}
- var response api.InputTypingEventResponse
- err := p.InputAPI.InputTypingEvent(
- ctx, &api.InputTypingEventRequest{InputTypingEvent: requestData}, &response,
+ var response InputTypingEventResponse
+ err := eduAPI.InputTypingEvent(
+ ctx, &InputTypingEventRequest{InputTypingEvent: requestData}, &response,
)
return err
}
// SendToDevice sends a typing event to EDU server
-func (p *EDUServerProducer) SendToDevice(
- ctx context.Context, sender, userID, deviceID, eventType string,
+func SendToDevice(
+ ctx context.Context, eduAPI EDUServerInputAPI, sender, userID, deviceID, eventType string,
message interface{},
) error {
js, err := json.Marshal(message)
if err != nil {
return err
}
- requestData := api.InputSendToDeviceEvent{
+ requestData := InputSendToDeviceEvent{
UserID: userID,
DeviceID: deviceID,
SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
@@ -72,9 +61,9 @@ func (p *EDUServerProducer) SendToDevice(
Content: js,
},
}
- request := api.InputSendToDeviceEventRequest{
+ request := InputSendToDeviceEventRequest{
InputSendToDeviceEvent: requestData,
}
- response := api.InputSendToDeviceEventResponse{}
- return p.InputAPI.InputSendToDeviceEvent(ctx, &request, &response)
+ response := InputSendToDeviceEventResponse{}
+ return eduAPI.InputSendToDeviceEvent(ctx, &request, &response)
}
diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go
index 7aecd272..9299b501 100644
--- a/federationapi/federationapi.go
+++ b/federationapi/federationapi.go
@@ -19,12 +19,11 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
+ eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
- // TODO: Are we really wanting to pull in the producer from clientapi
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/federationapi/routing"
"github.com/matrix-org/gomatrixserverlib"
)
@@ -40,13 +39,12 @@ func AddPublicRoutes(
rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
federationSenderAPI federationSenderAPI.FederationSenderInternalAPI,
- eduProducer *producers.EDUServerProducer,
+ eduAPI eduserverAPI.EDUServerInputAPI,
) {
- roomserverProducer := producers.NewRoomserverProducer(rsAPI)
routing.Setup(
- router, cfg, rsAPI, asAPI, roomserverProducer,
- eduProducer, federationSenderAPI, *keyRing,
+ router, cfg, rsAPI, asAPI,
+ eduAPI, federationSenderAPI, *keyRing,
federation, accountsDB, deviceDB,
)
}
diff --git a/federationapi/routing/invite.go b/federationapi/routing/invite.go
index 05efe587..908a04fc 100644
--- a/federationapi/routing/invite.go
+++ b/federationapi/routing/invite.go
@@ -20,8 +20,8 @@ import (
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/config"
+ "github.com/matrix-org/dendrite/roomserver/api"
roomserverVersion "github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@@ -34,7 +34,7 @@ func Invite(
roomID string,
eventID string,
cfg *config.Dendrite,
- producer *producers.RoomserverProducer,
+ rsAPI api.RoomserverInternalAPI,
keys gomatrixserverlib.KeyRing,
) util.JSONResponse {
inviteReq := gomatrixserverlib.InviteV2Request{}
@@ -98,8 +98,8 @@ func Invite(
)
// Add the invite event to the roomserver.
- if err = producer.SendInvite(
- httpReq.Context(),
+ if err = api.SendInvite(
+ httpReq.Context(), rsAPI,
signedEvent.Headered(inviteReq.RoomVersion()),
inviteReq.InviteRoomState(),
event.Origin(),
diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go
index 5b4e8db3..593aa169 100644
--- a/federationapi/routing/join.go
+++ b/federationapi/routing/join.go
@@ -21,7 +21,6 @@ import (
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -144,7 +143,6 @@ func SendJoin(
request *gomatrixserverlib.FederationRequest,
cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI,
- producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing,
roomID, eventID string,
) util.JSONResponse {
@@ -267,8 +265,8 @@ func SendJoin(
// We are responsible for notifying other servers that the user has joined
// the room, so set SendAsServer to cfg.Matrix.ServerName
if !alreadyJoined {
- _, err = producer.SendEvents(
- httpReq.Context(),
+ _, err = api.SendEvents(
+ httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{
event.Headered(stateAndAuthChainResponse.RoomVersion),
},
@@ -276,7 +274,7 @@ func SendJoin(
nil,
)
if err != nil {
- util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")
+ util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
}
diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go
index 62ca1145..f998be45 100644
--- a/federationapi/routing/leave.go
+++ b/federationapi/routing/leave.go
@@ -17,7 +17,6 @@ import (
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -113,13 +112,13 @@ func SendLeave(
httpReq *http.Request,
request *gomatrixserverlib.FederationRequest,
cfg *config.Dendrite,
- producer *producers.RoomserverProducer,
+ rsAPI api.RoomserverInternalAPI,
keys gomatrixserverlib.KeyRing,
roomID, eventID string,
) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
- if err := producer.RsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
+ if err := rsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
@@ -194,8 +193,8 @@ func SendLeave(
// Send the events to the room server.
// We are responsible for notifying other servers that the user has left
// the room, so set SendAsServer to cfg.Matrix.ServerName
- _, err = producer.SendEvents(
- httpReq.Context(),
+ _, err = api.SendEvents(
+ httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{
event.Headered(verRes.RoomVersion),
},
diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go
index e6c6df65..754dcdfb 100644
--- a/federationapi/routing/routing.go
+++ b/federationapi/routing/routing.go
@@ -21,7 +21,7 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
- "github.com/matrix-org/dendrite/clientapi/producers"
+ eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@@ -49,8 +49,7 @@ func Setup(
cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
- producer *producers.RoomserverProducer,
- eduProducer *producers.EDUServerProducer,
+ eduAPI eduserverAPI.EDUServerInputAPI,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
@@ -82,7 +81,7 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Send(
httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]),
- cfg, rsAPI, producer, eduProducer, keys, federation,
+ cfg, rsAPI, eduAPI, keys, federation,
)
},
)).Methods(http.MethodPut, http.MethodOptions)
@@ -92,14 +91,14 @@ func Setup(
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return Invite(
httpReq, request, vars["roomID"], vars["eventID"],
- cfg, producer, keys,
+ cfg, rsAPI, keys,
)
},
)).Methods(http.MethodPut, http.MethodOptions)
v1fedmux.Handle("/3pid/onbind", internal.MakeExternalAPI("3pid_onbind",
func(req *http.Request) util.JSONResponse {
- return CreateInvitesFrom3PIDInvites(req, rsAPI, asAPI, cfg, producer, federation, accountDB)
+ return CreateInvitesFrom3PIDInvites(req, rsAPI, asAPI, cfg, federation, accountDB)
},
)).Methods(http.MethodPost, http.MethodOptions)
@@ -107,7 +106,7 @@ func Setup(
"exchange_third_party_invite", cfg.Matrix.ServerName, keys, wakeup,
func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse {
return ExchangeThirdPartyInvite(
- httpReq, request, vars["roomID"], rsAPI, cfg, federation, producer,
+ httpReq, request, vars["roomID"], rsAPI, cfg, federation,
)
},
)).Methods(http.MethodPut, http.MethodOptions)
@@ -206,7 +205,7 @@ func Setup(
roomID := vars["roomID"]
eventID := vars["eventID"]
res := SendJoin(
- httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID,
+ httpReq, request, cfg, rsAPI, keys, roomID, eventID,
)
return util.JSONResponse{
Headers: res.Headers,
@@ -224,7 +223,7 @@ func Setup(
roomID := vars["roomID"]
eventID := vars["eventID"]
return SendJoin(
- httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID,
+ httpReq, request, cfg, rsAPI, keys, roomID, eventID,
)
},
)).Methods(http.MethodPut)
@@ -246,7 +245,7 @@ func Setup(
roomID := vars["roomID"]
eventID := vars["eventID"]
return SendLeave(
- httpReq, request, cfg, producer, keys, roomID, eventID,
+ httpReq, request, cfg, rsAPI, keys, roomID, eventID,
)
},
)).Methods(http.MethodPut)
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index a2120d81..a057750f 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -21,7 +21,7 @@ import (
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
+ eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
@@ -36,20 +36,18 @@ func Send(
txnID gomatrixserverlib.TransactionID,
cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI,
- producer *producers.RoomserverProducer,
- eduProducer *producers.EDUServerProducer,
+ eduAPI eduserverAPI.EDUServerInputAPI,
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
) util.JSONResponse {
t := txnReq{
- context: httpReq.Context(),
- rsAPI: rsAPI,
- producer: producer,
- eduProducer: eduProducer,
- keys: keys,
- federation: federation,
- haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
- newEvents: make(map[string]bool),
+ context: httpReq.Context(),
+ rsAPI: rsAPI,
+ eduAPI: eduAPI,
+ keys: keys,
+ federation: federation,
+ haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
+ newEvents: make(map[string]bool),
}
var txnEvents struct {
@@ -91,12 +89,11 @@ func Send(
type txnReq struct {
gomatrixserverlib.Transaction
- context context.Context
- rsAPI api.RoomserverInternalAPI
- producer *producers.RoomserverProducer
- eduProducer *producers.EDUServerProducer
- keys gomatrixserverlib.JSONVerifier
- federation txnFederationClient
+ context context.Context
+ rsAPI api.RoomserverInternalAPI
+ eduAPI eduserverAPI.EDUServerInputAPI
+ keys gomatrixserverlib.JSONVerifier
+ federation txnFederationClient
// local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
@@ -262,7 +259,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event")
continue
}
- if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
+ if err := eduserverAPI.SendTyping(t.context, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server")
}
case gomatrixserverlib.MDirectToDevice:
@@ -275,7 +272,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
for userID, byUser := range directPayload.Messages {
for deviceID, message := range byUser {
// TODO: check that the user and the device actually exist here
- if err := t.eduProducer.SendToDevice(t.context, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
+ if err := eduserverAPI.SendToDevice(t.context, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
util.GetLogger(t.context).WithError(err).WithFields(logrus.Fields{
"sender": directPayload.Sender,
"user_id": userID,
@@ -325,8 +322,8 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro
}
// pass the event to the roomserver
- _, err := t.producer.SendEvents(
- t.context,
+ _, err := api.SendEvents(
+ t.context, t.rsAPI,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(stateResp.RoomVersion),
},
@@ -408,7 +405,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
// pass the event along with the state to the roomserver using a background context so we don't
// needlessly expire
- return t.producer.SendEventWithState(context.Background(), resolvedState, e.Headered(roomVersion), t.haveEventIDs())
+ return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, e.Headered(roomVersion), t.haveEventIDs())
}
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go
index 3e28a347..b81f1c00 100644
--- a/federationapi/routing/send_test.go
+++ b/federationapi/routing/send_test.go
@@ -8,7 +8,6 @@ import (
"testing"
"time"
- "github.com/matrix-org/dendrite/clientapi/producers"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
fsAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -339,14 +338,13 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserver
func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq {
t := &txnReq{
- context: context.Background(),
- rsAPI: rsAPI,
- producer: producers.NewRoomserverProducer(rsAPI),
- eduProducer: producers.NewEDUServerProducer(&testEDUProducer{}),
- keys: &testNopJSONVerifier{},
- federation: fedClient,
- haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
- newEvents: make(map[string]bool),
+ context: context.Background(),
+ rsAPI: rsAPI,
+ eduAPI: &testEDUProducer{},
+ keys: &testNopJSONVerifier{},
+ federation: fedClient,
+ haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
+ newEvents: make(map[string]bool),
}
t.PDUs = pdus
t.Origin = testOrigin
diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go
index 8053cedd..8f319387 100644
--- a/federationapi/routing/threepid.go
+++ b/federationapi/routing/threepid.go
@@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@@ -60,7 +59,7 @@ var (
func CreateInvitesFrom3PIDInvites(
req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI, cfg *config.Dendrite,
- producer *producers.RoomserverProducer, federation *gomatrixserverlib.FederationClient,
+ federation *gomatrixserverlib.FederationClient,
accountDB accounts.Database,
) util.JSONResponse {
var body invites
@@ -92,8 +91,8 @@ func CreateInvitesFrom3PIDInvites(
}
// Send all the events
- if _, err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName, nil); err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
+ if _, err := api.SendEvents(req.Context(), rsAPI, evs, cfg.Matrix.ServerName, nil); err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@@ -111,7 +110,6 @@ func ExchangeThirdPartyInvite(
rsAPI roomserverAPI.RoomserverInternalAPI,
cfg *config.Dendrite,
federation *gomatrixserverlib.FederationClient,
- producer *producers.RoomserverProducer,
) util.JSONResponse {
var builder gomatrixserverlib.EventBuilder
if err := json.Unmarshal(request.Content(), &builder); err != nil {
@@ -176,15 +174,15 @@ func ExchangeThirdPartyInvite(
}
// Send the event to the roomserver
- if _, err = producer.SendEvents(
- httpReq.Context(),
+ if _, err = api.SendEvents(
+ httpReq.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{
signedEvent.Event.Headered(verRes.RoomVersion),
},
cfg.Matrix.ServerName,
nil,
); err != nil {
- util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")
+ util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go
index d9bb2839..35fcd311 100644
--- a/internal/setup/monolith.go
+++ b/internal/setup/monolith.go
@@ -7,7 +7,6 @@ import (
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
- "github.com/matrix-org/dendrite/clientapi/producers"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
@@ -41,8 +40,6 @@ type Monolith struct {
RoomserverAPI roomserverAPI.RoomserverInternalAPI
ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI
- // TODO: remove, this isn't even a producer
- EDUProducer *producers.EDUServerProducer
// TODO: can we remove this? It's weird that we are required the database
// yet every other component can do that on its own. libp2p-demo uses a custom
// database though annoyingly.
@@ -65,7 +62,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) {
federationapi.AddPublicRoutes(
publicMux, m.Config, m.AccountDB, m.DeviceDB, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.AppserviceAPI, m.FederationSenderAPI,
- m.EDUProducer,
+ m.EDUInternalAPI,
)
mediaapi.AddPublicRoutes(publicMux, m.Config, m.DeviceDB)
publicroomsapi.AddPublicRoutes(
diff --git a/clientapi/producers/roomserver.go b/roomserver/api/wrapper.go
index f0733db9..97940e0c 100644
--- a/clientapi/producers/roomserver.go
+++ b/roomserver/api/wrapper.go
@@ -1,4 +1,4 @@
-// Copyright 2017 Vector Creations Ltd
+// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -12,63 +12,51 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package producers
+package api
import (
"context"
- "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
)
-// RoomserverProducer produces events for the roomserver to consume.
-type RoomserverProducer struct {
- RsAPI api.RoomserverInternalAPI
-}
-
-// NewRoomserverProducer creates a new RoomserverProducer
-func NewRoomserverProducer(rsAPI api.RoomserverInternalAPI) *RoomserverProducer {
- return &RoomserverProducer{
- RsAPI: rsAPI,
- }
-}
-
-// SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
-func (c *RoomserverProducer) SendEvents(
- ctx context.Context, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName,
- txnID *api.TransactionID,
+// SendEvents to the roomserver The events are written with KindNew.
+func SendEvents(
+ ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent,
+ sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
) (string, error) {
- ires := make([]api.InputRoomEvent, len(events))
+ ires := make([]InputRoomEvent, len(events))
for i, event := range events {
- ires[i] = api.InputRoomEvent{
- Kind: api.KindNew,
+ ires[i] = InputRoomEvent{
+ Kind: KindNew,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
SendAsServer: string(sendAsServer),
TransactionID: txnID,
}
}
- return c.SendInputRoomEvents(ctx, ires)
+ return SendInputRoomEvents(ctx, rsAPI, ires)
}
-// SendEventWithState writes an event with KindNew to the roomserver input log
+// SendEventWithState writes an event with KindNew to the roomserver
// with the state at the event as KindOutlier before it. Will not send any event that is
// marked as `true` in haveEventIDs
-func (c *RoomserverProducer) SendEventWithState(
- ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool,
+func SendEventWithState(
+ ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState,
+ event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool,
) error {
outliers, err := state.Events()
if err != nil {
return err
}
- var ires []api.InputRoomEvent
+ var ires []InputRoomEvent
for _, outlier := range outliers {
if haveEventIDs[outlier.EventID()] {
continue
}
- ires = append(ires, api.InputRoomEvent{
- Kind: api.KindOutlier,
+ ires = append(ires, InputRoomEvent{
+ Kind: KindOutlier,
Event: outlier.Headered(event.RoomVersion),
AuthEventIDs: outlier.AuthEventIDs(),
})
@@ -79,39 +67,40 @@ func (c *RoomserverProducer) SendEventWithState(
stateEventIDs[i] = state.StateEvents[i].EventID()
}
- ires = append(ires, api.InputRoomEvent{
- Kind: api.KindNew,
+ ires = append(ires, InputRoomEvent{
+ Kind: KindNew,
Event: event,
AuthEventIDs: event.AuthEventIDs(),
HasState: true,
StateEventIDs: stateEventIDs,
})
- _, err = c.SendInputRoomEvents(ctx, ires)
+ _, err = SendInputRoomEvents(ctx, rsAPI, ires)
return err
}
-// SendInputRoomEvents writes the given input room events to the roomserver input API.
-func (c *RoomserverProducer) SendInputRoomEvents(
- ctx context.Context, ires []api.InputRoomEvent,
+// SendInputRoomEvents to the roomserver.
+func SendInputRoomEvents(
+ ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent,
) (eventID string, err error) {
- request := api.InputRoomEventsRequest{InputRoomEvents: ires}
- var response api.InputRoomEventsResponse
- err = c.RsAPI.InputRoomEvents(ctx, &request, &response)
+ request := InputRoomEventsRequest{InputRoomEvents: ires}
+ var response InputRoomEventsResponse
+ err = rsAPI.InputRoomEvents(ctx, &request, &response)
eventID = response.EventID
return
}
-// SendInvite writes the invite event to the roomserver input API.
+// SendInvite event to the roomserver.
// This should only be needed for invite events that occur outside of a known room.
// If we are in the room then the event should be sent using the SendEvents method.
-func (c *RoomserverProducer) SendInvite(
- ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
+func SendInvite(
+ ctx context.Context,
+ rsAPI RoomserverInternalAPI, inviteEvent gomatrixserverlib.HeaderedEvent,
inviteRoomState []gomatrixserverlib.InviteV2StrippedState,
- sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID,
+ sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
) error {
- request := api.InputRoomEventsRequest{
- InputInviteEvents: []api.InputInviteEvent{{
+ request := InputRoomEventsRequest{
+ InputInviteEvents: []InputInviteEvent{{
Event: inviteEvent,
InviteRoomState: inviteRoomState,
RoomVersion: inviteEvent.RoomVersion,
@@ -119,6 +108,6 @@ func (c *RoomserverProducer) SendInvite(
TransactionID: txnID,
}},
}
- var response api.InputRoomEventsResponse
- return c.RsAPI.InputRoomEvents(ctx, &request, &response)
+ var response InputRoomEventsResponse
+ return rsAPI.InputRoomEvents(ctx, &request, &response)
}