/*
This file is part of GNU Taler
(C) 2019-2024 Taler Systems S.A.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
GNU Taler; see the file COPYING. If not, see
*/
/**
* @fileoverview
* Implementation of the refresh transaction.
*/
/**
* Imports.
*/
import {
AgeCommitment,
AgeRestriction,
AmountJson,
Amounts,
amountToPretty,
assertUnreachable,
checkDbInvariant,
codecForCoinHistoryResponse,
codecForExchangeMeltResponse,
codecForExchangeRevealResponse,
CoinPublicKeyString,
CoinRefreshRequest,
CoinStatus,
DenominationInfo,
DenomKeyType,
Duration,
encodeCrock,
ExchangeMeltRequest,
ExchangeProtocolVersion,
ExchangeRefreshRevealRequest,
fnutil,
ForceRefreshRequest,
getErrorDetailFromException,
getRandomBytes,
HashCodeString,
HttpStatusCode,
j2s,
Logger,
makeErrorDetail,
NotificationType,
RefreshReason,
TalerErrorCode,
TalerErrorDetail,
TalerPreciseTimestamp,
TransactionAction,
TransactionIdStr,
TransactionMajorState,
TransactionState,
TransactionType,
URL,
WalletNotification,
} from "@gnu-taler/taler-util";
import {
HttpResponse,
readSuccessResponseJsonOrThrow,
readTalerErrorResponse,
throwUnexpectedRequestError,
} from "@gnu-taler/taler-util/http";
import {
constructTaskIdentifier,
genericWaitForState,
makeCoinsVisible,
PendingTaskType,
TaskIdStr,
TaskRunResult,
TaskRunResultType,
TombstoneTag,
TransactionContext,
TransitionResult,
TransitionResultType,
} from "./common.js";
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
import {
DerivedRefreshSession,
RefreshNewDenomInfo,
} from "./crypto/cryptoTypes.js";
import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
import {
CoinAvailabilityRecord,
CoinHistoryRecord,
CoinRecord,
CoinSourceType,
DenominationRecord,
RefreshCoinStatus,
RefreshGroupPerExchangeInfo,
RefreshGroupRecord,
RefreshOperationStatus,
RefreshSessionRecord,
timestampPreciseToDb,
WalletDbReadOnlyTransaction,
WalletDbReadWriteTransaction,
WalletDbStoresArr,
} from "./db.js";
import { selectWithdrawalDenominations } from "./denomSelection.js";
import {
constructTransactionIdentifier,
notifyTransition,
TransitionInfo,
} from "./transactions.js";
import {
EXCHANGE_COINS_LOCK,
getDenomInfo,
WalletExecutionContext,
} from "./wallet.js";
import { getCandidateWithdrawalDenomsTx } from "./withdraw.js";
// Logger instance shared by all functions in this module, created with the tag "refresh.ts".
const logger = new Logger("refresh.ts");
/**
 * Update the materialized refresh transaction based
 * on the refresh group record.
 *
 * The body is currently empty, so calling this has no effect. It is
 * invoked from RefreshTransactionContext.transition after the refresh
 * group record has been written or deleted.
 *
 * @param ctx transaction context of the refresh group (unused for now)
 * @param tx read-write database transaction covering the listed stores (unused for now)
 */
async function updateRefreshTransaction(
  ctx: RefreshTransactionContext,
  tx: WalletDbReadWriteTransaction<
    [
      "refreshGroups",
      "transactions",
      "operationRetries",
      "exchanges",
      "exchangeDetails",
    ]
  >,
): Promise<void> {}
/**
 * Transaction context for a single refresh group.
 *
 * Derives the transaction identifier and the task identifier from the
 * refresh group ID and offers the user-facing state changes
 * (delete / suspend / resume / fail) on top of a generic
 * {@link RefreshTransactionContext.transition} helper.
 */
export class RefreshTransactionContext implements TransactionContext {
  readonly transactionId: TransactionIdStr;
  readonly taskId: TaskIdStr;

  constructor(
    public wex: WalletExecutionContext,
    public refreshGroupId: string,
  ) {
    this.transactionId = constructTransactionIdentifier({
      tag: TransactionType.Refresh,
      refreshGroupId,
    });
    this.taskId = constructTaskIdentifier({
      tag: PendingTaskType.Refresh,
      refreshGroupId,
    });
  }

  /**
   * Transition a refresh transaction.
   * Extra object stores may be accessed during the transition.
   *
   * The callback receives the current refresh group record (or undefined
   * when it does not exist) and decides whether the record is kept as is,
   * replaced, or deleted. The old and new transaction states are passed
   * to notifyTransition once the database transaction has completed.
   *
   * @param opts optional extra store names to open in the transaction
   * @param f callback computing the transition result
   * @returns the old/new transaction states, or undefined when the
   *          callback requested neither a transition nor a deletion
   */
  async transition<StoreNameArray extends WalletDbStoresArr = []>(
    opts: { extraStores?: StoreNameArray; transactionLabel?: string },
    f: (
      rec: RefreshGroupRecord | undefined,
      tx: WalletDbReadWriteTransaction<
        [
          "refreshGroups",
          "transactions",
          "operationRetries",
          "exchanges",
          "exchangeDetails",
          ...StoreNameArray,
        ]
      >,
    ) => Promise<TransitionResult<RefreshGroupRecord>>,
  ): Promise<TransitionInfo | undefined> {
    // Stores that every transition needs, independent of the caller.
    const baseStores = [
      "refreshGroups" as const,
      "transactions" as const,
      "operationRetries" as const,
      "exchanges" as const,
      "exchangeDetails" as const,
    ];
    const stores = opts.extraStores
      ? [...baseStores, ...opts.extraStores]
      : baseStores;
    const transitionInfo = await this.wex.db.runReadWriteTx(
      { storeNames: stores },
      async (tx) => {
        const existingRec = await tx.refreshGroups.get(this.refreshGroupId);
        // A missing record is reported as major state "None".
        let oldTxState: TransactionState;
        if (existingRec) {
          oldTxState = computeRefreshTransactionState(existingRec);
        } else {
          oldTxState = {
            major: TransactionMajorState.None,
          };
        }
        const res = await f(existingRec, tx);
        switch (res.type) {
          case TransitionResultType.Transition: {
            await tx.refreshGroups.put(res.rec);
            await updateRefreshTransaction(this, tx);
            const newTxState = computeRefreshTransactionState(res.rec);
            return {
              oldTxState,
              newTxState,
            };
          }
          case TransitionResultType.Delete:
            await tx.refreshGroups.delete(this.refreshGroupId);
            await updateRefreshTransaction(this, tx);
            return {
              oldTxState,
              newTxState: {
                major: TransactionMajorState.None,
              },
            };
          default:
            // Any other result type: nothing changed, nothing to report.
            return undefined;
        }
      },
    );
    notifyTransition(this.wex, this.transactionId, transitionInfo);
    return transitionInfo;
  }

  /**
   * Delete the refresh group and leave a tombstone entry behind.
   * Does nothing when the refresh group does not exist.
   */
  async deleteTransaction(): Promise<void> {
    await this.transition(
      {
        extraStores: ["tombstones"],
      },
      async (rec, tx) => {
        if (!rec) {
          return TransitionResult.stay();
        }
        await tx.tombstones.put({
          id: TombstoneTag.DeleteRefreshGroup + ":" + this.refreshGroupId,
        });
        return TransitionResult.delete();
      },
    );
  }

  /**
   * Move a pending refresh group to the suspended state.
   * Finished, failed and already suspended groups are left untouched.
   */
  async suspendTransaction(): Promise<void> {
    await this.transition({}, async (rec, tx) => {
      if (!rec) {
        return TransitionResult.stay();
      }
      switch (rec.operationStatus) {
        case RefreshOperationStatus.Finished:
        case RefreshOperationStatus.Suspended:
        case RefreshOperationStatus.Failed:
          return TransitionResult.stay();
        case RefreshOperationStatus.Pending: {
          rec.operationStatus = RefreshOperationStatus.Suspended;
          return TransitionResult.transition(rec);
        }
        default:
          assertUnreachable(rec.operationStatus);
      }
    });
  }

  /**
   * Always throws: refresh transactions can only be failed, not aborted.
   */
  async abortTransaction(): Promise<void> {
    // Refresh transactions only support fail, not abort.
    throw new Error("refresh transactions cannot be aborted");
  }

  /**
   * Move a suspended refresh group back to the pending state.
   * Groups in any other state are left untouched.
   */
  async resumeTransaction(): Promise<void> {
    await this.transition({}, async (rec, tx) => {
      if (!rec) {
        return TransitionResult.stay();
      }
      switch (rec.operationStatus) {
        case RefreshOperationStatus.Finished:
        case RefreshOperationStatus.Failed:
        case RefreshOperationStatus.Pending:
          return TransitionResult.stay();
        case RefreshOperationStatus.Suspended: {
          rec.operationStatus = RefreshOperationStatus.Pending;
          return TransitionResult.transition(rec);
        }
        default:
          assertUnreachable(rec.operationStatus);
      }
    });
  }

