/*
 This file is part of GNU Taler
 (C) 2024 Taler Systems SA

 GNU Taler is free software; you can redistribute it and/or modify it under the
 terms of the GNU General Public License as published by the Free Software
 Foundation; either version 3, or (at your option) any later version.

 GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
 A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

 You should have received a copy of the GNU General Public License along with
 GNU Taler; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */

/**
 * Imports.
 */
import { GlobalIDB } from "@gnu-taler/idb-bridge";
import {
  AbsoluteTime,
  AsyncCondition,
  CancellationToken,
  Duration,
  Logger,
  NotificationType,
  ObservabilityContext,
  ObservabilityEventType,
  TalerErrorDetail,
  TaskThrottler,
  TransactionIdStr,
  TransactionState,
  TransactionType,
  WalletNotification,
  assertUnreachable,
  getErrorDetailFromException,
  j2s,
  safeStringifyException,
} from "@gnu-taler/taler-util";
import { processBackupForProvider } from "./backup/index.js";
import {
  DbRetryInfo,
  PendingTaskType,
  TaskIdStr,
  TaskRunResult,
  TaskRunResultType,
  constructTaskIdentifier,
  getExchangeState,
  parseTaskIdentifier,
} from "./common.js";
import {
  OPERATION_STATUS_NONFINAL_FIRST,
  OPERATION_STATUS_NONFINAL_LAST,
  OperationRetryRecord,
  ReserveRecordStatus,
  WalletDbAllStoresReadOnlyTransaction,
  WalletDbReadOnlyTransaction,
  timestampAbsoluteFromDb,
  timestampPreciseToDb,
} from "./db.js";
import {
  computeDepositTransactionStatus,
  processDepositGroup,
} from "./deposits.js";
import {
  computeDenomLossTransactionStatus,
  processExchangeKyc,
  updateExchangeFromUrlHandler,
} from "./exchanges.js";
import {
  computePayMerchantTransactionState,
  computeRefundTransactionState,
  processPurchase,
} from "./pay-merchant.js";
import {
  computePeerPullCreditTransactionState,
  processPeerPullCredit,
} from "./pay-peer-pull-credit.js";
import {
  computePeerPullDebitTransactionState,
  processPeerPullDebit,
} from "./pay-peer-pull-debit.js";
import {
  computePeerPushCreditTransactionState,
  processPeerPushCredit,
} from "./pay-peer-push-credit.js";
import {
  computePeerPushDebitTransactionState,
  processPeerPushDebit,
} from "./pay-peer-push-debit.js";
import { processRecoupGroup } from "./recoup.js";
import {
  computeRefreshTransactionState,
  processRefreshGroup,
} from "./refresh.js";
import {
  constructTransactionIdentifier,
  parseTransactionIdentifier,
} from "./transactions.js";
import {
  InternalWalletState,
  WalletExecutionContext,
  getNormalWalletExecutionContext,
  getObservedWalletExecutionContext,
} from "./wallet.js";
import {
  computeWithdrawalTransactionStatus,
  processWithdrawalGroup,
} from "./withdraw.js";

const logger = new Logger("shepherd.ts");

/**
 * Info about one task being shepherded.
 */
interface ShepherdInfo {
  /** Cancellation source; cancelled when the task is stopped. */
  cts: CancellationToken.Source;
  /** Promise of the running task loop; settles when that loop exits. */
  latch?: Promise<void>;
  /** Set once a stop was requested for this shepherd. */
  stopped: boolean;
}

/**
 * Check if a task is alive, i.e. whether it prevents
 * the main task loop from exiting.
 *
 * Backup, exchange-update and exchange-wallet-KYC tasks do not give
 * liveness; all other task types do.
 */
function taskGivesLiveness(taskId: string): boolean {
  const parsedTaskId = parseTaskIdentifier(taskId);
  switch (parsedTaskId.tag) {
    case PendingTaskType.Backup:
    case PendingTaskType.ExchangeUpdate:
    case PendingTaskType.ExchangeWalletKyc:
      return false;
    case PendingTaskType.Deposit:
    case PendingTaskType.PeerPullCredit:
    case PendingTaskType.PeerPullDebit:
    case PendingTaskType.PeerPushCredit:
    case PendingTaskType.Refresh:
    case PendingTaskType.Recoup:
    case PendingTaskType.RewardPickup:
    case PendingTaskType.Withdraw:
    case PendingTaskType.PeerPushDebit:
    case PendingTaskType.Purchase:
      return true;
    default:
      assertUnreachable(parsedTaskId);
  }
}

