diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/transactions.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/transactions.ts | 190 |
1 files changed, 159 insertions, 31 deletions
diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts index c03d2aa3d..1c2ce34bb 100644 --- a/packages/taler-wallet-core/src/operations/transactions.ts +++ b/packages/taler-wallet-core/src/operations/transactions.ts @@ -63,12 +63,15 @@ import { PeerPullPaymentInitiationRecord, } from "../db.js"; import { InternalWalletState } from "../internal-wallet-state.js"; +import { PendingTaskType } from "../pending-types.js"; import { checkDbInvariant } from "../util/invariants.js"; -import { RetryTags } from "../util/retries.js"; +import { constructTaskIdentifier, TaskIdentifiers } from "../util/retries.js"; import { makeTombstoneId, makeTransactionId, parseId, + resetOperationTimeout, + runOperationWithErrorReporting, TombstoneTag, } from "./common.js"; import { processDepositGroup } from "./deposits.js"; @@ -79,6 +82,7 @@ import { extractContractData, processPurchasePay, } from "./pay-merchant.js"; +import { processPeerPullCredit } from "./pay-peer.js"; import { processRefreshGroup } from "./refresh.js"; import { processTip } from "./tip.js"; import { @@ -152,7 +156,7 @@ export async function getTransactionById( if (!withdrawalGroupRecord) throw Error("not found"); - const opId = RetryTags.forWithdrawal(withdrawalGroupRecord); + const opId = TaskIdentifiers.forWithdrawal(withdrawalGroupRecord); const ort = await tx.operationRetries.get(opId); if ( @@ -215,7 +219,7 @@ export async function getTransactionById( Amounts.zeroOfAmount(contractData.amount), ); - const payOpId = RetryTags.forPay(purchase); + const payOpId = TaskIdentifiers.forPay(purchase); const payRetryRecord = await tx.operationRetries.get(payOpId); return buildTransactionForPurchase( @@ -237,7 +241,7 @@ export async function getTransactionById( if (!tipRecord) throw Error("not found"); const retries = await tx.operationRetries.get( - RetryTags.forTipPickup(tipRecord), + TaskIdentifiers.forTipPickup(tipRecord), ); return buildTransactionForTip(tipRecord, retries); }); @@ -250,7 +254,7 @@ export async function getTransactionById( if (!depositRecord) throw Error("not found"); const retries = await tx.operationRetries.get( - RetryTags.forDeposit(depositRecord), + TaskIdentifiers.forDeposit(depositRecord), ); return buildTransactionForDeposit(depositRecord, retries); }); @@ -359,11 +363,11 @@ export async function getTransactionById( if (pushInc.withdrawalGroupId) { wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId); if (wg) { - const withdrawalOpId = RetryTags.forWithdrawal(wg); + const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg); wgOrt = await tx.operationRetries.get(withdrawalOpId); } } - const pushIncOpId = RetryTags.forPeerPushCredit(pushInc); + const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pushInc); let pushIncOrt = await tx.operationRetries.get(pushIncOpId); return buildTransactionForPeerPushCredit( @@ -394,11 +398,12 @@ export async function getTransactionById( if (pushInc.withdrawalGroupId) { wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId); if (wg) { - const withdrawalOpId = RetryTags.forWithdrawal(wg); + const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg); wgOrt = await tx.operationRetries.get(withdrawalOpId); } } - const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pushInc); + const pushIncOpId = + TaskIdentifiers.forPeerPullPaymentInitiation(pushInc); let pushIncOrt = await tx.operationRetries.get(pushIncOpId); return buildTransactionForPeerPullCredit( @@ -1109,11 +1114,11 @@ export async function getTransactions( if (pi.withdrawalGroupId) { wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId); if (wg) { - const withdrawalOpId = RetryTags.forWithdrawal(wg); + const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg); wgOrt = await tx.operationRetries.get(withdrawalOpId); } } - const pushIncOpId = RetryTags.forPeerPushCredit(pi); + const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pi); let pushIncOrt = await tx.operationRetries.get(pushIncOpId); checkDbInvariant(!!ct); @@ -1142,11 +1147,11 @@ export async function getTransactions( if (pi.withdrawalGroupId) { wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId); if (wg) { - const withdrawalOpId = RetryTags.forWithdrawal(wg); + const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg); wgOrt = await tx.operationRetries.get(withdrawalOpId); } } - const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pi); + const pushIncOpId = TaskIdentifiers.forPeerPullPaymentInitiation(pi); let pushIncOrt = await tx.operationRetries.get(pushIncOpId); checkDbInvariant(!!ct); @@ -1166,7 +1171,7 @@ export async function getTransactions( return; } let required = false; - const opId = RetryTags.forRefresh(rg); + const opId = TaskIdentifiers.forRefresh(rg); if (transactionsRequest?.includeRefreshes) { required = true; } else if (rg.operationStatus !== RefreshOperationStatus.Finished) { @@ -1195,7 +1200,7 @@ export async function getTransactions( return; } - const opId = RetryTags.forWithdrawal(wsr); + const opId = TaskIdentifiers.forWithdrawal(wsr); const ort = await tx.operationRetries.get(opId); switch (wsr.wgInfo.withdrawalType) { @@ -1238,7 +1243,7 @@ export async function getTransactions( if (shouldSkipCurrency(transactionsRequest, amount.currency)) { return; } - const opId = RetryTags.forDeposit(dg); + const opId = TaskIdentifiers.forDeposit(dg); const retryRecord = await tx.operationRetries.get(opId); transactions.push(buildTransactionForDeposit(dg, retryRecord)); @@ -1309,7 +1314,7 @@ export async function getTransactions( ); }); - const payOpId = RetryTags.forPay(purchase); + const payOpId = TaskIdentifiers.forPay(purchase); const payRetryRecord = await tx.operationRetries.get(payOpId); transactions.push( await buildTransactionForPurchase( @@ -1333,7 +1338,7 @@ export async function getTransactions( if (!tipRecord.acceptedTimestamp) { return; } - const opId = RetryTags.forTipPickup(tipRecord); + const opId = TaskIdentifiers.forTipPickup(tipRecord); const retryRecord = await tx.operationRetries.get(opId); transactions.push(buildTransactionForTip(tipRecord, retryRecord)); }); @@ -1359,6 +1364,77 @@ export async function getTransactions( return { transactions: [...txNotPending, ...txPending] }; } +export type ParsedTransactionIdentifier = + | { tag: TransactionType.Deposit; depositGroupId: string } + | { tag: TransactionType.Payment; proposalId: string } + | { tag: TransactionType.PeerPullDebit; peerPullPaymentIncomingId: string } + | { tag: TransactionType.PeerPullCredit; pursePub: string } + | { tag: TransactionType.PeerPushCredit; peerPushPaymentIncomingId: string } + | { tag: TransactionType.PeerPushDebit; pursePub: string } + | { tag: TransactionType.Refresh; refreshGroupId: string } + | { tag: TransactionType.Refund; proposalId: string; executionTime: string } + | { tag: TransactionType.Tip; walletTipId: string } + | { tag: TransactionType.Withdrawal; withdrawalGroupId: string }; + +/** + * Parse a transaction identifier string into a typed, structured representation. + */ +export function parseTransactionIdentifier( + transactionId: string, +): ParsedTransactionIdentifier | undefined { + const { type, args: rest } = parseId("any", transactionId); + + switch (type) { + case TransactionType.Deposit: + return { tag: TransactionType.Deposit, depositGroupId: rest[0] }; + case TransactionType.Payment: + return { tag: TransactionType.Payment, proposalId: rest[0] }; + case TransactionType.PeerPullCredit: + return { tag: TransactionType.PeerPullCredit, pursePub: rest[0] }; + case TransactionType.PeerPullDebit: + return { + tag: TransactionType.PeerPullDebit, + peerPullPaymentIncomingId: rest[0], + }; + case TransactionType.PeerPushCredit: + return { + tag: TransactionType.PeerPushCredit, + peerPushPaymentIncomingId: rest[0], + }; + case TransactionType.PeerPushDebit: + return { tag: TransactionType.PeerPushDebit, pursePub: rest[0] }; + case TransactionType.Refresh: + return { tag: TransactionType.Refresh, refreshGroupId: rest[0] }; + case TransactionType.Refund: + return { + tag: TransactionType.Refund, + proposalId: rest[0], + executionTime: rest[1], + }; + case TransactionType.Tip: + return { + tag: TransactionType.Tip, + walletTipId: rest[0], + }; + case TransactionType.Withdrawal: + return { + tag: TransactionType.Withdrawal, + withdrawalGroupId: rest[0], + }; + default: + return undefined; + } +} + +export function stopLongpolling(ws: InternalWalletState, taskId: string) { + const longpoll = ws.activeLongpoll[taskId]; + if (longpoll) { + logger.info(`cancelling long-polling for ${taskId}`); + longpoll.cancel(); + delete ws.activeLongpoll[taskId]; + } +} + /** * Immediately retry the underlying operation * of a transaction. @@ -1369,34 +1445,86 @@ export async function retryTransaction( ): Promise<void> { logger.info(`retrying transaction ${transactionId}`); - const { type, args: rest } = parseId("any", transactionId); + const parsedTx = parseTransactionIdentifier(transactionId); - switch (type) { + if (!parsedTx) { + throw Error("invalid transaction identifier"); + } + + // FIXME: We currently don't cancel active long-polling tasks here. + + switch (parsedTx.tag) { + case TransactionType.PeerPullCredit: { + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.PeerPullInitiation, + pursePub: parsedTx.pursePub, + }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processPeerPullCredit(ws, parsedTx.pursePub), + ); + break; + } case TransactionType.Deposit: { - const depositGroupId = rest[0]; - processDepositGroup(ws, depositGroupId, { - forceNow: true, + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Deposit, + depositGroupId: parsedTx.depositGroupId, }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processDepositGroup(ws, parsedTx.depositGroupId), + ); break; } case TransactionType.Withdrawal: { - const withdrawalGroupId = rest[0]; - await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true }); + // FIXME: Abort current long-poller! + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Withdraw, + withdrawalGroupId: parsedTx.withdrawalGroupId, + }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processWithdrawalGroup(ws, parsedTx.withdrawalGroupId), + ); break; } case TransactionType.Payment: { - const proposalId = rest[0]; - await processPurchasePay(ws, proposalId, { forceNow: true }); + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Purchase, + proposalId: parsedTx.proposalId, + }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processPurchasePay(ws, parsedTx.proposalId), + ); break; } case TransactionType.Tip: { - const walletTipId = rest[0]; - await processTip(ws, walletTipId, { forceNow: true }); + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.TipPickup, + walletTipId: parsedTx.walletTipId, + }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processTip(ws, parsedTx.walletTipId), + ); break; } case TransactionType.Refresh: { - const refreshGroupId = rest[0]; - await processRefreshGroup(ws, refreshGroupId, { forceNow: true }); + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.Refresh, + refreshGroupId: parsedTx.refreshGroupId, + }); + await resetOperationTimeout(ws, taskId); + stopLongpolling(ws, taskId); + await runOperationWithErrorReporting(ws, taskId, () => + processRefreshGroup(ws, parsedTx.refreshGroupId), + ); break; } default: |