  /**
   * Mark a pending or suspended refresh group as failed.
   * Finished and already failed groups are left untouched.
   */
  async failTransaction(): Promise<void> {
    await this.transition({}, async (rec, tx) => {
      if (!rec) {
        return TransitionResult.stay();
      }
      switch (rec.operationStatus) {
        case RefreshOperationStatus.Finished:
        case RefreshOperationStatus.Failed:
          return TransitionResult.stay();
        case RefreshOperationStatus.Pending:
        case RefreshOperationStatus.Suspended: {
          rec.operationStatus = RefreshOperationStatus.Failed;
          return TransitionResult.transition(rec);
        }
        default:
          assertUnreachable(rec.operationStatus);
      }
    });
  }
}
/**
 * Cached variant of {@link getTotalRefreshCostInternal}.
 *
 * The result is memoized in `wex.ws.refreshCostCache` under a key built
 * from the exchange base URL, the denomination public key hash and the
 * stringified amount left. On a cache miss the candidate withdrawal
 * denominations are loaded via the given transaction and the cost is
 * computed and stored.
 *
 * @param wex wallet execution context providing the cache
 * @param tx read-only transaction on the "denominations" store
 * @param refreshedDenom denomination of the coin that would be refreshed
 * @param amountLeft remaining value on that coin
 * @returns the amount lost by refreshing
 */
export async function getTotalRefreshCost(
  wex: WalletExecutionContext,
  tx: WalletDbReadOnlyTransaction<["denominations"]>,
  refreshedDenom: DenominationInfo,
  amountLeft: AmountJson,
): Promise<AmountJson> {
  const cacheKey = `denom=${refreshedDenom.exchangeBaseUrl}/${
    refreshedDenom.denomPubHash
  };left=${Amounts.stringify(amountLeft)}`;
  const cacheRes = wex.ws.refreshCostCache.get(cacheKey);
  if (cacheRes) {
    return cacheRes;
  }
  const allDenoms = await getCandidateWithdrawalDenomsTx(
    wex,
    tx,
    refreshedDenom.exchangeBaseUrl,
    Amounts.currencyOf(amountLeft),
  );
  const res = getTotalRefreshCostInternal(
    allDenoms,
    refreshedDenom,
    amountLeft,
  );
  wex.ws.refreshCostCache.put(cacheKey, res);
  return res;
}
/**
 * Compute how much value is lost when a coin of denomination
 * `refreshedDenom` with `amountLeft` remaining is refreshed.
 *
 * A zero amount left is considered to cost nothing. When no refresh is
 * possible (e.g. no suitable denominations), the cost is the whole
 * amount left.
 *
 * The result accounts for the refresh fee, the withdrawal fees of the
 * coins obtained from the refresh, and any remainder too small to be
 * refreshed.
 *
 * @param denoms candidate denominations for the refresh outputs
 * @param refreshedDenom denomination of the coin being refreshed
 * @param amountLeft remaining value on the coin
 * @returns `amountLeft` minus the total value of the selected output coins
 */
export function getTotalRefreshCostInternal(
  denoms: DenominationRecord[],
  refreshedDenom: DenominationInfo,
  amountLeft: AmountJson,
): AmountJson {
  // Value available for new coins once the refresh fee is deducted.
  const { amount: withdrawAmount } = Amounts.sub(
    amountLeft,
    refreshedDenom.feeRefresh,
  );
  // Lookup table from denomination hash to the denomination record.
  const denomByHash = Object.fromEntries(
    denoms.map((rec) => [rec.denomPubHash, rec]),
  );
  const selection = selectWithdrawalDenominations(
    withdrawAmount,
    denoms,
    false,
  );
  // Face value contributed by each selected denomination (value * count).
  const outputValues = selection.selectedDenoms.map(
    (sel) => Amounts.mult(denomByHash[sel.denomPubHash].value, sel.count).amount,
  );
  const { amount: resultingAmount } = Amounts.add(
    Amounts.zeroOfCurrency(withdrawAmount.currency),
    ...outputValues,
  );
  const { amount: totalCost } = Amounts.sub(amountLeft, resultingAmount);
  logger.trace(
    `total refresh cost for ${amountToPretty(amountLeft)} is ${amountToPretty(
      totalCost,
    )}`,
  );
  return totalCost;
}
/**
 * Load the coin availability record for a denomination and age
 * restriction, or build a fresh one with zero counters when none is
 * stored yet.
 *
 * A freshly built record is NOT written to the database here; the
 * caller is expected to `put` it after adjusting it.
 *
 * @param wex wallet execution context (not used by this function)
 * @param tx transaction with access to the "coinAvailability" store
 * @param denom denomination to look up
 * @param ageRestriction maximum age used as part of the lookup key
 * @returns the stored or newly initialized availability record
 */
async function getCoinAvailabilityForDenom(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    ["coins", "coinAvailability", "denominations"]
  >,
  denom: DenominationInfo,
  ageRestriction: number,
): Promise<CoinAvailabilityRecord> {
  let car = await tx.coinAvailability.get([
    denom.exchangeBaseUrl,
    denom.denomPubHash,
    ageRestriction,
  ]);
  if (!car) {
    car = {
      maxAge: ageRestriction,
      value: denom.value,
      currency: Amounts.currencyOf(denom.value),
      denomPubHash: denom.denomPubHash,
      exchangeBaseUrl: denom.exchangeBaseUrl,
      freshCoinCount: 0,
      visibleCoinCount: 0,
    };
  }
  return car;
}
/**
 * Create a refresh session for one particular coin inside a refresh group.
 *
 * Selects the denominations for the refresh outputs, bumps
 * `pendingRefreshOutputCount` on the matching coin availability records
 * and stores a new refresh session record.
 *
 * If the amount left after the refresh fee is too small for any
 * denomination, no session is created. Instead the coin's status in the
 * in-memory `refreshGroup` is set to finished; the record is not
 * written here.
 *
 * @param wex wallet execution context
 * @param tx read-write transaction covering the listed stores
 * @param refreshGroup refresh group the coin belongs to (may be mutated)
 * @param coinIndex index of the coin within the refresh group
 * @throws Error if the old coin or its denomination is not in the database
 */
async function initRefreshSession(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    ["refreshSessions", "coinAvailability", "coins", "denominations"]
  >,
  refreshGroup: RefreshGroupRecord,
  coinIndex: number,
): Promise<void> {
  const refreshGroupId = refreshGroup.refreshGroupId;
  logger.trace(
    `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`,
  );
  const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
  const oldCoin = await tx.coins.get(oldCoinPub);
  if (!oldCoin) {
    throw Error("Can't refresh, coin not found");
  }
  const exchangeBaseUrl = oldCoin.exchangeBaseUrl;
  // Fresh random seed from which the session's secrets are derived.
  const sessionSecretSeed = encodeCrock(getRandomBytes(64));
  const oldDenom = await getDenomInfo(
    wex,
    tx,
    exchangeBaseUrl,
    oldCoin.denomPubHash,
  );
  if (!oldDenom) {
    throw Error("db inconsistent: denomination for coin not found");
  }
  const currency = refreshGroup.currency;
  const availableDenoms = await getCandidateWithdrawalDenomsTx(
    wex,
    tx,
    exchangeBaseUrl,
    currency,
  );
  // Value that can be turned into new coins after paying the refresh fee.
  const availableAmount = Amounts.sub(
    refreshGroup.inputPerCoin[coinIndex],
    oldDenom.feeRefresh,
  ).amount;
  const newCoinDenoms = selectWithdrawalDenominations(
    availableAmount,
    availableDenoms,
    wex.ws.config.testing.denomselAllowLate,
  );
  if (newCoinDenoms.selectedDenoms.length === 0) {
    logger.trace(
      `not refreshing, available amount ${amountToPretty(
        availableAmount,
      )} too small`,
    );
    refreshGroup.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
    return;
  }
  // Reserve the expected outputs in the coin availability records.
  for (let i = 0; i < newCoinDenoms.selectedDenoms.length; i++) {
    const dph = newCoinDenoms.selectedDenoms[i].denomPubHash;
    const denom = await getDenomInfo(wex, tx, oldDenom.exchangeBaseUrl, dph);
    if (!denom) {
      logger.error(`denom ${dph} not in DB`);
      continue;
    }
    const car = await getCoinAvailabilityForDenom(
      wex,
      tx,
      denom,
      oldCoin.maxAge,
    );
    car.pendingRefreshOutputCount =
      (car.pendingRefreshOutputCount ?? 0) +
      newCoinDenoms.selectedDenoms[i].count;
    await tx.coinAvailability.put(car);
  }
  const newSession: RefreshSessionRecord = {
    coinIndex,
    refreshGroupId,
    norevealIndex: undefined,
    sessionSecretSeed: sessionSecretSeed,
    newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({
      count: x.count,
      denomPubHash: x.denomPubHash,
    })),
    amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue),
  };
  await tx.refreshSessions.put(newSession);
}
/**
 * Uninitialize a refresh session.
 *
 * Adjust the coin availability of involved coins: for every output
 * denomination of the session, `pendingRefreshOutputCount` is reduced
 * by the number of coins the session had planned for it.
 *
 * The refresh session record itself is neither modified nor deleted.
 * If the old coin is not in the database, nothing is adjusted.
 *
 * @param wex wallet execution context
 * @param tx read-write transaction covering the listed stores
 * @param refreshGroup refresh group the session belongs to
 * @param refreshSession session whose reserved outputs are released
 */
