diff options
Diffstat (limited to 'packages/taler-wallet-core/src/crypto/workers/cryptoApi.ts')
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/cryptoApi.ts | 74 |
1 files changed, 55 insertions, 19 deletions
diff --git a/packages/taler-wallet-core/src/crypto/workers/cryptoApi.ts b/packages/taler-wallet-core/src/crypto/workers/cryptoApi.ts index d91aa08a2..ca498bff1 100644 --- a/packages/taler-wallet-core/src/crypto/workers/cryptoApi.ts +++ b/packages/taler-wallet-core/src/crypto/workers/cryptoApi.ts @@ -75,7 +75,7 @@ interface WorkerState { /** * Timer to terminate the worker if it's not busy enough. */ - terminationTimerHandle: timer.TimerHandle | null; + idleTimeoutHandle: timer.TimerHandle | null; } interface WorkItem { @@ -114,6 +114,13 @@ export interface CryptoWorkerFactory { getConcurrency(): number; } +export class CryptoApiStoppedError extends Error { + constructor() { + super("Crypto API stopped"); + Object.setPrototypeOf(this, CryptoApiStoppedError.prototype); + } +} + /** * Crypto API that interfaces manages a background crypto thread * for the execution of expensive operations. @@ -140,25 +147,25 @@ export class CryptoApi { */ terminateWorkers(): void { for (const worker of this.workers) { + if (worker.idleTimeoutHandle) { + worker.idleTimeoutHandle.clear(); + worker.idleTimeoutHandle = null; + } + if (worker.currentWorkItem) { + worker.currentWorkItem.reject(Error("explicitly terminated")); + worker.currentWorkItem = null; + } if (worker.w) { logger.trace("terminating worker"); worker.w.terminate(); - if (worker.terminationTimerHandle) { - worker.terminationTimerHandle.clear(); - worker.terminationTimerHandle = null; - } - if (worker.currentWorkItem) { - worker.currentWorkItem.reject(Error("explicitly terminated")); - worker.currentWorkItem = null; - } worker.w = null; } } } stop(): void { - this.terminateWorkers(); this.stopped = true; + this.terminateWorkers(); } /** @@ -166,8 +173,7 @@ export class CryptoApi { */ wake(ws: WorkerState, work: WorkItem): void { if (this.stopped) { - logger.trace("cryptoApi is stopped"); - return; + throw new CryptoApiStoppedError(); } if (ws.currentWorkItem !== null) { throw Error("assertion failed"); @@ -195,19 +201,20 @@ export class CryptoApi { } resetWorkerTimeout(ws: WorkerState): void { - if (ws.terminationTimerHandle !== null) { - ws.terminationTimerHandle.clear(); - ws.terminationTimerHandle = null; + if (ws.idleTimeoutHandle !== null) { + ws.idleTimeoutHandle.clear(); + ws.idleTimeoutHandle = null; } const destroy = (): void => { + logger.trace("destroying crypto worker after idle timeout"); // terminate worker if it's idle if (ws.w && ws.currentWorkItem === null) { ws.w.terminate(); ws.w = null; } }; - ws.terminationTimerHandle = timer.after(15 * 1000, destroy); - //ws.terminationTimerHandle.unref(); + ws.idleTimeoutHandle = timer.after(15 * 1000, destroy); + ws.idleTimeoutHandle.unref(); } handleWorkerError(ws: WorkerState, e: any): void { @@ -277,7 +284,7 @@ export class CryptoApi { for (let i = 0; i < this.workers.length; i++) { this.workers[i] = { currentWorkItem: null, - terminationTimerHandle: null, + idleTimeoutHandle: null, w: null, }; } @@ -293,6 +300,9 @@ export class CryptoApi { priority: number, ...args: any[] ): Promise<T> { + if (this.stopped) { + throw new CryptoApiStoppedError(); + } const p: Promise<T> = new Promise<T>((resolve, reject) => { const rpcId = this.nextRpcId++; const workItem: WorkItem = { @@ -324,7 +334,33 @@ export class CryptoApi { throw Error("assertion failed"); }); - return p; + // Make sure that we wait for the result while a timer is active + // to prevent the event loop from dying, as just waiting for a promise + // does not keep the process alive in Node. + // (The worker child process won't keep us alive either, because we un-ref + // it to make sure it doesn't keep us alive if there is no work.) + return new Promise<T>((resolve, reject) => { + let timedOut = false; + const timeout = timer.after(5000, () => { + logger.warn("crypto RPC call timed out"); + timedOut = true; + reject(new Error("crypto RPC call timed out")); + }); + p.then((x) => { + if (timedOut) { + return; + } + timeout.clear(); + resolve(x); + }); + p.catch((x) => { + if (timedOut) { + return; + } + timeout.clear(); + reject(x); + }); + }); } createPlanchet(req: PlanchetCreationRequest): Promise<WithdrawalPlanchet> { |