/*
This file is part of GNU Taler
(C) 2022-2023 Taler Systems S.A.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
import {
Amounts,
CheckPeerPushDebitRequest,
CheckPeerPushDebitResponse,
CoinRefreshRequest,
ContractTermsUtil,
ExchangePurseDeposits,
HttpStatusCode,
InitiatePeerPushDebitRequest,
InitiatePeerPushDebitResponse,
Logger,
NotificationType,
RefreshReason,
SelectedProspectiveCoin,
TalerError,
TalerErrorCode,
TalerPreciseTimestamp,
TalerProtocolTimestamp,
TalerProtocolViolationError,
TransactionAction,
TransactionIdStr,
TransactionMajorState,
TransactionMinorState,
TransactionState,
TransactionType,
assertUnreachable,
checkDbInvariant,
checkLogicInvariant,
encodeCrock,
getRandomBytes,
j2s,
} from "@gnu-taler/taler-util";
import {
HttpResponse,
readSuccessResponseJsonOrThrow,
readTalerErrorResponse,
} from "@gnu-taler/taler-util/http";
import {
PreviousPayCoins,
selectPeerCoins,
selectPeerCoinsInTx,
} from "./coinSelection.js";
import {
PendingTaskType,
TaskIdStr,
TaskRunResult,
TaskRunResultType,
TransactionContext,
constructTaskIdentifier,
spendCoins,
} from "./common.js";
import { EncryptContractRequest } from "./crypto/cryptoTypes.js";
import {
PeerPushDebitRecord,
PeerPushDebitStatus,
RefreshOperationStatus,
timestampPreciseToDb,
timestampProtocolFromDb,
timestampProtocolToDb,
} from "./db.js";
import {
codecForExchangePurseStatus,
getTotalPeerPaymentCost,
getTotalPeerPaymentCostInTx,
queryCoinInfosForSelection,
} from "./pay-peer-common.js";
import { createRefreshGroup, waitRefreshFinal } from "./refresh.js";
import {
constructTransactionIdentifier,
notifyTransition,
} from "./transactions.js";
import { WalletExecutionContext } from "./wallet.js";
// Module-wide logger, tagged "pay-peer-push-debit.ts" (presumably this module's file name).
const logger = new Logger("pay-peer-push-debit.ts");
/**
 * Transaction context for one peer-push-debit transaction, identified by
 * the public key of its purse.
 *
 * Provides the user-triggered life-cycle operations (delete, suspend,
 * abort, resume, fail). Each state-changing operation rewrites the
 * `status` of the stored `peerPushDebit` record, stops and/or starts the
 * shepherd task for the transaction and emits a transition notification.
 */
export class PeerPushDebitTransactionContext implements TransactionContext {
  readonly transactionId: TransactionIdStr;
  readonly taskId: TaskIdStr;

  /**
   * @param wex wallet execution context used for DB access, scheduling
   *   and notifications
   * @param pursePub purse public key; also the key of the DB record
   */
  constructor(
    public wex: WalletExecutionContext,
    public pursePub: string,
  ) {
    this.transactionId = constructTransactionIdentifier({
      tag: TransactionType.PeerPushDebit,
      pursePub,
    });
    this.taskId = constructTaskIdentifier({
      tag: PendingTaskType.PeerPushDebit,
      pursePub,
    });
  }

  /**
   * Shared body of suspend/abort/resume/fail: inside one read-write DB
   * transaction, load the record, ask `computeNewStatus` for the follow-up
   * status and, if there is one, store it.
   *
   * @param computeNewStatus maps the current status to the new status, or
   *   to `undefined` when the operation is a no-op in the current status
   * @returns the old and new transaction state when the record was
   *   changed, `undefined` when the record is missing or nothing changed
   */
  private async applyStatusTransition(
    computeNewStatus: (
      current: PeerPushDebitStatus,
    ) => PeerPushDebitStatus | undefined,
  ) {
    const { wex, pursePub } = this;
    return await wex.db.runReadWriteTx(
      { storeNames: ["peerPushDebit"] },
      async (tx) => {
        const pushDebitRec = await tx.peerPushDebit.get(pursePub);
        if (!pushDebitRec) {
          logger.warn(`peer push debit ${pursePub} not found`);
          return undefined;
        }
        const newStatus = computeNewStatus(pushDebitRec.status);
        // Compare against null/undefined explicitly: a status value may be falsy.
        if (newStatus == null) {
          return undefined;
        }
        const oldTxState = computePeerPushDebitTransactionState(pushDebitRec);
        pushDebitRec.status = newStatus;
        const newTxState = computePeerPushDebitTransactionState(pushDebitRec);
        await tx.peerPushDebit.put(pushDebitRec);
        return { oldTxState, newTxState };
      },
    );
  }

  /**
   * Delete the transaction record (if it exists) and leave a tombstone
   * keyed by the transaction ID.
   */
  async deleteTransaction(): Promise<void> {
    const { wex, pursePub, transactionId } = this;
    await wex.db.runReadWriteTx(
      { storeNames: ["peerPushDebit", "tombstones"] },
      async (tx) => {
        const debit = await tx.peerPushDebit.get(pursePub);
        if (debit) {
          await tx.peerPushDebit.delete(pursePub);
          await tx.tombstones.put({ id: transactionId });
        }
      },
    );
  }

  /**
   * Move an active (pending/aborting) transaction to the matching
   * suspended status and stop its shepherd task.
   */
  async suspendTransaction(): Promise<void> {
    const { wex, transactionId, taskId } = this;
    const transitionInfo = await this.applyStatusTransition((status) => {
      switch (status) {
        case PeerPushDebitStatus.PendingCreatePurse:
          return PeerPushDebitStatus.SuspendedCreatePurse;
        case PeerPushDebitStatus.AbortingRefreshDeleted:
          return PeerPushDebitStatus.SuspendedAbortingRefreshDeleted;
        case PeerPushDebitStatus.AbortingRefreshExpired:
          return PeerPushDebitStatus.SuspendedAbortingRefreshExpired;
        case PeerPushDebitStatus.AbortingDeletePurse:
          return PeerPushDebitStatus.SuspendedAbortingDeletePurse;
        case PeerPushDebitStatus.PendingReady:
          return PeerPushDebitStatus.SuspendedReady;
        case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
        case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted:
        case PeerPushDebitStatus.SuspendedAbortingRefreshExpired:
        case PeerPushDebitStatus.SuspendedReady:
        case PeerPushDebitStatus.SuspendedCreatePurse:
        case PeerPushDebitStatus.Done:
        case PeerPushDebitStatus.Aborted:
        case PeerPushDebitStatus.Failed:
        case PeerPushDebitStatus.Expired:
          // Do nothing
          return undefined;
        default:
          return assertUnreachable(status);
      }
    });
    wex.taskScheduler.stopShepherdTask(taskId);
    notifyTransition(wex, transactionId, transitionInfo);
  }

