From 7529ffb8b05bb63386630b3667d1e42db889f31a Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 19 Sep 2023 21:47:01 +0200 Subject: finish new reserve history logic --- src/exchangedb/pg_get_reserve_history.c | 497 ++++++++++++++++++++------------ 1 file changed, 314 insertions(+), 183 deletions(-) (limited to 'src/exchangedb/pg_get_reserve_history.c') diff --git a/src/exchangedb/pg_get_reserve_history.c b/src/exchangedb/pg_get_reserve_history.c index 9c7ff39de..1f1ca95b5 100644 --- a/src/exchangedb/pg_get_reserve_history.c +++ b/src/exchangedb/pg_get_reserve_history.c @@ -15,7 +15,7 @@ */ /** * @file pg_get_reserve_history.c - * @brief Low-level (statement-level) Postgres database access for the exchange + * @brief Obtain (parts of) the history of a reserve. * @author Christian Grothoff */ #include "platform.h" @@ -23,9 +23,20 @@ #include "taler_dbevents.h" #include "taler_pq_lib.h" #include "pg_get_reserve_history.h" +#include "pg_start_read_committed.h" +#include "pg_commit.h" +#include "pg_rollback.h" #include "plugin_exchangedb_common.h" #include "pg_helper.h" +/** + * How often do we re-try when encountering DB serialization issues? + * (We are read-only, so can only happen due to concurrent insert, + * which should be very rare.) + */ +#define RETRIES 3 + + /** * Closure for callbacks invoked via #TEH_PG_get_reserve_history(). */ @@ -63,10 +74,10 @@ struct ReserveHistoryContext struct TALER_Amount balance_out; /** - * Set to #GNUNET_SYSERR on serious internal errors during + * Set to true on serious internal errors during * the callbacks. */ - enum GNUNET_GenericReturnValue status; + bool failed; }; @@ -138,7 +149,7 @@ add_bank_to_exchange (void *cls, { GNUNET_break (0); GNUNET_free (bt); - rhc->status = GNUNET_SYSERR; + rhc->failed = true; return; } } @@ -199,7 +210,7 @@ add_withdraw_coin (void *cls, { GNUNET_break (0); GNUNET_free (cbc); - rhc->status = GNUNET_SYSERR; + rhc->failed = true; return; } } @@ -263,7 +274,7 @@ add_recoup (void *cls, { GNUNET_break (0); GNUNET_free (recoup); - rhc->status = GNUNET_SYSERR; + rhc->failed = true; return; } } @@ -323,7 +334,7 @@ add_exchange_to_bank (void *cls, { GNUNET_break (0); GNUNET_free (closing); - rhc->status = GNUNET_SYSERR; + rhc->failed = true; return; } } @@ -397,7 +408,7 @@ add_p2p_merge (void *cls, { GNUNET_break (0); GNUNET_free (merge); - rhc->status = GNUNET_SYSERR; + rhc->failed = true; return; } merge->flags = (enum TALER_WalletAccountMergeFlags) flags32; @@ -468,7 +479,7 @@ add_open_requests (void *cls, { GNUNET_break (0); GNUNET_free (orq); - rhc->status = GNUNET_SYSERR; + rhc->failed = true; return; } } @@ -524,7 +535,7 @@ add_close_requests (void *cls, { GNUNET_break (0); GNUNET_free (crq); - rhc->status = GNUNET_SYSERR; + rhc->failed = true; return; } TALER_payto_hash (payto_uri, @@ -539,20 +550,24 @@ add_close_requests (void *cls, } -enum GNUNET_DB_QueryStatus -TEH_PG_get_reserve_history ( - void *cls, - const struct TALER_ReservePublicKeyP *reserve_pub, - uint64_t start_off, - uint64_t etag_in, - uint64_t *etag_out, - struct TALER_Amount *balance, - struct TALER_EXCHANGEDB_ReserveHistory **rhp) +/** + * Add reserve history entries found. + * + * @param cls a `struct ReserveHistoryContext *` + * @param result SQL result + * @param num_results number of rows in @a result + */ +static void +handle_history_entry (void *cls, + PGresult *result, + unsigned int num_results) { - struct PostgresClosure *pg = cls; - struct ReserveHistoryContext rhc; - struct + static const struct { + /** + * Table with reserve history entry we are responsible for. + */ + const char *table; /** * Name of the prepared statement to run. */ @@ -563,68 +578,174 @@ TEH_PG_get_reserve_history ( GNUNET_PQ_PostgresResultHandler cb; } work[] = { /** #TALER_EXCHANGEDB_RO_BANK_TO_EXCHANGE */ - { "reserves_in_get_transactions", + { "reserves_in", + "reserves_in_get_transactions", add_bank_to_exchange }, /** #TALER_EXCHANGEDB_RO_WITHDRAW_COIN */ - { "get_reserves_out", + { "reserves_out", + "get_reserves_out", &add_withdraw_coin }, /** #TALER_EXCHANGEDB_RO_RECOUP_COIN */ - { "recoup_by_reserve", + { "recoup", + "recoup_by_reserve", &add_recoup }, /** #TALER_EXCHANGEDB_RO_EXCHANGE_TO_BANK */ - { "close_by_reserve", + { "reserves_close", + "close_by_reserve", &add_exchange_to_bank }, /** #TALER_EXCHANGEDB_RO_PURSE_MERGE */ - { "merge_by_reserve", + { "purse_decision", + "merge_by_reserve", &add_p2p_merge }, /** #TALER_EXCHANGEDB_RO_OPEN_REQUEST */ - { "open_request_by_reserve", + { "reserves_open_requests", + "open_request_by_reserve", &add_open_requests }, /** #TALER_EXCHANGEDB_RO_CLOSE_REQUEST */ - { "close_request_by_reserve", + { "close_requests", + "close_request_by_reserve", &add_close_requests }, /* List terminator */ - { NULL, - NULL } + { NULL, NULL, NULL } + }; + struct ReserveHistoryContext *rhc = cls; + char *table_name; + uint64_t serial_id; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_string ("table_name", + &table_name), + GNUNET_PQ_result_spec_uint64 ("serial_id", + &serial_id), + GNUNET_PQ_result_spec_end + }; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (rhc->reserve_pub), + GNUNET_PQ_query_param_uint64 (&serial_id), + GNUNET_PQ_query_param_end + }; + + while (0 < num_results--) + { + enum GNUNET_DB_QueryStatus qs; + bool found = false; + + if (GNUNET_OK != + GNUNET_PQ_extract_result (result, + rs, + num_results)) + { + GNUNET_break (0); + rhc->failed = true; + return; + } + + for (unsigned int i = 0; + NULL != work[i].cb; + i++) + { + if (0 != strcmp (table_name, + work[i].table)) + continue; + found = true; + qs = GNUNET_PQ_eval_prepared_multi_select (rhc->pg->conn, + work[i].statement, + params, + work[i].cb, + rhc); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Reserve %s had %d transactions at %llu in table %s\n", + TALER_B2S (rhc->reserve_pub), + (int) qs, + (unsigned long long) serial_id, + table_name); + if (0 >= qs) + rhc->failed = true; + break; + } + if (! found) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Coin history includes unsupported table `%s`\n", + table_name); + rhc->failed = true; + } + GNUNET_PQ_cleanup_result (rs); + if (rhc->failed) + break; + } +} + + +enum GNUNET_DB_QueryStatus +TEH_PG_get_reserve_history ( + void *cls, + const struct TALER_ReservePublicKeyP *reserve_pub, + uint64_t start_off, + uint64_t etag_in, + uint64_t *etag_out, + struct TALER_Amount *balance, + struct TALER_EXCHANGEDB_ReserveHistory **rhp) +{ + struct PostgresClosure *pg = cls; + struct ReserveHistoryContext rhc = { + .pg = pg, + .reserve_pub = reserve_pub }; - enum GNUNET_DB_QueryStatus qs; struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_auto_from_type (reserve_pub), GNUNET_PQ_query_param_end }; + struct GNUNET_PQ_QueryParam lparams[] = { + GNUNET_PQ_query_param_auto_from_type (reserve_pub), + GNUNET_PQ_query_param_uint64 (&start_off), + GNUNET_PQ_query_param_end + }; - PREPARE (pg, // done + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (pg->currency, + &rhc.balance_in)); + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (pg->currency, + &rhc.balance_out)); + + *rhp = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Getting transactions for reserve %s\n", + TALER_B2S (reserve_pub)); + PREPARE (pg, + "get_reserve_history_etag", + "SELECT" + " hist.reserve_history_serial_id" + ",r.current_balance" + " FROM reserve_history hist" + " JOIN reserves r USING (reserve_pub)" + " WHERE hist.reserve_pub=$1" + " ORDER BY reserve_history_serial_id DESC" + " LIMIT 1;"); + PREPARE (pg, + "get_reserve_history", + "SELECT" + " table_name" + ",serial_id" + " FROM reserve_history" + " WHERE reserve_pub=$1" + " AND reserve_history_serial_id > $2" + " ORDER BY reserve_history_serial_id DESC;"); + + PREPARE (pg, "reserves_in_get_transactions", - /* "SELECT" - " wire_reference" - ",credit" - ",execution_date" - ",payto_uri AS sender_account_details" - " FROM reserves_in" - " JOIN wire_targets" + " ri.wire_reference" + ",ri.credit" + ",ri.execution_date" + ",wt.payto_uri AS sender_account_details" + " FROM reserves_in ri" + " JOIN wire_targets wt" " ON (wire_source_h_payto = wire_target_h_payto)" - " WHERE reserve_pub=$1;", - */ - "WITH ri AS MATERIALIZED ( " - " SELECT * " - " FROM reserves_in " - " WHERE reserve_pub = $1 " - ") " - "SELECT" - " wire_reference" - " ,credit" - " ,execution_date" - " ,payto_uri AS sender_account_details" - " FROM wire_targets" - " JOIN ri" - " ON (wire_target_h_payto = wire_source_h_payto) " - "WHERE wire_target_h_payto = ( " - " SELECT wire_source_h_payto FROM ri " - "); "); - PREPARE (pg, // DONE + " WHERE ri.reserve_pub=$1" + " AND ri.reserve_in_serial_id=$2;"); + PREPARE (pg, "get_reserves_out", - /* "SELECT" " ro.h_blind_ev" ",denom.denom_pub_hash" @@ -633,86 +754,48 @@ TEH_PG_get_reserve_history ( ",ro.execution_date" ",ro.amount_with_fee" ",denom.fee_withdraw" - " FROM reserves res" - " JOIN reserves_out_by_reserve ror" - " ON (res.reserve_uuid = ror.reserve_uuid)" - " JOIN reserves_out ro" - " ON (ro.h_blind_ev = ror.h_blind_ev)" - " JOIN denominations denom" - " ON (ro.denominations_serial = denom.denominations_serial)" - " WHERE res.reserve_pub=$1;", - */ - "WITH robr AS MATERIALIZED ( " - " SELECT h_blind_ev " - " FROM reserves_out_by_reserve " - " WHERE reserve_uuid= ( " - " SELECT reserve_uuid " - " FROM reserves " - " WHERE reserve_pub = $1 " - " ) " - ") SELECT" - " ro.h_blind_ev" - " ,denom.denom_pub_hash" - " ,ro.denom_sig" - " ,ro.reserve_sig" - " ,ro.execution_date" - " ,ro.amount_with_fee" - " ,denom.fee_withdraw" - " FROM robr" - " JOIN reserves_out ro" - " ON (ro.h_blind_ev = robr.h_blind_ev)" + " FROM reserves_out ro" " JOIN denominations denom" - " ON (ro.denominations_serial = denom.denominations_serial);"); - PREPARE (pg, // DONE + " USING (denominations_serial)" + " JOIN reserves res" + " USING (reserve_uuid)" + " WHERE ro.reserve_out_serial_id=$2" + " AND res.reserve_pub=$1;"); + PREPARE (pg, "recoup_by_reserve", - /* "SELECT" - " recoup.coin_pub" - ",recoup.coin_sig" - ",recoup.coin_blind" - ",recoup.amount" - ",recoup.recoup_timestamp" - ",denominations.denom_pub_hash" - ",known_coins.denom_sig" - " FROM denominations" - " JOIN (known_coins" - " JOIN recoup " - " ON (recoup.coin_pub = known_coins.coin_pub))" - " ON (known_coins.denominations_serial = denominations.denominations_serial)" - " WHERE recoup.coin_pub" - " IN (SELECT coin_pub" - " FROM recoup_by_reserve" - " JOIN (reserves_out" - " JOIN (reserves_out_by_reserve" - " JOIN reserves" - " ON (reserves.reserve_uuid = reserves_out_by_reserve.reserve_uuid))" - " ON (reserves_out_by_reserve.h_blind_ev = reserves_out.h_blind_ev))" - " ON (recoup_by_reserve.reserve_out_serial_id = reserves_out.reserve_out_serial_id)" - " WHERE reserves.reserve_pub=$1);", - */ - "SELECT robr.coin_pub " - " ,robr.coin_sig" - " ,robr.coin_blind" - " ,robr.amount" - " ,robr.recoup_timestamp " - " ,denominations.denom_pub_hash " - " ,robr.denom_sig " - "FROM denominations " - " JOIN exchange_do_recoup_by_reserve($1) robr" - " USING (denominations_serial);"); + " rec.coin_pub" + ",rec.coin_sig" + ",rec.coin_blind" + ",rec.amount" + ",rec.recoup_timestamp" + ",denom.denom_pub_hash" + ",kc.denom_sig" + " FROM recoup rec" + " JOIN reserves_out ro" + " USING (reserve_out_serial_id)" + " JOIN reserves res" + " USING (reserve_uuid)" + " JOIN known_coins kc" + " USING (coin_pub)" + " JOIN denominations denom" + " ON (denom.denominations_serial = kc.denominations_serial)" + " WHERE rec.recoup_uuid=$2" + " AND res.reserve_pub=$1;"); PREPARE (pg, "close_by_reserve", "SELECT" - " amount" - ",closing_fee" - ",execution_date" - ",payto_uri AS receiver_account" - ",wtid" - " FROM reserves_close" - " JOIN wire_targets" - " USING (wire_target_h_payto)" - " WHERE reserve_pub=$1;"); - PREPARE (pg, // DONE + " rc.amount" + ",rc.closing_fee" + ",rc.execution_date" + ",wt.payto_uri AS receiver_account" + ",rc.wtid" + " FROM reserves_close rc" + " JOIN wire_targets wt" + " USING (wire_target_h_payto)" + " WHERE reserve_pub=$1" + " AND close_uuid=$2;"); + PREPARE (pg, "merge_by_reserve", "SELECT" " pr.amount_with_fee" @@ -726,18 +809,19 @@ TEH_PG_get_reserve_history ( ",pr.purse_expiration" ",pr.age_limit" ",pr.flags" - " FROM purse_merges pm" + " FROM purse_decision pdes" " JOIN purse_requests pr" - " USING (purse_pub)" - " LEFT JOIN purse_decision pdes" - " USING (purse_pub)" + " ON (pr.purse_pub = pdes.purse_pub)" + " JOIN purse_merges pm" + " ON (pm.purse_pub = pdes.purse_pub)" " JOIN account_merges am" " ON (am.purse_pub = pm.purse_pub AND" " am.reserve_pub = pm.reserve_pub)" - " WHERE pm.reserve_pub=$1" + " WHERE pdes.purse_decision_serial_id=$2" + " AND pm.reserve_pub=$1" " AND COALESCE(pm.partner_serial_id,0)=0" /* must be local! */ - " AND NOT COALESCE (pdes.refunded, FALSE);"); - PREPARE (pg, // done + " AND NOT pdes.refunded;"); + PREPARE (pg, "open_request_by_reserve", "SELECT" " reserve_payment" @@ -746,60 +830,107 @@ TEH_PG_get_reserve_history ( ",requested_purse_limit" ",reserve_sig" " FROM reserves_open_requests" - " WHERE reserve_pub=$1;"); - PREPARE (pg, // done + " WHERE reserve_pub=$1" + " AND open_request_uuid=$2;"); + PREPARE (pg, "close_request_by_reserve", "SELECT" " close_timestamp" ",payto_uri" ",reserve_sig" " FROM close_requests" - " WHERE reserve_pub=$1;"); + " WHERE reserve_pub=$1" + " AND close_request_serial_id=$2;"); - rhc.reserve_pub = reserve_pub; - rhc.rh = NULL; - rhc.rh_tail = NULL; - rhc.pg = pg; - rhc.status = GNUNET_OK; - GNUNET_assert (GNUNET_OK == - TALER_amount_set_zero (pg->currency, - &rhc.balance_in)); - GNUNET_assert (GNUNET_OK == - TALER_amount_set_zero (pg->currency, - &rhc.balance_out)); - qs = GNUNET_DB_STATUS_SUCCESS_NO_RESULTS; /* make static analysis happy */ - for (unsigned int i = 0; NULL != work[i].cb; i++) + for (unsigned int i = 0; iconn, - work[i].statement, - params, - work[i].cb, - &rhc); - if ( (0 > qs) || - (GNUNET_OK != rhc.status) ) + enum GNUNET_DB_QueryStatus qs; + uint64_t end; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("reserve_history_serial_id", + &end), + TALER_PQ_RESULT_SPEC_AMOUNT ("current_balance", + balance), + GNUNET_PQ_result_spec_end + }; + + if (GNUNET_OK != + TEH_PG_start_read_committed (pg, + "get-reserve-transactions")) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + /* First only check the last item, to see if + we even need to iterate */ + qs = GNUNET_PQ_eval_prepared_singleton_select ( + pg->conn, + "get_reserve_history_etag", + params, + rs); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + TEH_PG_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + TEH_PG_rollback (pg); + continue; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + TEH_PG_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + *etag_out = end; + if (end == etag_in) + return qs; + } + /* We indeed need to iterate over the history */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Current ETag for reserve %s is %llu\n", + TALER_B2S (reserve_pub), + (unsigned long long) end); + + qs = GNUNET_PQ_eval_prepared_multi_select ( + pg->conn, + "get_reserve_history", + lparams, + &handle_history_entry, + &rhc); + switch (qs) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to compile reserve history at `%s'\n", - work[i].statement); + case GNUNET_DB_STATUS_HARD_ERROR: + TEH_PG_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + TEH_PG_rollback (pg); + continue; + default: break; } - } - if ( (qs < 0) || - (rhc.status != GNUNET_OK) ) - { - TEH_COMMON_free_reserve_history (cls, - rhc.rh); - rhc.rh = NULL; - if (qs >= 0) + if (rhc.failed) { - /* status == SYSERR is a very hard error... */ - qs = GNUNET_DB_STATUS_HARD_ERROR; + TEH_PG_rollback (pg); + TEH_COMMON_free_reserve_history (pg, + rhc.rh); + return GNUNET_DB_STATUS_SOFT_ERROR; + } + qs = TEH_PG_commit (pg); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + TEH_COMMON_free_reserve_history (pg, + rhc.rh); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + TEH_COMMON_free_reserve_history (pg, + rhc.rh); + rhc.rh = NULL; + continue; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + *rhp = rhc.rh; + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } } - *rhp = rhc.rh; - GNUNET_assert (0 <= - TALER_amount_subtract (balance, - &rhc.balance_in, - &rhc.balance_out)); - return qs; + return GNUNET_DB_STATUS_SOFT_ERROR; } -- cgit v1.2.3