diff options
author | Christian Grothoff <christian@grothoff.org> | 2023-07-30 16:45:50 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2023-07-30 16:45:50 +0200 |
commit | c8e485088b0139c56edd45ab20ae6d7cfc196dbc (patch) | |
tree | 446c52bdf0b64cf915c399b8917179af8ad23d3c | |
parent | f05a59d8cc788ad50df63d94636d3175ad630a1c (diff) |
remove wrap_size, obsolete
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 32 | ||||
-rw-r--r-- | src/exchangedb/perf_reserves_in_insert.c | 15 | ||||
-rw-r--r-- | src/exchangedb/pg_reserves_in_insert.c | 406 | ||||
-rw-r--r-- | src/exchangedb/pg_reserves_in_insert.h | 2 | ||||
-rw-r--r-- | src/exchangedb/test_exchangedb.c | 2 | ||||
-rw-r--r-- | src/include/taler_exchangedb_plugin.h | 1 |
6 files changed, 209 insertions, 249 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index c27b07000..cfcd74676 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -510,13 +510,11 @@ transaction_completed (void) * We got incoming transaction details from the bank. Add them * to the database. * - * @param wrap_size desired bulk insert size * @param details array of transaction details * @param details_length length of the @a details array */ static void -process_reply (unsigned int wrap_size, - const struct TALER_BANK_CreditDetails *details, +process_reply (const struct TALER_BANK_CreditDetails *details, unsigned int details_length) { enum GNUNET_DB_QueryStatus qs; @@ -585,7 +583,6 @@ process_reply (unsigned int wrap_size, qs = db_plugin->reserves_in_insert (db_plugin->cls, reserves, details_length, - wrap_size, qss); switch (qs) { @@ -595,8 +592,8 @@ process_reply (unsigned int wrap_size, return; case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for batch2_reserves_in_insert (%u). Rolling back.\n", - wrap_size); + "Got DB soft error for reserves_in_insert (%u). Rolling back.\n", + details_length); handle_soft_error (); return; default: @@ -701,27 +698,7 @@ static void history_cb (void *cls, const struct TALER_BANK_CreditHistoryResponse *reply) { - static int wrap_size = -2; - (void) cls; - if (-2 == wrap_size) - { - const char *mode = getenv ("TALER_WIREWATCH_WARP_SIZE"); - char dummy; - - if ( (NULL == mode) || - (1 != sscanf (mode, - "%d%c", - &wrap_size, - &dummy)) ) - { - if (NULL != mode) - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Bad batch mode `%s' specified\n", - mode); - wrap_size = 8; /* maximum supported is currently 8 */ - } - } GNUNET_assert (NULL == task); hh = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -730,8 +707,7 @@ history_cb (void *cls, switch (reply->http_status) { case MHD_HTTP_OK: - process_reply (wrap_size, - reply->details.ok.details, + process_reply (reply->details.ok.details, reply->details.ok.details_length); return; case MHD_HTTP_NO_CONTENT: diff --git a/src/exchangedb/perf_reserves_in_insert.c b/src/exchangedb/perf_reserves_in_insert.c index 9f3ed4604..09c4a43c5 100644 --- a/src/exchangedb/perf_reserves_in_insert.c +++ b/src/exchangedb/perf_reserves_in_insert.c @@ -78,8 +78,9 @@ run (void *cls) { struct GNUNET_CONFIGURATION_Handle *cfg = cls; const uint32_t num_partitions = 10; - static unsigned int batches[] = {1, 1, 2, 3, 4, 16, 32 }; - const unsigned int lcm = 3 * 32; + static unsigned int batches[] = {1, 1, 2, 3, 4, 16, 32, 64, 128, 256, 512, + 1024, 1024, 512, 256, 128, 64, 32, + 16, 4, 3, 2, 1 }; struct GNUNET_TIME_Relative times[sizeof (batches) / sizeof(*batches)]; unsigned long long sqrs[sizeof (batches) / sizeof(*batches)]; @@ -109,7 +110,7 @@ run (void *cls) i< sizeof(batches) / sizeof(*batches); i++) { - unsigned int batch_size = batches[i]; + unsigned int lcm = batches[i]; struct TALER_Amount value; struct GNUNET_TIME_Absolute now; struct GNUNET_TIME_Timestamp ts; @@ -141,7 +142,6 @@ run (void *cls) plugin->reserves_in_insert (plugin->cls, reserves, lcm, - batch_size, results)); } duration = GNUNET_TIME_absolute_get_duration (now); @@ -159,6 +159,7 @@ run (void *cls) i< sizeof(batches) / sizeof(*batches); i++) { + unsigned int lcm = batches[i]; struct GNUNET_TIME_Relative avg; double avg_dbl; double variance; @@ -168,10 +169,10 @@ run (void *cls) avg_dbl = avg.rel_value_us; variance = sqrs[i] - (avg_dbl * avg_dbl * ROUNDS); fprintf (stdout, - "Batch[%2u]: %8llu ± %6.0f\n", + "Batch[%4u]: %8llu us/entry ± %6.0f\n", batches[i], - (unsigned long long) avg.rel_value_us, - sqrt (variance / (ROUNDS - 1))); + (unsigned long long) avg.rel_value_us / lcm, + sqrt (variance / lcm / (ROUNDS - 1))); } result = 0; drop: diff --git a/src/exchangedb/pg_reserves_in_insert.c b/src/exchangedb/pg_reserves_in_insert.c index d5cb8820d..1b8540456 100644 --- a/src/exchangedb/pg_reserves_in_insert.c +++ b/src/exchangedb/pg_reserves_in_insert.c @@ -134,251 +134,239 @@ helper_cb (void *cls, enum GNUNET_DB_QueryStatus TEH_PG_reserves_in_insert ( void *cls, - const struct TALER_EXCHANGEDB_ReserveInInfo *real_reserves, - unsigned int real_reserves_length, - unsigned int batch_size, + const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, + unsigned int reserves_length, enum GNUNET_DB_QueryStatus *results) { struct PostgresClosure *pg = cls; unsigned int dups = 0; - batch_size = real_reserves_length; - enum GNUNET_DB_QueryStatus rqs = 0; - for (unsigned int batch = 0; batch < real_reserves_length; - batch += batch_size) + struct TALER_PaytoHashP h_paytos[GNUNET_NZL (reserves_length)]; + char *notify_s[GNUNET_NZL (reserves_length)]; + struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)]; + struct TALER_Amount balances[GNUNET_NZL (reserves_length)]; + struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)]; + const char *sender_account_details[GNUNET_NZL (reserves_length)]; + const char *exchange_account_names[GNUNET_NZL (reserves_length)]; + uint64_t wire_references[GNUNET_NZL (reserves_length)]; + uint64_t reserve_uuids[GNUNET_NZL (reserves_length)]; + bool transaction_duplicates[GNUNET_NZL (reserves_length)]; + bool conflicts[GNUNET_NZL (reserves_length)]; + struct GNUNET_TIME_Timestamp reserve_expiration + = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); + struct GNUNET_TIME_Timestamp gc + = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time); + enum GNUNET_DB_QueryStatus qs; + bool need_update; + + for (unsigned int i = 0; i<reserves_length; i++) { - const struct TALER_EXCHANGEDB_ReserveInInfo *reserves = - &real_reserves[batch]; - unsigned int reserves_length = GNUNET_MIN (batch_size, - real_reserves_length - batch); - struct TALER_PaytoHashP h_paytos[GNUNET_NZL (reserves_length)]; - char *notify_s[GNUNET_NZL (reserves_length)]; - struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)]; - struct TALER_Amount balances[GNUNET_NZL (reserves_length)]; - struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)]; - const char *sender_account_details[GNUNET_NZL (reserves_length)]; - const char *exchange_account_names[GNUNET_NZL (reserves_length)]; - uint64_t wire_references[GNUNET_NZL (reserves_length)]; - uint64_t reserve_uuids[GNUNET_NZL (reserves_length)]; - bool transaction_duplicates[GNUNET_NZL (reserves_length)]; - bool conflicts[GNUNET_NZL (reserves_length)]; - struct GNUNET_TIME_Timestamp reserve_expiration - = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); - struct GNUNET_TIME_Timestamp gc - = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time); - enum GNUNET_DB_QueryStatus qs; - bool need_update; + const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i]; - for (unsigned int i = 0; i<reserves_length; i++) - { - const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i]; + TALER_payto_hash (reserve->sender_account_details, + &h_paytos[i]); + notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub); + reserve_pubs[i] = *reserve->reserve_pub; + balances[i] = *reserve->balance; + execution_times[i] = reserve->execution_time; + sender_account_details[i] = reserve->sender_account_details; + exchange_account_names[i] = reserve->exchange_account_name; + wire_references[i] = reserve->wire_reference; + } - TALER_payto_hash (reserve->sender_account_details, - &h_paytos[i]); - notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub); - reserve_pubs[i] = *reserve->reserve_pub; - balances[i] = *reserve->balance; - execution_times[i] = reserve->execution_time; - sender_account_details[i] = reserve->sender_account_details; - exchange_account_names[i] = reserve->exchange_account_name; - wire_references[i] = reserve->wire_reference; - } + /* NOTE: kind-of pointless to explicitly start a transaction here... */ + if (GNUNET_OK != + TEH_PG_preflight (pg)) + { + GNUNET_break (0); + qs = GNUNET_DB_STATUS_HARD_ERROR; + goto finished; + } + if (GNUNET_OK != + TEH_PG_start_read_committed (pg, + "READ_COMMITED")) + { + GNUNET_break (0); + qs = GNUNET_DB_STATUS_HARD_ERROR; + goto finished; + } + PREPARE (pg, + "reserves_insert_with_array", + "SELECT" + " transaction_duplicate" + ",ruuid" + " FROM exchange_do_array_reserves_insert" + " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10);"); + { + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_timestamp (&gc), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + GNUNET_PQ_query_param_array_auto_from_type (reserves_length, + reserve_pubs, + pg->conn), + GNUNET_PQ_query_param_array_uint64 (reserves_length, + wire_references, + pg->conn), + TALER_PQ_query_param_array_amount ( + reserves_length, + balances, + pg->conn), + GNUNET_PQ_query_param_array_ptrs_string ( + reserves_length, + (const char **) exchange_account_names, + pg->conn), + GNUNET_PQ_query_param_array_timestamp ( + reserves_length, + execution_times, + pg->conn), + GNUNET_PQ_query_param_array_auto_from_type ( + reserves_length, + h_paytos, + pg->conn), + GNUNET_PQ_query_param_array_ptrs_string ( + reserves_length, + (const char **) sender_account_details, + pg->conn), + GNUNET_PQ_query_param_array_ptrs_string ( + reserves_length, + (const char **) notify_s, + pg->conn), + GNUNET_PQ_query_param_end + }; + struct Context ctx = { + .reserve_uuids = reserve_uuids, + .transaction_duplicates = transaction_duplicates, + .conflicts = conflicts, + .needs_update = false, + .status = GNUNET_OK + }; - /* NOTE: kind-of pointless to explicitly start a transaction here... */ - if (GNUNET_OK != - TEH_PG_preflight (pg)) + qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, + "reserves_insert_with_array", + params, + &helper_cb, + &ctx); + if ( (qs < 0) || + (GNUNET_OK != ctx.status) ) { - GNUNET_break (0); - qs = GNUNET_DB_STATUS_HARD_ERROR; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to insert into reserves (%d)\n", + qs); goto finished; } - if (GNUNET_OK != - TEH_PG_start_read_committed (pg, - "READ_COMMITED")) + need_update = ctx.needs_update; + } + + { + enum GNUNET_DB_QueryStatus cs; + + cs = TEH_PG_commit (pg); + if (cs < 0) { - GNUNET_break (0); - qs = GNUNET_DB_STATUS_HARD_ERROR; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to commit\n"); + qs = cs; goto finished; } - PREPARE (pg, - "reserves_insert_with_array", - "SELECT" - " transaction_duplicate" - ",ruuid" - " FROM exchange_do_array_reserves_insert" - " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10);"); + } + + for (unsigned int i = 0; i<reserves_length; i++) + { + if (transaction_duplicates[i]) + dups++; + results[i] = transaction_duplicates[i] + ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS + : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + } + + if (! need_update) + { + qs = reserves_length; + goto finished; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Reserve update needed for some reserves in the batch\n"); + PREPARE (pg, + "reserves_update", + "SELECT" + " out_duplicate AS duplicate " + "FROM exchange_do_batch_reserves_update" + " ($1,$2,$3,$4,$5,$6,$7);"); + + if (GNUNET_OK != + TEH_PG_start (pg, + "reserve-insert-continued")) + { + GNUNET_break (0); + qs = GNUNET_DB_STATUS_HARD_ERROR; + goto finished; + } + + for (unsigned int i = 0; i<reserves_length; i++) + { + if (transaction_duplicates[i]) + continue; + if (! conflicts[i]) + continue; { + bool duplicate; struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_timestamp (&gc), + GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]), GNUNET_PQ_query_param_timestamp (&reserve_expiration), - GNUNET_PQ_query_param_array_auto_from_type (reserves_length, - reserve_pubs, - pg->conn), - GNUNET_PQ_query_param_array_uint64 (reserves_length, - wire_references, - pg->conn), - TALER_PQ_query_param_array_amount ( - reserves_length, - balances, - pg->conn), - GNUNET_PQ_query_param_array_ptrs_string ( - reserves_length, - (const char **) exchange_account_names, - pg->conn), - GNUNET_PQ_query_param_array_timestamp ( - reserves_length, - execution_times, - pg->conn), - GNUNET_PQ_query_param_array_auto_from_type ( - reserves_length, - h_paytos, - pg->conn), - GNUNET_PQ_query_param_array_ptrs_string ( - reserves_length, - (const char **) sender_account_details, - pg->conn), - GNUNET_PQ_query_param_array_ptrs_string ( - reserves_length, - (const char **) notify_s, - pg->conn), + GNUNET_PQ_query_param_uint64 (&wire_references[i]), + TALER_PQ_query_param_amount (pg->conn, + &balances[i]), + GNUNET_PQ_query_param_string (exchange_account_names[i]), + GNUNET_PQ_query_param_auto_from_type (&h_paytos[i]), + GNUNET_PQ_query_param_string (notify_s[i]), GNUNET_PQ_query_param_end }; - struct Context ctx = { - .reserve_uuids = reserve_uuids, - .transaction_duplicates = transaction_duplicates, - .conflicts = conflicts, - .needs_update = false, - .status = GNUNET_OK + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_bool ("duplicate", + &duplicate), + GNUNET_PQ_result_spec_end }; + enum GNUNET_DB_QueryStatus qs; - qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, - "reserves_insert_with_array", - params, - &helper_cb, - &ctx); - if ( (qs < 0) || - (GNUNET_OK != ctx.status) ) + qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "reserves_update", + params, + rs); + if (qs < 0) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to insert into reserves (%d)\n", + "Failed to update reserves (%d)\n", qs); + results[i] = qs; goto finished; } - need_update = ctx.needs_update; - } - - { - enum GNUNET_DB_QueryStatus cs; - - cs = TEH_PG_commit (pg); - if (cs < 0) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to commit\n"); - qs = cs; - goto finished; - } - } - - for (unsigned int i = 0; i<reserves_length; i++) - { - if (transaction_duplicates[i]) - dups++; - results[i] = transaction_duplicates[i] - ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS - : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } - - if (! need_update) - { - qs = reserves_length; - goto finished; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Reserve update needed for some reserves in the batch\n"); - PREPARE (pg, - "reserves_update", - "SELECT" - " out_duplicate AS duplicate " - "FROM exchange_do_batch_reserves_update" - " ($1,$2,$3,$4,$5,$6,$7);"); - - if (GNUNET_OK != - TEH_PG_start (pg, - "reserve-insert-continued")) - { - GNUNET_break (0); - qs = GNUNET_DB_STATUS_HARD_ERROR; - goto finished; - } - - for (unsigned int i = 0; i<reserves_length; i++) - { - if (transaction_duplicates[i]) - continue; - if (! conflicts[i]) - continue; - { - bool duplicate; - struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]), - GNUNET_PQ_query_param_timestamp (&reserve_expiration), - GNUNET_PQ_query_param_uint64 (&wire_references[i]), - TALER_PQ_query_param_amount (pg->conn, - &balances[i]), - GNUNET_PQ_query_param_string (exchange_account_names[i]), - GNUNET_PQ_query_param_auto_from_type (&h_paytos[i]), - GNUNET_PQ_query_param_string (notify_s[i]), - GNUNET_PQ_query_param_end - }; - struct GNUNET_PQ_ResultSpec rs[] = { - GNUNET_PQ_result_spec_bool ("duplicate", - &duplicate), - GNUNET_PQ_result_spec_end - }; - enum GNUNET_DB_QueryStatus qs; - - qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, - "reserves_update", - params, - rs); - if (qs < 0) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to update reserves (%d)\n", - qs); - results[i] = qs; - goto finished; - } - results[i] = duplicate + results[i] = duplicate ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } } - { - enum GNUNET_DB_QueryStatus cs; + } + { + enum GNUNET_DB_QueryStatus cs; - cs = TEH_PG_commit (pg); - if (cs < 0) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to commit\n"); - qs = cs; - goto finished; - } + cs = TEH_PG_commit (pg); + if (cs < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to commit\n"); + qs = cs; + goto finished; } -finished: - for (unsigned int i = 0; i<reserves_length; i++) - GNUNET_free (notify_s[i]); - if (qs < 0) - return qs; - rqs += qs; } +finished: + for (unsigned int i = 0; i<reserves_length; i++) + GNUNET_free (notify_s[i]); + if (qs < 0) + return qs; GNUNET_PQ_event_do_poll (pg->conn); if (0 != dups) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n", dups, - real_reserves_length); - return rqs; + reserves_length); + return qs; } diff --git a/src/exchangedb/pg_reserves_in_insert.h b/src/exchangedb/pg_reserves_in_insert.h index f92843e79..938df3adb 100644 --- a/src/exchangedb/pg_reserves_in_insert.h +++ b/src/exchangedb/pg_reserves_in_insert.h @@ -33,7 +33,6 @@ * @param cls the `struct PostgresClosure` with the plugin-specific state * @param reserves array of reserves to insert * @param reserves_length length of the @a reserves array - * @param batch_size how many inserts to do in one go * @param[out] results set to query status per reserve, must be of length @a reserves_length * @return transaction status code */ @@ -42,7 +41,6 @@ TEH_PG_reserves_in_insert ( void *cls, const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, unsigned int reserves_length, - unsigned int batch_size, enum GNUNET_DB_QueryStatus *results); diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index 36f51120a..659f82694 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -1311,7 +1311,6 @@ run (void *cls) plugin->reserves_in_insert (plugin->cls, &reserve, 1, - 1, &qsr)); FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qsr); @@ -1338,7 +1337,6 @@ run (void *cls) plugin->reserves_in_insert (plugin->cls, &reserve, 1, - 1, &qsr)); FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qsr); diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index f5fdd7f11..e5107808b 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -3700,7 +3700,6 @@ struct TALER_EXCHANGEDB_Plugin void *cls, const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, unsigned int reserves_length, - unsigned int batch_size, enum GNUNET_DB_QueryStatus *results); |