  /**
   * Start aborting a transaction that is (or was, before suspension)
   * creating the purse or waiting for the merge. The shepherd task is
   * restarted so the aborting status gets processed.
   */
  async abortTransaction(): Promise<void> {
    const { wex, transactionId, taskId } = this;
    const transitionInfo = await this.applyStatusTransition((status) => {
      switch (status) {
        case PeerPushDebitStatus.PendingReady:
        case PeerPushDebitStatus.SuspendedReady:
          return PeerPushDebitStatus.AbortingDeletePurse;
        case PeerPushDebitStatus.SuspendedCreatePurse:
        case PeerPushDebitStatus.PendingCreatePurse:
          // Network request might already be in-flight!
          return PeerPushDebitStatus.AbortingDeletePurse;
        case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted:
        case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
        case PeerPushDebitStatus.SuspendedAbortingRefreshExpired:
        case PeerPushDebitStatus.AbortingRefreshDeleted:
        case PeerPushDebitStatus.AbortingRefreshExpired:
        case PeerPushDebitStatus.Done:
        case PeerPushDebitStatus.AbortingDeletePurse:
        case PeerPushDebitStatus.Aborted:
        case PeerPushDebitStatus.Expired:
        case PeerPushDebitStatus.Failed:
          // Do nothing
          return undefined;
        default:
          return assertUnreachable(status);
      }
    });
    wex.taskScheduler.stopShepherdTask(taskId);
    notifyTransition(wex, transactionId, transitionInfo);
    wex.taskScheduler.startShepherdTask(taskId);
  }

  /**
   * Move a suspended transaction back to the matching active status and
   * start its shepherd task.
   */
  async resumeTransaction(): Promise<void> {
    const { wex, transactionId, taskId } = this;
    const transitionInfo = await this.applyStatusTransition((status) => {
      switch (status) {
        case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
          return PeerPushDebitStatus.AbortingDeletePurse;
        case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted:
          return PeerPushDebitStatus.AbortingRefreshDeleted;
        case PeerPushDebitStatus.SuspendedAbortingRefreshExpired:
          return PeerPushDebitStatus.AbortingRefreshExpired;
        case PeerPushDebitStatus.SuspendedReady:
          return PeerPushDebitStatus.PendingReady;
        case PeerPushDebitStatus.SuspendedCreatePurse:
          return PeerPushDebitStatus.PendingCreatePurse;
        case PeerPushDebitStatus.PendingCreatePurse:
        case PeerPushDebitStatus.AbortingRefreshDeleted:
        case PeerPushDebitStatus.AbortingRefreshExpired:
        case PeerPushDebitStatus.AbortingDeletePurse:
        case PeerPushDebitStatus.PendingReady:
        case PeerPushDebitStatus.Done:
        case PeerPushDebitStatus.Aborted:
        case PeerPushDebitStatus.Failed:
        case PeerPushDebitStatus.Expired:
          // Do nothing
          return undefined;
        default:
          return assertUnreachable(status);
      }
    });
    wex.taskScheduler.startShepherdTask(taskId);
    notifyTransition(wex, transactionId, transitionInfo);
  }

  /**
   * Force any transaction that is not yet in a final status (done,
   * aborted, failed, expired) into the failed status.
   */
  async failTransaction(): Promise<void> {
    const { wex, transactionId, taskId } = this;
    const transitionInfo = await this.applyStatusTransition((status) => {
      switch (status) {
        case PeerPushDebitStatus.AbortingRefreshDeleted:
        case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted:
          // FIXME: What to do about the refresh group?
          return PeerPushDebitStatus.Failed;
        case PeerPushDebitStatus.AbortingDeletePurse:
        case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
        case PeerPushDebitStatus.AbortingRefreshExpired:
        case PeerPushDebitStatus.SuspendedAbortingRefreshExpired:
        case PeerPushDebitStatus.PendingReady:
        case PeerPushDebitStatus.SuspendedReady:
        case PeerPushDebitStatus.SuspendedCreatePurse:
        case PeerPushDebitStatus.PendingCreatePurse:
          return PeerPushDebitStatus.Failed;
        case PeerPushDebitStatus.Done:
        case PeerPushDebitStatus.Aborted:
        case PeerPushDebitStatus.Failed:
        case PeerPushDebitStatus.Expired:
          // Do nothing
          return undefined;
        default:
          return assertUnreachable(status);
      }
    });
    wex.taskScheduler.stopShepherdTask(taskId);
    notifyTransition(wex, transactionId, transitionInfo);
    wex.taskScheduler.startShepherdTask(taskId);
  }
}
/**
 * Check whether a peer-push debit of the requested amount is possible and
 * compute what it would cost, without creating a transaction.
 *
 * @param wex wallet execution context
 * @param req request carrying the amount to send
 * @returns the exchange that would be used, the raw (requested) and
 *   effective (total cost) amounts and the maximum expiration date
 *   reported by coin selection
 * @throws TalerError with code
 *   WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE when coin selection fails
 */
