/*
This file is part of GNU Taler
(C) 2024 Taler Systems SA
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
/**
* Imports.
*/
import { GlobalIDB } from "@gnu-taler/idb-bridge";
import {
AbsoluteTime,
AsyncCondition,
CancellationToken,
Duration,
Logger,
NotificationType,
RetryLoopOpts,
TalerError,
TalerErrorCode,
TalerErrorDetail,
TaskThrottler,
TransactionIdStr,
TransactionType,
WalletNotification,
assertUnreachable,
j2s,
makeErrorDetail,
} from "@gnu-taler/taler-util";
import { processBackupForProvider } from "./backup/index.js";
import {
DbRetryInfo,
PendingTaskType,
TaskIdStr,
TaskRunResult,
TaskRunResultType,
constructTaskIdentifier,
getExchangeState,
parseTaskIdentifier,
} from "./common.js";
import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
import {
OPERATION_STATUS_ACTIVE_FIRST,
OPERATION_STATUS_ACTIVE_LAST,
WalletDbAllStoresReadOnlyTransaction,
timestampAbsoluteFromDb,
} from "./db.js";
import { processDepositGroup } from "./deposits.js";
import { updateExchangeFromUrlHandler } from "./exchanges.js";
import { processPurchase } from "./pay-merchant.js";
import { processPeerPullCredit } from "./pay-peer-pull-credit.js";
import { processPeerPullDebit } from "./pay-peer-pull-debit.js";
import { processPeerPushCredit } from "./pay-peer-push-credit.js";
import { processPeerPushDebit } from "./pay-peer-push-debit.js";
import { processRecoupGroup } from "./recoup.js";
import { processRefreshGroup } from "./refresh.js";
import { constructTransactionIdentifier } from "./transactions.js";
import { InternalWalletState } from "./wallet.js";
import { processWithdrawalGroup } from "./withdraw.js";
// Logger instance shared by everything in this module, created with the tag "shepherd.ts".
const logger = new Logger("shepherd.ts");
/**
* Info about one task being shepherded.
*/
interface ShepherdInfo {
// Cancellation source owned by the shepherd. The scheduler cancels it to
// stop the shepherd; its token is handed to the task handler and is used
// to interrupt the shepherd's waits.
cts: CancellationToken.Source;
}
/**
 * Decide whether a task counts as "alive", i.e. whether the main task
 * loop has to keep running for as long as the task is being shepherded.
 *
 * Backup and exchange-update tasks never keep the loop alive; all other
 * task types do.
 */
function taskGivesLiveness(taskId: string): boolean {
  const task = parseTaskIdentifier(taskId);
  switch (task.tag) {
    // Tasks that keep the task loop running.
    case PendingTaskType.Deposit:
    case PendingTaskType.Withdraw:
    case PendingTaskType.Purchase:
    case PendingTaskType.Refresh:
    case PendingTaskType.Recoup:
    case PendingTaskType.RewardPickup:
    case PendingTaskType.PeerPullCredit:
    case PendingTaskType.PeerPullDebit:
    case PendingTaskType.PeerPushCredit:
    case PendingTaskType.PeerPushDebit:
      return true;
    // Background maintenance tasks do not keep the loop running.
    case PendingTaskType.Backup:
    case PendingTaskType.ExchangeUpdate:
      return false;
    default:
      return assertUnreachable(task);
  }
}
/**
 * Scheduler that runs ("shepherds") pending tasks in the background.
 *
 * Every task ID has at most one tracked shepherd.  A shepherd repeatedly
 * calls the handler for its task and, depending on the result, waits,
 * re-runs immediately or terminates.
 */
export class TaskScheduler {
  /** Shepherds that are currently tracked, keyed by task ID. */
  private sheps: Map<TaskIdStr, ShepherdInfo> = new Map();

  /** Triggered whenever a shepherd is removed, to wake up {@link run}. */
  private iterCond = new AsyncCondition();

  private throttler = new TaskThrottler();

