diff options
Diffstat (limited to 'src/exchangedb/plugin_exchangedb_postgres.c')
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 360 |
1 files changed, 351 insertions, 9 deletions
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index c7bdae397..1709f17e4 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -1188,7 +1188,7 @@ prepare_statements (struct PostgresClosure *pg) " ,dbr.shard ASC" " LIMIT 1;", 4), - /* Used in #postgres_iterate_matching_deposits() */ + /* FIXME: deprecated; Used in #postgres_iterate_matching_deposits() */ GNUNET_PQ_make_prepare ( "deposits_iterate_matching", "SELECT" @@ -1207,14 +1207,115 @@ prepare_statements (struct PostgresClosure *pg) " JOIN denominations denom" " USING (denominations_serial)" " WHERE dfm.refund_deadline<$3" - " AND dfm.shard=$4" + " AND dfm.merchant_pub=$1" " AND dep.merchant_pub=$1" " AND dep.wire_target_h_payto=$2" " LIMIT " TALER_QUOTE ( TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";", + 3), + + /* Used in #postgres_aggregate() */ + GNUNET_PQ_make_prepare ( + "aggregate", + "WITH rdy AS (" /* find deposits ready */ + " SELECT" + " coin_pub" + " FROM deposits_for_matching" + " WHERE refund_deadline<$1" + " AND merchant_pub=$2" + " ORDER BY refund_deadline ASC" /* ordering is not critical */ + " LIMIT " + TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) + " )" + " ,dep AS (" /* restrict to our merchant and account */ + " UPDATE deposits" + " SET done=TRUE" + " WHERE coin_pub IN (SELECT coin_pub FROM rdy)" + " AND merchant_pub=$2" + " AND wire_target_h_payto=$3" + " RETURNING" + " deposit_serial_id" + " ,coin_pub" + " ,amount_with_fee_val AS amount_val" + " ,amount_with_fee_frac AS amount_frac)" + " ,ref AS (" /* find applicable refunds */ + " SELECT" + " amount_with_fee_val AS refund_val" + " ,amount_with_fee_frac AS refund_frac" + " ,coin_pub" + " FROM refunds" + " WHERE coin_pub IN (SELECT coin_pub FROM dep)" + " AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))" + " ,fees AS (" /* find deposit fees for non-refunded deposits */ + " SELECT" + " denom.fee_deposit_val AS fee_val" + " ,denom.fee_deposit_frac AS fee_frac" + " FROM known_coins kc" + " JOIN denominations denom" + " USING (denominations_serial)" + " WHERE coin_pub IN (SELECT coin_pub FROM dep)" + " AND coin_pub NOT IN (SELECT coin_pub FROM ref))" + " ,dummy AS (" /* add deposits to aggregation_tracking */ + " INSERT INTO aggregation_tracking" + " (deposit_serial_id" + " ,wtid_raw)" + " SELECT deposit_serial_id,$4" + " FROM dep)" + "SELECT" /* calculate totals (deposits, refunds and fees) */ + " CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value" + " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction" + " ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value" + " ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction" + " ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value" + " ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction" + " FROM dep " + " FULL OUTER JOIN ref ON (FALSE)" + " FULL OUTER JOIN fees ON (FALSE);", + 4), + + + /* Used in #postgres_create_aggregation_transient() */ + GNUNET_PQ_make_prepare ( + "create_aggregation_transient", + "INSERT INTO aggregation_transient" + " (amount_val" + " ,amount_frac" + " ,wire_target_h_payto" + " ,exchange_account_section" + " ,wtid_raw)" + " VALUES ($1, $2, $3, $4, $5);", + 5), + /* Used in #postgres_select_aggregation_transient() */ + GNUNET_PQ_make_prepare ( + "select_aggregation_transient", + "SELECT" + " amount_val" + " ,amount_frac" + " ,wtid_raw" + " FROM aggregation_transient" + " WHERE wire_target_h_payto=$1" + " AND exchange_account_section=$2;", + 2), + /* Used in #postgres_update_aggregation_transient() */ + GNUNET_PQ_make_prepare ( + "update_aggregation_transient", + "UPDATE aggregation_transient" + " SET amount_val=$1" + " ,amount_frac=$2" + " WHERE wire_target_h_payto=$3" + " AND wtid_raw=$4", 4), - /* Used in #postgres_mark_deposit_tiny() */ + /* Used in #postgres_delete_aggregation_transient() */ + GNUNET_PQ_make_prepare ( + "delete_aggregation_transient", + "DELETE FROM aggregation_transient" + " WHERE wire_target_h_payto=$1" + " AND wtid_raw=$2", + 2), + + + /* FIXME-deprecated: Used in #postgres_mark_deposit_tiny() */ GNUNET_PQ_make_prepare ( "mark_deposit_tiny", "UPDATE deposits" @@ -1222,7 +1323,7 @@ prepare_statements (struct PostgresClosure *pg) " WHERE coin_pub=$1" " AND deposit_serial_id=$2", 2), - /* Used in #postgres_mark_deposit_done() */ + /* FIXME-deprecated: Used in #postgres_mark_deposit_done() */ GNUNET_PQ_make_prepare ( "mark_deposit_done", "UPDATE deposits" @@ -1230,6 +1331,7 @@ prepare_statements (struct PostgresClosure *pg) " WHERE coin_pub=$1" " AND deposit_serial_id=$2;", 2), + /* Used in #postgres_get_coin_transactions() to obtain information about how a coin has been spend with /deposit requests. */ GNUNET_PQ_make_prepare ( @@ -2835,8 +2937,8 @@ prepare_statements (struct PostgresClosure *pg) GNUNET_PQ_make_prepare ( "insert_into_table_refunds", "INSERT INTO refunds" - "(coin_pub" - ",refund_serial_id" + "(refund_serial_id" + ",coin_pub" ",merchant_sig" ",rtransaction_id" ",amount_with_fee_val" @@ -5869,6 +5971,240 @@ postgres_have_deposit2 ( /** + * Aggregate all matching deposits for @a h_payto and + * @a merchant_pub, returning the total amounts. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param merchant_pub public key of the merchant + * @param wtid wire transfer ID to set for the aggregate + * @param[out] total set to the sum of the total deposits minus applicable deposit fees and refunds + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_aggregate ( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_WireTransferIdentifierRawP *wtid, + struct TALER_Amount *total) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_TIME_Absolute now = {0}; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_absolute_time (&now), + GNUNET_PQ_query_param_auto_from_type (merchant_pub), + GNUNET_PQ_query_param_auto_from_type (h_payto), + GNUNET_PQ_query_param_auto_from_type (wtid), + GNUNET_PQ_query_param_end + }; + uint64_t sum_deposit_value; + uint64_t sum_deposit_frac; + uint64_t sum_refund_value; + uint64_t sum_refund_frac; + uint64_t sum_fee_value; + uint64_t sum_fee_frac; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("sum_deposit_value", + &sum_deposit_value), + GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction", + &sum_deposit_frac), + GNUNET_PQ_result_spec_uint64 ("sum_refund_value", + &sum_refund_value), + GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction", + &sum_refund_frac), + GNUNET_PQ_result_spec_uint64 ("sum_fee_value", + &sum_fee_value), + GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction", + &sum_fee_frac), + GNUNET_PQ_result_spec_end + }; + enum GNUNET_DB_QueryStatus qs; + struct TALER_Amount sum_deposit; + struct TALER_Amount sum_refund; + struct TALER_Amount sum_fee; + struct TALER_Amount delta; + + now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (), + pg->aggregator_shift); + qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "aggregate", + params, + rs); + if (qs < 0) + { + GNUNET_assert (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; + } + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (pg->currency, + total)); + return qs; + } + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (pg->currency, + &sum_deposit)); + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (pg->currency, + &sum_refund)); + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (pg->currency, + &sum_fee)); + sum_deposit.value = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE + + sum_deposit_value; + sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE; + sum_refund.value = sum_refund_frac / TALER_AMOUNT_FRAC_BASE + + sum_refund_value; + sum_refund.fraction = sum_refund_frac % TALER_AMOUNT_FRAC_BASE; + sum_fee.value = sum_fee_frac / TALER_AMOUNT_FRAC_BASE + + sum_fee_value; + sum_fee.fraction = sum_fee_frac % TALER_AMOUNT_FRAC_BASE; \ + GNUNET_assert (0 <= + TALER_amount_subtract (&delta, + &sum_deposit, + &sum_refund)); + GNUNET_assert (0 <= + TALER_amount_subtract (total, + &delta, + &sum_fee)); + return qs; +} + + +/** + * Create a new entry in the transient aggregation table. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param exchange_account_section exchange account to use + * @param wtid the raw wire transfer identifier to be used + * @param total amount to be wired in the future + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_create_aggregation_transient ( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const char *exchange_account_section, + const struct TALER_WireTransferIdentifierRawP *wtid, + const struct TALER_Amount *total) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + TALER_PQ_query_param_amount (total), + GNUNET_PQ_query_param_auto_from_type (h_payto), + GNUNET_PQ_query_param_string (exchange_account_section), + GNUNET_PQ_query_param_auto_from_type (wtid), + GNUNET_PQ_query_param_end + }; + + return GNUNET_PQ_eval_prepared_non_select (pg->conn, + "create_aggregation_transient", + params); +} + + +/** + * Find existing entry in the transient aggregation table. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param exchange_account_section exchange account to use + * @param[out] wtid set to the raw wire transfer identifier to be used + * @param[out] total existing amount to be wired in the future + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_select_aggregation_transient ( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const char *exchange_account_section, + struct TALER_WireTransferIdentifierRawP *wtid, + struct TALER_Amount *total) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (h_payto), + GNUNET_PQ_query_param_string (exchange_account_section), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + TALER_PQ_RESULT_SPEC_AMOUNT ("amount", + total), + GNUNET_PQ_result_spec_auto_from_type ("wtid_raw", + wtid), + GNUNET_PQ_result_spec_end + }; + + return GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "select_aggregation_transient", + params, + rs); +} + + +/** + * Update existing entry in the transient aggregation table. + * @a h_payto is only needed for query performance. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param wtid the raw wire transfer identifier to update + * @param total new total amount to be wired in the future + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_update_aggregation_transient ( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const struct TALER_WireTransferIdentifierRawP *wtid, + const struct TALER_Amount *total) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + TALER_PQ_query_param_amount (total), + GNUNET_PQ_query_param_auto_from_type (h_payto), + GNUNET_PQ_query_param_auto_from_type (wtid), + GNUNET_PQ_query_param_end + }; + + return GNUNET_PQ_eval_prepared_non_select (pg->conn, + "update_aggregation_transient", + params); +} + + +/** + * Delete existing entry in the transient aggregation table. + * @a h_payto is only needed for query performance. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param wtid the raw wire transfer identifier to update + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_delete_aggregation_transient ( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const struct TALER_WireTransferIdentifierRawP *wtid) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (h_payto), + GNUNET_PQ_query_param_auto_from_type (wtid), + GNUNET_PQ_query_param_end + }; + + return GNUNET_PQ_eval_prepared_non_select (pg->conn, + "delete_aggregation_transient", + params); +} + + +/** * Mark a deposit as tiny, thereby declaring that it cannot be * executed by itself and should no longer be returned by * @e iterate_ready_deposits() @@ -6147,12 +6483,10 @@ postgres_iterate_matching_deposits ( { struct PostgresClosure *pg = cls; struct GNUNET_TIME_Absolute now = {0}; - uint64_t shard = compute_shard (merchant_pub); struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_auto_from_type (merchant_pub), GNUNET_PQ_query_param_auto_from_type (h_payto), GNUNET_PQ_query_param_absolute_time (&now), - GNUNET_PQ_query_param_uint64 (&shard), GNUNET_PQ_query_param_end }; struct MatchingDepositContext mdc = { @@ -9299,7 +9633,6 @@ refunds_serial_helper_cb (void *cls, struct RefundsSerialContext *rsc = cls; struct PostgresClosure *pg = rsc->pg; - fprintf (stderr, "Got %u results\n", num_results); for (unsigned int i = 0; i<num_results; i++) { struct TALER_EXCHANGEDB_Refund refund; @@ -13081,6 +13414,15 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) plugin->get_known_coin = &postgres_get_known_coin; plugin->get_coin_denomination = &postgres_get_coin_denomination; plugin->have_deposit2 = &postgres_have_deposit2; + plugin->aggregate = &postgres_aggregate; + plugin->create_aggregation_transient + = &postgres_create_aggregation_transient; + plugin->select_aggregation_transient + = &postgres_select_aggregation_transient; + plugin->update_aggregation_transient + = &postgres_update_aggregation_transient; + plugin->delete_aggregation_transient + = &postgres_delete_aggregation_transient; plugin->mark_deposit_tiny = &postgres_mark_deposit_tiny; plugin->mark_deposit_done = &postgres_mark_deposit_done; plugin->get_ready_deposit = &postgres_get_ready_deposit; |