From 0a2540d676904e804544b95959bae223e42bc0c1 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Fri, 8 Mar 2024 10:30:23 +0100 Subject: wallet-core: notification-based waiting for dependent transactions instead of long-polling --- packages/taler-wallet-core/src/exchanges.ts | 12 ++- packages/taler-wallet-core/src/pay-merchant.ts | 2 + .../taler-wallet-core/src/pay-peer-pull-credit.ts | 2 + .../taler-wallet-core/src/pay-peer-push-credit.ts | 2 + .../taler-wallet-core/src/pay-peer-push-debit.ts | 5 +- packages/taler-wallet-core/src/refresh.ts | 77 ++++++++++++++++++- packages/taler-wallet-core/src/withdraw.ts | 86 ++++++++++++++++++++-- 7 files changed, 177 insertions(+), 9 deletions(-) (limited to 'packages/taler-wallet-core') diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts index 1fb3a8795..43ab8ac4e 100644 --- a/packages/taler-wallet-core/src/exchanges.ts +++ b/packages/taler-wallet-core/src/exchanges.ts @@ -999,6 +999,9 @@ async function internalWaitReadyExchange( exchangeBaseUrl: canonUrl, }); while (true) { + if (wex.cancellationToken.isCancelled) { + throw Error("cancelled"); + } logger.info(`waiting for ready exchange ${canonUrl}`); const { exchange, exchangeDetails, retryInfo, scopeInfo } = await wex.db.runReadOnlyTx( @@ -1128,14 +1131,13 @@ export async function fetchFreshExchange( forceUpdate: options.forceUpdate, }); - return waitReadyExchange(wex, canonUrl, options); + return await waitReadyExchange(wex, canonUrl, options); } async function waitReadyExchange( wex: WalletExecutionContext, canonUrl: string, options: { - cancellationToken?: CancellationToken; forceUpdate?: boolean; expectedMasterPub?: string; } = {}, @@ -1155,6 +1157,11 @@ async function waitReadyExchange( } }); + const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => { + cancelNotif(); + exchangeNotifFlag.raise(); + }); + try { const res = await internalWaitReadyExchange( wex, @@ -1165,6 +1172,7 @@ async function waitReadyExchange( logger.info("done waiting for ready exchange"); return res; } finally { + unregisterOnCancelled(); cancelNotif(); } } diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts index 4f9c20c9e..872e554c9 100644 --- a/packages/taler-wallet-core/src/pay-merchant.ts +++ b/packages/taler-wallet-core/src/pay-merchant.ts @@ -1458,6 +1458,7 @@ async function waitProposalDownloaded( wex: WalletExecutionContext, proposalId: string, ): Promise { + // FIXME: This doesn't support cancellation yet const ctx = new PayMerchantTransactionContext(wex, proposalId); logger.info(`waiting for ${ctx.transactionId} to be downloaded`); @@ -1711,6 +1712,7 @@ async function waitPaymentResult( proposalId: string, waitSessionId?: string, ): Promise { + // FIXME: We don't support cancelletion yet! const ctx = new PayMerchantTransactionContext(wex, proposalId); wex.taskScheduler.startShepherdTask(ctx.taskId); diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts index de30f66d2..96d8f65a6 100644 --- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts +++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts @@ -84,6 +84,7 @@ import { WalletExecutionContext } from "./wallet.js"; import { getExchangeWithdrawalInfo, internalCreateWithdrawalGroup, + waitWithdrawalFinal, } from "./withdraw.js"; const logger = new Logger("pay-peer-pull-credit.ts"); @@ -584,6 +585,7 @@ async function handlePeerPullCreditWithdrawing( if (!pullIni.withdrawalGroupId) { throw Error("invalid db state (withdrawing, but no withdrawal group ID"); } + await waitWithdrawalFinal(wex, pullIni.withdrawalGroupId); const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPullCredit, pursePub: pullIni.pursePub, diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts index ecc1e827f..281b3ff61 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-credit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-credit.ts @@ -88,6 +88,7 @@ import { getExchangeWithdrawalInfo, internalPerformCreateWithdrawalGroup, internalPrepareCreateWithdrawalGroup, + waitWithdrawalFinal, } from "./withdraw.js"; const logger = new Logger("pay-peer-push-credit.ts"); @@ -789,6 +790,7 @@ async function handlePendingWithdrawing( if (!peerInc.withdrawalGroupId) { throw Error("invalid db state (withdrawing, but no withdrawal group ID"); } + await waitWithdrawalFinal(wex, peerInc.withdrawalGroupId); const transactionId = constructTransactionIdentifier({ tag: TransactionType.PeerPushCredit, peerPushCreditId: peerInc.peerPushCreditId, diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts index cf4e7b619..ab80888eb 100644 --- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts +++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts @@ -72,7 +72,7 @@ import { getTotalPeerPaymentCost, queryCoinInfosForSelection, } from "./pay-peer-common.js"; -import { createRefreshGroup } from "./refresh.js"; +import { createRefreshGroup, waitRefreshFinal } from "./refresh.js"; import { constructTransactionIdentifier, notifyTransition, @@ -682,6 +682,9 @@ async function processPeerPushDebitAbortingRefreshDeleted( tag: TransactionType.PeerPushDebit, pursePub: peerPushInitiation.pursePub, }); + if (peerPushInitiation.abortRefreshGroupId) { + await waitRefreshFinal(wex, peerPushInitiation.abortRefreshGroupId); + } const transitionInfo = await wex.db.runReadWriteTx( ["refreshGroups", "peerPushDebit"], async (tx) => { diff --git a/packages/taler-wallet-core/src/refresh.ts b/packages/taler-wallet-core/src/refresh.ts index c6a7b768d..7bf231870 100644 --- a/packages/taler-wallet-core/src/refresh.ts +++ b/packages/taler-wallet-core/src/refresh.ts @@ -21,6 +21,7 @@ import { Amounts, amountToPretty, assertUnreachable, + AsyncFlag, checkDbInvariant, codecForExchangeMeltResponse, codecForExchangeRevealResponse, @@ -61,7 +62,6 @@ import { readSuccessResponseJsonOrThrow, readUnexpectedResponseDetails, } from "@gnu-taler/taler-util/http"; -import { selectWithdrawalDenominations } from "./denomSelection.js"; import { constructTaskIdentifier, makeCoinAvailable, @@ -92,6 +92,7 @@ import { WalletDbReadOnlyTransaction, WalletDbReadWriteTransaction, } from "./db.js"; +import { selectWithdrawalDenominations } from "./denomSelection.js"; import { fetchFreshExchange } from "./exchanges.js"; import { constructTransactionIdentifier, @@ -1462,3 +1463,77 @@ export async function forceRefresh( refreshGroupId: res.refreshGroupId, }; } + +/** + * Wait until a refresh operation is final. + */ +export async function waitRefreshFinal( + wex: WalletExecutionContext, + refreshGroupId: string, +): Promise { + const ctx = new RefreshTransactionContext(wex, refreshGroupId); + wex.taskScheduler.startShepherdTask(ctx.taskId); + + // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax. + const refreshNotifFlag = new AsyncFlag(); + // Raise purchaseNotifFlag whenever we get a notification + // about our refresh. + const cancelNotif = wex.ws.addNotificationListener((notif) => { + if ( + notif.type === NotificationType.TransactionStateTransition && + notif.transactionId === ctx.transactionId + ) { + refreshNotifFlag.raise(); + } + }); + const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => { + cancelNotif(); + refreshNotifFlag.raise(); + }); + + try { + await internalWaitRefreshFinal(ctx, refreshNotifFlag); + } catch (e) { + unregisterOnCancelled(); + cancelNotif(); + } +} + +async function internalWaitRefreshFinal( + ctx: RefreshTransactionContext, + flag: AsyncFlag, +): Promise { + while (true) { + if (ctx.wex.cancellationToken.isCancelled) { + throw Error("cancelled"); + } + + // Check if refresh is final + const res = await ctx.wex.db.runReadOnlyTx( + ["refreshGroups", "operationRetries"], + async (tx) => { + return { + rg: await tx.refreshGroups.get(ctx.refreshGroupId), + }; + }, + ); + const { rg } = res; + if (!rg) { + // Must've been deleted, we consider that final. + return; + } + switch (rg.operationStatus) { + case RefreshOperationStatus.Failed: + case RefreshOperationStatus.Finished: + // Transaction is final + return; + case RefreshOperationStatus.Pending: + case RefreshOperationStatus.Suspended: + break; + } + + // Wait for the next transition + await flag.wait(); + flag.reset(); + } +} diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts index 0f70479a5..f27e9e132 100644 --- a/packages/taler-wallet-core/src/withdraw.ts +++ b/packages/taler-wallet-core/src/withdraw.ts @@ -94,10 +94,6 @@ import { readSuccessResponseJsonOrThrow, throwUnexpectedRequestError, } from "@gnu-taler/taler-util/http"; -import { - selectForcedWithdrawalDenominations, - selectWithdrawalDenominations, -} from "./denomSelection.js"; import { PendingTaskType, TaskIdStr, @@ -127,6 +123,10 @@ import { WithdrawalRecordType, timestampPreciseToDb, } from "./db.js"; +import { + selectForcedWithdrawalDenominations, + selectWithdrawalDenominations, +} from "./denomSelection.js"; import { isWithdrawableDenom } from "./denominations.js"; import { ReadyExchangeSummary, @@ -1935,7 +1935,8 @@ export async function getWithdrawalDetailsForUri( uri: talerWithdrawUri, }); } - }).finally(() => { + }) + .finally(() => { ongoingChecks[talerWithdrawUri] = false; }); } @@ -2703,6 +2704,7 @@ async function waitWithdrawalRegistered( wex: WalletExecutionContext, ctx: WithdrawTransactionContext, ): Promise { + // FIXME: Doesn't support cancellation yet // FIXME: We should use Symbol.dispose magic here for cleanup! const withdrawalNotifFlag = new AsyncFlag(); @@ -2914,3 +2916,77 @@ export async function createManualWithdrawal( transactionId: ctx.transactionId, }; } + +/** + * Wait until a refresh operation is final. + */ +export async function waitWithdrawalFinal( + wex: WalletExecutionContext, + withdrawalGroupId: string, +): Promise { + const ctx = new WithdrawTransactionContext(wex, withdrawalGroupId); + wex.taskScheduler.startShepherdTask(ctx.taskId); + + // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax. + const withdrawalNotifFlag = new AsyncFlag(); + // Raise purchaseNotifFlag whenever we get a notification + // about our refresh. + const cancelNotif = wex.ws.addNotificationListener((notif) => { + if ( + notif.type === NotificationType.TransactionStateTransition && + notif.transactionId === ctx.transactionId + ) { + withdrawalNotifFlag.raise(); + } + }); + const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => { + cancelNotif(); + withdrawalNotifFlag.raise(); + }); + + try { + await internalWaitWithdrawalFinal(ctx, withdrawalNotifFlag); + } catch (e) { + unregisterOnCancelled(); + cancelNotif(); + } +} + +async function internalWaitWithdrawalFinal( + ctx: WithdrawTransactionContext, + flag: AsyncFlag, +): Promise { + while (true) { + if (ctx.wex.cancellationToken.isCancelled) { + throw Error("cancelled"); + } + + // Check if refresh is final + const res = await ctx.wex.db.runReadOnlyTx( + ["withdrawalGroups", "operationRetries"], + async (tx) => { + return { + wg: await tx.withdrawalGroups.get(ctx.withdrawalGroupId), + }; + }, + ); + const { wg } = res; + if (!wg) { + // Must've been deleted, we consider that final. + return; + } + switch (wg.status) { + case WithdrawalGroupStatus.AbortedBank: + case WithdrawalGroupStatus.AbortedExchange: + case WithdrawalGroupStatus.Done: + case WithdrawalGroupStatus.FailedAbortingBank: + case WithdrawalGroupStatus.FailedBankAborted: + // Transaction is final + return; + } + + // Wait for the next transition + await flag.wait(); + flag.reset(); + } +} -- cgit v1.2.3