aboutsummaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/shepherd.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts28
1 files changed, 18 insertions, 10 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index 050de479a..ee55e1da1 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -56,6 +56,7 @@ import {
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadOnlyTransaction,
timestampAbsoluteFromDb,
+ timestampPreciseToDb,
} from "./db.js";
import {
computeDepositTransactionStatus,
@@ -113,6 +114,8 @@ const logger = new Logger("shepherd.ts");
*/
interface ShepherdInfo {
cts: CancellationToken.Source;
+ latch?: Promise<void>;
+ stopped: boolean;
}
/**
@@ -258,27 +261,34 @@ export class TaskSchedulerImpl implements TaskScheduler {
const tasksIds = [...this.sheps.keys()];
logger.info(`reloading shepherd with ${tasksIds.length} tasks`);
for (const taskId of tasksIds) {
- this.stopShepherdTask(taskId);
+ await this.stopShepherdTask(taskId);
}
for (const taskId of tasksIds) {
this.startShepherdTask(taskId);
}
}
-
private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> {
logger.trace(`Starting to shepherd task ${taskId}`);
const oldShep = this.sheps.get(taskId);
if (oldShep) {
- logger.trace(`Already have a shepherd for ${taskId}`);
- return;
+ if (!oldShep.stopped) {
+ logger.trace(`Already have a shepherd for ${taskId}`);
+ return;
+ }
+ logger.trace(
+ `Waiting old task to complete the loop in cancel mode ${taskId}`,
+ );
+ await oldShep.latch;
}
logger.trace(`Creating new shepherd for ${taskId}`);
const newShep: ShepherdInfo = {
cts: CancellationToken.create(),
+ stopped: false,
};
this.sheps.set(taskId, newShep);
try {
- await this.internalShepherdTask(taskId, newShep);
+ newShep.latch = this.internalShepherdTask(taskId, newShep);
+ await newShep.latch;
} finally {
logger.trace(`Done shepherding ${taskId}`);
this.sheps.delete(taskId);
@@ -291,8 +301,8 @@ export class TaskSchedulerImpl implements TaskScheduler {
const oldShep = this.sheps.get(taskId);
if (oldShep) {
logger.trace(`Cancelling old shepherd for ${taskId}`);
- oldShep.cts.cancel();
- this.sheps.delete(taskId);
+ oldShep.cts.cancel(`stopping task ${taskId}`);
+ oldShep.stopped = true;
this.iterCond.trigger();
}
}
@@ -377,7 +387,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
};
}
if (info.cts.token.isCancelled) {
- logger.trace("task cancelled, not processing result");
+ logger.trace(`task ${taskId} cancelled, not processing result`);
return;
}
if (this.ws.stopped) {
@@ -390,11 +400,9 @@ export class TaskSchedulerImpl implements TaskScheduler {
});
switch (res.type) {
case TaskRunResultType.Error: {
- // if (logger.shouldLogTrace()) {
logger.trace(
`Shepherd for ${taskId} got error result: ${j2s(res.errorDetail)}`,
);
- // }
const retryRecord = await storePendingTaskError(
this.ws,
taskId,