export async function checkPeerPushDebit(
  wex: WalletExecutionContext,
  req: CheckPeerPushDebitRequest,
): Promise<CheckPeerPushDebitResponse> {
  const instructedAmount = Amounts.parseOrThrow(req.amount);
  logger.trace(
    `checking peer push debit for ${Amounts.stringify(instructedAmount)}`,
  );
  const coinSelRes = await selectPeerCoins(wex, {
    instructedAmount,
  });
  let coins: SelectedProspectiveCoin[] | undefined = undefined;
  switch (coinSelRes.type) {
    case "failure":
      throw TalerError.fromDetail(
        TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE,
        {
          insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails,
        },
      );
    case "prospective":
      // A "prospective" selection is still good enough for a cost estimate.
      coins = coinSelRes.result.prospectiveCoins;
      break;
    case "success":
      coins = coinSelRes.result.coins;
      break;
    default:
      assertUnreachable(coinSelRes);
  }
  logger.trace(`selected peer coins (len=${coins.length})`);
  const totalAmount = await getTotalPeerPaymentCost(wex, coins);
  logger.trace("computed total peer payment cost");
  return {
    exchangeBaseUrl: coinSelRes.result.exchangeBaseUrl,
    amountEffective: Amounts.stringify(totalAmount),
    amountRaw: req.amount,
    maxExpirationDate: coinSelRes.result.maxExpirationDate,
  };
}
/**
 * Handle an HTTP "conflict" response received while creating the purse or
 * depositing into it.
 *
 * If the error is not "insufficient funds", the transaction is failed.
 * Otherwise the coin named in the error response is dropped from the
 * stored coin selection, a replacement selection is computed and stored
 * (only while the record is still in a create-purse status).
 *
 * @param wex wallet execution context
 * @param peerPushInitiation the record as seen by the caller
 * @param resp the conflict response from the exchange
 * @returns `finished` after failing the transaction, `progress` after a
 *   re-selection attempt
 * @throws TalerProtocolViolationError when the response names no coin
 * @throws Error when no replacement coin selection is possible
 */
async function handlePurseCreationConflict(
  wex: WalletExecutionContext,
  peerPushInitiation: PeerPushDebitRecord,
  resp: HttpResponse,
): Promise<TaskRunResult> {
  const pursePub = peerPushInitiation.pursePub;
  const errResp = await readTalerErrorResponse(resp);
  const ctx = new PeerPushDebitTransactionContext(wex, pursePub);
  if (errResp.code !== TalerErrorCode.EXCHANGE_GENERIC_INSUFFICIENT_FUNDS) {
    await ctx.failTransaction();
    return TaskRunResult.finished();
  }

  // FIXME: Properly parse!
  const brokenCoinPub = (errResp as any).coin_pub;
  logger.trace(`excluded broken coin pub=${brokenCoinPub}`);
  if (!brokenCoinPub) {
    // FIXME: Details!
    throw new TalerProtocolViolationError();
  }

  const instructedAmount = Amounts.parseOrThrow(peerPushInitiation.amount);
  const sel = peerPushInitiation.coinSel;
  checkDbInvariant(
    !!sel,
    `no coin selected for peer push initiation ${peerPushInitiation.pursePub}`,
  );

  // Keep every previously selected coin except the one the exchange rejected.
  const repair: PreviousPayCoins = [];
  for (let i = 0; i < sel.coinPubs.length; i++) {
    if (sel.coinPubs[i] !== brokenCoinPub) {
      repair.push({
        coinPub: sel.coinPubs[i],
        contribution: Amounts.parseOrThrow(sel.contributions[i]),
      });
    }
  }

  const coinSelRes = await selectPeerCoins(wex, {
    instructedAmount,
    repair,
  });
  switch (coinSelRes.type) {
    case "failure":
    case "prospective":
      // FIXME: Details!
      throw Error(
        "insufficient balance to re-select coins to repair double spending",
      );
    case "success":
      break;
    default:
      assertUnreachable(coinSelRes);
  }

  await wex.db.runReadWriteTx({ storeNames: ["peerPushDebit"] }, async (tx) => {
    const myPpi = await tx.peerPushDebit.get(peerPushInitiation.pursePub);
    if (!myPpi) {
      return;
    }
    switch (myPpi.status) {
      case PeerPushDebitStatus.PendingCreatePurse:
      case PeerPushDebitStatus.SuspendedCreatePurse: {
        const newSel = coinSelRes.result;
        myPpi.coinSel = {
          coinPubs: newSel.coins.map((x) => x.coinPub),
          contributions: newSel.coins.map((x) => x.contribution),
        };
        break;
      }
      default:
        // The record moved on in the meantime; leave it untouched.
        return;
    }
    await tx.peerPushDebit.put(myPpi);
  });
  return TaskRunResult.progress();
}
/**
 * Process a peer-push-debit transaction in the PendingCreatePurse status.
 *
 * Steps:
 * 1. Load the contract terms for the transaction.
 * 2. If no coins are selected yet, select and spend coins, store the
 *    selection and return (the purse is created on a later run).
 * 3. Sign the purse creation, encrypt the contract and sign the coin
 *    deposits in batches of at most 100 coins. The first batch is sent
 *    with the purse creation request, later batches as purse deposits.
 * 4. After all batches succeeded, move the record to PendingReady.
 *
 * @param wex wallet execution context
 * @param peerPushInitiation the record to process
 * @returns the task result; an error result carries the exchange's error
 *   response for unexpected HTTP statuses
 * @throws Error when the contract terms are missing or coin selection is
 *   only prospective
 * @throws TalerError when the balance is insufficient
 */
