diff options
author | Florian Dold <florian.dold@gmail.com> | 2019-08-01 23:21:15 +0200 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2019-08-01 23:21:15 +0200 |
commit | 5f62d83a4ddab49a20ed29221c204dff5fe52b6d (patch) | |
tree | 8883706456b268f3a42ac64c21d987e3e908e13e /src/wallet.ts | |
parent | 92b04858a3dcc98b8d252e69a06c8ee2f1745394 (diff) |
headless/android port, PoC for CLI / headless tests
Diffstat (limited to 'src/wallet.ts')
-rw-r--r-- | src/wallet.ts | 315 |
1 files changed, 179 insertions, 136 deletions
diff --git a/src/wallet.ts b/src/wallet.ts index 4fc108a11..6d4eeb26c 100644 --- a/src/wallet.ts +++ b/src/wallet.ts @@ -105,6 +105,7 @@ import { WalletBalance, WalletBalanceEntry, } from "./walletTypes"; +import { openPromise } from "./promiseUtils"; interface SpeculativePayData { payCoinInfo: PayCoinInfo; @@ -327,6 +328,7 @@ export class Wallet { * IndexedDB database used by the wallet. */ db: IDBDatabase; + private enableTracing = false; private http: HttpRequestLibrary; private badge: Badge; private notifier: Notifier; @@ -337,6 +339,12 @@ export class Wallet { private speculativePayData: SpeculativePayData | undefined; private cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {}; private activeTipOperations: { [s: string]: Promise<TipRecord> } = {}; + private activeProcessReserveOperations: { + [reservePub: string]: Promise<void>; + } = {}; + private activeProcessPreCoinOperations: { + [preCoinPub: string]: Promise<void>; + } = {}; /** * Set of identifiers for running operations. @@ -426,14 +434,14 @@ export class Wallet { .iter(Stores.reserves) .forEach(reserve => { console.log("resuming reserve", reserve.reserve_pub); - this.processReserve(reserve); + this.processReserve(reserve.reserve_pub); }); this.q() .iter(Stores.precoins) .forEach(preCoin => { console.log("resuming precoin"); - this.processPreCoin(preCoin); + this.processPreCoin(preCoin.coinPub); }); this.q() @@ -1073,151 +1081,184 @@ export class Wallet { * First fetch information requred to withdraw from the reserve, * then deplete the reserve, withdrawing coins until it is empty. */ - private async processReserve( - reserveRecord: ReserveRecord, - retryDelayMs: number = 250, - ): Promise<void> { - const opId = "reserve-" + reserveRecord.reserve_pub; + async processReserve(reservePub: string): Promise<void> { + const activeOperation = this.activeProcessReserveOperations[reservePub]; + + if (activeOperation) { + return activeOperation; + } + + const opId = "reserve-" + reservePub; this.startOperation(opId); + // This opened promise gets resolved only once the + // reserve withdraw operation succeeds, even after retries. + const op = openPromise<void>(); + + const processReserveInternal = async (retryDelayMs: number = 250) => { + try { + const reserve = await this.updateReserve(reservePub); + await this.depleteReserve(reserve); + op.resolve(); + } catch (e) { + // random, exponential backoff truncated at 3 minutes + const nextDelay = Math.min( + 2 * retryDelayMs + retryDelayMs * Math.random(), + 3000 * 60, + ); + console.warn( + `Failed to deplete reserve, trying again in ${retryDelayMs} ms`, + ); + this.timerGroup.after(retryDelayMs, () => + processReserveInternal(nextDelay), + ); + } + }; + try { - const reserve = await this.updateReserve(reserveRecord.reserve_pub); - await this.depleteReserve(reserve); - } catch (e) { - // random, exponential backoff truncated at 3 minutes - const nextDelay = Math.min( - 2 * retryDelayMs + retryDelayMs * Math.random(), - 3000 * 60, - ); - console.warn( - `Failed to deplete reserve, trying again in ${retryDelayMs} ms`, - ); - this.timerGroup.after(retryDelayMs, () => - this.processReserve(reserveRecord, nextDelay), - ); + processReserveInternal(); + this.activeProcessReserveOperations[reservePub] = op.promise; + await op.promise; } finally { this.stopOperation(opId); + delete this.activeProcessReserveOperations[reservePub]; } } /** * Given a planchet, withdraw a coin from the exchange. */ - private async processPreCoin( - preCoin: PreCoinRecord, - retryDelayMs = 200, - ): Promise<void> { - // Throttle concurrent executions of this function, so we don't withdraw too many coins at once. - if ( - this.processPreCoinConcurrent >= 4 || - this.processPreCoinThrottle[preCoin.exchangeBaseUrl] - ) { - console.log("delaying processPreCoin"); - this.timerGroup.after(retryDelayMs, () => - this.processPreCoin(preCoin, Math.min(retryDelayMs * 2, 5 * 60 * 1000)), - ); - return; + private async processPreCoin(preCoinPub: string): Promise<void> { + const activeOperation = this.activeProcessPreCoinOperations[preCoinPub]; + if (activeOperation) { + return activeOperation; } - console.log("executing processPreCoin", preCoin); - this.processPreCoinConcurrent++; - try { - const exchange = await this.q().get( - Stores.exchanges, - preCoin.exchangeBaseUrl, - ); - if (!exchange) { - console.error("db inconsistent: exchange for precoin not found"); + + const op = openPromise<void>(); + + const processPreCoinInternal = async (retryDelayMs: number = 200) => { + const preCoin = await this.q().get(Stores.precoins, preCoinPub); + if (!preCoin) { + console.log("processPreCoin: preCoinPub not found"); return; } - const denom = await this.q().get(Stores.denominations, [ - preCoin.exchangeBaseUrl, - preCoin.denomPub, - ]); - if (!denom) { - console.error("db inconsistent: denom for precoin not found"); - return; + // Throttle concurrent executions of this function, + // so we don't withdraw too many coins at once. + if ( + this.processPreCoinConcurrent >= 4 || + this.processPreCoinThrottle[preCoin.exchangeBaseUrl] + ) { + this.enableTracing && console.log("delaying processPreCoin"); + this.timerGroup.after(retryDelayMs, () => + processPreCoinInternal(Math.min(retryDelayMs * 2, 5 * 60 * 1000)), + ); + return op.promise; } - const coin = await this.withdrawExecute(preCoin); - console.log("processPreCoin: got coin", coin); - - const mutateReserve = (r: ReserveRecord) => { - console.log( - `before committing coin: current ${amountToPretty( - r.current_amount!, - )}, precoin: ${amountToPretty(r.precoin_amount)})}`, - ); + //console.log("executing processPreCoin", preCoin); + this.processPreCoinConcurrent++; - const x = Amounts.sub( - r.precoin_amount, - preCoin.coinValue, - denom.feeWithdraw, + try { + const exchange = await this.q().get( + Stores.exchanges, + preCoin.exchangeBaseUrl, ); - if (x.saturated) { - console.error("database inconsistent"); - throw AbortTransaction; + if (!exchange) { + console.error("db inconsistent: exchange for precoin not found"); + return; + } + const denom = await this.q().get(Stores.denominations, [ + preCoin.exchangeBaseUrl, + preCoin.denomPub, + ]); + if (!denom) { + console.error("db inconsistent: denom for precoin not found"); + return; } - r.precoin_amount = x.amount; - return r; - }; - await this.q() - .mutate(Stores.reserves, preCoin.reservePub, mutateReserve) - .delete(Stores.precoins, coin.coinPub) - .add(Stores.coins, coin) - .finish(); + const coin = await this.withdrawExecute(preCoin); - if (coin.status === CoinStatus.TainedByTip) { - const tip = await this.q().getIndexed( - Stores.tips.coinPubIndex, - coin.coinPub, - ); - if (!tip) { - throw Error( - `inconsistent DB: tip for coin pub ${coin.coinPub} not found.`, + const mutateReserve = (r: ReserveRecord) => { + const x = Amounts.sub( + r.precoin_amount, + preCoin.coinValue, + denom.feeWithdraw, ); - } + if (x.saturated) { + console.error("database inconsistent"); + throw AbortTransaction; + } + r.precoin_amount = x.amount; + return r; + }; - if (tip.accepted) { - console.log("untainting already accepted tip"); - // Transactionally set coin to fresh. - const mutateCoin = (c: CoinRecord) => { - if (c.status === CoinStatus.TainedByTip) { - c.status = CoinStatus.Fresh; - } - return c; - }; - await this.q().mutate(Stores.coins, coin.coinPub, mutateCoin); - // Show notifications only for accepted tips + await this.q() + .mutate(Stores.reserves, preCoin.reservePub, mutateReserve) + .delete(Stores.precoins, coin.coinPub) + .add(Stores.coins, coin) + .finish(); + + if (coin.status === CoinStatus.TainedByTip) { + const tip = await this.q().getIndexed( + Stores.tips.coinPubIndex, + coin.coinPub, + ); + if (!tip) { + throw Error( + `inconsistent DB: tip for coin pub ${coin.coinPub} not found.`, + ); + } + + if (tip.accepted) { + console.log("untainting already accepted tip"); + // Transactionally set coin to fresh. + const mutateCoin = (c: CoinRecord) => { + if (c.status === CoinStatus.TainedByTip) { + c.status = CoinStatus.Fresh; + } + return c; + }; + await this.q().mutate(Stores.coins, coin.coinPub, mutateCoin); + // Show notifications only for accepted tips + this.badge.showNotification(); + } + } else { this.badge.showNotification(); } - } else { - this.badge.showNotification(); - } - this.notifier.notify(); - } catch (e) { - console.error( - "Failed to withdraw coin from precoin, retrying in", - retryDelayMs, - "ms", - e, - ); - // exponential backoff truncated at one minute - const nextRetryDelayMs = Math.min(retryDelayMs * 2, 5 * 60 * 1000); - this.timerGroup.after(retryDelayMs, () => - this.processPreCoin(preCoin, nextRetryDelayMs), - ); + this.notifier.notify(); + op.resolve(); + } catch (e) { + console.error( + "Failed to withdraw coin from precoin, retrying in", + retryDelayMs, + "ms", + e, + ); + // exponential backoff truncated at one minute + const nextRetryDelayMs = Math.min(retryDelayMs * 2, 5 * 60 * 1000); + this.timerGroup.after(retryDelayMs, () => + processPreCoinInternal(nextRetryDelayMs), + ); - const currentThrottle = - this.processPreCoinThrottle[preCoin.exchangeBaseUrl] || 0; - this.processPreCoinThrottle[preCoin.exchangeBaseUrl] = - currentThrottle + 1; - this.timerGroup.after(retryDelayMs, () => { - this.processPreCoinThrottle[preCoin.exchangeBaseUrl]--; - }); + const currentThrottle = + this.processPreCoinThrottle[preCoin.exchangeBaseUrl] || 0; + this.processPreCoinThrottle[preCoin.exchangeBaseUrl] = + currentThrottle + 1; + this.timerGroup.after(retryDelayMs, () => { + this.processPreCoinThrottle[preCoin.exchangeBaseUrl]--; + }); + } finally { + this.processPreCoinConcurrent--; + } + }; + + try { + this.activeProcessPreCoinOperations[preCoinPub] = op.promise; + await processPreCoinInternal(); + return op.promise; } finally { - this.processPreCoinConcurrent--; + delete this.activeProcessPreCoinOperations[preCoinPub]; } } @@ -1332,9 +1373,8 @@ export class Wallet { .finish(); this.notifier.notify(); - this.processReserve(reserve); + this.processReserve(reserve.reserve_pub); } - private async withdrawExecute(pc: PreCoinRecord): Promise<CoinRecord> { const wd: any = {}; @@ -1424,20 +1464,22 @@ export class Wallet { r.timestamp_depleted = new Date().getTime(); } - console.log( - `after creating precoin: current ${amountToPretty( - r.current_amount, - )}, precoin: ${amountToPretty(r.precoin_amount)})}`, - ); - return r; } const preCoin = await this.cryptoApi.createPreCoin(denom, reserve); - await this.q() - .put(Stores.precoins, preCoin) - .mutate(Stores.reserves, reserve.reserve_pub, mutateReserve); - await this.processPreCoin(preCoin); + // This will fail and throw an exception if the remaining amount in the + // reserve is too low to create a pre-coin. + try { + await this.q() + .put(Stores.precoins, preCoin) + .mutate(Stores.reserves, reserve.reserve_pub, mutateReserve) + .finish(); + } catch (e) { + console.log("can't create pre-coin:", e.name, e.message); + return; + } + await this.processPreCoin(preCoin.coinPub); }); await Promise.all(ps); @@ -1746,7 +1788,10 @@ export class Wallet { return ret; } - async getExchangePaytoUri(exchangeBaseUrl: string, supportedTargetTypes: string[]): Promise<string> { + async getExchangePaytoUri( + exchangeBaseUrl: string, + supportedTargetTypes: string[], + ): Promise<string> { const wireInfo = await this.getWireInfo(exchangeBaseUrl); for (let account of wireInfo.accounts) { const paytoUri = new URI(account.url); @@ -1820,8 +1865,6 @@ export class Wallet { throw Error("exchange doesn't offer any denominations"); } - console.log("updating exchange with wireMethodDetails", wireMethodDetails); - const r = await this.q().get<ExchangeRecord>(Stores.exchanges, baseUrl); let exchangeInfo: ExchangeRecord; @@ -2714,7 +2757,7 @@ export class Wallet { */ stop() { this.timerGroup.stopCurrentAndFutureTimers(); - this.cryptoApi.terminateWorkers(); + this.cryptoApi.stop(); } async getSenderWireInfos(): Promise<SenderWireInfos> { @@ -3199,7 +3242,7 @@ export class Wallet { withdrawSig: response.reserve_sigs[i].reserve_sig, }; await this.q().put(Stores.precoins, preCoin); - this.processPreCoin(preCoin); + this.processPreCoin(preCoin.coinPub); } tipRecord.pickedUp = true; |