aboutsummaryrefslogtreecommitdiff
path: root/federationsender/consumers
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2020-05-01 10:48:17 +0100
committerGitHub <noreply@github.com>2020-05-01 10:48:17 +0100
commite15f6676ac3f76ec2ef679c2df300d6a8e7e668f (patch)
tree0b82339939e8932d46e1ca2cf6024ab55dc7602f /federationsender/consumers
parentebbfc125920beb321713e28a2a137d768406fa15 (diff)
Consolidation of roomserver APIs (#994)
* Consolidation of roomserver APIs * Comment out alias tests for now, they are broken * Wire AS API into roomserver again * Roomserver didn't take asAPI param before so return to that * Prevent roomserver asking AS API for alias info * Rename some files * Remove alias_test, incoherent tests and unwanted appservice integration * Remove FS API inject on syncapi component
Diffstat (limited to 'federationsender/consumers')
-rw-r--r--federationsender/consumers/roomserver.go26
1 files changed, 13 insertions, 13 deletions
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index 18c8324b..67d08b33 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -33,11 +33,11 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- cfg *config.Dendrite
- roomServerConsumer *common.ContinualConsumer
- db storage.Database
- queues *queue.OutgoingQueues
- query api.RoomserverQueryAPI
+ cfg *config.Dendrite
+ rsAPI api.RoomserverInternalAPI
+ rsConsumer *common.ContinualConsumer
+ db storage.Database
+ queues *queue.OutgoingQueues
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -46,7 +46,7 @@ func NewOutputRoomEventConsumer(
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store storage.Database,
- queryAPI api.RoomserverQueryAPI,
+ rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
@@ -54,11 +54,11 @@ func NewOutputRoomEventConsumer(
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
- cfg: cfg,
- roomServerConsumer: &consumer,
- db: store,
- queues: queues,
- query: queryAPI,
+ cfg: cfg,
+ rsConsumer: &consumer,
+ db: store,
+ queues: queues,
+ rsAPI: rsAPI,
}
consumer.ProcessMessage = s.onMessage
@@ -67,7 +67,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- return s.roomServerConsumer.Start()
+ return s.rsConsumer.Start()
}
// onMessage is called when the federation server receives a new event from the room server output log.
@@ -369,7 +369,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// from the roomserver using the query API.
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse
- if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
+ if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
return nil, err
}