diff options
Diffstat (limited to 'packages/taler-wallet-core')
-rw-r--r-- | packages/taler-wallet-core/src/shepherd.ts | 30 |
1 files changed, 21 insertions, 9 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index 0bc548a60..e6ea412e3 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -26,6 +26,7 @@ import { Logger, NotificationType, ObservabilityContext, + ObservabilityEventType, RetryLoopOpts, TalerError, TalerErrorCode, @@ -327,15 +328,16 @@ export class TaskSchedulerImpl implements TaskScheduler { Duration.fromSpec({ seconds: 60 }), ); } + const wex = getWalletExecutionContextForTask( + this.ws, + taskId, + info.cts.token, + ); const startTime = AbsoluteTime.now(); logger.trace(`Shepherd for ${taskId} will call handler`); // FIXME: This should already return the retry record. const res = await runTaskWithErrorReporting(this.ws, taskId, async () => { - return await callOperationHandlerForTaskId( - this.ws, - taskId, - info.cts.token, - ); + return await callOperationHandlerForTaskId(wex, taskId); }); const retryRecord = await this.ws.db.runReadOnlyTx( ["operationRetries"], @@ -343,6 +345,10 @@ export class TaskSchedulerImpl implements TaskScheduler { return tx.operationRetries.get(taskId); }, ); + wex.oc.observe({ + type: ObservabilityEventType.ShepherdTaskResult, + resultType: res.type, + }); switch (res.type) { case TaskRunResultType.Error: { logger.trace(`Shepherd for ${taskId} got error result.`); @@ -573,11 +579,11 @@ async function runTaskWithErrorReporting( } } -async function callOperationHandlerForTaskId( +function getWalletExecutionContextForTask( ws: InternalWalletState, taskId: TaskIdStr, cancellationToken: CancellationToken, -): Promise<TaskRunResult> { +): WalletExecutionContext { let oc: ObservabilityContext; let wex: WalletExecutionContext; @@ -594,14 +600,20 @@ async function callOperationHandlerForTaskId( }, }; - wex = getObservedWalletExecutionContext(ws, CancellationToken.CONTINUE, oc); + wex = getObservedWalletExecutionContext(ws, cancellationToken, oc); } else { oc = { observe(evt) {}, }; - wex = getNormalWalletExecutionContext(ws, CancellationToken.CONTINUE, oc); + wex = getNormalWalletExecutionContext(ws, cancellationToken, oc); } + return wex; +} +async function callOperationHandlerForTaskId( + wex: WalletExecutionContext, + taskId: TaskIdStr, +): Promise<TaskRunResult> { const pending = parseTaskIdentifier(taskId); switch (pending.tag) { case PendingTaskType.ExchangeUpdate: |