async function processPeerPushDebitCreateReserve(
  wex: WalletExecutionContext,
  peerPushInitiation: PeerPushDebitRecord,
): Promise<TaskRunResult> {
  const pursePub = peerPushInitiation.pursePub;
  const purseExpiration = peerPushInitiation.purseExpiration;
  const hContractTerms = peerPushInitiation.contractTermsHash;
  const ctx = new PeerPushDebitTransactionContext(wex, pursePub);
  const transactionId = ctx.transactionId;

  logger.trace(`processing ${transactionId} pending(create-reserve)`);

  const contractTermsRecord = await wex.db.runReadOnlyTx(
    { storeNames: ["contractTerms"] },
    async (tx) => {
      return tx.contractTerms.get(hContractTerms);
    },
  );
  if (!contractTermsRecord) {
    throw Error(
      `db invariant failed, contract terms for ${transactionId} missing`,
    );
  }

  if (!peerPushInitiation.coinSel) {
    const coinSelRes = await selectPeerCoins(wex, {
      instructedAmount: Amounts.parseOrThrow(peerPushInitiation.amount),
    });
    switch (coinSelRes.type) {
      case "failure":
        throw TalerError.fromDetail(
          TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE,
          {
            insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails,
          },
        );
      case "prospective":
        throw Error("insufficient funds (blocked on refresh)");
      case "success":
        break;
      default:
        assertUnreachable(coinSelRes);
    }
    const transitionDone = await wex.db.runReadWriteTx(
      {
        storeNames: [
          "coinAvailability",
          "coinHistory",
          "coins",
          "contractTerms",
          "denominations",
          "exchanges",
          "peerPushDebit",
          "refreshGroups",
          "refreshSessions",
        ],
      },
      async (tx) => {
        const ppi = await tx.peerPushDebit.get(pursePub);
        if (!ppi) {
          return false;
        }
        if (ppi.coinSel) {
          // Somebody else stored a selection in the meantime.
          return false;
        }
        ppi.coinSel = {
          coinPubs: coinSelRes.result.coins.map((x) => x.coinPub),
          contributions: coinSelRes.result.coins.map((x) => x.contribution),
        };
        // FIXME: Instead of directly doing a spendCoin here,
        // we might want to mark the coins as used and spend them
        // after we've been able to create the purse.
        await spendCoins(wex, tx, {
          transactionId: ctx.transactionId,
          coinPubs: coinSelRes.result.coins.map((x) => x.coinPub),
          contributions: coinSelRes.result.coins.map((x) =>
            Amounts.parseOrThrow(x.contribution),
          ),
          refreshReason: RefreshReason.PayPeerPush,
        });
        await tx.peerPushDebit.put(ppi);
        return true;
      },
    );
    if (transitionDone) {
      return TaskRunResult.progress();
    }
    return TaskRunResult.backoff();
  }

  const purseSigResp = await wex.cryptoApi.signPurseCreation({
    hContractTerms,
    mergePub: peerPushInitiation.mergePub,
    minAge: 0,
    purseAmount: peerPushInitiation.amount,
    purseExpiration: timestampProtocolFromDb(purseExpiration),
    pursePriv: peerPushInitiation.pursePriv,
  });

  const coins = await queryCoinInfosForSelection(
    wex,
    peerPushInitiation.coinSel,
  );

  const encryptContractRequest: EncryptContractRequest = {
    contractTerms: contractTermsRecord.contractTermsRaw,
    mergePriv: peerPushInitiation.mergePriv,
    pursePriv: peerPushInitiation.pursePriv,
    pursePub: peerPushInitiation.pursePub,
    contractPriv: peerPushInitiation.contractPriv,
    contractPub: peerPushInitiation.contractPub,
    nonce: peerPushInitiation.contractEncNonce,
  };
  const econtractResp = await wex.cryptoApi.encryptContractForMerge(
    encryptContractRequest,
  );

  /**
   * Interpret the exchange's answer to a create/deposit request.
   * Returns undefined when the batch was accepted and processing may
   * continue, otherwise the task result to return to the scheduler.
   */
  const handleBatchResponse = async (
    httpResp: HttpResponse,
  ): Promise<TaskRunResult | undefined> => {
    switch (httpResp.status) {
      case HttpStatusCode.Ok:
        // Possibly on to the next batch.
        return undefined;
      case HttpStatusCode.Forbidden: {
        // FIXME: Store this error!
        await ctx.failTransaction();
        return TaskRunResult.finished();
      }
      case HttpStatusCode.Conflict: {
        // Handle double-spending
        return handlePurseCreationConflict(wex, peerPushInitiation, httpResp);
      }
      default: {
        const errResp = await readTalerErrorResponse(httpResp);
        return {
          type: TaskRunResultType.Error,
          errorDetail: errResp,
        };
      }
    }
  };

  const maxBatchSize = 100;

  for (let offset = 0; offset < coins.length; offset += maxBatchSize) {
    // slice() clamps the end index, so the last batch may be shorter.
    const batchCoins = coins.slice(offset, offset + maxBatchSize);
    const depositSigsResp = await wex.cryptoApi.signPurseDeposits({
      exchangeBaseUrl: peerPushInitiation.exchangeBaseUrl,
      pursePub: peerPushInitiation.pursePub,
      coins: batchCoins,
    });

    let httpResp: HttpResponse;
    if (offset === 0) {
      // First batch creates the purse!
      if (logger.shouldLogTrace()) {
        // NOTE(review): this request object contains private keys; consider
        // whether it should be logged at all.
        logger.trace(
          `encrypt contract request: ${j2s(encryptContractRequest)}`,
        );
      }
      const createPurseUrl = new URL(
        `purses/${peerPushInitiation.pursePub}/create`,
        peerPushInitiation.exchangeBaseUrl,
      );
      const reqBody = {
        amount: peerPushInitiation.amount,
        merge_pub: peerPushInitiation.mergePub,
        purse_sig: purseSigResp.sig,
        h_contract_terms: hContractTerms,
        purse_expiration: timestampProtocolFromDb(purseExpiration),
        deposits: depositSigsResp.deposits,
        min_age: 0,
        econtract: econtractResp.econtract,
      };
      if (logger.shouldLogTrace()) {
        logger.trace(`request body: ${j2s(reqBody)}`);
      }
      httpResp = await wex.http.fetch(createPurseUrl.href, {
        method: "POST",
        body: reqBody,
        cancellationToken: wex.cancellationToken,
      });
    } else {
      // Later batches deposit additional coins into the existing purse.
      const purseDepositUrl = new URL(
        `purses/${pursePub}/deposit`,
        peerPushInitiation.exchangeBaseUrl,
      );
      const depositPayload: ExchangePurseDeposits = {
        deposits: depositSigsResp.deposits,
      };
      httpResp = await wex.http.fetch(purseDepositUrl.href, {
        method: "POST",
        body: depositPayload,
        cancellationToken: wex.cancellationToken,
      });
    }

    const earlyResult = await handleBatchResponse(httpResp);
    if (earlyResult !== undefined) {
      return earlyResult;
    }
  }

  // All batches done!
  await transitionPeerPushDebitTransaction(wex, pursePub, {
    stFrom: PeerPushDebitStatus.PendingCreatePurse,
    stTo: PeerPushDebitStatus.PendingReady,
  });

  return TaskRunResult.backoff();
}
/**
 * Process a peer-push-debit transaction in the AbortingDeletePurse status.
 *
 * Sends a signed DELETE request for the purse to the exchange, then
 * creates a refresh group for the coins of the stored coin selection and
 * moves the record to AbortingRefreshDeleted.
 *
 * @param wex wallet execution context
 * @param peerPushInitiation the record to process
 * @returns always a `backoff` task result
 */
