diff options
author | Christian Grothoff <christian@grothoff.org> | 2020-03-12 06:11:48 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2020-03-12 08:17:46 +0100 |
commit | a1db41e09a618c3a9797242ee593da1331175c14 (patch) | |
tree | 7f0f123c7dcdb521df82eb8b18074a9c73e461f5 /src | |
parent | 1896c1dfb58b9e11bd2b4d3822823a623de7004a (diff) |
aggregator clean up
Diffstat (limited to 'src')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 269 |
1 files changed, 143 insertions, 126 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 90f9f28c7..5f99a472b 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -18,6 +18,19 @@ * @file taler-exchange-aggregator.c * @brief Process that aggregates outgoing transactions and executes them * @author Christian Grothoff + * + * Note: + * It might be simpler and theoretically more performant to split up + * this process into three: + * - one that runs the 'pending' wire transfers + * - one that performs aggregation + * - one that closes (expired) reserves + * + * They would have some (minor) code duplication to load the database and wire + * plugins and account data, and this would also slightly complicate + * operations by having to launch three processes. OTOH, those processes could + * then fail independently, which might also be a good thing. In any case, + * doing this is not expected to be complicated. */ #include "platform.h" #include <gnunet/gnunet_util_lib.h> @@ -30,7 +43,7 @@ /** - * Information we keep for each supported account. + * Information we keep for each supported account of the exchange. */ struct WireAccount { @@ -70,6 +83,8 @@ struct WireAccount /** * Data we keep to #run_transfers(). There is at most * one of these around at any given point in time. + * Note that this limits parallelism, and we might want + * to revise this decision at a later point. */ struct WirePrepareData { @@ -100,6 +115,8 @@ struct WirePrepareData /** * Information about one aggregation process to be executed. There is * at most one of these around at any given point in time. + * Note that this limits parallelism, and we might want + * to revise this decision at a later point. */ struct AggregationUnit { @@ -139,7 +156,8 @@ struct AggregationUnit unsigned long long row_id; /** - * The current time. + * The current time (which triggered the aggregation and + * defines the wire fee). */ struct GNUNET_TIME_Absolute execution_time; @@ -149,7 +167,8 @@ struct AggregationUnit json_t *wire; /** - * Wire account to be used for the preparation. + * Exchange wire account to be used for the preparation and + * eventual execution of the aggregate wire transfer. */ struct WireAccount *wa; @@ -164,13 +183,13 @@ struct AggregationUnit struct TALER_BANK_PrepareHandle *ph; /** - * Array of #aggregation_limit row_ids from the + * Array of #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT row_ids from the * aggregation. */ unsigned long long *additional_rows; /** - * Offset specifying how many #additional_rows are in use. + * Offset specifying how many @e additional_rows are in use. */ unsigned int rows_offset; @@ -222,32 +241,35 @@ static struct CloseTransferContext *ctc; static char *exchange_currency_string; /** - * How many fractional digits does the currency use? + * What is the smallest unit we support for wire transfers? + * We will need to round down to a multiple of this amount. */ static struct TALER_Amount currency_round_unit; /** - * What is the base URL of this exchange? + * What is the base URL of this exchange? Used in the + * wire transfer subjects to that merchants and governments + * can ask for the list of aggregated deposits. */ static char *exchange_base_url; /** - * The exchange's configuration (global) + * The exchange's configuration. */ -static struct GNUNET_CONFIGURATION_Handle *cfg; +static const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Our DB plugin. + * Our database plugin. */ static struct TALER_EXCHANGEDB_Plugin *db_plugin; /** - * Head of list wire accounts of the exchange. + * Head of list of wire accounts of the exchange. */ static struct WireAccount *wa_head; /** - * Head of list wire accounts of the exchange. + * Tail of list of wire accounts of the exchange. */ static struct WireAccount *wa_tail; @@ -263,13 +285,7 @@ static struct GNUNET_SCHEDULER_Task *task; static struct WirePrepareData *wpd; /** - * If we are currently aggregating transactions, information about the - * active aggregation is here. Otherwise, this variable is NULL. - */ -static struct AggregationUnit *au; - -/** - * Handle to the context for interacting with the bank. + * Handle to the context for interacting with the bank / wire gateway. */ static struct GNUNET_CURL_Context *ctx; @@ -296,21 +312,11 @@ static int test_mode; /** * Did #run_reserve_closures() have any work during its last run? + * Used to detect when we should go to sleep for a while to avoid + * busy waiting. */ static int reserves_idle; -/** - * Limit on the number of transactions we aggregate at once. Note - * that the limit must be big enough to ensure that when transactions - * of the smallest possible unit are aggregated, they do surpass the - * "tiny" threshold beyond which we never trigger a wire transaction! - * - * Note: do not change here, Postgres requires us to hard-code the - * LIMIT in the prepared statement. - */ -static unsigned int aggregation_limit = - TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT; - /** * Main work function that finds and triggers transfers for reserves @@ -336,15 +342,14 @@ run_aggregation (void *cls); * Execute the wire transfers that we have committed to * do. * - * @param cls pointer to an `int` which we will return from main() + * @param cls NULL */ static void run_transfers (void *cls); /** - * Find the record valid at time @a now in the fee - * structure. + * Find the record valid at time @a now in the fee structure. * * @param wa wire transfer fee data structure to update * @param now timestamp to update fees to @@ -356,7 +361,6 @@ advance_fees (struct WireAccount *wa, { struct TALER_EXCHANGEDB_AggregateFees *af; - /* First, try to see if we have current fee information in memory */ af = wa->af; while ( (NULL != af) && (af->end_date.abs_value_us < now.abs_value_us) ) @@ -416,8 +420,9 @@ update_fees (struct WireAccount *wa, if (NULL != af) return af; GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to find current wire transfer fees for `%s'\n", - wa->method); + "Failed to find current wire transfer fees for `%s' at %s\n", + wa->method, + GNUNET_STRINGS_absolute_time_to_string (now)); return NULL; } @@ -435,6 +440,9 @@ find_account_by_method (const char *method) if (0 == strcmp (method, wa->method)) return wa; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No wire account known for method `%s'\n", + method); return NULL; } @@ -454,9 +462,9 @@ find_account_by_payto_uri (const char *url) method = TALER_payto_get_method (url); if (NULL == method) { - fprintf (stderr, - "Invalid payto:// URL `%s'\n", - url); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Invalid payto:// URL `%s'\n", + url); return NULL; } wa = find_account_by_method (method); @@ -496,6 +504,15 @@ add_account_cb (void *cls, return; } wa->method = TALER_payto_get_method (payto_uri); + if (NULL == wa->method) + { + GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR, + ai->section_name, + "PAYTO_URI", + "could not obtain wire method from URI"); + GNUNET_free (wa); + return; + } GNUNET_free (payto_uri); if (GNUNET_OK != TALER_BANK_auth_parse_cfg (cfg, @@ -517,21 +534,20 @@ add_account_cb (void *cls, /** - * Free data stored in #au. + * Free data stored in @a au, but not @a au itself (stack allocated). + * + * @param au aggreation unit to clean up */ static void -cleanup_au (void) +cleanup_au (struct AggregationUnit *au) { - if (NULL == au) - return; + GNUNET_assert (NULL != au); GNUNET_free_non_null (au->additional_rows); if (NULL != au->wire) - { json_decref (au->wire); - au->wire = NULL; - } - GNUNET_free (au); - au = NULL; + memset (au, + 0, + sizeof (*au)); } @@ -573,12 +589,6 @@ shutdown_task (void *cls) GNUNET_free (wpd); wpd = NULL; } - if (NULL != au) - { - db_plugin->rollback (db_plugin->cls, - au->session); - cleanup_au (); - } if (NULL != ctc) { db_plugin->rollback (db_plugin->cls, @@ -605,7 +615,6 @@ shutdown_task (void *cls) GNUNET_free (wa); } } - GNUNET_CONFIGURATION_destroy (cfg); cfg = NULL; } @@ -643,20 +652,20 @@ parse_wirewatch_config () if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "taler", - "currency", + "CURRENCY", &exchange_currency_string)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "taler", - "currency"); + "CURRENCY"); return GNUNET_SYSERR; } if (strlen (exchange_currency_string) >= TALER_CURRENCY_LEN) { - fprintf (stderr, - "Currency `%s' longer than the allowed limit of %u characters.", - exchange_currency_string, - (unsigned int) TALER_CURRENCY_LEN); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Currency `%s' longer than the allowed limit of %u characters.", + exchange_currency_string, + (unsigned int) TALER_CURRENCY_LEN); return GNUNET_SYSERR; } @@ -678,8 +687,8 @@ parse_wirewatch_config () if (NULL == (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) { - fprintf (stderr, - "Failed to initialize DB subsystem\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to initialize DB subsystem\n"); return GNUNET_SYSERR; } TALER_EXCHANGEDB_find_accounts (cfg, @@ -687,8 +696,8 @@ parse_wirewatch_config () NULL); if (NULL == wa_head) { - fprintf (stderr, - "No wire accounts configured for debit!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No wire accounts configured for debit!\n"); TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; return GNUNET_SYSERR; @@ -732,7 +741,7 @@ refund_by_coin_cb (void *cls, * Function called with details about deposits that have been made, * with the goal of executing the corresponding wire transaction. * - * @param cls NULL + * @param cls a `struct AggregationUnit` * @param row_id identifies database entry * @param merchant_pub public key of the merchant * @param coin_pub public key of the coin @@ -755,6 +764,7 @@ deposit_cb (void *cls, struct GNUNET_TIME_Absolute wire_deadline, const json_t *wire) { + struct AggregationUnit *au = cls; enum GNUNET_DB_QueryStatus qs; (void) cls; @@ -886,7 +896,7 @@ deposit_cb (void *cls, * Function called with details about another deposit we * can aggregate into an existing aggregation unit. * - * @param cls NULL + * @param cls a `struct AggregationUnit` * @param row_id identifies database entry * @param merchant_pub public key of the merchant * @param coin_pub public key of the coin @@ -909,16 +919,25 @@ aggregate_cb (void *cls, struct GNUNET_TIME_Absolute wire_deadline, const json_t *wire) { + struct AggregationUnit *au = cls; struct TALER_Amount delta; enum GNUNET_DB_QueryStatus qs; - (void) cls; /* NOTE: potential optimization: use custom SQL API to not fetch these: */ (void) wire_deadline; /* checked by SQL */ (void) wire; /* must match */ GNUNET_break (0 == GNUNET_memcmp (&au->merchant_pub, merchant_pub)); + + if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) + { + /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */ + GNUNET_break (0); + /* Skip this one, but keep going with the overall transaction */ + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + } + /* compute contribution of this coin after fees */ /* add to total */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -968,16 +987,10 @@ aggregate_cb (void *cls, au->total_amount = delta; } - if (au->rows_offset >= aggregation_limit) - { - /* Bug: we asked for at most #aggregation_limit results! */ - GNUNET_break (0); - /* Skip this one, but keep going. */ - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } if (NULL == au->additional_rows) - au->additional_rows = GNUNET_new_array (aggregation_limit, - unsigned long long); + au->additional_rows = GNUNET_new_array ( + TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT, + unsigned long long); /* "append" to our list of rows */ au->additional_rows[au->rows_offset++] = row_id; /* insert into aggregation tracking table */ @@ -990,9 +1003,6 @@ aggregate_cb (void *cls, GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return qs; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator marks aggregated deposit %llu as DONE\n", - (unsigned long long) row_id); qs = db_plugin->mark_deposit_done (db_plugin->cls, au->session, row_id); @@ -1002,7 +1012,7 @@ aggregate_cb (void *cls, return qs; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Added row %llu with %s to aggregation\n", + "Aggregator marked deposit %llu over %s as DONE\n", (unsigned long long) row_id, TALER_amount2s (&delta)); return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; @@ -1097,7 +1107,9 @@ expired_reserve_cb (void *cls, wa = find_account_by_payto_uri (account_payto_uri); if (NULL == wa) { - GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No wire account configured to deal with target URI `%s'\n", + account_payto_uri); global_ret = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown (); return GNUNET_DB_STATUS_HARD_ERROR; @@ -1127,7 +1139,7 @@ expired_reserve_cb (void *cls, if ( (GNUNET_SYSERR == ret) || (GNUNET_NO == ret) ) { - /* Closing fee higher than remaining balance, close + /* Closing fee higher than or equal to remaining balance, close without wire transfer. */ closing_fee = left; GNUNET_assert (GNUNET_OK == @@ -1345,6 +1357,7 @@ static void run_aggregation (void *cls) { static unsigned int swap; + struct AggregationUnit au_active; struct TALER_EXCHANGEDB_Session *session; enum GNUNET_DB_QueryStatus qs; const struct GNUNET_SCHEDULER_TaskContext *tc; @@ -1383,15 +1396,17 @@ run_aggregation (void *cls) GNUNET_SCHEDULER_shutdown (); return; } - au = GNUNET_new (struct AggregationUnit); - au->session = session; + memset (&au_active, + 0, + sizeof (au_active)); + au_active.session = session; qs = db_plugin->get_ready_deposit (db_plugin->cls, session, &deposit_cb, - au); + &au_active); if (0 >= qs) { - cleanup_au (); + cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls, session); if (GNUNET_DB_STATUS_HARD_ERROR == qs) @@ -1444,20 +1459,20 @@ run_aggregation (void *cls) /* Now try to find other deposits to aggregate */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Found ready deposit for %s, aggregating\n", - TALER_B2S (&au->merchant_pub)); + TALER_B2S (&au_active.merchant_pub)); qs = db_plugin->iterate_matching_deposits (db_plugin->cls, session, - &au->h_wire, - &au->merchant_pub, + &au_active.h_wire, + &au_active.merchant_pub, &aggregate_cb, - au, - aggregation_limit); + &au_active, + TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT); if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) || - (GNUNET_YES == au->failed) ) + (GNUNET_YES == au_active.failed) ) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute deposit iteration!\n"); - cleanup_au (); + cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls, session); global_ret = GNUNET_SYSERR; @@ -1471,6 +1486,7 @@ run_aggregation (void *cls) "Serialization issue, trying again later!\n"); db_plugin->rollback (db_plugin->cls, session); + cleanup_au (&au_active); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); @@ -1481,19 +1497,19 @@ run_aggregation (void *cls) wire transfer method; Check if after rounding down, we still have an amount to transfer, and if not mark as 'tiny'. */ if ( (GNUNET_OK != - TALER_amount_subtract (&au->final_amount, - &au->total_amount, - &au->wire_fee)) || + TALER_amount_subtract (&au_active.final_amount, + &au_active.total_amount, + &au_active.wire_fee)) || (GNUNET_SYSERR == - TALER_amount_round_down (&au->final_amount, + TALER_amount_round_down (&au_active.final_amount, ¤cy_round_unit)) || - ( (0 == au->final_amount.value) && - (0 == au->final_amount.fraction) ) ) + ( (0 == au_active.final_amount.value) && + (0 == au_active.final_amount.fraction) ) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Aggregate value too low for transfer (%d/%s)\n", qs, - TALER_amount2s (&au->final_amount)); + TALER_amount2s (&au_active.final_amount)); /* Rollback ongoing transaction, as we will not use the respective WTID and thus need to remove the tracking data */ db_plugin->rollback (db_plugin->cls, @@ -1509,21 +1525,21 @@ run_aggregation (void *cls) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); global_ret = GNUNET_SYSERR; - cleanup_au (); + cleanup_au (&au_active); GNUNET_SCHEDULER_shutdown (); return; } /* Mark transactions by row_id as minor */ qs = db_plugin->mark_deposit_tiny (db_plugin->cls, session, - au->row_id); + au_active.row_id); if (0 <= qs) { - for (unsigned int i = 0; i<au->rows_offset; i++) + for (unsigned int i = 0; i<au_active.rows_offset; i++) { qs = db_plugin->mark_deposit_tiny (db_plugin->cls, session, - au->additional_rows[i]); + au_active.additional_rows[i]); if (0 > qs) break; } @@ -1534,7 +1550,7 @@ run_aggregation (void *cls) "Serialization issue, trying again later!\n"); db_plugin->rollback (db_plugin->cls, session); - cleanup_au (); + cleanup_au (&au_active); /* start again */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, @@ -1545,13 +1561,14 @@ run_aggregation (void *cls) { db_plugin->rollback (db_plugin->cls, session); - cleanup_au (); + cleanup_au (&au_active); GNUNET_SCHEDULER_shutdown (); return; } /* commit */ (void) commit_or_warn (session); - cleanup_au (); + cleanup_au (&au_active); + /* start again */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, @@ -1561,34 +1578,34 @@ run_aggregation (void *cls) { char *amount_s; - amount_s = TALER_amount_to_string (&au->final_amount); + amount_s = TALER_amount_to_string (&au_active.final_amount); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Preparing wire transfer of %s to %s\n", amount_s, - TALER_B2S (&au->merchant_pub)); + TALER_B2S (&au_active.merchant_pub)); GNUNET_free (amount_s); } { char *url; - url = TALER_JSON_wire_to_payto (au->wire); + url = TALER_JSON_wire_to_payto (au_active.wire); TALER_BANK_prepare_transfer (url, - &au->final_amount, + &au_active.final_amount, exchange_base_url, - &au->wtid, + &au_active.wtid, &buf, &buf_size); GNUNET_free (url); } - GNUNET_free_non_null (au->additional_rows); - au->additional_rows = NULL; + GNUNET_free_non_null (au_active.additional_rows); + au_active.additional_rows = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Storing %u bytes of wire prepare data\n", (unsigned int) buf_size); /* Commit our intention to execute the wire transfer! */ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, session, - au->wa->method, + au_active.wa->method, buf, buf_size); GNUNET_free (buf); @@ -1597,12 +1614,13 @@ run_aggregation (void *cls) if (qs >= 0) qs = db_plugin->store_wire_transfer_out (db_plugin->cls, session, - au->execution_time, - &au->wtid, - au->wire, - au->wa->section_name, - &au->final_amount); - cleanup_au (); + au_active.execution_time, + &au_active.wtid, + au_active.wire, + au_active.wa->section_name, + &au_active.final_amount); + cleanup_au (&au_active); + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1912,10 +1930,9 @@ run (void *cls, (void) args; (void) cfgfile; - cfg = GNUNET_CONFIGURATION_dup (c); + cfg = c; if (GNUNET_OK != parse_wirewatch_config ()) { - GNUNET_CONFIGURATION_destroy (cfg); cfg = NULL; global_ret = 1; return; @@ -1966,7 +1983,7 @@ main (int argc, GNUNET_PROGRAM_run (argc, argv, "taler-exchange-aggregator", gettext_noop ( - "background process that aggregates and executes wire transfers to merchants"), + "background process that aggregates and executes wire transfers"), options, &run, NULL)) { |