From 526f4eba9554f27e33afb0e02d19d870825b038c Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Sat, 8 Oct 2022 20:56:57 +0200 Subject: wallet-core: Clean up merchant payments DB schema --- .../taler-wallet-core/src/operations/common.ts | 292 +++++++++++++++++++-- 1 file changed, 263 insertions(+), 29 deletions(-) (limited to 'packages/taler-wallet-core/src/operations/common.ts') diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts index 6d54503a1..9f235c9b4 100644 --- a/packages/taler-wallet-core/src/operations/common.ts +++ b/packages/taler-wallet-core/src/operations/common.ts @@ -17,38 +17,272 @@ /** * Imports. */ -import { TalerErrorDetail, TalerErrorCode } from "@gnu-taler/taler-util"; -import { CryptoApiStoppedError } from "../crypto/workers/cryptoDispatcher.js"; -import { TalerError, getErrorDetailFromException } from "../errors.js"; +import { + AmountJson, + Amounts, + j2s, + Logger, + RefreshReason, + TalerErrorCode, + TalerErrorDetail, + TransactionType, +} from "@gnu-taler/taler-util"; +import { WalletStoresV1, CoinStatus, CoinRecord } from "../db.js"; +import { makeErrorDetail, TalerError } from "../errors.js"; +import { InternalWalletState } from "../internal-wallet-state.js"; +import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js"; +import { GetReadWriteAccess } from "../util/query.js"; +import { + OperationAttemptResult, + OperationAttemptResultType, + RetryInfo, +} from "../util/retries.js"; +import { createRefreshGroup } from "./refresh.js"; -/** - * Run an operation and call the onOpError callback - * when there was an exception or operation error that must be reported. - * The cause will be re-thrown to the caller. - */ -export async function guardOperationException( - op: () => Promise, - onOpError: (e: TalerErrorDetail) => Promise, -): Promise { +const logger = new Logger("operations/common.ts"); + +export interface CoinsSpendInfo { + coinPubs: string[]; + contributions: AmountJson[]; + refreshReason: RefreshReason; + /** + * Identifier for what the coin has been spent for. + */ + allocationId: string; +} + +export async function makeCoinAvailable( + ws: InternalWalletState, + tx: GetReadWriteAccess<{ + coins: typeof WalletStoresV1.coins; + coinAvailability: typeof WalletStoresV1.coinAvailability; + denominations: typeof WalletStoresV1.denominations; + }>, + coinRecord: CoinRecord, +): Promise { + checkLogicInvariant(coinRecord.status === CoinStatus.Fresh); + const existingCoin = await tx.coins.get(coinRecord.coinPub); + if (existingCoin) { + return; + } + const denom = await tx.denominations.get([ + coinRecord.exchangeBaseUrl, + coinRecord.denomPubHash, + ]); + checkDbInvariant(!!denom); + const ageRestriction = coinRecord.maxAge; + let car = await tx.coinAvailability.get([ + coinRecord.exchangeBaseUrl, + coinRecord.denomPubHash, + ageRestriction, + ]); + if (!car) { + car = { + maxAge: ageRestriction, + amountFrac: denom.amountFrac, + amountVal: denom.amountVal, + currency: denom.currency, + denomPubHash: denom.denomPubHash, + exchangeBaseUrl: denom.exchangeBaseUrl, + freshCoinCount: 0, + }; + } + car.freshCoinCount++; + await tx.coins.put(coinRecord); + await tx.coinAvailability.put(car); +} + +export async function spendCoins( + ws: InternalWalletState, + tx: GetReadWriteAccess<{ + coins: typeof WalletStoresV1.coins; + coinAvailability: typeof WalletStoresV1.coinAvailability; + refreshGroups: typeof WalletStoresV1.refreshGroups; + denominations: typeof WalletStoresV1.denominations; + }>, + csi: CoinsSpendInfo, +): Promise { + for (let i = 0; i < csi.coinPubs.length; i++) { + const coin = await tx.coins.get(csi.coinPubs[i]); + if (!coin) { + throw Error("coin allocated for payment doesn't exist anymore"); + } + const coinAvailability = await tx.coinAvailability.get([ + coin.exchangeBaseUrl, + coin.denomPubHash, + coin.maxAge, + ]); + checkDbInvariant(!!coinAvailability); + const contrib = csi.contributions[i]; + if (coin.status !== CoinStatus.Fresh) { + const alloc = coin.allocation; + if (!alloc) { + continue; + } + if (alloc.id !== csi.allocationId) { + // FIXME: assign error code + throw Error("conflicting coin allocation (id)"); + } + if (0 !== Amounts.cmp(alloc.amount, contrib)) { + // FIXME: assign error code + throw Error("conflicting coin allocation (contrib)"); + } + continue; + } + coin.status = CoinStatus.Dormant; + coin.allocation = { + id: csi.allocationId, + amount: Amounts.stringify(contrib), + }; + const remaining = Amounts.sub(coin.currentAmount, contrib); + if (remaining.saturated) { + throw Error("not enough remaining balance on coin for payment"); + } + coin.currentAmount = remaining.amount; + checkDbInvariant(!!coinAvailability); + if (coinAvailability.freshCoinCount === 0) { + throw Error( + `invalid coin count ${coinAvailability.freshCoinCount} in DB`, + ); + } + coinAvailability.freshCoinCount--; + await tx.coins.put(coin); + await tx.coinAvailability.put(coinAvailability); + } + const refreshCoinPubs = csi.coinPubs.map((x) => ({ + coinPub: x, + })); + await ws.refreshOps.createRefreshGroup( + ws, + tx, + refreshCoinPubs, + RefreshReason.PayMerchant, + ); +} + +export async function storeOperationError( + ws: InternalWalletState, + pendingTaskId: string, + e: TalerErrorDetail, +): Promise { + await ws.db + .mktx((x) => [x.operationRetries]) + .runReadWrite(async (tx) => { + let retryRecord = await tx.operationRetries.get(pendingTaskId); + if (!retryRecord) { + retryRecord = { + id: pendingTaskId, + lastError: e, + retryInfo: RetryInfo.reset(), + }; + } else { + retryRecord.lastError = e; + retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo); + } + await tx.operationRetries.put(retryRecord); + }); +} + +export async function storeOperationPending( + ws: InternalWalletState, + pendingTaskId: string, +): Promise { + await ws.db + .mktx((x) => [x.operationRetries]) + .runReadWrite(async (tx) => { + let retryRecord = await tx.operationRetries.get(pendingTaskId); + if (!retryRecord) { + retryRecord = { + id: pendingTaskId, + retryInfo: RetryInfo.reset(), + }; + } else { + delete retryRecord.lastError; + retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo); + } + await tx.operationRetries.put(retryRecord); + }); +} + +export async function runOperationWithErrorReporting( + ws: InternalWalletState, + opId: string, + f: () => Promise, +): Promise { + let maybeError: TalerErrorDetail | undefined; try { - return await op(); - } catch (e: any) { - if (e instanceof CryptoApiStoppedError) { - throw e; + const resp = await f(); + switch (resp.type) { + case OperationAttemptResultType.Error: + return await storeOperationError(ws, opId, resp.errorDetail); + case OperationAttemptResultType.Finished: + return await storeOperationFinished(ws, opId); + case OperationAttemptResultType.Pending: + return await storeOperationPending(ws, opId); + case OperationAttemptResultType.Longpoll: + break; } - if ( - e instanceof TalerError && - e.hasErrorCode(TalerErrorCode.WALLET_PENDING_OPERATION_FAILED) - ) { - throw e; + } catch (e) { + if (e instanceof TalerError) { + logger.warn("operation processed resulted in error"); + logger.warn(`error was: ${j2s(e.errorDetail)}`); + maybeError = e.errorDetail; + return await storeOperationError(ws, opId, maybeError!); + } else if (e instanceof Error) { + // This is a bug, as we expect pending operations to always + // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED + // or return something. + logger.error(`Uncaught exception: ${e.message}`); + logger.error(`Stack: ${e.stack}`); + maybeError = makeErrorDetail( + TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, + { + stack: e.stack, + }, + `unexpected exception (message: ${e.message})`, + ); + return await storeOperationError(ws, opId, maybeError); + } else { + logger.error("Uncaught exception, value is not even an error."); + maybeError = makeErrorDetail( + TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, + {}, + `unexpected exception (not even an error)`, + ); + return await storeOperationError(ws, opId, maybeError); } - const opErr = getErrorDetailFromException(e); - await onOpError(opErr); - throw TalerError.fromDetail( - TalerErrorCode.WALLET_PENDING_OPERATION_FAILED, - { - innerError: opErr, - }, - ); } } + +export async function storeOperationFinished( + ws: InternalWalletState, + pendingTaskId: string, +): Promise { + await ws.db + .mktx((x) => [x.operationRetries]) + .runReadWrite(async (tx) => { + await tx.operationRetries.delete(pendingTaskId); + }); +} + +export enum TombstoneTag { + DeleteWithdrawalGroup = "delete-withdrawal-group", + DeleteReserve = "delete-reserve", + DeletePayment = "delete-payment", + DeleteTip = "delete-tip", + DeleteRefreshGroup = "delete-refresh-group", + DeleteDepositGroup = "delete-deposit-group", + DeleteRefund = "delete-refund", + DeletePeerPullDebit = "delete-peer-pull-debit", + DeletePeerPushDebit = "delete-peer-push-debit", +} + +/** + * Create an event ID from the type and the primary key for the event. + */ +export function makeEventId( + type: TransactionType | TombstoneTag, + ...args: string[] +): string { + return type + ":" + args.map((x) => encodeURIComponent(x)).join(":"); +} -- cgit v1.2.3