From 30997afc7fc81f1fa6af85c754390209d0200a67 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 17 Nov 2022 21:50:20 +0100 Subject: -more work on wirewatch revision --- src/exchange/taler-exchange-wirewatch.c | 814 ++++++++++++++------------------ 1 file changed, 358 insertions(+), 456 deletions(-) (limited to 'src') diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index d84344fc8..0d902bf25 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -13,7 +13,6 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see */ - /** * @file taler-exchange-wirewatch.c * @brief Process that watches for wire transfers to the exchange's bank account @@ -43,122 +42,88 @@ #define MAXIMUM_BATCH_SIZE 1024 /** - * Information we keep for each supported account. + * Information about our account. */ -struct WireAccount -{ - /** - * Accounts are kept in a DLL. - */ - struct WireAccount *next; - - /** - * Plugins are kept in a DLL. - */ - struct WireAccount *prev; - - /** - * Information about this account. - */ - const struct TALER_EXCHANGEDB_AccountInfo *ai; - - /** - * Active request for history. - */ - struct TALER_BANK_CreditHistoryHandle *hh; - - /** - * Until when is processing this wire plugin delayed? - */ - struct GNUNET_TIME_Absolute delayed_until; - - /** - * Encoded offset in the wire transfer list from where - * to start the next query with the bank. - */ - uint64_t batch_start; - - /** - * Latest row offset seen in this transaction, becomes - * the new #batch_start upon commit. - */ - uint64_t latest_row_off; - - /** - * Maximum row offset this transaction may yield. If we got the - * maximum number of rows, we must not @e delay before running - * the next transaction. - */ - uint64_t max_row_off; - - /** - * Offset where our current shard begins (inclusive). - */ - uint64_t shard_start; - - /** - * Offset where our current shard ends (exclusive). - */ - uint64_t shard_end; - - /** - * When did we start with the shard? - */ - struct GNUNET_TIME_Absolute shard_start_time; - - /** - * For how long did we lock the shard? - */ - struct GNUNET_TIME_Absolute shard_end_time; - - /** - * How long did we take to finish the last shard - * for this account? - */ - struct GNUNET_TIME_Relative shard_delay; - - /** - * Name of our job in the shard table. - */ - char *job_name; - - /** - * How many transactions do we retrieve per batch? - */ - unsigned int batch_size; - - /** - * How much do we increment @e batch_size on success? - */ - unsigned int batch_thresh; - - /** - * Should we delay the next request to the wire plugin a bit? Set to - * false if we actually did some work. - */ - bool delay; - - /** - * Did we start a transaction yet? - */ - bool started_transaction; - - /** - * Is this shard still open for processing. - */ - bool shard_open; -}; +static const struct TALER_EXCHANGEDB_AccountInfo *ai; + +/** + * Active request for history. + */ +static struct TALER_BANK_CreditHistoryHandle *hh; + +/** + * Until when is processing this wire plugin delayed? + */ +static struct GNUNET_TIME_Absolute delayed_until; + +/** + * Encoded offset in the wire transfer list from where + * to start the next query with the bank. + */ +static uint64_t batch_start; + +/** + * Latest row offset seen in this transaction, becomes + * the new #batch_start upon commit. + */ +static uint64_t latest_row_off; + +/** + * Offset where our current shard begins (inclusive). + */ +static uint64_t shard_start; + +/** + * Offset where our current shard ends (exclusive). + */ +static uint64_t shard_end; + +/** + * When did we start with the shard? + */ +static struct GNUNET_TIME_Absolute shard_start_time; + +/** + * For how long did we lock the shard? + */ +static struct GNUNET_TIME_Absolute shard_end_time; + +/** + * How long did we take to finish the last shard + * for this account? + */ +static struct GNUNET_TIME_Relative shard_delay; + +/** + * Name of our job in the shard table. + */ +static char *job_name; + +/** + * How many transactions do we retrieve per batch? + */ +static unsigned int batch_size; +/** + * How much do we increment @e batch_size on success? + */ +static unsigned int batch_thresh; /** - * Head of list of loaded wire plugins. + * Did work remain in the transaction queue? Set to true + * if we did some work and thus there might be more. */ -static struct WireAccount *wa_head; +static bool progress; /** - * Tail of list of loaded wire plugins. + * Did we start a transaction yet? */ -static struct WireAccount *wa_tail; +static bool started_transaction; + +/** + * Is this shard still open for processing. + */ +static bool shard_open; /** * Handle to the context for interacting with the bank. @@ -227,6 +192,10 @@ static int ignore_account_404; */ static struct GNUNET_SCHEDULER_Task *task; +/** + * Name of the configuration section with the account we should watch. + */ +static char *account_section; /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. @@ -236,38 +205,27 @@ static struct GNUNET_SCHEDULER_Task *task; static void shutdown_task (void *cls) { + enum GNUNET_DB_QueryStatus qs; (void) cls; - { - struct WireAccount *wa; - while (NULL != (wa = wa_head)) - { - enum GNUNET_DB_QueryStatus qs; - - if (NULL != wa->hh) - { - TALER_BANK_credit_history_cancel (wa->hh); - wa->hh = NULL; - } - GNUNET_CONTAINER_DLL_remove (wa_head, - wa_tail, - wa); - if (wa->started_transaction) - { - db_plugin->rollback (db_plugin->cls); - wa->started_transaction = false; - } - qs = db_plugin->abort_shard (db_plugin->cls, - wa->job_name, - wa->shard_start, - wa->shard_end); - if (qs <= 0) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to abort work shard on shutdown\n"); - GNUNET_free (wa->job_name); - GNUNET_free (wa); - } + if (NULL != hh) + { + TALER_BANK_credit_history_cancel (hh); + hh = NULL; + } + if (started_transaction) + { + db_plugin->rollback (db_plugin->cls); + started_transaction = false; } + qs = db_plugin->abort_shard (db_plugin->cls, + job_name, + shard_start, + shard_end); + if (qs <= 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to abort work shard on shutdown\n"); + GNUNET_free (job_name); if (NULL != ctx) { GNUNET_CURL_fini (ctx); @@ -295,28 +253,36 @@ shutdown_task (void *cls) * account to our list (if it is enabled and we can load the plugin). * * @param cls closure, NULL - * @param ai account information + * @param in_ai account information */ static void add_account_cb (void *cls, - const struct TALER_EXCHANGEDB_AccountInfo *ai) + const struct TALER_EXCHANGEDB_AccountInfo *in_ai) { - struct WireAccount *wa; - (void) cls; - if (! ai->credit_enabled) + if (! in_ai->credit_enabled) + return; /* not enabled for us, skip */ + if ( (NULL != account_section) && + (0 != strcasecmp (ai->section_name, + account_section)) ) return; /* not enabled for us, skip */ - wa = GNUNET_new (struct WireAccount); - wa->ai = ai; - GNUNET_asprintf (&wa->job_name, + if (NULL != ai) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Multiple accounts enabled (%s and %s), use '-a' command-line option to select one!\n", + ai->section_name, + in_ai->section_name); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_INVALIDARGUMENT; + return; + } + ai = in_ai; + GNUNET_asprintf (&job_name, "wirewatch-%s", ai->section_name); - wa->batch_size = MAXIMUM_BATCH_SIZE; - if (0 != shard_size % wa->batch_size) - wa->batch_size = shard_size; - GNUNET_CONTAINER_DLL_insert (wa_head, - wa_tail, - wa); + batch_size = MAXIMUM_BATCH_SIZE; + if (0 != shard_size % batch_size) + batch_size = shard_size; } @@ -360,7 +326,16 @@ exchange_serve_process_config (void) } TALER_EXCHANGEDB_find_accounts (&add_account_cb, NULL); - GNUNET_assert (NULL != wa_head); + if (NULL == ai) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No accounts enabled for credit!\n"); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_INVALIDARGUMENT; + TALER_EXCHANGEDB_plugin_unload (db_plugin); + db_plugin = NULL; + return GNUNET_SYSERR; + } return GNUNET_OK; } @@ -368,240 +343,111 @@ exchange_serve_process_config (void) /** * Lock a shard and then begin to query for incoming wire transfers. * - * @param cls a `struct WireAccount` to operate on + * @param cls NULL */ static void lock_shard (void *cls); /** - * Continue with the credit history of the shard - * reserved as @a wa. + * Continue with the credit history of the shard. * - * @param[in,out] cls `struct WireAccount *` account with shard to continue processing + * @param cls NULL */ static void continue_with_shard (void *cls); /** - * We encountered a serialization error. - * Rollback the transaction and try again - * - * @param wa account we are transacting on + * We encountered a serialization error. Rollback the transaction and try + * again. */ static void -handle_soft_error (struct WireAccount *wa) +handle_soft_error (void) { db_plugin->rollback (db_plugin->cls); - wa->started_transaction = false; - if (1 < wa->batch_size) + started_transaction = false; + if (1 < batch_size) { - wa->batch_thresh = wa->batch_size; - wa->batch_size /= 2; + batch_thresh = batch_size; + batch_size /= 2; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Reduced batch size to %llu due to serialization issue\n", - (unsigned long long) wa->batch_size); + (unsigned long long) batch_size); } /* Reset to beginning of transaction, and go again from there. */ - wa->latest_row_off = wa->batch_start; + latest_row_off = batch_start; GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - wa); + NULL); } /** - * Schedule the #lock_shard() operation for - * @a wa. If @a wa is NULL, start with #wa_head. - * - * @param wa account to schedule #lock_shard() for, - * possibly NULL (!). + * Schedule the #lock_shard() operation. */ static void -schedule_transfers (struct WireAccount *wa) +schedule_transfers (void) { - if (NULL == wa) - { - wa = wa_head; - GNUNET_assert (NULL != wa); - } - if (wa->shard_open) + if (shard_open) GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Will retry my shard (%llu,%llu] of %s in %s\n", - (unsigned long long) wa->shard_start, - (unsigned long long) wa->shard_end, - wa->job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, + job_name, GNUNET_STRINGS_relative_time_to_string ( - GNUNET_TIME_absolute_get_remaining (wa->delayed_until), - GNUNET_YES)); + GNUNET_TIME_absolute_get_remaining (delayed_until), + true)); else GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Will try to lock next shard of %s in %s\n", - wa->job_name, + job_name, GNUNET_STRINGS_relative_time_to_string ( - GNUNET_TIME_absolute_get_remaining (wa->delayed_until), - GNUNET_YES)); + GNUNET_TIME_absolute_get_remaining (delayed_until), + true)); GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_at (wa->delayed_until, + task = GNUNET_SCHEDULER_add_at (delayed_until, &lock_shard, - wa); + NULL); } /** - * We are done with the work that is possible on @a wa right now (and the - * transaction was committed, if there was one to commit). Move on to the next - * account. - * - * @param wa wire account for which we completed a shard + * We are done with the work that is possible right now (and the transaction + * was committed, if there was one to commit). Move on to the next shard. */ static void -account_completed (struct WireAccount *wa) +transaction_completed (void) { - GNUNET_assert (! wa->started_transaction); - if ( (wa->batch_start + wa->batch_size == - wa->latest_row_off) && - (wa->batch_size < MAXIMUM_BATCH_SIZE) ) + GNUNET_assert (! started_transaction); + if ( (batch_start + batch_size == + latest_row_off) && + (batch_size < MAXIMUM_BATCH_SIZE) ) { /* The current batch size worked without serialization issues, and we are allowed to grow. Do so slowly. */ int delta; - delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4; + delta = ((int) batch_thresh - (int) batch_size) / 4; if (delta < 0) delta = -delta; - wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, - wa->batch_size + delta + 1); + batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, + batch_size + delta + 1); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Increasing batch size to %llu\n", - (unsigned long long) wa->batch_size); - } - - if (wa->delay) - { - /* This account was finished, block this one for the - #wirewatch_idle_sleep_interval and move on to the next one. */ - wa->delayed_until - = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); - wa = wa->next; + (unsigned long long) batch_size); } - GNUNET_assert (NULL == task); - schedule_transfers (wa); -} - -/** - * Check if we are finished with the current shard. If so, update the - * database, marking the shard as finished. - * - * @param wa wire account to commit for - * @return true if we were indeed done with the shard - */ -static bool -check_shard_done (struct WireAccount *wa) -{ - enum GNUNET_DB_QueryStatus qs; - - if (wa->shard_end > wa->latest_row_off) + if ( (! progress) && test_mode) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Shard %s (%llu,%llu] at %llu\n", - wa->job_name, - (unsigned long long) wa->shard_start, - (unsigned long long) wa->shard_end, - (unsigned long long) wa->latest_row_off); - return false; /* actually, not done! */ - } - /* shard is complete, mark this as well */ - qs = db_plugin->complete_shard (db_plugin->cls, - wa->job_name, - wa->shard_start, - wa->shard_end); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); - GNUNET_SCHEDULER_shutdown (); - return false; - case GNUNET_DB_STATUS_SOFT_ERROR: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for complete_shard. Rolling back.\n"); - handle_soft_error (wa); - return false; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - GNUNET_break (0); - /* Not expected, but let's just continue */ - break; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* normal case */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Completed shard %s (%llu,%llu] after %s\n", - wa->job_name, - (unsigned long long) wa->shard_start, - (unsigned long long) wa->shard_end, - GNUNET_STRINGS_relative_time_to_string ( - GNUNET_TIME_absolute_get_duration (wa->shard_start_time), - GNUNET_YES)); - break; - } - return true; -} - - -/** - * We are finished with the current transaction, try - * to commit and then schedule the next iteration. - * - * @param wa wire account to commit for - */ -static void -do_commit (struct WireAccount *wa) -{ - enum GNUNET_DB_QueryStatus qs; - bool shard_done; - - GNUNET_assert (NULL == task); - shard_done = check_shard_done (wa); - wa->started_transaction = false; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Committing %s progress (%llu,%llu] at %llu\n (%s)", - wa->job_name, - (unsigned long long) wa->shard_start, - (unsigned long long) wa->shard_end, - (unsigned long long) wa->latest_row_off, - shard_done - ? "shard done" - : "shard incomplete"); - qs = db_plugin->commit (db_plugin->cls); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); + /* Transaction list was drained and we are in + test mode. So we are done. */ GNUNET_SCHEDULER_shutdown (); return; - case GNUNET_DB_STATUS_SOFT_ERROR: - /* reduce transaction size to reduce rollback probability */ - handle_soft_error (wa); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* normal case */ - break; - } - if (shard_done) - { - wa->shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); - wa->shard_open = false; - account_completed (wa); - } - else - { - task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - wa); } + GNUNET_assert (NULL == task); + schedule_transfers (); } @@ -609,18 +455,24 @@ do_commit (struct WireAccount *wa) * We got incoming transaction details from the bank. Add them * to the database. * - * @param wa wire account we are handling * @param details array of transaction details * @param details_length length of the @a details array - * @return true on success */ -static bool -process_reply (struct WireAccount *wa, - const struct TALER_BANK_CreditDetails *details, +static void +process_reply (const struct TALER_BANK_CreditDetails *details, unsigned int details_length) { - uint64_t lroff = wa->latest_row_off; + enum GNUNET_DB_QueryStatus qs; + bool shard_done; + uint64_t lroff = latest_row_off; + if (0 == details_length) + { + /* Server should have used 204, not 200! */ + GNUNET_break_op (0); + transaction_completed (); + return; + } /* check serial IDs for range constraints */ for (unsigned int i = 0; irollback (db_plugin->cls); GNUNET_SCHEDULER_shutdown (); - wa->hh = NULL; - return false; - } - if (cd->serial_id >= wa->max_row_off) - { - /* We got 'limit' transactions back from the bank, so we should not - introduce any delay before the next call. */ - wa->delay = false; + return; } - if (cd->serial_id > wa->shard_end) + if (cd->serial_id > shard_end) { /* we are *past* the current shard (likely because the serial_id of the shard_end happens to not exist in the DB). So commit and stop this @@ -651,19 +496,14 @@ process_reply (struct WireAccount *wa, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Serial ID %llu past shard end at %llu, ending iteration early!\n", (unsigned long long) cd->serial_id, - (unsigned long long) wa->shard_end); + (unsigned long long) shard_end); details_length = i; - wa->delay = false; + progress = true; + lroff = cd->serial_id - 1; break; } lroff = cd->serial_id; } - if (0 == details_length) - { - /* Server should have used 204, not 200! */ - GNUNET_break_op (0); - return true; - } if (GNUNET_OK != db_plugin->start_read_committed (db_plugin->cls, "wirewatch check for incoming wire transfers")) @@ -672,15 +512,13 @@ process_reply (struct WireAccount *wa, "Failed to start database transaction!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); - wa->hh = NULL; - return false; + return; } - wa->started_transaction = true; + started_transaction = true; for (unsigned int i = 0; iamount, cd->execution_date, cd->debit_account_uri, - wa->ai->section_name, + ai->section_name, cd->serial_id); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); - wa->started_transaction = false; GNUNET_SCHEDULER_shutdown (); - wa->hh = NULL; - return false; + return; case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got DB soft error for reserves_in_insert. Rolling back.\n"); - handle_soft_error (wa); - wa->hh = NULL; - return true; + handle_soft_error (); + return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: /* Either wirewatch was freshly started after the system was shutdown and we're going over an incomplete shard again @@ -720,25 +554,92 @@ process_reply (struct WireAccount *wa, "Attempted to import transaction %llu (%s) twice. " "This should happen rarely (if not, ask for support).\n", (unsigned long long) cd->serial_id, - wa->job_name); + job_name); db_plugin->rollback (db_plugin->cls); - wa->latest_row_off = cd->serial_id; - wa->started_transaction = false; + started_transaction = false; /* already existed, ok, let's just continue */ - return true; + return; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - wa->latest_row_off = cd->serial_id; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Imported transaction %llu.", + (unsigned long long) cd->serial_id); /* normal case */ break; } } - do_commit (wa); - if (check_shard_done (wa)) - account_completed (wa); - else - task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - wa); - return true; + latest_row_off = lroff; + shard_done = (shard_end <= latest_row_off); + if (shard_done) + { + /* shard is complete, mark this as well */ + qs = db_plugin->complete_shard (db_plugin->cls, + job_name, + shard_start, + shard_end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for complete_shard. Rolling back.\n"); + handle_soft_error (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); + /* Not expected, but let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Completed shard %s (%llu,%llu] after %s\n", + job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, + GNUNET_STRINGS_relative_time_to_string ( + GNUNET_TIME_absolute_get_duration (shard_start_time), + true)); + break; + } + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Committing %s progress (%llu,%llu] at %llu\n (%s)", + job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, + (unsigned long long) latest_row_off, + shard_done + ? "shard done" + : "shard incomplete"); + qs = db_plugin->commit (db_plugin->cls); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* reduce transaction size to reduce rollback probability */ + handle_soft_error (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + started_transaction = false; + /* normal case */ + break; + } + if (shard_done) + { + shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time); + shard_open = false; + transaction_completed (); + return; + } + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + NULL); } @@ -746,76 +647,75 @@ process_reply (struct WireAccount *wa, * Callbacks of this type are used to serve the result of asking * the bank for the transaction history. * - * @param cls closure with the `struct WireAccount *` we are processing + * @param cls NULL * @param reply response we got from the bank */ static void history_cb (void *cls, const struct TALER_BANK_CreditHistoryResponse *reply) { - struct WireAccount *wa = cls; - bool ok; - + (void) cls; GNUNET_assert (NULL == task); - wa->hh = NULL; + hh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "History request returned with HTTP status %u\n", + reply->http_status); switch (reply->http_status) { - case 0: - ok = false; case MHD_HTTP_OK: - ok = process_reply (wa, - reply->details.success.details, - reply->details.success.details_length); - break; + process_reply (reply->details.success.details, + reply->details.success.details_length); + return; case MHD_HTTP_NO_CONTENT: - ok = true; - break; + transaction_completed (); + return; case MHD_HTTP_NOT_FOUND: - ok = ignore_account_404; + if (ignore_account_404) + { + transaction_completed (); + return; + } break; default: - ok = false; break; } - - if (! ok) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error fetching history: %s (%u)\n", + TALER_ErrorCode_get_hint (reply->ec), + reply->http_status); + if (! exit_on_error) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error fetching history: %s (%u)\n", - TALER_ErrorCode_get_hint (reply->ec), - reply->http_status); - if (! (exit_on_error || test_mode) ) - { - account_completed (wa); - return; - } - GNUNET_SCHEDULER_shutdown (); + transaction_completed (); return; } + GNUNET_SCHEDULER_shutdown (); } static void continue_with_shard (void *cls) { - struct WireAccount *wa = cls; unsigned int limit; + (void) cls; task = NULL; - limit = GNUNET_MIN (wa->batch_size, - wa->shard_end - wa->latest_row_off); - wa->max_row_off = wa->latest_row_off + limit; - GNUNET_assert (NULL == wa->hh); - wa->hh = TALER_BANK_credit_history (ctx, - wa->ai->auth, - wa->latest_row_off, - limit, - test_mode - ? GNUNET_TIME_UNIT_ZERO - : LONGPOLL_TIMEOUT, - &history_cb, - wa); - if (NULL == wa->hh) + GNUNET_assert (shard_end > latest_row_off); + limit = GNUNET_MIN (batch_size, + shard_end - latest_row_off); + GNUNET_assert (NULL == hh); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Requesting credit history staring from %llu\n", + (unsigned long long) latest_row_off); + hh = TALER_BANK_credit_history (ctx, + ai->auth, + latest_row_off, + limit, + test_mode + ? GNUNET_TIME_UNIT_ZERO + : LONGPOLL_TIMEOUT, + &history_cb, + NULL); + if (NULL == hh) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start request for account history!\n"); @@ -829,12 +729,12 @@ continue_with_shard (void *cls) static void lock_shard (void *cls) { - struct WireAccount *wa = cls; enum GNUNET_DB_QueryStatus qs; struct GNUNET_TIME_Relative delay; - uint64_t last_shard_start = wa->shard_start; - uint64_t last_shard_end = wa->shard_end; + uint64_t last_shard_start = shard_start; + uint64_t last_shard_end = shard_end; + (void) cls; task = NULL; if (GNUNET_SYSERR == db_plugin->preflight (db_plugin->cls)) @@ -845,17 +745,16 @@ lock_shard (void *cls) GNUNET_SCHEDULER_shutdown (); return; } - if ( (wa->shard_open) && - (GNUNET_TIME_absolute_is_future (wa->shard_end_time)) ) + if ( (shard_open) && + (GNUNET_TIME_absolute_is_future (shard_end_time)) ) { - wa->delay = true; /* default is to delay, unless - we find out that we're really busy */ - wa->batch_start = wa->latest_row_off; + progress = false; + batch_start = latest_row_off; task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - wa); + NULL); return; } - if (wa->shard_open) + if (shard_open) GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Shard not completed in time, will try to re-acquire\n"); /* How long we lock a shard depends on the number of @@ -868,15 +767,15 @@ lock_shard (void *cls) GNUNET_CRYPTO_QUALITY_WEAK, 4 * GNUNET_TIME_relative_max ( wirewatch_idle_sleep_interval, - GNUNET_TIME_relative_multiply (wa->shard_delay, + GNUNET_TIME_relative_multiply (shard_delay, max_workers)).rel_value_us); - wa->shard_start_time = GNUNET_TIME_absolute_get (); + shard_start_time = GNUNET_TIME_absolute_get (); qs = db_plugin->begin_shard (db_plugin->cls, - wa->job_name, + job_name, delay, shard_size, - &wa->shard_start, - &wa->shard_end); + &shard_start, + &shard_end); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -893,52 +792,51 @@ lock_shard (void *cls) rdelay = GNUNET_TIME_randomize (wirewatch_idle_sleep_interval); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Serialization error tying to obtain shard %s, will try again in %s!\n", - wa->job_name, + job_name, GNUNET_STRINGS_relative_time_to_string (rdelay, - GNUNET_YES)); - wa->delayed_until = GNUNET_TIME_relative_to_absolute (rdelay); + true)); + delayed_until = GNUNET_TIME_relative_to_absolute (rdelay); } GNUNET_assert (NULL == task); - schedule_transfers (wa->next); + schedule_transfers (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_break (0); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "No shard available, will try again for %s in %s!\n", - wa->job_name, + job_name, GNUNET_STRINGS_relative_time_to_string ( wirewatch_idle_sleep_interval, GNUNET_YES)); - wa->delayed_until = GNUNET_TIME_relative_to_absolute ( + delayed_until = GNUNET_TIME_relative_to_absolute ( wirewatch_idle_sleep_interval); - wa->shard_open = false; + shard_open = false; GNUNET_assert (NULL == task); - schedule_transfers (wa->next); + schedule_transfers (); return; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: /* continued below */ break; } - wa->shard_end_time = GNUNET_TIME_relative_to_absolute (delay); + shard_end_time = GNUNET_TIME_relative_to_absolute (delay); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting with shard %s at (%llu,%llu] locked for %s\n", - wa->job_name, - (unsigned long long) wa->shard_start, - (unsigned long long) wa->shard_end, + job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, GNUNET_STRINGS_relative_time_to_string (delay, - GNUNET_YES)); - wa->delay = true; /* default is to delay, unless - we find out that we're really busy */ - wa->batch_start = wa->shard_start; - if ( (wa->shard_open) && - (wa->shard_start == last_shard_start) && - (wa->shard_end == last_shard_end) ) - GNUNET_break (wa->latest_row_off >= wa->batch_start); /* resume where we left things */ + true)); + progress = false; + batch_start = shard_start; + if ( (shard_open) && + (shard_start == last_shard_start) && + (shard_end == last_shard_end) ) + GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */ else - wa->latest_row_off = wa->batch_start; - wa->shard_open = true; + latest_row_off = batch_start; + shard_open = true; task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - wa); + NULL); } @@ -961,14 +859,15 @@ run (void *cls, (void) cfgfile; cfg = c; + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + cls); if (GNUNET_OK != exchange_serve_process_config ()) { global_ret = EXIT_NOTCONFIGURED; + GNUNET_SCHEDULER_shutdown (); return; } - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - cls); ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, &rc); if (NULL == ctx) @@ -978,9 +877,7 @@ run (void *cls, return; } rc = GNUNET_CURL_gnunet_rc_create (ctx); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&lock_shard, - wa_head); + schedule_transfers (); } @@ -996,6 +893,11 @@ main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_string ('a', + "account", + "SECTION_NAME", + "name of the configuration section with the account we should watch (needed if more than one is enabled for crediting)", + &account_section), GNUNET_GETOPT_option_flag ('e', "exit-on-error", "terminate wirewatch if we failed to download information from the bank", -- cgit v1.2.3