diff options
author | Florian Dold <florian.dold@gmail.com> | 2019-12-05 19:38:19 +0100 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2019-12-05 19:38:19 +0100 |
commit | f67d7f54f9d0fed97446898942e3dfee67ee2985 (patch) | |
tree | 2b81738025e8f61250ede10908cbf81071e16975 /src/wallet-impl | |
parent | 829acdd3d98f1014747f15ecb619b6fbaa06b640 (diff) | |
download | wallet-core-f67d7f54f9d0fed97446898942e3dfee67ee2985.tar.xz |
threads, retries and notifications WIP
Diffstat (limited to 'src/wallet-impl')
-rw-r--r-- | src/wallet-impl/balance.ts | 3 | ||||
-rw-r--r-- | src/wallet-impl/errors.ts | 81 | ||||
-rw-r--r-- | src/wallet-impl/exchanges.ts | 2 | ||||
-rw-r--r-- | src/wallet-impl/history.ts | 8 | ||||
-rw-r--r-- | src/wallet-impl/pay.ts | 207 | ||||
-rw-r--r-- | src/wallet-impl/payback.ts | 9 | ||||
-rw-r--r-- | src/wallet-impl/pending.ts | 561 | ||||
-rw-r--r-- | src/wallet-impl/refresh.ts | 103 | ||||
-rw-r--r-- | src/wallet-impl/reserves.ts | 110 | ||||
-rw-r--r-- | src/wallet-impl/return.ts | 3 | ||||
-rw-r--r-- | src/wallet-impl/state.ts | 63 | ||||
-rw-r--r-- | src/wallet-impl/tip.ts | 44 | ||||
-rw-r--r-- | src/wallet-impl/withdraw.ts | 98 |
13 files changed, 939 insertions, 353 deletions
diff --git a/src/wallet-impl/balance.ts b/src/wallet-impl/balance.ts index 94d65fa96..a1351014c 100644 --- a/src/wallet-impl/balance.ts +++ b/src/wallet-impl/balance.ts @@ -33,6 +33,7 @@ const logger = new Logger("withdraw.ts"); export async function getBalances( ws: InternalWalletState, ): Promise<WalletBalance> { + logger.trace("starting to compute balance"); /** * Add amount to a balance field, both for * the slicing by exchange and currency. @@ -101,7 +102,7 @@ export async function getBalances( await tx.iter(Stores.refresh).forEach(r => { // Don't count finished refreshes, since the refresh already resulted // in coins being added to the wallet. - if (r.finished) { + if (r.finishedTimestamp) { return; } addTo( diff --git a/src/wallet-impl/errors.ts b/src/wallet-impl/errors.ts new file mode 100644 index 000000000..5df99b7d3 --- /dev/null +++ b/src/wallet-impl/errors.ts @@ -0,0 +1,81 @@ +import { OperationError } from "../walletTypes"; + +/* + This file is part of GNU Taler + (C) 2019 GNUnet e.V. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +/** + * This exception is there to let the caller know that an error happened, + * but the error has already been reported by writing it to the database. + */ +export class OperationFailedAndReportedError extends Error { + constructor(message: string) { + super(message); + + // Set the prototype explicitly. + Object.setPrototypeOf(this, OperationFailedAndReportedError.prototype); + } +} + +/** + * This exception is thrown when an error occured and the caller is + * responsible for recording the failure in the database. + */ +export class OperationFailedError extends Error { + constructor(message: string, public err: OperationError) { + super(message); + + // Set the prototype explicitly. + Object.setPrototypeOf(this, OperationFailedError.prototype); + } +} + +/** + * Run an operation and call the onOpError callback + * when there was an exception or operation error that must be reported. + * The cause will be re-thrown to the caller. + */ +export async function guardOperationException<T>( + op: () => Promise<T>, + onOpError: (e: OperationError) => Promise<void>, +): Promise<T> { + try { + return op(); + } catch (e) { + if (e instanceof OperationFailedAndReportedError) { + throw e; + } + if (e instanceof OperationFailedError) { + await onOpError(e.err); + throw new OperationFailedAndReportedError(e.message); + } + if (e instanceof Error) { + await onOpError({ + type: "exception", + message: e.message, + details: {}, + }); + throw new OperationFailedAndReportedError(e.message); + } + await onOpError({ + type: "exception", + message: "non-error exception thrown", + details: { + value: e.toString(), + }, + }); + throw new OperationFailedAndReportedError(e.message); + } +}
\ No newline at end of file diff --git a/src/wallet-impl/exchanges.ts b/src/wallet-impl/exchanges.ts index b3677c6c6..b89f3f84e 100644 --- a/src/wallet-impl/exchanges.ts +++ b/src/wallet-impl/exchanges.ts @@ -17,7 +17,6 @@ import { InternalWalletState } from "./state"; import { WALLET_CACHE_BREAKER_CLIENT_VERSION, - OperationFailedAndReportedError, } from "../wallet"; import { KeysJson, Denomination, ExchangeWireJson } from "../talerTypes"; import { getTimestampNow, OperationError } from "../walletTypes"; @@ -42,6 +41,7 @@ import { } from "../util/query"; import * as Amounts from "../util/amounts"; import { parsePaytoUri } from "../util/payto"; +import { OperationFailedAndReportedError } from "./errors"; async function denominationRecordFromKeys( ws: InternalWalletState, diff --git a/src/wallet-impl/history.ts b/src/wallet-impl/history.ts index dfc683e6d..5e93ab878 100644 --- a/src/wallet-impl/history.ts +++ b/src/wallet-impl/history.ts @@ -78,11 +78,11 @@ export async function getHistory( fulfillmentUrl: p.contractTerms.fulfillment_url, merchantName: p.contractTerms.merchant.name, }, - timestamp: p.timestamp, + timestamp: p.acceptTimestamp, type: "pay", explicit: false, }); - if (p.timestamp_refund) { + if (p.lastRefundTimestamp) { const contractAmount = Amounts.parseOrThrow(p.contractTerms.amount); const amountsPending = Object.keys(p.refundsPending).map(x => Amounts.parseOrThrow(p.refundsPending[x].refund_amount), @@ -103,7 +103,7 @@ export async function getHistory( merchantName: p.contractTerms.merchant.name, refundAmount: amount, }, - timestamp: p.timestamp_refund, + timestamp: p.lastRefundTimestamp, type: "refund", explicit: false, }); @@ -151,7 +151,7 @@ export async function getHistory( merchantBaseUrl: tip.merchantBaseUrl, tipId: tip.merchantTipId, }, - timestamp: tip.timestamp, + timestamp: tip.createdTimestamp, explicit: false, type: "tip", }); diff --git a/src/wallet-impl/pay.ts b/src/wallet-impl/pay.ts index 9942139a6..9b2da9c7d 100644 --- a/src/wallet-impl/pay.ts +++ b/src/wallet-impl/pay.ts @@ -33,6 +33,8 @@ import { getTimestampNow, PreparePayResult, ConfirmPayResult, + OperationError, + NotificationType, } from "../walletTypes"; import { oneShotIter, @@ -51,12 +53,14 @@ import { PurchaseRecord, CoinRecord, ProposalStatus, + initRetryInfo, + updateRetryInfoTimeout, + PurchaseStatus, } from "../dbTypes"; import * as Amounts from "../util/amounts"; import { amountToPretty, strcmp, - extractTalerStamp, canonicalJson, extractTalerStampOrThrow, } from "../util/helpers"; @@ -65,6 +69,8 @@ import { InternalWalletState } from "./state"; import { parsePayUri, parseRefundUri } from "../util/taleruri"; import { getTotalRefreshCost, refresh } from "./refresh"; import { encodeCrock, getRandomBytes } from "../crypto/talerCrypto"; +import { guardOperationException } from "./errors"; +import { assertUnreachable } from "../util/assertUnreachable"; export interface SpeculativePayData { payCoinInfo: PayCoinInfo; @@ -344,9 +350,12 @@ async function recordConfirmPay( payReq, refundsDone: {}, refundsPending: {}, - timestamp: getTimestampNow(), - timestamp_refund: undefined, + acceptTimestamp: getTimestampNow(), + lastRefundTimestamp: undefined, proposalId: proposal.proposalId, + retryInfo: initRetryInfo(), + lastError: undefined, + status: PurchaseStatus.SubmitPay, }; await runWithWriteTransaction( @@ -365,8 +374,10 @@ async function recordConfirmPay( }, ); - ws.badge.showNotification(); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.ProposalAccepted, + proposalId: proposal.proposalId, + }); return t; } @@ -419,7 +430,7 @@ export async function abortFailedPayment( } const refundResponse = MerchantRefundResponse.checked(resp.responseJson); - await acceptRefundResponse(ws, refundResponse); + await acceptRefundResponse(ws, purchase.proposalId, refundResponse); await runWithWriteTransaction(ws.db, [Stores.purchases], async tx => { const p = await tx.get(Stores.purchases, proposalId); @@ -431,10 +442,62 @@ export async function abortFailedPayment( }); } +async function incrementProposalRetry( + ws: InternalWalletState, + proposalId: string, + err: OperationError | undefined, +): Promise<void> { + await runWithWriteTransaction(ws.db, [Stores.proposals], async tx => { + const pr = await tx.get(Stores.proposals, proposalId); + if (!pr) { + return; + } + if (!pr.retryInfo) { + return; + } + pr.retryInfo.retryCounter++; + updateRetryInfoTimeout(pr.retryInfo); + pr.lastError = err; + await tx.put(Stores.proposals, pr); + }); +} + +async function incrementPurchaseRetry( + ws: InternalWalletState, + proposalId: string, + err: OperationError | undefined, +): Promise<void> { + await runWithWriteTransaction(ws.db, [Stores.purchases], async tx => { + const pr = await tx.get(Stores.purchases, proposalId); + if (!pr) { + return; + } + if (!pr.retryInfo) { + return; + } + pr.retryInfo.retryCounter++; + updateRetryInfoTimeout(pr.retryInfo); + pr.lastError = err; + await tx.put(Stores.purchases, pr); + }); +} + export async function processDownloadProposal( ws: InternalWalletState, proposalId: string, ): Promise<void> { + const onOpErr = (err: OperationError) => + incrementProposalRetry(ws, proposalId, err); + await guardOperationException( + () => processDownloadProposalImpl(ws, proposalId), + onOpErr, + ); +} + +async function processDownloadProposalImpl( + ws: InternalWalletState, + proposalId: string, +): Promise<void> { const proposal = await oneShotGet(ws.db, Stores.proposals, proposalId); if (!proposal) { return; @@ -498,7 +561,10 @@ export async function processDownloadProposal( }, ); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.ProposalDownloaded, + proposalId: proposal.proposalId, + }); } /** @@ -536,6 +602,8 @@ async function startDownloadProposal( proposalId: proposalId, proposalStatus: ProposalStatus.DOWNLOADING, repurchaseProposalId: undefined, + retryInfo: initRetryInfo(), + lastError: undefined, }; await oneShotPut(ws.db, Stores.proposals, proposalRecord); @@ -582,6 +650,7 @@ export async function submitPay( throw Error("merchant payment signature invalid"); } purchase.finished = true; + purchase.retryInfo = initRetryInfo(false); const modifiedCoins: CoinRecord[] = []; for (const pc of purchase.payReq.coins) { const c = await oneShotGet(ws.db, Stores.coins, pc.coin_pub); @@ -859,8 +928,6 @@ export async function confirmPay( return submitPay(ws, proposalId, sessionId); } - - export async function getFullRefundFees( ws: InternalWalletState, refundPermissions: MerchantRefundPermission[], @@ -914,15 +981,13 @@ export async function getFullRefundFees( return feeAcc; } -async function submitRefunds( +async function submitRefundsToExchange( ws: InternalWalletState, proposalId: string, ): Promise<void> { const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId); if (!purchase) { - console.error( - "not submitting refunds, payment not found:", - ); + console.error("not submitting refunds, payment not found:"); return; } const pendingKeys = Object.keys(purchase.refundsPending); @@ -991,14 +1056,18 @@ async function submitRefunds( refresh(ws, perm.coin_pub); } - ws.badge.showNotification(); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.RefundsSubmitted, + proposalId, + }); } -export async function acceptRefundResponse( + +async function acceptRefundResponse( ws: InternalWalletState, + proposalId: string, refundResponse: MerchantRefundResponse, -): Promise<string> { +): Promise<void> { const refundPermissions = refundResponse.refund_permissions; if (!refundPermissions.length) { @@ -1015,7 +1084,8 @@ export async function acceptRefundResponse( return; } - t.timestamp_refund = getTimestampNow(); + t.lastRefundTimestamp = getTimestampNow(); + t.status = PurchaseStatus.ProcessRefund; for (const perm of refundPermissions) { if ( @@ -1027,18 +1097,48 @@ export async function acceptRefundResponse( } return t; } + // Add the refund permissions to the purchase within a DB transaction + await oneShotMutate(ws.db, Stores.purchases, proposalId, f); + await submitRefundsToExchange(ws, proposalId); +} - const hc = refundResponse.h_contract_terms; - // Add the refund permissions to the purchase within a DB transaction - await oneShotMutate(ws.db, Stores.purchases, hc, f); - ws.notifier.notify(); +async function queryRefund(ws: InternalWalletState, proposalId: string): Promise<void> { + const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId); + if (purchase?.status !== PurchaseStatus.QueryRefund) { + return; + } - await submitRefunds(ws, hc); + const refundUrl = new URL("refund", purchase.contractTerms.merchant_base_url).href + let resp; + try { + resp = await ws.http.get(refundUrl); + } catch (e) { + console.error("error downloading refund permission", e); + throw e; + } - return hc; + const refundResponse = MerchantRefundResponse.checked(resp.responseJson); + await acceptRefundResponse(ws, proposalId, refundResponse); } +async function startRefundQuery(ws: InternalWalletState, proposalId: string): Promise<void> { + const success = await runWithWriteTransaction(ws.db, [Stores.purchases], async (tx) => { + const p = await tx.get(Stores.purchases, proposalId); + if (p?.status !== PurchaseStatus.Done) { + return false; + } + p.status = PurchaseStatus.QueryRefund; + return true; + }); + + if (!success) { + return; + } + await queryRefund(ws, proposalId); +} + + /** * Accept a refund, return the contract hash for the contract * that was involved in the refund. @@ -1053,17 +1153,56 @@ export async function applyRefund( throw Error("invalid refund URI"); } - const refundUrl = parseResult.refundUrl; + const purchase = await oneShotGetIndexed( + ws.db, + Stores.purchases.orderIdIndex, + [parseResult.merchantBaseUrl, parseResult.orderId], + ); - logger.trace("processing refund"); - let resp; - try { - resp = await ws.http.get(refundUrl); - } catch (e) { - console.error("error downloading refund permission", e); - throw e; + if (!purchase) { + throw Error("no purchase for the taler://refund/ URI was found"); } - const refundResponse = MerchantRefundResponse.checked(resp.responseJson); - return acceptRefundResponse(ws, refundResponse); + await startRefundQuery(ws, purchase.proposalId); + + return purchase.contractTermsHash; +} + +export async function processPurchase( + ws: InternalWalletState, + proposalId: string, +): Promise<void> { + const onOpErr = (e: OperationError) => + incrementPurchaseRetry(ws, proposalId, e); + await guardOperationException( + () => processPurchaseImpl(ws, proposalId), + onOpErr, + ); +} + +export async function processPurchaseImpl( + ws: InternalWalletState, + proposalId: string, +): Promise<void> { + const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId); + if (!purchase) { + return; + } + switch (purchase.status) { + case PurchaseStatus.Done: + return; + case PurchaseStatus.Abort: + // FIXME + break; + case PurchaseStatus.SubmitPay: + break; + case PurchaseStatus.QueryRefund: + await queryRefund(ws, proposalId); + break; + case PurchaseStatus.ProcessRefund: + await submitRefundsToExchange(ws, proposalId); + break; + default: + throw assertUnreachable(purchase.status); + } } diff --git a/src/wallet-impl/payback.ts b/src/wallet-impl/payback.ts index 5bf5ff06e..56696d771 100644 --- a/src/wallet-impl/payback.ts +++ b/src/wallet-impl/payback.ts @@ -29,6 +29,7 @@ import { Stores, TipRecord, CoinStatus } from "../dbTypes"; import { Logger } from "../util/logging"; import { PaybackConfirmation } from "../talerTypes"; import { updateExchangeFromUrl } from "./exchanges"; +import { NotificationType } from "../walletTypes"; const logger = new Logger("payback.ts"); @@ -65,7 +66,9 @@ export async function payback( await tx.put(Stores.reserves, reserve); }, ); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.PaybackStarted, + }); const paybackRequest = await ws.cryptoApi.createPaybackRequest(coin); const reqUrl = new URL("payback", coin.exchangeBaseUrl); @@ -83,6 +86,8 @@ export async function payback( } coin.status = CoinStatus.Dormant; await oneShotPut(ws.db, Stores.coins, coin); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.PaybackFinished, + }); await updateExchangeFromUrl(ws, coin.exchangeBaseUrl, true); } diff --git a/src/wallet-impl/pending.ts b/src/wallet-impl/pending.ts index 72102e3a1..bd10538af 100644 --- a/src/wallet-impl/pending.ts +++ b/src/wallet-impl/pending.ts @@ -21,8 +21,10 @@ import { PendingOperationInfo, PendingOperationsResponse, getTimestampNow, + Timestamp, + Duration, } from "../walletTypes"; -import { runWithReadTransaction } from "../util/query"; +import { runWithReadTransaction, TransactionHandle } from "../util/query"; import { InternalWalletState } from "./state"; import { Stores, @@ -32,11 +34,355 @@ import { ProposalStatus, } from "../dbTypes"; +function updateRetryDelay( + oldDelay: Duration, + now: Timestamp, + retryTimestamp: Timestamp, +): Duration { + if (retryTimestamp.t_ms <= now.t_ms) { + return { d_ms: 0 }; + } + return { d_ms: Math.min(oldDelay.d_ms, retryTimestamp.t_ms - now.t_ms) }; +} + +async function gatherExchangePending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + if (onlyDue) { + // FIXME: exchanges should also be updated regularly + return; + } + await tx.iter(Stores.exchanges).forEach(e => { + switch (e.updateStatus) { + case ExchangeUpdateStatus.FINISHED: + if (e.lastError) { + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: + "Exchange record is in FINISHED state but has lastError set", + details: { + exchangeBaseUrl: e.baseUrl, + }, + }); + } + if (!e.details) { + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: + "Exchange record does not have details, but no update in progress.", + details: { + exchangeBaseUrl: e.baseUrl, + }, + }); + } + if (!e.wireInfo) { + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: + "Exchange record does not have wire info, but no update in progress.", + details: { + exchangeBaseUrl: e.baseUrl, + }, + }); + } + break; + case ExchangeUpdateStatus.FETCH_KEYS: + resp.pendingOperations.push({ + type: "exchange-update", + givesLifeness: false, + stage: "fetch-keys", + exchangeBaseUrl: e.baseUrl, + lastError: e.lastError, + reason: e.updateReason || "unknown", + }); + break; + case ExchangeUpdateStatus.FETCH_WIRE: + resp.pendingOperations.push({ + type: "exchange-update", + givesLifeness: false, + stage: "fetch-wire", + exchangeBaseUrl: e.baseUrl, + lastError: e.lastError, + reason: e.updateReason || "unknown", + }); + break; + default: + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: "Unknown exchangeUpdateStatus", + details: { + exchangeBaseUrl: e.baseUrl, + exchangeUpdateStatus: e.updateStatus, + }, + }); + break; + } + }); +} + +async function gatherReservePending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + // FIXME: this should be optimized by using an index for "onlyDue==true". + await tx.iter(Stores.reserves).forEach(reserve => { + const reserveType = reserve.bankWithdrawStatusUrl ? "taler-bank" : "manual"; + if (!reserve.retryInfo.active) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + reserve.retryInfo.nextRetry, + ); + if (onlyDue && reserve.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + switch (reserve.reserveStatus) { + case ReserveRecordStatus.DORMANT: + // nothing to report as pending + break; + case ReserveRecordStatus.WITHDRAWING: + case ReserveRecordStatus.UNCONFIRMED: + case ReserveRecordStatus.QUERYING_STATUS: + case ReserveRecordStatus.REGISTERING_BANK: + resp.pendingOperations.push({ + type: "reserve", + givesLifeness: true, + stage: reserve.reserveStatus, + timestampCreated: reserve.created, + reserveType, + reservePub: reserve.reservePub, + retryInfo: reserve.retryInfo, + }); + break; + case ReserveRecordStatus.WAIT_CONFIRM_BANK: + resp.pendingOperations.push({ + type: "reserve", + givesLifeness: true, + stage: reserve.reserveStatus, + timestampCreated: reserve.created, + reserveType, + reservePub: reserve.reservePub, + bankWithdrawConfirmUrl: reserve.bankWithdrawConfirmUrl, + retryInfo: reserve.retryInfo, + }); + break; + default: + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: "Unknown reserve record status", + details: { + reservePub: reserve.reservePub, + reserveStatus: reserve.reserveStatus, + }, + }); + break; + } + }); +} + +async function gatherRefreshPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + await tx.iter(Stores.refresh).forEach(r => { + if (r.finishedTimestamp) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + r.retryInfo.nextRetry, + ); + if (onlyDue && r.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + let refreshStatus: string; + if (r.norevealIndex === undefined) { + refreshStatus = "melt"; + } else { + refreshStatus = "reveal"; + } + + resp.pendingOperations.push({ + type: "refresh", + givesLifeness: true, + oldCoinPub: r.meltCoinPub, + refreshStatus, + refreshOutputSize: r.newDenoms.length, + refreshSessionId: r.refreshSessionId, + }); + }); +} + +async function gatherCoinsPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + // Refreshing dirty coins is always due. + await tx.iter(Stores.coins).forEach(coin => { + if (coin.status == CoinStatus.Dirty) { + resp.nextRetryDelay.d_ms = 0; + resp.pendingOperations.push({ + givesLifeness: true, + type: "dirty-coin", + coinPub: coin.coinPub, + }); + } + }); +} + +async function gatherWithdrawalPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + await tx.iter(Stores.withdrawalSession).forEach(wsr => { + if (wsr.finishTimestamp) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + wsr.retryInfo.nextRetry, + ); + if (onlyDue && wsr.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + const numCoinsWithdrawn = wsr.withdrawn.reduce((a, x) => a + (x ? 1 : 0), 0); + const numCoinsTotal = wsr.withdrawn.length; + resp.pendingOperations.push({ + type: "withdraw", + givesLifeness: true, + numCoinsTotal, + numCoinsWithdrawn, + source: wsr.source, + withdrawSessionId: wsr.withdrawSessionId, + }); + }); +} + +async function gatherProposalPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + await tx.iter(Stores.proposals).forEach(proposal => { + if (proposal.proposalStatus == ProposalStatus.PROPOSED) { + if (onlyDue) { + return; + } + resp.pendingOperations.push({ + type: "proposal-choice", + givesLifeness: false, + merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, + proposalId: proposal.proposalId, + proposalTimestamp: proposal.timestamp, + }); + } else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) { + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + proposal.retryInfo.nextRetry, + ); + if (onlyDue && proposal.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + resp.pendingOperations.push({ + type: "proposal-download", + givesLifeness: true, + merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, + proposalId: proposal.proposalId, + proposalTimestamp: proposal.timestamp, + }); + } + }); +} + +async function gatherTipPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + await tx.iter(Stores.tips).forEach(tip => { + if (tip.pickedUp) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + tip.retryInfo.nextRetry, + ); + if (onlyDue && tip.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + if (tip.accepted) { + resp.pendingOperations.push({ + type: "tip", + givesLifeness: true, + merchantBaseUrl: tip.merchantBaseUrl, + tipId: tip.tipId, + merchantTipId: tip.merchantTipId, + }); + } + }); +} + +async function gatherPurchasePending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + await tx.iter(Stores.purchases).forEach((pr) => { + if (pr.finished) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + pr.retryInfo.nextRetry, + ); + if (onlyDue && pr.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + resp.pendingOperations.push({ + type: "pay", + givesLifeness: true, + isReplay: false, + proposalId: pr.proposalId, + }); + }); + +} + export async function getPendingOperations( ws: InternalWalletState, + onlyDue: boolean = false, ): Promise<PendingOperationsResponse> { - const pendingOperations: PendingOperationInfo[] = []; - let minRetryDurationMs = 5000; + const resp: PendingOperationsResponse = { + nextRetryDelay: { d_ms: Number.MAX_SAFE_INTEGER }, + pendingOperations: [], + }; + const now = getTimestampNow(); await runWithReadTransaction( ws.db, [ @@ -47,207 +393,18 @@ export async function getPendingOperations( Stores.withdrawalSession, Stores.proposals, Stores.tips, + Stores.purchases, ], async tx => { - await tx.iter(Stores.exchanges).forEach(e => { - switch (e.updateStatus) { - case ExchangeUpdateStatus.FINISHED: - if (e.lastError) { - pendingOperations.push({ - type: "bug", - message: - "Exchange record is in FINISHED state but has lastError set", - details: { - exchangeBaseUrl: e.baseUrl, - }, - }); - } - if (!e.details) { - pendingOperations.push({ - type: "bug", - message: - "Exchange record does not have details, but no update in progress.", - details: { - exchangeBaseUrl: e.baseUrl, - }, - }); - } - if (!e.wireInfo) { - pendingOperations.push({ - type: "bug", - message: - "Exchange record does not have wire info, but no update in progress.", - details: { - exchangeBaseUrl: e.baseUrl, - }, - }); - } - break; - case ExchangeUpdateStatus.FETCH_KEYS: - pendingOperations.push({ - type: "exchange-update", - stage: "fetch-keys", - exchangeBaseUrl: e.baseUrl, - lastError: e.lastError, - reason: e.updateReason || "unknown", - }); - break; - case ExchangeUpdateStatus.FETCH_WIRE: - pendingOperations.push({ - type: "exchange-update", - stage: "fetch-wire", - exchangeBaseUrl: e.baseUrl, - lastError: e.lastError, - reason: e.updateReason || "unknown", - }); - break; - default: - pendingOperations.push({ - type: "bug", - message: "Unknown exchangeUpdateStatus", - details: { - exchangeBaseUrl: e.baseUrl, - exchangeUpdateStatus: e.updateStatus, - }, - }); - break; - } - }); - await tx.iter(Stores.reserves).forEach(reserve => { - const reserveType = reserve.bankWithdrawStatusUrl - ? "taler-bank" - : "manual"; - const now = getTimestampNow(); - switch (reserve.reserveStatus) { - case ReserveRecordStatus.DORMANT: - // nothing to report as pending - break; - case ReserveRecordStatus.WITHDRAWING: - case ReserveRecordStatus.UNCONFIRMED: - case ReserveRecordStatus.QUERYING_STATUS: - case ReserveRecordStatus.REGISTERING_BANK: - pendingOperations.push({ - type: "reserve", - stage: reserve.reserveStatus, - timestampCreated: reserve.created, - reserveType, - reservePub: reserve.reservePub, - }); - if (reserve.created.t_ms < now.t_ms - 5000) { - minRetryDurationMs = 500; - } else if (reserve.created.t_ms < now.t_ms - 30000) { - minRetryDurationMs = 2000; - } - break; - case ReserveRecordStatus.WAIT_CONFIRM_BANK: - pendingOperations.push({ - type: "reserve", - stage: reserve.reserveStatus, - timestampCreated: reserve.created, - reserveType, - reservePub: reserve.reservePub, - bankWithdrawConfirmUrl: reserve.bankWithdrawConfirmUrl, - }); - if (reserve.created.t_ms < now.t_ms - 5000) { - minRetryDurationMs = 500; - } else if (reserve.created.t_ms < now.t_ms - 30000) { - minRetryDurationMs = 2000; - } - break; - default: - pendingOperations.push({ - type: "bug", - message: "Unknown reserve record status", - details: { - reservePub: reserve.reservePub, - reserveStatus: reserve.reserveStatus, - }, - }); - break; - } - }); - - await tx.iter(Stores.refresh).forEach(r => { - if (r.finished) { - return; - } - let refreshStatus: string; - if (r.norevealIndex === undefined) { - refreshStatus = "melt"; - } else { - refreshStatus = "reveal"; - } - - pendingOperations.push({ - type: "refresh", - oldCoinPub: r.meltCoinPub, - refreshStatus, - refreshOutputSize: r.newDenoms.length, - refreshSessionId: r.refreshSessionId, - }); - }); - - await tx.iter(Stores.coins).forEach(coin => { - if (coin.status == CoinStatus.Dirty) { - pendingOperations.push({ - type: "dirty-coin", - coinPub: coin.coinPub, - }); - } - }); - - await tx.iter(Stores.withdrawalSession).forEach(ws => { - const numCoinsWithdrawn = ws.withdrawn.reduce( - (a, x) => a + (x ? 1 : 0), - 0, - ); - const numCoinsTotal = ws.withdrawn.length; - if (numCoinsWithdrawn < numCoinsTotal) { - pendingOperations.push({ - type: "withdraw", - numCoinsTotal, - numCoinsWithdrawn, - source: ws.source, - withdrawSessionId: ws.withdrawSessionId, - }); - } - }); - - await tx.iter(Stores.proposals).forEach((proposal) => { - if (proposal.proposalStatus == ProposalStatus.PROPOSED) { - pendingOperations.push({ - type: "proposal-choice", - merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, - proposalId: proposal.proposalId, - proposalTimestamp: proposal.timestamp, - }); - } else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) { - pendingOperations.push({ - type: "proposal-download", - merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, - proposalId: proposal.proposalId, - proposalTimestamp: proposal.timestamp, - }); - } - }); - - await tx.iter(Stores.tips).forEach((tip) => { - if (tip.accepted && !tip.pickedUp) { - pendingOperations.push({ - type: "tip", - merchantBaseUrl: tip.merchantBaseUrl, - tipId: tip.tipId, - merchantTipId: tip.merchantTipId, - }); - } - }); + await gatherExchangePending(tx, now, resp, onlyDue); + await gatherReservePending(tx, now, resp, onlyDue); + await gatherRefreshPending(tx, now, resp, onlyDue); + await gatherCoinsPending(tx, now, resp, onlyDue); + await gatherWithdrawalPending(tx, now, resp, onlyDue); + await gatherProposalPending(tx, now, resp, onlyDue); + await gatherTipPending(tx, now, resp, onlyDue); + await gatherPurchasePending(tx, now, resp, onlyDue); }, ); - - return { - pendingOperations, - nextRetryDelay: { - d_ms: minRetryDurationMs, - }, - }; + return resp; } diff --git a/src/wallet-impl/refresh.ts b/src/wallet-impl/refresh.ts index 7e7270ed3..a3b48919d 100644 --- a/src/wallet-impl/refresh.ts +++ b/src/wallet-impl/refresh.ts @@ -23,6 +23,8 @@ import { RefreshPlanchetRecord, CoinRecord, RefreshSessionRecord, + initRetryInfo, + updateRetryInfoTimeout, } from "../dbTypes"; import { amountToPretty } from "../util/helpers"; import { @@ -36,6 +38,8 @@ import { InternalWalletState } from "./state"; import { Logger } from "../util/logging"; import { getWithdrawDenomList } from "./withdraw"; import { updateExchangeFromUrl } from "./exchanges"; +import { getTimestampNow, OperationError, NotificationType } from "../walletTypes"; +import { guardOperationException } from "./errors"; const logger = new Logger("refresh.ts"); @@ -132,14 +136,16 @@ async function refreshMelt( if (rs.norevealIndex !== undefined) { return; } - if (rs.finished) { + if (rs.finishedTimestamp) { return; } rs.norevealIndex = norevealIndex; return rs; }); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.RefreshMelted, + }); } async function refreshReveal( @@ -225,16 +231,6 @@ async function refreshReveal( return; } - const exchange = oneShotGet( - ws.db, - Stores.exchanges, - refreshSession.exchangeBaseUrl, - ); - if (!exchange) { - console.error(`exchange ${refreshSession.exchangeBaseUrl} not found`); - return; - } - const coins: CoinRecord[] = []; for (let i = 0; i < respJson.ev_sigs.length; i++) { @@ -271,32 +267,72 @@ async function refreshReveal( coins.push(coin); } - refreshSession.finished = true; - await runWithWriteTransaction( ws.db, [Stores.coins, Stores.refresh], async tx => { const rs = await tx.get(Stores.refresh, refreshSessionId); if (!rs) { + console.log("no refresh session found"); return; } - if (rs.finished) { + if (rs.finishedTimestamp) { + console.log("refresh session already finished"); return; } + rs.finishedTimestamp = getTimestampNow(); + rs.retryInfo = initRetryInfo(false); for (let coin of coins) { await tx.put(Stores.coins, coin); } - await tx.put(Stores.refresh, refreshSession); + await tx.put(Stores.refresh, rs); }, ); - ws.notifier.notify(); + console.log("refresh finished (end of reveal)"); + ws.notify({ + type: NotificationType.RefreshRevealed, + }); } +async function incrementRefreshRetry( + ws: InternalWalletState, + refreshSessionId: string, + err: OperationError | undefined, +): Promise<void> { + await runWithWriteTransaction(ws.db, [Stores.refresh], async tx => { + const r = await tx.get(Stores.refresh, refreshSessionId); + if (!r) { + return; + } + if (!r.retryInfo) { + return; + } + r.retryInfo.retryCounter++; + updateRetryInfoTimeout(r.retryInfo); + r.lastError = err; + await tx.put(Stores.refresh, r); + }); +} + + export async function processRefreshSession( ws: InternalWalletState, refreshSessionId: string, ) { + return ws.memoProcessRefresh.memo(refreshSessionId, async () => { + const onOpErr = (e: OperationError) => + incrementRefreshRetry(ws, refreshSessionId, e); + return guardOperationException( + () => processRefreshSessionImpl(ws, refreshSessionId), + onOpErr, + ); + }); +} + +async function processRefreshSessionImpl( + ws: InternalWalletState, + refreshSessionId: string, +) { const refreshSession = await oneShotGet( ws.db, Stores.refresh, @@ -305,7 +341,7 @@ export async function processRefreshSession( if (!refreshSession) { return; } - if (refreshSession.finished) { + if (refreshSession.finishedTimestamp) { return; } if (typeof refreshSession.norevealIndex !== "number") { @@ -376,7 +412,7 @@ export async function refresh( x.status = CoinStatus.Dormant; return x; }); - ws.notifier.notify(); + ws.notify( { type: NotificationType.RefreshRefused }); return; } @@ -388,29 +424,32 @@ export async function refresh( oldDenom.feeRefresh, ); - function mutateCoin(c: CoinRecord): CoinRecord { - const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee); - if (r.saturated) { - // Something else must have written the coin value - throw TransactionAbort; - } - c.currentAmount = r.amount; - c.status = CoinStatus.Dormant; - return c; - } - // Store refresh session and subtract refreshed amount from // coin in the same transaction. await runWithWriteTransaction( ws.db, [Stores.refresh, Stores.coins], async tx => { + const c = await tx.get(Stores.coins, coin.coinPub); + if (!c) { + return; + } + if (c.status !== CoinStatus.Dirty) { + return; + } + const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee); + if (r.saturated) { + console.log("can't refresh coin, no amount left"); + return; + } + c.currentAmount = r.amount; + c.status = CoinStatus.Dormant; await tx.put(Stores.refresh, refreshSession); - await tx.mutate(Stores.coins, coin.coinPub, mutateCoin); + await tx.put(Stores.coins, c); }, ); logger.info(`created refresh session ${refreshSession.refreshSessionId}`); - ws.notifier.notify(); + ws.notify( { type: NotificationType.RefreshStarted }); await processRefreshSession(ws, refreshSession.refreshSessionId); } diff --git a/src/wallet-impl/reserves.ts b/src/wallet-impl/reserves.ts index d70f02576..f00956b46 100644 --- a/src/wallet-impl/reserves.ts +++ b/src/wallet-impl/reserves.ts @@ -20,6 +20,7 @@ import { getTimestampNow, ConfirmReserveRequest, OperationError, + NotificationType, } from "../walletTypes"; import { canonicalizeBaseUrl } from "../util/helpers"; import { InternalWalletState } from "./state"; @@ -29,6 +30,8 @@ import { CurrencyRecord, Stores, WithdrawalSessionRecord, + initRetryInfo, + updateRetryInfoTimeout, } from "../dbTypes"; import { oneShotMutate, @@ -42,13 +45,13 @@ import * as Amounts from "../util/amounts"; import { updateExchangeFromUrl, getExchangeTrust } from "./exchanges"; import { WithdrawOperationStatusResponse, ReserveStatus } from "../talerTypes"; import { assertUnreachable } from "../util/assertUnreachable"; -import { OperationFailedAndReportedError } from "../wallet"; import { encodeCrock } from "../crypto/talerCrypto"; import { randomBytes } from "../crypto/primitives/nacl-fast"; import { getVerifiedWithdrawDenomList, processWithdrawSession, } from "./withdraw"; +import { guardOperationException, OperationFailedAndReportedError } from "./errors"; const logger = new Logger("reserves.ts"); @@ -91,7 +94,9 @@ export async function createReserve( bankWithdrawStatusUrl: req.bankWithdrawStatusUrl, exchangeWire: req.exchangeWire, reserveStatus, - lastStatusQuery: undefined, + lastSuccessfulStatusQuery: undefined, + retryInfo: initRetryInfo(), + lastError: undefined, }; const senderWire = req.senderWire; @@ -171,7 +176,7 @@ export async function createReserve( // Asynchronously process the reserve, but return // to the caller already. - processReserve(ws, resp.reservePub).catch(e => { + processReserve(ws, resp.reservePub, true).catch(e => { console.error("Processing reserve failed:", e); }); @@ -188,18 +193,19 @@ export async function createReserve( export async function processReserve( ws: InternalWalletState, reservePub: string, + forceNow: boolean = false, ): Promise<void> { - const p = ws.memoProcessReserve.find(reservePub); - if (p) { - return p; - } else { - return ws.memoProcessReserve.put( - reservePub, - processReserveImpl(ws, reservePub), + return ws.memoProcessReserve.memo(reservePub, async () => { + const onOpError = (err: OperationError) => + incrementReserveRetry(ws, reservePub, err); + await guardOperationException( + () => processReserveImpl(ws, reservePub, forceNow), + onOpError, ); - } + }); } + async function registerReserveWithBank( ws: InternalWalletState, reservePub: string, @@ -235,6 +241,7 @@ async function registerReserveWithBank( } r.timestampReserveInfoPosted = getTimestampNow(); r.reserveStatus = ReserveRecordStatus.WAIT_CONFIRM_BANK; + r.retryInfo = initRetryInfo(); return r; }); return processReserveBankStatus(ws, reservePub); @@ -244,6 +251,18 @@ export async function processReserveBankStatus( ws: InternalWalletState, reservePub: string, ): Promise<void> { + const onOpError = (err: OperationError) => + incrementReserveRetry(ws, reservePub, err); + await guardOperationException( + () => processReserveBankStatusImpl(ws, reservePub), + onOpError, + ); +} + +async function processReserveBankStatusImpl( + ws: InternalWalletState, + reservePub: string, +): Promise<void> { let reserve = await oneShotGet(ws.db, Stores.reserves, reservePub); switch (reserve?.reserveStatus) { case ReserveRecordStatus.WAIT_CONFIRM_BANK: @@ -287,9 +306,10 @@ export async function processReserveBankStatus( const now = getTimestampNow(); r.timestampConfirmed = now; r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; + r.retryInfo = initRetryInfo(); return r; }); - await processReserveImpl(ws, reservePub); + await processReserveImpl(ws, reservePub, true); } else { await oneShotMutate(ws.db, Stores.reserves, reservePub, r => { switch (r.reserveStatus) { @@ -304,16 +324,24 @@ export async function processReserveBankStatus( } } -async function setReserveError( +async function incrementReserveRetry( ws: InternalWalletState, reservePub: string, - err: OperationError, + err: OperationError | undefined, ): Promise<void> { - const mut = (reserve: ReserveRecord) => { - reserve.lastError = err; - return reserve; - }; - await oneShotMutate(ws.db, Stores.reserves, reservePub, mut); + await runWithWriteTransaction(ws.db, [Stores.reserves], async tx => { + const r = await tx.get(Stores.reserves, reservePub); + if (!r) { + return; + } + if (!r.retryInfo) { + return; + } + r.retryInfo.retryCounter++; + updateRetryInfoTimeout(r.retryInfo); + r.lastError = err; + await tx.put(Stores.reserves, r); + }); } /** @@ -345,15 +373,11 @@ async function updateReserve( } catch (e) { if (e.response?.status === 404) { const m = "The exchange does not know about this reserve (yet)."; - await setReserveError(ws, reservePub, { - type: "waiting", - details: {}, - message: "The exchange does not know about this reserve (yet).", - }); - throw new OperationFailedAndReportedError(m); + await incrementReserveRetry(ws, reservePub, undefined); + return; } else { const m = e.message; - await setReserveError(ws, reservePub, { + await incrementReserveRetry(ws, reservePub, { type: "network", details: {}, message: m, @@ -369,7 +393,7 @@ async function updateReserve( } // FIXME: check / compare history! - if (!r.lastStatusQuery) { + if (!r.lastSuccessfulStatusQuery) { // FIXME: check if this matches initial expectations r.withdrawRemainingAmount = balance; } else { @@ -392,22 +416,31 @@ async function updateReserve( // We're missing some money. } } - r.lastStatusQuery = getTimestampNow(); + r.lastSuccessfulStatusQuery = getTimestampNow(); r.reserveStatus = ReserveRecordStatus.WITHDRAWING; + r.retryInfo = initRetryInfo(); return r; }); - ws.notifier.notify(); + ws.notify( { type: NotificationType.ReserveUpdated }); } async function processReserveImpl( ws: InternalWalletState, reservePub: string, + forceNow: boolean = false, ): Promise<void> { const reserve = await oneShotGet(ws.db, Stores.reserves, reservePub); if (!reserve) { console.log("not processing reserve: reserve does not exist"); return; } + if (!forceNow) { + const now = getTimestampNow(); + if (reserve.retryInfo.nextRetry.t_ms > now.t_ms) { + logger.trace("processReserve retry not due yet"); + return; + } + } logger.trace( `Processing reserve ${reservePub} with status ${reserve.reserveStatus}`, ); @@ -417,10 +450,10 @@ async function processReserveImpl( break; case ReserveRecordStatus.REGISTERING_BANK: await processReserveBankStatus(ws, reservePub); - return processReserveImpl(ws, reservePub); + return processReserveImpl(ws, reservePub, true); case ReserveRecordStatus.QUERYING_STATUS: await updateReserve(ws, reservePub); - return processReserveImpl(ws, reservePub); + return processReserveImpl(ws, reservePub, true); case ReserveRecordStatus.WITHDRAWING: await depleteReserve(ws, reservePub); break; @@ -448,12 +481,13 @@ export async function confirmReserve( } reserve.timestampConfirmed = now; reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; + reserve.retryInfo = initRetryInfo(); return reserve; }); - ws.notifier.notify(); + ws.notify({ type: NotificationType.ReserveUpdated }); - processReserve(ws, req.reservePub).catch(e => { + processReserve(ws, req.reservePub, true).catch(e => { console.log("processing reserve failed:", e); }); } @@ -489,7 +523,7 @@ async function depleteReserve( logger.trace(`got denom list`); if (denomsForWithdraw.length === 0) { const m = `Unable to withdraw from reserve, no denominations are available to withdraw.`; - await setReserveError(ws, reserve.reservePub, { + await incrementReserveRetry(ws, reserve.reservePub, { type: "internal", message: m, details: {}, @@ -502,7 +536,8 @@ async function depleteReserve( const withdrawalSessionId = encodeCrock(randomBytes(32)); - const totalCoinValue = Amounts.sum(denomsForWithdraw.map(x => x.value)).amount; + const totalCoinValue = Amounts.sum(denomsForWithdraw.map(x => x.value)) + .amount; const withdrawalRecord: WithdrawalSessionRecord = { withdrawSessionId: withdrawalSessionId, @@ -517,6 +552,9 @@ async function depleteReserve( withdrawn: denomsForWithdraw.map(x => false), planchets: denomsForWithdraw.map(x => undefined), totalCoinValue, + retryInfo: initRetryInfo(), + lastCoinErrors: denomsForWithdraw.map(x => undefined), + lastError: undefined, }; const totalCoinWithdrawFee = Amounts.sum( @@ -545,7 +583,7 @@ async function depleteReserve( r.withdrawRemainingAmount = remaining.amount; r.withdrawAllocatedAmount = allocated.amount; r.reserveStatus = ReserveRecordStatus.DORMANT; - + r.retryInfo = initRetryInfo(false); return r; } diff --git a/src/wallet-impl/return.ts b/src/wallet-impl/return.ts index 9cf12052d..ec19c00ae 100644 --- a/src/wallet-impl/return.ts +++ b/src/wallet-impl/return.ts @@ -204,8 +204,6 @@ export async function returnCoins( } }, ); - ws.badge.showNotification(); - ws.notifier.notify(); depositReturnedCoins(ws, coinsReturnRecord); } @@ -269,6 +267,5 @@ async function depositReturnedCoins( } } await oneShotPut(ws.db, Stores.coinsReturns, currentCrr); - ws.notifier.notify(); } } diff --git a/src/wallet-impl/state.ts b/src/wallet-impl/state.ts index a04a7dd1c..18df861f1 100644 --- a/src/wallet-impl/state.ts +++ b/src/wallet-impl/state.ts @@ -15,19 +15,54 @@ */ import { HttpRequestLibrary } from "../util/http"; -import { Badge, Notifier, NextUrlResult } from "../walletTypes"; +import { + NextUrlResult, + WalletBalance, + PendingOperationsResponse, + WalletNotification, +} from "../walletTypes"; import { SpeculativePayData } from "./pay"; -import { CryptoApi } from "../crypto/cryptoApi"; -import { AsyncOpMemo } from "../util/asyncMemo"; - -export interface InternalWalletState { - db: IDBDatabase; - http: HttpRequestLibrary; - badge: Badge; - notifier: Notifier; +import { CryptoApi, CryptoWorkerFactory } from "../crypto/workers/cryptoApi"; +import { AsyncOpMemoMap, AsyncOpMemoSingle } from "../util/asyncMemo"; +import { Logger } from "../util/logging"; + +type NotificationListener = (n: WalletNotification) => void; + +const logger = new Logger("state.ts"); + +export class InternalWalletState { + speculativePayData: SpeculativePayData | undefined = undefined; + cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {}; + memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); + memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); + memoGetPending: AsyncOpMemoSingle< + PendingOperationsResponse + > = new AsyncOpMemoSingle(); + memoGetBalance: AsyncOpMemoSingle<WalletBalance> = new AsyncOpMemoSingle(); + memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); cryptoApi: CryptoApi; - speculativePayData: SpeculativePayData | undefined; - cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult }; - memoProcessReserve: AsyncOpMemo<void>; - memoMakePlanchet: AsyncOpMemo<void>; -}
\ No newline at end of file + + listeners: NotificationListener[] = []; + + constructor( + public db: IDBDatabase, + public http: HttpRequestLibrary, + cryptoWorkerFactory: CryptoWorkerFactory, + ) { + this.cryptoApi = new CryptoApi(cryptoWorkerFactory); + } + + public notify(n: WalletNotification) { + logger.trace("Notification", n); + for (const l of this.listeners) { + const nc = JSON.parse(JSON.stringify(n)); + setImmediate(() => { + l(nc); + }); + } + } + + addNotificationListener(f: (n: WalletNotification) => void): void { + this.listeners.push(f); + } +} diff --git a/src/wallet-impl/tip.ts b/src/wallet-impl/tip.ts index 593f0d612..3ae931d45 100644 --- a/src/wallet-impl/tip.ts +++ b/src/wallet-impl/tip.ts @@ -18,14 +18,15 @@ import { oneShotGet, oneShotPut, oneShotMutate, runWithWriteTransaction } from "../util/query"; import { InternalWalletState } from "./state"; import { parseTipUri } from "../util/taleruri"; -import { TipStatus, getTimestampNow } from "../walletTypes"; +import { TipStatus, getTimestampNow, OperationError } from "../walletTypes"; import { TipPickupGetResponse, TipPlanchetDetail, TipResponse } from "../talerTypes"; import * as Amounts from "../util/amounts"; -import { Stores, PlanchetRecord, WithdrawalSessionRecord } from "../dbTypes"; +import { Stores, PlanchetRecord, WithdrawalSessionRecord, initRetryInfo, updateRetryInfoTimeout } from "../dbTypes"; import { getWithdrawDetailsForAmount, getVerifiedWithdrawDenomList, processWithdrawSession } from "./withdraw"; import { getTalerStampSec } from "../util/helpers"; import { updateExchangeFromUrl } from "./exchanges"; import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto"; +import { guardOperationException } from "./errors"; export async function getTipStatus( @@ -74,12 +75,14 @@ export async function getTipStatus( pickedUp: false, planchets: undefined, response: undefined, - timestamp: getTimestampNow(), + createdTimestamp: getTimestampNow(), merchantTipId: res.merchantTipId, totalFees: Amounts.add( withdrawDetails.overhead, withdrawDetails.withdrawFee, ).amount, + retryInfo: initRetryInfo(), + lastError: undefined, }; await oneShotPut(ws.db, Stores.tips, tipRecord); } @@ -101,9 +104,37 @@ export async function getTipStatus( return tipStatus; } +async function incrementTipRetry( + ws: InternalWalletState, + refreshSessionId: string, + err: OperationError | undefined, +): Promise<void> { + await runWithWriteTransaction(ws.db, [Stores.tips], async tx => { + const t = await tx.get(Stores.tips, refreshSessionId); + if (!t) { + return; + } + if (!t.retryInfo) { + return; + } + t.retryInfo.retryCounter++; + updateRetryInfoTimeout(t.retryInfo); + t.lastError = err; + await tx.put(Stores.tips, t); + }); +} + export async function processTip( ws: InternalWalletState, tipId: string, +): Promise<void> { + const onOpErr = (e: OperationError) => incrementTipRetry(ws, tipId, e); + await guardOperationException(() => processTipImpl(ws, tipId), onOpErr); +} + +async function processTipImpl( + ws: InternalWalletState, + tipId: string, ) { let tipRecord = await oneShotGet(ws.db, Stores.tips, tipId); if (!tipRecord) { @@ -205,6 +236,10 @@ export async function processTip( rawWithdrawalAmount: tipRecord.amount, withdrawn: planchets.map((x) => false), totalCoinValue: Amounts.sum(planchets.map((p) => p.coinValue)).amount, + lastCoinErrors: planchets.map((x) => undefined), + retryInfo: initRetryInfo(), + finishTimestamp: undefined, + lastError: undefined, }; @@ -217,6 +252,7 @@ export async function processTip( return; } tr.pickedUp = true; + tr.retryInfo = initRetryInfo(false); await tx.put(Stores.tips, tr); await tx.put(Stores.withdrawalSession, withdrawalSession); @@ -224,8 +260,6 @@ export async function processTip( await processWithdrawSession(ws, withdrawalSessionId); - ws.notifier.notify(); - ws.badge.showNotification(); return; } diff --git a/src/wallet-impl/withdraw.ts b/src/wallet-impl/withdraw.ts index d02ae14aa..7b7d0f640 100644 --- a/src/wallet-impl/withdraw.ts +++ b/src/wallet-impl/withdraw.ts @@ -22,6 +22,8 @@ import { CoinStatus, CoinRecord, PlanchetRecord, + initRetryInfo, + updateRetryInfoTimeout, } from "../dbTypes"; import * as Amounts from "../util/amounts"; import { @@ -30,6 +32,8 @@ import { DownloadedWithdrawInfo, ReserveCreationInfo, WithdrawDetails, + OperationError, + NotificationType, } from "../walletTypes"; import { WithdrawOperationStatusResponse } from "../talerTypes"; import { InternalWalletState } from "./state"; @@ -51,6 +55,7 @@ import { createReserve, processReserveBankStatus } from "./reserves"; import { WALLET_PROTOCOL_VERSION } from "../wallet"; import * as LibtoolVersion from "../util/libtoolVersion"; +import { guardOperationException } from "./errors"; const logger = new Logger("withdraw.ts"); @@ -143,12 +148,9 @@ export async function acceptWithdrawal( senderWire: withdrawInfo.senderWire, exchangeWire: exchangeWire, }); - ws.badge.showNotification(); - ws.notifier.notify(); // We do this here, as the reserve should be registered before we return, // so that we can redirect the user to the bank's status page. await processReserveBankStatus(ws, reserve.reservePub); - ws.notifier.notify(); console.log("acceptWithdrawal: returning"); return { reservePub: reserve.reservePub, @@ -234,6 +236,12 @@ async function processPlanchet( planchet.denomPub, ); + + const isValid = await ws.cryptoApi.rsaVerify(planchet.coinPub, denomSig, planchet.denomPub); + if (!isValid) { + throw Error("invalid RSA signature by the exchange"); + } + const coin: CoinRecord = { blindingKey: planchet.blindingKey, coinPriv: planchet.coinPriv, @@ -249,6 +257,9 @@ async function processPlanchet( withdrawSessionId: withdrawalSessionId, }; + let withdrawSessionFinished = false; + let reserveDepleted = false; + await runWithWriteTransaction( ws.db, [Stores.coins, Stores.withdrawalSession, Stores.reserves], @@ -262,6 +273,18 @@ async function processPlanchet( return; } ws.withdrawn[coinIdx] = true; + ws.lastCoinErrors[coinIdx] = undefined; + let numDone = 0; + for (let i = 0; i < ws.withdrawn.length; i++) { + if (ws.withdrawn[i]) { + numDone++; + } + } + if (numDone === ws.denoms.length) { + ws.finishTimestamp = getTimestampNow(); + ws.retryInfo = initRetryInfo(false); + withdrawSessionFinished = true; + } await tx.put(Stores.withdrawalSession, ws); if (!planchet.isFromTip) { const r = await tx.get(Stores.reserves, planchet.reservePub); @@ -270,14 +293,29 @@ async function processPlanchet( r.withdrawCompletedAmount, Amounts.add(denom.value, denom.feeWithdraw).amount, ).amount; + if (Amounts.cmp(r.withdrawCompletedAmount, r.withdrawAllocatedAmount) == 0) { + reserveDepleted = true; + } await tx.put(Stores.reserves, r); } } await tx.add(Stores.coins, coin); }, ); - ws.notifier.notify(); - logger.trace(`withdraw of one coin ${coin.coinPub} finished`); + + if (withdrawSessionFinished) { + ws.notify({ + type: NotificationType.WithdrawSessionFinished, + withdrawSessionId: withdrawalSessionId, + }); + } + + if (reserveDepleted && withdrawalSession.source.type === "reserve") { + ws.notify({ + type: NotificationType.ReserveDepleted, + reservePub: withdrawalSession.source.reservePub, + }); + } } /** @@ -437,28 +475,51 @@ async function processWithdrawCoin( } if (!withdrawalSession.planchets[coinIndex]) { - logger.trace("creating planchet for coin", coinIndex); const key = `${withdrawalSessionId}-${coinIndex}`; - const p = ws.memoMakePlanchet.find(key); - if (p) { - await p; - } else { - ws.memoMakePlanchet.put( - key, - makePlanchet(ws, withdrawalSessionId, coinIndex), - ); - } - await makePlanchet(ws, withdrawalSessionId, coinIndex); - logger.trace("done creating planchet for coin", coinIndex); + await ws.memoMakePlanchet.memo(key, async () => { + logger.trace("creating planchet for coin", coinIndex); + return makePlanchet(ws, withdrawalSessionId, coinIndex); + }); } await processPlanchet(ws, withdrawalSessionId, coinIndex); - logger.trace("starting withdraw for coin", coinIndex); +} + +async function incrementWithdrawalRetry( + ws: InternalWalletState, + withdrawalSessionId: string, + err: OperationError | undefined, +): Promise<void> { + await runWithWriteTransaction(ws.db, [Stores.withdrawalSession], async tx => { + const wsr = await tx.get(Stores.withdrawalSession, withdrawalSessionId); + if (!wsr) { + return; + } + if (!wsr.retryInfo) { + return; + } + wsr.retryInfo.retryCounter++; + updateRetryInfoTimeout(wsr.retryInfo); + wsr.lastError = err; + await tx.put(Stores.withdrawalSession, wsr); + }); } export async function processWithdrawSession( ws: InternalWalletState, withdrawalSessionId: string, ): Promise<void> { + const onOpErr = (e: OperationError) => + incrementWithdrawalRetry(ws, withdrawalSessionId, e); + await guardOperationException( + () => processWithdrawSessionImpl(ws, withdrawalSessionId), + onOpErr, + ); +} + +export async function processWithdrawSessionImpl( + ws: InternalWalletState, + withdrawalSessionId: string, +): Promise<void> { logger.trace("processing withdraw session", withdrawalSessionId); const withdrawalSession = await oneShotGet( ws.db, @@ -474,7 +535,6 @@ export async function processWithdrawSession( processWithdrawCoin(ws, withdrawalSessionId, i), ); await Promise.all(ps); - ws.badge.showNotification(); return; } |