aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2023-07-01 00:52:14 +0200
committerFlorian Dold <florian@dold.me>2023-07-01 00:52:14 +0200
commitf93ab03a1b946af441e35b9c057f129d25311273 (patch)
tree063624453ae0b6a38f859bd530ab0a1e29db93fb
parent7a18e12a175856b3d17d2bb70ec549004c281ff5 (diff)
wallet-core: get rid of internal runUntilDone usages
-rw-r--r--packages/taler-wallet-core/src/internal-wallet-state.ts6
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts2
-rw-r--r--packages/taler-wallet-core/src/operations/recoup.ts23
-rw-r--r--packages/taler-wallet-core/src/operations/refresh.ts11
-rw-r--r--packages/taler-wallet-core/src/operations/testing.ts61
-rw-r--r--packages/taler-wallet-core/src/wallet.ts42
-rw-r--r--packages/taler-wallet-embedded/src/wallet-qjs.ts15
7 files changed, 92 insertions, 68 deletions
diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts
index a2ca34a86..742af89a8 100644
--- a/packages/taler-wallet-core/src/internal-wallet-state.ts
+++ b/packages/taler-wallet-core/src/internal-wallet-state.ts
@@ -145,6 +145,8 @@ export interface ActiveLongpollInfo {
};
}
+export type CancelFn = () => void;
+
/**
* Internal, shared wallet state that is used by the implementation
* of wallet operations.
@@ -206,7 +208,7 @@ export interface InternalWalletState {
notify(n: WalletNotification): void;
- addNotificationListener(f: (n: WalletNotification) => void): void;
+ addNotificationListener(f: (n: WalletNotification) => void): CancelFn;
/**
* Stop ongoing processing.
@@ -219,8 +221,6 @@ export interface InternalWalletState {
*/
runSequentialized<T>(tokens: string[], f: () => Promise<T>): Promise<T>;
- runUntilDone(req?: { maxRetries?: number }): Promise<void>;
-
/**
* Ensure that a task loop is currently running.
* Starts one if no task loop is running.
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
index 1aa332439..eca3bc91b 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-debit.ts
@@ -64,9 +64,7 @@ import { checkLogicInvariant } from "../util/invariants.js";
import {
TaskRunResult,
TaskRunResultType,
- TaskIdentifiers,
constructTaskIdentifier,
- runTaskWithErrorReporting,
spendCoins,
} from "./common.js";
import {
diff --git a/packages/taler-wallet-core/src/operations/recoup.ts b/packages/taler-wallet-core/src/operations/recoup.ts
index c8c766d1b..dea2d4b16 100644
--- a/packages/taler-wallet-core/src/operations/recoup.ts
+++ b/packages/taler-wallet-core/src/operations/recoup.ts
@@ -26,36 +26,34 @@
*/
import {
Amounts,
- codecForRecoupConfirmation,
- codecForReserveStatus,
CoinStatus,
- encodeCrock,
- getRandomBytes,
- j2s,
Logger,
- NotificationType,
RefreshReason,
- TalerProtocolTimestamp,
TalerPreciseTimestamp,
URL,
+ codecForRecoupConfirmation,
+ codecForReserveStatus,
+ encodeCrock,
+ getRandomBytes,
+ j2s,
} from "@gnu-taler/taler-util";
+import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import {
CoinRecord,
CoinSourceType,
RecoupGroupRecord,
RefreshCoinSource,
WalletStoresV1,
+ WithdrawCoinSource,
WithdrawalGroupStatus,
WithdrawalRecordType,
- WithdrawCoinSource,
} from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
-import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import { checkDbInvariant } from "../util/invariants.js";
import { GetReadWriteAccess } from "../util/query.js";
-import { createRefreshGroup, processRefreshGroup } from "./refresh.js";
-import { internalCreateWithdrawalGroup } from "./withdraw.js";
import { TaskRunResult } from "./common.js";
+import { createRefreshGroup } from "./refresh.js";
+import { internalCreateWithdrawalGroup } from "./withdraw.js";
const logger = new Logger("operations/recoup.ts");
@@ -402,9 +400,6 @@ export async function processRecoupGroup(
rg2.scheduleRefreshCoins,
RefreshReason.Recoup,
);
- processRefreshGroup(ws, refreshGroupId.refreshGroupId).catch((e) => {
- logger.error(`error while refreshing after recoup ${e}`);
- });
}
await tx.recoupGroups.put(rg2);
});
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts
index 65346a923..fd6281eda 100644
--- a/packages/taler-wallet-core/src/operations/refresh.ts
+++ b/packages/taler-wallet-core/src/operations/refresh.ts
@@ -810,9 +810,9 @@ export async function processRefreshGroup(
}),
);
try {
- logger.trace("waiting for refreshes");
+ logger.info("waiting for refreshes");
await Promise.all(ps);
- logger.trace("refresh finished");
+ logger.info("refresh group finished");
} catch (e) {
logger.warn("process refresh sessions got exception");
logger.warn(`exception: ${e}`);
@@ -1066,13 +1066,6 @@ export async function createRefreshGroup(
logger.info(`created refresh group ${refreshGroupId}`);
- processRefreshGroup(ws, refreshGroupId).catch((e) => {
- if (e instanceof CryptoApiStoppedError) {
- return;
- }
- logger.warn(`processing refresh group ${refreshGroupId} failed: ${e}`);
- });
-
return {
refreshGroupId,
};
diff --git a/packages/taler-wallet-core/src/operations/testing.ts b/packages/taler-wallet-core/src/operations/testing.ts
index 2a3584a0a..8c84702b8 100644
--- a/packages/taler-wallet-core/src/operations/testing.ts
+++ b/packages/taler-wallet-core/src/operations/testing.ts
@@ -68,7 +68,7 @@ import {
} from "./pay-peer-push-credit.js";
import { initiatePeerPushDebit } from "./pay-peer-push-debit.js";
import { OpenedPromise, openPromise } from "../index.js";
-import { getTransactionById } from "./transactions.js";
+import { getTransactionById, getTransactions } from "./transactions.js";
const logger = new Logger("operations/testing.ts");
@@ -378,7 +378,7 @@ export async function runIntegrationTest(
bankAccessApiBaseUrl: args.bankAccessApiBaseUrl,
exchangeBaseUrl: args.exchangeBaseUrl,
});
- await ws.runUntilDone();
+ await waitUntilDone(ws);
logger.info("done withdrawing test balance");
const balance = await getBalances(ws);
@@ -393,7 +393,7 @@ export async function runIntegrationTest(
await makePayment(ws, myMerchant, args.amountToSpend, "hello world");
// Wait until the refresh is done
- await ws.runUntilDone();
+ await waitUntilDone(ws);
logger.trace("withdrawing test balance for refund");
const withdrawAmountTwo = Amounts.parseOrThrow(`${currency}:18`);
@@ -408,7 +408,7 @@ export async function runIntegrationTest(
});
// Wait until the withdraw is done
- await ws.runUntilDone();
+ await waitUntilDone(ws);
const { orderId: refundOrderId } = await makePayment(
ws,
@@ -432,7 +432,7 @@ export async function runIntegrationTest(
logger.trace("integration test: applied refund");
// Wait until the refund is done
- await ws.runUntilDone();
+ await waitUntilDone(ws);
logger.trace("integration test: making payment after refund");
@@ -445,21 +445,52 @@ export async function runIntegrationTest(
logger.trace("integration test: make payment done");
- await ws.runUntilDone();
+ await waitUntilDone(ws);
logger.trace("integration test: all done!");
}
async function waitUntilDone(ws: InternalWalletState): Promise<void> {
+ logger.info("waiting until all transactions are in a final state");
+ ws.ensureTaskLoopRunning();
let p: OpenedPromise<void> | undefined = undefined;
- ws.addNotificationListener((notif) => {
+ const cancelNotifs = ws.addNotificationListener((notif) => {
if (!p) {
return;
}
if (notif.type === NotificationType.TransactionStateTransition) {
p.resolve();
}
+ // Work-around, refresh transactions don't properly emit transition notifications yet.
+ if (notif.type === NotificationType.PendingOperationProcessed) {
+ p.resolve();
+ }
});
+ while (1) {
+ p = openPromise();
+ const txs = await getTransactions(ws, {
+ includeRefreshes: true,
+ });
+ let finished = true;
+ for (const tx of txs.transactions) {
+ switch (tx.txState.major) {
+ case TransactionMajorState.Pending:
+ case TransactionMajorState.Aborting:
+ finished = false;
+ logger.info(
+ `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
+ );
+ break;
+ }
+ }
+ if (finished) {
+ break;
+ }
+ // Wait until transaction state changed
+ await p.promise;
+ }
+ cancelNotifs();
+ logger.info("done waiting until all transactions are in a final state");
}
async function waitUntilPendingReady(
@@ -469,7 +500,7 @@ async function waitUntilPendingReady(
logger.info(`starting waiting for ${transactionId} to be in pending(ready)`);
ws.ensureTaskLoopRunning();
let p: OpenedPromise<void> | undefined = undefined;
- ws.addNotificationListener((notif) => {
+ const cancelNotifs = ws.addNotificationListener((notif) => {
if (!p) {
return;
}
@@ -492,7 +523,7 @@ async function waitUntilPendingReady(
await p.promise;
}
logger.info(`done waiting for ${transactionId} to be in pending(ready)`);
- // FIXME: Remove listener!
+ cancelNotifs();
}
export async function runIntegrationTest2(
@@ -516,7 +547,7 @@ export async function runIntegrationTest2(
bankAccessApiBaseUrl: args.bankAccessApiBaseUrl,
exchangeBaseUrl: args.exchangeBaseUrl,
});
- await ws.runUntilDone();
+ await waitUntilDone(ws);
logger.info("done withdrawing test balance");
const balance = await getBalances(ws);
@@ -536,7 +567,7 @@ export async function runIntegrationTest2(
);
// Wait until the refresh is done
- await ws.runUntilDone();
+ await waitUntilDone(ws);
logger.trace("withdrawing test balance for refund");
const withdrawAmountTwo = Amounts.parseOrThrow(`${currency}:18`);
@@ -551,7 +582,7 @@ export async function runIntegrationTest2(
});
// Wait until the withdraw is done
- await ws.runUntilDone();
+ await waitUntilDone(ws);
const { orderId: refundOrderId } = await makePayment(
ws,
@@ -575,7 +606,7 @@ export async function runIntegrationTest2(
logger.trace("integration test: applied refund");
// Wait until the refund is done
- await ws.runUntilDone();
+ await waitUntilDone(ws);
logger.trace("integration test: making payment after refund");
@@ -588,7 +619,7 @@ export async function runIntegrationTest2(
logger.trace("integration test: make payment done");
- await ws.runUntilDone();
+ await waitUntilDone(ws);
const peerPushInit = await initiatePeerPushDebit(ws, {
partialContractTerms: {
@@ -636,7 +667,7 @@ export async function runIntegrationTest2(
peerPullPaymentIncomingId: peerPullInc.peerPullPaymentIncomingId,
});
- await ws.runUntilDone();
+ await waitUntilDone(ws);
logger.trace("integration test: all done!");
}
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index 1b355a32c..8f11a3d28 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -64,7 +64,6 @@ import {
codecForAddKnownBankAccounts,
codecForAny,
codecForApplyDevExperiment,
- codecForFailTransactionRequest,
codecForCheckPeerPullPaymentRequest,
codecForCheckPeerPushDebitRequest,
codecForConfirmPayRequest,
@@ -72,13 +71,13 @@ import {
codecForConvertAmountRequest,
codecForCreateDepositGroupRequest,
codecForDeleteTransactionRequest,
+ codecForFailTransactionRequest,
codecForForceRefreshRequest,
codecForForgetKnownBankAccounts,
codecForGetAmountRequest,
codecForGetBalanceDetailRequest,
codecForGetContractTermsDetails,
codecForGetExchangeTosRequest,
- codecForGetPlanForOperationRequest,
codecForGetWithdrawalDetailsForAmountRequest,
codecForGetWithdrawalDetailsForUri,
codecForImportDbRequest,
@@ -141,6 +140,7 @@ import {
import { DevExperimentHttpLib, applyDevExperiment } from "./dev-experiments.js";
import {
ActiveLongpollInfo,
+ CancelFn,
ExchangeOperations,
InternalWalletState,
MerchantInfo,
@@ -171,6 +171,7 @@ import {
import { setWalletDeviceId } from "./operations/backup/state.js";
import { getBalanceDetail, getBalances } from "./operations/balance.js";
import {
+ TaskIdentifiers,
TaskRunResult,
getExchangeTosStatus,
makeExchangeListItem,
@@ -196,7 +197,6 @@ import {
} from "./operations/exchanges.js";
import { getMerchantInfo } from "./operations/merchants.js";
import {
- computePayMerchantTransactionActions,
computePayMerchantTransactionState,
computeRefundTransactionState,
confirmPay,
@@ -272,6 +272,13 @@ import {
import { PendingTaskInfo, PendingTaskType } from "./pending-types.js";
import { assertUnreachable } from "./util/assertUnreachable.js";
import {
+ convertDepositAmount,
+ convertPeerPushAmount,
+ convertWithdrawalAmount,
+ getMaxDepositAmount,
+ getMaxPeerPushAmount,
+} from "./util/coinSelection.js";
+import {
createTimeline,
selectBestForOverlappingDenominations,
selectMinimumFee,
@@ -287,7 +294,6 @@ import {
GetReadOnlyAccess,
GetReadWriteAccess,
} from "./util/query.js";
-import { TaskIdentifiers } from "./operations/common.js";
import { TimerAPI, TimerGroup } from "./util/timer.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
@@ -301,13 +307,6 @@ import {
WalletCoreApiClient,
WalletCoreResponseType,
} from "./wallet-api-types.js";
-import {
- convertDepositAmount,
- convertPeerPushAmount,
- convertWithdrawalAmount,
- getMaxDepositAmount,
- getMaxPeerPushAmount,
-} from "./util/coinSelection.js";
const logger = new Logger("wallet.ts");
@@ -478,9 +477,8 @@ async function runTaskLoop(
if (!AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
+ logger.info(`running task ${p.id}`);
await runTaskWithErrorReporting(ws, p.id, async () => {
- logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
- ws.isTaskLoopRunning = false;
return await callOperationHandler(ws, p);
});
ws.notify({
@@ -1679,7 +1677,7 @@ export class Wallet {
return deepMerge(Wallet.defaultConfig, param ?? {});
}
- addNotificationListener(f: (n: WalletNotification) => void): void {
+ addNotificationListener(f: (n: WalletNotification) => void): CancelFn {
return this.ws.addNotificationListener(f);
}
@@ -1906,8 +1904,14 @@ class InternalWalletStateImpl implements InternalWalletState {
}
}
- addNotificationListener(f: (n: WalletNotification) => void): void {
+ addNotificationListener(f: (n: WalletNotification) => void): CancelFn {
this.listeners.push(f);
+ return () => {
+ const idx = this.listeners.indexOf(f);
+ if (idx >= 0) {
+ this.listeners.splice(idx, 1);
+ }
+ };
}
/**
@@ -1925,14 +1929,6 @@ class InternalWalletStateImpl implements InternalWalletState {
}
}
- async runUntilDone(
- req: {
- maxRetries?: number;
- } = {},
- ): Promise<void> {
- await runTaskLoop(this, { ...req, stopWhenDone: true });
- }
-
/**
* Run an async function after acquiring a list of locks, identified
* by string tokens.
diff --git a/packages/taler-wallet-embedded/src/wallet-qjs.ts b/packages/taler-wallet-embedded/src/wallet-qjs.ts
index e6456c930..2b1c8a108 100644
--- a/packages/taler-wallet-embedded/src/wallet-qjs.ts
+++ b/packages/taler-wallet-embedded/src/wallet-qjs.ts
@@ -214,7 +214,13 @@ export function installNativeWalletListener(): void {
globalThis.installNativeWalletListener = installNativeWalletListener;
export async function testWithGv() {
- const w = await createNativeWalletHost2();
+ const w = await createNativeWalletHost2({
+ config: {
+ features: {
+ allowHttp: true,
+ },
+ },
+ });
await w.wallet.client.call(WalletApiOperation.InitWallet, {});
await w.wallet.client.call(WalletApiOperation.RunIntegrationTest, {
amountToSpend: "KUDOS:1",
@@ -233,13 +239,18 @@ export async function testWithLocal() {
console.log("running local test");
const w = await createNativeWalletHost2({
persistentStoragePath: "walletdb.json",
+ config: {
+ features: {
+ allowHttp: true,
+ },
+ },
});
console.log("created wallet");
await w.wallet.client.call(WalletApiOperation.InitWallet, {
skipDefaults: true,
});
console.log("initialized wallet");
- await w.wallet.client.call(WalletApiOperation.RunIntegrationTest, {
+ await w.wallet.client.call(WalletApiOperation.RunIntegrationTestV2, {
amountToSpend: "TESTKUDOS:1",
amountToWithdraw: "TESTKUDOS:3",
bankAccessApiBaseUrl: "http://localhost:8082/taler-bank-access/",