From 0b198e08888830890622e983445c75f947186b4c Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Thu, 13 Oct 2016 20:02:42 +0200 Subject: refactor work queue --- lib/wallet/cryptoApi.ts | 149 +++++++++++++-------------- lib/wallet/cryptoLib.ts | 27 +++-- lib/wallet/cryptoWorker.ts | 4 +- lib/wallet/emscriptif.ts | 244 ++++++++++++++++++++++++--------------------- lib/wallet/wallet.ts | 2 - lib/wallet/wxMessaging.ts | 1 - 6 files changed, 214 insertions(+), 213 deletions(-) (limited to 'lib') diff --git a/lib/wallet/cryptoApi.ts b/lib/wallet/cryptoApi.ts index ec20dd964..88b82ae3b 100644 --- a/lib/wallet/cryptoApi.ts +++ b/lib/wallet/cryptoApi.ts @@ -28,11 +28,6 @@ import {CoinWithDenom} from "./wallet"; import {PayCoinInfo} from "./types"; import {RefreshSession} from "./types"; -interface RegistryEntry { - resolve: any; - reject: any; - workerIndex: number; -} interface WorkerState { /** @@ -41,19 +36,14 @@ interface WorkerState { w: Worker|null; /** - * Are we currently running a task on this worker? + * Work we're currently executing or null if not busy. */ - busy: boolean; + currentWorkItem: WorkItem|null; /** * Timer to terminate the worker if it's not busy enough. */ terminationTimerHandle: number|null; - - /** - * Index of this worker in the list of workers. - */ - workerIndex: number; } interface WorkItem { @@ -61,6 +51,11 @@ interface WorkItem { args: any[]; resolve: any; reject: any; + + /** + * Serial id to identify a matching response. + */ + rpcId: number; } @@ -72,27 +67,21 @@ const NUM_PRIO = 5; export class CryptoApi { private nextRpcId: number = 1; - private rpcRegistry: {[n: number]: RegistryEntry} = {}; private workers: WorkerState[]; private workQueues: WorkItem[][]; /** * Number of busy workers. */ private numBusy: number = 0; - /** - * Number if pending work items. - */ - private numWaiting: number = 0; - /** * Start a worker (if not started) and set as busy. */ - wake(ws: WorkerState) { - if (ws.busy) { + wake(ws: WorkerState, work: WorkItem): void { + if (ws.currentWorkItem != null) { throw Error("assertion failed"); } - ws.busy = true; + ws.currentWorkItem = work; this.numBusy++; if (!ws.w) { let w = new Worker("/lib/wallet/cryptoWorker.js"); @@ -100,27 +89,45 @@ export class CryptoApi { w.onerror = (e: ErrorEvent) => this.handleWorkerError(ws, e); ws.w = w; } + + let msg: any = { + operation: work.operation, args: work.args, + id: work.rpcId + }; + this.resetWorkerTimeout(ws); + ws.w!.postMessage(msg); + } + + resetWorkerTimeout(ws: WorkerState) { if (ws.terminationTimerHandle != null) { clearTimeout(ws.terminationTimerHandle); } let destroy = () => { - if (ws.w && !ws.busy) { + // terminate worker if it's idle + if (ws.w && ws.currentWorkItem == null) { ws.w!.terminate(); ws.w = null; } - } + }; ws.terminationTimerHandle = setTimeout(destroy, 20 * 1000); } handleWorkerError(ws: WorkerState, e: ErrorEvent) { - console.error("error in worker", e); + if (ws.currentWorkItem) { + console.error(`error in worker during ${ws.currentWorkItem!.operation}`, e); + } else { + console.error("error in worker", e); + } + console.error(e.message); try { ws.w!.terminate(); + ws.w = null; } catch (e) { console.error(e); } - if (ws.busy) { - ws.busy = false; + if (ws.currentWorkItem != null) { + ws.currentWorkItem.reject(e); + ws.currentWorkItem = null; this.numBusy--; } this.findWork(ws); @@ -132,14 +139,7 @@ export class CryptoApi { let q = this.workQueues[NUM_PRIO - i - 1]; if (q.length != 0) { let work: WorkItem = q.shift()!; - let msg: any = { - operation: work.operation, - args: work.args, - id: this.registerRpcId(work.resolve, work.reject, ws.workerIndex), - }; - - this.wake(ws); - ws.w!.postMessage(msg); + this.wake(ws, work); return; } } @@ -151,22 +151,19 @@ export class CryptoApi { console.error("rpc id must be number"); return; } - if (!this.rpcRegistry[id]) { - console.error(`RPC with id ${id} has no registry entry`); + let currentWorkItem = ws.currentWorkItem; + ws.currentWorkItem = null; + this.numBusy--; + this.findWork(ws); + if (!currentWorkItem) { + console.error("unsolicited response from worker"); return; } - let {resolve, workerIndex} = this.rpcRegistry[id]; - delete this.rpcRegistry[id]; - if (workerIndex != ws.workerIndex) { - throw Error("assertion failed"); - } - if (!ws.busy) { - throw Error("assertion failed"); + if (id != currentWorkItem.rpcId) { + console.error(`RPC with id ${id} has no registry entry`); + return; } - ws.busy = false; - this.numBusy--; - resolve(msg.data.result); - this.findWork(ws); + currentWorkItem.resolve(msg.data.result); } constructor() { @@ -175,9 +172,8 @@ export class CryptoApi { for (let i = 0; i < this.workers.length; i++) { this.workers[i] = { w: null, - busy: false, terminationTimerHandle: null, - workerIndex: i, + currentWorkItem: null, }; } this.workQueues = []; @@ -186,45 +182,34 @@ export class CryptoApi { } } - - private registerRpcId(resolve: any, reject: any, - workerIndex: number): number { - let id = this.nextRpcId++; - this.rpcRegistry[id] = {resolve, reject, workerIndex}; - return id; - } - - private doRpc(operation: string, priority: number, ...args: any[]): Promise { - if (this.numBusy == this.workers.length) { - let q = this.workQueues[priority]; - if (!q) { - throw Error("assertion failed"); - } - return new Promise((resolve, reject) => { - this.workQueues[priority].push({operation, args, resolve, reject}); - }); - } - for (let i = 0; i < this.workers.length; i++) { - let ws = this.workers[i]; - if (ws.busy) { - continue; + return new Promise((resolve, reject) => { + let rpcId = this.nextRpcId++; + let workItem: WorkItem = {operation, args, resolve, reject, rpcId}; + + if (this.numBusy == this.workers.length) { + let q = this.workQueues[priority]; + if (!q) { + throw Error("assertion failed"); + } + this.workQueues[priority].push(workItem); + return; } - this.wake(ws); + for (let i = 0; i < this.workers.length; i++) { + let ws = this.workers[i]; + if (ws.currentWorkItem != null) { + continue; + } - return new Promise((resolve, reject) => { - let msg: any = { - operation, args, - id: this.registerRpcId(resolve, reject, i), - }; - ws.w!.postMessage(msg); - }); - } + this.wake(ws, workItem); + return; + } - throw Error("assertion failed"); + throw Error("assertion failed"); + }); } @@ -270,4 +255,4 @@ export class CryptoApi { meltAmount, meltFee); } -} +} \ No newline at end of file diff --git a/lib/wallet/cryptoLib.ts b/lib/wallet/cryptoLib.ts index 7969682b4..d471b577d 100644 --- a/lib/wallet/cryptoLib.ts +++ b/lib/wallet/cryptoLib.ts @@ -79,9 +79,13 @@ namespace RpcFunctions { let coinPub = coinPriv.getPublicKey(); let blindingFactor = native.RsaBlindingKeySecret.create(); let pubHash: native.HashCode = coinPub.hash(); - let ev: native.ByteArray = native.rsaBlind(pubHash, - blindingFactor, - denomPub); + let ev = native.rsaBlind(pubHash, + blindingFactor, + denomPub); + + if (!ev) { + throw Error("couldn't blind (malicious exchange key?)"); + } if (!denom.fee_withdraw) { throw Error("Field fee_withdraw missing"); @@ -234,10 +238,10 @@ namespace RpcFunctions { } - function createWithdrawSession(kappa: number, meltCoin: Coin, - newCoinDenoms: Denomination[], - meltAmount: AmountJson, - meltFee: AmountJson): RefreshSession { + export function createWithdrawSession(kappa: number, meltCoin: Coin, + newCoinDenoms: Denomination[], + meltAmount: AmountJson, + meltFee: AmountJson): RefreshSession { let sessionHc = new HashContext(); @@ -268,9 +272,12 @@ namespace RpcFunctions { let blindingFactor = native.RsaBlindingKeySecret.create(); let pubHash: native.HashCode = coinPub.hash(); let denomPub = native.RsaPublicKey.fromCrock(newCoinDenoms[i].denom_pub); - let ev: native.ByteArray = native.rsaBlind(pubHash, - blindingFactor, - denomPub); + let ev = native.rsaBlind(pubHash, + blindingFactor, + denomPub); + if (!ev) { + throw Error("couldn't blind (malicious exchange key?)"); + } let preCoin: RefreshPreCoin = { blindingKey: blindingFactor.toCrock(), coinEv: ev.toCrock(), diff --git a/lib/wallet/cryptoWorker.ts b/lib/wallet/cryptoWorker.ts index 4483c64e6..22feb8eb7 100644 --- a/lib/wallet/cryptoWorker.ts +++ b/lib/wallet/cryptoWorker.ts @@ -22,7 +22,7 @@ "use strict"; -importScripts("../emscripten/libwrapper.js", +importScripts("../emscripten/taler-emscripten-lib.js", "../vendor/system-csp-production.src.js"); @@ -46,7 +46,7 @@ if ("object" !== typeof Module) { { let mod = System.newModule({Module: Module}); - let modName = System.normalizeSync("../emscripten/emsc"); + let modName = System.normalizeSync("../emscripten/taler-emscripten-lib"); console.log("registering", modName); System.set(modName, mod); } diff --git a/lib/wallet/emscriptif.ts b/lib/wallet/emscriptif.ts index 7c08fdc45..bad1a4c5e 100644 --- a/lib/wallet/emscriptif.ts +++ b/lib/wallet/emscriptif.ts @@ -14,13 +14,13 @@ TALER; see the file COPYING. If not, see */ -import { AmountJson } from "./types"; -import * as EmscWrapper from "../emscripten/emsc"; +import {AmountJson} from "./types"; +import * as EmscWrapper from "../emscripten/taler-emscripten-lib"; /** * High-level interface to emscripten-compiled modules used * by the wallet. - * + * * @author Florian Dold */ @@ -43,82 +43,82 @@ let getEmsc: EmscWrapper.EmscFunGen = (...args: any[]) => Module.cwrap.apply( var emsc = { free: (ptr: number) => Module._free(ptr), get_value: getEmsc('TALER_WR_get_value', - 'number', - ['number']), + 'number', + ['number']), get_fraction: getEmsc('TALER_WR_get_fraction', - 'number', - ['number']), + 'number', + ['number']), get_currency: getEmsc('TALER_WR_get_currency', - 'string', - ['number']), + 'string', + ['number']), amount_add: getEmsc('TALER_amount_add', - 'number', - ['number', 'number', 'number']), + 'number', + ['number', 'number', 'number']), amount_subtract: getEmsc('TALER_amount_subtract', - 'number', - ['number', 'number', 'number']), + 'number', + ['number', 'number', 'number']), amount_normalize: getEmsc('TALER_amount_normalize', - 'void', - ['number']), + 'void', + ['number']), amount_get_zero: getEmsc('TALER_amount_get_zero', - 'number', - ['string', 'number']), + 'number', + ['string', 'number']), amount_cmp: getEmsc('TALER_amount_cmp', - 'number', - ['number', 'number']), + 'number', + ['number', 'number']), amount_hton: getEmsc('TALER_amount_hton', - 'void', - ['number', 'number']), + 'void', + ['number', 'number']), amount_ntoh: getEmsc('TALER_amount_ntoh', - 'void', - ['number', 'number']), + 'void', + ['number', 'number']), hash: getEmsc('GNUNET_CRYPTO_hash', - 'void', - ['number', 'number', 'number']), + 'void', + ['number', 'number', 'number']), memmove: getEmsc('memmove', - 'number', - ['number', 'number', 'number']), + 'number', + ['number', 'number', 'number']), rsa_public_key_free: getEmsc('GNUNET_CRYPTO_rsa_public_key_free', - 'void', - ['number']), + 'void', + ['number']), rsa_signature_free: getEmsc('GNUNET_CRYPTO_rsa_signature_free', - 'void', - ['number']), + 'void', + ['number']), string_to_data: getEmsc('GNUNET_STRINGS_string_to_data', - 'number', - ['number', 'number', 'number', 'number']), + 'number', + ['number', 'number', 'number', 'number']), eddsa_sign: getEmsc('GNUNET_CRYPTO_eddsa_sign', - 'number', - ['number', 'number', 'number']), + 'number', + ['number', 'number', 'number']), eddsa_verify: getEmsc('GNUNET_CRYPTO_eddsa_verify', - 'number', - ['number', 'number', 'number', 'number']), + 'number', + ['number', 'number', 'number', 'number']), hash_create_random: getEmsc('GNUNET_CRYPTO_hash_create_random', - 'void', - ['number', 'number']), + 'void', + ['number', 'number']), rsa_blinding_key_destroy: getEmsc('GNUNET_CRYPTO_rsa_blinding_key_free', - 'void', - ['number']), + 'void', + ['number']), random_block: getEmsc('GNUNET_CRYPTO_random_block', - 'void', - ['number', 'number', 'number']), + 'void', + ['number', 'number', 'number']), hash_context_abort: getEmsc('GNUNET_CRYPTO_hash_context_abort', - 'void', - ['number']), + 'void', + ['number']), hash_context_read: getEmsc('GNUNET_CRYPTO_hash_context_read', - 'void', - ['number', 'number', 'number']), + 'void', + ['number', 'number', 'number']), hash_context_finish: getEmsc('GNUNET_CRYPTO_hash_context_finish', - 'void', - ['number', 'number']), + 'void', + ['number', 'number']), }; var emscAlloc = { get_amount: getEmsc('TALER_WRALL_get_amount', - 'number', - ['number', 'number', 'number', 'string']), + 'number', + ['number', 'number', 'number', 'string']), eddsa_key_create: getEmsc('GNUNET_CRYPTO_eddsa_key_create', - 'number', []), + 'number', []), ecdsa_key_create: getEmsc('GNUNET_CRYPTO_ecdsa_key_create', 'number', []), eddsa_public_key_from_private: getEmsc( @@ -130,41 +130,41 @@ var emscAlloc = { 'number', ['number']), data_to_string_alloc: getEmsc('GNUNET_STRINGS_data_to_string_alloc', - 'number', - ['number', 'number']), + 'number', + ['number', 'number']), purpose_create: getEmsc('TALER_WRALL_purpose_create', - 'number', - ['number', 'number', 'number']), + 'number', + ['number', 'number', 'number']), rsa_blind: getEmsc('GNUNET_CRYPTO_rsa_blind', - 'number', - ['number', 'number', 'number', 'number']), + 'number', + ['number', 'number', 'number', 'number', 'number']), rsa_blinding_key_create: getEmsc('GNUNET_CRYPTO_rsa_blinding_key_create', - 'number', - ['number']), + 'number', + ['number']), rsa_blinding_key_encode: getEmsc('GNUNET_CRYPTO_rsa_blinding_key_encode', - 'number', - ['number', 'number']), + 'number', + ['number', 'number']), rsa_signature_encode: getEmsc('GNUNET_CRYPTO_rsa_signature_encode', - 'number', - ['number', 'number']), + 'number', + ['number', 'number']), rsa_blinding_key_decode: getEmsc('GNUNET_CRYPTO_rsa_blinding_key_decode', - 'number', - ['number', 'number']), + 'number', + ['number', 'number']), rsa_public_key_decode: getEmsc('GNUNET_CRYPTO_rsa_public_key_decode', - 'number', - ['number', 'number']), + 'number', + ['number', 'number']), rsa_signature_decode: getEmsc('GNUNET_CRYPTO_rsa_signature_decode', - 'number', - ['number', 'number']), + 'number', + ['number', 'number']), rsa_public_key_encode: getEmsc('GNUNET_CRYPTO_rsa_public_key_encode', - 'number', - ['number', 'number']), + 'number', + ['number', 'number']), rsa_unblind: getEmsc('GNUNET_CRYPTO_rsa_unblind', - 'number', - ['number', 'number', 'number']), + 'number', + ['number', 'number', 'number']), hash_context_start: getEmsc('GNUNET_CRYPTO_hash_context_start', - 'number', - []), + 'number', + []), malloc: (size: number) => Module._malloc(size), }; @@ -359,9 +359,9 @@ export class Amount extends MallocArenaObject { super(arena); if (args) { this.nativePtr = emscAlloc.get_amount(args.value, - 0, - args.fraction, - args.currency); + 0, + args.fraction, + args.currency); } else { this.nativePtr = emscAlloc.get_amount(0, 0, 0, ""); } @@ -514,12 +514,12 @@ abstract class PackedArenaObject extends MallocArenaObject { // to the emscripten heap first. let buf = ByteArray.fromString(s); let res = emsc.string_to_data(buf.nativePtr, - s.length, - this.nativePtr, - this.size()); + s.length, + this.nativePtr, + this.size()); buf.destroy(); if (res < 1) { - throw { error: "wrong encoding" }; + throw {error: "wrong encoding"}; } } @@ -618,7 +618,6 @@ export class EcdsaPrivateKey extends PackedArenaObject { mixinStatic(EcdsaPrivateKey, fromCrock); - function fromCrock(s: string) { let x = new this(); x.alloc(); @@ -673,7 +672,7 @@ function makeFromCrock(decodeFn: (p: number, s: number) => number) { let obj = new this(a); let buf = ByteArray.fromCrock(s); obj.setNative(decodeFn(buf.getNative(), - buf.size())); + buf.size())); buf.destroy(); return obj; } @@ -682,7 +681,7 @@ function makeFromCrock(decodeFn: (p: number, s: number) => number) { } function makeToCrock(encodeFn: (po: number, - ps: number) => number): () => string { + ps: number) => number): () => string { function toCrock() { let ptr = emscAlloc.malloc(PTR_SIZE); let size = emscAlloc.rsa_blinding_key_encode(this.nativePtr, ptr); @@ -780,12 +779,12 @@ export class EccSignaturePurpose extends PackedArenaObject { payloadSize: number; constructor(purpose: SignaturePurpose, - payload: PackedArenaObject, - a?: Arena) { + payload: PackedArenaObject, + a?: Arena) { super(a); this.nativePtr = emscAlloc.purpose_create(purpose, - payload.nativePtr, - payload.size()); + payload.nativePtr, + payload.size()); this.payloadSize = payload.size(); } } @@ -1121,21 +1120,31 @@ mixin(RsaSignature, makeEncode(emscAlloc.rsa_signature_encode)); export function rsaBlind(hashCode: HashCode, - blindingKey: RsaBlindingKeySecret, - pkey: RsaPublicKey, - arena?: Arena): ByteArray { - let ptr = emscAlloc.malloc(PTR_SIZE); - let s = emscAlloc.rsa_blind(hashCode.nativePtr, - blindingKey.nativePtr, - pkey.nativePtr, - ptr); - return new ByteArray(s, Module.getValue(ptr, '*'), arena); + blindingKey: RsaBlindingKeySecret, + pkey: RsaPublicKey, + arena?: Arena): ByteArray|null { + let buf_ptr_out = emscAlloc.malloc(PTR_SIZE); + let buf_size_out = emscAlloc.malloc(PTR_SIZE); + let res = emscAlloc.rsa_blind(hashCode.nativePtr, + blindingKey.nativePtr, + pkey.nativePtr, + buf_ptr_out, + buf_size_out); + let buf_ptr = Module.getValue(buf_ptr_out, '*'); + let buf_size = Module.getValue(buf_size_out, '*'); + emsc.free(buf_ptr_out); + emsc.free(buf_size_out); + if (res != GNUNET_OK) { + // malicious key + return null; + } + return new ByteArray(buf_size, buf_ptr, arena); } export function eddsaSign(purpose: EccSignaturePurpose, - priv: EddsaPrivateKey, - a?: Arena): EddsaSignature { + priv: EddsaPrivateKey, + a?: Arena): EddsaSignature { let sig = new EddsaSignature(a); sig.alloc(); let res = emsc.eddsa_sign(priv.nativePtr, purpose.nativePtr, sig.nativePtr); @@ -1147,14 +1156,14 @@ export function eddsaSign(purpose: EccSignaturePurpose, export function eddsaVerify(purposeNum: number, - verify: EccSignaturePurpose, - sig: EddsaSignature, - pub: EddsaPublicKey, - a?: Arena): boolean { + verify: EccSignaturePurpose, + sig: EddsaSignature, + pub: EddsaPublicKey, + a?: Arena): boolean { let r = emsc.eddsa_verify(purposeNum, - verify.nativePtr, - sig.nativePtr, - pub.nativePtr); + verify.nativePtr, + sig.nativePtr, + pub.nativePtr); if (r === GNUNET_OK) { return true; } @@ -1163,13 +1172,13 @@ export function eddsaVerify(purposeNum: number, export function rsaUnblind(sig: RsaSignature, - bk: RsaBlindingKeySecret, - pk: RsaPublicKey, - a?: Arena): RsaSignature { + bk: RsaBlindingKeySecret, + pk: RsaPublicKey, + a?: Arena): RsaSignature { let x = new RsaSignature(a); x.nativePtr = emscAlloc.rsa_unblind(sig.nativePtr, - bk.nativePtr, - pk.nativePtr); + bk.nativePtr, + pk.nativePtr); return x; } @@ -1208,16 +1217,19 @@ export interface FreshCoin { blindingKey: RsaBlindingKeySecret; } -export function setupFreshCoin(secretSeed: TransferSecretP, coinIndex: number): FreshCoin { +export function setupFreshCoin(secretSeed: TransferSecretP, + coinIndex: number): FreshCoin { let priv = new EddsaPrivateKey(); priv.isWeak = true; let blindingKey = new RsaBlindingKeySecret(); blindingKey.isWeak = true; - let buf = kdf(priv.size() + blindingKey.size(), UInt32.fromNumber(coinIndex), ByteArray.fromString("taler-coin-derivation")); + let buf = kdf(priv.size() + blindingKey.size(), + UInt32.fromNumber(coinIndex), + ByteArray.fromString("taler-coin-derivation")); priv.nativePtr = buf.nativePtr; blindingKey.nativePtr = buf.nativePtr + priv.size(); - return { priv, blindingKey }; + return {priv, blindingKey}; } \ No newline at end of file diff --git a/lib/wallet/wallet.ts b/lib/wallet/wallet.ts index b248d5315..264eef250 100644 --- a/lib/wallet/wallet.ts +++ b/lib/wallet/wallet.ts @@ -797,8 +797,6 @@ export class Wallet { } async storeCoin(coin: Coin): Promise { - console.log("storing coin", new Date()); - let historyEntry: HistoryRecord = { type: "withdraw", timestamp: (new Date).getTime(), diff --git a/lib/wallet/wxMessaging.ts b/lib/wallet/wxMessaging.ts index 2c69afc99..07f5cc1d8 100644 --- a/lib/wallet/wxMessaging.ts +++ b/lib/wallet/wxMessaging.ts @@ -286,7 +286,6 @@ class ChromeNotifier implements Notifier { } notify() { - console.log("notifying all ports"); for (let p of this.ports) { p.postMessage({ notify: true }); } -- cgit v1.2.3