diff options
author | Florian Dold <florian@dold.me> | 2022-09-05 18:12:30 +0200 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2022-09-13 16:10:41 +0200 |
commit | 13e7a674778754c0ed641dfd428e3d6b2b71ab2d (patch) | |
tree | f2a0e5029305a9b818416fd94908ef77cdd7446f | |
parent | f9f2911c761af1c8ed1c323dcd414cbaa9eeae7c (diff) |
wallet-core: uniform retry handling
19 files changed, 1016 insertions, 1272 deletions
diff --git a/packages/taler-util/src/time.ts b/packages/taler-util/src/time.ts index 9c25af400..1e79b943b 100644 --- a/packages/taler-util/src/time.ts +++ b/packages/taler-util/src/time.ts @@ -100,6 +100,10 @@ export namespace Duration { return durationMin(d1, d2); } + export function multiply(d1: Duration, n: number): Duration { + return durationMul(d1, n); + } + export function toIntegerYears(d: Duration): number { if (typeof d.d_ms !== "number") { throw Error("infinite duration"); @@ -357,7 +361,6 @@ export const codecForAbsoluteTime: Codec<AbsoluteTime> = { }, }; - export const codecForTimestamp: Codec<TalerProtocolTimestamp> = { decode(x: any, c?: Context): TalerProtocolTimestamp { // Compatibility, should be removed soon. diff --git a/packages/taler-util/src/walletTypes.ts b/packages/taler-util/src/walletTypes.ts index a993f29a0..c10e3be40 100644 --- a/packages/taler-util/src/walletTypes.ts +++ b/packages/taler-util/src/walletTypes.ts @@ -32,7 +32,12 @@ import { codecForAmountJson, codecForAmountString, } from "./amounts.js"; -import { AbsoluteTime, codecForAbsoluteTime, codecForTimestamp, TalerProtocolTimestamp } from "./time.js"; +import { + AbsoluteTime, + codecForAbsoluteTime, + codecForTimestamp, + TalerProtocolTimestamp, +} from "./time.js"; import { buildCodecForObject, codecForString, @@ -797,46 +802,43 @@ const codecForExchangeTos = (): Codec<ExchangeTos> => .property("content", codecOptional(codecForString())) .build("ExchangeTos"); -export const codecForFeeDescriptionPair = - (): Codec<FeeDescriptionPair> => - buildCodecForObject<FeeDescriptionPair>() - .property("value", codecForAmountJson()) - .property("from", codecForAbsoluteTime) - .property("until", codecForAbsoluteTime) - .property("left", codecOptional(codecForAmountJson())) - .property("right", codecOptional(codecForAmountJson())) - .build("FeeDescriptionPair"); - -export const codecForFeeDescription = - (): Codec<FeeDescription> => - buildCodecForObject<FeeDescription>() - .property("value", codecForAmountJson()) - .property("from", codecForAbsoluteTime) - .property("until", codecForAbsoluteTime) - .property("fee", codecOptional(codecForAmountJson())) - .build("FeeDescription"); - - -export const codecForFeesByOperations = - (): Codec<OperationMap<FeeDescription[]>> => - buildCodecForObject<OperationMap<FeeDescription[]>>() - .property("deposit", codecForList(codecForFeeDescription())) - .property("withdraw", codecForList(codecForFeeDescription())) - .property("refresh", codecForList(codecForFeeDescription())) - .property("refund", codecForList(codecForFeeDescription())) - .build("FeesByOperations"); - -export const codecForExchangeFullDetails = - (): Codec<ExchangeFullDetails> => - buildCodecForObject<ExchangeFullDetails>() - .property("currency", codecForString()) - .property("exchangeBaseUrl", codecForString()) - .property("paytoUris", codecForList(codecForString())) - .property("tos", codecForExchangeTos()) - .property("auditors", codecForList(codecForExchangeAuditor())) - .property("wireInfo", codecForWireInfo()) - .property("feesDescription", codecForFeesByOperations()) - .build("ExchangeFullDetails"); +export const codecForFeeDescriptionPair = (): Codec<FeeDescriptionPair> => + buildCodecForObject<FeeDescriptionPair>() + .property("value", codecForAmountJson()) + .property("from", codecForAbsoluteTime) + .property("until", codecForAbsoluteTime) + .property("left", codecOptional(codecForAmountJson())) + .property("right", codecOptional(codecForAmountJson())) + .build("FeeDescriptionPair"); + +export const codecForFeeDescription = (): Codec<FeeDescription> => + buildCodecForObject<FeeDescription>() + .property("value", codecForAmountJson()) + .property("from", codecForAbsoluteTime) + .property("until", codecForAbsoluteTime) + .property("fee", codecOptional(codecForAmountJson())) + .build("FeeDescription"); + +export const codecForFeesByOperations = (): Codec< + OperationMap<FeeDescription[]> +> => + buildCodecForObject<OperationMap<FeeDescription[]>>() + .property("deposit", codecForList(codecForFeeDescription())) + .property("withdraw", codecForList(codecForFeeDescription())) + .property("refresh", codecForList(codecForFeeDescription())) + .property("refund", codecForList(codecForFeeDescription())) + .build("FeesByOperations"); + +export const codecForExchangeFullDetails = (): Codec<ExchangeFullDetails> => + buildCodecForObject<ExchangeFullDetails>() + .property("currency", codecForString()) + .property("exchangeBaseUrl", codecForString()) + .property("paytoUris", codecForList(codecForString())) + .property("tos", codecForExchangeTos()) + .property("auditors", codecForList(codecForExchangeAuditor())) + .property("wireInfo", codecForWireInfo()) + .property("feesDescription", codecForFeesByOperations()) + .build("ExchangeFullDetails"); export const codecForExchangeListItem = (): Codec<ExchangeListItem> => buildCodecForObject<ExchangeListItem>() diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts index 9d41f2114..1052e302d 100644 --- a/packages/taler-wallet-core/src/db.ts +++ b/packages/taler-wallet-core/src/db.ts @@ -361,14 +361,14 @@ export interface ExchangeDetailsRecord { * Terms of service text or undefined if not downloaded yet. * * This is just used as a cache of the last downloaded ToS. - * + * * FIXME: Put in separate object store! */ termsOfServiceText: string | undefined; /** * content-type of the last downloaded termsOfServiceText. - * + * * * FIXME: Put in separate object store! */ termsOfServiceContentType: string | undefined; @@ -455,17 +455,6 @@ export interface ExchangeRecord { nextRefreshCheck: TalerProtocolTimestamp; /** - * Last error (if any) for fetching updated information about the - * exchange. - */ - lastError?: TalerErrorDetail; - - /** - * Retry status for fetching updated information about the exchange. - */ - retryInfo?: RetryInfo; - - /** * Public key of the reserve that we're currently using for * receiving P2P payments. */ @@ -734,24 +723,12 @@ export interface ProposalRecord { * Session ID we got when downloading the contract. */ downloadSessionId?: string; - - /** - * Retry info, even present when the operation isn't active to allow indexing - * on the next retry timestamp. - * - * FIXME: Clarify what we even retry. - */ - retryInfo?: RetryInfo; - - lastError: TalerErrorDetail | undefined; } /** * Status of a tip we got from a merchant. */ export interface TipRecord { - lastError: TalerErrorDetail | undefined; - /** * Has the user accepted the tip? Only after the tip has been accepted coins * withdrawn from the tip may be used. @@ -810,12 +787,6 @@ export interface TipRecord { * from the merchant. */ pickedUpTimestamp: TalerProtocolTimestamp | undefined; - - /** - * Retry info, even present when the operation isn't active to allow indexing - * on the next retry timestamp. - */ - retryInfo: RetryInfo; } export enum RefreshCoinStatus { @@ -837,16 +808,7 @@ export enum OperationStatus { export interface RefreshGroupRecord { operationStatus: OperationStatus; - /** - * Retry info, even present when the operation isn't active to allow indexing - * on the next retry timestamp. - * - * FIXME: No, this can be optional, indexing is still possible - */ - retryInfo: RetryInfo; - - lastError: TalerErrorDetail | undefined; - + // FIXME: Put this into a different object store? lastErrorPerCoin: { [coinIndex: number]: TalerErrorDetail }; /** @@ -1117,6 +1079,8 @@ export interface PurchaseRecord { /** * Pending refunds for the purchase. A refund is pending * when the merchant reports a transient error from the exchange. + * + * FIXME: Put this into a separate object store? */ refunds: { [refundKey: string]: WalletRefundItem }; @@ -1132,6 +1096,7 @@ export interface PurchaseRecord { lastSessionId: string | undefined; /** + * Do we still need to post the deposit permissions to the merchant? * Set for the first payment, or on re-plays. */ paymentSubmitPending: boolean; @@ -1142,22 +1107,6 @@ export interface PurchaseRecord { */ refundQueryRequested: boolean; - abortStatus: AbortStatus; - - payRetryInfo?: RetryInfo; - - lastPayError: TalerErrorDetail | undefined; - - /** - * Retry information for querying the refund status with the merchant. - */ - refundStatusRetryInfo: RetryInfo; - - /** - * Last error (or undefined) for querying the refund status with the merchant. - */ - lastRefundStatusError: TalerErrorDetail | undefined; - /** * Continue querying the refund status until this deadline has expired. */ @@ -1174,6 +1123,11 @@ export interface PurchaseRecord { * an error where it doesn't make sense to retry. */ payFrozen?: boolean; + + /** + * FIXME: How does this interact with payFrozen? + */ + abortStatus: AbortStatus; } export const WALLET_BACKUP_STATE_KEY = "walletBackupState"; @@ -1184,9 +1138,9 @@ export const WALLET_BACKUP_STATE_KEY = "walletBackupState"; */ export type ConfigRecord = | { - key: typeof WALLET_BACKUP_STATE_KEY; - value: WalletBackupConfState; - } + key: typeof WALLET_BACKUP_STATE_KEY; + value: WalletBackupConfState; + } | { key: "currencyDefaultsApplied"; value: boolean }; export interface WalletBackupConfState { @@ -1368,13 +1322,6 @@ export interface WithdrawalGroupRecord { * FIXME: Should this not also include a timestamp for more logical merging? */ denomSelUid: string; - - /** - * Retry info. - */ - retryInfo?: RetryInfo; - - lastError: TalerErrorDetail | undefined; } export interface BankWithdrawUriRecord { @@ -1432,16 +1379,6 @@ export interface RecoupGroupRecord { * after all individual recoups are done. */ scheduleRefreshCoins: string[]; - - /** - * Retry info. - */ - retryInfo: RetryInfo; - - /** - * Last error that occurred, if any. - */ - lastError: TalerErrorDetail | undefined; } export enum BackupProviderStateTag { @@ -1452,17 +1389,15 @@ export enum BackupProviderStateTag { export type BackupProviderState = | { - tag: BackupProviderStateTag.Provisional; - } + tag: BackupProviderStateTag.Provisional; + } | { - tag: BackupProviderStateTag.Ready; - nextBackupTimestamp: TalerProtocolTimestamp; - } + tag: BackupProviderStateTag.Ready; + nextBackupTimestamp: TalerProtocolTimestamp; + } | { - tag: BackupProviderStateTag.Retrying; - retryInfo: RetryInfo; - lastError?: TalerErrorDetail; - }; + tag: BackupProviderStateTag.Retrying; + }; export interface BackupProviderTerms { supportedProtocolVersion: string; @@ -1573,13 +1508,6 @@ export interface DepositGroupRecord { timestampFinished: TalerProtocolTimestamp | undefined; operationStatus: OperationStatus; - - lastError: TalerErrorDetail | undefined; - - /** - * Retry info. - */ - retryInfo?: RetryInfo; } /** @@ -1749,6 +1677,60 @@ export interface ReserveRecord { reservePriv: string; } +export interface OperationRetryRecord { + /** + * Unique identifier for the operation. Typically of + * the format `${opType}-${opUniqueKey}` + */ + id: string; + + lastError?: TalerErrorDetail; + + retryInfo: RetryInfo; +} + +export enum OperationAttemptResultType { + Finished = "finished", + Pending = "pending", + Error = "error", + Longpoll = "longpoll", +} + +// FIXME: not part of DB! +export type OperationAttemptResult<TSuccess = unknown, TPending = unknown> = + | OperationAttemptFinishedResult<TSuccess> + | OperationAttemptErrorResult + | OperationAttemptLongpollResult + | OperationAttemptPendingResult<TPending>; + +export namespace OperationAttemptResult { + export function finishedEmpty(): OperationAttemptResult<unknown, unknown> { + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; + } +} + +export interface OperationAttemptFinishedResult<T> { + type: OperationAttemptResultType.Finished; + result: T; +} + +export interface OperationAttemptPendingResult<T> { + type: OperationAttemptResultType.Pending; + result: T; +} + +export interface OperationAttemptErrorResult { + type: OperationAttemptResultType.Error; + errorDetail: TalerErrorDetail; +} + +export interface OperationAttemptLongpollResult { + type: OperationAttemptResultType.Longpoll; +} + export const WalletStoresV1 = { coins: describeStore( describeContents<CoinRecord>("coins", { @@ -1913,6 +1895,12 @@ export const WalletStoresV1 = { describeContents<TombstoneRecord>("tombstones", { keyPath: "id" }), {}, ), + operationRetries: describeStore( + describeContents<OperationRetryRecord>("operationRetries", { + keyPath: "id", + }), + {}, + ), ghostDepositGroups: describeStore( describeContents<GhostDepositGroupRecord>("ghostDepositGroups", { keyPath: "contractTermsHash", diff --git a/packages/taler-wallet-core/src/operations/backup/import.ts b/packages/taler-wallet-core/src/operations/backup/import.ts index ff7ff0d03..e8683265b 100644 --- a/packages/taler-wallet-core/src/operations/backup/import.ts +++ b/packages/taler-wallet-core/src/operations/backup/import.ts @@ -274,7 +274,6 @@ export async function importBackup( protocolVersionRange: backupExchange.protocol_version_range, }, permanent: true, - retryInfo: RetryInfo.reset(), lastUpdate: undefined, nextUpdate: TalerProtocolTimestamp.now(), nextRefreshCheck: TalerProtocolTimestamp.now(), @@ -341,7 +340,7 @@ export async function importBackup( } const denomPubHash = cryptoComp.rsaDenomPubToHash[ - backupDenomination.denom_pub.rsa_public_key + backupDenomination.denom_pub.rsa_public_key ]; checkLogicInvariant(!!denomPubHash); const existingDenom = await tx.denominations.get([ @@ -426,7 +425,6 @@ export async function importBackup( } } - // FIXME: import reserves with new schema // for (const backupReserve of backupExchangeDetails.reserves) { @@ -517,7 +515,6 @@ export async function importBackup( // } // } // } - } for (const backupProposal of backupBlob.proposals) { @@ -560,7 +557,7 @@ export async function importBackup( const amount = Amounts.parseOrThrow(parsedContractTerms.amount); const contractTermsHash = cryptoComp.proposalIdToContractTermsHash[ - backupProposal.proposal_id + backupProposal.proposal_id ]; let maxWireFee: AmountJson; if (parsedContractTerms.max_wire_fee) { @@ -611,7 +608,6 @@ export async function importBackup( } await tx.proposals.put({ claimToken: backupProposal.claim_token, - lastError: undefined, merchantBaseUrl: backupProposal.merchant_base_url, timestamp: backupProposal.timestamp, orderId: backupProposal.order_id, @@ -620,7 +616,6 @@ export async function importBackup( cryptoComp.proposalNoncePrivToPub[backupProposal.nonce_priv], proposalId: backupProposal.proposal_id, repurchaseProposalId: backupProposal.repurchase_proposal_id, - retryInfo: RetryInfo.reset(), download, proposalStatus, }); @@ -706,7 +701,7 @@ export async function importBackup( const amount = Amounts.parseOrThrow(parsedContractTerms.amount); const contractTermsHash = cryptoComp.proposalIdToContractTermsHash[ - backupPurchase.proposal_id + backupPurchase.proposal_id ]; let maxWireFee: AmountJson; if (parsedContractTerms.max_wire_fee) { @@ -755,10 +750,7 @@ export async function importBackup( noncePriv: backupPurchase.nonce_priv, noncePub: cryptoComp.proposalNoncePrivToPub[backupPurchase.nonce_priv], - lastPayError: undefined, autoRefundDeadline: TalerProtocolTimestamp.never(), - refundStatusRetryInfo: RetryInfo.reset(), - lastRefundStatusError: undefined, refundAwaiting: undefined, timestampAccept: backupPurchase.timestamp_accept, timestampFirstSuccessfulPay: @@ -767,8 +759,6 @@ export async function importBackup( merchantPaySig: backupPurchase.merchant_pay_sig, lastSessionId: undefined, abortStatus, - // FIXME! - payRetryInfo: RetryInfo.reset(), download, paymentSubmitPending: !backupPurchase.timestamp_first_successful_pay, @@ -851,7 +841,6 @@ export async function importBackup( timestampCreated: backupRefreshGroup.timestamp_created, refreshGroupId: backupRefreshGroup.refresh_group_id, reason, - lastError: undefined, lastErrorPerCoin: {}, oldCoinPubs: backupRefreshGroup.old_coins.map((x) => x.coin_pub), statusPerCoin: backupRefreshGroup.old_coins.map((x) => @@ -869,7 +858,6 @@ export async function importBackup( Amounts.parseOrThrow(x.estimated_output_amount), ), refreshSessionPerCoin, - retryInfo: RetryInfo.reset(), }); } } @@ -891,11 +879,9 @@ export async function importBackup( createdTimestamp: backupTip.timestamp_created, denomsSel, exchangeBaseUrl: backupTip.exchange_base_url, - lastError: undefined, merchantBaseUrl: backupTip.exchange_base_url, merchantTipId: backupTip.merchant_tip_id, pickedUpTimestamp: backupTip.timestamp_finished, - retryInfo: RetryInfo.reset(), secretSeed: backupTip.secret_seed, tipAmountEffective: denomsSel.totalCoinValue, tipAmountRaw: Amounts.parseOrThrow(backupTip.tip_amount_raw), diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts b/packages/taler-wallet-core/src/operations/backup/index.ts index 45b8491df..56871104c 100644 --- a/packages/taler-wallet-core/src/operations/backup/index.ts +++ b/packages/taler-wallet-core/src/operations/backup/index.ts @@ -25,9 +25,12 @@ * Imports. */ import { - AbsoluteTime, AmountString, + AbsoluteTime, + AmountString, BackupRecovery, - buildCodecForObject, bytesToString, canonicalizeBaseUrl, + buildCodecForObject, + bytesToString, + canonicalizeBaseUrl, canonicalJson, Codec, codecForAmountString, @@ -36,19 +39,32 @@ import { codecForNumber, codecForString, codecOptional, - ConfirmPayResultType, decodeCrock, DenomKeyType, - durationFromSpec, eddsaGetPublic, + ConfirmPayResultType, + decodeCrock, + DenomKeyType, + durationFromSpec, + eddsaGetPublic, EddsaKeyPair, encodeCrock, getRandomBytes, - hash, hashDenomPub, + hash, + hashDenomPub, HttpStatusCode, - j2s, kdf, Logger, + j2s, + kdf, + Logger, notEmpty, PreparePayResultType, RecoveryLoadRequest, - RecoveryMergeStrategy, rsaBlind, secretbox, secretbox_open, stringToBytes, TalerErrorDetail, TalerProtocolTimestamp, URL, - WalletBackupContentV1 + RecoveryMergeStrategy, + rsaBlind, + secretbox, + secretbox_open, + stringToBytes, + TalerErrorDetail, + TalerProtocolTimestamp, + URL, + WalletBackupContentV1, } from "@gnu-taler/taler-util"; import { gunzipSync, gzipSync } from "fflate"; import { TalerCryptoInterface } from "../../crypto/cryptoImplementation.js"; @@ -58,26 +74,28 @@ import { BackupProviderStateTag, BackupProviderTerms, ConfigRecord, + OperationAttemptResult, + OperationAttemptResultType, WalletBackupConfState, WalletStoresV1, - WALLET_BACKUP_STATE_KEY + WALLET_BACKUP_STATE_KEY, } from "../../db.js"; import { InternalWalletState } from "../../internal-wallet-state.js"; import { readSuccessResponseJsonOrThrow, - readTalerErrorResponse + readTalerErrorResponse, } from "../../util/http.js"; import { checkDbInvariant, - checkLogicInvariant + checkLogicInvariant, } from "../../util/invariants.js"; import { GetReadWriteAccess } from "../../util/query.js"; -import { RetryInfo } from "../../util/retries.js"; +import { RetryInfo, RetryTags, scheduleRetryInTx } from "../../util/retries.js"; import { guardOperationException } from "../common.js"; import { checkPaymentByProposalId, confirmPay, - preparePayForUri + preparePayForUri, } from "../pay.js"; import { exportBackup } from "./export.js"; import { BackupCryptoPrecomputedData, importBackup } from "./import.js"; @@ -244,8 +262,7 @@ function getNextBackupTimestamp(): TalerProtocolTimestamp { async function runBackupCycleForProvider( ws: InternalWalletState, args: BackupForProviderArgs, -): Promise<void> { - +): Promise<OperationAttemptResult> { const provider = await ws.db .mktx((x) => ({ backupProviders: x.backupProviders })) .runReadOnly(async (tx) => { @@ -254,7 +271,10 @@ async function runBackupCycleForProvider( if (!provider) { logger.warn("provider disappeared"); - return; + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } const backupJson = await exportBackup(ws); @@ -292,8 +312,8 @@ async function runBackupCycleForProvider( "if-none-match": newHash, ...(provider.lastBackupHash ? { - "if-match": provider.lastBackupHash, - } + "if-match": provider.lastBackupHash, + } : {}), }, }); @@ -315,7 +335,10 @@ async function runBackupCycleForProvider( }; await tx.backupProvider.put(prov); }); - return; + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } if (resp.status === HttpStatusCode.PaymentRequired) { @@ -344,7 +367,10 @@ async function runBackupCycleForProvider( // FIXME: check if the provider is overcharging us! await ws.db - .mktx((x) => ({ backupProviders: x.backupProviders })) + .mktx((x) => ({ + backupProviders: x.backupProviders, + operationRetries: x.operationRetries, + })) .runReadWrite(async (tx) => { const provRec = await tx.backupProviders.get(provider.baseUrl); checkDbInvariant(!!provRec); @@ -354,11 +380,8 @@ async function runBackupCycleForProvider( provRec.currentPaymentProposalId = proposalId; // FIXME: allocate error code for this! await tx.backupProviders.put(provRec); - await incrementBackupRetryInTx( - tx, - args.backupProviderBaseUrl, - undefined, - ); + const opId = RetryTags.forBackup(provRec); + await scheduleRetryInTx(ws, tx, opId); }); if (doPay) { @@ -371,12 +394,15 @@ async function runBackupCycleForProvider( } if (args.retryAfterPayment) { - await runBackupCycleForProvider(ws, { + return await runBackupCycleForProvider(ws, { ...args, retryAfterPayment: false, }); } - return; + return { + type: OperationAttemptResultType.Pending, + result: undefined, + }; } if (resp.status === HttpStatusCode.NoContent) { @@ -395,7 +421,10 @@ async function runBackupCycleForProvider( }; await tx.backupProviders.put(prov); }); - return; + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } if (resp.status === HttpStatusCode.Conflict) { @@ -406,7 +435,10 @@ async function runBackupCycleForProvider( const cryptoData = await computeBackupCryptoData(ws.cryptoApi, blob); await importBackup(ws, blob, cryptoData); await ws.db - .mktx((x) => ({ backupProvider: x.backupProviders })) + .mktx((x) => ({ + backupProvider: x.backupProviders, + operationRetries: x.operationRetries, + })) .runReadWrite(async (tx) => { const prov = await tx.backupProvider.get(provider.baseUrl); if (!prov) { @@ -414,20 +446,21 @@ async function runBackupCycleForProvider( return; } prov.lastBackupHash = encodeCrock(hash(backupEnc)); - // FIXME: Allocate error code for this situation? + // FIXME: Allocate error code for this situation? + // FIXME: Add operation retry record! + const opId = RetryTags.forBackup(prov); + await scheduleRetryInTx(ws, tx, opId); prov.state = { tag: BackupProviderStateTag.Retrying, - retryInfo: RetryInfo.reset(), }; await tx.backupProvider.put(prov); }); logger.info("processed existing backup"); // Now upload our own, merged backup. - await runBackupCycleForProvider(ws, { + return await runBackupCycleForProvider(ws, { ...args, retryAfterPayment: false, }); - return; } // Some other response that we did not expect! @@ -436,53 +469,16 @@ async function runBackupCycleForProvider( const err = await readTalerErrorResponse(resp); logger.error(`got error response from backup provider: ${j2s(err)}`); - await ws.db - .mktx((x) => ({ backupProviders: x.backupProviders })) - .runReadWrite(async (tx) => { - incrementBackupRetryInTx(tx, args.backupProviderBaseUrl, err); - }); -} - -async function incrementBackupRetryInTx( - tx: GetReadWriteAccess<{ - backupProviders: typeof WalletStoresV1.backupProviders; - }>, - backupProviderBaseUrl: string, - err: TalerErrorDetail | undefined, -): Promise<void> { - const pr = await tx.backupProviders.get(backupProviderBaseUrl); - if (!pr) { - return; - } - if (pr.state.tag === BackupProviderStateTag.Retrying) { - pr.state.lastError = err; - pr.state.retryInfo = RetryInfo.increment(pr.state.retryInfo); - } else if (pr.state.tag === BackupProviderStateTag.Ready) { - pr.state = { - tag: BackupProviderStateTag.Retrying, - retryInfo: RetryInfo.reset(), - lastError: err, - }; - } - await tx.backupProviders.put(pr); -} - -async function incrementBackupRetry( - ws: InternalWalletState, - backupProviderBaseUrl: string, - err: TalerErrorDetail | undefined, -): Promise<void> { - await ws.db - .mktx((x) => ({ backupProviders: x.backupProviders })) - .runReadWrite(async (tx) => - incrementBackupRetryInTx(tx, backupProviderBaseUrl, err), - ); + return { + type: OperationAttemptResultType.Error, + errorDetail: err, + }; } export async function processBackupForProvider( ws: InternalWalletState, backupProviderBaseUrl: string, -): Promise<void> { +): Promise<OperationAttemptResult> { const provider = await ws.db .mktx((x) => ({ backupProviders: x.backupProviders })) .runReadOnly(async (tx) => { @@ -492,17 +488,10 @@ export async function processBackupForProvider( throw Error("unknown backup provider"); } - const onOpErr = (err: TalerErrorDetail): Promise<void> => - incrementBackupRetry(ws, backupProviderBaseUrl, err); - - const run = async () => { - await runBackupCycleForProvider(ws, { - backupProviderBaseUrl: provider.baseUrl, - retryAfterPayment: true, - }); - }; - - await guardOperationException(run, onOpErr); + return await runBackupCycleForProvider(ws, { + backupProviderBaseUrl: provider.baseUrl, + retryAfterPayment: true, + }); } export interface RemoveBackupProviderRequest { @@ -818,24 +807,34 @@ export async function getBackupInfo( ): Promise<BackupInfo> { const backupConfig = await provideBackupState(ws); const providerRecords = await ws.db - .mktx((x) => ({ backupProviders: x.backupProviders })) + .mktx((x) => ({ + backupProviders: x.backupProviders, + operationRetries: x.operationRetries, + })) .runReadOnly(async (tx) => { - return await tx.backupProviders.iter().toArray(); + return await tx.backupProviders.iter().mapAsync(async (bp) => { + const opId = RetryTags.forBackup(bp); + const retryRecord = await tx.operationRetries.get(opId); + return { + provider: bp, + retryRecord, + }; + }); }); const providers: ProviderInfo[] = []; for (const x of providerRecords) { providers.push({ - active: x.state.tag !== BackupProviderStateTag.Provisional, - syncProviderBaseUrl: x.baseUrl, - lastSuccessfulBackupTimestamp: x.lastBackupCycleTimestamp, - paymentProposalIds: x.paymentProposalIds, + active: x.provider.state.tag !== BackupProviderStateTag.Provisional, + syncProviderBaseUrl: x.provider.baseUrl, + lastSuccessfulBackupTimestamp: x.provider.lastBackupCycleTimestamp, + paymentProposalIds: x.provider.paymentProposalIds, lastError: - x.state.tag === BackupProviderStateTag.Retrying - ? x.state.lastError + x.provider.state.tag === BackupProviderStateTag.Retrying + ? x.retryRecord?.lastError : undefined, - paymentStatus: await getProviderPaymentInfo(ws, x), - terms: x.terms, - name: x.name, + paymentStatus: await getProviderPaymentInfo(ws, x.provider), + terms: x.provider.terms, + name: x.provider.name, }); } return { diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts index 734bc4c2b..6eed12a38 100644 --- a/packages/taler-wallet-core/src/operations/deposits.ts +++ b/packages/taler-wallet-core/src/operations/deposits.ts @@ -44,7 +44,12 @@ import { TrackDepositGroupResponse, URL, } from "@gnu-taler/taler-util"; -import { DepositGroupRecord, OperationStatus } from "../db.js"; +import { + DepositGroupRecord, + OperationAttemptErrorResult, + OperationAttemptResult, + OperationStatus, +} from "../db.js"; import { InternalWalletState } from "../internal-wallet-state.js"; import { selectPayCoins } from "../util/coinSelection.js"; import { readSuccessResponseJsonOrThrow } from "../util/http.js"; @@ -67,91 +72,16 @@ import { getTotalRefreshCost } from "./refresh.js"; const logger = new Logger("deposits.ts"); /** - * Set up the retry timeout for a deposit group. - */ -async function setupDepositGroupRetry( - ws: InternalWalletState, - depositGroupId: string, - options: { - resetRetry: boolean; - }, -): Promise<void> { - await ws.db - .mktx((x) => ({ - depositGroups: x.depositGroups, - })) - .runReadWrite(async (tx) => { - const x = await tx.depositGroups.get(depositGroupId); - if (!x) { - return; - } - if (options.resetRetry) { - x.retryInfo = RetryInfo.reset(); - } else { - x.retryInfo = RetryInfo.increment(x.retryInfo); - } - delete x.lastError; - await tx.depositGroups.put(x); - }); -} - -/** - * Report an error that occurred while processing the deposit group. - */ -async function reportDepositGroupError( - ws: InternalWalletState, - depositGroupId: string, - err: TalerErrorDetail, -): Promise<void> { - await ws.db - .mktx((x) => ({ depositGroups: x.depositGroups })) - .runReadWrite(async (tx) => { - const r = await tx.depositGroups.get(depositGroupId); - if (!r) { - return; - } - if (!r.retryInfo) { - logger.error( - `deposit group record (${depositGroupId}) reports error, but no retry active`, - ); - return; - } - r.lastError = err; - await tx.depositGroups.put(r); - }); - ws.notify({ type: NotificationType.DepositOperationError, error: err }); -} - -export async function processDepositGroup( - ws: InternalWalletState, - depositGroupId: string, - options: { - forceNow?: boolean; - cancellationToken?: CancellationToken; - } = {}, -): Promise<void> { - const onOpErr = (err: TalerErrorDetail): Promise<void> => - reportDepositGroupError(ws, depositGroupId, err); - return await guardOperationException( - async () => await processDepositGroupImpl(ws, depositGroupId, options), - onOpErr, - ); -} - -/** * @see {processDepositGroup} */ -async function processDepositGroupImpl( +export async function processDepositGroup( ws: InternalWalletState, depositGroupId: string, options: { forceNow?: boolean; cancellationToken?: CancellationToken; } = {}, -): Promise<void> { - const forceNow = options.forceNow ?? false; - await setupDepositGroupRetry(ws, depositGroupId, { resetRetry: forceNow }); - +): Promise<OperationAttemptResult> { const depositGroup = await ws.db .mktx((x) => ({ depositGroups: x.depositGroups, @@ -161,11 +91,11 @@ async function processDepositGroupImpl( }); if (!depositGroup) { logger.warn(`deposit group ${depositGroupId} not found`); - return; + return OperationAttemptResult.finishedEmpty(); } if (depositGroup.timestampFinished) { logger.trace(`deposit group ${depositGroupId} already finished`); - return; + return OperationAttemptResult.finishedEmpty(); } const contractData = extractContractData( @@ -240,11 +170,10 @@ async function processDepositGroupImpl( if (allDeposited) { dg.timestampFinished = TalerProtocolTimestamp.now(); dg.operationStatus = OperationStatus.Finished; - delete dg.lastError; - delete dg.retryInfo; await tx.depositGroups.put(dg); } }); + return OperationAttemptResult.finishedEmpty(); } export async function trackDepositGroup( @@ -338,9 +267,9 @@ export async function getFeeForDeposit( const csr: CoinSelectionRequest = { allowedAuditors: [], - allowedExchanges: Object.values(exchangeInfos).map(v => ({ + allowedExchanges: Object.values(exchangeInfos).map((v) => ({ exchangeBaseUrl: v.url, - exchangePub: v.master_pub + exchangePub: v.master_pub, })), amount: Amounts.parseOrThrow(req.amount), maxDepositFee: Amounts.parseOrThrow(req.amount), @@ -383,7 +312,6 @@ export async function prepareDepositGroup( } const amount = Amounts.parseOrThrow(req.amount); - const exchangeInfos: { url: string; master_pub: string }[] = []; await ws.db @@ -473,7 +401,7 @@ export async function prepareDepositGroup( payCoinSel, ); - return { totalDepositCost, effectiveDepositAmount } + return { totalDepositCost, effectiveDepositAmount }; } export async function createDepositGroup( ws: InternalWalletState, @@ -600,9 +528,7 @@ export async function createDepositGroup( payto_uri: req.depositPaytoUri, salt: wireSalt, }, - retryInfo: RetryInfo.reset(), operationStatus: OperationStatus.Pending, - lastError: undefined, }; await ws.db diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts index b75bdfd74..1021da6b6 100644 --- a/packages/taler-wallet-core/src/operations/exchanges.ts +++ b/packages/taler-wallet-core/src/operations/exchanges.ts @@ -53,6 +53,8 @@ import { DenominationVerificationStatus, ExchangeDetailsRecord, ExchangeRecord, + OperationAttemptResult, + OperationAttemptResultType, WalletStoresV1, } from "../db.js"; import { TalerError } from "../errors.js"; @@ -64,7 +66,7 @@ import { readSuccessResponseTextOrThrow, } from "../util/http.js"; import { DbAccess, GetReadOnlyAccess } from "../util/query.js"; -import { RetryInfo } from "../util/retries.js"; +import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js"; import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "../versions.js"; import { guardOperationException } from "./common.js"; @@ -102,51 +104,6 @@ function denominationRecordFromKeys( return d; } -async function reportExchangeUpdateError( - ws: InternalWalletState, - baseUrl: string, - err: TalerErrorDetail, -): Promise<void> { - await ws.db - .mktx((x) => ({ exchanges: x.exchanges })) - .runReadWrite(async (tx) => { - const exchange = await tx.exchanges.get(baseUrl); - if (!exchange) { - return; - } - if (!exchange.retryInfo) { - logger.reportBreak(); - } - exchange.lastError = err; - await tx.exchanges.put(exchange); - }); - ws.notify({ type: NotificationType.ExchangeOperationError, error: err }); -} - -async function setupExchangeUpdateRetry( - ws: InternalWalletState, - baseUrl: string, - options: { - reset: boolean; - }, -): Promise<void> { - await ws.db - .mktx((x) => ({ exchanges: x.exchanges })) - .runReadWrite(async (tx) => { - const exchange = await tx.exchanges.get(baseUrl); - if (!exchange) { - return; - } - if (options.reset) { - exchange.retryInfo = RetryInfo.reset(); - } else { - exchange.retryInfo = RetryInfo.increment(exchange.retryInfo); - } - delete exchange.lastError; - await tx.exchanges.put(exchange); - }); -} - export function getExchangeRequestTimeout(): Duration { return Duration.fromSpec({ seconds: 5, @@ -360,25 +317,6 @@ async function downloadExchangeWireInfo( return wireInfo; } -export async function updateExchangeFromUrl( - ws: InternalWalletState, - baseUrl: string, - options: { - forceNow?: boolean; - cancellationToken?: CancellationToken; - } = {}, -): Promise<{ - exchange: ExchangeRecord; - exchangeDetails: ExchangeDetailsRecord; -}> { - const onOpErr = (e: TalerErrorDetail): Promise<void> => - reportExchangeUpdateError(ws, baseUrl, e); - return await guardOperationException( - () => updateExchangeFromUrlImpl(ws, baseUrl, options), - onOpErr, - ); -} - async function provideExchangeRecord( ws: InternalWalletState, baseUrl: string, @@ -398,7 +336,6 @@ async function provideExchangeRecord( const r: ExchangeRecord = { permanent: true, baseUrl: baseUrl, - retryInfo: RetryInfo.reset(), detailsPointer: undefined, lastUpdate: undefined, nextUpdate: AbsoluteTime.toTimestamp(now), @@ -530,25 +467,42 @@ export async function downloadTosFromAcceptedFormat( ); } +export async function updateExchangeFromUrl( + ws: InternalWalletState, + baseUrl: string, + options: { + forceNow?: boolean; + cancellationToken?: CancellationToken; + } = {}, +): Promise<{ + exchange: ExchangeRecord; + exchangeDetails: ExchangeDetailsRecord; +}> { + return runOperationHandlerForResult( + await updateExchangeFromUrlHandler(ws, baseUrl, options), + ); +} + /** * Update or add exchange DB entry by fetching the /keys and /wire information. * Optionally link the reserve entry to the new or existing * exchange entry in then DB. */ -async function updateExchangeFromUrlImpl( +export async function updateExchangeFromUrlHandler( ws: InternalWalletState, baseUrl: string, options: { forceNow?: boolean; cancellationToken?: CancellationToken; } = {}, -): Promise<{ - exchange: ExchangeRecord; - exchangeDetails: ExchangeDetailsRecord; -}> { +): Promise< + OperationAttemptResult<{ + exchange: ExchangeRecord; + exchangeDetails: ExchangeDetailsRecord; + }> +> { const forceNow = options.forceNow ?? false; logger.info(`updating exchange info for ${baseUrl}, forced: ${forceNow}`); - await setupExchangeUpdateRetry(ws, baseUrl, { reset: forceNow }); const now = AbsoluteTime.now(); baseUrl = canonicalizeBaseUrl(baseUrl); @@ -565,7 +519,10 @@ async function updateExchangeFromUrlImpl( !AbsoluteTime.isExpired(AbsoluteTime.fromTimestamp(exchange.nextUpdate)) ) { logger.info("using existing exchange info"); - return { exchange, exchangeDetails }; + return { + type: OperationAttemptResultType.Finished, + result: { exchange, exchangeDetails }, + }; } logger.info("updating exchange /keys info"); @@ -649,8 +606,6 @@ async function updateExchangeFromUrlImpl( termsOfServiceAcceptedTimestamp: TalerProtocolTimestamp.now(), }; // FIXME: only update if pointer got updated - delete r.lastError; - delete r.retryInfo; r.lastUpdate = TalerProtocolTimestamp.now(); r.nextUpdate = keysInfo.expiry; // New denominations might be available. @@ -771,8 +726,11 @@ async function updateExchangeFromUrlImpl( type: NotificationType.ExchangeAdded, }); return { - exchange: updated.exchange, - exchangeDetails: updated.exchangeDetails, + type: OperationAttemptResultType.Finished, + result: { + exchange: updated.exchange, + exchangeDetails: updated.exchangeDetails, + }, }; } diff --git a/packages/taler-wallet-core/src/operations/pay.ts b/packages/taler-wallet-core/src/operations/pay.ts index 3d4d2b5a0..9e63cd516 100644 --- a/packages/taler-wallet-core/src/operations/pay.ts +++ b/packages/taler-wallet-core/src/operations/pay.ts @@ -37,9 +37,6 @@ import { ContractTerms, ContractTermsUtil, Duration, - durationMax, - durationMin, - durationMul, encodeCrock, ForcedCoinSel, getRandomBytes, @@ -59,10 +56,7 @@ import { TransactionType, URL, } from "@gnu-taler/taler-util"; -import { - EXCHANGE_COINS_LOCK, - InternalWalletState, -} from "../internal-wallet-state.js"; +import { EddsaKeypair } from "../crypto/cryptoImplementation.js"; import { AbortStatus, AllowedAuditorInfo, @@ -71,6 +65,8 @@ import { CoinRecord, CoinStatus, DenominationRecord, + OperationAttemptResult, + OperationAttemptResultType, ProposalRecord, ProposalStatus, PurchaseRecord, @@ -83,6 +79,11 @@ import { TalerError, } from "../errors.js"; import { + EXCHANGE_COINS_LOCK, + InternalWalletState, +} from "../internal-wallet-state.js"; +import { assertUnreachable } from "../util/assertUnreachable.js"; +import { AvailableCoinInfo, CoinCandidateSelection, PreviousPayCoins, @@ -98,11 +99,9 @@ import { throwUnexpectedRequestError, } from "../util/http.js"; import { GetReadWriteAccess } from "../util/query.js"; -import { RetryInfo } from "../util/retries.js"; +import { RetryInfo, RetryTags, scheduleRetry } from "../util/retries.js"; import { getExchangeDetails } from "./exchanges.js"; import { createRefreshGroup, getTotalRefreshCost } from "./refresh.js"; -import { guardOperationException } from "./common.js"; -import { EddsaKeypair } from "../crypto/cryptoImplementation.js"; /** * Logger. @@ -448,10 +447,6 @@ async function recordConfirmPay( timestampAccept: AbsoluteTime.toTimestamp(AbsoluteTime.now()), timestampLastRefundStatus: undefined, proposalId: proposal.proposalId, - lastPayError: undefined, - lastRefundStatusError: undefined, - payRetryInfo: RetryInfo.reset(), - refundStatusRetryInfo: RetryInfo.reset(), refundQueryRequested: false, timestampFirstSuccessfulPay: undefined, autoRefundDeadline: undefined, @@ -475,8 +470,6 @@ async function recordConfirmPay( const p = await tx.proposals.get(proposal.proposalId); if (p) { p.proposalStatus = ProposalStatus.Accepted; - delete p.lastError; - delete p.retryInfo; await tx.proposals.put(p); } await tx.purchases.put(t); @@ -490,117 +483,6 @@ async function recordConfirmPay( return t; } -async function reportProposalError( - ws: InternalWalletState, - proposalId: string, - err: TalerErrorDetail, -): Promise<void> { - await ws.db - .mktx((x) => ({ proposals: x.proposals })) - .runReadWrite(async (tx) => { - const pr = await tx.proposals.get(proposalId); - if (!pr) { - return; - } - if (!pr.retryInfo) { - logger.error( - `Asked to report an error for a proposal (${proposalId}) that is not active (no retryInfo)`, - ); - logger.reportBreak(); - return; - } - pr.lastError = err; - await tx.proposals.put(pr); - }); - ws.notify({ type: NotificationType.ProposalOperationError, error: err }); -} - -async function setupProposalRetry( - ws: InternalWalletState, - proposalId: string, - options: { - reset: boolean; - }, -): Promise<void> { - await ws.db - .mktx((x) => ({ proposals: x.proposals })) - .runReadWrite(async (tx) => { - const pr = await tx.proposals.get(proposalId); - if (!pr) { - return; - } - if (options.reset) { - pr.retryInfo = RetryInfo.reset(); - } else { - pr.retryInfo = RetryInfo.increment(pr.retryInfo); - } - delete pr.lastError; - await tx.proposals.put(pr); - }); -} - -async function setupPurchasePayRetry( - ws: InternalWalletState, - proposalId: string, - options: { - reset: boolean; - }, -): Promise<void> { - await ws.db - .mktx((x) => ({ purchases: x.purchases })) - .runReadWrite(async (tx) => { - const p = await tx.purchases.get(proposalId); - if (!p) { - return; - } - if (options.reset) { - p.payRetryInfo = RetryInfo.reset(); - } else { - p.payRetryInfo = RetryInfo.increment(p.payRetryInfo); - } - delete p.lastPayError; - await tx.purchases.put(p); - }); -} - -async function reportPurchasePayError( - ws: InternalWalletState, - proposalId: string, - err: TalerErrorDetail, -): Promise<void> { - await ws.db - .mktx((x) => ({ purchases: x.purchases })) - .runReadWrite(async (tx) => { - const pr = await tx.purchases.get(proposalId); - if (!pr) { - return; - } - if (!pr.payRetryInfo) { - logger.error( - `purchase record (${proposalId}) reports error, but no retry active`, - ); - } - pr.lastPayError = err; - await tx.purchases.put(pr); - }); - ws.notify({ type: NotificationType.PayOperationError, error: err }); -} - -export async function processDownloadProposal( - ws: InternalWalletState, - proposalId: string, - options: { - forceNow?: boolean; - } = {}, -): Promise<void> { - const onOpErr = (err: TalerErrorDetail): Promise<void> => - reportProposalError(ws, proposalId, err); - await guardOperationException( - () => processDownloadProposalImpl(ws, proposalId, options), - onOpErr, - ); -} - async function failProposalPermanently( ws: InternalWalletState, proposalId: string, @@ -613,23 +495,21 @@ async function failProposalPermanently( if (!p) { return; } - delete p.retryInfo; - p.lastError = err; p.proposalStatus = ProposalStatus.PermanentlyFailed; await tx.proposals.put(p); }); } -function getProposalRequestTimeout(proposal: ProposalRecord): Duration { +function getProposalRequestTimeout(retryInfo?: RetryInfo): Duration { return Duration.clamp({ lower: Duration.fromSpec({ seconds: 1 }), upper: Duration.fromSpec({ seconds: 60 }), - value: RetryInfo.getDuration(proposal.retryInfo), + value: retryInfo ? RetryInfo.getDuration(retryInfo) : Duration.fromSpec({}), }); } function getPayRequestTimeout(purchase: PurchaseRecord): Duration { - return durationMul( + return Duration.multiply( { d_ms: 15000 }, 1 + purchase.payCoinSelection.coinPubs.length / 5, ); @@ -682,15 +562,13 @@ export function extractContractData( }; } -async function processDownloadProposalImpl( +export async function processDownloadProposal( ws: InternalWalletState, proposalId: string, - options: { - forceNow?: boolean; - } = {}, -): Promise<void> { - const forceNow = options.forceNow ?? false; - await setupProposalRetry(ws, proposalId, { reset: forceNow }); + options: {} = {}, +): Promise<OperationAttemptResult> { + + const res = ws.db.mktx2((x) => [x.auditorTrust, x.coins]) const proposal = await ws.db .mktx((x) => ({ proposals: x.proposals })) @@ -699,11 +577,17 @@ async function processDownloadProposalImpl( }); if (!proposal) { - return; + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } if (proposal.proposalStatus != ProposalStatus.Downloading) { - return; + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } const orderClaimUrl = new URL( @@ -722,8 +606,16 @@ async function processDownloadProposalImpl( requestBody.token = proposal.claimToken; } + const opId = RetryTags.forProposalClaim(proposal); + const retryRecord = await ws.db + .mktx((x) => ({ operationRetries: x.operationRetries })) + .runReadOnly(async (tx) => { + return tx.operationRetries.get(opId); + }); + + // FIXME: Do this in the background using the new return value const httpResponse = await ws.http.postJson(orderClaimUrl, requestBody, { - timeout: getProposalRequestTimeout(proposal), + timeout: getProposalRequestTimeout(retryRecord?.retryInfo), }); const r = await readSuccessResponseJsonOrErrorCode( httpResponse, @@ -892,6 +784,11 @@ async function processDownloadProposalImpl( type: NotificationType.ProposalDownloaded, proposalId: proposal.proposalId, }); + + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } /** @@ -954,8 +851,6 @@ async function startDownloadProposal( proposalId: proposalId, proposalStatus: ProposalStatus.Downloading, repurchaseProposalId: undefined, - retryInfo: RetryInfo.reset(), - lastError: undefined, downloadSessionId: sessionId, }; @@ -1000,17 +895,13 @@ async function storeFirstPaySuccess( } purchase.timestampFirstSuccessfulPay = now; purchase.paymentSubmitPending = false; - purchase.lastPayError = undefined; purchase.lastSessionId = sessionId; - purchase.payRetryInfo = RetryInfo.reset(); purchase.merchantPaySig = paySig; const protoAr = purchase.download.contractData.autoRefund; if (protoAr) { const ar = Duration.fromTalerProtocolDuration(protoAr); logger.info("auto_refund present"); purchase.refundQueryRequested = true; - purchase.refundStatusRetryInfo = RetryInfo.reset(); - purchase.lastRefundStatusError = undefined; purchase.autoRefundDeadline = AbsoluteTime.toTimestamp( AbsoluteTime.addDuration(AbsoluteTime.now(), ar), ); @@ -1038,8 +929,6 @@ async function storePayReplaySuccess( throw Error("invalid payment state"); } purchase.paymentSubmitPending = false; - purchase.lastPayError = undefined; - purchase.payRetryInfo = RetryInfo.reset(); purchase.lastSessionId = sessionId; await tx.purchases.put(purchase); }); @@ -1298,7 +1187,8 @@ export async function checkPaymentByProposalId( await tx.purchases.put(p); }); const r = await processPurchasePay(ws, proposalId, { forceNow: true }); - if (r.type !== ConfirmPayResultType.Done) { + if (r.type !== OperationAttemptResultType.Finished) { + // FIXME: This does not surface the original error throw Error("submitting pay failed"); } return { @@ -1458,6 +1348,45 @@ export async function generateDepositPermissions( } /** + * Run the operation handler for a payment + * and return the result as a {@link ConfirmPayResult}. + */ +export async function runPayForConfirmPay( + ws: InternalWalletState, + proposalId: string, +): Promise<ConfirmPayResult> { + const res = await processPurchasePay(ws, proposalId, { forceNow: true }); + switch (res.type) { + case OperationAttemptResultType.Finished: { + const purchase = await ws.db + .mktx((x) => ({ purchases: x.purchases })) + .runReadOnly(async (tx) => { + return tx.purchases.get(proposalId); + }); + if (!purchase?.download) { + throw Error("purchase record not available anymore"); + } + return { + type: ConfirmPayResultType.Done, + contractTerms: purchase.download.contractTermsRaw, + }; + } + case OperationAttemptResultType.Error: + // FIXME: allocate error code! + throw Error("payment failed"); + case OperationAttemptResultType.Pending: + return { + type: ConfirmPayResultType.Pending, + lastError: undefined, + }; + case OperationAttemptResultType.Longpoll: + throw Error("unexpected processPurchasePay result (longpoll)"); + default: + assertUnreachable(res); + } +} + +/** * Add a contract to the wallet and sign coins, and send them. */ export async function confirmPay( @@ -1503,7 +1432,7 @@ export async function confirmPay( if (existingPurchase) { logger.trace("confirmPay: submitting payment for existing purchase"); - return await processPurchasePay(ws, proposalId, { forceNow: true }); + return runPayForConfirmPay(ws, proposalId); } logger.trace("confirmPay: purchase record does not exist yet"); @@ -1559,6 +1488,7 @@ export async function confirmPay( res, d.contractData, ); + await recordConfirmPay( ws, proposal, @@ -1567,7 +1497,7 @@ export async function confirmPay( sessionIdOverride, ); - return await processPurchasePay(ws, proposalId, { forceNow: true }); + return runPayForConfirmPay(ws, proposalId); } export async function processPurchasePay( @@ -1576,24 +1506,7 @@ export async function processPurchasePay( options: { forceNow?: boolean; } = {}, -): Promise<ConfirmPayResult> { - const onOpErr = (e: TalerErrorDetail): Promise<void> => - reportPurchasePayError(ws, proposalId, e); - return await guardOperationException( - () => processPurchasePayImpl(ws, proposalId, options), - onOpErr, - ); -} - -async function processPurchasePayImpl( - ws: InternalWalletState, - proposalId: string, - options: { - forceNow?: boolean; - } = {}, -): Promise<ConfirmPayResult> { - const forceNow = options.forceNow ?? false; - await setupPurchasePayRetry(ws, proposalId, { reset: forceNow }); +): Promise<OperationAttemptResult> { const purchase = await ws.db .mktx((x) => ({ purchases: x.purchases })) .runReadOnly(async (tx) => { @@ -1601,8 +1514,8 @@ async function processPurchasePayImpl( }); if (!purchase) { return { - type: ConfirmPayResultType.Pending, - lastError: { + type: OperationAttemptResultType.Error, + errorDetail: { // FIXME: allocate more specific error code code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, hint: `trying to pay for purchase that is not in the database`, @@ -1611,10 +1524,7 @@ async function processPurchasePayImpl( }; } if (!purchase.paymentSubmitPending) { - return { - type: ConfirmPayResultType.Pending, - lastError: purchase.lastPayError, - }; + OperationAttemptResult.finishedEmpty(); } logger.trace(`processing purchase pay ${proposalId}`); @@ -1659,23 +1569,12 @@ async function processPurchasePayImpl( logger.trace(`got resp ${JSON.stringify(resp)}`); - // Hide transient errors. - if ( - (purchase.payRetryInfo?.retryCounter ?? 0) <= 5 && - resp.status >= 500 && - resp.status <= 599 - ) { - logger.trace("treating /pay error as transient"); - const err = makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR, - getHttpResponseErrorDetails(resp), - "/pay failed", - ); - return { - type: ConfirmPayResultType.Pending, - lastError: err, - }; - } + const payOpId = RetryTags.forPay(purchase); + const payRetryRecord = await ws.db + .mktx((x) => ({ operationRetries: x.operationRetries })) + .runReadOnly(async (tx) => { + return await tx.operationRetries.get(payOpId); + }); if (resp.status === HttpStatusCode.BadRequest) { const errDetails = await readUnexpectedResponseDetails(resp); @@ -1689,8 +1588,6 @@ async function processPurchasePayImpl( return; } purch.payFrozen = true; - purch.lastPayError = errDetails; - delete purch.payRetryInfo; await tx.purchases.put(purch); }); throw makePendingOperationFailedError( @@ -1708,7 +1605,9 @@ async function processPurchasePayImpl( ) { // Do this in the background, as it might take some time handleInsufficientFunds(ws, proposalId, err).catch(async (e) => { - reportPurchasePayError(ws, proposalId, { + console.log("handling insufficient funds failed"); + + await scheduleRetry(ws, RetryTags.forPay(purchase), { code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, message: "unexpected exception", hint: "unexpected exception", @@ -1719,9 +1618,8 @@ async function processPurchasePayImpl( }); return { - type: ConfirmPayResultType.Pending, - // FIXME: should we return something better here? - lastError: err, + type: OperationAttemptResultType.Pending, + result: undefined, }; } } @@ -1761,22 +1659,6 @@ async function processPurchasePayImpl( const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], () => ws.http.postJson(payAgainUrl, reqBody), ); - // Hide transient errors. - if ( - (purchase.payRetryInfo?.retryCounter ?? 0) <= 5 && - resp.status >= 500 && - resp.status <= 599 - ) { - const err = makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR, - getHttpResponseErrorDetails(resp), - "/paid failed", - ); - return { - type: ConfirmPayResultType.Pending, - lastError: err, - }; - } if (resp.status !== 204) { throw TalerError.fromDetail( TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR, @@ -1793,10 +1675,7 @@ async function processPurchasePayImpl( proposalId: purchase.proposalId, }); - return { - type: ConfirmPayResultType.Done, - contractTerms: purchase.download.contractTermsRaw, - }; + return OperationAttemptResult.finishedEmpty(); } export async function refuseProposal( diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts index 5cf3afd4d..7d5a5bfd9 100644 --- a/packages/taler-wallet-core/src/operations/pending.ts +++ b/packages/taler-wallet-core/src/operations/pending.ts @@ -36,40 +36,50 @@ import { import { AbsoluteTime } from "@gnu-taler/taler-util"; import { InternalWalletState } from "../internal-wallet-state.js"; import { GetReadOnlyAccess } from "../util/query.js"; +import { RetryTags } from "../util/retries.js"; +import { Wallet } from "../wallet.js"; async function gatherExchangePending( tx: GetReadOnlyAccess<{ exchanges: typeof WalletStoresV1.exchanges; exchangeDetails: typeof WalletStoresV1.exchangeDetails; + operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.exchanges.iter().forEachAsync(async (e) => { + await tx.exchanges.iter().forEachAsync(async (exch) => { + const opTag = RetryTags.forExchangeUpdate(exch); + let opr = await tx.operationRetries.get(opTag); resp.pendingOperations.push({ type: PendingTaskType.ExchangeUpdate, + id: opTag, givesLifeness: false, timestampDue: - e.retryInfo?.nextRetry ?? AbsoluteTime.fromTimestamp(e.nextUpdate), - exchangeBaseUrl: e.baseUrl, - lastError: e.lastError, + opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate), + exchangeBaseUrl: exch.baseUrl, + lastError: opr?.lastError, }); // We only schedule a check for auto-refresh if the exchange update // was successful. - if (!e.lastError) { + if (!opr?.lastError) { resp.pendingOperations.push({ type: PendingTaskType.ExchangeCheckRefresh, - timestampDue: AbsoluteTime.fromTimestamp(e.nextRefreshCheck), + id: RetryTags.forExchangeCheckRefresh(exch), + timestampDue: AbsoluteTime.fromTimestamp(exch.nextRefreshCheck), givesLifeness: false, - exchangeBaseUrl: e.baseUrl, + exchangeBaseUrl: exch.baseUrl, }); } }); } async function gatherRefreshPending( - tx: GetReadOnlyAccess<{ refreshGroups: typeof WalletStoresV1.refreshGroups }>, + tx: GetReadOnlyAccess<{ + refreshGroups: typeof WalletStoresV1.refreshGroups; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { @@ -83,15 +93,19 @@ async function gatherRefreshPending( if (r.frozen) { return; } + const opId = RetryTags.forRefresh(r); + const retryRecord = await tx.operationRetries.get(opId); + resp.pendingOperations.push({ type: PendingTaskType.Refresh, + id: opId, givesLifeness: true, - timestampDue: r.retryInfo.nextRetry, + timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(), refreshGroupId: r.refreshGroupId, finishedPerCoin: r.statusPerCoin.map( (x) => x === RefreshCoinStatus.Finished, ), - retryInfo: r.retryInfo, + retryInfo: retryRecord?.retryInfo, }); } } @@ -100,6 +114,7 @@ async function gatherWithdrawalPending( tx: GetReadOnlyAccess<{ withdrawalGroups: typeof WalletStoresV1.withdrawalGroups; planchets: typeof WalletStoresV1.planchets; + operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, @@ -111,54 +126,68 @@ async function gatherWithdrawalPending( if (wsr.timestampFinish) { return; } - let numCoinsWithdrawn = 0; - let numCoinsTotal = 0; - await tx.planchets.indexes.byGroup - .iter(wsr.withdrawalGroupId) - .forEach((x) => { - numCoinsTotal++; - if (x.withdrawalDone) { - numCoinsWithdrawn++; - } - }); + const opTag = RetryTags.forWithdrawal(wsr); + let opr = await tx.operationRetries.get(opTag); + const now = AbsoluteTime.now(); + if (!opr) { + opr = { + id: opTag, + retryInfo: { + firstTry: now, + nextRetry: now, + retryCounter: 0, + }, + }; + } resp.pendingOperations.push({ type: PendingTaskType.Withdraw, + id: opTag, givesLifeness: true, - timestampDue: wsr.retryInfo?.nextRetry ?? AbsoluteTime.now(), + timestampDue: opr.retryInfo?.nextRetry ?? AbsoluteTime.now(), withdrawalGroupId: wsr.withdrawalGroupId, - lastError: wsr.lastError, - retryInfo: wsr.retryInfo, + lastError: opr.lastError, + retryInfo: opr.retryInfo, }); } } async function gatherProposalPending( - tx: GetReadOnlyAccess<{ proposals: typeof WalletStoresV1.proposals }>, + tx: GetReadOnlyAccess<{ + proposals: typeof WalletStoresV1.proposals; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.proposals.iter().forEach((proposal) => { + await tx.proposals.iter().forEachAsync(async (proposal) => { if (proposal.proposalStatus == ProposalStatus.Proposed) { // Nothing to do, user needs to choose. } else if (proposal.proposalStatus == ProposalStatus.Downloading) { - const timestampDue = proposal.retryInfo?.nextRetry ?? AbsoluteTime.now(); + const opId = RetryTags.forProposalClaim(proposal); + const retryRecord = await tx.operationRetries.get(opId); + const timestampDue = + retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.ProposalDownload, + id: opId, givesLifeness: true, timestampDue, merchantBaseUrl: proposal.merchantBaseUrl, orderId: proposal.orderId, proposalId: proposal.proposalId, proposalTimestamp: proposal.timestamp, - lastError: proposal.lastError, - retryInfo: proposal.retryInfo, + lastError: retryRecord?.lastError, + retryInfo: retryRecord?.retryInfo, }); } }); } async function gatherDepositPending( - tx: GetReadOnlyAccess<{ depositGroups: typeof WalletStoresV1.depositGroups }>, + tx: GetReadOnlyAccess<{ + depositGroups: typeof WalletStoresV1.depositGroups; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { @@ -169,32 +198,42 @@ async function gatherDepositPending( if (dg.timestampFinished) { return; } - const timestampDue = dg.retryInfo?.nextRetry ?? AbsoluteTime.now(); + const opId = RetryTags.forDeposit(dg); + const retryRecord = await tx.operationRetries.get(opId); + const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.Deposit, + id: opId, givesLifeness: true, timestampDue, depositGroupId: dg.depositGroupId, - lastError: dg.lastError, - retryInfo: dg.retryInfo, + lastError: retryRecord?.lastError, + retryInfo: retryRecord?.retryInfo, }); } } async function gatherTipPending( - tx: GetReadOnlyAccess<{ tips: typeof WalletStoresV1.tips }>, + tx: GetReadOnlyAccess<{ + tips: typeof WalletStoresV1.tips; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.tips.iter().forEach((tip) => { + await tx.tips.iter().forEachAsync(async (tip) => { + // FIXME: The tip record needs a proper status field! if (tip.pickedUpTimestamp) { return; } + const opId = RetryTags.forTipPickup(tip); + const retryRecord = await tx.operationRetries.get(opId); if (tip.acceptedTimestamp) { resp.pendingOperations.push({ type: PendingTaskType.TipPickup, + id: opId, givesLifeness: true, - timestampDue: tip.retryInfo.nextRetry, + timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(), merchantBaseUrl: tip.merchantBaseUrl, tipId: tip.walletTipId, merchantTipId: tip.merchantTipId, @@ -204,56 +243,77 @@ async function gatherTipPending( } async function gatherPurchasePending( - tx: GetReadOnlyAccess<{ purchases: typeof WalletStoresV1.purchases }>, + tx: GetReadOnlyAccess<{ + purchases: typeof WalletStoresV1.purchases; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.purchases.iter().forEach((pr) => { + // FIXME: Only iter purchases with some "active" flag! + await tx.purchases.iter().forEachAsync(async (pr) => { if ( pr.paymentSubmitPending && pr.abortStatus === AbortStatus.None && !pr.payFrozen ) { - const timestampDue = pr.payRetryInfo?.nextRetry ?? AbsoluteTime.now(); + const payOpId = RetryTags.forPay(pr); + const payRetryRecord = await tx.operationRetries.get(payOpId); + + const timestampDue = + payRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); resp.pendingOperations.push({ type: PendingTaskType.Pay, + id: payOpId, givesLifeness: true, timestampDue, isReplay: false, proposalId: pr.proposalId, - retryInfo: pr.payRetryInfo, - lastError: pr.lastPayError, + retryInfo: payRetryRecord?.retryInfo, + lastError: payRetryRecord?.lastError, }); } if (pr.refundQueryRequested) { + const refundQueryOpId = RetryTags.forRefundQuery(pr); + const refundQueryRetryRecord = await tx.operationRetries.get( + refundQueryOpId, + ); resp.pendingOperations.push({ type: PendingTaskType.RefundQuery, + id: refundQueryOpId, givesLifeness: true, - timestampDue: pr.refundStatusRetryInfo.nextRetry, + timestampDue: + refundQueryRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(), proposalId: pr.proposalId, - retryInfo: pr.refundStatusRetryInfo, - lastError: pr.lastRefundStatusError, + retryInfo: refundQueryRetryRecord?.retryInfo, + lastError: refundQueryRetryRecord?.lastError, }); } }); } async function gatherRecoupPending( - tx: GetReadOnlyAccess<{ recoupGroups: typeof WalletStoresV1.recoupGroups }>, + tx: GetReadOnlyAccess<{ + recoupGroups: typeof WalletStoresV1.recoupGroups; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.recoupGroups.iter().forEach((rg) => { + await tx.recoupGroups.iter().forEachAsync(async (rg) => { if (rg.timestampFinished) { return; } + const opId = RetryTags.forRecoup(rg); + const retryRecord = await tx.operationRetries.get(opId); resp.pendingOperations.push({ type: PendingTaskType.Recoup, + id: opId, givesLifeness: true, - timestampDue: rg.retryInfo.nextRetry, + timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(), recoupGroupId: rg.recoupGroupId, - retryInfo: rg.retryInfo, - lastError: rg.lastError, + retryInfo: retryRecord?.retryInfo, + lastError: retryRecord?.lastError, }); }); } @@ -261,14 +321,18 @@ async function gatherRecoupPending( async function gatherBackupPending( tx: GetReadOnlyAccess<{ backupProviders: typeof WalletStoresV1.backupProviders; + operationRetries: typeof WalletStoresV1.operationRetries; }>, now: AbsoluteTime, resp: PendingOperationsResponse, ): Promise<void> { - await tx.backupProviders.iter().forEach((bp) => { + await tx.backupProviders.iter().forEachAsync(async (bp) => { + const opId = RetryTags.forBackup(bp); + const retryRecord = await tx.operationRetries.get(opId); if (bp.state.tag === BackupProviderStateTag.Ready) { resp.pendingOperations.push({ type: PendingTaskType.Backup, + id: opId, givesLifeness: false, timestampDue: AbsoluteTime.fromTimestamp(bp.state.nextBackupTimestamp), backupProviderBaseUrl: bp.baseUrl, @@ -277,11 +341,12 @@ async function gatherBackupPending( } else if (bp.state.tag === BackupProviderStateTag.Retrying) { resp.pendingOperations.push({ type: PendingTaskType.Backup, + id: opId, givesLifeness: false, - timestampDue: bp.state.retryInfo.nextRetry, + timestampDue: retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now(), backupProviderBaseUrl: bp.baseUrl, - retryInfo: bp.state.retryInfo, - lastError: bp.state.lastError, + retryInfo: retryRecord?.retryInfo, + lastError: retryRecord?.lastError, }); } }); @@ -305,6 +370,7 @@ export async function getPendingOperations( planchets: x.planchets, depositGroups: x.depositGroups, recoupGroups: x.recoupGroups, + operationRetries: x.operationRetries, })) .runReadWrite(async (tx) => { const resp: PendingOperationsResponse = { diff --git a/packages/taler-wallet-core/src/operations/recoup.ts b/packages/taler-wallet-core/src/operations/recoup.ts index 283707947..387c23f41 100644 --- a/packages/taler-wallet-core/src/operations/recoup.ts +++ b/packages/taler-wallet-core/src/operations/recoup.ts @@ -42,6 +42,8 @@ import { CoinRecord, CoinSourceType, CoinStatus, + OperationAttemptResult, + OperationAttemptResultType, RecoupGroupRecord, RefreshCoinSource, ReserveRecordStatus, @@ -52,64 +54,13 @@ import { import { InternalWalletState } from "../internal-wallet-state.js"; import { readSuccessResponseJsonOrThrow } from "../util/http.js"; import { GetReadWriteAccess } from "../util/query.js"; -import { RetryInfo } from "../util/retries.js"; +import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js"; import { guardOperationException } from "./common.js"; import { createRefreshGroup, processRefreshGroup } from "./refresh.js"; import { internalCreateWithdrawalGroup } from "./withdraw.js"; const logger = new Logger("operations/recoup.ts"); -async function setupRecoupRetry( - ws: InternalWalletState, - recoupGroupId: string, - options: { - reset: boolean; - }, -): Promise<void> { - await ws.db - .mktx((x) => ({ - recoupGroups: x.recoupGroups, - })) - .runReadWrite(async (tx) => { - const r = await tx.recoupGroups.get(recoupGroupId); - if (!r) { - return; - } - if (options.reset) { - r.retryInfo = RetryInfo.reset(); - } else { - r.retryInfo = RetryInfo.increment(r.retryInfo); - } - delete r.lastError; - await tx.recoupGroups.put(r); - }); -} - -async function reportRecoupError( - ws: InternalWalletState, - recoupGroupId: string, - err: TalerErrorDetail, -): Promise<void> { - await ws.db - .mktx((x) => ({ - recoupGroups: x.recoupGroups, - })) - .runReadWrite(async (tx) => { - const r = await tx.recoupGroups.get(recoupGroupId); - if (!r) { - return; - } - if (!r.retryInfo) { - logger.error( - "reporting error for inactive recoup group (no retry info)", - ); - } - r.lastError = err; - await tx.recoupGroups.put(r); - }); - ws.notify({ type: NotificationType.RecoupOperationError, error: err }); -} - /** * Store a recoup group record in the database after marking * a coin in the group as finished. @@ -353,25 +304,20 @@ export async function processRecoupGroup( forceNow?: boolean; } = {}, ): Promise<void> { - await ws.memoProcessRecoup.memo(recoupGroupId, async () => { - const onOpErr = (e: TalerErrorDetail): Promise<void> => - reportRecoupError(ws, recoupGroupId, e); - return await guardOperationException( - async () => await processRecoupGroupImpl(ws, recoupGroupId, options), - onOpErr, - ); - }); + await runOperationHandlerForResult( + await processRecoupGroupHandler(ws, recoupGroupId, options), + ); + return; } -async function processRecoupGroupImpl( +export async function processRecoupGroupHandler( ws: InternalWalletState, recoupGroupId: string, options: { forceNow?: boolean; } = {}, -): Promise<void> { +): Promise<OperationAttemptResult> { const forceNow = options.forceNow ?? false; - await setupRecoupRetry(ws, recoupGroupId, { reset: forceNow }); let recoupGroup = await ws.db .mktx((x) => ({ recoupGroups: x.recoupGroups, @@ -380,11 +326,11 @@ async function processRecoupGroupImpl( return tx.recoupGroups.get(recoupGroupId); }); if (!recoupGroup) { - return; + return OperationAttemptResult.finishedEmpty(); } if (recoupGroup.timestampFinished) { logger.trace("recoup group finished"); - return; + return OperationAttemptResult.finishedEmpty(); } const ps = recoupGroup.coinPubs.map(async (x, i) => { try { @@ -404,12 +350,12 @@ async function processRecoupGroupImpl( return tx.recoupGroups.get(recoupGroupId); }); if (!recoupGroup) { - return; + return OperationAttemptResult.finishedEmpty(); } for (const b of recoupGroup.recoupFinishedPerCoin) { if (!b) { - return; + return OperationAttemptResult.finishedEmpty(); } } @@ -480,8 +426,6 @@ async function processRecoupGroupImpl( return; } rg2.timestampFinished = TalerProtocolTimestamp.now(); - rg2.retryInfo = RetryInfo.reset(); - rg2.lastError = undefined; if (rg2.scheduleRefreshCoins.length > 0) { const refreshGroupId = await createRefreshGroup( ws, @@ -495,6 +439,7 @@ async function processRecoupGroupImpl( } await tx.recoupGroups.put(rg2); }); + return OperationAttemptResult.finishedEmpty(); } export async function createRecoupGroup( @@ -514,10 +459,8 @@ export async function createRecoupGroup( recoupGroupId, exchangeBaseUrl: exchangeBaseUrl, coinPubs: coinPubs, - lastError: undefined, timestampFinished: undefined, timestampStarted: TalerProtocolTimestamp.now(), - retryInfo: RetryInfo.reset(), recoupFinishedPerCoin: coinPubs.map(() => false), // Will be populated later oldAmountPerCoin: [], diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index 64a734bb3..64b85a040 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -57,6 +57,8 @@ import { CoinSourceType, CoinStatus, DenominationRecord, + OperationAttemptResult, + OperationAttemptResultType, OperationStatus, RefreshCoinStatus, RefreshGroupRecord, @@ -74,7 +76,7 @@ import { } from "../util/http.js"; import { checkDbInvariant } from "../util/invariants.js"; import { GetReadWriteAccess } from "../util/query.js"; -import { RetryInfo } from "../util/retries.js"; +import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js"; import { guardOperationException } from "./common.js"; import { updateExchangeFromUrl } from "./exchanges.js"; import { @@ -133,11 +135,9 @@ function updateGroupStatus(rg: RefreshGroupRecord): void { if (allDone) { if (anyFrozen) { rg.frozen = true; - rg.retryInfo = RetryInfo.reset(); } else { rg.timestampFinished = AbsoluteTime.toTimestamp(AbsoluteTime.now()); rg.operationStatus = OperationStatus.Finished; - rg.retryInfo = RetryInfo.reset(); } } } @@ -730,89 +730,14 @@ async function refreshReveal( }); } -async function setupRefreshRetry( - ws: InternalWalletState, - refreshGroupId: string, - options: { - reset: boolean; - }, -): Promise<void> { - await ws.db - .mktx((x) => ({ - refreshGroups: x.refreshGroups, - })) - .runReadWrite(async (tx) => { - const r = await tx.refreshGroups.get(refreshGroupId); - if (!r) { - return; - } - if (options.reset) { - r.retryInfo = RetryInfo.reset(); - } else { - r.retryInfo = RetryInfo.increment(r.retryInfo); - } - delete r.lastError; - await tx.refreshGroups.put(r); - }); -} - -async function reportRefreshError( - ws: InternalWalletState, - refreshGroupId: string, - err: TalerErrorDetail | undefined, -): Promise<void> { - await ws.db - .mktx((x) => ({ - refreshGroups: x.refreshGroups, - })) - .runReadWrite(async (tx) => { - const r = await tx.refreshGroups.get(refreshGroupId); - if (!r) { - return; - } - if (!r.retryInfo) { - logger.error( - "reported error for inactive refresh group (no retry info)", - ); - } - r.lastError = err; - await tx.refreshGroups.put(r); - }); - if (err) { - ws.notify({ type: NotificationType.RefreshOperationError, error: err }); - } -} - -/** - * Actually process a refresh group that has been created. - */ export async function processRefreshGroup( ws: InternalWalletState, refreshGroupId: string, options: { forceNow?: boolean; } = {}, -): Promise<void> { - await ws.memoProcessRefresh.memo(refreshGroupId, async () => { - const onOpErr = (e: TalerErrorDetail): Promise<void> => - reportRefreshError(ws, refreshGroupId, e); - return await guardOperationException( - async () => await processRefreshGroupImpl(ws, refreshGroupId, options), - onOpErr, - ); - }); -} - -async function processRefreshGroupImpl( - ws: InternalWalletState, - refreshGroupId: string, - options: { - forceNow?: boolean; - } = {}, -): Promise<void> { - const forceNow = options.forceNow ?? false; +): Promise<OperationAttemptResult> { logger.info(`processing refresh group ${refreshGroupId}`); - await setupRefreshRetry(ws, refreshGroupId, { reset: forceNow }); const refreshGroup = await ws.db .mktx((x) => ({ @@ -822,10 +747,16 @@ async function processRefreshGroupImpl( return tx.refreshGroups.get(refreshGroupId); }); if (!refreshGroup) { - return; + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } if (refreshGroup.timestampFinished) { - return; + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } // Process refresh sessions of the group in parallel. logger.trace("processing refresh sessions for old coins"); @@ -855,6 +786,10 @@ async function processRefreshGroupImpl( logger.warn("process refresh sessions got exception"); logger.warn(`exception: ${e}`); } + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } async function processRefreshSession( @@ -975,13 +910,11 @@ export async function createRefreshGroup( operationStatus: OperationStatus.Pending, timestampFinished: undefined, statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending), - lastError: undefined, lastErrorPerCoin: {}, oldCoinPubs: oldCoinPubs.map((x) => x.coinPub), reason, refreshGroupId, refreshSessionPerCoin: oldCoinPubs.map(() => undefined), - retryInfo: RetryInfo.reset(), inputPerCoin, estimatedOutputPerCoin, timestampCreated: TalerProtocolTimestamp.now(), @@ -1034,7 +967,7 @@ function getAutoRefreshExecuteThreshold(d: DenominationRecord): AbsoluteTime { export async function autoRefresh( ws: InternalWalletState, exchangeBaseUrl: string, -): Promise<void> { +): Promise<OperationAttemptResult> { logger.info(`doing auto-refresh check for '${exchangeBaseUrl}'`); // We must make sure that the exchange is up-to-date so that @@ -1109,4 +1042,5 @@ export async function autoRefresh( exchange.nextRefreshCheck = AbsoluteTime.toTimestamp(minCheckThreshold); await tx.exchanges.put(exchange); }); + return OperationAttemptResult.finishedEmpty(); } diff --git a/packages/taler-wallet-core/src/operations/refund.ts b/packages/taler-wallet-core/src/operations/refund.ts index 8f5c1143a..bc8c185db 100644 --- a/packages/taler-wallet-core/src/operations/refund.ts +++ b/packages/taler-wallet-core/src/operations/refund.ts @@ -51,6 +51,7 @@ import { import { AbortStatus, CoinStatus, + OperationAttemptResult, PurchaseRecord, RefundReason, RefundState, @@ -60,8 +61,6 @@ import { InternalWalletState } from "../internal-wallet-state.js"; import { readSuccessResponseJsonOrThrow } from "../util/http.js"; import { checkDbInvariant } from "../util/invariants.js"; import { GetReadWriteAccess } from "../util/query.js"; -import { RetryInfo } from "../util/retries.js"; -import { guardOperationException } from "./common.js"; import { createRefreshGroup, getTotalRefreshCost } from "./refresh.js"; const logger = new Logger("refund.ts"); @@ -120,68 +119,6 @@ export async function prepareRefund( }, }; } -/** - * Retry querying and applying refunds for an order later. - */ -async function setupPurchaseQueryRefundRetry( - ws: InternalWalletState, - proposalId: string, - options: { - reset: boolean; - }, -): Promise<void> { - await ws.db - .mktx((x) => ({ - purchases: x.purchases, - })) - .runReadWrite(async (tx) => { - const pr = await tx.purchases.get(proposalId); - if (!pr) { - return; - } - if (options.reset) { - pr.refundStatusRetryInfo = RetryInfo.reset(); - } else { - pr.refundStatusRetryInfo = RetryInfo.increment( - pr.refundStatusRetryInfo, - ); - } - await tx.purchases.put(pr); - }); -} - -/** - * Report an error that happending when querying for a purchase's refund. - */ -async function reportPurchaseQueryRefundError( - ws: InternalWalletState, - proposalId: string, - err: TalerErrorDetail, -): Promise<void> { - await ws.db - .mktx((x) => ({ - purchases: x.purchases, - })) - .runReadWrite(async (tx) => { - const pr = await tx.purchases.get(proposalId); - if (!pr) { - return; - } - if (!pr.refundStatusRetryInfo) { - logger.error( - "reported error on an inactive purchase (no refund status retry info)", - ); - } - pr.lastRefundStatusError = err; - await tx.purchases.put(pr); - }); - if (err) { - ws.notify({ - type: NotificationType.RefundStatusOperationError, - error: err, - }); - } -} function getRefundKey(d: MerchantCoinRefundStatus): string { return `${d.coin_pub}-${d.rtransaction_id}`; @@ -492,8 +429,6 @@ async function acceptRefunds( if (queryDone) { p.timestampLastRefundStatus = now; - p.lastRefundStatusError = undefined; - p.refundStatusRetryInfo = RetryInfo.reset(); p.refundQueryRequested = false; if (p.abortStatus === AbortStatus.AbortRefund) { p.abortStatus = AbortStatus.AbortFinished; @@ -502,8 +437,6 @@ async function acceptRefunds( } else { // No error, but we need to try again! p.timestampLastRefundStatus = now; - p.refundStatusRetryInfo = RetryInfo.increment(p.refundStatusRetryInfo); - p.lastRefundStatusError = undefined; logger.trace("refund query not done"); } @@ -621,8 +554,6 @@ export async function applyRefundFromPurchaseId( return false; } p.refundQueryRequested = true; - p.lastRefundStatusError = undefined; - p.refundStatusRetryInfo = RetryInfo.reset(); await tx.purchases.put(p); return true; }); @@ -631,7 +562,7 @@ export async function applyRefundFromPurchaseId( ws.notify({ type: NotificationType.RefundStarted, }); - await processPurchaseQueryRefundImpl(ws, proposalId, { + await processPurchaseQueryRefund(ws, proposalId, { forceNow: true, waitForAutoRefund: false, }); @@ -672,22 +603,6 @@ export async function applyRefundFromPurchaseId( }; } -export async function processPurchaseQueryRefund( - ws: InternalWalletState, - proposalId: string, - options: { - forceNow?: boolean; - waitForAutoRefund?: boolean; - } = {}, -): Promise<void> { - const onOpErr = (e: TalerErrorDetail): Promise<void> => - reportPurchaseQueryRefundError(ws, proposalId, e); - await guardOperationException( - () => processPurchaseQueryRefundImpl(ws, proposalId, options), - onOpErr, - ); -} - async function queryAndSaveAwaitingRefund( ws: InternalWalletState, purchase: PurchaseRecord, @@ -742,17 +657,15 @@ async function queryAndSaveAwaitingRefund( return refundAwaiting; } -async function processPurchaseQueryRefundImpl( +export async function processPurchaseQueryRefund( ws: InternalWalletState, proposalId: string, options: { forceNow?: boolean; waitForAutoRefund?: boolean; } = {}, -): Promise<void> { - const forceNow = options.forceNow ?? false; +): Promise<OperationAttemptResult> { const waitForAutoRefund = options.waitForAutoRefund ?? false; - await setupPurchaseQueryRefundRetry(ws, proposalId, { reset: forceNow }); const purchase = await ws.db .mktx((x) => ({ purchases: x.purchases, @@ -761,11 +674,11 @@ async function processPurchaseQueryRefundImpl( return tx.purchases.get(proposalId); }); if (!purchase) { - return; + return OperationAttemptResult.finishedEmpty(); } if (!purchase.refundQueryRequested) { - return; + return OperationAttemptResult.finishedEmpty(); } if (purchase.timestampFirstSuccessfulPay) { @@ -780,7 +693,9 @@ async function processPurchaseQueryRefundImpl( purchase, waitForAutoRefund, ); - if (Amounts.isZero(awaitingAmount)) return; + if (Amounts.isZero(awaitingAmount)) { + return OperationAttemptResult.finishedEmpty(); + } } const requestUrl = new URL( @@ -873,6 +788,7 @@ async function processPurchaseQueryRefundImpl( } await acceptRefunds(ws, proposalId, refunds, RefundReason.AbortRefund); } + return OperationAttemptResult.finishedEmpty(); } export async function abortFailedPayWithRefund( @@ -899,8 +815,6 @@ export async function abortFailedPayWithRefund( purchase.refundQueryRequested = true; purchase.paymentSubmitPending = false; purchase.abortStatus = AbortStatus.AbortRefund; - purchase.lastPayError = undefined; - purchase.payRetryInfo = RetryInfo.reset(); await tx.purchases.put(purchase); }); processPurchaseQueryRefund(ws, proposalId, { diff --git a/packages/taler-wallet-core/src/operations/tip.ts b/packages/taler-wallet-core/src/operations/tip.ts index 7148999c5..f19be91b2 100644 --- a/packages/taler-wallet-core/src/operations/tip.ts +++ b/packages/taler-wallet-core/src/operations/tip.ts @@ -18,29 +18,45 @@ * Imports. */ import { - Amounts, BlindedDenominationSignature, - codecForMerchantTipResponseV2, codecForTipPickupGetResponse, DenomKeyType, encodeCrock, getRandomBytes, j2s, Logger, NotificationType, parseTipUri, PrepareTipResult, TalerErrorCode, TalerErrorDetail, TalerProtocolTimestamp, TipPlanchetDetail, URL + Amounts, + BlindedDenominationSignature, + codecForMerchantTipResponseV2, + codecForTipPickupGetResponse, + DenomKeyType, + encodeCrock, + getRandomBytes, + j2s, + Logger, + parseTipUri, + PrepareTipResult, + TalerErrorCode, + TalerProtocolTimestamp, + TipPlanchetDetail, + URL, } from "@gnu-taler/taler-util"; import { DerivedTipPlanchet } from "../crypto/cryptoTypes.js"; import { CoinRecord, CoinSourceType, - CoinStatus, DenominationRecord, TipRecord + CoinStatus, + DenominationRecord, + OperationAttemptResult, + OperationAttemptResultType, + TipRecord, } from "../db.js"; import { makeErrorDetail } from "../errors.js"; import { InternalWalletState } from "../internal-wallet-state.js"; import { getHttpResponseErrorDetails, - readSuccessResponseJsonOrThrow + readSuccessResponseJsonOrThrow, } from "../util/http.js"; import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js"; -import { - RetryInfo -} from "../util/retries.js"; -import { guardOperationException } from "./common.js"; import { updateExchangeFromUrl } from "./exchanges.js"; import { - getCandidateWithdrawalDenoms, getExchangeWithdrawalInfo, selectWithdrawalDenominations, updateWithdrawalDenoms + getCandidateWithdrawalDenoms, + getExchangeWithdrawalInfo, + selectWithdrawalDenominations, + updateWithdrawalDenoms, } from "./withdraw.js"; const logger = new Logger("operations/tip.ts"); @@ -114,8 +130,6 @@ export async function prepareTip( createdTimestamp: TalerProtocolTimestamp.now(), merchantTipId: res.merchantTipId, tipAmountEffective: selectedDenoms.totalCoinValue, - retryInfo: RetryInfo.reset(), - lastError: undefined, denomsSel: selectedDenoms, pickedUpTimestamp: undefined, secretSeed, @@ -144,82 +158,13 @@ export async function prepareTip( return tipStatus; } -async function reportTipError( - ws: InternalWalletState, - walletTipId: string, - err: TalerErrorDetail, -): Promise<void> { - await ws.db - .mktx((x) => ({ - tips: x.tips, - })) - .runReadWrite(async (tx) => { - const t = await tx.tips.get(walletTipId); - if (!t) { - return; - } - if (!t.retryInfo) { - logger.reportBreak(); - } - t.lastError = err; - await tx.tips.put(t); - }); - if (err) { - ws.notify({ type: NotificationType.TipOperationError, error: err }); - } -} - -async function setupTipRetry( - ws: InternalWalletState, - walletTipId: string, - options: { - reset: boolean; - }, -): Promise<void> { - await ws.db - .mktx((x) => ({ - tips: x.tips, - })) - .runReadWrite(async (tx) => { - const t = await tx.tips.get(walletTipId); - if (!t) { - return; - } - if (options.reset) { - t.retryInfo = RetryInfo.reset(); - } else { - t.retryInfo = RetryInfo.increment(t.retryInfo); - } - delete t.lastError; - await tx.tips.put(t); - }); -} - export async function processTip( ws: InternalWalletState, - tipId: string, - options: { - forceNow?: boolean; - } = {}, -): Promise<void> { - const onOpErr = (e: TalerErrorDetail): Promise<void> => - reportTipError(ws, tipId, e); - await guardOperationException( - () => processTipImpl(ws, tipId, options), - onOpErr, - ); -} - -async function processTipImpl( - ws: InternalWalletState, walletTipId: string, options: { forceNow?: boolean; } = {}, -): Promise<void> { - const forceNow = options.forceNow ?? false; - await setupTipRetry(ws, walletTipId, { reset: forceNow }); - +): Promise<OperationAttemptResult> { const tipRecord = await ws.db .mktx((x) => ({ tips: x.tips, @@ -228,12 +173,18 @@ async function processTipImpl( return tx.tips.get(walletTipId); }); if (!tipRecord) { - return; + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } if (tipRecord.pickedUpTimestamp) { logger.warn("tip already picked up"); - return; + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } const denomsForWithdraw = tipRecord.denomsSel; @@ -284,22 +235,21 @@ async function processTipImpl( logger.trace(`got tip response, status ${merchantResp.status}`); - // Hide transient errors. + // FIXME: Why do we do this? if ( - tipRecord.retryInfo.retryCounter < 5 && - ((merchantResp.status >= 500 && merchantResp.status <= 599) || - merchantResp.status === 424) + (merchantResp.status >= 500 && merchantResp.status <= 599) || + merchantResp.status === 424 ) { logger.trace(`got transient tip error`); // FIXME: wrap in another error code that indicates a transient error - const err = makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR, - getHttpResponseErrorDetails(merchantResp), - "tip pickup failed (transient)", - ); - await reportTipError(ws, tipRecord.walletTipId, err); - // FIXME: Maybe we want to signal to the caller that the transient error happened? - return; + return { + type: OperationAttemptResultType.Error, + errorDetail: makeErrorDetail( + TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR, + getHttpResponseErrorDetails(merchantResp), + "tip pickup failed (transient)", + ), + }; } let blindedSigs: BlindedDenominationSignature[] = []; @@ -344,21 +294,14 @@ async function processTipImpl( }); if (!isValid) { - await ws.db - .mktx((x) => ({ tips: x.tips })) - .runReadWrite(async (tx) => { - const tipRecord = await tx.tips.get(walletTipId); - if (!tipRecord) { - return; - } - tipRecord.lastError = makeErrorDetail( - TalerErrorCode.WALLET_TIPPING_COIN_SIGNATURE_INVALID, - {}, - "invalid signature from the exchange (via merchant tip) after unblinding", - ); - await tx.tips.put(tipRecord); - }); - return; + return { + type: OperationAttemptResultType.Error, + errorDetail: makeErrorDetail( + TalerErrorCode.WALLET_TIPPING_COIN_SIGNATURE_INVALID, + {}, + "invalid signature from the exchange (via merchant tip) after unblinding", + ), + }; } newCoinRecords.push({ @@ -395,13 +338,16 @@ async function processTipImpl( return; } tr.pickedUpTimestamp = TalerProtocolTimestamp.now(); - tr.lastError = undefined; - tr.retryInfo = RetryInfo.reset(); await tx.tips.put(tr); for (const cr of newCoinRecords) { await tx.coins.put(cr); } }); + + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } export async function acceptTip( diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts index 956d565a6..5a96fc6ff 100644 --- a/packages/taler-wallet-core/src/operations/transactions.ts +++ b/packages/taler-wallet-core/src/operations/transactions.ts @@ -38,7 +38,6 @@ import { InternalWalletState } from "../internal-wallet-state.js"; import { AbortStatus, RefundState, - ReserveRecordStatus, WalletRefundItem, WithdrawalRecordType, } from "../db.js"; @@ -48,6 +47,7 @@ import { processPurchasePay } from "./pay.js"; import { processRefreshGroup } from "./refresh.js"; import { processTip } from "./tip.js"; import { processWithdrawalGroup } from "./withdraw.js"; +import { RetryTags } from "../util/retries.js"; const logger = new Logger("taler-wallet-core:transactions.ts"); @@ -142,6 +142,7 @@ export async function getTransactions( tombstones: x.tombstones, peerPushPaymentInitiations: x.peerPushPaymentInitiations, peerPullPaymentIncoming: x.peerPullPaymentIncoming, + operationRetries: x.operationRetries, })) .runReadOnly(async (tx) => { tx.peerPushPaymentInitiations.iter().forEachAsync(async (pi) => { @@ -220,6 +221,10 @@ export async function getTransactions( if (shouldSkipSearch(transactionsRequest, [])) { return; } + + const opId = RetryTags.forWithdrawal(wsr); + const ort = await tx.operationRetries.get(opId); + let withdrawalDetails: WithdrawalDetails; if (wsr.wgInfo.withdrawalType === WithdrawalRecordType.PeerPullCredit) { transactions.push({ @@ -242,7 +247,7 @@ export async function getTransactions( wsr.withdrawalGroupId, ), frozen: false, - ...(wsr.lastError ? { error: wsr.lastError } : {}), + ...(ort?.lastError ? { error: ort.lastError } : {}), }); return; } else if ( @@ -264,7 +269,7 @@ export async function getTransactions( wsr.withdrawalGroupId, ), frozen: false, - ...(wsr.lastError ? { error: wsr.lastError } : {}), + ...(ort?.lastError ? { error: ort.lastError } : {}), }); return; } else if ( @@ -310,7 +315,7 @@ export async function getTransactions( wsr.withdrawalGroupId, ), frozen: false, - ...(wsr.lastError ? { error: wsr.lastError } : {}), + ...(ort?.lastError ? { error: ort.lastError } : {}), }); }); @@ -319,7 +324,8 @@ export async function getTransactions( if (shouldSkipCurrency(transactionsRequest, amount.currency)) { return; } - + const opId = RetryTags.forDeposit(dg); + const retryRecord = await tx.operationRetries.get(opId); transactions.push({ type: TransactionType.Deposit, amountRaw: Amounts.stringify(dg.effectiveDepositAmount), @@ -333,7 +339,7 @@ export async function getTransactions( dg.depositGroupId, ), depositGroupId: dg.depositGroupId, - ...(dg.lastError ? { error: dg.lastError } : {}), + ...(retryRecord?.lastError ? { error: retryRecord.lastError } : {}), }); }); @@ -456,7 +462,15 @@ export async function getTransactions( }); } - const err = pr.lastPayError ?? pr.lastRefundStatusError; + const payOpId = RetryTags.forPay(pr); + const refundQueryOpId = RetryTags.forRefundQuery(pr); + const payRetryRecord = await tx.operationRetries.get(payOpId); + const refundQueryRetryRecord = await tx.operationRetries.get( + refundQueryOpId, + ); + + const err = + refundQueryRetryRecord?.lastError ?? payRetryRecord?.lastError; transactions.push({ type: TransactionType.Payment, amountRaw: Amounts.stringify(contractData.amount), @@ -495,6 +509,8 @@ export async function getTransactions( if (!tipRecord.acceptedTimestamp) { return; } + const opId = RetryTags.forTipPickup(tipRecord); + const retryRecord = await tx.operationRetries.get(opId); transactions.push({ type: TransactionType.Tip, amountEffective: Amounts.stringify(tipRecord.tipAmountEffective), @@ -507,10 +523,7 @@ export async function getTransactions( tipRecord.walletTipId, ), merchantBaseUrl: tipRecord.merchantBaseUrl, - // merchant: { - // name: tipRecord.merchantBaseUrl, - // }, - error: tipRecord.lastError, + error: retryRecord?.lastError, }); }); }); @@ -589,7 +602,11 @@ export async function deleteTransaction( ): Promise<void> { const [typeStr, ...rest] = transactionId.split(":"); const type = typeStr as TransactionType; - if (type === TransactionType.Withdrawal || type === TransactionType.PeerPullCredit || type === TransactionType.PeerPushCredit) { + if ( + type === TransactionType.Withdrawal || + type === TransactionType.PeerPullCredit || + type === TransactionType.PeerPushCredit + ) { const withdrawalGroupId = rest[0]; await ws.db .mktx((x) => ({ @@ -714,7 +731,9 @@ export async function deleteTransaction( tombstones: x.tombstones, })) .runReadWrite(async (tx) => { - const debit = await tx.peerPullPaymentIncoming.get(peerPullPaymentIncomingId); + const debit = await tx.peerPullPaymentIncoming.get( + peerPullPaymentIncomingId, + ); if (debit) { await tx.peerPullPaymentIncoming.delete(peerPullPaymentIncomingId); await tx.tombstones.put({ @@ -737,10 +756,7 @@ export async function deleteTransaction( if (debit) { await tx.peerPushPaymentInitiations.delete(pursePub); await tx.tombstones.put({ - id: makeEventId( - TombstoneTag.DeletePeerPushDebit, - pursePub, - ), + id: makeEventId(TombstoneTag.DeletePeerPushDebit, pursePub), }); } }); diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index b80745316..ce5863b31 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -56,7 +56,6 @@ import { WithdrawBatchResponse, WithdrawResponse, WithdrawUriInfoResponse, - WithdrawUriResult, } from "@gnu-taler/taler-util"; import { EddsaKeypair } from "../crypto/cryptoImplementation.js"; import { @@ -68,9 +67,10 @@ import { DenomSelectionState, ExchangeDetailsRecord, ExchangeRecord, + OperationAttemptResult, + OperationAttemptResultType, OperationStatus, PlanchetRecord, - ReserveBankInfo, ReserveRecordStatus, WalletStoresV1, WgInfo, @@ -98,7 +98,6 @@ import { WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, WALLET_EXCHANGE_PROTOCOL_VERSION, } from "../versions.js"; -import { guardOperationException } from "./common.js"; import { getExchangeDetails, getExchangePaytoUri, @@ -691,31 +690,12 @@ async function processPlanchetExchangeBatchRequest( withdrawalGroup.exchangeBaseUrl, ).href; - try { - const resp = await ws.http.postJson(reqUrl, d); - const r = await readSuccessResponseJsonOrThrow( - resp, - codecForWithdrawBatchResponse(), - ); - return r; - } catch (e) { - const errDetail = getErrorDetailFromException(e); - logger.trace("withdrawal batch request failed", e); - logger.trace(e); - await ws.db - .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups })) - .runReadWrite(async (tx) => { - let wg = await tx.withdrawalGroups.get( - withdrawalGroup.withdrawalGroupId, - ); - if (!wg) { - return; - } - wg.lastError = errDetail; - await tx.withdrawalGroups.put(wg); - }); - return; - } + const resp = await ws.http.postJson(reqUrl, d); + const r = await readSuccessResponseJsonOrThrow( + resp, + codecForWithdrawBatchResponse(), + ); + return r; } async function processPlanchetVerifyAndStoreCoin( @@ -951,50 +931,6 @@ export async function updateWithdrawalDenoms( } } -async function setupWithdrawalRetry( - ws: InternalWalletState, - withdrawalGroupId: string, - options: { - reset: boolean; - }, -): Promise<void> { - await ws.db - .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups })) - .runReadWrite(async (tx) => { - const wsr = await tx.withdrawalGroups.get(withdrawalGroupId); - if (!wsr) { - return; - } - if (options.reset) { - wsr.retryInfo = RetryInfo.reset(); - } else { - wsr.retryInfo = RetryInfo.increment(wsr.retryInfo); - } - await tx.withdrawalGroups.put(wsr); - }); -} - -async function reportWithdrawalError( - ws: InternalWalletState, - withdrawalGroupId: string, - err: TalerErrorDetail, -): Promise<void> { - await ws.db - .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups })) - .runReadWrite(async (tx) => { - const wsr = await tx.withdrawalGroups.get(withdrawalGroupId); - if (!wsr) { - return; - } - if (!wsr.retryInfo) { - logger.reportBreak(); - } - wsr.lastError = err; - await tx.withdrawalGroups.put(wsr); - }); - ws.notify({ type: NotificationType.WithdrawOperationError, error: err }); -} - /** * Update the information about a reserve that is stored in the wallet * by querying the reserve's exchange. @@ -1071,28 +1007,9 @@ async function queryReserve( export async function processWithdrawalGroup( ws: InternalWalletState, withdrawalGroupId: string, - options: { - forceNow?: boolean; - } = {}, -): Promise<void> { - const onOpErr = (e: TalerErrorDetail): Promise<void> => - reportWithdrawalError(ws, withdrawalGroupId, e); - await guardOperationException( - () => processWithdrawGroupImpl(ws, withdrawalGroupId, options), - onOpErr, - ); -} - -async function processWithdrawGroupImpl( - ws: InternalWalletState, - withdrawalGroupId: string, - options: { - forceNow?: boolean; - } = {}, -): Promise<void> { - const forceNow = options.forceNow ?? false; - logger.trace("processing withdraw group", withdrawalGroupId); - await setupWithdrawalRetry(ws, withdrawalGroupId, { reset: forceNow }); + options: {} = {}, +): Promise<OperationAttemptResult> { + logger.trace("processing withdrawal group", withdrawalGroupId); const withdrawalGroup = await ws.db .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups })) .runReadOnly(async (tx) => { @@ -1106,24 +1023,44 @@ async function processWithdrawGroupImpl( switch (withdrawalGroup.reserveStatus) { case ReserveRecordStatus.RegisteringBank: await processReserveBankStatus(ws, withdrawalGroupId); - return await processWithdrawGroupImpl(ws, withdrawalGroupId, { + return await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true, }); case ReserveRecordStatus.QueryingStatus: { const res = await queryReserve(ws, withdrawalGroupId); if (res.ready) { - return await processWithdrawGroupImpl(ws, withdrawalGroupId, { + return await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true, }); } - return; + return { + type: OperationAttemptResultType.Pending, + result: undefined, + }; + } + case ReserveRecordStatus.WaitConfirmBank: { + const res = await processReserveBankStatus(ws, withdrawalGroupId); + switch (res.status) { + case BankStatusResultCode.Aborted: + case BankStatusResultCode.Done: + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; + case BankStatusResultCode.Waiting: { + return { + type: OperationAttemptResultType.Pending, + result: undefined, + }; + } + } } - case ReserveRecordStatus.WaitConfirmBank: - await processReserveBankStatus(ws, withdrawalGroupId); - return; case ReserveRecordStatus.BankAborted: // FIXME - return; + return { + type: OperationAttemptResultType.Pending, + result: undefined, + }; case ReserveRecordStatus.Dormant: // We can try to withdraw, nothing needs to be done with the reserve. break; @@ -1150,11 +1087,12 @@ async function processWithdrawGroupImpl( return; } wg.operationStatus = OperationStatus.Finished; - delete wg.lastError; - delete wg.retryInfo; await tx.withdrawalGroups.put(wg); }); - return; + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms @@ -1175,7 +1113,7 @@ async function processWithdrawGroupImpl( if (ws.batchWithdrawal) { const resp = await processPlanchetExchangeBatchRequest(ws, withdrawalGroup); if (!resp) { - return; + throw Error("unable to do batch withdrawal"); } for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) { work.push( @@ -1236,8 +1174,6 @@ async function processWithdrawGroupImpl( finishedForFirstTime = true; wg.timestampFinish = TalerProtocolTimestamp.now(); wg.operationStatus = OperationStatus.Finished; - delete wg.lastError; - wg.retryInfo = RetryInfo.reset(); } await tx.withdrawalGroups.put(wg); @@ -1259,6 +1195,11 @@ async function processWithdrawGroupImpl( reservePub: withdrawalGroup.reservePub, }); } + + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; } const AGE_MASK_GROUPS = "8:10:12:14:16:18".split(":").map(n => parseInt(n, 10)) @@ -1529,10 +1470,7 @@ async function getWithdrawalGroupRecordTx( } export function getReserveRequestTimeout(r: WithdrawalGroupRecord): Duration { - return Duration.max( - { d_ms: 60000 }, - Duration.min({ d_ms: 5000 }, RetryInfo.getDuration(r.retryInfo)), - ); + return { d_ms: 60000 }; } export function getBankStatusUrl(talerWithdrawUri: string): string { @@ -1611,17 +1549,25 @@ async function registerReserveWithBank( ); r.reserveStatus = ReserveRecordStatus.WaitConfirmBank; r.operationStatus = OperationStatus.Pending; - r.retryInfo = RetryInfo.reset(); await tx.withdrawalGroups.put(r); }); ws.notify({ type: NotificationType.ReserveRegisteredWithBank }); - return processReserveBankStatus(ws, withdrawalGroupId); +} + +enum BankStatusResultCode { + Done = "done", + Waiting = "waiting", + Aborted = "aborted", +} + +interface BankStatusResult { + status: BankStatusResultCode; } async function processReserveBankStatus( ws: InternalWalletState, withdrawalGroupId: string, -): Promise<void> { +): Promise<BankStatusResult> { const withdrawalGroup = await getWithdrawalGroupRecordTx(ws.db, { withdrawalGroupId, }); @@ -1630,17 +1576,21 @@ async function processReserveBankStatus( case ReserveRecordStatus.RegisteringBank: break; default: - return; + return { + status: BankStatusResultCode.Done, + }; } if ( withdrawalGroup.wgInfo.withdrawalType != WithdrawalRecordType.BankIntegrated ) { - throw Error(); + throw Error("wrong withdrawal record type"); } const bankInfo = withdrawalGroup.wgInfo.bankInfo; if (!bankInfo) { - return; + return { + status: BankStatusResultCode.Done, + }; } const bankStatusUrl = getBankStatusUrl(bankInfo.talerWithdrawUri); @@ -1678,10 +1628,11 @@ async function processReserveBankStatus( r.wgInfo.bankInfo.timestampBankConfirmed = now; r.reserveStatus = ReserveRecordStatus.BankAborted; r.operationStatus = OperationStatus.Finished; - r.retryInfo = RetryInfo.reset(); await tx.withdrawalGroups.put(r); }); - return; + return { + status: BankStatusResultCode.Aborted, + }; } // Bank still needs to know our reserve info @@ -1722,15 +1673,17 @@ async function processReserveBankStatus( r.wgInfo.bankInfo.timestampBankConfirmed = now; r.reserveStatus = ReserveRecordStatus.QueryingStatus; r.operationStatus = OperationStatus.Pending; - r.retryInfo = RetryInfo.reset(); } else { logger.info("withdrawal: transfer not yet confirmed by bank"); r.wgInfo.bankInfo.confirmUrl = status.confirm_transfer_url; r.senderWire = status.sender_wire; - r.retryInfo = RetryInfo.increment(r.retryInfo); } await tx.withdrawalGroups.put(r); }); + + return { + status: BankStatusResultCode.Done, + }; } export async function internalCreateWithdrawalGroup( @@ -1775,14 +1728,12 @@ export async function internalCreateWithdrawalGroup( exchangeBaseUrl: canonExchange, instructedAmount: amount, timestampStart: now, - lastError: undefined, operationStatus: OperationStatus.Pending, rawWithdrawalAmount: initialDenomSel.totalWithdrawCost, secretSeed, reservePriv: reserveKeyPair.priv, reservePub: reserveKeyPair.pub, reserveStatus: args.reserveStatus, - retryInfo: RetryInfo.reset(), withdrawalGroupId, restrictAge: args.restrictAge, senderWire: undefined, diff --git a/packages/taler-wallet-core/src/pending-types.ts b/packages/taler-wallet-core/src/pending-types.ts index 39df9d0cb..61c7136df 100644 --- a/packages/taler-wallet-core/src/pending-types.ts +++ b/packages/taler-wallet-core/src/pending-types.ts @@ -30,14 +30,12 @@ import { AbsoluteTime, TalerProtocolTimestamp, } from "@gnu-taler/taler-util"; -import { ReserveRecordStatus } from "./db.js"; import { RetryInfo } from "./util/retries.js"; export enum PendingTaskType { ExchangeUpdate = "exchange-update", ExchangeCheckRefresh = "exchange-check-refresh", Pay = "pay", - ProposalChoice = "proposal-choice", ProposalDownload = "proposal-download", Refresh = "refresh", Recoup = "recoup", @@ -109,7 +107,7 @@ export interface PendingRefreshTask { lastError?: TalerErrorDetail; refreshGroupId: string; finishedPerCoin: boolean[]; - retryInfo: RetryInfo; + retryInfo?: RetryInfo; } /** @@ -126,17 +124,6 @@ export interface PendingProposalDownloadTask { } /** - * User must choose whether to accept or reject the merchant's - * proposed contract terms. - */ -export interface PendingProposalChoiceOperation { - type: PendingTaskType.ProposalChoice; - merchantBaseUrl: string; - proposalTimestamp: AbsoluteTime; - proposalId: string; -} - -/** * The wallet is picking up a tip that the user has accepted. */ export interface PendingTipPickupTask { @@ -165,14 +152,14 @@ export interface PendingPayTask { export interface PendingRefundQueryTask { type: PendingTaskType.RefundQuery; proposalId: string; - retryInfo: RetryInfo; + retryInfo?: RetryInfo; lastError: TalerErrorDetail | undefined; } export interface PendingRecoupTask { type: PendingTaskType.Recoup; recoupGroupId: string; - retryInfo: RetryInfo; + retryInfo?: RetryInfo; lastError: TalerErrorDetail | undefined; } @@ -206,6 +193,11 @@ export interface PendingTaskInfoCommon { type: PendingTaskType; /** + * Unique identifier for the pending task. + */ + id: string; + + /** * Set to true if the operation indicates that something is really in progress, * as opposed to some regular scheduled operation that can be tried later. */ diff --git a/packages/taler-wallet-core/src/util/query.ts b/packages/taler-wallet-core/src/util/query.ts index e954e5c78..65b67eff2 100644 --- a/packages/taler-wallet-core/src/util/query.ts +++ b/packages/taler-wallet-core/src/util/query.ts @@ -152,6 +152,19 @@ class ResultStream<T> { return arr; } + async mapAsync<R>(f: (x: T) => Promise<R>): Promise<R[]> { + const arr: R[] = []; + while (true) { + const x = await this.next(); + if (x.hasValue) { + arr.push(await f(x.value)); + } else { + break; + } + } + return arr; + } + async forEachAsync(f: (x: T) => Promise<void>): Promise<void> { while (true) { const x = await this.next(); @@ -572,6 +585,26 @@ function makeWriteContext( return ctx; } +const storeList = [ + { name: "foo" as const, value: 1 as const }, + { name: "bar" as const, value: 2 as const }, +]; +// => { foo: { value: 1}, bar: {value: 2} } + +type StoreList = typeof storeList; + +type StoreNames = StoreList[number] extends { name: infer I } ? I : never; + +type H = StoreList[number] & { name: "foo"}; + +type Cleanup<V> = V extends { name: infer N, value: infer X} ? {name: N, value: X} : never; + +type G = { + [X in StoreNames]: { + X: StoreList[number] & { name: X }; + }; +}; + /** * Type-safe access to a database with a particular store map. * @@ -584,6 +617,14 @@ export class DbAccess<StoreMap> { return this.db; } + mktx2< + StoreNames extends keyof StoreMap, + Stores extends StoreMap[StoreNames], + StoreList extends Stores[], + >(namePicker: (x: StoreMap) => StoreList): StoreList { + return namePicker(this.stores); + } + mktx< PickerType extends (x: StoreMap) => unknown, BoundStores extends GetPickerType<PickerType, StoreMap>, diff --git a/packages/taler-wallet-core/src/util/retries.ts b/packages/taler-wallet-core/src/util/retries.ts index 13a05b385..3a41e8348 100644 --- a/packages/taler-wallet-core/src/util/retries.ts +++ b/packages/taler-wallet-core/src/util/retries.ts @@ -21,7 +21,29 @@ /** * Imports. */ -import { AbsoluteTime, Duration } from "@gnu-taler/taler-util"; +import { + AbsoluteTime, + Duration, + TalerErrorDetail, +} from "@gnu-taler/taler-util"; +import { + BackupProviderRecord, + DepositGroupRecord, + ExchangeRecord, + OperationAttemptResult, + OperationAttemptResultType, + ProposalRecord, + PurchaseRecord, + RecoupGroupRecord, + RefreshGroupRecord, + TipRecord, + WalletStoresV1, + WithdrawalGroupRecord, +} from "../db.js"; +import { TalerError } from "../errors.js"; +import { InternalWalletState } from "../internal-wallet-state.js"; +import { PendingTaskType } from "../pending-types.js"; +import { GetReadWriteAccess } from "./query.js"; export interface RetryInfo { firstTry: AbsoluteTime; @@ -108,3 +130,95 @@ export namespace RetryInfo { return r2; } } + +export namespace RetryTags { + export function forWithdrawal(wg: WithdrawalGroupRecord): string { + return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}`; + } + export function forExchangeUpdate(exch: ExchangeRecord): string { + return `${PendingTaskType.ExchangeUpdate}:${exch.baseUrl}`; + } + export function forExchangeCheckRefresh(exch: ExchangeRecord): string { + return `${PendingTaskType.ExchangeCheckRefresh}:${exch.baseUrl}`; + } + export function forProposalClaim(pr: ProposalRecord): string { + return `${PendingTaskType.ProposalDownload}:${pr.proposalId}`; + } + export function forTipPickup(tipRecord: TipRecord): string { + return `${PendingTaskType.TipPickup}:${tipRecord.walletTipId}`; + } + export function forRefresh(refreshGroupRecord: RefreshGroupRecord): string { + return `${PendingTaskType.TipPickup}:${refreshGroupRecord.refreshGroupId}`; + } + export function forPay(purchaseRecord: PurchaseRecord): string { + return `${PendingTaskType.Pay}:${purchaseRecord.proposalId}`; + } + export function forRefundQuery(purchaseRecord: PurchaseRecord): string { + return `${PendingTaskType.RefundQuery}:${purchaseRecord.proposalId}`; + } + export function forRecoup(recoupRecord: RecoupGroupRecord): string { + return `${PendingTaskType.Recoup}:${recoupRecord.recoupGroupId}`; + } + export function forDeposit(depositRecord: DepositGroupRecord): string { + return `${PendingTaskType.Deposit}:${depositRecord.depositGroupId}`; + } + export function forBackup(backupRecord: BackupProviderRecord): string { + return `${PendingTaskType.Backup}:${backupRecord.baseUrl}`; + } +} + +export async function scheduleRetryInTx( + ws: InternalWalletState, + tx: GetReadWriteAccess<{ + operationRetries: typeof WalletStoresV1.operationRetries; + }>, + opId: string, + errorDetail?: TalerErrorDetail, +): Promise<void> { + let retryRecord = await tx.operationRetries.get(opId); + if (!retryRecord) { + retryRecord = { + id: opId, + retryInfo: RetryInfo.reset(), + }; + if (errorDetail) { + retryRecord.lastError = errorDetail; + } + } else { + retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo); + if (errorDetail) { + retryRecord.lastError = errorDetail; + } else { + delete retryRecord.lastError; + } + } + await tx.operationRetries.put(retryRecord); +} + +export async function scheduleRetry( + ws: InternalWalletState, + opId: string, + errorDetail?: TalerErrorDetail, +): Promise<void> { + return await ws.db + .mktx((x) => ({ operationRetries: x.operationRetries })) + .runReadWrite(async (tx) => { + scheduleRetryInTx(ws, tx, opId, errorDetail); + }); +} + +/** + * Run an operation handler, expect a success result and extract the success value. + */ +export async function runOperationHandlerForResult<T>( + res: OperationAttemptResult<T>, +): Promise<T> { + switch (res.type) { + case OperationAttemptResultType.Finished: + return res.result; + case OperationAttemptResultType.Error: + throw TalerError.fromUncheckedDetail(res.errorDetail); + default: + throw Error(`unexpected operation result (${res.type})`); + } +} diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 779fe9528..f041d9aa9 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -90,6 +90,7 @@ import { ExchangeListItem, OperationMap, FeeDescription, + TalerErrorDetail, } from "@gnu-taler/taler-util"; import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js"; import { @@ -101,9 +102,15 @@ import { CoinSourceType, exportDb, importDb, + OperationAttemptResult, + OperationAttemptResultType, WalletStoresV1, } from "./db.js"; -import { getErrorDetailFromException, TalerError } from "./errors.js"; +import { + getErrorDetailFromException, + makeErrorDetail, + TalerError, +} from "./errors.js"; import { createDenominationTimeline } from "./index.browser.js"; import { DenomInfo, @@ -143,6 +150,7 @@ import { getExchangeRequestTimeout, getExchangeTrust, updateExchangeFromUrl, + updateExchangeFromUrlHandler, updateExchangeTermsOfService, } from "./operations/exchanges.js"; import { getMerchantInfo } from "./operations/merchants.js"; @@ -162,7 +170,11 @@ import { initiatePeerToPeerPush, } from "./operations/peer-to-peer.js"; import { getPendingOperations } from "./operations/pending.js"; -import { createRecoupGroup, processRecoupGroup } from "./operations/recoup.js"; +import { + createRecoupGroup, + processRecoupGroup, + processRecoupGroupHandler, +} from "./operations/recoup.js"; import { autoRefresh, createRefreshGroup, @@ -210,6 +222,7 @@ import { openPromise, } from "./util/promiseUtils.js"; import { DbAccess, GetReadWriteAccess } from "./util/query.js"; +import { RetryInfo, runOperationHandlerForResult } from "./util/retries.js"; import { TimerAPI, TimerGroup } from "./util/timer.js"; import { WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, @@ -237,7 +250,12 @@ async function getWithdrawalDetailsForAmount( amount: AmountJson, restrictAge: number | undefined, ): Promise<ManualWithdrawalDetails> { - const wi = await getExchangeWithdrawalInfo(ws, exchangeBaseUrl, amount, restrictAge); + const wi = await getExchangeWithdrawalInfo( + ws, + exchangeBaseUrl, + amount, + restrictAge, + ); const paytoUris = wi.exchangeDetails.wireInfo.accounts.map( (x) => x.payto_uri, ); @@ -253,55 +271,153 @@ async function getWithdrawalDetailsForAmount( } /** - * Execute one operation based on the pending operation info record. + * Call the right handler for a pending operation without doing + * any special error handling. */ -async function processOnePendingOperation( +async function callOperationHandler( ws: InternalWalletState, pending: PendingTaskInfo, forceNow = false, -): Promise<void> { - logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`); +): Promise<OperationAttemptResult<unknown, unknown>> { switch (pending.type) { case PendingTaskType.ExchangeUpdate: - await updateExchangeFromUrl(ws, pending.exchangeBaseUrl, { + return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl, { forceNow, }); - break; case PendingTaskType.Refresh: - await processRefreshGroup(ws, pending.refreshGroupId, { forceNow }); - break; + return await processRefreshGroup(ws, pending.refreshGroupId, { + forceNow, + }); case PendingTaskType.Withdraw: await processWithdrawalGroup(ws, pending.withdrawalGroupId, { forceNow }); break; case PendingTaskType.ProposalDownload: - await processDownloadProposal(ws, pending.proposalId, { forceNow }); - break; + return await processDownloadProposal(ws, pending.proposalId, { + forceNow, + }); case PendingTaskType.TipPickup: - await processTip(ws, pending.tipId, { forceNow }); - break; + return await processTip(ws, pending.tipId, { forceNow }); case PendingTaskType.Pay: - await processPurchasePay(ws, pending.proposalId, { forceNow }); - break; + return await processPurchasePay(ws, pending.proposalId, { forceNow }); case PendingTaskType.RefundQuery: - await processPurchaseQueryRefund(ws, pending.proposalId, { forceNow }); - break; + return await processPurchaseQueryRefund(ws, pending.proposalId, { + forceNow, + }); case PendingTaskType.Recoup: - await processRecoupGroup(ws, pending.recoupGroupId, { forceNow }); - break; + return await processRecoupGroupHandler(ws, pending.recoupGroupId, { + forceNow, + }); case PendingTaskType.ExchangeCheckRefresh: - await autoRefresh(ws, pending.exchangeBaseUrl); - break; + return await autoRefresh(ws, pending.exchangeBaseUrl); case PendingTaskType.Deposit: { - await processDepositGroup(ws, pending.depositGroupId, { + return await processDepositGroup(ws, pending.depositGroupId, { forceNow, }); - break; } case PendingTaskType.Backup: - await processBackupForProvider(ws, pending.backupProviderBaseUrl); - break; + return await processBackupForProvider(ws, pending.backupProviderBaseUrl); default: - assertUnreachable(pending); + return assertUnreachable(pending); + } + throw Error("not reached"); +} + +export async function storeOperationError( + ws: InternalWalletState, + pendingTaskId: string, + e: TalerErrorDetail, +): Promise<void> { + await ws.db + .mktx((x) => ({ operationRetries: x.operationRetries })) + .runReadWrite(async (tx) => { + const retryRecord = await tx.operationRetries.get(pendingTaskId); + if (!retryRecord) { + return; + } + retryRecord.lastError = e; + retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo); + await tx.operationRetries.put(retryRecord); + }); +} + +export async function storeOperationFinished( + ws: InternalWalletState, + pendingTaskId: string, +): Promise<void> { + await ws.db + .mktx((x) => ({ operationRetries: x.operationRetries })) + .runReadWrite(async (tx) => { + await tx.operationRetries.delete(pendingTaskId); + }); +} + +export async function storeOperationPending( + ws: InternalWalletState, + pendingTaskId: string, +): Promise<void> { + await ws.db + .mktx((x) => ({ operationRetries: x.operationRetries })) + .runReadWrite(async (tx) => { + const retryRecord = await tx.operationRetries.get(pendingTaskId); + if (!retryRecord) { + return; + } + delete retryRecord.lastError; + retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo); + await tx.operationRetries.put(retryRecord); + }); +} + +/** + * Execute one operation based on the pending operation info record. + * + * Store success/failure result in the database. + */ +async function processOnePendingOperation( + ws: InternalWalletState, + pending: PendingTaskInfo, + forceNow = false, +): Promise<void> { + logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`); + let maybeError: TalerErrorDetail | undefined; + try { + const resp = await callOperationHandler(ws, pending, forceNow); + switch (resp.type) { + case OperationAttemptResultType.Error: + return await storeOperationError(ws, pending.id, resp.errorDetail); + case OperationAttemptResultType.Finished: + return await storeOperationFinished(ws, pending.id); + case OperationAttemptResultType.Pending: + return await storeOperationPending(ws, pending.id); + case OperationAttemptResultType.Longpoll: + break; + } + } catch (e: any) { + if ( + e instanceof TalerError && + e.hasErrorCode(TalerErrorCode.WALLET_PENDING_OPERATION_FAILED) + ) { + logger.warn("operation processed resulted in error"); + logger.warn(`error was: ${j2s(e.errorDetail)}`); + maybeError = e.errorDetail; + } else { + // This is a bug, as we expect pending operations to always + // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED + // or return something. + logger.error("Uncaught exception", e); + ws.notify({ + type: NotificationType.InternalError, + message: "uncaught exception", + exception: e, + }); + maybeError = makeErrorDetail( + TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, + { + stack: e.stack, + }, + `unexpected exception (message: ${e.message})`, + ); + } } } @@ -317,18 +433,7 @@ export async function runPending( if (!forceNow && !AbsoluteTime.isExpired(p.timestampDue)) { continue; } - try { - await processOnePendingOperation(ws, p, forceNow); - } catch (e) { - if (e instanceof TalerError) { - console.error( - "Pending operation failed:", - JSON.stringify(e.errorDetail, undefined, 2), - ); - } else { - console.error(e); - } - } + await processOnePendingOperation(ws, p, forceNow); } } @@ -420,27 +525,7 @@ async function runTaskLoop( if (!AbsoluteTime.isExpired(p.timestampDue)) { continue; } - try { - await processOnePendingOperation(ws, p); - } catch (e) { - if ( - e instanceof TalerError && - e.hasErrorCode(TalerErrorCode.WALLET_PENDING_OPERATION_FAILED) - ) { - logger.warn("operation processed resulted in error"); - logger.warn(`error was: ${j2s(e.errorDetail)}`); - } else { - // This is a bug, as we expect pending operations to always - // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED - // or return something. - logger.error("Uncaught exception", e); - ws.notify({ - type: NotificationType.InternalError, - message: "uncaught exception", - exception: e, - }); - } - } + await processOnePendingOperation(ws, p); ws.notify({ type: NotificationType.PendingOperationProcessed, }); @@ -629,7 +714,7 @@ async function getExchangeDetailedInfo( denominations: x.denominations, })) .runReadOnly(async (tx) => { - const ex = await tx.exchanges.get(exchangeBaseurl) + const ex = await tx.exchanges.get(exchangeBaseurl); const dp = ex?.detailsPointer; if (!dp) { return; @@ -663,11 +748,11 @@ async function getExchangeDetailedInfo( wireInfo: exchangeDetails.wireInfo, }, denominations: denominations, - } + }; }); if (!exchange) { - throw Error(`exchange with base url "${exchangeBaseurl}" not found`) + throw Error(`exchange with base url "${exchangeBaseurl}" not found`); } const feesDescription: OperationMap<FeeDescription[]> = { @@ -809,6 +894,7 @@ declare const __GIT_HASH__: string; const VERSION = typeof __VERSION__ !== "undefined" ? __VERSION__ : "dev"; const GIT_HASH = typeof __GIT_HASH__ !== "undefined" ? __GIT_HASH__ : undefined; + /** * Implementation of the "wallet-core" API. */ @@ -908,7 +994,7 @@ async function dispatchRequestInternal( ws, req.exchangeBaseUrl, Amounts.parseOrThrow(req.amount), - req.restrictAge + req.restrictAge, ); } case "getBalances": { @@ -1106,7 +1192,7 @@ async function dispatchRequestInternal( ws, req.exchange, amount, - undefined + undefined, ); const wres = await createManualWithdrawal(ws, { amount: amount, |