aboutsummaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/exchanges.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/taler-wallet-core/src/exchanges.ts')
-rw-r--r--packages/taler-wallet-core/src/exchanges.ts1644
1 files changed, 1320 insertions, 324 deletions
diff --git a/packages/taler-wallet-core/src/exchanges.ts b/packages/taler-wallet-core/src/exchanges.ts
index adb696de0..773ad0d59 100644
--- a/packages/taler-wallet-core/src/exchanges.ts
+++ b/packages/taler-wallet-core/src/exchanges.ts
@@ -25,13 +25,17 @@
*/
import {
AbsoluteTime,
+ AccountKycStatus,
+ AccountLimit,
AgeRestriction,
Amount,
+ AmountLike,
+ AmountString,
Amounts,
- AsyncFlag,
CancellationToken,
CoinRefreshRequest,
CoinStatus,
+ CurrencySpecification,
DeleteExchangeRequest,
DenomKeyType,
DenomLossEventType,
@@ -40,12 +44,15 @@ import {
DenominationPubKey,
Duration,
EddsaPublicKeyString,
+ EmptyObject,
ExchangeAuditor,
ExchangeDetailedResponse,
ExchangeGlobalFees,
ExchangeListItem,
ExchangeSignKeyJson,
ExchangeTosStatus,
+ ExchangeUpdateStatus,
+ ExchangeWalletKycStatus,
ExchangeWireAccount,
ExchangesListResponse,
FeeDescription,
@@ -53,6 +60,8 @@ import {
GetExchangeResourcesResponse,
GetExchangeTosResult,
GlobalFees,
+ HttpStatusCode,
+ LegitimizationNeededResponse,
LibtoolVersion,
Logger,
NotificationType,
@@ -61,38 +70,54 @@ import {
RefreshReason,
ScopeInfo,
ScopeType,
+ StartExchangeWalletKycRequest,
TalerError,
TalerErrorCode,
TalerErrorDetail,
TalerPreciseTimestamp,
TalerProtocolDuration,
TalerProtocolTimestamp,
+ TestingWaitExchangeStateRequest,
+ TestingWaitWalletKycRequest,
+ Transaction,
+ TransactionAction,
TransactionIdStr,
TransactionMajorState,
TransactionState,
TransactionType,
URL,
+ WalletKycRequest,
WalletNotification,
WireFee,
WireFeeMap,
WireFeesJson,
WireInfo,
+ ZeroLimitedOperation,
assertUnreachable,
checkDbInvariant,
+ checkLogicInvariant,
+ codecForAccountKycStatus,
codecForExchangeKeysJson,
+ codecForLegitimizationNeededResponse,
durationMul,
encodeCrock,
getRandomBytes,
hashDenomPub,
+ hashPaytoUri,
j2s,
makeErrorDetail,
+ makeTalerErrorDetail,
parsePaytoUri,
+ stringifyReservePaytoUri,
} from "@gnu-taler/taler-util";
import {
HttpRequestLibrary,
getExpiry,
+ readResponseJsonOrThrow,
readSuccessResponseJsonOrThrow,
readSuccessResponseTextOrThrow,
+ readTalerErrorResponse,
+ throwUnexpectedRequestError,
} from "@gnu-taler/taler-util/http";
import {
PendingTaskType,
@@ -103,6 +128,7 @@ import {
TransactionContext,
computeDbBackoff,
constructTaskIdentifier,
+ genericWaitForState,
getAutoRefreshExecuteThreshold,
getExchangeEntryStatusFromRecord,
getExchangeState,
@@ -118,6 +144,11 @@ import {
ExchangeEntryDbRecordStatus,
ExchangeEntryDbUpdateStatus,
ExchangeEntryRecord,
+ ReserveRecord,
+ ReserveRecordStatus,
+ WalletDbAllStoresReadOnlyTransaction,
+ WalletDbAllStoresReadWriteTransaction,
+ WalletDbHelpers,
WalletDbReadOnlyTransaction,
WalletDbReadWriteTransaction,
WalletStoresV1,
@@ -140,6 +171,7 @@ import { createRefreshGroup } from "./refresh.js";
import {
constructTransactionIdentifier,
notifyTransition,
+ rematerializeTransactions,
} from "./transactions.js";
import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "./versions.js";
import { InternalWalletState, WalletExecutionContext } from "./wallet.js";
@@ -187,7 +219,7 @@ async function downloadExchangeWithTermsOfService(
cancellationToken: wex.cancellationToken,
});
const tosText = await readSuccessResponseTextOrThrow(resp);
- const tosEtag = resp.headers.get("etag") || "unknown";
+ const tosEtag = resp.headers.get("taler-terms-version") || "unknown";
const tosContentLanguage = resp.headers.get("content-language") || undefined;
const tosContentType = resp.headers.get("content-type") || "text/plain";
const availLangStr = resp.headers.get("avail-languages") || "";
@@ -237,6 +269,83 @@ async function getExchangeRecordsInternal(
return details;
}
+export async function getScopeForAllCoins(
+ tx: WalletDbReadOnlyTransaction<
+ [
+ "exchanges",
+ "exchangeDetails",
+ "globalCurrencyExchanges",
+ "globalCurrencyAuditors",
+ ]
+ >,
+ exs: string[],
+): Promise<ScopeInfo[]> {
+ const queries = exs.map((exchange) => {
+ return getExchangeScopeInfoOrUndefined(tx, exchange);
+ });
+ const rs = await Promise.all(queries);
+ return rs.filter((d): d is ScopeInfo => d !== undefined);
+}
+
+export async function getScopeForAllExchanges(
+ tx: WalletDbReadOnlyTransaction<
+ [
+ "exchanges",
+ "exchangeDetails",
+ "globalCurrencyExchanges",
+ "globalCurrencyAuditors",
+ ]
+ >,
+ exs: string[],
+): Promise<ScopeInfo[]> {
+ const queries = exs.map((exchange) => {
+ return getExchangeScopeInfoOrUndefined(tx, exchange);
+ });
+ const rs = await Promise.all(queries);
+ return rs.filter((d): d is ScopeInfo => d !== undefined);
+}
+
+export async function getCoinScopeInfoOrUndefined(
+ tx: WalletDbReadOnlyTransaction<
+ [
+ "coins",
+ "exchanges",
+ "exchangeDetails",
+ "globalCurrencyExchanges",
+ "globalCurrencyAuditors",
+ ]
+ >,
+ coinPub: string,
+): Promise<ScopeInfo | undefined> {
+ const coin = await tx.coins.get(coinPub);
+ if (!coin) {
+ return undefined;
+ }
+ const det = await getExchangeRecordsInternal(tx, coin.exchangeBaseUrl);
+ if (!det) {
+ return undefined;
+ }
+ return internalGetExchangeScopeInfo(tx, det);
+}
+
+export async function getExchangeScopeInfoOrUndefined(
+ tx: WalletDbReadOnlyTransaction<
+ [
+ "exchanges",
+ "exchangeDetails",
+ "globalCurrencyExchanges",
+ "globalCurrencyAuditors",
+ ]
+ >,
+ exchangeBaseUrl: string,
+): Promise<ScopeInfo | undefined> {
+ const det = await getExchangeRecordsInternal(tx, exchangeBaseUrl);
+ if (!det) {
+ return undefined;
+ }
+ return internalGetExchangeScopeInfo(tx, det);
+}
+
export async function getExchangeScopeInfo(
tx: WalletDbReadOnlyTransaction<
[
@@ -301,12 +410,30 @@ async function internalGetExchangeScopeInfo(
};
}
+function getKycStatusFromReserveStatus(
+ status: ReserveRecordStatus,
+): ExchangeWalletKycStatus {
+ switch (status) {
+ case ReserveRecordStatus.Done:
+ return ExchangeWalletKycStatus.Done;
+ // FIXME: Do we handle the suspended state?
+ case ReserveRecordStatus.SuspendedLegiInit:
+ case ReserveRecordStatus.PendingLegiInit:
+ return ExchangeWalletKycStatus.LegiInit;
+ // FIXME: Do we handle the suspended state?
+ case ReserveRecordStatus.SuspendedLegi:
+ case ReserveRecordStatus.PendingLegi:
+ return ExchangeWalletKycStatus.Legi;
+ }
+}
+
async function makeExchangeListItem(
tx: WalletDbReadOnlyTransaction<
["globalCurrencyExchanges", "globalCurrencyAuditors"]
>,
r: ExchangeEntryRecord,
exchangeDetails: ExchangeDetailsRecord | undefined,
+ reserveRec: ReserveRecord | undefined,
lastError: TalerErrorDetail | undefined,
): Promise<ExchangeListItem> {
const lastUpdateErrorInfo: OperationErrorInfo | undefined = lastError
@@ -321,7 +448,12 @@ async function makeExchangeListItem(
scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails);
}
- return {
+ let walletKycStatus: ExchangeWalletKycStatus | undefined =
+ reserveRec && reserveRec.status
+ ? getKycStatusFromReserveStatus(reserveRec.status)
+ : undefined;
+
+ const listItem: ExchangeListItem = {
exchangeBaseUrl: r.baseUrl,
masterPub: exchangeDetails?.masterPublicKey,
noFees: r.noFees ?? false,
@@ -329,6 +461,13 @@ async function makeExchangeListItem(
currency: exchangeDetails?.currency ?? r.presetCurrencyHint ?? "UNKNOWN",
exchangeUpdateStatus: getExchangeUpdateStatusFromRecord(r),
exchangeEntryStatus: getExchangeEntryStatusFromRecord(r),
+ walletKycStatus,
+ walletKycReservePub: reserveRec?.reservePub,
+ // FIXME: #9109 this should not be constructed here, it should be an opaque URL from exchange response
+ walletKycUrl: reserveRec?.kycAccessToken
+ ? new URL(`kyc-spa/${reserveRec.kycAccessToken}`, r.baseUrl).href
+ : undefined,
+ walletKycAccessToken: reserveRec?.kycAccessToken,
tosStatus: getExchangeTosStatusFromRecord(r),
ageRestrictionOptions: exchangeDetails?.ageMask
? AgeRestriction.getAgeGroupsFromMask(exchangeDetails.ageMask)
@@ -342,6 +481,14 @@ async function makeExchangeListItem(
url: r.baseUrl,
},
};
+ switch (listItem.exchangeUpdateStatus) {
+ case ExchangeUpdateStatus.UnavailableUpdate:
+ if (r.unavailableReason) {
+ listItem.unavailableReason = r.unavailableReason;
+ }
+ break;
+ }
+ return listItem;
}
export interface ExchangeWireDetails {
@@ -351,6 +498,7 @@ export interface ExchangeWireDetails {
exchangeBaseUrl: string;
auditors: ExchangeAuditor[];
globalFees: ExchangeGlobalFees[];
+ reserveClosingDelay: TalerProtocolDuration;
}
export async function getExchangeWireDetailsInTx(
@@ -368,6 +516,7 @@ export async function getExchangeWireDetailsInTx(
exchangeBaseUrl: det.exchangeBaseUrl,
auditors: det.auditors,
globalFees: det.globalFees,
+ reserveClosingDelay: det.reserveClosingDelay,
};
}
@@ -379,6 +528,7 @@ export async function lookupExchangeByUri(
{
storeNames: [
"exchanges",
+ "reserves",
"exchangeDetails",
"operationRetries",
"globalCurrencyAuditors",
@@ -397,10 +547,18 @@ export async function lookupExchangeByUri(
const opRetryRecord = await tx.operationRetries.get(
TaskIdentifiers.forExchangeUpdate(exchangeRec),
);
+ let reserveRec: ReserveRecord | undefined = undefined;
+ if (exchangeRec.currentMergeReserveRowId != null) {
+ reserveRec = await tx.reserves.get(
+ exchangeRec.currentMergeReserveRowId,
+ );
+ checkDbInvariant(!!reserveRec, "reserve record not found");
+ }
return await makeExchangeListItem(
tx,
exchangeRec,
exchangeDetails,
+ reserveRec,
opRetryRecord?.lastError,
);
},
@@ -696,6 +854,10 @@ export interface ExchangeKeysDownloadResult {
globalFees: GlobalFees[];
accounts: ExchangeWireAccount[];
wireFees: { [methodName: string]: WireFeesJson[] };
+ currencySpecification?: CurrencySpecification;
+ walletBalanceLimits: AmountString[] | undefined;
+ hardLimits: AccountLimit[] | undefined;
+ zeroLimits: ZeroLimitedOperation[] | undefined;
}
/**
@@ -858,6 +1020,46 @@ async function downloadExchangeKeysInfo(
globalFees: exchangeKeysJsonUnchecked.global_fees,
accounts: exchangeKeysJsonUnchecked.accounts,
wireFees: exchangeKeysJsonUnchecked.wire_fees,
+ currencySpecification: exchangeKeysJsonUnchecked.currency_specification,
+ walletBalanceLimits:
+ exchangeKeysJsonUnchecked.wallet_balance_limit_without_kyc,
+ hardLimits: exchangeKeysJsonUnchecked.hard_limits,
+ zeroLimits: exchangeKeysJsonUnchecked.zero_limits,
+ };
+}
+
+type TosMetaResult = { type: "not-found" } | { type: "ok"; etag: string };
+
+/**
+ * Download metadata about an exchange's terms of service.
+ */
+async function downloadTosMeta(
+ wex: WalletExecutionContext,
+ exchangeBaseUrl: string,
+): Promise<TosMetaResult> {
+ logger.trace(`downloading exchange tos metadata for ${exchangeBaseUrl}`);
+ const reqUrl = new URL("terms", exchangeBaseUrl);
+
+ // FIXME: We can/should make a HEAD request here.
+ // Not sure if qtart supports it at the moment.
+ const resp = await wex.http.fetch(reqUrl.href, {
+ cancellationToken: wex.cancellationToken,
+ });
+
+ switch (resp.status) {
+ case HttpStatusCode.NotFound:
+ case HttpStatusCode.NotImplemented:
+ return { type: "not-found" };
+ case HttpStatusCode.Ok:
+ break;
+ default:
+ throwUnexpectedRequestError(resp, await readTalerErrorResponse(resp));
+ }
+
+ const etag = resp.headers.get("taler-terms-version") || "unknown";
+ return {
+ type: "ok",
+ etag,
};
}
@@ -900,6 +1102,36 @@ async function downloadTosFromAcceptedFormat(
}
/**
+ * Check if an exchange entry should be considered
+ * to be outdated.
+ */
+async function checkExchangeEntryOutdated(
+ wex: WalletExecutionContext,
+ tx: WalletDbReadOnlyTransaction<["exchanges", "denominations"]>,
+ exchangeBaseUrl: string,
+): Promise<boolean> {
+ // We currently consider the exchange outdated when no
+ // denominations can be used for withdrawal.
+
+ logger.trace(`checking if exchange entry for ${exchangeBaseUrl} is outdated`);
+ let numOkay = 0;
+ let denoms =
+ await tx.denominations.indexes.byExchangeBaseUrl.getAll(exchangeBaseUrl);
+ logger.trace(`exchange entry has ${denoms.length} denominations`);
+ for (const denom of denoms) {
+ const denomOkay = isWithdrawableDenom(
+ denom,
+ wex.ws.config.testing.denomselAllowLate,
+ );
+ if (denomOkay) {
+ numOkay++;
+ }
+ }
+ logger.trace(`Of these, ${numOkay} are usable`);
+ return numOkay === 0;
+}
+
+/**
* Transition an exchange into an updating state.
*
* If the update is forced, the exchange is put into an updating state
@@ -935,12 +1167,16 @@ async function startUpdateExchangeEntry(
const { oldExchangeState, newExchangeState, taskId } =
await wex.db.runReadWriteTx(
- { storeNames: ["exchanges", "operationRetries"] },
+ { storeNames: ["exchanges", "operationRetries", "denominations"] },
async (tx) => {
const r = await tx.exchanges.get(exchangeBaseUrl);
if (!r) {
throw Error("exchange not found");
}
+
+ // FIXME: Do not transition at all if the exchange info is recent enough
+ // and the request is not forced.
+
const oldExchangeState = getExchangeState(r);
switch (r.updateStatus) {
case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
@@ -949,7 +1185,21 @@ async function startUpdateExchangeEntry(
case ExchangeEntryDbUpdateStatus.Suspended:
r.cachebreakNextUpdate = options.forceUpdate;
break;
- case ExchangeEntryDbUpdateStatus.ReadyUpdate:
+ case ExchangeEntryDbUpdateStatus.ReadyUpdate: {
+ const outdated = await checkExchangeEntryOutdated(
+ wex,
+ tx,
+ exchangeBaseUrl,
+ );
+ if (outdated) {
+ r.updateStatus = ExchangeEntryDbUpdateStatus.OutdatedUpdate;
+ } else {
+ r.updateStatus = ExchangeEntryDbUpdateStatus.ReadyUpdate;
+ }
+ r.cachebreakNextUpdate = options.forceUpdate;
+ break;
+ }
+ case ExchangeEntryDbUpdateStatus.OutdatedUpdate:
r.cachebreakNextUpdate = options.forceUpdate;
break;
case ExchangeEntryDbUpdateStatus.Ready: {
@@ -961,7 +1211,16 @@ async function startUpdateExchangeEntry(
options.forceUpdate ||
AbsoluteTime.isExpired(nextUpdateTimestamp)
) {
- r.updateStatus = ExchangeEntryDbUpdateStatus.ReadyUpdate;
+ const outdated = await checkExchangeEntryOutdated(
+ wex,
+ tx,
+ exchangeBaseUrl,
+ );
+ if (outdated) {
+ r.updateStatus = ExchangeEntryDbUpdateStatus.OutdatedUpdate;
+ } else {
+ r.updateStatus = ExchangeEntryDbUpdateStatus.ReadyUpdate;
+ }
r.cachebreakNextUpdate = options.forceUpdate;
}
break;
@@ -1006,132 +1265,8 @@ export interface ReadyExchangeSummary {
protocolVersionRange: string;
tosAcceptedTimestamp: TalerPreciseTimestamp | undefined;
scopeInfo: ScopeInfo;
-}
-
-async function internalWaitReadyExchange(
- wex: WalletExecutionContext,
- canonUrl: string,
- exchangeNotifFlag: AsyncFlag,
- options: {
- cancellationToken?: CancellationToken;
- forceUpdate?: boolean;
- expectedMasterPub?: string;
- } = {},
-): Promise<ReadyExchangeSummary> {
- const operationId = constructTaskIdentifier({
- tag: PendingTaskType.ExchangeUpdate,
- exchangeBaseUrl: canonUrl,
- });
- while (true) {
- if (wex.cancellationToken.isCancelled) {
- throw Error("cancelled");
- }
- logger.info(`waiting for ready exchange ${canonUrl}`);
- const { exchange, exchangeDetails, retryInfo, scopeInfo } =
- await wex.db.runReadOnlyTx(
- {
- storeNames: [
- "exchanges",
- "exchangeDetails",
- "operationRetries",
- "globalCurrencyAuditors",
- "globalCurrencyExchanges",
- ],
- },
- async (tx) => {
- const exchange = await tx.exchanges.get(canonUrl);
- const exchangeDetails = await getExchangeRecordsInternal(
- tx,
- canonUrl,
- );
- const retryInfo = await tx.operationRetries.get(operationId);
- let scopeInfo: ScopeInfo | undefined = undefined;
- if (exchange && exchangeDetails) {
- scopeInfo = await internalGetExchangeScopeInfo(tx, exchangeDetails);
- }
- return { exchange, exchangeDetails, retryInfo, scopeInfo };
- },
- );
-
- if (!exchange) {
- throw Error("exchange entry does not exist anymore");
- }
-
- let ready = false;
-
- switch (exchange.updateStatus) {
- case ExchangeEntryDbUpdateStatus.Ready:
- ready = true;
- break;
- case ExchangeEntryDbUpdateStatus.ReadyUpdate:
- // If the update is forced,
- // we wait until we're in a full "ready" state,
- // as we're not happy with the stale information.
- if (!options.forceUpdate) {
- ready = true;
- }
- break;
- case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
- throw TalerError.fromDetail(
- TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE,
- {
- exchangeBaseUrl: canonUrl,
- innerError: retryInfo?.lastError,
- },
- );
- default: {
- if (retryInfo) {
- throw TalerError.fromDetail(
- TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE,
- {
- exchangeBaseUrl: canonUrl,
- innerError: retryInfo?.lastError,
- },
- );
- }
- }
- }
-
- if (!ready) {
- logger.info("waiting for exchange update notification");
- await exchangeNotifFlag.wait();
- logger.info("done waiting for exchange update notification");
- exchangeNotifFlag.reset();
- continue;
- }
-
- if (!exchangeDetails) {
- throw Error("invariant failed");
- }
-
- if (!scopeInfo) {
- throw Error("invariant failed");
- }
-
- const res: ReadyExchangeSummary = {
- currency: exchangeDetails.currency,
- exchangeBaseUrl: canonUrl,
- masterPub: exchangeDetails.masterPublicKey,
- tosStatus: getExchangeTosStatusFromRecord(exchange),
- tosAcceptedEtag: exchange.tosAcceptedEtag,
- wireInfo: exchangeDetails.wireInfo,
- protocolVersionRange: exchangeDetails.protocolVersionRange,
- tosCurrentEtag: exchange.tosCurrentEtag,
- tosAcceptedTimestamp: timestampOptionalPreciseFromDb(
- exchange.tosAcceptedTimestamp,
- ),
- scopeInfo,
- };
-
- if (options.expectedMasterPub) {
- if (res.masterPub !== options.expectedMasterPub) {
- throw Error(
- "public key of the exchange does not match expected public key",
- );
- }
- }
- return res;
- }
+ zeroLimits: ZeroLimitedOperation[];
+ hardLimits: AccountLimit[];
}
/**
@@ -1186,39 +1321,134 @@ async function waitReadyExchange(
} = {},
): Promise<ReadyExchangeSummary> {
logger.trace(`waiting for exchange ${canonUrl} to become ready`);
- // FIXME: We should use Symbol.dispose magic here for cleanup!
- const exchangeNotifFlag = new AsyncFlag();
- // Raise exchangeNotifFlag whenever we get a notification
- // about our exchange.
- const cancelNotif = wex.ws.addNotificationListener((notif) => {
- if (
- notif.type === NotificationType.ExchangeStateTransition &&
- notif.exchangeBaseUrl === canonUrl
- ) {
- logger.info(`raising update notification: ${j2s(notif)}`);
- exchangeNotifFlag.raise();
- }
+ const operationId = constructTaskIdentifier({
+ tag: PendingTaskType.ExchangeUpdate,
+ exchangeBaseUrl: canonUrl,
});
- const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => {
- cancelNotif();
- exchangeNotifFlag.raise();
+ let res: ReadyExchangeSummary | undefined = undefined;
+
+ await genericWaitForState(wex, {
+ filterNotification(notif): boolean {
+ return (
+ notif.type === NotificationType.ExchangeStateTransition &&
+ notif.exchangeBaseUrl === canonUrl
+ );
+ },
+ async checkState(): Promise<boolean> {
+ const { exchange, exchangeDetails, retryInfo, scopeInfo } =
+ await wex.db.runReadOnlyTx(
+ {
+ storeNames: [
+ "exchanges",
+ "exchangeDetails",
+ "operationRetries",
+ "globalCurrencyAuditors",
+ "globalCurrencyExchanges",
+ ],
+ },
+ async (tx) => {
+ const exchange = await tx.exchanges.get(canonUrl);
+ const exchangeDetails = await getExchangeRecordsInternal(
+ tx,
+ canonUrl,
+ );
+ const retryInfo = await tx.operationRetries.get(operationId);
+ let scopeInfo: ScopeInfo | undefined = undefined;
+ if (exchange && exchangeDetails) {
+ scopeInfo = await internalGetExchangeScopeInfo(
+ tx,
+ exchangeDetails,
+ );
+ }
+ return { exchange, exchangeDetails, retryInfo, scopeInfo };
+ },
+ );
+
+ if (!exchange) {
+ throw Error("exchange entry does not exist anymore");
+ }
+
+ let ready = false;
+
+ switch (exchange.updateStatus) {
+ case ExchangeEntryDbUpdateStatus.Ready:
+ ready = true;
+ break;
+ case ExchangeEntryDbUpdateStatus.ReadyUpdate:
+ // If the update is forced,
+ // we wait until we're in a full "ready" state,
+ // as we're not happy with the stale information.
+ if (!options.forceUpdate) {
+ ready = true;
+ }
+ break;
+ case ExchangeEntryDbUpdateStatus.UnavailableUpdate:
+ throw TalerError.fromDetail(
+ TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE,
+ {
+ exchangeBaseUrl: canonUrl,
+ innerError: retryInfo?.lastError,
+ },
+ );
+ case ExchangeEntryDbUpdateStatus.OutdatedUpdate:
+ default: {
+ if (retryInfo) {
+ throw TalerError.fromDetail(
+ TalerErrorCode.WALLET_EXCHANGE_UNAVAILABLE,
+ {
+ exchangeBaseUrl: canonUrl,
+ innerError: retryInfo?.lastError,
+ },
+ );
+ }
+ }
+ }
+
+ if (!ready) {
+ return false;
+ }
+
+ if (!exchangeDetails) {
+ throw Error("invariant failed");
+ }
+
+ if (!scopeInfo) {
+ throw Error("invariant failed");
+ }
+
+ const mySummary: ReadyExchangeSummary = {
+ currency: exchangeDetails.currency,
+ exchangeBaseUrl: canonUrl,
+ masterPub: exchangeDetails.masterPublicKey,
+ tosStatus: getExchangeTosStatusFromRecord(exchange),
+ tosAcceptedEtag: exchange.tosAcceptedEtag,
+ wireInfo: exchangeDetails.wireInfo,
+ protocolVersionRange: exchangeDetails.protocolVersionRange,
+ tosCurrentEtag: exchange.tosCurrentEtag,
+ tosAcceptedTimestamp: timestampOptionalPreciseFromDb(
+ exchange.tosAcceptedTimestamp,
+ ),
+ scopeInfo,
+ hardLimits: exchangeDetails.hardLimits ?? [],
+ zeroLimits: exchangeDetails.zeroLimits ?? [],
+ };
+
+ if (options.expectedMasterPub) {
+ if (mySummary.masterPub !== options.expectedMasterPub) {
+ throw Error(
+ "public key of the exchange does not match expected public key",
+ );
+ }
+ }
+ res = mySummary;
+ return true;
+ },
});
- try {
- const res = await internalWaitReadyExchange(
- wex,
- canonUrl,
- exchangeNotifFlag,
- options,
- );
- logger.info("done waiting for ready exchange");
- return res;
- } finally {
- unregisterOnCancelled();
- cancelNotif();
- }
+ checkLogicInvariant(!!res);
+ return res;
}
function checkPeerPaymentsDisabled(
@@ -1286,6 +1516,10 @@ export async function updateExchangeFromUrlHandler(
wex: WalletExecutionContext,
exchangeBaseUrl: string,
): Promise<TaskRunResult> {
+ if (!wex.ws.networkAvailable) {
+ return TaskRunResult.networkRequired();
+ }
+
logger.trace(`updating exchange info for ${exchangeBaseUrl}`);
const oldExchangeRec = await wex.db.runReadOnlyTx(
@@ -1309,6 +1543,7 @@ export async function updateExchangeFromUrlHandler(
case ExchangeEntryDbUpdateStatus.Initial:
logger.info(`not updating exchange in status "initial"`);
return TaskRunResult.finished();
+ case ExchangeEntryDbUpdateStatus.OutdatedUpdate:
case ExchangeEntryDbUpdateStatus.InitialUpdate:
case ExchangeEntryDbUpdateStatus.ReadyUpdate:
updateRequestedExplicitly = true;
@@ -1367,7 +1602,6 @@ export async function updateExchangeFromUrlHandler(
AbsoluteTime.min(nextUpdateStamp, nextRefreshCheckStamp),
);
}
-
}
// When doing the auto-refresh check, we always update
@@ -1423,15 +1657,7 @@ export async function updateExchangeFromUrlHandler(
logger.trace("finished validating exchange /wire info");
- // We download the text/plain version here,
- // because that one needs to exist, and we
- // will get the current etag from the response.
- const tosDownload = await downloadTosFromAcceptedFormat(
- wex,
- exchangeBaseUrl,
- timeout,
- ["text/plain"],
- );
+ const tosMeta = await downloadTosMeta(wex, exchangeBaseUrl);
logger.trace("updating exchange info in database");
@@ -1460,6 +1686,8 @@ export async function updateExchangeFromUrlHandler(
"recoupGroups",
"coinAvailability",
"denomLossEvents",
+ "currencyInfo",
+ "transactionsMeta",
],
},
async (tx) => {
@@ -1480,16 +1708,18 @@ export async function updateExchangeFromUrlHandler(
detailsPointerChanged = true;
}
let detailsIncompatible = false;
+ let conflictHint: string | undefined = undefined;
if (existingDetails) {
if (existingDetails.masterPublicKey !== keysInfo.masterPublicKey) {
detailsIncompatible = true;
detailsPointerChanged = true;
- }
- if (existingDetails.currency !== keysInfo.currency) {
+ conflictHint = "master public key changed";
+ } else if (existingDetails.currency !== keysInfo.currency) {
detailsIncompatible = true;
detailsPointerChanged = true;
+ conflictHint = "currency changed";
}
- // FIXME: We need to do some consistency checks!
+ // FIXME: We need to do some more consistency checks!
}
if (detailsIncompatible) {
logger.warn(
@@ -1498,6 +1728,12 @@ export async function updateExchangeFromUrlHandler(
// We don't support this gracefully right now.
// See https://bugs.taler.net/n/8576
r.updateStatus = ExchangeEntryDbUpdateStatus.UnavailableUpdate;
+ r.unavailableReason = makeTalerErrorDetail(
+ TalerErrorCode.WALLET_EXCHANGE_ENTRY_UPDATE_CONFLICT,
+ {
+ detail: conflictHint,
+ },
+ );
r.updateRetryCounter = (r.updateRetryCounter ?? 0) + 1;
r.nextUpdateStamp = computeDbBackoff(r.updateRetryCounter);
r.nextRefreshCheckStamp = timestampPreciseToDb(
@@ -1510,6 +1746,7 @@ export async function updateExchangeFromUrlHandler(
newExchangeState: getExchangeState(r),
};
}
+ delete r.unavailableReason;
r.updateRetryCounter = 0;
const newDetails: ExchangeDetailsRecord = {
auditors: keysInfo.auditors,
@@ -1521,10 +1758,18 @@ export async function updateExchangeFromUrlHandler(
exchangeBaseUrl: r.baseUrl,
wireInfo,
ageMask,
+ walletBalanceLimits: keysInfo.walletBalanceLimits,
};
r.noFees = noFees;
r.peerPaymentsDisabled = peerPaymentsDisabled;
- r.tosCurrentEtag = tosDownload.tosEtag;
+ switch (tosMeta.type) {
+ case "not-found":
+ r.tosCurrentEtag = undefined;
+ break;
+ case "ok":
+ r.tosCurrentEtag = tosMeta.etag;
+ break;
+ }
if (existingDetails?.rowId) {
newDetails.rowId = existingDetails.rowId;
}
@@ -1549,6 +1794,21 @@ export async function updateExchangeFromUrlHandler(
r.updateStatus = ExchangeEntryDbUpdateStatus.Ready;
r.cachebreakNextUpdate = false;
await tx.exchanges.put(r);
+
+ if (keysInfo.currencySpecification) {
+ // Since this is the per-exchange currency info,
+ // we update it when the exchange changes it.
+ await WalletDbHelpers.upsertCurrencyInfo(tx, {
+ currencySpec: keysInfo.currencySpecification,
+ scopeInfo: {
+ type: ScopeType.Exchange,
+ currency: newDetails.currency,
+ url: exchangeBaseUrl,
+ },
+ source: "exchange",
+ });
+ }
+
const drRowId = await tx.exchangeDetails.put(newDetails);
checkDbInvariant(
typeof drRowId.key === "number",
@@ -1659,87 +1919,8 @@ export async function updateExchangeFromUrlHandler(
logger.trace("done updating exchange info in database");
- logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`);
-
- let minCheckThreshold = AbsoluteTime.addDuration(
- AbsoluteTime.now(),
- Duration.fromSpec({ days: 1 }),
- );
-
if (refreshCheckNecessary) {
- // Do auto-refresh.
- await wex.db.runReadWriteTx(
- {
- storeNames: [
- "coins",
- "denominations",
- "coinAvailability",
- "refreshGroups",
- "refreshSessions",
- "exchanges",
- ],
- },
- async (tx) => {
- const exchange = await tx.exchanges.get(exchangeBaseUrl);
- if (!exchange || !exchange.detailsPointer) {
- return;
- }
- const coins = await tx.coins.indexes.byBaseUrl
- .iter(exchangeBaseUrl)
- .toArray();
- const refreshCoins: CoinRefreshRequest[] = [];
- for (const coin of coins) {
- if (coin.status !== CoinStatus.Fresh) {
- continue;
- }
- const denom = await tx.denominations.get([
- exchangeBaseUrl,
- coin.denomPubHash,
- ]);
- if (!denom) {
- logger.warn("denomination not in database");
- continue;
- }
- const executeThreshold =
- getAutoRefreshExecuteThresholdForDenom(denom);
- if (AbsoluteTime.isExpired(executeThreshold)) {
- refreshCoins.push({
- coinPub: coin.coinPub,
- amount: denom.value,
- });
- } else {
- const checkThreshold = getAutoRefreshCheckThreshold(denom);
- minCheckThreshold = AbsoluteTime.min(
- minCheckThreshold,
- checkThreshold,
- );
- }
- }
- if (refreshCoins.length > 0) {
- const res = await createRefreshGroup(
- wex,
- tx,
- exchange.detailsPointer?.currency,
- refreshCoins,
- RefreshReason.Scheduled,
- undefined,
- );
- logger.trace(
- `created refresh group for auto-refresh (${res.refreshGroupId})`,
- );
- }
- logger.trace(
- `next refresh check at ${AbsoluteTime.toIsoString(
- minCheckThreshold,
- )}`,
- );
- exchange.nextRefreshCheckStamp = timestampPreciseToDb(
- AbsoluteTime.toPreciseTimestamp(minCheckThreshold),
- );
- wex.ws.exchangeCache.clear();
- await tx.exchanges.put(exchange);
- },
- );
+ await doAutoRefresh(wex, exchangeBaseUrl);
}
wex.ws.notify({
@@ -1754,6 +1935,90 @@ export async function updateExchangeFromUrlHandler(
return TaskRunResult.progress();
}
+async function doAutoRefresh(
+ wex: WalletExecutionContext,
+ exchangeBaseUrl: string,
+): Promise<void> {
+ logger.trace(`doing auto-refresh check for '${exchangeBaseUrl}'`);
+
+ let minCheckThreshold = AbsoluteTime.addDuration(
+ AbsoluteTime.now(),
+ Duration.fromSpec({ days: 1 }),
+ );
+
+ await wex.db.runReadWriteTx(
+ {
+ storeNames: [
+ "coinAvailability",
+ "coinHistory",
+ "coins",
+ "denominations",
+ "exchanges",
+ "refreshGroups",
+ "refreshSessions",
+ "transactionsMeta",
+ ],
+ },
+ async (tx) => {
+ const exchange = await tx.exchanges.get(exchangeBaseUrl);
+ if (!exchange || !exchange.detailsPointer) {
+ return;
+ }
+ const coins = await tx.coins.indexes.byBaseUrl
+ .iter(exchangeBaseUrl)
+ .toArray();
+ const refreshCoins: CoinRefreshRequest[] = [];
+ for (const coin of coins) {
+ if (coin.status !== CoinStatus.Fresh) {
+ continue;
+ }
+ const denom = await tx.denominations.get([
+ exchangeBaseUrl,
+ coin.denomPubHash,
+ ]);
+ if (!denom) {
+ logger.warn("denomination not in database");
+ continue;
+ }
+ const executeThreshold = getAutoRefreshExecuteThresholdForDenom(denom);
+ if (AbsoluteTime.isExpired(executeThreshold)) {
+ refreshCoins.push({
+ coinPub: coin.coinPub,
+ amount: denom.value,
+ });
+ } else {
+ const checkThreshold = getAutoRefreshCheckThreshold(denom);
+ minCheckThreshold = AbsoluteTime.min(
+ minCheckThreshold,
+ checkThreshold,
+ );
+ }
+ }
+ if (refreshCoins.length > 0) {
+ const res = await createRefreshGroup(
+ wex,
+ tx,
+ exchange.detailsPointer?.currency,
+ refreshCoins,
+ RefreshReason.Scheduled,
+ undefined,
+ );
+ logger.trace(
+ `created refresh group for auto-refresh (${res.refreshGroupId})`,
+ );
+ }
+ logger.trace(
+ `next refresh check at ${AbsoluteTime.toIsoString(minCheckThreshold)}`,
+ );
+ exchange.nextRefreshCheckStamp = timestampPreciseToDb(
+ AbsoluteTime.toPreciseTimestamp(minCheckThreshold),
+ );
+ wex.ws.exchangeCache.clear();
+ await tx.exchanges.put(exchange);
+ },
+ );
+}
+
interface DenomLossResult {
notifications: WalletNotification[];
}
@@ -1761,7 +2026,13 @@ interface DenomLossResult {
async function handleDenomLoss(
wex: WalletExecutionContext,
tx: WalletDbReadWriteTransaction<
- ["coinAvailability", "denominations", "denomLossEvents", "coins"]
+ [
+ "coinAvailability",
+ "denominations",
+ "denomLossEvents",
+ "coins",
+ "transactionsMeta",
+ ]
>,
currency: string,
exchangeBaseUrl: string,
@@ -1852,13 +2123,11 @@ async function handleDenomLoss(
status: DenomLossStatus.Done,
timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()),
});
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.DenomLoss,
- denomLossEventId,
- });
+ const ctx = new DenomLossTransactionContext(wex, denomLossEventId);
+ await ctx.updateTransactionMeta(tx);
result.notifications.push({
type: NotificationType.TransactionStateTransition,
- transactionId,
+ transactionId: ctx.transactionId,
oldTxState: {
major: TransactionMajorState.None,
},
@@ -1868,7 +2137,7 @@ async function handleDenomLoss(
});
result.notifications.push({
type: NotificationType.BalanceChange,
- hintTransactionId: transactionId,
+ hintTransactionId: ctx.transactionId,
});
}
@@ -1884,13 +2153,11 @@ async function handleDenomLoss(
status: DenomLossStatus.Done,
timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()),
});
- const transactionId = constructTransactionIdentifier({
- tag: TransactionType.DenomLoss,
- denomLossEventId,
- });
+ const ctx = new DenomLossTransactionContext(wex, denomLossEventId);
+ await ctx.updateTransactionMeta(tx);
result.notifications.push({
type: NotificationType.TransactionStateTransition,
- transactionId,
+ transactionId: ctx.transactionId,
oldTxState: {
major: TransactionMajorState.None,
},
@@ -1900,7 +2167,7 @@ async function handleDenomLoss(
});
result.notifications.push({
type: NotificationType.BalanceChange,
- hintTransactionId: transactionId,
+ hintTransactionId: ctx.transactionId,
});
}
@@ -1955,23 +2222,55 @@ export function computeDenomLossTransactionStatus(
}
export class DenomLossTransactionContext implements TransactionContext {
+ transactionId: TransactionIdStr;
+
+ constructor(
+ private wex: WalletExecutionContext,
+ public denomLossEventId: string,
+ ) {
+ this.transactionId = constructTransactionIdentifier({
+ tag: TransactionType.DenomLoss,
+ denomLossEventId,
+ });
+ }
+
get taskId(): TaskIdStr | undefined {
return undefined;
}
- transactionId: TransactionIdStr;
+
+ async updateTransactionMeta(
+ tx: WalletDbReadWriteTransaction<["denomLossEvents", "transactionsMeta"]>,
+ ): Promise<void> {
+ const denomLossRec = await tx.denomLossEvents.get(this.denomLossEventId);
+ if (!denomLossRec) {
+ await tx.transactionsMeta.delete(this.transactionId);
+ return;
+ }
+ await tx.transactionsMeta.put({
+ transactionId: this.transactionId,
+ status: denomLossRec.status,
+ timestamp: denomLossRec.timestampCreated,
+ currency: denomLossRec.currency,
+ exchanges: [denomLossRec.exchangeBaseUrl],
+ });
+ }
abortTransaction(): Promise<void> {
throw new Error("Method not implemented.");
}
+
suspendTransaction(): Promise<void> {
throw new Error("Method not implemented.");
}
+
resumeTransaction(): Promise<void> {
throw new Error("Method not implemented.");
}
+
failTransaction(): Promise<void> {
throw new Error("Method not implemented.");
}
+
async deleteTransaction(): Promise<void> {
const transitionInfo = await this.wex.db.runReadWriteTx(
{ storeNames: ["denomLossEvents"] },
@@ -1993,21 +2292,43 @@ export class DenomLossTransactionContext implements TransactionContext {
notifyTransition(this.wex, this.transactionId, transitionInfo);
}
- constructor(
- private wex: WalletExecutionContext,
- public denomLossEventId: string,
- ) {
- this.transactionId = constructTransactionIdentifier({
- tag: TransactionType.DenomLoss,
- denomLossEventId,
- });
+ async lookupFullTransaction(
+ tx: WalletDbAllStoresReadOnlyTransaction,
+ ): Promise<Transaction | undefined> {
+ const rec = await tx.denomLossEvents.get(this.denomLossEventId);
+ if (!rec) {
+ return undefined;
+ }
+ const txState = computeDenomLossTransactionStatus(rec);
+ return {
+ type: TransactionType.DenomLoss,
+ txState,
+ scopes: await getScopeForAllExchanges(tx, [rec.exchangeBaseUrl]),
+ txActions: [TransactionAction.Delete],
+ amountRaw: Amounts.stringify(rec.amount),
+ amountEffective: Amounts.stringify(rec.amount),
+ timestamp: timestampPreciseFromDb(rec.timestampCreated),
+ transactionId: constructTransactionIdentifier({
+ tag: TransactionType.DenomLoss,
+ denomLossEventId: rec.denomLossEventId,
+ }),
+ lossEventType: rec.eventType,
+ exchangeBaseUrl: rec.exchangeBaseUrl,
+ };
}
}
async function handleRecoup(
wex: WalletExecutionContext,
tx: WalletDbReadWriteTransaction<
- ["denominations", "coins", "recoupGroups", "refreshGroups"]
+ [
+ "denominations",
+ "coins",
+ "recoupGroups",
+ "refreshGroups",
+ "transactionsMeta",
+ "exchanges",
+ ]
>,
exchangeBaseUrl: string,
recoup: Recoup[],
@@ -2119,6 +2440,19 @@ export async function getExchangeTos(
): Promise<GetExchangeTosResult> {
const exch = await fetchFreshExchange(wex, exchangeBaseUrl);
+ switch (exch.tosStatus) {
+ case ExchangeTosStatus.MissingTos:
+ return {
+ tosStatus: ExchangeTosStatus.MissingTos,
+ acceptedEtag: undefined,
+ contentLanguage: undefined,
+ contentType: "text/plain",
+ content: "NULL",
+ currentEtag: "NULL",
+ tosAvailableLanguages: [],
+ };
+ }
+
const tosDownload = await downloadTosFromAcceptedFormat(
wex,
exchangeBaseUrl,
@@ -2187,6 +2521,7 @@ export async function listExchanges(
{
storeNames: [
"exchanges",
+ "reserves",
"operationRetries",
"exchangeDetails",
"globalCurrencyAuditors",
@@ -2195,18 +2530,29 @@ export async function listExchanges(
},
async (tx) => {
const exchangeRecords = await tx.exchanges.iter().toArray();
- for (const r of exchangeRecords) {
+ for (const exchangeRec of exchangeRecords) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.ExchangeUpdate,
- exchangeBaseUrl: r.baseUrl,
+ exchangeBaseUrl: exchangeRec.baseUrl,
});
- const exchangeDetails = await getExchangeRecordsInternal(tx, r.baseUrl);
+ const exchangeDetails = await getExchangeRecordsInternal(
+ tx,
+ exchangeRec.baseUrl,
+ );
const opRetryRecord = await tx.operationRetries.get(taskId);
+ let reserveRec: ReserveRecord | undefined = undefined;
+ if (exchangeRec.currentMergeReserveRowId != null) {
+ reserveRec = await tx.reserves.get(
+ exchangeRec.currentMergeReserveRowId,
+ );
+ checkDbInvariant(!!reserveRec, "reserve record not found");
+ }
exchanges.push(
await makeExchangeListItem(
tx,
- r,
+ exchangeRec,
exchangeDetails,
+ reserveRec,
opRetryRecord?.lastError,
),
);
@@ -2444,27 +2790,20 @@ async function internalGetExchangeResources(
* but keeps some transactions (payments, p2p, refreshes) around.
*/
async function purgeExchange(
- tx: WalletDbReadWriteTransaction<
- [
- "exchanges",
- "exchangeDetails",
- "transactions",
- "coinAvailability",
- "coins",
- "denominations",
- "exchangeSignKeys",
- "withdrawalGroups",
- "planchets",
- ]
- >,
+ wex: WalletExecutionContext,
+ tx: WalletDbAllStoresReadWriteTransaction,
exchangeBaseUrl: string,
): Promise<void> {
const detRecs = await tx.exchangeDetails.indexes.byExchangeBaseUrl.getAll();
+ // Remove all exchange detail records for that exchange
for (const r of detRecs) {
if (r.rowId == null) {
// Should never happen, as rowId is the primary key.
continue;
}
+ if (r.exchangeBaseUrl !== exchangeBaseUrl) {
+ continue;
+ }
await tx.exchangeDetails.delete(r.rowId);
const signkeyRecs =
await tx.exchangeSignKeys.indexes.byExchangeDetailsRowId.getAll(r.rowId);
@@ -2519,6 +2858,8 @@ async function purgeExchange(
}
}
}
+
+ await rematerializeTransactions(wex, tx);
}
export async function deleteExchange(
@@ -2527,36 +2868,21 @@ export async function deleteExchange(
): Promise<void> {
let inUse: boolean = false;
const exchangeBaseUrl = req.exchangeBaseUrl;
- await wex.db.runReadWriteTx(
- {
- storeNames: [
- "exchanges",
- "exchangeDetails",
- "transactions",
- "coinAvailability",
- "coins",
- "denominations",
- "exchangeSignKeys",
- "withdrawalGroups",
- "planchets",
- ],
- },
- async (tx) => {
- const exchangeRec = await tx.exchanges.get(exchangeBaseUrl);
- if (!exchangeRec) {
- // Nothing to delete!
- logger.info("no exchange found to delete");
- return;
- }
- const res = await internalGetExchangeResources(wex, tx, exchangeBaseUrl);
- if (res.hasResources && !req.purge) {
- inUse = true;
- return;
- }
- await purgeExchange(tx, exchangeBaseUrl);
- wex.ws.exchangeCache.clear();
- },
- );
+ await wex.db.runAllStoresReadWriteTx({}, async (tx) => {
+ const exchangeRec = await tx.exchanges.get(exchangeBaseUrl);
+ if (!exchangeRec) {
+ // Nothing to delete!
+ logger.info("no exchange found to delete");
+ return;
+ }
+ const res = await internalGetExchangeResources(wex, tx, exchangeBaseUrl);
+ if (res.hasResources && !req.purge) {
+ inUse = true;
+ return;
+ }
+ await purgeExchange(wex, tx, exchangeBaseUrl);
+ wex.ws.exchangeCache.clear();
+ });
if (inUse) {
throw TalerError.fromUncheckedDetail({
@@ -2586,3 +2912,673 @@ export async function getExchangeResources(
}
return res;
}
+
+/**
+ * Find the currently applicable wire fee for an exchange.
+ */
+export async function getExchangeWireFee(
+ wex: WalletExecutionContext,
+ wireType: string,
+ baseUrl: string,
+ time: TalerProtocolTimestamp,
+): Promise<WireFee> {
+ const exchangeDetails = await wex.db.runReadOnlyTx(
+ { storeNames: ["exchangeDetails", "exchanges"] },
+ async (tx) => {
+ const ex = await tx.exchanges.get(baseUrl);
+ if (!ex || !ex.detailsPointer) return undefined;
+ return await tx.exchangeDetails.indexes.byPointer.get([
+ baseUrl,
+ ex.detailsPointer.currency,
+ ex.detailsPointer.masterPublicKey,
+ ]);
+ },
+ );
+
+ if (!exchangeDetails) {
+ throw Error(`exchange missing: ${baseUrl}`);
+ }
+
+ const fees = exchangeDetails.wireInfo.feesForType[wireType];
+ if (!fees || fees.length === 0) {
+ throw Error(
+ `exchange ${baseUrl} doesn't have fees for wire type ${wireType}`,
+ );
+ }
+ const fee = fees.find((x) => {
+ return AbsoluteTime.isBetween(
+ AbsoluteTime.fromProtocolTimestamp(time),
+ AbsoluteTime.fromProtocolTimestamp(x.startStamp),
+ AbsoluteTime.fromProtocolTimestamp(x.endStamp),
+ );
+ });
+ if (!fee) {
+ throw Error(
+ `exchange ${exchangeDetails.exchangeBaseUrl} doesn't have fees for wire type ${wireType} at ${time.t_s}`,
+ );
+ }
+
+ return fee;
+}
+
+export type BalanceThresholdCheckResult =
+ | {
+ result: "ok";
+ }
+ | {
+ result: "violation";
+ nextThreshold: AmountString;
+ walletKycStatus: ExchangeWalletKycStatus | undefined;
+ walletKycAccessToken: string | undefined;
+ };
+
+export async function checkIncomingAmountLegalUnderKycBalanceThreshold(
+ wex: WalletExecutionContext,
+ exchangeBaseUrl: string,
+ amountIncoming: AmountLike,
+): Promise<BalanceThresholdCheckResult> {
+ logger.info(`checking ${exchangeBaseUrl} +${amountIncoming} for KYC`);
+ return await wex.db.runReadOnlyTx(
+ {
+ storeNames: [
+ "exchanges",
+ "exchangeDetails",
+ "reserves",
+ "coinAvailability",
+ ],
+ },
+ async (tx): Promise<BalanceThresholdCheckResult> => {
+ const exchangeRec = await tx.exchanges.get(exchangeBaseUrl);
+ if (!exchangeRec) {
+ throw Error("exchange not found");
+ }
+ const det = await getExchangeRecordsInternal(tx, exchangeBaseUrl);
+ if (!det) {
+ throw Error("exchange not found");
+ }
+ const coinAvRecs =
+ await tx.coinAvailability.indexes.byExchangeBaseUrl.getAll(
+ exchangeBaseUrl,
+ );
+ let balAmount = Amounts.zeroOfCurrency(det.currency);
+ for (const av of coinAvRecs) {
+ const n = av.freshCoinCount + (av.pendingRefreshOutputCount ?? 0);
+ balAmount = Amounts.add(
+ balAmount,
+ Amounts.mult(av.value, n).amount,
+ ).amount;
+ }
+ const balExpected = Amounts.add(balAmount, amountIncoming).amount;
+
+ // Check if we already have KYC for a sufficient threshold.
+
+ const reserveId = exchangeRec.currentMergeReserveRowId;
+ let reserveRec: ReserveRecord | undefined;
+ if (reserveId) {
+ reserveRec = await tx.reserves.get(reserveId);
+ checkDbInvariant(!!reserveRec, "reserve");
+ // FIXME: also consider KYC expiration!
+ if (reserveRec.thresholdNext) {
+ if (Amounts.cmp(reserveRec.thresholdNext, balExpected) >= 0) {
+ return {
+ result: "ok",
+ };
+ }
+ } else if (reserveRec.status === ReserveRecordStatus.Done) {
+ // We don't know what the next threshold is, but we've passed *some* KYC
+ // check. We don't have enough information, so we allow the balance increase.
+ return {
+ result: "ok",
+ };
+ }
+ }
+
+ // No luck, check the next limit we should request, if any.
+
+ const limits = det.walletBalanceLimits;
+ if (!limits) {
+ logger.info("no balance limits defined");
+ return {
+ result: "ok",
+ };
+ }
+ limits.sort((a, b) => Amounts.cmp(a, b));
+ logger.info(`applicable limits: ${j2s(limits)}`);
+ let limViolated: AmountString | undefined = undefined;
+ let limNext: AmountString | undefined = undefined;
+ for (let i = 0; i < limits.length; i++) {
+ if (Amounts.cmp(limits[i], balExpected) <= 0) {
+ limViolated = limits[i];
+ limNext = limits[i + 1];
+ if (limNext == null || Amounts.cmp(limNext, balExpected) > 0) {
+ break;
+ }
+ }
+ }
+ if (!limViolated) {
+ logger.info("balance limit okay");
+ return {
+ result: "ok",
+ };
+ } else {
+ logger.info(
+ `balance limit ${limViolated} would be violated, next is ${limNext}`,
+ );
+ return {
+ result: "violation",
+ nextThreshold: limNext ?? limViolated,
+ walletKycStatus: reserveRec?.status
+ ? getKycStatusFromReserveStatus(reserveRec.status)
+ : undefined,
+ walletKycAccessToken: reserveRec?.kycAccessToken,
+ };
+ }
+ },
+ );
+}
+
+/**
+ * Wait until kyc has passed for the wallet.
+ *
+ * If passed==false, already return when legitimization
+ * is requested.
+ */
+export async function waitExchangeWalletKyc(
+ wex: WalletExecutionContext,
+ exchangeBaseUrl: string,
+ amount: AmountLike,
+ passed: boolean,
+): Promise<void> {
+ await genericWaitForState(wex, {
+ async checkState(): Promise<boolean> {
+ return await wex.db.runReadOnlyTx(
+ {
+ storeNames: ["exchanges", "reserves"],
+ },
+ async (tx) => {
+ const exchange = await tx.exchanges.get(exchangeBaseUrl);
+ if (!exchange) {
+ throw new Error("exchange not found");
+ }
+ const reserveId = exchange.currentMergeReserveRowId;
+ if (reserveId == null) {
+ logger.warn("KYC does not exist yet");
+ return false;
+ }
+ const reserve = await tx.reserves.get(reserveId);
+ if (!reserve) {
+ throw Error("reserve not found");
+ }
+ if (passed) {
+ if (
+ reserve.thresholdGranted &&
+ Amounts.cmp(reserve.thresholdGranted, amount) >= 0
+ ) {
+ return true;
+ }
+ return false;
+ } else {
+ if (
+ reserve.thresholdGranted &&
+ Amounts.cmp(reserve.thresholdGranted, amount) >= 0
+ ) {
+ return true;
+ }
+ if (reserve.status === ReserveRecordStatus.PendingLegi) {
+ return true;
+ }
+ return false;
+ }
+ },
+ );
+ },
+ filterNotification(notif) {
+ return (
+ notif.type === NotificationType.ExchangeStateTransition &&
+ notif.exchangeBaseUrl === exchangeBaseUrl
+ );
+ },
+ });
+}
+
+export async function handleTestingWaitExchangeState(
+ wex: WalletExecutionContext,
+ req: TestingWaitExchangeStateRequest,
+): Promise<EmptyObject> {
+ await genericWaitForState(wex, {
+ async checkState(): Promise<boolean> {
+ const exchangeEntry = await lookupExchangeByUri(wex, {
+ exchangeBaseUrl: req.exchangeBaseUrl,
+ });
+ if (req.walletKycStatus) {
+ if (req.walletKycStatus !== exchangeEntry.walletKycStatus) {
+ return false;
+ }
+ }
+ return true;
+ },
+ filterNotification(notif) {
+ return (
+ notif.type === NotificationType.ExchangeStateTransition &&
+ notif.exchangeBaseUrl === req.exchangeBaseUrl
+ );
+ },
+ });
+ return {};
+}
+
+export async function handleTestingWaitExchangeWalletKyc(
+ wex: WalletExecutionContext,
+ req: TestingWaitWalletKycRequest,
+): Promise<EmptyObject> {
+ await waitExchangeWalletKyc(wex, req.exchangeBaseUrl, req.amount, req.passed);
+ return {};
+}
+
+export async function handleStartExchangeWalletKyc(
+ wex: WalletExecutionContext,
+ req: StartExchangeWalletKycRequest,
+): Promise<EmptyObject> {
+ const newReservePair = await wex.cryptoApi.createEddsaKeypair({});
+ const dbRes = await wex.db.runReadWriteTx(
+ {
+ storeNames: ["exchanges", "reserves"],
+ },
+ async (tx) => {
+ const exchange = await tx.exchanges.get(req.exchangeBaseUrl);
+ if (!exchange) {
+ throw Error("exchange not found");
+ }
+ const oldExchangeState = getExchangeState(exchange);
+ let mergeReserveRowId = exchange.currentMergeReserveRowId;
+ if (mergeReserveRowId == null) {
+ const putRes = await tx.reserves.put({
+ reservePriv: newReservePair.priv,
+ reservePub: newReservePair.pub,
+ });
+ checkDbInvariant(typeof putRes.key === "number", "primary key type");
+ mergeReserveRowId = putRes.key;
+ exchange.currentMergeReserveRowId = mergeReserveRowId;
+ await tx.exchanges.put(exchange);
+ }
+ const reserveRec = await tx.reserves.get(mergeReserveRowId);
+ checkDbInvariant(reserveRec != null, "reserve record exists");
+ if (
+ reserveRec.thresholdGranted == null ||
+ Amounts.cmp(reserveRec.thresholdGranted, req.amount) < 0
+ ) {
+ if (
+ reserveRec.thresholdRequested == null ||
+ Amounts.cmp(reserveRec.thresholdRequested, req.amount) < 0
+ ) {
+ reserveRec.thresholdRequested = req.amount;
+ reserveRec.status = ReserveRecordStatus.PendingLegiInit;
+ await tx.reserves.put(reserveRec);
+ return {
+ notification: {
+ type: NotificationType.ExchangeStateTransition,
+ exchangeBaseUrl: exchange.baseUrl,
+ oldExchangeState,
+ newExchangeState: getExchangeState(exchange),
+ } satisfies WalletNotification,
+ };
+ } else {
+ logger.info(
+ `another KYC process is already active for ${req.exchangeBaseUrl} over ${reserveRec.thresholdRequested}`,
+ );
+ return undefined;
+ }
+ } else {
+ // FIXME: Check expiration once exchange tells us!
+ logger.info(
+ `KYC already granted for ${req.exchangeBaseUrl} over ${req.amount}, granted ${reserveRec.thresholdGranted}`,
+ );
+ return undefined;
+ }
+ },
+ );
+ if (dbRes && dbRes.notification) {
+ wex.ws.notify(dbRes.notification);
+ }
+ const taskId = constructTaskIdentifier({
+ tag: PendingTaskType.ExchangeWalletKyc,
+ exchangeBaseUrl: req.exchangeBaseUrl,
+ });
+ wex.taskScheduler.startShepherdTask(taskId);
+ return {};
+}
+
+async function handleExchangeKycPendingWallet(
+ wex: WalletExecutionContext,
+ exchange: ExchangeEntryRecord,
+ reserve: ReserveRecord,
+): Promise<TaskRunResult> {
+ checkDbInvariant(!!reserve.thresholdRequested, "threshold");
+ const threshold = reserve.thresholdRequested;
+ const sigResp = await wex.cryptoApi.signWalletAccountSetup({
+ reservePriv: reserve.reservePriv,
+ reservePub: reserve.reservePub,
+ threshold,
+ });
+ const requestUrl = new URL("kyc-wallet", exchange.baseUrl);
+ const body: WalletKycRequest = {
+ balance: reserve.thresholdRequested,
+ reserve_pub: reserve.reservePub,
+ reserve_sig: sigResp.sig,
+ };
+ logger.info(`kyc-wallet request body: ${j2s(body)}`);
+ const res = await wex.http.fetch(requestUrl.href, {
+ method: "POST",
+ body,
+ });
+
+ logger.info(`kyc-wallet response status is ${res.status}`);
+
+ switch (res.status) {
+ case HttpStatusCode.Ok: {
+ // KYC somehow already passed
+ // FIXME: Store next threshold and timestamp!
+ const accountKycStatus = await readSuccessResponseJsonOrThrow(
+ res,
+ codecForAccountKycStatus(),
+ );
+ return handleExchangeKycSuccess(wex, exchange.baseUrl, accountKycStatus);
+ }
+ case HttpStatusCode.NoContent: {
+ // KYC disabled at exchange.
+ return handleExchangeKycSuccess(wex, exchange.baseUrl, undefined);
+ }
+ case HttpStatusCode.Forbidden: {
+ // Did not work!
+ const err = await readTalerErrorResponse(res);
+ throwUnexpectedRequestError(res, err);
+ }
+ case HttpStatusCode.UnavailableForLegalReasons: {
+ const kycBody = await readResponseJsonOrThrow(
+ res,
+ codecForLegitimizationNeededResponse(),
+ );
+ return handleExchangeKycRespLegi(wex, exchange.baseUrl, reserve, kycBody);
+ }
+ default: {
+ const err = await readTalerErrorResponse(res);
+ throwUnexpectedRequestError(res, err);
+ }
+ }
+}
+
+async function handleExchangeKycSuccess(
+ wex: WalletExecutionContext,
+ exchangeBaseUrl: string,
+ accountKycStatus: AccountKycStatus | undefined,
+): Promise<TaskRunResult> {
+ logger.info(`kyc check for ${exchangeBaseUrl} satisfied`);
+ const dbRes = await wex.db.runReadWriteTx(
+ { storeNames: ["exchanges", "reserves"] },
+ async (tx) => {
+ const exchange = await tx.exchanges.get(exchangeBaseUrl);
+ if (!exchange) {
+ throw Error("exchange not found");
+ }
+ const oldExchangeState = getExchangeState(exchange);
+ const reserveId = exchange.currentMergeReserveRowId;
+ if (reserveId == null) {
+ throw Error("expected exchange to have reserve ID");
+ }
+ const reserve = await tx.reserves.get(reserveId);
+ checkDbInvariant(!!reserve, "merge reserve should exist");
+ switch (reserve.status) {
+ case ReserveRecordStatus.PendingLegiInit:
+ case ReserveRecordStatus.PendingLegi:
+ break;
+ default:
+ throw Error("unexpected state (concurrent modification?)");
+ }
+ reserve.status = ReserveRecordStatus.Done;
+ reserve.thresholdGranted = reserve.thresholdRequested;
+ delete reserve.thresholdRequested;
+ delete reserve.requirementRow;
+
+ // Try to figure out the next balance limit
+ let nextLimit: AmountString | undefined = undefined;
+ if (accountKycStatus?.limits) {
+ for (const lim of accountKycStatus.limits) {
+ if (lim.operation_type.toLowerCase() === "balance") {
+ nextLimit = lim.threshold;
+ }
+ }
+ }
+ reserve.thresholdNext = nextLimit;
+
+ await tx.reserves.put(reserve);
+ logger.info(`newly granted threshold: ${reserve.thresholdGranted}`);
+ return {
+ notification: {
+ type: NotificationType.ExchangeStateTransition,
+ exchangeBaseUrl: exchange.baseUrl,
+ oldExchangeState,
+ newExchangeState: getExchangeState(exchange),
+ } satisfies WalletNotification,
+ };
+ },
+ );
+ if (dbRes && dbRes.notification) {
+ wex.ws.notify(dbRes.notification);
+ }
+ return TaskRunResult.progress();
+}
+
+/**
+ * The exchange has just told us that we need some legitimization
+ * from the user. Request more details and store the result in the database.
+ */
+async function handleExchangeKycRespLegi(
+ wex: WalletExecutionContext,
+ exchangeBaseUrl: string,
+ reserve: ReserveRecord,
+ kycBody: LegitimizationNeededResponse,
+): Promise<TaskRunResult> {
+ const sigResp = await wex.cryptoApi.signWalletKycAuth({
+ accountPriv: reserve.reservePriv,
+ accountPub: reserve.reservePub,
+ });
+ const reqUrl = new URL(`kyc-check/${kycBody.h_payto}`, exchangeBaseUrl);
+ const resp = await wex.http.fetch(reqUrl.href, {
+ method: "GET",
+ headers: {
+ ["Account-Owner-Signature"]: sigResp.sig,
+ },
+ });
+
+ logger.info(`kyc-check (long-poll) response status ${resp.status}`);
+
+ switch (resp.status) {
+ case HttpStatusCode.Ok: {
+ // FIXME: Store information about next limit!
+ const accountKycStatus = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForAccountKycStatus(),
+ );
+ return handleExchangeKycSuccess(wex, exchangeBaseUrl, accountKycStatus);
+ }
+ case HttpStatusCode.Accepted: {
+ // Store the result in the DB!
+ break;
+ }
+ case HttpStatusCode.NoContent: {
+ // KYC not configured, so already satisfied
+ return handleExchangeKycSuccess(wex, exchangeBaseUrl, undefined);
+ }
+ default: {
+ const err = await readTalerErrorResponse(resp);
+ throwUnexpectedRequestError(resp, err);
+ }
+ }
+
+ const accountKycStatusResp = await readResponseJsonOrThrow(
+ resp,
+ codecForAccountKycStatus(),
+ );
+
+ const dbRes = await wex.db.runReadWriteTx(
+ { storeNames: ["exchanges", "reserves"] },
+ async (tx) => {
+ const exchange = await tx.exchanges.get(exchangeBaseUrl);
+ if (!exchange) {
+ throw Error("exchange not found");
+ }
+ const oldExchangeState = getExchangeState(exchange);
+ const reserveId = exchange.currentMergeReserveRowId;
+ if (reserveId == null) {
+ throw Error("expected exchange to have reserve ID");
+ }
+ const reserve = await tx.reserves.get(reserveId);
+ checkDbInvariant(!!reserve, "merge reserve should exist");
+ switch (reserve.status) {
+ case ReserveRecordStatus.PendingLegiInit:
+ break;
+ default:
+ throw Error("unexpected state (concurrent modification?)");
+ }
+ reserve.status = ReserveRecordStatus.PendingLegi;
+ reserve.requirementRow = kycBody.requirement_row;
+ reserve.amlReview = accountKycStatusResp.aml_review;
+ reserve.kycAccessToken = accountKycStatusResp.access_token;
+
+ await tx.reserves.put(reserve);
+ return {
+ notification: {
+ type: NotificationType.ExchangeStateTransition,
+ exchangeBaseUrl: exchange.baseUrl,
+ oldExchangeState,
+ newExchangeState: getExchangeState(exchange),
+ } satisfies WalletNotification,
+ };
+ },
+ );
+ if (dbRes && dbRes.notification) {
+ wex.ws.notify(dbRes.notification);
+ }
+ return TaskRunResult.progress();
+}
+
+/**
+ * Legitimization was requested from the user by the exchange.
+ *
+ * Long-poll for the legitimization to succeed.
+ */
+async function handleExchangeKycPendingLegitimization(
+ wex: WalletExecutionContext,
+ exchange: ExchangeEntryRecord,
+ reserve: ReserveRecord,
+): Promise<TaskRunResult> {
+ // FIXME: Cache this signature
+ const sigResp = await wex.cryptoApi.signWalletKycAuth({
+ accountPriv: reserve.reservePriv,
+ accountPub: reserve.reservePub,
+ });
+
+ const reservePayto = stringifyReservePaytoUri(
+ exchange.baseUrl,
+ reserve.reservePub,
+ );
+
+ const paytoHash = encodeCrock(hashPaytoUri(reservePayto));
+
+ const resp = await wex.ws.runLongpollQueueing(
+ wex,
+ exchange.baseUrl,
+ async (timeoutMs) => {
+ const reqUrl = new URL(`kyc-check/${paytoHash}`, exchange.baseUrl);
+ reqUrl.searchParams.set("timeout_ms", `${timeoutMs}`);
+ logger.info(`long-polling wallet KYC status at ${reqUrl.href}`);
+ return await wex.http.fetch(reqUrl.href, {
+ method: "GET",
+ headers: {
+ ["Account-Owner-Signature"]: sigResp.sig,
+ },
+ });
+ },
+ );
+
+ logger.info(`kyc-check (long-poll) response status ${resp.status}`);
+
+ switch (resp.status) {
+ case HttpStatusCode.Ok: {
+ // FIXME: Store information about next limit!
+ const accountKycStatus = await readSuccessResponseJsonOrThrow(
+ resp,
+ codecForAccountKycStatus(),
+ );
+ return handleExchangeKycSuccess(wex, exchange.baseUrl, accountKycStatus);
+ }
+ case HttpStatusCode.Accepted:
+ // FIXME: Do we ever need to update the access token?
+ return TaskRunResult.longpollReturnedPending();
+ case HttpStatusCode.NoContent: {
+ // KYC not configured, so already satisfied
+ return handleExchangeKycSuccess(wex, exchange.baseUrl, undefined);
+ }
+ default: {
+ const err = await readTalerErrorResponse(resp);
+ throwUnexpectedRequestError(resp, err);
+ }
+ }
+}
+
+export async function processExchangeKyc(
+ wex: WalletExecutionContext,
+ exchangeBaseUrl: string,
+): Promise<TaskRunResult> {
+ const res = await wex.db.runReadOnlyTx(
+ { storeNames: ["exchanges", "reserves"] },
+ async (tx) => {
+ const exchange = await tx.exchanges.get(exchangeBaseUrl);
+ if (!exchange) {
+ return undefined;
+ }
+ const reserveId = exchange.currentMergeReserveRowId;
+ let reserve: ReserveRecord | undefined = undefined;
+ if (reserveId != null) {
+ reserve = await tx.reserves.get(reserveId);
+ }
+ return { exchange, reserve };
+ },
+ );
+ if (!res) {
+ logger.warn(`exchange ${exchangeBaseUrl} not found, not processing KYC`);
+ return TaskRunResult.finished();
+ }
+ if (!res.reserve) {
+ return TaskRunResult.finished();
+ }
+ switch (res.reserve.status) {
+ case undefined:
+ // No KYC requested
+ return TaskRunResult.finished();
+ case ReserveRecordStatus.Done:
+ return TaskRunResult.finished();
+ case ReserveRecordStatus.SuspendedLegiInit:
+ case ReserveRecordStatus.SuspendedLegi:
+ return TaskRunResult.finished();
+ case ReserveRecordStatus.PendingLegiInit:
+ return handleExchangeKycPendingWallet(wex, res.exchange, res.reserve);
+ case ReserveRecordStatus.PendingLegi:
+ return handleExchangeKycPendingLegitimization(
+ wex,
+ res.exchange,
+ res.reserve,
+ );
+ }
+}
+
+export async function checkExchangeInScope(
+ wex: WalletExecutionContext,
+ exchangeBaseUrl: string,
+ scope: ScopeInfo,
+): Promise<boolean> {
+ if (scope.type === ScopeType.Exchange && scope.url !== exchangeBaseUrl) {
+ return false;
+ }
+ return true;
+}