aboutsummaryrefslogtreecommitdiff
path: root/federationapi/internal/federationclient.go
diff options
context:
space:
mode:
authorNeil Alexander <neilalexander@users.noreply.github.com>2022-01-27 14:29:14 +0000
committerGitHub <noreply@github.com>2022-01-27 14:29:14 +0000
commita763cbb0e1a12828dade855add9a6c30c784baa8 (patch)
tree6d923ee79951fd8b70b6941b86155c9db6294c8f /federationapi/internal/federationclient.go
parent5b4999afa9cb095eb5b4c8c163d4063078477baa (diff)
Roomserver/federation input refactor (#2104)
* Put federation client functions into their own file * Look for missing auth events in RS input * Remove retrieveMissingAuthEvents from federation API * Logging * Sorta transplanted the code over * Use event origin failing all else * Don't get stuck on mutexes: * Add verifier * Don't mark state events with zero snapshot NID as not existing * Check missing state if not an outlier before storing the event * Reject instead of soft-fail, don't copy roominfo so much * Use synchronous contexts, limit time to fetch missing events * Clean up some commented out bits * Simplify `/send` endpoint significantly * Submit async * Report errors on sending to RS input * Set max payload in NATS to 16MB * Tweak metrics * Add `workerForRoom` for tidiness * Try skipping unmarshalling errors for RespMissingEvents * Track missing prev events separately to avoid calculating state when not possible * Tweak logic around checking missing state * Care about state when checking missing prev events * Don't check missing state for create events * Try that again * Handle create events better * Send create room events as new * Use given event kind when sending auth/state events * Revert "Use given event kind when sending auth/state events" This reverts commit 089d64d271b5fca8c104e1554711187420dbebca. * Only search for missing prev events or state for new events * Tweaks * We only have missing prev if we don't supply state * Room version tweaks * Allow async inputs again * Apply backpressure to consumers/synchronous requests to hopefully stop things being overwhelmed * Set timeouts on roomserver input tasks (need to decide what timeout makes sense) * Use work queue policy, deliver all on restart * Reduce chance of duplicates being sent by NATS * Limit the number of servers we attempt to reduce backpressure * Some review comment fixes * Tidy up a couple things * Don't limit servers, randomise order using map * Some context refactoring * Update gmsl * Don't resend create events * Set stateIDs length correctly or else the roomserver thinks there are missing events when there aren't * Exclude our own servername * Try backing off servers * Make excluding self behaviour optional * Exclude self from g_m_e * Update sytest-whitelist * Update consumers for the roomserver output stream * Remember to send outliers for state returned from /gme * Make full HTTP tests less upsetti * Remove 'If a device list update goes missing, the server resyncs on the next one' from the sytest blacklist * Remove debugging test * Fix blacklist again, remove unnecessary duplicate context * Clearer contexts, don't use background in case there's something happening there * Don't queue up events more than once in memory * Correctly identify create events when checking for state * Fill in gaps again in /gme code * Remove `AuthEventIDs` from `InputRoomEvent` * Remove stray field Co-authored-by: Kegan Dougal <kegan@matrix.org>
Diffstat (limited to 'federationapi/internal/federationclient.go')
-rw-r--r--federationapi/internal/federationclient.go180
1 files changed, 180 insertions, 0 deletions
diff --git a/federationapi/internal/federationclient.go b/federationapi/internal/federationclient.go
new file mode 100644
index 00000000..b31db466
--- /dev/null
+++ b/federationapi/internal/federationclient.go
@@ -0,0 +1,180 @@
+package internal
+
+import (
+ "context"
+ "time"
+
+ "github.com/matrix-org/gomatrixserverlib"
+)
+
+// Functions here are "proxying" calls to the gomatrixserverlib federation
+// client.
+
+func (a *FederationInternalAPI) GetEventAuth(
+ ctx context.Context, s gomatrixserverlib.ServerName,
+ roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string,
+) (res gomatrixserverlib.RespEventAuth, err error) {
+ ctx, cancel := context.WithTimeout(ctx, time.Second*30)
+ defer cancel()
+ ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
+ return a.federation.GetEventAuth(ctx, s, roomVersion, roomID, eventID)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespEventAuth{}, err
+ }
+ return ires.(gomatrixserverlib.RespEventAuth), nil
+}
+
+func (a *FederationInternalAPI) GetUserDevices(
+ ctx context.Context, s gomatrixserverlib.ServerName, userID string,
+) (gomatrixserverlib.RespUserDevices, error) {
+ ctx, cancel := context.WithTimeout(ctx, time.Second*30)
+ defer cancel()
+ ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
+ return a.federation.GetUserDevices(ctx, s, userID)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespUserDevices{}, err
+ }
+ return ires.(gomatrixserverlib.RespUserDevices), nil
+}
+
+func (a *FederationInternalAPI) ClaimKeys(
+ ctx context.Context, s gomatrixserverlib.ServerName, oneTimeKeys map[string]map[string]string,
+) (gomatrixserverlib.RespClaimKeys, error) {
+ ctx, cancel := context.WithTimeout(ctx, time.Second*30)
+ defer cancel()
+ ires, err := a.doRequestIfNotBackingOffOrBlacklisted(s, func() (interface{}, error) {
+ return a.federation.ClaimKeys(ctx, s, oneTimeKeys)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespClaimKeys{}, err
+ }
+ return ires.(gomatrixserverlib.RespClaimKeys), nil
+}
+
+func (a *FederationInternalAPI) QueryKeys(
+ ctx context.Context, s gomatrixserverlib.ServerName, keys map[string][]string,
+) (gomatrixserverlib.RespQueryKeys, error) {
+ ires, err := a.doRequestIfNotBackingOffOrBlacklisted(s, func() (interface{}, error) {
+ return a.federation.QueryKeys(ctx, s, keys)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespQueryKeys{}, err
+ }
+ return ires.(gomatrixserverlib.RespQueryKeys), nil
+}
+
+func (a *FederationInternalAPI) Backfill(
+ ctx context.Context, s gomatrixserverlib.ServerName, roomID string, limit int, eventIDs []string,
+) (res gomatrixserverlib.Transaction, err error) {
+ ctx, cancel := context.WithTimeout(ctx, time.Second*30)
+ defer cancel()
+ ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
+ return a.federation.Backfill(ctx, s, roomID, limit, eventIDs)
+ })
+ if err != nil {
+ return gomatrixserverlib.Transaction{}, err
+ }
+ return ires.(gomatrixserverlib.Transaction), nil
+}
+
+func (a *FederationInternalAPI) LookupState(
+ ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
+) (res gomatrixserverlib.RespState, err error) {
+ ctx, cancel := context.WithTimeout(ctx, time.Second*30)
+ defer cancel()
+ ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
+ return a.federation.LookupState(ctx, s, roomID, eventID, roomVersion)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespState{}, err
+ }
+ return ires.(gomatrixserverlib.RespState), nil
+}
+
+func (a *FederationInternalAPI) LookupStateIDs(
+ ctx context.Context, s gomatrixserverlib.ServerName, roomID, eventID string,
+) (res gomatrixserverlib.RespStateIDs, err error) {
+ ctx, cancel := context.WithTimeout(ctx, time.Second*30)
+ defer cancel()
+ ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
+ return a.federation.LookupStateIDs(ctx, s, roomID, eventID)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespStateIDs{}, err
+ }
+ return ires.(gomatrixserverlib.RespStateIDs), nil
+}
+
+func (a *FederationInternalAPI) LookupMissingEvents(
+ ctx context.Context, s gomatrixserverlib.ServerName, roomID string,
+ missing gomatrixserverlib.MissingEvents, roomVersion gomatrixserverlib.RoomVersion,
+) (res gomatrixserverlib.RespMissingEvents, err error) {
+ ctx, cancel := context.WithTimeout(ctx, time.Second*30)
+ defer cancel()
+ ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
+ return a.federation.LookupMissingEvents(ctx, s, roomID, missing, roomVersion)
+ })
+ if err != nil {
+ return gomatrixserverlib.RespMissingEvents{}, err
+ }
+ return ires.(gomatrixserverlib.RespMissingEvents), nil
+}
+
+func (a *FederationInternalAPI) GetEvent(
+ ctx context.Context, s gomatrixserverlib.ServerName, eventID string,
+) (res gomatrixserverlib.Transaction, err error) {
+ ctx, cancel := context.WithTimeout(ctx, time.Second*30)
+ defer cancel()
+ ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
+ return a.federation.GetEvent(ctx, s, eventID)
+ })
+ if err != nil {
+ return gomatrixserverlib.Transaction{}, err
+ }
+ return ires.(gomatrixserverlib.Transaction), nil
+}
+
+func (a *FederationInternalAPI) LookupServerKeys(
+ ctx context.Context, s gomatrixserverlib.ServerName, keyRequests map[gomatrixserverlib.PublicKeyLookupRequest]gomatrixserverlib.Timestamp,
+) ([]gomatrixserverlib.ServerKeys, error) {
+ ctx, cancel := context.WithTimeout(ctx, time.Minute)
+ defer cancel()
+ ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
+ return a.federation.LookupServerKeys(ctx, s, keyRequests)
+ })
+ if err != nil {
+ return []gomatrixserverlib.ServerKeys{}, err
+ }
+ return ires.([]gomatrixserverlib.ServerKeys), nil
+}
+
+func (a *FederationInternalAPI) MSC2836EventRelationships(
+ ctx context.Context, s gomatrixserverlib.ServerName, r gomatrixserverlib.MSC2836EventRelationshipsRequest,
+ roomVersion gomatrixserverlib.RoomVersion,
+) (res gomatrixserverlib.MSC2836EventRelationshipsResponse, err error) {
+ ctx, cancel := context.WithTimeout(ctx, time.Minute)
+ defer cancel()
+ ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
+ return a.federation.MSC2836EventRelationships(ctx, s, r, roomVersion)
+ })
+ if err != nil {
+ return res, err
+ }
+ return ires.(gomatrixserverlib.MSC2836EventRelationshipsResponse), nil
+}
+
+func (a *FederationInternalAPI) MSC2946Spaces(
+ ctx context.Context, s gomatrixserverlib.ServerName, roomID string, r gomatrixserverlib.MSC2946SpacesRequest,
+) (res gomatrixserverlib.MSC2946SpacesResponse, err error) {
+ ctx, cancel := context.WithTimeout(ctx, time.Minute)
+ defer cancel()
+ ires, err := a.doRequestIfNotBlacklisted(s, func() (interface{}, error) {
+ return a.federation.MSC2946Spaces(ctx, s, roomID, r)
+ })
+ if err != nil {
+ return res, err
+ }
+ return ires.(gomatrixserverlib.MSC2946SpacesResponse), nil
+}