aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2023-02-20 21:26:08 +0100
committerFlorian Dold <florian@dold.me>2023-02-20 21:26:08 +0100
commita49959d2c8bf82575c5d232217a33d91e7b008e8 (patch)
tree03dbbfd397a83dffaaa53580102f3d7108a7b70d
parente8b5f26ab6407fbcecfe464f5aba8d1db9e487cd (diff)
wallet-core: support long-polling for peer push credit
-rw-r--r--packages/taler-wallet-core/src/db.ts12
-rw-r--r--packages/taler-wallet-core/src/operations/common.ts41
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer.ts182
-rw-r--r--packages/taler-wallet-core/src/operations/pending.ts3
-rw-r--r--packages/taler-wallet-core/src/operations/withdraw.ts56
5 files changed, 216 insertions, 78 deletions
diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts
index cbf49c4ca..29e97cd90 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -1774,6 +1774,16 @@ export interface PeerPushPaymentInitiationRecord {
status: PeerPushPaymentInitiationStatus;
}
+export enum PeerPullPaymentInitiationStatus {
+ Initial = 10 /* ACTIVE_START */,
+ /**
+ * Purse created, waiting for the other party to accept the
+ * invoice and deposit money into it.
+ */
+ PurseCreated = 11 /* ACTIVE_START + 1 */,
+ PurseDeposited = 50 /* DORMANT_START */,
+}
+
export interface PeerPullPaymentInitiationRecord {
/**
* What exchange are we using for the payment request?
@@ -1817,7 +1827,7 @@ export interface PeerPullPaymentInitiationRecord {
/**
* Status of the peer pull payment initiation.
*/
- status: OperationStatus;
+ status: PeerPullPaymentInitiationStatus;
withdrawalGroupId: string | undefined;
}
diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts
index e5eda074c..3905eaf3e 100644
--- a/packages/taler-wallet-core/src/operations/common.ts
+++ b/packages/taler-wallet-core/src/operations/common.ts
@@ -21,11 +21,13 @@ import {
AgeRestriction,
AmountJson,
Amounts,
+ CancellationToken,
CoinRefreshRequest,
CoinStatus,
ExchangeEntryStatus,
ExchangeListItem,
ExchangeTosStatus,
+ getErrorDetailFromException,
j2s,
Logger,
OperationErrorInfo,
@@ -453,3 +455,42 @@ export function makeExchangeListItem(
lastUpdateErrorInfo,
};
}
+
+export interface LongpollResult {
+ ready: boolean;
+}
+
+export function runLongpollAsync(
+ ws: InternalWalletState,
+ retryTag: string,
+ reqFn: (ct: CancellationToken) => Promise<LongpollResult>,
+): void {
+ const asyncFn = async () => {
+ if (ws.stopped) {
+ logger.trace("not long-polling reserve, wallet already stopped");
+ await storeOperationPending(ws, retryTag);
+ return;
+ }
+ const cts = CancellationToken.create();
+ let res: { ready: boolean } | undefined = undefined;
+ try {
+ ws.activeLongpoll[retryTag] = {
+ cancel: () => {
+ logger.trace("cancel of reserve longpoll requested");
+ cts.cancel();
+ },
+ };
+ res = await reqFn(cts.token);
+ } catch (e) {
+ await storeOperationError(ws, retryTag, getErrorDetailFromException(e));
+ return;
+ } finally {
+ delete ws.activeLongpoll[retryTag];
+ }
+ if (!res.ready) {
+ await storeOperationPending(ws, retryTag);
+ }
+ ws.latch.trigger();
+ };
+ asyncFn();
+}
diff --git a/packages/taler-wallet-core/src/operations/pay-peer.ts b/packages/taler-wallet-core/src/operations/pay-peer.ts
index 4f65ec7ea..4dcc06076 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer.ts
@@ -69,12 +69,17 @@ import {
TransactionType,
UnblindedSignature,
WalletAccountMergeFlags,
+ codecOptional,
+ codecForTimestamp,
+ CancellationToken,
} from "@gnu-taler/taler-util";
import { SpendCoinDetails } from "../crypto/cryptoImplementation.js";
import {
DenominationRecord,
OperationStatus,
PeerPullPaymentIncomingStatus,
+ PeerPullPaymentInitiationRecord,
+ PeerPullPaymentInitiationStatus,
PeerPushPaymentCoinSelection,
PeerPushPaymentIncomingRecord,
PeerPushPaymentIncomingStatus,
@@ -86,12 +91,19 @@ import {
import { TalerError } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
+ LongpollResult,
makeTransactionId,
resetOperationTimeout,
+ runLongpollAsync,
runOperationWithErrorReporting,
spendCoins,
+ storeOperationPending,
} from "../operations/common.js";
-import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
+import {
+ readSuccessResponseJsonOrErrorCode,
+ readSuccessResponseJsonOrThrow,
+ throwUnexpectedRequestError,
+} from "@gnu-taler/taler-util/http";
import { checkDbInvariant } from "../util/invariants.js";
import {
constructTaskIdentifier,
@@ -622,11 +634,13 @@ export async function initiatePeerPushPayment(
interface ExchangePurseStatus {
balance: AmountString;
+ deposit_timestamp?: TalerProtocolTimestamp;
}
export const codecForExchangePurseStatus = (): Codec<ExchangePurseStatus> =>
buildCodecForObject<ExchangePurseStatus>()
.property("balance", codecForAmountString())
+ .property("deposit_timestamp", codecOptional(codecForTimestamp))
.build("ExchangePurseStatus");
export async function preparePeerPushCredit(
@@ -1255,6 +1269,87 @@ export async function preparePeerPullCredit(
};
}
+export async function queryPurseForPeerPullCredit(
+ ws: InternalWalletState,
+ pullIni: PeerPullPaymentInitiationRecord,
+ cancellationToken: CancellationToken,
+): Promise<LongpollResult> {
+ const purseDepositUrl = new URL(
+ `purses/${pullIni.pursePub}/merge`,
+ pullIni.exchangeBaseUrl,
+ );
+ purseDepositUrl.searchParams.set("timeout_ms", "30000");
+ logger.info(`querying purse status via ${purseDepositUrl.href}`);
+ const resp = await ws.http.get(purseDepositUrl.href, {
+ timeout: { d_ms: 60000 },
+ cancellationToken,
+ });
+
+ logger.info(`purse status code: HTTP ${resp.status}`);
+
+ const result = await readSuccessResponseJsonOrErrorCode(
+ resp,
+ codecForExchangePurseStatus(),
+ );
+
+ if (result.isError) {
+ logger.info(`got purse status error, EC=${result.talerErrorResponse.code}`);
+ if (resp.status === 404) {
+ return { ready: false };
+ } else {
+ throwUnexpectedRequestError(resp, result.talerErrorResponse);
+ }
+ }
+
+ if (!result.response.deposit_timestamp) {
+ logger.info("purse not ready yet (no deposit)");
+ return { ready: false };
+ }
+
+ const reserve = await ws.db
+ .mktx((x) => [x.reserves])
+ .runReadOnly(async (tx) => {
+ return await tx.reserves.get(pullIni.mergeReserveRowId);
+ });
+
+ if (!reserve) {
+ throw Error("reserve for peer pull credit not found in wallet DB");
+ }
+
+ await internalCreateWithdrawalGroup(ws, {
+ amount: Amounts.parseOrThrow(pullIni.amount),
+ wgInfo: {
+ withdrawalType: WithdrawalRecordType.PeerPullCredit,
+ contractTerms: pullIni.contractTerms,
+ contractPriv: pullIni.contractPriv,
+ },
+ forcedWithdrawalGroupId: pullIni.withdrawalGroupId,
+ exchangeBaseUrl: pullIni.exchangeBaseUrl,
+ reserveStatus: WithdrawalGroupStatus.QueryingStatus,
+ reserveKeyPair: {
+ priv: reserve.reservePriv,
+ pub: reserve.reservePub,
+ },
+ });
+
+ await ws.db
+ .mktx((x) => [x.peerPullPaymentInitiations])
+ .runReadWrite(async (tx) => {
+ const finPi = await tx.peerPullPaymentInitiations.get(pullIni.pursePub);
+ if (!finPi) {
+ logger.warn("peerPullPaymentInitiation not found anymore");
+ return;
+ }
+ if (finPi.status === PeerPullPaymentInitiationStatus.PurseCreated) {
+ finPi.status = PeerPullPaymentInitiationStatus.PurseDeposited;
+ }
+ await tx.peerPullPaymentInitiations.put(finPi);
+ });
+ return {
+ ready: true,
+ };
+}
+
export async function processPeerPullCredit(
ws: InternalWalletState,
pursePub: string,
@@ -1268,28 +1363,52 @@ export async function processPeerPullCredit(
throw Error("peer pull payment initiation not found in database");
}
- if (pullIni.status === OperationStatus.Finished) {
- logger.warn(
- "peer pull payment initiation is already finished, retrying withdrawal",
- );
+ const retryTag = constructTaskIdentifier({
+ tag: PendingTaskType.PeerPullInitiation,
+ pursePub,
+ });
- const withdrawalGroupId = pullIni.withdrawalGroupId;
+ switch (pullIni.status) {
+ case PeerPullPaymentInitiationStatus.PurseDeposited: {
+ // We implement this case so that the "retry" action on a peer-pull-credit transaction
+ // also retries the withdrawal task.
- if (withdrawalGroupId) {
- const taskId = constructTaskIdentifier({
- tag: PendingTaskType.Withdraw,
- withdrawalGroupId,
- });
- stopLongpolling(ws, taskId);
- await resetOperationTimeout(ws, taskId);
- await runOperationWithErrorReporting(ws, taskId, () =>
- processWithdrawalGroup(ws, withdrawalGroupId),
+ logger.warn(
+ "peer pull payment initiation is already finished, retrying withdrawal",
);
+
+ const withdrawalGroupId = pullIni.withdrawalGroupId;
+
+ if (withdrawalGroupId) {
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.Withdraw,
+ withdrawalGroupId,
+ });
+ stopLongpolling(ws, taskId);
+ await resetOperationTimeout(ws, taskId);
+ await runOperationWithErrorReporting(ws, taskId, () =>
+ processWithdrawalGroup(ws, withdrawalGroupId),
+ );
+ }
+ return {
+ type: OperationAttemptResultType.Finished,
+ result: undefined,
+ };
}
- return {
- type: OperationAttemptResultType.Finished,
- result: undefined,
- };
+ case PeerPullPaymentInitiationStatus.PurseCreated:
+ runLongpollAsync(ws, retryTag, async (cancellationToken) =>
+ queryPurseForPeerPullCredit(ws, pullIni, cancellationToken),
+ );
+ logger.trace(
+ "returning early from processPeerPullCredit for long-polling in background",
+ );
+ return {
+ type: OperationAttemptResultType.Longpoll,
+ };
+ case PeerPullPaymentInitiationStatus.Initial:
+ break;
+ default:
+ throw Error(`unknown PeerPullPaymentInitiationStatus ${pullIni.status}`);
}
const mergeReserve = await ws.db
@@ -1370,7 +1489,7 @@ export async function processPeerPullCredit(
if (!pi2) {
return;
}
- pi2.status = OperationStatus.Finished;
+ pi2.status = PeerPullPaymentInitiationStatus.PurseCreated;
await tx.peerPullPaymentInitiations.put(pi2);
});
@@ -1518,7 +1637,7 @@ export async function initiatePeerPullPayment(
pursePub: pursePair.pub,
mergePriv: mergePair.priv,
mergePub: mergePair.pub,
- status: OperationStatus.Pending,
+ status: PeerPullPaymentInitiationStatus.Initial,
contractTerms: contractTerms,
mergeTimestamp,
mergeReserveRowId: mergeReserveRowId,
@@ -1545,27 +1664,6 @@ export async function initiatePeerPullPayment(
return processPeerPullCredit(ws, pursePair.pub);
});
- // FIXME: Why do we create this only here?
- // What if the previous operation didn't succeed?
- // We actually should create it once we know the
- // money arrived (via long-polling).
-
- await internalCreateWithdrawalGroup(ws, {
- amount: instructedAmount,
- wgInfo: {
- withdrawalType: WithdrawalRecordType.PeerPullCredit,
- contractTerms,
- contractPriv: contractKeyPair.priv,
- },
- forcedWithdrawalGroupId: withdrawalGroupId,
- exchangeBaseUrl: exchangeBaseUrl,
- reserveStatus: WithdrawalGroupStatus.QueryingStatus,
- reserveKeyPair: {
- priv: mergeReserveInfo.reservePriv,
- pub: mergeReserveInfo.reservePub,
- },
- });
-
return {
talerUri: constructPayPullUri({
exchangeBaseUrl: exchangeBaseUrl,
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts
index 2e3a5c9dc..458448b31 100644
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ b/packages/taler-wallet-core/src/operations/pending.ts
@@ -31,6 +31,7 @@ import {
PeerPushPaymentInitiationStatus,
PeerPullPaymentIncomingStatus,
PeerPushPaymentIncomingStatus,
+ PeerPullPaymentInitiationStatus,
} from "../db.js";
import {
PendingOperationsResponse,
@@ -363,7 +364,7 @@ async function gatherPeerPullInitiationPending(
resp: PendingOperationsResponse,
): Promise<void> {
await tx.peerPullPaymentInitiations.iter().forEachAsync(async (pi) => {
- if (pi.status === OperationStatus.Finished) {
+ if (pi.status === PeerPullPaymentInitiationStatus.PurseDeposited) {
return;
}
const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index 5729b8458..aba2948cd 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -90,6 +90,7 @@ import { InternalWalletState } from "../internal-wallet-state.js";
import {
makeCoinAvailable,
makeExchangeListItem,
+ runLongpollAsync,
runOperationWithErrorReporting,
} from "../operations/common.js";
import { walletCoreDebugFlags } from "../util/debugFlags.js";
@@ -1022,8 +1023,7 @@ export interface WithdrawalGroupContext {
export async function processWithdrawalGroup(
ws: InternalWalletState,
withdrawalGroupId: string,
- options: {
- } = {},
+ options: {} = {},
): Promise<OperationAttemptResult> {
logger.trace("processing withdrawal group", withdrawalGroupId);
const withdrawalGroup = await ws.db
@@ -1053,38 +1053,9 @@ export async function processWithdrawalGroup(
forceNow: true,
});
case WithdrawalGroupStatus.QueryingStatus: {
- const doQueryAsync = async () => {
- if (ws.stopped) {
- logger.trace("not long-polling reserve, wallet already stopped");
- await storeOperationPending(ws, retryTag);
- return;
- }
- const cts = CancellationToken.create();
- let res: { ready: boolean } | undefined = undefined;
- try {
- ws.activeLongpoll[retryTag] = {
- cancel: () => {
- logger.trace("cancel of reserve longpoll requested");
- cts.cancel();
- },
- };
- res = await queryReserve(ws, withdrawalGroupId, cts.token);
- } catch (e) {
- await storeOperationError(
- ws,
- retryTag,
- getErrorDetailFromException(e),
- );
- return;
- } finally {
- delete ws.activeLongpoll[retryTag];
- }
- if (!res.ready) {
- await storeOperationPending(ws, retryTag);
- }
- ws.latch.trigger();
- };
- doQueryAsync();
+ runLongpollAsync(ws, retryTag, (ct) => {
+ return queryReserve(ws, withdrawalGroupId, ct);
+ });
logger.trace(
"returning early from withdrawal for long-polling in background",
);
@@ -1832,6 +1803,14 @@ async function processReserveBankStatus(
}
}
+/**
+ * Create a withdrawal group.
+ *
+ * If a forcedWithdrawalGroupId is given and a
+ * withdrawal group with this ID already exists,
+ * the existing one is returned. No conflict checking
+ * of the other arguments is done in that case.
+ */
export async function internalCreateWithdrawalGroup(
ws: InternalWalletState,
args: {
@@ -1856,6 +1835,15 @@ export async function internalCreateWithdrawalGroup(
if (args.forcedWithdrawalGroupId) {
withdrawalGroupId = args.forcedWithdrawalGroupId;
+ const wgId = withdrawalGroupId;
+ const existingWg = await ws.db
+ .mktx((x) => [x.withdrawalGroups])
+ .runReadOnly(async (tx) => {
+ return tx.withdrawalGroups.get(wgId);
+ });
+ if (existingWg) {
+ return existingWg;
+ }
} else {
withdrawalGroupId = encodeCrock(getRandomBytes(32));
}