aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2020-06-12 15:11:33 +0100
committerGitHub <noreply@github.com>2020-06-12 15:11:33 +0100
commit0dc4ceaa2d8e46aa0134c1aabe96389ba4c1591d (patch)
tree01fc28ca7c7590e37f05923169602622b2144971
parentecd7accbad724f26248498a9035a1fbc69e2f08d (diff)
Minor perf/debugging improvements (#1121)
* Minor perf/debugging improvements - publicroomsapi: Don't call QueryEventsByID with no event IDs - appservice: Consume only if there are 1 or more ASes - roomserver: don't keep a copy of the request "for debugging" - we trace now * fedsender: return early if we have no destinations * Unbreak tests
-rw-r--r--appservice/appservice.go16
-rw-r--r--federationapi/routing/send_test.go3
-rw-r--r--federationsender/queue/queue.go3
-rw-r--r--publicroomsapi/consumers/roomserver.go11
-rw-r--r--roomserver/api/query.go8
-rw-r--r--roomserver/internal/query.go5
6 files changed, 19 insertions, 27 deletions
diff --git a/appservice/appservice.go b/appservice/appservice.go
index 0fbe3f20..bd261ff9 100644
--- a/appservice/appservice.go
+++ b/appservice/appservice.go
@@ -86,12 +86,16 @@ func NewInternalAPI(
Cfg: base.Cfg,
}
- consumer := consumers.NewOutputRoomEventConsumer(
- base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
- rsAPI, workerStates,
- )
- if err := consumer.Start(); err != nil {
- logrus.WithError(err).Panicf("failed to start appservice roomserver consumer")
+ // Only consume if we actually have ASes to track, else we'll just chew cycles needlessly.
+ // We can't add ASes at runtime so this is safe to do.
+ if len(workerStates) > 0 {
+ consumer := consumers.NewOutputRoomEventConsumer(
+ base.Cfg, base.KafkaConsumer, accountsDB, appserviceDB,
+ rsAPI, workerStates,
+ )
+ if err := consumer.Start(); err != nil {
+ logrus.WithError(err).Panicf("failed to start appservice roomserver consumer")
+ }
}
// Create application service transaction workers
diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go
index adae7c22..3123b55c 100644
--- a/federationapi/routing/send_test.go
+++ b/federationapi/routing/send_test.go
@@ -128,7 +128,6 @@ func (t *testRoomserverAPI) QueryLatestEventsAndState(
response *api.QueryLatestEventsAndStateResponse,
) error {
r := t.queryLatestEventsAndState(request)
- response.QueryLatestEventsAndStateRequest = *request
response.RoomExists = r.RoomExists
response.RoomVersion = testRoomVersion
response.LatestEvents = r.LatestEvents
@@ -144,7 +143,6 @@ func (t *testRoomserverAPI) QueryStateAfterEvents(
response *api.QueryStateAfterEventsResponse,
) error {
response.RoomVersion = testRoomVersion
- response.QueryStateAfterEventsRequest = *request
res := t.queryStateAfterEvents(request)
response.PrevEventsExist = res.PrevEventsExist
response.RoomExists = res.RoomExists
@@ -612,7 +610,6 @@ func TestTransactionFetchMissingStateByStateIDs(t *testing.T) {
}
}
}
- res.QueryEventsByIDRequest = *req
return res
},
}
diff --git a/federationsender/queue/queue.go b/federationsender/queue/queue.go
index 5b8fc3c5..24034355 100644
--- a/federationsender/queue/queue.go
+++ b/federationsender/queue/queue.go
@@ -107,6 +107,9 @@ func (oqs *OutgoingQueues) SendEvent(
// Remove our own server from the list of destinations.
destinations = filterAndDedupeDests(oqs.origin, destinations)
+ if len(destinations) == 0 {
+ return nil
+ }
log.WithFields(log.Fields{
"destinations": destinations, "event": ev.EventID(),
diff --git a/publicroomsapi/consumers/roomserver.go b/publicroomsapi/consumers/roomserver.go
index ba187cb1..b9686d56 100644
--- a/publicroomsapi/consumers/roomserver.go
+++ b/publicroomsapi/consumers/roomserver.go
@@ -78,18 +78,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return nil
}
- remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
var remQueryRes api.QueryEventsByIDResponse
- if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil {
- log.Warn(err)
- return err
+ if len(output.NewRoomEvent.RemovesStateEventIDs) > 0 {
+ remQueryReq := api.QueryEventsByIDRequest{EventIDs: output.NewRoomEvent.RemovesStateEventIDs}
+ if err := s.rsAPI.QueryEventsByID(context.TODO(), &remQueryReq, &remQueryRes); err != nil {
+ log.Warn(err)
+ return err
+ }
}
var addQueryEvents, remQueryEvents []gomatrixserverlib.Event
for _, headeredEvent := range output.NewRoomEvent.AddsState() {
addQueryEvents = append(addQueryEvents, headeredEvent.Event)
}
- addQueryEvents = append(addQueryEvents, output.NewRoomEvent.Event.Unwrap())
for _, headeredEvent := range remQueryRes.Events {
remQueryEvents = append(remQueryEvents, headeredEvent.Event)
}
diff --git a/roomserver/api/query.go b/roomserver/api/query.go
index b1525342..6586b1af 100644
--- a/roomserver/api/query.go
+++ b/roomserver/api/query.go
@@ -33,8 +33,6 @@ type QueryLatestEventsAndStateRequest struct {
// This is used when sending events to set the prev_events, auth_events and depth.
// It is also used to tell whether the event is allowed by the event auth rules.
type QueryLatestEventsAndStateResponse struct {
- // Copy of the request for debugging.
- QueryLatestEventsAndStateRequest
// Does the room exist?
// If the room doesn't exist this will be false and LatestEvents will be empty.
RoomExists bool `json:"room_exists"`
@@ -66,8 +64,6 @@ type QueryStateAfterEventsRequest struct {
// QueryStateAfterEventsResponse is a response to QueryStateAfterEvents
type QueryStateAfterEventsResponse struct {
- // Copy of the request for debugging.
- QueryStateAfterEventsRequest
// Does the room exist on this roomserver?
// If the room doesn't exist this will be false and StateEvents will be empty.
RoomExists bool `json:"room_exists"`
@@ -89,8 +85,6 @@ type QueryEventsByIDRequest struct {
// QueryEventsByIDResponse is a response to QueryEventsByID
type QueryEventsByIDResponse struct {
- // Copy of the request for debugging.
- QueryEventsByIDRequest
// A list of events with the requested IDs.
// If the roomserver does not have a copy of a requested event
// then it will omit that event from the list.
@@ -187,8 +181,6 @@ type QueryStateAndAuthChainRequest struct {
// QueryStateAndAuthChainResponse is a response to QueryStateAndAuthChain
type QueryStateAndAuthChainResponse struct {
- // Copy of the request for debugging.
- QueryStateAndAuthChainRequest
// Does the room exist on this roomserver?
// If the room doesn't exist this will be false and StateEvents will be empty.
RoomExists bool `json:"room_exists"`
diff --git a/roomserver/internal/query.go b/roomserver/internal/query.go
index 375ddc23..4fc8e4c2 100644
--- a/roomserver/internal/query.go
+++ b/roomserver/internal/query.go
@@ -45,7 +45,6 @@ func (r *RoomserverInternalAPI) QueryLatestEventsAndState(
roomState := state.NewStateResolution(r.DB)
- response.QueryLatestEventsAndStateRequest = *request
roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
if err != nil {
return err
@@ -105,7 +104,6 @@ func (r *RoomserverInternalAPI) QueryStateAfterEvents(
roomState := state.NewStateResolution(r.DB)
- response.QueryStateAfterEventsRequest = *request
roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
if err != nil {
return err
@@ -153,8 +151,6 @@ func (r *RoomserverInternalAPI) QueryEventsByID(
request *api.QueryEventsByIDRequest,
response *api.QueryEventsByIDResponse,
) error {
- response.QueryEventsByIDRequest = *request
-
eventNIDMap, err := r.DB.EventNIDs(ctx, request.EventIDs)
if err != nil {
return err
@@ -734,7 +730,6 @@ func (r *RoomserverInternalAPI) QueryStateAndAuthChain(
request *api.QueryStateAndAuthChainRequest,
response *api.QueryStateAndAuthChainResponse,
) error {
- response.QueryStateAndAuthChainRequest = *request
roomNID, err := r.DB.RoomNIDExcludingStubs(ctx, request.RoomID)
if err != nil {
return err