diff options
author | Christian Grothoff <grothoff@gnunet.org> | 2022-03-27 13:48:25 +0200 |
---|---|---|
committer | Christian Grothoff <grothoff@gnunet.org> | 2022-03-27 13:48:25 +0200 |
commit | b9a9af3a59f3abdb09afb9d0f9e4c0d83df789b7 (patch) | |
tree | 5fda2a92b09ef8cb476a289e44e73d48ffef00e2 | |
parent | d0a69da8954fd72f361795c2e007bad3fe5accd1 (diff) |
new aggregator mega transaction logic
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 545 | ||||
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 111 | ||||
-rw-r--r-- | src/include/taler_exchangedb_plugin.h | 8 |
3 files changed, 159 insertions, 505 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index c34d47f9c..04cf426de 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -28,18 +28,6 @@ #include "taler_json_lib.h" #include "taler_bank_service.h" -struct AdditionalDeposit -{ - /** - * Public key of the coin. - */ - struct TALER_CoinSpendPublicKeyP coin_pub; - - /** - * Row of the deposit. - */ - uint64_t row; -}; /** * Information about one aggregation process to be executed. There is @@ -55,11 +43,6 @@ struct AggregationUnit struct TALER_MerchantPublicKeyP merchant_pub; /** - * Public key of the coin. - */ - struct TALER_CoinSpendPublicKeyP coin_pub; - - /** * Total amount to be transferred, before subtraction of @e fees.wire and rounding down. */ struct TALER_Amount total_amount; @@ -80,11 +63,6 @@ struct AggregationUnit struct TALER_WireTransferIdentifierRawP wtid; /** - * Row ID of the transaction that started it all. - */ - uint64_t row_id; - - /** * The current time (which triggered the aggregation and * defines the wire fee). */ @@ -101,32 +79,11 @@ struct AggregationUnit struct TALER_PaytoHashP h_payto; /** - * Serial number of the wire target. - */ - uint64_t wire_target; - - /** * Exchange wire account to be used for the preparation and * eventual execution of the aggregate wire transfer. */ const struct TALER_EXCHANGEDB_AccountInfo *wa; - /** - * Array of row_ids from the aggregation. - */ - struct AdditionalDeposit - additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; - - /** - * Offset specifying how many @e additional_rows are in use. - */ - unsigned int rows_offset; - - /** - * Set to true if we encountered a refund during #refund_by_coin_cb. - * Used to wave the deposit fee. - */ - bool have_refund; }; @@ -341,331 +298,6 @@ parse_wirewatch_config (void) /** - * Callback invoked with information about refunds applicable - * to a particular coin. Subtract refunded amount(s) from - * the aggregation unit's total amount. - * - * @param cls closure with a `struct AggregationUnit *` - * @param amount_with_fee what was the refunded amount with the fee - * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop - */ -static enum GNUNET_GenericReturnValue -refund_by_coin_cb (void *cls, - const struct TALER_Amount *amount_with_fee) -{ - struct AggregationUnit *aux = cls; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator subtracts applicable refund of amount %s\n", - TALER_amount2s (amount_with_fee)); - aux->have_refund = true; - if (0 > - TALER_amount_subtract (&aux->total_amount, - &aux->total_amount, - amount_with_fee)) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - return GNUNET_OK; -} - - -/** - * Function called with details about deposits that have been made, - * with the goal of executing the corresponding wire transaction. - * - * @param cls a `struct AggregationUnit` - * @param row_id identifies database entry - * @param merchant_pub public key of the merchant - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @param wire_target target account for the wire transfer - * @param payto_uri URI of the target account - * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate - */ -static enum GNUNET_DB_QueryStatus -deposit_cb (void *cls, - uint64_t row_id, - const struct TALER_MerchantPublicKeyP *merchant_pub, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount_with_fee, - const struct TALER_Amount *deposit_fee, - const struct TALER_PrivateContractHashP *h_contract_terms, - uint64_t wire_target, - const char *payto_uri) -{ - struct AggregationUnit *au = cls; - enum GNUNET_DB_QueryStatus qs; - - au->merchant_pub = *merchant_pub; - au->coin_pub = *coin_pub; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator processing payment %s with amount %s\n", - TALER_B2S (coin_pub), - TALER_amount2s (amount_with_fee)); - au->row_id = row_id; - au->total_amount = *amount_with_fee; - au->have_refund = false; - qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - coin_pub, - &au->merchant_pub, - h_contract_terms, - &refund_by_coin_cb, - au); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - if (! au->have_refund) - { - struct TALER_Amount ntotal; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Non-refunded transaction, subtracting deposit fee %s\n", - TALER_amount2s (deposit_fee)); - if (0 > - TALER_amount_subtract (&ntotal, - amount_with_fee, - deposit_fee)) - { - /* This should never happen, issue a warning, but continue processing - with an amount of zero, least we hang here for good. */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n", - (unsigned long long) row_id, - TALER_amount2s (amount_with_fee)); - GNUNET_assert (GNUNET_OK == - TALER_amount_set_zero (au->total_amount.currency, - &au->total_amount)); - } - else - { - au->total_amount = ntotal; - } - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Amount after fee is %s\n", - TALER_amount2s (&au->total_amount)); - - GNUNET_assert (NULL == au->payto_uri); - au->payto_uri = GNUNET_strdup (payto_uri); - TALER_payto_hash (payto_uri, - &au->h_payto); - au->wire_target = wire_target; - GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, - &au->wtid, - sizeof (au->wtid)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n", - TALER_B2S (&au->wtid), - TALER_amount2s (amount_with_fee), - (unsigned long long) row_id); - au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (payto_uri); - if (NULL == au->wa) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "No exchange account configured for `%s', please fix your setup to continue!\n", - payto_uri); - return GNUNET_DB_STATUS_HARD_ERROR; - } - - /* make sure we have current fees */ - au->execution_time = GNUNET_TIME_timestamp_get (); - { - struct GNUNET_TIME_Timestamp start_date; - struct GNUNET_TIME_Timestamp end_date; - struct TALER_MasterSignatureP master_sig; - enum GNUNET_DB_QueryStatus qs; - - qs = db_plugin->get_wire_fee (db_plugin->cls, - au->wa->method, - au->execution_time, - &start_date, - &end_date, - &au->fees, - &master_sig); - if (0 >= qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Could not get wire fees for %s at %s. Aborting run.\n", - au->wa->method, - GNUNET_TIME_timestamp2s (au->execution_time)); - return GNUNET_DB_STATUS_HARD_ERROR; - } - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator starts aggregation for deposit %llu to %s with wire fee %s\n", - (unsigned long long) row_id, - TALER_B2S (&au->wtid), - TALER_amount2s (&au->fees.wire)); - qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - &au->wtid, - row_id); - if (qs <= 0) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator marks deposit %llu as done\n", - (unsigned long long) row_id); - qs = db_plugin->mark_deposit_done (db_plugin->cls, - coin_pub, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - return qs; -} - - -/** - * Function called with details about another deposit we - * can aggregate into an existing aggregation unit. - * - * @param cls a `struct AggregationUnit` - * @param row_id identifies database entry - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @return transaction status code - */ -static enum GNUNET_DB_QueryStatus -aggregate_cb (void *cls, - uint64_t row_id, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount_with_fee, - const struct TALER_Amount *deposit_fee, - const struct TALER_PrivateContractHashP *h_contract_terms) -{ - struct AggregationUnit *au = cls; - struct TALER_Amount old; - enum GNUNET_DB_QueryStatus qs; - - if (row_id == au->row_id) - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) - { - /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */ - GNUNET_break (0); - /* Skip this one, but keep going with the overall transaction */ - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } - - /* add to total */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding transaction amount %s from row %llu to aggregation\n", - TALER_amount2s (amount_with_fee), - (unsigned long long) row_id); - /* save the existing total aggregate in 'old', for later */ - old = au->total_amount; - /* we begin with the total contribution of the current coin */ - au->total_amount = *amount_with_fee; - /* compute contribution of this coin (after fees) */ - au->have_refund = false; - qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - coin_pub, - &au->merchant_pub, - h_contract_terms, - &refund_by_coin_cb, - au); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - if (! au->have_refund) - { - struct TALER_Amount tmp; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Subtracting deposit fee %s for non-refunded coin\n", - TALER_amount2s (deposit_fee)); - if (0 > - TALER_amount_subtract (&tmp, - &au->total_amount, - deposit_fee)) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n", - (unsigned long long) row_id, - TALER_amount2s (&au->total_amount)); - GNUNET_assert (GNUNET_OK == - TALER_amount_set_zero (old.currency, - &au->total_amount)); - } - else - { - au->total_amount = tmp; - } - } - - /* now add the au->total_amount with the (remaining) contribution of - the current coin to the 'old' value with the current aggregate value */ - { - struct TALER_Amount tmp; - - if (0 > - TALER_amount_add (&tmp, - &au->total_amount, - &old)) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Overflow or currency incompatibility during aggregation at %llu\n", - (unsigned long long) row_id); - /* Skip this one, but keep going! */ - au->total_amount = old; - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } - au->total_amount = tmp; - } - - /* "append" to our list of rows */ - au->additional_rows[au->rows_offset].coin_pub = *coin_pub; - au->additional_rows[au->rows_offset].row = row_id; - au->rows_offset++; - /* insert into aggregation tracking table */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding %llu to aggregate %s\n", - (unsigned long long) row_id, - TALER_B2S (&au->wtid)); - qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - &au->wtid, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to add %llu to aggregate %s: %d\n", - (unsigned long long) row_id, - TALER_B2S (&au->wtid), - qs); - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - qs = db_plugin->mark_deposit_done (db_plugin->cls, - coin_pub, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator marked deposit %llu as DONE\n", - (unsigned long long) row_id); - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -} - - -/** * Perform a database commit. If it fails, print a warning. * * @return status of commit @@ -727,10 +359,17 @@ run_aggregation (void *cls) struct Shard *s = cls; struct AggregationUnit au_active; enum GNUNET_DB_QueryStatus qs; + struct TALER_Amount trans; + bool have_transient; task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Checking for ready deposits to aggregate\n"); + /* make sure we have current fees */ + memset (&au_active, + 0, + sizeof (au_active)); + au_active.execution_time = GNUNET_TIME_timestamp_get (); if (GNUNET_OK != db_plugin->start_deferred_wire_out (db_plugin->cls)) { @@ -741,16 +380,13 @@ run_aggregation (void *cls) release_shard (s); return; } - memset (&au_active, - 0, - sizeof (au_active)); qs = db_plugin->get_ready_deposit ( db_plugin->cls, s->shard_start, s->shard_end, kyc_off ? true : false, - &deposit_cb, - &au_active); + &au_active.merchant_pub, + &au_active.payto_uri); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -808,22 +444,98 @@ run_aggregation (void *cls) /* continued below */ break; } + au_active.wa = TALER_EXCHANGEDB_find_account_by_payto_uri ( + au_active.payto_uri); + if (NULL == au_active.wa) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No exchange account configured for `%s', please fix your setup to continue!\n", + au_active.payto_uri); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + } + + { + struct GNUNET_TIME_Timestamp start_date; + struct GNUNET_TIME_Timestamp end_date; + struct TALER_MasterSignatureP master_sig; + + qs = db_plugin->get_wire_fee (db_plugin->cls, + au_active.wa->method, + au_active.execution_time, + &start_date, + &end_date, + &au_active.fees, + &master_sig); + if (0 >= qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not get wire fees for %s at %s. Aborting run.\n", + au_active.wa->method, + GNUNET_TIME_timestamp2s (au_active.execution_time)); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + } + } + /* Now try to find other deposits to aggregate */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found ready deposit for %s, aggregating by target %llu\n", + "Found ready deposit for %s, aggregating by target %s\n", TALER_B2S (&au_active.merchant_pub), - (unsigned long long) au_active.wire_target); - qs = db_plugin->iterate_matching_deposits (db_plugin->cls, - &au_active.h_payto, - &au_active.merchant_pub, - &aggregate_cb, - &au_active, - TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT); + au_active.payto_uri); + TALER_payto_hash (au_active.payto_uri, + &au_active.h_payto); + + qs = db_plugin->select_aggregation_transient (db_plugin->cls, + &au_active.h_payto, + au_active.wa->section_name, + &au_active.wtid, + &trans); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to lookup transient aggregates!\n"); + cleanup_au (&au_active); + db_plugin->rollback (db_plugin->cls); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* serializiability issue, try again */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue, trying again later!\n"); + db_plugin->rollback (db_plugin->cls); + cleanup_au (&au_active); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + s); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, + &au_active.wtid, + sizeof (au_active.wtid)); + have_transient = false; + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + have_transient = true; + break; + } + qs = db_plugin->aggregate (db_plugin->cls, + &au_active.h_payto, + &au_active.merchant_pub, + &au_active.wtid, + &au_active.total_amount); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to execute deposit iteration!\n"); + "Failed to execute aggregation!\n"); cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; @@ -844,13 +556,17 @@ run_aggregation (void *cls) return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found %d other deposits to combine into wire transfer with fee %s.\n", - qs, - TALER_amount2s (&au_active.fees.wire)); + "Aggregation total is %s.\n", + TALER_amount2s (&au_active.total_amount)); /* Subtract wire transfer fee and round to the unit supported by the wire transfer method; Check if after rounding down, we still have an amount to transfer, and if not mark as 'tiny'. */ + if (have_transient) + GNUNET_assert (0 <= + TALER_amount_add (&au_active.total_amount, + &au_active.total_amount, + &trans)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Rounding aggregate of %s\n", TALER_amount2s (&au_active.total_amount)); @@ -867,45 +583,17 @@ run_aggregation (void *cls) "Aggregate value too low for transfer (%d/%s)\n", qs, TALER_amount2s (&au_active.final_amount)); - /* Rollback ongoing transaction, as we will not use the respective - WTID and thus need to remove the tracking data */ - db_plugin->rollback (db_plugin->cls); - - /* There were results, just the value was too low. Start another - transaction to mark all* of the selected deposits as minor! */ - if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - "aggregator mark tiny transactions")) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = EXIT_FAILURE; - cleanup_au (&au_active); - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; - } - /* Mark transactions by row_id as minor */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Marking %s (%llu) as tiny\n", - TALER_B2S (&au_active.coin_pub), - (unsigned long long) au_active.row_id); - qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - &au_active.coin_pub, - au_active.row_id); - if (0 < qs) - { - for (unsigned int i = 0; i<au_active.rows_offset; i++) - { - qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - &au_active.additional_rows[i]. - coin_pub, - au_active.additional_rows[i].row); - if (0 >= qs) - break; - } - } - GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs); + if (have_transient) + qs = db_plugin->update_aggregation_transient (db_plugin->cls, + &au_active.h_payto, + &au_active.wtid, + &au_active.total_amount); + else + qs = db_plugin->create_aggregation_transient (db_plugin->cls, + &au_active.h_payto, + au_active.wa->section_name, + &au_active.wtid, + &au_active.total_amount); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -962,8 +650,7 @@ run_aggregation (void *cls) buf_size); GNUNET_free (buf); } - /* Commit the WTID data to 'wire_out' to finally satisfy aggregation - table constraints */ + /* Commit the WTID data to 'wire_out' */ if (qs >= 0) qs = db_plugin->store_wire_transfer_out (db_plugin->cls, au_active.execution_time, @@ -971,6 +658,12 @@ run_aggregation (void *cls) &au_active.h_payto, au_active.wa->section_name, &au_active.final_amount); + + if ( (qs >= 0) && + have_transient) + qs = db_plugin->delete_aggregation_transient (db_plugin->cls, + &au_active.h_payto, + &au_active.wtid); cleanup_au (&au_active); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 1709f17e4..36a5e48b2 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -1160,29 +1160,17 @@ prepare_statements (struct PostgresClosure *pg) GNUNET_PQ_make_prepare ( "deposits_get_ready", "SELECT" - " dep.deposit_serial_id" - ",amount_with_fee_val" - ",amount_with_fee_frac" - ",denom.fee_deposit_val" - ",denom.fee_deposit_frac" - ",h_contract_terms" - ",payto_uri" - ",wire_target_serial_id" + " payto_uri" ",merchant_pub" - ",kc.coin_pub" " FROM deposits_by_ready dbr" " JOIN deposits dep" " ON (dbr.coin_pub = dep.coin_pub AND dbr.deposit_serial_id = dep.deposit_serial_id)" - " JOIN wire_targets " + " JOIN wire_targets wt" " USING (wire_target_h_payto)" - " JOIN known_coins kc" - " ON (kc.coin_pub = dep.coin_pub)" - " JOIN denominations denom" - " USING (denominations_serial)" " WHERE dbr.wire_deadline<=$1" " AND dbr.shard >= $2" " AND dbr.shard <= $3" - " AND (kyc_ok OR $4)" + " AND (wt.kyc_ok OR $4)" " ORDER BY " " dbr.wire_deadline ASC" " ,dbr.shard ASC" @@ -1218,22 +1206,23 @@ prepare_statements (struct PostgresClosure *pg) /* Used in #postgres_aggregate() */ GNUNET_PQ_make_prepare ( "aggregate", - "WITH rdy AS (" /* find deposits ready */ + "WITH rdy AS (" /* find deposits ready by merchant */ " SELECT" " coin_pub" " FROM deposits_for_matching" - " WHERE refund_deadline<$1" - " AND merchant_pub=$2" + " WHERE refund_deadline<$1" /* filter by shard, only actually executable deposits */ + " AND merchant_pub=$2" /* filter by target merchant */ " ORDER BY refund_deadline ASC" /* ordering is not critical */ " LIMIT " - TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) + TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) /* limits transaction size */ " )" - " ,dep AS (" /* restrict to our merchant and account */ + " ,dep AS (" /* restrict to our merchant and account and mark as done */ " UPDATE deposits" " SET done=TRUE" " WHERE coin_pub IN (SELECT coin_pub FROM rdy)" - " AND merchant_pub=$2" - " AND wire_target_h_payto=$3" + " AND merchant_pub=$2" /* theoretically, same coin could be spent at another merchant */ + " AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */ + " AND done=FALSE" /* theoretically, same coin could be spend at the same merchant a 2nd time */ " RETURNING" " deposit_serial_id" " ,coin_pub" @@ -1244,18 +1233,26 @@ prepare_statements (struct PostgresClosure *pg) " amount_with_fee_val AS refund_val" " ,amount_with_fee_frac AS refund_frac" " ,coin_pub" + " ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */ " FROM refunds" " WHERE coin_pub IN (SELECT coin_pub FROM dep)" " AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))" + " ,coins_with_fees AS (" /* find coins for which deposit fees apply */ + " SELECT" + " coin_pub" + " ,deposit_serial_id" /* ensures that if the same coin is deposited twice, it is in the list twice */ + " FROM dep" + " WHERE deposit_serial_id NOT IN (SELECT deposit_serial_id FROM ref))" " ,fees AS (" /* find deposit fees for non-refunded deposits */ " SELECT" " denom.fee_deposit_val AS fee_val" " ,denom.fee_deposit_frac AS fee_frac" - " FROM known_coins kc" + " ,cs.deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */ + " FROM coins_with_fees cs" + " JOIN known_coins kc" + " USING (coin_pub)" " JOIN denominations denom" - " USING (denominations_serial)" - " WHERE coin_pub IN (SELECT coin_pub FROM dep)" - " AND coin_pub NOT IN (SELECT coin_pub FROM ref))" + " USING (denominations_serial))" " ,dummy AS (" /* add deposits to aggregation_tracking */ " INSERT INTO aggregation_tracking" " (deposit_serial_id" @@ -1263,14 +1260,14 @@ prepare_statements (struct PostgresClosure *pg) " SELECT deposit_serial_id,$4" " FROM dep)" "SELECT" /* calculate totals (deposits, refunds and fees) */ - " CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value" - " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction" + " CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value" /* cast needed, otherwise we get NUMBER */ + " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */ " ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value" " ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction" " ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value" " ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction" " FROM dep " - " FULL OUTER JOIN ref ON (FALSE)" + " FULL OUTER JOIN ref ON (FALSE)" /* We just want all sums */ " FULL OUTER JOIN fees ON (FALSE);", 4), @@ -6270,8 +6267,8 @@ postgres_mark_deposit_done (void *cls, * @param end_shard_row maximum shard row to select (inclusive) * @param kyc_off true if we should not check the KYC status because * this exchange does not need/support KYC checks. - * @param deposit_cb function to call for ONE such deposit - * @param deposit_cb_cls closure for @a deposit_cb + * @param[out] merchant_pub set to the public key of a merchant with a ready deposit + * @param[out] payto_uri set to the account of the merchant, to be freed by caller * @return transaction status code */ static enum GNUNET_DB_QueryStatus @@ -6279,8 +6276,8 @@ postgres_get_ready_deposit (void *cls, uint64_t start_shard_row, uint64_t end_shard_row, bool kyc_off, - TALER_EXCHANGEDB_DepositIterator deposit_cb, - void *deposit_cb_cls) + struct TALER_MerchantPublicKeyP *merchant_pub, + char **payto_uri) { struct PostgresClosure *pg = cls; struct GNUNET_TIME_Absolute now = {0}; @@ -6291,34 +6288,13 @@ postgres_get_ready_deposit (void *cls, GNUNET_PQ_query_param_bool (kyc_off), GNUNET_PQ_query_param_end }; - struct TALER_Amount amount_with_fee; - struct TALER_Amount deposit_fee; - struct TALER_PrivateContractHashP h_contract_terms; - struct TALER_MerchantPublicKeyP merchant_pub; - struct TALER_CoinSpendPublicKeyP coin_pub; - uint64_t serial_id; - uint64_t wire_target; - char *payto_uri; struct GNUNET_PQ_ResultSpec rs[] = { - GNUNET_PQ_result_spec_uint64 ("deposit_serial_id", - &serial_id), - GNUNET_PQ_result_spec_uint64 ("wire_target_serial_id", - &wire_target), - TALER_PQ_RESULT_SPEC_AMOUNT ("amount_with_fee", - &amount_with_fee), - TALER_PQ_RESULT_SPEC_AMOUNT ("fee_deposit", - &deposit_fee), - GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms", - &h_contract_terms), GNUNET_PQ_result_spec_auto_from_type ("merchant_pub", - &merchant_pub), - GNUNET_PQ_result_spec_auto_from_type ("coin_pub", - &coin_pub), + merchant_pub), GNUNET_PQ_result_spec_string ("payto_uri", - &payto_uri), + payto_uri), GNUNET_PQ_result_spec_end }; - enum GNUNET_DB_QueryStatus qs; now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (), pg->aggregator_shift); @@ -6328,25 +6304,10 @@ postgres_get_ready_deposit (void *cls, "Finding ready deposits by deadline %s (%llu)\n", GNUNET_TIME_absolute2s (now), (unsigned long long) now.abs_value_us); - - qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, - "deposits_get_ready", - params, - rs); - if (qs <= 0) - return qs; - - qs = deposit_cb (deposit_cb_cls, - serial_id, - &merchant_pub, - &coin_pub, - &amount_with_fee, - &deposit_fee, - &h_contract_terms, - wire_target, - payto_uri); - GNUNET_PQ_cleanup_result (rs); - return qs; + return GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "deposits_get_ready", + params, + rs); } diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index 4ca6905e0..06810a7de 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -3097,8 +3097,8 @@ struct TALER_EXCHANGEDB_Plugin * @param end_shard_row maximum shard row to select (inclusive) * @param kyc_off true if we should not check the KYC status because * this exchange does not need/support KYC checks. - * @param deposit_cb function to call for ONE such deposit - * @param deposit_cb_cls closure for @a deposit_cb + * @param[out] merchant_pub set to the public key of a merchant with a ready deposit + * @param[out] payto_uri set to the account of the merchant, to be freed by caller * @return transaction status code */ enum GNUNET_DB_QueryStatus @@ -3106,8 +3106,8 @@ struct TALER_EXCHANGEDB_Plugin uint64_t start_shard_row, uint64_t end_shard_row, bool kyc_off, - TALER_EXCHANGEDB_DepositIterator deposit_cb, - void *deposit_cb_cls); + struct TALER_MerchantPublicKeyP *merchant_pub, + char **payto_uri); /** |