aboutsummaryrefslogtreecommitdiff
path: root/clientapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-06-10 12:17:54 +0100
committerGitHub <noreply@github.com>2020-06-10 12:17:54 +0100
commitb7187a9a354530c1846c2a97be701ea484f91c2c (patch)
tree6d14903a444fa8bca964247dbe85ae8d9709d4c8 /clientapi
parentd9d6f4568ce891ae0ae9d2a3449974d3777bd21d (diff)
Remove clientapi producers which aren't actually producers (#1111)
* Remove clientapi producers which aren't actually producers They are actually just convenience wrappers around the internal APIs for roomserver/eduserver. Move their logic to their respective `api` packages and call them directly. * Remove TODO * unbreak ygg
Diffstat (limited to 'clientapi')
-rw-r--r--clientapi/clientapi.go7
-rw-r--r--clientapi/producers/eduserver.go80
-rw-r--r--clientapi/producers/roomserver.go124
-rw-r--r--clientapi/routing/createroom.go17
-rw-r--r--clientapi/routing/membership.go17
-rw-r--r--clientapi/routing/profile.go15
-rw-r--r--clientapi/routing/routing.go26
-rw-r--r--clientapi/routing/sendevent.go8
-rw-r--r--clientapi/routing/sendtodevice.go8
-rw-r--r--clientapi/routing/sendtyping.go8
-rw-r--r--clientapi/threepid/invites.go11
11 files changed, 52 insertions, 269 deletions
diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go
index 90db9eea..2780f367 100644
--- a/clientapi/clientapi.go
+++ b/clientapi/clientapi.go
@@ -48,9 +48,6 @@ func AddPublicRoutes(
transactionsCache *transactions.Cache,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
) {
- roomserverProducer := producers.NewRoomserverProducer(rsAPI)
- eduProducer := producers.NewEDUServerProducer(eduInputAPI)
-
syncProducer := &producers.SyncAPIProducer{
Producer: producer,
Topic: string(cfg.Kafka.Topics.OutputClientData),
@@ -64,8 +61,8 @@ func AddPublicRoutes(
}
routing.Setup(
- router, cfg, roomserverProducer, rsAPI, asAPI,
+ router, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, deviceDB, federation, *keyRing,
- syncProducer, eduProducer, transactionsCache, fsAPI,
+ syncProducer, transactionsCache, fsAPI,
)
}
diff --git a/clientapi/producers/eduserver.go b/clientapi/producers/eduserver.go
deleted file mode 100644
index 102c1fad..00000000
--- a/clientapi/producers/eduserver.go
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package producers
-
-import (
- "context"
- "encoding/json"
- "time"
-
- "github.com/matrix-org/dendrite/eduserver/api"
- "github.com/matrix-org/gomatrixserverlib"
-)
-
-// EDUServerProducer produces events for the EDU server to consume
-type EDUServerProducer struct {
- InputAPI api.EDUServerInputAPI
-}
-
-// NewEDUServerProducer creates a new EDUServerProducer
-func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer {
- return &EDUServerProducer{
- InputAPI: inputAPI,
- }
-}
-
-// SendTyping sends a typing event to EDU server
-func (p *EDUServerProducer) SendTyping(
- ctx context.Context, userID, roomID string,
- typing bool, timeoutMS int64,
-) error {
- requestData := api.InputTypingEvent{
- UserID: userID,
- RoomID: roomID,
- Typing: typing,
- TimeoutMS: timeoutMS,
- OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
- }
-
- var response api.InputTypingEventResponse
- err := p.InputAPI.InputTypingEvent(
- ctx, &api.InputTypingEventRequest{InputTypingEvent: requestData}, &response,
- )
-
- return err
-}
-
-// SendToDevice sends a typing event to EDU server
-func (p *EDUServerProducer) SendToDevice(
- ctx context.Context, sender, userID, deviceID, eventType string,
- message interface{},
-) error {
- js, err := json.Marshal(message)
- if err != nil {
- return err
- }
- requestData := api.InputSendToDeviceEvent{
- UserID: userID,
- DeviceID: deviceID,
- SendToDeviceEvent: gomatrixserverlib.SendToDeviceEvent{
- Sender: sender,
- Type: eventType,
- Content: js,
- },
- }
- request := api.InputSendToDeviceEventRequest{
- InputSendToDeviceEvent: requestData,
- }
- response := api.InputSendToDeviceEventResponse{}
- return p.InputAPI.InputSendToDeviceEvent(ctx, &request, &response)
-}
diff --git a/clientapi/producers/roomserver.go b/clientapi/producers/roomserver.go
deleted file mode 100644
index f0733db9..00000000
--- a/clientapi/producers/roomserver.go
+++ /dev/null
@@ -1,124 +0,0 @@
-// Copyright 2017 Vector Creations Ltd
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package producers
-
-import (
- "context"
-
- "github.com/matrix-org/dendrite/roomserver/api"
- "github.com/matrix-org/gomatrixserverlib"
-)
-
-// RoomserverProducer produces events for the roomserver to consume.
-type RoomserverProducer struct {
- RsAPI api.RoomserverInternalAPI
-}
-
-// NewRoomserverProducer creates a new RoomserverProducer
-func NewRoomserverProducer(rsAPI api.RoomserverInternalAPI) *RoomserverProducer {
- return &RoomserverProducer{
- RsAPI: rsAPI,
- }
-}
-
-// SendEvents writes the given events to the roomserver input log. The events are written with KindNew.
-func (c *RoomserverProducer) SendEvents(
- ctx context.Context, events []gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName,
- txnID *api.TransactionID,
-) (string, error) {
- ires := make([]api.InputRoomEvent, len(events))
- for i, event := range events {
- ires[i] = api.InputRoomEvent{
- Kind: api.KindNew,
- Event: event,
- AuthEventIDs: event.AuthEventIDs(),
- SendAsServer: string(sendAsServer),
- TransactionID: txnID,
- }
- }
- return c.SendInputRoomEvents(ctx, ires)
-}
-
-// SendEventWithState writes an event with KindNew to the roomserver input log
-// with the state at the event as KindOutlier before it. Will not send any event that is
-// marked as `true` in haveEventIDs
-func (c *RoomserverProducer) SendEventWithState(
- ctx context.Context, state *gomatrixserverlib.RespState, event gomatrixserverlib.HeaderedEvent, haveEventIDs map[string]bool,
-) error {
- outliers, err := state.Events()
- if err != nil {
- return err
- }
-
- var ires []api.InputRoomEvent
- for _, outlier := range outliers {
- if haveEventIDs[outlier.EventID()] {
- continue
- }
- ires = append(ires, api.InputRoomEvent{
- Kind: api.KindOutlier,
- Event: outlier.Headered(event.RoomVersion),
- AuthEventIDs: outlier.AuthEventIDs(),
- })
- }
-
- stateEventIDs := make([]string, len(state.StateEvents))
- for i := range state.StateEvents {
- stateEventIDs[i] = state.StateEvents[i].EventID()
- }
-
- ires = append(ires, api.InputRoomEvent{
- Kind: api.KindNew,
- Event: event,
- AuthEventIDs: event.AuthEventIDs(),
- HasState: true,
- StateEventIDs: stateEventIDs,
- })
-
- _, err = c.SendInputRoomEvents(ctx, ires)
- return err
-}
-
-// SendInputRoomEvents writes the given input room events to the roomserver input API.
-func (c *RoomserverProducer) SendInputRoomEvents(
- ctx context.Context, ires []api.InputRoomEvent,
-) (eventID string, err error) {
- request := api.InputRoomEventsRequest{InputRoomEvents: ires}
- var response api.InputRoomEventsResponse
- err = c.RsAPI.InputRoomEvents(ctx, &request, &response)
- eventID = response.EventID
- return
-}
-
-// SendInvite writes the invite event to the roomserver input API.
-// This should only be needed for invite events that occur outside of a known room.
-// If we are in the room then the event should be sent using the SendEvents method.
-func (c *RoomserverProducer) SendInvite(
- ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
- inviteRoomState []gomatrixserverlib.InviteV2StrippedState,
- sendAsServer gomatrixserverlib.ServerName, txnID *api.TransactionID,
-) error {
- request := api.InputRoomEventsRequest{
- InputInviteEvents: []api.InputInviteEvent{{
- Event: inviteEvent,
- InviteRoomState: inviteRoomState,
- RoomVersion: inviteEvent.RoomVersion,
- SendAsServer: string(sendAsServer),
- TransactionID: txnID,
- }},
- }
- var response api.InputRoomEventsResponse
- return c.RsAPI.InputRoomEvents(ctx, &request, &response)
-}
diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go
index 1b4ff184..89f49d35 100644
--- a/clientapi/routing/createroom.go
+++ b/clientapi/routing/createroom.go
@@ -29,7 +29,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/threepid"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@@ -137,21 +136,21 @@ type fledglingEvent struct {
// CreateRoom implements /createRoom
func CreateRoom(
req *http.Request, device *authtypes.Device,
- cfg *config.Dendrite, producer *producers.RoomserverProducer,
+ cfg *config.Dendrite,
accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
// TODO (#267): Check room ID doesn't clash with an existing one, and we
// probably shouldn't be using pseudo-random strings, maybe GUIDs?
roomID := fmt.Sprintf("!%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
- return createRoom(req, device, cfg, roomID, producer, accountDB, rsAPI, asAPI)
+ return createRoom(req, device, cfg, roomID, accountDB, rsAPI, asAPI)
}
// createRoom implements /createRoom
// nolint: gocyclo
func createRoom(
req *http.Request, device *authtypes.Device,
- cfg *config.Dendrite, roomID string, producer *producers.RoomserverProducer,
+ cfg *config.Dendrite, roomID string,
accountDB accounts.Database, rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
) util.JSONResponse {
@@ -344,9 +343,9 @@ func createRoom(
}
// send events to the room server
- _, err = producer.SendEvents(req.Context(), builtEvents, cfg.Matrix.ServerName, nil)
+ _, err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
+ util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@@ -404,14 +403,14 @@ func createRoom(
}
}
// Send the invite event to the roomserver.
- if err = producer.SendInvite(
- req.Context(),
+ if err = roomserverAPI.SendInvite(
+ req.Context(), rsAPI,
inviteEvent.Headered(roomVersion),
strippedState, // invite room state
cfg.Matrix.ServerName, // send as server
nil, // transaction ID
); err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
+ util.GetLogger(req.Context()).WithError(err).Error("SendInvite failed")
return jsonerror.InternalServerError()
}
}
diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go
index 74d92a05..0b2c2bc5 100644
--- a/clientapi/routing/membership.go
+++ b/clientapi/routing/membership.go
@@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/threepid"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@@ -46,7 +45,6 @@ func SendMembership(
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
roomID string, membership string, cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverInternalAPI, asAPI appserviceAPI.AppServiceQueryAPI,
- producer *producers.RoomserverProducer,
) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
@@ -71,7 +69,7 @@ func SendMembership(
}
inviteStored, jsonErrResp := checkAndProcessThreepid(
- req, device, &body, cfg, rsAPI, accountDB, producer,
+ req, device, &body, cfg, rsAPI, accountDB,
membership, roomID, evTime,
)
if jsonErrResp != nil {
@@ -112,8 +110,8 @@ func SendMembership(
switch membership {
case gomatrixserverlib.Invite:
// Invites need to be handled specially
- err = producer.SendInvite(
- req.Context(),
+ err = roomserverAPI.SendInvite(
+ req.Context(), rsAPI,
event.Headered(verRes.RoomVersion),
nil, // ask the roomserver to draw up invite room state for us
cfg.Matrix.ServerName,
@@ -130,14 +128,14 @@ func SendMembership(
}{roomID}
fallthrough
default:
- _, err = producer.SendEvents(
- req.Context(),
+ _, err = roomserverAPI.SendEvents(
+ req.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{event.Headered(verRes.RoomVersion)},
cfg.Matrix.ServerName,
nil,
)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
+ util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
}
@@ -252,13 +250,12 @@ func checkAndProcessThreepid(
cfg *config.Dendrite,
rsAPI roomserverAPI.RoomserverInternalAPI,
accountDB accounts.Database,
- producer *producers.RoomserverProducer,
membership, roomID string,
evTime time.Time,
) (inviteStored bool, errRes *util.JSONResponse) {
inviteStored, err := threepid.CheckAndProcessInvite(
- req.Context(), device, body, cfg, rsAPI, accountDB, producer,
+ req.Context(), device, body, cfg, rsAPI, accountDB,
membership, roomID, evTime,
)
if err == threepid.ErrMissingParameter {
diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go
index c0fe32a3..642a7288 100644
--- a/clientapi/routing/profile.go
+++ b/clientapi/routing/profile.go
@@ -24,7 +24,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -94,8 +93,7 @@ func GetAvatarURL(
// nolint:gocyclo
func SetAvatarURL(
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
- userID string, cfg *config.Dendrite,
- rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
+ userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse {
if userID != device.UserID {
return util.JSONResponse{
@@ -167,8 +165,8 @@ func SetAvatarURL(
return jsonerror.InternalServerError()
}
- if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed")
+ if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@@ -209,8 +207,7 @@ func GetDisplayName(
// nolint:gocyclo
func SetDisplayName(
req *http.Request, accountDB accounts.Database, device *authtypes.Device,
- userID string, cfg *config.Dendrite,
- rsProducer *producers.RoomserverProducer, rsAPI api.RoomserverInternalAPI,
+ userID string, cfg *config.Dendrite, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse {
if userID != device.UserID {
return util.JSONResponse{
@@ -282,8 +279,8 @@ func SetDisplayName(
return jsonerror.InternalServerError()
}
- if _, err := rsProducer.SendEvents(req.Context(), events, cfg.Matrix.ServerName, nil); err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("rsProducer.SendEvents failed")
+ if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
+ util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go
index eb558205..02470775 100644
--- a/clientapi/routing/routing.go
+++ b/clientapi/routing/routing.go
@@ -27,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
+ eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
@@ -48,7 +49,7 @@ const pathPrefixUnstable = "/client/unstable"
// nolint: gocyclo
func Setup(
publicAPIMux *mux.Router, cfg *config.Dendrite,
- producer *producers.RoomserverProducer,
+ eduAPI eduServerAPI.EDUServerInputAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
accountDB accounts.Database,
@@ -56,7 +57,6 @@ func Setup(
federation *gomatrixserverlib.FederationClient,
keyRing gomatrixserverlib.KeyRing,
syncProducer *producers.SyncAPIProducer,
- eduProducer *producers.EDUServerProducer,
transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderInternalAPI,
) {
@@ -89,7 +89,7 @@ func Setup(
r0mux.Handle("/createRoom",
internal.MakeAuthAPI("createRoom", authData, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
- return CreateRoom(req, device, cfg, producer, accountDB, rsAPI, asAPI)
+ return CreateRoom(req, device, cfg, accountDB, rsAPI, asAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/join/{roomIDOrAlias}",
@@ -125,7 +125,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI, producer)
+ return SendMembership(req, accountDB, device, vars["roomID"], vars["membership"], cfg, rsAPI, asAPI)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}",
@@ -134,7 +134,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, producer, nil)
+ return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, nil, cfg, rsAPI, nil)
}),
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
@@ -145,7 +145,7 @@ func Setup(
}
txnID := vars["txnID"]
return SendEvent(req, device, vars["roomID"], vars["eventType"], &txnID,
- nil, cfg, rsAPI, producer, transactionsCache)
+ nil, cfg, rsAPI, transactionsCache)
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/event/{eventID}",
@@ -194,7 +194,7 @@ func Setup(
if strings.HasSuffix(eventType, "/") {
eventType = eventType[:len(eventType)-1]
}
- return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, producer, nil)
+ return SendEvent(req, device, vars["roomID"], eventType, nil, &emptyString, cfg, rsAPI, nil)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -205,7 +205,7 @@ func Setup(
return util.ErrorResponse(err)
}
stateKey := vars["stateKey"]
- return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, producer, nil)
+ return SendEvent(req, device, vars["roomID"], vars["eventType"], nil, &stateKey, cfg, rsAPI, nil)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -269,7 +269,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduProducer)
+ return SendTyping(req, device, vars["roomID"], vars["userID"], accountDB, eduAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -280,7 +280,7 @@ func Setup(
return util.ErrorResponse(err)
}
txnID := vars["txnID"]
- return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID)
+ return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -294,7 +294,7 @@ func Setup(
return util.ErrorResponse(err)
}
txnID := vars["txnID"]
- return SendToDevice(req, device, eduProducer, transactionsCache, vars["eventType"], &txnID)
+ return SendToDevice(req, device, eduAPI, transactionsCache, vars["eventType"], &txnID)
}),
).Methods(http.MethodPut, http.MethodOptions)
@@ -386,7 +386,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, producer, rsAPI)
+ return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
@@ -408,7 +408,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return SetDisplayName(req, accountDB, device, vars["userID"], cfg, producer, rsAPI)
+ return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go
index a8f8893b..5d5507e8 100644
--- a/clientapi/routing/sendevent.go
+++ b/clientapi/routing/sendevent.go
@@ -20,7 +20,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/transactions"
@@ -46,7 +45,6 @@ func SendEvent(
roomID, eventType string, txnID, stateKey *string,
cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI,
- producer *producers.RoomserverProducer,
txnCache *transactions.Cache,
) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
@@ -80,8 +78,8 @@ func SendEvent(
// pass the new event to the roomserver and receive the correct event ID
// event ID in case of duplicate transaction is discarded
- eventID, err := producer.SendEvents(
- req.Context(),
+ eventID, err := api.SendEvents(
+ req.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(verRes.RoomVersion),
},
@@ -89,7 +87,7 @@ func SendEvent(
txnAndSessionID,
)
if err != nil {
- util.GetLogger(req.Context()).WithError(err).Error("producer.SendEvents failed")
+ util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
util.GetLogger(req.Context()).WithFields(logrus.Fields{
diff --git a/clientapi/routing/sendtodevice.go b/clientapi/routing/sendtodevice.go
index 5d3060d7..dc0a6572 100644
--- a/clientapi/routing/sendtodevice.go
+++ b/clientapi/routing/sendtodevice.go
@@ -19,7 +19,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
+ "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal/transactions"
"github.com/matrix-org/util"
)
@@ -28,7 +28,7 @@ import (
// sends the device events to the EDU Server
func SendToDevice(
req *http.Request, device *authtypes.Device,
- eduProducer *producers.EDUServerProducer,
+ eduAPI api.EDUServerInputAPI,
txnCache *transactions.Cache,
eventType string, txnID *string,
) util.JSONResponse {
@@ -48,8 +48,8 @@ func SendToDevice(
for userID, byUser := range httpReq.Messages {
for deviceID, message := range byUser {
- if err := eduProducer.SendToDevice(
- req.Context(), device.UserID, userID, deviceID, eventType, message,
+ if err := api.SendToDevice(
+ req.Context(), eduAPI, device.UserID, userID, deviceID, eventType, message,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.SendToDevice failed")
return jsonerror.InternalServerError()
diff --git a/clientapi/routing/sendtyping.go b/clientapi/routing/sendtyping.go
index ffaa0e66..2eae1658 100644
--- a/clientapi/routing/sendtyping.go
+++ b/clientapi/routing/sendtyping.go
@@ -20,8 +20,8 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/userutil"
+ "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/util"
)
@@ -35,7 +35,7 @@ type typingContentJSON struct {
func SendTyping(
req *http.Request, device *authtypes.Device, roomID string,
userID string, accountDB accounts.Database,
- eduProducer *producers.EDUServerProducer,
+ eduAPI api.EDUServerInputAPI,
) util.JSONResponse {
if device.UserID != userID {
return util.JSONResponse{
@@ -69,8 +69,8 @@ func SendTyping(
return *resErr
}
- if err = eduProducer.SendTyping(
- req.Context(), userID, roomID, r.Typing, r.Timeout,
+ if err = api.SendTyping(
+ req.Context(), eduAPI, userID, roomID, r.Typing, r.Timeout,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("eduProducer.Send failed")
return jsonerror.InternalServerError()
diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go
index 8f173bf8..b0df0dd4 100644
--- a/clientapi/threepid/invites.go
+++ b/clientapi/threepid/invites.go
@@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/auth/storage/accounts"
- "github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/roomserver/api"
@@ -88,7 +87,7 @@ func CheckAndProcessInvite(
ctx context.Context,
device *authtypes.Device, body *MembershipRequest, cfg *config.Dendrite,
rsAPI api.RoomserverInternalAPI, db accounts.Database,
- producer *producers.RoomserverProducer, membership string, roomID string,
+ membership string, roomID string,
evTime time.Time,
) (inviteStoredOnIDServer bool, err error) {
if membership != gomatrixserverlib.Invite || (body.Address == "" && body.IDServer == "" && body.Medium == "") {
@@ -112,7 +111,7 @@ func CheckAndProcessInvite(
// "m.room.third_party_invite" have to be emitted from the data in
// storeInviteRes.
err = emit3PIDInviteEvent(
- ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, producer, evTime,
+ ctx, body, storeInviteRes, device, roomID, cfg, rsAPI, evTime,
)
inviteStoredOnIDServer = err == nil
@@ -331,7 +330,7 @@ func emit3PIDInviteEvent(
ctx context.Context,
body *MembershipRequest, res *idServerStoreInviteResponse,
device *authtypes.Device, roomID string, cfg *config.Dendrite,
- rsAPI api.RoomserverInternalAPI, producer *producers.RoomserverProducer,
+ rsAPI api.RoomserverInternalAPI,
evTime time.Time,
) error {
builder := &gomatrixserverlib.EventBuilder{
@@ -359,8 +358,8 @@ func emit3PIDInviteEvent(
return err
}
- _, err = producer.SendEvents(
- ctx,
+ _, err = api.SendEvents(
+ ctx, rsAPI,
[]gomatrixserverlib.HeaderedEvent{
(*event).Headered(queryRes.RoomVersion),
},