diff options
Diffstat (limited to 'packages/taler-wallet-core/src/exchanges.ts')
-rw-r--r-- | packages/taler-wallet-core/src/exchanges.ts | 1644 |
1 files changed, 1320 insertions, 324 deletions
diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts index adb696de0..773ad0d59 100644 --- a/packages/taler-wallet-core/src/exchanges.ts +++ b/packages/taler-wallet-core/src/exchanges.ts @@ -25,13 +25,17 @@ */ import { AbsoluteTime, + AccountKycStatus, + AccountLimit, AgeRestriction, Amount, + AmountLike, + AmountString, Amounts, - AsyncFlag, CancellationToken, CoinRefreshRequest, CoinStatus, + CurrencySpecification, DeleteExchangeRequest, DenomKeyType, DenomLossEventType, @@ -40,12 +44,15 @@ import { DenominationPubKey, Duration, EddsaPublicKeyString, + EmptyObject, ExchangeAuditor, ExchangeDetailedResponse, ExchangeGlobalFees, ExchangeListItem, ExchangeSignKeyJson, ExchangeTosStatus, + ExchangeUpdateStatus, + ExchangeWalletKycStatus, ExchangeWireAccount, ExchangesListResponse, FeeDescription, @@ -53,6 +60,8 @@ import { GetExchangeResourcesResponse, GetExchangeTosResult, GlobalFees, + HttpStatusCode, + LegitimizationNeededResponse, LibtoolVersion, Logger, NotificationType, @@ -61,38 +70,54 @@ import { RefreshReason, ScopeInfo, ScopeType, + StartExchangeWalletKycRequest, TalerError, TalerErrorCode, TalerErrorDetail, TalerPreciseTimestamp, TalerProtocolDuration, TalerProtocolTimestamp, + TestingWaitExchangeStateRequest, + TestingWaitWalletKycRequest, + Transaction, + TransactionAction, TransactionIdStr, TransactionMajorState, TransactionState, TransactionType, URL, + WalletKycRequest, WalletNotification, WireFee, WireFeeMap, WireFeesJson, WireInfo, + ZeroLimitedOperation, assertUnreachable, checkDbInvariant, + checkLogicInvariant, + codecForAccountKycStatus, codecForExchangeKeysJson, + codecForLegitimizationNeededResponse, durationMul, encodeCrock, getRandomBytes, hashDenomPub, + hashPaytoUri, j2s, makeErrorDetail, + makeTalerErrorDetail, parsePaytoUri, + stringifyReservePaytoUri, } from "@gnu-taler/taler-util"; import { HttpRequestLibrary, getExpiry, + readResponseJsonOrThrow, readSuccessResponseJsonOrThrow, readSuccessResponseTextOrThrow, + readTalerErrorResponse, + throwUnexpectedRequestError, } from "@gnu-taler/taler-util/http"; import { PendingTaskType, @@ -103,6 +128,7 @@ import { TransactionContext, computeDbBackoff, constructTaskIdentifier, + genericWaitForState, getAutoRefreshExecuteThreshold, getExchangeEntryStatusFromRecord, getExchangeState, @@ -118,6 +144,11 @@ import { ExchangeEntryDbRecordStatus, ExchangeEntryDbUpdateStatus, ExchangeEntryRecord, + ReserveRecord, + ReserveRecordStatus, + WalletDbAllStoresReadOnlyTransaction, + WalletDbAllStoresReadWriteTransaction, + WalletDbHelpers, WalletDbReadOnlyTransaction, WalletDbReadWriteTransaction, WalletStoresV1, @@ -140,6 +171,7 @@ import { createRefreshGroup } from "./refresh.js"; import { constructTransactionIdentifier, notifyTransition, + rematerializeTransactions, } from "./transactions.js"; import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "./versions.js"; import { InternalWalletState, WalletExecutionContext } from "./wallet.js"; @@ -187,7 +219,7 @@ async function downloadExchangeWithTermsOfService( cancellationToken: wex.cancellationToken, }); const tosText = await readSuccessResponseTextOrThrow(resp); - const tosEtag = resp.headers.get("etag") || "unknown"; + const tosEtag = resp.headers.get("taler-terms-version") || "unknown"; const tosContentLanguage = resp.headers.get("content-language") || undefined; const tosContentType = resp.headers.get("content-type") || "text/plain"; const availLangStr = resp.headers.get("avail-languages") || ""; @@ -237,6 +269,83 @@ async function getExchangeRecordsInternal( return details; } +export async function getScopeForAllCoins( + tx: WalletDbReadOnlyTransaction< + [ + "exchanges", + "exchangeDetails", + "globalCurrencyExchanges", + "globalCurrencyAuditors", + ] + >, + exs: string[], +): Promise<ScopeInfo[]> { + const queries = exs.map((exchange) => { + return getExchangeScopeInfoOrUndefined(tx, exchange); + }); + const rs = await Promise.all(queries); + return rs.filter((d): d is ScopeInfo => d !== undefined); +} + +export async function getScopeForAllExchanges( + tx: WalletDbReadOnlyTransaction< + [ + "exchanges", + "exchangeDetails", + "globalCurrencyExchanges", + "globalCurrencyAuditors", + ] + >, + exs: string[], +): Promise<ScopeInfo[]> { + const queries = exs.map((exchange) => { + return getExchangeScopeInfoOrUndefined(tx, exchange); + }); + const rs = await Promise.all(queries); + return rs.filter((d): d is ScopeInfo => d !== undefined); +} + +export async function getCoinScopeInfoOrUndefined( + tx: WalletDbReadOnlyTransaction< + [ + "coins", + "exchanges", + "exchangeDetails", + "globalCurrencyExchanges", + "globalCurrencyAuditors", + ] + >, + coinPub: string, +): Promise<ScopeInfo | undefined> { + const coin = await tx.coins.get(coinPub); + if (!coin) { + return undefined; + } + const det = await getExchangeRecordsInternal(tx, coin.exchangeBaseUrl); + if (!det) { + return undefined; + } + return internalGetExchangeScopeInfo(tx, det); +} + +export async function getExchangeScopeInfoOrUndefined( + tx: WalletDbReadOnlyTransaction< + [ + "exchanges", + "exchangeDetails", + "globalCurrencyExchanges", + "globalCurrencyAuditors", + ] + >, + exchangeBaseUrl: string, +): Promise<ScopeInfo | undefined> { + const det = await getExchangeRecordsInternal(tx, exchangeBaseUrl); + if (!det) { + return undefined; + } + return internalGetExchangeScopeInfo(tx, det); +} + export async function getExchangeScopeInfo( tx: WalletDbReadOnlyTransaction< [ @@ -301,12 +410,30 @@ async function internalGetExchangeScopeInfo( }; } +function getKycStatusFromReserveStatus( + status: ReserveRecordStatus, +): ExchangeWalletKycStatus { + switch (status) { + case ReserveRecordStatus.Done: + return ExchangeWalletKycStatus.Done; + // FIXME: Do we handle the suspended state? + case ReserveRecordStatus.SuspendedLegiInit: + case ReserveRecordStatus.PendingLegiInit: + return ExchangeWalletKycStatus.LegiInit; + // FIXME: Do we handle the suspended state? + case ReserveRecordStatus.SuspendedLegi: + case ReserveRecordStatus.PendingLegi: + return ExchangeWalletKycStatus.Legi; + } +} + async function makeExchangeListItem( tx: WalletDbReadOnlyTransaction< ["globalCurrencyExchanges", "globalCurrencyAuditors"] >, r: ExchangeEntryRecord, exchangeDetails: ExchangeDetailsRecord | undefined, + reserveRec: ReserveRecord | undefined, lastError: TalerErrorDetail | undefined, ): Promise<ExchangeListItem> { const lastUpdateErrorInfo: OperationErrorInfo | undefined = lastError @@ -321,7 +448,12 @@ async function makeExchangeListItem( scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails); } - return { + let walletKycStatus: ExchangeWalletKycStatus | undefined = + reserveRec && reserveRec.status + ? getKycStatusFromReserveStatus(reserveRec.status) + : undefined; + + const listItem: ExchangeListItem = { exchangeBaseUrl: r.baseUrl, masterPub: exchangeDetails?.masterPublicKey, noFees: r.noFees ?? false, @@ -329,6 +461,13 @@ async function makeExchangeListItem( currency: exchangeDetails?.currency ?? r.presetCurrencyHint ?? "UNKNOWN", exchangeUpdateStatus: getExchangeUpdateStatusFromRecord(r), exchangeEntryStatus: getExchangeEntryStatusFromRecord(r), + walletKycStatus, + walletKycReservePub: reserveRec?.reservePub, + // FIXME: #9109 this should not be constructed here, it should be an opaque URL from exchange response + walletKycUrl: reserveRec?.kycAccessToken + ? new URL(`kyc-spa/${reserveRec.kycAccessToken}`, r.baseUrl).href + : undefined, + walletKycAccessToken: reserveRec?.kycAccessToken, tosStatus: getExchangeTosStatusFromRecord(r), ageRestrictionOptions: exchangeDetails?.ageMask ? AgeRestriction.getAgeGroupsFromMask(exchangeDetails.ageMask) @@ -342,6 +481,14 @@ async function makeExchangeListItem( url: r.baseUrl, }, }; + switch (listItem.exchangeUpdateStatus) { + case ExchangeUpdateStatus.UnavailableUpdate: + if (r.unavailableReason) { + listItem.unavailableReason = r.unavailableReason; + } + break; + } + return listItem; } export interface ExchangeWireDetails { @@ -351,6 +498,7 @@ export interface ExchangeWireDetails { exchangeBaseUrl: string; auditors: ExchangeAuditor[]; globalFees: ExchangeGlobalFees[]; + reserveClosingDelay: TalerProtocolDuration; } export async function getExchangeWireDetailsInTx( @@ -368,6 +516,7 @@ export async function getExchangeWireDetailsInTx( exchangeBaseUrl: det.exchangeBaseUrl, auditors: det.auditors, globalFees: det.globalFees, + reserveClosingDelay: det.reserveClosingDelay, }; } @@ -379,6 +528,7 @@ export async function lookupExchangeByUri( { storeNames: [ "exchanges", + "reserves", "exchangeDetails", "operationRetries", "globalCurrencyAuditors", @@ -397,10 +547,18 @@ export async function lookupExchangeByUri( const opRetryRecord = await tx.operationRetries.get( TaskIdentifiers.forExchangeUpdate(exchangeRec), ); + let reserveRec: ReserveRecord | undefined = undefined; + if (exchangeRec.currentMergeReserveRowId != null) { + reserveRec = await tx.reserves.get( + exchangeRec.currentMergeReserveRowId, + ); + checkDbInvariant(!!reserveRec, "reserve record not found"); + } return await makeExchangeListItem( tx, exchangeRec, exchangeDetails, + reserveRec, opRetryRecord?.lastError, ); }, @@ -696,6 +854,10 @@ export interface ExchangeKeysDownloadResult { globalFees: GlobalFees[]; accounts: ExchangeWireAccount[]; wireFees: { [methodName: string]: WireFeesJson[] }; + currencySpecification?: CurrencySpecification; + walletBalanceLimits: AmountString[] | undefined; + hardLimits: AccountLimit[] | undefined; + zeroLimits: ZeroLimitedOperation[] | undefined; } /** @@ -858,6 +1020,46 @@ async function downloadExchangeKeysInfo( globalFees: exchangeKeysJsonUnchecked.global_fees, accounts: exchangeKeysJsonUnchecked.accounts, wireFees: exchangeKeysJsonUnchecked.wire_fees, + currencySpecification: exchangeKeysJsonUnchecked.currency_specification, + walletBalanceLimits: + exchangeKeysJsonUnchecked.wallet_balance_limit_without_kyc, + hardLimits: exchangeKeysJsonUnchecked.hard_limits, + zeroLimits: exchangeKeysJsonUnchecked.zero_limits, + }; +} + +type TosMetaResult = { type: "not-found" } | { type: "ok"; etag: string }; + +/** + * Download metadata about an exchange's terms of service. + */ +async function downloadTosMeta( + wex: WalletExecutionContext, + exchangeBaseUrl: string, +): Promise<TosMetaResult> { + logger.trace(`downloading exchange tos metadata for ${exchangeBaseUrl}`); + const reqUrl = new URL("terms", exchangeBaseUrl); + + // FIXME: We can/should make a HEAD request here. + // Not sure if qtart supports it at the moment. + const resp = await wex.http.fetch(reqUrl.href, { + cancellationToken: wex.cancellationToken, + }); + + switch (resp.status) { + case HttpStatusCode.NotFound: + case HttpStatusCode.NotImplemented: + return { type: "not-found" }; + case HttpStatusCode.Ok: + break; + default: + throwUnexpectedRequestError(resp, await readTalerErrorResponse(resp)); + } + + const etag = resp.headers.get("taler-terms-version") || "unknown"; + return { + type: "ok", + etag, }; } @@ -900,6 +1102,36 @@ async function downloadTosFromAcceptedFormat( } /** + * Check if an exchange entry should be considered + * to be outdated. + */ +async function checkExchangeEntryOutdated( + wex: WalletExecutionContext, + tx: WalletDbReadOnlyTransaction<["exchanges", "denominations"]>, + exchangeBaseUrl: string, +): Promise<boolean> { + // We currently consider the exchange outdated when no + // denominations can be used for withdrawal. + + logger.trace(`checking if exchange entry for ${exchangeBaseUrl} is outdated`); + let numOkay = 0; + let denoms = + await tx.denominations.indexes.byExchangeBaseUrl.getAll(exchangeBaseUrl); + logger.trace(`exchange entry has ${denoms.length} denominations`); + for (const denom of denoms) { + const denomOkay = isWithdrawableDenom( + denom, + wex.ws.config.testing.denomselAllowLate, + ); + if (denomOkay) { + numOkay++; + } + } + logger.trace(`Of these, ${numOkay} are usable`); + return numOkay === 0; +} + +/** * Transition an exchange into an updating state. * * If the update is forced, the exchange is put into an updating state @@ -935,12 +1167,16 @@ async function startUpdateExchangeEntry( const { oldExchangeState, newExchangeState, taskId } = await wex.db.runReadWriteTx( - { storeNames: ["exchanges", "operationRetries"] }, + { storeNames: ["exchanges", "operationRetries", "denominations"] }, async (tx) => { const r = await tx.exchanges.get(exchangeBaseUrl); if (!r) { throw Error("exchange not found"); } + + // FIXME: Do not transition at all if the exchange info is recent enough + // and the request is not forced. + const oldExchangeState = getExchangeState(r); switch (r.updateStatus) { case ExchangeEntryDbUpdateStatus.UnavailableUpdate: @@ -949,7 +1185,21 @@ async function startUpdateExchangeEntry( case ExchangeEntryDbUpdateStatus.Suspended: r.cachebreakNextUpdate = options.forceUpdate; break; - case ExchangeEntryDbUpdateStatus.ReadyUpdate: + case ExchangeEntryDbUpdateStatus.ReadyUpdate: { + const outdated = await checkExchangeEntryOutdated( + wex, + tx, + exchangeBaseUrl, + ); + if (outdated) { + r.updateStatus = ExchangeEntryDbUpdateStatus.OutdatedUpdate; + } else { + r.updateStatus = ExchangeEntryDbUpdateStatus.ReadyUpdate; + } + r.cachebreakNextUpdate = options.forceUpdate; + break; + } + case ExchangeEntryDbUpdateStatus.OutdatedUpdate: r.cachebreakNextUpdate = options.forceUpdate; break; case ExchangeEntryDbUpdateStatus.Ready: { @@ -961,7 +1211,16 @@ async function startUpdateExchangeEntry( options.forceUpdate || AbsoluteTime.isExpired(nextUpdateTimestamp) ) { - r.updateStatus = ExchangeEntryDbUpdateStatus.ReadyUpdate; + const outdated = await checkExchangeEntryOutdated( + wex, + tx, + exchangeBaseUrl, + ); + if (outdated) { + r.updateStatus = ExchangeEntryDbUpdateStatus.OutdatedUpdate; + } else { + r.updateStatus = ExchangeEntryDbUpdateStatus.ReadyUpdate; + } r.cachebreakNextUpdate = options.forceUpdate; } break; @@ -1006,132 +1265,8 @@ export interface ReadyExchangeSummary { protocolVersionRange: string; tosAcceptedTimestamp: TalerPreciseTimestamp | undefined; scopeInfo: ScopeInfo; -} - -async function internalWaitReadyExchange( - wex: WalletExecutionContext, - canonUrl: string, - exchangeNotifFlag: AsyncFlag, - options: { - cancellationToken?: CancellationToken; - forceUpdate?: boolean; - expectedMasterPub?: string; - } = {}, -): Promise<ReadyExchangeSummary> { - const operationId = constructTaskIdentifier({ - tag: PendingTaskType.ExchangeUpdate, - exchangeBaseUrl: canonUrl, - }); - while (true) { - if (wex.cancellationToken.isCancelled) { - throw Error("cancelled"); - } - logger.info(`waiting for ready exchange ${canonUrl}`); - const { exchange, exchangeDetails, retryInfo, scopeInfo } = - await wex.db.runReadOnlyTx( - { - storeNames: [ - "exchanges", - "exchangeDetails", - "operationRetries", - "globalCurrencyAuditors", - "globalCurrencyExchanges", - ], - }, - async (tx) => { - const exchange = await tx.exchanges.get(canonUrl); - const exchangeDetails = await getExchangeRecordsInternal( - tx, - canonUrl, - ); - const retryInfo = await tx.operationRetries.get(operationId); - let scopeInfo: ScopeInfo | undefined = undefined; - if (exchange && exchangeDetails) { - scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails); - } - return { exchange, exchangeDetails, retryInfo, scopeInfo }; - }, - ); - - if (!exchange) { - throw Error("exchange entry does not exist anymore"); - } - - let ready = false; - - switch (exchange.updateStatus) { - case ExchangeEntryDbUpdateStatus.Ready: - ready = true; - break; - case ExchangeEntryDbUpdateStatus.ReadyUpdate: - // If the update is forced, - // we wait until we're in a full "ready" state, - // as we're not happy with the stale information. - if (!options.forceUpdate) { - ready = true; - } - break; - case ExchangeEntryDbUpdateStatus.UnavailableUpdate: - throw TalerError.fromDetail( - TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, - { - exchangeBaseUrl: canonUrl, - innerError: retryInfo?.lastError, - }, - ); - default: { - if (retryInfo) { - throw TalerError.fromDetail( - TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, - { - exchangeBaseUrl: canonUrl, - innerError: retryInfo?.lastError, - }, - ); - } - } - } - - if (!ready) { - logger.info("waiting for exchange update notification"); - await exchangeNotifFlag.wait(); - logger.info("done waiting for exchange update notification"); - exchangeNotifFlag.reset(); - continue; - } - - if (!exchangeDetails) { - throw Error("invariant failed"); - } - - if (!scopeInfo) { - throw Error("invariant failed"); - } - - const res: ReadyExchangeSummary = { - currency: exchangeDetails.currency, - exchangeBaseUrl: canonUrl, - masterPub: exchangeDetails.masterPublicKey, - tosStatus: getExchangeTosStatusFromRecord(exchange), - tosAcceptedEtag: exchange.tosAcceptedEtag, - wireInfo: exchangeDetails.wireInfo, - protocolVersionRange: exchangeDetails.protocolVersionRange, - tosCurrentEtag: exchange.tosCurrentEtag, - tosAcceptedTimestamp: timestampOptionalPreciseFromDb( - exchange.tosAcceptedTimestamp, - ), - scopeInfo, - }; - - if (options.expectedMasterPub) { - if (res.masterPub !== options.expectedMasterPub) { - throw Error( - "public key of the exchange does not match expected public key", - ); - } - } - return res; - } + zeroLimits: ZeroLimitedOperation[]; + hardLimits: AccountLimit[]; } /** @@ -1186,39 +1321,134 @@ async function waitReadyExchange( } = {}, ): Promise<ReadyExchangeSummary> { logger.trace(`waiting for exchange ${canonUrl} to become ready`); - // FIXME: We should use Symbol.dispose magic here for cleanup! - const exchangeNotifFlag = new AsyncFlag(); - // Raise exchangeNotifFlag whenever we get a notification - // about our exchange. - const cancelNotif = wex.ws.addNotificationListener((notif) => { - if ( - notif.type === NotificationType.ExchangeStateTransition && - notif.exchangeBaseUrl === canonUrl - ) { - logger.info(`raising update notification: ${j2s(notif)}`); - exchangeNotifFlag.raise(); - } + const operationId = constructTaskIdentifier({ + tag: PendingTaskType.ExchangeUpdate, + exchangeBaseUrl: canonUrl, }); - const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => { - cancelNotif(); - exchangeNotifFlag.raise(); + let res: ReadyExchangeSummary | undefined = undefined; + + await genericWaitForState(wex, { + filterNotification(notif): boolean { + return ( + notif.type === NotificationType.ExchangeStateTransition && + notif.exchangeBaseUrl === canonUrl + ); + }, + async checkState(): Promise<boolean> { + const { exchange, exchangeDetails, retryInfo, scopeInfo } = + await wex.db.runReadOnlyTx( + { + storeNames: [ + "exchanges", + "exchangeDetails", + "operationRetries", + "globalCurrencyAuditors", + "globalCurrencyExchanges", + ], + }, + async (tx) => { + const exchange = await tx.exchanges.get(canonUrl); + const exchangeDetails = await getExchangeRecordsInternal( + tx, + canonUrl, + ); + const retryInfo = await tx.operationRetries.get(operationId); + let scopeInfo: ScopeInfo | undefined = undefined; + if (exchange && exchangeDetails) { + scopeInfo = await internalGetExchangeScopeInfo( + tx, + exchangeDetails, + ); + } + return { exchange, exchangeDetails, retryInfo, scopeInfo }; + }, + ); + + if (!exchange) { + throw Error("exchange entry does not exist anymore"); + } + + let ready = false; + + switch (exchange.updateStatus) { + case ExchangeEntryDbUpdateStatus.Ready: + ready = true; + break; + case ExchangeEntryDbUpdateStatus.ReadyUpdate: + // If the update is forced, + // we wait until we're in a full "ready" state, + // as we're not happy with the stale information. + if (!options.forceUpdate) { + ready = true; + } + break; + case ExchangeEntryDbUpdateStatus.UnavailableUpdate: + throw TalerError.fromDetail( + TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, + { + exchangeBaseUrl: canonUrl, + innerError: retryInfo?.lastError, + }, + ); + case ExchangeEntryDbUpdateStatus.OutdatedUpdate: + default: { + if (retryInfo) { + throw TalerError.fromDetail( + TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE, + { + exchangeBaseUrl: canonUrl, + innerError: retryInfo?.lastError, + }, + ); + } + } + } + + if (!ready) { + return false; + } + + if (!exchangeDetails) { + throw Error("invariant failed"); + } + + if (!scopeInfo) { + throw Error("invariant failed"); + } + + const mySummary: ReadyExchangeSummary = { + currency: exchangeDetails.currency, + exchangeBaseUrl: canonUrl, + masterPub: exchangeDetails.masterPublicKey, + tosStatus: getExchangeTosStatusFromRecord(exchange), + tosAcceptedEtag: exchange.tosAcceptedEtag, + wireInfo: exchangeDetails.wireInfo, + protocolVersionRange: exchangeDetails.protocolVersionRange, + tosCurrentEtag: exchange.tosCurrentEtag, + tosAcceptedTimestamp: timestampOptionalPreciseFromDb( + exchange.tosAcceptedTimestamp, + ), + scopeInfo, + hardLimits: exchangeDetails.hardLimits ?? [], + zeroLimits: exchangeDetails.zeroLimits ?? [], + }; + + if (options.expectedMasterPub) { + if (mySummary.masterPub !== options.expectedMasterPub) { + throw Error( + "public key of the exchange does not match expected public key", + ); + } + } + res = mySummary; + return true; + }, }); - try { - const res = await internalWaitReadyExchange( - wex, - canonUrl, - exchangeNotifFlag, - options, - ); - logger.info("done waiting for ready exchange"); - return res; - } finally { - unregisterOnCancelled(); - cancelNotif(); - } + checkLogicInvariant(!!res); + return res; } function checkPeerPaymentsDisabled( @@ -1286,6 +1516,10 @@ export async function updateExchangeFromUrlHandler( wex: WalletExecutionContext, exchangeBaseUrl: string, ): Promise<TaskRunResult> { + if (!wex.ws.networkAvailable) { + return TaskRunResult.networkRequired(); + } + logger.trace(`updating exchange info for ${exchangeBaseUrl}`); const oldExchangeRec = await wex.db.runReadOnlyTx( @@ -1309,6 +1543,7 @@ export async function updateExchangeFromUrlHandler( case ExchangeEntryDbUpdateStatus.Initial: logger.info(`not updating exchange in status "initial"`); return TaskRunResult.finished(); + case ExchangeEntryDbUpdateStatus.OutdatedUpdate: case ExchangeEntryDbUpdateStatus.InitialUpdate: case ExchangeEntryDbUpdateStatus.ReadyUpdate: updateRequestedExplicitly = true; @@ -1367,7 +1602,6 @@ export async function updateExchangeFromUrlHandler( AbsoluteTime.min(nextUpdateStamp, nextRefreshCheckStamp), ); } - } // When doing the auto-refresh check, we always update @@ -1423,15 +1657,7 @@ export async function updateExchangeFromUrlHandler( logger.trace("finished validating exchange /wire info"); - // We download the text/plain version here, - // because that one needs to exist, and we - // will get the current etag from the response. - const tosDownload = await downloadTosFromAcceptedFormat( - wex, - exchangeBaseUrl, - timeout, - ["text/plain"], - ); + const tosMeta = await downloadTosMeta(wex, exchangeBaseUrl); logger.trace("updating exchange info in database"); @@ -1460,6 +1686,8 @@ export async function updateExchangeFromUrlHandler( "recoupGroups", "coinAvailability", "denomLossEvents", + "currencyInfo", + "transactionsMeta", ], }, async (tx) => { @@ -1480,16 +1708,18 @@ export async function updateExchangeFromUrlHandler( detailsPointerChanged = true; } let detailsIncompatible = false; + let conflictHint: string | undefined = undefined; if (existingDetails) { if (existingDetails.masterPublicKey !== keysInfo.masterPublicKey) { detailsIncompatible = true; detailsPointerChanged = true; - } - if (existingDetails.currency !== keysInfo.currency) { + conflictHint = "master public key changed"; + } else if (existingDetails.currency !== keysInfo.currency) { detailsIncompatible = true; detailsPointerChanged = true; + conflictHint = "currency changed"; } - // FIXME: We need to do some consistency checks! + // FIXME: We need to do some more consistency checks! } if (detailsIncompatible) { logger.warn( @@ -1498,6 +1728,12 @@ export async function updateExchangeFromUrlHandler( // We don't support this gracefully right now. // See https://bugs.taler.net/n/8576 r.updateStatus = ExchangeEntryDbUpdateStatus.UnavailableUpdate; + r.unavailableReason = makeTalerErrorDetail( + TalerErrorCode.WALLET_EXCHANGE_ENTRY_UPDATE_CONFLICT, + { + detail: conflictHint, + }, + ); r.updateRetryCounter = (r.updateRetryCounter ?? 0) + 1; r.nextUpdateStamp = computeDbBackoff(r.updateRetryCounter); r.nextRefreshCheckStamp = timestampPreciseToDb( @@ -1510,6 +1746,7 @@ export async function updateExchangeFromUrlHandler( newExchangeState: getExchangeState(r), }; } + delete r.unavailableReason; r.updateRetryCounter = 0; const newDetails: ExchangeDetailsRecord = { auditors: keysInfo.auditors, @@ -1521,10 +1758,18 @@ export async function updateExchangeFromUrlHandler( exchangeBaseUrl: r.baseUrl, wireInfo, ageMask, + walletBalanceLimits: keysInfo.walletBalanceLimits, }; r.noFees = noFees; r.peerPaymentsDisabled = peerPaymentsDisabled; - r.tosCurrentEtag = tosDownload.tosEtag; + switch (tosMeta.type) { + case "not-found": + r.tosCurrentEtag = undefined; + break; + case "ok": + r.tosCurrentEtag = tosMeta.etag; + break; + } if (existingDetails?.rowId) { newDetails.rowId = existingDetails.rowId; } @@ -1549,6 +1794,21 @@ export async function updateExchangeFromUrlHandler( r.updateStatus = ExchangeEntryDbUpdateStatus.Ready; r.cachebreakNextUpdate = false; await tx.exchanges.put(r); + + if (keysInfo.currencySpecification) { + // Since this is the per-exchange currency info, + // we update it when the exchange changes it. + await WalletDbHelpers.upsertCurrencyInfo(tx, { + currencySpec: keysInfo.currencySpecification, + scopeInfo: { + type: ScopeType.Exchange, + currency: newDetails.currency, + url: exchangeBaseUrl, + }, + source: "exchange", + }); + } + const drRowId = await tx.exchangeDetails.put(newDetails); checkDbInvariant( typeof drRowId.key === "number", @@ -1659,87 +1919,8 @@ export async function updateExchangeFromUrlHandler( logger.trace("done updating exchange info in database"); - logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`); - - let minCheckThreshold = AbsoluteTime.addDuration( - AbsoluteTime.now(), - Duration.fromSpec({ days: 1 }), - ); - if (refreshCheckNecessary) { - // Do auto-refresh. - await wex.db.runReadWriteTx( - { - storeNames: [ - "coins", - "denominations", - "coinAvailability", - "refreshGroups", - "refreshSessions", - "exchanges", - ], - }, - async (tx) => { - const exchange = await tx.exchanges.get(exchangeBaseUrl); - if (!exchange || !exchange.detailsPointer) { - return; - } - const coins = await tx.coins.indexes.byBaseUrl - .iter(exchangeBaseUrl) - .toArray(); - const refreshCoins: CoinRefreshRequest[] = []; - for (const coin of coins) { - if (coin.status !== CoinStatus.Fresh) { - continue; - } - const denom = await tx.denominations.get([ - exchangeBaseUrl, - coin.denomPubHash, - ]); - if (!denom) { - logger.warn("denomination not in database"); - continue; - } - const executeThreshold = - getAutoRefreshExecuteThresholdForDenom(denom); - if (AbsoluteTime.isExpired(executeThreshold)) { - refreshCoins.push({ - coinPub: coin.coinPub, - amount: denom.value, - }); - } else { - const checkThreshold = getAutoRefreshCheckThreshold(denom); - minCheckThreshold = AbsoluteTime.min( - minCheckThreshold, - checkThreshold, - ); - } - } - if (refreshCoins.length > 0) { - const res = await createRefreshGroup( - wex, - tx, - exchange.detailsPointer?.currency, - refreshCoins, - RefreshReason.Scheduled, - undefined, - ); - logger.trace( - `created refresh group for auto-refresh (${res.refreshGroupId})`, - ); - } - logger.trace( - `next refresh check at ${AbsoluteTime.toIsoString( - minCheckThreshold, - )}`, - ); - exchange.nextRefreshCheckStamp = timestampPreciseToDb( - AbsoluteTime.toPreciseTimestamp(minCheckThreshold), - ); - wex.ws.exchangeCache.clear(); - await tx.exchanges.put(exchange); - }, - ); + await doAutoRefresh(wex, exchangeBaseUrl); } wex.ws.notify({ @@ -1754,6 +1935,90 @@ export async function updateExchangeFromUrlHandler( return TaskRunResult.progress(); } +async function doAutoRefresh( + wex: WalletExecutionContext, + exchangeBaseUrl: string, +): Promise<void> { + logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`); + + let minCheckThreshold = AbsoluteTime.addDuration( + AbsoluteTime.now(), + Duration.fromSpec({ days: 1 }), + ); + + await wex.db.runReadWriteTx( + { + storeNames: [ + "coinAvailability", + "coinHistory", + "coins", + "denominations", + "exchanges", + "refreshGroups", + "refreshSessions", + "transactionsMeta", + ], + }, + async (tx) => { + const exchange = await tx.exchanges.get(exchangeBaseUrl); + if (!exchange || !exchange.detailsPointer) { + return; + } + const coins = await tx.coins.indexes.byBaseUrl + .iter(exchangeBaseUrl) + .toArray(); + const refreshCoins: CoinRefreshRequest[] = []; + for (const coin of coins) { + if (coin.status !== CoinStatus.Fresh) { + continue; + } + const denom = await tx.denominations.get([ + exchangeBaseUrl, + coin.denomPubHash, + ]); + if (!denom) { + logger.warn("denomination not in database"); + continue; + } + const executeThreshold = getAutoRefreshExecuteThresholdForDenom(denom); + if (AbsoluteTime.isExpired(executeThreshold)) { + refreshCoins.push({ + coinPub: coin.coinPub, + amount: denom.value, + }); + } else { + const checkThreshold = getAutoRefreshCheckThreshold(denom); + minCheckThreshold = AbsoluteTime.min( + minCheckThreshold, + checkThreshold, + ); + } + } + if (refreshCoins.length > 0) { + const res = await createRefreshGroup( + wex, + tx, + exchange.detailsPointer?.currency, + refreshCoins, + RefreshReason.Scheduled, + undefined, + ); + logger.trace( + `created refresh group for auto-refresh (${res.refreshGroupId})`, + ); + } + logger.trace( + `next refresh check at ${AbsoluteTime.toIsoString(minCheckThreshold)}`, + ); + exchange.nextRefreshCheckStamp = timestampPreciseToDb( + AbsoluteTime.toPreciseTimestamp(minCheckThreshold), + ); + wex.ws.exchangeCache.clear(); + await tx.exchanges.put(exchange); + }, + ); +} + interface DenomLossResult { notifications: WalletNotification[]; } @@ -1761,7 +2026,13 @@ interface DenomLossResult { async function handleDenomLoss( wex: WalletExecutionContext, tx: WalletDbReadWriteTransaction< - ["coinAvailability", "denominations", "denomLossEvents", "coins"] + [ + "coinAvailability", + "denominations", + "denomLossEvents", + "coins", + "transactionsMeta", + ] >, currency: string, exchangeBaseUrl: string, @@ -1852,13 +2123,11 @@ async function handleDenomLoss( status: DenomLossStatus.Done, timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()), }); - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.DenomLoss, - denomLossEventId, - }); + const ctx = new DenomLossTransactionContext(wex, denomLossEventId); + await ctx.updateTransactionMeta(tx); result.notifications.push({ type: NotificationType.TransactionStateTransition, - transactionId, + transactionId: ctx.transactionId, oldTxState: { major: TransactionMajorState.None, }, @@ -1868,7 +2137,7 @@ async function handleDenomLoss( }); result.notifications.push({ type: NotificationType.BalanceChange, - hintTransactionId: transactionId, + hintTransactionId: ctx.transactionId, }); } @@ -1884,13 +2153,11 @@ async function handleDenomLoss( status: DenomLossStatus.Done, timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()), }); - const transactionId = constructTransactionIdentifier({ - tag: TransactionType.DenomLoss, - denomLossEventId, - }); + const ctx = new DenomLossTransactionContext(wex, denomLossEventId); + await ctx.updateTransactionMeta(tx); result.notifications.push({ type: NotificationType.TransactionStateTransition, - transactionId, + transactionId: ctx.transactionId, oldTxState: { major: TransactionMajorState.None, }, @@ -1900,7 +2167,7 @@ async function handleDenomLoss( }); result.notifications.push({ type: NotificationType.BalanceChange, - hintTransactionId: transactionId, + hintTransactionId: ctx.transactionId, }); } @@ -1955,23 +2222,55 @@ export function computeDenomLossTransactionStatus( } export class DenomLossTransactionContext implements TransactionContext { + transactionId: TransactionIdStr; + + constructor( + private wex: WalletExecutionContext, + public denomLossEventId: string, + ) { + this.transactionId = constructTransactionIdentifier({ + tag: TransactionType.DenomLoss, + denomLossEventId, + }); + } + get taskId(): TaskIdStr | undefined { return undefined; } - transactionId: TransactionIdStr; + + async updateTransactionMeta( + tx: WalletDbReadWriteTransaction<["denomLossEvents", "transactionsMeta"]>, + ): Promise<void> { + const denomLossRec = await tx.denomLossEvents.get(this.denomLossEventId); + if (!denomLossRec) { + await tx.transactionsMeta.delete(this.transactionId); + return; + } + await tx.transactionsMeta.put({ + transactionId: this.transactionId, + status: denomLossRec.status, + timestamp: denomLossRec.timestampCreated, + currency: denomLossRec.currency, + exchanges: [denomLossRec.exchangeBaseUrl], + }); + } abortTransaction(): Promise<void> { throw new Error("Method not implemented."); } + suspendTransaction(): Promise<void> { throw new Error("Method not implemented."); } + resumeTransaction(): Promise<void> { throw new Error("Method not implemented."); } + failTransaction(): Promise<void> { throw new Error("Method not implemented."); } + async deleteTransaction(): Promise<void> { const transitionInfo = await this.wex.db.runReadWriteTx( { storeNames: ["denomLossEvents"] }, @@ -1993,21 +2292,43 @@ export class DenomLossTransactionContext implements TransactionContext { notifyTransition(this.wex, this.transactionId, transitionInfo); } - constructor( - private wex: WalletExecutionContext, - public denomLossEventId: string, - ) { - this.transactionId = constructTransactionIdentifier({ - tag: TransactionType.DenomLoss, - denomLossEventId, - }); + async lookupFullTransaction( + tx: WalletDbAllStoresReadOnlyTransaction, + ): Promise<Transaction | undefined> { + const rec = await tx.denomLossEvents.get(this.denomLossEventId); + if (!rec) { + return undefined; + } + const txState = computeDenomLossTransactionStatus(rec); + return { + type: TransactionType.DenomLoss, + txState, + scopes: await getScopeForAllExchanges(tx, [rec.exchangeBaseUrl]), + txActions: [TransactionAction.Delete], + amountRaw: Amounts.stringify(rec.amount), + amountEffective: Amounts.stringify(rec.amount), + timestamp: timestampPreciseFromDb(rec.timestampCreated), + transactionId: constructTransactionIdentifier({ + tag: TransactionType.DenomLoss, + denomLossEventId: rec.denomLossEventId, + }), + lossEventType: rec.eventType, + exchangeBaseUrl: rec.exchangeBaseUrl, + }; } } async function handleRecoup( wex: WalletExecutionContext, tx: WalletDbReadWriteTransaction< - ["denominations", "coins", "recoupGroups", "refreshGroups"] + [ + "denominations", + "coins", + "recoupGroups", + "refreshGroups", + "transactionsMeta", + "exchanges", + ] >, exchangeBaseUrl: string, recoup: Recoup[], @@ -2119,6 +2440,19 @@ export async function getExchangeTos( ): Promise<GetExchangeTosResult> { const exch = await fetchFreshExchange(wex, exchangeBaseUrl); + switch (exch.tosStatus) { + case ExchangeTosStatus.MissingTos: + return { + tosStatus: ExchangeTosStatus.MissingTos, + acceptedEtag: undefined, + contentLanguage: undefined, + contentType: "text/plain", + content: "NULL", + currentEtag: "NULL", + tosAvailableLanguages: [], + }; + } + const tosDownload = await downloadTosFromAcceptedFormat( wex, exchangeBaseUrl, @@ -2187,6 +2521,7 @@ export async function listExchanges( { storeNames: [ "exchanges", + "reserves", "operationRetries", "exchangeDetails", "globalCurrencyAuditors", @@ -2195,18 +2530,29 @@ export async function listExchanges( }, async (tx) => { const exchangeRecords = await tx.exchanges.iter().toArray(); - for (const r of exchangeRecords) { + for (const exchangeRec of exchangeRecords) { const taskId = constructTaskIdentifier({ tag: PendingTaskType.ExchangeUpdate, - exchangeBaseUrl: r.baseUrl, + exchangeBaseUrl: exchangeRec.baseUrl, }); - const exchangeDetails = await getExchangeRecordsInternal(tx, r.baseUrl); + const exchangeDetails = await getExchangeRecordsInternal( + tx, + exchangeRec.baseUrl, + ); const opRetryRecord = await tx.operationRetries.get(taskId); + let reserveRec: ReserveRecord | undefined = undefined; + if (exchangeRec.currentMergeReserveRowId != null) { + reserveRec = await tx.reserves.get( + exchangeRec.currentMergeReserveRowId, + ); + checkDbInvariant(!!reserveRec, "reserve record not found"); + } exchanges.push( await makeExchangeListItem( tx, - r, + exchangeRec, exchangeDetails, + reserveRec, opRetryRecord?.lastError, ), ); @@ -2444,27 +2790,20 @@ async function internalGetExchangeResources( * but keeps some transactions (payments, p2p, refreshes) around. */ async function purgeExchange( - tx: WalletDbReadWriteTransaction< - [ - "exchanges", - "exchangeDetails", - "transactions", - "coinAvailability", - "coins", - "denominations", - "exchangeSignKeys", - "withdrawalGroups", - "planchets", - ] - >, + wex: WalletExecutionContext, + tx: WalletDbAllStoresReadWriteTransaction, exchangeBaseUrl: string, ): Promise<void> { const detRecs = await tx.exchangeDetails.indexes.byExchangeBaseUrl.getAll(); + // Remove all exchange detail records for that exchange for (const r of detRecs) { if (r.rowId == null) { // Should never happen, as rowId is the primary key. continue; } + if (r.exchangeBaseUrl !== exchangeBaseUrl) { + continue; + } await tx.exchangeDetails.delete(r.rowId); const signkeyRecs = await tx.exchangeSignKeys.indexes.byExchangeDetailsRowId.getAll(r.rowId); @@ -2519,6 +2858,8 @@ async function purgeExchange( } } } + + await rematerializeTransactions(wex, tx); } export async function deleteExchange( @@ -2527,36 +2868,21 @@ export async function deleteExchange( ): Promise<void> { let inUse: boolean = false; const exchangeBaseUrl = req.exchangeBaseUrl; - await wex.db.runReadWriteTx( - { - storeNames: [ - "exchanges", - "exchangeDetails", - "transactions", - "coinAvailability", - "coins", - "denominations", - "exchangeSignKeys", - "withdrawalGroups", - "planchets", - ], - }, - async (tx) => { - const exchangeRec = await tx.exchanges.get(exchangeBaseUrl); - if (!exchangeRec) { - // Nothing to delete! - logger.info("no exchange found to delete"); - return; - } - const res = await internalGetExchangeResources(wex, tx, exchangeBaseUrl); - if (res.hasResources && !req.purge) { - inUse = true; - return; - } - await purgeExchange(tx, exchangeBaseUrl); - wex.ws.exchangeCache.clear(); - }, - ); + await wex.db.runAllStoresReadWriteTx({}, async (tx) => { + const exchangeRec = await tx.exchanges.get(exchangeBaseUrl); + if (!exchangeRec) { + // Nothing to delete! + logger.info("no exchange found to delete"); + return; + } + const res = await internalGetExchangeResources(wex, tx, exchangeBaseUrl); + if (res.hasResources && !req.purge) { + inUse = true; + return; + } + await purgeExchange(wex, tx, exchangeBaseUrl); + wex.ws.exchangeCache.clear(); + }); if (inUse) { throw TalerError.fromUncheckedDetail({ @@ -2586,3 +2912,673 @@ export async function getExchangeResources( } return res; } + +/** + * Find the currently applicable wire fee for an exchange. + */ +export async function getExchangeWireFee( + wex: WalletExecutionContext, + wireType: string, + baseUrl: string, + time: TalerProtocolTimestamp, +): Promise<WireFee> { + const exchangeDetails = await wex.db.runReadOnlyTx( + { storeNames: ["exchangeDetails", "exchanges"] }, + async (tx) => { + const ex = await tx.exchanges.get(baseUrl); + if (!ex || !ex.detailsPointer) return undefined; + return await tx.exchangeDetails.indexes.byPointer.get([ + baseUrl, + ex.detailsPointer.currency, + ex.detailsPointer.masterPublicKey, + ]); + }, + ); + + if (!exchangeDetails) { + throw Error(`exchange missing: ${baseUrl}`); + } + + const fees = exchangeDetails.wireInfo.feesForType[wireType]; + if (!fees || fees.length === 0) { + throw Error( + `exchange ${baseUrl} doesn't have fees for wire type ${wireType}`, + ); + } + const fee = fees.find((x) => { + return AbsoluteTime.isBetween( + AbsoluteTime.fromProtocolTimestamp(time), + AbsoluteTime.fromProtocolTimestamp(x.startStamp), + AbsoluteTime.fromProtocolTimestamp(x.endStamp), + ); + }); + if (!fee) { + throw Error( + `exchange ${exchangeDetails.exchangeBaseUrl} doesn't have fees for wire type ${wireType} at ${time.t_s}`, + ); + } + + return fee; +} + +export type BalanceThresholdCheckResult = + | { + result: "ok"; + } + | { + result: "violation"; + nextThreshold: AmountString; + walletKycStatus: ExchangeWalletKycStatus | undefined; + walletKycAccessToken: string | undefined; + }; + +export async function checkIncomingAmountLegalUnderKycBalanceThreshold( + wex: WalletExecutionContext, + exchangeBaseUrl: string, + amountIncoming: AmountLike, +): Promise<BalanceThresholdCheckResult> { + logger.info(`checking ${exchangeBaseUrl} +${amountIncoming} for KYC`); + return await wex.db.runReadOnlyTx( + { + storeNames: [ + "exchanges", + "exchangeDetails", + "reserves", + "coinAvailability", + ], + }, + async (tx): Promise<BalanceThresholdCheckResult> => { + const exchangeRec = await tx.exchanges.get(exchangeBaseUrl); + if (!exchangeRec) { + throw Error("exchange not found"); + } + const det = await getExchangeRecordsInternal(tx, exchangeBaseUrl); + if (!det) { + throw Error("exchange not found"); + } + const coinAvRecs = + await tx.coinAvailability.indexes.byExchangeBaseUrl.getAll( + exchangeBaseUrl, + ); + let balAmount = Amounts.zeroOfCurrency(det.currency); + for (const av of coinAvRecs) { + const n = av.freshCoinCount + (av.pendingRefreshOutputCount ?? 0); + balAmount = Amounts.add( + balAmount, + Amounts.mult(av.value, n).amount, + ).amount; + } + const balExpected = Amounts.add(balAmount, amountIncoming).amount; + + // Check if we already have KYC for a sufficient threshold. + + const reserveId = exchangeRec.currentMergeReserveRowId; + let reserveRec: ReserveRecord | undefined; + if (reserveId) { + reserveRec = await tx.reserves.get(reserveId); + checkDbInvariant(!!reserveRec, "reserve"); + // FIXME: also consider KYC expiration! + if (reserveRec.thresholdNext) { + if (Amounts.cmp(reserveRec.thresholdNext, balExpected) >= 0) { + return { + result: "ok", + }; + } + } else if (reserveRec.status === ReserveRecordStatus.Done) { + // We don't know what the next threshold is, but we've passed *some* KYC + // check. We don't have enough information, so we allow the balance increase. + return { + result: "ok", + }; + } + } + + // No luck, check the next limit we should request, if any. + + const limits = det.walletBalanceLimits; + if (!limits) { + logger.info("no balance limits defined"); + return { + result: "ok", + }; + } + limits.sort((a, b) => Amounts.cmp(a, b)); + logger.info(`applicable limits: ${j2s(limits)}`); + let limViolated: AmountString | undefined = undefined; + let limNext: AmountString | undefined = undefined; + for (let i = 0; i < limits.length; i++) { + if (Amounts.cmp(limits[i], balExpected) <= 0) { + limViolated = limits[i]; + limNext = limits[i + 1]; + if (limNext == null || Amounts.cmp(limNext, balExpected) > 0) { + break; + } + } + } + if (!limViolated) { + logger.info("balance limit okay"); + return { + result: "ok", + }; + } else { + logger.info( + `balance limit ${limViolated} would be violated, next is ${limNext}`, + ); + return { + result: "violation", + nextThreshold: limNext ?? limViolated, + walletKycStatus: reserveRec?.status + ? getKycStatusFromReserveStatus(reserveRec.status) + : undefined, + walletKycAccessToken: reserveRec?.kycAccessToken, + }; + } + }, + ); +} + +/** + * Wait until kyc has passed for the wallet. + * + * If passed==false, already return when legitimization + * is requested. + */ +export async function waitExchangeWalletKyc( + wex: WalletExecutionContext, + exchangeBaseUrl: string, + amount: AmountLike, + passed: boolean, +): Promise<void> { + await genericWaitForState(wex, { + async checkState(): Promise<boolean> { + return await wex.db.runReadOnlyTx( + { + storeNames: ["exchanges", "reserves"], + }, + async (tx) => { + const exchange = await tx.exchanges.get(exchangeBaseUrl); + if (!exchange) { + throw new Error("exchange not found"); + } + const reserveId = exchange.currentMergeReserveRowId; + if (reserveId == null) { + logger.warn("KYC does not exist yet"); + return false; + } + const reserve = await tx.reserves.get(reserveId); + if (!reserve) { + throw Error("reserve not found"); + } + if (passed) { + if ( + reserve.thresholdGranted && + Amounts.cmp(reserve.thresholdGranted, amount) >= 0 + ) { + return true; + } + return false; + } else { + if ( + reserve.thresholdGranted && + Amounts.cmp(reserve.thresholdGranted, amount) >= 0 + ) { + return true; + } + if (reserve.status === ReserveRecordStatus.PendingLegi) { + return true; + } + return false; + } + }, + ); + }, + filterNotification(notif) { + return ( + notif.type === NotificationType.ExchangeStateTransition && + notif.exchangeBaseUrl === exchangeBaseUrl + ); + }, + }); +} + +export async function handleTestingWaitExchangeState( + wex: WalletExecutionContext, + req: TestingWaitExchangeStateRequest, +): Promise<EmptyObject> { + await genericWaitForState(wex, { + async checkState(): Promise<boolean> { + const exchangeEntry = await lookupExchangeByUri(wex, { + exchangeBaseUrl: req.exchangeBaseUrl, + }); + if (req.walletKycStatus) { + if (req.walletKycStatus !== exchangeEntry.walletKycStatus) { + return false; + } + } + return true; + }, + filterNotification(notif) { + return ( + notif.type === NotificationType.ExchangeStateTransition && + notif.exchangeBaseUrl === req.exchangeBaseUrl + ); + }, + }); + return {}; +} + +export async function handleTestingWaitExchangeWalletKyc( + wex: WalletExecutionContext, + req: TestingWaitWalletKycRequest, +): Promise<EmptyObject> { + await waitExchangeWalletKyc(wex, req.exchangeBaseUrl, req.amount, req.passed); + return {}; +} + +export async function handleStartExchangeWalletKyc( + wex: WalletExecutionContext, + req: StartExchangeWalletKycRequest, +): Promise<EmptyObject> { + const newReservePair = await wex.cryptoApi.createEddsaKeypair({}); + const dbRes = await wex.db.runReadWriteTx( + { + storeNames: ["exchanges", "reserves"], + }, + async (tx) => { + const exchange = await tx.exchanges.get(req.exchangeBaseUrl); + if (!exchange) { + throw Error("exchange not found"); + } + const oldExchangeState = getExchangeState(exchange); + let mergeReserveRowId = exchange.currentMergeReserveRowId; + if (mergeReserveRowId == null) { + const putRes = await tx.reserves.put({ + reservePriv: newReservePair.priv, + reservePub: newReservePair.pub, + }); + checkDbInvariant(typeof putRes.key === "number", "primary key type"); + mergeReserveRowId = putRes.key; + exchange.currentMergeReserveRowId = mergeReserveRowId; + await tx.exchanges.put(exchange); + } + const reserveRec = await tx.reserves.get(mergeReserveRowId); + checkDbInvariant(reserveRec != null, "reserve record exists"); + if ( + reserveRec.thresholdGranted == null || + Amounts.cmp(reserveRec.thresholdGranted, req.amount) < 0 + ) { + if ( + reserveRec.thresholdRequested == null || + Amounts.cmp(reserveRec.thresholdRequested, req.amount) < 0 + ) { + reserveRec.thresholdRequested = req.amount; + reserveRec.status = ReserveRecordStatus.PendingLegiInit; + await tx.reserves.put(reserveRec); + return { + notification: { + type: NotificationType.ExchangeStateTransition, + exchangeBaseUrl: exchange.baseUrl, + oldExchangeState, + newExchangeState: getExchangeState(exchange), + } satisfies WalletNotification, + }; + } else { + logger.info( + `another KYC process is already active for ${req.exchangeBaseUrl} over ${reserveRec.thresholdRequested}`, + ); + return undefined; + } + } else { + // FIXME: Check expiration once exchange tells us! + logger.info( + `KYC already granted for ${req.exchangeBaseUrl} over ${req.amount}, granted ${reserveRec.thresholdGranted}`, + ); + return undefined; + } + }, + ); + if (dbRes && dbRes.notification) { + wex.ws.notify(dbRes.notification); + } + const taskId = constructTaskIdentifier({ + tag: PendingTaskType.ExchangeWalletKyc, + exchangeBaseUrl: req.exchangeBaseUrl, + }); + wex.taskScheduler.startShepherdTask(taskId); + return {}; +} + +async function handleExchangeKycPendingWallet( + wex: WalletExecutionContext, + exchange: ExchangeEntryRecord, + reserve: ReserveRecord, +): Promise<TaskRunResult> { + checkDbInvariant(!!reserve.thresholdRequested, "threshold"); + const threshold = reserve.thresholdRequested; + const sigResp = await wex.cryptoApi.signWalletAccountSetup({ + reservePriv: reserve.reservePriv, + reservePub: reserve.reservePub, + threshold, + }); + const requestUrl = new URL("kyc-wallet", exchange.baseUrl); + const body: WalletKycRequest = { + balance: reserve.thresholdRequested, + reserve_pub: reserve.reservePub, + reserve_sig: sigResp.sig, + }; + logger.info(`kyc-wallet request body: ${j2s(body)}`); + const res = await wex.http.fetch(requestUrl.href, { + method: "POST", + body, + }); + + logger.info(`kyc-wallet response status is ${res.status}`); + + switch (res.status) { + case HttpStatusCode.Ok: { + // KYC somehow already passed + // FIXME: Store next threshold and timestamp! + const accountKycStatus = await readSuccessResponseJsonOrThrow( + res, + codecForAccountKycStatus(), + ); + return handleExchangeKycSuccess(wex, exchange.baseUrl, accountKycStatus); + } + case HttpStatusCode.NoContent: { + // KYC disabled at exchange. + return handleExchangeKycSuccess(wex, exchange.baseUrl, undefined); + } + case HttpStatusCode.Forbidden: { + // Did not work! + const err = await readTalerErrorResponse(res); + throwUnexpectedRequestError(res, err); + } + case HttpStatusCode.UnavailableForLegalReasons: { + const kycBody = await readResponseJsonOrThrow( + res, + codecForLegitimizationNeededResponse(), + ); + return handleExchangeKycRespLegi(wex, exchange.baseUrl, reserve, kycBody); + } + default: { + const err = await readTalerErrorResponse(res); + throwUnexpectedRequestError(res, err); + } + } +} + +async function handleExchangeKycSuccess( + wex: WalletExecutionContext, + exchangeBaseUrl: string, + accountKycStatus: AccountKycStatus | undefined, +): Promise<TaskRunResult> { + logger.info(`kyc check for ${exchangeBaseUrl} satisfied`); + const dbRes = await wex.db.runReadWriteTx( + { storeNames: ["exchanges", "reserves"] }, + async (tx) => { + const exchange = await tx.exchanges.get(exchangeBaseUrl); + if (!exchange) { + throw Error("exchange not found"); + } + const oldExchangeState = getExchangeState(exchange); + const reserveId = exchange.currentMergeReserveRowId; + if (reserveId == null) { + throw Error("expected exchange to have reserve ID"); + } + const reserve = await tx.reserves.get(reserveId); + checkDbInvariant(!!reserve, "merge reserve should exist"); + switch (reserve.status) { + case ReserveRecordStatus.PendingLegiInit: + case ReserveRecordStatus.PendingLegi: + break; + default: + throw Error("unexpected state (concurrent modification?)"); + } + reserve.status = ReserveRecordStatus.Done; + reserve.thresholdGranted = reserve.thresholdRequested; + delete reserve.thresholdRequested; + delete reserve.requirementRow; + + // Try to figure out the next balance limit + let nextLimit: AmountString | undefined = undefined; + if (accountKycStatus?.limits) { + for (const lim of accountKycStatus.limits) { + if (lim.operation_type.toLowerCase() === "balance") { + nextLimit = lim.threshold; + } + } + } + reserve.thresholdNext = nextLimit; + + await tx.reserves.put(reserve); + logger.info(`newly granted threshold: ${reserve.thresholdGranted}`); + return { + notification: { + type: NotificationType.ExchangeStateTransition, + exchangeBaseUrl: exchange.baseUrl, + oldExchangeState, + newExchangeState: getExchangeState(exchange), + } satisfies WalletNotification, + }; + }, + ); + if (dbRes && dbRes.notification) { + wex.ws.notify(dbRes.notification); + } + return TaskRunResult.progress(); +} + +/** + * The exchange has just told us that we need some legitimization + * from the user. Request more details and store the result in the database. + */ +async function handleExchangeKycRespLegi( + wex: WalletExecutionContext, + exchangeBaseUrl: string, + reserve: ReserveRecord, + kycBody: LegitimizationNeededResponse, +): Promise<TaskRunResult> { + const sigResp = await wex.cryptoApi.signWalletKycAuth({ + accountPriv: reserve.reservePriv, + accountPub: reserve.reservePub, + }); + const reqUrl = new URL(`kyc-check/${kycBody.h_payto}`, exchangeBaseUrl); + const resp = await wex.http.fetch(reqUrl.href, { + method: "GET", + headers: { + ["Account-Owner-Signature"]: sigResp.sig, + }, + }); + + logger.info(`kyc-check (long-poll) response status ${resp.status}`); + + switch (resp.status) { + case HttpStatusCode.Ok: { + // FIXME: Store information about next limit! + const accountKycStatus = await readSuccessResponseJsonOrThrow( + resp, + codecForAccountKycStatus(), + ); + return handleExchangeKycSuccess(wex, exchangeBaseUrl, accountKycStatus); + } + case HttpStatusCode.Accepted: { + // Store the result in the DB! + break; + } + case HttpStatusCode.NoContent: { + // KYC not configured, so already satisfied + return handleExchangeKycSuccess(wex, exchangeBaseUrl, undefined); + } + default: { + const err = await readTalerErrorResponse(resp); + throwUnexpectedRequestError(resp, err); + } + } + + const accountKycStatusResp = await readResponseJsonOrThrow( + resp, + codecForAccountKycStatus(), + ); + + const dbRes = await wex.db.runReadWriteTx( + { storeNames: ["exchanges", "reserves"] }, + async (tx) => { + const exchange = await tx.exchanges.get(exchangeBaseUrl); + if (!exchange) { + throw Error("exchange not found"); + } + const oldExchangeState = getExchangeState(exchange); + const reserveId = exchange.currentMergeReserveRowId; + if (reserveId == null) { + throw Error("expected exchange to have reserve ID"); + } + const reserve = await tx.reserves.get(reserveId); + checkDbInvariant(!!reserve, "merge reserve should exist"); + switch (reserve.status) { + case ReserveRecordStatus.PendingLegiInit: + break; + default: + throw Error("unexpected state (concurrent modification?)"); + } + reserve.status = ReserveRecordStatus.PendingLegi; + reserve.requirementRow = kycBody.requirement_row; + reserve.amlReview = accountKycStatusResp.aml_review; + reserve.kycAccessToken = accountKycStatusResp.access_token; + + await tx.reserves.put(reserve); + return { + notification: { + type: NotificationType.ExchangeStateTransition, + exchangeBaseUrl: exchange.baseUrl, + oldExchangeState, + newExchangeState: getExchangeState(exchange), + } satisfies WalletNotification, + }; + }, + ); + if (dbRes && dbRes.notification) { + wex.ws.notify(dbRes.notification); + } + return TaskRunResult.progress(); +} + +/** + * Legitimization was requested from the user by the exchange. + * + * Long-poll for the legitimization to succeed. + */ +async function handleExchangeKycPendingLegitimization( + wex: WalletExecutionContext, + exchange: ExchangeEntryRecord, + reserve: ReserveRecord, +): Promise<TaskRunResult> { + // FIXME: Cache this signature + const sigResp = await wex.cryptoApi.signWalletKycAuth({ + accountPriv: reserve.reservePriv, + accountPub: reserve.reservePub, + }); + + const reservePayto = stringifyReservePaytoUri( + exchange.baseUrl, + reserve.reservePub, + ); + + const paytoHash = encodeCrock(hashPaytoUri(reservePayto)); + + const resp = await wex.ws.runLongpollQueueing( + wex, + exchange.baseUrl, + async (timeoutMs) => { + const reqUrl = new URL(`kyc-check/${paytoHash}`, exchange.baseUrl); + reqUrl.searchParams.set("timeout_ms", `${timeoutMs}`); + logger.info(`long-polling wallet KYC status at ${reqUrl.href}`); + return await wex.http.fetch(reqUrl.href, { + method: "GET", + headers: { + ["Account-Owner-Signature"]: sigResp.sig, + }, + }); + }, + ); + + logger.info(`kyc-check (long-poll) response status ${resp.status}`); + + switch (resp.status) { + case HttpStatusCode.Ok: { + // FIXME: Store information about next limit! + const accountKycStatus = await readSuccessResponseJsonOrThrow( + resp, + codecForAccountKycStatus(), + ); + return handleExchangeKycSuccess(wex, exchange.baseUrl, accountKycStatus); + } + case HttpStatusCode.Accepted: + // FIXME: Do we ever need to update the access token? + return TaskRunResult.longpollReturnedPending(); + case HttpStatusCode.NoContent: { + // KYC not configured, so already satisfied + return handleExchangeKycSuccess(wex, exchange.baseUrl, undefined); + } + default: { + const err = await readTalerErrorResponse(resp); + throwUnexpectedRequestError(resp, err); + } + } +} + +export async function processExchangeKyc( + wex: WalletExecutionContext, + exchangeBaseUrl: string, +): Promise<TaskRunResult> { + const res = await wex.db.runReadOnlyTx( + { storeNames: ["exchanges", "reserves"] }, + async (tx) => { + const exchange = await tx.exchanges.get(exchangeBaseUrl); + if (!exchange) { + return undefined; + } + const reserveId = exchange.currentMergeReserveRowId; + let reserve: ReserveRecord | undefined = undefined; + if (reserveId != null) { + reserve = await tx.reserves.get(reserveId); + } + return { exchange, reserve }; + }, + ); + if (!res) { + logger.warn(`exchange ${exchangeBaseUrl} not found, not processing KYC`); + return TaskRunResult.finished(); + } + if (!res.reserve) { + return TaskRunResult.finished(); + } + switch (res.reserve.status) { + case undefined: + // No KYC requested + return TaskRunResult.finished(); + case ReserveRecordStatus.Done: + return TaskRunResult.finished(); + case ReserveRecordStatus.SuspendedLegiInit: + case ReserveRecordStatus.SuspendedLegi: + return TaskRunResult.finished(); + case ReserveRecordStatus.PendingLegiInit: + return handleExchangeKycPendingWallet(wex, res.exchange, res.reserve); + case ReserveRecordStatus.PendingLegi: + return handleExchangeKycPendingLegitimization( + wex, + res.exchange, + res.reserve, + ); + } +} + +export async function checkExchangeInScope( + wex: WalletExecutionContext, + exchangeBaseUrl: string, + scope: ScopeInfo, +): Promise<boolean> { + if (scope.type === ScopeType.Exchange && scope.url !== exchangeBaseUrl) { + return false; + } + return true; +} |