export interface TaskScheduler {
  /**
   * Ensure that the task scheduler is running.
   *
   * If it is not running, start it, with previous
   * tasks loaded from the database.
   *
   * Returns after the scheduler is running.
   */
  ensureRunning(): Promise<void>;
  /** Start shepherding the given task in the background. */
  startShepherdTask(taskId: TaskIdStr): void;
  /** Request cancellation of the shepherd for the given task, if any. */
  stopShepherdTask(taskId: TaskIdStr): void;
  /** Delete the stored retry record of the task and restart it. */
  resetTaskRetries(taskId: TaskIdStr): Promise<void>;
  /** Stop and re-start all currently shepherded tasks. */
  reload(): Promise<void>;
  /** IDs of all tasks that currently have a shepherd entry. */
  getActiveTasks(): TaskIdStr[];
  /** True when no shepherded task gives liveness. */
  isIdle(): boolean;
  /** Request cancellation of all shepherded tasks. */
  shutdown(): Promise<void>;
}

export class TaskSchedulerImpl implements TaskScheduler {
  /** One entry per task that is currently (or still) being shepherded. */
  private sheps: Map<TaskIdStr, ShepherdInfo> = new Map();

  /** Triggered whenever a shepherd stops or finishes, to wake up run(). */
  private iterCond = new AsyncCondition();

  private throttler = new TaskThrottler();

  isRunning: boolean = false;

  constructor(private ws: InternalWalletState) {}

  /** Start a shepherd for every task that the database reports as active. */
  private async loadTasksFromDb(): Promise<void> {
    const activeTasks = await getActiveTaskIds(this.ws);
    logger.info(`active tasks from DB: ${j2s(activeTasks)}`);
    for (const tid of activeTasks.taskIds) {
      this.startShepherdTask(tid);
    }
  }

  getActiveTasks(): TaskIdStr[] {
    return [...this.sheps.keys()];
  }

  /**
   * Request cancellation of all shepherds.
   *
   * Note that this only signals the tasks; it does not wait for
   * their loops to exit.
   */
  async shutdown(): Promise<void> {
    // Snapshot of the keys taken before any shepherd is stopped.
    const tasksIds = [...this.sheps.keys()];
    logger.info(`Stopping task shepherd.`);
    for (const taskId of tasksIds) {
      this.stopShepherdTask(taskId);
    }
  }

  /**
   * @see TaskScheduler.ensureRunning
   */
  async ensureRunning(): Promise<void> {
    if (this.isRunning) {
      return;
    }
    // Set the flag before loading, since loadTasksFromDb() re-enters
    // ensureRunning() via startShepherdTask().
    this.isRunning = true;
    try {
      await this.loadTasksFromDb();
    } catch (e) {
      this.isRunning = false;
      throw e;
    }
    // The main loop runs in the background; the flag is cleared when it ends.
    this.run()
      .catch((e) => {
        logger.error("error running task loop");
        logger.error(`err: ${e}`);
      })
      .then(() => {
        logger.trace("done running task loop");
        this.isRunning = false;
      });
  }

