From 79c316f0d55ef404fbb2c2eea6182eb31e865c79 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 3 May 2016 07:57:49 +0200 Subject: enable multiple wire methods to be used with aggregator, add command to run aggregator in testcase --- doc/taler-exchange-aggregator.1 | 3 - src/exchange-lib/test_exchange_api.c | 126 ++++++++++------ src/exchange/taler-exchange-aggregator.c | 225 ++++++++++++++++++++-------- src/exchangedb/plugin_exchangedb_postgres.c | 17 ++- src/include/taler_exchangedb_plugin.h | 4 +- src/include/taler_signatures.h | 4 +- 6 files changed, 261 insertions(+), 118 deletions(-) diff --git a/doc/taler-exchange-aggregator.1 b/doc/taler-exchange-aggregator.1 index d0da4cbca..8525443e5 100644 --- a/doc/taler-exchange-aggregator.1 +++ b/doc/taler-exchange-aggregator.1 @@ -16,9 +16,6 @@ taler\-exchange\-aggregator \- Aggregate and execute exchange transactions .IP "\-d DIRNAME, \-\-exchange-dir=DIRNAME" Use the configuration and other resources for the exchange to operate from DIRNAME. .B -.IP "\-f WIREFORMAT, \-\-format=WIREFORMAT" -Overrides WIREFORMAT option from the configuation file. -.B .IP "\-h, \-\-help" Print short help on options. .B diff --git a/src/exchange-lib/test_exchange_api.c b/src/exchange-lib/test_exchange_api.c index e48c59b2a..addb4d741 100644 --- a/src/exchange-lib/test_exchange_api.c +++ b/src/exchange-lib/test_exchange_api.c @@ -122,7 +122,12 @@ enum OpCode /** * Verify exchange's /deposit/wtid method. */ - OC_DEPOSIT_WTID + OC_DEPOSIT_WTID, + + /** + * Run the aggregator to execute deposits. + */ + OC_RUN_AGGREGATOR }; @@ -535,6 +540,15 @@ struct Command } deposit_wtid; + struct { + + /** + * Process for the aggregator. + */ + struct GNUNET_OS_Process *aggregator_proc; + + } run_aggregator; + } details; }; @@ -623,6 +637,20 @@ static void interpreter_run (void *cls); +/** + * Run the next command with the interpreter. + * + * @param is current interpeter state. + */ +static void +next_command (struct InterpreterState *is) +{ + is->ip++; + is->task = GNUNET_SCHEDULER_add_now (&interpreter_run, + is); +} + + /** * Function called upon completion of our /admin/add/incoming request. * @@ -646,9 +674,7 @@ add_incoming_cb (void *cls, fail (is); return; } - is->ip++; - is->task = GNUNET_SCHEDULER_add_now (&interpreter_run, - is); + next_command (is); } @@ -833,9 +859,7 @@ reserve_status_cb (void *cls, GNUNET_break (0); break; } - is->ip++; - is->task = GNUNET_SCHEDULER_add_now (&interpreter_run, - is); + next_command (is); } @@ -889,9 +913,7 @@ reserve_withdraw_cb (void *cls, GNUNET_break (0); break; } - is->ip++; - is->task = GNUNET_SCHEDULER_add_now (&interpreter_run, - is); + next_command (is); } @@ -923,9 +945,7 @@ deposit_cb (void *cls, fail (is); return; } - is->ip++; - is->task = GNUNET_SCHEDULER_add_now (&interpreter_run, - is); + next_command (is); } @@ -960,9 +980,7 @@ melt_cb (void *cls, return; } cmd->details.refresh_melt.noreveal_index = noreveal_index; - is->ip++; - is->task = GNUNET_SCHEDULER_add_now (&interpreter_run, - is); + next_command (is); } @@ -1024,10 +1042,7 @@ reveal_cb (void *cls, default: break; } - - is->ip++; - is->task = GNUNET_SCHEDULER_add_now (&interpreter_run, - is); + next_command (is); } @@ -1124,9 +1139,7 @@ link_cb (void *cls, default: break; } - is->ip++; - is->task = GNUNET_SCHEDULER_add_now (&interpreter_run, - is); + next_command (is); } @@ -1236,9 +1249,7 @@ wire_cb (void *cls, default: break; } - is->ip++; - is->task = GNUNET_SCHEDULER_add_now (&interpreter_run, - is); + next_command (is); } @@ -1325,11 +1336,7 @@ wire_deposits_cb (void *cls, default: break; } - - /* move to next command */ - is->ip++; - is->task = GNUNET_SCHEDULER_add_now (&interpreter_run, - is); + next_command (is); } @@ -1377,11 +1384,7 @@ deposit_wtid_cb (void *cls, default: break; } - - /* move to next command */ - is->ip++; - is->task = GNUNET_SCHEDULER_add_now (&interpreter_run, - is); + next_command (is); } @@ -1932,15 +1935,38 @@ interpreter_run (void *cls) json_decref (contract); cmd->details.deposit_wtid.dwh = TALER_EXCHANGE_deposit_wtid (exchange, - &ref->details.deposit.merchant_priv, - &h_wire, - &h_contract, - &coin_pub, - ref->details.deposit.transaction_id, - &deposit_wtid_cb, - is); + &ref->details.deposit.merchant_priv, + &h_wire, + &h_contract, + &coin_pub, + ref->details.deposit.transaction_id, + &deposit_wtid_cb, + is); } return; + case OC_RUN_AGGREGATOR: + { + cmd->details.run_aggregator.aggregator_proc + = GNUNET_OS_start_process (GNUNET_NO, + GNUNET_OS_INHERIT_STD_ALL, + NULL, NULL, NULL, + "taler-exchange-aggregator", + "taler-exchange-aggregator", + "-c", "test_exchange_api.conf", + "-t", /* exit when done */ + NULL); + if (NULL == cmd->details.run_aggregator.aggregator_proc) + { + GNUNET_break (0); + fail (is); + return; + } + GNUNET_OS_process_wait (cmd->details.run_aggregator.aggregator_proc); + GNUNET_OS_process_destroy (cmd->details.run_aggregator.aggregator_proc); + cmd->details.run_aggregator.aggregator_proc = NULL; + next_command (is); + return; + } default: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unknown instruction %d at %u (%s)\n", @@ -2122,6 +2148,17 @@ do_shutdown (void *cls) cmd->details.deposit_wtid.dwh = NULL; } break; + case OC_RUN_AGGREGATOR: + if (NULL != cmd->details.run_aggregator.aggregator_proc) + { + GNUNET_break (0 == + GNUNET_OS_process_kill (cmd->details.run_aggregator.aggregator_proc, + SIGKILL)); + GNUNET_OS_process_wait (cmd->details.run_aggregator.aggregator_proc); + GNUNET_OS_process_destroy (cmd->details.run_aggregator.aggregator_proc); + cmd->details.run_aggregator.aggregator_proc = NULL; + } + break; default: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unknown instruction %d at %u (%s)\n", @@ -2425,6 +2462,9 @@ run (void *cls) .label = "wire-deposit-failing", .expected_response_code = MHD_HTTP_NOT_FOUND }, + { .oc = OC_RUN_AGGREGATOR, + .label = "run-aggregator" }, + /* TODO: trigger aggregation logic and then check the cases where tracking succeeds! */ diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 4732e4e18..aeafd60f6 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -29,6 +29,35 @@ #include "taler_wire_lib.h" + + +/** + * Information we keep for each loaded wire plugin. + */ +struct WirePlugin +{ + /** + * Plugins are kept in a DLL. + */ + struct WirePlugin *next; + + /** + * Plugins are kept in a DLL. + */ + struct WirePlugin *prev; + + /** + * Handle to the plugin. + */ + struct TALER_WIRE_Plugin *wire_plugin; + + /** + * Name of the plugin. + */ + char *type; +}; + + /** * Data we keep to #run_transfers(). There is at most * one of these around at any given point in time. @@ -46,6 +75,11 @@ struct WirePrepareData */ struct TALER_WIRE_ExecuteHandle *eh; + /** + * Wire plugin used for this preparation. + */ + struct WirePlugin *wp; + /** * Row ID of the transfer. */ @@ -95,6 +129,11 @@ struct AggregationUnit */ json_t *wire; + /** + * Wire plugin to be used for the preparation. + */ + struct WirePlugin *wp; + /** * Database session for all of our transactions. */ @@ -129,11 +168,6 @@ struct AggregationUnit */ static char *exchange_currency_string; -/** - * Which wireformat should be supported by this aggregator? - */ -static char *exchange_wireformat; - /** * The exchange's configuration (global) */ @@ -145,9 +179,14 @@ static struct GNUNET_CONFIGURATION_Handle *cfg; static struct TALER_EXCHANGEDB_Plugin *db_plugin; /** - * Our wire plugin. + * Head of list of loaded wire plugins. + */ +static struct WirePlugin *wp_head; + +/** + * Tail of list of loaded wire plugins. */ -static struct TALER_WIRE_Plugin *wire_plugin; +static struct WirePlugin *wp_tail; /** * Next task to run, if any. @@ -189,6 +228,69 @@ static int test_mode; static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT; +/** + * Extract wire plugin type from @a wire address + * + * @param wire a wire address + * @return NULL if @a wire is ill-formed + */ +const char * +extract_type (const json_t *wire) +{ + const char *type; + json_t *t; + + t = json_object_get (wire, "type"); + if (NULL == t) + { + GNUNET_break (0); + return NULL; + } + type = json_string_value (t); + if (NULL == type) + { + GNUNET_break (0); + return NULL; + } + return type; +} + + +/** + * Find the wire plugin for the given wire address. + * + * @param type wire plugin type we need a plugin for + * @return NULL on error + */ +static struct WirePlugin * +find_plugin (const char *type) +{ + struct WirePlugin *wp; + + if (NULL == type) + return NULL; + for (wp = wp_head; NULL != wp; wp = wp->next) + if (0 == strcmp (type, + wp->type)) + return wp; + wp = GNUNET_new (struct WirePlugin); + wp->wire_plugin = TALER_WIRE_plugin_load (cfg, + type); + if (NULL == wp->wire_plugin) + { + fprintf (stderr, + "Failed to load wire plugin for `%s'\n", + type); + GNUNET_free (wp); + return NULL; + } + wp->type = GNUNET_strdup (type); + GNUNET_CONTAINER_DLL_insert (wp_head, + wp_tail, + wp); + return wp; +} + /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. * @@ -197,6 +299,8 @@ static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT static void shutdown_task (void *cls) { + struct WirePlugin *wp; + if (NULL != task) { GNUNET_SCHEDULER_cancel (task); @@ -206,8 +310,8 @@ shutdown_task (void *cls) { if (NULL != wpd->eh) { - wire_plugin->execute_wire_transfer_cancel (wire_plugin->cls, - wpd->eh); + wpd->wp->wire_plugin->execute_wire_transfer_cancel (wpd->wp->wire_plugin->cls, + wpd->eh); wpd->eh = NULL; } db_plugin->rollback (db_plugin->cls, @@ -219,8 +323,8 @@ shutdown_task (void *cls) { if (NULL != au->ph) { - wire_plugin->prepare_wire_transfer_cancel (wire_plugin->cls, - au->ph); + au->wp->wire_plugin->prepare_wire_transfer_cancel (au->wp->wire_plugin->cls, + au->ph); au->ph = NULL; } db_plugin->rollback (db_plugin->cls, @@ -232,7 +336,15 @@ shutdown_task (void *cls) GNUNET_free (au); } TALER_EXCHANGEDB_plugin_unload (db_plugin); - TALER_WIRE_plugin_unload (wire_plugin); + while (NULL != (wp = wp_head)) + { + GNUNET_CONTAINER_DLL_remove (wp_head, + wp_tail, + wp); + TALER_WIRE_plugin_unload (wp->wire_plugin); + GNUNET_free (wp->type); + GNUNET_free (wp); + } GNUNET_CONFIGURATION_destroy (cfg); cfg = NULL; } @@ -266,22 +378,6 @@ exchange_serve_process_config () (unsigned int) TALER_CURRENCY_LEN); return GNUNET_SYSERR; } - if (NULL != exchange_wireformat) - GNUNET_CONFIGURATION_set_value_string (cfg, - "exchange", - "wireformat", - exchange_wireformat); - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_string (cfg, - "exchange", - "wireformat", - &exchange_wireformat)) - { - GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, - "exchange", - "wireformat"); - return GNUNET_SYSERR; - } if (NULL == (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) @@ -291,15 +387,6 @@ exchange_serve_process_config () return GNUNET_SYSERR; } - if (NULL == - (wire_plugin = TALER_WIRE_plugin_load (cfg, - exchange_wireformat))) - { - fprintf (stderr, - "Failed to load wire plugin for `%s'\n", - exchange_wireformat); - return GNUNET_SYSERR; - } return GNUNET_OK; } @@ -511,6 +598,7 @@ run_aggregation (void *cls) unsigned int i; int ret; const struct GNUNET_SCHEDULER_TaskContext *tc; + struct WirePlugin *wp; task = NULL; tc = GNUNET_SCHEDULER_get_task_context (); @@ -571,6 +659,19 @@ run_aggregation (void *cls) } return; } + + wp = find_plugin (extract_type (au->wire)); + if (NULL == wp) + { + json_decref (au->wire); + GNUNET_free (au); + au = NULL; + db_plugin->rollback (db_plugin->cls, + session); + GNUNET_SCHEDULER_shutdown (); + return; + } + /* Now try to find other deposits to aggregate */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Found ready deposit for %s, aggregating\n", @@ -588,8 +689,7 @@ run_aggregation (void *cls) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute deposit iteration!\n"); GNUNET_free_non_null (au->additional_rows); - if (NULL != au->wire) - json_decref (au->wire); + json_decref (au->wire); GNUNET_free (au); au = NULL; db_plugin->rollback (db_plugin->cls, @@ -597,10 +697,11 @@ run_aggregation (void *cls) global_ret = GNUNET_SYSERR; return; } + /* Round to the unit supported by the wire transfer method */ GNUNET_assert (GNUNET_SYSERR != - wire_plugin->amount_round (wire_plugin->cls, - &au->total_amount)); + wp->wire_plugin->amount_round (wp->wire_plugin->cls, + &au->total_amount)); /* Check if after rounding down, we still have an amount to transfer */ if ( (0 == au->total_amount.value) && (0 == au->total_amount.fraction) ) @@ -668,12 +769,13 @@ run_aggregation (void *cls) amount_s, TALER_B2S (&au->merchant_pub)); } - au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls, - au->wire, - &au->total_amount, - &au->wtid, - &prepare_cb, - au); + au->wp = wp; + au->ph = wp->wire_plugin->prepare_wire_transfer (wp->wire_plugin->cls, + au->wire, + &au->total_amount, + &au->wtid, + &prepare_cb, + au); if (NULL == au->ph) { GNUNET_break (0); /* why? how to best recover? */ @@ -717,11 +819,9 @@ prepare_cb (void *cls, { struct TALER_EXCHANGEDB_Session *session = au->session; - GNUNET_free_non_null (au->additional_rows); if (NULL != au->wire) json_decref (au->wire); - GNUNET_free (au); - au = NULL; + GNUNET_free_non_null (au->additional_rows); if (NULL == buf) { GNUNET_break (0); /* why? how to best recover? */ @@ -730,6 +830,8 @@ prepare_cb (void *cls, /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); + GNUNET_free (au); + au = NULL; return; } @@ -737,7 +839,7 @@ prepare_cb (void *cls, if (GNUNET_OK != db_plugin->wire_prepare_data_insert (db_plugin->cls, session, - exchange_wireformat, + au->wp->type, buf, buf_size)) { @@ -747,8 +849,12 @@ prepare_cb (void *cls, /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); + GNUNET_free (au); + au = NULL; return; } + GNUNET_free (au); + au = NULL; /* Now we can finally commit the overall transaction, as we are again consistent if all of this passes. */ @@ -839,12 +945,14 @@ wire_confirm_cb (void *cls, * * @param cls NULL * @param rowid row identifier used to mark prepared transaction as done + * @param wire_method wire method the preparation was done for * @param buf transaction data that was persisted, NULL on error * @param buf_size number of bytes in @a buf, 0 on error */ static void wire_prepare_cb (void *cls, unsigned long long rowid, + const char *wire_method, const char *buf, size_t buf_size) { @@ -852,11 +960,12 @@ wire_prepare_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting wire transfer %llu\n", rowid); - wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, - buf, - buf_size, - &wire_confirm_cb, - NULL); + wpd->wp = find_plugin (wire_method); + wpd->eh = wpd->wp->wire_plugin->execute_wire_transfer (wpd->wp->wire_plugin->cls, + buf, + buf_size, + &wire_confirm_cb, + NULL); if (NULL == wpd->eh) { GNUNET_break (0); /* why? how to best recover? */ @@ -910,7 +1019,6 @@ run_transfers (void *cls) wpd->session = session; ret = db_plugin->wire_prepare_data_get (db_plugin->cls, session, - exchange_wireformat, &wire_prepare_cb, NULL); if (GNUNET_SYSERR == ret) @@ -981,9 +1089,6 @@ main (int argc, char *const *argv) { static const struct GNUNET_GETOPT_CommandLineOption options[] = { - {'f', "format", "WIREFORMAT", - "wireformat to use, overrides WIREFORMAT option in [exchange] section", 1, - &GNUNET_GETOPT_set_filename, &exchange_wireformat}, {'t', "test", NULL, "run in test mode and exit when idle", 0, &GNUNET_GETOPT_set_one, &test_mode}, diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 1cc64ce4d..612dabb79 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -1156,14 +1156,14 @@ postgres_prepare (PGconn *db_conn) PREPARE ("wire_prepare_data_get", "SELECT" " serial_id" + ",type" ",buf" " FROM prewire" " WHERE" - " type=$1 AND" " finished=false" " ORDER BY serial_id ASC" " LIMIT 1", - 1, NULL); + 0, NULL); return GNUNET_OK; #undef PREPARE @@ -4115,7 +4115,6 @@ postgres_wire_prepare_data_mark_finished (void *cls, * * @param cls closure * @param session database connection - * @param type type fo the wire transfer (i.e. "sepa") * @param cb function to call for ONE unfinished item * @param cb_cls closure for @a cb * @return #GNUNET_OK on success, @@ -4125,13 +4124,11 @@ postgres_wire_prepare_data_mark_finished (void *cls, static int postgres_wire_prepare_data_get (void *cls, struct TALER_EXCHANGEDB_Session *session, - const char *type, TALER_EXCHANGEDB_WirePreparationCallback cb, void *cb_cls) { PGresult *result; struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_string (type), GNUNET_PQ_query_param_end }; @@ -4158,14 +4155,17 @@ postgres_wire_prepare_data_get (void *cls, { uint64_t serial_id; + char *type; void *buf = NULL; size_t buf_size; struct GNUNET_PQ_ResultSpec rs[] = { GNUNET_PQ_result_spec_uint64 ("serial_id", - &serial_id), + &serial_id), + GNUNET_PQ_result_spec_string ("type", + &type), GNUNET_PQ_result_spec_variable_size ("buf", - &buf, - &buf_size), + &buf, + &buf_size), GNUNET_PQ_result_spec_end }; @@ -4180,6 +4180,7 @@ postgres_wire_prepare_data_get (void *cls, } cb (cb_cls, serial_id, + type, buf, buf_size); GNUNET_PQ_cleanup_result (rs); diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index ebcfe3757..8e00be751 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -684,12 +684,14 @@ typedef void * * @param cls closure * @param rowid row identifier used to mark prepared transaction as done + * @param wire_method which wire method is this preparation data for * @param buf transaction data that was persisted, NULL on error * @param buf_size number of bytes in @a buf, 0 on error */ typedef void (*TALER_EXCHANGEDB_WirePreparationCallback) (void *cls, unsigned long long rowid, + const char *wire_method, const char *buf, size_t buf_size); @@ -1479,7 +1481,6 @@ struct TALER_EXCHANGEDB_Plugin * * @param cls closure * @param session database connection - * @param type type fo the wire transfer (i.e. "sepa") * @param cb function to call for ONE unfinished item * @param cb_cls closure for @a cb * @return #GNUNET_OK on success, @@ -1489,7 +1490,6 @@ struct TALER_EXCHANGEDB_Plugin int (*wire_prepare_data_get)(void *cls, struct TALER_EXCHANGEDB_Session *session, - const char *type, TALER_EXCHANGEDB_WirePreparationCallback cb, void *cb_cls); diff --git a/src/include/taler_signatures.h b/src/include/taler_signatures.h index 4ce57b0f8..e338916e1 100644 --- a/src/include/taler_signatures.h +++ b/src/include/taler_signatures.h @@ -418,7 +418,7 @@ struct TALER_RefundRequestPS /** * The coin's public key. This is the value that must have been - * signed (blindly) by the Exchange. + * signed (blindly) by the Exchange. */ struct TALER_CoinSpendPublicKeyP coin_pub; @@ -912,7 +912,7 @@ struct TALER_DepositTrackPS /** * @brief Format internally used for packing the detailed information - * to generate the signature for /wire/deposit signatures. + * to generate the signature for /wire/deposits signatures. */ struct TALER_WireDepositDetailP { -- cgit v1.2.3