diff options
Diffstat (limited to 'packages')
-rw-r--r-- | packages/taler-util/src/TaskThrottler.ts | 160 | ||||
-rw-r--r-- | packages/taler-util/src/index.ts | 1 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/operations/pending.ts | 6 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/pending-types.ts | 5 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/wallet.ts | 25 |
5 files changed, 192 insertions, 5 deletions
diff --git a/packages/taler-util/src/TaskThrottler.ts b/packages/taler-util/src/TaskThrottler.ts new file mode 100644 index 000000000..e4fb82171 --- /dev/null +++ b/packages/taler-util/src/TaskThrottler.ts @@ -0,0 +1,160 @@ +/* + This file is part of GNU Taler + (C) 2019 GNUnet e.V. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + */ + +import { Logger } from "./logging.js"; +import { AbsoluteTime, Duration } from "./time.js"; + +/** + * Implementation of token bucket throttling. + */ + +/** + * Logger. + */ +const logger = new Logger("OperationThrottler.ts"); + +/** + * Maximum request per second, per origin. + */ +const MAX_PER_SECOND = 100; + +/** + * Maximum request per minute, per origin. + */ +const MAX_PER_MINUTE = 500; + +/** + * Maximum request per hour, per origin. + */ +const MAX_PER_HOUR = 2000; + +/** + * Throttling state for one task. + */ +class TaskState { + tokensSecond: number = MAX_PER_SECOND; + tokensMinute: number = MAX_PER_MINUTE; + tokensHour: number = MAX_PER_HOUR; + lastUpdate = AbsoluteTime.now(); + + private refill(): void { + const now = AbsoluteTime.now(); + if (AbsoluteTime.cmp(now, this.lastUpdate) < 0) { + // Did the system time change? + this.lastUpdate = now; + return; + } + const d = AbsoluteTime.difference(now, this.lastUpdate); + if (d.d_ms === "forever") { + throw Error("assertion failed"); + } + this.tokensSecond = Math.min( + MAX_PER_SECOND, + this.tokensSecond + d.d_ms / 1000, + ); + this.tokensMinute = Math.min( + MAX_PER_MINUTE, + this.tokensMinute + d.d_ms / 1000 / 60, + ); + this.tokensHour = Math.min( + MAX_PER_HOUR, + this.tokensHour + d.d_ms / 1000 / 60 / 60, + ); + this.lastUpdate = now; + } + + /** + * Return true if the request for this origin should be throttled. + * Otherwise, take a token out of the respective buckets. + */ + applyThrottle(): boolean { + this.refill(); + if (this.tokensSecond < 1) { + logger.warn("request throttled (per second limit exceeded)"); + return true; + } + if (this.tokensMinute < 1) { + logger.warn("request throttled (per minute limit exceeded)"); + return true; + } + if (this.tokensHour < 1) { + logger.warn("request throttled (per hour limit exceeded)"); + return true; + } + this.tokensSecond--; + this.tokensMinute--; + this.tokensHour--; + return false; + } +} + +/** + * Request throttler, used as a "last layer of defense" when some + * other part of the re-try logic is broken and we're sending too + * many requests to the same exchange/bank/merchant. + */ +export class TaskThrottler { + private perTaskInfo: { [taskId: string]: TaskState } = {}; + + /** + * Get the throttling state for an origin, or + * initialize if no state is associated with the + * origin yet. + */ + private getState(origin: string): TaskState { + const s = this.perTaskInfo[origin]; + if (s) { + return s; + } + const ns = (this.perTaskInfo[origin] = new TaskState()); + return ns; + } + + /** + * Apply throttling to a request. + * + * @returns whether the request should be throttled. + */ + applyThrottle(taskId: string): boolean { + for (let [k, v] of Object.entries(this.perTaskInfo)) { + // Remove throttled tasks that haven't seen an update in more than one hour. + if ( + Duration.cmp( + AbsoluteTime.difference(v.lastUpdate, AbsoluteTime.now()), + Duration.fromSpec({ hours: 1 }), + ) > 1 + ) { + delete this.perTaskInfo[k]; + } + } + return this.getState(taskId).applyThrottle(); + } + + /** + * Get the throttle statistics for a particular URL. + */ + getThrottleStats(taskId: string): Record<string, unknown> { + const state = this.getState(taskId); + return { + tokensHour: state.tokensHour, + tokensMinute: state.tokensMinute, + tokensSecond: state.tokensSecond, + maxTokensHour: MAX_PER_HOUR, + maxTokensMinute: MAX_PER_MINUTE, + maxTokensSecond: MAX_PER_SECOND, + }; + } +} diff --git a/packages/taler-util/src/index.ts b/packages/taler-util/src/index.ts index 053a25ab7..8db266620 100644 --- a/packages/taler-util/src/index.ts +++ b/packages/taler-util/src/index.ts @@ -32,6 +32,7 @@ export { setPRNG, } from "./nacl-fast.js"; export { RequestThrottler } from "./RequestThrottler.js"; +export { TaskThrottler } from "./TaskThrottler.js"; export * from "./CancellationToken.js"; export * from "./contract-terms.js"; export * from "./base64.js"; diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts index 7590280bc..e30958226 100644 --- a/packages/taler-wallet-core/src/operations/pending.ts +++ b/packages/taler-wallet-core/src/operations/pending.ts @@ -538,6 +538,7 @@ async function gatherPeerPullInitiationPending( givesLifeness: true, retryInfo: retryRecord?.retryInfo, pursePub: pi.pursePub, + internalOperationStatus: `0x${pi.status.toString(16)}`, }); }, ); @@ -579,12 +580,17 @@ async function gatherPeerPullDebitPending( const timestampDue = timestampOptionalAbsoluteFromDb(retryRecord?.retryInfo.nextRetry) ?? AbsoluteTime.now(); + // switch (pi.status) { + // case PeerPullDebitRecordStatus.DialogProposed: + // return; + // } resp.pendingOperations.push({ type: PendingTaskType.PeerPullDebit, ...getPendingCommon(ws, opId, timestampDue), givesLifeness: true, retryInfo: retryRecord?.retryInfo, peerPullDebitId: pi.peerPullDebitId, + internalOperationStatus: `0x${pi.status.toString(16)}`, }); }, ); diff --git a/packages/taler-wallet-core/src/pending-types.ts b/packages/taler-wallet-core/src/pending-types.ts index e7a40e81b..f8406033a 100644 --- a/packages/taler-wallet-core/src/pending-types.ts +++ b/packages/taler-wallet-core/src/pending-types.ts @@ -234,6 +234,11 @@ export interface PendingTaskInfoCommon { * exceeds a number of retries. */ retryInfo?: DbRetryInfo; + + /** + * Internal operation status for debugging. + */ + internalOperationStatus?: string; } /** diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index e917e8059..978ce4c39 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -133,6 +133,8 @@ import { ListExchangesForScopedCurrencyRequest, ExchangesShortListResponse, AmountString, + RequestThrottler, + TaskThrottler, } from "@gnu-taler/taler-util"; import type { HttpRequestLibrary } from "@gnu-taler/taler-util/http"; import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http"; @@ -424,6 +426,7 @@ async function runTaskLoop( "task loop already running, nesting the wallet-core task loop is deprecated and should be avoided", ); } + const throttler = new TaskThrottler(); ws.isTaskLoopRunning = true; let retriesExceeded = false; for (let iteration = 0; !ws.stopped; iteration++) { @@ -431,6 +434,7 @@ async function runTaskLoop( logger.trace(`pending operations: ${j2s(pending)}`); let numGivingLiveness = 0; let numDue = 0; + let numThrottled = 0; let minDue: AbsoluteTime = AbsoluteTime.never(); for (const p of pending.pendingOperations) { @@ -449,12 +453,23 @@ async function runTaskLoop( if (!p.isDue) { continue; } - minDue = AbsoluteTime.min(minDue, p.timestampDue); numDue++; + + const isThrottled = throttler.applyThrottle(p.id); + + if (isThrottled) { + logger.warn( + `task ${p.id} throttled, this is very likely a bug in wallet-core, please report`, + ); + numDue--; + numThrottled++; + } else { + minDue = AbsoluteTime.min(minDue, p.timestampDue); + } } logger.trace( - `running task loop, iter=${iteration}, #tasks=${pending.pendingOperations.length} #lifeness=${numGivingLiveness}, #due=${numDue}`, + `running task loop, iter=${iteration}, #tasks=${pending.pendingOperations.length} #lifeness=${numGivingLiveness}, #due=${numDue} #trottled=${numThrottled}`, ); if (opts.stopWhenDone && numGivingLiveness === 0 && iteration !== 0) { @@ -932,9 +947,9 @@ async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> { ageCommitmentProof: c.ageCommitmentProof, spend_allocation: c.spendAllocation ? { - amount: c.spendAllocation.amount, - id: c.spendAllocation.id, - } + amount: c.spendAllocation.amount, + id: c.spendAllocation.id, + } : undefined, }); } |