aboutsummaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/pending.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/operations/pending.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/pending.ts350
1 files changed, 264 insertions, 86 deletions
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts
index cc9217d67..6c6546f83 100644
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ b/packages/taler-wallet-core/src/operations/pending.ts
@@ -34,13 +34,24 @@ import {
WithdrawalGroupStatus,
RewardRecordStatus,
DepositOperationStatus,
+ RefreshGroupRecord,
+ WithdrawalGroupRecord,
+ DepositGroupRecord,
+ RewardRecord,
+ PurchaseRecord,
+ PeerPullPaymentInitiationRecord,
+ PeerPullPaymentIncomingRecord,
+ PeerPushPaymentInitiationRecord,
+ PeerPushPaymentIncomingRecord,
+ RefundGroupRecord,
+ RefundGroupStatus,
} from "../db.js";
import {
PendingOperationsResponse,
PendingTaskType,
TaskId,
} from "../pending-types.js";
-import { AbsoluteTime } from "@gnu-taler/taler-util";
+import { AbsoluteTime, TransactionRecordFilter } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import { GetReadOnlyAccess } from "../util/query.js";
import { GlobalIDB } from "@gnu-taler/idb-bridge";
@@ -105,6 +116,32 @@ async function gatherExchangePending(
});
}
+/**
+ * Iterate refresh records based on a filter.
+ */
+export async function iterRecordsForRefresh(
+ tx: GetReadOnlyAccess<{
+ refreshGroups: typeof WalletStoresV1.refreshGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: RefreshGroupRecord) => Promise<void>,
+): Promise<void> {
+ let refreshGroups: RefreshGroupRecord[];
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ OperationStatusRange.ACTIVE_START,
+ OperationStatusRange.ACTIVE_END,
+ );
+ refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(keyRange);
+ } else {
+ refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll();
+ }
+
+ for (const r of refreshGroups) {
+ await f(r);
+ }
+}
+
async function gatherRefreshPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
@@ -114,22 +151,13 @@ async function gatherRefreshPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const keyRange = GlobalIDB.KeyRange.bound(
- OperationStatusRange.ACTIVE_START,
- OperationStatusRange.ACTIVE_END,
- );
- const refreshGroups = await tx.refreshGroups.indexes.byStatus.getAll(
- keyRange,
- );
- for (const r of refreshGroups) {
+ await iterRecordsForRefresh(tx, { onlyState: "nonfinal" }, async (r) => {
if (r.timestampFinished) {
return;
}
const opId = TaskIdentifiers.forRefresh(r);
const retryRecord = await tx.operationRetries.get(opId);
-
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
-
resp.pendingOperations.push({
type: PendingTaskType.Refresh,
...getPendingCommon(ws, opId, timestampDue),
@@ -140,6 +168,30 @@ async function gatherRefreshPending(
),
retryInfo: retryRecord?.retryInfo,
});
+ });
+}
+
+export async function iterRecordsForWithdrawal(
+ tx: GetReadOnlyAccess<{
+ withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: WithdrawalGroupRecord) => Promise<void>,
+): Promise<void> {
+ let withdrawalGroupRecords: WithdrawalGroupRecord[];
+ if (filter.onlyState === "nonfinal") {
+ const range = GlobalIDB.KeyRange.bound(
+ WithdrawalGroupStatus.PendingRegisteringBank,
+ WithdrawalGroupStatus.PendingAml,
+ );
+ withdrawalGroupRecords =
+ await tx.withdrawalGroups.indexes.byStatus.getAll(range);
+ } else {
+ withdrawalGroupRecords =
+ await tx.withdrawalGroups.indexes.byStatus.getAll();
+ }
+ for (const wgr of withdrawalGroupRecords) {
+ await f(wgr);
}
}
@@ -153,12 +205,7 @@ async function gatherWithdrawalPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const range = GlobalIDB.KeyRange.bound(
- WithdrawalGroupStatus.PendingRegisteringBank,
- WithdrawalGroupStatus.PendingAml,
- );
- const wsrs = await tx.withdrawalGroups.indexes.byStatus.getAll(range);
- for (const wsr of wsrs) {
+ await iterRecordsForWithdrawal(tx, { onlyState: "nonfinal" }, async (wsr) => {
const opTag = TaskIdentifiers.forWithdrawal(wsr);
let opr = await tx.operationRetries.get(opTag);
const now = AbsoluteTime.now();
@@ -184,6 +231,30 @@ async function gatherWithdrawalPending(
lastError: opr.lastError,
retryInfo: opr.retryInfo,
});
+ });
+}
+
+export async function iterRecordsForDeposit(
+ tx: GetReadOnlyAccess<{
+ depositGroups: typeof WalletStoresV1.depositGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: DepositGroupRecord) => Promise<void>,
+): Promise<void> {
+ let dgs: DepositGroupRecord[];
+ if (filter.onlyState === "nonfinal") {
+ dgs = await tx.depositGroups.indexes.byStatus.getAll(
+ GlobalIDB.KeyRange.bound(
+ DepositOperationStatus.PendingDeposit,
+ DepositOperationStatus.PendingKyc,
+ ),
+ );
+ } else {
+ dgs = await tx.depositGroups.indexes.byStatus.getAll();
+ }
+
+ for (const dg of dgs) {
+ await f(dg);
}
}
@@ -196,16 +267,7 @@ async function gatherDepositPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const dgs = await tx.depositGroups.indexes.byStatus.getAll(
- GlobalIDB.KeyRange.bound(
- DepositOperationStatus.PendingDeposit,
- DepositOperationStatus.PendingKyc,
- ),
- );
- for (const dg of dgs) {
- if (dg.timestampFinished) {
- return;
- }
+ await iterRecordsForDeposit(tx, { onlyState: "nonfinal" }, async (dg) => {
let deposited = true;
for (const d of dg.depositedPerCoin) {
if (!d) {
@@ -226,10 +288,28 @@ async function gatherDepositPending(
lastError: retryRecord?.lastError,
retryInfo: retryRecord?.retryInfo,
});
+ });
+}
+
+export async function iterRecordsForReward(
+ tx: GetReadOnlyAccess<{
+ rewards: typeof WalletStoresV1.rewards;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: RewardRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const range = GlobalIDB.KeyRange.bound(
+ RewardRecordStatus.PendingPickup,
+ RewardRecordStatus.PendingPickup,
+ );
+ await tx.rewards.indexes.byStatus.iter(range).forEachAsync(f);
+ } else {
+ await tx.rewards.indexes.byStatus.iter().forEachAsync(f);
}
}
-async function gatherTipPending(
+async function gatherRewardPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
rewards: typeof WalletStoresV1.rewards;
@@ -238,15 +318,7 @@ async function gatherTipPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const range = GlobalIDB.KeyRange.bound(
- RewardRecordStatus.PendingPickup,
- RewardRecordStatus.PendingPickup,
- );
- await tx.rewards.indexes.byStatus.iter(range).forEachAsync(async (tip) => {
- // FIXME: The tip record needs a proper status field!
- if (tip.pickedUpTimestamp) {
- return;
- }
+ await iterRecordsForReward(tx, { onlyState: "nonfinal" }, async (tip) => {
const opId = TaskIdentifiers.forTipPickup(tip);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
@@ -264,6 +336,43 @@ async function gatherTipPending(
});
}
+export async function iterRecordsForRefund(
+ tx: GetReadOnlyAccess<{
+ refundGroups: typeof WalletStoresV1.refundGroups;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: RefundGroupRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.only(
+ RefundGroupStatus.Pending
+ );
+ await tx.refundGroups.indexes.byStatus
+ .iter(keyRange)
+ .forEachAsync(f);
+ } else {
+ await tx.refundGroups.iter().forEachAsync(f);
+ }
+}
+
+export async function iterRecordsForPurchase(
+ tx: GetReadOnlyAccess<{
+ purchases: typeof WalletStoresV1.purchases;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PurchaseRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ PurchaseStatus.PendingDownloadingProposal,
+ PurchaseStatus.PendingAcceptRefund,
+ );
+ await tx.purchases.indexes.byStatus.iter(keyRange).forEachAsync(f);
+ } else {
+ await tx.purchases.indexes.byStatus.iter().forEachAsync(f);
+ }
+}
+
async function gatherPurchasePending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
@@ -273,27 +382,20 @@ async function gatherPurchasePending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const keyRange = GlobalIDB.KeyRange.bound(
- PurchaseStatus.PendingDownloadingProposal,
- PurchaseStatus.PendingAcceptRefund,
- );
- await tx.purchases.indexes.byStatus
- .iter(keyRange)
- .forEachAsync(async (pr) => {
- const opId = TaskIdentifiers.forPay(pr);
- const retryRecord = await tx.operationRetries.get(opId);
- const timestampDue =
- retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
- resp.pendingOperations.push({
- type: PendingTaskType.Purchase,
- ...getPendingCommon(ws, opId, timestampDue),
- givesLifeness: true,
- statusStr: PurchaseStatus[pr.purchaseStatus],
- proposalId: pr.proposalId,
- retryInfo: retryRecord?.retryInfo,
- lastError: retryRecord?.lastError,
- });
+ await iterRecordsForPurchase(tx, { onlyState: "nonfinal" }, async (pr) => {
+ const opId = TaskIdentifiers.forPay(pr);
+ const retryRecord = await tx.operationRetries.get(opId);
+ const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
+ resp.pendingOperations.push({
+ type: PendingTaskType.Purchase,
+ ...getPendingCommon(ws, opId, timestampDue),
+ givesLifeness: true,
+ statusStr: PurchaseStatus[pr.purchaseStatus],
+ proposalId: pr.proposalId,
+ retryInfo: retryRecord?.retryInfo,
+ lastError: retryRecord?.lastError,
});
+ });
}
async function gatherRecoupPending(
@@ -362,6 +464,26 @@ async function gatherBackupPending(
});
}
+export async function iterRecordsForPeerPullInitiation(
+ tx: GetReadOnlyAccess<{
+ peerPullPaymentInitiations: typeof WalletStoresV1.peerPullPaymentInitiations;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPullPaymentInitiationRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ PeerPullPaymentInitiationStatus.PendingCreatePurse,
+ PeerPullPaymentInitiationStatus.AbortingDeletePurse,
+ );
+ await tx.peerPullPaymentInitiations.indexes.byStatus
+ .iter(keyRange)
+ .forEachAsync(f);
+ } else {
+ await tx.peerPullPaymentInitiations.indexes.byStatus.iter().forEachAsync(f);
+ }
+}
+
async function gatherPeerPullInitiationPending(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
@@ -371,13 +493,10 @@ async function gatherPeerPullInitiationPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const keyRange = GlobalIDB.KeyRange.bound(
- PeerPullPaymentInitiationStatus.PendingCreatePurse,
- PeerPullPaymentInitiationStatus.AbortingDeletePurse,
- );
- await tx.peerPullPaymentInitiations.indexes.byStatus
- .iter(keyRange)
- .forEachAsync(async (pi) => {
+ await iterRecordsForPeerPullInitiation(
+ tx,
+ { onlyState: "nonfinal" },
+ async (pi) => {
const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
@@ -389,7 +508,28 @@ async function gatherPeerPullInitiationPending(
retryInfo: retryRecord?.retryInfo,
pursePub: pi.pursePub,
});
- });
+ },
+ );
+}
+
+export async function iterRecordsForPeerPullDebit(
+ tx: GetReadOnlyAccess<{
+ peerPullPaymentIncoming: typeof WalletStoresV1.peerPullPaymentIncoming;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPullPaymentIncomingRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ PeerPullDebitRecordStatus.PendingDeposit,
+ PeerPullDebitRecordStatus.AbortingRefresh,
+ );
+ await tx.peerPullPaymentIncoming.indexes.byStatus
+ .iter(keyRange)
+ .forEachAsync(f);
+ } else {
+ await tx.peerPullPaymentIncoming.indexes.byStatus.iter().forEachAsync(f);
+ }
}
async function gatherPeerPullDebitPending(
@@ -401,13 +541,10 @@ async function gatherPeerPullDebitPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const keyRange = GlobalIDB.KeyRange.bound(
- PeerPullDebitRecordStatus.PendingDeposit,
- PeerPullDebitRecordStatus.AbortingRefresh,
- );
- await tx.peerPullPaymentIncoming.indexes.byStatus
- .iter(keyRange)
- .forEachAsync(async (pi) => {
+ await iterRecordsForPeerPullDebit(
+ tx,
+ { onlyState: "nonfinal" },
+ async (pi) => {
const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
@@ -419,7 +556,28 @@ async function gatherPeerPullDebitPending(
retryInfo: retryRecord?.retryInfo,
peerPullPaymentIncomingId: pi.peerPullPaymentIncomingId,
});
- });
+ },
+ );
+}
+
+export async function iterRecordsForPeerPushInitiation(
+ tx: GetReadOnlyAccess<{
+ peerPushPaymentInitiations: typeof WalletStoresV1.peerPushPaymentInitiations;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPushPaymentInitiationRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ PeerPushPaymentInitiationStatus.PendingCreatePurse,
+ PeerPushPaymentInitiationStatus.AbortingRefresh,
+ );
+ await tx.peerPushPaymentInitiations.indexes.byStatus
+ .iter(keyRange)
+ .forEachAsync(f);
+ } else {
+ await tx.peerPushPaymentInitiations.indexes.byStatus.iter().forEachAsync(f);
+ }
}
async function gatherPeerPushInitiationPending(
@@ -431,13 +589,10 @@ async function gatherPeerPushInitiationPending(
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- const keyRange = GlobalIDB.KeyRange.bound(
- PeerPushPaymentInitiationStatus.PendingCreatePurse,
- PeerPushPaymentInitiationStatus.AbortingRefresh,
- );
- await tx.peerPushPaymentInitiations.indexes.byStatus
- .iter(keyRange)
- .forEachAsync(async (pi) => {
+ await iterRecordsForPeerPushInitiation(
+ tx,
+ { onlyState: "nonfinal" },
+ async (pi) => {
const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
@@ -449,7 +604,28 @@ async function gatherPeerPushInitiationPending(
retryInfo: retryRecord?.retryInfo,
pursePub: pi.pursePub,
});
- });
+ },
+ );
+}
+
+export async function iterRecordsForPeerPushCredit(
+ tx: GetReadOnlyAccess<{
+ peerPushPaymentIncoming: typeof WalletStoresV1.peerPushPaymentIncoming;
+ }>,
+ filter: TransactionRecordFilter,
+ f: (r: PeerPushPaymentIncomingRecord) => Promise<void>,
+): Promise<void> {
+ if (filter.onlyState === "nonfinal") {
+ const keyRange = GlobalIDB.KeyRange.bound(
+ PeerPushPaymentIncomingStatus.PendingMerge,
+ PeerPushPaymentIncomingStatus.PendingWithdrawing,
+ );
+ await tx.peerPushPaymentIncoming.indexes.byStatus
+ .iter(keyRange)
+ .forEachAsync(f);
+ } else {
+ await tx.peerPushPaymentIncoming.indexes.byStatus.iter().forEachAsync(f);
+ }
}
async function gatherPeerPushCreditPending(
@@ -465,9 +641,10 @@ async function gatherPeerPushCreditPending(
PeerPushPaymentIncomingStatus.PendingMerge,
PeerPushPaymentIncomingStatus.PendingWithdrawing,
);
- await tx.peerPushPaymentIncoming.indexes.byStatus
- .iter(keyRange)
- .forEachAsync(async (pi) => {
+ await iterRecordsForPeerPushCredit(
+ tx,
+ { onlyState: "nonfinal" },
+ async (pi) => {
const opId = TaskIdentifiers.forPeerPushCredit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
@@ -479,7 +656,8 @@ async function gatherPeerPushCreditPending(
retryInfo: retryRecord?.retryInfo,
peerPushPaymentIncomingId: pi.peerPushPaymentIncomingId,
});
- });
+ },
+ );
}
export async function getPendingOperations(
@@ -513,7 +691,7 @@ export async function getPendingOperations(
await gatherRefreshPending(ws, tx, now, resp);
await gatherWithdrawalPending(ws, tx, now, resp);
await gatherDepositPending(ws, tx, now, resp);
- await gatherTipPending(ws, tx, now, resp);
+ await gatherRewardPending(ws, tx, now, resp);
await gatherPurchasePending(ws, tx, now, resp);
await gatherRecoupPending(ws, tx, now, resp);
await gatherBackupPending(ws, tx, now, resp);