From da927b5e48453b5bddb56944f7073619f693f526 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 5 Jun 2023 18:38:17 +0200 Subject: wallet-core: handle Gone in peer-pull-debit --- packages/taler-wallet-core/src/db.ts | 2 + .../taler-wallet-core/src/internal-wallet-state.ts | 7 - .../taler-wallet-core/src/operations/exchanges.ts | 4 +- .../src/operations/pay-peer-pull-credit.ts | 6 +- .../src/operations/pay-peer-pull-debit.ts | 149 ++++++++++++++++++--- .../taler-wallet-core/src/operations/recoup.ts | 17 --- .../taler-wallet-core/src/operations/withdraw.ts | 6 +- packages/taler-wallet-core/src/wallet.ts | 30 ++--- 8 files changed, 150 insertions(+), 71 deletions(-) diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts index d64d1fbc6..a5db49649 100644 --- a/packages/taler-wallet-core/src/db.ts +++ b/packages/taler-wallet-core/src/db.ts @@ -2052,6 +2052,8 @@ export interface PeerPullPaymentIncomingRecord { */ totalCostEstimated: AmountString; + abortRefreshGroupId?: string; + coinSel?: PeerPullPaymentCoinSelection; } diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts index 760f40d6c..8dc83c65a 100644 --- a/packages/taler-wallet-core/src/internal-wallet-state.ts +++ b/packages/taler-wallet-core/src/internal-wallet-state.ts @@ -134,13 +134,6 @@ export interface RecoupOperations { exchangeBaseUrl: string, coinPubs: string[], ): Promise; - processRecoupGroup( - ws: InternalWalletState, - recoupGroupId: string, - options?: { - forceNow?: boolean; - }, - ): Promise; } export type NotificationListener = (n: WalletNotification) => void; diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts index 142e0cf03..29d2451e6 100644 --- a/packages/taler-wallet-core/src/operations/exchanges.ts +++ b/packages/taler-wallet-core/src/operations/exchanges.ts @@ -863,9 +863,7 @@ export async function updateExchangeFromUrlHandler( if (recoupGroupId) { // Asynchronously start recoup. This doesn't need to finish // for the exchange update to be considered finished. - ws.recoupOps.processRecoupGroup(ws, recoupGroupId).catch((e) => { - logger.error("error while recouping coins:", e); - }); + ws.workAvailable.trigger(); } if (!updated) { diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts index e9c34cf73..a85df66d2 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts @@ -28,6 +28,7 @@ import { Logger, TalerErrorCode, TalerPreciseTimestamp, + TalerUriAction, TransactionAction, TransactionMajorState, TransactionMinorState, @@ -37,11 +38,11 @@ import { WalletKycUuid, codecForAny, codecForWalletKycUuid, - constructPayPullUri, encodeCrock, getRandomBytes, j2s, makeErrorDetail, + stringifyTalerUri, } from "@gnu-taler/taler-util"; import { readSuccessResponseJsonOrErrorCode, @@ -741,7 +742,8 @@ export async function initiatePeerPullPayment( }); return { - talerUri: constructPayPullUri({ + talerUri: stringifyTalerUri({ + type: TalerUriAction.PayPull, exchangeBaseUrl: exchangeBaseUrl, contractPriv: contractKeyPair.priv, }), diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts index 212d69eea..2be21c68d 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts @@ -17,8 +17,10 @@ import { AcceptPeerPullPaymentResponse, Amounts, + CoinRefreshRequest, ConfirmPeerPullDebitRequest, ExchangePurseDeposits, + HttpStatusCode, Logger, PeerContractTerms, PreparePeerPullDebitRequest, @@ -48,6 +50,8 @@ import { PeerPullDebitRecordStatus, PeerPullPaymentIncomingRecord, PendingTaskType, + RefreshOperationStatus, + createRefreshGroup, } from "../index.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { @@ -68,6 +72,7 @@ import { notifyTransition, stopLongpolling, } from "./transactions.js"; +import { checkLogicInvariant } from "../util/invariants.js"; const logger = new Logger("pay-peer-pull-debit.ts"); @@ -104,24 +109,89 @@ async function processPeerPullDebitPendingDeposit( logger.trace(`purse deposit payload: ${j2s(depositPayload)}`); } - const httpResp = await ws.http.postJson(purseDepositUrl.href, depositPayload); - const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny()); - logger.trace(`purse deposit response: ${j2s(resp)}`); + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.PeerPullDebit, + peerPullPaymentIncomingId, + }); - await ws.db - .mktx((x) => [x.peerPullPaymentIncoming]) - .runReadWrite(async (tx) => { - const pi = await tx.peerPullPaymentIncoming.get( - peerPullPaymentIncomingId, - ); - if (!pi) { - throw Error("peer pull payment not found anymore"); - } - if (pi.status === PeerPullDebitRecordStatus.PendingDeposit) { + const httpResp = await ws.http.fetch(purseDepositUrl.href, { + method: "POST", + body: depositPayload, + }); + if (httpResp.status === HttpStatusCode.Gone) { + const transitionInfo = await ws.db + .mktx((x) => [ + x.peerPullPaymentIncoming, + x.refreshGroups, + x.denominations, + x.coinAvailability, + x.coins, + ]) + .runReadWrite(async (tx) => { + const pi = await tx.peerPullPaymentIncoming.get( + peerPullPaymentIncomingId, + ); + if (!pi) { + throw Error("peer pull payment not found anymore"); + } + if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) { + return; + } + const oldTxState = computePeerPullDebitTransactionState(pi); + + const currency = Amounts.currencyOf(pi.totalCostEstimated); + const coinPubs: CoinRefreshRequest[] = []; + + if (!pi.coinSel) { + throw Error("invalid db state"); + } + + for (let i = 0; i < pi.coinSel.coinPubs.length; i++) { + coinPubs.push({ + amount: pi.coinSel.contributions[i], + coinPub: pi.coinSel.coinPubs[i], + }); + } + + const refresh = await createRefreshGroup( + ws, + tx, + currency, + coinPubs, + RefreshReason.AbortPeerPushDebit, + ); + + pi.status = PeerPullDebitRecordStatus.AbortingRefresh; + pi.abortRefreshGroupId = refresh.refreshGroupId; + const newTxState = computePeerPullDebitTransactionState(pi); + await tx.peerPullPaymentIncoming.put(pi); + return { oldTxState, newTxState }; + }); + notifyTransition(ws, transactionId, transitionInfo); + } else { + const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny()); + logger.trace(`purse deposit response: ${j2s(resp)}`); + + const transitionInfo = await ws.db + .mktx((x) => [x.peerPullPaymentIncoming]) + .runReadWrite(async (tx) => { + const pi = await tx.peerPullPaymentIncoming.get( + peerPullPaymentIncomingId, + ); + if (!pi) { + throw Error("peer pull payment not found anymore"); + } + if (pi.status !== PeerPullDebitRecordStatus.PendingDeposit) { + return; + } + const oldTxState = computePeerPullDebitTransactionState(pi); pi.status = PeerPullDebitRecordStatus.DonePaid; - } - await tx.peerPullPaymentIncoming.put(pi); - }); + const newTxState = computePeerPullDebitTransactionState(pi); + await tx.peerPullPaymentIncoming.put(pi); + return { oldTxState, newTxState }; + }); + notifyTransition(ws, transactionId, transitionInfo); + } return { type: OperationAttemptResultType.Finished, @@ -133,7 +203,50 @@ async function processPeerPullDebitAbortingRefresh( ws: InternalWalletState, peerPullInc: PeerPullPaymentIncomingRecord, ): Promise { - throw Error("not implemented"); + const peerPullPaymentIncomingId = peerPullInc.peerPullPaymentIncomingId; + const abortRefreshGroupId = peerPullInc.abortRefreshGroupId; + checkLogicInvariant(!!abortRefreshGroupId); + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.PeerPullDebit, + peerPullPaymentIncomingId, + }); + const transitionInfo = await ws.db + .mktx((x) => [x.refreshGroups, x.peerPullPaymentIncoming]) + .runReadWrite(async (tx) => { + const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); + let newOpState: PeerPullDebitRecordStatus | undefined; + if (!refreshGroup) { + // Maybe it got manually deleted? Means that we should + // just go into failed. + logger.warn("no aborting refresh group found for deposit group"); + newOpState = PeerPullDebitRecordStatus.Failed; + } else { + if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) { + newOpState = PeerPullDebitRecordStatus.Aborted; + } else if ( + refreshGroup.operationStatus === RefreshOperationStatus.Failed + ) { + newOpState = PeerPullDebitRecordStatus.Failed; + } + } + if (newOpState) { + const newDg = await tx.peerPullPaymentIncoming.get( + peerPullPaymentIncomingId, + ); + if (!newDg) { + return; + } + const oldTxState = computePeerPullDebitTransactionState(newDg); + newDg.status = newOpState; + const newTxState = computePeerPullDebitTransactionState(newDg); + await tx.peerPullPaymentIncoming.put(newDg); + return { oldTxState, newTxState }; + } + return undefined; + }); + notifyTransition(ws, transactionId, transitionInfo); + // FIXME: Shouldn't this be finished in some cases?! + return OperationAttemptResult.pendingEmpty(); } export async function processPeerPullDebit( @@ -158,7 +271,7 @@ export async function processPeerPullDebit( return { type: OperationAttemptResultType.Finished, result: undefined, - } + }; } export async function confirmPeerPullDebit( diff --git a/packages/taler-wallet-core/src/operations/recoup.ts b/packages/taler-wallet-core/src/operations/recoup.ts index 1f36117ee..fcb7d6c98 100644 --- a/packages/taler-wallet-core/src/operations/recoup.ts +++ b/packages/taler-wallet-core/src/operations/recoup.ts @@ -304,24 +304,7 @@ async function recoupRefreshCoin( export async function processRecoupGroup( ws: InternalWalletState, recoupGroupId: string, - options: { - forceNow?: boolean; - } = {}, -): Promise { - await unwrapOperationHandlerResultOrThrow( - await processRecoupGroupHandler(ws, recoupGroupId, options), - ); - return; -} - -export async function processRecoupGroupHandler( - ws: InternalWalletState, - recoupGroupId: string, - options: { - forceNow?: boolean; - } = {}, ): Promise { - const forceNow = options.forceNow ?? false; let recoupGroup = await ws.db .mktx((x) => [x.recoupGroups]) .runReadOnly(async (tx) => { diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index 4801a67ee..7db6dcd2a 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -1273,7 +1273,6 @@ export interface WithdrawalGroupContext { export async function processWithdrawalGroup( ws: InternalWalletState, withdrawalGroupId: string, - options: {} = {}, ): Promise { logger.trace("processing withdrawal group", withdrawalGroupId); const withdrawalGroup = await ws.db @@ -1303,9 +1302,8 @@ export async function processWithdrawalGroup( switch (withdrawalGroup.status) { case WithdrawalGroupStatus.PendingRegisteringBank: await processReserveBankStatus(ws, withdrawalGroupId); - return await processWithdrawalGroup(ws, withdrawalGroupId, { - forceNow: true, - }); + // FIXME: This will get called by the main task loop, why call it here?! + return await processWithdrawalGroup(ws, withdrawalGroupId); case WithdrawalGroupStatus.PendingQueryingStatus: { runLongpollAsync(ws, retryTag, (ct) => { return queryReserve(ws, withdrawalGroupId, ct); diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 89e1bf383..5277916de 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -219,9 +219,7 @@ import { } from "./operations/pay-peer-push-debit.js"; import { getPendingOperations } from "./operations/pending.js"; import { - createRecoupGroup, - processRecoupGroup, - processRecoupGroupHandler, + createRecoupGroup, processRecoupGroup, } from "./operations/recoup.js"; import { autoRefresh, @@ -295,27 +293,20 @@ const logger = new Logger("wallet.ts"); async function callOperationHandler( ws: InternalWalletState, pending: PendingTaskInfo, - forceNow = false, ): Promise { switch (pending.type) { case PendingTaskType.ExchangeUpdate: - return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl, { - forceNow, - }); + return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl); case PendingTaskType.Refresh: return await processRefreshGroup(ws, pending.refreshGroupId); case PendingTaskType.Withdraw: - return await processWithdrawalGroup(ws, pending.withdrawalGroupId, { - forceNow, - }); + return await processWithdrawalGroup(ws, pending.withdrawalGroupId); case PendingTaskType.TipPickup: return await processTip(ws, pending.tipId); case PendingTaskType.Purchase: return await processPurchase(ws, pending.proposalId); case PendingTaskType.Recoup: - return await processRecoupGroupHandler(ws, pending.recoupGroupId, { - forceNow, - }); + return await processRecoupGroup(ws, pending.recoupGroupId); case PendingTaskType.ExchangeCheckRefresh: return await autoRefresh(ws, pending.exchangeBaseUrl); case PendingTaskType.Deposit: { @@ -342,16 +333,15 @@ async function callOperationHandler( */ export async function runPending( ws: InternalWalletState, - forceNow = false, ): Promise { const pendingOpsResponse = await getPendingOperations(ws); for (const p of pendingOpsResponse.pendingOperations) { - if (!forceNow && !AbsoluteTime.isExpired(p.timestampDue)) { + if (!AbsoluteTime.isExpired(p.timestampDue)) { continue; } await runOperationWithErrorReporting(ws, p.id, async () => { logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`); - return await callOperationHandler(ws, p, forceNow); + return await callOperationHandler(ws, p); }); } } @@ -1168,7 +1158,8 @@ async function dispatchRequestInternal( return getContractTermsDetails(ws, req.proposalId); } case WalletApiOperation.RetryPendingNow: { - await runPending(ws, true); + // FIXME: Should we reset all operation retries here? + await runPending(ws); return {}; } case WalletApiOperation.PreparePayForUri: { @@ -1624,8 +1615,8 @@ export class Wallet { this.ws.stop(); } - runPending(forceNow = false): Promise { - return runPending(this.ws, forceNow); + runPending(): Promise { + return runPending(this.ws); } runTaskLoop(opts?: RetryLoopOpts): Promise { @@ -1673,7 +1664,6 @@ class InternalWalletStateImpl implements InternalWalletState { recoupOps: RecoupOperations = { createRecoupGroup, - processRecoupGroup, }; merchantOps: MerchantOperations = { -- cgit v1.2.3