aboutsummaryrefslogtreecommitdiff
path: root/src/exchangedb/plugin_exchangedb_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchangedb/plugin_exchangedb_postgres.c')
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c360
1 files changed, 351 insertions, 9 deletions
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index c7bdae397..1709f17e4 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -1188,7 +1188,7 @@ prepare_statements (struct PostgresClosure *pg)
" ,dbr.shard ASC"
" LIMIT 1;",
4),
- /* Used in #postgres_iterate_matching_deposits() */
+ /* FIXME: deprecated; Used in #postgres_iterate_matching_deposits() */
GNUNET_PQ_make_prepare (
"deposits_iterate_matching",
"SELECT"
@@ -1207,14 +1207,115 @@ prepare_statements (struct PostgresClosure *pg)
" JOIN denominations denom"
" USING (denominations_serial)"
" WHERE dfm.refund_deadline<$3"
- " AND dfm.shard=$4"
+ " AND dfm.merchant_pub=$1"
" AND dep.merchant_pub=$1"
" AND dep.wire_target_h_payto=$2"
" LIMIT "
TALER_QUOTE (
TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";",
+ 3),
+
+ /* Used in #postgres_aggregate() */
+ GNUNET_PQ_make_prepare (
+ "aggregate",
+ "WITH rdy AS (" /* find deposits ready */
+ " SELECT"
+ " coin_pub"
+ " FROM deposits_for_matching"
+ " WHERE refund_deadline<$1"
+ " AND merchant_pub=$2"
+ " ORDER BY refund_deadline ASC" /* ordering is not critical */
+ " LIMIT "
+ TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
+ " )"
+ " ,dep AS (" /* restrict to our merchant and account */
+ " UPDATE deposits"
+ " SET done=TRUE"
+ " WHERE coin_pub IN (SELECT coin_pub FROM rdy)"
+ " AND merchant_pub=$2"
+ " AND wire_target_h_payto=$3"
+ " RETURNING"
+ " deposit_serial_id"
+ " ,coin_pub"
+ " ,amount_with_fee_val AS amount_val"
+ " ,amount_with_fee_frac AS amount_frac)"
+ " ,ref AS (" /* find applicable refunds */
+ " SELECT"
+ " amount_with_fee_val AS refund_val"
+ " ,amount_with_fee_frac AS refund_frac"
+ " ,coin_pub"
+ " FROM refunds"
+ " WHERE coin_pub IN (SELECT coin_pub FROM dep)"
+ " AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))"
+ " ,fees AS (" /* find deposit fees for non-refunded deposits */
+ " SELECT"
+ " denom.fee_deposit_val AS fee_val"
+ " ,denom.fee_deposit_frac AS fee_frac"
+ " FROM known_coins kc"
+ " JOIN denominations denom"
+ " USING (denominations_serial)"
+ " WHERE coin_pub IN (SELECT coin_pub FROM dep)"
+ " AND coin_pub NOT IN (SELECT coin_pub FROM ref))"
+ " ,dummy AS (" /* add deposits to aggregation_tracking */
+ " INSERT INTO aggregation_tracking"
+ " (deposit_serial_id"
+ " ,wtid_raw)"
+ " SELECT deposit_serial_id,$4"
+ " FROM dep)"
+ "SELECT" /* calculate totals (deposits, refunds and fees) */
+ " CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value"
+ " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction"
+ " ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value"
+ " ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction"
+ " ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value"
+ " ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction"
+ " FROM dep "
+ " FULL OUTER JOIN ref ON (FALSE)"
+ " FULL OUTER JOIN fees ON (FALSE);",
+ 4),
+
+
+ /* Used in #postgres_create_aggregation_transient() */
+ GNUNET_PQ_make_prepare (
+ "create_aggregation_transient",
+ "INSERT INTO aggregation_transient"
+ " (amount_val"
+ " ,amount_frac"
+ " ,wire_target_h_payto"
+ " ,exchange_account_section"
+ " ,wtid_raw)"
+ " VALUES ($1, $2, $3, $4, $5);",
+ 5),
+ /* Used in #postgres_select_aggregation_transient() */
+ GNUNET_PQ_make_prepare (
+ "select_aggregation_transient",
+ "SELECT"
+ " amount_val"
+ " ,amount_frac"
+ " ,wtid_raw"
+ " FROM aggregation_transient"
+ " WHERE wire_target_h_payto=$1"
+ " AND exchange_account_section=$2;",
+ 2),
+ /* Used in #postgres_update_aggregation_transient() */
+ GNUNET_PQ_make_prepare (
+ "update_aggregation_transient",
+ "UPDATE aggregation_transient"
+ " SET amount_val=$1"
+ " ,amount_frac=$2"
+ " WHERE wire_target_h_payto=$3"
+ " AND wtid_raw=$4",
4),
- /* Used in #postgres_mark_deposit_tiny() */
+ /* Used in #postgres_delete_aggregation_transient() */
+ GNUNET_PQ_make_prepare (
+ "delete_aggregation_transient",
+ "DELETE FROM aggregation_transient"
+ " WHERE wire_target_h_payto=$1"
+ " AND wtid_raw=$2",
+ 2),
+
+
+ /* FIXME-deprecated: Used in #postgres_mark_deposit_tiny() */
GNUNET_PQ_make_prepare (
"mark_deposit_tiny",
"UPDATE deposits"
@@ -1222,7 +1323,7 @@ prepare_statements (struct PostgresClosure *pg)
" WHERE coin_pub=$1"
" AND deposit_serial_id=$2",
2),
- /* Used in #postgres_mark_deposit_done() */
+ /* FIXME-deprecated: Used in #postgres_mark_deposit_done() */
GNUNET_PQ_make_prepare (
"mark_deposit_done",
"UPDATE deposits"
@@ -1230,6 +1331,7 @@ prepare_statements (struct PostgresClosure *pg)
" WHERE coin_pub=$1"
" AND deposit_serial_id=$2;",
2),
+
/* Used in #postgres_get_coin_transactions() to obtain information
about how a coin has been spend with /deposit requests. */
GNUNET_PQ_make_prepare (
@@ -2835,8 +2937,8 @@ prepare_statements (struct PostgresClosure *pg)
GNUNET_PQ_make_prepare (
"insert_into_table_refunds",
"INSERT INTO refunds"
- "(coin_pub"
- ",refund_serial_id"
+ "(refund_serial_id"
+ ",coin_pub"
",merchant_sig"
",rtransaction_id"
",amount_with_fee_val"
@@ -5869,6 +5971,240 @@ postgres_have_deposit2 (
/**
+ * Aggregate all matching deposits for @a h_payto and
+ * @a merchant_pub, returning the total amounts.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param merchant_pub public key of the merchant
+ * @param wtid wire transfer ID to set for the aggregate
+ * @param[out] total set to the sum of the total deposits minus applicable deposit fees and refunds
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_aggregate (
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ struct TALER_Amount *total)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_TIME_Absolute now = {0};
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_auto_from_type (merchant_pub),
+ GNUNET_PQ_query_param_auto_from_type (h_payto),
+ GNUNET_PQ_query_param_auto_from_type (wtid),
+ GNUNET_PQ_query_param_end
+ };
+ uint64_t sum_deposit_value;
+ uint64_t sum_deposit_frac;
+ uint64_t sum_refund_value;
+ uint64_t sum_refund_frac;
+ uint64_t sum_fee_value;
+ uint64_t sum_fee_frac;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint64 ("sum_deposit_value",
+ &sum_deposit_value),
+ GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction",
+ &sum_deposit_frac),
+ GNUNET_PQ_result_spec_uint64 ("sum_refund_value",
+ &sum_refund_value),
+ GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction",
+ &sum_refund_frac),
+ GNUNET_PQ_result_spec_uint64 ("sum_fee_value",
+ &sum_fee_value),
+ GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction",
+ &sum_fee_frac),
+ GNUNET_PQ_result_spec_end
+ };
+ enum GNUNET_DB_QueryStatus qs;
+ struct TALER_Amount sum_deposit;
+ struct TALER_Amount sum_refund;
+ struct TALER_Amount sum_fee;
+ struct TALER_Amount delta;
+
+ now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
+ pg->aggregator_shift);
+ qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "aggregate",
+ params,
+ rs);
+ if (qs < 0)
+ {
+ GNUNET_assert (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ {
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ total));
+ return qs;
+ }
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ &sum_deposit));
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ &sum_refund));
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ &sum_fee));
+ sum_deposit.value = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE
+ + sum_deposit_value;
+ sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE;
+ sum_refund.value = sum_refund_frac / TALER_AMOUNT_FRAC_BASE
+ + sum_refund_value;
+ sum_refund.fraction = sum_refund_frac % TALER_AMOUNT_FRAC_BASE;
+ sum_fee.value = sum_fee_frac / TALER_AMOUNT_FRAC_BASE
+ + sum_fee_value;
+ sum_fee.fraction = sum_fee_frac % TALER_AMOUNT_FRAC_BASE; \
+ GNUNET_assert (0 <=
+ TALER_amount_subtract (&delta,
+ &sum_deposit,
+ &sum_refund));
+ GNUNET_assert (0 <=
+ TALER_amount_subtract (total,
+ &delta,
+ &sum_fee));
+ return qs;
+}
+
+
+/**
+ * Create a new entry in the transient aggregation table.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param exchange_account_section exchange account to use
+ * @param wtid the raw wire transfer identifier to be used
+ * @param total amount to be wired in the future
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_create_aggregation_transient (
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const char *exchange_account_section,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ const struct TALER_Amount *total)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_amount (total),
+ GNUNET_PQ_query_param_auto_from_type (h_payto),
+ GNUNET_PQ_query_param_string (exchange_account_section),
+ GNUNET_PQ_query_param_auto_from_type (wtid),
+ GNUNET_PQ_query_param_end
+ };
+
+ return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "create_aggregation_transient",
+ params);
+}
+
+
+/**
+ * Find existing entry in the transient aggregation table.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param exchange_account_section exchange account to use
+ * @param[out] wtid set to the raw wire transfer identifier to be used
+ * @param[out] total existing amount to be wired in the future
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_select_aggregation_transient (
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const char *exchange_account_section,
+ struct TALER_WireTransferIdentifierRawP *wtid,
+ struct TALER_Amount *total)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (h_payto),
+ GNUNET_PQ_query_param_string (exchange_account_section),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ TALER_PQ_RESULT_SPEC_AMOUNT ("amount",
+ total),
+ GNUNET_PQ_result_spec_auto_from_type ("wtid_raw",
+ wtid),
+ GNUNET_PQ_result_spec_end
+ };
+
+ return GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "select_aggregation_transient",
+ params,
+ rs);
+}
+
+
+/**
+ * Update existing entry in the transient aggregation table.
+ * @a h_payto is only needed for query performance.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param wtid the raw wire transfer identifier to update
+ * @param total new total amount to be wired in the future
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_update_aggregation_transient (
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ const struct TALER_Amount *total)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_amount (total),
+ GNUNET_PQ_query_param_auto_from_type (h_payto),
+ GNUNET_PQ_query_param_auto_from_type (wtid),
+ GNUNET_PQ_query_param_end
+ };
+
+ return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "update_aggregation_transient",
+ params);
+}
+
+
+/**
+ * Delete existing entry in the transient aggregation table.
+ * @a h_payto is only needed for query performance.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param wtid the raw wire transfer identifier to update
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_delete_aggregation_transient (
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const struct TALER_WireTransferIdentifierRawP *wtid)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (h_payto),
+ GNUNET_PQ_query_param_auto_from_type (wtid),
+ GNUNET_PQ_query_param_end
+ };
+
+ return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "delete_aggregation_transient",
+ params);
+}
+
+
+/**
* Mark a deposit as tiny, thereby declaring that it cannot be
* executed by itself and should no longer be returned by
* @e iterate_ready_deposits()
@@ -6147,12 +6483,10 @@ postgres_iterate_matching_deposits (
{
struct PostgresClosure *pg = cls;
struct GNUNET_TIME_Absolute now = {0};
- uint64_t shard = compute_shard (merchant_pub);
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (merchant_pub),
GNUNET_PQ_query_param_auto_from_type (h_payto),
GNUNET_PQ_query_param_absolute_time (&now),
- GNUNET_PQ_query_param_uint64 (&shard),
GNUNET_PQ_query_param_end
};
struct MatchingDepositContext mdc = {
@@ -9299,7 +9633,6 @@ refunds_serial_helper_cb (void *cls,
struct RefundsSerialContext *rsc = cls;
struct PostgresClosure *pg = rsc->pg;
- fprintf (stderr, "Got %u results\n", num_results);
for (unsigned int i = 0; i<num_results; i++)
{
struct TALER_EXCHANGEDB_Refund refund;
@@ -13081,6 +13414,15 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
plugin->get_known_coin = &postgres_get_known_coin;
plugin->get_coin_denomination = &postgres_get_coin_denomination;
plugin->have_deposit2 = &postgres_have_deposit2;
+ plugin->aggregate = &postgres_aggregate;
+ plugin->create_aggregation_transient
+ = &postgres_create_aggregation_transient;
+ plugin->select_aggregation_transient
+ = &postgres_select_aggregation_transient;
+ plugin->update_aggregation_transient
+ = &postgres_update_aggregation_transient;
+ plugin->delete_aggregation_transient
+ = &postgres_delete_aggregation_transient;
plugin->mark_deposit_tiny = &postgres_mark_deposit_tiny;
plugin->mark_deposit_done = &postgres_mark_deposit_done;
plugin->get_ready_deposit = &postgres_get_ready_deposit;