aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <grothoff@gnunet.org>2022-03-24 17:33:29 +0100
committerChristian Grothoff <grothoff@gnunet.org>2022-03-24 17:33:29 +0100
commitb856d56d95f92eb9dedb0af49493350ea8ea2268 (patch)
tree3490ebf1e069fbe858a3f6cf97b18da8289840ae
parentc782dfe2aadfd06e47ed354c1fb389fecc715433 (diff)
rework deposits sharding, towards making aggregator faster (not necessarily done)
-rw-r--r--src/exchange/taler-exchange-aggregator.c83
-rw-r--r--src/exchangedb/drop0001.sql8
-rw-r--r--src/exchangedb/exchange-0001.sql266
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c131
-rw-r--r--src/exchangedb/test_exchangedb.c4
-rw-r--r--src/include/taler_exchangedb_plugin.h8
6 files changed, 324 insertions, 176 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index abab347fe..c34d47f9c 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -28,6 +28,18 @@
#include "taler_json_lib.h"
#include "taler_bank_service.h"
+struct AdditionalDeposit
+{
+ /**
+ * Public key of the coin.
+ */
+ struct TALER_CoinSpendPublicKeyP coin_pub;
+
+ /**
+ * Row of the deposit.
+ */
+ uint64_t row;
+};
/**
* Information about one aggregation process to be executed. There is
@@ -43,6 +55,11 @@ struct AggregationUnit
struct TALER_MerchantPublicKeyP merchant_pub;
/**
+ * Public key of the coin.
+ */
+ struct TALER_CoinSpendPublicKeyP coin_pub;
+
+ /**
* Total amount to be transferred, before subtraction of @e fees.wire and rounding down.
*/
struct TALER_Amount total_amount;
@@ -97,7 +114,8 @@ struct AggregationUnit
/**
* Array of row_ids from the aggregation.
*/
- uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
+ struct AdditionalDeposit
+ additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
/**
* Offset specifying how many @e additional_rows are in use.
@@ -383,7 +401,8 @@ deposit_cb (void *cls,
enum GNUNET_DB_QueryStatus qs;
au->merchant_pub = *merchant_pub;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ au->coin_pub = *coin_pub;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Aggregator processing payment %s with amount %s\n",
TALER_B2S (coin_pub),
TALER_amount2s (amount_with_fee));
@@ -405,7 +424,7 @@ deposit_cb (void *cls,
{
struct TALER_Amount ntotal;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Non-refunded transaction, subtracting deposit fee %s\n",
TALER_amount2s (deposit_fee));
if (0 >
@@ -428,6 +447,9 @@ deposit_cb (void *cls,
au->total_amount = ntotal;
}
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Amount after fee is %s\n",
+ TALER_amount2s (&au->total_amount));
GNUNET_assert (NULL == au->payto_uri);
au->payto_uri = GNUNET_strdup (payto_uri);
@@ -437,7 +459,7 @@ deposit_cb (void *cls,
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
&au->wtid,
sizeof (au->wtid));
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n",
TALER_B2S (&au->wtid),
TALER_amount2s (amount_with_fee),
@@ -493,7 +515,7 @@ deposit_cb (void *cls,
"Aggregator marks deposit %llu as done\n",
(unsigned long long) row_id);
qs = db_plugin->mark_deposit_done (db_plugin->cls,
- merchant_pub,
+ coin_pub,
row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
@@ -528,6 +550,8 @@ aggregate_cb (void *cls,
struct TALER_Amount old;
enum GNUNET_DB_QueryStatus qs;
+ if (row_id == au->row_id)
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
{
/* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */
@@ -605,18 +629,29 @@ aggregate_cb (void *cls,
}
/* "append" to our list of rows */
- au->additional_rows[au->rows_offset++] = row_id;
+ au->additional_rows[au->rows_offset].coin_pub = *coin_pub;
+ au->additional_rows[au->rows_offset].row = row_id;
+ au->rows_offset++;
/* insert into aggregation tracking table */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Adding %llu to aggregate %s\n",
+ (unsigned long long) row_id,
+ TALER_B2S (&au->wtid));
qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
&au->wtid,
row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to add %llu to aggregate %s: %d\n",
+ (unsigned long long) row_id,
+ TALER_B2S (&au->wtid),
+ qs);
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return qs;
}
qs = db_plugin->mark_deposit_done (db_plugin->cls,
- &au->merchant_pub,
+ coin_pub,
row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
@@ -775,7 +810,7 @@ run_aggregation (void *cls)
}
/* Now try to find other deposits to aggregate */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Found ready deposit for %s, aggregating by target %llu\n",
TALER_B2S (&au_active.merchant_pub),
(unsigned long long) au_active.wire_target);
@@ -808,13 +843,17 @@ run_aggregation (void *cls)
s);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Found %d other deposits to combine into wire transfer.\n",
- qs);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found %d other deposits to combine into wire transfer with fee %s.\n",
+ qs,
+ TALER_amount2s (&au_active.fees.wire));
/* Subtract wire transfer fee and round to the unit supported by the
wire transfer method; Check if after rounding down, we still have
an amount to transfer, and if not mark as 'tiny'. */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Rounding aggregate of %s\n",
+ TALER_amount2s (&au_active.total_amount));
if ( (0 >=
TALER_amount_subtract (&au_active.final_amount,
&au_active.total_amount,
@@ -822,8 +861,7 @@ run_aggregation (void *cls)
(GNUNET_SYSERR ==
TALER_amount_round_down (&au_active.final_amount,
&currency_round_unit)) ||
- ( (0 == au_active.final_amount.value) &&
- (0 == au_active.final_amount.fraction) ) )
+ (TALER_amount_is_zero (&au_active.final_amount)) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Aggregate value too low for transfer (%d/%s)\n",
@@ -848,23 +886,29 @@ run_aggregation (void *cls)
return;
}
/* Mark transactions by row_id as minor */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Marking %s (%llu) as tiny\n",
+ TALER_B2S (&au_active.coin_pub),
+ (unsigned long long) au_active.row_id);
qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
- &au_active.merchant_pub,
+ &au_active.coin_pub,
au_active.row_id);
- if (0 <= qs)
+ if (0 < qs)
{
for (unsigned int i = 0; i<au_active.rows_offset; i++)
{
qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
- &au_active.merchant_pub,
- au_active.additional_rows[i]);
- if (0 > qs)
+ &au_active.additional_rows[i].
+ coin_pub,
+ au_active.additional_rows[i].row);
+ if (0 >= qs)
break;
}
}
+ GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs);
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Serialization issue, trying again later!\n");
db_plugin->rollback (db_plugin->cls);
cleanup_au (&au_active);
@@ -876,6 +920,7 @@ run_aggregation (void *cls)
}
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
+ GNUNET_break (0);
db_plugin->rollback (db_plugin->cls);
cleanup_au (&au_active);
global_ret = EXIT_FAILURE;
diff --git a/src/exchangedb/drop0001.sql b/src/exchangedb/drop0001.sql
index 60acc98a4..225c817a0 100644
--- a/src/exchangedb/drop0001.sql
+++ b/src/exchangedb/drop0001.sql
@@ -55,6 +55,8 @@ DROP TABLE IF EXISTS wire_targets CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_wire_targets_partition;
DROP TABLE IF EXISTS wire_fee CASCADE;
DROP TABLE IF EXISTS deposits CASCADE;
+DROP TABLE IF EXISTS deposits_by_ready CASCADE;
+DROP TABLE IF EXISTS deposits_for_matching CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_deposits_partition;
DROP TABLE IF EXISTS extension_details CASCADE;
DROP TABLE IF EXISTS refunds CASCADE;
@@ -88,6 +90,7 @@ DROP TABLE IF EXISTS recoup_by_reserve CASCADE;
DROP TABLE IF EXISTS partners CASCADE;
DROP TABLE IF EXISTS account_merges CASCADE;
DROP TABLE IF EXISTS purse_merges CASCADE;
+DROP TABLE IF EXISTS purse_deposits CASCADE;
DROP TABLE IF EXISTS contracts CASCADE;
DROP TABLE IF EXISTS history_requests CASCADE;
DROP TABLE IF EXISTS close_requests CASCADE;
@@ -103,8 +106,9 @@ DROP FUNCTION IF EXISTS exchange_do_withdraw;
DROP FUNCTION IF EXISTS exchange_do_withdraw_limit_check;
DROP FUNCTION IF EXISTS recoup_insert_trigger;
DROP FUNCTION IF EXISTS recoup_delete_trigger;
-DROP FUNCTION IF EXISTS deposits_by_coin_insert_trigger;
-DROP FUNCTION IF EXISTS deposits_by_coin_delete_trigger;
+DROP FUNCTION IF EXISTS deposits_insert_trigger;
+DROP FUNCTION IF EXISTS deposits_update_trigger;
+DROP FUNCTION IF EXISTS deposits_delete_trigger;
DROP FUNCTION IF EXISTS reserves_out_by_reserve_insert_trigger;
DROP FUNCTION IF EXISTS reserves_out_by_reserve_delete_trigger;
DROP FUNCTION IF EXISTS exchange_do_deposit;
diff --git a/src/exchangedb/exchange-0001.sql b/src/exchangedb/exchange-0001.sql
index e723a3677..568779f97 100644
--- a/src/exchangedb/exchange-0001.sql
+++ b/src/exchangedb/exchange-0001.sql
@@ -610,7 +610,7 @@ CREATE TABLE IF NOT EXISTS deposits
(deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- PRIMARY KEY
,shard INT8 NOT NULL
,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE
- ,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE
+ ,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE --- FIXME: column needed???
,amount_with_fee_val INT8 NOT NULL
,amount_with_fee_frac INT4 NOT NULL
,wallet_timestamp INT8 NOT NULL
@@ -626,22 +626,11 @@ CREATE TABLE IF NOT EXISTS deposits
,done BOOLEAN NOT NULL DEFAULT FALSE
,extension_blocked BOOLEAN NOT NULL DEFAULT FALSE
,extension_details_serial_id INT8 REFERENCES extension_details (extension_details_serial_id) ON DELETE CASCADE
- ,UNIQUE (shard, coin_pub, merchant_pub, h_contract_terms)
+ ,UNIQUE (coin_pub, merchant_pub, h_contract_terms)
)
- PARTITION BY HASH (shard); -- FIXME: why not BY RANGE? RANGE would seem better for 'deposits_get_ready'!
+ PARTITION BY HASH (coin_pub);
-- FIXME:
--- new idea: partition deposits by coin_pub (remove deposits_by_coin)
--- define 'ready' == ! (tiny || done || blocked)
--- add new deposits_by_ready (on shard + wire_deadline), select by shard, then ready + deadline
--- -- use triggers to ONLY include 'ready' deposits (delete on update)!
--- -- use multi-level partitions: Hash(shard) + Range(wire_deadline/sec)
--- add new deposits_by_match (on shard + refund_deadline)
--- -- use triggers to ONLY include 'ready' deposits (delete on update)!
--- -- use multi-level partitions: Hash(shard) + Range(refund_deadline/sec)
--- => first we select per-merchant shard, basically stay on the same system as other ops for the same merchant
--- => second we select by deadline, use enough values so that _usually_ the aggregator
--- and the 'insert' process _can_ work on different shards!
--- => the latter could be achieved by dynamically (!) creating/deleting partitions:
+-- TODO: dynamically (!) creating/deleting partitions:
-- create new partitions 'as needed', drop old ones once the aggregator has made
-- them empty; as 'new' deposits will always have deadlines in the future, this
-- would basically guarantee no conflict between aggregator and exchange service!
@@ -683,31 +672,15 @@ COMMENT ON COLUMN deposits.extension_details_serial_id
COMMENT ON COLUMN deposits.tiny
IS 'Set to TRUE if we decided that the amount is too small to ever trigger a wire transfer by itself (requires real aggregation)';
+-- FIXME: we sometimes go ONLY by 'deposit_serial_id',
+-- check if queries could be improved by adding shard or adding another index without shard here, or inverting the order of the index here!
CREATE INDEX IF NOT EXISTS deposits_deposit_by_serial_id_index
ON deposits
(shard,deposit_serial_id);
-CREATE INDEX IF NOT EXISTS deposits_for_get_ready_index
- ON deposits
- (shard ASC
- ,done
- ,extension_blocked
- ,tiny
- ,wire_deadline ASC
- );
-COMMENT ON INDEX deposits_for_get_ready_index
- IS 'for deposits_get_ready';
-CREATE INDEX IF NOT EXISTS deposits_for_iterate_matching_index
+CREATE INDEX IF NOT EXISTS deposits_by_coin_pub_index
ON deposits
- (shard
- ,merchant_pub
- ,wire_target_h_payto
- ,done
- ,extension_blocked
- ,refund_deadline ASC
- );
-COMMENT ON INDEX deposits_for_iterate_matching_index
- IS 'for deposits_iterate_matching';
+ (coin_pub);
CREATE TABLE IF NOT EXISTS deposits_default
@@ -732,66 +705,198 @@ $$;
SELECT add_constraints_to_deposits_partition('default');
+CREATE TABLE IF NOT EXISTS deposits_by_ready
+ (wire_deadline INT8 NOT NULL
+ ,shard INT8 NOT NULL
+ ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)
+ ,deposit_serial_id INT8
+ )
+ PARTITION BY RANGE (wire_deadline);
+COMMENT ON TABLE deposits_by_ready
+ IS 'Enables fast lookups for deposits_get_ready, auto-populated via TRIGGER below';
+
+CREATE INDEX IF NOT EXISTS deposits_by_ready_main_index
+ ON deposits_by_ready
+ (wire_deadline ASC, shard ASC, coin_pub);
+
+CREATE TABLE IF NOT EXISTS deposits_by_ready_default
+ PARTITION OF deposits_by_ready
+ DEFAULT;
-CREATE TABLE IF NOT EXISTS deposits_by_coin
- (deposit_serial_id BIGINT
+
+CREATE TABLE IF NOT EXISTS deposits_for_matching
+ (refund_deadline INT8 NOT NULL
,shard INT8 NOT NULL
,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)
+ ,deposit_serial_id INT8
)
- PARTITION BY HASH (coin_pub);
-COMMENT ON TABLE deposits_by_coin
- IS 'Enables fast lookups of deposit by coin_pub, auto-populated via TRIGGER below';
+ PARTITION BY RANGE (refund_deadline);
+COMMENT ON TABLE deposits_for_matching
+ IS 'Enables fast lookups for deposits_iterate_matching, auto-populated via TRIGGER below';
-CREATE INDEX IF NOT EXISTS deposits_by_coin_main_index
- ON deposits_by_coin
- (coin_pub);
+CREATE INDEX IF NOT EXISTS deposits_for_matching_main_index
+ ON deposits_for_matching
+ (refund_deadline ASC, shard, coin_pub);
-CREATE TABLE IF NOT EXISTS deposits_by_coin_default
- PARTITION OF deposits_by_coin
- FOR VALUES WITH (MODULUS 1, REMAINDER 0);
+CREATE TABLE IF NOT EXISTS deposits_for_matching_default
+ PARTITION OF deposits_for_matching
+ DEFAULT;
+
-CREATE OR REPLACE FUNCTION deposits_by_coin_insert_trigger()
+CREATE OR REPLACE FUNCTION deposits_insert_trigger()
RETURNS trigger
LANGUAGE plpgsql
AS $$
+DECLARE
+ is_ready BOOLEAN;
+DECLARE
+ is_tready BOOLEAN; -- is ready, but may be tiny
BEGIN
- INSERT INTO deposits_by_coin
- (deposit_serial_id
- ,shard
- ,coin_pub)
- VALUES
- (NEW.deposit_serial_id
- ,NEW.shard
- ,NEW.coin_pub);
+ is_ready = NOT (NEW.done OR NEW.tiny OR NEW.extension_blocked);
+ is_tready = NOT (NEW.done OR NEW.extension_blocked);
+
+ IF (is_ready)
+ THEN
+ INSERT INTO deposits_by_ready
+ (wire_deadline
+ ,shard
+ ,coin_pub
+ ,deposit_serial_id)
+ VALUES
+ (NEW.wire_deadline
+ ,NEW.shard
+ ,NEW.coin_pub
+ ,NEW.deposit_serial_id);
+ END IF;
+ IF (is_tready)
+ THEN
+ INSERT INTO deposits_for_matching
+ (refund_deadline
+ ,shard
+ ,coin_pub
+ ,deposit_serial_id)
+ VALUES
+ (NEW.refund_deadline
+ ,NEW.shard
+ ,NEW.coin_pub
+ ,NEW.deposit_serial_id);
+ END IF;
RETURN NEW;
END $$;
-COMMENT ON FUNCTION deposits_by_coin_insert_trigger()
- IS 'Replicate deposit inserts into deposits_by_coin table.';
+COMMENT ON FUNCTION deposits_insert_trigger()
+ IS 'Replicate deposit inserts into materialized indices.';
CREATE TRIGGER deposits_on_insert
AFTER INSERT
ON deposits
- FOR EACH ROW EXECUTE FUNCTION deposits_by_coin_insert_trigger();
+ FOR EACH ROW EXECUTE FUNCTION deposits_insert_trigger();
-CREATE OR REPLACE FUNCTION deposits_by_coin_delete_trigger()
+CREATE OR REPLACE FUNCTION deposits_update_trigger()
RETURNS trigger
LANGUAGE plpgsql
AS $$
+DECLARE
+ was_ready BOOLEAN;
+DECLARE
+ is_ready BOOLEAN;
+DECLARE
+ was_tready BOOLEAN; -- was ready, but may be tiny
+DECLARE
+ is_tready BOOLEAN; -- is ready, but may be tiny
BEGIN
- DELETE FROM deposits_by_coin
- WHERE coin_pub = OLD.coin_pub
- AND shard = OLD.shard
- AND deposit_serial_id = OLD.deposit_serial_id;
- RETURN OLD;
+ was_ready = NOT (OLD.done OR OLD.tiny OR OLD.extension_blocked);
+ is_ready = NOT (NEW.done OR NEW.tiny OR NEW.extension_blocked);
+ was_tready = NOT (OLD.done OR OLD.extension_blocked);
+ is_tready = NOT (NEW.done OR NEW.extension_blocked);
+ IF (was_ready AND NOT is_ready)
+ THEN
+ DELETE FROM deposits_by_ready
+ WHERE wire_deadline = OLD.wire_deadline
+ AND shard = OLD.shard
+ AND coin_pub = OLD.coin_pub
+ AND deposit_serial_id = OLD.deposit_serial_id;
+ END IF;
+ IF (was_tready AND NOT is_tready)
+ THEN
+ DELETE FROM deposits_for_matching
+ WHERE refund_deadline = OLD.refund_deadline
+ AND shard = OLD.shard
+ AND coin_pub = OLD.coin_pub
+ AND deposit_serial_id = OLD.deposit_serial_id;
+ END IF;
+ IF (is_ready AND NOT was_ready)
+ THEN
+ INSERT INTO deposits_by_ready
+ (wire_deadline
+ ,shard
+ ,coin_pub
+ ,deposit_serial_id)
+ VALUES
+ (NEW.wire_deadline
+ ,NEW.shard
+ ,NEW.coin_pub
+ ,NEW.deposit_serial_id);
+ END IF;
+ IF (is_tready AND NOT was_tready)
+ THEN
+ INSERT INTO deposits_for_matching
+ (refund_deadline
+ ,shard
+ ,coin_pub
+ ,deposit_serial_id)
+ VALUES
+ (NEW.refund_deadline
+ ,NEW.shard
+ ,NEW.coin_pub
+ ,NEW.deposit_serial_id);
+ END IF;
+ RETURN NEW;
+END $$;
+COMMENT ON FUNCTION deposits_update_trigger()
+ IS 'Replicate deposits changes into materialized indices.';
+
+CREATE TRIGGER deposits_on_update
+ AFTER UPDATE
+ ON deposits
+ FOR EACH ROW EXECUTE FUNCTION deposits_update_trigger();
+
+CREATE OR REPLACE FUNCTION deposits_delete_trigger()
+ RETURNS trigger
+ LANGUAGE plpgsql
+ AS $$
+DECLARE
+ was_ready BOOLEAN;
+DECLARE
+ was_tready BOOLEAN; -- is ready, but may be tiny
+BEGIN
+ was_ready = NOT (OLD.done OR OLD.tiny OR OLD.extension_blocked);
+ was_tready = NOT (OLD.done OR OLD.extension_blocked);
+
+ IF (was_ready)
+ THEN
+ DELETE FROM deposits_by_ready
+ WHERE wire_deadline = OLD.wire_deadline
+ AND shard = OLD.shard
+ AND coin_pub = OLD.coin_pub
+ AND deposit_serial_id = OLD.deposit_serial_id;
+ END IF;
+ IF (was_tready)
+ THEN
+ DELETE FROM deposits_for_matching
+ WHERE refund_deadline = OLD.refund_deadline
+ AND shard = OLD.shard
+ AND coin_pub = OLD.coin_pub
+ AND deposit_serial_id = OLD.deposit_serial_id;
+ END IF;
+ RETURN NEW;
END $$;
-COMMENT ON FUNCTION deposits_by_coin_delete_trigger()
- IS 'Replicate deposits deletions into deposits_by_coin table.';
+COMMENT ON FUNCTION deposits_delete_trigger()
+ IS 'Replicate deposit deletions into materialized indices.';
CREATE TRIGGER deposits_on_delete
AFTER DELETE
- ON deposits
- FOR EACH ROW EXECUTE FUNCTION deposits_by_coin_delete_trigger();
-
+ ON deposits
+ FOR EACH ROW EXECUTE FUNCTION deposits_delete_trigger();
CREATE TABLE IF NOT EXISTS refunds
@@ -2011,7 +2116,7 @@ DECLARE
BEGIN
-- Shards: INSERT extension_details (by extension_details_serial_id)
-- INSERT wire_targets (by h_payto), on CONFLICT DO NOTHING;
--- INSERT deposits (by shard + merchant_pub + h_payto), ON CONFLICT DO NOTHING;
+-- INSERT deposits (by coin_pub, shard), ON CONFLICT DO NOTHING;
-- UPDATE known_coins (by coin_pub)
IF NOT NULL in_extension_details
@@ -2356,27 +2461,26 @@ DECLARE
DECLARE
deposit_frac INT8; -- amount that was originally deposited
BEGIN
--- Shards: SELECT deposits (by shard, coin_pub, h_contract_terms, merchant_pub)
+-- Shards: SELECT deposits (coin_pub, shard, h_contract_terms, merchant_pub)
-- INSERT refunds (by deposit_serial_id, rtransaction_id) ON CONFLICT DO NOTHING
-- SELECT refunds (by deposit_serial_id)
-- UPDATE known_coins (by coin_pub)
SELECT
- dep.deposit_serial_id
- ,dep.amount_with_fee_val
- ,dep.amount_with_fee_frac
- ,dep.done
+ deposit_serial_id
+ ,amount_with_fee_val
+ ,amount_with_fee_frac
+ ,done
INTO
dsi
,deposit_val
,deposit_frac
,out_gone
-FROM deposits_by_coin dbc
- JOIN deposits dep USING (shard,deposit_serial_id)
- WHERE dbc.coin_pub=in_coin_pub
- AND dep.shard=in_deposit_shard
- AND dep.merchant_pub=in_merchant_pub
- AND dep.h_contract_terms=in_h_contract_terms;
+FROM deposits
+ WHERE coin_pub=in_coin_pub
+ AND shard=in_deposit_shard
+ AND merchant_pub=in_merchant_pub
+ AND h_contract_terms=in_h_contract_terms;
IF NOT FOUND
THEN
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 120f475da..3cde97732 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -990,12 +990,11 @@ prepare_statements (struct PostgresClosure *pg)
",rtransaction_id "
",amount_with_fee_val "
",amount_with_fee_frac "
- ") SELECT dbc.deposit_serial_id, $3, $5, $6, $7"
- " FROM deposits_by_coin dbc"
- " JOIN deposits dep USING (shard,deposit_serial_id)"
- " WHERE dbc.coin_pub=$1"
- " AND dep.h_contract_terms=$4"
- " AND dep.merchant_pub=$2",
+ ") SELECT deposit_serial_id, $3, $5, $6, $7"
+ " FROM deposits" /* FIXME: check if adding additional AND on the 'shard' would help (possibly after reviewing indices on deposits!) */
+ " WHERE coin_pub=$1"
+ " AND h_contract_terms=$4"
+ " AND merchant_pub=$2",
7),
/* Query the 'refunds' by coin public key */
GNUNET_PQ_make_prepare (
@@ -1010,12 +1009,11 @@ prepare_statements (struct PostgresClosure *pg)
",denom.fee_refund_val "
",denom.fee_refund_frac "
",ref.refund_serial_id"
- " FROM deposits_by_coin dbc"
+ " FROM deposits dep"
" JOIN refunds ref USING (deposit_serial_id)"
- " JOIN deposits dep ON (dbc.shard = dep.shard AND dbc.deposit_serial_id = dep.deposit_serial_id)"
- " JOIN known_coins kc ON (dbc.coin_pub = kc.coin_pub)"
+ " JOIN known_coins kc ON (dep.coin_pub = kc.coin_pub)"
" JOIN denominations denom USING (denominations_serial)"
- " WHERE dbc.coin_pub=$1;",
+ " WHERE dep.coin_pub=$1;",
1),
/* Query the 'refunds' by coin public key, merchant_pub and contract hash */
GNUNET_PQ_make_prepare (
@@ -1023,10 +1021,9 @@ prepare_statements (struct PostgresClosure *pg)
"SELECT"
" ref.amount_with_fee_val"
",ref.amount_with_fee_frac"
- " FROM deposits_by_coin dbc"
+ " FROM deposits dep"
" JOIN refunds ref USING (shard,deposit_serial_id)"
- " JOIN deposits dep ON (dbc.shard = dep.shard AND dbc.deposit_serial_id = dep.deposit_serial_id)"
- " WHERE dbc.coin_pub=$1"
+ " WHERE dep.coin_pub=$1"
" AND dep.merchant_pub=$2"
" AND dep.h_contract_terms=$3;",
3),
@@ -1053,6 +1050,7 @@ prepare_statements (struct PostgresClosure *pg)
/* Lock deposit table; NOTE: we may want to eventually shard the
deposit table to avoid this lock being the main point of
contention limiting transaction performance. */
+ // FIXME: check if this query is even still used!
GNUNET_PQ_make_prepare (
"lock_deposit",
"LOCK TABLE deposits;",
@@ -1098,12 +1096,11 @@ prepare_statements (struct PostgresClosure *pg)
",dep.h_contract_terms"
",dep.wire_salt"
",wt.payto_uri AS receiver_wire_account"
- " FROM deposits_by_coin dbc"
- " JOIN deposits dep USING (shard,deposit_serial_id)"
- " JOIN known_coins kc ON (kc.coin_pub = dbc.coin_pub)"
+ " FROM deposits dep"
+ " JOIN known_coins kc ON (kc.coin_pub = dep.coin_pub)"
" JOIN denominations USING (denominations_serial)"
" JOIN wire_targets wt USING (wire_target_h_payto)"
- " WHERE dbc.coin_pub=$1"
+ " WHERE dep.coin_pub=$1"
" AND dep.merchant_pub=$3"
" AND dep.h_contract_terms=$2;",
3),
@@ -1150,12 +1147,11 @@ prepare_statements (struct PostgresClosure *pg)
",denom.fee_deposit_val"
",denom.fee_deposit_frac"
",dep.wire_deadline"
- " FROM deposits_by_coin dbc"
- " JOIN deposits dep USING (shard,deposit_serial_id)"
+ " FROM deposits dep"
" JOIN wire_targets wt USING (wire_target_h_payto)"
- " JOIN known_coins kc ON (kc.coin_pub = dbc.coin_pub)"
+ " JOIN known_coins kc ON (kc.coin_pub = dep.coin_pub)"
" JOIN denominations denom USING (denominations_serial)"
- " WHERE dbc.coin_pub=$1"
+ " WHERE dep.coin_pub=$1"
" AND dep.merchant_pub=$3"
" AND dep.h_contract_terms=$2;",
3),
@@ -1163,7 +1159,7 @@ prepare_statements (struct PostgresClosure *pg)
GNUNET_PQ_make_prepare (
"deposits_get_ready",
"SELECT"
- " deposit_serial_id"
+ " dep.deposit_serial_id"
",amount_with_fee_val"
",amount_with_fee_frac"
",denom.fee_deposit_val"
@@ -1173,47 +1169,46 @@ prepare_statements (struct PostgresClosure *pg)
",wire_target_serial_id"
",merchant_pub"
",kc.coin_pub"
- " FROM deposits"
+ " FROM deposits_by_ready dbr"
+ " JOIN deposits dep"
+ " ON (dbr.coin_pub = dep.coin_pub AND dbr.deposit_serial_id = dep.deposit_serial_id)"
" JOIN wire_targets "
" USING (wire_target_h_payto)"
" JOIN known_coins kc"
- " USING (coin_pub)"
+ " ON (kc.coin_pub = dep.coin_pub)"
" JOIN denominations denom"
" USING (denominations_serial)"
- " WHERE "
- " shard >= $2"
- " AND shard <= $3"
- " AND done=FALSE"
- " AND extension_blocked=FALSE"
- " AND tiny=FALSE"
- " AND wire_deadline<=$1"
+ " WHERE dbr.wire_deadline<=$1"
+ " AND dbr.shard >= $2"
+ " AND dbr.shard <= $3"
" AND (kyc_ok OR $4)"
" ORDER BY "
- " shard ASC"
- " ,wire_deadline ASC"
+ " dbr.wire_deadline ASC"
+ " ,dbr.shard ASC"
" LIMIT 1;",
4),
/* Used in #postgres_iterate_matching_deposits() */
GNUNET_PQ_make_prepare (
"deposits_iterate_matching",
"SELECT"
- " deposit_serial_id"
- ",amount_with_fee_val"
- ",amount_with_fee_frac"
+ " dep.deposit_serial_id"
+ ",dep.amount_with_fee_val"
+ ",dep.amount_with_fee_frac"
",denom.fee_deposit_val"
",denom.fee_deposit_frac"
- ",h_contract_terms"
- ",kc.coin_pub"
- " FROM deposits"
- " JOIN known_coins kc USING (coin_pub)"
- " JOIN denominations denom USING (denominations_serial)"
- " WHERE shard=$4"
- " AND merchant_pub=$1"
- " AND wire_target_h_payto=$2"
- " AND done=FALSE"
- " AND extension_blocked=FALSE"
- " AND refund_deadline<$3"
- " ORDER BY refund_deadline ASC"
+ ",dep.h_contract_terms"
+ ",dfm.coin_pub"
+ " FROM deposits_for_matching dfm"
+ " JOIN deposits dep "
+ " ON (dep.coin_pub = dfm.coin_pub and dep.deposit_serial_id = dfm.deposit_serial_id)"
+ " JOIN known_coins kc"
+ " ON (dep.coin_pub = kc.coin_pub)"
+ " JOIN denominations denom"
+ " USING (denominations_serial)"
+ " WHERE dfm.refund_deadline<$3"
+ " AND dfm.shard=$4"
+ " AND dep.merchant_pub=$1"
+ " AND dep.wire_target_h_payto=$2"
" LIMIT "
TALER_QUOTE (
TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";",
@@ -1223,16 +1218,16 @@ prepare_statements (struct PostgresClosure *pg)
"mark_deposit_tiny",
"UPDATE deposits"
" SET tiny=TRUE"
- " WHERE shard=$2"
- " AND deposit_serial_id=$1",
+ " WHERE coin_pub=$1"
+ " AND deposit_serial_id=$2",
2),
/* Used in #postgres_mark_deposit_done() */
GNUNET_PQ_make_prepare (
"mark_deposit_done",
"UPDATE deposits"
" SET done=TRUE"
- " WHERE shard=$2"
- " AND deposit_serial_id=$1;",
+ " WHERE coin_pub=$1"
+ " AND deposit_serial_id=$2;",
2),
/* Used in #postgres_get_coin_transactions() to obtain information
about how a coin has been spend with /deposit requests. */
@@ -1255,16 +1250,14 @@ prepare_statements (struct PostgresClosure *pg)
",dep.coin_sig"
",dep.deposit_serial_id"
",dep.done"
- " FROM deposits_by_coin dbc"
- " JOIN deposits dep"
- " USING (shard,deposit_serial_id)"
+ " FROM deposits dep"
" JOIN wire_targets wt"
" USING (wire_target_h_payto)"
" JOIN known_coins kc"
- " ON (kc.coin_pub = dbc.coin_pub)"
+ " ON (kc.coin_pub = dep.coin_pub)"
" JOIN denominations denoms"
" USING (denominations_serial)"
- " WHERE dbc.coin_pub=$1;",
+ " WHERE dep.coin_pub=$1;",
1),
/* Used in #postgres_get_link_data(). */
@@ -1329,20 +1322,18 @@ prepare_statements (struct PostgresClosure *pg)
",wt.payto_uri"
",denom.fee_deposit_val"
",denom.fee_deposit_frac"
- " FROM deposits_by_coin dbc"
- " JOIN deposits dep"
- " USING (shard,deposit_serial_id)"
+ " FROM deposits dep"
" JOIN wire_targets wt"
" USING (wire_target_h_payto)"
" JOIN aggregation_tracking"
" USING (deposit_serial_id)"
" JOIN known_coins kc"
- " ON (kc.coin_pub = dbc.coin_pub)"
+ " ON (kc.coin_pub = dep.coin_pub)"
" JOIN denominations denom"
" USING (denominations_serial)"
" JOIN wire_out"
" USING (wtid_raw)"
- " WHERE dbc.coin_pub=$1"
+ " WHERE dep.coin_pub=$1"
" AND dep.merchant_pub=$3"
" AND dep.h_contract_terms=$2",
3),
@@ -5898,14 +5889,13 @@ postgres_have_deposit2 (
*/
static enum GNUNET_DB_QueryStatus
postgres_mark_deposit_tiny (void *cls,
- const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_CoinSpendPublicKeyP *coin_pub,
uint64_t rowid)
{
struct PostgresClosure *pg = cls;
- uint64_t deposit_shard = compute_shard (merchant_pub);
struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (coin_pub),
GNUNET_PQ_query_param_uint64 (&rowid),
- GNUNET_PQ_query_param_uint64 (&deposit_shard),
GNUNET_PQ_query_param_end
};
@@ -5927,14 +5917,13 @@ postgres_mark_deposit_tiny (void *cls,
*/
static enum GNUNET_DB_QueryStatus
postgres_mark_deposit_done (void *cls,
- const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_CoinSpendPublicKeyP *coin_pub,
uint64_t rowid)
{
struct PostgresClosure *pg = cls;
- uint64_t deposit_shard = compute_shard (merchant_pub);
struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (coin_pub),
GNUNET_PQ_query_param_uint64 (&rowid),
- GNUNET_PQ_query_param_uint64 (&deposit_shard),
GNUNET_PQ_query_param_end
};
@@ -6431,6 +6420,12 @@ postgres_insert_deposit (void *cls,
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return qs;
}
+ if (GNUNET_TIME_timestamp_cmp (deposit->wire_deadline,
+ <,
+ deposit->refund_deadline))
+ {
+ GNUNET_break (0);
+ }
{
uint64_t shard = compute_shard (&deposit->merchant_pub);
struct GNUNET_PQ_QueryParam params[] = {
diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c
index 012cac645..79b09e0e1 100644
--- a/src/exchangedb/test_exchangedb.c
+++ b/src/exchangedb/test_exchangedb.c
@@ -2284,7 +2284,7 @@ run (void *cls)
"test-2"));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->mark_deposit_tiny (plugin->cls,
- &deposit.merchant_pub,
+ &deposit.coin.coin_pub,
deposit_rowid));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->get_ready_deposit (plugin->cls,
@@ -2306,7 +2306,7 @@ run (void *cls)
"test-3"));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->mark_deposit_done (plugin->cls,
- &deposit.merchant_pub,
+ &deposit.coin.coin_pub,
deposit_rowid));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->commit (plugin->cls));
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index cee509542..2a462aba6 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -3060,13 +3060,13 @@ struct TALER_EXCHANGEDB_Plugin
* returned by @e iterate_ready_deposits()
*
* @param cls the @e cls of this struct with the plugin-specific state
- * @param merchant_pub identifies the beneficiary of the deposit
+ * @param coin_pub identifies the coin of the deposit
* @param deposit_rowid identifies the deposit row to modify
* @return query result status
*/
enum GNUNET_DB_QueryStatus
(*mark_deposit_tiny)(void *cls,
- const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_CoinSpendPublicKeyP *coin_pub,
uint64_t rowid);
@@ -3076,13 +3076,13 @@ struct TALER_EXCHANGEDB_Plugin
* @e iterate_ready_deposits() or @e iterate_matching_deposits().
*
* @param cls the @e cls of this struct with the plugin-specific state
- * @param merchant_pub identifies the beneficiary of the deposit
+ * @param coin_pub identifies the coin of the deposit
* @param deposit_rowid identifies the deposit row to modify
* @return query result status
*/
enum GNUNET_DB_QueryStatus
(*mark_deposit_done)(void *cls,
- const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_CoinSpendPublicKeyP *coin_pub,
uint64_t rowid);