aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2023-02-20 20:14:37 +0100
committerFlorian Dold <florian@dold.me>2023-02-20 20:14:46 +0100
commit3daa4dbb3fc5199fb05d58b40c0d7c9ee287595e (patch)
treebdf3bcd9af71ce9cc77a8c4dc32ff0537654e83f
parent7bb81a008b7148cfd3fd656f858e4cbd755531ac (diff)
wallet-core: fix retryTransaction, improve tx/op identifier parsing/construction
-rw-r--r--packages/taler-wallet-core/src/db.ts4
-rw-r--r--packages/taler-wallet-core/src/operations/backup/index.ts10
-rw-r--r--packages/taler-wallet-core/src/operations/common.ts17
-rw-r--r--packages/taler-wallet-core/src/operations/deposits.ts1
-rw-r--r--packages/taler-wallet-core/src/operations/exchanges.ts4
-rw-r--r--packages/taler-wallet-core/src/operations/pay-merchant.ts33
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer.ts64
-rw-r--r--packages/taler-wallet-core/src/operations/pending.ts26
-rw-r--r--packages/taler-wallet-core/src/operations/refresh.ts4
-rw-r--r--packages/taler-wallet-core/src/operations/tip.ts4
-rw-r--r--packages/taler-wallet-core/src/operations/transactions.ts190
-rw-r--r--packages/taler-wallet-core/src/operations/withdraw.ts11
-rw-r--r--packages/taler-wallet-core/src/util/retries.ts75
-rw-r--r--packages/taler-wallet-core/src/wallet.ts21
14 files changed, 334 insertions, 130 deletions
diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts
index a23ba0f76..cbf49c4ca 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -69,7 +69,7 @@ import {
StoreDescriptor,
StoreWithIndexes,
} from "./util/query.js";
-import { RetryInfo, RetryTags } from "./util/retries.js";
+import { RetryInfo, TaskIdentifiers } from "./util/retries.js";
/**
* This file contains the database schema of the Taler wallet together
@@ -1945,7 +1945,7 @@ export interface OperationRetryRecord {
* Unique identifier for the operation. Typically of
* the format `${opType}-${opUniqueKey}`
*
- * @see {@link RetryTags}
+ * @see {@link TaskIdentifiers}
*/
id: string;
diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts b/packages/taler-wallet-core/src/operations/backup/index.ts
index 3dae26087..59e99b505 100644
--- a/packages/taler-wallet-core/src/operations/backup/index.ts
+++ b/packages/taler-wallet-core/src/operations/backup/index.ts
@@ -99,7 +99,7 @@ import {
import {
OperationAttemptResult,
OperationAttemptResultType,
- RetryTags,
+ TaskIdentifiers,
scheduleRetryInTx,
} from "../../util/retries.js";
import { addAttentionRequest, removeAttentionRequest } from "../attention.js";
@@ -379,7 +379,7 @@ async function runBackupCycleForProvider(
logger.warn("backup provider not found anymore");
return;
}
- const opId = RetryTags.forBackup(prov);
+ const opId = TaskIdentifiers.forBackup(prov);
await scheduleRetryInTx(ws, tx, opId);
prov.shouldRetryFreshProposal = true;
prov.state = {
@@ -405,7 +405,7 @@ async function runBackupCycleForProvider(
logger.warn("backup provider not found anymore");
return;
}
- const opId = RetryTags.forBackup(prov);
+ const opId = TaskIdentifiers.forBackup(prov);
await scheduleRetryInTx(ws, tx, opId);
prov.currentPaymentProposalId = result.proposalId;
prov.shouldRetryFreshProposal = false;
@@ -479,7 +479,7 @@ async function runBackupCycleForProvider(
prov.lastBackupHash = encodeCrock(hash(backupEnc));
// FIXME: Allocate error code for this situation?
// FIXME: Add operation retry record!
- const opId = RetryTags.forBackup(prov);
+ const opId = TaskIdentifiers.forBackup(prov);
await scheduleRetryInTx(ws, tx, opId);
prov.state = {
tag: BackupProviderStateTag.Retrying,
@@ -920,7 +920,7 @@ export async function getBackupInfo(
.mktx((x) => [x.backupProviders, x.operationRetries])
.runReadOnly(async (tx) => {
return await tx.backupProviders.iter().mapAsync(async (bp) => {
- const opId = RetryTags.forBackup(bp);
+ const opId = TaskIdentifiers.forBackup(bp);
const retryRecord = await tx.operationRetries.get(opId);
return {
provider: bp,
diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts
index 35e6455bc..e5eda074c 100644
--- a/packages/taler-wallet-core/src/operations/common.ts
+++ b/packages/taler-wallet-core/src/operations/common.ts
@@ -218,6 +218,23 @@ export async function storeOperationError(
});
}
+export async function resetOperationTimeout(
+ ws: InternalWalletState,
+ pendingTaskId: string,
+): Promise<void> {
+ await ws.db
+ .mktx((x) => [x.operationRetries])
+ .runReadWrite(async (tx) => {
+ let retryRecord = await tx.operationRetries.get(pendingTaskId);
+ if (retryRecord) {
+ // Note that we don't reset the lastError, it should still be visible
+ // while the retry runs.
+ retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo);
+ await tx.operationRetries.put(retryRecord);
+ }
+ });
+}
+
export async function storeOperationPending(
ws: InternalWalletState,
pendingTaskId: string,
diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts
index 1e696a1d6..22283b7a8 100644
--- a/packages/taler-wallet-core/src/operations/deposits.ts
+++ b/packages/taler-wallet-core/src/operations/deposits.ts
@@ -122,7 +122,6 @@ export async function processDepositGroup(
ws: InternalWalletState,
depositGroupId: string,
options: {
- forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
): Promise<OperationAttemptResult> {
diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts
index 08d30eac6..457344e06 100644
--- a/packages/taler-wallet-core/src/operations/exchanges.ts
+++ b/packages/taler-wallet-core/src/operations/exchanges.ts
@@ -73,7 +73,7 @@ import {
import {
OperationAttemptResult,
OperationAttemptResultType,
- RetryTags,
+ TaskIdentifiers,
unwrapOperationHandlerResultOrThrow,
} from "../util/retries.js";
import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "../versions.js";
@@ -552,7 +552,7 @@ export async function updateExchangeFromUrl(
return unwrapOperationHandlerResultOrThrow(
await runOperationWithErrorReporting(
ws,
- RetryTags.forExchangeUpdateFromUrl(canonUrl),
+ TaskIdentifiers.forExchangeUpdateFromUrl(canonUrl),
() => updateExchangeFromUrlHandler(ws, canonUrl, options),
),
);
diff --git a/packages/taler-wallet-core/src/operations/pay-merchant.ts b/packages/taler-wallet-core/src/operations/pay-merchant.ts
index 19eb40f3a..25153f9fb 100644
--- a/packages/taler-wallet-core/src/operations/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/operations/pay-merchant.ts
@@ -95,7 +95,7 @@ import {
TalerError,
TalerProtocolViolationError,
} from "@gnu-taler/taler-util";
-import { GetReadWriteAccess } from "../index.js";
+import { GetReadWriteAccess, PendingTaskType } from "../index.js";
import {
EXCHANGE_COINS_LOCK,
InternalWalletState,
@@ -119,8 +119,9 @@ import {
OperationAttemptResult,
OperationAttemptResultType,
RetryInfo,
- RetryTags,
+ TaskIdentifiers,
scheduleRetry,
+ constructTaskIdentifier,
} from "../util/retries.js";
import {
makeTransactionId,
@@ -360,7 +361,7 @@ export async function processDownloadProposal(
requestBody.token = proposal.claimToken;
}
- const opId = RetryTags.forPay(proposal);
+ const opId = TaskIdentifiers.forPay(proposal);
const retryRecord = await ws.db
.mktx((x) => [x.operationRetries])
.runReadOnly(async (tx) => {
@@ -1598,8 +1599,11 @@ export async function runPayForConfirmPay(
proposalId: string,
): Promise<ConfirmPayResult> {
logger.trace("processing proposal for confirmPay");
- const opId = RetryTags.byPaymentProposalId(proposalId);
- const res = await runOperationWithErrorReporting(ws, opId, async () => {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Purchase,
+ proposalId,
+ });
+ const res = await runOperationWithErrorReporting(ws, taskId, async () => {
return await processPurchasePay(ws, proposalId, { forceNow: true });
});
logger.trace(`processPurchasePay response type ${res.type}`);
@@ -1624,9 +1628,7 @@ export async function runPayForConfirmPay(
// We hide transient errors from the caller.
const opRetry = await ws.db
.mktx((x) => [x.operationRetries])
- .runReadOnly(async (tx) =>
- tx.operationRetries.get(RetryTags.byPaymentProposalId(proposalId)),
- );
+ .runReadOnly(async (tx) => tx.operationRetries.get(taskId));
return {
type: ConfirmPayResultType.Pending,
lastError: opRetry?.lastError,
@@ -1792,9 +1794,7 @@ export async function confirmPay(
export async function processPurchase(
ws: InternalWalletState,
proposalId: string,
- options: {
- forceNow?: boolean;
- } = {},
+ options: Record<any, never> = {},
): Promise<OperationAttemptResult> {
const purchase = await ws.db
.mktx((x) => [x.purchases])
@@ -1843,9 +1843,7 @@ export async function processPurchase(
export async function processPurchasePay(
ws: InternalWalletState,
proposalId: string,
- options: {
- forceNow?: boolean;
- } = {},
+ options: unknown = {},
): Promise<OperationAttemptResult> {
const purchase = await ws.db
.mktx((x) => [x.purchases])
@@ -1935,7 +1933,7 @@ export async function processPurchasePay(
handleInsufficientFunds(ws, proposalId, err).catch(async (e) => {
console.log("handling insufficient funds failed");
- await scheduleRetry(ws, RetryTags.forPay(purchase), {
+ await scheduleRetry(ws, TaskIdentifiers.forPay(purchase), {
code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
when: AbsoluteTime.now(),
message: "unexpected exception",
@@ -2830,7 +2828,10 @@ export async function abortPay(
proposalId: string,
cancelImmediately?: boolean,
): Promise<void> {
- const opId = RetryTags.byPaymentProposalId(proposalId);
+ const opId = constructTaskIdentifier({
+ tag: PendingTaskType.Purchase,
+ proposalId,
+ });
await ws.db
.mktx((x) => [
x.purchases,
diff --git a/packages/taler-wallet-core/src/operations/pay-peer.ts b/packages/taler-wallet-core/src/operations/pay-peer.ts
index ff01342f8..4f65ec7ea 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer.ts
@@ -87,15 +87,17 @@ import { TalerError } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
makeTransactionId,
+ resetOperationTimeout,
runOperationWithErrorReporting,
spendCoins,
} from "../operations/common.js";
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import { checkDbInvariant } from "../util/invariants.js";
import {
+ constructTaskIdentifier,
OperationAttemptResult,
OperationAttemptResultType,
- RetryTags,
+ TaskIdentifiers,
} from "../util/retries.js";
import { getPeerPaymentBalanceDetailsInTx } from "./balance.js";
import { updateExchangeFromUrl } from "./exchanges.js";
@@ -103,7 +105,10 @@ import { getTotalRefreshCost } from "./refresh.js";
import {
getExchangeWithdrawalInfo,
internalCreateWithdrawalGroup,
+ processWithdrawalGroup,
} from "./withdraw.js";
+import { PendingTaskType } from "../pending-types.js";
+import { stopLongpolling } from "./transactions.js";
const logger = new Logger("operations/peer-to-peer.ts");
@@ -590,13 +595,14 @@ export async function initiatePeerPushPayment(
});
});
- await runOperationWithErrorReporting(
- ws,
- RetryTags.byPeerPushPaymentInitiationPursePub(pursePair.pub),
- async () => {
- return await processPeerPushInitiation(ws, pursePair.pub);
- },
- );
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPushInitiation,
+ pursePub: pursePair.pub,
+ });
+
+ await runOperationWithErrorReporting(ws, taskId, async () => {
+ return await processPeerPushInitiation(ws, pursePair.pub);
+ });
return {
contractPriv: contractKeyPair.priv,
@@ -951,7 +957,7 @@ export async function confirmPeerPushPayment(
await updateExchangeFromUrl(ws, peerInc.exchangeBaseUrl);
- const retryTag = RetryTags.forPeerPushCredit(peerInc);
+ const retryTag = TaskIdentifiers.forPeerPushCredit(peerInc);
await runOperationWithErrorReporting(ws, retryTag, () =>
processPeerPushCredit(ws, req.peerPushPaymentIncomingId),
@@ -1113,7 +1119,7 @@ export async function acceptIncomingPeerPullPayment(
await runOperationWithErrorReporting(
ws,
- RetryTags.forPeerPullPaymentDebit(ppi),
+ TaskIdentifiers.forPeerPullPaymentDebit(ppi),
async () => {
return processPeerPullDebit(ws, ppi.peerPullPaymentIncomingId);
},
@@ -1263,7 +1269,23 @@ export async function processPeerPullCredit(
}
if (pullIni.status === OperationStatus.Finished) {
- logger.warn("peer pull payment initiation is already finished");
+ logger.warn(
+ "peer pull payment initiation is already finished, retrying withdrawal",
+ );
+
+ const withdrawalGroupId = pullIni.withdrawalGroupId;
+
+ if (withdrawalGroupId) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Withdraw,
+ withdrawalGroupId,
+ });
+ stopLongpolling(ws, taskId);
+ await resetOperationTimeout(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processWithdrawalGroup(ws, withdrawalGroupId),
+ );
+ }
return {
type: OperationAttemptResultType.Finished,
result: undefined,
@@ -1514,19 +1536,19 @@ export async function initiatePeerPullPayment(
// whether purse creation has failed, or does the client/
// check this asynchronously from the transaction status?
- await runOperationWithErrorReporting(
- ws,
- RetryTags.byPeerPullPaymentInitiationPursePub(pursePair.pub),
- async () => {
- return processPeerPullCredit(ws, pursePair.pub);
- },
- );
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullInitiation,
+ pursePub: pursePair.pub,
+ });
+
+ await runOperationWithErrorReporting(ws, taskId, async () => {
+ return processPeerPullCredit(ws, pursePair.pub);
+ });
// FIXME: Why do we create this only here?
// What if the previous operation didn't succeed?
-
- // FIXME: Use a pre-computed withdrawal group ID
- // so we don't create it multiple times.
+ // We actually should create it once we know the
+ // money arrived (via long-polling).
await internalCreateWithdrawalGroup(ws, {
amount: instructedAmount,
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts
index 240c7ff65..2e3a5c9dc 100644
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ b/packages/taler-wallet-core/src/operations/pending.ts
@@ -39,7 +39,7 @@ import {
import { AbsoluteTime } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import { GetReadOnlyAccess } from "../util/query.js";
-import { RetryTags } from "../util/retries.js";
+import { TaskIdentifiers } from "../util/retries.js";
import { GlobalIDB } from "@gnu-taler/idb-bridge";
function getPendingCommon(
@@ -74,7 +74,7 @@ async function gatherExchangePending(
): Promise<void> {
// FIXME: We should do a range query here based on the update time.
await tx.exchanges.iter().forEachAsync(async (exch) => {
- const opTag = RetryTags.forExchangeUpdate(exch);
+ const opTag = TaskIdentifiers.forExchangeUpdate(exch);
let opr = await tx.operationRetries.get(opTag);
const timestampDue =
opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate);
@@ -120,7 +120,7 @@ async function gatherRefreshPending(
if (r.timestampFinished) {
return;
}
- const opId = RetryTags.forRefresh(r);
+ const opId = TaskIdentifiers.forRefresh(r);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
@@ -158,7 +158,7 @@ async function gatherWithdrawalPending(
if (wsr.timestampFinish) {
return;
}
- const opTag = RetryTags.forWithdrawal(wsr);
+ const opTag = TaskIdentifiers.forWithdrawal(wsr);
let opr = await tx.operationRetries.get(opTag);
const now = AbsoluteTime.now();
if (!opr) {
@@ -208,7 +208,7 @@ async function gatherDepositPending(
deposited = false;
}
}
- const opId = RetryTags.forDeposit(dg);
+ const opId = TaskIdentifiers.forDeposit(dg);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
@@ -239,7 +239,7 @@ async function gatherTipPending(
if (tip.pickedUpTimestamp) {
return;
}
- const opId = RetryTags.forTipPickup(tip);
+ const opId = TaskIdentifiers.forTipPickup(tip);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
if (tip.acceptedTimestamp) {
@@ -272,7 +272,7 @@ async function gatherPurchasePending(
await tx.purchases.indexes.byStatus
.iter(keyRange)
.forEachAsync(async (pr) => {
- const opId = RetryTags.forPay(pr);
+ const opId = TaskIdentifiers.forPay(pr);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
@@ -301,7 +301,7 @@ async function gatherRecoupPending(
if (rg.timestampFinished) {
return;
}
- const opId = RetryTags.forRecoup(rg);
+ const opId = TaskIdentifiers.forRecoup(rg);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
@@ -325,7 +325,7 @@ async function gatherBackupPending(
resp: PendingOperationsResponse,
): Promise<void> {
await tx.backupProviders.iter().forEachAsync(async (bp) => {
- const opId = RetryTags.forBackup(bp);
+ const opId = TaskIdentifiers.forBackup(bp);
const retryRecord = await tx.operationRetries.get(opId);
if (bp.state.tag === BackupProviderStateTag.Ready) {
const timestampDue = AbsoluteTime.fromTimestamp(
@@ -366,7 +366,7 @@ async function gatherPeerPullInitiationPending(
if (pi.status === OperationStatus.Finished) {
return;
}
- const opId = RetryTags.forPeerPullPaymentInitiation(pi);
+ const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
@@ -392,7 +392,7 @@ async function gatherPeerPullDebitPending(
if (pi.status === PeerPullPaymentIncomingStatus.Paid) {
return;
}
- const opId = RetryTags.forPeerPullPaymentDebit(pi);
+ const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
@@ -418,7 +418,7 @@ async function gatherPeerPushInitiationPending(
if (pi.status === PeerPushPaymentInitiationStatus.PurseCreated) {
return;
}
- const opId = RetryTags.forPeerPushPaymentInitiation(pi);
+ const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
@@ -447,7 +447,7 @@ async function gatherPeerPushCreditPending(
case PeerPushPaymentIncomingStatus.WithdrawalCreated:
return;
}
- const opId = RetryTags.forPeerPushCredit(pi);
+ const opId = TaskIdentifiers.forPeerPushCredit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts
index 773689635..2d406ec7d 100644
--- a/packages/taler-wallet-core/src/operations/refresh.ts
+++ b/packages/taler-wallet-core/src/operations/refresh.ts
@@ -734,9 +734,7 @@ async function refreshReveal(
export async function processRefreshGroup(
ws: InternalWalletState,
refreshGroupId: string,
- options: {
- forceNow?: boolean;
- } = {},
+ options: Record<string, never> = {},
): Promise<OperationAttemptResult> {
logger.info(`processing refresh group ${refreshGroupId}`);
diff --git a/packages/taler-wallet-core/src/operations/tip.ts b/packages/taler-wallet-core/src/operations/tip.ts
index ec7546992..28c3cda52 100644
--- a/packages/taler-wallet-core/src/operations/tip.ts
+++ b/packages/taler-wallet-core/src/operations/tip.ts
@@ -164,9 +164,7 @@ export async function prepareTip(
export async function processTip(
ws: InternalWalletState,
walletTipId: string,
- options: {
- forceNow?: boolean;
- } = {},
+ options: Record<string, never> = {},
): Promise<OperationAttemptResult> {
const tipRecord = await ws.db
.mktx((x) => [x.tips])
diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts
index c03d2aa3d..1c2ce34bb 100644
--- a/packages/taler-wallet-core/src/operations/transactions.ts
+++ b/packages/taler-wallet-core/src/operations/transactions.ts
@@ -63,12 +63,15 @@ import {
PeerPullPaymentInitiationRecord,
} from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
+import { PendingTaskType } from "../pending-types.js";
import { checkDbInvariant } from "../util/invariants.js";
-import { RetryTags } from "../util/retries.js";
+import { constructTaskIdentifier, TaskIdentifiers } from "../util/retries.js";
import {
makeTombstoneId,
makeTransactionId,
parseId,
+ resetOperationTimeout,
+ runOperationWithErrorReporting,
TombstoneTag,
} from "./common.js";
import { processDepositGroup } from "./deposits.js";
@@ -79,6 +82,7 @@ import {
extractContractData,
processPurchasePay,
} from "./pay-merchant.js";
+import { processPeerPullCredit } from "./pay-peer.js";
import { processRefreshGroup } from "./refresh.js";
import { processTip } from "./tip.js";
import {
@@ -152,7 +156,7 @@ export async function getTransactionById(
if (!withdrawalGroupRecord) throw Error("not found");
- const opId = RetryTags.forWithdrawal(withdrawalGroupRecord);
+ const opId = TaskIdentifiers.forWithdrawal(withdrawalGroupRecord);
const ort = await tx.operationRetries.get(opId);
if (
@@ -215,7 +219,7 @@ export async function getTransactionById(
Amounts.zeroOfAmount(contractData.amount),
);
- const payOpId = RetryTags.forPay(purchase);
+ const payOpId = TaskIdentifiers.forPay(purchase);
const payRetryRecord = await tx.operationRetries.get(payOpId);
return buildTransactionForPurchase(
@@ -237,7 +241,7 @@ export async function getTransactionById(
if (!tipRecord) throw Error("not found");
const retries = await tx.operationRetries.get(
- RetryTags.forTipPickup(tipRecord),
+ TaskIdentifiers.forTipPickup(tipRecord),
);
return buildTransactionForTip(tipRecord, retries);
});
@@ -250,7 +254,7 @@ export async function getTransactionById(
if (!depositRecord) throw Error("not found");
const retries = await tx.operationRetries.get(
- RetryTags.forDeposit(depositRecord),
+ TaskIdentifiers.forDeposit(depositRecord),
);
return buildTransactionForDeposit(depositRecord, retries);
});
@@ -359,11 +363,11 @@ export async function getTransactionById(
if (pushInc.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId);
if (wg) {
- const withdrawalOpId = RetryTags.forWithdrawal(wg);
+ const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
- const pushIncOpId = RetryTags.forPeerPushCredit(pushInc);
+ const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pushInc);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
return buildTransactionForPeerPushCredit(
@@ -394,11 +398,12 @@ export async function getTransactionById(
if (pushInc.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId);
if (wg) {
- const withdrawalOpId = RetryTags.forWithdrawal(wg);
+ const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
- const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pushInc);
+ const pushIncOpId =
+ TaskIdentifiers.forPeerPullPaymentInitiation(pushInc);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
return buildTransactionForPeerPullCredit(
@@ -1109,11 +1114,11 @@ export async function getTransactions(
if (pi.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId);
if (wg) {
- const withdrawalOpId = RetryTags.forWithdrawal(wg);
+ const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
- const pushIncOpId = RetryTags.forPeerPushCredit(pi);
+ const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pi);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
checkDbInvariant(!!ct);
@@ -1142,11 +1147,11 @@ export async function getTransactions(
if (pi.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId);
if (wg) {
- const withdrawalOpId = RetryTags.forWithdrawal(wg);
+ const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
- const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pi);
+ const pushIncOpId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
checkDbInvariant(!!ct);
@@ -1166,7 +1171,7 @@ export async function getTransactions(
return;
}
let required = false;
- const opId = RetryTags.forRefresh(rg);
+ const opId = TaskIdentifiers.forRefresh(rg);
if (transactionsRequest?.includeRefreshes) {
required = true;
} else if (rg.operationStatus !== RefreshOperationStatus.Finished) {
@@ -1195,7 +1200,7 @@ export async function getTransactions(
return;
}
- const opId = RetryTags.forWithdrawal(wsr);
+ const opId = TaskIdentifiers.forWithdrawal(wsr);
const ort = await tx.operationRetries.get(opId);
switch (wsr.wgInfo.withdrawalType) {
@@ -1238,7 +1243,7 @@ export async function getTransactions(
if (shouldSkipCurrency(transactionsRequest, amount.currency)) {
return;
}
- const opId = RetryTags.forDeposit(dg);
+ const opId = TaskIdentifiers.forDeposit(dg);
const retryRecord = await tx.operationRetries.get(opId);
transactions.push(buildTransactionForDeposit(dg, retryRecord));
@@ -1309,7 +1314,7 @@ export async function getTransactions(
);
});
- const payOpId = RetryTags.forPay(purchase);
+ const payOpId = TaskIdentifiers.forPay(purchase);
const payRetryRecord = await tx.operationRetries.get(payOpId);
transactions.push(
await buildTransactionForPurchase(
@@ -1333,7 +1338,7 @@ export async function getTransactions(
if (!tipRecord.acceptedTimestamp) {
return;
}
- const opId = RetryTags.forTipPickup(tipRecord);
+ const opId = TaskIdentifiers.forTipPickup(tipRecord);
const retryRecord = await tx.operationRetries.get(opId);
transactions.push(buildTransactionForTip(tipRecord, retryRecord));
});
@@ -1359,6 +1364,77 @@ export async function getTransactions(
return { transactions: [...txNotPending, ...txPending] };
}
+export type ParsedTransactionIdentifier =
+ | { tag: TransactionType.Deposit; depositGroupId: string }
+ | { tag: TransactionType.Payment; proposalId: string }
+ | { tag: TransactionType.PeerPullDebit; peerPullPaymentIncomingId: string }
+ | { tag: TransactionType.PeerPullCredit; pursePub: string }
+ | { tag: TransactionType.PeerPushCredit; peerPushPaymentIncomingId: string }
+ | { tag: TransactionType.PeerPushDebit; pursePub: string }
+ | { tag: TransactionType.Refresh; refreshGroupId: string }
+ | { tag: TransactionType.Refund; proposalId: string; executionTime: string }
+ | { tag: TransactionType.Tip; walletTipId: string }
+ | { tag: TransactionType.Withdrawal; withdrawalGroupId: string };
+
+/**
+ * Parse a transaction identifier string into a typed, structured representation.
+ */
+export function parseTransactionIdentifier(
+ transactionId: string,
+): ParsedTransactionIdentifier | undefined {
+ const { type, args: rest } = parseId("any", transactionId);
+
+ switch (type) {
+ case TransactionType.Deposit:
+ return { tag: TransactionType.Deposit, depositGroupId: rest[0] };
+ case TransactionType.Payment:
+ return { tag: TransactionType.Payment, proposalId: rest[0] };
+ case TransactionType.PeerPullCredit:
+ return { tag: TransactionType.PeerPullCredit, pursePub: rest[0] };
+ case TransactionType.PeerPullDebit:
+ return {
+ tag: TransactionType.PeerPullDebit,
+ peerPullPaymentIncomingId: rest[0],
+ };
+ case TransactionType.PeerPushCredit:
+ return {
+ tag: TransactionType.PeerPushCredit,
+ peerPushPaymentIncomingId: rest[0],
+ };
+ case TransactionType.PeerPushDebit:
+ return { tag: TransactionType.PeerPushDebit, pursePub: rest[0] };
+ case TransactionType.Refresh:
+ return { tag: TransactionType.Refresh, refreshGroupId: rest[0] };
+ case TransactionType.Refund:
+ return {
+ tag: TransactionType.Refund,
+ proposalId: rest[0],
+ executionTime: rest[1],
+ };
+ case TransactionType.Tip:
+ return {
+ tag: TransactionType.Tip,
+ walletTipId: rest[0],
+ };
+ case TransactionType.Withdrawal:
+ return {
+ tag: TransactionType.Withdrawal,
+ withdrawalGroupId: rest[0],
+ };
+ default:
+ return undefined;
+ }
+}
+
+export function stopLongpolling(ws: InternalWalletState, taskId: string) {
+ const longpoll = ws.activeLongpoll[taskId];
+ if (longpoll) {
+ logger.info(`cancelling long-polling for ${taskId}`);
+ longpoll.cancel();
+ delete ws.activeLongpoll[taskId];
+ }
+}
+
/**
* Immediately retry the underlying operation
* of a transaction.
@@ -1369,34 +1445,86 @@ export async function retryTransaction(
): Promise<void> {
logger.info(`retrying transaction ${transactionId}`);
- const { type, args: rest } = parseId("any", transactionId);
+ const parsedTx = parseTransactionIdentifier(transactionId);
- switch (type) {
+ if (!parsedTx) {
+ throw Error("invalid transaction identifier");
+ }
+
+ // FIXME: We currently don't cancel active long-polling tasks here.
+
+ switch (parsedTx.tag) {
+ case TransactionType.PeerPullCredit: {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullInitiation,
+ pursePub: parsedTx.pursePub,
+ });
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processPeerPullCredit(ws, parsedTx.pursePub),
+ );
+ break;
+ }
case TransactionType.Deposit: {
- const depositGroupId = rest[0];
- processDepositGroup(ws, depositGroupId, {
- forceNow: true,
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Deposit,
+ depositGroupId: parsedTx.depositGroupId,
});
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processDepositGroup(ws, parsedTx.depositGroupId),
+ );
break;
}
case TransactionType.Withdrawal: {
- const withdrawalGroupId = rest[0];
- await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true });
+ // FIXME: Abort current long-poller!
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Withdraw,
+ withdrawalGroupId: parsedTx.withdrawalGroupId,
+ });
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processWithdrawalGroup(ws, parsedTx.withdrawalGroupId),
+ );
break;
}
case TransactionType.Payment: {
- const proposalId = rest[0];
- await processPurchasePay(ws, proposalId, { forceNow: true });
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Purchase,
+ proposalId: parsedTx.proposalId,
+ });
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processPurchasePay(ws, parsedTx.proposalId),
+ );
break;
}
case TransactionType.Tip: {
- const walletTipId = rest[0];
- await processTip(ws, walletTipId, { forceNow: true });
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.TipPickup,
+ walletTipId: parsedTx.walletTipId,
+ });
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processTip(ws, parsedTx.walletTipId),
+ );
break;
}
case TransactionType.Refresh: {
- const refreshGroupId = rest[0];
- await processRefreshGroup(ws, refreshGroupId, { forceNow: true });
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Refresh,
+ refreshGroupId: parsedTx.refreshGroupId,
+ });
+ await resetOperationTimeout(ws, taskId);
+ stopLongpolling(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processRefreshGroup(ws, parsedTx.refreshGroupId),
+ );
break;
}
default:
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index 9dfd72678..5729b8458 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -109,7 +109,7 @@ import { DbAccess, GetReadOnlyAccess } from "../util/query.js";
import {
OperationAttemptResult,
OperationAttemptResultType,
- RetryTags,
+ TaskIdentifiers,
} from "../util/retries.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
@@ -1023,7 +1023,6 @@ export async function processWithdrawalGroup(
ws: InternalWalletState,
withdrawalGroupId: string,
options: {
- forceNow?: boolean;
} = {},
): Promise<OperationAttemptResult> {
logger.trace("processing withdrawal group", withdrawalGroupId);
@@ -1037,10 +1036,10 @@ export async function processWithdrawalGroup(
throw Error(`withdrawal group ${withdrawalGroupId} not found`);
}
- const retryTag = RetryTags.forWithdrawal(withdrawalGroup);
+ const retryTag = TaskIdentifiers.forWithdrawal(withdrawalGroup);
// We're already running!
- if (ws.activeLongpoll[retryTag] && !options.forceNow) {
+ if (ws.activeLongpoll[retryTag]) {
logger.info("withdrawal group already in long-polling, returning!");
return {
type: OperationAttemptResultType.Longpoll,
@@ -1532,7 +1531,7 @@ export async function getWithdrawalDetailsForUri(
.iter(r.baseUrl)
.toArray();
const retryRecord = await tx.operationRetries.get(
- RetryTags.forExchangeUpdate(r),
+ TaskIdentifiers.forExchangeUpdate(r),
);
if (exchangeDetails && denominations) {
exchanges.push(
@@ -2087,7 +2086,7 @@ export async function createManualWithdrawal(
// rely on retry handling to re-process the withdrawal group.
runOperationWithErrorReporting(
ws,
- RetryTags.forWithdrawal(withdrawalGroup),
+ TaskIdentifiers.forWithdrawal(withdrawalGroup),
async () => {
return await processWithdrawalGroup(ws, withdrawalGroupId, {
forceNow: true,
diff --git a/packages/taler-wallet-core/src/util/retries.ts b/packages/taler-wallet-core/src/util/retries.ts
index 5744bf8fe..5b6645924 100644
--- a/packages/taler-wallet-core/src/util/retries.ts
+++ b/packages/taler-wallet-core/src/util/retries.ts
@@ -46,6 +46,7 @@ import { TalerError } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import { PendingTaskType } from "../pending-types.js";
import { GetReadWriteAccess } from "./query.js";
+import { assertUnreachable } from "./assertUnreachable.js";
const logger = new Logger("util/retries.ts");
@@ -176,7 +177,66 @@ export namespace RetryInfo {
}
}
-export namespace RetryTags {
+/**
+ * Parsed representation of task identifiers.
+ */
+export type ParsedTaskIdentifier =
+ | {
+ tag: PendingTaskType.Withdraw;
+ withdrawalGroupId: string;
+ }
+ | { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string }
+ | { tag: PendingTaskType.Backup; backupProviderBaseUrl: string }
+ | { tag: PendingTaskType.Deposit; depositGroupId: string }
+ | { tag: PendingTaskType.ExchangeCheckRefresh; exchangeBaseUrl: string }
+ | { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string }
+ | { tag: PendingTaskType.PeerPullDebit; peerPullPaymentIncomingId: string }
+ | { tag: PendingTaskType.PeerPullInitiation; pursePub: string }
+ | { tag: PendingTaskType.PeerPushCredit; peerPushPaymentIncomingId: string }
+ | { tag: PendingTaskType.PeerPushInitiation; pursePub: string }
+ | { tag: PendingTaskType.Purchase; proposalId: string }
+ | { tag: PendingTaskType.Recoup; recoupGroupId: string }
+ | { tag: PendingTaskType.TipPickup; walletTipId: string }
+ | { tag: PendingTaskType.Refresh; refreshGroupId: string };
+
+export function parseTaskIdentifier(x: string): ParsedTaskIdentifier {
+ throw Error("not yet implemented");
+}
+
+export function constructTaskIdentifier(p: ParsedTaskIdentifier): string {
+ switch (p.tag) {
+ case PendingTaskType.Backup:
+ return `${p.tag}:${p.backupProviderBaseUrl}`;
+ case PendingTaskType.Deposit:
+ return `${p.tag}:${p.depositGroupId}`;
+ case PendingTaskType.ExchangeCheckRefresh:
+ return `${p.tag}:${p.exchangeBaseUrl}`;
+ case PendingTaskType.ExchangeUpdate:
+ return `${p.tag}:${p.exchangeBaseUrl}`;
+ case PendingTaskType.PeerPullDebit:
+ return `${p.tag}:${p.peerPullPaymentIncomingId}`;
+ case PendingTaskType.PeerPushCredit:
+ return `${p.tag}:${p.peerPushPaymentIncomingId}`;
+ case PendingTaskType.PeerPullInitiation:
+ return `${p.tag}:${p.pursePub}`;
+ case PendingTaskType.PeerPushInitiation:
+ return `${p.tag}:${p.pursePub}`;
+ case PendingTaskType.Purchase:
+ return `${p.tag}:${p.proposalId}`;
+ case PendingTaskType.Recoup:
+ return `${p.tag}:${p.recoupGroupId}`;
+ case PendingTaskType.Refresh:
+ return `${p.tag}:${p.refreshGroupId}`;
+ case PendingTaskType.TipPickup:
+ return `${p.tag}:${p.walletTipId}`;
+ case PendingTaskType.Withdraw:
+ return `${p.tag}:${p.withdrawalGroupId}`;
+ default:
+ assertUnreachable(p);
+ }
+}
+
+export namespace TaskIdentifiers {
export function forWithdrawal(wg: WithdrawalGroupRecord): string {
return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}`;
}
@@ -227,19 +287,6 @@ export namespace RetryTags {
): string {
return `${PendingTaskType.PeerPushCredit}:${ppi.peerPushPaymentIncomingId}`;
}
- export function byPaymentProposalId(proposalId: string): string {
- return `${PendingTaskType.Purchase}:${proposalId}`;
- }
- export function byPeerPushPaymentInitiationPursePub(
- pursePub: string,
- ): string {
- return `${PendingTaskType.PeerPushInitiation}:${pursePub}`;
- }
- export function byPeerPullPaymentInitiationPursePub(
- pursePub: string,
- ): string {
- return `${PendingTaskType.PeerPullInitiation}:${pursePub}`;
- }
}
export async function scheduleRetryInTx(
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index a036be86c..47724efdc 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -265,7 +265,7 @@ import {
GetReadOnlyAccess,
GetReadWriteAccess,
} from "./util/query.js";
-import { OperationAttemptResult, RetryTags } from "./util/retries.js";
+import { OperationAttemptResult, TaskIdentifiers } from "./util/retries.js";
import { TimerAPI, TimerGroup } from "./util/timer.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
@@ -306,17 +306,15 @@ async function callOperationHandler(
forceNow,
});
case PendingTaskType.Refresh:
- return await processRefreshGroup(ws, pending.refreshGroupId, {
- forceNow,
- });
+ return await processRefreshGroup(ws, pending.refreshGroupId);
case PendingTaskType.Withdraw:
return await processWithdrawalGroup(ws, pending.withdrawalGroupId, {
forceNow,
});
case PendingTaskType.TipPickup:
- return await processTip(ws, pending.tipId, { forceNow });
+ return await processTip(ws, pending.tipId);
case PendingTaskType.Purchase:
- return await processPurchase(ws, pending.proposalId, { forceNow });
+ return await processPurchase(ws, pending.proposalId);
case PendingTaskType.Recoup:
return await processRecoupGroupHandler(ws, pending.recoupGroupId, {
forceNow,
@@ -324,9 +322,7 @@ async function callOperationHandler(
case PendingTaskType.ExchangeCheckRefresh:
return await autoRefresh(ws, pending.exchangeBaseUrl);
case PendingTaskType.Deposit: {
- return await processDepositGroup(ws, pending.depositGroupId, {
- forceNow,
- });
+ return await processDepositGroup(ws, pending.depositGroupId);
}
case PendingTaskType.Backup:
return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
@@ -691,7 +687,7 @@ async function getExchanges(
for (const r of exchangeRecords) {
const exchangeDetails = await getExchangeDetails(tx, r.baseUrl);
const opRetryRecord = await tx.operationRetries.get(
- RetryTags.forExchangeUpdate(r),
+ TaskIdentifiers.forExchangeUpdate(r),
);
exchanges.push(
makeExchangeListItem(r, exchangeDetails, opRetryRecord?.lastError),
@@ -1285,9 +1281,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
RefreshReason.Manual,
);
});
- processRefreshGroup(ws, refreshGroupId.refreshGroupId, {
- forceNow: true,
- }).catch((x) => {
+ processRefreshGroup(ws, refreshGroupId.refreshGroupId).catch((x) => {
logger.error(x);
});
return {
@@ -1753,6 +1747,7 @@ class InternalWalletStateImpl implements InternalWalletState {
for (const key of Object.keys(this.activeLongpoll)) {
logger.trace(`cancelling active longpoll ${key}`);
this.activeLongpoll[key].cancel();
+ delete this.activeLongpoll[key];
}
}