diff options
Diffstat (limited to 'src/operations/refresh.ts')
-rw-r--r-- | src/operations/refresh.ts | 364 |
1 files changed, 239 insertions, 125 deletions
diff --git a/src/operations/refresh.ts b/src/operations/refresh.ts index 4ffc3ea60..be23a5bb0 100644 --- a/src/operations/refresh.ts +++ b/src/operations/refresh.ts @@ -25,16 +25,24 @@ import { RefreshSessionRecord, initRetryInfo, updateRetryInfoTimeout, + RefreshGroupRecord, } from "../types/dbTypes"; import { amountToPretty } from "../util/helpers"; -import { Database } from "../util/query"; +import { Database, TransactionHandle } from "../util/query"; import { InternalWalletState } from "./state"; import { Logger } from "../util/logging"; import { getWithdrawDenomList } from "./withdraw"; import { updateExchangeFromUrl } from "./exchanges"; -import { getTimestampNow, OperationError } from "../types/walletTypes"; +import { + getTimestampNow, + OperationError, + CoinPublicKey, + RefreshReason, + RefreshGroupId, +} from "../types/walletTypes"; import { guardOperationException } from "./errors"; import { NotificationType } from "../types/notifications"; +import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto"; const logger = new Logger("refresh.ts"); @@ -71,11 +79,130 @@ export function getTotalRefreshCost( return totalCost; } +/** + * Create a refresh session inside a refresh group. + */ +async function refreshCreateSession( + ws: InternalWalletState, + refreshGroupId: string, + coinIndex: number, +): Promise<void> { + logger.trace( + `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`, + ); + const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { + return; + } + if (refreshGroup.finishedPerCoin[coinIndex]) { + return; + } + const existingRefreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; + if (existingRefreshSession) { + return; + } + const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex]; + const coin = await ws.db.get(Stores.coins, oldCoinPub); + if (!coin) { + throw Error("Can't refresh, coin not found"); + } + + const exchange = await updateExchangeFromUrl(ws, coin.exchangeBaseUrl); + if (!exchange) { + throw Error("db inconsistent: exchange of coin not found"); + } + + const oldDenom = await ws.db.get(Stores.denominations, [ + exchange.baseUrl, + coin.denomPub, + ]); + + if (!oldDenom) { + throw Error("db inconsistent: denomination for coin not found"); + } + + const availableDenoms: DenominationRecord[] = await ws.db + .iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchange.baseUrl) + .toArray(); + + const availableAmount = Amounts.sub(coin.currentAmount, oldDenom.feeRefresh) + .amount; + + const newCoinDenoms = getWithdrawDenomList(availableAmount, availableDenoms); + + if (newCoinDenoms.length === 0) { + logger.trace( + `not refreshing, available amount ${amountToPretty( + availableAmount, + )} too small`, + ); + await ws.db.runWithWriteTransaction( + [Stores.coins, Stores.refreshGroups], + async tx => { + const rg = await tx.get(Stores.refreshGroups, refreshGroupId); + if (!rg) { + return; + } + rg.finishedPerCoin[coinIndex] = true; + await tx.put(Stores.refreshGroups, rg); + }, + ); + ws.notify({ type: NotificationType.RefreshRefused }); + return; + } + + const refreshSession: RefreshSessionRecord = await ws.cryptoApi.createRefreshSession( + exchange.baseUrl, + 3, + coin, + newCoinDenoms, + oldDenom.feeRefresh, + ); + + // Store refresh session and subtract refreshed amount from + // coin in the same transaction. + await ws.db.runWithWriteTransaction( + [Stores.refreshGroups, Stores.coins], + async tx => { + const c = await tx.get(Stores.coins, coin.coinPub); + if (!c) { + throw Error("coin not found, but marked for refresh"); + } + const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee); + if (r.saturated) { + console.log("can't refresh coin, no amount left"); + return; + } + c.currentAmount = r.amount; + c.status = CoinStatus.Dormant; + const rg = await tx.get(Stores.refreshGroups, refreshGroupId); + if (!rg) { + return; + } + if (rg.refreshSessionPerCoin[coinIndex]) { + return; + } + rg.refreshSessionPerCoin[coinIndex] = refreshSession; + await tx.put(Stores.refreshGroups, rg); + await tx.put(Stores.coins, c); + }, + ); + logger.info( + `created refresh session for coin #${coinIndex} in ${refreshGroupId}`, + ); + ws.notify({ type: NotificationType.RefreshStarted }); +} + async function refreshMelt( ws: InternalWalletState, - refreshSessionId: string, + refreshGroupId: string, + coinIndex: number, ): Promise<void> { - const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId); + const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { + return; + } + const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; if (!refreshSession) { return; } @@ -122,7 +249,11 @@ async function refreshMelt( refreshSession.norevealIndex = norevealIndex; - await ws.db.mutate(Stores.refresh, refreshSessionId, rs => { + await ws.db.mutate(Stores.refreshGroups, refreshGroupId, rg => { + const rs = rg.refreshSessionPerCoin[coinIndex]; + if (!rs) { + return; + } if (rs.norevealIndex !== undefined) { return; } @@ -130,7 +261,7 @@ async function refreshMelt( return; } rs.norevealIndex = norevealIndex; - return rs; + return rg; }); ws.notify({ @@ -140,9 +271,14 @@ async function refreshMelt( async function refreshReveal( ws: InternalWalletState, - refreshSessionId: string, + refreshGroupId: string, + coinIndex: number, ): Promise<void> { - const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId); + const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { + return; + } + const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; if (!refreshSession) { return; } @@ -253,23 +389,38 @@ async function refreshReveal( } await ws.db.runWithWriteTransaction( - [Stores.coins, Stores.refresh], + [Stores.coins, Stores.refreshGroups], async tx => { - const rs = await tx.get(Stores.refresh, refreshSessionId); - if (!rs) { + const rg = await tx.get(Stores.refreshGroups, refreshGroupId); + if (!rg) { console.log("no refresh session found"); return; } + const rs = rg.refreshSessionPerCoin[coinIndex]; + if (!rs) { + return; + } if (rs.finishedTimestamp) { console.log("refresh session already finished"); return; } rs.finishedTimestamp = getTimestampNow(); - rs.retryInfo = initRetryInfo(false); + rg.finishedPerCoin[coinIndex] = true; + let allDone = true; + for (const f of rg.finishedPerCoin) { + if (!f) { + allDone = false; + break; + } + } + if (allDone) { + rg.finishedTimestamp = getTimestampNow(); + rg.retryInfo = initRetryInfo(false); + } for (let coin of coins) { await tx.put(Stores.coins, coin); } - await tx.put(Stores.refresh, rs); + await tx.put(Stores.refreshGroups, rg); }, ); console.log("refresh finished (end of reveal)"); @@ -280,11 +431,11 @@ async function refreshReveal( async function incrementRefreshRetry( ws: InternalWalletState, - refreshSessionId: string, + refreshGroupId: string, err: OperationError | undefined, ): Promise<void> { - await ws.db.runWithWriteTransaction([Stores.refresh], async tx => { - const r = await tx.get(Stores.refresh, refreshSessionId); + await ws.db.runWithWriteTransaction([Stores.refreshGroups], async tx => { + const r = await tx.get(Stores.refreshGroups, refreshGroupId); if (!r) { return; } @@ -294,31 +445,31 @@ async function incrementRefreshRetry( r.retryInfo.retryCounter++; updateRetryInfoTimeout(r.retryInfo); r.lastError = err; - await tx.put(Stores.refresh, r); + await tx.put(Stores.refreshGroups, r); }); ws.notify({ type: NotificationType.RefreshOperationError }); } -export async function processRefreshSession( +export async function processRefreshGroup( ws: InternalWalletState, - refreshSessionId: string, + refreshGroupId: string, forceNow: boolean = false, -) { - return ws.memoProcessRefresh.memo(refreshSessionId, async () => { +): Promise<void> { + await ws.memoProcessRefresh.memo(refreshGroupId, async () => { const onOpErr = (e: OperationError) => - incrementRefreshRetry(ws, refreshSessionId, e); - return guardOperationException( - () => processRefreshSessionImpl(ws, refreshSessionId, forceNow), + incrementRefreshRetry(ws, refreshGroupId, e); + return await guardOperationException( + async () => await processRefreshGroupImpl(ws, refreshGroupId, forceNow), onOpErr, ); }); } -async function resetRefreshSessionRetry( +async function resetRefreshGroupRetry( ws: InternalWalletState, refreshSessionId: string, ) { - await ws.db.mutate(Stores.refresh, refreshSessionId, x => { + await ws.db.mutate(Stores.refreshGroups, refreshSessionId, x => { if (x.retryInfo.active) { x.retryInfo = initRetryInfo(); } @@ -326,124 +477,87 @@ async function resetRefreshSessionRetry( }); } -async function processRefreshSessionImpl( +async function processRefreshGroupImpl( ws: InternalWalletState, - refreshSessionId: string, + refreshGroupId: string, forceNow: boolean, ) { if (forceNow) { - await resetRefreshSessionRetry(ws, refreshSessionId); + await resetRefreshGroupRetry(ws, refreshGroupId); } - const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId); - if (!refreshSession) { + const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { return; } - if (refreshSession.finishedTimestamp) { + if (refreshGroup.finishedTimestamp) { return; } - if (typeof refreshSession.norevealIndex !== "number") { - await refreshMelt(ws, refreshSession.refreshSessionId); - } - await refreshReveal(ws, refreshSession.refreshSessionId); + const ps = refreshGroup.oldCoinPubs.map((x, i) => + processRefreshSession(ws, refreshGroupId, i), + ); + await Promise.all(ps); logger.trace("refresh finished"); } -export async function refresh( +async function processRefreshSession( ws: InternalWalletState, - oldCoinPub: string, - force: boolean = false, -): Promise<void> { - const coin = await ws.db.get(Stores.coins, oldCoinPub); - if (!coin) { - console.warn("can't refresh, coin not in database"); + refreshGroupId: string, + coinIndex: number, +) { + logger.trace(`processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`); + let refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { return; } - switch (coin.status) { - case CoinStatus.Dirty: - break; - case CoinStatus.Dormant: - return; - case CoinStatus.Fresh: - if (!force) { - return; - } - break; - } - - const exchange = await updateExchangeFromUrl(ws, coin.exchangeBaseUrl); - if (!exchange) { - throw Error("db inconsistent: exchange of coin not found"); + if (refreshGroup.finishedPerCoin[coinIndex]) { + return; } - - const oldDenom = await ws.db.get(Stores.denominations, [ - exchange.baseUrl, - coin.denomPub, - ]); - - if (!oldDenom) { - throw Error("db inconsistent: denomination for coin not found"); + if (!refreshGroup.refreshSessionPerCoin[coinIndex]) { + await refreshCreateSession(ws, refreshGroupId, coinIndex); + refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { + return; + } } - - const availableDenoms: DenominationRecord[] = await ws.db - .iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchange.baseUrl) - .toArray(); - - const availableAmount = Amounts.sub(coin.currentAmount, oldDenom.feeRefresh) - .amount; - - const newCoinDenoms = getWithdrawDenomList(availableAmount, availableDenoms); - - if (newCoinDenoms.length === 0) { - logger.trace( - `not refreshing, available amount ${amountToPretty( - availableAmount, - )} too small`, - ); - await ws.db.mutate(Stores.coins, oldCoinPub, x => { - if (x.status != coin.status) { - // Concurrent modification? - return; - } - x.status = CoinStatus.Dormant; - return x; - }); - ws.notify({ type: NotificationType.RefreshRefused }); + const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; + if (!refreshSession) { + if (!refreshGroup.finishedPerCoin[coinIndex]) { + throw Error( + "BUG: refresh session was not created and coin not marked as finished", + ); + } return; } + if (refreshSession.norevealIndex === undefined) { + await refreshMelt(ws, refreshGroupId, coinIndex); + } + await refreshReveal(ws, refreshGroupId, coinIndex); +} - const refreshSession: RefreshSessionRecord = await ws.cryptoApi.createRefreshSession( - exchange.baseUrl, - 3, - coin, - newCoinDenoms, - oldDenom.feeRefresh, - ); - - // Store refresh session and subtract refreshed amount from - // coin in the same transaction. - await ws.db.runWithWriteTransaction( - [Stores.refresh, Stores.coins], - async tx => { - const c = await tx.get(Stores.coins, coin.coinPub); - if (!c) { - return; - } - if (c.status !== CoinStatus.Dirty) { - return; - } - const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee); - if (r.saturated) { - console.log("can't refresh coin, no amount left"); - return; - } - c.currentAmount = r.amount; - c.status = CoinStatus.Dormant; - await tx.put(Stores.refresh, refreshSession); - await tx.put(Stores.coins, c); - }, - ); - logger.info(`created refresh session ${refreshSession.refreshSessionId}`); - ws.notify({ type: NotificationType.RefreshStarted }); +/** + * Create a refresh group for a list of coins. + */ +export async function createRefreshGroup( + tx: TransactionHandle, + oldCoinPubs: CoinPublicKey[], + reason: RefreshReason, +): Promise<RefreshGroupId> { + const refreshGroupId = encodeCrock(getRandomBytes(32)); + + const refreshGroup: RefreshGroupRecord = { + finishedTimestamp: undefined, + finishedPerCoin: oldCoinPubs.map(x => false), + lastError: undefined, + lastErrorPerCoin: oldCoinPubs.map(x => undefined), + oldCoinPubs: oldCoinPubs.map(x => x.coinPub), + reason, + refreshGroupId, + refreshSessionPerCoin: oldCoinPubs.map(x => undefined), + retryInfo: initRetryInfo(), + }; - await processRefreshSession(ws, refreshSession.refreshSessionId); + await tx.put(Stores.refreshGroups, refreshGroup); + return { + refreshGroupId, + }; } |