aboutsummaryrefslogtreecommitdiff
path: root/packages
diff options
context:
space:
mode:
authorSebastian <sebasjm@gmail.com>2024-06-06 15:49:36 -0300
committerSebastian <sebasjm@gmail.com>2024-06-07 10:38:26 -0300
commit2a37575cf6db006431a7d174b85203ae41cc629f (patch)
tree7cac742f656c3df99e5d7ba22f4d6931ed7fb719 /packages
parent1e22b5bcf77b3391f86105727e8c0b413b652fd4 (diff)
downloadwallet-core-2a37575cf6db006431a7d174b85203ae41cc629f.tar.xz
wait for previous task to be cancelled
Diffstat (limited to 'packages')
-rw-r--r--packages/taler-util/src/CancellationToken.ts2
-rw-r--r--packages/taler-wallet-core/src/common.ts3
-rw-r--r--packages/taler-wallet-core/src/exchanges.ts2
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts28
-rw-r--r--packages/taler-wallet-core/src/withdraw.ts2
5 files changed, 23 insertions, 14 deletions
diff --git a/packages/taler-util/src/CancellationToken.ts b/packages/taler-util/src/CancellationToken.ts
index 3aa576d77..5f38f0c7b 100644
--- a/packages/taler-util/src/CancellationToken.ts
+++ b/packages/taler-util/src/CancellationToken.ts
@@ -172,7 +172,7 @@ class CancellationToken {
} = CancellationToken.create();
let timer: NodeJS.Timeout | null;
- timer = setTimeout(() => originalCancel(CancellationToken.timeout), ms);
+ timer = setTimeout(() => originalCancel(`CancellationToken.timeout ${ms}`), ms);
const disposeTimer = () => {
if (timer == null) return;
clearTimeout(timer);
diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts
index 00d462d6f..a9e962dda 100644
--- a/packages/taler-wallet-core/src/common.ts
+++ b/packages/taler-wallet-core/src/common.ts
@@ -41,6 +41,7 @@ import {
checkDbInvariant,
checkLogicInvariant,
durationMul,
+ j2s,
} from "@gnu-taler/taler-util";
import {
BackupProviderRecord,
@@ -798,7 +799,7 @@ export async function genericWaitForState(
flag.raise();
}
});
- const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => {
+ const unregisterOnCancelled = wex.cancellationToken.onCancelled((reason) => {
cancelNotif();
flag.raise();
});
diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts
index 05de3d8e5..626441d35 100644
--- a/packages/taler-wallet-core/src/exchanges.ts
+++ b/packages/taler-wallet-core/src/exchanges.ts
@@ -1363,13 +1363,13 @@ export async function updateExchangeFromUrlHandler(
);
refreshCheckNecessary = false;
}
-
if (!(updateNecessary || refreshCheckNecessary)) {
logger.trace("update not necessary, running again later");
return TaskRunResult.runAgainAt(
AbsoluteTime.min(nextUpdateStamp, nextRefreshCheckStamp),
);
}
+
}
// When doing the auto-refresh check, we always update
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index 050de479a..ee55e1da1 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -56,6 +56,7 @@ import {
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadOnlyTransaction,
timestampAbsoluteFromDb,
+ timestampPreciseToDb,
} from "./db.js";
import {
computeDepositTransactionStatus,
@@ -113,6 +114,8 @@ const logger = new Logger("shepherd.ts");
*/
interface ShepherdInfo {
cts: CancellationToken.Source;
+ latch?: Promise<void>;
+ stopped: boolean;
}
/**
@@ -258,27 +261,34 @@ export class TaskSchedulerImpl implements TaskScheduler {
const tasksIds = [...this.sheps.keys()];
logger.info(`reloading shepherd with ${tasksIds.length} tasks`);
for (const taskId of tasksIds) {
- this.stopShepherdTask(taskId);
+ await this.stopShepherdTask(taskId);
}
for (const taskId of tasksIds) {
this.startShepherdTask(taskId);
}
}
-
private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> {
logger.trace(`Starting to shepherd task ${taskId}`);
const oldShep = this.sheps.get(taskId);
if (oldShep) {
- logger.trace(`Already have a shepherd for ${taskId}`);
- return;
+ if (!oldShep.stopped) {
+ logger.trace(`Already have a shepherd for ${taskId}`);
+ return;
+ }
+ logger.trace(
+ `Waiting old task to complete the loop in cancel mode ${taskId}`,
+ );
+ await oldShep.latch;
}
logger.trace(`Creating new shepherd for ${taskId}`);
const newShep: ShepherdInfo = {
cts: CancellationToken.create(),
+ stopped: false,
};
this.sheps.set(taskId, newShep);
try {
- await this.internalShepherdTask(taskId, newShep);
+ newShep.latch = this.internalShepherdTask(taskId, newShep);
+ await newShep.latch;
} finally {
logger.trace(`Done shepherding ${taskId}`);
this.sheps.delete(taskId);
@@ -291,8 +301,8 @@ export class TaskSchedulerImpl implements TaskScheduler {
const oldShep = this.sheps.get(taskId);
if (oldShep) {
logger.trace(`Cancelling old shepherd for ${taskId}`);
- oldShep.cts.cancel();
- this.sheps.delete(taskId);
+ oldShep.cts.cancel(`stopping task ${taskId}`);
+ oldShep.stopped = true;
this.iterCond.trigger();
}
}
@@ -377,7 +387,7 @@ export class TaskSchedulerImpl implements TaskScheduler {
};
}
if (info.cts.token.isCancelled) {
- logger.trace("task cancelled, not processing result");
+ logger.trace(`task ${taskId} cancelled, not processing result`);
return;
}
if (this.ws.stopped) {
@@ -390,11 +400,9 @@ export class TaskSchedulerImpl implements TaskScheduler {
});
switch (res.type) {
case TaskRunResultType.Error: {
- // if (logger.shouldLogTrace()) {
logger.trace(
`Shepherd for ${taskId} got error result: ${j2s(res.errorDetail)}`,
);
- // }
const retryRecord = await storePendingTaskError(
this.ws,
taskId,
diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts
index 42adf3585..dbd7e8673 100644
--- a/packages/taler-wallet-core/src/withdraw.ts
+++ b/packages/taler-wallet-core/src/withdraw.ts
@@ -3608,7 +3608,7 @@ export async function getWithdrawalDetailsForAmount(
type: ObservabilityEventType.Message,
contents: `Cancelling previous key ${clientCancelKey}`,
});
- prevCts.cancel();
+ prevCts.cancel(`getting details amount`);
} else {
wex.oc.observe({
type: ObservabilityEventType.Message,