aboutsummaryrefslogtreecommitdiff
path: root/src/wallet.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/wallet.ts')
-rw-r--r--src/wallet.ts315
1 files changed, 179 insertions, 136 deletions
diff --git a/src/wallet.ts b/src/wallet.ts
index 4fc108a11..6d4eeb26c 100644
--- a/src/wallet.ts
+++ b/src/wallet.ts
@@ -105,6 +105,7 @@ import {
WalletBalance,
WalletBalanceEntry,
} from "./walletTypes";
+import { openPromise } from "./promiseUtils";
interface SpeculativePayData {
payCoinInfo: PayCoinInfo;
@@ -327,6 +328,7 @@ export class Wallet {
* IndexedDB database used by the wallet.
*/
db: IDBDatabase;
+ private enableTracing = false;
private http: HttpRequestLibrary;
private badge: Badge;
private notifier: Notifier;
@@ -337,6 +339,12 @@ export class Wallet {
private speculativePayData: SpeculativePayData | undefined;
private cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {};
private activeTipOperations: { [s: string]: Promise<TipRecord> } = {};
+ private activeProcessReserveOperations: {
+ [reservePub: string]: Promise<void>;
+ } = {};
+ private activeProcessPreCoinOperations: {
+ [preCoinPub: string]: Promise<void>;
+ } = {};
/**
* Set of identifiers for running operations.
@@ -426,14 +434,14 @@ export class Wallet {
.iter(Stores.reserves)
.forEach(reserve => {
console.log("resuming reserve", reserve.reserve_pub);
- this.processReserve(reserve);
+ this.processReserve(reserve.reserve_pub);
});
this.q()
.iter(Stores.precoins)
.forEach(preCoin => {
console.log("resuming precoin");
- this.processPreCoin(preCoin);
+ this.processPreCoin(preCoin.coinPub);
});
this.q()
@@ -1073,151 +1081,184 @@ export class Wallet {
* First fetch information requred to withdraw from the reserve,
* then deplete the reserve, withdrawing coins until it is empty.
*/
- private async processReserve(
- reserveRecord: ReserveRecord,
- retryDelayMs: number = 250,
- ): Promise<void> {
- const opId = "reserve-" + reserveRecord.reserve_pub;
+ async processReserve(reservePub: string): Promise<void> {
+ const activeOperation = this.activeProcessReserveOperations[reservePub];
+
+ if (activeOperation) {
+ return activeOperation;
+ }
+
+ const opId = "reserve-" + reservePub;
this.startOperation(opId);
+ // This opened promise gets resolved only once the
+ // reserve withdraw operation succeeds, even after retries.
+ const op = openPromise<void>();
+
+ const processReserveInternal = async (retryDelayMs: number = 250) => {
+ try {
+ const reserve = await this.updateReserve(reservePub);
+ await this.depleteReserve(reserve);
+ op.resolve();
+ } catch (e) {
+ // random, exponential backoff truncated at 3 minutes
+ const nextDelay = Math.min(
+ 2 * retryDelayMs + retryDelayMs * Math.random(),
+ 3000 * 60,
+ );
+ console.warn(
+ `Failed to deplete reserve, trying again in ${retryDelayMs} ms`,
+ );
+ this.timerGroup.after(retryDelayMs, () =>
+ processReserveInternal(nextDelay),
+ );
+ }
+ };
+
try {
- const reserve = await this.updateReserve(reserveRecord.reserve_pub);
- await this.depleteReserve(reserve);
- } catch (e) {
- // random, exponential backoff truncated at 3 minutes
- const nextDelay = Math.min(
- 2 * retryDelayMs + retryDelayMs * Math.random(),
- 3000 * 60,
- );
- console.warn(
- `Failed to deplete reserve, trying again in ${retryDelayMs} ms`,
- );
- this.timerGroup.after(retryDelayMs, () =>
- this.processReserve(reserveRecord, nextDelay),
- );
+ processReserveInternal();
+ this.activeProcessReserveOperations[reservePub] = op.promise;
+ await op.promise;
} finally {
this.stopOperation(opId);
+ delete this.activeProcessReserveOperations[reservePub];
}
}
/**
* Given a planchet, withdraw a coin from the exchange.
*/
- private async processPreCoin(
- preCoin: PreCoinRecord,
- retryDelayMs = 200,
- ): Promise<void> {
- // Throttle concurrent executions of this function, so we don't withdraw too many coins at once.
- if (
- this.processPreCoinConcurrent >= 4 ||
- this.processPreCoinThrottle[preCoin.exchangeBaseUrl]
- ) {
- console.log("delaying processPreCoin");
- this.timerGroup.after(retryDelayMs, () =>
- this.processPreCoin(preCoin, Math.min(retryDelayMs * 2, 5 * 60 * 1000)),
- );
- return;
+ private async processPreCoin(preCoinPub: string): Promise<void> {
+ const activeOperation = this.activeProcessPreCoinOperations[preCoinPub];
+ if (activeOperation) {
+ return activeOperation;
}
- console.log("executing processPreCoin", preCoin);
- this.processPreCoinConcurrent++;
- try {
- const exchange = await this.q().get(
- Stores.exchanges,
- preCoin.exchangeBaseUrl,
- );
- if (!exchange) {
- console.error("db inconsistent: exchange for precoin not found");
+
+ const op = openPromise<void>();
+
+ const processPreCoinInternal = async (retryDelayMs: number = 200) => {
+ const preCoin = await this.q().get(Stores.precoins, preCoinPub);
+ if (!preCoin) {
+ console.log("processPreCoin: preCoinPub not found");
return;
}
- const denom = await this.q().get(Stores.denominations, [
- preCoin.exchangeBaseUrl,
- preCoin.denomPub,
- ]);
- if (!denom) {
- console.error("db inconsistent: denom for precoin not found");
- return;
+ // Throttle concurrent executions of this function,
+ // so we don't withdraw too many coins at once.
+ if (
+ this.processPreCoinConcurrent >= 4 ||
+ this.processPreCoinThrottle[preCoin.exchangeBaseUrl]
+ ) {
+ this.enableTracing && console.log("delaying processPreCoin");
+ this.timerGroup.after(retryDelayMs, () =>
+ processPreCoinInternal(Math.min(retryDelayMs * 2, 5 * 60 * 1000)),
+ );
+ return op.promise;
}
- const coin = await this.withdrawExecute(preCoin);
- console.log("processPreCoin: got coin", coin);
-
- const mutateReserve = (r: ReserveRecord) => {
- console.log(
- `before committing coin: current ${amountToPretty(
- r.current_amount!,
- )}, precoin: ${amountToPretty(r.precoin_amount)})}`,
- );
+ //console.log("executing processPreCoin", preCoin);
+ this.processPreCoinConcurrent++;
- const x = Amounts.sub(
- r.precoin_amount,
- preCoin.coinValue,
- denom.feeWithdraw,
+ try {
+ const exchange = await this.q().get(
+ Stores.exchanges,
+ preCoin.exchangeBaseUrl,
);
- if (x.saturated) {
- console.error("database inconsistent");
- throw AbortTransaction;
+ if (!exchange) {
+ console.error("db inconsistent: exchange for precoin not found");
+ return;
+ }
+ const denom = await this.q().get(Stores.denominations, [
+ preCoin.exchangeBaseUrl,
+ preCoin.denomPub,
+ ]);
+ if (!denom) {
+ console.error("db inconsistent: denom for precoin not found");
+ return;
}
- r.precoin_amount = x.amount;
- return r;
- };
- await this.q()
- .mutate(Stores.reserves, preCoin.reservePub, mutateReserve)
- .delete(Stores.precoins, coin.coinPub)
- .add(Stores.coins, coin)
- .finish();
+ const coin = await this.withdrawExecute(preCoin);
- if (coin.status === CoinStatus.TainedByTip) {
- const tip = await this.q().getIndexed(
- Stores.tips.coinPubIndex,
- coin.coinPub,
- );
- if (!tip) {
- throw Error(
- `inconsistent DB: tip for coin pub ${coin.coinPub} not found.`,
+ const mutateReserve = (r: ReserveRecord) => {
+ const x = Amounts.sub(
+ r.precoin_amount,
+ preCoin.coinValue,
+ denom.feeWithdraw,
);
- }
+ if (x.saturated) {
+ console.error("database inconsistent");
+ throw AbortTransaction;
+ }
+ r.precoin_amount = x.amount;
+ return r;
+ };
- if (tip.accepted) {
- console.log("untainting already accepted tip");
- // Transactionally set coin to fresh.
- const mutateCoin = (c: CoinRecord) => {
- if (c.status === CoinStatus.TainedByTip) {
- c.status = CoinStatus.Fresh;
- }
- return c;
- };
- await this.q().mutate(Stores.coins, coin.coinPub, mutateCoin);
- // Show notifications only for accepted tips
+ await this.q()
+ .mutate(Stores.reserves, preCoin.reservePub, mutateReserve)
+ .delete(Stores.precoins, coin.coinPub)
+ .add(Stores.coins, coin)
+ .finish();
+
+ if (coin.status === CoinStatus.TainedByTip) {
+ const tip = await this.q().getIndexed(
+ Stores.tips.coinPubIndex,
+ coin.coinPub,
+ );
+ if (!tip) {
+ throw Error(
+ `inconsistent DB: tip for coin pub ${coin.coinPub} not found.`,
+ );
+ }
+
+ if (tip.accepted) {
+ console.log("untainting already accepted tip");
+ // Transactionally set coin to fresh.
+ const mutateCoin = (c: CoinRecord) => {
+ if (c.status === CoinStatus.TainedByTip) {
+ c.status = CoinStatus.Fresh;
+ }
+ return c;
+ };
+ await this.q().mutate(Stores.coins, coin.coinPub, mutateCoin);
+ // Show notifications only for accepted tips
+ this.badge.showNotification();
+ }
+ } else {
this.badge.showNotification();
}
- } else {
- this.badge.showNotification();
- }
- this.notifier.notify();
- } catch (e) {
- console.error(
- "Failed to withdraw coin from precoin, retrying in",
- retryDelayMs,
- "ms",
- e,
- );
- // exponential backoff truncated at one minute
- const nextRetryDelayMs = Math.min(retryDelayMs * 2, 5 * 60 * 1000);
- this.timerGroup.after(retryDelayMs, () =>
- this.processPreCoin(preCoin, nextRetryDelayMs),
- );
+ this.notifier.notify();
+ op.resolve();
+ } catch (e) {
+ console.error(
+ "Failed to withdraw coin from precoin, retrying in",
+ retryDelayMs,
+ "ms",
+ e,
+ );
+ // exponential backoff truncated at one minute
+ const nextRetryDelayMs = Math.min(retryDelayMs * 2, 5 * 60 * 1000);
+ this.timerGroup.after(retryDelayMs, () =>
+ processPreCoinInternal(nextRetryDelayMs),
+ );
- const currentThrottle =
- this.processPreCoinThrottle[preCoin.exchangeBaseUrl] || 0;
- this.processPreCoinThrottle[preCoin.exchangeBaseUrl] =
- currentThrottle + 1;
- this.timerGroup.after(retryDelayMs, () => {
- this.processPreCoinThrottle[preCoin.exchangeBaseUrl]--;
- });
+ const currentThrottle =
+ this.processPreCoinThrottle[preCoin.exchangeBaseUrl] || 0;
+ this.processPreCoinThrottle[preCoin.exchangeBaseUrl] =
+ currentThrottle + 1;
+ this.timerGroup.after(retryDelayMs, () => {
+ this.processPreCoinThrottle[preCoin.exchangeBaseUrl]--;
+ });
+ } finally {
+ this.processPreCoinConcurrent--;
+ }
+ };
+
+ try {
+ this.activeProcessPreCoinOperations[preCoinPub] = op.promise;
+ await processPreCoinInternal();
+ return op.promise;
} finally {
- this.processPreCoinConcurrent--;
+ delete this.activeProcessPreCoinOperations[preCoinPub];
}
}
@@ -1332,9 +1373,8 @@ export class Wallet {
.finish();
this.notifier.notify();
- this.processReserve(reserve);
+ this.processReserve(reserve.reserve_pub);
}
-
private async withdrawExecute(pc: PreCoinRecord): Promise<CoinRecord> {
const wd: any = {};
@@ -1424,20 +1464,22 @@ export class Wallet {
r.timestamp_depleted = new Date().getTime();
}
- console.log(
- `after creating precoin: current ${amountToPretty(
- r.current_amount,
- )}, precoin: ${amountToPretty(r.precoin_amount)})}`,
- );
-
return r;
}
const preCoin = await this.cryptoApi.createPreCoin(denom, reserve);
- await this.q()
- .put(Stores.precoins, preCoin)
- .mutate(Stores.reserves, reserve.reserve_pub, mutateReserve);
- await this.processPreCoin(preCoin);
+ // This will fail and throw an exception if the remaining amount in the
+ // reserve is too low to create a pre-coin.
+ try {
+ await this.q()
+ .put(Stores.precoins, preCoin)
+ .mutate(Stores.reserves, reserve.reserve_pub, mutateReserve)
+ .finish();
+ } catch (e) {
+ console.log("can't create pre-coin:", e.name, e.message);
+ return;
+ }
+ await this.processPreCoin(preCoin.coinPub);
});
await Promise.all(ps);
@@ -1746,7 +1788,10 @@ export class Wallet {
return ret;
}
- async getExchangePaytoUri(exchangeBaseUrl: string, supportedTargetTypes: string[]): Promise<string> {
+ async getExchangePaytoUri(
+ exchangeBaseUrl: string,
+ supportedTargetTypes: string[],
+ ): Promise<string> {
const wireInfo = await this.getWireInfo(exchangeBaseUrl);
for (let account of wireInfo.accounts) {
const paytoUri = new URI(account.url);
@@ -1820,8 +1865,6 @@ export class Wallet {
throw Error("exchange doesn't offer any denominations");
}
- console.log("updating exchange with wireMethodDetails", wireMethodDetails);
-
const r = await this.q().get<ExchangeRecord>(Stores.exchanges, baseUrl);
let exchangeInfo: ExchangeRecord;
@@ -2714,7 +2757,7 @@ export class Wallet {
*/
stop() {
this.timerGroup.stopCurrentAndFutureTimers();
- this.cryptoApi.terminateWorkers();
+ this.cryptoApi.stop();
}
async getSenderWireInfos(): Promise<SenderWireInfos> {
@@ -3199,7 +3242,7 @@ export class Wallet {
withdrawSig: response.reserve_sigs[i].reserve_sig,
};
await this.q().put(Stores.precoins, preCoin);
- this.processPreCoin(preCoin);
+ this.processPreCoin(preCoin.coinPub);
}
tipRecord.pickedUp = true;