From ae8d481e1ce9f694a42619809d2c9b6e6acf3497 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 5 Sep 2021 15:25:46 +0200 Subject: implement taler-exchange-transfer DB sharding logic --- src/benchmark/benchmark.conf | 2 +- src/exchange/exchange.conf | 11 +- src/exchange/taler-exchange-aggregator.c | 17 +- src/exchange/taler-exchange-closer.c | 16 +- src/exchange/taler-exchange-transfer.c | 405 ++++++++++++++++++++++------ src/exchange/taler-exchange-wirewatch.c | 1 + src/exchangedb/plugin_exchangedb_postgres.c | 123 +++++++-- src/exchangedb/test_exchangedb.c | 6 + src/include/taler_exchangedb_plugin.h | 8 +- src/testing/testing_api_cmd_exec_transfer.c | 3 + 10 files changed, 468 insertions(+), 124 deletions(-) diff --git a/src/benchmark/benchmark.conf b/src/benchmark/benchmark.conf index 844106cf5..c38981dd6 100644 --- a/src/benchmark/benchmark.conf +++ b/src/benchmark/benchmark.conf @@ -24,7 +24,7 @@ DB = postgres # exchange (or the twister) is actually listening. BASE_URL = "http://localhost:8081/" -AGGREGATOR_SHARD_SIZE = 268435456 +AGGREGATOR_SHARD_SIZE = 67108864 #AGGREGATOR_SHARD_SIZE = 2147483648 diff --git a/src/exchange/exchange.conf b/src/exchange/exchange.conf index 68c1556d7..4b7f5f5a8 100644 --- a/src/exchange/exchange.conf +++ b/src/exchange/exchange.conf @@ -41,10 +41,17 @@ PORT = 8081 BASE_URL = http://localhost:8081/ -# How long should the aggregator (and closer, and transfer) -# sleep if it has nothing to do? +# How long should the aggregator sleep if it has nothing to do? AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s +# How long should the transfer tool +# sleep if it has nothing to do? +TRANSFER_IDLE_SLEEP_INTERVAL = 60 s + +# How long should the closer tool +# sleep if it has nothing to do? +CLOSER_IDLE_SLEEP_INTERVAL = 60 s + # Values of 0 or above 2^31 disable sharding, which # is a sane default for most use-cases. # When changing this value, you MUST stop all diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 893fa79f9..caa4528db 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -1034,9 +1034,22 @@ run_shard (void *cls) &s->shard_end); if (0 >= qs) { + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + static struct GNUNET_TIME_Relative delay; + + GNUNET_free (s); + delay = GNUNET_TIME_randomized_backoff (delay, + GNUNET_TIME_UNIT_SECONDS); + task = GNUNET_SCHEDULER_add_delayed (delay, + &run_shard, + NULL); + return; + } GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to begin shard!\n"); - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); + "Failed to begin shard (%d)!\n", + qs); + GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != qs); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; diff --git a/src/exchange/taler-exchange-closer.c b/src/exchange/taler-exchange-closer.c index 926c93554..19cc06c70 100644 --- a/src/exchange/taler-exchange-closer.c +++ b/src/exchange/taler-exchange-closer.c @@ -60,7 +60,7 @@ static struct GNUNET_SCHEDULER_Task *task; /** * How long should we sleep when idle before trying to find more work? */ -static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; +static struct GNUNET_TIME_Relative closer_idle_sleep_interval; /** * Value to return from main(). 0 on success, non-zero @@ -112,8 +112,8 @@ shutdown_task (void *cls) * * @return #GNUNET_OK on success */ -static int -parse_wirewatch_config (void) +static enum GNUNET_GenericReturnValue +parse_closer_config (void) { if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, @@ -129,12 +129,12 @@ parse_wirewatch_config (void) if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "exchange", - "AGGREGATOR_IDLE_SLEEP_INTERVAL", - &aggregator_idle_sleep_interval)) + "CLOSER_IDLE_SLEEP_INTERVAL", + &closer_idle_sleep_interval)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "exchange", - "AGGREGATOR_IDLE_SLEEP_INTERVAL"); + "CLOSER_IDLE_SLEEP_INTERVAL"); return GNUNET_SYSERR; } if ( (GNUNET_OK != @@ -444,7 +444,7 @@ run_reserve_closures (void *cls) } else { - task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, + task = GNUNET_SCHEDULER_add_delayed (closer_idle_sleep_interval, &run_reserve_closures, NULL); } @@ -480,7 +480,7 @@ run (void *cls, (void) cfgfile; cfg = c; - if (GNUNET_OK != parse_wirewatch_config ()) + if (GNUNET_OK != parse_closer_config ()) { cfg = NULL; global_ret = EXIT_NOTCONFIGURED; diff --git a/src/exchange/taler-exchange-transfer.c b/src/exchange/taler-exchange-transfer.c index d6d44eb05..b93d14600 100644 --- a/src/exchange/taler-exchange-transfer.c +++ b/src/exchange/taler-exchange-transfer.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016-2020 Taler Systems SA + Copyright (C) 2016-2021 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -27,6 +27,46 @@ #include "taler_json_lib.h" #include "taler_bank_service.h" +/** + * What is the maximum batch size we use for credit history + * requests with the bank. See `batch_size` below. + */ +#define MAXIMUM_BATCH_SIZE 1024 + + +/** + * Information about our work shard. + */ +struct Shard +{ + + /** + * Time when we started to work on this shard. + */ + struct GNUNET_TIME_Absolute shard_start_time; + + /** + * Offset the shard begins at. + */ + uint64_t shard_start; + + /** + * Exclusive offset where the shard ends. + */ + uint64_t shard_end; + + /** + * Offset where our current batch begins. + */ + uint64_t batch_start; + + /** + * Highest row processed in the current batch. + */ + uint64_t batch_end; + +}; + /** * Data we keep to #run_transfers(). There is at most @@ -37,6 +77,18 @@ struct WirePrepareData { + /** + * All transfers done in the same transaction + * are kept in a DLL. + */ + struct WirePrepareData *next; + + /** + * All transfers done in the same transaction + * are kept in a DLL. + */ + struct WirePrepareData *prev; + /** * Wire execution handle. */ @@ -71,10 +123,21 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; static struct GNUNET_SCHEDULER_Task *task; /** - * If we are currently executing a transfer, information about - * the active transfer is here. Otherwise, this variable is NULL. + * If we are currently executing transfers, information about + * the active transfers is here. Otherwise, this variable is NULL. + */ +static struct WirePrepareData *wpd_head; + +/** + * If we are currently executing transfers, information about + * the active transfers is here. Otherwise, this variable is NULL. + */ +static struct WirePrepareData *wpd_tail; + +/** + * Information about our work shard. */ -static struct WirePrepareData *wpd; +static struct Shard *shard; /** * Handle to the context for interacting with the bank / wire gateway. @@ -86,11 +149,6 @@ static struct GNUNET_CURL_Context *ctx; */ static struct GNUNET_CURL_RescheduleContext *rc; -/** - * How long should we sleep when idle before trying to find more work? - */ -static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; - /** * Value to return from main(). 0 on success, non-zero on errors. */ @@ -101,6 +159,54 @@ static int global_ret; */ static int test_mode; +/** + * How long should we sleep when idle before trying to find more work? + * Also used for how long we wait to grab a shard before trying it again. + * The value should be set to a bit above the average time it takes to + * process a shard. + */ +static struct GNUNET_TIME_Relative transfer_idle_sleep_interval; + +/** + * How long did we take to finish the last shard? + */ +static struct GNUNET_TIME_Relative shard_delay; + +/** + * Modulus to apply to group shards. The shard size must ultimately be a + * multiple of the batch size. Thus, if this is not a multiple of the + * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size. + */ +static unsigned int shard_size = MAXIMUM_BATCH_SIZE; + +/** + * How many workers should we plan our scheduling with? + */ +static unsigned int max_workers = 16; + + +/** + * Clean up all active bank interactions. + */ +static void +cleanup_wpd (void) +{ + struct WirePrepareData *wpd; + + while (NULL != (wpd = wpd_head)) + { + GNUNET_CONTAINER_DLL_remove (wpd_head, + wpd_tail, + wpd); + if (NULL != wpd->eh) + { + TALER_BANK_transfer_cancel (wpd->eh); + wpd->eh = NULL; + } + GNUNET_free (wpd); + } +} + /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. @@ -128,17 +234,9 @@ shutdown_task (void *cls) GNUNET_SCHEDULER_cancel (task); task = NULL; } - if (NULL != wpd) - { - if (NULL != wpd->eh) - { - TALER_BANK_transfer_cancel (wpd->eh); - wpd->eh = NULL; - } - db_plugin->rollback (db_plugin->cls); - GNUNET_free (wpd); - wpd = NULL; - } + cleanup_wpd (); + GNUNET_free (shard); + db_plugin->rollback (db_plugin->cls); /* just in case */ TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; TALER_EXCHANGEDB_unload_accounts (); @@ -151,18 +249,18 @@ shutdown_task (void *cls) * * @return #GNUNET_OK on success */ -static int -parse_wirewatch_config (void) +static enum GNUNET_GenericReturnValue +parse_transfer_config (void) { if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_time (cfg, "exchange", - "AGGREGATOR_IDLE_SLEEP_INTERVAL", - &aggregator_idle_sleep_interval)) + "TRANSFER_IDLE_SLEEP_INTERVAL", + &transfer_idle_sleep_interval)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "exchange", - "AGGREGATOR_IDLE_SLEEP_INTERVAL"); + "TRANSFER_IDLE_SLEEP_INTERVAL"); return GNUNET_SYSERR; } if (NULL == @@ -218,13 +316,22 @@ static void run_transfers (void *cls); +/** + * Select shard to process. + * + * @param cls NULL + */ +static void +select_shard (void *cls); + + /** * Function called with the result from the execute step. * On success, we mark the respective wire transfer as finished, * and in general we afterwards continue to #run_transfers(), * except for irrecoverable errors. * - * @param cls NULL + * @param cls `struct WirePrepareData` we are working on * @param http_status_code #MHD_HTTP_OK on success * @param ec taler error code * @param row_id unique ID of the wire transfer in the bank's records @@ -237,15 +344,18 @@ wire_confirm_cb (void *cls, uint64_t row_id, struct GNUNET_TIME_Absolute wire_timestamp) { + struct WirePrepareData *wpd = cls; enum GNUNET_DB_QueryStatus qs; - (void) cls; (void) row_id; (void) wire_timestamp; wpd->eh = NULL; switch (http_status_code) { case MHD_HTTP_OK: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Wire transfer %llu completed successfully\n", + (unsigned long long) wpd->row_id); qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, wpd->row_id); /* continued below */ @@ -262,38 +372,43 @@ wire_confirm_cb (void *cls, break; default: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Wire transaction failed: %u/%d\n", + "Wire transfer %llu failed: %u/%d\n", + (unsigned long long) wpd->row_id, http_status_code, ec); db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); - GNUNET_free (wpd); - wpd = NULL; return; } - if (0 >= qs) + shard->batch_end = GNUNET_MAX (wpd->row_id, + shard->batch_end); + switch (qs) { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + case GNUNET_DB_STATUS_SOFT_ERROR: db_plugin->rollback (db_plugin->cls); - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - /* try again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); - } - else - { - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - } - GNUNET_free (wpd); - wpd = NULL; + cleanup_wpd (); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); return; + case GNUNET_DB_STATUS_HARD_ERROR: + db_plugin->rollback (db_plugin->cls); + cleanup_wpd (); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_CONTAINER_DLL_remove (wpd_head, + wpd_tail, + wpd); + GNUNET_free (wpd); + break; } - GNUNET_free (wpd); - wpd = NULL; + if (NULL != wpd_head) + return; /* wait for other queries to complete */ + /* batch done */ switch (commit_or_warn ()) { case GNUNET_DB_STATUS_SOFT_ERROR: @@ -308,8 +423,9 @@ wire_confirm_cb (void *cls, GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + shard->batch_start = shard->batch_end + 1; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Wire transfer complete\n"); + "Batch complete\n"); /* continue with #run_transfers(), just to guard against the unlikely case that there are more. */ GNUNET_assert (NULL == task); @@ -343,6 +459,7 @@ wire_prepare_cb (void *cls, size_t buf_size) { const struct TALER_EXCHANGEDB_AccountInfo *wa; + struct WirePrepareData *wpd; (void) cls; if ( (NULL == wire_method) || @@ -351,9 +468,14 @@ wire_prepare_cb (void *cls, GNUNET_break (0); db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; - goto cleanup; + GNUNET_SCHEDULER_shutdown (); + return; } + wpd = GNUNET_new (struct WirePrepareData); wpd->row_id = rowid; + GNUNET_CONTAINER_DLL_insert (wpd_head, + wpd_tail, + wpd); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting wire transfer %llu\n", (unsigned long long) rowid); @@ -365,7 +487,8 @@ wire_prepare_cb (void *cls, GNUNET_break (0); db_plugin->rollback (db_plugin->cls); global_ret = EXIT_NOTCONFIGURED; - goto cleanup; + GNUNET_SCHEDULER_shutdown (); + return; } wa = wpd->wa; wpd->eh = TALER_BANK_transfer (ctx, @@ -373,19 +496,15 @@ wire_prepare_cb (void *cls, buf, buf_size, &wire_confirm_cb, - NULL); + wpd); if (NULL == wpd->eh) { GNUNET_break (0); /* Irrecoverable */ db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; - goto cleanup; + GNUNET_SCHEDULER_shutdown (); + return; } - return; -cleanup: - GNUNET_SCHEDULER_shutdown (); - GNUNET_free (wpd); - wpd = NULL; } @@ -399,23 +518,55 @@ static void run_transfers (void *cls) { enum GNUNET_DB_QueryStatus qs; + int64_t limit; (void) cls; task = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Checking for pending wire transfers\n"); - if (GNUNET_SYSERR == - db_plugin->preflight (db_plugin->cls)) + limit = shard->shard_end - shard->batch_start; + if (0 >= limit) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to obtain database connection!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Shard [%llu,%llu) completed\n", + (unsigned long long) shard->shard_start, + (unsigned long long) shard->batch_end); + qs = db_plugin->complete_shard (db_plugin->cls, + "transfer", + shard->shard_start, + shard->batch_end + 1); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_free (shard); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for complete_shard. Rolling back.\n"); + GNUNET_free (shard); + task = GNUNET_SCHEDULER_add_now (&select_shard, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* already existed, ok, let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + break; + } + shard_delay = GNUNET_TIME_absolute_get_duration (shard->shard_start_time); + GNUNET_free (shard); + task = GNUNET_SCHEDULER_add_now (&select_shard, + NULL); return; } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking for %lld pending wire transfers [%llu-...)\n", + (long long) limit, + (unsigned long long) shard->batch_start); if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - "aggregator run transfer")) + db_plugin->start_read_committed (db_plugin->cls, + "aggregator run transfer")) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); @@ -423,30 +574,29 @@ run_transfers (void *cls) GNUNET_SCHEDULER_shutdown (); return; } - wpd = GNUNET_new (struct WirePrepareData); qs = db_plugin->wire_prepare_data_get (db_plugin->cls, + shard->batch_start, + limit, &wire_prepare_cb, NULL); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) - return; /* continued via continuation set in #wire_prepare_cb() */ - db_plugin->rollback (db_plugin->cls); - GNUNET_free (wpd); - wpd = NULL; switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: + db_plugin->rollback (db_plugin->cls); GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ + db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_transfers, NULL); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: /* no more prepared wire transfers, go sleep a bit! */ + db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); if (GNUNET_YES == test_mode) { @@ -458,15 +608,92 @@ run_transfers (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No more pending wire transfers, going idle\n"); - task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, + task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval, &run_transfers, NULL); } return; + default: + /* continued in wire_prepare_cb() */ + return; + } +} + + +/** + * Select shard to process. + * + * @param cls NULL + */ +static void +select_shard (void *cls) +{ + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_TIME_Relative delay; + uint64_t start; + uint64_t end; + + (void) cls; + task = NULL; + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (0 == max_workers) + delay = GNUNET_TIME_UNIT_ZERO; + else + delay.rel_value_us = GNUNET_CRYPTO_random_u64 ( + GNUNET_CRYPTO_QUALITY_WEAK, + 4 * GNUNET_TIME_relative_max ( + transfer_idle_sleep_interval, + GNUNET_TIME_relative_multiply (shard_delay, + max_workers)).rel_value_us); + qs = db_plugin->begin_shard (db_plugin->cls, + "transfer", + delay, + shard_size, + &start, + &end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain starting point for montoring from database!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ + task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval, + &select_shard, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); + task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval, + &select_shard, + NULL); + return; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* should be impossible */ - GNUNET_assert (0); + /* continued below */ + break; } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting with shard [%llu,%llu)\n", + (unsigned long long) start, + (unsigned long long) end); + shard = GNUNET_new (struct Shard); + shard->shard_start_time = GNUNET_TIME_absolute_get (); + shard->shard_start = start; + shard->shard_end = end; + shard->batch_start = start; + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); } @@ -489,7 +716,7 @@ run (void *cls, (void) cfgfile; cfg = c; - if (GNUNET_OK != parse_wirewatch_config ()) + if (GNUNET_OK != parse_transfer_config ()) { cfg = NULL; global_ret = EXIT_NOTCONFIGURED; @@ -503,9 +730,17 @@ run (void *cls, GNUNET_break (0); return; } - + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, + task = GNUNET_SCHEDULER_add_now (&select_shard, NULL); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, cls); @@ -524,12 +759,22 @@ main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_uint ('S', + "size", + "SIZE", + "Size to process per shard (default: 1024)", + &shard_size), GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_option_flag ('t', "test", "run in test mode and exit when idle", &test_mode), + GNUNET_GETOPT_option_uint ('w', + "workers", + "COUNT", + "Plan work load with up to COUNT worker processes (default: 16)", + &max_workers), GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index fb1fde31d..6e2cd1ee1 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -752,6 +752,7 @@ main (int argc, "COUNT", "Plan work load with up to COUNT worker processes (default: 16)", &max_workers), + GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue ret; diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index d66370a25..817c1a18c 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -1188,11 +1188,12 @@ prepare_statements (struct PostgresClosure *pg) ",type" ",buf" " FROM prewire" - " WHERE finished=FALSE" + " WHERE prewire_uuid >= $1" + " AND finished=FALSE" " AND failed=FALSE" " ORDER BY prewire_uuid ASC" - " LIMIT 1;", - 0), + " LIMIT $2;", + 2), /* Used in #postgres_select_deposits_missing_wire */ GNUNET_PQ_make_prepare ("deposits_get_overdue", "SELECT" @@ -6984,52 +6985,116 @@ postgres_wire_prepare_data_mark_failed ( } +/** + * Closure for #prewire_cb(). + */ +struct PrewireContext +{ + /** + * Function to call on each result. + */ + TALER_EXCHANGEDB_WirePreparationIterator cb; + + /** + * Closure for @a cb. + */ + void *cb_cls; + + /** + * #GNUNET_OK if everything went fine. + */ + enum GNUNET_GenericReturnValue status; +}; + + +/** + * Invoke the callback for each result. + * + * @param cls a `struct MissingWireContext *` + * @param result SQL result + * @param num_results number of rows in @a result + */ +static void +prewire_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct PrewireContext *pc = cls; + + for (unsigned int i = 0; i < num_results; i++) + { + uint64_t prewire_uuid; + char *type; + void *buf = NULL; + size_t buf_size; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("prewire_uuid", + &prewire_uuid), + GNUNET_PQ_result_spec_string ("type", + &type), + GNUNET_PQ_result_spec_variable_size ("buf", + &buf, + &buf_size), + GNUNET_PQ_result_spec_end + }; + + if (GNUNET_OK != + GNUNET_PQ_extract_result (result, + rs, + i)) + { + GNUNET_break (0); + pc->status = GNUNET_SYSERR; + return; + } + pc->cb (pc->cb_cls, + prewire_uuid, + type, + buf, + buf_size); + GNUNET_PQ_cleanup_result (rs); + } +} + + /** * Function called to get an unfinished wire transfer * preparation data. Fetches at most one item. * * @param cls closure + * @param start_row offset to query table at + * @param limit maximum number of results to return * @param cb function to call for ONE unfinished item * @param cb_cls closure for @a cb * @return transaction status code */ static enum GNUNET_DB_QueryStatus postgres_wire_prepare_data_get (void *cls, + uint64_t start_row, + uint64_t limit, TALER_EXCHANGEDB_WirePreparationIterator cb, void *cb_cls) { struct PostgresClosure *pg = cls; - enum GNUNET_DB_QueryStatus qs; struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_uint64 (&start_row), + GNUNET_PQ_query_param_uint64 (&limit), GNUNET_PQ_query_param_end }; - uint64_t prewire_uuid; - char *type; - void *buf = NULL; - size_t buf_size; - struct GNUNET_PQ_ResultSpec rs[] = { - GNUNET_PQ_result_spec_uint64 ("prewire_uuid", - &prewire_uuid), - GNUNET_PQ_result_spec_string ("type", - &type), - GNUNET_PQ_result_spec_variable_size ("buf", - &buf, - &buf_size), - GNUNET_PQ_result_spec_end + struct PrewireContext pc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK }; + enum GNUNET_DB_QueryStatus qs; - qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, - "wire_prepare_data_get", - params, - rs); - if (0 >= qs) - return qs; - cb (cb_cls, - prewire_uuid, - type, - buf, - buf_size); - GNUNET_PQ_cleanup_result (rs); + qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, + "wire_prepare_data_get", + params, + &prewire_cb, + &pc); + if (GNUNET_OK != pc.status) + return GNUNET_DB_STATUS_HARD_ERROR; return qs; } diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index 8478fac0d..b332cd6d2 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -117,6 +117,8 @@ test_wire_prepare (void) { FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != plugin->wire_prepare_data_get (plugin->cls, + 0, + 1, &dead_prepare_cb, NULL)); FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != @@ -126,10 +128,14 @@ test_wire_prepare (void) 11)); FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != plugin->wire_prepare_data_get (plugin->cls, + 0, + 1, &mark_prepare_cb, NULL)); FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != plugin->wire_prepare_data_get (plugin->cls, + 0, + 1, &dead_prepare_cb, NULL)); return GNUNET_OK; diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index 163b886cc..4037ebac0 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -2964,15 +2964,19 @@ struct TALER_EXCHANGEDB_Plugin /** * Function called to get an unfinished wire transfer - * preparation data. Fetches at most one item. + * preparation data. * * @param cls closure - * @param cb function to call for ONE unfinished item + * @param start_row offset to query table at + * @param limit maximum number of results to return + * @param cb function to call for unfinished work * @param cb_cls closure for @a cb * @return transaction status code */ enum GNUNET_DB_QueryStatus (*wire_prepare_data_get)(void *cls, + uint64_t start_row, + uint64_t limit, TALER_EXCHANGEDB_WirePreparationIterator cb, void *cb_cls); diff --git a/src/testing/testing_api_cmd_exec_transfer.c b/src/testing/testing_api_cmd_exec_transfer.c index 2db445bec..796f32d07 100644 --- a/src/testing/testing_api_cmd_exec_transfer.c +++ b/src/testing/testing_api_cmd_exec_transfer.c @@ -66,6 +66,9 @@ transfer_run (void *cls, "taler-exchange-transfer", "taler-exchange-transfer", "-c", as->config_filename, + "-L", "INFO", + "-S", "1", + "-w", "0", "-t", /* exit when done */ NULL); if (NULL == as->transfer_proc) -- cgit v1.2.3