diff options
author | Christian Grothoff <christian@grothoff.org> | 2022-05-22 13:48:56 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2022-05-22 13:48:56 +0200 |
commit | 21bcc5fa0bb4e2c101fc71d5740934d5914eb480 (patch) | |
tree | 73c83681219894b607b97b8133331ecd0462e04f | |
parent | 3233195d2d6c4733e6c98e754c54902f9c6d657c (diff) | |
download | exchange-21bcc5fa0bb4e2c101fc71d5740934d5914eb480.tar.xz |
-fix wirewatch assertion
-rw-r--r-- | src/benchmark/taler-bank-benchmark.c | 125 | ||||
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 40 | ||||
-rw-r--r-- | src/testing/testing_api_loop.c | 5 |
3 files changed, 124 insertions, 46 deletions
diff --git a/src/benchmark/taler-bank-benchmark.c b/src/benchmark/taler-bank-benchmark.c index 4d7dbe35e..75a7434d2 100644 --- a/src/benchmark/taler-bank-benchmark.c +++ b/src/benchmark/taler-bank-benchmark.c @@ -111,9 +111,9 @@ static char *cfg_filename; static int use_fakebank = 1; /** - * Launch taler-exchange-wirewatch. + * Number of taler-exchange-wirewatchers to launch. */ -static int start_wirewatch; +static unsigned int start_wirewatch; /** * Verbosity level. @@ -265,8 +265,9 @@ run (void *cls, (void) cls; len = howmany_reserves + 2; - all_commands = GNUNET_new_array (len, - struct TALER_TESTING_Command); + all_commands = GNUNET_malloc_large (len + * sizeof (struct TALER_TESTING_Command)); + GNUNET_assert (NULL != all_commands); GNUNET_asprintf (&total_reserve_amount, "%s:5", currency); @@ -465,14 +466,17 @@ launch_fakebank (void *cls) * * @return #GNUNET_OK on success */ -static int +static enum GNUNET_GenericReturnValue parallel_benchmark (void) { enum GNUNET_GenericReturnValue result = GNUNET_OK; pid_t fakebank = -1; struct GNUNET_OS_Process *bankd = NULL; - struct GNUNET_OS_Process *wirewatch = NULL; + struct GNUNET_OS_Process *wirewatch[GNUNET_NZL (start_wirewatch)]; + memset (wirewatch, + 0, + sizeof (wirewatch)); if ( (MODE_BANK == mode) || (MODE_BOTH == mode) ) { @@ -560,19 +564,30 @@ parallel_benchmark (void) GNUNET_OS_process_wait (dbinit)); GNUNET_OS_process_destroy (dbinit); } - if (start_wirewatch) + /* start exchange wirewatch */ + for (unsigned int w = 0; w<start_wirewatch; w++) { - /* start exchange wirewatch */ - wirewatch = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, - NULL, NULL, NULL, - "taler-exchange-wirewatch", - "taler-exchange-wirewatch", - "-c", cfg_filename, - NULL); - if (NULL == wirewatch) + wirewatch[w] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, + NULL, NULL, NULL, + "taler-exchange-wirewatch", + "taler-exchange-wirewatch", + "-c", cfg_filename, + "-L", loglev, + NULL); + if (NULL == wirewatch[w]) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to launch wirewatch, aborting benchmark\n"); + for (unsigned int x = 0; x<w; x++) + { + GNUNET_break (0 == + GNUNET_OS_process_kill (wirewatch[x], + SIGTERM)); + GNUNET_break (GNUNET_OK == + GNUNET_OS_process_wait (wirewatch[x])); + GNUNET_OS_process_destroy (wirewatch[x]); + wirewatch[x] = NULL; + } if (-1 != fakebank) { int wstatus; @@ -618,17 +633,61 @@ parallel_benchmark (void) if ( (MODE_BANK == mode) || (MODE_BOTH == mode) ) { - if (NULL != wirewatch) + /* Ensure wirewatch runs to completion! */ + if (0 != start_wirewatch) { - /* stop wirewatch */ + /* replace ONE of the wirewatchers with one that is in test-mode */ GNUNET_break (0 == - GNUNET_OS_process_kill (wirewatch, + GNUNET_OS_process_kill (wirewatch[0], SIGTERM)); GNUNET_break (GNUNET_OK == - GNUNET_OS_process_wait (wirewatch)); - GNUNET_OS_process_destroy (wirewatch); - wirewatch = NULL; + GNUNET_OS_process_wait (wirewatch[0])); + GNUNET_OS_process_destroy (wirewatch[0]); + wirewatch[0] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, + NULL, NULL, NULL, + "taler-exchange-wirewatch", + "taler-exchange-wirewatch", + "-c", cfg_filename, + "-L", loglev, + "-t", + NULL); + /* wait for it to finish! */ + GNUNET_break (GNUNET_OK == + GNUNET_OS_process_wait (wirewatch[0])); + GNUNET_OS_process_destroy (wirewatch[0]); + wirewatch[0] = NULL; + /* Then stop the rest, which should basically also be finished */ + for (unsigned int w = 1; w<start_wirewatch; w++) + { + GNUNET_break (0 == + GNUNET_OS_process_kill (wirewatch[w], + SIGTERM)); + GNUNET_break (GNUNET_OK == + GNUNET_OS_process_wait (wirewatch[w])); + GNUNET_OS_process_destroy (wirewatch[w]); + } + + /* But be extra sure we did finish all shards by doing one more */ + wirewatch[0] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, + NULL, NULL, NULL, + "taler-exchange-wirewatch", + "taler-exchange-wirewatch", + "-c", cfg_filename, + "-L", loglev, + "-t", + NULL); + /* wait for it to finish! */ + GNUNET_break (GNUNET_OK == + GNUNET_OS_process_wait (wirewatch[0])); + GNUNET_OS_process_destroy (wirewatch[0]); + wirewatch[0] = NULL; } + + /* Now stop the time, if this was the right mode */ + if ( (GNUNET_YES != linger) && + (MODE_BANK != mode) ) + duration = GNUNET_TIME_absolute_get_duration (start_time); + /* stop fakebank */ if (-1 != fakebank) { @@ -727,9 +786,10 @@ main (int argc, &history_size), GNUNET_GETOPT_option_version (PACKAGE_VERSION " " VCS_VERSION), GNUNET_GETOPT_option_verbose (&verbose), - GNUNET_GETOPT_option_flag ('w', + GNUNET_GETOPT_option_uint ('w', "wirewatch", - "run taler-exchange-wirewatch", + "NPROC", + "run NPROC taler-exchange-wirewatch processes", &start_wirewatch), GNUNET_GETOPT_OPTION_END }; @@ -858,14 +918,17 @@ main (int argc, howmany_clients, GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_YES)); - tps = ((unsigned long long) howmany_reserves) * howmany_clients * 1000LLU - / (duration.rel_value_us / 1000LL); - fprintf (stdout, - "RAW: %04u %04u %16llu (%llu TPS)\n", - howmany_reserves, - howmany_clients, - (unsigned long long) duration.rel_value_us, - tps); + if (! GNUNET_TIME_relative_is_zero (duration)) + { + tps = ((unsigned long long) howmany_reserves) * howmany_clients * 1000LLU + / (duration.rel_value_us / 1000LL); + fprintf (stdout, + "RAW: %04u %04u %16llu (%llu TPS)\n", + howmany_reserves, + howmany_clients, + (unsigned long long) duration.rel_value_us, + tps); + } fprintf (stdout, "CPU time: sys %llu user %llu\n", \ (unsigned long long) (usage.ru_stime.tv_sec * 1000 * 1000 diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 21d2df150..7cc4ac382 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -384,10 +384,10 @@ handle_soft_error (struct WireAccount *wa) "Reduced batch size to %llu due to serialization issue\n", (unsigned long long) wa->batch_size); } - GNUNET_assert (NULL == task); /* Reset to beginning of transaction, and go again from there. */ wa->latest_row_off = wa->batch_start; + GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&continue_with_shard, wa); } @@ -458,6 +458,7 @@ account_completed (struct WireAccount *wa) = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); wa = wa->next; } + GNUNET_assert (NULL == task); schedule_transfers (wa); } @@ -533,6 +534,7 @@ do_commit (struct WireAccount *wa) enum GNUNET_DB_QueryStatus qs; bool shard_done; + GNUNET_assert (NULL == task); shard_done = check_shard_done (wa); wa->started_transaction = false; GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -563,7 +565,8 @@ do_commit (struct WireAccount *wa) if (shard_done) account_completed (wa); else - continue_with_shard (wa); + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + wa); } @@ -591,6 +594,7 @@ history_cb (void *cls, enum GNUNET_DB_QueryStatus qs; (void) json; + GNUNET_assert (NULL == task); if (NULL == details) { wa->hh = NULL; @@ -660,14 +664,17 @@ history_cb (void *cls, wa->hh = NULL; if (wa->started_transaction) { + GNUNET_assert (NULL == task); do_commit (wa); } else { + GNUNET_assert (NULL == task); if (check_shard_done (wa)) account_completed (wa); else - continue_with_shard (wa); + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + wa); } return GNUNET_SYSERR; } @@ -746,6 +753,7 @@ continue_with_shard (void *cls) struct WireAccount *wa = cls; unsigned int limit; + task = NULL; limit = GNUNET_MIN (wa->batch_size, wa->shard_end - wa->latest_row_off); wa->max_row_off = wa->latest_row_off + limit; @@ -816,15 +824,18 @@ lock_shard (void *cls) return; case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ - GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Serialization error tying to obtain shard %s, will try again in %s!\n", - wa->job_name, - GNUNET_STRINGS_relative_time_to_string ( - wirewatch_idle_sleep_interval, - GNUNET_YES)); - wa->delayed_until = GNUNET_TIME_relative_to_absolute ( - wirewatch_idle_sleep_interval); + { + struct GNUNET_TIME_Relative rdelay; + + rdelay = GNUNET_TIME_randomize (wirewatch_idle_sleep_interval); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization error tying to obtain shard %s, will try again in %s!\n", + wa->job_name, + GNUNET_STRINGS_relative_time_to_string (rdelay, + GNUNET_YES)); + wa->delayed_until = GNUNET_TIME_relative_to_absolute (rdelay); + } + GNUNET_assert (NULL == task); schedule_transfers (wa->next); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: @@ -837,6 +848,7 @@ lock_shard (void *cls) GNUNET_YES)); wa->delayed_until = GNUNET_TIME_relative_to_absolute ( wirewatch_idle_sleep_interval); + GNUNET_assert (NULL == task); schedule_transfers (wa->next); return; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: @@ -854,7 +866,8 @@ lock_shard (void *cls) we find out that we're really busy */ wa->batch_start = wa->shard_start; wa->latest_row_off = wa->batch_start; - continue_with_shard (wa); + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + wa); } @@ -894,6 +907,7 @@ run (void *cls, return; } rc = GNUNET_CURL_gnunet_rc_create (ctx); + GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&lock_shard, wa_head); } diff --git a/src/testing/testing_api_loop.c b/src/testing/testing_api_loop.c index 1ea1d5a26..190e20928 100644 --- a/src/testing/testing_api_loop.c +++ b/src/testing/testing_api_loop.c @@ -449,8 +449,9 @@ TALER_TESTING_run2 (struct TALER_TESTING_Interpreter *is, /* get the number of commands */ for (i = 0; NULL != commands[i].label; i++) ; - is->commands = GNUNET_new_array (i + 1, - struct TALER_TESTING_Command); + is->commands = GNUNET_malloc_large ( (i + 1) + * sizeof (struct TALER_TESTING_Command)); + GNUNET_assert (NULL != is->commands); memcpy (is->commands, commands, sizeof (struct TALER_TESTING_Command) * i); |