From e951075d2ef52fa8e9e7489c62031777c3a7e66b Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Mon, 19 Feb 2024 18:05:48 +0100 Subject: wallet-core: flatten directory structure --- packages/taler-wallet-core/src/deposits.ts | 1598 ++++++++++++++++++++++++++++ 1 file changed, 1598 insertions(+) create mode 100644 packages/taler-wallet-core/src/deposits.ts (limited to 'packages/taler-wallet-core/src/deposits.ts') diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts new file mode 100644 index 000000000..94e7f01ac --- /dev/null +++ b/packages/taler-wallet-core/src/deposits.ts @@ -0,0 +1,1598 @@ +/* + This file is part of GNU Taler + (C) 2021-2023 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see + */ + +/** + * Implementation of the deposit transaction. + */ + +/** + * Imports. + */ +import { + AbsoluteTime, + AmountJson, + Amounts, + BatchDepositRequestCoin, + CancellationToken, + CoinRefreshRequest, + CreateDepositGroupRequest, + CreateDepositGroupResponse, + DepositGroupFees, + Duration, + ExchangeBatchDepositRequest, + ExchangeRefundRequest, + HttpStatusCode, + Logger, + MerchantContractTerms, + NotificationType, + PayCoinSelection, + PrepareDepositRequest, + PrepareDepositResponse, + RefreshReason, + TalerError, + TalerErrorCode, + TalerPreciseTimestamp, + TalerProtocolTimestamp, + TrackTransaction, + TransactionAction, + TransactionIdStr, + TransactionMajorState, + TransactionMinorState, + TransactionState, + TransactionType, + URL, + WireFee, + canonicalJson, + codecForBatchDepositSuccess, + codecForTackTransactionAccepted, + codecForTackTransactionWired, + durationFromSpec, + encodeCrock, + getRandomBytes, + hashTruncate32, + hashWire, + j2s, + parsePaytoUri, + stringToBytes, +} from "@gnu-taler/taler-util"; +import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; +import { DepositElementStatus, DepositGroupRecord } from "./db.js"; +import { + DepositOperationStatus, + DepositTrackingInfo, + KycPendingInfo, + PendingTaskType, + RefreshOperationStatus, + TaskId, + createRefreshGroup, + getCandidateWithdrawalDenomsTx, + getTotalRefreshCost, + timestampPreciseToDb, + timestampProtocolToDb, +} from "./index.js"; +import { InternalWalletState } from "./internal-wallet-state.js"; +import { assertUnreachable } from "./util/assertUnreachable.js"; +import { selectPayCoinsNew } from "./util/coinSelection.js"; +import { checkDbInvariant, checkLogicInvariant } from "./util/invariants.js"; +import { + TaskRunResult, + TombstoneTag, + TransactionContext, + constructTaskIdentifier, + spendCoins, +} from "./common.js"; +import { getExchangeWireDetailsInTx } from "./exchanges.js"; +import { + extractContractData, + generateDepositPermissions, + getTotalPaymentCost, +} from "./pay-merchant.js"; +import { + constructTransactionIdentifier, + notifyTransition, + parseTransactionIdentifier, +} from "./transactions.js"; + +/** + * Logger. + */ +const logger = new Logger("deposits.ts"); + +export class DepositTransactionContext implements TransactionContext { + readonly transactionId: TransactionIdStr; + readonly taskId: TaskId; + constructor( + public ws: InternalWalletState, + public depositGroupId: string, + ) { + this.transactionId = constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId, + }); + this.taskId = constructTaskIdentifier({ + tag: PendingTaskType.Deposit, + depositGroupId, + }); + } + + async deleteTransaction(): Promise { + const depositGroupId = this.depositGroupId; + const ws = this.ws; + // FIXME: We should check first if we are in a final state + // where deletion is allowed. + await ws.db.runReadWriteTx(["depositGroups", "tombstones"], async (tx) => { + const tipRecord = await tx.depositGroups.get(depositGroupId); + if (tipRecord) { + await tx.depositGroups.delete(depositGroupId); + await tx.tombstones.put({ + id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId, + }); + } + }); + return; + } + + async suspendTransaction(): Promise { + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + logger.warn( + `can't suspend deposit group, depositGroupId=${depositGroupId} not found`, + ); + return undefined; + } + const oldState = computeDepositTransactionStatus(dg); + let newOpStatus: DepositOperationStatus | undefined; + switch (dg.operationStatus) { + case DepositOperationStatus.PendingDeposit: + newOpStatus = DepositOperationStatus.SuspendedDeposit; + break; + case DepositOperationStatus.PendingKyc: + newOpStatus = DepositOperationStatus.SuspendedKyc; + break; + case DepositOperationStatus.PendingTrack: + newOpStatus = DepositOperationStatus.SuspendedTrack; + break; + case DepositOperationStatus.Aborting: + newOpStatus = DepositOperationStatus.SuspendedAborting; + break; + } + if (!newOpStatus) { + return undefined; + } + dg.operationStatus = newOpStatus; + await tx.depositGroups.put(dg); + return { + oldTxState: oldState, + newTxState: computeDepositTransactionStatus(dg), + }; + }, + ); + ws.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(ws, transactionId, transitionInfo); + } + + async abortTransaction(): Promise { + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + logger.warn( + `can't suspend deposit group, depositGroupId=${depositGroupId} not found`, + ); + return undefined; + } + const oldState = computeDepositTransactionStatus(dg); + switch (dg.operationStatus) { + case DepositOperationStatus.Finished: + return undefined; + case DepositOperationStatus.PendingDeposit: { + dg.operationStatus = DepositOperationStatus.Aborting; + await tx.depositGroups.put(dg); + return { + oldTxState: oldState, + newTxState: computeDepositTransactionStatus(dg), + }; + } + case DepositOperationStatus.SuspendedDeposit: + // FIXME: Can we abort a suspended transaction?! + return undefined; + } + return undefined; + }, + ); + ws.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); + } + + async resumeTransaction(): Promise { + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + logger.warn( + `can't resume deposit group, depositGroupId=${depositGroupId} not found`, + ); + return; + } + const oldState = computeDepositTransactionStatus(dg); + let newOpStatus: DepositOperationStatus | undefined; + switch (dg.operationStatus) { + case DepositOperationStatus.SuspendedDeposit: + newOpStatus = DepositOperationStatus.PendingDeposit; + break; + case DepositOperationStatus.SuspendedAborting: + newOpStatus = DepositOperationStatus.Aborting; + break; + case DepositOperationStatus.SuspendedKyc: + newOpStatus = DepositOperationStatus.PendingKyc; + break; + case DepositOperationStatus.SuspendedTrack: + newOpStatus = DepositOperationStatus.PendingTrack; + break; + } + if (!newOpStatus) { + return undefined; + } + dg.operationStatus = newOpStatus; + await tx.depositGroups.put(dg); + return { + oldTxState: oldState, + newTxState: computeDepositTransactionStatus(dg), + }; + }, + ); + notifyTransition(ws, transactionId, transitionInfo); + ws.taskScheduler.startShepherdTask(retryTag); + } + + async failTransaction(): Promise { + const { ws, depositGroupId, transactionId, taskId: retryTag } = this; + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + logger.warn( + `can't cancel aborting deposit group, depositGroupId=${depositGroupId} not found`, + ); + return undefined; + } + const oldState = computeDepositTransactionStatus(dg); + switch (dg.operationStatus) { + case DepositOperationStatus.SuspendedAborting: + case DepositOperationStatus.Aborting: { + dg.operationStatus = DepositOperationStatus.Failed; + await tx.depositGroups.put(dg); + return { + oldTxState: oldState, + newTxState: computeDepositTransactionStatus(dg), + }; + } + } + return undefined; + }, + ); + // FIXME: Also cancel ongoing work (via cancellation token, once implemented) + ws.taskScheduler.stopShepherdTask(retryTag); + notifyTransition(ws, transactionId, transitionInfo); + } +} + +/** + * Get the (DD37-style) transaction status based on the + * database record of a deposit group. + */ +export function computeDepositTransactionStatus( + dg: DepositGroupRecord, +): TransactionState { + switch (dg.operationStatus) { + case DepositOperationStatus.Finished: + return { + major: TransactionMajorState.Done, + }; + case DepositOperationStatus.PendingDeposit: + return { + major: TransactionMajorState.Pending, + minor: TransactionMinorState.Deposit, + }; + case DepositOperationStatus.PendingKyc: + return { + major: TransactionMajorState.Pending, + minor: TransactionMinorState.KycRequired, + }; + case DepositOperationStatus.PendingTrack: + return { + major: TransactionMajorState.Pending, + minor: TransactionMinorState.Track, + }; + case DepositOperationStatus.SuspendedKyc: + return { + major: TransactionMajorState.Suspended, + minor: TransactionMinorState.KycRequired, + }; + case DepositOperationStatus.SuspendedTrack: + return { + major: TransactionMajorState.Suspended, + minor: TransactionMinorState.Track, + }; + case DepositOperationStatus.SuspendedDeposit: + return { + major: TransactionMajorState.Suspended, + }; + case DepositOperationStatus.Aborting: + return { + major: TransactionMajorState.Aborting, + }; + case DepositOperationStatus.Aborted: + return { + major: TransactionMajorState.Aborted, + }; + case DepositOperationStatus.Failed: + return { + major: TransactionMajorState.Failed, + }; + case DepositOperationStatus.SuspendedAborting: + return { + major: TransactionMajorState.SuspendedAborting, + }; + default: + assertUnreachable(dg.operationStatus); + } +} + +/** + * Compute the possible actions possible on a deposit transaction + * based on the current transaction state. + */ +export function computeDepositTransactionActions( + dg: DepositGroupRecord, +): TransactionAction[] { + switch (dg.operationStatus) { + case DepositOperationStatus.Finished: + return [TransactionAction.Delete]; + case DepositOperationStatus.PendingDeposit: + return [TransactionAction.Suspend, TransactionAction.Abort]; + case DepositOperationStatus.SuspendedDeposit: + return [TransactionAction.Resume]; + case DepositOperationStatus.Aborting: + return [TransactionAction.Fail, TransactionAction.Suspend]; + case DepositOperationStatus.Aborted: + return [TransactionAction.Delete]; + case DepositOperationStatus.Failed: + return [TransactionAction.Delete]; + case DepositOperationStatus.SuspendedAborting: + return [TransactionAction.Resume, TransactionAction.Fail]; + case DepositOperationStatus.PendingKyc: + return [TransactionAction.Suspend, TransactionAction.Fail]; + case DepositOperationStatus.PendingTrack: + return [TransactionAction.Suspend, TransactionAction.Abort]; + case DepositOperationStatus.SuspendedKyc: + return [TransactionAction.Resume, TransactionAction.Fail]; + case DepositOperationStatus.SuspendedTrack: + return [TransactionAction.Resume, TransactionAction.Abort]; + default: + assertUnreachable(dg.operationStatus); + } +} + +/** + * Check whether the refresh associated with the + * aborting deposit group is done. + * + * If done, mark the deposit transaction as aborted. + * + * Otherwise continue waiting. + * + * FIXME: Wait for the refresh group notifications instead of periodically + * checking the refresh group status. + * FIXME: This is just one transaction, can't we do this in the initial + * transaction of processDepositGroup? + */ +async function waitForRefreshOnDepositGroup( + ws: InternalWalletState, + depositGroup: DepositGroupRecord, +): Promise { + const abortRefreshGroupId = depositGroup.abortRefreshGroupId; + checkLogicInvariant(!!abortRefreshGroupId); + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId: depositGroup.depositGroupId, + }); + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups", "refreshGroups"], + async (tx) => { + const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId); + let newOpState: DepositOperationStatus | undefined; + if (!refreshGroup) { + // Maybe it got manually deleted? Means that we should + // just go into aborted. + logger.warn("no aborting refresh group found for deposit group"); + newOpState = DepositOperationStatus.Aborted; + } else { + if (refreshGroup.operationStatus === RefreshOperationStatus.Finished) { + newOpState = DepositOperationStatus.Aborted; + } else if ( + refreshGroup.operationStatus === RefreshOperationStatus.Failed + ) { + newOpState = DepositOperationStatus.Aborted; + } + } + if (newOpState) { + const newDg = await tx.depositGroups.get(depositGroup.depositGroupId); + if (!newDg) { + return; + } + const oldTxState = computeDepositTransactionStatus(newDg); + newDg.operationStatus = newOpState; + const newTxState = computeDepositTransactionStatus(newDg); + await tx.depositGroups.put(newDg); + return { oldTxState, newTxState }; + } + return undefined; + }, + ); + + notifyTransition(ws, transactionId, transitionInfo); + return TaskRunResult.backoff(); +} + +async function refundDepositGroup( + ws: InternalWalletState, + depositGroup: DepositGroupRecord, +): Promise { + const newTxPerCoin = [...depositGroup.statusPerCoin]; + logger.info(`status per coin: ${j2s(depositGroup.statusPerCoin)}`); + for (let i = 0; i < depositGroup.statusPerCoin.length; i++) { + const st = depositGroup.statusPerCoin[i]; + switch (st) { + case DepositElementStatus.RefundFailed: + case DepositElementStatus.RefundSuccess: + break; + default: { + const coinPub = depositGroup.payCoinSelection.coinPubs[i]; + const coinExchange = await ws.db.runReadOnlyTx( + ["coins"], + async (tx) => { + const coinRecord = await tx.coins.get(coinPub); + checkDbInvariant(!!coinRecord); + return coinRecord.exchangeBaseUrl; + }, + ); + const refundAmount = depositGroup.payCoinSelection.coinContributions[i]; + // We use a constant refund transaction ID, since there can + // only be one refund. + const rtid = 1; + const sig = await ws.cryptoApi.signRefund({ + coinPub, + contractTermsHash: depositGroup.contractTermsHash, + merchantPriv: depositGroup.merchantPriv, + merchantPub: depositGroup.merchantPub, + refundAmount: refundAmount, + rtransactionId: rtid, + }); + const refundReq: ExchangeRefundRequest = { + h_contract_terms: depositGroup.contractTermsHash, + merchant_pub: depositGroup.merchantPub, + merchant_sig: sig.sig, + refund_amount: refundAmount, + rtransaction_id: rtid, + }; + const refundUrl = new URL(`coins/${coinPub}/refund`, coinExchange); + const httpResp = await ws.http.fetch(refundUrl.href, { + method: "POST", + body: refundReq, + }); + logger.info( + `coin ${i} refund HTTP status for coin: ${httpResp.status}`, + ); + let newStatus: DepositElementStatus; + if (httpResp.status === 200) { + // FIXME: validate response + newStatus = DepositElementStatus.RefundSuccess; + } else { + // FIXME: Store problem somewhere! + newStatus = DepositElementStatus.RefundFailed; + } + // FIXME: Handle case where refund request needs to be tried again + newTxPerCoin[i] = newStatus; + break; + } + } + } + let isDone = true; + for (let i = 0; i < newTxPerCoin.length; i++) { + if ( + newTxPerCoin[i] != DepositElementStatus.RefundFailed && + newTxPerCoin[i] != DepositElementStatus.RefundSuccess + ) { + isDone = false; + } + } + + const currency = Amounts.currencyOf(depositGroup.totalPayCost); + + await ws.db.runReadWriteTx( + [ + "depositGroups", + "refreshGroups", + "coins", + "denominations", + "coinAvailability", + ], + async (tx) => { + const newDg = await tx.depositGroups.get(depositGroup.depositGroupId); + if (!newDg) { + return; + } + newDg.statusPerCoin = newTxPerCoin; + const refreshCoins: CoinRefreshRequest[] = []; + for (let i = 0; i < newTxPerCoin.length; i++) { + refreshCoins.push({ + amount: depositGroup.payCoinSelection.coinContributions[i], + coinPub: depositGroup.payCoinSelection.coinPubs[i], + }); + } + if (isDone) { + const rgid = await createRefreshGroup( + ws, + tx, + currency, + refreshCoins, + RefreshReason.AbortDeposit, + constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId: newDg.depositGroupId, + }), + ); + newDg.abortRefreshGroupId = rgid.refreshGroupId; + } + await tx.depositGroups.put(newDg); + }, + ); + + return TaskRunResult.backoff(); +} + +async function processDepositGroupAborting( + ws: InternalWalletState, + depositGroup: DepositGroupRecord, +): Promise { + logger.info("processing deposit tx in 'aborting'"); + const abortRefreshGroupId = depositGroup.abortRefreshGroupId; + if (!abortRefreshGroupId) { + logger.info("refunding deposit group"); + return refundDepositGroup(ws, depositGroup); + } + logger.info("waiting for refresh"); + return waitForRefreshOnDepositGroup(ws, depositGroup); +} + +async function processDepositGroupPendingKyc( + ws: InternalWalletState, + depositGroup: DepositGroupRecord, + cancellationToken: CancellationToken, +): Promise { + const { depositGroupId } = depositGroup; + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId, + }); + const retryTag = constructTaskIdentifier({ + tag: PendingTaskType.Deposit, + depositGroupId, + }); + + const kycInfo = depositGroup.kycInfo; + const userType = "individual"; + + if (!kycInfo) { + throw Error("invalid DB state, in pending(kyc), but no kycInfo present"); + } + + const url = new URL( + `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, + kycInfo.exchangeBaseUrl, + ); + url.searchParams.set("timeout_ms", "10000"); + logger.info(`kyc url ${url.href}`); + const kycStatusRes = await ws.http.fetch(url.href, { + method: "GET", + cancellationToken, + }); + if ( + kycStatusRes.status === HttpStatusCode.Ok || + //FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge + // remove after the exchange is fixed or clarified + kycStatusRes.status === HttpStatusCode.NoContent + ) { + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { + const newDg = await tx.depositGroups.get(depositGroupId); + if (!newDg) { + return; + } + if (newDg.operationStatus !== DepositOperationStatus.PendingKyc) { + return; + } + const oldTxState = computeDepositTransactionStatus(newDg); + newDg.operationStatus = DepositOperationStatus.PendingTrack; + const newTxState = computeDepositTransactionStatus(newDg); + await tx.depositGroups.put(newDg); + return { oldTxState, newTxState }; + }, + ); + notifyTransition(ws, transactionId, transitionInfo); + } else if (kycStatusRes.status === HttpStatusCode.Accepted) { + // FIXME: Do we have to update the URL here? + } else { + throw Error(`unexpected response from kyc-check (${kycStatusRes.status})`); + } + return TaskRunResult.backoff(); +} + +/** + * Tracking information from the exchange indicated that + * KYC is required. We need to check the KYC info + * and transition the transaction to the KYC required state. + */ +async function transitionToKycRequired( + ws: InternalWalletState, + depositGroup: DepositGroupRecord, + kycInfo: KycPendingInfo, + exchangeUrl: string, +): Promise { + const { depositGroupId } = depositGroup; + const userType = "individual"; + + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId, + }); + + const url = new URL( + `kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`, + exchangeUrl, + ); + logger.info(`kyc url ${url.href}`); + const kycStatusReq = await ws.http.fetch(url.href, { + method: "GET", + }); + if (kycStatusReq.status === HttpStatusCode.Ok) { + logger.warn("kyc requested, but already fulfilled"); + return TaskRunResult.backoff(); + } else if (kycStatusReq.status === HttpStatusCode.Accepted) { + const kycStatus = await kycStatusReq.json(); + logger.info(`kyc status: ${j2s(kycStatus)}`); + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + return undefined; + } + if (dg.operationStatus !== DepositOperationStatus.PendingTrack) { + return undefined; + } + const oldTxState = computeDepositTransactionStatus(dg); + dg.kycInfo = { + exchangeBaseUrl: exchangeUrl, + kycUrl: kycStatus.kyc_url, + paytoHash: kycInfo.paytoHash, + requirementRow: kycInfo.requirementRow, + }; + await tx.depositGroups.put(dg); + const newTxState = computeDepositTransactionStatus(dg); + return { oldTxState, newTxState }; + }, + ); + notifyTransition(ws, transactionId, transitionInfo); + return TaskRunResult.finished(); + } else { + throw Error(`unexpected response from kyc-check (${kycStatusReq.status})`); + } +} + +async function processDepositGroupPendingTrack( + ws: InternalWalletState, + depositGroup: DepositGroupRecord, + cancellationToken?: CancellationToken, +): Promise { + const { depositGroupId } = depositGroup; + for (let i = 0; i < depositGroup.statusPerCoin.length; i++) { + const coinPub = depositGroup.payCoinSelection.coinPubs[i]; + // FIXME: Make the URL part of the coin selection? + const exchangeBaseUrl = await ws.db.runReadWriteTx( + ["coins"], + async (tx) => { + const coinRecord = await tx.coins.get(coinPub); + checkDbInvariant(!!coinRecord); + return coinRecord.exchangeBaseUrl; + }, + ); + + let updatedTxStatus: DepositElementStatus | undefined = undefined; + let newWiredCoin: + | { + id: string; + value: DepositTrackingInfo; + } + | undefined; + + if (depositGroup.statusPerCoin[i] !== DepositElementStatus.Wired) { + const track = await trackDeposit( + ws, + depositGroup, + coinPub, + exchangeBaseUrl, + ); + + if (track.type === "accepted") { + if (!track.kyc_ok && track.requirement_row !== undefined) { + const paytoHash = encodeCrock( + hashTruncate32(stringToBytes(depositGroup.wire.payto_uri + "\0")), + ); + const { requirement_row: requirementRow } = track; + const kycInfo: KycPendingInfo = { + paytoHash, + requirementRow, + }; + return transitionToKycRequired( + ws, + depositGroup, + kycInfo, + exchangeBaseUrl, + ); + } else { + updatedTxStatus = DepositElementStatus.Tracking; + } + } else if (track.type === "wired") { + updatedTxStatus = DepositElementStatus.Wired; + + const payto = parsePaytoUri(depositGroup.wire.payto_uri); + if (!payto) { + throw Error(`unparsable payto: ${depositGroup.wire.payto_uri}`); + } + + const fee = await getExchangeWireFee( + ws, + payto.targetType, + exchangeBaseUrl, + track.execution_time, + ); + const raw = Amounts.parseOrThrow(track.coin_contribution); + const wireFee = Amounts.parseOrThrow(fee.wireFee); + + newWiredCoin = { + value: { + amountRaw: Amounts.stringify(raw), + wireFee: Amounts.stringify(wireFee), + exchangePub: track.exchange_pub, + timestampExecuted: timestampProtocolToDb(track.execution_time), + wireTransferId: track.wtid, + }, + id: track.exchange_sig, + }; + } else { + updatedTxStatus = DepositElementStatus.DepositPending; + } + } + + if (updatedTxStatus !== undefined) { + await ws.db.runReadWriteTx(["depositGroups"], async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + return; + } + if (updatedTxStatus !== undefined) { + dg.statusPerCoin[i] = updatedTxStatus; + } + if (newWiredCoin) { + /** + * FIXME: if there is a new wire information from the exchange + * it should add up to the previous tracking states. + * + * This may loose information by overriding prev state. + * + * And: add checks to integration tests + */ + if (!dg.trackingState) { + dg.trackingState = {}; + } + + dg.trackingState[newWiredCoin.id] = newWiredCoin.value; + } + await tx.depositGroups.put(dg); + }); + } + } + + let allWired = true; + + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + return undefined; + } + const oldTxState = computeDepositTransactionStatus(dg); + for (let i = 0; i < depositGroup.statusPerCoin.length; i++) { + if (depositGroup.statusPerCoin[i] !== DepositElementStatus.Wired) { + allWired = false; + break; + } + } + if (allWired) { + dg.timestampFinished = timestampPreciseToDb( + TalerPreciseTimestamp.now(), + ); + dg.operationStatus = DepositOperationStatus.Finished; + await tx.depositGroups.put(dg); + } + const newTxState = computeDepositTransactionStatus(dg); + return { oldTxState, newTxState }; + }, + ); + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId, + }); + notifyTransition(ws, transactionId, transitionInfo); + if (allWired) { + return TaskRunResult.finished(); + } else { + // FIXME: Use long-polling. + return TaskRunResult.backoff(); + } +} + +async function processDepositGroupPendingDeposit( + ws: InternalWalletState, + depositGroup: DepositGroupRecord, + cancellationToken?: CancellationToken, +): Promise { + logger.info("processing deposit group in pending(deposit)"); + const depositGroupId = depositGroup.depositGroupId; + const contractTermsRec = await ws.db.runReadOnlyTx( + ["contractTerms"], + async (tx) => { + return tx.contractTerms.get(depositGroup.contractTermsHash); + }, + ); + if (!contractTermsRec) { + throw Error("contract terms for deposit not found in database"); + } + const contractTerms: MerchantContractTerms = + contractTermsRec.contractTermsRaw; + const contractData = extractContractData( + contractTermsRec.contractTermsRaw, + depositGroup.contractTermsHash, + "", + ); + + const transactionId = constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId, + }); + + // Check for cancellation before expensive operations. + cancellationToken?.throwIfCancelled(); + + // FIXME: Cache these! + const depositPermissions = await generateDepositPermissions( + ws, + depositGroup.payCoinSelection, + contractData, + ); + + // Exchanges involved in the deposit + const exchanges: Set = new Set(); + + for (const dp of depositPermissions) { + exchanges.add(dp.exchange_url); + } + + // We need to do one batch per exchange. + for (const exchangeUrl of exchanges.values()) { + const coins: BatchDepositRequestCoin[] = []; + const batchIndexes: number[] = []; + + const batchReq: ExchangeBatchDepositRequest = { + coins, + h_contract_terms: depositGroup.contractTermsHash, + merchant_payto_uri: depositGroup.wire.payto_uri, + merchant_pub: contractTerms.merchant_pub, + timestamp: contractTerms.timestamp, + wire_salt: depositGroup.wire.salt, + wire_transfer_deadline: contractTerms.wire_transfer_deadline, + refund_deadline: contractTerms.refund_deadline, + }; + + for (let i = 0; i < depositPermissions.length; i++) { + const perm = depositPermissions[i]; + if (perm.exchange_url != exchangeUrl) { + continue; + } + coins.push({ + coin_pub: perm.coin_pub, + coin_sig: perm.coin_sig, + contribution: Amounts.stringify(perm.contribution), + denom_pub_hash: perm.h_denom, + ub_sig: perm.ub_sig, + h_age_commitment: perm.h_age_commitment, + }); + batchIndexes.push(i); + } + + // Check for cancellation before making network request. + cancellationToken?.throwIfCancelled(); + const url = new URL(`batch-deposit`, exchangeUrl); + logger.info(`depositing to ${url.href}`); + logger.trace(`deposit request: ${j2s(batchReq)}`); + const httpResp = await ws.http.fetch(url.href, { + method: "POST", + body: batchReq, + cancellationToken: cancellationToken, + }); + await readSuccessResponseJsonOrThrow( + httpResp, + codecForBatchDepositSuccess(), + ); + + await ws.db.runReadWriteTx(["depositGroups"], async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + return; + } + for (const batchIndex of batchIndexes) { + const coinStatus = dg.statusPerCoin[batchIndex]; + switch (coinStatus) { + case DepositElementStatus.DepositPending: + dg.statusPerCoin[batchIndex] = DepositElementStatus.Tracking; + await tx.depositGroups.put(dg); + } + } + }); + } + + const transitionInfo = await ws.db.runReadWriteTx( + ["depositGroups"], + async (tx) => { + const dg = await tx.depositGroups.get(depositGroupId); + if (!dg) { + return undefined; + } + const oldTxState = computeDepositTransactionStatus(dg); + dg.operationStatus = DepositOperationStatus.PendingTrack; + await tx.depositGroups.put(dg); + const newTxState = computeDepositTransactionStatus(dg); + return { oldTxState, newTxState }; + }, + ); + + notifyTransition(ws, transactionId, transitionInfo); + return TaskRunResult.progress(); +} + +/** + * Process a deposit group that is not in its final state yet. + */ +export async function processDepositGroup( + ws: InternalWalletState, + depositGroupId: string, + cancellationToken: CancellationToken, +): Promise { + const depositGroup = await ws.db.runReadOnlyTx( + ["depositGroups"], + async (tx) => { + return tx.depositGroups.get(depositGroupId); + }, + ); + if (!depositGroup) { + logger.warn(`deposit group ${depositGroupId} not found`); + return TaskRunResult.finished(); + } + + switch (depositGroup.operationStatus) { + case DepositOperationStatus.PendingTrack: + return processDepositGroupPendingTrack( + ws, + depositGroup, + cancellationToken, + ); + case DepositOperationStatus.PendingKyc: + return processDepositGroupPendingKyc(ws, depositGroup, cancellationToken); + case DepositOperationStatus.PendingDeposit: + return processDepositGroupPendingDeposit( + ws, + depositGroup, + cancellationToken, + ); + case DepositOperationStatus.Aborting: + return processDepositGroupAborting(ws, depositGroup); + } + + return TaskRunResult.finished(); +} + +/** + * FIXME: Consider moving this to exchanges.ts. + */ +async function getExchangeWireFee( + ws: InternalWalletState, + wireType: string, + baseUrl: string, + time: TalerProtocolTimestamp, +): Promise { + const exchangeDetails = await ws.db.runReadOnlyTx( + ["exchangeDetails", "exchanges"], + async (tx) => { + const ex = await tx.exchanges.get(baseUrl); + if (!ex || !ex.detailsPointer) return undefined; + return await tx.exchangeDetails.indexes.byPointer.get([ + baseUrl, + ex.detailsPointer.currency, + ex.detailsPointer.masterPublicKey, + ]); + }, + ); + + if (!exchangeDetails) { + throw Error(`exchange missing: ${baseUrl}`); + } + + const fees = exchangeDetails.wireInfo.feesForType[wireType]; + if (!fees || fees.length === 0) { + throw Error( + `exchange ${baseUrl} doesn't have fees for wire type ${wireType}`, + ); + } + const fee = fees.find((x) => { + return AbsoluteTime.isBetween( + AbsoluteTime.fromProtocolTimestamp(time), + AbsoluteTime.fromProtocolTimestamp(x.startStamp), + AbsoluteTime.fromProtocolTimestamp(x.endStamp), + ); + }); + if (!fee) { + throw Error( + `exchange ${exchangeDetails.exchangeBaseUrl} doesn't have fees for wire type ${wireType} at ${time.t_s}`, + ); + } + + return fee; +} + +async function trackDeposit( + ws: InternalWalletState, + depositGroup: DepositGroupRecord, + coinPub: string, + exchangeUrl: string, +): Promise { + const wireHash = hashWire( + depositGroup.wire.payto_uri, + depositGroup.wire.salt, + ); + + const url = new URL( + `deposits/${wireHash}/${depositGroup.merchantPub}/${depositGroup.contractTermsHash}/${coinPub}`, + exchangeUrl, + ); + const sigResp = await ws.cryptoApi.signTrackTransaction({ + coinPub, + contractTermsHash: depositGroup.contractTermsHash, + merchantPriv: depositGroup.merchantPriv, + merchantPub: depositGroup.merchantPub, + wireHash, + }); + url.searchParams.set("merchant_sig", sigResp.sig); + const httpResp = await ws.http.fetch(url.href, { method: "GET" }); + logger.trace(`deposits response status: ${httpResp.status}`); + switch (httpResp.status) { + case HttpStatusCode.Accepted: { + const accepted = await readSuccessResponseJsonOrThrow( + httpResp, + codecForTackTransactionAccepted(), + ); + return { type: "accepted", ...accepted }; + } + case HttpStatusCode.Ok: { + const wired = await readSuccessResponseJsonOrThrow( + httpResp, + codecForTackTransactionWired(), + ); + return { type: "wired", ...wired }; + } + default: { + throw Error( + `unexpected response from track-transaction (${httpResp.status})`, + ); + } + } +} + +/** + * Check if creating a deposit group is possible and calculate + * the associated fees. + * + * FIXME: This should be renamed to checkDepositGroup, + * as it doesn't prepare anything + */ +export async function prepareDepositGroup( + ws: InternalWalletState, + req: PrepareDepositRequest, +): Promise { + const p = parsePaytoUri(req.depositPaytoUri); + if (!p) { + throw Error("invalid payto URI"); + } + const amount = Amounts.parseOrThrow(req.amount); + + const exchangeInfos: { url: string; master_pub: string }[] = []; + + await ws.db.runReadOnlyTx(["exchangeDetails", "exchanges"], async (tx) => { + const allExchanges = await tx.exchanges.iter().toArray(); + for (const e of allExchanges) { + const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); + if (!details || amount.currency !== details.currency) { + continue; + } + exchangeInfos.push({ + master_pub: details.masterPublicKey, + url: e.baseUrl, + }); + } + }); + + const now = AbsoluteTime.now(); + const nowRounded = AbsoluteTime.toProtocolTimestamp(now); + const contractTerms: MerchantContractTerms = { + exchanges: exchangeInfos, + amount: req.amount, + max_fee: Amounts.stringify(amount), + max_wire_fee: Amounts.stringify(amount), + wire_method: p.targetType, + timestamp: nowRounded, + merchant_base_url: "", + summary: "", + nonce: "", + wire_transfer_deadline: nowRounded, + order_id: "", + h_wire: "", + pay_deadline: AbsoluteTime.toProtocolTimestamp( + AbsoluteTime.addDuration(now, durationFromSpec({ hours: 1 })), + ), + merchant: { + name: "(wallet)", + }, + merchant_pub: "", + refund_deadline: TalerProtocolTimestamp.zero(), + }; + + const { h: contractTermsHash } = await ws.cryptoApi.hashString({ + str: canonicalJson(contractTerms), + }); + + const contractData = extractContractData( + contractTerms, + contractTermsHash, + "", + ); + + const payCoinSel = await selectPayCoinsNew(ws, { + auditors: [], + exchanges: contractData.allowedExchanges, + wireMethod: contractData.wireMethod, + contractTermsAmount: Amounts.parseOrThrow(contractData.amount), + depositFeeLimit: Amounts.parseOrThrow(contractData.maxDepositFee), + wireFeeAmortization: contractData.wireFeeAmortization ?? 1, + wireFeeLimit: Amounts.parseOrThrow(contractData.maxWireFee), + prevPayCoins: [], + }); + + if (payCoinSel.type !== "success") { + throw TalerError.fromDetail( + TalerErrorCode.WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE, + { + insufficientBalanceDetails: payCoinSel.insufficientBalanceDetails, + }, + ); + } + + const totalDepositCost = await getTotalPaymentCost(ws, payCoinSel.coinSel); + + const effectiveDepositAmount = await getCounterpartyEffectiveDepositAmount( + ws, + p.targetType, + payCoinSel.coinSel, + ); + + const fees = await getTotalFeesForDepositAmount( + ws, + p.targetType, + amount, + payCoinSel.coinSel, + ); + + return { + totalDepositCost: Amounts.stringify(totalDepositCost), + effectiveDepositAmount: Amounts.stringify(effectiveDepositAmount), + fees, + }; +} + +export function generateDepositGroupTxId(): string { + const depositGroupId = encodeCrock(getRandomBytes(32)); + return constructTransactionIdentifier({ + tag: TransactionType.Deposit, + depositGroupId: depositGroupId, + }); +} + +export async function createDepositGroup( + ws: InternalWalletState, + req: CreateDepositGroupRequest, +): Promise { + const p = parsePaytoUri(req.depositPaytoUri); + if (!p) { + throw Error("invalid payto URI"); + } + + const amount = Amounts.parseOrThrow(req.amount); + + const exchangeInfos: { url: string; master_pub: string }[] = []; + + await ws.db.runReadOnlyTx(["exchanges", "exchangeDetails"], async (tx) => { + const allExchanges = await tx.exchanges.iter().toArray(); + for (const e of allExchanges) { + const details = await getExchangeWireDetailsInTx(tx, e.baseUrl); + if (!details || amount.currency !== details.currency) { + continue; + } + exchangeInfos.push({ + master_pub: details.masterPublicKey, + url: e.baseUrl, + }); + } + }); + + const now = AbsoluteTime.now(); + const wireDeadline = AbsoluteTime.toProtocolTimestamp( + AbsoluteTime.addDuration(now, Duration.fromSpec({ minutes: 5 })), + ); + const nowRounded = AbsoluteTime.toProtocolTimestamp(now); + const noncePair = await ws.cryptoApi.createEddsaKeypair({}); + const merchantPair = await ws.cryptoApi.createEddsaKeypair({}); + const wireSalt = encodeCrock(getRandomBytes(16)); + const wireHash = hashWire(req.depositPaytoUri, wireSalt); + const contractTerms: MerchantContractTerms = { + exchanges: exchangeInfos, + amount: req.amount, + max_fee: Amounts.stringify(amount), + max_wire_fee: Amounts.stringify(amount), + wire_method: p.targetType, + timestamp: nowRounded, + merchant_base_url: "", + summary: "", + nonce: noncePair.pub, + wire_transfer_deadline: wireDeadline, + order_id: "", + h_wire: wireHash, + pay_deadline: AbsoluteTime.toProtocolTimestamp( + AbsoluteTime.addDuration(now, durationFromSpec({ hours: 1 })), + ), + merchant: { + name: "(wallet)", + }, + merchant_pub: merchantPair.pub, + refund_deadline: TalerProtocolTimestamp.zero(), + }; + + const { h: contractTermsHash } = await ws.cryptoApi.hashString({ + str: canonicalJson(contractTerms), + }); + + const contractData = extractContractData( + contractTerms, + contractTermsHash, + "", + ); + + const payCoinSel = await selectPayCoinsNew(ws, { + auditors: [], + exchanges: contractData.allowedExchanges, + wireMethod: contractData.wireMethod, + contractTermsAmount: Amounts.parseOrThrow(contractData.amount), + depositFeeLimit: Amounts.parseOrThrow(contractData.maxDepositFee), + wireFeeAmortization: contractData.wireFeeAmortization ?? 1, + wireFeeLimit: Amounts.parseOrThrow(contractData.maxWireFee), + prevPayCoins: [], + }); + + if (payCoinSel.type !== "success") { + throw TalerError.fromDetail( + TalerErrorCode.WALLET_DEPOSIT_GROUP_INSUFFICIENT_BALANCE, + { + insufficientBalanceDetails: payCoinSel.insufficientBalanceDetails, + }, + ); + } + + const totalDepositCost = await getTotalPaymentCost(ws, payCoinSel.coinSel); + + let depositGroupId: string; + if (req.transactionId) { + const txId = parseTransactionIdentifier(req.transactionId); + if (!txId || txId.tag !== TransactionType.Deposit) { + throw Error("invalid transaction ID"); + } + depositGroupId = txId.depositGroupId; + } else { + depositGroupId = encodeCrock(getRandomBytes(32)); + } + + const counterpartyEffectiveDepositAmount = + await getCounterpartyEffectiveDepositAmount( + ws, + p.targetType, + payCoinSel.coinSel, + ); + + const depositGroup: DepositGroupRecord = { + contractTermsHash, + depositGroupId, + currency: Amounts.currencyOf(totalDepositCost), + amount: contractData.amount, + noncePriv: noncePair.priv, + noncePub: noncePair.pub, + timestampCreated: timestampPreciseToDb( + AbsoluteTime.toPreciseTimestamp(now), + ), + timestampFinished: undefined, + statusPerCoin: payCoinSel.coinSel.coinPubs.map( + () => DepositElementStatus.DepositPending, + ), + payCoinSelection: payCoinSel.coinSel, + payCoinSelectionUid: encodeCrock(getRandomBytes(32)), + merchantPriv: merchantPair.priv, + merchantPub: merchantPair.pub, + totalPayCost: Amounts.stringify(totalDepositCost), + counterpartyEffectiveDepositAmount: Amounts.stringify( + counterpartyEffectiveDepositAmount, + ), + wireTransferDeadline: timestampProtocolToDb( + contractTerms.wire_transfer_deadline, + ), + wire: { + payto_uri: req.depositPaytoUri, + salt: wireSalt, + }, + operationStatus: DepositOperationStatus.PendingDeposit, + }; + + const ctx = new DepositTransactionContext(ws, depositGroupId); + const transactionId = ctx.transactionId; + + const newTxState = await ws.db.runReadWriteTx( + [ + "depositGroups", + "coins", + "recoupGroups", + "denominations", + "refreshGroups", + "coinAvailability", + "contractTerms", + ], + async (tx) => { + await spendCoins(ws, tx, { + allocationId: transactionId, + coinPubs: payCoinSel.coinSel.coinPubs, + contributions: payCoinSel.coinSel.coinContributions.map((x) => + Amounts.parseOrThrow(x), + ), + refreshReason: RefreshReason.PayDeposit, + }); + await tx.depositGroups.put(depositGroup); + await tx.contractTerms.put({ + contractTermsRaw: contractTerms, + h: contractTermsHash, + }); + return computeDepositTransactionStatus(depositGroup); + }, + ); + + ws.notify({ + type: NotificationType.TransactionStateTransition, + transactionId, + oldTxState: { + major: TransactionMajorState.None, + }, + newTxState, + }); + + ws.notify({ + type: NotificationType.BalanceChange, + hintTransactionId: transactionId, + }); + + ws.taskScheduler.startShepherdTask(ctx.taskId); + + return { + depositGroupId, + transactionId, + }; +} + +/** + * Get the amount that will be deposited on the users bank + * account after depositing, not considering aggregation. + */ +export async function getCounterpartyEffectiveDepositAmount( + ws: InternalWalletState, + wireType: string, + pcs: PayCoinSelection, +): Promise { + const amt: AmountJson[] = []; + const fees: AmountJson[] = []; + const exchangeSet: Set = new Set(); + + await ws.db.runReadOnlyTx( + ["coins", "denominations", "exchangeDetails", "exchanges"], + async (tx) => { + for (let i = 0; i < pcs.coinPubs.length; i++) { + const coin = await tx.coins.get(pcs.coinPubs[i]); + if (!coin) { + throw Error("can't calculate deposit amount, coin not found"); + } + const denom = await ws.getDenomInfo( + ws, + tx, + coin.exchangeBaseUrl, + coin.denomPubHash, + ); + if (!denom) { + throw Error("can't find denomination to calculate deposit amount"); + } + amt.push(Amounts.parseOrThrow(pcs.coinContributions[i])); + fees.push(Amounts.parseOrThrow(denom.feeDeposit)); + exchangeSet.add(coin.exchangeBaseUrl); + } + + for (const exchangeUrl of exchangeSet.values()) { + const exchangeDetails = await getExchangeWireDetailsInTx( + tx, + exchangeUrl, + ); + if (!exchangeDetails) { + continue; + } + + // FIXME/NOTE: the line below _likely_ throws exception + // about "find method not found on undefined" when the wireType + // is not supported by the Exchange. + const fee = exchangeDetails.wireInfo.feesForType[wireType].find((x) => { + return AbsoluteTime.isBetween( + AbsoluteTime.now(), + AbsoluteTime.fromProtocolTimestamp(x.startStamp), + AbsoluteTime.fromProtocolTimestamp(x.endStamp), + ); + })?.wireFee; + if (fee) { + fees.push(Amounts.parseOrThrow(fee)); + } + } + }, + ); + return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount; +} + +/** + * Get the fee amount that will be charged when trying to deposit the + * specified amount using the selected coins and the wire method. + */ +async function getTotalFeesForDepositAmount( + ws: InternalWalletState, + wireType: string, + total: AmountJson, + pcs: PayCoinSelection, +): Promise { + const wireFee: AmountJson[] = []; + const coinFee: AmountJson[] = []; + const refreshFee: AmountJson[] = []; + const exchangeSet: Set = new Set(); + const currency = Amounts.currencyOf(total); + + await ws.db.runReadOnlyTx( + ["coins", "denominations", "exchanges", "exchangeDetails"], + async (tx) => { + for (let i = 0; i < pcs.coinPubs.length; i++) { + const coin = await tx.coins.get(pcs.coinPubs[i]); + if (!coin) { + throw Error("can't calculate deposit amount, coin not found"); + } + const denom = await ws.getDenomInfo( + ws, + tx, + coin.exchangeBaseUrl, + coin.denomPubHash, + ); + if (!denom) { + throw Error("can't find denomination to calculate deposit amount"); + } + coinFee.push(Amounts.parseOrThrow(denom.feeDeposit)); + exchangeSet.add(coin.exchangeBaseUrl); + + const allDenoms = await getCandidateWithdrawalDenomsTx( + ws, + tx, + coin.exchangeBaseUrl, + currency, + ); + const amountLeft = Amounts.sub( + denom.value, + pcs.coinContributions[i], + ).amount; + const refreshCost = getTotalRefreshCost( + allDenoms, + denom, + amountLeft, + ws.config.testing.denomselAllowLate, + ); + refreshFee.push(refreshCost); + } + + for (const exchangeUrl of exchangeSet.values()) { + const exchangeDetails = await getExchangeWireDetailsInTx( + tx, + exchangeUrl, + ); + if (!exchangeDetails) { + continue; + } + const fee = exchangeDetails.wireInfo.feesForType[wireType]?.find( + (x) => { + return AbsoluteTime.isBetween( + AbsoluteTime.now(), + AbsoluteTime.fromProtocolTimestamp(x.startStamp), + AbsoluteTime.fromProtocolTimestamp(x.endStamp), + ); + }, + )?.wireFee; + if (fee) { + wireFee.push(Amounts.parseOrThrow(fee)); + } + } + }, + ); + + return { + coin: Amounts.stringify(Amounts.sumOrZero(total.currency, coinFee).amount), + wire: Amounts.stringify(Amounts.sumOrZero(total.currency, wireFee).amount), + refresh: Amounts.stringify( + Amounts.sumOrZero(total.currency, refreshFee).amount, + ), + }; +} -- cgit v1.2.3