diff options
author | Christian Grothoff <grothoff@gnunet.org> | 2022-03-24 17:33:29 +0100 |
---|---|---|
committer | Christian Grothoff <grothoff@gnunet.org> | 2022-03-24 17:33:29 +0100 |
commit | b856d56d95f92eb9dedb0af49493350ea8ea2268 (patch) | |
tree | 3490ebf1e069fbe858a3f6cf97b18da8289840ae | |
parent | c782dfe2aadfd06e47ed354c1fb389fecc715433 (diff) |
rework deposits sharding, towards making aggregator faster (not necessarily done)
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 83 | ||||
-rw-r--r-- | src/exchangedb/drop0001.sql | 8 | ||||
-rw-r--r-- | src/exchangedb/exchange-0001.sql | 266 | ||||
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 131 | ||||
-rw-r--r-- | src/exchangedb/test_exchangedb.c | 4 | ||||
-rw-r--r-- | src/include/taler_exchangedb_plugin.h | 8 |
6 files changed, 324 insertions, 176 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index abab347fe..c34d47f9c 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -28,6 +28,18 @@ #include "taler_json_lib.h" #include "taler_bank_service.h" +struct AdditionalDeposit +{ + /** + * Public key of the coin. + */ + struct TALER_CoinSpendPublicKeyP coin_pub; + + /** + * Row of the deposit. + */ + uint64_t row; +}; /** * Information about one aggregation process to be executed. There is @@ -43,6 +55,11 @@ struct AggregationUnit struct TALER_MerchantPublicKeyP merchant_pub; /** + * Public key of the coin. + */ + struct TALER_CoinSpendPublicKeyP coin_pub; + + /** * Total amount to be transferred, before subtraction of @e fees.wire and rounding down. */ struct TALER_Amount total_amount; @@ -97,7 +114,8 @@ struct AggregationUnit /** * Array of row_ids from the aggregation. */ - uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; + struct AdditionalDeposit + additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; /** * Offset specifying how many @e additional_rows are in use. @@ -383,7 +401,8 @@ deposit_cb (void *cls, enum GNUNET_DB_QueryStatus qs; au->merchant_pub = *merchant_pub; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + au->coin_pub = *coin_pub; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Aggregator processing payment %s with amount %s\n", TALER_B2S (coin_pub), TALER_amount2s (amount_with_fee)); @@ -405,7 +424,7 @@ deposit_cb (void *cls, { struct TALER_Amount ntotal; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Non-refunded transaction, subtracting deposit fee %s\n", TALER_amount2s (deposit_fee)); if (0 > @@ -428,6 +447,9 @@ deposit_cb (void *cls, au->total_amount = ntotal; } } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Amount after fee is %s\n", + TALER_amount2s (&au->total_amount)); GNUNET_assert (NULL == au->payto_uri); au->payto_uri = GNUNET_strdup (payto_uri); @@ -437,7 +459,7 @@ deposit_cb (void *cls, GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, &au->wtid, sizeof (au->wtid)); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n", TALER_B2S (&au->wtid), TALER_amount2s (amount_with_fee), @@ -493,7 +515,7 @@ deposit_cb (void *cls, "Aggregator marks deposit %llu as done\n", (unsigned long long) row_id); qs = db_plugin->mark_deposit_done (db_plugin->cls, - merchant_pub, + coin_pub, row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { @@ -528,6 +550,8 @@ aggregate_cb (void *cls, struct TALER_Amount old; enum GNUNET_DB_QueryStatus qs; + if (row_id == au->row_id) + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) { /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */ @@ -605,18 +629,29 @@ aggregate_cb (void *cls, } /* "append" to our list of rows */ - au->additional_rows[au->rows_offset++] = row_id; + au->additional_rows[au->rows_offset].coin_pub = *coin_pub; + au->additional_rows[au->rows_offset].row = row_id; + au->rows_offset++; /* insert into aggregation tracking table */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding %llu to aggregate %s\n", + (unsigned long long) row_id, + TALER_B2S (&au->wtid)); qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, &au->wtid, row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to add %llu to aggregate %s: %d\n", + (unsigned long long) row_id, + TALER_B2S (&au->wtid), + qs); GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return qs; } qs = db_plugin->mark_deposit_done (db_plugin->cls, - &au->merchant_pub, + coin_pub, row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { @@ -775,7 +810,7 @@ run_aggregation (void *cls) } /* Now try to find other deposits to aggregate */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found ready deposit for %s, aggregating by target %llu\n", TALER_B2S (&au_active.merchant_pub), (unsigned long long) au_active.wire_target); @@ -808,13 +843,17 @@ run_aggregation (void *cls) s); return; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Found %d other deposits to combine into wire transfer.\n", - qs); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found %d other deposits to combine into wire transfer with fee %s.\n", + qs, + TALER_amount2s (&au_active.fees.wire)); /* Subtract wire transfer fee and round to the unit supported by the wire transfer method; Check if after rounding down, we still have an amount to transfer, and if not mark as 'tiny'. */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Rounding aggregate of %s\n", + TALER_amount2s (&au_active.total_amount)); if ( (0 >= TALER_amount_subtract (&au_active.final_amount, &au_active.total_amount, @@ -822,8 +861,7 @@ run_aggregation (void *cls) (GNUNET_SYSERR == TALER_amount_round_down (&au_active.final_amount, ¤cy_round_unit)) || - ( (0 == au_active.final_amount.value) && - (0 == au_active.final_amount.fraction) ) ) + (TALER_amount_is_zero (&au_active.final_amount)) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Aggregate value too low for transfer (%d/%s)\n", @@ -848,23 +886,29 @@ run_aggregation (void *cls) return; } /* Mark transactions by row_id as minor */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Marking %s (%llu) as tiny\n", + TALER_B2S (&au_active.coin_pub), + (unsigned long long) au_active.row_id); qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - &au_active.merchant_pub, + &au_active.coin_pub, au_active.row_id); - if (0 <= qs) + if (0 < qs) { for (unsigned int i = 0; i<au_active.rows_offset; i++) { qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - &au_active.merchant_pub, - au_active.additional_rows[i]); - if (0 > qs) + &au_active.additional_rows[i]. + coin_pub, + au_active.additional_rows[i].row); + if (0 >= qs) break; } } + GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Serialization issue, trying again later!\n"); db_plugin->rollback (db_plugin->cls); cleanup_au (&au_active); @@ -876,6 +920,7 @@ run_aggregation (void *cls) } if (GNUNET_DB_STATUS_HARD_ERROR == qs) { + GNUNET_break (0); db_plugin->rollback (db_plugin->cls); cleanup_au (&au_active); global_ret = EXIT_FAILURE; diff --git a/src/exchangedb/drop0001.sql b/src/exchangedb/drop0001.sql index 60acc98a4..225c817a0 100644 --- a/src/exchangedb/drop0001.sql +++ b/src/exchangedb/drop0001.sql @@ -55,6 +55,8 @@ DROP TABLE IF EXISTS wire_targets CASCADE; DROP FUNCTION IF EXISTS add_constraints_to_wire_targets_partition; DROP TABLE IF EXISTS wire_fee CASCADE; DROP TABLE IF EXISTS deposits CASCADE; +DROP TABLE IF EXISTS deposits_by_ready CASCADE; +DROP TABLE IF EXISTS deposits_for_matching CASCADE; DROP FUNCTION IF EXISTS add_constraints_to_deposits_partition; DROP TABLE IF EXISTS extension_details CASCADE; DROP TABLE IF EXISTS refunds CASCADE; @@ -88,6 +90,7 @@ DROP TABLE IF EXISTS recoup_by_reserve CASCADE; DROP TABLE IF EXISTS partners CASCADE; DROP TABLE IF EXISTS account_merges CASCADE; DROP TABLE IF EXISTS purse_merges CASCADE; +DROP TABLE IF EXISTS purse_deposits CASCADE; DROP TABLE IF EXISTS contracts CASCADE; DROP TABLE IF EXISTS history_requests CASCADE; DROP TABLE IF EXISTS close_requests CASCADE; @@ -103,8 +106,9 @@ DROP FUNCTION IF EXISTS exchange_do_withdraw; DROP FUNCTION IF EXISTS exchange_do_withdraw_limit_check; DROP FUNCTION IF EXISTS recoup_insert_trigger; DROP FUNCTION IF EXISTS recoup_delete_trigger; -DROP FUNCTION IF EXISTS deposits_by_coin_insert_trigger; -DROP FUNCTION IF EXISTS deposits_by_coin_delete_trigger; +DROP FUNCTION IF EXISTS deposits_insert_trigger; +DROP FUNCTION IF EXISTS deposits_update_trigger; +DROP FUNCTION IF EXISTS deposits_delete_trigger; DROP FUNCTION IF EXISTS reserves_out_by_reserve_insert_trigger; DROP FUNCTION IF EXISTS reserves_out_by_reserve_delete_trigger; DROP FUNCTION IF EXISTS exchange_do_deposit; diff --git a/src/exchangedb/exchange-0001.sql b/src/exchangedb/exchange-0001.sql index e723a3677..568779f97 100644 --- a/src/exchangedb/exchange-0001.sql +++ b/src/exchangedb/exchange-0001.sql @@ -610,7 +610,7 @@ CREATE TABLE IF NOT EXISTS deposits (deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- PRIMARY KEY ,shard INT8 NOT NULL ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE - ,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE + ,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE --- FIXME: column needed??? ,amount_with_fee_val INT8 NOT NULL ,amount_with_fee_frac INT4 NOT NULL ,wallet_timestamp INT8 NOT NULL @@ -626,22 +626,11 @@ CREATE TABLE IF NOT EXISTS deposits ,done BOOLEAN NOT NULL DEFAULT FALSE ,extension_blocked BOOLEAN NOT NULL DEFAULT FALSE ,extension_details_serial_id INT8 REFERENCES extension_details (extension_details_serial_id) ON DELETE CASCADE - ,UNIQUE (shard, coin_pub, merchant_pub, h_contract_terms) + ,UNIQUE (coin_pub, merchant_pub, h_contract_terms) ) - PARTITION BY HASH (shard); -- FIXME: why not BY RANGE? RANGE would seem better for 'deposits_get_ready'! + PARTITION BY HASH (coin_pub); -- FIXME: --- new idea: partition deposits by coin_pub (remove deposits_by_coin) --- define 'ready' == ! (tiny || done || blocked) --- add new deposits_by_ready (on shard + wire_deadline), select by shard, then ready + deadline --- -- use triggers to ONLY include 'ready' deposits (delete on update)! --- -- use multi-level partitions: Hash(shard) + Range(wire_deadline/sec) --- add new deposits_by_match (on shard + refund_deadline) --- -- use triggers to ONLY include 'ready' deposits (delete on update)! --- -- use multi-level partitions: Hash(shard) + Range(refund_deadline/sec) --- => first we select per-merchant shard, basically stay on the same system as other ops for the same merchant --- => second we select by deadline, use enough values so that _usually_ the aggregator --- and the 'insert' process _can_ work on different shards! --- => the latter could be achieved by dynamically (!) creating/deleting partitions: +-- TODO: dynamically (!) creating/deleting partitions: -- create new partitions 'as needed', drop old ones once the aggregator has made -- them empty; as 'new' deposits will always have deadlines in the future, this -- would basically guarantee no conflict between aggregator and exchange service! @@ -683,31 +672,15 @@ COMMENT ON COLUMN deposits.extension_details_serial_id COMMENT ON COLUMN deposits.tiny IS 'Set to TRUE if we decided that the amount is too small to ever trigger a wire transfer by itself (requires real aggregation)'; +-- FIXME: we sometimes go ONLY by 'deposit_serial_id', +-- check if queries could be improved by adding shard or adding another index without shard here, or inverting the order of the index here! CREATE INDEX IF NOT EXISTS deposits_deposit_by_serial_id_index ON deposits (shard,deposit_serial_id); -CREATE INDEX IF NOT EXISTS deposits_for_get_ready_index - ON deposits - (shard ASC - ,done - ,extension_blocked - ,tiny - ,wire_deadline ASC - ); -COMMENT ON INDEX deposits_for_get_ready_index - IS 'for deposits_get_ready'; -CREATE INDEX IF NOT EXISTS deposits_for_iterate_matching_index +CREATE INDEX IF NOT EXISTS deposits_by_coin_pub_index ON deposits - (shard - ,merchant_pub - ,wire_target_h_payto - ,done - ,extension_blocked - ,refund_deadline ASC - ); -COMMENT ON INDEX deposits_for_iterate_matching_index - IS 'for deposits_iterate_matching'; + (coin_pub); CREATE TABLE IF NOT EXISTS deposits_default @@ -732,66 +705,198 @@ $$; SELECT add_constraints_to_deposits_partition('default'); +CREATE TABLE IF NOT EXISTS deposits_by_ready + (wire_deadline INT8 NOT NULL + ,shard INT8 NOT NULL + ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) + ,deposit_serial_id INT8 + ) + PARTITION BY RANGE (wire_deadline); +COMMENT ON TABLE deposits_by_ready + IS 'Enables fast lookups for deposits_get_ready, auto-populated via TRIGGER below'; + +CREATE INDEX IF NOT EXISTS deposits_by_ready_main_index + ON deposits_by_ready + (wire_deadline ASC, shard ASC, coin_pub); + +CREATE TABLE IF NOT EXISTS deposits_by_ready_default + PARTITION OF deposits_by_ready + DEFAULT; -CREATE TABLE IF NOT EXISTS deposits_by_coin - (deposit_serial_id BIGINT + +CREATE TABLE IF NOT EXISTS deposits_for_matching + (refund_deadline INT8 NOT NULL ,shard INT8 NOT NULL ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) + ,deposit_serial_id INT8 ) - PARTITION BY HASH (coin_pub); -COMMENT ON TABLE deposits_by_coin - IS 'Enables fast lookups of deposit by coin_pub, auto-populated via TRIGGER below'; + PARTITION BY RANGE (refund_deadline); +COMMENT ON TABLE deposits_for_matching + IS 'Enables fast lookups for deposits_iterate_matching, auto-populated via TRIGGER below'; -CREATE INDEX IF NOT EXISTS deposits_by_coin_main_index - ON deposits_by_coin - (coin_pub); +CREATE INDEX IF NOT EXISTS deposits_for_matching_main_index + ON deposits_for_matching + (refund_deadline ASC, shard, coin_pub); -CREATE TABLE IF NOT EXISTS deposits_by_coin_default - PARTITION OF deposits_by_coin - FOR VALUES WITH (MODULUS 1, REMAINDER 0); +CREATE TABLE IF NOT EXISTS deposits_for_matching_default + PARTITION OF deposits_for_matching + DEFAULT; + -CREATE OR REPLACE FUNCTION deposits_by_coin_insert_trigger() +CREATE OR REPLACE FUNCTION deposits_insert_trigger() RETURNS trigger LANGUAGE plpgsql AS $$ +DECLARE + is_ready BOOLEAN; +DECLARE + is_tready BOOLEAN; -- is ready, but may be tiny BEGIN - INSERT INTO deposits_by_coin - (deposit_serial_id - ,shard - ,coin_pub) - VALUES - (NEW.deposit_serial_id - ,NEW.shard - ,NEW.coin_pub); + is_ready = NOT (NEW.done OR NEW.tiny OR NEW.extension_blocked); + is_tready = NOT (NEW.done OR NEW.extension_blocked); + + IF (is_ready) + THEN + INSERT INTO deposits_by_ready + (wire_deadline + ,shard + ,coin_pub + ,deposit_serial_id) + VALUES + (NEW.wire_deadline + ,NEW.shard + ,NEW.coin_pub + ,NEW.deposit_serial_id); + END IF; + IF (is_tready) + THEN + INSERT INTO deposits_for_matching + (refund_deadline + ,shard + ,coin_pub + ,deposit_serial_id) + VALUES + (NEW.refund_deadline + ,NEW.shard + ,NEW.coin_pub + ,NEW.deposit_serial_id); + END IF; RETURN NEW; END $$; -COMMENT ON FUNCTION deposits_by_coin_insert_trigger() - IS 'Replicate deposit inserts into deposits_by_coin table.'; +COMMENT ON FUNCTION deposits_insert_trigger() + IS 'Replicate deposit inserts into materialized indices.'; CREATE TRIGGER deposits_on_insert AFTER INSERT ON deposits - FOR EACH ROW EXECUTE FUNCTION deposits_by_coin_insert_trigger(); + FOR EACH ROW EXECUTE FUNCTION deposits_insert_trigger(); -CREATE OR REPLACE FUNCTION deposits_by_coin_delete_trigger() +CREATE OR REPLACE FUNCTION deposits_update_trigger() RETURNS trigger LANGUAGE plpgsql AS $$ +DECLARE + was_ready BOOLEAN; +DECLARE + is_ready BOOLEAN; +DECLARE + was_tready BOOLEAN; -- was ready, but may be tiny +DECLARE + is_tready BOOLEAN; -- is ready, but may be tiny BEGIN - DELETE FROM deposits_by_coin - WHERE coin_pub = OLD.coin_pub - AND shard = OLD.shard - AND deposit_serial_id = OLD.deposit_serial_id; - RETURN OLD; + was_ready = NOT (OLD.done OR OLD.tiny OR OLD.extension_blocked); + is_ready = NOT (NEW.done OR NEW.tiny OR NEW.extension_blocked); + was_tready = NOT (OLD.done OR OLD.extension_blocked); + is_tready = NOT (NEW.done OR NEW.extension_blocked); + IF (was_ready AND NOT is_ready) + THEN + DELETE FROM deposits_by_ready + WHERE wire_deadline = OLD.wire_deadline + AND shard = OLD.shard + AND coin_pub = OLD.coin_pub + AND deposit_serial_id = OLD.deposit_serial_id; + END IF; + IF (was_tready AND NOT is_tready) + THEN + DELETE FROM deposits_for_matching + WHERE refund_deadline = OLD.refund_deadline + AND shard = OLD.shard + AND coin_pub = OLD.coin_pub + AND deposit_serial_id = OLD.deposit_serial_id; + END IF; + IF (is_ready AND NOT was_ready) + THEN + INSERT INTO deposits_by_ready + (wire_deadline + ,shard + ,coin_pub + ,deposit_serial_id) + VALUES + (NEW.wire_deadline + ,NEW.shard + ,NEW.coin_pub + ,NEW.deposit_serial_id); + END IF; + IF (is_tready AND NOT was_tready) + THEN + INSERT INTO deposits_for_matching + (refund_deadline + ,shard + ,coin_pub + ,deposit_serial_id) + VALUES + (NEW.refund_deadline + ,NEW.shard + ,NEW.coin_pub + ,NEW.deposit_serial_id); + END IF; + RETURN NEW; +END $$; +COMMENT ON FUNCTION deposits_update_trigger() + IS 'Replicate deposits changes into materialized indices.'; + +CREATE TRIGGER deposits_on_update + AFTER UPDATE + ON deposits + FOR EACH ROW EXECUTE FUNCTION deposits_update_trigger(); + +CREATE OR REPLACE FUNCTION deposits_delete_trigger() + RETURNS trigger + LANGUAGE plpgsql + AS $$ +DECLARE + was_ready BOOLEAN; +DECLARE + was_tready BOOLEAN; -- is ready, but may be tiny +BEGIN + was_ready = NOT (OLD.done OR OLD.tiny OR OLD.extension_blocked); + was_tready = NOT (OLD.done OR OLD.extension_blocked); + + IF (was_ready) + THEN + DELETE FROM deposits_by_ready + WHERE wire_deadline = OLD.wire_deadline + AND shard = OLD.shard + AND coin_pub = OLD.coin_pub + AND deposit_serial_id = OLD.deposit_serial_id; + END IF; + IF (was_tready) + THEN + DELETE FROM deposits_for_matching + WHERE refund_deadline = OLD.refund_deadline + AND shard = OLD.shard + AND coin_pub = OLD.coin_pub + AND deposit_serial_id = OLD.deposit_serial_id; + END IF; + RETURN NEW; END $$; -COMMENT ON FUNCTION deposits_by_coin_delete_trigger() - IS 'Replicate deposits deletions into deposits_by_coin table.'; +COMMENT ON FUNCTION deposits_delete_trigger() + IS 'Replicate deposit deletions into materialized indices.'; CREATE TRIGGER deposits_on_delete AFTER DELETE - ON deposits - FOR EACH ROW EXECUTE FUNCTION deposits_by_coin_delete_trigger(); - + ON deposits + FOR EACH ROW EXECUTE FUNCTION deposits_delete_trigger(); CREATE TABLE IF NOT EXISTS refunds @@ -2011,7 +2116,7 @@ DECLARE BEGIN -- Shards: INSERT extension_details (by extension_details_serial_id) -- INSERT wire_targets (by h_payto), on CONFLICT DO NOTHING; --- INSERT deposits (by shard + merchant_pub + h_payto), ON CONFLICT DO NOTHING; +-- INSERT deposits (by coin_pub, shard), ON CONFLICT DO NOTHING; -- UPDATE known_coins (by coin_pub) IF NOT NULL in_extension_details @@ -2356,27 +2461,26 @@ DECLARE DECLARE deposit_frac INT8; -- amount that was originally deposited BEGIN --- Shards: SELECT deposits (by shard, coin_pub, h_contract_terms, merchant_pub) +-- Shards: SELECT deposits (coin_pub, shard, h_contract_terms, merchant_pub) -- INSERT refunds (by deposit_serial_id, rtransaction_id) ON CONFLICT DO NOTHING -- SELECT refunds (by deposit_serial_id) -- UPDATE known_coins (by coin_pub) SELECT - dep.deposit_serial_id - ,dep.amount_with_fee_val - ,dep.amount_with_fee_frac - ,dep.done + deposit_serial_id + ,amount_with_fee_val + ,amount_with_fee_frac + ,done INTO dsi ,deposit_val ,deposit_frac ,out_gone -FROM deposits_by_coin dbc - JOIN deposits dep USING (shard,deposit_serial_id) - WHERE dbc.coin_pub=in_coin_pub - AND dep.shard=in_deposit_shard - AND dep.merchant_pub=in_merchant_pub - AND dep.h_contract_terms=in_h_contract_terms; +FROM deposits + WHERE coin_pub=in_coin_pub + AND shard=in_deposit_shard + AND merchant_pub=in_merchant_pub + AND h_contract_terms=in_h_contract_terms; IF NOT FOUND THEN diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 120f475da..3cde97732 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -990,12 +990,11 @@ prepare_statements (struct PostgresClosure *pg) ",rtransaction_id " ",amount_with_fee_val " ",amount_with_fee_frac " - ") SELECT dbc.deposit_serial_id, $3, $5, $6, $7" - " FROM deposits_by_coin dbc" - " JOIN deposits dep USING (shard,deposit_serial_id)" - " WHERE dbc.coin_pub=$1" - " AND dep.h_contract_terms=$4" - " AND dep.merchant_pub=$2", + ") SELECT deposit_serial_id, $3, $5, $6, $7" + " FROM deposits" /* FIXME: check if adding additional AND on the 'shard' would help (possibly after reviewing indices on deposits!) */ + " WHERE coin_pub=$1" + " AND h_contract_terms=$4" + " AND merchant_pub=$2", 7), /* Query the 'refunds' by coin public key */ GNUNET_PQ_make_prepare ( @@ -1010,12 +1009,11 @@ prepare_statements (struct PostgresClosure *pg) ",denom.fee_refund_val " ",denom.fee_refund_frac " ",ref.refund_serial_id" - " FROM deposits_by_coin dbc" + " FROM deposits dep" " JOIN refunds ref USING (deposit_serial_id)" - " JOIN deposits dep ON (dbc.shard = dep.shard AND dbc.deposit_serial_id = dep.deposit_serial_id)" - " JOIN known_coins kc ON (dbc.coin_pub = kc.coin_pub)" + " JOIN known_coins kc ON (dep.coin_pub = kc.coin_pub)" " JOIN denominations denom USING (denominations_serial)" - " WHERE dbc.coin_pub=$1;", + " WHERE dep.coin_pub=$1;", 1), /* Query the 'refunds' by coin public key, merchant_pub and contract hash */ GNUNET_PQ_make_prepare ( @@ -1023,10 +1021,9 @@ prepare_statements (struct PostgresClosure *pg) "SELECT" " ref.amount_with_fee_val" ",ref.amount_with_fee_frac" - " FROM deposits_by_coin dbc" + " FROM deposits dep" " JOIN refunds ref USING (shard,deposit_serial_id)" - " JOIN deposits dep ON (dbc.shard = dep.shard AND dbc.deposit_serial_id = dep.deposit_serial_id)" - " WHERE dbc.coin_pub=$1" + " WHERE dep.coin_pub=$1" " AND dep.merchant_pub=$2" " AND dep.h_contract_terms=$3;", 3), @@ -1053,6 +1050,7 @@ prepare_statements (struct PostgresClosure *pg) /* Lock deposit table; NOTE: we may want to eventually shard the deposit table to avoid this lock being the main point of contention limiting transaction performance. */ + // FIXME: check if this query is even still used! GNUNET_PQ_make_prepare ( "lock_deposit", "LOCK TABLE deposits;", @@ -1098,12 +1096,11 @@ prepare_statements (struct PostgresClosure *pg) ",dep.h_contract_terms" ",dep.wire_salt" ",wt.payto_uri AS receiver_wire_account" - " FROM deposits_by_coin dbc" - " JOIN deposits dep USING (shard,deposit_serial_id)" - " JOIN known_coins kc ON (kc.coin_pub = dbc.coin_pub)" + " FROM deposits dep" + " JOIN known_coins kc ON (kc.coin_pub = dep.coin_pub)" " JOIN denominations USING (denominations_serial)" " JOIN wire_targets wt USING (wire_target_h_payto)" - " WHERE dbc.coin_pub=$1" + " WHERE dep.coin_pub=$1" " AND dep.merchant_pub=$3" " AND dep.h_contract_terms=$2;", 3), @@ -1150,12 +1147,11 @@ prepare_statements (struct PostgresClosure *pg) ",denom.fee_deposit_val" ",denom.fee_deposit_frac" ",dep.wire_deadline" - " FROM deposits_by_coin dbc" - " JOIN deposits dep USING (shard,deposit_serial_id)" + " FROM deposits dep" " JOIN wire_targets wt USING (wire_target_h_payto)" - " JOIN known_coins kc ON (kc.coin_pub = dbc.coin_pub)" + " JOIN known_coins kc ON (kc.coin_pub = dep.coin_pub)" " JOIN denominations denom USING (denominations_serial)" - " WHERE dbc.coin_pub=$1" + " WHERE dep.coin_pub=$1" " AND dep.merchant_pub=$3" " AND dep.h_contract_terms=$2;", 3), @@ -1163,7 +1159,7 @@ prepare_statements (struct PostgresClosure *pg) GNUNET_PQ_make_prepare ( "deposits_get_ready", "SELECT" - " deposit_serial_id" + " dep.deposit_serial_id" ",amount_with_fee_val" ",amount_with_fee_frac" ",denom.fee_deposit_val" @@ -1173,47 +1169,46 @@ prepare_statements (struct PostgresClosure *pg) ",wire_target_serial_id" ",merchant_pub" ",kc.coin_pub" - " FROM deposits" + " FROM deposits_by_ready dbr" + " JOIN deposits dep" + " ON (dbr.coin_pub = dep.coin_pub AND dbr.deposit_serial_id = dep.deposit_serial_id)" " JOIN wire_targets " " USING (wire_target_h_payto)" " JOIN known_coins kc" - " USING (coin_pub)" + " ON (kc.coin_pub = dep.coin_pub)" " JOIN denominations denom" " USING (denominations_serial)" - " WHERE " - " shard >= $2" - " AND shard <= $3" - " AND done=FALSE" - " AND extension_blocked=FALSE" - " AND tiny=FALSE" - " AND wire_deadline<=$1" + " WHERE dbr.wire_deadline<=$1" + " AND dbr.shard >= $2" + " AND dbr.shard <= $3" " AND (kyc_ok OR $4)" " ORDER BY " - " shard ASC" - " ,wire_deadline ASC" + " dbr.wire_deadline ASC" + " ,dbr.shard ASC" " LIMIT 1;", 4), /* Used in #postgres_iterate_matching_deposits() */ GNUNET_PQ_make_prepare ( "deposits_iterate_matching", "SELECT" - " deposit_serial_id" - ",amount_with_fee_val" - ",amount_with_fee_frac" + " dep.deposit_serial_id" + ",dep.amount_with_fee_val" + ",dep.amount_with_fee_frac" ",denom.fee_deposit_val" ",denom.fee_deposit_frac" - ",h_contract_terms" - ",kc.coin_pub" - " FROM deposits" - " JOIN known_coins kc USING (coin_pub)" - " JOIN denominations denom USING (denominations_serial)" - " WHERE shard=$4" - " AND merchant_pub=$1" - " AND wire_target_h_payto=$2" - " AND done=FALSE" - " AND extension_blocked=FALSE" - " AND refund_deadline<$3" - " ORDER BY refund_deadline ASC" + ",dep.h_contract_terms" + ",dfm.coin_pub" + " FROM deposits_for_matching dfm" + " JOIN deposits dep " + " ON (dep.coin_pub = dfm.coin_pub and dep.deposit_serial_id = dfm.deposit_serial_id)" + " JOIN known_coins kc" + " ON (dep.coin_pub = kc.coin_pub)" + " JOIN denominations denom" + " USING (denominations_serial)" + " WHERE dfm.refund_deadline<$3" + " AND dfm.shard=$4" + " AND dep.merchant_pub=$1" + " AND dep.wire_target_h_payto=$2" " LIMIT " TALER_QUOTE ( TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";", @@ -1223,16 +1218,16 @@ prepare_statements (struct PostgresClosure *pg) "mark_deposit_tiny", "UPDATE deposits" " SET tiny=TRUE" - " WHERE shard=$2" - " AND deposit_serial_id=$1", + " WHERE coin_pub=$1" + " AND deposit_serial_id=$2", 2), /* Used in #postgres_mark_deposit_done() */ GNUNET_PQ_make_prepare ( "mark_deposit_done", "UPDATE deposits" " SET done=TRUE" - " WHERE shard=$2" - " AND deposit_serial_id=$1;", + " WHERE coin_pub=$1" + " AND deposit_serial_id=$2;", 2), /* Used in #postgres_get_coin_transactions() to obtain information about how a coin has been spend with /deposit requests. */ @@ -1255,16 +1250,14 @@ prepare_statements (struct PostgresClosure *pg) ",dep.coin_sig" ",dep.deposit_serial_id" ",dep.done" - " FROM deposits_by_coin dbc" - " JOIN deposits dep" - " USING (shard,deposit_serial_id)" + " FROM deposits dep" " JOIN wire_targets wt" " USING (wire_target_h_payto)" " JOIN known_coins kc" - " ON (kc.coin_pub = dbc.coin_pub)" + " ON (kc.coin_pub = dep.coin_pub)" " JOIN denominations denoms" " USING (denominations_serial)" - " WHERE dbc.coin_pub=$1;", + " WHERE dep.coin_pub=$1;", 1), /* Used in #postgres_get_link_data(). */ @@ -1329,20 +1322,18 @@ prepare_statements (struct PostgresClosure *pg) ",wt.payto_uri" ",denom.fee_deposit_val" ",denom.fee_deposit_frac" - " FROM deposits_by_coin dbc" - " JOIN deposits dep" - " USING (shard,deposit_serial_id)" + " FROM deposits dep" " JOIN wire_targets wt" " USING (wire_target_h_payto)" " JOIN aggregation_tracking" " USING (deposit_serial_id)" " JOIN known_coins kc" - " ON (kc.coin_pub = dbc.coin_pub)" + " ON (kc.coin_pub = dep.coin_pub)" " JOIN denominations denom" " USING (denominations_serial)" " JOIN wire_out" " USING (wtid_raw)" - " WHERE dbc.coin_pub=$1" + " WHERE dep.coin_pub=$1" " AND dep.merchant_pub=$3" " AND dep.h_contract_terms=$2", 3), @@ -5898,14 +5889,13 @@ postgres_have_deposit2 ( */ static enum GNUNET_DB_QueryStatus postgres_mark_deposit_tiny (void *cls, - const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_CoinSpendPublicKeyP *coin_pub, uint64_t rowid) { struct PostgresClosure *pg = cls; - uint64_t deposit_shard = compute_shard (merchant_pub); struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (coin_pub), GNUNET_PQ_query_param_uint64 (&rowid), - GNUNET_PQ_query_param_uint64 (&deposit_shard), GNUNET_PQ_query_param_end }; @@ -5927,14 +5917,13 @@ postgres_mark_deposit_tiny (void *cls, */ static enum GNUNET_DB_QueryStatus postgres_mark_deposit_done (void *cls, - const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_CoinSpendPublicKeyP *coin_pub, uint64_t rowid) { struct PostgresClosure *pg = cls; - uint64_t deposit_shard = compute_shard (merchant_pub); struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (coin_pub), GNUNET_PQ_query_param_uint64 (&rowid), - GNUNET_PQ_query_param_uint64 (&deposit_shard), GNUNET_PQ_query_param_end }; @@ -6431,6 +6420,12 @@ postgres_insert_deposit (void *cls, GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return qs; } + if (GNUNET_TIME_timestamp_cmp (deposit->wire_deadline, + <, + deposit->refund_deadline)) + { + GNUNET_break (0); + } { uint64_t shard = compute_shard (&deposit->merchant_pub); struct GNUNET_PQ_QueryParam params[] = { diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index 012cac645..79b09e0e1 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -2284,7 +2284,7 @@ run (void *cls) "test-2")); FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != plugin->mark_deposit_tiny (plugin->cls, - &deposit.merchant_pub, + &deposit.coin.coin_pub, deposit_rowid)); FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != plugin->get_ready_deposit (plugin->cls, @@ -2306,7 +2306,7 @@ run (void *cls) "test-3")); FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != plugin->mark_deposit_done (plugin->cls, - &deposit.merchant_pub, + &deposit.coin.coin_pub, deposit_rowid)); FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != plugin->commit (plugin->cls)); diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index cee509542..2a462aba6 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -3060,13 +3060,13 @@ struct TALER_EXCHANGEDB_Plugin * returned by @e iterate_ready_deposits() * * @param cls the @e cls of this struct with the plugin-specific state - * @param merchant_pub identifies the beneficiary of the deposit + * @param coin_pub identifies the coin of the deposit * @param deposit_rowid identifies the deposit row to modify * @return query result status */ enum GNUNET_DB_QueryStatus (*mark_deposit_tiny)(void *cls, - const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_CoinSpendPublicKeyP *coin_pub, uint64_t rowid); @@ -3076,13 +3076,13 @@ struct TALER_EXCHANGEDB_Plugin * @e iterate_ready_deposits() or @e iterate_matching_deposits(). * * @param cls the @e cls of this struct with the plugin-specific state - * @param merchant_pub identifies the beneficiary of the deposit + * @param coin_pub identifies the coin of the deposit * @param deposit_rowid identifies the deposit row to modify * @return query result status */ enum GNUNET_DB_QueryStatus (*mark_deposit_done)(void *cls, - const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_CoinSpendPublicKeyP *coin_pub, uint64_t rowid); |