async function destroyRefreshSession(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    ["denominations", "coinAvailability", "coins"]
  >,
  refreshGroup: RefreshGroupRecord,
  refreshSession: RefreshSessionRecord,
): Promise<void> {
  // The old coin is the same for every output denomination, so it is
  // looked up once instead of once per loop iteration.
  const oldCoin = await tx.coins.get(
    refreshGroup.oldCoinPubs[refreshSession.coinIndex],
  );
  if (!oldCoin) {
    return;
  }
  for (const sel of refreshSession.newDenoms) {
    const dph = sel.denomPubHash;
    const denom = await getDenomInfo(wex, tx, oldCoin.exchangeBaseUrl, dph);
    if (!denom) {
      logger.error(`denom ${dph} not in DB`);
      continue;
    }
    const car = await getCoinAvailabilityForDenom(
      wex,
      tx,
      denom,
      oldCoin.maxAge,
    );
    checkDbInvariant(
      car.pendingRefreshOutputCount != null,
      `no pendingRefreshOutputCount for denom ${dph}`,
    );
    car.pendingRefreshOutputCount = car.pendingRefreshOutputCount - sel.count;
    await tx.coinAvailability.put(car);
  }
}
/**
 * Timeout applied to the HTTP requests of a refresh.
 *
 * The refresh group argument is currently ignored; the timeout is a
 * fixed five seconds.
 */
function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration {
  const timeout = Duration.fromSpec({ seconds: 5 });
  return timeout;
}
/**
 * Run the melt step of a refresh session.
 *
 * If the melt step succeeds or fails permanently,
 * the status in the refresh group is updated.
 *
 * When a transient error occurs, an exception is thrown.
 *
 * Steps:
 *  1. Load the refresh group, session, old coin and denominations.
 *     Returns early if the group or session is missing or the session
 *     already has a noreveal index.
 *  2. Derive the refresh session via the crypto API and POST the melt
 *     request to the exchange.
 *  3. Dispatch on the HTTP status: 404 / 410 / 409 are passed to the
 *     dedicated handlers, any other non-200 status throws.
 *  4. On success, store the noreveal index from the response in the
 *     refresh session record.
 *
 * @param wex wallet execution context
 * @param refreshGroupId ID of the refresh group
 * @param coinIndex index of the old coin within the refresh group
 */
async function refreshMelt(
  wex: WalletExecutionContext,
  refreshGroupId: string,
  coinIndex: number,
): Promise<void> {
  const ctx = new RefreshTransactionContext(wex, refreshGroupId);
  const d = await wex.db.runReadWriteTx(
    {
      storeNames: [
        "refreshGroups",
        "refreshSessions",
        "coins",
        "denominations",
      ],
    },
    async (tx) => {
      const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
      if (!refreshGroup) {
        return;
      }
      const refreshSession = await tx.refreshSessions.get([
        refreshGroupId,
        coinIndex,
      ]);
      if (!refreshSession) {
        return;
      }
      // A noreveal index is only set after a successful melt.
      if (refreshSession.norevealIndex !== undefined) {
        return;
      }
      const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]);
      checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
      const oldDenom = await getDenomInfo(
        wex,
        tx,
        oldCoin.exchangeBaseUrl,
        oldCoin.denomPubHash,
      );
      checkDbInvariant(
        !!oldDenom,
        "denomination for melted coin doesn't exist",
      );
      const newCoinDenoms: RefreshNewDenomInfo[] = [];
      for (const dh of refreshSession.newDenoms) {
        const newDenom = await getDenomInfo(
          wex,
          tx,
          oldCoin.exchangeBaseUrl,
          dh.denomPubHash,
        );
        checkDbInvariant(
          !!newDenom,
          "new denomination for refresh not in database",
        );
        newCoinDenoms.push({
          count: dh.count,
          denomPub: newDenom.denomPub,
          denomPubHash: newDenom.denomPubHash,
          feeWithdraw: newDenom.feeWithdraw,
          value: Amounts.stringify(newDenom.value),
        });
      }
      return { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession };
    },
  );
  if (!d) {
    return;
  }
  const { newCoinDenoms, oldCoin, oldDenom, refreshGroup, refreshSession } = d;
  // Only RSA denominations are supported here.
  let exchangeProtocolVersion: ExchangeProtocolVersion;
  switch (d.oldDenom.denomPub.cipher) {
    case DenomKeyType.Rsa: {
      exchangeProtocolVersion = ExchangeProtocolVersion.V12;
      break;
    }
    default:
      throw Error("unsupported key type");
  }
  const derived = await wex.cryptoApi.deriveRefreshSession({
    exchangeProtocolVersion,
    kappa: 3,
    meltCoinDenomPubHash: oldCoin.denomPubHash,
    meltCoinPriv: oldCoin.coinPriv,
    meltCoinPub: oldCoin.coinPub,
    feeRefresh: Amounts.parseOrThrow(oldDenom.feeRefresh),
    meltCoinMaxAge: oldCoin.maxAge,
    meltCoinAgeCommitmentProof: oldCoin.ageCommitmentProof,
    newCoinDenoms,
    sessionSecretSeed: refreshSession.sessionSecretSeed,
  });
  const reqUrl = new URL(
    `coins/${oldCoin.coinPub}/melt`,
    oldCoin.exchangeBaseUrl,
  );
  // The age commitment hash is only sent for coins that carry an
  // age commitment proof.
  let maybeAch: HashCodeString | undefined;
  if (oldCoin.ageCommitmentProof) {
    maybeAch = AgeRestriction.hashCommitment(
      oldCoin.ageCommitmentProof.commitment,
    );
  }
  const meltReqBody: ExchangeMeltRequest = {
    coin_pub: oldCoin.coinPub,
    confirm_sig: derived.confirmSig,
    denom_pub_hash: oldCoin.denomPubHash,
    denom_sig: oldCoin.denomSig,
    rc: derived.hash,
    value_with_fee: Amounts.stringify(derived.meltValueWithFee),
    age_commitment_hash: maybeAch,
  };
  // The request runs under the EXCHANGE_COINS_LOCK sequentializer.
  const resp = await wex.ws.runSequentialized(
    [EXCHANGE_COINS_LOCK],
    async () => {
      return await wex.http.fetch(reqUrl.href, {
        method: "POST",
        body: meltReqBody,
        timeout: getRefreshRequestTimeout(refreshGroup),
        cancellationToken: wex.cancellationToken,
      });
    },
  );
  switch (resp.status) {
    case HttpStatusCode.NotFound: {
      const errDetail = await readTalerErrorResponse(resp);
      await handleRefreshMeltNotFound(ctx, coinIndex, resp, errDetail);
      return;
    }
    case HttpStatusCode.Gone: {
      const errDetail = await readTalerErrorResponse(resp);
      await handleRefreshMeltGone(ctx, coinIndex, errDetail);
      return;
    }
    case HttpStatusCode.Conflict: {
      const errDetail = await readTalerErrorResponse(resp);
      await handleRefreshMeltConflict(
        ctx,
        coinIndex,
        errDetail,
        derived,
        oldCoin,
      );
      return;
    }
    case HttpStatusCode.Ok:
      break;
    default: {
      const errDetail = await readTalerErrorResponse(resp);
      throwUnexpectedRequestError(resp, errDetail);
    }
  }
  const meltResponse = await readSuccessResponseJsonOrThrow(
    resp,
    codecForExchangeMeltResponse(),
  );
  const norevealIndex = meltResponse.noreveal_index;
  refreshSession.norevealIndex = norevealIndex;
  // Persist the noreveal index, unless the group finished or the index
  // was already recorded in the meantime.
  await wex.db.runReadWriteTx(
    { storeNames: ["refreshGroups", "refreshSessions"] },
    async (tx) => {
      const rg = await tx.refreshGroups.get(refreshGroupId);
      if (!rg) {
        return;
      }
      if (rg.timestampFinished) {
        return;
      }
      const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
      if (!rs) {
        return;
      }
      if (rs.norevealIndex !== undefined) {
        return;
      }
      rs.norevealIndex = norevealIndex;
      await tx.refreshSessions.put(rs);
    },
  );
}
/**
 * Handle a "410 Gone" response to a melt request.
 *
 * Marks the coin as failed within the refresh group, records the error
 * on the refresh session and releases the outputs reserved by the
 * session. Does nothing if the group is missing, already finished, or
 * the coin is no longer pending.
 *
 * @param ctx transaction context of the refresh group
 * @param coinIndex index of the old coin within the refresh group
 * @param errDetails error detail parsed from the exchange response
 * @throws Error if the refresh session record is missing
 */
