From 18c30b9a00a4e5dee629f4e06c261509ff7ba455 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Fri, 10 Feb 2023 13:21:37 +0100 Subject: wallet-core: implement partial withdrawal batching, don't block when generating planchets --- packages/taler-harness/src/harness/harness.ts | 10 +- .../src/integrationtests/test-withdrawal-huge.ts | 3 +- packages/taler-util/src/taler-types.ts | 19 +- .../taler-wallet-core/src/operations/withdraw.ts | 378 ++++++++++----------- 4 files changed, 207 insertions(+), 203 deletions(-) (limited to 'packages') diff --git a/packages/taler-harness/src/harness/harness.ts b/packages/taler-harness/src/harness/harness.ts index 3403c266e..4e5d8238c 100644 --- a/packages/taler-harness/src/harness/harness.ts +++ b/packages/taler-harness/src/harness/harness.ts @@ -1361,7 +1361,12 @@ export class ExchangeService implements ExchangeServiceInterface { this.exchangeWirewatchProc = this.globalState.spawnService( "taler-exchange-wirewatch", - ["-c", this.configFilename, ...this.timetravelArgArr], + [ + "-c", + this.configFilename, + "--longpoll-timeout=5s", + ...this.timetravelArgArr, + ], `exchange-wirewatch-${this.name}`, ); @@ -1951,6 +1956,9 @@ export class WalletService { ], `wallet-${this.opts.name}`, ); + logger.info( + `hint: connect to wallet using taler-wallet-cli --wallet-connection=${unixPath}`, + ); } async pingUntilAvailable(): Promise { diff --git a/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts b/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts index 579d727b1..437d799b8 100644 --- a/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts +++ b/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts @@ -87,9 +87,10 @@ export async function runWithdrawalHugeTest(t: GlobalTestState) { exchangeBaseUrl: exchange.baseUrl, }); + // Results in about 1K coins withdrawn await wallet.client.call(WalletApiOperation.WithdrawFakebank, { exchange: exchange.baseUrl, - amount: "TESTKUDOS:5000", + amount: "TESTKUDOS:10000", bank: bank.baseUrl, }); diff --git a/packages/taler-util/src/taler-types.ts b/packages/taler-util/src/taler-types.ts index a9303ed9c..bb15f0494 100644 --- a/packages/taler-util/src/taler-types.ts +++ b/packages/taler-util/src/taler-types.ts @@ -951,12 +951,12 @@ export const codecForBlindedDenominationSignature = () => .alternative(DenomKeyType.Rsa, codecForRsaBlindedDenominationSignature()) .build("BlindedDenominationSignature"); -export class WithdrawResponse { +export class ExchangeWithdrawResponse { ev_sig: BlindedDenominationSignature; } -export class WithdrawBatchResponse { - ev_sigs: WithdrawResponse[]; +export class ExchangeWithdrawBatchResponse { + ev_sigs: ExchangeWithdrawResponse[]; } export interface MerchantPayResponse { @@ -1476,13 +1476,13 @@ export const codecForRecoupConfirmation = (): Codec => .property("old_coin_pub", codecOptional(codecForString())) .build("RecoupConfirmation"); -export const codecForWithdrawResponse = (): Codec => - buildCodecForObject() +export const codecForWithdrawResponse = (): Codec => + buildCodecForObject() .property("ev_sig", codecForBlindedDenominationSignature()) .build("WithdrawResponse"); -export const codecForWithdrawBatchResponse = (): Codec => - buildCodecForObject() +export const codecForWithdrawBatchResponse = (): Codec => + buildCodecForObject() .property("ev_sigs", codecForList(codecForWithdrawResponse())) .build("WithdrawBatchResponse"); @@ -1753,6 +1753,11 @@ export interface ExchangeWithdrawRequest { coin_ev: CoinEnvelope; } +export interface ExchangeBatchWithdrawRequest { + planchets: ExchangeWithdrawRequest[]; +} + + export interface ExchangeRefreshRevealRequest { new_denoms_h: HashCodeString[]; coin_evs: CoinEnvelope[]; diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index caa280fe5..987a5e062 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -59,9 +59,11 @@ import { TransactionType, UnblindedSignature, URL, - WithdrawBatchResponse, - WithdrawResponse, + ExchangeWithdrawBatchResponse, + ExchangeWithdrawResponse, WithdrawUriInfoResponse, + ExchangeBatchWithdrawRequest, + WalletNotification, } from "@gnu-taler/taler-util"; import { EddsaKeypair } from "../crypto/cryptoImplementation.js"; import { @@ -93,6 +95,7 @@ import { import { walletCoreDebugFlags } from "../util/debugFlags.js"; import { HttpRequestLibrary, + HttpResponse, readSuccessResponseJsonOrErrorCode, readSuccessResponseJsonOrThrow, throwUnexpectedRequestError, @@ -455,136 +458,21 @@ async function processPlanchetGenerate( }); } -/** - * Send the withdrawal request for a generated planchet to the exchange. - * - * The verification of the response is done asynchronously to enable parallelism. - */ -async function processPlanchetExchangeRequest( - ws: InternalWalletState, - wgContext: WithdrawalGroupContext, - coinIdx: number, -): Promise { - const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord; - logger.info( - `processing planchet exchange request ${withdrawalGroup.withdrawalGroupId}/${coinIdx}`, - ); - const d = await ws.db - .mktx((x) => [ - x.withdrawalGroups, - x.planchets, - x.exchanges, - x.denominations, - ]) - .runReadOnly(async (tx) => { - let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ - withdrawalGroup.withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - return; - } - if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) { - logger.warn("processPlanchet: planchet already withdrawn"); - return; - } - const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl); - if (!exchange) { - logger.error("db inconsistent: exchange for planchet not found"); - return; - } - - const denom = await ws.getDenomInfo( - ws, - tx, - withdrawalGroup.exchangeBaseUrl, - planchet.denomPubHash, - ); - - if (!denom) { - logger.error("db inconsistent: denom for planchet not found"); - return; - } - - logger.trace( - `processing planchet #${coinIdx} in withdrawal ${withdrawalGroup.withdrawalGroupId}`, - ); +interface WithdrawalRequestBatchArgs { + /** + * Use the batched request on the network level. + * Not supported by older exchanges. + */ + useBatchRequest: boolean; - const reqBody: ExchangeWithdrawRequest = { - denom_pub_hash: planchet.denomPubHash, - reserve_sig: planchet.withdrawSig, - coin_ev: planchet.coinEv, - }; - const reqUrl = new URL( - `reserves/${withdrawalGroup.reservePub}/withdraw`, - exchange.baseUrl, - ).href; + coinStartIndex: number; - return { reqUrl, reqBody }; - }); + batchSize: number; +} - if (!d) { - return; - } - const { reqUrl, reqBody } = d; - - try { - const resp = await ws.http.postJson(reqUrl, reqBody); - if (resp.status === HttpStatusCode.UnavailableForLegalReasons) { - logger.info("withdrawal requires KYC"); - const respJson = await resp.json(); - const uuidResp = codecForWalletKycUuid().decode(respJson); - logger.info(`kyc uuid response: ${j2s(uuidResp)}`); - await ws.db - .mktx((x) => [x.planchets, x.withdrawalGroups]) - .runReadWrite(async (tx) => { - let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ - withdrawalGroup.withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - return; - } - planchet.planchetStatus = PlanchetStatus.KycRequired; - const wg2 = await tx.withdrawalGroups.get( - withdrawalGroup.withdrawalGroupId, - ); - if (!wg2) { - return; - } - wg2.kycPending = { - paytoHash: uuidResp.h_payto, - requirementRow: uuidResp.requirement_row, - }; - await tx.planchets.put(planchet); - await tx.withdrawalGroups.put(wg2); - }); - return; - } - const r = await readSuccessResponseJsonOrThrow( - resp, - codecForWithdrawResponse(), - ); - return r; - } catch (e) { - const errDetail = getErrorDetailFromException(e); - logger.trace("withdrawal request failed", e); - logger.trace(String(e)); - await ws.db - .mktx((x) => [x.planchets]) - .runReadWrite(async (tx) => { - let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ - withdrawalGroup.withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - return; - } - planchet.lastError = errDetail; - await tx.planchets.put(planchet); - }); - return; - } +interface WithdrawalBatchResult { + coinIdxs: number[]; + batchResp: ExchangeWithdrawBatchResponse; } /** @@ -595,15 +483,18 @@ async function processPlanchetExchangeRequest( async function processPlanchetExchangeBatchRequest( ws: InternalWalletState, wgContext: WithdrawalGroupContext, -): Promise { + args: WithdrawalRequestBatchArgs, +): Promise { const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord; logger.info( - `processing planchet exchange batch request ${withdrawalGroup.withdrawalGroupId}`, + `processing planchet exchange batch request ${withdrawalGroup.withdrawalGroupId}, start=${args.coinStartIndex}, len=${args.batchSize}`, ); - const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms - .map((x) => x.count) - .reduce((a, b) => a + b); - const d = await ws.db + + const batchReq: ExchangeBatchWithdrawRequest = { planchets: [] }; + // Indices of coins that are included in the batch request + const coinIdxs: number[] = []; + + await ws.db .mktx((x) => [ x.withdrawalGroups, x.planchets, @@ -611,26 +502,22 @@ async function processPlanchetExchangeBatchRequest( x.denominations, ]) .runReadOnly(async (tx) => { - const reqBody: { planchets: ExchangeWithdrawRequest[] } = { - planchets: [], - }; - const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl); - if (!exchange) { - logger.error("db inconsistent: exchange for planchet not found"); - return; - } - - for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) { + for ( + let coinIdx = args.coinStartIndex; + coinIdx < args.coinStartIndex + args.batchSize && + coinIdx < wgContext.numPlanchets; + coinIdx++ + ) { let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ withdrawalGroup.withdrawalGroupId, coinIdx, ]); if (!planchet) { - return; + continue; } if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) { logger.warn("processPlanchet: planchet already withdrawn"); - return; + continue; } const denom = await ws.getDenomInfo( ws, @@ -641,7 +528,7 @@ async function processPlanchetExchangeBatchRequest( if (!denom) { logger.error("db inconsistent: denom for planchet not found"); - return; + continue; } const planchetReq: ExchangeWithdrawRequest = { @@ -649,35 +536,145 @@ async function processPlanchetExchangeBatchRequest( reserve_sig: planchet.withdrawSig, coin_ev: planchet.coinEv, }; - reqBody.planchets.push(planchetReq); + batchReq.planchets.push(planchetReq); + coinIdxs.push(coinIdx); } - return reqBody; }); - if (!d) { + if (batchReq.planchets.length == 0) { + logger.warn("empty withdrawal batch"); + return { + batchResp: { ev_sigs: [] }, + coinIdxs: [], + }; + } + + async function handleKycRequired(resp: HttpResponse, startIdx: number) { + logger.info("withdrawal requires KYC"); + const respJson = await resp.json(); + const uuidResp = codecForWalletKycUuid().decode(respJson); + logger.info(`kyc uuid response: ${j2s(uuidResp)}`); + await ws.db + .mktx((x) => [x.planchets, x.withdrawalGroups]) + .runReadWrite(async (tx) => { + for (let i = 0; i < startIdx; i++) { + let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroup.withdrawalGroupId, + coinIdxs[i], + ]); + if (!planchet) { + continue; + } + planchet.planchetStatus = PlanchetStatus.KycRequired; + await tx.planchets.put(planchet); + } + const wg2 = await tx.withdrawalGroups.get( + withdrawalGroup.withdrawalGroupId, + ); + if (!wg2) { + return; + } + wg2.kycPending = { + paytoHash: uuidResp.h_payto, + requirementRow: uuidResp.requirement_row, + }; + await tx.withdrawalGroups.put(wg2); + }); return; } - const reqUrl = new URL( - `reserves/${withdrawalGroup.reservePub}/batch-withdraw`, - withdrawalGroup.exchangeBaseUrl, - ).href; + async function storeCoinError(e: any, coinIdx: number) { + const errDetail = getErrorDetailFromException(e); + logger.trace("withdrawal request failed", e); + logger.trace(String(e)); + await ws.db + .mktx((x) => [x.planchets]) + .runReadWrite(async (tx) => { + let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroup.withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + return; + } + planchet.lastError = errDetail; + await tx.planchets.put(planchet); + }); + } - const resp = await ws.http.postJson(reqUrl, d); - const r = await readSuccessResponseJsonOrThrow( - resp, - codecForWithdrawBatchResponse(), - ); - return r; + // FIXME: handle individual error codes better! + + if (args.useBatchRequest) { + const reqUrl = new URL( + `reserves/${withdrawalGroup.reservePub}/batch-withdraw`, + withdrawalGroup.exchangeBaseUrl, + ).href; + + try { + const resp = await ws.http.postJson(reqUrl, batchReq); + if (resp.status === HttpStatusCode.UnavailableForLegalReasons) { + await handleKycRequired(resp, 0); + } + const r = await readSuccessResponseJsonOrThrow( + resp, + codecForWithdrawBatchResponse(), + ); + return { + coinIdxs, + batchResp: r, + }; + } catch (e) { + await storeCoinError(e, coinIdxs[0]); + return { + batchResp: { ev_sigs: [] }, + coinIdxs: [], + }; + } + } else { + // We emulate the batch response here by making multiple individual requests + const responses: ExchangeWithdrawBatchResponse = { + ev_sigs: [], + }; + for (let i = 0; i < batchReq.planchets.length; i++) { + try { + const p = batchReq.planchets[i]; + const reqUrl = new URL( + `reserves/${withdrawalGroup.reservePub}/withdraw`, + withdrawalGroup.exchangeBaseUrl, + ).href; + const resp = await ws.http.postJson(reqUrl, p); + if (resp.status === HttpStatusCode.UnavailableForLegalReasons) { + await handleKycRequired(resp, i); + // We still return blinded coins that we could actually withdraw. + return { + coinIdxs, + batchResp: responses, + }; + } + const r = await readSuccessResponseJsonOrThrow( + resp, + codecForWithdrawResponse(), + ); + responses.ev_sigs.push(r); + } catch (e) { + await storeCoinError(e, coinIdxs[i]); + } + } + return { + coinIdxs, + batchResp: responses, + }; + } } async function processPlanchetVerifyAndStoreCoin( ws: InternalWalletState, wgContext: WithdrawalGroupContext, coinIdx: number, - resp: WithdrawResponse, + resp: ExchangeWithdrawResponse, ): Promise { const withdrawalGroup = wgContext.wgRecord; + logger.info(`checking and storing planchet idx=${coinIdx}`); const d = await ws.db .mktx((x) => [x.withdrawalGroups, x.planchets, x.denominations]) .runReadOnly(async (tx) => { @@ -791,6 +788,14 @@ async function processPlanchetVerifyAndStoreCoin( wgContext.planchetsFinished.add(planchet.coinPub); + // We create the notification here, as the async transaction below + // allows other planchet withdrawals to change wgContext.planchetsFinished + const notification: WalletNotification = { + type: NotificationType.CoinWithdrawn, + numTotal: wgContext.numPlanchets, + numWithdrawn: wgContext.planchetsFinished.size, + } + // Check if this is the first time that the whole // withdrawal succeeded. If so, mark the withdrawal // group as finished. @@ -814,11 +819,7 @@ async function processPlanchetVerifyAndStoreCoin( }); if (firstSuccess) { - ws.notify({ - type: NotificationType.CoinWithdrawn, - numTotal: wgContext.numPlanchets, - numWithdrawn: wgContext.planchetsFinished.size, - }); + ws.notify(notification); } } @@ -1150,8 +1151,6 @@ export async function processWithdrawalGroup( wgRecord: withdrawalGroup, }; - let work: Promise[] = []; - await ws.db .mktx((x) => [x.planchets]) .runReadOnly(async (tx) => { @@ -1165,44 +1164,35 @@ export async function processWithdrawalGroup( } }); + // We sequentially generate planchets, so that + // large withdrawal groups don't make the wallet unresponsive. for (let i = 0; i < numTotalCoins; i++) { - work.push(processPlanchetGenerate(ws, withdrawalGroup, i)); + await processPlanchetGenerate(ws, withdrawalGroup, i); } - // Generate coins concurrently (parallelism only happens in the crypto API workers) - await Promise.all(work); - - work = []; + const maxBatchSize = 100; - if (ws.batchWithdrawal) { - const resp = await processPlanchetExchangeBatchRequest(ws, wgContext); - if (!resp) { - throw Error("unable to do batch withdrawal"); - } - for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) { + for (let i = 0; i < numTotalCoins; i += maxBatchSize) { + const resp = await processPlanchetExchangeBatchRequest(ws, wgContext, { + batchSize: maxBatchSize, + coinStartIndex: i, + useBatchRequest: ws.batchWithdrawal, + }); + let work: Promise[] = []; + work = []; + for (let j = 0; j < resp.coinIdxs.length; j++) { work.push( processPlanchetVerifyAndStoreCoin( ws, wgContext, - coinIdx, - resp.ev_sigs[coinIdx], + resp.coinIdxs[j], + resp.batchResp.ev_sigs[j], ), ); } - } else { - for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) { - const resp = await processPlanchetExchangeRequest(ws, wgContext, coinIdx); - if (!resp) { - continue; - } - work.push( - processPlanchetVerifyAndStoreCoin(ws, wgContext, coinIdx, resp), - ); - } + await Promise.all(work); } - await Promise.all(work); - let numFinished = 0; let numKycRequired = 0; let finishedForFirstTime = false; -- cgit v1.2.3