aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-01-04 13:47:48 +0000
committerGitHub <noreply@github.com>2021-01-04 13:47:48 +0000
commit05324b68610834b3e5af32be9245624e455ed041 (patch)
treeb256645d944f0b3bec018153a3318823c36a44c1
parent597350a67f6fc803e1a81e4d651be3efbd0d5907 (diff)
Send/state tweaks (#1681)
* Check missing event count * Don't use request context for /send
-rw-r--r--federationapi/routing/send.go127
1 files changed, 65 insertions, 62 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index f50b9c3d..96b5355e 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -84,7 +84,7 @@ func Send(
util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
- resp, jsonErr := t.processTransaction(httpReq.Context())
+ resp, jsonErr := t.processTransaction(context.Background())
if jsonErr != nil {
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
return *jsonErr
@@ -1005,79 +1005,82 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
return t.lookupMissingStateViaState(ctx, roomID, eventID, roomVersion)
}
- util.GetLogger(ctx).WithFields(logrus.Fields{
- "missing": missingCount,
- "event_id": eventID,
- "room_id": roomID,
- "total_state": len(stateIDs.StateEventIDs),
- "total_auth_events": len(stateIDs.AuthEventIDs),
- "concurrent_requests": concurrentRequests,
- }).Info("Fetching missing state at event")
+ if missingCount > 0 {
+ util.GetLogger(ctx).WithFields(logrus.Fields{
+ "missing": missingCount,
+ "event_id": eventID,
+ "room_id": roomID,
+ "total_state": len(stateIDs.StateEventIDs),
+ "total_auth_events": len(stateIDs.AuthEventIDs),
+ "concurrent_requests": concurrentRequests,
+ }).Info("Fetching missing state at event")
+
+ // Get a list of servers to fetch from.
+ servers := t.getServers(ctx, roomID)
+ if len(servers) > 5 {
+ servers = servers[:5]
+ }
- // Get a list of servers to fetch from.
- servers := t.getServers(ctx, roomID)
- if len(servers) > 5 {
- servers = servers[:5]
- }
+ // Create a queue containing all of the missing event IDs that we want
+ // to retrieve.
+ pending := make(chan string, missingCount)
+ for missingEventID := range missing {
+ pending <- missingEventID
+ }
+ close(pending)
- // Create a queue containing all of the missing event IDs that we want
- // to retrieve.
- pending := make(chan string, missingCount)
- for missingEventID := range missing {
- pending <- missingEventID
- }
- close(pending)
+ // Define how many workers we should start to do this.
+ if missingCount < concurrentRequests {
+ concurrentRequests = missingCount
+ }
- // Define how many workers we should start to do this.
- if missingCount < concurrentRequests {
- concurrentRequests = missingCount
- }
+ // Create the wait group.
+ var fetchgroup sync.WaitGroup
+ fetchgroup.Add(concurrentRequests)
- // Create the wait group.
- var fetchgroup sync.WaitGroup
- fetchgroup.Add(concurrentRequests)
+ // This is the only place where we'll write to t.haveEvents from
+ // multiple goroutines, and everywhere else is blocked on this
+ // synchronous function anyway.
+ var haveEventsMutex sync.Mutex
- // This is the only place where we'll write to t.haveEvents from
- // multiple goroutines, and everywhere else is blocked on this
- // synchronous function anyway.
- var haveEventsMutex sync.Mutex
+ // Define what we'll do in order to fetch the missing event ID.
+ fetch := func(missingEventID string) {
+ var h *gomatrixserverlib.HeaderedEvent
+ h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
+ switch err.(type) {
+ case verifySigError:
+ return
+ case nil:
+ break
+ default:
+ util.GetLogger(ctx).WithFields(logrus.Fields{
+ "event_id": missingEventID,
+ "room_id": roomID,
+ }).Info("Failed to fetch missing event")
+ return
+ }
+ haveEventsMutex.Lock()
+ t.haveEvents[h.EventID()] = h
+ haveEventsMutex.Unlock()
+ }
- // Define what we'll do in order to fetch the missing event ID.
- fetch := func(missingEventID string) {
- var h *gomatrixserverlib.HeaderedEvent
- h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
- switch err.(type) {
- case verifySigError:
- return
- case nil:
- break
- default:
- util.GetLogger(ctx).WithFields(logrus.Fields{
- "event_id": missingEventID,
- "room_id": roomID,
- }).Info("Failed to fetch missing event")
- return
+ // Create the worker.
+ worker := func(ch <-chan string) {
+ defer fetchgroup.Done()
+ for missingEventID := range ch {
+ fetch(missingEventID)
+ }
}
- haveEventsMutex.Lock()
- t.haveEvents[h.EventID()] = h
- haveEventsMutex.Unlock()
- }
- // Create the worker.
- worker := func(ch <-chan string) {
- defer fetchgroup.Done()
- for missingEventID := range ch {
- fetch(missingEventID)
+ // Start the workers.
+ for i := 0; i < concurrentRequests; i++ {
+ go worker(pending)
}
- }
- // Start the workers.
- for i := 0; i < concurrentRequests; i++ {
- go worker(pending)
+ // Wait for the workers to finish.
+ fetchgroup.Wait()
}
- // Wait for the workers to finish.
- fetchgroup.Wait()
resp, err := t.createRespStateFromStateIDs(stateIDs)
return resp, err
}