aboutsummaryrefslogtreecommitdiff
path: root/federationsender/consumers/roomserver.go
diff options
context:
space:
mode:
Diffstat (limited to 'federationsender/consumers/roomserver.go')
-rw-r--r--federationsender/consumers/roomserver.go26
1 files changed, 13 insertions, 13 deletions
diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go
index 18c8324b..67d08b33 100644
--- a/federationsender/consumers/roomserver.go
+++ b/federationsender/consumers/roomserver.go
@@ -33,11 +33,11 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
- cfg *config.Dendrite
- roomServerConsumer *common.ContinualConsumer
- db storage.Database
- queues *queue.OutgoingQueues
- query api.RoomserverQueryAPI
+ cfg *config.Dendrite
+ rsAPI api.RoomserverInternalAPI
+ rsConsumer *common.ContinualConsumer
+ db storage.Database
+ queues *queue.OutgoingQueues
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
@@ -46,7 +46,7 @@ func NewOutputRoomEventConsumer(
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store storage.Database,
- queryAPI api.RoomserverQueryAPI,
+ rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputRoomEvent),
@@ -54,11 +54,11 @@ func NewOutputRoomEventConsumer(
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
- cfg: cfg,
- roomServerConsumer: &consumer,
- db: store,
- queues: queues,
- query: queryAPI,
+ cfg: cfg,
+ rsConsumer: &consumer,
+ db: store,
+ queues: queues,
+ rsAPI: rsAPI,
}
consumer.ProcessMessage = s.onMessage
@@ -67,7 +67,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
- return s.roomServerConsumer.Start()
+ return s.rsConsumer.Start()
}
// onMessage is called when the federation server receives a new event from the room server output log.
@@ -369,7 +369,7 @@ func (s *OutputRoomEventConsumer) lookupStateEvents(
// from the roomserver using the query API.
eventReq := api.QueryEventsByIDRequest{EventIDs: missing}
var eventResp api.QueryEventsByIDResponse
- if err := s.query.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
+ if err := s.rsAPI.QueryEventsByID(context.TODO(), &eventReq, &eventResp); err != nil {
return nil, err
}