/*
This file is part of GNU Taler
(C) 2024 Taler Systems SA
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
GNU Taler; see the file COPYING. If not, see
*/
/**
* Imports.
*/
import { GlobalIDB } from "@gnu-taler/idb-bridge";
import {
AbsoluteTime,
AsyncCondition,
CancellationToken,
Duration,
Logger,
NotificationType,
ObservabilityContext,
ObservabilityEventType,
TalerErrorDetail,
TaskThrottler,
TransactionIdStr,
TransactionState,
TransactionType,
WalletNotification,
assertUnreachable,
getErrorDetailFromException,
j2s,
safeStringifyException,
} from "@gnu-taler/taler-util";
import { processBackupForProvider } from "./backup/index.js";
import {
DbRetryInfo,
PendingTaskType,
TaskIdStr,
TaskRunResult,
TaskRunResultType,
constructTaskIdentifier,
getExchangeState,
parseTaskIdentifier,
} from "./common.js";
import {
OPERATION_STATUS_NONFINAL_FIRST,
OPERATION_STATUS_NONFINAL_LAST,
OperationRetryRecord,
ReserveRecordStatus,
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadOnlyTransaction,
timestampAbsoluteFromDb,
timestampPreciseToDb,
} from "./db.js";
import {
computeDepositTransactionStatus,
processDepositGroup,
} from "./deposits.js";
import {
computeDenomLossTransactionStatus,
processExchangeKyc,
updateExchangeFromUrlHandler,
} from "./exchanges.js";
import {
computePayMerchantTransactionState,
computeRefundTransactionState,
processPurchase,
} from "./pay-merchant.js";
import {
computePeerPullCreditTransactionState,
processPeerPullCredit,
} from "./pay-peer-pull-credit.js";
import {
computePeerPullDebitTransactionState,
processPeerPullDebit,
} from "./pay-peer-pull-debit.js";
import {
computePeerPushCreditTransactionState,
processPeerPushCredit,
} from "./pay-peer-push-credit.js";
import {
computePeerPushDebitTransactionState,
processPeerPushDebit,
} from "./pay-peer-push-debit.js";
import { processRecoupGroup } from "./recoup.js";
import {
computeRefreshTransactionState,
processRefreshGroup,
} from "./refresh.js";
import {
constructTransactionIdentifier,
parseTransactionIdentifier,
} from "./transactions.js";
import {
InternalWalletState,
WalletExecutionContext,
getNormalWalletExecutionContext,
getObservedWalletExecutionContext,
} from "./wallet.js";
import {
computeWithdrawalTransactionStatus,
processWithdrawalGroup,
} from "./withdraw.js";
const logger = new Logger("shepherd.ts");
/**
* Info about one task being shepherded.
*/
interface ShepherdInfo {
cts: CancellationToken.Source;
latch?: Promise;
stopped: boolean;
}
/**
* Check if a task is alive, i.e. whether it prevents
* the main task loop from exiting.
*/
function taskGivesLiveness(taskId: string): boolean {
const parsedTaskId = parseTaskIdentifier(taskId);
switch (parsedTaskId.tag) {
case PendingTaskType.Backup:
case PendingTaskType.ExchangeUpdate:
case PendingTaskType.ExchangeWalletKyc:
return false;
case PendingTaskType.Deposit:
case PendingTaskType.PeerPullCredit:
case PendingTaskType.PeerPullDebit:
case PendingTaskType.PeerPushCredit:
case PendingTaskType.Refresh:
case PendingTaskType.Recoup:
case PendingTaskType.RewardPickup:
case PendingTaskType.Withdraw:
case PendingTaskType.PeerPushDebit:
case PendingTaskType.Purchase:
return true;
default:
assertUnreachable(parsedTaskId);
}
}
export interface TaskScheduler {
/**
* Ensure that the task scheduler is running.
*
* If it is not running, start it, with previous
* tasks loaded from the database.
*
* Returns after the scheduler is running.
*/
ensureRunning(): Promise;
startShepherdTask(taskId: TaskIdStr): void;
stopShepherdTask(taskId: TaskIdStr): void;
resetTaskRetries(taskId: TaskIdStr): Promise;
reload(): Promise;
getActiveTasks(): TaskIdStr[];
isIdle(): boolean;
shutdown(): Promise;
}
export class TaskSchedulerImpl implements TaskScheduler {
private sheps: Map = new Map();
private iterCond = new AsyncCondition();
private throttler = new TaskThrottler();
isRunning: boolean = false;
constructor(private ws: InternalWalletState) {}
private async loadTasksFromDb(): Promise {
const activeTasks = await getActiveTaskIds(this.ws);
logger.info(`active tasks from DB: ${j2s(activeTasks)}`);
for (const tid of activeTasks.taskIds) {
this.startShepherdTask(tid);
}
}
getActiveTasks(): TaskIdStr[] {
return [...this.sheps.keys()];
}
async shutdown(): Promise {
const tasksIds = [...this.sheps.keys()];
logger.info(`Stopping task shepherd.`);
for (const taskId of tasksIds) {
this.stopShepherdTask(taskId);
}
}
/**
* @see TaskScheduler.ensureRunning
*/
async ensureRunning(): Promise {
if (this.isRunning) {
return;
}
this.isRunning = true;
try {
await this.loadTasksFromDb();
} catch (e) {
this.isRunning = false;
throw e;
}
this.run()
.catch((e) => {
logger.error("error running task loop");
logger.error(`err: ${e}`);
})
.then(() => {
logger.trace("done running task loop");
this.isRunning = false;
});
}
isIdle(): boolean {
let alive = false;
const taskIds = [...this.sheps.keys()];
for (const taskId of taskIds) {
if (taskGivesLiveness(taskId)) {
alive = true;
break;
}
}
// We're idle if no task is alive anymore.
return !alive;
}
private async run(): Promise {
logger.trace("Running task loop.");
logger.trace(`sheps: ${this.sheps.size}`);
while (true) {
if (this.ws.stopped) {
logger.trace("Breaking out of task loop (wallet stopped).");
break;
}
if (this.isIdle()) {
this.ws.notify({
type: NotificationType.Idle,
});
}
await this.iterCond.wait();
}
logger.trace("Done with task loop.");
}
startShepherdTask(taskId: TaskIdStr): void {
this.ensureRunning().catch((e) => {
logger.error(`error running scheduler: ${safeStringifyException(e)}`);
});
// Run in the background, no await!
this.internalStartShepherdTask(taskId);
}
/**
* Stop and re-load all existing tasks.
*
* Mostly useful to interrupt all waits when time-travelling.
*/
async reload(): Promise {
await this.ensureRunning();
const tasksIds = [...this.sheps.keys()];
logger.info(`reloading shepherd with ${tasksIds.length} tasks`);
for (const taskId of tasksIds) {
this.stopShepherdTask(taskId);
}
for (const taskId of tasksIds) {
this.startShepherdTask(taskId);
}
}
private async internalStartShepherdTask(taskId: TaskIdStr): Promise {
logger.trace(`Starting to shepherd task ${taskId}`);
const oldShep = this.sheps.get(taskId);
if (oldShep) {
if (!oldShep.stopped) {
logger.trace(`Already have a shepherd for ${taskId}`);
return;
}
logger.trace(
`Waiting for old task to complete the loop in cancel mode ${taskId}`,
);
await oldShep.latch;
logger.trace(`Old task ${taskId} completed in cancel mode`);
}
logger.trace(`Creating new shepherd for ${taskId}`);
const newShep: ShepherdInfo = {
cts: CancellationToken.create(),
stopped: false,
};
this.sheps.set(taskId, newShep);
try {
newShep.latch = this.internalShepherdTask(taskId, newShep);
await newShep.latch;
} finally {
logger.trace(`Done shepherding ${taskId}`);
this.sheps.delete(taskId);
this.iterCond.trigger();
}
}
stopShepherdTask(taskId: TaskIdStr): void {
logger.trace(`Stopping shepherding of ${taskId}`);
const oldShep = this.sheps.get(taskId);
if (oldShep) {
logger.trace(`Cancelling old shepherd for ${taskId}`);
oldShep.cts.cancel(`stopping task ${taskId}`);
oldShep.stopped = true;
this.iterCond.trigger();
}
}
restartShepherdTask(taskId: TaskIdStr): void {
this.stopShepherdTask(taskId);
this.startShepherdTask(taskId);
}
async resetTaskRetries(taskId: TaskIdStr): Promise {
const maybeNotification = await this.ws.db.runAllStoresReadWriteTx(
{},
async (tx) => {
logger.trace(`storing task [reset] for ${taskId}`);
await tx.operationRetries.delete(taskId);
return taskToRetryNotification(this.ws, tx, taskId, undefined);
},
);
this.stopShepherdTask(taskId);
if (maybeNotification) {
this.ws.notify(maybeNotification);
}
this.startShepherdTask(taskId);
}
private async wait(
taskId: TaskIdStr,
info: ShepherdInfo,
delay: Duration,
): Promise {
try {
await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay));
} catch (e) {
if (e instanceof CancellationToken.CancellationError) {
logger.info(
`waiting for ${taskId} interrupted: ${e.message} ${j2s(e.reason)}`,
);
} else {
logger.info(`waiting for ${taskId} interrupted: ${e}`);
}
}
}
private async internalShepherdTask(
taskId: TaskIdStr,
info: ShepherdInfo,
): Promise {
while (true) {
if (this.ws.stopped) {
logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`);
return;
}
if (info.cts.token.isCancelled) {
logger.trace(`Shepherd for ${taskId} got cancelled`);
return;
}
const isThrottled = this.throttler.applyThrottle(taskId);
if (isThrottled) {
logger.warn(
`task ${taskId} throttled, this is very likely a bug in wallet-core, please report`,
);
logger.warn("waiting for 60 seconds");
await this.ws.timerGroup.resolveAfter(
Duration.fromSpec({ seconds: 60 }),
);
}
const wex = getWalletExecutionContextForTask(
this.ws,
taskId,
info.cts.token,
);
const startTime = AbsoluteTime.now();
logger.trace(`Shepherd for ${taskId} will call handler`);
let res: TaskRunResult;
try {
res = await callOperationHandlerForTaskId(wex, taskId);
} catch (e) {
const errorDetail = getErrorDetailFromException(e);
logger.trace(
`Shepherd error ${taskId} saving response ${j2s(errorDetail)}`,
);
res = {
type: TaskRunResultType.Error,
errorDetail,
};
}
if (info.cts.token.isCancelled) {
logger.trace(`task ${taskId} cancelled, not processing result`);
return;
}
if (this.ws.stopped) {
logger.trace("wallet stopped, not processing result");
return;
}
wex.oc.observe({
type: ObservabilityEventType.ShepherdTaskResult,
resultType: res.type,
});
switch (res.type) {
case TaskRunResultType.Error: {
logger.trace(
`Shepherd for ${taskId} got error result: ${j2s(res.errorDetail)}`,
);
const retryRecord = await storePendingTaskError(
this.ws,
taskId,
res.errorDetail,
);
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
const delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
await this.wait(taskId, info, delay);
break;
}
case TaskRunResultType.Backoff: {
logger.trace(`Shepherd for ${taskId} got backoff result.`);
const retryRecord = await storePendingTaskPending(this.ws, taskId);
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
const delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
await this.wait(taskId, info, delay);
break;
}
case TaskRunResultType.Progress: {
logger.trace(
`Shepherd for ${taskId} got progress result, re-running immediately.`,
);
await storeTaskProgress(this.ws, taskId);
break;
}
case TaskRunResultType.ScheduleLater: {
logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
const retryRecord = await storePendingTaskPending(
this.ws,
taskId,
res.runAt,
);
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
const delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
await this.wait(taskId, info, delay);
break;
}
case TaskRunResultType.Finished:
logger.trace(`Shepherd for ${taskId} got finished result.`);
await storePendingTaskFinished(this.ws, taskId);
return;
case TaskRunResultType.LongpollReturnedPending: {
await storeTaskProgress(this.ws, taskId);
// Make sure that we are waiting a bit if long-polling returned too early.
const endTime = AbsoluteTime.now();
const taskDuration = AbsoluteTime.difference(endTime, startTime);
if (
Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0
) {
logger.info(
`long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`,
);
await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 }));
} else {
logger.info(`task ${taskId} will long-poll again`);
}
break;
}
case TaskRunResultType.NetworkRequired: {
logger.trace(`Shepherd for ${taskId} got network-required result.`);
await storePendingTaskPending(this.ws, taskId);
const delay = Duration.getForever();
logger.trace(`Not retrying task until network is restored.`);
await this.wait(taskId, info, delay);
break;
}
default:
assertUnreachable(res);
}
}
}
}
async function storePendingTaskError(
ws: InternalWalletState,
pendingTaskId: string,
e: TalerErrorDetail,
): Promise {
logger.trace(`storing task [pending] with ERROR for ${pendingTaskId}`);
const res = await ws.db.runAllStoresReadWriteTx({}, async (tx) => {
let retryRecord = await tx.operationRetries.get(pendingTaskId);
if (!retryRecord) {
retryRecord = {
id: pendingTaskId,
lastError: e,
retryInfo: DbRetryInfo.reset(),
};
} else {
retryRecord.lastError = e;
retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
}
await tx.operationRetries.put(retryRecord);
return {
notification: await taskToRetryNotification(ws, tx, pendingTaskId, e),
retryRecord,
};
});
if (res?.notification) {
ws.notify(res.notification);
}
return res.retryRecord;
}
/**
* Task made progress, clear error.
*/
async function storeTaskProgress(
ws: InternalWalletState,
pendingTaskId: string,
): Promise {
logger.trace(`storing task [progress] for ${pendingTaskId}`);
await ws.db.runReadWriteTx(
{ storeNames: ["operationRetries"] },
async (tx) => {
await tx.operationRetries.delete(pendingTaskId);
},
);
}
async function storePendingTaskPending(
ws: InternalWalletState,
pendingTaskId: string,
schedTime?: AbsoluteTime,
): Promise {
logger.trace(`storing task [pending] for ${pendingTaskId}`);
const res = await ws.db.runAllStoresReadWriteTx({}, async (tx) => {
let retryRecord = await tx.operationRetries.get(pendingTaskId);
let hadError = false;
if (!retryRecord) {
retryRecord = {
id: pendingTaskId,
retryInfo: DbRetryInfo.reset(),
};
} else {
if (retryRecord.lastError) {
hadError = true;
}
delete retryRecord.lastError;
retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
}
if (schedTime) {
retryRecord.retryInfo.nextRetry = timestampPreciseToDb(
AbsoluteTime.toPreciseTimestamp(schedTime),
);
}
await tx.operationRetries.put(retryRecord);
let notification: WalletNotification | undefined = undefined;
if (hadError) {
notification = await taskToRetryNotification(
ws,
tx,
pendingTaskId,
undefined,
);
}
return {
notification,
retryRecord,
};
});
if (res.notification) {
ws.notify(res.notification);
}
return res.retryRecord;
}
async function storePendingTaskFinished(
ws: InternalWalletState,
pendingTaskId: string,
): Promise {
logger.trace(`storing task [finished] for ${pendingTaskId}`);
await ws.db.runReadWriteTx(
{ storeNames: ["operationRetries"] },
async (tx) => {
await tx.operationRetries.delete(pendingTaskId);
},
);
}
function getWalletExecutionContextForTask(
ws: InternalWalletState,
taskId: TaskIdStr,
cancellationToken: CancellationToken,
): WalletExecutionContext {
let oc: ObservabilityContext;
let wex: WalletExecutionContext;
if (ws.config.testing.emitObservabilityEvents) {
oc = {
observe(evt) {
if (ws.config.testing.emitObservabilityEvents) {
ws.notify({
type: NotificationType.TaskObservabilityEvent,
taskId,
event: evt,
});
}
},
};
wex = getObservedWalletExecutionContext(
ws,
cancellationToken,
undefined,
oc,
);
} else {
oc = {
observe(evt) {},
};
wex = getNormalWalletExecutionContext(ws, cancellationToken, undefined, oc);
}
return wex;
}
async function callOperationHandlerForTaskId(
wex: WalletExecutionContext,
taskId: TaskIdStr,
): Promise {
const pending = parseTaskIdentifier(taskId);
const txId = convertTaskToTransactionId(taskId);
if (txId) {
wex.oc.observe({
type: ObservabilityEventType.DeclareConcernsTransaction,
transactionId: txId,
});
}
switch (pending.tag) {
case PendingTaskType.ExchangeUpdate:
return await updateExchangeFromUrlHandler(wex, pending.exchangeBaseUrl);
case PendingTaskType.Refresh:
return await processRefreshGroup(wex, pending.refreshGroupId);
case PendingTaskType.Withdraw:
return await processWithdrawalGroup(wex, pending.withdrawalGroupId);
case PendingTaskType.Purchase:
return await processPurchase(wex, pending.proposalId);
case PendingTaskType.Recoup:
return await processRecoupGroup(wex, pending.recoupGroupId);
case PendingTaskType.Deposit:
return await processDepositGroup(wex, pending.depositGroupId);
case PendingTaskType.Backup:
return await processBackupForProvider(wex, pending.backupProviderBaseUrl);
case PendingTaskType.PeerPushDebit:
return await processPeerPushDebit(wex, pending.pursePub);
case PendingTaskType.PeerPullCredit:
return await processPeerPullCredit(wex, pending.pursePub);
case PendingTaskType.PeerPullDebit:
return await processPeerPullDebit(wex, pending.peerPullDebitId);
case PendingTaskType.PeerPushCredit:
return await processPeerPushCredit(wex, pending.peerPushCreditId);
case PendingTaskType.ExchangeWalletKyc:
return await processExchangeKyc(wex, pending.exchangeBaseUrl);
case PendingTaskType.RewardPickup:
throw Error("not supported anymore");
default:
return assertUnreachable(pending);
}
throw Error(`not reached ${pending.tag}`);
}
/**
* Generate an appropriate error transition notification
* for applicable tasks.
*
* Namely, transition notifications are generated for:
* - exchange update errors
* - transactions
*/
async function taskToRetryNotification(
ws: InternalWalletState,
tx: WalletDbAllStoresReadOnlyTransaction,
pendingTaskId: string,
e: TalerErrorDetail | undefined,
): Promise {
const parsedTaskId = parseTaskIdentifier(pendingTaskId);
switch (parsedTaskId.tag) {
case PendingTaskType.ExchangeUpdate:
case PendingTaskType.ExchangeWalletKyc:
return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
case PendingTaskType.PeerPullCredit:
case PendingTaskType.PeerPullDebit:
case PendingTaskType.Withdraw:
case PendingTaskType.PeerPushCredit:
case PendingTaskType.Deposit:
case PendingTaskType.Refresh:
case PendingTaskType.RewardPickup:
case PendingTaskType.PeerPushDebit:
case PendingTaskType.Purchase:
return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
case PendingTaskType.Backup:
case PendingTaskType.Recoup:
return undefined;
}
}
async function getTransactionState(
ws: InternalWalletState,
tx: WalletDbReadOnlyTransaction<
[
"depositGroups",
"withdrawalGroups",
"purchases",
"refundGroups",
"peerPullCredit",
"peerPullDebit",
"peerPushDebit",
"peerPushCredit",
"rewards",
"refreshGroups",
"denomLossEvents",
]
>,
transactionId: string,
): Promise {
const parsedTxId = parseTransactionIdentifier(transactionId);
if (!parsedTxId) {
throw Error("invalid tx identifier");
}
switch (parsedTxId.tag) {
case TransactionType.Deposit: {
const rec = await tx.depositGroups.get(parsedTxId.depositGroupId);
if (!rec) {
return undefined;
}
return computeDepositTransactionStatus(rec);
}
case TransactionType.InternalWithdrawal:
case TransactionType.Withdrawal: {
const rec = await tx.withdrawalGroups.get(parsedTxId.withdrawalGroupId);
if (!rec) {
return undefined;
}
return computeWithdrawalTransactionStatus(rec);
}
case TransactionType.Payment: {
const rec = await tx.purchases.get(parsedTxId.proposalId);
if (!rec) {
return;
}
return computePayMerchantTransactionState(rec);
}
case TransactionType.Refund: {
const rec = await tx.refundGroups.get(parsedTxId.refundGroupId);
if (!rec) {
return undefined;
}
return computeRefundTransactionState(rec);
}
case TransactionType.PeerPullCredit: {
const rec = await tx.peerPullCredit.get(parsedTxId.pursePub);
if (!rec) {
return undefined;
}
return computePeerPullCreditTransactionState(rec);
}
case TransactionType.PeerPullDebit: {
const rec = await tx.peerPullDebit.get(parsedTxId.peerPullDebitId);
if (!rec) {
return undefined;
}
return computePeerPullDebitTransactionState(rec);
}
case TransactionType.PeerPushCredit: {
const rec = await tx.peerPushCredit.get(parsedTxId.peerPushCreditId);
if (!rec) {
return undefined;
}
return computePeerPushCreditTransactionState(rec);
}
case TransactionType.PeerPushDebit: {
const rec = await tx.peerPushDebit.get(parsedTxId.pursePub);
if (!rec) {
return undefined;
}
return computePeerPushDebitTransactionState(rec);
}
case TransactionType.Refresh: {
const rec = await tx.refreshGroups.get(parsedTxId.refreshGroupId);
if (!rec) {
return undefined;
}
return computeRefreshTransactionState(rec);
}
case TransactionType.Recoup:
throw Error("not yet supported");
case TransactionType.DenomLoss: {
const rec = await tx.denomLossEvents.get(parsedTxId.denomLossEventId);
if (!rec) {
return undefined;
}
return computeDenomLossTransactionStatus(rec);
}
default:
assertUnreachable(parsedTxId);
}
}
async function makeTransactionRetryNotification(
ws: InternalWalletState,
tx: WalletDbAllStoresReadOnlyTransaction,
pendingTaskId: string,
e: TalerErrorDetail | undefined,
): Promise {
const txId = convertTaskToTransactionId(pendingTaskId);
if (!txId) {
return undefined;
}
const txState = await getTransactionState(ws, tx, txId);
if (!txState) {
return undefined;
}
const notif: WalletNotification = {
type: NotificationType.TransactionStateTransition,
transactionId: txId,
oldTxState: txState,
newTxState: txState,
};
if (e) {
notif.errorInfo = {
code: e.code as number,
hint: e.hint,
};
}
return notif;
}
async function makeExchangeRetryNotification(
ws: InternalWalletState,
tx: WalletDbAllStoresReadOnlyTransaction,
pendingTaskId: string,
e: TalerErrorDetail | undefined,
): Promise {
logger.info("making exchange retry notification");
const parsedTaskId = parseTaskIdentifier(pendingTaskId);
switch (parsedTaskId.tag) {
case PendingTaskType.ExchangeUpdate:
case PendingTaskType.ExchangeWalletKyc:
break;
default:
throw Error("invalid task identifier");
}
const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
if (!rec) {
logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
return undefined;
}
const notif: WalletNotification = {
type: NotificationType.ExchangeStateTransition,
exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
oldExchangeState: getExchangeState(rec),
newExchangeState: getExchangeState(rec),
};
if (e) {
notif.errorInfo = {
code: e.code as number,
hint: e.hint,
};
}
return notif;
}
/**
* Convert the task ID for a task that processes a transaction int
* the ID for the transaction.
*/
export function convertTaskToTransactionId(
taskId: string,
): TransactionIdStr | undefined {
const parsedTaskId = parseTaskIdentifier(taskId);
switch (parsedTaskId.tag) {
case PendingTaskType.PeerPullCredit:
return constructTransactionIdentifier({
tag: TransactionType.PeerPullCredit,
pursePub: parsedTaskId.pursePub,
});
case PendingTaskType.PeerPullDebit:
return constructTransactionIdentifier({
tag: TransactionType.PeerPullDebit,
peerPullDebitId: parsedTaskId.peerPullDebitId,
});
// FIXME: This doesn't distinguish internal-withdrawal.
// Maybe we should have a different task type for that as well?
// Or maybe transaction IDs should be valid task identifiers?
case PendingTaskType.Withdraw:
return constructTransactionIdentifier({
tag: TransactionType.Withdrawal,
withdrawalGroupId: parsedTaskId.withdrawalGroupId,
});
case PendingTaskType.PeerPushCredit:
return constructTransactionIdentifier({
tag: TransactionType.PeerPushCredit,
peerPushCreditId: parsedTaskId.peerPushCreditId,
});
case PendingTaskType.Deposit:
return constructTransactionIdentifier({
tag: TransactionType.Deposit,
depositGroupId: parsedTaskId.depositGroupId,
});
case PendingTaskType.Refresh:
return constructTransactionIdentifier({
tag: TransactionType.Refresh,
refreshGroupId: parsedTaskId.refreshGroupId,
});
case PendingTaskType.PeerPushDebit:
return constructTransactionIdentifier({
tag: TransactionType.PeerPushDebit,
pursePub: parsedTaskId.pursePub,
});
case PendingTaskType.Purchase:
return constructTransactionIdentifier({
tag: TransactionType.Payment,
proposalId: parsedTaskId.proposalId,
});
default:
return undefined;
}
}
export interface ActiveTaskIdsResult {
taskIds: TaskIdStr[];
}
export async function getActiveTaskIds(
ws: InternalWalletState,
): Promise {
const res: ActiveTaskIdsResult = {
taskIds: [],
};
await ws.db.runReadWriteTx(
{
storeNames: [
"exchanges",
"refreshGroups",
"withdrawalGroups",
"purchases",
"depositGroups",
"recoupGroups",
"peerPullCredit",
"peerPushDebit",
"peerPullDebit",
"peerPushCredit",
"reserves",
],
},
async (tx) => {
const active = GlobalIDB.KeyRange.bound(
OPERATION_STATUS_NONFINAL_FIRST,
OPERATION_STATUS_NONFINAL_LAST,
);
// Withdrawals
{
const activeRecs =
await tx.withdrawalGroups.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId: rec.withdrawalGroupId,
});
res.taskIds.push(taskId);
}
}
// Deposits
{
const activeRecs =
await tx.depositGroups.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Deposit,
depositGroupId: rec.depositGroupId,
});
res.taskIds.push(taskId);
}
}
// Refreshes
{
const activeRecs =
await tx.refreshGroups.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Refresh,
refreshGroupId: rec.refreshGroupId,
});
res.taskIds.push(taskId);
}
}
// Purchases
{
const activeRecs = await tx.purchases.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Purchase,
proposalId: rec.proposalId,
});
res.taskIds.push(taskId);
}
}
// peer-push-debit
{
const activeRecs =
await tx.peerPushDebit.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPushDebit,
pursePub: rec.pursePub,
});
res.taskIds.push(taskId);
}
}
// peer-push-credit
{
const activeRecs =
await tx.peerPushCredit.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPushCredit,
peerPushCreditId: rec.peerPushCreditId,
});
res.taskIds.push(taskId);
}
}
// peer-pull-debit
{
const activeRecs =
await tx.peerPullDebit.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPullDebit,
peerPullDebitId: rec.peerPullDebitId,
});
res.taskIds.push(taskId);
}
}
// peer-pull-credit
{
const activeRecs =
await tx.peerPullCredit.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPullCredit,
pursePub: rec.pursePub,
});
res.taskIds.push(taskId);
}
}
// recoup
{
const activeRecs =
await tx.recoupGroups.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Recoup,
recoupGroupId: rec.recoupGroupId,
});
res.taskIds.push(taskId);
}
}
// exchange update and KYC
{
const exchanges = await tx.exchanges.getAll();
for (const rec of exchanges) {
const taskIdUpdate = constructTaskIdentifier({
tag: PendingTaskType.ExchangeUpdate,
exchangeBaseUrl: rec.baseUrl,
});
res.taskIds.push(taskIdUpdate);
const reserveId = rec.currentMergeReserveRowId;
if (reserveId == null) {
continue;
}
const reserveRec = await tx.reserves.get(reserveId);
if (
reserveRec?.status != null &&
reserveRec.status != ReserveRecordStatus.Done
) {
const taskIdKyc = constructTaskIdentifier({
tag: PendingTaskType.ExchangeWalletKyc,
exchangeBaseUrl: rec.baseUrl,
});
res.taskIds.push(taskIdKyc);
}
}
}
// FIXME: Recoup!
},
);
return res;
}