aboutsummaryrefslogtreecommitdiff
path: root/syncapi
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-05-01 10:48:17 +0100
committerGitHub <noreply@github.com>2020-05-01 10:48:17 +0100
commite15f6676ac3f76ec2ef679c2df300d6a8e7e668f (patch)
tree0b82339939e8932d46e1ca2cf6024ab55dc7602f /syncapi
parentebbfc125920beb321713e28a2a137d768406fa15 (diff)
Consolidation of roomserver APIs (#994)
* Consolidation of roomserver APIs * Comment out alias tests for now, they are broken * Wire AS API into roomserver again * Roomserver didn't take asAPI param before so return to that * Prevent roomserver asking AS API for alias info * Rename some files * Remove alias_test, incoherent tests and unwanted appservice integration * Remove FS API inject on syncapi component
Diffstat (limited to 'syncapi')
-rw-r--r--syncapi/consumers/roomserver.go22
-rw-r--r--syncapi/routing/messages.go8
-rw-r--r--syncapi/routing/routing.go4
-rw-r--r--syncapi/syncapi.go6
4 files changed, 20 insertions, 20 deletions
diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go
index 1d512972..987cc5df 100644
--- a/syncapi/consumers/roomserver.go
+++ b/syncapi/consumers/roomserver.go
@@ -32,10 +32,10 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- roomServerConsumer *common.ContinualConsumer
- db storage.Database
- notifier *sync.Notifier
- query api.RoomserverQueryAPI
+ rsAPI api.RoomserverInternalAPI
+ rsConsumer *common.ContinualConsumer
+ db storage.Database
+ notifier *sync.Notifier
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -44,7 +44,7 @@ func NewOutputRoomEventConsumer(
kafkaConsumer sarama.Consumer,
n *sync.Notifier,
store storage.Database,
- queryAPI api.RoomserverQueryAPI,
+ rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
consumer := common.ContinualConsumer{
@@ -53,10 +53,10 @@ func NewOutputRoomEventConsumer(
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
- roomServerConsumer: &consumer,
- db: store,
- notifier: n,
- query: queryAPI,
+ rsConsumer: &consumer,
+ db: store,
+ notifier: n,
+ rsAPI: rsAPI,
}
consumer.ProcessMessage = s.onMessage
@@ -65,7 +65,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- return s.roomServerConsumer.Start()
+ return s.rsConsumer.Start()
}
// onMessage is called when the sync server receives a new event from the room server output log.
@@ -226,7 +226,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// from the roomserver using the query API.
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse
- if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
+ if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
return nil, err
}
diff --git a/syncapi/routing/messages.go b/syncapi/routing/messages.go
index c48414ab..5105e224 100644
--- a/syncapi/routing/messages.go
+++ b/syncapi/routing/messages.go
@@ -34,7 +34,7 @@ import (
type messagesReq struct {
ctx context.Context
db storage.Database
- queryAPI api.RoomserverQueryAPI
+ rsAPI api.RoomserverInternalAPI
federation *gomatrixserverlib.FederationClient
cfg *config.Dendrite
roomID string
@@ -59,7 +59,7 @@ const defaultMessagesLimit = 10
func OnIncomingMessagesRequest(
req *http.Request, db storage.Database, roomID string,
federation *gomatrixserverlib.FederationClient,
- queryAPI api.RoomserverQueryAPI,
+ rsAPI api.RoomserverInternalAPI,
cfg *config.Dendrite,
) util.JSONResponse {
var err error
@@ -135,7 +135,7 @@ func OnIncomingMessagesRequest(
mReq := messagesReq{
ctx: req.Context(),
db: db,
- queryAPI: queryAPI,
+ rsAPI: rsAPI,
federation: federation,
cfg: cfg,
roomID: roomID,
@@ -360,7 +360,7 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
// the room or sending the request.
func (r *messagesReq) backfill(roomID string, fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
var res api.QueryBackfillResponse
- err := r.queryAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{
+ err := r.rsAPI.QueryBackfill(context.Background(), &api.QueryBackfillRequest{
RoomID: roomID,
EarliestEventsIDs: fromEventIDs,
Limit: limit,
diff --git a/syncapi/routing/routing.go b/syncapi/routing/routing.go
index 9078b87f..5a36a279 100644
--- a/syncapi/routing/routing.go
+++ b/syncapi/routing/routing.go
@@ -40,7 +40,7 @@ const pathPrefixR0 = "/_matrix/client/r0"
func Setup(
apiMux *mux.Router, srp *sync.RequestPool, syncDB storage.Database,
deviceDB devices.Database, federation *gomatrixserverlib.FederationClient,
- queryAPI api.RoomserverQueryAPI,
+ rsAPI api.RoomserverInternalAPI,
cfg *config.Dendrite,
) {
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
@@ -61,6 +61,6 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
- return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], federation, queryAPI, cfg)
+ return OnIncomingMessagesRequest(req, syncDB, vars["roomID"], federation, rsAPI, cfg)
})).Methods(http.MethodGet, http.MethodOptions)
}
diff --git a/syncapi/syncapi.go b/syncapi/syncapi.go
index 1535d2b1..5ab1ec7c 100644
--- a/syncapi/syncapi.go
+++ b/syncapi/syncapi.go
@@ -38,7 +38,7 @@ func SetupSyncAPIComponent(
base *basecomponent.BaseDendrite,
deviceDB devices.Database,
accountsDB accounts.Database,
- queryAPI api.RoomserverQueryAPI,
+ rsAPI api.RoomserverInternalAPI,
federation *gomatrixserverlib.FederationClient,
cfg *config.Dendrite,
) {
@@ -61,7 +61,7 @@ func SetupSyncAPIComponent(
requestPool := sync.NewRequestPool(syncDB, notifier, accountsDB)
roomConsumer := consumers.NewOutputRoomEventConsumer(
- base.Cfg, base.KafkaConsumer, notifier, syncDB, queryAPI,
+ base.Cfg, base.KafkaConsumer, notifier, syncDB, rsAPI,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
@@ -81,5 +81,5 @@ func SetupSyncAPIComponent(
logrus.WithError(err).Panicf("failed to start typing server consumer")
}
- routing.Setup(base.APIMux, requestPool, syncDB, deviceDB, federation, queryAPI, cfg)
+ routing.Setup(base.APIMux, requestPool, syncDB, deviceDB, federation, rsAPI, cfg)
}