From 6584d9e054faf9a927708f1f7f51bcbed7873afb Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 27 Feb 2024 23:23:36 +0100 Subject: observability --- packages/taler-util/src/notifications.ts | 39 ++++-- .../taler-wallet-core/src/observable-wrappers.ts | 138 ++++++++++++++++++--- packages/taler-wallet-core/src/shepherd.ts | 55 ++++---- packages/taler-wallet-core/src/wallet.ts | 65 +++++++--- 4 files changed, 224 insertions(+), 73 deletions(-) diff --git a/packages/taler-util/src/notifications.ts b/packages/taler-util/src/notifications.ts index 9a62362e7..d0095dcd2 100644 --- a/packages/taler-util/src/notifications.ts +++ b/packages/taler-util/src/notifications.ts @@ -114,15 +114,18 @@ export enum ObservabilityEventType { HttpFetchFinishError = "http-fetch-finish-success", HttpFetchFinishSuccess = "http-fetch-finish-error", DbQueryStart = "db-query-start", - DbQueryCommit = "db-query-commit", - DbQueryAbort = "db-query-abort", + DbQueryFinishSuccess = "db-query-finish-error", + DbQueryFinishError = "db-query-finish-success", RequestStart = "request-start", RequestFinishSuccess = "request-finish-success", RequestFinishError = "request-finish-error", - StartTask = "start-task", - StopTask = "stop-task", - ResetTask = "reset-task", - TaskDependency = "task-dependency", + TaskStart = "start-task", + TaskStop = "stop-task", + TaskReset = "reset-task", + DeclareTaskDependency = "declare-task-dependency", + CryptoStart = "crypto-start", + CryptoFinishSuccess = "crypto-finish-success", + CryptoFinishError = "crypto-finish-error", } export type ObservabilityEvent = @@ -146,12 +149,12 @@ export type ObservabilityEvent = location: string; } | { - type: ObservabilityEventType.DbQueryCommit; + type: ObservabilityEventType.DbQueryFinishSuccess; name: string; location: string; } | { - type: ObservabilityEventType.DbQueryAbort; + type: ObservabilityEventType.DbQueryFinishError; name: string; location: string; } @@ -165,20 +168,32 @@ export type ObservabilityEvent = type: ObservabilityEventType.RequestFinishError; } | { - type: ObservabilityEventType.StartTask; + type: ObservabilityEventType.TaskStart; taskId: string; } | { - type: ObservabilityEventType.StopTask; + type: ObservabilityEventType.TaskStop; taskId: string; } | { - type: ObservabilityEventType.ResetTask; + type: ObservabilityEventType.TaskReset; taskId: string; } | { - type: ObservabilityEventType.TaskDependency; + type: ObservabilityEventType.DeclareTaskDependency; taskId: string; + } + | { + type: ObservabilityEventType.CryptoStart; + operation: string; + } + | { + type: ObservabilityEventType.CryptoFinishSuccess; + operation: string; + } + | { + type: ObservabilityEventType.CryptoFinishError; + operation: string; }; export interface BackupOperationErrorNotification { diff --git a/packages/taler-wallet-core/src/observable-wrappers.ts b/packages/taler-wallet-core/src/observable-wrappers.ts index 3df084268..f2c76ae5b 100644 --- a/packages/taler-wallet-core/src/observable-wrappers.ts +++ b/packages/taler-wallet-core/src/observable-wrappers.ts @@ -34,6 +34,7 @@ import { HttpResponse, } from "@gnu-taler/taler-util/http"; import { TaskIdStr } from "./common.js"; +import { TalerCryptoInterface } from "./index.js"; import { DbAccess, DbReadOnlyTransaction, @@ -60,7 +61,7 @@ export class ObservableTaskScheduler implements TaskScheduler { if (!this.taskDepCache.has(taskId)) { this.taskDepCache.add(taskId); this.oc.observe({ - type: ObservabilityEventType.TaskDependency, + type: ObservabilityEventType.DeclareTaskDependency, taskId, }); } @@ -76,7 +77,7 @@ export class ObservableTaskScheduler implements TaskScheduler { startShepherdTask(taskId: TaskIdStr): void { this.declareDep(taskId); this.oc.observe({ - type: ObservabilityEventType.StartTask, + type: ObservabilityEventType.TaskStart, taskId, }); return this.impl.startShepherdTask(taskId); @@ -84,7 +85,7 @@ export class ObservableTaskScheduler implements TaskScheduler { stopShepherdTask(taskId: TaskIdStr): void { this.declareDep(taskId); this.oc.observe({ - type: ObservabilityEventType.StopTask, + type: ObservabilityEventType.TaskStop, taskId, }); return this.impl.stopShepherdTask(taskId); @@ -95,7 +96,7 @@ export class ObservableTaskScheduler implements TaskScheduler { this.taskDepCache.clear(); } this.oc.observe({ - type: ObservabilityEventType.ResetTask, + type: ObservabilityEventType.TaskReset, taskId, }); return this.impl.resetTaskRetries(taskId); @@ -160,53 +161,150 @@ export class ObservableDbAccess implements DbAccess { return this.impl.idbHandle(); } - runAllStoresReadWriteTx( + async runAllStoresReadWriteTx( txf: ( tx: DbReadWriteTransaction[]>, ) => Promise, ): Promise { + const location = getCallerInfo(); this.oc.observe({ type: ObservabilityEventType.DbQueryStart, name: "", - location: getCallerInfo(), + location, }); - return this.impl.runAllStoresReadWriteTx(txf); + try { + const ret = await this.impl.runAllStoresReadWriteTx(txf); + this.oc.observe({ + type: ObservabilityEventType.DbQueryFinishSuccess, + name: "", + location, + }); + return ret; + } catch (e) { + this.oc.observe({ + type: ObservabilityEventType.DbQueryFinishError, + name: "", + location, + }); + throw e; + } } - runAllStoresReadOnlyTx( + async runAllStoresReadOnlyTx( txf: ( tx: DbReadOnlyTransaction[]>, ) => Promise, ): Promise { + const location = getCallerInfo(); this.oc.observe({ type: ObservabilityEventType.DbQueryStart, name: "", - location: getCallerInfo(), + location, }); - return this.impl.runAllStoresReadOnlyTx(txf); + try { + const ret = await this.impl.runAllStoresReadOnlyTx(txf); + this.oc.observe({ + type: ObservabilityEventType.DbQueryFinishSuccess, + name: "", + location, + }); + return ret; + } catch (e) { + this.oc.observe({ + type: ObservabilityEventType.DbQueryFinishError, + name: "", + location, + }); + throw e; + } } - runReadWriteTx[]>( + async runReadWriteTx[]>( storeNames: StoreNameArray, txf: (tx: DbReadWriteTransaction) => Promise, ): Promise { + const location = getCallerInfo(); this.oc.observe({ type: ObservabilityEventType.DbQueryStart, name: "", - location: getCallerInfo(), + location, }); - return this.impl.runReadWriteTx(storeNames, txf); + try { + const ret = await this.impl.runReadWriteTx(storeNames, txf); + this.oc.observe({ + type: ObservabilityEventType.DbQueryFinishSuccess, + name: "", + location, + }); + return ret; + } catch (e) { + this.oc.observe({ + type: ObservabilityEventType.DbQueryFinishError, + name: "", + location, + }); + throw e; + } } - runReadOnlyTx[]>( + async runReadOnlyTx[]>( storeNames: StoreNameArray, txf: (tx: DbReadOnlyTransaction) => Promise, ): Promise { - this.oc.observe({ - type: ObservabilityEventType.DbQueryStart, - name: "", - location: getCallerInfo(), - }); - return this.impl.runReadOnlyTx(storeNames, txf); + const location = getCallerInfo(); + try { + this.oc.observe({ + type: ObservabilityEventType.DbQueryStart, + name: "", + location, + }); + const ret = await this.impl.runReadOnlyTx(storeNames, txf); + this.oc.observe({ + type: ObservabilityEventType.DbQueryFinishSuccess, + name: "", + location, + }); + return ret; + } catch (e) { + this.oc.observe({ + type: ObservabilityEventType.DbQueryFinishError, + name: "", + location, + }); + throw e; + } } } + +export function observeTalerCrypto( + impl: TalerCryptoInterface, + oc: ObservabilityContext, +): TalerCryptoInterface { + return Object.fromEntries( + Object.keys(impl).map((name) => { + return [ + name, + async (req: any) => { + oc.observe({ + type: ObservabilityEventType.CryptoStart, + operation: name, + }); + try { + const res = await (impl as any)[name](req); + oc.observe({ + type: ObservabilityEventType.CryptoFinishSuccess, + operation: name, + }); + return res; + } catch (e) { + oc.observe({ + type: ObservabilityEventType.CryptoFinishError, + operation: name, + }); + throw e; + } + }, + ]; + }), + ) as any; +} diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index 66c39ee48..0bc548a60 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -63,11 +63,6 @@ import { processDepositGroup, } from "./deposits.js"; import { updateExchangeFromUrlHandler } from "./exchanges.js"; -import { - ObservableDbAccess, - ObservableHttpClientLibrary, - ObservableTaskScheduler, -} from "./observable-wrappers.js"; import { computePayMerchantTransactionState, computeRefundTransactionState, @@ -99,7 +94,12 @@ import { constructTransactionIdentifier, parseTransactionIdentifier, } from "./transactions.js"; -import { InternalWalletState, WalletExecutionContext } from "./wallet.js"; +import { + InternalWalletState, + WalletExecutionContext, + getNormalWalletExecutionContext, + getObservedWalletExecutionContext, +} from "./wallet.js"; import { computeWithdrawalTransactionStatus, processWithdrawalGroup, @@ -578,27 +578,30 @@ async function callOperationHandlerForTaskId( taskId: TaskIdStr, cancellationToken: CancellationToken, ): Promise { - const oc: ObservabilityContext = { - observe(evt) { - if (ws.config.testing.emitObservabilityEvents) { - ws.notify({ - type: NotificationType.TaskObservabilityEvent, - taskId, - event: evt, - }); - } - }, - }; + let oc: ObservabilityContext; + let wex: WalletExecutionContext; + + if (ws.config.testing.emitObservabilityEvents) { + oc = { + observe(evt) { + if (ws.config.testing.emitObservabilityEvents) { + ws.notify({ + type: NotificationType.TaskObservabilityEvent, + taskId, + event: evt, + }); + } + }, + }; + + wex = getObservedWalletExecutionContext(ws, CancellationToken.CONTINUE, oc); + } else { + oc = { + observe(evt) {}, + }; + wex = getNormalWalletExecutionContext(ws, CancellationToken.CONTINUE, oc); + } - const wex: WalletExecutionContext = { - ws, - cancellationToken, - cryptoApi: ws.cryptoApi, - db: new ObservableDbAccess(ws.db, oc), - http: new ObservableHttpClientLibrary(ws.http, oc), - taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc), - oc, - }; const pending = parseTaskIdentifier(taskId); switch (pending.tag) { case PendingTaskType.ExchangeUpdate: diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index c3aa68303..ea3c4bb83 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -204,6 +204,7 @@ import { ObservableDbAccess, ObservableHttpClientLibrary, ObservableTaskScheduler, + observeTalerCrypto, } from "./observable-wrappers.js"; import { confirmPay, @@ -1363,6 +1364,40 @@ export function getVersion(wex: WalletExecutionContext): WalletCoreVersion { return result; } +export function getObservedWalletExecutionContext( + ws: InternalWalletState, + cancellationToken: CancellationToken, + oc: ObservabilityContext, +) { + const wex: WalletExecutionContext = { + ws, + cancellationToken, + cryptoApi: observeTalerCrypto(ws.cryptoApi, oc), + db: new ObservableDbAccess(ws.db, oc), + http: new ObservableHttpClientLibrary(ws.http, oc), + taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc), + oc, + }; + return wex; +} + +export function getNormalWalletExecutionContext( + ws: InternalWalletState, + cancellationToken: CancellationToken, + oc: ObservabilityContext, +) { + const wex: WalletExecutionContext = { + ws, + cancellationToken, + cryptoApi: ws.cryptoApi, + db: ws.db, + http: ws.http, + taskScheduler: ws.taskScheduler, + oc, + }; + return wex; +} + /** * Handle a request to the wallet-core API. */ @@ -1372,28 +1407,28 @@ async function handleCoreApiRequest( id: string, payload: unknown, ): Promise { - const oc: ObservabilityContext = { - observe(evt) { - if (ws.config.testing.emitObservabilityEvents) { + let wex: WalletExecutionContext; + let oc: ObservabilityContext; + + if (ws.config.testing.emitObservabilityEvents) { + oc = { + observe(evt) { ws.notify({ type: NotificationType.RequestObservabilityEvent, operation, requestId: id, event: evt, }); - } - }, - }; + }, + }; - const wex: WalletExecutionContext = { - ws, - cancellationToken: CancellationToken.CONTINUE, - cryptoApi: ws.cryptoApi, - db: new ObservableDbAccess(ws.db, oc), - http: new ObservableHttpClientLibrary(ws.http, oc), - taskScheduler: new ObservableTaskScheduler(ws.taskScheduler, oc), - oc, - }; + wex = getObservedWalletExecutionContext(ws, CancellationToken.CONTINUE, oc); + } else { + oc = { + observe(evt) {}, + }; + wex = getNormalWalletExecutionContext(ws, CancellationToken.CONTINUE, oc); + } try { await ws.ensureWalletDbOpen(); -- cgit v1.2.3