aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2022-09-05 18:12:30 +0200
committerFlorian Dold <florian@dold.me>2022-09-13 16:10:41 +0200
commit13e7a674778754c0ed641dfd428e3d6b2b71ab2d (patch)
treef2a0e5029305a9b818416fd94908ef77cdd7446f
parentf9f2911c761af1c8ed1c323dcd414cbaa9eeae7c (diff)
wallet-core: uniform retry handling
-rw-r--r--packages/taler-util/src/time.ts5
-rw-r--r--packages/taler-util/src/walletTypes.ts84
-rw-r--r--packages/taler-wallet-core/src/db.ts174
-rw-r--r--packages/taler-wallet-core/src/operations/backup/import.ts20
-rw-r--r--packages/taler-wallet-core/src/operations/backup/index.ts193
-rw-r--r--packages/taler-wallet-core/src/operations/deposits.ts102
-rw-r--r--packages/taler-wallet-core/src/operations/exchanges.ts112
-rw-r--r--packages/taler-wallet-core/src/operations/pay.ts319
-rw-r--r--packages/taler-wallet-core/src/operations/pending.ts170
-rw-r--r--packages/taler-wallet-core/src/operations/recoup.ts85
-rw-r--r--packages/taler-wallet-core/src/operations/refresh.ts102
-rw-r--r--packages/taler-wallet-core/src/operations/refund.ts106
-rw-r--r--packages/taler-wallet-core/src/operations/tip.ts170
-rw-r--r--packages/taler-wallet-core/src/operations/transactions.ts50
-rw-r--r--packages/taler-wallet-core/src/operations/withdraw.ts197
-rw-r--r--packages/taler-wallet-core/src/pending-types.ts24
-rw-r--r--packages/taler-wallet-core/src/util/query.ts41
-rw-r--r--packages/taler-wallet-core/src/util/retries.ts116
-rw-r--r--packages/taler-wallet-core/src/wallet.ts218
19 files changed, 1016 insertions, 1272 deletions
diff --git a/packages/taler-util/src/time.ts b/packages/taler-util/src/time.ts
index 9c25af400..1e79b943b 100644
--- a/packages/taler-util/src/time.ts
+++ b/packages/taler-util/src/time.ts
@@ -100,6 +100,10 @@ export namespace Duration {
return durationMin(d1, d2);
}
+ export function multiply(d1: Duration, n: number): Duration {
+ return durationMul(d1, n);
+ }
+
export function toIntegerYears(d: Duration): number {
if (typeof d.d_ms !== "number") {
throw Error("infinite duration");
@@ -357,7 +361,6 @@ export const codecForAbsoluteTime: Codec<AbsoluteTime> = {
},
};
-
export const codecForTimestamp: Codec<TalerProtocolTimestamp> = {
decode(x: any, c?: Context): TalerProtocolTimestamp {
// Compatibility, should be removed soon.
diff --git a/packages/taler-util/src/walletTypes.ts b/packages/taler-util/src/walletTypes.ts
index a993f29a0..c10e3be40 100644
--- a/packages/taler-util/src/walletTypes.ts
+++ b/packages/taler-util/src/walletTypes.ts
@@ -32,7 +32,12 @@ import {
codecForAmountJson,
codecForAmountString,
} from "./amounts.js";
-import { AbsoluteTime, codecForAbsoluteTime, codecForTimestamp, TalerProtocolTimestamp } from "./time.js";
+import {
+ AbsoluteTime,
+ codecForAbsoluteTime,
+ codecForTimestamp,
+ TalerProtocolTimestamp,
+} from "./time.js";
import {
buildCodecForObject,
codecForString,
@@ -797,46 +802,43 @@ const codecForExchangeTos = (): Codec<ExchangeTos> =>
.property("content", codecOptional(codecForString()))
.build("ExchangeTos");
-export const codecForFeeDescriptionPair =
- (): Codec<FeeDescriptionPair> =>
- buildCodecForObject<FeeDescriptionPair>()
- .property("value", codecForAmountJson())
- .property("from", codecForAbsoluteTime)
- .property("until", codecForAbsoluteTime)
- .property("left", codecOptional(codecForAmountJson()))
- .property("right", codecOptional(codecForAmountJson()))
- .build("FeeDescriptionPair");
-
-export const codecForFeeDescription =
- (): Codec<FeeDescription> =>
- buildCodecForObject<FeeDescription>()
- .property("value", codecForAmountJson())
- .property("from", codecForAbsoluteTime)
- .property("until", codecForAbsoluteTime)
- .property("fee", codecOptional(codecForAmountJson()))
- .build("FeeDescription");
-
-
-export const codecForFeesByOperations =
- (): Codec<OperationMap<FeeDescription[]>> =>
- buildCodecForObject<OperationMap<FeeDescription[]>>()
- .property("deposit", codecForList(codecForFeeDescription()))
- .property("withdraw", codecForList(codecForFeeDescription()))
- .property("refresh", codecForList(codecForFeeDescription()))
- .property("refund", codecForList(codecForFeeDescription()))
- .build("FeesByOperations");
-
-export const codecForExchangeFullDetails =
- (): Codec<ExchangeFullDetails> =>
- buildCodecForObject<ExchangeFullDetails>()
- .property("currency", codecForString())
- .property("exchangeBaseUrl", codecForString())
- .property("paytoUris", codecForList(codecForString()))
- .property("tos", codecForExchangeTos())
- .property("auditors", codecForList(codecForExchangeAuditor()))
- .property("wireInfo", codecForWireInfo())
- .property("feesDescription", codecForFeesByOperations())
- .build("ExchangeFullDetails");
+export const codecForFeeDescriptionPair = (): Codec<FeeDescriptionPair> =>
+ buildCodecForObject<FeeDescriptionPair>()
+ .property("value", codecForAmountJson())
+ .property("from", codecForAbsoluteTime)
+ .property("until", codecForAbsoluteTime)
+ .property("left", codecOptional(codecForAmountJson()))
+ .property("right", codecOptional(codecForAmountJson()))
+ .build("FeeDescriptionPair");
+
+export const codecForFeeDescription = (): Codec<FeeDescription> =>
+ buildCodecForObject<FeeDescription>()
+ .property("value", codecForAmountJson())
+ .property("from", codecForAbsoluteTime)
+ .property("until", codecForAbsoluteTime)
+ .property("fee", codecOptional(codecForAmountJson()))
+ .build("FeeDescription");
+
+export const codecForFeesByOperations = (): Codec<
+ OperationMap<FeeDescription[]>
+> =>
+ buildCodecForObject<OperationMap<FeeDescription[]>>()
+ .property("deposit", codecForList(codecForFeeDescription()))
+ .property("withdraw", codecForList(codecForFeeDescription()))
+ .property("refresh", codecForList(codecForFeeDescription()))
+ .property("refund", codecForList(codecForFeeDescription()))
+ .build("FeesByOperations");
+
+export const codecForExchangeFullDetails = (): Codec<ExchangeFullDetails> =>
+ buildCodecForObject<ExchangeFullDetails>()
+ .property("currency", codecForString())
+ .property("exchangeBaseUrl", codecForString())
+ .property("paytoUris", codecForList(codecForString()))
+ .property("tos", codecForExchangeTos())
+ .property("auditors", codecForList(codecForExchangeAuditor()))
+ .property("wireInfo", codecForWireInfo())
+ .property("feesDescription", codecForFeesByOperations())
+ .build("ExchangeFullDetails");
export const codecForExchangeListItem = (): Codec<ExchangeListItem> =>
buildCodecForObject<ExchangeListItem>()
diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts
index 9d41f2114..1052e302d 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -361,14 +361,14 @@ export interface ExchangeDetailsRecord {
* Terms of service text or undefined if not downloaded yet.
*
* This is just used as a cache of the last downloaded ToS.
- *
+ *
* FIXME: Put in separate object store!
*/
termsOfServiceText: string | undefined;
/**
* content-type of the last downloaded termsOfServiceText.
- *
+ *
* * FIXME: Put in separate object store!
*/
termsOfServiceContentType: string | undefined;
@@ -455,17 +455,6 @@ export interface ExchangeRecord {
nextRefreshCheck: TalerProtocolTimestamp;
/**
- * Last error (if any) for fetching updated information about the
- * exchange.
- */
- lastError?: TalerErrorDetail;
-
- /**
- * Retry status for fetching updated information about the exchange.
- */
- retryInfo?: RetryInfo;
-
- /**
* Public key of the reserve that we're currently using for
* receiving P2P payments.
*/
@@ -734,24 +723,12 @@ export interface ProposalRecord {
* Session ID we got when downloading the contract.
*/
downloadSessionId?: string;
-
- /**
- * Retry info, even present when the operation isn't active to allow indexing
- * on the next retry timestamp.
- *
- * FIXME: Clarify what we even retry.
- */
- retryInfo?: RetryInfo;
-
- lastError: TalerErrorDetail | undefined;
}
/**
* Status of a tip we got from a merchant.
*/
export interface TipRecord {
- lastError: TalerErrorDetail | undefined;
-
/**
* Has the user accepted the tip? Only after the tip has been accepted coins
* withdrawn from the tip may be used.
@@ -810,12 +787,6 @@ export interface TipRecord {
* from the merchant.
*/
pickedUpTimestamp: TalerProtocolTimestamp | undefined;
-
- /**
- * Retry info, even present when the operation isn't active to allow indexing
- * on the next retry timestamp.
- */
- retryInfo: RetryInfo;
}
export enum RefreshCoinStatus {
@@ -837,16 +808,7 @@ export enum OperationStatus {
export interface RefreshGroupRecord {
operationStatus: OperationStatus;
- /**
- * Retry info, even present when the operation isn't active to allow indexing
- * on the next retry timestamp.
- *
- * FIXME: No, this can be optional, indexing is still possible
- */
- retryInfo: RetryInfo;
-
- lastError: TalerErrorDetail | undefined;
-
+ // FIXME: Put this into a different object store?
lastErrorPerCoin: { [coinIndex: number]: TalerErrorDetail };
/**
@@ -1117,6 +1079,8 @@ export interface PurchaseRecord {
/**
* Pending refunds for the purchase. A refund is pending
* when the merchant reports a transient error from the exchange.
+ *
+ * FIXME: Put this into a separate object store?
*/
refunds: { [refundKey: string]: WalletRefundItem };
@@ -1132,6 +1096,7 @@ export interface PurchaseRecord {
lastSessionId: string | undefined;
/**
+ * Do we still need to post the deposit permissions to the merchant?
* Set for the first payment, or on re-plays.
*/
paymentSubmitPending: boolean;
@@ -1142,22 +1107,6 @@ export interface PurchaseRecord {
*/
refundQueryRequested: boolean;
- abortStatus: AbortStatus;
-
- payRetryInfo?: RetryInfo;
-
- lastPayError: TalerErrorDetail | undefined;
-
- /**
- * Retry information for querying the refund status with the merchant.
- */
- refundStatusRetryInfo: RetryInfo;
-
- /**
- * Last error (or undefined) for querying the refund status with the merchant.
- */
- lastRefundStatusError: TalerErrorDetail | undefined;
-
/**
* Continue querying the refund status until this deadline has expired.
*/
@@ -1174,6 +1123,11 @@ export interface PurchaseRecord {
* an error where it doesn't make sense to retry.
*/
payFrozen?: boolean;
+
+ /**
+ * FIXME: How does this interact with payFrozen?
+ */
+ abortStatus: AbortStatus;
}
export const WALLET_BACKUP_STATE_KEY = "walletBackupState";
@@ -1184,9 +1138,9 @@ export const WALLET_BACKUP_STATE_KEY = "walletBackupState";
*/
export type ConfigRecord =
| {
- key: typeof WALLET_BACKUP_STATE_KEY;
- value: WalletBackupConfState;
- }
+ key: typeof WALLET_BACKUP_STATE_KEY;
+ value: WalletBackupConfState;
+ }
| { key: "currencyDefaultsApplied"; value: boolean };
export interface WalletBackupConfState {
@@ -1368,13 +1322,6 @@ export interface WithdrawalGroupRecord {
* FIXME: Should this not also include a timestamp for more logical merging?
*/
denomSelUid: string;
-
- /**
- * Retry info.
- */
- retryInfo?: RetryInfo;
-
- lastError: TalerErrorDetail | undefined;
}
export interface BankWithdrawUriRecord {
@@ -1432,16 +1379,6 @@ export interface RecoupGroupRecord {
* after all individual recoups are done.
*/
scheduleRefreshCoins: string[];
-
- /**
- * Retry info.
- */
- retryInfo: RetryInfo;
-
- /**
- * Last error that occurred, if any.
- */
- lastError: TalerErrorDetail | undefined;
}
export enum BackupProviderStateTag {
@@ -1452,17 +1389,15 @@ export enum BackupProviderStateTag {
export type BackupProviderState =
| {
- tag: BackupProviderStateTag.Provisional;
- }
+ tag: BackupProviderStateTag.Provisional;
+ }
| {
- tag: BackupProviderStateTag.Ready;
- nextBackupTimestamp: TalerProtocolTimestamp;
- }
+ tag: BackupProviderStateTag.Ready;
+ nextBackupTimestamp: TalerProtocolTimestamp;
+ }
| {
- tag: BackupProviderStateTag.Retrying;
- retryInfo: RetryInfo;
- lastError?: TalerErrorDetail;
- };
+ tag: BackupProviderStateTag.Retrying;
+ };
export interface BackupProviderTerms {
supportedProtocolVersion: string;
@@ -1573,13 +1508,6 @@ export interface DepositGroupRecord {
timestampFinished: TalerProtocolTimestamp | undefined;
operationStatus: OperationStatus;
-
- lastError: TalerErrorDetail | undefined;
-
- /**
- * Retry info.
- */
- retryInfo?: RetryInfo;
}
/**
@@ -1749,6 +1677,60 @@ export interface ReserveRecord {
reservePriv: string;
}
+export interface OperationRetryRecord {
+ /**
+ * Unique identifier for the operation. Typically of
+ * the format `${opType}-${opUniqueKey}`
+ */
+ id: string;
+
+ lastError?: TalerErrorDetail;
+
+ retryInfo: RetryInfo;
+}
+
+export enum OperationAttemptResultType {
+ Finished = "finished",
+ Pending = "pending",
+ Error = "error",
+ Longpoll = "longpoll",
+}
+
+// FIXME: not part of DB!
+export type OperationAttemptResult<TSuccess = unknown, TPending = unknown> =
+ | OperationAttemptFinishedResult<TSuccess>
+ | OperationAttemptErrorResult
+ | OperationAttemptLongpollResult
+ | OperationAttemptPendingResult<TPending>;
+
+export namespace OperationAttemptResult {
+ export function finishedEmpty(): OperationAttemptResult<unknown, unknown> {
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
+ }
+}
+
+export interface OperationAttemptFinishedResult<T> {
+ type: OperationAttemptResultType.Finished;
+ result: T;
+}
+
+export interface OperationAttemptPendingResult<T> {
+ type: OperationAttemptResultType.Pending;
+ result: T;
+}
+
+export interface OperationAttemptErrorResult {
+ type: OperationAttemptResultType.Error;
+ errorDetail: TalerErrorDetail;
+}
+
+export interface OperationAttemptLongpollResult {
+ type: OperationAttemptResultType.Longpoll;
+}
+
export const WalletStoresV1 = {
coins: describeStore(
describeContents<CoinRecord>("coins", {
@@ -1913,6 +1895,12 @@ export const WalletStoresV1 = {
describeContents<TombstoneRecord>("tombstones", { keyPath: "id" }),
{},
),
+ operationRetries: describeStore(
+ describeContents<OperationRetryRecord>("operationRetries", {
+ keyPath: "id",
+ }),
+ {},
+ ),
ghostDepositGroups: describeStore(
describeContents<GhostDepositGroupRecord>("ghostDepositGroups", {
keyPath: "contractTermsHash",
diff --git a/packages/taler-wallet-core/src/operations/backup/import.ts b/packages/taler-wallet-core/src/operations/backup/import.ts
index ff7ff0d03..e8683265b 100644
--- a/packages/taler-wallet-core/src/operations/backup/import.ts
+++ b/packages/taler-wallet-core/src/operations/backup/import.ts
@@ -274,7 +274,6 @@ export async function importBackup(
protocolVersionRange: backupExchange.protocol_version_range,
},
permanent: true,
- retryInfo: RetryInfo.reset(),
lastUpdate: undefined,
nextUpdate: TalerProtocolTimestamp.now(),
nextRefreshCheck: TalerProtocolTimestamp.now(),
@@ -341,7 +340,7 @@ export async function importBackup(
}
const denomPubHash =
cryptoComp.rsaDenomPubToHash[
- backupDenomination.denom_pub.rsa_public_key
+ backupDenomination.denom_pub.rsa_public_key
];
checkLogicInvariant(!!denomPubHash);
const existingDenom = await tx.denominations.get([
@@ -426,7 +425,6 @@ export async function importBackup(
}
}
-
// FIXME: import reserves with new schema
// for (const backupReserve of backupExchangeDetails.reserves) {
@@ -517,7 +515,6 @@ export async function importBackup(
// }
// }
// }
-
}
for (const backupProposal of backupBlob.proposals) {
@@ -560,7 +557,7 @@ export async function importBackup(
const amount = Amounts.parseOrThrow(parsedContractTerms.amount);
const contractTermsHash =
cryptoComp.proposalIdToContractTermsHash[
- backupProposal.proposal_id
+ backupProposal.proposal_id
];
let maxWireFee: AmountJson;
if (parsedContractTerms.max_wire_fee) {
@@ -611,7 +608,6 @@ export async function importBackup(
}
await tx.proposals.put({
claimToken: backupProposal.claim_token,
- lastError: undefined,
merchantBaseUrl: backupProposal.merchant_base_url,
timestamp: backupProposal.timestamp,
orderId: backupProposal.order_id,
@@ -620,7 +616,6 @@ export async function importBackup(
cryptoComp.proposalNoncePrivToPub[backupProposal.nonce_priv],
proposalId: backupProposal.proposal_id,
repurchaseProposalId: backupProposal.repurchase_proposal_id,
- retryInfo: RetryInfo.reset(),
download,
proposalStatus,
});
@@ -706,7 +701,7 @@ export async function importBackup(
const amount = Amounts.parseOrThrow(parsedContractTerms.amount);
const contractTermsHash =
cryptoComp.proposalIdToContractTermsHash[
- backupPurchase.proposal_id
+ backupPurchase.proposal_id
];
let maxWireFee: AmountJson;
if (parsedContractTerms.max_wire_fee) {
@@ -755,10 +750,7 @@ export async function importBackup(
noncePriv: backupPurchase.nonce_priv,
noncePub:
cryptoComp.proposalNoncePrivToPub[backupPurchase.nonce_priv],
- lastPayError: undefined,
autoRefundDeadline: TalerProtocolTimestamp.never(),
- refundStatusRetryInfo: RetryInfo.reset(),
- lastRefundStatusError: undefined,
refundAwaiting: undefined,
timestampAccept: backupPurchase.timestamp_accept,
timestampFirstSuccessfulPay:
@@ -767,8 +759,6 @@ export async function importBackup(
merchantPaySig: backupPurchase.merchant_pay_sig,
lastSessionId: undefined,
abortStatus,
- // FIXME!
- payRetryInfo: RetryInfo.reset(),
download,
paymentSubmitPending:
!backupPurchase.timestamp_first_successful_pay,
@@ -851,7 +841,6 @@ export async function importBackup(
timestampCreated: backupRefreshGroup.timestamp_created,
refreshGroupId: backupRefreshGroup.refresh_group_id,
reason,
- lastError: undefined,
lastErrorPerCoin: {},
oldCoinPubs: backupRefreshGroup.old_coins.map((x) => x.coin_pub),
statusPerCoin: backupRefreshGroup.old_coins.map((x) =>
@@ -869,7 +858,6 @@ export async function importBackup(
Amounts.parseOrThrow(x.estimated_output_amount),
),
refreshSessionPerCoin,
- retryInfo: RetryInfo.reset(),
});
}
}
@@ -891,11 +879,9 @@ export async function importBackup(
createdTimestamp: backupTip.timestamp_created,
denomsSel,
exchangeBaseUrl: backupTip.exchange_base_url,
- lastError: undefined,
merchantBaseUrl: backupTip.exchange_base_url,
merchantTipId: backupTip.merchant_tip_id,
pickedUpTimestamp: backupTip.timestamp_finished,
- retryInfo: RetryInfo.reset(),
secretSeed: backupTip.secret_seed,
tipAmountEffective: denomsSel.totalCoinValue,
tipAmountRaw: Amounts.parseOrThrow(backupTip.tip_amount_raw),
diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts b/packages/taler-wallet-core/src/operations/backup/index.ts
index 45b8491df..56871104c 100644
--- a/packages/taler-wallet-core/src/operations/backup/index.ts
+++ b/packages/taler-wallet-core/src/operations/backup/index.ts
@@ -25,9 +25,12 @@
* Imports.
*/
import {
- AbsoluteTime, AmountString,
+ AbsoluteTime,
+ AmountString,
BackupRecovery,
- buildCodecForObject, bytesToString, canonicalizeBaseUrl,
+ buildCodecForObject,
+ bytesToString,
+ canonicalizeBaseUrl,
canonicalJson,
Codec,
codecForAmountString,
@@ -36,19 +39,32 @@ import {
codecForNumber,
codecForString,
codecOptional,
- ConfirmPayResultType, decodeCrock, DenomKeyType,
- durationFromSpec, eddsaGetPublic,
+ ConfirmPayResultType,
+ decodeCrock,
+ DenomKeyType,
+ durationFromSpec,
+ eddsaGetPublic,
EddsaKeyPair,
encodeCrock,
getRandomBytes,
- hash, hashDenomPub,
+ hash,
+ hashDenomPub,
HttpStatusCode,
- j2s, kdf, Logger,
+ j2s,
+ kdf,
+ Logger,
notEmpty,
PreparePayResultType,
RecoveryLoadRequest,
- RecoveryMergeStrategy, rsaBlind, secretbox, secretbox_open, stringToBytes, TalerErrorDetail, TalerProtocolTimestamp, URL,
- WalletBackupContentV1
+ RecoveryMergeStrategy,
+ rsaBlind,
+ secretbox,
+ secretbox_open,
+ stringToBytes,
+ TalerErrorDetail,
+ TalerProtocolTimestamp,
+ URL,
+ WalletBackupContentV1,
} from "@gnu-taler/taler-util";
import { gunzipSync, gzipSync } from "fflate";
import { TalerCryptoInterface } from "../../crypto/cryptoImplementation.js";
@@ -58,26 +74,28 @@ import {
BackupProviderStateTag,
BackupProviderTerms,
ConfigRecord,
+ OperationAttemptResult,
+ OperationAttemptResultType,
WalletBackupConfState,
WalletStoresV1,
- WALLET_BACKUP_STATE_KEY
+ WALLET_BACKUP_STATE_KEY,
} from "../../db.js";
import { InternalWalletState } from "../../internal-wallet-state.js";
import {
readSuccessResponseJsonOrThrow,
- readTalerErrorResponse
+ readTalerErrorResponse,
} from "../../util/http.js";
import {
checkDbInvariant,
- checkLogicInvariant
+ checkLogicInvariant,
} from "../../util/invariants.js";
import { GetReadWriteAccess } from "../../util/query.js";
-import { RetryInfo } from "../../util/retries.js";
+import { RetryInfo, RetryTags, scheduleRetryInTx } from "../../util/retries.js";
import { guardOperationException } from "../common.js";
import {
checkPaymentByProposalId,
confirmPay,
- preparePayForUri
+ preparePayForUri,
} from "../pay.js";
import { exportBackup } from "./export.js";
import { BackupCryptoPrecomputedData, importBackup } from "./import.js";
@@ -244,8 +262,7 @@ function getNextBackupTimestamp(): TalerProtocolTimestamp {
async function runBackupCycleForProvider(
ws: InternalWalletState,
args: BackupForProviderArgs,
-): Promise<void> {
-
+): Promise<OperationAttemptResult> {
const provider = await ws.db
.mktx((x) => ({ backupProviders: x.backupProviders }))
.runReadOnly(async (tx) => {
@@ -254,7 +271,10 @@ async function runBackupCycleForProvider(
if (!provider) {
logger.warn("provider disappeared");
- return;
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
const backupJson = await exportBackup(ws);
@@ -292,8 +312,8 @@ async function runBackupCycleForProvider(
"if-none-match": newHash,
...(provider.lastBackupHash
? {
- "if-match": provider.lastBackupHash,
- }
+ "if-match": provider.lastBackupHash,
+ }
: {}),
},
});
@@ -315,7 +335,10 @@ async function runBackupCycleForProvider(
};
await tx.backupProvider.put(prov);
});
- return;
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
if (resp.status === HttpStatusCode.PaymentRequired) {
@@ -344,7 +367,10 @@ async function runBackupCycleForProvider(
// FIXME: check if the provider is overcharging us!
await ws.db
- .mktx((x) => ({ backupProviders: x.backupProviders }))
+ .mktx((x) => ({
+ backupProviders: x.backupProviders,
+ operationRetries: x.operationRetries,
+ }))
.runReadWrite(async (tx) => {
const provRec = await tx.backupProviders.get(provider.baseUrl);
checkDbInvariant(!!provRec);
@@ -354,11 +380,8 @@ async function runBackupCycleForProvider(
provRec.currentPaymentProposalId = proposalId;
// FIXME: allocate error code for this!
await tx.backupProviders.put(provRec);
- await incrementBackupRetryInTx(
- tx,
- args.backupProviderBaseUrl,
- undefined,
- );
+ const opId = RetryTags.forBackup(provRec);
+ await scheduleRetryInTx(ws, tx, opId);
});
if (doPay) {
@@ -371,12 +394,15 @@ async function runBackupCycleForProvider(
}
if (args.retryAfterPayment) {
- await runBackupCycleForProvider(ws, {
+ return await runBackupCycleForProvider(ws, {
...args,
retryAfterPayment: false,
});
}
- return;
+ return {
+ type: OperationAttemptResultType.Pending,
+ result: undefined,
+ };
}
if (resp.status === HttpStatusCode.NoContent) {
@@ -395,7 +421,10 @@ async function runBackupCycleForProvider(
};
await tx.backupProviders.put(prov);
});
- return;
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
if (resp.status === HttpStatusCode.Conflict) {
@@ -406,7 +435,10 @@ async function runBackupCycleForProvider(
const cryptoData = await computeBackupCryptoData(ws.cryptoApi, blob);
await importBackup(ws, blob, cryptoData);
await ws.db
- .mktx((x) => ({ backupProvider: x.backupProviders }))
+ .mktx((x) => ({
+ backupProvider: x.backupProviders,
+ operationRetries: x.operationRetries,
+ }))
.runReadWrite(async (tx) => {
const prov = await tx.backupProvider.get(provider.baseUrl);
if (!prov) {
@@ -414,20 +446,21 @@ async function runBackupCycleForProvider(
return;
}
prov.lastBackupHash = encodeCrock(hash(backupEnc));
- // FIXME: Allocate error code for this situation?
+ // FIXME: Allocate error code for this situation?
+ // FIXME: Add operation retry record!
+ const opId = RetryTags.forBackup(prov);
+ await scheduleRetryInTx(ws, tx, opId);
prov.state = {
tag: BackupProviderStateTag.Retrying,
- retryInfo: RetryInfo.reset(),
};
await tx.backupProvider.put(prov);
});
logger.info("processed existing backup");
// Now upload our own, merged backup.
- await runBackupCycleForProvider(ws, {
+ return await runBackupCycleForProvider(ws, {
...args,
retryAfterPayment: false,
});
- return;
}
// Some other response that we did not expect!
@@ -436,53 +469,16 @@ async function runBackupCycleForProvider(
const err = await readTalerErrorResponse(resp);
logger.error(`got error response from backup provider: ${j2s(err)}`);
- await ws.db
- .mktx((x) => ({ backupProviders: x.backupProviders }))
- .runReadWrite(async (tx) => {
- incrementBackupRetryInTx(tx, args.backupProviderBaseUrl, err);
- });
-}
-
-async function incrementBackupRetryInTx(
- tx: GetReadWriteAccess<{
- backupProviders: typeof WalletStoresV1.backupProviders;
- }>,
- backupProviderBaseUrl: string,
- err: TalerErrorDetail | undefined,
-): Promise<void> {
- const pr = await tx.backupProviders.get(backupProviderBaseUrl);
- if (!pr) {
- return;
- }
- if (pr.state.tag === BackupProviderStateTag.Retrying) {
- pr.state.lastError = err;
- pr.state.retryInfo = RetryInfo.increment(pr.state.retryInfo);
- } else if (pr.state.tag === BackupProviderStateTag.Ready) {
- pr.state = {
- tag: BackupProviderStateTag.Retrying,
- retryInfo: RetryInfo.reset(),
- lastError: err,
- };
- }
- await tx.backupProviders.put(pr);
-}
-
-async function incrementBackupRetry(
- ws: InternalWalletState,
- backupProviderBaseUrl: string,
- err: TalerErrorDetail | undefined,
-): Promise<void> {
- await ws.db
- .mktx((x) => ({ backupProviders: x.backupProviders }))
- .runReadWrite(async (tx) =>
- incrementBackupRetryInTx(tx, backupProviderBaseUrl, err),
- );
+ return {
+ type: OperationAttemptResultType.Error,
+ errorDetail: err,
+ };
}
export async function processBackupForProvider(
ws: InternalWalletState,
backupProviderBaseUrl: string,
-): Promise<void> {
+): Promise<OperationAttemptResult> {
const provider = await ws.db
.mktx((x) => ({ backupProviders: x.backupProviders }))
.runReadOnly(async (tx) => {
@@ -492,17 +488,10 @@ export async function processBackupForProvider(
throw Error("unknown backup provider");
}
- const onOpErr = (err: TalerErrorDetail): Promise<void> =>
- incrementBackupRetry(ws, backupProviderBaseUrl, err);
-
- const run = async () => {
- await runBackupCycleForProvider(ws, {
- backupProviderBaseUrl: provider.baseUrl,
- retryAfterPayment: true,
- });
- };
-
- await guardOperationException(run, onOpErr);
+ return await runBackupCycleForProvider(ws, {
+ backupProviderBaseUrl: provider.baseUrl,
+ retryAfterPayment: true,
+ });
}
export interface RemoveBackupProviderRequest {
@@ -818,24 +807,34 @@ export async function getBackupInfo(
): Promise<BackupInfo> {
const backupConfig = await provideBackupState(ws);
const providerRecords = await ws.db
- .mktx((x) => ({ backupProviders: x.backupProviders }))
+ .mktx((x) => ({
+ backupProviders: x.backupProviders,
+ operationRetries: x.operationRetries,
+ }))
.runReadOnly(async (tx) => {
- return await tx.backupProviders.iter().toArray();
+ return await tx.backupProviders.iter().mapAsync(async (bp) => {
+ const opId = RetryTags.forBackup(bp);
+ const retryRecord = await tx.operationRetries.get(opId);
+ return {
+ provider: bp,
+ retryRecord,
+ };
+ });
});
const providers: ProviderInfo[] = [];
for (const x of providerRecords) {
providers.push({
- active: x.state.tag !== BackupProviderStateTag.Provisional,
- syncProviderBaseUrl: x.baseUrl,
- lastSuccessfulBackupTimestamp: x.lastBackupCycleTimestamp,
- paymentProposalIds: x.paymentProposalIds,
+ active: x.provider.state.tag !== BackupProviderStateTag.Provisional,
+ syncProviderBaseUrl: x.provider.baseUrl,
+ lastSuccessfulBackupTimestamp: x.provider.lastBackupCycleTimestamp,
+ paymentProposalIds: x.provider.paymentProposalIds,
lastError:
- x.state.tag === BackupProviderStateTag.Retrying
- ? x.state.lastError
+ x.provider.state.tag === BackupProviderStateTag.Retrying
+ ? x.retryRecord?.lastError
: undefined,
- paymentStatus: await getProviderPaymentInfo(ws, x),
- terms: x.terms,
- name: x.name,
+ paymentStatus: await getProviderPaymentInfo(ws, x.provider),
+ terms: x.provider.terms,
+ name: x.provider.name,
});
}
return {
diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts
index 734bc4c2b..6eed12a38 100644
--- a/packages/taler-wallet-core/src/operations/deposits.ts
+++ b/packages/taler-wallet-core/src/operations/deposits.ts
@@ -44,7 +44,12 @@ import {
TrackDepositGroupResponse,
URL,
} from "@gnu-taler/taler-util";
-import { DepositGroupRecord, OperationStatus } from "../db.js";
+import {
+ DepositGroupRecord,
+ OperationAttemptErrorResult,
+ OperationAttemptResult,
+ OperationStatus,
+} from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { selectPayCoins } from "../util/coinSelection.js";
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
@@ -67,91 +72,16 @@ import { getTotalRefreshCost } from "./refresh.js";
const logger = new Logger("deposits.ts");
/**
- * Set up the retry timeout for a deposit group.
- */
-async function setupDepositGroupRetry(
- ws: InternalWalletState,
- depositGroupId: string,
- options: {
- resetRetry: boolean;
- },
-): Promise<void> {
- await ws.db
- .mktx((x) => ({
- depositGroups: x.depositGroups,
- }))
- .runReadWrite(async (tx) => {
- const x = await tx.depositGroups.get(depositGroupId);
- if (!x) {
- return;
- }
- if (options.resetRetry) {
- x.retryInfo = RetryInfo.reset();
- } else {
- x.retryInfo = RetryInfo.increment(x.retryInfo);
- }
- delete x.lastError;
- await tx.depositGroups.put(x);
- });
-}
-
-/**
- * Report an error that occurred while processing the deposit group.
- */
-async function reportDepositGroupError(
- ws: InternalWalletState,
- depositGroupId: string,
- err: TalerErrorDetail,
-): Promise<void> {
- await ws.db
- .mktx((x) => ({ depositGroups: x.depositGroups }))
- .runReadWrite(async (tx) => {
- const r = await tx.depositGroups.get(depositGroupId);
- if (!r) {
- return;
- }
- if (!r.retryInfo) {
- logger.error(
- `deposit group record (${depositGroupId}) reports error, but no retry active`,
- );
- return;
- }
- r.lastError = err;
- await tx.depositGroups.put(r);
- });
- ws.notify({ type: NotificationType.DepositOperationError, error: err });
-}
-
-export async function processDepositGroup(
- ws: InternalWalletState,
- depositGroupId: string,
- options: {
- forceNow?: boolean;
- cancellationToken?: CancellationToken;
- } = {},
-): Promise<void> {
- const onOpErr = (err: TalerErrorDetail): Promise<void> =>
- reportDepositGroupError(ws, depositGroupId, err);
- return await guardOperationException(
- async () => await processDepositGroupImpl(ws, depositGroupId, options),
- onOpErr,
- );
-}
-
-/**
* @see {processDepositGroup}
*/
-async function processDepositGroupImpl(
+export async function processDepositGroup(
ws: InternalWalletState,
depositGroupId: string,
options: {
forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
-): Promise<void> {
- const forceNow = options.forceNow ?? false;
- await setupDepositGroupRetry(ws, depositGroupId, { resetRetry: forceNow });
-
+): Promise<OperationAttemptResult> {
const depositGroup = await ws.db
.mktx((x) => ({
depositGroups: x.depositGroups,
@@ -161,11 +91,11 @@ async function processDepositGroupImpl(
});
if (!depositGroup) {
logger.warn(`deposit group ${depositGroupId} not found`);
- return;
+ return OperationAttemptResult.finishedEmpty();
}
if (depositGroup.timestampFinished) {
logger.trace(`deposit group ${depositGroupId} already finished`);
- return;
+ return OperationAttemptResult.finishedEmpty();
}
const contractData = extractContractData(
@@ -240,11 +170,10 @@ async function processDepositGroupImpl(
if (allDeposited) {
dg.timestampFinished = TalerProtocolTimestamp.now();
dg.operationStatus = OperationStatus.Finished;
- delete dg.lastError;
- delete dg.retryInfo;
await tx.depositGroups.put(dg);
}
});
+ return OperationAttemptResult.finishedEmpty();
}
export async function trackDepositGroup(
@@ -338,9 +267,9 @@ export async function getFeeForDeposit(
const csr: CoinSelectionRequest = {
allowedAuditors: [],
- allowedExchanges: Object.values(exchangeInfos).map(v => ({
+ allowedExchanges: Object.values(exchangeInfos).map((v) => ({
exchangeBaseUrl: v.url,
- exchangePub: v.master_pub
+ exchangePub: v.master_pub,
})),
amount: Amounts.parseOrThrow(req.amount),
maxDepositFee: Amounts.parseOrThrow(req.amount),
@@ -383,7 +312,6 @@ export async function prepareDepositGroup(
}
const amount = Amounts.parseOrThrow(req.amount);
-
const exchangeInfos: { url: string; master_pub: string }[] = [];
await ws.db
@@ -473,7 +401,7 @@ export async function prepareDepositGroup(
payCoinSel,
);
- return { totalDepositCost, effectiveDepositAmount }
+ return { totalDepositCost, effectiveDepositAmount };
}
export async function createDepositGroup(
ws: InternalWalletState,
@@ -600,9 +528,7 @@ export async function createDepositGroup(
payto_uri: req.depositPaytoUri,
salt: wireSalt,
},
- retryInfo: RetryInfo.reset(),
operationStatus: OperationStatus.Pending,
- lastError: undefined,
};
await ws.db
diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts
index b75bdfd74..1021da6b6 100644
--- a/packages/taler-wallet-core/src/operations/exchanges.ts
+++ b/packages/taler-wallet-core/src/operations/exchanges.ts
@@ -53,6 +53,8 @@ import {
DenominationVerificationStatus,
ExchangeDetailsRecord,
ExchangeRecord,
+ OperationAttemptResult,
+ OperationAttemptResultType,
WalletStoresV1,
} from "../db.js";
import { TalerError } from "../errors.js";
@@ -64,7 +66,7 @@ import {
readSuccessResponseTextOrThrow,
} from "../util/http.js";
import { DbAccess, GetReadOnlyAccess } from "../util/query.js";
-import { RetryInfo } from "../util/retries.js";
+import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js";
import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "../versions.js";
import { guardOperationException } from "./common.js";
@@ -102,51 +104,6 @@ function denominationRecordFromKeys(
return d;
}
-async function reportExchangeUpdateError(
- ws: InternalWalletState,
- baseUrl: string,
- err: TalerErrorDetail,
-): Promise<void> {
- await ws.db
- .mktx((x) => ({ exchanges: x.exchanges }))
- .runReadWrite(async (tx) => {
- const exchange = await tx.exchanges.get(baseUrl);
- if (!exchange) {
- return;
- }
- if (!exchange.retryInfo) {
- logger.reportBreak();
- }
- exchange.lastError = err;
- await tx.exchanges.put(exchange);
- });
- ws.notify({ type: NotificationType.ExchangeOperationError, error: err });
-}
-
-async function setupExchangeUpdateRetry(
- ws: InternalWalletState,
- baseUrl: string,
- options: {
- reset: boolean;
- },
-): Promise<void> {
- await ws.db
- .mktx((x) => ({ exchanges: x.exchanges }))
- .runReadWrite(async (tx) => {
- const exchange = await tx.exchanges.get(baseUrl);
- if (!exchange) {
- return;
- }
- if (options.reset) {
- exchange.retryInfo = RetryInfo.reset();
- } else {
- exchange.retryInfo = RetryInfo.increment(exchange.retryInfo);
- }
- delete exchange.lastError;
- await tx.exchanges.put(exchange);
- });
-}
-
export function getExchangeRequestTimeout(): Duration {
return Duration.fromSpec({
seconds: 5,
@@ -360,25 +317,6 @@ async function downloadExchangeWireInfo(
return wireInfo;
}
-export async function updateExchangeFromUrl(
- ws: InternalWalletState,
- baseUrl: string,
- options: {
- forceNow?: boolean;
- cancellationToken?: CancellationToken;
- } = {},
-): Promise<{
- exchange: ExchangeRecord;
- exchangeDetails: ExchangeDetailsRecord;
-}> {
- const onOpErr = (e: TalerErrorDetail): Promise<void> =>
- reportExchangeUpdateError(ws, baseUrl, e);
- return await guardOperationException(
- () => updateExchangeFromUrlImpl(ws, baseUrl, options),
- onOpErr,
- );
-}
-
async function provideExchangeRecord(
ws: InternalWalletState,
baseUrl: string,
@@ -398,7 +336,6 @@ async function provideExchangeRecord(
const r: ExchangeRecord = {
permanent: true,
baseUrl: baseUrl,
- retryInfo: RetryInfo.reset(),
detailsPointer: undefined,
lastUpdate: undefined,
nextUpdate: AbsoluteTime.toTimestamp(now),
@@ -530,25 +467,42 @@ export async function downloadTosFromAcceptedFormat(
);
}
+export async function updateExchangeFromUrl(
+ ws: InternalWalletState,
+ baseUrl: string,
+ options: {
+ forceNow?: boolean;
+ cancellationToken?: CancellationToken;
+ } = {},
+): Promise<{
+ exchange: ExchangeRecord;
+ exchangeDetails: ExchangeDetailsRecord;
+}> {
+ return runOperationHandlerForResult(
+ await updateExchangeFromUrlHandler(ws, baseUrl, options),
+ );
+}
+
/**
* Update or add exchange DB entry by fetching the /keys and /wire information.
* Optionally link the reserve entry to the new or existing
* exchange entry in then DB.
*/
-async function updateExchangeFromUrlImpl(
+export async function updateExchangeFromUrlHandler(
ws: InternalWalletState,
baseUrl: string,
options: {
forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
-): Promise<{
- exchange: ExchangeRecord;
- exchangeDetails: ExchangeDetailsRecord;
-}> {
+): Promise<
+ OperationAttemptResult<{
+ exchange: ExchangeRecord;
+ exchangeDetails: ExchangeDetailsRecord;
+ }>
+> {
const forceNow = options.forceNow ?? false;
logger.info(`updating exchange info for ${baseUrl}, forced: ${forceNow}`);
- await setupExchangeUpdateRetry(ws, baseUrl, { reset: forceNow });
const now = AbsoluteTime.now();
baseUrl = canonicalizeBaseUrl(baseUrl);
@@ -565,7 +519,10 @@ async function updateExchangeFromUrlImpl(
!AbsoluteTime.isExpired(AbsoluteTime.fromTimestamp(exchange.nextUpdate))
) {
logger.info("using existing exchange info");
- return { exchange, exchangeDetails };
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: { exchange, exchangeDetails },
+ };
}
logger.info("updating exchange /keys info");
@@ -649,8 +606,6 @@ async function updateExchangeFromUrlImpl(
termsOfServiceAcceptedTimestamp: TalerProtocolTimestamp.now(),
};
// FIXME: only update if pointer got updated
- delete r.lastError;
- delete r.retryInfo;
r.lastUpdate = TalerProtocolTimestamp.now();
r.nextUpdate = keysInfo.expiry;
// New denominations might be available.
@@ -771,8 +726,11 @@ async function updateExchangeFromUrlImpl(
type: NotificationType.ExchangeAdded,
});
return {
- exchange: updated.exchange,
- exchangeDetails: updated.exchangeDetails,
+ type: OperationAttemptResultType.Finished,
+ result: {
+ exchange: updated.exchange,
+ exchangeDetails: updated.exchangeDetails,
+ },
};
}
diff --git a/packages/taler-wallet-core/src/operations/pay.ts b/packages/taler-wallet-core/src/operations/pay.ts
index 3d4d2b5a0..9e63cd516 100644
--- a/packages/taler-wallet-core/src/operations/pay.ts
+++ b/packages/taler-wallet-core/src/operations/pay.ts
@@ -37,9 +37,6 @@ import {
ContractTerms,
ContractTermsUtil,
Duration,
- durationMax,
- durationMin,
- durationMul,
encodeCrock,
ForcedCoinSel,
getRandomBytes,
@@ -59,10 +56,7 @@ import {
TransactionType,
URL,
} from "@gnu-taler/taler-util";
-import {
- EXCHANGE_COINS_LOCK,
- InternalWalletState,
-} from "../internal-wallet-state.js";
+import { EddsaKeypair } from "../crypto/cryptoImplementation.js";
import {
AbortStatus,
AllowedAuditorInfo,
@@ -71,6 +65,8 @@ import {
CoinRecord,
CoinStatus,
DenominationRecord,
+ OperationAttemptResult,
+ OperationAttemptResultType,
ProposalRecord,
ProposalStatus,
PurchaseRecord,
@@ -83,6 +79,11 @@ import {
TalerError,
} from "../errors.js";
import {
+ EXCHANGE_COINS_LOCK,
+ InternalWalletState,
+} from "../internal-wallet-state.js";
+import { assertUnreachable } from "../util/assertUnreachable.js";
+import {
AvailableCoinInfo,
CoinCandidateSelection,
PreviousPayCoins,
@@ -98,11 +99,9 @@ import {
throwUnexpectedRequestError,
} from "../util/http.js";
import { GetReadWriteAccess } from "../util/query.js";
-import { RetryInfo } from "../util/retries.js";
+import { RetryInfo, RetryTags, scheduleRetry } from "../util/retries.js";
import { getExchangeDetails } from "./exchanges.js";
import { createRefreshGroup, getTotalRefreshCost } from "./refresh.js";
-import { guardOperationException } from "./common.js";
-import { EddsaKeypair } from "../crypto/cryptoImplementation.js";
/**
* Logger.
@@ -448,10 +447,6 @@ async function recordConfirmPay(
timestampAccept: AbsoluteTime.toTimestamp(AbsoluteTime.now()),
timestampLastRefundStatus: undefined,
proposalId: proposal.proposalId,
- lastPayError: undefined,
- lastRefundStatusError: undefined,
- payRetryInfo: RetryInfo.reset(),
- refundStatusRetryInfo: RetryInfo.reset(),
refundQueryRequested: false,
timestampFirstSuccessfulPay: undefined,
autoRefundDeadline: undefined,
@@ -475,8 +470,6 @@ async function recordConfirmPay(
const p = await tx.proposals.get(proposal.proposalId);
if (p) {
p.proposalStatus = ProposalStatus.Accepted;
- delete p.lastError;
- delete p.retryInfo;
await tx.proposals.put(p);
}
await tx.purchases.put(t);
@@ -490,117 +483,6 @@ async function recordConfirmPay(
return t;
}
-async function reportProposalError(
- ws: InternalWalletState,
- proposalId: string,
- err: TalerErrorDetail,
-): Promise<void> {
- await ws.db
- .mktx((x) => ({ proposals: x.proposals }))
- .runReadWrite(async (tx) => {
- const pr = await tx.proposals.get(proposalId);
- if (!pr) {
- return;
- }
- if (!pr.retryInfo) {
- logger.error(
- `Asked to report an error for a proposal (${proposalId}) that is not active (no retryInfo)`,
- );
- logger.reportBreak();
- return;
- }
- pr.lastError = err;
- await tx.proposals.put(pr);
- });
- ws.notify({ type: NotificationType.ProposalOperationError, error: err });
-}
-
-async function setupProposalRetry(
- ws: InternalWalletState,
- proposalId: string,
- options: {
- reset: boolean;
- },
-): Promise<void> {
- await ws.db
- .mktx((x) => ({ proposals: x.proposals }))
- .runReadWrite(async (tx) => {
- const pr = await tx.proposals.get(proposalId);
- if (!pr) {
- return;
- }
- if (options.reset) {
- pr.retryInfo = RetryInfo.reset();
- } else {
- pr.retryInfo = RetryInfo.increment(pr.retryInfo);
- }
- delete pr.lastError;
- await tx.proposals.put(pr);
- });
-}
-
-async function setupPurchasePayRetry(
- ws: InternalWalletState,
- proposalId: string,
- options: {
- reset: boolean;
- },
-): Promise<void> {
- await ws.db
- .mktx((x) => ({ purchases: x.purchases }))
- .runReadWrite(async (tx) => {
- const p = await tx.purchases.get(proposalId);
- if (!p) {
- return;
- }
- if (options.reset) {
- p.payRetryInfo = RetryInfo.reset();
- } else {
- p.payRetryInfo = RetryInfo.increment(p.payRetryInfo);
- }
- delete p.lastPayError;
- await tx.purchases.put(p);
- });
-}
-
-async function reportPurchasePayError(
- ws: InternalWalletState,
- proposalId: string,
- err: TalerErrorDetail,
-): Promise<void> {
- await ws.db
- .mktx((x) => ({ purchases: x.purchases }))
- .runReadWrite(async (tx) => {
- const pr = await tx.purchases.get(proposalId);
- if (!pr) {
- return;
- }
- if (!pr.payRetryInfo) {
- logger.error(
- `purchase record (${proposalId}) reports error, but no retry active`,
- );
- }
- pr.lastPayError = err;
- await tx.purchases.put(pr);
- });
- ws.notify({ type: NotificationType.PayOperationError, error: err });
-}
-
-export async function processDownloadProposal(
- ws: InternalWalletState,
- proposalId: string,
- options: {
- forceNow?: boolean;
- } = {},
-): Promise<void> {
- const onOpErr = (err: TalerErrorDetail): Promise<void> =>
- reportProposalError(ws, proposalId, err);
- await guardOperationException(
- () => processDownloadProposalImpl(ws, proposalId, options),
- onOpErr,
- );
-}
-
async function failProposalPermanently(
ws: InternalWalletState,
proposalId: string,
@@ -613,23 +495,21 @@ async function failProposalPermanently(
if (!p) {
return;
}
- delete p.retryInfo;
- p.lastError = err;
p.proposalStatus = ProposalStatus.PermanentlyFailed;
await tx.proposals.put(p);
});
}
-function getProposalRequestTimeout(proposal: ProposalRecord): Duration {
+function getProposalRequestTimeout(retryInfo?: RetryInfo): Duration {
return Duration.clamp({
lower: Duration.fromSpec({ seconds: 1 }),
upper: Duration.fromSpec({ seconds: 60 }),
- value: RetryInfo.getDuration(proposal.retryInfo),
+ value: retryInfo ? RetryInfo.getDuration(retryInfo) : Duration.fromSpec({}),
});
}
function getPayRequestTimeout(purchase: PurchaseRecord): Duration {
- return durationMul(
+ return Duration.multiply(
{ d_ms: 15000 },
1 + purchase.payCoinSelection.coinPubs.length / 5,
);
@@ -682,15 +562,13 @@ export function extractContractData(
};
}
-async function processDownloadProposalImpl(
+export async function processDownloadProposal(
ws: InternalWalletState,
proposalId: string,
- options: {
- forceNow?: boolean;
- } = {},
-): Promise<void> {
- const forceNow = options.forceNow ?? false;
- await setupProposalRetry(ws, proposalId, { reset: forceNow });
+ options: {} = {},
+): Promise<OperationAttemptResult> {
+
+ const res = ws.db.mktx2((x) => [x.auditorTrust, x.coins])
const proposal = await ws.db
.mktx((x) => ({ proposals: x.proposals }))
@@ -699,11 +577,17 @@ async function processDownloadProposalImpl(
});
if (!proposal) {
- return;
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
if (proposal.proposalStatus != ProposalStatus.Downloading) {
- return;
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
const orderClaimUrl = new URL(
@@ -722,8 +606,16 @@ async function processDownloadProposalImpl(
requestBody.token = proposal.claimToken;
}
+ const opId = RetryTags.forProposalClaim(proposal);
+ const retryRecord = await ws.db
+ .mktx((x) => ({ operationRetries: x.operationRetries }))
+ .runReadOnly(async (tx) => {
+ return tx.operationRetries.get(opId);
+ });
+
+ // FIXME: Do this in the background using the new return value
const httpResponse = await ws.http.postJson(orderClaimUrl, requestBody, {
- timeout: getProposalRequestTimeout(proposal),
+ timeout: getProposalRequestTimeout(retryRecord?.retryInfo),
});
const r = await readSuccessResponseJsonOrErrorCode(
httpResponse,
@@ -892,6 +784,11 @@ async function processDownloadProposalImpl(
type: NotificationType.ProposalDownloaded,
proposalId: proposal.proposalId,
});
+
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
/**
@@ -954,8 +851,6 @@ async function startDownloadProposal(
proposalId: proposalId,
proposalStatus: ProposalStatus.Downloading,
repurchaseProposalId: undefined,
- retryInfo: RetryInfo.reset(),
- lastError: undefined,
downloadSessionId: sessionId,
};
@@ -1000,17 +895,13 @@ async function storeFirstPaySuccess(
}
purchase.timestampFirstSuccessfulPay = now;
purchase.paymentSubmitPending = false;
- purchase.lastPayError = undefined;
purchase.lastSessionId = sessionId;
- purchase.payRetryInfo = RetryInfo.reset();
purchase.merchantPaySig = paySig;
const protoAr = purchase.download.contractData.autoRefund;
if (protoAr) {
const ar = Duration.fromTalerProtocolDuration(protoAr);
logger.info("auto_refund present");
purchase.refundQueryRequested = true;
- purchase.refundStatusRetryInfo = RetryInfo.reset();
- purchase.lastRefundStatusError = undefined;
purchase.autoRefundDeadline = AbsoluteTime.toTimestamp(
AbsoluteTime.addDuration(AbsoluteTime.now(), ar),
);
@@ -1038,8 +929,6 @@ async function storePayReplaySuccess(
throw Error("invalid payment state");
}
purchase.paymentSubmitPending = false;
- purchase.lastPayError = undefined;
- purchase.payRetryInfo = RetryInfo.reset();
purchase.lastSessionId = sessionId;
await tx.purchases.put(purchase);
});
@@ -1298,7 +1187,8 @@ export async function checkPaymentByProposalId(
await tx.purchases.put(p);
});
const r = await processPurchasePay(ws, proposalId, { forceNow: true });
- if (r.type !== ConfirmPayResultType.Done) {
+ if (r.type !== OperationAttemptResultType.Finished) {
+ // FIXME: This does not surface the original error
throw Error("submitting pay failed");
}
return {
@@ -1458,6 +1348,45 @@ export async function generateDepositPermissions(
}
/**
+ * Run the operation handler for a payment
+ * and return the result as a {@link ConfirmPayResult}.
+ */
+export async function runPayForConfirmPay(
+ ws: InternalWalletState,
+ proposalId: string,
+): Promise<ConfirmPayResult> {
+ const res = await processPurchasePay(ws, proposalId, { forceNow: true });
+ switch (res.type) {
+ case OperationAttemptResultType.Finished: {
+ const purchase = await ws.db
+ .mktx((x) => ({ purchases: x.purchases }))
+ .runReadOnly(async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
+ if (!purchase?.download) {
+ throw Error("purchase record not available anymore");
+ }
+ return {
+ type: ConfirmPayResultType.Done,
+ contractTerms: purchase.download.contractTermsRaw,
+ };
+ }
+ case OperationAttemptResultType.Error:
+ // FIXME: allocate error code!
+ throw Error("payment failed");
+ case OperationAttemptResultType.Pending:
+ return {
+ type: ConfirmPayResultType.Pending,
+ lastError: undefined,
+ };
+ case OperationAttemptResultType.Longpoll:
+ throw Error("unexpected processPurchasePay result (longpoll)");
+ default:
+ assertUnreachable(res);
+ }
+}
+
+/**
* Add a contract to the wallet and sign coins, and send them.
*/
export async function confirmPay(
@@ -1503,7 +1432,7 @@ export async function confirmPay(
if (existingPurchase) {
logger.trace("confirmPay: submitting payment for existing purchase");
- return await processPurchasePay(ws, proposalId, { forceNow: true });
+ return runPayForConfirmPay(ws, proposalId);
}
logger.trace("confirmPay: purchase record does not exist yet");
@@ -1559,6 +1488,7 @@ export async function confirmPay(
res,
d.contractData,
);
+
await recordConfirmPay(
ws,
proposal,
@@ -1567,7 +1497,7 @@ export async function confirmPay(
sessionIdOverride,
);
- return await processPurchasePay(ws, proposalId, { forceNow: true });
+ return runPayForConfirmPay(ws, proposalId);
}
export async function processPurchasePay(
@@ -1576,24 +1506,7 @@ export async function processPurchasePay(
options: {
forceNow?: boolean;
} = {},
-): Promise<ConfirmPayResult> {
- const onOpErr = (e: TalerErrorDetail): Promise<void> =>
- reportPurchasePayError(ws, proposalId, e);
- return await guardOperationException(
- () => processPurchasePayImpl(ws, proposalId, options),
- onOpErr,
- );
-}
-
-async function processPurchasePayImpl(
- ws: InternalWalletState,
- proposalId: string,
- options: {
- forceNow?: boolean;
- } = {},
-): Promise<ConfirmPayResult> {
- const forceNow = options.forceNow ?? false;
- await setupPurchasePayRetry(ws, proposalId, { reset: forceNow });
+): Promise<OperationAttemptResult> {
const purchase = await ws.db
.mktx((x) => ({ purchases: x.purchases }))
.runReadOnly(async (tx) => {
@@ -1601,8 +1514,8 @@ async function processPurchasePayImpl(
});
if (!purchase) {
return {
- type: ConfirmPayResultType.Pending,
- lastError: {
+ type: OperationAttemptResultType.Error,
+ errorDetail: {
// FIXME: allocate more specific error code
code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
hint: `trying to pay for purchase that is not in the database`,
@@ -1611,10 +1524,7 @@ async function processPurchasePayImpl(
};
}
if (!purchase.paymentSubmitPending) {
- return {
- type: ConfirmPayResultType.Pending,
- lastError: purchase.lastPayError,
- };
+ OperationAttemptResult.finishedEmpty();
}
logger.trace(`processing purchase pay ${proposalId}`);
@@ -1659,23 +1569,12 @@ async function processPurchasePayImpl(
logger.trace(`got resp ${JSON.stringify(resp)}`);
- // Hide transient errors.
- if (
- (purchase.payRetryInfo?.retryCounter ?? 0) <= 5 &&
- resp.status >= 500 &&
- resp.status <= 599
- ) {
- logger.trace("treating /pay error as transient");
- const err = makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
- getHttpResponseErrorDetails(resp),
- "/pay failed",
- );
- return {
- type: ConfirmPayResultType.Pending,
- lastError: err,
- };
- }
+ const payOpId = RetryTags.forPay(purchase);
+ const payRetryRecord = await ws.db
+ .mktx((x) => ({ operationRetries: x.operationRetries }))
+ .runReadOnly(async (tx) => {
+ return await tx.operationRetries.get(payOpId);
+ });
if (resp.status === HttpStatusCode.BadRequest) {
const errDetails = await readUnexpectedResponseDetails(resp);
@@ -1689,8 +1588,6 @@ async function processPurchasePayImpl(
return;
}
purch.payFrozen = true;
- purch.lastPayError = errDetails;
- delete purch.payRetryInfo;
await tx.purchases.put(purch);
});
throw makePendingOperationFailedError(
@@ -1708,7 +1605,9 @@ async function processPurchasePayImpl(
) {
// Do this in the background, as it might take some time
handleInsufficientFunds(ws, proposalId, err).catch(async (e) => {
- reportPurchasePayError(ws, proposalId, {
+ console.log("handling insufficient funds failed");
+
+ await scheduleRetry(ws, RetryTags.forPay(purchase), {
code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
message: "unexpected exception",
hint: "unexpected exception",
@@ -1719,9 +1618,8 @@ async function processPurchasePayImpl(
});
return {
- type: ConfirmPayResultType.Pending,
- // FIXME: should we return something better here?
- lastError: err,
+ type: OperationAttemptResultType.Pending,
+ result: undefined,
};
}
}
@@ -1761,22 +1659,6 @@ async function processPurchasePayImpl(
const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], () =>
ws.http.postJson(payAgainUrl, reqBody),
);
- // Hide transient errors.
- if (
- (purchase.payRetryInfo?.retryCounter ?? 0) <= 5 &&
- resp.status >= 500 &&
- resp.status <= 599
- ) {
- const err = makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
- getHttpResponseErrorDetails(resp),
- "/paid failed",
- );
- return {
- type: ConfirmPayResultType.Pending,
- lastError: err,
- };
- }
if (resp.status !== 204) {
throw TalerError.fromDetail(
TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
@@ -1793,10 +1675,7 @@ async function processPurchasePayImpl(
proposalId: purchase.proposalId,
});
- return {
- type: ConfirmPayResultType.Done,
- contractTerms: purchase.download.contractTermsRaw,
- };
+ return OperationAttemptResult.finishedEmpty();
}
export async function refuseProposal(
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts
index 5cf3afd4d..7d5a5bfd9 100644
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ b/packages/taler-wallet-core/src/operations/pending.ts
@@ -36,40 +36,50 @@ import {
import { AbsoluteTime } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import { GetReadOnlyAccess } from "../util/query.js";
+import { RetryTags } from "../util/retries.js";
+import { Wallet } from "../wallet.js";
async function gatherExchangePending(
tx: GetReadOnlyAccess<{
exchanges: typeof WalletStoresV1.exchanges;
exchangeDetails: typeof WalletStoresV1.exchangeDetails;
+ operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.exchanges.iter().forEachAsync(async (e) => {
+ await tx.exchanges.iter().forEachAsync(async (exch) => {
+ const opTag = RetryTags.forExchangeUpdate(exch);
+ let opr = await tx.operationRetries.get(opTag);
resp.pendingOperations.push({
type: PendingTaskType.ExchangeUpdate,
+ id: opTag,
givesLifeness: false,
timestampDue:
- e.retryInfo?.nextRetry ?? AbsoluteTime.fromTimestamp(e.nextUpdate),
- exchangeBaseUrl: e.baseUrl,
- lastError: e.lastError,
+ opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate),
+ exchangeBaseUrl: exch.baseUrl,
+ lastError: opr?.lastError,
});
// We only schedule a check for auto-refresh if the exchange update
// was successful.
- if (!e.lastError) {
+ if (!opr?.lastError) {
resp.pendingOperations.push({
type: PendingTaskType.ExchangeCheckRefresh,
- timestampDue: AbsoluteTime.fromTimestamp(e.nextRefreshCheck),
+ id: RetryTags.forExchangeCheckRefresh(exch),
+ timestampDue: AbsoluteTime.fromTimestamp(exch.nextRefreshCheck),
givesLifeness: false,
- exchangeBaseUrl: e.baseUrl,
+ exchangeBaseUrl: exch.baseUrl,
});
}
});
}
async function gatherRefreshPending(
- tx: GetReadOnlyAccess<{ refreshGroups: typeof WalletStoresV1.refreshGroups }>,
+ tx: GetReadOnlyAccess<{
+ refreshGroups: typeof WalletStoresV1.refreshGroups;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
@@ -83,15 +93,19 @@ async function gatherRefreshPending(
if (r.frozen) {
return;
}
+ const opId = RetryTags.forRefresh(r);
+ const retryRecord = await tx.operationRetries.get(opId);
+
resp.pendingOperations.push({
type: PendingTaskType.Refresh,
+ id: opId,
givesLifeness: true,
- timestampDue: r.retryInfo.nextRetry,
+ timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
refreshGroupId: r.refreshGroupId,
finishedPerCoin: r.statusPerCoin.map(
(x) => x === RefreshCoinStatus.Finished,
),
- retryInfo: r.retryInfo,
+ retryInfo: retryRecord?.retryInfo,
});
}
}
@@ -100,6 +114,7 @@ async function gatherWithdrawalPending(
tx: GetReadOnlyAccess<{
withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
planchets: typeof WalletStoresV1.planchets;
+ operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
@@ -111,54 +126,68 @@ async function gatherWithdrawalPending(
if (wsr.timestampFinish) {
return;
}
- let numCoinsWithdrawn = 0;
- let numCoinsTotal = 0;
- await tx.planchets.indexes.byGroup
- .iter(wsr.withdrawalGroupId)
- .forEach((x) => {
- numCoinsTotal++;
- if (x.withdrawalDone) {
- numCoinsWithdrawn++;
- }
- });
+ const opTag = RetryTags.forWithdrawal(wsr);
+ let opr = await tx.operationRetries.get(opTag);
+ const now = AbsoluteTime.now();
+ if (!opr) {
+ opr = {
+ id: opTag,
+ retryInfo: {
+ firstTry: now,
+ nextRetry: now,
+ retryCounter: 0,
+ },
+ };
+ }
resp.pendingOperations.push({
type: PendingTaskType.Withdraw,
+ id: opTag,
givesLifeness: true,
- timestampDue: wsr.retryInfo?.nextRetry ?? AbsoluteTime.now(),
+ timestampDue: opr.retryInfo?.nextRetry ?? AbsoluteTime.now(),
withdrawalGroupId: wsr.withdrawalGroupId,
- lastError: wsr.lastError,
- retryInfo: wsr.retryInfo,
+ lastError: opr.lastError,
+ retryInfo: opr.retryInfo,
});
}
}
async function gatherProposalPending(
- tx: GetReadOnlyAccess<{ proposals: typeof WalletStoresV1.proposals }>,
+ tx: GetReadOnlyAccess<{
+ proposals: typeof WalletStoresV1.proposals;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.proposals.iter().forEach((proposal) => {
+ await tx.proposals.iter().forEachAsync(async (proposal) => {
if (proposal.proposalStatus == ProposalStatus.Proposed) {
// Nothing to do, user needs to choose.
} else if (proposal.proposalStatus == ProposalStatus.Downloading) {
- const timestampDue = proposal.retryInfo?.nextRetry ?? AbsoluteTime.now();
+ const opId = RetryTags.forProposalClaim(proposal);
+ const retryRecord = await tx.operationRetries.get(opId);
+ const timestampDue =
+ retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.ProposalDownload,
+ id: opId,
givesLifeness: true,
timestampDue,
merchantBaseUrl: proposal.merchantBaseUrl,
orderId: proposal.orderId,
proposalId: proposal.proposalId,
proposalTimestamp: proposal.timestamp,
- lastError: proposal.lastError,
- retryInfo: proposal.retryInfo,
+ lastError: retryRecord?.lastError,
+ retryInfo: retryRecord?.retryInfo,
});
}
});
}
async function gatherDepositPending(
- tx: GetReadOnlyAccess<{ depositGroups: typeof WalletStoresV1.depositGroups }>,
+ tx: GetReadOnlyAccess<{
+ depositGroups: typeof WalletStoresV1.depositGroups;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
@@ -169,32 +198,42 @@ async function gatherDepositPending(
if (dg.timestampFinished) {
return;
}
- const timestampDue = dg.retryInfo?.nextRetry ?? AbsoluteTime.now();
+ const opId = RetryTags.forDeposit(dg);
+ const retryRecord = await tx.operationRetries.get(opId);
+ const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.Deposit,
+ id: opId,
givesLifeness: true,
timestampDue,
depositGroupId: dg.depositGroupId,
- lastError: dg.lastError,
- retryInfo: dg.retryInfo,
+ lastError: retryRecord?.lastError,
+ retryInfo: retryRecord?.retryInfo,
});
}
}
async function gatherTipPending(
- tx: GetReadOnlyAccess<{ tips: typeof WalletStoresV1.tips }>,
+ tx: GetReadOnlyAccess<{
+ tips: typeof WalletStoresV1.tips;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.tips.iter().forEach((tip) => {
+ await tx.tips.iter().forEachAsync(async (tip) => {
+ // FIXME: The tip record needs a proper status field!
if (tip.pickedUpTimestamp) {
return;
}
+ const opId = RetryTags.forTipPickup(tip);
+ const retryRecord = await tx.operationRetries.get(opId);
if (tip.acceptedTimestamp) {
resp.pendingOperations.push({
type: PendingTaskType.TipPickup,
+ id: opId,
givesLifeness: true,
- timestampDue: tip.retryInfo.nextRetry,
+ timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
merchantBaseUrl: tip.merchantBaseUrl,
tipId: tip.walletTipId,
merchantTipId: tip.merchantTipId,
@@ -204,56 +243,77 @@ async function gatherTipPending(
}
async function gatherPurchasePending(
- tx: GetReadOnlyAccess<{ purchases: typeof WalletStoresV1.purchases }>,
+ tx: GetReadOnlyAccess<{
+ purchases: typeof WalletStoresV1.purchases;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.purchases.iter().forEach((pr) => {
+ // FIXME: Only iter purchases with some "active" flag!
+ await tx.purchases.iter().forEachAsync(async (pr) => {
if (
pr.paymentSubmitPending &&
pr.abortStatus === AbortStatus.None &&
!pr.payFrozen
) {
- const timestampDue = pr.payRetryInfo?.nextRetry ?? AbsoluteTime.now();
+ const payOpId = RetryTags.forPay(pr);
+ const payRetryRecord = await tx.operationRetries.get(payOpId);
+
+ const timestampDue =
+ payRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.Pay,
+ id: payOpId,
givesLifeness: true,
timestampDue,
isReplay: false,
proposalId: pr.proposalId,
- retryInfo: pr.payRetryInfo,
- lastError: pr.lastPayError,
+ retryInfo: payRetryRecord?.retryInfo,
+ lastError: payRetryRecord?.lastError,
});
}
if (pr.refundQueryRequested) {
+ const refundQueryOpId = RetryTags.forRefundQuery(pr);
+ const refundQueryRetryRecord = await tx.operationRetries.get(
+ refundQueryOpId,
+ );
resp.pendingOperations.push({
type: PendingTaskType.RefundQuery,
+ id: refundQueryOpId,
givesLifeness: true,
- timestampDue: pr.refundStatusRetryInfo.nextRetry,
+ timestampDue:
+ refundQueryRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
proposalId: pr.proposalId,
- retryInfo: pr.refundStatusRetryInfo,
- lastError: pr.lastRefundStatusError,
+ retryInfo: refundQueryRetryRecord?.retryInfo,
+ lastError: refundQueryRetryRecord?.lastError,
});
}
});
}
async function gatherRecoupPending(
- tx: GetReadOnlyAccess<{ recoupGroups: typeof WalletStoresV1.recoupGroups }>,
+ tx: GetReadOnlyAccess<{
+ recoupGroups: typeof WalletStoresV1.recoupGroups;
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.recoupGroups.iter().forEach((rg) => {
+ await tx.recoupGroups.iter().forEachAsync(async (rg) => {
if (rg.timestampFinished) {
return;
}
+ const opId = RetryTags.forRecoup(rg);
+ const retryRecord = await tx.operationRetries.get(opId);
resp.pendingOperations.push({
type: PendingTaskType.Recoup,
+ id: opId,
givesLifeness: true,
- timestampDue: rg.retryInfo.nextRetry,
+ timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
recoupGroupId: rg.recoupGroupId,
- retryInfo: rg.retryInfo,
- lastError: rg.lastError,
+ retryInfo: retryRecord?.retryInfo,
+ lastError: retryRecord?.lastError,
});
});
}
@@ -261,14 +321,18 @@ async function gatherRecoupPending(
async function gatherBackupPending(
tx: GetReadOnlyAccess<{
backupProviders: typeof WalletStoresV1.backupProviders;
+ operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
- await tx.backupProviders.iter().forEach((bp) => {
+ await tx.backupProviders.iter().forEachAsync(async (bp) => {
+ const opId = RetryTags.forBackup(bp);
+ const retryRecord = await tx.operationRetries.get(opId);
if (bp.state.tag === BackupProviderStateTag.Ready) {
resp.pendingOperations.push({
type: PendingTaskType.Backup,
+ id: opId,
givesLifeness: false,
timestampDue: AbsoluteTime.fromTimestamp(bp.state.nextBackupTimestamp),
backupProviderBaseUrl: bp.baseUrl,
@@ -277,11 +341,12 @@ async function gatherBackupPending(
} else if (bp.state.tag === BackupProviderStateTag.Retrying) {
resp.pendingOperations.push({
type: PendingTaskType.Backup,
+ id: opId,
givesLifeness: false,
- timestampDue: bp.state.retryInfo.nextRetry,
+ timestampDue: retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now(),
backupProviderBaseUrl: bp.baseUrl,
- retryInfo: bp.state.retryInfo,
- lastError: bp.state.lastError,
+ retryInfo: retryRecord?.retryInfo,
+ lastError: retryRecord?.lastError,
});
}
});
@@ -305,6 +370,7 @@ export async function getPendingOperations(
planchets: x.planchets,
depositGroups: x.depositGroups,
recoupGroups: x.recoupGroups,
+ operationRetries: x.operationRetries,
}))
.runReadWrite(async (tx) => {
const resp: PendingOperationsResponse = {
diff --git a/packages/taler-wallet-core/src/operations/recoup.ts b/packages/taler-wallet-core/src/operations/recoup.ts
index 283707947..387c23f41 100644
--- a/packages/taler-wallet-core/src/operations/recoup.ts
+++ b/packages/taler-wallet-core/src/operations/recoup.ts
@@ -42,6 +42,8 @@ import {
CoinRecord,
CoinSourceType,
CoinStatus,
+ OperationAttemptResult,
+ OperationAttemptResultType,
RecoupGroupRecord,
RefreshCoinSource,
ReserveRecordStatus,
@@ -52,64 +54,13 @@ import {
import { InternalWalletState } from "../internal-wallet-state.js";
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
import { GetReadWriteAccess } from "../util/query.js";
-import { RetryInfo } from "../util/retries.js";
+import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js";
import { guardOperationException } from "./common.js";
import { createRefreshGroup, processRefreshGroup } from "./refresh.js";
import { internalCreateWithdrawalGroup } from "./withdraw.js";
const logger = new Logger("operations/recoup.ts");
-async function setupRecoupRetry(
- ws: InternalWalletState,
- recoupGroupId: string,
- options: {
- reset: boolean;
- },
-): Promise<void> {
- await ws.db
- .mktx((x) => ({
- recoupGroups: x.recoupGroups,
- }))
- .runReadWrite(async (tx) => {
- const r = await tx.recoupGroups.get(recoupGroupId);
- if (!r) {
- return;
- }
- if (options.reset) {
- r.retryInfo = RetryInfo.reset();
- } else {
- r.retryInfo = RetryInfo.increment(r.retryInfo);
- }
- delete r.lastError;
- await tx.recoupGroups.put(r);
- });
-}
-
-async function reportRecoupError(
- ws: InternalWalletState,
- recoupGroupId: string,
- err: TalerErrorDetail,
-): Promise<void> {
- await ws.db
- .mktx((x) => ({
- recoupGroups: x.recoupGroups,
- }))
- .runReadWrite(async (tx) => {
- const r = await tx.recoupGroups.get(recoupGroupId);
- if (!r) {
- return;
- }
- if (!r.retryInfo) {
- logger.error(
- "reporting error for inactive recoup group (no retry info)",
- );
- }
- r.lastError = err;
- await tx.recoupGroups.put(r);
- });
- ws.notify({ type: NotificationType.RecoupOperationError, error: err });
-}
-
/**
* Store a recoup group record in the database after marking
* a coin in the group as finished.
@@ -353,25 +304,20 @@ export async function processRecoupGroup(
forceNow?: boolean;
} = {},
): Promise<void> {
- await ws.memoProcessRecoup.memo(recoupGroupId, async () => {
- const onOpErr = (e: TalerErrorDetail): Promise<void> =>
- reportRecoupError(ws, recoupGroupId, e);
- return await guardOperationException(
- async () => await processRecoupGroupImpl(ws, recoupGroupId, options),
- onOpErr,
- );
- });
+ await runOperationHandlerForResult(
+ await processRecoupGroupHandler(ws, recoupGroupId, options),
+ );
+ return;
}
-async function processRecoupGroupImpl(
+export async function processRecoupGroupHandler(
ws: InternalWalletState,
recoupGroupId: string,
options: {
forceNow?: boolean;
} = {},
-): Promise<void> {
+): Promise<OperationAttemptResult> {
const forceNow = options.forceNow ?? false;
- await setupRecoupRetry(ws, recoupGroupId, { reset: forceNow });
let recoupGroup = await ws.db
.mktx((x) => ({
recoupGroups: x.recoupGroups,
@@ -380,11 +326,11 @@ async function processRecoupGroupImpl(
return tx.recoupGroups.get(recoupGroupId);
});
if (!recoupGroup) {
- return;
+ return OperationAttemptResult.finishedEmpty();
}
if (recoupGroup.timestampFinished) {
logger.trace("recoup group finished");
- return;
+ return OperationAttemptResult.finishedEmpty();
}
const ps = recoupGroup.coinPubs.map(async (x, i) => {
try {
@@ -404,12 +350,12 @@ async function processRecoupGroupImpl(
return tx.recoupGroups.get(recoupGroupId);
});
if (!recoupGroup) {
- return;
+ return OperationAttemptResult.finishedEmpty();
}
for (const b of recoupGroup.recoupFinishedPerCoin) {
if (!b) {
- return;
+ return OperationAttemptResult.finishedEmpty();
}
}
@@ -480,8 +426,6 @@ async function processRecoupGroupImpl(
return;
}
rg2.timestampFinished = TalerProtocolTimestamp.now();
- rg2.retryInfo = RetryInfo.reset();
- rg2.lastError = undefined;
if (rg2.scheduleRefreshCoins.length > 0) {
const refreshGroupId = await createRefreshGroup(
ws,
@@ -495,6 +439,7 @@ async function processRecoupGroupImpl(
}
await tx.recoupGroups.put(rg2);
});
+ return OperationAttemptResult.finishedEmpty();
}
export async function createRecoupGroup(
@@ -514,10 +459,8 @@ export async function createRecoupGroup(
recoupGroupId,
exchangeBaseUrl: exchangeBaseUrl,
coinPubs: coinPubs,
- lastError: undefined,
timestampFinished: undefined,
timestampStarted: TalerProtocolTimestamp.now(),
- retryInfo: RetryInfo.reset(),
recoupFinishedPerCoin: coinPubs.map(() => false),
// Will be populated later
oldAmountPerCoin: [],
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts
index 64a734bb3..64b85a040 100644
--- a/packages/taler-wallet-core/src/operations/refresh.ts
+++ b/packages/taler-wallet-core/src/operations/refresh.ts
@@ -57,6 +57,8 @@ import {
CoinSourceType,
CoinStatus,
DenominationRecord,
+ OperationAttemptResult,
+ OperationAttemptResultType,
OperationStatus,
RefreshCoinStatus,
RefreshGroupRecord,
@@ -74,7 +76,7 @@ import {
} from "../util/http.js";
import { checkDbInvariant } from "../util/invariants.js";
import { GetReadWriteAccess } from "../util/query.js";
-import { RetryInfo } from "../util/retries.js";
+import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js";
import { guardOperationException } from "./common.js";
import { updateExchangeFromUrl } from "./exchanges.js";
import {
@@ -133,11 +135,9 @@ function updateGroupStatus(rg: RefreshGroupRecord): void {
if (allDone) {
if (anyFrozen) {
rg.frozen = true;
- rg.retryInfo = RetryInfo.reset();
} else {
rg.timestampFinished = AbsoluteTime.toTimestamp(AbsoluteTime.now());
rg.operationStatus = OperationStatus.Finished;
- rg.retryInfo = RetryInfo.reset();
}
}
}
@@ -730,89 +730,14 @@ async function refreshReveal(
});
}
-async function setupRefreshRetry(
- ws: InternalWalletState,
- refreshGroupId: string,
- options: {
- reset: boolean;
- },
-): Promise<void> {
- await ws.db
- .mktx((x) => ({
- refreshGroups: x.refreshGroups,
- }))
- .runReadWrite(async (tx) => {
- const r = await tx.refreshGroups.get(refreshGroupId);
- if (!r) {
- return;
- }
- if (options.reset) {
- r.retryInfo = RetryInfo.reset();
- } else {
- r.retryInfo = RetryInfo.increment(r.retryInfo);
- }
- delete r.lastError;
- await tx.refreshGroups.put(r);
- });
-}
-
-async function reportRefreshError(
- ws: InternalWalletState,
- refreshGroupId: string,
- err: TalerErrorDetail | undefined,
-): Promise<void> {
- await ws.db
- .mktx((x) => ({
- refreshGroups: x.refreshGroups,
- }))
- .runReadWrite(async (tx) => {
- const r = await tx.refreshGroups.get(refreshGroupId);
- if (!r) {
- return;
- }
- if (!r.retryInfo) {
- logger.error(
- "reported error for inactive refresh group (no retry info)",
- );
- }
- r.lastError = err;
- await tx.refreshGroups.put(r);
- });
- if (err) {
- ws.notify({ type: NotificationType.RefreshOperationError, error: err });
- }
-}
-
-/**
- * Actually process a refresh group that has been created.
- */
export async function processRefreshGroup(
ws: InternalWalletState,
refreshGroupId: string,
options: {
forceNow?: boolean;
} = {},
-): Promise<void> {
- await ws.memoProcessRefresh.memo(refreshGroupId, async () => {
- const onOpErr = (e: TalerErrorDetail): Promise<void> =>
- reportRefreshError(ws, refreshGroupId, e);
- return await guardOperationException(
- async () => await processRefreshGroupImpl(ws, refreshGroupId, options),
- onOpErr,
- );
- });
-}
-
-async function processRefreshGroupImpl(
- ws: InternalWalletState,
- refreshGroupId: string,
- options: {
- forceNow?: boolean;
- } = {},
-): Promise<void> {
- const forceNow = options.forceNow ?? false;
+): Promise<OperationAttemptResult> {
logger.info(`processing refresh group ${refreshGroupId}`);
- await setupRefreshRetry(ws, refreshGroupId, { reset: forceNow });
const refreshGroup = await ws.db
.mktx((x) => ({
@@ -822,10 +747,16 @@ async function processRefreshGroupImpl(
return tx.refreshGroups.get(refreshGroupId);
});
if (!refreshGroup) {
- return;
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
if (refreshGroup.timestampFinished) {
- return;
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
// Process refresh sessions of the group in parallel.
logger.trace("processing refresh sessions for old coins");
@@ -855,6 +786,10 @@ async function processRefreshGroupImpl(
logger.warn("process refresh sessions got exception");
logger.warn(`exception: ${e}`);
}
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
async function processRefreshSession(
@@ -975,13 +910,11 @@ export async function createRefreshGroup(
operationStatus: OperationStatus.Pending,
timestampFinished: undefined,
statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending),
- lastError: undefined,
lastErrorPerCoin: {},
oldCoinPubs: oldCoinPubs.map((x) => x.coinPub),
reason,
refreshGroupId,
refreshSessionPerCoin: oldCoinPubs.map(() => undefined),
- retryInfo: RetryInfo.reset(),
inputPerCoin,
estimatedOutputPerCoin,
timestampCreated: TalerProtocolTimestamp.now(),
@@ -1034,7 +967,7 @@ function getAutoRefreshExecuteThreshold(d: DenominationRecord): AbsoluteTime {
export async function autoRefresh(
ws: InternalWalletState,
exchangeBaseUrl: string,
-): Promise<void> {
+): Promise<OperationAttemptResult> {
logger.info(`doing auto-refresh check for '${exchangeBaseUrl}'`);
// We must make sure that the exchange is up-to-date so that
@@ -1109,4 +1042,5 @@ export async function autoRefresh(
exchange.nextRefreshCheck = AbsoluteTime.toTimestamp(minCheckThreshold);
await tx.exchanges.put(exchange);
});
+ return OperationAttemptResult.finishedEmpty();
}
diff --git a/packages/taler-wallet-core/src/operations/refund.ts b/packages/taler-wallet-core/src/operations/refund.ts
index 8f5c1143a..bc8c185db 100644
--- a/packages/taler-wallet-core/src/operations/refund.ts
+++ b/packages/taler-wallet-core/src/operations/refund.ts
@@ -51,6 +51,7 @@ import {
import {
AbortStatus,
CoinStatus,
+ OperationAttemptResult,
PurchaseRecord,
RefundReason,
RefundState,
@@ -60,8 +61,6 @@ import { InternalWalletState } from "../internal-wallet-state.js";
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
import { checkDbInvariant } from "../util/invariants.js";
import { GetReadWriteAccess } from "../util/query.js";
-import { RetryInfo } from "../util/retries.js";
-import { guardOperationException } from "./common.js";
import { createRefreshGroup, getTotalRefreshCost } from "./refresh.js";
const logger = new Logger("refund.ts");
@@ -120,68 +119,6 @@ export async function prepareRefund(
},
};
}
-/**
- * Retry querying and applying refunds for an order later.
- */
-async function setupPurchaseQueryRefundRetry(
- ws: InternalWalletState,
- proposalId: string,
- options: {
- reset: boolean;
- },
-): Promise<void> {
- await ws.db
- .mktx((x) => ({
- purchases: x.purchases,
- }))
- .runReadWrite(async (tx) => {
- const pr = await tx.purchases.get(proposalId);
- if (!pr) {
- return;
- }
- if (options.reset) {
- pr.refundStatusRetryInfo = RetryInfo.reset();
- } else {
- pr.refundStatusRetryInfo = RetryInfo.increment(
- pr.refundStatusRetryInfo,
- );
- }
- await tx.purchases.put(pr);
- });
-}
-
-/**
- * Report an error that happending when querying for a purchase's refund.
- */
-async function reportPurchaseQueryRefundError(
- ws: InternalWalletState,
- proposalId: string,
- err: TalerErrorDetail,
-): Promise<void> {
- await ws.db
- .mktx((x) => ({
- purchases: x.purchases,
- }))
- .runReadWrite(async (tx) => {
- const pr = await tx.purchases.get(proposalId);
- if (!pr) {
- return;
- }
- if (!pr.refundStatusRetryInfo) {
- logger.error(
- "reported error on an inactive purchase (no refund status retry info)",
- );
- }
- pr.lastRefundStatusError = err;
- await tx.purchases.put(pr);
- });
- if (err) {
- ws.notify({
- type: NotificationType.RefundStatusOperationError,
- error: err,
- });
- }
-}
function getRefundKey(d: MerchantCoinRefundStatus): string {
return `${d.coin_pub}-${d.rtransaction_id}`;
@@ -492,8 +429,6 @@ async function acceptRefunds(
if (queryDone) {
p.timestampLastRefundStatus = now;
- p.lastRefundStatusError = undefined;
- p.refundStatusRetryInfo = RetryInfo.reset();
p.refundQueryRequested = false;
if (p.abortStatus === AbortStatus.AbortRefund) {
p.abortStatus = AbortStatus.AbortFinished;
@@ -502,8 +437,6 @@ async function acceptRefunds(
} else {
// No error, but we need to try again!
p.timestampLastRefundStatus = now;
- p.refundStatusRetryInfo = RetryInfo.increment(p.refundStatusRetryInfo);
- p.lastRefundStatusError = undefined;
logger.trace("refund query not done");
}
@@ -621,8 +554,6 @@ export async function applyRefundFromPurchaseId(
return false;
}
p.refundQueryRequested = true;
- p.lastRefundStatusError = undefined;
- p.refundStatusRetryInfo = RetryInfo.reset();
await tx.purchases.put(p);
return true;
});
@@ -631,7 +562,7 @@ export async function applyRefundFromPurchaseId(
ws.notify({
type: NotificationType.RefundStarted,
});
- await processPurchaseQueryRefundImpl(ws, proposalId, {
+ await processPurchaseQueryRefund(ws, proposalId, {
forceNow: true,
waitForAutoRefund: false,
});
@@ -672,22 +603,6 @@ export async function applyRefundFromPurchaseId(
};
}
-export async function processPurchaseQueryRefund(
- ws: InternalWalletState,
- proposalId: string,
- options: {
- forceNow?: boolean;
- waitForAutoRefund?: boolean;
- } = {},
-): Promise<void> {
- const onOpErr = (e: TalerErrorDetail): Promise<void> =>
- reportPurchaseQueryRefundError(ws, proposalId, e);
- await guardOperationException(
- () => processPurchaseQueryRefundImpl(ws, proposalId, options),
- onOpErr,
- );
-}
-
async function queryAndSaveAwaitingRefund(
ws: InternalWalletState,
purchase: PurchaseRecord,
@@ -742,17 +657,15 @@ async function queryAndSaveAwaitingRefund(
return refundAwaiting;
}
-async function processPurchaseQueryRefundImpl(
+export async function processPurchaseQueryRefund(
ws: InternalWalletState,
proposalId: string,
options: {
forceNow?: boolean;
waitForAutoRefund?: boolean;
} = {},
-): Promise<void> {
- const forceNow = options.forceNow ?? false;
+): Promise<OperationAttemptResult> {
const waitForAutoRefund = options.waitForAutoRefund ?? false;
- await setupPurchaseQueryRefundRetry(ws, proposalId, { reset: forceNow });
const purchase = await ws.db
.mktx((x) => ({
purchases: x.purchases,
@@ -761,11 +674,11 @@ async function processPurchaseQueryRefundImpl(
return tx.purchases.get(proposalId);
});
if (!purchase) {
- return;
+ return OperationAttemptResult.finishedEmpty();
}
if (!purchase.refundQueryRequested) {
- return;
+ return OperationAttemptResult.finishedEmpty();
}
if (purchase.timestampFirstSuccessfulPay) {
@@ -780,7 +693,9 @@ async function processPurchaseQueryRefundImpl(
purchase,
waitForAutoRefund,
);
- if (Amounts.isZero(awaitingAmount)) return;
+ if (Amounts.isZero(awaitingAmount)) {
+ return OperationAttemptResult.finishedEmpty();
+ }
}
const requestUrl = new URL(
@@ -873,6 +788,7 @@ async function processPurchaseQueryRefundImpl(
}
await acceptRefunds(ws, proposalId, refunds, RefundReason.AbortRefund);
}
+ return OperationAttemptResult.finishedEmpty();
}
export async function abortFailedPayWithRefund(
@@ -899,8 +815,6 @@ export async function abortFailedPayWithRefund(
purchase.refundQueryRequested = true;
purchase.paymentSubmitPending = false;
purchase.abortStatus = AbortStatus.AbortRefund;
- purchase.lastPayError = undefined;
- purchase.payRetryInfo = RetryInfo.reset();
await tx.purchases.put(purchase);
});
processPurchaseQueryRefund(ws, proposalId, {
diff --git a/packages/taler-wallet-core/src/operations/tip.ts b/packages/taler-wallet-core/src/operations/tip.ts
index 7148999c5..f19be91b2 100644
--- a/packages/taler-wallet-core/src/operations/tip.ts
+++ b/packages/taler-wallet-core/src/operations/tip.ts
@@ -18,29 +18,45 @@
* Imports.
*/
import {
- Amounts, BlindedDenominationSignature,
- codecForMerchantTipResponseV2, codecForTipPickupGetResponse, DenomKeyType, encodeCrock, getRandomBytes, j2s, Logger, NotificationType, parseTipUri, PrepareTipResult, TalerErrorCode, TalerErrorDetail, TalerProtocolTimestamp, TipPlanchetDetail, URL
+ Amounts,
+ BlindedDenominationSignature,
+ codecForMerchantTipResponseV2,
+ codecForTipPickupGetResponse,
+ DenomKeyType,
+ encodeCrock,
+ getRandomBytes,
+ j2s,
+ Logger,
+ parseTipUri,
+ PrepareTipResult,
+ TalerErrorCode,
+ TalerProtocolTimestamp,
+ TipPlanchetDetail,
+ URL,
} from "@gnu-taler/taler-util";
import { DerivedTipPlanchet } from "../crypto/cryptoTypes.js";
import {
CoinRecord,
CoinSourceType,
- CoinStatus, DenominationRecord, TipRecord
+ CoinStatus,
+ DenominationRecord,
+ OperationAttemptResult,
+ OperationAttemptResultType,
+ TipRecord,
} from "../db.js";
import { makeErrorDetail } from "../errors.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
getHttpResponseErrorDetails,
- readSuccessResponseJsonOrThrow
+ readSuccessResponseJsonOrThrow,
} from "../util/http.js";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
-import {
- RetryInfo
-} from "../util/retries.js";
-import { guardOperationException } from "./common.js";
import { updateExchangeFromUrl } from "./exchanges.js";
import {
- getCandidateWithdrawalDenoms, getExchangeWithdrawalInfo, selectWithdrawalDenominations, updateWithdrawalDenoms
+ getCandidateWithdrawalDenoms,
+ getExchangeWithdrawalInfo,
+ selectWithdrawalDenominations,
+ updateWithdrawalDenoms,
} from "./withdraw.js";
const logger = new Logger("operations/tip.ts");
@@ -114,8 +130,6 @@ export async function prepareTip(
createdTimestamp: TalerProtocolTimestamp.now(),
merchantTipId: res.merchantTipId,
tipAmountEffective: selectedDenoms.totalCoinValue,
- retryInfo: RetryInfo.reset(),
- lastError: undefined,
denomsSel: selectedDenoms,
pickedUpTimestamp: undefined,
secretSeed,
@@ -144,82 +158,13 @@ export async function prepareTip(
return tipStatus;
}
-async function reportTipError(
- ws: InternalWalletState,
- walletTipId: string,
- err: TalerErrorDetail,
-): Promise<void> {
- await ws.db
- .mktx((x) => ({
- tips: x.tips,
- }))
- .runReadWrite(async (tx) => {
- const t = await tx.tips.get(walletTipId);
- if (!t) {
- return;
- }
- if (!t.retryInfo) {
- logger.reportBreak();
- }
- t.lastError = err;
- await tx.tips.put(t);
- });
- if (err) {
- ws.notify({ type: NotificationType.TipOperationError, error: err });
- }
-}
-
-async function setupTipRetry(
- ws: InternalWalletState,
- walletTipId: string,
- options: {
- reset: boolean;
- },
-): Promise<void> {
- await ws.db
- .mktx((x) => ({
- tips: x.tips,
- }))
- .runReadWrite(async (tx) => {
- const t = await tx.tips.get(walletTipId);
- if (!t) {
- return;
- }
- if (options.reset) {
- t.retryInfo = RetryInfo.reset();
- } else {
- t.retryInfo = RetryInfo.increment(t.retryInfo);
- }
- delete t.lastError;
- await tx.tips.put(t);
- });
-}
-
export async function processTip(
ws: InternalWalletState,
- tipId: string,
- options: {
- forceNow?: boolean;
- } = {},
-): Promise<void> {
- const onOpErr = (e: TalerErrorDetail): Promise<void> =>
- reportTipError(ws, tipId, e);
- await guardOperationException(
- () => processTipImpl(ws, tipId, options),
- onOpErr,
- );
-}
-
-async function processTipImpl(
- ws: InternalWalletState,
walletTipId: string,
options: {
forceNow?: boolean;
} = {},
-): Promise<void> {
- const forceNow = options.forceNow ?? false;
- await setupTipRetry(ws, walletTipId, { reset: forceNow });
-
+): Promise<OperationAttemptResult> {
const tipRecord = await ws.db
.mktx((x) => ({
tips: x.tips,
@@ -228,12 +173,18 @@ async function processTipImpl(
return tx.tips.get(walletTipId);
});
if (!tipRecord) {
- return;
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
if (tipRecord.pickedUpTimestamp) {
logger.warn("tip already picked up");
- return;
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
const denomsForWithdraw = tipRecord.denomsSel;
@@ -284,22 +235,21 @@ async function processTipImpl(
logger.trace(`got tip response, status ${merchantResp.status}`);
- // Hide transient errors.
+ // FIXME: Why do we do this?
if (
- tipRecord.retryInfo.retryCounter < 5 &&
- ((merchantResp.status >= 500 && merchantResp.status <= 599) ||
- merchantResp.status === 424)
+ (merchantResp.status >= 500 && merchantResp.status <= 599) ||
+ merchantResp.status === 424
) {
logger.trace(`got transient tip error`);
// FIXME: wrap in another error code that indicates a transient error
- const err = makeErrorDetail(
- TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
- getHttpResponseErrorDetails(merchantResp),
- "tip pickup failed (transient)",
- );
- await reportTipError(ws, tipRecord.walletTipId, err);
- // FIXME: Maybe we want to signal to the caller that the transient error happened?
- return;
+ return {
+ type: OperationAttemptResultType.Error,
+ errorDetail: makeErrorDetail(
+ TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
+ getHttpResponseErrorDetails(merchantResp),
+ "tip pickup failed (transient)",
+ ),
+ };
}
let blindedSigs: BlindedDenominationSignature[] = [];
@@ -344,21 +294,14 @@ async function processTipImpl(
});
if (!isValid) {
- await ws.db
- .mktx((x) => ({ tips: x.tips }))
- .runReadWrite(async (tx) => {
- const tipRecord = await tx.tips.get(walletTipId);
- if (!tipRecord) {
- return;
- }
- tipRecord.lastError = makeErrorDetail(
- TalerErrorCode.WALLET_TIPPING_COIN_SIGNATURE_INVALID,
- {},
- "invalid signature from the exchange (via merchant tip) after unblinding",
- );
- await tx.tips.put(tipRecord);
- });
- return;
+ return {
+ type: OperationAttemptResultType.Error,
+ errorDetail: makeErrorDetail(
+ TalerErrorCode.WALLET_TIPPING_COIN_SIGNATURE_INVALID,
+ {},
+ "invalid signature from the exchange (via merchant tip) after unblinding",
+ ),
+ };
}
newCoinRecords.push({
@@ -395,13 +338,16 @@ async function processTipImpl(
return;
}
tr.pickedUpTimestamp = TalerProtocolTimestamp.now();
- tr.lastError = undefined;
- tr.retryInfo = RetryInfo.reset();
await tx.tips.put(tr);
for (const cr of newCoinRecords) {
await tx.coins.put(cr);
}
});
+
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
export async function acceptTip(
diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts
index 956d565a6..5a96fc6ff 100644
--- a/packages/taler-wallet-core/src/operations/transactions.ts
+++ b/packages/taler-wallet-core/src/operations/transactions.ts
@@ -38,7 +38,6 @@ import { InternalWalletState } from "../internal-wallet-state.js";
import {
AbortStatus,
RefundState,
- ReserveRecordStatus,
WalletRefundItem,
WithdrawalRecordType,
} from "../db.js";
@@ -48,6 +47,7 @@ import { processPurchasePay } from "./pay.js";
import { processRefreshGroup } from "./refresh.js";
import { processTip } from "./tip.js";
import { processWithdrawalGroup } from "./withdraw.js";
+import { RetryTags } from "../util/retries.js";
const logger = new Logger("taler-wallet-core:transactions.ts");
@@ -142,6 +142,7 @@ export async function getTransactions(
tombstones: x.tombstones,
peerPushPaymentInitiations: x.peerPushPaymentInitiations,
peerPullPaymentIncoming: x.peerPullPaymentIncoming,
+ operationRetries: x.operationRetries,
}))
.runReadOnly(async (tx) => {
tx.peerPushPaymentInitiations.iter().forEachAsync(async (pi) => {
@@ -220,6 +221,10 @@ export async function getTransactions(
if (shouldSkipSearch(transactionsRequest, [])) {
return;
}
+
+ const opId = RetryTags.forWithdrawal(wsr);
+ const ort = await tx.operationRetries.get(opId);
+
let withdrawalDetails: WithdrawalDetails;
if (wsr.wgInfo.withdrawalType === WithdrawalRecordType.PeerPullCredit) {
transactions.push({
@@ -242,7 +247,7 @@ export async function getTransactions(
wsr.withdrawalGroupId,
),
frozen: false,
- ...(wsr.lastError ? { error: wsr.lastError } : {}),
+ ...(ort?.lastError ? { error: ort.lastError } : {}),
});
return;
} else if (
@@ -264,7 +269,7 @@ export async function getTransactions(
wsr.withdrawalGroupId,
),
frozen: false,
- ...(wsr.lastError ? { error: wsr.lastError } : {}),
+ ...(ort?.lastError ? { error: ort.lastError } : {}),
});
return;
} else if (
@@ -310,7 +315,7 @@ export async function getTransactions(
wsr.withdrawalGroupId,
),
frozen: false,
- ...(wsr.lastError ? { error: wsr.lastError } : {}),
+ ...(ort?.lastError ? { error: ort.lastError } : {}),
});
});
@@ -319,7 +324,8 @@ export async function getTransactions(
if (shouldSkipCurrency(transactionsRequest, amount.currency)) {
return;
}
-
+ const opId = RetryTags.forDeposit(dg);
+ const retryRecord = await tx.operationRetries.get(opId);
transactions.push({
type: TransactionType.Deposit,
amountRaw: Amounts.stringify(dg.effectiveDepositAmount),
@@ -333,7 +339,7 @@ export async function getTransactions(
dg.depositGroupId,
),
depositGroupId: dg.depositGroupId,
- ...(dg.lastError ? { error: dg.lastError } : {}),
+ ...(retryRecord?.lastError ? { error: retryRecord.lastError } : {}),
});
});
@@ -456,7 +462,15 @@ export async function getTransactions(
});
}
- const err = pr.lastPayError ?? pr.lastRefundStatusError;
+ const payOpId = RetryTags.forPay(pr);
+ const refundQueryOpId = RetryTags.forRefundQuery(pr);
+ const payRetryRecord = await tx.operationRetries.get(payOpId);
+ const refundQueryRetryRecord = await tx.operationRetries.get(
+ refundQueryOpId,
+ );
+
+ const err =
+ refundQueryRetryRecord?.lastError ?? payRetryRecord?.lastError;
transactions.push({
type: TransactionType.Payment,
amountRaw: Amounts.stringify(contractData.amount),
@@ -495,6 +509,8 @@ export async function getTransactions(
if (!tipRecord.acceptedTimestamp) {
return;
}
+ const opId = RetryTags.forTipPickup(tipRecord);
+ const retryRecord = await tx.operationRetries.get(opId);
transactions.push({
type: TransactionType.Tip,
amountEffective: Amounts.stringify(tipRecord.tipAmountEffective),
@@ -507,10 +523,7 @@ export async function getTransactions(
tipRecord.walletTipId,
),
merchantBaseUrl: tipRecord.merchantBaseUrl,
- // merchant: {
- // name: tipRecord.merchantBaseUrl,
- // },
- error: tipRecord.lastError,
+ error: retryRecord?.lastError,
});
});
});
@@ -589,7 +602,11 @@ export async function deleteTransaction(
): Promise<void> {
const [typeStr, ...rest] = transactionId.split(":");
const type = typeStr as TransactionType;
- if (type === TransactionType.Withdrawal || type === TransactionType.PeerPullCredit || type === TransactionType.PeerPushCredit) {
+ if (
+ type === TransactionType.Withdrawal ||
+ type === TransactionType.PeerPullCredit ||
+ type === TransactionType.PeerPushCredit
+ ) {
const withdrawalGroupId = rest[0];
await ws.db
.mktx((x) => ({
@@ -714,7 +731,9 @@ export async function deleteTransaction(
tombstones: x.tombstones,
}))
.runReadWrite(async (tx) => {
- const debit = await tx.peerPullPaymentIncoming.get(peerPullPaymentIncomingId);
+ const debit = await tx.peerPullPaymentIncoming.get(
+ peerPullPaymentIncomingId,
+ );
if (debit) {
await tx.peerPullPaymentIncoming.delete(peerPullPaymentIncomingId);
await tx.tombstones.put({
@@ -737,10 +756,7 @@ export async function deleteTransaction(
if (debit) {
await tx.peerPushPaymentInitiations.delete(pursePub);
await tx.tombstones.put({
- id: makeEventId(
- TombstoneTag.DeletePeerPushDebit,
- pursePub,
- ),
+ id: makeEventId(TombstoneTag.DeletePeerPushDebit, pursePub),
});
}
});
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index b80745316..ce5863b31 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -56,7 +56,6 @@ import {
WithdrawBatchResponse,
WithdrawResponse,
WithdrawUriInfoResponse,
- WithdrawUriResult,
} from "@gnu-taler/taler-util";
import { EddsaKeypair } from "../crypto/cryptoImplementation.js";
import {
@@ -68,9 +67,10 @@ import {
DenomSelectionState,
ExchangeDetailsRecord,
ExchangeRecord,
+ OperationAttemptResult,
+ OperationAttemptResultType,
OperationStatus,
PlanchetRecord,
- ReserveBankInfo,
ReserveRecordStatus,
WalletStoresV1,
WgInfo,
@@ -98,7 +98,6 @@ import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
} from "../versions.js";
-import { guardOperationException } from "./common.js";
import {
getExchangeDetails,
getExchangePaytoUri,
@@ -691,31 +690,12 @@ async function processPlanchetExchangeBatchRequest(
withdrawalGroup.exchangeBaseUrl,
).href;
- try {
- const resp = await ws.http.postJson(reqUrl, d);
- const r = await readSuccessResponseJsonOrThrow(
- resp,
- codecForWithdrawBatchResponse(),
- );
- return r;
- } catch (e) {
- const errDetail = getErrorDetailFromException(e);
- logger.trace("withdrawal batch request failed", e);
- logger.trace(e);
- await ws.db
- .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
- .runReadWrite(async (tx) => {
- let wg = await tx.withdrawalGroups.get(
- withdrawalGroup.withdrawalGroupId,
- );
- if (!wg) {
- return;
- }
- wg.lastError = errDetail;
- await tx.withdrawalGroups.put(wg);
- });
- return;
- }
+ const resp = await ws.http.postJson(reqUrl, d);
+ const r = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForWithdrawBatchResponse(),
+ );
+ return r;
}
async function processPlanchetVerifyAndStoreCoin(
@@ -951,50 +931,6 @@ export async function updateWithdrawalDenoms(
}
}
-async function setupWithdrawalRetry(
- ws: InternalWalletState,
- withdrawalGroupId: string,
- options: {
- reset: boolean;
- },
-): Promise<void> {
- await ws.db
- .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
- .runReadWrite(async (tx) => {
- const wsr = await tx.withdrawalGroups.get(withdrawalGroupId);
- if (!wsr) {
- return;
- }
- if (options.reset) {
- wsr.retryInfo = RetryInfo.reset();
- } else {
- wsr.retryInfo = RetryInfo.increment(wsr.retryInfo);
- }
- await tx.withdrawalGroups.put(wsr);
- });
-}
-
-async function reportWithdrawalError(
- ws: InternalWalletState,
- withdrawalGroupId: string,
- err: TalerErrorDetail,
-): Promise<void> {
- await ws.db
- .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
- .runReadWrite(async (tx) => {
- const wsr = await tx.withdrawalGroups.get(withdrawalGroupId);
- if (!wsr) {
- return;
- }
- if (!wsr.retryInfo) {
- logger.reportBreak();
- }
- wsr.lastError = err;
- await tx.withdrawalGroups.put(wsr);
- });
- ws.notify({ type: NotificationType.WithdrawOperationError, error: err });
-}
-
/**
* Update the information about a reserve that is stored in the wallet
* by querying the reserve's exchange.
@@ -1071,28 +1007,9 @@ async function queryReserve(
export async function processWithdrawalGroup(
ws: InternalWalletState,
withdrawalGroupId: string,
- options: {
- forceNow?: boolean;
- } = {},
-): Promise<void> {
- const onOpErr = (e: TalerErrorDetail): Promise<void> =>
- reportWithdrawalError(ws, withdrawalGroupId, e);
- await guardOperationException(
- () => processWithdrawGroupImpl(ws, withdrawalGroupId, options),
- onOpErr,
- );
-}
-
-async function processWithdrawGroupImpl(
- ws: InternalWalletState,
- withdrawalGroupId: string,
- options: {
- forceNow?: boolean;
- } = {},
-): Promise<void> {
- const forceNow = options.forceNow ?? false;
- logger.trace("processing withdraw group", withdrawalGroupId);
- await setupWithdrawalRetry(ws, withdrawalGroupId, { reset: forceNow });
+ options: {} = {},
+): Promise<OperationAttemptResult> {
+ logger.trace("processing withdrawal group", withdrawalGroupId);
const withdrawalGroup = await ws.db
.mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
.runReadOnly(async (tx) => {
@@ -1106,24 +1023,44 @@ async function processWithdrawGroupImpl(
switch (withdrawalGroup.reserveStatus) {
case ReserveRecordStatus.RegisteringBank:
await processReserveBankStatus(ws, withdrawalGroupId);
- return await processWithdrawGroupImpl(ws, withdrawalGroupId, {
+ return await processWithdrawalGroup(ws, withdrawalGroupId, {
forceNow: true,
});
case ReserveRecordStatus.QueryingStatus: {
const res = await queryReserve(ws, withdrawalGroupId);
if (res.ready) {
- return await processWithdrawGroupImpl(ws, withdrawalGroupId, {
+ return await processWithdrawalGroup(ws, withdrawalGroupId, {
forceNow: true,
});
}
- return;
+ return {
+ type: OperationAttemptResultType.Pending,
+ result: undefined,
+ };
+ }
+ case ReserveRecordStatus.WaitConfirmBank: {
+ const res = await processReserveBankStatus(ws, withdrawalGroupId);
+ switch (res.status) {
+ case BankStatusResultCode.Aborted:
+ case BankStatusResultCode.Done:
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
+ case BankStatusResultCode.Waiting: {
+ return {
+ type: OperationAttemptResultType.Pending,
+ result: undefined,
+ };
+ }
+ }
}
- case ReserveRecordStatus.WaitConfirmBank:
- await processReserveBankStatus(ws, withdrawalGroupId);
- return;
case ReserveRecordStatus.BankAborted:
// FIXME
- return;
+ return {
+ type: OperationAttemptResultType.Pending,
+ result: undefined,
+ };
case ReserveRecordStatus.Dormant:
// We can try to withdraw, nothing needs to be done with the reserve.
break;
@@ -1150,11 +1087,12 @@ async function processWithdrawGroupImpl(
return;
}
wg.operationStatus = OperationStatus.Finished;
- delete wg.lastError;
- delete wg.retryInfo;
await tx.withdrawalGroups.put(wg);
});
- return;
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms
@@ -1175,7 +1113,7 @@ async function processWithdrawGroupImpl(
if (ws.batchWithdrawal) {
const resp = await processPlanchetExchangeBatchRequest(ws, withdrawalGroup);
if (!resp) {
- return;
+ throw Error("unable to do batch withdrawal");
}
for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
work.push(
@@ -1236,8 +1174,6 @@ async function processWithdrawGroupImpl(
finishedForFirstTime = true;
wg.timestampFinish = TalerProtocolTimestamp.now();
wg.operationStatus = OperationStatus.Finished;
- delete wg.lastError;
- wg.retryInfo = RetryInfo.reset();
}
await tx.withdrawalGroups.put(wg);
@@ -1259,6 +1195,11 @@ async function processWithdrawGroupImpl(
reservePub: withdrawalGroup.reservePub,
});
}
+
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
const AGE_MASK_GROUPS = "8:10:12:14:16:18".split(":").map(n => parseInt(n, 10))
@@ -1529,10 +1470,7 @@ async function getWithdrawalGroupRecordTx(
}
export function getReserveRequestTimeout(r: WithdrawalGroupRecord): Duration {
- return Duration.max(
- { d_ms: 60000 },
- Duration.min({ d_ms: 5000 }, RetryInfo.getDuration(r.retryInfo)),
- );
+ return { d_ms: 60000 };
}
export function getBankStatusUrl(talerWithdrawUri: string): string {
@@ -1611,17 +1549,25 @@ async function registerReserveWithBank(
);
r.reserveStatus = ReserveRecordStatus.WaitConfirmBank;
r.operationStatus = OperationStatus.Pending;
- r.retryInfo = RetryInfo.reset();
await tx.withdrawalGroups.put(r);
});
ws.notify({ type: NotificationType.ReserveRegisteredWithBank });
- return processReserveBankStatus(ws, withdrawalGroupId);
+}
+
+enum BankStatusResultCode {
+ Done = "done",
+ Waiting = "waiting",
+ Aborted = "aborted",
+}
+
+interface BankStatusResult {
+ status: BankStatusResultCode;
}
async function processReserveBankStatus(
ws: InternalWalletState,
withdrawalGroupId: string,
-): Promise<void> {
+): Promise<BankStatusResult> {
const withdrawalGroup = await getWithdrawalGroupRecordTx(ws.db, {
withdrawalGroupId,
});
@@ -1630,17 +1576,21 @@ async function processReserveBankStatus(
case ReserveRecordStatus.RegisteringBank:
break;
default:
- return;
+ return {
+ status: BankStatusResultCode.Done,
+ };
}
if (
withdrawalGroup.wgInfo.withdrawalType != WithdrawalRecordType.BankIntegrated
) {
- throw Error();
+ throw Error("wrong withdrawal record type");
}
const bankInfo = withdrawalGroup.wgInfo.bankInfo;
if (!bankInfo) {
- return;
+ return {
+ status: BankStatusResultCode.Done,
+ };
}
const bankStatusUrl = getBankStatusUrl(bankInfo.talerWithdrawUri);
@@ -1678,10 +1628,11 @@ async function processReserveBankStatus(
r.wgInfo.bankInfo.timestampBankConfirmed = now;
r.reserveStatus = ReserveRecordStatus.BankAborted;
r.operationStatus = OperationStatus.Finished;
- r.retryInfo = RetryInfo.reset();
await tx.withdrawalGroups.put(r);
});
- return;
+ return {
+ status: BankStatusResultCode.Aborted,
+ };
}
// Bank still needs to know our reserve info
@@ -1722,15 +1673,17 @@ async function processReserveBankStatus(
r.wgInfo.bankInfo.timestampBankConfirmed = now;
r.reserveStatus = ReserveRecordStatus.QueryingStatus;
r.operationStatus = OperationStatus.Pending;
- r.retryInfo = RetryInfo.reset();
} else {
logger.info("withdrawal: transfer not yet confirmed by bank");
r.wgInfo.bankInfo.confirmUrl = status.confirm_transfer_url;
r.senderWire = status.sender_wire;
- r.retryInfo = RetryInfo.increment(r.retryInfo);
}
await tx.withdrawalGroups.put(r);
});
+
+ return {
+ status: BankStatusResultCode.Done,
+ };
}
export async function internalCreateWithdrawalGroup(
@@ -1775,14 +1728,12 @@ export async function internalCreateWithdrawalGroup(
exchangeBaseUrl: canonExchange,
instructedAmount: amount,
timestampStart: now,
- lastError: undefined,
operationStatus: OperationStatus.Pending,
rawWithdrawalAmount: initialDenomSel.totalWithdrawCost,
secretSeed,
reservePriv: reserveKeyPair.priv,
reservePub: reserveKeyPair.pub,
reserveStatus: args.reserveStatus,
- retryInfo: RetryInfo.reset(),
withdrawalGroupId,
restrictAge: args.restrictAge,
senderWire: undefined,
diff --git a/packages/taler-wallet-core/src/pending-types.ts b/packages/taler-wallet-core/src/pending-types.ts
index 39df9d0cb..61c7136df 100644
--- a/packages/taler-wallet-core/src/pending-types.ts
+++ b/packages/taler-wallet-core/src/pending-types.ts
@@ -30,14 +30,12 @@ import {
AbsoluteTime,
TalerProtocolTimestamp,
} from "@gnu-taler/taler-util";
-import { ReserveRecordStatus } from "./db.js";
import { RetryInfo } from "./util/retries.js";
export enum PendingTaskType {
ExchangeUpdate = "exchange-update",
ExchangeCheckRefresh = "exchange-check-refresh",
Pay = "pay",
- ProposalChoice = "proposal-choice",
ProposalDownload = "proposal-download",
Refresh = "refresh",
Recoup = "recoup",
@@ -109,7 +107,7 @@ export interface PendingRefreshTask {
lastError?: TalerErrorDetail;
refreshGroupId: string;
finishedPerCoin: boolean[];
- retryInfo: RetryInfo;
+ retryInfo?: RetryInfo;
}
/**
@@ -126,17 +124,6 @@ export interface PendingProposalDownloadTask {
}
/**
- * User must choose whether to accept or reject the merchant's
- * proposed contract terms.
- */
-export interface PendingProposalChoiceOperation {
- type: PendingTaskType.ProposalChoice;
- merchantBaseUrl: string;
- proposalTimestamp: AbsoluteTime;
- proposalId: string;
-}
-
-/**
* The wallet is picking up a tip that the user has accepted.
*/
export interface PendingTipPickupTask {
@@ -165,14 +152,14 @@ export interface PendingPayTask {
export interface PendingRefundQueryTask {
type: PendingTaskType.RefundQuery;
proposalId: string;
- retryInfo: RetryInfo;
+ retryInfo?: RetryInfo;
lastError: TalerErrorDetail | undefined;
}
export interface PendingRecoupTask {
type: PendingTaskType.Recoup;
recoupGroupId: string;
- retryInfo: RetryInfo;
+ retryInfo?: RetryInfo;
lastError: TalerErrorDetail | undefined;
}
@@ -206,6 +193,11 @@ export interface PendingTaskInfoCommon {
type: PendingTaskType;
/**
+ * Unique identifier for the pending task.
+ */
+ id: string;
+
+ /**
* Set to true if the operation indicates that something is really in progress,
* as opposed to some regular scheduled operation that can be tried later.
*/
diff --git a/packages/taler-wallet-core/src/util/query.ts b/packages/taler-wallet-core/src/util/query.ts
index e954e5c78..65b67eff2 100644
--- a/packages/taler-wallet-core/src/util/query.ts
+++ b/packages/taler-wallet-core/src/util/query.ts
@@ -152,6 +152,19 @@ class ResultStream<T> {
return arr;
}
+ async mapAsync<R>(f: (x: T) => Promise<R>): Promise<R[]> {
+ const arr: R[] = [];
+ while (true) {
+ const x = await this.next();
+ if (x.hasValue) {
+ arr.push(await f(x.value));
+ } else {
+ break;
+ }
+ }
+ return arr;
+ }
+
async forEachAsync(f: (x: T) => Promise<void>): Promise<void> {
while (true) {
const x = await this.next();
@@ -572,6 +585,26 @@ function makeWriteContext(
return ctx;
}
+const storeList = [
+ { name: "foo" as const, value: 1 as const },
+ { name: "bar" as const, value: 2 as const },
+];
+// => { foo: { value: 1}, bar: {value: 2} }
+
+type StoreList = typeof storeList;
+
+type StoreNames = StoreList[number] extends { name: infer I } ? I : never;
+
+type H = StoreList[number] & { name: "foo"};
+
+type Cleanup<V> = V extends { name: infer N, value: infer X} ? {name: N, value: X} : never;
+
+type G = {
+ [X in StoreNames]: {
+ X: StoreList[number] & { name: X };
+ };
+};
+
/**
* Type-safe access to a database with a particular store map.
*
@@ -584,6 +617,14 @@ export class DbAccess<StoreMap> {
return this.db;
}
+ mktx2<
+ StoreNames extends keyof StoreMap,
+ Stores extends StoreMap[StoreNames],
+ StoreList extends Stores[],
+ >(namePicker: (x: StoreMap) => StoreList): StoreList {
+ return namePicker(this.stores);
+ }
+
mktx<
PickerType extends (x: StoreMap) => unknown,
BoundStores extends GetPickerType<PickerType, StoreMap>,
diff --git a/packages/taler-wallet-core/src/util/retries.ts b/packages/taler-wallet-core/src/util/retries.ts
index 13a05b385..3a41e8348 100644
--- a/packages/taler-wallet-core/src/util/retries.ts
+++ b/packages/taler-wallet-core/src/util/retries.ts
@@ -21,7 +21,29 @@
/**
* Imports.
*/
-import { AbsoluteTime, Duration } from "@gnu-taler/taler-util";
+import {
+ AbsoluteTime,
+ Duration,
+ TalerErrorDetail,
+} from "@gnu-taler/taler-util";
+import {
+ BackupProviderRecord,
+ DepositGroupRecord,
+ ExchangeRecord,
+ OperationAttemptResult,
+ OperationAttemptResultType,
+ ProposalRecord,
+ PurchaseRecord,
+ RecoupGroupRecord,
+ RefreshGroupRecord,
+ TipRecord,
+ WalletStoresV1,
+ WithdrawalGroupRecord,
+} from "../db.js";
+import { TalerError } from "../errors.js";
+import { InternalWalletState } from "../internal-wallet-state.js";
+import { PendingTaskType } from "../pending-types.js";
+import { GetReadWriteAccess } from "./query.js";
export interface RetryInfo {
firstTry: AbsoluteTime;
@@ -108,3 +130,95 @@ export namespace RetryInfo {
return r2;
}
}
+
+export namespace RetryTags {
+ export function forWithdrawal(wg: WithdrawalGroupRecord): string {
+ return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}`;
+ }
+ export function forExchangeUpdate(exch: ExchangeRecord): string {
+ return `${PendingTaskType.ExchangeUpdate}:${exch.baseUrl}`;
+ }
+ export function forExchangeCheckRefresh(exch: ExchangeRecord): string {
+ return `${PendingTaskType.ExchangeCheckRefresh}:${exch.baseUrl}`;
+ }
+ export function forProposalClaim(pr: ProposalRecord): string {
+ return `${PendingTaskType.ProposalDownload}:${pr.proposalId}`;
+ }
+ export function forTipPickup(tipRecord: TipRecord): string {
+ return `${PendingTaskType.TipPickup}:${tipRecord.walletTipId}`;
+ }
+ export function forRefresh(refreshGroupRecord: RefreshGroupRecord): string {
+ return `${PendingTaskType.TipPickup}:${refreshGroupRecord.refreshGroupId}`;
+ }
+ export function forPay(purchaseRecord: PurchaseRecord): string {
+ return `${PendingTaskType.Pay}:${purchaseRecord.proposalId}`;
+ }
+ export function forRefundQuery(purchaseRecord: PurchaseRecord): string {
+ return `${PendingTaskType.RefundQuery}:${purchaseRecord.proposalId}`;
+ }
+ export function forRecoup(recoupRecord: RecoupGroupRecord): string {
+ return `${PendingTaskType.Recoup}:${recoupRecord.recoupGroupId}`;
+ }
+ export function forDeposit(depositRecord: DepositGroupRecord): string {
+ return `${PendingTaskType.Deposit}:${depositRecord.depositGroupId}`;
+ }
+ export function forBackup(backupRecord: BackupProviderRecord): string {
+ return `${PendingTaskType.Backup}:${backupRecord.baseUrl}`;
+ }
+}
+
+export async function scheduleRetryInTx(
+ ws: InternalWalletState,
+ tx: GetReadWriteAccess<{
+ operationRetries: typeof WalletStoresV1.operationRetries;
+ }>,
+ opId: string,
+ errorDetail?: TalerErrorDetail,
+): Promise<void> {
+ let retryRecord = await tx.operationRetries.get(opId);
+ if (!retryRecord) {
+ retryRecord = {
+ id: opId,
+ retryInfo: RetryInfo.reset(),
+ };
+ if (errorDetail) {
+ retryRecord.lastError = errorDetail;
+ }
+ } else {
+ retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo);
+ if (errorDetail) {
+ retryRecord.lastError = errorDetail;
+ } else {
+ delete retryRecord.lastError;
+ }
+ }
+ await tx.operationRetries.put(retryRecord);
+}
+
+export async function scheduleRetry(
+ ws: InternalWalletState,
+ opId: string,
+ errorDetail?: TalerErrorDetail,
+): Promise<void> {
+ return await ws.db
+ .mktx((x) => ({ operationRetries: x.operationRetries }))
+ .runReadWrite(async (tx) => {
+ scheduleRetryInTx(ws, tx, opId, errorDetail);
+ });
+}
+
+/**
+ * Run an operation handler, expect a success result and extract the success value.
+ */
+export async function runOperationHandlerForResult<T>(
+ res: OperationAttemptResult<T>,
+): Promise<T> {
+ switch (res.type) {
+ case OperationAttemptResultType.Finished:
+ return res.result;
+ case OperationAttemptResultType.Error:
+ throw TalerError.fromUncheckedDetail(res.errorDetail);
+ default:
+ throw Error(`unexpected operation result (${res.type})`);
+ }
+}
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index 779fe9528..f041d9aa9 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -90,6 +90,7 @@ import {
ExchangeListItem,
OperationMap,
FeeDescription,
+ TalerErrorDetail,
} from "@gnu-taler/taler-util";
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
import {
@@ -101,9 +102,15 @@ import {
CoinSourceType,
exportDb,
importDb,
+ OperationAttemptResult,
+ OperationAttemptResultType,
WalletStoresV1,
} from "./db.js";
-import { getErrorDetailFromException, TalerError } from "./errors.js";
+import {
+ getErrorDetailFromException,
+ makeErrorDetail,
+ TalerError,
+} from "./errors.js";
import { createDenominationTimeline } from "./index.browser.js";
import {
DenomInfo,
@@ -143,6 +150,7 @@ import {
getExchangeRequestTimeout,
getExchangeTrust,
updateExchangeFromUrl,
+ updateExchangeFromUrlHandler,
updateExchangeTermsOfService,
} from "./operations/exchanges.js";
import { getMerchantInfo } from "./operations/merchants.js";
@@ -162,7 +170,11 @@ import {
initiatePeerToPeerPush,
} from "./operations/peer-to-peer.js";
import { getPendingOperations } from "./operations/pending.js";
-import { createRecoupGroup, processRecoupGroup } from "./operations/recoup.js";
+import {
+ createRecoupGroup,
+ processRecoupGroup,
+ processRecoupGroupHandler,
+} from "./operations/recoup.js";
import {
autoRefresh,
createRefreshGroup,
@@ -210,6 +222,7 @@ import {
openPromise,
} from "./util/promiseUtils.js";
import { DbAccess, GetReadWriteAccess } from "./util/query.js";
+import { RetryInfo, runOperationHandlerForResult } from "./util/retries.js";
import { TimerAPI, TimerGroup } from "./util/timer.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
@@ -237,7 +250,12 @@ async function getWithdrawalDetailsForAmount(
amount: AmountJson,
restrictAge: number | undefined,
): Promise<ManualWithdrawalDetails> {
- const wi = await getExchangeWithdrawalInfo(ws, exchangeBaseUrl, amount, restrictAge);
+ const wi = await getExchangeWithdrawalInfo(
+ ws,
+ exchangeBaseUrl,
+ amount,
+ restrictAge,
+ );
const paytoUris = wi.exchangeDetails.wireInfo.accounts.map(
(x) => x.payto_uri,
);
@@ -253,55 +271,153 @@ async function getWithdrawalDetailsForAmount(
}
/**
- * Execute one operation based on the pending operation info record.
+ * Call the right handler for a pending operation without doing
+ * any special error handling.
*/
-async function processOnePendingOperation(
+async function callOperationHandler(
ws: InternalWalletState,
pending: PendingTaskInfo,
forceNow = false,
-): Promise<void> {
- logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`);
+): Promise<OperationAttemptResult<unknown, unknown>> {
switch (pending.type) {
case PendingTaskType.ExchangeUpdate:
- await updateExchangeFromUrl(ws, pending.exchangeBaseUrl, {
+ return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl, {
forceNow,
});
- break;
case PendingTaskType.Refresh:
- await processRefreshGroup(ws, pending.refreshGroupId, { forceNow });
- break;
+ return await processRefreshGroup(ws, pending.refreshGroupId, {
+ forceNow,
+ });
case PendingTaskType.Withdraw:
await processWithdrawalGroup(ws, pending.withdrawalGroupId, { forceNow });
break;
case PendingTaskType.ProposalDownload:
- await processDownloadProposal(ws, pending.proposalId, { forceNow });
- break;
+ return await processDownloadProposal(ws, pending.proposalId, {
+ forceNow,
+ });
case PendingTaskType.TipPickup:
- await processTip(ws, pending.tipId, { forceNow });
- break;
+ return await processTip(ws, pending.tipId, { forceNow });
case PendingTaskType.Pay:
- await processPurchasePay(ws, pending.proposalId, { forceNow });
- break;
+ return await processPurchasePay(ws, pending.proposalId, { forceNow });
case PendingTaskType.RefundQuery:
- await processPurchaseQueryRefund(ws, pending.proposalId, { forceNow });
- break;
+ return await processPurchaseQueryRefund(ws, pending.proposalId, {
+ forceNow,
+ });
case PendingTaskType.Recoup:
- await processRecoupGroup(ws, pending.recoupGroupId, { forceNow });
- break;
+ return await processRecoupGroupHandler(ws, pending.recoupGroupId, {
+ forceNow,
+ });
case PendingTaskType.ExchangeCheckRefresh:
- await autoRefresh(ws, pending.exchangeBaseUrl);
- break;
+ return await autoRefresh(ws, pending.exchangeBaseUrl);
case PendingTaskType.Deposit: {
- await processDepositGroup(ws, pending.depositGroupId, {
+ return await processDepositGroup(ws, pending.depositGroupId, {
forceNow,
});
- break;
}
case PendingTaskType.Backup:
- await processBackupForProvider(ws, pending.backupProviderBaseUrl);
- break;
+ return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
default:
- assertUnreachable(pending);
+ return assertUnreachable(pending);
+ }
+ throw Error("not reached");
+}
+
+export async function storeOperationError(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+ e: TalerErrorDetail,
+): Promise<void> {
+ await ws.db
+ .mktx((x) => ({ operationRetries: x.operationRetries }))
+ .runReadWrite(async (tx) => {
+ const retryRecord = await tx.operationRetries.get(pendingTaskId);
+ if (!retryRecord) {
+ return;
+ }
+ retryRecord.lastError = e;
+ retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo);
+ await tx.operationRetries.put(retryRecord);
+ });
+}
+
+export async function storeOperationFinished(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<void> {
+ await ws.db
+ .mktx((x) => ({ operationRetries: x.operationRetries }))
+ .runReadWrite(async (tx) => {
+ await tx.operationRetries.delete(pendingTaskId);
+ });
+}
+
+export async function storeOperationPending(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<void> {
+ await ws.db
+ .mktx((x) => ({ operationRetries: x.operationRetries }))
+ .runReadWrite(async (tx) => {
+ const retryRecord = await tx.operationRetries.get(pendingTaskId);
+ if (!retryRecord) {
+ return;
+ }
+ delete retryRecord.lastError;
+ retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo);
+ await tx.operationRetries.put(retryRecord);
+ });
+}
+
+/**
+ * Execute one operation based on the pending operation info record.
+ *
+ * Store success/failure result in the database.
+ */
+async function processOnePendingOperation(
+ ws: InternalWalletState,
+ pending: PendingTaskInfo,
+ forceNow = false,
+): Promise<void> {
+ logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`);
+ let maybeError: TalerErrorDetail | undefined;
+ try {
+ const resp = await callOperationHandler(ws, pending, forceNow);
+ switch (resp.type) {
+ case OperationAttemptResultType.Error:
+ return await storeOperationError(ws, pending.id, resp.errorDetail);
+ case OperationAttemptResultType.Finished:
+ return await storeOperationFinished(ws, pending.id);
+ case OperationAttemptResultType.Pending:
+ return await storeOperationPending(ws, pending.id);
+ case OperationAttemptResultType.Longpoll:
+ break;
+ }
+ } catch (e: any) {
+ if (
+ e instanceof TalerError &&
+ e.hasErrorCode(TalerErrorCode.WALLET_PENDING_OPERATION_FAILED)
+ ) {
+ logger.warn("operation processed resulted in error");
+ logger.warn(`error was: ${j2s(e.errorDetail)}`);
+ maybeError = e.errorDetail;
+ } else {
+ // This is a bug, as we expect pending operations to always
+ // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
+ // or return something.
+ logger.error("Uncaught exception", e);
+ ws.notify({
+ type: NotificationType.InternalError,
+ message: "uncaught exception",
+ exception: e,
+ });
+ maybeError = makeErrorDetail(
+ TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
+ {
+ stack: e.stack,
+ },
+ `unexpected exception (message: ${e.message})`,
+ );
+ }
}
}
@@ -317,18 +433,7 @@ export async function runPending(
if (!forceNow && !AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
- try {
- await processOnePendingOperation(ws, p, forceNow);
- } catch (e) {
- if (e instanceof TalerError) {
- console.error(
- "Pending operation failed:",
- JSON.stringify(e.errorDetail, undefined, 2),
- );
- } else {
- console.error(e);
- }
- }
+ await processOnePendingOperation(ws, p, forceNow);
}
}
@@ -420,27 +525,7 @@ async function runTaskLoop(
if (!AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
- try {
- await processOnePendingOperation(ws, p);
- } catch (e) {
- if (
- e instanceof TalerError &&
- e.hasErrorCode(TalerErrorCode.WALLET_PENDING_OPERATION_FAILED)
- ) {
- logger.warn("operation processed resulted in error");
- logger.warn(`error was: ${j2s(e.errorDetail)}`);
- } else {
- // This is a bug, as we expect pending operations to always
- // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
- // or return something.
- logger.error("Uncaught exception", e);
- ws.notify({
- type: NotificationType.InternalError,
- message: "uncaught exception",
- exception: e,
- });
- }
- }
+ await processOnePendingOperation(ws, p);
ws.notify({
type: NotificationType.PendingOperationProcessed,
});
@@ -629,7 +714,7 @@ async function getExchangeDetailedInfo(
denominations: x.denominations,
}))
.runReadOnly(async (tx) => {
- const ex = await tx.exchanges.get(exchangeBaseurl)
+ const ex = await tx.exchanges.get(exchangeBaseurl);
const dp = ex?.detailsPointer;
if (!dp) {
return;
@@ -663,11 +748,11 @@ async function getExchangeDetailedInfo(
wireInfo: exchangeDetails.wireInfo,
},
denominations: denominations,
- }
+ };
});
if (!exchange) {
- throw Error(`exchange with base url "${exchangeBaseurl}" not found`)
+ throw Error(`exchange with base url "${exchangeBaseurl}" not found`);
}
const feesDescription: OperationMap<FeeDescription[]> = {
@@ -809,6 +894,7 @@ declare const __GIT_HASH__: string;
const VERSION = typeof __VERSION__ !== "undefined" ? __VERSION__ : "dev";
const GIT_HASH = typeof __GIT_HASH__ !== "undefined" ? __GIT_HASH__ : undefined;
+
/**
* Implementation of the "wallet-core" API.
*/
@@ -908,7 +994,7 @@ async function dispatchRequestInternal(
ws,
req.exchangeBaseUrl,
Amounts.parseOrThrow(req.amount),
- req.restrictAge
+ req.restrictAge,
);
}
case "getBalances": {
@@ -1106,7 +1192,7 @@ async function dispatchRequestInternal(
ws,
req.exchange,
amount,
- undefined
+ undefined,
);
const wres = await createManualWithdrawal(ws, {
amount: amount,