From 2819792fd2b22039ed405c32f09ada6334ab0c79 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 18 Jun 2024 20:46:31 +0200 Subject: wallet-core: allow cancelling queued long-poll requests --- packages/taler-wallet-core/src/deposits.ts | 2 ++ packages/taler-wallet-core/src/pay-merchant.ts | 2 ++ .../taler-wallet-core/src/pay-peer-pull-credit.ts | 2 ++ .../taler-wallet-core/src/pay-peer-push-credit.ts | 1 + .../taler-wallet-core/src/pay-peer-push-debit.ts | 1 + packages/taler-wallet-core/src/wallet.ts | 41 ++++++++++++++-------- packages/taler-wallet-core/src/withdraw.ts | 3 ++ 7 files changed, 38 insertions(+), 14 deletions(-) (limited to 'packages') diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts index 919536c96..855db836a 100644 --- a/packages/taler-wallet-core/src/deposits.ts +++ b/packages/taler-wallet-core/src/deposits.ts @@ -671,6 +671,7 @@ async function processDepositGroupPendingKyc( ); const kycStatusRes = await wex.ws.runLongpollQueueing( + wex, url.hostname, async (timeoutMs) => { url.searchParams.set("timeout_ms", `${timeoutMs}`); @@ -1286,6 +1287,7 @@ async function trackDeposit( }); url.searchParams.set("merchant_sig", sigResp.sig); const httpResp = await wex.ws.runLongpollQueueing( + wex, url.hostname, async (timeoutMs) => { url.searchParams.set("timeout_ms", `${timeoutMs}`); diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts index 9e3b37c51..76c96925f 100644 --- a/packages/taler-wallet-core/src/pay-merchant.ts +++ b/packages/taler-wallet-core/src/pay-merchant.ts @@ -2875,6 +2875,7 @@ async function checkIfOrderIsAlreadyPaid( if (doLongPolling) { resp = await wex.ws.runLongpollQueueing( + wex, requestUrl.hostname, async (timeoutMs) => { requestUrl.searchParams.set("timeout_ms", `${timeoutMs}`); @@ -3031,6 +3032,7 @@ async function processPurchaseAutoRefund( requestUrl.searchParams.set("refund", Amounts.stringify(totalKnownRefund)); const resp = await wex.ws.runLongpollQueueing( + wex, requestUrl.hostname, async (timeoutMs) => { requestUrl.searchParams.set("timeout_ms", `${timeoutMs}`); diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts index ae09a1280..588fe5c11 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts @@ -375,6 +375,7 @@ async function queryPurseForPeerPullCredit( purseDepositUrl.searchParams.set("timeout_ms", "30000"); logger.info(`querying purse status via ${purseDepositUrl.href}`); const resp = await wex.ws.runLongpollQueueing( + wex, purseDepositUrl.hostname, async () => { return await wex.http.fetch(purseDepositUrl.href, { @@ -494,6 +495,7 @@ async function longpollKycStatus( exchangeUrl, ); const kycStatusRes = await wex.ws.runLongpollQueueing( + wex, url.hostname, async (timeoutMs) => { url.searchParams.set("timeout_ms", `${timeoutMs}`); diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts index 5e8fc5941..02b07c429 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-credit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-credit.ts @@ -539,6 +539,7 @@ async function longpollKycStatus( ); logger.info(`kyc url ${url.href}`); const kycStatusRes = await wex.ws.runLongpollQueueing( + wex, url.hostname, async (timeoutMs) => { url.searchParams.set("timeout_ms", `${timeoutMs}`); diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts index 544273552..c04d37f1c 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -945,6 +945,7 @@ async function processPeerPushDebitReady( peerPushInitiation.exchangeBaseUrl, ); const resp = await wex.ws.runLongpollQueueing( + wex, mergeUrl.hostname, async (timeoutMs) => { mergeUrl.searchParams.set("timeout_ms", `${timeoutMs}`); diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index fb0bd9f62..97d122480 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -1997,11 +1997,25 @@ export class InternalWalletState { * (or too many overall) are active. */ async runLongpollQueueing( + wex: WalletExecutionContext, hostname: string, f: LongpollRunFn, ): Promise { let rid = this.longpollRequestIdCounter++; - const doRun: () => Promise = async () => { + const triggerNextLongpoll = () => { + logger.info(`cleaning up after long-poll ${rid} request to ${hostname}`); + const st = this.longpollStatePerHostname.get(hostname); + if (!st) { + return; + } + const next = st.queue.shift(); + if (next) { + next(); + } else { + this.longpollStatePerHostname.delete(hostname); + } + }; + const doRunLongpoll: () => Promise = async () => { const st = this.longpollStatePerHostname.get(hostname); const numWaiting = st?.queue.length ?? 0; logger.info( @@ -2011,17 +2025,7 @@ export class InternalWalletState { const timeoutMs = Math.round(Math.max(10000, 30000 / (numWaiting + 1))); return await f(timeoutMs); } finally { - logger.info( - `cleaning up after long-poll ${rid} request to ${hostname}`, - ); - if (st) { - const next = st.queue.shift(); - if (next) { - next(); - } else { - this.longpollStatePerHostname.delete(hostname); - } - } + triggerNextLongpoll(); } }; const state = this.longpollStatePerHostname.get(hostname); @@ -2029,13 +2033,22 @@ export class InternalWalletState { logger.info(`long-poll request ${rid} to ${hostname} queued`); const promcap = openPromise(); state.queue.push(promcap.resolve); - return promcap.promise.then(doRun); + try { + await wex.cancellationToken.racePromise(promcap.promise); + } catch (e) { + logger.info( + `long-poll request ${rid} to ${hostname} cancelled while queued`, + ); + triggerNextLongpoll(); + throw e; + } + return doRunLongpoll(); } else { logger.info(`directly running long-poll request ${rid} to ${hostname}`); this.longpollStatePerHostname.set(hostname, { queue: [], }); - return Promise.resolve().then(doRun); + return Promise.resolve().then(doRunLongpoll); } } diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts index 7bf829db5..dbe1cf207 100644 --- a/packages/taler-wallet-core/src/withdraw.ts +++ b/packages/taler-wallet-core/src/withdraw.ts @@ -826,6 +826,7 @@ async function processWithdrawalGroupDialogProposed( url.searchParams.set("old_state", "pending"); const resp = await ctx.wex.ws.runLongpollQueueing( + ctx.wex, url.hostname, async (timeoutMs) => { url.searchParams.set("long_poll_ms", `${timeoutMs}`); @@ -1621,6 +1622,7 @@ async function processQueryReserve( ); const resp = await wex.ws.runLongpollQueueing( + wex, reserveUrl.hostname, async (timeoutMs) => { reserveUrl.searchParams.set("timeout_ms", `${timeoutMs}`); @@ -1771,6 +1773,7 @@ async function processWithdrawalGroupPendingKyc( ); const kycStatusRes = await wex.ws.runLongpollQueueing( + wex, url.hostname, async (timeoutMs) => { url.searchParams.set("timeout_ms", `${timeoutMs}`); -- cgit v1.2.3