diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/refresh.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/refresh.ts | 514 |
1 files changed, 309 insertions, 205 deletions
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index 6f4c9725a..8d21e811d 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -22,7 +22,7 @@ import { DenominationRecord, RefreshGroupRecord, RefreshPlanchet, - Stores, + WalletStoresV1, } from "../db.js"; import { codecForExchangeMeltResponse, @@ -38,7 +38,6 @@ import { amountToPretty } from "@gnu-taler/taler-util"; import { readSuccessResponseJsonOrThrow } from "../util/http"; import { checkDbInvariant } from "../util/invariants"; import { Logger } from "@gnu-taler/taler-util"; -import { TransactionHandle } from "../util/query"; import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries"; import { Duration, @@ -57,6 +56,8 @@ import { updateExchangeFromUrl } from "./exchanges"; import { EXCHANGE_COINS_LOCK, InternalWalletState } from "./state"; import { isWithdrawableDenom, selectWithdrawalDenominations } from "./withdraw"; import { RefreshNewDenomInfo } from "../crypto/cryptoTypes.js"; +import { GetReadWriteAccess } from "../util/query.js"; +import { Wallet } from "../wallet.js"; const logger = new Logger("refresh.ts"); @@ -95,7 +96,7 @@ export function getTotalRefreshCost( } /** - * Create a refresh session inside a refresh group. + * Create a refresh session for one particular coin inside a refresh group. */ async function refreshCreateSession( ws: InternalWalletState, @@ -105,45 +106,68 @@ async function refreshCreateSession( logger.trace( `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`, ); - const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); - if (!refreshGroup) { - return; - } - if (refreshGroup.finishedPerCoin[coinIndex]) { - return; - } - const existingRefreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; - if (existingRefreshSession) { + + const d = await ws.db + .mktx((x) => ({ + refreshGroups: x.refreshGroups, + coins: x.coins, + })) + .runReadWrite(async (tx) => { + const refreshGroup = await tx.refreshGroups.get(refreshGroupId); + if (!refreshGroup) { + return; + } + if (refreshGroup.finishedPerCoin[coinIndex]) { + return; + } + const existingRefreshSession = + refreshGroup.refreshSessionPerCoin[coinIndex]; + if (existingRefreshSession) { + return; + } + const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex]; + const coin = await tx.coins.get(oldCoinPub); + if (!coin) { + throw Error("Can't refresh, coin not found"); + } + return { refreshGroup, coin }; + }); + + if (!d) { return; } - const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex]; - const coin = await ws.db.get(Stores.coins, oldCoinPub); - if (!coin) { - throw Error("Can't refresh, coin not found"); - } + + const { refreshGroup, coin } = d; const { exchange } = await updateExchangeFromUrl(ws, coin.exchangeBaseUrl); if (!exchange) { throw Error("db inconsistent: exchange of coin not found"); } - const oldDenom = await ws.db.get(Stores.denominations, [ - exchange.baseUrl, - coin.denomPubHash, - ]); + const { availableAmount, availableDenoms } = await ws.db + .mktx((x) => ({ + denominations: x.denominations, + })) + .runReadOnly(async (tx) => { + const oldDenom = await tx.denominations.get([ + exchange.baseUrl, + coin.denomPubHash, + ]); - if (!oldDenom) { - throw Error("db inconsistent: denomination for coin not found"); - } + if (!oldDenom) { + throw Error("db inconsistent: denomination for coin not found"); + } - const availableDenoms: DenominationRecord[] = await ws.db - .iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchange.baseUrl) - .toArray(); + const availableDenoms: DenominationRecord[] = await tx.denominations.indexes.byExchangeBaseUrl + .iter(exchange.baseUrl) + .toArray(); - const availableAmount = Amounts.sub( - refreshGroup.inputPerCoin[coinIndex], - oldDenom.feeRefresh, - ).amount; + const availableAmount = Amounts.sub( + refreshGroup.inputPerCoin[coinIndex], + oldDenom.feeRefresh, + ).amount; + return { availableAmount, availableDenoms }; + }); const newCoinDenoms = selectWithdrawalDenominations( availableAmount, @@ -156,10 +180,13 @@ async function refreshCreateSession( availableAmount, )} too small`, ); - await ws.db.runWithWriteTransaction( - [Stores.coins, Stores.refreshGroups], - async (tx) => { - const rg = await tx.get(Stores.refreshGroups, refreshGroupId); + await ws.db + .mktx((x) => ({ + coins: x.coins, + refreshGroups: x.refreshGroups, + })) + .runReadWrite(async (tx) => { + const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { return; } @@ -175,9 +202,8 @@ async function refreshCreateSession( rg.timestampFinished = getTimestampNow(); rg.retryInfo = initRetryInfo(false); } - await tx.put(Stores.refreshGroups, rg); - }, - ); + await tx.refreshGroups.put(rg); + }); ws.notify({ type: NotificationType.RefreshUnwarranted }); return; } @@ -185,10 +211,13 @@ async function refreshCreateSession( const sessionSecretSeed = encodeCrock(getRandomBytes(64)); // Store refresh session for this coin in the database. - await ws.db.runWithWriteTransaction( - [Stores.refreshGroups, Stores.coins], - async (tx) => { - const rg = await tx.get(Stores.refreshGroups, refreshGroupId); + await ws.db + .mktx((x) => ({ + refreshGroups: x.refreshGroups, + coins: x.coins, + })) + .runReadWrite(async (tx) => { + const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { return; } @@ -204,9 +233,8 @@ async function refreshCreateSession( })), amountRefreshOutput: newCoinDenoms.totalCoinValue, }; - await tx.put(Stores.refreshGroups, rg); - }, - ); + await tx.refreshGroups.put(rg); + }); logger.info( `created refresh session for coin #${coinIndex} in ${refreshGroupId}`, ); @@ -222,48 +250,63 @@ async function refreshMelt( refreshGroupId: string, coinIndex: number, ): Promise<void> { - const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); - if (!refreshGroup) { - return; - } - const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; - if (!refreshSession) { - return; - } - if (refreshSession.norevealIndex !== undefined) { - return; - } + const d = await ws.db + .mktx((x) => ({ + refreshGroups: x.refreshGroups, + coins: x.coins, + denominations: x.denominations, + })) + .runReadWrite(async (tx) => { + const refreshGroup = await tx.refreshGroups.get(refreshGroupId); + if (!refreshGroup) { + return; + } + const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; + if (!refreshSession) { + return; + } + if (refreshSession.norevealIndex !== undefined) { + return; + } - const oldCoin = await ws.db.get( - Stores.coins, - refreshGroup.oldCoinPubs[coinIndex], - ); - checkDbInvariant(!!oldCoin, "melt coin doesn't exist"); - const oldDenom = await ws.db.get(Stores.denominations, [ - oldCoin.exchangeBaseUrl, - oldCoin.denomPubHash, - ]); - checkDbInvariant(!!oldDenom, "denomination for melted coin doesn't exist"); + const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]); + checkDbInvariant(!!oldCoin, "melt coin doesn't exist"); + const oldDenom = await tx.denominations.get([ + oldCoin.exchangeBaseUrl, + oldCoin.denomPubHash, + ]); + checkDbInvariant( + !!oldDenom, + "denomination for melted coin doesn't exist", + ); - const newCoinDenoms: RefreshNewDenomInfo[] = []; + const newCoinDenoms: RefreshNewDenomInfo[] = []; - for (const dh of refreshSession.newDenoms) { - const newDenom = await ws.db.get(Stores.denominations, [ - oldCoin.exchangeBaseUrl, - dh.denomPubHash, - ]); - checkDbInvariant( - !!newDenom, - "new denomination for refresh not in database", - ); - newCoinDenoms.push({ - count: dh.count, - denomPub: newDenom.denomPub, - feeWithdraw: newDenom.feeWithdraw, - value: newDenom.value, + for (const dh of refreshSession.newDenoms) { + const newDenom = await tx.denominations.get([ + oldCoin.exchangeBaseUrl, + dh.denomPubHash, + ]); + checkDbInvariant( + !!newDenom, + "new denomination for refresh not in database", + ); + newCoinDenoms.push({ + count: dh.count, + denomPub: newDenom.denomPub, + feeWithdraw: newDenom.feeWithdraw, + value: newDenom.value, + }); + } + return { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession }; }); + + if (!d) { + return; } + const { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession } = d; + const derived = await ws.cryptoApi.deriveRefreshSession({ kappa: 3, meltCoinDenomPubHash: oldCoin.denomPubHash, @@ -303,20 +346,28 @@ async function refreshMelt( refreshSession.norevealIndex = norevealIndex; - await ws.db.mutate(Stores.refreshGroups, refreshGroupId, (rg) => { - const rs = rg.refreshSessionPerCoin[coinIndex]; - if (rg.timestampFinished) { - return; - } - if (!rs) { - return; - } - if (rs.norevealIndex !== undefined) { - return; - } - rs.norevealIndex = norevealIndex; - return rg; - }); + await ws.db + .mktx((x) => ({ + refreshGroups: x.refreshGroups, + })) + .runReadWrite(async (tx) => { + const rg = await tx.refreshGroups.get(refreshGroupId); + if (!rg) { + return; + } + if (rg.timestampFinished) { + return; + } + const rs = rg.refreshSessionPerCoin[coinIndex]; + if (!rs) { + return; + } + if (rs.norevealIndex !== undefined) { + return; + } + rs.norevealIndex = norevealIndex; + await tx.refreshGroups.put(rg); + }); ws.notify({ type: NotificationType.RefreshMelted, @@ -328,49 +379,78 @@ async function refreshReveal( refreshGroupId: string, coinIndex: number, ): Promise<void> { - const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); - if (!refreshGroup) { - return; - } - const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; - if (!refreshSession) { - return; - } - const norevealIndex = refreshSession.norevealIndex; - if (norevealIndex === undefined) { - throw Error("can't reveal without melting first"); - } + const d = await ws.db + .mktx((x) => ({ + refreshGroups: x.refreshGroups, + coins: x.coins, + denominations: x.denominations, + })) + .runReadOnly(async (tx) => { + const refreshGroup = await tx.refreshGroups.get(refreshGroupId); + if (!refreshGroup) { + return; + } + const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; + if (!refreshSession) { + return; + } + const norevealIndex = refreshSession.norevealIndex; + if (norevealIndex === undefined) { + throw Error("can't reveal without melting first"); + } - const oldCoin = await ws.db.get( - Stores.coins, - refreshGroup.oldCoinPubs[coinIndex], - ); - checkDbInvariant(!!oldCoin, "melt coin doesn't exist"); - const oldDenom = await ws.db.get(Stores.denominations, [ - oldCoin.exchangeBaseUrl, - oldCoin.denomPubHash, - ]); - checkDbInvariant(!!oldDenom, "denomination for melted coin doesn't exist"); + const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]); + checkDbInvariant(!!oldCoin, "melt coin doesn't exist"); + const oldDenom = await tx.denominations.get([ + oldCoin.exchangeBaseUrl, + oldCoin.denomPubHash, + ]); + checkDbInvariant( + !!oldDenom, + "denomination for melted coin doesn't exist", + ); - const newCoinDenoms: RefreshNewDenomInfo[] = []; + const newCoinDenoms: RefreshNewDenomInfo[] = []; - for (const dh of refreshSession.newDenoms) { - const newDenom = await ws.db.get(Stores.denominations, [ - oldCoin.exchangeBaseUrl, - dh.denomPubHash, - ]); - checkDbInvariant( - !!newDenom, - "new denomination for refresh not in database", - ); - newCoinDenoms.push({ - count: dh.count, - denomPub: newDenom.denomPub, - feeWithdraw: newDenom.feeWithdraw, - value: newDenom.value, + for (const dh of refreshSession.newDenoms) { + const newDenom = await tx.denominations.get([ + oldCoin.exchangeBaseUrl, + dh.denomPubHash, + ]); + checkDbInvariant( + !!newDenom, + "new denomination for refresh not in database", + ); + newCoinDenoms.push({ + count: dh.count, + denomPub: newDenom.denomPub, + feeWithdraw: newDenom.feeWithdraw, + value: newDenom.value, + }); + } + return { + oldCoin, + oldDenom, + newCoinDenoms, + refreshSession, + refreshGroup, + norevealIndex, + }; }); + + if (!d) { + return; } + const { + oldCoin, + oldDenom, + newCoinDenoms, + refreshSession, + refreshGroup, + norevealIndex, + } = d; + const derived = await ws.cryptoApi.deriveRefreshSession({ kappa: 3, meltCoinDenomPubHash: oldCoin.denomPubHash, @@ -389,14 +469,6 @@ async function refreshReveal( throw Error("refresh index error"); } - const meltCoinRecord = await ws.db.get( - Stores.coins, - refreshGroup.oldCoinPubs[coinIndex], - ); - if (!meltCoinRecord) { - throw Error("inconsistent database"); - } - const evs = planchets.map((x: RefreshPlanchet) => x.coinEv); const newDenomsFlat: string[] = []; const linkSigs: string[] = []; @@ -406,9 +478,9 @@ async function refreshReveal( for (let j = 0; j < dsel.count; j++) { const newCoinIndex = linkSigs.length; const linkSig = await ws.cryptoApi.signCoinLink( - meltCoinRecord.coinPriv, + oldCoin.coinPriv, dsel.denomPubHash, - meltCoinRecord.coinPub, + oldCoin.coinPub, derived.transferPubs[norevealIndex], planchets[newCoinIndex].coinEv, ); @@ -447,10 +519,17 @@ async function refreshReveal( for (let i = 0; i < refreshSession.newDenoms.length; i++) { for (let j = 0; j < refreshSession.newDenoms[i].count; j++) { const newCoinIndex = coins.length; - const denom = await ws.db.get(Stores.denominations, [ - oldCoin.exchangeBaseUrl, - refreshSession.newDenoms[i].denomPubHash, - ]); + // FIXME: Look up in earlier transaction! + const denom = await ws.db + .mktx((x) => ({ + denominations: x.denominations, + })) + .runReadOnly(async (tx) => { + return tx.denominations.get([ + oldCoin.exchangeBaseUrl, + refreshSession.newDenoms[i].denomPubHash, + ]); + }); if (!denom) { console.error("denom not found"); continue; @@ -483,10 +562,13 @@ async function refreshReveal( } } - await ws.db.runWithWriteTransaction( - [Stores.coins, Stores.refreshGroups], - async (tx) => { - const rg = await tx.get(Stores.refreshGroups, refreshGroupId); + await ws.db + .mktx((x) => ({ + coins: x.coins, + refreshGroups: x.refreshGroups, + })) + .runReadWrite(async (tx) => { + const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { logger.warn("no refresh session found"); return; @@ -508,11 +590,10 @@ async function refreshReveal( rg.retryInfo = initRetryInfo(false); } for (const coin of coins) { - await tx.put(Stores.coins, coin); + await tx.coins.put(coin); } - await tx.put(Stores.refreshGroups, rg); - }, - ); + await tx.refreshGroups.put(rg); + }); logger.trace("refresh finished (end of reveal)"); ws.notify({ type: NotificationType.RefreshRevealed, @@ -524,19 +605,23 @@ async function incrementRefreshRetry( refreshGroupId: string, err: TalerErrorDetails | undefined, ): Promise<void> { - await ws.db.runWithWriteTransaction([Stores.refreshGroups], async (tx) => { - const r = await tx.get(Stores.refreshGroups, refreshGroupId); - if (!r) { - return; - } - if (!r.retryInfo) { - return; - } - r.retryInfo.retryCounter++; - updateRetryInfoTimeout(r.retryInfo); - r.lastError = err; - await tx.put(Stores.refreshGroups, r); - }); + await ws.db + .mktx((x) => ({ + refreshGroups: x.refreshGroups, + })) + .runReadWrite(async (tx) => { + const r = await tx.refreshGroups.get(refreshGroupId); + if (!r) { + return; + } + if (!r.retryInfo) { + return; + } + r.retryInfo.retryCounter++; + updateRetryInfoTimeout(r.retryInfo); + r.lastError = err; + await tx.refreshGroups.put(r); + }); if (err) { ws.notify({ type: NotificationType.RefreshOperationError, error: err }); } @@ -562,14 +647,19 @@ export async function processRefreshGroup( async function resetRefreshGroupRetry( ws: InternalWalletState, - refreshSessionId: string, + refreshGroupId: string, ): Promise<void> { - await ws.db.mutate(Stores.refreshGroups, refreshSessionId, (x) => { - if (x.retryInfo.active) { - x.retryInfo = initRetryInfo(); - } - return x; - }); + await ws.db + .mktx((x) => ({ + refreshGroups: x.refreshGroups, + })) + .runReadWrite(async (tx) => { + const x = await tx.refreshGroups.get(refreshGroupId); + if (x && x.retryInfo.active) { + x.retryInfo = initRetryInfo(); + await tx.refreshGroups.put(x); + } + }); } async function processRefreshGroupImpl( @@ -580,13 +670,20 @@ async function processRefreshGroupImpl( if (forceNow) { await resetRefreshGroupRetry(ws, refreshGroupId); } - const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + const refreshGroup = await ws.db + .mktx((x) => ({ + refreshGroups: x.refreshGroups, + })) + .runReadOnly(async (tx) => { + return tx.refreshGroups.get(refreshGroupId); + }); if (!refreshGroup) { return; } if (refreshGroup.timestampFinished) { return; } + // Process refresh sessions of the group in parallel. const ps = refreshGroup.oldCoinPubs.map((x, i) => processRefreshSession(ws, refreshGroupId, i), ); @@ -602,7 +699,11 @@ async function processRefreshSession( logger.trace( `processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`, ); - let refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + let refreshGroup = await ws.db + .mktx((x) => ({ refreshGroups: x.refreshGroups })) + .runReadOnly(async (tx) => { + return tx.refreshGroups.get(refreshGroupId); + }); if (!refreshGroup) { return; } @@ -611,7 +712,11 @@ async function processRefreshSession( } if (!refreshGroup.refreshSessionPerCoin[coinIndex]) { await refreshCreateSession(ws, refreshGroupId, coinIndex); - refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + refreshGroup = await ws.db + .mktx((x) => ({ refreshGroups: x.refreshGroups })) + .runReadOnly(async (tx) => { + return tx.refreshGroups.get(refreshGroupId); + }); if (!refreshGroup) { return; } @@ -646,11 +751,11 @@ async function processRefreshSession( */ export async function createRefreshGroup( ws: InternalWalletState, - tx: TransactionHandle< - | typeof Stores.denominations - | typeof Stores.coins - | typeof Stores.refreshGroups - >, + tx: GetReadWriteAccess<{ + denominations: typeof WalletStoresV1.denominations; + coins: typeof WalletStoresV1.coins; + refreshGroups: typeof WalletStoresV1.refreshGroups; + }>, oldCoinPubs: CoinPublicKey[], reason: RefreshReason, ): Promise<RefreshGroupId> { @@ -667,8 +772,8 @@ export async function createRefreshGroup( if (denomsPerExchange[exchangeBaseUrl]) { return denomsPerExchange[exchangeBaseUrl]; } - const allDenoms = await tx - .iterIndexed(Stores.denominations.exchangeBaseUrlIndex, exchangeBaseUrl) + const allDenoms = await tx.denominations.indexes.byExchangeBaseUrl + .iter(exchangeBaseUrl) .filter((x) => { return isWithdrawableDenom(x); }); @@ -677,9 +782,9 @@ export async function createRefreshGroup( }; for (const ocp of oldCoinPubs) { - const coin = await tx.get(Stores.coins, ocp.coinPub); + const coin = await tx.coins.get(ocp.coinPub); checkDbInvariant(!!coin, "coin must be in database"); - const denom = await tx.get(Stores.denominations, [ + const denom = await tx.denominations.get([ coin.exchangeBaseUrl, coin.denomPubHash, ]); @@ -691,7 +796,7 @@ export async function createRefreshGroup( inputPerCoin.push(refreshAmount); coin.currentAmount = Amounts.getZero(refreshAmount.currency); coin.status = CoinStatus.Dormant; - await tx.put(Stores.coins, coin); + await tx.coins.put(coin); const denoms = await getDenoms(coin.exchangeBaseUrl); const cost = getTotalRefreshCost(denoms, denom, refreshAmount); const output = Amounts.sub(refreshAmount, cost).amount; @@ -718,7 +823,7 @@ export async function createRefreshGroup( refreshGroup.timestampFinished = getTimestampNow(); } - await tx.put(Stores.refreshGroups, refreshGroup); + await tx.refreshGroups.put(refreshGroup); logger.trace(`created refresh group ${refreshGroupId}`); @@ -760,20 +865,20 @@ export async function autoRefresh( exchangeBaseUrl: string, ): Promise<void> { await updateExchangeFromUrl(ws, exchangeBaseUrl, true); - await ws.db.runWithWriteTransaction( - [ - Stores.coins, - Stores.denominations, - Stores.refreshGroups, - Stores.exchanges, - ], - async (tx) => { - const exchange = await tx.get(Stores.exchanges, exchangeBaseUrl); + await ws.db + .mktx((x) => ({ + coins: x.coins, + denominations: x.denominations, + refreshGroups: x.refreshGroups, + exchanges: x.exchanges, + })) + .runReadWrite(async (tx) => { + const exchange = await tx.exchanges.get(exchangeBaseUrl); if (!exchange) { return; } - const coins = await tx - .iterIndexed(Stores.coins.exchangeBaseUrlIndex, exchangeBaseUrl) + const coins = await tx.coins.indexes.byBaseUrl + .iter(exchangeBaseUrl) .toArray(); const refreshCoins: CoinPublicKey[] = []; for (const coin of coins) { @@ -783,7 +888,7 @@ export async function autoRefresh( if (coin.suspended) { continue; } - const denom = await tx.get(Stores.denominations, [ + const denom = await tx.denominations.get([ exchangeBaseUrl, coin.denomPubHash, ]); @@ -800,8 +905,8 @@ export async function autoRefresh( await createRefreshGroup(ws, tx, refreshCoins, RefreshReason.Scheduled); } - const denoms = await tx - .iterIndexed(Stores.denominations.exchangeBaseUrlIndex, exchangeBaseUrl) + const denoms = await tx.denominations.indexes.byExchangeBaseUrl + .iter(exchangeBaseUrl) .toArray(); let minCheckThreshold = timestampAddDuration( getTimestampNow(), @@ -817,7 +922,6 @@ export async function autoRefresh( minCheckThreshold = timestampMin(minCheckThreshold, checkThreshold); } exchange.nextRefreshCheck = minCheckThreshold; - await tx.put(Stores.exchanges, exchange); - }, - ); + await tx.exchanges.put(exchange); + }); } |