async function handleRefreshMeltGone(
  ctx: RefreshTransactionContext,
  coinIndex: number,
  errDetails: TalerErrorDetail,
): Promise<void> {
  // const expiredMsg = codecForDenominationExpiredMessage().decode(errDetails);
  // FIXME: Validate signature.
  await ctx.wex.db.runReadWriteTx(
    {
      storeNames: [
        "refreshGroups",
        "refreshSessions",
        "coins",
        "denominations",
        "coinAvailability",
      ],
    },
    async (tx) => {
      const rg = await tx.refreshGroups.get(ctx.refreshGroupId);
      if (!rg) {
        return;
      }
      if (rg.timestampFinished) {
        return;
      }
      if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
        return;
      }
      rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
      const refreshSession = await tx.refreshSessions.get([
        ctx.refreshGroupId,
        coinIndex,
      ]);
      if (!refreshSession) {
        throw Error("db invariant failed: missing refresh session in database");
      }
      refreshSession.lastError = errDetails;
      await destroyRefreshSession(ctx.wex, tx, rg, refreshSession);
      await tx.refreshGroups.put(rg);
      await tx.refreshSessions.put(refreshSession);
    },
  );
}
/**
 * Handle a "409 Conflict" response to a melt request.
 *
 * Fetches the coin history from the exchange to learn the coin's
 * actual balance, then:
 *  - balance is zero: the coin is marked as failed, the error is
 *    recorded on the refresh session and the outputs reserved by the
 *    session are released;
 *  - otherwise: the coin's input amount is replaced by the reported
 *    balance, the old session is torn down and a new session is
 *    created for the adjusted amount.
 *
 * In both cases the modified refresh group record is written back.
 * Does nothing if the group is missing, already finished, or the coin
 * is no longer pending.
 *
 * @param ctx transaction context of the refresh group
 * @param coinIndex index of the old coin within the refresh group
 * @param errDetails error detail parsed from the exchange response
 * @param derived derived refresh session used for the failed melt
 * @param oldCoin record of the coin that was being melted
 * @throws Error if the refresh session record is missing, or if the
 *         coin history request does not succeed
 */
async function handleRefreshMeltConflict(
  ctx: RefreshTransactionContext,
  coinIndex: number,
  errDetails: TalerErrorDetail,
  derived: DerivedRefreshSession,
  oldCoin: CoinRecord,
): Promise<void> {
  // Just log for better diagnostics here, error status
  // will be handled later.
  logger.error(
    `melt request for ${Amounts.stringify(
      derived.meltValueWithFee,
    )} failed in refresh group ${ctx.refreshGroupId} due to conflict`,
  );
  const historySig = await ctx.wex.cryptoApi.signCoinHistoryRequest({
    coinPriv: oldCoin.coinPriv,
    coinPub: oldCoin.coinPub,
    startOffset: 0,
  });
  const historyUrl = new URL(
    `coins/${oldCoin.coinPub}/history`,
    oldCoin.exchangeBaseUrl,
  );
  const historyResp = await ctx.wex.http.fetch(historyUrl.href, {
    method: "GET",
    headers: {
      "Taler-Coin-History-Signature": historySig.sig,
    },
    cancellationToken: ctx.wex.cancellationToken,
  });
  const historyJson = await readSuccessResponseJsonOrThrow(
    historyResp,
    codecForCoinHistoryResponse(),
  );
  logger.info(`coin history: ${j2s(historyJson)}`);
  // FIXME: If response seems wrong, report to auditor (in the future!);
  await ctx.wex.db.runReadWriteTx(
    {
      storeNames: [
        "refreshGroups",
        "refreshSessions",
        "denominations",
        "coins",
        "coinAvailability",
      ],
    },
    async (tx) => {
      const rg = await tx.refreshGroups.get(ctx.refreshGroupId);
      if (!rg) {
        return;
      }
      if (rg.timestampFinished) {
        return;
      }
      if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
        return;
      }
      // Both branches below need the existing session.
      const refreshSession = await tx.refreshSessions.get([
        ctx.refreshGroupId,
        coinIndex,
      ]);
      if (!refreshSession) {
        throw Error("db invariant failed: missing refresh session in database");
      }
      if (Amounts.isZero(historyJson.balance)) {
        rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
        refreshSession.lastError = errDetails;
        // Release the reserved outputs, like the other permanent-failure
        // handlers do; otherwise pendingRefreshOutputCount stays inflated.
        await destroyRefreshSession(ctx.wex, tx, rg, refreshSession);
        await tx.refreshGroups.put(rg);
        await tx.refreshSessions.put(refreshSession);
      } else {
        // Try again with new denoms!
        rg.inputPerCoin[coinIndex] = historyJson.balance;
        await destroyRefreshSession(ctx.wex, tx, rg, refreshSession);
        await tx.refreshSessions.delete([ctx.refreshGroupId, coinIndex]);
        await initRefreshSession(ctx.wex, tx, rg, coinIndex);
        // Persist the adjusted input amount, plus the per-coin status
        // that initRefreshSession may have changed in memory.
        await tx.refreshGroups.put(rg);
      }
    },
  );
}
/**
 * Handle a "404 Not Found" response to a melt request.
 *
 * Only the error codes for an unknown coin or an unknown denomination
 * key are treated as a permanent failure; any other code is re-thrown
 * as an unexpected request error.
 *
 * On permanent failure the coin is marked as failed within the refresh
 * group, the outputs reserved by the session are released and the
 * error is recorded on the refresh session. Does nothing if the group
 * is missing, already finished, or the coin is no longer pending.
 *
 * @param ctx transaction context of the refresh group
 * @param coinIndex index of the old coin within the refresh group
 * @param resp the HTTP response, used when reporting an unexpected error
 * @param errDetails error detail parsed from the exchange response
 * @throws Error if the refresh session record is missing
 */
async function handleRefreshMeltNotFound(
  ctx: RefreshTransactionContext,
  coinIndex: number,
  resp: HttpResponse,
  errDetails: TalerErrorDetail,
): Promise<void> {
  // Make sure that we only act on a 404 that indicates a final problem
  // with the coin.
  switch (errDetails.code) {
    case TalerErrorCode.EXCHANGE_GENERIC_COIN_UNKNOWN:
    case TalerErrorCode.EXCHANGE_GENERIC_DENOMINATION_KEY_UNKNOWN:
      break;
    default:
      throwUnexpectedRequestError(resp, errDetails);
  }
  await ctx.wex.db.runReadWriteTx(
    {
      storeNames: [
        "refreshGroups",
        "refreshSessions",
        "coins",
        "denominations",
        "coinAvailability",
      ],
    },
    async (tx) => {
      const rg = await tx.refreshGroups.get(ctx.refreshGroupId);
      if (!rg) {
        return;
      }
      if (rg.timestampFinished) {
        return;
      }
      if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
        return;
      }
      rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
      const refreshSession = await tx.refreshSessions.get([
        ctx.refreshGroupId,
        coinIndex,
      ]);
      if (!refreshSession) {
        throw Error("db invariant failed: missing refresh session in database");
      }
      await destroyRefreshSession(ctx.wex, tx, rg, refreshSession);
      refreshSession.lastError = errDetails;
      await tx.refreshGroups.put(rg);
      await tx.refreshSessions.put(refreshSession);
    },
  );
}
/**
 * Build the body of a refresh reveal request.
 *
 * - `transfer_privs` contains every transfer private key of the derived
 *   session except the one at `norevealIndex` (the input array is not
 *   modified).
 * - `coin_evs` are the planchets of the `norevealIndex` set.
 * - `new_denoms_h` and `link_sigs` hold one entry per new coin, in the
 *   order of `newDenoms` with each denomination repeated `count` times.
 *   The i-th link signature is made over the i-th planchet.
 *
 * @param args.cryptoApi crypto interface used to create link signatures
 * @param args.derived derived refresh session
 * @param args.norevealIndex index of the planchet set that is not revealed
 * @param args.oldCoinPub public key of the melted coin
 * @param args.oldCoinPriv private key of the melted coin
 * @param args.newDenoms output denominations with their coin counts
 * @param args.oldAgeCommitment optional age commitment of the melted coin
 * @returns the assembled reveal request
 * @throws Error if the derived session has no planchets at `norevealIndex`
 */
