aboutsummaryrefslogtreecommitdiff
path: root/src/wallet.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2019-12-05 19:38:19 +0100
committerFlorian Dold <florian.dold@gmail.com>2019-12-05 19:38:19 +0100
commitf67d7f54f9d0fed97446898942e3dfee67ee2985 (patch)
tree2b81738025e8f61250ede10908cbf81071e16975 /src/wallet.ts
parent829acdd3d98f1014747f15ecb619b6fbaa06b640 (diff)
threads, retries and notifications WIP
Diffstat (limited to 'src/wallet.ts')
-rw-r--r--src/wallet.ts202
1 files changed, 107 insertions, 95 deletions
diff --git a/src/wallet.ts b/src/wallet.ts
index 772bb01ac..86b3085f4 100644
--- a/src/wallet.ts
+++ b/src/wallet.ts
@@ -22,7 +22,7 @@
/**
* Imports.
*/
-import { CryptoApi, CryptoWorkerFactory } from "./crypto/cryptoApi";
+import { CryptoApi, CryptoWorkerFactory } from "./crypto/workers/cryptoApi";
import { HttpRequestLibrary } from "./util/http";
import {
oneShotPut,
@@ -49,6 +49,7 @@ import {
processDownloadProposal,
applyRefund,
getFullRefundFees,
+ processPurchaseImpl,
} from "./wallet-impl/pay";
import {
@@ -65,14 +66,12 @@ import {
} from "./dbTypes";
import { MerchantRefundPermission } from "./talerTypes";
import {
- Badge,
BenchmarkResult,
ConfirmPayResult,
ConfirmReserveRequest,
CreateReserveRequest,
CreateReserveResponse,
HistoryEvent,
- Notifier,
ReturnCoinsRequest,
SenderWireInfos,
TipStatus,
@@ -85,6 +84,8 @@ import {
PendingOperationInfo,
PendingOperationsResponse,
HistoryQuery,
+ WalletNotification,
+ NotificationType,
} from "./walletTypes";
import { Logger } from "./util/logging";
@@ -97,8 +98,6 @@ import {
} from "./wallet-impl/exchanges";
import { processReserve } from "./wallet-impl/reserves";
-import { AsyncOpMemo } from "./util/asyncMemo";
-
import { InternalWalletState } from "./wallet-impl/state";
import { createReserve, confirmReserve } from "./wallet-impl/reserves";
import { processRefreshSession, refresh } from "./wallet-impl/refresh";
@@ -111,6 +110,7 @@ import { returnCoins } from "./wallet-impl/return";
import { payback } from "./wallet-impl/payback";
import { TimerGroup } from "./util/timer";
import { AsyncCondition } from "./util/promiseUtils";
+import { AsyncOpMemoSingle } from "./util/asyncMemo";
/**
* Wallet protocol version spoken with the exchange
@@ -137,18 +137,6 @@ const builtinCurrencies: CurrencyRecord[] = [
},
];
-/**
- * This error is thrown when an
- */
-export class OperationFailedAndReportedError extends Error {
- constructor(message: string) {
- super(message);
-
- // Set the prototype explicitly.
- Object.setPrototypeOf(this, OperationFailedAndReportedError.prototype);
- }
-}
-
const logger = new Logger("wallet.ts");
/**
@@ -159,41 +147,18 @@ export class Wallet {
private timerGroup: TimerGroup = new TimerGroup();
private latch = new AsyncCondition();
private stopped: boolean = false;
+ private memoRunRetryLoop = new AsyncOpMemoSingle<void>();
get db(): IDBDatabase {
return this.ws.db;
}
- private get badge(): Badge {
- return this.ws.badge;
- }
-
- private get cryptoApi(): CryptoApi {
- return this.ws.cryptoApi;
- }
-
- private get notifier(): Notifier {
- return this.ws.notifier;
- }
-
constructor(
db: IDBDatabase,
http: HttpRequestLibrary,
- badge: Badge,
- notifier: Notifier,
cryptoWorkerFactory: CryptoWorkerFactory,
) {
- this.ws = {
- badge,
- cachedNextUrl: {},
- cryptoApi: new CryptoApi(cryptoWorkerFactory),
- db,
- http,
- notifier,
- speculativePayData: undefined,
- memoProcessReserve: new AsyncOpMemo<void>(),
- memoMakePlanchet: new AsyncOpMemo<void>(),
- };
+ this.ws = new InternalWalletState(db, http, cryptoWorkerFactory);
}
getExchangePaytoUri(exchangeBaseUrl: string, supportedTargetTypes: string[]) {
@@ -204,6 +169,10 @@ export class Wallet {
return getWithdrawDetailsForAmount(this.ws, baseUrl, amount);
}
+ addNotificationListener(f: (n: WalletNotification) => void): void {
+ this.ws.addNotificationListener(f);
+ }
+
/**
* Execute one operation based on the pending operation info record.
*/
@@ -213,6 +182,7 @@ export class Wallet {
): Promise<void> {
switch (pending.type) {
case "bug":
+ // Nothing to do, will just be displayed to the user
return;
case "dirty-coin":
await refresh(this.ws, pending.coinPub);
@@ -224,7 +194,7 @@ export class Wallet {
await processRefreshSession(this.ws, pending.refreshSessionId);
break;
case "reserve":
- await processReserve(this.ws, pending.reservePub);
+ await processReserve(this.ws, pending.reservePub, forceNow);
break;
case "withdraw":
await processWithdrawSession(this.ws, pending.withdrawSessionId);
@@ -239,6 +209,7 @@ export class Wallet {
await processTip(this.ws, pending.tipId);
break;
case "pay":
+ await processPurchaseImpl(this.ws, pending.proposalId);
break;
default:
assertUnreachable(pending);
@@ -249,7 +220,8 @@ export class Wallet {
* Process pending operations.
*/
public async runPending(forceNow: boolean = false): Promise<void> {
- const pendingOpsResponse = await this.getPendingOperations();
+ const onlyDue = !forceNow;
+ const pendingOpsResponse = await this.getPendingOperations(onlyDue);
for (const p of pendingOpsResponse.pendingOperations) {
try {
await this.processOnePendingOperation(p, forceNow);
@@ -260,54 +232,96 @@ export class Wallet {
}
/**
- * Process pending operations and wait for scheduled operations in
- * a loop until the wallet is stopped explicitly.
+ * Run the wallet until there are no more pending operations that give
+ * liveness left. The wallet will be in a stopped state when this function
+ * returns without resolving to an exception.
*/
- public async runLoopScheduledRetries(): Promise<void> {
- while (!this.stopped) {
- console.log("running wallet retry loop iteration");
- let pending = await this.getPendingOperations();
- console.log("waiting for", pending.nextRetryDelay);
- const timeout = this.timerGroup.resolveAfter(pending.nextRetryDelay.d_ms);
- await Promise.race([timeout, this.latch.wait()]);
- pending = await this.getPendingOperations();
- for (const p of pending.pendingOperations) {
- try {
- this.processOnePendingOperation(p);
- } catch (e) {
- console.error(e);
+ public async runUntilDone(): Promise<void> {
+ const p = new Promise((resolve, reject) => {
+ // Run this asynchronously
+ this.addNotificationListener(n => {
+ if (
+ n.type === NotificationType.WaitingForRetry &&
+ n.numGivingLiveness == 0
+ ) {
+ logger.trace("no liveness-giving operations left, stopping");
+ this.stop();
}
- }
- }
+ });
+ this.runRetryLoop().catch(e => {
+ console.log("exception in wallet retry loop");
+ reject(e);
+ });
+ });
+ await p;
}
/**
- * Run until all coins have been withdrawn from the given reserve,
- * or an error has occured.
+ * Process pending operations and wait for scheduled operations in
+ * a loop until the wallet is stopped explicitly.
*/
- public async runUntilReserveDepleted(reservePub: string) {
- while (true) {
- const r = await this.getPendingOperations();
- const allPending = r.pendingOperations;
- const relevantPending = allPending.filter(x => {
- switch (x.type) {
- case "reserve":
- return x.reservePub === reservePub;
- case "withdraw":
- return (
- x.source.type === "reserve" && x.source.reservePub === reservePub
- );
- default:
- return false;
- }
- });
- if (relevantPending.length === 0) {
- return;
+ public async runRetryLoop(): Promise<void> {
+ // Make sure we only run one main loop at a time.
+ return this.memoRunRetryLoop.memo(async () => {
+ try {
+ await this.runRetryLoopImpl();
+ } catch (e) {
+ console.error("error during retry loop execution", e);
+ throw e;
}
- for (const p of relevantPending) {
- await this.processOnePendingOperation(p);
+ });
+ }
+
+ private async runRetryLoopImpl(): Promise<void> {
+ while (!this.stopped) {
+ console.log("running wallet retry loop iteration");
+ let pending = await this.getPendingOperations(true);
+ if (pending.pendingOperations.length === 0) {
+ const allPending = await this.getPendingOperations(false);
+ let numPending = 0;
+ let numGivingLiveness = 0;
+ for (const p of allPending.pendingOperations) {
+ numPending++;
+ if (p.givesLifeness) {
+ numGivingLiveness++;
+ }
+ }
+ let timeout;
+ if (
+ allPending.pendingOperations.length === 0 ||
+ allPending.nextRetryDelay.d_ms === Number.MAX_SAFE_INTEGER
+ ) {
+ // Wait forever
+ timeout = new Promise(() => {});
+ console.log("waiting forever");
+ } else {
+ console.log("waiting for timeout", pending.nextRetryDelay);
+ timeout = this.timerGroup.resolveAfter(
+ allPending.nextRetryDelay.d_ms,
+ );
+ }
+ this.ws.notify({
+ type: NotificationType.WaitingForRetry,
+ numGivingLiveness,
+ numPending,
+ });
+ await Promise.race([timeout, this.latch.wait()]);
+ console.log("timeout done");
+ } else {
+ logger.trace("running pending operations that are due");
+ // FIXME: maybe be a bit smarter about executing these
+ // opeations in parallel?
+ for (const p of pending.pendingOperations) {
+ try {
+ console.log("running", p);
+ await this.processOnePendingOperation(p);
+ } catch (e) {
+ console.error(e);
+ }
+ }
}
}
+ logger.trace("exiting wallet retry loop");
}
/**
@@ -429,7 +443,6 @@ export class Wallet {
}
}
-
/**
* Check if and how an exchange is trusted and/or audited.
*/
@@ -466,7 +479,7 @@ export class Wallet {
* Get detailed balance information, sliced by exchange and by currency.
*/
async getBalances(): Promise<WalletBalance> {
- return getBalances(this.ws);
+ return this.ws.memoGetBalance.memo(() => getBalances(this.ws));
}
async refresh(oldCoinPub: string, force: boolean = false): Promise<void> {
@@ -488,8 +501,12 @@ export class Wallet {
return getHistory(this.ws, historyQuery);
}
- async getPendingOperations(): Promise<PendingOperationsResponse> {
- return getPendingOperations(this.ws);
+ async getPendingOperations(
+ onlyDue: boolean = false,
+ ): Promise<PendingOperationsResponse> {
+ return this.ws.memoGetPending.memo(() =>
+ getPendingOperations(this.ws, onlyDue),
+ );
}
async getDenoms(exchangeUrl: string): Promise<DenominationRecord[]> {
@@ -517,7 +534,6 @@ export class Wallet {
async updateCurrency(currencyRecord: CurrencyRecord): Promise<void> {
logger.trace("updating currency to", currencyRecord);
await oneShotPut(this.db, Stores.currencies, currencyRecord);
- this.notifier.notify();
}
async getReserves(exchangeBaseUrl: string): Promise<ReserveRecord[]> {
@@ -552,7 +568,7 @@ export class Wallet {
stop() {
this.stopped = true;
this.timerGroup.stopCurrentAndFutureTimers();
- this.cryptoApi.stop();
+ this.ws.cryptoApi.stop();
}
async getSenderWireInfos(): Promise<SenderWireInfos> {
@@ -693,17 +709,13 @@ export class Wallet {
const totalFees = totalRefundFees;
return {
contractTerms: purchase.contractTerms,
- hasRefund: purchase.timestamp_refund !== undefined,
+ hasRefund: purchase.lastRefundTimestamp !== undefined,
totalRefundAmount: totalRefundAmount,
totalRefundAndRefreshFees: totalFees,
};
}
- clearNotification(): void {
- this.badge.clearNotification();
- }
-
benchmarkCrypto(repetitions: number): Promise<BenchmarkResult> {
- return this.cryptoApi.benchmark(repetitions);
+ return this.ws.cryptoApi.benchmark(repetitions);
}
}