diff options
author | Christian Grothoff <christian@grothoff.org> | 2020-03-15 21:20:56 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2020-03-15 21:20:56 +0100 |
commit | d3f7cc11842a3e2a574431919179d037e56715ea (patch) | |
tree | 8b4c14787de5815432ed28ed6d550dbd4c58eed2 | |
parent | c898a1e13b1f06dafec35051dfd232510bd28de3 (diff) |
clean up wirewatch logic
-rw-r--r-- | src/exchange/exchange.conf | 8 | ||||
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 249 |
2 files changed, 151 insertions, 106 deletions
diff --git a/src/exchange/exchange.conf b/src/exchange/exchange.conf index 8144bddcd..9de198949 100644 --- a/src/exchange/exchange.conf +++ b/src/exchange/exchange.conf @@ -54,9 +54,15 @@ PORT = 8081 BASE_URL = http://localhost:8081/ -# How long should the aggregator sleep if it has nothing to do? +# How long should the aggregator (and closer, and transfer) +# sleep if it has nothing to do? AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s +# How long should wirewatch sleep if it has nothing to do? +# (Set very aggressively here for the demonstrators to be +# super fast.) +WIREWATCH_IDLE_SLEEP_INTERVAL = 1 s + # how long is one signkey valid? SIGNKEY_DURATION = 4 weeks diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 3731f6633..04bf21698 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016, 2017, 2018 Taler Systems SA + Copyright (C) 2016--2020 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -29,12 +29,13 @@ #include "taler_json_lib.h" #include "taler_bank_service.h" +#define DEBUG_LOGGING 0 + /** - * How long do we sleep before trying again if there - * are no transactions returned by the wire plugin? + * What is the initial batch size we use for credit history + * requests with the bank. See `batch_size` below. */ -#define DELAY GNUNET_TIME_UNIT_SECONDS - +#define INITIAL_BATCH_SIZE 1024 /** * Information we keep for each supported account. @@ -57,20 +58,58 @@ struct WireAccount char *section_name; /** + * Database session we are using for the current transaction. + */ + struct TALER_EXCHANGEDB_Session *session; + + /** + * Active request for history. + */ + struct TALER_BANK_CreditHistoryHandle *hh; + + /** * Authentication data. */ struct TALER_BANK_AuthenticationData auth; /** + * Until when is processing this wire plugin delayed? + */ + struct GNUNET_TIME_Absolute delayed_until; + + /** + * Encoded offset in the wire transfer list from where + * to start the next query with the bank. + */ + uint64_t last_row_off; + + /** + * Latest row offset seen in this transaction, becomes + * the new #last_row_off upon commit. + */ + uint64_t latest_row_off; + + /** + * How many transactions do we retrieve per batch? + */ + unsigned int batch_size; + + /** + * How many transactions did we see in the current batch? + */ + unsigned int current_batch_size; + + /** * Are we running from scratch and should re-process all transactions * for this account? */ int reset_mode; /** - * Until when is processing this wire plugin delayed? + * Should we delay the next request to the wire plugin a bit? Set to + * #GNUNET_NO if we actually did some work. */ - struct GNUNET_TIME_Absolute delayed_until; + int delay; }; @@ -86,7 +125,8 @@ static struct WireAccount *wa_head; static struct WireAccount *wa_tail; /** - * Wire plugin we are currently using. + * Wire account we are currently processing. This would go away + * if we ever start processing all accounts in parallel. */ static struct WireAccount *wa_pos; @@ -111,27 +151,25 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; static struct TALER_EXCHANGEDB_Plugin *db_plugin; /** - * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR - * on serious errors. - */ -static int global_ret; - -/** - * Encoded offset in the wire transfer list from where - * to start the next query with the bank. + * How long should we sleep when idle before trying to find more work? */ -static uint64_t last_row_off; +static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval; /** - * Latest row offset seen in this transaction, becomes - * the new #last_row_off upon commit. - */ -static uint64_t latest_row_off; - -/** - * Should we delay the next request to the wire plugin a bit? + * Value to return from main(). 0 on success, non-zero on + * on serious errors. */ -static int delay; +static enum +{ + GR_SUCCESS = 0, + GR_DATABASE_SESSION_FAIL = 1, + GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2, + GR_DATABASE_SELECT_LATEST_HARD_FAIL = 3, + GR_BANK_REQUEST_HISTORY_FAIL = 4, + GR_CONFIGURATION_INVALID = 5, + GR_CMD_LINE_UTF8_ERROR = 6, + GR_CMD_LINE_OPTIONS_WRONG = 7, +} global_ret; /** * Are we run in testing mode and should only do one pass? @@ -144,25 +182,10 @@ static int test_mode; static int reset_mode; /** - * How many transactions do we retrieve per batch? - */ -static unsigned int batch_size = 1024; - -/** - * How many transactions did we see in the current batch? - */ -static unsigned int current_batch_size; - -/** - * Next task to run, if any. + * Current task waiting for execution, if any. */ static struct GNUNET_SCHEDULER_Task *task; -/** - * Active request for history. - */ -static struct TALER_BANK_CreditHistoryHandle *hh; - /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. @@ -173,11 +196,26 @@ static void shutdown_task (void *cls) { (void) cls; - if (NULL != hh) { - TALER_BANK_credit_history_cancel (hh); - hh = NULL; + struct WireAccount *wa; + + while (NULL != (wa = wa_head)) + { + if (NULL != wa->hh) + { + TALER_BANK_credit_history_cancel (wa->hh); + wa->hh = NULL; + } + GNUNET_CONTAINER_DLL_remove (wa_head, + wa_tail, + wa); + TALER_BANK_auth_free (&wa->auth); + GNUNET_free (wa->section_name); + GNUNET_free (wa); + } } + wa_pos = NULL; + if (NULL != ctx) { GNUNET_CURL_fini (ctx); @@ -195,21 +233,6 @@ shutdown_task (void *cls) } TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; - { - struct WireAccount *wa; - - while (NULL != (wa = wa_head)) - { - GNUNET_CONTAINER_DLL_remove (wa_head, - wa_tail, - wa); - TALER_BANK_auth_free (&wa->auth); - GNUNET_free (wa->section_name); - GNUNET_free (wa); - } - } - wa_pos = NULL; - last_row_off = 0; } @@ -243,6 +266,7 @@ add_account_cb (void *cls, return; } wa->section_name = GNUNET_strdup (ai->section_name); + wa->batch_size = INITIAL_BATCH_SIZE; GNUNET_CONTAINER_DLL_insert (wa_head, wa_tail, wa); @@ -258,6 +282,17 @@ add_account_cb (void *cls, static int exchange_serve_process_config (void) { + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_time (cfg, + "exchange", + "WIREWATCH_IDLE_SLEEP_INTERVAL", + &wirewatch_idle_sleep_interval)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "exchange", + "WIREWATCH_IDLE_SLEEP_INTERVAL"); + return GNUNET_SYSERR; + } if (NULL == (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) { @@ -292,7 +327,7 @@ find_transfers (void *cls); * Callbacks of this type are used to serve the result of asking * the bank for the transaction history. * - * @param cls closure with the `struct TALER_EXCHANGEDB_Session *` + * @param cls closure with the `struct WioreAccount *` we are processing * @param http_status HTTP status code from the server * @param ec taler error code * @param serial_id identification of the position at which we are querying @@ -308,13 +343,14 @@ history_cb (void *cls, const struct TALER_BANK_CreditDetails *details, const json_t *json) { - struct TALER_EXCHANGEDB_Session *session = cls; + struct WireAccount *wa = cls; + struct TALER_EXCHANGEDB_Session *session = wa->session; enum GNUNET_DB_QueryStatus qs; (void) json; if (NULL == details) { - hh = NULL; + wa->hh = NULL; if (TALER_EC_NONE != ec) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -331,8 +367,8 @@ history_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got DB soft error for commit\n"); /* reduce transaction size to reduce rollback probability */ - if (2 > current_batch_size) - current_batch_size /= 2; + if (2 > wa->current_batch_size) + wa->current_batch_size /= 2; /* try again */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&find_transfers, @@ -342,27 +378,28 @@ history_cb (void *cls, if (0 < qs) { /* transaction success, update #last_row_off */ - last_row_off = latest_row_off; - latest_row_off = 0; - + wa->last_row_off = wa->latest_row_off; + wa->latest_row_off = 0; /* should not be needed */ + wa->session = NULL; /* should not be needed */ /* if successful at limit, try increasing transaction batch size (AIMD) */ - if (current_batch_size == batch_size) - batch_size++; + if ( (wa->current_batch_size == wa->batch_size) && + (UINT_MAX > wa->batch_size) ) + wa->batch_size++; } GNUNET_break (0 <= qs); - if ( (GNUNET_YES == delay) && + if ( (GNUNET_YES == wa->delay) && (test_mode) && - (NULL == wa_pos->next) ) + (NULL == wa->next) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Shutdown due to test mode!\n"); GNUNET_SCHEDULER_shutdown (); return GNUNET_OK; } - if (GNUNET_YES == delay) + if (GNUNET_YES == wa->delay) { - wa_pos->delayed_until - = GNUNET_TIME_relative_to_absolute (DELAY); + wa->delayed_until + = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); wa_pos = wa_pos->next; if (NULL == wa_pos) wa_pos = wa_head; @@ -381,29 +418,32 @@ history_cb (void *cls, /** * Debug block. */ +#if DEBUG_LOGGING { -/* Should be 53, give 80 just to be redundant. */ + /** Should be 53, give 80 just to be extra conservative (and aligned). */ #define PUBSIZE 80 char wtid_s[PUBSIZE]; - GNUNET_break - (NULL != GNUNET_STRINGS_data_to_string (&details->reserve_pub, - sizeof (details->reserve_pub), - &wtid_s[0], - PUBSIZE)); + GNUNET_break (NULL != + GNUNET_STRINGS_data_to_string (&details->reserve_pub, + sizeof (details->reserve_pub), + &wtid_s[0], + PUBSIZE)); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Plain text subject (= reserve_pub): %s\n", wtid_s); } +#endif - current_batch_size++; + if (wa->current_batch_size < UINT_MAX) + wa->current_batch_size++; qs = db_plugin->reserves_in_insert (db_plugin->cls, session, &details->reserve_pub, &details->amount, details->execution_date, details->debit_account_url, - wa_pos->section_name, + wa->section_name, serial_id); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { @@ -425,8 +465,8 @@ history_cb (void *cls, NULL); return GNUNET_SYSERR; } - - latest_row_off = serial_id; + wa->delay = GNUNET_NO; + wa->latest_row_off = serial_id; return GNUNET_OK; } @@ -446,12 +486,11 @@ find_transfers (void *cls) task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Checking for incoming wire transfers\n"); - if (NULL == (session = db_plugin->get_session (db_plugin->cls))) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database session!\n"); - global_ret = GNUNET_SYSERR; + global_ret = GR_DATABASE_SESSION_FAIL; GNUNET_SCHEDULER_shutdown (); return; } @@ -464,7 +503,7 @@ find_transfers (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); - global_ret = GNUNET_SYSERR; + global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL; GNUNET_SCHEDULER_shutdown (); return; } @@ -473,14 +512,14 @@ find_transfers (void *cls) qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls, session, wa_pos->section_name, - &last_row_off); + &wa_pos->last_row_off); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain starting point for montoring from database!\n"); db_plugin->rollback (db_plugin->cls, session); - global_ret = GNUNET_SYSERR; + global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL; GNUNET_SCHEDULER_shutdown (); return; } @@ -493,28 +532,28 @@ find_transfers (void *cls) NULL); return; } + wa_pos->reset_mode = GNUNET_NO; } - wa_pos->reset_mode = GNUNET_NO; - delay = GNUNET_YES; - current_batch_size = 0; + wa_pos->delay = GNUNET_YES; + wa_pos->current_batch_size = 0; /* reset counter */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "wirewatch: requesting incoming history from %s\n", wa_pos->auth.wire_gateway_url); - - hh = TALER_BANK_credit_history (ctx, - &wa_pos->auth, - last_row_off, - batch_size, - &history_cb, - session); - if (NULL == hh) + wa_pos->session = session; + wa_pos->hh = TALER_BANK_credit_history (ctx, + &wa_pos->auth, + wa_pos->last_row_off, + wa_pos->batch_size, + &history_cb, + wa_pos); + if (NULL == wa_pos->hh) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start request for account history!\n"); db_plugin->rollback (db_plugin->cls, session); - global_ret = GNUNET_SYSERR; + global_ret = GR_BANK_REQUEST_HISTORY_FAIL; GNUNET_SCHEDULER_shutdown (); return; } @@ -542,7 +581,7 @@ run (void *cls, if (GNUNET_OK != exchange_serve_process_config ()) { - global_ret = 1; + global_ret = GR_CONFIGURATION_INVALID; return; } wa_pos = wa_head; @@ -567,7 +606,7 @@ run (void *cls, * * @param argc number of arguments from the command line * @param argv command line arguments - * @return 0 ok, 1 on error + * @return 0 ok, non-zero on error */ int main (int argc, @@ -590,7 +629,7 @@ main (int argc, if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) - return 2; + return GR_CMD_LINE_UTF8_ERROR; if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "taler-exchange-wirewatch", @@ -600,7 +639,7 @@ main (int argc, &run, NULL)) { GNUNET_free ((void *) argv); - return 1; + return GR_CMD_LINE_OPTIONS_WRONG; } GNUNET_free ((void *) argv); return global_ret; |