From a49959d2c8bf82575c5d232217a33d91e7b008e8 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 20 Feb 2023 21:26:08 +0100 Subject: wallet-core: support long-polling for peer push credit --- packages/taler-wallet-core/src/db.ts | 12 +- .../taler-wallet-core/src/operations/common.ts | 41 +++++ .../taler-wallet-core/src/operations/pay-peer.ts | 182 ++++++++++++++++----- .../taler-wallet-core/src/operations/pending.ts | 3 +- .../taler-wallet-core/src/operations/withdraw.ts | 56 +++---- 5 files changed, 216 insertions(+), 78 deletions(-) (limited to 'packages/taler-wallet-core/src') diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts index cbf49c4ca..29e97cd90 100644 --- a/packages/taler-wallet-core/src/db.ts +++ b/packages/taler-wallet-core/src/db.ts @@ -1774,6 +1774,16 @@ export interface PeerPushPaymentInitiationRecord { status: PeerPushPaymentInitiationStatus; } +export enum PeerPullPaymentInitiationStatus { + Initial = 10 /* ACTIVE_START */, + /** + * Purse created, waiting for the other party to accept the + * invoice and deposit money into it. + */ + PurseCreated = 11 /* ACTIVE_START + 1 */, + PurseDeposited = 50 /* DORMANT_START */, +} + export interface PeerPullPaymentInitiationRecord { /** * What exchange are we using for the payment request? @@ -1817,7 +1827,7 @@ export interface PeerPullPaymentInitiationRecord { /** * Status of the peer pull payment initiation. */ - status: OperationStatus; + status: PeerPullPaymentInitiationStatus; withdrawalGroupId: string | undefined; } diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts index e5eda074c..3905eaf3e 100644 --- a/packages/taler-wallet-core/src/operations/common.ts +++ b/packages/taler-wallet-core/src/operations/common.ts @@ -21,11 +21,13 @@ import { AgeRestriction, AmountJson, Amounts, + CancellationToken, CoinRefreshRequest, CoinStatus, ExchangeEntryStatus, ExchangeListItem, ExchangeTosStatus, + getErrorDetailFromException, j2s, Logger, OperationErrorInfo, @@ -453,3 +455,42 @@ export function makeExchangeListItem( lastUpdateErrorInfo, }; } + +export interface LongpollResult { + ready: boolean; +} + +export function runLongpollAsync( + ws: InternalWalletState, + retryTag: string, + reqFn: (ct: CancellationToken) => Promise, +): void { + const asyncFn = async () => { + if (ws.stopped) { + logger.trace("not long-polling reserve, wallet already stopped"); + await storeOperationPending(ws, retryTag); + return; + } + const cts = CancellationToken.create(); + let res: { ready: boolean } | undefined = undefined; + try { + ws.activeLongpoll[retryTag] = { + cancel: () => { + logger.trace("cancel of reserve longpoll requested"); + cts.cancel(); + }, + }; + res = await reqFn(cts.token); + } catch (e) { + await storeOperationError(ws, retryTag, getErrorDetailFromException(e)); + return; + } finally { + delete ws.activeLongpoll[retryTag]; + } + if (!res.ready) { + await storeOperationPending(ws, retryTag); + } + ws.latch.trigger(); + }; + asyncFn(); +} diff --git a/packages/taler-wallet-core/src/operations/pay-peer.ts b/packages/taler-wallet-core/src/operations/pay-peer.ts index 4f65ec7ea..4dcc06076 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer.ts @@ -69,12 +69,17 @@ import { TransactionType, UnblindedSignature, WalletAccountMergeFlags, + codecOptional, + codecForTimestamp, + CancellationToken, } from "@gnu-taler/taler-util"; import { SpendCoinDetails } from "../crypto/cryptoImplementation.js"; import { DenominationRecord, OperationStatus, PeerPullPaymentIncomingStatus, + PeerPullPaymentInitiationRecord, + PeerPullPaymentInitiationStatus, PeerPushPaymentCoinSelection, PeerPushPaymentIncomingRecord, PeerPushPaymentIncomingStatus, @@ -86,12 +91,19 @@ import { import { TalerError } from "@gnu-taler/taler-util"; import { InternalWalletState } from "../internal-wallet-state.js"; import { + LongpollResult, makeTransactionId, resetOperationTimeout, + runLongpollAsync, runOperationWithErrorReporting, spendCoins, + storeOperationPending, } from "../operations/common.js"; -import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; +import { + readSuccessResponseJsonOrErrorCode, + readSuccessResponseJsonOrThrow, + throwUnexpectedRequestError, +} from "@gnu-taler/taler-util/http"; import { checkDbInvariant } from "../util/invariants.js"; import { constructTaskIdentifier, @@ -622,11 +634,13 @@ export async function initiatePeerPushPayment( interface ExchangePurseStatus { balance: AmountString; + deposit_timestamp?: TalerProtocolTimestamp; } export const codecForExchangePurseStatus = (): Codec => buildCodecForObject() .property("balance", codecForAmountString()) + .property("deposit_timestamp", codecOptional(codecForTimestamp)) .build("ExchangePurseStatus"); export async function preparePeerPushCredit( @@ -1255,6 +1269,87 @@ export async function preparePeerPullCredit( }; } +export async function queryPurseForPeerPullCredit( + ws: InternalWalletState, + pullIni: PeerPullPaymentInitiationRecord, + cancellationToken: CancellationToken, +): Promise { + const purseDepositUrl = new URL( + `purses/${pullIni.pursePub}/merge`, + pullIni.exchangeBaseUrl, + ); + purseDepositUrl.searchParams.set("timeout_ms", "30000"); + logger.info(`querying purse status via ${purseDepositUrl.href}`); + const resp = await ws.http.get(purseDepositUrl.href, { + timeout: { d_ms: 60000 }, + cancellationToken, + }); + + logger.info(`purse status code: HTTP ${resp.status}`); + + const result = await readSuccessResponseJsonOrErrorCode( + resp, + codecForExchangePurseStatus(), + ); + + if (result.isError) { + logger.info(`got purse status error, EC=${result.talerErrorResponse.code}`); + if (resp.status === 404) { + return { ready: false }; + } else { + throwUnexpectedRequestError(resp, result.talerErrorResponse); + } + } + + if (!result.response.deposit_timestamp) { + logger.info("purse not ready yet (no deposit)"); + return { ready: false }; + } + + const reserve = await ws.db + .mktx((x) => [x.reserves]) + .runReadOnly(async (tx) => { + return await tx.reserves.get(pullIni.mergeReserveRowId); + }); + + if (!reserve) { + throw Error("reserve for peer pull credit not found in wallet DB"); + } + + await internalCreateWithdrawalGroup(ws, { + amount: Amounts.parseOrThrow(pullIni.amount), + wgInfo: { + withdrawalType: WithdrawalRecordType.PeerPullCredit, + contractTerms: pullIni.contractTerms, + contractPriv: pullIni.contractPriv, + }, + forcedWithdrawalGroupId: pullIni.withdrawalGroupId, + exchangeBaseUrl: pullIni.exchangeBaseUrl, + reserveStatus: WithdrawalGroupStatus.QueryingStatus, + reserveKeyPair: { + priv: reserve.reservePriv, + pub: reserve.reservePub, + }, + }); + + await ws.db + .mktx((x) => [x.peerPullPaymentInitiations]) + .runReadWrite(async (tx) => { + const finPi = await tx.peerPullPaymentInitiations.get(pullIni.pursePub); + if (!finPi) { + logger.warn("peerPullPaymentInitiation not found anymore"); + return; + } + if (finPi.status === PeerPullPaymentInitiationStatus.PurseCreated) { + finPi.status = PeerPullPaymentInitiationStatus.PurseDeposited; + } + await tx.peerPullPaymentInitiations.put(finPi); + }); + return { + ready: true, + }; +} + export async function processPeerPullCredit( ws: InternalWalletState, pursePub: string, @@ -1268,28 +1363,52 @@ export async function processPeerPullCredit( throw Error("peer pull payment initiation not found in database"); } - if (pullIni.status === OperationStatus.Finished) { - logger.warn( - "peer pull payment initiation is already finished, retrying withdrawal", - ); + const retryTag = constructTaskIdentifier({ + tag: PendingTaskType.PeerPullInitiation, + pursePub, + }); - const withdrawalGroupId = pullIni.withdrawalGroupId; + switch (pullIni.status) { + case PeerPullPaymentInitiationStatus.PurseDeposited: { + // We implement this case so that the "retry" action on a peer-pull-credit transaction + // also retries the withdrawal task. - if (withdrawalGroupId) { - const taskId = constructTaskIdentifier({ - tag: PendingTaskType.Withdraw, - withdrawalGroupId, - }); - stopLongpolling(ws, taskId); - await resetOperationTimeout(ws, taskId); - await runOperationWithErrorReporting(ws, taskId, () => - processWithdrawalGroup(ws, withdrawalGroupId), + logger.warn( + "peer pull payment initiation is already finished, retrying withdrawal", ); + + const withdrawalGroupId = pullIni.withdrawalGroupId; + + if (withdrawalGroupId) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Withdraw, + withdrawalGroupId, + }); + stopLongpolling(ws, taskId); + await resetOperationTimeout(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processWithdrawalGroup(ws, withdrawalGroupId), + ); + } + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } - return { - type: OperationAttemptResultType.Finished, - result: undefined, - }; + case PeerPullPaymentInitiationStatus.PurseCreated: + runLongpollAsync(ws, retryTag, async (cancellationToken) => + queryPurseForPeerPullCredit(ws, pullIni, cancellationToken), + ); + logger.trace( + "returning early from processPeerPullCredit for long-polling in background", + ); + return { + type: OperationAttemptResultType.Longpoll, + }; + case PeerPullPaymentInitiationStatus.Initial: + break; + default: + throw Error(`unknown PeerPullPaymentInitiationStatus ${pullIni.status}`); } const mergeReserve = await ws.db @@ -1370,7 +1489,7 @@ export async function processPeerPullCredit( if (!pi2) { return; } - pi2.status = OperationStatus.Finished; + pi2.status = PeerPullPaymentInitiationStatus.PurseCreated; await tx.peerPullPaymentInitiations.put(pi2); }); @@ -1518,7 +1637,7 @@ export async function initiatePeerPullPayment( pursePub: pursePair.pub, mergePriv: mergePair.priv, mergePub: mergePair.pub, - status: OperationStatus.Pending, + status: PeerPullPaymentInitiationStatus.Initial, contractTerms: contractTerms, mergeTimestamp, mergeReserveRowId: mergeReserveRowId, @@ -1545,27 +1664,6 @@ export async function initiatePeerPullPayment( return processPeerPullCredit(ws, pursePair.pub); }); - // FIXME: Why do we create this only here? - // What if the previous operation didn't succeed? - // We actually should create it once we know the - // money arrived (via long-polling). - - await internalCreateWithdrawalGroup(ws, { - amount: instructedAmount, - wgInfo: { - withdrawalType: WithdrawalRecordType.PeerPullCredit, - contractTerms, - contractPriv: contractKeyPair.priv, - }, - forcedWithdrawalGroupId: withdrawalGroupId, - exchangeBaseUrl: exchangeBaseUrl, - reserveStatus: WithdrawalGroupStatus.QueryingStatus, - reserveKeyPair: { - priv: mergeReserveInfo.reservePriv, - pub: mergeReserveInfo.reservePub, - }, - }); - return { talerUri: constructPayPullUri({ exchangeBaseUrl: exchangeBaseUrl, diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts index 2e3a5c9dc..458448b31 100644 --- a/packages/taler-wallet-core/src/operations/pending.ts +++ b/packages/taler-wallet-core/src/operations/pending.ts @@ -31,6 +31,7 @@ import { PeerPushPaymentInitiationStatus, PeerPullPaymentIncomingStatus, PeerPushPaymentIncomingStatus, + PeerPullPaymentInitiationStatus, } from "../db.js"; import { PendingOperationsResponse, @@ -363,7 +364,7 @@ async function gatherPeerPullInitiationPending( resp: PendingOperationsResponse, ): Promise { await tx.peerPullPaymentInitiations.iter().forEachAsync(async (pi) => { - if (pi.status === OperationStatus.Finished) { + if (pi.status === PeerPullPaymentInitiationStatus.PurseDeposited) { return; } const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi); diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index 5729b8458..aba2948cd 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -90,6 +90,7 @@ import { InternalWalletState } from "../internal-wallet-state.js"; import { makeCoinAvailable, makeExchangeListItem, + runLongpollAsync, runOperationWithErrorReporting, } from "../operations/common.js"; import { walletCoreDebugFlags } from "../util/debugFlags.js"; @@ -1022,8 +1023,7 @@ export interface WithdrawalGroupContext { export async function processWithdrawalGroup( ws: InternalWalletState, withdrawalGroupId: string, - options: { - } = {}, + options: {} = {}, ): Promise { logger.trace("processing withdrawal group", withdrawalGroupId); const withdrawalGroup = await ws.db @@ -1053,38 +1053,9 @@ export async function processWithdrawalGroup( forceNow: true, }); case WithdrawalGroupStatus.QueryingStatus: { - const doQueryAsync = async () => { - if (ws.stopped) { - logger.trace("not long-polling reserve, wallet already stopped"); - await storeOperationPending(ws, retryTag); - return; - } - const cts = CancellationToken.create(); - let res: { ready: boolean } | undefined = undefined; - try { - ws.activeLongpoll[retryTag] = { - cancel: () => { - logger.trace("cancel of reserve longpoll requested"); - cts.cancel(); - }, - }; - res = await queryReserve(ws, withdrawalGroupId, cts.token); - } catch (e) { - await storeOperationError( - ws, - retryTag, - getErrorDetailFromException(e), - ); - return; - } finally { - delete ws.activeLongpoll[retryTag]; - } - if (!res.ready) { - await storeOperationPending(ws, retryTag); - } - ws.latch.trigger(); - }; - doQueryAsync(); + runLongpollAsync(ws, retryTag, (ct) => { + return queryReserve(ws, withdrawalGroupId, ct); + }); logger.trace( "returning early from withdrawal for long-polling in background", ); @@ -1832,6 +1803,14 @@ async function processReserveBankStatus( } } +/** + * Create a withdrawal group. + * + * If a forcedWithdrawalGroupId is given and a + * withdrawal group with this ID already exists, + * the existing one is returned. No conflict checking + * of the other arguments is done in that case. + */ export async function internalCreateWithdrawalGroup( ws: InternalWalletState, args: { @@ -1856,6 +1835,15 @@ export async function internalCreateWithdrawalGroup( if (args.forcedWithdrawalGroupId) { withdrawalGroupId = args.forcedWithdrawalGroupId; + const wgId = withdrawalGroupId; + const existingWg = await ws.db + .mktx((x) => [x.withdrawalGroups]) + .runReadOnly(async (tx) => { + return tx.withdrawalGroups.get(wgId); + }); + if (existingWg) { + return existingWg; + } } else { withdrawalGroupId = encodeCrock(getRandomBytes(32)); } -- cgit v1.2.3