aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <grothoff@gnunet.org>2022-03-27 10:32:28 +0200
committerChristian Grothoff <grothoff@gnunet.org>2022-03-27 10:32:28 +0200
commitd0a69da8954fd72f361795c2e007bad3fe5accd1 (patch)
tree821cc25e6f614ecfcd97d77bbc75ee24293887a7
parent646c9ad0611e5320a460206dec4fdfd516dc8f64 (diff)
towards removing tiny bit
-rw-r--r--src/exchangedb/drop0001.sql2
-rw-r--r--src/exchangedb/exchange-0001.sql69
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c360
-rw-r--r--src/exchangedb/test_exchangedb.c166
-rw-r--r--src/include/taler_exchangedb_plugin.h92
5 files changed, 582 insertions, 107 deletions
diff --git a/src/exchangedb/drop0001.sql b/src/exchangedb/drop0001.sql
index 225c817a0..3f43a5693 100644
--- a/src/exchangedb/drop0001.sql
+++ b/src/exchangedb/drop0001.sql
@@ -82,9 +82,9 @@ DROP TABLE IF EXISTS denominations CASCADE;
DROP TABLE IF EXISTS cs_nonce_locks CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_cs_nonce_locks_partition;
-DROP TABLE IF EXISTS deposits_by_coin CASCADE;
DROP TABLE IF EXISTS global_fee CASCADE;
DROP TABLE IF EXISTS recoup_by_reserve CASCADE;
+DROP TABLE IF EXISTS aggregation_transient CASCADE;
DROP TABLE IF EXISTS partners CASCADE;
diff --git a/src/exchangedb/exchange-0001.sql b/src/exchangedb/exchange-0001.sql
index b2fb52ac2..e6902ed1c 100644
--- a/src/exchangedb/exchange-0001.sql
+++ b/src/exchangedb/exchange-0001.sql
@@ -772,7 +772,7 @@ CREATE TABLE IF NOT EXISTS deposits_by_ready_default
CREATE TABLE IF NOT EXISTS deposits_for_matching
(refund_deadline INT8 NOT NULL
- ,shard INT8 NOT NULL
+ ,merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)
,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins (coin_pub) ON DELETE CASCADE
,deposit_serial_id INT8
)
@@ -782,7 +782,7 @@ COMMENT ON TABLE deposits_for_matching
CREATE INDEX IF NOT EXISTS deposits_for_matching_main_index
ON deposits_for_matching
- (refund_deadline ASC, shard, coin_pub);
+ (refund_deadline ASC, merchant_pub, coin_pub);
CREATE TABLE IF NOT EXISTS deposits_for_matching_default
PARTITION OF deposits_for_matching
@@ -818,12 +818,12 @@ BEGIN
THEN
INSERT INTO deposits_for_matching
(refund_deadline
- ,shard
+ ,merchant_pub
,coin_pub
,deposit_serial_id)
VALUES
(NEW.refund_deadline
- ,NEW.shard
+ ,NEW.merchant_pub
,NEW.coin_pub
,NEW.deposit_serial_id);
END IF;
@@ -866,7 +866,7 @@ BEGIN
THEN
DELETE FROM deposits_for_matching
WHERE refund_deadline = OLD.refund_deadline
- AND shard = OLD.shard
+ AND merchant_pub = OLD.merchant_pub
AND coin_pub = OLD.coin_pub
AND deposit_serial_id = OLD.deposit_serial_id;
END IF;
@@ -887,12 +887,12 @@ BEGIN
THEN
INSERT INTO deposits_for_matching
(refund_deadline
- ,shard
+ ,merchant_pub
,coin_pub
,deposit_serial_id)
VALUES
(NEW.refund_deadline
- ,NEW.shard
+ ,NEW.merchant_pub
,NEW.coin_pub
,NEW.deposit_serial_id);
END IF;
@@ -930,7 +930,7 @@ BEGIN
THEN
DELETE FROM deposits_for_matching
WHERE refund_deadline = OLD.refund_deadline
- AND shard = OLD.shard
+ AND merchant_pub = OLD.merchant_pub
AND coin_pub = OLD.coin_pub
AND deposit_serial_id = OLD.deposit_serial_id;
END IF;
@@ -1040,21 +1040,64 @@ $$;
SELECT add_constraints_to_wire_out_partition('default');
+CREATE OR REPLACE FUNCTION wire_out_delete_trigger()
+ RETURNS trigger
+ LANGUAGE plpgsql
+ AS $$
+BEGIN
+ DELETE FROM aggregation_tracking
+ WHERE wtid_raw = OLD.wtid_raw;
+ RETURN OLD;
+END $$;
+COMMENT ON FUNCTION wire_out_delete_trigger()
+ IS 'Replicate reserve_out deletions into aggregation_tracking. This replaces an earlier use of an ON DELETE CASCADE that required a DEFERRABLE constraint and conflicted with nice partitioning.';
+
+CREATE TRIGGER wire_out_on_delete
+ AFTER DELETE
+ ON wire_out
+ FOR EACH ROW EXECUTE FUNCTION wire_out_delete_trigger();
+
+
+
+-- ------------------------------ aggregation_transient ----------------------------------------
+
+-- Note: this table is not yet used; it is designed
+-- to allow us to get rid of the 'tiny BOOL' and
+-- the associated need to look at tiny
+-- deposits repeatedly.
+CREATE TABLE IF NOT EXISTS aggregation_transient
+ (amount_val INT8 NOT NULL
+ ,amount_frac INT4 NOT NULL
+ ,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)
+ ,exchange_account_section TEXT NOT NULL
+ ,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32)
+ )
+ PARTITION BY HASH (wire_target_h_payto);
+COMMENT ON TABLE aggregation_transient
+ IS 'aggregations currently happening (lacking wire_out, usually because the amount is too low); this table is not replicated';
+COMMENT ON COLUMN aggregation_transient.amount_val
+ IS 'Sum of all of the aggregated deposits (without deposit fees)';
+COMMENT ON COLUMN aggregation_transient.wtid_raw
+ IS 'identifier of the wire transfer';
+
+CREATE TABLE IF NOT EXISTS aggregation_transient_default
+ PARTITION OF aggregation_transient
+ FOR VALUES WITH (MODULUS 1, REMAINDER 0);
+
+
-- ------------------------------ aggregation_tracking ----------------------------------------
--- FIXME-URGENT: add colum coin_pub to select by coin_pub + deposit_serial_id for more efficient deposit lookup!?
--- Or which direction(s) is this table used? Is the partitioning sane??
CREATE TABLE IF NOT EXISTS aggregation_tracking
(aggregation_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE
,deposit_serial_id INT8 PRIMARY KEY -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE
- ,wtid_raw BYTEA NOT NULL CONSTRAINT wire_out_ref REFERENCES wire_out(wtid_raw) ON DELETE CASCADE DEFERRABLE
+ ,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32)
)
PARTITION BY HASH (deposit_serial_id);
COMMENT ON TABLE aggregation_tracking
IS 'mapping from wire transfer identifiers (WTID) to deposits (and back)';
COMMENT ON COLUMN aggregation_tracking.wtid_raw
- IS 'We first create entries in the aggregation_tracking table and then finally the wire_out entry once we know the total amount. Hence the constraint must be deferrable and we cannot use a wireout_uuid here, because we do not have it when these rows are created. Changing the logic to first INSERT a dummy row into wire_out and then UPDATEing that row in the same transaction would theoretically reduce per-deposit storage costs by 5 percent (24/~460 bytes).';
+ IS 'identifier of the wire transfer';
CREATE TABLE IF NOT EXISTS aggregation_tracking_default
PARTITION OF aggregation_tracking
@@ -1070,7 +1113,7 @@ BEGIN
EXECUTE FORMAT (
'ALTER TABLE aggregation_tracking_' || partition_suffix || ' '
'ADD CONSTRAINT aggregation_tracking_' || partition_suffix || '_aggregation_serial_id_key '
- 'UNIQUE (aggregation_serial_id) '
+ 'UNIQUE (aggregation_serial_id);'
);
END
$$;
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index c7bdae397..1709f17e4 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -1188,7 +1188,7 @@ prepare_statements (struct PostgresClosure *pg)
" ,dbr.shard ASC"
" LIMIT 1;",
4),
- /* Used in #postgres_iterate_matching_deposits() */
+ /* FIXME: deprecated; Used in #postgres_iterate_matching_deposits() */
GNUNET_PQ_make_prepare (
"deposits_iterate_matching",
"SELECT"
@@ -1207,14 +1207,115 @@ prepare_statements (struct PostgresClosure *pg)
" JOIN denominations denom"
" USING (denominations_serial)"
" WHERE dfm.refund_deadline<$3"
- " AND dfm.shard=$4"
+ " AND dfm.merchant_pub=$1"
" AND dep.merchant_pub=$1"
" AND dep.wire_target_h_payto=$2"
" LIMIT "
TALER_QUOTE (
TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";",
+ 3),
+
+ /* Used in #postgres_aggregate() */
+ GNUNET_PQ_make_prepare (
+ "aggregate",
+ "WITH rdy AS (" /* find deposits ready */
+ " SELECT"
+ " coin_pub"
+ " FROM deposits_for_matching"
+ " WHERE refund_deadline<$1"
+ " AND merchant_pub=$2"
+ " ORDER BY refund_deadline ASC" /* ordering is not critical */
+ " LIMIT "
+ TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
+ " )"
+ " ,dep AS (" /* restrict to our merchant and account */
+ " UPDATE deposits"
+ " SET done=TRUE"
+ " WHERE coin_pub IN (SELECT coin_pub FROM rdy)"
+ " AND merchant_pub=$2"
+ " AND wire_target_h_payto=$3"
+ " RETURNING"
+ " deposit_serial_id"
+ " ,coin_pub"
+ " ,amount_with_fee_val AS amount_val"
+ " ,amount_with_fee_frac AS amount_frac)"
+ " ,ref AS (" /* find applicable refunds */
+ " SELECT"
+ " amount_with_fee_val AS refund_val"
+ " ,amount_with_fee_frac AS refund_frac"
+ " ,coin_pub"
+ " FROM refunds"
+ " WHERE coin_pub IN (SELECT coin_pub FROM dep)"
+ " AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))"
+ " ,fees AS (" /* find deposit fees for non-refunded deposits */
+ " SELECT"
+ " denom.fee_deposit_val AS fee_val"
+ " ,denom.fee_deposit_frac AS fee_frac"
+ " FROM known_coins kc"
+ " JOIN denominations denom"
+ " USING (denominations_serial)"
+ " WHERE coin_pub IN (SELECT coin_pub FROM dep)"
+ " AND coin_pub NOT IN (SELECT coin_pub FROM ref))"
+ " ,dummy AS (" /* add deposits to aggregation_tracking */
+ " INSERT INTO aggregation_tracking"
+ " (deposit_serial_id"
+ " ,wtid_raw)"
+ " SELECT deposit_serial_id,$4"
+ " FROM dep)"
+ "SELECT" /* calculate totals (deposits, refunds and fees) */
+ " CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value"
+ " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction"
+ " ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value"
+ " ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction"
+ " ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value"
+ " ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction"
+ " FROM dep "
+ " FULL OUTER JOIN ref ON (FALSE)"
+ " FULL OUTER JOIN fees ON (FALSE);",
+ 4),
+
+
+ /* Used in #postgres_create_aggregation_transient() */
+ GNUNET_PQ_make_prepare (
+ "create_aggregation_transient",
+ "INSERT INTO aggregation_transient"
+ " (amount_val"
+ " ,amount_frac"
+ " ,wire_target_h_payto"
+ " ,exchange_account_section"
+ " ,wtid_raw)"
+ " VALUES ($1, $2, $3, $4, $5);",
+ 5),
+ /* Used in #postgres_select_aggregation_transient() */
+ GNUNET_PQ_make_prepare (
+ "select_aggregation_transient",
+ "SELECT"
+ " amount_val"
+ " ,amount_frac"
+ " ,wtid_raw"
+ " FROM aggregation_transient"
+ " WHERE wire_target_h_payto=$1"
+ " AND exchange_account_section=$2;",
+ 2),
+ /* Used in #postgres_update_aggregation_transient() */
+ GNUNET_PQ_make_prepare (
+ "update_aggregation_transient",
+ "UPDATE aggregation_transient"
+ " SET amount_val=$1"
+ " ,amount_frac=$2"
+ " WHERE wire_target_h_payto=$3"
+ " AND wtid_raw=$4",
4),
- /* Used in #postgres_mark_deposit_tiny() */
+ /* Used in #postgres_delete_aggregation_transient() */
+ GNUNET_PQ_make_prepare (
+ "delete_aggregation_transient",
+ "DELETE FROM aggregation_transient"
+ " WHERE wire_target_h_payto=$1"
+ " AND wtid_raw=$2",
+ 2),
+
+
+ /* FIXME-deprecated: Used in #postgres_mark_deposit_tiny() */
GNUNET_PQ_make_prepare (
"mark_deposit_tiny",
"UPDATE deposits"
@@ -1222,7 +1323,7 @@ prepare_statements (struct PostgresClosure *pg)
" WHERE coin_pub=$1"
" AND deposit_serial_id=$2",
2),
- /* Used in #postgres_mark_deposit_done() */
+ /* FIXME-deprecated: Used in #postgres_mark_deposit_done() */
GNUNET_PQ_make_prepare (
"mark_deposit_done",
"UPDATE deposits"
@@ -1230,6 +1331,7 @@ prepare_statements (struct PostgresClosure *pg)
" WHERE coin_pub=$1"
" AND deposit_serial_id=$2;",
2),
+
/* Used in #postgres_get_coin_transactions() to obtain information
about how a coin has been spend with /deposit requests. */
GNUNET_PQ_make_prepare (
@@ -2835,8 +2937,8 @@ prepare_statements (struct PostgresClosure *pg)
GNUNET_PQ_make_prepare (
"insert_into_table_refunds",
"INSERT INTO refunds"
- "(coin_pub"
- ",refund_serial_id"
+ "(refund_serial_id"
+ ",coin_pub"
",merchant_sig"
",rtransaction_id"
",amount_with_fee_val"
@@ -5869,6 +5971,240 @@ postgres_have_deposit2 (
/**
+ * Aggregate all matching deposits for @a h_payto and
+ * @a merchant_pub, returning the total amounts.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param merchant_pub public key of the merchant
+ * @param wtid wire transfer ID to set for the aggregate
+ * @param[out] total set to the sum of the total deposits minus applicable deposit fees and refunds
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_aggregate (
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ struct TALER_Amount *total)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_TIME_Absolute now = {0};
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_auto_from_type (merchant_pub),
+ GNUNET_PQ_query_param_auto_from_type (h_payto),
+ GNUNET_PQ_query_param_auto_from_type (wtid),
+ GNUNET_PQ_query_param_end
+ };
+ uint64_t sum_deposit_value;
+ uint64_t sum_deposit_frac;
+ uint64_t sum_refund_value;
+ uint64_t sum_refund_frac;
+ uint64_t sum_fee_value;
+ uint64_t sum_fee_frac;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint64 ("sum_deposit_value",
+ &sum_deposit_value),
+ GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction",
+ &sum_deposit_frac),
+ GNUNET_PQ_result_spec_uint64 ("sum_refund_value",
+ &sum_refund_value),
+ GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction",
+ &sum_refund_frac),
+ GNUNET_PQ_result_spec_uint64 ("sum_fee_value",
+ &sum_fee_value),
+ GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction",
+ &sum_fee_frac),
+ GNUNET_PQ_result_spec_end
+ };
+ enum GNUNET_DB_QueryStatus qs;
+ struct TALER_Amount sum_deposit;
+ struct TALER_Amount sum_refund;
+ struct TALER_Amount sum_fee;
+ struct TALER_Amount delta;
+
+ now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
+ pg->aggregator_shift);
+ qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "aggregate",
+ params,
+ rs);
+ if (qs < 0)
+ {
+ GNUNET_assert (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ {
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ total));
+ return qs;
+ }
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ &sum_deposit));
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ &sum_refund));
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ &sum_fee));
+ sum_deposit.value = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE
+ + sum_deposit_value;
+ sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE;
+ sum_refund.value = sum_refund_frac / TALER_AMOUNT_FRAC_BASE
+ + sum_refund_value;
+ sum_refund.fraction = sum_refund_frac % TALER_AMOUNT_FRAC_BASE;
+ sum_fee.value = sum_fee_frac / TALER_AMOUNT_FRAC_BASE
+ + sum_fee_value;
+ sum_fee.fraction = sum_fee_frac % TALER_AMOUNT_FRAC_BASE; \
+ GNUNET_assert (0 <=
+ TALER_amount_subtract (&delta,
+ &sum_deposit,
+ &sum_refund));
+ GNUNET_assert (0 <=
+ TALER_amount_subtract (total,
+ &delta,
+ &sum_fee));
+ return qs;
+}
+
+
+/**
+ * Create a new entry in the transient aggregation table.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param exchange_account_section exchange account to use
+ * @param wtid the raw wire transfer identifier to be used
+ * @param total amount to be wired in the future
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_create_aggregation_transient (
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const char *exchange_account_section,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ const struct TALER_Amount *total)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_amount (total),
+ GNUNET_PQ_query_param_auto_from_type (h_payto),
+ GNUNET_PQ_query_param_string (exchange_account_section),
+ GNUNET_PQ_query_param_auto_from_type (wtid),
+ GNUNET_PQ_query_param_end
+ };
+
+ return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "create_aggregation_transient",
+ params);
+}
+
+
+/**
+ * Find existing entry in the transient aggregation table.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param exchange_account_section exchange account to use
+ * @param[out] wtid set to the raw wire transfer identifier to be used
+ * @param[out] total existing amount to be wired in the future
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_select_aggregation_transient (
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const char *exchange_account_section,
+ struct TALER_WireTransferIdentifierRawP *wtid,
+ struct TALER_Amount *total)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (h_payto),
+ GNUNET_PQ_query_param_string (exchange_account_section),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ TALER_PQ_RESULT_SPEC_AMOUNT ("amount",
+ total),
+ GNUNET_PQ_result_spec_auto_from_type ("wtid_raw",
+ wtid),
+ GNUNET_PQ_result_spec_end
+ };
+
+ return GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "select_aggregation_transient",
+ params,
+ rs);
+}
+
+
+/**
+ * Update existing entry in the transient aggregation table.
+ * @a h_payto is only needed for query performance.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param wtid the raw wire transfer identifier to update
+ * @param total new total amount to be wired in the future
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_update_aggregation_transient (
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ const struct TALER_Amount *total)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_amount (total),
+ GNUNET_PQ_query_param_auto_from_type (h_payto),
+ GNUNET_PQ_query_param_auto_from_type (wtid),
+ GNUNET_PQ_query_param_end
+ };
+
+ return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "update_aggregation_transient",
+ params);
+}
+
+
+/**
+ * Delete existing entry in the transient aggregation table.
+ * @a h_payto is only needed for query performance.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param wtid the raw wire transfer identifier to update
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_delete_aggregation_transient (
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const struct TALER_WireTransferIdentifierRawP *wtid)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (h_payto),
+ GNUNET_PQ_query_param_auto_from_type (wtid),
+ GNUNET_PQ_query_param_end
+ };
+
+ return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "delete_aggregation_transient",
+ params);
+}
+
+
+/**
* Mark a deposit as tiny, thereby declaring that it cannot be
* executed by itself and should no longer be returned by
* @e iterate_ready_deposits()
@@ -6147,12 +6483,10 @@ postgres_iterate_matching_deposits (
{
struct PostgresClosure *pg = cls;
struct GNUNET_TIME_Absolute now = {0};
- uint64_t shard = compute_shard (merchant_pub);
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (merchant_pub),
GNUNET_PQ_query_param_auto_from_type (h_payto),
GNUNET_PQ_query_param_absolute_time (&now),
- GNUNET_PQ_query_param_uint64 (&shard),
GNUNET_PQ_query_param_end
};
struct MatchingDepositContext mdc = {
@@ -9299,7 +9633,6 @@ refunds_serial_helper_cb (void *cls,
struct RefundsSerialContext *rsc = cls;
struct PostgresClosure *pg = rsc->pg;
- fprintf (stderr, "Got %u results\n", num_results);
for (unsigned int i = 0; i<num_results; i++)
{
struct TALER_EXCHANGEDB_Refund refund;
@@ -13081,6 +13414,15 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
plugin->get_known_coin = &postgres_get_known_coin;
plugin->get_coin_denomination = &postgres_get_coin_denomination;
plugin->have_deposit2 = &postgres_have_deposit2;
+ plugin->aggregate = &postgres_aggregate;
+ plugin->create_aggregation_transient
+ = &postgres_create_aggregation_transient;
+ plugin->select_aggregation_transient
+ = &postgres_select_aggregation_transient;
+ plugin->update_aggregation_transient
+ = &postgres_update_aggregation_transient;
+ plugin->delete_aggregation_transient
+ = &postgres_delete_aggregation_transient;
plugin->mark_deposit_tiny = &postgres_mark_deposit_tiny;
plugin->mark_deposit_done = &postgres_mark_deposit_done;
plugin->get_ready_deposit = &postgres_get_ready_deposit;
diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c
index 79b09e0e1..9e2e8a480 100644
--- a/src/exchangedb/test_exchangedb.c
+++ b/src/exchangedb/test_exchangedb.c
@@ -675,48 +675,6 @@ deposit_cb (void *cls,
/**
- * Function called with details about deposits that
- * have been made. Called in the test on the
- * deposit given in @a cls.
- *
- * @param cls closure a `struct TALER_EXCHANGEDB_Deposit *`
- * @param rowid unique ID for the deposit in our DB, used for marking
- * it as 'tiny' or 'done'
- * @param coin_pub public key of the coin
- * @param amount_with_fee amount that was deposited including fee
- * @param deposit_fee amount the exchange gets to keep as transaction fees
- * @param h_contract_terms hash of the proposal data known to merchant and customer
- * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
- */
-static enum GNUNET_DB_QueryStatus
-matching_deposit_cb (void *cls,
- uint64_t rowid,
- const struct TALER_CoinSpendPublicKeyP *coin_pub,
- const struct TALER_Amount *amount_with_fee,
- const struct TALER_Amount *deposit_fee,
- const struct TALER_PrivateContractHashP *h_contract_terms)
-{
- struct TALER_EXCHANGEDB_Deposit *deposit = cls;
-
- deposit_rowid = rowid;
- if ( (0 != TALER_amount_cmp (amount_with_fee,
- &deposit->amount_with_fee)) ||
- (0 != TALER_amount_cmp (deposit_fee,
- &deposit->deposit_fee)) ||
- (0 != GNUNET_memcmp (h_contract_terms,
- &deposit->h_contract_terms)) ||
- (0 != GNUNET_memcmp (coin_pub,
- &deposit->coin.coin_pub)) )
- {
- GNUNET_break (0);
- return GNUNET_DB_STATUS_HARD_ERROR;
- }
-
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-}
-
-
-/**
* Callback for #select_deposits_above_serial_id ()
*
* @param cls closure
@@ -1055,7 +1013,7 @@ test_wire_out (const struct TALER_EXCHANGEDB_Deposit *deposit)
&h_payto);
auditor_row_cnt = 0;
memset (&wire_out_wtid,
- 42,
+ 41,
sizeof (wire_out_wtid));
wire_out_date = GNUNET_TIME_timestamp_get ();
GNUNET_assert (GNUNET_OK ==
@@ -1109,14 +1067,6 @@ test_wire_out (const struct TALER_EXCHANGEDB_Deposit *deposit)
&coin_fee2,
&kyc));
}
- /* insert WT data */
- FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
- plugin->insert_aggregation_tracking (plugin->cls,
- &wire_out_wtid,
- deposit_rowid));
-
- /* Now let's fix the transient constraint violation by
- putting in the WTID into the wire_out table */
{
struct TALER_ReservePublicKeyP rpub;
struct TALER_EXCHANGEDB_KycStatus kyc;
@@ -2270,44 +2220,92 @@ run (void *cls)
&deposit_cb,
&deposit));
FAILIF (8 == result);
- FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
- plugin->iterate_matching_deposits (plugin->cls,
- &wire_target_h_payto,
- &deposit.merchant_pub,
- &matching_deposit_cb,
- &deposit,
- 2));
+ {
+ struct TALER_Amount total;
+ struct TALER_WireTransferIdentifierRawP wtid;
+
+ memset (&wtid,
+ 41,
+ sizeof (wtid));
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+ plugin->aggregate (plugin->cls,
+ &wire_target_h_payto,
+ &deposit.merchant_pub,
+ &wtid,
+ &total));
+ }
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->commit (plugin->cls));
FAILIF (GNUNET_OK !=
plugin->start (plugin->cls,
- "test-2"));
- FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
- plugin->mark_deposit_tiny (plugin->cls,
- &deposit.coin.coin_pub,
- deposit_rowid));
- FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
- plugin->get_ready_deposit (plugin->cls,
- 0,
- INT32_MAX,
- true,
- &deposit_cb,
- &deposit));
- plugin->rollback (plugin->cls);
- FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
- plugin->get_ready_deposit (plugin->cls,
- 0,
- INT32_MAX,
- true,
- &deposit_cb,
- &deposit));
- FAILIF (GNUNET_OK !=
- plugin->start (plugin->cls,
"test-3"));
- FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
- plugin->mark_deposit_done (plugin->cls,
- &deposit.coin.coin_pub,
- deposit_rowid));
+ {
+ struct TALER_WireTransferIdentifierRawP wtid;
+ struct TALER_Amount total;
+ struct TALER_WireTransferIdentifierRawP wtid2;
+ struct TALER_Amount total2;
+
+ memset (&wtid,
+ 42,
+ sizeof (wtid));
+ GNUNET_assert (GNUNET_OK ==
+ TALER_string_to_amount (CURRENCY ":42",
+ &total));
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
+ plugin->select_aggregation_transient (plugin->cls,
+ &wire_target_h_payto,
+ "x-bank",
+ &wtid2,
+ &total2));
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+ plugin->create_aggregation_transient (plugin->cls,
+ &wire_target_h_payto,
+ "x-bank",
+ &wtid,
+ &total));
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+ plugin->select_aggregation_transient (plugin->cls,
+ &wire_target_h_payto,
+ "x-bank",
+ &wtid2,
+ &total2));
+ FAILIF (0 !=
+ GNUNET_memcmp (&wtid2,
+ &wtid));
+ FAILIF (0 !=
+ TALER_amount_cmp (&total2,
+ &total));
+ GNUNET_assert (GNUNET_OK ==
+ TALER_string_to_amount (CURRENCY ":43",
+ &total));
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+ plugin->update_aggregation_transient (plugin->cls,
+ &wire_target_h_payto,
+ &wtid,
+ &total));
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+ plugin->select_aggregation_transient (plugin->cls,
+ &wire_target_h_payto,
+ "x-bank",
+ &wtid2,
+ &total2));
+ FAILIF (0 !=
+ GNUNET_memcmp (&wtid2,
+ &wtid));
+ FAILIF (0 !=
+ TALER_amount_cmp (&total2,
+ &total));
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+ plugin->delete_aggregation_transient (plugin->cls,
+ &wire_target_h_payto,
+ &wtid));
+ FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
+ plugin->select_aggregation_transient (plugin->cls,
+ &wire_target_h_payto,
+ "x-bank",
+ &wtid2,
+ &total2));
+ }
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->commit (plugin->cls));
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index b2ea240e2..4ca6905e0 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -3146,6 +3146,98 @@ struct TALER_EXCHANGEDB_Plugin
/**
+ * Aggregate all matching deposits for @a h_payto and
+ * @a merchant_pub, returning the total amounts.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param merchant_pub public key of the merchant
+ * @param wtid wire transfer ID to set for the aggregate
+ * @param[out] total set to the sum of the total deposits minus applicable deposit fees and refunds
+ * @return transaction status
+ */
+ enum GNUNET_DB_QueryStatus
+ (*aggregate)(
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ struct TALER_Amount *total);
+
+
+ /**
+ * Create a new entry in the transient aggregation table.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param exchange_account_section exchange account to use
+ * @param wtid the raw wire transfer identifier to be used
+ * @param total amount to be wired in the future
+ * @return transaction status
+ */
+ enum GNUNET_DB_QueryStatus
+ (*create_aggregation_transient)(
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const char *exchange_account_section,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ const struct TALER_Amount *total);
+
+
+ /**
+ * Find existing entry in the transient aggregation table.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param exchange_account_section exchange account to use
+ * @param[out] wtid set to the raw wire transfer identifier to be used
+ * @param[out] total existing amount to be wired in the future
+ * @return transaction status
+ */
+ enum GNUNET_DB_QueryStatus
+ (*select_aggregation_transient)(
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const char *exchange_account_section,
+ struct TALER_WireTransferIdentifierRawP *wtid,
+ struct TALER_Amount *total);
+
+
+ /**
+ * Update existing entry in the transient aggregation table.
+ * @a h_payto is only needed for query performance.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param wtid the raw wire transfer identifier to update
+ * @param total new total amount to be wired in the future
+ * @return transaction status
+ */
+ enum GNUNET_DB_QueryStatus
+ (*update_aggregation_transient)(
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ const struct TALER_Amount *total);
+
+
+ /**
+ * Delete existing entry in the transient aggregation table.
+ * @a h_payto is only needed for query performance.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param wtid the raw wire transfer identifier to update
+ * @return transaction status
+ */
+ enum GNUNET_DB_QueryStatus
+ (*delete_aggregation_transient)(
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const struct TALER_WireTransferIdentifierRawP *wtid);
+
+
+ /**
* Lookup melt commitment data under the given @a rc.
*
* @param cls the @e cls of this struct with the plugin-specific state