diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/pay-merchant.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/pay-merchant.ts | 647 |
1 files changed, 315 insertions, 332 deletions
diff --git a/packages/taler-wallet-core/src/operations/pay-merchant.ts b/packages/taler-wallet-core/src/operations/pay-merchant.ts index 1039ac95e..260fc815a 100644 --- a/packages/taler-wallet-core/src/operations/pay-merchant.ts +++ b/packages/taler-wallet-core/src/operations/pay-merchant.ts @@ -114,7 +114,8 @@ import { timestampPreciseToDb, timestampProtocolFromDb, timestampProtocolToDb, - WalletDbReadWriteTransactionArr, + WalletDbReadOnlyTransaction, + WalletDbReadWriteTransaction, } from "../index.js"; import { EXCHANGE_COINS_LOCK, @@ -123,11 +124,7 @@ import { import { assertUnreachable } from "../util/assertUnreachable.js"; import { PreviousPayCoins, selectPayCoinsNew } from "../util/coinSelection.js"; import { checkDbInvariant } from "../util/invariants.js"; -import { - DbReadWriteTransactionArr, - GetReadOnlyAccess, - StoreNames, -} from "../util/query.js"; +import { DbReadWriteTransaction, StoreNames } from "../util/query.js"; import { constructTaskIdentifier, DbRetryInfo, @@ -197,7 +194,7 @@ export class PayMerchantTransactionContext implements TransactionContext { opts: { extraStores: StoreNameArray }, f: ( rec: PurchaseRecord, - tx: DbReadWriteTransactionArr< + tx: DbReadWriteTransaction< typeof WalletStoresV1, ["purchases", ...StoreNameArray] >, @@ -233,29 +230,27 @@ export class PayMerchantTransactionContext implements TransactionContext { async deleteTransaction(): Promise<void> { const { ws, proposalId } = this; - await ws.db - .mktx((x) => [x.purchases, x.tombstones]) - .runReadWrite(async (tx) => { - let found = false; - const purchase = await tx.purchases.get(proposalId); - if (purchase) { - found = true; - await tx.purchases.delete(proposalId); - } - if (found) { - await tx.tombstones.put({ - id: TombstoneTag.DeletePayment + ":" + proposalId, - }); - } - }); + await ws.db.runReadWriteTx(["purchases", "tombstones"], async (tx) => { + let found = false; + const purchase = await tx.purchases.get(proposalId); + if (purchase) { + found = true; + await tx.purchases.delete(proposalId); + } + if (found) { + await tx.tombstones.put({ + id: TombstoneTag.DeletePayment + ":" + proposalId, + }); + } + }); } async suspendTransaction(): Promise<void> { const { ws, proposalId, transactionId } = this; ws.taskScheduler.stopShepherdTask(this.taskId); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if (!purchase) { throw Error("purchase not found"); @@ -268,7 +263,8 @@ export class PayMerchantTransactionContext implements TransactionContext { await tx.purchases.put(purchase); const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -341,9 +337,9 @@ export class PayMerchantTransactionContext implements TransactionContext { async resumeTransaction(): Promise<void> { const { ws, proposalId, transactionId, taskId: retryTag } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if (!purchase) { throw Error("purchase not found"); @@ -356,23 +352,24 @@ export class PayMerchantTransactionContext implements TransactionContext { await tx.purchases.put(purchase); const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(): Promise<void> { const { ws, proposalId, transactionId } = this; - const transitionInfo = await ws.db - .mktx((x) => [ - x.purchases, - x.refreshGroups, - x.denominations, - x.coinAvailability, - x.coins, - x.operationRetries, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + [ + "purchases", + "refreshGroups", + "denominations", + "coinAvailability", + "coins", + "operationRetries", + ], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if (!purchase) { throw Error("purchase not found"); @@ -390,7 +387,8 @@ export class PayMerchantTransactionContext implements TransactionContext { } const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.stopShepherdTask(this.taskId); } @@ -410,17 +408,15 @@ export class RefundTransactionContext implements TransactionContext { async deleteTransaction(): Promise<void> { const { ws, refundGroupId, transactionId } = this; - await ws.db - .mktx((x) => [x.refundGroups, x.tombstones]) - .runReadWrite(async (tx) => { - const refundRecord = await tx.refundGroups.get(refundGroupId); - if (!refundRecord) { - return; - } - await tx.refundGroups.delete(refundGroupId); - await tx.tombstones.put({ id: transactionId }); - // FIXME: Also tombstone the refund items, so that they won't reappear. - }); + await ws.db.runReadWriteTx(["refundGroups", "tombstones"], async (tx) => { + const refundRecord = await tx.refundGroups.get(refundGroupId); + if (!refundRecord) { + return; + } + await tx.refundGroups.delete(refundGroupId); + await tx.tombstones.put({ id: transactionId }); + // FIXME: Also tombstone the refund items, so that they won't reappear. + }); } suspendTransaction(): Promise<void> { @@ -452,46 +448,44 @@ export async function getTotalPaymentCost( pcs: PayCoinSelection, ): Promise<AmountJson> { const currency = Amounts.currencyOf(pcs.paymentAmount); - return ws.db - .mktx((x) => [x.coins, x.denominations]) - .runReadOnly(async (tx) => { - const costs: AmountJson[] = []; - for (let i = 0; i < pcs.coinPubs.length; i++) { - const coin = await tx.coins.get(pcs.coinPubs[i]); - if (!coin) { - throw Error("can't calculate payment cost, coin not found"); - } - const denom = await tx.denominations.get([ - coin.exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - throw Error( - "can't calculate payment cost, denomination for coin not found", - ); - } - const allDenoms = await getCandidateWithdrawalDenomsTx( - ws, - tx, - coin.exchangeBaseUrl, - currency, - ); - const amountLeft = Amounts.sub( - denom.value, - pcs.coinContributions[i], - ).amount; - const refreshCost = getTotalRefreshCost( - allDenoms, - DenominationRecord.toDenomInfo(denom), - amountLeft, - ws.config.testing.denomselAllowLate, + return ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => { + const costs: AmountJson[] = []; + for (let i = 0; i < pcs.coinPubs.length; i++) { + const coin = await tx.coins.get(pcs.coinPubs[i]); + if (!coin) { + throw Error("can't calculate payment cost, coin not found"); + } + const denom = await tx.denominations.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + throw Error( + "can't calculate payment cost, denomination for coin not found", ); - costs.push(Amounts.parseOrThrow(pcs.coinContributions[i])); - costs.push(refreshCost); } - const zero = Amounts.zeroOfAmount(pcs.paymentAmount); - return Amounts.sum([zero, ...costs]).amount; - }); + const allDenoms = await getCandidateWithdrawalDenomsTx( + ws, + tx, + coin.exchangeBaseUrl, + currency, + ); + const amountLeft = Amounts.sub( + denom.value, + pcs.coinContributions[i], + ).amount; + const refreshCost = getTotalRefreshCost( + allDenoms, + DenominationRecord.toDenomInfo(denom), + amountLeft, + ws.config.testing.denomselAllowLate, + ); + costs.push(Amounts.parseOrThrow(pcs.coinContributions[i])); + costs.push(refreshCost); + } + const zero = Amounts.zeroOfAmount(pcs.paymentAmount); + return Amounts.sum([zero, ...costs]).amount; + }); } async function failProposalPermanently( @@ -503,9 +497,9 @@ async function failProposalPermanently( tag: TransactionType.Payment, proposalId, }); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(proposalId); if (!p) { return; @@ -516,7 +510,8 @@ async function failProposalPermanently( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -543,9 +538,7 @@ function getPayRequestTimeout(purchase: PurchaseRecord): Duration { export async function expectProposalDownload( ws: InternalWalletState, p: PurchaseRecord, - parentTx?: GetReadOnlyAccess<{ - contractTerms: typeof WalletStoresV1.contractTerms; - }>, + parentTx?: WalletDbReadOnlyTransaction<["contractTerms"]>, ): Promise<{ contractData: WalletContractData; contractTermsRaw: any; @@ -577,9 +570,7 @@ export async function expectProposalDownload( if (parentTx) { return getFromTransaction(parentTx); } - return await ws.db - .mktx((x) => [x.contractTerms]) - .runReadOnly(getFromTransaction); + return await ws.db.runReadOnlyTx(["contractTerms"], getFromTransaction); } export function extractContractData( @@ -626,11 +617,9 @@ async function processDownloadProposal( ws: InternalWalletState, proposalId: string, ): Promise<TaskRunResult> { - const proposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return await tx.purchases.get(proposalId); - }); + const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return await tx.purchases.get(proposalId); + }); if (!proposal) { return TaskRunResult.finished(); @@ -666,11 +655,12 @@ async function processDownloadProposal( } const opId = TaskIdentifiers.forPay(proposal); - const retryRecord = await ws.db - .mktx((x) => [x.operationRetries]) - .runReadOnly(async (tx) => { + const retryRecord = await ws.db.runReadOnlyTx( + ["operationRetries"], + async (tx) => { return tx.operationRetries.get(opId); - }); + }, + ); const httpResponse = await ws.http.fetch(orderClaimUrl, { method: "POST", @@ -807,9 +797,9 @@ async function processDownloadProposal( logger.trace(`extracted contract data: ${j2s(contractData)}`); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases, x.contractTerms]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases", "contractTerms"], + async (tx) => { const p = await tx.purchases.get(proposalId); if (!p) { return; @@ -855,7 +845,8 @@ async function processDownloadProposal( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); @@ -875,14 +866,12 @@ async function createOrReusePurchase( claimToken: string | undefined, noncePriv: string | undefined, ): Promise<string> { - const oldProposals = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.indexes.byUrlAndOrderId.getAll([ - merchantBaseUrl, - orderId, - ]); - }); + const oldProposals = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.indexes.byUrlAndOrderId.getAll([ + merchantBaseUrl, + orderId, + ]); + }); const oldProposal = oldProposals.find((p) => { return ( @@ -911,9 +900,9 @@ async function createOrReusePurchase( if (paid) { // if this transaction was shared and the order is paid then it // means that another wallet already paid the proposal - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(oldProposal.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -924,7 +913,8 @@ async function createOrReusePurchase( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Payment, @@ -976,9 +966,9 @@ async function createOrReusePurchase( shared: shared, }; - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { await tx.purchases.put(proposalRecord); const oldTxState: TransactionState = { major: TransactionMajorState.None, @@ -988,7 +978,8 @@ async function createOrReusePurchase( oldTxState, newTxState, }; - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Payment, @@ -1009,9 +1000,9 @@ async function storeFirstPaySuccess( proposalId, }); const now = AbsoluteTime.toPreciseTimestamp(AbsoluteTime.now()); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases, x.contractTerms]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["contractTerms", "purchases"], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if (!purchase) { @@ -1059,7 +1050,8 @@ async function storeFirstPaySuccess( oldTxState, newTxState, }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -1072,9 +1064,9 @@ async function storePayReplaySuccess( tag: TransactionType.Payment, proposalId, }); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if (!purchase) { @@ -1096,7 +1088,8 @@ async function storePayReplaySuccess( await tx.purchases.put(purchase); const newTxState = computePayMerchantTransactionState(purchase); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -1115,11 +1108,9 @@ async function handleInsufficientFunds( ): Promise<void> { logger.trace("handling insufficient funds, trying to re-select coins"); - const proposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!proposal) { return; } @@ -1156,34 +1147,32 @@ async function handleInsufficientFunds( const payCoinSelection = payInfo.payCoinSelection; - await ws.db - .mktx((x) => [x.coins, x.denominations]) - .runReadOnly(async (tx) => { - for (let i = 0; i < payCoinSelection.coinPubs.length; i++) { - const coinPub = payCoinSelection.coinPubs[i]; - if (coinPub === brokenCoinPub) { - continue; - } - const contrib = payCoinSelection.coinContributions[i]; - const coin = await tx.coins.get(coinPub); - if (!coin) { - continue; - } - const denom = await tx.denominations.get([ - coin.exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - continue; - } - prevPayCoins.push({ - coinPub, - contribution: Amounts.parseOrThrow(contrib), - exchangeBaseUrl: coin.exchangeBaseUrl, - feeDeposit: Amounts.parseOrThrow(denom.fees.feeDeposit), - }); + await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => { + for (let i = 0; i < payCoinSelection.coinPubs.length; i++) { + const coinPub = payCoinSelection.coinPubs[i]; + if (coinPub === brokenCoinPub) { + continue; } - }); + const contrib = payCoinSelection.coinContributions[i]; + const coin = await tx.coins.get(coinPub); + if (!coin) { + continue; + } + const denom = await tx.denominations.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + continue; + } + prevPayCoins.push({ + coinPub, + contribution: Amounts.parseOrThrow(contrib), + exchangeBaseUrl: coin.exchangeBaseUrl, + feeDeposit: Amounts.parseOrThrow(denom.fees.feeDeposit), + }); + } + }); const res = await selectPayCoinsNew(ws, { auditors: [], @@ -1204,15 +1193,15 @@ async function handleInsufficientFunds( logger.trace("re-selected coins"); - await ws.db - .mktx((x) => [ - x.purchases, - x.coins, - x.coinAvailability, - x.denominations, - x.refreshGroups, - ]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + [ + "purchases", + "coins", + "coinAvailability", + "denominations", + "refreshGroups", + ], + async (tx) => { const p = await tx.purchases.get(proposalId); if (!p) { return; @@ -1236,7 +1225,8 @@ async function handleInsufficientFunds( ), refreshReason: RefreshReason.PayMerchant, }); - }); + }, + ); ws.notify({ type: NotificationType.BalanceChange, @@ -1255,11 +1245,9 @@ async function checkPaymentByProposalId( proposalId: string, sessionId?: string, ): Promise<PreparePayResult> { - let proposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + let proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!proposal) { throw Error(`could not get proposal ${proposalId}`); } @@ -1267,11 +1255,12 @@ async function checkPaymentByProposalId( const existingProposalId = proposal.repurchaseProposalId; if (existingProposalId) { logger.trace("using existing purchase for same product"); - const oldProposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { + const oldProposal = await ws.db.runReadOnlyTx( + ["purchases"], + async (tx) => { return tx.purchases.get(existingProposalId); - }); + }, + ); if (oldProposal) { proposal = oldProposal; } @@ -1299,11 +1288,9 @@ async function checkPaymentByProposalId( }); // First check if we already paid for it. - const purchase = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if ( !purchase || @@ -1363,9 +1350,9 @@ async function checkPaymentByProposalId( "automatically re-submitting payment with different session ID", ); logger.trace(`last: ${purchase.lastSessionId}, current: ${sessionId}`); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(proposalId); if (!p) { return; @@ -1376,7 +1363,8 @@ async function checkPaymentByProposalId( await tx.purchases.put(p); const newTxState = computePayMerchantTransactionState(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(ctx.taskId); @@ -1440,11 +1428,9 @@ export async function getContractTermsDetails( ws: InternalWalletState, proposalId: string, ): Promise<WalletContractData> { - const proposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!proposal) { throw Error(`proposal with id ${proposalId} not found`); @@ -1630,26 +1616,24 @@ export async function generateDepositPermissions( coin: CoinRecord; denom: DenominationRecord; }> = []; - await ws.db - .mktx((x) => [x.coins, x.denominations]) - .runReadOnly(async (tx) => { - for (let i = 0; i < payCoinSel.coinPubs.length; i++) { - const coin = await tx.coins.get(payCoinSel.coinPubs[i]); - if (!coin) { - throw Error("can't pay, allocated coin not found anymore"); - } - const denom = await tx.denominations.get([ - coin.exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - throw Error( - "can't pay, denomination of allocated coin not found anymore", - ); - } - coinWithDenom.push({ coin, denom }); + await ws.db.runReadOnlyTx(["coins", "denominations"], async (tx) => { + for (let i = 0; i < payCoinSel.coinPubs.length; i++) { + const coin = await tx.coins.get(payCoinSel.coinPubs[i]); + if (!coin) { + throw Error("can't pay, allocated coin not found anymore"); } - }); + const denom = await tx.denominations.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + throw Error( + "can't pay, denomination of allocated coin not found anymore", + ); + } + coinWithDenom.push({ coin, denom }); + } + }); for (let i = 0; i < payCoinSel.coinPubs.length; i++) { const { coin, denom } = coinWithDenom[i]; @@ -1805,11 +1789,9 @@ export async function confirmPay( logger.trace( `executing confirmPay with proposalId ${proposalId} and sessionIdOverride ${sessionIdOverride}`, ); - const proposal = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const proposal = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!proposal) { throw Error(`proposal with id ${proposalId} not found`); @@ -1820,9 +1802,9 @@ export async function confirmPay( throw Error("proposal is in invalid state"); } - const existingPurchase = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const existingPurchase = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const purchase = await tx.purchases.get(proposalId); if ( purchase && @@ -1837,7 +1819,8 @@ export async function confirmPay( await tx.purchases.put(purchase); } return purchase; - }); + }, + ); if (existingPurchase && existingPurchase.payInfo) { logger.trace("confirmPay: submitting payment for existing purchase"); @@ -1890,15 +1873,15 @@ export async function confirmPay( `recording payment on ${proposal.orderId} with session ID ${sessionId}`, ); - const transitionInfo = await ws.db - .mktx((x) => [ - x.purchases, - x.coins, - x.refreshGroups, - x.denominations, - x.coinAvailability, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + [ + "purchases", + "coins", + "refreshGroups", + "denominations", + "coinAvailability", + ], + async (tx) => { const p = await tx.purchases.get(proposal.proposalId); if (!p) { return; @@ -1936,7 +1919,8 @@ export async function confirmPay( } const newTxState = computePayMerchantTransactionState(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.notify({ @@ -1952,11 +1936,9 @@ export async function processPurchase( ws: InternalWalletState, proposalId: string, ): Promise<TaskRunResult> { - const purchase = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!purchase) { return { type: TaskRunResultType.Error, @@ -2013,11 +1995,9 @@ async function processPurchasePay( ws: InternalWalletState, proposalId: string, ): Promise<TaskRunResult> { - const purchase = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { - return tx.purchases.get(proposalId); - }); + const purchase = await ws.db.runReadOnlyTx(["purchases"], async (tx) => { + return tx.purchases.get(proposalId); + }); if (!purchase) { return { type: TaskRunResultType.Error, @@ -2051,9 +2031,9 @@ async function processPurchasePay( const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); if (paid) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2064,7 +2044,8 @@ async function processPurchasePay( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Payment, proposalId, @@ -2213,9 +2194,9 @@ export async function refuseProposal( tag: TransactionType.Payment, proposalId, }); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const proposal = await tx.purchases.get(proposalId); if (!proposal) { logger.trace(`proposal ${proposalId} not found, won't refuse proposal`); @@ -2232,7 +2213,8 @@ export async function refuseProposal( const newTxState = computePayMerchantTransactionState(proposal); await tx.purchases.put(proposal); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -2476,36 +2458,34 @@ export async function sharePayment( merchantBaseUrl: string, orderId: string, ): Promise<SharePaymentResult> { - const result = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { - const p = await tx.purchases.indexes.byUrlAndOrderId.get([ - merchantBaseUrl, - orderId, - ]); - if (!p) { - logger.warn("purchase does not exist anymore"); - return undefined; - } - if ( - p.purchaseStatus !== PurchaseStatus.DialogProposed && - p.purchaseStatus !== PurchaseStatus.DialogShared - ) { - // FIXME: purchase can be shared before being paid - return undefined; - } - if (p.purchaseStatus === PurchaseStatus.DialogProposed) { - p.purchaseStatus = PurchaseStatus.DialogShared; - p.shared = true; - tx.purchases.put(p); - } + const result = await ws.db.runReadWriteTx(["purchases"], async (tx) => { + const p = await tx.purchases.indexes.byUrlAndOrderId.get([ + merchantBaseUrl, + orderId, + ]); + if (!p) { + logger.warn("purchase does not exist anymore"); + return undefined; + } + if ( + p.purchaseStatus !== PurchaseStatus.DialogProposed && + p.purchaseStatus !== PurchaseStatus.DialogShared + ) { + // FIXME: purchase can be shared before being paid + return undefined; + } + if (p.purchaseStatus === PurchaseStatus.DialogProposed) { + p.purchaseStatus = PurchaseStatus.DialogShared; + p.shared = true; + tx.purchases.put(p); + } - return { - nonce: p.noncePriv, - session: p.lastSessionId ?? p.downloadSessionId, - token: p.claimToken, - }; - }); + return { + nonce: p.noncePriv, + session: p.lastSessionId ?? p.downloadSessionId, + token: p.claimToken, + }; + }); if (result === undefined) { throw Error("This purchase can't be shared"); @@ -2560,9 +2540,9 @@ async function processPurchaseDialogShared( const paid = await checkIfOrderIsAlreadyPaid(ws, download.contractData); if (paid) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2573,7 +2553,8 @@ async function processPurchaseDialogShared( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); const transactionId = constructTransactionIdentifier({ tag: TransactionType.Payment, proposalId, @@ -2612,9 +2593,9 @@ async function processPurchaseAutoRefund( ), ) ) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2629,7 +2610,8 @@ async function processPurchaseAutoRefund( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.finished(); } @@ -2656,9 +2638,9 @@ async function processPurchaseAutoRefund( ); if (orderStatus.refund_pending) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2672,7 +2654,8 @@ async function processPurchaseAutoRefund( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); } @@ -2699,22 +2682,18 @@ async function processPurchaseAbortingRefund( throw Error("can't abort, no coins selected"); } - await ws.db - .mktx((x) => [x.coins]) - .runReadOnly(async (tx) => { - for (let i = 0; i < payCoinSelection.coinPubs.length; i++) { - const coinPub = payCoinSelection.coinPubs[i]; - const coin = await tx.coins.get(coinPub); - checkDbInvariant(!!coin, "expected coin to be present"); - abortingCoins.push({ - coin_pub: coinPub, - contribution: Amounts.stringify( - payCoinSelection.coinContributions[i], - ), - exchange_url: coin.exchangeBaseUrl, - }); - } - }); + await ws.db.runReadOnlyTx(["coins"], async (tx) => { + for (let i = 0; i < payCoinSelection.coinPubs.length; i++) { + const coinPub = payCoinSelection.coinPubs[i]; + const coin = await tx.coins.get(coinPub); + checkDbInvariant(!!coin, "expected coin to be present"); + abortingCoins.push({ + coin_pub: coinPub, + contribution: Amounts.stringify(payCoinSelection.coinContributions[i]), + exchange_url: coin.exchangeBaseUrl, + }); + } + }); const abortReq: AbortRequest = { h_contract: download.contractData.contractTermsHash, @@ -2805,9 +2784,9 @@ async function processPurchaseQueryRefund( }); if (!orderStatus.refund_pending) { - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2822,7 +2801,8 @@ async function processPurchaseQueryRefund( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.progress(); } else { @@ -2831,9 +2811,9 @@ async function processPurchaseQueryRefund( Amounts.parseOrThrow(orderStatus.refund_taken), ).amount; - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(purchase.proposalId); if (!p) { logger.warn("purchase does not exist anymore"); @@ -2848,7 +2828,8 @@ async function processPurchaseQueryRefund( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); return TaskRunResult.progress(); } @@ -2897,14 +2878,15 @@ export async function startRefundQueryForUri( if (parsedUri.type !== TalerUriAction.Refund) { throw Error("expected taler://refund URI"); } - const purchaseRecord = await ws.db - .mktx((x) => [x.purchases]) - .runReadOnly(async (tx) => { + const purchaseRecord = await ws.db.runReadOnlyTx( + ["purchases"], + async (tx) => { return tx.purchases.indexes.byUrlAndOrderId.get([ parsedUri.merchantBaseUrl, parsedUri.orderId, ]); - }); + }, + ); if (!purchaseRecord) { logger.error( `no purchase for order ID "${parsedUri.orderId}" from merchant "${parsedUri.merchantBaseUrl}" when processing "${talerUri}"`, @@ -2927,9 +2909,9 @@ export async function startQueryRefund( proposalId: string, ): Promise<void> { const ctx = new PayMerchantTransactionContext(ws, proposalId); - const transitionInfo = await ws.db - .mktx((x) => [x.purchases]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["purchases"], + async (tx) => { const p = await tx.purchases.get(proposalId); if (!p) { logger.warn(`purchase ${proposalId} does not exist anymore`); @@ -2943,14 +2925,15 @@ export async function startQueryRefund( const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, ctx.transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(ctx.taskId); } async function computeRefreshRequest( ws: InternalWalletState, - tx: WalletDbReadWriteTransactionArr<["coins", "denominations"]>, + tx: WalletDbReadWriteTransaction<["coins", "denominations"]>, items: RefundItemRecord[], ): Promise<CoinRefreshRequest[]> { const refreshCoins: CoinRefreshRequest[] = []; |