aboutsummaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations/pending.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2022-09-05 18:12:30 +0200
committerFlorian Dold <florian@dold.me>2022-09-13 16:10:41 +0200
commit13e7a674778754c0ed641dfd428e3d6b2b71ab2d (patch)
treef2a0e5029305a9b818416fd94908ef77cdd7446f /packages/taler-wallet-core/src/operations/pending.ts
parentf9f2911c761af1c8ed1c323dcd414cbaa9eeae7c (diff)
downloadwallet-core-13e7a674778754c0ed641dfd428e3d6b2b71ab2d.tar.xz
wallet-core: uniform retry handling
Diffstat (limited to 'packages/taler-wallet-core/src/operations/pending.ts')
-rw-r--r--packages/taler-wallet-core/src/operations/pending.ts170
1 files changed, 118 insertions, 52 deletions
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts
index 5cf3afd4d..7d5a5bfd9 100644
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ b/packages/taler-wallet-core/src/operations/pending.ts
@@ -36,40 +36,50 @@ import {
import { AbsoluteTime } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import { GetReadOnlyAccess } from "../util/query.js";
+import { RetryTags } from "../util/retries.js";
+import { Wallet } from "../wallet.js";
async function gatherExchangePending(
tx: GetReadOnlyAccess<{
exchanges: typeof WalletStoresV1.exchanges;
exchangeDetails: typeof WalletStoresV1.exchangeDetails;
+ operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.exchanges.iter().forEachAsync(async (e) => {
+ await tx.exchanges.iter().forEachAsync(async (exch) => {
+ const opTag = RetryTags.forExchangeUpdate(exch);
+ let opr = await tx.operationRetries.get(opTag);
resp.pendingOperations.push({
type: PendingTaskType.ExchangeUpdate,
+ id: opTag,
givesLifeness: false,
timestampDue:
- e.retryInfo?.nextRetry ?? AbsoluteTime.fromTimestamp(e.nextUpdate),
- exchangeBaseUrl: e.baseUrl,
- lastError: e.lastError,
+ opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate),
+ exchangeBaseUrl: exch.baseUrl,
+ lastError: opr?.lastError,
});
// We only schedule a check for auto-refresh if the exchange update
// was successful.
- if (!e.lastError) {
+ if (!opr?.lastError) {
resp.pendingOperations.push({
type: PendingTaskType.ExchangeCheckRefresh,
- timestampDue: AbsoluteTime.fromTimestamp(e.nextRefreshCheck),
+ id: RetryTags.forExchangeCheckRefresh(exch),
+ timestampDue: AbsoluteTime.fromTimestamp(exch.nextRefreshCheck),
givesLifeness: false,
- exchangeBaseUrl: e.baseUrl,
+ exchangeBaseUrl: exch.baseUrl,
});
}
});
}
async function gatherRefreshPending(
- tx: GetReadOnlyAccess<{ refreshGroups: typeof WalletStoresV1.refreshGroups }>,
+ tx: GetReadOnlyAccess<{
+ refreshGroups: typeof WalletStoresV1.refreshGroups;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
@@ -83,15 +93,19 @@ async function gatherRefreshPending(
if (r.frozen) {
return;
}
+ const opId = RetryTags.forRefresh(r);
+ const retryRecord = await tx.operationRetries.get(opId);
+
resp.pendingOperations.push({
type: PendingTaskType.Refresh,
+ id: opId,
givesLifeness: true,
- timestampDue: r.retryInfo.nextRetry,
+ timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
refreshGroupId: r.refreshGroupId,
finishedPerCoin: r.statusPerCoin.map(
(x) => x === RefreshCoinStatus.Finished,
),
- retryInfo: r.retryInfo,
+ retryInfo: retryRecord?.retryInfo,
});
}
}
@@ -100,6 +114,7 @@ async function gatherWithdrawalPending(
tx: GetReadOnlyAccess<{
withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
planchets: typeof WalletStoresV1.planchets;
+ operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
@@ -111,54 +126,68 @@ async function gatherWithdrawalPending(
if (wsr.timestampFinish) {
return;
}
- let numCoinsWithdrawn = 0;
- let numCoinsTotal = 0;
- await tx.planchets.indexes.byGroup
- .iter(wsr.withdrawalGroupId)
- .forEach((x) => {
- numCoinsTotal++;
- if (x.withdrawalDone) {
- numCoinsWithdrawn++;
- }
- });
+ const opTag = RetryTags.forWithdrawal(wsr);
+ let opr = await tx.operationRetries.get(opTag);
+ const now = AbsoluteTime.now();
+ if (!opr) {
+ opr = {
+ id: opTag,
+ retryInfo: {
+ firstTry: now,
+ nextRetry: now,
+ retryCounter: 0,
+ },
+ };
+ }
resp.pendingOperations.push({
type: PendingTaskType.Withdraw,
+ id: opTag,
givesLifeness: true,
- timestampDue: wsr.retryInfo?.nextRetry ?? AbsoluteTime.now(),
+ timestampDue: opr.retryInfo?.nextRetry ?? AbsoluteTime.now(),
withdrawalGroupId: wsr.withdrawalGroupId,
- lastError: wsr.lastError,
- retryInfo: wsr.retryInfo,
+ lastError: opr.lastError,
+ retryInfo: opr.retryInfo,
});
}
}
async function gatherProposalPending(
- tx: GetReadOnlyAccess<{ proposals: typeof WalletStoresV1.proposals }>,
+ tx: GetReadOnlyAccess<{
+ proposals: typeof WalletStoresV1.proposals;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.proposals.iter().forEach((proposal) => {
+ await tx.proposals.iter().forEachAsync(async (proposal) => {
if (proposal.proposalStatus == ProposalStatus.Proposed) {
// Nothing to do, user needs to choose.
} else if (proposal.proposalStatus == ProposalStatus.Downloading) {
- const timestampDue = proposal.retryInfo?.nextRetry ?? AbsoluteTime.now();
+ const opId = RetryTags.forProposalClaim(proposal);
+ const retryRecord = await tx.operationRetries.get(opId);
+ const timestampDue =
+ retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.ProposalDownload,
+ id: opId,
givesLifeness: true,
timestampDue,
merchantBaseUrl: proposal.merchantBaseUrl,
orderId: proposal.orderId,
proposalId: proposal.proposalId,
proposalTimestamp: proposal.timestamp,
- lastError: proposal.lastError,
- retryInfo: proposal.retryInfo,
+ lastError: retryRecord?.lastError,
+ retryInfo: retryRecord?.retryInfo,
});
}
});
}
async function gatherDepositPending(
- tx: GetReadOnlyAccess<{ depositGroups: typeof WalletStoresV1.depositGroups }>,
+ tx: GetReadOnlyAccess<{
+ depositGroups: typeof WalletStoresV1.depositGroups;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
@@ -169,32 +198,42 @@ async function gatherDepositPending(
if (dg.timestampFinished) {
return;
}
- const timestampDue = dg.retryInfo?.nextRetry ?? AbsoluteTime.now();
+ const opId = RetryTags.forDeposit(dg);
+ const retryRecord = await tx.operationRetries.get(opId);
+ const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.Deposit,
+ id: opId,
givesLifeness: true,
timestampDue,
depositGroupId: dg.depositGroupId,
- lastError: dg.lastError,
- retryInfo: dg.retryInfo,
+ lastError: retryRecord?.lastError,
+ retryInfo: retryRecord?.retryInfo,
});
}
}
async function gatherTipPending(
- tx: GetReadOnlyAccess<{ tips: typeof WalletStoresV1.tips }>,
+ tx: GetReadOnlyAccess<{
+ tips: typeof WalletStoresV1.tips;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.tips.iter().forEach((tip) => {
+ await tx.tips.iter().forEachAsync(async (tip) => {
+ // FIXME: The tip record needs a proper status field!
if (tip.pickedUpTimestamp) {
return;
}
+ const opId = RetryTags.forTipPickup(tip);
+ const retryRecord = await tx.operationRetries.get(opId);
if (tip.acceptedTimestamp) {
resp.pendingOperations.push({
type: PendingTaskType.TipPickup,
+ id: opId,
givesLifeness: true,
- timestampDue: tip.retryInfo.nextRetry,
+ timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
merchantBaseUrl: tip.merchantBaseUrl,
tipId: tip.walletTipId,
merchantTipId: tip.merchantTipId,
@@ -204,56 +243,77 @@ async function gatherTipPending(
}
async function gatherPurchasePending(
- tx: GetReadOnlyAccess<{ purchases: typeof WalletStoresV1.purchases }>,
+ tx: GetReadOnlyAccess<{
+ purchases: typeof WalletStoresV1.purchases;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.purchases.iter().forEach((pr) => {
+ // FIXME: Only iter purchases with some "active" flag!
+ await tx.purchases.iter().forEachAsync(async (pr) => {
if (
pr.paymentSubmitPending &&
pr.abortStatus === AbortStatus.None &&
!pr.payFrozen
) {
- const timestampDue = pr.payRetryInfo?.nextRetry ?? AbsoluteTime.now();
+ const payOpId = RetryTags.forPay(pr);
+ const payRetryRecord = await tx.operationRetries.get(payOpId);
+
+ const timestampDue =
+ payRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.Pay,
+ id: payOpId,
givesLifeness: true,
timestampDue,
isReplay: false,
proposalId: pr.proposalId,
- retryInfo: pr.payRetryInfo,
- lastError: pr.lastPayError,
+ retryInfo: payRetryRecord?.retryInfo,
+ lastError: payRetryRecord?.lastError,
});
}
if (pr.refundQueryRequested) {
+ const refundQueryOpId = RetryTags.forRefundQuery(pr);
+ const refundQueryRetryRecord = await tx.operationRetries.get(
+ refundQueryOpId,
+ );
resp.pendingOperations.push({
type: PendingTaskType.RefundQuery,
+ id: refundQueryOpId,
givesLifeness: true,
- timestampDue: pr.refundStatusRetryInfo.nextRetry,
+ timestampDue:
+ refundQueryRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
proposalId: pr.proposalId,
- retryInfo: pr.refundStatusRetryInfo,
- lastError: pr.lastRefundStatusError,
+ retryInfo: refundQueryRetryRecord?.retryInfo,
+ lastError: refundQueryRetryRecord?.lastError,
});
}
});
}
async function gatherRecoupPending(
- tx: GetReadOnlyAccess<{ recoupGroups: typeof WalletStoresV1.recoupGroups }>,
+ tx: GetReadOnlyAccess<{
+ recoupGroups: typeof WalletStoresV1.recoupGroups;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.recoupGroups.iter().forEach((rg) => {
+ await tx.recoupGroups.iter().forEachAsync(async (rg) => {
if (rg.timestampFinished) {
return;
}
+ const opId = RetryTags.forRecoup(rg);
+ const retryRecord = await tx.operationRetries.get(opId);
resp.pendingOperations.push({
type: PendingTaskType.Recoup,
+ id: opId,
givesLifeness: true,
- timestampDue: rg.retryInfo.nextRetry,
+ timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
recoupGroupId: rg.recoupGroupId,
- retryInfo: rg.retryInfo,
- lastError: rg.lastError,
+ retryInfo: retryRecord?.retryInfo,
+ lastError: retryRecord?.lastError,
});
});
}
@@ -261,14 +321,18 @@ async function gatherRecoupPending(
async function gatherBackupPending(
tx: GetReadOnlyAccess<{
backupProviders: typeof WalletStoresV1.backupProviders;
+ operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.backupProviders.iter().forEach((bp) => {
+ await tx.backupProviders.iter().forEachAsync(async (bp) => {
+ const opId = RetryTags.forBackup(bp);
+ const retryRecord = await tx.operationRetries.get(opId);
if (bp.state.tag === BackupProviderStateTag.Ready) {
resp.pendingOperations.push({
type: PendingTaskType.Backup,
+ id: opId,
givesLifeness: false,
timestampDue: AbsoluteTime.fromTimestamp(bp.state.nextBackupTimestamp),
backupProviderBaseUrl: bp.baseUrl,
@@ -277,11 +341,12 @@ async function gatherBackupPending(
} else if (bp.state.tag === BackupProviderStateTag.Retrying) {
resp.pendingOperations.push({
type: PendingTaskType.Backup,
+ id: opId,
givesLifeness: false,
- timestampDue: bp.state.retryInfo.nextRetry,
+ timestampDue: retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now(),
backupProviderBaseUrl: bp.baseUrl,
- retryInfo: bp.state.retryInfo,
- lastError: bp.state.lastError,
+ retryInfo: retryRecord?.retryInfo,
+ lastError: retryRecord?.lastError,
});
}
});
@@ -305,6 +370,7 @@ export async function getPendingOperations(
planchets: x.planchets,
depositGroups: x.depositGroups,
recoupGroups: x.recoupGroups,
+ operationRetries: x.operationRetries,
}))
.runReadWrite(async (tx) => {
const resp: PendingOperationsResponse = {