From bac71237632d31487c075a29f9e95d95ee7909bd Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 18 May 2023 14:45:28 +0200 Subject: array-based do_reserves_in_insert --- src/exchangedb/exchange_do_reserves_in_insert.sql | 141 ++++++++++++++++++++++ src/exchangedb/pg_reserves_in_insert.c | 89 ++++++++++++-- 2 files changed, 222 insertions(+), 8 deletions(-) diff --git a/src/exchangedb/exchange_do_reserves_in_insert.sql b/src/exchangedb/exchange_do_reserves_in_insert.sql index dffcd8b55..bc1431ad5 100644 --- a/src/exchangedb/exchange_do_reserves_in_insert.sql +++ b/src/exchangedb/exchange_do_reserves_in_insert.sql @@ -963,3 +963,144 @@ BEGIN CLOSE curs_transaction_exist; RETURN; END $$; + + + + + + + + + + + + + +CREATE OR REPLACE FUNCTION exchange_do_array_reserves_insert( + IN in_gc_date INT8, + IN in_reserve_expiration INT8, + IN ina_reserve_pub BYTEA[], + IN ina_wire_ref INT8[], + IN ina_credit_val INT8[], + IN ina_credit_frac INT4[], + IN ina_exchange_account_name VARCHAR[], + IN ina_execution_date INT8[], + IN ina_wire_source_h_payto BYTEA[], + IN ina_payto_uri VARCHAR[], + IN ina_notify TEXT[], + OUT transaction_duplicate BOOLEAN, + OUT ruuid INT8) +LANGUAGE plpgsql +AS $$ +DECLARE + curs REFCURSOR; +DECLARE + conflict BOOL; +DECLARE + dup BOOL; +DECLARE + uuid INT8; +DECLARE + i RECORD; +BEGIN + + INSERT INTO wire_targets + (wire_target_h_payto + ,payto_uri) + SELECT + wire_source_h_payto + ,payto_uri + FROM + UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto + ,UNNEST (ina_payto_uri) AS payto_uri + ON CONFLICT DO NOTHING; + + OPEN curs FOR + WITH reserve_changes AS ( + SELECT + reserve_pub + ,wire_ref + ,credit_val + ,credit_frac + ,exchange_account_name + ,execution_date + ,wire_source_h_payto + ,payto_uri + ,notify + FROM + UNNEST (ina_reserve_pub) AS reserve_pub + ,UNNEST (ina_wire_ref) AS wire_ref + ,UNNEST (ina_credit_val) AS credit_val + ,UNNEST (ina_credit_frac) AS credit_frac + ,UNNEST (ina_exchange_account_name) AS exchange_account_name + ,UNNEST (ina_execution_date) AS execution_date + ,UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto + ,UNNEST (ina_notify) AS notify; + + + <> LOOP + FETCH FROM curs INTO i; + IF NOT FOUND + THEN + EXIT loop; + END IF; + + INSERT INTO reserves + (reserve_pub + ,current_balance_val + ,current_balance_frac + ,expiration_date + ,gc_date + ) VALUES ( + i.reserve_pub + ,i.credit_val + ,i.credit_frac + ,in_reserve_expiration + ,in_gc_date + ) + ON CONFLICT DO NOTHING + RETURNING reserve_uuid + INTO uuid; + conflict = NOT FOUND; + + INSERT INTO reserves_in + (reserve_pub + ,wire_reference + ,credit_val + ,credit_frac + ,exchange_account_section + ,wire_source_h_payto + ,execution_date + ) VALUES ( + i.reserve_pub + ,i.wire_reference + ,i.credit_val + ,i.credit_frac + ,i.exchange_account_section + ,i.wire_source_h_payto + ,i.execution_date + ON CONFLICT DO NOTHING; + + IF NOT FOUND + THEN + IF conflict + THEN + dup = TRUE; + else + dup = FALSE; + END IF; + ELSE + IF NOT conflict + THEN + EXECUTE FORMAT ( + 'NOTIFY %s' + ,i.notify); + END IF; + dup = FALSE; + END IF; + RETURN (dup,uuid); + END LOOP loop_reserve; + CLOSE curs; + + RETURN; +END $$; diff --git a/src/exchangedb/pg_reserves_in_insert.c b/src/exchangedb/pg_reserves_in_insert.c index 1b7e62d99..72fde7499 100644 --- a/src/exchangedb/pg_reserves_in_insert.c +++ b/src/exchangedb/pg_reserves_in_insert.c @@ -619,12 +619,36 @@ TEH_PG_reserves_in_insert ( #if 0 +/** + * Closure for our helper_cb() + */ struct Context { + /** + * Array of reserve UUIDs to initialize. + */ uint64_t *reserve_uuids; + + /** + * Array with entries set to 'true' for duplicate transactions. + */ bool *transaction_duplicates; + + /** + * Array with entries set to 'true' for rows with conflicts. + */ bool *conflicts; + + /** + * Set to #GNUNET_SYSERR on failures. + */ struct GNUNET_GenericReturnValue status; + + /** + * Single value (no array) set to true if we need + * to follow-up with an update. + */ + bool *needs_update; }; @@ -665,6 +689,7 @@ helper_cb (void *cls, ctx->status = GNUNET_SYSERR; return; } + *ctx->need_update |= ctx->conflicts[i]; } } @@ -685,7 +710,6 @@ TEH_PG_reserves_in_insertN ( const char *sender_account_details[GNUNET_NZL (reserves_length)]; const char *exchange_account_names[GNUNET_NZL (reserves_length)]; uint64_t wire_references[GNUNET_NZL (reserves_length)]; - uint64_t reserve_uuids[GNUNET_NZL (reserves_length)]; bool transaction_duplicates[GNUNET_NZL (reserves_length)]; bool conflicts[GNUNET_NZL (reserves_length)]; @@ -693,6 +717,7 @@ TEH_PG_reserves_in_insertN ( = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); struct GNUNET_TIME_Timestamp gc = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time); + bool needs_update = false; enum GNUNET_DB_QueryStatus qs; for (unsigned int i = 0; iexchange_account_name; wire_references[i] = reserve->wire_reference; } + + /* NOTE: kind-of pointless to explicitly start a transaction here... */ + if (GNUNET_OK != + TEH_PG_preflight (pg)) + { + GNUNET_break (0); + qs = GNUNET_DB_STATUS_HARD_ERROR; + goto finished; + } + + if (GNUNET_OK != + TEH_PG_start_read_committed (pg, + "READ_COMMITED")) + { + GNUNET_break (0); + qs = GNUNET_DB_STATUS_HARD_ERROR; + goto finished; + } + PREPARE (pg, "reserves_insert_with_array", "SELECT" @@ -752,6 +796,7 @@ TEH_PG_reserves_in_insertN ( .reserve_uuids = reserve_uuids, .transaction_duplicates = transaction_duplicates, .conflicts = conflicts, + .needs_update = &needs_update, .status = GNUNET_OK }; @@ -766,12 +811,42 @@ TEH_PG_reserves_in_insertN ( GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to insert into reserves (%d)\n", qs); - for (unsigned int i = 0; i