diff options
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r-- | packages/taler-wallet-core/src/shepherd.ts | 28 |
1 files changed, 18 insertions, 10 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index 050de479a..ee55e1da1 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -56,6 +56,7 @@ import { WalletDbAllStoresReadOnlyTransaction, WalletDbReadOnlyTransaction, timestampAbsoluteFromDb, + timestampPreciseToDb, } from "./db.js"; import { computeDepositTransactionStatus, @@ -113,6 +114,8 @@ const logger = new Logger("shepherd.ts"); */ interface ShepherdInfo { cts: CancellationToken.Source; + latch?: Promise<void>; + stopped: boolean; } /** @@ -258,27 +261,34 @@ export class TaskSchedulerImpl implements TaskScheduler { const tasksIds = [...this.sheps.keys()]; logger.info(`reloading shepherd with ${tasksIds.length} tasks`); for (const taskId of tasksIds) { - this.stopShepherdTask(taskId); + await this.stopShepherdTask(taskId); } for (const taskId of tasksIds) { this.startShepherdTask(taskId); } } - private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> { logger.trace(`Starting to shepherd task ${taskId}`); const oldShep = this.sheps.get(taskId); if (oldShep) { - logger.trace(`Already have a shepherd for ${taskId}`); - return; + if (!oldShep.stopped) { + logger.trace(`Already have a shepherd for ${taskId}`); + return; + } + logger.trace( + `Waiting old task to complete the loop in cancel mode ${taskId}`, + ); + await oldShep.latch; } logger.trace(`Creating new shepherd for ${taskId}`); const newShep: ShepherdInfo = { cts: CancellationToken.create(), + stopped: false, }; this.sheps.set(taskId, newShep); try { - await this.internalShepherdTask(taskId, newShep); + newShep.latch = this.internalShepherdTask(taskId, newShep); + await newShep.latch; } finally { logger.trace(`Done shepherding ${taskId}`); this.sheps.delete(taskId); @@ -291,8 +301,8 @@ export class TaskSchedulerImpl implements TaskScheduler { const oldShep = this.sheps.get(taskId); if (oldShep) { logger.trace(`Cancelling old shepherd for ${taskId}`); - oldShep.cts.cancel(); - this.sheps.delete(taskId); + oldShep.cts.cancel(`stopping task ${taskId}`); + oldShep.stopped = true; this.iterCond.trigger(); } } @@ -377,7 +387,7 @@ export class TaskSchedulerImpl implements TaskScheduler { }; } if (info.cts.token.isCancelled) { - logger.trace("task cancelled, not processing result"); + logger.trace(`task ${taskId} cancelled, not processing result`); return; } if (this.ws.stopped) { @@ -390,11 +400,9 @@ export class TaskSchedulerImpl implements TaskScheduler { }); switch (res.type) { case TaskRunResultType.Error: { - // if (logger.shouldLogTrace()) { logger.trace( `Shepherd for ${taskId} got error result: ${j2s(res.errorDetail)}`, ); - // } const retryRecord = await storePendingTaskError( this.ws, taskId, |