From f67d7f54f9d0fed97446898942e3dfee67ee2985 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Thu, 5 Dec 2019 19:38:19 +0100 Subject: threads, retries and notifications WIP --- rollup.config.js | 26 +- src/android/index.ts | 13 +- src/crypto/browserWorkerEntry.ts | 70 ---- src/crypto/cryptoApi.ts | 444 --------------------- src/crypto/cryptoImplementation.ts | 596 ---------------------------- src/crypto/cryptoWorker.ts | 8 - src/crypto/nodeProcessWorker.ts | 118 ------ src/crypto/nodeWorkerEntry.ts | 69 ---- src/crypto/synchronousWorker.ts | 135 ------- src/crypto/workers/browserWorkerEntry.ts | 70 ++++ src/crypto/workers/cryptoApi.ts | 455 +++++++++++++++++++++ src/crypto/workers/cryptoImplementation.ts | 608 +++++++++++++++++++++++++++++ src/crypto/workers/cryptoWorker.ts | 8 + src/crypto/workers/nodeThreadWorker.ts | 175 +++++++++ src/crypto/workers/synchronousWorker.ts | 135 +++++++ src/dbTypes.ts | 166 +++++++- src/headless/helpers.ts | 44 +-- src/headless/integrationtest.ts | 6 +- src/headless/taler-wallet-cli.ts | 4 +- src/util/asyncMemo.ts | 77 +++- src/util/query.ts | 3 +- src/util/taleruri-test.ts | 14 +- src/util/taleruri.ts | 19 +- src/wallet-impl/balance.ts | 3 +- src/wallet-impl/errors.ts | 81 ++++ src/wallet-impl/exchanges.ts | 2 +- src/wallet-impl/history.ts | 8 +- src/wallet-impl/pay.ts | 207 ++++++++-- src/wallet-impl/payback.ts | 9 +- src/wallet-impl/pending.ts | 561 ++++++++++++++++---------- src/wallet-impl/refresh.ts | 103 +++-- src/wallet-impl/reserves.ts | 110 ++++-- src/wallet-impl/return.ts | 3 - src/wallet-impl/state.ts | 63 ++- src/wallet-impl/tip.ts | 44 ++- src/wallet-impl/withdraw.ts | 98 ++++- src/wallet.ts | 202 +++++----- src/walletTypes.ts | 168 +++++--- src/webex/chromeBadge.ts | 5 +- src/webex/messages.ts | 4 - src/webex/wxApi.ts | 7 - src/webex/wxBackend.ts | 51 +-- tsconfig.json | 14 +- 43 files changed, 2897 insertions(+), 2109 deletions(-) delete mode 100644 src/crypto/browserWorkerEntry.ts delete mode 100644 src/crypto/cryptoApi.ts delete mode 100644 src/crypto/cryptoImplementation.ts delete mode 100644 src/crypto/cryptoWorker.ts delete mode 100644 src/crypto/nodeProcessWorker.ts delete mode 100644 src/crypto/nodeWorkerEntry.ts delete mode 100644 src/crypto/synchronousWorker.ts create mode 100644 src/crypto/workers/browserWorkerEntry.ts create mode 100644 src/crypto/workers/cryptoApi.ts create mode 100644 src/crypto/workers/cryptoImplementation.ts create mode 100644 src/crypto/workers/cryptoWorker.ts create mode 100644 src/crypto/workers/nodeThreadWorker.ts create mode 100644 src/crypto/workers/synchronousWorker.ts create mode 100644 src/wallet-impl/errors.ts diff --git a/rollup.config.js b/rollup.config.js index 8038b5094..e520a1bd8 100644 --- a/rollup.config.js +++ b/rollup.config.js @@ -29,4 +29,28 @@ const walletCli = { ] }; -export default [walletCli]; +const walletAndroid = { + input: 'dist/node/android/index.js', + output: { + file: 'dist/standalone/taler-wallet-android.js', + format: 'cjs' + }, + plugins: [ + json(), + + nodeResolve({ + external: builtins, + preferBuiltins: true + }), + + commonjs({ + include: ['node_modules/**', 'dist/node/**'], + extensions: [ '.js' ], + ignoreGlobal: false, // Default: false + sourceMap: false, + ignore: [ 'taler-wallet' ] + }) + ] +}; + +export default [walletCli, walletAndroid]; diff --git a/src/android/index.ts b/src/android/index.ts index 4d0136ecf..dcdb9d756 100644 --- a/src/android/index.ts +++ b/src/android/index.ts @@ -125,6 +125,7 @@ export function installAndroidWalletListener() { return; } const id = msg.id; + console.log(`android listener: got request for ${operation} (${id})`); let result; switch (operation) { case "init": { @@ -137,7 +138,7 @@ export function installAndroidWalletListener() { }; const w = await getDefaultNodeWallet(walletArgs); maybeWallet = w; - w.runLoopScheduledRetries().catch((e) => { + w.runRetryLoop().catch((e) => { console.error("Error during wallet retry loop", e); }); wp.resolve(w); @@ -156,7 +157,11 @@ export function installAndroidWalletListener() { } case "withdrawTestkudos": { const wallet = await wp.promise; - await withdrawTestBalance(wallet); + try { + await withdrawTestBalance(wallet); + } catch (e) { + console.log("error during withdrawTestBalance", e); + } result = {}; break; } @@ -221,7 +226,7 @@ export function installAndroidWalletListener() { maybeWallet = undefined; const w = await getDefaultNodeWallet(walletArgs); maybeWallet = w; - w.runLoopScheduledRetries().catch((e) => { + w.runRetryLoop().catch((e) => { console.error("Error during wallet retry loop", e); }); wp.resolve(w); @@ -233,6 +238,8 @@ export function installAndroidWalletListener() { return; } + console.log(`android listener: sending response for ${operation} (${id})`); + const respMsg = { result, id, operation, type: "response" }; sendMessage(JSON.stringify(respMsg)); }; diff --git a/src/crypto/browserWorkerEntry.ts b/src/crypto/browserWorkerEntry.ts deleted file mode 100644 index 5ac762c13..000000000 --- a/src/crypto/browserWorkerEntry.ts +++ /dev/null @@ -1,70 +0,0 @@ -/* - This file is part of TALER - (C) 2016 GNUnet e.V. - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - TALER; see the file COPYING. If not, see -*/ - -/** - * Web worker for crypto operations. - */ - -/** - * Imports. - */ - -import { CryptoImplementation } from "./cryptoImplementation"; - -const worker: Worker = (self as any) as Worker; - -async function handleRequest(operation: string, id: number, args: string[]) { - const impl = new CryptoImplementation(); - - if (!(operation in impl)) { - console.error(`crypto operation '${operation}' not found`); - return; - } - - try { - const result = (impl as any)[operation](...args); - worker.postMessage({ result, id }); - } catch (e) { - console.log("error during operation", e); - return; - } -} - -worker.onmessage = (msg: MessageEvent) => { - const args = msg.data.args; - if (!Array.isArray(args)) { - console.error("args must be array"); - return; - } - const id = msg.data.id; - if (typeof id !== "number") { - console.error("RPC id must be number"); - return; - } - const operation = msg.data.operation; - if (typeof operation !== "string") { - console.error("RPC operation must be string"); - return; - } - - if (CryptoImplementation.enableTracing) { - console.log("onmessage with", operation); - } - - handleRequest(operation, id, args).catch((e) => { - console.error("error in browsere worker", e); - }); -}; diff --git a/src/crypto/cryptoApi.ts b/src/crypto/cryptoApi.ts deleted file mode 100644 index 2521b54ea..000000000 --- a/src/crypto/cryptoApi.ts +++ /dev/null @@ -1,444 +0,0 @@ -/* - This file is part of TALER - (C) 2016 GNUnet e.V. - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - TALER; see the file COPYING. If not, see - */ - -/** - * API to access the Taler crypto worker thread. - * @author Florian Dold - */ - -/** - * Imports. - */ -import { AmountJson } from "../util/amounts"; - -import { - CoinRecord, - DenominationRecord, - RefreshSessionRecord, - TipPlanchet, - WireFee, -} from "../dbTypes"; - -import { CryptoWorker } from "./cryptoWorker"; - -import { ContractTerms, PaybackRequest } from "../talerTypes"; - -import { BenchmarkResult, CoinWithDenom, PayCoinInfo, PlanchetCreationResult, PlanchetCreationRequest } from "../walletTypes"; - -import * as timer from "../util/timer"; - -/** - * State of a crypto worker. - */ -interface WorkerState { - /** - * The actual worker thread. - */ - w: CryptoWorker | null; - - /** - * Work we're currently executing or null if not busy. - */ - currentWorkItem: WorkItem | null; - - /** - * Timer to terminate the worker if it's not busy enough. - */ - terminationTimerHandle: timer.TimerHandle | null; -} - -interface WorkItem { - operation: string; - args: any[]; - resolve: any; - reject: any; - - /** - * Serial id to identify a matching response. - */ - rpcId: number; - - /** - * Time when the work was submitted to a (non-busy) worker thread. - */ - startTime: number; -} - -/** - * Number of different priorities. Each priority p - * must be 0 <= p < NUM_PRIO. - */ -const NUM_PRIO = 5; - -export interface CryptoWorkerFactory { - /** - * Start a new worker. - */ - startWorker(): CryptoWorker; - - /** - * Query the number of workers that should be - * run at the same time. - */ - getConcurrency(): number; -} - -export class BrowserCryptoWorkerFactory implements CryptoWorkerFactory { - startWorker(): CryptoWorker { - const workerCtor = Worker; - const workerPath = "/dist/cryptoWorker-bundle.js"; - return new workerCtor(workerPath) as CryptoWorker; - } - - getConcurrency(): number { - let concurrency = 2; - try { - // only works in the browser - // tslint:disable-next-line:no-string-literal - concurrency = (navigator as any)["hardwareConcurrency"]; - concurrency = Math.max(1, Math.ceil(concurrency / 2)); - } catch (e) { - concurrency = 2; - } - return concurrency; - } -} - -/** - * Crypto API that interfaces manages a background crypto thread - * for the execution of expensive operations. - */ -export class CryptoApi { - private nextRpcId: number = 1; - private workers: WorkerState[]; - private workQueues: WorkItem[][]; - - private workerFactory: CryptoWorkerFactory; - - /** - * Number of busy workers. - */ - private numBusy: number = 0; - - /** - * Did we stop accepting new requests? - */ - private stopped: boolean = false; - - static enableTracing = false; - - /** - * Terminate all worker threads. - */ - terminateWorkers() { - for (let worker of this.workers) { - if (worker.w) { - CryptoApi.enableTracing && console.log("terminating worker"); - worker.w.terminate(); - if (worker.terminationTimerHandle) { - worker.terminationTimerHandle.clear(); - worker.terminationTimerHandle = null; - } - if (worker.currentWorkItem) { - worker.currentWorkItem.reject(Error("explicitly terminated")); - worker.currentWorkItem = null; - } - worker.w = null; - } - } - } - - stop() { - this.terminateWorkers(); - this.stopped = true; - } - - /** - * Start a worker (if not started) and set as busy. - */ - wake(ws: WorkerState, work: WorkItem): void { - if (this.stopped) { - console.log("cryptoApi is stopped"); - CryptoApi.enableTracing && console.log("not waking, as cryptoApi is stopped"); - return; - } - if (ws.currentWorkItem !== null) { - throw Error("assertion failed"); - } - ws.currentWorkItem = work; - this.numBusy++; - if (!ws.w) { - const w = this.workerFactory.startWorker(); - w.onmessage = (m: MessageEvent) => this.handleWorkerMessage(ws, m); - w.onerror = (e: ErrorEvent) => this.handleWorkerError(ws, e); - ws.w = w; - } - - const msg: any = { - args: work.args, - id: work.rpcId, - operation: work.operation, - }; - this.resetWorkerTimeout(ws); - work.startTime = timer.performanceNow(); - setImmediate(() => ws.w!.postMessage(msg)); - } - - resetWorkerTimeout(ws: WorkerState) { - if (ws.terminationTimerHandle !== null) { - ws.terminationTimerHandle.clear(); - ws.terminationTimerHandle = null; - } - const destroy = () => { - // terminate worker if it's idle - if (ws.w && ws.currentWorkItem === null) { - ws.w!.terminate(); - ws.w = null; - } - }; - ws.terminationTimerHandle = timer.after(15 * 1000, destroy); - } - - handleWorkerError(ws: WorkerState, e: ErrorEvent) { - if (ws.currentWorkItem) { - console.error( - `error in worker during ${ws.currentWorkItem!.operation}`, - e, - ); - } else { - console.error("error in worker", e); - } - console.error(e.message); - try { - ws.w!.terminate(); - ws.w = null; - } catch (e) { - console.error(e); - } - if (ws.currentWorkItem !== null) { - ws.currentWorkItem.reject(e); - ws.currentWorkItem = null; - this.numBusy--; - } - this.findWork(ws); - } - - private findWork(ws: WorkerState) { - // try to find more work for this worker - for (let i = 0; i < NUM_PRIO; i++) { - const q = this.workQueues[NUM_PRIO - i - 1]; - if (q.length !== 0) { - const work: WorkItem = q.shift()!; - this.wake(ws, work); - return; - } - } - } - - handleWorkerMessage(ws: WorkerState, msg: MessageEvent) { - const id = msg.data.id; - if (typeof id !== "number") { - console.error("rpc id must be number"); - return; - } - const currentWorkItem = ws.currentWorkItem; - ws.currentWorkItem = null; - this.numBusy--; - this.findWork(ws); - if (!currentWorkItem) { - console.error("unsolicited response from worker"); - return; - } - if (id !== currentWorkItem.rpcId) { - console.error(`RPC with id ${id} has no registry entry`); - return; - } - - CryptoApi.enableTracing && - console.log( - `rpc ${currentWorkItem.operation} took ${timer.performanceNow() - - currentWorkItem.startTime}ms`, - ); - currentWorkItem.resolve(msg.data.result); - } - - constructor(workerFactory: CryptoWorkerFactory) { - this.workerFactory = workerFactory; - this.workers = new Array(workerFactory.getConcurrency()); - - for (let i = 0; i < this.workers.length; i++) { - this.workers[i] = { - currentWorkItem: null, - terminationTimerHandle: null, - w: null, - }; - } - - this.workQueues = []; - for (let i = 0; i < NUM_PRIO; i++) { - this.workQueues.push([]); - } - } - - private doRpc( - operation: string, - priority: number, - ...args: any[] - ): Promise { - const p: Promise = new Promise((resolve, reject) => { - const rpcId = this.nextRpcId++; - const workItem: WorkItem = { - operation, - args, - resolve, - reject, - rpcId, - startTime: 0, - }; - - if (this.numBusy === this.workers.length) { - const q = this.workQueues[priority]; - if (!q) { - throw Error("assertion failed"); - } - this.workQueues[priority].push(workItem); - return; - } - - for (const ws of this.workers) { - if (ws.currentWorkItem !== null) { - continue; - } - this.wake(ws, workItem); - return; - } - - throw Error("assertion failed"); - }); - - return p; - } - - createPlanchet( - req: PlanchetCreationRequest - ): Promise { - return this.doRpc("createPlanchet", 1, req); - } - - createTipPlanchet(denom: DenominationRecord): Promise { - return this.doRpc("createTipPlanchet", 1, denom); - } - - hashString(str: string): Promise { - return this.doRpc("hashString", 1, str); - } - - hashDenomPub(denomPub: string): Promise { - return this.doRpc("hashDenomPub", 1, denomPub); - } - - isValidDenom(denom: DenominationRecord, masterPub: string): Promise { - return this.doRpc("isValidDenom", 2, denom, masterPub); - } - - isValidWireFee( - type: string, - wf: WireFee, - masterPub: string, - ): Promise { - return this.doRpc("isValidWireFee", 2, type, wf, masterPub); - } - - isValidPaymentSignature( - sig: string, - contractHash: string, - merchantPub: string, - ): Promise { - return this.doRpc( - "isValidPaymentSignature", - 1, - sig, - contractHash, - merchantPub, - ); - } - - signDeposit( - contractTerms: ContractTerms, - cds: CoinWithDenom[], - totalAmount: AmountJson, - ): Promise { - return this.doRpc( - "signDeposit", - 3, - contractTerms, - cds, - totalAmount, - ); - } - - createEddsaKeypair(): Promise<{ priv: string; pub: string }> { - return this.doRpc<{ priv: string; pub: string }>("createEddsaKeypair", 1); - } - - rsaUnblind(sig: string, bk: string, pk: string): Promise { - return this.doRpc("rsaUnblind", 4, sig, bk, pk); - } - - createPaybackRequest(coin: CoinRecord): Promise { - return this.doRpc("createPaybackRequest", 1, coin); - } - - createRefreshSession( - exchangeBaseUrl: string, - kappa: number, - meltCoin: CoinRecord, - newCoinDenoms: DenominationRecord[], - meltFee: AmountJson, - ): Promise { - return this.doRpc( - "createRefreshSession", - 4, - exchangeBaseUrl, - kappa, - meltCoin, - newCoinDenoms, - meltFee, - ); - } - - signCoinLink( - oldCoinPriv: string, - newDenomHash: string, - oldCoinPub: string, - transferPub: string, - coinEv: string, - ): Promise { - return this.doRpc( - "signCoinLink", - 4, - oldCoinPriv, - newDenomHash, - oldCoinPub, - transferPub, - coinEv, - ); - } - - benchmark(repetitions: number): Promise { - return this.doRpc("benchmark", 1, repetitions); - } -} diff --git a/src/crypto/cryptoImplementation.ts b/src/crypto/cryptoImplementation.ts deleted file mode 100644 index faebbaa4a..000000000 --- a/src/crypto/cryptoImplementation.ts +++ /dev/null @@ -1,596 +0,0 @@ -/* - This file is part of GNU Taler - (C) 2019 GNUnet e.V. - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - TALER; see the file COPYING. If not, see - */ - -/** - * Synchronous implementation of crypto-related functions for the wallet. - * - * The functionality is parameterized over an Emscripten environment. - */ - -/** - * Imports. - */ - -import { - CoinRecord, - CoinStatus, - DenominationRecord, - RefreshPlanchetRecord, - RefreshSessionRecord, - ReserveRecord, - TipPlanchet, - WireFee, -} from "../dbTypes"; - -import { CoinPaySig, ContractTerms, PaybackRequest } from "../talerTypes"; -import { - BenchmarkResult, - CoinWithDenom, - PayCoinInfo, - Timestamp, - PlanchetCreationResult, - PlanchetCreationRequest, -} from "../walletTypes"; -import { canonicalJson, getTalerStampSec } from "../util/helpers"; -import { AmountJson } from "../util/amounts"; -import * as Amounts from "../util/amounts"; -import * as timer from "../util/timer"; -import { - getRandomBytes, - encodeCrock, - decodeCrock, - createEddsaKeyPair, - createBlindingKeySecret, - hash, - rsaBlind, - eddsaVerify, - eddsaSign, - rsaUnblind, - stringToBytes, - createHashContext, - createEcdheKeyPair, - keyExchangeEcdheEddsa, - setupRefreshPlanchet, -} from "./talerCrypto"; -import { randomBytes } from "./primitives/nacl-fast"; - -enum SignaturePurpose { - RESERVE_WITHDRAW = 1200, - WALLET_COIN_DEPOSIT = 1201, - MASTER_DENOMINATION_KEY_VALIDITY = 1025, - WALLET_COIN_MELT = 1202, - TEST = 4242, - MERCHANT_PAYMENT_OK = 1104, - MASTER_WIRE_FEES = 1028, - WALLET_COIN_PAYBACK = 1203, - WALLET_COIN_LINK = 1204, -} - -function amountToBuffer(amount: AmountJson): Uint8Array { - const buffer = new ArrayBuffer(8 + 4 + 12); - const dvbuf = new DataView(buffer); - const u8buf = new Uint8Array(buffer); - const te = new TextEncoder(); - const curr = te.encode(amount.currency); - dvbuf.setBigUint64(0, BigInt(amount.value)); - dvbuf.setUint32(8, amount.fraction); - u8buf.set(curr, 8 + 4); - - return u8buf; -} - -function timestampToBuffer(ts: Timestamp): Uint8Array { - const b = new ArrayBuffer(8); - const v = new DataView(b); - const s = BigInt(ts.t_ms) * BigInt(1000); - v.setBigUint64(0, s); - return new Uint8Array(b); -} - -function talerTimestampStringToBuffer(ts: string): Uint8Array { - const t_sec = getTalerStampSec(ts); - if (t_sec === null || t_sec === undefined) { - // Should have been validated before! - throw Error("invalid timestamp"); - } - const buffer = new ArrayBuffer(8); - const dvbuf = new DataView(buffer); - const s = BigInt(t_sec) * BigInt(1000 * 1000); - dvbuf.setBigUint64(0, s); - return new Uint8Array(buffer); -} - -class SignaturePurposeBuilder { - private chunks: Uint8Array[] = []; - - constructor(private purposeNum: number) {} - - put(bytes: Uint8Array): SignaturePurposeBuilder { - this.chunks.push(Uint8Array.from(bytes)); - return this; - } - - build(): Uint8Array { - let payloadLen = 0; - for (let c of this.chunks) { - payloadLen += c.byteLength; - } - const buf = new ArrayBuffer(4 + 4 + payloadLen); - const u8buf = new Uint8Array(buf); - let p = 8; - for (let c of this.chunks) { - u8buf.set(c, p); - p += c.byteLength; - } - const dvbuf = new DataView(buf); - dvbuf.setUint32(0, payloadLen + 4 + 4); - dvbuf.setUint32(4, this.purposeNum); - return u8buf; - } -} - -function buildSigPS(purposeNum: number): SignaturePurposeBuilder { - return new SignaturePurposeBuilder(purposeNum); -} - -export class CryptoImplementation { - static enableTracing: boolean = false; - - constructor() {} - - /** - * Create a pre-coin of the given denomination to be withdrawn from then given - * reserve. - */ - createPlanchet( - req: PlanchetCreationRequest, - ): PlanchetCreationResult { - const reservePub = decodeCrock(req.reservePub); - const reservePriv = decodeCrock(req.reservePriv); - const denomPub = decodeCrock(req.denomPub); - const coinKeyPair = createEddsaKeyPair(); - const blindingFactor = createBlindingKeySecret(); - const coinPubHash = hash(coinKeyPair.eddsaPub); - const ev = rsaBlind(coinPubHash, blindingFactor, denomPub); - const amountWithFee = Amounts.add(req.value, req.feeWithdraw).amount; - const denomPubHash = hash(denomPub); - const evHash = hash(ev); - - const withdrawRequest = buildSigPS(SignaturePurpose.RESERVE_WITHDRAW) - .put(reservePub) - .put(amountToBuffer(amountWithFee)) - .put(amountToBuffer(req.feeWithdraw)) - .put(denomPubHash) - .put(evHash) - .build(); - - const sig = eddsaSign(withdrawRequest, reservePriv); - - const planchet: PlanchetCreationResult = { - blindingKey: encodeCrock(blindingFactor), - coinEv: encodeCrock(ev), - coinPriv: encodeCrock(coinKeyPair.eddsaPriv), - coinPub: encodeCrock(coinKeyPair.eddsaPub), - coinValue: req.value, - denomPub: encodeCrock(denomPub), - denomPubHash: encodeCrock(denomPubHash), - reservePub: encodeCrock(reservePub), - withdrawSig: encodeCrock(sig), - }; - return planchet; - } - - /** - * Create a planchet used for tipping, including the private keys. - */ - createTipPlanchet(denom: DenominationRecord): TipPlanchet { - const denomPub = decodeCrock(denom.denomPub); - const coinKeyPair = createEddsaKeyPair(); - const blindingFactor = createBlindingKeySecret(); - const coinPubHash = hash(coinKeyPair.eddsaPub); - const ev = rsaBlind(coinPubHash, blindingFactor, denomPub); - - const tipPlanchet: TipPlanchet = { - blindingKey: encodeCrock(blindingFactor), - coinEv: encodeCrock(ev), - coinPriv: encodeCrock(coinKeyPair.eddsaPriv), - coinPub: encodeCrock(coinKeyPair.eddsaPub), - coinValue: denom.value, - denomPub: encodeCrock(denomPub), - denomPubHash: encodeCrock(hash(denomPub)), - }; - return tipPlanchet; - } - - /** - * Create and sign a message to request payback for a coin. - */ - createPaybackRequest(coin: CoinRecord): PaybackRequest { - const p = buildSigPS(SignaturePurpose.WALLET_COIN_PAYBACK) - .put(decodeCrock(coin.coinPub)) - .put(decodeCrock(coin.denomPubHash)) - .put(decodeCrock(coin.blindingKey)) - .build(); - - const coinPriv = decodeCrock(coin.coinPriv); - const coinSig = eddsaSign(p, coinPriv); - const paybackRequest: PaybackRequest = { - coin_blind_key_secret: coin.blindingKey, - coin_pub: coin.coinPub, - coin_sig: encodeCrock(coinSig), - denom_pub: coin.denomPub, - denom_sig: coin.denomSig, - }; - return paybackRequest; - } - - /** - * Check if a payment signature is valid. - */ - isValidPaymentSignature( - sig: string, - contractHash: string, - merchantPub: string, - ): boolean { - const p = buildSigPS(SignaturePurpose.MERCHANT_PAYMENT_OK) - .put(decodeCrock(contractHash)) - .build(); - const sigBytes = decodeCrock(sig); - const pubBytes = decodeCrock(merchantPub); - return eddsaVerify(p, sigBytes, pubBytes); - } - - /** - * Check if a wire fee is correctly signed. - */ - isValidWireFee(type: string, wf: WireFee, masterPub: string): boolean { - const p = buildSigPS(SignaturePurpose.MASTER_WIRE_FEES) - .put(hash(stringToBytes(type + "\0"))) - .put(timestampToBuffer(wf.startStamp)) - .put(timestampToBuffer(wf.endStamp)) - .put(amountToBuffer(wf.wireFee)) - .build(); - const sig = decodeCrock(wf.sig); - const pub = decodeCrock(masterPub); - return eddsaVerify(p, sig, pub); - } - - /** - * Check if the signature of a denomination is valid. - */ - isValidDenom(denom: DenominationRecord, masterPub: string): boolean { - const p = buildSigPS(SignaturePurpose.MASTER_DENOMINATION_KEY_VALIDITY) - .put(decodeCrock(masterPub)) - .put(timestampToBuffer(denom.stampStart)) - .put(timestampToBuffer(denom.stampExpireWithdraw)) - .put(timestampToBuffer(denom.stampExpireDeposit)) - .put(timestampToBuffer(denom.stampExpireLegal)) - .put(amountToBuffer(denom.value)) - .put(amountToBuffer(denom.feeWithdraw)) - .put(amountToBuffer(denom.feeDeposit)) - .put(amountToBuffer(denom.feeRefresh)) - .put(amountToBuffer(denom.feeRefund)) - .put(decodeCrock(denom.denomPubHash)) - .build(); - const sig = decodeCrock(denom.masterSig); - const pub = decodeCrock(masterPub); - return eddsaVerify(p, sig, pub); - } - - /** - * Create a new EdDSA key pair. - */ - createEddsaKeypair(): { priv: string; pub: string } { - const pair = createEddsaKeyPair(); - return { - priv: encodeCrock(pair.eddsaPriv), - pub: encodeCrock(pair.eddsaPub), - }; - } - - /** - * Unblind a blindly signed value. - */ - rsaUnblind(sig: string, bk: string, pk: string): string { - const denomSig = rsaUnblind( - decodeCrock(sig), - decodeCrock(pk), - decodeCrock(bk), - ); - return encodeCrock(denomSig); - } - - /** - * Generate updated coins (to store in the database) - * and deposit permissions for each given coin. - */ - signDeposit( - contractTerms: ContractTerms, - cds: CoinWithDenom[], - totalAmount: AmountJson, - ): PayCoinInfo { - const ret: PayCoinInfo = { - originalCoins: [], - sigs: [], - updatedCoins: [], - }; - - const contractTermsHash = this.hashString(canonicalJson(contractTerms)); - - const feeList: AmountJson[] = cds.map(x => x.denom.feeDeposit); - let fees = Amounts.add(Amounts.getZero(feeList[0].currency), ...feeList) - .amount; - // okay if saturates - fees = Amounts.sub(fees, Amounts.parseOrThrow(contractTerms.max_fee)) - .amount; - const total = Amounts.add(fees, totalAmount).amount; - - let amountSpent = Amounts.getZero(cds[0].coin.currentAmount.currency); - let amountRemaining = total; - - for (const cd of cds) { - const originalCoin = { ...cd.coin }; - - if (amountRemaining.value === 0 && amountRemaining.fraction === 0) { - break; - } - - let coinSpend: AmountJson; - if (Amounts.cmp(amountRemaining, cd.coin.currentAmount) < 0) { - coinSpend = amountRemaining; - } else { - coinSpend = cd.coin.currentAmount; - } - - amountSpent = Amounts.add(amountSpent, coinSpend).amount; - - const feeDeposit = cd.denom.feeDeposit; - - // Give the merchant at least the deposit fee, otherwise it'll reject - // the coin. - - if (Amounts.cmp(coinSpend, feeDeposit) < 0) { - coinSpend = feeDeposit; - } - - const newAmount = Amounts.sub(cd.coin.currentAmount, coinSpend).amount; - cd.coin.currentAmount = newAmount; - cd.coin.status = CoinStatus.Dirty; - - const d = buildSigPS(SignaturePurpose.WALLET_COIN_DEPOSIT) - .put(decodeCrock(contractTermsHash)) - .put(decodeCrock(contractTerms.H_wire)) - .put(talerTimestampStringToBuffer(contractTerms.timestamp)) - .put(talerTimestampStringToBuffer(contractTerms.refund_deadline)) - .put(amountToBuffer(coinSpend)) - .put(amountToBuffer(cd.denom.feeDeposit)) - .put(decodeCrock(contractTerms.merchant_pub)) - .put(decodeCrock(cd.coin.coinPub)) - .build(); - const coinSig = eddsaSign(d, decodeCrock(cd.coin.coinPriv)); - - const s: CoinPaySig = { - coin_pub: cd.coin.coinPub, - coin_sig: encodeCrock(coinSig), - contribution: Amounts.toString(coinSpend), - denom_pub: cd.coin.denomPub, - exchange_url: cd.denom.exchangeBaseUrl, - ub_sig: cd.coin.denomSig, - }; - ret.sigs.push(s); - ret.updatedCoins.push(cd.coin); - ret.originalCoins.push(originalCoin); - } - return ret; - } - - /** - * Create a new refresh session. - */ - createRefreshSession( - exchangeBaseUrl: string, - kappa: number, - meltCoin: CoinRecord, - newCoinDenoms: DenominationRecord[], - meltFee: AmountJson, - ): RefreshSessionRecord { - let valueWithFee = Amounts.getZero(newCoinDenoms[0].value.currency); - - for (const ncd of newCoinDenoms) { - valueWithFee = Amounts.add(valueWithFee, ncd.value, ncd.feeWithdraw) - .amount; - } - - // melt fee - valueWithFee = Amounts.add(valueWithFee, meltFee).amount; - - const sessionHc = createHashContext(); - - const transferPubs: string[] = []; - const transferPrivs: string[] = []; - - const planchetsForGammas: RefreshPlanchetRecord[][] = []; - - for (let i = 0; i < kappa; i++) { - const transferKeyPair = createEcdheKeyPair(); - sessionHc.update(transferKeyPair.ecdhePub); - transferPrivs.push(encodeCrock(transferKeyPair.ecdhePriv)); - transferPubs.push(encodeCrock(transferKeyPair.ecdhePub)); - } - - for (const denom of newCoinDenoms) { - const r = decodeCrock(denom.denomPub); - sessionHc.update(r); - } - - sessionHc.update(decodeCrock(meltCoin.coinPub)); - sessionHc.update(amountToBuffer(valueWithFee)); - - for (let i = 0; i < kappa; i++) { - const planchets: RefreshPlanchetRecord[] = []; - for (let j = 0; j < newCoinDenoms.length; j++) { - const transferPriv = decodeCrock(transferPrivs[i]); - const oldCoinPub = decodeCrock(meltCoin.coinPub); - const transferSecret = keyExchangeEcdheEddsa(transferPriv, oldCoinPub); - - const fresh = setupRefreshPlanchet(transferSecret, j); - - const coinPriv = fresh.coinPriv; - const coinPub = fresh.coinPub; - const blindingFactor = fresh.bks; - const pubHash = hash(coinPub); - const denomPub = decodeCrock(newCoinDenoms[j].denomPub); - const ev = rsaBlind(pubHash, blindingFactor, denomPub); - const planchet: RefreshPlanchetRecord = { - blindingKey: encodeCrock(blindingFactor), - coinEv: encodeCrock(ev), - privateKey: encodeCrock(coinPriv), - publicKey: encodeCrock(coinPub), - }; - planchets.push(planchet); - sessionHc.update(ev); - } - planchetsForGammas.push(planchets); - } - - const sessionHash = sessionHc.finish(); - - const confirmData = buildSigPS(SignaturePurpose.WALLET_COIN_MELT) - .put(sessionHash) - .put(amountToBuffer(valueWithFee)) - .put(amountToBuffer(meltFee)) - .put(decodeCrock(meltCoin.coinPub)) - .build(); - - const confirmSig = eddsaSign(confirmData, decodeCrock(meltCoin.coinPriv)); - - let valueOutput = Amounts.getZero(newCoinDenoms[0].value.currency); - for (const denom of newCoinDenoms) { - valueOutput = Amounts.add(valueOutput, denom.value).amount; - } - - const refreshSessionId = encodeCrock(getRandomBytes(32)); - - const refreshSession: RefreshSessionRecord = { - refreshSessionId, - confirmSig: encodeCrock(confirmSig), - exchangeBaseUrl, - finished: false, - hash: encodeCrock(sessionHash), - meltCoinPub: meltCoin.coinPub, - newDenomHashes: newCoinDenoms.map(d => d.denomPubHash), - newDenoms: newCoinDenoms.map(d => d.denomPub), - norevealIndex: undefined, - planchetsForGammas: planchetsForGammas, - transferPrivs, - transferPubs, - valueOutput, - valueWithFee, - }; - - return refreshSession; - } - - /** - * Hash a string including the zero terminator. - */ - hashString(str: string): string { - const ts = new TextEncoder(); - const b = ts.encode(str + "\0"); - return encodeCrock(hash(b)); - } - - /** - * Hash a denomination public key. - */ - hashDenomPub(denomPub: string): string { - return encodeCrock(hash(decodeCrock(denomPub))); - } - - signCoinLink( - oldCoinPriv: string, - newDenomHash: string, - oldCoinPub: string, - transferPub: string, - coinEv: string, - ): string { - const coinEvHash = hash(decodeCrock(coinEv)); - const coinLink = buildSigPS(SignaturePurpose.WALLET_COIN_LINK) - .put(decodeCrock(newDenomHash)) - .put(decodeCrock(oldCoinPub)) - .put(decodeCrock(transferPub)) - .put(coinEvHash) - .build(); - const coinPriv = decodeCrock(oldCoinPriv); - const sig = eddsaSign(coinLink, coinPriv); - return encodeCrock(sig); - } - - benchmark(repetitions: number): BenchmarkResult { - let time_hash = 0; - for (let i = 0; i < repetitions; i++) { - const start = timer.performanceNow(); - this.hashString("hello world"); - time_hash += timer.performanceNow() - start; - } - - let time_hash_big = 0; - for (let i = 0; i < repetitions; i++) { - const ba = randomBytes(4096); - const start = timer.performanceNow(); - hash(ba); - time_hash_big += timer.performanceNow() - start; - } - - let time_eddsa_create = 0; - for (let i = 0; i < repetitions; i++) { - const start = timer.performanceNow(); - const pair = createEddsaKeyPair(); - time_eddsa_create += timer.performanceNow() - start; - } - - let time_eddsa_sign = 0; - const p = randomBytes(4096); - - const pair = createEddsaKeyPair(); - - for (let i = 0; i < repetitions; i++) { - const start = timer.performanceNow(); - eddsaSign(p, pair.eddsaPriv); - time_eddsa_sign += timer.performanceNow() - start; - } - - const sig = eddsaSign(p, pair.eddsaPriv); - - let time_eddsa_verify = 0; - for (let i = 0; i < repetitions; i++) { - const start = timer.performanceNow(); - eddsaVerify(p, sig, pair.eddsaPub); - time_eddsa_verify += timer.performanceNow() - start; - } - - return { - repetitions, - time: { - hash_small: time_hash, - hash_big: time_hash_big, - eddsa_create: time_eddsa_create, - eddsa_sign: time_eddsa_sign, - eddsa_verify: time_eddsa_verify, - }, - }; - } -} diff --git a/src/crypto/cryptoWorker.ts b/src/crypto/cryptoWorker.ts deleted file mode 100644 index 0ea641dde..000000000 --- a/src/crypto/cryptoWorker.ts +++ /dev/null @@ -1,8 +0,0 @@ -export interface CryptoWorker { - postMessage(message: any): void; - - terminate(): void; - - onmessage: (m: any) => void; - onerror: (m: any) => void; -} \ No newline at end of file diff --git a/src/crypto/nodeProcessWorker.ts b/src/crypto/nodeProcessWorker.ts deleted file mode 100644 index 8ff149788..000000000 --- a/src/crypto/nodeProcessWorker.ts +++ /dev/null @@ -1,118 +0,0 @@ -import { CryptoWorkerFactory } from "./cryptoApi"; - -/* - This file is part of TALER - (C) 2016 GNUnet e.V. - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - TALER; see the file COPYING. If not, see - */ - - -// tslint:disable:no-var-requires - -import { CryptoWorker } from "./cryptoWorker"; - -import path = require("path"); -import child_process = require("child_process"); - -const nodeWorkerEntry = path.join(__dirname, "nodeWorkerEntry.js"); - - -export class NodeCryptoWorkerFactory implements CryptoWorkerFactory { - startWorker(): CryptoWorker { - if (typeof require === "undefined") { - throw Error("cannot make worker, require(...) not defined"); - } - const workerCtor = require("./nodeProcessWorker").Worker; - const workerPath = __dirname + "/cryptoWorker.js"; - return new workerCtor(workerPath); - } - - getConcurrency(): number { - return 4; - } -} - -/** - * Worker implementation that uses node subprocesses. - */ -export class Worker { - private child: any; - - /** - * Function to be called when we receive a message from the worker thread. - */ - onmessage: undefined | ((m: any) => void); - - /** - * Function to be called when we receive an error from the worker thread. - */ - onerror: undefined | ((m: any) => void); - - private dispatchMessage(msg: any) { - if (this.onmessage) { - this.onmessage({ data: msg }); - } else { - console.warn("no handler for worker event 'message' defined") - } - } - - private dispatchError(msg: any) { - if (this.onerror) { - this.onerror({ data: msg }); - } else { - console.warn("no handler for worker event 'error' defined") - } - } - - constructor() { - this.child = child_process.fork(nodeWorkerEntry); - this.onerror = undefined; - this.onmessage = undefined; - - this.child.on("error", (e: any) => { - this.dispatchError(e); - }); - - this.child.on("message", (msg: any) => { - this.dispatchMessage(msg); - }); - } - - /** - * Add an event listener for either an "error" or "message" event. - */ - addEventListener(event: "message" | "error", fn: (x: any) => void): void { - switch (event) { - case "message": - this.onmessage = fn; - break; - case "error": - this.onerror = fn; - break; - } - } - - /** - * Send a message to the worker thread. - */ - postMessage (msg: any) { - this.child.send(JSON.stringify({data: msg})); - } - - /** - * Forcibly terminate the worker thread. - */ - terminate () { - this.child.kill("SIGINT"); - } -} diff --git a/src/crypto/nodeWorkerEntry.ts b/src/crypto/nodeWorkerEntry.ts deleted file mode 100644 index 1e088d987..000000000 --- a/src/crypto/nodeWorkerEntry.ts +++ /dev/null @@ -1,69 +0,0 @@ -/* - This file is part of TALER - (C) 2016 GNUnet e.V. - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - TALER; see the file COPYING. If not, see - */ - -// tslint:disable:no-var-requires - -import { CryptoImplementation } from "./cryptoImplementation"; - -async function handleRequest(operation: string, id: number, args: string[]) { - - const impl = new CryptoImplementation(); - - if (!(operation in impl)) { - console.error(`crypto operation '${operation}' not found`); - return; - } - - try { - const result = (impl as any)[operation](...args); - if (process.send) { - process.send({ result, id }); - } else { - console.error("process.send not available"); - } - } catch (e) { - console.error("error during operation", e); - return; - } -} - -process.on("message", (msgStr: any) => { - const msg = JSON.parse(msgStr); - - const args = msg.data.args; - if (!Array.isArray(args)) { - console.error("args must be array"); - return; - } - const id = msg.data.id; - if (typeof id !== "number") { - console.error("RPC id must be number"); - return; - } - const operation = msg.data.operation; - if (typeof operation !== "string") { - console.error("RPC operation must be string"); - return; - } - - handleRequest(operation, id, args).catch((e) => { - console.error("error in node worker", e); - }); -}); - -process.on("uncaughtException", (err: any) => { - console.error("uncaught exception in node worker entry", err); -}); diff --git a/src/crypto/synchronousWorker.ts b/src/crypto/synchronousWorker.ts deleted file mode 100644 index 12eecde9a..000000000 --- a/src/crypto/synchronousWorker.ts +++ /dev/null @@ -1,135 +0,0 @@ -/* - This file is part of GNU Taler - (C) 2019 GNUnet e.V. - - GNU Taler is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - GNU Taler; see the file COPYING. If not, see - */ - -import { CryptoImplementation } from "./cryptoImplementation"; - -import { CryptoWorkerFactory } from "./cryptoApi"; -import { CryptoWorker } from "./cryptoWorker"; - -/** - * The synchronous crypto worker produced by this factory doesn't run in the - * background, but actually blocks the caller until the operation is done. - */ -export class SynchronousCryptoWorkerFactory implements CryptoWorkerFactory { - startWorker(): CryptoWorker { - if (typeof require === "undefined") { - throw Error("cannot make worker, require(...) not defined"); - } - const workerCtor = require("./synchronousWorker").SynchronousCryptoWorker; - return new workerCtor(); - } - - getConcurrency(): number { - return 1; - } -} - - -/** - * Worker implementation that uses node subprocesses. - */ -export class SynchronousCryptoWorker { - - /** - * Function to be called when we receive a message from the worker thread. - */ - onmessage: undefined | ((m: any) => void); - - /** - * Function to be called when we receive an error from the worker thread. - */ - onerror: undefined | ((m: any) => void); - - constructor() { - this.onerror = undefined; - this.onmessage = undefined; - } - - /** - * Add an event listener for either an "error" or "message" event. - */ - addEventListener(event: "message" | "error", fn: (x: any) => void): void { - switch (event) { - case "message": - this.onmessage = fn; - break; - case "error": - this.onerror = fn; - break; - } - } - - private dispatchMessage(msg: any) { - if (this.onmessage) { - this.onmessage({ data: msg }); - } - } - - private async handleRequest(operation: string, id: number, args: string[]) { - const impl = new CryptoImplementation(); - - if (!(operation in impl)) { - console.error(`crypto operation '${operation}' not found`); - return; - } - - let result: any; - try { - result = (impl as any)[operation](...args); - } catch (e) { - console.log("error during operation", e); - return; - } - - try { - setImmediate(() => this.dispatchMessage({ result, id })); - } catch (e) { - console.log("got error during dispatch", e); - } - } - - /** - * Send a message to the worker thread. - */ - postMessage(msg: any) { - const args = msg.args; - if (!Array.isArray(args)) { - console.error("args must be array"); - return; - } - const id = msg.id; - if (typeof id !== "number") { - console.error("RPC id must be number"); - return; - } - const operation = msg.operation; - if (typeof operation !== "string") { - console.error("RPC operation must be string"); - return; - } - - this.handleRequest(operation, id, args).catch(e => { - console.error("Error while handling crypto request:", e); - }); - } - - /** - * Forcibly terminate the worker thread. - */ - terminate() { - // This is a no-op. - } -} diff --git a/src/crypto/workers/browserWorkerEntry.ts b/src/crypto/workers/browserWorkerEntry.ts new file mode 100644 index 000000000..5ac762c13 --- /dev/null +++ b/src/crypto/workers/browserWorkerEntry.ts @@ -0,0 +1,70 @@ +/* + This file is part of TALER + (C) 2016 GNUnet e.V. + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see +*/ + +/** + * Web worker for crypto operations. + */ + +/** + * Imports. + */ + +import { CryptoImplementation } from "./cryptoImplementation"; + +const worker: Worker = (self as any) as Worker; + +async function handleRequest(operation: string, id: number, args: string[]) { + const impl = new CryptoImplementation(); + + if (!(operation in impl)) { + console.error(`crypto operation '${operation}' not found`); + return; + } + + try { + const result = (impl as any)[operation](...args); + worker.postMessage({ result, id }); + } catch (e) { + console.log("error during operation", e); + return; + } +} + +worker.onmessage = (msg: MessageEvent) => { + const args = msg.data.args; + if (!Array.isArray(args)) { + console.error("args must be array"); + return; + } + const id = msg.data.id; + if (typeof id !== "number") { + console.error("RPC id must be number"); + return; + } + const operation = msg.data.operation; + if (typeof operation !== "string") { + console.error("RPC operation must be string"); + return; + } + + if (CryptoImplementation.enableTracing) { + console.log("onmessage with", operation); + } + + handleRequest(operation, id, args).catch((e) => { + console.error("error in browsere worker", e); + }); +}; diff --git a/src/crypto/workers/cryptoApi.ts b/src/crypto/workers/cryptoApi.ts new file mode 100644 index 000000000..5537bb39f --- /dev/null +++ b/src/crypto/workers/cryptoApi.ts @@ -0,0 +1,455 @@ +/* + This file is part of TALER + (C) 2016 GNUnet e.V. + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see + */ + +/** + * API to access the Taler crypto worker thread. + * @author Florian Dold + */ + +/** + * Imports. + */ +import { AmountJson } from "../../util/amounts"; + +import { + CoinRecord, + DenominationRecord, + RefreshSessionRecord, + TipPlanchet, + WireFee, +} from "../../dbTypes"; + +import { CryptoWorker } from "./cryptoWorker"; + +import { ContractTerms, PaybackRequest } from "../../talerTypes"; + +import { + BenchmarkResult, + CoinWithDenom, + PayCoinInfo, + PlanchetCreationResult, + PlanchetCreationRequest, +} from "../../walletTypes"; + +import * as timer from "../../util/timer"; + +/** + * State of a crypto worker. + */ +interface WorkerState { + /** + * The actual worker thread. + */ + w: CryptoWorker | null; + + /** + * Work we're currently executing or null if not busy. + */ + currentWorkItem: WorkItem | null; + + /** + * Timer to terminate the worker if it's not busy enough. + */ + terminationTimerHandle: timer.TimerHandle | null; +} + +interface WorkItem { + operation: string; + args: any[]; + resolve: any; + reject: any; + + /** + * Serial id to identify a matching response. + */ + rpcId: number; + + /** + * Time when the work was submitted to a (non-busy) worker thread. + */ + startTime: number; +} + +/** + * Number of different priorities. Each priority p + * must be 0 <= p < NUM_PRIO. + */ +const NUM_PRIO = 5; + +export interface CryptoWorkerFactory { + /** + * Start a new worker. + */ + startWorker(): CryptoWorker; + + /** + * Query the number of workers that should be + * run at the same time. + */ + getConcurrency(): number; +} + +export class BrowserCryptoWorkerFactory implements CryptoWorkerFactory { + startWorker(): CryptoWorker { + const workerCtor = Worker; + const workerPath = "/dist/cryptoWorker-bundle.js"; + return new workerCtor(workerPath) as CryptoWorker; + } + + getConcurrency(): number { + let concurrency = 2; + try { + // only works in the browser + // tslint:disable-next-line:no-string-literal + concurrency = (navigator as any)["hardwareConcurrency"]; + concurrency = Math.max(1, Math.ceil(concurrency / 2)); + } catch (e) { + concurrency = 2; + } + return concurrency; + } +} + +/** + * Crypto API that interfaces manages a background crypto thread + * for the execution of expensive operations. + */ +export class CryptoApi { + private nextRpcId: number = 1; + private workers: WorkerState[]; + private workQueues: WorkItem[][]; + + private workerFactory: CryptoWorkerFactory; + + /** + * Number of busy workers. + */ + private numBusy: number = 0; + + /** + * Did we stop accepting new requests? + */ + private stopped: boolean = false; + + static enableTracing = false; + + /** + * Terminate all worker threads. + */ + terminateWorkers() { + for (let worker of this.workers) { + if (worker.w) { + CryptoApi.enableTracing && console.log("terminating worker"); + worker.w.terminate(); + if (worker.terminationTimerHandle) { + worker.terminationTimerHandle.clear(); + worker.terminationTimerHandle = null; + } + if (worker.currentWorkItem) { + worker.currentWorkItem.reject(Error("explicitly terminated")); + worker.currentWorkItem = null; + } + worker.w = null; + } + } + } + + stop() { + this.terminateWorkers(); + this.stopped = true; + } + + /** + * Start a worker (if not started) and set as busy. + */ + wake(ws: WorkerState, work: WorkItem): void { + if (this.stopped) { + console.log("cryptoApi is stopped"); + CryptoApi.enableTracing && + console.log("not waking, as cryptoApi is stopped"); + return; + } + if (ws.currentWorkItem !== null) { + throw Error("assertion failed"); + } + ws.currentWorkItem = work; + this.numBusy++; + if (!ws.w) { + const w = this.workerFactory.startWorker(); + w.onmessage = (m: MessageEvent) => this.handleWorkerMessage(ws, m); + w.onerror = (e: ErrorEvent) => this.handleWorkerError(ws, e); + ws.w = w; + } + + const msg: any = { + args: work.args, + id: work.rpcId, + operation: work.operation, + }; + this.resetWorkerTimeout(ws); + work.startTime = timer.performanceNow(); + setImmediate(() => ws.w!.postMessage(msg)); + } + + resetWorkerTimeout(ws: WorkerState) { + if (ws.terminationTimerHandle !== null) { + ws.terminationTimerHandle.clear(); + ws.terminationTimerHandle = null; + } + const destroy = () => { + // terminate worker if it's idle + if (ws.w && ws.currentWorkItem === null) { + ws.w!.terminate(); + ws.w = null; + } + }; + ws.terminationTimerHandle = timer.after(15 * 1000, destroy); + } + + handleWorkerError(ws: WorkerState, e: ErrorEvent) { + if (ws.currentWorkItem) { + console.error( + `error in worker during ${ws.currentWorkItem!.operation}`, + e, + ); + } else { + console.error("error in worker", e); + } + console.error(e.message); + try { + ws.w!.terminate(); + ws.w = null; + } catch (e) { + console.error(e); + } + if (ws.currentWorkItem !== null) { + ws.currentWorkItem.reject(e); + ws.currentWorkItem = null; + this.numBusy--; + } + this.findWork(ws); + } + + private findWork(ws: WorkerState) { + // try to find more work for this worker + for (let i = 0; i < NUM_PRIO; i++) { + const q = this.workQueues[NUM_PRIO - i - 1]; + if (q.length !== 0) { + const work: WorkItem = q.shift()!; + this.wake(ws, work); + return; + } + } + } + + handleWorkerMessage(ws: WorkerState, msg: MessageEvent) { + const id = msg.data.id; + if (typeof id !== "number") { + console.error("rpc id must be number"); + return; + } + const currentWorkItem = ws.currentWorkItem; + ws.currentWorkItem = null; + this.numBusy--; + this.findWork(ws); + if (!currentWorkItem) { + console.error("unsolicited response from worker"); + return; + } + if (id !== currentWorkItem.rpcId) { + console.error(`RPC with id ${id} has no registry entry`); + return; + } + + CryptoApi.enableTracing && + console.log( + `rpc ${currentWorkItem.operation} took ${timer.performanceNow() - + currentWorkItem.startTime}ms`, + ); + currentWorkItem.resolve(msg.data.result); + } + + constructor(workerFactory: CryptoWorkerFactory) { + this.workerFactory = workerFactory; + this.workers = new Array(workerFactory.getConcurrency()); + + for (let i = 0; i < this.workers.length; i++) { + this.workers[i] = { + currentWorkItem: null, + terminationTimerHandle: null, + w: null, + }; + } + + this.workQueues = []; + for (let i = 0; i < NUM_PRIO; i++) { + this.workQueues.push([]); + } + } + + private doRpc( + operation: string, + priority: number, + ...args: any[] + ): Promise { + const p: Promise = new Promise((resolve, reject) => { + const rpcId = this.nextRpcId++; + const workItem: WorkItem = { + operation, + args, + resolve, + reject, + rpcId, + startTime: 0, + }; + + if (this.numBusy === this.workers.length) { + const q = this.workQueues[priority]; + if (!q) { + throw Error("assertion failed"); + } + this.workQueues[priority].push(workItem); + return; + } + + for (const ws of this.workers) { + if (ws.currentWorkItem !== null) { + continue; + } + this.wake(ws, workItem); + return; + } + + throw Error("assertion failed"); + }); + + return p; + } + + createPlanchet( + req: PlanchetCreationRequest, + ): Promise { + return this.doRpc("createPlanchet", 1, req); + } + + createTipPlanchet(denom: DenominationRecord): Promise { + return this.doRpc("createTipPlanchet", 1, denom); + } + + hashString(str: string): Promise { + return this.doRpc("hashString", 1, str); + } + + hashDenomPub(denomPub: string): Promise { + return this.doRpc("hashDenomPub", 1, denomPub); + } + + isValidDenom(denom: DenominationRecord, masterPub: string): Promise { + return this.doRpc("isValidDenom", 2, denom, masterPub); + } + + isValidWireFee( + type: string, + wf: WireFee, + masterPub: string, + ): Promise { + return this.doRpc("isValidWireFee", 2, type, wf, masterPub); + } + + isValidPaymentSignature( + sig: string, + contractHash: string, + merchantPub: string, + ): Promise { + return this.doRpc( + "isValidPaymentSignature", + 1, + sig, + contractHash, + merchantPub, + ); + } + + signDeposit( + contractTerms: ContractTerms, + cds: CoinWithDenom[], + totalAmount: AmountJson, + ): Promise { + return this.doRpc( + "signDeposit", + 3, + contractTerms, + cds, + totalAmount, + ); + } + + createEddsaKeypair(): Promise<{ priv: string; pub: string }> { + return this.doRpc<{ priv: string; pub: string }>("createEddsaKeypair", 1); + } + + rsaUnblind(sig: string, bk: string, pk: string): Promise { + return this.doRpc("rsaUnblind", 4, sig, bk, pk); + } + + rsaVerify(hm: string, sig: string, pk: string): Promise { + return this.doRpc("rsaVerify", 4, hm, sig, pk); + } + + createPaybackRequest(coin: CoinRecord): Promise { + return this.doRpc("createPaybackRequest", 1, coin); + } + + createRefreshSession( + exchangeBaseUrl: string, + kappa: number, + meltCoin: CoinRecord, + newCoinDenoms: DenominationRecord[], + meltFee: AmountJson, + ): Promise { + return this.doRpc( + "createRefreshSession", + 4, + exchangeBaseUrl, + kappa, + meltCoin, + newCoinDenoms, + meltFee, + ); + } + + signCoinLink( + oldCoinPriv: string, + newDenomHash: string, + oldCoinPub: string, + transferPub: string, + coinEv: string, + ): Promise { + return this.doRpc( + "signCoinLink", + 4, + oldCoinPriv, + newDenomHash, + oldCoinPub, + transferPub, + coinEv, + ); + } + + benchmark(repetitions: number): Promise { + return this.doRpc("benchmark", 1, repetitions); + } +} diff --git a/src/crypto/workers/cryptoImplementation.ts b/src/crypto/workers/cryptoImplementation.ts new file mode 100644 index 000000000..00d81ce27 --- /dev/null +++ b/src/crypto/workers/cryptoImplementation.ts @@ -0,0 +1,608 @@ +/* + This file is part of GNU Taler + (C) 2019 GNUnet e.V. + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see + */ + +/** + * Synchronous implementation of crypto-related functions for the wallet. + * + * The functionality is parameterized over an Emscripten environment. + */ + +/** + * Imports. + */ + +import { + CoinRecord, + CoinStatus, + DenominationRecord, + RefreshPlanchetRecord, + RefreshSessionRecord, + TipPlanchet, + WireFee, + initRetryInfo, +} from "../../dbTypes"; + +import { CoinPaySig, ContractTerms, PaybackRequest } from "../../talerTypes"; +import { + BenchmarkResult, + CoinWithDenom, + PayCoinInfo, + Timestamp, + PlanchetCreationResult, + PlanchetCreationRequest, + getTimestampNow, +} from "../../walletTypes"; +import { canonicalJson, getTalerStampSec } from "../../util/helpers"; +import { AmountJson } from "../../util/amounts"; +import * as Amounts from "../../util/amounts"; +import * as timer from "../../util/timer"; +import { + getRandomBytes, + encodeCrock, + decodeCrock, + createEddsaKeyPair, + createBlindingKeySecret, + hash, + rsaBlind, + eddsaVerify, + eddsaSign, + rsaUnblind, + stringToBytes, + createHashContext, + createEcdheKeyPair, + keyExchangeEcdheEddsa, + setupRefreshPlanchet, + rsaVerify, +} from "../talerCrypto"; +import { randomBytes } from "../primitives/nacl-fast"; + +enum SignaturePurpose { + RESERVE_WITHDRAW = 1200, + WALLET_COIN_DEPOSIT = 1201, + MASTER_DENOMINATION_KEY_VALIDITY = 1025, + WALLET_COIN_MELT = 1202, + TEST = 4242, + MERCHANT_PAYMENT_OK = 1104, + MASTER_WIRE_FEES = 1028, + WALLET_COIN_PAYBACK = 1203, + WALLET_COIN_LINK = 1204, +} + +function amountToBuffer(amount: AmountJson): Uint8Array { + const buffer = new ArrayBuffer(8 + 4 + 12); + const dvbuf = new DataView(buffer); + const u8buf = new Uint8Array(buffer); + const te = new TextEncoder(); + const curr = te.encode(amount.currency); + dvbuf.setBigUint64(0, BigInt(amount.value)); + dvbuf.setUint32(8, amount.fraction); + u8buf.set(curr, 8 + 4); + + return u8buf; +} + +function timestampToBuffer(ts: Timestamp): Uint8Array { + const b = new ArrayBuffer(8); + const v = new DataView(b); + const s = BigInt(ts.t_ms) * BigInt(1000); + v.setBigUint64(0, s); + return new Uint8Array(b); +} + +function talerTimestampStringToBuffer(ts: string): Uint8Array { + const t_sec = getTalerStampSec(ts); + if (t_sec === null || t_sec === undefined) { + // Should have been validated before! + throw Error("invalid timestamp"); + } + const buffer = new ArrayBuffer(8); + const dvbuf = new DataView(buffer); + const s = BigInt(t_sec) * BigInt(1000 * 1000); + dvbuf.setBigUint64(0, s); + return new Uint8Array(buffer); +} + +class SignaturePurposeBuilder { + private chunks: Uint8Array[] = []; + + constructor(private purposeNum: number) {} + + put(bytes: Uint8Array): SignaturePurposeBuilder { + this.chunks.push(Uint8Array.from(bytes)); + return this; + } + + build(): Uint8Array { + let payloadLen = 0; + for (let c of this.chunks) { + payloadLen += c.byteLength; + } + const buf = new ArrayBuffer(4 + 4 + payloadLen); + const u8buf = new Uint8Array(buf); + let p = 8; + for (let c of this.chunks) { + u8buf.set(c, p); + p += c.byteLength; + } + const dvbuf = new DataView(buf); + dvbuf.setUint32(0, payloadLen + 4 + 4); + dvbuf.setUint32(4, this.purposeNum); + return u8buf; + } +} + +function buildSigPS(purposeNum: number): SignaturePurposeBuilder { + return new SignaturePurposeBuilder(purposeNum); +} + +export class CryptoImplementation { + static enableTracing: boolean = false; + + constructor() {} + + /** + * Create a pre-coin of the given denomination to be withdrawn from then given + * reserve. + */ + createPlanchet( + req: PlanchetCreationRequest, + ): PlanchetCreationResult { + const reservePub = decodeCrock(req.reservePub); + const reservePriv = decodeCrock(req.reservePriv); + const denomPub = decodeCrock(req.denomPub); + const coinKeyPair = createEddsaKeyPair(); + const blindingFactor = createBlindingKeySecret(); + const coinPubHash = hash(coinKeyPair.eddsaPub); + const ev = rsaBlind(coinPubHash, blindingFactor, denomPub); + const amountWithFee = Amounts.add(req.value, req.feeWithdraw).amount; + const denomPubHash = hash(denomPub); + const evHash = hash(ev); + + const withdrawRequest = buildSigPS(SignaturePurpose.RESERVE_WITHDRAW) + .put(reservePub) + .put(amountToBuffer(amountWithFee)) + .put(amountToBuffer(req.feeWithdraw)) + .put(denomPubHash) + .put(evHash) + .build(); + + const sig = eddsaSign(withdrawRequest, reservePriv); + + const planchet: PlanchetCreationResult = { + blindingKey: encodeCrock(blindingFactor), + coinEv: encodeCrock(ev), + coinPriv: encodeCrock(coinKeyPair.eddsaPriv), + coinPub: encodeCrock(coinKeyPair.eddsaPub), + coinValue: req.value, + denomPub: encodeCrock(denomPub), + denomPubHash: encodeCrock(denomPubHash), + reservePub: encodeCrock(reservePub), + withdrawSig: encodeCrock(sig), + }; + return planchet; + } + + /** + * Create a planchet used for tipping, including the private keys. + */ + createTipPlanchet(denom: DenominationRecord): TipPlanchet { + const denomPub = decodeCrock(denom.denomPub); + const coinKeyPair = createEddsaKeyPair(); + const blindingFactor = createBlindingKeySecret(); + const coinPubHash = hash(coinKeyPair.eddsaPub); + const ev = rsaBlind(coinPubHash, blindingFactor, denomPub); + + const tipPlanchet: TipPlanchet = { + blindingKey: encodeCrock(blindingFactor), + coinEv: encodeCrock(ev), + coinPriv: encodeCrock(coinKeyPair.eddsaPriv), + coinPub: encodeCrock(coinKeyPair.eddsaPub), + coinValue: denom.value, + denomPub: encodeCrock(denomPub), + denomPubHash: encodeCrock(hash(denomPub)), + }; + return tipPlanchet; + } + + /** + * Create and sign a message to request payback for a coin. + */ + createPaybackRequest(coin: CoinRecord): PaybackRequest { + const p = buildSigPS(SignaturePurpose.WALLET_COIN_PAYBACK) + .put(decodeCrock(coin.coinPub)) + .put(decodeCrock(coin.denomPubHash)) + .put(decodeCrock(coin.blindingKey)) + .build(); + + const coinPriv = decodeCrock(coin.coinPriv); + const coinSig = eddsaSign(p, coinPriv); + const paybackRequest: PaybackRequest = { + coin_blind_key_secret: coin.blindingKey, + coin_pub: coin.coinPub, + coin_sig: encodeCrock(coinSig), + denom_pub: coin.denomPub, + denom_sig: coin.denomSig, + }; + return paybackRequest; + } + + /** + * Check if a payment signature is valid. + */ + isValidPaymentSignature( + sig: string, + contractHash: string, + merchantPub: string, + ): boolean { + const p = buildSigPS(SignaturePurpose.MERCHANT_PAYMENT_OK) + .put(decodeCrock(contractHash)) + .build(); + const sigBytes = decodeCrock(sig); + const pubBytes = decodeCrock(merchantPub); + return eddsaVerify(p, sigBytes, pubBytes); + } + + /** + * Check if a wire fee is correctly signed. + */ + isValidWireFee(type: string, wf: WireFee, masterPub: string): boolean { + const p = buildSigPS(SignaturePurpose.MASTER_WIRE_FEES) + .put(hash(stringToBytes(type + "\0"))) + .put(timestampToBuffer(wf.startStamp)) + .put(timestampToBuffer(wf.endStamp)) + .put(amountToBuffer(wf.wireFee)) + .build(); + const sig = decodeCrock(wf.sig); + const pub = decodeCrock(masterPub); + return eddsaVerify(p, sig, pub); + } + + /** + * Check if the signature of a denomination is valid. + */ + isValidDenom(denom: DenominationRecord, masterPub: string): boolean { + const p = buildSigPS(SignaturePurpose.MASTER_DENOMINATION_KEY_VALIDITY) + .put(decodeCrock(masterPub)) + .put(timestampToBuffer(denom.stampStart)) + .put(timestampToBuffer(denom.stampExpireWithdraw)) + .put(timestampToBuffer(denom.stampExpireDeposit)) + .put(timestampToBuffer(denom.stampExpireLegal)) + .put(amountToBuffer(denom.value)) + .put(amountToBuffer(denom.feeWithdraw)) + .put(amountToBuffer(denom.feeDeposit)) + .put(amountToBuffer(denom.feeRefresh)) + .put(amountToBuffer(denom.feeRefund)) + .put(decodeCrock(denom.denomPubHash)) + .build(); + const sig = decodeCrock(denom.masterSig); + const pub = decodeCrock(masterPub); + return eddsaVerify(p, sig, pub); + } + + /** + * Create a new EdDSA key pair. + */ + createEddsaKeypair(): { priv: string; pub: string } { + const pair = createEddsaKeyPair(); + return { + priv: encodeCrock(pair.eddsaPriv), + pub: encodeCrock(pair.eddsaPub), + }; + } + + /** + * Unblind a blindly signed value. + */ + rsaUnblind(blindedSig: string, bk: string, pk: string): string { + const denomSig = rsaUnblind( + decodeCrock(blindedSig), + decodeCrock(pk), + decodeCrock(bk), + ); + return encodeCrock(denomSig); + } + + /** + * Unblind a blindly signed value. + */ + rsaVerify(hm: string, sig: string, pk: string): boolean { + return rsaVerify(hash(decodeCrock(hm)), decodeCrock(sig), decodeCrock(pk)); + } + + /** + * Generate updated coins (to store in the database) + * and deposit permissions for each given coin. + */ + signDeposit( + contractTerms: ContractTerms, + cds: CoinWithDenom[], + totalAmount: AmountJson, + ): PayCoinInfo { + const ret: PayCoinInfo = { + originalCoins: [], + sigs: [], + updatedCoins: [], + }; + + const contractTermsHash = this.hashString(canonicalJson(contractTerms)); + + const feeList: AmountJson[] = cds.map(x => x.denom.feeDeposit); + let fees = Amounts.add(Amounts.getZero(feeList[0].currency), ...feeList) + .amount; + // okay if saturates + fees = Amounts.sub(fees, Amounts.parseOrThrow(contractTerms.max_fee)) + .amount; + const total = Amounts.add(fees, totalAmount).amount; + + let amountSpent = Amounts.getZero(cds[0].coin.currentAmount.currency); + let amountRemaining = total; + + for (const cd of cds) { + const originalCoin = { ...cd.coin }; + + if (amountRemaining.value === 0 && amountRemaining.fraction === 0) { + break; + } + + let coinSpend: AmountJson; + if (Amounts.cmp(amountRemaining, cd.coin.currentAmount) < 0) { + coinSpend = amountRemaining; + } else { + coinSpend = cd.coin.currentAmount; + } + + amountSpent = Amounts.add(amountSpent, coinSpend).amount; + + const feeDeposit = cd.denom.feeDeposit; + + // Give the merchant at least the deposit fee, otherwise it'll reject + // the coin. + + if (Amounts.cmp(coinSpend, feeDeposit) < 0) { + coinSpend = feeDeposit; + } + + const newAmount = Amounts.sub(cd.coin.currentAmount, coinSpend).amount; + cd.coin.currentAmount = newAmount; + cd.coin.status = CoinStatus.Dirty; + + const d = buildSigPS(SignaturePurpose.WALLET_COIN_DEPOSIT) + .put(decodeCrock(contractTermsHash)) + .put(decodeCrock(contractTerms.H_wire)) + .put(talerTimestampStringToBuffer(contractTerms.timestamp)) + .put(talerTimestampStringToBuffer(contractTerms.refund_deadline)) + .put(amountToBuffer(coinSpend)) + .put(amountToBuffer(cd.denom.feeDeposit)) + .put(decodeCrock(contractTerms.merchant_pub)) + .put(decodeCrock(cd.coin.coinPub)) + .build(); + const coinSig = eddsaSign(d, decodeCrock(cd.coin.coinPriv)); + + const s: CoinPaySig = { + coin_pub: cd.coin.coinPub, + coin_sig: encodeCrock(coinSig), + contribution: Amounts.toString(coinSpend), + denom_pub: cd.coin.denomPub, + exchange_url: cd.denom.exchangeBaseUrl, + ub_sig: cd.coin.denomSig, + }; + ret.sigs.push(s); + ret.updatedCoins.push(cd.coin); + ret.originalCoins.push(originalCoin); + } + return ret; + } + + /** + * Create a new refresh session. + */ + createRefreshSession( + exchangeBaseUrl: string, + kappa: number, + meltCoin: CoinRecord, + newCoinDenoms: DenominationRecord[], + meltFee: AmountJson, + ): RefreshSessionRecord { + let valueWithFee = Amounts.getZero(newCoinDenoms[0].value.currency); + + for (const ncd of newCoinDenoms) { + valueWithFee = Amounts.add(valueWithFee, ncd.value, ncd.feeWithdraw) + .amount; + } + + // melt fee + valueWithFee = Amounts.add(valueWithFee, meltFee).amount; + + const sessionHc = createHashContext(); + + const transferPubs: string[] = []; + const transferPrivs: string[] = []; + + const planchetsForGammas: RefreshPlanchetRecord[][] = []; + + for (let i = 0; i < kappa; i++) { + const transferKeyPair = createEcdheKeyPair(); + sessionHc.update(transferKeyPair.ecdhePub); + transferPrivs.push(encodeCrock(transferKeyPair.ecdhePriv)); + transferPubs.push(encodeCrock(transferKeyPair.ecdhePub)); + } + + for (const denom of newCoinDenoms) { + const r = decodeCrock(denom.denomPub); + sessionHc.update(r); + } + + sessionHc.update(decodeCrock(meltCoin.coinPub)); + sessionHc.update(amountToBuffer(valueWithFee)); + + for (let i = 0; i < kappa; i++) { + const planchets: RefreshPlanchetRecord[] = []; + for (let j = 0; j < newCoinDenoms.length; j++) { + const transferPriv = decodeCrock(transferPrivs[i]); + const oldCoinPub = decodeCrock(meltCoin.coinPub); + const transferSecret = keyExchangeEcdheEddsa(transferPriv, oldCoinPub); + + const fresh = setupRefreshPlanchet(transferSecret, j); + + const coinPriv = fresh.coinPriv; + const coinPub = fresh.coinPub; + const blindingFactor = fresh.bks; + const pubHash = hash(coinPub); + const denomPub = decodeCrock(newCoinDenoms[j].denomPub); + const ev = rsaBlind(pubHash, blindingFactor, denomPub); + const planchet: RefreshPlanchetRecord = { + blindingKey: encodeCrock(blindingFactor), + coinEv: encodeCrock(ev), + privateKey: encodeCrock(coinPriv), + publicKey: encodeCrock(coinPub), + }; + planchets.push(planchet); + sessionHc.update(ev); + } + planchetsForGammas.push(planchets); + } + + const sessionHash = sessionHc.finish(); + + const confirmData = buildSigPS(SignaturePurpose.WALLET_COIN_MELT) + .put(sessionHash) + .put(amountToBuffer(valueWithFee)) + .put(amountToBuffer(meltFee)) + .put(decodeCrock(meltCoin.coinPub)) + .build(); + + const confirmSig = eddsaSign(confirmData, decodeCrock(meltCoin.coinPriv)); + + let valueOutput = Amounts.getZero(newCoinDenoms[0].value.currency); + for (const denom of newCoinDenoms) { + valueOutput = Amounts.add(valueOutput, denom.value).amount; + } + + const refreshSessionId = encodeCrock(getRandomBytes(32)); + + const refreshSession: RefreshSessionRecord = { + refreshSessionId, + confirmSig: encodeCrock(confirmSig), + exchangeBaseUrl, + hash: encodeCrock(sessionHash), + meltCoinPub: meltCoin.coinPub, + newDenomHashes: newCoinDenoms.map(d => d.denomPubHash), + newDenoms: newCoinDenoms.map(d => d.denomPub), + norevealIndex: undefined, + planchetsForGammas: planchetsForGammas, + transferPrivs, + transferPubs, + valueOutput, + valueWithFee, + created: getTimestampNow(), + retryInfo: initRetryInfo(), + finishedTimestamp: undefined, + lastError: undefined, + }; + + return refreshSession; + } + + /** + * Hash a string including the zero terminator. + */ + hashString(str: string): string { + const ts = new TextEncoder(); + const b = ts.encode(str + "\0"); + return encodeCrock(hash(b)); + } + + /** + * Hash a denomination public key. + */ + hashDenomPub(denomPub: string): string { + return encodeCrock(hash(decodeCrock(denomPub))); + } + + signCoinLink( + oldCoinPriv: string, + newDenomHash: string, + oldCoinPub: string, + transferPub: string, + coinEv: string, + ): string { + const coinEvHash = hash(decodeCrock(coinEv)); + const coinLink = buildSigPS(SignaturePurpose.WALLET_COIN_LINK) + .put(decodeCrock(newDenomHash)) + .put(decodeCrock(oldCoinPub)) + .put(decodeCrock(transferPub)) + .put(coinEvHash) + .build(); + const coinPriv = decodeCrock(oldCoinPriv); + const sig = eddsaSign(coinLink, coinPriv); + return encodeCrock(sig); + } + + benchmark(repetitions: number): BenchmarkResult { + let time_hash = 0; + for (let i = 0; i < repetitions; i++) { + const start = timer.performanceNow(); + this.hashString("hello world"); + time_hash += timer.performanceNow() - start; + } + + let time_hash_big = 0; + for (let i = 0; i < repetitions; i++) { + const ba = randomBytes(4096); + const start = timer.performanceNow(); + hash(ba); + time_hash_big += timer.performanceNow() - start; + } + + let time_eddsa_create = 0; + for (let i = 0; i < repetitions; i++) { + const start = timer.performanceNow(); + const pair = createEddsaKeyPair(); + time_eddsa_create += timer.performanceNow() - start; + } + + let time_eddsa_sign = 0; + const p = randomBytes(4096); + + const pair = createEddsaKeyPair(); + + for (let i = 0; i < repetitions; i++) { + const start = timer.performanceNow(); + eddsaSign(p, pair.eddsaPriv); + time_eddsa_sign += timer.performanceNow() - start; + } + + const sig = eddsaSign(p, pair.eddsaPriv); + + let time_eddsa_verify = 0; + for (let i = 0; i < repetitions; i++) { + const start = timer.performanceNow(); + eddsaVerify(p, sig, pair.eddsaPub); + time_eddsa_verify += timer.performanceNow() - start; + } + + return { + repetitions, + time: { + hash_small: time_hash, + hash_big: time_hash_big, + eddsa_create: time_eddsa_create, + eddsa_sign: time_eddsa_sign, + eddsa_verify: time_eddsa_verify, + }, + }; + } +} diff --git a/src/crypto/workers/cryptoWorker.ts b/src/crypto/workers/cryptoWorker.ts new file mode 100644 index 000000000..d4449f4a2 --- /dev/null +++ b/src/crypto/workers/cryptoWorker.ts @@ -0,0 +1,8 @@ +export interface CryptoWorker { + postMessage(message: any): void; + + terminate(): void; + + onmessage: ((m: any) => void) | undefined; + onerror: ((m: any) => void) | undefined; +} \ No newline at end of file diff --git a/src/crypto/workers/nodeThreadWorker.ts b/src/crypto/workers/nodeThreadWorker.ts new file mode 100644 index 000000000..b42031c40 --- /dev/null +++ b/src/crypto/workers/nodeThreadWorker.ts @@ -0,0 +1,175 @@ +import { CryptoWorkerFactory } from "./cryptoApi"; + +/* + This file is part of TALER + (C) 2016 GNUnet e.V. + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see + */ + +// tslint:disable:no-var-requires + +import { CryptoWorker } from "./cryptoWorker"; + +import worker_threads = require("worker_threads"); +import os = require("os"); +import { CryptoImplementation } from "./cryptoImplementation"; + +const f = __filename; + +const workerCode = ` + const worker_threads = require('worker_threads'); + const parentPort = worker_threads.parentPort; + let tw; + try { + tw = require("${f}"); + } catch (e) { + console.log("could not load from ${f}"); + } + if (!tw) { + try { + tw = require("taler-wallet-android"); + } catch (e) { + console.log("could not load taler-wallet-android either"); + throw e; + } + } + parentPort.on("message", tw.handleWorkerMessage); + parentPort.on("error", tw.handleWorkerError); +`; + +/** + * This function is executed in the worker thread to handle + * a message. + */ +export function handleWorkerMessage(msg: any) { + const args = msg.args; + if (!Array.isArray(args)) { + console.error("args must be array"); + return; + } + const id = msg.id; + if (typeof id !== "number") { + console.error("RPC id must be number"); + return; + } + const operation = msg.operation; + if (typeof operation !== "string") { + console.error("RPC operation must be string"); + return; + } + + const handleRequest = async () => { + const impl = new CryptoImplementation(); + + if (!(operation in impl)) { + console.error(`crypto operation '${operation}' not found`); + return; + } + + try { + const result = (impl as any)[operation](...args); + const p = worker_threads.parentPort; + worker_threads.parentPort?.postMessage; + if (p) { + p.postMessage({ data: { result, id } }); + } else { + console.error("parent port not available (not running in thread?"); + } + } catch (e) { + console.error("error during operation", e); + return; + } + }; + + handleRequest().catch(e => { + console.error("error in node worker", e); + }); +} + +export function handleWorkerError(e: Error) { + console.log("got error from worker", e); +} + +export class NodeThreadCryptoWorkerFactory implements CryptoWorkerFactory { + startWorker(): CryptoWorker { + if (typeof require === "undefined") { + throw Error("cannot make worker, require(...) not defined"); + } + return new NodeThreadCryptoWorker(); + } + + getConcurrency(): number { + return Math.max(1, os.cpus().length - 1); + } +} + +/** + * Worker implementation that uses node subprocesses. + */ +class NodeThreadCryptoWorker implements CryptoWorker { + /** + * Function to be called when we receive a message from the worker thread. + */ + onmessage: undefined | ((m: any) => void); + + /** + * Function to be called when we receive an error from the worker thread. + */ + onerror: undefined | ((m: any) => void); + + private nodeWorker: worker_threads.Worker; + + constructor() { + this.nodeWorker = new worker_threads.Worker(workerCode, { eval: true }); + this.nodeWorker.on("error", (err: Error) => { + console.error("error in node worker:", err); + if (this.onerror) { + this.onerror(err); + } + }); + this.nodeWorker.on("message", (v: any) => { + if (this.onmessage) { + this.onmessage(v); + } + }); + this.nodeWorker.unref(); + } + + /** + * Add an event listener for either an "error" or "message" event. + */ + addEventListener(event: "message" | "error", fn: (x: any) => void): void { + switch (event) { + case "message": + this.onmessage = fn; + break; + case "error": + this.onerror = fn; + break; + } + } + + /** + * Send a message to the worker thread. + */ + postMessage(msg: any) { + this.nodeWorker.postMessage(msg); + } + + /** + * Forcibly terminate the worker thread. + */ + terminate() { + this.nodeWorker.terminate(); + } +} diff --git a/src/crypto/workers/synchronousWorker.ts b/src/crypto/workers/synchronousWorker.ts new file mode 100644 index 000000000..12eecde9a --- /dev/null +++ b/src/crypto/workers/synchronousWorker.ts @@ -0,0 +1,135 @@ +/* + This file is part of GNU Taler + (C) 2019 GNUnet e.V. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see + */ + +import { CryptoImplementation } from "./cryptoImplementation"; + +import { CryptoWorkerFactory } from "./cryptoApi"; +import { CryptoWorker } from "./cryptoWorker"; + +/** + * The synchronous crypto worker produced by this factory doesn't run in the + * background, but actually blocks the caller until the operation is done. + */ +export class SynchronousCryptoWorkerFactory implements CryptoWorkerFactory { + startWorker(): CryptoWorker { + if (typeof require === "undefined") { + throw Error("cannot make worker, require(...) not defined"); + } + const workerCtor = require("./synchronousWorker").SynchronousCryptoWorker; + return new workerCtor(); + } + + getConcurrency(): number { + return 1; + } +} + + +/** + * Worker implementation that uses node subprocesses. + */ +export class SynchronousCryptoWorker { + + /** + * Function to be called when we receive a message from the worker thread. + */ + onmessage: undefined | ((m: any) => void); + + /** + * Function to be called when we receive an error from the worker thread. + */ + onerror: undefined | ((m: any) => void); + + constructor() { + this.onerror = undefined; + this.onmessage = undefined; + } + + /** + * Add an event listener for either an "error" or "message" event. + */ + addEventListener(event: "message" | "error", fn: (x: any) => void): void { + switch (event) { + case "message": + this.onmessage = fn; + break; + case "error": + this.onerror = fn; + break; + } + } + + private dispatchMessage(msg: any) { + if (this.onmessage) { + this.onmessage({ data: msg }); + } + } + + private async handleRequest(operation: string, id: number, args: string[]) { + const impl = new CryptoImplementation(); + + if (!(operation in impl)) { + console.error(`crypto operation '${operation}' not found`); + return; + } + + let result: any; + try { + result = (impl as any)[operation](...args); + } catch (e) { + console.log("error during operation", e); + return; + } + + try { + setImmediate(() => this.dispatchMessage({ result, id })); + } catch (e) { + console.log("got error during dispatch", e); + } + } + + /** + * Send a message to the worker thread. + */ + postMessage(msg: any) { + const args = msg.args; + if (!Array.isArray(args)) { + console.error("args must be array"); + return; + } + const id = msg.id; + if (typeof id !== "number") { + console.error("RPC id must be number"); + return; + } + const operation = msg.operation; + if (typeof operation !== "string") { + console.error("RPC operation must be string"); + return; + } + + this.handleRequest(operation, id, args).catch(e => { + console.error("Error while handling crypto request:", e); + }); + } + + /** + * Forcibly terminate the worker thread. + */ + terminate() { + // This is a no-op. + } +} diff --git a/src/dbTypes.ts b/src/dbTypes.ts index 66c4fa8b9..16edbf31a 100644 --- a/src/dbTypes.ts +++ b/src/dbTypes.ts @@ -36,7 +36,12 @@ import { } from "./talerTypes"; import { Index, Store } from "./util/query"; -import { Timestamp, OperationError } from "./walletTypes"; +import { + Timestamp, + OperationError, + Duration, + getTimestampNow, +} from "./walletTypes"; /** * Current database version, should be incremented @@ -83,6 +88,55 @@ export enum ReserveRecordStatus { DORMANT = "dormant", } +export interface RetryInfo { + firstTry: Timestamp; + nextRetry: Timestamp; + retryCounter: number; + active: boolean; +} + +export interface RetryPolicy { + readonly backoffDelta: Duration; + readonly backoffBase: number; +} + +const defaultRetryPolicy: RetryPolicy = { + backoffBase: 1.5, + backoffDelta: { d_ms: 200 }, +}; + +export function updateRetryInfoTimeout( + r: RetryInfo, + p: RetryPolicy = defaultRetryPolicy, +): void { + const now = getTimestampNow(); + const t = + now.t_ms + p.backoffDelta.d_ms * Math.pow(p.backoffBase, r.retryCounter); + r.nextRetry = { t_ms: t }; +} + +export function initRetryInfo( + active: boolean = true, + p: RetryPolicy = defaultRetryPolicy, +): RetryInfo { + if (!active) { + return { + active: false, + firstTry: { t_ms: Number.MAX_SAFE_INTEGER }, + nextRetry: { t_ms: Number.MAX_SAFE_INTEGER }, + retryCounter: 0, + }; + } + const info = { + firstTry: getTimestampNow(), + active: true, + nextRetry: { t_ms: 0 }, + retryCounter: 0, + }; + updateRetryInfoTimeout(info, p); + return info; +} + /** * A reserve record as stored in the wallet's database. */ @@ -176,9 +230,20 @@ export interface ReserveRecord { /** * Time of the last successful status query. */ - lastStatusQuery: Timestamp | undefined; + lastSuccessfulStatusQuery: Timestamp | undefined; - lastError?: OperationError; + /** + * Retry info. This field is present even if no retry is scheduled, + * because we need it to be present for the index on the object store + * to work. + */ + retryInfo: RetryInfo; + + /** + * Last error that happened in a reserve operation + * (either talking to the bank or the exchange). + */ + lastError: OperationError | undefined; } /** @@ -682,17 +747,26 @@ export class ProposalRecord { @Checkable.Optional(Checkable.String()) downloadSessionId?: string; + /** + * Retry info, even present when the operation isn't active to allow indexing + * on the next retry timestamp. + */ + retryInfo: RetryInfo; + /** * Verify that a value matches the schema of this class and convert it into a * member. */ static checked: (obj: any) => ProposalRecord; + + lastError: OperationError | undefined; } /** * Status of a tip we got from a merchant. */ export interface TipRecord { + lastError: OperationError | undefined; /** * Has the user accepted the tip? Only after the tip has been accepted coins * withdrawn from the tip may be used. @@ -753,13 +827,21 @@ export interface TipRecord { */ nextUrl?: string; - timestamp: Timestamp; + createdTimestamp: Timestamp; + + /** + * Retry info, even present when the operation isn't active to allow indexing + * on the next retry timestamp. + */ + retryInfo: RetryInfo; } /** * Ongoing refresh */ export interface RefreshSessionRecord { + lastError: OperationError | undefined; + /** * Public key that's being melted in this session. */ @@ -823,14 +905,25 @@ export interface RefreshSessionRecord { exchangeBaseUrl: string; /** - * Is this session finished? + * Timestamp when the refresh session finished. */ - finished: boolean; + finishedTimestamp: Timestamp | undefined; /** * A 32-byte base32-crockford encoded random identifier. */ refreshSessionId: string; + + /** + * When has this refresh session been created? + */ + created: Timestamp; + + /** + * Retry info, even present when the operation isn't active to allow indexing + * on the next retry timestamp. + */ + retryInfo: RetryInfo; } /** @@ -877,11 +970,35 @@ export interface WireFee { sig: string; } +export enum PurchaseStatus { + /** + * We're currently paying, either for the first + * time or as a re-play potentially with a different + * session ID. + */ + SubmitPay = "submit-pay", + QueryRefund = "query-refund", + ProcessRefund = "process-refund", + Abort = "abort", + Done = "done", +} + /** * Record that stores status information about one purchase, starting from when * the customer accepts a proposal. Includes refund status if applicable. */ export interface PurchaseRecord { + /** + * Proposal ID for this purchase. Uniquely identifies the + * purchase and the proposal. + */ + proposalId: string; + + /** + * Status of this purchase. + */ + status: PurchaseStatus; + /** * Hash of the contract terms. */ @@ -923,13 +1040,13 @@ export interface PurchaseRecord { * When was the purchase made? * Refers to the time that the user accepted. */ - timestamp: Timestamp; + acceptTimestamp: Timestamp; /** * When was the last refund made? * Set to 0 if no refund was made on the purchase. */ - timestamp_refund: Timestamp | undefined; + lastRefundTimestamp: Timestamp | undefined; /** * Last session signature that we submitted to /pay (if any). @@ -946,11 +1063,9 @@ export interface PurchaseRecord { */ abortDone: boolean; - /** - * Proposal ID for this purchase. Uniquely identifies the - * purchase and the proposal. - */ - proposalId: string; + retryInfo: RetryInfo; + + lastError: OperationError | undefined; } /** @@ -1025,7 +1140,7 @@ export interface WithdrawalSourceReserve { reservePub: string; } -export type WithdrawalSource = WithdrawalSourceTip | WithdrawalSourceReserve +export type WithdrawalSource = WithdrawalSourceTip | WithdrawalSourceReserve; export interface WithdrawalSessionRecord { withdrawSessionId: string; @@ -1048,7 +1163,8 @@ export interface WithdrawalSessionRecord { totalCoinValue: AmountJson; /** - * Amount including fees. + * Amount including fees (i.e. the amount subtracted from the + * reserve to withdraw all coins in this withdrawal session). */ rawWithdrawalAmount: AmountJson; @@ -1060,6 +1176,19 @@ export interface WithdrawalSessionRecord { * Coins in this session that are withdrawn are set to true. */ withdrawn: boolean[]; + + /** + * Retry info, always present even on completed operations so that indexing works. + */ + retryInfo: RetryInfo; + + /** + * Last error per coin/planchet, or undefined if no error occured for + * the coin/planchet. + */ + lastCoinErrors: (OperationError | undefined)[]; + + lastError: OperationError | undefined; } export interface BankWithdrawUriRecord { @@ -1125,11 +1254,10 @@ export namespace Stores { "fulfillmentUrlIndex", "contractTerms.fulfillment_url", ); - orderIdIndex = new Index( - this, - "orderIdIndex", + orderIdIndex = new Index(this, "orderIdIndex", [ + "contractTerms.merchant_base_url", "contractTerms.order_id", - ); + ]); } class DenominationsStore extends Store { diff --git a/src/headless/helpers.ts b/src/headless/helpers.ts index e5338369e..cfc7e3695 100644 --- a/src/headless/helpers.ts +++ b/src/headless/helpers.ts @@ -21,35 +21,22 @@ /** * Imports. */ -import { Wallet, OperationFailedAndReportedError } from "../wallet"; -import { Notifier, Badge } from "../walletTypes"; +import { Wallet } from "../wallet"; import { MemoryBackend, BridgeIDBFactory, shimIndexedDB } from "idb-bridge"; -import { SynchronousCryptoWorkerFactory } from "../crypto/synchronousWorker"; import { openTalerDb } from "../db"; import Axios from "axios"; -import querystring = require("querystring"); import { HttpRequestLibrary } from "../util/http"; import * as amounts from "../util/amounts"; import { Bank } from "./bank"; import fs = require("fs"); -import { NodeCryptoWorkerFactory } from "../crypto/nodeProcessWorker"; import { Logger } from "../util/logging"; +import { NodeThreadCryptoWorkerFactory } from "../crypto/workers/nodeThreadWorker"; +import { NotificationType } from "../walletTypes"; const logger = new Logger("helpers.ts"); -class ConsoleBadge implements Badge { - startBusy(): void { - } - stopBusy(): void { - } - showNotification(): void { - } - clearNotification(): void { - } -} - export class NodeHttpLib implements HttpRequestLibrary { async get(url: string): Promise { try { @@ -97,7 +84,6 @@ export interface DefaultNodeWalletArgs { */ persistentStoragePath?: string; - /** * Handler for asynchronous notifications from the wallet. */ @@ -116,15 +102,7 @@ export interface DefaultNodeWalletArgs { export async function getDefaultNodeWallet( args: DefaultNodeWalletArgs = {}, ): Promise { - const myNotifier: Notifier = { - notify() { - if (args.notifyHandler) { - args.notifyHandler(""); - } - } - } - const myBadge = new ConsoleBadge(); BridgeIDBFactory.enableTracing = false; const myBackend = new MemoryBackend(); @@ -180,14 +158,14 @@ export async function getDefaultNodeWallet( myUnsupportedUpgrade, ); - const worker = new SynchronousCryptoWorkerFactory(); + //const worker = new SynchronousCryptoWorkerFactory(); //const worker = new NodeCryptoWorkerFactory(); + const worker = new NodeThreadCryptoWorkerFactory(); + return new Wallet( myDb, myHttpLib, - myBadge, - myNotifier, worker, ); } @@ -217,6 +195,14 @@ export async function withdrawTestBalance( ["x-taler-bank"], ); + const donePromise = new Promise((resolve, reject) => { + myWallet.addNotificationListener((n) => { + if (n.type === NotificationType.ReserveDepleted && n.reservePub === reservePub ) { + resolve(); + } + }); + }); + await bank.createReserve( bankUser, amount, @@ -225,5 +211,5 @@ export async function withdrawTestBalance( ); await myWallet.confirmReserve({ reservePub: reserveResponse.reservePub }); - await myWallet.runUntilReserveDepleted(reservePub); + await donePromise; } diff --git a/src/headless/integrationtest.ts b/src/headless/integrationtest.ts index 91adfaa6d..632ce8f60 100644 --- a/src/headless/integrationtest.ts +++ b/src/headless/integrationtest.ts @@ -82,9 +82,5 @@ export async function runIntegrationTest(args: { throw Error("payment did not succeed"); } - await myWallet.runPending(); - //const refreshRes = await myWallet.refreshDirtyCoins(); - //console.log(`waited to refresh ${refreshRes.numRefreshed} coins`); - - myWallet.stop(); + await myWallet.runUntilDone(); } diff --git a/src/headless/taler-wallet-cli.ts b/src/headless/taler-wallet-cli.ts index 9598b9d98..931cac087 100644 --- a/src/headless/taler-wallet-cli.ts +++ b/src/headless/taler-wallet-cli.ts @@ -19,14 +19,14 @@ import fs = require("fs"); import { getDefaultNodeWallet, withdrawTestBalance } from "./helpers"; import { MerchantBackendConnection } from "./merchant"; import { runIntegrationTest } from "./integrationtest"; -import { Wallet, OperationFailedAndReportedError } from "../wallet"; +import { Wallet } from "../wallet"; import qrcodeGenerator = require("qrcode-generator"); import * as clk from "./clk"; import { BridgeIDBFactory, MemoryBackend } from "idb-bridge"; import { Logger } from "../util/logging"; import * as Amounts from "../util/amounts"; import { decodeCrock } from "../crypto/talerCrypto"; -import { Bank } from "./bank"; +import { OperationFailedAndReportedError } from "../wallet-impl/errors"; const logger = new Logger("taler-wallet-cli.ts"); diff --git a/src/util/asyncMemo.ts b/src/util/asyncMemo.ts index 34868ab4f..193ce6df6 100644 --- a/src/util/asyncMemo.ts +++ b/src/util/asyncMemo.ts @@ -14,39 +14,76 @@ GNU Taler; see the file COPYING. If not, see */ -export interface MemoEntry { +interface MemoEntry { p: Promise; t: number; n: number; } -export class AsyncOpMemo { +export class AsyncOpMemoMap { private n = 0; - private memo: { [k: string]: MemoEntry } = {}; - put(key: string, p: Promise): Promise { + private memoMap: { [k: string]: MemoEntry } = {}; + + private cleanUp(key: string, n: number) { + const r = this.memoMap[key]; + if (r && r.n === n) { + delete this.memoMap[key]; + } + } + + memo(key: string, pg: () => Promise): Promise { + const res = this.memoMap[key]; + if (res) { + return res.p; + } const n = this.n++; - this.memo[key] = { + // Wrap the operation in case it immediately throws + const p = Promise.resolve().then(() => pg()); + p.finally(() => { + this.cleanUp(key, n); + }); + this.memoMap[key] = { p, n, t: new Date().getTime(), }; - p.finally(() => { - const r = this.memo[key]; - if (r && r.n === n) { - delete this.memo[key]; - } - }); return p; } - find(key: string): Promise | undefined { - const res = this.memo[key]; - const tNow = new Date().getTime(); - if (res && res.t < tNow - 10 * 1000) { - delete this.memo[key]; - return; - } else if (res) { + clear() { + this.memoMap = {}; + } +} + + +export class AsyncOpMemoSingle { + private n = 0; + private memoEntry: MemoEntry | undefined; + + private cleanUp(n: number) { + if (this.memoEntry && this.memoEntry.n === n) { + this.memoEntry = undefined; + } + } + + memo(pg: () => Promise): Promise { + const res = this.memoEntry; + if (res) { return res.p; } - return; + const n = this.n++; + // Wrap the operation in case it immediately throws + const p = Promise.resolve().then(() => pg()); + p.finally(() => { + this.cleanUp(n); + }); + this.memoEntry = { + p, + n, + t: new Date().getTime(), + }; + return p; + } + clear() { + this.memoEntry = undefined; } -} \ No newline at end of file +} diff --git a/src/util/query.ts b/src/util/query.ts index b1b19665b..e05656bb7 100644 --- a/src/util/query.ts +++ b/src/util/query.ts @@ -316,7 +316,7 @@ export function oneShotIterIndex( return new ResultStream(req); } -class TransactionHandle { +export class TransactionHandle { constructor(private tx: IDBTransaction) {} put(store: Store, value: T, key?: any): Promise { @@ -406,6 +406,7 @@ function runWithTransaction( }; tx.onerror = () => { console.error("error in transaction"); + console.error(stack); }; tx.onabort = () => { if (tx.error) { diff --git a/src/util/taleruri-test.ts b/src/util/taleruri-test.ts index 02eecf209..c687a6717 100644 --- a/src/util/taleruri-test.ts +++ b/src/util/taleruri-test.ts @@ -169,10 +169,8 @@ test("taler refund uri parsing", t => { t.fail(); return; } - t.is( - r1.refundUrl, - "https://merchant.example.com/public/refund?order_id=1234", - ); + t.is(r1.merchantBaseUrl, "https://merchant.example.com/public/"); + t.is(r1.orderId, "1234"); }); test("taler refund uri parsing with instance", t => { @@ -182,10 +180,8 @@ test("taler refund uri parsing with instance", t => { t.fail(); return; } - t.is( - r1.refundUrl, - "https://merchant.example.com/public/instances/myinst/refund?order_id=1234", - ); + t.is(r1.orderId, "1234"); + t.is(r1.merchantBaseUrl, "https://merchant.example.com/public/instances/myinst/"); }); test("taler tip pickup uri", t => { @@ -197,7 +193,7 @@ test("taler tip pickup uri", t => { } t.is( r1.merchantBaseUrl, - "https://merchant.example.com/public/tip-pickup?tip_id=tipid", + "https://merchant.example.com/public/", ); }); diff --git a/src/util/taleruri.ts b/src/util/taleruri.ts index aa6705c07..50886a916 100644 --- a/src/util/taleruri.ts +++ b/src/util/taleruri.ts @@ -24,7 +24,8 @@ export interface WithdrawUriResult { } export interface RefundUriResult { - refundUrl: string; + merchantBaseUrl: string; + orderId: string; } export interface TipUriResult { @@ -184,17 +185,13 @@ export function parseRefundUri(s: string): RefundUriResult | undefined { maybeInstancePath = `instances/${maybeInstance}/`; } - const refundUrl = - "https://" + - host + - "/" + - maybePath + - maybeInstancePath + - "refund" + - "?order_id=" + - orderId; + const merchantBaseUrl = "https://" + host + + "/" + + maybePath + + maybeInstancePath return { - refundUrl, + merchantBaseUrl, + orderId, }; } diff --git a/src/wallet-impl/balance.ts b/src/wallet-impl/balance.ts index 94d65fa96..a1351014c 100644 --- a/src/wallet-impl/balance.ts +++ b/src/wallet-impl/balance.ts @@ -33,6 +33,7 @@ const logger = new Logger("withdraw.ts"); export async function getBalances( ws: InternalWalletState, ): Promise { + logger.trace("starting to compute balance"); /** * Add amount to a balance field, both for * the slicing by exchange and currency. @@ -101,7 +102,7 @@ export async function getBalances( await tx.iter(Stores.refresh).forEach(r => { // Don't count finished refreshes, since the refresh already resulted // in coins being added to the wallet. - if (r.finished) { + if (r.finishedTimestamp) { return; } addTo( diff --git a/src/wallet-impl/errors.ts b/src/wallet-impl/errors.ts new file mode 100644 index 000000000..5df99b7d3 --- /dev/null +++ b/src/wallet-impl/errors.ts @@ -0,0 +1,81 @@ +import { OperationError } from "../walletTypes"; + +/* + This file is part of GNU Taler + (C) 2019 GNUnet e.V. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see + */ + +/** + * This exception is there to let the caller know that an error happened, + * but the error has already been reported by writing it to the database. + */ +export class OperationFailedAndReportedError extends Error { + constructor(message: string) { + super(message); + + // Set the prototype explicitly. + Object.setPrototypeOf(this, OperationFailedAndReportedError.prototype); + } +} + +/** + * This exception is thrown when an error occured and the caller is + * responsible for recording the failure in the database. + */ +export class OperationFailedError extends Error { + constructor(message: string, public err: OperationError) { + super(message); + + // Set the prototype explicitly. + Object.setPrototypeOf(this, OperationFailedError.prototype); + } +} + +/** + * Run an operation and call the onOpError callback + * when there was an exception or operation error that must be reported. + * The cause will be re-thrown to the caller. + */ +export async function guardOperationException( + op: () => Promise, + onOpError: (e: OperationError) => Promise, +): Promise { + try { + return op(); + } catch (e) { + if (e instanceof OperationFailedAndReportedError) { + throw e; + } + if (e instanceof OperationFailedError) { + await onOpError(e.err); + throw new OperationFailedAndReportedError(e.message); + } + if (e instanceof Error) { + await onOpError({ + type: "exception", + message: e.message, + details: {}, + }); + throw new OperationFailedAndReportedError(e.message); + } + await onOpError({ + type: "exception", + message: "non-error exception thrown", + details: { + value: e.toString(), + }, + }); + throw new OperationFailedAndReportedError(e.message); + } +} \ No newline at end of file diff --git a/src/wallet-impl/exchanges.ts b/src/wallet-impl/exchanges.ts index b3677c6c6..b89f3f84e 100644 --- a/src/wallet-impl/exchanges.ts +++ b/src/wallet-impl/exchanges.ts @@ -17,7 +17,6 @@ import { InternalWalletState } from "./state"; import { WALLET_CACHE_BREAKER_CLIENT_VERSION, - OperationFailedAndReportedError, } from "../wallet"; import { KeysJson, Denomination, ExchangeWireJson } from "../talerTypes"; import { getTimestampNow, OperationError } from "../walletTypes"; @@ -42,6 +41,7 @@ import { } from "../util/query"; import * as Amounts from "../util/amounts"; import { parsePaytoUri } from "../util/payto"; +import { OperationFailedAndReportedError } from "./errors"; async function denominationRecordFromKeys( ws: InternalWalletState, diff --git a/src/wallet-impl/history.ts b/src/wallet-impl/history.ts index dfc683e6d..5e93ab878 100644 --- a/src/wallet-impl/history.ts +++ b/src/wallet-impl/history.ts @@ -78,11 +78,11 @@ export async function getHistory( fulfillmentUrl: p.contractTerms.fulfillment_url, merchantName: p.contractTerms.merchant.name, }, - timestamp: p.timestamp, + timestamp: p.acceptTimestamp, type: "pay", explicit: false, }); - if (p.timestamp_refund) { + if (p.lastRefundTimestamp) { const contractAmount = Amounts.parseOrThrow(p.contractTerms.amount); const amountsPending = Object.keys(p.refundsPending).map(x => Amounts.parseOrThrow(p.refundsPending[x].refund_amount), @@ -103,7 +103,7 @@ export async function getHistory( merchantName: p.contractTerms.merchant.name, refundAmount: amount, }, - timestamp: p.timestamp_refund, + timestamp: p.lastRefundTimestamp, type: "refund", explicit: false, }); @@ -151,7 +151,7 @@ export async function getHistory( merchantBaseUrl: tip.merchantBaseUrl, tipId: tip.merchantTipId, }, - timestamp: tip.timestamp, + timestamp: tip.createdTimestamp, explicit: false, type: "tip", }); diff --git a/src/wallet-impl/pay.ts b/src/wallet-impl/pay.ts index 9942139a6..9b2da9c7d 100644 --- a/src/wallet-impl/pay.ts +++ b/src/wallet-impl/pay.ts @@ -33,6 +33,8 @@ import { getTimestampNow, PreparePayResult, ConfirmPayResult, + OperationError, + NotificationType, } from "../walletTypes"; import { oneShotIter, @@ -51,12 +53,14 @@ import { PurchaseRecord, CoinRecord, ProposalStatus, + initRetryInfo, + updateRetryInfoTimeout, + PurchaseStatus, } from "../dbTypes"; import * as Amounts from "../util/amounts"; import { amountToPretty, strcmp, - extractTalerStamp, canonicalJson, extractTalerStampOrThrow, } from "../util/helpers"; @@ -65,6 +69,8 @@ import { InternalWalletState } from "./state"; import { parsePayUri, parseRefundUri } from "../util/taleruri"; import { getTotalRefreshCost, refresh } from "./refresh"; import { encodeCrock, getRandomBytes } from "../crypto/talerCrypto"; +import { guardOperationException } from "./errors"; +import { assertUnreachable } from "../util/assertUnreachable"; export interface SpeculativePayData { payCoinInfo: PayCoinInfo; @@ -344,9 +350,12 @@ async function recordConfirmPay( payReq, refundsDone: {}, refundsPending: {}, - timestamp: getTimestampNow(), - timestamp_refund: undefined, + acceptTimestamp: getTimestampNow(), + lastRefundTimestamp: undefined, proposalId: proposal.proposalId, + retryInfo: initRetryInfo(), + lastError: undefined, + status: PurchaseStatus.SubmitPay, }; await runWithWriteTransaction( @@ -365,8 +374,10 @@ async function recordConfirmPay( }, ); - ws.badge.showNotification(); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.ProposalAccepted, + proposalId: proposal.proposalId, + }); return t; } @@ -419,7 +430,7 @@ export async function abortFailedPayment( } const refundResponse = MerchantRefundResponse.checked(resp.responseJson); - await acceptRefundResponse(ws, refundResponse); + await acceptRefundResponse(ws, purchase.proposalId, refundResponse); await runWithWriteTransaction(ws.db, [Stores.purchases], async tx => { const p = await tx.get(Stores.purchases, proposalId); @@ -431,9 +442,61 @@ export async function abortFailedPayment( }); } +async function incrementProposalRetry( + ws: InternalWalletState, + proposalId: string, + err: OperationError | undefined, +): Promise { + await runWithWriteTransaction(ws.db, [Stores.proposals], async tx => { + const pr = await tx.get(Stores.proposals, proposalId); + if (!pr) { + return; + } + if (!pr.retryInfo) { + return; + } + pr.retryInfo.retryCounter++; + updateRetryInfoTimeout(pr.retryInfo); + pr.lastError = err; + await tx.put(Stores.proposals, pr); + }); +} + +async function incrementPurchaseRetry( + ws: InternalWalletState, + proposalId: string, + err: OperationError | undefined, +): Promise { + await runWithWriteTransaction(ws.db, [Stores.purchases], async tx => { + const pr = await tx.get(Stores.purchases, proposalId); + if (!pr) { + return; + } + if (!pr.retryInfo) { + return; + } + pr.retryInfo.retryCounter++; + updateRetryInfoTimeout(pr.retryInfo); + pr.lastError = err; + await tx.put(Stores.purchases, pr); + }); +} + export async function processDownloadProposal( ws: InternalWalletState, proposalId: string, +): Promise { + const onOpErr = (err: OperationError) => + incrementProposalRetry(ws, proposalId, err); + await guardOperationException( + () => processDownloadProposalImpl(ws, proposalId), + onOpErr, + ); +} + +async function processDownloadProposalImpl( + ws: InternalWalletState, + proposalId: string, ): Promise { const proposal = await oneShotGet(ws.db, Stores.proposals, proposalId); if (!proposal) { @@ -498,7 +561,10 @@ export async function processDownloadProposal( }, ); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.ProposalDownloaded, + proposalId: proposal.proposalId, + }); } /** @@ -536,6 +602,8 @@ async function startDownloadProposal( proposalId: proposalId, proposalStatus: ProposalStatus.DOWNLOADING, repurchaseProposalId: undefined, + retryInfo: initRetryInfo(), + lastError: undefined, }; await oneShotPut(ws.db, Stores.proposals, proposalRecord); @@ -582,6 +650,7 @@ export async function submitPay( throw Error("merchant payment signature invalid"); } purchase.finished = true; + purchase.retryInfo = initRetryInfo(false); const modifiedCoins: CoinRecord[] = []; for (const pc of purchase.payReq.coins) { const c = await oneShotGet(ws.db, Stores.coins, pc.coin_pub); @@ -859,8 +928,6 @@ export async function confirmPay( return submitPay(ws, proposalId, sessionId); } - - export async function getFullRefundFees( ws: InternalWalletState, refundPermissions: MerchantRefundPermission[], @@ -914,15 +981,13 @@ export async function getFullRefundFees( return feeAcc; } -async function submitRefunds( +async function submitRefundsToExchange( ws: InternalWalletState, proposalId: string, ): Promise { const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId); if (!purchase) { - console.error( - "not submitting refunds, payment not found:", - ); + console.error("not submitting refunds, payment not found:"); return; } const pendingKeys = Object.keys(purchase.refundsPending); @@ -991,14 +1056,18 @@ async function submitRefunds( refresh(ws, perm.coin_pub); } - ws.badge.showNotification(); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.RefundsSubmitted, + proposalId, + }); } -export async function acceptRefundResponse( + +async function acceptRefundResponse( ws: InternalWalletState, + proposalId: string, refundResponse: MerchantRefundResponse, -): Promise { +): Promise { const refundPermissions = refundResponse.refund_permissions; if (!refundPermissions.length) { @@ -1015,7 +1084,8 @@ export async function acceptRefundResponse( return; } - t.timestamp_refund = getTimestampNow(); + t.lastRefundTimestamp = getTimestampNow(); + t.status = PurchaseStatus.ProcessRefund; for (const perm of refundPermissions) { if ( @@ -1027,18 +1097,48 @@ export async function acceptRefundResponse( } return t; } + // Add the refund permissions to the purchase within a DB transaction + await oneShotMutate(ws.db, Stores.purchases, proposalId, f); + await submitRefundsToExchange(ws, proposalId); +} - const hc = refundResponse.h_contract_terms; - // Add the refund permissions to the purchase within a DB transaction - await oneShotMutate(ws.db, Stores.purchases, hc, f); - ws.notifier.notify(); +async function queryRefund(ws: InternalWalletState, proposalId: string): Promise { + const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId); + if (purchase?.status !== PurchaseStatus.QueryRefund) { + return; + } - await submitRefunds(ws, hc); + const refundUrl = new URL("refund", purchase.contractTerms.merchant_base_url).href + let resp; + try { + resp = await ws.http.get(refundUrl); + } catch (e) { + console.error("error downloading refund permission", e); + throw e; + } - return hc; + const refundResponse = MerchantRefundResponse.checked(resp.responseJson); + await acceptRefundResponse(ws, proposalId, refundResponse); } +async function startRefundQuery(ws: InternalWalletState, proposalId: string): Promise { + const success = await runWithWriteTransaction(ws.db, [Stores.purchases], async (tx) => { + const p = await tx.get(Stores.purchases, proposalId); + if (p?.status !== PurchaseStatus.Done) { + return false; + } + p.status = PurchaseStatus.QueryRefund; + return true; + }); + + if (!success) { + return; + } + await queryRefund(ws, proposalId); +} + + /** * Accept a refund, return the contract hash for the contract * that was involved in the refund. @@ -1053,17 +1153,56 @@ export async function applyRefund( throw Error("invalid refund URI"); } - const refundUrl = parseResult.refundUrl; + const purchase = await oneShotGetIndexed( + ws.db, + Stores.purchases.orderIdIndex, + [parseResult.merchantBaseUrl, parseResult.orderId], + ); - logger.trace("processing refund"); - let resp; - try { - resp = await ws.http.get(refundUrl); - } catch (e) { - console.error("error downloading refund permission", e); - throw e; + if (!purchase) { + throw Error("no purchase for the taler://refund/ URI was found"); } - const refundResponse = MerchantRefundResponse.checked(resp.responseJson); - return acceptRefundResponse(ws, refundResponse); + await startRefundQuery(ws, purchase.proposalId); + + return purchase.contractTermsHash; +} + +export async function processPurchase( + ws: InternalWalletState, + proposalId: string, +): Promise { + const onOpErr = (e: OperationError) => + incrementPurchaseRetry(ws, proposalId, e); + await guardOperationException( + () => processPurchaseImpl(ws, proposalId), + onOpErr, + ); +} + +export async function processPurchaseImpl( + ws: InternalWalletState, + proposalId: string, +): Promise { + const purchase = await oneShotGet(ws.db, Stores.purchases, proposalId); + if (!purchase) { + return; + } + switch (purchase.status) { + case PurchaseStatus.Done: + return; + case PurchaseStatus.Abort: + // FIXME + break; + case PurchaseStatus.SubmitPay: + break; + case PurchaseStatus.QueryRefund: + await queryRefund(ws, proposalId); + break; + case PurchaseStatus.ProcessRefund: + await submitRefundsToExchange(ws, proposalId); + break; + default: + throw assertUnreachable(purchase.status); + } } diff --git a/src/wallet-impl/payback.ts b/src/wallet-impl/payback.ts index 5bf5ff06e..56696d771 100644 --- a/src/wallet-impl/payback.ts +++ b/src/wallet-impl/payback.ts @@ -29,6 +29,7 @@ import { Stores, TipRecord, CoinStatus } from "../dbTypes"; import { Logger } from "../util/logging"; import { PaybackConfirmation } from "../talerTypes"; import { updateExchangeFromUrl } from "./exchanges"; +import { NotificationType } from "../walletTypes"; const logger = new Logger("payback.ts"); @@ -65,7 +66,9 @@ export async function payback( await tx.put(Stores.reserves, reserve); }, ); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.PaybackStarted, + }); const paybackRequest = await ws.cryptoApi.createPaybackRequest(coin); const reqUrl = new URL("payback", coin.exchangeBaseUrl); @@ -83,6 +86,8 @@ export async function payback( } coin.status = CoinStatus.Dormant; await oneShotPut(ws.db, Stores.coins, coin); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.PaybackFinished, + }); await updateExchangeFromUrl(ws, coin.exchangeBaseUrl, true); } diff --git a/src/wallet-impl/pending.ts b/src/wallet-impl/pending.ts index 72102e3a1..bd10538af 100644 --- a/src/wallet-impl/pending.ts +++ b/src/wallet-impl/pending.ts @@ -21,8 +21,10 @@ import { PendingOperationInfo, PendingOperationsResponse, getTimestampNow, + Timestamp, + Duration, } from "../walletTypes"; -import { runWithReadTransaction } from "../util/query"; +import { runWithReadTransaction, TransactionHandle } from "../util/query"; import { InternalWalletState } from "./state"; import { Stores, @@ -32,11 +34,355 @@ import { ProposalStatus, } from "../dbTypes"; +function updateRetryDelay( + oldDelay: Duration, + now: Timestamp, + retryTimestamp: Timestamp, +): Duration { + if (retryTimestamp.t_ms <= now.t_ms) { + return { d_ms: 0 }; + } + return { d_ms: Math.min(oldDelay.d_ms, retryTimestamp.t_ms - now.t_ms) }; +} + +async function gatherExchangePending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise { + if (onlyDue) { + // FIXME: exchanges should also be updated regularly + return; + } + await tx.iter(Stores.exchanges).forEach(e => { + switch (e.updateStatus) { + case ExchangeUpdateStatus.FINISHED: + if (e.lastError) { + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: + "Exchange record is in FINISHED state but has lastError set", + details: { + exchangeBaseUrl: e.baseUrl, + }, + }); + } + if (!e.details) { + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: + "Exchange record does not have details, but no update in progress.", + details: { + exchangeBaseUrl: e.baseUrl, + }, + }); + } + if (!e.wireInfo) { + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: + "Exchange record does not have wire info, but no update in progress.", + details: { + exchangeBaseUrl: e.baseUrl, + }, + }); + } + break; + case ExchangeUpdateStatus.FETCH_KEYS: + resp.pendingOperations.push({ + type: "exchange-update", + givesLifeness: false, + stage: "fetch-keys", + exchangeBaseUrl: e.baseUrl, + lastError: e.lastError, + reason: e.updateReason || "unknown", + }); + break; + case ExchangeUpdateStatus.FETCH_WIRE: + resp.pendingOperations.push({ + type: "exchange-update", + givesLifeness: false, + stage: "fetch-wire", + exchangeBaseUrl: e.baseUrl, + lastError: e.lastError, + reason: e.updateReason || "unknown", + }); + break; + default: + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: "Unknown exchangeUpdateStatus", + details: { + exchangeBaseUrl: e.baseUrl, + exchangeUpdateStatus: e.updateStatus, + }, + }); + break; + } + }); +} + +async function gatherReservePending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise { + // FIXME: this should be optimized by using an index for "onlyDue==true". + await tx.iter(Stores.reserves).forEach(reserve => { + const reserveType = reserve.bankWithdrawStatusUrl ? "taler-bank" : "manual"; + if (!reserve.retryInfo.active) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + reserve.retryInfo.nextRetry, + ); + if (onlyDue && reserve.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + switch (reserve.reserveStatus) { + case ReserveRecordStatus.DORMANT: + // nothing to report as pending + break; + case ReserveRecordStatus.WITHDRAWING: + case ReserveRecordStatus.UNCONFIRMED: + case ReserveRecordStatus.QUERYING_STATUS: + case ReserveRecordStatus.REGISTERING_BANK: + resp.pendingOperations.push({ + type: "reserve", + givesLifeness: true, + stage: reserve.reserveStatus, + timestampCreated: reserve.created, + reserveType, + reservePub: reserve.reservePub, + retryInfo: reserve.retryInfo, + }); + break; + case ReserveRecordStatus.WAIT_CONFIRM_BANK: + resp.pendingOperations.push({ + type: "reserve", + givesLifeness: true, + stage: reserve.reserveStatus, + timestampCreated: reserve.created, + reserveType, + reservePub: reserve.reservePub, + bankWithdrawConfirmUrl: reserve.bankWithdrawConfirmUrl, + retryInfo: reserve.retryInfo, + }); + break; + default: + resp.pendingOperations.push({ + type: "bug", + givesLifeness: false, + message: "Unknown reserve record status", + details: { + reservePub: reserve.reservePub, + reserveStatus: reserve.reserveStatus, + }, + }); + break; + } + }); +} + +async function gatherRefreshPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise { + await tx.iter(Stores.refresh).forEach(r => { + if (r.finishedTimestamp) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + r.retryInfo.nextRetry, + ); + if (onlyDue && r.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + let refreshStatus: string; + if (r.norevealIndex === undefined) { + refreshStatus = "melt"; + } else { + refreshStatus = "reveal"; + } + + resp.pendingOperations.push({ + type: "refresh", + givesLifeness: true, + oldCoinPub: r.meltCoinPub, + refreshStatus, + refreshOutputSize: r.newDenoms.length, + refreshSessionId: r.refreshSessionId, + }); + }); +} + +async function gatherCoinsPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise { + // Refreshing dirty coins is always due. + await tx.iter(Stores.coins).forEach(coin => { + if (coin.status == CoinStatus.Dirty) { + resp.nextRetryDelay.d_ms = 0; + resp.pendingOperations.push({ + givesLifeness: true, + type: "dirty-coin", + coinPub: coin.coinPub, + }); + } + }); +} + +async function gatherWithdrawalPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise { + await tx.iter(Stores.withdrawalSession).forEach(wsr => { + if (wsr.finishTimestamp) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + wsr.retryInfo.nextRetry, + ); + if (onlyDue && wsr.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + const numCoinsWithdrawn = wsr.withdrawn.reduce((a, x) => a + (x ? 1 : 0), 0); + const numCoinsTotal = wsr.withdrawn.length; + resp.pendingOperations.push({ + type: "withdraw", + givesLifeness: true, + numCoinsTotal, + numCoinsWithdrawn, + source: wsr.source, + withdrawSessionId: wsr.withdrawSessionId, + }); + }); +} + +async function gatherProposalPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise { + await tx.iter(Stores.proposals).forEach(proposal => { + if (proposal.proposalStatus == ProposalStatus.PROPOSED) { + if (onlyDue) { + return; + } + resp.pendingOperations.push({ + type: "proposal-choice", + givesLifeness: false, + merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, + proposalId: proposal.proposalId, + proposalTimestamp: proposal.timestamp, + }); + } else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) { + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + proposal.retryInfo.nextRetry, + ); + if (onlyDue && proposal.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + resp.pendingOperations.push({ + type: "proposal-download", + givesLifeness: true, + merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, + proposalId: proposal.proposalId, + proposalTimestamp: proposal.timestamp, + }); + } + }); +} + +async function gatherTipPending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise { + await tx.iter(Stores.tips).forEach(tip => { + if (tip.pickedUp) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + tip.retryInfo.nextRetry, + ); + if (onlyDue && tip.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + if (tip.accepted) { + resp.pendingOperations.push({ + type: "tip", + givesLifeness: true, + merchantBaseUrl: tip.merchantBaseUrl, + tipId: tip.tipId, + merchantTipId: tip.merchantTipId, + }); + } + }); +} + +async function gatherPurchasePending( + tx: TransactionHandle, + now: Timestamp, + resp: PendingOperationsResponse, + onlyDue: boolean = false, +): Promise { + await tx.iter(Stores.purchases).forEach((pr) => { + if (pr.finished) { + return; + } + resp.nextRetryDelay = updateRetryDelay( + resp.nextRetryDelay, + now, + pr.retryInfo.nextRetry, + ); + if (onlyDue && pr.retryInfo.nextRetry.t_ms > now.t_ms) { + return; + } + resp.pendingOperations.push({ + type: "pay", + givesLifeness: true, + isReplay: false, + proposalId: pr.proposalId, + }); + }); + +} + export async function getPendingOperations( ws: InternalWalletState, + onlyDue: boolean = false, ): Promise { - const pendingOperations: PendingOperationInfo[] = []; - let minRetryDurationMs = 5000; + const resp: PendingOperationsResponse = { + nextRetryDelay: { d_ms: Number.MAX_SAFE_INTEGER }, + pendingOperations: [], + }; + const now = getTimestampNow(); await runWithReadTransaction( ws.db, [ @@ -47,207 +393,18 @@ export async function getPendingOperations( Stores.withdrawalSession, Stores.proposals, Stores.tips, + Stores.purchases, ], async tx => { - await tx.iter(Stores.exchanges).forEach(e => { - switch (e.updateStatus) { - case ExchangeUpdateStatus.FINISHED: - if (e.lastError) { - pendingOperations.push({ - type: "bug", - message: - "Exchange record is in FINISHED state but has lastError set", - details: { - exchangeBaseUrl: e.baseUrl, - }, - }); - } - if (!e.details) { - pendingOperations.push({ - type: "bug", - message: - "Exchange record does not have details, but no update in progress.", - details: { - exchangeBaseUrl: e.baseUrl, - }, - }); - } - if (!e.wireInfo) { - pendingOperations.push({ - type: "bug", - message: - "Exchange record does not have wire info, but no update in progress.", - details: { - exchangeBaseUrl: e.baseUrl, - }, - }); - } - break; - case ExchangeUpdateStatus.FETCH_KEYS: - pendingOperations.push({ - type: "exchange-update", - stage: "fetch-keys", - exchangeBaseUrl: e.baseUrl, - lastError: e.lastError, - reason: e.updateReason || "unknown", - }); - break; - case ExchangeUpdateStatus.FETCH_WIRE: - pendingOperations.push({ - type: "exchange-update", - stage: "fetch-wire", - exchangeBaseUrl: e.baseUrl, - lastError: e.lastError, - reason: e.updateReason || "unknown", - }); - break; - default: - pendingOperations.push({ - type: "bug", - message: "Unknown exchangeUpdateStatus", - details: { - exchangeBaseUrl: e.baseUrl, - exchangeUpdateStatus: e.updateStatus, - }, - }); - break; - } - }); - await tx.iter(Stores.reserves).forEach(reserve => { - const reserveType = reserve.bankWithdrawStatusUrl - ? "taler-bank" - : "manual"; - const now = getTimestampNow(); - switch (reserve.reserveStatus) { - case ReserveRecordStatus.DORMANT: - // nothing to report as pending - break; - case ReserveRecordStatus.WITHDRAWING: - case ReserveRecordStatus.UNCONFIRMED: - case ReserveRecordStatus.QUERYING_STATUS: - case ReserveRecordStatus.REGISTERING_BANK: - pendingOperations.push({ - type: "reserve", - stage: reserve.reserveStatus, - timestampCreated: reserve.created, - reserveType, - reservePub: reserve.reservePub, - }); - if (reserve.created.t_ms < now.t_ms - 5000) { - minRetryDurationMs = 500; - } else if (reserve.created.t_ms < now.t_ms - 30000) { - minRetryDurationMs = 2000; - } - break; - case ReserveRecordStatus.WAIT_CONFIRM_BANK: - pendingOperations.push({ - type: "reserve", - stage: reserve.reserveStatus, - timestampCreated: reserve.created, - reserveType, - reservePub: reserve.reservePub, - bankWithdrawConfirmUrl: reserve.bankWithdrawConfirmUrl, - }); - if (reserve.created.t_ms < now.t_ms - 5000) { - minRetryDurationMs = 500; - } else if (reserve.created.t_ms < now.t_ms - 30000) { - minRetryDurationMs = 2000; - } - break; - default: - pendingOperations.push({ - type: "bug", - message: "Unknown reserve record status", - details: { - reservePub: reserve.reservePub, - reserveStatus: reserve.reserveStatus, - }, - }); - break; - } - }); - - await tx.iter(Stores.refresh).forEach(r => { - if (r.finished) { - return; - } - let refreshStatus: string; - if (r.norevealIndex === undefined) { - refreshStatus = "melt"; - } else { - refreshStatus = "reveal"; - } - - pendingOperations.push({ - type: "refresh", - oldCoinPub: r.meltCoinPub, - refreshStatus, - refreshOutputSize: r.newDenoms.length, - refreshSessionId: r.refreshSessionId, - }); - }); - - await tx.iter(Stores.coins).forEach(coin => { - if (coin.status == CoinStatus.Dirty) { - pendingOperations.push({ - type: "dirty-coin", - coinPub: coin.coinPub, - }); - } - }); - - await tx.iter(Stores.withdrawalSession).forEach(ws => { - const numCoinsWithdrawn = ws.withdrawn.reduce( - (a, x) => a + (x ? 1 : 0), - 0, - ); - const numCoinsTotal = ws.withdrawn.length; - if (numCoinsWithdrawn < numCoinsTotal) { - pendingOperations.push({ - type: "withdraw", - numCoinsTotal, - numCoinsWithdrawn, - source: ws.source, - withdrawSessionId: ws.withdrawSessionId, - }); - } - }); - - await tx.iter(Stores.proposals).forEach((proposal) => { - if (proposal.proposalStatus == ProposalStatus.PROPOSED) { - pendingOperations.push({ - type: "proposal-choice", - merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, - proposalId: proposal.proposalId, - proposalTimestamp: proposal.timestamp, - }); - } else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) { - pendingOperations.push({ - type: "proposal-download", - merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url, - proposalId: proposal.proposalId, - proposalTimestamp: proposal.timestamp, - }); - } - }); - - await tx.iter(Stores.tips).forEach((tip) => { - if (tip.accepted && !tip.pickedUp) { - pendingOperations.push({ - type: "tip", - merchantBaseUrl: tip.merchantBaseUrl, - tipId: tip.tipId, - merchantTipId: tip.merchantTipId, - }); - } - }); + await gatherExchangePending(tx, now, resp, onlyDue); + await gatherReservePending(tx, now, resp, onlyDue); + await gatherRefreshPending(tx, now, resp, onlyDue); + await gatherCoinsPending(tx, now, resp, onlyDue); + await gatherWithdrawalPending(tx, now, resp, onlyDue); + await gatherProposalPending(tx, now, resp, onlyDue); + await gatherTipPending(tx, now, resp, onlyDue); + await gatherPurchasePending(tx, now, resp, onlyDue); }, ); - - return { - pendingOperations, - nextRetryDelay: { - d_ms: minRetryDurationMs, - }, - }; + return resp; } diff --git a/src/wallet-impl/refresh.ts b/src/wallet-impl/refresh.ts index 7e7270ed3..a3b48919d 100644 --- a/src/wallet-impl/refresh.ts +++ b/src/wallet-impl/refresh.ts @@ -23,6 +23,8 @@ import { RefreshPlanchetRecord, CoinRecord, RefreshSessionRecord, + initRetryInfo, + updateRetryInfoTimeout, } from "../dbTypes"; import { amountToPretty } from "../util/helpers"; import { @@ -36,6 +38,8 @@ import { InternalWalletState } from "./state"; import { Logger } from "../util/logging"; import { getWithdrawDenomList } from "./withdraw"; import { updateExchangeFromUrl } from "./exchanges"; +import { getTimestampNow, OperationError, NotificationType } from "../walletTypes"; +import { guardOperationException } from "./errors"; const logger = new Logger("refresh.ts"); @@ -132,14 +136,16 @@ async function refreshMelt( if (rs.norevealIndex !== undefined) { return; } - if (rs.finished) { + if (rs.finishedTimestamp) { return; } rs.norevealIndex = norevealIndex; return rs; }); - ws.notifier.notify(); + ws.notify({ + type: NotificationType.RefreshMelted, + }); } async function refreshReveal( @@ -225,16 +231,6 @@ async function refreshReveal( return; } - const exchange = oneShotGet( - ws.db, - Stores.exchanges, - refreshSession.exchangeBaseUrl, - ); - if (!exchange) { - console.error(`exchange ${refreshSession.exchangeBaseUrl} not found`); - return; - } - const coins: CoinRecord[] = []; for (let i = 0; i < respJson.ev_sigs.length; i++) { @@ -271,31 +267,71 @@ async function refreshReveal( coins.push(coin); } - refreshSession.finished = true; - await runWithWriteTransaction( ws.db, [Stores.coins, Stores.refresh], async tx => { const rs = await tx.get(Stores.refresh, refreshSessionId); if (!rs) { + console.log("no refresh session found"); return; } - if (rs.finished) { + if (rs.finishedTimestamp) { + console.log("refresh session already finished"); return; } + rs.finishedTimestamp = getTimestampNow(); + rs.retryInfo = initRetryInfo(false); for (let coin of coins) { await tx.put(Stores.coins, coin); } - await tx.put(Stores.refresh, refreshSession); + await tx.put(Stores.refresh, rs); }, ); - ws.notifier.notify(); + console.log("refresh finished (end of reveal)"); + ws.notify({ + type: NotificationType.RefreshRevealed, + }); } +async function incrementRefreshRetry( + ws: InternalWalletState, + refreshSessionId: string, + err: OperationError | undefined, +): Promise { + await runWithWriteTransaction(ws.db, [Stores.refresh], async tx => { + const r = await tx.get(Stores.refresh, refreshSessionId); + if (!r) { + return; + } + if (!r.retryInfo) { + return; + } + r.retryInfo.retryCounter++; + updateRetryInfoTimeout(r.retryInfo); + r.lastError = err; + await tx.put(Stores.refresh, r); + }); +} + + export async function processRefreshSession( ws: InternalWalletState, refreshSessionId: string, +) { + return ws.memoProcessRefresh.memo(refreshSessionId, async () => { + const onOpErr = (e: OperationError) => + incrementRefreshRetry(ws, refreshSessionId, e); + return guardOperationException( + () => processRefreshSessionImpl(ws, refreshSessionId), + onOpErr, + ); + }); +} + +async function processRefreshSessionImpl( + ws: InternalWalletState, + refreshSessionId: string, ) { const refreshSession = await oneShotGet( ws.db, @@ -305,7 +341,7 @@ export async function processRefreshSession( if (!refreshSession) { return; } - if (refreshSession.finished) { + if (refreshSession.finishedTimestamp) { return; } if (typeof refreshSession.norevealIndex !== "number") { @@ -376,7 +412,7 @@ export async function refresh( x.status = CoinStatus.Dormant; return x; }); - ws.notifier.notify(); + ws.notify( { type: NotificationType.RefreshRefused }); return; } @@ -388,29 +424,32 @@ export async function refresh( oldDenom.feeRefresh, ); - function mutateCoin(c: CoinRecord): CoinRecord { - const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee); - if (r.saturated) { - // Something else must have written the coin value - throw TransactionAbort; - } - c.currentAmount = r.amount; - c.status = CoinStatus.Dormant; - return c; - } - // Store refresh session and subtract refreshed amount from // coin in the same transaction. await runWithWriteTransaction( ws.db, [Stores.refresh, Stores.coins], async tx => { + const c = await tx.get(Stores.coins, coin.coinPub); + if (!c) { + return; + } + if (c.status !== CoinStatus.Dirty) { + return; + } + const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee); + if (r.saturated) { + console.log("can't refresh coin, no amount left"); + return; + } + c.currentAmount = r.amount; + c.status = CoinStatus.Dormant; await tx.put(Stores.refresh, refreshSession); - await tx.mutate(Stores.coins, coin.coinPub, mutateCoin); + await tx.put(Stores.coins, c); }, ); logger.info(`created refresh session ${refreshSession.refreshSessionId}`); - ws.notifier.notify(); + ws.notify( { type: NotificationType.RefreshStarted }); await processRefreshSession(ws, refreshSession.refreshSessionId); } diff --git a/src/wallet-impl/reserves.ts b/src/wallet-impl/reserves.ts index d70f02576..f00956b46 100644 --- a/src/wallet-impl/reserves.ts +++ b/src/wallet-impl/reserves.ts @@ -20,6 +20,7 @@ import { getTimestampNow, ConfirmReserveRequest, OperationError, + NotificationType, } from "../walletTypes"; import { canonicalizeBaseUrl } from "../util/helpers"; import { InternalWalletState } from "./state"; @@ -29,6 +30,8 @@ import { CurrencyRecord, Stores, WithdrawalSessionRecord, + initRetryInfo, + updateRetryInfoTimeout, } from "../dbTypes"; import { oneShotMutate, @@ -42,13 +45,13 @@ import * as Amounts from "../util/amounts"; import { updateExchangeFromUrl, getExchangeTrust } from "./exchanges"; import { WithdrawOperationStatusResponse, ReserveStatus } from "../talerTypes"; import { assertUnreachable } from "../util/assertUnreachable"; -import { OperationFailedAndReportedError } from "../wallet"; import { encodeCrock } from "../crypto/talerCrypto"; import { randomBytes } from "../crypto/primitives/nacl-fast"; import { getVerifiedWithdrawDenomList, processWithdrawSession, } from "./withdraw"; +import { guardOperationException, OperationFailedAndReportedError } from "./errors"; const logger = new Logger("reserves.ts"); @@ -91,7 +94,9 @@ export async function createReserve( bankWithdrawStatusUrl: req.bankWithdrawStatusUrl, exchangeWire: req.exchangeWire, reserveStatus, - lastStatusQuery: undefined, + lastSuccessfulStatusQuery: undefined, + retryInfo: initRetryInfo(), + lastError: undefined, }; const senderWire = req.senderWire; @@ -171,7 +176,7 @@ export async function createReserve( // Asynchronously process the reserve, but return // to the caller already. - processReserve(ws, resp.reservePub).catch(e => { + processReserve(ws, resp.reservePub, true).catch(e => { console.error("Processing reserve failed:", e); }); @@ -188,18 +193,19 @@ export async function createReserve( export async function processReserve( ws: InternalWalletState, reservePub: string, + forceNow: boolean = false, ): Promise { - const p = ws.memoProcessReserve.find(reservePub); - if (p) { - return p; - } else { - return ws.memoProcessReserve.put( - reservePub, - processReserveImpl(ws, reservePub), + return ws.memoProcessReserve.memo(reservePub, async () => { + const onOpError = (err: OperationError) => + incrementReserveRetry(ws, reservePub, err); + await guardOperationException( + () => processReserveImpl(ws, reservePub, forceNow), + onOpError, ); - } + }); } + async function registerReserveWithBank( ws: InternalWalletState, reservePub: string, @@ -235,6 +241,7 @@ async function registerReserveWithBank( } r.timestampReserveInfoPosted = getTimestampNow(); r.reserveStatus = ReserveRecordStatus.WAIT_CONFIRM_BANK; + r.retryInfo = initRetryInfo(); return r; }); return processReserveBankStatus(ws, reservePub); @@ -243,6 +250,18 @@ async function registerReserveWithBank( export async function processReserveBankStatus( ws: InternalWalletState, reservePub: string, +): Promise { + const onOpError = (err: OperationError) => + incrementReserveRetry(ws, reservePub, err); + await guardOperationException( + () => processReserveBankStatusImpl(ws, reservePub), + onOpError, + ); +} + +async function processReserveBankStatusImpl( + ws: InternalWalletState, + reservePub: string, ): Promise { let reserve = await oneShotGet(ws.db, Stores.reserves, reservePub); switch (reserve?.reserveStatus) { @@ -287,9 +306,10 @@ export async function processReserveBankStatus( const now = getTimestampNow(); r.timestampConfirmed = now; r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; + r.retryInfo = initRetryInfo(); return r; }); - await processReserveImpl(ws, reservePub); + await processReserveImpl(ws, reservePub, true); } else { await oneShotMutate(ws.db, Stores.reserves, reservePub, r => { switch (r.reserveStatus) { @@ -304,16 +324,24 @@ export async function processReserveBankStatus( } } -async function setReserveError( +async function incrementReserveRetry( ws: InternalWalletState, reservePub: string, - err: OperationError, + err: OperationError | undefined, ): Promise { - const mut = (reserve: ReserveRecord) => { - reserve.lastError = err; - return reserve; - }; - await oneShotMutate(ws.db, Stores.reserves, reservePub, mut); + await runWithWriteTransaction(ws.db, [Stores.reserves], async tx => { + const r = await tx.get(Stores.reserves, reservePub); + if (!r) { + return; + } + if (!r.retryInfo) { + return; + } + r.retryInfo.retryCounter++; + updateRetryInfoTimeout(r.retryInfo); + r.lastError = err; + await tx.put(Stores.reserves, r); + }); } /** @@ -345,15 +373,11 @@ async function updateReserve( } catch (e) { if (e.response?.status === 404) { const m = "The exchange does not know about this reserve (yet)."; - await setReserveError(ws, reservePub, { - type: "waiting", - details: {}, - message: "The exchange does not know about this reserve (yet).", - }); - throw new OperationFailedAndReportedError(m); + await incrementReserveRetry(ws, reservePub, undefined); + return; } else { const m = e.message; - await setReserveError(ws, reservePub, { + await incrementReserveRetry(ws, reservePub, { type: "network", details: {}, message: m, @@ -369,7 +393,7 @@ async function updateReserve( } // FIXME: check / compare history! - if (!r.lastStatusQuery) { + if (!r.lastSuccessfulStatusQuery) { // FIXME: check if this matches initial expectations r.withdrawRemainingAmount = balance; } else { @@ -392,22 +416,31 @@ async function updateReserve( // We're missing some money. } } - r.lastStatusQuery = getTimestampNow(); + r.lastSuccessfulStatusQuery = getTimestampNow(); r.reserveStatus = ReserveRecordStatus.WITHDRAWING; + r.retryInfo = initRetryInfo(); return r; }); - ws.notifier.notify(); + ws.notify( { type: NotificationType.ReserveUpdated }); } async function processReserveImpl( ws: InternalWalletState, reservePub: string, + forceNow: boolean = false, ): Promise { const reserve = await oneShotGet(ws.db, Stores.reserves, reservePub); if (!reserve) { console.log("not processing reserve: reserve does not exist"); return; } + if (!forceNow) { + const now = getTimestampNow(); + if (reserve.retryInfo.nextRetry.t_ms > now.t_ms) { + logger.trace("processReserve retry not due yet"); + return; + } + } logger.trace( `Processing reserve ${reservePub} with status ${reserve.reserveStatus}`, ); @@ -417,10 +450,10 @@ async function processReserveImpl( break; case ReserveRecordStatus.REGISTERING_BANK: await processReserveBankStatus(ws, reservePub); - return processReserveImpl(ws, reservePub); + return processReserveImpl(ws, reservePub, true); case ReserveRecordStatus.QUERYING_STATUS: await updateReserve(ws, reservePub); - return processReserveImpl(ws, reservePub); + return processReserveImpl(ws, reservePub, true); case ReserveRecordStatus.WITHDRAWING: await depleteReserve(ws, reservePub); break; @@ -448,12 +481,13 @@ export async function confirmReserve( } reserve.timestampConfirmed = now; reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; + reserve.retryInfo = initRetryInfo(); return reserve; }); - ws.notifier.notify(); + ws.notify({ type: NotificationType.ReserveUpdated }); - processReserve(ws, req.reservePub).catch(e => { + processReserve(ws, req.reservePub, true).catch(e => { console.log("processing reserve failed:", e); }); } @@ -489,7 +523,7 @@ async function depleteReserve( logger.trace(`got denom list`); if (denomsForWithdraw.length === 0) { const m = `Unable to withdraw from reserve, no denominations are available to withdraw.`; - await setReserveError(ws, reserve.reservePub, { + await incrementReserveRetry(ws, reserve.reservePub, { type: "internal", message: m, details: {}, @@ -502,7 +536,8 @@ async function depleteReserve( const withdrawalSessionId = encodeCrock(randomBytes(32)); - const totalCoinValue = Amounts.sum(denomsForWithdraw.map(x => x.value)).amount; + const totalCoinValue = Amounts.sum(denomsForWithdraw.map(x => x.value)) + .amount; const withdrawalRecord: WithdrawalSessionRecord = { withdrawSessionId: withdrawalSessionId, @@ -517,6 +552,9 @@ async function depleteReserve( withdrawn: denomsForWithdraw.map(x => false), planchets: denomsForWithdraw.map(x => undefined), totalCoinValue, + retryInfo: initRetryInfo(), + lastCoinErrors: denomsForWithdraw.map(x => undefined), + lastError: undefined, }; const totalCoinWithdrawFee = Amounts.sum( @@ -545,7 +583,7 @@ async function depleteReserve( r.withdrawRemainingAmount = remaining.amount; r.withdrawAllocatedAmount = allocated.amount; r.reserveStatus = ReserveRecordStatus.DORMANT; - + r.retryInfo = initRetryInfo(false); return r; } diff --git a/src/wallet-impl/return.ts b/src/wallet-impl/return.ts index 9cf12052d..ec19c00ae 100644 --- a/src/wallet-impl/return.ts +++ b/src/wallet-impl/return.ts @@ -204,8 +204,6 @@ export async function returnCoins( } }, ); - ws.badge.showNotification(); - ws.notifier.notify(); depositReturnedCoins(ws, coinsReturnRecord); } @@ -269,6 +267,5 @@ async function depositReturnedCoins( } } await oneShotPut(ws.db, Stores.coinsReturns, currentCrr); - ws.notifier.notify(); } } diff --git a/src/wallet-impl/state.ts b/src/wallet-impl/state.ts index a04a7dd1c..18df861f1 100644 --- a/src/wallet-impl/state.ts +++ b/src/wallet-impl/state.ts @@ -15,19 +15,54 @@ */ import { HttpRequestLibrary } from "../util/http"; -import { Badge, Notifier, NextUrlResult } from "../walletTypes"; +import { + NextUrlResult, + WalletBalance, + PendingOperationsResponse, + WalletNotification, +} from "../walletTypes"; import { SpeculativePayData } from "./pay"; -import { CryptoApi } from "../crypto/cryptoApi"; -import { AsyncOpMemo } from "../util/asyncMemo"; - -export interface InternalWalletState { - db: IDBDatabase; - http: HttpRequestLibrary; - badge: Badge; - notifier: Notifier; +import { CryptoApi, CryptoWorkerFactory } from "../crypto/workers/cryptoApi"; +import { AsyncOpMemoMap, AsyncOpMemoSingle } from "../util/asyncMemo"; +import { Logger } from "../util/logging"; + +type NotificationListener = (n: WalletNotification) => void; + +const logger = new Logger("state.ts"); + +export class InternalWalletState { + speculativePayData: SpeculativePayData | undefined = undefined; + cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {}; + memoProcessReserve: AsyncOpMemoMap = new AsyncOpMemoMap(); + memoMakePlanchet: AsyncOpMemoMap = new AsyncOpMemoMap(); + memoGetPending: AsyncOpMemoSingle< + PendingOperationsResponse + > = new AsyncOpMemoSingle(); + memoGetBalance: AsyncOpMemoSingle = new AsyncOpMemoSingle(); + memoProcessRefresh: AsyncOpMemoMap = new AsyncOpMemoMap(); cryptoApi: CryptoApi; - speculativePayData: SpeculativePayData | undefined; - cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult }; - memoProcessReserve: AsyncOpMemo; - memoMakePlanchet: AsyncOpMemo; -} \ No newline at end of file + + listeners: NotificationListener[] = []; + + constructor( + public db: IDBDatabase, + public http: HttpRequestLibrary, + cryptoWorkerFactory: CryptoWorkerFactory, + ) { + this.cryptoApi = new CryptoApi(cryptoWorkerFactory); + } + + public notify(n: WalletNotification) { + logger.trace("Notification", n); + for (const l of this.listeners) { + const nc = JSON.parse(JSON.stringify(n)); + setImmediate(() => { + l(nc); + }); + } + } + + addNotificationListener(f: (n: WalletNotification) => void): void { + this.listeners.push(f); + } +} diff --git a/src/wallet-impl/tip.ts b/src/wallet-impl/tip.ts index 593f0d612..3ae931d45 100644 --- a/src/wallet-impl/tip.ts +++ b/src/wallet-impl/tip.ts @@ -18,14 +18,15 @@ import { oneShotGet, oneShotPut, oneShotMutate, runWithWriteTransaction } from "../util/query"; import { InternalWalletState } from "./state"; import { parseTipUri } from "../util/taleruri"; -import { TipStatus, getTimestampNow } from "../walletTypes"; +import { TipStatus, getTimestampNow, OperationError } from "../walletTypes"; import { TipPickupGetResponse, TipPlanchetDetail, TipResponse } from "../talerTypes"; import * as Amounts from "../util/amounts"; -import { Stores, PlanchetRecord, WithdrawalSessionRecord } from "../dbTypes"; +import { Stores, PlanchetRecord, WithdrawalSessionRecord, initRetryInfo, updateRetryInfoTimeout } from "../dbTypes"; import { getWithdrawDetailsForAmount, getVerifiedWithdrawDenomList, processWithdrawSession } from "./withdraw"; import { getTalerStampSec } from "../util/helpers"; import { updateExchangeFromUrl } from "./exchanges"; import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto"; +import { guardOperationException } from "./errors"; export async function getTipStatus( @@ -74,12 +75,14 @@ export async function getTipStatus( pickedUp: false, planchets: undefined, response: undefined, - timestamp: getTimestampNow(), + createdTimestamp: getTimestampNow(), merchantTipId: res.merchantTipId, totalFees: Amounts.add( withdrawDetails.overhead, withdrawDetails.withdrawFee, ).amount, + retryInfo: initRetryInfo(), + lastError: undefined, }; await oneShotPut(ws.db, Stores.tips, tipRecord); } @@ -101,9 +104,37 @@ export async function getTipStatus( return tipStatus; } +async function incrementTipRetry( + ws: InternalWalletState, + refreshSessionId: string, + err: OperationError | undefined, +): Promise { + await runWithWriteTransaction(ws.db, [Stores.tips], async tx => { + const t = await tx.get(Stores.tips, refreshSessionId); + if (!t) { + return; + } + if (!t.retryInfo) { + return; + } + t.retryInfo.retryCounter++; + updateRetryInfoTimeout(t.retryInfo); + t.lastError = err; + await tx.put(Stores.tips, t); + }); +} + export async function processTip( ws: InternalWalletState, tipId: string, +): Promise { + const onOpErr = (e: OperationError) => incrementTipRetry(ws, tipId, e); + await guardOperationException(() => processTipImpl(ws, tipId), onOpErr); +} + +async function processTipImpl( + ws: InternalWalletState, + tipId: string, ) { let tipRecord = await oneShotGet(ws.db, Stores.tips, tipId); if (!tipRecord) { @@ -205,6 +236,10 @@ export async function processTip( rawWithdrawalAmount: tipRecord.amount, withdrawn: planchets.map((x) => false), totalCoinValue: Amounts.sum(planchets.map((p) => p.coinValue)).amount, + lastCoinErrors: planchets.map((x) => undefined), + retryInfo: initRetryInfo(), + finishTimestamp: undefined, + lastError: undefined, }; @@ -217,6 +252,7 @@ export async function processTip( return; } tr.pickedUp = true; + tr.retryInfo = initRetryInfo(false); await tx.put(Stores.tips, tr); await tx.put(Stores.withdrawalSession, withdrawalSession); @@ -224,8 +260,6 @@ export async function processTip( await processWithdrawSession(ws, withdrawalSessionId); - ws.notifier.notify(); - ws.badge.showNotification(); return; } diff --git a/src/wallet-impl/withdraw.ts b/src/wallet-impl/withdraw.ts index d02ae14aa..7b7d0f640 100644 --- a/src/wallet-impl/withdraw.ts +++ b/src/wallet-impl/withdraw.ts @@ -22,6 +22,8 @@ import { CoinStatus, CoinRecord, PlanchetRecord, + initRetryInfo, + updateRetryInfoTimeout, } from "../dbTypes"; import * as Amounts from "../util/amounts"; import { @@ -30,6 +32,8 @@ import { DownloadedWithdrawInfo, ReserveCreationInfo, WithdrawDetails, + OperationError, + NotificationType, } from "../walletTypes"; import { WithdrawOperationStatusResponse } from "../talerTypes"; import { InternalWalletState } from "./state"; @@ -51,6 +55,7 @@ import { createReserve, processReserveBankStatus } from "./reserves"; import { WALLET_PROTOCOL_VERSION } from "../wallet"; import * as LibtoolVersion from "../util/libtoolVersion"; +import { guardOperationException } from "./errors"; const logger = new Logger("withdraw.ts"); @@ -143,12 +148,9 @@ export async function acceptWithdrawal( senderWire: withdrawInfo.senderWire, exchangeWire: exchangeWire, }); - ws.badge.showNotification(); - ws.notifier.notify(); // We do this here, as the reserve should be registered before we return, // so that we can redirect the user to the bank's status page. await processReserveBankStatus(ws, reserve.reservePub); - ws.notifier.notify(); console.log("acceptWithdrawal: returning"); return { reservePub: reserve.reservePub, @@ -234,6 +236,12 @@ async function processPlanchet( planchet.denomPub, ); + + const isValid = await ws.cryptoApi.rsaVerify(planchet.coinPub, denomSig, planchet.denomPub); + if (!isValid) { + throw Error("invalid RSA signature by the exchange"); + } + const coin: CoinRecord = { blindingKey: planchet.blindingKey, coinPriv: planchet.coinPriv, @@ -249,6 +257,9 @@ async function processPlanchet( withdrawSessionId: withdrawalSessionId, }; + let withdrawSessionFinished = false; + let reserveDepleted = false; + await runWithWriteTransaction( ws.db, [Stores.coins, Stores.withdrawalSession, Stores.reserves], @@ -262,6 +273,18 @@ async function processPlanchet( return; } ws.withdrawn[coinIdx] = true; + ws.lastCoinErrors[coinIdx] = undefined; + let numDone = 0; + for (let i = 0; i < ws.withdrawn.length; i++) { + if (ws.withdrawn[i]) { + numDone++; + } + } + if (numDone === ws.denoms.length) { + ws.finishTimestamp = getTimestampNow(); + ws.retryInfo = initRetryInfo(false); + withdrawSessionFinished = true; + } await tx.put(Stores.withdrawalSession, ws); if (!planchet.isFromTip) { const r = await tx.get(Stores.reserves, planchet.reservePub); @@ -270,14 +293,29 @@ async function processPlanchet( r.withdrawCompletedAmount, Amounts.add(denom.value, denom.feeWithdraw).amount, ).amount; + if (Amounts.cmp(r.withdrawCompletedAmount, r.withdrawAllocatedAmount) == 0) { + reserveDepleted = true; + } await tx.put(Stores.reserves, r); } } await tx.add(Stores.coins, coin); }, ); - ws.notifier.notify(); - logger.trace(`withdraw of one coin ${coin.coinPub} finished`); + + if (withdrawSessionFinished) { + ws.notify({ + type: NotificationType.WithdrawSessionFinished, + withdrawSessionId: withdrawalSessionId, + }); + } + + if (reserveDepleted && withdrawalSession.source.type === "reserve") { + ws.notify({ + type: NotificationType.ReserveDepleted, + reservePub: withdrawalSession.source.reservePub, + }); + } } /** @@ -437,27 +475,50 @@ async function processWithdrawCoin( } if (!withdrawalSession.planchets[coinIndex]) { - logger.trace("creating planchet for coin", coinIndex); const key = `${withdrawalSessionId}-${coinIndex}`; - const p = ws.memoMakePlanchet.find(key); - if (p) { - await p; - } else { - ws.memoMakePlanchet.put( - key, - makePlanchet(ws, withdrawalSessionId, coinIndex), - ); - } - await makePlanchet(ws, withdrawalSessionId, coinIndex); - logger.trace("done creating planchet for coin", coinIndex); + await ws.memoMakePlanchet.memo(key, async () => { + logger.trace("creating planchet for coin", coinIndex); + return makePlanchet(ws, withdrawalSessionId, coinIndex); + }); } await processPlanchet(ws, withdrawalSessionId, coinIndex); - logger.trace("starting withdraw for coin", coinIndex); +} + +async function incrementWithdrawalRetry( + ws: InternalWalletState, + withdrawalSessionId: string, + err: OperationError | undefined, +): Promise { + await runWithWriteTransaction(ws.db, [Stores.withdrawalSession], async tx => { + const wsr = await tx.get(Stores.withdrawalSession, withdrawalSessionId); + if (!wsr) { + return; + } + if (!wsr.retryInfo) { + return; + } + wsr.retryInfo.retryCounter++; + updateRetryInfoTimeout(wsr.retryInfo); + wsr.lastError = err; + await tx.put(Stores.withdrawalSession, wsr); + }); } export async function processWithdrawSession( ws: InternalWalletState, withdrawalSessionId: string, +): Promise { + const onOpErr = (e: OperationError) => + incrementWithdrawalRetry(ws, withdrawalSessionId, e); + await guardOperationException( + () => processWithdrawSessionImpl(ws, withdrawalSessionId), + onOpErr, + ); +} + +export async function processWithdrawSessionImpl( + ws: InternalWalletState, + withdrawalSessionId: string, ): Promise { logger.trace("processing withdraw session", withdrawalSessionId); const withdrawalSession = await oneShotGet( @@ -474,7 +535,6 @@ export async function processWithdrawSession( processWithdrawCoin(ws, withdrawalSessionId, i), ); await Promise.all(ps); - ws.badge.showNotification(); return; } diff --git a/src/wallet.ts b/src/wallet.ts index 772bb01ac..86b3085f4 100644 --- a/src/wallet.ts +++ b/src/wallet.ts @@ -22,7 +22,7 @@ /** * Imports. */ -import { CryptoApi, CryptoWorkerFactory } from "./crypto/cryptoApi"; +import { CryptoApi, CryptoWorkerFactory } from "./crypto/workers/cryptoApi"; import { HttpRequestLibrary } from "./util/http"; import { oneShotPut, @@ -49,6 +49,7 @@ import { processDownloadProposal, applyRefund, getFullRefundFees, + processPurchaseImpl, } from "./wallet-impl/pay"; import { @@ -65,14 +66,12 @@ import { } from "./dbTypes"; import { MerchantRefundPermission } from "./talerTypes"; import { - Badge, BenchmarkResult, ConfirmPayResult, ConfirmReserveRequest, CreateReserveRequest, CreateReserveResponse, HistoryEvent, - Notifier, ReturnCoinsRequest, SenderWireInfos, TipStatus, @@ -85,6 +84,8 @@ import { PendingOperationInfo, PendingOperationsResponse, HistoryQuery, + WalletNotification, + NotificationType, } from "./walletTypes"; import { Logger } from "./util/logging"; @@ -97,8 +98,6 @@ import { } from "./wallet-impl/exchanges"; import { processReserve } from "./wallet-impl/reserves"; -import { AsyncOpMemo } from "./util/asyncMemo"; - import { InternalWalletState } from "./wallet-impl/state"; import { createReserve, confirmReserve } from "./wallet-impl/reserves"; import { processRefreshSession, refresh } from "./wallet-impl/refresh"; @@ -111,6 +110,7 @@ import { returnCoins } from "./wallet-impl/return"; import { payback } from "./wallet-impl/payback"; import { TimerGroup } from "./util/timer"; import { AsyncCondition } from "./util/promiseUtils"; +import { AsyncOpMemoSingle } from "./util/asyncMemo"; /** * Wallet protocol version spoken with the exchange @@ -137,18 +137,6 @@ const builtinCurrencies: CurrencyRecord[] = [ }, ]; -/** - * This error is thrown when an - */ -export class OperationFailedAndReportedError extends Error { - constructor(message: string) { - super(message); - - // Set the prototype explicitly. - Object.setPrototypeOf(this, OperationFailedAndReportedError.prototype); - } -} - const logger = new Logger("wallet.ts"); /** @@ -159,41 +147,18 @@ export class Wallet { private timerGroup: TimerGroup = new TimerGroup(); private latch = new AsyncCondition(); private stopped: boolean = false; + private memoRunRetryLoop = new AsyncOpMemoSingle(); get db(): IDBDatabase { return this.ws.db; } - private get badge(): Badge { - return this.ws.badge; - } - - private get cryptoApi(): CryptoApi { - return this.ws.cryptoApi; - } - - private get notifier(): Notifier { - return this.ws.notifier; - } - constructor( db: IDBDatabase, http: HttpRequestLibrary, - badge: Badge, - notifier: Notifier, cryptoWorkerFactory: CryptoWorkerFactory, ) { - this.ws = { - badge, - cachedNextUrl: {}, - cryptoApi: new CryptoApi(cryptoWorkerFactory), - db, - http, - notifier, - speculativePayData: undefined, - memoProcessReserve: new AsyncOpMemo(), - memoMakePlanchet: new AsyncOpMemo(), - }; + this.ws = new InternalWalletState(db, http, cryptoWorkerFactory); } getExchangePaytoUri(exchangeBaseUrl: string, supportedTargetTypes: string[]) { @@ -204,6 +169,10 @@ export class Wallet { return getWithdrawDetailsForAmount(this.ws, baseUrl, amount); } + addNotificationListener(f: (n: WalletNotification) => void): void { + this.ws.addNotificationListener(f); + } + /** * Execute one operation based on the pending operation info record. */ @@ -213,6 +182,7 @@ export class Wallet { ): Promise { switch (pending.type) { case "bug": + // Nothing to do, will just be displayed to the user return; case "dirty-coin": await refresh(this.ws, pending.coinPub); @@ -224,7 +194,7 @@ export class Wallet { await processRefreshSession(this.ws, pending.refreshSessionId); break; case "reserve": - await processReserve(this.ws, pending.reservePub); + await processReserve(this.ws, pending.reservePub, forceNow); break; case "withdraw": await processWithdrawSession(this.ws, pending.withdrawSessionId); @@ -239,6 +209,7 @@ export class Wallet { await processTip(this.ws, pending.tipId); break; case "pay": + await processPurchaseImpl(this.ws, pending.proposalId); break; default: assertUnreachable(pending); @@ -249,7 +220,8 @@ export class Wallet { * Process pending operations. */ public async runPending(forceNow: boolean = false): Promise { - const pendingOpsResponse = await this.getPendingOperations(); + const onlyDue = !forceNow; + const pendingOpsResponse = await this.getPendingOperations(onlyDue); for (const p of pendingOpsResponse.pendingOperations) { try { await this.processOnePendingOperation(p, forceNow); @@ -260,54 +232,96 @@ export class Wallet { } /** - * Process pending operations and wait for scheduled operations in - * a loop until the wallet is stopped explicitly. + * Run the wallet until there are no more pending operations that give + * liveness left. The wallet will be in a stopped state when this function + * returns without resolving to an exception. */ - public async runLoopScheduledRetries(): Promise { - while (!this.stopped) { - console.log("running wallet retry loop iteration"); - let pending = await this.getPendingOperations(); - console.log("waiting for", pending.nextRetryDelay); - const timeout = this.timerGroup.resolveAfter(pending.nextRetryDelay.d_ms); - await Promise.race([timeout, this.latch.wait()]); - pending = await this.getPendingOperations(); - for (const p of pending.pendingOperations) { - try { - this.processOnePendingOperation(p); - } catch (e) { - console.error(e); + public async runUntilDone(): Promise { + const p = new Promise((resolve, reject) => { + // Run this asynchronously + this.addNotificationListener(n => { + if ( + n.type === NotificationType.WaitingForRetry && + n.numGivingLiveness == 0 + ) { + logger.trace("no liveness-giving operations left, stopping"); + this.stop(); } - } - } + }); + this.runRetryLoop().catch(e => { + console.log("exception in wallet retry loop"); + reject(e); + }); + }); + await p; } /** - * Run until all coins have been withdrawn from the given reserve, - * or an error has occured. + * Process pending operations and wait for scheduled operations in + * a loop until the wallet is stopped explicitly. */ - public async runUntilReserveDepleted(reservePub: string) { - while (true) { - const r = await this.getPendingOperations(); - const allPending = r.pendingOperations; - const relevantPending = allPending.filter(x => { - switch (x.type) { - case "reserve": - return x.reservePub === reservePub; - case "withdraw": - return ( - x.source.type === "reserve" && x.source.reservePub === reservePub - ); - default: - return false; - } - }); - if (relevantPending.length === 0) { - return; + public async runRetryLoop(): Promise { + // Make sure we only run one main loop at a time. + return this.memoRunRetryLoop.memo(async () => { + try { + await this.runRetryLoopImpl(); + } catch (e) { + console.error("error during retry loop execution", e); + throw e; } - for (const p of relevantPending) { - await this.processOnePendingOperation(p); + }); + } + + private async runRetryLoopImpl(): Promise { + while (!this.stopped) { + console.log("running wallet retry loop iteration"); + let pending = await this.getPendingOperations(true); + if (pending.pendingOperations.length === 0) { + const allPending = await this.getPendingOperations(false); + let numPending = 0; + let numGivingLiveness = 0; + for (const p of allPending.pendingOperations) { + numPending++; + if (p.givesLifeness) { + numGivingLiveness++; + } + } + let timeout; + if ( + allPending.pendingOperations.length === 0 || + allPending.nextRetryDelay.d_ms === Number.MAX_SAFE_INTEGER + ) { + // Wait forever + timeout = new Promise(() => {}); + console.log("waiting forever"); + } else { + console.log("waiting for timeout", pending.nextRetryDelay); + timeout = this.timerGroup.resolveAfter( + allPending.nextRetryDelay.d_ms, + ); + } + this.ws.notify({ + type: NotificationType.WaitingForRetry, + numGivingLiveness, + numPending, + }); + await Promise.race([timeout, this.latch.wait()]); + console.log("timeout done"); + } else { + logger.trace("running pending operations that are due"); + // FIXME: maybe be a bit smarter about executing these + // opeations in parallel? + for (const p of pending.pendingOperations) { + try { + console.log("running", p); + await this.processOnePendingOperation(p); + } catch (e) { + console.error(e); + } + } } } + logger.trace("exiting wallet retry loop"); } /** @@ -429,7 +443,6 @@ export class Wallet { } } - /** * Check if and how an exchange is trusted and/or audited. */ @@ -466,7 +479,7 @@ export class Wallet { * Get detailed balance information, sliced by exchange and by currency. */ async getBalances(): Promise { - return getBalances(this.ws); + return this.ws.memoGetBalance.memo(() => getBalances(this.ws)); } async refresh(oldCoinPub: string, force: boolean = false): Promise { @@ -488,8 +501,12 @@ export class Wallet { return getHistory(this.ws, historyQuery); } - async getPendingOperations(): Promise { - return getPendingOperations(this.ws); + async getPendingOperations( + onlyDue: boolean = false, + ): Promise { + return this.ws.memoGetPending.memo(() => + getPendingOperations(this.ws, onlyDue), + ); } async getDenoms(exchangeUrl: string): Promise { @@ -517,7 +534,6 @@ export class Wallet { async updateCurrency(currencyRecord: CurrencyRecord): Promise { logger.trace("updating currency to", currencyRecord); await oneShotPut(this.db, Stores.currencies, currencyRecord); - this.notifier.notify(); } async getReserves(exchangeBaseUrl: string): Promise { @@ -552,7 +568,7 @@ export class Wallet { stop() { this.stopped = true; this.timerGroup.stopCurrentAndFutureTimers(); - this.cryptoApi.stop(); + this.ws.cryptoApi.stop(); } async getSenderWireInfos(): Promise { @@ -693,17 +709,13 @@ export class Wallet { const totalFees = totalRefundFees; return { contractTerms: purchase.contractTerms, - hasRefund: purchase.timestamp_refund !== undefined, + hasRefund: purchase.lastRefundTimestamp !== undefined, totalRefundAmount: totalRefundAmount, totalRefundAndRefreshFees: totalFees, }; } - clearNotification(): void { - this.badge.clearNotification(); - } - benchmarkCrypto(repetitions: number): Promise { - return this.cryptoApi.benchmark(repetitions); + return this.ws.cryptoApi.benchmark(repetitions); } } diff --git a/src/walletTypes.ts b/src/walletTypes.ts index be88fc5b0..d78fc8126 100644 --- a/src/walletTypes.ts +++ b/src/walletTypes.ts @@ -36,6 +36,7 @@ import { ExchangeRecord, ExchangeWireInfo, WithdrawalSource, + RetryInfo, } from "./dbTypes"; import { CoinPaySig, ContractTerms, PayReq } from "./talerTypes"; @@ -203,16 +204,6 @@ export interface PayCoinInfo { sigs: CoinPaySig[]; } -/** - * Listener for notifications from the wallet. - */ -export interface Notifier { - /** - * Called when a new notification arrives. - */ - notify(): void; -} - /** * For terseness. */ @@ -421,31 +412,6 @@ export interface TipStatus { totalFees: AmountJson; } -/** - * Badge that shows activity for the wallet. - */ -export interface Badge { - /** - * Start indicating background activity. - */ - startBusy(): void; - - /** - * Stop indicating background activity. - */ - stopBusy(): void; - - /** - * Show the notification in the badge. - */ - showNotification(): void; - - /** - * Stop showing the notification. - */ - clearNotification(): void; -} - export interface BenchmarkResult { time: { [s: string]: number }; repetitions: number; @@ -525,7 +491,7 @@ export interface WalletDiagnostics { export interface PendingWithdrawOperation { type: "withdraw"; - source: WithdrawalSource, + source: WithdrawalSource; withdrawSessionId: string; numCoinsWithdrawn: number; numCoinsTotal: number; @@ -539,6 +505,102 @@ export interface PendingPayOperation { type: "pay"; } +export const enum NotificationType { + ProposalAccepted = "proposal-accepted", + ProposalDownloaded = "proposal-downloaded", + RefundsSubmitted = "refunds-submitted", + PaybackStarted = "payback-started", + PaybackFinished = "payback-finished", + RefreshRevealed = "refresh-revealed", + RefreshMelted = "refresh-melted", + RefreshStarted = "refresh-started", + RefreshRefused = "refresh-refused", + ReserveUpdated = "reserve-updated", + ReserveConfirmed = "reserve-confirmed", + ReserveDepleted = "reserve-depleted", + WithdrawSessionFinished = "withdraw-session-finished", + WaitingForRetry = "waiting-for-retry", +} + +export interface ProposalAcceptedNotification { + type: NotificationType.ProposalAccepted; + proposalId: string; +} + +export interface ProposalDownloadedNotification { + type: NotificationType.ProposalDownloaded; + proposalId: string; +} + +export interface RefundsSubmittedNotification { + type: NotificationType.RefundsSubmitted; + proposalId: string; +} + +export interface PaybackStartedNotification { + type: NotificationType.PaybackStarted; +} + +export interface PaybackFinishedNotification { + type: NotificationType.PaybackFinished; +} + +export interface RefreshMeltedNotification { + type: NotificationType.RefreshMelted; +} + +export interface RefreshRevealedNotification { + type: NotificationType.RefreshRevealed; +} + +export interface RefreshStartedNotification { + type: NotificationType.RefreshStarted; +} + +export interface RefreshRefusedNotification { + type: NotificationType.RefreshRefused; +} + +export interface ReserveUpdatedNotification { + type: NotificationType.ReserveUpdated; +} + +export interface ReserveConfirmedNotification { + type: NotificationType.ReserveConfirmed; +} + +export interface WithdrawSessionFinishedNotification { + type: NotificationType.WithdrawSessionFinished; + withdrawSessionId: string; +} + +export interface ReserveDepletedNotification { + type: NotificationType.ReserveDepleted; + reservePub: string; +} + +export interface WaitingForRetryNotification { + type: NotificationType.WaitingForRetry; + numPending: number; + numGivingLiveness: number; +} + +export type WalletNotification = + | ProposalAcceptedNotification + | ProposalDownloadedNotification + | RefundsSubmittedNotification + | PaybackStartedNotification + | PaybackFinishedNotification + | RefreshMeltedNotification + | RefreshRevealedNotification + | RefreshStartedNotification + | RefreshRefusedNotification + | ReserveUpdatedNotification + | ReserveConfirmedNotification + | WithdrawSessionFinishedNotification + | ReserveDepletedNotification + | WaitingForRetryNotification; + export interface OperationError { type: string; message: string; @@ -561,7 +623,7 @@ export interface PendingBugOperation { export interface PendingReserveOperation { type: "reserve"; - lastError?: OperationError; + retryInfo: RetryInfo | undefined; stage: string; timestampCreated: Timestamp; reserveType: string; @@ -578,7 +640,6 @@ export interface PendingRefreshOperation { refreshOutputSize: number; } - export interface PendingDirtyCoinOperation { type: "dirty-coin"; coinPub: string; @@ -615,17 +676,24 @@ export interface PendingPayOperation { isReplay: boolean; } -export type PendingOperationInfo = - | PendingWithdrawOperation - | PendingReserveOperation - | PendingBugOperation - | PendingDirtyCoinOperation - | PendingExchangeUpdateOperation - | PendingRefreshOperation - | PendingTipOperation - | PendingProposalDownloadOperation - | PendingProposalChoiceOperation - | PendingPayOperation; +export interface PendingOperationInfoCommon { + type: string; + givesLifeness: boolean; +} + +export type PendingOperationInfo = PendingOperationInfoCommon & + ( + | PendingWithdrawOperation + | PendingReserveOperation + | PendingBugOperation + | PendingDirtyCoinOperation + | PendingExchangeUpdateOperation + | PendingRefreshOperation + | PendingTipOperation + | PendingProposalDownloadOperation + | PendingProposalChoiceOperation + | PendingPayOperation + ); export interface PendingOperationsResponse { pendingOperations: PendingOperationInfo[]; @@ -683,4 +751,4 @@ export interface PlanchetCreationRequest { denomPub: string; reservePub: string; reservePriv: string; -} \ No newline at end of file +} diff --git a/src/webex/chromeBadge.ts b/src/webex/chromeBadge.ts index 15b68ef02..e6b21ad91 100644 --- a/src/webex/chromeBadge.ts +++ b/src/webex/chromeBadge.ts @@ -14,9 +14,6 @@ TALER; see the file COPYING. If not, see */ -import { - Badge, -} from "../walletTypes"; import { isFirefox } from "./compat"; @@ -36,7 +33,7 @@ function rAF(cb: (ts: number) => void) { * Badge for Chrome that renders a Taler logo with a rotating ring if some * background activity is happening. */ -export class ChromeBadge implements Badge { +export class ChromeBadge { private canvas: HTMLCanvasElement; private ctx: CanvasRenderingContext2D; /** diff --git a/src/webex/messages.ts b/src/webex/messages.ts index cf409b44e..6c57385c3 100644 --- a/src/webex/messages.ts +++ b/src/webex/messages.ts @@ -145,10 +145,6 @@ export interface MessageMap { request: { talerTipUri: string }; response: walletTypes.TipStatus; }; - "clear-notification": { - request: {}; - response: void; - }; "accept-refund": { request: { refundUrl: string }; response: string; diff --git a/src/webex/wxApi.ts b/src/webex/wxApi.ts index ea26cd2eb..c4fa65186 100644 --- a/src/webex/wxApi.ts +++ b/src/webex/wxApi.ts @@ -280,13 +280,6 @@ export function acceptTip(talerTipUri: string): Promise { } -/** - * Clear notifications that the wallet shows to the user. - */ -export function clearNotification(): Promise { - return callBackend("clear-notification", { }); -} - /** * Download a refund and accept it. */ diff --git a/src/webex/wxBackend.ts b/src/webex/wxBackend.ts index 752027b70..4363890eb 100644 --- a/src/webex/wxBackend.ts +++ b/src/webex/wxBackend.ts @@ -28,7 +28,6 @@ import { AmountJson } from "../util/amounts"; import { ConfirmReserveRequest, CreateReserveRequest, - Notifier, ReturnCoinsRequest, WalletDiagnostics, } from "../walletTypes"; @@ -41,7 +40,7 @@ import { MessageType } from "./messages"; import * as wxApi from "./wxApi"; import Port = chrome.runtime.Port; import MessageSender = chrome.runtime.MessageSender; -import { BrowserCryptoWorkerFactory } from "../crypto/cryptoApi"; +import { BrowserCryptoWorkerFactory } from "../crypto/workers/cryptoApi"; import { OpenedPromise, openPromise } from "../util/promiseUtils"; const NeedsWallet = Symbol("NeedsWallet"); @@ -225,9 +224,6 @@ async function handleMessage( case "accept-tip": { return needsWallet().acceptTip(detail.talerTipUri); } - case "clear-notification": { - return needsWallet().clearNotification(); - } case "abort-failed-payment": { if (!detail.contractTermsHash) { throw Error("contracTermsHash not given"); @@ -331,31 +327,6 @@ async function dispatch( } } -class ChromeNotifier implements Notifier { - private ports: Port[] = []; - - constructor() { - chrome.runtime.onConnect.addListener(port => { - console.log("got connect!"); - this.ports.push(port); - port.onDisconnect.addListener(() => { - const i = this.ports.indexOf(port); - if (i >= 0) { - this.ports.splice(i, 1); - } else { - console.error("port already removed"); - } - }); - }); - } - - notify() { - for (const p of this.ports) { - p.postMessage({ notify: true }); - } - } -} - function getTab(tabId: number): Promise { return new Promise((resolve, reject) => { chrome.tabs.get(tabId, (tab: chrome.tabs.Tab) => resolve(tab)); @@ -458,16 +429,13 @@ async function reinitWallet() { return; } const http = new BrowserHttpLib(); - const notifier = new ChromeNotifier(); console.log("setting wallet"); const wallet = new Wallet( currentDatabase, http, - badge, - notifier, new BrowserCryptoWorkerFactory(), ); - wallet.runLoopScheduledRetries().catch((e) => { + wallet.runRetryLoop().catch((e) => { console.log("error during wallet retry loop", e); }); // Useful for debugging in the background page. @@ -621,21 +589,6 @@ export async function wxMain() { return true; }); - // Clear notifications both when the popop opens, - // as well when it closes. - chrome.runtime.onConnect.addListener(port => { - if (port.name === "popup") { - if (currentWallet) { - currentWallet.clearNotification(); - } - port.onDisconnect.addListener(() => { - if (currentWallet) { - currentWallet.clearNotification(); - } - }); - } - }); - // Handlers for catching HTTP requests chrome.webRequest.onHeadersReceived.addListener( details => { diff --git a/tsconfig.json b/tsconfig.json index 50359419b..1650171d5 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -24,18 +24,17 @@ }, "files": [ "src/android/index.ts", - "src/crypto/browserWorkerEntry.ts", - "src/crypto/cryptoApi.ts", - "src/crypto/cryptoImplementation.ts", - "src/crypto/cryptoWorker.ts", - "src/crypto/nodeProcessWorker.ts", - "src/crypto/nodeWorkerEntry.ts", "src/crypto/primitives/kdf.ts", "src/crypto/primitives/nacl-fast.ts", "src/crypto/primitives/sha256.ts", - "src/crypto/synchronousWorker.ts", "src/crypto/talerCrypto-test.ts", "src/crypto/talerCrypto.ts", + "src/crypto/workers/browserWorkerEntry.ts", + "src/crypto/workers/cryptoApi.ts", + "src/crypto/workers/cryptoImplementation.ts", + "src/crypto/workers/cryptoWorker.ts", + "src/crypto/workers/nodeThreadWorker.ts", + "src/crypto/workers/synchronousWorker.ts", "src/db.ts", "src/dbTypes.ts", "src/headless/bank.ts", @@ -68,6 +67,7 @@ "src/util/timer.ts", "src/util/wire.ts", "src/wallet-impl/balance.ts", + "src/wallet-impl/errors.ts", "src/wallet-impl/exchanges.ts", "src/wallet-impl/history.ts", "src/wallet-impl/pay.ts", -- cgit v1.2.3