From 5c26461247040c07c86291babf0c87631df638b5 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Wed, 9 Jun 2021 15:14:17 +0200 Subject: database access refactor --- packages/taler-wallet-core/src/operations/pay.ts | 830 +++++++++++++---------- 1 file changed, 462 insertions(+), 368 deletions(-) (limited to 'packages/taler-wallet-core/src/operations/pay.ts') diff --git a/packages/taler-wallet-core/src/operations/pay.ts b/packages/taler-wallet-core/src/operations/pay.ts index 0b1b30f68..c57243b59 100644 --- a/packages/taler-wallet-core/src/operations/pay.ts +++ b/packages/taler-wallet-core/src/operations/pay.ts @@ -72,9 +72,7 @@ import { readSuccessResponseJsonOrErrorCode, readSuccessResponseJsonOrThrow, readTalerErrorResponse, - Stores, throwUnexpectedRequestError, - TransactionHandle, URL, WalletContractData, } from "../index.js"; @@ -85,7 +83,7 @@ import { selectPayCoins, PreviousPayCoins, } from "../util/coinSelection.js"; -import { canonicalJson, j2s } from "@gnu-taler/taler-util"; +import { j2s } from "@gnu-taler/taler-util"; import { initRetryInfo, updateRetryInfoTimeout, @@ -95,6 +93,10 @@ import { getTotalRefreshCost, createRefreshGroup } from "./refresh.js"; import { InternalWalletState, EXCHANGE_COINS_LOCK } from "./state.js"; import { ContractTermsUtil } from "../util/contractTerms.js"; import { getExchangeDetails } from "./exchanges.js"; +import { DbAccess, GetReadWriteAccess } from "../util/query.js"; +import { WalletStoresV1 } from "../db.js"; +import { Wallet } from "../wallet.js"; +import { x25519_edwards_keyPair_fromSecretKey } from "../crypto/primitives/nacl-fast.js"; /** * Logger. @@ -112,34 +114,35 @@ export async function getTotalPaymentCost( ws: InternalWalletState, pcs: PayCoinSelection, ): Promise { - const costs = []; - for (let i = 0; i < pcs.coinPubs.length; i++) { - const coin = await ws.db.get(Stores.coins, pcs.coinPubs[i]); - if (!coin) { - throw Error("can't calculate payment cost, coin not found"); - } - const denom = await ws.db.get(Stores.denominations, [ - coin.exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - throw Error( - "can't calculate payment cost, denomination for coin not found", - ); - } - const allDenoms = await ws.db - .iterIndex( - Stores.denominations.exchangeBaseUrlIndex, - coin.exchangeBaseUrl, - ) - .toArray(); - const amountLeft = Amounts.sub(denom.value, pcs.coinContributions[i]) - .amount; - const refreshCost = getTotalRefreshCost(allDenoms, denom, amountLeft); - costs.push(pcs.coinContributions[i]); - costs.push(refreshCost); - } - return Amounts.sum(costs).amount; + return ws.db + .mktx((x) => ({ coins: x.coins, denominations: x.denominations })) + .runReadOnly(async (tx) => { + const costs = []; + for (let i = 0; i < pcs.coinPubs.length; i++) { + const coin = await tx.coins.get(pcs.coinPubs[i]); + if (!coin) { + throw Error("can't calculate payment cost, coin not found"); + } + const denom = await tx.denominations.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + throw Error( + "can't calculate payment cost, denomination for coin not found", + ); + } + const allDenoms = await tx.denominations.indexes.byExchangeBaseUrl + .iter() + .toArray(); + const amountLeft = Amounts.sub(denom.value, pcs.coinContributions[i]) + .amount; + const refreshCost = getTotalRefreshCost(allDenoms, denom, amountLeft); + costs.push(pcs.coinContributions[i]); + costs.push(refreshCost); + } + return Amounts.sum(costs).amount; + }); } /** @@ -154,39 +157,48 @@ export async function getEffectiveDepositAmount( const amt: AmountJson[] = []; const fees: AmountJson[] = []; const exchangeSet: Set = new Set(); - for (let i = 0; i < pcs.coinPubs.length; i++) { - const coin = await ws.db.get(Stores.coins, pcs.coinPubs[i]); - if (!coin) { - throw Error("can't calculate deposit amountt, coin not found"); - } - const denom = await ws.db.get(Stores.denominations, [ - coin.exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - throw Error("can't find denomination to calculate deposit amount"); - } - amt.push(pcs.coinContributions[i]); - fees.push(denom.feeDeposit); - exchangeSet.add(coin.exchangeBaseUrl); - } - for (const exchangeUrl of exchangeSet.values()) { - const exchangeDetails = await ws.db.runWithReadTransaction( - [Stores.exchanges, Stores.exchangeDetails], - async (tx) => { - return getExchangeDetails(tx, exchangeUrl); - }, - ); - if (!exchangeDetails) { - continue; - } - const fee = exchangeDetails.wireInfo.feesForType[wireType].find((x) => { - return timestampIsBetween(getTimestampNow(), x.startStamp, x.endStamp); - })?.wireFee; - if (fee) { - fees.push(fee); - } - } + + await ws.db + .mktx((x) => ({ + coins: x.coins, + denominations: x.denominations, + exchanges: x.exchanges, + exchangeDetails: x.exchangeDetails, + })) + .runReadOnly(async (tx) => { + for (let i = 0; i < pcs.coinPubs.length; i++) { + const coin = await tx.coins.get(pcs.coinPubs[i]); + if (!coin) { + throw Error("can't calculate deposit amountt, coin not found"); + } + const denom = await tx.denominations.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + throw Error("can't find denomination to calculate deposit amount"); + } + amt.push(pcs.coinContributions[i]); + fees.push(denom.feeDeposit); + exchangeSet.add(coin.exchangeBaseUrl); + } + for (const exchangeUrl of exchangeSet.values()) { + const exchangeDetails = await getExchangeDetails(tx, exchangeUrl); + if (!exchangeDetails) { + continue; + } + const fee = exchangeDetails.wireInfo.feesForType[wireType].find((x) => { + return timestampIsBetween( + getTimestampNow(), + x.startStamp, + x.endStamp, + ); + })?.wireFee; + if (fee) { + fees.push(fee); + } + } + }); return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount; } @@ -243,105 +255,112 @@ export async function getCandidatePayCoins( const candidateCoins: AvailableCoinInfo[] = []; const wireFeesPerExchange: Record = {}; - const exchanges = await ws.db.iter(Stores.exchanges).toArray(); - for (const exchange of exchanges) { - let isOkay = false; - const exchangeDetails = await ws.db.runWithReadTransaction( - [Stores.exchanges, Stores.exchangeDetails], - async (tx) => { - return getExchangeDetails(tx, exchange.baseUrl); - }, - ); - if (!exchangeDetails) { - continue; - } - const exchangeFees = exchangeDetails.wireInfo; - if (!exchangeFees) { - continue; - } - - // is the exchange explicitly allowed? - for (const allowedExchange of req.allowedExchanges) { - if (allowedExchange.exchangePub === exchangeDetails.masterPublicKey) { - isOkay = true; - break; - } - } + await ws.db + .mktx((x) => ({ + exchanges: x.exchanges, + exchangeDetails: x.exchangeDetails, + denominations: x.denominations, + coins: x.coins, + })) + .runReadOnly(async (tx) => { + const exchanges = await tx.exchanges.iter().toArray(); + for (const exchange of exchanges) { + let isOkay = false; + const exchangeDetails = await getExchangeDetails(tx, exchange.baseUrl); + if (!exchangeDetails) { + continue; + } + const exchangeFees = exchangeDetails.wireInfo; + if (!exchangeFees) { + continue; + } - // is the exchange allowed because of one of its auditors? - if (!isOkay) { - for (const allowedAuditor of req.allowedAuditors) { - for (const auditor of exchangeDetails.auditors) { - if (auditor.auditor_pub === allowedAuditor.auditorPub) { + // is the exchange explicitly allowed? + for (const allowedExchange of req.allowedExchanges) { + if (allowedExchange.exchangePub === exchangeDetails.masterPublicKey) { isOkay = true; break; } } - if (isOkay) { - break; + + // is the exchange allowed because of one of its auditors? + if (!isOkay) { + for (const allowedAuditor of req.allowedAuditors) { + for (const auditor of exchangeDetails.auditors) { + if (auditor.auditor_pub === allowedAuditor.auditorPub) { + isOkay = true; + break; + } + } + if (isOkay) { + break; + } + } } - } - } - if (!isOkay) { - continue; - } + if (!isOkay) { + continue; + } - const coins = await ws.db - .iterIndex(Stores.coins.exchangeBaseUrlIndex, exchange.baseUrl) - .toArray(); + const coins = await tx.coins.indexes.byBaseUrl + .iter(exchange.baseUrl) + .toArray(); - if (!coins || coins.length === 0) { - continue; - } + if (!coins || coins.length === 0) { + continue; + } - // Denomination of the first coin, we assume that all other - // coins have the same currency - const firstDenom = await ws.db.get(Stores.denominations, [ - exchange.baseUrl, - coins[0].denomPubHash, - ]); - if (!firstDenom) { - throw Error("db inconsistent"); - } - const currency = firstDenom.value.currency; - for (const coin of coins) { - const denom = await ws.db.get(Stores.denominations, [ - exchange.baseUrl, - coin.denomPubHash, - ]); - if (!denom) { - throw Error("db inconsistent"); - } - if (denom.value.currency !== currency) { - logger.warn( - `same pubkey for different currencies at exchange ${exchange.baseUrl}`, - ); - continue; - } - if (!isSpendableCoin(coin, denom)) { - continue; - } - candidateCoins.push({ - availableAmount: coin.currentAmount, - coinPub: coin.coinPub, - denomPub: coin.denomPub, - feeDeposit: denom.feeDeposit, - exchangeBaseUrl: denom.exchangeBaseUrl, - }); - } + // Denomination of the first coin, we assume that all other + // coins have the same currency + const firstDenom = await tx.denominations.get([ + exchange.baseUrl, + coins[0].denomPubHash, + ]); + if (!firstDenom) { + throw Error("db inconsistent"); + } + const currency = firstDenom.value.currency; + for (const coin of coins) { + const denom = await tx.denominations.get([ + exchange.baseUrl, + coin.denomPubHash, + ]); + if (!denom) { + throw Error("db inconsistent"); + } + if (denom.value.currency !== currency) { + logger.warn( + `same pubkey for different currencies at exchange ${exchange.baseUrl}`, + ); + continue; + } + if (!isSpendableCoin(coin, denom)) { + continue; + } + candidateCoins.push({ + availableAmount: coin.currentAmount, + coinPub: coin.coinPub, + denomPub: coin.denomPub, + feeDeposit: denom.feeDeposit, + exchangeBaseUrl: denom.exchangeBaseUrl, + }); + } - let wireFee: AmountJson | undefined; - for (const fee of exchangeFees.feesForType[req.wireMethod] || []) { - if (fee.startStamp <= req.timestamp && fee.endStamp >= req.timestamp) { - wireFee = fee.wireFee; - break; + let wireFee: AmountJson | undefined; + for (const fee of exchangeFees.feesForType[req.wireMethod] || []) { + if ( + fee.startStamp <= req.timestamp && + fee.endStamp >= req.timestamp + ) { + wireFee = fee.wireFee; + break; + } + } + if (wireFee) { + wireFeesPerExchange[exchange.baseUrl] = wireFee; + } } - } - if (wireFee) { - wireFeesPerExchange[exchange.baseUrl] = wireFee; - } - } + }); return { candidateCoins, @@ -351,15 +370,15 @@ export async function getCandidatePayCoins( export async function applyCoinSpend( ws: InternalWalletState, - tx: TransactionHandle< - | typeof Stores.coins - | typeof Stores.refreshGroups - | typeof Stores.denominations - >, + tx: GetReadWriteAccess<{ + coins: typeof WalletStoresV1.coins; + refreshGroups: typeof WalletStoresV1.refreshGroups; + denominations: typeof WalletStoresV1.denominations; + }>, coinSelection: PayCoinSelection, ) { for (let i = 0; i < coinSelection.coinPubs.length; i++) { - const coin = await tx.get(Stores.coins, coinSelection.coinPubs[i]); + const coin = await tx.coins.get(coinSelection.coinPubs[i]); if (!coin) { throw Error("coin allocated for payment doesn't exist anymore"); } @@ -379,7 +398,7 @@ export async function applyCoinSpend( throw Error("not enough remaining balance on coin for payment"); } coin.currentAmount = remaining.amount; - await tx.put(Stores.coins, coin); + await tx.coins.put(coin); } const refreshCoinPubs = coinSelection.coinPubs.map((x) => ({ coinPub: x, @@ -437,26 +456,25 @@ async function recordConfirmPay( noncePub: proposal.noncePub, }; - await ws.db.runWithWriteTransaction( - [ - Stores.coins, - Stores.purchases, - Stores.proposals, - Stores.refreshGroups, - Stores.denominations, - ], - async (tx) => { - const p = await tx.get(Stores.proposals, proposal.proposalId); + await ws.db + .mktx((x) => ({ + proposals: x.proposals, + purchases: x.purchases, + coins: x.coins, + refreshGroups: x.refreshGroups, + denominations: x.denominations, + })) + .runReadWrite(async (tx) => { + const p = await tx.proposals.get(proposal.proposalId); if (p) { p.proposalStatus = ProposalStatus.ACCEPTED; p.lastError = undefined; p.retryInfo = initRetryInfo(false); - await tx.put(Stores.proposals, p); + await tx.proposals.put(p); } - await tx.put(Stores.purchases, t); + await tx.purchases.put(t); await applyCoinSpend(ws, tx, coinSelection); - }, - ); + }); ws.notify({ type: NotificationType.ProposalAccepted, @@ -470,19 +488,21 @@ async function incrementProposalRetry( proposalId: string, err: TalerErrorDetails | undefined, ): Promise { - await ws.db.runWithWriteTransaction([Stores.proposals], async (tx) => { - const pr = await tx.get(Stores.proposals, proposalId); - if (!pr) { - return; - } - if (!pr.retryInfo) { - return; - } - pr.retryInfo.retryCounter++; - updateRetryInfoTimeout(pr.retryInfo); - pr.lastError = err; - await tx.put(Stores.proposals, pr); - }); + await ws.db + .mktx((x) => ({ proposals: x.proposals })) + .runReadWrite(async (tx) => { + const pr = await tx.proposals.get(proposalId); + if (!pr) { + return; + } + if (!pr.retryInfo) { + return; + } + pr.retryInfo.retryCounter++; + updateRetryInfoTimeout(pr.retryInfo); + pr.lastError = err; + await tx.proposals.put(pr); + }); if (err) { ws.notify({ type: NotificationType.ProposalOperationError, error: err }); } @@ -494,19 +514,21 @@ async function incrementPurchasePayRetry( err: TalerErrorDetails | undefined, ): Promise { logger.warn("incrementing purchase pay retry with error", err); - await ws.db.runWithWriteTransaction([Stores.purchases], async (tx) => { - const pr = await tx.get(Stores.purchases, proposalId); - if (!pr) { - return; - } - if (!pr.payRetryInfo) { - return; - } - pr.payRetryInfo.retryCounter++; - updateRetryInfoTimeout(pr.payRetryInfo); - pr.lastPayError = err; - await tx.put(Stores.purchases, pr); - }); + await ws.db + .mktx((x) => ({ purchases: x.purchases })) + .runReadWrite(async (tx) => { + const pr = await tx.purchases.get(proposalId); + if (!pr) { + return; + } + if (!pr.payRetryInfo) { + return; + } + pr.payRetryInfo.retryCounter++; + updateRetryInfoTimeout(pr.payRetryInfo); + pr.lastPayError = err; + await tx.purchases.put(pr); + }); if (err) { ws.notify({ type: NotificationType.PayOperationError, error: err }); } @@ -529,12 +551,15 @@ async function resetDownloadProposalRetry( ws: InternalWalletState, proposalId: string, ): Promise { - await ws.db.mutate(Stores.proposals, proposalId, (x) => { - if (x.retryInfo.active) { - x.retryInfo = initRetryInfo(); - } - return x; - }); + await ws.db + .mktx((x) => ({ proposals: x.proposals })) + .runReadWrite(async (tx) => { + const p = await tx.proposals.get(proposalId); + if (p && p.retryInfo.active) { + p.retryInfo = initRetryInfo(); + await tx.proposals.put(p); + } + }); } async function failProposalPermanently( @@ -542,12 +567,18 @@ async function failProposalPermanently( proposalId: string, err: TalerErrorDetails, ): Promise { - await ws.db.mutate(Stores.proposals, proposalId, (x) => { - x.retryInfo.active = false; - x.lastError = err; - x.proposalStatus = ProposalStatus.PERMANENTLY_FAILED; - return x; - }); + await ws.db + .mktx((x) => ({ proposals: x.proposals })) + .runReadWrite(async (tx) => { + const p = await tx.proposals.get(proposalId); + if (!p) { + return; + } + p.retryInfo.active = false; + p.lastError = err; + p.proposalStatus = ProposalStatus.PERMANENTLY_FAILED; + await tx.proposals.put(p); + }); } function getProposalRequestTimeout(proposal: ProposalRecord): Duration { @@ -616,7 +647,11 @@ async function processDownloadProposalImpl( if (forceNow) { await resetDownloadProposalRetry(ws, proposalId); } - const proposal = await ws.db.get(Stores.proposals, proposalId); + const proposal = await ws.db + .mktx((x) => ({ proposals: x.proposals })) + .runReadOnly(async (tx) => { + return tx.proposals.get(proposalId); + }); if (!proposal) { return; } @@ -750,10 +785,10 @@ async function processDownloadProposalImpl( proposalResp.sig, ); - await ws.db.runWithWriteTransaction( - [Stores.proposals, Stores.purchases], - async (tx) => { - const p = await tx.get(Stores.proposals, proposalId); + await ws.db + .mktx((x) => ({ proposals: x.proposals, purchases: x.purchases })) + .runReadWrite(async (tx) => { + const p = await tx.proposals.get(proposalId); if (!p) { return; } @@ -769,22 +804,20 @@ async function processDownloadProposalImpl( (fulfillmentUrl.startsWith("http://") || fulfillmentUrl.startsWith("https://")) ) { - const differentPurchase = await tx.getIndexed( - Stores.purchases.fulfillmentUrlIndex, + const differentPurchase = await tx.purchases.indexes.byFulfillmentUrl.get( fulfillmentUrl, ); if (differentPurchase) { logger.warn("repurchase detected"); p.proposalStatus = ProposalStatus.REPURCHASE; p.repurchaseProposalId = differentPurchase.proposalId; - await tx.put(Stores.proposals, p); + await tx.proposals.put(p); return; } } p.proposalStatus = ProposalStatus.PROPOSED; - await tx.put(Stores.proposals, p); - }, - ); + await tx.proposals.put(p); + }); ws.notify({ type: NotificationType.ProposalDownloaded, @@ -806,10 +839,14 @@ async function startDownloadProposal( sessionId: string | undefined, claimToken: string | undefined, ): Promise { - const oldProposal = await ws.db.getIndexed( - Stores.proposals.urlAndOrderIdIndex, - [merchantBaseUrl, orderId], - ); + const oldProposal = await ws.db + .mktx((x) => ({ proposals: x.proposals })) + .runReadOnly(async (tx) => { + return tx.proposals.indexes.byUrlAndOrderId.get([ + merchantBaseUrl, + orderId, + ]); + }); if (oldProposal) { await processDownloadProposal(ws, oldProposal.proposalId); return oldProposal.proposalId; @@ -834,17 +871,19 @@ async function startDownloadProposal( downloadSessionId: sessionId, }; - await ws.db.runWithWriteTransaction([Stores.proposals], async (tx) => { - const existingRecord = await tx.getIndexed( - Stores.proposals.urlAndOrderIdIndex, - [merchantBaseUrl, orderId], - ); - if (existingRecord) { - // Created concurrently - return; - } - await tx.put(Stores.proposals, proposalRecord); - }); + await ws.db + .mktx((x) => ({ proposals: x.proposals })) + .runReadWrite(async (tx) => { + const existingRecord = tx.proposals.indexes.byUrlAndOrderId.get([ + merchantBaseUrl, + orderId, + ]); + if (existingRecord) { + // Created concurrently + return; + } + await tx.proposals.put(proposalRecord); + }); await processDownloadProposal(ws, proposalId); return proposalId; @@ -857,37 +896,38 @@ async function storeFirstPaySuccess( paySig: string, ): Promise { const now = getTimestampNow(); - await ws.db.runWithWriteTransaction([Stores.purchases], async (tx) => { - const purchase = await tx.get(Stores.purchases, proposalId); + await ws.db + .mktx((x) => ({ purchases: x.purchases })) + .runReadWrite(async (tx) => { + const purchase = await tx.purchases.get(proposalId); - if (!purchase) { - logger.warn("purchase does not exist anymore"); - return; - } - const isFirst = purchase.timestampFirstSuccessfulPay === undefined; - if (!isFirst) { - logger.warn("payment success already stored"); - return; - } - purchase.timestampFirstSuccessfulPay = now; - purchase.paymentSubmitPending = false; - purchase.lastPayError = undefined; - purchase.lastSessionId = sessionId; - purchase.payRetryInfo = initRetryInfo(false); - purchase.merchantPaySig = paySig; - if (isFirst) { - const ar = purchase.download.contractData.autoRefund; - if (ar) { - logger.info("auto_refund present"); - purchase.refundQueryRequested = true; - purchase.refundStatusRetryInfo = initRetryInfo(); - purchase.lastRefundStatusError = undefined; - purchase.autoRefundDeadline = timestampAddDuration(now, ar); + if (!purchase) { + logger.warn("purchase does not exist anymore"); + return; } - } - - await tx.put(Stores.purchases, purchase); - }); + const isFirst = purchase.timestampFirstSuccessfulPay === undefined; + if (!isFirst) { + logger.warn("payment success already stored"); + return; + } + purchase.timestampFirstSuccessfulPay = now; + purchase.paymentSubmitPending = false; + purchase.lastPayError = undefined; + purchase.lastSessionId = sessionId; + purchase.payRetryInfo = initRetryInfo(false); + purchase.merchantPaySig = paySig; + if (isFirst) { + const ar = purchase.download.contractData.autoRefund; + if (ar) { + logger.info("auto_refund present"); + purchase.refundQueryRequested = true; + purchase.refundStatusRetryInfo = initRetryInfo(); + purchase.lastRefundStatusError = undefined; + purchase.autoRefundDeadline = timestampAddDuration(now, ar); + } + } + await tx.purchases.put(purchase); + }); } async function storePayReplaySuccess( @@ -895,23 +935,25 @@ async function storePayReplaySuccess( proposalId: string, sessionId: string | undefined, ): Promise { - await ws.db.runWithWriteTransaction([Stores.purchases], async (tx) => { - const purchase = await tx.get(Stores.purchases, proposalId); + await ws.db + .mktx((x) => ({ purchases: x.purchases })) + .runReadWrite(async (tx) => { + const purchase = await tx.purchases.get(proposalId); - if (!purchase) { - logger.warn("purchase does not exist anymore"); - return; - } - const isFirst = purchase.timestampFirstSuccessfulPay === undefined; - if (isFirst) { - throw Error("invalid payment state"); - } - purchase.paymentSubmitPending = false; - purchase.lastPayError = undefined; - purchase.payRetryInfo = initRetryInfo(false); - purchase.lastSessionId = sessionId; - await tx.put(Stores.purchases, purchase); - }); + if (!purchase) { + logger.warn("purchase does not exist anymore"); + return; + } + const isFirst = purchase.timestampFirstSuccessfulPay === undefined; + if (isFirst) { + throw Error("invalid payment state"); + } + purchase.paymentSubmitPending = false; + purchase.lastPayError = undefined; + purchase.payRetryInfo = initRetryInfo(false); + purchase.lastSessionId = sessionId; + await tx.purchases.put(purchase); + }); } /** @@ -929,7 +971,11 @@ async function handleInsufficientFunds( ): Promise { logger.trace("handling insufficient funds, trying to re-select coins"); - const proposal = await ws.db.get(Stores.purchases, proposalId); + const proposal = await ws.db + .mktx((x) => ({ purchaes: x.purchases })) + .runReadOnly(async (tx) => { + return tx.purchaes.get(proposalId); + }); if (!proposal) { return; } @@ -961,30 +1007,34 @@ async function handleInsufficientFunds( const prevPayCoins: PreviousPayCoins = []; - for (let i = 0; i < proposal.payCoinSelection.coinPubs.length; i++) { - const coinPub = proposal.payCoinSelection.coinPubs[i]; - if (coinPub === brokenCoinPub) { - continue; - } - const contrib = proposal.payCoinSelection.coinContributions[i]; - const coin = await ws.db.get(Stores.coins, coinPub); - if (!coin) { - continue; - } - const denom = await ws.db.get(Stores.denominations, [ - coin.exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - continue; - } - prevPayCoins.push({ - coinPub, - contribution: contrib, - exchangeBaseUrl: coin.exchangeBaseUrl, - feeDeposit: denom.feeDeposit, + await ws.db + .mktx((x) => ({ coins: x.coins, denominations: x.denominations })) + .runReadOnly(async (tx) => { + for (let i = 0; i < proposal.payCoinSelection.coinPubs.length; i++) { + const coinPub = proposal.payCoinSelection.coinPubs[i]; + if (coinPub === brokenCoinPub) { + continue; + } + const contrib = proposal.payCoinSelection.coinContributions[i]; + const coin = await tx.coins.get(coinPub); + if (!coin) { + continue; + } + const denom = await tx.denominations.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + continue; + } + prevPayCoins.push({ + coinPub, + contribution: contrib, + exchangeBaseUrl: coin.exchangeBaseUrl, + feeDeposit: denom.feeDeposit, + }); + } }); - } const res = selectPayCoins({ candidates, @@ -1002,24 +1052,23 @@ async function handleInsufficientFunds( logger.trace("re-selected coins"); - await ws.db.runWithWriteTransaction( - [ - Stores.purchases, - Stores.coins, - Stores.denominations, - Stores.refreshGroups, - ], - async (tx) => { - const p = await tx.get(Stores.purchases, proposalId); + await ws.db + .mktx((x) => ({ + purchases: x.purchases, + coins: x.coins, + denominations: x.denominations, + refreshGroups: x.refreshGroups, + })) + .runReadWrite(async (tx) => { + const p = await tx.purchases.get(proposalId); if (!p) { return; } p.payCoinSelection = res; p.coinDepositPermissions = undefined; - await tx.put(Stores.purchases, p); + await tx.purchases.put(p); await applyCoinSpend(ws, tx, res); - }, - ); + }); } /** @@ -1032,7 +1081,11 @@ async function submitPay( ws: InternalWalletState, proposalId: string, ): Promise { - const purchase = await ws.db.get(Stores.purchases, proposalId); + const purchase = await ws.db + .mktx((x) => ({ purchases: x.purchases })) + .runReadOnly(async (tx) => { + return tx.purchases.get(proposalId); + }); if (!purchase) { throw Error("Purchase not found: " + proposalId); } @@ -1202,7 +1255,11 @@ export async function checkPaymentByProposalId( proposalId: string, sessionId?: string, ): Promise { - let proposal = await ws.db.get(Stores.proposals, proposalId); + let proposal = await ws.db + .mktx((x) => ({ proposals: x.proposals })) + .runReadOnly(async (tx) => { + return tx.proposals.get(proposalId); + }); if (!proposal) { throw Error(`could not get proposal ${proposalId}`); } @@ -1212,7 +1269,11 @@ export async function checkPaymentByProposalId( throw Error("invalid proposal state"); } logger.trace("using existing purchase for same product"); - proposal = await ws.db.get(Stores.proposals, existingProposalId); + proposal = await ws.db + .mktx((x) => ({ proposals: x.proposals })) + .runReadOnly(async (tx) => { + return tx.proposals.get(existingProposalId); + }); if (!proposal) { throw Error("existing proposal is in wrong state"); } @@ -1231,7 +1292,11 @@ export async function checkPaymentByProposalId( proposalId = proposal.proposalId; // First check if we already paid for it. - const purchase = await ws.db.get(Stores.purchases, proposalId); + const purchase = await ws.db + .mktx((x) => ({ purchases: x.purchases })) + .runReadOnly(async (tx) => { + return tx.purchases.get(proposalId); + }); if (!purchase) { // If not already paid, check if we could pay for it. @@ -1281,14 +1346,16 @@ export async function checkPaymentByProposalId( logger.trace( "automatically re-submitting payment with different session ID", ); - await ws.db.runWithWriteTransaction([Stores.purchases], async (tx) => { - const p = await tx.get(Stores.purchases, proposalId); - if (!p) { - return; - } - p.lastSessionId = sessionId; - await tx.put(Stores.purchases, p); - }); + await ws.db + .mktx((x) => ({ purchases: x.purchases })) + .runReadWrite(async (tx) => { + const p = await tx.purchases.get(proposalId); + if (!p) { + return; + } + p.lastSessionId = sessionId; + await tx.purchases.put(p); + }); const r = await guardOperationException( () => submitPay(ws, proposalId), (e: TalerErrorDetails): Promise => @@ -1375,20 +1442,33 @@ export async function generateDepositPermissions( contractData: WalletContractData, ): Promise { const depositPermissions: CoinDepositPermission[] = []; + const coinWithDenom: Array<{ + coin: CoinRecord; + denom: DenominationRecord; + }> = []; + await ws.db + .mktx((x) => ({ coins: x.coins, denominations: x.denominations })) + .runReadOnly(async (tx) => { + for (let i = 0; i < payCoinSel.coinPubs.length; i++) { + const coin = await tx.coins.get(payCoinSel.coinPubs[i]); + if (!coin) { + throw Error("can't pay, allocated coin not found anymore"); + } + const denom = await tx.denominations.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + throw Error( + "can't pay, denomination of allocated coin not found anymore", + ); + } + coinWithDenom.push({ coin, denom }); + } + }); + for (let i = 0; i < payCoinSel.coinPubs.length; i++) { - const coin = await ws.db.get(Stores.coins, payCoinSel.coinPubs[i]); - if (!coin) { - throw Error("can't pay, allocated coin not found anymore"); - } - const denom = await ws.db.get(Stores.denominations, [ - coin.exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - throw Error( - "can't pay, denomination of allocated coin not found anymore", - ); - } + const { coin, denom } = coinWithDenom[i]; const dp = await ws.cryptoApi.signDepositPermission({ coinPriv: coin.coinPriv, coinPub: coin.coinPub, @@ -1419,7 +1499,11 @@ export async function confirmPay( logger.trace( `executing confirmPay with proposalId ${proposalId} and sessionIdOverride ${sessionIdOverride}`, ); - const proposal = await ws.db.get(Stores.proposals, proposalId); + const proposal = await ws.db + .mktx((x) => ({ proposals: x.proposals })) + .runReadOnly(async (tx) => { + return tx.proposals.get(proposalId); + }); if (!proposal) { throw Error(`proposal with id ${proposalId} not found`); @@ -1430,20 +1514,24 @@ export async function confirmPay( throw Error("proposal is in invalid state"); } - let purchase = await ws.db.get(Stores.purchases, proposalId); + const existingPurchase = await ws.db + .mktx((x) => ({ purchases: x.purchases })) + .runReadWrite(async (tx) => { + const purchase = await tx.purchases.get(proposalId); + if ( + purchase && + sessionIdOverride !== undefined && + sessionIdOverride != purchase.lastSessionId + ) { + logger.trace(`changing session ID to ${sessionIdOverride}`); + purchase.lastSessionId = sessionIdOverride; + purchase.paymentSubmitPending = true; + await tx.purchases.put(purchase); + } + return purchase; + }); - if (purchase) { - if ( - sessionIdOverride !== undefined && - sessionIdOverride != purchase.lastSessionId - ) { - logger.trace(`changing session ID to ${sessionIdOverride}`); - await ws.db.mutate(Stores.purchases, purchase.proposalId, (x) => { - x.lastSessionId = sessionIdOverride; - x.paymentSubmitPending = true; - return x; - }); - } + if (existingPurchase) { logger.trace("confirmPay: submitting payment for existing purchase"); return await guardOperationException( () => submitPay(ws, proposalId), @@ -1491,7 +1579,7 @@ export async function confirmPay( res, d.contractData, ); - purchase = await recordConfirmPay( + await recordConfirmPay( ws, proposal, res, @@ -1523,12 +1611,15 @@ async function resetPurchasePayRetry( ws: InternalWalletState, proposalId: string, ): Promise { - await ws.db.mutate(Stores.purchases, proposalId, (x) => { - if (x.payRetryInfo.active) { - x.payRetryInfo = initRetryInfo(); - } - return x; - }); + await ws.db + .mktx((x) => ({ purchases: x.purchases })) + .runReadWrite(async (tx) => { + const p = await tx.purchases.get(proposalId); + if (p) { + p.payRetryInfo = initRetryInfo(); + await tx.purchases.put(p); + } + }); } async function processPurchasePayImpl( @@ -1539,7 +1630,11 @@ async function processPurchasePayImpl( if (forceNow) { await resetPurchasePayRetry(ws, proposalId); } - const purchase = await ws.db.get(Stores.purchases, proposalId); + const purchase = await ws.db + .mktx((x) => ({ purchases: x.purchases })) + .runReadOnly(async (tx) => { + return tx.purchases.get(proposalId); + }); if (!purchase) { return; } @@ -1554,10 +1649,9 @@ export async function refuseProposal( ws: InternalWalletState, proposalId: string, ): Promise { - const success = await ws.db.runWithWriteTransaction( - [Stores.proposals], + const success = await ws.db.mktx((x) => ({proposals: x.proposals})).runReadWrite( async (tx) => { - const proposal = await tx.get(Stores.proposals, proposalId); + const proposal = await tx.proposals.get(proposalId); if (!proposal) { logger.trace(`proposal ${proposalId} not found, won't refuse proposal`); return false; @@ -1566,7 +1660,7 @@ export async function refuseProposal( return false; } proposal.proposalStatus = ProposalStatus.REFUSED; - await tx.put(Stores.proposals, proposal); + await tx.proposals.put(proposal); return true; }, ); -- cgit v1.2.3