/*
This file is part of GNU Taler
(C) 2024 Taler Systems SA
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
GNU Taler; see the file COPYING. If not, see
*/
/**
* Imports.
*/
import { GlobalIDB } from "@gnu-taler/idb-bridge";
import {
AbsoluteTime,
AsyncCondition,
CancellationToken,
Duration,
Logger,
NotificationType,
ObservabilityContext,
ObservabilityEventType,
TalerErrorDetail,
TaskThrottler,
TransactionIdStr,
TransactionState,
TransactionType,
WalletNotification,
assertUnreachable,
getErrorDetailFromException,
j2s,
} from "@gnu-taler/taler-util";
import { processBackupForProvider } from "./backup/index.js";
import {
DbRetryInfo,
PendingTaskType,
TaskIdStr,
TaskRunResult,
TaskRunResultType,
constructTaskIdentifier,
getExchangeState,
parseTaskIdentifier,
} from "./common.js";
import {
OPERATION_STATUS_ACTIVE_FIRST,
OPERATION_STATUS_ACTIVE_LAST,
OperationRetryRecord,
WalletDbAllStoresReadOnlyTransaction,
WalletDbReadOnlyTransaction,
timestampAbsoluteFromDb,
} from "./db.js";
import {
computeDepositTransactionStatus,
processDepositGroup,
} from "./deposits.js";
import {
computeDenomLossTransactionStatus,
updateExchangeFromUrlHandler,
} from "./exchanges.js";
import {
computePayMerchantTransactionState,
computeRefundTransactionState,
processPurchase,
} from "./pay-merchant.js";
import {
computePeerPullCreditTransactionState,
processPeerPullCredit,
} from "./pay-peer-pull-credit.js";
import {
computePeerPullDebitTransactionState,
processPeerPullDebit,
} from "./pay-peer-pull-debit.js";
import {
computePeerPushCreditTransactionState,
processPeerPushCredit,
} from "./pay-peer-push-credit.js";
import {
computePeerPushDebitTransactionState,
processPeerPushDebit,
} from "./pay-peer-push-debit.js";
import { processRecoupGroup } from "./recoup.js";
import {
computeRefreshTransactionState,
processRefreshGroup,
} from "./refresh.js";
import {
constructTransactionIdentifier,
parseTransactionIdentifier,
} from "./transactions.js";
import {
InternalWalletState,
WalletExecutionContext,
getNormalWalletExecutionContext,
getObservedWalletExecutionContext,
} from "./wallet.js";
import {
computeWithdrawalTransactionStatus,
processWithdrawalGroup,
} from "./withdraw.js";
const logger = new Logger("shepherd.ts");
/**
* Info about one task being shepherded.
*/
interface ShepherdInfo {
cts: CancellationToken.Source;
}
/**
* Check if a task is alive, i.e. whether it prevents
* the main task loop from exiting.
*/
function taskGivesLiveness(taskId: string): boolean {
const parsedTaskId = parseTaskIdentifier(taskId);
switch (parsedTaskId.tag) {
case PendingTaskType.Backup:
case PendingTaskType.ExchangeUpdate:
return false;
case PendingTaskType.Deposit:
case PendingTaskType.PeerPullCredit:
case PendingTaskType.PeerPullDebit:
case PendingTaskType.PeerPushCredit:
case PendingTaskType.Refresh:
case PendingTaskType.Recoup:
case PendingTaskType.RewardPickup:
case PendingTaskType.Withdraw:
case PendingTaskType.PeerPushDebit:
case PendingTaskType.Purchase:
return true;
default:
assertUnreachable(parsedTaskId);
}
}
export interface TaskScheduler {
ensureRunning(): Promise;
startShepherdTask(taskId: TaskIdStr): void;
stopShepherdTask(taskId: TaskIdStr): void;
resetTaskRetries(taskId: TaskIdStr): Promise;
reload(): Promise;
getActiveTasks(): TaskIdStr[];
isIdle(): boolean;
}
export class TaskSchedulerImpl implements TaskScheduler {
private sheps: Map = new Map();
private iterCond = new AsyncCondition();
private throttler = new TaskThrottler();
isRunning: boolean = false;
constructor(private ws: InternalWalletState) {}
private async loadTasksFromDb(): Promise {
const activeTasks = await getActiveTaskIds(this.ws);
logger.info(`active tasks from DB: ${j2s(activeTasks)}`);
for (const tid of activeTasks.taskIds) {
this.startShepherdTask(tid);
}
}
getActiveTasks(): TaskIdStr[] {
return [...this.sheps.keys()];
}
async ensureRunning(): Promise {
if (this.isRunning) {
return;
}
this.isRunning = true;
try {
await this.loadTasksFromDb();
} catch (e) {
this.isRunning = false;
throw e;
}
this.run()
.catch((e) => {
logger.error("error running task loop");
logger.error(`err: ${e}`);
})
.then(() => {
logger.info("done running task loop");
this.isRunning = false;
});
}
isIdle(): boolean {
let alive = false;
const taskIds = [...this.sheps.keys()];
for (const taskId of taskIds) {
if (taskGivesLiveness(taskId)) {
alive = true;
break;
}
}
// We're idle if no task is alive anymore.
return !alive;
}
private async run(): Promise {
logger.info("Running task loop.");
logger.info(`sheps: ${this.sheps.size}`);
while (true) {
if (this.ws.stopped) {
logger.info("Breaking out of task loop (wallet stopped).");
break;
}
if (this.isIdle()) {
this.ws.notify({
type: NotificationType.Idle,
});
}
await this.iterCond.wait();
}
logger.info("Done with task loop.");
}
startShepherdTask(taskId: TaskIdStr): void {
this.ensureRunning();
// Run in the background, no await!
this.internalStartShepherdTask(taskId);
}
/**
* Stop and re-load all existing tasks.
*
* Mostly useful to interrupt all waits when time-travelling.
*/
async reload(): Promise {
await this.ensureRunning();
const tasksIds = [...this.sheps.keys()];
logger.info(`reloading sheperd with ${tasksIds.length} tasks`);
for (const taskId of tasksIds) {
this.stopShepherdTask(taskId);
}
for (const taskId of tasksIds) {
this.startShepherdTask(taskId);
}
}
private async internalStartShepherdTask(taskId: TaskIdStr): Promise {
logger.trace(`Starting to shepherd task ${taskId}`);
const oldShep = this.sheps.get(taskId);
if (oldShep) {
logger.trace(`Already have a shepherd for ${taskId}`);
return;
}
logger.trace(`Creating new shepherd for ${taskId}`);
const newShep: ShepherdInfo = {
cts: CancellationToken.create(),
};
this.sheps.set(taskId, newShep);
try {
await this.internalShepherdTask(taskId, newShep);
} finally {
logger.trace(`Done shepherding ${taskId}`);
this.sheps.delete(taskId);
this.iterCond.trigger();
}
}
stopShepherdTask(taskId: TaskIdStr): void {
logger.trace(`Stopping shepherding of ${taskId}`);
const oldShep = this.sheps.get(taskId);
if (oldShep) {
logger.trace(`Cancelling old shepherd for ${taskId}`);
oldShep.cts.cancel();
this.sheps.delete(taskId);
this.iterCond.trigger();
}
}
restartShepherdTask(taskId: TaskIdStr): void {
this.stopShepherdTask(taskId);
this.startShepherdTask(taskId);
}
async resetTaskRetries(taskId: TaskIdStr): Promise {
const maybeNotification = await this.ws.db.runAllStoresReadWriteTx(
{},
async (tx) => {
await tx.operationRetries.delete(taskId);
return taskToRetryNotification(this.ws, tx, taskId, undefined);
},
);
this.stopShepherdTask(taskId);
if (maybeNotification) {
this.ws.notify(maybeNotification);
}
this.startShepherdTask(taskId);
}
private async wait(
taskId: TaskIdStr,
info: ShepherdInfo,
delay: Duration,
): Promise {
try {
await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay));
} catch (e) {
logger.info(`waiting for ${taskId} interrupted`);
}
}
private async internalShepherdTask(
taskId: TaskIdStr,
info: ShepherdInfo,
): Promise {
while (true) {
if (this.ws.stopped) {
logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`);
return;
}
if (info.cts.token.isCancelled) {
logger.trace(`Shepherd for ${taskId} got cancelled`);
return;
}
const isThrottled = this.throttler.applyThrottle(taskId);
if (isThrottled) {
logger.warn(
`task ${taskId} throttled, this is very likely a bug in wallet-core, please report`,
);
logger.warn("waiting for 60 seconds");
await this.ws.timerGroup.resolveAfter(
Duration.fromSpec({ seconds: 60 }),
);
}
const wex = getWalletExecutionContextForTask(
this.ws,
taskId,
info.cts.token,
);
const startTime = AbsoluteTime.now();
logger.trace(`Shepherd for ${taskId} will call handler`);
let res: TaskRunResult;
try {
res = await callOperationHandlerForTaskId(wex, taskId);
} catch (e) {
res = {
type: TaskRunResultType.Error,
errorDetail: getErrorDetailFromException(e),
};
}
if (info.cts.token.isCancelled) {
logger.info("task cancelled, not processing result");
return;
}
if (this.ws.stopped) {
logger.info("wallet stopped, not processing result");
return;
}
wex.oc.observe({
type: ObservabilityEventType.ShepherdTaskResult,
resultType: res.type,
});
switch (res.type) {
case TaskRunResultType.Error: {
logger.trace(`Shepherd for ${taskId} got error result.`);
const retryRecord = await storePendingTaskError(
this.ws,
taskId,
res.errorDetail,
);
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
const delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
await this.wait(taskId, info, delay);
break;
}
case TaskRunResultType.Backoff: {
logger.trace(`Shepherd for ${taskId} got backoff result.`);
const retryRecord = await storePendingTaskPending(this.ws, taskId);
const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
const delay = AbsoluteTime.remaining(t);
logger.trace(`Waiting for ${delay.d_ms} ms`);
await this.wait(taskId, info, delay);
break;
}
case TaskRunResultType.Progress: {
logger.trace(
`Shepherd for ${taskId} got progress result, re-running immediately.`,
);
await storeTaskProgress(this.ws, taskId);
break;
}
case TaskRunResultType.ScheduleLater: {
logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
await storeTaskProgress(this.ws, taskId);
const delay = AbsoluteTime.remaining(res.runAt);
logger.trace(`Waiting for ${delay.d_ms} ms`);
await this.wait(taskId, info, delay);
break;
}
case TaskRunResultType.Finished:
logger.trace(`Shepherd for ${taskId} got finished result.`);
await storePendingTaskFinished(this.ws, taskId);
return;
case TaskRunResultType.LongpollReturnedPending: {
await storeTaskProgress(this.ws, taskId);
// Make sure that we are waiting a bit if long-polling returned too early.
const endTime = AbsoluteTime.now();
const taskDuration = AbsoluteTime.difference(endTime, startTime);
if (
Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0
) {
logger.info(
`long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`,
);
await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 }));
} else {
logger.info(`task ${taskId} will long-poll again`);
}
break;
}
default:
assertUnreachable(res);
}
}
}
}
async function storePendingTaskError(
ws: InternalWalletState,
pendingTaskId: string,
e: TalerErrorDetail,
): Promise {
logger.info(`storing pending task error for ${pendingTaskId}`);
const res = await ws.db.runAllStoresReadWriteTx({}, async (tx) => {
let retryRecord = await tx.operationRetries.get(pendingTaskId);
if (!retryRecord) {
retryRecord = {
id: pendingTaskId,
lastError: e,
retryInfo: DbRetryInfo.reset(),
};
} else {
retryRecord.lastError = e;
retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
}
await tx.operationRetries.put(retryRecord);
return {
notification: await taskToRetryNotification(ws, tx, pendingTaskId, e),
retryRecord,
};
});
if (res?.notification) {
ws.notify(res.notification);
}
return res.retryRecord;
}
/**
* Task made progress, clear error.
*/
async function storeTaskProgress(
ws: InternalWalletState,
pendingTaskId: string,
): Promise {
await ws.db.runReadWriteTx(["operationRetries"], async (tx) => {
await tx.operationRetries.delete(pendingTaskId);
});
}
async function storePendingTaskPending(
ws: InternalWalletState,
pendingTaskId: string,
): Promise {
const res = await ws.db.runAllStoresReadWriteTx({}, async (tx) => {
let retryRecord = await tx.operationRetries.get(pendingTaskId);
let hadError = false;
if (!retryRecord) {
retryRecord = {
id: pendingTaskId,
retryInfo: DbRetryInfo.reset(),
};
} else {
if (retryRecord.lastError) {
hadError = true;
}
delete retryRecord.lastError;
retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
}
await tx.operationRetries.put(retryRecord);
let notification: WalletNotification | undefined = undefined;
if (hadError) {
notification = await taskToRetryNotification(
ws,
tx,
pendingTaskId,
undefined,
);
}
return {
notification,
retryRecord,
};
});
if (res.notification) {
ws.notify(res.notification);
}
return res.retryRecord;
}
async function storePendingTaskFinished(
ws: InternalWalletState,
pendingTaskId: string,
): Promise {
await ws.db.runReadWriteTx(["operationRetries"], async (tx) => {
await tx.operationRetries.delete(pendingTaskId);
});
}
function getWalletExecutionContextForTask(
ws: InternalWalletState,
taskId: TaskIdStr,
cancellationToken: CancellationToken,
): WalletExecutionContext {
let oc: ObservabilityContext;
let wex: WalletExecutionContext;
if (ws.config.testing.emitObservabilityEvents) {
oc = {
observe(evt) {
if (ws.config.testing.emitObservabilityEvents) {
ws.notify({
type: NotificationType.TaskObservabilityEvent,
taskId,
event: evt,
});
}
},
};
wex = getObservedWalletExecutionContext(ws, cancellationToken, oc);
} else {
oc = {
observe(evt) {},
};
wex = getNormalWalletExecutionContext(ws, cancellationToken, oc);
}
return wex;
}
async function callOperationHandlerForTaskId(
wex: WalletExecutionContext,
taskId: TaskIdStr,
): Promise {
const pending = parseTaskIdentifier(taskId);
switch (pending.tag) {
case PendingTaskType.ExchangeUpdate:
return await updateExchangeFromUrlHandler(wex, pending.exchangeBaseUrl);
case PendingTaskType.Refresh:
return await processRefreshGroup(wex, pending.refreshGroupId);
case PendingTaskType.Withdraw:
return await processWithdrawalGroup(wex, pending.withdrawalGroupId);
case PendingTaskType.Purchase:
return await processPurchase(wex, pending.proposalId);
case PendingTaskType.Recoup:
return await processRecoupGroup(wex, pending.recoupGroupId);
case PendingTaskType.Deposit:
return await processDepositGroup(wex, pending.depositGroupId);
case PendingTaskType.Backup:
return await processBackupForProvider(wex, pending.backupProviderBaseUrl);
case PendingTaskType.PeerPushDebit:
return await processPeerPushDebit(wex, pending.pursePub);
case PendingTaskType.PeerPullCredit:
return await processPeerPullCredit(wex, pending.pursePub);
case PendingTaskType.PeerPullDebit:
return await processPeerPullDebit(wex, pending.peerPullDebitId);
case PendingTaskType.PeerPushCredit:
return await processPeerPushCredit(wex, pending.peerPushCreditId);
case PendingTaskType.RewardPickup:
throw Error("not supported anymore");
default:
return assertUnreachable(pending);
}
throw Error(`not reached ${pending.tag}`);
}
/**
* Generate an appropriate error transition notification
* for applicable tasks.
*
* Namely, transition notifications are generated for:
* - exchange update errors
* - transactions
*/
async function taskToRetryNotification(
ws: InternalWalletState,
tx: WalletDbAllStoresReadOnlyTransaction,
pendingTaskId: string,
e: TalerErrorDetail | undefined,
): Promise {
const parsedTaskId = parseTaskIdentifier(pendingTaskId);
switch (parsedTaskId.tag) {
case PendingTaskType.ExchangeUpdate:
return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
case PendingTaskType.PeerPullCredit:
case PendingTaskType.PeerPullDebit:
case PendingTaskType.Withdraw:
case PendingTaskType.PeerPushCredit:
case PendingTaskType.Deposit:
case PendingTaskType.Refresh:
case PendingTaskType.RewardPickup:
case PendingTaskType.PeerPushDebit:
case PendingTaskType.Purchase:
return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
case PendingTaskType.Backup:
case PendingTaskType.Recoup:
return undefined;
}
}
async function getTransactionState(
ws: InternalWalletState,
tx: WalletDbReadOnlyTransaction<
[
"depositGroups",
"withdrawalGroups",
"purchases",
"refundGroups",
"peerPullCredit",
"peerPullDebit",
"peerPushDebit",
"peerPushCredit",
"rewards",
"refreshGroups",
"denomLossEvents",
]
>,
transactionId: string,
): Promise {
const parsedTxId = parseTransactionIdentifier(transactionId);
if (!parsedTxId) {
throw Error("invalid tx identifier");
}
switch (parsedTxId.tag) {
case TransactionType.Deposit: {
const rec = await tx.depositGroups.get(parsedTxId.depositGroupId);
if (!rec) {
return undefined;
}
return computeDepositTransactionStatus(rec);
}
case TransactionType.InternalWithdrawal:
case TransactionType.Withdrawal: {
const rec = await tx.withdrawalGroups.get(parsedTxId.withdrawalGroupId);
if (!rec) {
return undefined;
}
return computeWithdrawalTransactionStatus(rec);
}
case TransactionType.Payment: {
const rec = await tx.purchases.get(parsedTxId.proposalId);
if (!rec) {
return;
}
return computePayMerchantTransactionState(rec);
}
case TransactionType.Refund: {
const rec = await tx.refundGroups.get(parsedTxId.refundGroupId);
if (!rec) {
return undefined;
}
return computeRefundTransactionState(rec);
}
case TransactionType.PeerPullCredit: {
const rec = await tx.peerPullCredit.get(parsedTxId.pursePub);
if (!rec) {
return undefined;
}
return computePeerPullCreditTransactionState(rec);
}
case TransactionType.PeerPullDebit: {
const rec = await tx.peerPullDebit.get(parsedTxId.peerPullDebitId);
if (!rec) {
return undefined;
}
return computePeerPullDebitTransactionState(rec);
}
case TransactionType.PeerPushCredit: {
const rec = await tx.peerPushCredit.get(parsedTxId.peerPushCreditId);
if (!rec) {
return undefined;
}
return computePeerPushCreditTransactionState(rec);
}
case TransactionType.PeerPushDebit: {
const rec = await tx.peerPushDebit.get(parsedTxId.pursePub);
if (!rec) {
return undefined;
}
return computePeerPushDebitTransactionState(rec);
}
case TransactionType.Refresh: {
const rec = await tx.refreshGroups.get(parsedTxId.refreshGroupId);
if (!rec) {
return undefined;
}
return computeRefreshTransactionState(rec);
}
case TransactionType.Recoup:
throw Error("not yet supported");
case TransactionType.DenomLoss: {
const rec = await tx.denomLossEvents.get(parsedTxId.denomLossEventId);
if (!rec) {
return undefined;
}
return computeDenomLossTransactionStatus(rec);
}
default:
assertUnreachable(parsedTxId);
}
}
async function makeTransactionRetryNotification(
ws: InternalWalletState,
tx: WalletDbAllStoresReadOnlyTransaction,
pendingTaskId: string,
e: TalerErrorDetail | undefined,
): Promise {
const txId = convertTaskToTransactionId(pendingTaskId);
if (!txId) {
return undefined;
}
const txState = await getTransactionState(ws, tx, txId);
if (!txState) {
return undefined;
}
const notif: WalletNotification = {
type: NotificationType.TransactionStateTransition,
transactionId: txId,
oldTxState: txState,
newTxState: txState,
};
if (e) {
notif.errorInfo = {
code: e.code as number,
hint: e.hint,
};
}
return notif;
}
async function makeExchangeRetryNotification(
ws: InternalWalletState,
tx: WalletDbAllStoresReadOnlyTransaction,
pendingTaskId: string,
e: TalerErrorDetail | undefined,
): Promise {
logger.info("making exchange retry notification");
const parsedTaskId = parseTaskIdentifier(pendingTaskId);
if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) {
throw Error("invalid task identifier");
}
const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
if (!rec) {
logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
return undefined;
}
const notif: WalletNotification = {
type: NotificationType.ExchangeStateTransition,
exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
oldExchangeState: getExchangeState(rec),
newExchangeState: getExchangeState(rec),
};
if (e) {
notif.errorInfo = {
code: e.code as number,
hint: e.hint,
};
}
return notif;
}
export function listTaskForTransactionId(transactionId: string): TaskIdStr[] {
const tid = parseTransactionIdentifier(transactionId);
if (!tid) {
throw Error("invalid task ID");
}
switch (tid.tag) {
case TransactionType.Deposit:
return [
constructTaskIdentifier({
tag: PendingTaskType.Deposit,
depositGroupId: tid.depositGroupId,
}),
];
case TransactionType.InternalWithdrawal:
return [
constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId: tid.withdrawalGroupId,
}),
];
case TransactionType.Payment:
return [
constructTaskIdentifier({
tag: PendingTaskType.Purchase,
proposalId: tid.proposalId,
}),
];
case TransactionType.PeerPullCredit:
return [
constructTaskIdentifier({
tag: PendingTaskType.PeerPullCredit,
pursePub: tid.pursePub,
}),
];
case TransactionType.PeerPullDebit:
return [
constructTaskIdentifier({
tag: PendingTaskType.PeerPullDebit,
peerPullDebitId: tid.peerPullDebitId,
}),
];
case TransactionType.PeerPushCredit:
return [
constructTaskIdentifier({
tag: PendingTaskType.PeerPullCredit,
pursePub: tid.peerPushCreditId,
}),
];
case TransactionType.PeerPushDebit:
return [
constructTaskIdentifier({
tag: PendingTaskType.PeerPullCredit,
pursePub: tid.pursePub,
}),
];
case TransactionType.Recoup:
return [
constructTaskIdentifier({
tag: PendingTaskType.Recoup,
recoupGroupId: tid.recoupGroupId,
}),
];
case TransactionType.Refresh:
return [
constructTaskIdentifier({
tag: PendingTaskType.Refresh,
refreshGroupId: tid.refreshGroupId,
}),
];
case TransactionType.Refund:
return [];
case TransactionType.Withdrawal:
return [
constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId: tid.withdrawalGroupId,
}),
];
case TransactionType.DenomLoss:
return [];
default:
assertUnreachable(tid);
}
}
/**
* Convert the task ID for a task that processes a transaction int
* the ID for the transaction.
*/
export function convertTaskToTransactionId(
taskId: string,
): TransactionIdStr | undefined {
const parsedTaskId = parseTaskIdentifier(taskId);
switch (parsedTaskId.tag) {
case PendingTaskType.PeerPullCredit:
return constructTransactionIdentifier({
tag: TransactionType.PeerPullCredit,
pursePub: parsedTaskId.pursePub,
});
case PendingTaskType.PeerPullDebit:
return constructTransactionIdentifier({
tag: TransactionType.PeerPullDebit,
peerPullDebitId: parsedTaskId.peerPullDebitId,
});
// FIXME: This doesn't distinguish internal-withdrawal.
// Maybe we should have a different task type for that as well?
// Or maybe transaction IDs should be valid task identifiers?
case PendingTaskType.Withdraw:
return constructTransactionIdentifier({
tag: TransactionType.Withdrawal,
withdrawalGroupId: parsedTaskId.withdrawalGroupId,
});
case PendingTaskType.PeerPushCredit:
return constructTransactionIdentifier({
tag: TransactionType.PeerPushCredit,
peerPushCreditId: parsedTaskId.peerPushCreditId,
});
case PendingTaskType.Deposit:
return constructTransactionIdentifier({
tag: TransactionType.Deposit,
depositGroupId: parsedTaskId.depositGroupId,
});
case PendingTaskType.Refresh:
return constructTransactionIdentifier({
tag: TransactionType.Refresh,
refreshGroupId: parsedTaskId.refreshGroupId,
});
case PendingTaskType.PeerPushDebit:
return constructTransactionIdentifier({
tag: TransactionType.PeerPushDebit,
pursePub: parsedTaskId.pursePub,
});
case PendingTaskType.Purchase:
return constructTransactionIdentifier({
tag: TransactionType.Payment,
proposalId: parsedTaskId.proposalId,
});
default:
return undefined;
}
}
export interface ActiveTaskIdsResult {
taskIds: TaskIdStr[];
}
export async function getActiveTaskIds(
ws: InternalWalletState,
): Promise {
const res: ActiveTaskIdsResult = {
taskIds: [],
};
await ws.db.runReadWriteTx(
[
"exchanges",
"refreshGroups",
"withdrawalGroups",
"purchases",
"depositGroups",
"recoupGroups",
"peerPullCredit",
"peerPushDebit",
"peerPullDebit",
"peerPushCredit",
],
async (tx) => {
const active = GlobalIDB.KeyRange.bound(
OPERATION_STATUS_ACTIVE_FIRST,
OPERATION_STATUS_ACTIVE_LAST,
);
// Withdrawals
{
const activeRecs =
await tx.withdrawalGroups.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId: rec.withdrawalGroupId,
});
res.taskIds.push(taskId);
}
}
// Deposits
{
const activeRecs =
await tx.depositGroups.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Deposit,
depositGroupId: rec.depositGroupId,
});
res.taskIds.push(taskId);
}
}
// Refreshes
{
const activeRecs =
await tx.refreshGroups.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Refresh,
refreshGroupId: rec.refreshGroupId,
});
res.taskIds.push(taskId);
}
}
// Purchases
{
const activeRecs = await tx.purchases.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Purchase,
proposalId: rec.proposalId,
});
res.taskIds.push(taskId);
}
}
// peer-push-debit
{
const activeRecs =
await tx.peerPushDebit.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPushDebit,
pursePub: rec.pursePub,
});
res.taskIds.push(taskId);
}
}
// peer-push-credit
{
const activeRecs =
await tx.peerPushCredit.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPushCredit,
peerPushCreditId: rec.peerPushCreditId,
});
res.taskIds.push(taskId);
}
}
// peer-pull-debit
{
const activeRecs =
await tx.peerPullDebit.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPullDebit,
peerPullDebitId: rec.peerPullDebitId,
});
res.taskIds.push(taskId);
}
}
// peer-pull-credit
{
const activeRecs =
await tx.peerPullCredit.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPullCredit,
pursePub: rec.pursePub,
});
res.taskIds.push(taskId);
}
}
// recoup
{
const activeRecs =
await tx.recoupGroups.indexes.byStatus.getAll(active);
for (const rec of activeRecs) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Recoup,
recoupGroupId: rec.recoupGroupId,
});
res.taskIds.push(taskId);
}
}
// exchange update
{
const exchanges = await tx.exchanges.getAll();
for (const rec of exchanges) {
const taskIdUpdate = constructTaskIdentifier({
tag: PendingTaskType.ExchangeUpdate,
exchangeBaseUrl: rec.baseUrl,
});
res.taskIds.push(taskIdUpdate);
}
}
// FIXME: Recoup!
},
);
return res;
}