From c57ba4c0cea133059ac30eae3c7e527886240059 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Wed, 25 Jan 2023 18:49:00 +0100 Subject: wallet-cli: daemonized wallet MVP --- packages/taler-util/src/wallet-types.ts | 28 ++- packages/taler-wallet-cli/src/index.ts | 304 +++++++++++++++++++++-- packages/taler-wallet-cli/src/rpc.ts | 266 ++++++++++++++++++++ packages/taler-wallet-embedded/src/index.ts | 4 +- packages/taler-wallet-embedded/src/wallet-qjs.ts | 4 +- 5 files changed, 572 insertions(+), 34 deletions(-) create mode 100644 packages/taler-wallet-cli/src/rpc.ts (limited to 'packages') diff --git a/packages/taler-util/src/wallet-types.ts b/packages/taler-util/src/wallet-types.ts index 50cb50f30..b7a51de9e 100644 --- a/packages/taler-util/src/wallet-types.ts +++ b/packages/taler-util/src/wallet-types.ts @@ -109,11 +109,11 @@ export interface GetBalanceDetailRequest { currency: string; } -export const codecForGetBalanceDetailRequest = (): Codec => - buildCodecForObject() - .property("currency", codecForString()) - .build("GetBalanceDetailRequest"); - +export const codecForGetBalanceDetailRequest = + (): Codec => + buildCodecForObject() + .property("currency", codecForString()) + .build("GetBalanceDetailRequest"); export interface Balance { available: AmountString; @@ -225,11 +225,11 @@ export interface CoinDumpJson { withdrawal_reserve_pub: string | undefined; coin_status: CoinStatus; spend_allocation: - | { - id: string; - amount: string; - } - | undefined; + | { + id: string; + amount: string; + } + | undefined; /** * Information about the age restriction */ @@ -1430,9 +1430,14 @@ export const codecForConfirmPayRequest = (): Codec => .property("forcedCoinSel", codecForAny()) .build("ConfirmPay"); +export interface CoreApiRequestEnvelope { + id: string; + operation: string; + args: unknown; +} export type CoreApiResponse = CoreApiResponseSuccess | CoreApiResponseError; -export type CoreApiEnvelope = CoreApiResponse | CoreApiNotification; +export type CoreApiMessageEnvelope = CoreApiResponse | CoreApiNotification; export interface CoreApiNotification { type: "notification"; @@ -1802,7 +1807,6 @@ export const codecForUserAttentionsRequest = (): Codec => ) .build("UserAttentionsRequest"); - export interface UserAttentionsRequest { priority?: AttentionPriority; } diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts index 4f84bb6c4..14000aefd 100644 --- a/packages/taler-wallet-cli/src/index.ts +++ b/packages/taler-wallet-cli/src/index.ts @@ -24,6 +24,9 @@ import { clk, codecForList, codecForString, + CoreApiMessageEnvelope, + CoreApiRequestEnvelope, + CoreApiResponse, decodeCrock, encodeCrock, getRandomBytes, @@ -35,8 +38,15 @@ import { setDangerousTimetravel, setGlobalLogLevelFromString, TalerUriType, + WalletNotification, } from "@gnu-taler/taler-util"; -import type { TalerCryptoInterface } from "@gnu-taler/taler-wallet-core"; +import { + OpenedPromise, + openPromise, + TalerCryptoInterface, + TalerError, + WalletCoreResponseType, +} from "@gnu-taler/taler-wallet-core"; import { CryptoDispatcher, getDefaultNodeWallet, @@ -54,6 +64,7 @@ import { } from "@gnu-taler/taler-wallet-core"; import fs from "fs"; import os from "os"; +import { connectRpc, JsonMessage, runRpcServer } from "./rpc.js"; // This module also serves as the entry point for the crypto // thread worker, and thus must expose these two handlers. @@ -154,7 +165,10 @@ export const walletCli = clk help: "Command line interface for the GNU Taler wallet.", }) .maybeOption("walletDbFile", ["--wallet-db"], clk.STRING, { - help: "location of the wallet database file", + help: "Location of the wallet database file", + }) + .maybeOption("walletConnection", ["--wallet-connection"], clk.STRING, { + help: "Connect to an RPC wallet", }) .maybeOption("timetravel", ["--timetravel"], clk.INT, { help: "modify system time by given offset in microseconds", @@ -199,10 +213,33 @@ function checkEnvFlag(name: string): boolean { return false; } -async function withWallet( +export interface WalletContext { + /** + * High-level client for making API requests to wallet-core. + */ + client: WalletCoreApiClient; + + /** + * Low-level interface for making API requests to wallet-core. + */ + makeCoreApiRequest( + operation: string, + payload: unknown, + ): Promise; + + /** + * Return a promise that resolves after the wallet has emitted a notification + * that meets the criteria of the "cond" predicate. + */ + waitForNotificationCond( + cond: (n: WalletNotification) => boolean, + ): Promise; +} + +async function createLocalWallet( walletCliArgs: WalletCliArgsType, - f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise, -): Promise { + notificationHandler?: (n: WalletNotification) => void, +): Promise { const dbPath = walletCliArgs.wallet.walletDbFile ?? defaultWalletDbPath; const myHttpLib = new NodeHttpLib(); if (walletCliArgs.wallet.noThrottle) { @@ -213,6 +250,9 @@ async function withWallet( httpLib: myHttpLib, notifyHandler: (n) => { logger.info(`wallet notification: ${j2s(n)}`); + if (notificationHandler) { + notificationHandler(n); + } }, cryptoWorkerType: walletCliArgs.wallet.cryptoWorker as any, }); @@ -223,15 +263,10 @@ async function withWallet( applyVerbose(walletCliArgs.wallet.verbose); try { - const w = { - ws: wallet, - client: wallet.client, - }; await wallet.handleCoreApiRequest("initWallet", "native-init", { skipDefaults: walletCliArgs.wallet.skipDefaults, }); - const ret = await f(w); - return ret; + return wallet; } catch (e) { const ed = getErrorDetailFromException(e); console.error("Operation failed: " + summarizeTalerErrorDetail(ed)); @@ -239,11 +274,189 @@ async function withWallet( process.exit(1); } finally { logger.trace("operation with wallet finished, stopping"); - wallet.stop(); logger.trace("stopped wallet"); } } +export interface RemoteWallet { + /** + * Low-level interface for making API requests to wallet-core. + */ + makeCoreApiRequest( + operation: string, + payload: unknown, + ): Promise; + + /** + * Close the connection to the remote wallet. + */ + close(): void; +} + +async function createRemoteWallet( + notificationHandler?: (n: WalletNotification) => void, +): Promise { + let nextRequestId = 1; + let requestMap: Map< + string, + { + promiseCapability: OpenedPromise; + } + > = new Map(); + + const ctx = await connectRpc({ + socketFilename: "wallet-core.sock", + onEstablished(connection) { + const ctx: RemoteWallet = { + makeCoreApiRequest(operation, payload) { + const id = `req-${nextRequestId}`; + const req: CoreApiRequestEnvelope = { + operation, + id, + args: payload, + }; + const promiseCap = openPromise(); + requestMap.set(id, { + promiseCapability: promiseCap, + }); + connection.sendMessage(req as unknown as JsonMessage); + return promiseCap.promise; + }, + close() { + connection.close(); + }, + }; + return { + result: ctx, + onDisconnect() { + logger.info("remote wallet disconnected"); + }, + onMessage(m) { + // FIXME: use a codec for parsing the response envelope! + + logger.info(`got message from remote wallet: ${j2s(m)}`); + if (typeof m !== "object" || m == null) { + logger.warn("message from wallet not understood (wrong type)"); + return; + } + const type = (m as any).type; + if (type === "response" || type === "error") { + const id = (m as any).id; + if (typeof id !== "string") { + logger.warn( + "message from wallet not understood (no id in response)", + ); + return; + } + const h = requestMap.get(id); + if (!h) { + logger.warn(`no handler registered for response id ${id}`); + return; + } + h.promiseCapability.resolve(m as any); + } else if (type === "notification") { + logger.info("got notification"); + if (notificationHandler) { + notificationHandler((m as any).payload); + } + } else { + logger.warn("message from wallet not understood"); + } + }, + }; + }, + }); + return ctx; +} + +/** + * Get a high-level API client from a remove wallet. + */ +function getClientFromRemoteWallet(w: RemoteWallet): WalletCoreApiClient { + const client: WalletCoreApiClient = { + async call(op, payload): Promise { + const res = await w.makeCoreApiRequest(op, payload); + switch (res.type) { + case "error": + throw TalerError.fromUncheckedDetail(res.error); + case "response": + return res.result; + } + }, + }; + return client; +} + +async function withWallet( + walletCliArgs: WalletCliArgsType, + f: (ctx: WalletContext) => Promise, +): Promise { + // Bookkeeping for waiting on notification conditions + let nextCondIndex = 1; + const condMap: Map< + number, + { + condition: (n: WalletNotification) => boolean; + promiseCapability: OpenedPromise; + } + > = new Map(); + function onNotification(n: WalletNotification) { + condMap.forEach((cond, condKey) => { + if (cond.condition(n)) { + cond.promiseCapability.resolve(); + } + }); + } + function waitForNotificationCond(cond: (n: WalletNotification) => boolean) { + const promCap = openPromise(); + condMap.set(nextCondIndex++, { + condition: cond, + promiseCapability: promCap, + }); + return promCap.promise; + } + + if (walletCliArgs.wallet.walletConnection) { + logger.info("creating remote wallet"); + const w = await createRemoteWallet(onNotification); + const ctx: WalletContext = { + makeCoreApiRequest(operation, payload) { + return w.makeCoreApiRequest(operation, payload); + }, + client: getClientFromRemoteWallet(w), + waitForNotificationCond, + }; + const res = await f(ctx); + w.close(); + return res; + } else { + const w = await createLocalWallet(walletCliArgs, onNotification); + const ctx: WalletContext = { + client: w.client, + waitForNotificationCond, + makeCoreApiRequest(operation, payload) { + return w.handleCoreApiRequest(operation, "my-req", payload); + }, + }; + return await f(ctx); + } +} + +/** + * Run a function with a local wallet. + * + * Stops the wallet after the function is done. + */ +async function withLocalWallet( + walletCliArgs: WalletCliArgsType, + f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise, +): Promise { + const w = await createLocalWallet(walletCliArgs); + const res = await f({ client: w.client, ws: w }); + w.stop(); + return res; +} + walletCli .subcommand("balance", "balance", { help: "Show wallet balance." }) .flag("json", ["--json"], { @@ -277,9 +490,8 @@ walletCli process.exit(1); } try { - const resp = await wallet.ws.handleCoreApiRequest( + const resp = await wallet.makeCoreApiRequest( args.api.operation, - "reqid-1", requestJson, ); console.log(JSON.stringify(resp, undefined, 2)); @@ -338,7 +550,7 @@ transactionsCli help: "Identifier of the transaction to delete", }) .flag("force", ["--force"], { - help: "Force aborting the transaction. Might lose money." + help: "Force aborting the transaction. Might lose money.", }) .action(async (args) => { await withWallet(args, async (wallet) => { @@ -383,7 +595,7 @@ walletCli .maybeOption("maxRetries", ["--max-retries"], clk.INT) .flag("failOnMaxRetries", ["--fail-on-max-retries"]) .action(async (args) => { - await withWallet(args, async (wallet) => { + await withLocalWallet(args, async (wallet) => { logger.info("running until pending operations are finished"); const resp = await wallet.ws.runTaskLoop({ maxRetries: args.finishPendingOpt.maxRetries, @@ -802,7 +1014,7 @@ depositCli .requiredArgument("amount", clk.STRING) .requiredArgument("targetPayto", clk.STRING) .action(async (args) => { - await withWallet(args, async (wallet) => { + await withLocalWallet(args, async (wallet) => { const resp = await wallet.client.call( WalletApiOperation.CreateDepositGroup, { @@ -815,6 +1027,7 @@ depositCli }); }); +// FIXME: should probably be removed depositCli .subcommand("trackDepositArgs", "track") .requiredArgument("depositGroupId", clk.STRING) @@ -834,6 +1047,61 @@ const advancedCli = walletCli.subcommand("advancedArgs", "advanced", { help: "Subcommands for advanced operations (only use if you know what you're doing!).", }); +advancedCli + .subcommand("serve", "serve", { + help: "Serve the wallet API via a unix domain socket.", + }) + .action(async (args) => { + const w = await createLocalWallet(args); + w.runTaskLoop() + .then((res) => { + logger.warn("task loop exited unexpectedly"); + }) + .catch((e) => { + logger.error(`error in task loop: ${e}`); + }); + let nextClientId = 1; + const notifyHandlers = new Map void>(); + w.addNotificationListener((n) => { + notifyHandlers.forEach((v, k) => { + v(n); + }); + }); + await runRpcServer({ + socketFilename: "wallet-core.sock", + onConnect(client) { + logger.info("connected"); + const clientId = nextClientId++; + notifyHandlers.set(clientId, (n: WalletNotification) => { + client.sendResponse({ + type: "notification", + payload: n as unknown as JsonMessage, + }); + }); + return { + onDisconnect() { + notifyHandlers.delete(clientId); + logger.info("disconnected"); + }, + onMessage(msg) { + logger.info(`message: ${j2s(msg)}`); + const op = (msg as any).operation; + const id = (msg as any).id; + const payload = (msg as any).args; + w.handleCoreApiRequest(op, id, payload) + .then((resp) => { + logger.info("sending response"); + client.sendResponse(resp as unknown as JsonMessage); + }) + .catch((e) => { + logger.error(`unexpected error: ${e}`); + }); + }, + }; + }, + }); + }); + advancedCli .subcommand("init", "init", { help: "Initialize the wallet (with DB) and exit.", @@ -848,7 +1116,7 @@ advancedCli }) .flag("forceNow", ["-f", "--force-now"]) .action(async (args) => { - await withWallet(args, async (wallet) => { + await withLocalWallet(args, async (wallet) => { await wallet.ws.runPending(args.runPendingOpt.forceNow); }); }); diff --git a/packages/taler-wallet-cli/src/rpc.ts b/packages/taler-wallet-cli/src/rpc.ts new file mode 100644 index 000000000..8070b96c5 --- /dev/null +++ b/packages/taler-wallet-cli/src/rpc.ts @@ -0,0 +1,266 @@ +/* + This file is part of GNU Taler + (C) 2023 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see + */ + +/** + * Implementation for the wallet-core IPC protocol. + * + * Currently the protcol is completely unstable and only used internally + * by the wallet for testing purposes. + */ + +/** + * Imports. + */ +import * as net from "node:net"; +import * as fs from "node:fs"; +import { bytesToString, Logger, typedArrayConcat } from "@gnu-taler/taler-util"; + +const logger = new Logger("rpc.ts"); + +export type JsonMessage = + | string + | number + | boolean + | null + | JsonMessage[] + | { [key: string]: JsonMessage }; + +export interface RpcServerClientHandlers { + onMessage(msg: JsonMessage): void; + onDisconnect(): void; +} + +export interface RpcServerClient { + sendResponse(message: JsonMessage): void; +} + +export interface RpcServerArgs { + socketFilename: string; + onConnect(client: RpcServerClient): RpcServerClientHandlers; +} + +export interface RpcClientServerConnection { + sendMessage(m: JsonMessage): void; + close(): void; +} + +export interface RpcConnectArgs { + socketFilename: string; + onEstablished(connection: RpcClientServerConnection): { + result: T; + onDisconnect(): void; + onMessage(m: JsonMessage): void; + }; +} + +export interface ReadLinewiseArgs { + onLine(lineData: Uint8Array): void; + sock: net.Socket; +} + +function readStreamLinewise(args: ReadLinewiseArgs): void { + let chunks: Uint8Array[] = []; + args.sock.on("data", (buf: Uint8Array) => { + logger.info(`received ${buf.length} bytes`); + // Process all newlines in the newly received buffer + while (1) { + const newlineIdx = buf.indexOf("\n".charCodeAt(0)); + if (newlineIdx >= 0) { + let left = buf.subarray(0, newlineIdx + 1); + let right = buf.subarray(newlineIdx + 1); + chunks.push(left); + const line = typedArrayConcat(chunks); + args.onLine(line); + chunks = []; + buf = right; + } else { + chunks.push(buf); + break; + } + } + }); +} + +export async function connectRpc(args: RpcConnectArgs): Promise { + let sockFilename = args.socketFilename; + return new Promise((resolve, reject) => { + const client = net.createConnection(sockFilename); + client.on("connect", () => { + let parsingBody: string | undefined = undefined; + let bodyChunks: string[] = []; + + logger.info("connected!"); + client.write("%hello-from-client\n"); + const res = args.onEstablished({ + sendMessage(m) { + client.write("%request\n"); + client.write(JSON.stringify(m)); + client.write("\n"); + client.write("%end\n"); + }, + close() { + client.destroy(); + }, + }); + readStreamLinewise({ + sock: client, + onLine(line) { + const lineStr = bytesToString(line); + logger.info(`got line from server: ${lineStr}`); + // Are we currently parsing the body of a request? + if (!parsingBody) { + const strippedLine = lineStr.trim(); + if (strippedLine == "%message") { + logger.info("got message start"); + parsingBody = "message"; + } else if (strippedLine == "%hello-from-server") { + logger.info("got hello from server"); + } else if (strippedLine.startsWith("%error:")) { + logger.info("got error from server, disconnecting"); + client.end(); + res.onDisconnect(); + } else { + logger.info("got unknown request"); + client.write("%error: invalid message\n"); + client.end(); + } + } else if (parsingBody == "message") { + const strippedLine = lineStr.trim(); + if (strippedLine == "%end") { + logger.info("finished request"); + let req = bodyChunks.join(""); + let reqJson: any = undefined; + try { + reqJson = JSON.parse(req); + } catch (e) { + logger.warn("JSON request was invalid"); + } + if (reqJson !== undefined) { + logger.info(`request: ${req}`); + res.onMessage(reqJson); + } else { + client.write("%error: invalid JSON"); + client.end(); + } + bodyChunks = []; + } else { + bodyChunks.push(lineStr); + } + } else { + logger.info("invalid parser state"); + client.write("%error: internal error\n"); + client.end(); + } + }, + }); + client.on("close", () => { + res.onDisconnect(); + }); + client.on("data", () => {}); + resolve(res.result); + }); + }); +} + +export async function runRpcServer(args: RpcServerArgs): Promise { + let sockFilename = args.socketFilename; + try { + fs.unlinkSync(sockFilename); + } catch (e) { + // Do nothing! + } + return new Promise((resolve, reject) => { + const server = net.createServer((sock) => { + // Are we currently parsing the body of a request? + let parsingBody: string | undefined = undefined; + let bodyChunks: string[] = []; + + logger.info("got new connection"); + sock.write("%hello-from-server\n"); + const handlers = args.onConnect({ + sendResponse(message) { + sock.write("%message\n"); + sock.write(JSON.stringify(message)); + sock.write("\n"); + sock.write("%end\n"); + }, + }); + + sock.on("error", (err) => { + logger.info(`connection error: ${err}`); + }); + + function processLine(line: Uint8Array) { + const lineStr = bytesToString(line); + logger.info(`got line: ${lineStr}`); + if (!parsingBody) { + const strippedLine = lineStr.trim(); + if (strippedLine == "%request") { + logger.info("got request start"); + parsingBody = "request"; + } else if (strippedLine === "%hello-from-client") { + console.log("got hello from client"); + } else if (strippedLine.startsWith("%error:")) { + console.log("got error from client"); + sock.end(); + handlers.onDisconnect(); + } else { + logger.info("got unknown request"); + sock.write("%error: invalid request\n"); + sock.end(); + } + } else if (parsingBody == "request") { + const strippedLine = lineStr.trim(); + if (strippedLine == "%end") { + logger.info("finished request"); + let req = bodyChunks.join(""); + let reqJson: any = undefined; + try { + reqJson = JSON.parse(req); + } catch (e) { + logger.warn("JSON request was invalid"); + } + if (reqJson !== undefined) { + logger.info(`request: ${req}`); + handlers.onMessage(reqJson); + } else { + sock.write("%error: invalid JSON"); + sock.end(); + } + bodyChunks = []; + } else { + bodyChunks.push(lineStr); + } + } else { + logger.info("invalid parser state"); + sock.write("%error: internal error\n"); + sock.end(); + } + } + + readStreamLinewise({ + sock, + onLine: processLine, + }); + + sock.on("close", (hadError: boolean) => { + logger.info(`connection closed, hadError=${hadError}`); + handlers.onDisconnect(); + }); + }); + server.listen("wallet-core.sock"); + }); +} diff --git a/packages/taler-wallet-embedded/src/index.ts b/packages/taler-wallet-embedded/src/index.ts index ab8fdd32b..b505a2d9d 100644 --- a/packages/taler-wallet-embedded/src/index.ts +++ b/packages/taler-wallet-embedded/src/index.ts @@ -36,7 +36,7 @@ import { } from "@gnu-taler/taler-wallet-core"; import { - CoreApiEnvelope, + CoreApiMessageEnvelope, CoreApiResponse, CoreApiResponseSuccess, Logger, @@ -140,7 +140,7 @@ export class NativeHttpLib implements HttpRequestLibrary { } } -function sendNativeMessage(ev: CoreApiEnvelope): void { +function sendNativeMessage(ev: CoreApiMessageEnvelope): void { // @ts-ignore const sendMessage = globalThis.__native_sendMessage; if (typeof sendMessage !== "function") { diff --git a/packages/taler-wallet-embedded/src/wallet-qjs.ts b/packages/taler-wallet-embedded/src/wallet-qjs.ts index e6e1a34a5..c4178e596 100644 --- a/packages/taler-wallet-embedded/src/wallet-qjs.ts +++ b/packages/taler-wallet-embedded/src/wallet-qjs.ts @@ -34,7 +34,7 @@ import { } from "@gnu-taler/taler-wallet-core"; import { - CoreApiEnvelope, + CoreApiMessageEnvelope, CoreApiResponse, CoreApiResponseSuccess, j2s, @@ -169,7 +169,7 @@ export class NativeHttpLib implements HttpRequestLibrary { } } -function sendNativeMessage(ev: CoreApiEnvelope): void { +function sendNativeMessage(ev: CoreApiMessageEnvelope): void { const m = JSON.stringify(ev); qjsOs.postMessageToHost(m); } -- cgit v1.2.3