From 1e378e4499906e466e933e40464727fb1c1cbf5e Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Thu, 12 Jan 2023 16:57:51 +0100 Subject: wallet-core: retries for peer pull payments --- .../src/crypto/cryptoImplementation.ts | 20 +-- .../taler-wallet-core/src/crypto/cryptoTypes.ts | 5 +- packages/taler-wallet-core/src/db.ts | 12 ++ .../taler-wallet-core/src/operations/pay-peer.ts | 197 ++++++++++++++------- .../taler-wallet-core/src/operations/pending.ts | 57 ++++++ packages/taler-wallet-core/src/pending-types.ts | 18 +- packages/taler-wallet-core/src/util/retries.ts | 13 +- packages/taler-wallet-core/src/wallet.ts | 13 +- 8 files changed, 249 insertions(+), 86 deletions(-) diff --git a/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts b/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts index c86a732d8..316755840 100644 --- a/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts +++ b/packages/taler-wallet-core/src/crypto/cryptoImplementation.ts @@ -60,6 +60,7 @@ import { hashCoinPub, hashDenomPub, hashTruncate32, + j2s, kdf, kdfKw, keyExchangeEcdhEddsa, @@ -447,11 +448,11 @@ export interface SignPurseCreationRequest { export interface SpendCoinDetails { coinPub: string; - coinPriv: string; - contribution: AmountString; - denomPubHash: string; - denomSig: UnblindedSignature; - ageCommitmentProof: AgeCommitmentProof | undefined; + coinPriv: string; + contribution: AmountString; + denomPubHash: string; + denomSig: UnblindedSignature; + ageCommitmentProof: AgeCommitmentProof | undefined; } export interface SignPurseDepositsRequest { @@ -1453,7 +1454,6 @@ export const nativeCryptoR: TalerCryptoInterfaceR = { tci: TalerCryptoInterfaceR, req: EncryptContractRequest, ): Promise { - const enc = await encryptContractForMerge( decodeCrock(req.pursePub), decodeCrock(req.contractPriv), @@ -1491,24 +1491,22 @@ export const nativeCryptoR: TalerCryptoInterfaceR = { tci: TalerCryptoInterfaceR, req: EncryptContractForDepositRequest, ): Promise { - const contractKeyPair = await this.createEddsaKeypair(tci, {}); const enc = await encryptContractForDeposit( decodeCrock(req.pursePub), - decodeCrock(contractKeyPair.priv), + decodeCrock(req.contractPriv), req.contractTerms, ); const sigBlob = buildSigPS(TalerSignaturePurpose.WALLET_PURSE_ECONTRACT) .put(hash(enc)) - .put(decodeCrock(contractKeyPair.pub)) + .put(decodeCrock(req.contractPub)) .build(); const sig = eddsaSign(sigBlob, decodeCrock(req.pursePriv)); return { econtract: { - contract_pub: contractKeyPair.pub, + contract_pub: req.contractPub, econtract: encodeCrock(enc), econtract_sig: encodeCrock(sig), }, - contractPriv: contractKeyPair.priv, }; }, async decryptContractForDeposit( diff --git a/packages/taler-wallet-core/src/crypto/cryptoTypes.ts b/packages/taler-wallet-core/src/crypto/cryptoTypes.ts index ea58b2820..98f3c935b 100644 --- a/packages/taler-wallet-core/src/crypto/cryptoTypes.ts +++ b/packages/taler-wallet-core/src/crypto/cryptoTypes.ts @@ -190,14 +190,15 @@ export interface EncryptContractResponse { export interface EncryptContractForDepositRequest { contractTerms: any; + contractPriv: string; + contractPub: string; + pursePub: string; pursePriv: string; } export interface EncryptContractForDepositResponse { econtract: EncryptedContract; - - contractPriv: string; } export interface DecryptContractRequest { diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts index 5d1075c83..d929fd123 100644 --- a/packages/taler-wallet-core/src/db.ts +++ b/packages/taler-wallet-core/src/db.ts @@ -1780,6 +1780,18 @@ export interface PeerPullPaymentInitiationRecord { */ contractTermsHash: string; + mergePub: string; + mergePriv: string; + + contractPub: string; + contractPriv: string; + + contractTerms: PeerContractTerms; + + mergeTimestamp: TalerProtocolTimestamp; + + mergeReserveRowId: number; + /** * Status of the peer pull payment initiation. */ diff --git a/packages/taler-wallet-core/src/operations/pay-peer.ts b/packages/taler-wallet-core/src/operations/pay-peer.ts index 670b547ae..68b8eb741 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer.ts @@ -340,7 +340,7 @@ export async function preparePeerPushPayment( }; } -export async function processPeerPushOutgoing( +export async function processPeerPushInitiation( ws: InternalWalletState, pursePub: string, ): Promise { @@ -417,6 +417,7 @@ export async function processPeerPushOutgoing( return; } ppi.status = PeerPushPaymentInitiationStatus.PurseCreated; + await tx.peerPushPaymentInitiations.put(ppi); }); return { @@ -428,7 +429,7 @@ export async function processPeerPushOutgoing( /** * Initiate sending a peer-to-peer push payment. */ -export async function initiatePeerToPeerPush( +export async function initiatePeerPushPayment( ws: InternalWalletState, req: InitiatePeerPushPaymentRequest, ): Promise { @@ -513,7 +514,7 @@ export async function initiatePeerToPeerPush( ws, RetryTags.byPeerPushPaymentInitiationPursePub(pursePair.pub), async () => { - return await processPeerPushOutgoing(ws, pursePair.pub); + return await processPeerPushInitiation(ws, pursePair.pub); }, ); @@ -935,6 +936,115 @@ export async function checkPeerPullPayment( }; } +export async function processPeerPullInitiation( + ws: InternalWalletState, + pursePub: string, +): Promise { + const pullIni = await ws.db + .mktx((x) => [x.peerPullPaymentInitiations]) + .runReadOnly(async (tx) => { + return tx.peerPullPaymentInitiations.get(pursePub); + }); + if (!pullIni) { + throw Error("peer pull payment initiation not found in database"); + } + + if (pullIni.status === OperationStatus.Finished) { + logger.warn("peer pull payment initiation is already finished"); + return { + type: OperationAttemptResultType.Finished, + result: undefined, + } + } + + const mergeReserve = await ws.db + .mktx((x) => [x.reserves]) + .runReadOnly(async (tx) => { + return tx.reserves.get(pullIni.mergeReserveRowId); + }); + + if (!mergeReserve) { + throw Error("merge reserve for peer pull payment not found in database"); + } + + const purseFee = Amounts.stringify(Amounts.zeroOfAmount(pullIni.amount)); + + const reservePayto = talerPaytoFromExchangeReserve( + pullIni.exchangeBaseUrl, + mergeReserve.reservePub, + ); + + const econtractResp = await ws.cryptoApi.encryptContractForDeposit({ + contractPriv: pullIni.contractPriv, + contractPub: pullIni.contractPub, + contractTerms: pullIni.contractTerms, + pursePriv: pullIni.pursePriv, + pursePub: pullIni.pursePub, + }); + + const purseExpiration = pullIni.contractTerms.purse_expiration; + const sigRes = await ws.cryptoApi.signReservePurseCreate({ + contractTermsHash: pullIni.contractTermsHash, + flags: WalletAccountMergeFlags.CreateWithPurseFee, + mergePriv: pullIni.mergePriv, + mergeTimestamp: pullIni.mergeTimestamp, + purseAmount: pullIni.contractTerms.amount, + purseExpiration: purseExpiration, + purseFee: purseFee, + pursePriv: pullIni.pursePriv, + pursePub: pullIni.pursePub, + reservePayto, + reservePriv: mergeReserve.reservePriv, + }); + + const reservePurseReqBody: ExchangeReservePurseRequest = { + merge_sig: sigRes.mergeSig, + merge_timestamp: pullIni.mergeTimestamp, + h_contract_terms: pullIni.contractTermsHash, + merge_pub: pullIni.mergePub, + min_age: 0, + purse_expiration: purseExpiration, + purse_fee: purseFee, + purse_pub: pullIni.pursePub, + purse_sig: sigRes.purseSig, + purse_value: pullIni.contractTerms.amount, + reserve_sig: sigRes.accountSig, + econtract: econtractResp.econtract, + }; + + logger.info(`reserve purse request: ${j2s(reservePurseReqBody)}`); + + const reservePurseMergeUrl = new URL( + `reserves/${mergeReserve.reservePub}/purse`, + pullIni.exchangeBaseUrl, + ); + + const httpResp = await ws.http.postJson( + reservePurseMergeUrl.href, + reservePurseReqBody, + ); + + const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny()); + + logger.info(`reserve merge response: ${j2s(resp)}`); + + await ws.db + .mktx((x) => [x.peerPullPaymentInitiations]) + .runReadWrite(async (tx) => { + const pi2 = await tx.peerPullPaymentInitiations.get(pursePub); + if (!pi2) { + return; + } + pi2.status = OperationStatus.Finished; + await tx.peerPullPaymentInitiations.put(pi2); + }); + + return { + type: OperationAttemptResultType.Finished, + result: undefined, + }; +} + export async function preparePeerPullPayment( ws: InternalWalletState, req: PreparePeerPullPaymentRequest, @@ -967,39 +1077,14 @@ export async function initiatePeerPullPayment( const instructedAmount = Amounts.parseOrThrow( req.partialContractTerms.amount, ); - const purseExpiration = req.partialContractTerms.purse_expiration; const contractTerms = req.partialContractTerms; - const reservePayto = talerPaytoFromExchangeReserve( - req.exchangeBaseUrl, - mergeReserveInfo.reservePub, - ); - - const econtractResp = await ws.cryptoApi.encryptContractForDeposit({ - contractTerms, - pursePriv: pursePair.priv, - pursePub: pursePair.pub, - }); - const hContractTerms = ContractTermsUtil.hashContractTerms(contractTerms); - const purseFee = Amounts.stringify( - Amounts.zeroOfCurrency(instructedAmount.currency), - ); + const contractKeyPair = await ws.cryptoApi.createEddsaKeypair({}); - const sigRes = await ws.cryptoApi.signReservePurseCreate({ - contractTermsHash: hContractTerms, - flags: WalletAccountMergeFlags.CreateWithPurseFee, - mergePriv: mergePair.priv, - mergeTimestamp: mergeTimestamp, - purseAmount: req.partialContractTerms.amount, - purseExpiration: purseExpiration, - purseFee: purseFee, - pursePriv: pursePair.priv, - pursePub: pursePair.pub, - reservePayto, - reservePriv: mergeReserveInfo.reservePriv, - }); + const mergeReserveRowId = mergeReserveInfo.rowId; + checkDbInvariant(!!mergeReserveRowId); await ws.db .mktx((x) => [x.peerPullPaymentInitiations, x.contractTerms]) @@ -1010,7 +1095,14 @@ export async function initiatePeerPullPayment( exchangeBaseUrl: req.exchangeBaseUrl, pursePriv: pursePair.priv, pursePub: pursePair.pub, - status: OperationStatus.Finished, + mergePriv: mergePair.priv, + mergePub: mergePair.pub, + status: OperationStatus.Pending, + contractTerms: contractTerms, + mergeTimestamp, + mergeReserveRowId: mergeReserveRowId, + contractPriv: contractKeyPair.priv, + contractPub: contractKeyPair.pub, }); await tx.contractTerms.put({ contractTermsRaw: contractTerms, @@ -1018,43 +1110,24 @@ export async function initiatePeerPullPayment( }); }); - const reservePurseReqBody: ExchangeReservePurseRequest = { - merge_sig: sigRes.mergeSig, - merge_timestamp: mergeTimestamp, - h_contract_terms: hContractTerms, - merge_pub: mergePair.pub, - min_age: 0, - purse_expiration: purseExpiration, - purse_fee: purseFee, - purse_pub: pursePair.pub, - purse_sig: sigRes.purseSig, - purse_value: req.partialContractTerms.amount, - reserve_sig: sigRes.accountSig, - econtract: econtractResp.econtract, - }; - - logger.info(`reserve purse request: ${j2s(reservePurseReqBody)}`); - - const reservePurseMergeUrl = new URL( - `reserves/${mergeReserveInfo.reservePub}/purse`, - req.exchangeBaseUrl, - ); + // FIXME: Should we somehow signal to the client + // whether purse creation has failed, or does the client/ + // check this asynchronously from the transaction status? - const httpResp = await ws.http.postJson( - reservePurseMergeUrl.href, - reservePurseReqBody, + await runOperationWithErrorReporting( + ws, + RetryTags.byPeerPullPaymentInitiationPursePub(pursePair.pub), + async () => { + return processPeerPullInitiation(ws, pursePair.pub); + }, ); - const resp = await readSuccessResponseJsonOrThrow(httpResp, codecForAny()); - - logger.info(`reserve merge response: ${j2s(resp)}`); - const wg = await internalCreateWithdrawalGroup(ws, { amount: instructedAmount, wgInfo: { withdrawalType: WithdrawalRecordType.PeerPullCredit, contractTerms, - contractPriv: econtractResp.contractPriv, + contractPriv: contractKeyPair.priv, }, exchangeBaseUrl: req.exchangeBaseUrl, reserveStatus: WithdrawalGroupStatus.QueryingStatus, @@ -1067,7 +1140,7 @@ export async function initiatePeerPullPayment( return { talerUri: constructPayPullUri({ exchangeBaseUrl: req.exchangeBaseUrl, - contractPriv: econtractResp.contractPriv, + contractPriv: contractKeyPair.priv, }), transactionId: makeTransactionId( TransactionType.PeerPullCredit, diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts index d2066d4fc..d9d62ec65 100644 --- a/packages/taler-wallet-core/src/operations/pending.ts +++ b/packages/taler-wallet-core/src/operations/pending.ts @@ -28,6 +28,7 @@ import { RefreshCoinStatus, OperationStatus, OperationStatusRange, + PeerPushPaymentInitiationStatus, } from "../db.js"; import { PendingOperationsResponse, @@ -341,6 +342,58 @@ async function gatherBackupPending( }); } +async function gatherPeerPullInitiationPending( + ws: InternalWalletState, + tx: GetReadOnlyAccess<{ + peerPullPaymentInitiations: typeof WalletStoresV1.peerPullPaymentInitiations; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, + now: AbsoluteTime, + resp: PendingOperationsResponse, +): Promise { + await tx.peerPullPaymentInitiations.iter().forEachAsync(async (pi) => { + if (pi.status === OperationStatus.Finished) { + return; + } + const opId = RetryTags.forPeerPullPaymentInitiation(pi); + const retryRecord = await tx.operationRetries.get(opId); + const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); + resp.pendingOperations.push({ + type: PendingTaskType.PeerPullInitiation, + ...getPendingCommon(ws, opId, timestampDue), + givesLifeness: true, + retryInfo: retryRecord?.retryInfo, + pursePub: pi.pursePub, + }); + }); +} + +async function gatherPeerPushInitiationPending( + ws: InternalWalletState, + tx: GetReadOnlyAccess<{ + peerPushPaymentInitiations: typeof WalletStoresV1.peerPushPaymentInitiations; + operationRetries: typeof WalletStoresV1.operationRetries; + }>, + now: AbsoluteTime, + resp: PendingOperationsResponse, +): Promise { + await tx.peerPushPaymentInitiations.iter().forEachAsync(async (pi) => { + if (pi.status === PeerPushPaymentInitiationStatus.PurseCreated) { + return; + } + const opId = RetryTags.forPeerPushPaymentInitiation(pi); + const retryRecord = await tx.operationRetries.get(opId); + const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); + resp.pendingOperations.push({ + type: PendingTaskType.PeerPushInitiation, + ...getPendingCommon(ws, opId, timestampDue), + givesLifeness: true, + retryInfo: retryRecord?.retryInfo, + pursePub: pi.pursePub, + }); + }); +} + export async function getPendingOperations( ws: InternalWalletState, ): Promise { @@ -359,6 +412,8 @@ export async function getPendingOperations( x.depositGroups, x.recoupGroups, x.operationRetries, + x.peerPullPaymentInitiations, + x.peerPushPaymentInitiations, ]) .runReadWrite(async (tx) => { const resp: PendingOperationsResponse = { @@ -372,6 +427,8 @@ export async function getPendingOperations( await gatherPurchasePending(ws, tx, now, resp); await gatherRecoupPending(ws, tx, now, resp); await gatherBackupPending(ws, tx, now, resp); + await gatherPeerPushInitiationPending(ws, tx, now, resp); + await gatherPeerPullInitiationPending(ws, tx, now, resp); return resp; }); } diff --git a/packages/taler-wallet-core/src/pending-types.ts b/packages/taler-wallet-core/src/pending-types.ts index 65b72de04..65657f471 100644 --- a/packages/taler-wallet-core/src/pending-types.ts +++ b/packages/taler-wallet-core/src/pending-types.ts @@ -37,7 +37,8 @@ export enum PendingTaskType { Withdraw = "withdraw", Deposit = "deposit", Backup = "backup", - PeerPushOutgoing = "peer-push-outgoing", + PeerPushInitiation = "peer-push-initiation", + PeerPullInitiation = "peer-pull\-initiation", } /** @@ -54,7 +55,8 @@ export type PendingTaskInfo = PendingTaskInfoCommon & | PendingRecoupTask | PendingDepositTask | PendingBackupTask - | PendingPeerPushOutgoingTask + | PendingPeerPushInitiationTask + | PendingPeerPullInitiationTask ); export interface PendingBackupTask { @@ -75,8 +77,16 @@ export interface PendingExchangeUpdateTask { /** * The wallet wants to send a peer push payment. */ -export interface PendingPeerPushOutgoingTask { - type: PendingTaskType.PeerPushOutgoing; +export interface PendingPeerPushInitiationTask { + type: PendingTaskType.PeerPushInitiation; + pursePub: string; +} + +/** + * The wallet wants to send a peer pull payment. + */ +export interface PendingPeerPullInitiationTask { + type: PendingTaskType.PeerPullInitiation; pursePub: string; } diff --git a/packages/taler-wallet-core/src/util/retries.ts b/packages/taler-wallet-core/src/util/retries.ts index 300875db7..d72d3adaa 100644 --- a/packages/taler-wallet-core/src/util/retries.ts +++ b/packages/taler-wallet-core/src/util/retries.ts @@ -30,6 +30,7 @@ import { BackupProviderRecord, DepositGroupRecord, ExchangeRecord, + PeerPullPaymentInitiationRecord, PeerPushPaymentInitiationRecord, PurchaseRecord, RecoupGroupRecord, @@ -204,13 +205,21 @@ export namespace RetryTags { export function forPeerPushPaymentInitiation( ppi: PeerPushPaymentInitiationRecord, ): string { - return `${PendingTaskType.PeerPushOutgoing}:${ppi.pursePub}`; + return `${PendingTaskType.PeerPushInitiation}:${ppi.pursePub}`; + } + export function forPeerPullPaymentInitiation( + ppi: PeerPullPaymentInitiationRecord, + ): string { + return `${PendingTaskType.PeerPullInitiation}:${ppi.pursePub}`; } export function byPaymentProposalId(proposalId: string): string { return `${PendingTaskType.Purchase}:${proposalId}`; } export function byPeerPushPaymentInitiationPursePub(pursePub: string): string { - return `${PendingTaskType.PeerPushOutgoing}:${pursePub}`; + return `${PendingTaskType.PeerPushInitiation}:${pursePub}`; + } + export function byPeerPullPaymentInitiationPursePub(pursePub: string): string { + return `${PendingTaskType.PeerPullInitiation}:${pursePub}`; } } diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 73b86c8c6..a5c092b59 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -195,10 +195,11 @@ import { checkPeerPullPayment, checkPeerPushPayment, initiatePeerPullPayment, - initiatePeerToPeerPush, + initiatePeerPushPayment, preparePeerPullPayment, preparePeerPushPayment, - processPeerPushOutgoing, + processPeerPullInitiation, + processPeerPushInitiation, } from "./operations/pay-peer.js"; import { getPendingOperations } from "./operations/pending.js"; import { @@ -318,8 +319,10 @@ async function callOperationHandler( } case PendingTaskType.Backup: return await processBackupForProvider(ws, pending.backupProviderBaseUrl); - case PendingTaskType.PeerPushOutgoing: - return await processPeerPushOutgoing(ws, pending.pursePub); + case PendingTaskType.PeerPushInitiation: + return await processPeerPushInitiation(ws, pending.pursePub); + case PendingTaskType.PeerPullInitiation: + return await processPeerPullInitiation(ws, pending.pursePub); default: return assertUnreachable(pending); } @@ -1381,7 +1384,7 @@ async function dispatchRequestInternal( } case WalletApiOperation.InitiatePeerPushPayment: { const req = codecForInitiatePeerPushPaymentRequest().decode(payload); - return await initiatePeerToPeerPush(ws, req); + return await initiatePeerPushPayment(ws, req); } case WalletApiOperation.CheckPeerPushPayment: { const req = codecForCheckPeerPushPaymentRequest().decode(payload); -- cgit v1.2.3