From 2a37575cf6db006431a7d174b85203ae41cc629f Mon Sep 17 00:00:00 2001 From: Sebastian Date: Thu, 6 Jun 2024 15:49:36 -0300 Subject: wait for previous task to be cancelled --- packages/taler-util/src/CancellationToken.ts | 2 +- packages/taler-wallet-core/src/common.ts | 3 ++- packages/taler-wallet-core/src/exchanges.ts | 2 +- packages/taler-wallet-core/src/shepherd.ts | 28 ++++++++++++++++++---------- packages/taler-wallet-core/src/withdraw.ts | 2 +- 5 files changed, 23 insertions(+), 14 deletions(-) (limited to 'packages') diff --git a/packages/taler-util/src/CancellationToken.ts b/packages/taler-util/src/CancellationToken.ts index 3aa576d77..5f38f0c7b 100644 --- a/packages/taler-util/src/CancellationToken.ts +++ b/packages/taler-util/src/CancellationToken.ts @@ -172,7 +172,7 @@ class CancellationToken { } = CancellationToken.create(); let timer: NodeJS.Timeout | null; - timer = setTimeout(() => originalCancel(CancellationToken.timeout), ms); + timer = setTimeout(() => originalCancel(`CancellationToken.timeout ${ms}`), ms); const disposeTimer = () => { if (timer == null) return; clearTimeout(timer); diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts index 00d462d6f..a9e962dda 100644 --- a/packages/taler-wallet-core/src/common.ts +++ b/packages/taler-wallet-core/src/common.ts @@ -41,6 +41,7 @@ import { checkDbInvariant, checkLogicInvariant, durationMul, + j2s, } from "@gnu-taler/taler-util"; import { BackupProviderRecord, @@ -798,7 +799,7 @@ export async function genericWaitForState( flag.raise(); } }); - const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => { + const unregisterOnCancelled = wex.cancellationToken.onCancelled((reason) => { cancelNotif(); flag.raise(); }); diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts index 05de3d8e5..626441d35 100644 --- a/packages/taler-wallet-core/src/exchanges.ts +++ b/packages/taler-wallet-core/src/exchanges.ts @@ -1363,13 +1363,13 @@ export async function updateExchangeFromUrlHandler( ); refreshCheckNecessary = false; } - if (!(updateNecessary || refreshCheckNecessary)) { logger.trace("update not necessary, running again later"); return TaskRunResult.runAgainAt( AbsoluteTime.min(nextUpdateStamp, nextRefreshCheckStamp), ); } + } // When doing the auto-refresh check, we always update diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index 050de479a..ee55e1da1 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -56,6 +56,7 @@ import { WalletDbAllStoresReadOnlyTransaction, WalletDbReadOnlyTransaction, timestampAbsoluteFromDb, + timestampPreciseToDb, } from "./db.js"; import { computeDepositTransactionStatus, @@ -113,6 +114,8 @@ const logger = new Logger("shepherd.ts"); */ interface ShepherdInfo { cts: CancellationToken.Source; + latch?: Promise; + stopped: boolean; } /** @@ -258,27 +261,34 @@ export class TaskSchedulerImpl implements TaskScheduler { const tasksIds = [...this.sheps.keys()]; logger.info(`reloading shepherd with ${tasksIds.length} tasks`); for (const taskId of tasksIds) { - this.stopShepherdTask(taskId); + await this.stopShepherdTask(taskId); } for (const taskId of tasksIds) { this.startShepherdTask(taskId); } } - private async internalStartShepherdTask(taskId: TaskIdStr): Promise { logger.trace(`Starting to shepherd task ${taskId}`); const oldShep = this.sheps.get(taskId); if (oldShep) { - logger.trace(`Already have a shepherd for ${taskId}`); - return; + if (!oldShep.stopped) { + logger.trace(`Already have a shepherd for ${taskId}`); + return; + } + logger.trace( + `Waiting old task to complete the loop in cancel mode ${taskId}`, + ); + await oldShep.latch; } logger.trace(`Creating new shepherd for ${taskId}`); const newShep: ShepherdInfo = { cts: CancellationToken.create(), + stopped: false, }; this.sheps.set(taskId, newShep); try { - await this.internalShepherdTask(taskId, newShep); + newShep.latch = this.internalShepherdTask(taskId, newShep); + await newShep.latch; } finally { logger.trace(`Done shepherding ${taskId}`); this.sheps.delete(taskId); @@ -291,8 +301,8 @@ export class TaskSchedulerImpl implements TaskScheduler { const oldShep = this.sheps.get(taskId); if (oldShep) { logger.trace(`Cancelling old shepherd for ${taskId}`); - oldShep.cts.cancel(); - this.sheps.delete(taskId); + oldShep.cts.cancel(`stopping task ${taskId}`); + oldShep.stopped = true; this.iterCond.trigger(); } } @@ -377,7 +387,7 @@ export class TaskSchedulerImpl implements TaskScheduler { }; } if (info.cts.token.isCancelled) { - logger.trace("task cancelled, not processing result"); + logger.trace(`task ${taskId} cancelled, not processing result`); return; } if (this.ws.stopped) { @@ -390,11 +400,9 @@ export class TaskSchedulerImpl implements TaskScheduler { }); switch (res.type) { case TaskRunResultType.Error: { - // if (logger.shouldLogTrace()) { logger.trace( `Shepherd for ${taskId} got error result: ${j2s(res.errorDetail)}`, ); - // } const retryRecord = await storePendingTaskError( this.ws, taskId, diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts index 42adf3585..dbd7e8673 100644 --- a/packages/taler-wallet-core/src/withdraw.ts +++ b/packages/taler-wallet-core/src/withdraw.ts @@ -3608,7 +3608,7 @@ export async function getWithdrawalDetailsForAmount( type: ObservabilityEventType.Message, contents: `Cancelling previous key ${clientCancelKey}`, }); - prevCts.cancel(); + prevCts.cancel(`getting details amount`); } else { wex.oc.observe({ type: ObservabilityEventType.Message, -- cgit v1.2.3