diff options
author | Florian Dold <florian@dold.me> | 2022-09-05 18:12:30 +0200 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2022-09-13 16:10:41 +0200 |
commit | 13e7a674778754c0ed641dfd428e3d6b2b71ab2d (patch) | |
tree | f2a0e5029305a9b818416fd94908ef77cdd7446f /packages/taler-wallet-core/src/operations/pending.ts | |
parent | f9f2911c761af1c8ed1c323dcd414cbaa9eeae7c (diff) | |
download | wallet-core-13e7a674778754c0ed641dfd428e3d6b2b71ab2d.tar.xz |
wallet-core: uniform retry handling
Diffstat (limited to 'packages/taler-wallet-core/src/operations/pending.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/pending.ts | 170 |
1 files changed, 118 insertions, 52 deletions
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts index 5cf3afd4d..7d5a5bfd9 100644 --- a/packages/taler-wallet-core/src/operations/pending.ts +++ b/packages/taler-wallet-core/src/operations/pending.ts @@ -36,40 +36,50 @@ import { import { AbsoluteTime } from "@gnu-taler/taler-util"; import { InternalWalletState } from "../internal-wallet-state.js"; import { GetReadOnlyAccess } from "../util/query.js"; +import { RetryTags } from "../util/retries.js"; +import { Wallet } from "../wallet.js"; async function gatherExchangePending( tx: GetReadOnlyAccess<{ exchanges: typeof WalletStoresV1.exchanges; exchangeDetails: typeof WalletStoresV1.exchangeDetails; + operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.exchanges.iter().forEachAsync(async (e) => { + await tx.exchanges.iter().forEachAsync(async (exch) => { + const opTag = RetryTags.forExchangeUpdate(exch); + let opr = await tx.operationRetries.get(opTag); resp.pendingOperations.push({ type: PendingTaskType.ExchangeUpdate, + id: opTag, givesLifeness: false, timestampDue: - e.retryInfo?.nextRetry ?? AbsoluteTime.fromTimestamp(e.nextUpdate), - exchangeBaseUrl: e.baseUrl, - lastError: e.lastError, + opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate), + exchangeBaseUrl: exch.baseUrl, + lastError: opr?.lastError, }); // We only schedule a check for auto-refresh if the exchange update // was successful. - if (!e.lastError) { + if (!opr?.lastError) { resp.pendingOperations.push({ type: PendingTaskType.ExchangeCheckRefresh, - timestampDue: AbsoluteTime.fromTimestamp(e.nextRefreshCheck), + id: RetryTags.forExchangeCheckRefresh(exch), + timestampDue: AbsoluteTime.fromTimestamp(exch.nextRefreshCheck), givesLifeness: false, - exchangeBaseUrl: e.baseUrl, + exchangeBaseUrl: exch.baseUrl, }); } }); } async function gatherRefreshPending( - tx: GetReadOnlyAccess<{ refreshGroups: typeof WalletStoresV1.refreshGroups }>, + tx: GetReadOnlyAccess<{ + refreshGroups: typeof WalletStoresV1.refreshGroups; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { @@ -83,15 +93,19 @@ async function gatherRefreshPending( if (r.frozen) { return; } + const opId = RetryTags.forRefresh(r); + const retryRecord = await tx.operationRetries.get(opId); + resp.pendingOperations.push({ type: PendingTaskType.Refresh, + id: opId, givesLifeness: true, - timestampDue: r.retryInfo.nextRetry, + timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(), refreshGroupId: r.refreshGroupId, finishedPerCoin: r.statusPerCoin.map( (x) => x === RefreshCoinStatus.Finished, ), - retryInfo: r.retryInfo, + retryInfo: retryRecord?.retryInfo, }); } } @@ -100,6 +114,7 @@ async function gatherWithdrawalPending( tx: GetReadOnlyAccess<{ withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; planchets: typeof WalletStoresV1.planchets; + operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, @@ -111,54 +126,68 @@ async function gatherWithdrawalPending( if (wsr.timestampFinish) { return; } - let numCoinsWithdrawn = 0; - let numCoinsTotal = 0; - await tx.planchets.indexes.byGroup - .iter(wsr.withdrawalGroupId) - .forEach((x) => { - numCoinsTotal++; - if (x.withdrawalDone) { - numCoinsWithdrawn++; - } - }); + const opTag = RetryTags.forWithdrawal(wsr); + let opr = await tx.operationRetries.get(opTag); + const now = AbsoluteTime.now(); + if (!opr) { + opr = { + id: opTag, + retryInfo: { + firstTry: now, + nextRetry: now, + retryCounter: 0, + }, + }; + } resp.pendingOperations.push({ type: PendingTaskType.Withdraw, + id: opTag, givesLifeness: true, - timestampDue: wsr.retryInfo?.nextRetry ?? AbsoluteTime.now(), + timestampDue: opr.retryInfo?.nextRetry ?? AbsoluteTime.now(), withdrawalGroupId: wsr.withdrawalGroupId, - lastError: wsr.lastError, - retryInfo: wsr.retryInfo, + lastError: opr.lastError, + retryInfo: opr.retryInfo, }); } } async function gatherProposalPending( - tx: GetReadOnlyAccess<{ proposals: typeof WalletStoresV1.proposals }>, + tx: GetReadOnlyAccess<{ + proposals: typeof WalletStoresV1.proposals; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.proposals.iter().forEach((proposal) => { + await tx.proposals.iter().forEachAsync(async (proposal) => { if (proposal.proposalStatus == ProposalStatus.Proposed) { // Nothing to do, user needs to choose. } else if (proposal.proposalStatus == ProposalStatus.Downloading) { - const timestampDue = proposal.retryInfo?.nextRetry ?? AbsoluteTime.now(); + const opId = RetryTags.forProposalClaim(proposal); + const retryRecord = await tx.operationRetries.get(opId); + const timestampDue = + retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.ProposalDownload, + id: opId, givesLifeness: true, timestampDue, merchantBaseUrl: proposal.merchantBaseUrl, orderId: proposal.orderId, proposalId: proposal.proposalId, proposalTimestamp: proposal.timestamp, - lastError: proposal.lastError, - retryInfo: proposal.retryInfo, + lastError: retryRecord?.lastError, + retryInfo: retryRecord?.retryInfo, }); } }); } async function gatherDepositPending( - tx: GetReadOnlyAccess<{ depositGroups: typeof WalletStoresV1.depositGroups }>, + tx: GetReadOnlyAccess<{ + depositGroups: typeof WalletStoresV1.depositGroups; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { @@ -169,32 +198,42 @@ async function gatherDepositPending( if (dg.timestampFinished) { return; } - const timestampDue = dg.retryInfo?.nextRetry ?? AbsoluteTime.now(); + const opId = RetryTags.forDeposit(dg); + const retryRecord = await tx.operationRetries.get(opId); + const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.Deposit, + id: opId, givesLifeness: true, timestampDue, depositGroupId: dg.depositGroupId, - lastError: dg.lastError, - retryInfo: dg.retryInfo, + lastError: retryRecord?.lastError, + retryInfo: retryRecord?.retryInfo, }); } } async function gatherTipPending( - tx: GetReadOnlyAccess<{ tips: typeof WalletStoresV1.tips }>, + tx: GetReadOnlyAccess<{ + tips: typeof WalletStoresV1.tips; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.tips.iter().forEach((tip) => { + await tx.tips.iter().forEachAsync(async (tip) => { + // FIXME: The tip record needs a proper status field! if (tip.pickedUpTimestamp) { return; } + const opId = RetryTags.forTipPickup(tip); + const retryRecord = await tx.operationRetries.get(opId); if (tip.acceptedTimestamp) { resp.pendingOperations.push({ type: PendingTaskType.TipPickup, + id: opId, givesLifeness: true, - timestampDue: tip.retryInfo.nextRetry, + timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(), merchantBaseUrl: tip.merchantBaseUrl, tipId: tip.walletTipId, merchantTipId: tip.merchantTipId, @@ -204,56 +243,77 @@ async function gatherTipPending( } async function gatherPurchasePending( - tx: GetReadOnlyAccess<{ purchases: typeof WalletStoresV1.purchases }>, + tx: GetReadOnlyAccess<{ + purchases: typeof WalletStoresV1.purchases; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.purchases.iter().forEach((pr) => { + // FIXME: Only iter purchases with some "active" flag! + await tx.purchases.iter().forEachAsync(async (pr) => { if ( pr.paymentSubmitPending && pr.abortStatus === AbortStatus.None && !pr.payFrozen ) { - const timestampDue = pr.payRetryInfo?.nextRetry ?? AbsoluteTime.now(); + const payOpId = RetryTags.forPay(pr); + const payRetryRecord = await tx.operationRetries.get(payOpId); + + const timestampDue = + payRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.Pay, + id: payOpId, givesLifeness: true, timestampDue, isReplay: false, proposalId: pr.proposalId, - retryInfo: pr.payRetryInfo, - lastError: pr.lastPayError, + retryInfo: payRetryRecord?.retryInfo, + lastError: payRetryRecord?.lastError, }); } if (pr.refundQueryRequested) { + const refundQueryOpId = RetryTags.forRefundQuery(pr); + const refundQueryRetryRecord = await tx.operationRetries.get( + refundQueryOpId, + ); resp.pendingOperations.push({ type: PendingTaskType.RefundQuery, + id: refundQueryOpId, givesLifeness: true, - timestampDue: pr.refundStatusRetryInfo.nextRetry, + timestampDue: + refundQueryRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(), proposalId: pr.proposalId, - retryInfo: pr.refundStatusRetryInfo, - lastError: pr.lastRefundStatusError, + retryInfo: refundQueryRetryRecord?.retryInfo, + lastError: refundQueryRetryRecord?.lastError, }); } }); } async function gatherRecoupPending( - tx: GetReadOnlyAccess<{ recoupGroups: typeof WalletStoresV1.recoupGroups }>, + tx: GetReadOnlyAccess<{ + recoupGroups: typeof WalletStoresV1.recoupGroups; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.recoupGroups.iter().forEach((rg) => { + await tx.recoupGroups.iter().forEachAsync(async (rg) => { if (rg.timestampFinished) { return; } + const opId = RetryTags.forRecoup(rg); + const retryRecord = await tx.operationRetries.get(opId); resp.pendingOperations.push({ type: PendingTaskType.Recoup, + id: opId, givesLifeness: true, - timestampDue: rg.retryInfo.nextRetry, + timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(), recoupGroupId: rg.recoupGroupId, - retryInfo: rg.retryInfo, - lastError: rg.lastError, + retryInfo: retryRecord?.retryInfo, + lastError: retryRecord?.lastError, }); }); } @@ -261,14 +321,18 @@ async function gatherRecoupPending( async function gatherBackupPending( tx: GetReadOnlyAccess<{ backupProviders: typeof WalletStoresV1.backupProviders; + operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.backupProviders.iter().forEach((bp) => { + await tx.backupProviders.iter().forEachAsync(async (bp) => { + const opId = RetryTags.forBackup(bp); + const retryRecord = await tx.operationRetries.get(opId); if (bp.state.tag === BackupProviderStateTag.Ready) { resp.pendingOperations.push({ type: PendingTaskType.Backup, + id: opId, givesLifeness: false, timestampDue: AbsoluteTime.fromTimestamp(bp.state.nextBackupTimestamp), backupProviderBaseUrl: bp.baseUrl, @@ -277,11 +341,12 @@ async function gatherBackupPending( } else if (bp.state.tag === BackupProviderStateTag.Retrying) { resp.pendingOperations.push({ type: PendingTaskType.Backup, + id: opId, givesLifeness: false, - timestampDue: bp.state.retryInfo.nextRetry, + timestampDue: retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now(), backupProviderBaseUrl: bp.baseUrl, - retryInfo: bp.state.retryInfo, - lastError: bp.state.lastError, + retryInfo: retryRecord?.retryInfo, + lastError: retryRecord?.lastError, }); } }); @@ -305,6 +370,7 @@ export async function getPendingOperations( planchets: x.planchets, depositGroups: x.depositGroups, recoupGroups: x.recoupGroups, + operationRetries: x.operationRetries, })) .runReadWrite(async (tx) => { const resp: PendingOperationsResponse = { |