export async function assembleRefreshRevealRequest(args: {
  cryptoApi: TalerCryptoInterface;
  derived: DerivedRefreshSession;
  norevealIndex: number;
  oldCoinPub: CoinPublicKeyString;
  oldCoinPriv: string;
  newDenoms: {
    denomPubHash: string;
    count: number;
  }[];
  oldAgeCommitment?: AgeCommitment;
}): Promise<ExchangeRefreshRevealRequest> {
  const {
    derived,
    norevealIndex,
    cryptoApi,
    oldCoinPriv,
    oldCoinPub,
    newDenoms,
  } = args;
  // Copy before splicing so that the derived session stays intact.
  const privs = Array.from(derived.transferPrivs);
  privs.splice(norevealIndex, 1);
  const planchets = derived.planchetsForGammas[norevealIndex];
  if (!planchets) {
    throw Error("refresh index error");
  }
  const newDenomsFlat: string[] = [];
  const linkSigs: string[] = [];
  for (let i = 0; i < newDenoms.length; i++) {
    const dsel = newDenoms[i];
    for (let j = 0; j < dsel.count; j++) {
      // Running index over all new coins across all denominations.
      const newCoinIndex = linkSigs.length;
      const linkSig = await cryptoApi.signCoinLink({
        coinEv: planchets[newCoinIndex].coinEv,
        newDenomHash: dsel.denomPubHash,
        oldCoinPriv: oldCoinPriv,
        oldCoinPub: oldCoinPub,
        transferPub: derived.transferPubs[norevealIndex],
      });
      linkSigs.push(linkSig.sig);
      newDenomsFlat.push(dsel.denomPubHash);
    }
  }
  const req: ExchangeRefreshRevealRequest = {
    coin_evs: planchets.map((x) => x.coinEv),
    new_denoms_h: newDenomsFlat,
    transfer_privs: privs,
    transfer_pub: derived.transferPubs[norevealIndex],
    link_sigs: linkSigs,
    old_age_commitment: args.oldAgeCommitment?.publicKeys,
  };
  return req;
}
/**
 * Run the reveal step of a refresh session.
 *
 * Steps:
 *  1. Load the refresh group, session, old coin and denominations.
 *     Returns early if the group or session is missing.
 *  2. Re-derive the refresh session and POST the reveal request to the
 *     exchange.
 *  3. A 409 or 410 response is passed to handleRefreshRevealError; any
 *     other non-200 status throws.
 *  4. Unblind the returned signatures into new coin records.
 *  5. In one transaction: mark the old coin's refresh as finished, add
 *     the new coins that are not stored yet and move one unit per coin
 *     from `pendingRefreshOutputCount` to `freshCoinCount`.
 *
 * @param wex wallet execution context
 * @param refreshGroupId ID of the refresh group
 * @param coinIndex index of the old coin within the refresh group
 * @throws Error if the session has not been melted yet (no noreveal
 *         index) or a non-RSA denomination is involved
 */
async function refreshReveal(
  wex: WalletExecutionContext,
  refreshGroupId: string,
  coinIndex: number,
): Promise<void> {
  logger.trace(
    `doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`,
  );
  const ctx = new RefreshTransactionContext(wex, refreshGroupId);
  const d = await wex.db.runReadOnlyTx(
    {
      storeNames: [
        "refreshGroups",
        "refreshSessions",
        "coins",
        "denominations",
      ],
    },
    async (tx) => {
      const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
      if (!refreshGroup) {
        return;
      }
      const refreshSession = await tx.refreshSessions.get([
        refreshGroupId,
        coinIndex,
      ]);
      if (!refreshSession) {
        return;
      }
      const norevealIndex = refreshSession.norevealIndex;
      if (norevealIndex === undefined) {
        throw Error("can't reveal without melting first");
      }
      const oldCoin = await tx.coins.get(refreshGroup.oldCoinPubs[coinIndex]);
      checkDbInvariant(!!oldCoin, "melt coin doesn't exist");
      const oldDenom = await getDenomInfo(
        wex,
        tx,
        oldCoin.exchangeBaseUrl,
        oldCoin.denomPubHash,
      );
      checkDbInvariant(
        !!oldDenom,
        "denomination for melted coin doesn't exist",
      );
      const newCoinDenoms: RefreshNewDenomInfo[] = [];
      for (const dh of refreshSession.newDenoms) {
        const newDenom = await getDenomInfo(
          wex,
          tx,
          oldCoin.exchangeBaseUrl,
          dh.denomPubHash,
        );
        checkDbInvariant(
          !!newDenom,
          "new denomination for refresh not in database",
        );
        newCoinDenoms.push({
          count: dh.count,
          denomPub: newDenom.denomPub,
          denomPubHash: newDenom.denomPubHash,
          feeWithdraw: newDenom.feeWithdraw,
          value: Amounts.stringify(newDenom.value),
        });
      }
      return {
        oldCoin,
        oldDenom,
        newCoinDenoms,
        refreshSession,
        refreshGroup,
        norevealIndex,
      };
    },
  );
  if (!d) {
    return;
  }
  const {
    oldCoin,
    oldDenom,
    newCoinDenoms,
    refreshSession,
    refreshGroup,
    norevealIndex,
  } = d;
  // Only RSA denominations are supported here.
  let exchangeProtocolVersion: ExchangeProtocolVersion;
  switch (d.oldDenom.denomPub.cipher) {
    case DenomKeyType.Rsa: {
      exchangeProtocolVersion = ExchangeProtocolVersion.V12;
      break;
    }
    default:
      throw Error("unsupported key type");
  }
  // Derived again from the stored session secret seed, with the same
  // kind of inputs as in the melt step.
  const derived = await wex.cryptoApi.deriveRefreshSession({
    exchangeProtocolVersion,
    kappa: 3,
    meltCoinDenomPubHash: oldCoin.denomPubHash,
    meltCoinPriv: oldCoin.coinPriv,
    meltCoinPub: oldCoin.coinPub,
    feeRefresh: Amounts.parseOrThrow(oldDenom.feeRefresh),
    newCoinDenoms,
    meltCoinMaxAge: oldCoin.maxAge,
    meltCoinAgeCommitmentProof: oldCoin.ageCommitmentProof,
    sessionSecretSeed: refreshSession.sessionSecretSeed,
  });
  const reqUrl = new URL(
    `refreshes/${derived.hash}/reveal`,
    oldCoin.exchangeBaseUrl,
  );
  const req = await assembleRefreshRevealRequest({
    cryptoApi: wex.cryptoApi,
    derived,
    newDenoms: newCoinDenoms,
    norevealIndex: norevealIndex,
    oldCoinPriv: oldCoin.coinPriv,
    oldCoinPub: oldCoin.coinPub,
    oldAgeCommitment: oldCoin.ageCommitmentProof?.commitment,
  });
  // The request runs under the EXCHANGE_COINS_LOCK sequentializer.
  const resp = await wex.ws.runSequentialized(
    [EXCHANGE_COINS_LOCK],
    async () => {
      return await wex.http.fetch(reqUrl.href, {
        body: req,
        method: "POST",
        timeout: getRefreshRequestTimeout(refreshGroup),
        cancellationToken: wex.cancellationToken,
      });
    },
  );
  switch (resp.status) {
    case HttpStatusCode.Ok:
      break;
    case HttpStatusCode.Conflict:
    case HttpStatusCode.Gone: {
      const errDetail = await readTalerErrorResponse(resp);
      await handleRefreshRevealError(ctx, coinIndex, errDetail);
      return;
    }
    default: {
      const errDetail = await readTalerErrorResponse(resp);
      throwUnexpectedRequestError(resp, errDetail);
    }
  }
  const reveal = await readSuccessResponseJsonOrThrow(
    resp,
    codecForExchangeRevealResponse(),
  );
  const coins: CoinRecord[] = [];
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.Refresh,
    refreshGroupId,
  });
  // Build one coin record per new coin. Planchets and signatures are
  // indexed by the running coin index across all denominations.
  for (let i = 0; i < refreshSession.newDenoms.length; i++) {
    const ncd = newCoinDenoms[i];
    for (let j = 0; j < refreshSession.newDenoms[i].count; j++) {
      const newCoinIndex = coins.length;
      const pc = derived.planchetsForGammas[norevealIndex][newCoinIndex];
      if (ncd.denomPub.cipher !== DenomKeyType.Rsa) {
        throw Error("cipher unsupported");
      }
      const evSig = reveal.ev_sigs[newCoinIndex].ev_sig;
      const denomSig = await wex.cryptoApi.unblindDenominationSignature({
        planchet: {
          blindingKey: pc.blindingKey,
          denomPub: ncd.denomPub,
        },
        evSig,
      });
      const coin: CoinRecord = {
        blindingKey: pc.blindingKey,
        coinPriv: pc.coinPriv,
        coinPub: pc.coinPub,
        denomPubHash: ncd.denomPubHash,
        denomSig,
        exchangeBaseUrl: oldCoin.exchangeBaseUrl,
        status: CoinStatus.Fresh,
        coinSource: {
          type: CoinSourceType.Refresh,
          refreshGroupId,
          oldCoinPub: refreshGroup.oldCoinPubs[coinIndex],
        },
        sourceTransactionId: transactionId,
        coinEvHash: pc.coinEvHash,
        maxAge: pc.maxAge,
        ageCommitmentProof: pc.ageCommitmentProof,
      };
      coins.push(coin);
    }
  }
  await wex.db.runReadWriteTx(
    {
      storeNames: [
        "coins",
        "denominations",
        "coinAvailability",
        "refreshGroups",
        "refreshSessions",
      ],
    },
    async (tx) => {
      const rg = await tx.refreshGroups.get(refreshGroupId);
      if (!rg) {
        logger.warn("no refresh session found");
        return;
      }
      if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
        return;
      }
      const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
      if (!rs) {
        return;
      }
      rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
      for (const coin of coins) {
        // Skip coins that are already stored.
        const existingCoin = await tx.coins.get(coin.coinPub);
        if (existingCoin) {
          continue;
        }
        await tx.coins.add(coin);
        const denomInfo = await getDenomInfo(
          wex,
          tx,
          coin.exchangeBaseUrl,
          coin.denomPubHash,
        );
        checkDbInvariant(
          !!denomInfo,
          `no denom with hash ${coin.denomPubHash}`,
        );
        const car = await getCoinAvailabilityForDenom(
          wex,
          tx,
          denomInfo,
          coin.maxAge,
        );
        checkDbInvariant(
          car.pendingRefreshOutputCount != null &&
            car.pendingRefreshOutputCount > 0,
          `no pendingRefreshOutputCount for denom ${coin.denomPubHash} age ${coin.maxAge}`,
        );
        // The reserved output has materialized as a fresh coin.
        car.pendingRefreshOutputCount--;
        car.freshCoinCount++;
        await tx.coinAvailability.put(car);
      }
      await tx.refreshGroups.put(rg);
    },
  );
  logger.trace("refresh finished (end of reveal)");
}
/**
 * Handle a permanent error response to a reveal request
 * (called by refreshReveal for 409 and 410 responses).
 *
 * Marks the coin as failed within the refresh group, records the error
 * on the refresh session and releases the outputs reserved by the
 * session. Does nothing if the group is missing, already finished, or
 * the coin is no longer pending.
 *
 * @param ctx transaction context of the refresh group
 * @param coinIndex index of the old coin within the refresh group
 * @param errDetails error detail parsed from the exchange response
 * @throws Error if the refresh session record is missing
 */
