aboutsummaryrefslogtreecommitdiff
path: root/packages
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2024-06-18 17:41:09 +0200
committerFlorian Dold <florian@dold.me>2024-06-18 17:41:09 +0200
commit3913d0117bd034b5d7467eeed6b10c59dae90ce8 (patch)
treef265f12ac42ef40db8580103f6e19d86f7a3dab7 /packages
parente925f1d904ab3d81ab59133d74f94bbff52cb95b (diff)
downloadwallet-core-3913d0117bd034b5d7467eeed6b10c59dae90ce8.tar.xz
wallet-core: round-robin waiting for long-polling
With dynamic long-poll timeout computation
Diffstat (limited to 'packages')
-rw-r--r--packages/taler-harness/src/bench2.ts1
-rw-r--r--packages/taler-wallet-core/src/dbless.ts44
-rw-r--r--packages/taler-wallet-core/src/deposits.ts34
-rw-r--r--packages/taler-wallet-core/src/pay-merchant.ts21
-rw-r--r--packages/taler-wallet-core/src/pay-peer-pull-credit.ts30
-rw-r--r--packages/taler-wallet-core/src/pay-peer-push-credit.ts16
-rw-r--r--packages/taler-wallet-core/src/pay-peer-push-debit.ts17
-rw-r--r--packages/taler-wallet-core/src/wallet.ts62
-rw-r--r--packages/taler-wallet-core/src/withdraw.ts38
9 files changed, 190 insertions, 73 deletions
diff --git a/packages/taler-harness/src/bench2.ts b/packages/taler-harness/src/bench2.ts
index 90924caec..dc360ae62 100644
--- a/packages/taler-harness/src/bench2.ts
+++ b/packages/taler-harness/src/bench2.ts
@@ -30,7 +30,6 @@ import {
applyRunConfigDefaults,
CryptoDispatcher,
SynchronousCryptoWorkerFactoryPlain,
- Wallet,
} from "@gnu-taler/taler-wallet-core";
import {
checkReserve,
diff --git a/packages/taler-wallet-core/src/dbless.ts b/packages/taler-wallet-core/src/dbless.ts
index ec9655e6f..bc0b6e428 100644
--- a/packages/taler-wallet-core/src/dbless.ts
+++ b/packages/taler-wallet-core/src/dbless.ts
@@ -56,9 +56,9 @@ import {
} from "@gnu-taler/taler-util/http";
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
import { DenominationRecord } from "./db.js";
+import { isWithdrawableDenom } from "./denominations.js";
import { ExchangeInfo, downloadExchangeInfo } from "./exchanges.js";
import { assembleRefreshRevealRequest } from "./refresh.js";
-import { isWithdrawableDenom } from "./denominations.js";
import { getBankStatusUrl, getBankWithdrawalInfo } from "./withdraw.js";
export { downloadExchangeInfo };
@@ -85,26 +85,6 @@ export interface CoinInfo {
maxAge: number;
}
-/**
- * Check the status of a reserve, use long-polling to wait
- * until the reserve actually has been created.
- */
-export async function checkReserve(
- http: HttpRequestLibrary,
- exchangeBaseUrl: string,
- reservePub: string,
- longpollTimeoutMs: number = 500,
-): Promise<void> {
- const reqUrl = new URL(`reserves/${reservePub}`, exchangeBaseUrl);
- if (longpollTimeoutMs) {
- reqUrl.searchParams.set("timeout_ms", `${longpollTimeoutMs}`);
- }
- const resp = await http.fetch(reqUrl.href, { method: "GET" });
- if (resp.status !== 200) {
- throw new Error("reserve not okay");
- }
-}
-
export interface TopupReserveWithBankArgs {
http: HttpRequestLibrary;
reservePub: string;
@@ -415,3 +395,25 @@ export async function createTestingReserve(args: {
);
await readSuccessResponseJsonOrThrow(fbReq, codecForAny());
}
+
+/**
+ * Check the status of a reserve, use long-polling to wait
+ * until the reserve actually has been created.
+ */
+export async function checkReserve(
+ http: HttpRequestLibrary,
+ exchangeBaseUrl: string,
+ reservePub: string,
+ longpollTimeoutMs: number = 500,
+): Promise<void> {
+ const reqUrl = new URL(`reserves/${reservePub}`, exchangeBaseUrl);
+ if (longpollTimeoutMs) {
+ reqUrl.searchParams.set("timeout_ms", `${longpollTimeoutMs}`);
+ }
+ const resp = await http.fetch(reqUrl.href, {
+ method: "GET",
+ });
+ if (resp.status !== 200) {
+ throw new Error("reserve not okay");
+ }
+}
diff --git a/packages/taler-wallet-core/src/deposits.ts b/packages/taler-wallet-core/src/deposits.ts
index 6394fdc78..919536c96 100644
--- a/packages/taler-wallet-core/src/deposits.ts
+++ b/packages/taler-wallet-core/src/deposits.ts
@@ -669,12 +669,19 @@ async function processDepositGroupPendingKyc(
`kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
kycInfo.exchangeBaseUrl,
);
- url.searchParams.set("timeout_ms", "10000");
- logger.info(`kyc url ${url.href}`);
- const kycStatusRes = await wex.http.fetch(url.href, {
- method: "GET",
- cancellationToken: wex.cancellationToken,
- });
+
+ const kycStatusRes = await wex.ws.runLongpollQueueing(
+ url.hostname,
+ async (timeoutMs) => {
+ url.searchParams.set("timeout_ms", `${timeoutMs}`);
+ logger.info(`kyc url ${url.href}`);
+ return await wex.http.fetch(url.href, {
+ method: "GET",
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
+
if (
kycStatusRes.status === HttpStatusCode.Ok ||
//FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
@@ -1278,11 +1285,16 @@ async function trackDeposit(
wireHash,
});
url.searchParams.set("merchant_sig", sigResp.sig);
- url.searchParams.set("timeout_ms", "30000");
- const httpResp = await wex.http.fetch(url.href, {
- method: "GET",
- cancellationToken: wex.cancellationToken,
- });
+ const httpResp = await wex.ws.runLongpollQueueing(
+ url.hostname,
+ async (timeoutMs) => {
+ url.searchParams.set("timeout_ms", `${timeoutMs}`);
+ return await wex.http.fetch(url.href, {
+ method: "GET",
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
logger.trace(`deposits response status: ${httpResp.status}`);
switch (httpResp.status) {
case HttpStatusCode.Accepted: {
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts b/packages/taler-wallet-core/src/pay-merchant.ts
index 28fb204dd..b5d2670ad 100644
--- a/packages/taler-wallet-core/src/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -95,6 +95,7 @@ import {
} from "@gnu-taler/taler-util";
import {
getHttpResponseErrorDetails,
+ HttpResponse,
readSuccessResponseJsonOrErrorCode,
readSuccessResponseJsonOrThrow,
readTalerErrorResponse,
@@ -2870,14 +2871,24 @@ async function checkIfOrderIsAlreadyPaid(
);
requestUrl.searchParams.set("h_contract", contract.contractTermsHash);
+ let resp: HttpResponse;
+
if (doLongPolling) {
- requestUrl.searchParams.set("timeout_ms", "30000");
+ resp = await wex.ws.runLongpollQueueing(
+ requestUrl.hostname,
+ async (timeoutMs) => {
+ requestUrl.searchParams.set("timeout_ms", `${timeoutMs}`);
+ return await wex.http.fetch(requestUrl.href, {
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
+ } else {
+ resp = await wex.http.fetch(requestUrl.href, {
+ cancellationToken: wex.cancellationToken,
+ });
}
- const resp = await wex.http.fetch(requestUrl.href, {
- cancellationToken: wex.cancellationToken,
- });
-
if (
resp.status === HttpStatusCode.Ok ||
resp.status === HttpStatusCode.Accepted ||
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
index 3e7fdd36b..ae09a1280 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-credit.ts
@@ -374,10 +374,15 @@ async function queryPurseForPeerPullCredit(
);
purseDepositUrl.searchParams.set("timeout_ms", "30000");
logger.info(`querying purse status via ${purseDepositUrl.href}`);
- const resp = await wex.http.fetch(purseDepositUrl.href, {
- timeout: { d_ms: 60000 },
- cancellationToken: wex.cancellationToken,
- });
+ const resp = await wex.ws.runLongpollQueueing(
+ purseDepositUrl.hostname,
+ async () => {
+ return await wex.http.fetch(purseDepositUrl.href, {
+ timeout: { d_ms: 60000 },
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
const transactionId = constructTransactionIdentifier({
tag: TransactionType.PeerPullCredit,
pursePub: pullIni.pursePub,
@@ -488,12 +493,17 @@ async function longpollKycStatus(
`kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
exchangeUrl,
);
- url.searchParams.set("timeout_ms", "10000");
- logger.info(`kyc url ${url.href}`);
- const kycStatusRes = await wex.http.fetch(url.href, {
- method: "GET",
- cancellationToken: wex.cancellationToken,
- });
+ const kycStatusRes = await wex.ws.runLongpollQueueing(
+ url.hostname,
+ async (timeoutMs) => {
+ url.searchParams.set("timeout_ms", `${timeoutMs}`);
+ logger.info(`kyc url ${url.href}`);
+ return await wex.http.fetch(url.href, {
+ method: "GET",
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
if (
kycStatusRes.status === HttpStatusCode.Ok ||
// FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
diff --git a/packages/taler-wallet-core/src/pay-peer-push-credit.ts b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
index 14f32b545..5e8fc5941 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-credit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-credit.ts
@@ -537,12 +537,18 @@ async function longpollKycStatus(
`kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
exchangeUrl,
);
- url.searchParams.set("timeout_ms", "30000");
logger.info(`kyc url ${url.href}`);
- const kycStatusRes = await wex.http.fetch(url.href, {
- method: "GET",
- cancellationToken: wex.cancellationToken,
- });
+ const kycStatusRes = await wex.ws.runLongpollQueueing(
+ url.hostname,
+ async (timeoutMs) => {
+ url.searchParams.set("timeout_ms", `${timeoutMs}`);
+ return await wex.http.fetch(url.href, {
+ method: "GET",
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
+
if (
kycStatusRes.status === HttpStatusCode.Ok ||
//FIXME: NoContent is not expected https://docs.taler.net/core/api-exchange.html#post--purses-$PURSE_PUB-merge
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
index 6603cc4f3..544273552 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -944,12 +944,17 @@ async function processPeerPushDebitReady(
`purses/${pursePub}/merge`,
peerPushInitiation.exchangeBaseUrl,
);
- mergeUrl.searchParams.set("timeout_ms", "30000");
- logger.info(`long-polling on purse status at ${mergeUrl.href}`);
- const resp = await wex.http.fetch(mergeUrl.href, {
- // timeout: getReserveRequestTimeout(withdrawalGroup),
- cancellationToken: wex.cancellationToken,
- });
+ const resp = await wex.ws.runLongpollQueueing(
+ mergeUrl.hostname,
+ async (timeoutMs) => {
+ mergeUrl.searchParams.set("timeout_ms", `${timeoutMs}`);
+ logger.info(`long-polling on purse status at ${mergeUrl.href}`);
+ return await wex.http.fetch(mergeUrl.href, {
+ // timeout: getReserveRequestTimeout(withdrawalGroup),
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
if (resp.status === HttpStatusCode.Ok) {
const purseStatus = await readSuccessResponseJsonOrThrow(
resp,
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index 69f0e5f0b..fb0bd9f62 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -1916,6 +1916,16 @@ class WalletDbTriggerSpec implements TriggerSpec {
}
}
+type LongpollRunFn<T> = (timeoutMs: number) => Promise<T>;
+type ResolveFn = () => void;
+
+/**
+ * Per-hostname state for longpolling.
+ */
+interface LongpollState {
+ queue: Array<ResolveFn>;
+}
+
/**
* Internal state of the wallet.
*
@@ -1977,6 +1987,58 @@ export class InternalWalletState {
return this._dbAccessHandle;
}
+ private longpollStatePerHostname: Map<string, LongpollState> = new Map();
+
+ private longpollRequestIdCounter = 1;
+
+ /**
+ * Run a long-polling request, potentially queueing the request
+ * if too many other long-polling requests against the same hostname
+ * (or too many overall) are active.
+ */
+ async runLongpollQueueing<T>(
+ hostname: string,
+ f: LongpollRunFn<T>,
+ ): Promise<T> {
+ let rid = this.longpollRequestIdCounter++;
+ const doRun: () => Promise<T> = async () => {
+ const st = this.longpollStatePerHostname.get(hostname);
+ const numWaiting = st?.queue.length ?? 0;
+ logger.info(
+ `running long-poll ${rid} to ${hostname} with ${numWaiting} waiting`,
+ );
+ try {
+ const timeoutMs = Math.round(Math.max(10000, 30000 / (numWaiting + 1)));
+ return await f(timeoutMs);
+ } finally {
+ logger.info(
+ `cleaning up after long-poll ${rid} request to ${hostname}`,
+ );
+ if (st) {
+ const next = st.queue.shift();
+ if (next) {
+ next();
+ } else {
+ this.longpollStatePerHostname.delete(hostname);
+ }
+ }
+ }
+ };
+ const state = this.longpollStatePerHostname.get(hostname);
+ if (state) {
+ logger.info(`long-poll request ${rid} to ${hostname} queued`);
+ const promcap = openPromise<void>();
+ state.queue.push(promcap.resolve);
+ return promcap.promise.then(doRun);
+ } else {
+ logger.info(`directly running long-poll request ${rid} to ${hostname}`);
+ this.longpollStatePerHostname.set(hostname, {
+ queue: [],
+ });
+ return Promise.resolve().then(doRun);
+ }
+ }
+
devExperimentState: DevExperimentState = {};
clientCancellationMap: Map<string, CancellationToken.Source> = new Map();
diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts
index 2af8807cc..7ba69d2c1 100644
--- a/packages/taler-wallet-core/src/withdraw.ts
+++ b/packages/taler-wallet-core/src/withdraw.ts
@@ -45,7 +45,6 @@ import {
EddsaPrivateKeyString,
ExchangeBatchWithdrawRequest,
ExchangeListItem,
- ExchangeTosStatus,
ExchangeUpdateStatus,
ExchangeWireAccount,
ExchangeWithdrawBatchResponse,
@@ -1614,14 +1613,18 @@ async function processQueryReserve(
`reserves/${reservePub}`,
withdrawalGroup.exchangeBaseUrl,
);
- reserveUrl.searchParams.set("timeout_ms", "30000");
- logger.trace(`querying reserve status via ${reserveUrl.href}`);
-
- const resp = await wex.http.fetch(reserveUrl.href, {
- timeout: getReserveRequestTimeout(withdrawalGroup),
- cancellationToken: wex.cancellationToken,
- });
+ const resp = await wex.ws.runLongpollQueueing(
+ reserveUrl.hostname,
+ async (timeoutMs) => {
+ reserveUrl.searchParams.set("timeout_ms", `${timeoutMs}`);
+ logger.trace(`querying reserve status via ${reserveUrl.href}`);
+ return await wex.http.fetch(reserveUrl.href, {
+ timeout: getReserveRequestTimeout(withdrawalGroup),
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
logger.trace(`reserve status code: HTTP ${resp.status}`);
@@ -1760,12 +1763,19 @@ async function processWithdrawalGroupPendingKyc(
`kyc-check/${kycInfo.requirementRow}/${kycInfo.paytoHash}/${userType}`,
exchangeUrl,
);
- url.searchParams.set("timeout_ms", "30000");
- logger.info(`long-polling for withdrawal KYC status via ${url.href}`);
- const kycStatusRes = await wex.http.fetch(url.href, {
- method: "GET",
- cancellationToken: wex.cancellationToken,
- });
+
+ const kycStatusRes = await wex.ws.runLongpollQueueing(
+ url.hostname,
+ async (timeoutMs) => {
+ url.searchParams.set("timeout_ms", `${timeoutMs}`);
+ logger.info(`long-polling for withdrawal KYC status via ${url.href}`);
+ return await wex.http.fetch(url.href, {
+ method: "GET",
+ cancellationToken: wex.cancellationToken,
+ });
+ },
+ );
+
logger.info(`kyc long-polling response status: HTTP ${kycStatusRes.status}`);
if (
kycStatusRes.status === HttpStatusCode.Ok ||