From 25eb7624b39f05d720c150b047c15c267e2a1a6d Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 26 Sep 2022 14:40:06 +0200 Subject: wallet-core: improve crypto worker error handling --- .../src/crypto/workers/cryptoDispatcher.ts | 136 +++++++++++++-------- .../src/crypto/workers/nodeThreadWorker.ts | 67 ++++++---- .../src/crypto/workers/synchronousWorkerNode.ts | 30 +++-- 3 files changed, 146 insertions(+), 87 deletions(-) (limited to 'packages/taler-wallet-core/src/crypto') diff --git a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts b/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts index 2ef0d7c69..48c9c6060 100644 --- a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts +++ b/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts @@ -22,7 +22,9 @@ /** * Imports. */ -import { Logger } from "@gnu-taler/taler-util"; +import { Logger, TalerErrorCode } from "@gnu-taler/taler-util"; +import { TalerError } from "../../errors.js"; +import { openPromise } from "../../util/promiseUtils.js"; import { timer, performanceNow, TimerHandle } from "../../util/timer.js"; import { nullCrypto, TalerCryptoInterface } from "../cryptoImplementation.js"; import { CryptoWorker } from "./cryptoWorkerInterface.js"; @@ -32,7 +34,7 @@ const logger = new Logger("cryptoApi.ts"); /** * State of a crypto worker. */ -interface WorkerState { +interface WorkerInfo { /** * The actual worker thread. */ @@ -64,6 +66,8 @@ interface WorkItem { * Time when the work was submitted to a (non-busy) worker thread. */ startTime: BigInt; + + state: WorkItemState; } /** @@ -92,12 +96,18 @@ export class CryptoApiStoppedError extends Error { } } +export enum WorkItemState { + Pending = 1, + Running = 2, + Finished = 3, +} + /** * Dispatcher for cryptographic operations to underlying crypto workers. */ export class CryptoDispatcher { private nextRpcId = 1; - private workers: WorkerState[]; + private workers: WorkerInfo[]; private workQueues: WorkItem[][]; private workerFactory: CryptoWorkerFactory; @@ -141,7 +151,7 @@ export class CryptoDispatcher { /** * Start a worker (if not started) and set as busy. */ - wake(ws: WorkerState, work: WorkItem): void { + wake(ws: WorkerInfo, work: WorkItem): void { if (this.stopped) { return; } @@ -167,10 +177,11 @@ export class CryptoDispatcher { }; this.resetWorkerTimeout(ws); work.startTime = performanceNow(); + work.state = WorkItemState.Running; timer.after(0, () => worker.postMessage(msg)); } - resetWorkerTimeout(ws: WorkerState): void { + resetWorkerTimeout(ws: WorkerInfo): void { if (ws.idleTimeoutHandle !== null) { ws.idleTimeoutHandle.clear(); ws.idleTimeoutHandle = null; @@ -187,7 +198,7 @@ export class CryptoDispatcher { ws.idleTimeoutHandle.unref(); } - handleWorkerError(ws: WorkerState, e: any): void { + handleWorkerError(ws: WorkerInfo, e: any): void { if (ws.currentWorkItem) { logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e); } else { @@ -203,6 +214,7 @@ export class CryptoDispatcher { logger.error(e as string); } if (ws.currentWorkItem !== null) { + ws.currentWorkItem.state = WorkItemState.Finished; ws.currentWorkItem.reject(e); ws.currentWorkItem = null; this.numBusy--; @@ -210,7 +222,7 @@ export class CryptoDispatcher { this.findWork(ws); } - private findWork(ws: WorkerState): void { + private findWork(ws: WorkerInfo): void { // try to find more work for this worker for (let i = 0; i < NUM_PRIO; i++) { const q = this.workQueues[NUM_PRIO - i - 1]; @@ -225,26 +237,38 @@ export class CryptoDispatcher { } } - handleWorkerMessage(ws: WorkerState, msg: any): void { + handleWorkerMessage(ws: WorkerInfo, msg: any): void { const id = msg.data.id; if (typeof id !== "number") { - console.error("rpc id must be number"); + logger.error("rpc id must be number"); return; } const currentWorkItem = ws.currentWorkItem; ws.currentWorkItem = null; - this.numBusy--; - this.findWork(ws); if (!currentWorkItem) { - console.error("unsolicited response from worker"); + logger.error("unsolicited response from worker"); return; } if (id !== currentWorkItem.rpcId) { - console.error(`RPC with id ${id} has no registry entry`); + logger.error(`RPC with id ${id} has no registry entry`); return; } - - currentWorkItem.resolve(msg.data.result); + if (currentWorkItem.state === WorkItemState.Running) { + this.numBusy--; + currentWorkItem.state = WorkItemState.Finished; + if (msg.data.type === "success") { + currentWorkItem.resolve(msg.data.result); + } else if (msg.data.type === "error") { + currentWorkItem.reject( + TalerError.fromDetail(TalerErrorCode.WALLET_CRYPTO_WORKER_ERROR, { + innerError: msg.data.error, + }), + ); + } else { + currentWorkItem.reject(new Error("bad message from crypto worker")); + } + } + this.findWork(ws); } cryptoApi: TalerCryptoInterface; @@ -258,7 +282,7 @@ export class CryptoDispatcher { this.cryptoApi = fns; this.workerFactory = workerFactory; - this.workers = new Array(workerFactory.getConcurrency()); + this.workers = new Array(workerFactory.getConcurrency()); for (let i = 0; i < this.workers.length; i++) { this.workers[i] = { @@ -282,36 +306,42 @@ export class CryptoDispatcher { if (this.stopped) { throw new CryptoApiStoppedError(); } - const p: Promise = new Promise((resolve, reject) => { - const rpcId = this.nextRpcId++; - const workItem: WorkItem = { - operation, - req, - resolve, - reject, - rpcId, - startTime: BigInt(0), - }; - - if (this.numBusy === this.workers.length) { - const q = this.workQueues[priority]; - if (!q) { - throw Error("assertion failed"); - } - this.workQueues[priority].push(workItem); - return; + const rpcId = this.nextRpcId++; + const myProm = openPromise(); + const workItem: WorkItem = { + operation, + req, + resolve: myProm.resolve, + reject: myProm.reject, + rpcId, + startTime: BigInt(0), + state: WorkItemState.Pending, + }; + let scheduled = false; + if (this.numBusy === this.workers.length) { + // All workers are busy, queue work item + const q = this.workQueues[priority]; + if (!q) { + throw Error("assertion failed"); } - + this.workQueues[priority].push(workItem); + scheduled = true; + } + if (!scheduled) { for (const ws of this.workers) { if (ws.currentWorkItem !== null) { continue; } this.wake(ws, workItem); - return; + scheduled = true; + break; } + } + if (!scheduled) { + // Could not schedule work. throw Error("assertion failed"); - }); + } // Make sure that we wait for the result while a timer is active // to prevent the event loop from dying, as just waiting for a promise @@ -324,21 +354,27 @@ export class CryptoDispatcher { logger.warn(`crypto RPC call ('${operation}') timed out`); timedOut = true; reject(new Error(`crypto RPC call ('${operation}') timed out`)); - }); - p.then((x) => { - if (timedOut) { - return; - } - timeout.clear(); - resolve(x); - }).catch((x) => { - logger.info(`crypto RPC call ${operation} threw`); - if (timedOut) { - return; + if (workItem.state === WorkItemState.Running) { + workItem.state = WorkItemState.Finished; + this.numBusy--; } - timeout.clear(); - reject(x); }); + myProm.promise + .then((x) => { + if (timedOut) { + return; + } + timeout.clear(); + resolve(x); + }) + .catch((x) => { + logger.info(`crypto RPC call ${operation} threw`); + if (timedOut) { + return; + } + timeout.clear(); + reject(x); + }); }); } } diff --git a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts index 42370fc1b..71f137f29 100644 --- a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts +++ b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts @@ -22,6 +22,7 @@ import { CryptoWorker } from "./cryptoWorkerInterface.js"; import os from "os"; import { Logger } from "@gnu-taler/taler-util"; import { nativeCryptoR } from "../cryptoImplementation.js"; +import { getErrorDetailFromException } from "../../errors.js"; const logger = new Logger("nodeThreadWorker.ts"); @@ -69,58 +70,72 @@ const workerCode = ` * a message. */ export function handleWorkerMessage(msg: any): void { - const req = msg.req; - if (typeof req !== "object") { - console.error("request must be an object"); - return; - } - const id = msg.id; - if (typeof id !== "number") { - console.error("RPC id must be number"); - return; - } - const operation = msg.operation; - if (typeof operation !== "string") { - console.error("RPC operation must be string"); - return; - } - const handleRequest = async (): Promise => { + const req = msg.req; + if (typeof req !== "object") { + logger.error("request must be an object"); + return; + } + const id = msg.id; + if (typeof id !== "number") { + logger.error("RPC id must be number"); + return; + } + const operation = msg.operation; + if (typeof operation !== "string") { + logger.error("RPC operation must be string"); + return; + } const impl = nativeCryptoR; if (!(operation in impl)) { - console.error(`crypto operation '${operation}' not found`); + logger.error(`crypto operation '${operation}' not found`); return; } + let responseMsg: any; + try { const result = await (impl as any)[operation](impl, req); + responseMsg = { data: { type: "success", result, id } }; + } catch (e: any) { + logger.error(`error during operation: ${e.stack ?? e.toString()}`); + responseMsg = { + data: { + type: "error", + error: getErrorDetailFromException(e), + id, + }, + }; + } + + try { // eslint-disable-next-line @typescript-eslint/no-var-requires const _r = "require"; const worker_threads: typeof import("worker_threads") = module[_r]("worker_threads"); // const worker_threads = require("worker_threads"); - const p = worker_threads.parentPort; - worker_threads.parentPort?.postMessage; if (p) { - p.postMessage({ data: { result, id } }); + p.postMessage(responseMsg); } else { - console.error("parent port not available (not running in thread?"); + logger.error("parent port not available (not running in thread?"); } - } catch (e) { - console.error("error during operation", e); + } catch (e: any) { + logger.error( + `error sending back operation result: ${e.stack ?? e.toString()}`, + ); return; } }; handleRequest().catch((e) => { - console.error("error in node worker", e); + logger.error("error in node worker", e); }); } export function handleWorkerError(e: Error): void { - console.log("got error from worker", e); + logger.error(`got error from worker: ${e.stack ?? e.toString()}`); } export class NodeThreadCryptoWorkerFactory implements CryptoWorkerFactory { @@ -161,7 +176,7 @@ class NodeThreadCryptoWorker implements CryptoWorker { this.nodeWorker = new worker_threads.Worker(workerCode, { eval: true }); this.nodeWorker.on("error", (err: Error) => { - console.error("error in node worker:", err); + logger.error("error in node worker:", err); if (this.onerror) { this.onerror(err); } diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts index 4dda9cd11..f3cfc5ef9 100644 --- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts +++ b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerNode.ts @@ -15,6 +15,7 @@ */ import { Logger } from "@gnu-taler/taler-util"; +import { getErrorDetailFromException } from "../../errors.js"; import { nativeCryptoR, TalerCryptoInterfaceR, @@ -139,7 +140,7 @@ export class SynchronousCryptoWorker { private dispatchMessage(msg: any): void { if (this.onmessage) { - this.onmessage({ data: msg }); + this.onmessage(msg); } } @@ -151,20 +152,27 @@ export class SynchronousCryptoWorker { const impl = this.cryptoImplR; if (!(operation in impl)) { - console.error(`crypto operation '${operation}' not found`); + logger.error(`crypto operation '${operation}' not found`); return; } - let result: any; + let responseMsg: any; try { - result = await (impl as any)[operation](impl, req); + const result = await (impl as any)[operation](impl, req); + responseMsg = { data: { type: "success", result, id } }; } catch (e: any) { - logger.error(`error during operation '${operation}': ${e}`); - return; + logger.error(`error during operation: ${e.stack ?? e.toString()}`); + responseMsg = { + data: { + type: "error", + id, + error: getErrorDetailFromException(e), + }, + }; } try { - setTimeout(() => this.dispatchMessage({ result, id }), 0); + setTimeout(() => this.dispatchMessage(responseMsg), 0); } catch (e) { logger.error("got error during dispatch", e); } @@ -176,22 +184,22 @@ export class SynchronousCryptoWorker { postMessage(msg: any): void { const req = msg.req; if (typeof req !== "object") { - console.error("request must be an object"); + logger.error("request must be an object"); return; } const id = msg.id; if (typeof id !== "number") { - console.error("RPC id must be number"); + logger.error("RPC id must be number"); return; } const operation = msg.operation; if (typeof operation !== "string") { - console.error("RPC operation must be string"); + logger.error("RPC operation must be string"); return; } this.handleRequest(operation, id, req).catch((e) => { - console.error("Error while handling crypto request:", e); + logger.error("Error while handling crypto request:", e); }); } -- cgit v1.2.3