diff options
Diffstat (limited to 'packages/taler-wallet-core')
-rw-r--r-- | packages/taler-wallet-core/src/common.ts | 15 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/shepherd.ts | 57 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/withdraw.ts | 28 |
3 files changed, 71 insertions, 29 deletions
diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts index 9d7f2e763..45351f680 100644 --- a/packages/taler-wallet-core/src/common.ts +++ b/packages/taler-wallet-core/src/common.ts @@ -350,6 +350,7 @@ export enum TaskRunResultType { Backoff = "backoff", Progress = "progress", Error = "error", + LongpollReturnedPending = "longpoll-returned-pending", ScheduleLater = "schedule-later", } @@ -358,6 +359,7 @@ export type TaskRunResult = | TaskRunErrorResult | TaskRunBackoffResult | TaskRunProgressResult + | TaskRunLongpollReturnedPendingResult | TaskRunScheduleLaterResult; export namespace TaskRunResult { @@ -396,6 +398,15 @@ export namespace TaskRunResult { runAt, }; } + /** + * Longpolling returned, but what we're waiting for + * is still pending on the other side. + */ + export function longpollReturnedPending(): TaskRunLongpollReturnedPendingResult { + return { + type: TaskRunResultType.LongpollReturnedPending, + }; + } } export interface TaskRunFinishedResult { @@ -415,6 +426,10 @@ export interface TaskRunScheduleLaterResult { runAt: AbsoluteTime; } +export interface TaskRunLongpollReturnedPendingResult { + type: TaskRunResultType.LongpollReturnedPending; +} + export interface TaskRunErrorResult { type: TaskRunResultType.Error; errorDetail: TalerErrorDetail; diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts index d6fc604e8..0639b7976 100644 --- a/packages/taler-wallet-core/src/shepherd.ts +++ b/packages/taler-wallet-core/src/shepherd.ts @@ -227,6 +227,18 @@ export class TaskScheduler { this.startShepherdTask(taskId); } + private async wait( + taskId: TaskId, + info: ShepherdInfo, + delay: Duration, + ): Promise<void> { + try { + await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay)); + } catch (e) { + logger.info(`waiting for ${taskId} interrupted`); + } + } + private async internalShepherdTask( taskId: TaskId, info: ShepherdInfo, @@ -250,6 +262,7 @@ export class TaskScheduler { Duration.fromSpec({ seconds: 60 }), ); } + const startTime = AbsoluteTime.now(); logger.trace(`Shepherd for ${taskId} will call handler`); // FIXME: This should already return the retry record. const res = await runTaskWithErrorReporting(this.ws, taskId, async () => { @@ -273,13 +286,7 @@ export class TaskScheduler { const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); delay = AbsoluteTime.remaining(t); logger.trace(`Waiting for ${delay.d_ms} ms`); - try { - await info.cts.token.racePromise( - this.ws.timerGroup.resolveAfter(delay), - ); - } catch (e) { - logger.info(`waiting for ${taskId} interrupted`); - } + await this.wait(taskId, info, delay); } else { logger.trace("Retrying immediately."); } @@ -292,13 +299,7 @@ export class TaskScheduler { const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry); delay = AbsoluteTime.remaining(t); logger.trace(`Waiting for ${delay.d_ms} ms`); - try { - await info.cts.token.racePromise( - this.ws.timerGroup.resolveAfter(delay), - ); - } catch (e) { - logger.info(`waiting for ${taskId} interrupted`); - } + await this.wait(taskId, info, delay); } else { logger.trace("Retrying immediately."); } @@ -314,17 +315,27 @@ export class TaskScheduler { logger.trace(`Shepherd for ${taskId} got schedule-later result.`); const delay = AbsoluteTime.remaining(res.runAt); logger.trace(`Waiting for ${delay.d_ms} ms`); - try { - await info.cts.token.racePromise( - this.ws.timerGroup.resolveAfter(delay), - ); - } catch (e) { - logger.info(`waiting for ${taskId} interrupted`); - } + await this.wait(taskId, info, delay); break; case TaskRunResultType.Finished: logger.trace(`Shepherd for ${taskId} got finished result.`); return; + case TaskRunResultType.LongpollReturnedPending: { + // Make sure that we are waiting a bit if long-polling returned too early. + const endTime = AbsoluteTime.now(); + const taskDuration = AbsoluteTime.difference(endTime, startTime); + if ( + Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0 + ) { + logger.info( + `long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`, + ); + await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 })); + } else { + logger.info(`task ${taskId} will long-poll again`); + } + break; + } default: assertUnreachable(res); } @@ -435,6 +446,10 @@ async function runTaskWithErrorReporting( case TaskRunResultType.Progress: await storeTaskProgress(ws, opId); return resp; + case TaskRunResultType.LongpollReturnedPending: + // Longpoll should be run again immediately. + await storeTaskProgress(ws, opId); + return resp; } } catch (e) { if (e instanceof CryptoApiStoppedError) { diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts index 9cf1ad36d..bfcf23588 100644 --- a/packages/taler-wallet-core/src/withdraw.ts +++ b/packages/taler-wallet-core/src/withdraw.ts @@ -911,6 +911,7 @@ async function processPlanchetExchangeBatchRequest( ws: InternalWalletState, wgContext: WithdrawalGroupContext, args: WithdrawalRequestBatchArgs, + cancellationToken: CancellationToken, ): Promise<WithdrawalBatchResult> { const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord; logger.info( @@ -997,6 +998,8 @@ async function processPlanchetExchangeBatchRequest( const resp = await ws.http.fetch(reqUrl, { method: "POST", body: batchReq, + cancellationToken, + timeout: Duration.fromSpec({ seconds: 40 }), }); if (resp.status === HttpStatusCode.UnavailableForLegalReasons) { await handleKycRequired(ws, withdrawalGroup, resp, 0, requestCoinIdxs); @@ -1300,7 +1303,7 @@ async function processQueryReserve( `got reserve status error, EC=${result.talerErrorResponse.code}`, ); if (resp.status === HttpStatusCode.NotFound) { - return TaskRunResult.backoff(); + return TaskRunResult.longpollReturnedPending(); } else { throwUnexpectedRequestError(resp, result.talerErrorResponse); } @@ -1491,6 +1494,7 @@ async function processWithdrawalGroupPendingKyc( async function processWithdrawalGroupPendingReady( ws: InternalWalletState, withdrawalGroup: WithdrawalGroupRecord, + cancellationToken: CancellationToken, ): Promise<TaskRunResult> { const { withdrawalGroupId } = withdrawalGroup; const transactionId = constructTransactionIdentifier({ @@ -1553,10 +1557,15 @@ async function processWithdrawalGroupPendingReady( const maxBatchSize = 100; for (let i = 0; i < numTotalCoins; i += maxBatchSize) { - const resp = await processPlanchetExchangeBatchRequest(ws, wgContext, { - batchSize: maxBatchSize, - coinStartIndex: i, - }); + const resp = await processPlanchetExchangeBatchRequest( + ws, + wgContext, + { + batchSize: maxBatchSize, + coinStartIndex: i, + }, + cancellationToken, + ); let work: Promise<void>[] = []; work = []; for (let j = 0; j < resp.coinIdxs.length; j++) { @@ -1688,7 +1697,11 @@ export async function processWithdrawalGroup( ); case WithdrawalGroupStatus.PendingReady: // Continue with the actual withdrawal! - return await processWithdrawalGroupPendingReady(ws, withdrawalGroup); + return await processWithdrawalGroupPendingReady( + ws, + withdrawalGroup, + cancellationToken, + ); case WithdrawalGroupStatus.AbortingBank: return await processWithdrawalGroupAbortingBank(ws, withdrawalGroup); case WithdrawalGroupStatus.AbortedBank: @@ -2265,8 +2278,7 @@ async function processReserveBankStatus( } if (!status.transfer_done) { - // FIXME: This is a long-poll result - return TaskRunResult.backoff(); + return TaskRunResult.longpollReturnedPending(); } const transitionInfo = await ws.db.runReadWriteTx( |