From 3913d0117bd034b5d7467eeed6b10c59dae90ce8 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 18 Jun 2024 17:41:09 +0200 Subject: wallet-core: round-robin waiting for long-polling With dynamic long-poll timeout computation --- packages/taler-harness/src/bench2.ts | 1 - packages/taler-wallet-core/src/dbless.ts | 44 +++++++-------- packages/taler-wallet-core/src/deposits.ts | 34 ++++++++---- packages/taler-wallet-core/src/pay-merchant.ts | 21 ++++++-- .../taler-wallet-core/src/pay-peer-pull-credit.ts | 30 +++++++---- .../taler-wallet-core/src/pay-peer-push-credit.ts | 16 ++++-- .../taler-wallet-core/src/pay-peer-push-debit.ts | 17 +++--- packages/taler-wallet-core/src/wallet.ts | 62 ++++++++++++++++++++++ packages/taler-wallet-core/src/withdraw.ts | 38 ++++++++----- 9 files changed, 190 insertions(+), 73 deletions(-) (limited to 'packages') diff --git a/packages/taler-harness/src/bench2.ts b/packages/taler-harness/src/bench2.ts index 90924caec..dc360ae62 100644 --- a/packages/taler-harness/src/bench2.ts +++ b/packages/taler-harness/src/bench2.ts @@ -30,7 +30,6 @@ import { applyRunConfigDefaults, CryptoDispatcher, SynchronousCryptoWorkerFactoryPlain, - Wallet, } from "@gnu-taler/taler-wallet-core"; import { checkReserve, diff --git a/packages/taler-wallet-core/src/dbless.ts b/packages/taler-wallet-core/src/dbless.ts index ec9655e6f..bc0b6e428 100644 --- a/packages/taler-wallet-core/src/dbless.ts +++ b/packages/taler-wallet-core/src/dbless.ts @@ -56,9 +56,9 @@ import { } from "@gnu-taler/taler-util/http"; import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js"; import { DenominationRecord } from "./db.js"; +import { isWithdrawableDenom } from "./denominations.js"; import { ExchangeInfo, downloadExchangeInfo } from "./exchanges.js"; import { assembleRefreshRevealRequest } from "./refresh.js"; -import { isWithdrawableDenom } from "./denominations.js"; import { getBankStatusUrl, getBankWithdrawalInfo } from "./withdraw.js"; export { downloadExchangeInfo }; @@ -85,26 +85,6 @@ export interface CoinInfo { maxAge: number; } -/** - * Check the status of a reserve, use long-polling to wait - * until the reserve actually has been created. - */ -export async function checkReserve( - http: HttpRequestLibrary, - exchangeBaseUrl: string, - reservePub: string, - longpollTimeoutMs: number = 500, -): Promise { - const reqUrl = new URL(`reserves/${reservePub}`, exchangeBaseUrl); - if (longpollTimeoutMs) { - reqUrl.searchParams.set("timeout_ms", `${longpollTimeoutMs}`); - } - const resp = await http.fetch(reqUrl.href, { method: "GET" }); - if (resp.status !== 200) { - throw new Error("reserve not okay"); - } -} - export interface TopupReserveWithBankArgs { http: HttpRequestLibrary; reservePub: string; @@ -415,3 +395,25 @@ export async function createTestingReserve(args: { ); await readSuccessResponseJsonOrThrow(fbReq, codecForAny()); } + +/** + * Check the status of a reserve, use long-polling to wait + * until the reserve actually has been created. + */ +export async function checkReserve( + http: HttpRequestLibrary, + exchangeBaseUrl: string, + reservePub: string, + longpollTimeoutMs: number = 500, +): Promise { + const reqUrl = new URL(`reserves/${reservePub}`, exchangeBaseUrl); + if (longpollTimeoutMs) { + reqUrl.searchParams.set("timeout_ms", `${longpollTimeoutMs}`); + } + const resp = await http.fetch(reqUrl.href, { + method: "GET", + }); + if (resp.status !== 200) { + throw new Error("reserve not okay"); + } +} diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts index 6394fdc78..919536c96 100644 --- a/packages/taler-wallet-core/src/deposits.ts +++ b/packages/taler-wallet-core/src/deposits.ts @@ -669,12 +669,19 @@ async function processDepositGroupPendingKyc( `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, kycInfo.exchangeBaseUrl, ); - url.searchParams.set("timeout_ms", "10000"); - logger.info(`kyc url ${url.href}`); - const kycStatusRes = await wex.http.fetch(url.href, { - method: "GET", - cancellationToken: wex.cancellationToken, - }); + + const kycStatusRes = await wex.ws.runLongpollQueueing( + url.hostname, + async (timeoutMs) => { + url.searchParams.set("timeout_ms", `${timeoutMs}`); + logger.info(`kyc url ${url.href}`); + return await wex.http.fetch(url.href, { + method: "GET", + cancellationToken: wex.cancellationToken, + }); + }, + ); + if ( kycStatusRes.status === HttpStatusCode.Ok || //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge @@ -1278,11 +1285,16 @@ async function trackDeposit( wireHash, }); url.searchParams.set("merchant_sig", sigResp.sig); - url.searchParams.set("timeout_ms", "30000"); - const httpResp = await wex.http.fetch(url.href, { - method: "GET", - cancellationToken: wex.cancellationToken, - }); + const httpResp = await wex.ws.runLongpollQueueing( + url.hostname, + async (timeoutMs) => { + url.searchParams.set("timeout_ms", `${timeoutMs}`); + return await wex.http.fetch(url.href, { + method: "GET", + cancellationToken: wex.cancellationToken, + }); + }, + ); logger.trace(`deposits response status: ${httpResp.status}`); switch (httpResp.status) { case HttpStatusCode.Accepted: { diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts index 28fb204dd..b5d2670ad 100644 --- a/packages/taler-wallet-core/src/pay-merchant.ts +++ b/packages/taler-wallet-core/src/pay-merchant.ts @@ -95,6 +95,7 @@ import { } from "@gnu-taler/taler-util"; import { getHttpResponseErrorDetails, + HttpResponse, readSuccessResponseJsonOrErrorCode, readSuccessResponseJsonOrThrow, readTalerErrorResponse, @@ -2870,14 +2871,24 @@ async function checkIfOrderIsAlreadyPaid( ); requestUrl.searchParams.set("h_contract", contract.contractTermsHash); + let resp: HttpResponse; + if (doLongPolling) { - requestUrl.searchParams.set("timeout_ms", "30000"); + resp = await wex.ws.runLongpollQueueing( + requestUrl.hostname, + async (timeoutMs) => { + requestUrl.searchParams.set("timeout_ms", `${timeoutMs}`); + return await wex.http.fetch(requestUrl.href, { + cancellationToken: wex.cancellationToken, + }); + }, + ); + } else { + resp = await wex.http.fetch(requestUrl.href, { + cancellationToken: wex.cancellationToken, + }); } - const resp = await wex.http.fetch(requestUrl.href, { - cancellationToken: wex.cancellationToken, - }); - if ( resp.status === HttpStatusCode.Ok || resp.status === HttpStatusCode.Accepted || diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts index 3e7fdd36b..ae09a1280 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts @@ -374,10 +374,15 @@ async function queryPurseForPeerPullCredit( ); purseDepositUrl.searchParams.set("timeout_ms", "30000"); logger.info(`querying purse status via ${purseDepositUrl.href}`); - const resp = await wex.http.fetch(purseDepositUrl.href, { - timeout: { d_ms: 60000 }, - cancellationToken: wex.cancellationToken, - }); + const resp = await wex.ws.runLongpollQueueing( + purseDepositUrl.hostname, + async () => { + return await wex.http.fetch(purseDepositUrl.href, { + timeout: { d_ms: 60000 }, + cancellationToken: wex.cancellationToken, + }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPullCredit, pursePub: pullIni.pursePub, @@ -488,12 +493,17 @@ async function longpollKycStatus( `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, exchangeUrl, ); - url.searchParams.set("timeout_ms", "10000"); - logger.info(`kyc url ${url.href}`); - const kycStatusRes = await wex.http.fetch(url.href, { - method: "GET", - cancellationToken: wex.cancellationToken, - }); + const kycStatusRes = await wex.ws.runLongpollQueueing( + url.hostname, + async (timeoutMs) => { + url.searchParams.set("timeout_ms", `${timeoutMs}`); + logger.info(`kyc url ${url.href}`); + return await wex.http.fetch(url.href, { + method: "GET", + cancellationToken: wex.cancellationToken, + }); + }, + ); if ( kycStatusRes.status === HttpStatusCode.Ok || // FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts index 14f32b545..5e8fc5941 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-credit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-credit.ts @@ -537,12 +537,18 @@ async function longpollKycStatus( `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, exchangeUrl, ); - url.searchParams.set("timeout_ms", "30000"); logger.info(`kyc url ${url.href}`); - const kycStatusRes = await wex.http.fetch(url.href, { - method: "GET", - cancellationToken: wex.cancellationToken, - }); + const kycStatusRes = await wex.ws.runLongpollQueueing( + url.hostname, + async (timeoutMs) => { + url.searchParams.set("timeout_ms", `${timeoutMs}`); + return await wex.http.fetch(url.href, { + method: "GET", + cancellationToken: wex.cancellationToken, + }); + }, + ); + if ( kycStatusRes.status === HttpStatusCode.Ok || //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts index 6603cc4f3..544273552 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -944,12 +944,17 @@ async function processPeerPushDebitReady( `purses/${pursePub}/merge`, peerPushInitiation.exchangeBaseUrl, ); - mergeUrl.searchParams.set("timeout_ms", "30000"); - logger.info(`long-polling on purse status at ${mergeUrl.href}`); - const resp = await wex.http.fetch(mergeUrl.href, { - // timeout: getReserveRequestTimeout(withdrawalGroup), - cancellationToken: wex.cancellationToken, - }); + const resp = await wex.ws.runLongpollQueueing( + mergeUrl.hostname, + async (timeoutMs) => { + mergeUrl.searchParams.set("timeout_ms", `${timeoutMs}`); + logger.info(`long-polling on purse status at ${mergeUrl.href}`); + return await wex.http.fetch(mergeUrl.href, { + // timeout: getReserveRequestTimeout(withdrawalGroup), + cancellationToken: wex.cancellationToken, + }); + }, + ); if (resp.status === HttpStatusCode.Ok) { const purseStatus = await readSuccessResponseJsonOrThrow( resp, diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 69f0e5f0b..fb0bd9f62 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -1916,6 +1916,16 @@ class WalletDbTriggerSpec implements TriggerSpec { } } +type LongpollRunFn = (timeoutMs: number) => Promise; +type ResolveFn = () => void; + +/** + * Per-hostname state for longpolling. + */ +interface LongpollState { + queue: Array; +} + /** * Internal state of the wallet. * @@ -1977,6 +1987,58 @@ export class InternalWalletState { return this._dbAccessHandle; } + private longpollStatePerHostname: Map = new Map(); + + private longpollRequestIdCounter = 1; + + /** + * Run a long-polling request, potentially queueing the request + * if too many other long-polling requests against the same hostname + * (or too many overall) are active. + */ + async runLongpollQueueing( + hostname: string, + f: LongpollRunFn, + ): Promise { + let rid = this.longpollRequestIdCounter++; + const doRun: () => Promise = async () => { + const st = this.longpollStatePerHostname.get(hostname); + const numWaiting = st?.queue.length ?? 0; + logger.info( + `running long-poll ${rid} to ${hostname} with ${numWaiting} waiting`, + ); + try { + const timeoutMs = Math.round(Math.max(10000, 30000 / (numWaiting + 1))); + return await f(timeoutMs); + } finally { + logger.info( + `cleaning up after long-poll ${rid} request to ${hostname}`, + ); + if (st) { + const next = st.queue.shift(); + if (next) { + next(); + } else { + this.longpollStatePerHostname.delete(hostname); + } + } + } + }; + const state = this.longpollStatePerHostname.get(hostname); + if (state) { + logger.info(`long-poll request ${rid} to ${hostname} queued`); + const promcap = openPromise(); + state.queue.push(promcap.resolve); + return promcap.promise.then(doRun); + } else { + logger.info(`directly running long-poll request ${rid} to ${hostname}`); + this.longpollStatePerHostname.set(hostname, { + queue: [], + }); + return Promise.resolve().then(doRun); + } + } + devExperimentState: DevExperimentState = {}; clientCancellationMap: Map = new Map(); diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts index 2af8807cc..7ba69d2c1 100644 --- a/packages/taler-wallet-core/src/withdraw.ts +++ b/packages/taler-wallet-core/src/withdraw.ts @@ -45,7 +45,6 @@ import { EddsaPrivateKeyString, ExchangeBatchWithdrawRequest, ExchangeListItem, - ExchangeTosStatus, ExchangeUpdateStatus, ExchangeWireAccount, ExchangeWithdrawBatchResponse, @@ -1614,14 +1613,18 @@ async function processQueryReserve( `reserves/${reservePub}`, withdrawalGroup.exchangeBaseUrl, ); - reserveUrl.searchParams.set("timeout_ms", "30000"); - logger.trace(`querying reserve status via ${reserveUrl.href}`); - - const resp = await wex.http.fetch(reserveUrl.href, { - timeout: getReserveRequestTimeout(withdrawalGroup), - cancellationToken: wex.cancellationToken, - }); + const resp = await wex.ws.runLongpollQueueing( + reserveUrl.hostname, + async (timeoutMs) => { + reserveUrl.searchParams.set("timeout_ms", `${timeoutMs}`); + logger.trace(`querying reserve status via ${reserveUrl.href}`); + return await wex.http.fetch(reserveUrl.href, { + timeout: getReserveRequestTimeout(withdrawalGroup), + cancellationToken: wex.cancellationToken, + }); + }, + ); logger.trace(`reserve status code: HTTP ${resp.status}`); @@ -1760,12 +1763,19 @@ async function processWithdrawalGroupPendingKyc( `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, exchangeUrl, ); - url.searchParams.set("timeout_ms", "30000"); - logger.info(`long-polling for withdrawal KYC status via ${url.href}`); - const kycStatusRes = await wex.http.fetch(url.href, { - method: "GET", - cancellationToken: wex.cancellationToken, - }); + + const kycStatusRes = await wex.ws.runLongpollQueueing( + url.hostname, + async (timeoutMs) => { + url.searchParams.set("timeout_ms", `${timeoutMs}`); + logger.info(`long-polling for withdrawal KYC status via ${url.href}`); + return await wex.http.fetch(url.href, { + method: "GET", + cancellationToken: wex.cancellationToken, + }); + }, + ); + logger.info(`kyc long-polling response status: HTTP ${kycStatusRes.status}`); if ( kycStatusRes.status === HttpStatusCode.Ok || -- cgit v1.2.3