diff options
-rw-r--r-- | src/exchangedb/drop0001.sql | 2 | ||||
-rw-r--r-- | src/exchangedb/exchange-0001.sql | 69 | ||||
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 360 | ||||
-rw-r--r-- | src/exchangedb/test_exchangedb.c | 166 | ||||
-rw-r--r-- | src/include/taler_exchangedb_plugin.h | 92 |
5 files changed, 582 insertions, 107 deletions
diff --git a/src/exchangedb/drop0001.sql b/src/exchangedb/drop0001.sql index 225c817a0..3f43a5693 100644 --- a/src/exchangedb/drop0001.sql +++ b/src/exchangedb/drop0001.sql @@ -82,9 +82,9 @@ DROP TABLE IF EXISTS denominations CASCADE; DROP TABLE IF EXISTS cs_nonce_locks CASCADE; DROP FUNCTION IF EXISTS add_constraints_to_cs_nonce_locks_partition; -DROP TABLE IF EXISTS deposits_by_coin CASCADE; DROP TABLE IF EXISTS global_fee CASCADE; DROP TABLE IF EXISTS recoup_by_reserve CASCADE; +DROP TABLE IF EXISTS aggregation_transient CASCADE; DROP TABLE IF EXISTS partners CASCADE; diff --git a/src/exchangedb/exchange-0001.sql b/src/exchangedb/exchange-0001.sql index b2fb52ac2..e6902ed1c 100644 --- a/src/exchangedb/exchange-0001.sql +++ b/src/exchangedb/exchange-0001.sql @@ -772,7 +772,7 @@ CREATE TABLE IF NOT EXISTS deposits_by_ready_default CREATE TABLE IF NOT EXISTS deposits_for_matching (refund_deadline INT8 NOT NULL - ,shard INT8 NOT NULL + ,merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32) ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins (coin_pub) ON DELETE CASCADE ,deposit_serial_id INT8 ) @@ -782,7 +782,7 @@ COMMENT ON TABLE deposits_for_matching CREATE INDEX IF NOT EXISTS deposits_for_matching_main_index ON deposits_for_matching - (refund_deadline ASC, shard, coin_pub); + (refund_deadline ASC, merchant_pub, coin_pub); CREATE TABLE IF NOT EXISTS deposits_for_matching_default PARTITION OF deposits_for_matching @@ -818,12 +818,12 @@ BEGIN THEN INSERT INTO deposits_for_matching (refund_deadline - ,shard + ,merchant_pub ,coin_pub ,deposit_serial_id) VALUES (NEW.refund_deadline - ,NEW.shard + ,NEW.merchant_pub ,NEW.coin_pub ,NEW.deposit_serial_id); END IF; @@ -866,7 +866,7 @@ BEGIN THEN DELETE FROM deposits_for_matching WHERE refund_deadline = OLD.refund_deadline - AND shard = OLD.shard + AND merchant_pub = OLD.merchant_pub AND coin_pub = OLD.coin_pub AND deposit_serial_id = OLD.deposit_serial_id; END IF; @@ -887,12 +887,12 @@ BEGIN THEN INSERT INTO deposits_for_matching (refund_deadline - ,shard + ,merchant_pub ,coin_pub ,deposit_serial_id) VALUES (NEW.refund_deadline - ,NEW.shard + ,NEW.merchant_pub ,NEW.coin_pub ,NEW.deposit_serial_id); END IF; @@ -930,7 +930,7 @@ BEGIN THEN DELETE FROM deposits_for_matching WHERE refund_deadline = OLD.refund_deadline - AND shard = OLD.shard + AND merchant_pub = OLD.merchant_pub AND coin_pub = OLD.coin_pub AND deposit_serial_id = OLD.deposit_serial_id; END IF; @@ -1040,21 +1040,64 @@ $$; SELECT add_constraints_to_wire_out_partition('default'); +CREATE OR REPLACE FUNCTION wire_out_delete_trigger() + RETURNS trigger + LANGUAGE plpgsql + AS $$ +BEGIN + DELETE FROM aggregation_tracking + WHERE wtid_raw = OLD.wtid_raw; + RETURN OLD; +END $$; +COMMENT ON FUNCTION wire_out_delete_trigger() + IS 'Replicate reserve_out deletions into aggregation_tracking. This replaces an earlier use of an ON DELETE CASCADE that required a DEFERRABLE constraint and conflicted with nice partitioning.'; + +CREATE TRIGGER wire_out_on_delete + AFTER DELETE + ON wire_out + FOR EACH ROW EXECUTE FUNCTION wire_out_delete_trigger(); + + + +-- ------------------------------ aggregation_transient ---------------------------------------- + +-- Note: this table is not yet used; it is designed +-- to allow us to get rid of the 'tiny BOOL' and +-- the associated need to look at tiny +-- deposits repeatedly. +CREATE TABLE IF NOT EXISTS aggregation_transient + (amount_val INT8 NOT NULL + ,amount_frac INT4 NOT NULL + ,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32) + ,exchange_account_section TEXT NOT NULL + ,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32) + ) + PARTITION BY HASH (wire_target_h_payto); +COMMENT ON TABLE aggregation_transient + IS 'aggregations currently happening (lacking wire_out, usually because the amount is too low); this table is not replicated'; +COMMENT ON COLUMN aggregation_transient.amount_val + IS 'Sum of all of the aggregated deposits (without deposit fees)'; +COMMENT ON COLUMN aggregation_transient.wtid_raw + IS 'identifier of the wire transfer'; + +CREATE TABLE IF NOT EXISTS aggregation_transient_default + PARTITION OF aggregation_transient + FOR VALUES WITH (MODULUS 1, REMAINDER 0); + + -- ------------------------------ aggregation_tracking ---------------------------------------- --- FIXME-URGENT: add colum coin_pub to select by coin_pub + deposit_serial_id for more efficient deposit lookup!? --- Or which direction(s) is this table used? Is the partitioning sane?? CREATE TABLE IF NOT EXISTS aggregation_tracking (aggregation_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE ,deposit_serial_id INT8 PRIMARY KEY -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE - ,wtid_raw BYTEA NOT NULL CONSTRAINT wire_out_ref REFERENCES wire_out(wtid_raw) ON DELETE CASCADE DEFERRABLE + ,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32) ) PARTITION BY HASH (deposit_serial_id); COMMENT ON TABLE aggregation_tracking IS 'mapping from wire transfer identifiers (WTID) to deposits (and back)'; COMMENT ON COLUMN aggregation_tracking.wtid_raw - IS 'We first create entries in the aggregation_tracking table and then finally the wire_out entry once we know the total amount. Hence the constraint must be deferrable and we cannot use a wireout_uuid here, because we do not have it when these rows are created. Changing the logic to first INSERT a dummy row into wire_out and then UPDATEing that row in the same transaction would theoretically reduce per-deposit storage costs by 5 percent (24/~460 bytes).'; + IS 'identifier of the wire transfer'; CREATE TABLE IF NOT EXISTS aggregation_tracking_default PARTITION OF aggregation_tracking @@ -1070,7 +1113,7 @@ BEGIN EXECUTE FORMAT ( 'ALTER TABLE aggregation_tracking_' || partition_suffix || ' ' 'ADD CONSTRAINT aggregation_tracking_' || partition_suffix || '_aggregation_serial_id_key ' - 'UNIQUE (aggregation_serial_id) ' + 'UNIQUE (aggregation_serial_id);' ); END $$; diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index c7bdae397..1709f17e4 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -1188,7 +1188,7 @@ prepare_statements (struct PostgresClosure *pg) " ,dbr.shard ASC" " LIMIT 1;", 4), - /* Used in #postgres_iterate_matching_deposits() */ + /* FIXME: deprecated; Used in #postgres_iterate_matching_deposits() */ GNUNET_PQ_make_prepare ( "deposits_iterate_matching", "SELECT" @@ -1207,14 +1207,115 @@ prepare_statements (struct PostgresClosure *pg) " JOIN denominations denom" " USING (denominations_serial)" " WHERE dfm.refund_deadline<$3" - " AND dfm.shard=$4" + " AND dfm.merchant_pub=$1" " AND dep.merchant_pub=$1" " AND dep.wire_target_h_payto=$2" " LIMIT " TALER_QUOTE ( TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";", + 3), + + /* Used in #postgres_aggregate() */ + GNUNET_PQ_make_prepare ( + "aggregate", + "WITH rdy AS (" /* find deposits ready */ + " SELECT" + " coin_pub" + " FROM deposits_for_matching" + " WHERE refund_deadline<$1" + " AND merchant_pub=$2" + " ORDER BY refund_deadline ASC" /* ordering is not critical */ + " LIMIT " + TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) + " )" + " ,dep AS (" /* restrict to our merchant and account */ + " UPDATE deposits" + " SET done=TRUE" + " WHERE coin_pub IN (SELECT coin_pub FROM rdy)" + " AND merchant_pub=$2" + " AND wire_target_h_payto=$3" + " RETURNING" + " deposit_serial_id" + " ,coin_pub" + " ,amount_with_fee_val AS amount_val" + " ,amount_with_fee_frac AS amount_frac)" + " ,ref AS (" /* find applicable refunds */ + " SELECT" + " amount_with_fee_val AS refund_val" + " ,amount_with_fee_frac AS refund_frac" + " ,coin_pub" + " FROM refunds" + " WHERE coin_pub IN (SELECT coin_pub FROM dep)" + " AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))" + " ,fees AS (" /* find deposit fees for non-refunded deposits */ + " SELECT" + " denom.fee_deposit_val AS fee_val" + " ,denom.fee_deposit_frac AS fee_frac" + " FROM known_coins kc" + " JOIN denominations denom" + " USING (denominations_serial)" + " WHERE coin_pub IN (SELECT coin_pub FROM dep)" + " AND coin_pub NOT IN (SELECT coin_pub FROM ref))" + " ,dummy AS (" /* add deposits to aggregation_tracking */ + " INSERT INTO aggregation_tracking" + " (deposit_serial_id" + " ,wtid_raw)" + " SELECT deposit_serial_id,$4" + " FROM dep)" + "SELECT" /* calculate totals (deposits, refunds and fees) */ + " CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value" + " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction" + " ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value" + " ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction" + " ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value" + " ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction" + " FROM dep " + " FULL OUTER JOIN ref ON (FALSE)" + " FULL OUTER JOIN fees ON (FALSE);", + 4), + + + /* Used in #postgres_create_aggregation_transient() */ + GNUNET_PQ_make_prepare ( + "create_aggregation_transient", + "INSERT INTO aggregation_transient" + " (amount_val" + " ,amount_frac" + " ,wire_target_h_payto" + " ,exchange_account_section" + " ,wtid_raw)" + " VALUES ($1, $2, $3, $4, $5);", + 5), + /* Used in #postgres_select_aggregation_transient() */ + GNUNET_PQ_make_prepare ( + "select_aggregation_transient", + "SELECT" + " amount_val" + " ,amount_frac" + " ,wtid_raw" + " FROM aggregation_transient" + " WHERE wire_target_h_payto=$1" + " AND exchange_account_section=$2;", + 2), + /* Used in #postgres_update_aggregation_transient() */ + GNUNET_PQ_make_prepare ( + "update_aggregation_transient", + "UPDATE aggregation_transient" + " SET amount_val=$1" + " ,amount_frac=$2" + " WHERE wire_target_h_payto=$3" + " AND wtid_raw=$4", 4), - /* Used in #postgres_mark_deposit_tiny() */ + /* Used in #postgres_delete_aggregation_transient() */ + GNUNET_PQ_make_prepare ( + "delete_aggregation_transient", + "DELETE FROM aggregation_transient" + " WHERE wire_target_h_payto=$1" + " AND wtid_raw=$2", + 2), + + + /* FIXME-deprecated: Used in #postgres_mark_deposit_tiny() */ GNUNET_PQ_make_prepare ( "mark_deposit_tiny", "UPDATE deposits" @@ -1222,7 +1323,7 @@ prepare_statements (struct PostgresClosure *pg) " WHERE coin_pub=$1" " AND deposit_serial_id=$2", 2), - /* Used in #postgres_mark_deposit_done() */ + /* FIXME-deprecated: Used in #postgres_mark_deposit_done() */ GNUNET_PQ_make_prepare ( "mark_deposit_done", "UPDATE deposits" @@ -1230,6 +1331,7 @@ prepare_statements (struct PostgresClosure *pg) " WHERE coin_pub=$1" " AND deposit_serial_id=$2;", 2), + /* Used in #postgres_get_coin_transactions() to obtain information about how a coin has been spend with /deposit requests. */ GNUNET_PQ_make_prepare ( @@ -2835,8 +2937,8 @@ prepare_statements (struct PostgresClosure *pg) GNUNET_PQ_make_prepare ( "insert_into_table_refunds", "INSERT INTO refunds" - "(coin_pub" - ",refund_serial_id" + "(refund_serial_id" + ",coin_pub" ",merchant_sig" ",rtransaction_id" ",amount_with_fee_val" @@ -5869,6 +5971,240 @@ postgres_have_deposit2 ( /** + * Aggregate all matching deposits for @a h_payto and + * @a merchant_pub, returning the total amounts. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param merchant_pub public key of the merchant + * @param wtid wire transfer ID to set for the aggregate + * @param[out] total set to the sum of the total deposits minus applicable deposit fees and refunds + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_aggregate ( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_WireTransferIdentifierRawP *wtid, + struct TALER_Amount *total) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_TIME_Absolute now = {0}; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_absolute_time (&now), + GNUNET_PQ_query_param_auto_from_type (merchant_pub), + GNUNET_PQ_query_param_auto_from_type (h_payto), + GNUNET_PQ_query_param_auto_from_type (wtid), + GNUNET_PQ_query_param_end + }; + uint64_t sum_deposit_value; + uint64_t sum_deposit_frac; + uint64_t sum_refund_value; + uint64_t sum_refund_frac; + uint64_t sum_fee_value; + uint64_t sum_fee_frac; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("sum_deposit_value", + &sum_deposit_value), + GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction", + &sum_deposit_frac), + GNUNET_PQ_result_spec_uint64 ("sum_refund_value", + &sum_refund_value), + GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction", + &sum_refund_frac), + GNUNET_PQ_result_spec_uint64 ("sum_fee_value", + &sum_fee_value), + GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction", + &sum_fee_frac), + GNUNET_PQ_result_spec_end + }; + enum GNUNET_DB_QueryStatus qs; + struct TALER_Amount sum_deposit; + struct TALER_Amount sum_refund; + struct TALER_Amount sum_fee; + struct TALER_Amount delta; + + now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (), + pg->aggregator_shift); + qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "aggregate", + params, + rs); + if (qs < 0) + { + GNUNET_assert (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; + } + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (pg->currency, + total)); + return qs; + } + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (pg->currency, + &sum_deposit)); + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (pg->currency, + &sum_refund)); + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (pg->currency, + &sum_fee)); + sum_deposit.value = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE + + sum_deposit_value; + sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE; + sum_refund.value = sum_refund_frac / TALER_AMOUNT_FRAC_BASE + + sum_refund_value; + sum_refund.fraction = sum_refund_frac % TALER_AMOUNT_FRAC_BASE; + sum_fee.value = sum_fee_frac / TALER_AMOUNT_FRAC_BASE + + sum_fee_value; + sum_fee.fraction = sum_fee_frac % TALER_AMOUNT_FRAC_BASE; \ + GNUNET_assert (0 <= + TALER_amount_subtract (&delta, + &sum_deposit, + &sum_refund)); + GNUNET_assert (0 <= + TALER_amount_subtract (total, + &delta, + &sum_fee)); + return qs; +} + + +/** + * Create a new entry in the transient aggregation table. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param exchange_account_section exchange account to use + * @param wtid the raw wire transfer identifier to be used + * @param total amount to be wired in the future + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_create_aggregation_transient ( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const char *exchange_account_section, + const struct TALER_WireTransferIdentifierRawP *wtid, + const struct TALER_Amount *total) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + TALER_PQ_query_param_amount (total), + GNUNET_PQ_query_param_auto_from_type (h_payto), + GNUNET_PQ_query_param_string (exchange_account_section), + GNUNET_PQ_query_param_auto_from_type (wtid), + GNUNET_PQ_query_param_end + }; + + return GNUNET_PQ_eval_prepared_non_select (pg->conn, + "create_aggregation_transient", + params); +} + + +/** + * Find existing entry in the transient aggregation table. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param exchange_account_section exchange account to use + * @param[out] wtid set to the raw wire transfer identifier to be used + * @param[out] total existing amount to be wired in the future + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_select_aggregation_transient ( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const char *exchange_account_section, + struct TALER_WireTransferIdentifierRawP *wtid, + struct TALER_Amount *total) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (h_payto), + GNUNET_PQ_query_param_string (exchange_account_section), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + TALER_PQ_RESULT_SPEC_AMOUNT ("amount", + total), + GNUNET_PQ_result_spec_auto_from_type ("wtid_raw", + wtid), + GNUNET_PQ_result_spec_end + }; + + return GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "select_aggregation_transient", + params, + rs); +} + + +/** + * Update existing entry in the transient aggregation table. + * @a h_payto is only needed for query performance. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param wtid the raw wire transfer identifier to update + * @param total new total amount to be wired in the future + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_update_aggregation_transient ( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const struct TALER_WireTransferIdentifierRawP *wtid, + const struct TALER_Amount *total) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + TALER_PQ_query_param_amount (total), + GNUNET_PQ_query_param_auto_from_type (h_payto), + GNUNET_PQ_query_param_auto_from_type (wtid), + GNUNET_PQ_query_param_end + }; + + return GNUNET_PQ_eval_prepared_non_select (pg->conn, + "update_aggregation_transient", + params); +} + + +/** + * Delete existing entry in the transient aggregation table. + * @a h_payto is only needed for query performance. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param wtid the raw wire transfer identifier to update + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_delete_aggregation_transient ( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const struct TALER_WireTransferIdentifierRawP *wtid) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (h_payto), + GNUNET_PQ_query_param_auto_from_type (wtid), + GNUNET_PQ_query_param_end + }; + + return GNUNET_PQ_eval_prepared_non_select (pg->conn, + "delete_aggregation_transient", + params); +} + + +/** * Mark a deposit as tiny, thereby declaring that it cannot be * executed by itself and should no longer be returned by * @e iterate_ready_deposits() @@ -6147,12 +6483,10 @@ postgres_iterate_matching_deposits ( { struct PostgresClosure *pg = cls; struct GNUNET_TIME_Absolute now = {0}; - uint64_t shard = compute_shard (merchant_pub); struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_auto_from_type (merchant_pub), GNUNET_PQ_query_param_auto_from_type (h_payto), GNUNET_PQ_query_param_absolute_time (&now), - GNUNET_PQ_query_param_uint64 (&shard), GNUNET_PQ_query_param_end }; struct MatchingDepositContext mdc = { @@ -9299,7 +9633,6 @@ refunds_serial_helper_cb (void *cls, struct RefundsSerialContext *rsc = cls; struct PostgresClosure *pg = rsc->pg; - fprintf (stderr, "Got %u results\n", num_results); for (unsigned int i = 0; i<num_results; i++) { struct TALER_EXCHANGEDB_Refund refund; @@ -13081,6 +13414,15 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) plugin->get_known_coin = &postgres_get_known_coin; plugin->get_coin_denomination = &postgres_get_coin_denomination; plugin->have_deposit2 = &postgres_have_deposit2; + plugin->aggregate = &postgres_aggregate; + plugin->create_aggregation_transient + = &postgres_create_aggregation_transient; + plugin->select_aggregation_transient + = &postgres_select_aggregation_transient; + plugin->update_aggregation_transient + = &postgres_update_aggregation_transient; + plugin->delete_aggregation_transient + = &postgres_delete_aggregation_transient; plugin->mark_deposit_tiny = &postgres_mark_deposit_tiny; plugin->mark_deposit_done = &postgres_mark_deposit_done; plugin->get_ready_deposit = &postgres_get_ready_deposit; diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index 79b09e0e1..9e2e8a480 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -675,48 +675,6 @@ deposit_cb (void *cls, /** - * Function called with details about deposits that - * have been made. Called in the test on the - * deposit given in @a cls. - * - * @param cls closure a `struct TALER_EXCHANGEDB_Deposit *` - * @param rowid unique ID for the deposit in our DB, used for marking - * it as 'tiny' or 'done' - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate - */ -static enum GNUNET_DB_QueryStatus -matching_deposit_cb (void *cls, - uint64_t rowid, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount_with_fee, - const struct TALER_Amount *deposit_fee, - const struct TALER_PrivateContractHashP *h_contract_terms) -{ - struct TALER_EXCHANGEDB_Deposit *deposit = cls; - - deposit_rowid = rowid; - if ( (0 != TALER_amount_cmp (amount_with_fee, - &deposit->amount_with_fee)) || - (0 != TALER_amount_cmp (deposit_fee, - &deposit->deposit_fee)) || - (0 != GNUNET_memcmp (h_contract_terms, - &deposit->h_contract_terms)) || - (0 != GNUNET_memcmp (coin_pub, - &deposit->coin.coin_pub)) ) - { - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; - } - - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -} - - -/** * Callback for #select_deposits_above_serial_id () * * @param cls closure @@ -1055,7 +1013,7 @@ test_wire_out (const struct TALER_EXCHANGEDB_Deposit *deposit) &h_payto); auditor_row_cnt = 0; memset (&wire_out_wtid, - 42, + 41, sizeof (wire_out_wtid)); wire_out_date = GNUNET_TIME_timestamp_get (); GNUNET_assert (GNUNET_OK == @@ -1109,14 +1067,6 @@ test_wire_out (const struct TALER_EXCHANGEDB_Deposit *deposit) &coin_fee2, &kyc)); } - /* insert WT data */ - FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != - plugin->insert_aggregation_tracking (plugin->cls, - &wire_out_wtid, - deposit_rowid)); - - /* Now let's fix the transient constraint violation by - putting in the WTID into the wire_out table */ { struct TALER_ReservePublicKeyP rpub; struct TALER_EXCHANGEDB_KycStatus kyc; @@ -2270,44 +2220,92 @@ run (void *cls) &deposit_cb, &deposit)); FAILIF (8 == result); - FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != - plugin->iterate_matching_deposits (plugin->cls, - &wire_target_h_payto, - &deposit.merchant_pub, - &matching_deposit_cb, - &deposit, - 2)); + { + struct TALER_Amount total; + struct TALER_WireTransferIdentifierRawP wtid; + + memset (&wtid, + 41, + sizeof (wtid)); + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + plugin->aggregate (plugin->cls, + &wire_target_h_payto, + &deposit.merchant_pub, + &wtid, + &total)); + } FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != plugin->commit (plugin->cls)); FAILIF (GNUNET_OK != plugin->start (plugin->cls, - "test-2")); - FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != - plugin->mark_deposit_tiny (plugin->cls, - &deposit.coin.coin_pub, - deposit_rowid)); - FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != - plugin->get_ready_deposit (plugin->cls, - 0, - INT32_MAX, - true, - &deposit_cb, - &deposit)); - plugin->rollback (plugin->cls); - FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != - plugin->get_ready_deposit (plugin->cls, - 0, - INT32_MAX, - true, - &deposit_cb, - &deposit)); - FAILIF (GNUNET_OK != - plugin->start (plugin->cls, "test-3")); - FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != - plugin->mark_deposit_done (plugin->cls, - &deposit.coin.coin_pub, - deposit_rowid)); + { + struct TALER_WireTransferIdentifierRawP wtid; + struct TALER_Amount total; + struct TALER_WireTransferIdentifierRawP wtid2; + struct TALER_Amount total2; + + memset (&wtid, + 42, + sizeof (wtid)); + GNUNET_assert (GNUNET_OK == + TALER_string_to_amount (CURRENCY ":42", + &total)); + FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != + plugin->select_aggregation_transient (plugin->cls, + &wire_target_h_payto, + "x-bank", + &wtid2, + &total2)); + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + plugin->create_aggregation_transient (plugin->cls, + &wire_target_h_payto, + "x-bank", + &wtid, + &total)); + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + plugin->select_aggregation_transient (plugin->cls, + &wire_target_h_payto, + "x-bank", + &wtid2, + &total2)); + FAILIF (0 != + GNUNET_memcmp (&wtid2, + &wtid)); + FAILIF (0 != + TALER_amount_cmp (&total2, + &total)); + GNUNET_assert (GNUNET_OK == + TALER_string_to_amount (CURRENCY ":43", + &total)); + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + plugin->update_aggregation_transient (plugin->cls, + &wire_target_h_payto, + &wtid, + &total)); + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + plugin->select_aggregation_transient (plugin->cls, + &wire_target_h_payto, + "x-bank", + &wtid2, + &total2)); + FAILIF (0 != + GNUNET_memcmp (&wtid2, + &wtid)); + FAILIF (0 != + TALER_amount_cmp (&total2, + &total)); + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + plugin->delete_aggregation_transient (plugin->cls, + &wire_target_h_payto, + &wtid)); + FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != + plugin->select_aggregation_transient (plugin->cls, + &wire_target_h_payto, + "x-bank", + &wtid2, + &total2)); + } FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != plugin->commit (plugin->cls)); diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index b2ea240e2..4ca6905e0 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -3146,6 +3146,98 @@ struct TALER_EXCHANGEDB_Plugin /** + * Aggregate all matching deposits for @a h_payto and + * @a merchant_pub, returning the total amounts. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param merchant_pub public key of the merchant + * @param wtid wire transfer ID to set for the aggregate + * @param[out] total set to the sum of the total deposits minus applicable deposit fees and refunds + * @return transaction status + */ + enum GNUNET_DB_QueryStatus + (*aggregate)( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_WireTransferIdentifierRawP *wtid, + struct TALER_Amount *total); + + + /** + * Create a new entry in the transient aggregation table. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param exchange_account_section exchange account to use + * @param wtid the raw wire transfer identifier to be used + * @param total amount to be wired in the future + * @return transaction status + */ + enum GNUNET_DB_QueryStatus + (*create_aggregation_transient)( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const char *exchange_account_section, + const struct TALER_WireTransferIdentifierRawP *wtid, + const struct TALER_Amount *total); + + + /** + * Find existing entry in the transient aggregation table. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param exchange_account_section exchange account to use + * @param[out] wtid set to the raw wire transfer identifier to be used + * @param[out] total existing amount to be wired in the future + * @return transaction status + */ + enum GNUNET_DB_QueryStatus + (*select_aggregation_transient)( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const char *exchange_account_section, + struct TALER_WireTransferIdentifierRawP *wtid, + struct TALER_Amount *total); + + + /** + * Update existing entry in the transient aggregation table. + * @a h_payto is only needed for query performance. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param wtid the raw wire transfer identifier to update + * @param total new total amount to be wired in the future + * @return transaction status + */ + enum GNUNET_DB_QueryStatus + (*update_aggregation_transient)( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const struct TALER_WireTransferIdentifierRawP *wtid, + const struct TALER_Amount *total); + + + /** + * Delete existing entry in the transient aggregation table. + * @a h_payto is only needed for query performance. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param wtid the raw wire transfer identifier to update + * @return transaction status + */ + enum GNUNET_DB_QueryStatus + (*delete_aggregation_transient)( + void *cls, + const struct TALER_PaytoHashP *h_payto, + const struct TALER_WireTransferIdentifierRawP *wtid); + + + /** * Lookup melt commitment data under the given @a rc. * * @param cls the @e cls of this struct with the plugin-specific state |