From f93ab03a1b946af441e35b9c057f129d25311273 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Sat, 1 Jul 2023 00:52:14 +0200 Subject: wallet-core: get rid of internal runUntilDone usages --- .../taler-wallet-core/src/internal-wallet-state.ts | 6 +-- .../src/operations/pay-peer-pull-debit.ts | 2 - .../taler-wallet-core/src/operations/recoup.ts | 23 ++++---- .../taler-wallet-core/src/operations/refresh.ts | 11 +--- .../taler-wallet-core/src/operations/testing.ts | 61 ++++++++++++++++------ packages/taler-wallet-core/src/wallet.ts | 42 +++++++-------- 6 files changed, 79 insertions(+), 66 deletions(-) (limited to 'packages/taler-wallet-core/src') diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts index a2ca34a86..742af89a8 100644 --- a/packages/taler-wallet-core/src/internal-wallet-state.ts +++ b/packages/taler-wallet-core/src/internal-wallet-state.ts @@ -145,6 +145,8 @@ export interface ActiveLongpollInfo { }; } +export type CancelFn = () => void; + /** * Internal, shared wallet state that is used by the implementation * of wallet operations. @@ -206,7 +208,7 @@ export interface InternalWalletState { notify(n: WalletNotification): void; - addNotificationListener(f: (n: WalletNotification) => void): void; + addNotificationListener(f: (n: WalletNotification) => void): CancelFn; /** * Stop ongoing processing. @@ -219,8 +221,6 @@ export interface InternalWalletState { */ runSequentialized(tokens: string[], f: () => Promise): Promise; - runUntilDone(req?: { maxRetries?: number }): Promise; - /** * Ensure that a task loop is currently running. * Starts one if no task loop is running. diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts index 1aa332439..eca3bc91b 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts @@ -64,9 +64,7 @@ import { checkLogicInvariant } from "../util/invariants.js"; import { TaskRunResult, TaskRunResultType, - TaskIdentifiers, constructTaskIdentifier, - runTaskWithErrorReporting, spendCoins, } from "./common.js"; import { diff --git a/packages/taler-wallet-core/src/operations/recoup.ts b/packages/taler-wallet-core/src/operations/recoup.ts index c8c766d1b..dea2d4b16 100644 --- a/packages/taler-wallet-core/src/operations/recoup.ts +++ b/packages/taler-wallet-core/src/operations/recoup.ts @@ -26,36 +26,34 @@ */ import { Amounts, - codecForRecoupConfirmation, - codecForReserveStatus, CoinStatus, - encodeCrock, - getRandomBytes, - j2s, Logger, - NotificationType, RefreshReason, - TalerProtocolTimestamp, TalerPreciseTimestamp, URL, + codecForRecoupConfirmation, + codecForReserveStatus, + encodeCrock, + getRandomBytes, + j2s, } from "@gnu-taler/taler-util"; +import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import { CoinRecord, CoinSourceType, RecoupGroupRecord, RefreshCoinSource, WalletStoresV1, + WithdrawCoinSource, WithdrawalGroupStatus, WithdrawalRecordType, - WithdrawCoinSource, } from "../db.js"; import { InternalWalletState } from "../internal-wallet-state.js"; -import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; import { checkDbInvariant } from "../util/invariants.js"; import { GetReadWriteAccess } from "../util/query.js"; -import { createRefreshGroup, processRefreshGroup } from "./refresh.js"; -import { internalCreateWithdrawalGroup } from "./withdraw.js"; import { TaskRunResult } from "./common.js"; +import { createRefreshGroup } from "./refresh.js"; +import { internalCreateWithdrawalGroup } from "./withdraw.js"; const logger = new Logger("operations/recoup.ts"); @@ -402,9 +400,6 @@ export async function processRecoupGroup( rg2.scheduleRefreshCoins, RefreshReason.Recoup, ); - processRefreshGroup(ws, refreshGroupId.refreshGroupId).catch((e) => { - logger.error(`error while refreshing after recoup ${e}`); - }); } await tx.recoupGroups.put(rg2); }); diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index 65346a923..fd6281eda 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -810,9 +810,9 @@ export async function processRefreshGroup( }), ); try { - logger.trace("waiting for refreshes"); + logger.info("waiting for refreshes"); await Promise.all(ps); - logger.trace("refresh finished"); + logger.info("refresh group finished"); } catch (e) { logger.warn("process refresh sessions got exception"); logger.warn(`exception: ${e}`); @@ -1066,13 +1066,6 @@ export async function createRefreshGroup( logger.info(`created refresh group ${refreshGroupId}`); - processRefreshGroup(ws, refreshGroupId).catch((e) => { - if (e instanceof CryptoApiStoppedError) { - return; - } - logger.warn(`processing refresh group ${refreshGroupId} failed: ${e}`); - }); - return { refreshGroupId, }; diff --git a/packages/taler-wallet-core/src/operations/testing.ts b/packages/taler-wallet-core/src/operations/testing.ts index 2a3584a0a..8c84702b8 100644 --- a/packages/taler-wallet-core/src/operations/testing.ts +++ b/packages/taler-wallet-core/src/operations/testing.ts @@ -68,7 +68,7 @@ import { } from "./pay-peer-push-credit.js"; import { initiatePeerPushDebit } from "./pay-peer-push-debit.js"; import { OpenedPromise, openPromise } from "../index.js"; -import { getTransactionById } from "./transactions.js"; +import { getTransactionById, getTransactions } from "./transactions.js"; const logger = new Logger("operations/testing.ts"); @@ -378,7 +378,7 @@ export async function runIntegrationTest( bankAccessApiBaseUrl: args.bankAccessApiBaseUrl, exchangeBaseUrl: args.exchangeBaseUrl, }); - await ws.runUntilDone(); + await waitUntilDone(ws); logger.info("done withdrawing test balance"); const balance = await getBalances(ws); @@ -393,7 +393,7 @@ export async function runIntegrationTest( await makePayment(ws, myMerchant, args.amountToSpend, "hello world"); // Wait until the refresh is done - await ws.runUntilDone(); + await waitUntilDone(ws); logger.trace("withdrawing test balance for refund"); const withdrawAmountTwo = Amounts.parseOrThrow(`${currency}:18`); @@ -408,7 +408,7 @@ export async function runIntegrationTest( }); // Wait until the withdraw is done - await ws.runUntilDone(); + await waitUntilDone(ws); const { orderId: refundOrderId } = await makePayment( ws, @@ -432,7 +432,7 @@ export async function runIntegrationTest( logger.trace("integration test: applied refund"); // Wait until the refund is done - await ws.runUntilDone(); + await waitUntilDone(ws); logger.trace("integration test: making payment after refund"); @@ -445,21 +445,52 @@ export async function runIntegrationTest( logger.trace("integration test: make payment done"); - await ws.runUntilDone(); + await waitUntilDone(ws); logger.trace("integration test: all done!"); } async function waitUntilDone(ws: InternalWalletState): Promise { + logger.info("waiting until all transactions are in a final state"); + ws.ensureTaskLoopRunning(); let p: OpenedPromise | undefined = undefined; - ws.addNotificationListener((notif) => { + const cancelNotifs = ws.addNotificationListener((notif) => { if (!p) { return; } if (notif.type === NotificationType.TransactionStateTransition) { p.resolve(); } + // Work-around, refresh transactions don't properly emit transition notifications yet. + if (notif.type === NotificationType.PendingOperationProcessed) { + p.resolve(); + } }); + while (1) { + p = openPromise(); + const txs = await getTransactions(ws, { + includeRefreshes: true, + }); + let finished = true; + for (const tx of txs.transactions) { + switch (tx.txState.major) { + case TransactionMajorState.Pending: + case TransactionMajorState.Aborting: + finished = false; + logger.info( + `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`, + ); + break; + } + } + if (finished) { + break; + } + // Wait until transaction state changed + await p.promise; + } + cancelNotifs(); + logger.info("done waiting until all transactions are in a final state"); } async function waitUntilPendingReady( @@ -469,7 +500,7 @@ async function waitUntilPendingReady( logger.info(`starting waiting for ${transactionId} to be in pending(ready)`); ws.ensureTaskLoopRunning(); let p: OpenedPromise | undefined = undefined; - ws.addNotificationListener((notif) => { + const cancelNotifs = ws.addNotificationListener((notif) => { if (!p) { return; } @@ -492,7 +523,7 @@ async function waitUntilPendingReady( await p.promise; } logger.info(`done waiting for ${transactionId} to be in pending(ready)`); - // FIXME: Remove listener! + cancelNotifs(); } export async function runIntegrationTest2( @@ -516,7 +547,7 @@ export async function runIntegrationTest2( bankAccessApiBaseUrl: args.bankAccessApiBaseUrl, exchangeBaseUrl: args.exchangeBaseUrl, }); - await ws.runUntilDone(); + await waitUntilDone(ws); logger.info("done withdrawing test balance"); const balance = await getBalances(ws); @@ -536,7 +567,7 @@ export async function runIntegrationTest2( ); // Wait until the refresh is done - await ws.runUntilDone(); + await waitUntilDone(ws); logger.trace("withdrawing test balance for refund"); const withdrawAmountTwo = Amounts.parseOrThrow(`${currency}:18`); @@ -551,7 +582,7 @@ export async function runIntegrationTest2( }); // Wait until the withdraw is done - await ws.runUntilDone(); + await waitUntilDone(ws); const { orderId: refundOrderId } = await makePayment( ws, @@ -575,7 +606,7 @@ export async function runIntegrationTest2( logger.trace("integration test: applied refund"); // Wait until the refund is done - await ws.runUntilDone(); + await waitUntilDone(ws); logger.trace("integration test: making payment after refund"); @@ -588,7 +619,7 @@ export async function runIntegrationTest2( logger.trace("integration test: make payment done"); - await ws.runUntilDone(); + await waitUntilDone(ws); const peerPushInit = await initiatePeerPushDebit(ws, { partialContractTerms: { @@ -636,7 +667,7 @@ export async function runIntegrationTest2( peerPullPaymentIncomingId: peerPullInc.peerPullPaymentIncomingId, }); - await ws.runUntilDone(); + await waitUntilDone(ws); logger.trace("integration test: all done!"); } diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 1b355a32c..8f11a3d28 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -64,7 +64,6 @@ import { codecForAddKnownBankAccounts, codecForAny, codecForApplyDevExperiment, - codecForFailTransactionRequest, codecForCheckPeerPullPaymentRequest, codecForCheckPeerPushDebitRequest, codecForConfirmPayRequest, @@ -72,13 +71,13 @@ import { codecForConvertAmountRequest, codecForCreateDepositGroupRequest, codecForDeleteTransactionRequest, + codecForFailTransactionRequest, codecForForceRefreshRequest, codecForForgetKnownBankAccounts, codecForGetAmountRequest, codecForGetBalanceDetailRequest, codecForGetContractTermsDetails, codecForGetExchangeTosRequest, - codecForGetPlanForOperationRequest, codecForGetWithdrawalDetailsForAmountRequest, codecForGetWithdrawalDetailsForUri, codecForImportDbRequest, @@ -141,6 +140,7 @@ import { import { DevExperimentHttpLib, applyDevExperiment } from "./dev-experiments.js"; import { ActiveLongpollInfo, + CancelFn, ExchangeOperations, InternalWalletState, MerchantInfo, @@ -171,6 +171,7 @@ import { import { setWalletDeviceId } from "./operations/backup/state.js"; import { getBalanceDetail, getBalances } from "./operations/balance.js"; import { + TaskIdentifiers, TaskRunResult, getExchangeTosStatus, makeExchangeListItem, @@ -196,7 +197,6 @@ import { } from "./operations/exchanges.js"; import { getMerchantInfo } from "./operations/merchants.js"; import { - computePayMerchantTransactionActions, computePayMerchantTransactionState, computeRefundTransactionState, confirmPay, @@ -271,6 +271,13 @@ import { } from "./operations/withdraw.js"; import { PendingTaskInfo, PendingTaskType } from "./pending-types.js"; import { assertUnreachable } from "./util/assertUnreachable.js"; +import { + convertDepositAmount, + convertPeerPushAmount, + convertWithdrawalAmount, + getMaxDepositAmount, + getMaxPeerPushAmount, +} from "./util/coinSelection.js"; import { createTimeline, selectBestForOverlappingDenominations, @@ -287,7 +294,6 @@ import { GetReadOnlyAccess, GetReadWriteAccess, } from "./util/query.js"; -import { TaskIdentifiers } from "./operations/common.js"; import { TimerAPI, TimerGroup } from "./util/timer.js"; import { WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, @@ -301,13 +307,6 @@ import { WalletCoreApiClient, WalletCoreResponseType, } from "./wallet-api-types.js"; -import { - convertDepositAmount, - convertPeerPushAmount, - convertWithdrawalAmount, - getMaxDepositAmount, - getMaxPeerPushAmount, -} from "./util/coinSelection.js"; const logger = new Logger("wallet.ts"); @@ -478,9 +477,8 @@ async function runTaskLoop( if (!AbsoluteTime.isExpired(p.timestampDue)) { continue; } + logger.info(`running task ${p.id}`); await runTaskWithErrorReporting(ws, p.id, async () => { - logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`); - ws.isTaskLoopRunning = false; return await callOperationHandler(ws, p); }); ws.notify({ @@ -1679,7 +1677,7 @@ export class Wallet { return deepMerge(Wallet.defaultConfig, param ?? {}); } - addNotificationListener(f: (n: WalletNotification) => void): void { + addNotificationListener(f: (n: WalletNotification) => void): CancelFn { return this.ws.addNotificationListener(f); } @@ -1906,8 +1904,14 @@ class InternalWalletStateImpl implements InternalWalletState { } } - addNotificationListener(f: (n: WalletNotification) => void): void { + addNotificationListener(f: (n: WalletNotification) => void): CancelFn { this.listeners.push(f); + return () => { + const idx = this.listeners.indexOf(f); + if (idx >= 0) { + this.listeners.splice(idx, 1); + } + }; } /** @@ -1925,14 +1929,6 @@ class InternalWalletStateImpl implements InternalWalletState { } } - async runUntilDone( - req: { - maxRetries?: number; - } = {}, - ): Promise { - await runTaskLoop(this, { ...req, stopWhenDone: true }); - } - /** * Run an async function after acquiring a list of locks, identified * by string tokens. -- cgit v1.2.3