aboutsummaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/exchanges.ts
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-06-11 14:56:25 +0200
committerFlorian Dold <florian@dold.me>2024-06-11 14:56:25 +0200
commit2a590ca82f3b63edcaeeba054961e6947df7a651 (patch)
tree2bddb91866a74989b9902111e1f0323f2f8d3754 /packages/taler-wallet-core/src/exchanges.ts
parent11648f3bfc7e87b9472602b95a2f05cfdd964a4c (diff)
downloadwallet-core-2a590ca82f3b63edcaeeba054961e6947df7a651.tar.xz
wallet-core: refactor wait function
Diffstat (limited to 'packages/taler-wallet-core/src/exchanges.ts')
-rw-r--r--packages/taler-wallet-core/src/exchanges.ts277
1 files changed, 122 insertions, 155 deletions
diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts
index 51c702ca9..be46e4352 100644
--- a/packages/taler-wallet-core/src/exchanges.ts
+++ b/packages/taler-wallet-core/src/exchanges.ts
@@ -28,7 +28,6 @@ import {
AgeRestriction,
Amount,
Amounts,
- AsyncFlag,
CancellationToken,
CoinRefreshRequest,
CoinStatus,
@@ -80,6 +79,7 @@ import {
WireInfo,
assertUnreachable,
checkDbInvariant,
+ checkLogicInvariant,
codecForExchangeKeysJson,
durationMul,
encodeCrock,
@@ -106,6 +106,7 @@ import {
TransactionContext,
computeDbBackoff,
constructTaskIdentifier,
+ genericWaitForState,
getAutoRefreshExecuteThreshold,
getExchangeEntryStatusFromRecord,
getExchangeState,
@@ -1045,132 +1046,6 @@ export interface ReadyExchangeSummary {
scopeInfo: ScopeInfo;
}
-async function internalWaitReadyExchange(
- wex: WalletExecutionContext,
- canonUrl: string,
- exchangeNotifFlag: AsyncFlag,
- options: {
- cancellationToken?: CancellationToken;
- forceUpdate?: boolean;
- expectedMasterPub?: string;
- } = {},
-): Promise<ReadyExchangeSummary> {
- const operationId = constructTaskIdentifier({
- tag: PendingTaskType.ExchangeUpdate,
- exchangeBaseUrl: canonUrl,
- });
- while (true) {
- if (wex.cancellationToken.isCancelled) {
- throw Error("cancelled");
- }
- logger.info(`waiting for ready exchange ${canonUrl}`);
- const { exchange, exchangeDetails, retryInfo, scopeInfo } =
- await wex.db.runReadOnlyTx(
- {
- storeNames: [
- "exchanges",
- "exchangeDetails",
- "operationRetries",
- "globalCurrencyAuditors",
- "globalCurrencyExchanges",
- ],
- },
- async (tx) => {
- const exchange = await tx.exchanges.get(canonUrl);
- const exchangeDetails = await getExchangeRecordsInternal(
- tx,
- canonUrl,
- );
- const retryInfo = await tx.operationRetries.get(operationId);
- let scopeInfo: ScopeInfo | undefined = undefined;
- if (exchange && exchangeDetails) {
- scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails);
- }
- return { exchange, exchangeDetails, retryInfo, scopeInfo };
- },
- );
-
- if (!exchange) {
- throw Error("exchange entry does not exist anymore");
- }
-
- let ready = false;
-
- switch (exchange.updateStatus) {
- case ExchangeEntryDbUpdateStatus.Ready:
- ready = true;
- break;
- case ExchangeEntryDbUpdateStatus.ReadyUpdate:
- // If the update is forced,
- // we wait until we're in a full "ready" state,
- // as we're not happy with the stale information.
- if (!options.forceUpdate) {
- ready = true;
- }
- break;
- case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
- throw TalerError.fromDetail(
- TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE,
- {
- exchangeBaseUrl: canonUrl,
- innerError: retryInfo?.lastError,
- },
- );
- default: {
- if (retryInfo) {
- throw TalerError.fromDetail(
- TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE,
- {
- exchangeBaseUrl: canonUrl,
- innerError: retryInfo?.lastError,
- },
- );
- }
- }
- }
-
- if (!ready) {
- logger.info("waiting for exchange update notification");
- await exchangeNotifFlag.wait();
- logger.info("done waiting for exchange update notification");
- exchangeNotifFlag.reset();
- continue;
- }
-
- if (!exchangeDetails) {
- throw Error("invariant failed");
- }
-
- if (!scopeInfo) {
- throw Error("invariant failed");
- }
-
- const res: ReadyExchangeSummary = {
- currency: exchangeDetails.currency,
- exchangeBaseUrl: canonUrl,
- masterPub: exchangeDetails.masterPublicKey,
- tosStatus: getExchangeTosStatusFromRecord(exchange),
- tosAcceptedEtag: exchange.tosAcceptedEtag,
- wireInfo: exchangeDetails.wireInfo,
- protocolVersionRange: exchangeDetails.protocolVersionRange,
- tosCurrentEtag: exchange.tosCurrentEtag,
- tosAcceptedTimestamp: timestampOptionalPreciseFromDb(
- exchange.tosAcceptedTimestamp,
- ),
- scopeInfo,
- };
-
- if (options.expectedMasterPub) {
- if (res.masterPub !== options.expectedMasterPub) {
- throw Error(
- "public key of the exchange does not match expected public key",
- );
- }
- }
- return res;
- }
-}
-
/**
* Ensure that a fresh exchange entry exists for the given
* exchange base URL.
@@ -1223,39 +1098,131 @@ async function waitReadyExchange(
} = {},
): Promise<ReadyExchangeSummary> {
logger.trace(`waiting for exchange ${canonUrl} to become ready`);
- // FIXME: We should use Symbol.dispose magic here for cleanup!
- const exchangeNotifFlag = new AsyncFlag();
- // Raise exchangeNotifFlag whenever we get a notification
- // about our exchange.
- const cancelNotif = wex.ws.addNotificationListener((notif) => {
- if (
- notif.type === NotificationType.ExchangeStateTransition &&
- notif.exchangeBaseUrl === canonUrl
- ) {
- logger.info(`raising update notification: ${j2s(notif)}`);
- exchangeNotifFlag.raise();
- }
+ const operationId = constructTaskIdentifier({
+ tag: PendingTaskType.ExchangeUpdate,
+ exchangeBaseUrl: canonUrl,
});
- const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => {
- cancelNotif();
- exchangeNotifFlag.raise();
+ let res: ReadyExchangeSummary | undefined = undefined;
+
+ await genericWaitForState(wex, {
+ filterNotification(notif): boolean {
+ return (
+ notif.type === NotificationType.ExchangeStateTransition &&
+ notif.exchangeBaseUrl === canonUrl
+ );
+ },
+ async checkState(): Promise<boolean> {
+ const { exchange, exchangeDetails, retryInfo, scopeInfo } =
+ await wex.db.runReadOnlyTx(
+ {
+ storeNames: [
+ "exchanges",
+ "exchangeDetails",
+ "operationRetries",
+ "globalCurrencyAuditors",
+ "globalCurrencyExchanges",
+ ],
+ },
+ async (tx) => {
+ const exchange = await tx.exchanges.get(canonUrl);
+ const exchangeDetails = await getExchangeRecordsInternal(
+ tx,
+ canonUrl,
+ );
+ const retryInfo = await tx.operationRetries.get(operationId);
+ let scopeInfo: ScopeInfo | undefined = undefined;
+ if (exchange && exchangeDetails) {
+ scopeInfo = await internalGetExchangeScopeInfo(
+ tx,
+ exchangeDetails,
+ );
+ }
+ return { exchange, exchangeDetails, retryInfo, scopeInfo };
+ },
+ );
+
+ if (!exchange) {
+ throw Error("exchange entry does not exist anymore");
+ }
+
+ let ready = false;
+
+ switch (exchange.updateStatus) {
+ case ExchangeEntryDbUpdateStatus.Ready:
+ ready = true;
+ break;
+ case ExchangeEntryDbUpdateStatus.ReadyUpdate:
+ // If the update is forced,
+ // we wait until we're in a full "ready" state,
+ // as we're not happy with the stale information.
+ if (!options.forceUpdate) {
+ ready = true;
+ }
+ break;
+ case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
+ throw TalerError.fromDetail(
+ TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE,
+ {
+ exchangeBaseUrl: canonUrl,
+ innerError: retryInfo?.lastError,
+ },
+ );
+ default: {
+ if (retryInfo) {
+ throw TalerError.fromDetail(
+ TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE,
+ {
+ exchangeBaseUrl: canonUrl,
+ innerError: retryInfo?.lastError,
+ },
+ );
+ }
+ }
+ }
+
+ if (!ready) {
+ return false;
+ }
+
+ if (!exchangeDetails) {
+ throw Error("invariant failed");
+ }
+
+ if (!scopeInfo) {
+ throw Error("invariant failed");
+ }
+
+ const mySummary: ReadyExchangeSummary = {
+ currency: exchangeDetails.currency,
+ exchangeBaseUrl: canonUrl,
+ masterPub: exchangeDetails.masterPublicKey,
+ tosStatus: getExchangeTosStatusFromRecord(exchange),
+ tosAcceptedEtag: exchange.tosAcceptedEtag,
+ wireInfo: exchangeDetails.wireInfo,
+ protocolVersionRange: exchangeDetails.protocolVersionRange,
+ tosCurrentEtag: exchange.tosCurrentEtag,
+ tosAcceptedTimestamp: timestampOptionalPreciseFromDb(
+ exchange.tosAcceptedTimestamp,
+ ),
+ scopeInfo,
+ };
+
+ if (options.expectedMasterPub) {
+ if (mySummary.masterPub !== options.expectedMasterPub) {
+ throw Error(
+ "public key of the exchange does not match expected public key",
+ );
+ }
+ }
+ res = mySummary;
+ return true;
+ },
});
- try {
- const res = await internalWaitReadyExchange(
- wex,
- canonUrl,
- exchangeNotifFlag,
- options,
- );
- logger.info("done waiting for ready exchange");
- return res;
- } finally {
- unregisterOnCancelled();
- cancelNotif();
- }
+ checkLogicInvariant(!!res);
+ return res;
}
function checkPeerPaymentsDisabled(