From 2a590ca82f3b63edcaeeba054961e6947df7a651 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 11 Jun 2024 14:56:25 +0200 Subject: wallet-core: refactor wait function --- packages/taler-wallet-core/src/exchanges.ts | 277 ++++++++++++---------------- 1 file changed, 122 insertions(+), 155 deletions(-) (limited to 'packages/taler-wallet-core/src/exchanges.ts') diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts index 51c702ca9..be46e4352 100644 --- a/packages/taler-wallet-core/src/exchanges.ts +++ b/packages/taler-wallet-core/src/exchanges.ts @@ -28,7 +28,6 @@ import { AgeRestriction, Amount, Amounts, - AsyncFlag, CancellationToken, CoinRefreshRequest, CoinStatus, @@ -80,6 +79,7 @@ import { WireInfo, assertUnreachable, checkDbInvariant, + checkLogicInvariant, codecForExchangeKeysJson, durationMul, encodeCrock, @@ -106,6 +106,7 @@ import { TransactionContext, computeDbBackoff, constructTaskIdentifier, + genericWaitForState, getAutoRefreshExecuteThreshold, getExchangeEntryStatusFromRecord, getExchangeState, @@ -1045,132 +1046,6 @@ export interface ReadyExchangeSummary { scopeInfo: ScopeInfo; } -async function internalWaitReadyExchange( - wex: WalletExecutionContext, - canonUrl: string, - exchangeNotifFlag: AsyncFlag, - options: { - cancellationToken?: CancellationToken; - forceUpdate?: boolean; - expectedMasterPub?: string; - } = {}, -): Promise { - const operationId = constructTaskIdentifier({ - tag: PendingTaskType.ExchangeUpdate, - exchangeBaseUrl: canonUrl, - }); - while (true) { - if (wex.cancellationToken.isCancelled) { - throw Error("cancelled"); - } - logger.info(`waiting for ready exchange ${canonUrl}`); - const { exchange, exchangeDetails, retryInfo, scopeInfo } = - await wex.db.runReadOnlyTx( - { - storeNames: [ - "exchanges", - "exchangeDetails", - "operationRetries", - "globalCurrencyAuditors", - "globalCurrencyExchanges", - ], - }, - async (tx) => { - const exchange = await tx.exchanges.get(canonUrl); - const exchangeDetails = await getExchangeRecordsInternal( - tx, - canonUrl, - ); - const retryInfo = await tx.operationRetries.get(operationId); - let scopeInfo: ScopeInfo | undefined = undefined; - if (exchange && exchangeDetails) { - scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails); - } - return { exchange, exchangeDetails, retryInfo, scopeInfo }; - }, - ); - - if (!exchange) { - throw Error("exchange entry does not exist anymore"); - } - - let ready = false; - - switch (exchange.updateStatus) { - case ExchangeEntryDbUpdateStatus.Ready: - ready = true; - break; - case ExchangeEntryDbUpdateStatus.ReadyUpdate: - // If the update is forced, - // we wait until we're in a full "ready" state, - // as we're not happy with the stale information. - if (!options.forceUpdate) { - ready = true; - } - break; - case ExchangeEntryDbUpdateStatus.UnavailableUpdate: - throw TalerError.fromDetail( - TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, - { - exchangeBaseUrl: canonUrl, - innerError: retryInfo?.lastError, - }, - ); - default: { - if (retryInfo) { - throw TalerError.fromDetail( - TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, - { - exchangeBaseUrl: canonUrl, - innerError: retryInfo?.lastError, - }, - ); - } - } - } - - if (!ready) { - logger.info("waiting for exchange update notification"); - await exchangeNotifFlag.wait(); - logger.info("done waiting for exchange update notification"); - exchangeNotifFlag.reset(); - continue; - } - - if (!exchangeDetails) { - throw Error("invariant failed"); - } - - if (!scopeInfo) { - throw Error("invariant failed"); - } - - const res: ReadyExchangeSummary = { - currency: exchangeDetails.currency, - exchangeBaseUrl: canonUrl, - masterPub: exchangeDetails.masterPublicKey, - tosStatus: getExchangeTosStatusFromRecord(exchange), - tosAcceptedEtag: exchange.tosAcceptedEtag, - wireInfo: exchangeDetails.wireInfo, - protocolVersionRange: exchangeDetails.protocolVersionRange, - tosCurrentEtag: exchange.tosCurrentEtag, - tosAcceptedTimestamp: timestampOptionalPreciseFromDb( - exchange.tosAcceptedTimestamp, - ), - scopeInfo, - }; - - if (options.expectedMasterPub) { - if (res.masterPub !== options.expectedMasterPub) { - throw Error( - "public key of the exchange does not match expected public key", - ); - } - } - return res; - } -} - /** * Ensure that a fresh exchange entry exists for the given * exchange base URL. @@ -1223,39 +1098,131 @@ async function waitReadyExchange( } = {}, ): Promise { logger.trace(`waiting for exchange ${canonUrl} to become ready`); - // FIXME: We should use Symbol.dispose magic here for cleanup! - const exchangeNotifFlag = new AsyncFlag(); - // Raise exchangeNotifFlag whenever we get a notification - // about our exchange. - const cancelNotif = wex.ws.addNotificationListener((notif) => { - if ( - notif.type === NotificationType.ExchangeStateTransition && - notif.exchangeBaseUrl === canonUrl - ) { - logger.info(`raising update notification: ${j2s(notif)}`); - exchangeNotifFlag.raise(); - } + const operationId = constructTaskIdentifier({ + tag: PendingTaskType.ExchangeUpdate, + exchangeBaseUrl: canonUrl, }); - const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => { - cancelNotif(); - exchangeNotifFlag.raise(); + let res: ReadyExchangeSummary | undefined = undefined; + + await genericWaitForState(wex, { + filterNotification(notif): boolean { + return ( + notif.type === NotificationType.ExchangeStateTransition && + notif.exchangeBaseUrl === canonUrl + ); + }, + async checkState(): Promise { + const { exchange, exchangeDetails, retryInfo, scopeInfo } = + await wex.db.runReadOnlyTx( + { + storeNames: [ + "exchanges", + "exchangeDetails", + "operationRetries", + "globalCurrencyAuditors", + "globalCurrencyExchanges", + ], + }, + async (tx) => { + const exchange = await tx.exchanges.get(canonUrl); + const exchangeDetails = await getExchangeRecordsInternal( + tx, + canonUrl, + ); + const retryInfo = await tx.operationRetries.get(operationId); + let scopeInfo: ScopeInfo | undefined = undefined; + if (exchange && exchangeDetails) { + scopeInfo = await internalGetExchangeScopeInfo( + tx, + exchangeDetails, + ); + } + return { exchange, exchangeDetails, retryInfo, scopeInfo }; + }, + ); + + if (!exchange) { + throw Error("exchange entry does not exist anymore"); + } + + let ready = false; + + switch (exchange.updateStatus) { + case ExchangeEntryDbUpdateStatus.Ready: + ready = true; + break; + case ExchangeEntryDbUpdateStatus.ReadyUpdate: + // If the update is forced, + // we wait until we're in a full "ready" state, + // as we're not happy with the stale information. + if (!options.forceUpdate) { + ready = true; + } + break; + case ExchangeEntryDbUpdateStatus.UnavailableUpdate: + throw TalerError.fromDetail( + TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, + { + exchangeBaseUrl: canonUrl, + innerError: retryInfo?.lastError, + }, + ); + default: { + if (retryInfo) { + throw TalerError.fromDetail( + TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, + { + exchangeBaseUrl: canonUrl, + innerError: retryInfo?.lastError, + }, + ); + } + } + } + + if (!ready) { + return false; + } + + if (!exchangeDetails) { + throw Error("invariant failed"); + } + + if (!scopeInfo) { + throw Error("invariant failed"); + } + + const mySummary: ReadyExchangeSummary = { + currency: exchangeDetails.currency, + exchangeBaseUrl: canonUrl, + masterPub: exchangeDetails.masterPublicKey, + tosStatus: getExchangeTosStatusFromRecord(exchange), + tosAcceptedEtag: exchange.tosAcceptedEtag, + wireInfo: exchangeDetails.wireInfo, + protocolVersionRange: exchangeDetails.protocolVersionRange, + tosCurrentEtag: exchange.tosCurrentEtag, + tosAcceptedTimestamp: timestampOptionalPreciseFromDb( + exchange.tosAcceptedTimestamp, + ), + scopeInfo, + }; + + if (options.expectedMasterPub) { + if (mySummary.masterPub !== options.expectedMasterPub) { + throw Error( + "public key of the exchange does not match expected public key", + ); + } + } + res = mySummary; + return true; + }, }); - try { - const res = await internalWaitReadyExchange( - wex, - canonUrl, - exchangeNotifFlag, - options, - ); - logger.info("done waiting for ready exchange"); - return res; - } finally { - unregisterOnCancelled(); - cancelNotif(); - } + checkLogicInvariant(!!res); + return res; } function checkPeerPaymentsDisabled( -- cgit v1.2.3