  constructor(private ws: InternalWalletState) {}

  /**
   * Start a shepherd for every task that is active according to the DB.
   */
  async loadTasksFromDb(): Promise<void> {
    const activeTasks = await getActiveTaskIds(this.ws);
    logger.info(`active tasks from DB: ${j2s(activeTasks)}`);
    for (const tid of activeTasks.taskIds) {
      this.startShepherdTask(tid);
    }
  }

  /**
   * Run the main task loop.
   *
   * The loop ends when the wallet is stopped or, if `opts.stopWhenDone`
   * is set, when no shepherded task gives liveness anymore.
   */
  async run(opts: RetryLoopOpts = {}): Promise<void> {
    logger.info("Running task loop.");
    this.ws.isTaskLoopRunning = true;
    try {
      await this.loadTasksFromDb();
      logger.info("loaded!");
      logger.info(`sheps: ${this.sheps.size}`);
      while (true) {
        if (opts.stopWhenDone) {
          const taskIds = [...this.sheps.keys()];
          logger.info(`current task IDs: ${j2s(taskIds)}`);
          logger.info(`sheps: ${this.sheps.size}`);
          const alive = taskIds.some((taskId) => taskGivesLiveness(taskId));
          if (!alive) {
            logger.info("Breaking out of task loop (no more work).");
            break;
          }
        }
        if (this.ws.stopped) {
          logger.info("Breaking out of task loop (wallet stopped).");
          break;
        }
        await this.iterCond.wait();
      }
    } finally {
      // Reset the flag even when loading tasks or the loop itself throws,
      // so that the wallet state does not claim a loop that is gone.
      this.ws.isTaskLoopRunning = false;
    }
    logger.info("Done with task loop.");
  }

  /**
   * Start shepherding a task in the background.
   *
   * Does nothing if the task already has a shepherd.
   */
  startShepherdTask(taskId: TaskIdStr): void {
    // Run in the background, no await!  Failures are logged here since
    // nobody else observes the promise.
    this.internalStartShepherdTask(taskId).catch((e) => {
      const msg = e instanceof Error ? e.message : String(e);
      logger.error(`Shepherd for ${taskId} failed unexpectedly: ${msg}`);
    });
  }

  /**
   * Stop and re-load all existing tasks.
   *
   * Mostly useful to interrupt all waits when time-travelling.
   */
  reload(): void {
    const tasksIds = [...this.sheps.keys()];
    logger.info(`reloading sheperd with ${tasksIds.length} tasks`);
    for (const taskId of tasksIds) {
      this.stopShepherdTask(taskId);
    }
    for (const taskId of tasksIds) {
      this.startShepherdTask(taskId);
    }
  }

  /**
   * Register a new shepherd for the task (unless one is already tracked)
   * and run it until it terminates.
   */
  private async internalStartShepherdTask(taskId: TaskIdStr): Promise<void> {
    logger.trace(`Starting to shepherd task ${taskId}`);
    if (this.sheps.has(taskId)) {
      logger.trace(`Already have a shepherd for ${taskId}`);
      return;
    }
    logger.trace(`Creating new shepherd for ${taskId}`);
    const newShep: ShepherdInfo = {
      cts: CancellationToken.create(),
    };
    this.sheps.set(taskId, newShep);
    try {
      await this.internalShepherdTask(taskId, newShep);
    } finally {
      logger.trace(`Done shepherding ${taskId}`);
      // Only remove our own entry.  If this shepherd was stopped and a
      // replacement was started in the meantime, the map now holds the
      // replacement, which must stay tracked.
      if (this.sheps.get(taskId) === newShep) {
        this.sheps.delete(taskId);
      }
      this.iterCond.trigger();
    }
  }

  /**
   * Cancel the shepherd of a task (if any) and stop tracking it.
   */
  stopShepherdTask(taskId: TaskIdStr): void {
    logger.trace(`Stopping shepherding of ${taskId}`);
    const oldShep = this.sheps.get(taskId);
    if (oldShep) {
      logger.trace(`Cancelling old shepherd for ${taskId}`);
      oldShep.cts.cancel();
      this.sheps.delete(taskId);
      this.iterCond.trigger();
    }
  }

