aboutsummaryrefslogtreecommitdiff
path: root/packages
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2023-11-07 11:52:08 +0100
committerFlorian Dold <florian@dold.me>2023-11-07 11:52:08 +0100
commit96e8ba5c239c8bcb40cbe330ee599b132005e4de (patch)
treef499c0ac7b1a0c1f592d615fb4a9ff6d7b2f409d /packages
parent7436d8d8612006b5f7917c63ee6bfe18b06281c9 (diff)
downloadwallet-core-96e8ba5c239c8bcb40cbe330ee599b132005e4de.tar.xz
wallet-core: throttle tasks
Diffstat (limited to 'packages')
-rw-r--r--packages/taler-util/src/TaskThrottler.ts160
-rw-r--r--packages/taler-util/src/index.ts1
-rw-r--r--packages/taler-wallet-core/src/operations/pending.ts6
-rw-r--r--packages/taler-wallet-core/src/pending-types.ts5
-rw-r--r--packages/taler-wallet-core/src/wallet.ts25
5 files changed, 192 insertions, 5 deletions
diff --git a/packages/taler-util/src/TaskThrottler.ts b/packages/taler-util/src/TaskThrottler.ts
new file mode 100644
index 000000000..e4fb82171
--- /dev/null
+++ b/packages/taler-util/src/TaskThrottler.ts
@@ -0,0 +1,160 @@
+/*
+ This file is part of GNU Taler
+ (C) 2019 GNUnet e.V.
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+import { Logger } from "./logging.js";
+import { AbsoluteTime, Duration } from "./time.js";
+
+/**
+ * Implementation of token bucket throttling.
+ */
+
+/**
+ * Logger.
+ */
+const logger = new Logger("OperationThrottler.ts");
+
+/**
+ * Maximum request per second, per origin.
+ */
+const MAX_PER_SECOND = 100;
+
+/**
+ * Maximum request per minute, per origin.
+ */
+const MAX_PER_MINUTE = 500;
+
+/**
+ * Maximum request per hour, per origin.
+ */
+const MAX_PER_HOUR = 2000;
+
+/**
+ * Throttling state for one task.
+ */
+class TaskState {
+ tokensSecond: number = MAX_PER_SECOND;
+ tokensMinute: number = MAX_PER_MINUTE;
+ tokensHour: number = MAX_PER_HOUR;
+ lastUpdate = AbsoluteTime.now();
+
+ private refill(): void {
+ const now = AbsoluteTime.now();
+ if (AbsoluteTime.cmp(now, this.lastUpdate) < 0) {
+ // Did the system time change?
+ this.lastUpdate = now;
+ return;
+ }
+ const d = AbsoluteTime.difference(now, this.lastUpdate);
+ if (d.d_ms === "forever") {
+ throw Error("assertion failed");
+ }
+ this.tokensSecond = Math.min(
+ MAX_PER_SECOND,
+ this.tokensSecond + d.d_ms / 1000,
+ );
+ this.tokensMinute = Math.min(
+ MAX_PER_MINUTE,
+ this.tokensMinute + d.d_ms / 1000 / 60,
+ );
+ this.tokensHour = Math.min(
+ MAX_PER_HOUR,
+ this.tokensHour + d.d_ms / 1000 / 60 / 60,
+ );
+ this.lastUpdate = now;
+ }
+
+ /**
+ * Return true if the request for this origin should be throttled.
+ * Otherwise, take a token out of the respective buckets.
+ */
+ applyThrottle(): boolean {
+ this.refill();
+ if (this.tokensSecond < 1) {
+ logger.warn("request throttled (per second limit exceeded)");
+ return true;
+ }
+ if (this.tokensMinute < 1) {
+ logger.warn("request throttled (per minute limit exceeded)");
+ return true;
+ }
+ if (this.tokensHour < 1) {
+ logger.warn("request throttled (per hour limit exceeded)");
+ return true;
+ }
+ this.tokensSecond--;
+ this.tokensMinute--;
+ this.tokensHour--;
+ return false;
+ }
+}
+
+/**
+ * Request throttler, used as a "last layer of defense" when some
+ * other part of the re-try logic is broken and we're sending too
+ * many requests to the same exchange/bank/merchant.
+ */
+export class TaskThrottler {
+ private perTaskInfo: { [taskId: string]: TaskState } = {};
+
+ /**
+ * Get the throttling state for an origin, or
+ * initialize if no state is associated with the
+ * origin yet.
+ */
+ private getState(origin: string): TaskState {
+ const s = this.perTaskInfo[origin];
+ if (s) {
+ return s;
+ }
+ const ns = (this.perTaskInfo[origin] = new TaskState());
+ return ns;
+ }
+
+ /**
+ * Apply throttling to a request.
+ *
+ * @returns whether the request should be throttled.
+ */
+ applyThrottle(taskId: string): boolean {
+ for (let [k, v] of Object.entries(this.perTaskInfo)) {
+ // Remove throttled tasks that haven't seen an update in more than one hour.
+ if (
+ Duration.cmp(
+ AbsoluteTime.difference(v.lastUpdate, AbsoluteTime.now()),
+ Duration.fromSpec({ hours: 1 }),
+ ) > 1
+ ) {
+ delete this.perTaskInfo[k];
+ }
+ }
+ return this.getState(taskId).applyThrottle();
+ }
+
+ /**
+ * Get the throttle statistics for a particular URL.
+ */
+ getThrottleStats(taskId: string): Record<string, unknown> {
+ const state = this.getState(taskId);
+ return {
+ tokensHour: state.tokensHour,
+ tokensMinute: state.tokensMinute,
+ tokensSecond: state.tokensSecond,
+ maxTokensHour: MAX_PER_HOUR,
+ maxTokensMinute: MAX_PER_MINUTE,
+ maxTokensSecond: MAX_PER_SECOND,
+ };
+ }
+}
diff --git a/packages/taler-util/src/index.ts b/packages/taler-util/src/index.ts
index 053a25ab7..8db266620 100644
--- a/packages/taler-util/src/index.ts
+++ b/packages/taler-util/src/index.ts
@@ -32,6 +32,7 @@ export {
setPRNG,
} from "./nacl-fast.js";
export { RequestThrottler } from "./RequestThrottler.js";
+export { TaskThrottler } from "./TaskThrottler.js";
export * from "./CancellationToken.js";
export * from "./contract-terms.js";
export * from "./base64.js";
diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts
index 7590280bc..e30958226 100644
--- a/packages/taler-wallet-core/src/operations/pending.ts
+++ b/packages/taler-wallet-core/src/operations/pending.ts
@@ -538,6 +538,7 @@ async function gatherPeerPullInitiationPending(
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
pursePub: pi.pursePub,
+ internalOperationStatus: `0x${pi.status.toString(16)}`,
});
},
);
@@ -579,12 +580,17 @@ async function gatherPeerPullDebitPending(
const timestampDue =
timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ??
AbsoluteTime.now();
+ // switch (pi.status) {
+ // case PeerPullDebitRecordStatus.DialogProposed:
+ // return;
+ // }
resp.pendingOperations.push({
type: PendingTaskType.PeerPullDebit,
...getPendingCommon(ws, opId, timestampDue),
givesLifeness: true,
retryInfo: retryRecord?.retryInfo,
peerPullDebitId: pi.peerPullDebitId,
+ internalOperationStatus: `0x${pi.status.toString(16)}`,
});
},
);
diff --git a/packages/taler-wallet-core/src/pending-types.ts b/packages/taler-wallet-core/src/pending-types.ts
index e7a40e81b..f8406033a 100644
--- a/packages/taler-wallet-core/src/pending-types.ts
+++ b/packages/taler-wallet-core/src/pending-types.ts
@@ -234,6 +234,11 @@ export interface PendingTaskInfoCommon {
* exceeds a number of retries.
*/
retryInfo?: DbRetryInfo;
+
+ /**
+ * Internal operation status for debugging.
+ */
+ internalOperationStatus?: string;
}
/**
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index e917e8059..978ce4c39 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -133,6 +133,8 @@ import {
ListExchangesForScopedCurrencyRequest,
ExchangesShortListResponse,
AmountString,
+ RequestThrottler,
+ TaskThrottler,
} from "@gnu-taler/taler-util";
import type { HttpRequestLibrary } from "@gnu-taler/taler-util/http";
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
@@ -424,6 +426,7 @@ async function runTaskLoop(
"task loop already running, nesting the wallet-core task loop is deprecated and should be avoided",
);
}
+ const throttler = new TaskThrottler();
ws.isTaskLoopRunning = true;
let retriesExceeded = false;
for (let iteration = 0; !ws.stopped; iteration++) {
@@ -431,6 +434,7 @@ async function runTaskLoop(
logger.trace(`pending operations: ${j2s(pending)}`);
let numGivingLiveness = 0;
let numDue = 0;
+ let numThrottled = 0;
let minDue: AbsoluteTime = AbsoluteTime.never();
for (const p of pending.pendingOperations) {
@@ -449,12 +453,23 @@ async function runTaskLoop(
if (!p.isDue) {
continue;
}
- minDue = AbsoluteTime.min(minDue, p.timestampDue);
numDue++;
+
+ const isThrottled = throttler.applyThrottle(p.id);
+
+ if (isThrottled) {
+ logger.warn(
+ `task ${p.id} throttled, this is very likely a bug in wallet-core, please report`,
+ );
+ numDue--;
+ numThrottled++;
+ } else {
+ minDue = AbsoluteTime.min(minDue, p.timestampDue);
+ }
}
logger.trace(
- `running task loop, iter=${iteration}, #tasks=${pending.pendingOperations.length} #lifeness=${numGivingLiveness}, #due=${numDue}`,
+ `running task loop, iter=${iteration}, #tasks=${pending.pendingOperations.length} #lifeness=${numGivingLiveness}, #due=${numDue} #trottled=${numThrottled}`,
);
if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) {
@@ -932,9 +947,9 @@ async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> {
ageCommitmentProof: c.ageCommitmentProof,
spend_allocation: c.spendAllocation
? {
- amount: c.spendAllocation.amount,
- id: c.spendAllocation.id,
- }
+ amount: c.spendAllocation.amount,
+ id: c.spendAllocation.id,
+ }
: undefined,
});
}