aboutsummaryrefslogtreecommitdiff
path: root/packages
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-06-26 16:02:04 +0200
committerFlorian Dold <florian@dold.me>2024-06-26 16:02:16 +0200
commit60a08dd50be03f70561852c6893ab366d22568d6 (patch)
tree241c52008a1b0f1e1e81f9d625899177e44db588 /packages
parent0e434a26df32fd1e6b15713dd5c2433728913697 (diff)
downloadwallet-core-60a08dd50be03f70561852c6893ab366d22568d6.tar.xz
wallet-core: cancellation token support for DB handles
Diffstat (limited to 'packages')
-rw-r--r--packages/taler-wallet-core/src/query.ts86
-rw-r--r--packages/taler-wallet-core/src/wallet.ts26
2 files changed, 90 insertions, 22 deletions
diff --git a/packages/taler-wallet-core/src/query.ts b/packages/taler-wallet-core/src/query.ts
index dc15bbdd1..9cbde9304 100644
--- a/packages/taler-wallet-core/src/query.ts
+++ b/packages/taler-wallet-core/src/query.ts
@@ -565,8 +565,17 @@ function runTx<Arg, Res>(
arg: Arg,
f: (t: Arg, t2: IDBTransaction) => Promise<Res>,
triggerContext: InternalTriggerContext,
+ cancellationToken: CancellationToken,
): Promise<Res> {
+ cancellationToken.throwIfCancelled();
+ // Create stack trace in case we need to to print later where
+ // the transaction was started.
const stack = Error("Failed transaction was started here.");
+
+ const unregisterOnCancelled = cancellationToken.onCancelled(() => {
+ tx.abort();
+ });
+
return new Promise((resolve, reject) => {
let funResult: any = undefined;
let gotFunResult = false;
@@ -577,16 +586,18 @@ function runTx<Arg, Res>(
// function waited on a promise that is *not* resolved in the
// microtask queue, thus triggering the auto-commit behavior.
// Unfortunately, the auto-commit behavior of IDB can't be switched
- // of. There are some proposals to add this functionality in the future.
+ // off. There are some proposals to add this functionality in the future.
if (!gotFunResult) {
const msg =
"BUG: transaction closed before transaction function returned";
logger.error(msg);
logger.error(`${stack.stack}`);
reject(Error(msg));
+ } else {
+ resolve(funResult);
}
triggerContext.handleAfterCommit();
- resolve(funResult);
+ unregisterOnCancelled();
};
tx.onerror = () => {
logger.error("error in transaction");
@@ -604,6 +615,7 @@ function runTx<Arg, Res>(
logger.error(msg);
logger.error(`${stack.stack}`);
reject(new TransactionAbortedError(msg));
+ unregisterOnCancelled();
};
const resP = Promise.resolve().then(() => f(arg, tx));
resP
@@ -620,9 +632,6 @@ function runTx<Arg, Res>(
console.error(stack);
tx.abort();
}
- })
- .catch((e) => {
- console.error("fatal: aborting transaction failed", e);
});
});
}
@@ -797,9 +806,26 @@ function makeWriteContext(
return ctx;
}
+/**
+ * Handle for typed access to a database.
+ */
export interface DbAccess<StoreMap> {
+ /**
+ * The underlying IndexedDB database handle.
+ *
+ * Use with caution, as using the handle directly will not
+ * properly run DB triggers.
+ */
idbHandle(): IDBDatabase;
+ /**
+ * Run an async function in a "readwrite" transaction on the database, using
+ * all object store.
+ *
+ * The transaction function must run within the microtask queue.
+ * Waiting for macrotasks results in an autocommit and
+ * a subsequent exception thrown by this function.
+ */
runAllStoresReadWriteTx<T>(
options: {
label?: string;
@@ -809,6 +835,14 @@ export interface DbAccess<StoreMap> {
) => Promise<T>,
): Promise<T>;
+ /**
+ * Run an async function in a "readonly" transaction on the database, using
+ * all object store.
+ *
+ * The transaction function must run within the microtask queue.
+ * Waiting for macrotasks results in an autocommit and
+ * a subsequent exception thrown by this function.
+ */
runAllStoresReadOnlyTx<T>(
options: {
label?: string;
@@ -818,6 +852,14 @@ export interface DbAccess<StoreMap> {
) => Promise<T>,
): Promise<T>;
+ /**
+ * Run an async function in a "readwrite" transaction on the database, using
+ * the selected object store.
+ *
+ * The transaction function must run within the microtask queue.
+ * Waiting for macrotasks results in an autocommit and
+ * a subsequent exception thrown by this function.
+ */
runReadWriteTx<T, StoreNameArray extends Array<StoreNames<StoreMap>>>(
opts: {
storeNames: StoreNameArray;
@@ -826,6 +868,14 @@ export interface DbAccess<StoreMap> {
txf: (tx: DbReadWriteTransaction<StoreMap, StoreNameArray>) => Promise<T>,
): Promise<T>;
+ /**
+ * Run an async function in a "readonly" transaction on the database, using
+ * the selected object store.
+ *
+ * The transaction function must run within the microtask queue.
+ * Waiting for macrotasks results in an autocommit and
+ * a subsequent exception thrown by this function.
+ */
runReadOnlyTx<T, StoreNameArray extends Array<StoreNames<StoreMap>>>(
opts: {
storeNames: StoreNameArray;
@@ -919,7 +969,7 @@ export class DbAccessImpl<StoreMap> implements DbAccess<StoreMap> {
);
const tx = this.db.transaction(strStoreNames, mode);
const writeContext = makeWriteContext(tx, accessibleStores, triggerContext);
- return runTx(tx, writeContext, txf, triggerContext);
+ return runTx(tx, writeContext, txf, triggerContext, this.cancellationToken);
}
async runAllStoresReadOnlyTx<T>(
@@ -946,7 +996,13 @@ export class DbAccessImpl<StoreMap> implements DbAccess<StoreMap> {
);
const tx = this.db.transaction(strStoreNames, mode);
const writeContext = makeReadContext(tx, accessibleStores, triggerContext);
- const res = await runTx(tx, writeContext, txf, triggerContext);
+ const res = await runTx(
+ tx,
+ writeContext,
+ txf,
+ triggerContext,
+ this.cancellationToken,
+ );
return res;
}
@@ -972,7 +1028,13 @@ export class DbAccessImpl<StoreMap> implements DbAccess<StoreMap> {
);
const tx = this.db.transaction(strStoreNames, mode);
const writeContext = makeWriteContext(tx, accessibleStores, triggerContext);
- const res = await runTx(tx, writeContext, txf, triggerContext);
+ const res = await runTx(
+ tx,
+ writeContext,
+ txf,
+ triggerContext,
+ this.cancellationToken,
+ );
return res;
}
@@ -998,7 +1060,13 @@ export class DbAccessImpl<StoreMap> implements DbAccess<StoreMap> {
);
const tx = this.db.transaction(strStoreNames, mode);
const readContext = makeReadContext(tx, accessibleStores, triggerContext);
- const res = runTx(tx, readContext, txf, triggerContext);
+ const res = runTx(
+ tx,
+ readContext,
+ txf,
+ triggerContext,
+ this.cancellationToken,
+ );
return res;
}
}
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index 97d122480..52cfc02c6 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -29,7 +29,6 @@ import {
AmountJson,
AmountString,
Amounts,
- AsyncCondition,
CancellationToken,
CoinDumpJson,
CoinStatus,
@@ -1648,11 +1647,12 @@ export function getObservedWalletExecutionContext(
cancellationToken: CancellationToken,
oc: ObservabilityContext,
): WalletExecutionContext {
+ const db = ws.createDbAccessHandle(cancellationToken);
const wex: WalletExecutionContext = {
ws,
cancellationToken,
cryptoApi: observeTalerCrypto(ws.cryptoApi, oc),
- db: new ObservableDbAccess(ws.db, oc),
+ db: new ObservableDbAccess(db, oc),
http: new ObservableHttpClientLibrary(ws.http, oc),
taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc),
oc,
@@ -1665,11 +1665,12 @@ export function getNormalWalletExecutionContext(
cancellationToken: CancellationToken,
oc: ObservabilityContext,
): WalletExecutionContext {
+ const db = ws.createDbAccessHandle(cancellationToken);
const wex: WalletExecutionContext = {
ws,
cancellationToken,
cryptoApi: ws.cryptoApi,
- db: ws.db,
+ db,
get http() {
if (ws.initCalled) {
return ws.http;
@@ -1933,10 +1934,9 @@ interface LongpollState {
*/
export class InternalWalletState {
cryptoApi: TalerCryptoInterface;
- cryptoDispatcher: CryptoDispatcher;
+ private cryptoDispatcher: CryptoDispatcher;
readonly timerGroup: TimerGroup;
- workAvailable = new AsyncCondition();
stopped = false;
private listeners: NotificationListener[] = [];
@@ -1978,6 +1978,14 @@ export class InternalWalletState {
private _http: HttpRequestLibrary | undefined = undefined;
+ devExperimentState: DevExperimentState = {};
+
+ clientCancellationMap: Map<string, CancellationToken.Source> = new Map();
+
+ private longpollStatePerHostname: Map<string, LongpollState> = new Map();
+
+ private longpollRequestIdCounter = 1;
+
get db(): DbAccess<typeof WalletStoresV1> {
if (!this._dbAccessHandle) {
this._dbAccessHandle = this.createDbAccessHandle(
@@ -1987,10 +1995,6 @@ export class InternalWalletState {
return this._dbAccessHandle;
}
- private longpollStatePerHostname: Map<string, LongpollState> = new Map();
-
- private longpollRequestIdCounter = 1;
-
/**
* Run a long-polling request, potentially queueing the request
* if too many other long-polling requests against the same hostname
@@ -2052,10 +2056,6 @@ export class InternalWalletState {
}
}
- devExperimentState: DevExperimentState = {};
-
- clientCancellationMap: Map<string, CancellationToken.Source> = new Map();
-
clearAllCaches(): void {
this.exchangeCache.clear();
this.denomInfoCache.clear();