aboutsummaryrefslogtreecommitdiff
path: root/federationapi
diff options
context:
space:
mode:
authorKegsay <kegan@matrix.org>2021-04-08 13:50:39 +0100
committerGitHub <noreply@github.com>2021-04-08 13:50:39 +0100
commitb769d5a25ee1dd298899ab9fec0c31e834f9b99e (patch)
tree53c2e72af0b5b8339aa706485fde04c2fc72c605 /federationapi
parent5ade348d142012367e6cf4b8c2c65d6fbf357af6 (diff)
Optimise memory usage when calling /g_m_e (#1819)
* Optimise memory usage when calling /g_m_e * cache more events * refactor handling of device list update pokes * Sigh
Diffstat (limited to 'federationapi')
-rw-r--r--federationapi/routing/send.go44
1 files changed, 29 insertions, 15 deletions
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index b48d6c0b..fee6d565 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -620,7 +620,9 @@ func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserv
return gomatrixserverlib.Allowed(e, &authUsingState)
}
-func (t *txnReq) processEventWithMissingState(ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
+func (t *txnReq) processEventWithMissingState(
+ ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
+) error {
// Do this with a fresh context, so that we keep working even if the
// original request times out. With any luck, by the time the remote
// side retries, we'll have fetched the missing state.
@@ -784,7 +786,7 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
default:
return nil, false, fmt.Errorf("t.lookupEvent: %w", err)
}
- t.haveEvents[h.EventID()] = h
+ t.cacheAndReturn(h)
if h.StateKey() != nil {
addedToState := false
for i := range respState.StateEvents {
@@ -803,6 +805,14 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
return respState, false, nil
}
+func (t *txnReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *gomatrixserverlib.HeaderedEvent {
+ if cached, exists := t.haveEvents[ev.EventID()]; exists {
+ return cached
+ }
+ t.haveEvents[ev.EventID()] = ev
+ return ev
+}
+
func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState {
var res api.QueryStateAfterEventsResponse
err := t.rsAPI.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{
@@ -810,15 +820,21 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
PrevEventIDs: []string{eventID},
}, &res)
if err != nil || !res.PrevEventsExist {
- util.GetLogger(ctx).WithError(err).Warnf("failed to query state after %s locally", eventID)
+ util.GetLogger(ctx).WithField("room_id", roomID).WithError(err).Warnf("failed to query state after %s locally, prev exists=%v", eventID, res.PrevEventsExist)
return nil
}
+ stateEvents := make([]*gomatrixserverlib.HeaderedEvent, len(res.StateEvents))
for i, ev := range res.StateEvents {
- t.haveEvents[ev.EventID()] = res.StateEvents[i]
+ // set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
+ // processEvent request, which is better for memory.
+ stateEvents[i] = t.cacheAndReturn(ev)
}
+ // we should never access res.StateEvents again so we delete it here to make GC faster
+ res.StateEvents = nil
+
var authEvents []*gomatrixserverlib.Event
missingAuthEvents := map[string]bool{}
- for _, ev := range res.StateEvents {
+ for _, ev := range stateEvents {
for _, ae := range ev.AuthEventIDs() {
if aev, ok := t.haveEvents[ae]; ok {
authEvents = append(authEvents, aev.Unwrap())
@@ -843,14 +859,13 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
return nil
}
for i := range queryRes.Events {
- evID := queryRes.Events[i].EventID()
- t.haveEvents[evID] = queryRes.Events[i]
- authEvents = append(authEvents, queryRes.Events[i].Unwrap())
+ authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap())
}
+ queryRes.Events = nil
}
return &gomatrixserverlib.RespState{
- StateEvents: gomatrixserverlib.UnwrapEventHeaders(res.StateEvents),
+ StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents),
AuthEvents: authEvents,
}
}
@@ -860,8 +875,6 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
func (t *txnReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
*gomatrixserverlib.RespState, error) {
- util.GetLogger(ctx).Infof("lookupStateBeforeEvent %s", eventID)
-
// Attempt to fetch the missing state using /state_ids and /events
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
}
@@ -992,7 +1005,6 @@ Event:
}
}
- // we processed everything!
return newEvents, nil
}
@@ -1011,7 +1023,7 @@ func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID
func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
*gomatrixserverlib.RespState, error) {
- util.GetLogger(ctx).Infof("lookupMissingStateViaStateIDs %s", eventID)
+ util.GetLogger(ctx).WithField("room_id", roomID).Infof("lookupMissingStateViaStateIDs %s", eventID)
// fetch the state event IDs at the time of the event
stateIDs, err := t.federation.LookupStateIDs(ctx, t.Origin, roomID, eventID)
if err != nil {
@@ -1040,14 +1052,16 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
}
for i := range queryRes.Events {
evID := queryRes.Events[i].EventID()
- t.haveEvents[evID] = queryRes.Events[i]
+ t.cacheAndReturn(queryRes.Events[i])
if missing[evID] {
delete(missing, evID)
}
}
+ queryRes.Events = nil // allow it to be GCed
concurrentRequests := 8
missingCount := len(missing)
+ util.GetLogger(ctx).WithField("room_id", roomID).WithField("event_id", eventID).Infof("lookupMissingStateViaStateIDs missing %d/%d events", missingCount, len(wantIDs))
// If over 50% of the auth/state events from /state_ids are missing
// then we'll just call /state instead, otherwise we'll just end up
@@ -1112,7 +1126,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
return
}
haveEventsMutex.Lock()
- t.haveEvents[h.EventID()] = h
+ t.cacheAndReturn(h)
haveEventsMutex.Unlock()
}