diff options
25 files changed, 160 insertions, 226 deletions
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 90db9eea..2780f367 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -48,9 +48,6 @@ func AddPublicRoutes( transactionsCache *transactions.Cache, fsAPI federationSenderAPI.FederationSenderInternalAPI, ) { - roomserverProducer := producers.NewRoomserverProducer(rsAPI) - eduProducer := producers.NewEDUServerProducer(eduInputAPI) - syncProducer := &producers.SyncAPIProducer{ Producer: producer, Topic: string(cfg.Kafka.Topics.OutputClientData), @@ -64,8 +61,8 @@ func AddPublicRoutes( } routing.Setup( - router, cfg, roomserverProducer, rsAPI, asAPI, + router, cfg, eduInputAPI, rsAPI, asAPI, accountsDB, deviceDB, federation, *keyRing, - syncProducer, eduProducer, transactionsCache, fsAPI, + syncProducer, transactionsCache, fsAPI, ) } diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 1b4ff184..89f49d35 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -29,7 +29,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/threepid" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -137,21 +136,21 @@ type fledglingEvent struct { // CreateRoom implements /createRoom func CreateRoom( req *http.Request, device *authtypes.Device, - cfg *config.Dendrite, producer *producers.RoomserverProducer, + cfg *config.Dendrite, accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, ) util.JSONResponse { // TODO (#267): Check room ID doesn't clash with an existing one, and we // probably shouldn't be using pseudo-random strings, maybe GUIDs? roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName) - return createRoom(req, device, cfg, roomID, producer, accountDB, rsAPI, asAPI) + return createRoom(req, device, cfg, roomID, accountDB, rsAPI, asAPI) } // createRoom implements /createRoom // nolint: gocyclo func createRoom( req *http.Request, device *authtypes.Device, - cfg *config.Dendrite, roomID string, producer *producers.RoomserverProducer, + cfg *config.Dendrite, roomID string, accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, ) util.JSONResponse { @@ -344,9 +343,9 @@ func createRoom( } // send events to the room server - _, err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil) + _, err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -404,14 +403,14 @@ func createRoom( } } // Send the invite event to the roomserver. - if err = producer.SendInvite( - req.Context(), + if err = roomserverAPI.SendInvite( + req.Context(), rsAPI, inviteEvent.Headered(roomVersion), strippedState, // invite room state cfg.Matrix.ServerName, // send as server nil, // transaction ID ); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendInvite failed") return jsonerror.InternalServerError() } } diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index 74d92a05..0b2c2bc5 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/threepid" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -46,7 +45,6 @@ func SendMembership( req *http.Request, accountDB accounts.Database, device *authtypes.Device, roomID string, membership string, cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, - producer *producers.RoomserverProducer, ) util.JSONResponse { verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verRes := api.QueryRoomVersionForRoomResponse{} @@ -71,7 +69,7 @@ func SendMembership( } inviteStored, jsonErrResp := checkAndProcessThreepid( - req, device, &body, cfg, rsAPI, accountDB, producer, + req, device, &body, cfg, rsAPI, accountDB, membership, roomID, evTime, ) if jsonErrResp != nil { @@ -112,8 +110,8 @@ func SendMembership( switch membership { case gomatrixserverlib.Invite: // Invites need to be handled specially - err = producer.SendInvite( - req.Context(), + err = roomserverAPI.SendInvite( + req.Context(), rsAPI, event.Headered(verRes.RoomVersion), nil, // ask the roomserver to draw up invite room state for us cfg.Matrix.ServerName, @@ -130,14 +128,14 @@ func SendMembership( }{roomID} fallthrough default: - _, err = producer.SendEvents( - req.Context(), + _, err = roomserverAPI.SendEvents( + req.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, cfg.Matrix.ServerName, nil, ) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } } @@ -252,13 +250,12 @@ func checkAndProcessThreepid( cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI, accountDB accounts.Database, - producer *producers.RoomserverProducer, membership, roomID string, evTime time.Time, ) (inviteStored bool, errRes *util.JSONResponse) { inviteStored, err := threepid.CheckAndProcessInvite( - req.Context(), device, body, cfg, rsAPI, accountDB, producer, + req.Context(), device, body, cfg, rsAPI, accountDB, membership, roomID, evTime, ) if err == threepid.ErrMissingParameter { diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index c0fe32a3..642a7288 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -94,8 +93,7 @@ func GetAvatarURL( // nolint:gocyclo func SetAvatarURL( req *http.Request, accountDB accounts.Database, device *authtypes.Device, - userID string, cfg *config.Dendrite, - rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI, + userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { return util.JSONResponse{ @@ -167,8 +165,8 @@ func SetAvatarURL( return jsonerror.InternalServerError() } - if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed") + if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -209,8 +207,7 @@ func GetDisplayName( // nolint:gocyclo func SetDisplayName( req *http.Request, accountDB accounts.Database, device *authtypes.Device, - userID string, cfg *config.Dendrite, - rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI, + userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { return util.JSONResponse{ @@ -282,8 +279,8 @@ func SetDisplayName( return jsonerror.InternalServerError() } - if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed") + if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index eb558205..02470775 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -27,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -48,7 +49,7 @@ const pathPrefixUnstable = "/client/unstable" // nolint: gocyclo func Setup( publicAPIMux *mux.Router, cfg *config.Dendrite, - producer *producers.RoomserverProducer, + eduAPI eduServerAPI.EDUServerInputAPI, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, accountDB accounts.Database, @@ -56,7 +57,6 @@ func Setup( federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.KeyRing, syncProducer *producers.SyncAPIProducer, - eduProducer *producers.EDUServerProducer, transactionsCache *transactions.Cache, federationSender federationSenderAPI.FederationSenderInternalAPI, ) { @@ -89,7 +89,7 @@ func Setup( r0mux.Handle("/createRoom", internal.MakeAuthAPI("createRoom", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { - return CreateRoom(req, device, cfg, producer, accountDB, rsAPI, asAPI) + return CreateRoom(req, device, cfg, accountDB, rsAPI, asAPI) }), ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/join/{roomIDOrAlias}", @@ -125,7 +125,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI, producer) + return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI) }), ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/rooms/{roomID}/send/{eventType}", @@ -134,7 +134,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, producer, nil) + return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil) }), ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}", @@ -145,7 +145,7 @@ func Setup( } txnID := vars["txnID"] return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, - nil, cfg, rsAPI, producer, transactionsCache) + nil, cfg, rsAPI, transactionsCache) }), ).Methods(http.MethodPut, http.MethodOptions) r0mux.Handle("/rooms/{roomID}/event/{eventID}", @@ -194,7 +194,7 @@ func Setup( if strings.HasSuffix(eventType, "/") { eventType = eventType[:len(eventType)-1] } - return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, producer, nil) + return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -205,7 +205,7 @@ func Setup( return util.ErrorResponse(err) } stateKey := vars["stateKey"] - return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, producer, nil) + return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -269,7 +269,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduProducer) + return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -280,7 +280,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -294,7 +294,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -386,7 +386,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, producer, rsAPI) + return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows @@ -408,7 +408,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetDisplayName(req, accountDB, device, vars["userID"], cfg, producer, rsAPI) + return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index a8f8893b..5d5507e8 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -20,7 +20,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/transactions" @@ -46,7 +45,6 @@ func SendEvent( roomID, eventType string, txnID, stateKey *string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, - producer *producers.RoomserverProducer, txnCache *transactions.Cache, ) util.JSONResponse { verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} @@ -80,8 +78,8 @@ func SendEvent( // pass the new event to the roomserver and receive the correct event ID // event ID in case of duplicate transaction is discarded - eventID, err := producer.SendEvents( - req.Context(), + eventID, err := api.SendEvents( + req.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{ e.Headered(verRes.RoomVersion), }, @@ -89,7 +87,7 @@ func SendEvent( txnAndSessionID, ) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } util.GetLogger(req.Context()).WithFields(logrus.Fields{ diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go index 5d3060d7..dc0a6572 100644 --- a/clientapi/routing/sendtodevice.go +++ b/clientapi/routing/sendtodevice.go @@ -19,7 +19,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/util" ) @@ -28,7 +28,7 @@ import ( // sends the device events to the EDU Server func SendToDevice( req *http.Request, device *authtypes.Device, - eduProducer *producers.EDUServerProducer, + eduAPI api.EDUServerInputAPI, txnCache *transactions.Cache, eventType string, txnID *string, ) util.JSONResponse { @@ -48,8 +48,8 @@ func SendToDevice( for userID, byUser := range httpReq.Messages { for deviceID, message := range byUser { - if err := eduProducer.SendToDevice( - req.Context(), device.UserID, userID, deviceID, eventType, message, + if err := api.SendToDevice( + req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed") return jsonerror.InternalServerError() diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go index ffaa0e66..2eae1658 100644 --- a/clientapi/routing/sendtyping.go +++ b/clientapi/routing/sendtyping.go @@ -20,8 +20,8 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/userutil" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/util" ) @@ -35,7 +35,7 @@ type typingContentJSON struct { func SendTyping( req *http.Request, device *authtypes.Device, roomID string, userID string, accountDB accounts.Database, - eduProducer *producers.EDUServerProducer, + eduAPI api.EDUServerInputAPI, ) util.JSONResponse { if device.UserID != userID { return util.JSONResponse{ @@ -69,8 +69,8 @@ func SendTyping( return *resErr } - if err = eduProducer.SendTyping( - req.Context(), userID, roomID, r.Typing, r.Timeout, + if err = api.SendTyping( + req.Context(), eduAPI, userID, roomID, r.Typing, r.Timeout, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed") return jsonerror.InternalServerError() diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go index 8f173bf8..b0df0dd4 100644 --- a/clientapi/threepid/invites.go +++ b/clientapi/threepid/invites.go @@ -26,7 +26,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -88,7 +87,7 @@ func CheckAndProcessInvite( ctx context.Context, device *authtypes.Device, body *MembershipRequest, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, db accounts.Database, - producer *producers.RoomserverProducer, membership string, roomID string, + membership string, roomID string, evTime time.Time, ) (inviteStoredOnIDServer bool, err error) { if membership != gomatrixserverlib.Invite || (body.Address == "" && body.IDServer == "" && body.Medium == "") { @@ -112,7 +111,7 @@ func CheckAndProcessInvite( // "m.room.third_party_invite" have to be emitted from the data in // storeInviteRes. err = emit3PIDInviteEvent( - ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, producer, evTime, + ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, evTime, ) inviteStoredOnIDServer = err == nil @@ -331,7 +330,7 @@ func emit3PIDInviteEvent( ctx context.Context, body *MembershipRequest, res *idServerStoreInviteResponse, device *authtypes.Device, roomID string, cfg *config.Dendrite, - rsAPI api.RoomserverInternalAPI, producer *producers.RoomserverProducer, + rsAPI api.RoomserverInternalAPI, evTime time.Time, ) error { builder := &gomatrixserverlib.EventBuilder{ @@ -359,8 +358,8 @@ func emit3PIDInviteEvent( return err } - _, err = producer.SendEvents( - ctx, + _, err = api.SendEvents( + ctx, rsAPI, []gomatrixserverlib.HeaderedEvent{ (*event).Headered(queryRes.RoomVersion), }, diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index ba1af148..b7cbcdc9 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -29,7 +29,6 @@ import ( p2phttp "github.com/libp2p/go-libp2p-http" p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/cmd/dendrite-demo-libp2p/storage" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/federationsender" @@ -149,7 +148,6 @@ func main() { &base.Base, federation, rsAPI, keyRing, ) rsAPI.SetFederationSenderAPI(fsAPI) - eduProducer := producers.NewEDUServerProducer(eduInputAPI) publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") @@ -166,7 +164,6 @@ func main() { AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, - EDUProducer: eduProducer, FederationSenderAPI: fsAPI, RoomserverAPI: rsAPI, ServerKeyAPI: serverKeyAPI, diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index e5192c41..62c56129 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -25,7 +25,6 @@ import ( "time" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" @@ -153,7 +152,6 @@ func main() { rsComponent.SetFederationSenderAPI(fsAPI) - eduProducer := producers.NewEDUServerProducer(eduInputAPI) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") @@ -170,7 +168,6 @@ func main() { AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, - EDUProducer: eduProducer, FederationSenderAPI: fsAPI, RoomserverAPI: rsAPI, ServerKeyAPI: serverKeyAPI, diff --git a/cmd/dendrite-federation-api-server/main.go b/cmd/dendrite-federation-api-server/main.go index 7481299e..fcdc000c 100644 --- a/cmd/dendrite-federation-api-server/main.go +++ b/cmd/dendrite-federation-api-server/main.go @@ -15,7 +15,6 @@ package main import ( - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/internal/basecomponent" ) @@ -33,12 +32,10 @@ func main() { fsAPI := base.FederationSenderHTTPClient() rsAPI := base.RoomserverHTTPClient() asAPI := base.AppserviceHTTPClient() - // TODO: this isn't a producer - eduProducer := producers.NewEDUServerProducer(base.EDUServerClient()) federationapi.AddPublicRoutes( base.PublicAPIMux, base.Cfg, accountDB, deviceDB, federation, keyRing, - rsAPI, asAPI, fsAPI, eduProducer, + rsAPI, asAPI, fsAPI, base.EDUServerClient(), ) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI)) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index dda3ade5..195a1ac5 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -19,7 +19,6 @@ import ( "net/http" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationsender" @@ -105,7 +104,6 @@ func main() { } rsComponent.SetFederationSenderAPI(fsAPI) - eduProducer := producers.NewEDUServerProducer(eduInputAPI) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), base.Cfg.DbProperties(), cfg.Matrix.ServerName) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") @@ -122,7 +120,6 @@ func main() { AppserviceAPI: asAPI, EDUInternalAPI: eduInputAPI, - EDUProducer: eduProducer, FederationSenderAPI: fsAPI, RoomserverAPI: rsAPI, ServerKeyAPI: serverKeyAPI, diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 86ae3368..0512ee5c 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -23,7 +23,6 @@ import ( "syscall/js" "github.com/matrix-org/dendrite/appservice" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/eduserver" "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationsender" @@ -209,7 +208,6 @@ func main() { rsAPI.SetFederationSenderAPI(fedSenderAPI) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI) - eduProducer := producers.NewEDUServerProducer(eduInputAPI) publicRoomsDB, err := storage.NewPublicRoomsServerDatabase(string(base.Cfg.Database.PublicRoomsAPI), cfg.Matrix.ServerName) if err != nil { logrus.WithError(err).Panicf("failed to connect to public rooms db") @@ -226,7 +224,6 @@ func main() { AppserviceAPI: asQuery, EDUInternalAPI: eduInputAPI, - EDUProducer: eduProducer, FederationSenderAPI: fedSenderAPI, RoomserverAPI: rsAPI, //ServerKeyAPI: serverKeyAPI, diff --git a/clientapi/producers/eduserver.go b/eduserver/api/wrapper.go index 102c1fad..c2c4596d 100644 --- a/clientapi/producers/eduserver.go +++ b/eduserver/api/wrapper.go @@ -1,3 +1,5 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -10,35 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -package producers +package api import ( "context" "encoding/json" "time" - "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/gomatrixserverlib" ) -// EDUServerProducer produces events for the EDU server to consume -type EDUServerProducer struct { - InputAPI api.EDUServerInputAPI -} - -// NewEDUServerProducer creates a new EDUServerProducer -func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer { - return &EDUServerProducer{ - InputAPI: inputAPI, - } -} - // SendTyping sends a typing event to EDU server -func (p *EDUServerProducer) SendTyping( - ctx context.Context, userID, roomID string, +func SendTyping( + ctx context.Context, eduAPI EDUServerInputAPI, userID, roomID string, typing bool, timeoutMS int64, ) error { - requestData := api.InputTypingEvent{ + requestData := InputTypingEvent{ UserID: userID, RoomID: roomID, Typing: typing, @@ -46,24 +35,24 @@ func (p *EDUServerProducer) SendTyping( OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), } - var response api.InputTypingEventResponse - err := p.InputAPI.InputTypingEvent( - ctx, &api.InputTypingEventRequest{InputTypingEvent: requestData}, &response, + var response InputTypingEventResponse + err := eduAPI.InputTypingEvent( + ctx, &InputTypingEventRequest{InputTypingEvent: requestData}, &response, ) return err } // SendToDevice sends a typing event to EDU server -func (p *EDUServerProducer) SendToDevice( - ctx context.Context, sender, userID, deviceID, eventType string, +func SendToDevice( + ctx context.Context, eduAPI EDUServerInputAPI, sender, userID, deviceID, eventType string, message interface{}, ) error { js, err := json.Marshal(message) if err != nil { return err } - requestData := api.InputSendToDeviceEvent{ + requestData := InputSendToDeviceEvent{ UserID: userID, DeviceID: deviceID, SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ @@ -72,9 +61,9 @@ func (p *EDUServerProducer) SendToDevice( Content: js, }, } - request := api.InputSendToDeviceEventRequest{ + request := InputSendToDeviceEventRequest{ InputSendToDeviceEvent: requestData, } - response := api.InputSendToDeviceEventResponse{} - return p.InputAPI.InputSendToDeviceEvent(ctx, &request, &response) + response := InputSendToDeviceEventResponse{} + return eduAPI.InputSendToDeviceEvent(ctx, &request, &response) } diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 7aecd272..9299b501 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -19,12 +19,11 @@ import ( appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal/config" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" - // TODO: Are we really wanting to pull in the producer from clientapi - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/federationapi/routing" "github.com/matrix-org/gomatrixserverlib" ) @@ -40,13 +39,12 @@ func AddPublicRoutes( rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, federationSenderAPI federationSenderAPI.FederationSenderInternalAPI, - eduProducer *producers.EDUServerProducer, + eduAPI eduserverAPI.EDUServerInputAPI, ) { - roomserverProducer := producers.NewRoomserverProducer(rsAPI) routing.Setup( - router, cfg, rsAPI, asAPI, roomserverProducer, - eduProducer, federationSenderAPI, *keyRing, + router, cfg, rsAPI, asAPI, + eduAPI, federationSenderAPI, *keyRing, federation, accountsDB, deviceDB, ) } diff --git a/federationapi/routing/invite.go b/federationapi/routing/invite.go index 05efe587..908a04fc 100644 --- a/federationapi/routing/invite.go +++ b/federationapi/routing/invite.go @@ -20,8 +20,8 @@ import ( "net/http" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal/config" + "github.com/matrix-org/dendrite/roomserver/api" roomserverVersion "github.com/matrix-org/dendrite/roomserver/version" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -34,7 +34,7 @@ func Invite( roomID string, eventID string, cfg *config.Dendrite, - producer *producers.RoomserverProducer, + rsAPI api.RoomserverInternalAPI, keys gomatrixserverlib.KeyRing, ) util.JSONResponse { inviteReq := gomatrixserverlib.InviteV2Request{} @@ -98,8 +98,8 @@ func Invite( ) // Add the invite event to the roomserver. - if err = producer.SendInvite( - httpReq.Context(), + if err = api.SendInvite( + httpReq.Context(), rsAPI, signedEvent.Headered(inviteReq.RoomVersion()), inviteReq.InviteRoomState(), event.Origin(), diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index 5b4e8db3..593aa169 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -21,7 +21,6 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -144,7 +143,6 @@ func SendJoin( request *gomatrixserverlib.FederationRequest, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, - producer *producers.RoomserverProducer, keys gomatrixserverlib.KeyRing, roomID, eventID string, ) util.JSONResponse { @@ -267,8 +265,8 @@ func SendJoin( // We are responsible for notifying other servers that the user has joined // the room, so set SendAsServer to cfg.Matrix.ServerName if !alreadyJoined { - _, err = producer.SendEvents( - httpReq.Context(), + _, err = api.SendEvents( + httpReq.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{ event.Headered(stateAndAuthChainResponse.RoomVersion), }, @@ -276,7 +274,7 @@ func SendJoin( nil, ) if err != nil { - util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } } diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go index 62ca1145..f998be45 100644 --- a/federationapi/routing/leave.go +++ b/federationapi/routing/leave.go @@ -17,7 +17,6 @@ import ( "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -113,13 +112,13 @@ func SendLeave( httpReq *http.Request, request *gomatrixserverlib.FederationRequest, cfg *config.Dendrite, - producer *producers.RoomserverProducer, + rsAPI api.RoomserverInternalAPI, keys gomatrixserverlib.KeyRing, roomID, eventID string, ) util.JSONResponse { verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verRes := api.QueryRoomVersionForRoomResponse{} - if err := producer.RsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { + if err := rsAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil { return util.JSONResponse{ Code: http.StatusBadRequest, JSON: jsonerror.UnsupportedRoomVersion(err.Error()), @@ -194,8 +193,8 @@ func SendLeave( // Send the events to the room server. // We are responsible for notifying other servers that the user has left // the room, so set SendAsServer to cfg.Matrix.ServerName - _, err = producer.SendEvents( - httpReq.Context(), + _, err = api.SendEvents( + httpReq.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{ event.Headered(verRes.RoomVersion), }, diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index e6c6df65..754dcdfb 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -21,7 +21,7 @@ import ( appserviceAPI "github.com/matrix-org/dendrite/appservice/api" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" - "github.com/matrix-org/dendrite/clientapi/producers" + eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -49,8 +49,7 @@ func Setup( cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, - producer *producers.RoomserverProducer, - eduProducer *producers.EDUServerProducer, + eduAPI eduserverAPI.EDUServerInputAPI, fsAPI federationSenderAPI.FederationSenderInternalAPI, keys gomatrixserverlib.KeyRing, federation *gomatrixserverlib.FederationClient, @@ -82,7 +81,7 @@ func Setup( func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { return Send( httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), - cfg, rsAPI, producer, eduProducer, keys, federation, + cfg, rsAPI, eduAPI, keys, federation, ) }, )).Methods(http.MethodPut, http.MethodOptions) @@ -92,14 +91,14 @@ func Setup( func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { return Invite( httpReq, request, vars["roomID"], vars["eventID"], - cfg, producer, keys, + cfg, rsAPI, keys, ) }, )).Methods(http.MethodPut, http.MethodOptions) v1fedmux.Handle("/3pid/onbind", internal.MakeExternalAPI("3pid_onbind", func(req *http.Request) util.JSONResponse { - return CreateInvitesFrom3PIDInvites(req, rsAPI, asAPI, cfg, producer, federation, accountDB) + return CreateInvitesFrom3PIDInvites(req, rsAPI, asAPI, cfg, federation, accountDB) }, )).Methods(http.MethodPost, http.MethodOptions) @@ -107,7 +106,7 @@ func Setup( "exchange_third_party_invite", cfg.Matrix.ServerName, keys, wakeup, func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest, vars map[string]string) util.JSONResponse { return ExchangeThirdPartyInvite( - httpReq, request, vars["roomID"], rsAPI, cfg, federation, producer, + httpReq, request, vars["roomID"], rsAPI, cfg, federation, ) }, )).Methods(http.MethodPut, http.MethodOptions) @@ -206,7 +205,7 @@ func Setup( roomID := vars["roomID"] eventID := vars["eventID"] res := SendJoin( - httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID, + httpReq, request, cfg, rsAPI, keys, roomID, eventID, ) return util.JSONResponse{ Headers: res.Headers, @@ -224,7 +223,7 @@ func Setup( roomID := vars["roomID"] eventID := vars["eventID"] return SendJoin( - httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID, + httpReq, request, cfg, rsAPI, keys, roomID, eventID, ) }, )).Methods(http.MethodPut) @@ -246,7 +245,7 @@ func Setup( roomID := vars["roomID"] eventID := vars["eventID"] return SendLeave( - httpReq, request, cfg, producer, keys, roomID, eventID, + httpReq, request, cfg, rsAPI, keys, roomID, eventID, ) }, )).Methods(http.MethodPut) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index a2120d81..a057750f 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -21,7 +21,7 @@ import ( "net/http" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" + eduserverAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" @@ -36,20 +36,18 @@ func Send( txnID gomatrixserverlib.TransactionID, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, - producer *producers.RoomserverProducer, - eduProducer *producers.EDUServerProducer, + eduAPI eduserverAPI.EDUServerInputAPI, keys gomatrixserverlib.KeyRing, federation *gomatrixserverlib.FederationClient, ) util.JSONResponse { t := txnReq{ - context: httpReq.Context(), - rsAPI: rsAPI, - producer: producer, - eduProducer: eduProducer, - keys: keys, - federation: federation, - haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), - newEvents: make(map[string]bool), + context: httpReq.Context(), + rsAPI: rsAPI, + eduAPI: eduAPI, + keys: keys, + federation: federation, + haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), + newEvents: make(map[string]bool), } var txnEvents struct { @@ -91,12 +89,11 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - context context.Context - rsAPI api.RoomserverInternalAPI - producer *producers.RoomserverProducer - eduProducer *producers.EDUServerProducer - keys gomatrixserverlib.JSONVerifier - federation txnFederationClient + context context.Context + rsAPI api.RoomserverInternalAPI + eduAPI eduserverAPI.EDUServerInputAPI + keys gomatrixserverlib.JSONVerifier + federation txnFederationClient // local cache of events for auth checks, etc - this may include events // which the roomserver is unaware of. haveEvents map[string]*gomatrixserverlib.HeaderedEvent @@ -262,7 +259,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event") continue } - if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { + if err := eduserverAPI.SendTyping(t.context, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server") } case gomatrixserverlib.MDirectToDevice: @@ -275,7 +272,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { for userID, byUser := range directPayload.Messages { for deviceID, message := range byUser { // TODO: check that the user and the device actually exist here - if err := t.eduProducer.SendToDevice(t.context, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { + if err := eduserverAPI.SendToDevice(t.context, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil { util.GetLogger(t.context).WithError(err).WithFields(logrus.Fields{ "sender": directPayload.Sender, "user_id": userID, @@ -325,8 +322,8 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro } // pass the event to the roomserver - _, err := t.producer.SendEvents( - t.context, + _, err := api.SendEvents( + t.context, t.rsAPI, []gomatrixserverlib.HeaderedEvent{ e.Headered(stateResp.RoomVersion), }, @@ -408,7 +405,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer // pass the event along with the state to the roomserver using a background context so we don't // needlessly expire - return t.producer.SendEventWithState(context.Background(), resolvedState, e.Headered(roomVersion), t.haveEventIDs()) + return api.SendEventWithState(context.Background(), t.rsAPI, resolvedState, e.Headered(roomVersion), t.haveEventIDs()) } // lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 3e28a347..b81f1c00 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - "github.com/matrix-org/dendrite/clientapi/producers" eduAPI "github.com/matrix-org/dendrite/eduserver/api" fsAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/roomserver/api" @@ -339,14 +338,13 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserver func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq { t := &txnReq{ - context: context.Background(), - rsAPI: rsAPI, - producer: producers.NewRoomserverProducer(rsAPI), - eduProducer: producers.NewEDUServerProducer(&testEDUProducer{}), - keys: &testNopJSONVerifier{}, - federation: fedClient, - haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), - newEvents: make(map[string]bool), + context: context.Background(), + rsAPI: rsAPI, + eduAPI: &testEDUProducer{}, + keys: &testNopJSONVerifier{}, + federation: fedClient, + haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), + newEvents: make(map[string]bool), } t.PDUs = pdus t.Origin = testOrigin diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go index 8053cedd..8f319387 100644 --- a/federationapi/routing/threepid.go +++ b/federationapi/routing/threepid.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" @@ -60,7 +59,7 @@ var ( func CreateInvitesFrom3PIDInvites( req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, cfg *config.Dendrite, - producer *producers.RoomserverProducer, federation *gomatrixserverlib.FederationClient, + federation *gomatrixserverlib.FederationClient, accountDB accounts.Database, ) util.JSONResponse { var body invites @@ -92,8 +91,8 @@ func CreateInvitesFrom3PIDInvites( } // Send all the events - if _, err := producer.SendEvents(req.Context(), evs, cfg.Matrix.ServerName, nil); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + if _, err := api.SendEvents(req.Context(), rsAPI, evs, cfg.Matrix.ServerName, nil); err != nil { + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -111,7 +110,6 @@ func ExchangeThirdPartyInvite( rsAPI roomserverAPI.RoomserverInternalAPI, cfg *config.Dendrite, federation *gomatrixserverlib.FederationClient, - producer *producers.RoomserverProducer, ) util.JSONResponse { var builder gomatrixserverlib.EventBuilder if err := json.Unmarshal(request.Content(), &builder); err != nil { @@ -176,15 +174,15 @@ func ExchangeThirdPartyInvite( } // Send the event to the roomserver - if _, err = producer.SendEvents( - httpReq.Context(), + if _, err = api.SendEvents( + httpReq.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{ signedEvent.Event.Headered(verRes.RoomVersion), }, cfg.Matrix.ServerName, nil, ); err != nil { - util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index d9bb2839..35fcd311 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -7,7 +7,6 @@ import ( "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" - "github.com/matrix-org/dendrite/clientapi/producers" eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/federationapi" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" @@ -41,8 +40,6 @@ type Monolith struct { RoomserverAPI roomserverAPI.RoomserverInternalAPI ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI - // TODO: remove, this isn't even a producer - EDUProducer *producers.EDUServerProducer // TODO: can we remove this? It's weird that we are required the database // yet every other component can do that on its own. libp2p-demo uses a custom // database though annoyingly. @@ -65,7 +62,7 @@ func (m *Monolith) AddAllPublicRoutes(publicMux *mux.Router) { federationapi.AddPublicRoutes( publicMux, m.Config, m.AccountDB, m.DeviceDB, m.FedClient, m.KeyRing, m.RoomserverAPI, m.AppserviceAPI, m.FederationSenderAPI, - m.EDUProducer, + m.EDUInternalAPI, ) mediaapi.AddPublicRoutes(publicMux, m.Config, m.DeviceDB) publicroomsapi.AddPublicRoutes( diff --git a/clientapi/producers/roomserver.go b/roomserver/api/wrapper.go index f0733db9..97940e0c 100644 --- a/clientapi/producers/roomserver.go +++ b/roomserver/api/wrapper.go @@ -1,4 +1,4 @@ -// Copyright 2017 Vector Creations Ltd +// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,63 +12,51 @@ // See the License for the specific language governing permissions and // limitations under the License. -package producers +package api import ( "context" - "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/gomatrixserverlib" ) -// RoomserverProducer produces events for the roomserver to consume. -type RoomserverProducer struct { - RsAPI api.RoomserverInternalAPI -} - -// NewRoomserverProducer creates a new RoomserverProducer -func NewRoomserverProducer(rsAPI api.RoomserverInternalAPI) *RoomserverProducer { - return &RoomserverProducer{ - RsAPI: rsAPI, - } -} - -// SendEvents writes the given events to the roomserver input log. The events are written with KindNew. -func (c *RoomserverProducer) SendEvents( - ctx context.Context, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, - txnID *api.TransactionID, +// SendEvents to the roomserver The events are written with KindNew. +func SendEvents( + ctx context.Context, rsAPI RoomserverInternalAPI, events []gomatrixserverlib.HeaderedEvent, + sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, ) (string, error) { - ires := make([]api.InputRoomEvent, len(events)) + ires := make([]InputRoomEvent, len(events)) for i, event := range events { - ires[i] = api.InputRoomEvent{ - Kind: api.KindNew, + ires[i] = InputRoomEvent{ + Kind: KindNew, Event: event, AuthEventIDs: event.AuthEventIDs(), SendAsServer: string(sendAsServer), TransactionID: txnID, } } - return c.SendInputRoomEvents(ctx, ires) + return SendInputRoomEvents(ctx, rsAPI, ires) } -// SendEventWithState writes an event with KindNew to the roomserver input log +// SendEventWithState writes an event with KindNew to the roomserver // with the state at the event as KindOutlier before it. Will not send any event that is // marked as `true` in haveEventIDs -func (c *RoomserverProducer) SendEventWithState( - ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, +func SendEventWithState( + ctx context.Context, rsAPI RoomserverInternalAPI, state *gomatrixserverlib.RespState, + event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, ) error { outliers, err := state.Events() if err != nil { return err } - var ires []api.InputRoomEvent + var ires []InputRoomEvent for _, outlier := range outliers { if haveEventIDs[outlier.EventID()] { continue } - ires = append(ires, api.InputRoomEvent{ - Kind: api.KindOutlier, + ires = append(ires, InputRoomEvent{ + Kind: KindOutlier, Event: outlier.Headered(event.RoomVersion), AuthEventIDs: outlier.AuthEventIDs(), }) @@ -79,39 +67,40 @@ func (c *RoomserverProducer) SendEventWithState( stateEventIDs[i] = state.StateEvents[i].EventID() } - ires = append(ires, api.InputRoomEvent{ - Kind: api.KindNew, + ires = append(ires, InputRoomEvent{ + Kind: KindNew, Event: event, AuthEventIDs: event.AuthEventIDs(), HasState: true, StateEventIDs: stateEventIDs, }) - _, err = c.SendInputRoomEvents(ctx, ires) + _, err = SendInputRoomEvents(ctx, rsAPI, ires) return err } -// SendInputRoomEvents writes the given input room events to the roomserver input API. -func (c *RoomserverProducer) SendInputRoomEvents( - ctx context.Context, ires []api.InputRoomEvent, +// SendInputRoomEvents to the roomserver. +func SendInputRoomEvents( + ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent, ) (eventID string, err error) { - request := api.InputRoomEventsRequest{InputRoomEvents: ires} - var response api.InputRoomEventsResponse - err = c.RsAPI.InputRoomEvents(ctx, &request, &response) + request := InputRoomEventsRequest{InputRoomEvents: ires} + var response InputRoomEventsResponse + err = rsAPI.InputRoomEvents(ctx, &request, &response) eventID = response.EventID return } -// SendInvite writes the invite event to the roomserver input API. +// SendInvite event to the roomserver. // This should only be needed for invite events that occur outside of a known room. // If we are in the room then the event should be sent using the SendEvents method. -func (c *RoomserverProducer) SendInvite( - ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, +func SendInvite( + ctx context.Context, + rsAPI RoomserverInternalAPI, inviteEvent gomatrixserverlib.HeaderedEvent, inviteRoomState []gomatrixserverlib.InviteV2StrippedState, - sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID, + sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, ) error { - request := api.InputRoomEventsRequest{ - InputInviteEvents: []api.InputInviteEvent{{ + request := InputRoomEventsRequest{ + InputInviteEvents: []InputInviteEvent{{ Event: inviteEvent, InviteRoomState: inviteRoomState, RoomVersion: inviteEvent.RoomVersion, @@ -119,6 +108,6 @@ func (c *RoomserverProducer) SendInvite( TransactionID: txnID, }}, } - var response api.InputRoomEventsResponse - return c.RsAPI.InputRoomEvents(ctx, &request, &response) + var response InputRoomEventsResponse + return rsAPI.InputRoomEvents(ctx, &request, &response) } |