diff options
author | Florian Dold <florian@dold.me> | 2022-09-23 18:56:21 +0200 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2022-09-23 20:38:26 +0200 |
commit | 72336b149b4c27715e4e2f7610ec4007ecccdbd9 (patch) | |
tree | f209095fc98fd47d4681499662276f4cde5486ba /packages | |
parent | 9811e19252ef859099fa5c16d703808f6c778a94 (diff) |
wallet-core: do not block when accepting a manual withdrawal
Diffstat (limited to 'packages')
-rw-r--r-- | packages/taler-util/src/walletTypes.ts | 1 | ||||
-rw-r--r-- | packages/taler-wallet-cli/package.json | 2 | ||||
-rw-r--r-- | packages/taler-wallet-cli/src/integrationtests/test-withdrawal-manual.ts | 15 | ||||
-rw-r--r-- | packages/taler-wallet-core/package.json | 2 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/internal-wallet-state.ts | 16 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/operations/withdraw.ts | 80 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/wallet.ts | 71 |
7 files changed, 130 insertions, 57 deletions
diff --git a/packages/taler-util/src/walletTypes.ts b/packages/taler-util/src/walletTypes.ts index 35cb14837..64daee776 100644 --- a/packages/taler-util/src/walletTypes.ts +++ b/packages/taler-util/src/walletTypes.ts @@ -1378,6 +1378,7 @@ export interface DepositGroupFees { wire: AmountJson; refresh: AmountJson; } + export interface CreateDepositGroupRequest { depositPaytoUri: string; amount: AmountString; diff --git a/packages/taler-wallet-cli/package.json b/packages/taler-wallet-cli/package.json index d01ce9465..2316a7d48 100644 --- a/packages/taler-wallet-cli/package.json +++ b/packages/taler-wallet-cli/package.json @@ -46,7 +46,7 @@ "dependencies": { "@gnu-taler/taler-util": "workspace:*", "@gnu-taler/taler-wallet-core": "workspace:*", - "axios": "^0.25.0", + "axios": "^0.27.2", "source-map-support": "^0.5.21", "tslib": "^2.3.1" } diff --git a/packages/taler-wallet-cli/src/integrationtests/test-withdrawal-manual.ts b/packages/taler-wallet-cli/src/integrationtests/test-withdrawal-manual.ts index 6ae0e65e7..b691ae508 100644 --- a/packages/taler-wallet-cli/src/integrationtests/test-withdrawal-manual.ts +++ b/packages/taler-wallet-cli/src/integrationtests/test-withdrawal-manual.ts @@ -20,6 +20,11 @@ import { GlobalTestState } from "../harness/harness.js"; import { createSimpleTestkudosEnvironment } from "../harness/helpers.js"; import { WalletApiOperation, BankApi } from "@gnu-taler/taler-wallet-core"; +import { + AbsoluteTime, + Duration, + TalerProtocolTimestamp, +} from "@gnu-taler/taler-util"; /** * Run test for basic, bank-integrated withdrawal. @@ -38,6 +43,9 @@ export async function runTestWithdrawalManualTest(t: GlobalTestState) { exchangeBaseUrl: exchange.baseUrl, }); + const tStart = AbsoluteTime.now(); + + // We expect this to return immediately. const wres = await wallet.client.call( WalletApiOperation.AcceptManualWithdrawal, { @@ -46,9 +54,14 @@ export async function runTestWithdrawalManualTest(t: GlobalTestState) { }, ); + // Check that the request did not go into long-polling. + const duration = AbsoluteTime.difference(tStart, AbsoluteTime.now()); + if (duration.d_ms > 5 * 1000) { + throw Error("withdrawal took too long (longpolling issue)"); + } + const reservePub: string = wres.reservePub; - // Bug. await BankApi.adminAddIncoming(bank, { exchangeBankAccount, amount: "TESTKUDOS:10", diff --git a/packages/taler-wallet-core/package.json b/packages/taler-wallet-core/package.json index 80e6c6d2c..c2b75d582 100644 --- a/packages/taler-wallet-core/package.json +++ b/packages/taler-wallet-core/package.json @@ -64,7 +64,7 @@ "@gnu-taler/idb-bridge": "workspace:*", "@gnu-taler/taler-util": "workspace:*", "@types/node": "^17.0.17", - "axios": "^0.25.0", + "axios": "^0.27.2", "big-integer": "^1.6.51", "fflate": "^0.7.3", "source-map-support": "^0.5.21", diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts index ad6afe3c3..b8415a469 100644 --- a/packages/taler-wallet-core/src/internal-wallet-state.ts +++ b/packages/taler-wallet-core/src/internal-wallet-state.ts @@ -127,6 +127,12 @@ export interface RecoupOperations { export type NotificationListener = (n: WalletNotification) => void; +export interface ActiveLongpollInfo { + [opId: string]: { + cancel: () => void; + }; +} + /** * Internal, shard wallet state that is used by the implementation * of wallet operations. @@ -135,12 +141,10 @@ export type NotificationListener = (n: WalletNotification) => void; * as it's an opaque implementation detail. */ export interface InternalWalletState { - memoProcessReserve: AsyncOpMemoMap<void>; - memoMakePlanchet: AsyncOpMemoMap<void>; - memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse>; - memoGetBalance: AsyncOpMemoSingle<BalancesResponse>; - memoProcessRefresh: AsyncOpMemoMap<void>; - memoProcessRecoup: AsyncOpMemoMap<void>; + /** + * Active longpoll operations. + */ + activeLongpoll: ActiveLongpollInfo; cryptoApi: TalerCryptoInterface; diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index cedb62361..4901fdc86 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -28,6 +28,7 @@ import { Amounts, AmountString, BankWithdrawDetails, + CancellationToken, canonicalizeBaseUrl, codecForBankWithdrawalOperationPostResponse, codecForReserveStatus, @@ -106,7 +107,12 @@ import { WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, WALLET_EXCHANGE_PROTOCOL_VERSION, } from "../versions.js"; -import { makeCoinAvailable, storeOperationPending } from "../wallet.js"; +import { + makeCoinAvailable, + runOperationWithErrorReporting, + storeOperationError, + storeOperationPending, +} from "../wallet.js"; import { getExchangeDetails, getExchangePaytoUri, @@ -962,6 +968,7 @@ export async function updateWithdrawalDenoms( async function queryReserve( ws: InternalWalletState, withdrawalGroupId: string, + cancellationToken: CancellationToken, ): Promise<{ ready: boolean }> { const withdrawalGroup = await getWithdrawalGroupRecordTx(ws.db, { withdrawalGroupId, @@ -982,6 +989,7 @@ async function queryReserve( const resp = await ws.http.get(reserveUrl.href, { timeout: getReserveRequestTimeout(withdrawalGroup), + cancellationToken, }); const result = await readSuccessResponseJsonOrErrorCode( @@ -1044,6 +1052,16 @@ export async function processWithdrawalGroup( throw Error(`withdrawal group ${withdrawalGroupId} not found`); } + const retryTag = RetryTags.forWithdrawal(withdrawalGroup); + + // We're already running! + if (ws.activeLongpoll[retryTag]) { + logger.info("withdrawal group already in long-polling, returning!"); + return { + type: OperationAttemptResultType.Longpoll, + }; + } + switch (withdrawalGroup.status) { case WithdrawalGroupStatus.RegisteringBank: await processReserveBankStatus(ws, withdrawalGroupId); @@ -1051,15 +1069,45 @@ export async function processWithdrawalGroup( forceNow: true, }); case WithdrawalGroupStatus.QueryingStatus: { - const res = await queryReserve(ws, withdrawalGroupId); - if (res.ready) { - return await processWithdrawalGroup(ws, withdrawalGroupId, { - forceNow: true, - }); - } + const doQueryAsync = async () => { + if (ws.stopped) { + logger.info("not long-polling reserve, wallet already stopped"); + await storeOperationPending(ws, retryTag); + return; + } + const cts = CancellationToken.create(); + let res: { ready: boolean } | undefined = undefined; + try { + ws.activeLongpoll[retryTag] = { + cancel: () => { + logger.info("cancel of reserve longpoll requested"); + cts.cancel(); + }, + }; + res = await queryReserve(ws, withdrawalGroupId, cts.token); + } catch (e) { + await storeOperationError( + ws, + retryTag, + getErrorDetailFromException(e), + ); + return; + } + delete ws.activeLongpoll[retryTag]; + logger.info( + `active longpoll keys (2) ${Object.keys(ws.activeLongpoll)}`, + ); + if (!res.ready) { + await storeOperationPending(ws, retryTag); + } + ws.latch.trigger(); + }; + doQueryAsync(); + logger.info( + "returning early from withdrawal for long-polling in background", + ); return { - type: OperationAttemptResultType.Pending, - result: undefined, + type: OperationAttemptResultType.Longpoll, }; } case WithdrawalGroupStatus.WaitConfirmBank: { @@ -1912,10 +1960,16 @@ export async function createManualWithdrawal( return await getFundingPaytoUris(tx, withdrawalGroup.withdrawalGroupId); }); - // Start withdrawal in the background. - await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true }).catch( - (err) => { - logger.error("Processing withdrawal (after creation) failed:", err); + // Start withdrawal in the background (do not await!) + // FIXME: We could also interrupt the task look if it is waiting and + // rely on retry handling to re-process the withdrawal group. + runOperationWithErrorReporting( + ws, + RetryTags.forWithdrawal(withdrawalGroup), + async () => { + return await processWithdrawalGroup(ws, withdrawalGroupId, { + forceNow: true, + }); }, ); diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 7890259f2..dd47d7ce3 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -64,38 +64,37 @@ import { codecForSetWalletDeviceIdRequest, codecForTestPayArgs, codecForTrackDepositGroupRequest, + codecForTransactionByIdRequest, codecForTransactionsRequest, codecForWithdrawFakebankRequest, codecForWithdrawTestBalance, CoinDumpJson, CoreApiResponse, + DenominationInfo, Duration, durationFromSpec, durationMin, ExchangeFullDetails, + ExchangeListItem, ExchangesListResponse, + FeeDescription, GetExchangeTosResult, j2s, KnownBankAccounts, Logger, ManualWithdrawalDetails, NotificationType, + OperationMap, parsePaytoUri, - PaytoUri, RefreshReason, TalerErrorCode, - URL, - WalletNotification, - WalletCoreVersion, - ExchangeListItem, - OperationMap, - FeeDescription, TalerErrorDetail, - codecForTransactionByIdRequest, - DenominationInfo, KnownBankAccountsInfo, codecForAddKnownBankAccounts, codecForForgetKnownBankAccounts, + URL, + WalletCoreVersion, + WalletNotification, } from "@gnu-taler/taler-util"; import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js"; import { @@ -119,6 +118,7 @@ import { TalerError, } from "./errors.js"; import { + ActiveLongpollInfo, ExchangeOperations, InternalWalletState, MerchantInfo, @@ -235,7 +235,6 @@ import { OperationAttemptResult, OperationAttemptResultType, RetryInfo, - runOperationHandlerForResult, } from "./util/retries.js"; import { TimerAPI, TimerGroup } from "./util/timer.js"; import { @@ -392,27 +391,21 @@ export async function storeOperationPending( }); } -/** - * Execute one operation based on the pending operation info record. - * - * Store success/failure result in the database. - */ -async function processOnePendingOperation( +export async function runOperationWithErrorReporting( ws: InternalWalletState, - pending: PendingTaskInfo, - forceNow = false, + opId: string, + f: () => Promise<OperationAttemptResult>, ): Promise<void> { - logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`); let maybeError: TalerErrorDetail | undefined; try { - const resp = await callOperationHandler(ws, pending, forceNow); + const resp = await f(); switch (resp.type) { case OperationAttemptResultType.Error: - return await storeOperationError(ws, pending.id, resp.errorDetail); + return await storeOperationError(ws, opId, resp.errorDetail); case OperationAttemptResultType.Finished: - return await storeOperationFinished(ws, pending.id); + return await storeOperationFinished(ws, opId); case OperationAttemptResultType.Pending: - return await storeOperationPending(ws, pending.id); + return await storeOperationPending(ws, opId); case OperationAttemptResultType.Longpoll: break; } @@ -421,7 +414,7 @@ async function processOnePendingOperation( logger.warn("operation processed resulted in error"); logger.warn(`error was: ${j2s(e.errorDetail)}`); maybeError = e.errorDetail; - return await storeOperationError(ws, pending.id, maybeError!); + return await storeOperationError(ws, opId, maybeError!); } else if (e instanceof Error) { // This is a bug, as we expect pending operations to always // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED @@ -435,7 +428,7 @@ async function processOnePendingOperation( }, `unexpected exception (message: ${e.message})`, ); - return await storeOperationError(ws, pending.id, maybeError); + return await storeOperationError(ws, opId, maybeError); } else { logger.error("Uncaught exception, value is not even an error."); maybeError = makeErrorDetail( @@ -443,7 +436,7 @@ async function processOnePendingOperation( {}, `unexpected exception (not even an error)`, ); - return await storeOperationError(ws, pending.id, maybeError); + return await storeOperationError(ws, opId, maybeError); } } } @@ -460,7 +453,10 @@ export async function runPending( if (!forceNow && !AbsoluteTime.isExpired(p.timestampDue)) { continue; } - await processOnePendingOperation(ws, p, forceNow); + await runOperationWithErrorReporting(ws, p.id, async () => { + logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`); + return await callOperationHandler(ws, p, forceNow); + }); } } @@ -563,7 +559,10 @@ async function runTaskLoop( if (!AbsoluteTime.isExpired(p.timestampDue)) { continue; } - await processOnePendingOperation(ws, p); + await runOperationWithErrorReporting(ws, p.id, async () => { + logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`); + return await callOperationHandler(ws, p); + }); ws.notify({ type: NotificationType.PendingOperationProcessed, }); @@ -1613,13 +1612,10 @@ export class Wallet { * This ties together all the operation implementations. */ class InternalWalletStateImpl implements InternalWalletState { - memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); - memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); - memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse> = - new AsyncOpMemoSingle(); - memoGetBalance: AsyncOpMemoSingle<BalancesResponse> = new AsyncOpMemoSingle(); - memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); - memoProcessRecoup: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); + /** + * @see {@link InternalWalletState.activeLongpoll} + */ + activeLongpoll: ActiveLongpollInfo = {}; cryptoApi: TalerCryptoInterface; cryptoDispatcher: CryptoDispatcher; @@ -1719,9 +1715,14 @@ class InternalWalletStateImpl implements InternalWalletState { * Stop ongoing processing. */ stop(): void { + logger.info("stopping (at internal wallet state)"); this.stopped = true; this.timerGroup.stopCurrentAndFutureTimers(); this.cryptoDispatcher.stop(); + for (const key of Object.keys(this.activeLongpoll)) { + logger.info(`cancelling active longpoll ${key}`); + this.activeLongpoll[key].cancel(); + } } async runUntilDone( |