diff options
author | Florian Dold <florian@dold.me> | 2021-06-09 15:14:17 +0200 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2021-06-09 15:24:19 +0200 |
commit | 5c26461247040c07c86291babf0c87631df638b5 (patch) | |
tree | 8ff93454d0c21d2675d6734f210d4e8ff91d2bfb /packages/taler-wallet-core/src/operations/withdraw.ts | |
parent | 68dddc848f2f650d74697bb3a5c05d649e5db3c7 (diff) | |
download | wallet-core-5c26461247040c07c86291babf0c87631df638b5.tar.xz |
database access refactor
Diffstat (limited to 'packages/taler-wallet-core/src/operations/withdraw.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/withdraw.ts | 459 |
1 files changed, 264 insertions, 195 deletions
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index 36be84df0..1266a3b0f 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -26,7 +26,6 @@ import { } from "@gnu-taler/taler-util"; import { DenominationRecord, - Stores, DenominationStatus, CoinStatus, CoinRecord, @@ -314,13 +313,17 @@ export async function getCandidateWithdrawalDenoms( exchangeBaseUrl: string, ): Promise<DenominationRecord[]> { return await ws.db - .iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchangeBaseUrl) - .filter((d) => { - return ( - (d.status === DenominationStatus.Unverified || - d.status === DenominationStatus.VerifiedGood) && - !d.isRevoked - ); + .mktx((x) => ({ denominations: x.denominations })) + .runReadOnly(async (tx) => { + return tx.denominations.indexes.byExchangeBaseUrl + .iter(exchangeBaseUrl) + .filter((d) => { + return ( + (d.status === DenominationStatus.Unverified || + d.status === DenominationStatus.VerifiedGood) && + !d.isRevoked + ); + }); }); } @@ -336,17 +339,24 @@ async function processPlanchetGenerate( withdrawalGroupId: string, coinIdx: number, ): Promise<void> { - const withdrawalGroup = await ws.db.get( - Stores.withdrawalGroups, - withdrawalGroupId, - ); + const withdrawalGroup = await ws.db + .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups })) + .runReadOnly(async (tx) => { + return await tx.withdrawalGroups.get(withdrawalGroupId); + }); if (!withdrawalGroup) { return; } - let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [ - withdrawalGroupId, - coinIdx, - ]); + let planchet = await ws.db + .mktx((x) => ({ + planchets: x.planchets, + })) + .runReadOnly(async (tx) => { + return tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroupId, + coinIdx, + ]); + }); if (!planchet) { let ci = 0; let denomPubHash: string | undefined; @@ -365,20 +375,26 @@ async function processPlanchetGenerate( if (!denomPubHash) { throw Error("invariant violated"); } - const denom = await ws.db.get(Stores.denominations, [ - withdrawalGroup.exchangeBaseUrl, - denomPubHash, - ]); - if (!denom) { - throw Error("invariant violated"); - } - const reserve = await ws.db.get( - Stores.reserves, - withdrawalGroup.reservePub, - ); - if (!reserve) { - throw Error("invariant violated"); - } + + const { denom, reserve } = await ws.db + .mktx((x) => ({ + reserves: x.reserves, + denominations: x.denominations, + })) + .runReadOnly(async (tx) => { + const denom = await tx.denominations.get([ + withdrawalGroup.exchangeBaseUrl, + denomPubHash!, + ]); + if (!denom) { + throw Error("invariant violated"); + } + const reserve = await tx.reserves.get(withdrawalGroup.reservePub); + if (!reserve) { + throw Error("invariant violated"); + } + return { denom, reserve }; + }); const r = await ws.cryptoApi.createPlanchet({ denomPub: denom.denomPub, feeWithdraw: denom.feeWithdraw, @@ -405,18 +421,20 @@ async function processPlanchetGenerate( withdrawalGroupId: withdrawalGroupId, lastError: undefined, }; - await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => { - const p = await tx.getIndexed(Stores.planchets.byGroupAndIndex, [ - withdrawalGroupId, - coinIdx, - ]); - if (p) { - planchet = p; - return; - } - await tx.put(Stores.planchets, newPlanchet); - planchet = newPlanchet; - }); + await ws.db + .mktx((x) => ({ planchets: x.planchets })) + .runReadWrite(async (tx) => { + const p = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroupId, + coinIdx, + ]); + if (p) { + planchet = p; + return; + } + await tx.planchets.put(newPlanchet); + planchet = newPlanchet; + }); } } @@ -430,59 +448,70 @@ async function processPlanchetExchangeRequest( withdrawalGroupId: string, coinIdx: number, ): Promise<WithdrawResponse | undefined> { - const withdrawalGroup = await ws.db.get( - Stores.withdrawalGroups, - withdrawalGroupId, - ); - if (!withdrawalGroup) { - return; - } - let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [ - withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - return; - } - if (planchet.withdrawalDone) { - logger.warn("processPlanchet: planchet already withdrawn"); - return; - } - const exchange = await ws.db.get( - Stores.exchanges, - withdrawalGroup.exchangeBaseUrl, - ); - if (!exchange) { - logger.error("db inconsistent: exchange for planchet not found"); - return; - } + const d = await ws.db + .mktx((x) => ({ + withdrawalGroups: x.withdrawalGroups, + planchets: x.planchets, + exchanges: x.exchanges, + denominations: x.denominations, + })) + .runReadOnly(async (tx) => { + const withdrawalGroup = await tx.withdrawalGroups.get(withdrawalGroupId); + if (!withdrawalGroup) { + return; + } + let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + return; + } + if (planchet.withdrawalDone) { + logger.warn("processPlanchet: planchet already withdrawn"); + return; + } + const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl); + if (!exchange) { + logger.error("db inconsistent: exchange for planchet not found"); + return; + } - const denom = await ws.db.get(Stores.denominations, [ - withdrawalGroup.exchangeBaseUrl, - planchet.denomPubHash, - ]); + const denom = await tx.denominations.get([ + withdrawalGroup.exchangeBaseUrl, + planchet.denomPubHash, + ]); - if (!denom) { - console.error("db inconsistent: denom for planchet not found"); - return; - } + if (!denom) { + console.error("db inconsistent: denom for planchet not found"); + return; + } - logger.trace( - `processing planchet #${coinIdx} in withdrawal ${withdrawalGroupId}`, - ); + logger.trace( + `processing planchet #${coinIdx} in withdrawal ${withdrawalGroupId}`, + ); - const wd: any = {}; - wd.denom_pub_hash = planchet.denomPubHash; - wd.reserve_pub = planchet.reservePub; - wd.reserve_sig = planchet.withdrawSig; - wd.coin_ev = planchet.coinEv; - const reqUrl = new URL( - `reserves/${planchet.reservePub}/withdraw`, - exchange.baseUrl, - ).href; + const reqBody: any = { + denom_pub_hash: planchet.denomPubHash, + reserve_pub: planchet.reservePub, + reserve_sig: planchet.withdrawSig, + coin_ev: planchet.coinEv, + }; + const reqUrl = new URL( + `reserves/${planchet.reservePub}/withdraw`, + exchange.baseUrl, + ).href; + + return { reqUrl, reqBody }; + }); + + if (!d) { + return; + } + const { reqUrl, reqBody } = d; try { - const resp = await ws.http.postJson(reqUrl, wd); + const resp = await ws.http.postJson(reqUrl, reqBody); const r = await readSuccessResponseJsonOrThrow( resp, codecForWithdrawResponse(), @@ -495,17 +524,19 @@ async function processPlanchetExchangeRequest( throw e; } const errDetails = e.operationError; - await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => { - let planchet = await tx.getIndexed(Stores.planchets.byGroupAndIndex, [ - withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - return; - } - planchet.lastError = errDetails; - await tx.put(Stores.planchets, planchet); - }); + await ws.db + .mktx((x) => ({ planchets: x.planchets })) + .runReadWrite(async (tx) => { + let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + return; + } + planchet.lastError = errDetails; + await tx.planchets.put(planchet); + }); return; } } @@ -516,25 +547,36 @@ async function processPlanchetVerifyAndStoreCoin( coinIdx: number, resp: WithdrawResponse, ): Promise<void> { - const withdrawalGroup = await ws.db.get( - Stores.withdrawalGroups, - withdrawalGroupId, - ); - if (!withdrawalGroup) { - return; - } - let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [ - withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - return; - } - if (planchet.withdrawalDone) { - logger.warn("processPlanchet: planchet already withdrawn"); + const d = await ws.db + .mktx((x) => ({ + withdrawalGroups: x.withdrawalGroups, + planchets: x.planchets, + })) + .runReadOnly(async (tx) => { + const withdrawalGroup = await tx.withdrawalGroups.get(withdrawalGroupId); + if (!withdrawalGroup) { + return; + } + let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + return; + } + if (planchet.withdrawalDone) { + logger.warn("processPlanchet: planchet already withdrawn"); + return; + } + return { planchet, exchangeBaseUrl: withdrawalGroup.exchangeBaseUrl }; + }); + + if (!d) { return; } + const { planchet, exchangeBaseUrl } = d; + const denomSig = await ws.cryptoApi.rsaUnblind( resp.ev_sig, planchet.blindingKey, @@ -548,21 +590,23 @@ async function processPlanchetVerifyAndStoreCoin( ); if (!isValid) { - await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => { - let planchet = await tx.getIndexed(Stores.planchets.byGroupAndIndex, [ - withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - return; - } - planchet.lastError = makeErrorDetails( - TalerErrorCode.WALLET_EXCHANGE_COIN_SIGNATURE_INVALID, - "invalid signature from the exchange after unblinding", - {}, - ); - await tx.put(Stores.planchets, planchet); - }); + await ws.db + .mktx((x) => ({ planchets: x.planchets })) + .runReadWrite(async (tx) => { + let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + return; + } + planchet.lastError = makeErrorDetails( + TalerErrorCode.WALLET_EXCHANGE_COIN_SIGNATURE_INVALID, + "invalid signature from the exchange after unblinding", + {}, + ); + await tx.planchets.put(planchet); + }); return; } @@ -575,7 +619,7 @@ async function processPlanchetVerifyAndStoreCoin( denomPubHash: planchet.denomPubHash, denomSig, coinEvHash: planchet.coinEvHash, - exchangeBaseUrl: withdrawalGroup.exchangeBaseUrl, + exchangeBaseUrl: exchangeBaseUrl, status: CoinStatus.Fresh, coinSource: { type: CoinSourceType.Withdraw, @@ -588,23 +632,27 @@ async function processPlanchetVerifyAndStoreCoin( const planchetCoinPub = planchet.coinPub; - const firstSuccess = await ws.db.runWithWriteTransaction( - [Stores.coins, Stores.withdrawalGroups, Stores.reserves, Stores.planchets], - async (tx) => { - const ws = await tx.get(Stores.withdrawalGroups, withdrawalGroupId); + const firstSuccess = await ws.db + .mktx((x) => ({ + coins: x.coins, + withdrawalGroups: x.withdrawalGroups, + reserves: x.reserves, + planchets: x.planchets, + })) + .runReadWrite(async (tx) => { + const ws = await tx.withdrawalGroups.get(withdrawalGroupId); if (!ws) { return false; } - const p = await tx.get(Stores.planchets, planchetCoinPub); + const p = await tx.planchets.get(planchetCoinPub); if (!p || p.withdrawalDone) { return false; } p.withdrawalDone = true; - await tx.put(Stores.planchets, p); - await tx.add(Stores.coins, coin); + await tx.planchets.put(p); + await tx.coins.add(coin); return true; - }, - ); + }); if (firstSuccess) { ws.notify({ @@ -636,12 +684,14 @@ export async function updateWithdrawalDenoms( ws: InternalWalletState, exchangeBaseUrl: string, ): Promise<void> { - const exchangeDetails = await ws.db.runWithReadTransaction( - [Stores.exchanges, Stores.exchangeDetails], - async (tx) => { + const exchangeDetails = await ws.db + .mktx((x) => ({ + exchanges: x.exchanges, + exchangeDetails: x.exchangeDetails, + })) + .runReadOnly(async (tx) => { return getExchangeDetails(tx, exchangeBaseUrl); - }, - ); + }); if (!exchangeDetails) { logger.error("exchange details not available"); throw Error(`exchange ${exchangeBaseUrl} details not available`); @@ -663,7 +713,11 @@ export async function updateWithdrawalDenoms( } else { denom.status = DenominationStatus.VerifiedGood; } - await ws.db.put(Stores.denominations, denom); + await ws.db + .mktx((x) => ({ denominations: x.denominations })) + .runReadWrite(async (tx) => { + await tx.denominations.put(denom); + }); } } // FIXME: This debug info should either be made conditional on some flag @@ -698,16 +752,18 @@ async function incrementWithdrawalRetry( withdrawalGroupId: string, err: TalerErrorDetails | undefined, ): Promise<void> { - await ws.db.runWithWriteTransaction([Stores.withdrawalGroups], async (tx) => { - const wsr = await tx.get(Stores.withdrawalGroups, withdrawalGroupId); - if (!wsr) { - return; - } - wsr.retryInfo.retryCounter++; - updateRetryInfoTimeout(wsr.retryInfo); - wsr.lastError = err; - await tx.put(Stores.withdrawalGroups, wsr); - }); + await ws.db + .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups })) + .runReadWrite(async (tx) => { + const wsr = await tx.withdrawalGroups.get(withdrawalGroupId); + if (!wsr) { + return; + } + wsr.retryInfo.retryCounter++; + updateRetryInfoTimeout(wsr.retryInfo); + wsr.lastError = err; + await tx.withdrawalGroups.put(wsr); + }); if (err) { ws.notify({ type: NotificationType.WithdrawOperationError, error: err }); } @@ -730,12 +786,15 @@ async function resetWithdrawalGroupRetry( ws: InternalWalletState, withdrawalGroupId: string, ): Promise<void> { - await ws.db.mutate(Stores.withdrawalGroups, withdrawalGroupId, (x) => { - if (x.retryInfo.active) { - x.retryInfo = initRetryInfo(); - } - return x; - }); + await ws.db + .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups })) + .runReadWrite(async (tx) => { + const x = await tx.withdrawalGroups.get(withdrawalGroupId); + if (x && x.retryInfo.active) { + x.retryInfo = initRetryInfo(); + await tx.withdrawalGroups.put(x); + } + }); } async function processWithdrawGroupImpl( @@ -747,10 +806,11 @@ async function processWithdrawGroupImpl( if (forceNow) { await resetWithdrawalGroupRetry(ws, withdrawalGroupId); } - const withdrawalGroup = await ws.db.get( - Stores.withdrawalGroups, - withdrawalGroupId, - ); + const withdrawalGroup = await ws.db + .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups })) + .runReadOnly(async (tx) => { + return tx.withdrawalGroups.get(withdrawalGroupId); + }); if (!withdrawalGroup) { logger.trace("withdraw session doesn't exist"); return; @@ -793,16 +853,21 @@ async function processWithdrawGroupImpl( let finishedForFirstTime = false; let errorsPerCoin: Record<number, TalerErrorDetails> = {}; - await ws.db.runWithWriteTransaction( - [Stores.coins, Stores.withdrawalGroups, Stores.reserves, Stores.planchets], - async (tx) => { - const wg = await tx.get(Stores.withdrawalGroups, withdrawalGroupId); + await ws.db + .mktx((x) => ({ + coins: x.coins, + withdrawalGroups: x.withdrawalGroups, + reserves: x.reserves, + planchets: x.planchets, + })) + .runReadWrite(async (tx) => { + const wg = await tx.withdrawalGroups.get(withdrawalGroupId); if (!wg) { return; } - await tx - .iterIndexed(Stores.planchets.byGroup, withdrawalGroupId) + await tx.planchets.indexes.byGroup + .iter(withdrawalGroupId) .forEach((x) => { if (x.withdrawalDone) { numFinished++; @@ -819,9 +884,8 @@ async function processWithdrawGroupImpl( wg.retryInfo = initRetryInfo(false); } - await tx.put(Stores.withdrawalGroups, wg); - }, - ); + await tx.withdrawalGroups.put(wg); + }); if (numFinished != numTotalCoins) { throw OperationFailedError.fromCode( @@ -871,8 +935,12 @@ export async function getExchangeWithdrawalInfo( } const possibleDenoms = await ws.db - .iterIndex(Stores.denominations.exchangeBaseUrlIndex, baseUrl) - .filter((d) => d.isOffered); + .mktx((x) => ({ denominations: x.denominations })) + .runReadOnly(async (tx) => { + return tx.denominations.indexes.byExchangeBaseUrl + .iter() + .filter((d) => d.isOffered); + }); let versionMatch; if (exchangeDetails.protocolVersion) { @@ -953,23 +1021,24 @@ export async function getWithdrawalDetailsForUri( const exchanges: ExchangeListItem[] = []; - const exchangeRecords = await ws.db.iter(Stores.exchanges).toArray(); - - for (const r of exchangeRecords) { - const details = await ws.db.runWithReadTransaction( - [Stores.exchanges, Stores.exchangeDetails], - async (tx) => { - return getExchangeDetails(tx, r.baseUrl); - }, - ); - if (details) { - exchanges.push({ - exchangeBaseUrl: details.exchangeBaseUrl, - currency: details.currency, - paytoUris: details.wireInfo.accounts.map((x) => x.payto_uri), - }); - } - } + await ws.db + .mktx((x) => ({ + exchanges: x.exchanges, + exchangeDetails: x.exchangeDetails, + })) + .runReadOnly(async (tx) => { + const exchangeRecords = await tx.exchanges.iter().toArray(); + for (const r of exchangeRecords) { + const details = await getExchangeDetails(tx, r.baseUrl); + if (details) { + exchanges.push({ + exchangeBaseUrl: details.exchangeBaseUrl, + currency: details.currency, + paytoUris: details.wireInfo.accounts.map((x) => x.payto_uri), + }); + } + } + }); return { amount: Amounts.stringify(info.amount), |