diff options
author | Florian Dold <florian.dold@gmail.com> | 2019-12-15 16:59:00 +0100 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2019-12-15 16:59:00 +0100 |
commit | 4966376839365536923cd6cfbb86d15071432e1a (patch) | |
tree | 6658b4a84df5ba6a0189e6d79e37fb0cc7fb597a /src | |
parent | f4043a0f8114b1b8612e01a5cdf65b8d6ffc6f00 (diff) |
group refresh sessions into groups for nicer history
Diffstat (limited to 'src')
-rw-r--r-- | src/crypto/workers/cryptoImplementation.ts | 5 | ||||
-rw-r--r-- | src/headless/taler-wallet-cli.ts | 2 | ||||
-rw-r--r-- | src/operations/balance.ts | 47 | ||||
-rw-r--r-- | src/operations/history.ts | 2 | ||||
-rw-r--r-- | src/operations/pay.ts | 20 | ||||
-rw-r--r-- | src/operations/pending.ts | 71 | ||||
-rw-r--r-- | src/operations/refresh.ts | 364 | ||||
-rw-r--r-- | src/operations/reserves.ts | 4 | ||||
-rw-r--r-- | src/operations/tip.ts | 1 | ||||
-rw-r--r-- | src/types/dbTypes.ts | 57 | ||||
-rw-r--r-- | src/types/history.ts | 14 | ||||
-rw-r--r-- | src/types/pending.ts | 87 | ||||
-rw-r--r-- | src/types/walletTypes.ts | 26 | ||||
-rw-r--r-- | src/util/asyncMemo.ts | 13 | ||||
-rw-r--r-- | src/wallet.ts | 63 |
15 files changed, 425 insertions, 351 deletions
diff --git a/src/crypto/workers/cryptoImplementation.ts b/src/crypto/workers/cryptoImplementation.ts index 04d15fb4b..01cd797b9 100644 --- a/src/crypto/workers/cryptoImplementation.ts +++ b/src/crypto/workers/cryptoImplementation.ts @@ -395,7 +395,6 @@ export class CryptoImplementation { const newAmount = Amounts.sub(cd.coin.currentAmount, coinSpend).amount; cd.coin.currentAmount = newAmount; - cd.coin.status = CoinStatus.Dirty; const d = buildSigPS(SignaturePurpose.WALLET_COIN_DEPOSIT) .put(decodeCrock(contractTermsHash)) @@ -509,10 +508,7 @@ export class CryptoImplementation { valueOutput = Amounts.add(valueOutput, denom.value).amount; } - const refreshSessionId = encodeCrock(getRandomBytes(32)); - const refreshSession: RefreshSessionRecord = { - refreshSessionId, confirmSig: encodeCrock(confirmSig), exchangeBaseUrl, hash: encodeCrock(sessionHash), @@ -526,7 +522,6 @@ export class CryptoImplementation { valueOutput, valueWithFee, created: getTimestampNow(), - retryInfo: initRetryInfo(), finishedTimestamp: undefined, lastError: undefined, }; diff --git a/src/headless/taler-wallet-cli.ts b/src/headless/taler-wallet-cli.ts index 2e2ded52c..8d4341103 100644 --- a/src/headless/taler-wallet-cli.ts +++ b/src/headless/taler-wallet-cli.ts @@ -330,7 +330,7 @@ advancedCli .requiredArgument("coinPub", clk.STRING) .action(async args => { await withWallet(args, async wallet => { - await wallet.refresh(args.refresh.coinPub, true); + await wallet.refresh(args.refresh.coinPub); }); }); diff --git a/src/operations/balance.ts b/src/operations/balance.ts index f5a51abec..15d8e52fa 100644 --- a/src/operations/balance.ts +++ b/src/operations/balance.ts @@ -74,7 +74,7 @@ export async function getBalances( }; await ws.db.runWithReadTransaction( - [Stores.coins, Stores.refresh, Stores.reserves, Stores.purchases, Stores.withdrawalSession], + [Stores.coins, Stores.refreshGroups, Stores.reserves, Stores.purchases, Stores.withdrawalSession], async tx => { await tx.iter(Stores.coins).forEach(c => { if (c.suspended) { @@ -83,39 +83,30 @@ export async function getBalances( if (c.status === CoinStatus.Fresh) { addTo(balanceStore, "available", c.currentAmount, c.exchangeBaseUrl); } - if (c.status === CoinStatus.Dirty) { - addTo( - balanceStore, - "pendingIncoming", - c.currentAmount, - c.exchangeBaseUrl, - ); - addTo( - balanceStore, - "pendingIncomingDirty", - c.currentAmount, - c.exchangeBaseUrl, - ); - } }); - await tx.iter(Stores.refresh).forEach(r => { + await tx.iter(Stores.refreshGroups).forEach(r => { // Don't count finished refreshes, since the refresh already resulted // in coins being added to the wallet. if (r.finishedTimestamp) { return; } - addTo( - balanceStore, - "pendingIncoming", - r.valueOutput, - r.exchangeBaseUrl, - ); - addTo( - balanceStore, - "pendingIncomingRefresh", - r.valueOutput, - r.exchangeBaseUrl, - ); + for (let i = 0; i < r.oldCoinPubs.length; i++) { + const session = r.refreshSessionPerCoin[i]; + if (session) { + addTo( + balanceStore, + "pendingIncoming", + session.valueOutput, + session.exchangeBaseUrl, + ); + addTo( + balanceStore, + "pendingIncomingRefresh", + session.valueOutput, + session.exchangeBaseUrl, + ); + } + } }); await tx.iter(Stores.withdrawalSession).forEach(wds => { diff --git a/src/operations/history.ts b/src/operations/history.ts index 64f5b21cc..8b225ea07 100644 --- a/src/operations/history.ts +++ b/src/operations/history.ts @@ -45,7 +45,7 @@ export async function getHistory( Stores.exchanges, Stores.proposals, Stores.purchases, - Stores.refresh, + Stores.refreshGroups, Stores.reserves, Stores.tips, Stores.withdrawalSession, diff --git a/src/operations/pay.ts b/src/operations/pay.ts index 27f0e4404..ccb55305d 100644 --- a/src/operations/pay.ts +++ b/src/operations/pay.ts @@ -34,6 +34,7 @@ import { PreparePayResult, ConfirmPayResult, OperationError, + RefreshReason, } from "../types/walletTypes"; import { Database @@ -65,7 +66,7 @@ import { parseRefundUri, getOrderDownloadUrl, } from "../util/taleruri"; -import { getTotalRefreshCost, refresh } from "./refresh"; +import { getTotalRefreshCost, createRefreshGroup } from "./refresh"; import { encodeCrock, getRandomBytes } from "../crypto/talerCrypto"; import { guardOperationException } from "./errors"; import { assertUnreachable } from "../util/assertUnreachable"; @@ -782,26 +783,21 @@ export async function submitPay( console.error("coin not found"); throw Error("coin used in payment not found"); } - c.status = CoinStatus.Dirty; + c.status = CoinStatus.Dormant; modifiedCoins.push(c); } await ws.db.runWithWriteTransaction( - [Stores.coins, Stores.purchases], + [Stores.coins, Stores.purchases, Stores.refreshGroups], async tx => { for (let c of modifiedCoins) { await tx.put(Stores.coins, c); } + await createRefreshGroup(tx, modifiedCoins.map((x) => ({ coinPub: x.coinPub })), RefreshReason.Pay); await tx.put(Stores.purchases, purchase); }, ); - for (const c of purchase.payReq.coins) { - refresh(ws, c.coin_pub).catch(e => { - console.log("error in refreshing after payment:", e); - }); - } - const nextUrl = getNextUrl(purchase.contractTerms); ws.cachedNextUrl[purchase.contractTerms.fulfillment_url] = { nextUrl, @@ -1433,7 +1429,7 @@ async function processPurchaseApplyRefundImpl( let allRefundsProcessed = false; await ws.db.runWithWriteTransaction( - [Stores.purchases, Stores.coins], + [Stores.purchases, Stores.coins, Stores.refreshGroups], async tx => { const p = await tx.get(Stores.purchases, proposalId); if (!p) { @@ -1456,10 +1452,11 @@ async function processPurchaseApplyRefundImpl( } const refundAmount = Amounts.parseOrThrow(perm.refund_amount); const refundFee = Amounts.parseOrThrow(perm.refund_fee); - c.status = CoinStatus.Dirty; + c.status = CoinStatus.Dormant; c.currentAmount = Amounts.add(c.currentAmount, refundAmount).amount; c.currentAmount = Amounts.sub(c.currentAmount, refundFee).amount; await tx.put(Stores.coins, c); + await createRefreshGroup(tx, [{ coinPub: perm.coin_pub }], RefreshReason.Refund); }, ); if (allRefundsProcessed) { @@ -1467,7 +1464,6 @@ async function processPurchaseApplyRefundImpl( type: NotificationType.RefundFinished, }); } - await refresh(ws, perm.coin_pub); } ws.notify({ diff --git a/src/operations/pending.ts b/src/operations/pending.ts index 13859c64b..27892df06 100644 --- a/src/operations/pending.ts +++ b/src/operations/pending.ts @@ -31,7 +31,7 @@ import { CoinStatus, ProposalStatus, } from "../types/dbTypes"; -import { PendingOperationsResponse } from "../types/pending"; +import { PendingOperationsResponse, PendingOperationType } from "../types/pending"; function updateRetryDelay( oldDelay: Duration, @@ -59,7 +59,7 @@ async function gatherExchangePending( case ExchangeUpdateStatus.FINISHED: if (e.lastError) { resp.pendingOperations.push({ - type: "bug", + type: PendingOperationType.Bug, givesLifeness: false, message: "Exchange record is in FINISHED state but has lastError set", @@ -70,7 +70,7 @@ async function gatherExchangePending( } if (!e.details) { resp.pendingOperations.push({ - type: "bug", + type: PendingOperationType.Bug, givesLifeness: false, message: "Exchange record does not have details, but no update in progress.", @@ -81,7 +81,7 @@ async function gatherExchangePending( } if (!e.wireInfo) { resp.pendingOperations.push({ - type: "bug", + type: PendingOperationType.Bug, givesLifeness: false, message: "Exchange record does not have wire info, but no update in progress.", @@ -93,7 +93,7 @@ async function gatherExchangePending( break; case ExchangeUpdateStatus.FETCH_KEYS: resp.pendingOperations.push({ - type: "exchange-update", + type: PendingOperationType.ExchangeUpdate, givesLifeness: false, stage: "fetch-keys", exchangeBaseUrl: e.baseUrl, @@ -103,7 +103,7 @@ async function gatherExchangePending( break; case ExchangeUpdateStatus.FETCH_WIRE: resp.pendingOperations.push({ - type: "exchange-update", + type: PendingOperationType.ExchangeUpdate, givesLifeness: false, stage: "fetch-wire", exchangeBaseUrl: e.baseUrl, @@ -113,7 +113,7 @@ async function gatherExchangePending( break; default: resp.pendingOperations.push({ - type: "bug", + type: PendingOperationType.Bug, givesLifeness: false, message: "Unknown exchangeUpdateStatus", details: { @@ -147,7 +147,7 @@ async function gatherReservePending( break; } resp.pendingOperations.push({ - type: "reserve", + type: PendingOperationType.Reserve, givesLifeness: false, stage: reserve.reserveStatus, timestampCreated: reserve.created, @@ -169,7 +169,7 @@ async function gatherReservePending( return; } resp.pendingOperations.push({ - type: "reserve", + type: PendingOperationType.Reserve, givesLifeness: true, stage: reserve.reserveStatus, timestampCreated: reserve.created, @@ -180,7 +180,7 @@ async function gatherReservePending( break; default: resp.pendingOperations.push({ - type: "bug", + type: PendingOperationType.Bug, givesLifeness: false, message: "Unknown reserve record status", details: { @@ -199,7 +199,7 @@ async function gatherRefreshPending( resp: PendingOperationsResponse, onlyDue: boolean = false, ): Promise<void> { - await tx.iter(Stores.refresh).forEach(r => { + await tx.iter(Stores.refreshGroups).forEach(r => { if (r.finishedTimestamp) { return; } @@ -211,43 +211,15 @@ async function gatherRefreshPending( if (onlyDue && r.retryInfo.nextRetry.t_ms > now.t_ms) { return; } - let refreshStatus: string; - if (r.norevealIndex === undefined) { - refreshStatus = "melt"; - } else { - refreshStatus = "reveal"; - } resp.pendingOperations.push({ - type: "refresh", + type: PendingOperationType.Refresh, givesLifeness: true, - oldCoinPub: r.meltCoinPub, - refreshStatus, - refreshOutputSize: r.newDenoms.length, - refreshSessionId: r.refreshSessionId, + refreshGroupId: r.refreshGroupId, }); }); } -async function gatherCoinsPending( - tx: TransactionHandle, - now: Timestamp, - resp: PendingOperationsResponse, - onlyDue: boolean = false, -): Promise<void> { - // Refreshing dirty coins is always due. - await tx.iter(Stores.coins).forEach(coin => { - if (coin.status == CoinStatus.Dirty) { - resp.nextRetryDelay = { d_ms: 0 }; - resp.pendingOperations.push({ - givesLifeness: true, - type: "dirty-coin", - coinPub: coin.coinPub, - }); - } - }); -} - async function gatherWithdrawalPending( tx: TransactionHandle, now: Timestamp, @@ -272,7 +244,7 @@ async function gatherWithdrawalPending( ); const numCoinsTotal = wsr.withdrawn.length; resp.pendingOperations.push({ - type: "withdraw", + type: PendingOperationType.Withdraw, givesLifeness: true, numCoinsTotal, numCoinsWithdrawn, @@ -294,7 +266,7 @@ async function gatherProposalPending( return; } resp.pendingOperations.push({ - type: "proposal-choice", + type: PendingOperationType.ProposalChoice, givesLifeness: false, merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, proposalId: proposal.proposalId, @@ -310,7 +282,7 @@ async function gatherProposalPending( return; } resp.pendingOperations.push({ - type: "proposal-download", + type: PendingOperationType.ProposalDownload, givesLifeness: true, merchantBaseUrl: proposal.merchantBaseUrl, orderId: proposal.orderId, @@ -343,7 +315,7 @@ async function gatherTipPending( } if (tip.accepted) { resp.pendingOperations.push({ - type: "tip", + type: PendingOperationType.TipPickup, givesLifeness: true, merchantBaseUrl: tip.merchantBaseUrl, tipId: tip.tipId, @@ -368,7 +340,7 @@ async function gatherPurchasePending( ); if (!onlyDue || pr.payRetryInfo.nextRetry.t_ms <= now.t_ms) { resp.pendingOperations.push({ - type: "pay", + type: PendingOperationType.Pay, givesLifeness: true, isReplay: false, proposalId: pr.proposalId, @@ -385,7 +357,7 @@ async function gatherPurchasePending( ); if (!onlyDue || pr.refundStatusRetryInfo.nextRetry.t_ms <= now.t_ms) { resp.pendingOperations.push({ - type: "refund-query", + type: PendingOperationType.RefundQuery, givesLifeness: true, proposalId: pr.proposalId, retryInfo: pr.refundStatusRetryInfo, @@ -403,7 +375,7 @@ async function gatherPurchasePending( ); if (!onlyDue || pr.refundApplyRetryInfo.nextRetry.t_ms <= now.t_ms) { resp.pendingOperations.push({ - type: "refund-apply", + type: PendingOperationType.RefundApply, numRefundsDone, numRefundsPending, givesLifeness: true, @@ -429,7 +401,7 @@ export async function getPendingOperations( [ Stores.exchanges, Stores.reserves, - Stores.refresh, + Stores.refreshGroups, Stores.coins, Stores.withdrawalSession, Stores.proposals, @@ -440,7 +412,6 @@ export async function getPendingOperations( await gatherExchangePending(tx, now, resp, onlyDue); await gatherReservePending(tx, now, resp, onlyDue); await gatherRefreshPending(tx, now, resp, onlyDue); - await gatherCoinsPending(tx, now, resp, onlyDue); await gatherWithdrawalPending(tx, now, resp, onlyDue); await gatherProposalPending(tx, now, resp, onlyDue); await gatherTipPending(tx, now, resp, onlyDue); diff --git a/src/operations/refresh.ts b/src/operations/refresh.ts index 4ffc3ea60..be23a5bb0 100644 --- a/src/operations/refresh.ts +++ b/src/operations/refresh.ts @@ -25,16 +25,24 @@ import { RefreshSessionRecord, initRetryInfo, updateRetryInfoTimeout, + RefreshGroupRecord, } from "../types/dbTypes"; import { amountToPretty } from "../util/helpers"; -import { Database } from "../util/query"; +import { Database, TransactionHandle } from "../util/query"; import { InternalWalletState } from "./state"; import { Logger } from "../util/logging"; import { getWithdrawDenomList } from "./withdraw"; import { updateExchangeFromUrl } from "./exchanges"; -import { getTimestampNow, OperationError } from "../types/walletTypes"; +import { + getTimestampNow, + OperationError, + CoinPublicKey, + RefreshReason, + RefreshGroupId, +} from "../types/walletTypes"; import { guardOperationException } from "./errors"; import { NotificationType } from "../types/notifications"; +import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto"; const logger = new Logger("refresh.ts"); @@ -71,11 +79,130 @@ export function getTotalRefreshCost( return totalCost; } +/** + * Create a refresh session inside a refresh group. + */ +async function refreshCreateSession( + ws: InternalWalletState, + refreshGroupId: string, + coinIndex: number, +): Promise<void> { + logger.trace( + `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`, + ); + const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { + return; + } + if (refreshGroup.finishedPerCoin[coinIndex]) { + return; + } + const existingRefreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; + if (existingRefreshSession) { + return; + } + const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex]; + const coin = await ws.db.get(Stores.coins, oldCoinPub); + if (!coin) { + throw Error("Can't refresh, coin not found"); + } + + const exchange = await updateExchangeFromUrl(ws, coin.exchangeBaseUrl); + if (!exchange) { + throw Error("db inconsistent: exchange of coin not found"); + } + + const oldDenom = await ws.db.get(Stores.denominations, [ + exchange.baseUrl, + coin.denomPub, + ]); + + if (!oldDenom) { + throw Error("db inconsistent: denomination for coin not found"); + } + + const availableDenoms: DenominationRecord[] = await ws.db + .iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchange.baseUrl) + .toArray(); + + const availableAmount = Amounts.sub(coin.currentAmount, oldDenom.feeRefresh) + .amount; + + const newCoinDenoms = getWithdrawDenomList(availableAmount, availableDenoms); + + if (newCoinDenoms.length === 0) { + logger.trace( + `not refreshing, available amount ${amountToPretty( + availableAmount, + )} too small`, + ); + await ws.db.runWithWriteTransaction( + [Stores.coins, Stores.refreshGroups], + async tx => { + const rg = await tx.get(Stores.refreshGroups, refreshGroupId); + if (!rg) { + return; + } + rg.finishedPerCoin[coinIndex] = true; + await tx.put(Stores.refreshGroups, rg); + }, + ); + ws.notify({ type: NotificationType.RefreshRefused }); + return; + } + + const refreshSession: RefreshSessionRecord = await ws.cryptoApi.createRefreshSession( + exchange.baseUrl, + 3, + coin, + newCoinDenoms, + oldDenom.feeRefresh, + ); + + // Store refresh session and subtract refreshed amount from + // coin in the same transaction. + await ws.db.runWithWriteTransaction( + [Stores.refreshGroups, Stores.coins], + async tx => { + const c = await tx.get(Stores.coins, coin.coinPub); + if (!c) { + throw Error("coin not found, but marked for refresh"); + } + const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee); + if (r.saturated) { + console.log("can't refresh coin, no amount left"); + return; + } + c.currentAmount = r.amount; + c.status = CoinStatus.Dormant; + const rg = await tx.get(Stores.refreshGroups, refreshGroupId); + if (!rg) { + return; + } + if (rg.refreshSessionPerCoin[coinIndex]) { + return; + } + rg.refreshSessionPerCoin[coinIndex] = refreshSession; + await tx.put(Stores.refreshGroups, rg); + await tx.put(Stores.coins, c); + }, + ); + logger.info( + `created refresh session for coin #${coinIndex} in ${refreshGroupId}`, + ); + ws.notify({ type: NotificationType.RefreshStarted }); +} + async function refreshMelt( ws: InternalWalletState, - refreshSessionId: string, + refreshGroupId: string, + coinIndex: number, ): Promise<void> { - const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId); + const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { + return; + } + const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; if (!refreshSession) { return; } @@ -122,7 +249,11 @@ async function refreshMelt( refreshSession.norevealIndex = norevealIndex; - await ws.db.mutate(Stores.refresh, refreshSessionId, rs => { + await ws.db.mutate(Stores.refreshGroups, refreshGroupId, rg => { + const rs = rg.refreshSessionPerCoin[coinIndex]; + if (!rs) { + return; + } if (rs.norevealIndex !== undefined) { return; } @@ -130,7 +261,7 @@ async function refreshMelt( return; } rs.norevealIndex = norevealIndex; - return rs; + return rg; }); ws.notify({ @@ -140,9 +271,14 @@ async function refreshMelt( async function refreshReveal( ws: InternalWalletState, - refreshSessionId: string, + refreshGroupId: string, + coinIndex: number, ): Promise<void> { - const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId); + const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { + return; + } + const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; if (!refreshSession) { return; } @@ -253,23 +389,38 @@ async function refreshReveal( } await ws.db.runWithWriteTransaction( - [Stores.coins, Stores.refresh], + [Stores.coins, Stores.refreshGroups], async tx => { - const rs = await tx.get(Stores.refresh, refreshSessionId); - if (!rs) { + const rg = await tx.get(Stores.refreshGroups, refreshGroupId); + if (!rg) { console.log("no refresh session found"); return; } + const rs = rg.refreshSessionPerCoin[coinIndex]; + if (!rs) { + return; + } if (rs.finishedTimestamp) { console.log("refresh session already finished"); return; } rs.finishedTimestamp = getTimestampNow(); - rs.retryInfo = initRetryInfo(false); + rg.finishedPerCoin[coinIndex] = true; + let allDone = true; + for (const f of rg.finishedPerCoin) { + if (!f) { + allDone = false; + break; + } + } + if (allDone) { + rg.finishedTimestamp = getTimestampNow(); + rg.retryInfo = initRetryInfo(false); + } for (let coin of coins) { await tx.put(Stores.coins, coin); } - await tx.put(Stores.refresh, rs); + await tx.put(Stores.refreshGroups, rg); }, ); console.log("refresh finished (end of reveal)"); @@ -280,11 +431,11 @@ async function refreshReveal( async function incrementRefreshRetry( ws: InternalWalletState, - refreshSessionId: string, + refreshGroupId: string, err: OperationError | undefined, ): Promise<void> { - await ws.db.runWithWriteTransaction([Stores.refresh], async tx => { - const r = await tx.get(Stores.refresh, refreshSessionId); + await ws.db.runWithWriteTransaction([Stores.refreshGroups], async tx => { + const r = await tx.get(Stores.refreshGroups, refreshGroupId); if (!r) { return; } @@ -294,31 +445,31 @@ async function incrementRefreshRetry( r.retryInfo.retryCounter++; updateRetryInfoTimeout(r.retryInfo); r.lastError = err; - await tx.put(Stores.refresh, r); + await tx.put(Stores.refreshGroups, r); }); ws.notify({ type: NotificationType.RefreshOperationError }); } -export async function processRefreshSession( +export async function processRefreshGroup( ws: InternalWalletState, - refreshSessionId: string, + refreshGroupId: string, forceNow: boolean = false, -) { - return ws.memoProcessRefresh.memo(refreshSessionId, async () => { +): Promise<void> { + await ws.memoProcessRefresh.memo(refreshGroupId, async () => { const onOpErr = (e: OperationError) => - incrementRefreshRetry(ws, refreshSessionId, e); - return guardOperationException( - () => processRefreshSessionImpl(ws, refreshSessionId, forceNow), + incrementRefreshRetry(ws, refreshGroupId, e); + return await guardOperationException( + async () => await processRefreshGroupImpl(ws, refreshGroupId, forceNow), onOpErr, ); }); } -async function resetRefreshSessionRetry( +async function resetRefreshGroupRetry( ws: InternalWalletState, refreshSessionId: string, ) { - await ws.db.mutate(Stores.refresh, refreshSessionId, x => { + await ws.db.mutate(Stores.refreshGroups, refreshSessionId, x => { if (x.retryInfo.active) { x.retryInfo = initRetryInfo(); } @@ -326,124 +477,87 @@ async function resetRefreshSessionRetry( }); } -async function processRefreshSessionImpl( +async function processRefreshGroupImpl( ws: InternalWalletState, - refreshSessionId: string, + refreshGroupId: string, forceNow: boolean, ) { if (forceNow) { - await resetRefreshSessionRetry(ws, refreshSessionId); + await resetRefreshGroupRetry(ws, refreshGroupId); } - const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId); - if (!refreshSession) { + const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { return; } - if (refreshSession.finishedTimestamp) { + if (refreshGroup.finishedTimestamp) { return; } - if (typeof refreshSession.norevealIndex !== "number") { - await refreshMelt(ws, refreshSession.refreshSessionId); - } - await refreshReveal(ws, refreshSession.refreshSessionId); + const ps = refreshGroup.oldCoinPubs.map((x, i) => + processRefreshSession(ws, refreshGroupId, i), + ); + await Promise.all(ps); logger.trace("refresh finished"); } -export async function refresh( +async function processRefreshSession( ws: InternalWalletState, - oldCoinPub: string, - force: boolean = false, -): Promise<void> { - const coin = await ws.db.get(Stores.coins, oldCoinPub); - if (!coin) { - console.warn("can't refresh, coin not in database"); + refreshGroupId: string, + coinIndex: number, +) { + logger.trace(`processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`); + let refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { return; } - switch (coin.status) { - case CoinStatus.Dirty: - break; - case CoinStatus.Dormant: - return; - case CoinStatus.Fresh: - if (!force) { - return; - } - break; - } - - const exchange = await updateExchangeFromUrl(ws, coin.exchangeBaseUrl); - if (!exchange) { - throw Error("db inconsistent: exchange of coin not found"); + if (refreshGroup.finishedPerCoin[coinIndex]) { + return; } - - const oldDenom = await ws.db.get(Stores.denominations, [ - exchange.baseUrl, - coin.denomPub, - ]); - - if (!oldDenom) { - throw Error("db inconsistent: denomination for coin not found"); + if (!refreshGroup.refreshSessionPerCoin[coinIndex]) { + await refreshCreateSession(ws, refreshGroupId, coinIndex); + refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId); + if (!refreshGroup) { + return; + } } - - const availableDenoms: DenominationRecord[] = await ws.db - .iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchange.baseUrl) - .toArray(); - - const availableAmount = Amounts.sub(coin.currentAmount, oldDenom.feeRefresh) - .amount; - - const newCoinDenoms = getWithdrawDenomList(availableAmount, availableDenoms); - - if (newCoinDenoms.length === 0) { - logger.trace( - `not refreshing, available amount ${amountToPretty( - availableAmount, - )} too small`, - ); - await ws.db.mutate(Stores.coins, oldCoinPub, x => { - if (x.status != coin.status) { - // Concurrent modification? - return; - } - x.status = CoinStatus.Dormant; - return x; - }); - ws.notify({ type: NotificationType.RefreshRefused }); + const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; + if (!refreshSession) { + if (!refreshGroup.finishedPerCoin[coinIndex]) { + throw Error( + "BUG: refresh session was not created and coin not marked as finished", + ); + } return; } + if (refreshSession.norevealIndex === undefined) { + await refreshMelt(ws, refreshGroupId, coinIndex); + } + await refreshReveal(ws, refreshGroupId, coinIndex); +} - const refreshSession: RefreshSessionRecord = await ws.cryptoApi.createRefreshSession( - exchange.baseUrl, - 3, - coin, - newCoinDenoms, - oldDenom.feeRefresh, - ); - - // Store refresh session and subtract refreshed amount from - // coin in the same transaction. - await ws.db.runWithWriteTransaction( - [Stores.refresh, Stores.coins], - async tx => { - const c = await tx.get(Stores.coins, coin.coinPub); - if (!c) { - return; - } - if (c.status !== CoinStatus.Dirty) { - return; - } - const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee); - if (r.saturated) { - console.log("can't refresh coin, no amount left"); - return; - } - c.currentAmount = r.amount; - c.status = CoinStatus.Dormant; - await tx.put(Stores.refresh, refreshSession); - await tx.put(Stores.coins, c); - }, - ); - logger.info(`created refresh session ${refreshSession.refreshSessionId}`); - ws.notify({ type: NotificationType.RefreshStarted }); +/** + * Create a refresh group for a list of coins. + */ +export async function createRefreshGroup( + tx: TransactionHandle, + oldCoinPubs: CoinPublicKey[], + reason: RefreshReason, +): Promise<RefreshGroupId> { + const refreshGroupId = encodeCrock(getRandomBytes(32)); + + const refreshGroup: RefreshGroupRecord = { + finishedTimestamp: undefined, + finishedPerCoin: oldCoinPubs.map(x => false), + lastError: undefined, + lastErrorPerCoin: oldCoinPubs.map(x => undefined), + oldCoinPubs: oldCoinPubs.map(x => x.coinPub), + reason, + refreshGroupId, + refreshSessionPerCoin: oldCoinPubs.map(x => undefined), + retryInfo: initRetryInfo(), + }; - await processRefreshSession(ws, refreshSession.refreshSessionId); + await tx.put(Stores.refreshGroups, refreshGroup); + return { + refreshGroupId, + }; } diff --git a/src/operations/reserves.ts b/src/operations/reserves.ts index 215d5ba7d..559d3ab08 100644 --- a/src/operations/reserves.ts +++ b/src/operations/reserves.ts @@ -458,10 +458,10 @@ async function processReserveImpl( break; case ReserveRecordStatus.REGISTERING_BANK: await processReserveBankStatus(ws, reservePub); - return processReserveImpl(ws, reservePub, true); + return await processReserveImpl(ws, reservePub, true); case ReserveRecordStatus.QUERYING_STATUS: await updateReserve(ws, reservePub); - return processReserveImpl(ws, reservePub, true); + return await processReserveImpl(ws, reservePub, true); case ReserveRecordStatus.WITHDRAWING: await depleteReserve(ws, reservePub); break; diff --git a/src/operations/tip.ts b/src/operations/tip.ts index f723374f9..f9953b513 100644 --- a/src/operations/tip.ts +++ b/src/operations/tip.ts @@ -15,7 +15,6 @@ */ -import { Database } from "../util/query"; import { InternalWalletState } from "./state"; import { parseTipUri } from "../util/taleruri"; import { TipStatus, getTimestampNow, OperationError } from "../types/walletTypes"; diff --git a/src/types/dbTypes.ts b/src/types/dbTypes.ts index 6a00a497d..c05aa68d7 100644 --- a/src/types/dbTypes.ts +++ b/src/types/dbTypes.ts @@ -41,6 +41,7 @@ import { OperationError, Duration, getTimestampNow, + RefreshReason, } from "./walletTypes"; export enum ReserveRecordStatus { @@ -572,10 +573,6 @@ export enum CoinStatus { */ Fresh = "fresh", /** - * Used for a completed transaction and now dirty. - */ - Dirty = "dirty", - /** * A coin that has been spent and refreshed. */ Dormant = "dormant", @@ -849,6 +846,39 @@ export interface TipRecord { retryInfo: RetryInfo; } +export interface RefreshGroupRecord { + /** + * Retry info, even present when the operation isn't active to allow indexing + * on the next retry timestamp. + */ + retryInfo: RetryInfo; + + lastError: OperationError | undefined; + + lastErrorPerCoin: (OperationError | undefined)[]; + + refreshGroupId: string; + + reason: RefreshReason; + + oldCoinPubs: string[]; + + refreshSessionPerCoin: (RefreshSessionRecord | undefined)[]; + + /** + * Flag for each coin whether refreshing finished. + * If a coin can't be refreshed (remaining value too small), + * it will be marked as finished, but no refresh session will + * be created. + */ + finishedPerCoin: boolean[]; + + /** + * Timestamp when the refresh session finished. + */ + finishedTimestamp: Timestamp | undefined; +} + /** * Ongoing refresh */ @@ -913,30 +943,19 @@ export interface RefreshSessionRecord { hash: string; /** - * Base URL for the exchange we're doing the refresh with. - */ - exchangeBaseUrl: string; - - /** * Timestamp when the refresh session finished. */ finishedTimestamp: Timestamp | undefined; /** - * A 32-byte base32-crockford encoded random identifier. - */ - refreshSessionId: string; - - /** * When has this refresh session been created? */ created: Timestamp; /** - * Retry info, even present when the operation isn't active to allow indexing - * on the next retry timestamp. + * Base URL for the exchange we're doing the refresh with. */ - retryInfo: RetryInfo; + exchangeBaseUrl: string; } /** @@ -1366,8 +1385,8 @@ export namespace Stores { export const denominations = new DenominationsStore(); export const exchanges = new ExchangesStore(); export const proposals = new ProposalsStore(); - export const refresh = new Store<RefreshSessionRecord>("refresh", { - keyPath: "refreshSessionId", + export const refreshGroups = new Store<RefreshGroupRecord>("refreshGroups", { + keyPath: "refreshGroupId", }); export const reserves = new ReservesStore(); export const purchases = new PurchasesStore(); diff --git a/src/types/history.ts b/src/types/history.ts index 06b863005..54004b122 100644 --- a/src/types/history.ts +++ b/src/types/history.ts @@ -1,4 +1,4 @@ -import { Timestamp } from "./walletTypes"; +import { Timestamp, RefreshReason } from "./walletTypes"; /* This file is part of GNU Taler @@ -604,18 +604,6 @@ export interface HistoryRefund { } /** - * Reasons for why a coin is being refreshed. - */ -export const enum RefreshReason { - Manual = "manual", - Pay = "pay", - Refund = "refund", - AbortPay = "abort-pay", - Recoup = "recoup", - BackupRestored = "backup-restored", -} - -/** * Event to indicate that a group of refresh sessions has completed. */ export interface HistoryRefreshedEvent { diff --git a/src/types/pending.ts b/src/types/pending.ts index 5e381d09a..d08d2c54e 100644 --- a/src/types/pending.ts +++ b/src/types/pending.ts @@ -24,27 +24,41 @@ import { OperationError, Timestamp, Duration } from "./walletTypes"; import { WithdrawalSource, RetryInfo } from "./dbTypes"; +export const enum PendingOperationType { + Bug = "bug", + ExchangeUpdate = "exchange-update", + Pay = "pay", + ProposalChoice = "proposal-choice", + ProposalDownload = "proposal-download", + Refresh = "refresh", + Reserve = "reserve", + RefundApply = "refund-apply", + RefundQuery = "refund-query", + TipChoice = "tip-choice", + TipPickup = "tip-pickup", + Withdraw = "withdraw", +} + /** * Information about a pending operation. */ export type PendingOperationInfo = PendingOperationInfoCommon & ( - | PendingWithdrawOperation - | PendingReserveOperation | PendingBugOperation - | PendingDirtyCoinOperation | PendingExchangeUpdateOperation - | PendingRefreshOperation - | PendingTipOperation - | PendingProposalDownloadOperation - | PendingProposalChoiceOperation | PendingPayOperation - | PendingRefundQueryOperation + | PendingProposalChoiceOperation + | PendingProposalDownloadOperation + | PendingRefreshOperation | PendingRefundApplyOperation + | PendingRefundQueryOperation + | PendingReserveOperation + | PendingTipPickupOperation + | PendingWithdrawOperation ); export interface PendingExchangeUpdateOperation { - type: "exchange-update"; + type: PendingOperationType.ExchangeUpdate; stage: string; reason: string; exchangeBaseUrl: string; @@ -52,13 +66,13 @@ export interface PendingExchangeUpdateOperation { } export interface PendingBugOperation { - type: "bug"; + type: PendingOperationType.Bug; message: string; details: any; } export interface PendingReserveOperation { - type: "reserve"; + type: PendingOperationType.Reserve; retryInfo: RetryInfo | undefined; stage: string; timestampCreated: Timestamp; @@ -68,21 +82,13 @@ export interface PendingReserveOperation { } export interface PendingRefreshOperation { - type: "refresh"; + type: PendingOperationType.Refresh; lastError?: OperationError; - refreshSessionId: string; - oldCoinPub: string; - refreshStatus: string; - refreshOutputSize: number; -} - -export interface PendingDirtyCoinOperation { - type: "dirty-coin"; - coinPub: string; + refreshGroupId: string; } export interface PendingProposalDownloadOperation { - type: "proposal-download"; + type: PendingOperationType.ProposalDownload; merchantBaseUrl: string; proposalTimestamp: Timestamp; proposalId: string; @@ -96,66 +102,57 @@ export interface PendingProposalDownloadOperation { * proposed contract terms. */ export interface PendingProposalChoiceOperation { - type: "proposal-choice"; + type: PendingOperationType.ProposalChoice; merchantBaseUrl: string; proposalTimestamp: Timestamp; proposalId: string; } -export interface PendingTipOperation { - type: "tip"; +export interface PendingTipPickupOperation { + type: PendingOperationType.TipPickup; tipId: string; merchantBaseUrl: string; merchantTipId: string; } export interface PendingPayOperation { - type: "pay"; + type: PendingOperationType.Pay; proposalId: string; isReplay: boolean; - retryInfo: RetryInfo, + retryInfo: RetryInfo; lastError: OperationError | undefined; } export interface PendingRefundQueryOperation { - type: "refund-query"; + type: PendingOperationType.RefundQuery; proposalId: string; - retryInfo: RetryInfo, + retryInfo: RetryInfo; lastError: OperationError | undefined; } export interface PendingRefundApplyOperation { - type: "refund-apply"; + type: PendingOperationType.RefundApply; proposalId: string; - retryInfo: RetryInfo, + retryInfo: RetryInfo; lastError: OperationError | undefined; numRefundsPending: number; numRefundsDone: number; } -export interface PendingOperationInfoCommon { - type: string; - givesLifeness: boolean; -} - - export interface PendingWithdrawOperation { - type: "withdraw"; + type: PendingOperationType.Withdraw; source: WithdrawalSource; withdrawSessionId: string; numCoinsWithdrawn: number; numCoinsTotal: number; } -export interface PendingRefreshOperation { - type: "refresh"; -} - -export interface PendingPayOperation { - type: "pay"; +export interface PendingOperationInfoCommon { + type: PendingOperationType; + givesLifeness: boolean; } export interface PendingOperationsResponse { pendingOperations: PendingOperationInfo[]; nextRetryDelay: Duration; -}
\ No newline at end of file +} diff --git a/src/types/walletTypes.ts b/src/types/walletTypes.ts index 903852b1b..eedae6f2c 100644 --- a/src/types/walletTypes.ts +++ b/src/types/walletTypes.ts @@ -506,3 +506,29 @@ export interface PlanchetCreationRequest { reservePub: string; reservePriv: string; } + +/** + * Reasons for why a coin is being refreshed. + */ +export const enum RefreshReason { + Manual = "manual", + Pay = "pay", + Refund = "refund", + AbortPay = "abort-pay", + Recoup = "recoup", + BackupRestored = "backup-restored", +} + +/** + * Wrapper for coin public keys. + */ +export interface CoinPublicKey { + readonly coinPub: string; +} + +/** + * Wrapper for refresh group IDs. + */ +export interface RefreshGroupId { + readonly refreshGroupId: string; +} diff --git a/src/util/asyncMemo.ts b/src/util/asyncMemo.ts index 193ce6df6..17204a88e 100644 --- a/src/util/asyncMemo.ts +++ b/src/util/asyncMemo.ts @@ -39,15 +39,14 @@ export class AsyncOpMemoMap<T> { const n = this.n++; // Wrap the operation in case it immediately throws const p = Promise.resolve().then(() => pg()); - p.finally(() => { - this.cleanUp(key, n); - }); this.memoMap[key] = { - p, - n, - t: new Date().getTime(), + p, + n, + t: new Date().getTime(), }; - return p; + return p.finally(() => { + this.cleanUp(key, n); + }); } clear() { this.memoMap = {}; diff --git a/src/wallet.ts b/src/wallet.ts index e4088fab2..163f3def9 100644 --- a/src/wallet.ts +++ b/src/wallet.ts @@ -77,6 +77,7 @@ import { AcceptWithdrawalResponse, PurchaseDetails, ExchangeWithdrawDetails, + RefreshReason, } from "./types/walletTypes"; import { Logger } from "./util/logging"; @@ -92,7 +93,7 @@ import { processReserve } from "./operations/reserves"; import { InternalWalletState } from "./operations/state"; import { createReserve, confirmReserve } from "./operations/reserves"; -import { processRefreshSession, refresh } from "./operations/refresh"; +import { processRefreshGroup, createRefreshGroup } from "./operations/refresh"; import { processWithdrawSession } from "./operations/withdraw"; import { getHistory } from "./operations/history"; import { getPendingOperations } from "./operations/pending"; @@ -103,7 +104,7 @@ import { payback } from "./operations/payback"; import { TimerGroup } from "./util/timer"; import { AsyncCondition } from "./util/promiseUtils"; import { AsyncOpMemoSingle } from "./util/asyncMemo"; -import { PendingOperationInfo, PendingOperationsResponse } from "./types/pending"; +import { PendingOperationInfo, PendingOperationsResponse, PendingOperationType } from "./types/pending"; import { WalletNotification, NotificationType } from "./types/notifications"; import { HistoryQuery, HistoryEvent } from "./types/history"; @@ -180,48 +181,45 @@ export class Wallet { ): Promise<void> { console.log("running pending", pending); switch (pending.type) { - case "bug": + case PendingOperationType.Bug: // Nothing to do, will just be displayed to the user return; - case "dirty-coin": - await refresh(this.ws, pending.coinPub); - break; - case "exchange-update": + case PendingOperationType.ExchangeUpdate: await updateExchangeFromUrl(this.ws, pending.exchangeBaseUrl, forceNow); break; - case "refresh": - await processRefreshSession( + case PendingOperationType.Refresh: + await processRefreshGroup( this.ws, - pending.refreshSessionId, + pending.refreshGroupId, forceNow, ); break; - case "reserve": + case PendingOperationType.Reserve: await processReserve(this.ws, pending.reservePub, forceNow); break; - case "withdraw": + case PendingOperationType.Withdraw: await processWithdrawSession( this.ws, pending.withdrawSessionId, forceNow, ); break; - case "proposal-choice": + case PendingOperationType.ProposalChoice: // Nothing to do, user needs to accept/reject break; - case "proposal-download": + case PendingOperationType.ProposalDownload: await processDownloadProposal(this.ws, pending.proposalId, forceNow); break; - case "tip": + case PendingOperationType.TipPickup: await processTip(this.ws, pending.tipId, forceNow); break; - case "pay": + case PendingOperationType.Pay: await processPurchasePay(this.ws, pending.proposalId, forceNow); break; - case "refund-query": + case PendingOperationType.RefundQuery: await processPurchaseQueryRefund(this.ws, pending.proposalId, forceNow); break; - case "refund-apply": + case PendingOperationType.RefundApply: await processPurchaseApplyRefund(this.ws, pending.proposalId, forceNow); break; default: @@ -370,28 +368,6 @@ export class Wallet { } /** - * Refresh all dirty coins. - * The returned promise resolves only after all refresh - * operations have completed. - */ - async refreshDirtyCoins(): Promise<{ numRefreshed: number }> { - let n = 0; - const coins = await this.db.iter(Stores.coins).toArray(); - for (let coin of coins) { - if (coin.status == CoinStatus.Dirty) { - try { - await this.refresh(coin.coinPub); - } catch (e) { - console.log("error during refresh"); - } - - n += 1; - } - } - return { numRefreshed: n }; - } - - /** * Add a contract to the wallet and sign coins, and send them. */ async confirmPay( @@ -496,9 +472,12 @@ export class Wallet { return this.ws.memoGetBalance.memo(() => getBalances(this.ws)); } - async refresh(oldCoinPub: string, force: boolean = false): Promise<void> { + async refresh(oldCoinPub: string): Promise<void> { try { - return refresh(this.ws, oldCoinPub, force); + const refreshGroupId = await this.db.runWithWriteTransaction([Stores.refreshGroups], async (tx) => { + return await createRefreshGroup(tx, [{ coinPub: oldCoinPub }], RefreshReason.Manual); + }); + await processRefreshGroup(this.ws, refreshGroupId.refreshGroupId); } catch (e) { this.latch.trigger(); } |