  /**
   * Stop the current shepherd of a task and start a fresh one.
   */
  restartShepherdTask(taskId: TaskIdStr): void {
    this.stopShepherdTask(taskId);
    this.startShepherdTask(taskId);
  }

  /**
   * Delete the retry record of a task, emit the resulting notification
   * (if applicable) and restart the task's shepherd.
   */
  async resetTaskRetries(taskId: TaskIdStr): Promise<void> {
    const maybeNotification = await this.ws.db.runAllStoresReadWriteTx(
      async (tx) => {
        await tx.operationRetries.delete(taskId);
        return taskToRetryNotification(this.ws, tx, taskId, undefined);
      },
    );
    this.stopShepherdTask(taskId);
    if (maybeNotification) {
      this.ws.notify(maybeNotification);
    }
    this.startShepherdTask(taskId);
  }

  /**
   * Wait for the given delay, returning early (without throwing) when
   * the shepherd gets cancelled.
   */
  private async wait(
    taskId: TaskIdStr,
    info: ShepherdInfo,
    delay: Duration,
  ): Promise<void> {
    try {
      await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay));
    } catch (e) {
      logger.info(`waiting for ${taskId} interrupted`);
    }
  }

  /**
   * Check whether a shepherd must terminate because the wallet was
   * stopped or the shepherd was cancelled.
   */
  private shouldStopShepherding(taskId: TaskIdStr, info: ShepherdInfo): boolean {
    if (this.ws.stopped) {
      logger.trace(`Shepherd for ${taskId} stopping as wallet is stopped`);
      return true;
    }
    if (info.cts.token.isCancelled) {
      logger.trace(`Shepherd for ${taskId} got cancelled`);
      return true;
    }
    return false;
  }

  /**
   * Main loop of one shepherd: run the task handler and react to its
   * result until the task finishes, the shepherd is cancelled or the
   * wallet is stopped.
   */
  private async internalShepherdTask(
    taskId: TaskIdStr,
    info: ShepherdInfo,
  ): Promise<void> {
    while (true) {
      if (this.shouldStopShepherding(taskId, info)) {
        return;
      }
      const isThrottled = this.throttler.applyThrottle(taskId);
      if (isThrottled) {
        logger.warn(
          `task ${taskId} throttled, this is very likely a bug in wallet-core, please report`,
        );
        logger.warn("waiting for 60 seconds");
        // Cancellable wait, so that stopping the shepherd does not leave
        // it sleeping and then running the handler a full minute later.
        await this.wait(taskId, info, Duration.fromSpec({ seconds: 60 }));
        if (this.shouldStopShepherding(taskId, info)) {
          return;
        }
      }
      const startTime = AbsoluteTime.now();
      logger.trace(`Shepherd for ${taskId} will call handler`);
      // FIXME: This should already return the retry record.
      const res = await runTaskWithErrorReporting(this.ws, taskId, async () => {
        return await callOperationHandlerForTaskId(
          this.ws,
          taskId,
          info.cts.token,
        );
      });
      const retryRecord = await this.ws.db.runReadOnlyTx(
        ["operationRetries"],
        async (tx) => {
          return tx.operationRetries.get(taskId);
        },
      );
      switch (res.type) {
        case TaskRunResultType.Error:
        case TaskRunResultType.Backoff: {
          // Both results wait until the next retry time from the retry
          // record; they only differ in the trace message.
          if (res.type === TaskRunResultType.Error) {
            logger.trace(`Shepherd for ${taskId} got error result.`);
          } else {
            logger.trace(`Shepherd for ${taskId} got backoff result.`);
          }
          if (retryRecord) {
            const retryAt = timestampAbsoluteFromDb(
              retryRecord.retryInfo.nextRetry,
            );
            const delay = AbsoluteTime.remaining(retryAt);
            logger.trace(`Waiting for ${delay.d_ms} ms`);
            await this.wait(taskId, info, delay);
          } else {
            logger.trace("Retrying immediately.");
          }
          break;
        }
        case TaskRunResultType.Progress: {
          logger.trace(
            `Shepherd for ${taskId} got progress result, re-running immediately.`,
          );
          break;
        }
        case TaskRunResultType.ScheduleLater: {
          logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
          const delay = AbsoluteTime.remaining(res.runAt);
          logger.trace(`Waiting for ${delay.d_ms} ms`);
          await this.wait(taskId, info, delay);
          break;
        }
        case TaskRunResultType.Finished:
          logger.trace(`Shepherd for ${taskId} got finished result.`);
          return;
        case TaskRunResultType.LongpollReturnedPending: {
          // Make sure that we are waiting a bit if long-polling returned too early.
          const endTime = AbsoluteTime.now();
          const taskDuration = AbsoluteTime.difference(endTime, startTime);
          if (
            Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0
          ) {
            logger.info(
              `long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`,
            );
            await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 }));
          } else {
            logger.info(`task ${taskId} will long-poll again`);
          }
          break;
        }
        default:
          assertUnreachable(res);
      }
    }
  }
}
/**
 * Record a failed run of a pending task.
 *
 * Creates the task's retry record with fresh retry info if none exists,
 * otherwise increments the existing retry info.  In both cases the given
 * error is stored as the last error.  A retry notification is emitted
 * for task types that have one.
 */
