From b5ee6b7b4ee506712f51e1b90e9256c4b0c0c603 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 2 Dec 2019 17:35:47 +0100 Subject: pending operations WIP --- src/wallet.ts | 86 +++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 57 insertions(+), 29 deletions(-) (limited to 'src/wallet.ts') diff --git a/src/wallet.ts b/src/wallet.ts index 91f6c0cca..a6eecb8a9 100644 --- a/src/wallet.ts +++ b/src/wallet.ts @@ -46,7 +46,6 @@ import { abortFailedPayment, preparePay, confirmPay, - SpeculativePayData, } from "./wallet-impl/pay"; import { @@ -55,7 +54,6 @@ import { CurrencyRecord, DenominationRecord, ExchangeRecord, - PlanchetRecord, ProposalRecord, PurchaseRecord, ReserveRecord, @@ -107,9 +105,11 @@ import { processWithdrawSession } from "./wallet-impl/withdraw"; import { getHistory } from "./wallet-impl/history"; import { getPendingOperations } from "./wallet-impl/pending"; import { getBalances } from "./wallet-impl/balance"; -import { acceptTip, getTipStatus } from "./wallet-impl/tip"; +import { acceptTip, getTipStatus, processTip } from "./wallet-impl/tip"; import { returnCoins } from "./wallet-impl/return"; import { payback } from "./wallet-impl/payback"; +import { TimerGroup } from "./util/timer"; +import { AsyncCondition } from "./util/promiseUtils"; /** * Wallet protocol version spoken with the exchange @@ -155,6 +155,9 @@ const logger = new Logger("wallet.ts"); */ export class Wallet { private ws: InternalWalletState; + private timerGroup: TimerGroup = new TimerGroup(); + private latch = new AsyncCondition(); + private stopped: boolean = false; get db(): IDBDatabase { return this.ws.db; @@ -188,6 +191,7 @@ export class Wallet { notifier, speculativePayData: undefined, memoProcessReserve: new AsyncOpMemo(), + memoMakePlanchet: new AsyncOpMemo(), }; } @@ -195,7 +199,6 @@ export class Wallet { return getExchangePaytoUri(this.ws, exchangeBaseUrl, supportedTargetTypes); } - getWithdrawDetailsForAmount(baseUrl: any, amount: AmountJson): any { return getWithdrawDetailsForAmount(this.ws, baseUrl, amount); } @@ -210,26 +213,26 @@ export class Wallet { case "bug": return; case "dirty-coin": - await this.refresh(pending.coinPub); + await refresh(this.ws, pending.coinPub); break; case "exchange-update": - await this.updateExchangeFromUrl(pending.exchangeBaseUrl); - break; - case "planchet": - // Nothing to do, since the withdraw session will process the planchet + await updateExchangeFromUrl(this.ws, pending.exchangeBaseUrl); break; case "refresh": - await this.processRefreshSession(pending.refreshSessionId); + await processRefreshSession(this.ws, pending.refreshSessionId); break; case "reserve": - await this.processReserve(pending.reservePub); + await processReserve(this.ws, pending.reservePub); break; case "withdraw": - await this.processWithdrawSession(pending.withdrawSessionId); + await processWithdrawSession(this.ws, pending.withdrawSessionId); break; case "proposal": // Nothing to do, user needs to accept/reject break; + case "tip": + await processTip(this.ws, pending.tipId); + break; default: assertUnreachable(pending); } @@ -253,8 +256,22 @@ export class Wallet { * Process pending operations and wait for scheduled operations in * a loop until the wallet is stopped explicitly. */ - public async runUntilStopped(): Promise { - throw Error("not implemented"); + public async runLoopScheduledRetries(): Promise { + while (!this.stopped) { + console.log("running wallet retry loop iteration"); + let pending = await this.getPendingOperations(); + console.log("waiting for", pending.nextRetryDelay); + const timeout = this.timerGroup.resolveAfter(pending.nextRetryDelay.d_ms); + await Promise.race([timeout, this.latch.wait()]); + pending = await this.getPendingOperations(); + for (const p of pending.pendingOperations) { + try { + this.processOnePendingOperation(p); + } catch (e) { + console.error(e); + } + } + } } /** @@ -267,9 +284,12 @@ export class Wallet { const allPending = r.pendingOperations; const relevantPending = allPending.filter(x => { switch (x.type) { - case "planchet": case "reserve": return x.reservePub === reservePub; + case "withdraw": + return ( + x.source.type === "reserve" && x.source.reservePub === reservePub + ); default: return false; } @@ -347,7 +367,11 @@ export class Wallet { proposalId: string, sessionIdOverride: string | undefined, ): Promise { - return confirmPay(this.ws, proposalId, sessionIdOverride); + try { + return await confirmPay(this.ws, proposalId, sessionIdOverride); + } finally { + this.latch.trigger(); + } } /** @@ -358,7 +382,11 @@ export class Wallet { * state DORMANT. */ async processReserve(reservePub: string): Promise { - return processReserve(this.ws, reservePub); + try { + return await processReserve(this.ws, reservePub); + } finally { + this.latch.trigger(); + } } /** @@ -370,7 +398,11 @@ export class Wallet { async createReserve( req: CreateReserveRequest, ): Promise { - return createReserve(this.ws, req); + try { + return createReserve(this.ws, req); + } finally { + this.latch.trigger(); + } } /** @@ -383,14 +415,13 @@ export class Wallet { * an unconfirmed reserve should be hidden. */ async confirmReserve(req: ConfirmReserveRequest): Promise { - return confirmReserve(this.ws, req); + try { + return confirmReserve(this.ws, req); + } finally { + this.latch.trigger(); + } } - private async processWithdrawSession( - withdrawalSessionId: string, - ): Promise { - return processWithdrawSession(this.ws, withdrawalSessionId); - } /** * Check if and how an exchange is trusted and/or audited. @@ -435,10 +466,6 @@ export class Wallet { return refresh(this.ws, oldCoinPub, force); } - async processRefreshSession(refreshSessionId: string) { - return processRefreshSession(this.ws, refreshSessionId); - } - async findExchange( exchangeBaseUrl: string, ): Promise { @@ -516,7 +543,8 @@ export class Wallet { * Stop ongoing processing. */ stop() { - //this.timerGroup.stopCurrentAndFutureTimers(); + this.stopped = true; + this.timerGroup.stopCurrentAndFutureTimers(); this.cryptoApi.stop(); } -- cgit v1.2.3