diff options
-rw-r--r-- | packages/taler-wallet-android/src/index.ts | 15 | ||||
-rw-r--r-- | packages/taler-wallet-cli/src/index.ts | 18 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/common.ts | 123 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/headless/helpers.ts | 5 | ||||
-rw-r--r-- | packages/taler-wallet-core/src/wallet.ts | 202 | ||||
-rw-r--r-- | packages/taler-wallet-webextension/src/wxBackend.ts | 16 |
6 files changed, 253 insertions, 126 deletions
diff --git a/packages/taler-wallet-android/src/index.ts b/packages/taler-wallet-android/src/index.ts index 2e24a7c41..94774bcf4 100644 --- a/packages/taler-wallet-android/src/index.ts +++ b/packages/taler-wallet-android/src/index.ts @@ -33,8 +33,7 @@ import { WALLET_EXCHANGE_PROTOCOL_VERSION, WALLET_MERCHANT_PROTOCOL_VERSION, runRetryLoop, - handleCoreApiRequest, - InternalWalletState, + Wallet, } from "@gnu-taler/taler-wallet-core"; import fs from "fs"; @@ -156,8 +155,8 @@ function sendAkonoMessage(ev: CoreApiEnvelope): void { class AndroidWalletMessageHandler { walletArgs: DefaultNodeWalletArgs | undefined; - maybeWallet: InternalWalletState | undefined; - wp = openPromise<InternalWalletState>(); + maybeWallet: Wallet | undefined; + wp = openPromise<Wallet>(); httpLib = new NodeHttpLib(); /** @@ -180,8 +179,8 @@ class AndroidWalletMessageHandler { const reinit = async () => { const w = await getDefaultNodeWallet(this.walletArgs); this.maybeWallet = w; - await handleCoreApiRequest(w, "initWallet", "akono-init", {}); - runRetryLoop(w).catch((e) => { + await w.handleCoreApiRequest("initWallet", "akono-init", {}); + w.runRetryLoop().catch((e) => { console.error("Error during wallet retry loop", e); }); this.wp.resolve(w); @@ -230,14 +229,14 @@ class AndroidWalletMessageHandler { } const wallet = await this.wp.promise; wallet.stop(); - this.wp = openPromise<InternalWalletState>(); + this.wp = openPromise<Wallet>(); this.maybeWallet = undefined; await reinit(); return wrapResponse({}); } default: { const wallet = await this.wp.promise; - return await handleCoreApiRequest(wallet, operation, id, args); + return await wallet.handleCoreApiRequest(operation, id, args); } } } diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts index 3949444a1..d4e5bbe46 100644 --- a/packages/taler-wallet-cli/src/index.ts +++ b/packages/taler-wallet-cli/src/index.ts @@ -51,7 +51,7 @@ import { getClientFromWalletState, WalletApiOperation, WalletCoreApiClient, - InternalWalletState, + Wallet, } from "@gnu-taler/taler-wallet-core"; // This module also serves as the entry point for the crypto @@ -172,10 +172,7 @@ type WalletCliArgsType = clk.GetArgType<typeof walletCli>; async function withWallet<T>( walletCliArgs: WalletCliArgsType, - f: (w: { - client: WalletCoreApiClient; - ws: InternalWalletState; - }) => Promise<T>, + f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise<T>, ): Promise<T> { const dbPath = walletCliArgs.wallet.walletDbFile ?? defaultWalletDbPath; const myHttpLib = new NodeHttpLib(); @@ -190,7 +187,7 @@ async function withWallet<T>( try { const w = { ws: wallet, - client: await getClientFromWalletState(wallet), + client: wallet.client, }; const ret = await f(w); return ret; @@ -242,8 +239,7 @@ walletCli console.error("Invalid JSON"); process.exit(1); } - const resp = await handleCoreApiRequest( - wallet.ws, + const resp = await wallet.ws.handleCoreApiRequest( args.api.operation, "reqid-1", requestJson, @@ -294,7 +290,7 @@ walletCli .flag("forceNow", ["-f", "--force-now"]) .action(async (args) => { await withWallet(args, async (wallet) => { - await runPending(wallet.ws, args.runPendingOpt.forceNow); + await wallet.ws.runPending(args.runPendingOpt.forceNow); }); }); @@ -318,7 +314,7 @@ walletCli .maybeOption("maxRetries", ["--max-retries"], clk.INT) .action(async (args) => { await withWallet(args, async (wallet) => { - await runUntilDone(wallet.ws, { + await wallet.ws.runUntilDone({ maxRetries: args.finishPendingOpt.maxRetries, }); wallet.ws.stop(); @@ -607,7 +603,7 @@ depositCli }, ); console.log(`Created deposit ${resp.depositGroupId}`); - await runPending(wallet.ws); + await wallet.ws.runPending(); }); }); diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts index f1b34a291..a52877b33 100644 --- a/packages/taler-wallet-core/src/common.ts +++ b/packages/taler-wallet-core/src/common.ts @@ -40,8 +40,6 @@ import { import { DbAccess, GetReadOnlyAccess } from "./util/query.js"; import { TimerGroup } from "./util/timer.js"; -type NotificationListener = (n: WalletNotification) => void; - const logger = new Logger("state.ts"); export const EXCHANGE_COINS_LOCK = "exchange-coins-lock"; @@ -79,114 +77,51 @@ export interface ExchangeOperations { }>; } +export type NotificationListener = (n: WalletNotification) => void; + /** - * Internal state of the wallet. + * Internal, shard wallet state that is used by the implementation + * of wallet operations. + * + * FIXME: This should not be exported anywhere from the taler-wallet-core package, + * as it's an opaque implementation detail. */ -export class InternalWalletState implements InternalWalletState { - memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); - memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); - memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse> = new AsyncOpMemoSingle(); - memoGetBalance: AsyncOpMemoSingle<BalancesResponse> = new AsyncOpMemoSingle(); - memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); - memoProcessRecoup: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); - memoProcessDeposit: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); +export interface InternalWalletState { + memoProcessReserve: AsyncOpMemoMap<void>; + memoMakePlanchet: AsyncOpMemoMap<void>; + memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse>; + memoGetBalance: AsyncOpMemoSingle<BalancesResponse>; + memoProcessRefresh: AsyncOpMemoMap<void>; + memoProcessRecoup: AsyncOpMemoMap<void>; + memoProcessDeposit: AsyncOpMemoMap<void>; cryptoApi: CryptoApi; - timerGroup: TimerGroup = new TimerGroup(); - latch = new AsyncCondition(); - stopped = false; - memoRunRetryLoop = new AsyncOpMemoSingle<void>(); + timerGroup: TimerGroup; + latch: AsyncCondition; + stopped: boolean; + memoRunRetryLoop: AsyncOpMemoSingle<void>; - listeners: NotificationListener[] = []; + listeners: NotificationListener[]; - initCalled: boolean = false; + initCalled: boolean; - // FIXME: This should be done in wallet.ts, here we should only give declarations - exchangeOps: ExchangeOperations = { - getExchangeDetails, - getExchangeTrust, - updateExchangeFromUrl, - }; + exchangeOps: ExchangeOperations; - /** - * Promises that are waiting for a particular resource. - */ - private resourceWaiters: Record<string, OpenedPromise<void>[]> = {}; + db: DbAccess<typeof WalletStoresV1>; + http: HttpRequestLibrary; - /** - * Resources that are currently locked. - */ - private resourceLocks: Set<string> = new Set(); - - constructor( - // FIXME: Make this a getter and make - // the actual value nullable. - // Check if we are in a DB migration / garbage collection - // and throw an error in that case. - public db: DbAccess<typeof WalletStoresV1>, - public http: HttpRequestLibrary, - cryptoWorkerFactory: CryptoWorkerFactory, - ) { - this.cryptoApi = new CryptoApi(cryptoWorkerFactory); - } - - notify(n: WalletNotification): void { - logger.trace("Notification", n); - for (const l of this.listeners) { - const nc = JSON.parse(JSON.stringify(n)); - setTimeout(() => { - l(nc); - }, 0); - } - } - - addNotificationListener(f: (n: WalletNotification) => void): void { - this.listeners.push(f); - } + notify(n: WalletNotification): void; + + addNotificationListener(f: (n: WalletNotification) => void): void; /** * Stop ongoing processing. */ - stop(): void { - this.stopped = true; - this.timerGroup.stopCurrentAndFutureTimers(); - this.cryptoApi.stop(); - } + stop(): void; /** * Run an async function after acquiring a list of locks, identified * by string tokens. */ - async runSequentialized<T>(tokens: string[], f: () => Promise<T>) { - // Make sure locks are always acquired in the same order - tokens = [...tokens].sort(); - - for (const token of tokens) { - if (this.resourceLocks.has(token)) { - const p = openPromise<void>(); - let waitList = this.resourceWaiters[token]; - if (!waitList) { - waitList = this.resourceWaiters[token] = []; - } - waitList.push(p); - await p.promise; - } - this.resourceLocks.add(token); - } - - try { - logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`); - const result = await f(); - logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`); - return result; - } finally { - for (const token of tokens) { - this.resourceLocks.delete(token); - let waiter = (this.resourceWaiters[token] ?? []).shift(); - if (waiter) { - waiter.resolve(); - } - } - } - } + runSequentialized<T>(tokens: string[], f: () => Promise<T>): Promise<T>; } diff --git a/packages/taler-wallet-core/src/headless/helpers.ts b/packages/taler-wallet-core/src/headless/helpers.ts index a0053fc0f..cb788e2bd 100644 --- a/packages/taler-wallet-core/src/headless/helpers.ts +++ b/packages/taler-wallet-core/src/headless/helpers.ts @@ -36,6 +36,7 @@ import { SynchronousCryptoWorkerFactory } from "../crypto/workers/synchronousWor import type { IDBFactory } from "@gnu-taler/idb-bridge"; import { WalletNotification } from "@gnu-taler/taler-util"; import { InternalWalletState } from "../common.js"; +import { Wallet } from "../wallet.js"; const logger = new Logger("headless/helpers.ts"); @@ -93,7 +94,7 @@ function makeId(length: number): string { */ export async function getDefaultNodeWallet( args: DefaultNodeWalletArgs = {}, -): Promise<InternalWalletState> { +): Promise<Wallet> { BridgeIDBFactory.enableTracing = false; const myBackend = new MemoryBackend(); myBackend.enableTracing = false; @@ -172,7 +173,7 @@ export async function getDefaultNodeWallet( workerFactory = new SynchronousCryptoWorkerFactory(); } - const w = new InternalWalletState(myDb, myHttpLib, workerFactory); + const w = await Wallet.create(myDb, myHttpLib, workerFactory); if (args.notifyHandler) { w.addNotificationListener(args.notifyHandler); diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index a9c4c97e8..3a3b4f6fd 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -23,6 +23,7 @@ * Imports. */ import { + BalancesResponse, codecForAny, codecForDeleteTransactionRequest, codecForRetryTransactionRequest, @@ -32,9 +33,11 @@ import { getDurationRemaining, isTimestampExpired, j2s, + PreparePayResultType, TalerErrorCode, Timestamp, timestampMin, + WalletNotification, } from "@gnu-taler/taler-util"; import { addBackupProvider, @@ -59,6 +62,7 @@ import { import { acceptExchangeTermsOfService, getExchangeDetails, + getExchangeTrust, updateExchangeFromUrl, } from "./operations/exchanges.js"; import { @@ -85,7 +89,11 @@ import { getFundingPaytoUris, processReserve, } from "./operations/reserves.js"; -import { InternalWalletState } from "./common.js"; +import { + ExchangeOperations, + InternalWalletState, + NotificationListener, +} from "./common.js"; import { runIntegrationTest, testPay, @@ -106,16 +114,16 @@ import { AuditorTrustRecord, CoinSourceType, ReserveRecordStatus, + WalletStoresV1, } from "./db.js"; import { NotificationType } from "@gnu-taler/taler-util"; import { PendingOperationInfo, + PendingOperationsResponse, PendingOperationType, } from "./pending-types.js"; import { CoinDumpJson } from "@gnu-taler/taler-util"; -import { - codecForTransactionsRequest, -} from "@gnu-taler/taler-util"; +import { codecForTransactionsRequest } from "@gnu-taler/taler-util"; import { AcceptManualWithdrawalResult, AcceptWithdrawalResponse, @@ -151,6 +159,16 @@ import { assertUnreachable } from "./util/assertUnreachable.js"; import { Logger } from "@gnu-taler/taler-util"; import { setWalletDeviceId } from "./operations/backup/state.js"; import { WalletCoreApiClient } from "./wallet-api-types.js"; +import { AsyncOpMemoMap, AsyncOpMemoSingle } from "./util/asyncMemo.js"; +import { CryptoApi, CryptoWorkerFactory } from "./crypto/workers/cryptoApi.js"; +import { TimerGroup } from "./util/timer.js"; +import { + AsyncCondition, + OpenedPromise, + openPromise, +} from "./util/promiseUtils.js"; +import { DbAccess } from "./util/query.js"; +import { HttpRequestLibrary } from "./util/http.js"; const builtinAuditors: AuditorTrustRecord[] = [ { @@ -618,7 +636,6 @@ async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> { return coinsJson; } - /** * Get an API client from an internal wallet state object. */ @@ -936,3 +953,178 @@ export async function handleCoreApiRequest( } } } + +/** + * Public handle to a running wallet. + */ +export class Wallet { + private ws: InternalWalletState; + private _client: WalletCoreApiClient; + + private constructor( + db: DbAccess<typeof WalletStoresV1>, + http: HttpRequestLibrary, + cryptoWorkerFactory: CryptoWorkerFactory, + ) { + this.ws = new InternalWalletStateImpl(db, http, cryptoWorkerFactory); + } + + get client() { + return this._client; + } + + static async create( + db: DbAccess<typeof WalletStoresV1>, + http: HttpRequestLibrary, + cryptoWorkerFactory: CryptoWorkerFactory, + ): Promise<Wallet> { + const w = new Wallet(db, http, cryptoWorkerFactory); + w._client = await getClientFromWalletState(w.ws); + return w; + } + + addNotificationListener(f: (n: WalletNotification) => void): void { + return this.ws.addNotificationListener(f); + } + + stop(): void { + this.ws.stop(); + } + + runRetryLoop(): Promise<void> { + return runRetryLoop(this.ws); + } + + runPending(forceNow: boolean = false) { + return runPending(this.ws, forceNow); + } + + runUntilDone( + req: { + maxRetries?: number; + } = {}, + ) { + return runUntilDone(this.ws, req); + } + + handleCoreApiRequest( + operation: string, + id: string, + payload: unknown, + ): Promise<CoreApiResponse> { + return handleCoreApiRequest(this.ws, operation, id, payload); + } +} + +/** + * Internal state of the wallet. + * + * This ties together all the operation implementations. + */ +class InternalWalletStateImpl implements InternalWalletState { + memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); + memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); + memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse> = new AsyncOpMemoSingle(); + memoGetBalance: AsyncOpMemoSingle<BalancesResponse> = new AsyncOpMemoSingle(); + memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); + memoProcessRecoup: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); + memoProcessDeposit: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); + cryptoApi: CryptoApi; + + timerGroup: TimerGroup = new TimerGroup(); + latch = new AsyncCondition(); + stopped = false; + memoRunRetryLoop = new AsyncOpMemoSingle<void>(); + + listeners: NotificationListener[] = []; + + initCalled: boolean = false; + + exchangeOps: ExchangeOperations = { + getExchangeDetails, + getExchangeTrust, + updateExchangeFromUrl, + }; + + /** + * Promises that are waiting for a particular resource. + */ + private resourceWaiters: Record<string, OpenedPromise<void>[]> = {}; + + /** + * Resources that are currently locked. + */ + private resourceLocks: Set<string> = new Set(); + + constructor( + // FIXME: Make this a getter and make + // the actual value nullable. + // Check if we are in a DB migration / garbage collection + // and throw an error in that case. + public db: DbAccess<typeof WalletStoresV1>, + public http: HttpRequestLibrary, + cryptoWorkerFactory: CryptoWorkerFactory, + ) { + this.cryptoApi = new CryptoApi(cryptoWorkerFactory); + } + + notify(n: WalletNotification): void { + logger.trace("Notification", n); + for (const l of this.listeners) { + const nc = JSON.parse(JSON.stringify(n)); + setTimeout(() => { + l(nc); + }, 0); + } + } + + addNotificationListener(f: (n: WalletNotification) => void): void { + this.listeners.push(f); + } + + /** + * Stop ongoing processing. + */ + stop(): void { + this.stopped = true; + this.timerGroup.stopCurrentAndFutureTimers(); + this.cryptoApi.stop(); + } + + /** + * Run an async function after acquiring a list of locks, identified + * by string tokens. + */ + async runSequentialized<T>(tokens: string[], f: () => Promise<T>) { + // Make sure locks are always acquired in the same order + tokens = [...tokens].sort(); + + for (const token of tokens) { + if (this.resourceLocks.has(token)) { + const p = openPromise<void>(); + let waitList = this.resourceWaiters[token]; + if (!waitList) { + waitList = this.resourceWaiters[token] = []; + } + waitList.push(p); + await p.promise; + } + this.resourceLocks.add(token); + } + + try { + logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`); + const result = await f(); + logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`); + return result; + } finally { + for (const token of tokens) { + this.resourceLocks.delete(token); + let waiter = (this.resourceWaiters[token] ?? []).shift(); + if (waiter) { + waiter.resolve(); + } + } + } + } +} diff --git a/packages/taler-wallet-webextension/src/wxBackend.ts b/packages/taler-wallet-webextension/src/wxBackend.ts index 9c01f4c0c..c8b2dbd78 100644 --- a/packages/taler-wallet-webextension/src/wxBackend.ts +++ b/packages/taler-wallet-webextension/src/wxBackend.ts @@ -37,6 +37,8 @@ import { runRetryLoop, handleNotifyReserve, InternalWalletState, + Wallet, + WalletApiOperation, } from "@gnu-taler/taler-wallet-core"; import { classifyTalerUri, @@ -52,8 +54,10 @@ import { BrowserCryptoWorkerFactory } from "./browserCryptoWorkerFactory"; /** * Currently active wallet instance. Might be unloaded and * re-instantiated when the database is reset. + * + * FIXME: Maybe move the wallet reseting into the Wallet class? */ -let currentWallet: InternalWalletState | undefined; +let currentWallet: Wallet | undefined; let currentDatabase: DbAccess<typeof WalletStoresV1> | undefined; @@ -170,7 +174,7 @@ async function dispatch( }; break; } - r = await handleCoreApiRequest(w, req.operation, req.id, req.payload); + r = await w.handleCoreApiRequest(req.operation, req.id, req.payload); break; } } @@ -256,7 +260,7 @@ async function reinitWallet(): Promise<void> { } const http = new BrowserHttpLib(); console.log("setting wallet"); - const wallet = new InternalWalletState( + const wallet = await Wallet.create( currentDatabase, http, new BrowserCryptoWorkerFactory(), @@ -270,7 +274,7 @@ async function reinitWallet(): Promise<void> { } } }); - runRetryLoop(wallet).catch((e) => { + wallet.runRetryLoop().catch((e) => { console.log("error during wallet retry loop", e); }); // Useful for debugging in the background page. @@ -360,7 +364,8 @@ function headerListener( if (!w) { return; } - handleNotifyReserve(w); + // FIXME: Is this still useful? + // handleNotifyReserve(w); }); break; default: @@ -451,4 +456,3 @@ export async function wxMain(): Promise<void> { setupHeaderListener(); }); } - |