async function processPeerPushDebitAbortingDeletePurse(
  wex: WalletExecutionContext,
  peerPushInitiation: PeerPushDebitRecord,
): Promise<TaskRunResult> {
  const { pursePub, pursePriv } = peerPushInitiation;
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.PeerPushDebit,
    pursePub,
  });

  const sigResp = await wex.cryptoApi.signDeletePurse({
    pursePriv,
  });
  const purseUrl = new URL(
    `purses/${pursePub}`,
    peerPushInitiation.exchangeBaseUrl,
  );
  const resp = await wex.http.fetch(purseUrl.href, {
    method: "DELETE",
    headers: {
      "taler-purse-signature": sigResp.sig,
    },
    cancellationToken: wex.cancellationToken,
  });
  // NOTE(review): the status is only logged, never checked; the transition
  // below happens regardless of whether the exchange deleted the purse.
  logger.info(`deleted purse with response status ${resp.status}`);

  const transitionInfo = await wex.db.runReadWriteTx(
    {
      storeNames: [
        "coinAvailability",
        "coinHistory",
        "coins",
        "denominations",
        "peerPushDebit",
        "refreshGroups",
        "refreshSessions",
      ],
    },
    async (tx) => {
      const ppiRec = await tx.peerPushDebit.get(pursePub);
      if (!ppiRec) {
        return undefined;
      }
      if (ppiRec.status !== PeerPushDebitStatus.AbortingDeletePurse) {
        return undefined;
      }
      const currency = Amounts.currencyOf(ppiRec.amount);
      const oldTxState = computePeerPushDebitTransactionState(ppiRec);
      if (!ppiRec.coinSel) {
        // NOTE(review): without a coin selection the record stays in
        // AbortingDeletePurse and this handler runs again after backoff.
        return undefined;
      }
      // Every selected coin is refreshed with the amount it contributed.
      const coinPubs: CoinRefreshRequest[] = [];
      for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) {
        coinPubs.push({
          amount: ppiRec.coinSel.contributions[i],
          coinPub: ppiRec.coinSel.coinPubs[i],
        });
      }
      const refresh = await createRefreshGroup(
        wex,
        tx,
        currency,
        coinPubs,
        RefreshReason.AbortPeerPushDebit,
        transactionId,
      );
      ppiRec.status = PeerPushDebitStatus.AbortingRefreshDeleted;
      ppiRec.abortRefreshGroupId = refresh.refreshGroupId;
      await tx.peerPushDebit.put(ppiRec);
      const newTxState = computePeerPushDebitTransactionState(ppiRec);
      return {
        oldTxState,
        newTxState,
      };
    },
  );
  notifyTransition(wex, transactionId, transitionInfo);

  return TaskRunResult.backoff();
}
/**
 * Description of an unconditional status change, applied only when the
 * record currently has the expected source status.
 */
interface SimpleTransition {
// Status the record must currently be in for the transition to apply.
stFrom: PeerPushDebitStatus;
// Status the record is moved to.
stTo: PeerPushDebitStatus;
}
// FIXME: This should be a transition on the peer push debit transaction context!
/**
 * Move the peer-push-debit record for `pursePub` from
 * `transitionSpec.stFrom` to `transitionSpec.stTo` and notify about the
 * transition.
 *
 * Nothing is written when the record does not exist or is not currently
 * in the `stFrom` status.
 *
 * @param wex wallet execution context
 * @param pursePub purse public key identifying the record
 * @param transitionSpec expected source status and target status
 */
async function transitionPeerPushDebitTransaction(
  wex: WalletExecutionContext,
  pursePub: string,
  transitionSpec: SimpleTransition,
): Promise<void> {
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.PeerPushDebit,
    pursePub,
  });
  const transitionInfo = await wex.db.runReadWriteTx(
    { storeNames: ["peerPushDebit"] },
    async (tx) => {
      const ppiRec = await tx.peerPushDebit.get(pursePub);
      if (!ppiRec || ppiRec.status !== transitionSpec.stFrom) {
        return undefined;
      }
      const oldTxState = computePeerPushDebitTransactionState(ppiRec);
      ppiRec.status = transitionSpec.stTo;
      await tx.peerPushDebit.put(ppiRec);
      const newTxState = computePeerPushDebitTransactionState(ppiRec);
      return {
        oldTxState,
        newTxState,
      };
    },
  );
  notifyTransition(wex, transactionId, transitionInfo);
}
/**
 * Process a peer-push-debit transaction in the AbortingRefreshDeleted
 * status.
 *
 * Waits for the abort refresh group to reach a final state and then moves
 * the record to Aborted (refresh finished) or Failed (refresh failed or
 * refresh group missing).
 *
 * @param wex wallet execution context
 * @param peerPushInitiation the record to process
 * @returns `finished` once the record was moved to a final status,
 *   `backoff` otherwise
 */
async function processPeerPushDebitAbortingRefreshDeleted(
  wex: WalletExecutionContext,
  peerPushInitiation: PeerPushDebitRecord,
): Promise<TaskRunResult> {
  const pursePub = peerPushInitiation.pursePub;
  const abortRefreshGroupId = peerPushInitiation.abortRefreshGroupId;
  checkLogicInvariant(!!abortRefreshGroupId);
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.PeerPushDebit,
    pursePub: peerPushInitiation.pursePub,
  });
  if (peerPushInitiation.abortRefreshGroupId) {
    await waitRefreshFinal(wex, peerPushInitiation.abortRefreshGroupId);
  }
  const transitionInfo = await wex.db.runReadWriteTx(
    { storeNames: ["refreshGroups", "peerPushDebit"] },
    async (tx) => {
      const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
      let newOpState: PeerPushDebitStatus | undefined;
      if (!refreshGroup) {
        // Maybe it got manually deleted? Means that we should
        // just go into failed.
        logger.warn("no aborting refresh group found for peer push debit");
        newOpState = PeerPushDebitStatus.Failed;
      } else if (
        refreshGroup.operationStatus === RefreshOperationStatus.Finished
      ) {
        newOpState = PeerPushDebitStatus.Aborted;
      } else if (
        refreshGroup.operationStatus === RefreshOperationStatus.Failed
      ) {
        newOpState = PeerPushDebitStatus.Failed;
      }
      if (newOpState) {
        const newDg = await tx.peerPushDebit.get(pursePub);
        if (!newDg) {
          return undefined;
        }
        const oldTxState = computePeerPushDebitTransactionState(newDg);
        newDg.status = newOpState;
        const newTxState = computePeerPushDebitTransactionState(newDg);
        await tx.peerPushDebit.put(newDg);
        return { oldTxState, newTxState };
      }
      return undefined;
    },
  );
  notifyTransition(wex, transactionId, transitionInfo);
  if (transitionInfo) {
    // The record is now Aborted or Failed; nothing is left to process.
    return TaskRunResult.finished();
  }
  return TaskRunResult.backoff();
}
/**
 * Process a peer-push-debit transaction in the AbortingRefreshExpired
 * status.
 *
 * Looks at the abort refresh group and moves the record to Expired
 * (refresh finished) or Failed (refresh failed or refresh group missing).
 * While the refresh group is in any other state the record is left alone.
 *
 * @param wex wallet execution context
 * @param peerPushInitiation the record to process
 * @returns `finished` once the record was moved to a final status,
 *   `backoff` otherwise
 */
