diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/wallet/cryptoApi.ts | 141 |
1 files changed, 118 insertions, 23 deletions
diff --git a/lib/wallet/cryptoApi.ts b/lib/wallet/cryptoApi.ts index 2a2a7d319..62f51f450 100644 --- a/lib/wallet/cryptoApi.ts +++ b/lib/wallet/cryptoApi.ts @@ -27,17 +27,55 @@ import {Denomination} from "./types"; import {Offer} from "./wallet"; import {CoinWithDenom} from "./wallet"; import {PayCoinInfo} from "./types"; -type RegistryEntry = {resolve: any; reject: any}; + +interface RegistryEntry { + resolve: any; + reject: any; + workerIndex: number; +} + +interface WorkerState { + /** + * The actual worker thread. + */ + w: Worker; + /** + * Are we currently running a task on this worker? + */ + busy: boolean; +} + +interface WorkItem { + operation: string; + args: any[]; + resolve: any; + reject: any; +} + + +/** + * Number of different priorities. Each priority p + * must be 0 <= p < NUM_PRIO. + */ +const NUM_PRIO = 5; + export class CryptoApi { private nextRpcId: number = 1; private rpcRegistry: {[n: number]: RegistryEntry} = {}; - private cryptoWorker: Worker; + private workers: WorkerState[]; + private workQueues: WorkItem[][]; + /** + * Number of busy workers. + */ + private numBusy: number = 0; + /** + * Number if pending work items. + */ + private numWaiting: number = 0; constructor() { - this.cryptoWorker = new Worker("/lib/wallet/cryptoWorker.js"); - - this.cryptoWorker.onmessage = (msg: MessageEvent) => { + let handler = (msg: MessageEvent) => { let id = msg.data.id; if (typeof id !== "number") { console.error("rpc id must be number"); @@ -47,54 +85,111 @@ export class CryptoApi { console.error(`RPC with id ${id} has no registry entry`); return; } - let {resolve, reject} = this.rpcRegistry[id]; + let {resolve, workerIndex} = this.rpcRegistry[id]; + delete this.rpcRegistry[id]; + let ws = this.workers[workerIndex]; + ws.busy = false; + this.numBusy--; resolve(msg.data.result); + + // try to find more work for this worker + for (let i = 0; i < NUM_PRIO; i++) { + let q = this.workQueues[NUM_PRIO - i - 1]; + if (q.length != 0) { + let work: WorkItem = q.shift()!; + let msg: any = { + operation: work.operation, + args: work.args, + id: this.registerRpcId(work.resolve, work.reject, workerIndex), + }; + ws.w.postMessage(msg); + ws.busy = true; + this.numBusy++; + } + } + }; + + this.workers = new Array<WorkerState>((navigator as any)["hardwareConcurrency"] || 2); + + for (let i = 0; i < this.workers.length; i++) { + let w = new Worker("/lib/wallet/cryptoWorker.js"); + w.onmessage = handler; + this.workers[i] = { + w, + busy: false, + }; + } + this.workQueues = []; + for (let i = 0; i < NUM_PRIO; i++) { + this.workQueues.push([]); } } - private registerRpcId(resolve: any, reject: any): number { + private registerRpcId(resolve: any, reject: any, + workerIndex: number): number { let id = this.nextRpcId++; - this.rpcRegistry[id] = {resolve, reject}; + this.rpcRegistry[id] = {resolve, reject, workerIndex}; return id; } - private doRpc<T>(methodName: string, ...args: any[]): Promise<T> { - return new Promise<T>((resolve, reject) => { - let msg = { - operation: methodName, - id: this.registerRpcId(resolve, reject), - args: args, - }; - this.cryptoWorker.postMessage(msg); - }); + private doRpc<T>(operation: string, priority: number, + ...args: any[]): Promise<T> { + if (this.numBusy == this.workers.length) { + let q = this.workQueues[priority]; + if (!q) { + throw Error("assertion failed"); + } + return new Promise<T>((resolve, reject) => { + this.workQueues[priority].push({operation, args, resolve, reject}); + }); + } + + for (let i = 0; i < this.workers.length; i++) { + let ws = this.workers[i]; + if (ws.busy) { + continue; + } + + return new Promise<T>((resolve, reject) => { + let msg: any = { + operation, args, + id: this.registerRpcId(resolve, reject, i), + }; + ws.w.postMessage(msg); + ws.busy = true; + this.numBusy++; + }); + } + + throw Error("assertion failed"); } createPreCoin(denom: Denomination, reserve: Reserve): Promise<PreCoin> { - return this.doRpc("createPreCoin", denom, reserve); + return this.doRpc("createPreCoin", 1, denom, reserve); } hashRsaPub(rsaPub: string): Promise<string> { - return this.doRpc("hashRsaPub", rsaPub); + return this.doRpc("hashRsaPub", 2, rsaPub); } isValidDenom(denom: Denomination, masterPub: string): Promise<boolean> { - return this.doRpc("isValidDenom", denom, masterPub); + return this.doRpc("isValidDenom", 2, denom, masterPub); } signDeposit(offer: Offer, cds: CoinWithDenom[]): Promise<PayCoinInfo> { - return this.doRpc("signDeposit", offer, cds); + return this.doRpc("signDeposit", 3, offer, cds); } createEddsaKeypair(): Promise<{priv: string, pub: string}> { - return this.doRpc("createEddsaKeypair"); + return this.doRpc("createEddsaKeypair", 1); } rsaUnblind(sig: string, bk: string, pk: string): Promise<string> { - return this.doRpc("rsaUnblind", sig, bk, pk); + return this.doRpc("rsaUnblind", 4, sig, bk, pk); } } |