/*
This file is part of TALER
Copyright (C) 2023 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
/**
* @file conversion.c
* @brief helper routines to run some external JSON-to-JSON converter
* @author Christian Grothoff
*/
#include "platform.h"
#include "taler_util.h"
#include "taler_json_lib.h"
#include <unistd.h>
/**
 * State kept for one run of an external JSON-to-JSON conversion helper.
 */
struct TALER_JSON_ExternalConversion
{
  /**
   * Callback to call with the result.
   */
  TALER_JSON_JsonCallback cb;

  /**
   * Closure for @e cb.
   */
  void *cb_cls;

  /**
   * Handle to the helper process.
   */
  struct GNUNET_OS_Process *helper;

  /**
   * Pipe for the stdin of the @e helper.
   */
  struct GNUNET_DISK_FileHandle *chld_stdin;

  /**
   * Pipe for the stdout of the @e helper.
   */
  struct GNUNET_DISK_FileHandle *chld_stdout;

  /**
   * Handle to wait on the child to terminate.
   */
  struct GNUNET_ChildWaitHandle *cwh;

  /**
   * Task to read JSON output from the child.
   */
  struct GNUNET_SCHEDULER_Task *read_task;

  /**
   * Task to send JSON input to the child.
   */
  struct GNUNET_SCHEDULER_Task *write_task;

  /**
   * Buffer with data we need to send to the helper.
   * Byte-typed (rather than `void *`) so that offsets into the
   * buffer are computed with standard pointer arithmetic.
   */
  char *write_buf;

  /**
   * Buffer for reading data from the helper.
   * Byte-typed (rather than `void *`) so that offsets into the
   * buffer are computed with standard pointer arithmetic.
   */
  char *read_buf;

  /**
   * Total length of @e write_buf.
   */
  size_t write_size;

  /**
   * Current write position in @e write_buf.
   */
  size_t write_pos;

  /**
   * Current size of @e read_buf.
   */
  size_t read_size;

  /**
   * Current offset in @e read_buf.
   */
  size_t read_pos;
};
/**
 * Shut down the input side of the helper: cancel a pending write
 * task (it must not stay scheduled on a handle we are about to
 * close) and close our end of the helper's stdin.  The handle is
 * set to NULL afterwards so that nobody closes it a second time.
 * Parts that are already gone are skipped.
 *
 * @param[in,out] ec conversion whose stdin pipe should be closed
 */
static void
close_helper_stdin (struct TALER_JSON_ExternalConversion *ec)
{
  if (NULL != ec->write_task)
  {
    GNUNET_SCHEDULER_cancel (ec->write_task);
    ec->write_task = NULL;
  }
  if (NULL != ec->chld_stdin)
  {
    GNUNET_break (GNUNET_OK ==
                  GNUNET_DISK_file_close (ec->chld_stdin));
    ec->chld_stdin = NULL;
  }
}


/**
 * Function called when we can read more data from
 * the child process.  Reads until the pipe has no more data
 * available, growing the read buffer as needed, and then
 * re-schedules itself.  On end of stream, on a hard read error,
 * or when the buffer cannot grow any further, no new read task
 * is scheduled.
 *
 * @param cls our `struct TALER_JSON_ExternalConversion *`
 */
static void
read_cb (void *cls)
{
  struct TALER_JSON_ExternalConversion *ec = cls;

  ec->read_task = NULL;
  while (1)
  {
    ssize_t ret;

    if (ec->read_size == ec->read_pos)
    {
      /* Buffer is full (or not yet allocated): grow it */
      size_t ns;
      void *tmp;

      ns = GNUNET_MAX (2 * ec->read_size,
                       1024);
      if (ns > GNUNET_MAX_MALLOC_CHECKED)
        ns = GNUNET_MAX_MALLOC_CHECKED;
      if (ec->read_size == ns)
      {
        /* Buffer already has the maximum size we are willing to
           allocate and the helper still has more data: stop reading! */
        GNUNET_break (0);
        close_helper_stdin (ec);
        return;
      }
      tmp = GNUNET_malloc_large (ns);
      if (NULL == tmp)
      {
        /* out of memory, also stop reading */
        GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
                             "malloc");
        close_helper_stdin (ec);
        return;
      }
      GNUNET_memcpy (tmp,
                     ec->read_buf,
                     ec->read_pos);
      GNUNET_free (ec->read_buf);
      ec->read_buf = tmp;
      ec->read_size = ns;
    }
    /* cast: arithmetic on `void *` is not standard C */
    ret = GNUNET_DISK_file_read (ec->chld_stdout,
                                 (char *) ec->read_buf + ec->read_pos,
                                 ec->read_size - ec->read_pos);
    if (ret < 0)
    {
      if ( (EAGAIN != errno) &&
           (EWOULDBLOCK != errno) &&
           (EINTR != errno) )
      {
        /* hard error: give up on reading */
        GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
                             "read");
        return;
      }
      /* nothing available right now, wait for more */
      break;
    }
    if (0 == ret)
    {
      /* regular end of stream, good! */
      return;
    }
    GNUNET_assert (ec->read_size >= ec->read_pos + ret);
    ec->read_pos += ret;
  }
  ec->read_task
    = GNUNET_SCHEDULER_add_read_file (
        GNUNET_TIME_UNIT_FOREVER_REL,
        ec->chld_stdout,
        &read_cb,
        ec);
}
/**
 * Function called when we can write more data to
 * the child process.  Writes as much of the pending input as the
 * pipe accepts.  If the write merely could not proceed right now,
 * the task is re-scheduled; in every other case (all data sent,
 * hard error, zero-length write) our end of the helper's stdin is
 * closed.
 *
 * @param cls our `struct TALER_JSON_ExternalConversion *`
 */
