aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <grothoff@gnunet.org>2022-03-27 13:48:25 +0200
committerChristian Grothoff <grothoff@gnunet.org>2022-03-27 13:48:25 +0200
commitb9a9af3a59f3abdb09afb9d0f9e4c0d83df789b7 (patch)
tree5fda2a92b09ef8cb476a289e44e73d48ffef00e2
parentd0a69da8954fd72f361795c2e007bad3fe5accd1 (diff)
new aggregator mega transaction logic
-rw-r--r--src/exchange/taler-exchange-aggregator.c545
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c111
-rw-r--r--src/include/taler_exchangedb_plugin.h8
3 files changed, 159 insertions, 505 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index c34d47f9c..04cf426de 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -28,18 +28,6 @@
#include "taler_json_lib.h"
#include "taler_bank_service.h"
-struct AdditionalDeposit
-{
- /**
- * Public key of the coin.
- */
- struct TALER_CoinSpendPublicKeyP coin_pub;
-
- /**
- * Row of the deposit.
- */
- uint64_t row;
-};
/**
* Information about one aggregation process to be executed. There is
@@ -55,11 +43,6 @@ struct AggregationUnit
struct TALER_MerchantPublicKeyP merchant_pub;
/**
- * Public key of the coin.
- */
- struct TALER_CoinSpendPublicKeyP coin_pub;
-
- /**
* Total amount to be transferred, before subtraction of @e fees.wire and rounding down.
*/
struct TALER_Amount total_amount;
@@ -80,11 +63,6 @@ struct AggregationUnit
struct TALER_WireTransferIdentifierRawP wtid;
/**
- * Row ID of the transaction that started it all.
- */
- uint64_t row_id;
-
- /**
* The current time (which triggered the aggregation and
* defines the wire fee).
*/
@@ -101,32 +79,11 @@ struct AggregationUnit
struct TALER_PaytoHashP h_payto;
/**
- * Serial number of the wire target.
- */
- uint64_t wire_target;
-
- /**
* Exchange wire account to be used for the preparation and
* eventual execution of the aggregate wire transfer.
*/
const struct TALER_EXCHANGEDB_AccountInfo *wa;
- /**
- * Array of row_ids from the aggregation.
- */
- struct AdditionalDeposit
- additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
-
- /**
- * Offset specifying how many @e additional_rows are in use.
- */
- unsigned int rows_offset;
-
- /**
- * Set to true if we encountered a refund during #refund_by_coin_cb.
- * Used to wave the deposit fee.
- */
- bool have_refund;
};
@@ -341,331 +298,6 @@ parse_wirewatch_config (void)
/**
- * Callback invoked with information about refunds applicable
- * to a particular coin. Subtract refunded amount(s) from
- * the aggregation unit's total amount.
- *
- * @param cls closure with a `struct AggregationUnit *`
- * @param amount_with_fee what was the refunded amount with the fee
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
- */
-static enum GNUNET_GenericReturnValue
-refund_by_coin_cb (void *cls,
- const struct TALER_Amount *amount_with_fee)
-{
- struct AggregationUnit *aux = cls;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Aggregator subtracts applicable refund of amount %s\n",
- TALER_amount2s (amount_with_fee));
- aux->have_refund = true;
- if (0 >
- TALER_amount_subtract (&aux->total_amount,
- &aux->total_amount,
- amount_with_fee))
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- return GNUNET_OK;
-}
-
-
-/**
- * Function called with details about deposits that have been made,
- * with the goal of executing the corresponding wire transaction.
- *
- * @param cls a `struct AggregationUnit`
- * @param row_id identifies database entry
- * @param merchant_pub public key of the merchant
- * @param coin_pub public key of the coin
- * @param amount_with_fee amount that was deposited including fee
- * @param deposit_fee amount the exchange gets to keep as transaction fees
- * @param h_contract_terms hash of the proposal data known to merchant and customer
- * @param wire_target target account for the wire transfer
- * @param payto_uri URI of the target account
- * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
- */
-static enum GNUNET_DB_QueryStatus
-deposit_cb (void *cls,
- uint64_t row_id,
- const struct TALER_MerchantPublicKeyP *merchant_pub,
- const struct TALER_CoinSpendPublicKeyP *coin_pub,
- const struct TALER_Amount *amount_with_fee,
- const struct TALER_Amount *deposit_fee,
- const struct TALER_PrivateContractHashP *h_contract_terms,
- uint64_t wire_target,
- const char *payto_uri)
-{
- struct AggregationUnit *au = cls;
- enum GNUNET_DB_QueryStatus qs;
-
- au->merchant_pub = *merchant_pub;
- au->coin_pub = *coin_pub;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Aggregator processing payment %s with amount %s\n",
- TALER_B2S (coin_pub),
- TALER_amount2s (amount_with_fee));
- au->row_id = row_id;
- au->total_amount = *amount_with_fee;
- au->have_refund = false;
- qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
- coin_pub,
- &au->merchant_pub,
- h_contract_terms,
- &refund_by_coin_cb,
- au);
- if (0 > qs)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
- }
- if (! au->have_refund)
- {
- struct TALER_Amount ntotal;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Non-refunded transaction, subtracting deposit fee %s\n",
- TALER_amount2s (deposit_fee));
- if (0 >
- TALER_amount_subtract (&ntotal,
- amount_with_fee,
- deposit_fee))
- {
- /* This should never happen, issue a warning, but continue processing
- with an amount of zero, least we hang here for good. */
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n",
- (unsigned long long) row_id,
- TALER_amount2s (amount_with_fee));
- GNUNET_assert (GNUNET_OK ==
- TALER_amount_set_zero (au->total_amount.currency,
- &au->total_amount));
- }
- else
- {
- au->total_amount = ntotal;
- }
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Amount after fee is %s\n",
- TALER_amount2s (&au->total_amount));
-
- GNUNET_assert (NULL == au->payto_uri);
- au->payto_uri = GNUNET_strdup (payto_uri);
- TALER_payto_hash (payto_uri,
- &au->h_payto);
- au->wire_target = wire_target;
- GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
- &au->wtid,
- sizeof (au->wtid));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n",
- TALER_B2S (&au->wtid),
- TALER_amount2s (amount_with_fee),
- (unsigned long long) row_id);
- au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (payto_uri);
- if (NULL == au->wa)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "No exchange account configured for `%s', please fix your setup to continue!\n",
- payto_uri);
- return GNUNET_DB_STATUS_HARD_ERROR;
- }
-
- /* make sure we have current fees */
- au->execution_time = GNUNET_TIME_timestamp_get ();
- {
- struct GNUNET_TIME_Timestamp start_date;
- struct GNUNET_TIME_Timestamp end_date;
- struct TALER_MasterSignatureP master_sig;
- enum GNUNET_DB_QueryStatus qs;
-
- qs = db_plugin->get_wire_fee (db_plugin->cls,
- au->wa->method,
- au->execution_time,
- &start_date,
- &end_date,
- &au->fees,
- &master_sig);
- if (0 >= qs)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Could not get wire fees for %s at %s. Aborting run.\n",
- au->wa->method,
- GNUNET_TIME_timestamp2s (au->execution_time));
- return GNUNET_DB_STATUS_HARD_ERROR;
- }
- }
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Aggregator starts aggregation for deposit %llu to %s with wire fee %s\n",
- (unsigned long long) row_id,
- TALER_B2S (&au->wtid),
- TALER_amount2s (&au->fees.wire));
- qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
- &au->wtid,
- row_id);
- if (qs <= 0)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Aggregator marks deposit %llu as done\n",
- (unsigned long long) row_id);
- qs = db_plugin->mark_deposit_done (db_plugin->cls,
- coin_pub,
- row_id);
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
- }
- return qs;
-}
-
-
-/**
- * Function called with details about another deposit we
- * can aggregate into an existing aggregation unit.
- *
- * @param cls a `struct AggregationUnit`
- * @param row_id identifies database entry
- * @param coin_pub public key of the coin
- * @param amount_with_fee amount that was deposited including fee
- * @param deposit_fee amount the exchange gets to keep as transaction fees
- * @param h_contract_terms hash of the proposal data known to merchant and customer
- * @return transaction status code
- */
-static enum GNUNET_DB_QueryStatus
-aggregate_cb (void *cls,
- uint64_t row_id,
- const struct TALER_CoinSpendPublicKeyP *coin_pub,
- const struct TALER_Amount *amount_with_fee,
- const struct TALER_Amount *deposit_fee,
- const struct TALER_PrivateContractHashP *h_contract_terms)
-{
- struct AggregationUnit *au = cls;
- struct TALER_Amount old;
- enum GNUNET_DB_QueryStatus qs;
-
- if (row_id == au->row_id)
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
- if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
- {
- /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */
- GNUNET_break (0);
- /* Skip this one, but keep going with the overall transaction */
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
- }
-
- /* add to total */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Adding transaction amount %s from row %llu to aggregation\n",
- TALER_amount2s (amount_with_fee),
- (unsigned long long) row_id);
- /* save the existing total aggregate in 'old', for later */
- old = au->total_amount;
- /* we begin with the total contribution of the current coin */
- au->total_amount = *amount_with_fee;
- /* compute contribution of this coin (after fees) */
- au->have_refund = false;
- qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
- coin_pub,
- &au->merchant_pub,
- h_contract_terms,
- &refund_by_coin_cb,
- au);
- if (0 > qs)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
- }
- if (! au->have_refund)
- {
- struct TALER_Amount tmp;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Subtracting deposit fee %s for non-refunded coin\n",
- TALER_amount2s (deposit_fee));
- if (0 >
- TALER_amount_subtract (&tmp,
- &au->total_amount,
- deposit_fee))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n",
- (unsigned long long) row_id,
- TALER_amount2s (&au->total_amount));
- GNUNET_assert (GNUNET_OK ==
- TALER_amount_set_zero (old.currency,
- &au->total_amount));
- }
- else
- {
- au->total_amount = tmp;
- }
- }
-
- /* now add the au->total_amount with the (remaining) contribution of
- the current coin to the 'old' value with the current aggregate value */
- {
- struct TALER_Amount tmp;
-
- if (0 >
- TALER_amount_add (&tmp,
- &au->total_amount,
- &old))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Overflow or currency incompatibility during aggregation at %llu\n",
- (unsigned long long) row_id);
- /* Skip this one, but keep going! */
- au->total_amount = old;
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
- }
- au->total_amount = tmp;
- }
-
- /* "append" to our list of rows */
- au->additional_rows[au->rows_offset].coin_pub = *coin_pub;
- au->additional_rows[au->rows_offset].row = row_id;
- au->rows_offset++;
- /* insert into aggregation tracking table */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Adding %llu to aggregate %s\n",
- (unsigned long long) row_id,
- TALER_B2S (&au->wtid));
- qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
- &au->wtid,
- row_id);
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to add %llu to aggregate %s: %d\n",
- (unsigned long long) row_id,
- TALER_B2S (&au->wtid),
- qs);
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
- }
- qs = db_plugin->mark_deposit_done (db_plugin->cls,
- coin_pub,
- row_id);
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Aggregator marked deposit %llu as DONE\n",
- (unsigned long long) row_id);
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-}
-
-
-/**
* Perform a database commit. If it fails, print a warning.
*
* @return status of commit
@@ -727,10 +359,17 @@ run_aggregation (void *cls)
struct Shard *s = cls;
struct AggregationUnit au_active;
enum GNUNET_DB_QueryStatus qs;
+ struct TALER_Amount trans;
+ bool have_transient;
task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Checking for ready deposits to aggregate\n");
+ /* make sure we have current fees */
+ memset (&au_active,
+ 0,
+ sizeof (au_active));
+ au_active.execution_time = GNUNET_TIME_timestamp_get ();
if (GNUNET_OK !=
db_plugin->start_deferred_wire_out (db_plugin->cls))
{
@@ -741,16 +380,13 @@ run_aggregation (void *cls)
release_shard (s);
return;
}
- memset (&au_active,
- 0,
- sizeof (au_active));
qs = db_plugin->get_ready_deposit (
db_plugin->cls,
s->shard_start,
s->shard_end,
kyc_off ? true : false,
- &deposit_cb,
- &au_active);
+ &au_active.merchant_pub,
+ &au_active.payto_uri);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
@@ -808,22 +444,98 @@ run_aggregation (void *cls)
/* continued below */
break;
}
+ au_active.wa = TALER_EXCHANGEDB_find_account_by_payto_uri (
+ au_active.payto_uri);
+ if (NULL == au_active.wa)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "No exchange account configured for `%s', please fix your setup to continue!\n",
+ au_active.payto_uri);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
+ return;
+ }
+
+ {
+ struct GNUNET_TIME_Timestamp start_date;
+ struct GNUNET_TIME_Timestamp end_date;
+ struct TALER_MasterSignatureP master_sig;
+
+ qs = db_plugin->get_wire_fee (db_plugin->cls,
+ au_active.wa->method,
+ au_active.execution_time,
+ &start_date,
+ &end_date,
+ &au_active.fees,
+ &master_sig);
+ if (0 >= qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Could not get wire fees for %s at %s. Aborting run.\n",
+ au_active.wa->method,
+ GNUNET_TIME_timestamp2s (au_active.execution_time));
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
+ return;
+ }
+ }
+
/* Now try to find other deposits to aggregate */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found ready deposit for %s, aggregating by target %llu\n",
+ "Found ready deposit for %s, aggregating by target %s\n",
TALER_B2S (&au_active.merchant_pub),
- (unsigned long long) au_active.wire_target);
- qs = db_plugin->iterate_matching_deposits (db_plugin->cls,
- &au_active.h_payto,
- &au_active.merchant_pub,
- &aggregate_cb,
- &au_active,
- TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT);
+ au_active.payto_uri);
+ TALER_payto_hash (au_active.payto_uri,
+ &au_active.h_payto);
+
+ qs = db_plugin->select_aggregation_transient (db_plugin->cls,
+ &au_active.h_payto,
+ au_active.wa->section_name,
+ &au_active.wtid,
+ &trans);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to lookup transient aggregates!\n");
+ cleanup_au (&au_active);
+ db_plugin->rollback (db_plugin->cls);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* serializiability issue, try again */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Serialization issue, trying again later!\n");
+ db_plugin->rollback (db_plugin->cls);
+ cleanup_au (&au_active);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ s);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+ &au_active.wtid,
+ sizeof (au_active.wtid));
+ have_transient = false;
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ have_transient = true;
+ break;
+ }
+ qs = db_plugin->aggregate (db_plugin->cls,
+ &au_active.h_payto,
+ &au_active.merchant_pub,
+ &au_active.wtid,
+ &au_active.total_amount);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to execute deposit iteration!\n");
+ "Failed to execute aggregation!\n");
cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
@@ -844,13 +556,17 @@ run_aggregation (void *cls)
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found %d other deposits to combine into wire transfer with fee %s.\n",
- qs,
- TALER_amount2s (&au_active.fees.wire));
+ "Aggregation total is %s.\n",
+ TALER_amount2s (&au_active.total_amount));
/* Subtract wire transfer fee and round to the unit supported by the
wire transfer method; Check if after rounding down, we still have
an amount to transfer, and if not mark as 'tiny'. */
+ if (have_transient)
+ GNUNET_assert (0 <=
+ TALER_amount_add (&au_active.total_amount,
+ &au_active.total_amount,
+ &trans));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Rounding aggregate of %s\n",
TALER_amount2s (&au_active.total_amount));
@@ -867,45 +583,17 @@ run_aggregation (void *cls)
"Aggregate value too low for transfer (%d/%s)\n",
qs,
TALER_amount2s (&au_active.final_amount));
- /* Rollback ongoing transaction, as we will not use the respective
- WTID and thus need to remove the tracking data */
- db_plugin->rollback (db_plugin->cls);
-
- /* There were results, just the value was too low. Start another
- transaction to mark all* of the selected deposits as minor! */
- if (GNUNET_OK !=
- db_plugin->start (db_plugin->cls,
- "aggregator mark tiny transactions"))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to start database transaction!\n");
- global_ret = EXIT_FAILURE;
- cleanup_au (&au_active);
- GNUNET_SCHEDULER_shutdown ();
- release_shard (s);
- return;
- }
- /* Mark transactions by row_id as minor */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Marking %s (%llu) as tiny\n",
- TALER_B2S (&au_active.coin_pub),
- (unsigned long long) au_active.row_id);
- qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
- &au_active.coin_pub,
- au_active.row_id);
- if (0 < qs)
- {
- for (unsigned int i = 0; i<au_active.rows_offset; i++)
- {
- qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
- &au_active.additional_rows[i].
- coin_pub,
- au_active.additional_rows[i].row);
- if (0 >= qs)
- break;
- }
- }
- GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs);
+ if (have_transient)
+ qs = db_plugin->update_aggregation_transient (db_plugin->cls,
+ &au_active.h_payto,
+ &au_active.wtid,
+ &au_active.total_amount);
+ else
+ qs = db_plugin->create_aggregation_transient (db_plugin->cls,
+ &au_active.h_payto,
+ au_active.wa->section_name,
+ &au_active.wtid,
+ &au_active.total_amount);
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -962,8 +650,7 @@ run_aggregation (void *cls)
buf_size);
GNUNET_free (buf);
}
- /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
- table constraints */
+ /* Commit the WTID data to 'wire_out' */
if (qs >= 0)
qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
au_active.execution_time,
@@ -971,6 +658,12 @@ run_aggregation (void *cls)
&au_active.h_payto,
au_active.wa->section_name,
&au_active.final_amount);
+
+ if ( (qs >= 0) &&
+ have_transient)
+ qs = db_plugin->delete_aggregation_transient (db_plugin->cls,
+ &au_active.h_payto,
+ &au_active.wtid);
cleanup_au (&au_active);
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 1709f17e4..36a5e48b2 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -1160,29 +1160,17 @@ prepare_statements (struct PostgresClosure *pg)
GNUNET_PQ_make_prepare (
"deposits_get_ready",
"SELECT"
- " dep.deposit_serial_id"
- ",amount_with_fee_val"
- ",amount_with_fee_frac"
- ",denom.fee_deposit_val"
- ",denom.fee_deposit_frac"
- ",h_contract_terms"
- ",payto_uri"
- ",wire_target_serial_id"
+ " payto_uri"
",merchant_pub"
- ",kc.coin_pub"
" FROM deposits_by_ready dbr"
" JOIN deposits dep"
" ON (dbr.coin_pub = dep.coin_pub AND dbr.deposit_serial_id = dep.deposit_serial_id)"
- " JOIN wire_targets "
+ " JOIN wire_targets wt"
" USING (wire_target_h_payto)"
- " JOIN known_coins kc"
- " ON (kc.coin_pub = dep.coin_pub)"
- " JOIN denominations denom"
- " USING (denominations_serial)"
" WHERE dbr.wire_deadline<=$1"
" AND dbr.shard >= $2"
" AND dbr.shard <= $3"
- " AND (kyc_ok OR $4)"
+ " AND (wt.kyc_ok OR $4)"
" ORDER BY "
" dbr.wire_deadline ASC"
" ,dbr.shard ASC"
@@ -1218,22 +1206,23 @@ prepare_statements (struct PostgresClosure *pg)
/* Used in #postgres_aggregate() */
GNUNET_PQ_make_prepare (
"aggregate",
- "WITH rdy AS (" /* find deposits ready */
+ "WITH rdy AS (" /* find deposits ready by merchant */
" SELECT"
" coin_pub"
" FROM deposits_for_matching"
- " WHERE refund_deadline<$1"
- " AND merchant_pub=$2"
+ " WHERE refund_deadline<$1" /* filter by shard, only actually executable deposits */
+ " AND merchant_pub=$2" /* filter by target merchant */
" ORDER BY refund_deadline ASC" /* ordering is not critical */
" LIMIT "
- TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
+ TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) /* limits transaction size */
" )"
- " ,dep AS (" /* restrict to our merchant and account */
+ " ,dep AS (" /* restrict to our merchant and account and mark as done */
" UPDATE deposits"
" SET done=TRUE"
" WHERE coin_pub IN (SELECT coin_pub FROM rdy)"
- " AND merchant_pub=$2"
- " AND wire_target_h_payto=$3"
+ " AND merchant_pub=$2" /* theoretically, same coin could be spent at another merchant */
+ " AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */
+ " AND done=FALSE" /* theoretically, same coin could be spend at the same merchant a 2nd time */
" RETURNING"
" deposit_serial_id"
" ,coin_pub"
@@ -1244,18 +1233,26 @@ prepare_statements (struct PostgresClosure *pg)
" amount_with_fee_val AS refund_val"
" ,amount_with_fee_frac AS refund_frac"
" ,coin_pub"
+ " ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
" FROM refunds"
" WHERE coin_pub IN (SELECT coin_pub FROM dep)"
" AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))"
+ " ,coins_with_fees AS (" /* find coins for which deposit fees apply */
+ " SELECT"
+ " coin_pub"
+ " ,deposit_serial_id" /* ensures that if the same coin is deposited twice, it is in the list twice */
+ " FROM dep"
+ " WHERE deposit_serial_id NOT IN (SELECT deposit_serial_id FROM ref))"
" ,fees AS (" /* find deposit fees for non-refunded deposits */
" SELECT"
" denom.fee_deposit_val AS fee_val"
" ,denom.fee_deposit_frac AS fee_frac"
- " FROM known_coins kc"
+ " ,cs.deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */
+ " FROM coins_with_fees cs"
+ " JOIN known_coins kc"
+ " USING (coin_pub)"
" JOIN denominations denom"
- " USING (denominations_serial)"
- " WHERE coin_pub IN (SELECT coin_pub FROM dep)"
- " AND coin_pub NOT IN (SELECT coin_pub FROM ref))"
+ " USING (denominations_serial))"
" ,dummy AS (" /* add deposits to aggregation_tracking */
" INSERT INTO aggregation_tracking"
" (deposit_serial_id"
@@ -1263,14 +1260,14 @@ prepare_statements (struct PostgresClosure *pg)
" SELECT deposit_serial_id,$4"
" FROM dep)"
"SELECT" /* calculate totals (deposits, refunds and fees) */
- " CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value"
- " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction"
+ " CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value" /* cast needed, otherwise we get NUMBER */
+ " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */
" ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value"
" ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction"
" ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value"
" ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction"
" FROM dep "
- " FULL OUTER JOIN ref ON (FALSE)"
+ " FULL OUTER JOIN ref ON (FALSE)" /* We just want all sums */
" FULL OUTER JOIN fees ON (FALSE);",
4),
@@ -6270,8 +6267,8 @@ postgres_mark_deposit_done (void *cls,
* @param end_shard_row maximum shard row to select (inclusive)
* @param kyc_off true if we should not check the KYC status because
* this exchange does not need/support KYC checks.
- * @param deposit_cb function to call for ONE such deposit
- * @param deposit_cb_cls closure for @a deposit_cb
+ * @param[out] merchant_pub set to the public key of a merchant with a ready deposit
+ * @param[out] payto_uri set to the account of the merchant, to be freed by caller
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
@@ -6279,8 +6276,8 @@ postgres_get_ready_deposit (void *cls,
uint64_t start_shard_row,
uint64_t end_shard_row,
bool kyc_off,
- TALER_EXCHANGEDB_DepositIterator deposit_cb,
- void *deposit_cb_cls)
+ struct TALER_MerchantPublicKeyP *merchant_pub,
+ char **payto_uri)
{
struct PostgresClosure *pg = cls;
struct GNUNET_TIME_Absolute now = {0};
@@ -6291,34 +6288,13 @@ postgres_get_ready_deposit (void *cls,
GNUNET_PQ_query_param_bool (kyc_off),
GNUNET_PQ_query_param_end
};
- struct TALER_Amount amount_with_fee;
- struct TALER_Amount deposit_fee;
- struct TALER_PrivateContractHashP h_contract_terms;
- struct TALER_MerchantPublicKeyP merchant_pub;
- struct TALER_CoinSpendPublicKeyP coin_pub;
- uint64_t serial_id;
- uint64_t wire_target;
- char *payto_uri;
struct GNUNET_PQ_ResultSpec rs[] = {
- GNUNET_PQ_result_spec_uint64 ("deposit_serial_id",
- &serial_id),
- GNUNET_PQ_result_spec_uint64 ("wire_target_serial_id",
- &wire_target),
- TALER_PQ_RESULT_SPEC_AMOUNT ("amount_with_fee",
- &amount_with_fee),
- TALER_PQ_RESULT_SPEC_AMOUNT ("fee_deposit",
- &deposit_fee),
- GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms",
- &h_contract_terms),
GNUNET_PQ_result_spec_auto_from_type ("merchant_pub",
- &merchant_pub),
- GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
- &coin_pub),
+ merchant_pub),
GNUNET_PQ_result_spec_string ("payto_uri",
- &payto_uri),
+ payto_uri),
GNUNET_PQ_result_spec_end
};
- enum GNUNET_DB_QueryStatus qs;
now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
pg->aggregator_shift);
@@ -6328,25 +6304,10 @@ postgres_get_ready_deposit (void *cls,
"Finding ready deposits by deadline %s (%llu)\n",
GNUNET_TIME_absolute2s (now),
(unsigned long long) now.abs_value_us);
-
- qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
- "deposits_get_ready",
- params,
- rs);
- if (qs <= 0)
- return qs;
-
- qs = deposit_cb (deposit_cb_cls,
- serial_id,
- &merchant_pub,
- &coin_pub,
- &amount_with_fee,
- &deposit_fee,
- &h_contract_terms,
- wire_target,
- payto_uri);
- GNUNET_PQ_cleanup_result (rs);
- return qs;
+ return GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "deposits_get_ready",
+ params,
+ rs);
}
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index 4ca6905e0..06810a7de 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -3097,8 +3097,8 @@ struct TALER_EXCHANGEDB_Plugin
* @param end_shard_row maximum shard row to select (inclusive)
* @param kyc_off true if we should not check the KYC status because
* this exchange does not need/support KYC checks.
- * @param deposit_cb function to call for ONE such deposit
- * @param deposit_cb_cls closure for @a deposit_cb
+ * @param[out] merchant_pub set to the public key of a merchant with a ready deposit
+ * @param[out] payto_uri set to the account of the merchant, to be freed by caller
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
@@ -3106,8 +3106,8 @@ struct TALER_EXCHANGEDB_Plugin
uint64_t start_shard_row,
uint64_t end_shard_row,
bool kyc_off,
- TALER_EXCHANGEDB_DepositIterator deposit_cb,
- void *deposit_cb_cls);
+ struct TALER_MerchantPublicKeyP *merchant_pub,
+ char **payto_uri);
/**