async function storePendingTaskError(
  ws: InternalWalletState,
  pendingTaskId: string,
  e: TalerErrorDetail,
): Promise<void> {
  logger.info(`storing pending task error for ${pendingTaskId}`);
  const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
    let retryRecord = await tx.operationRetries.get(pendingTaskId);
    if (retryRecord) {
      retryRecord.lastError = e;
      retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
    } else {
      // First failure of this task: start with fresh retry info.
      retryRecord = {
        id: pendingTaskId,
        lastError: e,
        retryInfo: DbRetryInfo.reset(),
      };
    }
    await tx.operationRetries.put(retryRecord);
    return taskToRetryNotification(ws, tx, pendingTaskId, e);
  });
  if (maybeNotification) {
    ws.notify(maybeNotification);
  }
}
/**
 * Task made progress, clear error.
 *
 * Deletes the retry record of the task, which removes both the stored
 * error and the accumulated retry info.
 */
async function storeTaskProgress(
  ws: InternalWalletState,
  pendingTaskId: string,
): Promise<void> {
  await ws.db.runReadWriteTx(["operationRetries"], async (tx) => {
    await tx.operationRetries.delete(pendingTaskId);
  });
}
/**
 * Record that a pending task asked for a back-off without reporting
 * an error.
 *
 * Creates the retry record with fresh retry info if none exists,
 * otherwise increments the retry info and clears a previously stored
 * error.  A retry notification is only emitted when an error was
 * actually cleared.
 */
async function storePendingTaskPending(
  ws: InternalWalletState,
  pendingTaskId: string,
): Promise<void> {
  const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => {
    let retryRecord = await tx.operationRetries.get(pendingTaskId);
    let hadError = false;
    if (retryRecord) {
      hadError = !!retryRecord.lastError;
      delete retryRecord.lastError;
      retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo);
    } else {
      retryRecord = {
        id: pendingTaskId,
        retryInfo: DbRetryInfo.reset(),
      };
    }
    await tx.operationRetries.put(retryRecord);
    if (!hadError) {
      // Nothing visible changed, so there is nothing to notify about.
      return undefined;
    }
    return taskToRetryNotification(ws, tx, pendingTaskId, undefined);
  });
  if (maybeNotification) {
    ws.notify(maybeNotification);
  }
}
/**
 * Task finished, its retry record is not needed anymore and is deleted.
 */
