diff options
author | Florian Dold <florian.dold@gmail.com> | 2019-12-05 19:38:19 +0100 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2019-12-05 19:38:19 +0100 |
commit | f67d7f54f9d0fed97446898942e3dfee67ee2985 (patch) | |
tree | 2b81738025e8f61250ede10908cbf81071e16975 /src | |
parent | 829acdd3d98f1014747f15ecb619b6fbaa06b640 (diff) |
threads, retries and notifications WIP
Diffstat (limited to 'src')
36 files changed, 1634 insertions, 870 deletions
diff --git a/src/android/index.ts b/src/android/index.ts index 4d0136ecf..dcdb9d756 100644 --- a/src/android/index.ts +++ b/src/android/index.ts @@ -125,6 +125,7 @@ export function installAndroidWalletListener() { return; } const id = msg.id; + console.log(`android listener: got request for ${operation} (${id})`); let result; switch (operation) { case "init": { @@ -137,7 +138,7 @@ export function installAndroidWalletListener() { }; const w = await getDefaultNodeWallet(walletArgs); maybeWallet = w; - w.runLoopScheduledRetries().catch((e) => { + w.runRetryLoop().catch((e) => { console.error("Error during wallet retry loop", e); }); wp.resolve(w); @@ -156,7 +157,11 @@ export function installAndroidWalletListener() { } case "withdrawTestkudos": { const wallet = await wp.promise; - await withdrawTestBalance(wallet); + try { + await withdrawTestBalance(wallet); + } catch (e) { + console.log("error during withdrawTestBalance", e); + } result = {}; break; } @@ -221,7 +226,7 @@ export function installAndroidWalletListener() { maybeWallet = undefined; const w = await getDefaultNodeWallet(walletArgs); maybeWallet = w; - w.runLoopScheduledRetries().catch((e) => { + w.runRetryLoop().catch((e) => { console.error("Error during wallet retry loop", e); }); wp.resolve(w); @@ -233,6 +238,8 @@ export function installAndroidWalletListener() { return; } + console.log(`android listener: sending response for ${operation} (${id})`); + const respMsg = { result, id, operation, type: "response" }; sendMessage(JSON.stringify(respMsg)); }; diff --git a/src/crypto/nodeProcessWorker.ts b/src/crypto/nodeProcessWorker.ts deleted file mode 100644 index 8ff149788..000000000 --- a/src/crypto/nodeProcessWorker.ts +++ /dev/null @@ -1,118 +0,0 @@ -import { CryptoWorkerFactory } from "./cryptoApi"; - -/* - This file is part of TALER - (C) 2016 GNUnet e.V. - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - */ - - -// tslint:disable:no-var-requires - -import { CryptoWorker } from "./cryptoWorker"; - -import path = require("path"); -import child_process = require("child_process"); - -const nodeWorkerEntry = path.join(__dirname, "nodeWorkerEntry.js"); - - -export class NodeCryptoWorkerFactory implements CryptoWorkerFactory { - startWorker(): CryptoWorker { - if (typeof require === "undefined") { - throw Error("cannot make worker, require(...) not defined"); - } - const workerCtor = require("./nodeProcessWorker").Worker; - const workerPath = __dirname + "/cryptoWorker.js"; - return new workerCtor(workerPath); - } - - getConcurrency(): number { - return 4; - } -} - -/** - * Worker implementation that uses node subprocesses. - */ -export class Worker { - private child: any; - - /** - * Function to be called when we receive a message from the worker thread. - */ - onmessage: undefined | ((m: any) => void); - - /** - * Function to be called when we receive an error from the worker thread. - */ - onerror: undefined | ((m: any) => void); - - private dispatchMessage(msg: any) { - if (this.onmessage) { - this.onmessage({ data: msg }); - } else { - console.warn("no handler for worker event 'message' defined") - } - } - - private dispatchError(msg: any) { - if (this.onerror) { - this.onerror({ data: msg }); - } else { - console.warn("no handler for worker event 'error' defined") - } - } - - constructor() { - this.child = child_process.fork(nodeWorkerEntry); - this.onerror = undefined; - this.onmessage = undefined; - - this.child.on("error", (e: any) => { - this.dispatchError(e); - }); - - this.child.on("message", (msg: any) => { - this.dispatchMessage(msg); - }); - } - - /** - * Add an event listener for either an "error" or "message" event. - */ - addEventListener(event: "message" | "error", fn: (x: any) => void): void { - switch (event) { - case "message": - this.onmessage = fn; - break; - case "error": - this.onerror = fn; - break; - } - } - - /** - * Send a message to the worker thread. - */ - postMessage (msg: any) { - this.child.send(JSON.stringify({data: msg})); - } - - /** - * Forcibly terminate the worker thread. - */ - terminate () { - this.child.kill("SIGINT"); - } -} diff --git a/src/crypto/nodeWorkerEntry.ts b/src/crypto/nodeWorkerEntry.ts deleted file mode 100644 index 1e088d987..000000000 --- a/src/crypto/nodeWorkerEntry.ts +++ /dev/null @@ -1,69 +0,0 @@ -/* - This file is part of TALER - (C) 2016 GNUnet e.V. - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - */ - -// tslint:disable:no-var-requires - -import { CryptoImplementation } from "./cryptoImplementation"; - -async function handleRequest(operation: string, id: number, args: string[]) { - - const impl = new CryptoImplementation(); - - if (!(operation in impl)) { - console.error(`crypto operation '${operation}' not found`); - return; - } - - try { - const result = (impl as any)[operation](...args); - if (process.send) { - process.send({ result, id }); - } else { - console.error("process.send not available"); - } - } catch (e) { - console.error("error during operation", e); - return; - } -} - -process.on("message", (msgStr: any) => { - const msg = JSON.parse(msgStr); - - const args = msg.data.args; - if (!Array.isArray(args)) { - console.error("args must be array"); - return; - } - const id = msg.data.id; - if (typeof id !== "number") { - console.error("RPC id must be number"); - return; - } - const operation = msg.data.operation; - if (typeof operation !== "string") { - console.error("RPC operation must be string"); - return; - } - - handleRequest(operation, id, args).catch((e) => { - console.error("error in node worker", e); - }); -}); - -process.on("uncaughtException", (err: any) => { - console.error("uncaught exception in node worker entry", err); -}); diff --git a/src/crypto/browserWorkerEntry.ts b/src/crypto/workers/browserWorkerEntry.ts index 5ac762c13..5ac762c13 100644 --- a/src/crypto/browserWorkerEntry.ts +++ b/src/crypto/workers/browserWorkerEntry.ts diff --git a/src/crypto/cryptoApi.ts b/src/crypto/workers/cryptoApi.ts index 2521b54ea..5537bb39f 100644 --- a/src/crypto/cryptoApi.ts +++ b/src/crypto/workers/cryptoApi.ts @@ -22,7 +22,7 @@ /** * Imports. */ -import { AmountJson } from "../util/amounts"; +import { AmountJson } from "../../util/amounts"; import { CoinRecord, @@ -30,15 +30,21 @@ import { RefreshSessionRecord, TipPlanchet, WireFee, -} from "../dbTypes"; +} from "../../dbTypes"; import { CryptoWorker } from "./cryptoWorker"; -import { ContractTerms, PaybackRequest } from "../talerTypes"; +import { ContractTerms, PaybackRequest } from "../../talerTypes"; -import { BenchmarkResult, CoinWithDenom, PayCoinInfo, PlanchetCreationResult, PlanchetCreationRequest } from "../walletTypes"; +import { + BenchmarkResult, + CoinWithDenom, + PayCoinInfo, + PlanchetCreationResult, + PlanchetCreationRequest, +} from "../../walletTypes"; -import * as timer from "../util/timer"; +import * as timer from "../../util/timer"; /** * State of a crypto worker. @@ -172,7 +178,8 @@ export class CryptoApi { wake(ws: WorkerState, work: WorkItem): void { if (this.stopped) { console.log("cryptoApi is stopped"); - CryptoApi.enableTracing && console.log("not waking, as cryptoApi is stopped"); + CryptoApi.enableTracing && + console.log("not waking, as cryptoApi is stopped"); return; } if (ws.currentWorkItem !== null) { @@ -333,7 +340,7 @@ export class CryptoApi { } createPlanchet( - req: PlanchetCreationRequest + req: PlanchetCreationRequest, ): Promise<PlanchetCreationResult> { return this.doRpc<PlanchetCreationResult>("createPlanchet", 1, req); } @@ -398,6 +405,10 @@ export class CryptoApi { return this.doRpc<string>("rsaUnblind", 4, sig, bk, pk); } + rsaVerify(hm: string, sig: string, pk: string): Promise<boolean> { + return this.doRpc<boolean>("rsaVerify", 4, hm, sig, pk); + } + createPaybackRequest(coin: CoinRecord): Promise<PaybackRequest> { return this.doRpc<PaybackRequest>("createPaybackRequest", 1, coin); } diff --git a/src/crypto/cryptoImplementation.ts b/src/crypto/workers/cryptoImplementation.ts index faebbaa4a..00d81ce27 100644 --- a/src/crypto/cryptoImplementation.ts +++ b/src/crypto/workers/cryptoImplementation.ts @@ -30,12 +30,12 @@ import { DenominationRecord, RefreshPlanchetRecord, RefreshSessionRecord, - ReserveRecord, TipPlanchet, WireFee, -} from "../dbTypes"; + initRetryInfo, +} from "../../dbTypes"; -import { CoinPaySig, ContractTerms, PaybackRequest } from "../talerTypes"; +import { CoinPaySig, ContractTerms, PaybackRequest } from "../../talerTypes"; import { BenchmarkResult, CoinWithDenom, @@ -43,11 +43,12 @@ import { Timestamp, PlanchetCreationResult, PlanchetCreationRequest, -} from "../walletTypes"; -import { canonicalJson, getTalerStampSec } from "../util/helpers"; -import { AmountJson } from "../util/amounts"; -import * as Amounts from "../util/amounts"; -import * as timer from "../util/timer"; + getTimestampNow, +} from "../../walletTypes"; +import { canonicalJson, getTalerStampSec } from "../../util/helpers"; +import { AmountJson } from "../../util/amounts"; +import * as Amounts from "../../util/amounts"; +import * as timer from "../../util/timer"; import { getRandomBytes, encodeCrock, @@ -64,8 +65,9 @@ import { createEcdheKeyPair, keyExchangeEcdheEddsa, setupRefreshPlanchet, -} from "./talerCrypto"; -import { randomBytes } from "./primitives/nacl-fast"; + rsaVerify, +} from "../talerCrypto"; +import { randomBytes } from "../primitives/nacl-fast"; enum SignaturePurpose { RESERVE_WITHDRAW = 1200, @@ -304,9 +306,9 @@ export class CryptoImplementation { /** * Unblind a blindly signed value. */ - rsaUnblind(sig: string, bk: string, pk: string): string { + rsaUnblind(blindedSig: string, bk: string, pk: string): string { const denomSig = rsaUnblind( - decodeCrock(sig), + decodeCrock(blindedSig), decodeCrock(pk), decodeCrock(bk), ); @@ -314,6 +316,13 @@ export class CryptoImplementation { } /** + * Unblind a blindly signed value. + */ + rsaVerify(hm: string, sig: string, pk: string): boolean { + return rsaVerify(hash(decodeCrock(hm)), decodeCrock(sig), decodeCrock(pk)); + } + + /** * Generate updated coins (to store in the database) * and deposit permissions for each given coin. */ @@ -488,7 +497,6 @@ export class CryptoImplementation { refreshSessionId, confirmSig: encodeCrock(confirmSig), exchangeBaseUrl, - finished: false, hash: encodeCrock(sessionHash), meltCoinPub: meltCoin.coinPub, newDenomHashes: newCoinDenoms.map(d => d.denomPubHash), @@ -499,6 +507,10 @@ export class CryptoImplementation { transferPubs, valueOutput, valueWithFee, + created: getTimestampNow(), + retryInfo: initRetryInfo(), + finishedTimestamp: undefined, + lastError: undefined, }; return refreshSession; diff --git a/src/crypto/cryptoWorker.ts b/src/crypto/workers/cryptoWorker.ts index 0ea641dde..d4449f4a2 100644 --- a/src/crypto/cryptoWorker.ts +++ b/src/crypto/workers/cryptoWorker.ts @@ -3,6 +3,6 @@ export interface CryptoWorker { terminate(): void; - onmessage: (m: any) => void; - onerror: (m: any) => void; + onmessage: ((m: any) => void) | undefined; + onerror: ((m: any) => void) | undefined; }
\ No newline at end of file diff --git a/src/crypto/workers/nodeThreadWorker.ts b/src/crypto/workers/nodeThreadWorker.ts new file mode 100644 index 000000000..b42031c40 --- /dev/null +++ b/src/crypto/workers/nodeThreadWorker.ts @@ -0,0 +1,175 @@ +import { CryptoWorkerFactory } from "./cryptoApi"; + +/* + This file is part of TALER + (C) 2016 GNUnet e.V. + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +// tslint:disable:no-var-requires + +import { CryptoWorker } from "./cryptoWorker"; + +import worker_threads = require("worker_threads"); +import os = require("os"); +import { CryptoImplementation } from "./cryptoImplementation"; + +const f = __filename; + +const workerCode = ` + const worker_threads = require('worker_threads'); + const parentPort = worker_threads.parentPort; + let tw; + try { + tw = require("${f}"); + } catch (e) { + console.log("could not load from ${f}"); + } + if (!tw) { + try { + tw = require("taler-wallet-android"); + } catch (e) { + console.log("could not load taler-wallet-android either"); + throw e; + } + } + parentPort.on("message", tw.handleWorkerMessage); + parentPort.on("error", tw.handleWorkerError); +`; + +/** + * This function is executed in the worker thread to handle + * a message. + */ +export function handleWorkerMessage(msg: any) { + const args = msg.args; + if (!Array.isArray(args)) { + console.error("args must be array"); + return; + } + const id = msg.id; + if (typeof id !== "number") { + console.error("RPC id must be number"); + return; + } + const operation = msg.operation; + if (typeof operation !== "string") { + console.error("RPC operation must be string"); + return; + } + + const handleRequest = async () => { + const impl = new CryptoImplementation(); + + if (!(operation in impl)) { + console.error(`crypto operation '${operation}' not found`); + return; + } + + try { + const result = (impl as any)[operation](...args); + const p = worker_threads.parentPort; + worker_threads.parentPort?.postMessage; + if (p) { + p.postMessage({ data: { result, id } }); + } else { + console.error("parent port not available (not running in thread?"); + } + } catch (e) { + console.error("error during operation", e); + return; + } + }; + + handleRequest().catch(e => { + console.error("error in node worker", e); + }); +} + +export function handleWorkerError(e: Error) { + console.log("got error from worker", e); +} + +export class NodeThreadCryptoWorkerFactory implements CryptoWorkerFactory { + startWorker(): CryptoWorker { + if (typeof require === "undefined") { + throw Error("cannot make worker, require(...) not defined"); + } + return new NodeThreadCryptoWorker(); + } + + getConcurrency(): number { + return Math.max(1, os.cpus().length - 1); + } +} + +/** + * Worker implementation that uses node subprocesses. + */ +class NodeThreadCryptoWorker implements CryptoWorker { + /** + * Function to be called when we receive a message from the worker thread. + */ + onmessage: undefined | ((m: any) => void); + + /** + * Function to be called when we receive an error from the worker thread. + */ + onerror: undefined | ((m: any) => void); + + private nodeWorker: worker_threads.Worker; + + constructor() { + this.nodeWorker = new worker_threads.Worker(workerCode, { eval: true }); + this.nodeWorker.on("error", (err: Error) => { + console.error("error in node worker:", err); + if (this.onerror) { + this.onerror(err); + } + }); + this.nodeWorker.on("message", (v: any) => { + if (this.onmessage) { + this.onmessage(v); + } + }); + this.nodeWorker.unref(); + } + + /** + * Add an event listener for either an "error" or "message" event. + */ + addEventListener(event: "message" | "error", fn: (x: any) => void): void { + switch (event) { + case "message": + this.onmessage = fn; + break; + case "error": + this.onerror = fn; + break; + } + } + + /** + * Send a message to the worker thread. + */ + postMessage(msg: any) { + this.nodeWorker.postMessage(msg); + } + + /** + * Forcibly terminate the worker thread. + */ + terminate() { + this.nodeWorker.terminate(); + } +} diff --git a/src/crypto/synchronousWorker.ts b/src/crypto/workers/synchronousWorker.ts index 12eecde9a..12eecde9a 100644 --- a/src/crypto/synchronousWorker.ts +++ b/src/crypto/workers/synchronousWorker.ts diff --git a/src/dbTypes.ts b/src/dbTypes.ts index 66c4fa8b9..16edbf31a 100644 --- a/src/dbTypes.ts +++ b/src/dbTypes.ts @@ -36,7 +36,12 @@ import { } from "./talerTypes"; import { Index, Store } from "./util/query"; -import { Timestamp, OperationError } from "./walletTypes"; +import { + Timestamp, + OperationError, + Duration, + getTimestampNow, +} from "./walletTypes"; /** * Current database version, should be incremented @@ -83,6 +88,55 @@ export enum ReserveRecordStatus { DORMANT = "dormant", } +export interface RetryInfo { + firstTry: Timestamp; + nextRetry: Timestamp; + retryCounter: number; + active: boolean; +} + +export interface RetryPolicy { + readonly backoffDelta: Duration; + readonly backoffBase: number; +} + +const defaultRetryPolicy: RetryPolicy = { + backoffBase: 1.5, + backoffDelta: { d_ms: 200 }, +}; + +export function updateRetryInfoTimeout( + r: RetryInfo, + p: RetryPolicy = defaultRetryPolicy, +): void { + const now = getTimestampNow(); + const t = + now.t_ms + p.backoffDelta.d_ms * Math.pow(p.backoffBase, r.retryCounter); + r.nextRetry = { t_ms: t }; +} + +export function initRetryInfo( + active: boolean = true, + p: RetryPolicy = defaultRetryPolicy, +): RetryInfo { + if (!active) { + return { + active: false, + firstTry: { t_ms: Number.MAX_SAFE_INTEGER }, + nextRetry: { t_ms: Number.MAX_SAFE_INTEGER }, + retryCounter: 0, + }; + } + const info = { + firstTry: getTimestampNow(), + active: true, + nextRetry: { t_ms: 0 }, + retryCounter: 0, + }; + updateRetryInfoTimeout(info, p); + return info; +} + /** * A reserve record as stored in the wallet's database. */ @@ -176,9 +230,20 @@ export interface ReserveRecord { /** * Time of the last successful status query. */ - lastStatusQuery: Timestamp | undefined; + lastSuccessfulStatusQuery: Timestamp | undefined; - lastError?: OperationError; + /** + * Retry info. This field is present even if no retry is scheduled, + * because we need it to be present for the index on the object store + * to work. + */ + retryInfo: RetryInfo; + + /** + * Last error that happened in a reserve operation + * (either talking to the bank or the exchange). + */ + lastError: OperationError | undefined; } /** @@ -683,16 +748,25 @@ export class ProposalRecord { downloadSessionId?: string; /** + * Retry info, even present when the operation isn't active to allow indexing + * on the next retry timestamp. + */ + retryInfo: RetryInfo; + + /** * Verify that a value matches the schema of this class and convert it into a * member. */ static checked: (obj: any) => ProposalRecord; + + lastError: OperationError | undefined; } /** * Status of a tip we got from a merchant. */ export interface TipRecord { + lastError: OperationError | undefined; /** * Has the user accepted the tip? Only after the tip has been accepted coins * withdrawn from the tip may be used. @@ -753,13 +827,21 @@ export interface TipRecord { */ nextUrl?: string; - timestamp: Timestamp; + createdTimestamp: Timestamp; + + /** + * Retry info, even present when the operation isn't active to allow indexing + * on the next retry timestamp. + */ + retryInfo: RetryInfo; } /** * Ongoing refresh */ export interface RefreshSessionRecord { + lastError: OperationError | undefined; + /** * Public key that's being melted in this session. */ @@ -823,14 +905,25 @@ export interface RefreshSessionRecord { exchangeBaseUrl: string; /** - * Is this session finished? + * Timestamp when the refresh session finished. */ - finished: boolean; + finishedTimestamp: Timestamp | undefined; /** * A 32-byte base32-crockford encoded random identifier. */ refreshSessionId: string; + + /** + * When has this refresh session been created? + */ + created: Timestamp; + + /** + * Retry info, even present when the operation isn't active to allow indexing + * on the next retry timestamp. + */ + retryInfo: RetryInfo; } /** @@ -877,12 +970,36 @@ export interface WireFee { sig: string; } +export enum PurchaseStatus { + /** + * We're currently paying, either for the first + * time or as a re-play potentially with a different + * session ID. + */ + SubmitPay = "submit-pay", + QueryRefund = "query-refund", + ProcessRefund = "process-refund", + Abort = "abort", + Done = "done", +} + /** * Record that stores status information about one purchase, starting from when * the customer accepts a proposal. Includes refund status if applicable. */ export interface PurchaseRecord { /** + * Proposal ID for this purchase. Uniquely identifies the + * purchase and the proposal. + */ + proposalId: string; + + /** + * Status of this purchase. + */ + status: PurchaseStatus; + + /** * Hash of the contract terms. */ contractTermsHash: string; @@ -923,13 +1040,13 @@ export interface PurchaseRecord { * When was the purchase made? * Refers to the time that the user accepted. */ - timestamp: Timestamp; + acceptTimestamp: Timestamp; /** * When was the last refund made? * Set to 0 if no refund was made on the purchase. */ - timestamp_refund: Timestamp | undefined; + lastRefundTimestamp: Timestamp | undefined; /** * Last session signature that we submitted to /pay (if any). @@ -946,11 +1063,9 @@ export interface PurchaseRecord { */ abortDone: boolean; - /** - * Proposal ID for this purchase. Uniquely identifies the - * purchase and the proposal. - */ - proposalId: string; + retryInfo: RetryInfo; + + lastError: OperationError | undefined; } /** @@ -1025,7 +1140,7 @@ export interface WithdrawalSourceReserve { reservePub: string; } -export type WithdrawalSource = WithdrawalSourceTip | WithdrawalSourceReserve +export type WithdrawalSource = WithdrawalSourceTip | WithdrawalSourceReserve; export interface WithdrawalSessionRecord { withdrawSessionId: string; @@ -1048,7 +1163,8 @@ export interface WithdrawalSessionRecord { totalCoinValue: AmountJson; /** - * Amount including fees. + * Amount including fees (i.e. the amount subtracted from the + * reserve to withdraw all coins in this withdrawal session). */ rawWithdrawalAmount: AmountJson; @@ -1060,6 +1176,19 @@ export interface WithdrawalSessionRecord { * Coins in this session that are withdrawn are set to true. */ withdrawn: boolean[]; + + /** + * Retry info, always present even on completed operations so that indexing works. + */ + retryInfo: RetryInfo; + + /** + * Last error per coin/planchet, or undefined if no error occured for + * the coin/planchet. + */ + lastCoinErrors: (OperationError | undefined)[]; + + lastError: OperationError | undefined; } export interface BankWithdrawUriRecord { @@ -1125,11 +1254,10 @@ export namespace Stores { "fulfillmentUrlIndex", "contractTerms.fulfillment_url", ); - orderIdIndex = new Index<string, PurchaseRecord>( - this, - "orderIdIndex", + orderIdIndex = new Index<string, PurchaseRecord>(this, "orderIdIndex", [ + "contractTerms.merchant_base_url", "contractTerms.order_id", - ); + ]); } class DenominationsStore extends Store<DenominationRecord> { diff --git a/src/headless/helpers.ts b/src/headless/helpers.ts index e5338369e..cfc7e3695 100644 --- a/src/headless/helpers.ts +++ b/src/headless/helpers.ts @@ -21,35 +21,22 @@ /** * Imports. */ -import { Wallet, OperationFailedAndReportedError } from "../wallet"; -import { Notifier, Badge } from "../walletTypes"; +import { Wallet } from "../wallet"; import { MemoryBackend, BridgeIDBFactory, shimIndexedDB } from "idb-bridge"; -import { SynchronousCryptoWorkerFactory } from "../crypto/synchronousWorker"; import { openTalerDb } from "../db"; import Axios from "axios"; -import querystring = require("querystring"); import { HttpRequestLibrary } from "../util/http"; import * as amounts from "../util/amounts"; import { Bank } from "./bank"; import fs = require("fs"); -import { NodeCryptoWorkerFactory } from "../crypto/nodeProcessWorker"; import { Logger } from "../util/logging"; +import { NodeThreadCryptoWorkerFactory } from "../crypto/workers/nodeThreadWorker"; +import { NotificationType } from "../walletTypes"; const logger = new Logger("helpers.ts"); -class ConsoleBadge implements Badge { - startBusy(): void { - } - stopBusy(): void { - } - showNotification(): void { - } - clearNotification(): void { - } -} - export class NodeHttpLib implements HttpRequestLibrary { async get(url: string): Promise<import("../util/http").HttpResponse> { try { @@ -97,7 +84,6 @@ export interface DefaultNodeWalletArgs { */ persistentStoragePath?: string; - /** * Handler for asynchronous notifications from the wallet. */ @@ -116,15 +102,7 @@ export interface DefaultNodeWalletArgs { export async function getDefaultNodeWallet( args: DefaultNodeWalletArgs = {}, ): Promise<Wallet> { - const myNotifier: Notifier = { - notify() { - if (args.notifyHandler) { - args.notifyHandler(""); - } - } - } - const myBadge = new ConsoleBadge(); BridgeIDBFactory.enableTracing = false; const myBackend = new MemoryBackend(); @@ -180,14 +158,14 @@ export async function getDefaultNodeWallet( myUnsupportedUpgrade, ); - const worker = new SynchronousCryptoWorkerFactory(); + //const worker = new SynchronousCryptoWorkerFactory(); //const worker = new NodeCryptoWorkerFactory(); + const worker = new NodeThreadCryptoWorkerFactory(); + return new Wallet( myDb, myHttpLib, - myBadge, - myNotifier, worker, ); } @@ -217,6 +195,14 @@ export async function withdrawTestBalance( ["x-taler-bank"], ); + const donePromise = new Promise((resolve, reject) => { + myWallet.addNotificationListener((n) => { + if (n.type === NotificationType.ReserveDepleted && n.reservePub === reservePub ) { + resolve(); + } + }); + }); + await bank.createReserve( bankUser, amount, @@ -225,5 +211,5 @@ export async function withdrawTestBalance( ); await myWallet.confirmReserve({ reservePub: reserveResponse.reservePub }); - await myWallet.runUntilReserveDepleted(reservePub); + await donePromise; } diff --git a/src/headless/integrationtest.ts b/src/headless/integrationtest.ts index 91adfaa6d..632ce8f60 100644 --- a/src/headless/integrationtest.ts +++ b/src/headless/integrationtest.ts @@ -82,9 +82,5 @@ export async function runIntegrationTest(args: { throw Error("payment did not succeed"); } - await myWallet.runPending(); - //const refreshRes = await myWallet.refreshDirtyCoins(); - //console.log(`waited to refresh ${refreshRes.numRefreshed} coins`); - - myWallet.stop(); + await myWallet.runUntilDone(); } diff --git a/src/headless/taler-wallet-cli.ts b/src/headless/taler-wallet-cli.ts index 9598b9d98..931cac087 100644 --- a/src/headless/taler-wallet-cli.ts +++ b/src/headless/taler-wallet-cli.ts @@ -19,14 +19,14 @@ import fs = require("fs"); import { getDefaultNodeWallet, withdrawTestBalance } from "./helpers"; import { MerchantBackendConnection } from "./merchant"; import { runIntegrationTest } from "./integrationtest"; -import { Wallet, OperationFailedAndReportedError } from "../wallet"; +import { Wallet } from "../wallet"; import qrcodeGenerator = require("qrcode-generator"); import * as clk from "./clk"; import { BridgeIDBFactory, MemoryBackend } from "idb-bridge"; import { Logger } from "../util/logging"; import * as Amounts from "../util/amounts"; import { decodeCrock } from "../crypto/talerCrypto"; -import { Bank } from "./bank"; +import { OperationFailedAndReportedError } from "../wallet-impl/errors"; const logger = new Logger("taler-wallet-cli.ts"); diff --git a/src/util/asyncMemo.ts b/src/util/asyncMemo.ts index 34868ab4f..193ce6df6 100644 --- a/src/util/asyncMemo.ts +++ b/src/util/asyncMemo.ts @@ -14,39 +14,76 @@ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -export interface MemoEntry<T> { +interface MemoEntry<T> { p: Promise<T>; t: number; n: number; } -export class AsyncOpMemo<T> { +export class AsyncOpMemoMap<T> { private n = 0; - private memo: { [k: string]: MemoEntry<T> } = {}; - put(key: string, p: Promise<T>): Promise<T> { + private memoMap: { [k: string]: MemoEntry<T> } = {}; + + private cleanUp(key: string, n: number) { + const r = this.memoMap[key]; + if (r && r.n === n) { + delete this.memoMap[key]; + } + } + + memo(key: string, pg: () => Promise<T>): Promise<T> { + const res = this.memoMap[key]; + if (res) { + return res.p; + } const n = this.n++; - this.memo[key] = { + // Wrap the operation in case it immediately throws + const p = Promise.resolve().then(() => pg()); + p.finally(() => { + this.cleanUp(key, n); + }); + this.memoMap[key] = { p, n, t: new Date().getTime(), }; - p.finally(() => { - const r = this.memo[key]; - if (r && r.n === n) { - delete this.memo[key]; - } - }); return p; } - find(key: string): Promise<T> | undefined { - const res = this.memo[key]; - const tNow = new Date().getTime(); - if (res && res.t < tNow - 10 * 1000) { - delete this.memo[key]; - return; - } else if (res) { + clear() { + this.memoMap = {}; + } +} + + +export class AsyncOpMemoSingle<T> { + private n = 0; + private memoEntry: MemoEntry<T> | undefined; + + private cleanUp(n: number) { + if (this.memoEntry && this.memoEntry.n === n) { + this.memoEntry = undefined; + } + } + + memo(pg: () => Promise<T>): Promise<T> { + const res = this.memoEntry; + if (res) { return res.p; } - return; + const n = this.n++; + // Wrap the operation in case it immediately throws + const p = Promise.resolve().then(() => pg()); + p.finally(() => { + this.cleanUp(n); + }); + this.memoEntry = { + p, + n, + t: new Date().getTime(), + }; + return p; + } + clear() { + this.memoEntry = undefined; } -}
\ No newline at end of file +} diff --git a/src/util/query.ts b/src/util/query.ts index b1b19665b..e05656bb7 100644 --- a/src/util/query.ts +++ b/src/util/query.ts @@ -316,7 +316,7 @@ export function oneShotIterIndex<S extends IDBValidKey, T>( return new ResultStream<T>(req); } -class TransactionHandle { +export class TransactionHandle { constructor(private tx: IDBTransaction) {} put<T>(store: Store<T>, value: T, key?: any): Promise<any> { @@ -406,6 +406,7 @@ function runWithTransaction<T>( }; tx.onerror = () => { console.error("error in transaction"); + console.error(stack); }; tx.onabort = () => { if (tx.error) { diff --git a/src/util/taleruri-test.ts b/src/util/taleruri-test.ts index 02eecf209..c687a6717 100644 --- a/src/util/taleruri-test.ts +++ b/src/util/taleruri-test.ts @@ -169,10 +169,8 @@ test("taler refund uri parsing", t => { t.fail(); return; } - t.is( - r1.refundUrl, - "https://merchant.example.com/public/refund?order_id=1234", - ); + t.is(r1.merchantBaseUrl, "https://merchant.example.com/public/"); + t.is(r1.orderId, "1234"); }); test("taler refund uri parsing with instance", t => { @@ -182,10 +180,8 @@ test("taler refund uri parsing with instance", t => { t.fail(); return; } - t.is( - r1.refundUrl, - "https://merchant.example.com/public/instances/myinst/refund?order_id=1234", - ); + t.is(r1.orderId, "1234"); + t.is(r1.merchantBaseUrl, "https://merchant.example.com/public/instances/myinst/"); }); test("taler tip pickup uri", t => { @@ -197,7 +193,7 @@ test("taler tip pickup uri", t => { } t.is( r1.merchantBaseUrl, - "https://merchant.example.com/public/tip-pickup?tip_id=tipid", + "https://merchant.example.com/public/", ); }); diff --git a/src/util/taleruri.ts b/src/util/taleruri.ts index aa6705c07..50886a916 100644 --- a/src/util/taleruri.ts +++ b/src/util/taleruri.ts @@ -24,7 +24,8 @@ export interface WithdrawUriResult { } export interface RefundUriResult { - refundUrl: string; + merchantBaseUrl: string; + orderId: string; } export interface TipUriResult { @@ -184,17 +185,13 @@ export function parseRefundUri(s: string): RefundUriResult | undefined { maybeInstancePath = `instances/${maybeInstance}/`; } - const refundUrl = - "https://" + - host + - "/" + - maybePath + - maybeInstancePath + - "refund" + - "?order_id=" + - orderId; + const merchantBaseUrl = "https://" + host + + "/" + + maybePath + + maybeInstancePath return { - refundUrl, + merchantBaseUrl, + orderId, }; } diff --git a/src/wallet-impl/balance.ts b/src/wallet-impl/balance.ts index 94d65fa96..a1351014c 100644 --- a/src/wallet-impl/balance.ts +++ b/src/wallet-impl/balance.ts @@ -33,6 +33,7 @@ const logger = new Logger("withdraw.ts"); export async function getBalances( ws: InternalWalletState, ): Promise<WalletBalance> { + logger.trace("starting to compute balance"); /** * Add amount to a balance field, both for * the slicing by exchange and currency. @@ -101,7 +102,7 @@ export async function getBalances( await tx.iter(Stores.refresh).forEach(r => { // Don't count finished refreshes, since the refresh already resulted // in coins being added to the wallet. - if (r.finished) { + if (r.finishedTimestamp) { return; } addTo( diff --git a/src/wallet-impl/errors.ts b/src/wallet-impl/errors.ts new file mode 100644 index 000000000..5df99b7d3 --- /dev/null +++ b/src/wallet-impl/errors.ts @@ -0,0 +1,81 @@ +import { OperationError } from "../walletTypes"; + +/* + This file is part of GNU Taler + (C) 2019 GNUnet e.V. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +/** + * This exception is there to let the caller know that an error happened, + * but the error has already been reported by writing it to the database. + */ +export class OperationFailedAndReportedError extends Error { + constructor(message: string) { + super(message); + + // Set the prototype explicitly. + Object.setPrototypeOf(this, OperationFailedAndReportedError.prototype); + } +} + +/** + * This exception is thrown when an error occured and the caller is + * responsible for recording the failure in the database. + */ +export class OperationFailedError extends Error { + constructor(message: string, public err: OperationError) { + super(message); + + // Set the prototype explicitly. + Object.setPrototypeOf(this, OperationFailedError.prototype); + } +} + +/** + * Run an operation and call the onOpError callback + * when there was an exception or operation error that must be reported. + * The cause will be re-thrown to the caller. + */ +export async function guardOperationException<T>( + op: () => Promise<T>, + onOpError: (e: OperationError) => Promise<void>, +): Promise<T> { + try { + return op(); + } catch (e) { + if (e instanceof OperationFailedAndReportedError) { + throw e; + } + if (e instanceof OperationFailedError) { + await onOpError(e.err); + throw new OperationFailedAndReportedError(e.message); + } + if (e instanceof Error) { + await onOpError({ + type: "exception", + message: e.message, + details: {}, + }); + throw new OperationFailedAndReportedError(e.message); + } + await onOpError({ + type: "exception", + message: "non-error exception thrown", + details: { + value: e.toString(), + }, + }); + throw new OperationFailedAndReportedError(e.message); + } +}
\ No newline at end of file diff --git a/src/wallet-impl/exchanges.ts b/src/wallet-impl/exchanges.ts index b3677c6c6..b89f3f84e 100644 --- a/src/wallet-impl/exchanges.ts +++ b/src/wallet-impl/exchanges.ts @@ -17,7 +17,6 @@ import { InternalWalletState } from "./state"; import { WALLET_CACHE_BREAKER_CLIENT_VERSION, - OperationFailedAndReportedError, } from "../wallet"; import { KeysJson, Denomination, ExchangeWireJson } from "../talerTypes"; import { getTimestampNow, OperationError } from "../walletTypes"; @@ -42,6 +41,7 @@ import { } from "../util/query"; import * as Amounts from "../util/amounts"; import { parsePaytoUri } from "../util/payto"; +import { OperationFailedAndReportedError } from "./errors"; async function denominationRecordFromKeys( ws: InternalWalletState, diff --git a/src/wallet-impl/history.ts b/src/wallet-impl/history.ts index dfc683e6d..5e93ab878 100644 --- a/src/wallet-impl/history.ts +++ b/src/wallet-impl/history.ts @@ -78,11 +78,11 @@ export async function getHistory( fulfillmentUrl: p.contractTerms.fulfillment_url, merchantName: p.contractTerms.merchant.name, }, - timestamp: p.timestamp, + timestamp: p.acceptTimestamp, type: "pay", explicit: false, }); - if (p.timestamp_refund) { + if (p.lastRefundTimestamp) { const contractAmount = Amounts.parseOrThrow(p.contractTerms.amount); const amountsPending = Object.keys(p.refundsPending).map(x => Amounts.parseOrThrow(p.refundsPending[x].refund_amount), @@ -103,7 +103,7 @@ export async function getHistory( merchantName: p.contractTerms.merchant.name, refundAmount: amount, }, - timestamp: p.timestamp_refund, + timestamp: p.lastRefundTimestamp, type: "refund", explicit: false, }); @@ -151,7 +151,7 @@ export async function getHistory( merchantBaseUrl: tip.merchantBaseUrl, tipId: tip.merchantTipId, }, - timestamp: tip.timestamp, + timestamp: tip.createdTimestamp, explicit: false, type: "tip", }); diff --git a/src/wallet-impl/pay.ts b/src/wallet-impl/pay.ts index 9942139a6..9b2da9c7d 100644 --- a/src/wallet-impl/pay.ts +++ b/src/wallet-impl/pay.ts @@ -33,6 +33,8 @@ import { getTimestampNow, PreparePayResult, ConfirmPayResult, + OperationError, + NotificationType, } from "../walletTypes"; import { oneShotIter, @@ -51,12 +53,14 @@ import { PurchaseRecord, CoinRecord, ProposalStatus, + initRetryInfo, + updateRetryInfoTimeout, + PurchaseStatus, } from "../dbTypes"; import * as Amounts from "../util/amounts"; import { amountToPretty, strcmp, - extractTalerStamp, canonicalJson, extractTalerStampOrThrow, } from "../util/helpers"; @@ -65,6 +69,8 @@ import { InternalWalletState } from "./state"; import { parsePayUri, parseRefundUri } from "../util/taleruri"; import { getTotalRefreshCost, refresh } from "./refresh"; import { encodeCrock, getRandomBytes } from "../crypto/talerCrypto"; +import { guardOperationException } from "./errors"; +import { assertUnreachable } from "../util/assertUnreachable"; export interface SpeculativePayData { payCoinInfo: PayCoinInfo; @@ -344,9 +350,12 @@ async function recordConfirmPay( payReq, refundsDone: {}, refundsPending: {}, - timestamp: getTimestampNow(), - timestamp_refund: undefined, + acceptTimestamp: getTimestampNow(), + lastRefundTimestamp: undefined, proposalId: proposal.proposalId, + retryInfo: initRetryInfo(), + lastError: undefined, + status: PurchaseStatus.SubmitPay, }; await runWithWriteTransaction( @@ -365,8 +374,10 @@ async function recordConfirmPay( }, ); - ws.badge.showNotification(); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.ProposalAccepted, + proposalId: proposal.proposalId, + }); return t; } @@ -419,7 +430,7 @@ export async function abortFailedPayment( } const refundResponse = MerchantRefundResponse.checked(resp.responseJson); - await acceptRefundResponse(ws, refundResponse); + await acceptRefundResponse(ws, purchase.proposalId, refundResponse); await runWithWriteTransaction(ws.db, [Stores.purchases], async tx => { const p = await tx.get(Stores.purchases, proposalId); @@ -431,10 +442,62 @@ export async function abortFailedPayment( }); } +async function incrementProposalRetry( + ws: InternalWalletState, + proposalId: string, + err: OperationError | undefined, +): Promise<void> { + await runWithWriteTransaction(ws.db, [Stores.proposals], async tx => { + const pr = await tx.get(Stores.proposals, proposalId); + if (!pr) { + return; + } + if (!pr.retryInfo) { + return; + } + pr.retryInfo.retryCounter++; + updateRetryInfoTimeout(pr.retryInfo); + pr.lastError = err; + await tx.put(Stores.proposals, pr); + }); +} + +async function incrementPurchaseRetry( + ws: InternalWalletState, + proposalId: string, + err: OperationError | undefined, +): Promise<void> { + await runWithWriteTransaction(ws.db, [Stores.purchases], async tx => { + const pr = await tx.get(Stores.purchases, proposalId); + if (!pr) { + return; + } + if (!pr.retryInfo) { + return; + } + pr.retryInfo.retryCounter++; + updateRetryInfoTimeout(pr.retryInfo); + pr.lastError = err; + await tx.put(Stores.purchases, pr); + }); +} + export async function processDownloadProposal( ws: InternalWalletState, proposalId: string, ): Promise<void> { + const onOpErr = (err: OperationError) => + incrementProposalRetry(ws, proposalId, err); + await guardOperationException( + () => processDownloadProposalImpl(ws, proposalId), + onOpErr, + ); +} + +async function processDownloadProposalImpl( + ws: InternalWalletState, + proposalId: string, +): Promise<void> { const proposal = await oneShotGet(ws.db, Stores.proposals, proposalId); if (!proposal) { return; @@ -498,7 +561,10 @@ export async function processDownloadProposal( }, ); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.ProposalDownloaded, + proposalId: proposal.proposalId, + }); } /** @@ -536,6 +602,8 @@ async function startDownloadProposal( proposalId: proposalId, proposalStatus: ProposalStatus.DOWNLOADING, repurchaseProposalId: undefined, + retryInfo: initRetryInfo(), + lastError: undefined, }; await oneShotPut(ws.db, Stores.proposals, proposalRecord); @@ -582,6 +650,7 @@ export async function submitPay( throw Error("merchant payment signature invalid"); } purchase.finished = true; + purchase.retryInfo = initRetryInfo(false); const modifiedCoins: CoinRecord[] = []; for (const pc of purchase.payReq.coins) { const c = await oneShotGet(ws.db, Stores.coins, pc.coin_pub); @@ -859,8 +928,6 @@ export async function confirmPay( return submitPay(ws, proposalId, sessionId); } - - export async function getFullRefundFees( ws: InternalWalletState, refundPermissions: MerchantRefundPermission[], @@ -914,15 +981,13 @@ export async function getFullRefundFees( return feeAcc; } -async function submitRefunds( +async function submitRefundsToExchange( ws: InternalWalletState, proposalId: string, ): Promise<void> { const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId); if (!purchase) { - console.error( - "not submitting refunds, payment not found:", - ); + console.error("not submitting refunds, payment not found:"); return; } const pendingKeys = Object.keys(purchase.refundsPending); @@ -991,14 +1056,18 @@ async function submitRefunds( refresh(ws, perm.coin_pub); } - ws.badge.showNotification(); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.RefundsSubmitted, + proposalId, + }); } -export async function acceptRefundResponse( + +async function acceptRefundResponse( ws: InternalWalletState, + proposalId: string, refundResponse: MerchantRefundResponse, -): Promise<string> { +): Promise<void> { const refundPermissions = refundResponse.refund_permissions; if (!refundPermissions.length) { @@ -1015,7 +1084,8 @@ export async function acceptRefundResponse( return; } - t.timestamp_refund = getTimestampNow(); + t.lastRefundTimestamp = getTimestampNow(); + t.status = PurchaseStatus.ProcessRefund; for (const perm of refundPermissions) { if ( @@ -1027,18 +1097,48 @@ export async function acceptRefundResponse( } return t; } + // Add the refund permissions to the purchase within a DB transaction + await oneShotMutate(ws.db, Stores.purchases, proposalId, f); + await submitRefundsToExchange(ws, proposalId); +} - const hc = refundResponse.h_contract_terms; - // Add the refund permissions to the purchase within a DB transaction - await oneShotMutate(ws.db, Stores.purchases, hc, f); - ws.notifier.notify(); +async function queryRefund(ws: InternalWalletState, proposalId: string): Promise<void> { + const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId); + if (purchase?.status !== PurchaseStatus.QueryRefund) { + return; + } - await submitRefunds(ws, hc); + const refundUrl = new URL("refund", purchase.contractTerms.merchant_base_url).href + let resp; + try { + resp = await ws.http.get(refundUrl); + } catch (e) { + console.error("error downloading refund permission", e); + throw e; + } - return hc; + const refundResponse = MerchantRefundResponse.checked(resp.responseJson); + await acceptRefundResponse(ws, proposalId, refundResponse); } +async function startRefundQuery(ws: InternalWalletState, proposalId: string): Promise<void> { + const success = await runWithWriteTransaction(ws.db, [Stores.purchases], async (tx) => { + const p = await tx.get(Stores.purchases, proposalId); + if (p?.status !== PurchaseStatus.Done) { + return false; + } + p.status = PurchaseStatus.QueryRefund; + return true; + }); + + if (!success) { + return; + } + await queryRefund(ws, proposalId); +} + + /** * Accept a refund, return the contract hash for the contract * that was involved in the refund. @@ -1053,17 +1153,56 @@ export async function applyRefund( throw Error("invalid refund URI"); } - const refundUrl = parseResult.refundUrl; + const purchase = await oneShotGetIndexed( + ws.db, + Stores.purchases.orderIdIndex, + [parseResult.merchantBaseUrl, parseResult.orderId], + ); - logger.trace("processing refund"); - let resp; - try { - resp = await ws.http.get(refundUrl); - } catch (e) { - console.error("error downloading refund permission", e); - throw e; + if (!purchase) { + throw Error("no purchase for the taler://refund/ URI was found"); } - const refundResponse = MerchantRefundResponse.checked(resp.responseJson); - return acceptRefundResponse(ws, refundResponse); + await startRefundQuery(ws, purchase.proposalId); + + return purchase.contractTermsHash; +} + +export async function processPurchase( + ws: InternalWalletState, + proposalId: string, +): Promise<void> { + const onOpErr = (e: OperationError) => + incrementPurchaseRetry(ws, proposalId, e); + await guardOperationException( + () => processPurchaseImpl(ws, proposalId), + onOpErr, + ); +} + +export async function processPurchaseImpl( + ws: InternalWalletState, + proposalId: string, +): Promise<void> { + const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId); + if (!purchase) { + return; + } + switch (purchase.status) { + case PurchaseStatus.Done: + return; + case PurchaseStatus.Abort: + // FIXME + break; + case PurchaseStatus.SubmitPay: + break; + case PurchaseStatus.QueryRefund: + await queryRefund(ws, proposalId); + break; + case PurchaseStatus.ProcessRefund: + await submitRefundsToExchange(ws, proposalId); + break; + default: + throw assertUnreachable(purchase.status); + } } diff --git a/src/wallet-impl/payback.ts b/src/wallet-impl/payback.ts index 5bf5ff06e..56696d771 100644 --- a/src/wallet-impl/payback.ts +++ b/src/wallet-impl/payback.ts @@ -29,6 +29,7 @@ import { Stores, TipRecord, CoinStatus } from "../dbTypes"; import { Logger } from "../util/logging"; import { PaybackConfirmation } from "../talerTypes"; import { updateExchangeFromUrl } from "./exchanges"; +import { NotificationType } from "../walletTypes"; const logger = new Logger("payback.ts"); @@ -65,7 +66,9 @@ export async function payback( await tx.put(Stores.reserves, reserve); }, ); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.PaybackStarted, + }); const paybackRequest = await ws.cryptoApi.createPaybackRequest(coin); const reqUrl = new URL("payback", coin.exchangeBaseUrl); @@ -83,6 +86,8 @@ export async function payback( } coin.status = CoinStatus.Dormant; await oneShotPut(ws.db, Stores.coins, coin); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.PaybackFinished, + }); await updateExchangeFromUrl(ws, coin.exchangeBaseUrl, true); } diff --git a/src/wallet-impl/pending.ts b/src/wallet-impl/pending.ts index 72102e3a1..bd10538af 100644 --- a/src/wallet-impl/pending.ts +++ b/src/wallet-impl/pending.ts @@ -21,8 +21,10 @@ import { PendingOperationInfo, PendingOperationsResponse, getTimestampNow, + Timestamp, + Duration, } from "../walletTypes"; -import { runWithReadTransaction } from "../util/query"; +import { runWithReadTransaction, TransactionHandle } from "../util/query"; import { InternalWalletState } from "./state"; import { Stores, @@ -32,11 +34,355 @@ import { ProposalStatus, } from "../dbTypes"; +function updateRetryDelay( + oldDelay: Duration, + now: Timestamp, + retryTimestamp: Timestamp, +): Duration { + if (retryTimestamp.t_ms <= now.t_ms) { + return { d_ms: 0 }; + } + return { d_ms: Math.min(oldDelay.d_ms, retryTimestamp.t_ms - now.t_ms) }; +} + +async function gatherExchangePending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + if (onlyDue) { + // FIXME: exchanges should also be updated regularly + return; + } + await tx.iter(Stores.exchanges).forEach(e => { + switch (e.updateStatus) { + case ExchangeUpdateStatus.FINISHED: + if (e.lastError) { + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: + "Exchange record is in FINISHED state but has lastError set", + details: { + exchangeBaseUrl: e.baseUrl, + }, + }); + } + if (!e.details) { + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: + "Exchange record does not have details, but no update in progress.", + details: { + exchangeBaseUrl: e.baseUrl, + }, + }); + } + if (!e.wireInfo) { + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: + "Exchange record does not have wire info, but no update in progress.", + details: { + exchangeBaseUrl: e.baseUrl, + }, + }); + } + break; + case ExchangeUpdateStatus.FETCH_KEYS: + resp.pendingOperations.push({ + type: "exchange-update", + givesLifeness: false, + stage: "fetch-keys", + exchangeBaseUrl: e.baseUrl, + lastError: e.lastError, + reason: e.updateReason || "unknown", + }); + break; + case ExchangeUpdateStatus.FETCH_WIRE: + resp.pendingOperations.push({ + type: "exchange-update", + givesLifeness: false, + stage: "fetch-wire", + exchangeBaseUrl: e.baseUrl, + lastError: e.lastError, + reason: e.updateReason || "unknown", + }); + break; + default: + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: "Unknown exchangeUpdateStatus", + details: { + exchangeBaseUrl: e.baseUrl, + exchangeUpdateStatus: e.updateStatus, + }, + }); + break; + } + }); +} + +async function gatherReservePending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + // FIXME: this should be optimized by using an index for "onlyDue==true". + await tx.iter(Stores.reserves).forEach(reserve => { + const reserveType = reserve.bankWithdrawStatusUrl ? "taler-bank" : "manual"; + if (!reserve.retryInfo.active) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + reserve.retryInfo.nextRetry, + ); + if (onlyDue && reserve.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + switch (reserve.reserveStatus) { + case ReserveRecordStatus.DORMANT: + // nothing to report as pending + break; + case ReserveRecordStatus.WITHDRAWING: + case ReserveRecordStatus.UNCONFIRMED: + case ReserveRecordStatus.QUERYING_STATUS: + case ReserveRecordStatus.REGISTERING_BANK: + resp.pendingOperations.push({ + type: "reserve", + givesLifeness: true, + stage: reserve.reserveStatus, + timestampCreated: reserve.created, + reserveType, + reservePub: reserve.reservePub, + retryInfo: reserve.retryInfo, + }); + break; + case ReserveRecordStatus.WAIT_CONFIRM_BANK: + resp.pendingOperations.push({ + type: "reserve", + givesLifeness: true, + stage: reserve.reserveStatus, + timestampCreated: reserve.created, + reserveType, + reservePub: reserve.reservePub, + bankWithdrawConfirmUrl: reserve.bankWithdrawConfirmUrl, + retryInfo: reserve.retryInfo, + }); + break; + default: + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: "Unknown reserve record status", + details: { + reservePub: reserve.reservePub, + reserveStatus: reserve.reserveStatus, + }, + }); + break; + } + }); +} + +async function gatherRefreshPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + await tx.iter(Stores.refresh).forEach(r => { + if (r.finishedTimestamp) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + r.retryInfo.nextRetry, + ); + if (onlyDue && r.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + let refreshStatus: string; + if (r.norevealIndex === undefined) { + refreshStatus = "melt"; + } else { + refreshStatus = "reveal"; + } + + resp.pendingOperations.push({ + type: "refresh", + givesLifeness: true, + oldCoinPub: r.meltCoinPub, + refreshStatus, + refreshOutputSize: r.newDenoms.length, + refreshSessionId: r.refreshSessionId, + }); + }); +} + +async function gatherCoinsPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + // Refreshing dirty coins is always due. + await tx.iter(Stores.coins).forEach(coin => { + if (coin.status == CoinStatus.Dirty) { + resp.nextRetryDelay.d_ms = 0; + resp.pendingOperations.push({ + givesLifeness: true, + type: "dirty-coin", + coinPub: coin.coinPub, + }); + } + }); +} + +async function gatherWithdrawalPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + await tx.iter(Stores.withdrawalSession).forEach(wsr => { + if (wsr.finishTimestamp) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + wsr.retryInfo.nextRetry, + ); + if (onlyDue && wsr.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + const numCoinsWithdrawn = wsr.withdrawn.reduce((a, x) => a + (x ? 1 : 0), 0); + const numCoinsTotal = wsr.withdrawn.length; + resp.pendingOperations.push({ + type: "withdraw", + givesLifeness: true, + numCoinsTotal, + numCoinsWithdrawn, + source: wsr.source, + withdrawSessionId: wsr.withdrawSessionId, + }); + }); +} + +async function gatherProposalPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + await tx.iter(Stores.proposals).forEach(proposal => { + if (proposal.proposalStatus == ProposalStatus.PROPOSED) { + if (onlyDue) { + return; + } + resp.pendingOperations.push({ + type: "proposal-choice", + givesLifeness: false, + merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, + proposalId: proposal.proposalId, + proposalTimestamp: proposal.timestamp, + }); + } else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) { + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + proposal.retryInfo.nextRetry, + ); + if (onlyDue && proposal.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + resp.pendingOperations.push({ + type: "proposal-download", + givesLifeness: true, + merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, + proposalId: proposal.proposalId, + proposalTimestamp: proposal.timestamp, + }); + } + }); +} + +async function gatherTipPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + await tx.iter(Stores.tips).forEach(tip => { + if (tip.pickedUp) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + tip.retryInfo.nextRetry, + ); + if (onlyDue && tip.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + if (tip.accepted) { + resp.pendingOperations.push({ + type: "tip", + givesLifeness: true, + merchantBaseUrl: tip.merchantBaseUrl, + tipId: tip.tipId, + merchantTipId: tip.merchantTipId, + }); + } + }); +} + +async function gatherPurchasePending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise<void> { + await tx.iter(Stores.purchases).forEach((pr) => { + if (pr.finished) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + pr.retryInfo.nextRetry, + ); + if (onlyDue && pr.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + resp.pendingOperations.push({ + type: "pay", + givesLifeness: true, + isReplay: false, + proposalId: pr.proposalId, + }); + }); + +} + export async function getPendingOperations( ws: InternalWalletState, + onlyDue: boolean = false, ): Promise<PendingOperationsResponse> { - const pendingOperations: PendingOperationInfo[] = []; - let minRetryDurationMs = 5000; + const resp: PendingOperationsResponse = { + nextRetryDelay: { d_ms: Number.MAX_SAFE_INTEGER }, + pendingOperations: [], + }; + const now = getTimestampNow(); await runWithReadTransaction( ws.db, [ @@ -47,207 +393,18 @@ export async function getPendingOperations( Stores.withdrawalSession, Stores.proposals, Stores.tips, + Stores.purchases, ], async tx => { - await tx.iter(Stores.exchanges).forEach(e => { - switch (e.updateStatus) { - case ExchangeUpdateStatus.FINISHED: - if (e.lastError) { - pendingOperations.push({ - type: "bug", - message: - "Exchange record is in FINISHED state but has lastError set", - details: { - exchangeBaseUrl: e.baseUrl, - }, - }); - } - if (!e.details) { - pendingOperations.push({ - type: "bug", - message: - "Exchange record does not have details, but no update in progress.", - details: { - exchangeBaseUrl: e.baseUrl, - }, - }); - } - if (!e.wireInfo) { - pendingOperations.push({ - type: "bug", - message: - "Exchange record does not have wire info, but no update in progress.", - details: { - exchangeBaseUrl: e.baseUrl, - }, - }); - } - break; - case ExchangeUpdateStatus.FETCH_KEYS: - pendingOperations.push({ - type: "exchange-update", - stage: "fetch-keys", - exchangeBaseUrl: e.baseUrl, - lastError: e.lastError, - reason: e.updateReason || "unknown", - }); - break; - case ExchangeUpdateStatus.FETCH_WIRE: - pendingOperations.push({ - type: "exchange-update", - stage: "fetch-wire", - exchangeBaseUrl: e.baseUrl, - lastError: e.lastError, - reason: e.updateReason || "unknown", - }); - break; - default: - pendingOperations.push({ - type: "bug", - message: "Unknown exchangeUpdateStatus", - details: { - exchangeBaseUrl: e.baseUrl, - exchangeUpdateStatus: e.updateStatus, - }, - }); - break; - } - }); - await tx.iter(Stores.reserves).forEach(reserve => { - const reserveType = reserve.bankWithdrawStatusUrl - ? "taler-bank" - : "manual"; - const now = getTimestampNow(); - switch (reserve.reserveStatus) { - case ReserveRecordStatus.DORMANT: - // nothing to report as pending - break; - case ReserveRecordStatus.WITHDRAWING: - case ReserveRecordStatus.UNCONFIRMED: - case ReserveRecordStatus.QUERYING_STATUS: - case ReserveRecordStatus.REGISTERING_BANK: - pendingOperations.push({ - type: "reserve", - stage: reserve.reserveStatus, - timestampCreated: reserve.created, - reserveType, - reservePub: reserve.reservePub, - }); - if (reserve.created.t_ms < now.t_ms - 5000) { - minRetryDurationMs = 500; - } else if (reserve.created.t_ms < now.t_ms - 30000) { - minRetryDurationMs = 2000; - } - break; - case ReserveRecordStatus.WAIT_CONFIRM_BANK: - pendingOperations.push({ - type: "reserve", - stage: reserve.reserveStatus, - timestampCreated: reserve.created, - reserveType, - reservePub: reserve.reservePub, - bankWithdrawConfirmUrl: reserve.bankWithdrawConfirmUrl, - }); - if (reserve.created.t_ms < now.t_ms - 5000) { - minRetryDurationMs = 500; - } else if (reserve.created.t_ms < now.t_ms - 30000) { - minRetryDurationMs = 2000; - } - break; - default: - pendingOperations.push({ - type: "bug", - message: "Unknown reserve record status", - details: { - reservePub: reserve.reservePub, - reserveStatus: reserve.reserveStatus, - }, - }); - break; - } - }); - - await tx.iter(Stores.refresh).forEach(r => { - if (r.finished) { - return; - } - let refreshStatus: string; - if (r.norevealIndex === undefined) { - refreshStatus = "melt"; - } else { - refreshStatus = "reveal"; - } - - pendingOperations.push({ - type: "refresh", - oldCoinPub: r.meltCoinPub, - refreshStatus, - refreshOutputSize: r.newDenoms.length, - refreshSessionId: r.refreshSessionId, - }); - }); - - await tx.iter(Stores.coins).forEach(coin => { - if (coin.status == CoinStatus.Dirty) { - pendingOperations.push({ - type: "dirty-coin", - coinPub: coin.coinPub, - }); - } - }); - - await tx.iter(Stores.withdrawalSession).forEach(ws => { - const numCoinsWithdrawn = ws.withdrawn.reduce( - (a, x) => a + (x ? 1 : 0), - 0, - ); - const numCoinsTotal = ws.withdrawn.length; - if (numCoinsWithdrawn < numCoinsTotal) { - pendingOperations.push({ - type: "withdraw", - numCoinsTotal, - numCoinsWithdrawn, - source: ws.source, - withdrawSessionId: ws.withdrawSessionId, - }); - } - }); - - await tx.iter(Stores.proposals).forEach((proposal) => { - if (proposal.proposalStatus == ProposalStatus.PROPOSED) { - pendingOperations.push({ - type: "proposal-choice", - merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, - proposalId: proposal.proposalId, - proposalTimestamp: proposal.timestamp, - }); - } else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) { - pendingOperations.push({ - type: "proposal-download", - merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, - proposalId: proposal.proposalId, - proposalTimestamp: proposal.timestamp, - }); - } - }); - - await tx.iter(Stores.tips).forEach((tip) => { - if (tip.accepted && !tip.pickedUp) { - pendingOperations.push({ - type: "tip", - merchantBaseUrl: tip.merchantBaseUrl, - tipId: tip.tipId, - merchantTipId: tip.merchantTipId, - }); - } - }); + await gatherExchangePending(tx, now, resp, onlyDue); + await gatherReservePending(tx, now, resp, onlyDue); + await gatherRefreshPending(tx, now, resp, onlyDue); + await gatherCoinsPending(tx, now, resp, onlyDue); + await gatherWithdrawalPending(tx, now, resp, onlyDue); + await gatherProposalPending(tx, now, resp, onlyDue); + await gatherTipPending(tx, now, resp, onlyDue); + await gatherPurchasePending(tx, now, resp, onlyDue); }, ); - - return { - pendingOperations, - nextRetryDelay: { - d_ms: minRetryDurationMs, - }, - }; + return resp; } diff --git a/src/wallet-impl/refresh.ts b/src/wallet-impl/refresh.ts index 7e7270ed3..a3b48919d 100644 --- a/src/wallet-impl/refresh.ts +++ b/src/wallet-impl/refresh.ts @@ -23,6 +23,8 @@ import { RefreshPlanchetRecord, CoinRecord, RefreshSessionRecord, + initRetryInfo, + updateRetryInfoTimeout, } from "../dbTypes"; import { amountToPretty } from "../util/helpers"; import { @@ -36,6 +38,8 @@ import { InternalWalletState } from "./state"; import { Logger } from "../util/logging"; import { getWithdrawDenomList } from "./withdraw"; import { updateExchangeFromUrl } from "./exchanges"; +import { getTimestampNow, OperationError, NotificationType } from "../walletTypes"; +import { guardOperationException } from "./errors"; const logger = new Logger("refresh.ts"); @@ -132,14 +136,16 @@ async function refreshMelt( if (rs.norevealIndex !== undefined) { return; } - if (rs.finished) { + if (rs.finishedTimestamp) { return; } rs.norevealIndex = norevealIndex; return rs; }); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.RefreshMelted, + }); } async function refreshReveal( @@ -225,16 +231,6 @@ async function refreshReveal( return; } - const exchange = oneShotGet( - ws.db, - Stores.exchanges, - refreshSession.exchangeBaseUrl, - ); - if (!exchange) { - console.error(`exchange ${refreshSession.exchangeBaseUrl} not found`); - return; - } - const coins: CoinRecord[] = []; for (let i = 0; i < respJson.ev_sigs.length; i++) { @@ -271,32 +267,72 @@ async function refreshReveal( coins.push(coin); } - refreshSession.finished = true; - await runWithWriteTransaction( ws.db, [Stores.coins, Stores.refresh], async tx => { const rs = await tx.get(Stores.refresh, refreshSessionId); if (!rs) { + console.log("no refresh session found"); return; } - if (rs.finished) { + if (rs.finishedTimestamp) { + console.log("refresh session already finished"); return; } + rs.finishedTimestamp = getTimestampNow(); + rs.retryInfo = initRetryInfo(false); for (let coin of coins) { await tx.put(Stores.coins, coin); } - await tx.put(Stores.refresh, refreshSession); + await tx.put(Stores.refresh, rs); }, ); - ws.notifier.notify(); + console.log("refresh finished (end of reveal)"); + ws.notify({ + type: NotificationType.RefreshRevealed, + }); } +async function incrementRefreshRetry( + ws: InternalWalletState, + refreshSessionId: string, + err: OperationError | undefined, +): Promise<void> { + await runWithWriteTransaction(ws.db, [Stores.refresh], async tx => { + const r = await tx.get(Stores.refresh, refreshSessionId); + if (!r) { + return; + } + if (!r.retryInfo) { + return; + } + r.retryInfo.retryCounter++; + updateRetryInfoTimeout(r.retryInfo); + r.lastError = err; + await tx.put(Stores.refresh, r); + }); +} + + export async function processRefreshSession( ws: InternalWalletState, refreshSessionId: string, ) { + return ws.memoProcessRefresh.memo(refreshSessionId, async () => { + const onOpErr = (e: OperationError) => + incrementRefreshRetry(ws, refreshSessionId, e); + return guardOperationException( + () => processRefreshSessionImpl(ws, refreshSessionId), + onOpErr, + ); + }); +} + +async function processRefreshSessionImpl( + ws: InternalWalletState, + refreshSessionId: string, +) { const refreshSession = await oneShotGet( ws.db, Stores.refresh, @@ -305,7 +341,7 @@ export async function processRefreshSession( if (!refreshSession) { return; } - if (refreshSession.finished) { + if (refreshSession.finishedTimestamp) { return; } if (typeof refreshSession.norevealIndex !== "number") { @@ -376,7 +412,7 @@ export async function refresh( x.status = CoinStatus.Dormant; return x; }); - ws.notifier.notify(); + ws.notify( { type: NotificationType.RefreshRefused }); return; } @@ -388,29 +424,32 @@ export async function refresh( oldDenom.feeRefresh, ); - function mutateCoin(c: CoinRecord): CoinRecord { - const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee); - if (r.saturated) { - // Something else must have written the coin value - throw TransactionAbort; - } - c.currentAmount = r.amount; - c.status = CoinStatus.Dormant; - return c; - } - // Store refresh session and subtract refreshed amount from // coin in the same transaction. await runWithWriteTransaction( ws.db, [Stores.refresh, Stores.coins], async tx => { + const c = await tx.get(Stores.coins, coin.coinPub); + if (!c) { + return; + } + if (c.status !== CoinStatus.Dirty) { + return; + } + const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee); + if (r.saturated) { + console.log("can't refresh coin, no amount left"); + return; + } + c.currentAmount = r.amount; + c.status = CoinStatus.Dormant; await tx.put(Stores.refresh, refreshSession); - await tx.mutate(Stores.coins, coin.coinPub, mutateCoin); + await tx.put(Stores.coins, c); }, ); logger.info(`created refresh session ${refreshSession.refreshSessionId}`); - ws.notifier.notify(); + ws.notify( { type: NotificationType.RefreshStarted }); await processRefreshSession(ws, refreshSession.refreshSessionId); } diff --git a/src/wallet-impl/reserves.ts b/src/wallet-impl/reserves.ts index d70f02576..f00956b46 100644 --- a/src/wallet-impl/reserves.ts +++ b/src/wallet-impl/reserves.ts @@ -20,6 +20,7 @@ import { getTimestampNow, ConfirmReserveRequest, OperationError, + NotificationType, } from "../walletTypes"; import { canonicalizeBaseUrl } from "../util/helpers"; import { InternalWalletState } from "./state"; @@ -29,6 +30,8 @@ import { CurrencyRecord, Stores, WithdrawalSessionRecord, + initRetryInfo, + updateRetryInfoTimeout, } from "../dbTypes"; import { oneShotMutate, @@ -42,13 +45,13 @@ import * as Amounts from "../util/amounts"; import { updateExchangeFromUrl, getExchangeTrust } from "./exchanges"; import { WithdrawOperationStatusResponse, ReserveStatus } from "../talerTypes"; import { assertUnreachable } from "../util/assertUnreachable"; -import { OperationFailedAndReportedError } from "../wallet"; import { encodeCrock } from "../crypto/talerCrypto"; import { randomBytes } from "../crypto/primitives/nacl-fast"; import { getVerifiedWithdrawDenomList, processWithdrawSession, } from "./withdraw"; +import { guardOperationException, OperationFailedAndReportedError } from "./errors"; const logger = new Logger("reserves.ts"); @@ -91,7 +94,9 @@ export async function createReserve( bankWithdrawStatusUrl: req.bankWithdrawStatusUrl, exchangeWire: req.exchangeWire, reserveStatus, - lastStatusQuery: undefined, + lastSuccessfulStatusQuery: undefined, + retryInfo: initRetryInfo(), + lastError: undefined, }; const senderWire = req.senderWire; @@ -171,7 +176,7 @@ export async function createReserve( // Asynchronously process the reserve, but return // to the caller already. - processReserve(ws, resp.reservePub).catch(e => { + processReserve(ws, resp.reservePub, true).catch(e => { console.error("Processing reserve failed:", e); }); @@ -188,18 +193,19 @@ export async function createReserve( export async function processReserve( ws: InternalWalletState, reservePub: string, + forceNow: boolean = false, ): Promise<void> { - const p = ws.memoProcessReserve.find(reservePub); - if (p) { - return p; - } else { - return ws.memoProcessReserve.put( - reservePub, - processReserveImpl(ws, reservePub), + return ws.memoProcessReserve.memo(reservePub, async () => { + const onOpError = (err: OperationError) => + incrementReserveRetry(ws, reservePub, err); + await guardOperationException( + () => processReserveImpl(ws, reservePub, forceNow), + onOpError, ); - } + }); } + async function registerReserveWithBank( ws: InternalWalletState, reservePub: string, @@ -235,6 +241,7 @@ async function registerReserveWithBank( } r.timestampReserveInfoPosted = getTimestampNow(); r.reserveStatus = ReserveRecordStatus.WAIT_CONFIRM_BANK; + r.retryInfo = initRetryInfo(); return r; }); return processReserveBankStatus(ws, reservePub); @@ -244,6 +251,18 @@ export async function processReserveBankStatus( ws: InternalWalletState, reservePub: string, ): Promise<void> { + const onOpError = (err: OperationError) => + incrementReserveRetry(ws, reservePub, err); + await guardOperationException( + () => processReserveBankStatusImpl(ws, reservePub), + onOpError, + ); +} + +async function processReserveBankStatusImpl( + ws: InternalWalletState, + reservePub: string, +): Promise<void> { let reserve = await oneShotGet(ws.db, Stores.reserves, reservePub); switch (reserve?.reserveStatus) { case ReserveRecordStatus.WAIT_CONFIRM_BANK: @@ -287,9 +306,10 @@ export async function processReserveBankStatus( const now = getTimestampNow(); r.timestampConfirmed = now; r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; + r.retryInfo = initRetryInfo(); return r; }); - await processReserveImpl(ws, reservePub); + await processReserveImpl(ws, reservePub, true); } else { await oneShotMutate(ws.db, Stores.reserves, reservePub, r => { switch (r.reserveStatus) { @@ -304,16 +324,24 @@ export async function processReserveBankStatus( } } -async function setReserveError( +async function incrementReserveRetry( ws: InternalWalletState, reservePub: string, - err: OperationError, + err: OperationError | undefined, ): Promise<void> { - const mut = (reserve: ReserveRecord) => { - reserve.lastError = err; - return reserve; - }; - await oneShotMutate(ws.db, Stores.reserves, reservePub, mut); + await runWithWriteTransaction(ws.db, [Stores.reserves], async tx => { + const r = await tx.get(Stores.reserves, reservePub); + if (!r) { + return; + } + if (!r.retryInfo) { + return; + } + r.retryInfo.retryCounter++; + updateRetryInfoTimeout(r.retryInfo); + r.lastError = err; + await tx.put(Stores.reserves, r); + }); } /** @@ -345,15 +373,11 @@ async function updateReserve( } catch (e) { if (e.response?.status === 404) { const m = "The exchange does not know about this reserve (yet)."; - await setReserveError(ws, reservePub, { - type: "waiting", - details: {}, - message: "The exchange does not know about this reserve (yet).", - }); - throw new OperationFailedAndReportedError(m); + await incrementReserveRetry(ws, reservePub, undefined); + return; } else { const m = e.message; - await setReserveError(ws, reservePub, { + await incrementReserveRetry(ws, reservePub, { type: "network", details: {}, message: m, @@ -369,7 +393,7 @@ async function updateReserve( } // FIXME: check / compare history! - if (!r.lastStatusQuery) { + if (!r.lastSuccessfulStatusQuery) { // FIXME: check if this matches initial expectations r.withdrawRemainingAmount = balance; } else { @@ -392,22 +416,31 @@ async function updateReserve( // We're missing some money. } } - r.lastStatusQuery = getTimestampNow(); + r.lastSuccessfulStatusQuery = getTimestampNow(); r.reserveStatus = ReserveRecordStatus.WITHDRAWING; + r.retryInfo = initRetryInfo(); return r; }); - ws.notifier.notify(); + ws.notify( { type: NotificationType.ReserveUpdated }); } async function processReserveImpl( ws: InternalWalletState, reservePub: string, + forceNow: boolean = false, ): Promise<void> { const reserve = await oneShotGet(ws.db, Stores.reserves, reservePub); if (!reserve) { console.log("not processing reserve: reserve does not exist"); return; } + if (!forceNow) { + const now = getTimestampNow(); + if (reserve.retryInfo.nextRetry.t_ms > now.t_ms) { + logger.trace("processReserve retry not due yet"); + return; + } + } logger.trace( `Processing reserve ${reservePub} with status ${reserve.reserveStatus}`, ); @@ -417,10 +450,10 @@ async function processReserveImpl( break; case ReserveRecordStatus.REGISTERING_BANK: await processReserveBankStatus(ws, reservePub); - return processReserveImpl(ws, reservePub); + return processReserveImpl(ws, reservePub, true); case ReserveRecordStatus.QUERYING_STATUS: await updateReserve(ws, reservePub); - return processReserveImpl(ws, reservePub); + return processReserveImpl(ws, reservePub, true); case ReserveRecordStatus.WITHDRAWING: await depleteReserve(ws, reservePub); break; @@ -448,12 +481,13 @@ export async function confirmReserve( } reserve.timestampConfirmed = now; reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; + reserve.retryInfo = initRetryInfo(); return reserve; }); - ws.notifier.notify(); + ws.notify({ type: NotificationType.ReserveUpdated }); - processReserve(ws, req.reservePub).catch(e => { + processReserve(ws, req.reservePub, true).catch(e => { console.log("processing reserve failed:", e); }); } @@ -489,7 +523,7 @@ async function depleteReserve( logger.trace(`got denom list`); if (denomsForWithdraw.length === 0) { const m = `Unable to withdraw from reserve, no denominations are available to withdraw.`; - await setReserveError(ws, reserve.reservePub, { + await incrementReserveRetry(ws, reserve.reservePub, { type: "internal", message: m, details: {}, @@ -502,7 +536,8 @@ async function depleteReserve( const withdrawalSessionId = encodeCrock(randomBytes(32)); - const totalCoinValue = Amounts.sum(denomsForWithdraw.map(x => x.value)).amount; + const totalCoinValue = Amounts.sum(denomsForWithdraw.map(x => x.value)) + .amount; const withdrawalRecord: WithdrawalSessionRecord = { withdrawSessionId: withdrawalSessionId, @@ -517,6 +552,9 @@ async function depleteReserve( withdrawn: denomsForWithdraw.map(x => false), planchets: denomsForWithdraw.map(x => undefined), totalCoinValue, + retryInfo: initRetryInfo(), + lastCoinErrors: denomsForWithdraw.map(x => undefined), + lastError: undefined, }; const totalCoinWithdrawFee = Amounts.sum( @@ -545,7 +583,7 @@ async function depleteReserve( r.withdrawRemainingAmount = remaining.amount; r.withdrawAllocatedAmount = allocated.amount; r.reserveStatus = ReserveRecordStatus.DORMANT; - + r.retryInfo = initRetryInfo(false); return r; } diff --git a/src/wallet-impl/return.ts b/src/wallet-impl/return.ts index 9cf12052d..ec19c00ae 100644 --- a/src/wallet-impl/return.ts +++ b/src/wallet-impl/return.ts @@ -204,8 +204,6 @@ export async function returnCoins( } }, ); - ws.badge.showNotification(); - ws.notifier.notify(); depositReturnedCoins(ws, coinsReturnRecord); } @@ -269,6 +267,5 @@ async function depositReturnedCoins( } } await oneShotPut(ws.db, Stores.coinsReturns, currentCrr); - ws.notifier.notify(); } } diff --git a/src/wallet-impl/state.ts b/src/wallet-impl/state.ts index a04a7dd1c..18df861f1 100644 --- a/src/wallet-impl/state.ts +++ b/src/wallet-impl/state.ts @@ -15,19 +15,54 @@ */ import { HttpRequestLibrary } from "../util/http"; -import { Badge, Notifier, NextUrlResult } from "../walletTypes"; +import { + NextUrlResult, + WalletBalance, + PendingOperationsResponse, + WalletNotification, +} from "../walletTypes"; import { SpeculativePayData } from "./pay"; -import { CryptoApi } from "../crypto/cryptoApi"; -import { AsyncOpMemo } from "../util/asyncMemo"; - -export interface InternalWalletState { - db: IDBDatabase; - http: HttpRequestLibrary; - badge: Badge; - notifier: Notifier; +import { CryptoApi, CryptoWorkerFactory } from "../crypto/workers/cryptoApi"; +import { AsyncOpMemoMap, AsyncOpMemoSingle } from "../util/asyncMemo"; +import { Logger } from "../util/logging"; + +type NotificationListener = (n: WalletNotification) => void; + +const logger = new Logger("state.ts"); + +export class InternalWalletState { + speculativePayData: SpeculativePayData | undefined = undefined; + cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {}; + memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); + memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); + memoGetPending: AsyncOpMemoSingle< + PendingOperationsResponse + > = new AsyncOpMemoSingle(); + memoGetBalance: AsyncOpMemoSingle<WalletBalance> = new AsyncOpMemoSingle(); + memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); cryptoApi: CryptoApi; - speculativePayData: SpeculativePayData | undefined; - cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult }; - memoProcessReserve: AsyncOpMemo<void>; - memoMakePlanchet: AsyncOpMemo<void>; -}
\ No newline at end of file + + listeners: NotificationListener[] = []; + + constructor( + public db: IDBDatabase, + public http: HttpRequestLibrary, + cryptoWorkerFactory: CryptoWorkerFactory, + ) { + this.cryptoApi = new CryptoApi(cryptoWorkerFactory); + } + + public notify(n: WalletNotification) { + logger.trace("Notification", n); + for (const l of this.listeners) { + const nc = JSON.parse(JSON.stringify(n)); + setImmediate(() => { + l(nc); + }); + } + } + + addNotificationListener(f: (n: WalletNotification) => void): void { + this.listeners.push(f); + } +} diff --git a/src/wallet-impl/tip.ts b/src/wallet-impl/tip.ts index 593f0d612..3ae931d45 100644 --- a/src/wallet-impl/tip.ts +++ b/src/wallet-impl/tip.ts @@ -18,14 +18,15 @@ import { oneShotGet, oneShotPut, oneShotMutate, runWithWriteTransaction } from "../util/query"; import { InternalWalletState } from "./state"; import { parseTipUri } from "../util/taleruri"; -import { TipStatus, getTimestampNow } from "../walletTypes"; +import { TipStatus, getTimestampNow, OperationError } from "../walletTypes"; import { TipPickupGetResponse, TipPlanchetDetail, TipResponse } from "../talerTypes"; import * as Amounts from "../util/amounts"; -import { Stores, PlanchetRecord, WithdrawalSessionRecord } from "../dbTypes"; +import { Stores, PlanchetRecord, WithdrawalSessionRecord, initRetryInfo, updateRetryInfoTimeout } from "../dbTypes"; import { getWithdrawDetailsForAmount, getVerifiedWithdrawDenomList, processWithdrawSession } from "./withdraw"; import { getTalerStampSec } from "../util/helpers"; import { updateExchangeFromUrl } from "./exchanges"; import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto"; +import { guardOperationException } from "./errors"; export async function getTipStatus( @@ -74,12 +75,14 @@ export async function getTipStatus( pickedUp: false, planchets: undefined, response: undefined, - timestamp: getTimestampNow(), + createdTimestamp: getTimestampNow(), merchantTipId: res.merchantTipId, totalFees: Amounts.add( withdrawDetails.overhead, withdrawDetails.withdrawFee, ).amount, + retryInfo: initRetryInfo(), + lastError: undefined, }; await oneShotPut(ws.db, Stores.tips, tipRecord); } @@ -101,9 +104,37 @@ export async function getTipStatus( return tipStatus; } +async function incrementTipRetry( + ws: InternalWalletState, + refreshSessionId: string, + err: OperationError | undefined, +): Promise<void> { + await runWithWriteTransaction(ws.db, [Stores.tips], async tx => { + const t = await tx.get(Stores.tips, refreshSessionId); + if (!t) { + return; + } + if (!t.retryInfo) { + return; + } + t.retryInfo.retryCounter++; + updateRetryInfoTimeout(t.retryInfo); + t.lastError = err; + await tx.put(Stores.tips, t); + }); +} + export async function processTip( ws: InternalWalletState, tipId: string, +): Promise<void> { + const onOpErr = (e: OperationError) => incrementTipRetry(ws, tipId, e); + await guardOperationException(() => processTipImpl(ws, tipId), onOpErr); +} + +async function processTipImpl( + ws: InternalWalletState, + tipId: string, ) { let tipRecord = await oneShotGet(ws.db, Stores.tips, tipId); if (!tipRecord) { @@ -205,6 +236,10 @@ export async function processTip( rawWithdrawalAmount: tipRecord.amount, withdrawn: planchets.map((x) => false), totalCoinValue: Amounts.sum(planchets.map((p) => p.coinValue)).amount, + lastCoinErrors: planchets.map((x) => undefined), + retryInfo: initRetryInfo(), + finishTimestamp: undefined, + lastError: undefined, }; @@ -217,6 +252,7 @@ export async function processTip( return; } tr.pickedUp = true; + tr.retryInfo = initRetryInfo(false); await tx.put(Stores.tips, tr); await tx.put(Stores.withdrawalSession, withdrawalSession); @@ -224,8 +260,6 @@ export async function processTip( await processWithdrawSession(ws, withdrawalSessionId); - ws.notifier.notify(); - ws.badge.showNotification(); return; } diff --git a/src/wallet-impl/withdraw.ts b/src/wallet-impl/withdraw.ts index d02ae14aa..7b7d0f640 100644 --- a/src/wallet-impl/withdraw.ts +++ b/src/wallet-impl/withdraw.ts @@ -22,6 +22,8 @@ import { CoinStatus, CoinRecord, PlanchetRecord, + initRetryInfo, + updateRetryInfoTimeout, } from "../dbTypes"; import * as Amounts from "../util/amounts"; import { @@ -30,6 +32,8 @@ import { DownloadedWithdrawInfo, ReserveCreationInfo, WithdrawDetails, + OperationError, + NotificationType, } from "../walletTypes"; import { WithdrawOperationStatusResponse } from "../talerTypes"; import { InternalWalletState } from "./state"; @@ -51,6 +55,7 @@ import { createReserve, processReserveBankStatus } from "./reserves"; import { WALLET_PROTOCOL_VERSION } from "../wallet"; import * as LibtoolVersion from "../util/libtoolVersion"; +import { guardOperationException } from "./errors"; const logger = new Logger("withdraw.ts"); @@ -143,12 +148,9 @@ export async function acceptWithdrawal( senderWire: withdrawInfo.senderWire, exchangeWire: exchangeWire, }); - ws.badge.showNotification(); - ws.notifier.notify(); // We do this here, as the reserve should be registered before we return, // so that we can redirect the user to the bank's status page. await processReserveBankStatus(ws, reserve.reservePub); - ws.notifier.notify(); console.log("acceptWithdrawal: returning"); return { reservePub: reserve.reservePub, @@ -234,6 +236,12 @@ async function processPlanchet( planchet.denomPub, ); + + const isValid = await ws.cryptoApi.rsaVerify(planchet.coinPub, denomSig, planchet.denomPub); + if (!isValid) { + throw Error("invalid RSA signature by the exchange"); + } + const coin: CoinRecord = { blindingKey: planchet.blindingKey, coinPriv: planchet.coinPriv, @@ -249,6 +257,9 @@ async function processPlanchet( withdrawSessionId: withdrawalSessionId, }; + let withdrawSessionFinished = false; + let reserveDepleted = false; + await runWithWriteTransaction( ws.db, [Stores.coins, Stores.withdrawalSession, Stores.reserves], @@ -262,6 +273,18 @@ async function processPlanchet( return; } ws.withdrawn[coinIdx] = true; + ws.lastCoinErrors[coinIdx] = undefined; + let numDone = 0; + for (let i = 0; i < ws.withdrawn.length; i++) { + if (ws.withdrawn[i]) { + numDone++; + } + } + if (numDone === ws.denoms.length) { + ws.finishTimestamp = getTimestampNow(); + ws.retryInfo = initRetryInfo(false); + withdrawSessionFinished = true; + } await tx.put(Stores.withdrawalSession, ws); if (!planchet.isFromTip) { const r = await tx.get(Stores.reserves, planchet.reservePub); @@ -270,14 +293,29 @@ async function processPlanchet( r.withdrawCompletedAmount, Amounts.add(denom.value, denom.feeWithdraw).amount, ).amount; + if (Amounts.cmp(r.withdrawCompletedAmount, r.withdrawAllocatedAmount) == 0) { + reserveDepleted = true; + } await tx.put(Stores.reserves, r); } } await tx.add(Stores.coins, coin); }, ); - ws.notifier.notify(); - logger.trace(`withdraw of one coin ${coin.coinPub} finished`); + + if (withdrawSessionFinished) { + ws.notify({ + type: NotificationType.WithdrawSessionFinished, + withdrawSessionId: withdrawalSessionId, + }); + } + + if (reserveDepleted && withdrawalSession.source.type === "reserve") { + ws.notify({ + type: NotificationType.ReserveDepleted, + reservePub: withdrawalSession.source.reservePub, + }); + } } /** @@ -437,28 +475,51 @@ async function processWithdrawCoin( } if (!withdrawalSession.planchets[coinIndex]) { - logger.trace("creating planchet for coin", coinIndex); const key = `${withdrawalSessionId}-${coinIndex}`; - const p = ws.memoMakePlanchet.find(key); - if (p) { - await p; - } else { - ws.memoMakePlanchet.put( - key, - makePlanchet(ws, withdrawalSessionId, coinIndex), - ); - } - await makePlanchet(ws, withdrawalSessionId, coinIndex); - logger.trace("done creating planchet for coin", coinIndex); + await ws.memoMakePlanchet.memo(key, async () => { + logger.trace("creating planchet for coin", coinIndex); + return makePlanchet(ws, withdrawalSessionId, coinIndex); + }); } await processPlanchet(ws, withdrawalSessionId, coinIndex); - logger.trace("starting withdraw for coin", coinIndex); +} + +async function incrementWithdrawalRetry( + ws: InternalWalletState, + withdrawalSessionId: string, + err: OperationError | undefined, +): Promise<void> { + await runWithWriteTransaction(ws.db, [Stores.withdrawalSession], async tx => { + const wsr = await tx.get(Stores.withdrawalSession, withdrawalSessionId); + if (!wsr) { + return; + } + if (!wsr.retryInfo) { + return; + } + wsr.retryInfo.retryCounter++; + updateRetryInfoTimeout(wsr.retryInfo); + wsr.lastError = err; + await tx.put(Stores.withdrawalSession, wsr); + }); } export async function processWithdrawSession( ws: InternalWalletState, withdrawalSessionId: string, ): Promise<void> { + const onOpErr = (e: OperationError) => + incrementWithdrawalRetry(ws, withdrawalSessionId, e); + await guardOperationException( + () => processWithdrawSessionImpl(ws, withdrawalSessionId), + onOpErr, + ); +} + +export async function processWithdrawSessionImpl( + ws: InternalWalletState, + withdrawalSessionId: string, +): Promise<void> { logger.trace("processing withdraw session", withdrawalSessionId); const withdrawalSession = await oneShotGet( ws.db, @@ -474,7 +535,6 @@ export async function processWithdrawSession( processWithdrawCoin(ws, withdrawalSessionId, i), ); await Promise.all(ps); - ws.badge.showNotification(); return; } diff --git a/src/wallet.ts b/src/wallet.ts index 772bb01ac..86b3085f4 100644 --- a/src/wallet.ts +++ b/src/wallet.ts @@ -22,7 +22,7 @@ /** * Imports. */ -import { CryptoApi, CryptoWorkerFactory } from "./crypto/cryptoApi"; +import { CryptoApi, CryptoWorkerFactory } from "./crypto/workers/cryptoApi"; import { HttpRequestLibrary } from "./util/http"; import { oneShotPut, @@ -49,6 +49,7 @@ import { processDownloadProposal, applyRefund, getFullRefundFees, + processPurchaseImpl, } from "./wallet-impl/pay"; import { @@ -65,14 +66,12 @@ import { } from "./dbTypes"; import { MerchantRefundPermission } from "./talerTypes"; import { - Badge, BenchmarkResult, ConfirmPayResult, ConfirmReserveRequest, CreateReserveRequest, CreateReserveResponse, HistoryEvent, - Notifier, ReturnCoinsRequest, SenderWireInfos, TipStatus, @@ -85,6 +84,8 @@ import { PendingOperationInfo, PendingOperationsResponse, HistoryQuery, + WalletNotification, + NotificationType, } from "./walletTypes"; import { Logger } from "./util/logging"; @@ -97,8 +98,6 @@ import { } from "./wallet-impl/exchanges"; import { processReserve } from "./wallet-impl/reserves"; -import { AsyncOpMemo } from "./util/asyncMemo"; - import { InternalWalletState } from "./wallet-impl/state"; import { createReserve, confirmReserve } from "./wallet-impl/reserves"; import { processRefreshSession, refresh } from "./wallet-impl/refresh"; @@ -111,6 +110,7 @@ import { returnCoins } from "./wallet-impl/return"; import { payback } from "./wallet-impl/payback"; import { TimerGroup } from "./util/timer"; import { AsyncCondition } from "./util/promiseUtils"; +import { AsyncOpMemoSingle } from "./util/asyncMemo"; /** * Wallet protocol version spoken with the exchange @@ -137,18 +137,6 @@ const builtinCurrencies: CurrencyRecord[] = [ }, ]; -/** - * This error is thrown when an - */ -export class OperationFailedAndReportedError extends Error { - constructor(message: string) { - super(message); - - // Set the prototype explicitly. - Object.setPrototypeOf(this, OperationFailedAndReportedError.prototype); - } -} - const logger = new Logger("wallet.ts"); /** @@ -159,41 +147,18 @@ export class Wallet { private timerGroup: TimerGroup = new TimerGroup(); private latch = new AsyncCondition(); private stopped: boolean = false; + private memoRunRetryLoop = new AsyncOpMemoSingle<void>(); get db(): IDBDatabase { return this.ws.db; } - private get badge(): Badge { - return this.ws.badge; - } - - private get cryptoApi(): CryptoApi { - return this.ws.cryptoApi; - } - - private get notifier(): Notifier { - return this.ws.notifier; - } - constructor( db: IDBDatabase, http: HttpRequestLibrary, - badge: Badge, - notifier: Notifier, cryptoWorkerFactory: CryptoWorkerFactory, ) { - this.ws = { - badge, - cachedNextUrl: {}, - cryptoApi: new CryptoApi(cryptoWorkerFactory), - db, - http, - notifier, - speculativePayData: undefined, - memoProcessReserve: new AsyncOpMemo<void>(), - memoMakePlanchet: new AsyncOpMemo<void>(), - }; + this.ws = new InternalWalletState(db, http, cryptoWorkerFactory); } getExchangePaytoUri(exchangeBaseUrl: string, supportedTargetTypes: string[]) { @@ -204,6 +169,10 @@ export class Wallet { return getWithdrawDetailsForAmount(this.ws, baseUrl, amount); } + addNotificationListener(f: (n: WalletNotification) => void): void { + this.ws.addNotificationListener(f); + } + /** * Execute one operation based on the pending operation info record. */ @@ -213,6 +182,7 @@ export class Wallet { ): Promise<void> { switch (pending.type) { case "bug": + // Nothing to do, will just be displayed to the user return; case "dirty-coin": await refresh(this.ws, pending.coinPub); @@ -224,7 +194,7 @@ export class Wallet { await processRefreshSession(this.ws, pending.refreshSessionId); break; case "reserve": - await processReserve(this.ws, pending.reservePub); + await processReserve(this.ws, pending.reservePub, forceNow); break; case "withdraw": await processWithdrawSession(this.ws, pending.withdrawSessionId); @@ -239,6 +209,7 @@ export class Wallet { await processTip(this.ws, pending.tipId); break; case "pay": + await processPurchaseImpl(this.ws, pending.proposalId); break; default: assertUnreachable(pending); @@ -249,7 +220,8 @@ export class Wallet { * Process pending operations. */ public async runPending(forceNow: boolean = false): Promise<void> { - const pendingOpsResponse = await this.getPendingOperations(); + const onlyDue = !forceNow; + const pendingOpsResponse = await this.getPendingOperations(onlyDue); for (const p of pendingOpsResponse.pendingOperations) { try { await this.processOnePendingOperation(p, forceNow); @@ -260,54 +232,96 @@ export class Wallet { } /** - * Process pending operations and wait for scheduled operations in - * a loop until the wallet is stopped explicitly. + * Run the wallet until there are no more pending operations that give + * liveness left. The wallet will be in a stopped state when this function + * returns without resolving to an exception. */ - public async runLoopScheduledRetries(): Promise<void> { - while (!this.stopped) { - console.log("running wallet retry loop iteration"); - let pending = await this.getPendingOperations(); - console.log("waiting for", pending.nextRetryDelay); - const timeout = this.timerGroup.resolveAfter(pending.nextRetryDelay.d_ms); - await Promise.race([timeout, this.latch.wait()]); - pending = await this.getPendingOperations(); - for (const p of pending.pendingOperations) { - try { - this.processOnePendingOperation(p); - } catch (e) { - console.error(e); + public async runUntilDone(): Promise<void> { + const p = new Promise((resolve, reject) => { + // Run this asynchronously + this.addNotificationListener(n => { + if ( + n.type === NotificationType.WaitingForRetry && + n.numGivingLiveness == 0 + ) { + logger.trace("no liveness-giving operations left, stopping"); + this.stop(); } - } - } + }); + this.runRetryLoop().catch(e => { + console.log("exception in wallet retry loop"); + reject(e); + }); + }); + await p; } /** - * Run until all coins have been withdrawn from the given reserve, - * or an error has occured. + * Process pending operations and wait for scheduled operations in + * a loop until the wallet is stopped explicitly. */ - public async runUntilReserveDepleted(reservePub: string) { - while (true) { - const r = await this.getPendingOperations(); - const allPending = r.pendingOperations; - const relevantPending = allPending.filter(x => { - switch (x.type) { - case "reserve": - return x.reservePub === reservePub; - case "withdraw": - return ( - x.source.type === "reserve" && x.source.reservePub === reservePub - ); - default: - return false; - } - }); - if (relevantPending.length === 0) { - return; + public async runRetryLoop(): Promise<void> { + // Make sure we only run one main loop at a time. + return this.memoRunRetryLoop.memo(async () => { + try { + await this.runRetryLoopImpl(); + } catch (e) { + console.error("error during retry loop execution", e); + throw e; } - for (const p of relevantPending) { - await this.processOnePendingOperation(p); + }); + } + + private async runRetryLoopImpl(): Promise<void> { + while (!this.stopped) { + console.log("running wallet retry loop iteration"); + let pending = await this.getPendingOperations(true); + if (pending.pendingOperations.length === 0) { + const allPending = await this.getPendingOperations(false); + let numPending = 0; + let numGivingLiveness = 0; + for (const p of allPending.pendingOperations) { + numPending++; + if (p.givesLifeness) { + numGivingLiveness++; + } + } + let timeout; + if ( + allPending.pendingOperations.length === 0 || + allPending.nextRetryDelay.d_ms === Number.MAX_SAFE_INTEGER + ) { + // Wait forever + timeout = new Promise(() => {}); + console.log("waiting forever"); + } else { + console.log("waiting for timeout", pending.nextRetryDelay); + timeout = this.timerGroup.resolveAfter( + allPending.nextRetryDelay.d_ms, + ); + } + this.ws.notify({ + type: NotificationType.WaitingForRetry, + numGivingLiveness, + numPending, + }); + await Promise.race([timeout, this.latch.wait()]); + console.log("timeout done"); + } else { + logger.trace("running pending operations that are due"); + // FIXME: maybe be a bit smarter about executing these + // opeations in parallel? + for (const p of pending.pendingOperations) { + try { + console.log("running", p); + await this.processOnePendingOperation(p); + } catch (e) { + console.error(e); + } + } } } + logger.trace("exiting wallet retry loop"); } /** @@ -429,7 +443,6 @@ export class Wallet { } } - /** * Check if and how an exchange is trusted and/or audited. */ @@ -466,7 +479,7 @@ export class Wallet { * Get detailed balance information, sliced by exchange and by currency. */ async getBalances(): Promise<WalletBalance> { - return getBalances(this.ws); + return this.ws.memoGetBalance.memo(() => getBalances(this.ws)); } async refresh(oldCoinPub: string, force: boolean = false): Promise<void> { @@ -488,8 +501,12 @@ export class Wallet { return getHistory(this.ws, historyQuery); } - async getPendingOperations(): Promise<PendingOperationsResponse> { - return getPendingOperations(this.ws); + async getPendingOperations( + onlyDue: boolean = false, + ): Promise<PendingOperationsResponse> { + return this.ws.memoGetPending.memo(() => + getPendingOperations(this.ws, onlyDue), + ); } async getDenoms(exchangeUrl: string): Promise<DenominationRecord[]> { @@ -517,7 +534,6 @@ export class Wallet { async updateCurrency(currencyRecord: CurrencyRecord): Promise<void> { logger.trace("updating currency to", currencyRecord); await oneShotPut(this.db, Stores.currencies, currencyRecord); - this.notifier.notify(); } async getReserves(exchangeBaseUrl: string): Promise<ReserveRecord[]> { @@ -552,7 +568,7 @@ export class Wallet { stop() { this.stopped = true; this.timerGroup.stopCurrentAndFutureTimers(); - this.cryptoApi.stop(); + this.ws.cryptoApi.stop(); } async getSenderWireInfos(): Promise<SenderWireInfos> { @@ -693,17 +709,13 @@ export class Wallet { const totalFees = totalRefundFees; return { contractTerms: purchase.contractTerms, - hasRefund: purchase.timestamp_refund !== undefined, + hasRefund: purchase.lastRefundTimestamp !== undefined, totalRefundAmount: totalRefundAmount, totalRefundAndRefreshFees: totalFees, }; } - clearNotification(): void { - this.badge.clearNotification(); - } - benchmarkCrypto(repetitions: number): Promise<BenchmarkResult> { - return this.cryptoApi.benchmark(repetitions); + return this.ws.cryptoApi.benchmark(repetitions); } } diff --git a/src/walletTypes.ts b/src/walletTypes.ts index be88fc5b0..d78fc8126 100644 --- a/src/walletTypes.ts +++ b/src/walletTypes.ts @@ -36,6 +36,7 @@ import { ExchangeRecord, ExchangeWireInfo, WithdrawalSource, + RetryInfo, } from "./dbTypes"; import { CoinPaySig, ContractTerms, PayReq } from "./talerTypes"; @@ -204,16 +205,6 @@ export interface PayCoinInfo { } /** - * Listener for notifications from the wallet. - */ -export interface Notifier { - /** - * Called when a new notification arrives. - */ - notify(): void; -} - -/** * For terseness. */ export function mkAmount( @@ -421,31 +412,6 @@ export interface TipStatus { totalFees: AmountJson; } -/** - * Badge that shows activity for the wallet. - */ -export interface Badge { - /** - * Start indicating background activity. - */ - startBusy(): void; - - /** - * Stop indicating background activity. - */ - stopBusy(): void; - - /** - * Show the notification in the badge. - */ - showNotification(): void; - - /** - * Stop showing the notification. - */ - clearNotification(): void; -} - export interface BenchmarkResult { time: { [s: string]: number }; repetitions: number; @@ -525,7 +491,7 @@ export interface WalletDiagnostics { export interface PendingWithdrawOperation { type: "withdraw"; - source: WithdrawalSource, + source: WithdrawalSource; withdrawSessionId: string; numCoinsWithdrawn: number; numCoinsTotal: number; @@ -539,6 +505,102 @@ export interface PendingPayOperation { type: "pay"; } +export const enum NotificationType { + ProposalAccepted = "proposal-accepted", + ProposalDownloaded = "proposal-downloaded", + RefundsSubmitted = "refunds-submitted", + PaybackStarted = "payback-started", + PaybackFinished = "payback-finished", + RefreshRevealed = "refresh-revealed", + RefreshMelted = "refresh-melted", + RefreshStarted = "refresh-started", + RefreshRefused = "refresh-refused", + ReserveUpdated = "reserve-updated", + ReserveConfirmed = "reserve-confirmed", + ReserveDepleted = "reserve-depleted", + WithdrawSessionFinished = "withdraw-session-finished", + WaitingForRetry = "waiting-for-retry", +} + +export interface ProposalAcceptedNotification { + type: NotificationType.ProposalAccepted; + proposalId: string; +} + +export interface ProposalDownloadedNotification { + type: NotificationType.ProposalDownloaded; + proposalId: string; +} + +export interface RefundsSubmittedNotification { + type: NotificationType.RefundsSubmitted; + proposalId: string; +} + +export interface PaybackStartedNotification { + type: NotificationType.PaybackStarted; +} + +export interface PaybackFinishedNotification { + type: NotificationType.PaybackFinished; +} + +export interface RefreshMeltedNotification { + type: NotificationType.RefreshMelted; +} + +export interface RefreshRevealedNotification { + type: NotificationType.RefreshRevealed; +} + +export interface RefreshStartedNotification { + type: NotificationType.RefreshStarted; +} + +export interface RefreshRefusedNotification { + type: NotificationType.RefreshRefused; +} + +export interface ReserveUpdatedNotification { + type: NotificationType.ReserveUpdated; +} + +export interface ReserveConfirmedNotification { + type: NotificationType.ReserveConfirmed; +} + +export interface WithdrawSessionFinishedNotification { + type: NotificationType.WithdrawSessionFinished; + withdrawSessionId: string; +} + +export interface ReserveDepletedNotification { + type: NotificationType.ReserveDepleted; + reservePub: string; +} + +export interface WaitingForRetryNotification { + type: NotificationType.WaitingForRetry; + numPending: number; + numGivingLiveness: number; +} + +export type WalletNotification = + | ProposalAcceptedNotification + | ProposalDownloadedNotification + | RefundsSubmittedNotification + | PaybackStartedNotification + | PaybackFinishedNotification + | RefreshMeltedNotification + | RefreshRevealedNotification + | RefreshStartedNotification + | RefreshRefusedNotification + | ReserveUpdatedNotification + | ReserveConfirmedNotification + | WithdrawSessionFinishedNotification + | ReserveDepletedNotification + | WaitingForRetryNotification; + export interface OperationError { type: string; message: string; @@ -561,7 +623,7 @@ export interface PendingBugOperation { export interface PendingReserveOperation { type: "reserve"; - lastError?: OperationError; + retryInfo: RetryInfo | undefined; stage: string; timestampCreated: Timestamp; reserveType: string; @@ -578,7 +640,6 @@ export interface PendingRefreshOperation { refreshOutputSize: number; } - export interface PendingDirtyCoinOperation { type: "dirty-coin"; coinPub: string; @@ -615,17 +676,24 @@ export interface PendingPayOperation { isReplay: boolean; } -export type PendingOperationInfo = - | PendingWithdrawOperation - | PendingReserveOperation - | PendingBugOperation - | PendingDirtyCoinOperation - | PendingExchangeUpdateOperation - | PendingRefreshOperation - | PendingTipOperation - | PendingProposalDownloadOperation - | PendingProposalChoiceOperation - | PendingPayOperation; +export interface PendingOperationInfoCommon { + type: string; + givesLifeness: boolean; +} + +export type PendingOperationInfo = PendingOperationInfoCommon & + ( + | PendingWithdrawOperation + | PendingReserveOperation + | PendingBugOperation + | PendingDirtyCoinOperation + | PendingExchangeUpdateOperation + | PendingRefreshOperation + | PendingTipOperation + | PendingProposalDownloadOperation + | PendingProposalChoiceOperation + | PendingPayOperation + ); export interface PendingOperationsResponse { pendingOperations: PendingOperationInfo[]; @@ -683,4 +751,4 @@ export interface PlanchetCreationRequest { denomPub: string; reservePub: string; reservePriv: string; -}
\ No newline at end of file +} diff --git a/src/webex/chromeBadge.ts b/src/webex/chromeBadge.ts index 15b68ef02..e6b21ad91 100644 --- a/src/webex/chromeBadge.ts +++ b/src/webex/chromeBadge.ts @@ -14,9 +14,6 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -import { - Badge, -} from "../walletTypes"; import { isFirefox } from "./compat"; @@ -36,7 +33,7 @@ function rAF(cb: (ts: number) => void) { * Badge for Chrome that renders a Taler logo with a rotating ring if some * background activity is happening. */ -export class ChromeBadge implements Badge { +export class ChromeBadge { private canvas: HTMLCanvasElement; private ctx: CanvasRenderingContext2D; /** diff --git a/src/webex/messages.ts b/src/webex/messages.ts index cf409b44e..6c57385c3 100644 --- a/src/webex/messages.ts +++ b/src/webex/messages.ts @@ -145,10 +145,6 @@ export interface MessageMap { request: { talerTipUri: string }; response: walletTypes.TipStatus; }; - "clear-notification": { - request: {}; - response: void; - }; "accept-refund": { request: { refundUrl: string }; response: string; diff --git a/src/webex/wxApi.ts b/src/webex/wxApi.ts index ea26cd2eb..c4fa65186 100644 --- a/src/webex/wxApi.ts +++ b/src/webex/wxApi.ts @@ -281,13 +281,6 @@ export function acceptTip(talerTipUri: string): Promise<void> { /** - * Clear notifications that the wallet shows to the user. - */ -export function clearNotification(): Promise<void> { - return callBackend("clear-notification", { }); -} - -/** * Download a refund and accept it. */ export function applyRefund(refundUrl: string): Promise<string> { diff --git a/src/webex/wxBackend.ts b/src/webex/wxBackend.ts index 752027b70..4363890eb 100644 --- a/src/webex/wxBackend.ts +++ b/src/webex/wxBackend.ts @@ -28,7 +28,6 @@ import { AmountJson } from "../util/amounts"; import { ConfirmReserveRequest, CreateReserveRequest, - Notifier, ReturnCoinsRequest, WalletDiagnostics, } from "../walletTypes"; @@ -41,7 +40,7 @@ import { MessageType } from "./messages"; import * as wxApi from "./wxApi"; import Port = chrome.runtime.Port; import MessageSender = chrome.runtime.MessageSender; -import { BrowserCryptoWorkerFactory } from "../crypto/cryptoApi"; +import { BrowserCryptoWorkerFactory } from "../crypto/workers/cryptoApi"; import { OpenedPromise, openPromise } from "../util/promiseUtils"; const NeedsWallet = Symbol("NeedsWallet"); @@ -225,9 +224,6 @@ async function handleMessage( case "accept-tip": { return needsWallet().acceptTip(detail.talerTipUri); } - case "clear-notification": { - return needsWallet().clearNotification(); - } case "abort-failed-payment": { if (!detail.contractTermsHash) { throw Error("contracTermsHash not given"); @@ -331,31 +327,6 @@ async function dispatch( } } -class ChromeNotifier implements Notifier { - private ports: Port[] = []; - - constructor() { - chrome.runtime.onConnect.addListener(port => { - console.log("got connect!"); - this.ports.push(port); - port.onDisconnect.addListener(() => { - const i = this.ports.indexOf(port); - if (i >= 0) { - this.ports.splice(i, 1); - } else { - console.error("port already removed"); - } - }); - }); - } - - notify() { - for (const p of this.ports) { - p.postMessage({ notify: true }); - } - } -} - function getTab(tabId: number): Promise<chrome.tabs.Tab> { return new Promise((resolve, reject) => { chrome.tabs.get(tabId, (tab: chrome.tabs.Tab) => resolve(tab)); @@ -458,16 +429,13 @@ async function reinitWallet() { return; } const http = new BrowserHttpLib(); - const notifier = new ChromeNotifier(); console.log("setting wallet"); const wallet = new Wallet( currentDatabase, http, - badge, - notifier, new BrowserCryptoWorkerFactory(), ); - wallet.runLoopScheduledRetries().catch((e) => { + wallet.runRetryLoop().catch((e) => { console.log("error during wallet retry loop", e); }); // Useful for debugging in the background page. @@ -621,21 +589,6 @@ export async function wxMain() { return true; }); - // Clear notifications both when the popop opens, - // as well when it closes. - chrome.runtime.onConnect.addListener(port => { - if (port.name === "popup") { - if (currentWallet) { - currentWallet.clearNotification(); - } - port.onDisconnect.addListener(() => { - if (currentWallet) { - currentWallet.clearNotification(); - } - }); - } - }); - // Handlers for catching HTTP requests chrome.webRequest.onHeadersReceived.addListener( details => { |