static void
write_cb (void *cls)
{
  struct TALER_JSON_ExternalConversion *ec = cls;
  /* Set only by a failed write of the current invocation; we must
     not look at `errno` after the loop, as it may be stale or have
     been overwritten by logging. */
  int retry = 0;

  ec->write_task = NULL;
  while (ec->write_size > ec->write_pos)
  {
    ssize_t ret;

    /* cast: arithmetic on `void *` is not standard C */
    ret = GNUNET_DISK_file_write (ec->chld_stdin,
                                  (const char *) ec->write_buf
                                  + ec->write_pos,
                                  ec->write_size - ec->write_pos);
    if (ret < 0)
    {
      int eno = errno;

      if ( (EAGAIN == eno) ||
           (EWOULDBLOCK == eno) ||
           (EINTR == eno) )
        retry = 1;
      else
        GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
                             "write");
      break;
    }
    if (0 == ret)
    {
      /* no progress and no error: do not spin, give up */
      GNUNET_break (0);
      break;
    }
    GNUNET_assert (ec->write_size >= ec->write_pos + ret);
    ec->write_pos += ret;
  }
  if (retry)
  {
    ec->write_task
      = GNUNET_SCHEDULER_add_write_file (
          GNUNET_TIME_UNIT_FOREVER_REL,
          ec->chld_stdin,
          &write_cb,
          ec);
  }
  else
  {
    /* done (or failed for good): closing the pipe signals
       end of input to the helper */
    GNUNET_break (GNUNET_OK ==
                  GNUNET_DISK_file_close (ec->chld_stdin));
    ec->chld_stdin = NULL;
  }
}
/**
 * Callback invoked once the helper process has terminated.
 * Collects whatever output is still pending, parses the helper's
 * output as JSON, hands the result to the client's callback and
 * finally releases the conversion handle.
 *
 * @param cls our `struct TALER_JSON_ExternalConversion *`; it is
 *        freed before this function returns
 * @param type how the process terminated
 * @param exit_code status code of the process
 */
static void
child_done_cb (void *cls,
               enum GNUNET_OS_ProcessStatusType type,
               long unsigned int exit_code)
{
  struct TALER_JSON_ExternalConversion *conv = cls;
  json_t *reply = NULL;

  conv->cwh = NULL;
  if (NULL != conv->read_task)
  {
    /* The termination notification may arrive before we have
       consumed everything the helper wrote; run the reader once
       by hand instead of waiting for the scheduler. */
    GNUNET_SCHEDULER_cancel (conv->read_task);
    read_cb (conv);
  }
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Conversion helper exited with status %d and code %llu after outputting %llu bytes of data\n",
              (int) type,
              (unsigned long long) exit_code,
              (unsigned long long) conv->read_pos);
  GNUNET_OS_process_destroy (conv->helper);
  conv->helper = NULL;
  if (conv->read_pos > 0)
  {
    json_error_t jerr;

    reply = json_loadb (conv->read_buf,
                        conv->read_pos,
                        JSON_REJECT_DUPLICATES,
                        &jerr);
    if (NULL == reply)
      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                  "Failed to parse JSON from helper at %d: %s\n",
                  jerr.position,
                  jerr.text);
  }
  /* 'reply' stays NULL if the helper produced no (valid) output */
  conv->cb (conv->cb_cls,
            type,
            exit_code,
            reply);
  json_decref (reply);
  TALER_JSON_external_conversion_stop (conv);
}
/**
 * Launch the helper @a binary, feed it @a input (serialized as
 * compact JSON) on its stdin and arrange for @a cb to be called
 * with the JSON the helper wrote to its stdout once it terminated.
 *
 * @param input JSON to pass to the helper
 * @param cb function to call with the result
 * @param cb_cls closure for @a cb
 * @param binary name of the helper binary to run
 * @param ... further arguments, passed through unchanged to
 *        GNUNET_OS_start_process_va()
 * @return handle for the conversion, NULL if @a input could not
 *         be serialized or the helper could not be started
 */