async function storePendingTaskFinished(
  ws: InternalWalletState,
  pendingTaskId: string,
): Promise<void> {
  await ws.db.runReadWriteTx(["operationRetries"], async (tx) => {
    await tx.operationRetries.delete(pendingTaskId);
  });
}
/**
 * Run a task handler and persist the outcome in the retry table.
 *
 * The result returned by `f` is stored according to its type and then
 * returned unchanged.  Exceptions thrown by `f` are converted into an
 * error result; except for a crypto API stop during wallet shutdown,
 * the error is also stored for the task.
 *
 * @param ws wallet state
 * @param opId identifier of the task that is being run
 * @param f the task handler to run
 * @returns the result of `f`, or an error result if `f` threw
 */
async function runTaskWithErrorReporting(
  ws: InternalWalletState,
  opId: TaskIdStr,
  f: () => Promise<TaskRunResult>,
): Promise<TaskRunResult> {
  try {
    const resp = await f();
    switch (resp.type) {
      case TaskRunResultType.Error:
        await storePendingTaskError(ws, opId, resp.errorDetail);
        return resp;
      case TaskRunResultType.Finished:
        await storePendingTaskFinished(ws, opId);
        return resp;
      case TaskRunResultType.Backoff:
        await storePendingTaskPending(ws, opId);
        return resp;
      // ScheduleLater: task succeeded but wants to be run again.
      case TaskRunResultType.ScheduleLater:
      case TaskRunResultType.Progress:
      // LongpollReturnedPending: longpoll should be run again immediately.
      case TaskRunResultType.LongpollReturnedPending:
        await storeTaskProgress(ws, opId);
        return resp;
      default:
        // Never fall off the end of the switch and return undefined.
        return assertUnreachable(resp);
    }
  } catch (e) {
    if (e instanceof CryptoApiStoppedError && ws.stopped) {
      // Expected during shutdown, deliberately not stored in the DB.
      logger.warn("crypto API stopped during shutdown, ignoring error");
      return {
        type: TaskRunResultType.Error,
        errorDetail: makeErrorDetail(
          TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
          {},
          "Crypto API stopped during shutdown",
        ),
      };
    }
    let errorDetail: TalerErrorDetail;
    if (e instanceof TalerError) {
      logger.warn("operation processed resulted in error");
      logger.warn(`error was: ${j2s(e.errorDetail)}`);
      errorDetail = e.errorDetail;
    } else if (e instanceof Error) {
      // This is a bug, as we expect pending operations to always
      // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
      // or return something.
      logger.error(`Uncaught exception: ${e.message}`);
      logger.error(`Stack: ${e.stack}`);
      errorDetail = makeErrorDetail(
        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
        {
          stack: e.stack,
        },
        `unexpected exception (message: ${e.message})`,
      );
    } else {
      logger.error("Uncaught exception, value is not even an error.");
      errorDetail = makeErrorDetail(
        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
        {},
        `unexpected exception (not even an error)`,
      );
    }
    await storePendingTaskError(ws, opId, errorDetail);
    return {
      type: TaskRunResultType.Error,
      errorDetail,
    };
  }
}
/**
 * Dispatch a task to the handler that is responsible for its task type.
 *
 * @param ws wallet state
 * @param taskId identifier of the task to run
 * @param cancellationToken token forwarded to the handler (the backup
 *   handler is called without it)
 * @returns the result reported by the handler
 * @throws Error for reward pickup tasks, which are not supported anymore
 */
