diff options
Diffstat (limited to 'packages/taler-wallet-core/src/shepherd.ts')
-rw-r--r-- | packages/taler-wallet-core/src/shepherd.ts | 168 |
1 files changed, 74 insertions, 94 deletions
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index 470f45aff..c52c55f50 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -53,6 +53,7 @@ import { OPERATION_STATUS_NONFINAL_FIRST, OPERATION_STATUS_NONFINAL_LAST, OperationRetryRecord, + ReserveRecordStatus, WalletDbAllStoresReadOnlyTransaction, WalletDbReadOnlyTransaction, timestampAbsoluteFromDb, @@ -64,6 +65,7 @@ import { } from "./deposits.js"; import { computeDenomLossTransactionStatus, + processExchangeKyc, updateExchangeFromUrlHandler, } from "./exchanges.js"; import { @@ -127,6 +129,7 @@ function taskGivesLiveness(taskId: string): boolean { switch (parsedTaskId.tag) { case PendingTaskType.Backup: case PendingTaskType.ExchangeUpdate: + case PendingTaskType.ExchangeWalletKyc: return false; case PendingTaskType.Deposit: case PendingTaskType.PeerPullCredit: @@ -145,6 +148,14 @@ function taskGivesLiveness(taskId: string): boolean { } export interface TaskScheduler { + /** + * Ensure that the task scheduler is running. + * + * If it is not running, start it, with previous + * tasks loaded from the database. + * + * Returns after the scheduler is running. + */ ensureRunning(): Promise<void>; startShepherdTask(taskId: TaskIdStr): void; stopShepherdTask(taskId: TaskIdStr): void; @@ -188,6 +199,9 @@ export class TaskSchedulerImpl implements TaskScheduler { } } + /** + * @see TaskScheduler.ensureRunning + */ async ensureRunning(): Promise<void> { if (this.isRunning) { return; @@ -261,12 +275,13 @@ export class TaskSchedulerImpl implements TaskScheduler { const tasksIds = [...this.sheps.keys()]; logger.info(`reloading shepherd with ${tasksIds.length} tasks`); for (const taskId of tasksIds) { - await this.stopShepherdTask(taskId); + this.stopShepherdTask(taskId); } for (const taskId of tasksIds) { this.startShepherdTask(taskId); } } + private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> { logger.trace(`Starting to shepherd task ${taskId}`); const oldShep = this.sheps.get(taskId); @@ -276,9 +291,10 @@ export class TaskSchedulerImpl implements TaskScheduler { return; } logger.trace( - `Waiting old task to complete the loop in cancel mode ${taskId}`, + `Waiting for old task to complete the loop in cancel mode ${taskId}`, ); await oldShep.latch; + logger.trace(`Old task ${taskId} completed in cancel mode`); } logger.trace(`Creating new shepherd for ${taskId}`); const newShep: ShepherdInfo = { @@ -380,10 +396,13 @@ export class TaskSchedulerImpl implements TaskScheduler { try { res = await callOperationHandlerForTaskId(wex, taskId); } catch (e) { - logger.trace(`Shepherd error ${taskId} saving response ${e}`); + const errorDetail = getErrorDetailFromException(e); + logger.trace( + `Shepherd error ${taskId} saving response ${j2s(errorDetail)}`, + ); res = { type: TaskRunResultType.Error, - errorDetail: getErrorDetailFromException(e), + errorDetail, }; } if (info.cts.token.isCancelled) { @@ -464,6 +483,14 @@ export class TaskSchedulerImpl implements TaskScheduler { } break; } + case TaskRunResultType.NetworkRequired: { + logger.trace(`Shepherd for ${taskId} got network-required result.`); + await storePendingTaskPending(this.ws, taskId); + const delay = Duration.getForever(); + logger.trace(`Not retrying task until network is restored.`); + await this.wait(taskId, info, delay); + break; + } default: assertUnreachable(res); } @@ -598,12 +625,17 @@ function getWalletExecutionContextForTask( }, }; - wex = getObservedWalletExecutionContext(ws, cancellationToken, oc); + wex = getObservedWalletExecutionContext( + ws, + cancellationToken, + undefined, + oc, + ); } else { oc = { observe(evt) {}, }; - wex = getNormalWalletExecutionContext(ws, cancellationToken, oc); + wex = getNormalWalletExecutionContext(ws, cancellationToken, undefined, oc); } return wex; } @@ -613,6 +645,15 @@ async function callOperationHandlerForTaskId( taskId: TaskIdStr, ): Promise<TaskRunResult> { const pending = parseTaskIdentifier(taskId); + + const txId = convertTaskToTransactionId(taskId); + if (txId) { + wex.oc.observe({ + type: ObservabilityEventType.DeclareConcernsTransaction, + transactionId: txId, + }); + } + switch (pending.tag) { case PendingTaskType.ExchangeUpdate: return await updateExchangeFromUrlHandler(wex, pending.exchangeBaseUrl); @@ -636,6 +677,8 @@ async function callOperationHandlerForTaskId( return await processPeerPullDebit(wex, pending.peerPullDebitId); case PendingTaskType.PeerPushCredit: return await processPeerPushCredit(wex, pending.peerPushCreditId); + case PendingTaskType.ExchangeWalletKyc: + return await processExchangeKyc(wex, pending.exchangeBaseUrl); case PendingTaskType.RewardPickup: throw Error("not supported anymore"); default: @@ -662,6 +705,7 @@ async function taskToRetryNotification( switch (parsedTaskId.tag) { case PendingTaskType.ExchangeUpdate: + case PendingTaskType.ExchangeWalletKyc: return makeExchangeRetryNotification(ws, tx, pendingTaskId, e); case PendingTaskType.PeerPullCredit: case PendingTaskType.PeerPullDebit: @@ -818,8 +862,12 @@ async function makeExchangeRetryNotification( ): Promise<WalletNotification | undefined> { logger.info("making exchange retry notification"); const parsedTaskId = parseTaskIdentifier(pendingTaskId); - if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) { - throw Error("invalid task identifier"); + switch (parsedTaskId.tag) { + case PendingTaskType.ExchangeUpdate: + case PendingTaskType.ExchangeWalletKyc: + break; + default: + throw Error("invalid task identifier"); } const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl); @@ -843,91 +891,6 @@ async function makeExchangeRetryNotification( return notif; } -export function listTaskForTransactionId(transactionId: string): TaskIdStr[] { - const tid = parseTransactionIdentifier(transactionId); - if (!tid) { - throw Error("invalid task ID"); - } - switch (tid.tag) { - case TransactionType.Deposit: - return [ - constructTaskIdentifier({ - tag: PendingTaskType.Deposit, - depositGroupId: tid.depositGroupId, - }), - ]; - case TransactionType.InternalWithdrawal: - return [ - constructTaskIdentifier({ - tag: PendingTaskType.Withdraw, - withdrawalGroupId: tid.withdrawalGroupId, - }), - ]; - case TransactionType.Payment: - return [ - constructTaskIdentifier({ - tag: PendingTaskType.Purchase, - proposalId: tid.proposalId, - }), - ]; - case TransactionType.PeerPullCredit: - return [ - constructTaskIdentifier({ - tag: PendingTaskType.PeerPullCredit, - pursePub: tid.pursePub, - }), - ]; - case TransactionType.PeerPullDebit: - return [ - constructTaskIdentifier({ - tag: PendingTaskType.PeerPullDebit, - peerPullDebitId: tid.peerPullDebitId, - }), - ]; - case TransactionType.PeerPushCredit: - return [ - constructTaskIdentifier({ - tag: PendingTaskType.PeerPullCredit, - pursePub: tid.peerPushCreditId, - }), - ]; - case TransactionType.PeerPushDebit: - return [ - constructTaskIdentifier({ - tag: PendingTaskType.PeerPullCredit, - pursePub: tid.pursePub, - }), - ]; - case TransactionType.Recoup: - return [ - constructTaskIdentifier({ - tag: PendingTaskType.Recoup, - recoupGroupId: tid.recoupGroupId, - }), - ]; - case TransactionType.Refresh: - return [ - constructTaskIdentifier({ - tag: PendingTaskType.Refresh, - refreshGroupId: tid.refreshGroupId, - }), - ]; - case TransactionType.Refund: - return []; - case TransactionType.Withdrawal: - return [ - constructTaskIdentifier({ - tag: PendingTaskType.Withdraw, - withdrawalGroupId: tid.withdrawalGroupId, - }), - ]; - case TransactionType.DenomLoss: - return []; - default: - assertUnreachable(tid); - } -} - /** * Convert the task ID for a task that processes a transaction int * the ID for the transaction. @@ -1008,6 +971,7 @@ export async function getActiveTaskIds( "peerPushDebit", "peerPullDebit", "peerPushCredit", + "reserves", ], }, async (tx) => { @@ -1141,7 +1105,7 @@ export async function getActiveTaskIds( } } - // exchange update + // exchange update and KYC { const exchanges = await tx.exchanges.getAll(); @@ -1151,6 +1115,22 @@ export async function getActiveTaskIds( exchangeBaseUrl: rec.baseUrl, }); res.taskIds.push(taskIdUpdate); + + const reserveId = rec.currentMergeReserveRowId; + if (reserveId == null) { + continue; + } + const reserveRec = await tx.reserves.get(reserveId); + if ( + reserveRec?.status != null && + reserveRec.status != ReserveRecordStatus.Done + ) { + const taskIdKyc = constructTaskIdentifier({ + tag: PendingTaskType.ExchangeWalletKyc, + exchangeBaseUrl: rec.baseUrl, + }); + res.taskIds.push(taskIdKyc); + } } } |