diff options
Diffstat (limited to 'packages')
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts | 130 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts (renamed from packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts) | 48 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts | 2 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts | 2 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts | 2 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/index.ts | 2 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/internal-wallet-state.ts | 2 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/operations/refresh.ts | 2 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/wallet.ts | 2 |
9 files changed, 159 insertions, 33 deletions
diff --git a/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts new file mode 100644 index 000000000..b63c9bf11 --- /dev/null +++ b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.test.ts @@ -0,0 +1,130 @@ +/* + This file is part of GNU Taler + (C) 2023 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +import test from "ava"; +import { CryptoDispatcher, CryptoWorkerFactory } from "./crypto-dispatcher.js"; +import { CryptoWorker, CryptoWorkerResponseMessage } from "./cryptoWorkerInterface.js"; +import { SynchronousCryptoWorkerFactoryNode } from "./synchronousWorkerFactoryNode.js"; +import { processRequestWithImpl } from "./worker-common.js"; + + +export class MyCryptoWorker implements CryptoWorker { + /** + * Function to be called when we receive a message from the worker thread. + */ + onmessage: undefined | ((m: any) => void) = undefined; + + /** + * Function to be called when we receive an error from the worker thread. + */ + onerror: undefined | ((m: any) => void) = undefined; + + /** + * Add an event listener for either an "error" or "message" event. + */ + addEventListener(event: "message" | "error", fn: (x: any) => void): void { + switch (event) { + case "message": + this.onmessage = fn; + break; + case "error": + this.onerror = fn; + break; + } + } + + private dispatchMessage(msg: any): void { + if (this.onmessage) { + this.onmessage(msg); + } + } + + /** + * Send a message to the worker thread. + */ + postMessage(msg: any): void { + const handleRequest = async () => { + let responseMsg: CryptoWorkerResponseMessage; + if (msg.operation === "testSuccess") { + responseMsg = { + id: msg.id, + type: "success", + result: { + testResult: 42, + } + } + } else if (msg.operation === "testError") { + responseMsg = { + id: msg.id, + type: "error", + error: { + code: 42, + hint: "bla", + } + } + } else if (msg.operation === "testTimeout") { + // Don't respond + return; + } + try { + setTimeout(() => this.dispatchMessage(responseMsg), 0); + } catch (e) { + console.error("got error during dispatch", e); + } + }; + handleRequest().catch((e) => { + console.error("Error while handling crypto request:", e); + }); + } + + /** + * Forcibly terminate the worker thread. + */ + terminate(): void { + // This is a no-op. + } +} + + + +export class MyCryptoWorkerFactory implements CryptoWorkerFactory { + startWorker(): CryptoWorker { + return new MyCryptoWorker(); + } + + getConcurrency(): number { + return 1; + } +} + +test("continues after error", async (t) => { + const cryptoDisp = new CryptoDispatcher( + new MyCryptoWorkerFactory(), + ); + const resp1 = await cryptoDisp.doRpc("testSuccess", 0, {}); + t.assert((resp1 as any).testResult === 42); + const exc = await t.throwsAsync(async() => { + const resp2 = await cryptoDisp.doRpc("testError", 0, {}); + }); + + // Check that it still works after one error. + const resp2 = await cryptoDisp.doRpc("testSuccess", 0, {}); + t.assert((resp2 as any).testResult === 42); + + // Check that it still works after timeout. + const resp3 = await cryptoDisp.doRpc("testSuccess", 0, {}); + t.assert((resp3 as any).testResult === 42); +}); diff --git a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts index 88aea71b9..940078ea6 100644 --- a/packages/taler-wallet-core/src/crypto/workers/cryptoDispatcher.ts +++ b/packages/taler-wallet-core/src/crypto/workers/crypto-dispatcher.ts @@ -203,13 +203,7 @@ export class CryptoDispatcher { ws.idleTimeoutHandle.unref(); } - handleWorkerError(ws: WorkerInfo, e: any): void { - if (ws.currentWorkItem) { - logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e); - } else { - logger.error("error in worker", e); - } - logger.error(e.message); + private resetWorker(ws: WorkerInfo, e: any): void { try { if (ws.w) { ws.w.terminate(); @@ -227,6 +221,16 @@ export class CryptoDispatcher { this.findWork(ws); } + handleWorkerError(ws: WorkerInfo, e: any): void { + if (ws.currentWorkItem) { + logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e); + } else { + logger.error("error in worker", e); + } + logger.error(e.message); + this.resetWorker(ws, e); + } + private findWork(ws: WorkerInfo): void { // try to find more work for this worker for (let i = 0; i < NUM_PRIO; i++) { @@ -304,7 +308,7 @@ export class CryptoDispatcher { } } - private doRpc<T>( + doRpc<T>( operation: string, priority: number, req: unknown, @@ -355,30 +359,22 @@ export class CryptoDispatcher { // (The worker child process won't keep us alive either, because we un-ref // it to make sure it doesn't keep us alive if there is no work.) return new Promise<T>((resolve, reject) => { - let timedOut = false; - const timeout = timer.after(10000, () => { - logger.warn(`crypto RPC call ('${operation}') timed out`); - timedOut = true; - reject(new Error(`crypto RPC call ('${operation}') timed out`)); - if (workItem.state === WorkItemState.Running) { - workItem.state = WorkItemState.Finished; - this.numBusy--; - } - }); + let timeoutHandle: TimerHandle | undefined = undefined; + const timeoutMs = 5000; + const onTimeout = () => { + // FIXME: Maybe destroy and re-init worker if request is in processing + // state and really taking too long? + logger.warn(`crypto RPC call ('${operation}') has been queued for a long time`); + timeoutHandle = timer.after(timeoutMs, onTimeout); + }; myProm.promise .then((x) => { - if (timedOut) { - return; - } - timeout.clear(); + timeoutHandle?.clear(); resolve(x); }) .catch((x) => { logger.info(`crypto RPC call ${operation} threw`); - if (timedOut) { - return; - } - timeout.clear(); + timeoutHandle?.clear(); reject(x); }); }); diff --git a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts index 634c891b6..db8bb4737 100644 --- a/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts +++ b/packages/taler-wallet-core/src/crypto/workers/nodeThreadWorker.ts @@ -21,7 +21,7 @@ import { Logger } from "@gnu-taler/taler-util"; import os from "os"; import url from "url"; import { nativeCryptoR } from "../cryptoImplementation.js"; -import { CryptoWorkerFactory } from "./cryptoDispatcher.js"; +import { CryptoWorkerFactory } from "./crypto-dispatcher.js"; import { CryptoWorker } from "./cryptoWorkerInterface.js"; import { processRequestWithImpl } from "./worker-common.js"; diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts index 46cf12915..90f9a43fa 100644 --- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts +++ b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryNode.ts @@ -17,7 +17,7 @@ /** * Imports. */ -import { CryptoWorkerFactory } from "./cryptoDispatcher.js"; +import { CryptoWorkerFactory } from "./crypto-dispatcher.js"; import { CryptoWorker } from "./cryptoWorkerInterface.js"; import { SynchronousCryptoWorkerNode } from "./synchronousWorkerNode.js"; diff --git a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts index d0c8e4b3a..66381bc0e 100644 --- a/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts +++ b/packages/taler-wallet-core/src/crypto/workers/synchronousWorkerFactoryPlain.ts @@ -17,7 +17,7 @@ /** * Imports. */ -import { CryptoWorkerFactory } from "./cryptoDispatcher.js"; +import { CryptoWorkerFactory } from "./crypto-dispatcher.js"; import { CryptoWorker } from "./cryptoWorkerInterface.js"; import { SynchronousCryptoWorkerPlain } from "./synchronousWorkerPlain.js"; diff --git a/packages/taler-wallet-core/src/index.ts b/packages/taler-wallet-core/src/index.ts index 7cc23aa88..e48c9430f 100644 --- a/packages/taler-wallet-core/src/index.ts +++ b/packages/taler-wallet-core/src/index.ts @@ -37,7 +37,7 @@ export type { CryptoWorker } from "./crypto/workers/cryptoWorkerInterface.js"; export { CryptoWorkerFactory, CryptoDispatcher, -} from "./crypto/workers/cryptoDispatcher.js"; +} from "./crypto/workers/crypto-dispatcher.js"; export * from "./pending-types.js"; diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts index ebb9cdb9b..93d813cc9 100644 --- a/packages/taler-wallet-core/src/internal-wallet-state.ts +++ b/packages/taler-wallet-core/src/internal-wallet-state.ts @@ -41,7 +41,7 @@ import { CoinRefreshRequest, RefreshReason, } from "@gnu-taler/taler-util"; -import { CryptoDispatcher } from "./crypto/workers/cryptoDispatcher.js"; +import { CryptoDispatcher } from "./crypto/workers/crypto-dispatcher.js"; import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js"; import { ExchangeDetailsRecord, ExchangeRecord, WalletStoresV1 } from "./db.js"; import { PendingOperationsResponse } from "./pending-types.js"; diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index 806e4a246..eeff84be6 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -52,7 +52,7 @@ import { DerivedRefreshSession, RefreshNewDenomInfo, } from "../crypto/cryptoTypes.js"; -import { CryptoApiStoppedError } from "../crypto/workers/cryptoDispatcher.js"; +import { CryptoApiStoppedError } from "../crypto/workers/crypto-dispatcher.js"; import { CoinRecord, CoinSourceType, diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index f442e678a..1defff0d2 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -106,7 +106,7 @@ import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js"; import { CryptoDispatcher, CryptoWorkerFactory, -} from "./crypto/workers/cryptoDispatcher.js"; +} from "./crypto/workers/crypto-dispatcher.js"; import { clearDatabase } from "./db-utils.js"; import { AuditorTrustRecord, |