async function callOperationHandlerForTaskId(
  ws: InternalWalletState,
  taskId: TaskIdStr,
  cancellationToken: CancellationToken,
): Promise<TaskRunResult> {
  const pending = parseTaskIdentifier(taskId);
  switch (pending.tag) {
    case PendingTaskType.ExchangeUpdate:
      return await updateExchangeFromUrlHandler(
        ws,
        pending.exchangeBaseUrl,
        cancellationToken,
      );
    case PendingTaskType.Refresh:
      return await processRefreshGroup(
        ws,
        pending.refreshGroupId,
        cancellationToken,
      );
    case PendingTaskType.Withdraw:
      return await processWithdrawalGroup(
        ws,
        pending.withdrawalGroupId,
        cancellationToken,
      );
    case PendingTaskType.Purchase:
      return await processPurchase(ws, pending.proposalId, cancellationToken);
    case PendingTaskType.Recoup:
      return await processRecoupGroup(
        ws,
        pending.recoupGroupId,
        cancellationToken,
      );
    case PendingTaskType.Deposit:
      return await processDepositGroup(
        ws,
        pending.depositGroupId,
        cancellationToken,
      );
    case PendingTaskType.Backup:
      return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
    case PendingTaskType.PeerPushDebit:
      return await processPeerPushDebit(
        ws,
        pending.pursePub,
        cancellationToken,
      );
    case PendingTaskType.PeerPullCredit:
      return await processPeerPullCredit(
        ws,
        pending.pursePub,
        cancellationToken,
      );
    case PendingTaskType.PeerPullDebit:
      return await processPeerPullDebit(
        ws,
        pending.peerPullDebitId,
        cancellationToken,
      );
    case PendingTaskType.PeerPushCredit:
      return await processPeerPushCredit(
        ws,
        pending.peerPushCreditId,
        cancellationToken,
      );
    case PendingTaskType.RewardPickup:
      throw Error("not supported anymore");
    default:
      // Every branch above returns or throws, so nothing follows the switch.
      return assertUnreachable(pending);
  }
}
/**
 * Generate an appropriate error transition notification
 * for applicable tasks.
 *
 * Namely, transition notifications are generated for:
 * - exchange update errors
 * - transactions
 *
 * Backup and recoup tasks never produce a notification.
 *
 * @param e error to attach to the notification, if any
 * @returns the notification to emit, or undefined if there is none
 */
async function taskToRetryNotification(
  ws: InternalWalletState,
  tx: WalletDbAllStoresReadOnlyTransaction,
  pendingTaskId: string,
  e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
  const parsedTaskId = parseTaskIdentifier(pendingTaskId);
  switch (parsedTaskId.tag) {
    case PendingTaskType.ExchangeUpdate:
      return makeExchangeRetryNotification(ws, tx, pendingTaskId, e);
    case PendingTaskType.PeerPullCredit:
    case PendingTaskType.PeerPullDebit:
    case PendingTaskType.Withdraw:
    case PendingTaskType.PeerPushCredit:
    case PendingTaskType.Deposit:
    case PendingTaskType.Refresh:
    case PendingTaskType.RewardPickup:
    case PendingTaskType.PeerPushDebit:
    case PendingTaskType.Purchase:
      return makeTransactionRetryNotification(ws, tx, pendingTaskId, e);
    case PendingTaskType.Backup:
    case PendingTaskType.Recoup:
      return undefined;
  }
}
/**
 * Build a transaction state transition notification for the transaction
 * that belongs to the given task.
 *
 * The notification reports the current transaction state as both old
 * and new state; the error (if given) is attached as error info.
 *
 * @returns undefined if the task does not map to a transaction or the
 *   transaction state cannot be obtained
 */
async function makeTransactionRetryNotification(
  ws: InternalWalletState,
  tx: WalletDbAllStoresReadOnlyTransaction,
  pendingTaskId: string,
  e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
  const txId = convertTaskToTransactionId(pendingTaskId);
  if (!txId) {
    return undefined;
  }
  const txState = await ws.getTransactionState(ws, tx, txId);
  if (!txState) {
    return undefined;
  }
  const notif: WalletNotification = {
    type: NotificationType.TransactionStateTransition,
    transactionId: txId,
    oldTxState: txState,
    newTxState: txState,
  };
  if (e) {
    notif.errorInfo = {
      code: e.code as number,
      hint: e.hint,
    };
  }
  return notif;
}
/**
 * Build an exchange state transition notification for an exchange
 * update task.
 *
 * The notification reports the current exchange state as both old and
 * new state; the error (if given) is attached as error info.
 *
 * @returns undefined if the exchange is not in the database
 * @throws Error if the task is not an exchange update task
 */
