/* This file is part of GNU Taler (C) 2023 Taler Systems S.A. GNU Taler is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with GNU Taler; see the file COPYING. If not, see */ import * as net from "node:net"; import * as fs from "node:fs"; import { Logger } from "./logging.js"; import { bytesToString, typedArrayConcat } from "./taler-crypto.js"; import type { RpcConnectArgs, RpcServerArgs } from "./twrpc.js"; interface ReadLinewiseArgs { onLine(lineData: Uint8Array): void; sock: net.Socket; } const logger = new Logger("twrpc-impl.node.ts"); function readStreamLinewise(args: ReadLinewiseArgs): void { let chunks: Uint8Array[] = []; args.sock.on("data", (buf: Uint8Array) => { logger.info(`received ${buf.length} bytes`); // Process all newlines in the newly received buffer while (1) { const newlineIdx = buf.indexOf("\n".charCodeAt(0)); if (newlineIdx >= 0) { let left = buf.subarray(0, newlineIdx + 1); let right = buf.subarray(newlineIdx + 1); chunks.push(left); const line = typedArrayConcat(chunks); args.onLine(line); chunks = []; buf = right; } else { chunks.push(buf); break; } } }); } export async function connectRpc(args: RpcConnectArgs): Promise { let sockFilename = args.socketFilename; return new Promise((resolve, reject) => { const client = net.createConnection(sockFilename); client.on("error", (e) => { reject(e); }); client.on("connect", () => { let parsingBody: string | undefined = undefined; let bodyChunks: string[] = []; logger.info("connected!"); client.write("%hello-from-client\n"); const res = args.onEstablished({ sendMessage(m) { client.write("%request\n"); client.write(JSON.stringify(m)); client.write("\n"); client.write("%end\n"); }, close() { client.destroy(); }, }); readStreamLinewise({ sock: client, onLine(line) { const lineStr = bytesToString(line); logger.info(`got line from server: ${lineStr}`); // Are we currently parsing the body of a request? if (!parsingBody) { const strippedLine = lineStr.trim(); if (strippedLine == "%message") { logger.info("got message start"); parsingBody = "message"; } else if (strippedLine == "%hello-from-server") { logger.info("got hello from server"); } else if (strippedLine.startsWith("%error:")) { logger.info("got error from server, disconnecting"); client.end(); res.onDisconnect(); } else { logger.info("got unknown request"); client.write("%error: invalid message\n"); client.end(); } } else if (parsingBody == "message") { const strippedLine = lineStr.trim(); if (strippedLine == "%end") { logger.info("finished request"); let req = bodyChunks.join(""); let reqJson: any = undefined; try { reqJson = JSON.parse(req); } catch (e) { logger.warn("JSON message from server was invalid"); logger.info(`message was: ${req}`); } if (reqJson !== undefined) { logger.info(`request: ${req}`); res.onMessage(reqJson); } else { client.write("%error: invalid JSON"); client.end(); } bodyChunks = []; parsingBody = undefined; } else { bodyChunks.push(lineStr); } } else { logger.info("invalid parser state"); client.write("%error: internal error\n"); client.end(); } }, }); client.on("close", () => { res.onDisconnect(); }); client.on("data", () => {}); resolve(res.result); }); }); } export async function runRpcServer(args: RpcServerArgs): Promise { let sockFilename = args.socketFilename; try { fs.unlinkSync(sockFilename); } catch (e) { // Do nothing! } return new Promise((resolve, reject) => { const server = net.createServer((sock) => { // Are we currently parsing the body of a request? let parsingBody: string | undefined = undefined; let bodyChunks: string[] = []; logger.info("got new connection"); sock.write("%hello-from-server\n"); const handlers = args.onConnect({ sendResponse(message) { sock.write("%message\n"); sock.write(JSON.stringify(message)); sock.write("\n"); sock.write("%end\n"); }, }); sock.on("error", (err) => { logger.info(`connection error: ${err}`); }); function processLine(line: Uint8Array) { const lineStr = bytesToString(line); logger.info(`got line: ${lineStr}`); if (!parsingBody) { const strippedLine = lineStr.trim(); if (strippedLine == "%request") { logger.info("got request start"); parsingBody = "request"; } else if (strippedLine === "%hello-from-client") { console.log("got hello from client"); } else if (strippedLine.startsWith("%error:")) { console.log("got error from client"); sock.end(); handlers.onDisconnect(); } else { logger.info("got unknown request"); sock.write("%error: invalid request\n"); sock.end(); } } else if (parsingBody == "request") { const strippedLine = lineStr.trim(); if (strippedLine == "%end") { logger.info("finished request"); let req = bodyChunks.join(""); let reqJson: any = undefined; try { reqJson = JSON.parse(req); } catch (e) { logger.warn("JSON request from client was invalid"); } if (reqJson !== undefined) { logger.info(`request: ${req}`); handlers.onMessage(reqJson); } else { sock.write("%error: invalid JSON"); sock.end(); } bodyChunks = []; parsingBody = undefined; } else { bodyChunks.push(lineStr); } } else { logger.info("invalid parser state"); sock.write("%error: internal error\n"); sock.end(); } } readStreamLinewise({ sock, onLine: processLine, }); sock.on("close", (hadError: boolean) => { logger.info(`connection closed, hadError=${hadError}`); handlers.onDisconnect(); }); }); server.listen(args.socketFilename); }); }