async function processPeerPushDebitAbortingRefreshExpired(
  wex: WalletExecutionContext,
  peerPushInitiation: PeerPushDebitRecord,
): Promise<TaskRunResult> {
  const pursePub = peerPushInitiation.pursePub;
  const abortRefreshGroupId = peerPushInitiation.abortRefreshGroupId;
  checkLogicInvariant(!!abortRefreshGroupId);
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.PeerPushDebit,
    pursePub: peerPushInitiation.pursePub,
  });
  const transitionInfo = await wex.db.runReadWriteTx(
    { storeNames: ["peerPushDebit", "refreshGroups"] },
    async (tx) => {
      const refreshGroup = await tx.refreshGroups.get(abortRefreshGroupId);
      let newOpState: PeerPushDebitStatus | undefined;
      if (!refreshGroup) {
        // Maybe it got manually deleted? Means that we should
        // just go into failed.
        logger.warn("no aborting refresh group found for peer push debit");
        newOpState = PeerPushDebitStatus.Failed;
      } else if (
        refreshGroup.operationStatus === RefreshOperationStatus.Finished
      ) {
        newOpState = PeerPushDebitStatus.Expired;
      } else if (
        refreshGroup.operationStatus === RefreshOperationStatus.Failed
      ) {
        newOpState = PeerPushDebitStatus.Failed;
      }
      if (newOpState) {
        const newDg = await tx.peerPushDebit.get(pursePub);
        if (!newDg) {
          return undefined;
        }
        const oldTxState = computePeerPushDebitTransactionState(newDg);
        newDg.status = newOpState;
        const newTxState = computePeerPushDebitTransactionState(newDg);
        await tx.peerPushDebit.put(newDg);
        return { oldTxState, newTxState };
      }
      return undefined;
    },
  );
  notifyTransition(wex, transactionId, transitionInfo);
  if (transitionInfo) {
    // The record is now Expired or Failed; nothing is left to process.
    return TaskRunResult.finished();
  }
  return TaskRunResult.backoff();
}
/**
 * Process the "pending(ready)" state of a peer-push-debit transaction.
 *
 * Long-polls the purse's merge status at the exchange:
 * - OK with a merge timestamp: the record moves to Done.
 * - OK without a merge timestamp: nothing changes, back off.
 * - Gone: a refresh group is created for the selected coins (if any) and
 *   the record moves to AbortingRefreshExpired.
 * - Any other status is logged and treated as "still pending".
 *
 * @param wex wallet execution context
 * @param peerPushInitiation the record to process
 * @returns the task result for the scheduler
 */
async function processPeerPushDebitReady(
  wex: WalletExecutionContext,
  peerPushInitiation: PeerPushDebitRecord,
): Promise<TaskRunResult> {
  logger.trace("processing peer-push-debit pending(ready)");
  const pursePub = peerPushInitiation.pursePub;
  // This must be the transaction identifier (not the task identifier):
  // it is handed to createRefreshGroup and notifyTransition below.
  const transactionId = constructTransactionIdentifier({
    tag: TransactionType.PeerPushDebit,
    pursePub,
  });
  const mergeUrl = new URL(
    `purses/${pursePub}/merge`,
    peerPushInitiation.exchangeBaseUrl,
  );
  const resp = await wex.ws.runLongpollQueueing(
    wex,
    mergeUrl.hostname,
    async (timeoutMs) => {
      mergeUrl.searchParams.set("timeout_ms", `${timeoutMs}`);
      logger.info(`long-polling on purse status at ${mergeUrl.href}`);
      return await wex.http.fetch(mergeUrl.href, {
        // timeout: getReserveRequestTimeout(withdrawalGroup),
        cancellationToken: wex.cancellationToken,
      });
    },
  );
  if (resp.status === HttpStatusCode.Ok) {
    const purseStatus = await readSuccessResponseJsonOrThrow(
      resp,
      codecForExchangePurseStatus(),
    );
    const mergeTimestamp = purseStatus.merge_timestamp;
    logger.info(`got purse status ${j2s(purseStatus)}`);
    if (!mergeTimestamp || TalerProtocolTimestamp.isNever(mergeTimestamp)) {
      // Not merged yet.
      return TaskRunResult.backoff();
    } else {
      await transitionPeerPushDebitTransaction(
        wex,
        peerPushInitiation.pursePub,
        {
          stFrom: PeerPushDebitStatus.PendingReady,
          stTo: PeerPushDebitStatus.Done,
        },
      );
      return TaskRunResult.progress();
    }
  } else if (resp.status === HttpStatusCode.Gone) {
    logger.info(`purse ${pursePub} is gone, aborting peer-push-debit`);
    const transitionInfo = await wex.db.runReadWriteTx(
      {
        storeNames: [
          "coinAvailability",
          "coinHistory",
          "coins",
          "denominations",
          "peerPushDebit",
          "refreshGroups",
          "refreshSessions",
        ],
      },
      async (tx) => {
        const ppiRec = await tx.peerPushDebit.get(pursePub);
        if (!ppiRec) {
          return undefined;
        }
        if (ppiRec.status !== PeerPushDebitStatus.PendingReady) {
          return undefined;
        }
        const currency = Amounts.currencyOf(ppiRec.amount);
        const oldTxState = computePeerPushDebitTransactionState(ppiRec);
        const coinPubs: CoinRefreshRequest[] = [];
        if (ppiRec.coinSel) {
          for (let i = 0; i < ppiRec.coinSel.coinPubs.length; i++) {
            coinPubs.push({
              amount: ppiRec.coinSel.contributions[i],
              coinPub: ppiRec.coinSel.coinPubs[i],
            });
          }
          const refresh = await createRefreshGroup(
            wex,
            tx,
            currency,
            coinPubs,
            RefreshReason.AbortPeerPushDebit,
            transactionId,
          );
          ppiRec.abortRefreshGroupId = refresh.refreshGroupId;
        }
        // NOTE(review): without a coin selection no abortRefreshGroupId is
        // set, yet the AbortingRefreshExpired handler asserts that it is.
        ppiRec.status = PeerPushDebitStatus.AbortingRefreshExpired;
        await tx.peerPushDebit.put(ppiRec);
        const newTxState = computePeerPushDebitTransactionState(ppiRec);
        return {
          oldTxState,
          newTxState,
        };
      },
    );
    notifyTransition(wex, transactionId, transitionInfo);
    return TaskRunResult.backoff();
  } else {
    logger.warn(`unexpected HTTP status for purse: ${resp.status}`);
    return TaskRunResult.longpollReturnedPending();
  }
}
/**
 * Task entry point for a peer-push-debit transaction: load the record and
 * dispatch to the handler for its current status.
 *
 * Statuses without a handler (suspended and final ones) are logged and
 * the task finishes.
 *
 * @param wex wallet execution context
 * @param pursePub purse public key identifying the record
 * @returns the handler's task result, `networkRequired` when the network
 *   is unavailable, or `finished` when there is nothing to process
 * @throws Error when no record exists for `pursePub`
 */
