aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2016-10-11 14:08:41 +0200
committerFlorian Dold <florian.dold@gmail.com>2016-10-11 14:51:58 +0200
commit49ec724049faa043966d0b392644e86638b8203e (patch)
tree88ce8f065b464bf6a763a7374d6878acda44a5fb
parent31bddb98c60e34bb161f08c576f7d7437dbbe15c (diff)
downloadwallet-core-49ec724049faa043966d0b392644e86638b8203e.tar.xz
Terminate workers after timeout, handle errors
-rw-r--r--lib/wallet/cryptoApi.ts138
1 files changed, 97 insertions, 41 deletions
diff --git a/lib/wallet/cryptoApi.ts b/lib/wallet/cryptoApi.ts
index db29592fc..855afbb4e 100644
--- a/lib/wallet/cryptoApi.ts
+++ b/lib/wallet/cryptoApi.ts
@@ -38,11 +38,22 @@ interface WorkerState {
/**
* The actual worker thread.
*/
- w: Worker;
+ w: Worker|null;
+
/**
* Are we currently running a task on this worker?
*/
busy: boolean;
+
+ /**
+ * Timer to terminate the worker if it's not busy enough.
+ */
+ terminationTimerHandle: number|null;
+
+ /**
+ * Index of this worker in the list of workers.
+ */
+ workerIndex: number;
}
interface WorkItem {
@@ -74,53 +85,99 @@ export class CryptoApi {
private numWaiting: number = 0;
- constructor() {
- let handler = (msg: MessageEvent) => {
- let id = msg.data.id;
- if (typeof id !== "number") {
- console.error("rpc id must be number");
- return;
- }
- if (!this.rpcRegistry[id]) {
- console.error(`RPC with id ${id} has no registry entry`);
- return;
- }
- let {resolve, workerIndex} = this.rpcRegistry[id];
- delete this.rpcRegistry[id];
- let ws = this.workers[workerIndex];
- if (!ws.busy) {
- throw Error("assertion failed");
+ /**
+ * Start a worker (if not started) and set as busy.
+ */
+ wake(ws: WorkerState) {
+ if (ws.busy) {
+ throw Error("assertion failed");
+ }
+ ws.busy = true;
+ this.numBusy++;
+ if (!ws.w) {
+ let w = new Worker("/lib/wallet/cryptoWorker.js");
+ w.onmessage = (m: MessageEvent) => this.handleWorkerMessage(ws, m);
+ w.onerror = (e: ErrorEvent) => this.handleWorkerError(ws, e);
+ ws.w = w;
+ }
+ if (ws.terminationTimerHandle != null) {
+ clearTimeout(ws.terminationTimerHandle);
+ }
+ let destroy = () => {
+ if (ws.w && !ws.busy) {
+ ws.w!.terminate();
+ ws.w = null;
}
+ }
+ ws.terminationTimerHandle = setTimeout(destroy, 20 * 1000);
+ }
+
+ handleWorkerError(ws: WorkerState, e: ErrorEvent) {
+ console.error("error in worker", e);
+ try {
+ ws.w!.terminate();
+ } catch (e) {
+ console.error(e);
+ }
+ if (ws.busy) {
ws.busy = false;
this.numBusy--;
- resolve(msg.data.result);
-
- // try to find more work for this worker
- for (let i = 0; i < NUM_PRIO; i++) {
- let q = this.workQueues[NUM_PRIO - i - 1];
- if (q.length != 0) {
- let work: WorkItem = q.shift()!;
- let msg: any = {
- operation: work.operation,
- args: work.args,
- id: this.registerRpcId(work.resolve, work.reject, workerIndex),
- };
- ws.w.postMessage(msg);
- ws.busy = true;
- this.numBusy++;
- return;
- }
+ }
+ this.findWork(ws);
+ }
+
+ findWork(ws: WorkerState) {
+ // try to find more work for this worker
+ for (let i = 0; i < NUM_PRIO; i++) {
+ let q = this.workQueues[NUM_PRIO - i - 1];
+ if (q.length != 0) {
+ let work: WorkItem = q.shift()!;
+ let msg: any = {
+ operation: work.operation,
+ args: work.args,
+ id: this.registerRpcId(work.resolve, work.reject, ws.workerIndex),
+ };
+
+ this.wake(ws);
+ ws.w!.postMessage(msg);
+ return;
}
- };
+ }
+ }
+
+ handleWorkerMessage(ws: WorkerState, msg: MessageEvent) {
+ let id = msg.data.id;
+ if (typeof id !== "number") {
+ console.error("rpc id must be number");
+ return;
+ }
+ if (!this.rpcRegistry[id]) {
+ console.error(`RPC with id ${id} has no registry entry`);
+ return;
+ }
+ let {resolve, workerIndex} = this.rpcRegistry[id];
+ delete this.rpcRegistry[id];
+ if (workerIndex != ws.workerIndex) {
+ throw Error("assertion failed");
+ }
+ if (!ws.busy) {
+ throw Error("assertion failed");
+ }
+ ws.busy = false;
+ this.numBusy--;
+ resolve(msg.data.result);
+ this.findWork(ws);
+ }
+ constructor() {
this.workers = new Array<WorkerState>((navigator as any)["hardwareConcurrency"] || 2);
for (let i = 0; i < this.workers.length; i++) {
- let w = new Worker("/lib/wallet/cryptoWorker.js");
- w.onmessage = handler;
this.workers[i] = {
- w,
+ w: null,
busy: false,
+ terminationTimerHandle: null,
+ workerIndex: i,
};
}
this.workQueues = [];
@@ -156,15 +213,14 @@ export class CryptoApi {
continue;
}
- ws.busy = true;
- this.numBusy++;
+ this.wake(ws);
return new Promise<T>((resolve, reject) => {
let msg: any = {
operation, args,
id: this.registerRpcId(resolve, reject, i),
};
- ws.w.postMessage(msg);
+ ws.w!.postMessage(msg);
});
}