aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2020-03-14 22:56:14 +0100
committerChristian Grothoff <christian@grothoff.org>2020-03-14 22:56:14 +0100
commitce44b4a02849a4f9f3e9cf3fd2e76da4ab2b0e64 (patch)
tree3f3ef3ff0a6dea56cebbd6871eeee0fd91105852
parent6aca928cf89bbd3ddc9384d773415dbb04f0659f (diff)
clean up aggregator logic, make it more robust against invariant failures
m---------doc/prebuilt0
-rw-r--r--src/exchange/taler-exchange-aggregator.c221
2 files changed, 115 insertions, 106 deletions
diff --git a/doc/prebuilt b/doc/prebuilt
-Subproject 934a6a18301e81c4fd1b3a8cda2dc13dca4741c
+Subproject ca53235ccfa0458ebf11c204888ca370e20ec3f
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 59db4daef..c3b94b3d8 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -16,7 +16,7 @@
/**
* @file taler-exchange-aggregator.c
- * @brief Process that aggregates outgoing transactions and executes them
+ * @brief Process that aggregates outgoing transactions and prepares their execution
* @author Christian Grothoff
*/
#include "platform.h"
@@ -70,7 +70,7 @@ struct AggregationUnit
/**
* Row ID of the transaction that started it all.
*/
- unsigned long long row_id;
+ uint64_t row_id;
/**
* The current time (which triggered the aggregation and
@@ -100,10 +100,9 @@ struct AggregationUnit
struct TALER_BANK_PrepareHandle *ph;
/**
- * Array of #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT row_ids from the
- * aggregation.
+ * Array of row_ids from the aggregation.
*/
- unsigned long long *additional_rows;
+ uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
/**
* Offset specifying how many @e additional_rows are in use.
@@ -124,11 +123,6 @@ struct AggregationUnit
/**
- * Which currency is used by this exchange?
- */
-static char *exchange_currency_string;
-
-/**
* What is the smallest unit we support for wire transfers?
* We will need to round down to a multiple of this amount.
*/
@@ -162,10 +156,23 @@ static struct GNUNET_SCHEDULER_Task *task;
static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
/**
- * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
- * on serious errors.
+ * Value to return from main(). 0 on success, non-zero on erorrs.
*/
-static int global_ret;
+static enum
+{
+ GR_SUCCESS = 0,
+ GR_DATABASE_SESSION_FAIL = 1,
+ GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2,
+ GR_DATABASE_READY_DEPOSIT_HARD_FAIL = 3,
+ GR_DATABASE_ITERATE_DEPOSIT_HARD_FAIL = 4,
+ GR_DATABASE_TINY_MARK_HARD_FAIL = 5,
+ GR_DATABASE_PREPARE_HARD_FAIL = 6,
+ GR_DATABASE_PREPARE_COMMIT_HARD_FAIL = 7,
+ GR_INVARIANT_FAILURE = 8,
+ GR_CONFIGURATION_INVALID = 9,
+ GR_CMD_LINE_UTF8_ERROR = 9,
+ GR_CMD_LINE_OPTIONS_WRONG = 10,
+} global_ret;
/**
* #GNUNET_YES if we are in test mode and should exit when idle.
@@ -192,7 +199,6 @@ static void
cleanup_au (struct AggregationUnit *au)
{
GNUNET_assert (NULL != au);
- GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire)
json_decref (au->wire);
memset (au,
@@ -230,7 +236,7 @@ shutdown_task (void *cls)
* @return #GNUNET_OK on success
*/
static int
-parse_wirewatch_config ()
+parse_wirewatch_config (void)
{
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (cfg,
@@ -254,33 +260,11 @@ parse_wirewatch_config ()
"AGGREGATOR_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR;
}
- if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_string (cfg,
- "taler",
- "CURRENCY",
- &exchange_currency_string))
- {
- GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
- "taler",
- "CURRENCY");
- return GNUNET_SYSERR;
- }
- if (strlen (exchange_currency_string) >= TALER_CURRENCY_LEN)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Currency `%s' longer than the allowed limit of %u characters.",
- exchange_currency_string,
- (unsigned int) TALER_CURRENCY_LEN);
- return GNUNET_SYSERR;
- }
-
if ( (GNUNET_OK !=
TALER_config_get_amount (cfg,
"taler",
"CURRENCY_ROUND_UNIT",
&currency_round_unit)) ||
- (0 != strcasecmp (exchange_currency_string,
- currency_round_unit.currency)) ||
( (0 != currency_round_unit.fraction) &&
(0 != currency_round_unit.value) ) )
{
@@ -396,19 +380,29 @@ deposit_cb (void *cls,
}
if (GNUNET_NO == au->have_refund)
{
+ struct TALER_Amount ntotal;
+
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Non-refunded transaction, subtracting deposit fee %s\n",
TALER_amount2s (deposit_fee));
if (GNUNET_SYSERR ==
- TALER_amount_subtract (&au->total_amount,
+ TALER_amount_subtract (&ntotal,
amount_with_fee,
deposit_fee))
{
+ /* This should never happen, issue a warning, but continue processing
+ with an amount of zero, least we hang here for good. */
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Fatally malformed record at row %llu over %s\n",
+ "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n",
(unsigned long long) row_id,
TALER_amount2s (amount_with_fee));
- return GNUNET_DB_STATUS_HARD_ERROR;
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_get_zero (au->total_amount.currency,
+ &au->total_amount));
+ }
+ else
+ {
+ au->total_amount = ntotal;
}
}
@@ -440,13 +434,16 @@ deposit_cb (void *cls,
url = TALER_JSON_wire_to_payto (au->wire);
au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (url);
+ if (NULL == au->wa)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "No exchange account configured for `%s', please fix your setup to continue!\n",
+ url);
+ GNUNET_free (url);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
GNUNET_free (url);
}
- if (NULL == au->wa)
- {
- GNUNET_break (0);
- return GNUNET_DB_STATUS_HARD_ERROR;
- }
/* make sure we have current fees */
au->execution_time = GNUNET_TIME_absolute_get ();
@@ -462,7 +459,8 @@ deposit_cb (void *cls,
if (NULL == af)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Could not get or persist wire fees. Aborting run.\n");
+ "Could not get or persist wire fees for %s. Aborting run.\n",
+ GNUNET_STRINGS_absolute_time_to_string (au->execution_time));
return GNUNET_DB_STATUS_HARD_ERROR;
}
au->wire_fee = af->wire_fee;
@@ -549,17 +547,6 @@ aggregate_cb (void *cls,
"Adding transaction amount %s from row %llu to aggregation\n",
TALER_amount2s (amount_with_fee),
(unsigned long long) row_id);
- if (GNUNET_OK !=
- TALER_amount_add (&au->total_amount,
- &au->total_amount,
- amount_with_fee))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Overflow or currency incompatibility during aggregation at %llu\n",
- (unsigned long long) row_id);
- /* Skip this one, but keep going! */
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
- }
au->have_refund = GNUNET_NO;
qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
au->session,
@@ -580,22 +567,43 @@ aggregate_cb (void *cls,
TALER_amount2s (deposit_fee));
if (GNUNET_SYSERR ==
TALER_amount_subtract (&delta,
- &au->total_amount,
+ amount_with_fee,
deposit_fee))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Fatally malformed record at %llu over amount %s\n",
+ "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n",
(unsigned long long) row_id,
TALER_amount2s (&au->total_amount));
- return GNUNET_DB_STATUS_HARD_ERROR;
}
- au->total_amount = delta;
+ else
+ {
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_get_zero (au->total_amount.currency,
+ &delta));
+ }
+ }
+ else
+ {
+ delta = *amount_with_fee;
+ }
+
+ {
+ struct TALER_Amount tmp;
+
+ if (GNUNET_OK !=
+ TALER_amount_add (&tmp,
+ &au->total_amount,
+ &delta))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Overflow or currency incompatibility during aggregation at %llu\n",
+ (unsigned long long) row_id);
+ /* Skip this one, but keep going! */
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ }
+ au->total_amount = tmp;
}
- if (NULL == au->additional_rows)
- au->additional_rows = GNUNET_new_array (
- TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT,
- unsigned long long);
/* "append" to our list of rows */
au->additional_rows[au->rows_offset++] = row_id;
/* insert into aggregation tracking table */
@@ -659,22 +667,16 @@ run_aggregation (void *cls)
struct AggregationUnit au_active;
struct TALER_EXCHANGEDB_Session *session;
enum GNUNET_DB_QueryStatus qs;
- const struct GNUNET_SCHEDULER_TaskContext *tc;
- void *buf;
- size_t buf_size;
(void) cls;
task = NULL;
- tc = GNUNET_SCHEDULER_get_task_context ();
- if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
- return;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Checking for ready deposits to aggregate\n");
if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database session!\n");
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_DATABASE_SESSION_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
}
@@ -684,7 +686,7 @@ run_aggregation (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
}
@@ -705,7 +707,7 @@ run_aggregation (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n");
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_DATABASE_READY_DEPOSIT_HARD_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
}
@@ -754,7 +756,7 @@ run_aggregation (void *cls)
cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls,
session);
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_DATABASE_ITERATE_DEPOSIT_HARD_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
}
@@ -803,7 +805,7 @@ run_aggregation (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
cleanup_au (&au_active);
GNUNET_SCHEDULER_shutdown ();
return;
@@ -841,6 +843,7 @@ run_aggregation (void *cls)
db_plugin->rollback (db_plugin->cls,
session);
cleanup_au (&au_active);
+ global_ret = GR_DATABASE_TINY_MARK_HARD_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
}
@@ -864,30 +867,35 @@ run_aggregation (void *cls)
TALER_B2S (&au_active.merchant_pub));
GNUNET_free (amount_s);
}
+
{
- char *url;
+ void *buf;
+ size_t buf_size;
- url = TALER_JSON_wire_to_payto (au_active.wire);
- TALER_BANK_prepare_transfer (url,
- &au_active.final_amount,
- exchange_base_url,
- &au_active.wtid,
- &buf,
- &buf_size);
- GNUNET_free (url);
+ {
+ char *url;
+
+ url = TALER_JSON_wire_to_payto (au_active.wire);
+ TALER_BANK_prepare_transfer (url,
+ &au_active.final_amount,
+ exchange_base_url,
+ &au_active.wtid,
+ &buf,
+ &buf_size);
+ GNUNET_free (url);
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Storing %u bytes of wire prepare data\n",
+ (unsigned int) buf_size);
+ /* Commit our intention to execute the wire transfer! */
+ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
+ session,
+ au_active.wa->method,
+ buf,
+ buf_size);
+ GNUNET_free (buf);
}
- GNUNET_free_non_null (au_active.additional_rows);
- au_active.additional_rows = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Storing %u bytes of wire prepare data\n",
- (unsigned int) buf_size);
- /* Commit our intention to execute the wire transfer! */
- qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
- session,
- au_active.wa->method,
- buf,
- buf_size);
- GNUNET_free (buf);
/* Commit the WTID data to 'wire_out' to finally satisfy aggregation
table constraints */
if (qs >= 0)
@@ -918,7 +926,7 @@ run_aggregation (void *cls)
db_plugin->rollback (db_plugin->cls,
session);
/* die hard */
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_DATABASE_PREPARE_HARD_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
}
@@ -940,7 +948,7 @@ run_aggregation (void *cls)
return;
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_DATABASE_PREPARE_COMMIT_HARD_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
@@ -952,7 +960,7 @@ run_aggregation (void *cls)
return;
default:
GNUNET_break (0);
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_INVARIANT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
@@ -981,7 +989,7 @@ run (void *cls,
if (GNUNET_OK != parse_wirewatch_config ())
{
cfg = NULL;
- global_ret = 1;
+ global_ret = GR_CONFIGURATION_INVALID;
return;
}
GNUNET_assert (NULL == task);
@@ -997,7 +1005,7 @@ run (void *cls,
*
* @param argc number of arguments from the command line
* @param argv command line arguments
- * @return 0 ok, 1 on error
+ * @return 0 ok, non-zero on error, see #global_ret
*/
int
main (int argc,
@@ -1014,9 +1022,10 @@ main (int argc,
GNUNET_GETOPT_OPTION_END
};
- if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv,
- &argc, &argv))
- return 2;
+ if (GNUNET_OK !=
+ GNUNET_STRINGS_get_utf8_args (argc, argv,
+ &argc, &argv))
+ return GR_CMD_LINE_UTF8_ERROR;
if (GNUNET_OK !=
GNUNET_PROGRAM_run (argc, argv,
"taler-exchange-aggregator",
@@ -1026,7 +1035,7 @@ main (int argc,
&run, NULL))
{
GNUNET_free ((void *) argv);
- return 1;
+ return GR_CMD_LINE_OPTIONS_WRONG;
}
GNUNET_free ((void *) argv);
return global_ret;