aboutsummaryrefslogtreecommitdiff
path: root/src/exchangedb/pg_reserves_in_insert.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchangedb/pg_reserves_in_insert.c')
-rw-r--r--src/exchangedb/pg_reserves_in_insert.c1000
1 files changed, 811 insertions, 189 deletions
diff --git a/src/exchangedb/pg_reserves_in_insert.c b/src/exchangedb/pg_reserves_in_insert.c
index 0fdc4a16c..314e89d8b 100644
--- a/src/exchangedb/pg_reserves_in_insert.c
+++ b/src/exchangedb/pg_reserves_in_insert.c
@@ -27,6 +27,8 @@
#include "pg_start.h"
#include "pg_start_read_committed.h"
#include "pg_commit.h"
+#include "pg_preflight.h"
+#include "pg_rollback.h"
#include "pg_reserves_get.h"
#include "pg_reserves_update.h"
#include "pg_setup_wire_target.h"
@@ -34,15 +36,13 @@
/**
- * Generate event notification for the reserve
- * change.
+ * Generate event notification for the reserve change.
*
- * @param pg plugin state
* @param reserve_pub reserve to notfiy on
+ * @return string to pass to postgres for the notification
*/
-static void
-notify_on_reserve (struct PostgresClosure *pg,
- const struct TALER_ReservePublicKeyP *reserve_pub)
+static char *
+compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
{
struct TALER_ReserveEventP rep = {
.header.size = htons (sizeof (rep)),
@@ -50,246 +50,868 @@ notify_on_reserve (struct PostgresClosure *pg,
.reserve_pub = *reserve_pub
};
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Notifying on reserve!\n");
- TEH_PG_event_notify (pg,
- &rep.header,
- NULL,
- 0);
+ return GNUNET_PG_get_event_notify_channel (&rep.header);
+}
+
+
+static enum GNUNET_DB_QueryStatus
+insert1 (struct PostgresClosure *pg,
+ const struct TALER_EXCHANGEDB_ReserveInInfo reserves[1],
+ struct GNUNET_TIME_Timestamp expiry,
+ struct GNUNET_TIME_Timestamp gc,
+ struct TALER_PaytoHashP h_payto,
+ char *const *notify_s,
+ struct GNUNET_TIME_Timestamp reserve_expiration,
+ bool *transaction_duplicate,
+ bool *conflict,
+ uint64_t *reserve_uuid,
+ enum GNUNET_DB_QueryStatus results[1])
+{
+ enum GNUNET_DB_QueryStatus qs2;
+ PREPARE (pg,
+ "batch1_reserve_create",
+ "SELECT "
+ " out_reserve_found AS conflicted"
+ ",transaction_duplicate"
+ ",ruuid AS reserve_uuid"
+ " FROM exchange_do_batch_reserves_in_insert"
+ " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12);");
+
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (reserves[0].reserve_pub),
+ GNUNET_PQ_query_param_timestamp (&expiry),
+ GNUNET_PQ_query_param_timestamp (&gc),
+ GNUNET_PQ_query_param_uint64 (&reserves[0].wire_reference),
+ TALER_PQ_query_param_amount (reserves[0].balance),
+ GNUNET_PQ_query_param_string (reserves[0].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[0].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[0].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+ GNUNET_PQ_query_param_string (notify_s[0]),
+ GNUNET_PQ_query_param_end
+ };
+
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_bool ("conflicted",
+ &conflict[0]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate",
+ &transaction_duplicate[0]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
+ &reserve_uuid[0]),
+ GNUNET_PQ_result_spec_end
+ };
+
+
+ TALER_payto_hash (reserves[0].sender_account_details,
+ &h_payto);
+
+ qs2 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "batch1_reserve_create",
+ params,
+ rs);
+
+ if (qs2 < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to create reserves 1(%d)\n",
+ qs2);
+ results[0] = qs2;
+ return qs2;
+ }
+ GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs2);
+ if ((! conflict[0]) && transaction_duplicate[0])
+ {
+ GNUNET_break (0);
+ TEH_PG_rollback (pg);
+ results[0] = GNUNET_DB_STATUS_HARD_ERROR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ return qs2;
+}
+
+
+static enum GNUNET_DB_QueryStatus
+insert2 (struct PostgresClosure *pg,
+ const struct TALER_EXCHANGEDB_ReserveInInfo reserves[2],
+ struct GNUNET_TIME_Timestamp expiry,
+ struct GNUNET_TIME_Timestamp gc,
+ struct TALER_PaytoHashP h_payto,
+ char *const*notify_s,
+ struct GNUNET_TIME_Timestamp reserve_expiration,
+ bool *transaction_duplicate,
+ bool *conflict,
+ uint64_t *reserve_uuid,
+ enum GNUNET_DB_QueryStatus results[1])
+{
+ enum GNUNET_DB_QueryStatus qs1;
+ PREPARE (pg,
+ "batch2_reserve_create",
+ "SELECT "
+ "out_reserve_found AS conflicted"
+ ",out_reserve_found2 AS conflicted2"
+ ",transaction_duplicate"
+ ",transaction_duplicate2"
+ ",ruuid AS reserve_uuid"
+ ",ruuid2 AS reserve_uuid2"
+ " FROM exchange_do_batch2_reserves_insert"
+ " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22);");
+
+ struct GNUNET_PQ_QueryParam params[] = {
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[0].reserve_pub),
+ GNUNET_PQ_query_param_timestamp (&expiry),
+ GNUNET_PQ_query_param_timestamp (&gc),
+ GNUNET_PQ_query_param_uint64 (&reserves[0].wire_reference),
+ TALER_PQ_query_param_amount (reserves[0].balance),
+ GNUNET_PQ_query_param_string (reserves[0].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[0].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[0].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+ GNUNET_PQ_query_param_string (notify_s[0]),
+ GNUNET_PQ_query_param_string (notify_s[1]),
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[1].reserve_pub),
+ GNUNET_PQ_query_param_uint64 (&reserves[1].wire_reference),
+ TALER_PQ_query_param_amount (reserves[1].balance),
+ GNUNET_PQ_query_param_string (reserves[1].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[1].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[1].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_bool ("conflicted",
+ &conflict[0]),
+ GNUNET_PQ_result_spec_bool ("conflicted2",
+ &conflict[1]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate",
+ &transaction_duplicate[0]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate2",
+ &transaction_duplicate[1]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
+ &reserve_uuid[0]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid2",
+ &reserve_uuid[1]),
+ GNUNET_PQ_result_spec_end
+ };
+
+ TALER_payto_hash (reserves[0].sender_account_details,
+ &h_payto);
+ TALER_payto_hash (reserves[1].sender_account_details,
+ &h_payto);
+
+
+ qs1 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "batch2_reserve_create",
+ params,
+ rs);
+ if (qs1 < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to create reserves 2(%d)\n",
+ qs1);
+ results[0] = qs1;
+ return qs1;
+ }
+
+ GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1);
+ /* results[i] = (transaction_duplicate)
+ ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
+ : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;*/
+
+ if (
+ ((! conflict[0]) && (transaction_duplicate[0]))
+ || ((! conflict[1]) && (transaction_duplicate[1]))
+ )
+ {
+ GNUNET_break (0);
+ TEH_PG_rollback (pg); // ROLLBACK
+ results[0] = GNUNET_DB_STATUS_HARD_ERROR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ return qs1;
+}
+
+
+static enum GNUNET_DB_QueryStatus
+insert4 (struct PostgresClosure *pg,
+ const struct TALER_EXCHANGEDB_ReserveInInfo reserves[4],
+ struct GNUNET_TIME_Timestamp expiry,
+ struct GNUNET_TIME_Timestamp gc,
+ struct TALER_PaytoHashP h_payto,
+ char *const*notify_s,
+ struct GNUNET_TIME_Timestamp reserve_expiration,
+ bool *transaction_duplicate,
+ bool *conflict,
+ uint64_t *reserve_uuid,
+ enum GNUNET_DB_QueryStatus results[1])
+{
+ enum GNUNET_DB_QueryStatus qs3;
+ PREPARE (pg,
+ "batch4_reserve_create",
+ "SELECT "
+ "out_reserve_found AS conflicted"
+ ",out_reserve_found2 AS conflicted2"
+ ",out_reserve_found3 AS conflicted3"
+ ",out_reserve_found4 AS conflicted4"
+ ",transaction_duplicate"
+ ",transaction_duplicate2"
+ ",transaction_duplicate3"
+ ",transaction_duplicate4"
+ ",ruuid AS reserve_uuid"
+ ",ruuid2 AS reserve_uuid2"
+ ",ruuid3 AS reserve_uuid3"
+ ",ruuid4 AS reserve_uuid4"
+ " FROM exchange_do_batch4_reserves_insert"
+ " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37,$38,$39, $40, $41,$42);");
+
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (reserves[0].reserve_pub),
+ GNUNET_PQ_query_param_timestamp (&expiry),
+ GNUNET_PQ_query_param_timestamp (&gc),
+ GNUNET_PQ_query_param_uint64 (&reserves[0].wire_reference),
+ TALER_PQ_query_param_amount (reserves[0].balance),
+ GNUNET_PQ_query_param_string (reserves[0].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[0].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[0].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+ GNUNET_PQ_query_param_string (notify_s[0]),
+ GNUNET_PQ_query_param_string (notify_s[1]),
+ GNUNET_PQ_query_param_string (notify_s[2]),
+ GNUNET_PQ_query_param_string (notify_s[3]),
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[1].reserve_pub),
+ GNUNET_PQ_query_param_uint64 (&reserves[1].wire_reference),
+ TALER_PQ_query_param_amount (reserves[1].balance),
+ GNUNET_PQ_query_param_string (reserves[1].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[1].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[1].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[2].reserve_pub),
+ GNUNET_PQ_query_param_uint64 (&reserves[2].wire_reference),
+ TALER_PQ_query_param_amount (reserves[2].balance),
+ GNUNET_PQ_query_param_string (reserves[2].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[2].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[2].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[3].reserve_pub),
+ GNUNET_PQ_query_param_uint64 (&reserves[3].wire_reference),
+ TALER_PQ_query_param_amount (reserves[3].balance),
+ GNUNET_PQ_query_param_string (reserves[3].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[3].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[3].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+
+ GNUNET_PQ_query_param_end
+ };
+
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_bool ("conflicted",
+ &conflict[0]),
+ GNUNET_PQ_result_spec_bool ("conflicted2",
+ &conflict[1]),
+ GNUNET_PQ_result_spec_bool ("conflicted3",
+ &conflict[2]),
+ GNUNET_PQ_result_spec_bool ("conflicted4",
+ &conflict[3]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate",
+ &transaction_duplicate[0]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate2",
+ &transaction_duplicate[1]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate3",
+ &transaction_duplicate[2]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate4",
+ &transaction_duplicate[3]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
+ &reserve_uuid[0]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid2",
+ &reserve_uuid[1]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid3",
+ &reserve_uuid[2]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid4",
+ &reserve_uuid[3]),
+ GNUNET_PQ_result_spec_end
+ };
+
+ TALER_payto_hash (reserves[0].sender_account_details,
+ &h_payto);
+ TALER_payto_hash (reserves[1].sender_account_details,
+ &h_payto);
+ TALER_payto_hash (reserves[2].sender_account_details,
+ &h_payto);
+ TALER_payto_hash (reserves[3].sender_account_details,
+ &h_payto);
+
+ qs3 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "batch4_reserve_create",
+ params,
+ rs);
+ if (qs3 < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to create reserves4 (%d)\n",
+ qs3);
+ results[0] = qs3;
+ return qs3;
+ }
+
+ GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs3);
+
+ if (
+ ((! conflict[0]) && (transaction_duplicate[0]))
+ || ((! conflict[1]) && (transaction_duplicate[1]))
+ || ((! conflict[2]) && (transaction_duplicate[2]))
+ || ((! conflict[3]) && (transaction_duplicate[3]))
+ )
+ {
+ GNUNET_break (0);
+ TEH_PG_rollback (pg);
+ results[0] = GNUNET_DB_STATUS_HARD_ERROR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ return qs3;
+}
+
+
+static enum GNUNET_DB_QueryStatus
+insert8 (struct PostgresClosure *pg,
+ const struct TALER_EXCHANGEDB_ReserveInInfo reserves[8],
+ struct GNUNET_TIME_Timestamp expiry,
+ struct GNUNET_TIME_Timestamp gc,
+ struct TALER_PaytoHashP h_payto,
+ char *const*notify_s,
+ struct GNUNET_TIME_Timestamp reserve_expiration,
+ bool *transaction_duplicate,
+ bool *conflict,
+ uint64_t *reserve_uuid,
+ enum GNUNET_DB_QueryStatus results[1])
+{
+ enum GNUNET_DB_QueryStatus qs3;
+ PREPARE (pg,
+ "batch8_reserve_create",
+ "SELECT "
+ "out_reserve_found AS conflicted"
+ ",out_reserve_found2 AS conflicted2"
+ ",out_reserve_found3 AS conflicted3"
+ ",out_reserve_found4 AS conflicted4"
+ ",out_reserve_found5 AS conflicted5"
+ ",out_reserve_found6 AS conflicted6"
+ ",out_reserve_found7 AS conflicted7"
+ ",out_reserve_found8 AS conflicted8"
+ ",transaction_duplicate"
+ ",transaction_duplicate2"
+ ",transaction_duplicate3"
+ ",transaction_duplicate4"
+ ",transaction_duplicate5"
+ ",transaction_duplicate6"
+ ",transaction_duplicate7"
+ ",transaction_duplicate8"
+ ",ruuid AS reserve_uuid"
+ ",ruuid2 AS reserve_uuid2"
+ ",ruuid3 AS reserve_uuid3"
+ ",ruuid4 AS reserve_uuid4"
+ ",ruuid5 AS reserve_uuid5"
+ ",ruuid6 AS reserve_uuid6"
+ ",ruuid7 AS reserve_uuid7"
+ ",ruuid8 AS reserve_uuid8"
+ " FROM exchange_do_batch8_reserves_insert"
+ " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18,$19,$20,$21,$22,$23,$24,$25,$26,$27,$28,$29,$30,$31,$32,$33,$34,$35,$36,$37,$38,$39, $40, $41,$42,$43,$44,$45,$46,$47,$48,$49,$50,$51,$52,$53,$54,$55,$56,$57,$58,$59,$60,$61,$62,$63,$64,$65,$66,$67,$68,$69,$70,$71,$72,$73,$74,$75,$76,$77,$78,$79,$80,$81,$82);");
+
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (reserves[0].reserve_pub),
+ GNUNET_PQ_query_param_timestamp (&expiry),
+ GNUNET_PQ_query_param_timestamp (&gc),
+ GNUNET_PQ_query_param_uint64 (&reserves[0].wire_reference),
+ TALER_PQ_query_param_amount (reserves[0].balance),
+ GNUNET_PQ_query_param_string (reserves[0].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[0].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[0].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+ GNUNET_PQ_query_param_string (notify_s[0]),
+ GNUNET_PQ_query_param_string (notify_s[1]),
+ GNUNET_PQ_query_param_string (notify_s[2]),
+ GNUNET_PQ_query_param_string (notify_s[3]),
+ GNUNET_PQ_query_param_string (notify_s[4]),
+ GNUNET_PQ_query_param_string (notify_s[5]),
+ GNUNET_PQ_query_param_string (notify_s[6]),
+ GNUNET_PQ_query_param_string (notify_s[7]),
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[1].reserve_pub),
+ GNUNET_PQ_query_param_uint64 (&reserves[1].wire_reference),
+ TALER_PQ_query_param_amount (reserves[1].balance),
+ GNUNET_PQ_query_param_string (reserves[1].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[1].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[1].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[2].reserve_pub),
+ GNUNET_PQ_query_param_uint64 (&reserves[2].wire_reference),
+ TALER_PQ_query_param_amount (reserves[2].balance),
+ GNUNET_PQ_query_param_string (reserves[2].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[2].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[2].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[3].reserve_pub),
+ GNUNET_PQ_query_param_uint64 (&reserves[3].wire_reference),
+ TALER_PQ_query_param_amount (reserves[3].balance),
+ GNUNET_PQ_query_param_string (reserves[3].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[3].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[3].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[4].reserve_pub),
+ GNUNET_PQ_query_param_uint64 (&reserves[4].wire_reference),
+ TALER_PQ_query_param_amount (reserves[4].balance),
+ GNUNET_PQ_query_param_string (reserves[4].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[4].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[4].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[5].reserve_pub),
+ GNUNET_PQ_query_param_uint64 (&reserves[5].wire_reference),
+ TALER_PQ_query_param_amount (reserves[5].balance),
+ GNUNET_PQ_query_param_string (reserves[5].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[5].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[5].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[6].reserve_pub),
+ GNUNET_PQ_query_param_uint64 (&reserves[6].wire_reference),
+ TALER_PQ_query_param_amount (reserves[6].balance),
+ GNUNET_PQ_query_param_string (reserves[6].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[6].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[6].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+
+ GNUNET_PQ_query_param_auto_from_type (reserves[7].reserve_pub),
+ GNUNET_PQ_query_param_uint64 (&reserves[7].wire_reference),
+ TALER_PQ_query_param_amount (reserves[7].balance),
+ GNUNET_PQ_query_param_string (reserves[7].exchange_account_name),
+ GNUNET_PQ_query_param_timestamp (&reserves[7].execution_time),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (reserves[7].sender_account_details),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+
+ GNUNET_PQ_query_param_end
+ };
+
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_bool ("conflicted",
+ &conflict[0]),
+ GNUNET_PQ_result_spec_bool ("conflicted2",
+ &conflict[1]),
+ GNUNET_PQ_result_spec_bool ("conflicted3",
+ &conflict[2]),
+ GNUNET_PQ_result_spec_bool ("conflicted4",
+ &conflict[3]),
+ GNUNET_PQ_result_spec_bool ("conflicted5",
+ &conflict[4]),
+ GNUNET_PQ_result_spec_bool ("conflicted6",
+ &conflict[5]),
+ GNUNET_PQ_result_spec_bool ("conflicted7",
+ &conflict[6]),
+ GNUNET_PQ_result_spec_bool ("conflicted8",
+ &conflict[7]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate",
+ &transaction_duplicate[0]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate2",
+ &transaction_duplicate[1]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate3",
+ &transaction_duplicate[2]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate4",
+ &transaction_duplicate[3]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate5",
+ &transaction_duplicate[4]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate6",
+ &transaction_duplicate[5]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate7",
+ &transaction_duplicate[6]),
+ GNUNET_PQ_result_spec_bool ("transaction_duplicate8",
+ &transaction_duplicate[7]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
+ &reserve_uuid[0]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid2",
+ &reserve_uuid[1]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid3",
+ &reserve_uuid[2]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid4",
+ &reserve_uuid[3]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid5",
+ &reserve_uuid[4]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid6",
+ &reserve_uuid[5]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid7",
+ &reserve_uuid[6]),
+ GNUNET_PQ_result_spec_uint64 ("reserve_uuid8",
+ &reserve_uuid[7]),
+ GNUNET_PQ_result_spec_end
+ };
+
+ TALER_payto_hash (reserves[0].sender_account_details,
+ &h_payto);
+ TALER_payto_hash (reserves[1].sender_account_details,
+ &h_payto);
+ TALER_payto_hash (reserves[2].sender_account_details,
+ &h_payto);
+ TALER_payto_hash (reserves[3].sender_account_details,
+ &h_payto);
+ TALER_payto_hash (reserves[4].sender_account_details,
+ &h_payto);
+ TALER_payto_hash (reserves[5].sender_account_details,
+ &h_payto);
+ TALER_payto_hash (reserves[6].sender_account_details,
+ &h_payto);
+ TALER_payto_hash (reserves[7].sender_account_details,
+ &h_payto);
+
+ qs3 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "batch8_reserve_create",
+ params,
+ rs);
+ if (qs3 < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to create reserves8 (%d)\n",
+ qs3);
+ results[0] = qs3;
+ return qs3;
+ }
+
+ GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs3);
+ /* results[i] = (transaction_duplicate)
+ ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
+ : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;*/
+
+ if (
+ ((! conflict[0]) && (transaction_duplicate[0]))
+ || ((! conflict[1]) && (transaction_duplicate[1]))
+ || ((! conflict[2]) && (transaction_duplicate[2]))
+ || ((! conflict[3]) && (transaction_duplicate[3]))
+ || ((! conflict[4]) && (transaction_duplicate[4]))
+ || ((! conflict[5]) && (transaction_duplicate[5]))
+ || ((! conflict[6]) && (transaction_duplicate[6]))
+ || ((! conflict[7]) && (transaction_duplicate[7]))
+ )
+ {
+ GNUNET_break (0);
+ TEH_PG_rollback (pg);
+ results[0] = GNUNET_DB_STATUS_HARD_ERROR;
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ results[0] = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ return qs3;
}
enum GNUNET_DB_QueryStatus
TEH_PG_reserves_in_insert (void *cls,
- const struct TALER_ReservePublicKeyP *reserve_pub,
- const struct TALER_Amount *balance,
- struct GNUNET_TIME_Timestamp execution_time,
- const char *sender_account_details,
- const char *exchange_account_section,
- uint64_t wire_ref)
+ const struct
+ TALER_EXCHANGEDB_ReserveInInfo *reserves,
+ unsigned int reserves_length,
+ unsigned int batch_size,
+ enum GNUNET_DB_QueryStatus *results)
{
struct PostgresClosure *pg = cls;
enum GNUNET_DB_QueryStatus qs1;
- struct TALER_EXCHANGEDB_Reserve reserve;
+ enum GNUNET_DB_QueryStatus qs2;
+ enum GNUNET_DB_QueryStatus qs4;
+ enum GNUNET_DB_QueryStatus qs5;
struct GNUNET_TIME_Timestamp expiry;
struct GNUNET_TIME_Timestamp gc;
- uint64_t reserve_uuid;
+ struct TALER_PaytoHashP h_payto;
+ uint64_t reserve_uuid[reserves_length];
+ bool transaction_duplicate[reserves_length];
+ bool need_update = false;
+ bool t_duplicate = false;
+ struct GNUNET_TIME_Timestamp reserve_expiration
+ = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
+ bool conflicts[reserves_length];
+ char *notify_s[reserves_length];
+
+ if (GNUNET_OK !=
+ TEH_PG_preflight (pg))
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
- reserve.pub = *reserve_pub;
expiry = GNUNET_TIME_absolute_to_timestamp (
- GNUNET_TIME_absolute_add (execution_time.abs_time,
+ GNUNET_TIME_absolute_add (reserves->execution_time.abs_time,
pg->idle_reserve_expiration_time));
gc = GNUNET_TIME_absolute_to_timestamp (
GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (),
pg->legal_reserve_expiration_time));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Creating reserve %s with expiration in %s\n",
- TALER_B2S (reserve_pub),
+ TALER_B2S (&(reserves->reserve_pub)),
GNUNET_STRINGS_relative_time_to_string (
pg->idle_reserve_expiration_time,
GNUNET_NO));
+
+ if (GNUNET_OK !=
+ TEH_PG_start_read_committed (pg,
+ "READ_COMMITED"))
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+
/* Optimistically assume this is a new reserve, create balance for the first
time; we do this before adding the actual transaction to "reserves_in",
as for a new reserve it can't be a duplicate 'add' operation, and as
the 'add' operation needs the reserve entry as a foreign key. */
+ for (unsigned int i = 0; i<reserves_length; i++)
{
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (reserve_pub),
- TALER_PQ_query_param_amount (balance),
- GNUNET_PQ_query_param_timestamp (&expiry),
- GNUNET_PQ_query_param_timestamp (&gc),
- GNUNET_PQ_query_param_end
- };
- struct GNUNET_PQ_ResultSpec rs[] = {
- GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
- &reserve_uuid),
- GNUNET_PQ_result_spec_end
- };
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Reserve does not exist; creating a new one\n");
- /* Note: query uses 'on conflict do nothing' */
- PREPARE (pg,
- "reserve_create",
- "INSERT INTO reserves "
- "(reserve_pub"
- ",current_balance_val"
- ",current_balance_frac"
- ",expiration_date"
- ",gc_date"
- ") VALUES "
- "($1, $2, $3, $4, $5)"
- " ON CONFLICT DO NOTHING"
- " RETURNING reserve_uuid;");
- qs1 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
- "reserve_create",
- params,
- rs);
- if (qs1 < 0)
- return qs1;
+ const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
+
+ notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
}
- /* Create new incoming transaction, "ON CONFLICT DO NOTHING"
- is again used to guard against duplicates. */
+ unsigned int i = 0;
+
+ while (i < reserves_length)
{
- enum GNUNET_DB_QueryStatus qs2;
- enum GNUNET_DB_QueryStatus qs3;
- struct TALER_PaytoHashP h_payto;
-
- qs3 = TEH_PG_setup_wire_target (pg,
- sender_account_details,
- &h_payto);
- if (qs3 < 0)
- return qs3;
- /* We do not have the UUID, so insert by public key */
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (&reserve.pub),
- GNUNET_PQ_query_param_uint64 (&wire_ref),
- TALER_PQ_query_param_amount (balance),
- GNUNET_PQ_query_param_string (exchange_account_section),
- GNUNET_PQ_query_param_auto_from_type (&h_payto),
- GNUNET_PQ_query_param_timestamp (&execution_time),
- GNUNET_PQ_query_param_end
- };
-
- PREPARE (pg,
- "reserves_in_add_transaction",
- "INSERT INTO reserves_in "
- "(reserve_pub"
- ",wire_reference"
- ",credit_val"
- ",credit_frac"
- ",exchange_account_section"
- ",wire_source_h_payto"
- ",execution_date"
- ") VALUES ($1, $2, $3, $4, $5, $6, $7)"
- " ON CONFLICT DO NOTHING;");
- qs2 = GNUNET_PQ_eval_prepared_non_select (pg->conn,
- "reserves_in_add_transaction",
- params);
- /* qs2 could be 0 as statement used 'ON CONFLICT DO NOTHING' */
- if (0 >= qs2)
+ unsigned int bs = GNUNET_MIN (batch_size,
+ reserves_length - i);
+ if (bs >= 8)
{
- if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs2) &&
- (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1) )
+ qs1 = insert8 (pg,
+ &reserves[i],
+ expiry,
+ gc,
+ h_payto,
+ &notify_s[i],
+ reserve_expiration,
+ &transaction_duplicate[i],
+ &conflicts[i],
+ &reserve_uuid[i],
+ &results[i]);
+
+ if (qs1<0)
{
- /* Conflict for the transaction, but the reserve was
- just now created, that should be impossible. */
- GNUNET_break (0); /* should be impossible: reserve was fresh,
- but transaction already known */
- return GNUNET_DB_STATUS_HARD_ERROR;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to create reserve batch_8 (%d)\n",
+ qs1);
+ return qs1;
}
- /* Transaction was already known or error. We are finished. */
- return qs2;
+ need_update |= conflicts[i];
+ need_update |= conflicts[i + 1];
+ need_update |= conflicts[i + 2];
+ need_update |= conflicts[i + 3];
+ need_update |= conflicts[i + 4];
+ need_update |= conflicts[i + 5];
+ need_update |= conflicts[i + 6];
+ need_update |= conflicts[i + 7];
+ t_duplicate |= transaction_duplicate[i];
+ t_duplicate |= transaction_duplicate[i + 1];
+ t_duplicate |= transaction_duplicate[i + 2];
+ t_duplicate |= transaction_duplicate[i + 3];
+ t_duplicate |= transaction_duplicate[i + 4];
+ t_duplicate |= transaction_duplicate[i + 5];
+ t_duplicate |= transaction_duplicate[i + 6];
+ t_duplicate |= transaction_duplicate[i + 7];
+ i += 8;
+ continue;
}
- }
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs1)
- {
- /* New reserve, we are finished */
- notify_on_reserve (pg,
- reserve_pub);
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
- }
+ switch (bs)
+ {
+ case 7:
+ case 6:
+ case 5:
+ case 4:
+ qs4 = insert4 (pg,
+ &reserves[i],
+ expiry,
+ gc,
+ h_payto,
+ &notify_s[i],
+ reserve_expiration,
+ &transaction_duplicate[i],
+ &conflicts[i],
+ &reserve_uuid[i],
+ &results[i]);
- /* we were wrong with our optimistic assumption:
- reserve did already exist, need to do an update instead */
+ if (qs4<0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to create reserve batch_4 (%d)\n",
+ qs4);
+ return qs4;
+ }
+ need_update |= conflicts[i];
+ need_update |= conflicts[i + 1];
+ need_update |= conflicts[i + 2];
+ need_update |= conflicts[i + 3];
+ t_duplicate |= transaction_duplicate[i];
+ t_duplicate |= transaction_duplicate[i + 1];
+ t_duplicate |= transaction_duplicate[i + 2];
+ t_duplicate |= transaction_duplicate[i + 3];
+ // fprintf(stdout, "reserve_uuid : %ld %ld %ld %ld\n", reserve_uuid[i], reserve_uuid[i+1], reserve_uuid[i+2], reserve_uuid[i+3]);
+ i += 4;
+ break;
+ case 3:
+ case 2:
+ qs5 = insert2 (pg,
+ &reserves[i],
+ expiry,
+ gc,
+ h_payto,
+ &notify_s[i],
+ reserve_expiration,
+ &transaction_duplicate[i],
+ &conflicts[i],
+ &reserve_uuid[i],
+ &results[i]);
+ if (qs5<0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to create reserve batch_2 (%d)\n",
+ qs5);
+ return qs5;
+ }
+ need_update |= conflicts[i];
+ need_update |= conflicts[i + 1];
+ t_duplicate |= transaction_duplicate[i];
+ t_duplicate |= transaction_duplicate[i + 1];
+ results[i] = (t_duplicate)
+ ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
+ : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ // fprintf(stdout, "reserve_uuid : %ld %ld\n", reserve_uuid[i], reserve_uuid[i+1]);
+ i += 2;
+ break;
+ case 1:
+ qs2 = insert1 (pg,
+ &reserves[i],
+ expiry,
+ gc,
+ h_payto,
+ &notify_s[i],
+ reserve_expiration,
+ &transaction_duplicate[i],
+ &conflicts[i],
+ &reserve_uuid[i],
+ &results[i]);
+ if (qs2<0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to create reserve batch_1 (%d)\n)"
+ ,qs2);
+ return qs2;
+ }
+ need_update |= conflicts[i];
+ t_duplicate |= transaction_duplicate[i];
+ // fprintf(stdout, "reserve uuid : %ld c :%d t:%d\n", reserve_uuid[i], conflicts[i], transaction_duplicate[i]);
+ i += 1;
+ break;
+ case 0:
+ GNUNET_assert (0);
+ break;
+ }
+ } /* end while */
+ // commit
{
- /* We need to move away from 'read committed' to serializable.
- Also, we know that it should be safe to commit at this point.
- (We are only run in a larger transaction for performance.) */
enum GNUNET_DB_QueryStatus cs;
cs = TEH_PG_commit (pg);
if (cs < 0)
- return cs;
- if (GNUNET_OK !=
- TEH_PG_start (pg,
- "reserve-update-serializable"))
{
- GNUNET_break (0);
- return GNUNET_DB_STATUS_HARD_ERROR;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to commit\n");
+ return cs;
}
}
+ if (! need_update)
{
- enum GNUNET_DB_QueryStatus reserve_exists;
-
- reserve_exists = TEH_PG_reserves_get (pg,
- &reserve);
- switch (reserve_exists)
+ goto exit;
+ }
+ // begin serializable
+ {
+ if (GNUNET_OK !=
+ TEH_PG_start (pg,
+ "reserve-insert-continued"))
{
- case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
- return reserve_exists;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- return reserve_exists;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- /* First we got a conflict, but then we cannot select? Very strange. */
- GNUNET_break (0);
- return GNUNET_DB_STATUS_SOFT_ERROR;
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- /* continued below */
- break;
+ return GNUNET_DB_STATUS_HARD_ERROR;
}
}
+ enum GNUNET_DB_QueryStatus qs3;
+ PREPARE (pg,
+ "reserves_update",
+ "SELECT"
+ " out_duplicate AS duplicate "
+ "FROM exchange_do_batch_reserves_update"
+ " ($1,$2,$3,$4,$5,$6,$7,$8);");
+ for (unsigned int i = 0; i<reserves_length; i++)
{
- struct TALER_EXCHANGEDB_Reserve updated_reserve;
- enum GNUNET_DB_QueryStatus qs3;
-
- /* If the reserve already existed, we need to still update the
- balance; we do this after checking for duplication, as
- otherwise we might have to actually pay the cost to roll this
- back for duplicate transactions; like this, we should virtually
- never actually have to rollback anything. */
- updated_reserve.pub = reserve.pub;
- if (0 >
- TALER_amount_add (&updated_reserve.balance,
- &reserve.balance,
- balance))
+ if (! conflicts[i])
+ continue;
{
- /* currency overflow or incompatible currency */
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Attempt to deposit incompatible amount into reserve\n");
- return GNUNET_DB_STATUS_HARD_ERROR;
- }
- updated_reserve.expiry = GNUNET_TIME_timestamp_max (expiry,
- reserve.expiry);
- updated_reserve.gc = GNUNET_TIME_timestamp_max (gc,
- reserve.gc);
- qs3 = TEH_PG_reserves_update (pg,
- &updated_reserve);
- switch (qs3)
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- return qs3;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- return qs3;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- /* How can the UPDATE not work here? Very strange. */
- GNUNET_break (0);
- return GNUNET_DB_STATUS_HARD_ERROR;
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- /* continued below */
- break;
+ bool duplicate;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (reserves[i].reserve_pub),
+ GNUNET_PQ_query_param_timestamp (&expiry),
+ GNUNET_PQ_query_param_uint64 (&reserves[i].wire_reference),
+ TALER_PQ_query_param_amount (reserves[i].balance),
+ GNUNET_PQ_query_param_string (reserves[i].exchange_account_name),
+ GNUNET_PQ_query_param_auto_from_type (&h_payto),
+ GNUNET_PQ_query_param_string (notify_s[i]),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_bool ("duplicate",
+ &duplicate),
+ GNUNET_PQ_result_spec_end
+ };
+ qs3 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "reserves_update",
+ params,
+ rs);
+ if (qs3<0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to update reserves (%d)\n",
+ qs3);
+ results[i] = qs3;
+ return qs3;
+ }
+ results[i] = duplicate
+ ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
+ : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
}
- notify_on_reserve (pg,
- reserve_pub);
- /* Go back to original transaction mode */
+
{
enum GNUNET_DB_QueryStatus cs;
cs = TEH_PG_commit (pg);
if (cs < 0)
return cs;
- if (GNUNET_OK !=
- TEH_PG_start_read_committed (pg,
- "reserve-insert-continued"))
- {
- GNUNET_break (0);
- return GNUNET_DB_STATUS_HARD_ERROR;
- }
}
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+
+exit:
+ for (unsigned int i = 0; i<reserves_length; i++)
+ GNUNET_free (notify_s[i]);
+
+ return reserves_length;
}