aboutsummaryrefslogtreecommitdiff
path: root/packages
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2023-06-30 23:01:48 +0200
committerFlorian Dold <florian@dold.me>2023-06-30 23:01:48 +0200
commit7a18e12a175856b3d17d2bb70ec549004c281ff5 (patch)
tree73b753a55458774aa646356c3d09ac1417458696 /packages
parent86e9799ffdfa5aab5a25cbce1d18881e0a3dfc3e (diff)
downloadwallet-core-7a18e12a175856b3d17d2bb70ec549004c281ff5.tar.xz
wallet-core: towards event-based waiting in runIntegrationTestV2
Diffstat (limited to 'packages')
-rw-r--r--packages/taler-wallet-core/src/internal-wallet-state.ts8
-rw-r--r--packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts48
-rw-r--r--packages/taler-wallet-core/src/operations/testing.ts44
-rw-r--r--packages/taler-wallet-core/src/wallet.ts48
4 files changed, 124 insertions, 24 deletions
diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts
index d97703dc1..a2ca34a86 100644
--- a/packages/taler-wallet-core/src/internal-wallet-state.ts
+++ b/packages/taler-wallet-core/src/internal-wallet-state.ts
@@ -184,6 +184,8 @@ export interface InternalWalletState {
merchantOps: MerchantOperations;
refreshOps: RefreshOperations;
+ isTaskLoopRunning: boolean;
+
getTransactionState(
ws: InternalWalletState,
tx: GetReadOnlyAccess<typeof WalletStoresV1>,
@@ -218,4 +220,10 @@ export interface InternalWalletState {
runSequentialized<T>(tokens: string[], f: () => Promise<T>): Promise<T>;
runUntilDone(req?: { maxRetries?: number }): Promise<void>;
+
+ /**
+ * Ensure that a task loop is currently running.
+ * Starts one if no task loop is running.
+ */
+ ensureTaskLoopRunning(): void;
}
diff --git a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
index 4c00ed592..c7e13754f 100644
--- a/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/operations/pay-peer-pull-credit.ts
@@ -746,31 +746,37 @@ export async function initiatePeerPullPayment(
undefined,
);
- await ws.db
+ const transitionInfo = await ws.db
.mktx((x) => [x.peerPullPaymentInitiations, x.contractTerms])
.runReadWrite(async (tx) => {
- await tx.peerPullPaymentInitiations.put({
- amount: req.partialContractTerms.amount,
- contractTermsHash: hContractTerms,
- exchangeBaseUrl: exchangeBaseUrl,
- pursePriv: pursePair.priv,
- pursePub: pursePair.pub,
- mergePriv: mergePair.priv,
- mergePub: mergePair.pub,
- status: PeerPullPaymentInitiationStatus.PendingCreatePurse,
- contractTerms: contractTerms,
- mergeTimestamp,
- contractEncNonce,
- mergeReserveRowId: mergeReserveRowId,
- contractPriv: contractKeyPair.priv,
- contractPub: contractKeyPair.pub,
- withdrawalGroupId,
- estimatedAmountEffective: wi.withdrawalAmountEffective,
- });
+ const ppi: PeerPullPaymentInitiationRecord = {
+ amount: req.partialContractTerms.amount,
+ contractTermsHash: hContractTerms,
+ exchangeBaseUrl: exchangeBaseUrl,
+ pursePriv: pursePair.priv,
+ pursePub: pursePair.pub,
+ mergePriv: mergePair.priv,
+ mergePub: mergePair.pub,
+ status: PeerPullPaymentInitiationStatus.PendingCreatePurse,
+ contractTerms: contractTerms,
+ mergeTimestamp,
+ contractEncNonce,
+ mergeReserveRowId: mergeReserveRowId,
+ contractPriv: contractKeyPair.priv,
+ contractPub: contractKeyPair.pub,
+ withdrawalGroupId,
+ estimatedAmountEffective: wi.withdrawalAmountEffective,
+ }
+ await tx.peerPullPaymentInitiations.put(ppi);
+ const oldTxState: TransactionState = {
+ major: TransactionMajorState.None,
+ };
+ const newTxState = computePeerPullCreditTransactionState(ppi);
await tx.contractTerms.put({
contractTermsRaw: contractTerms,
h: hContractTerms,
});
+ return { oldTxState, newTxState };
});
const transactionId = constructTransactionIdentifier({
@@ -781,6 +787,10 @@ export async function initiatePeerPullPayment(
// The pending-incoming balance has changed.
ws.notify({ type: NotificationType.BalanceChange });
+ notifyTransition(ws, transactionId, transitionInfo);
+
+ ws.workAvailable.trigger();
+
return {
talerUri: stringifyTalerUri({
type: TalerUriAction.PayPull,
diff --git a/packages/taler-wallet-core/src/operations/testing.ts b/packages/taler-wallet-core/src/operations/testing.ts
index 77e218cd7..2a3584a0a 100644
--- a/packages/taler-wallet-core/src/operations/testing.ts
+++ b/packages/taler-wallet-core/src/operations/testing.ts
@@ -27,6 +27,8 @@ import {
NotificationType,
stringToBytes,
TestPayResult,
+ TransactionMajorState,
+ TransactionMinorState,
WithdrawTestBalanceRequest,
} from "@gnu-taler/taler-util";
import {
@@ -66,6 +68,7 @@ import {
} from "./pay-peer-push-credit.js";
import { initiatePeerPushDebit } from "./pay-peer-push-debit.js";
import { OpenedPromise, openPromise } from "../index.js";
+import { getTransactionById } from "./transactions.js";
const logger = new Logger("operations/testing.ts");
@@ -459,10 +462,45 @@ async function waitUntilDone(ws: InternalWalletState): Promise<void> {
});
}
+async function waitUntilPendingReady(
+ ws: InternalWalletState,
+ transactionId: string,
+): Promise<void> {
+ logger.info(`starting waiting for ${transactionId} to be in pending(ready)`);
+ ws.ensureTaskLoopRunning();
+ let p: OpenedPromise<void> | undefined = undefined;
+ ws.addNotificationListener((notif) => {
+ if (!p) {
+ return;
+ }
+ if (notif.type === NotificationType.TransactionStateTransition) {
+ p.resolve();
+ }
+ });
+ while (1) {
+ p = openPromise();
+ const tx = await getTransactionById(ws, {
+ transactionId,
+ });
+ if (
+ tx.txState.major == TransactionMajorState.Pending &&
+ tx.txState.minor === TransactionMinorState.Ready
+ ) {
+ break;
+ }
+ // Wait until transaction state changed
+ await p.promise;
+ }
+ logger.info(`done waiting for ${transactionId} to be in pending(ready)`);
+ // FIXME: Remove listener!
+}
+
export async function runIntegrationTest2(
ws: InternalWalletState,
args: IntegrationTestV2Args,
): Promise<void> {
+ // FIXME: Make sure that a task look is running, since we're
+ // waiting for notifications.
logger.info("running test with arguments", args);
const exchangeInfo = await updateExchangeFromUrl(ws, args.exchangeBaseUrl);
@@ -565,6 +603,8 @@ export async function runIntegrationTest2(
},
});
+ await waitUntilPendingReady(ws, peerPushInit.transactionId);
+
const peerPushCredit = await preparePeerPushCredit(ws, {
talerUri: peerPushInit.talerUri,
});
@@ -586,6 +626,8 @@ export async function runIntegrationTest2(
},
});
+ await waitUntilPendingReady(ws, peerPullInit.transactionId);
+
const peerPullInc = await preparePeerPullDebit(ws, {
talerUri: peerPullInit.talerUri,
});
@@ -594,6 +636,8 @@ export async function runIntegrationTest2(
peerPullPaymentIncomingId: peerPullInc.peerPullPaymentIncomingId,
});
+ await ws.runUntilDone();
+
logger.trace("integration test: all done!");
}
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index 583dc33d4..1b355a32c 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -287,9 +287,7 @@ import {
GetReadOnlyAccess,
GetReadWriteAccess,
} from "./util/query.js";
-import {
- TaskIdentifiers,
-} from "./operations/common.js";
+import { TaskIdentifiers } from "./operations/common.js";
import { TimerAPI, TimerGroup } from "./util/timer.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
@@ -404,6 +402,12 @@ async function runTaskLoop(
opts: RetryLoopOpts = {},
): Promise<TaskLoopResult> {
logger.info(`running task loop opts=${j2s(opts)}`);
+ if (ws.isTaskLoopRunning) {
+ logger.warn(
+ "task loop already running, nesting the wallet-core task loop is deprecated and should be avoided",
+ );
+ }
+ ws.isTaskLoopRunning = true;
let retriesExceeded = false;
for (let iteration = 0; !ws.stopped; iteration++) {
const pending = await getPendingOperations(ws);
@@ -434,6 +438,14 @@ async function runTaskLoop(
if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) {
logger.warn(`stopping, as no pending operations have lifeness`);
+ ws.isTaskLoopRunning = false;
+ return {
+ retriesExceeded,
+ };
+ }
+
+ if (ws.stopped) {
+ ws.isTaskLoopRunning = false;
return {
retriesExceeded,
};
@@ -468,16 +480,24 @@ async function runTaskLoop(
}
await runTaskWithErrorReporting(ws, p.id, async () => {
logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
+ ws.isTaskLoopRunning = false;
return await callOperationHandler(ws, p);
});
ws.notify({
type: NotificationType.PendingOperationProcessed,
id: p.id,
});
+ if (ws.stopped) {
+ ws.isTaskLoopRunning = false;
+ return {
+ retriesExceeded,
+ };
+ }
}
}
}
- logger.trace("exiting wallet retry loop");
+ logger.trace("exiting wallet task loop");
+ ws.isTaskLoopRunning = false;
return {
retriesExceeded,
};
@@ -1575,7 +1595,9 @@ export async function handleCoreApiRequest(
};
} catch (e: any) {
const err = getErrorDetailFromException(e);
- logger.info(`finished wallet core request ${operation} with error: ${j2s(err)}`);
+ logger.info(
+ `finished wallet core request ${operation} with error: ${j2s(err)}`,
+ );
return {
type: "error",
operation,
@@ -1737,6 +1759,8 @@ class InternalWalletStateImpl implements InternalWalletState {
*/
private resourceLocks: Set<string> = new Set();
+ isTaskLoopRunning: boolean = false;
+
config: Readonly<WalletConfig>;
constructor(
@@ -1948,6 +1972,20 @@ class InternalWalletStateImpl implements InternalWalletState {
}
}
}
+
+ ensureTaskLoopRunning(): void {
+ if (this.isTaskLoopRunning) {
+ return;
+ }
+ runTaskLoop(this)
+ .catch((e) => {
+ logger.error("error running task loop");
+ logger.error(`err: ${e}`);
+ })
+ .then(() => {
+ logger.info("done running task loop");
+ });
+ }
}
/**