diff options
author | Florian Dold <florian@dold.me> | 2024-06-26 16:02:04 +0200 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2024-06-26 16:02:16 +0200 |
commit | 60a08dd50be03f70561852c6893ab366d22568d6 (patch) | |
tree | 241c52008a1b0f1e1e81f9d625899177e44db588 /packages | |
parent | 0e434a26df32fd1e6b15713dd5c2433728913697 (diff) | |
download | wallet-core-60a08dd50be03f70561852c6893ab366d22568d6.tar.xz |
wallet-core: cancellation token support for DB handles
Diffstat (limited to 'packages')
-rw-r--r-- | packages/taler-wallet-core/src/query.ts | 86 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/wallet.ts | 26 |
2 files changed, 90 insertions, 22 deletions
diff --git a/packages/taler-wallet-core/src/query.ts b/packages/taler-wallet-core/src/query.ts index dc15bbdd1..9cbde9304 100644 --- a/packages/taler-wallet-core/src/query.ts +++ b/packages/taler-wallet-core/src/query.ts @@ -565,8 +565,17 @@ function runTx<Arg, Res>( arg: Arg, f: (t: Arg, t2: IDBTransaction) => Promise<Res>, triggerContext: InternalTriggerContext, + cancellationToken: CancellationToken, ): Promise<Res> { + cancellationToken.throwIfCancelled(); + // Create stack trace in case we need to to print later where + // the transaction was started. const stack = Error("Failed transaction was started here."); + + const unregisterOnCancelled = cancellationToken.onCancelled(() => { + tx.abort(); + }); + return new Promise((resolve, reject) => { let funResult: any = undefined; let gotFunResult = false; @@ -577,16 +586,18 @@ function runTx<Arg, Res>( // function waited on a promise that is *not* resolved in the // microtask queue, thus triggering the auto-commit behavior. // Unfortunately, the auto-commit behavior of IDB can't be switched - // of. There are some proposals to add this functionality in the future. + // off. There are some proposals to add this functionality in the future. if (!gotFunResult) { const msg = "BUG: transaction closed before transaction function returned"; logger.error(msg); logger.error(`${stack.stack}`); reject(Error(msg)); + } else { + resolve(funResult); } triggerContext.handleAfterCommit(); - resolve(funResult); + unregisterOnCancelled(); }; tx.onerror = () => { logger.error("error in transaction"); @@ -604,6 +615,7 @@ function runTx<Arg, Res>( logger.error(msg); logger.error(`${stack.stack}`); reject(new TransactionAbortedError(msg)); + unregisterOnCancelled(); }; const resP = Promise.resolve().then(() => f(arg, tx)); resP @@ -620,9 +632,6 @@ function runTx<Arg, Res>( console.error(stack); tx.abort(); } - }) - .catch((e) => { - console.error("fatal: aborting transaction failed", e); }); }); } @@ -797,9 +806,26 @@ function makeWriteContext( return ctx; } +/** + * Handle for typed access to a database. + */ export interface DbAccess<StoreMap> { + /** + * The underlying IndexedDB database handle. + * + * Use with caution, as using the handle directly will not + * properly run DB triggers. + */ idbHandle(): IDBDatabase; + /** + * Run an async function in a "readwrite" transaction on the database, using + * all object store. + * + * The transaction function must run within the microtask queue. + * Waiting for macrotasks results in an autocommit and + * a subsequent exception thrown by this function. + */ runAllStoresReadWriteTx<T>( options: { label?: string; @@ -809,6 +835,14 @@ export interface DbAccess<StoreMap> { ) => Promise<T>, ): Promise<T>; + /** + * Run an async function in a "readonly" transaction on the database, using + * all object store. + * + * The transaction function must run within the microtask queue. + * Waiting for macrotasks results in an autocommit and + * a subsequent exception thrown by this function. + */ runAllStoresReadOnlyTx<T>( options: { label?: string; @@ -818,6 +852,14 @@ export interface DbAccess<StoreMap> { ) => Promise<T>, ): Promise<T>; + /** + * Run an async function in a "readwrite" transaction on the database, using + * the selected object store. + * + * The transaction function must run within the microtask queue. + * Waiting for macrotasks results in an autocommit and + * a subsequent exception thrown by this function. + */ runReadWriteTx<T, StoreNameArray extends Array<StoreNames<StoreMap>>>( opts: { storeNames: StoreNameArray; @@ -826,6 +868,14 @@ export interface DbAccess<StoreMap> { txf: (tx: DbReadWriteTransaction<StoreMap, StoreNameArray>) => Promise<T>, ): Promise<T>; + /** + * Run an async function in a "readonly" transaction on the database, using + * the selected object store. + * + * The transaction function must run within the microtask queue. + * Waiting for macrotasks results in an autocommit and + * a subsequent exception thrown by this function. + */ runReadOnlyTx<T, StoreNameArray extends Array<StoreNames<StoreMap>>>( opts: { storeNames: StoreNameArray; @@ -919,7 +969,7 @@ export class DbAccessImpl<StoreMap> implements DbAccess<StoreMap> { ); const tx = this.db.transaction(strStoreNames, mode); const writeContext = makeWriteContext(tx, accessibleStores, triggerContext); - return runTx(tx, writeContext, txf, triggerContext); + return runTx(tx, writeContext, txf, triggerContext, this.cancellationToken); } async runAllStoresReadOnlyTx<T>( @@ -946,7 +996,13 @@ export class DbAccessImpl<StoreMap> implements DbAccess<StoreMap> { ); const tx = this.db.transaction(strStoreNames, mode); const writeContext = makeReadContext(tx, accessibleStores, triggerContext); - const res = await runTx(tx, writeContext, txf, triggerContext); + const res = await runTx( + tx, + writeContext, + txf, + triggerContext, + this.cancellationToken, + ); return res; } @@ -972,7 +1028,13 @@ export class DbAccessImpl<StoreMap> implements DbAccess<StoreMap> { ); const tx = this.db.transaction(strStoreNames, mode); const writeContext = makeWriteContext(tx, accessibleStores, triggerContext); - const res = await runTx(tx, writeContext, txf, triggerContext); + const res = await runTx( + tx, + writeContext, + txf, + triggerContext, + this.cancellationToken, + ); return res; } @@ -998,7 +1060,13 @@ export class DbAccessImpl<StoreMap> implements DbAccess<StoreMap> { ); const tx = this.db.transaction(strStoreNames, mode); const readContext = makeReadContext(tx, accessibleStores, triggerContext); - const res = runTx(tx, readContext, txf, triggerContext); + const res = runTx( + tx, + readContext, + txf, + triggerContext, + this.cancellationToken, + ); return res; } } diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 97d122480..52cfc02c6 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -29,7 +29,6 @@ import { AmountJson, AmountString, Amounts, - AsyncCondition, CancellationToken, CoinDumpJson, CoinStatus, @@ -1648,11 +1647,12 @@ export function getObservedWalletExecutionContext( cancellationToken: CancellationToken, oc: ObservabilityContext, ): WalletExecutionContext { + const db = ws.createDbAccessHandle(cancellationToken); const wex: WalletExecutionContext = { ws, cancellationToken, cryptoApi: observeTalerCrypto(ws.cryptoApi, oc), - db: new ObservableDbAccess(ws.db, oc), + db: new ObservableDbAccess(db, oc), http: new ObservableHttpClientLibrary(ws.http, oc), taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc), oc, @@ -1665,11 +1665,12 @@ export function getNormalWalletExecutionContext( cancellationToken: CancellationToken, oc: ObservabilityContext, ): WalletExecutionContext { + const db = ws.createDbAccessHandle(cancellationToken); const wex: WalletExecutionContext = { ws, cancellationToken, cryptoApi: ws.cryptoApi, - db: ws.db, + db, get http() { if (ws.initCalled) { return ws.http; @@ -1933,10 +1934,9 @@ interface LongpollState { */ export class InternalWalletState { cryptoApi: TalerCryptoInterface; - cryptoDispatcher: CryptoDispatcher; + private cryptoDispatcher: CryptoDispatcher; readonly timerGroup: TimerGroup; - workAvailable = new AsyncCondition(); stopped = false; private listeners: NotificationListener[] = []; @@ -1978,6 +1978,14 @@ export class InternalWalletState { private _http: HttpRequestLibrary | undefined = undefined; + devExperimentState: DevExperimentState = {}; + + clientCancellationMap: Map<string, CancellationToken.Source> = new Map(); + + private longpollStatePerHostname: Map<string, LongpollState> = new Map(); + + private longpollRequestIdCounter = 1; + get db(): DbAccess<typeof WalletStoresV1> { if (!this._dbAccessHandle) { this._dbAccessHandle = this.createDbAccessHandle( @@ -1987,10 +1995,6 @@ export class InternalWalletState { return this._dbAccessHandle; } - private longpollStatePerHostname: Map<string, LongpollState> = new Map(); - - private longpollRequestIdCounter = 1; - /** * Run a long-polling request, potentially queueing the request * if too many other long-polling requests against the same hostname @@ -2052,10 +2056,6 @@ export class InternalWalletState { } } - devExperimentState: DevExperimentState = {}; - - clientCancellationMap: Map<string, CancellationToken.Source> = new Map(); - clearAllCaches(): void { this.exchangeCache.clear(); this.denomInfoCache.clear(); |