From a49959d2c8bf82575c5d232217a33d91e7b008e8 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 20 Feb 2023 21:26:08 +0100 Subject: wallet-core: support long-polling for peer push credit --- .../taler-wallet-core/src/operations/pay-peer.ts | 182 ++++++++++++++++----- 1 file changed, 140 insertions(+), 42 deletions(-) (limited to 'packages/taler-wallet-core/src/operations/pay-peer.ts') diff --git a/packages/taler-wallet-core/src/operations/pay-peer.ts b/packages/taler-wallet-core/src/operations/pay-peer.ts index 4f65ec7ea..4dcc06076 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer.ts @@ -69,12 +69,17 @@ import { TransactionType, UnblindedSignature, WalletAccountMergeFlags, + codecOptional, + codecForTimestamp, + CancellationToken, } from "@gnu-taler/taler-util"; import { SpendCoinDetails } from "../crypto/cryptoImplementation.js"; import { DenominationRecord, OperationStatus, PeerPullPaymentIncomingStatus, + PeerPullPaymentInitiationRecord, + PeerPullPaymentInitiationStatus, PeerPushPaymentCoinSelection, PeerPushPaymentIncomingRecord, PeerPushPaymentIncomingStatus, @@ -86,12 +91,19 @@ import { import { TalerError } from "@gnu-taler/taler-util"; import { InternalWalletState } from "../internal-wallet-state.js"; import { + LongpollResult, makeTransactionId, resetOperationTimeout, + runLongpollAsync, runOperationWithErrorReporting, spendCoins, + storeOperationPending, } from "../operations/common.js"; -import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; +import { + readSuccessResponseJsonOrErrorCode, + readSuccessResponseJsonOrThrow, + throwUnexpectedRequestError, +} from "@gnu-taler/taler-util/http"; import { checkDbInvariant } from "../util/invariants.js"; import { constructTaskIdentifier, @@ -622,11 +634,13 @@ export async function initiatePeerPushPayment( interface ExchangePurseStatus { balance: AmountString; + deposit_timestamp?: TalerProtocolTimestamp; } export const codecForExchangePurseStatus = (): Codec => buildCodecForObject() .property("balance", codecForAmountString()) + .property("deposit_timestamp", codecOptional(codecForTimestamp)) .build("ExchangePurseStatus"); export async function preparePeerPushCredit( @@ -1255,6 +1269,87 @@ export async function preparePeerPullCredit( }; } +export async function queryPurseForPeerPullCredit( + ws: InternalWalletState, + pullIni: PeerPullPaymentInitiationRecord, + cancellationToken: CancellationToken, +): Promise { + const purseDepositUrl = new URL( + `purses/${pullIni.pursePub}/merge`, + pullIni.exchangeBaseUrl, + ); + purseDepositUrl.searchParams.set("timeout_ms", "30000"); + logger.info(`querying purse status via ${purseDepositUrl.href}`); + const resp = await ws.http.get(purseDepositUrl.href, { + timeout: { d_ms: 60000 }, + cancellationToken, + }); + + logger.info(`purse status code: HTTP ${resp.status}`); + + const result = await readSuccessResponseJsonOrErrorCode( + resp, + codecForExchangePurseStatus(), + ); + + if (result.isError) { + logger.info(`got purse status error, EC=${result.talerErrorResponse.code}`); + if (resp.status === 404) { + return { ready: false }; + } else { + throwUnexpectedRequestError(resp, result.talerErrorResponse); + } + } + + if (!result.response.deposit_timestamp) { + logger.info("purse not ready yet (no deposit)"); + return { ready: false }; + } + + const reserve = await ws.db + .mktx((x) => [x.reserves]) + .runReadOnly(async (tx) => { + return await tx.reserves.get(pullIni.mergeReserveRowId); + }); + + if (!reserve) { + throw Error("reserve for peer pull credit not found in wallet DB"); + } + + await internalCreateWithdrawalGroup(ws, { + amount: Amounts.parseOrThrow(pullIni.amount), + wgInfo: { + withdrawalType: WithdrawalRecordType.PeerPullCredit, + contractTerms: pullIni.contractTerms, + contractPriv: pullIni.contractPriv, + }, + forcedWithdrawalGroupId: pullIni.withdrawalGroupId, + exchangeBaseUrl: pullIni.exchangeBaseUrl, + reserveStatus: WithdrawalGroupStatus.QueryingStatus, + reserveKeyPair: { + priv: reserve.reservePriv, + pub: reserve.reservePub, + }, + }); + + await ws.db + .mktx((x) => [x.peerPullPaymentInitiations]) + .runReadWrite(async (tx) => { + const finPi = await tx.peerPullPaymentInitiations.get(pullIni.pursePub); + if (!finPi) { + logger.warn("peerPullPaymentInitiation not found anymore"); + return; + } + if (finPi.status === PeerPullPaymentInitiationStatus.PurseCreated) { + finPi.status = PeerPullPaymentInitiationStatus.PurseDeposited; + } + await tx.peerPullPaymentInitiations.put(finPi); + }); + return { + ready: true, + }; +} + export async function processPeerPullCredit( ws: InternalWalletState, pursePub: string, @@ -1268,28 +1363,52 @@ export async function processPeerPullCredit( throw Error("peer pull payment initiation not found in database"); } - if (pullIni.status === OperationStatus.Finished) { - logger.warn( - "peer pull payment initiation is already finished, retrying withdrawal", - ); + const retryTag = constructTaskIdentifier({ + tag: PendingTaskType.PeerPullInitiation, + pursePub, + }); - const withdrawalGroupId = pullIni.withdrawalGroupId; + switch (pullIni.status) { + case PeerPullPaymentInitiationStatus.PurseDeposited: { + // We implement this case so that the "retry" action on a peer-pull-credit transaction + // also retries the withdrawal task. - if (withdrawalGroupId) { - const taskId = constructTaskIdentifier({ - tag: PendingTaskType.Withdraw, - withdrawalGroupId, - }); - stopLongpolling(ws, taskId); - await resetOperationTimeout(ws, taskId); - await runOperationWithErrorReporting(ws, taskId, () => - processWithdrawalGroup(ws, withdrawalGroupId), + logger.warn( + "peer pull payment initiation is already finished, retrying withdrawal", ); + + const withdrawalGroupId = pullIni.withdrawalGroupId; + + if (withdrawalGroupId) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Withdraw, + withdrawalGroupId, + }); + stopLongpolling(ws, taskId); + await resetOperationTimeout(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processWithdrawalGroup(ws, withdrawalGroupId), + ); + } + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } - return { - type: OperationAttemptResultType.Finished, - result: undefined, - }; + case PeerPullPaymentInitiationStatus.PurseCreated: + runLongpollAsync(ws, retryTag, async (cancellationToken) => + queryPurseForPeerPullCredit(ws, pullIni, cancellationToken), + ); + logger.trace( + "returning early from processPeerPullCredit for long-polling in background", + ); + return { + type: OperationAttemptResultType.Longpoll, + }; + case PeerPullPaymentInitiationStatus.Initial: + break; + default: + throw Error(`unknown PeerPullPaymentInitiationStatus ${pullIni.status}`); } const mergeReserve = await ws.db @@ -1370,7 +1489,7 @@ export async function processPeerPullCredit( if (!pi2) { return; } - pi2.status = OperationStatus.Finished; + pi2.status = PeerPullPaymentInitiationStatus.PurseCreated; await tx.peerPullPaymentInitiations.put(pi2); }); @@ -1518,7 +1637,7 @@ export async function initiatePeerPullPayment( pursePub: pursePair.pub, mergePriv: mergePair.priv, mergePub: mergePair.pub, - status: OperationStatus.Pending, + status: PeerPullPaymentInitiationStatus.Initial, contractTerms: contractTerms, mergeTimestamp, mergeReserveRowId: mergeReserveRowId, @@ -1545,27 +1664,6 @@ export async function initiatePeerPullPayment( return processPeerPullCredit(ws, pursePair.pub); }); - // FIXME: Why do we create this only here? - // What if the previous operation didn't succeed? - // We actually should create it once we know the - // money arrived (via long-polling). - - await internalCreateWithdrawalGroup(ws, { - amount: instructedAmount, - wgInfo: { - withdrawalType: WithdrawalRecordType.PeerPullCredit, - contractTerms, - contractPriv: contractKeyPair.priv, - }, - forcedWithdrawalGroupId: withdrawalGroupId, - exchangeBaseUrl: exchangeBaseUrl, - reserveStatus: WithdrawalGroupStatus.QueryingStatus, - reserveKeyPair: { - priv: mergeReserveInfo.reservePriv, - pub: mergeReserveInfo.reservePub, - }, - }); - return { talerUri: constructPayPullUri({ exchangeBaseUrl: exchangeBaseUrl, -- cgit v1.2.3