diff options
author | Florian Dold <florian.dold@gmail.com> | 2019-12-05 19:38:19 +0100 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2019-12-05 19:38:19 +0100 |
commit | f67d7f54f9d0fed97446898942e3dfee67ee2985 (patch) | |
tree | 2b81738025e8f61250ede10908cbf81071e16975 /src/crypto | |
parent | 829acdd3d98f1014747f15ecb619b6fbaa06b640 (diff) |
threads, retries and notifications WIP
Diffstat (limited to 'src/crypto')
-rw-r--r-- | src/crypto/nodeProcessWorker.ts | 118 | ||||
-rw-r--r-- | src/crypto/nodeWorkerEntry.ts | 69 | ||||
-rw-r--r-- | src/crypto/workers/browserWorkerEntry.ts (renamed from src/crypto/browserWorkerEntry.ts) | 0 | ||||
-rw-r--r-- | src/crypto/workers/cryptoApi.ts (renamed from src/crypto/cryptoApi.ts) | 25 | ||||
-rw-r--r-- | src/crypto/workers/cryptoImplementation.ts (renamed from src/crypto/cryptoImplementation.ts) | 38 | ||||
-rw-r--r-- | src/crypto/workers/cryptoWorker.ts (renamed from src/crypto/cryptoWorker.ts) | 4 | ||||
-rw-r--r-- | src/crypto/workers/nodeThreadWorker.ts | 175 | ||||
-rw-r--r-- | src/crypto/workers/synchronousWorker.ts (renamed from src/crypto/synchronousWorker.ts) | 0 |
8 files changed, 220 insertions, 209 deletions
diff --git a/src/crypto/nodeProcessWorker.ts b/src/crypto/nodeProcessWorker.ts deleted file mode 100644 index 8ff149788..000000000 --- a/src/crypto/nodeProcessWorker.ts +++ /dev/null @@ -1,118 +0,0 @@ -import { CryptoWorkerFactory } from "./cryptoApi"; - -/* - This file is part of TALER - (C) 2016 GNUnet e.V. - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - */ - - -// tslint:disable:no-var-requires - -import { CryptoWorker } from "./cryptoWorker"; - -import path = require("path"); -import child_process = require("child_process"); - -const nodeWorkerEntry = path.join(__dirname, "nodeWorkerEntry.js"); - - -export class NodeCryptoWorkerFactory implements CryptoWorkerFactory { - startWorker(): CryptoWorker { - if (typeof require === "undefined") { - throw Error("cannot make worker, require(...) not defined"); - } - const workerCtor = require("./nodeProcessWorker").Worker; - const workerPath = __dirname + "/cryptoWorker.js"; - return new workerCtor(workerPath); - } - - getConcurrency(): number { - return 4; - } -} - -/** - * Worker implementation that uses node subprocesses. - */ -export class Worker { - private child: any; - - /** - * Function to be called when we receive a message from the worker thread. - */ - onmessage: undefined | ((m: any) => void); - - /** - * Function to be called when we receive an error from the worker thread. - */ - onerror: undefined | ((m: any) => void); - - private dispatchMessage(msg: any) { - if (this.onmessage) { - this.onmessage({ data: msg }); - } else { - console.warn("no handler for worker event 'message' defined") - } - } - - private dispatchError(msg: any) { - if (this.onerror) { - this.onerror({ data: msg }); - } else { - console.warn("no handler for worker event 'error' defined") - } - } - - constructor() { - this.child = child_process.fork(nodeWorkerEntry); - this.onerror = undefined; - this.onmessage = undefined; - - this.child.on("error", (e: any) => { - this.dispatchError(e); - }); - - this.child.on("message", (msg: any) => { - this.dispatchMessage(msg); - }); - } - - /** - * Add an event listener for either an "error" or "message" event. - */ - addEventListener(event: "message" | "error", fn: (x: any) => void): void { - switch (event) { - case "message": - this.onmessage = fn; - break; - case "error": - this.onerror = fn; - break; - } - } - - /** - * Send a message to the worker thread. - */ - postMessage (msg: any) { - this.child.send(JSON.stringify({data: msg})); - } - - /** - * Forcibly terminate the worker thread. - */ - terminate () { - this.child.kill("SIGINT"); - } -} diff --git a/src/crypto/nodeWorkerEntry.ts b/src/crypto/nodeWorkerEntry.ts deleted file mode 100644 index 1e088d987..000000000 --- a/src/crypto/nodeWorkerEntry.ts +++ /dev/null @@ -1,69 +0,0 @@ -/* - This file is part of TALER - (C) 2016 GNUnet e.V. - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU General Public License for more details. - - You should have received a copy of the GNU General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - */ - -// tslint:disable:no-var-requires - -import { CryptoImplementation } from "./cryptoImplementation"; - -async function handleRequest(operation: string, id: number, args: string[]) { - - const impl = new CryptoImplementation(); - - if (!(operation in impl)) { - console.error(`crypto operation '${operation}' not found`); - return; - } - - try { - const result = (impl as any)[operation](...args); - if (process.send) { - process.send({ result, id }); - } else { - console.error("process.send not available"); - } - } catch (e) { - console.error("error during operation", e); - return; - } -} - -process.on("message", (msgStr: any) => { - const msg = JSON.parse(msgStr); - - const args = msg.data.args; - if (!Array.isArray(args)) { - console.error("args must be array"); - return; - } - const id = msg.data.id; - if (typeof id !== "number") { - console.error("RPC id must be number"); - return; - } - const operation = msg.data.operation; - if (typeof operation !== "string") { - console.error("RPC operation must be string"); - return; - } - - handleRequest(operation, id, args).catch((e) => { - console.error("error in node worker", e); - }); -}); - -process.on("uncaughtException", (err: any) => { - console.error("uncaught exception in node worker entry", err); -}); diff --git a/src/crypto/browserWorkerEntry.ts b/src/crypto/workers/browserWorkerEntry.ts index 5ac762c13..5ac762c13 100644 --- a/src/crypto/browserWorkerEntry.ts +++ b/src/crypto/workers/browserWorkerEntry.ts diff --git a/src/crypto/cryptoApi.ts b/src/crypto/workers/cryptoApi.ts index 2521b54ea..5537bb39f 100644 --- a/src/crypto/cryptoApi.ts +++ b/src/crypto/workers/cryptoApi.ts @@ -22,7 +22,7 @@ /** * Imports. */ -import { AmountJson } from "../util/amounts"; +import { AmountJson } from "../../util/amounts"; import { CoinRecord, @@ -30,15 +30,21 @@ import { RefreshSessionRecord, TipPlanchet, WireFee, -} from "../dbTypes"; +} from "../../dbTypes"; import { CryptoWorker } from "./cryptoWorker"; -import { ContractTerms, PaybackRequest } from "../talerTypes"; +import { ContractTerms, PaybackRequest } from "../../talerTypes"; -import { BenchmarkResult, CoinWithDenom, PayCoinInfo, PlanchetCreationResult, PlanchetCreationRequest } from "../walletTypes"; +import { + BenchmarkResult, + CoinWithDenom, + PayCoinInfo, + PlanchetCreationResult, + PlanchetCreationRequest, +} from "../../walletTypes"; -import * as timer from "../util/timer"; +import * as timer from "../../util/timer"; /** * State of a crypto worker. @@ -172,7 +178,8 @@ export class CryptoApi { wake(ws: WorkerState, work: WorkItem): void { if (this.stopped) { console.log("cryptoApi is stopped"); - CryptoApi.enableTracing && console.log("not waking, as cryptoApi is stopped"); + CryptoApi.enableTracing && + console.log("not waking, as cryptoApi is stopped"); return; } if (ws.currentWorkItem !== null) { @@ -333,7 +340,7 @@ export class CryptoApi { } createPlanchet( - req: PlanchetCreationRequest + req: PlanchetCreationRequest, ): Promise<PlanchetCreationResult> { return this.doRpc<PlanchetCreationResult>("createPlanchet", 1, req); } @@ -398,6 +405,10 @@ export class CryptoApi { return this.doRpc<string>("rsaUnblind", 4, sig, bk, pk); } + rsaVerify(hm: string, sig: string, pk: string): Promise<boolean> { + return this.doRpc<boolean>("rsaVerify", 4, hm, sig, pk); + } + createPaybackRequest(coin: CoinRecord): Promise<PaybackRequest> { return this.doRpc<PaybackRequest>("createPaybackRequest", 1, coin); } diff --git a/src/crypto/cryptoImplementation.ts b/src/crypto/workers/cryptoImplementation.ts index faebbaa4a..00d81ce27 100644 --- a/src/crypto/cryptoImplementation.ts +++ b/src/crypto/workers/cryptoImplementation.ts @@ -30,12 +30,12 @@ import { DenominationRecord, RefreshPlanchetRecord, RefreshSessionRecord, - ReserveRecord, TipPlanchet, WireFee, -} from "../dbTypes"; + initRetryInfo, +} from "../../dbTypes"; -import { CoinPaySig, ContractTerms, PaybackRequest } from "../talerTypes"; +import { CoinPaySig, ContractTerms, PaybackRequest } from "../../talerTypes"; import { BenchmarkResult, CoinWithDenom, @@ -43,11 +43,12 @@ import { Timestamp, PlanchetCreationResult, PlanchetCreationRequest, -} from "../walletTypes"; -import { canonicalJson, getTalerStampSec } from "../util/helpers"; -import { AmountJson } from "../util/amounts"; -import * as Amounts from "../util/amounts"; -import * as timer from "../util/timer"; + getTimestampNow, +} from "../../walletTypes"; +import { canonicalJson, getTalerStampSec } from "../../util/helpers"; +import { AmountJson } from "../../util/amounts"; +import * as Amounts from "../../util/amounts"; +import * as timer from "../../util/timer"; import { getRandomBytes, encodeCrock, @@ -64,8 +65,9 @@ import { createEcdheKeyPair, keyExchangeEcdheEddsa, setupRefreshPlanchet, -} from "./talerCrypto"; -import { randomBytes } from "./primitives/nacl-fast"; + rsaVerify, +} from "../talerCrypto"; +import { randomBytes } from "../primitives/nacl-fast"; enum SignaturePurpose { RESERVE_WITHDRAW = 1200, @@ -304,9 +306,9 @@ export class CryptoImplementation { /** * Unblind a blindly signed value. */ - rsaUnblind(sig: string, bk: string, pk: string): string { + rsaUnblind(blindedSig: string, bk: string, pk: string): string { const denomSig = rsaUnblind( - decodeCrock(sig), + decodeCrock(blindedSig), decodeCrock(pk), decodeCrock(bk), ); @@ -314,6 +316,13 @@ export class CryptoImplementation { } /** + * Unblind a blindly signed value. + */ + rsaVerify(hm: string, sig: string, pk: string): boolean { + return rsaVerify(hash(decodeCrock(hm)), decodeCrock(sig), decodeCrock(pk)); + } + + /** * Generate updated coins (to store in the database) * and deposit permissions for each given coin. */ @@ -488,7 +497,6 @@ export class CryptoImplementation { refreshSessionId, confirmSig: encodeCrock(confirmSig), exchangeBaseUrl, - finished: false, hash: encodeCrock(sessionHash), meltCoinPub: meltCoin.coinPub, newDenomHashes: newCoinDenoms.map(d => d.denomPubHash), @@ -499,6 +507,10 @@ export class CryptoImplementation { transferPubs, valueOutput, valueWithFee, + created: getTimestampNow(), + retryInfo: initRetryInfo(), + finishedTimestamp: undefined, + lastError: undefined, }; return refreshSession; diff --git a/src/crypto/cryptoWorker.ts b/src/crypto/workers/cryptoWorker.ts index 0ea641dde..d4449f4a2 100644 --- a/src/crypto/cryptoWorker.ts +++ b/src/crypto/workers/cryptoWorker.ts @@ -3,6 +3,6 @@ export interface CryptoWorker { terminate(): void; - onmessage: (m: any) => void; - onerror: (m: any) => void; + onmessage: ((m: any) => void) | undefined; + onerror: ((m: any) => void) | undefined; }
\ No newline at end of file diff --git a/src/crypto/workers/nodeThreadWorker.ts b/src/crypto/workers/nodeThreadWorker.ts new file mode 100644 index 000000000..b42031c40 --- /dev/null +++ b/src/crypto/workers/nodeThreadWorker.ts @@ -0,0 +1,175 @@ +import { CryptoWorkerFactory } from "./cryptoApi"; + +/* + This file is part of TALER + (C) 2016 GNUnet e.V. + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +// tslint:disable:no-var-requires + +import { CryptoWorker } from "./cryptoWorker"; + +import worker_threads = require("worker_threads"); +import os = require("os"); +import { CryptoImplementation } from "./cryptoImplementation"; + +const f = __filename; + +const workerCode = ` + const worker_threads = require('worker_threads'); + const parentPort = worker_threads.parentPort; + let tw; + try { + tw = require("${f}"); + } catch (e) { + console.log("could not load from ${f}"); + } + if (!tw) { + try { + tw = require("taler-wallet-android"); + } catch (e) { + console.log("could not load taler-wallet-android either"); + throw e; + } + } + parentPort.on("message", tw.handleWorkerMessage); + parentPort.on("error", tw.handleWorkerError); +`; + +/** + * This function is executed in the worker thread to handle + * a message. + */ +export function handleWorkerMessage(msg: any) { + const args = msg.args; + if (!Array.isArray(args)) { + console.error("args must be array"); + return; + } + const id = msg.id; + if (typeof id !== "number") { + console.error("RPC id must be number"); + return; + } + const operation = msg.operation; + if (typeof operation !== "string") { + console.error("RPC operation must be string"); + return; + } + + const handleRequest = async () => { + const impl = new CryptoImplementation(); + + if (!(operation in impl)) { + console.error(`crypto operation '${operation}' not found`); + return; + } + + try { + const result = (impl as any)[operation](...args); + const p = worker_threads.parentPort; + worker_threads.parentPort?.postMessage; + if (p) { + p.postMessage({ data: { result, id } }); + } else { + console.error("parent port not available (not running in thread?"); + } + } catch (e) { + console.error("error during operation", e); + return; + } + }; + + handleRequest().catch(e => { + console.error("error in node worker", e); + }); +} + +export function handleWorkerError(e: Error) { + console.log("got error from worker", e); +} + +export class NodeThreadCryptoWorkerFactory implements CryptoWorkerFactory { + startWorker(): CryptoWorker { + if (typeof require === "undefined") { + throw Error("cannot make worker, require(...) not defined"); + } + return new NodeThreadCryptoWorker(); + } + + getConcurrency(): number { + return Math.max(1, os.cpus().length - 1); + } +} + +/** + * Worker implementation that uses node subprocesses. + */ +class NodeThreadCryptoWorker implements CryptoWorker { + /** + * Function to be called when we receive a message from the worker thread. + */ + onmessage: undefined | ((m: any) => void); + + /** + * Function to be called when we receive an error from the worker thread. + */ + onerror: undefined | ((m: any) => void); + + private nodeWorker: worker_threads.Worker; + + constructor() { + this.nodeWorker = new worker_threads.Worker(workerCode, { eval: true }); + this.nodeWorker.on("error", (err: Error) => { + console.error("error in node worker:", err); + if (this.onerror) { + this.onerror(err); + } + }); + this.nodeWorker.on("message", (v: any) => { + if (this.onmessage) { + this.onmessage(v); + } + }); + this.nodeWorker.unref(); + } + + /** + * Add an event listener for either an "error" or "message" event. + */ + addEventListener(event: "message" | "error", fn: (x: any) => void): void { + switch (event) { + case "message": + this.onmessage = fn; + break; + case "error": + this.onerror = fn; + break; + } + } + + /** + * Send a message to the worker thread. + */ + postMessage(msg: any) { + this.nodeWorker.postMessage(msg); + } + + /** + * Forcibly terminate the worker thread. + */ + terminate() { + this.nodeWorker.terminate(); + } +} diff --git a/src/crypto/synchronousWorker.ts b/src/crypto/workers/synchronousWorker.ts index 12eecde9a..12eecde9a 100644 --- a/src/crypto/synchronousWorker.ts +++ b/src/crypto/workers/synchronousWorker.ts |