async function makeExchangeRetryNotification(
  ws: InternalWalletState,
  tx: WalletDbAllStoresReadOnlyTransaction,
  pendingTaskId: string,
  e: TalerErrorDetail | undefined,
): Promise<WalletNotification | undefined> {
  logger.info("making exchange retry notification");
  const parsedTaskId = parseTaskIdentifier(pendingTaskId);
  if (parsedTaskId.tag !== PendingTaskType.ExchangeUpdate) {
    throw Error("invalid task identifier");
  }
  const rec = await tx.exchanges.get(parsedTaskId.exchangeBaseUrl);
  if (!rec) {
    logger.info(`exchange ${parsedTaskId.exchangeBaseUrl} not found`);
    return undefined;
  }
  // The state itself did not change, so compute it once and report it
  // as both the old and the new state.
  const exchangeState = getExchangeState(rec);
  const notif: WalletNotification = {
    type: NotificationType.ExchangeStateTransition,
    exchangeBaseUrl: parsedTaskId.exchangeBaseUrl,
    oldExchangeState: exchangeState,
    newExchangeState: exchangeState,
  };
  if (e) {
    notif.errorInfo = {
      code: e.code as number,
      hint: e.hint,
    };
  }
  return notif;
}
/**
 * Map the ID of a task that processes a transaction to the ID of
 * that transaction.
 *
 * Returns undefined for task types that are not handled below.
 */
function convertTaskToTransactionId(
  taskId: string,
): TransactionIdStr | undefined {
  const task = parseTaskIdentifier(taskId);
  switch (task.tag) {
    // FIXME: This doesn't distinguish internal-withdrawal.
    // Maybe we should have a different task type for that as well?
    // Or maybe transaction IDs should be valid task identifiers?
    case PendingTaskType.Withdraw: {
      const { withdrawalGroupId } = task;
      return constructTransactionIdentifier({
        tag: TransactionType.Withdrawal,
        withdrawalGroupId,
      });
    }
    case PendingTaskType.Deposit: {
      const { depositGroupId } = task;
      return constructTransactionIdentifier({
        tag: TransactionType.Deposit,
        depositGroupId,
      });
    }
    case PendingTaskType.Refresh: {
      const { refreshGroupId } = task;
      return constructTransactionIdentifier({
        tag: TransactionType.Refresh,
        refreshGroupId,
      });
    }
    case PendingTaskType.Purchase: {
      const { proposalId } = task;
      return constructTransactionIdentifier({
        tag: TransactionType.Payment,
        proposalId,
      });
    }
    case PendingTaskType.RewardPickup: {
      const { walletRewardId } = task;
      return constructTransactionIdentifier({
        tag: TransactionType.Reward,
        walletRewardId,
      });
    }
    case PendingTaskType.PeerPullCredit: {
      const { pursePub } = task;
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPullCredit,
        pursePub,
      });
    }
    case PendingTaskType.PeerPullDebit: {
      const { peerPullDebitId } = task;
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPullDebit,
        peerPullDebitId,
      });
    }
    case PendingTaskType.PeerPushCredit: {
      const { peerPushCreditId } = task;
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPushCredit,
        peerPushCreditId,
      });
    }
    case PendingTaskType.PeerPushDebit: {
      const { pursePub } = task;
      return constructTransactionIdentifier({
        tag: TransactionType.PeerPushDebit,
        pursePub,
      });
    }
    default:
      return undefined;
  }
}
/**
* Result of querying the database for active tasks.
*/
export interface ActiveTaskIdsResult {
// Identifiers of the tasks that were found.
taskIds: TaskIdStr[];
}
/**
 * Collect the task identifiers of everything that should currently be
 * shepherded.
 *
 * This covers all withdrawal, deposit, refresh, purchase, peer-to-peer
 * and recoup records whose status lies in the active range, plus one
 * exchange update task for every known exchange.
 *
 * The database is only read, so a read-only transaction is used.
 */