export async function processPeerPushDebit(
  wex: WalletExecutionContext,
  pursePub: string,
): Promise<TaskRunResult> {
  if (!wex.ws.networkAvailable) {
    return TaskRunResult.networkRequired();
  }
  const peerPushInitiation = await wex.db.runReadOnlyTx(
    { storeNames: ["peerPushDebit"] },
    async (tx) => {
      return tx.peerPushDebit.get(pursePub);
    },
  );
  if (!peerPushInitiation) {
    throw Error("peer push payment not found");
  }

  switch (peerPushInitiation.status) {
    case PeerPushDebitStatus.PendingCreatePurse:
      return processPeerPushDebitCreateReserve(wex, peerPushInitiation);
    case PeerPushDebitStatus.PendingReady:
      return processPeerPushDebitReady(wex, peerPushInitiation);
    case PeerPushDebitStatus.AbortingDeletePurse:
      return processPeerPushDebitAbortingDeletePurse(wex, peerPushInitiation);
    case PeerPushDebitStatus.AbortingRefreshDeleted:
      return processPeerPushDebitAbortingRefreshDeleted(
        wex,
        peerPushInitiation,
      );
    case PeerPushDebitStatus.AbortingRefreshExpired:
      return processPeerPushDebitAbortingRefreshExpired(
        wex,
        peerPushInitiation,
      );
    default: {
      const txState = computePeerPushDebitTransactionState(peerPushInitiation);
      logger.warn(
        `not processing peer-push-debit transaction in state ${j2s(txState)}`,
      );
      return TaskRunResult.finished();
    }
  }
}
/**
 * Initiate sending a peer-to-peer push payment.
 *
 * Creates the purse, merge and contract key pairs, selects coins for the
 * requested amount and stores a new peer-push-debit record in the
 * PendingCreatePurse status together with the contract terms. When coin
 * selection fully succeeds, the coins are spent right away; with a
 * prospective selection the record is stored without a coin selection.
 * Finally the shepherd task for the transaction is started.
 *
 * @param wex wallet execution context
 * @param req request carrying the partial contract terms (amount and
 *   purse expiration)
 * @returns keys and identifiers of the newly created transaction
 * @throws TalerError with code
 *   WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE when coin selection fails
 */
export async function initiatePeerPushDebit(
  wex: WalletExecutionContext,
  req: InitiatePeerPushDebitRequest,
): Promise<InitiatePeerPushDebitResponse> {
  const instructedAmount = Amounts.parseOrThrow(
    req.partialContractTerms.amount,
  );
  const purseExpiration = req.partialContractTerms.purse_expiration;
  const contractTerms = req.partialContractTerms;

  const pursePair = await wex.cryptoApi.createEddsaKeypair({});
  const mergePair = await wex.cryptoApi.createEddsaKeypair({});
  const hContractTerms = ContractTermsUtil.hashContractTerms(contractTerms);
  const contractKeyPair = await wex.cryptoApi.createEddsaKeypair({});

  const pursePub = pursePair.pub;
  const ctx = new PeerPushDebitTransactionContext(wex, pursePub);
  const transactionId = ctx.transactionId;

  const contractEncNonce = encodeCrock(getRandomBytes(24));

  const res = await wex.db.runReadWriteTx(
    {
      storeNames: [
        "coinAvailability",
        "coinHistory",
        "coins",
        "contractTerms",
        "denominations",
        "exchangeDetails",
        "exchanges",
        "peerPushDebit",
        "refreshGroups",
        "refreshSessions",
      ],
    },
    async (tx) => {
      const coinSelRes = await selectPeerCoinsInTx(wex, tx, {
        instructedAmount,
      });

      let coins: SelectedProspectiveCoin[] | undefined = undefined;
      switch (coinSelRes.type) {
        case "failure":
          throw TalerError.fromDetail(
            TalerErrorCode.WALLET_PEER_PUSH_PAYMENT_INSUFFICIENT_BALANCE,
            {
              insufficientBalanceDetails: coinSelRes.insufficientBalanceDetails,
            },
          );
        case "prospective":
          coins = coinSelRes.result.prospectiveCoins;
          break;
        case "success":
          coins = coinSelRes.result.coins;
          break;
        default:
          assertUnreachable(coinSelRes);
      }

      const totalAmount = await getTotalPeerPaymentCostInTx(wex, tx, coins);

      const ppi: PeerPushDebitRecord = {
        amount: Amounts.stringify(instructedAmount),
        contractPriv: contractKeyPair.priv,
        contractPub: contractKeyPair.pub,
        contractTermsHash: hContractTerms,
        exchangeBaseUrl: coinSelRes.result.exchangeBaseUrl,
        mergePriv: mergePair.priv,
        mergePub: mergePair.pub,
        purseExpiration: timestampProtocolToDb(purseExpiration),
        pursePriv: pursePair.priv,
        pursePub: pursePair.pub,
        timestampCreated: timestampPreciseToDb(TalerPreciseTimestamp.now()),
        status: PeerPushDebitStatus.PendingCreatePurse,
        contractEncNonce,
        totalCost: Amounts.stringify(totalAmount),
      };

      if (coinSelRes.type === "success") {
        ppi.coinSel = {
          coinPubs: coinSelRes.result.coins.map((x) => x.coinPub),
          contributions: coinSelRes.result.coins.map((x) => x.contribution),
        };
        // FIXME: Instead of directly doing a spendCoin here,
        // we might want to mark the coins as used and spend them
        // after we've been able to create the purse.
        await spendCoins(wex, tx, {
          transactionId: ctx.transactionId,
          coinPubs: coinSelRes.result.coins.map((x) => x.coinPub),
          contributions: coinSelRes.result.coins.map((x) =>
            Amounts.parseOrThrow(x.contribution),
          ),
          refreshReason: RefreshReason.PayPeerPush,
        });
      }

      await tx.peerPushDebit.add(ppi);

      await tx.contractTerms.put({
        h: hContractTerms,
        contractTermsRaw: contractTerms,
      });

      const newTxState = computePeerPushDebitTransactionState(ppi);
      return {
        transitionInfo: {
          oldTxState: { major: TransactionMajorState.None },
          newTxState,
        },
        exchangeBaseUrl: coinSelRes.result.exchangeBaseUrl,
      };
    },
  );
  notifyTransition(wex, transactionId, res.transitionInfo);
  wex.ws.notify({
    type: NotificationType.BalanceChange,
    hintTransactionId: transactionId,
  });

  wex.taskScheduler.startShepherdTask(ctx.taskId);

  return {
    contractPriv: contractKeyPair.priv,
    mergePriv: mergePair.priv,
    pursePub: pursePair.pub,
    exchangeBaseUrl: res.exchangeBaseUrl,
    // Same identifier the context derived from the purse public key.
    transactionId,
  };
}
/**
 * List the transaction actions that are offered for a peer-push-debit
 * record in its current status.
 *
 * Only `ppiRecord.status` is consulted.  Each call returns a newly
 * created array.
 *
 * @param ppiRecord peer-push-debit record whose status is inspected
 * @returns the actions applicable to the record's status
 */
