diff options
Diffstat (limited to 'federationapi/routing/send.go')
-rw-r--r-- | federationapi/routing/send.go | 127 |
1 files changed, 65 insertions, 62 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index f50b9c3d..96b5355e 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -84,7 +84,7 @@ func Send( util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) - resp, jsonErr := t.processTransaction(httpReq.Context()) + resp, jsonErr := t.processTransaction(context.Background()) if jsonErr != nil { util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") return *jsonErr @@ -1005,79 +1005,82 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even return t.lookupMissingStateViaState(ctx, roomID, eventID, roomVersion) } - util.GetLogger(ctx).WithFields(logrus.Fields{ - "missing": missingCount, - "event_id": eventID, - "room_id": roomID, - "total_state": len(stateIDs.StateEventIDs), - "total_auth_events": len(stateIDs.AuthEventIDs), - "concurrent_requests": concurrentRequests, - }).Info("Fetching missing state at event") + if missingCount > 0 { + util.GetLogger(ctx).WithFields(logrus.Fields{ + "missing": missingCount, + "event_id": eventID, + "room_id": roomID, + "total_state": len(stateIDs.StateEventIDs), + "total_auth_events": len(stateIDs.AuthEventIDs), + "concurrent_requests": concurrentRequests, + }).Info("Fetching missing state at event") + + // Get a list of servers to fetch from. + servers := t.getServers(ctx, roomID) + if len(servers) > 5 { + servers = servers[:5] + } - // Get a list of servers to fetch from. - servers := t.getServers(ctx, roomID) - if len(servers) > 5 { - servers = servers[:5] - } + // Create a queue containing all of the missing event IDs that we want + // to retrieve. + pending := make(chan string, missingCount) + for missingEventID := range missing { + pending <- missingEventID + } + close(pending) - // Create a queue containing all of the missing event IDs that we want - // to retrieve. - pending := make(chan string, missingCount) - for missingEventID := range missing { - pending <- missingEventID - } - close(pending) + // Define how many workers we should start to do this. + if missingCount < concurrentRequests { + concurrentRequests = missingCount + } - // Define how many workers we should start to do this. - if missingCount < concurrentRequests { - concurrentRequests = missingCount - } + // Create the wait group. + var fetchgroup sync.WaitGroup + fetchgroup.Add(concurrentRequests) - // Create the wait group. - var fetchgroup sync.WaitGroup - fetchgroup.Add(concurrentRequests) + // This is the only place where we'll write to t.haveEvents from + // multiple goroutines, and everywhere else is blocked on this + // synchronous function anyway. + var haveEventsMutex sync.Mutex - // This is the only place where we'll write to t.haveEvents from - // multiple goroutines, and everywhere else is blocked on this - // synchronous function anyway. - var haveEventsMutex sync.Mutex + // Define what we'll do in order to fetch the missing event ID. + fetch := func(missingEventID string) { + var h *gomatrixserverlib.HeaderedEvent + h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers) + switch err.(type) { + case verifySigError: + return + case nil: + break + default: + util.GetLogger(ctx).WithFields(logrus.Fields{ + "event_id": missingEventID, + "room_id": roomID, + }).Info("Failed to fetch missing event") + return + } + haveEventsMutex.Lock() + t.haveEvents[h.EventID()] = h + haveEventsMutex.Unlock() + } - // Define what we'll do in order to fetch the missing event ID. - fetch := func(missingEventID string) { - var h *gomatrixserverlib.HeaderedEvent - h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers) - switch err.(type) { - case verifySigError: - return - case nil: - break - default: - util.GetLogger(ctx).WithFields(logrus.Fields{ - "event_id": missingEventID, - "room_id": roomID, - }).Info("Failed to fetch missing event") - return + // Create the worker. + worker := func(ch <-chan string) { + defer fetchgroup.Done() + for missingEventID := range ch { + fetch(missingEventID) + } } - haveEventsMutex.Lock() - t.haveEvents[h.EventID()] = h - haveEventsMutex.Unlock() - } - // Create the worker. - worker := func(ch <-chan string) { - defer fetchgroup.Done() - for missingEventID := range ch { - fetch(missingEventID) + // Start the workers. + for i := 0; i < concurrentRequests; i++ { + go worker(pending) } - } - // Start the workers. - for i := 0; i < concurrentRequests; i++ { - go worker(pending) + // Wait for the workers to finish. + fetchgroup.Wait() } - // Wait for the workers to finish. - fetchgroup.Wait() resp, err := t.createRespStateFromStateIDs(stateIDs) return resp, err } |