async function handleRefreshRevealError(
  ctx: RefreshTransactionContext,
  coinIndex: number,
  errDetails: TalerErrorDetail,
): Promise<void> {
  await ctx.wex.db.runReadWriteTx(
    {
      storeNames: [
        "refreshGroups",
        "refreshSessions",
        "coins",
        "denominations",
        "coinAvailability",
      ],
    },
    async (tx) => {
      const rg = await tx.refreshGroups.get(ctx.refreshGroupId);
      if (!rg) {
        return;
      }
      if (rg.timestampFinished) {
        return;
      }
      if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
        return;
      }
      rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
      const refreshSession = await tx.refreshSessions.get([
        ctx.refreshGroupId,
        coinIndex,
      ]);
      if (!refreshSession) {
        throw Error("db invariant failed: missing refresh session in database");
      }
      refreshSession.lastError = errDetails;
      await destroyRefreshSession(ctx.wex, tx, rg, refreshSession);
      await tx.refreshGroups.put(rg);
      await tx.refreshSessions.put(refreshSession);
    },
  );
}
/**
 * Process all refresh sessions of a refresh group and then update the
 * status of the group as a whole.
 *
 * Sessions are processed one after the other. An exception in one
 * session is recorded and does not stop the remaining sessions, except
 * for CryptoApiStoppedError, which stops processing and makes the
 * function return a "finished" result.
 *
 * Once every coin is either finished or failed, the group is marked as
 * finished (or failed, if any coin failed), coins are made visible and
 * a transition notification is emitted.
 *
 * @param wex wallet execution context
 * @param refreshGroupId ID of the refresh group to process
 * @returns finished if the group is missing or already finished,
 *          progress if the group reached a final state, an error
 *          result if sessions threw, and backoff otherwise
 * @throws Error if refreshes are blocked by a dev experiment
 */
export async function processRefreshGroup(
  wex: WalletExecutionContext,
  refreshGroupId: string,
): Promise<TaskRunResult> {
  logger.trace(`processing refresh group ${refreshGroupId}`);
  const refreshGroup = await wex.db.runReadOnlyTx(
    { storeNames: ["refreshGroups"] },
    async (tx) => tx.refreshGroups.get(refreshGroupId),
  );
  if (!refreshGroup) {
    return TaskRunResult.finished();
  }
  if (refreshGroup.timestampFinished) {
    return TaskRunResult.finished();
  }
  if (
    wex.ws.config.testing.devModeActive &&
    wex.ws.devExperimentState.blockRefreshes
  ) {
    throw Error("refresh blocked");
  }
  logger.trace(
    `processing refresh sessions for ${refreshGroup.oldCoinPubs.length} old coins`,
  );
  const errors: TalerErrorDetail[] = [];
  let inShutdown = false;
  // Process refresh sessions in sequence.
  // In the future, we could parallelize request, in particular when multiple
  // exchanges are involved.
  // But we need to make sure that we write results to DB with high priority,
  // otherwise we run into problems with very large refresh groups, where we'd first
  // do many many network requests before even going to the DB.
  for (let i = 0; i < refreshGroup.oldCoinPubs.length; i++) {
    try {
      await processRefreshSession(wex, refreshGroupId, i);
    } catch (x) {
      if (x instanceof CryptoApiStoppedError) {
        inShutdown = true;
        logger.info(
          "crypto API stopped while processing refresh group, probably the wallet is currently shutting down.",
        );
        break;
      }
      const err = getErrorDetailFromException(x);
      logger.warn(`exception in refresh session: ${j2s(err)}`);
      // `err` is already an error detail; converting it a second time
      // would treat the detail object itself as an exception.
      errors.push(err);
    }
  }
  if (inShutdown) {
    return TaskRunResult.finished();
  }
  const ctx = new RefreshTransactionContext(wex, refreshGroupId);
  // We've processed all refresh session and can now update the
  // status of the whole refresh group.
  const transitionInfo = await wex.db.runReadWriteTx(
    { storeNames: ["coins", "coinAvailability", "refreshGroups"] },
    async (tx) => {
      const rg = await tx.refreshGroups.get(refreshGroupId);
      if (!rg) {
        return;
      }
      // Only a pending group may be moved to a final state here.
      switch (rg.operationStatus) {
        case RefreshOperationStatus.Pending:
          break;
        default:
          return undefined;
      }
      const oldTxState = computeRefreshTransactionState(rg);
      const allFinal = fnutil.all(
        rg.statusPerCoin,
        (x) =>
          x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed,
      );
      const anyFailed = fnutil.any(
        rg.statusPerCoin,
        (x) => x === RefreshCoinStatus.Failed,
      );
      if (allFinal) {
        if (anyFailed) {
          rg.timestampFinished = timestampPreciseToDb(
            TalerPreciseTimestamp.now(),
          );
          rg.operationStatus = RefreshOperationStatus.Failed;
        } else {
          rg.timestampFinished = timestampPreciseToDb(
            TalerPreciseTimestamp.now(),
          );
          rg.operationStatus = RefreshOperationStatus.Finished;
        }
        await makeCoinsVisible(wex, tx, ctx.transactionId);
        await tx.refreshGroups.put(rg);
        const newTxState = computeRefreshTransactionState(rg);
        return {
          oldTxState,
          newTxState,
        };
      }
      return undefined;
    },
  );
  if (transitionInfo) {
    notifyTransition(wex, ctx.transactionId, transitionInfo);
    return TaskRunResult.progress();
  }
  if (errors.length > 0) {
    // Report at most the first five errors to keep the detail small.
    return {
      type: TaskRunResultType.Error,
      errorDetail: makeErrorDetail(
        TalerErrorCode.WALLET_REFRESH_GROUP_INCOMPLETE,
        {
          numErrors: errors.length,
          errors: errors.slice(0, 5),
        },
      ),
    };
  }
  return TaskRunResult.backoff();
}
/**
 * Process the refresh session of a single coin within a refresh group.
 *
 * Does nothing if the refresh group does not exist, if the coin is already
 * marked as finished in the group, or if there is no refresh session record
 * for the coin.  Otherwise the melt step is run first when the session has
 * no `norevealIndex` yet, followed by the reveal step.
 *
 * @param wex wallet execution context
 * @param refreshGroupId identifier of the refresh group
 * @param coinIndex index of the coin within the refresh group
 */
