aboutsummaryrefslogtreecommitdiff
path: root/packages/taler-wallet-cli
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2023-01-25 18:49:00 +0100
committerFlorian Dold <florian@dold.me>2023-01-25 18:49:00 +0100
commitc57ba4c0cea133059ac30eae3c7e527886240059 (patch)
tree9028ccde2bbd990b00449b9bb14c77e2b4732dcc /packages/taler-wallet-cli
parent3aa077e0975128b66ade8866ad8227e3666a1b13 (diff)
downloadwallet-core-c57ba4c0cea133059ac30eae3c7e527886240059.tar.xz
wallet-cli: daemonized wallet MVP
Diffstat (limited to 'packages/taler-wallet-cli')
-rw-r--r--packages/taler-wallet-cli/src/index.ts304
-rw-r--r--packages/taler-wallet-cli/src/rpc.ts266
2 files changed, 552 insertions, 18 deletions
diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts
index 4f84bb6c4..14000aefd 100644
--- a/packages/taler-wallet-cli/src/index.ts
+++ b/packages/taler-wallet-cli/src/index.ts
@@ -24,6 +24,9 @@ import {
clk,
codecForList,
codecForString,
+ CoreApiMessageEnvelope,
+ CoreApiRequestEnvelope,
+ CoreApiResponse,
decodeCrock,
encodeCrock,
getRandomBytes,
@@ -35,8 +38,15 @@ import {
setDangerousTimetravel,
setGlobalLogLevelFromString,
TalerUriType,
+ WalletNotification,
} from "@gnu-taler/taler-util";
-import type { TalerCryptoInterface } from "@gnu-taler/taler-wallet-core";
+import {
+ OpenedPromise,
+ openPromise,
+ TalerCryptoInterface,
+ TalerError,
+ WalletCoreResponseType,
+} from "@gnu-taler/taler-wallet-core";
import {
CryptoDispatcher,
getDefaultNodeWallet,
@@ -54,6 +64,7 @@ import {
} from "@gnu-taler/taler-wallet-core";
import fs from "fs";
import os from "os";
+import { connectRpc, JsonMessage, runRpcServer } from "./rpc.js";
// This module also serves as the entry point for the crypto
// thread worker, and thus must expose these two handlers.
@@ -154,7 +165,10 @@ export const walletCli = clk
help: "Command line interface for the GNU Taler wallet.",
})
.maybeOption("walletDbFile", ["--wallet-db"], clk.STRING, {
- help: "location of the wallet database file",
+ help: "Location of the wallet database file",
+ })
+ .maybeOption("walletConnection", ["--wallet-connection"], clk.STRING, {
+ help: "Connect to an RPC wallet",
})
.maybeOption("timetravel", ["--timetravel"], clk.INT, {
help: "modify system time by given offset in microseconds",
@@ -199,10 +213,33 @@ function checkEnvFlag(name: string): boolean {
return false;
}
-async function withWallet<T>(
+export interface WalletContext {
+ /**
+ * High-level client for making API requests to wallet-core.
+ */
+ client: WalletCoreApiClient;
+
+ /**
+ * Low-level interface for making API requests to wallet-core.
+ */
+ makeCoreApiRequest(
+ operation: string,
+ payload: unknown,
+ ): Promise<CoreApiResponse>;
+
+ /**
+ * Return a promise that resolves after the wallet has emitted a notification
+ * that meets the criteria of the "cond" predicate.
+ */
+ waitForNotificationCond(
+ cond: (n: WalletNotification) => boolean,
+ ): Promise<void>;
+}
+
+async function createLocalWallet(
walletCliArgs: WalletCliArgsType,
- f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise<T>,
-): Promise<T> {
+ notificationHandler?: (n: WalletNotification) => void,
+): Promise<Wallet> {
const dbPath = walletCliArgs.wallet.walletDbFile ?? defaultWalletDbPath;
const myHttpLib = new NodeHttpLib();
if (walletCliArgs.wallet.noThrottle) {
@@ -213,6 +250,9 @@ async function withWallet<T>(
httpLib: myHttpLib,
notifyHandler: (n) => {
logger.info(`wallet notification: ${j2s(n)}`);
+ if (notificationHandler) {
+ notificationHandler(n);
+ }
},
cryptoWorkerType: walletCliArgs.wallet.cryptoWorker as any,
});
@@ -223,15 +263,10 @@ async function withWallet<T>(
applyVerbose(walletCliArgs.wallet.verbose);
try {
- const w = {
- ws: wallet,
- client: wallet.client,
- };
await wallet.handleCoreApiRequest("initWallet", "native-init", {
skipDefaults: walletCliArgs.wallet.skipDefaults,
});
- const ret = await f(w);
- return ret;
+ return wallet;
} catch (e) {
const ed = getErrorDetailFromException(e);
console.error("Operation failed: " + summarizeTalerErrorDetail(ed));
@@ -239,11 +274,189 @@ async function withWallet<T>(
process.exit(1);
} finally {
logger.trace("operation with wallet finished, stopping");
- wallet.stop();
logger.trace("stopped wallet");
}
}
+export interface RemoteWallet {
+ /**
+ * Low-level interface for making API requests to wallet-core.
+ */
+ makeCoreApiRequest(
+ operation: string,
+ payload: unknown,
+ ): Promise<CoreApiResponse>;
+
+ /**
+ * Close the connection to the remote wallet.
+ */
+ close(): void;
+}
+
+async function createRemoteWallet(
+ notificationHandler?: (n: WalletNotification) => void,
+): Promise<RemoteWallet> {
+ let nextRequestId = 1;
+ let requestMap: Map<
+ string,
+ {
+ promiseCapability: OpenedPromise<CoreApiResponse>;
+ }
+ > = new Map();
+
+ const ctx = await connectRpc<RemoteWallet>({
+ socketFilename: "wallet-core.sock",
+ onEstablished(connection) {
+ const ctx: RemoteWallet = {
+ makeCoreApiRequest(operation, payload) {
+ const id = `req-${nextRequestId}`;
+ const req: CoreApiRequestEnvelope = {
+ operation,
+ id,
+ args: payload,
+ };
+ const promiseCap = openPromise<CoreApiResponse>();
+ requestMap.set(id, {
+ promiseCapability: promiseCap,
+ });
+ connection.sendMessage(req as unknown as JsonMessage);
+ return promiseCap.promise;
+ },
+ close() {
+ connection.close();
+ },
+ };
+ return {
+ result: ctx,
+ onDisconnect() {
+ logger.info("remote wallet disconnected");
+ },
+ onMessage(m) {
+ // FIXME: use a codec for parsing the response envelope!
+
+ logger.info(`got message from remote wallet: ${j2s(m)}`);
+ if (typeof m !== "object" || m == null) {
+ logger.warn("message from wallet not understood (wrong type)");
+ return;
+ }
+ const type = (m as any).type;
+ if (type === "response" || type === "error") {
+ const id = (m as any).id;
+ if (typeof id !== "string") {
+ logger.warn(
+ "message from wallet not understood (no id in response)",
+ );
+ return;
+ }
+ const h = requestMap.get(id);
+ if (!h) {
+ logger.warn(`no handler registered for response id ${id}`);
+ return;
+ }
+ h.promiseCapability.resolve(m as any);
+ } else if (type === "notification") {
+ logger.info("got notification");
+ if (notificationHandler) {
+ notificationHandler((m as any).payload);
+ }
+ } else {
+ logger.warn("message from wallet not understood");
+ }
+ },
+ };
+ },
+ });
+ return ctx;
+}
+
+/**
+ * Get a high-level API client from a remove wallet.
+ */
+function getClientFromRemoteWallet(w: RemoteWallet): WalletCoreApiClient {
+ const client: WalletCoreApiClient = {
+ async call(op, payload): Promise<any> {
+ const res = await w.makeCoreApiRequest(op, payload);
+ switch (res.type) {
+ case "error":
+ throw TalerError.fromUncheckedDetail(res.error);
+ case "response":
+ return res.result;
+ }
+ },
+ };
+ return client;
+}
+
+async function withWallet<T>(
+ walletCliArgs: WalletCliArgsType,
+ f: (ctx: WalletContext) => Promise<T>,
+): Promise<T> {
+ // Bookkeeping for waiting on notification conditions
+ let nextCondIndex = 1;
+ const condMap: Map<
+ number,
+ {
+ condition: (n: WalletNotification) => boolean;
+ promiseCapability: OpenedPromise<void>;
+ }
+ > = new Map();
+ function onNotification(n: WalletNotification) {
+ condMap.forEach((cond, condKey) => {
+ if (cond.condition(n)) {
+ cond.promiseCapability.resolve();
+ }
+ });
+ }
+ function waitForNotificationCond(cond: (n: WalletNotification) => boolean) {
+ const promCap = openPromise<void>();
+ condMap.set(nextCondIndex++, {
+ condition: cond,
+ promiseCapability: promCap,
+ });
+ return promCap.promise;
+ }
+
+ if (walletCliArgs.wallet.walletConnection) {
+ logger.info("creating remote wallet");
+ const w = await createRemoteWallet(onNotification);
+ const ctx: WalletContext = {
+ makeCoreApiRequest(operation, payload) {
+ return w.makeCoreApiRequest(operation, payload);
+ },
+ client: getClientFromRemoteWallet(w),
+ waitForNotificationCond,
+ };
+ const res = await f(ctx);
+ w.close();
+ return res;
+ } else {
+ const w = await createLocalWallet(walletCliArgs, onNotification);
+ const ctx: WalletContext = {
+ client: w.client,
+ waitForNotificationCond,
+ makeCoreApiRequest(operation, payload) {
+ return w.handleCoreApiRequest(operation, "my-req", payload);
+ },
+ };
+ return await f(ctx);
+ }
+}
+
+/**
+ * Run a function with a local wallet.
+ *
+ * Stops the wallet after the function is done.
+ */
+async function withLocalWallet<T>(
+ walletCliArgs: WalletCliArgsType,
+ f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise<T>,
+): Promise<T> {
+ const w = await createLocalWallet(walletCliArgs);
+ const res = await f({ client: w.client, ws: w });
+ w.stop();
+ return res;
+}
+
walletCli
.subcommand("balance", "balance", { help: "Show wallet balance." })
.flag("json", ["--json"], {
@@ -277,9 +490,8 @@ walletCli
process.exit(1);
}
try {
- const resp = await wallet.ws.handleCoreApiRequest(
+ const resp = await wallet.makeCoreApiRequest(
args.api.operation,
- "reqid-1",
requestJson,
);
console.log(JSON.stringify(resp, undefined, 2));
@@ -338,7 +550,7 @@ transactionsCli
help: "Identifier of the transaction to delete",
})
.flag("force", ["--force"], {
- help: "Force aborting the transaction. Might lose money."
+ help: "Force aborting the transaction. Might lose money.",
})
.action(async (args) => {
await withWallet(args, async (wallet) => {
@@ -383,7 +595,7 @@ walletCli
.maybeOption("maxRetries", ["--max-retries"], clk.INT)
.flag("failOnMaxRetries", ["--fail-on-max-retries"])
.action(async (args) => {
- await withWallet(args, async (wallet) => {
+ await withLocalWallet(args, async (wallet) => {
logger.info("running until pending operations are finished");
const resp = await wallet.ws.runTaskLoop({
maxRetries: args.finishPendingOpt.maxRetries,
@@ -802,7 +1014,7 @@ depositCli
.requiredArgument("amount", clk.STRING)
.requiredArgument("targetPayto", clk.STRING)
.action(async (args) => {
- await withWallet(args, async (wallet) => {
+ await withLocalWallet(args, async (wallet) => {
const resp = await wallet.client.call(
WalletApiOperation.CreateDepositGroup,
{
@@ -815,6 +1027,7 @@ depositCli
});
});
+// FIXME: should probably be removed
depositCli
.subcommand("trackDepositArgs", "track")
.requiredArgument("depositGroupId", clk.STRING)
@@ -835,6 +1048,61 @@ const advancedCli = walletCli.subcommand("advancedArgs", "advanced", {
});
advancedCli
+ .subcommand("serve", "serve", {
+ help: "Serve the wallet API via a unix domain socket.",
+ })
+ .action(async (args) => {
+ const w = await createLocalWallet(args);
+ w.runTaskLoop()
+ .then((res) => {
+ logger.warn("task loop exited unexpectedly");
+ })
+ .catch((e) => {
+ logger.error(`error in task loop: ${e}`);
+ });
+ let nextClientId = 1;
+ const notifyHandlers = new Map<number, (n: WalletNotification) => void>();
+ w.addNotificationListener((n) => {
+ notifyHandlers.forEach((v, k) => {
+ v(n);
+ });
+ });
+ await runRpcServer({
+ socketFilename: "wallet-core.sock",
+ onConnect(client) {
+ logger.info("connected");
+ const clientId = nextClientId++;
+ notifyHandlers.set(clientId, (n: WalletNotification) => {
+ client.sendResponse({
+ type: "notification",
+ payload: n as unknown as JsonMessage,
+ });
+ });
+ return {
+ onDisconnect() {
+ notifyHandlers.delete(clientId);
+ logger.info("disconnected");
+ },
+ onMessage(msg) {
+ logger.info(`message: ${j2s(msg)}`);
+ const op = (msg as any).operation;
+ const id = (msg as any).id;
+ const payload = (msg as any).args;
+ w.handleCoreApiRequest(op, id, payload)
+ .then((resp) => {
+ logger.info("sending response");
+ client.sendResponse(resp as unknown as JsonMessage);
+ })
+ .catch((e) => {
+ logger.error(`unexpected error: ${e}`);
+ });
+ },
+ };
+ },
+ });
+ });
+
+advancedCli
.subcommand("init", "init", {
help: "Initialize the wallet (with DB) and exit.",
})
@@ -848,7 +1116,7 @@ advancedCli
})
.flag("forceNow", ["-f", "--force-now"])
.action(async (args) => {
- await withWallet(args, async (wallet) => {
+ await withLocalWallet(args, async (wallet) => {
await wallet.ws.runPending(args.runPendingOpt.forceNow);
});
});
diff --git a/packages/taler-wallet-cli/src/rpc.ts b/packages/taler-wallet-cli/src/rpc.ts
new file mode 100644
index 000000000..8070b96c5
--- /dev/null
+++ b/packages/taler-wallet-cli/src/rpc.ts
@@ -0,0 +1,266 @@
+/*
+ This file is part of GNU Taler
+ (C) 2023 Taler Systems S.A.
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+
+/**
+ * Implementation for the wallet-core IPC protocol.
+ *
+ * Currently the protcol is completely unstable and only used internally
+ * by the wallet for testing purposes.
+ */
+
+/**
+ * Imports.
+ */
+import * as net from "node:net";
+import * as fs from "node:fs";
+import { bytesToString, Logger, typedArrayConcat } from "@gnu-taler/taler-util";
+
+const logger = new Logger("rpc.ts");
+
+export type JsonMessage =
+ | string
+ | number
+ | boolean
+ | null
+ | JsonMessage[]
+ | { [key: string]: JsonMessage };
+
+export interface RpcServerClientHandlers {
+ onMessage(msg: JsonMessage): void;
+ onDisconnect(): void;
+}
+
+export interface RpcServerClient {
+ sendResponse(message: JsonMessage): void;
+}
+
+export interface RpcServerArgs {
+ socketFilename: string;
+ onConnect(client: RpcServerClient): RpcServerClientHandlers;
+}
+
+export interface RpcClientServerConnection {
+ sendMessage(m: JsonMessage): void;
+ close(): void;
+}
+
+export interface RpcConnectArgs<T> {
+ socketFilename: string;
+ onEstablished(connection: RpcClientServerConnection): {
+ result: T;
+ onDisconnect(): void;
+ onMessage(m: JsonMessage): void;
+ };
+}
+
+export interface ReadLinewiseArgs {
+ onLine(lineData: Uint8Array): void;
+ sock: net.Socket;
+}
+
+function readStreamLinewise(args: ReadLinewiseArgs): void {
+ let chunks: Uint8Array[] = [];
+ args.sock.on("data", (buf: Uint8Array) => {
+ logger.info(`received ${buf.length} bytes`);
+ // Process all newlines in the newly received buffer
+ while (1) {
+ const newlineIdx = buf.indexOf("\n".charCodeAt(0));
+ if (newlineIdx >= 0) {
+ let left = buf.subarray(0, newlineIdx + 1);
+ let right = buf.subarray(newlineIdx + 1);
+ chunks.push(left);
+ const line = typedArrayConcat(chunks);
+ args.onLine(line);
+ chunks = [];
+ buf = right;
+ } else {
+ chunks.push(buf);
+ break;
+ }
+ }
+ });
+}
+
+export async function connectRpc<T>(args: RpcConnectArgs<T>): Promise<T> {
+ let sockFilename = args.socketFilename;
+ return new Promise((resolve, reject) => {
+ const client = net.createConnection(sockFilename);
+ client.on("connect", () => {
+ let parsingBody: string | undefined = undefined;
+ let bodyChunks: string[] = [];
+
+ logger.info("connected!");
+ client.write("%hello-from-client\n");
+ const res = args.onEstablished({
+ sendMessage(m) {
+ client.write("%request\n");
+ client.write(JSON.stringify(m));
+ client.write("\n");
+ client.write("%end\n");
+ },
+ close() {
+ client.destroy();
+ },
+ });
+ readStreamLinewise({
+ sock: client,
+ onLine(line) {
+ const lineStr = bytesToString(line);
+ logger.info(`got line from server: ${lineStr}`);
+ // Are we currently parsing the body of a request?
+ if (!parsingBody) {
+ const strippedLine = lineStr.trim();
+ if (strippedLine == "%message") {
+ logger.info("got message start");
+ parsingBody = "message";
+ } else if (strippedLine == "%hello-from-server") {
+ logger.info("got hello from server");
+ } else if (strippedLine.startsWith("%error:")) {
+ logger.info("got error from server, disconnecting");
+ client.end();
+ res.onDisconnect();
+ } else {
+ logger.info("got unknown request");
+ client.write("%error: invalid message\n");
+ client.end();
+ }
+ } else if (parsingBody == "message") {
+ const strippedLine = lineStr.trim();
+ if (strippedLine == "%end") {
+ logger.info("finished request");
+ let req = bodyChunks.join("");
+ let reqJson: any = undefined;
+ try {
+ reqJson = JSON.parse(req);
+ } catch (e) {
+ logger.warn("JSON request was invalid");
+ }
+ if (reqJson !== undefined) {
+ logger.info(`request: ${req}`);
+ res.onMessage(reqJson);
+ } else {
+ client.write("%error: invalid JSON");
+ client.end();
+ }
+ bodyChunks = [];
+ } else {
+ bodyChunks.push(lineStr);
+ }
+ } else {
+ logger.info("invalid parser state");
+ client.write("%error: internal error\n");
+ client.end();
+ }
+ },
+ });
+ client.on("close", () => {
+ res.onDisconnect();
+ });
+ client.on("data", () => {});
+ resolve(res.result);
+ });
+ });
+}
+
+export async function runRpcServer(args: RpcServerArgs): Promise<void> {
+ let sockFilename = args.socketFilename;
+ try {
+ fs.unlinkSync(sockFilename);
+ } catch (e) {
+ // Do nothing!
+ }
+ return new Promise((resolve, reject) => {
+ const server = net.createServer((sock) => {
+ // Are we currently parsing the body of a request?
+ let parsingBody: string | undefined = undefined;
+ let bodyChunks: string[] = [];
+
+ logger.info("got new connection");
+ sock.write("%hello-from-server\n");
+ const handlers = args.onConnect({
+ sendResponse(message) {
+ sock.write("%message\n");
+ sock.write(JSON.stringify(message));
+ sock.write("\n");
+ sock.write("%end\n");
+ },
+ });
+
+ sock.on("error", (err) => {
+ logger.info(`connection error: ${err}`);
+ });
+
+ function processLine(line: Uint8Array) {
+ const lineStr = bytesToString(line);
+ logger.info(`got line: ${lineStr}`);
+ if (!parsingBody) {
+ const strippedLine = lineStr.trim();
+ if (strippedLine == "%request") {
+ logger.info("got request start");
+ parsingBody = "request";
+ } else if (strippedLine === "%hello-from-client") {
+ console.log("got hello from client");
+ } else if (strippedLine.startsWith("%error:")) {
+ console.log("got error from client");
+ sock.end();
+ handlers.onDisconnect();
+ } else {
+ logger.info("got unknown request");
+ sock.write("%error: invalid request\n");
+ sock.end();
+ }
+ } else if (parsingBody == "request") {
+ const strippedLine = lineStr.trim();
+ if (strippedLine == "%end") {
+ logger.info("finished request");
+ let req = bodyChunks.join("");
+ let reqJson: any = undefined;
+ try {
+ reqJson = JSON.parse(req);
+ } catch (e) {
+ logger.warn("JSON request was invalid");
+ }
+ if (reqJson !== undefined) {
+ logger.info(`request: ${req}`);
+ handlers.onMessage(reqJson);
+ } else {
+ sock.write("%error: invalid JSON");
+ sock.end();
+ }
+ bodyChunks = [];
+ } else {
+ bodyChunks.push(lineStr);
+ }
+ } else {
+ logger.info("invalid parser state");
+ sock.write("%error: internal error\n");
+ sock.end();
+ }
+ }
+
+ readStreamLinewise({
+ sock,
+ onLine: processLine,
+ });
+
+ sock.on("close", (hadError: boolean) => {
+ logger.info(`connection closed, hadError=${hadError}`);
+ handlers.onDisconnect();
+ });
+ });
+ server.listen("wallet-core.sock");
+ });
+}