aboutsummaryrefslogtreecommitdiff
path: root/packages/taler-wallet-core/src/operations
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2021-06-09 15:14:17 +0200
committerFlorian Dold <florian@dold.me>2021-06-09 15:24:19 +0200
commit5c26461247040c07c86291babf0c87631df638b5 (patch)
tree8ff93454d0c21d2675d6734f210d4e8ff91d2bfb /packages/taler-wallet-core/src/operations
parent68dddc848f2f650d74697bb3a5c05d649e5db3c7 (diff)
downloadwallet-core-5c26461247040c07c86291babf0c87631df638b5.tar.xz
database access refactor
Diffstat (limited to 'packages/taler-wallet-core/src/operations')
-rw-r--r--packages/taler-wallet-core/src/operations/backup/export.ts65
-rw-r--r--packages/taler-wallet-core/src/operations/backup/import.ts134
-rw-r--r--packages/taler-wallet-core/src/operations/backup/index.ts169
-rw-r--r--packages/taler-wallet-core/src/operations/backup/state.ts63
-rw-r--r--packages/taler-wallet-core/src/operations/balance.ts50
-rw-r--r--packages/taler-wallet-core/src/operations/currencies.ts71
-rw-r--r--packages/taler-wallet-core/src/operations/deposits.ts173
-rw-r--r--packages/taler-wallet-core/src/operations/exchanges.ts141
-rw-r--r--packages/taler-wallet-core/src/operations/pay.ts830
-rw-r--r--packages/taler-wallet-core/src/operations/pending.ts80
-rw-r--r--packages/taler-wallet-core/src/operations/recoup.ts247
-rw-r--r--packages/taler-wallet-core/src/operations/refresh.ts514
-rw-r--r--packages/taler-wallet-core/src/operations/refund.ts257
-rw-r--r--packages/taler-wallet-core/src/operations/reserves.ts334
-rw-r--r--packages/taler-wallet-core/src/operations/state.ts22
-rw-r--r--packages/taler-wallet-core/src/operations/tip.ts169
-rw-r--r--packages/taler-wallet-core/src/operations/transactions.ts656
-rw-r--r--packages/taler-wallet-core/src/operations/withdraw.ts459
18 files changed, 2489 insertions, 1945 deletions
diff --git a/packages/taler-wallet-core/src/operations/backup/export.ts b/packages/taler-wallet-core/src/operations/backup/export.ts
index fa0af1b07..a6b2ff2a7 100644
--- a/packages/taler-wallet-core/src/operations/backup/export.ts
+++ b/packages/taler-wallet-core/src/operations/backup/export.ts
@@ -57,7 +57,6 @@ import {
} from "./state";
import { Amounts, getTimestampNow } from "@gnu-taler/taler-util";
import {
- Stores,
CoinSourceType,
CoinStatus,
RefundState,
@@ -66,29 +65,28 @@ import {
} from "../../db.js";
import { encodeCrock, stringToBytes, getRandomBytes } from "../../index.js";
import { canonicalizeBaseUrl, canonicalJson } from "@gnu-taler/taler-util";
-import { getExchangeDetails } from "../exchanges.js";
export async function exportBackup(
ws: InternalWalletState,
): Promise<WalletBackupContentV1> {
await provideBackupState(ws);
- return ws.db.runWithWriteTransaction(
- [
- Stores.config,
- Stores.exchanges,
- Stores.exchangeDetails,
- Stores.coins,
- Stores.denominations,
- Stores.purchases,
- Stores.proposals,
- Stores.refreshGroups,
- Stores.backupProviders,
- Stores.tips,
- Stores.recoupGroups,
- Stores.reserves,
- Stores.withdrawalGroups,
- ],
- async (tx) => {
+ return ws.db
+ .mktx((x) => ({
+ config: x.config,
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ coins: x.coins,
+ denominations: x.denominations,
+ purchases: x.purchases,
+ proposals: x.proposals,
+ refreshGroups: x.refreshGroups,
+ backupProviders: x.backupProviders,
+ tips: x.tips,
+ recoupGroups: x.recoupGroups,
+ reserves: x.reserves,
+ withdrawalGroups: x.withdrawalGroups,
+ }))
+ .runReadWrite(async (tx) => {
const bs = await getWalletBackupState(ws, tx);
const backupExchangeDetails: BackupExchangeDetails[] = [];
@@ -108,7 +106,7 @@ export async function exportBackup(
[reservePub: string]: BackupWithdrawalGroup[];
} = {};
- await tx.iter(Stores.withdrawalGroups).forEachAsync(async (wg) => {
+ await tx.withdrawalGroups.iter().forEachAsync(async (wg) => {
const withdrawalGroups = (withdrawalGroupsByReserve[
wg.reservePub
] ??= []);
@@ -126,7 +124,7 @@ export async function exportBackup(
});
});
- await tx.iter(Stores.reserves).forEach((reserve) => {
+ await tx.reserves.iter().forEach((reserve) => {
const backupReserve: BackupReserve = {
initial_selected_denoms: reserve.initialDenomSel.selectedDenoms.map(
(x) => ({
@@ -149,7 +147,7 @@ export async function exportBackup(
backupReserves.push(backupReserve);
});
- await tx.iter(Stores.tips).forEach((tip) => {
+ await tx.tips.iter().forEach((tip) => {
backupTips.push({
exchange_base_url: tip.exchangeBaseUrl,
merchant_base_url: tip.merchantBaseUrl,
@@ -169,7 +167,7 @@ export async function exportBackup(
});
});
- await tx.iter(Stores.recoupGroups).forEach((recoupGroup) => {
+ await tx.recoupGroups.iter().forEach((recoupGroup) => {
backupRecoupGroups.push({
recoup_group_id: recoupGroup.recoupGroupId,
timestamp_created: recoupGroup.timestampStarted,
@@ -182,7 +180,7 @@ export async function exportBackup(
});
});
- await tx.iter(Stores.backupProviders).forEach((bp) => {
+ await tx.backupProviders.iter().forEach((bp) => {
let terms: BackupBackupProviderTerms | undefined;
if (bp.terms) {
terms = {
@@ -199,7 +197,7 @@ export async function exportBackup(
});
});
- await tx.iter(Stores.coins).forEach((coin) => {
+ await tx.coins.iter().forEach((coin) => {
let bcs: BackupCoinSource;
switch (coin.coinSource.type) {
case CoinSourceType.Refresh:
@@ -236,7 +234,7 @@ export async function exportBackup(
});
});
- await tx.iter(Stores.denominations).forEach((denom) => {
+ await tx.denominations.iter().forEach((denom) => {
const backupDenoms = (backupDenominationsByExchange[
denom.exchangeBaseUrl
] ??= []);
@@ -258,7 +256,7 @@ export async function exportBackup(
});
});
- await tx.iter(Stores.exchanges).forEachAsync(async (ex) => {
+ await tx.exchanges.iter().forEachAsync(async (ex) => {
const dp = ex.detailsPointer;
if (!dp) {
return;
@@ -271,7 +269,7 @@ export async function exportBackup(
});
});
- await tx.iter(Stores.exchangeDetails).forEachAsync(async (ex) => {
+ await tx.exchangeDetails.iter().forEachAsync(async (ex) => {
// Only back up permanently added exchanges.
const wi = ex.wireInfo;
@@ -323,7 +321,7 @@ export async function exportBackup(
const purchaseProposalIdSet = new Set<string>();
- await tx.iter(Stores.purchases).forEach((purch) => {
+ await tx.purchases.iter().forEach((purch) => {
const refunds: BackupRefundItem[] = [];
purchaseProposalIdSet.add(purch.proposalId);
for (const refundKey of Object.keys(purch.refunds)) {
@@ -376,7 +374,7 @@ export async function exportBackup(
});
});
- await tx.iter(Stores.proposals).forEach((prop) => {
+ await tx.proposals.iter().forEach((prop) => {
if (purchaseProposalIdSet.has(prop.proposalId)) {
return;
}
@@ -413,7 +411,7 @@ export async function exportBackup(
});
});
- await tx.iter(Stores.refreshGroups).forEach((rg) => {
+ await tx.refreshGroups.iter().forEach((rg) => {
const oldCoins: BackupRefreshOldCoin[] = [];
for (let i = 0; i < rg.oldCoinPubs.length; i++) {
@@ -482,13 +480,12 @@ export async function exportBackup(
hash(stringToBytes(canonicalJson(backupBlob))),
);
bs.lastBackupNonce = encodeCrock(getRandomBytes(32));
- await tx.put(Stores.config, {
+ await tx.config.put({
key: WALLET_BACKUP_STATE_KEY,
value: bs,
});
}
return backupBlob;
- },
- );
+ });
}
diff --git a/packages/taler-wallet-core/src/operations/backup/import.ts b/packages/taler-wallet-core/src/operations/backup/import.ts
index 74b7a3b59..e024b76ab 100644
--- a/packages/taler-wallet-core/src/operations/backup/import.ts
+++ b/packages/taler-wallet-core/src/operations/backup/import.ts
@@ -29,7 +29,6 @@ import {
BackupRefreshReason,
} from "@gnu-taler/taler-util";
import {
- Stores,
WalletContractData,
DenomSelectionState,
ExchangeUpdateStatus,
@@ -46,8 +45,8 @@ import {
AbortStatus,
RefreshSessionRecord,
WireInfo,
+ WalletStoresV1,
} from "../../db.js";
-import { TransactionHandle } from "../../index.js";
import { PayCoinSelection } from "../../util/coinSelection";
import { j2s } from "@gnu-taler/taler-util";
import { checkDbInvariant, checkLogicInvariant } from "../../util/invariants";
@@ -57,6 +56,7 @@ import { InternalWalletState } from "../state";
import { provideBackupState } from "./state";
import { makeEventId, TombstoneTag } from "../transactions.js";
import { getExchangeDetails } from "../exchanges.js";
+import { GetReadOnlyAccess, GetReadWriteAccess } from "../../util/query.js";
const logger = new Logger("operations/backup/import.ts");
@@ -74,9 +74,12 @@ function checkBackupInvariant(b: boolean, m?: string): asserts b {
* Re-compute information about the coin selection for a payment.
*/
async function recoverPayCoinSelection(
- tx: TransactionHandle<
- typeof Stores.exchanges | typeof Stores.coins | typeof Stores.denominations
- >,
+ tx: GetReadWriteAccess<{
+ exchanges: typeof WalletStoresV1.exchanges;
+ exchangeDetails: typeof WalletStoresV1.exchangeDetails;
+ coins: typeof WalletStoresV1.coins;
+ denominations: typeof WalletStoresV1.denominations;
+ }>,
contractData: WalletContractData,
backupPurchase: BackupPurchase,
): Promise<PayCoinSelection> {
@@ -93,9 +96,9 @@ async function recoverPayCoinSelection(
);
for (const coinPub of coinPubs) {
- const coinRecord = await tx.get(Stores.coins, coinPub);
+ const coinRecord = await tx.coins.get(coinPub);
checkBackupInvariant(!!coinRecord);
- const denom = await tx.get(Stores.denominations, [
+ const denom = await tx.denominations.get([
coinRecord.exchangeBaseUrl,
coinRecord.denomPubHash,
]);
@@ -154,11 +157,11 @@ async function recoverPayCoinSelection(
}
async function getDenomSelStateFromBackup(
- tx: TransactionHandle<typeof Stores.denominations>,
+ tx: GetReadOnlyAccess<{ denominations: typeof WalletStoresV1.denominations }>,
exchangeBaseUrl: string,
sel: BackupDenomSel,
): Promise<DenomSelectionState> {
- const d0 = await tx.get(Stores.denominations, [
+ const d0 = await tx.denominations.get([
exchangeBaseUrl,
sel[0].denom_pub_hash,
]);
@@ -170,10 +173,7 @@ async function getDenomSelStateFromBackup(
let totalCoinValue = Amounts.getZero(d0.value.currency);
let totalWithdrawCost = Amounts.getZero(d0.value.currency);
for (const s of sel) {
- const d = await tx.get(Stores.denominations, [
- exchangeBaseUrl,
- s.denom_pub_hash,
- ]);
+ const d = await tx.denominations.get([exchangeBaseUrl, s.denom_pub_hash]);
checkBackupInvariant(!!d);
totalCoinValue = Amounts.add(totalCoinValue, d.value).amount;
totalWithdrawCost = Amounts.add(totalWithdrawCost, d.value, d.feeWithdraw)
@@ -215,32 +215,32 @@ export async function importBackup(
logger.info(`importing backup ${j2s(backupBlobArg)}`);
- return ws.db.runWithWriteTransaction(
- [
- Stores.config,
- Stores.exchanges,
- Stores.exchangeDetails,
- Stores.coins,
- Stores.denominations,
- Stores.purchases,
- Stores.proposals,
- Stores.refreshGroups,
- Stores.backupProviders,
- Stores.tips,
- Stores.recoupGroups,
- Stores.reserves,
- Stores.withdrawalGroups,
- Stores.tombstones,
- Stores.depositGroups,
- ],
- async (tx) => {
+ return ws.db
+ .mktx((x) => ({
+ config: x.config,
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ coins: x.coins,
+ denominations: x.denominations,
+ purchases: x.purchases,
+ proposals: x.proposals,
+ refreshGroups: x.refreshGroups,
+ backupProviders: x.backupProviders,
+ tips: x.tips,
+ recoupGroups: x.recoupGroups,
+ reserves: x.reserves,
+ withdrawalGroups: x.withdrawalGroups,
+ tombstones: x.tombstones,
+ depositGroups: x.depositGroups,
+ }))
+ .runReadWrite(async (tx) => {
// FIXME: validate schema!
const backupBlob = backupBlobArg as WalletBackupContentV1;
// FIXME: validate version
for (const tombstone of backupBlob.tombstones) {
- await tx.put(Stores.tombstones, {
+ await tx.tombstones.put({
id: tombstone,
});
}
@@ -250,14 +250,13 @@ export async function importBackup(
// FIXME: Validate that the "details pointer" is correct
for (const backupExchange of backupBlob.exchanges) {
- const existingExchange = await tx.get(
- Stores.exchanges,
+ const existingExchange = await tx.exchanges.get(
backupExchange.base_url,
);
if (existingExchange) {
continue;
}
- await tx.put(Stores.exchanges, {
+ await tx.exchanges.put({
baseUrl: backupExchange.base_url,
detailsPointer: {
currency: backupExchange.currency,
@@ -272,7 +271,7 @@ export async function importBackup(
}
for (const backupExchangeDetails of backupBlob.exchange_details) {
- const existingExchangeDetails = await tx.get(Stores.exchangeDetails, [
+ const existingExchangeDetails = await tx.exchangeDetails.get([
backupExchangeDetails.base_url,
backupExchangeDetails.currency,
backupExchangeDetails.master_public_key,
@@ -296,7 +295,7 @@ export async function importBackup(
wireFee: Amounts.parseOrThrow(fee.wire_fee),
});
}
- await tx.put(Stores.exchangeDetails, {
+ await tx.exchangeDetails.put({
exchangeBaseUrl: backupExchangeDetails.base_url,
termsOfServiceAcceptedEtag: backupExchangeDetails.tos_etag_accepted,
termsOfServiceText: undefined,
@@ -327,7 +326,7 @@ export async function importBackup(
const denomPubHash =
cryptoComp.denomPubToHash[backupDenomination.denom_pub];
checkLogicInvariant(!!denomPubHash);
- const existingDenom = await tx.get(Stores.denominations, [
+ const existingDenom = await tx.denominations.get([
backupExchangeDetails.base_url,
denomPubHash,
]);
@@ -336,7 +335,7 @@ export async function importBackup(
`importing backup denomination: ${j2s(backupDenomination)}`,
);
- await tx.put(Stores.denominations, {
+ await tx.denominations.put({
denomPub: backupDenomination.denom_pub,
denomPubHash: denomPubHash,
exchangeBaseUrl: backupExchangeDetails.base_url,
@@ -361,7 +360,7 @@ export async function importBackup(
const compCoin =
cryptoComp.coinPrivToCompletedCoin[backupCoin.coin_priv];
checkLogicInvariant(!!compCoin);
- const existingCoin = await tx.get(Stores.coins, compCoin.coinPub);
+ const existingCoin = await tx.coins.get(compCoin.coinPub);
if (!existingCoin) {
let coinSource: CoinSource;
switch (backupCoin.coin_source.type) {
@@ -388,7 +387,7 @@ export async function importBackup(
};
break;
}
- await tx.put(Stores.coins, {
+ await tx.coins.put({
blindingKey: backupCoin.blinding_key,
coinEvHash: compCoin.coinEvHash,
coinPriv: backupCoin.coin_priv,
@@ -416,7 +415,7 @@ export async function importBackup(
continue;
}
checkLogicInvariant(!!reservePub);
- const existingReserve = await tx.get(Stores.reserves, reservePub);
+ const existingReserve = await tx.reserves.get(reservePub);
const instructedAmount = Amounts.parseOrThrow(
backupReserve.instructed_amount,
);
@@ -429,7 +428,7 @@ export async function importBackup(
confirmUrl: backupReserve.bank_info.confirm_url,
};
}
- await tx.put(Stores.reserves, {
+ await tx.reserves.put({
currency: instructedAmount.currency,
instructedAmount,
exchangeBaseUrl: backupExchangeDetails.base_url,
@@ -467,12 +466,11 @@ export async function importBackup(
if (tombstoneSet.has(ts)) {
continue;
}
- const existingWg = await tx.get(
- Stores.withdrawalGroups,
+ const existingWg = await tx.withdrawalGroups.get(
backupWg.withdrawal_group_id,
);
if (!existingWg) {
- await tx.put(Stores.withdrawalGroups, {
+ await tx.withdrawalGroups.put({
denomsSel: await getDenomSelStateFromBackup(
tx,
backupExchangeDetails.base_url,
@@ -504,8 +502,7 @@ export async function importBackup(
if (tombstoneSet.has(ts)) {
continue;
}
- const existingProposal = await tx.get(
- Stores.proposals,
+ const existingProposal = await tx.proposals.get(
backupProposal.proposal_id,
);
if (!existingProposal) {
@@ -584,7 +581,7 @@ export async function importBackup(
contractTermsRaw: backupProposal.contract_terms_raw,
};
}
- await tx.put(Stores.proposals, {
+ await tx.proposals.put({
claimToken: backupProposal.claim_token,
lastError: undefined,
merchantBaseUrl: backupProposal.merchant_base_url,
@@ -610,17 +607,16 @@ export async function importBackup(
if (tombstoneSet.has(ts)) {
continue;
}
- const existingPurchase = await tx.get(
- Stores.purchases,
+ const existingPurchase = await tx.purchases.get(
backupPurchase.proposal_id,
);
if (!existingPurchase) {
const refunds: { [refundKey: string]: WalletRefundItem } = {};
for (const backupRefund of backupPurchase.refunds) {
const key = `${backupRefund.coin_pub}-${backupRefund.rtransaction_id}`;
- const coin = await tx.get(Stores.coins, backupRefund.coin_pub);
+ const coin = await tx.coins.get(backupRefund.coin_pub);
checkBackupInvariant(!!coin);
- const denom = await tx.get(Stores.denominations, [
+ const denom = await tx.denominations.get([
coin.exchangeBaseUrl,
coin.denomPubHash,
]);
@@ -724,7 +720,7 @@ export async function importBackup(
},
contractTermsRaw: backupPurchase.contract_terms_raw,
};
- await tx.put(Stores.purchases, {
+ await tx.purchases.put({
proposalId: backupPurchase.proposal_id,
noncePriv: backupPurchase.nonce_priv,
noncePub:
@@ -766,8 +762,7 @@ export async function importBackup(
if (tombstoneSet.has(ts)) {
continue;
}
- const existingRg = await tx.get(
- Stores.refreshGroups,
+ const existingRg = await tx.refreshGroups.get(
backupRefreshGroup.refresh_group_id,
);
if (!existingRg) {
@@ -800,7 +795,7 @@ export async function importBackup(
| undefined
)[] = [];
for (const oldCoin of backupRefreshGroup.old_coins) {
- const c = await tx.get(Stores.coins, oldCoin.coin_pub);
+ const c = await tx.coins.get(oldCoin.coin_pub);
checkBackupInvariant(!!c);
if (oldCoin.refresh_session) {
const denomSel = await getDenomSelStateFromBackup(
@@ -821,7 +816,7 @@ export async function importBackup(
refreshSessionPerCoin.push(undefined);
}
}
- await tx.put(Stores.refreshGroups, {
+ await tx.refreshGroups.put({
timestampFinished: backupRefreshGroup.timestamp_finish,
timestampCreated: backupRefreshGroup.timestamp_created,
refreshGroupId: backupRefreshGroup.refresh_group_id,
@@ -849,14 +844,14 @@ export async function importBackup(
if (tombstoneSet.has(ts)) {
continue;
}
- const existingTip = await tx.get(Stores.tips, backupTip.wallet_tip_id);
+ const existingTip = await tx.tips.get(backupTip.wallet_tip_id);
if (!existingTip) {
const denomsSel = await getDenomSelStateFromBackup(
tx,
backupTip.exchange_base_url,
backupTip.selected_denoms,
);
- await tx.put(Stores.tips, {
+ await tx.tips.put({
acceptedTimestamp: backupTip.timestamp_accepted,
createdTimestamp: backupTip.timestamp_created,
denomsSel,
@@ -884,27 +879,26 @@ export async function importBackup(
for (const tombstone of backupBlob.tombstones) {
const [type, ...rest] = tombstone.split(":");
if (type === TombstoneTag.DeleteDepositGroup) {
- await tx.delete(Stores.depositGroups, rest[0]);
+ await tx.depositGroups.delete(rest[0]);
} else if (type === TombstoneTag.DeletePayment) {
- await tx.delete(Stores.purchases, rest[0]);
- await tx.delete(Stores.proposals, rest[0]);
+ await tx.purchases.delete(rest[0]);
+ await tx.proposals.delete(rest[0]);
} else if (type === TombstoneTag.DeleteRefreshGroup) {
- await tx.delete(Stores.refreshGroups, rest[0]);
+ await tx.refreshGroups.delete(rest[0]);
} else if (type === TombstoneTag.DeleteRefund) {
// Nothing required, will just prevent display
// in the transactions list
} else if (type === TombstoneTag.DeleteReserve) {
// FIXME: Once we also have account (=kyc) reserves,
// we need to check if the reserve is an account before deleting here
- await tx.delete(Stores.reserves, rest[0]);
+ await tx.reserves.delete(rest[0]);
} else if (type === TombstoneTag.DeleteTip) {
- await tx.delete(Stores.tips, rest[0]);
+ await tx.tips.delete(rest[0]);
} else if (type === TombstoneTag.DeleteWithdrawalGroup) {
- await tx.delete(Stores.withdrawalGroups, rest[0]);
+ await tx.withdrawalGroups.delete(rest[0]);
} else {
logger.warn(`unable to process tombstone of type '${type}'`);
}
}
- },
- );
+ });
}
diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts b/packages/taler-wallet-core/src/operations/backup/index.ts
index 743314791..bb067dfb5 100644
--- a/packages/taler-wallet-core/src/operations/backup/index.ts
+++ b/packages/taler-wallet-core/src/operations/backup/index.ts
@@ -35,7 +35,6 @@ import {
BackupProviderRecord,
BackupProviderTerms,
ConfigRecord,
- Stores,
} from "../../db.js";
import { checkDbInvariant, checkLogicInvariant } from "../../util/invariants";
import {
@@ -312,18 +311,17 @@ async function runBackupCycleForProvider(
// FIXME: check if the provider is overcharging us!
- await ws.db.runWithWriteTransaction(
- [Stores.backupProviders],
- async (tx) => {
- const provRec = await tx.get(Stores.backupProviders, provider.baseUrl);
+ await ws.db
+ .mktx((x) => ({ backupProviders: x.backupProviders }))
+ .runReadWrite(async (tx) => {
+ const provRec = await tx.backupProviders.get(provider.baseUrl);
checkDbInvariant(!!provRec);
const ids = new Set(provRec.paymentProposalIds);
ids.add(proposalId);
provRec.paymentProposalIds = Array.from(ids).sort();
provRec.currentPaymentProposalId = proposalId;
- await tx.put(Stores.backupProviders, provRec);
- },
- );
+ await tx.backupProviders.put(provRec);
+ });
if (doPay) {
const confirmRes = await confirmPay(ws, proposalId);
@@ -344,19 +342,18 @@ async function runBackupCycleForProvider(
}
if (resp.status === HttpResponseStatus.NoContent) {
- await ws.db.runWithWriteTransaction(
- [Stores.backupProviders],
- async (tx) => {
- const prov = await tx.get(Stores.backupProviders, provider.baseUrl);
+ await ws.db
+ .mktx((x) => ({ backupProviders: x.backupProviders }))
+ .runReadWrite(async (tx) => {
+ const prov = await tx.backupProviders.get(provider.baseUrl);
if (!prov) {
return;
}
prov.lastBackupHash = encodeCrock(currentBackupHash);
prov.lastBackupTimestamp = getTimestampNow();
prov.lastError = undefined;
- await tx.put(Stores.backupProviders, prov);
- },
- );
+ await tx.backupProviders.put(prov);
+ });
return;
}
@@ -367,19 +364,18 @@ async function runBackupCycleForProvider(
const blob = await decryptBackup(backupConfig, backupEnc);
const cryptoData = await computeBackupCryptoData(ws.cryptoApi, blob);
await importBackup(ws, blob, cryptoData);
- await ws.db.runWithWriteTransaction(
- [Stores.backupProviders],
- async (tx) => {
- const prov = await tx.get(Stores.backupProviders, provider.baseUrl);
+ await ws.db
+ .mktx((x) => ({ backupProvider: x.backupProviders }))
+ .runReadWrite(async (tx) => {
+ const prov = await tx.backupProvider.get(provider.baseUrl);
if (!prov) {
return;
}
prov.lastBackupHash = encodeCrock(hash(backupEnc));
prov.lastBackupTimestamp = getTimestampNow();
prov.lastError = undefined;
- await tx.put(Stores.backupProviders, prov);
- },
- );
+ await tx.backupProvider.put(prov);
+ });
logger.info("processed existing backup");
return;
}
@@ -390,14 +386,16 @@ async function runBackupCycleForProvider(
const err = await readTalerErrorResponse(resp);
logger.error(`got error response from backup provider: ${j2s(err)}`);
- await ws.db.runWithWriteTransaction([Stores.backupProviders], async (tx) => {
- const prov = await tx.get(Stores.backupProviders, provider.baseUrl);
- if (!prov) {
- return;
- }
- prov.lastError = err;
- await tx.put(Stores.backupProviders, prov);
- });
+ await ws.db
+ .mktx((x) => ({ backupProvider: x.backupProviders }))
+ .runReadWrite(async (tx) => {
+ const prov = await tx.backupProvider.get(provider.baseUrl);
+ if (!prov) {
+ return;
+ }
+ prov.lastError = err;
+ await tx.backupProvider.put(prov);
+ });
}
/**
@@ -408,7 +406,11 @@ async function runBackupCycleForProvider(
* 3. Upload the updated backup blob.
*/
export async function runBackupCycle(ws: InternalWalletState): Promise<void> {
- const providers = await ws.db.iter(Stores.backupProviders).toArray();
+ const providers = await ws.db
+ .mktx((x) => ({ backupProviders: x.backupProviders }))
+ .runReadOnly(async (tx) => {
+ return await tx.backupProviders.iter().toArray();
+ });
logger.trace("got backup providers", providers);
const backupJson = await exportBackup(ws);
@@ -472,35 +474,43 @@ export async function addBackupProvider(
logger.info(`adding backup provider ${j2s(req)}`);
await provideBackupState(ws);
const canonUrl = canonicalizeBaseUrl(req.backupProviderBaseUrl);
- const oldProv = await ws.db.get(Stores.backupProviders, canonUrl);
- if (oldProv) {
- logger.info("old backup provider found");
- if (req.activate) {
- oldProv.active = true;
- logger.info("setting existing backup provider to active");
- await ws.db.put(Stores.backupProviders, oldProv);
- }
- return;
- }
+ await ws.db
+ .mktx((x) => ({ backupProviders: x.backupProviders }))
+ .runReadWrite(async (tx) => {
+ const oldProv = await tx.backupProviders.get(canonUrl);
+ if (oldProv) {
+ logger.info("old backup provider found");
+ if (req.activate) {
+ oldProv.active = true;
+ logger.info("setting existing backup provider to active");
+ await tx.backupProviders.put(oldProv);
+ }
+ return;
+ }
+ });
const termsUrl = new URL("terms", canonUrl);
const resp = await ws.http.get(termsUrl.href);
const terms = await readSuccessResponseJsonOrThrow(
resp,
codecForSyncTermsOfServiceResponse(),
);
- await ws.db.put(Stores.backupProviders, {
- active: !!req.activate,
- terms: {
- annualFee: terms.annual_fee,
- storageLimitInMegabytes: terms.storage_limit_in_megabytes,
- supportedProtocolVersion: terms.version,
- },
- paymentProposalIds: [],
- baseUrl: canonUrl,
- lastError: undefined,
- retryInfo: initRetryInfo(false),
- uids: [encodeCrock(getRandomBytes(32))],
- });
+ await ws.db
+ .mktx((x) => ({ backupProviders: x.backupProviders }))
+ .runReadWrite(async (tx) => {
+ await tx.backupProviders.put({
+ active: !!req.activate,
+ terms: {
+ annualFee: terms.annual_fee,
+ storageLimitInMegabytes: terms.storage_limit_in_megabytes,
+ supportedProtocolVersion: terms.version,
+ },
+ paymentProposalIds: [],
+ baseUrl: canonUrl,
+ lastError: undefined,
+ retryInfo: initRetryInfo(false),
+ uids: [encodeCrock(getRandomBytes(32))],
+ });
+ });
}
export async function removeBackupProvider(
@@ -654,7 +664,11 @@ export async function getBackupInfo(
ws: InternalWalletState,
): Promise<BackupInfo> {
const backupConfig = await provideBackupState(ws);
- const providerRecords = await ws.db.iter(Stores.backupProviders).toArray();
+ const providerRecords = await ws.db
+ .mktx((x) => ({ backupProviders: x.backupProviders }))
+ .runReadOnly(async (tx) => {
+ return await tx.backupProviders.iter().toArray();
+ });
const providers: ProviderInfo[] = [];
for (const x of providerRecords) {
providers.push({
@@ -675,13 +689,18 @@ export async function getBackupInfo(
}
/**
- * Get information about the current state of wallet backups.
+ * Get backup recovery information, including the wallet's
+ * private key.
*/
export async function getBackupRecovery(
ws: InternalWalletState,
): Promise<BackupRecovery> {
const bs = await provideBackupState(ws);
- const providers = await ws.db.iter(Stores.backupProviders).toArray();
+ const providers = await ws.db
+ .mktx((x) => ({ backupProviders: x.backupProviders }))
+ .runReadOnly(async (tx) => {
+ return await tx.backupProviders.iter().toArray();
+ });
return {
providers: providers
.filter((x) => x.active)
@@ -698,12 +717,12 @@ async function backupRecoveryTheirs(
ws: InternalWalletState,
br: BackupRecovery,
) {
- await ws.db.runWithWriteTransaction(
- [Stores.config, Stores.backupProviders],
- async (tx) => {
+ await ws.db
+ .mktx((x) => ({ config: x.config, backupProviders: x.backupProviders }))
+ .runReadWrite(async (tx) => {
let backupStateEntry:
| ConfigRecord<WalletBackupConfState>
- | undefined = await tx.get(Stores.config, WALLET_BACKUP_STATE_KEY);
+ | undefined = await tx.config.get(WALLET_BACKUP_STATE_KEY);
checkDbInvariant(!!backupStateEntry);
backupStateEntry.value.lastBackupNonce = undefined;
backupStateEntry.value.lastBackupTimestamp = undefined;
@@ -713,11 +732,11 @@ async function backupRecoveryTheirs(
backupStateEntry.value.walletRootPub = encodeCrock(
eddsaGetPublic(decodeCrock(br.walletRootPriv)),
);
- await tx.put(Stores.config, backupStateEntry);
+ await tx.config.put(backupStateEntry);
for (const prov of br.providers) {
- const existingProv = await tx.get(Stores.backupProviders, prov.url);
+ const existingProv = await tx.backupProviders.get(prov.url);
if (!existingProv) {
- await tx.put(Stores.backupProviders, {
+ await tx.backupProviders.put({
active: true,
baseUrl: prov.url,
paymentProposalIds: [],
@@ -727,14 +746,13 @@ async function backupRecoveryTheirs(
});
}
}
- const providers = await tx.iter(Stores.backupProviders).toArray();
+ const providers = await tx.backupProviders.iter().toArray();
for (const prov of providers) {
prov.lastBackupTimestamp = undefined;
prov.lastBackupHash = undefined;
- await tx.put(Stores.backupProviders, prov);
+ await tx.backupProviders.put(prov);
}
- },
- );
+ });
}
async function backupRecoveryOurs(ws: InternalWalletState, br: BackupRecovery) {
@@ -746,7 +764,11 @@ export async function loadBackupRecovery(
br: RecoveryLoadRequest,
): Promise<void> {
const bs = await provideBackupState(ws);
- const providers = await ws.db.iter(Stores.backupProviders).toArray();
+ const providers = await ws.db
+ .mktx((x) => ({ backupProviders: x.backupProviders }))
+ .runReadOnly(async (tx) => {
+ return await tx.backupProviders.iter().toArray();
+ });
let strategy = br.strategy;
if (
br.recovery.walletRootPriv != bs.walletRootPriv &&
@@ -772,12 +794,11 @@ export async function exportBackupEncrypted(
): Promise<Uint8Array> {
await provideBackupState(ws);
const blob = await exportBackup(ws);
- const bs = await ws.db.runWithWriteTransaction(
- [Stores.config],
- async (tx) => {
+ const bs = await ws.db
+ .mktx((x) => ({ config: x.config }))
+ .runReadOnly(async (tx) => {
return await getWalletBackupState(ws, tx);
- },
- );
+ });
return encryptBackup(bs, blob);
}
diff --git a/packages/taler-wallet-core/src/operations/backup/state.ts b/packages/taler-wallet-core/src/operations/backup/state.ts
index e2a0f4cf3..226880439 100644
--- a/packages/taler-wallet-core/src/operations/backup/state.ts
+++ b/packages/taler-wallet-core/src/operations/backup/state.ts
@@ -15,9 +15,11 @@
*/
import { Timestamp } from "@gnu-taler/taler-util";
-import { ConfigRecord, Stores } from "../../db.js";
-import { getRandomBytes, encodeCrock, TransactionHandle } from "../../index.js";
+import { ConfigRecord, WalletStoresV1 } from "../../db.js";
+import { getRandomBytes, encodeCrock } from "../../index.js";
import { checkDbInvariant } from "../../util/invariants";
+import { GetReadOnlyAccess } from "../../util/query.js";
+import { Wallet } from "../../wallet.js";
import { InternalWalletState } from "../state";
export interface WalletBackupConfState {
@@ -48,10 +50,13 @@ export const WALLET_BACKUP_STATE_KEY = "walletBackupState";
export async function provideBackupState(
ws: InternalWalletState,
): Promise<WalletBackupConfState> {
- const bs: ConfigRecord<WalletBackupConfState> | undefined = await ws.db.get(
- Stores.config,
- WALLET_BACKUP_STATE_KEY,
- );
+ const bs: ConfigRecord<WalletBackupConfState> | undefined = await ws.db
+ .mktx((x) => ({
+ config: x.config,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.config.get(WALLET_BACKUP_STATE_KEY);
+ });
if (bs) {
return bs.value;
}
@@ -62,32 +67,36 @@ export async function provideBackupState(
// FIXME: device ID should be configured when wallet is initialized
// and be based on hostname
const deviceId = `wallet-core-${encodeCrock(d)}`;
- return await ws.db.runWithWriteTransaction([Stores.config], async (tx) => {
- let backupStateEntry:
- | ConfigRecord<WalletBackupConfState>
- | undefined = await tx.get(Stores.config, WALLET_BACKUP_STATE_KEY);
- if (!backupStateEntry) {
- backupStateEntry = {
- key: WALLET_BACKUP_STATE_KEY,
- value: {
- deviceId,
- clocks: { [deviceId]: 1 },
- walletRootPub: k.pub,
- walletRootPriv: k.priv,
- lastBackupPlainHash: undefined,
- },
- };
- await tx.put(Stores.config, backupStateEntry);
- }
- return backupStateEntry.value;
- });
+ return await ws.db
+ .mktx((x) => ({
+ config: x.config,
+ }))
+ .runReadWrite(async (tx) => {
+ let backupStateEntry:
+ | ConfigRecord<WalletBackupConfState>
+ | undefined = await tx.config.get(WALLET_BACKUP_STATE_KEY);
+ if (!backupStateEntry) {
+ backupStateEntry = {
+ key: WALLET_BACKUP_STATE_KEY,
+ value: {
+ deviceId,
+ clocks: { [deviceId]: 1 },
+ walletRootPub: k.pub,
+ walletRootPriv: k.priv,
+ lastBackupPlainHash: undefined,
+ },
+ };
+ await tx.config.put(backupStateEntry);
+ }
+ return backupStateEntry.value;
+ });
}
export async function getWalletBackupState(
ws: InternalWalletState,
- tx: TransactionHandle<typeof Stores.config>,
+ tx: GetReadOnlyAccess<{ config: typeof WalletStoresV1.config }>,
): Promise<WalletBackupConfState> {
- let bs = await tx.get(Stores.config, WALLET_BACKUP_STATE_KEY);
+ const bs = await tx.config.get(WALLET_BACKUP_STATE_KEY);
checkDbInvariant(!!bs, "wallet backup state should be in DB");
return bs.value;
}
diff --git a/packages/taler-wallet-core/src/operations/balance.ts b/packages/taler-wallet-core/src/operations/balance.ts
index afa561bfb..7273f0b42 100644
--- a/packages/taler-wallet-core/src/operations/balance.ts
+++ b/packages/taler-wallet-core/src/operations/balance.ts
@@ -17,10 +17,10 @@
/**
* Imports.
*/
-import { AmountJson, BalancesResponse, Amounts } from "@gnu-taler/taler-util";
-import { Stores, CoinStatus } from "../db.js";
-import { TransactionHandle } from "../index.js";
-import { Logger } from "@gnu-taler/taler-util";
+import { AmountJson, BalancesResponse, Amounts, Logger } from "@gnu-taler/taler-util";
+
+import { CoinStatus, WalletStoresV1 } from "../db.js";
+import { GetReadOnlyAccess } from "../util/query.js";
import { InternalWalletState } from "./state.js";
const logger = new Logger("withdraw.ts");
@@ -36,13 +36,12 @@ interface WalletBalance {
*/
export async function getBalancesInsideTransaction(
ws: InternalWalletState,
- tx: TransactionHandle<
- | typeof Stores.reserves
- | typeof Stores.coins
- | typeof Stores.reserves
- | typeof Stores.refreshGroups
- | typeof Stores.withdrawalGroups
- >,
+ tx: GetReadOnlyAccess<{
+ reserves: typeof WalletStoresV1.reserves;
+ coins: typeof WalletStoresV1.coins;
+ refreshGroups: typeof WalletStoresV1.refreshGroups;
+ withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
+ }>,
): Promise<BalancesResponse> {
const balanceStore: Record<string, WalletBalance> = {};
@@ -63,7 +62,7 @@ export async function getBalancesInsideTransaction(
};
// Initialize balance to zero, even if we didn't start withdrawing yet.
- await tx.iter(Stores.reserves).forEach((r) => {
+ await tx.reserves.iter().forEach((r) => {
const b = initBalance(r.currency);
if (!r.initialWithdrawalStarted) {
b.pendingIncoming = Amounts.add(
@@ -73,7 +72,7 @@ export async function getBalancesInsideTransaction(
}
});
- await tx.iter(Stores.coins).forEach((c) => {
+ await tx.coins.iter().forEach((c) => {
// Only count fresh coins, as dormant coins will
// already be in a refresh session.
if (c.status === CoinStatus.Fresh) {
@@ -82,7 +81,7 @@ export async function getBalancesInsideTransaction(
}
});
- await tx.iter(Stores.refreshGroups).forEach((r) => {
+ await tx.refreshGroups.iter().forEach((r) => {
// Don't count finished refreshes, since the refresh already resulted
// in coins being added to the wallet.
if (r.timestampFinished) {
@@ -108,7 +107,7 @@ export async function getBalancesInsideTransaction(
}
});
- await tx.iter(Stores.withdrawalGroups).forEach((wds) => {
+ await tx.withdrawalGroups.iter().forEach((wds) => {
if (wds.timestampFinish) {
return;
}
@@ -147,18 +146,17 @@ export async function getBalances(
): Promise<BalancesResponse> {
logger.trace("starting to compute balance");
- const wbal = await ws.db.runWithReadTransaction(
- [
- Stores.coins,
- Stores.refreshGroups,
- Stores.reserves,
- Stores.purchases,
- Stores.withdrawalGroups,
- ],
- async (tx) => {
+ const wbal = await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ refreshGroups: x.refreshGroups,
+ reserves: x.reserves,
+ purchases: x.purchases,
+ withdrawalGroups: x.withdrawalGroups,
+ }))
+ .runReadOnly(async (tx) => {
return getBalancesInsideTransaction(ws, tx);
- },
- );
+ });
logger.trace("finished computing wallet balance");
diff --git a/packages/taler-wallet-core/src/operations/currencies.ts b/packages/taler-wallet-core/src/operations/currencies.ts
index cead07a69..e591b50c0 100644
--- a/packages/taler-wallet-core/src/operations/currencies.ts
+++ b/packages/taler-wallet-core/src/operations/currencies.ts
@@ -17,7 +17,7 @@
/**
* Imports.
*/
-import { ExchangeRecord, Stores } from "../db.js";
+import { ExchangeRecord } from "../db.js";
import { Logger } from "@gnu-taler/taler-util";
import { getExchangeDetails } from "./exchanges.js";
import { InternalWalletState } from "./state.js";
@@ -38,37 +38,44 @@ export async function getExchangeTrust(
): Promise<TrustInfo> {
let isTrusted = false;
let isAudited = false;
- const exchangeDetails = await ws.db.runWithReadTransaction(
- [Stores.exchangeDetails, Stores.exchanges],
- async (tx) => {
- return getExchangeDetails(tx, exchangeInfo.baseUrl);
- },
- );
- if (!exchangeDetails) {
- throw Error(`exchange ${exchangeInfo.baseUrl} details not available`);
- }
- const exchangeTrustRecord = await ws.db.getIndexed(
- Stores.exchangeTrustStore.exchangeMasterPubIndex,
- exchangeDetails.masterPublicKey,
- );
- if (
- exchangeTrustRecord &&
- exchangeTrustRecord.uids.length > 0 &&
- exchangeTrustRecord.currency === exchangeDetails.currency
- ) {
- isTrusted = true;
- }
- for (const auditor of exchangeDetails.auditors) {
- const auditorTrustRecord = await ws.db.getIndexed(
- Stores.auditorTrustStore.auditorPubIndex,
- auditor.auditor_pub,
- );
- if (auditorTrustRecord && auditorTrustRecord.uids.length > 0) {
- isAudited = true;
- break;
- }
- }
+ return await ws.db
+ .mktx((x) => ({
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ exchangesTrustStore: x.exchangeTrust,
+ auditorTrust: x.auditorTrust,
+ }))
+ .runReadOnly(async (tx) => {
+ const exchangeDetails = await getExchangeDetails(
+ tx,
+ exchangeInfo.baseUrl,
+ );
- return { isTrusted, isAudited };
+ if (!exchangeDetails) {
+ throw Error(`exchange ${exchangeInfo.baseUrl} details not available`);
+ }
+ const exchangeTrustRecord = await tx.exchangesTrustStore.indexes.byExchangeMasterPub.get(
+ exchangeDetails.masterPublicKey,
+ );
+ if (
+ exchangeTrustRecord &&
+ exchangeTrustRecord.uids.length > 0 &&
+ exchangeTrustRecord.currency === exchangeDetails.currency
+ ) {
+ isTrusted = true;
+ }
+
+ for (const auditor of exchangeDetails.auditors) {
+ const auditorTrustRecord = await tx.auditorTrust.indexes.byAuditorPub.get(
+ auditor.auditor_pub,
+ );
+ if (auditorTrustRecord && auditorTrustRecord.uids.length > 0) {
+ isAudited = true;
+ break;
+ }
+ }
+
+ return { isTrusted, isAudited };
+ });
}
diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts
index 408ad3926..996e8cf39 100644
--- a/packages/taler-wallet-core/src/operations/deposits.ts
+++ b/packages/taler-wallet-core/src/operations/deposits.ts
@@ -56,7 +56,8 @@ import {
} from "./pay";
import { InternalWalletState } from "./state";
import { Logger } from "@gnu-taler/taler-util";
-import { DepositGroupRecord, Stores } from "../db.js";
+import { DepositGroupRecord } from "../db.js";
+
import { guardOperationException } from "./errors.js";
import { getExchangeDetails } from "./exchanges.js";
@@ -116,12 +117,17 @@ async function resetDepositGroupRetry(
ws: InternalWalletState,
depositGroupId: string,
): Promise<void> {
- await ws.db.mutate(Stores.depositGroups, depositGroupId, (x) => {
- if (x.retryInfo.active) {
- x.retryInfo = initRetryInfo();
- }
- return x;
- });
+ await ws.db
+ .mktx((x) => ({
+ depositGroups: x.depositGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const x = await tx.depositGroups.get(depositGroupId);
+ if (x && x.retryInfo.active) {
+ x.retryInfo = initRetryInfo();
+ await tx.depositGroups.put(x);
+ }
+ });
}
async function incrementDepositRetry(
@@ -129,19 +135,21 @@ async function incrementDepositRetry(
depositGroupId: string,
err: TalerErrorDetails | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.depositGroups], async (tx) => {
- const r = await tx.get(Stores.depositGroups, depositGroupId);
- if (!r) {
- return;
- }
- if (!r.retryInfo) {
- return;
- }
- r.retryInfo.retryCounter++;
- updateRetryInfoTimeout(r.retryInfo);
- r.lastError = err;
- await tx.put(Stores.depositGroups, r);
- });
+ await ws.db
+ .mktx((x) => ({ depositGroups: x.depositGroups }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.depositGroups.get(depositGroupId);
+ if (!r) {
+ return;
+ }
+ if (!r.retryInfo) {
+ return;
+ }
+ r.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(r.retryInfo);
+ r.lastError = err;
+ await tx.depositGroups.put(r);
+ });
if (err) {
ws.notify({ type: NotificationType.DepositOperationError, error: err });
}
@@ -170,7 +178,13 @@ async function processDepositGroupImpl(
if (forceNow) {
await resetDepositGroupRetry(ws, depositGroupId);
}
- const depositGroup = await ws.db.get(Stores.depositGroups, depositGroupId);
+ const depositGroup = await ws.db
+ .mktx((x) => ({
+ depositGroups: x.depositGroups,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.depositGroups.get(depositGroupId);
+ });
if (!depositGroup) {
logger.warn(`deposit group ${depositGroupId} not found`);
return;
@@ -213,32 +227,38 @@ async function processDepositGroupImpl(
merchant_pub: depositGroup.merchantPub,
});
await readSuccessResponseJsonOrThrow(httpResp, codecForDepositSuccess());
- await ws.db.runWithWriteTransaction([Stores.depositGroups], async (tx) => {
- const dg = await tx.get(Stores.depositGroups, depositGroupId);
+ await ws.db
+ .mktx((x) => ({ depositGroups: x.depositGroups }))
+ .runReadWrite(async (tx) => {
+ const dg = await tx.depositGroups.get(depositGroupId);
+ if (!dg) {
+ return;
+ }
+ dg.depositedPerCoin[i] = true;
+ await tx.depositGroups.put(dg);
+ });
+ }
+
+ await ws.db
+ .mktx((x) => ({
+ depositGroups: x.depositGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const dg = await tx.depositGroups.get(depositGroupId);
if (!dg) {
return;
}
- dg.depositedPerCoin[i] = true;
- await tx.put(Stores.depositGroups, dg);
- });
- }
-
- await ws.db.runWithWriteTransaction([Stores.depositGroups], async (tx) => {
- const dg = await tx.get(Stores.depositGroups, depositGroupId);
- if (!dg) {
- return;
- }
- let allDeposited = true;
- for (const d of depositGroup.depositedPerCoin) {
- if (!d) {
- allDeposited = false;
+ let allDeposited = true;
+ for (const d of depositGroup.depositedPerCoin) {
+ if (!d) {
+ allDeposited = false;
+ }
}
- }
- if (allDeposited) {
- dg.timestampFinished = getTimestampNow();
- await tx.put(Stores.depositGroups, dg);
- }
- });
+ if (allDeposited) {
+ dg.timestampFinished = getTimestampNow();
+ await tx.depositGroups.put(dg);
+ }
+ });
}
export async function trackDepositGroup(
@@ -249,10 +269,13 @@ export async function trackDepositGroup(
status: number;
body: any;
}[] = [];
- const depositGroup = await ws.db.get(
- Stores.depositGroups,
- req.depositGroupId,
- );
+ const depositGroup = await ws.db
+ .mktx((x) => ({
+ depositGroups: x.depositGroups,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.depositGroups.get(req.depositGroupId);
+ });
if (!depositGroup) {
throw Error("deposit group not found");
}
@@ -306,23 +329,26 @@ export async function createDepositGroup(
const amount = Amounts.parseOrThrow(req.amount);
- const allExchanges = await ws.db.iter(Stores.exchanges).toArray();
const exchangeInfos: { url: string; master_pub: string }[] = [];
- for (const e of allExchanges) {
- const details = await ws.db.runWithReadTransaction(
- [Stores.exchanges, Stores.exchangeDetails],
- async (tx) => {
- return getExchangeDetails(tx, e.baseUrl);
- },
- );
- if (!details) {
- continue;
- }
- exchangeInfos.push({
- master_pub: details.masterPublicKey,
- url: e.baseUrl,
+
+ await ws.db
+ .mktx((x) => ({
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ }))
+ .runReadOnly(async (tx) => {
+ const allExchanges = await tx.exchanges.iter().toArray();
+ for (const e of allExchanges) {
+ const details = await getExchangeDetails(tx, e.baseUrl);
+ if (!details) {
+ continue;
+ }
+ exchangeInfos.push({
+ master_pub: details.masterPublicKey,
+ url: e.baseUrl,
+ });
+ }
});
- }
const timestamp = getTimestampNow();
const timestampRound = timestampTruncateToSecond(timestamp);
@@ -421,20 +447,17 @@ export async function createDepositGroup(
lastError: undefined,
};
- await ws.db.runWithWriteTransaction(
- [
- Stores.depositGroups,
- Stores.coins,
- Stores.refreshGroups,
- Stores.denominations,
- ],
- async (tx) => {
+ await ws.db
+ .mktx((x) => ({
+ depositGroups: x.depositGroups,
+ coins: x.coins,
+ refreshGroups: x.refreshGroups,
+ denominations: x.denominations,
+ }))
+ .runReadWrite(async (tx) => {
await applyCoinSpend(ws, tx, payCoinSel);
- await tx.put(Stores.depositGroups, depositGroup);
- },
- );
-
- await ws.db.put(Stores.depositGroups, depositGroup);
+ await tx.depositGroups.put(depositGroup);
+ });
return { depositGroupId };
}
diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts
index e48d12998..789ce1da4 100644
--- a/packages/taler-wallet-core/src/operations/exchanges.ts
+++ b/packages/taler-wallet-core/src/operations/exchanges.ts
@@ -41,13 +41,13 @@ import {
import {
DenominationRecord,
DenominationStatus,
- Stores,
ExchangeRecord,
ExchangeUpdateStatus,
WireFee,
ExchangeUpdateReason,
ExchangeDetailsRecord,
WireInfo,
+ WalletStoresV1,
} from "../db.js";
import {
URL,
@@ -73,7 +73,7 @@ import {
} from "./versions.js";
import { HttpRequestLibrary } from "../util/http.js";
import { CryptoApi } from "../crypto/workers/cryptoApi.js";
-import { TransactionHandle } from "../util/query.js";
+import { DbAccess, GetReadOnlyAccess } from "../util/query.js";
const logger = new Logger("exchanges.ts");
@@ -108,15 +108,17 @@ async function handleExchangeUpdateError(
baseUrl: string,
err: TalerErrorDetails,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.exchanges], async (tx) => {
- const exchange = await tx.get(Stores.exchanges, baseUrl);
- if (!exchange) {
- return;
- }
- exchange.retryInfo.retryCounter++;
- updateRetryInfoTimeout(exchange.retryInfo);
- exchange.lastError = err;
- });
+ await ws.db
+ .mktx((x) => ({ exchanges: x.exchanges }))
+ .runReadOnly(async (tx) => {
+ const exchange = await tx.exchanges.get(baseUrl);
+ if (!exchange) {
+ return;
+ }
+ exchange.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(exchange.retryInfo);
+ exchange.lastError = err;
+ });
if (err) {
ws.notify({ type: NotificationType.ExchangeOperationError, error: err });
}
@@ -153,12 +155,13 @@ async function downloadExchangeWithTermsOfService(
}
export async function getExchangeDetails(
- tx: TransactionHandle<
- typeof Stores.exchanges | typeof Stores.exchangeDetails
- >,
+ tx: GetReadOnlyAccess<{
+ exchanges: typeof WalletStoresV1.exchanges;
+ exchangeDetails: typeof WalletStoresV1.exchangeDetails;
+ }>,
exchangeBaseUrl: string,
): Promise<ExchangeDetailsRecord | undefined> {
- const r = await tx.get(Stores.exchanges, exchangeBaseUrl);
+ const r = await tx.exchanges.get(exchangeBaseUrl);
if (!r) {
return;
}
@@ -167,28 +170,32 @@ export async function getExchangeDetails(
return;
}
const { currency, masterPublicKey } = dp;
- return await tx.get(Stores.exchangeDetails, [
- r.baseUrl,
- currency,
- masterPublicKey,
- ]);
+ return await tx.exchangeDetails.get([r.baseUrl, currency, masterPublicKey]);
}
+getExchangeDetails.makeContext = (db: DbAccess<typeof WalletStoresV1>) =>
+ db.mktx((x) => ({
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ }));
+
export async function acceptExchangeTermsOfService(
ws: InternalWalletState,
exchangeBaseUrl: string,
etag: string | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction(
- [Stores.exchanges, Stores.exchangeDetails],
- async (tx) => {
+ await ws.db
+ .mktx((x) => ({
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ }))
+ .runReadWrite(async (tx) => {
const d = await getExchangeDetails(tx, exchangeBaseUrl);
if (d) {
d.termsOfServiceAcceptedEtag = etag;
- await tx.put(Stores.exchangeDetails, d);
+ await tx.exchangeDetails.put(d);
}
- },
- );
+ });
}
async function validateWireInfo(
@@ -284,21 +291,24 @@ async function provideExchangeRecord(
baseUrl: string,
now: Timestamp,
): Promise<ExchangeRecord> {
- let r = await ws.db.get(Stores.exchanges, baseUrl);
- if (!r) {
- const newExchangeRecord: ExchangeRecord = {
- permanent: true,
- baseUrl: baseUrl,
- updateStatus: ExchangeUpdateStatus.FetchKeys,
- updateStarted: now,
- updateReason: ExchangeUpdateReason.Initial,
- retryInfo: initRetryInfo(false),
- detailsPointer: undefined,
- };
- await ws.db.put(Stores.exchanges, newExchangeRecord);
- r = newExchangeRecord;
- }
- return r;
+ return await ws.db
+ .mktx((x) => ({ exchanges: x.exchanges }))
+ .runReadWrite(async (tx) => {
+ let r = await tx.exchanges.get(baseUrl);
+ if (!r) {
+ r = {
+ permanent: true,
+ baseUrl: baseUrl,
+ updateStatus: ExchangeUpdateStatus.FetchKeys,
+ updateStarted: now,
+ updateReason: ExchangeUpdateReason.Initial,
+ retryInfo: initRetryInfo(false),
+ detailsPointer: undefined,
+ };
+ await tx.exchanges.put(r);
+ }
+ return r;
+ });
}
interface ExchangeKeysDownloadResult {
@@ -427,16 +437,17 @@ async function updateExchangeFromUrlImpl(
let recoupGroupId: string | undefined = undefined;
- const updated = await ws.db.runWithWriteTransaction(
- [
- Stores.exchanges,
- Stores.exchangeDetails,
- Stores.denominations,
- Stores.recoupGroups,
- Stores.coins,
- ],
- async (tx) => {
- const r = await tx.get(Stores.exchanges, baseUrl);
+ const updated = await ws.db
+ .mktx((x) => ({
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ denominations: x.denominations,
+ coins: x.coins,
+ refreshGroups: x.refreshGroups,
+ recoupGroups: x.recoupGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.exchanges.get(baseUrl);
if (!r) {
logger.warn(`exchange ${baseUrl} no longer present`);
return;
@@ -473,18 +484,18 @@ async function updateExchangeFromUrlImpl(
// FIXME: only change if pointer really changed
updateClock: getTimestampNow(),
};
- await tx.put(Stores.exchanges, r);
- await tx.put(Stores.exchangeDetails, details);
+ await tx.exchanges.put(r);
+ await tx.exchangeDetails.put(details);
for (const currentDenom of keysInfo.currentDenominations) {
- const oldDenom = await tx.get(Stores.denominations, [
+ const oldDenom = await tx.denominations.get([
baseUrl,
currentDenom.denomPubHash,
]);
if (oldDenom) {
// FIXME: Do consistency check
} else {
- await tx.put(Stores.denominations, currentDenom);
+ await tx.denominations.put(currentDenom);
}
}
@@ -493,7 +504,7 @@ async function updateExchangeFromUrlImpl(
const newlyRevokedCoinPubs: string[] = [];
logger.trace("recoup list from exchange", recoupDenomList);
for (const recoupInfo of recoupDenomList) {
- const oldDenom = await tx.get(Stores.denominations, [
+ const oldDenom = await tx.denominations.get([
r.baseUrl,
recoupInfo.h_denom_pub,
]);
@@ -509,9 +520,9 @@ async function updateExchangeFromUrlImpl(
}
logger.trace("revoking denom", recoupInfo.h_denom_pub);
oldDenom.isRevoked = true;
- await tx.put(Stores.denominations, oldDenom);
- const affectedCoins = await tx
- .iterIndexed(Stores.coins.denomPubHashIndex, recoupInfo.h_denom_pub)
+ await tx.denominations.put(oldDenom);
+ const affectedCoins = await tx.coins.indexes.byDenomPubHash
+ .iter(recoupInfo.h_denom_pub)
.toArray();
for (const ac of affectedCoins) {
newlyRevokedCoinPubs.push(ac.coinPub);
@@ -525,8 +536,7 @@ async function updateExchangeFromUrlImpl(
exchange: r,
exchangeDetails: details,
};
- },
- );
+ });
if (recoupGroupId) {
// Asynchronously start recoup. This doesn't need to finish
@@ -553,12 +563,11 @@ export async function getExchangePaytoUri(
): Promise<string> {
// We do the update here, since the exchange might not even exist
// yet in our database.
- const details = await ws.db.runWithReadTransaction(
- [Stores.exchangeDetails, Stores.exchanges],
- async (tx) => {
+ const details = await getExchangeDetails
+ .makeContext(ws.db)
+ .runReadOnly(async (tx) => {
return getExchangeDetails(tx, exchangeBaseUrl);
- },
- );
+ });
const accounts = details?.wireInfo.accounts ?? [];
for (const account of accounts) {
const res = parsePaytoUri(account.payto_uri);
diff --git a/packages/taler-wallet-core/src/operations/pay.ts b/packages/taler-wallet-core/src/operations/pay.ts
index 0b1b30f68..c57243b59 100644
--- a/packages/taler-wallet-core/src/operations/pay.ts
+++ b/packages/taler-wallet-core/src/operations/pay.ts
@@ -72,9 +72,7 @@ import {
readSuccessResponseJsonOrErrorCode,
readSuccessResponseJsonOrThrow,
readTalerErrorResponse,
- Stores,
throwUnexpectedRequestError,
- TransactionHandle,
URL,
WalletContractData,
} from "../index.js";
@@ -85,7 +83,7 @@ import {
selectPayCoins,
PreviousPayCoins,
} from "../util/coinSelection.js";
-import { canonicalJson, j2s } from "@gnu-taler/taler-util";
+import { j2s } from "@gnu-taler/taler-util";
import {
initRetryInfo,
updateRetryInfoTimeout,
@@ -95,6 +93,10 @@ import { getTotalRefreshCost, createRefreshGroup } from "./refresh.js";
import { InternalWalletState, EXCHANGE_COINS_LOCK } from "./state.js";
import { ContractTermsUtil } from "../util/contractTerms.js";
import { getExchangeDetails } from "./exchanges.js";
+import { DbAccess, GetReadWriteAccess } from "../util/query.js";
+import { WalletStoresV1 } from "../db.js";
+import { Wallet } from "../wallet.js";
+import { x25519_edwards_keyPair_fromSecretKey } from "../crypto/primitives/nacl-fast.js";
/**
* Logger.
@@ -112,34 +114,35 @@ export async function getTotalPaymentCost(
ws: InternalWalletState,
pcs: PayCoinSelection,
): Promise<AmountJson> {
- const costs = [];
- for (let i = 0; i < pcs.coinPubs.length; i++) {
- const coin = await ws.db.get(Stores.coins, pcs.coinPubs[i]);
- if (!coin) {
- throw Error("can't calculate payment cost, coin not found");
- }
- const denom = await ws.db.get(Stores.denominations, [
- coin.exchangeBaseUrl,
- coin.denomPubHash,
- ]);
- if (!denom) {
- throw Error(
- "can't calculate payment cost, denomination for coin not found",
- );
- }
- const allDenoms = await ws.db
- .iterIndex(
- Stores.denominations.exchangeBaseUrlIndex,
- coin.exchangeBaseUrl,
- )
- .toArray();
- const amountLeft = Amounts.sub(denom.value, pcs.coinContributions[i])
- .amount;
- const refreshCost = getTotalRefreshCost(allDenoms, denom, amountLeft);
- costs.push(pcs.coinContributions[i]);
- costs.push(refreshCost);
- }
- return Amounts.sum(costs).amount;
+ return ws.db
+ .mktx((x) => ({ coins: x.coins, denominations: x.denominations }))
+ .runReadOnly(async (tx) => {
+ const costs = [];
+ for (let i = 0; i < pcs.coinPubs.length; i++) {
+ const coin = await tx.coins.get(pcs.coinPubs[i]);
+ if (!coin) {
+ throw Error("can't calculate payment cost, coin not found");
+ }
+ const denom = await tx.denominations.get([
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ ]);
+ if (!denom) {
+ throw Error(
+ "can't calculate payment cost, denomination for coin not found",
+ );
+ }
+ const allDenoms = await tx.denominations.indexes.byExchangeBaseUrl
+ .iter()
+ .toArray();
+ const amountLeft = Amounts.sub(denom.value, pcs.coinContributions[i])
+ .amount;
+ const refreshCost = getTotalRefreshCost(allDenoms, denom, amountLeft);
+ costs.push(pcs.coinContributions[i]);
+ costs.push(refreshCost);
+ }
+ return Amounts.sum(costs).amount;
+ });
}
/**
@@ -154,39 +157,48 @@ export async function getEffectiveDepositAmount(
const amt: AmountJson[] = [];
const fees: AmountJson[] = [];
const exchangeSet: Set<string> = new Set();
- for (let i = 0; i < pcs.coinPubs.length; i++) {
- const coin = await ws.db.get(Stores.coins, pcs.coinPubs[i]);
- if (!coin) {
- throw Error("can't calculate deposit amountt, coin not found");
- }
- const denom = await ws.db.get(Stores.denominations, [
- coin.exchangeBaseUrl,
- coin.denomPubHash,
- ]);
- if (!denom) {
- throw Error("can't find denomination to calculate deposit amount");
- }
- amt.push(pcs.coinContributions[i]);
- fees.push(denom.feeDeposit);
- exchangeSet.add(coin.exchangeBaseUrl);
- }
- for (const exchangeUrl of exchangeSet.values()) {
- const exchangeDetails = await ws.db.runWithReadTransaction(
- [Stores.exchanges, Stores.exchangeDetails],
- async (tx) => {
- return getExchangeDetails(tx, exchangeUrl);
- },
- );
- if (!exchangeDetails) {
- continue;
- }
- const fee = exchangeDetails.wireInfo.feesForType[wireType].find((x) => {
- return timestampIsBetween(getTimestampNow(), x.startStamp, x.endStamp);
- })?.wireFee;
- if (fee) {
- fees.push(fee);
- }
- }
+
+ await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ denominations: x.denominations,
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ }))
+ .runReadOnly(async (tx) => {
+ for (let i = 0; i < pcs.coinPubs.length; i++) {
+ const coin = await tx.coins.get(pcs.coinPubs[i]);
+ if (!coin) {
+ throw Error("can't calculate deposit amountt, coin not found");
+ }
+ const denom = await tx.denominations.get([
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ ]);
+ if (!denom) {
+ throw Error("can't find denomination to calculate deposit amount");
+ }
+ amt.push(pcs.coinContributions[i]);
+ fees.push(denom.feeDeposit);
+ exchangeSet.add(coin.exchangeBaseUrl);
+ }
+ for (const exchangeUrl of exchangeSet.values()) {
+ const exchangeDetails = await getExchangeDetails(tx, exchangeUrl);
+ if (!exchangeDetails) {
+ continue;
+ }
+ const fee = exchangeDetails.wireInfo.feesForType[wireType].find((x) => {
+ return timestampIsBetween(
+ getTimestampNow(),
+ x.startStamp,
+ x.endStamp,
+ );
+ })?.wireFee;
+ if (fee) {
+ fees.push(fee);
+ }
+ }
+ });
return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount;
}
@@ -243,105 +255,112 @@ export async function getCandidatePayCoins(
const candidateCoins: AvailableCoinInfo[] = [];
const wireFeesPerExchange: Record<string, AmountJson> = {};
- const exchanges = await ws.db.iter(Stores.exchanges).toArray();
- for (const exchange of exchanges) {
- let isOkay = false;
- const exchangeDetails = await ws.db.runWithReadTransaction(
- [Stores.exchanges, Stores.exchangeDetails],
- async (tx) => {
- return getExchangeDetails(tx, exchange.baseUrl);
- },
- );
- if (!exchangeDetails) {
- continue;
- }
- const exchangeFees = exchangeDetails.wireInfo;
- if (!exchangeFees) {
- continue;
- }
-
- // is the exchange explicitly allowed?
- for (const allowedExchange of req.allowedExchanges) {
- if (allowedExchange.exchangePub === exchangeDetails.masterPublicKey) {
- isOkay = true;
- break;
- }
- }
+ await ws.db
+ .mktx((x) => ({
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ denominations: x.denominations,
+ coins: x.coins,
+ }))
+ .runReadOnly(async (tx) => {
+ const exchanges = await tx.exchanges.iter().toArray();
+ for (const exchange of exchanges) {
+ let isOkay = false;
+ const exchangeDetails = await getExchangeDetails(tx, exchange.baseUrl);
+ if (!exchangeDetails) {
+ continue;
+ }
+ const exchangeFees = exchangeDetails.wireInfo;
+ if (!exchangeFees) {
+ continue;
+ }
- // is the exchange allowed because of one of its auditors?
- if (!isOkay) {
- for (const allowedAuditor of req.allowedAuditors) {
- for (const auditor of exchangeDetails.auditors) {
- if (auditor.auditor_pub === allowedAuditor.auditorPub) {
+ // is the exchange explicitly allowed?
+ for (const allowedExchange of req.allowedExchanges) {
+ if (allowedExchange.exchangePub === exchangeDetails.masterPublicKey) {
isOkay = true;
break;
}
}
- if (isOkay) {
- break;
+
+ // is the exchange allowed because of one of its auditors?
+ if (!isOkay) {
+ for (const allowedAuditor of req.allowedAuditors) {
+ for (const auditor of exchangeDetails.auditors) {
+ if (auditor.auditor_pub === allowedAuditor.auditorPub) {
+ isOkay = true;
+ break;
+ }
+ }
+ if (isOkay) {
+ break;
+ }
+ }
}
- }
- }
- if (!isOkay) {
- continue;
- }
+ if (!isOkay) {
+ continue;
+ }
- const coins = await ws.db
- .iterIndex(Stores.coins.exchangeBaseUrlIndex, exchange.baseUrl)
- .toArray();
+ const coins = await tx.coins.indexes.byBaseUrl
+ .iter(exchange.baseUrl)
+ .toArray();
- if (!coins || coins.length === 0) {
- continue;
- }
+ if (!coins || coins.length === 0) {
+ continue;
+ }
- // Denomination of the first coin, we assume that all other
- // coins have the same currency
- const firstDenom = await ws.db.get(Stores.denominations, [
- exchange.baseUrl,
- coins[0].denomPubHash,
- ]);
- if (!firstDenom) {
- throw Error("db inconsistent");
- }
- const currency = firstDenom.value.currency;
- for (const coin of coins) {
- const denom = await ws.db.get(Stores.denominations, [
- exchange.baseUrl,
- coin.denomPubHash,
- ]);
- if (!denom) {
- throw Error("db inconsistent");
- }
- if (denom.value.currency !== currency) {
- logger.warn(
- `same pubkey for different currencies at exchange ${exchange.baseUrl}`,
- );
- continue;
- }
- if (!isSpendableCoin(coin, denom)) {
- continue;
- }
- candidateCoins.push({
- availableAmount: coin.currentAmount,
- coinPub: coin.coinPub,
- denomPub: coin.denomPub,
- feeDeposit: denom.feeDeposit,
- exchangeBaseUrl: denom.exchangeBaseUrl,
- });
- }
+ // Denomination of the first coin, we assume that all other
+ // coins have the same currency
+ const firstDenom = await tx.denominations.get([
+ exchange.baseUrl,
+ coins[0].denomPubHash,
+ ]);
+ if (!firstDenom) {
+ throw Error("db inconsistent");
+ }
+ const currency = firstDenom.value.currency;
+ for (const coin of coins) {
+ const denom = await tx.denominations.get([
+ exchange.baseUrl,
+ coin.denomPubHash,
+ ]);
+ if (!denom) {
+ throw Error("db inconsistent");
+ }
+ if (denom.value.currency !== currency) {
+ logger.warn(
+ `same pubkey for different currencies at exchange ${exchange.baseUrl}`,
+ );
+ continue;
+ }
+ if (!isSpendableCoin(coin, denom)) {
+ continue;
+ }
+ candidateCoins.push({
+ availableAmount: coin.currentAmount,
+ coinPub: coin.coinPub,
+ denomPub: coin.denomPub,
+ feeDeposit: denom.feeDeposit,
+ exchangeBaseUrl: denom.exchangeBaseUrl,
+ });
+ }
- let wireFee: AmountJson | undefined;
- for (const fee of exchangeFees.feesForType[req.wireMethod] || []) {
- if (fee.startStamp <= req.timestamp && fee.endStamp >= req.timestamp) {
- wireFee = fee.wireFee;
- break;
+ let wireFee: AmountJson | undefined;
+ for (const fee of exchangeFees.feesForType[req.wireMethod] || []) {
+ if (
+ fee.startStamp <= req.timestamp &&
+ fee.endStamp >= req.timestamp
+ ) {
+ wireFee = fee.wireFee;
+ break;
+ }
+ }
+ if (wireFee) {
+ wireFeesPerExchange[exchange.baseUrl] = wireFee;
+ }
}
- }
- if (wireFee) {
- wireFeesPerExchange[exchange.baseUrl] = wireFee;
- }
- }
+ });
return {
candidateCoins,
@@ -351,15 +370,15 @@ export async function getCandidatePayCoins(
export async function applyCoinSpend(
ws: InternalWalletState,
- tx: TransactionHandle<
- | typeof Stores.coins
- | typeof Stores.refreshGroups
- | typeof Stores.denominations
- >,
+ tx: GetReadWriteAccess<{
+ coins: typeof WalletStoresV1.coins;
+ refreshGroups: typeof WalletStoresV1.refreshGroups;
+ denominations: typeof WalletStoresV1.denominations;
+ }>,
coinSelection: PayCoinSelection,
) {
for (let i = 0; i < coinSelection.coinPubs.length; i++) {
- const coin = await tx.get(Stores.coins, coinSelection.coinPubs[i]);
+ const coin = await tx.coins.get(coinSelection.coinPubs[i]);
if (!coin) {
throw Error("coin allocated for payment doesn't exist anymore");
}
@@ -379,7 +398,7 @@ export async function applyCoinSpend(
throw Error("not enough remaining balance on coin for payment");
}
coin.currentAmount = remaining.amount;
- await tx.put(Stores.coins, coin);
+ await tx.coins.put(coin);
}
const refreshCoinPubs = coinSelection.coinPubs.map((x) => ({
coinPub: x,
@@ -437,26 +456,25 @@ async function recordConfirmPay(
noncePub: proposal.noncePub,
};
- await ws.db.runWithWriteTransaction(
- [
- Stores.coins,
- Stores.purchases,
- Stores.proposals,
- Stores.refreshGroups,
- Stores.denominations,
- ],
- async (tx) => {
- const p = await tx.get(Stores.proposals, proposal.proposalId);
+ await ws.db
+ .mktx((x) => ({
+ proposals: x.proposals,
+ purchases: x.purchases,
+ coins: x.coins,
+ refreshGroups: x.refreshGroups,
+ denominations: x.denominations,
+ }))
+ .runReadWrite(async (tx) => {
+ const p = await tx.proposals.get(proposal.proposalId);
if (p) {
p.proposalStatus = ProposalStatus.ACCEPTED;
p.lastError = undefined;
p.retryInfo = initRetryInfo(false);
- await tx.put(Stores.proposals, p);
+ await tx.proposals.put(p);
}
- await tx.put(Stores.purchases, t);
+ await tx.purchases.put(t);
await applyCoinSpend(ws, tx, coinSelection);
- },
- );
+ });
ws.notify({
type: NotificationType.ProposalAccepted,
@@ -470,19 +488,21 @@ async function incrementProposalRetry(
proposalId: string,
err: TalerErrorDetails | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.proposals], async (tx) => {
- const pr = await tx.get(Stores.proposals, proposalId);
- if (!pr) {
- return;
- }
- if (!pr.retryInfo) {
- return;
- }
- pr.retryInfo.retryCounter++;
- updateRetryInfoTimeout(pr.retryInfo);
- pr.lastError = err;
- await tx.put(Stores.proposals, pr);
- });
+ await ws.db
+ .mktx((x) => ({ proposals: x.proposals }))
+ .runReadWrite(async (tx) => {
+ const pr = await tx.proposals.get(proposalId);
+ if (!pr) {
+ return;
+ }
+ if (!pr.retryInfo) {
+ return;
+ }
+ pr.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(pr.retryInfo);
+ pr.lastError = err;
+ await tx.proposals.put(pr);
+ });
if (err) {
ws.notify({ type: NotificationType.ProposalOperationError, error: err });
}
@@ -494,19 +514,21 @@ async function incrementPurchasePayRetry(
err: TalerErrorDetails | undefined,
): Promise<void> {
logger.warn("incrementing purchase pay retry with error", err);
- await ws.db.runWithWriteTransaction([Stores.purchases], async (tx) => {
- const pr = await tx.get(Stores.purchases, proposalId);
- if (!pr) {
- return;
- }
- if (!pr.payRetryInfo) {
- return;
- }
- pr.payRetryInfo.retryCounter++;
- updateRetryInfoTimeout(pr.payRetryInfo);
- pr.lastPayError = err;
- await tx.put(Stores.purchases, pr);
- });
+ await ws.db
+ .mktx((x) => ({ purchases: x.purchases }))
+ .runReadWrite(async (tx) => {
+ const pr = await tx.purchases.get(proposalId);
+ if (!pr) {
+ return;
+ }
+ if (!pr.payRetryInfo) {
+ return;
+ }
+ pr.payRetryInfo.retryCounter++;
+ updateRetryInfoTimeout(pr.payRetryInfo);
+ pr.lastPayError = err;
+ await tx.purchases.put(pr);
+ });
if (err) {
ws.notify({ type: NotificationType.PayOperationError, error: err });
}
@@ -529,12 +551,15 @@ async function resetDownloadProposalRetry(
ws: InternalWalletState,
proposalId: string,
): Promise<void> {
- await ws.db.mutate(Stores.proposals, proposalId, (x) => {
- if (x.retryInfo.active) {
- x.retryInfo = initRetryInfo();
- }
- return x;
- });
+ await ws.db
+ .mktx((x) => ({ proposals: x.proposals }))
+ .runReadWrite(async (tx) => {
+ const p = await tx.proposals.get(proposalId);
+ if (p && p.retryInfo.active) {
+ p.retryInfo = initRetryInfo();
+ await tx.proposals.put(p);
+ }
+ });
}
async function failProposalPermanently(
@@ -542,12 +567,18 @@ async function failProposalPermanently(
proposalId: string,
err: TalerErrorDetails,
): Promise<void> {
- await ws.db.mutate(Stores.proposals, proposalId, (x) => {
- x.retryInfo.active = false;
- x.lastError = err;
- x.proposalStatus = ProposalStatus.PERMANENTLY_FAILED;
- return x;
- });
+ await ws.db
+ .mktx((x) => ({ proposals: x.proposals }))
+ .runReadWrite(async (tx) => {
+ const p = await tx.proposals.get(proposalId);
+ if (!p) {
+ return;
+ }
+ p.retryInfo.active = false;
+ p.lastError = err;
+ p.proposalStatus = ProposalStatus.PERMANENTLY_FAILED;
+ await tx.proposals.put(p);
+ });
}
function getProposalRequestTimeout(proposal: ProposalRecord): Duration {
@@ -616,7 +647,11 @@ async function processDownloadProposalImpl(
if (forceNow) {
await resetDownloadProposalRetry(ws, proposalId);
}
- const proposal = await ws.db.get(Stores.proposals, proposalId);
+ const proposal = await ws.db
+ .mktx((x) => ({ proposals: x.proposals }))
+ .runReadOnly(async (tx) => {
+ return tx.proposals.get(proposalId);
+ });
if (!proposal) {
return;
}
@@ -750,10 +785,10 @@ async function processDownloadProposalImpl(
proposalResp.sig,
);
- await ws.db.runWithWriteTransaction(
- [Stores.proposals, Stores.purchases],
- async (tx) => {
- const p = await tx.get(Stores.proposals, proposalId);
+ await ws.db
+ .mktx((x) => ({ proposals: x.proposals, purchases: x.purchases }))
+ .runReadWrite(async (tx) => {
+ const p = await tx.proposals.get(proposalId);
if (!p) {
return;
}
@@ -769,22 +804,20 @@ async function processDownloadProposalImpl(
(fulfillmentUrl.startsWith("http://") ||
fulfillmentUrl.startsWith("https://"))
) {
- const differentPurchase = await tx.getIndexed(
- Stores.purchases.fulfillmentUrlIndex,
+ const differentPurchase = await tx.purchases.indexes.byFulfillmentUrl.get(
fulfillmentUrl,
);
if (differentPurchase) {
logger.warn("repurchase detected");
p.proposalStatus = ProposalStatus.REPURCHASE;
p.repurchaseProposalId = differentPurchase.proposalId;
- await tx.put(Stores.proposals, p);
+ await tx.proposals.put(p);
return;
}
}
p.proposalStatus = ProposalStatus.PROPOSED;
- await tx.put(Stores.proposals, p);
- },
- );
+ await tx.proposals.put(p);
+ });
ws.notify({
type: NotificationType.ProposalDownloaded,
@@ -806,10 +839,14 @@ async function startDownloadProposal(
sessionId: string | undefined,
claimToken: string | undefined,
): Promise<string> {
- const oldProposal = await ws.db.getIndexed(
- Stores.proposals.urlAndOrderIdIndex,
- [merchantBaseUrl, orderId],
- );
+ const oldProposal = await ws.db
+ .mktx((x) => ({ proposals: x.proposals }))
+ .runReadOnly(async (tx) => {
+ return tx.proposals.indexes.byUrlAndOrderId.get([
+ merchantBaseUrl,
+ orderId,
+ ]);
+ });
if (oldProposal) {
await processDownloadProposal(ws, oldProposal.proposalId);
return oldProposal.proposalId;
@@ -834,17 +871,19 @@ async function startDownloadProposal(
downloadSessionId: sessionId,
};
- await ws.db.runWithWriteTransaction([Stores.proposals], async (tx) => {
- const existingRecord = await tx.getIndexed(
- Stores.proposals.urlAndOrderIdIndex,
- [merchantBaseUrl, orderId],
- );
- if (existingRecord) {
- // Created concurrently
- return;
- }
- await tx.put(Stores.proposals, proposalRecord);
- });
+ await ws.db
+ .mktx((x) => ({ proposals: x.proposals }))
+ .runReadWrite(async (tx) => {
+ const existingRecord = tx.proposals.indexes.byUrlAndOrderId.get([
+ merchantBaseUrl,
+ orderId,
+ ]);
+ if (existingRecord) {
+ // Created concurrently
+ return;
+ }
+ await tx.proposals.put(proposalRecord);
+ });
await processDownloadProposal(ws, proposalId);
return proposalId;
@@ -857,37 +896,38 @@ async function storeFirstPaySuccess(
paySig: string,
): Promise<void> {
const now = getTimestampNow();
- await ws.db.runWithWriteTransaction([Stores.purchases], async (tx) => {
- const purchase = await tx.get(Stores.purchases, proposalId);
+ await ws.db
+ .mktx((x) => ({ purchases: x.purchases }))
+ .runReadWrite(async (tx) => {
+ const purchase = await tx.purchases.get(proposalId);
- if (!purchase) {
- logger.warn("purchase does not exist anymore");
- return;
- }
- const isFirst = purchase.timestampFirstSuccessfulPay === undefined;
- if (!isFirst) {
- logger.warn("payment success already stored");
- return;
- }
- purchase.timestampFirstSuccessfulPay = now;
- purchase.paymentSubmitPending = false;
- purchase.lastPayError = undefined;
- purchase.lastSessionId = sessionId;
- purchase.payRetryInfo = initRetryInfo(false);
- purchase.merchantPaySig = paySig;
- if (isFirst) {
- const ar = purchase.download.contractData.autoRefund;
- if (ar) {
- logger.info("auto_refund present");
- purchase.refundQueryRequested = true;
- purchase.refundStatusRetryInfo = initRetryInfo();
- purchase.lastRefundStatusError = undefined;
- purchase.autoRefundDeadline = timestampAddDuration(now, ar);
+ if (!purchase) {
+ logger.warn("purchase does not exist anymore");
+ return;
}
- }
-
- await tx.put(Stores.purchases, purchase);
- });
+ const isFirst = purchase.timestampFirstSuccessfulPay === undefined;
+ if (!isFirst) {
+ logger.warn("payment success already stored");
+ return;
+ }
+ purchase.timestampFirstSuccessfulPay = now;
+ purchase.paymentSubmitPending = false;
+ purchase.lastPayError = undefined;
+ purchase.lastSessionId = sessionId;
+ purchase.payRetryInfo = initRetryInfo(false);
+ purchase.merchantPaySig = paySig;
+ if (isFirst) {
+ const ar = purchase.download.contractData.autoRefund;
+ if (ar) {
+ logger.info("auto_refund present");
+ purchase.refundQueryRequested = true;
+ purchase.refundStatusRetryInfo = initRetryInfo();
+ purchase.lastRefundStatusError = undefined;
+ purchase.autoRefundDeadline = timestampAddDuration(now, ar);
+ }
+ }
+ await tx.purchases.put(purchase);
+ });
}
async function storePayReplaySuccess(
@@ -895,23 +935,25 @@ async function storePayReplaySuccess(
proposalId: string,
sessionId: string | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.purchases], async (tx) => {
- const purchase = await tx.get(Stores.purchases, proposalId);
+ await ws.db
+ .mktx((x) => ({ purchases: x.purchases }))
+ .runReadWrite(async (tx) => {
+ const purchase = await tx.purchases.get(proposalId);
- if (!purchase) {
- logger.warn("purchase does not exist anymore");
- return;
- }
- const isFirst = purchase.timestampFirstSuccessfulPay === undefined;
- if (isFirst) {
- throw Error("invalid payment state");
- }
- purchase.paymentSubmitPending = false;
- purchase.lastPayError = undefined;
- purchase.payRetryInfo = initRetryInfo(false);
- purchase.lastSessionId = sessionId;
- await tx.put(Stores.purchases, purchase);
- });
+ if (!purchase) {
+ logger.warn("purchase does not exist anymore");
+ return;
+ }
+ const isFirst = purchase.timestampFirstSuccessfulPay === undefined;
+ if (isFirst) {
+ throw Error("invalid payment state");
+ }
+ purchase.paymentSubmitPending = false;
+ purchase.lastPayError = undefined;
+ purchase.payRetryInfo = initRetryInfo(false);
+ purchase.lastSessionId = sessionId;
+ await tx.purchases.put(purchase);
+ });
}
/**
@@ -929,7 +971,11 @@ async function handleInsufficientFunds(
): Promise<void> {
logger.trace("handling insufficient funds, trying to re-select coins");
- const proposal = await ws.db.get(Stores.purchases, proposalId);
+ const proposal = await ws.db
+ .mktx((x) => ({ purchaes: x.purchases }))
+ .runReadOnly(async (tx) => {
+ return tx.purchaes.get(proposalId);
+ });
if (!proposal) {
return;
}
@@ -961,30 +1007,34 @@ async function handleInsufficientFunds(
const prevPayCoins: PreviousPayCoins = [];
- for (let i = 0; i < proposal.payCoinSelection.coinPubs.length; i++) {
- const coinPub = proposal.payCoinSelection.coinPubs[i];
- if (coinPub === brokenCoinPub) {
- continue;
- }
- const contrib = proposal.payCoinSelection.coinContributions[i];
- const coin = await ws.db.get(Stores.coins, coinPub);
- if (!coin) {
- continue;
- }
- const denom = await ws.db.get(Stores.denominations, [
- coin.exchangeBaseUrl,
- coin.denomPubHash,
- ]);
- if (!denom) {
- continue;
- }
- prevPayCoins.push({
- coinPub,
- contribution: contrib,
- exchangeBaseUrl: coin.exchangeBaseUrl,
- feeDeposit: denom.feeDeposit,
+ await ws.db
+ .mktx((x) => ({ coins: x.coins, denominations: x.denominations }))
+ .runReadOnly(async (tx) => {
+ for (let i = 0; i < proposal.payCoinSelection.coinPubs.length; i++) {
+ const coinPub = proposal.payCoinSelection.coinPubs[i];
+ if (coinPub === brokenCoinPub) {
+ continue;
+ }
+ const contrib = proposal.payCoinSelection.coinContributions[i];
+ const coin = await tx.coins.get(coinPub);
+ if (!coin) {
+ continue;
+ }
+ const denom = await tx.denominations.get([
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ ]);
+ if (!denom) {
+ continue;
+ }
+ prevPayCoins.push({
+ coinPub,
+ contribution: contrib,
+ exchangeBaseUrl: coin.exchangeBaseUrl,
+ feeDeposit: denom.feeDeposit,
+ });
+ }
});
- }
const res = selectPayCoins({
candidates,
@@ -1002,24 +1052,23 @@ async function handleInsufficientFunds(
logger.trace("re-selected coins");
- await ws.db.runWithWriteTransaction(
- [
- Stores.purchases,
- Stores.coins,
- Stores.denominations,
- Stores.refreshGroups,
- ],
- async (tx) => {
- const p = await tx.get(Stores.purchases, proposalId);
+ await ws.db
+ .mktx((x) => ({
+ purchases: x.purchases,
+ coins: x.coins,
+ denominations: x.denominations,
+ refreshGroups: x.refreshGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const p = await tx.purchases.get(proposalId);
if (!p) {
return;
}
p.payCoinSelection = res;
p.coinDepositPermissions = undefined;
- await tx.put(Stores.purchases, p);
+ await tx.purchases.put(p);
await applyCoinSpend(ws, tx, res);
- },
- );
+ });
}
/**
@@ -1032,7 +1081,11 @@ async function submitPay(
ws: InternalWalletState,
proposalId: string,
): Promise<ConfirmPayResult> {
- const purchase = await ws.db.get(Stores.purchases, proposalId);
+ const purchase = await ws.db
+ .mktx((x) => ({ purchases: x.purchases }))
+ .runReadOnly(async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (!purchase) {
throw Error("Purchase not found: " + proposalId);
}
@@ -1202,7 +1255,11 @@ export async function checkPaymentByProposalId(
proposalId: string,
sessionId?: string,
): Promise<PreparePayResult> {
- let proposal = await ws.db.get(Stores.proposals, proposalId);
+ let proposal = await ws.db
+ .mktx((x) => ({ proposals: x.proposals }))
+ .runReadOnly(async (tx) => {
+ return tx.proposals.get(proposalId);
+ });
if (!proposal) {
throw Error(`could not get proposal ${proposalId}`);
}
@@ -1212,7 +1269,11 @@ export async function checkPaymentByProposalId(
throw Error("invalid proposal state");
}
logger.trace("using existing purchase for same product");
- proposal = await ws.db.get(Stores.proposals, existingProposalId);
+ proposal = await ws.db
+ .mktx((x) => ({ proposals: x.proposals }))
+ .runReadOnly(async (tx) => {
+ return tx.proposals.get(existingProposalId);
+ });
if (!proposal) {
throw Error("existing proposal is in wrong state");
}
@@ -1231,7 +1292,11 @@ export async function checkPaymentByProposalId(
proposalId = proposal.proposalId;
// First check if we already paid for it.
- const purchase = await ws.db.get(Stores.purchases, proposalId);
+ const purchase = await ws.db
+ .mktx((x) => ({ purchases: x.purchases }))
+ .runReadOnly(async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (!purchase) {
// If not already paid, check if we could pay for it.
@@ -1281,14 +1346,16 @@ export async function checkPaymentByProposalId(
logger.trace(
"automatically re-submitting payment with different session ID",
);
- await ws.db.runWithWriteTransaction([Stores.purchases], async (tx) => {
- const p = await tx.get(Stores.purchases, proposalId);
- if (!p) {
- return;
- }
- p.lastSessionId = sessionId;
- await tx.put(Stores.purchases, p);
- });
+ await ws.db
+ .mktx((x) => ({ purchases: x.purchases }))
+ .runReadWrite(async (tx) => {
+ const p = await tx.purchases.get(proposalId);
+ if (!p) {
+ return;
+ }
+ p.lastSessionId = sessionId;
+ await tx.purchases.put(p);
+ });
const r = await guardOperationException(
() => submitPay(ws, proposalId),
(e: TalerErrorDetails): Promise<void> =>
@@ -1375,20 +1442,33 @@ export async function generateDepositPermissions(
contractData: WalletContractData,
): Promise<CoinDepositPermission[]> {
const depositPermissions: CoinDepositPermission[] = [];
+ const coinWithDenom: Array<{
+ coin: CoinRecord;
+ denom: DenominationRecord;
+ }> = [];
+ await ws.db
+ .mktx((x) => ({ coins: x.coins, denominations: x.denominations }))
+ .runReadOnly(async (tx) => {
+ for (let i = 0; i < payCoinSel.coinPubs.length; i++) {
+ const coin = await tx.coins.get(payCoinSel.coinPubs[i]);
+ if (!coin) {
+ throw Error("can't pay, allocated coin not found anymore");
+ }
+ const denom = await tx.denominations.get([
+ coin.exchangeBaseUrl,
+ coin.denomPubHash,
+ ]);
+ if (!denom) {
+ throw Error(
+ "can't pay, denomination of allocated coin not found anymore",
+ );
+ }
+ coinWithDenom.push({ coin, denom });
+ }
+ });
+
for (let i = 0; i < payCoinSel.coinPubs.length; i++) {
- const coin = await ws.db.get(Stores.coins, payCoinSel.coinPubs[i]);
- if (!coin) {
- throw Error("can't pay, allocated coin not found anymore");
- }
- const denom = await ws.db.get(Stores.denominations, [
- coin.exchangeBaseUrl,
- coin.denomPubHash,
- ]);
- if (!denom) {
- throw Error(
- "can't pay, denomination of allocated coin not found anymore",
- );
- }
+ const { coin, denom } = coinWithDenom[i];
const dp = await ws.cryptoApi.signDepositPermission({
coinPriv: coin.coinPriv,
coinPub: coin.coinPub,
@@ -1419,7 +1499,11 @@ export async function confirmPay(
logger.trace(
`executing confirmPay with proposalId ${proposalId} and sessionIdOverride ${sessionIdOverride}`,
);
- const proposal = await ws.db.get(Stores.proposals, proposalId);
+ const proposal = await ws.db
+ .mktx((x) => ({ proposals: x.proposals }))
+ .runReadOnly(async (tx) => {
+ return tx.proposals.get(proposalId);
+ });
if (!proposal) {
throw Error(`proposal with id ${proposalId} not found`);
@@ -1430,20 +1514,24 @@ export async function confirmPay(
throw Error("proposal is in invalid state");
}
- let purchase = await ws.db.get(Stores.purchases, proposalId);
+ const existingPurchase = await ws.db
+ .mktx((x) => ({ purchases: x.purchases }))
+ .runReadWrite(async (tx) => {
+ const purchase = await tx.purchases.get(proposalId);
+ if (
+ purchase &&
+ sessionIdOverride !== undefined &&
+ sessionIdOverride != purchase.lastSessionId
+ ) {
+ logger.trace(`changing session ID to ${sessionIdOverride}`);
+ purchase.lastSessionId = sessionIdOverride;
+ purchase.paymentSubmitPending = true;
+ await tx.purchases.put(purchase);
+ }
+ return purchase;
+ });
- if (purchase) {
- if (
- sessionIdOverride !== undefined &&
- sessionIdOverride != purchase.lastSessionId
- ) {
- logger.trace(`changing session ID to ${sessionIdOverride}`);
- await ws.db.mutate(Stores.purchases, purchase.proposalId, (x) => {
- x.lastSessionId = sessionIdOverride;
- x.paymentSubmitPending = true;
- return x;
- });
- }
+ if (existingPurchase) {
logger.trace("confirmPay: submitting payment for existing purchase");
return await guardOperationException(
() => submitPay(ws, proposalId),
@@ -1491,7 +1579,7 @@ export async function confirmPay(
res,
d.contractData,
);
- purchase = await recordConfirmPay(
+ await recordConfirmPay(
ws,
proposal,
res,
@@ -1523,12 +1611,15 @@ async function resetPurchasePayRetry(
ws: InternalWalletState,
proposalId: string,
): Promise<void> {
- await ws.db.mutate(Stores.purchases, proposalId, (x) => {
- if (x.payRetryInfo.active) {
- x.payRetryInfo = initRetryInfo();
- }
- return x;
- });
+ await ws.db
+ .mktx((x) => ({ purchases: x.purchases }))
+ .runReadWrite(async (tx) => {
+ const p = await tx.purchases.get(proposalId);
+ if (p) {
+ p.payRetryInfo = initRetryInfo();
+ await tx.purchases.put(p);
+ }
+ });
}
async function processPurchasePayImpl(
@@ -1539,7 +1630,11 @@ async function processPurchasePayImpl(
if (forceNow) {
await resetPurchasePayRetry(ws, proposalId);
}
- const purchase = await ws.db.get(Stores.purchases, proposalId);
+ const purchase = await ws.db
+ .mktx((x) => ({ purchases: x.purchases }))
+ .runReadOnly(async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (!purchase) {
return;
}
@@ -1554,10 +1649,9 @@ export async function refuseProposal(
ws: InternalWalletState,
proposalId: string,
): Promise<void> {
- const success = await ws.db.runWithWriteTransaction(
- [Stores.proposals],
+ const success = await ws.db.mktx((x) => ({proposals: x.proposals})).runReadWrite(
async (tx) => {
- const proposal = await tx.get(Stores.proposals, proposalId);
+ const proposal = await tx.proposals.get(proposalId);
if (!proposal) {
logger.trace(`proposal ${proposalId} not found, won't refuse proposal`);
return false;
@@ -1566,7 +1660,7 @@ export async function refuseProposal(
return false;
}
proposal.proposalStatus = ProposalStatus.REFUSED;
- await tx.put(Stores.proposals, proposal);
+ await tx.proposals.put(proposal);
return true;
},
);
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts
index 85f8faa18..d3904c426 100644
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ b/packages/taler-wallet-core/src/operations/pending.ts
@@ -21,8 +21,8 @@ import {
ExchangeUpdateStatus,
ProposalStatus,
ReserveRecordStatus,
- Stores,
AbortStatus,
+ WalletStoresV1,
} from "../db.js";
import {
PendingOperationsResponse,
@@ -37,10 +37,10 @@ import {
getDurationRemaining,
durationMin,
} from "@gnu-taler/taler-util";
-import { TransactionHandle } from "../util/query";
import { InternalWalletState } from "./state";
import { getBalancesInsideTransaction } from "./balance";
import { getExchangeDetails } from "./exchanges.js";
+import { GetReadOnlyAccess } from "../util/query.js";
function updateRetryDelay(
oldDelay: Duration,
@@ -53,14 +53,15 @@ function updateRetryDelay(
}
async function gatherExchangePending(
- tx: TransactionHandle<
- typeof Stores.exchanges | typeof Stores.exchangeDetails
- >,
+ tx: GetReadOnlyAccess<{
+ exchanges: typeof WalletStoresV1.exchanges;
+ exchangeDetails: typeof WalletStoresV1.exchangeDetails;
+ }>,
now: Timestamp,
resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> {
- await tx.iter(Stores.exchanges).forEachAsync(async (e) => {
+ await tx.exchanges.iter().forEachAsync(async (e) => {
switch (e.updateStatus) {
case ExchangeUpdateStatus.Finished:
if (e.lastError) {
@@ -153,13 +154,13 @@ async function gatherExchangePending(
}
async function gatherReservePending(
- tx: TransactionHandle<typeof Stores.reserves>,
+ tx: GetReadOnlyAccess<{ reserves: typeof WalletStoresV1.reserves }>,
now: Timestamp,
resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> {
// FIXME: this should be optimized by using an index for "onlyDue==true".
- await tx.iter(Stores.reserves).forEach((reserve) => {
+ await tx.reserves.iter().forEach((reserve) => {
const reserveType = reserve.bankInfo
? ReserveType.TalerBankWithdraw
: ReserveType.Manual;
@@ -207,12 +208,12 @@ async function gatherReservePending(
}
async function gatherRefreshPending(
- tx: TransactionHandle<typeof Stores.refreshGroups>,
+ tx: GetReadOnlyAccess<{ refreshGroups: typeof WalletStoresV1.refreshGroups }>,
now: Timestamp,
resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> {
- await tx.iter(Stores.refreshGroups).forEach((r) => {
+ await tx.refreshGroups.iter().forEach((r) => {
if (r.timestampFinished) {
return;
}
@@ -236,12 +237,15 @@ async function gatherRefreshPending(
}
async function gatherWithdrawalPending(
- tx: TransactionHandle<typeof Stores.withdrawalGroups>,
+ tx: GetReadOnlyAccess<{
+ withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
+ planchets: typeof WalletStoresV1.planchets,
+ }>,
now: Timestamp,
resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> {
- await tx.iter(Stores.withdrawalGroups).forEachAsync(async (wsr) => {
+ await tx.withdrawalGroups.iter().forEachAsync(async (wsr) => {
if (wsr.timestampFinish) {
return;
}
@@ -255,8 +259,8 @@ async function gatherWithdrawalPending(
}
let numCoinsWithdrawn = 0;
let numCoinsTotal = 0;
- await tx
- .iterIndexed(Stores.planchets.byGroup, wsr.withdrawalGroupId)
+ await tx.planchets.indexes.byGroup
+ .iter(wsr.withdrawalGroupId)
.forEach((x) => {
numCoinsTotal++;
if (x.withdrawalDone) {
@@ -276,12 +280,12 @@ async function gatherWithdrawalPending(
}
async function gatherProposalPending(
- tx: TransactionHandle<typeof Stores.proposals>,
+ tx: GetReadOnlyAccess<{ proposals: typeof WalletStoresV1.proposals }>,
now: Timestamp,
resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> {
- await tx.iter(Stores.proposals).forEach((proposal) => {
+ await tx.proposals.iter().forEach((proposal) => {
if (proposal.proposalStatus == ProposalStatus.PROPOSED) {
if (onlyDue) {
return;
@@ -327,12 +331,12 @@ async function gatherProposalPending(
}
async function gatherTipPending(
- tx: TransactionHandle<typeof Stores.tips>,
+ tx: GetReadOnlyAccess<{ tips: typeof WalletStoresV1.tips }>,
now: Timestamp,
resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> {
- await tx.iter(Stores.tips).forEach((tip) => {
+ await tx.tips.iter().forEach((tip) => {
if (tip.pickedUpTimestamp) {
return;
}
@@ -357,12 +361,12 @@ async function gatherTipPending(
}
async function gatherPurchasePending(
- tx: TransactionHandle<typeof Stores.purchases>,
+ tx: GetReadOnlyAccess<{ purchases: typeof WalletStoresV1.purchases }>,
now: Timestamp,
resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> {
- await tx.iter(Stores.purchases).forEach((pr) => {
+ await tx.purchases.iter().forEach((pr) => {
if (pr.paymentSubmitPending && pr.abortStatus === AbortStatus.None) {
resp.nextRetryDelay = updateRetryDelay(
resp.nextRetryDelay,
@@ -400,12 +404,12 @@ async function gatherPurchasePending(
}
async function gatherRecoupPending(
- tx: TransactionHandle<typeof Stores.recoupGroups>,
+ tx: GetReadOnlyAccess<{ recoupGroups: typeof WalletStoresV1.recoupGroups }>,
now: Timestamp,
resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> {
- await tx.iter(Stores.recoupGroups).forEach((rg) => {
+ await tx.recoupGroups.iter().forEach((rg) => {
if (rg.timestampFinished) {
return;
}
@@ -428,12 +432,12 @@ async function gatherRecoupPending(
}
async function gatherDepositPending(
- tx: TransactionHandle<typeof Stores.depositGroups>,
+ tx: GetReadOnlyAccess<{ depositGroups: typeof WalletStoresV1.depositGroups }>,
now: Timestamp,
resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> {
- await tx.iter(Stores.depositGroups).forEach((dg) => {
+ await tx.depositGroups.iter().forEach((dg) => {
if (dg.timestampFinished) {
return;
}
@@ -460,20 +464,20 @@ export async function getPendingOperations(
{ onlyDue = false } = {},
): Promise<PendingOperationsResponse> {
const now = getTimestampNow();
- return await ws.db.runWithReadTransaction(
- [
- Stores.exchanges,
- Stores.reserves,
- Stores.refreshGroups,
- Stores.coins,
- Stores.withdrawalGroups,
- Stores.proposals,
- Stores.tips,
- Stores.purchases,
- Stores.recoupGroups,
- Stores.planchets,
- Stores.depositGroups,
- ],
+ return await ws.db.mktx((x) => ({
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ reserves: x.reserves,
+ refreshGroups: x.refreshGroups,
+ coins: x.coins,
+ withdrawalGroups: x.withdrawalGroups,
+ proposals: x.proposals,
+ tips: x.tips,
+ purchases: x.purchases,
+ planchets: x.planchets,
+ depositGroups: x.depositGroups,
+ recoupGroups: x.recoupGroups,
+ })).runReadWrite(
async (tx) => {
const walletBalance = await getBalancesInsideTransaction(ws, tx);
const resp: PendingOperationsResponse = {
diff --git a/packages/taler-wallet-core/src/operations/recoup.ts b/packages/taler-wallet-core/src/operations/recoup.ts
index da01ca825..7dac7faf4 100644
--- a/packages/taler-wallet-core/src/operations/recoup.ts
+++ b/packages/taler-wallet-core/src/operations/recoup.ts
@@ -40,20 +40,19 @@ import {
RecoupGroupRecord,
RefreshCoinSource,
ReserveRecordStatus,
- Stores,
WithdrawCoinSource,
+ WalletStoresV1,
} from "../db.js";
import { readSuccessResponseJsonOrThrow } from "../util/http";
import { Logger } from "@gnu-taler/taler-util";
-import { TransactionHandle } from "../util/query";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries";
import { URL } from "../util/url";
import { guardOperationException } from "./errors";
-import { getExchangeDetails } from "./exchanges.js";
import { createRefreshGroup, processRefreshGroup } from "./refresh";
import { getReserveRequestTimeout, processReserve } from "./reserves";
import { InternalWalletState } from "./state";
+import { GetReadWriteAccess } from "../util/query.js";
const logger = new Logger("operations/recoup.ts");
@@ -62,19 +61,23 @@ async function incrementRecoupRetry(
recoupGroupId: string,
err: TalerErrorDetails | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.recoupGroups], async (tx) => {
- const r = await tx.get(Stores.recoupGroups, recoupGroupId);
- if (!r) {
- return;
- }
- if (!r.retryInfo) {
- return;
- }
- r.retryInfo.retryCounter++;
- updateRetryInfoTimeout(r.retryInfo);
- r.lastError = err;
- await tx.put(Stores.recoupGroups, r);
- });
+ await ws.db
+ .mktx((x) => ({
+ recoupGroups: x.recoupGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.recoupGroups.get(recoupGroupId);
+ if (!r) {
+ return;
+ }
+ if (!r.retryInfo) {
+ return;
+ }
+ r.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(r.retryInfo);
+ r.lastError = err;
+ await tx.recoupGroups.put(r);
+ });
if (err) {
ws.notify({ type: NotificationType.RecoupOperationError, error: err });
}
@@ -82,7 +85,12 @@ async function incrementRecoupRetry(
async function putGroupAsFinished(
ws: InternalWalletState,
- tx: TransactionHandle<typeof Stores.recoupGroups>,
+ tx: GetReadWriteAccess<{
+ recoupGroups: typeof WalletStoresV1.recoupGroups;
+ denominations: typeof WalletStoresV1.denominations;
+ refreshGroups: typeof WalletStoresV1.refreshGroups;
+ coins: typeof WalletStoresV1.coins;
+ }>,
recoupGroup: RecoupGroupRecord,
coinIdx: number,
): Promise<void> {
@@ -116,7 +124,7 @@ async function putGroupAsFinished(
});
}
}
- await tx.put(Stores.recoupGroups, recoupGroup);
+ await tx.recoupGroups.put(recoupGroup);
}
async function recoupTipCoin(
@@ -128,16 +136,23 @@ async function recoupTipCoin(
// We can't really recoup a coin we got via tipping.
// Thus we just put the coin to sleep.
// FIXME: somehow report this to the user
- await ws.db.runWithWriteTransaction([Stores.recoupGroups], async (tx) => {
- const recoupGroup = await tx.get(Stores.recoupGroups, recoupGroupId);
- if (!recoupGroup) {
- return;
- }
- if (recoupGroup.recoupFinishedPerCoin[coinIdx]) {
- return;
- }
- await putGroupAsFinished(ws, tx, recoupGroup, coinIdx);
- });
+ await ws.db
+ .mktx((x) => ({
+ recoupGroups: x.recoupGroups,
+ denominations: WalletStoresV1.denominations,
+ refreshGroups: WalletStoresV1.refreshGroups,
+ coins: WalletStoresV1.coins,
+ }))
+ .runReadWrite(async (tx) => {
+ const recoupGroup = await tx.recoupGroups.get(recoupGroupId);
+ if (!recoupGroup) {
+ return;
+ }
+ if (recoupGroup.recoupFinishedPerCoin[coinIdx]) {
+ return;
+ }
+ await putGroupAsFinished(ws, tx, recoupGroup, coinIdx);
+ });
}
async function recoupWithdrawCoin(
@@ -148,7 +163,13 @@ async function recoupWithdrawCoin(
cs: WithdrawCoinSource,
): Promise<void> {
const reservePub = cs.reservePub;
- const reserve = await ws.db.get(Stores.reserves, reservePub);
+ const reserve = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.reserves.get(reservePub);
+ });
if (!reserve) {
// FIXME: We should at least emit some pending operation / warning for this?
return;
@@ -172,35 +193,29 @@ async function recoupWithdrawCoin(
throw Error(`Coin's reserve doesn't match reserve on recoup`);
}
- const exchangeDetails = await ws.db.runWithReadTransaction(
- [Stores.exchanges, Stores.exchangeDetails],
- async (tx) => {
- return getExchangeDetails(tx, reserve.exchangeBaseUrl);
- },
- );
-
- if (!exchangeDetails) {
- // FIXME: report inconsistency?
- return;
- }
-
// FIXME: verify that our expectations about the amount match
- await ws.db.runWithWriteTransaction(
- [Stores.coins, Stores.denominations, Stores.reserves, Stores.recoupGroups],
- async (tx) => {
- const recoupGroup = await tx.get(Stores.recoupGroups, recoupGroupId);
+ await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ denominations: x.denominations,
+ reserves: x.reserves,
+ recoupGroups: x.recoupGroups,
+ refreshGroups: x.refreshGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const recoupGroup = await tx.recoupGroups.get(recoupGroupId);
if (!recoupGroup) {
return;
}
if (recoupGroup.recoupFinishedPerCoin[coinIdx]) {
return;
}
- const updatedCoin = await tx.get(Stores.coins, coin.coinPub);
+ const updatedCoin = await tx.coins.get(coin.coinPub);
if (!updatedCoin) {
return;
}
- const updatedReserve = await tx.get(Stores.reserves, reserve.reservePub);
+ const updatedReserve = await tx.reserves.get(reserve.reservePub);
if (!updatedReserve) {
return;
}
@@ -214,11 +229,10 @@ async function recoupWithdrawCoin(
updatedReserve.requestedQuery = true;
updatedReserve.retryInfo = initRetryInfo();
}
- await tx.put(Stores.coins, updatedCoin);
- await tx.put(Stores.reserves, updatedReserve);
+ await tx.coins.put(updatedCoin);
+ await tx.reserves.put(updatedReserve);
await putGroupAsFinished(ws, tx, recoupGroup, coinIdx);
- },
- );
+ });
ws.notify({
type: NotificationType.RecoupFinished,
@@ -250,38 +264,24 @@ async function recoupRefreshCoin(
throw Error(`Coin's oldCoinPub doesn't match reserve on recoup`);
}
- const exchangeDetails = await ws.db.runWithReadTransaction(
- [Stores.exchanges, Stores.exchangeDetails],
- async (tx) => {
- // FIXME: Get the exchange details based on the
- // exchange master public key instead of via just the URL.
- return getExchangeDetails(tx, coin.exchangeBaseUrl);
- },
- );
- if (!exchangeDetails) {
- // FIXME: report inconsistency?
- logger.warn("exchange details for recoup not found");
- return;
- }
-
- await ws.db.runWithWriteTransaction(
- [
- Stores.coins,
- Stores.denominations,
- Stores.reserves,
- Stores.recoupGroups,
- Stores.refreshGroups,
- ],
- async (tx) => {
- const recoupGroup = await tx.get(Stores.recoupGroups, recoupGroupId);
+ await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ denominations: x.denominations,
+ reserves: x.reserves,
+ recoupGroups: x.recoupGroups,
+ refreshGroups: x.refreshGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const recoupGroup = await tx.recoupGroups.get(recoupGroupId);
if (!recoupGroup) {
return;
}
if (recoupGroup.recoupFinishedPerCoin[coinIdx]) {
return;
}
- const oldCoin = await tx.get(Stores.coins, cs.oldCoinPub);
- const revokedCoin = await tx.get(Stores.coins, coin.coinPub);
+ const oldCoin = await tx.coins.get(cs.oldCoinPub);
+ const revokedCoin = await tx.coins.get(coin.coinPub);
if (!revokedCoin) {
logger.warn("revoked coin for recoup not found");
return;
@@ -300,23 +300,27 @@ async function recoupRefreshCoin(
Amounts.stringify(oldCoin.currentAmount),
);
recoupGroup.scheduleRefreshCoins.push(oldCoin.coinPub);
- await tx.put(Stores.coins, revokedCoin);
- await tx.put(Stores.coins, oldCoin);
+ await tx.coins.put(revokedCoin);
+ await tx.coins.put(oldCoin);
await putGroupAsFinished(ws, tx, recoupGroup, coinIdx);
- },
- );
+ });
}
async function resetRecoupGroupRetry(
ws: InternalWalletState,
recoupGroupId: string,
): Promise<void> {
- await ws.db.mutate(Stores.recoupGroups, recoupGroupId, (x) => {
- if (x.retryInfo.active) {
- x.retryInfo = initRetryInfo();
- }
- return x;
- });
+ await ws.db
+ .mktx((x) => ({
+ recoupGroups: x.recoupGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const x = await tx.recoupGroups.get(recoupGroupId);
+ if (x && x.retryInfo.active) {
+ x.retryInfo = initRetryInfo();
+ await tx.recoupGroups.put(x);
+ }
+ });
}
export async function processRecoupGroup(
@@ -342,7 +346,13 @@ async function processRecoupGroupImpl(
if (forceNow) {
await resetRecoupGroupRetry(ws, recoupGroupId);
}
- const recoupGroup = await ws.db.get(Stores.recoupGroups, recoupGroupId);
+ const recoupGroup = await ws.db
+ .mktx((x) => ({
+ recoupGroups: x.recoupGroups,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.recoupGroups.get(recoupGroupId);
+ });
if (!recoupGroup) {
return;
}
@@ -358,9 +368,15 @@ async function processRecoupGroupImpl(
const reserveSet = new Set<string>();
for (let i = 0; i < recoupGroup.coinPubs.length; i++) {
const coinPub = recoupGroup.coinPubs[i];
- const coin = await ws.db.get(Stores.coins, coinPub);
+ const coin = await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.coins.get(coinPub);
+ });
if (!coin) {
- throw Error(`Coin ${coinPub} not found, can't request payback`);
+ throw Error(`Coin ${coinPub} not found, can't request recoup`);
}
if (coin.coinSource.type === CoinSourceType.Withdraw) {
reserveSet.add(coin.coinSource.reservePub);
@@ -376,7 +392,12 @@ async function processRecoupGroupImpl(
export async function createRecoupGroup(
ws: InternalWalletState,
- tx: TransactionHandle<typeof Stores.recoupGroups | typeof Stores.coins>,
+ tx: GetReadWriteAccess<{
+ recoupGroups: typeof WalletStoresV1.recoupGroups;
+ denominations: typeof WalletStoresV1.denominations;
+ refreshGroups: typeof WalletStoresV1.refreshGroups;
+ coins: typeof WalletStoresV1.coins;
+ }>,
coinPubs: string[],
): Promise<string> {
const recoupGroupId = encodeCrock(getRandomBytes(32));
@@ -396,7 +417,7 @@ export async function createRecoupGroup(
for (let coinIdx = 0; coinIdx < coinPubs.length; coinIdx++) {
const coinPub = coinPubs[coinIdx];
- const coin = await tx.get(Stores.coins, coinPub);
+ const coin = await tx.coins.get(coinPub);
if (!coin) {
await putGroupAsFinished(ws, tx, recoupGroup, coinIdx);
continue;
@@ -407,10 +428,10 @@ export async function createRecoupGroup(
}
recoupGroup.oldAmountPerCoin[coinIdx] = coin.currentAmount;
coin.currentAmount = Amounts.getZero(coin.currentAmount.currency);
- await tx.put(Stores.coins, coin);
+ await tx.coins.put(coin);
}
- await tx.put(Stores.recoupGroups, recoupGroup);
+ await tx.recoupGroups.put(recoupGroup);
return recoupGroupId;
}
@@ -420,22 +441,34 @@ async function processRecoup(
recoupGroupId: string,
coinIdx: number,
): Promise<void> {
- const recoupGroup = await ws.db.get(Stores.recoupGroups, recoupGroupId);
- if (!recoupGroup) {
- return;
- }
- if (recoupGroup.timestampFinished) {
- return;
- }
- if (recoupGroup.recoupFinishedPerCoin[coinIdx]) {
- return;
- }
+ const coin = await ws.db
+ .mktx((x) => ({
+ recoupGroups: x.recoupGroups,
+ coins: x.coins,
+ }))
+ .runReadOnly(async (tx) => {
+ const recoupGroup = await tx.recoupGroups.get(recoupGroupId);
+ if (!recoupGroup) {
+ return;
+ }
+ if (recoupGroup.timestampFinished) {
+ return;
+ }
+ if (recoupGroup.recoupFinishedPerCoin[coinIdx]) {
+ return;
+ }
- const coinPub = recoupGroup.coinPubs[coinIdx];
+ const coinPub = recoupGroup.coinPubs[coinIdx];
+
+ const coin = await tx.coins.get(coinPub);
+ if (!coin) {
+ throw Error(`Coin ${coinPub} not found, can't request payback`);
+ }
+ return coin;
+ });
- const coin = await ws.db.get(Stores.coins, coinPub);
if (!coin) {
- throw Error(`Coin ${coinPub} not found, can't request payback`);
+ return;
}
const cs = coin.coinSource;
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts
index 6f4c9725a..8d21e811d 100644
--- a/packages/taler-wallet-core/src/operations/refresh.ts
+++ b/packages/taler-wallet-core/src/operations/refresh.ts
@@ -22,7 +22,7 @@ import {
DenominationRecord,
RefreshGroupRecord,
RefreshPlanchet,
- Stores,
+ WalletStoresV1,
} from "../db.js";
import {
codecForExchangeMeltResponse,
@@ -38,7 +38,6 @@ import { amountToPretty } from "@gnu-taler/taler-util";
import { readSuccessResponseJsonOrThrow } from "../util/http";
import { checkDbInvariant } from "../util/invariants";
import { Logger } from "@gnu-taler/taler-util";
-import { TransactionHandle } from "../util/query";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries";
import {
Duration,
@@ -57,6 +56,8 @@ import { updateExchangeFromUrl } from "./exchanges";
import { EXCHANGE_COINS_LOCK, InternalWalletState } from "./state";
import { isWithdrawableDenom, selectWithdrawalDenominations } from "./withdraw";
import { RefreshNewDenomInfo } from "../crypto/cryptoTypes.js";
+import { GetReadWriteAccess } from "../util/query.js";
+import { Wallet } from "../wallet.js";
const logger = new Logger("refresh.ts");
@@ -95,7 +96,7 @@ export function getTotalRefreshCost(
}
/**
- * Create a refresh session inside a refresh group.
+ * Create a refresh session for one particular coin inside a refresh group.
*/
async function refreshCreateSession(
ws: InternalWalletState,
@@ -105,45 +106,68 @@ async function refreshCreateSession(
logger.trace(
`creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`,
);
- const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
- if (!refreshGroup) {
- return;
- }
- if (refreshGroup.finishedPerCoin[coinIndex]) {
- return;
- }
- const existingRefreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
- if (existingRefreshSession) {
+
+ const d = await ws.db
+ .mktx((x) => ({
+ refreshGroups: x.refreshGroups,
+ coins: x.coins,
+ }))
+ .runReadWrite(async (tx) => {
+ const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
+ if (!refreshGroup) {
+ return;
+ }
+ if (refreshGroup.finishedPerCoin[coinIndex]) {
+ return;
+ }
+ const existingRefreshSession =
+ refreshGroup.refreshSessionPerCoin[coinIndex];
+ if (existingRefreshSession) {
+ return;
+ }
+ const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
+ const coin = await tx.coins.get(oldCoinPub);
+ if (!coin) {
+ throw Error("Can't refresh, coin not found");
+ }
+ return { refreshGroup, coin };
+ });
+
+ if (!d) {
return;
}
- const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
- const coin = await ws.db.get(Stores.coins, oldCoinPub);
- if (!coin) {
- throw Error("Can't refresh, coin not found");
- }
+
+ const { refreshGroup, coin } = d;
const { exchange } = await updateExchangeFromUrl(ws, coin.exchangeBaseUrl);
if (!exchange) {
throw Error("db inconsistent: exchange of coin not found");
}
- const oldDenom = await ws.db.get(Stores.denominations, [
- exchange.baseUrl,
- coin.denomPubHash,
- ]);
+ const { availableAmount, availableDenoms } = await ws.db
+ .mktx((x) => ({
+ denominations: x.denominations,
+ }))
+ .runReadOnly(async (tx) => {
+ const oldDenom = await tx.denominations.get([
+ exchange.baseUrl,
+ coin.denomPubHash,
+ ]);
- if (!oldDenom) {
- throw Error("db inconsistent: denomination for coin not found");
- }
+ if (!oldDenom) {
+ throw Error("db inconsistent: denomination for coin not found");
+ }
- const availableDenoms: DenominationRecord[] = await ws.db
- .iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchange.baseUrl)
- .toArray();
+ const availableDenoms: DenominationRecord[] = await tx.denominations.indexes.byExchangeBaseUrl
+ .iter(exchange.baseUrl)
+ .toArray();
- const availableAmount = Amounts.sub(
- refreshGroup.inputPerCoin[coinIndex],
- oldDenom.feeRefresh,
- ).amount;
+ const availableAmount = Amounts.sub(
+ refreshGroup.inputPerCoin[coinIndex],
+ oldDenom.feeRefresh,
+ ).amount;
+ return { availableAmount, availableDenoms };
+ });
const newCoinDenoms = selectWithdrawalDenominations(
availableAmount,
@@ -156,10 +180,13 @@ async function refreshCreateSession(
availableAmount,
)} too small`,
);
- await ws.db.runWithWriteTransaction(
- [Stores.coins, Stores.refreshGroups],
- async (tx) => {
- const rg = await tx.get(Stores.refreshGroups, refreshGroupId);
+ await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ refreshGroups: x.refreshGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) {
return;
}
@@ -175,9 +202,8 @@ async function refreshCreateSession(
rg.timestampFinished = getTimestampNow();
rg.retryInfo = initRetryInfo(false);
}
- await tx.put(Stores.refreshGroups, rg);
- },
- );
+ await tx.refreshGroups.put(rg);
+ });
ws.notify({ type: NotificationType.RefreshUnwarranted });
return;
}
@@ -185,10 +211,13 @@ async function refreshCreateSession(
const sessionSecretSeed = encodeCrock(getRandomBytes(64));
// Store refresh session for this coin in the database.
- await ws.db.runWithWriteTransaction(
- [Stores.refreshGroups, Stores.coins],
- async (tx) => {
- const rg = await tx.get(Stores.refreshGroups, refreshGroupId);
+ await ws.db
+ .mktx((x) => ({
+ refreshGroups: x.refreshGroups,
+ coins: x.coins,
+ }))
+ .runReadWrite(async (tx) => {
+ const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) {
return;
}
@@ -204,9 +233,8 @@ async function refreshCreateSession(
})),
amountRefreshOutput: newCoinDenoms.totalCoinValue,
};
- await tx.put(Stores.refreshGroups, rg);
- },
- );
+ await tx.refreshGroups.put(rg);
+ });
logger.info(
`created refresh session for coin #${coinIndex} in ${refreshGroupId}`,
);
@@ -222,48 +250,63 @@ async function refreshMelt(
refreshGroupId: string,
coinIndex: number,
): Promise<void> {
- const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
- if (!refreshGroup) {
- return;
- }
- const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
- if (!refreshSession) {
- return;
- }
- if (refreshSession.norevealIndex !== undefined) {
- return;
- }
+ const d = await ws.db
+ .mktx((x) => ({
+ refreshGroups: x.refreshGroups,
+ coins: x.coins,
+ denominations: x.denominations,
+ }))
+ .runReadWrite(async (tx) => {
+ const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
+ if (!refreshGroup) {
+ return;
+ }
+ const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
+ if (!refreshSession) {
+ return;
+ }
+ if (refreshSession.norevealIndex !== undefined) {
+ return;
+ }
- const oldCoin = await ws.db.get(
- Stores.coins,
- refreshGroup.oldCoinPubs[coinIndex],
- );
- checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
- const oldDenom = await ws.db.get(Stores.denominations, [
- oldCoin.exchangeBaseUrl,
- oldCoin.denomPubHash,
- ]);
- checkDbInvariant(!!oldDenom, "denomination for melted coin doesn't exist");
+ const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]);
+ checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
+ const oldDenom = await tx.denominations.get([
+ oldCoin.exchangeBaseUrl,
+ oldCoin.denomPubHash,
+ ]);
+ checkDbInvariant(
+ !!oldDenom,
+ "denomination for melted coin doesn't exist",
+ );
- const newCoinDenoms: RefreshNewDenomInfo[] = [];
+ const newCoinDenoms: RefreshNewDenomInfo[] = [];
- for (const dh of refreshSession.newDenoms) {
- const newDenom = await ws.db.get(Stores.denominations, [
- oldCoin.exchangeBaseUrl,
- dh.denomPubHash,
- ]);
- checkDbInvariant(
- !!newDenom,
- "new denomination for refresh not in database",
- );
- newCoinDenoms.push({
- count: dh.count,
- denomPub: newDenom.denomPub,
- feeWithdraw: newDenom.feeWithdraw,
- value: newDenom.value,
+ for (const dh of refreshSession.newDenoms) {
+ const newDenom = await tx.denominations.get([
+ oldCoin.exchangeBaseUrl,
+ dh.denomPubHash,
+ ]);
+ checkDbInvariant(
+ !!newDenom,
+ "new denomination for refresh not in database",
+ );
+ newCoinDenoms.push({
+ count: dh.count,
+ denomPub: newDenom.denomPub,
+ feeWithdraw: newDenom.feeWithdraw,
+ value: newDenom.value,
+ });
+ }
+ return { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession };
});
+
+ if (!d) {
+ return;
}
+ const { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession } = d;
+
const derived = await ws.cryptoApi.deriveRefreshSession({
kappa: 3,
meltCoinDenomPubHash: oldCoin.denomPubHash,
@@ -303,20 +346,28 @@ async function refreshMelt(
refreshSession.norevealIndex = norevealIndex;
- await ws.db.mutate(Stores.refreshGroups, refreshGroupId, (rg) => {
- const rs = rg.refreshSessionPerCoin[coinIndex];
- if (rg.timestampFinished) {
- return;
- }
- if (!rs) {
- return;
- }
- if (rs.norevealIndex !== undefined) {
- return;
- }
- rs.norevealIndex = norevealIndex;
- return rg;
- });
+ await ws.db
+ .mktx((x) => ({
+ refreshGroups: x.refreshGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const rg = await tx.refreshGroups.get(refreshGroupId);
+ if (!rg) {
+ return;
+ }
+ if (rg.timestampFinished) {
+ return;
+ }
+ const rs = rg.refreshSessionPerCoin[coinIndex];
+ if (!rs) {
+ return;
+ }
+ if (rs.norevealIndex !== undefined) {
+ return;
+ }
+ rs.norevealIndex = norevealIndex;
+ await tx.refreshGroups.put(rg);
+ });
ws.notify({
type: NotificationType.RefreshMelted,
@@ -328,49 +379,78 @@ async function refreshReveal(
refreshGroupId: string,
coinIndex: number,
): Promise<void> {
- const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
- if (!refreshGroup) {
- return;
- }
- const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
- if (!refreshSession) {
- return;
- }
- const norevealIndex = refreshSession.norevealIndex;
- if (norevealIndex === undefined) {
- throw Error("can't reveal without melting first");
- }
+ const d = await ws.db
+ .mktx((x) => ({
+ refreshGroups: x.refreshGroups,
+ coins: x.coins,
+ denominations: x.denominations,
+ }))
+ .runReadOnly(async (tx) => {
+ const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
+ if (!refreshGroup) {
+ return;
+ }
+ const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
+ if (!refreshSession) {
+ return;
+ }
+ const norevealIndex = refreshSession.norevealIndex;
+ if (norevealIndex === undefined) {
+ throw Error("can't reveal without melting first");
+ }
- const oldCoin = await ws.db.get(
- Stores.coins,
- refreshGroup.oldCoinPubs[coinIndex],
- );
- checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
- const oldDenom = await ws.db.get(Stores.denominations, [
- oldCoin.exchangeBaseUrl,
- oldCoin.denomPubHash,
- ]);
- checkDbInvariant(!!oldDenom, "denomination for melted coin doesn't exist");
+ const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]);
+ checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
+ const oldDenom = await tx.denominations.get([
+ oldCoin.exchangeBaseUrl,
+ oldCoin.denomPubHash,
+ ]);
+ checkDbInvariant(
+ !!oldDenom,
+ "denomination for melted coin doesn't exist",
+ );
- const newCoinDenoms: RefreshNewDenomInfo[] = [];
+ const newCoinDenoms: RefreshNewDenomInfo[] = [];
- for (const dh of refreshSession.newDenoms) {
- const newDenom = await ws.db.get(Stores.denominations, [
- oldCoin.exchangeBaseUrl,
- dh.denomPubHash,
- ]);
- checkDbInvariant(
- !!newDenom,
- "new denomination for refresh not in database",
- );
- newCoinDenoms.push({
- count: dh.count,
- denomPub: newDenom.denomPub,
- feeWithdraw: newDenom.feeWithdraw,
- value: newDenom.value,
+ for (const dh of refreshSession.newDenoms) {
+ const newDenom = await tx.denominations.get([
+ oldCoin.exchangeBaseUrl,
+ dh.denomPubHash,
+ ]);
+ checkDbInvariant(
+ !!newDenom,
+ "new denomination for refresh not in database",
+ );
+ newCoinDenoms.push({
+ count: dh.count,
+ denomPub: newDenom.denomPub,
+ feeWithdraw: newDenom.feeWithdraw,
+ value: newDenom.value,
+ });
+ }
+ return {
+ oldCoin,
+ oldDenom,
+ newCoinDenoms,
+ refreshSession,
+ refreshGroup,
+ norevealIndex,
+ };
});
+
+ if (!d) {
+ return;
}
+ const {
+ oldCoin,
+ oldDenom,
+ newCoinDenoms,
+ refreshSession,
+ refreshGroup,
+ norevealIndex,
+ } = d;
+
const derived = await ws.cryptoApi.deriveRefreshSession({
kappa: 3,
meltCoinDenomPubHash: oldCoin.denomPubHash,
@@ -389,14 +469,6 @@ async function refreshReveal(
throw Error("refresh index error");
}
- const meltCoinRecord = await ws.db.get(
- Stores.coins,
- refreshGroup.oldCoinPubs[coinIndex],
- );
- if (!meltCoinRecord) {
- throw Error("inconsistent database");
- }
-
const evs = planchets.map((x: RefreshPlanchet) => x.coinEv);
const newDenomsFlat: string[] = [];
const linkSigs: string[] = [];
@@ -406,9 +478,9 @@ async function refreshReveal(
for (let j = 0; j < dsel.count; j++) {
const newCoinIndex = linkSigs.length;
const linkSig = await ws.cryptoApi.signCoinLink(
- meltCoinRecord.coinPriv,
+ oldCoin.coinPriv,
dsel.denomPubHash,
- meltCoinRecord.coinPub,
+ oldCoin.coinPub,
derived.transferPubs[norevealIndex],
planchets[newCoinIndex].coinEv,
);
@@ -447,10 +519,17 @@ async function refreshReveal(
for (let i = 0; i < refreshSession.newDenoms.length; i++) {
for (let j = 0; j < refreshSession.newDenoms[i].count; j++) {
const newCoinIndex = coins.length;
- const denom = await ws.db.get(Stores.denominations, [
- oldCoin.exchangeBaseUrl,
- refreshSession.newDenoms[i].denomPubHash,
- ]);
+ // FIXME: Look up in earlier transaction!
+ const denom = await ws.db
+ .mktx((x) => ({
+ denominations: x.denominations,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.denominations.get([
+ oldCoin.exchangeBaseUrl,
+ refreshSession.newDenoms[i].denomPubHash,
+ ]);
+ });
if (!denom) {
console.error("denom not found");
continue;
@@ -483,10 +562,13 @@ async function refreshReveal(
}
}
- await ws.db.runWithWriteTransaction(
- [Stores.coins, Stores.refreshGroups],
- async (tx) => {
- const rg = await tx.get(Stores.refreshGroups, refreshGroupId);
+ await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ refreshGroups: x.refreshGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) {
logger.warn("no refresh session found");
return;
@@ -508,11 +590,10 @@ async function refreshReveal(
rg.retryInfo = initRetryInfo(false);
}
for (const coin of coins) {
- await tx.put(Stores.coins, coin);
+ await tx.coins.put(coin);
}
- await tx.put(Stores.refreshGroups, rg);
- },
- );
+ await tx.refreshGroups.put(rg);
+ });
logger.trace("refresh finished (end of reveal)");
ws.notify({
type: NotificationType.RefreshRevealed,
@@ -524,19 +605,23 @@ async function incrementRefreshRetry(
refreshGroupId: string,
err: TalerErrorDetails | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.refreshGroups], async (tx) => {
- const r = await tx.get(Stores.refreshGroups, refreshGroupId);
- if (!r) {
- return;
- }
- if (!r.retryInfo) {
- return;
- }
- r.retryInfo.retryCounter++;
- updateRetryInfoTimeout(r.retryInfo);
- r.lastError = err;
- await tx.put(Stores.refreshGroups, r);
- });
+ await ws.db
+ .mktx((x) => ({
+ refreshGroups: x.refreshGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.refreshGroups.get(refreshGroupId);
+ if (!r) {
+ return;
+ }
+ if (!r.retryInfo) {
+ return;
+ }
+ r.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(r.retryInfo);
+ r.lastError = err;
+ await tx.refreshGroups.put(r);
+ });
if (err) {
ws.notify({ type: NotificationType.RefreshOperationError, error: err });
}
@@ -562,14 +647,19 @@ export async function processRefreshGroup(
async function resetRefreshGroupRetry(
ws: InternalWalletState,
- refreshSessionId: string,
+ refreshGroupId: string,
): Promise<void> {
- await ws.db.mutate(Stores.refreshGroups, refreshSessionId, (x) => {
- if (x.retryInfo.active) {
- x.retryInfo = initRetryInfo();
- }
- return x;
- });
+ await ws.db
+ .mktx((x) => ({
+ refreshGroups: x.refreshGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const x = await tx.refreshGroups.get(refreshGroupId);
+ if (x && x.retryInfo.active) {
+ x.retryInfo = initRetryInfo();
+ await tx.refreshGroups.put(x);
+ }
+ });
}
async function processRefreshGroupImpl(
@@ -580,13 +670,20 @@ async function processRefreshGroupImpl(
if (forceNow) {
await resetRefreshGroupRetry(ws, refreshGroupId);
}
- const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
+ const refreshGroup = await ws.db
+ .mktx((x) => ({
+ refreshGroups: x.refreshGroups,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.refreshGroups.get(refreshGroupId);
+ });
if (!refreshGroup) {
return;
}
if (refreshGroup.timestampFinished) {
return;
}
+ // Process refresh sessions of the group in parallel.
const ps = refreshGroup.oldCoinPubs.map((x, i) =>
processRefreshSession(ws, refreshGroupId, i),
);
@@ -602,7 +699,11 @@ async function processRefreshSession(
logger.trace(
`processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`,
);
- let refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
+ let refreshGroup = await ws.db
+ .mktx((x) => ({ refreshGroups: x.refreshGroups }))
+ .runReadOnly(async (tx) => {
+ return tx.refreshGroups.get(refreshGroupId);
+ });
if (!refreshGroup) {
return;
}
@@ -611,7 +712,11 @@ async function processRefreshSession(
}
if (!refreshGroup.refreshSessionPerCoin[coinIndex]) {
await refreshCreateSession(ws, refreshGroupId, coinIndex);
- refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
+ refreshGroup = await ws.db
+ .mktx((x) => ({ refreshGroups: x.refreshGroups }))
+ .runReadOnly(async (tx) => {
+ return tx.refreshGroups.get(refreshGroupId);
+ });
if (!refreshGroup) {
return;
}
@@ -646,11 +751,11 @@ async function processRefreshSession(
*/
export async function createRefreshGroup(
ws: InternalWalletState,
- tx: TransactionHandle<
- | typeof Stores.denominations
- | typeof Stores.coins
- | typeof Stores.refreshGroups
- >,
+ tx: GetReadWriteAccess<{
+ denominations: typeof WalletStoresV1.denominations;
+ coins: typeof WalletStoresV1.coins;
+ refreshGroups: typeof WalletStoresV1.refreshGroups;
+ }>,
oldCoinPubs: CoinPublicKey[],
reason: RefreshReason,
): Promise<RefreshGroupId> {
@@ -667,8 +772,8 @@ export async function createRefreshGroup(
if (denomsPerExchange[exchangeBaseUrl]) {
return denomsPerExchange[exchangeBaseUrl];
}
- const allDenoms = await tx
- .iterIndexed(Stores.denominations.exchangeBaseUrlIndex, exchangeBaseUrl)
+ const allDenoms = await tx.denominations.indexes.byExchangeBaseUrl
+ .iter(exchangeBaseUrl)
.filter((x) => {
return isWithdrawableDenom(x);
});
@@ -677,9 +782,9 @@ export async function createRefreshGroup(
};
for (const ocp of oldCoinPubs) {
- const coin = await tx.get(Stores.coins, ocp.coinPub);
+ const coin = await tx.coins.get(ocp.coinPub);
checkDbInvariant(!!coin, "coin must be in database");
- const denom = await tx.get(Stores.denominations, [
+ const denom = await tx.denominations.get([
coin.exchangeBaseUrl,
coin.denomPubHash,
]);
@@ -691,7 +796,7 @@ export async function createRefreshGroup(
inputPerCoin.push(refreshAmount);
coin.currentAmount = Amounts.getZero(refreshAmount.currency);
coin.status = CoinStatus.Dormant;
- await tx.put(Stores.coins, coin);
+ await tx.coins.put(coin);
const denoms = await getDenoms(coin.exchangeBaseUrl);
const cost = getTotalRefreshCost(denoms, denom, refreshAmount);
const output = Amounts.sub(refreshAmount, cost).amount;
@@ -718,7 +823,7 @@ export async function createRefreshGroup(
refreshGroup.timestampFinished = getTimestampNow();
}
- await tx.put(Stores.refreshGroups, refreshGroup);
+ await tx.refreshGroups.put(refreshGroup);
logger.trace(`created refresh group ${refreshGroupId}`);
@@ -760,20 +865,20 @@ export async function autoRefresh(
exchangeBaseUrl: string,
): Promise<void> {
await updateExchangeFromUrl(ws, exchangeBaseUrl, true);
- await ws.db.runWithWriteTransaction(
- [
- Stores.coins,
- Stores.denominations,
- Stores.refreshGroups,
- Stores.exchanges,
- ],
- async (tx) => {
- const exchange = await tx.get(Stores.exchanges, exchangeBaseUrl);
+ await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ denominations: x.denominations,
+ refreshGroups: x.refreshGroups,
+ exchanges: x.exchanges,
+ }))
+ .runReadWrite(async (tx) => {
+ const exchange = await tx.exchanges.get(exchangeBaseUrl);
if (!exchange) {
return;
}
- const coins = await tx
- .iterIndexed(Stores.coins.exchangeBaseUrlIndex, exchangeBaseUrl)
+ const coins = await tx.coins.indexes.byBaseUrl
+ .iter(exchangeBaseUrl)
.toArray();
const refreshCoins: CoinPublicKey[] = [];
for (const coin of coins) {
@@ -783,7 +888,7 @@ export async function autoRefresh(
if (coin.suspended) {
continue;
}
- const denom = await tx.get(Stores.denominations, [
+ const denom = await tx.denominations.get([
exchangeBaseUrl,
coin.denomPubHash,
]);
@@ -800,8 +905,8 @@ export async function autoRefresh(
await createRefreshGroup(ws, tx, refreshCoins, RefreshReason.Scheduled);
}
- const denoms = await tx
- .iterIndexed(Stores.denominations.exchangeBaseUrlIndex, exchangeBaseUrl)
+ const denoms = await tx.denominations.indexes.byExchangeBaseUrl
+ .iter(exchangeBaseUrl)
.toArray();
let minCheckThreshold = timestampAddDuration(
getTimestampNow(),
@@ -817,7 +922,6 @@ export async function autoRefresh(
minCheckThreshold = timestampMin(minCheckThreshold, checkThreshold);
}
exchange.nextRefreshCheck = minCheckThreshold;
- await tx.put(Stores.exchanges, exchange);
- },
- );
+ await tx.exchanges.put(exchange);
+ });
}
diff --git a/packages/taler-wallet-core/src/operations/refund.ts b/packages/taler-wallet-core/src/operations/refund.ts
index 2e2ab7803..ba0674f06 100644
--- a/packages/taler-wallet-core/src/operations/refund.ts
+++ b/packages/taler-wallet-core/src/operations/refund.ts
@@ -48,13 +48,21 @@ import {
} from "@gnu-taler/taler-util";
import { Logger } from "@gnu-taler/taler-util";
import { readSuccessResponseJsonOrThrow } from "../util/http";
-import { TransactionHandle } from "../util/query";
import { URL } from "../util/url";
import { updateRetryInfoTimeout, initRetryInfo } from "../util/retries";
import { checkDbInvariant } from "../util/invariants";
import { TalerErrorCode } from "@gnu-taler/taler-util";
-import { Stores, PurchaseRecord, CoinStatus, RefundState, AbortStatus, RefundReason } from "../db.js";
+import {
+ PurchaseRecord,
+ CoinStatus,
+ RefundState,
+ AbortStatus,
+ RefundReason,
+ WalletStoresV1,
+} from "../db.js";
import { getTotalRefreshCost, createRefreshGroup } from "./refresh.js";
+import { GetReadWriteAccess } from "../util/query.js";
+import { Wallet } from "../wallet.js";
const logger = new Logger("refund.ts");
@@ -66,19 +74,23 @@ async function incrementPurchaseQueryRefundRetry(
proposalId: string,
err: TalerErrorDetails | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.purchases], async (tx) => {
- const pr = await tx.get(Stores.purchases, proposalId);
- if (!pr) {
- return;
- }
- if (!pr.refundStatusRetryInfo) {
- return;
- }
- pr.refundStatusRetryInfo.retryCounter++;
- updateRetryInfoTimeout(pr.refundStatusRetryInfo);
- pr.lastRefundStatusError = err;
- await tx.put(Stores.purchases, pr);
- });
+ await ws.db
+ .mktx((x) => ({
+ purchases: x.purchases,
+ }))
+ .runReadWrite(async (tx) => {
+ const pr = await tx.purchases.get(proposalId);
+ if (!pr) {
+ return;
+ }
+ if (!pr.refundStatusRetryInfo) {
+ return;
+ }
+ pr.refundStatusRetryInfo.retryCounter++;
+ updateRetryInfoTimeout(pr.refundStatusRetryInfo);
+ pr.lastRefundStatusError = err;
+ await tx.purchases.put(pr);
+ });
if (err) {
ws.notify({
type: NotificationType.RefundStatusOperationError,
@@ -92,7 +104,10 @@ function getRefundKey(d: MerchantCoinRefundStatus): string {
}
async function applySuccessfulRefund(
- tx: TransactionHandle<typeof Stores.coins | typeof Stores.denominations>,
+ tx: GetReadWriteAccess<{
+ coins: typeof WalletStoresV1.coins;
+ denominations: typeof WalletStoresV1.denominations;
+ }>,
p: PurchaseRecord,
refreshCoinsMap: Record<string, { coinPub: string }>,
r: MerchantCoinRefundSuccessStatus,
@@ -100,12 +115,12 @@ async function applySuccessfulRefund(
// FIXME: check signature before storing it as valid!
const refundKey = getRefundKey(r);
- const coin = await tx.get(Stores.coins, r.coin_pub);
+ const coin = await tx.coins.get(r.coin_pub);
if (!coin) {
logger.warn("coin not found, can't apply refund");
return;
}
- const denom = await tx.get(Stores.denominations, [
+ const denom = await tx.denominations.get([
coin.exchangeBaseUrl,
coin.denomPubHash,
]);
@@ -119,13 +134,10 @@ async function applySuccessfulRefund(
coin.currentAmount = Amounts.add(coin.currentAmount, refundAmount).amount;
coin.currentAmount = Amounts.sub(coin.currentAmount, refundFee).amount;
logger.trace(`coin amount after is ${Amounts.stringify(coin.currentAmount)}`);
- await tx.put(Stores.coins, coin);
+ await tx.coins.put(coin);
- const allDenoms = await tx
- .iterIndexed(
- Stores.denominations.exchangeBaseUrlIndex,
- coin.exchangeBaseUrl,
- )
+ const allDenoms = await tx.denominations.indexes.byExchangeBaseUrl
+ .iter(coin.exchangeBaseUrl)
.toArray();
const amountLeft = Amounts.sub(
@@ -153,18 +165,21 @@ async function applySuccessfulRefund(
}
async function storePendingRefund(
- tx: TransactionHandle<typeof Stores.denominations | typeof Stores.coins>,
+ tx: GetReadWriteAccess<{
+ denominations: typeof WalletStoresV1.denominations;
+ coins: typeof WalletStoresV1.coins;
+ }>,
p: PurchaseRecord,
r: MerchantCoinRefundFailureStatus,
): Promise<void> {
const refundKey = getRefundKey(r);
- const coin = await tx.get(Stores.coins, r.coin_pub);
+ const coin = await tx.coins.get(r.coin_pub);
if (!coin) {
logger.warn("coin not found, can't apply refund");
return;
}
- const denom = await tx.get(Stores.denominations, [
+ const denom = await tx.denominations.get([
coin.exchangeBaseUrl,
coin.denomPubHash,
]);
@@ -173,11 +188,8 @@ async function storePendingRefund(
throw Error("inconsistent database");
}
- const allDenoms = await tx
- .iterIndexed(
- Stores.denominations.exchangeBaseUrlIndex,
- coin.exchangeBaseUrl,
- )
+ const allDenoms = await tx.denominations.indexes.byExchangeBaseUrl
+ .iter(coin.exchangeBaseUrl)
.toArray();
const amountLeft = Amounts.sub(
@@ -205,19 +217,22 @@ async function storePendingRefund(
}
async function storeFailedRefund(
- tx: TransactionHandle<typeof Stores.coins | typeof Stores.denominations>,
+ tx: GetReadWriteAccess<{
+ coins: typeof WalletStoresV1.coins;
+ denominations: typeof WalletStoresV1.denominations;
+ }>,
p: PurchaseRecord,
refreshCoinsMap: Record<string, { coinPub: string }>,
r: MerchantCoinRefundFailureStatus,
): Promise<void> {
const refundKey = getRefundKey(r);
- const coin = await tx.get(Stores.coins, r.coin_pub);
+ const coin = await tx.coins.get(r.coin_pub);
if (!coin) {
logger.warn("coin not found, can't apply refund");
return;
}
- const denom = await tx.get(Stores.denominations, [
+ const denom = await tx.denominations.get([
coin.exchangeBaseUrl,
coin.denomPubHash,
]);
@@ -226,11 +241,8 @@ async function storeFailedRefund(
throw Error("inconsistent database");
}
- const allDenoms = await tx
- .iterIndexed(
- Stores.denominations.exchangeBaseUrlIndex,
- coin.exchangeBaseUrl,
- )
+ const allDenoms = await tx.denominations.indexes.byExchangeBaseUrl
+ .iter(coin.exchangeBaseUrl)
.toArray();
const amountLeft = Amounts.sub(
@@ -260,12 +272,12 @@ async function storeFailedRefund(
// Refund failed because the merchant didn't even try to deposit
// the coin yet, so we try to refresh.
if (r.exchange_code === TalerErrorCode.EXCHANGE_REFUND_DEPOSIT_NOT_FOUND) {
- const coin = await tx.get(Stores.coins, r.coin_pub);
+ const coin = await tx.coins.get(r.coin_pub);
if (!coin) {
logger.warn("coin not found, can't apply refund");
return;
}
- const denom = await tx.get(Stores.denominations, [
+ const denom = await tx.denominations.get([
coin.exchangeBaseUrl,
coin.denomPubHash,
]);
@@ -287,7 +299,7 @@ async function storeFailedRefund(
).amount;
}
refreshCoinsMap[coin.coinPub] = { coinPub: coin.coinPub };
- await tx.put(Stores.coins, coin);
+ await tx.coins.put(coin);
}
}
}
@@ -301,15 +313,15 @@ async function acceptRefunds(
logger.trace("handling refunds", refunds);
const now = getTimestampNow();
- await ws.db.runWithWriteTransaction(
- [
- Stores.purchases,
- Stores.coins,
- Stores.denominations,
- Stores.refreshGroups,
- ],
- async (tx) => {
- const p = await tx.get(Stores.purchases, proposalId);
+ await ws.db
+ .mktx((x) => ({
+ purchases: x.purchases,
+ coins: x.coins,
+ denominations: x.denominations,
+ refreshGroups: x.refreshGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const p = await tx.purchases.get(proposalId);
if (!p) {
logger.error("purchase not found, not adding refunds");
return;
@@ -409,9 +421,8 @@ async function acceptRefunds(
logger.trace("refund query not done");
}
- await tx.put(Stores.purchases, p);
- },
- );
+ await tx.purchases.put(p);
+ });
ws.notify({
type: NotificationType.RefundQueried,
@@ -444,10 +455,16 @@ export async function applyRefund(
throw Error("invalid refund URI");
}
- let purchase = await ws.db.getIndexed(Stores.purchases.orderIdIndex, [
- parseResult.merchantBaseUrl,
- parseResult.orderId,
- ]);
+ let purchase = await ws.db
+ .mktx((x) => ({
+ purchases: x.purchases,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.purchases.indexes.byMerchantUrlAndOrderId.get([
+ parseResult.merchantBaseUrl,
+ parseResult.orderId,
+ ]);
+ });
if (!purchase) {
throw Error(
@@ -458,10 +475,12 @@ export async function applyRefund(
const proposalId = purchase.proposalId;
logger.info("processing purchase for refund");
- const success = await ws.db.runWithWriteTransaction(
- [Stores.purchases],
- async (tx) => {
- const p = await tx.get(Stores.purchases, proposalId);
+ const success = await ws.db
+ .mktx((x) => ({
+ purchases: x.purchases,
+ }))
+ .runReadWrite(async (tx) => {
+ const p = await tx.purchases.get(proposalId);
if (!p) {
logger.error("no purchase found for refund URL");
return false;
@@ -469,10 +488,9 @@ export async function applyRefund(
p.refundQueryRequested = true;
p.lastRefundStatusError = undefined;
p.refundStatusRetryInfo = initRetryInfo();
- await tx.put(Stores.purchases, p);
+ await tx.purchases.put(p);
return true;
- },
- );
+ });
if (success) {
ws.notify({
@@ -481,7 +499,13 @@ export async function applyRefund(
await processPurchaseQueryRefund(ws, proposalId);
}
- purchase = await ws.db.get(Stores.purchases, proposalId);
+ purchase = await ws.db
+ .mktx((x) => ({
+ purchases: x.purchases,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (!purchase) {
throw Error("purchase no longer exists");
@@ -559,12 +583,17 @@ async function resetPurchaseQueryRefundRetry(
ws: InternalWalletState,
proposalId: string,
): Promise<void> {
- await ws.db.mutate(Stores.purchases, proposalId, (x) => {
- if (x.refundStatusRetryInfo.active) {
- x.refundStatusRetryInfo = initRetryInfo();
- }
- return x;
- });
+ await ws.db
+ .mktx((x) => ({
+ purchases: x.purchases,
+ }))
+ .runReadWrite(async (tx) => {
+ const x = await tx.purchases.get(proposalId);
+ if (x && x.refundStatusRetryInfo.active) {
+ x.refundStatusRetryInfo = initRetryInfo();
+ await tx.purchases.put(x);
+ }
+ });
}
async function processPurchaseQueryRefundImpl(
@@ -575,7 +604,13 @@ async function processPurchaseQueryRefundImpl(
if (forceNow) {
await resetPurchaseQueryRefundRetry(ws, proposalId);
}
- const purchase = await ws.db.get(Stores.purchases, proposalId);
+ const purchase = await ws.db
+ .mktx((x) => ({
+ purchases: x.purchases,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.purchases.get(proposalId);
+ });
if (!purchase) {
return;
}
@@ -589,7 +624,6 @@ async function processPurchaseQueryRefundImpl(
`orders/${purchase.download.contractData.orderId}/refund`,
purchase.download.contractData.merchantBaseUrl,
);
-
logger.trace(`making refund request to ${requestUrl.href}`);
@@ -620,18 +654,25 @@ async function processPurchaseQueryRefundImpl(
);
const abortingCoins: AbortingCoin[] = [];
- for (let i = 0; i < purchase.payCoinSelection.coinPubs.length; i++) {
- const coinPub = purchase.payCoinSelection.coinPubs[i];
- const coin = await ws.db.get(Stores.coins, coinPub);
- checkDbInvariant(!!coin, "expected coin to be present");
- abortingCoins.push({
- coin_pub: coinPub,
- contribution: Amounts.stringify(
- purchase.payCoinSelection.coinContributions[i],
- ),
- exchange_url: coin.exchangeBaseUrl,
+
+ await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ }))
+ .runReadOnly(async (tx) => {
+ for (let i = 0; i < purchase.payCoinSelection.coinPubs.length; i++) {
+ const coinPub = purchase.payCoinSelection.coinPubs[i];
+ const coin = await tx.coins.get(coinPub);
+ checkDbInvariant(!!coin, "expected coin to be present");
+ abortingCoins.push({
+ coin_pub: coinPub,
+ contribution: Amounts.stringify(
+ purchase.payCoinSelection.coinContributions[i],
+ ),
+ exchange_url: coin.exchangeBaseUrl,
+ });
+ }
});
- }
const abortReq: AbortRequest = {
h_contract: purchase.download.contractData.contractTermsHash,
@@ -678,26 +719,30 @@ export async function abortFailedPayWithRefund(
ws: InternalWalletState,
proposalId: string,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.purchases], async (tx) => {
- const purchase = await tx.get(Stores.purchases, proposalId);
- if (!purchase) {
- throw Error("purchase not found");
- }
- if (purchase.timestampFirstSuccessfulPay) {
- // No point in aborting it. We don't even report an error.
- logger.warn(`tried to abort successful payment`);
- return;
- }
- if (purchase.abortStatus !== AbortStatus.None) {
- return;
- }
- purchase.refundQueryRequested = true;
- purchase.paymentSubmitPending = false;
- purchase.abortStatus = AbortStatus.AbortRefund;
- purchase.lastPayError = undefined;
- purchase.payRetryInfo = initRetryInfo(false);
- await tx.put(Stores.purchases, purchase);
- });
+ await ws.db
+ .mktx((x) => ({
+ purchases: x.purchases,
+ }))
+ .runReadWrite(async (tx) => {
+ const purchase = await tx.purchases.get(proposalId);
+ if (!purchase) {
+ throw Error("purchase not found");
+ }
+ if (purchase.timestampFirstSuccessfulPay) {
+ // No point in aborting it. We don't even report an error.
+ logger.warn(`tried to abort successful payment`);
+ return;
+ }
+ if (purchase.abortStatus !== AbortStatus.None) {
+ return;
+ }
+ purchase.refundQueryRequested = true;
+ purchase.paymentSubmitPending = false;
+ purchase.abortStatus = AbortStatus.AbortRefund;
+ purchase.lastPayError = undefined;
+ purchase.payRetryInfo = initRetryInfo(false);
+ await tx.purchases.put(purchase);
+ });
processPurchaseQueryRefund(ws, proposalId, true).catch((e) => {
logger.trace(`error during refund processing after abort pay: ${e}`);
});
diff --git a/packages/taler-wallet-core/src/operations/reserves.ts b/packages/taler-wallet-core/src/operations/reserves.ts
index a2482db70..73975fb03 100644
--- a/packages/taler-wallet-core/src/operations/reserves.ts
+++ b/packages/taler-wallet-core/src/operations/reserves.ts
@@ -34,11 +34,11 @@ import {
} from "@gnu-taler/taler-util";
import { randomBytes } from "../crypto/primitives/nacl-fast.js";
import {
- Stores,
ReserveRecordStatus,
ReserveBankInfo,
ReserveRecord,
WithdrawalGroupRecord,
+ WalletStoresV1,
} from "../db.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import { canonicalizeBaseUrl } from "@gnu-taler/taler-util";
@@ -65,9 +65,13 @@ import {
import { getExchangeTrust } from "./currencies.js";
import { encodeCrock, getRandomBytes } from "../crypto/talerCrypto.js";
import { Logger } from "@gnu-taler/taler-util";
-import { readSuccessResponseJsonOrErrorCode, readSuccessResponseJsonOrThrow, throwUnexpectedRequestError } from "../util/http.js";
+import {
+ readSuccessResponseJsonOrErrorCode,
+ readSuccessResponseJsonOrThrow,
+ throwUnexpectedRequestError,
+} from "../util/http.js";
import { URL } from "../util/url.js";
-import { TransactionHandle } from "../util/query.js";
+import { GetReadOnlyAccess } from "../util/query.js";
const logger = new Logger("reserves.ts");
@@ -75,12 +79,17 @@ async function resetReserveRetry(
ws: InternalWalletState,
reservePub: string,
): Promise<void> {
- await ws.db.mutate(Stores.reserves, reservePub, (x) => {
- if (x.retryInfo.active) {
- x.retryInfo = initRetryInfo();
- }
- return x;
- });
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const x = await tx.reserves.get(reservePub);
+ if (x && x.retryInfo.active) {
+ x.retryInfo = initRetryInfo();
+ await tx.reserves.put(x);
+ }
+ });
}
/**
@@ -157,17 +166,20 @@ export async function createReserve(
exchangeInfo.exchange,
);
- const resp = await ws.db.runWithWriteTransaction(
- [Stores.exchangeTrustStore, Stores.reserves, Stores.bankWithdrawUris],
- async (tx) => {
+ const resp = await ws.db
+ .mktx((x) => ({
+ exchangeTrust: x.exchangeTrust,
+ reserves: x.reserves,
+ bankWithdrawUris: x.bankWithdrawUris,
+ }))
+ .runReadWrite(async (tx) => {
// Check if we have already created a reserve for that bankWithdrawStatusUrl
if (reserveRecord.bankInfo?.statusUrl) {
- const bwi = await tx.get(
- Stores.bankWithdrawUris,
+ const bwi = await tx.bankWithdrawUris.get(
reserveRecord.bankInfo.statusUrl,
);
if (bwi) {
- const otherReserve = await tx.get(Stores.reserves, bwi.reservePub);
+ const otherReserve = await tx.reserves.get(bwi.reservePub);
if (otherReserve) {
logger.trace(
"returning existing reserve for bankWithdrawStatusUri",
@@ -178,27 +190,26 @@ export async function createReserve(
};
}
}
- await tx.put(Stores.bankWithdrawUris, {
+ await tx.bankWithdrawUris.put({
reservePub: reserveRecord.reservePub,
talerWithdrawUri: reserveRecord.bankInfo.statusUrl,
});
}
if (!isAudited && !isTrusted) {
- await tx.put(Stores.exchangeTrustStore, {
+ await tx.exchangeTrust.put({
currency: reserveRecord.currency,
exchangeBaseUrl: reserveRecord.exchangeBaseUrl,
exchangeMasterPub: exchangeDetails.masterPublicKey,
uids: [encodeCrock(getRandomBytes(32))],
});
}
- await tx.put(Stores.reserves, reserveRecord);
+ await tx.reserves.put(reserveRecord);
const r: CreateReserveResponse = {
exchange: canonExchange,
reservePub: keypair.pub,
};
return r;
- },
- );
+ });
if (reserveRecord.reservePub === resp.reservePub) {
// Only emit notification when a new reserve was created.
@@ -224,23 +235,27 @@ export async function forceQueryReserve(
ws: InternalWalletState,
reservePub: string,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.reserves], async (tx) => {
- const reserve = await tx.get(Stores.reserves, reservePub);
- if (!reserve) {
- return;
- }
- // Only force status query where it makes sense
- switch (reserve.reserveStatus) {
- case ReserveRecordStatus.DORMANT:
- reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
- break;
- default:
- reserve.requestedQuery = true;
- break;
- }
- reserve.retryInfo = initRetryInfo();
- await tx.put(Stores.reserves, reserve);
- });
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const reserve = await tx.reserves.get(reservePub);
+ if (!reserve) {
+ return;
+ }
+ // Only force status query where it makes sense
+ switch (reserve.reserveStatus) {
+ case ReserveRecordStatus.DORMANT:
+ reserve.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
+ break;
+ default:
+ reserve.requestedQuery = true;
+ break;
+ }
+ reserve.retryInfo = initRetryInfo();
+ await tx.reserves.put(reserve);
+ });
await processReserve(ws, reservePub, true);
}
@@ -270,7 +285,13 @@ async function registerReserveWithBank(
ws: InternalWalletState,
reservePub: string,
): Promise<void> {
- const reserve = await ws.db.get(Stores.reserves, reservePub);
+ const reserve = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadOnly(async (tx) => {
+ return await tx.reserves.get(reservePub);
+ });
switch (reserve?.reserveStatus) {
case ReserveRecordStatus.WAIT_CONFIRM_BANK:
case ReserveRecordStatus.REGISTERING_BANK:
@@ -297,22 +318,30 @@ async function registerReserveWithBank(
httpResp,
codecForBankWithdrawalOperationPostResponse(),
);
- await ws.db.mutate(Stores.reserves, reservePub, (r) => {
- switch (r.reserveStatus) {
- case ReserveRecordStatus.REGISTERING_BANK:
- case ReserveRecordStatus.WAIT_CONFIRM_BANK:
- break;
- default:
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.reserves.get(reservePub);
+ if (!r) {
return;
- }
- r.timestampReserveInfoPosted = getTimestampNow();
- r.reserveStatus = ReserveRecordStatus.WAIT_CONFIRM_BANK;
- if (!r.bankInfo) {
- throw Error("invariant failed");
- }
- r.retryInfo = initRetryInfo();
- return r;
- });
+ }
+ switch (r.reserveStatus) {
+ case ReserveRecordStatus.REGISTERING_BANK:
+ case ReserveRecordStatus.WAIT_CONFIRM_BANK:
+ break;
+ default:
+ return;
+ }
+ r.timestampReserveInfoPosted = getTimestampNow();
+ r.reserveStatus = ReserveRecordStatus.WAIT_CONFIRM_BANK;
+ if (!r.bankInfo) {
+ throw Error("invariant failed");
+ }
+ r.retryInfo = initRetryInfo();
+ await tx.reserves.put(r);
+ });
ws.notify({ type: NotificationType.ReserveRegisteredWithBank });
return processReserveBankStatus(ws, reservePub);
}
@@ -340,7 +369,13 @@ async function processReserveBankStatusImpl(
ws: InternalWalletState,
reservePub: string,
): Promise<void> {
- const reserve = await ws.db.get(Stores.reserves, reservePub);
+ const reserve = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.reserves.get(reservePub);
+ });
switch (reserve?.reserveStatus) {
case ReserveRecordStatus.WAIT_CONFIRM_BANK:
case ReserveRecordStatus.REGISTERING_BANK:
@@ -363,20 +398,28 @@ async function processReserveBankStatusImpl(
if (status.aborted) {
logger.trace("bank aborted the withdrawal");
- await ws.db.mutate(Stores.reserves, reservePub, (r) => {
- switch (r.reserveStatus) {
- case ReserveRecordStatus.REGISTERING_BANK:
- case ReserveRecordStatus.WAIT_CONFIRM_BANK:
- break;
- default:
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.reserves.get(reservePub);
+ if (!r) {
return;
- }
- const now = getTimestampNow();
- r.timestampBankConfirmed = now;
- r.reserveStatus = ReserveRecordStatus.BANK_ABORTED;
- r.retryInfo = initRetryInfo();
- return r;
- });
+ }
+ switch (r.reserveStatus) {
+ case ReserveRecordStatus.REGISTERING_BANK:
+ case ReserveRecordStatus.WAIT_CONFIRM_BANK:
+ break;
+ default:
+ return;
+ }
+ const now = getTimestampNow();
+ r.timestampBankConfirmed = now;
+ r.reserveStatus = ReserveRecordStatus.BANK_ABORTED;
+ r.retryInfo = initRetryInfo();
+ await tx.reserves.put(r);
+ });
return;
}
@@ -390,37 +433,40 @@ async function processReserveBankStatusImpl(
return await processReserveBankStatus(ws, reservePub);
}
- if (status.transfer_done) {
- await ws.db.mutate(Stores.reserves, reservePub, (r) => {
- switch (r.reserveStatus) {
- case ReserveRecordStatus.REGISTERING_BANK:
- case ReserveRecordStatus.WAIT_CONFIRM_BANK:
- break;
- default:
- return;
- }
- const now = getTimestampNow();
- r.timestampBankConfirmed = now;
- r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
- r.retryInfo = initRetryInfo();
- return r;
- });
- await processReserveImpl(ws, reservePub, true);
- } else {
- await ws.db.mutate(Stores.reserves, reservePub, (r) => {
- switch (r.reserveStatus) {
- case ReserveRecordStatus.WAIT_CONFIRM_BANK:
- break;
- default:
- return;
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.reserves.get(reservePub);
+ if (!r) {
+ return;
}
- if (r.bankInfo) {
- r.bankInfo.confirmUrl = status.confirm_transfer_url;
+ if (status.transfer_done) {
+ switch (r.reserveStatus) {
+ case ReserveRecordStatus.REGISTERING_BANK:
+ case ReserveRecordStatus.WAIT_CONFIRM_BANK:
+ break;
+ default:
+ return;
+ }
+ const now = getTimestampNow();
+ r.timestampBankConfirmed = now;
+ r.reserveStatus = ReserveRecordStatus.QUERYING_STATUS;
+ r.retryInfo = initRetryInfo();
+ } else {
+ switch (r.reserveStatus) {
+ case ReserveRecordStatus.WAIT_CONFIRM_BANK:
+ break;
+ default:
+ return;
+ }
+ if (r.bankInfo) {
+ r.bankInfo.confirmUrl = status.confirm_transfer_url;
+ }
}
- return r;
+ await tx.reserves.put(r);
});
- await incrementReserveRetry(ws, reservePub, undefined);
- }
}
async function incrementReserveRetry(
@@ -428,19 +474,23 @@ async function incrementReserveRetry(
reservePub: string,
err: TalerErrorDetails | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.reserves], async (tx) => {
- const r = await tx.get(Stores.reserves, reservePub);
- if (!r) {
- return;
- }
- if (!r.retryInfo) {
- return;
- }
- r.retryInfo.retryCounter++;
- updateRetryInfoTimeout(r.retryInfo);
- r.lastError = err;
- await tx.put(Stores.reserves, r);
- });
+ await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const r = await tx.reserves.get(reservePub);
+ if (!r) {
+ return;
+ }
+ if (!r.retryInfo) {
+ return;
+ }
+ r.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(r.retryInfo);
+ r.lastError = err;
+ await tx.reserves.put(r);
+ });
if (err) {
ws.notify({
type: NotificationType.ReserveOperationError,
@@ -461,7 +511,13 @@ async function updateReserve(
ws: InternalWalletState,
reservePub: string,
): Promise<{ ready: boolean }> {
- const reserve = await ws.db.get(Stores.reserves, reservePub);
+ const reserve = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.reserves.get(reservePub);
+ });
if (!reserve) {
throw Error("reserve not in db");
}
@@ -508,10 +564,15 @@ async function updateReserve(
reserve.exchangeBaseUrl,
);
- const newWithdrawalGroup = await ws.db.runWithWriteTransaction(
- [Stores.coins, Stores.planchets, Stores.withdrawalGroups, Stores.reserves],
- async (tx) => {
- const newReserve = await tx.get(Stores.reserves, reserve.reservePub);
+ const newWithdrawalGroup = await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ planchets: x.planchets,
+ withdrawalGroups: x.withdrawalGroups,
+ reserves: x.reserves,
+ }))
+ .runReadWrite(async (tx) => {
+ const newReserve = await tx.reserves.get(reserve.reservePub);
if (!newReserve) {
return;
}
@@ -519,8 +580,8 @@ async function updateReserve(
let amountReserveMinus = Amounts.getZero(currency);
// Subtract withdrawal groups for this reserve from the available amount.
- await tx
- .iterIndexed(Stores.withdrawalGroups.byReservePub, reservePub)
+ await tx.withdrawalGroups.indexes.byReservePub
+ .iter(reservePub)
.forEach((wg) => {
const cost = wg.denomsSel.totalWithdrawCost;
amountReserveMinus = Amounts.add(amountReserveMinus, cost).amount;
@@ -549,16 +610,14 @@ async function updateReserve(
case ReserveTransactionType.Withdraw: {
// Now we check if the withdrawal transaction
// is part of any withdrawal known to this wallet.
- const planchet = await tx.getIndexed(
- Stores.planchets.coinEvHashIndex,
+ const planchet = await tx.planchets.indexes.byCoinEvHash.get(
entry.h_coin_envelope,
);
if (planchet) {
// Amount is already accounted in some withdrawal session
break;
}
- const coin = await tx.getIndexed(
- Stores.coins.coinEvHashIndex,
+ const coin = await tx.coins.indexes.byCoinEvHash.get(
entry.h_coin_envelope,
);
if (coin) {
@@ -594,7 +653,7 @@ async function updateReserve(
newReserve.reserveStatus = ReserveRecordStatus.DORMANT;
newReserve.lastError = undefined;
newReserve.retryInfo = initRetryInfo(false);
- await tx.put(Stores.reserves, newReserve);
+ await tx.reserves.put(newReserve);
return;
}
@@ -624,11 +683,10 @@ async function updateReserve(
newReserve.retryInfo = initRetryInfo(false);
newReserve.reserveStatus = ReserveRecordStatus.DORMANT;
- await tx.put(Stores.reserves, newReserve);
- await tx.put(Stores.withdrawalGroups, withdrawalRecord);
+ await tx.reserves.put(newReserve);
+ await tx.withdrawalGroups.put(withdrawalRecord);
return withdrawalRecord;
- },
- );
+ });
if (newWithdrawalGroup) {
logger.trace("processing new withdraw group");
@@ -647,7 +705,13 @@ async function processReserveImpl(
reservePub: string,
forceNow = false,
): Promise<void> {
- const reserve = await ws.db.get(Stores.reserves, reservePub);
+ const reserve = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.reserves.get(reservePub);
+ });
if (!reserve) {
logger.trace("not processing reserve: reserve does not exist");
return;
@@ -712,7 +776,13 @@ export async function createTalerWithdrawReserve(
// We do this here, as the reserve should be registered before we return,
// so that we can redirect the user to the bank's status page.
await processReserveBankStatus(ws, reserve.reservePub);
- const processedReserve = await ws.db.get(Stores.reserves, reserve.reservePub);
+ const processedReserve = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.reserves.get(reserve.reservePub);
+ });
if (processedReserve?.reserveStatus === ReserveRecordStatus.BANK_ABORTED) {
throw OperationFailedError.fromCode(
TalerErrorCode.WALLET_WITHDRAWAL_OPERATION_ABORTED_BY_BANK,
@@ -730,14 +800,14 @@ export async function createTalerWithdrawReserve(
* Get payto URIs needed to fund a reserve.
*/
export async function getFundingPaytoUris(
- tx: TransactionHandle<
- | typeof Stores.reserves
- | typeof Stores.exchanges
- | typeof Stores.exchangeDetails
- >,
+ tx: GetReadOnlyAccess<{
+ reserves: typeof WalletStoresV1.reserves;
+ exchanges: typeof WalletStoresV1.exchanges;
+ exchangeDetails: typeof WalletStoresV1.exchangeDetails;
+ }>,
reservePub: string,
): Promise<string[]> {
- const r = await tx.get(Stores.reserves, reservePub);
+ const r = await tx.reserves.get(reservePub);
if (!r) {
logger.error(`reserve ${reservePub} not found (DB corrupted?)`);
return [];
diff --git a/packages/taler-wallet-core/src/operations/state.ts b/packages/taler-wallet-core/src/operations/state.ts
index 0d07f293d..9bf73142c 100644
--- a/packages/taler-wallet-core/src/operations/state.ts
+++ b/packages/taler-wallet-core/src/operations/state.ts
@@ -17,12 +17,22 @@
/**
* Imports.
*/
-import { WalletNotification, BalancesResponse, Logger } from "@gnu-taler/taler-util";
-import { Stores } from "../db.js";
-import { CryptoApi, OpenedPromise, Database, CryptoWorkerFactory, openPromise } from "../index.js";
+import {
+ WalletNotification,
+ BalancesResponse,
+ Logger,
+} from "@gnu-taler/taler-util";
+import { WalletStoresV1 } from "../db.js";
+import {
+ CryptoApi,
+ OpenedPromise,
+ CryptoWorkerFactory,
+ openPromise,
+} from "../index.js";
import { PendingOperationsResponse } from "../pending-types.js";
import { AsyncOpMemoMap, AsyncOpMemoSingle } from "../util/asyncMemo.js";
import { HttpRequestLibrary } from "../util/http";
+import { DbAccess } from "../util/query.js";
type NotificationListener = (n: WalletNotification) => void;
@@ -34,9 +44,7 @@ export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock";
export class InternalWalletState {
memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
- memoGetPending: AsyncOpMemoSingle<
- PendingOperationsResponse
- > = new AsyncOpMemoSingle();
+ memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse> = new AsyncOpMemoSingle();
memoGetBalance: AsyncOpMemoSingle<BalancesResponse> = new AsyncOpMemoSingle();
memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoProcessRecoup: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
@@ -60,7 +68,7 @@ export class InternalWalletState {
// the actual value nullable.
// Check if we are in a DB migration / garbage collection
// and throw an error in that case.
- public db: Database<typeof Stores>,
+ public db: DbAccess<typeof WalletStoresV1>,
public http: HttpRequestLibrary,
cryptoWorkerFactory: CryptoWorkerFactory,
) {
diff --git a/packages/taler-wallet-core/src/operations/tip.ts b/packages/taler-wallet-core/src/operations/tip.ts
index f9d7a024d..e9659248d 100644
--- a/packages/taler-wallet-core/src/operations/tip.ts
+++ b/packages/taler-wallet-core/src/operations/tip.ts
@@ -32,7 +32,6 @@ import {
} from "@gnu-taler/taler-util";
import { DerivedTipPlanchet } from "../crypto/cryptoTypes.js";
import {
- Stores,
DenominationRecord,
CoinRecord,
CoinSourceType,
@@ -70,10 +69,16 @@ export async function prepareTip(
throw Error("invalid taler://tip URI");
}
- let tipRecord = await ws.db.getIndexed(
- Stores.tips.byMerchantTipIdAndBaseUrl,
- [res.merchantTipId, res.merchantBaseUrl],
- );
+ let tipRecord = await ws.db
+ .mktx((x) => ({
+ tips: x.tips,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.tips.indexes.byMerchantTipIdAndBaseUrl.get([
+ res.merchantTipId,
+ res.merchantBaseUrl,
+ ]);
+ });
if (!tipRecord) {
const tipStatusUrl = new URL(
@@ -109,7 +114,7 @@ export async function prepareTip(
const secretSeed = encodeCrock(getRandomBytes(64));
const denomSelUid = encodeCrock(getRandomBytes(32));
- tipRecord = {
+ const newTipRecord = {
walletTipId: walletTipId,
acceptedTimestamp: undefined,
tipAmountRaw: amount,
@@ -130,7 +135,14 @@ export async function prepareTip(
secretSeed,
denomSelUid,
};
- await ws.db.put(Stores.tips, tipRecord);
+ await ws.db
+ .mktx((x) => ({
+ tips: x.tips,
+ }))
+ .runReadWrite(async (tx) => {
+ await tx.tips.put(newTipRecord);
+ });
+ tipRecord = newTipRecord;
}
const tipStatus: PrepareTipResult = {
@@ -151,19 +163,23 @@ async function incrementTipRetry(
walletTipId: string,
err: TalerErrorDetails | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.tips], async (tx) => {
- const t = await tx.get(Stores.tips, walletTipId);
- if (!t) {
- return;
- }
- if (!t.retryInfo) {
- return;
- }
- t.retryInfo.retryCounter++;
- updateRetryInfoTimeout(t.retryInfo);
- t.lastError = err;
- await tx.put(Stores.tips, t);
- });
+ await ws.db
+ .mktx((x) => ({
+ tips: x.tips,
+ }))
+ .runReadWrite(async (tx) => {
+ const t = await tx.tips.get(walletTipId);
+ if (!t) {
+ return;
+ }
+ if (!t.retryInfo) {
+ return;
+ }
+ t.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(t.retryInfo);
+ t.lastError = err;
+ await tx.tips.put(t);
+ });
if (err) {
ws.notify({ type: NotificationType.TipOperationError, error: err });
}
@@ -186,12 +202,17 @@ async function resetTipRetry(
ws: InternalWalletState,
tipId: string,
): Promise<void> {
- await ws.db.mutate(Stores.tips, tipId, (x) => {
- if (x.retryInfo.active) {
- x.retryInfo = initRetryInfo();
- }
- return x;
- });
+ await ws.db
+ .mktx((x) => ({
+ tips: x.tips,
+ }))
+ .runReadWrite(async (tx) => {
+ const x = await tx.tips.get(tipId);
+ if (x && x.retryInfo.active) {
+ x.retryInfo = initRetryInfo();
+ await tx.tips.put(x);
+ }
+ });
}
async function processTipImpl(
@@ -202,7 +223,13 @@ async function processTipImpl(
if (forceNow) {
await resetTipRetry(ws, walletTipId);
}
- let tipRecord = await ws.db.get(Stores.tips, walletTipId);
+ const tipRecord = await ws.db
+ .mktx((x) => ({
+ tips: x.tips,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.tips.get(walletTipId);
+ });
if (!tipRecord) {
return;
}
@@ -214,19 +241,22 @@ async function processTipImpl(
const denomsForWithdraw = tipRecord.denomsSel;
- tipRecord = await ws.db.get(Stores.tips, walletTipId);
- checkDbInvariant(!!tipRecord, "tip record should be in database");
-
const planchets: DerivedTipPlanchet[] = [];
// Planchets in the form that the merchant expects
const planchetsDetail: TipPlanchetDetail[] = [];
const denomForPlanchet: { [index: number]: DenominationRecord } = [];
for (const dh of denomsForWithdraw.selectedDenoms) {
- const denom = await ws.db.get(Stores.denominations, [
- tipRecord.exchangeBaseUrl,
- dh.denomPubHash,
- ]);
+ const denom = await ws.db
+ .mktx((x) => ({
+ denominations: x.denominations,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.denominations.get([
+ tipRecord.exchangeBaseUrl,
+ dh.denomPubHash,
+ ]);
+ });
checkDbInvariant(!!denom, "denomination should be in database");
for (let i = 0; i < dh.count; i++) {
const deriveReq = {
@@ -306,18 +336,20 @@ async function processTipImpl(
);
if (!isValid) {
- await ws.db.runWithWriteTransaction([Stores.tips], async (tx) => {
- const tipRecord = await tx.get(Stores.tips, walletTipId);
- if (!tipRecord) {
- return;
- }
- tipRecord.lastError = makeErrorDetails(
- TalerErrorCode.WALLET_TIPPING_COIN_SIGNATURE_INVALID,
- "invalid signature from the exchange (via merchant tip) after unblinding",
- {},
- );
- await tx.put(Stores.tips, tipRecord);
- });
+ await ws.db
+ .mktx((x) => ({ tips: x.tips }))
+ .runReadWrite(async (tx) => {
+ const tipRecord = await tx.tips.get(walletTipId);
+ if (!tipRecord) {
+ return;
+ }
+ tipRecord.lastError = makeErrorDetails(
+ TalerErrorCode.WALLET_TIPPING_COIN_SIGNATURE_INVALID,
+ "invalid signature from the exchange (via merchant tip) after unblinding",
+ {},
+ );
+ await tx.tips.put(tipRecord);
+ });
return;
}
@@ -341,10 +373,14 @@ async function processTipImpl(
});
}
- await ws.db.runWithWriteTransaction(
- [Stores.coins, Stores.tips, Stores.withdrawalGroups],
- async (tx) => {
- const tr = await tx.get(Stores.tips, walletTipId);
+ await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ tips: x.tips,
+ withdrawalGroups: x.withdrawalGroups,
+ }))
+ .runReadWrite(async (tx) => {
+ const tr = await tx.tips.get(walletTipId);
if (!tr) {
return;
}
@@ -354,27 +390,32 @@ async function processTipImpl(
tr.pickedUpTimestamp = getTimestampNow();
tr.lastError = undefined;
tr.retryInfo = initRetryInfo(false);
- await tx.put(Stores.tips, tr);
+ await tx.tips.put(tr);
for (const cr of newCoinRecords) {
- await tx.put(Stores.coins, cr);
+ await tx.coins.put(cr);
}
- },
- );
+ });
}
export async function acceptTip(
ws: InternalWalletState,
tipId: string,
): Promise<void> {
- const tipRecord = await ws.db.get(Stores.tips, tipId);
- if (!tipRecord) {
- logger.error("tip not found");
- return;
+ const found = await ws.db
+ .mktx((x) => ({
+ tips: x.tips,
+ }))
+ .runReadWrite(async (tx) => {
+ const tipRecord = await tx.tips.get(tipId);
+ if (!tipRecord) {
+ logger.error("tip not found");
+ return false;
+ }
+ tipRecord.acceptedTimestamp = getTimestampNow();
+ await tx.tips.put(tipRecord);
+ return true;
+ });
+ if (found) {
+ await processTip(ws, tipId);
}
-
- tipRecord.acceptedTimestamp = getTimestampNow();
- await ws.db.put(Stores.tips, tipRecord);
-
- await processTip(ws, tipId);
- return;
}
diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts
index 42ed2d2ec..ecef3c2ce 100644
--- a/packages/taler-wallet-core/src/operations/transactions.ts
+++ b/packages/taler-wallet-core/src/operations/transactions.ts
@@ -19,7 +19,6 @@
*/
import { InternalWalletState } from "./state";
import {
- Stores,
WalletRefundItem,
RefundState,
ReserveRecordStatus,
@@ -85,296 +84,300 @@ export async function getTransactions(
): Promise<TransactionsResponse> {
const transactions: Transaction[] = [];
- await ws.db.runWithReadTransaction(
- [
- Stores.coins,
- Stores.denominations,
- Stores.exchanges,
- Stores.exchangeDetails,
- Stores.proposals,
- Stores.purchases,
- Stores.refreshGroups,
- Stores.reserves,
- Stores.tips,
- Stores.withdrawalGroups,
- Stores.planchets,
- Stores.recoupGroups,
- Stores.depositGroups,
- Stores.tombstones,
- ],
- // Report withdrawals that are currently in progress.
- async (tx) => {
- tx.iter(Stores.withdrawalGroups).forEachAsync(async (wsr) => {
- if (
- shouldSkipCurrency(
- transactionsRequest,
- wsr.rawWithdrawalAmount.currency,
- )
- ) {
- return;
- }
+ await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ denominations: x.denominations,
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ proposals: x.proposals,
+ purchases: x.purchases,
+ refreshGroups: x.refreshGroups,
+ reserves: x.reserves,
+ tips: x.tips,
+ withdrawalGroups: x.withdrawalGroups,
+ planchets: x.planchets,
+ recoupGroups: x.recoupGroups,
+ depositGroups: x.depositGroups,
+ tombstones: x.tombstones,
+ }))
+ .runReadOnly(
+ // Report withdrawals that are currently in progress.
+ async (tx) => {
+ tx.withdrawalGroups.iter().forEachAsync(async (wsr) => {
+ if (
+ shouldSkipCurrency(
+ transactionsRequest,
+ wsr.rawWithdrawalAmount.currency,
+ )
+ ) {
+ return;
+ }
- if (shouldSkipSearch(transactionsRequest, [])) {
- return;
- }
+ if (shouldSkipSearch(transactionsRequest, [])) {
+ return;
+ }
- const r = await tx.get(Stores.reserves, wsr.reservePub);
- if (!r) {
- return;
- }
- let amountRaw: AmountJson | undefined = undefined;
- if (wsr.withdrawalGroupId === r.initialWithdrawalGroupId) {
- amountRaw = r.instructedAmount;
- } else {
- amountRaw = wsr.denomsSel.totalWithdrawCost;
- }
- let withdrawalDetails: WithdrawalDetails;
- if (r.bankInfo) {
- withdrawalDetails = {
- type: WithdrawalType.TalerBankIntegrationApi,
- confirmed: true,
- bankConfirmationUrl: r.bankInfo.confirmUrl,
- };
- } else {
- const exchangeDetails = await getExchangeDetails(
- tx,
- wsr.exchangeBaseUrl,
- );
- if (!exchangeDetails) {
- // FIXME: report somehow
+ const r = await tx.reserves.get(wsr.reservePub);
+ if (!r) {
return;
}
- withdrawalDetails = {
- type: WithdrawalType.ManualTransfer,
- exchangePaytoUris:
- exchangeDetails.wireInfo?.accounts.map((x) => x.payto_uri) ?? [],
- };
- }
- transactions.push({
- type: TransactionType.Withdrawal,
- amountEffective: Amounts.stringify(wsr.denomsSel.totalCoinValue),
- amountRaw: Amounts.stringify(amountRaw),
- withdrawalDetails,
- exchangeBaseUrl: wsr.exchangeBaseUrl,
- pending: !wsr.timestampFinish,
- timestamp: wsr.timestampStart,
- transactionId: makeEventId(
- TransactionType.Withdrawal,
- wsr.withdrawalGroupId,
- ),
- ...(wsr.lastError ? { error: wsr.lastError } : {}),
+ let amountRaw: AmountJson | undefined = undefined;
+ if (wsr.withdrawalGroupId === r.initialWithdrawalGroupId) {
+ amountRaw = r.instructedAmount;
+ } else {
+ amountRaw = wsr.denomsSel.totalWithdrawCost;
+ }
+ let withdrawalDetails: WithdrawalDetails;
+ if (r.bankInfo) {
+ withdrawalDetails = {
+ type: WithdrawalType.TalerBankIntegrationApi,
+ confirmed: true,
+ bankConfirmationUrl: r.bankInfo.confirmUrl,
+ };
+ } else {
+ const exchangeDetails = await getExchangeDetails(
+ tx,
+ wsr.exchangeBaseUrl,
+ );
+ if (!exchangeDetails) {
+ // FIXME: report somehow
+ return;
+ }
+ withdrawalDetails = {
+ type: WithdrawalType.ManualTransfer,
+ exchangePaytoUris:
+ exchangeDetails.wireInfo?.accounts.map((x) => x.payto_uri) ??
+ [],
+ };
+ }
+ transactions.push({
+ type: TransactionType.Withdrawal,
+ amountEffective: Amounts.stringify(wsr.denomsSel.totalCoinValue),
+ amountRaw: Amounts.stringify(amountRaw),
+ withdrawalDetails,
+ exchangeBaseUrl: wsr.exchangeBaseUrl,
+ pending: !wsr.timestampFinish,
+ timestamp: wsr.timestampStart,
+ transactionId: makeEventId(
+ TransactionType.Withdrawal,
+ wsr.withdrawalGroupId,
+ ),
+ ...(wsr.lastError ? { error: wsr.lastError } : {}),
+ });
});
- });
- // Report pending withdrawals based on reserves that
- // were created, but where the actual withdrawal group has
- // not started yet.
- tx.iter(Stores.reserves).forEachAsync(async (r) => {
- if (shouldSkipCurrency(transactionsRequest, r.currency)) {
- return;
- }
- if (shouldSkipSearch(transactionsRequest, [])) {
- return;
- }
- if (r.initialWithdrawalStarted) {
- return;
- }
- if (r.reserveStatus === ReserveRecordStatus.BANK_ABORTED) {
- return;
- }
- let withdrawalDetails: WithdrawalDetails;
- if (r.bankInfo) {
- withdrawalDetails = {
- type: WithdrawalType.TalerBankIntegrationApi,
- confirmed: false,
- bankConfirmationUrl: r.bankInfo.confirmUrl,
- };
- } else {
- withdrawalDetails = {
- type: WithdrawalType.ManualTransfer,
- exchangePaytoUris: await getFundingPaytoUris(tx, r.reservePub),
- };
- }
- transactions.push({
- type: TransactionType.Withdrawal,
- amountRaw: Amounts.stringify(r.instructedAmount),
- amountEffective: Amounts.stringify(r.initialDenomSel.totalCoinValue),
- exchangeBaseUrl: r.exchangeBaseUrl,
- pending: true,
- timestamp: r.timestampCreated,
- withdrawalDetails: withdrawalDetails,
- transactionId: makeEventId(
- TransactionType.Withdrawal,
- r.initialWithdrawalGroupId,
- ),
- ...(r.lastError ? { error: r.lastError } : {}),
+ // Report pending withdrawals based on reserves that
+ // were created, but where the actual withdrawal group has
+ // not started yet.
+ tx.reserves.iter().forEachAsync(async (r) => {
+ if (shouldSkipCurrency(transactionsRequest, r.currency)) {
+ return;
+ }
+ if (shouldSkipSearch(transactionsRequest, [])) {
+ return;
+ }
+ if (r.initialWithdrawalStarted) {
+ return;
+ }
+ if (r.reserveStatus === ReserveRecordStatus.BANK_ABORTED) {
+ return;
+ }
+ let withdrawalDetails: WithdrawalDetails;
+ if (r.bankInfo) {
+ withdrawalDetails = {
+ type: WithdrawalType.TalerBankIntegrationApi,
+ confirmed: false,
+ bankConfirmationUrl: r.bankInfo.confirmUrl,
+ };
+ } else {
+ withdrawalDetails = {
+ type: WithdrawalType.ManualTransfer,
+ exchangePaytoUris: await getFundingPaytoUris(tx, r.reservePub),
+ };
+ }
+ transactions.push({
+ type: TransactionType.Withdrawal,
+ amountRaw: Amounts.stringify(r.instructedAmount),
+ amountEffective: Amounts.stringify(
+ r.initialDenomSel.totalCoinValue,
+ ),
+ exchangeBaseUrl: r.exchangeBaseUrl,
+ pending: true,
+ timestamp: r.timestampCreated,
+ withdrawalDetails: withdrawalDetails,
+ transactionId: makeEventId(
+ TransactionType.Withdrawal,
+ r.initialWithdrawalGroupId,
+ ),
+ ...(r.lastError ? { error: r.lastError } : {}),
+ });
});
- });
- tx.iter(Stores.depositGroups).forEachAsync(async (dg) => {
- const amount = Amounts.parseOrThrow(dg.contractTermsRaw.amount);
- if (shouldSkipCurrency(transactionsRequest, amount.currency)) {
- return;
- }
-
- transactions.push({
- type: TransactionType.Deposit,
- amountRaw: Amounts.stringify(dg.effectiveDepositAmount),
- amountEffective: Amounts.stringify(dg.totalPayCost),
- pending: !dg.timestampFinished,
- timestamp: dg.timestampCreated,
- targetPaytoUri: dg.wire.payto_uri,
- transactionId: makeEventId(
- TransactionType.Deposit,
- dg.depositGroupId,
- ),
- depositGroupId: dg.depositGroupId,
- ...(dg.lastError ? { error: dg.lastError } : {}),
- });
- });
+ tx.depositGroups.iter().forEachAsync(async (dg) => {
+ const amount = Amounts.parseOrThrow(dg.contractTermsRaw.amount);
+ if (shouldSkipCurrency(transactionsRequest, amount.currency)) {
+ return;
+ }
- tx.iter(Stores.purchases).forEachAsync(async (pr) => {
- if (
- shouldSkipCurrency(
- transactionsRequest,
- pr.download.contractData.amount.currency,
- )
- ) {
- return;
- }
- const contractData = pr.download.contractData;
- if (shouldSkipSearch(transactionsRequest, [contractData.summary])) {
- return;
- }
- const proposal = await tx.get(Stores.proposals, pr.proposalId);
- if (!proposal) {
- return;
- }
- const info: OrderShortInfo = {
- merchant: contractData.merchant,
- orderId: contractData.orderId,
- products: contractData.products,
- summary: contractData.summary,
- summary_i18n: contractData.summaryI18n,
- contractTermsHash: contractData.contractTermsHash,
- };
- if (contractData.fulfillmentUrl !== "") {
- info.fulfillmentUrl = contractData.fulfillmentUrl;
- }
- const paymentTransactionId = makeEventId(
- TransactionType.Payment,
- pr.proposalId,
- );
- const err = pr.lastPayError ?? pr.lastRefundStatusError;
- transactions.push({
- type: TransactionType.Payment,
- amountRaw: Amounts.stringify(contractData.amount),
- amountEffective: Amounts.stringify(pr.totalPayCost),
- status: pr.timestampFirstSuccessfulPay
- ? PaymentStatus.Paid
- : PaymentStatus.Accepted,
- pending:
- !pr.timestampFirstSuccessfulPay &&
- pr.abortStatus === AbortStatus.None,
- timestamp: pr.timestampAccept,
- transactionId: paymentTransactionId,
- proposalId: pr.proposalId,
- info: info,
- ...(err ? { error: err } : {}),
+ transactions.push({
+ type: TransactionType.Deposit,
+ amountRaw: Amounts.stringify(dg.effectiveDepositAmount),
+ amountEffective: Amounts.stringify(dg.totalPayCost),
+ pending: !dg.timestampFinished,
+ timestamp: dg.timestampCreated,
+ targetPaytoUri: dg.wire.payto_uri,
+ transactionId: makeEventId(
+ TransactionType.Deposit,
+ dg.depositGroupId,
+ ),
+ depositGroupId: dg.depositGroupId,
+ ...(dg.lastError ? { error: dg.lastError } : {}),
+ });
});
- const refundGroupKeys = new Set<string>();
-
- for (const rk of Object.keys(pr.refunds)) {
- const refund = pr.refunds[rk];
- const groupKey = `${refund.executionTime.t_ms}`;
- refundGroupKeys.add(groupKey);
- }
-
- for (const groupKey of refundGroupKeys.values()) {
- const refundTombstoneId = makeEventId(
- TombstoneTag.DeleteRefund,
- pr.proposalId,
- groupKey,
- );
- const tombstone = await tx.get(Stores.tombstones, refundTombstoneId);
- if (tombstone) {
- continue;
+ tx.purchases.iter().forEachAsync(async (pr) => {
+ if (
+ shouldSkipCurrency(
+ transactionsRequest,
+ pr.download.contractData.amount.currency,
+ )
+ ) {
+ return;
}
- const refundTransactionId = makeEventId(
- TransactionType.Refund,
+ const contractData = pr.download.contractData;
+ if (shouldSkipSearch(transactionsRequest, [contractData.summary])) {
+ return;
+ }
+ const proposal = await tx.proposals.get(pr.proposalId);
+ if (!proposal) {
+ return;
+ }
+ const info: OrderShortInfo = {
+ merchant: contractData.merchant,
+ orderId: contractData.orderId,
+ products: contractData.products,
+ summary: contractData.summary,
+ summary_i18n: contractData.summaryI18n,
+ contractTermsHash: contractData.contractTermsHash,
+ };
+ if (contractData.fulfillmentUrl !== "") {
+ info.fulfillmentUrl = contractData.fulfillmentUrl;
+ }
+ const paymentTransactionId = makeEventId(
+ TransactionType.Payment,
pr.proposalId,
- groupKey,
);
- let r0: WalletRefundItem | undefined;
- let amountRaw = Amounts.getZero(contractData.amount.currency);
- let amountEffective = Amounts.getZero(contractData.amount.currency);
+ const err = pr.lastPayError ?? pr.lastRefundStatusError;
+ transactions.push({
+ type: TransactionType.Payment,
+ amountRaw: Amounts.stringify(contractData.amount),
+ amountEffective: Amounts.stringify(pr.totalPayCost),
+ status: pr.timestampFirstSuccessfulPay
+ ? PaymentStatus.Paid
+ : PaymentStatus.Accepted,
+ pending:
+ !pr.timestampFirstSuccessfulPay &&
+ pr.abortStatus === AbortStatus.None,
+ timestamp: pr.timestampAccept,
+ transactionId: paymentTransactionId,
+ proposalId: pr.proposalId,
+ info: info,
+ ...(err ? { error: err } : {}),
+ });
+
+ const refundGroupKeys = new Set<string>();
+
for (const rk of Object.keys(pr.refunds)) {
const refund = pr.refunds[rk];
- const myGroupKey = `${refund.executionTime.t_ms}`;
- if (myGroupKey !== groupKey) {
+ const groupKey = `${refund.executionTime.t_ms}`;
+ refundGroupKeys.add(groupKey);
+ }
+
+ for (const groupKey of refundGroupKeys.values()) {
+ const refundTombstoneId = makeEventId(
+ TombstoneTag.DeleteRefund,
+ pr.proposalId,
+ groupKey,
+ );
+ const tombstone = await tx.tombstones.get(refundTombstoneId);
+ if (tombstone) {
continue;
}
+ const refundTransactionId = makeEventId(
+ TransactionType.Refund,
+ pr.proposalId,
+ groupKey,
+ );
+ let r0: WalletRefundItem | undefined;
+ let amountRaw = Amounts.getZero(contractData.amount.currency);
+ let amountEffective = Amounts.getZero(contractData.amount.currency);
+ for (const rk of Object.keys(pr.refunds)) {
+ const refund = pr.refunds[rk];
+ const myGroupKey = `${refund.executionTime.t_ms}`;
+ if (myGroupKey !== groupKey) {
+ continue;
+ }
+ if (!r0) {
+ r0 = refund;
+ }
+
+ if (refund.type === RefundState.Applied) {
+ amountRaw = Amounts.add(amountRaw, refund.refundAmount).amount;
+ amountEffective = Amounts.add(
+ amountEffective,
+ Amounts.sub(
+ refund.refundAmount,
+ refund.refundFee,
+ refund.totalRefreshCostBound,
+ ).amount,
+ ).amount;
+ }
+ }
if (!r0) {
- r0 = refund;
+ throw Error("invariant violated");
}
+ transactions.push({
+ type: TransactionType.Refund,
+ info,
+ refundedTransactionId: paymentTransactionId,
+ transactionId: refundTransactionId,
+ timestamp: r0.obtainedTime,
+ amountEffective: Amounts.stringify(amountEffective),
+ amountRaw: Amounts.stringify(amountRaw),
+ pending: false,
+ });
+ }
+ });
- if (refund.type === RefundState.Applied) {
- amountRaw = Amounts.add(amountRaw, refund.refundAmount).amount;
- amountEffective = Amounts.add(
- amountEffective,
- Amounts.sub(
- refund.refundAmount,
- refund.refundFee,
- refund.totalRefreshCostBound,
- ).amount,
- ).amount;
- }
+ tx.tips.iter().forEachAsync(async (tipRecord) => {
+ if (
+ shouldSkipCurrency(
+ transactionsRequest,
+ tipRecord.tipAmountRaw.currency,
+ )
+ ) {
+ return;
}
- if (!r0) {
- throw Error("invariant violated");
+ if (!tipRecord.acceptedTimestamp) {
+ return;
}
transactions.push({
- type: TransactionType.Refund,
- info,
- refundedTransactionId: paymentTransactionId,
- transactionId: refundTransactionId,
- timestamp: r0.obtainedTime,
- amountEffective: Amounts.stringify(amountEffective),
- amountRaw: Amounts.stringify(amountRaw),
- pending: false,
+ type: TransactionType.Tip,
+ amountEffective: Amounts.stringify(tipRecord.tipAmountEffective),
+ amountRaw: Amounts.stringify(tipRecord.tipAmountRaw),
+ pending: !tipRecord.pickedUpTimestamp,
+ timestamp: tipRecord.acceptedTimestamp,
+ transactionId: makeEventId(
+ TransactionType.Tip,
+ tipRecord.walletTipId,
+ ),
+ merchantBaseUrl: tipRecord.merchantBaseUrl,
+ error: tipRecord.lastError,
});
- }
- });
-
- tx.iter(Stores.tips).forEachAsync(async (tipRecord) => {
- if (
- shouldSkipCurrency(
- transactionsRequest,
- tipRecord.tipAmountRaw.currency,
- )
- ) {
- return;
- }
- if (!tipRecord.acceptedTimestamp) {
- return;
- }
- transactions.push({
- type: TransactionType.Tip,
- amountEffective: Amounts.stringify(tipRecord.tipAmountEffective),
- amountRaw: Amounts.stringify(tipRecord.tipAmountRaw),
- pending: !tipRecord.pickedUpTimestamp,
- timestamp: tipRecord.acceptedTimestamp,
- transactionId: makeEventId(
- TransactionType.Tip,
- tipRecord.walletTipId,
- ),
- merchantBaseUrl: tipRecord.merchantBaseUrl,
- error: tipRecord.lastError,
});
- });
- },
- );
+ },
+ );
const txPending = transactions.filter((x) => x.pending);
const txNotPending = transactions.filter((x) => !x.pending);
@@ -406,110 +409,126 @@ export async function deleteTransaction(
if (type === TransactionType.Withdrawal) {
const withdrawalGroupId = rest[0];
- await ws.db.runWithWriteTransaction(
- [Stores.withdrawalGroups, Stores.reserves, Stores.tombstones],
- async (tx) => {
- const withdrawalGroupRecord = await tx.get(
- Stores.withdrawalGroups,
+ await ws.db
+ .mktx((x) => ({
+ withdrawalGroups: x.withdrawalGroups,
+ reserves: x.reserves,
+ tombstones: x.tombstones,
+ }))
+ .runReadWrite(async (tx) => {
+ const withdrawalGroupRecord = await tx.withdrawalGroups.get(
withdrawalGroupId,
);
if (withdrawalGroupRecord) {
- await tx.delete(Stores.withdrawalGroups, withdrawalGroupId);
- await tx.put(Stores.tombstones, {
+ await tx.withdrawalGroups.delete(withdrawalGroupId);
+ await tx.tombstones.put({
id: TombstoneTag.DeleteWithdrawalGroup + ":" + withdrawalGroupId,
});
return;
}
- const reserveRecord: ReserveRecord | undefined = await tx.getIndexed(
- Stores.reserves.byInitialWithdrawalGroupId,
+ const reserveRecord:
+ | ReserveRecord
+ | undefined = await tx.reserves.indexes.byInitialWithdrawalGroupId.get(
withdrawalGroupId,
);
if (reserveRecord && !reserveRecord.initialWithdrawalStarted) {
const reservePub = reserveRecord.reservePub;
- await tx.delete(Stores.reserves, reservePub);
- await tx.put(Stores.tombstones, {
+ await tx.reserves.delete(reservePub);
+ await tx.tombstones.put({
id: TombstoneTag.DeleteReserve + ":" + reservePub,
});
}
- },
- );
+ });
} else if (type === TransactionType.Payment) {
const proposalId = rest[0];
- await ws.db.runWithWriteTransaction(
- [Stores.proposals, Stores.purchases, Stores.tombstones],
- async (tx) => {
+ await ws.db
+ .mktx((x) => ({
+ proposals: x.proposals,
+ purchases: x.purchases,
+ tombstones: x.tombstones,
+ }))
+ .runReadWrite(async (tx) => {
let found = false;
- const proposal = await tx.get(Stores.proposals, proposalId);
+ const proposal = await tx.proposals.get(proposalId);
if (proposal) {
found = true;
- await tx.delete(Stores.proposals, proposalId);
+ await tx.proposals.delete(proposalId);
}
- const purchase = await tx.get(Stores.purchases, proposalId);
+ const purchase = await tx.purchases.get(proposalId);
if (purchase) {
found = true;
- await tx.delete(Stores.proposals, proposalId);
+ await tx.proposals.delete(proposalId);
}
if (found) {
- await tx.put(Stores.tombstones, {
+ await tx.tombstones.put({
id: TombstoneTag.DeletePayment + ":" + proposalId,
});
}
- },
- );
+ });
} else if (type === TransactionType.Refresh) {
const refreshGroupId = rest[0];
- await ws.db.runWithWriteTransaction(
- [Stores.refreshGroups, Stores.tombstones],
- async (tx) => {
- const rg = await tx.get(Stores.refreshGroups, refreshGroupId);
+ await ws.db
+ .mktx((x) => ({
+ refreshGroups: x.refreshGroups,
+ tombstones: x.tombstones,
+ }))
+ .runReadWrite(async (tx) => {
+ const rg = await tx.refreshGroups.get(refreshGroupId);
if (rg) {
- await tx.delete(Stores.refreshGroups, refreshGroupId);
- await tx.put(Stores.tombstones, {
+ await tx.refreshGroups.delete(refreshGroupId);
+ await tx.tombstones.put({
id: TombstoneTag.DeleteRefreshGroup + ":" + refreshGroupId,
});
}
- },
- );
+ });
} else if (type === TransactionType.Tip) {
const tipId = rest[0];
- await ws.db.runWithWriteTransaction(
- [Stores.tips, Stores.tombstones],
- async (tx) => {
- const tipRecord = await tx.get(Stores.tips, tipId);
+ await ws.db
+ .mktx((x) => ({
+ tips: x.tips,
+ tombstones: x.tombstones,
+ }))
+ .runReadWrite(async (tx) => {
+ const tipRecord = await tx.tips.get(tipId);
if (tipRecord) {
- await tx.delete(Stores.tips, tipId);
- await tx.put(Stores.tombstones, {
+ await tx.tips.delete(tipId);
+ await tx.tombstones.put({
id: TombstoneTag.DeleteTip + ":" + tipId,
});
}
- },
- );
+ });
} else if (type === TransactionType.Deposit) {
const depositGroupId = rest[0];
- await ws.db.runWithWriteTransaction(
- [Stores.depositGroups, Stores.tombstones],
- async (tx) => {
- const tipRecord = await tx.get(Stores.depositGroups, depositGroupId);
+ await ws.db
+ .mktx((x) => ({
+ depositGroups: x.depositGroups,
+ tombstones: x.tombstones,
+ }))
+ .runReadWrite(async (tx) => {
+ const tipRecord = await tx.depositGroups.get(depositGroupId);
if (tipRecord) {
- await tx.delete(Stores.depositGroups, depositGroupId);
- await tx.put(Stores.tombstones, {
+ await tx.depositGroups.delete(depositGroupId);
+ await tx.tombstones.put({
id: TombstoneTag.DeleteDepositGroup + ":" + depositGroupId,
});
}
- },
- );
+ });
} else if (type === TransactionType.Refund) {
const proposalId = rest[0];
const executionTimeStr = rest[1];
- await ws.db.runWithWriteTransaction(
- [Stores.proposals, Stores.purchases, Stores.tombstones],
- async (tx) => {
- const purchase = await tx.get(Stores.purchases, proposalId);
+ await ws.db
+ .mktx((x) => ({
+ proposals: x.proposals,
+ purchases: x.purchases,
+ tombstones: x.tombstones,
+ }))
+ .runReadWrite(async (tx) => {
+ const purchase = await tx.purchases.get(proposalId);
if (purchase) {
// This should just influence the history view,
// but won't delete any actual refund information.
- await tx.put(Stores.tombstones, {
+ await tx.tombstones.put({
id: makeEventId(
TombstoneTag.DeleteRefund,
proposalId,
@@ -517,8 +536,7 @@ export async function deleteTransaction(
),
});
}
- },
- );
+ });
} else {
throw Error(`can't delete a '${type}' transaction`);
}
diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts
index 36be84df0..1266a3b0f 100644
--- a/packages/taler-wallet-core/src/operations/withdraw.ts
+++ b/packages/taler-wallet-core/src/operations/withdraw.ts
@@ -26,7 +26,6 @@ import {
} from "@gnu-taler/taler-util";
import {
DenominationRecord,
- Stores,
DenominationStatus,
CoinStatus,
CoinRecord,
@@ -314,13 +313,17 @@ export async function getCandidateWithdrawalDenoms(
exchangeBaseUrl: string,
): Promise<DenominationRecord[]> {
return await ws.db
- .iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchangeBaseUrl)
- .filter((d) => {
- return (
- (d.status === DenominationStatus.Unverified ||
- d.status === DenominationStatus.VerifiedGood) &&
- !d.isRevoked
- );
+ .mktx((x) => ({ denominations: x.denominations }))
+ .runReadOnly(async (tx) => {
+ return tx.denominations.indexes.byExchangeBaseUrl
+ .iter(exchangeBaseUrl)
+ .filter((d) => {
+ return (
+ (d.status === DenominationStatus.Unverified ||
+ d.status === DenominationStatus.VerifiedGood) &&
+ !d.isRevoked
+ );
+ });
});
}
@@ -336,17 +339,24 @@ async function processPlanchetGenerate(
withdrawalGroupId: string,
coinIdx: number,
): Promise<void> {
- const withdrawalGroup = await ws.db.get(
- Stores.withdrawalGroups,
- withdrawalGroupId,
- );
+ const withdrawalGroup = await ws.db
+ .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
+ .runReadOnly(async (tx) => {
+ return await tx.withdrawalGroups.get(withdrawalGroupId);
+ });
if (!withdrawalGroup) {
return;
}
- let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [
- withdrawalGroupId,
- coinIdx,
- ]);
+ let planchet = await ws.db
+ .mktx((x) => ({
+ planchets: x.planchets,
+ }))
+ .runReadOnly(async (tx) => {
+ return tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroupId,
+ coinIdx,
+ ]);
+ });
if (!planchet) {
let ci = 0;
let denomPubHash: string | undefined;
@@ -365,20 +375,26 @@ async function processPlanchetGenerate(
if (!denomPubHash) {
throw Error("invariant violated");
}
- const denom = await ws.db.get(Stores.denominations, [
- withdrawalGroup.exchangeBaseUrl,
- denomPubHash,
- ]);
- if (!denom) {
- throw Error("invariant violated");
- }
- const reserve = await ws.db.get(
- Stores.reserves,
- withdrawalGroup.reservePub,
- );
- if (!reserve) {
- throw Error("invariant violated");
- }
+
+ const { denom, reserve } = await ws.db
+ .mktx((x) => ({
+ reserves: x.reserves,
+ denominations: x.denominations,
+ }))
+ .runReadOnly(async (tx) => {
+ const denom = await tx.denominations.get([
+ withdrawalGroup.exchangeBaseUrl,
+ denomPubHash!,
+ ]);
+ if (!denom) {
+ throw Error("invariant violated");
+ }
+ const reserve = await tx.reserves.get(withdrawalGroup.reservePub);
+ if (!reserve) {
+ throw Error("invariant violated");
+ }
+ return { denom, reserve };
+ });
const r = await ws.cryptoApi.createPlanchet({
denomPub: denom.denomPub,
feeWithdraw: denom.feeWithdraw,
@@ -405,18 +421,20 @@ async function processPlanchetGenerate(
withdrawalGroupId: withdrawalGroupId,
lastError: undefined,
};
- await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => {
- const p = await tx.getIndexed(Stores.planchets.byGroupAndIndex, [
- withdrawalGroupId,
- coinIdx,
- ]);
- if (p) {
- planchet = p;
- return;
- }
- await tx.put(Stores.planchets, newPlanchet);
- planchet = newPlanchet;
- });
+ await ws.db
+ .mktx((x) => ({ planchets: x.planchets }))
+ .runReadWrite(async (tx) => {
+ const p = await tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (p) {
+ planchet = p;
+ return;
+ }
+ await tx.planchets.put(newPlanchet);
+ planchet = newPlanchet;
+ });
}
}
@@ -430,59 +448,70 @@ async function processPlanchetExchangeRequest(
withdrawalGroupId: string,
coinIdx: number,
): Promise<WithdrawResponse | undefined> {
- const withdrawalGroup = await ws.db.get(
- Stores.withdrawalGroups,
- withdrawalGroupId,
- );
- if (!withdrawalGroup) {
- return;
- }
- let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [
- withdrawalGroupId,
- coinIdx,
- ]);
- if (!planchet) {
- return;
- }
- if (planchet.withdrawalDone) {
- logger.warn("processPlanchet: planchet already withdrawn");
- return;
- }
- const exchange = await ws.db.get(
- Stores.exchanges,
- withdrawalGroup.exchangeBaseUrl,
- );
- if (!exchange) {
- logger.error("db inconsistent: exchange for planchet not found");
- return;
- }
+ const d = await ws.db
+ .mktx((x) => ({
+ withdrawalGroups: x.withdrawalGroups,
+ planchets: x.planchets,
+ exchanges: x.exchanges,
+ denominations: x.denominations,
+ }))
+ .runReadOnly(async (tx) => {
+ const withdrawalGroup = await tx.withdrawalGroups.get(withdrawalGroupId);
+ if (!withdrawalGroup) {
+ return;
+ }
+ let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (!planchet) {
+ return;
+ }
+ if (planchet.withdrawalDone) {
+ logger.warn("processPlanchet: planchet already withdrawn");
+ return;
+ }
+ const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl);
+ if (!exchange) {
+ logger.error("db inconsistent: exchange for planchet not found");
+ return;
+ }
- const denom = await ws.db.get(Stores.denominations, [
- withdrawalGroup.exchangeBaseUrl,
- planchet.denomPubHash,
- ]);
+ const denom = await tx.denominations.get([
+ withdrawalGroup.exchangeBaseUrl,
+ planchet.denomPubHash,
+ ]);
- if (!denom) {
- console.error("db inconsistent: denom for planchet not found");
- return;
- }
+ if (!denom) {
+ console.error("db inconsistent: denom for planchet not found");
+ return;
+ }
- logger.trace(
- `processing planchet #${coinIdx} in withdrawal ${withdrawalGroupId}`,
- );
+ logger.trace(
+ `processing planchet #${coinIdx} in withdrawal ${withdrawalGroupId}`,
+ );
- const wd: any = {};
- wd.denom_pub_hash = planchet.denomPubHash;
- wd.reserve_pub = planchet.reservePub;
- wd.reserve_sig = planchet.withdrawSig;
- wd.coin_ev = planchet.coinEv;
- const reqUrl = new URL(
- `reserves/${planchet.reservePub}/withdraw`,
- exchange.baseUrl,
- ).href;
+ const reqBody: any = {
+ denom_pub_hash: planchet.denomPubHash,
+ reserve_pub: planchet.reservePub,
+ reserve_sig: planchet.withdrawSig,
+ coin_ev: planchet.coinEv,
+ };
+ const reqUrl = new URL(
+ `reserves/${planchet.reservePub}/withdraw`,
+ exchange.baseUrl,
+ ).href;
+
+ return { reqUrl, reqBody };
+ });
+
+ if (!d) {
+ return;
+ }
+ const { reqUrl, reqBody } = d;
try {
- const resp = await ws.http.postJson(reqUrl, wd);
+ const resp = await ws.http.postJson(reqUrl, reqBody);
const r = await readSuccessResponseJsonOrThrow(
resp,
codecForWithdrawResponse(),
@@ -495,17 +524,19 @@ async function processPlanchetExchangeRequest(
throw e;
}
const errDetails = e.operationError;
- await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => {
- let planchet = await tx.getIndexed(Stores.planchets.byGroupAndIndex, [
- withdrawalGroupId,
- coinIdx,
- ]);
- if (!planchet) {
- return;
- }
- planchet.lastError = errDetails;
- await tx.put(Stores.planchets, planchet);
- });
+ await ws.db
+ .mktx((x) => ({ planchets: x.planchets }))
+ .runReadWrite(async (tx) => {
+ let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (!planchet) {
+ return;
+ }
+ planchet.lastError = errDetails;
+ await tx.planchets.put(planchet);
+ });
return;
}
}
@@ -516,25 +547,36 @@ async function processPlanchetVerifyAndStoreCoin(
coinIdx: number,
resp: WithdrawResponse,
): Promise<void> {
- const withdrawalGroup = await ws.db.get(
- Stores.withdrawalGroups,
- withdrawalGroupId,
- );
- if (!withdrawalGroup) {
- return;
- }
- let planchet = await ws.db.getIndexed(Stores.planchets.byGroupAndIndex, [
- withdrawalGroupId,
- coinIdx,
- ]);
- if (!planchet) {
- return;
- }
- if (planchet.withdrawalDone) {
- logger.warn("processPlanchet: planchet already withdrawn");
+ const d = await ws.db
+ .mktx((x) => ({
+ withdrawalGroups: x.withdrawalGroups,
+ planchets: x.planchets,
+ }))
+ .runReadOnly(async (tx) => {
+ const withdrawalGroup = await tx.withdrawalGroups.get(withdrawalGroupId);
+ if (!withdrawalGroup) {
+ return;
+ }
+ let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (!planchet) {
+ return;
+ }
+ if (planchet.withdrawalDone) {
+ logger.warn("processPlanchet: planchet already withdrawn");
+ return;
+ }
+ return { planchet, exchangeBaseUrl: withdrawalGroup.exchangeBaseUrl };
+ });
+
+ if (!d) {
return;
}
+ const { planchet, exchangeBaseUrl } = d;
+
const denomSig = await ws.cryptoApi.rsaUnblind(
resp.ev_sig,
planchet.blindingKey,
@@ -548,21 +590,23 @@ async function processPlanchetVerifyAndStoreCoin(
);
if (!isValid) {
- await ws.db.runWithWriteTransaction([Stores.planchets], async (tx) => {
- let planchet = await tx.getIndexed(Stores.planchets.byGroupAndIndex, [
- withdrawalGroupId,
- coinIdx,
- ]);
- if (!planchet) {
- return;
- }
- planchet.lastError = makeErrorDetails(
- TalerErrorCode.WALLET_EXCHANGE_COIN_SIGNATURE_INVALID,
- "invalid signature from the exchange after unblinding",
- {},
- );
- await tx.put(Stores.planchets, planchet);
- });
+ await ws.db
+ .mktx((x) => ({ planchets: x.planchets }))
+ .runReadWrite(async (tx) => {
+ let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
+ withdrawalGroupId,
+ coinIdx,
+ ]);
+ if (!planchet) {
+ return;
+ }
+ planchet.lastError = makeErrorDetails(
+ TalerErrorCode.WALLET_EXCHANGE_COIN_SIGNATURE_INVALID,
+ "invalid signature from the exchange after unblinding",
+ {},
+ );
+ await tx.planchets.put(planchet);
+ });
return;
}
@@ -575,7 +619,7 @@ async function processPlanchetVerifyAndStoreCoin(
denomPubHash: planchet.denomPubHash,
denomSig,
coinEvHash: planchet.coinEvHash,
- exchangeBaseUrl: withdrawalGroup.exchangeBaseUrl,
+ exchangeBaseUrl: exchangeBaseUrl,
status: CoinStatus.Fresh,
coinSource: {
type: CoinSourceType.Withdraw,
@@ -588,23 +632,27 @@ async function processPlanchetVerifyAndStoreCoin(
const planchetCoinPub = planchet.coinPub;
- const firstSuccess = await ws.db.runWithWriteTransaction(
- [Stores.coins, Stores.withdrawalGroups, Stores.reserves, Stores.planchets],
- async (tx) => {
- const ws = await tx.get(Stores.withdrawalGroups, withdrawalGroupId);
+ const firstSuccess = await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ withdrawalGroups: x.withdrawalGroups,
+ reserves: x.reserves,
+ planchets: x.planchets,
+ }))
+ .runReadWrite(async (tx) => {
+ const ws = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!ws) {
return false;
}
- const p = await tx.get(Stores.planchets, planchetCoinPub);
+ const p = await tx.planchets.get(planchetCoinPub);
if (!p || p.withdrawalDone) {
return false;
}
p.withdrawalDone = true;
- await tx.put(Stores.planchets, p);
- await tx.add(Stores.coins, coin);
+ await tx.planchets.put(p);
+ await tx.coins.add(coin);
return true;
- },
- );
+ });
if (firstSuccess) {
ws.notify({
@@ -636,12 +684,14 @@ export async function updateWithdrawalDenoms(
ws: InternalWalletState,
exchangeBaseUrl: string,
): Promise<void> {
- const exchangeDetails = await ws.db.runWithReadTransaction(
- [Stores.exchanges, Stores.exchangeDetails],
- async (tx) => {
+ const exchangeDetails = await ws.db
+ .mktx((x) => ({
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ }))
+ .runReadOnly(async (tx) => {
return getExchangeDetails(tx, exchangeBaseUrl);
- },
- );
+ });
if (!exchangeDetails) {
logger.error("exchange details not available");
throw Error(`exchange ${exchangeBaseUrl} details not available`);
@@ -663,7 +713,11 @@ export async function updateWithdrawalDenoms(
} else {
denom.status = DenominationStatus.VerifiedGood;
}
- await ws.db.put(Stores.denominations, denom);
+ await ws.db
+ .mktx((x) => ({ denominations: x.denominations }))
+ .runReadWrite(async (tx) => {
+ await tx.denominations.put(denom);
+ });
}
}
// FIXME: This debug info should either be made conditional on some flag
@@ -698,16 +752,18 @@ async function incrementWithdrawalRetry(
withdrawalGroupId: string,
err: TalerErrorDetails | undefined,
): Promise<void> {
- await ws.db.runWithWriteTransaction([Stores.withdrawalGroups], async (tx) => {
- const wsr = await tx.get(Stores.withdrawalGroups, withdrawalGroupId);
- if (!wsr) {
- return;
- }
- wsr.retryInfo.retryCounter++;
- updateRetryInfoTimeout(wsr.retryInfo);
- wsr.lastError = err;
- await tx.put(Stores.withdrawalGroups, wsr);
- });
+ await ws.db
+ .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
+ .runReadWrite(async (tx) => {
+ const wsr = await tx.withdrawalGroups.get(withdrawalGroupId);
+ if (!wsr) {
+ return;
+ }
+ wsr.retryInfo.retryCounter++;
+ updateRetryInfoTimeout(wsr.retryInfo);
+ wsr.lastError = err;
+ await tx.withdrawalGroups.put(wsr);
+ });
if (err) {
ws.notify({ type: NotificationType.WithdrawOperationError, error: err });
}
@@ -730,12 +786,15 @@ async function resetWithdrawalGroupRetry(
ws: InternalWalletState,
withdrawalGroupId: string,
): Promise<void> {
- await ws.db.mutate(Stores.withdrawalGroups, withdrawalGroupId, (x) => {
- if (x.retryInfo.active) {
- x.retryInfo = initRetryInfo();
- }
- return x;
- });
+ await ws.db
+ .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
+ .runReadWrite(async (tx) => {
+ const x = await tx.withdrawalGroups.get(withdrawalGroupId);
+ if (x && x.retryInfo.active) {
+ x.retryInfo = initRetryInfo();
+ await tx.withdrawalGroups.put(x);
+ }
+ });
}
async function processWithdrawGroupImpl(
@@ -747,10 +806,11 @@ async function processWithdrawGroupImpl(
if (forceNow) {
await resetWithdrawalGroupRetry(ws, withdrawalGroupId);
}
- const withdrawalGroup = await ws.db.get(
- Stores.withdrawalGroups,
- withdrawalGroupId,
- );
+ const withdrawalGroup = await ws.db
+ .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
+ .runReadOnly(async (tx) => {
+ return tx.withdrawalGroups.get(withdrawalGroupId);
+ });
if (!withdrawalGroup) {
logger.trace("withdraw session doesn't exist");
return;
@@ -793,16 +853,21 @@ async function processWithdrawGroupImpl(
let finishedForFirstTime = false;
let errorsPerCoin: Record<number, TalerErrorDetails> = {};
- await ws.db.runWithWriteTransaction(
- [Stores.coins, Stores.withdrawalGroups, Stores.reserves, Stores.planchets],
- async (tx) => {
- const wg = await tx.get(Stores.withdrawalGroups, withdrawalGroupId);
+ await ws.db
+ .mktx((x) => ({
+ coins: x.coins,
+ withdrawalGroups: x.withdrawalGroups,
+ reserves: x.reserves,
+ planchets: x.planchets,
+ }))
+ .runReadWrite(async (tx) => {
+ const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wg) {
return;
}
- await tx
- .iterIndexed(Stores.planchets.byGroup, withdrawalGroupId)
+ await tx.planchets.indexes.byGroup
+ .iter(withdrawalGroupId)
.forEach((x) => {
if (x.withdrawalDone) {
numFinished++;
@@ -819,9 +884,8 @@ async function processWithdrawGroupImpl(
wg.retryInfo = initRetryInfo(false);
}
- await tx.put(Stores.withdrawalGroups, wg);
- },
- );
+ await tx.withdrawalGroups.put(wg);
+ });
if (numFinished != numTotalCoins) {
throw OperationFailedError.fromCode(
@@ -871,8 +935,12 @@ export async function getExchangeWithdrawalInfo(
}
const possibleDenoms = await ws.db
- .iterIndex(Stores.denominations.exchangeBaseUrlIndex, baseUrl)
- .filter((d) => d.isOffered);
+ .mktx((x) => ({ denominations: x.denominations }))
+ .runReadOnly(async (tx) => {
+ return tx.denominations.indexes.byExchangeBaseUrl
+ .iter()
+ .filter((d) => d.isOffered);
+ });
let versionMatch;
if (exchangeDetails.protocolVersion) {
@@ -953,23 +1021,24 @@ export async function getWithdrawalDetailsForUri(
const exchanges: ExchangeListItem[] = [];
- const exchangeRecords = await ws.db.iter(Stores.exchanges).toArray();
-
- for (const r of exchangeRecords) {
- const details = await ws.db.runWithReadTransaction(
- [Stores.exchanges, Stores.exchangeDetails],
- async (tx) => {
- return getExchangeDetails(tx, r.baseUrl);
- },
- );
- if (details) {
- exchanges.push({
- exchangeBaseUrl: details.exchangeBaseUrl,
- currency: details.currency,
- paytoUris: details.wireInfo.accounts.map((x) => x.payto_uri),
- });
- }
- }
+ await ws.db
+ .mktx((x) => ({
+ exchanges: x.exchanges,
+ exchangeDetails: x.exchangeDetails,
+ }))
+ .runReadOnly(async (tx) => {
+ const exchangeRecords = await tx.exchanges.iter().toArray();
+ for (const r of exchangeRecords) {
+ const details = await getExchangeDetails(tx, r.baseUrl);
+ if (details) {
+ exchanges.push({
+ exchangeBaseUrl: details.exchangeBaseUrl,
+ currency: details.currency,
+ paytoUris: details.wireInfo.accounts.map((x) => x.payto_uri),
+ });
+ }
+ }
+ });
return {
amount: Amounts.stringify(info.amount),