diff options
Diffstat (limited to 'packages/taler-wallet-core/src/pay-peer-pull-debit.ts')
-rw-r--r-- | packages/taler-wallet-core/src/pay-peer-pull-debit.ts | 120 |
1 files changed, 69 insertions, 51 deletions
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts index 705317eb6..92eb44a87 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts @@ -33,6 +33,7 @@ import { HttpStatusCode, Logger, NotificationType, + ObservabilityEventType, PeerContractTerms, PreparePeerPullDebitRequest, PreparePeerPullDebitResponse, @@ -425,6 +426,11 @@ async function processPeerPullDebitPendingDeposit( wex: WalletExecutionContext, peerPullInc: PeerPullPaymentIncomingRecord, ): Promise<TaskRunResult> { + const ctx = new PeerPullDebitTransactionContext( + wex, + peerPullInc.peerPullDebitId, + ); + const pursePub = peerPullInc.pursePub; const coinSel = peerPullInc.coinSel; @@ -512,70 +518,82 @@ async function processPeerPullDebitPendingDeposit( } } - const coins = await queryCoinInfosForSelection(wex, coinSel); - - const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ - exchangeBaseUrl: peerPullInc.exchangeBaseUrl, - pursePub: peerPullInc.pursePub, - coins, - }); - const purseDepositUrl = new URL( `purses/${pursePub}/deposit`, peerPullInc.exchangeBaseUrl, ); - const depositPayload: ExchangePurseDeposits = { - deposits: depositSigsResp.deposits, - }; + // FIXME: We could skip batches that we've already submitted. - if (logger.shouldLogTrace()) { - logger.trace(`purse deposit payload: ${j2s(depositPayload)}`); - } + const coins = await queryCoinInfosForSelection(wex, coinSel); - const httpResp = await wex.http.fetch(purseDepositUrl.href, { - method: "POST", - body: depositPayload, - cancellationToken: wex.cancellationToken, - }); + const maxBatchSize = 100; - const ctx = new PeerPullDebitTransactionContext( - wex, - peerPullInc.peerPullDebitId, - ); + for (let i = 0; i < coins.length; i += maxBatchSize) { + const batchSize = Math.min(maxBatchSize, coins.length - i); - switch (httpResp.status) { - case HttpStatusCode.Ok: { - const resp = await readSuccessResponseJsonOrThrow( - httpResp, - codecForAny(), - ); - logger.trace(`purse deposit response: ${j2s(resp)}`); + wex.oc.observe({ + type: ObservabilityEventType.Message, + contents: `Depositing batch at ${i}/${coins.length} of size ${batchSize}`, + }); - await ctx.transition(async (r) => { - if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) { - return TransitionResultType.Stay; - } - r.status = PeerPullDebitRecordStatus.Done; - return TransitionResultType.Transition; - }); - return TaskRunResult.finished(); - } - case HttpStatusCode.Gone: { - await ctx.abortTransaction(); - return TaskRunResult.backoff(); - } - case HttpStatusCode.Conflict: { - return handlePurseCreationConflict(ctx, peerPullInc, httpResp); + const batchCoins = coins.slice(i, i + batchSize); + const depositSigsResp = await wex.cryptoApi.signPurseDeposits({ + exchangeBaseUrl: peerPullInc.exchangeBaseUrl, + pursePub: peerPullInc.pursePub, + coins: batchCoins, + }); + + const depositPayload: ExchangePurseDeposits = { + deposits: depositSigsResp.deposits, + }; + + if (logger.shouldLogTrace()) { + logger.trace(`purse deposit payload: ${j2s(depositPayload)}`); } - default: { - const errResp = await readTalerErrorResponse(httpResp); - return { - type: TaskRunResultType.Error, - errorDetail: errResp, - }; + + const httpResp = await wex.http.fetch(purseDepositUrl.href, { + method: "POST", + body: depositPayload, + cancellationToken: wex.cancellationToken, + }); + + switch (httpResp.status) { + case HttpStatusCode.Ok: { + const resp = await readSuccessResponseJsonOrThrow( + httpResp, + codecForAny(), + ); + logger.trace(`purse deposit response: ${j2s(resp)}`); + continue; + } + case HttpStatusCode.Gone: { + await ctx.abortTransaction(); + return TaskRunResult.backoff(); + } + case HttpStatusCode.Conflict: { + return handlePurseCreationConflict(ctx, peerPullInc, httpResp); + } + default: { + const errResp = await readTalerErrorResponse(httpResp); + return { + type: TaskRunResultType.Error, + errorDetail: errResp, + }; + } } } + + // All batches succeeded, we can transition! + + await ctx.transition(async (r) => { + if (r.status !== PeerPullDebitRecordStatus.PendingDeposit) { + return TransitionResultType.Stay; + } + r.status = PeerPullDebitRecordStatus.Done; + return TransitionResultType.Transition; + }); + return TaskRunResult.finished(); } async function processPeerPullDebitAbortingRefresh( |