async function processRefreshSession(
  wex: WalletExecutionContext,
  refreshGroupId: string,
  coinIndex: number,
): Promise<void> {
  logger.trace(
    `processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`,
  );
  // Read the group and the session in one read-only transaction.
  const { refreshGroup, refreshSession } = await wex.db.runReadOnlyTx(
    { storeNames: ["refreshGroups", "refreshSessions"] },
    async (tx) => ({
      refreshGroup: await tx.refreshGroups.get(refreshGroupId),
      refreshSession: await tx.refreshSessions.get([refreshGroupId, coinIndex]),
    }),
  );
  if (!refreshGroup) {
    return;
  }
  if (refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished) {
    return;
  }
  if (!refreshSession) {
    // No refresh session for that coin.
    return;
  }
  // No noreveal index recorded yet, so the melt step still has to be done.
  if (refreshSession.norevealIndex === undefined) {
    await refreshMelt(wex, refreshGroupId, coinIndex);
  }
  await refreshReveal(wex, refreshGroupId, coinIndex);
}
/**
 * Result of estimating the output of refreshing a list of coins,
 * as produced by calculateRefreshOutput.
 */
export interface RefreshOutputInfo {
// Estimated output amount per refreshed coin, in the same order as the
// coins that were passed in.
outputPerCoin: AmountJson[];
// Aggregated information per exchange; calculateRefreshOutput indexes this
// by the exchange base URL of the refreshed coins.
// NOTE(review): `Record` appears without type arguments here — confirm the
// intended key/value types.
perExchangeInfo: Record;
}
/**
 * Estimate the output of refreshing the given coins.
 *
 * For every coin, the total refresh cost for the requested amount is
 * subtracted from that amount.  The resulting outputs are reported per coin
 * and additionally summed up per exchange base URL.
 *
 * @param wex wallet execution context
 * @param tx database transaction used to look up coins and denominations
 * @param currency currency of the refresh (not used by this function)
 * @param oldCoinPubs coins to refresh, together with the amount to refresh
 * @returns estimated output per coin (same order as `oldCoinPubs`) and the
 *   accumulated effective output per exchange
 * @throws if a coin or its denomination is not found (DB invariant check)
 */
export async function calculateRefreshOutput(
  wex: WalletExecutionContext,
  tx: WalletDbReadOnlyTransaction<
    ["denominations", "coins", "refreshGroups", "coinAvailability"]
  >,
  currency: string,
  oldCoinPubs: CoinRefreshRequest[],
): Promise<RefreshOutputInfo> {
  const estimatedOutputPerCoin: AmountJson[] = [];
  // Typed via the result interface so both always stay in sync.
  const infoPerExchange: RefreshOutputInfo["perExchangeInfo"] = {};

  for (const ocp of oldCoinPubs) {
    const coin = await tx.coins.get(ocp.coinPub);
    checkDbInvariant(!!coin, "coin must be in database");
    const denom = await getDenomInfo(
      wex,
      tx,
      coin.exchangeBaseUrl,
      coin.denomPubHash,
    );
    checkDbInvariant(
      !!denom,
      "denomination for existing coin must be in database",
    );
    const refreshAmount = ocp.amount;
    const cost = await getTotalRefreshCost(
      wex,
      tx,
      denom,
      Amounts.parseOrThrow(refreshAmount),
    );
    const output = Amounts.sub(refreshAmount, cost).amount;

    // Lazily create the per-exchange accumulator, starting at zero in the
    // currency of the refresh cost.
    let exchInfo = infoPerExchange[coin.exchangeBaseUrl];
    if (!exchInfo) {
      infoPerExchange[coin.exchangeBaseUrl] = exchInfo = {
        outputEffective: Amounts.stringify(Amounts.zeroOfAmount(cost)),
      };
    }
    exchInfo.outputEffective = Amounts.stringify(
      Amounts.add(exchInfo.outputEffective, output).amount,
    );
    estimatedOutputPerCoin.push(output);
  }

  return {
    outputPerCoin: estimatedOutputPerCoin,
    perExchangeInfo: infoPerExchange,
  };
}
/**
 * Update the state of the old coins that are put into a refresh group.
 *
 * Fresh and suspended-fresh coins become dormant; for fresh coins the coin
 * availability counters are decremented accordingly.  Coins that are already
 * dormant or in the denom-loss state keep their status.  In every case, a
 * "refresh" entry referencing the refresh transaction is appended to the
 * coin's history and the coin record is written back.
 *
 * @param wex wallet execution context
 * @param tx read-write database transaction
 * @param oldCoinPubs coins (with amounts) that are being refreshed
 * @param refreshGroupId identifier of the refresh group the coins go into
 * @throws if a coin, its denomination or its availability record is missing,
 *   or if the availability record has no fresh coins left (DB invariants)
 */
async function applyRefreshToOldCoins(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    [
      "denominations",
      "coins",
      "coinHistory",
      "refreshGroups",
      "coinAvailability",
    ]
  >,
  oldCoinPubs: CoinRefreshRequest[],
  refreshGroupId: string,
): Promise<void> {
  for (const ocp of oldCoinPubs) {
    const coin = await tx.coins.get(ocp.coinPub);
    checkDbInvariant(!!coin, "coin must be in database");
    // The denomination is only looked up to verify that it exists.
    const denom = await getDenomInfo(
      wex,
      tx,
      coin.exchangeBaseUrl,
      coin.denomPubHash,
    );
    checkDbInvariant(
      !!denom,
      "denomination for existing coin must be in database",
    );

    switch (coin.status) {
      case CoinStatus.Dormant:
      case CoinStatus.DenomLoss:
        // Status stays as it is.
        break;
      case CoinStatus.Fresh: {
        coin.status = CoinStatus.Dormant;
        const coinAv = await tx.coinAvailability.get([
          coin.exchangeBaseUrl,
          coin.denomPubHash,
          coin.maxAge,
        ]);
        checkDbInvariant(
          !!coinAv,
          `no denom info for ${coin.denomPubHash} age ${coin.maxAge}`,
        );
        checkDbInvariant(
          coinAv.freshCoinCount > 0,
          `no fresh coins for ${coin.denomPubHash}`,
        );
        coinAv.freshCoinCount--;
        if (coin.visible) {
          if (!coinAv.visibleCoinCount) {
            // Do not let the counter go below zero, only report the problem.
            logger.error("coin availability inconsistent");
          } else {
            coinAv.visibleCoinCount--;
          }
        }
        await tx.coinAvailability.put(coinAv);
        break;
      }
      case CoinStatus.FreshSuspended: {
        // For suspended coins, we don't have to adjust coin
        // availability, as they are not counted as available.
        coin.status = CoinStatus.Dormant;
        break;
      }
      default:
        assertUnreachable(coin.status);
    }

    // Start a new history record if the coin does not have one yet.
    const histEntry: CoinHistoryRecord = (await tx.coinHistory.get(
      coin.coinPub,
    )) ?? {
      coinPub: coin.coinPub,
      history: [],
    };
    histEntry.history.push({
      type: "refresh",
      transactionId: constructTransactionIdentifier({
        tag: TransactionType.Refresh,
        refreshGroupId,
      }),
      amount: Amounts.stringify(ocp.amount),
    });
    await tx.coinHistory.put(histEntry);
    await tx.coins.put(coin);
  }
}
/**
 * Result of creating a refresh group via createRefreshGroup.
 */
export interface CreateRefreshGroupResult {
// Identifier of the newly created refresh group.
refreshGroupId: string;
// Notifications describing the creation; createRefreshGroup only returns
// them, emitting them is left to the caller.
notifications: WalletNotification[];
}
/**
 * Create a refresh group for a list of coins.
 *
 * Refreshes the remaining amount on the coin, effectively capturing the remaining
 * value in the refresh group.
 *
 * The caller must also ensure that the coins that should be refreshed exist
 * in the current database transaction.
 *
 * The notifications describing the new transaction are returned instead of
 * being emitted, so that the caller decides when to send them.
 *
 * @param wex wallet execution context
 * @param tx read-write database transaction the group is created in
 * @param currency currency of the refresh group
 * @param oldCoinPubs coins to refresh, with the amount to refresh per coin
 * @param refreshReason reason that is recorded in the refresh group
 * @param originatingTransactionId transaction that caused this refresh,
 *   if any
 * @returns the identifier of the new refresh group and the notifications
 *   for its creation
 */
