diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/reserves.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/reserves.ts | 334 |
1 files changed, 202 insertions, 132 deletions
diff --git a/packages/taler-wallet-core/src/operations/reserves.ts b/packages/taler-wallet-core/src/operations/reserves.ts index a2482db70..73975fb03 100644 --- a/packages/taler-wallet-core/src/operations/reserves.ts +++ b/packages/taler-wallet-core/src/operations/reserves.ts @@ -34,11 +34,11 @@ import { } from "@gnu-taler/taler-util"; import { randomBytes } from "../crypto/primitives/nacl-fast.js"; import { - Stores, ReserveRecordStatus, ReserveBankInfo, ReserveRecord, WithdrawalGroupRecord, + WalletStoresV1, } from "../db.js"; import { assertUnreachable } from "../util/assertUnreachable.js"; import { canonicalizeBaseUrl } from "@gnu-taler/taler-util"; @@ -65,9 +65,13 @@ import { import { getExchangeTrust } from "./currencies.js"; import { encodeCrock, getRandomBytes } from "../crypto/talerCrypto.js"; import { Logger } from "@gnu-taler/taler-util"; -import { readSuccessResponseJsonOrErrorCode, readSuccessResponseJsonOrThrow, throwUnexpectedRequestError } from "../util/http.js"; +import { + readSuccessResponseJsonOrErrorCode, + readSuccessResponseJsonOrThrow, + throwUnexpectedRequestError, +} from "../util/http.js"; import { URL } from "../util/url.js"; -import { TransactionHandle } from "../util/query.js"; +import { GetReadOnlyAccess } from "../util/query.js"; const logger = new Logger("reserves.ts"); @@ -75,12 +79,17 @@ async function resetReserveRetry( ws: InternalWalletState, reservePub: string, ): Promise<void> { - await ws.db.mutate(Stores.reserves, reservePub, (x) => { - if (x.retryInfo.active) { - x.retryInfo = initRetryInfo(); - } - return x; - }); + await ws.db + .mktx((x) => ({ + reserves: x.reserves, + })) + .runReadWrite(async (tx) => { + const x = await tx.reserves.get(reservePub); + if (x && x.retryInfo.active) { + x.retryInfo = initRetryInfo(); + await tx.reserves.put(x); + } + }); } /** @@ -157,17 +166,20 @@ export async function createReserve( exchangeInfo.exchange, ); - const resp = await ws.db.runWithWriteTransaction( - [Stores.exchangeTrustStore, Stores.reserves, Stores.bankWithdrawUris], - async (tx) => { + const resp = await ws.db + .mktx((x) => ({ + exchangeTrust: x.exchangeTrust, + reserves: x.reserves, + bankWithdrawUris: x.bankWithdrawUris, + })) + .runReadWrite(async (tx) => { // Check if we have already created a reserve for that bankWithdrawStatusUrl if (reserveRecord.bankInfo?.statusUrl) { - const bwi = await tx.get( - Stores.bankWithdrawUris, + const bwi = await tx.bankWithdrawUris.get( reserveRecord.bankInfo.statusUrl, ); if (bwi) { - const otherReserve = await tx.get(Stores.reserves, bwi.reservePub); + const otherReserve = await tx.reserves.get(bwi.reservePub); if (otherReserve) { logger.trace( "returning existing reserve for bankWithdrawStatusUri", @@ -178,27 +190,26 @@ export async function createReserve( }; } } - await tx.put(Stores.bankWithdrawUris, { + await tx.bankWithdrawUris.put({ reservePub: reserveRecord.reservePub, talerWithdrawUri: reserveRecord.bankInfo.statusUrl, }); } if (!isAudited && !isTrusted) { - await tx.put(Stores.exchangeTrustStore, { + await tx.exchangeTrust.put({ currency: reserveRecord.currency, exchangeBaseUrl: reserveRecord.exchangeBaseUrl, exchangeMasterPub: exchangeDetails.masterPublicKey, uids: [encodeCrock(getRandomBytes(32))], }); } - await tx.put(Stores.reserves, reserveRecord); + await tx.reserves.put(reserveRecord); const r: CreateReserveResponse = { exchange: canonExchange, reservePub: keypair.pub, }; return r; - }, - ); + }); if (reserveRecord.reservePub === resp.reservePub) { // Only emit notification when a new reserve was created. @@ -224,23 +235,27 @@ export async function forceQueryReserve( ws: InternalWalletState, reservePub: string, ): Promise<void> { - await ws.db.runWithWriteTransaction([Stores.reserves], async (tx) => { - const reserve = await tx.get(Stores.reserves, reservePub); - if (!reserve) { - return; - } - // Only force status query where it makes sense - switch (reserve.reserveStatus) { - case ReserveRecordStatus.DORMANT: - reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; - break; - default: - reserve.requestedQuery = true; - break; - } - reserve.retryInfo = initRetryInfo(); - await tx.put(Stores.reserves, reserve); - }); + await ws.db + .mktx((x) => ({ + reserves: x.reserves, + })) + .runReadWrite(async (tx) => { + const reserve = await tx.reserves.get(reservePub); + if (!reserve) { + return; + } + // Only force status query where it makes sense + switch (reserve.reserveStatus) { + case ReserveRecordStatus.DORMANT: + reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; + break; + default: + reserve.requestedQuery = true; + break; + } + reserve.retryInfo = initRetryInfo(); + await tx.reserves.put(reserve); + }); await processReserve(ws, reservePub, true); } @@ -270,7 +285,13 @@ async function registerReserveWithBank( ws: InternalWalletState, reservePub: string, ): Promise<void> { - const reserve = await ws.db.get(Stores.reserves, reservePub); + const reserve = await ws.db + .mktx((x) => ({ + reserves: x.reserves, + })) + .runReadOnly(async (tx) => { + return await tx.reserves.get(reservePub); + }); switch (reserve?.reserveStatus) { case ReserveRecordStatus.WAIT_CONFIRM_BANK: case ReserveRecordStatus.REGISTERING_BANK: @@ -297,22 +318,30 @@ async function registerReserveWithBank( httpResp, codecForBankWithdrawalOperationPostResponse(), ); - await ws.db.mutate(Stores.reserves, reservePub, (r) => { - switch (r.reserveStatus) { - case ReserveRecordStatus.REGISTERING_BANK: - case ReserveRecordStatus.WAIT_CONFIRM_BANK: - break; - default: + await ws.db + .mktx((x) => ({ + reserves: x.reserves, + })) + .runReadWrite(async (tx) => { + const r = await tx.reserves.get(reservePub); + if (!r) { return; - } - r.timestampReserveInfoPosted = getTimestampNow(); - r.reserveStatus = ReserveRecordStatus.WAIT_CONFIRM_BANK; - if (!r.bankInfo) { - throw Error("invariant failed"); - } - r.retryInfo = initRetryInfo(); - return r; - }); + } + switch (r.reserveStatus) { + case ReserveRecordStatus.REGISTERING_BANK: + case ReserveRecordStatus.WAIT_CONFIRM_BANK: + break; + default: + return; + } + r.timestampReserveInfoPosted = getTimestampNow(); + r.reserveStatus = ReserveRecordStatus.WAIT_CONFIRM_BANK; + if (!r.bankInfo) { + throw Error("invariant failed"); + } + r.retryInfo = initRetryInfo(); + await tx.reserves.put(r); + }); ws.notify({ type: NotificationType.ReserveRegisteredWithBank }); return processReserveBankStatus(ws, reservePub); } @@ -340,7 +369,13 @@ async function processReserveBankStatusImpl( ws: InternalWalletState, reservePub: string, ): Promise<void> { - const reserve = await ws.db.get(Stores.reserves, reservePub); + const reserve = await ws.db + .mktx((x) => ({ + reserves: x.reserves, + })) + .runReadOnly(async (tx) => { + return tx.reserves.get(reservePub); + }); switch (reserve?.reserveStatus) { case ReserveRecordStatus.WAIT_CONFIRM_BANK: case ReserveRecordStatus.REGISTERING_BANK: @@ -363,20 +398,28 @@ async function processReserveBankStatusImpl( if (status.aborted) { logger.trace("bank aborted the withdrawal"); - await ws.db.mutate(Stores.reserves, reservePub, (r) => { - switch (r.reserveStatus) { - case ReserveRecordStatus.REGISTERING_BANK: - case ReserveRecordStatus.WAIT_CONFIRM_BANK: - break; - default: + await ws.db + .mktx((x) => ({ + reserves: x.reserves, + })) + .runReadWrite(async (tx) => { + const r = await tx.reserves.get(reservePub); + if (!r) { return; - } - const now = getTimestampNow(); - r.timestampBankConfirmed = now; - r.reserveStatus = ReserveRecordStatus.BANK_ABORTED; - r.retryInfo = initRetryInfo(); - return r; - }); + } + switch (r.reserveStatus) { + case ReserveRecordStatus.REGISTERING_BANK: + case ReserveRecordStatus.WAIT_CONFIRM_BANK: + break; + default: + return; + } + const now = getTimestampNow(); + r.timestampBankConfirmed = now; + r.reserveStatus = ReserveRecordStatus.BANK_ABORTED; + r.retryInfo = initRetryInfo(); + await tx.reserves.put(r); + }); return; } @@ -390,37 +433,40 @@ async function processReserveBankStatusImpl( return await processReserveBankStatus(ws, reservePub); } - if (status.transfer_done) { - await ws.db.mutate(Stores.reserves, reservePub, (r) => { - switch (r.reserveStatus) { - case ReserveRecordStatus.REGISTERING_BANK: - case ReserveRecordStatus.WAIT_CONFIRM_BANK: - break; - default: - return; - } - const now = getTimestampNow(); - r.timestampBankConfirmed = now; - r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; - r.retryInfo = initRetryInfo(); - return r; - }); - await processReserveImpl(ws, reservePub, true); - } else { - await ws.db.mutate(Stores.reserves, reservePub, (r) => { - switch (r.reserveStatus) { - case ReserveRecordStatus.WAIT_CONFIRM_BANK: - break; - default: - return; + await ws.db + .mktx((x) => ({ + reserves: x.reserves, + })) + .runReadWrite(async (tx) => { + const r = await tx.reserves.get(reservePub); + if (!r) { + return; } - if (r.bankInfo) { - r.bankInfo.confirmUrl = status.confirm_transfer_url; + if (status.transfer_done) { + switch (r.reserveStatus) { + case ReserveRecordStatus.REGISTERING_BANK: + case ReserveRecordStatus.WAIT_CONFIRM_BANK: + break; + default: + return; + } + const now = getTimestampNow(); + r.timestampBankConfirmed = now; + r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS; + r.retryInfo = initRetryInfo(); + } else { + switch (r.reserveStatus) { + case ReserveRecordStatus.WAIT_CONFIRM_BANK: + break; + default: + return; + } + if (r.bankInfo) { + r.bankInfo.confirmUrl = status.confirm_transfer_url; + } } - return r; + await tx.reserves.put(r); }); - await incrementReserveRetry(ws, reservePub, undefined); - } } async function incrementReserveRetry( @@ -428,19 +474,23 @@ async function incrementReserveRetry( reservePub: string, err: TalerErrorDetails | undefined, ): Promise<void> { - await ws.db.runWithWriteTransaction([Stores.reserves], async (tx) => { - const r = await tx.get(Stores.reserves, reservePub); - if (!r) { - return; - } - if (!r.retryInfo) { - return; - } - r.retryInfo.retryCounter++; - updateRetryInfoTimeout(r.retryInfo); - r.lastError = err; - await tx.put(Stores.reserves, r); - }); + await ws.db + .mktx((x) => ({ + reserves: x.reserves, + })) + .runReadWrite(async (tx) => { + const r = await tx.reserves.get(reservePub); + if (!r) { + return; + } + if (!r.retryInfo) { + return; + } + r.retryInfo.retryCounter++; + updateRetryInfoTimeout(r.retryInfo); + r.lastError = err; + await tx.reserves.put(r); + }); if (err) { ws.notify({ type: NotificationType.ReserveOperationError, @@ -461,7 +511,13 @@ async function updateReserve( ws: InternalWalletState, reservePub: string, ): Promise<{ ready: boolean }> { - const reserve = await ws.db.get(Stores.reserves, reservePub); + const reserve = await ws.db + .mktx((x) => ({ + reserves: x.reserves, + })) + .runReadOnly(async (tx) => { + return tx.reserves.get(reservePub); + }); if (!reserve) { throw Error("reserve not in db"); } @@ -508,10 +564,15 @@ async function updateReserve( reserve.exchangeBaseUrl, ); - const newWithdrawalGroup = await ws.db.runWithWriteTransaction( - [Stores.coins, Stores.planchets, Stores.withdrawalGroups, Stores.reserves], - async (tx) => { - const newReserve = await tx.get(Stores.reserves, reserve.reservePub); + const newWithdrawalGroup = await ws.db + .mktx((x) => ({ + coins: x.coins, + planchets: x.planchets, + withdrawalGroups: x.withdrawalGroups, + reserves: x.reserves, + })) + .runReadWrite(async (tx) => { + const newReserve = await tx.reserves.get(reserve.reservePub); if (!newReserve) { return; } @@ -519,8 +580,8 @@ async function updateReserve( let amountReserveMinus = Amounts.getZero(currency); // Subtract withdrawal groups for this reserve from the available amount. - await tx - .iterIndexed(Stores.withdrawalGroups.byReservePub, reservePub) + await tx.withdrawalGroups.indexes.byReservePub + .iter(reservePub) .forEach((wg) => { const cost = wg.denomsSel.totalWithdrawCost; amountReserveMinus = Amounts.add(amountReserveMinus, cost).amount; @@ -549,16 +610,14 @@ async function updateReserve( case ReserveTransactionType.Withdraw: { // Now we check if the withdrawal transaction // is part of any withdrawal known to this wallet. - const planchet = await tx.getIndexed( - Stores.planchets.coinEvHashIndex, + const planchet = await tx.planchets.indexes.byCoinEvHash.get( entry.h_coin_envelope, ); if (planchet) { // Amount is already accounted in some withdrawal session break; } - const coin = await tx.getIndexed( - Stores.coins.coinEvHashIndex, + const coin = await tx.coins.indexes.byCoinEvHash.get( entry.h_coin_envelope, ); if (coin) { @@ -594,7 +653,7 @@ async function updateReserve( newReserve.reserveStatus = ReserveRecordStatus.DORMANT; newReserve.lastError = undefined; newReserve.retryInfo = initRetryInfo(false); - await tx.put(Stores.reserves, newReserve); + await tx.reserves.put(newReserve); return; } @@ -624,11 +683,10 @@ async function updateReserve( newReserve.retryInfo = initRetryInfo(false); newReserve.reserveStatus = ReserveRecordStatus.DORMANT; - await tx.put(Stores.reserves, newReserve); - await tx.put(Stores.withdrawalGroups, withdrawalRecord); + await tx.reserves.put(newReserve); + await tx.withdrawalGroups.put(withdrawalRecord); return withdrawalRecord; - }, - ); + }); if (newWithdrawalGroup) { logger.trace("processing new withdraw group"); @@ -647,7 +705,13 @@ async function processReserveImpl( reservePub: string, forceNow = false, ): Promise<void> { - const reserve = await ws.db.get(Stores.reserves, reservePub); + const reserve = await ws.db + .mktx((x) => ({ + reserves: x.reserves, + })) + .runReadOnly(async (tx) => { + return tx.reserves.get(reservePub); + }); if (!reserve) { logger.trace("not processing reserve: reserve does not exist"); return; @@ -712,7 +776,13 @@ export async function createTalerWithdrawReserve( // We do this here, as the reserve should be registered before we return, // so that we can redirect the user to the bank's status page. await processReserveBankStatus(ws, reserve.reservePub); - const processedReserve = await ws.db.get(Stores.reserves, reserve.reservePub); + const processedReserve = await ws.db + .mktx((x) => ({ + reserves: x.reserves, + })) + .runReadOnly(async (tx) => { + return tx.reserves.get(reserve.reservePub); + }); if (processedReserve?.reserveStatus === ReserveRecordStatus.BANK_ABORTED) { throw OperationFailedError.fromCode( TalerErrorCode.WALLET_WITHDRAWAL_OPERATION_ABORTED_BY_BANK, @@ -730,14 +800,14 @@ export async function createTalerWithdrawReserve( * Get payto URIs needed to fund a reserve. */ export async function getFundingPaytoUris( - tx: TransactionHandle< - | typeof Stores.reserves - | typeof Stores.exchanges - | typeof Stores.exchangeDetails - >, + tx: GetReadOnlyAccess<{ + reserves: typeof WalletStoresV1.reserves; + exchanges: typeof WalletStoresV1.exchanges; + exchangeDetails: typeof WalletStoresV1.exchangeDetails; + }>, reservePub: string, ): Promise<string[]> { - const r = await tx.get(Stores.reserves, reservePub); + const r = await tx.reserves.get(reservePub); if (!r) { logger.error(`reserve ${reservePub} not found (DB corrupted?)`); return []; |