aboutsummaryrefslogtreecommitdiff
path: root/lib/wallet/cryptoApi.ts
diff options
context:
space:
mode:
Diffstat (limited to 'lib/wallet/cryptoApi.ts')
-rw-r--r--lib/wallet/cryptoApi.ts149
1 files changed, 67 insertions, 82 deletions
diff --git a/lib/wallet/cryptoApi.ts b/lib/wallet/cryptoApi.ts
index ec20dd964..88b82ae3b 100644
--- a/lib/wallet/cryptoApi.ts
+++ b/lib/wallet/cryptoApi.ts
@@ -28,11 +28,6 @@ import {CoinWithDenom} from "./wallet";
import {PayCoinInfo} from "./types";
import {RefreshSession} from "./types";
-interface RegistryEntry {
- resolve: any;
- reject: any;
- workerIndex: number;
-}
interface WorkerState {
/**
@@ -41,19 +36,14 @@ interface WorkerState {
w: Worker|null;
/**
- * Are we currently running a task on this worker?
+ * Work we're currently executing or null if not busy.
*/
- busy: boolean;
+ currentWorkItem: WorkItem|null;
/**
* Timer to terminate the worker if it's not busy enough.
*/
terminationTimerHandle: number|null;
-
- /**
- * Index of this worker in the list of workers.
- */
- workerIndex: number;
}
interface WorkItem {
@@ -61,6 +51,11 @@ interface WorkItem {
args: any[];
resolve: any;
reject: any;
+
+ /**
+ * Serial id to identify a matching response.
+ */
+ rpcId: number;
}
@@ -72,27 +67,21 @@ const NUM_PRIO = 5;
export class CryptoApi {
private nextRpcId: number = 1;
- private rpcRegistry: {[n: number]: RegistryEntry} = {};
private workers: WorkerState[];
private workQueues: WorkItem[][];
/**
* Number of busy workers.
*/
private numBusy: number = 0;
- /**
- * Number if pending work items.
- */
- private numWaiting: number = 0;
-
/**
* Start a worker (if not started) and set as busy.
*/
- wake(ws: WorkerState) {
- if (ws.busy) {
+ wake<T>(ws: WorkerState, work: WorkItem): void {
+ if (ws.currentWorkItem != null) {
throw Error("assertion failed");
}
- ws.busy = true;
+ ws.currentWorkItem = work;
this.numBusy++;
if (!ws.w) {
let w = new Worker("/lib/wallet/cryptoWorker.js");
@@ -100,27 +89,45 @@ export class CryptoApi {
w.onerror = (e: ErrorEvent) => this.handleWorkerError(ws, e);
ws.w = w;
}
+
+ let msg: any = {
+ operation: work.operation, args: work.args,
+ id: work.rpcId
+ };
+ this.resetWorkerTimeout(ws);
+ ws.w!.postMessage(msg);
+ }
+
+ resetWorkerTimeout(ws: WorkerState) {
if (ws.terminationTimerHandle != null) {
clearTimeout(ws.terminationTimerHandle);
}
let destroy = () => {
- if (ws.w && !ws.busy) {
+ // terminate worker if it's idle
+ if (ws.w && ws.currentWorkItem == null) {
ws.w!.terminate();
ws.w = null;
}
- }
+ };
ws.terminationTimerHandle = setTimeout(destroy, 20 * 1000);
}
handleWorkerError(ws: WorkerState, e: ErrorEvent) {
- console.error("error in worker", e);
+ if (ws.currentWorkItem) {
+ console.error(`error in worker during ${ws.currentWorkItem!.operation}`, e);
+ } else {
+ console.error("error in worker", e);
+ }
+ console.error(e.message);
try {
ws.w!.terminate();
+ ws.w = null;
} catch (e) {
console.error(e);
}
- if (ws.busy) {
- ws.busy = false;
+ if (ws.currentWorkItem != null) {
+ ws.currentWorkItem.reject(e);
+ ws.currentWorkItem = null;
this.numBusy--;
}
this.findWork(ws);
@@ -132,14 +139,7 @@ export class CryptoApi {
let q = this.workQueues[NUM_PRIO - i - 1];
if (q.length != 0) {
let work: WorkItem = q.shift()!;
- let msg: any = {
- operation: work.operation,
- args: work.args,
- id: this.registerRpcId(work.resolve, work.reject, ws.workerIndex),
- };
-
- this.wake(ws);
- ws.w!.postMessage(msg);
+ this.wake(ws, work);
return;
}
}
@@ -151,22 +151,19 @@ export class CryptoApi {
console.error("rpc id must be number");
return;
}
- if (!this.rpcRegistry[id]) {
- console.error(`RPC with id ${id} has no registry entry`);
+ let currentWorkItem = ws.currentWorkItem;
+ ws.currentWorkItem = null;
+ this.numBusy--;
+ this.findWork(ws);
+ if (!currentWorkItem) {
+ console.error("unsolicited response from worker");
return;
}
- let {resolve, workerIndex} = this.rpcRegistry[id];
- delete this.rpcRegistry[id];
- if (workerIndex != ws.workerIndex) {
- throw Error("assertion failed");
- }
- if (!ws.busy) {
- throw Error("assertion failed");
+ if (id != currentWorkItem.rpcId) {
+ console.error(`RPC with id ${id} has no registry entry`);
+ return;
}
- ws.busy = false;
- this.numBusy--;
- resolve(msg.data.result);
- this.findWork(ws);
+ currentWorkItem.resolve(msg.data.result);
}
constructor() {
@@ -175,9 +172,8 @@ export class CryptoApi {
for (let i = 0; i < this.workers.length; i++) {
this.workers[i] = {
w: null,
- busy: false,
terminationTimerHandle: null,
- workerIndex: i,
+ currentWorkItem: null,
};
}
this.workQueues = [];
@@ -186,45 +182,34 @@ export class CryptoApi {
}
}
-
- private registerRpcId(resolve: any, reject: any,
- workerIndex: number): number {
- let id = this.nextRpcId++;
- this.rpcRegistry[id] = {resolve, reject, workerIndex};
- return id;
- }
-
-
private doRpc<T>(operation: string, priority: number,
...args: any[]): Promise<T> {
- if (this.numBusy == this.workers.length) {
- let q = this.workQueues[priority];
- if (!q) {
- throw Error("assertion failed");
- }
- return new Promise<T>((resolve, reject) => {
- this.workQueues[priority].push({operation, args, resolve, reject});
- });
- }
- for (let i = 0; i < this.workers.length; i++) {
- let ws = this.workers[i];
- if (ws.busy) {
- continue;
+ return new Promise((resolve, reject) => {
+ let rpcId = this.nextRpcId++;
+ let workItem: WorkItem = {operation, args, resolve, reject, rpcId};
+
+ if (this.numBusy == this.workers.length) {
+ let q = this.workQueues[priority];
+ if (!q) {
+ throw Error("assertion failed");
+ }
+ this.workQueues[priority].push(workItem);
+ return;
}
- this.wake(ws);
+ for (let i = 0; i < this.workers.length; i++) {
+ let ws = this.workers[i];
+ if (ws.currentWorkItem != null) {
+ continue;
+ }
- return new Promise<T>((resolve, reject) => {
- let msg: any = {
- operation, args,
- id: this.registerRpcId(resolve, reject, i),
- };
- ws.w!.postMessage(msg);
- });
- }
+ this.wake<T>(ws, workItem);
+ return;
+ }
- throw Error("assertion failed");
+ throw Error("assertion failed");
+ });
}
@@ -270,4 +255,4 @@ export class CryptoApi {
meltAmount,
meltFee);
}
-}
+} \ No newline at end of file