aboutsummaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/shepherd.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r--packages/taler-wallet-core/src/shepherd.ts168
1 files changed, 74 insertions, 94 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index 470f45aff..c52c55f50 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -53,6 +53,7 @@ import {
OPERATION_STATUS_NONFINAL_FIRST,
OPERATION_STATUS_NONFINAL_LAST,
OperationRetryRecord,
+ ReserveRecordStatus,
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadOnlyTransaction,
timestampAbsoluteFromDb,
@@ -64,6 +65,7 @@ import {
} from "./deposits.js";
import {
computeDenomLossTransactionStatus,
+ processExchangeKyc,
updateExchangeFromUrlHandler,
} from "./exchanges.js";
import {
@@ -127,6 +129,7 @@ function taskGivesLiveness(taskId: string): boolean {
switch (parsedTaskId.tag) {
case PendingTaskType.Backup:
case PendingTaskType.ExchangeUpdate:
+ case PendingTaskType.ExchangeWalletKyc:
return false;
case PendingTaskType.Deposit:
case PendingTaskType.PeerPullCredit:
@@ -145,6 +148,14 @@ function taskGivesLiveness(taskId: string): boolean {
}
export interface TaskScheduler {
+ /**
+ * Ensure that the task scheduler is running.
+ *
+ * If it is not running, start it, with previous
+ * tasks loaded from the database.
+ *
+ * Returns after the scheduler is running.
+ */
ensureRunning(): Promise<void>;
startShepherdTask(taskId: TaskIdStr): void;
stopShepherdTask(taskId: TaskIdStr): void;
@@ -188,6 +199,9 @@ export class TaskSchedulerImpl implements TaskScheduler {
}
}
+ /**
+ * @see TaskScheduler.ensureRunning
+ */
async ensureRunning(): Promise<void> {
if (this.isRunning) {
return;
@@ -261,12 +275,13 @@ export class TaskSchedulerImpl implements TaskScheduler {
const tasksIds = [...this.sheps.keys()];
logger.info(`reloading shepherd with ${tasksIds.length} tasks`);
for (const taskId of tasksIds) {
- await this.stopShepherdTask(taskId);
+ this.stopShepherdTask(taskId);
}
for (const taskId of tasksIds) {
this.startShepherdTask(taskId);
}
}
+
private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> {
logger.trace(`Starting to shepherd task ${taskId}`);
const oldShep = this.sheps.get(taskId);
@@ -276,9 +291,10 @@ export class TaskSchedulerImpl implements TaskScheduler {
return;
}
logger.trace(
- `Waiting old task to complete the loop in cancel mode ${taskId}`,
+ `Waiting for old task to complete the loop in cancel mode ${taskId}`,
);
await oldShep.latch;
+ logger.trace(`Old task ${taskId} completed in cancel mode`);
}
logger.trace(`Creating new shepherd for ${taskId}`);
const newShep: ShepherdInfo = {
@@ -380,10 +396,13 @@ export class TaskSchedulerImpl implements TaskScheduler {
try {
res = await callOperationHandlerForTaskId(wex, taskId);
} catch (e) {
- logger.trace(`Shepherd error ${taskId} saving response ${e}`);
+ const errorDetail = getErrorDetailFromException(e);
+ logger.trace(
+ `Shepherd error ${taskId} saving response ${j2s(errorDetail)}`,
+ );
res = {
type: TaskRunResultType.Error,
- errorDetail: getErrorDetailFromException(e),
+ errorDetail,
};
}
if (info.cts.token.isCancelled) {
@@ -464,6 +483,14 @@ export class TaskSchedulerImpl implements TaskScheduler {
}
break;
}
+ case TaskRunResultType.NetworkRequired: {
+ logger.trace(`Shepherd for ${taskId} got network-required result.`);
+ await storePendingTaskPending(this.ws, taskId);
+ const delay = Duration.getForever();
+ logger.trace(`Not retrying task until network is restored.`);
+ await this.wait(taskId, info, delay);
+ break;
+ }
default:
assertUnreachable(res);
}
@@ -598,12 +625,17 @@ function getWalletExecutionContextForTask(
},
};
- wex = getObservedWalletExecutionContext(ws, cancellationToken, oc);
+ wex = getObservedWalletExecutionContext(
+ ws,
+ cancellationToken,
+ undefined,
+ oc,
+ );
} else {
oc = {
observe(evt) {},
};
- wex = getNormalWalletExecutionContext(ws, cancellationToken, oc);
+ wex = getNormalWalletExecutionContext(ws, cancellationToken, undefined, oc);
}
return wex;
}
@@ -613,6 +645,15 @@ async function callOperationHandlerForTaskId(
taskId: TaskIdStr,
): Promise<TaskRunResult> {
const pending = parseTaskIdentifier(taskId);
+
+ const txId = convertTaskToTransactionId(taskId);
+ if (txId) {
+ wex.oc.observe({
+ type: ObservabilityEventType.DeclareConcernsTransaction,
+ transactionId: txId,
+ });
+ }
+
switch (pending.tag) {
case PendingTaskType.ExchangeUpdate:
return await updateExchangeFromUrlHandler(wex, pending.exchangeBaseUrl);
@@ -636,6 +677,8 @@ async function callOperationHandlerForTaskId(
return await processPeerPullDebit(wex, pending.peerPullDebitId);
case PendingTaskType.PeerPushCredit:
return await processPeerPushCredit(wex, pending.peerPushCreditId);
+ case PendingTaskType.ExchangeWalletKyc:
+ return await processExchangeKyc(wex, pending.exchangeBaseUrl);
case PendingTaskType.RewardPickup:
throw Error("not supported anymore");
default:
@@ -662,6 +705,7 @@ async function taskToRetryNotification(
switch (parsedTaskId.tag) {
case PendingTaskType.ExchangeUpdate:
+ case PendingTaskType.ExchangeWalletKyc:
return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
case PendingTaskType.PeerPullCredit:
case PendingTaskType.PeerPullDebit:
@@ -818,8 +862,12 @@ async function makeExchangeRetryNotification(
): Promise<WalletNotification | undefined> {
logger.info("making exchange retry notification");
const parsedTaskId = parseTaskIdentifier(pendingTaskId);
- if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) {
- throw Error("invalid task identifier");
+ switch (parsedTaskId.tag) {
+ case PendingTaskType.ExchangeUpdate:
+ case PendingTaskType.ExchangeWalletKyc:
+ break;
+ default:
+ throw Error("invalid task identifier");
}
const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
@@ -843,91 +891,6 @@ async function makeExchangeRetryNotification(
return notif;
}
-export function listTaskForTransactionId(transactionId: string): TaskIdStr[] {
- const tid = parseTransactionIdentifier(transactionId);
- if (!tid) {
- throw Error("invalid task ID");
- }
- switch (tid.tag) {
- case TransactionType.Deposit:
- return [
- constructTaskIdentifier({
- tag: PendingTaskType.Deposit,
- depositGroupId: tid.depositGroupId,
- }),
- ];
- case TransactionType.InternalWithdrawal:
- return [
- constructTaskIdentifier({
- tag: PendingTaskType.Withdraw,
- withdrawalGroupId: tid.withdrawalGroupId,
- }),
- ];
- case TransactionType.Payment:
- return [
- constructTaskIdentifier({
- tag: PendingTaskType.Purchase,
- proposalId: tid.proposalId,
- }),
- ];
- case TransactionType.PeerPullCredit:
- return [
- constructTaskIdentifier({
- tag: PendingTaskType.PeerPullCredit,
- pursePub: tid.pursePub,
- }),
- ];
- case TransactionType.PeerPullDebit:
- return [
- constructTaskIdentifier({
- tag: PendingTaskType.PeerPullDebit,
- peerPullDebitId: tid.peerPullDebitId,
- }),
- ];
- case TransactionType.PeerPushCredit:
- return [
- constructTaskIdentifier({
- tag: PendingTaskType.PeerPullCredit,
- pursePub: tid.peerPushCreditId,
- }),
- ];
- case TransactionType.PeerPushDebit:
- return [
- constructTaskIdentifier({
- tag: PendingTaskType.PeerPullCredit,
- pursePub: tid.pursePub,
- }),
- ];
- case TransactionType.Recoup:
- return [
- constructTaskIdentifier({
- tag: PendingTaskType.Recoup,
- recoupGroupId: tid.recoupGroupId,
- }),
- ];
- case TransactionType.Refresh:
- return [
- constructTaskIdentifier({
- tag: PendingTaskType.Refresh,
- refreshGroupId: tid.refreshGroupId,
- }),
- ];
- case TransactionType.Refund:
- return [];
- case TransactionType.Withdrawal:
- return [
- constructTaskIdentifier({
- tag: PendingTaskType.Withdraw,
- withdrawalGroupId: tid.withdrawalGroupId,
- }),
- ];
- case TransactionType.DenomLoss:
- return [];
- default:
- assertUnreachable(tid);
- }
-}
-
/**
* Convert the task ID for a task that processes a transaction int
* the ID for the transaction.
@@ -1008,6 +971,7 @@ export async function getActiveTaskIds(
"peerPushDebit",
"peerPullDebit",
"peerPushCredit",
+ "reserves",
],
},
async (tx) => {
@@ -1141,7 +1105,7 @@ export async function getActiveTaskIds(
}
}
- // exchange update
+ // exchange update and KYC
{
const exchanges = await tx.exchanges.getAll();
@@ -1151,6 +1115,22 @@ export async function getActiveTaskIds(
exchangeBaseUrl: rec.baseUrl,
});
res.taskIds.push(taskIdUpdate);
+
+ const reserveId = rec.currentMergeReserveRowId;
+ if (reserveId == null) {
+ continue;
+ }
+ const reserveRec = await tx.reserves.get(reserveId);
+ if (
+ reserveRec?.status != null &&
+ reserveRec.status != ReserveRecordStatus.Done
+ ) {
+ const taskIdKyc = constructTaskIdentifier({
+ tag: PendingTaskType.ExchangeWalletKyc,
+ exchangeBaseUrl: rec.baseUrl,
+ });
+ res.taskIds.push(taskIdKyc);
+ }
}
}