From 49ec724049faa043966d0b392644e86638b8203e Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 11 Oct 2016 14:08:41 +0200 Subject: Terminate workers after timeout, handle errors --- lib/wallet/cryptoApi.ts | 138 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 97 insertions(+), 41 deletions(-) (limited to 'lib') diff --git a/lib/wallet/cryptoApi.ts b/lib/wallet/cryptoApi.ts index db29592fc..855afbb4e 100644 --- a/lib/wallet/cryptoApi.ts +++ b/lib/wallet/cryptoApi.ts @@ -38,11 +38,22 @@ interface WorkerState { /** * The actual worker thread. */ - w: Worker; + w: Worker|null; + /** * Are we currently running a task on this worker? */ busy: boolean; + + /** + * Timer to terminate the worker if it's not busy enough. + */ + terminationTimerHandle: number|null; + + /** + * Index of this worker in the list of workers. + */ + workerIndex: number; } interface WorkItem { @@ -74,53 +85,99 @@ export class CryptoApi { private numWaiting: number = 0; - constructor() { - let handler = (msg: MessageEvent) => { - let id = msg.data.id; - if (typeof id !== "number") { - console.error("rpc id must be number"); - return; - } - if (!this.rpcRegistry[id]) { - console.error(`RPC with id ${id} has no registry entry`); - return; - } - let {resolve, workerIndex} = this.rpcRegistry[id]; - delete this.rpcRegistry[id]; - let ws = this.workers[workerIndex]; - if (!ws.busy) { - throw Error("assertion failed"); + /** + * Start a worker (if not started) and set as busy. + */ + wake(ws: WorkerState) { + if (ws.busy) { + throw Error("assertion failed"); + } + ws.busy = true; + this.numBusy++; + if (!ws.w) { + let w = new Worker("/lib/wallet/cryptoWorker.js"); + w.onmessage = (m: MessageEvent) => this.handleWorkerMessage(ws, m); + w.onerror = (e: ErrorEvent) => this.handleWorkerError(ws, e); + ws.w = w; + } + if (ws.terminationTimerHandle != null) { + clearTimeout(ws.terminationTimerHandle); + } + let destroy = () => { + if (ws.w && !ws.busy) { + ws.w!.terminate(); + ws.w = null; } + } + ws.terminationTimerHandle = setTimeout(destroy, 20 * 1000); + } + + handleWorkerError(ws: WorkerState, e: ErrorEvent) { + console.error("error in worker", e); + try { + ws.w!.terminate(); + } catch (e) { + console.error(e); + } + if (ws.busy) { ws.busy = false; this.numBusy--; - resolve(msg.data.result); - - // try to find more work for this worker - for (let i = 0; i < NUM_PRIO; i++) { - let q = this.workQueues[NUM_PRIO - i - 1]; - if (q.length != 0) { - let work: WorkItem = q.shift()!; - let msg: any = { - operation: work.operation, - args: work.args, - id: this.registerRpcId(work.resolve, work.reject, workerIndex), - }; - ws.w.postMessage(msg); - ws.busy = true; - this.numBusy++; - return; - } + } + this.findWork(ws); + } + + findWork(ws: WorkerState) { + // try to find more work for this worker + for (let i = 0; i < NUM_PRIO; i++) { + let q = this.workQueues[NUM_PRIO - i - 1]; + if (q.length != 0) { + let work: WorkItem = q.shift()!; + let msg: any = { + operation: work.operation, + args: work.args, + id: this.registerRpcId(work.resolve, work.reject, ws.workerIndex), + }; + + this.wake(ws); + ws.w!.postMessage(msg); + return; } - }; + } + } + + handleWorkerMessage(ws: WorkerState, msg: MessageEvent) { + let id = msg.data.id; + if (typeof id !== "number") { + console.error("rpc id must be number"); + return; + } + if (!this.rpcRegistry[id]) { + console.error(`RPC with id ${id} has no registry entry`); + return; + } + let {resolve, workerIndex} = this.rpcRegistry[id]; + delete this.rpcRegistry[id]; + if (workerIndex != ws.workerIndex) { + throw Error("assertion failed"); + } + if (!ws.busy) { + throw Error("assertion failed"); + } + ws.busy = false; + this.numBusy--; + resolve(msg.data.result); + this.findWork(ws); + } + constructor() { this.workers = new Array((navigator as any)["hardwareConcurrency"] || 2); for (let i = 0; i < this.workers.length; i++) { - let w = new Worker("/lib/wallet/cryptoWorker.js"); - w.onmessage = handler; this.workers[i] = { - w, + w: null, busy: false, + terminationTimerHandle: null, + workerIndex: i, }; } this.workQueues = []; @@ -156,15 +213,14 @@ export class CryptoApi { continue; } - ws.busy = true; - this.numBusy++; + this.wake(ws); return new Promise((resolve, reject) => { let msg: any = { operation, args, id: this.registerRpcId(resolve, reject, i), }; - ws.w.postMessage(msg); + ws.w!.postMessage(msg); }); } -- cgit v1.2.3