From 6457ee56dfb148773167e45fb408176e7370f817 Mon Sep 17 00:00:00 2001 From: Joseph Date: Wed, 4 Jan 2023 07:31:08 -0500 Subject: corrections applied to batch_test --- src/exchangedb/Makefile.am | 28 ++- .../exchange_do_batch2_reserves_in_insert.sql | 26 +- .../exchange_do_batch4_reserves_in_insert.sql | 27 +- .../exchange_do_batch8_reserves_in_insert.sql | 47 ++-- .../exchange_do_batch_reserves_in_insert.sql | 11 +- .../exchange_do_batch_reserves_update.sql | 73 ++---- src/exchangedb/pg_batch2_reserves_in_insert.c | 177 ++++++++----- src/exchangedb/pg_batch2_reserves_in_insert.h | 2 +- src/exchangedb/pg_batch_reserves_in_insert.c | 75 +++--- src/exchangedb/test_exchangedb_by_j.c | 2 +- src/exchangedb/test_exchangedb_populate_table.c | 274 +++++++++++---------- 11 files changed, 412 insertions(+), 330 deletions(-) diff --git a/src/exchangedb/Makefile.am b/src/exchangedb/Makefile.am index 3f4900192..ef069c3cf 100644 --- a/src/exchangedb/Makefile.am +++ b/src/exchangedb/Makefile.am @@ -295,16 +295,16 @@ check_PROGRAMS = \ bench-db-postgres\ perf-exchangedb-reserves-in-insert-postgres\ test-exchangedb-by-j-postgres\ - test-exchangedb-batch-reserves-in-insert-postgres - + test-exchangedb-batch-reserves-in-insert-postgres\ + test-exchangedb-populate-table-postgres AM_TESTS_ENVIRONMENT=export TALER_PREFIX=$${TALER_PREFIX:-@libdir@};export PATH=$${TALER_PREFIX:-@prefix@}/bin:$$PATH; TESTS = \ test-exchangedb-postgres\ test-exchangedb-by-j-postgres\ perf-exchangedb-reserves-in-insert-postgres\ - test-exchangedb-batch-reserves-in-insert-postgres - + test-exchangedb-batch-reserves-in-insert-postgres\ + test-exchangedb-populate-table-postgres test_exchangedb_postgres_SOURCES = \ test_exchangedb.c test_exchangedb_postgres_LDADD = \ @@ -376,7 +376,27 @@ bench_db_postgres_LDADD = \ -lgnunetutil \ $(XLIB) +test_exchangedb_populate_table_postgres_SOURCES = \ + test_exchangedb_populate_table.c +test_exchangedb_populate_table_postgres_LDADD = \ + libtalerexchangedb.la \ + $(top_builddir)/src/json/libtalerjson.la \ + $(top_builddir)/src/util/libtalerutil.la \ + $(top_builddir)/src/pq/libtalerpq.la \ + -ljansson \ + -lgnunetjson \ + -lgnunetutil \ + $(XLIB) +bench_db_postgres_SOURCES = \ + bench_db.c +bench_db_postgres_LDADD = \ + libtalerexchangedb.la \ + $(top_builddir)/src/util/libtalerutil.la \ + $(top_builddir)/src/pq/libtalerpq.la \ + -lgnunetpq \ + -lgnunetutil \ + $(XLIB) EXTRA_test_exchangedb_postgres_DEPENDENCIES = \ libtaler_plugin_exchangedb_postgres.la diff --git a/src/exchangedb/exchange_do_batch2_reserves_in_insert.sql b/src/exchangedb/exchange_do_batch2_reserves_in_insert.sql index 0e2d37d86..417b08ebf 100644 --- a/src/exchangedb/exchange_do_batch2_reserves_in_insert.sql +++ b/src/exchangedb/exchange_do_batch2_reserves_in_insert.sql @@ -54,8 +54,8 @@ DECLARE r RECORD; BEGIN --SIMPLE INSERT ON CONFLICT DO NOTHING - transaction_duplicate=FALSE; - transaction_duplicate2=FALSE; + transaction_duplicate=TRUE; + transaction_duplicate2=TRUE; out_reserve_found = TRUE; out_reserve_found2 = TRUE; ruuid=0; @@ -148,7 +148,7 @@ BEGIN ,in_exchange_account_name ,in_wire_source_h_payto ,in_expiration_date), - (in2_reserve_pub + (in3_reserve_pub ,in2_wire_ref ,in2_credit_val ,in2_credit_frac @@ -163,26 +163,32 @@ BEGIN THEN IF in_reserve_pub = r.reserve_pub THEN - transaction_duplicate = TRUE; + transaction_duplicate = FALSE; END IF; - IF in2_reserve_pub = i.reserve_pub + IF in2_reserve_pub = r.reserve_pub THEN - transaction_duplicate = TRUE; + transaction_duplicate2 = FALSE; END IF; FETCH FROM curs_transaction_exist INTO r; IF FOUND THEN IF in_reserve_pub = r.reserve_pub THEN - transaction_duplicate = TRUE; + transaction_duplicate = FALSE; END IF; - IF in2_reserve_pub = i.reserve_pub + IF in2_reserve_pub = r.reserve_pub THEN - transaction_duplicate = TRUE; + transaction_duplicate2 = FALSE; END IF; END IF; END IF; - +/* IF transaction_duplicate + OR transaction_duplicate2 + THEN + CLOSE curs_transaction_exist; + ROLLBACK; + RETURN; + END IF;*/ CLOSE curs_transaction_exist; RETURN; END $$; diff --git a/src/exchangedb/exchange_do_batch4_reserves_in_insert.sql b/src/exchangedb/exchange_do_batch4_reserves_in_insert.sql index 5519da157..e2ab133bd 100644 --- a/src/exchangedb/exchange_do_batch4_reserves_in_insert.sql +++ b/src/exchangedb/exchange_do_batch4_reserves_in_insert.sql @@ -81,10 +81,10 @@ DECLARE BEGIN --INITIALIZATION - transaction_duplicate=FALSE; - transaction_duplicate2=FALSE; - transaction_duplicate3=FALSE; - transaction_duplicate4=FALSE; + transaction_duplicate=TRUE; + transaction_duplicate2=TRUE; + transaction_duplicate3=TRUE; + transaction_duplicate4=TRUE; out_reserve_found = TRUE; out_reserve_found2 = TRUE; out_reserve_found3 = TRUE; @@ -232,25 +232,34 @@ BEGIN THEN IF in_reserve_pub = i.reserve_pub THEN - transaction_duplicate = TRUE; + transaction_duplicate = FALSE; END IF; IF in2_reserve_pub = i.reserve_pub THEN - transaction_duplicate2 = TRUE; + transaction_duplicate2 = FALSE; END IF; IF in3_reserve_pub = i.reserve_pub THEN - transaction_duplicate3 = TRUE; + transaction_duplicate3 = FALSE; END IF; IF in4_reserve_pub = i.reserve_pub THEN - transaction_duplicate4 = TRUE; + transaction_duplicate4 = FALSE; END IF; END IF; k=k+1; END LOOP; - CLOSE curs_transaction_exist; +/* IF transaction_duplicate + OR transaction_duplicate2 + OR transaction_duplicate3 + OR transaction_duplicate4 + THEN + RAISE EXCEPTION 'Reserve did not exist, but INSERT into reserves_in gave conflict'; + CLOSE curs_transaction_exist; + RETURN; + END IF;*/ + CLOSE curs_transaction_exist; RETURN; END $$; diff --git a/src/exchangedb/exchange_do_batch8_reserves_in_insert.sql b/src/exchangedb/exchange_do_batch8_reserves_in_insert.sql index fc735adda..2c61bfedf 100644 --- a/src/exchangedb/exchange_do_batch8_reserves_in_insert.sql +++ b/src/exchangedb/exchange_do_batch8_reserves_in_insert.sql @@ -136,14 +136,14 @@ DECLARE BEGIN --INITIALIZATION - transaction_duplicate=FALSE; - transaction_duplicate2=FALSE; - transaction_duplicate3=FALSE; - transaction_duplicate4=FALSE; - transaction_duplicate5=FALSE; - transaction_duplicate6=FALSE; - transaction_duplicate7=FALSE; - transaction_duplicate8=FALSE; + transaction_duplicate=TRUE; + transaction_duplicate2=TRUE; + transaction_duplicate3=TRUE; + transaction_duplicate4=TRUE; + transaction_duplicate5=TRUE; + transaction_duplicate6=TRUE; + transaction_duplicate7=TRUE; + transaction_duplicate8=TRUE; out_reserve_found = TRUE; out_reserve_found2 = TRUE; out_reserve_found3 = TRUE; @@ -388,41 +388,52 @@ BEGIN THEN IF in_reserve_pub = r.reserve_pub THEN - transaction_duplicate = TRUE; + transaction_duplicate = FALSE; END IF; IF in2_reserve_pub = r.reserve_pub THEN - transaction_duplicate2 = TRUE; + transaction_duplicate2 = FALSE; END IF; IF in3_reserve_pub = r.reserve_pub THEN - transaction_duplicate3 = TRUE; + transaction_duplicate3 = FALSE; END IF; IF in4_reserve_pub = r.reserve_pub THEN - transaction_duplicate4 = TRUE; + transaction_duplicate4 = FALSE; END IF; IF in5_reserve_pub = r.reserve_pub THEN - transaction_duplicate5 = TRUE; + transaction_duplicate5 = FALSE; END IF; IF in6_reserve_pub = r.reserve_pub THEN - transaction_duplicate6 = TRUE; + transaction_duplicate6 = FALSE; END IF; IF in7_reserve_pub = r.reserve_pub THEN - transaction_duplicate7 = TRUE; + transaction_duplicate7 = FALSE; END IF; IF in8_reserve_pub = r.reserve_pub THEN - transaction_duplicate8 = TRUE; + transaction_duplicate8 = FALSE; END IF; - END IF; k=k+1; END LOOP; - +/* IF transaction_duplicate + OR transaction_duplicate2 + OR transaction_duplicate3 + OR transaction_duplicate4 + OR transaction_duplicate5 + OR transaction_duplicate6 + OR transaction_duplicate7 + OR transaction_duplicate8 + THEN + CLOSE curs_transaction_existed; + ROLLBACK; + RETURN; + END IF;*/ CLOSE curs_transaction_existed; RETURN; END $$; diff --git a/src/exchangedb/exchange_do_batch_reserves_in_insert.sql b/src/exchangedb/exchange_do_batch_reserves_in_insert.sql index 56cae3d91..4ae7f68a2 100644 --- a/src/exchangedb/exchange_do_batch_reserves_in_insert.sql +++ b/src/exchangedb/exchange_do_batch_reserves_in_insert.sql @@ -36,7 +36,7 @@ AS $$ BEGIN ruuid= 0; out_reserve_found = TRUE; -transaction_duplicate= FALSE; +transaction_duplicate= TRUE; --SIMPLE INSERT ON CONFLICT DO NOTHING INSERT INTO wire_targets (wire_target_h_payto @@ -66,7 +66,7 @@ transaction_duplicate= FALSE; out_reserve_found = FALSE; ELSE -- We made no change, which means the reserve existed. - out_reserve_found = TRUE; /*RESERVE EXISTED BUT WE DO NOT HAVE ANY INFORMATION ABOUT TRANSACTION, RETURN*/ + out_reserve_found = TRUE; RETURN; END IF; PERFORM pg_notify(in_notify, NULL); @@ -89,11 +89,14 @@ transaction_duplicate= FALSE; ON CONFLICT DO NOTHING; IF FOUND THEN - transaction_duplicate = FALSE; /*HAPPY PATH THERE IS NO DUPLICATE TRANS AND NEW RESERVE*/ + -- HAPPY PATH THERE IS NO DUPLICATE TRANS + transaction_duplicate = FALSE; RETURN; ELSE + -- Unhappy... + RAISE EXCEPTION 'Reserve did not exist, but INSERT into reserves_in gave conflict'; transaction_duplicate = TRUE; - /*HAPPY PATH IF THERE IS A DUPLICATE TRANS WE JUST NEED TO ROLLBACK COMPLAIN*/ + ROLLBACK; RETURN; END IF; RETURN; diff --git a/src/exchangedb/exchange_do_batch_reserves_update.sql b/src/exchangedb/exchange_do_batch_reserves_update.sql index 05dd1876d..82b6b84c1 100644 --- a/src/exchangedb/exchange_do_batch_reserves_update.sql +++ b/src/exchangedb/exchange_do_batch_reserves_update.sql @@ -14,25 +14,19 @@ -- TALER; see the file COPYING. If not, see -- -CREATE OR REPLACE PROCEDURE exchange_do_batch_reserves_update( +CREATE OR REPLACE FUNCTION exchange_do_batch_reserves_update( IN in_reserve_pub BYTEA, IN in_expiration_date INT8, IN in_wire_ref INT8, IN in_credit_val INT8, IN in_credit_frac INT4, IN in_exchange_account_name VARCHAR, - IN in_reserve_found BOOLEAN, IN in_wire_source_h_payto BYTEA, - IN in_notify text) + IN in_notify text, + OUT out_duplicate BOOLEAN) LANGUAGE plpgsql AS $$ -DECLARE - i RECORD; -DECLARE - curs refcursor; BEGIN - OPEN curs FOR - WITH reserves_update AS ( INSERT INTO reserves_in (reserve_pub ,wire_reference @@ -49,46 +43,31 @@ BEGIN ,in_exchange_account_name ,in_wire_source_h_payto ,in_expiration_date) - ON CONFLICT DO NOTHING - RETURNING reserve_pub, credit_val, credit_frac) - SELECT * FROM reserves_in; - - FETCH FROM curs INTO i; - IF FOUND --IF THE INSERTION WAS A SUCCESS IT MEANS NO DUPLICATED TRANSACTION + ON CONFLICT DO NOTHING; + IF FOUND THEN - IF in_reserve_found - THEN - UPDATE reserves - SET - current_balance_frac = current_balance_frac+in_credit_frac - - CASE - WHEN current_balance_frac + in_credit_frac >= 100000000 - THEN 100000000 - ELSE 1 - END - ,current_balance_val = current_balance_val+in_credit_val - + CASE - WHEN current_balance_frac + in_credit_frac >= 100000000 - THEN 1 - ELSE 0 - END - ,expiration_date=GREATEST(expiration_date,in_expiration_date) - ,gc_date=GREATEST(gc_date,in_expiration_date) - WHERE reserve_pub=in_reserve_pub; - END IF; + --IF THE INSERTION WAS A SUCCESS IT MEANS NO DUPLICATED TRANSACTION + out_duplicate = FALSE; + UPDATE reserves + SET + current_balance_frac = current_balance_frac+in_credit_frac + - CASE + WHEN current_balance_frac + in_credit_frac >= 100000000 + THEN 100000000 + ELSE 1 + END + ,current_balance_val = current_balance_val+in_credit_val + + CASE + WHEN current_balance_frac + in_credit_frac >= 100000000 + THEN 1 + ELSE 0 + END + ,expiration_date=GREATEST(expiration_date,in_expiration_date) + ,gc_date=GREATEST(gc_date,in_expiration_date) + WHERE reserve_pub=in_reserve_pub; PERFORM pg_notify(in_notify, NULL); ELSE - CLOSE curs; - IF NOT in_reserve_found - THEN - ROLLBACK; - END IF; - PERFORM pg_notify(in_notify, NULL); - -/* UPDATE reserves_in - SET credit_frac = credit_frac - in_credit_frac - AND credit_val = credit_val + in_credit_val - WHERE reserve_pub = in_reserve_pub;*/ + out_duplicate = TRUE; END IF; - + RETURN; END $$; diff --git a/src/exchangedb/pg_batch2_reserves_in_insert.c b/src/exchangedb/pg_batch2_reserves_in_insert.c index 01f6d1455..90848e0e6 100644 --- a/src/exchangedb/pg_batch2_reserves_in_insert.c +++ b/src/exchangedb/pg_batch2_reserves_in_insert.c @@ -62,7 +62,8 @@ insert1(struct PostgresClosure *pg, struct GNUNET_TIME_Timestamp reserve_expiration, bool *transaction_duplicate, bool *conflict, - uint64_t *reserve_uuid) + uint64_t *reserve_uuid, + enum GNUNET_DB_QueryStatus results[1]) { enum GNUNET_DB_QueryStatus qs2; PREPARE (pg, @@ -107,23 +108,24 @@ insert1(struct PostgresClosure *pg, "batch1_reserve_create", params, rs); - if (qs2 < 0) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to create reserves (%d)\n", - qs2); - return qs2; - } - GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs2); - - if (conflict[0] && transaction_duplicate[0]) - { - GNUNET_break (0); - TEH_PG_rollback (pg); - return GNUNET_DB_STATUS_HARD_ERROR; - } - return qs2; + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to create reserves (%d)\n", + qs2); + results[0] = qs2; + return qs2; + } + GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs2); + if ((conflict[0]) && transaction_duplicate[0]) + { + GNUNET_break (0); + TEH_PG_rollback (pg); + results[0] = GNUNET_DB_STATUS_HARD_ERROR; + return GNUNET_DB_STATUS_HARD_ERROR; + } + results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + return qs2; } @@ -138,7 +140,8 @@ insert2 (struct PostgresClosure *pg, struct GNUNET_TIME_Timestamp reserve_expiration, bool *transaction_duplicate, bool *conflict, - uint64_t *reserve_uuid) + uint64_t *reserve_uuid, + enum GNUNET_DB_QueryStatus results[1]) { enum GNUNET_DB_QueryStatus qs1; PREPARE (pg, @@ -154,7 +157,7 @@ insert2 (struct PostgresClosure *pg, " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22);"); struct GNUNET_PQ_QueryParam params[] = { - // THIS is wrong, not 22 args! + GNUNET_PQ_query_param_auto_from_type (reserves[0].reserve_pub), GNUNET_PQ_query_param_timestamp (&expiry), GNUNET_PQ_query_param_timestamp (&gc), @@ -209,6 +212,7 @@ insert2 (struct PostgresClosure *pg, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to create reserves (%d)\n", qs1); + results[0]=qs1; return qs1; } @@ -224,8 +228,10 @@ insert2 (struct PostgresClosure *pg, { GNUNET_break (0); TEH_PG_rollback (pg); + results[0] = GNUNET_DB_STATUS_HARD_ERROR; return GNUNET_DB_STATUS_HARD_ERROR; } + results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; return qs1; } @@ -240,7 +246,8 @@ insert4 (struct PostgresClosure *pg, struct GNUNET_TIME_Timestamp reserve_expiration, bool *transaction_duplicate, bool *conflict, - uint64_t *reserve_uuid) + uint64_t *reserve_uuid, + enum GNUNET_DB_QueryStatus results[1]) { enum GNUNET_DB_QueryStatus qs3; PREPARE (pg, @@ -353,13 +360,11 @@ insert4 (struct PostgresClosure *pg, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to create reserves4 (%d)\n", qs3); + results[0] = qs3; return qs3; } GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs3); - /* results[i] = (transaction_duplicate) - ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS - : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;*/ if ( ((conflict[0]) && (transaction_duplicate[0])) @@ -370,8 +375,10 @@ insert4 (struct PostgresClosure *pg, { GNUNET_break (0); TEH_PG_rollback (pg); + results[0] = GNUNET_DB_STATUS_HARD_ERROR; return GNUNET_DB_STATUS_HARD_ERROR; } + results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; return qs3; } @@ -386,7 +393,8 @@ insert8 (struct PostgresClosure *pg, struct GNUNET_TIME_Timestamp reserve_expiration, bool *transaction_duplicate, bool *conflict, - uint64_t *reserve_uuid) + uint64_t *reserve_uuid, + enum GNUNET_DB_QueryStatus results[1]) { enum GNUNET_DB_QueryStatus qs3; PREPARE (pg, @@ -583,6 +591,7 @@ insert8 (struct PostgresClosure *pg, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to create reserves8 (%d)\n", qs3); + results[0]=qs3; return qs3; } @@ -604,17 +613,19 @@ insert8 (struct PostgresClosure *pg, { GNUNET_break (0); TEH_PG_rollback (pg); + results[0]=GNUNET_DB_STATUS_HARD_ERROR; return GNUNET_DB_STATUS_HARD_ERROR; } + results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; return qs3; } enum GNUNET_DB_QueryStatus TEH_PG_batch2_reserves_in_insert (void *cls, - const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, - unsigned int reserves_length, + const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, + unsigned int reserves_length, unsigned int batch_size, - enum GNUNET_DB_QueryStatus *results) + enum GNUNET_DB_QueryStatus *results) { struct PostgresClosure *pg = cls; enum GNUNET_DB_QueryStatus qs1; @@ -627,6 +638,7 @@ TEH_PG_batch2_reserves_in_insert (void *cls, uint64_t reserve_uuid[reserves_length]; bool transaction_duplicate[reserves_length]; bool need_update = false; + bool t_duplicate=false; struct GNUNET_TIME_Timestamp reserve_expiration = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); bool conflicts[reserves_length]; @@ -689,7 +701,8 @@ TEH_PG_batch2_reserves_in_insert (void *cls, reserve_expiration, &transaction_duplicate[i], &conflicts[i], - &reserve_uuid[i]); + &reserve_uuid[i], + &results[i]); if (qs1<0) { @@ -707,6 +720,14 @@ TEH_PG_batch2_reserves_in_insert (void *cls, need_update |= conflicts[i+5]; need_update |= conflicts[i+6]; need_update |= conflicts[i+7]; + t_duplicate |= transaction_duplicate[i]; + t_duplicate |= transaction_duplicate[i+1]; + t_duplicate |= transaction_duplicate[i+2]; + t_duplicate |= transaction_duplicate[i+3]; + t_duplicate |= transaction_duplicate[i+4]; + t_duplicate |= transaction_duplicate[i+5]; + t_duplicate |= transaction_duplicate[i+6]; + t_duplicate |= transaction_duplicate[i+7]; i+=8; continue; } @@ -725,7 +746,8 @@ TEH_PG_batch2_reserves_in_insert (void *cls, reserve_expiration, &transaction_duplicate[i], &conflicts[i], - &reserve_uuid[i]); + &reserve_uuid[i], + &results[i]); if (qs4<0) { @@ -738,8 +760,10 @@ TEH_PG_batch2_reserves_in_insert (void *cls, need_update |= conflicts[i+1]; need_update |= conflicts[i+2]; need_update |= conflicts[i+3]; - // fprintf(stdout, "%ld %ld %ld %ld\n", reserve_uuid[i], reserve_uuid[i+1], reserve_uuid[i+2], reserve_uuid[i+3]); - //fprintf(stdout, "%d %d %d %d\n", transaction_duplicate[i], transaction_duplicate[i+1], transaction_duplicate[i+2], transaction_duplicate[i+3]); + t_duplicate |= transaction_duplicate[i]; + t_duplicate |= transaction_duplicate[i+1]; + t_duplicate |= transaction_duplicate[i+2]; + t_duplicate |= transaction_duplicate[i+3]; i += 4; break; case 3: @@ -753,17 +777,21 @@ TEH_PG_batch2_reserves_in_insert (void *cls, reserve_expiration, &transaction_duplicate[i], &conflicts[i], - &reserve_uuid[i]); + &reserve_uuid[i], + &results[i]); if (qs5<0) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to update reserves (%d)\n", + "Failed to update reserves 2 (%d)\n", qs5); return qs5; } need_update |= conflicts[i]; need_update |= conflicts[i+1]; - // fprintf(stdout, "%ld %ld\n", reserve_uuid[i], reserve_uuid[i+1]); + t_duplicate |= transaction_duplicate[i]; + t_duplicate |= transaction_duplicate[i+1]; + + //fprintf(stdout, "%ld %ld c:%d t:%d\n", reserve_uuid[i], reserve_uuid[i+1], conflicts[i], transaction_duplicate[i]); i += 2; break; case 1: @@ -776,14 +804,19 @@ TEH_PG_batch2_reserves_in_insert (void *cls, reserve_expiration, &transaction_duplicate[i], &conflicts[i], - &reserve_uuid[i]); + &reserve_uuid[i], + &results[i]); if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs2) { GNUNET_break (0); return GNUNET_DB_STATUS_HARD_ERROR; } need_update |= conflicts[i]; + t_duplicate |= transaction_duplicate[i]; + + // fprintf(stdout, "%ld c:%d t:%d\n", reserve_uuid[i], conflicts[i], transaction_duplicate[i]); i += 1; + break; case 0: GNUNET_assert (0); @@ -807,6 +840,8 @@ TEH_PG_batch2_reserves_in_insert (void *cls, { goto exit; } + if (t_duplicate) + goto exit; // begin serializable { if (GNUNET_OK != @@ -819,42 +854,52 @@ TEH_PG_batch2_reserves_in_insert (void *cls, } enum GNUNET_DB_QueryStatus qs3; - + PREPARE (pg, + "reserves_update", + "SELECT" + " out_duplicate AS duplicate " + "FROM exchange_do_batch_reserves_update" + " ($1,$2,$3,$4,$5,$6,$7,$8);"); for (unsigned int i = 0; iconn, - "reserves_update", - params); - if (qs3<0) + if (! conflicts[i]) + continue; + // fprintf(stdout, "%d\n", conflicts[i]); { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to update (%d)\n", - qs3); - return qs3; + bool duplicate; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (reserves[i].reserve_pub), + GNUNET_PQ_query_param_timestamp (&expiry), + GNUNET_PQ_query_param_uint64 (&reserves[i].wire_reference), + TALER_PQ_query_param_amount (reserves[i].balance), + GNUNET_PQ_query_param_string (reserves[i].exchange_account_name), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (notify_s[i]), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_bool ("duplicate", + &duplicate), + GNUNET_PQ_result_spec_end + }; + qs3 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "reserves_update", + params, + rs); + if (qs3<0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to update (%d)\n", + qs3); + results[i] = qs3; + return qs3; + } + results[i] = duplicate + ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS + : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } } - } + { enum GNUNET_DB_QueryStatus cs; diff --git a/src/exchangedb/pg_batch2_reserves_in_insert.h b/src/exchangedb/pg_batch2_reserves_in_insert.h index 491e789cd..3191f867e 100644 --- a/src/exchangedb/pg_batch2_reserves_in_insert.h +++ b/src/exchangedb/pg_batch2_reserves_in_insert.h @@ -16,7 +16,7 @@ /** * @file exchangedb/pg_batch2_reserves_in_insert.h * @brief implementation of the batch2_reserves_in_insert function for Postgres - * @author Christian Grothoff + * @author Joseph XU */ #ifndef PG_BATCH2_RESERVES_IN_INSERT_H #define PG_BATCH2_RESERVES_IN_INSERT_H diff --git a/src/exchangedb/pg_batch_reserves_in_insert.c b/src/exchangedb/pg_batch_reserves_in_insert.c index 14dc24e75..f1e4a9362 100644 --- a/src/exchangedb/pg_batch_reserves_in_insert.c +++ b/src/exchangedb/pg_batch_reserves_in_insert.c @@ -66,9 +66,6 @@ TEH_PG_batch_reserves_in_insert ( struct GNUNET_TIME_Timestamp gc; struct TALER_PaytoHashP h_payto; uint64_t reserve_uuid; - bool conflicted; - bool transaction_duplicate; - bool need_update = false; struct GNUNET_TIME_Timestamp reserve_expiration = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); bool conflicts[reserves_length]; @@ -100,15 +97,12 @@ TEH_PG_batch_reserves_in_insert ( GNUNET_STRINGS_relative_time_to_string ( pg->idle_reserve_expiration_time, GNUNET_NO)); - + if (GNUNET_OK != + TEH_PG_start_read_committed (pg, + "READ_COMMITED")) { - if (GNUNET_OK != - TEH_PG_start_read_committed (pg, - "READ_COMMITED")) - { - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; - } + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; } /* Optimistically assume this is a new reserve, create balance for the first time; we do this before adding the actual transaction to "reserves_in", @@ -119,9 +113,12 @@ TEH_PG_batch_reserves_in_insert ( const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i]; notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub); } - + bool need_update = false; for (unsigned int i = 0; iconn, "reserve_create", params, rs); - if (qs1 < 0) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to create reserves (%d)\n", qs1); + results[i] = qs1; return qs1; } GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1); - - conflicts[i] = conflicted; - // fprintf(stdout, "%d", conflicts[i]); - if (conflicts[i] && transaction_duplicate) - { - GNUNET_break (0); - TEH_PG_rollback (pg); - return GNUNET_DB_STATUS_HARD_ERROR; - } - need_update |= conflicted; + conflicts[i] = conflicted; + // fprintf(stdout, "%d", conflicts[i]); + if (conflicts[i] && transaction_duplicate) + { + GNUNET_break (0); + results[i] = GNUNET_DB_STATUS_HARD_ERROR; + TEH_PG_rollback (pg); + return GNUNET_DB_STATUS_HARD_ERROR; + } + results[i] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + need_update |= conflicted; } // commit { @@ -183,7 +179,6 @@ TEH_PG_batch_reserves_in_insert ( if (cs < 0) return cs; } - if (! need_update) goto exit; // begin serializable @@ -200,35 +195,46 @@ TEH_PG_batch_reserves_in_insert ( enum GNUNET_DB_QueryStatus qs2; PREPARE (pg, "reserves_in_add_transaction", - "SELECT exchange_do_batch_reserves_update" - " ($1,$2,$3,$4,$5,$6,$7,$8,$9);"); + "SELECT" + " out_duplicate AS duplicate" + " FROM exchange_do_batch_reserves_update" + " ($1,$2,$3,$4,$5,$6,$7,$8);"); for (unsigned int i = 0; iconn, - "reserves_in_add_transaction", - params); - if (qs2<0) + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_bool ("duplicate", + &duplicate), + GNUNET_PQ_result_spec_end + }; + qs2 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "reserves_in_add_transaction", + params, + rs); + if (qs2 < 0) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to update reserves (%d)\n", qs2); + results[i] = qs2; return qs2; } + results[i] = duplicate + ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS + : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } } { @@ -238,7 +244,6 @@ TEH_PG_batch_reserves_in_insert ( if (cs < 0) return cs; } - exit: for (unsigned int i = 0; ipub, - &bks, - p_ah[i], - &coin_pub, - &alg_values, - &c_hash, - &pd.blinded_planchet)); - GNUNET_assert (GNUNET_OK == - TALER_coin_ev_hash (&pd.blinded_planchet, - &cbc.denom_pub_hash, - &cbc.h_coin_envelope)); - if (i != 0) - TALER_blinded_denom_sig_free (&cbc.sig); - GNUNET_assert ( - GNUNET_OK == - TALER_denom_sign_blinded ( - &cbc.sig, - &dkp->priv, - false, - &pd.blinded_planchet)); - TALER_blinded_planchet_free (&pd.blinded_planchet); - } - } cbc.reserve_pub = reserve_pub; cbc.amount_with_fee = value; GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (CURRENCY, &cbc.withdraw_fee)); - for (unsigned int i=0; i< NUMBER_DEPOSIT; i++) - { - fprintf(stdout, "%d\n", i); + /* for (unsigned int i=0; i< NUMBER_DEPOSIT; i++) + {*/ + fprintf(stdout, "%d\n", i); + struct TALER_CoinSpendPublicKeyP coin_pub; + RND_BLK (&coin_pub); + { + struct TALER_PlanchetDetail pd; + + struct TALER_AgeCommitmentHash age_hash; + struct TALER_AgeCommitmentHash *p_ah[2] = { + NULL, + &age_hash + }; + + RND_BLK (&age_hash); + + for (size_t k = 0; k < sizeof(p_ah) / sizeof(p_ah[0]); k++) + { + fprintf(stdout, "OPEN\n"); + + GNUNET_assert (GNUNET_OK == + TALER_denom_blind (&dkp->pub, + &bks, + p_ah[k], + &coin_pub, + &alg_values, + &c_hash, + &pd.blinded_planchet)); + GNUNET_assert (GNUNET_OK == + TALER_coin_ev_hash (&pd.blinded_planchet, + &cbc.denom_pub_hash, + &cbc.h_coin_envelope)); + if (k != 0) + TALER_blinded_denom_sig_free (&cbc.sig); + GNUNET_assert ( + GNUNET_OK == + TALER_denom_sign_blinded ( + &cbc.sig, + &dkp->priv, + false, + &pd.blinded_planchet)); + TALER_blinded_planchet_free (&pd.blinded_planchet); + } + } + + depos[i].deposit_fee = fees.deposit; RND_BLK (&depos[i].coin.coin_pub); @@ -432,97 +436,97 @@ run (void *cls) &h_wire_wt); depos[i].timestamp = ts; result = 8; - { - uint64_t known_coin_id; - struct TALER_DenominationHashP dph; - struct TALER_AgeCommitmentHash agh; - FAILIF (TALER_EXCHANGEDB_CKS_ADDED != - plugin->ensure_coin_known (plugin->cls, - &depos[i].coin, - &known_coin_id, - &dph, - &agh)); - } - - /*wire + deposit for get_ready_deposit*/ - - /*STORE INTO DEPOSIT*/ - { - struct GNUNET_TIME_Timestamp now; - struct GNUNET_TIME_Timestamp r; - struct TALER_Amount deposit_fee; - struct TALER_MerchantWireHashP h_wire; - - now = GNUNET_TIME_timestamp_get (); - FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != - plugin->insert_deposit (plugin->cls, - now, - &depos[i])); - TALER_merchant_wire_signature_hash (depos[i].receiver_wire_account, - &depos[i].wire_salt, - &h_wire); - FAILIF (1 != - plugin->have_deposit2 (plugin->cls, - &depos[i].h_contract_terms, - &h_wire, - &depos[i].coin.coin_pub, - &depos[i].merchant_pub, - depos[i].refund_deadline, - &deposit_fee, - &r)); - FAILIF (GNUNET_TIME_timestamp_cmp (now, - !=, - r)); - } - { - struct GNUNET_TIME_Timestamp start_range; - struct GNUNET_TIME_Timestamp end_range; - - start_range = GNUNET_TIME_absolute_to_timestamp ( - GNUNET_TIME_absolute_subtract (deadline.abs_time, - GNUNET_TIME_UNIT_SECONDS)); - end_range = GNUNET_TIME_absolute_to_timestamp ( - GNUNET_TIME_absolute_add (deadline.abs_time, - GNUNET_TIME_UNIT_SECONDS)); - /*Aborted*/ - FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != - plugin->select_deposits_missing_wire (plugin->cls, - start_range, - end_range, - &wire_missing_cb, - &depos[i])); + { + uint64_t known_coin_id; + struct TALER_DenominationHashP dph; + struct TALER_AgeCommitmentHash agh; + FAILIF (TALER_EXCHANGEDB_CKS_ADDED != + plugin->ensure_coin_known (plugin->cls, + &depos[i].coin, + &known_coin_id, + &dph, + &agh)); + } + + /*wire + deposit for get_ready_deposit*/ + + /*STORE INTO DEPOSIT*/ + { + struct GNUNET_TIME_Timestamp now; + struct GNUNET_TIME_Timestamp r; + struct TALER_Amount deposit_fee; + struct TALER_MerchantWireHashP h_wire; + + now = GNUNET_TIME_timestamp_get (); + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + plugin->insert_deposit (plugin->cls, + now, + &depos[i])); + TALER_merchant_wire_signature_hash (depos[i].receiver_wire_account, + &depos[i].wire_salt, + &h_wire); + FAILIF (1 != + plugin->have_deposit2 (plugin->cls, + &depos[i].h_contract_terms, + &h_wire, + &depos[i].coin.coin_pub, + &depos[i].merchant_pub, + depos[i].refund_deadline, + &deposit_fee, + &r)); + FAILIF (GNUNET_TIME_timestamp_cmp (now, + !=, + r)); + } + { + struct GNUNET_TIME_Timestamp start_range; + struct GNUNET_TIME_Timestamp end_range; + + start_range = GNUNET_TIME_absolute_to_timestamp ( + GNUNET_TIME_absolute_subtract (deadline.abs_time, + GNUNET_TIME_UNIT_SECONDS)); + end_range = GNUNET_TIME_absolute_to_timestamp ( + GNUNET_TIME_absolute_add (deadline.abs_time, + GNUNET_TIME_UNIT_SECONDS)); + /*Aborted*/ + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + plugin->select_deposits_missing_wire (plugin->cls, + start_range, + end_range, + &wire_missing_cb, + &depos[i])); FAILIF (8 != result); - } - auditor_row_cnt = 0; - FAILIF (0 >= - plugin->select_deposits_above_serial_id (plugin->cls, - 0, - &audit_deposit_cb, - NULL)); - FAILIF (0 == auditor_row_cnt); - result = 8; - sleep (2); - /*CREATE DEPOSIT*/ - { - struct TALER_MerchantPublicKeyP merchant_pub2; - char *payto_uri2; - - FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != - plugin->get_ready_deposit (plugin->cls, - 0, - INT32_MAX, - &merchant_pub2, - &payto_uri2)); - FAILIF (0 != GNUNET_memcmp (&merchant_pub2, - &depos[i].merchant_pub)); - FAILIF (0 != strcmp (payto_uri2, - depos[i].receiver_wire_account)); - TALER_payto_hash (payto_uri2, - &wire_target_h_payto); - GNUNET_free (payto_uri2); - } - /* { + } + auditor_row_cnt = 0; + FAILIF (0 >= + plugin->select_deposits_above_serial_id (plugin->cls, + 0, + &audit_deposit_cb, + NULL)); + FAILIF (0 == auditor_row_cnt); + result = 8; + sleep (2); + /*CREATE DEPOSIT*/ + { + struct TALER_MerchantPublicKeyP merchant_pub2; + char *payto_uri2; + + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + plugin->get_ready_deposit (plugin->cls, + 0, + INT32_MAX, + &merchant_pub2, + &payto_uri2)); + FAILIF (0 != GNUNET_memcmp (&merchant_pub2, + &depos[i].merchant_pub)); + FAILIF (0 != strcmp (payto_uri2, + depos[i].receiver_wire_account)); + TALER_payto_hash (payto_uri2, + &wire_target_h_payto); + GNUNET_free (payto_uri2); + // } + /* { RND_BLK (&ref.details.merchant_pub); RND_BLK(&ref.details.merchant_sig); ref.details.h_contract_terms = depos.h_contract_terms; @@ -534,7 +538,7 @@ run (void *cls) plugin->insert_refund (plugin->cls, &ref)); }*/ - + } result = 0; @@ -544,8 +548,8 @@ drop: cleanup: if (NULL != dkp) destroy_denom_key_pair (dkp); - for (unsigned int i=0; i