diff options
author | Florian Dold <florian@dold.me> | 2023-02-20 20:14:37 +0100 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2023-02-20 20:14:46 +0100 |
commit | 3daa4dbb3fc5199fb05d58b40c0d7c9ee287595e (patch) | |
tree | bdf3bcd9af71ce9cc77a8c4dc32ff0537654e83f | |
parent | 7bb81a008b7148cfd3fd656f858e4cbd755531ac (diff) |
wallet-core: fix retryTransaction, improve tx/op identifier parsing/construction
14 files changed, 334 insertions, 130 deletions
diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts index a23ba0f76..cbf49c4ca 100644 --- a/packages/taler-wallet-core/src/db.ts +++ b/packages/taler-wallet-core/src/db.ts @@ -69,7 +69,7 @@ import { StoreDescriptor, StoreWithIndexes, } from "./util/query.js"; -import { RetryInfo, RetryTags } from "./util/retries.js"; +import { RetryInfo, TaskIdentifiers } from "./util/retries.js"; /** * This file contains the database schema of the Taler wallet together @@ -1945,7 +1945,7 @@ export interface OperationRetryRecord { * Unique identifier for the operation. Typically of * the format `${opType}-${opUniqueKey}` * - * @see {@link RetryTags} + * @see {@link TaskIdentifiers} */ id: string; diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts b/packages/taler-wallet-core/src/operations/backup/index.ts index 3dae26087..59e99b505 100644 --- a/packages/taler-wallet-core/src/operations/backup/index.ts +++ b/packages/taler-wallet-core/src/operations/backup/index.ts @@ -99,7 +99,7 @@ import { import { OperationAttemptResult, OperationAttemptResultType, - RetryTags, + TaskIdentifiers, scheduleRetryInTx, } from "../../util/retries.js"; import { addAttentionRequest, removeAttentionRequest } from "../attention.js"; @@ -379,7 +379,7 @@ async function runBackupCycleForProvider( logger.warn("backup provider not found anymore"); return; } - const opId = RetryTags.forBackup(prov); + const opId = TaskIdentifiers.forBackup(prov); await scheduleRetryInTx(ws, tx, opId); prov.shouldRetryFreshProposal = true; prov.state = { @@ -405,7 +405,7 @@ async function runBackupCycleForProvider( logger.warn("backup provider not found anymore"); return; } - const opId = RetryTags.forBackup(prov); + const opId = TaskIdentifiers.forBackup(prov); await scheduleRetryInTx(ws, tx, opId); prov.currentPaymentProposalId = result.proposalId; prov.shouldRetryFreshProposal = false; @@ -479,7 +479,7 @@ async function runBackupCycleForProvider( prov.lastBackupHash = encodeCrock(hash(backupEnc)); // FIXME: Allocate error code for this situation? // FIXME: Add operation retry record! - const opId = RetryTags.forBackup(prov); + const opId = TaskIdentifiers.forBackup(prov); await scheduleRetryInTx(ws, tx, opId); prov.state = { tag: BackupProviderStateTag.Retrying, @@ -920,7 +920,7 @@ export async function getBackupInfo( .mktx((x) => [x.backupProviders, x.operationRetries]) .runReadOnly(async (tx) => { return await tx.backupProviders.iter().mapAsync(async (bp) => { - const opId = RetryTags.forBackup(bp); + const opId = TaskIdentifiers.forBackup(bp); const retryRecord = await tx.operationRetries.get(opId); return { provider: bp, diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts index 35e6455bc..e5eda074c 100644 --- a/packages/taler-wallet-core/src/operations/common.ts +++ b/packages/taler-wallet-core/src/operations/common.ts @@ -218,6 +218,23 @@ export async function storeOperationError( }); } +export async function resetOperationTimeout( + ws: InternalWalletState, + pendingTaskId: string, +): Promise<void> { + await ws.db + .mktx((x) => [x.operationRetries]) + .runReadWrite(async (tx) => { + let retryRecord = await tx.operationRetries.get(pendingTaskId); + if (retryRecord) { + // Note that we don't reset the lastError, it should still be visible + // while the retry runs. + retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo); + await tx.operationRetries.put(retryRecord); + } + }); +} + export async function storeOperationPending( ws: InternalWalletState, pendingTaskId: string, diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts index 1e696a1d6..22283b7a8 100644 --- a/packages/taler-wallet-core/src/operations/deposits.ts +++ b/packages/taler-wallet-core/src/operations/deposits.ts @@ -122,7 +122,6 @@ export async function processDepositGroup( ws: InternalWalletState, depositGroupId: string, options: { - forceNow?: boolean; cancellationToken?: CancellationToken; } = {}, ): Promise<OperationAttemptResult> { diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts index 08d30eac6..457344e06 100644 --- a/packages/taler-wallet-core/src/operations/exchanges.ts +++ b/packages/taler-wallet-core/src/operations/exchanges.ts @@ -73,7 +73,7 @@ import { import { OperationAttemptResult, OperationAttemptResultType, - RetryTags, + TaskIdentifiers, unwrapOperationHandlerResultOrThrow, } from "../util/retries.js"; import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "../versions.js"; @@ -552,7 +552,7 @@ export async function updateExchangeFromUrl( return unwrapOperationHandlerResultOrThrow( await runOperationWithErrorReporting( ws, - RetryTags.forExchangeUpdateFromUrl(canonUrl), + TaskIdentifiers.forExchangeUpdateFromUrl(canonUrl), () => updateExchangeFromUrlHandler(ws, canonUrl, options), ), ); diff --git a/packages/taler-wallet-core/src/operations/pay-merchant.ts b/packages/taler-wallet-core/src/operations/pay-merchant.ts index 19eb40f3a..25153f9fb 100644 --- a/packages/taler-wallet-core/src/operations/pay-merchant.ts +++ b/packages/taler-wallet-core/src/operations/pay-merchant.ts @@ -95,7 +95,7 @@ import { TalerError, TalerProtocolViolationError, } from "@gnu-taler/taler-util"; -import { GetReadWriteAccess } from "../index.js"; +import { GetReadWriteAccess, PendingTaskType } from "../index.js"; import { EXCHANGE_COINS_LOCK, InternalWalletState, @@ -119,8 +119,9 @@ import { OperationAttemptResult, OperationAttemptResultType, RetryInfo, - RetryTags, + TaskIdentifiers, scheduleRetry, + constructTaskIdentifier, } from "../util/retries.js"; import { makeTransactionId, @@ -360,7 +361,7 @@ export async function processDownloadProposal( requestBody.token = proposal.claimToken; } - const opId = RetryTags.forPay(proposal); + const opId = TaskIdentifiers.forPay(proposal); const retryRecord = await ws.db .mktx((x) => [x.operationRetries]) .runReadOnly(async (tx) => { @@ -1598,8 +1599,11 @@ export async function runPayForConfirmPay( proposalId: string, ): Promise<ConfirmPayResult> { logger.trace("processing proposal for confirmPay"); - const opId = RetryTags.byPaymentProposalId(proposalId); - const res = await runOperationWithErrorReporting(ws, opId, async () => { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Purchase, + proposalId, + }); + const res = await runOperationWithErrorReporting(ws, taskId, async () => { return await processPurchasePay(ws, proposalId, { forceNow: true }); }); logger.trace(`processPurchasePay response type ${res.type}`); @@ -1624,9 +1628,7 @@ export async function runPayForConfirmPay( // We hide transient errors from the caller. const opRetry = await ws.db .mktx((x) => [x.operationRetries]) - .runReadOnly(async (tx) => - tx.operationRetries.get(RetryTags.byPaymentProposalId(proposalId)), - ); + .runReadOnly(async (tx) => tx.operationRetries.get(taskId)); return { type: ConfirmPayResultType.Pending, lastError: opRetry?.lastError, @@ -1792,9 +1794,7 @@ export async function confirmPay( export async function processPurchase( ws: InternalWalletState, proposalId: string, - options: { - forceNow?: boolean; - } = {}, + options: Record<any, never> = {}, ): Promise<OperationAttemptResult> { const purchase = await ws.db .mktx((x) => [x.purchases]) @@ -1843,9 +1843,7 @@ export async function processPurchase( export async function processPurchasePay( ws: InternalWalletState, proposalId: string, - options: { - forceNow?: boolean; - } = {}, + options: unknown = {}, ): Promise<OperationAttemptResult> { const purchase = await ws.db .mktx((x) => [x.purchases]) @@ -1935,7 +1933,7 @@ export async function processPurchasePay( handleInsufficientFunds(ws, proposalId, err).catch(async (e) => { console.log("handling insufficient funds failed"); - await scheduleRetry(ws, RetryTags.forPay(purchase), { + await scheduleRetry(ws, TaskIdentifiers.forPay(purchase), { code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, when: AbsoluteTime.now(), message: "unexpected exception", @@ -2830,7 +2828,10 @@ export async function abortPay( proposalId: string, cancelImmediately?: boolean, ): Promise<void> { - const opId = RetryTags.byPaymentProposalId(proposalId); + const opId = constructTaskIdentifier({ + tag: PendingTaskType.Purchase, + proposalId, + }); await ws.db .mktx((x) => [ x.purchases, diff --git a/packages/taler-wallet-core/src/operations/pay-peer.ts b/packages/taler-wallet-core/src/operations/pay-peer.ts index ff01342f8..4f65ec7ea 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer.ts @@ -87,15 +87,17 @@ import { TalerError } from "@gnu-taler/taler-util"; import { InternalWalletState } from "../internal-wallet-state.js"; import { makeTransactionId, + resetOperationTimeout, runOperationWithErrorReporting, spendCoins, } from "../operations/common.js"; import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import { checkDbInvariant } from "../util/invariants.js"; import { + constructTaskIdentifier, OperationAttemptResult, OperationAttemptResultType, - RetryTags, + TaskIdentifiers, } from "../util/retries.js"; import { getPeerPaymentBalanceDetailsInTx } from "./balance.js"; import { updateExchangeFromUrl } from "./exchanges.js"; @@ -103,7 +105,10 @@ import { getTotalRefreshCost } from "./refresh.js"; import { getExchangeWithdrawalInfo, internalCreateWithdrawalGroup, + processWithdrawalGroup, } from "./withdraw.js"; +import { PendingTaskType } from "../pending-types.js"; +import { stopLongpolling } from "./transactions.js"; const logger = new Logger("operations/peer-to-peer.ts"); @@ -590,13 +595,14 @@ export async function initiatePeerPushPayment( }); }); - await runOperationWithErrorReporting( - ws, - RetryTags.byPeerPushPaymentInitiationPursePub(pursePair.pub), - async () => { - return await processPeerPushInitiation(ws, pursePair.pub); - }, - ); + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPushInitiation, + pursePub: pursePair.pub, + }); + + await runOperationWithErrorReporting(ws, taskId, async () => { + return await processPeerPushInitiation(ws, pursePair.pub); + }); return { contractPriv: contractKeyPair.priv, @@ -951,7 +957,7 @@ export async function confirmPeerPushPayment( await updateExchangeFromUrl(ws, peerInc.exchangeBaseUrl); - const retryTag = RetryTags.forPeerPushCredit(peerInc); + const retryTag = TaskIdentifiers.forPeerPushCredit(peerInc); await runOperationWithErrorReporting(ws, retryTag, () => processPeerPushCredit(ws, req.peerPushPaymentIncomingId), @@ -1113,7 +1119,7 @@ export async function acceptIncomingPeerPullPayment( await runOperationWithErrorReporting( ws, - RetryTags.forPeerPullPaymentDebit(ppi), + TaskIdentifiers.forPeerPullPaymentDebit(ppi), async () => { return processPeerPullDebit(ws, ppi.peerPullPaymentIncomingId); }, @@ -1263,7 +1269,23 @@ export async function processPeerPullCredit( } if (pullIni.status === OperationStatus.Finished) { - logger.warn("peer pull payment initiation is already finished"); + logger.warn( + "peer pull payment initiation is already finished, retrying withdrawal", + ); + + const withdrawalGroupId = pullIni.withdrawalGroupId; + + if (withdrawalGroupId) { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Withdraw, + withdrawalGroupId, + }); + stopLongpolling(ws, taskId); + await resetOperationTimeout(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processWithdrawalGroup(ws, withdrawalGroupId), + ); + } return { type: OperationAttemptResultType.Finished, result: undefined, @@ -1514,19 +1536,19 @@ export async function initiatePeerPullPayment( // whether purse creation has failed, or does the client/ // check this asynchronously from the transaction status? - await runOperationWithErrorReporting( - ws, - RetryTags.byPeerPullPaymentInitiationPursePub(pursePair.pub), - async () => { - return processPeerPullCredit(ws, pursePair.pub); - }, - ); + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPullInitiation, + pursePub: pursePair.pub, + }); + + await runOperationWithErrorReporting(ws, taskId, async () => { + return processPeerPullCredit(ws, pursePair.pub); + }); // FIXME: Why do we create this only here? // What if the previous operation didn't succeed? - - // FIXME: Use a pre-computed withdrawal group ID - // so we don't create it multiple times. + // We actually should create it once we know the + // money arrived (via long-polling). await internalCreateWithdrawalGroup(ws, { amount: instructedAmount, diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts index 240c7ff65..2e3a5c9dc 100644 --- a/packages/taler-wallet-core/src/operations/pending.ts +++ b/packages/taler-wallet-core/src/operations/pending.ts @@ -39,7 +39,7 @@ import { import { AbsoluteTime } from "@gnu-taler/taler-util"; import { InternalWalletState } from "../internal-wallet-state.js"; import { GetReadOnlyAccess } from "../util/query.js"; -import { RetryTags } from "../util/retries.js"; +import { TaskIdentifiers } from "../util/retries.js"; import { GlobalIDB } from "@gnu-taler/idb-bridge"; function getPendingCommon( @@ -74,7 +74,7 @@ async function gatherExchangePending( ): Promise<void> { // FIXME: We should do a range query here based on the update time. await tx.exchanges.iter().forEachAsync(async (exch) => { - const opTag = RetryTags.forExchangeUpdate(exch); + const opTag = TaskIdentifiers.forExchangeUpdate(exch); let opr = await tx.operationRetries.get(opTag); const timestampDue = opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate); @@ -120,7 +120,7 @@ async function gatherRefreshPending( if (r.timestampFinished) { return; } - const opId = RetryTags.forRefresh(r); + const opId = TaskIdentifiers.forRefresh(r); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); @@ -158,7 +158,7 @@ async function gatherWithdrawalPending( if (wsr.timestampFinish) { return; } - const opTag = RetryTags.forWithdrawal(wsr); + const opTag = TaskIdentifiers.forWithdrawal(wsr); let opr = await tx.operationRetries.get(opTag); const now = AbsoluteTime.now(); if (!opr) { @@ -208,7 +208,7 @@ async function gatherDepositPending( deposited = false; } } - const opId = RetryTags.forDeposit(dg); + const opId = TaskIdentifiers.forDeposit(dg); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ @@ -239,7 +239,7 @@ async function gatherTipPending( if (tip.pickedUpTimestamp) { return; } - const opId = RetryTags.forTipPickup(tip); + const opId = TaskIdentifiers.forTipPickup(tip); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); if (tip.acceptedTimestamp) { @@ -272,7 +272,7 @@ async function gatherPurchasePending( await tx.purchases.indexes.byStatus .iter(keyRange) .forEachAsync(async (pr) => { - const opId = RetryTags.forPay(pr); + const opId = TaskIdentifiers.forPay(pr); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); @@ -301,7 +301,7 @@ async function gatherRecoupPending( if (rg.timestampFinished) { return; } - const opId = RetryTags.forRecoup(rg); + const opId = TaskIdentifiers.forRecoup(rg); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ @@ -325,7 +325,7 @@ async function gatherBackupPending( resp: PendingOperationsResponse, ): Promise<void> { await tx.backupProviders.iter().forEachAsync(async (bp) => { - const opId = RetryTags.forBackup(bp); + const opId = TaskIdentifiers.forBackup(bp); const retryRecord = await tx.operationRetries.get(opId); if (bp.state.tag === BackupProviderStateTag.Ready) { const timestampDue = AbsoluteTime.fromTimestamp( @@ -366,7 +366,7 @@ async function gatherPeerPullInitiationPending( if (pi.status === OperationStatus.Finished) { return; } - const opId = RetryTags.forPeerPullPaymentInitiation(pi); + const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ @@ -392,7 +392,7 @@ async function gatherPeerPullDebitPending( if (pi.status === PeerPullPaymentIncomingStatus.Paid) { return; } - const opId = RetryTags.forPeerPullPaymentDebit(pi); + const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ @@ -418,7 +418,7 @@ async function gatherPeerPushInitiationPending( if (pi.status === PeerPushPaymentInitiationStatus.PurseCreated) { return; } - const opId = RetryTags.forPeerPushPaymentInitiation(pi); + const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ @@ -447,7 +447,7 @@ async function gatherPeerPushCreditPending( case PeerPushPaymentIncomingStatus.WithdrawalCreated: return; } - const opId = RetryTags.forPeerPushCredit(pi); + const opId = TaskIdentifiers.forPeerPushCredit(pi); const retryRecord = await tx.operationRetries.get(opId); const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index 773689635..2d406ec7d 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -734,9 +734,7 @@ async function refreshReveal( export async function processRefreshGroup( ws: InternalWalletState, refreshGroupId: string, - options: { - forceNow?: boolean; - } = {}, + options: Record<string, never> = {}, ): Promise<OperationAttemptResult> { logger.info(`processing refresh group ${refreshGroupId}`); diff --git a/packages/taler-wallet-core/src/operations/tip.ts b/packages/taler-wallet-core/src/operations/tip.ts index ec7546992..28c3cda52 100644 --- a/packages/taler-wallet-core/src/operations/tip.ts +++ b/packages/taler-wallet-core/src/operations/tip.ts @@ -164,9 +164,7 @@ export async function prepareTip( export async function processTip( ws: InternalWalletState, walletTipId: string, - options: { - forceNow?: boolean; - } = {}, + options: Record<string, never> = {}, ): Promise<OperationAttemptResult> { const tipRecord = await ws.db .mktx((x) => [x.tips]) diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts index c03d2aa3d..1c2ce34bb 100644 --- a/packages/taler-wallet-core/src/operations/transactions.ts +++ b/packages/taler-wallet-core/src/operations/transactions.ts @@ -63,12 +63,15 @@ import { PeerPullPaymentInitiationRecord, } from "../db.js"; import { InternalWalletState } from "../internal-wallet-state.js"; +import { PendingTaskType } from "../pending-types.js"; import { checkDbInvariant } from "../util/invariants.js"; -import { RetryTags } from "../util/retries.js"; +import { constructTaskIdentifier, TaskIdentifiers } from "../util/retries.js"; import { makeTombstoneId, makeTransactionId, parseId, + resetOperationTimeout, + runOperationWithErrorReporting, TombstoneTag, } from "./common.js"; import { processDepositGroup } from "./deposits.js"; @@ -79,6 +82,7 @@ import { extractContractData, processPurchasePay, } from "./pay-merchant.js"; +import { processPeerPullCredit } from "./pay-peer.js"; import { processRefreshGroup } from "./refresh.js"; import { processTip } from "./tip.js"; import { @@ -152,7 +156,7 @@ export async function getTransactionById( if (!withdrawalGroupRecord) throw Error("not found"); - const opId = RetryTags.forWithdrawal(withdrawalGroupRecord); + const opId = TaskIdentifiers.forWithdrawal(withdrawalGroupRecord); const ort = await tx.operationRetries.get(opId); if ( @@ -215,7 +219,7 @@ export async function getTransactionById( Amounts.zeroOfAmount(contractData.amount), ); - const payOpId = RetryTags.forPay(purchase); + const payOpId = TaskIdentifiers.forPay(purchase); const payRetryRecord = await tx.operationRetries.get(payOpId); return buildTransactionForPurchase( @@ -237,7 +241,7 @@ export async function getTransactionById( if (!tipRecord) throw Error("not found"); const retries = await tx.operationRetries.get( - RetryTags.forTipPickup(tipRecord), + TaskIdentifiers.forTipPickup(tipRecord), ); return buildTransactionForTip(tipRecord, retries); }); @@ -250,7 +254,7 @@ export async function getTransactionById( if (!depositRecord) throw Error("not found"); const retries = await tx.operationRetries.get( - RetryTags.forDeposit(depositRecord), + TaskIdentifiers.forDeposit(depositRecord), ); return buildTransactionForDeposit(depositRecord, retries); }); @@ -359,11 +363,11 @@ export async function getTransactionById( if (pushInc.withdrawalGroupId) { wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId); if (wg) { - const withdrawalOpId = RetryTags.forWithdrawal(wg); + const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg); wgOrt = await tx.operationRetries.get(withdrawalOpId); } } - const pushIncOpId = RetryTags.forPeerPushCredit(pushInc); + const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pushInc); let pushIncOrt = await tx.operationRetries.get(pushIncOpId); return buildTransactionForPeerPushCredit( @@ -394,11 +398,12 @@ export async function getTransactionById( if (pushInc.withdrawalGroupId) { wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId); if (wg) { - const withdrawalOpId = RetryTags.forWithdrawal(wg); + const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg); wgOrt = await tx.operationRetries.get(withdrawalOpId); } } - const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pushInc); + const pushIncOpId = + TaskIdentifiers.forPeerPullPaymentInitiation(pushInc); let pushIncOrt = await tx.operationRetries.get(pushIncOpId); return buildTransactionForPeerPullCredit( @@ -1109,11 +1114,11 @@ export async function getTransactions( if (pi.withdrawalGroupId) { wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId); if (wg) { - const withdrawalOpId = RetryTags.forWithdrawal(wg); + const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg); wgOrt = await tx.operationRetries.get(withdrawalOpId); } } - const pushIncOpId = RetryTags.forPeerPushCredit(pi); + const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pi); let pushIncOrt = await tx.operationRetries.get(pushIncOpId); checkDbInvariant(!!ct); @@ -1142,11 +1147,11 @@ export async function getTransactions( if (pi.withdrawalGroupId) { wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId); if (wg) { - const withdrawalOpId = RetryTags.forWithdrawal(wg); + const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg); wgOrt = await tx.operationRetries.get(withdrawalOpId); } } - const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pi); + const pushIncOpId = TaskIdentifiers.forPeerPullPaymentInitiation(pi); let pushIncOrt = await tx.operationRetries.get(pushIncOpId); checkDbInvariant(!!ct); @@ -1166,7 +1171,7 @@ export async function getTransactions( return; } let required = false; - const opId = RetryTags.forRefresh(rg); + const opId = TaskIdentifiers.forRefresh(rg); if (transactionsRequest?.includeRefreshes) { required = true; } else if (rg.operationStatus !== RefreshOperationStatus.Finished) { @@ -1195,7 +1200,7 @@ export async function getTransactions( return; } - const opId = RetryTags.forWithdrawal(wsr); + const opId = TaskIdentifiers.forWithdrawal(wsr); const ort = await tx.operationRetries.get(opId); switch (wsr.wgInfo.withdrawalType) { @@ -1238,7 +1243,7 @@ export async function getTransactions( if (shouldSkipCurrency(transactionsRequest, amount.currency)) { return; } - const opId = RetryTags.forDeposit(dg); + const opId = TaskIdentifiers.forDeposit(dg); const retryRecord = await tx.operationRetries.get(opId); transactions.push(buildTransactionForDeposit(dg, retryRecord)); @@ -1309,7 +1314,7 @@ export async function getTransactions( ); }); - const payOpId = RetryTags.forPay(purchase); + const payOpId = TaskIdentifiers.forPay(purchase); const payRetryRecord = await tx.operationRetries.get(payOpId); transactions.push( await buildTransactionForPurchase( @@ -1333,7 +1338,7 @@ export async function getTransactions( if (!tipRecord.acceptedTimestamp) { return; } - const opId = RetryTags.forTipPickup(tipRecord); + const opId = TaskIdentifiers.forTipPickup(tipRecord); const retryRecord = await tx.operationRetries.get(opId); transactions.push(buildTransactionForTip(tipRecord, retryRecord)); }); @@ -1359,6 +1364,77 @@ export async function getTransactions( return { transactions: [...txNotPending, ...txPending] }; } +export type ParsedTransactionIdentifier = + | { tag: TransactionType.Deposit; depositGroupId: string } + | { tag: TransactionType.Payment; proposalId: string } + | { tag: TransactionType.PeerPullDebit; peerPullPaymentIncomingId: string } + | { tag: TransactionType.PeerPullCredit; pursePub: string } + | { tag: TransactionType.PeerPushCredit; peerPushPaymentIncomingId: string } + | { tag: TransactionType.PeerPushDebit; pursePub: string } + | { tag: TransactionType.Refresh; refreshGroupId: string } + | { tag: TransactionType.Refund; proposalId: string; executionTime: string } + | { tag: TransactionType.Tip; walletTipId: string } + | { tag: TransactionType.Withdrawal; withdrawalGroupId: string }; + +/** + * Parse a transaction identifier string into a typed, structured representation. + */ +export function parseTransactionIdentifier( + transactionId: string, +): ParsedTransactionIdentifier | undefined { + const { type, args: rest } = parseId("any", transactionId); + + switch (type) { + case TransactionType.Deposit: + return { tag: TransactionType.Deposit, depositGroupId: rest[0] }; + case TransactionType.Payment: + return { tag: TransactionType.Payment, proposalId: rest[0] }; + case TransactionType.PeerPullCredit: + return { tag: TransactionType.PeerPullCredit, pursePub: rest[0] }; + case TransactionType.PeerPullDebit: + return { + tag: TransactionType.PeerPullDebit, + peerPullPaymentIncomingId: rest[0], + }; + case TransactionType.PeerPushCredit: + return { + tag: TransactionType.PeerPushCredit, + peerPushPaymentIncomingId: rest[0], + }; + case TransactionType.PeerPushDebit: + return { tag: TransactionType.PeerPushDebit, pursePub: rest[0] }; + case TransactionType.Refresh: + return { tag: TransactionType.Refresh, refreshGroupId: rest[0] }; + case TransactionType.Refund: + return { + tag: TransactionType.Refund, + proposalId: rest[0], + executionTime: rest[1], + }; + case TransactionType.Tip: + return { + tag: TransactionType.Tip, + walletTipId: rest[0], + }; + case TransactionType.Withdrawal: + return { + tag: TransactionType.Withdrawal, + withdrawalGroupId: rest[0], + }; + default: + return undefined; + } +} + +export function stopLongpolling(ws: InternalWalletState, taskId: string) { + const longpoll = ws.activeLongpoll[taskId]; + if (longpoll) { + logger.info(`cancelling long-polling for ${taskId}`); + longpoll.cancel(); + delete ws.activeLongpoll[taskId]; + } +} + /** * Immediately retry the underlying operation * of a transaction. @@ -1369,34 +1445,86 @@ export async function retryTransaction( ): Promise<void> { logger.info(`retrying transaction ${transactionId}`); - const { type, args: rest } = parseId("any", transactionId); + const parsedTx = parseTransactionIdentifier(transactionId); - switch (type) { + if (!parsedTx) { + throw Error("invalid transaction identifier"); + } + + // FIXME: We currently don't cancel active long-polling tasks here. + + switch (parsedTx.tag) { + case TransactionType.PeerPullCredit: { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPullInitiation, + pursePub: parsedTx.pursePub, + }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processPeerPullCredit(ws, parsedTx.pursePub), + ); + break; + } case TransactionType.Deposit: { - const depositGroupId = rest[0]; - processDepositGroup(ws, depositGroupId, { - forceNow: true, + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Deposit, + depositGroupId: parsedTx.depositGroupId, }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processDepositGroup(ws, parsedTx.depositGroupId), + ); break; } case TransactionType.Withdrawal: { - const withdrawalGroupId = rest[0]; - await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true }); + // FIXME: Abort current long-poller! + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Withdraw, + withdrawalGroupId: parsedTx.withdrawalGroupId, + }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processWithdrawalGroup(ws, parsedTx.withdrawalGroupId), + ); break; } case TransactionType.Payment: { - const proposalId = rest[0]; - await processPurchasePay(ws, proposalId, { forceNow: true }); + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Purchase, + proposalId: parsedTx.proposalId, + }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processPurchasePay(ws, parsedTx.proposalId), + ); break; } case TransactionType.Tip: { - const walletTipId = rest[0]; - await processTip(ws, walletTipId, { forceNow: true }); + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.TipPickup, + walletTipId: parsedTx.walletTipId, + }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processTip(ws, parsedTx.walletTipId), + ); break; } case TransactionType.Refresh: { - const refreshGroupId = rest[0]; - await processRefreshGroup(ws, refreshGroupId, { forceNow: true }); + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Refresh, + refreshGroupId: parsedTx.refreshGroupId, + }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processRefreshGroup(ws, parsedTx.refreshGroupId), + ); break; } default: diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index 9dfd72678..5729b8458 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -109,7 +109,7 @@ import { DbAccess, GetReadOnlyAccess } from "../util/query.js"; import { OperationAttemptResult, OperationAttemptResultType, - RetryTags, + TaskIdentifiers, } from "../util/retries.js"; import { WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, @@ -1023,7 +1023,6 @@ export async function processWithdrawalGroup( ws: InternalWalletState, withdrawalGroupId: string, options: { - forceNow?: boolean; } = {}, ): Promise<OperationAttemptResult> { logger.trace("processing withdrawal group", withdrawalGroupId); @@ -1037,10 +1036,10 @@ export async function processWithdrawalGroup( throw Error(`withdrawal group ${withdrawalGroupId} not found`); } - const retryTag = RetryTags.forWithdrawal(withdrawalGroup); + const retryTag = TaskIdentifiers.forWithdrawal(withdrawalGroup); // We're already running! - if (ws.activeLongpoll[retryTag] && !options.forceNow) { + if (ws.activeLongpoll[retryTag]) { logger.info("withdrawal group already in long-polling, returning!"); return { type: OperationAttemptResultType.Longpoll, @@ -1532,7 +1531,7 @@ export async function getWithdrawalDetailsForUri( .iter(r.baseUrl) .toArray(); const retryRecord = await tx.operationRetries.get( - RetryTags.forExchangeUpdate(r), + TaskIdentifiers.forExchangeUpdate(r), ); if (exchangeDetails && denominations) { exchanges.push( @@ -2087,7 +2086,7 @@ export async function createManualWithdrawal( // rely on retry handling to re-process the withdrawal group. runOperationWithErrorReporting( ws, - RetryTags.forWithdrawal(withdrawalGroup), + TaskIdentifiers.forWithdrawal(withdrawalGroup), async () => { return await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true, diff --git a/packages/taler-wallet-core/src/util/retries.ts b/packages/taler-wallet-core/src/util/retries.ts index 5744bf8fe..5b6645924 100644 --- a/packages/taler-wallet-core/src/util/retries.ts +++ b/packages/taler-wallet-core/src/util/retries.ts @@ -46,6 +46,7 @@ import { TalerError } from "@gnu-taler/taler-util"; import { InternalWalletState } from "../internal-wallet-state.js"; import { PendingTaskType } from "../pending-types.js"; import { GetReadWriteAccess } from "./query.js"; +import { assertUnreachable } from "./assertUnreachable.js"; const logger = new Logger("util/retries.ts"); @@ -176,7 +177,66 @@ export namespace RetryInfo { } } -export namespace RetryTags { +/** + * Parsed representation of task identifiers. + */ +export type ParsedTaskIdentifier = + | { + tag: PendingTaskType.Withdraw; + withdrawalGroupId: string; + } + | { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string } + | { tag: PendingTaskType.Backup; backupProviderBaseUrl: string } + | { tag: PendingTaskType.Deposit; depositGroupId: string } + | { tag: PendingTaskType.ExchangeCheckRefresh; exchangeBaseUrl: string } + | { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string } + | { tag: PendingTaskType.PeerPullDebit; peerPullPaymentIncomingId: string } + | { tag: PendingTaskType.PeerPullInitiation; pursePub: string } + | { tag: PendingTaskType.PeerPushCredit; peerPushPaymentIncomingId: string } + | { tag: PendingTaskType.PeerPushInitiation; pursePub: string } + | { tag: PendingTaskType.Purchase; proposalId: string } + | { tag: PendingTaskType.Recoup; recoupGroupId: string } + | { tag: PendingTaskType.TipPickup; walletTipId: string } + | { tag: PendingTaskType.Refresh; refreshGroupId: string }; + +export function parseTaskIdentifier(x: string): ParsedTaskIdentifier { + throw Error("not yet implemented"); +} + +export function constructTaskIdentifier(p: ParsedTaskIdentifier): string { + switch (p.tag) { + case PendingTaskType.Backup: + return `${p.tag}:${p.backupProviderBaseUrl}`; + case PendingTaskType.Deposit: + return `${p.tag}:${p.depositGroupId}`; + case PendingTaskType.ExchangeCheckRefresh: + return `${p.tag}:${p.exchangeBaseUrl}`; + case PendingTaskType.ExchangeUpdate: + return `${p.tag}:${p.exchangeBaseUrl}`; + case PendingTaskType.PeerPullDebit: + return `${p.tag}:${p.peerPullPaymentIncomingId}`; + case PendingTaskType.PeerPushCredit: + return `${p.tag}:${p.peerPushPaymentIncomingId}`; + case PendingTaskType.PeerPullInitiation: + return `${p.tag}:${p.pursePub}`; + case PendingTaskType.PeerPushInitiation: + return `${p.tag}:${p.pursePub}`; + case PendingTaskType.Purchase: + return `${p.tag}:${p.proposalId}`; + case PendingTaskType.Recoup: + return `${p.tag}:${p.recoupGroupId}`; + case PendingTaskType.Refresh: + return `${p.tag}:${p.refreshGroupId}`; + case PendingTaskType.TipPickup: + return `${p.tag}:${p.walletTipId}`; + case PendingTaskType.Withdraw: + return `${p.tag}:${p.withdrawalGroupId}`; + default: + assertUnreachable(p); + } +} + +export namespace TaskIdentifiers { export function forWithdrawal(wg: WithdrawalGroupRecord): string { return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}`; } @@ -227,19 +287,6 @@ export namespace RetryTags { ): string { return `${PendingTaskType.PeerPushCredit}:${ppi.peerPushPaymentIncomingId}`; } - export function byPaymentProposalId(proposalId: string): string { - return `${PendingTaskType.Purchase}:${proposalId}`; - } - export function byPeerPushPaymentInitiationPursePub( - pursePub: string, - ): string { - return `${PendingTaskType.PeerPushInitiation}:${pursePub}`; - } - export function byPeerPullPaymentInitiationPursePub( - pursePub: string, - ): string { - return `${PendingTaskType.PeerPullInitiation}:${pursePub}`; - } } export async function scheduleRetryInTx( diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index a036be86c..47724efdc 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -265,7 +265,7 @@ import { GetReadOnlyAccess, GetReadWriteAccess, } from "./util/query.js"; -import { OperationAttemptResult, RetryTags } from "./util/retries.js"; +import { OperationAttemptResult, TaskIdentifiers } from "./util/retries.js"; import { TimerAPI, TimerGroup } from "./util/timer.js"; import { WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, @@ -306,17 +306,15 @@ async function callOperationHandler( forceNow, }); case PendingTaskType.Refresh: - return await processRefreshGroup(ws, pending.refreshGroupId, { - forceNow, - }); + return await processRefreshGroup(ws, pending.refreshGroupId); case PendingTaskType.Withdraw: return await processWithdrawalGroup(ws, pending.withdrawalGroupId, { forceNow, }); case PendingTaskType.TipPickup: - return await processTip(ws, pending.tipId, { forceNow }); + return await processTip(ws, pending.tipId); case PendingTaskType.Purchase: - return await processPurchase(ws, pending.proposalId, { forceNow }); + return await processPurchase(ws, pending.proposalId); case PendingTaskType.Recoup: return await processRecoupGroupHandler(ws, pending.recoupGroupId, { forceNow, @@ -324,9 +322,7 @@ async function callOperationHandler( case PendingTaskType.ExchangeCheckRefresh: return await autoRefresh(ws, pending.exchangeBaseUrl); case PendingTaskType.Deposit: { - return await processDepositGroup(ws, pending.depositGroupId, { - forceNow, - }); + return await processDepositGroup(ws, pending.depositGroupId); } case PendingTaskType.Backup: return await processBackupForProvider(ws, pending.backupProviderBaseUrl); @@ -691,7 +687,7 @@ async function getExchanges( for (const r of exchangeRecords) { const exchangeDetails = await getExchangeDetails(tx, r.baseUrl); const opRetryRecord = await tx.operationRetries.get( - RetryTags.forExchangeUpdate(r), + TaskIdentifiers.forExchangeUpdate(r), ); exchanges.push( makeExchangeListItem(r, exchangeDetails, opRetryRecord?.lastError), @@ -1285,9 +1281,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>( RefreshReason.Manual, ); }); - processRefreshGroup(ws, refreshGroupId.refreshGroupId, { - forceNow: true, - }).catch((x) => { + processRefreshGroup(ws, refreshGroupId.refreshGroupId).catch((x) => { logger.error(x); }); return { @@ -1753,6 +1747,7 @@ class InternalWalletStateImpl implements InternalWalletState { for (const key of Object.keys(this.activeLongpoll)) { logger.trace(`cancelling active longpoll ${key}`); this.activeLongpoll[key].cancel(); + delete this.activeLongpoll[key]; } } |