From b5ee6b7b4ee506712f51e1b90e9256c4b0c0c603 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 2 Dec 2019 17:35:47 +0100 Subject: pending operations WIP --- src/wallet-impl/balance.ts | 6 +- src/wallet-impl/pay.ts | 11 +++- src/wallet-impl/pending.ts | 44 ++++++++++---- src/wallet-impl/reserves.ts | 1 + src/wallet-impl/state.ts | 1 + src/wallet-impl/withdraw.ts | 138 ++++++++++++++++++++++++-------------------- 6 files changed, 121 insertions(+), 80 deletions(-) (limited to 'src/wallet-impl') diff --git a/src/wallet-impl/balance.ts b/src/wallet-impl/balance.ts index 1d8e077af..0abc96637 100644 --- a/src/wallet-impl/balance.ts +++ b/src/wallet-impl/balance.ts @@ -18,12 +18,10 @@ * Imports. */ import { - HistoryQuery, - HistoryEvent, WalletBalance, WalletBalanceEntry, } from "../walletTypes"; -import { oneShotIter, runWithWriteTransaction } from "../util/query"; +import { runWithReadTransaction } from "../util/query"; import { InternalWalletState } from "./state"; import { Stores, TipRecord, CoinStatus } from "../dbTypes"; import * as Amounts from "../util/amounts"; @@ -77,7 +75,7 @@ export async function getBalances( byExchange: {}, }; - await runWithWriteTransaction( + await runWithReadTransaction( ws.db, [Stores.coins, Stores.refresh, Stores.reserves, Stores.purchases], async tx => { diff --git a/src/wallet-impl/pay.ts b/src/wallet-impl/pay.ts index d4d2b3cd4..31a1500ec 100644 --- a/src/wallet-impl/pay.ts +++ b/src/wallet-impl/pay.ts @@ -358,9 +358,14 @@ async function recordConfirmPay( } function getNextUrl(contractTerms: ContractTerms): string { - const fu = new URL(contractTerms.fulfillment_url) - fu.searchParams.set("order_id", contractTerms.order_id); - return fu.href; + const f = contractTerms.fulfillment_url; + if (f.startsWith("http://") || f.startsWith("https://")) { + const fu = new URL(contractTerms.fulfillment_url) + fu.searchParams.set("order_id", contractTerms.order_id); + return fu.href; + } else { + return f; + } } export async function abortFailedPayment( diff --git a/src/wallet-impl/pending.ts b/src/wallet-impl/pending.ts index a66571a34..dbc6672c7 100644 --- a/src/wallet-impl/pending.ts +++ b/src/wallet-impl/pending.ts @@ -14,18 +14,29 @@ GNU Taler; see the file COPYING. If not, see */ - /** - * Imports. - */ -import { PendingOperationInfo, PendingOperationsResponse } from "../walletTypes"; +/** + * Imports. + */ +import { + PendingOperationInfo, + PendingOperationsResponse, + getTimestampNow, +} from "../walletTypes"; import { oneShotIter } from "../util/query"; import { InternalWalletState } from "./state"; -import { Stores, ExchangeUpdateStatus, ReserveRecordStatus, CoinStatus, ProposalStatus } from "../dbTypes"; +import { + Stores, + ExchangeUpdateStatus, + ReserveRecordStatus, + CoinStatus, + ProposalStatus, +} from "../dbTypes"; export async function getPendingOperations( ws: InternalWalletState, ): Promise { const pendingOperations: PendingOperationInfo[] = []; + let minRetryDurationMs = 5000; const exchanges = await oneShotIter(ws.db, Stores.exchanges).toArray(); for (let e of exchanges) { switch (e.updateStatus) { @@ -92,9 +103,8 @@ export async function getPendingOperations( } } await oneShotIter(ws.db, Stores.reserves).forEach(reserve => { - const reserveType = reserve.bankWithdrawStatusUrl - ? "taler-bank" - : "manual"; + const reserveType = reserve.bankWithdrawStatusUrl ? "taler-bank" : "manual"; + const now = getTimestampNow(); switch (reserve.reserveStatus) { case ReserveRecordStatus.DORMANT: // nothing to report as pending @@ -110,6 +120,11 @@ export async function getPendingOperations( reserveType, reservePub: reserve.reservePub, }); + if (reserve.created.t_ms < now.t_ms - 5000) { + minRetryDurationMs = 500; + } else if (reserve.created.t_ms < now.t_ms - 30000) { + minRetryDurationMs = 2000; + } break; case ReserveRecordStatus.WAIT_CONFIRM_BANK: pendingOperations.push({ @@ -120,6 +135,11 @@ export async function getPendingOperations( reservePub: reserve.reservePub, bankWithdrawConfirmUrl: reserve.bankWithdrawConfirmUrl, }); + if (reserve.created.t_ms < now.t_ms - 5000) { + minRetryDurationMs = 500; + } else if (reserve.created.t_ms < now.t_ms - 30000) { + minRetryDurationMs = 2000; + } break; default: pendingOperations.push({ @@ -164,10 +184,7 @@ export async function getPendingOperations( }); await oneShotIter(ws.db, Stores.withdrawalSession).forEach(ws => { - const numCoinsWithdrawn = ws.withdrawn.reduce( - (a, x) => a + (x ? 1 : 0), - 0, - ); + const numCoinsWithdrawn = ws.withdrawn.reduce((a, x) => a + (x ? 1 : 0), 0); const numCoinsTotal = ws.withdrawn.length; if (numCoinsWithdrawn < numCoinsTotal) { pendingOperations.push({ @@ -204,5 +221,8 @@ export async function getPendingOperations( return { pendingOperations, + nextRetryDelay: { + d_ms: minRetryDurationMs, + }, }; } diff --git a/src/wallet-impl/reserves.ts b/src/wallet-impl/reserves.ts index 265eddce4..5d624fe27 100644 --- a/src/wallet-impl/reserves.ts +++ b/src/wallet-impl/reserves.ts @@ -105,6 +105,7 @@ export async function createReserve( const exchangeInfo = await updateExchangeFromUrl(ws, req.exchange); const exchangeDetails = exchangeInfo.details; if (!exchangeDetails) { + console.log(exchangeDetails); throw Error("exchange not updated"); } const { isAudited, isTrusted } = await getExchangeTrust(ws, exchangeInfo); diff --git a/src/wallet-impl/state.ts b/src/wallet-impl/state.ts index 3d6bb8bdf..a04a7dd1c 100644 --- a/src/wallet-impl/state.ts +++ b/src/wallet-impl/state.ts @@ -29,4 +29,5 @@ export interface InternalWalletState { speculativePayData: SpeculativePayData | undefined; cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult }; memoProcessReserve: AsyncOpMemo; + memoMakePlanchet: AsyncOpMemo; } \ No newline at end of file diff --git a/src/wallet-impl/withdraw.ts b/src/wallet-impl/withdraw.ts index 4e2d80556..baeae1a50 100644 --- a/src/wallet-impl/withdraw.ts +++ b/src/wallet-impl/withdraw.ts @@ -189,7 +189,6 @@ async function processPlanchet( return; } if (withdrawalSession.source.type === "reserve") { - } const planchet = withdrawalSession.planchets[coinIdx]; if (!planchet) { @@ -251,10 +250,7 @@ async function processPlanchet( ws.db, [Stores.coins, Stores.withdrawalSession, Stores.reserves], async tx => { - const ws = await tx.get( - Stores.withdrawalSession, - withdrawalSessionId, - ); + const ws = await tx.get(Stores.withdrawalSession, withdrawalSessionId); if (!ws) { return; } @@ -350,12 +346,72 @@ export async function getVerifiedWithdrawDenomList( return selectedDenoms; } +async function makePlanchet( + ws: InternalWalletState, + withdrawalSessionId: string, + coinIndex: number, +): Promise { + const withdrawalSession = await oneShotGet( + ws.db, + Stores.withdrawalSession, + withdrawalSessionId, + ); + if (!withdrawalSession) { + return; + } + const src = withdrawalSession.source; + if (src.type !== "reserve") { + throw Error("invalid state"); + } + const reserve = await oneShotGet(ws.db, Stores.reserves, src.reservePub); + if (!reserve) { + return; + } + const denom = await oneShotGet(ws.db, Stores.denominations, [ + withdrawalSession.exchangeBaseUrl, + withdrawalSession.denoms[coinIndex], + ]); + if (!denom) { + return; + } + const r = await ws.cryptoApi.createPlanchet({ + denomPub: denom.denomPub, + feeWithdraw: denom.feeWithdraw, + reservePriv: reserve.reservePriv, + reservePub: reserve.reservePub, + value: denom.value, + }); + const newPlanchet: PlanchetRecord = { + blindingKey: r.blindingKey, + coinEv: r.coinEv, + coinPriv: r.coinPriv, + coinPub: r.coinPub, + coinValue: r.coinValue, + denomPub: r.denomPub, + denomPubHash: r.denomPubHash, + isFromTip: false, + reservePub: r.reservePub, + withdrawSig: r.withdrawSig, + }; + await runWithWriteTransaction(ws.db, [Stores.withdrawalSession], async tx => { + const myWs = await tx.get(Stores.withdrawalSession, withdrawalSessionId); + if (!myWs) { + return; + } + if (myWs.planchets[coinIndex]) { + return; + } + myWs.planchets[coinIndex] = newPlanchet; + await tx.put(Stores.withdrawalSession, myWs); + }); +} + async function processWithdrawCoin( ws: InternalWalletState, withdrawalSessionId: string, coinIndex: number, ) { - logger.info("starting withdraw for coin"); + logger.trace("starting withdraw for coin", coinIndex); const withdrawalSession = await oneShotGet( ws.db, Stores.withdrawalSession, @@ -377,63 +433,23 @@ async function processWithdrawCoin( return; } - if (withdrawalSession.planchets[coinIndex]) { - return processPlanchet(ws, withdrawalSessionId, coinIndex); - } else { - const src = withdrawalSession.source; - if (src.type !== "reserve") { - throw Error("invalid state"); - } - const reserve = await oneShotGet(ws.db, Stores.reserves, src.reservePub) - if (!reserve) { - return; - } - const denom = await oneShotGet(ws.db, Stores.denominations, [ - withdrawalSession.exchangeBaseUrl, - withdrawalSession.denoms[coinIndex], - ]); - if (!denom) { - return; + if (!withdrawalSession.planchets[coinIndex]) { + logger.trace("creating planchet for coin", coinIndex); + const key = `${withdrawalSessionId}-${coinIndex}`; + const p = ws.memoMakePlanchet.find(key); + if (p) { + await p; + } else { + ws.memoMakePlanchet.put( + key, + makePlanchet(ws, withdrawalSessionId, coinIndex), + ); } - const r = await ws.cryptoApi.createPlanchet({ - denomPub: denom.denomPub, - feeWithdraw: denom.feeWithdraw, - reservePriv: reserve.reservePriv, - reservePub: reserve.reservePub, - value: denom.value, - }); - const newPlanchet: PlanchetRecord = { - blindingKey: r.blindingKey, - coinEv: r.coinEv, - coinPriv: r.coinPriv, - coinPub: r.coinPub, - coinValue: r.coinValue, - denomPub: r.denomPub, - denomPubHash: r.denomPubHash, - isFromTip: false, - reservePub: r.reservePub, - withdrawSig: r.withdrawSig, - }; - await runWithWriteTransaction( - ws.db, - [Stores.withdrawalSession], - async tx => { - const myWs = await tx.get( - Stores.withdrawalSession, - withdrawalSessionId, - ); - if (!myWs) { - return; - } - if (myWs.planchets[coinIndex]) { - return; - } - myWs.planchets[coinIndex] = newPlanchet; - await tx.put(Stores.withdrawalSession, myWs); - }, - ); - await processPlanchet(ws, withdrawalSessionId, coinIndex); + await makePlanchet(ws, withdrawalSessionId, coinIndex); + logger.trace("done creating planchet for coin", coinIndex); } + await processPlanchet(ws, withdrawalSessionId, coinIndex); + logger.trace("starting withdraw for coin", coinIndex); } export async function processWithdrawSession( -- cgit v1.2.3