diff options
Diffstat (limited to 'src/exchangedb/pg_reserves_in_insert.c')
-rw-r--r-- | src/exchangedb/pg_reserves_in_insert.c | 1000 |
1 files changed, 811 insertions, 189 deletions
diff --git a/src/exchangedb/pg_reserves_in_insert.c b/src/exchangedb/pg_reserves_in_insert.c index 0fdc4a16c..314e89d8b 100644 --- a/src/exchangedb/pg_reserves_in_insert.c +++ b/src/exchangedb/pg_reserves_in_insert.c @@ -27,6 +27,8 @@ #include "pg_start.h" #include "pg_start_read_committed.h" #include "pg_commit.h" +#include "pg_preflight.h" +#include "pg_rollback.h" #include "pg_reserves_get.h" #include "pg_reserves_update.h" #include "pg_setup_wire_target.h" @@ -34,15 +36,13 @@ /** - * Generate event notification for the reserve - * change. + * Generate event notification for the reserve change. * - * @param pg plugin state * @param reserve_pub reserve to notfiy on + * @return string to pass to postgres for the notification */ -static void -notify_on_reserve (struct PostgresClosure *pg, - const struct TALER_ReservePublicKeyP *reserve_pub) +static char * +compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub) { struct TALER_ReserveEventP rep = { .header.size = htons (sizeof (rep)), @@ -50,246 +50,868 @@ notify_on_reserve (struct PostgresClosure *pg, .reserve_pub = *reserve_pub }; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Notifying on reserve!\n"); - TEH_PG_event_notify (pg, - &rep.header, - NULL, - 0); + return GNUNET_PG_get_event_notify_channel (&rep.header); +} + + +static enum GNUNET_DB_QueryStatus +insert1 (struct PostgresClosure *pg, + const struct TALER_EXCHANGEDB_ReserveInInfo reserves[1], + struct GNUNET_TIME_Timestamp expiry, + struct GNUNET_TIME_Timestamp gc, + struct TALER_PaytoHashP h_payto, + char *const *notify_s, + struct GNUNET_TIME_Timestamp reserve_expiration, + bool *transaction_duplicate, + bool *conflict, + uint64_t *reserve_uuid, + enum GNUNET_DB_QueryStatus results[1]) +{ + enum GNUNET_DB_QueryStatus qs2; + PREPARE (pg, + "batch1_reserve_create", + "SELECT " + " out_reserve_found AS conflicted" + ",transaction_duplicate" + ",ruuid AS reserve_uuid" + " FROM exchange_do_batch_reserves_in_insert" + " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12);"); + + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (reserves[0].reserve_pub), + GNUNET_PQ_query_param_timestamp (&expiry), + GNUNET_PQ_query_param_timestamp (&gc), + GNUNET_PQ_query_param_uint64 (&reserves[0].wire_reference), + TALER_PQ_query_param_amount (reserves[0].balance), + GNUNET_PQ_query_param_string (reserves[0].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[0].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[0].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + GNUNET_PQ_query_param_string (notify_s[0]), + GNUNET_PQ_query_param_end + }; + + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_bool ("conflicted", + &conflict[0]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate", + &transaction_duplicate[0]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid", + &reserve_uuid[0]), + GNUNET_PQ_result_spec_end + }; + + + TALER_payto_hash (reserves[0].sender_account_details, + &h_payto); + + qs2 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "batch1_reserve_create", + params, + rs); + + if (qs2 < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to create reserves 1(%d)\n", + qs2); + results[0] = qs2; + return qs2; + } + GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs2); + if ((! conflict[0]) && transaction_duplicate[0]) + { + GNUNET_break (0); + TEH_PG_rollback (pg); + results[0] = GNUNET_DB_STATUS_HARD_ERROR; + return GNUNET_DB_STATUS_HARD_ERROR; + } + results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + return qs2; +} + + +static enum GNUNET_DB_QueryStatus +insert2 (struct PostgresClosure *pg, + const struct TALER_EXCHANGEDB_ReserveInInfo reserves[2], + struct GNUNET_TIME_Timestamp expiry, + struct GNUNET_TIME_Timestamp gc, + struct TALER_PaytoHashP h_payto, + char *const*notify_s, + struct GNUNET_TIME_Timestamp reserve_expiration, + bool *transaction_duplicate, + bool *conflict, + uint64_t *reserve_uuid, + enum GNUNET_DB_QueryStatus results[1]) +{ + enum GNUNET_DB_QueryStatus qs1; + PREPARE (pg, + "batch2_reserve_create", + "SELECT " + "out_reserve_found AS conflicted" + ",out_reserve_found2 AS conflicted2" + ",transaction_duplicate" + ",transaction_duplicate2" + ",ruuid AS reserve_uuid" + ",ruuid2 AS reserve_uuid2" + " FROM exchange_do_batch2_reserves_insert" + " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22);"); + + struct GNUNET_PQ_QueryParam params[] = { + + GNUNET_PQ_query_param_auto_from_type (reserves[0].reserve_pub), + GNUNET_PQ_query_param_timestamp (&expiry), + GNUNET_PQ_query_param_timestamp (&gc), + GNUNET_PQ_query_param_uint64 (&reserves[0].wire_reference), + TALER_PQ_query_param_amount (reserves[0].balance), + GNUNET_PQ_query_param_string (reserves[0].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[0].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[0].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + GNUNET_PQ_query_param_string (notify_s[0]), + GNUNET_PQ_query_param_string (notify_s[1]), + + GNUNET_PQ_query_param_auto_from_type (reserves[1].reserve_pub), + GNUNET_PQ_query_param_uint64 (&reserves[1].wire_reference), + TALER_PQ_query_param_amount (reserves[1].balance), + GNUNET_PQ_query_param_string (reserves[1].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[1].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[1].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_bool ("conflicted", + &conflict[0]), + GNUNET_PQ_result_spec_bool ("conflicted2", + &conflict[1]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate", + &transaction_duplicate[0]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate2", + &transaction_duplicate[1]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid", + &reserve_uuid[0]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid2", + &reserve_uuid[1]), + GNUNET_PQ_result_spec_end + }; + + TALER_payto_hash (reserves[0].sender_account_details, + &h_payto); + TALER_payto_hash (reserves[1].sender_account_details, + &h_payto); + + + qs1 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "batch2_reserve_create", + params, + rs); + if (qs1 < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to create reserves 2(%d)\n", + qs1); + results[0] = qs1; + return qs1; + } + + GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1); + /* results[i] = (transaction_duplicate) + ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS + : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;*/ + + if ( + ((! conflict[0]) && (transaction_duplicate[0])) + || ((! conflict[1]) && (transaction_duplicate[1])) + ) + { + GNUNET_break (0); + TEH_PG_rollback (pg); // ROLLBACK + results[0] = GNUNET_DB_STATUS_HARD_ERROR; + return GNUNET_DB_STATUS_HARD_ERROR; + } + results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + return qs1; +} + + +static enum GNUNET_DB_QueryStatus +insert4 (struct PostgresClosure *pg, + const struct TALER_EXCHANGEDB_ReserveInInfo reserves[4], + struct GNUNET_TIME_Timestamp expiry, + struct GNUNET_TIME_Timestamp gc, + struct TALER_PaytoHashP h_payto, + char *const*notify_s, + struct GNUNET_TIME_Timestamp reserve_expiration, + bool *transaction_duplicate, + bool *conflict, + uint64_t *reserve_uuid, + enum GNUNET_DB_QueryStatus results[1]) +{ + enum GNUNET_DB_QueryStatus qs3; + PREPARE (pg, + "batch4_reserve_create", + "SELECT " + "out_reserve_found AS conflicted" + ",out_reserve_found2 AS conflicted2" + ",out_reserve_found3 AS conflicted3" + ",out_reserve_found4 AS conflicted4" + ",transaction_duplicate" + ",transaction_duplicate2" + ",transaction_duplicate3" + ",transaction_duplicate4" + ",ruuid AS reserve_uuid" + ",ruuid2 AS reserve_uuid2" + ",ruuid3 AS reserve_uuid3" + ",ruuid4 AS reserve_uuid4" + " FROM exchange_do_batch4_reserves_insert" + " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37,$38,$39, $40, $41,$42);"); + + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (reserves[0].reserve_pub), + GNUNET_PQ_query_param_timestamp (&expiry), + GNUNET_PQ_query_param_timestamp (&gc), + GNUNET_PQ_query_param_uint64 (&reserves[0].wire_reference), + TALER_PQ_query_param_amount (reserves[0].balance), + GNUNET_PQ_query_param_string (reserves[0].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[0].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[0].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + GNUNET_PQ_query_param_string (notify_s[0]), + GNUNET_PQ_query_param_string (notify_s[1]), + GNUNET_PQ_query_param_string (notify_s[2]), + GNUNET_PQ_query_param_string (notify_s[3]), + + GNUNET_PQ_query_param_auto_from_type (reserves[1].reserve_pub), + GNUNET_PQ_query_param_uint64 (&reserves[1].wire_reference), + TALER_PQ_query_param_amount (reserves[1].balance), + GNUNET_PQ_query_param_string (reserves[1].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[1].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[1].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + + GNUNET_PQ_query_param_auto_from_type (reserves[2].reserve_pub), + GNUNET_PQ_query_param_uint64 (&reserves[2].wire_reference), + TALER_PQ_query_param_amount (reserves[2].balance), + GNUNET_PQ_query_param_string (reserves[2].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[2].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[2].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + + GNUNET_PQ_query_param_auto_from_type (reserves[3].reserve_pub), + GNUNET_PQ_query_param_uint64 (&reserves[3].wire_reference), + TALER_PQ_query_param_amount (reserves[3].balance), + GNUNET_PQ_query_param_string (reserves[3].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[3].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[3].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + + GNUNET_PQ_query_param_end + }; + + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_bool ("conflicted", + &conflict[0]), + GNUNET_PQ_result_spec_bool ("conflicted2", + &conflict[1]), + GNUNET_PQ_result_spec_bool ("conflicted3", + &conflict[2]), + GNUNET_PQ_result_spec_bool ("conflicted4", + &conflict[3]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate", + &transaction_duplicate[0]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate2", + &transaction_duplicate[1]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate3", + &transaction_duplicate[2]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate4", + &transaction_duplicate[3]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid", + &reserve_uuid[0]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid2", + &reserve_uuid[1]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid3", + &reserve_uuid[2]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid4", + &reserve_uuid[3]), + GNUNET_PQ_result_spec_end + }; + + TALER_payto_hash (reserves[0].sender_account_details, + &h_payto); + TALER_payto_hash (reserves[1].sender_account_details, + &h_payto); + TALER_payto_hash (reserves[2].sender_account_details, + &h_payto); + TALER_payto_hash (reserves[3].sender_account_details, + &h_payto); + + qs3 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "batch4_reserve_create", + params, + rs); + if (qs3 < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to create reserves4 (%d)\n", + qs3); + results[0] = qs3; + return qs3; + } + + GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs3); + + if ( + ((! conflict[0]) && (transaction_duplicate[0])) + || ((! conflict[1]) && (transaction_duplicate[1])) + || ((! conflict[2]) && (transaction_duplicate[2])) + || ((! conflict[3]) && (transaction_duplicate[3])) + ) + { + GNUNET_break (0); + TEH_PG_rollback (pg); + results[0] = GNUNET_DB_STATUS_HARD_ERROR; + return GNUNET_DB_STATUS_HARD_ERROR; + } + results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + return qs3; +} + + +static enum GNUNET_DB_QueryStatus +insert8 (struct PostgresClosure *pg, + const struct TALER_EXCHANGEDB_ReserveInInfo reserves[8], + struct GNUNET_TIME_Timestamp expiry, + struct GNUNET_TIME_Timestamp gc, + struct TALER_PaytoHashP h_payto, + char *const*notify_s, + struct GNUNET_TIME_Timestamp reserve_expiration, + bool *transaction_duplicate, + bool *conflict, + uint64_t *reserve_uuid, + enum GNUNET_DB_QueryStatus results[1]) +{ + enum GNUNET_DB_QueryStatus qs3; + PREPARE (pg, + "batch8_reserve_create", + "SELECT " + "out_reserve_found AS conflicted" + ",out_reserve_found2 AS conflicted2" + ",out_reserve_found3 AS conflicted3" + ",out_reserve_found4 AS conflicted4" + ",out_reserve_found5 AS conflicted5" + ",out_reserve_found6 AS conflicted6" + ",out_reserve_found7 AS conflicted7" + ",out_reserve_found8 AS conflicted8" + ",transaction_duplicate" + ",transaction_duplicate2" + ",transaction_duplicate3" + ",transaction_duplicate4" + ",transaction_duplicate5" + ",transaction_duplicate6" + ",transaction_duplicate7" + ",transaction_duplicate8" + ",ruuid AS reserve_uuid" + ",ruuid2 AS reserve_uuid2" + ",ruuid3 AS reserve_uuid3" + ",ruuid4 AS reserve_uuid4" + ",ruuid5 AS reserve_uuid5" + ",ruuid6 AS reserve_uuid6" + ",ruuid7 AS reserve_uuid7" + ",ruuid8 AS reserve_uuid8" + " FROM exchange_do_batch8_reserves_insert" + " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37,$38,$39, $40, $41,$42,$43,$44,$45,$46,$47,$48,$49,$50,$51,$52,$53,$54,$55,$56,$57,$58,$59,$60,$61,$62,$63,$64,$65,$66,$67,$68,$69,$70,$71,$72,$73,$74,$75,$76,$77,$78,$79,$80,$81,$82);"); + + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (reserves[0].reserve_pub), + GNUNET_PQ_query_param_timestamp (&expiry), + GNUNET_PQ_query_param_timestamp (&gc), + GNUNET_PQ_query_param_uint64 (&reserves[0].wire_reference), + TALER_PQ_query_param_amount (reserves[0].balance), + GNUNET_PQ_query_param_string (reserves[0].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[0].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[0].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + GNUNET_PQ_query_param_string (notify_s[0]), + GNUNET_PQ_query_param_string (notify_s[1]), + GNUNET_PQ_query_param_string (notify_s[2]), + GNUNET_PQ_query_param_string (notify_s[3]), + GNUNET_PQ_query_param_string (notify_s[4]), + GNUNET_PQ_query_param_string (notify_s[5]), + GNUNET_PQ_query_param_string (notify_s[6]), + GNUNET_PQ_query_param_string (notify_s[7]), + + GNUNET_PQ_query_param_auto_from_type (reserves[1].reserve_pub), + GNUNET_PQ_query_param_uint64 (&reserves[1].wire_reference), + TALER_PQ_query_param_amount (reserves[1].balance), + GNUNET_PQ_query_param_string (reserves[1].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[1].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[1].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + + GNUNET_PQ_query_param_auto_from_type (reserves[2].reserve_pub), + GNUNET_PQ_query_param_uint64 (&reserves[2].wire_reference), + TALER_PQ_query_param_amount (reserves[2].balance), + GNUNET_PQ_query_param_string (reserves[2].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[2].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[2].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + + GNUNET_PQ_query_param_auto_from_type (reserves[3].reserve_pub), + GNUNET_PQ_query_param_uint64 (&reserves[3].wire_reference), + TALER_PQ_query_param_amount (reserves[3].balance), + GNUNET_PQ_query_param_string (reserves[3].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[3].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[3].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + + GNUNET_PQ_query_param_auto_from_type (reserves[4].reserve_pub), + GNUNET_PQ_query_param_uint64 (&reserves[4].wire_reference), + TALER_PQ_query_param_amount (reserves[4].balance), + GNUNET_PQ_query_param_string (reserves[4].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[4].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[4].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + + GNUNET_PQ_query_param_auto_from_type (reserves[5].reserve_pub), + GNUNET_PQ_query_param_uint64 (&reserves[5].wire_reference), + TALER_PQ_query_param_amount (reserves[5].balance), + GNUNET_PQ_query_param_string (reserves[5].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[5].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[5].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + + GNUNET_PQ_query_param_auto_from_type (reserves[6].reserve_pub), + GNUNET_PQ_query_param_uint64 (&reserves[6].wire_reference), + TALER_PQ_query_param_amount (reserves[6].balance), + GNUNET_PQ_query_param_string (reserves[6].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[6].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[6].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + + GNUNET_PQ_query_param_auto_from_type (reserves[7].reserve_pub), + GNUNET_PQ_query_param_uint64 (&reserves[7].wire_reference), + TALER_PQ_query_param_amount (reserves[7].balance), + GNUNET_PQ_query_param_string (reserves[7].exchange_account_name), + GNUNET_PQ_query_param_timestamp (&reserves[7].execution_time), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (reserves[7].sender_account_details), + GNUNET_PQ_query_param_timestamp (&reserve_expiration), + + GNUNET_PQ_query_param_end + }; + + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_bool ("conflicted", + &conflict[0]), + GNUNET_PQ_result_spec_bool ("conflicted2", + &conflict[1]), + GNUNET_PQ_result_spec_bool ("conflicted3", + &conflict[2]), + GNUNET_PQ_result_spec_bool ("conflicted4", + &conflict[3]), + GNUNET_PQ_result_spec_bool ("conflicted5", + &conflict[4]), + GNUNET_PQ_result_spec_bool ("conflicted6", + &conflict[5]), + GNUNET_PQ_result_spec_bool ("conflicted7", + &conflict[6]), + GNUNET_PQ_result_spec_bool ("conflicted8", + &conflict[7]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate", + &transaction_duplicate[0]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate2", + &transaction_duplicate[1]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate3", + &transaction_duplicate[2]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate4", + &transaction_duplicate[3]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate5", + &transaction_duplicate[4]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate6", + &transaction_duplicate[5]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate7", + &transaction_duplicate[6]), + GNUNET_PQ_result_spec_bool ("transaction_duplicate8", + &transaction_duplicate[7]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid", + &reserve_uuid[0]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid2", + &reserve_uuid[1]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid3", + &reserve_uuid[2]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid4", + &reserve_uuid[3]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid5", + &reserve_uuid[4]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid6", + &reserve_uuid[5]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid7", + &reserve_uuid[6]), + GNUNET_PQ_result_spec_uint64 ("reserve_uuid8", + &reserve_uuid[7]), + GNUNET_PQ_result_spec_end + }; + + TALER_payto_hash (reserves[0].sender_account_details, + &h_payto); + TALER_payto_hash (reserves[1].sender_account_details, + &h_payto); + TALER_payto_hash (reserves[2].sender_account_details, + &h_payto); + TALER_payto_hash (reserves[3].sender_account_details, + &h_payto); + TALER_payto_hash (reserves[4].sender_account_details, + &h_payto); + TALER_payto_hash (reserves[5].sender_account_details, + &h_payto); + TALER_payto_hash (reserves[6].sender_account_details, + &h_payto); + TALER_payto_hash (reserves[7].sender_account_details, + &h_payto); + + qs3 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "batch8_reserve_create", + params, + rs); + if (qs3 < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to create reserves8 (%d)\n", + qs3); + results[0] = qs3; + return qs3; + } + + GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs3); + /* results[i] = (transaction_duplicate) + ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS + : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;*/ + + if ( + ((! conflict[0]) && (transaction_duplicate[0])) + || ((! conflict[1]) && (transaction_duplicate[1])) + || ((! conflict[2]) && (transaction_duplicate[2])) + || ((! conflict[3]) && (transaction_duplicate[3])) + || ((! conflict[4]) && (transaction_duplicate[4])) + || ((! conflict[5]) && (transaction_duplicate[5])) + || ((! conflict[6]) && (transaction_duplicate[6])) + || ((! conflict[7]) && (transaction_duplicate[7])) + ) + { + GNUNET_break (0); + TEH_PG_rollback (pg); + results[0] = GNUNET_DB_STATUS_HARD_ERROR; + return GNUNET_DB_STATUS_HARD_ERROR; + } + results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + return qs3; } enum GNUNET_DB_QueryStatus TEH_PG_reserves_in_insert (void *cls, - const struct TALER_ReservePublicKeyP *reserve_pub, - const struct TALER_Amount *balance, - struct GNUNET_TIME_Timestamp execution_time, - const char *sender_account_details, - const char *exchange_account_section, - uint64_t wire_ref) + const struct + TALER_EXCHANGEDB_ReserveInInfo *reserves, + unsigned int reserves_length, + unsigned int batch_size, + enum GNUNET_DB_QueryStatus *results) { struct PostgresClosure *pg = cls; enum GNUNET_DB_QueryStatus qs1; - struct TALER_EXCHANGEDB_Reserve reserve; + enum GNUNET_DB_QueryStatus qs2; + enum GNUNET_DB_QueryStatus qs4; + enum GNUNET_DB_QueryStatus qs5; struct GNUNET_TIME_Timestamp expiry; struct GNUNET_TIME_Timestamp gc; - uint64_t reserve_uuid; + struct TALER_PaytoHashP h_payto; + uint64_t reserve_uuid[reserves_length]; + bool transaction_duplicate[reserves_length]; + bool need_update = false; + bool t_duplicate = false; + struct GNUNET_TIME_Timestamp reserve_expiration + = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); + bool conflicts[reserves_length]; + char *notify_s[reserves_length]; + + if (GNUNET_OK != + TEH_PG_preflight (pg)) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } - reserve.pub = *reserve_pub; expiry = GNUNET_TIME_absolute_to_timestamp ( - GNUNET_TIME_absolute_add (execution_time.abs_time, + GNUNET_TIME_absolute_add (reserves->execution_time.abs_time, pg->idle_reserve_expiration_time)); gc = GNUNET_TIME_absolute_to_timestamp ( GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), pg->legal_reserve_expiration_time)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating reserve %s with expiration in %s\n", - TALER_B2S (reserve_pub), + TALER_B2S (&(reserves->reserve_pub)), GNUNET_STRINGS_relative_time_to_string ( pg->idle_reserve_expiration_time, GNUNET_NO)); + + if (GNUNET_OK != + TEH_PG_start_read_committed (pg, + "READ_COMMITED")) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + /* Optimistically assume this is a new reserve, create balance for the first time; we do this before adding the actual transaction to "reserves_in", as for a new reserve it can't be a duplicate 'add' operation, and as the 'add' operation needs the reserve entry as a foreign key. */ + for (unsigned int i = 0; i<reserves_length; i++) { - struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_auto_from_type (reserve_pub), - TALER_PQ_query_param_amount (balance), - GNUNET_PQ_query_param_timestamp (&expiry), - GNUNET_PQ_query_param_timestamp (&gc), - GNUNET_PQ_query_param_end - }; - struct GNUNET_PQ_ResultSpec rs[] = { - GNUNET_PQ_result_spec_uint64 ("reserve_uuid", - &reserve_uuid), - GNUNET_PQ_result_spec_end - }; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Reserve does not exist; creating a new one\n"); - /* Note: query uses 'on conflict do nothing' */ - PREPARE (pg, - "reserve_create", - "INSERT INTO reserves " - "(reserve_pub" - ",current_balance_val" - ",current_balance_frac" - ",expiration_date" - ",gc_date" - ") VALUES " - "($1, $2, $3, $4, $5)" - " ON CONFLICT DO NOTHING" - " RETURNING reserve_uuid;"); - qs1 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, - "reserve_create", - params, - rs); - if (qs1 < 0) - return qs1; + const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i]; + + notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub); } - /* Create new incoming transaction, "ON CONFLICT DO NOTHING" - is again used to guard against duplicates. */ + unsigned int i = 0; + + while (i < reserves_length) { - enum GNUNET_DB_QueryStatus qs2; - enum GNUNET_DB_QueryStatus qs3; - struct TALER_PaytoHashP h_payto; - - qs3 = TEH_PG_setup_wire_target (pg, - sender_account_details, - &h_payto); - if (qs3 < 0) - return qs3; - /* We do not have the UUID, so insert by public key */ - struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_auto_from_type (&reserve.pub), - GNUNET_PQ_query_param_uint64 (&wire_ref), - TALER_PQ_query_param_amount (balance), - GNUNET_PQ_query_param_string (exchange_account_section), - GNUNET_PQ_query_param_auto_from_type (&h_payto), - GNUNET_PQ_query_param_timestamp (&execution_time), - GNUNET_PQ_query_param_end - }; - - PREPARE (pg, - "reserves_in_add_transaction", - "INSERT INTO reserves_in " - "(reserve_pub" - ",wire_reference" - ",credit_val" - ",credit_frac" - ",exchange_account_section" - ",wire_source_h_payto" - ",execution_date" - ") VALUES ($1, $2, $3, $4, $5, $6, $7)" - " ON CONFLICT DO NOTHING;"); - qs2 = GNUNET_PQ_eval_prepared_non_select (pg->conn, - "reserves_in_add_transaction", - params); - /* qs2 could be 0 as statement used 'ON CONFLICT DO NOTHING' */ - if (0 >= qs2) + unsigned int bs = GNUNET_MIN (batch_size, + reserves_length - i); + if (bs >= 8) { - if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs2) && - (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1) ) + qs1 = insert8 (pg, + &reserves[i], + expiry, + gc, + h_payto, + ¬ify_s[i], + reserve_expiration, + &transaction_duplicate[i], + &conflicts[i], + &reserve_uuid[i], + &results[i]); + + if (qs1<0) { - /* Conflict for the transaction, but the reserve was - just now created, that should be impossible. */ - GNUNET_break (0); /* should be impossible: reserve was fresh, - but transaction already known */ - return GNUNET_DB_STATUS_HARD_ERROR; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to create reserve batch_8 (%d)\n", + qs1); + return qs1; } - /* Transaction was already known or error. We are finished. */ - return qs2; + need_update |= conflicts[i]; + need_update |= conflicts[i + 1]; + need_update |= conflicts[i + 2]; + need_update |= conflicts[i + 3]; + need_update |= conflicts[i + 4]; + need_update |= conflicts[i + 5]; + need_update |= conflicts[i + 6]; + need_update |= conflicts[i + 7]; + t_duplicate |= transaction_duplicate[i]; + t_duplicate |= transaction_duplicate[i + 1]; + t_duplicate |= transaction_duplicate[i + 2]; + t_duplicate |= transaction_duplicate[i + 3]; + t_duplicate |= transaction_duplicate[i + 4]; + t_duplicate |= transaction_duplicate[i + 5]; + t_duplicate |= transaction_duplicate[i + 6]; + t_duplicate |= transaction_duplicate[i + 7]; + i += 8; + continue; } - } - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs1) - { - /* New reserve, we are finished */ - notify_on_reserve (pg, - reserve_pub); - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } + switch (bs) + { + case 7: + case 6: + case 5: + case 4: + qs4 = insert4 (pg, + &reserves[i], + expiry, + gc, + h_payto, + ¬ify_s[i], + reserve_expiration, + &transaction_duplicate[i], + &conflicts[i], + &reserve_uuid[i], + &results[i]); - /* we were wrong with our optimistic assumption: - reserve did already exist, need to do an update instead */ + if (qs4<0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to create reserve batch_4 (%d)\n", + qs4); + return qs4; + } + need_update |= conflicts[i]; + need_update |= conflicts[i + 1]; + need_update |= conflicts[i + 2]; + need_update |= conflicts[i + 3]; + t_duplicate |= transaction_duplicate[i]; + t_duplicate |= transaction_duplicate[i + 1]; + t_duplicate |= transaction_duplicate[i + 2]; + t_duplicate |= transaction_duplicate[i + 3]; + // fprintf(stdout, "reserve_uuid : %ld %ld %ld %ld\n", reserve_uuid[i], reserve_uuid[i+1], reserve_uuid[i+2], reserve_uuid[i+3]); + i += 4; + break; + case 3: + case 2: + qs5 = insert2 (pg, + &reserves[i], + expiry, + gc, + h_payto, + ¬ify_s[i], + reserve_expiration, + &transaction_duplicate[i], + &conflicts[i], + &reserve_uuid[i], + &results[i]); + if (qs5<0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to create reserve batch_2 (%d)\n", + qs5); + return qs5; + } + need_update |= conflicts[i]; + need_update |= conflicts[i + 1]; + t_duplicate |= transaction_duplicate[i]; + t_duplicate |= transaction_duplicate[i + 1]; + results[i] = (t_duplicate) + ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS + : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + // fprintf(stdout, "reserve_uuid : %ld %ld\n", reserve_uuid[i], reserve_uuid[i+1]); + i += 2; + break; + case 1: + qs2 = insert1 (pg, + &reserves[i], + expiry, + gc, + h_payto, + ¬ify_s[i], + reserve_expiration, + &transaction_duplicate[i], + &conflicts[i], + &reserve_uuid[i], + &results[i]); + if (qs2<0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to create reserve batch_1 (%d)\n)" + ,qs2); + return qs2; + } + need_update |= conflicts[i]; + t_duplicate |= transaction_duplicate[i]; + // fprintf(stdout, "reserve uuid : %ld c :%d t:%d\n", reserve_uuid[i], conflicts[i], transaction_duplicate[i]); + i += 1; + break; + case 0: + GNUNET_assert (0); + break; + } + } /* end while */ + // commit { - /* We need to move away from 'read committed' to serializable. - Also, we know that it should be safe to commit at this point. - (We are only run in a larger transaction for performance.) */ enum GNUNET_DB_QueryStatus cs; cs = TEH_PG_commit (pg); if (cs < 0) - return cs; - if (GNUNET_OK != - TEH_PG_start (pg, - "reserve-update-serializable")) { - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to commit\n"); + return cs; } } + if (! need_update) { - enum GNUNET_DB_QueryStatus reserve_exists; - - reserve_exists = TEH_PG_reserves_get (pg, - &reserve); - switch (reserve_exists) + goto exit; + } + // begin serializable + { + if (GNUNET_OK != + TEH_PG_start (pg, + "reserve-insert-continued")) { - case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); - return reserve_exists; - case GNUNET_DB_STATUS_SOFT_ERROR: - return reserve_exists; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* First we got a conflict, but then we cannot select? Very strange. */ - GNUNET_break (0); - return GNUNET_DB_STATUS_SOFT_ERROR; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* continued below */ - break; + return GNUNET_DB_STATUS_HARD_ERROR; } } + enum GNUNET_DB_QueryStatus qs3; + PREPARE (pg, + "reserves_update", + "SELECT" + " out_duplicate AS duplicate " + "FROM exchange_do_batch_reserves_update" + " ($1,$2,$3,$4,$5,$6,$7,$8);"); + for (unsigned int i = 0; i<reserves_length; i++) { - struct TALER_EXCHANGEDB_Reserve updated_reserve; - enum GNUNET_DB_QueryStatus qs3; - - /* If the reserve already existed, we need to still update the - balance; we do this after checking for duplication, as - otherwise we might have to actually pay the cost to roll this - back for duplicate transactions; like this, we should virtually - never actually have to rollback anything. */ - updated_reserve.pub = reserve.pub; - if (0 > - TALER_amount_add (&updated_reserve.balance, - &reserve.balance, - balance)) + if (! conflicts[i]) + continue; { - /* currency overflow or incompatible currency */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Attempt to deposit incompatible amount into reserve\n"); - return GNUNET_DB_STATUS_HARD_ERROR; - } - updated_reserve.expiry = GNUNET_TIME_timestamp_max (expiry, - reserve.expiry); - updated_reserve.gc = GNUNET_TIME_timestamp_max (gc, - reserve.gc); - qs3 = TEH_PG_reserves_update (pg, - &updated_reserve); - switch (qs3) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - return qs3; - case GNUNET_DB_STATUS_SOFT_ERROR: - return qs3; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* How can the UPDATE not work here? Very strange. */ - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* continued below */ - break; + bool duplicate; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (reserves[i].reserve_pub), + GNUNET_PQ_query_param_timestamp (&expiry), + GNUNET_PQ_query_param_uint64 (&reserves[i].wire_reference), + TALER_PQ_query_param_amount (reserves[i].balance), + GNUNET_PQ_query_param_string (reserves[i].exchange_account_name), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_string (notify_s[i]), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_bool ("duplicate", + &duplicate), + GNUNET_PQ_result_spec_end + }; + qs3 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "reserves_update", + params, + rs); + if (qs3<0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to update reserves (%d)\n", + qs3); + results[i] = qs3; + return qs3; + } + results[i] = duplicate + ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS + : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } } - notify_on_reserve (pg, - reserve_pub); - /* Go back to original transaction mode */ + { enum GNUNET_DB_QueryStatus cs; cs = TEH_PG_commit (pg); if (cs < 0) return cs; - if (GNUNET_OK != - TEH_PG_start_read_committed (pg, - "reserve-insert-continued")) - { - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; - } } - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + +exit: + for (unsigned int i = 0; i<reserves_length; i++) + GNUNET_free (notify_s[i]); + + return reserves_length; } |