diff options
Diffstat (limited to 'packages/taler-wallet-core/src/operations/refresh.ts')
-rw-r--r-- | packages/taler-wallet-core/src/operations/refresh.ts | 239 |
1 files changed, 116 insertions, 123 deletions
diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index b9ac12518..ad9fdedb4 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -72,7 +72,6 @@ import { RefreshCoinStatus, RefreshGroupRecord, RefreshOperationStatus, - WalletStoresV1, } from "../db.js"; import { getCandidateWithdrawalDenomsTx, @@ -81,7 +80,8 @@ import { RefreshSessionRecord, TaskId, timestampPreciseToDb, - WalletDbReadWriteTransactionArr, + WalletDbReadOnlyTransaction, + WalletDbReadWriteTransaction, } from "../index.js"; import { EXCHANGE_COINS_LOCK, @@ -90,7 +90,6 @@ import { import { assertUnreachable } from "../util/assertUnreachable.js"; import { selectWithdrawalDenominations } from "../util/coinSelection.js"; import { checkDbInvariant } from "../util/invariants.js"; -import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js"; import { constructTaskIdentifier, makeCoinAvailable, @@ -129,48 +128,44 @@ export class RefreshTransactionContext implements TransactionContext { async deleteTransaction(): Promise<void> { const refreshGroupId = this.refreshGroupId; const ws = this.ws; - await ws.db - .mktx((x) => [x.refreshGroups, x.tombstones]) - .runReadWrite(async (tx) => { - const rg = await tx.refreshGroups.get(refreshGroupId); - if (rg) { - await tx.refreshGroups.delete(refreshGroupId); - await tx.tombstones.put({ - id: TombstoneTag.DeleteRefreshGroup + ":" + refreshGroupId, - }); - } - }); + await ws.db.runReadWriteTx(["refreshGroups", "tombstones"], async (tx) => { + const rg = await tx.refreshGroups.get(refreshGroupId); + if (rg) { + await tx.refreshGroups.delete(refreshGroupId); + await tx.tombstones.put({ + id: TombstoneTag.DeleteRefreshGroup + ":" + refreshGroupId, + }); + } + }); } async suspendTransaction(): Promise<void> { const { ws, refreshGroupId, transactionId } = this; - let res = await ws.db - .mktx((x) => [x.refreshGroups]) - .runReadWrite(async (tx) => { - const dg = await tx.refreshGroups.get(refreshGroupId); - if (!dg) { - logger.warn( - `can't suspend refresh group, refreshGroupId=${refreshGroupId} not found`, - ); + let res = await ws.db.runReadWriteTx(["refreshGroups"], async (tx) => { + const dg = await tx.refreshGroups.get(refreshGroupId); + if (!dg) { + logger.warn( + `can't suspend refresh group, refreshGroupId=${refreshGroupId} not found`, + ); + return undefined; + } + const oldState = computeRefreshTransactionState(dg); + switch (dg.operationStatus) { + case RefreshOperationStatus.Finished: return undefined; + case RefreshOperationStatus.Pending: { + dg.operationStatus = RefreshOperationStatus.Suspended; + await tx.refreshGroups.put(dg); + return { + oldTxState: oldState, + newTxState: computeRefreshTransactionState(dg), + }; } - const oldState = computeRefreshTransactionState(dg); - switch (dg.operationStatus) { - case RefreshOperationStatus.Finished: - return undefined; - case RefreshOperationStatus.Pending: { - dg.operationStatus = RefreshOperationStatus.Suspended; - await tx.refreshGroups.put(dg); - return { - oldTxState: oldState, - newTxState: computeRefreshTransactionState(dg), - }; - } - case RefreshOperationStatus.Suspended: - return undefined; - } - return undefined; - }); + case RefreshOperationStatus.Suspended: + return undefined; + } + return undefined; + }); if (res) { ws.notify({ type: NotificationType.TransactionStateTransition, @@ -188,9 +183,9 @@ export class RefreshTransactionContext implements TransactionContext { async resumeTransaction(): Promise<void> { const { ws, refreshGroupId, transactionId } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.refreshGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["refreshGroups"], + async (tx) => { const dg = await tx.refreshGroups.get(refreshGroupId); if (!dg) { logger.warn( @@ -214,16 +209,17 @@ export class RefreshTransactionContext implements TransactionContext { }; } return undefined; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(this.taskId); } async failTransaction(): Promise<void> { const { ws, refreshGroupId, transactionId } = this; - const transitionInfo = await ws.db - .mktx((x) => [x.refreshGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["refreshGroups"], + async (tx) => { const dg = await tx.refreshGroups.get(refreshGroupId); if (!dg) { logger.warn( @@ -253,7 +249,8 @@ export class RefreshTransactionContext implements TransactionContext { oldTxState: oldState, newTxState: computeRefreshTransactionState(dg), }; - }); + }, + ); ws.taskScheduler.stopShepherdTask(this.taskId); notifyTransition(ws, transactionId, transitionInfo); ws.taskScheduler.startShepherdTask(this.taskId); @@ -341,9 +338,9 @@ async function provideRefreshSession( `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`, ); - const d = await ws.db - .mktx((x) => [x.refreshGroups, x.coins, x.refreshSessions]) - .runReadWrite(async (tx) => { + const d = await ws.db.runReadWriteTx( + ["coins", "refreshGroups", "refreshSessions"], + async (tx) => { const refreshGroup = await tx.refreshGroups.get(refreshGroupId); if (!refreshGroup) { return; @@ -363,7 +360,8 @@ async function provideRefreshSession( throw Error("Can't refresh, coin not found"); } return { refreshGroup, coin, existingRefreshSession }; - }); + }, + ); if (!d) { return undefined; @@ -380,9 +378,9 @@ async function provideRefreshSession( // FIXME: use helper functions from withdraw.ts // to update and filter withdrawable denoms. - const { availableAmount, availableDenoms } = await ws.db - .mktx((x) => [x.denominations]) - .runReadOnly(async (tx) => { + const { availableAmount, availableDenoms } = await ws.db.runReadOnlyTx( + ["denominations"], + async (tx) => { const oldDenom = await ws.getDenomInfo( ws, tx, @@ -405,7 +403,8 @@ async function provideRefreshSession( oldDenom.feeRefresh, ).amount; return { availableAmount, availableDenoms }; - }); + }, + ); const newCoinDenoms = selectWithdrawalDenominations( availableAmount, @@ -424,9 +423,9 @@ async function provideRefreshSession( availableAmount, )} too small`, ); - const transitionInfo = await ws.db - .mktx((x) => [x.coins, x.coinAvailability, x.refreshGroups]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["refreshGroups", "coins", "coinAvailability"], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { return; @@ -440,7 +439,8 @@ async function provideRefreshSession( await tx.refreshGroups.put(rg); const newTxState = computeRefreshTransactionState(rg); return { oldTxState, newTxState }; - }); + }, + ); ws.notify({ type: NotificationType.BalanceChange, hintTransactionId: transactionId, @@ -452,9 +452,9 @@ async function provideRefreshSession( const sessionSecretSeed = encodeCrock(getRandomBytes(64)); // Store refresh session for this coin in the database. - const mySession = await ws.db - .mktx((x) => [x.refreshGroups, x.coins, x.refreshSessions]) - .runReadWrite(async (tx) => { + const mySession = await ws.db.runReadWriteTx( + ["refreshGroups", "refreshSessions"], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { return; @@ -479,7 +479,8 @@ async function provideRefreshSession( }; await tx.refreshSessions.put(newSession); return newSession; - }); + }, + ); logger.trace( `found/created refresh session for coin #${coinIndex} in ${refreshGroupId}`, ); @@ -497,9 +498,9 @@ async function refreshMelt( refreshGroupId: string, coinIndex: number, ): Promise<void> { - const d = await ws.db - .mktx((x) => [x.refreshGroups, x.refreshSessions, x.coins, x.denominations]) - .runReadWrite(async (tx) => { + const d = await ws.db.runReadWriteTx( + ["refreshGroups", "refreshSessions", "coins", "denominations"], + async (tx) => { const refreshGroup = await tx.refreshGroups.get(refreshGroupId); if (!refreshGroup) { return; @@ -550,7 +551,8 @@ async function refreshMelt( }); } return { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession }; - }); + }, + ); if (!d) { return; @@ -618,14 +620,9 @@ async function refreshMelt( if (resp.status === HttpStatusCode.NotFound) { const errDetails = await readUnexpectedResponseDetails(resp); - const transitionInfo = await ws.db - .mktx((x) => [ - x.refreshGroups, - x.refreshSessions, - x.coins, - x.coinAvailability, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + ["refreshGroups", "refreshSessions", "coins", "coinAvailability"], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { return; @@ -659,7 +656,8 @@ async function refreshMelt( oldTxState, newTxState, }; - }); + }, + ); ws.notify({ type: NotificationType.BalanceChange, hintTransactionId: transactionId, @@ -710,9 +708,9 @@ async function refreshMelt( refreshSession.norevealIndex = norevealIndex; - await ws.db - .mktx((x) => [x.refreshGroups, x.refreshSessions]) - .runReadWrite(async (tx) => { + await ws.db.runReadWriteTx( + ["refreshGroups", "refreshSessions"], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { return; @@ -729,7 +727,8 @@ async function refreshMelt( } rs.norevealIndex = norevealIndex; await tx.refreshSessions.put(rs); - }); + }, + ); } export async function assembleRefreshRevealRequest(args: { @@ -798,9 +797,9 @@ async function refreshReveal( logger.trace( `doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`, ); - const d = await ws.db - .mktx((x) => [x.refreshGroups, x.refreshSessions, x.coins, x.denominations]) - .runReadOnly(async (tx) => { + const d = await ws.db.runReadOnlyTx( + ["refreshGroups", "refreshSessions", "coins", "denominations"], + async (tx) => { const refreshGroup = await tx.refreshGroups.get(refreshGroupId); if (!refreshGroup) { return; @@ -859,7 +858,8 @@ async function refreshReveal( refreshGroup, norevealIndex, }; - }); + }, + ); if (!d) { return; @@ -972,15 +972,15 @@ async function refreshReveal( } } - const transitionInfo = await ws.db - .mktx((x) => [ - x.coins, - x.denominations, - x.coinAvailability, - x.refreshGroups, - x.refreshSessions, - ]) - .runReadWrite(async (tx) => { + const transitionInfo = await ws.db.runReadWriteTx( + [ + "coins", + "denominations", + "coinAvailability", + "refreshGroups", + "refreshSessions", + ], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); if (!rg) { logger.warn("no refresh session found"); @@ -1000,7 +1000,8 @@ async function refreshReveal( await tx.refreshGroups.put(rg); const newTxState = computeRefreshTransactionState(rg); return { oldTxState, newTxState }; - }); + }, + ); notifyTransition(ws, transactionId, transitionInfo); logger.trace("refresh finished (end of reveal)"); } @@ -1012,9 +1013,10 @@ export async function processRefreshGroup( ): Promise<TaskRunResult> { logger.trace(`processing refresh group ${refreshGroupId}`); - const refreshGroup = await ws.db - .mktx((x) => [x.refreshGroups]) - .runReadOnly(async (tx) => tx.refreshGroups.get(refreshGroupId)); + const refreshGroup = await ws.db.runReadOnlyTx( + ["refreshGroups"], + async (tx) => tx.refreshGroups.get(refreshGroupId), + ); if (!refreshGroup) { return TaskRunResult.finished(); } @@ -1084,16 +1086,17 @@ async function processRefreshSession( logger.trace( `processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`, ); - let { refreshGroup, refreshSession } = await ws.db - .mktx((x) => [x.refreshGroups, x.refreshSessions]) - .runReadOnly(async (tx) => { + let { refreshGroup, refreshSession } = await ws.db.runReadOnlyTx( + ["refreshGroups", "refreshSessions"], + async (tx) => { const rg = await tx.refreshGroups.get(refreshGroupId); const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]); return { refreshGroup: rg, refreshSession: rs, }; - }); + }, + ); if (!refreshGroup) { return; } @@ -1122,12 +1125,9 @@ export interface RefreshOutputInfo { export async function calculateRefreshOutput( ws: InternalWalletState, - tx: GetReadOnlyAccess<{ - denominations: typeof WalletStoresV1.denominations; - coins: typeof WalletStoresV1.coins; - refreshGroups: typeof WalletStoresV1.refreshGroups; - coinAvailability: typeof WalletStoresV1.coinAvailability; - }>, + tx: WalletDbReadOnlyTransaction< + ["denominations", "coins", "refreshGroups", "coinAvailability"] + >, currency: string, oldCoinPubs: CoinRefreshRequest[], ): Promise<RefreshOutputInfo> { @@ -1196,12 +1196,9 @@ export async function calculateRefreshOutput( async function applyRefresh( ws: InternalWalletState, - tx: GetReadWriteAccess<{ - denominations: typeof WalletStoresV1.denominations; - coins: typeof WalletStoresV1.coins; - refreshGroups: typeof WalletStoresV1.refreshGroups; - coinAvailability: typeof WalletStoresV1.coinAvailability; - }>, + tx: WalletDbReadWriteTransaction< + ["denominations", "coins", "refreshGroups", "coinAvailability"] + >, oldCoinPubs: CoinRefreshRequest[], refreshGroupId: string, ): Promise<void> { @@ -1272,7 +1269,7 @@ export interface CreateRefreshGroupResult { */ export async function createRefreshGroup( ws: InternalWalletState, - tx: WalletDbReadWriteTransactionArr< + tx: WalletDbReadWriteTransaction< ["denominations", "coins", "refreshGroups", "coinAvailability"] >, currency: string, @@ -1395,14 +1392,9 @@ export async function forceRefresh( if (req.coinPubList.length == 0) { throw Error("refusing to create empty refresh group"); } - const refreshGroupId = await ws.db - .mktx((x) => [ - x.refreshGroups, - x.coinAvailability, - x.denominations, - x.coins, - ]) - .runReadWrite(async (tx) => { + const refreshGroupId = await ws.db.runReadWriteTx( + ["refreshGroups", "coinAvailability", "denominations", "coins"], + async (tx) => { let coinPubs: CoinRefreshRequest[] = []; for (const c of req.coinPubList) { const coin = await tx.coins.get(c); @@ -1429,7 +1421,8 @@ export async function forceRefresh( RefreshReason.Manual, undefined, ); - }); + }, + ); return { refreshGroupId, |