aboutsummaryrefslogtreecommitdiff
path: root/packages
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2022-09-23 18:56:21 +0200
committerFlorian Dold <florian@dold.me>2022-09-23 20:38:26 +0200
commit72336b149b4c27715e4e2f7610ec4007ecccdbd9 (patch)
treef209095fc98fd47d4681499662276f4cde5486ba /packages
parent9811e19252ef859099fa5c16d703808f6c778a94 (diff)
wallet-core: do not block when accepting a manual withdrawal
Diffstat (limited to 'packages')
-rw-r--r--packages/taler-util/src/walletTypes.ts1
-rw-r--r--packages/taler-wallet-cli/package.json2
-rw-r--r--packages/taler-wallet-cli/src/integrationtests/test-withdrawal-manual.ts15
-rw-r--r--packages/taler-wallet-core/package.json2
-rw-r--r--packages/taler-wallet-core/src/internal-wallet-state.ts16
-rw-r--r--packages/taler-wallet-core/src/operations/withdraw.ts80
-rw-r--r--packages/taler-wallet-core/src/wallet.ts71
7 files changed, 130 insertions, 57 deletions
diff --git a/packages/taler-util/src/walletTypes.ts b/packages/taler-util/src/walletTypes.ts
index 35cb14837..64daee776 100644
--- a/packages/taler-util/src/walletTypes.ts
+++ b/packages/taler-util/src/walletTypes.ts
@@ -1378,6 +1378,7 @@ export interface DepositGroupFees {
wire: AmountJson;
refresh: AmountJson;
}
+
export interface CreateDepositGroupRequest {
depositPaytoUri: string;
amount: AmountString;
diff --git a/packages/taler-wallet-cli/package.json b/packages/taler-wallet-cli/package.json
index d01ce9465..2316a7d48 100644
--- a/packages/taler-wallet-cli/package.json
+++ b/packages/taler-wallet-cli/package.json
@@ -46,7 +46,7 @@
"dependencies": {
"@gnu-taler/taler-util": "workspace:*",
"@gnu-taler/taler-wallet-core": "workspace:*",
- "axios": "^0.25.0",
+ "axios": "^0.27.2",
"source-map-support": "^0.5.21",
"tslib": "^2.3.1"
}
diff --git a/packages/taler-wallet-cli/src/integrationtests/test-withdrawal-manual.ts b/packages/taler-wallet-cli/src/integrationtests/test-withdrawal-manual.ts
index 6ae0e65e7..b691ae508 100644
--- a/packages/taler-wallet-cli/src/integrationtests/test-withdrawal-manual.ts
+++ b/packages/taler-wallet-cli/src/integrationtests/test-withdrawal-manual.ts
@@ -20,6 +20,11 @@
import { GlobalTestState } from "../harness/harness.js";
import { createSimpleTestkudosEnvironment } from "../harness/helpers.js";
import { WalletApiOperation, BankApi } from "@gnu-taler/taler-wallet-core";
+import {
+ AbsoluteTime,
+ Duration,
+ TalerProtocolTimestamp,
+} from "@gnu-taler/taler-util";
/**
* Run test for basic, bank-integrated withdrawal.
@@ -38,6 +43,9 @@ export async function runTestWithdrawalManualTest(t: GlobalTestState) {
exchangeBaseUrl: exchange.baseUrl,
});
+ const tStart = AbsoluteTime.now();
+
+ // We expect this to return immediately.
const wres = await wallet.client.call(
WalletApiOperation.AcceptManualWithdrawal,
{
@@ -46,9 +54,14 @@ export async function runTestWithdrawalManualTest(t: GlobalTestState) {
},
);
+ // Check that the request did not go into long-polling.
+ const duration = AbsoluteTime.difference(tStart, AbsoluteTime.now());
+ if (duration.d_ms > 5 * 1000) {
+ throw Error("withdrawal took too long (longpolling issue)");
+ }
+
const reservePub: string = wres.reservePub;
- // Bug.
await BankApi.adminAddIncoming(bank, {
exchangeBankAccount,
amount: "TESTKUDOS:10",
diff --git a/packages/taler-wallet-core/package.json b/packages/taler-wallet-core/package.json
index 80e6c6d2c..c2b75d582 100644
--- a/packages/taler-wallet-core/package.json
+++ b/packages/taler-wallet-core/package.json
@@ -64,7 +64,7 @@
"@gnu-taler/idb-bridge": "workspace:*",
"@gnu-taler/taler-util": "workspace:*",
"@types/node": "^17.0.17",
- "axios": "^0.25.0",
+ "axios": "^0.27.2",
"big-integer": "^1.6.51",
"fflate": "^0.7.3",
"source-map-support": "^0.5.21",
diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts
index ad6afe3c3..b8415a469 100644
--- a/packages/taler-wallet-core/src/internal-wallet-state.ts
+++ b/packages/taler-wallet-core/src/internal-wallet-state.ts
@@ -127,6 +127,12 @@ export interface RecoupOperations {
export type NotificationListener = (n: WalletNotification) => void;
+export interface ActiveLongpollInfo {
+ [opId: string]: {
+ cancel: () => void;
+ };
+}
+
/**
* Internal, shard wallet state that is used by the implementation
* of wallet operations.
@@ -135,12 +141,10 @@ export type NotificationListener = (n: WalletNotification) => void;
* as it's an opaque implementation detail.
*/
export interface InternalWalletState {
- memoProcessReserve: AsyncOpMemoMap<void>;
- memoMakePlanchet: AsyncOpMemoMap<void>;
- memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse>;
- memoGetBalance: AsyncOpMemoSingle<BalancesResponse>;
- memoProcessRefresh: AsyncOpMemoMap<void>;
- memoProcessRecoup: AsyncOpMemoMap<void>;
+ /**
+ * Active longpoll operations.
+ */
+ activeLongpoll: ActiveLongpollInfo;
cryptoApi: TalerCryptoInterface;
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index cedb62361..4901fdc86 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -28,6 +28,7 @@ import {
Amounts,
AmountString,
BankWithdrawDetails,
+ CancellationToken,
canonicalizeBaseUrl,
codecForBankWithdrawalOperationPostResponse,
codecForReserveStatus,
@@ -106,7 +107,12 @@ import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
} from "../versions.js";
-import { makeCoinAvailable, storeOperationPending } from "../wallet.js";
+import {
+ makeCoinAvailable,
+ runOperationWithErrorReporting,
+ storeOperationError,
+ storeOperationPending,
+} from "../wallet.js";
import {
getExchangeDetails,
getExchangePaytoUri,
@@ -962,6 +968,7 @@ export async function updateWithdrawalDenoms(
async function queryReserve(
ws: InternalWalletState,
withdrawalGroupId: string,
+ cancellationToken: CancellationToken,
): Promise<{ ready: boolean }> {
const withdrawalGroup = await getWithdrawalGroupRecordTx(ws.db, {
withdrawalGroupId,
@@ -982,6 +989,7 @@ async function queryReserve(
const resp = await ws.http.get(reserveUrl.href, {
timeout: getReserveRequestTimeout(withdrawalGroup),
+ cancellationToken,
});
const result = await readSuccessResponseJsonOrErrorCode(
@@ -1044,6 +1052,16 @@ export async function processWithdrawalGroup(
throw Error(`withdrawal group ${withdrawalGroupId} not found`);
}
+ const retryTag = RetryTags.forWithdrawal(withdrawalGroup);
+
+ // We're already running!
+ if (ws.activeLongpoll[retryTag]) {
+ logger.info("withdrawal group already in long-polling, returning!");
+ return {
+ type: OperationAttemptResultType.Longpoll,
+ };
+ }
+
switch (withdrawalGroup.status) {
case WithdrawalGroupStatus.RegisteringBank:
await processReserveBankStatus(ws, withdrawalGroupId);
@@ -1051,15 +1069,45 @@ export async function processWithdrawalGroup(
forceNow: true,
});
case WithdrawalGroupStatus.QueryingStatus: {
- const res = await queryReserve(ws, withdrawalGroupId);
- if (res.ready) {
- return await processWithdrawalGroup(ws, withdrawalGroupId, {
- forceNow: true,
- });
- }
+ const doQueryAsync = async () => {
+ if (ws.stopped) {
+ logger.info("not long-polling reserve, wallet already stopped");
+ await storeOperationPending(ws, retryTag);
+ return;
+ }
+ const cts = CancellationToken.create();
+ let res: { ready: boolean } | undefined = undefined;
+ try {
+ ws.activeLongpoll[retryTag] = {
+ cancel: () => {
+ logger.info("cancel of reserve longpoll requested");
+ cts.cancel();
+ },
+ };
+ res = await queryReserve(ws, withdrawalGroupId, cts.token);
+ } catch (e) {
+ await storeOperationError(
+ ws,
+ retryTag,
+ getErrorDetailFromException(e),
+ );
+ return;
+ }
+ delete ws.activeLongpoll[retryTag];
+ logger.info(
+ `active longpoll keys (2) ${Object.keys(ws.activeLongpoll)}`,
+ );
+ if (!res.ready) {
+ await storeOperationPending(ws, retryTag);
+ }
+ ws.latch.trigger();
+ };
+ doQueryAsync();
+ logger.info(
+ "returning early from withdrawal for long-polling in background",
+ );
return {
- type: OperationAttemptResultType.Pending,
- result: undefined,
+ type: OperationAttemptResultType.Longpoll,
};
}
case WithdrawalGroupStatus.WaitConfirmBank: {
@@ -1912,10 +1960,16 @@ export async function createManualWithdrawal(
return await getFundingPaytoUris(tx, withdrawalGroup.withdrawalGroupId);
});
- // Start withdrawal in the background.
- await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true }).catch(
- (err) => {
- logger.error("Processing withdrawal (after creation) failed:", err);
+ // Start withdrawal in the background (do not await!)
+ // FIXME: We could also interrupt the task look if it is waiting and
+ // rely on retry handling to re-process the withdrawal group.
+ runOperationWithErrorReporting(
+ ws,
+ RetryTags.forWithdrawal(withdrawalGroup),
+ async () => {
+ return await processWithdrawalGroup(ws, withdrawalGroupId, {
+ forceNow: true,
+ });
},
);
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index 7890259f2..dd47d7ce3 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -64,38 +64,37 @@ import {
codecForSetWalletDeviceIdRequest,
codecForTestPayArgs,
codecForTrackDepositGroupRequest,
+ codecForTransactionByIdRequest,
codecForTransactionsRequest,
codecForWithdrawFakebankRequest,
codecForWithdrawTestBalance,
CoinDumpJson,
CoreApiResponse,
+ DenominationInfo,
Duration,
durationFromSpec,
durationMin,
ExchangeFullDetails,
+ ExchangeListItem,
ExchangesListResponse,
+ FeeDescription,
GetExchangeTosResult,
j2s,
KnownBankAccounts,
Logger,
ManualWithdrawalDetails,
NotificationType,
+ OperationMap,
parsePaytoUri,
- PaytoUri,
RefreshReason,
TalerErrorCode,
- URL,
- WalletNotification,
- WalletCoreVersion,
- ExchangeListItem,
- OperationMap,
- FeeDescription,
TalerErrorDetail,
- codecForTransactionByIdRequest,
- DenominationInfo,
KnownBankAccountsInfo,
codecForAddKnownBankAccounts,
codecForForgetKnownBankAccounts,
+ URL,
+ WalletCoreVersion,
+ WalletNotification,
} from "@gnu-taler/taler-util";
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
import {
@@ -119,6 +118,7 @@ import {
TalerError,
} from "./errors.js";
import {
+ ActiveLongpollInfo,
ExchangeOperations,
InternalWalletState,
MerchantInfo,
@@ -235,7 +235,6 @@ import {
OperationAttemptResult,
OperationAttemptResultType,
RetryInfo,
- runOperationHandlerForResult,
} from "./util/retries.js";
import { TimerAPI, TimerGroup } from "./util/timer.js";
import {
@@ -392,27 +391,21 @@ export async function storeOperationPending(
});
}
-/**
- * Execute one operation based on the pending operation info record.
- *
- * Store success/failure result in the database.
- */
-async function processOnePendingOperation(
+export async function runOperationWithErrorReporting(
ws: InternalWalletState,
- pending: PendingTaskInfo,
- forceNow = false,
+ opId: string,
+ f: () => Promise<OperationAttemptResult>,
): Promise<void> {
- logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`);
let maybeError: TalerErrorDetail | undefined;
try {
- const resp = await callOperationHandler(ws, pending, forceNow);
+ const resp = await f();
switch (resp.type) {
case OperationAttemptResultType.Error:
- return await storeOperationError(ws, pending.id, resp.errorDetail);
+ return await storeOperationError(ws, opId, resp.errorDetail);
case OperationAttemptResultType.Finished:
- return await storeOperationFinished(ws, pending.id);
+ return await storeOperationFinished(ws, opId);
case OperationAttemptResultType.Pending:
- return await storeOperationPending(ws, pending.id);
+ return await storeOperationPending(ws, opId);
case OperationAttemptResultType.Longpoll:
break;
}
@@ -421,7 +414,7 @@ async function processOnePendingOperation(
logger.warn("operation processed resulted in error");
logger.warn(`error was: ${j2s(e.errorDetail)}`);
maybeError = e.errorDetail;
- return await storeOperationError(ws, pending.id, maybeError!);
+ return await storeOperationError(ws, opId, maybeError!);
} else if (e instanceof Error) {
// This is a bug, as we expect pending operations to always
// do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
@@ -435,7 +428,7 @@ async function processOnePendingOperation(
},
`unexpected exception (message: ${e.message})`,
);
- return await storeOperationError(ws, pending.id, maybeError);
+ return await storeOperationError(ws, opId, maybeError);
} else {
logger.error("Uncaught exception, value is not even an error.");
maybeError = makeErrorDetail(
@@ -443,7 +436,7 @@ async function processOnePendingOperation(
{},
`unexpected exception (not even an error)`,
);
- return await storeOperationError(ws, pending.id, maybeError);
+ return await storeOperationError(ws, opId, maybeError);
}
}
}
@@ -460,7 +453,10 @@ export async function runPending(
if (!forceNow && !AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
- await processOnePendingOperation(ws, p, forceNow);
+ await runOperationWithErrorReporting(ws, p.id, async () => {
+ logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
+ return await callOperationHandler(ws, p, forceNow);
+ });
}
}
@@ -563,7 +559,10 @@ async function runTaskLoop(
if (!AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
- await processOnePendingOperation(ws, p);
+ await runOperationWithErrorReporting(ws, p.id, async () => {
+ logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
+ return await callOperationHandler(ws, p);
+ });
ws.notify({
type: NotificationType.PendingOperationProcessed,
});
@@ -1613,13 +1612,10 @@ export class Wallet {
* This ties together all the operation implementations.
*/
class InternalWalletStateImpl implements InternalWalletState {
- memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
- memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
- memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse> =
- new AsyncOpMemoSingle();
- memoGetBalance: AsyncOpMemoSingle<BalancesResponse> = new AsyncOpMemoSingle();
- memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
- memoProcessRecoup: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
+ /**
+ * @see {@link InternalWalletState.activeLongpoll}
+ */
+ activeLongpoll: ActiveLongpollInfo = {};
cryptoApi: TalerCryptoInterface;
cryptoDispatcher: CryptoDispatcher;
@@ -1719,9 +1715,14 @@ class InternalWalletStateImpl implements InternalWalletState {
* Stop ongoing processing.
*/
stop(): void {
+ logger.info("stopping (at internal wallet state)");
this.stopped = true;
this.timerGroup.stopCurrentAndFutureTimers();
this.cryptoDispatcher.stop();
+ for (const key of Object.keys(this.activeLongpoll)) {
+ logger.info(`cancelling active longpoll ${key}`);
+ this.activeLongpoll[key].cancel();
+ }
}
async runUntilDone(