aboutsummaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts')
-rw-r--r--packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts136
1 files changed, 86 insertions, 50 deletions
diff --git a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts b/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts
index 2ef0d7c69..48c9c6060 100644
--- a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts
+++ b/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts
@@ -22,7 +22,9 @@
/**
* Imports.
*/
-import { Logger } from "@gnu-taler/taler-util";
+import { Logger, TalerErrorCode } from "@gnu-taler/taler-util";
+import { TalerError } from "../../errors.js";
+import { openPromise } from "../../util/promiseUtils.js";
import { timer, performanceNow, TimerHandle } from "../../util/timer.js";
import { nullCrypto, TalerCryptoInterface } from "../cryptoImplementation.js";
import { CryptoWorker } from "./cryptoWorkerInterface.js";
@@ -32,7 +34,7 @@ const logger = new Logger("cryptoApi.ts");
/**
* State of a crypto worker.
*/
-interface WorkerState {
+interface WorkerInfo {
/**
* The actual worker thread.
*/
@@ -64,6 +66,8 @@ interface WorkItem {
* Time when the work was submitted to a (non-busy) worker thread.
*/
startTime: BigInt;
+
+ state: WorkItemState;
}
/**
@@ -92,12 +96,18 @@ export class CryptoApiStoppedError extends Error {
}
}
+export enum WorkItemState {
+ Pending = 1,
+ Running = 2,
+ Finished = 3,
+}
+
/**
* Dispatcher for cryptographic operations to underlying crypto workers.
*/
export class CryptoDispatcher {
private nextRpcId = 1;
- private workers: WorkerState[];
+ private workers: WorkerInfo[];
private workQueues: WorkItem[][];
private workerFactory: CryptoWorkerFactory;
@@ -141,7 +151,7 @@ export class CryptoDispatcher {
/**
* Start a worker (if not started) and set as busy.
*/
- wake(ws: WorkerState, work: WorkItem): void {
+ wake(ws: WorkerInfo, work: WorkItem): void {
if (this.stopped) {
return;
}
@@ -167,10 +177,11 @@ export class CryptoDispatcher {
};
this.resetWorkerTimeout(ws);
work.startTime = performanceNow();
+ work.state = WorkItemState.Running;
timer.after(0, () => worker.postMessage(msg));
}
- resetWorkerTimeout(ws: WorkerState): void {
+ resetWorkerTimeout(ws: WorkerInfo): void {
if (ws.idleTimeoutHandle !== null) {
ws.idleTimeoutHandle.clear();
ws.idleTimeoutHandle = null;
@@ -187,7 +198,7 @@ export class CryptoDispatcher {
ws.idleTimeoutHandle.unref();
}
- handleWorkerError(ws: WorkerState, e: any): void {
+ handleWorkerError(ws: WorkerInfo, e: any): void {
if (ws.currentWorkItem) {
logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e);
} else {
@@ -203,6 +214,7 @@ export class CryptoDispatcher {
logger.error(e as string);
}
if (ws.currentWorkItem !== null) {
+ ws.currentWorkItem.state = WorkItemState.Finished;
ws.currentWorkItem.reject(e);
ws.currentWorkItem = null;
this.numBusy--;
@@ -210,7 +222,7 @@ export class CryptoDispatcher {
this.findWork(ws);
}
- private findWork(ws: WorkerState): void {
+ private findWork(ws: WorkerInfo): void {
// try to find more work for this worker
for (let i = 0; i < NUM_PRIO; i++) {
const q = this.workQueues[NUM_PRIO - i - 1];
@@ -225,26 +237,38 @@ export class CryptoDispatcher {
}
}
- handleWorkerMessage(ws: WorkerState, msg: any): void {
+ handleWorkerMessage(ws: WorkerInfo, msg: any): void {
const id = msg.data.id;
if (typeof id !== "number") {
- console.error("rpc id must be number");
+ logger.error("rpc id must be number");
return;
}
const currentWorkItem = ws.currentWorkItem;
ws.currentWorkItem = null;
- this.numBusy--;
- this.findWork(ws);
if (!currentWorkItem) {
- console.error("unsolicited response from worker");
+ logger.error("unsolicited response from worker");
return;
}
if (id !== currentWorkItem.rpcId) {
- console.error(`RPC with id ${id} has no registry entry`);
+ logger.error(`RPC with id ${id} has no registry entry`);
return;
}
-
- currentWorkItem.resolve(msg.data.result);
+ if (currentWorkItem.state === WorkItemState.Running) {
+ this.numBusy--;
+ currentWorkItem.state = WorkItemState.Finished;
+ if (msg.data.type === "success") {
+ currentWorkItem.resolve(msg.data.result);
+ } else if (msg.data.type === "error") {
+ currentWorkItem.reject(
+ TalerError.fromDetail(TalerErrorCode.WALLET_CRYPTO_WORKER_ERROR, {
+ innerError: msg.data.error,
+ }),
+ );
+ } else {
+ currentWorkItem.reject(new Error("bad message from crypto worker"));
+ }
+ }
+ this.findWork(ws);
}
cryptoApi: TalerCryptoInterface;
@@ -258,7 +282,7 @@ export class CryptoDispatcher {
this.cryptoApi = fns;
this.workerFactory = workerFactory;
- this.workers = new Array<WorkerState>(workerFactory.getConcurrency());
+ this.workers = new Array<WorkerInfo>(workerFactory.getConcurrency());
for (let i = 0; i < this.workers.length; i++) {
this.workers[i] = {
@@ -282,36 +306,42 @@ export class CryptoDispatcher {
if (this.stopped) {
throw new CryptoApiStoppedError();
}
- const p: Promise<T> = new Promise<T>((resolve, reject) => {
- const rpcId = this.nextRpcId++;
- const workItem: WorkItem = {
- operation,
- req,
- resolve,
- reject,
- rpcId,
- startTime: BigInt(0),
- };
-
- if (this.numBusy === this.workers.length) {
- const q = this.workQueues[priority];
- if (!q) {
- throw Error("assertion failed");
- }
- this.workQueues[priority].push(workItem);
- return;
+ const rpcId = this.nextRpcId++;
+ const myProm = openPromise<T>();
+ const workItem: WorkItem = {
+ operation,
+ req,
+ resolve: myProm.resolve,
+ reject: myProm.reject,
+ rpcId,
+ startTime: BigInt(0),
+ state: WorkItemState.Pending,
+ };
+ let scheduled = false;
+ if (this.numBusy === this.workers.length) {
+ // All workers are busy, queue work item
+ const q = this.workQueues[priority];
+ if (!q) {
+ throw Error("assertion failed");
}
-
+ this.workQueues[priority].push(workItem);
+ scheduled = true;
+ }
+ if (!scheduled) {
for (const ws of this.workers) {
if (ws.currentWorkItem !== null) {
continue;
}
this.wake(ws, workItem);
- return;
+ scheduled = true;
+ break;
}
+ }
+ if (!scheduled) {
+ // Could not schedule work.
throw Error("assertion failed");
- });
+ }
// Make sure that we wait for the result while a timer is active
// to prevent the event loop from dying, as just waiting for a promise
@@ -324,21 +354,27 @@ export class CryptoDispatcher {
logger.warn(`crypto RPC call ('${operation}') timed out`);
timedOut = true;
reject(new Error(`crypto RPC call ('${operation}') timed out`));
- });
- p.then((x) => {
- if (timedOut) {
- return;
- }
- timeout.clear();
- resolve(x);
- }).catch((x) => {
- logger.info(`crypto RPC call ${operation} threw`);
- if (timedOut) {
- return;
+ if (workItem.state === WorkItemState.Running) {
+ workItem.state = WorkItemState.Finished;
+ this.numBusy--;
}
- timeout.clear();
- reject(x);
});
+ myProm.promise
+ .then((x) => {
+ if (timedOut) {
+ return;
+ }
+ timeout.clear();
+ resolve(x);
+ })
+ .catch((x) => {
+ logger.info(`crypto RPC call ${operation} threw`);
+ if (timedOut) {
+ return;
+ }
+ timeout.clear();
+ reject(x);
+ });
});
}
}