struct TALER_JSON_ExternalConversion *
TALER_JSON_external_conversion_start (const json_t *input,
                                      TALER_JSON_JsonCallback cb,
                                      void *cb_cls,
                                      const char *binary,
                                      ...)
{
  struct TALER_JSON_ExternalConversion *ec;
  struct GNUNET_DISK_PipeHandle *pipe_stdin;
  struct GNUNET_DISK_PipeHandle *pipe_stdout;
  char *payload;
  va_list ap;

  /* Serialize first: json_dumps() returns NULL on failure, and at
     this point there is nothing yet that would need to be undone. */
  payload = json_dumps (input,
                        JSON_COMPACT);
  if (NULL == payload)
  {
    GNUNET_break (0);
    return NULL;
  }
  ec = GNUNET_new (struct TALER_JSON_ExternalConversion);
  ec->cb = cb;
  ec->cb_cls = cb_cls;
  ec->write_buf = payload;
  ec->write_size = strlen (payload);
  pipe_stdin = GNUNET_DISK_pipe (GNUNET_DISK_PF_BLOCKING_READ);
  GNUNET_assert (NULL != pipe_stdin);
  pipe_stdout = GNUNET_DISK_pipe (GNUNET_DISK_PF_BLOCKING_WRITE);
  GNUNET_assert (NULL != pipe_stdout);
  va_start (ap,
            binary);
  ec->helper = GNUNET_OS_start_process_va (GNUNET_OS_INHERIT_STD_ERR,
                                           pipe_stdin,
                                           pipe_stdout,
                                           NULL,
                                           binary,
                                           ap);
  va_end (ap);
  if (NULL == ec->helper)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                "Failed to run conversion helper `%s'\n",
                binary);
    GNUNET_break (GNUNET_OK ==
                  GNUNET_DISK_pipe_close (pipe_stdin));
    GNUNET_break (GNUNET_OK ==
                  GNUNET_DISK_pipe_close (pipe_stdout));
    /* allocated by json_dumps(), hence free() */
    free (ec->write_buf);
    GNUNET_free (ec);
    return NULL;
  }
  /* Keep only our ends of the pipes; closing the pipe handles
     afterwards releases the remaining ends. */
  ec->chld_stdin =
    GNUNET_DISK_pipe_detach_end (pipe_stdin,
                                 GNUNET_DISK_PIPE_END_WRITE);
  ec->chld_stdout =
    GNUNET_DISK_pipe_detach_end (pipe_stdout,
                                 GNUNET_DISK_PIPE_END_READ);
  GNUNET_break (GNUNET_OK ==
                GNUNET_DISK_pipe_close (pipe_stdin));
  GNUNET_break (GNUNET_OK ==
                GNUNET_DISK_pipe_close (pipe_stdout));
  ec->read_task
    = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
                                      ec->chld_stdout,
                                      &read_cb,
                                      ec);
  ec->write_task
    = GNUNET_SCHEDULER_add_write_file (GNUNET_TIME_UNIT_FOREVER_REL,
                                       ec->chld_stdin,
                                       &write_cb,
                                       ec);
  ec->cwh = GNUNET_wait_child (ec->helper,
                               &child_done_cb,
                               ec);
  return ec;
}
/**
 * Cancel the scheduler task stored in @a slot, if there is one,
 * and clear the slot.
 *
 * @param[in,out] slot location of the task handle
 */
static void
drop_task (struct GNUNET_SCHEDULER_Task **slot)
{
  if (NULL == *slot)
    return;
  GNUNET_SCHEDULER_cancel (*slot);
  *slot = NULL;
}


/**
 * Close the file handle stored in @a slot, if there is one,
 * and clear the slot.
 *
 * @param[in,out] slot location of the file handle
 */
static void
drop_file (struct GNUNET_DISK_FileHandle **slot)
{
  if (NULL == *slot)
    return;
  GNUNET_break (GNUNET_OK ==
                GNUNET_DISK_file_close (*slot));
  *slot = NULL;
}


/**
 * Abort a conversion and release everything associated with it:
 * the child wait handle, the helper process (killed with SIGKILL
 * if it is still around), pending read/write tasks, both pipe
 * ends and the I/O buffers.  The client's callback is not invoked.
 *
 * @param[in] ec conversion to stop; freed by this call
 */
void
TALER_JSON_external_conversion_stop (
  struct TALER_JSON_ExternalConversion *ec)
{
  struct GNUNET_OS_Process *proc = ec->helper;

  if (NULL != ec->cwh)
  {
    GNUNET_wait_child_cancel (ec->cwh);
    ec->cwh = NULL;
  }
  if (NULL != proc)
  {
    GNUNET_break (0 ==
                  GNUNET_OS_process_kill (proc,
                                          SIGKILL));
    GNUNET_OS_process_destroy (proc);
  }
  /* tasks first: they refer to the file handles closed below */
  drop_task (&ec->read_task);
  drop_task (&ec->write_task);
  drop_file (&ec->chld_stdin);
  drop_file (&ec->chld_stdout);
  GNUNET_free (ec->read_buf);
  /* write_buf comes from json_dumps(), hence plain free() */
  free (ec->write_buf);
  GNUNET_free (ec);
}