From f67d7f54f9d0fed97446898942e3dfee67ee2985 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Thu, 5 Dec 2019 19:38:19 +0100 Subject: threads, retries and notifications WIP --- src/wallet.ts | 202 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 107 insertions(+), 95 deletions(-) (limited to 'src/wallet.ts') diff --git a/src/wallet.ts b/src/wallet.ts index 772bb01ac..86b3085f4 100644 --- a/src/wallet.ts +++ b/src/wallet.ts @@ -22,7 +22,7 @@ /** * Imports. */ -import { CryptoApi, CryptoWorkerFactory } from "./crypto/cryptoApi"; +import { CryptoApi, CryptoWorkerFactory } from "./crypto/workers/cryptoApi"; import { HttpRequestLibrary } from "./util/http"; import { oneShotPut, @@ -49,6 +49,7 @@ import { processDownloadProposal, applyRefund, getFullRefundFees, + processPurchaseImpl, } from "./wallet-impl/pay"; import { @@ -65,14 +66,12 @@ import { } from "./dbTypes"; import { MerchantRefundPermission } from "./talerTypes"; import { - Badge, BenchmarkResult, ConfirmPayResult, ConfirmReserveRequest, CreateReserveRequest, CreateReserveResponse, HistoryEvent, - Notifier, ReturnCoinsRequest, SenderWireInfos, TipStatus, @@ -85,6 +84,8 @@ import { PendingOperationInfo, PendingOperationsResponse, HistoryQuery, + WalletNotification, + NotificationType, } from "./walletTypes"; import { Logger } from "./util/logging"; @@ -97,8 +98,6 @@ import { } from "./wallet-impl/exchanges"; import { processReserve } from "./wallet-impl/reserves"; -import { AsyncOpMemo } from "./util/asyncMemo"; - import { InternalWalletState } from "./wallet-impl/state"; import { createReserve, confirmReserve } from "./wallet-impl/reserves"; import { processRefreshSession, refresh } from "./wallet-impl/refresh"; @@ -111,6 +110,7 @@ import { returnCoins } from "./wallet-impl/return"; import { payback } from "./wallet-impl/payback"; import { TimerGroup } from "./util/timer"; import { AsyncCondition } from "./util/promiseUtils"; +import { AsyncOpMemoSingle } from "./util/asyncMemo"; /** * Wallet protocol version spoken with the exchange @@ -137,18 +137,6 @@ const builtinCurrencies: CurrencyRecord[] = [ }, ]; -/** - * This error is thrown when an - */ -export class OperationFailedAndReportedError extends Error { - constructor(message: string) { - super(message); - - // Set the prototype explicitly. - Object.setPrototypeOf(this, OperationFailedAndReportedError.prototype); - } -} - const logger = new Logger("wallet.ts"); /** @@ -159,41 +147,18 @@ export class Wallet { private timerGroup: TimerGroup = new TimerGroup(); private latch = new AsyncCondition(); private stopped: boolean = false; + private memoRunRetryLoop = new AsyncOpMemoSingle(); get db(): IDBDatabase { return this.ws.db; } - private get badge(): Badge { - return this.ws.badge; - } - - private get cryptoApi(): CryptoApi { - return this.ws.cryptoApi; - } - - private get notifier(): Notifier { - return this.ws.notifier; - } - constructor( db: IDBDatabase, http: HttpRequestLibrary, - badge: Badge, - notifier: Notifier, cryptoWorkerFactory: CryptoWorkerFactory, ) { - this.ws = { - badge, - cachedNextUrl: {}, - cryptoApi: new CryptoApi(cryptoWorkerFactory), - db, - http, - notifier, - speculativePayData: undefined, - memoProcessReserve: new AsyncOpMemo(), - memoMakePlanchet: new AsyncOpMemo(), - }; + this.ws = new InternalWalletState(db, http, cryptoWorkerFactory); } getExchangePaytoUri(exchangeBaseUrl: string, supportedTargetTypes: string[]) { @@ -204,6 +169,10 @@ export class Wallet { return getWithdrawDetailsForAmount(this.ws, baseUrl, amount); } + addNotificationListener(f: (n: WalletNotification) => void): void { + this.ws.addNotificationListener(f); + } + /** * Execute one operation based on the pending operation info record. */ @@ -213,6 +182,7 @@ export class Wallet { ): Promise { switch (pending.type) { case "bug": + // Nothing to do, will just be displayed to the user return; case "dirty-coin": await refresh(this.ws, pending.coinPub); @@ -224,7 +194,7 @@ export class Wallet { await processRefreshSession(this.ws, pending.refreshSessionId); break; case "reserve": - await processReserve(this.ws, pending.reservePub); + await processReserve(this.ws, pending.reservePub, forceNow); break; case "withdraw": await processWithdrawSession(this.ws, pending.withdrawSessionId); @@ -239,6 +209,7 @@ export class Wallet { await processTip(this.ws, pending.tipId); break; case "pay": + await processPurchaseImpl(this.ws, pending.proposalId); break; default: assertUnreachable(pending); @@ -249,7 +220,8 @@ export class Wallet { * Process pending operations. */ public async runPending(forceNow: boolean = false): Promise { - const pendingOpsResponse = await this.getPendingOperations(); + const onlyDue = !forceNow; + const pendingOpsResponse = await this.getPendingOperations(onlyDue); for (const p of pendingOpsResponse.pendingOperations) { try { await this.processOnePendingOperation(p, forceNow); @@ -260,54 +232,96 @@ export class Wallet { } /** - * Process pending operations and wait for scheduled operations in - * a loop until the wallet is stopped explicitly. + * Run the wallet until there are no more pending operations that give + * liveness left. The wallet will be in a stopped state when this function + * returns without resolving to an exception. */ - public async runLoopScheduledRetries(): Promise { - while (!this.stopped) { - console.log("running wallet retry loop iteration"); - let pending = await this.getPendingOperations(); - console.log("waiting for", pending.nextRetryDelay); - const timeout = this.timerGroup.resolveAfter(pending.nextRetryDelay.d_ms); - await Promise.race([timeout, this.latch.wait()]); - pending = await this.getPendingOperations(); - for (const p of pending.pendingOperations) { - try { - this.processOnePendingOperation(p); - } catch (e) { - console.error(e); + public async runUntilDone(): Promise { + const p = new Promise((resolve, reject) => { + // Run this asynchronously + this.addNotificationListener(n => { + if ( + n.type === NotificationType.WaitingForRetry && + n.numGivingLiveness == 0 + ) { + logger.trace("no liveness-giving operations left, stopping"); + this.stop(); } - } - } + }); + this.runRetryLoop().catch(e => { + console.log("exception in wallet retry loop"); + reject(e); + }); + }); + await p; } /** - * Run until all coins have been withdrawn from the given reserve, - * or an error has occured. + * Process pending operations and wait for scheduled operations in + * a loop until the wallet is stopped explicitly. */ - public async runUntilReserveDepleted(reservePub: string) { - while (true) { - const r = await this.getPendingOperations(); - const allPending = r.pendingOperations; - const relevantPending = allPending.filter(x => { - switch (x.type) { - case "reserve": - return x.reservePub === reservePub; - case "withdraw": - return ( - x.source.type === "reserve" && x.source.reservePub === reservePub - ); - default: - return false; - } - }); - if (relevantPending.length === 0) { - return; + public async runRetryLoop(): Promise { + // Make sure we only run one main loop at a time. + return this.memoRunRetryLoop.memo(async () => { + try { + await this.runRetryLoopImpl(); + } catch (e) { + console.error("error during retry loop execution", e); + throw e; } - for (const p of relevantPending) { - await this.processOnePendingOperation(p); + }); + } + + private async runRetryLoopImpl(): Promise { + while (!this.stopped) { + console.log("running wallet retry loop iteration"); + let pending = await this.getPendingOperations(true); + if (pending.pendingOperations.length === 0) { + const allPending = await this.getPendingOperations(false); + let numPending = 0; + let numGivingLiveness = 0; + for (const p of allPending.pendingOperations) { + numPending++; + if (p.givesLifeness) { + numGivingLiveness++; + } + } + let timeout; + if ( + allPending.pendingOperations.length === 0 || + allPending.nextRetryDelay.d_ms === Number.MAX_SAFE_INTEGER + ) { + // Wait forever + timeout = new Promise(() => {}); + console.log("waiting forever"); + } else { + console.log("waiting for timeout", pending.nextRetryDelay); + timeout = this.timerGroup.resolveAfter( + allPending.nextRetryDelay.d_ms, + ); + } + this.ws.notify({ + type: NotificationType.WaitingForRetry, + numGivingLiveness, + numPending, + }); + await Promise.race([timeout, this.latch.wait()]); + console.log("timeout done"); + } else { + logger.trace("running pending operations that are due"); + // FIXME: maybe be a bit smarter about executing these + // opeations in parallel? + for (const p of pending.pendingOperations) { + try { + console.log("running", p); + await this.processOnePendingOperation(p); + } catch (e) { + console.error(e); + } + } } } + logger.trace("exiting wallet retry loop"); } /** @@ -429,7 +443,6 @@ export class Wallet { } } - /** * Check if and how an exchange is trusted and/or audited. */ @@ -466,7 +479,7 @@ export class Wallet { * Get detailed balance information, sliced by exchange and by currency. */ async getBalances(): Promise { - return getBalances(this.ws); + return this.ws.memoGetBalance.memo(() => getBalances(this.ws)); } async refresh(oldCoinPub: string, force: boolean = false): Promise { @@ -488,8 +501,12 @@ export class Wallet { return getHistory(this.ws, historyQuery); } - async getPendingOperations(): Promise { - return getPendingOperations(this.ws); + async getPendingOperations( + onlyDue: boolean = false, + ): Promise { + return this.ws.memoGetPending.memo(() => + getPendingOperations(this.ws, onlyDue), + ); } async getDenoms(exchangeUrl: string): Promise { @@ -517,7 +534,6 @@ export class Wallet { async updateCurrency(currencyRecord: CurrencyRecord): Promise { logger.trace("updating currency to", currencyRecord); await oneShotPut(this.db, Stores.currencies, currencyRecord); - this.notifier.notify(); } async getReserves(exchangeBaseUrl: string): Promise { @@ -552,7 +568,7 @@ export class Wallet { stop() { this.stopped = true; this.timerGroup.stopCurrentAndFutureTimers(); - this.cryptoApi.stop(); + this.ws.cryptoApi.stop(); } async getSenderWireInfos(): Promise { @@ -693,17 +709,13 @@ export class Wallet { const totalFees = totalRefundFees; return { contractTerms: purchase.contractTerms, - hasRefund: purchase.timestamp_refund !== undefined, + hasRefund: purchase.lastRefundTimestamp !== undefined, totalRefundAmount: totalRefundAmount, totalRefundAndRefreshFees: totalFees, }; } - clearNotification(): void { - this.badge.clearNotification(); - } - benchmarkCrypto(repetitions: number): Promise { - return this.cryptoApi.benchmark(repetitions); + return this.ws.cryptoApi.benchmark(repetitions); } } -- cgit v1.2.3