  isIdle(): boolean {
    // We're idle if no task is alive anymore.
    for (const taskId of this.sheps.keys()) {
      if (taskGivesLiveness(taskId)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Main loop: emits an Idle notification whenever no task gives
   * liveness, then sleeps until the next shepherd change.
   * Exits once the wallet is stopped.
   */
  private async run(): Promise<void> {
    logger.trace("Running task loop.");
    logger.trace(`sheps: ${this.sheps.size}`);
    while (true) {
      if (this.ws.stopped) {
        logger.trace("Breaking out of task loop (wallet stopped).");
        break;
      }
      if (this.isIdle()) {
        this.ws.notify({
          type: NotificationType.Idle,
        });
      }
      await this.iterCond.wait();
    }
    logger.trace("Done with task loop.");
  }

  startShepherdTask(taskId: TaskIdStr): void {
    this.ensureRunning().catch((e) => {
      logger.error(`error running scheduler: ${safeStringifyException(e)}`);
    });
    // Run in the background, no await!  Failures are logged so that
    // they do not surface as unhandled promise rejections.
    this.internalStartShepherdTask(taskId).catch((e) => {
      logger.error(
        `error shepherding task ${taskId}: ${safeStringifyException(e)}`,
      );
    });
  }

  /**
   * Stop and re-load all existing tasks.
   *
   * Mostly useful to interrupt all waits when time-travelling.
   */
  async reload(): Promise<void> {
    await this.ensureRunning();
    const tasksIds = [...this.sheps.keys()];
    logger.info(`reloading shepherd with ${tasksIds.length} tasks`);
    for (const taskId of tasksIds) {
      this.stopShepherdTask(taskId);
    }
    for (const taskId of tasksIds) {
      this.startShepherdTask(taskId);
    }
  }

  /**
   * Create a shepherd for the task and run it until its loop exits.
   *
   * If a stopped shepherd for the same task still exists, wait for it
   * to wind down first.  If a live shepherd exists, do nothing.
   */
  private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> {
    logger.trace(`Starting to shepherd task ${taskId}`);
    // The map is re-checked after every wait: several starts may be
    // waiting on the same stopped shepherd, and only the first one to
    // resume may create the replacement.  Otherwise two task loops
    // would run for the same task, one of them untracked.
    let oldShep = this.sheps.get(taskId);
    while (oldShep) {
      if (!oldShep.stopped) {
        logger.trace(`Already have a shepherd for ${taskId}`);
        return;
      }
      logger.trace(
        `Waiting for old task to complete the loop in cancel mode ${taskId}`,
      );
      await oldShep.latch;
      logger.trace(`Old task ${taskId} completed in cancel mode`);
      oldShep = this.sheps.get(taskId);
    }
    logger.trace(`Creating new shepherd for ${taskId}`);
    const newShep: ShepherdInfo = {
      cts: CancellationToken.create(),
      stopped: false,
    };
    this.sheps.set(taskId, newShep);
    try {
      newShep.latch = this.internalShepherdTask(taskId, newShep);
      await newShep.latch;
    } finally {
      logger.trace(`Done shepherding ${taskId}`);
      this.sheps.delete(taskId);
      this.iterCond.trigger();
    }
  }

  stopShepherdTask(taskId: TaskIdStr): void {
    logger.trace(`Stopping shepherding of ${taskId}`);
    const oldShep = this.sheps.get(taskId);
    if (oldShep) {
      logger.trace(`Cancelling old shepherd for ${taskId}`);
      oldShep.cts.cancel(`stopping task ${taskId}`);
      // The entry stays in the map until the task loop has actually
      // exited; "stopped" lets a later start wait for that.
      oldShep.stopped = true;
      this.iterCond.trigger();
    }
  }

  /** Stop the shepherd for the task (if any) and start a new one. */
  restartShepherdTask(taskId: TaskIdStr): void {
    this.stopShepherdTask(taskId);
    this.startShepherdTask(taskId);
  }

  async resetTaskRetries(taskId: TaskIdStr): Promise<void> {
    const maybeNotification = await this.ws.db.runAllStoresReadWriteTx(
      {},
      async (tx) => {
        logger.trace(`storing task [reset] for ${taskId}`);
        await tx.operationRetries.delete(taskId);
        return taskToRetryNotification(this.ws, tx, taskId, undefined);
      },
    );
    this.stopShepherdTask(taskId);
    if (maybeNotification) {
      this.ws.notify(maybeNotification);
    }
    this.startShepherdTask(taskId);
  }

  /**
   * Sleep for the given delay, returning early when the task's
   * cancellation token fires.  Never throws: interruptions are logged.
   */
  private async wait(
    taskId: TaskIdStr,
    info: ShepherdInfo,
    delay: Duration,
  ): Promise<void> {
    try {
      await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay));
    } catch (e) {
      if (e instanceof CancellationToken.CancellationError) {
        logger.info(
          `waiting for ${taskId} interrupted: ${e.message} ${j2s(e.reason)}`,
        );
      } else {
        logger.info(`waiting for ${taskId} interrupted: ${e}`);
      }
    }
  }

  /**
   * Wait (cancellably) until the next-retry time stored in the
   * given retry record.
   */
  private async waitForNextRetry(
    taskId: TaskIdStr,
    info: ShepherdInfo,
    retryRecord: OperationRetryRecord,
  ): Promise<void> {
    const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
    const delay = AbsoluteTime.remaining(t);
    logger.trace(`Waiting for ${delay.d_ms} ms`);
    await this.wait(taskId, info, delay);
  }

  /**
   * Task loop for a single task: repeatedly call the task's handler
   * and act on its result (store retry info, wait, re-run or finish)
   * until the task finishes, is cancelled, or the wallet stops.
   */
  private async internalShepherdTask(
    taskId: TaskIdStr,
    info: ShepherdInfo,
  ): Promise<void> {
    while (true) {
      if (this.ws.stopped) {
        logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`);
        return;
      }
      if (info.cts.token.isCancelled) {
        logger.trace(`Shepherd for ${taskId} got cancelled`);
        return;
      }
      const isThrottled = this.throttler.applyThrottle(taskId);
      if (isThrottled) {
        logger.warn(
          `task ${taskId} throttled, this is very likely a bug in wallet-core, please report`,
        );
        logger.warn("waiting for 60 seconds");
        // Cancellable wait, so that stopping the task is not delayed
        // by the throttle penalty.
        await this.wait(taskId, info, Duration.fromSpec({ seconds: 60 }));
        if (this.ws.stopped || info.cts.token.isCancelled) {
          logger.trace(`Shepherd for ${taskId} stopped while throttled`);
          return;
        }
      }
      const wex = getWalletExecutionContextForTask(
        this.ws,
        taskId,
        info.cts.token,
      );
      // Used below to detect long-polls that return too early.
      const startTime = AbsoluteTime.now();
      logger.trace(`Shepherd for ${taskId} will call handler`);
      let res: TaskRunResult;
      try {
        res = await callOperationHandlerForTaskId(wex, taskId);
      } catch (e) {
        // An exception from the handler is treated like an error result.
        const errorDetail = getErrorDetailFromException(e);
        logger.trace(
          `Shepherd error ${taskId} saving response ${j2s(errorDetail)}`,
        );
        res = {
          type: TaskRunResultType.Error,
          errorDetail,
        };
      }
      if (info.cts.token.isCancelled) {
        logger.trace(`task ${taskId} cancelled, not processing result`);
        return;
      }
      if (this.ws.stopped) {
        logger.trace("wallet stopped, not processing result");
        return;
      }
      wex.oc.observe({
        type: ObservabilityEventType.ShepherdTaskResult,
        resultType: res.type,
      });
      switch (res.type) {
        case TaskRunResultType.Error: {
          logger.trace(
            `Shepherd for ${taskId} got error result: ${j2s(res.errorDetail)}`,
          );
          const retryRecord = await storePendingTaskError(
            this.ws,
            taskId,
            res.errorDetail,
          );
          await this.waitForNextRetry(taskId, info, retryRecord);
          break;
        }
        case TaskRunResultType.Backoff: {
          logger.trace(`Shepherd for ${taskId} got backoff result.`);
          const retryRecord = await storePendingTaskPending(this.ws, taskId);
          await this.waitForNextRetry(taskId, info, retryRecord);
          break;
        }
        case TaskRunResultType.Progress: {
          logger.trace(
            `Shepherd for ${taskId} got progress result, re-running immediately.`,
          );
          await storeTaskProgress(this.ws, taskId);
          break;
        }
        case TaskRunResultType.ScheduleLater: {
          logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
          const retryRecord = await storePendingTaskPending(
            this.ws,
            taskId,
            res.runAt,
          );
          await this.waitForNextRetry(taskId, info, retryRecord);
          break;
        }
        case TaskRunResultType.Finished:
          logger.trace(`Shepherd for ${taskId} got finished result.`);
          await storePendingTaskFinished(this.ws, taskId);
          return;
        case TaskRunResultType.LongpollReturnedPending: {
          await storeTaskProgress(this.ws, taskId);
          // Make sure that we are waiting a bit if long-polling returned too early.
          const endTime = AbsoluteTime.now();
          const taskDuration = AbsoluteTime.difference(endTime, startTime);
          if (
            Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0
          ) {
            logger.info(
              `long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`,
            );
            await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 }));
          } else {
            logger.info(`task ${taskId} will long-poll again`);
          }
          break;
        }
        case TaskRunResultType.NetworkRequired: {
          logger.trace(`Shepherd for ${taskId} got network-required result.`);
          await storePendingTaskPending(this.ws, taskId);
          // Wait "forever"; only a cancellation (e.g. via reload or
          // restart of the task) ends this wait.
          const delay = Duration.getForever();
          logger.trace(`Not retrying task until network is restored.`);
          await this.wait(taskId, info, delay);
          break;
        }
        default:
          assertUnreachable(res);
      }
    }
  }
}

/**
 * Record an error for the task: create its retry record or bump the
 * existing one, remember the error and emit a retry notification
 * where applicable.
 *
 * @returns the stored retry record
 */
async function storePendingTaskError(
  ws: InternalWalletState,
  pendingTaskId: string,
  e: TalerErrorDetail,
): Promise<OperationRetryRecord> {
  logger.trace(`storing task [pending] with ERROR for ${pendingTaskId}`);
  const res = await ws.db.runAllStoresReadWriteTx({}, async (tx) => {
    let retryRecord = await tx.operationRetries.get(pendingTaskId);
    if (!retryRecord) {
      retryRecord = {
        id: pendingTaskId,
        lastError: e,
        retryInfo: DbRetryInfo.reset(),
      };
    } else {
      retryRecord.lastError = e;
      retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
    }
    await tx.operationRetries.put(retryRecord);
    return {
      notification: await taskToRetryNotification(ws, tx, pendingTaskId, e),
      retryRecord,
    };
  });
  // Notify only after the transaction has completed.
  if (res.notification) {
    ws.notify(res.notification);
  }
  return res.retryRecord;
}

/**
 * Task made progress, clear error.
*/
async function storeTaskProgress(
  ws: InternalWalletState,
  pendingTaskId: string,
): Promise<void> {
  logger.trace(`storing task [progress] for ${pendingTaskId}`);
  await ws.db.runReadWriteTx(
    { storeNames: ["operationRetries"] },
    async (tx) => {
      await tx.operationRetries.delete(pendingTaskId);
    },
  );
}

/**
 * Mark the task as pending without an error.
 *
 * Creates the retry record if missing; otherwise clears a previously
 * stored error and bumps the retry info.  When a schedule time is
 * given, it overrides the computed next-retry time.  A notification is
 * emitted only if a previous error was cleared.
 *
 * @returns the stored retry record
 */
async function storePendingTaskPending(
  ws: InternalWalletState,
  pendingTaskId: string,
  schedTime?: AbsoluteTime,
): Promise<OperationRetryRecord> {
  logger.trace(`storing task [pending] for ${pendingTaskId}`);
  const res = await ws.db.runAllStoresReadWriteTx({}, async (tx) => {
    let retryRecord = await tx.operationRetries.get(pendingTaskId);
    let hadError = false;
    if (!retryRecord) {
      retryRecord = {
        id: pendingTaskId,
        retryInfo: DbRetryInfo.reset(),
      };
    } else {
      if (retryRecord.lastError) {
        hadError = true;
      }
      delete retryRecord.lastError;
      retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
    }
    if (schedTime) {
      retryRecord.retryInfo.nextRetry = timestampPreciseToDb(
        AbsoluteTime.toPreciseTimestamp(schedTime),
      );
    }
    await tx.operationRetries.put(retryRecord);
    let notification: WalletNotification | undefined = undefined;
    if (hadError) {
      // Tell clients that the previously reported error is gone.
      notification = await taskToRetryNotification(
        ws,
        tx,
        pendingTaskId,
        undefined,
      );
    }
    return {
      notification,
      retryRecord,
    };
  });
  if (res.notification) {
    ws.notify(res.notification);
  }
  return res.retryRecord;
}

/**
 * Task is finished, remove its retry record.
 */
async function storePendingTaskFinished(
  ws: InternalWalletState,
  pendingTaskId: string,
): Promise<void> {
  logger.trace(`storing task [finished] for ${pendingTaskId}`);
  await ws.db.runReadWriteTx(
    { storeNames: ["operationRetries"] },
    async (tx) => {
      await tx.operationRetries.delete(pendingTaskId);
    },
  );
}

/**
 * Build the execution context that a task handler runs in.
 *
 * When the testing option for observability events is enabled, the
 * context forwards each observed event as a TaskObservabilityEvent
 * notification; otherwise observed events are dropped.
 */
function getWalletExecutionContextForTask(
  ws: InternalWalletState,
  taskId: TaskIdStr,
  cancellationToken: CancellationToken,
): WalletExecutionContext {
  if (ws.config.testing.emitObservabilityEvents) {
    const oc: ObservabilityContext = {
      observe(evt) {
        // Checked again for every event, as the flag is read from the
        // live configuration.
        if (ws.config.testing.emitObservabilityEvents) {
          ws.notify({
            type: NotificationType.TaskObservabilityEvent,
            taskId,
            event: evt,
          });
        }
      },
    };
    return getObservedWalletExecutionContext(
      ws,
      cancellationToken,
      undefined,
      oc,
    );
  }
  const oc: ObservabilityContext = {
    observe(evt) {},
  };
  return getNormalWalletExecutionContext(ws, cancellationToken, undefined, oc);
}

/**
 * Dispatch to the handler that processes the given task.
 *
 * If the task belongs to a transaction, this is first declared to the
 * observability context.
 *
 * @throws Error for reward-pickup tasks, which are not supported anymore
 */
async function callOperationHandlerForTaskId(
  wex: WalletExecutionContext,
  taskId: TaskIdStr,
): Promise<TaskRunResult> {
  const pending = parseTaskIdentifier(taskId);
  const txId = convertTaskToTransactionId(taskId);
  if (txId) {
    wex.oc.observe({
      type: ObservabilityEventType.DeclareConcernsTransaction,
      transactionId: txId,
    });
  }
  switch (pending.tag) {
    case PendingTaskType.ExchangeUpdate:
      return await updateExchangeFromUrlHandler(wex, pending.exchangeBaseUrl);
    case PendingTaskType.Refresh:
      return await processRefreshGroup(wex, pending.refreshGroupId);
    case PendingTaskType.Withdraw:
      return await processWithdrawalGroup(wex, pending.withdrawalGroupId);
    case PendingTaskType.Purchase:
      return await processPurchase(wex, pending.proposalId);
    case PendingTaskType.Recoup:
      return await processRecoupGroup(wex, pending.recoupGroupId);
    case PendingTaskType.Deposit:
      return await processDepositGroup(wex, pending.depositGroupId);
    case PendingTaskType.Backup:
      return await processBackupForProvider(wex, pending.backupProviderBaseUrl);
    case PendingTaskType.PeerPushDebit:
      return await processPeerPushDebit(wex, pending.pursePub);
    case PendingTaskType.PeerPullCredit:
      return await processPeerPullCredit(wex, pending.pursePub);
    case PendingTaskType.PeerPullDebit:
      return await processPeerPullDebit(wex, pending.peerPullDebitId);
    case PendingTaskType.PeerPushCredit:
      return await processPeerPushCredit(wex, pending.peerPushCreditId);
    case PendingTaskType.ExchangeWalletKyc:
      return await processExchangeKyc(wex, pending.exchangeBaseUrl);
    case PendingTaskType.RewardPickup:
      throw Error("not supported anymore");
    default:
      // Every branch above returns or throws, so nothing can follow
      // the switch.
      return assertUnreachable(pending);
  }
}

/**
 * Generate an appropriate error
* transition notification
 * for applicable tasks.
 *
 * Namely, transition notifications are generated for:
 * - exchange update errors
 * - transactions
 *
 * Backup and recoup tasks never produce a notification.
 */
async function taskToRetryNotification(
  ws: InternalWalletState,
  tx: WalletDbAllStoresReadOnlyTransaction,
  pendingTaskId: string,
  e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
  const parsedTaskId = parseTaskIdentifier(pendingTaskId);
  switch (parsedTaskId.tag) {
    case PendingTaskType.ExchangeUpdate:
    case PendingTaskType.ExchangeWalletKyc:
      return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
    case PendingTaskType.PeerPullCredit:
    case PendingTaskType.PeerPullDebit:
    case PendingTaskType.Withdraw:
    case PendingTaskType.PeerPushCredit:
    case PendingTaskType.Deposit:
    case PendingTaskType.Refresh:
    case PendingTaskType.RewardPickup:
    case PendingTaskType.PeerPushDebit:
    case PendingTaskType.Purchase:
      return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
    case PendingTaskType.Backup:
    case PendingTaskType.Recoup:
      return undefined;
    default:
      // Forces a compile error here when a new task type is added.
      assertUnreachable(parsedTaskId);
  }
}

/**
 * Look up the record behind a transaction ID and compute the
 * transaction's current state from it.
 *
 * @returns the state, or undefined if no record exists for the ID
 * @throws Error if the ID cannot be parsed, or for recoup
 *   transactions, which are not supported here yet
 */
async function getTransactionState(
  ws: InternalWalletState,
  tx: WalletDbReadOnlyTransaction<
    [
      "depositGroups",
      "withdrawalGroups",
      "purchases",
      "refundGroups",
      "peerPullCredit",
      "peerPullDebit",
      "peerPushDebit",
      "peerPushCredit",
      "rewards",
      "refreshGroups",
      "denomLossEvents",
    ]
  >,
  transactionId: string,
): Promise<TransactionState | undefined> {
  const parsedTxId = parseTransactionIdentifier(transactionId);
  if (!parsedTxId) {
    throw Error("invalid tx identifier");
  }
  switch (parsedTxId.tag) {
    case TransactionType.Deposit: {
      const rec = await tx.depositGroups.get(parsedTxId.depositGroupId);
      if (!rec) {
        return undefined;
      }
      return computeDepositTransactionStatus(rec);
    }
    case TransactionType.InternalWithdrawal:
    case TransactionType.Withdrawal: {
      const rec = await tx.withdrawalGroups.get(parsedTxId.withdrawalGroupId);
      if (!rec) {
        return undefined;
      }
      return computeWithdrawalTransactionStatus(rec);
    }
    case TransactionType.Payment: {
      const rec = await tx.purchases.get(parsedTxId.proposalId);
      if (!rec) {
        return undefined;
      }
      return computePayMerchantTransactionState(rec);
    }
    case TransactionType.Refund: {
      const rec = await tx.refundGroups.get(parsedTxId.refundGroupId);
      if (!rec) {
        return undefined;
      }
      return computeRefundTransactionState(rec);
    }
    case TransactionType.PeerPullCredit: {
      const rec = await tx.peerPullCredit.get(parsedTxId.pursePub);
      if (!rec) {
        return undefined;
      }
      return computePeerPullCreditTransactionState(rec);
    }
    case TransactionType.PeerPullDebit: {
      const rec = await tx.peerPullDebit.get(parsedTxId.peerPullDebitId);
      if (!rec) {
        return undefined;
      }
      return computePeerPullDebitTransactionState(rec);
    }
    case TransactionType.PeerPushCredit: {
      const rec = await tx.peerPushCredit.get(parsedTxId.peerPushCreditId);
      if (!rec) {
        return undefined;
      }
      return computePeerPushCreditTransactionState(rec);
    }
    case TransactionType.PeerPushDebit: {
      const rec = await tx.peerPushDebit.get(parsedTxId.pursePub);
      if (!rec) {
        return undefined;
      }
      return computePeerPushDebitTransactionState(rec);
    }
    case TransactionType.Refresh: {
      const rec = await tx.refreshGroups.get(parsedTxId.refreshGroupId);
      if (!rec) {
        return undefined;
      }
      return computeRefreshTransactionState(rec);
    }
    case TransactionType.Recoup:
      throw Error("not yet supported");
    case TransactionType.DenomLoss: {
      const rec = await tx.denomLossEvents.get(parsedTxId.denomLossEventId);
      if (!rec) {
        return undefined;
      }
      return computeDenomLossTransactionStatus(rec);
    }
    default:
      assertUnreachable(parsedTxId);
  }
}

/**
 * Build a transaction state transition notification for the
 * transaction that the task belongs to.
 *
 * The old and new state are identical; the notification only conveys
 * that an error was set (when `e` is given) or cleared.
 *
 * @returns undefined if the task has no transaction or the
 *   transaction's record does not exist
 */
async function makeTransactionRetryNotification(
  ws: InternalWalletState,
  tx: WalletDbAllStoresReadOnlyTransaction,
  pendingTaskId: string,
  e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
  const txId = convertTaskToTransactionId(pendingTaskId);
  if (!txId) {
    return undefined;
  }
  const txState = await getTransactionState(ws, tx, txId);
  if (!txState) {
    return undefined;
  }
  const notif: WalletNotification = {
    type: NotificationType.TransactionStateTransition,
    transactionId: txId,
    oldTxState: txState,
    newTxState: txState,
  };
  if (e) {
    notif.errorInfo = {
      code: e.code as number,
      hint: e.hint,
    };
  }
  return notif;
}

/**
 * Build an exchange state transition notification for an exchange
 * update or exchange wallet KYC task.
 *
 * The old and new exchange state are identical; the notification only
 * conveys that an error was set (when `e` is given) or cleared.
 *
 * @returns undefined if the exchange record does not exist
 * @throws Error if the task is not an exchange task
 */
async function makeExchangeRetryNotification(
  ws: InternalWalletState,
  tx: WalletDbAllStoresReadOnlyTransaction,
  pendingTaskId: string,
  e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
  logger.info("making exchange retry notification");
  const parsedTaskId = parseTaskIdentifier(pendingTaskId);
  switch (parsedTaskId.tag) {
    case PendingTaskType.ExchangeUpdate:
    case PendingTaskType.ExchangeWalletKyc:
      break;
    default:
      throw Error("invalid task identifier");
  }
  const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
  if (!rec) {
    logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
    return undefined;
  }
  const notif: WalletNotification = {
    type: NotificationType.ExchangeStateTransition,
    exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
    oldExchangeState: getExchangeState(rec),
    newExchangeState: getExchangeState(rec),
  };
  if (e) {
    notif.errorInfo = {
      code: e.code as number,
      hint: e.hint,
    };
  }
  return notif;
}

/**
 * Convert the task ID for a task that processes a transaction into
 * the ID for the transaction.
 *
 * @returns undefined for task types that do not map to a transaction
 */
export function convertTaskToTransactionId(
  taskId: string,
): TransactionIdStr | undefined {
  const parsedTaskId = parseTaskIdentifier(taskId);
  switch (parsedTaskId.tag) {
    case PendingTaskType.PeerPullCredit:
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPullCredit,
        pursePub: parsedTaskId.pursePub,
      });
    case PendingTaskType.PeerPullDebit:
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPullDebit,
        peerPullDebitId: parsedTaskId.peerPullDebitId,
      });
    // FIXME: This doesn't distinguish internal-withdrawal.
    // Maybe we should have a different task type for that as well?
    // Or maybe transaction IDs should be valid task identifiers?
    case PendingTaskType.Withdraw:
      return constructTransactionIdentifier({
        tag: TransactionType.Withdrawal,
        withdrawalGroupId: parsedTaskId.withdrawalGroupId,
      });
    case PendingTaskType.PeerPushCredit:
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPushCredit,
        peerPushCreditId: parsedTaskId.peerPushCreditId,
      });
    case PendingTaskType.Deposit:
      return constructTransactionIdentifier({
        tag: TransactionType.Deposit,
        depositGroupId: parsedTaskId.depositGroupId,
      });
    case PendingTaskType.Refresh:
      return constructTransactionIdentifier({
        tag: TransactionType.Refresh,
        refreshGroupId: parsedTaskId.refreshGroupId,
      });
    case PendingTaskType.PeerPushDebit:
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPushDebit,
        pursePub: parsedTaskId.pursePub,
      });
    case PendingTaskType.Purchase:
      return constructTransactionIdentifier({
        tag: TransactionType.Payment,
        proposalId: parsedTaskId.proposalId,
      });
    default:
      return undefined;
  }
}

/**
 * Result of {@link getActiveTaskIds}.
 */
export interface ActiveTaskIdsResult {
  taskIds: TaskIdStr[];
}

/**
 * Collect the task IDs of everything that should currently be
 * shepherded:
 * - one task per record whose status lies in the non-final range
 *   (withdrawals, deposits, refreshes, purchases, the four peer
 *   payment kinds and recoups),
 * - one update task per known exchange,
 * - one KYC task per exchange whose current merge reserve exists
 *   with a status that is set and not Done.
 */
export async function getActiveTaskIds(
  ws: InternalWalletState,
): Promise<ActiveTaskIdsResult> {
  const res: ActiveTaskIdsResult = {
    taskIds: [],
  };
  // NOTE(review): the callback only reads; a read-only transaction
  // would probably suffice here — confirm before changing.
  await ws.db.runReadWriteTx(
    {
      storeNames: [
        "exchanges",
        "refreshGroups",
        "withdrawalGroups",
        "purchases",
        "depositGroups",
        "recoupGroups",
        "peerPullCredit",
        "peerPushDebit",
        "peerPullDebit",
        "peerPushCredit",
        "reserves",
      ],
    },
    async (tx) => {
      // Key range that selects all records with a non-final status.
      const active = GlobalIDB.KeyRange.bound(
        OPERATION_STATUS_NONFINAL_FIRST,
        OPERATION_STATUS_NONFINAL_LAST,
      );

      // Withdrawals

      {
        const activeRecs =
          await tx.withdrawalGroups.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.Withdraw,
            withdrawalGroupId: rec.withdrawalGroupId,
          });
          res.taskIds.push(taskId);
        }
      }

      // Deposits

      {
        const activeRecs =
          await tx.depositGroups.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.Deposit,
            depositGroupId: rec.depositGroupId,
          });
          res.taskIds.push(taskId);
        }
      }

      // Refreshes

      {
        const activeRecs =
          await tx.refreshGroups.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.Refresh,
            refreshGroupId: rec.refreshGroupId,
          });
          res.taskIds.push(taskId);
        }
      }

      // Purchases

      {
        const activeRecs = await tx.purchases.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.Purchase,
            proposalId: rec.proposalId,
          });
          res.taskIds.push(taskId);
        }
      }

      // peer-push-debit

      {
        const activeRecs =
          await tx.peerPushDebit.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.PeerPushDebit,
            pursePub: rec.pursePub,
          });
          res.taskIds.push(taskId);
        }
      }

      // peer-push-credit

      {
        const activeRecs =
          await tx.peerPushCredit.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.PeerPushCredit,
            peerPushCreditId: rec.peerPushCreditId,
          });
          res.taskIds.push(taskId);
        }
      }

      // peer-pull-debit

      {
        const activeRecs =
          await tx.peerPullDebit.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.PeerPullDebit,
            peerPullDebitId: rec.peerPullDebitId,
          });
          res.taskIds.push(taskId);
        }
      }

      // peer-pull-credit

      {
        const activeRecs =
          await tx.peerPullCredit.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.PeerPullCredit,
            pursePub: rec.pursePub,
          });
          res.taskIds.push(taskId);
        }
      }

      // recoup

      {
        const activeRecs =
          await tx.recoupGroups.indexes.byStatus.getAll(active);
        for (const rec of activeRecs) {
          const taskId = constructTaskIdentifier({
            tag: PendingTaskType.Recoup,
            recoupGroupId: rec.recoupGroupId,
          });
          res.taskIds.push(taskId);
        }
      }

      // exchange update and KYC

      {
        const exchanges = await tx.exchanges.getAll();
        for (const rec of exchanges) {
          // Every known exchange gets an update task.
          const taskIdUpdate = constructTaskIdentifier({
            tag: PendingTaskType.ExchangeUpdate,
            exchangeBaseUrl: rec.baseUrl,
          });
          res.taskIds.push(taskIdUpdate);

          const reserveId = rec.currentMergeReserveRowId;
          if (reserveId == null) {
            continue;
          }
          const reserveRec = await tx.reserves.get(reserveId);
          if (
            reserveRec?.status != null &&
            reserveRec.status != ReserveRecordStatus.Done
          ) {
            const taskIdKyc = constructTaskIdentifier({
              tag: PendingTaskType.ExchangeWalletKyc,
              exchangeBaseUrl: rec.baseUrl,
            });
            res.taskIds.push(taskIdKyc);
          }
        }
      }

      // FIXME: Recoup!
      // NOTE(review): recoup groups are already collected above;
      // unclear what this FIXME still refers to — confirm and remove.
    },
  );

  return res;
}