From 4b69853c347071acb73efcde9d4969cf06d0dfcc Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 23 Apr 2024 01:52:06 +0200 Subject: wallet-core: simple DB trigger mechanism --- packages/taler-wallet-core/src/query.ts | 126 +++++++++++++++++++++++++------ packages/taler-wallet-core/src/wallet.ts | 39 +++++++++- 2 files changed, 137 insertions(+), 28 deletions(-) (limited to 'packages/taler-wallet-core') diff --git a/packages/taler-wallet-core/src/query.ts b/packages/taler-wallet-core/src/query.ts index bdc8df0c7..0a321b835 100644 --- a/packages/taler-wallet-core/src/query.ts +++ b/packages/taler-wallet-core/src/query.ts @@ -15,8 +15,9 @@ */ /** - * Database query abstractions. - * @module Query + * @fileoverview + * Query helpers for IndexedDB databases. + * * @author Florian Dold */ @@ -563,6 +564,7 @@ function runTx( tx: IDBTransaction, arg: Arg, f: (t: Arg, t2: IDBTransaction) => Promise, + triggerContext: InternalTriggerContext, ): Promise { const stack = Error("Failed transaction was started here."); return new Promise((resolve, reject) => { @@ -583,6 +585,7 @@ function runTx( logger.error(`${stack.stack}`); reject(Error(msg)); } + triggerContext.handleAfterCommit(); resolve(funResult); }; tx.onerror = () => { @@ -627,6 +630,7 @@ function runTx( function makeReadContext( tx: IDBTransaction, storePick: { [n: string]: StoreWithIndexes }, + triggerContext: InternalTriggerContext, ): any { const ctx: { [s: string]: StoreReadOnlyAccessor } = {}; for (const storeAlias in storePick) { @@ -639,10 +643,12 @@ function makeReadContext( const indexName = indexDescriptor.name; indexes[indexAlias] = { get(key) { + triggerContext.storesAccessed.add(storeName); const req = tx.objectStore(storeName).index(indexName).get(key); return requestToPromise(req); }, iter(query) { + triggerContext.storesAccessed.add(storeName); const req = tx .objectStore(storeName) .index(indexName) @@ -650,6 +656,7 @@ function makeReadContext( return new ResultStream(req); }, getAll(query, count) { + triggerContext.storesAccessed.add(storeName); const req = tx .objectStore(storeName) .index(indexName) @@ -657,6 +664,7 @@ function makeReadContext( return requestToPromise(req); }, getAllKeys(query, count) { + triggerContext.storesAccessed.add(storeName); const req = tx .objectStore(storeName) .index(indexName) @@ -664,6 +672,7 @@ function makeReadContext( return requestToPromise(req); }, count(query) { + triggerContext.storesAccessed.add(storeName); const req = tx.objectStore(storeName).index(indexName).count(query); return requestToPromise(req); }, @@ -672,14 +681,17 @@ function makeReadContext( ctx[storeAlias] = { indexes, get(key) { + triggerContext.storesAccessed.add(storeName); const req = tx.objectStore(storeName).get(key); return requestToPromise(req); }, getAll(query, count) { + triggerContext.storesAccessed.add(storeName); const req = tx.objectStore(storeName).getAll(query, count); return requestToPromise(req); }, iter(query) { + triggerContext.storesAccessed.add(storeName); const req = tx.objectStore(storeName).openCursor(query); return new ResultStream(req); }, @@ -691,6 +703,7 @@ function makeReadContext( function makeWriteContext( tx: IDBTransaction, storePick: { [n: string]: StoreWithIndexes }, + triggerContext: InternalTriggerContext, ): any { const ctx: { [s: string]: StoreReadWriteAccessor } = {}; for (const storeAlias in storePick) { @@ -703,10 +716,12 @@ function makeWriteContext( const indexName = indexDescriptor.name; indexes[indexAlias] = { get(key) { + triggerContext.storesAccessed.add(storeName); const req = tx.objectStore(storeName).index(indexName).get(key); return requestToPromise(req); }, iter(query) { + triggerContext.storesAccessed.add(storeName); const req = tx .objectStore(storeName) .index(indexName) @@ -714,6 +729,7 @@ function makeWriteContext( return new ResultStream(req); }, getAll(query, count) { + triggerContext.storesAccessed.add(storeName); const req = tx .objectStore(storeName) .index(indexName) @@ -721,6 +737,7 @@ function makeWriteContext( return requestToPromise(req); }, getAllKeys(query, count) { + triggerContext.storesAccessed.add(storeName); const req = tx .objectStore(storeName) .index(indexName) @@ -728,6 +745,7 @@ function makeWriteContext( return requestToPromise(req); }, count(query) { + triggerContext.storesAccessed.add(storeName); const req = tx.objectStore(storeName).index(indexName).count(query); return requestToPromise(req); }, @@ -736,18 +754,23 @@ function makeWriteContext( ctx[storeAlias] = { indexes, get(key) { + triggerContext.storesAccessed.add(storeName); const req = tx.objectStore(storeName).get(key); return requestToPromise(req); }, getAll(query, count) { + triggerContext.storesAccessed.add(storeName); const req = tx.objectStore(storeName).getAll(query, count); return requestToPromise(req); }, iter(query) { + triggerContext.storesAccessed.add(storeName); const req = tx.objectStore(storeName).openCursor(query); return new ResultStream(req); }, async add(r, k) { + triggerContext.storesAccessed.add(storeName); + triggerContext.storesModified.add(storeName); const req = tx.objectStore(storeName).add(r, k); const key = await requestToPromise(req); return { @@ -755,6 +778,8 @@ function makeWriteContext( }; }, async put(r, k) { + triggerContext.storesAccessed.add(storeName); + triggerContext.storesModified.add(storeName); const req = tx.objectStore(storeName).put(r, k); const key = await requestToPromise(req); return { @@ -762,6 +787,8 @@ function makeWriteContext( }; }, delete(k) { + triggerContext.storesAccessed.add(storeName); + triggerContext.storesModified.add(storeName); const req = tx.objectStore(storeName).delete(k); return requestToPromise(req); }, @@ -802,11 +829,47 @@ export interface DbAccess { ): Promise; } +export interface AfterCommitInfo { + mode: IDBTransactionMode; + scope: Set; + accessedStores: Set; + modifiedStores: Set; +} + export interface TriggerSpec { /** * Trigger run after every successful commit, run outside of the transaction. */ - afterCommit?: (mode: IDBTransactionMode, stores: string[]) => void; + afterCommit?: (info: AfterCommitInfo) => void; + + // onRead(store, value) + // initState () => State + // beforeCommit? (tx: Transaction, s: State | undefined) => Promise; +} + +class InternalTriggerContext { + storesScope: Set; + storesAccessed: Set = new Set(); + storesModified: Set = new Set(); + + constructor( + private triggerSpec: TriggerSpec, + private mode: IDBTransactionMode, + scope: string[], + ) { + this.storesScope = new Set(scope); + } + + handleAfterCommit() { + if (this.triggerSpec.afterCommit) { + this.triggerSpec.afterCommit({ + mode: this.mode, + accessedStores: this.storesAccessed, + modifiedStores: this.storesModified, + scope: this.storesScope, + }); + } + } } /** @@ -842,12 +905,18 @@ export class DbAccessImpl implements DbAccess { strStoreNames.push(swi.storeName); accessibleStores[swi.storeName] = swi; } - const tx = this.db.transaction(strStoreNames, "readwrite"); - const writeContext = makeWriteContext(tx, accessibleStores); - return runTx(tx, writeContext, txf); + const mode = "readwrite"; + const triggerContext = new InternalTriggerContext( + this.triggers, + mode, + strStoreNames, + ); + const tx = this.db.transaction(strStoreNames, mode); + const writeContext = makeWriteContext(tx, accessibleStores, triggerContext); + return runTx(tx, writeContext, txf, triggerContext); } - runAllStoresReadOnlyTx( + async runAllStoresReadOnlyTx( options: { label?: string; }, @@ -863,12 +932,19 @@ export class DbAccessImpl implements DbAccess { strStoreNames.push(swi.storeName); accessibleStores[swi.storeName] = swi; } - const tx = this.db.transaction(strStoreNames, "readonly"); - const writeContext = makeReadContext(tx, accessibleStores); - return runTx(tx, writeContext, txf); + const mode = "readonly"; + const triggerContext = new InternalTriggerContext( + this.triggers, + mode, + strStoreNames, + ); + const tx = this.db.transaction(strStoreNames, mode); + const writeContext = makeReadContext(tx, accessibleStores, triggerContext); + const res = await runTx(tx, writeContext, txf, triggerContext); + return res; } - runReadWriteTx>>( + async runReadWriteTx>>( storeNames: StoreNameArray, txf: (tx: DbReadWriteTransaction) => Promise, ): Promise { @@ -881,12 +957,14 @@ export class DbAccessImpl implements DbAccess { accessibleStores[swi.storeName] = swi; } const mode = "readwrite"; + const triggerContext = new InternalTriggerContext( + this.triggers, + mode, + strStoreNames, + ); const tx = this.db.transaction(strStoreNames, mode); - const writeContext = makeWriteContext(tx, accessibleStores); - const res = runTx(tx, writeContext, txf); - if (this.triggers.afterCommit) { - this.triggers.afterCommit(mode, strStoreNames); - } + const writeContext = makeWriteContext(tx, accessibleStores, triggerContext); + const res = await runTx(tx, writeContext, txf, triggerContext); return res; } @@ -903,16 +981,14 @@ export class DbAccessImpl implements DbAccess { accessibleStores[swi.storeName] = swi; } const mode = "readonly"; + const triggerContext = new InternalTriggerContext( + this.triggers, + mode, + strStoreNames, + ); const tx = this.db.transaction(strStoreNames, mode); - const readContext = makeReadContext(tx, accessibleStores); - const res = runTx(tx, readContext, txf); - if (this.triggers.afterCommit) { - this.triggers.afterCommit(mode, strStoreNames); - } + const readContext = makeReadContext(tx, accessibleStores, triggerContext); + const res = runTx(tx, readContext, txf, triggerContext); return res; } - - registerPostCommitTrigger(args: { - handler: (storeNames: string[]) => void; - }): void {} } diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 9f9b90446..810c78583 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -243,7 +243,12 @@ import { checkPeerPushDebit, initiatePeerPushDebit, } from "./pay-peer-push-debit.js"; -import { DbAccess, DbAccessImpl } from "./query.js"; +import { + AfterCommitInfo, + DbAccess, + DbAccessImpl, + TriggerSpec, +} from "./query.js"; import { forceRefresh } from "./refresh.js"; import { TaskScheduler, @@ -665,7 +670,7 @@ export interface PendingOperationsResponse { /** * Implementation of the "wallet-core" API. */ -async function dispatchRequestInternal( +async function dispatchRequestInternal( wex: WalletExecutionContext, cts: CancellationToken.Source, operation: WalletApiOperation, @@ -1712,6 +1717,34 @@ export class Cache { } } +/** + * Implementation of triggers for the wallet DB. + */ +class WalletDbTriggerSpec implements TriggerSpec { + constructor(public ws: InternalWalletState) {} + + afterCommit(info: AfterCommitInfo): void { + if (info.mode !== "readwrite") { + return; + } + logger.info( + `in after commit callback for readwrite, modified ${j2s([ + ...info.modifiedStores, + ])}`, + ); + const modified = info.accessedStores; + if ( + modified.has(WalletStoresV1.exchanges.storeName) || + modified.has(WalletStoresV1.exchangeDetails.storeName) || + modified.has(WalletStoresV1.denominations.storeName) || + modified.has(WalletStoresV1.globalCurrencyAuditors.storeName) || + modified.has(WalletStoresV1.globalCurrencyExchanges.storeName) + ) { + this.ws.clearAllCaches(); + } + } +} + /** * Internal state of the wallet. * @@ -1804,7 +1837,7 @@ export class InternalWalletState { return new DbAccessImpl( this._indexedDbHandle, WalletStoresV1, - {}, + new WalletDbTriggerSpec(this), cancellationToken, ); } -- cgit v1.2.3