diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bank-lib/bank_api_credit.c | 25 | ||||
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 85 |
2 files changed, 50 insertions, 60 deletions
diff --git a/src/bank-lib/bank_api_credit.c b/src/bank-lib/bank_api_credit.c index 578d86c90..290a64849 100644 --- a/src/bank-lib/bank_api_credit.c +++ b/src/bank-lib/bank_api_credit.c @@ -210,24 +210,6 @@ handle_credit_history_finished (void *cls, } -/** - * Request the credit history of the exchange's bank account. - * - * @param ctx curl context for the event loop - * @param auth authentication data to use - * @param start_row from which row on do we want to get results, - * use UINT64_MAX for the latest; exclusive - * @param num_results how many results do we want; - * negative numbers to go into the past, positive numbers - * to go into the future starting at @a start_row; - * must not be zero. - * @param hres_cb the callback to call with the transaction - * history - * @param hres_cb_cls closure for the above callback - * @return NULL if the inputs are invalid (i.e. zero value for - * @e num_results). In this case, the callback is not - * called. - */ struct TALER_BANK_CreditHistoryHandle * TALER_BANK_credit_history (struct GNUNET_CURL_Context *ctx, const struct TALER_BANK_AuthenticationData *auth, @@ -300,13 +282,6 @@ TALER_BANK_credit_history (struct GNUNET_CURL_Context *ctx, } -/** - * Cancel a history request. This function cannot be - * used on a request handle if a response is already - * served for it. - * - * @param hh the history request handle - */ void TALER_BANK_credit_history_cancel (struct TALER_BANK_CreditHistoryHandle *hh) { diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 40b962f8a..760dbe10b 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016--2020 Taler Systems SA + Copyright (C) 2016--2021 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -90,6 +90,11 @@ struct WireAccount uint64_t latest_row_off; /** + * Offset where our current shard ends. + */ + uint64_t shard_end; + + /** * How many transactions do we retrieve per batch? */ unsigned int batch_size; @@ -103,19 +108,14 @@ struct WireAccount * Are we running from scratch and should re-process all transactions * for this account? */ - int reset_mode; + bool reset_mode; /** * Should we delay the next request to the wire plugin a bit? Set to - * #GNUNET_NO if we actually did some work. + * false if we actually did some work. */ - int delay; + bool delay; - /** - * Did we experience a soft failure during the current - * transaction? - */ - bool soft_fail; }; @@ -161,6 +161,11 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval; /** + * Modulus to apply to group shards. + */ +static unsigned int shard_size = 1024; + +/** * Value to return from main(). 0 on success, non-zero on * on serious errors. */ @@ -363,20 +368,10 @@ history_cb (void *cls, (unsigned int) ec, http_status); } - if (wa->soft_fail) - { - /* no point to commit, transaction was already rolled - back after we encountered a soft failure */ - wa->soft_fail = false; - qs = GNUNET_DB_STATUS_SOFT_ERROR; - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "End of list. Committing progress!\n"); - qs = db_plugin->commit (db_plugin->cls, - session); - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "End of list. Committing progress!\n"); + qs = db_plugin->commit (db_plugin->cls, + session); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_SCHEDULER_shutdown (); @@ -410,7 +405,7 @@ history_cb (void *cls, "Increasing batch size to %llu\n", (unsigned long long) wa->batch_size); } - if ( (GNUNET_YES == wa->delay) && + if ( (wa->delay) && (test_mode) && (NULL == wa->next) ) { @@ -419,7 +414,7 @@ history_cb (void *cls, GNUNET_SCHEDULER_shutdown (); return GNUNET_OK; } - if (GNUNET_YES == wa->delay) + if (wa->delay) { wa->delayed_until = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); @@ -477,6 +472,7 @@ history_cb (void *cls, db_plugin->rollback (db_plugin->cls, session); GNUNET_SCHEDULER_shutdown (); + wa->hh = NULL; return GNUNET_SYSERR; } if (GNUNET_DB_STATUS_SOFT_ERROR == qs) @@ -485,10 +481,13 @@ history_cb (void *cls, "Got DB soft error for reserves_in_insert. Rolling back.\n"); db_plugin->rollback (db_plugin->cls, session); - wa->soft_fail = true; + wa->hh = NULL; + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&find_transfers, + NULL); return GNUNET_SYSERR; } - wa->delay = GNUNET_NO; + wa->delay = false; wa->latest_row_off = serial_id; return GNUNET_OK; } @@ -504,6 +503,7 @@ find_transfers (void *cls) { struct TALER_EXCHANGEDB_Session *session; enum GNUNET_DB_QueryStatus qs; + unsigned int limit; (void) cls; task = NULL; @@ -555,13 +555,21 @@ find_transfers (void *cls) } wa_pos->reset_mode = GNUNET_NO; } - wa_pos->delay = GNUNET_YES; + wa_pos->delay = true; wa_pos->current_batch_size = 0; /* reset counter */ wa_pos->session = session; + if (wa_pos->shard_end == wa_pos->last_row_off) + { + /* advance to next shard */ + wa_pos->shard_end += shard_size; + } + limit = GNUNET_MIN (wa_pos->batch_size, + wa_pos->shard_end - wa_pos->last_row_off); + GNUNET_assert (NULL == wa_pos->hh); wa_pos->hh = TALER_BANK_credit_history (ctx, &wa_pos->auth, wa_pos->last_row_off, - wa_pos->batch_size, + limit, &history_cb, wa_pos); if (NULL == wa_pos->hh) @@ -594,6 +602,7 @@ run (void *cls, (void) cls; (void) args; (void) cfgfile; + cfg = c; if (GNUNET_OK != exchange_serve_process_config ()) @@ -603,8 +612,6 @@ run (void *cls, } wa_pos = wa_head; GNUNET_assert (NULL != wa_pos); - task = GNUNET_SCHEDULER_add_now (&find_transfers, - NULL); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, cls); ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, @@ -615,6 +622,9 @@ run (void *cls, GNUNET_break (0); return; } + + task = GNUNET_SCHEDULER_add_now (&find_transfers, + NULL); } @@ -630,16 +640,21 @@ main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_flag ('r', + "reset", + "start fresh with all transactions in the history", + &reset_mode), + GNUNET_GETOPT_option_uint ('S', + "size", + "SIZE", + "Size to process per shard (default: 1024)", + &shard_size), GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_option_flag ('t', "test", "run in test mode and exit when idle", &test_mode), - GNUNET_GETOPT_option_flag ('r', - "reset", - "start fresh with all transactions in the history", - &reset_mode), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue ret; |