export async function createRefreshGroup(
  wex: WalletExecutionContext,
  tx: WalletDbReadWriteTransaction<
    [
      "denominations",
      "coins",
      "coinHistory",
      "refreshGroups",
      "refreshSessions",
      "coinAvailability",
    ]
  >,
  currency: string,
  oldCoinPubs: CoinRefreshRequest[],
  refreshReason: RefreshReason,
  originatingTransactionId: string | undefined,
): Promise<CreateRefreshGroupResult> {
  // FIXME: Check that involved exchanges are reasonably up-to-date.
  // Otherwise, error out.

  const refreshGroupId = encodeCrock(getRandomBytes(32));

  const outInfo = await calculateRefreshOutput(wex, tx, currency, oldCoinPubs);
  const estimatedOutputPerCoin = outInfo.outputPerCoin;

  await applyRefreshToOldCoins(wex, tx, oldCoinPubs, refreshGroupId);

  const refreshGroup: RefreshGroupRecord = {
    operationStatus: RefreshOperationStatus.Pending,
    currency,
    timestampFinished: undefined,
    statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending),
    oldCoinPubs: oldCoinPubs.map((x) => x.coinPub),
    originatingTransactionId,
    reason: refreshReason,
    refreshGroupId,
    inputPerCoin: oldCoinPubs.map((x) => x.amount),
    expectedOutputPerCoin: estimatedOutputPerCoin.map((x) =>
      Amounts.stringify(x),
    ),
    infoPerExchange: outInfo.perExchangeInfo,
    timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()),
  };

  if (oldCoinPubs.length === 0) {
    // Nothing to refresh, the group is finished right away.
    logger.warn("created refresh group with zero coins");
    refreshGroup.timestampFinished = timestampPreciseToDb(
      TalerPreciseTimestamp.now(),
    );
    refreshGroup.operationStatus = RefreshOperationStatus.Finished;
  }

  // One refresh session per old coin.
  for (let i = 0; i < oldCoinPubs.length; i++) {
    await initRefreshSession(wex, tx, refreshGroup, i);
  }

  await tx.refreshGroups.put(refreshGroup);

  const newTxState = computeRefreshTransactionState(refreshGroup);

  logger.trace(`created refresh group ${refreshGroupId}`);

  const ctx = new RefreshTransactionContext(wex, refreshGroupId);

  // Shepherd the task.
  // If the current transaction fails to commit the refresh
  // group to the DB, the shepherd will give up.
  wex.taskScheduler.startShepherdTask(ctx.taskId);

  return {
    refreshGroupId,
    notifications: [
      {
        type: NotificationType.TransactionStateTransition,
        transactionId: ctx.transactionId,
        // The transaction did not exist before.
        oldTxState: {
          major: TransactionMajorState.None,
        },
        newTxState,
      },
    ],
  };
}
/**
 * Map the operation status of a refresh group to the state of the
 * corresponding refresh transaction.
 *
 * @param rg refresh group record
 * @returns transaction state whose major state mirrors the operation status
 */
export function computeRefreshTransactionState(
  rg: RefreshGroupRecord,
): TransactionState {
  const { operationStatus } = rg;
  switch (operationStatus) {
    case RefreshOperationStatus.Pending:
      return { major: TransactionMajorState.Pending };
    case RefreshOperationStatus.Suspended:
      return { major: TransactionMajorState.Suspended };
    case RefreshOperationStatus.Finished:
      return { major: TransactionMajorState.Done };
    case RefreshOperationStatus.Failed:
      return { major: TransactionMajorState.Failed };
  }
}
/**
 * Determine which actions are offered for a refresh transaction, based on
 * the operation status of its refresh group.
 *
 * @param rg refresh group record
 * @returns list of actions available in the current status
 */
export function computeRefreshTransactionActions(
  rg: RefreshGroupRecord,
): TransactionAction[] {
  const { operationStatus } = rg;
  switch (operationStatus) {
    // Both final states only allow deleting the transaction.
    case RefreshOperationStatus.Finished:
    case RefreshOperationStatus.Failed:
      return [TransactionAction.Delete];
    case RefreshOperationStatus.Suspended:
      return [TransactionAction.Resume, TransactionAction.Fail];
    case RefreshOperationStatus.Pending:
      return [
        TransactionAction.Retry,
        TransactionAction.Suspend,
        TransactionAction.Fail,
      ];
  }
}
/**
 * Look up the refresh transactions that originate from a given transaction.
 *
 * @param wex wallet execution context
 * @param transactionId identifier of the originating transaction
 * @returns transaction identifiers of all refresh groups whose originating
 *   transaction is `transactionId`
 */
export function getRefreshesForTransaction(
  wex: WalletExecutionContext,
  transactionId: string,
): Promise<string[]> {
  return wex.db.runReadOnlyTx({ storeNames: ["refreshGroups"] }, async (tx) => {
    const groups =
      await tx.refreshGroups.indexes.byOriginatingTransactionId.getAll(
        transactionId,
      );
    return groups.map((x) =>
      constructTransactionIdentifier({
        tag: TransactionType.Refresh,
        refreshGroupId: x.refreshGroupId,
      }),
    );
  });
}
/**
 * Result of a forced refresh, as returned by forceRefresh.
 */
export interface ForceRefreshResult {
// Identifier of the refresh group that was created for the request.
refreshGroupId: string;
}
/**
 * Create a refresh group for explicitly specified coins.
 *
 * For each requested coin, the amount from the request is refreshed; if the
 * request does not specify an amount, the value of the coin's denomination
 * is used.  The refresh group is created with reason "manual", and the
 * resulting notifications are emitted once the database transaction has
 * completed.
 *
 * @param wex wallet execution context
 * @param req request listing the coins (and optionally amounts) to refresh
 * @returns identifier of the created refresh group
 * @throws Error if the request contains no coins or if a requested coin is
 *   not in the database
 */
export async function forceRefresh(
  wex: WalletExecutionContext,
  req: ForceRefreshRequest,
): Promise<ForceRefreshResult> {
  if (req.refreshCoinSpecs.length === 0) {
    throw Error("refusing to create empty refresh group");
  }
  const res = await wex.db.runReadWriteTx(
    {
      storeNames: [
        "refreshGroups",
        "coinAvailability",
        "refreshSessions",
        "denominations",
        "coins",
        "coinHistory",
      ],
    },
    async (tx) => {
      const coinPubs: CoinRefreshRequest[] = [];
      for (const c of req.refreshCoinSpecs) {
        const coin = await tx.coins.get(c.coinPub);
        if (!coin) {
          // Report the public key itself; interpolating the whole spec
          // object would only yield "[object Object]".
          throw Error(`coin (pubkey ${c.coinPub}) not found`);
        }
        const denom = await getDenomInfo(
          wex,
          tx,
          coin.exchangeBaseUrl,
          coin.denomPubHash,
        );
        checkDbInvariant(!!denom, `no denom hash: ${coin.denomPubHash}`);
        coinPubs.push({
          coinPub: c.coinPub,
          // Default to the denomination's value if no amount was requested.
          amount: c.amount ?? denom.value,
        });
      }
      // The list is non-empty here, so the currency can be taken from the
      // first entry.
      return await createRefreshGroup(
        wex,
        tx,
        Amounts.currencyOf(coinPubs[0].amount),
        coinPubs,
        RefreshReason.Manual,
        undefined,
      );
    },
  );

  // Emit the notifications only after the transaction is done.
  for (const notif of res.notifications) {
    wex.ws.notify(notif);
  }

  return {
    refreshGroupId: res.refreshGroupId,
  };
}
/**
 * Wait until a refresh operation is final.
 *
 * Starts the shepherd task for the refresh group and then waits until the
 * group is either finished, failed, or no longer present in the database.
 * The state is re-checked on state transition notifications of this refresh
 * transaction.
 *
 * @param wex wallet execution context
 * @param refreshGroupId identifier of the refresh group to wait for
 */
export async function waitRefreshFinal(
  wex: WalletExecutionContext,
  refreshGroupId: string,
): Promise<void> {
  const ctx = new RefreshTransactionContext(wex, refreshGroupId);
  wex.taskScheduler.startShepherdTask(ctx.taskId);

  await genericWaitForState(wex, {
    async checkState(): Promise<boolean> {
      // Check if refresh is final
      const rg = await ctx.wex.db.runReadOnlyTx(
        { storeNames: ["refreshGroups"] },
        async (tx) => tx.refreshGroups.get(ctx.refreshGroupId),
      );
      if (!rg) {
        // Must've been deleted, we consider that final.
        return true;
      }
      switch (rg.operationStatus) {
        case RefreshOperationStatus.Failed:
        case RefreshOperationStatus.Finished:
          // Transaction is final
          return true;
        case RefreshOperationStatus.Pending:
        case RefreshOperationStatus.Suspended:
          break;
      }
      return false;
    },
    filterNotification(notif): boolean {
      // Only state transitions of this very transaction are relevant.
      return (
        notif.type === NotificationType.TransactionStateTransition &&
        notif.transactionId === ctx.transactionId
      );
    },
  });
}