export function computePeerPushDebitTransactionActions(
  ppiRecord: PeerPushDebitRecord,
): TransactionAction[] {
  switch (ppiRecord.status) {
    // Pending: may be retried, aborted or suspended.
    case PeerPushDebitStatus.PendingCreatePurse:
    case PeerPushDebitStatus.PendingReady:
      return [
        TransactionAction.Retry,
        TransactionAction.Abort,
        TransactionAction.Suspend,
      ];

    // Aborting: may be retried, suspended or forced into failure.
    case PeerPushDebitStatus.AbortingDeletePurse:
    case PeerPushDebitStatus.AbortingRefreshDeleted:
    case PeerPushDebitStatus.AbortingRefreshExpired:
      return [
        TransactionAction.Retry,
        TransactionAction.Suspend,
        TransactionAction.Fail,
      ];

    // Suspended while aborting: may be resumed or forced into failure.
    case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
    case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted:
    case PeerPushDebitStatus.SuspendedAbortingRefreshExpired:
      return [TransactionAction.Resume, TransactionAction.Fail];

    // Suspended while pending: may be resumed or aborted.
    case PeerPushDebitStatus.SuspendedCreatePurse:
    case PeerPushDebitStatus.SuspendedReady:
      return [TransactionAction.Resume, TransactionAction.Abort];

    // Final states: the only action offered is deletion.
    case PeerPushDebitStatus.Aborted:
    case PeerPushDebitStatus.Done:
    case PeerPushDebitStatus.Expired:
    case PeerPushDebitStatus.Failed:
      return [TransactionAction.Delete];
  }
}
/**
 * Translate the status stored in a peer-push-debit record into the
 * major/minor transaction state representation.
 *
 * Only `ppiRecord.status` is consulted.  The aborted, done, failed and
 * expired states carry no minor state; for those, the returned object
 * has no `minor` property at all.
 *
 * @param ppiRecord peer-push-debit record whose status is translated
 * @returns the transaction state corresponding to the record's status
 */
export function computePeerPushDebitTransactionState(
  ppiRecord: PeerPushDebitRecord,
): TransactionState {
  // Short local aliases keep each mapping on a single line.
  const Major = TransactionMajorState;
  const Minor = TransactionMinorState;

  switch (ppiRecord.status) {
    // Pending
    case PeerPushDebitStatus.PendingCreatePurse:
      return { major: Major.Pending, minor: Minor.CreatePurse };
    case PeerPushDebitStatus.PendingReady:
      return { major: Major.Pending, minor: Minor.Ready };

    // Suspended
    case PeerPushDebitStatus.SuspendedCreatePurse:
      return { major: Major.Suspended, minor: Minor.CreatePurse };
    case PeerPushDebitStatus.SuspendedReady:
      return { major: Major.Suspended, minor: Minor.Ready };

    // Aborting
    case PeerPushDebitStatus.AbortingDeletePurse:
      return { major: Major.Aborting, minor: Minor.DeletePurse };
    case PeerPushDebitStatus.AbortingRefreshDeleted:
      return { major: Major.Aborting, minor: Minor.Refresh };
    case PeerPushDebitStatus.AbortingRefreshExpired:
      return { major: Major.Aborting, minor: Minor.RefreshExpired };

    // Suspended while aborting
    case PeerPushDebitStatus.SuspendedAbortingDeletePurse:
      return { major: Major.SuspendedAborting, minor: Minor.DeletePurse };
    case PeerPushDebitStatus.SuspendedAbortingRefreshDeleted:
      return { major: Major.SuspendedAborting, minor: Minor.Refresh };
    case PeerPushDebitStatus.SuspendedAbortingRefreshExpired:
      return { major: Major.SuspendedAborting, minor: Minor.RefreshExpired };

    // States without a minor state
    case PeerPushDebitStatus.Aborted:
      return { major: Major.Aborted };
    case PeerPushDebitStatus.Done:
      return { major: Major.Done };
    case PeerPushDebitStatus.Failed:
      return { major: Major.Failed };
    case PeerPushDebitStatus.Expired:
      return { major: Major.Expired };
  }
}