aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2021-02-16 17:12:17 +0000
committerGitHub <noreply@github.com>2021-02-16 17:12:17 +0000
commit5d74a1757f652f1e367a036f931e71bd3da612dd (patch)
tree1eba4d67889667ce00575be5751c305e4759a9e5
parentf448e8972a1a6974916afc3ab686b342eaf568e2 (diff)
Don't query for servers so often in /send (#1766)
* Look up servers less often, don't hit API for missing auth events unless there are actually missing auth events * Remove ResolveConflictsAdhoc (since it is already in GMSL), other tweaks * Update gomatrixserverlib to matrix-org/gomatrixserverlib#254 * Fix resolve-state * Initialise t.servers on first use
-rw-r--r--cmd/resolve-state/main.go3
-rw-r--r--federationapi/routing/send.go108
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rw-r--r--roomserver/internal/query/query.go4
-rw-r--r--roomserver/state/state.go73
6 files changed, 50 insertions, 144 deletions
diff --git a/cmd/resolve-state/main.go b/cmd/resolve-state/main.go
index efa58333..69c3489d 100644
--- a/cmd/resolve-state/main.go
+++ b/cmd/resolve-state/main.go
@@ -8,7 +8,6 @@ import (
"strconv"
"github.com/matrix-org/dendrite/internal/caching"
- "github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup"
@@ -105,7 +104,7 @@ func main() {
}
fmt.Println("Resolving state")
- resolved, err := state.ResolveConflictsAdhoc(
+ resolved, err := gomatrixserverlib.ResolveConflicts(
gomatrixserverlib.RoomVersion(*roomVersion),
events,
authEvents,
diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go
index 96b5355e..02683aea 100644
--- a/federationapi/routing/send.go
+++ b/federationapi/routing/send.go
@@ -102,11 +102,13 @@ func Send(
type txnReq struct {
gomatrixserverlib.Transaction
- rsAPI api.RoomserverInternalAPI
- eduAPI eduserverAPI.EDUServerInputAPI
- keyAPI keyapi.KeyInternalAPI
- keys gomatrixserverlib.JSONVerifier
- federation txnFederationClient
+ rsAPI api.RoomserverInternalAPI
+ eduAPI eduserverAPI.EDUServerInputAPI
+ keyAPI keyapi.KeyInternalAPI
+ keys gomatrixserverlib.JSONVerifier
+ federation txnFederationClient
+ servers []gomatrixserverlib.ServerName
+ serversMutex sync.RWMutex
// local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
@@ -404,16 +406,21 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
}
func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName {
- servers := []gomatrixserverlib.ServerName{t.Origin}
+ t.serversMutex.Lock()
+ defer t.serversMutex.Unlock()
+ if t.servers != nil {
+ return t.servers
+ }
+ t.servers = []gomatrixserverlib.ServerName{t.Origin}
serverReq := &api.QueryServerJoinedToRoomRequest{
RoomID: roomID,
}
serverRes := &api.QueryServerJoinedToRoomResponse{}
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
- servers = append(servers, serverRes.ServerNames...)
- util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(servers), roomID)
+ t.servers = append(t.servers, serverRes.ServerNames...)
+ util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID)
}
- return servers
+ return t.servers
}
func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error {
@@ -482,14 +489,10 @@ func (t *txnReq) retrieveMissingAuthEvents(
missingAuthEvents[missingAuthEventID] = struct{}{}
}
- servers := t.getServers(ctx, e.RoomID())
- if len(servers) > 5 {
- servers = servers[:5]
- }
withNextEvent:
for missingAuthEventID := range missingAuthEvents {
withNextServer:
- for _, server := range servers {
+ for _, server := range t.getServers(ctx, e.RoomID()) {
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
if err != nil {
@@ -692,13 +695,8 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
}
- servers := t.getServers(ctx, roomID)
- if len(servers) > 5 {
- servers = servers[:5]
- }
-
// fetch the event we're missing and add it to the pile
- h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers)
+ h, err := t.lookupEvent(ctx, roomVersion, roomID, eventID, false)
switch err.(type) {
case verifySigError:
return respState, false, nil
@@ -740,11 +738,10 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
t.haveEvents[ev.EventID()] = res.StateEvents[i]
}
var authEvents []*gomatrixserverlib.Event
- missingAuthEvents := make(map[string]bool)
+ missingAuthEvents := map[string]bool{}
for _, ev := range res.StateEvents {
for _, ae := range ev.AuthEventIDs() {
- aev, ok := t.haveEvents[ae]
- if ok {
+ if aev, ok := t.haveEvents[ae]; ok {
authEvents = append(authEvents, aev.Unwrap())
} else {
missingAuthEvents[ae] = true
@@ -753,27 +750,28 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
}
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
// have stored the event.
- var missingEventList []string
- for evID := range missingAuthEvents {
- missingEventList = append(missingEventList, evID)
- }
- queryReq := api.QueryEventsByIDRequest{
- EventIDs: missingEventList,
- }
- util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList)
- var queryRes api.QueryEventsByIDResponse
- if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
- return nil
- }
- for i := range queryRes.Events {
- evID := queryRes.Events[i].EventID()
- t.haveEvents[evID] = queryRes.Events[i]
- authEvents = append(authEvents, queryRes.Events[i].Unwrap())
+ if len(missingAuthEvents) > 0 {
+ var missingEventList []string
+ for evID := range missingAuthEvents {
+ missingEventList = append(missingEventList, evID)
+ }
+ queryReq := api.QueryEventsByIDRequest{
+ EventIDs: missingEventList,
+ }
+ util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList)
+ var queryRes api.QueryEventsByIDResponse
+ if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
+ return nil
+ }
+ for i := range queryRes.Events {
+ evID := queryRes.Events[i].EventID()
+ t.haveEvents[evID] = queryRes.Events[i]
+ authEvents = append(authEvents, queryRes.Events[i].Unwrap())
+ }
}
- evs := gomatrixserverlib.UnwrapEventHeaders(res.StateEvents)
return &gomatrixserverlib.RespState{
- StateEvents: evs,
+ StateEvents: gomatrixserverlib.UnwrapEventHeaders(res.StateEvents),
AuthEvents: authEvents,
}
}
@@ -805,11 +803,7 @@ retryAllowedState:
if err = checkAllowedByState(backwardsExtremity, resolvedStateEvents); err != nil {
switch missing := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
- servers := t.getServers(ctx, backwardsExtremity.RoomID())
- if len(servers) > 5 {
- servers = servers[:5]
- }
- h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true, servers)
+ h, err2 := t.lookupEvent(ctx, roomVersion, backwardsExtremity.RoomID(), missing.AuthEventID, true)
switch err2.(type) {
case verifySigError:
return &gomatrixserverlib.RespState{
@@ -857,17 +851,8 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
latestEvents[i] = res.LatestEvents[i].EventID
}
- servers := []gomatrixserverlib.ServerName{t.Origin}
- serverReq := &api.QueryServerJoinedToRoomRequest{
- RoomID: e.RoomID(),
- }
- serverRes := &api.QueryServerJoinedToRoomResponse{}
- if err = t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
- servers = append(servers, serverRes.ServerNames...)
- logger.Infof("Found %d server(s) to query for missing events", len(servers))
- }
-
var missingResp *gomatrixserverlib.RespMissingEvents
+ servers := t.getServers(ctx, e.RoomID())
for _, server := range servers {
var m gomatrixserverlib.RespMissingEvents
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
@@ -1015,12 +1000,6 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
"concurrent_requests": concurrentRequests,
}).Info("Fetching missing state at event")
- // Get a list of servers to fetch from.
- servers := t.getServers(ctx, roomID)
- if len(servers) > 5 {
- servers = servers[:5]
- }
-
// Create a queue containing all of the missing event IDs that we want
// to retrieve.
pending := make(chan string, missingCount)
@@ -1046,7 +1025,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
// Define what we'll do in order to fetch the missing event ID.
fetch := func(missingEventID string) {
var h *gomatrixserverlib.HeaderedEvent
- h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers)
+ h, err = t.lookupEvent(ctx, roomVersion, roomID, missingEventID, false)
switch err.(type) {
case verifySigError:
return
@@ -1112,7 +1091,7 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
return &respState, nil
}
-func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool, servers []gomatrixserverlib.ServerName) (*gomatrixserverlib.HeaderedEvent, error) {
+func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
if localFirst {
// fetch from the roomserver
queryReq := api.QueryEventsByIDRequest{
@@ -1127,6 +1106,7 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
}
var event *gomatrixserverlib.Event
found := false
+ servers := t.getServers(ctx, roomID)
for _, serverName := range servers {
txn, err := t.federation.GetEvent(ctx, serverName, missingEventID)
if err != nil || len(txn.PDUs) == 0 {
diff --git a/go.mod b/go.mod
index 96aa881c..8517ca7e 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
- github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead
+ github.com/matrix-org/gomatrixserverlib v0.0.0-20210216163908-bab1f2be20d0
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.2
diff --git a/go.sum b/go.sum
index a6464cc8..473fffa7 100644
--- a/go.sum
+++ b/go.sum
@@ -567,8 +567,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
-github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead h1:VmGJybKUQin8+NyA9ZkrHJpE8ygXzcON9peQH9LC92c=
-github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20210216163908-bab1f2be20d0 h1:eP8t7DaLKkNz0IT9GcJeG6UTKjfvihIxbAXKN0I7j6g=
+github.com/matrix-org/gomatrixserverlib v0.0.0-20210216163908-bab1f2be20d0/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4=
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=
diff --git a/roomserver/internal/query/query.go b/roomserver/internal/query/query.go
index 2a361641..3aa51726 100644
--- a/roomserver/internal/query/query.go
+++ b/roomserver/internal/query/query.go
@@ -112,7 +112,7 @@ func (r *Queryer) QueryStateAfterEvents(
return fmt.Errorf("getAuthChain: %w", err)
}
- stateEvents, err = state.ResolveConflictsAdhoc(info.RoomVersion, stateEvents, authEvents)
+ stateEvents, err = gomatrixserverlib.ResolveConflicts(info.RoomVersion, stateEvents, authEvents)
if err != nil {
return fmt.Errorf("state.ResolveConflictsAdhoc: %w", err)
}
@@ -469,7 +469,7 @@ func (r *Queryer) QueryStateAndAuthChain(
}
if request.ResolveState {
- if stateEvents, err = state.ResolveConflictsAdhoc(
+ if stateEvents, err = gomatrixserverlib.ResolveConflicts(
info.RoomVersion, stateEvents, authEvents,
); err != nil {
return err
diff --git a/roomserver/state/state.go b/roomserver/state/state.go
index 7f0bc37d..2c01ca03 100644
--- a/roomserver/state/state.go
+++ b/roomserver/state/state.go
@@ -683,79 +683,6 @@ func (v *StateResolution) calculateStateAfterManyEvents(
return
}
-// ResolveConflictsAdhoc is a helper function to assist the query API in
-// performing state resolution when requested. This is a different code
-// path to the rest of state.go because this assumes you already have
-// gomatrixserverlib.Event objects and not just a bunch of NIDs like
-// elsewhere in the state resolution.
-// TODO: Some of this can possibly be deduplicated
-func ResolveConflictsAdhoc(
- version gomatrixserverlib.RoomVersion,
- events []*gomatrixserverlib.Event,
- authEvents []*gomatrixserverlib.Event,
-) ([]*gomatrixserverlib.Event, error) {
- type stateKeyTuple struct {
- Type string
- StateKey string
- }
-
- // Prepare our data structures.
- eventMap := make(map[stateKeyTuple][]*gomatrixserverlib.Event)
- var conflicted, notConflicted, resolved []*gomatrixserverlib.Event
-
- // Run through all of the events that we were given and sort them
- // into a map, sorted by (event_type, state_key) tuple. This means
- // that we can easily spot events that are "conflicted", e.g.
- // there are duplicate values for the same tuple key.
- for _, event := range events {
- if event.StateKey() == nil {
- // Ignore events that are not state events.
- continue
- }
- // Append the events if there is already a conflicted list for
- // this tuple key, create it if not.
- tuple := stateKeyTuple{event.Type(), *event.StateKey()}
- eventMap[tuple] = append(eventMap[tuple], event)
- }
-
- // Split out the events in the map into conflicted and unconflicted
- // buckets. The conflicted events will be ran through state res,
- // whereas unconfliced events will always going to appear in the
- // final resolved state.
- for _, list := range eventMap {
- if len(list) > 1 {
- conflicted = append(conflicted, list...)
- } else {
- notConflicted = append(notConflicted, list...)
- }
- }
-
- // Work out which state resolution algorithm we want to run for
- // the room version.
- stateResAlgo, err := version.StateResAlgorithm()
- if err != nil {
- return nil, err
- }
- switch stateResAlgo {
- case gomatrixserverlib.StateResV1:
- // Currently state res v1 doesn't handle unconflicted events
- // for us, like state res v2 does, so we will need to add the
- // unconflicted events into the state ourselves.
- // TODO: Fix state res v1 so this is handled for the caller.
- resolved = gomatrixserverlib.ResolveStateConflicts(conflicted, authEvents)
- resolved = append(resolved, notConflicted...)
- case gomatrixserverlib.StateResV2:
- // TODO: auth difference here?
- resolved = gomatrixserverlib.ResolveStateConflictsV2(conflicted, notConflicted, authEvents, authEvents)
- default:
- return nil, fmt.Errorf("unsupported state resolution algorithm %v", stateResAlgo)
- }
-
- // Return the final resolved state events, including both the
- // resolved set of conflicted events, and the unconflicted events.
- return resolved, nil
-}
-
func (v *StateResolution) resolveConflicts(
ctx context.Context, version gomatrixserverlib.RoomVersion,
notConflicted, conflicted []types.StateEntry,