diff options
Diffstat (limited to 'packages/taler-wallet-core/src')
-rw-r--r-- | packages/taler-wallet-core/src/pay-merchant.ts | 38 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/shepherd.ts | 171 |
2 files changed, 101 insertions, 108 deletions
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts index 8eff7e17b..be3f7f106 100644 --- a/packages/taler-wallet-core/src/pay-merchant.ts +++ b/packages/taler-wallet-core/src/pay-merchant.ts @@ -880,7 +880,11 @@ async function createOrReusePurchase( ); if (oldProposal.purchaseStatus === PurchaseStatus.DialogShared) { const download = await expectProposalDownload(wex, oldProposal); - const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData, false); + const paid = await checkIfOrderIsAlreadyPaid( + wex, + download.contractData, + false, + ); logger.info(`old proposal paid: ${paid}`); if (paid) { // if this transaction was shared and the order is paid then it @@ -1912,6 +1916,11 @@ export async function confirmPay( hintTransactionId: transactionId, }); + const ctx = new PayMerchantTransactionContext(wex, proposalId); + + // In case we're sharing the payment and we're long-polling + wex.taskScheduler.stopShepherdTask(ctx.taskId); + // Wait until we have completed the first attempt to pay. return waitPaymentResult(wex, proposalId); } @@ -2011,7 +2020,11 @@ async function processPurchasePay( const download = await expectProposalDownload(wex, purchase); if (purchase.shared) { - const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData, false); + const paid = await checkIfOrderIsAlreadyPaid( + wex, + download.contractData, + false, + ); if (paid) { const transitionInfo = await wex.db.runReadWriteTx( @@ -2463,17 +2476,24 @@ export async function sharePayment( // FIXME: purchase can be shared before being paid return undefined; } + const oldTxState = computePayMerchantTransactionState(p); if (p.purchaseStatus === PurchaseStatus.DialogProposed) { p.purchaseStatus = PurchaseStatus.DialogShared; p.shared = true; tx.purchases.put(p); } + const newTxState = computePayMerchantTransactionState(p); + return { proposalId: p.proposalId, nonce: p.noncePriv, session: p.lastSessionId ?? p.downloadSessionId, token: p.claimToken, + transitionInfo: { + oldTxState, + newTxState, + }, }; }); @@ -2481,8 +2501,11 @@ export async function sharePayment( throw Error("This purchase can't be shared"); } - // schedule a task to watch for the status const ctx = new PayMerchantTransactionContext(wex, result.proposalId); + + notifyTransition(wex, ctx.transactionId, result.transitionInfo); + + // schedule a task to watch for the status wex.taskScheduler.startShepherdTask(ctx.taskId); const privatePayUri = stringifyPayUri({ @@ -2514,6 +2537,7 @@ async function checkIfOrderIsAlreadyPaid( const resp = await wex.http.fetch(requestUrl.href, { cancellationToken: wex.cancellationToken, }); + if ( resp.status === HttpStatusCode.Ok || resp.status === HttpStatusCode.Accepted || @@ -2539,7 +2563,11 @@ async function processPurchaseDialogShared( return TaskRunResult.finished(); } - const paid = await checkIfOrderIsAlreadyPaid(wex, download.contractData, true); + const paid = await checkIfOrderIsAlreadyPaid( + wex, + download.contractData, + true, + ); if (paid) { const transitionInfo = await wex.db.runReadWriteTx( ["purchases"], @@ -2551,7 +2579,7 @@ async function processPurchaseDialogShared( } const oldTxState = computePayMerchantTransactionState(p); p.purchaseStatus = PurchaseStatus.FailedClaim; - p.paidByOther = true + p.paidByOther = true; const newTxState = computePayMerchantTransactionState(p); await tx.purchases.put(p); return { oldTxState, newTxState }; diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index db090c352..0544288ba 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -28,7 +28,6 @@ import { ObservabilityContext, ObservabilityEventType, RetryLoopOpts, - TalerError, TalerErrorCode, TalerErrorDetail, TaskThrottler, @@ -37,6 +36,7 @@ import { TransactionType, WalletNotification, assertUnreachable, + getErrorDetailFromException, j2s, makeErrorDetail, } from "@gnu-taler/taler-util"; @@ -55,6 +55,7 @@ import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js"; import { OPERATION_STATUS_ACTIVE_FIRST, OPERATION_STATUS_ACTIVE_LAST, + OperationRetryRecord, WalletDbAllStoresReadOnlyTransaction, WalletDbReadOnlyTransaction, timestampAbsoluteFromDb, @@ -173,7 +174,7 @@ export class TaskSchedulerImpl implements TaskScheduler { } getActiveTasks(): TaskIdStr[] { - return [...this.sheps.keys()] + return [...this.sheps.keys()]; } ensureRunning(): void { @@ -343,15 +344,21 @@ export class TaskSchedulerImpl implements TaskScheduler { const startTime = AbsoluteTime.now(); logger.trace(`Shepherd for ${taskId} will call handler`); // FIXME: This should already return the retry record. - const res = await runTaskWithErrorReporting(this.ws, taskId, async () => { - return await callOperationHandlerForTaskId(wex, taskId); - }); - const retryRecord = await this.ws.db.runReadOnlyTx( - ["operationRetries"], - async (tx) => { - return tx.operationRetries.get(taskId); + const res = await runTaskWithErrorReporting( + this.ws, + taskId, + info, + async () => { + return await callOperationHandlerForTaskId(wex, taskId); }, ); + if (info.cts.token.isCancelled) { + logger.info("task cancelled, not processing result"); + return; + } + if (this.ws.stopped) { + logger.info("wallet stopped, not processing result"); + } wex.oc.observe({ type: ObservabilityEventType.ShepherdTaskResult, resultType: res.type, @@ -359,46 +366,48 @@ export class TaskSchedulerImpl implements TaskScheduler { switch (res.type) { case TaskRunResultType.Error: { logger.trace(`Shepherd for ${taskId} got error result.`); - if (retryRecord) { - let delay: Duration; - const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); - delay = AbsoluteTime.remaining(t); - logger.trace(`Waiting for ${delay.d_ms} ms`); - await this.wait(taskId, info, delay); - } else { - logger.trace("Retrying immediately."); - } + const retryRecord = await storePendingTaskError( + this.ws, + taskId, + res.errorDetail, + ); + let delay: Duration; + const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); + delay = AbsoluteTime.remaining(t); + logger.trace(`Waiting for ${delay.d_ms} ms`); + await this.wait(taskId, info, delay); break; } case TaskRunResultType.Backoff: { logger.trace(`Shepherd for ${taskId} got backoff result.`); - if (retryRecord) { - let delay: Duration; - const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); - delay = AbsoluteTime.remaining(t); - logger.trace(`Waiting for ${delay.d_ms} ms`); - await this.wait(taskId, info, delay); - } else { - logger.trace("Retrying immediately."); - } + const retryRecord = await storePendingTaskPending(this.ws, taskId); + let delay: Duration; + const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); + delay = AbsoluteTime.remaining(t); + logger.trace(`Waiting for ${delay.d_ms} ms`); + await this.wait(taskId, info, delay); break; } case TaskRunResultType.Progress: { logger.trace( `Shepherd for ${taskId} got progress result, re-running immediately.`, ); + await storeTaskProgress(this.ws, taskId); break; } case TaskRunResultType.ScheduleLater: logger.trace(`Shepherd for ${taskId} got schedule-later result.`); + await storeTaskProgress(this.ws, taskId); const delay = AbsoluteTime.remaining(res.runAt); logger.trace(`Waiting for ${delay.d_ms} ms`); await this.wait(taskId, info, delay); break; case TaskRunResultType.Finished: logger.trace(`Shepherd for ${taskId} got finished result.`); + await storePendingTaskFinished(this.ws, taskId); return; case TaskRunResultType.LongpollReturnedPending: { + await storeTaskProgress(this.ws, taskId); // Make sure that we are waiting a bit if long-polling returned too early. const endTime = AbsoluteTime.now(); const taskDuration = AbsoluteTime.difference(endTime, startTime); @@ -425,9 +434,9 @@ async function storePendingTaskError( ws: InternalWalletState, pendingTaskId: string, e: TalerErrorDetail, -): Promise<void> { +): Promise<OperationRetryRecord> { logger.info(`storing pending task error for ${pendingTaskId}`); - const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => { + const res = await ws.db.runAllStoresReadWriteTx(async (tx) => { let retryRecord = await tx.operationRetries.get(pendingTaskId); if (!retryRecord) { retryRecord = { @@ -440,11 +449,15 @@ async function storePendingTaskError( retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); } await tx.operationRetries.put(retryRecord); - return taskToRetryNotification(ws, tx, pendingTaskId, e); + return { + notification: await taskToRetryNotification(ws, tx, pendingTaskId, e), + retryRecord, + }; }); - if (maybeNotification) { - ws.notify(maybeNotification); + if (res?.notification) { + ws.notify(res.notification); } + return res.retryRecord; } /** @@ -462,8 +475,8 @@ async function storeTaskProgress( async function storePendingTaskPending( ws: InternalWalletState, pendingTaskId: string, -): Promise<void> { - const maybeNotification = await ws.db.runAllStoresReadWriteTx(async (tx) => { +): Promise<OperationRetryRecord> { + const res = await ws.db.runAllStoresReadWriteTx(async (tx) => { let retryRecord = await tx.operationRetries.get(pendingTaskId); let hadError = false; if (!retryRecord) { @@ -479,15 +492,24 @@ async function storePendingTaskPending( retryRecord.retryInfo = DbRetryInfo.increment(retryRecord.retryInfo); } await tx.operationRetries.put(retryRecord); + let notification: WalletNotification | undefined = undefined; if (hadError) { - return taskToRetryNotification(ws, tx, pendingTaskId, undefined); - } else { - return undefined; + notification = await taskToRetryNotification( + ws, + tx, + pendingTaskId, + undefined, + ); } + return { + notification, + retryRecord, + }; }); - if (maybeNotification) { - ws.notify(maybeNotification); + if (res.notification) { + ws.notify(res.notification); } + return res.retryRecord; } async function storePendingTaskFinished( @@ -502,33 +524,11 @@ async function storePendingTaskFinished( async function runTaskWithErrorReporting( ws: InternalWalletState, opId: TaskIdStr, + info: ShepherdInfo, f: () => Promise<TaskRunResult>, ): Promise<TaskRunResult> { - let maybeError: TalerErrorDetail | undefined; try { - const resp = await f(); - switch (resp.type) { - case TaskRunResultType.Error: - await storePendingTaskError(ws, opId, resp.errorDetail); - return resp; - case TaskRunResultType.Finished: - await storePendingTaskFinished(ws, opId); - return resp; - case TaskRunResultType.Backoff: - await storePendingTaskPending(ws, opId); - return resp; - case TaskRunResultType.ScheduleLater: - // Task succeeded but wants to be run again. - await storeTaskProgress(ws, opId); - return resp; - case TaskRunResultType.Progress: - await storeTaskProgress(ws, opId); - return resp; - case TaskRunResultType.LongpollReturnedPending: - // Longpoll should be run again immediately. - await storeTaskProgress(ws, opId); - return resp; - } + return await f(); } catch (e) { if (e instanceof CryptoApiStoppedError) { if (ws.stopped) { @@ -543,46 +543,11 @@ async function runTaskWithErrorReporting( }; } } - if (e instanceof TalerError) { - logger.warn("operation processed resulted in error"); - logger.warn(`error was: ${j2s(e.errorDetail)}`); - maybeError = e.errorDetail; - await storePendingTaskError(ws, opId, maybeError!); - return { - type: TaskRunResultType.Error, - errorDetail: e.errorDetail, - }; - } else if (e instanceof Error) { - // This is a bug, as we expect pending operations to always - // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED - // or return something. - logger.error(`Uncaught exception: ${e.message}`); - logger.error(`Stack: ${e.stack}`); - maybeError = makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, - { - stack: e.stack, - }, - `unexpected exception (message: ${e.message})`, - ); - await storePendingTaskError(ws, opId, maybeError); - return { - type: TaskRunResultType.Error, - errorDetail: maybeError, - }; - } else { - logger.error("Uncaught exception, value is not even an error."); - maybeError = makeErrorDetail( - TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, - {}, - `unexpected exception (not even an error)`, - ); - await storePendingTaskError(ws, opId, maybeError); - return { - type: TaskRunResultType.Error, - errorDetail: maybeError, - }; - } + const errorDetail = getErrorDetailFromException(e); + return { + type: TaskRunResultType.Error, + errorDetail, + }; } } |