export async function getActiveTaskIds(
  ws: InternalWalletState,
): Promise<ActiveTaskIdsResult> {
  const res: ActiveTaskIdsResult = {
    taskIds: [],
  };
  await ws.db.runReadOnlyTx(
    [
      "exchanges",
      "refreshGroups",
      "withdrawalGroups",
      "purchases",
      "depositGroups",
      "recoupGroups",
      "peerPullCredit",
      "peerPushDebit",
      "peerPullDebit",
      "peerPushCredit",
    ],
    async (tx) => {
      const active = GlobalIDB.KeyRange.bound(
        OPERATION_STATUS_ACTIVE_FIRST,
        OPERATION_STATUS_ACTIVE_LAST,
      );

      // Withdrawals
      for (const rec of await tx.withdrawalGroups.indexes.byStatus.getAll(
        active,
      )) {
        res.taskIds.push(
          constructTaskIdentifier({
            tag: PendingTaskType.Withdraw,
            withdrawalGroupId: rec.withdrawalGroupId,
          }),
        );
      }

      // Deposits
      for (const rec of await tx.depositGroups.indexes.byStatus.getAll(
        active,
      )) {
        res.taskIds.push(
          constructTaskIdentifier({
            tag: PendingTaskType.Deposit,
            depositGroupId: rec.depositGroupId,
          }),
        );
      }

      // Refreshes
      for (const rec of await tx.refreshGroups.indexes.byStatus.getAll(
        active,
      )) {
        res.taskIds.push(
          constructTaskIdentifier({
            tag: PendingTaskType.Refresh,
            refreshGroupId: rec.refreshGroupId,
          }),
        );
      }

      // Purchases
      for (const rec of await tx.purchases.indexes.byStatus.getAll(active)) {
        res.taskIds.push(
          constructTaskIdentifier({
            tag: PendingTaskType.Purchase,
            proposalId: rec.proposalId,
          }),
        );
      }

      // peer-push-debit
      for (const rec of await tx.peerPushDebit.indexes.byStatus.getAll(
        active,
      )) {
        res.taskIds.push(
          constructTaskIdentifier({
            tag: PendingTaskType.PeerPushDebit,
            pursePub: rec.pursePub,
          }),
        );
      }

      // peer-push-credit
      for (const rec of await tx.peerPushCredit.indexes.byStatus.getAll(
        active,
      )) {
        res.taskIds.push(
          constructTaskIdentifier({
            tag: PendingTaskType.PeerPushCredit,
            peerPushCreditId: rec.peerPushCreditId,
          }),
        );
      }

      // peer-pull-debit
      for (const rec of await tx.peerPullDebit.indexes.byStatus.getAll(
        active,
      )) {
        res.taskIds.push(
          constructTaskIdentifier({
            tag: PendingTaskType.PeerPullDebit,
            peerPullDebitId: rec.peerPullDebitId,
          }),
        );
      }

      // peer-pull-credit
      for (const rec of await tx.peerPullCredit.indexes.byStatus.getAll(
        active,
      )) {
        res.taskIds.push(
          constructTaskIdentifier({
            tag: PendingTaskType.PeerPullCredit,
            pursePub: rec.pursePub,
          }),
        );
      }

      // recoup
      for (const rec of await tx.recoupGroups.indexes.byStatus.getAll(
        active,
      )) {
        res.taskIds.push(
          constructTaskIdentifier({
            tag: PendingTaskType.Recoup,
            recoupGroupId: rec.recoupGroupId,
          }),
        );
      }

      // exchange update: every known exchange gets a task, regardless
      // of its status.
      for (const rec of await tx.exchanges.getAll()) {
        res.taskIds.push(
          constructTaskIdentifier({
            tag: PendingTaskType.ExchangeUpdate,
            exchangeBaseUrl: rec.baseUrl,
          }),
        );
      }
    },
  );
  return res;
}