diff options
author | Kegsay <kegan@matrix.org> | 2020-06-10 12:17:54 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-06-10 12:17:54 +0100 |
commit | b7187a9a354530c1846c2a97be701ea484f91c2c (patch) | |
tree | 6d14903a444fa8bca964247dbe85ae8d9709d4c8 /clientapi | |
parent | d9d6f4568ce891ae0ae9d2a3449974d3777bd21d (diff) |
Remove clientapi producers which aren't actually producers (#1111)
* Remove clientapi producers which aren't actually producers
They are actually just convenience wrappers around the internal APIs
for roomserver/eduserver. Move their logic to their respective `api`
packages and call them directly.
* Remove TODO
* unbreak ygg
Diffstat (limited to 'clientapi')
-rw-r--r-- | clientapi/clientapi.go | 7 | ||||
-rw-r--r-- | clientapi/producers/eduserver.go | 80 | ||||
-rw-r--r-- | clientapi/producers/roomserver.go | 124 | ||||
-rw-r--r-- | clientapi/routing/createroom.go | 17 | ||||
-rw-r--r-- | clientapi/routing/membership.go | 17 | ||||
-rw-r--r-- | clientapi/routing/profile.go | 15 | ||||
-rw-r--r-- | clientapi/routing/routing.go | 26 | ||||
-rw-r--r-- | clientapi/routing/sendevent.go | 8 | ||||
-rw-r--r-- | clientapi/routing/sendtodevice.go | 8 | ||||
-rw-r--r-- | clientapi/routing/sendtyping.go | 8 | ||||
-rw-r--r-- | clientapi/threepid/invites.go | 11 |
11 files changed, 52 insertions, 269 deletions
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index 90db9eea..2780f367 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -48,9 +48,6 @@ func AddPublicRoutes( transactionsCache *transactions.Cache, fsAPI federationSenderAPI.FederationSenderInternalAPI, ) { - roomserverProducer := producers.NewRoomserverProducer(rsAPI) - eduProducer := producers.NewEDUServerProducer(eduInputAPI) - syncProducer := &producers.SyncAPIProducer{ Producer: producer, Topic: string(cfg.Kafka.Topics.OutputClientData), @@ -64,8 +61,8 @@ func AddPublicRoutes( } routing.Setup( - router, cfg, roomserverProducer, rsAPI, asAPI, + router, cfg, eduInputAPI, rsAPI, asAPI, accountsDB, deviceDB, federation, *keyRing, - syncProducer, eduProducer, transactionsCache, fsAPI, + syncProducer, transactionsCache, fsAPI, ) } diff --git a/clientapi/producers/eduserver.go b/clientapi/producers/eduserver.go deleted file mode 100644 index 102c1fad..00000000 --- a/clientapi/producers/eduserver.go +++ /dev/null @@ -1,80 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package producers - -import ( - "context" - "encoding/json" - "time" - - "github.com/matrix-org/dendrite/eduserver/api" - "github.com/matrix-org/gomatrixserverlib" -) - -// EDUServerProducer produces events for the EDU server to consume -type EDUServerProducer struct { - InputAPI api.EDUServerInputAPI -} - -// NewEDUServerProducer creates a new EDUServerProducer -func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer { - return &EDUServerProducer{ - InputAPI: inputAPI, - } -} - -// SendTyping sends a typing event to EDU server -func (p *EDUServerProducer) SendTyping( - ctx context.Context, userID, roomID string, - typing bool, timeoutMS int64, -) error { - requestData := api.InputTypingEvent{ - UserID: userID, - RoomID: roomID, - Typing: typing, - TimeoutMS: timeoutMS, - OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), - } - - var response api.InputTypingEventResponse - err := p.InputAPI.InputTypingEvent( - ctx, &api.InputTypingEventRequest{InputTypingEvent: requestData}, &response, - ) - - return err -} - -// SendToDevice sends a typing event to EDU server -func (p *EDUServerProducer) SendToDevice( - ctx context.Context, sender, userID, deviceID, eventType string, - message interface{}, -) error { - js, err := json.Marshal(message) - if err != nil { - return err - } - requestData := api.InputSendToDeviceEvent{ - UserID: userID, - DeviceID: deviceID, - SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{ - Sender: sender, - Type: eventType, - Content: js, - }, - } - request := api.InputSendToDeviceEventRequest{ - InputSendToDeviceEvent: requestData, - } - response := api.InputSendToDeviceEventResponse{} - return p.InputAPI.InputSendToDeviceEvent(ctx, &request, &response) -} diff --git a/clientapi/producers/roomserver.go b/clientapi/producers/roomserver.go deleted file mode 100644 index f0733db9..00000000 --- a/clientapi/producers/roomserver.go +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright 2017 Vector Creations Ltd -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package producers - -import ( - "context" - - "github.com/matrix-org/dendrite/roomserver/api" - "github.com/matrix-org/gomatrixserverlib" -) - -// RoomserverProducer produces events for the roomserver to consume. -type RoomserverProducer struct { - RsAPI api.RoomserverInternalAPI -} - -// NewRoomserverProducer creates a new RoomserverProducer -func NewRoomserverProducer(rsAPI api.RoomserverInternalAPI) *RoomserverProducer { - return &RoomserverProducer{ - RsAPI: rsAPI, - } -} - -// SendEvents writes the given events to the roomserver input log. The events are written with KindNew. -func (c *RoomserverProducer) SendEvents( - ctx context.Context, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, - txnID *api.TransactionID, -) (string, error) { - ires := make([]api.InputRoomEvent, len(events)) - for i, event := range events { - ires[i] = api.InputRoomEvent{ - Kind: api.KindNew, - Event: event, - AuthEventIDs: event.AuthEventIDs(), - SendAsServer: string(sendAsServer), - TransactionID: txnID, - } - } - return c.SendInputRoomEvents(ctx, ires) -} - -// SendEventWithState writes an event with KindNew to the roomserver input log -// with the state at the event as KindOutlier before it. Will not send any event that is -// marked as `true` in haveEventIDs -func (c *RoomserverProducer) SendEventWithState( - ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool, -) error { - outliers, err := state.Events() - if err != nil { - return err - } - - var ires []api.InputRoomEvent - for _, outlier := range outliers { - if haveEventIDs[outlier.EventID()] { - continue - } - ires = append(ires, api.InputRoomEvent{ - Kind: api.KindOutlier, - Event: outlier.Headered(event.RoomVersion), - AuthEventIDs: outlier.AuthEventIDs(), - }) - } - - stateEventIDs := make([]string, len(state.StateEvents)) - for i := range state.StateEvents { - stateEventIDs[i] = state.StateEvents[i].EventID() - } - - ires = append(ires, api.InputRoomEvent{ - Kind: api.KindNew, - Event: event, - AuthEventIDs: event.AuthEventIDs(), - HasState: true, - StateEventIDs: stateEventIDs, - }) - - _, err = c.SendInputRoomEvents(ctx, ires) - return err -} - -// SendInputRoomEvents writes the given input room events to the roomserver input API. -func (c *RoomserverProducer) SendInputRoomEvents( - ctx context.Context, ires []api.InputRoomEvent, -) (eventID string, err error) { - request := api.InputRoomEventsRequest{InputRoomEvents: ires} - var response api.InputRoomEventsResponse - err = c.RsAPI.InputRoomEvents(ctx, &request, &response) - eventID = response.EventID - return -} - -// SendInvite writes the invite event to the roomserver input API. -// This should only be needed for invite events that occur outside of a known room. -// If we are in the room then the event should be sent using the SendEvents method. -func (c *RoomserverProducer) SendInvite( - ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent, - inviteRoomState []gomatrixserverlib.InviteV2StrippedState, - sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID, -) error { - request := api.InputRoomEventsRequest{ - InputInviteEvents: []api.InputInviteEvent{{ - Event: inviteEvent, - InviteRoomState: inviteRoomState, - RoomVersion: inviteEvent.RoomVersion, - SendAsServer: string(sendAsServer), - TransactionID: txnID, - }}, - } - var response api.InputRoomEventsResponse - return c.RsAPI.InputRoomEvents(ctx, &request, &response) -} diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 1b4ff184..89f49d35 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -29,7 +29,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/threepid" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -137,21 +136,21 @@ type fledglingEvent struct { // CreateRoom implements /createRoom func CreateRoom( req *http.Request, device *authtypes.Device, - cfg *config.Dendrite, producer *producers.RoomserverProducer, + cfg *config.Dendrite, accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, ) util.JSONResponse { // TODO (#267): Check room ID doesn't clash with an existing one, and we // probably shouldn't be using pseudo-random strings, maybe GUIDs? roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName) - return createRoom(req, device, cfg, roomID, producer, accountDB, rsAPI, asAPI) + return createRoom(req, device, cfg, roomID, accountDB, rsAPI, asAPI) } // createRoom implements /createRoom // nolint: gocyclo func createRoom( req *http.Request, device *authtypes.Device, - cfg *config.Dendrite, roomID string, producer *producers.RoomserverProducer, + cfg *config.Dendrite, roomID string, accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, ) util.JSONResponse { @@ -344,9 +343,9 @@ func createRoom( } // send events to the room server - _, err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil) + _, err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -404,14 +403,14 @@ func createRoom( } } // Send the invite event to the roomserver. - if err = producer.SendInvite( - req.Context(), + if err = roomserverAPI.SendInvite( + req.Context(), rsAPI, inviteEvent.Headered(roomVersion), strippedState, // invite room state cfg.Matrix.ServerName, // send as server nil, // transaction ID ); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendInvite failed") return jsonerror.InternalServerError() } } diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index 74d92a05..0b2c2bc5 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -25,7 +25,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/threepid" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -46,7 +45,6 @@ func SendMembership( req *http.Request, accountDB accounts.Database, device *authtypes.Device, roomID string, membership string, cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, - producer *producers.RoomserverProducer, ) util.JSONResponse { verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} verRes := api.QueryRoomVersionForRoomResponse{} @@ -71,7 +69,7 @@ func SendMembership( } inviteStored, jsonErrResp := checkAndProcessThreepid( - req, device, &body, cfg, rsAPI, accountDB, producer, + req, device, &body, cfg, rsAPI, accountDB, membership, roomID, evTime, ) if jsonErrResp != nil { @@ -112,8 +110,8 @@ func SendMembership( switch membership { case gomatrixserverlib.Invite: // Invites need to be handled specially - err = producer.SendInvite( - req.Context(), + err = roomserverAPI.SendInvite( + req.Context(), rsAPI, event.Headered(verRes.RoomVersion), nil, // ask the roomserver to draw up invite room state for us cfg.Matrix.ServerName, @@ -130,14 +128,14 @@ func SendMembership( }{roomID} fallthrough default: - _, err = producer.SendEvents( - req.Context(), + _, err = roomserverAPI.SendEvents( + req.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)}, cfg.Matrix.ServerName, nil, ) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } } @@ -252,13 +250,12 @@ func checkAndProcessThreepid( cfg *config.Dendrite, rsAPI roomserverAPI.RoomserverInternalAPI, accountDB accounts.Database, - producer *producers.RoomserverProducer, membership, roomID string, evTime time.Time, ) (inviteStored bool, errRes *util.JSONResponse) { inviteStored, err := threepid.CheckAndProcessInvite( - req.Context(), device, body, cfg, rsAPI, accountDB, producer, + req.Context(), device, body, cfg, rsAPI, accountDB, membership, roomID, evTime, ) if err == threepid.ErrMissingParameter { diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index c0fe32a3..642a7288 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -24,7 +24,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -94,8 +93,7 @@ func GetAvatarURL( // nolint:gocyclo func SetAvatarURL( req *http.Request, accountDB accounts.Database, device *authtypes.Device, - userID string, cfg *config.Dendrite, - rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI, + userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { return util.JSONResponse{ @@ -167,8 +165,8 @@ func SetAvatarURL( return jsonerror.InternalServerError() } - if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed") + if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -209,8 +207,7 @@ func GetDisplayName( // nolint:gocyclo func SetDisplayName( req *http.Request, accountDB accounts.Database, device *authtypes.Device, - userID string, cfg *config.Dendrite, - rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI, + userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, ) util.JSONResponse { if userID != device.UserID { return util.JSONResponse{ @@ -282,8 +279,8 @@ func SetDisplayName( return jsonerror.InternalServerError() } - if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil { - util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed") + if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil { + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index eb558205..02470775 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -27,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/jsonerror" "github.com/matrix-org/dendrite/clientapi/producers" + eduServerAPI "github.com/matrix-org/dendrite/eduserver/api" federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" @@ -48,7 +49,7 @@ const pathPrefixUnstable = "/client/unstable" // nolint: gocyclo func Setup( publicAPIMux *mux.Router, cfg *config.Dendrite, - producer *producers.RoomserverProducer, + eduAPI eduServerAPI.EDUServerInputAPI, rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI, accountDB accounts.Database, @@ -56,7 +57,6 @@ func Setup( federation *gomatrixserverlib.FederationClient, keyRing gomatrixserverlib.KeyRing, syncProducer *producers.SyncAPIProducer, - eduProducer *producers.EDUServerProducer, transactionsCache *transactions.Cache, federationSender federationSenderAPI.FederationSenderInternalAPI, ) { @@ -89,7 +89,7 @@ func Setup( r0mux.Handle("/createRoom", internal.MakeAuthAPI("createRoom", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse { - return CreateRoom(req, device, cfg, producer, accountDB, rsAPI, asAPI) + return CreateRoom(req, device, cfg, accountDB, rsAPI, asAPI) }), ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/join/{roomIDOrAlias}", @@ -125,7 +125,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI, producer) + return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI) }), ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/rooms/{roomID}/send/{eventType}", @@ -134,7 +134,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, producer, nil) + return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil) }), ).Methods(http.MethodPost, http.MethodOptions) r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}", @@ -145,7 +145,7 @@ func Setup( } txnID := vars["txnID"] return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID, - nil, cfg, rsAPI, producer, transactionsCache) + nil, cfg, rsAPI, transactionsCache) }), ).Methods(http.MethodPut, http.MethodOptions) r0mux.Handle("/rooms/{roomID}/event/{eventID}", @@ -194,7 +194,7 @@ func Setup( if strings.HasSuffix(eventType, "/") { eventType = eventType[:len(eventType)-1] } - return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, producer, nil) + return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -205,7 +205,7 @@ func Setup( return util.ErrorResponse(err) } stateKey := vars["stateKey"] - return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, producer, nil) + return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -269,7 +269,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduProducer) + return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -280,7 +280,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -294,7 +294,7 @@ func Setup( return util.ErrorResponse(err) } txnID := vars["txnID"] - return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID) + return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID) }), ).Methods(http.MethodPut, http.MethodOptions) @@ -386,7 +386,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, producer, rsAPI) + return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows @@ -408,7 +408,7 @@ func Setup( if err != nil { return util.ErrorResponse(err) } - return SetDisplayName(req, accountDB, device, vars["userID"], cfg, producer, rsAPI) + return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI) }), ).Methods(http.MethodPut, http.MethodOptions) // Browsers use the OPTIONS HTTP method to check if the CORS policy allows diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index a8f8893b..5d5507e8 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -20,7 +20,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/internal/transactions" @@ -46,7 +45,6 @@ func SendEvent( roomID, eventType string, txnID, stateKey *string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, - producer *producers.RoomserverProducer, txnCache *transactions.Cache, ) util.JSONResponse { verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID} @@ -80,8 +78,8 @@ func SendEvent( // pass the new event to the roomserver and receive the correct event ID // event ID in case of duplicate transaction is discarded - eventID, err := producer.SendEvents( - req.Context(), + eventID, err := api.SendEvents( + req.Context(), rsAPI, []gomatrixserverlib.HeaderedEvent{ e.Headered(verRes.RoomVersion), }, @@ -89,7 +87,7 @@ func SendEvent( txnAndSessionID, ) if err != nil { - util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed") + util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } util.GetLogger(req.Context()).WithFields(logrus.Fields{ diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go index 5d3060d7..dc0a6572 100644 --- a/clientapi/routing/sendtodevice.go +++ b/clientapi/routing/sendtodevice.go @@ -19,7 +19,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/dendrite/internal/transactions" "github.com/matrix-org/util" ) @@ -28,7 +28,7 @@ import ( // sends the device events to the EDU Server func SendToDevice( req *http.Request, device *authtypes.Device, - eduProducer *producers.EDUServerProducer, + eduAPI api.EDUServerInputAPI, txnCache *transactions.Cache, eventType string, txnID *string, ) util.JSONResponse { @@ -48,8 +48,8 @@ func SendToDevice( for userID, byUser := range httpReq.Messages { for deviceID, message := range byUser { - if err := eduProducer.SendToDevice( - req.Context(), device.UserID, userID, deviceID, eventType, message, + if err := api.SendToDevice( + req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed") return jsonerror.InternalServerError() diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go index ffaa0e66..2eae1658 100644 --- a/clientapi/routing/sendtyping.go +++ b/clientapi/routing/sendtyping.go @@ -20,8 +20,8 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" "github.com/matrix-org/dendrite/clientapi/httputil" "github.com/matrix-org/dendrite/clientapi/jsonerror" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/clientapi/userutil" + "github.com/matrix-org/dendrite/eduserver/api" "github.com/matrix-org/util" ) @@ -35,7 +35,7 @@ type typingContentJSON struct { func SendTyping( req *http.Request, device *authtypes.Device, roomID string, userID string, accountDB accounts.Database, - eduProducer *producers.EDUServerProducer, + eduAPI api.EDUServerInputAPI, ) util.JSONResponse { if device.UserID != userID { return util.JSONResponse{ @@ -69,8 +69,8 @@ func SendTyping( return *resErr } - if err = eduProducer.SendTyping( - req.Context(), userID, roomID, r.Typing, r.Timeout, + if err = api.SendTyping( + req.Context(), eduAPI, userID, roomID, r.Typing, r.Timeout, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed") return jsonerror.InternalServerError() diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go index 8f173bf8..b0df0dd4 100644 --- a/clientapi/threepid/invites.go +++ b/clientapi/threepid/invites.go @@ -26,7 +26,6 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/dendrite/clientapi/auth/storage/accounts" - "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/config" "github.com/matrix-org/dendrite/roomserver/api" @@ -88,7 +87,7 @@ func CheckAndProcessInvite( ctx context.Context, device *authtypes.Device, body *MembershipRequest, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI, db accounts.Database, - producer *producers.RoomserverProducer, membership string, roomID string, + membership string, roomID string, evTime time.Time, ) (inviteStoredOnIDServer bool, err error) { if membership != gomatrixserverlib.Invite || (body.Address == "" && body.IDServer == "" && body.Medium == "") { @@ -112,7 +111,7 @@ func CheckAndProcessInvite( // "m.room.third_party_invite" have to be emitted from the data in // storeInviteRes. err = emit3PIDInviteEvent( - ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, producer, evTime, + ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, evTime, ) inviteStoredOnIDServer = err == nil @@ -331,7 +330,7 @@ func emit3PIDInviteEvent( ctx context.Context, body *MembershipRequest, res *idServerStoreInviteResponse, device *authtypes.Device, roomID string, cfg *config.Dendrite, - rsAPI api.RoomserverInternalAPI, producer *producers.RoomserverProducer, + rsAPI api.RoomserverInternalAPI, evTime time.Time, ) error { builder := &gomatrixserverlib.EventBuilder{ @@ -359,8 +358,8 @@ func emit3PIDInviteEvent( return err } - _, err = producer.SendEvents( - ctx, + _, err = api.SendEvents( + ctx, rsAPI, []gomatrixserverlib.HeaderedEvent{ (*event).Headered(queryRes.RoomVersion), }, |