From cb87b6f646888bf62af31e8b23bb642de9a57344 Mon Sep 17 00:00:00 2001 From: Joseph Date: Mon, 27 Mar 2023 07:23:27 -0400 Subject: New spi files --- src/exchangedb/exchange_do_batch_coin_known.sql | 477 ++++++++++++++ src/exchangedb/exchange_get_ready_deposit.sql | 60 ++ src/exchangedb/spi/README.md | 41 ++ src/exchangedb/spi/own_test.bc | Bin 0 -> 22876 bytes src/exchangedb/spi/own_test.c | 818 ++++++++++++++++++++++++ src/exchangedb/spi/own_test.control | 4 + src/exchangedb/spi/own_test.so | Bin 0 -> 76824 bytes src/exchangedb/spi/own_test.sql | 216 +++++++ src/exchangedb/spi/perf_own_test.c | 25 + src/exchangedb/spi/pg_aggregate.c | 389 +++++++++++ 10 files changed, 2030 insertions(+) create mode 100644 src/exchangedb/exchange_do_batch_coin_known.sql create mode 100644 src/exchangedb/exchange_get_ready_deposit.sql create mode 100644 src/exchangedb/spi/README.md create mode 100644 src/exchangedb/spi/own_test.bc create mode 100644 src/exchangedb/spi/own_test.c create mode 100644 src/exchangedb/spi/own_test.control create mode 100755 src/exchangedb/spi/own_test.so create mode 100644 src/exchangedb/spi/own_test.sql create mode 100644 src/exchangedb/spi/perf_own_test.c create mode 100644 src/exchangedb/spi/pg_aggregate.c (limited to 'src') diff --git a/src/exchangedb/exchange_do_batch_coin_known.sql b/src/exchangedb/exchange_do_batch_coin_known.sql new file mode 100644 index 000000000..38d795959 --- /dev/null +++ b/src/exchangedb/exchange_do_batch_coin_known.sql @@ -0,0 +1,477 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2014--2022 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see +-- + +CREATE OR REPLACE FUNCTION exchange_do_batch4_known_coin( + IN in_coin_pub1 BYTEA, + IN in_denom_pub_hash1 BYTEA, + IN in_h_age_commitment1 BYTEA, + IN in_denom_sig1 BYTEA, + IN in_coin_pub2 BYTEA, + IN in_denom_pub_hash2 BYTEA, + IN in_h_age_commitment2 BYTEA, + IN in_denom_sig2 BYTEA, + IN in_coin_pub3 BYTEA, + IN in_denom_pub_hash3 BYTEA, + IN in_h_age_commitment3 BYTEA, + IN in_denom_sig3 BYTEA, + IN in_coin_pub4 BYTEA, + IN in_denom_pub_hash4 BYTEA, + IN in_h_age_commitment4 BYTEA, + IN in_denom_sig4 BYTEA, + OUT existed1 BOOLEAN, + OUT existed2 BOOLEAN, + OUT existed3 BOOLEAN, + OUT existed4 BOOLEAN, + OUT known_coin_id1 INT8, + OUT known_coin_id2 INT8, + OUT known_coin_id3 INT8, + OUT known_coin_id4 INT8, + OUT denom_pub_hash1 BYTEA, + OUT denom_pub_hash2 BYTEA, + OUT denom_pub_hash3 BYTEA, + OUT denom_pub_hash4 BYTEA, + OUT age_commitment_hash1 BYTEA, + OUT age_commitment_hash2 BYTEA, + OUT age_commitment_hash3 BYTEA, + OUT age_commitment_hash4 BYTEA) +LANGUAGE plpgsql +AS $$ +BEGIN +WITH dd AS ( +SELECT + denominations_serial, + coin_val, coin_frac + FROM denominations + WHERE denom_pub_hash + IN + (in_denom_pub_hash1, + in_denom_pub_hash2, + in_denom_pub_hash3, + in_denom_pub_hash4) + ),--dd + input_rows AS ( + VALUES + (in_coin_pub1, + in_denom_pub_hash1, + in_h_age_commitment1, + in_denom_sig1), + (in_coin_pub2, + in_denom_pub_hash2, + in_h_age_commitment2, + in_denom_sig2), + (in_coin_pub3, + in_denom_pub_hash3, + in_h_age_commitment3, + in_denom_sig3), + (in_coin_pub4, + in_denom_pub_hash4, + in_h_age_commitment4, + in_denom_sig4) + ),--ir + ins AS ( + INSERT INTO known_coins ( + coin_pub, + denominations_serial, + age_commitment_hash, + denom_sig, + remaining_val, + remaining_frac + ) + SELECT + ir.coin_pub, + dd.denominations_serial, + ir.age_commitment_hash, + ir.denom_sig, + dd.coin_val, + dd.coin_frac + FROM input_rows ir + JOIN dd + ON dd.denom_pub_hash = ir.denom_pub_hash + ON CONFLICT DO NOTHING + RETURNING known_coin_id + ),--kc + exists AS ( + SELECT + CASE + WHEN + ins.known_coin_id IS NOT NULL + THEN + FALSE + ELSE + TRUE + END AS existed, + ins.known_coin_id, + dd.denom_pub_hash, + kc.age_commitment_hash + FROM input_rows ir + LEFT JOIN ins + ON ins.coin_pub = ir.coin_pub + LEFT JOIN known_coins kc + ON kc.coin_pub = ir.coin_pub + LEFT JOIN dd + ON dd.denom_pub_hash = ir.denom_pub_hash + )--exists +SELECT + exists.existed AS existed1, + exists.known_coin_id AS known_coin_id1, + exists.denom_pub_hash AS denom_pub_hash1, + exists.age_commitment_hash AS age_commitment_hash1, + ( + SELECT exists.existed + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash2 + ) AS existed2, + ( + SELECT exists.known_coin_id + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash2 + ) AS known_coin_id2, + ( + SELECT exists.denom_pub_hash + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash2 + ) AS denom_pub_hash2, + ( + SELECT exists.age_commitment_hash + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash2 + )AS age_commitment_hash2, + ( + SELECT exists.existed + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash3 + ) AS existed3, + ( + SELECT exists.known_coin_id + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash3 + ) AS known_coin_id3, + ( + SELECT exists.denom_pub_hash + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash3 + ) AS denom_pub_hash3, + ( + SELECT exists.age_commitment_hash + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash3 + )AS age_commitment_hash3, + ( + SELECT exists.existed + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash4 + ) AS existed4, + ( + SELECT exists.known_coin_id + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash4 + ) AS known_coin_id4, + ( + SELECT exists.denom_pub_hash + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash4 + ) AS denom_pub_hash4, + ( + SELECT exists.age_commitment_hash + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash4 + )AS age_commitment_hash4 +FROM exists; + +RETURN; +END $$; + + +CREATE OR REPLACE FUNCTION exchange_do_batch2_known_coin( + IN in_coin_pub1 BYTEA, + IN in_denom_pub_hash1 BYTEA, + IN in_h_age_commitment1 BYTEA, + IN in_denom_sig1 BYTEA, + IN in_coin_pub2 BYTEA, + IN in_denom_pub_hash2 BYTEA, + IN in_h_age_commitment2 BYTEA, + IN in_denom_sig2 BYTEA, + OUT existed1 BOOLEAN, + OUT existed2 BOOLEAN, + OUT known_coin_id1 INT8, + OUT known_coin_id2 INT8, + OUT denom_pub_hash1 BYTEA, + OUT denom_pub_hash2 BYTEA, + OUT age_commitment_hash1 BYTEA, + OUT age_commitment_hash2 BYTEA) +LANGUAGE plpgsql +AS $$ +BEGIN +WITH dd AS ( +SELECT + denominations_serial, + coin_val, coin_frac + FROM denominations + WHERE denom_pub_hash + IN + (in_denom_pub_hash1, + in_denom_pub_hash2) + ),--dd + input_rows AS ( + VALUES + (in_coin_pub1, + in_denom_pub_hash1, + in_h_age_commitment1, + in_denom_sig1), + (in_coin_pub2, + in_denom_pub_hash2, + in_h_age_commitment2, + in_denom_sig2) + ),--ir + ins AS ( + INSERT INTO known_coins ( + coin_pub, + denominations_serial, + age_commitment_hash, + denom_sig, + remaining_val, + remaining_frac + ) + SELECT + ir.coin_pub, + dd.denominations_serial, + ir.age_commitment_hash, + ir.denom_sig, + dd.coin_val, + dd.coin_frac + FROM input_rows ir + JOIN dd + ON dd.denom_pub_hash = ir.denom_pub_hash + ON CONFLICT DO NOTHING + RETURNING known_coin_id + ),--kc + exists AS ( + SELECT + CASE + WHEN ins.known_coin_id IS NOT NULL + THEN + FALSE + ELSE + TRUE + END AS existed, + ins.known_coin_id, + dd.denom_pub_hash, + kc.age_commitment_hash + FROM input_rows ir + LEFT JOIN ins + ON ins.coin_pub = ir.coin_pub + LEFT JOIN known_coins kc + ON kc.coin_pub = ir.coin_pub + LEFT JOIN dd + ON dd.denom_pub_hash = ir.denom_pub_hash + )--exists +SELECT + exists.existed AS existed1, + exists.known_coin_id AS known_coin_id1, + exists.denom_pub_hash AS denom_pub_hash1, + exists.age_commitment_hash AS age_commitment_hash1, + ( + SELECT exists.existed + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash2 + ) AS existed2, + ( + SELECT exists.known_coin_id + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash2 + ) AS known_coin_id2, + ( + SELECT exists.denom_pub_hash + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash2 + ) AS denom_pub_hash2, + ( + SELECT exists.age_commitment_hash + FROM exists + WHERE exists.denom_pub_hash = in_denom_pub_hash2 + )AS age_commitment_hash2 +FROM exists; + +RETURN; +END $$; + + +CREATE OR REPLACE FUNCTION exchange_do_batch1_known_coin( + IN in_coin_pub1 BYTEA, + IN in_denom_pub_hash1 BYTEA, + IN in_h_age_commitment1 BYTEA, + IN in_denom_sig1 BYTEA, + OUT existed1 BOOLEAN, + OUT known_coin_id1 INT8, + OUT denom_pub_hash1 BYTEA, + OUT age_commitment_hash1 BYTEA) +LANGUAGE plpgsql +AS $$ +BEGIN +WITH dd AS ( +SELECT + denominations_serial, + coin_val, coin_frac + FROM denominations + WHERE denom_pub_hash + IN + (in_denom_pub_hash1, + in_denom_pub_hash2) + ),--dd + input_rows AS ( + VALUES + (in_coin_pub1, + in_denom_pub_hash1, + in_h_age_commitment1, + in_denom_sig1) + ),--ir + ins AS ( + INSERT INTO known_coins ( + coin_pub, + denominations_serial, + age_commitment_hash, + denom_sig, + remaining_val, + remaining_frac + ) + SELECT + ir.coin_pub, + dd.denominations_serial, + ir.age_commitment_hash, + ir.denom_sig, + dd.coin_val, + dd.coin_frac + FROM input_rows ir + JOIN dd + ON dd.denom_pub_hash = ir.denom_pub_hash + ON CONFLICT DO NOTHING + RETURNING known_coin_id + ),--kc + exists AS ( + SELECT + CASE + WHEN ins.known_coin_id IS NOT NULL + THEN + FALSE + ELSE + TRUE + END AS existed, + ins.known_coin_id, + dd.denom_pub_hash, + kc.age_commitment_hash + FROM input_rows ir + LEFT JOIN ins + ON ins.coin_pub = ir.coin_pub + LEFT JOIN known_coins kc + ON kc.coin_pub = ir.coin_pub + LEFT JOIN dd + ON dd.denom_pub_hash = ir.denom_pub_hash + )--exists +SELECT + exists.existed AS existed1, + exists.known_coin_id AS known_coin_id1, + exists.denom_pub_hash AS denom_pub_hash1, + exists.age_commitment_hash AS age_commitment_hash1 +FROM exists; + +RETURN; +END $$; + +/*** THIS SQL CODE WORKS ***/ +/* +CREATE OR REPLACE FUNCTION exchange_do_batch2_known_coin( + IN in_coin_pub1 BYTEA, + IN in_denom_pub_hash1 TEXT, + IN in_h_age_commitment1 TEXT, + IN in_denom_sig1 TEXT, + IN in_coin_pub2 BYTEA, + IN in_denom_pub_hash2 TEXT, + IN in_h_age_commitment2 TEXT, + IN in_denom_sig2 TEXT, + OUT existed1 BOOLEAN, + OUT existed2 BOOLEAN, + OUT known_coin_id1 INT8, + OUT known_coin_id2 INT8, + OUT denom_pub_hash1 TEXT, + OUT denom_pub_hash2 TEXT, + OUT age_commitment_hash1 TEXT, + OUT age_commitment_hash2 TEXT) +LANGUAGE plpgsql +AS $$ +DECLARE + ins_values RECORD; +BEGIN + FOR i IN 1..2 LOOP + ins_values := ( + SELECT + in_coin_pub1 AS coin_pub, + in_denom_pub_hash1 AS denom_pub_hash, + in_h_age_commitment1 AS age_commitment_hash, + in_denom_sig1 AS denom_sig + WHERE i = 1 + UNION + SELECT + in_coin_pub2 AS coin_pub, + in_denom_pub_hash2 AS denom_pub_hash, + in_h_age_commitment2 AS age_commitment_hash, + in_denom_sig2 AS denom_sig + WHERE i = 2 + ); + WITH dd (denominations_serial, coin_val, coin_frac) AS ( + SELECT denominations_serial, coin_val, coin_frac + FROM denominations + WHERE denom_pub_hash = ins_values.denom_pub_hash + ), + input_rows(coin_pub) AS ( + VALUES (ins_values.coin_pub) + ), + ins AS ( + INSERT INTO known_coins ( + coin_pub, + denominations_serial, + age_commitment_hash, + denom_sig, + remaining_val, + remaining_frac + ) SELECT + input_rows.coin_pub, + dd.denominations_serial, + ins_values.age_commitment_hash, + ins_values.denom_sig, + coin_val, + coin_frac + FROM dd + CROSS JOIN input_rows + ON CONFLICT DO NOTHING + RETURNING known_coin_id, denom_pub_hash + ) + SELECT + CASE i + WHEN 1 THEN + COALESCE(ins.known_coin_id, 0) <> 0 AS existed1, + ins.known_coin_id AS known_coin_id1, + ins.denom_pub_hash AS denom_pub_hash1, + ins.age_commitment_hash AS age_commitment_hash1 + WHEN 2 THEN + COALESCE(ins.known_coin_id, 0) <> 0 AS existed2, + ins.known_coin_id AS known_coin_id2, + ins.denom_pub_hash AS denom_pub_hash2, + ins.age_commitment_hash AS age_commitment_hash2 + END + FROM ins; + END LOOP; +END; +$$;*/ diff --git a/src/exchangedb/exchange_get_ready_deposit.sql b/src/exchangedb/exchange_get_ready_deposit.sql new file mode 100644 index 000000000..4f76463fc --- /dev/null +++ b/src/exchangedb/exchange_get_ready_deposit.sql @@ -0,0 +1,60 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2014--2022 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see +-- +CREATE OR REPLACE FUNCTION exchange_do_get_ready_deposit( + IN in_now INT8, + IN in_start_shard_now INT8, + IN in_end_shard_now INT8, + OUT out_payto_uri VARCHAR, + OUT out_merchant_pub BYTEA +) +LANGUAGE plpgsql +AS $$ +DECLARE + curs CURSOR + FOR + SELECT + coin_pub + ,deposit_serial_id + ,wire_deadline + ,shard + FROM deposits_by_ready dbr + WHERE wire_deadline <= in_now + AND shard >= in_start_shard_now + AND shard <=in_end_shard_now + ORDER BY + wire_deadline ASC + ,shard ASC + LIMIT 1; +DECLARE + i RECORD; +BEGIN +OPEN curs; +FETCH FROM curs INTO i; +SELECT + payto_uri + ,merchant_pub + INTO + out_payto_uri + ,out_merchant_pub + FROM deposits + JOIN wire_targets wt + USING (wire_target_h_payto) + WHERE + i.coin_pub = coin_pub + AND i.deposit_serial_id=deposit_serial_id; +CLOSE curs; +RETURN; +END $$; diff --git a/src/exchangedb/spi/README.md b/src/exchangedb/spi/README.md new file mode 100644 index 000000000..ec6a9016a --- /dev/null +++ b/src/exchangedb/spi/README.md @@ -0,0 +1,41 @@ + Server Programming Interface (SPI) + + +Dependencies: +============= + +These are the direct dependencies for running SPI functions : + + + +Step 1: +"postgresql-server-dev-" +-- sudo apt-get install libpq-dev postgresql-server-dev-13 + +Step 2: +To solve gssapi/gssapi.h, use the following command: +apt-get install libkrb5-dev + +Step 3: +apt-cache search openssl | grep -- -dev +apt-get install libssl-dev + +Compile: +======== +gcc -shared -o .so .c + +CALL FUNCTIONS: +=============== + +psql -c "SELECT ();" db_name + +Structure: +========== + +usr/include/postgres/ + +usr/include/postgres/13/server/ + +make +make install +psql \ No newline at end of file diff --git a/src/exchangedb/spi/own_test.bc b/src/exchangedb/spi/own_test.bc new file mode 100644 index 000000000..240c78cf4 Binary files /dev/null and b/src/exchangedb/spi/own_test.bc differ diff --git a/src/exchangedb/spi/own_test.c b/src/exchangedb/spi/own_test.c new file mode 100644 index 000000000..04798bfa7 --- /dev/null +++ b/src/exchangedb/spi/own_test.c @@ -0,0 +1,818 @@ +#include "postgres.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "utils/array.h" +#include +#include "utils/numeric.h" +#include "utils/timestamp.h" +#include + +#ifdef PG_MODULE_MAGIC +PG_MODULE_MAGIC; +#endif + +typedef struct { + Datum col1; + Datum col2; +} valuest; + +void _PG_init(void); +void _PG_fini(void); + +void _PG_init(void) +{ +} + +PG_FUNCTION_INFO_V1(pg_spi_insert_int); +PG_FUNCTION_INFO_V1(pg_spi_select_from_x); +PG_FUNCTION_INFO_V1(pg_spi_select_pair_from_y); +//PG_FUNCTION_INFO_V1(pg_spi_select_with_cond); +PG_FUNCTION_INFO_V1(pg_spi_update_y); +PG_FUNCTION_INFO_V1(pg_spi_prepare_example); +PG_FUNCTION_INFO_V1(pg_spi_prepare_example_without_saveplan); +PG_FUNCTION_INFO_V1(pg_spi_prepare_insert); +PG_FUNCTION_INFO_V1(pg_spi_prepare_insert_without_saveplan); +//PG_FUNCTION_INFO_V1(pg_spi_prepare_select_with_cond); +PG_FUNCTION_INFO_V1(pg_spi_prepare_select_with_cond_without_saveplan); +PG_FUNCTION_INFO_V1(pg_spi_prepare_update); +PG_FUNCTION_INFO_V1(pg_spi_get_dep_ref_fees); +// SIMPLE SELECT +Datum +pg_spi_prepare_example(PG_FUNCTION_ARGS) +{ + static SPIPlanPtr prepared_plan; + int ret; + int64 result; + char * value; + SPIPlanPtr new_plan; + + ret=SPI_connect(); + if (ret != SPI_OK_CONNECT) { + elog(ERROR, "DB connexion failed ! \n"); + } + { + if (prepared_plan == NULL) + { + new_plan = SPI_prepare("SELECT 1 FROM joseph_test.X", 0, NULL); + prepared_plan = SPI_saveplan(new_plan); + + if (prepared_plan == NULL) + { + elog(ERROR, "FAIL TO SAVE !\n"); + } + } + + ret = SPI_execute_plan(prepared_plan, NULL, 0,false, 0); + if (ret != SPI_OK_SELECT) { + elog(ERROR, "SELECT FAILED %d !\n", ret); + } + + if (SPI_tuptable != NULL && SPI_tuptable->vals != NULL && SPI_tuptable->tupdesc != NULL) + { + value = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1); + result = atoi(value); + } + else + { + elog(ERROR, "EMPTY TABLE !\n"); + } + } + SPI_finish(); + PG_RETURN_INT64(result); +} + + + +Datum +pg_spi_prepare_example_without_saveplan(PG_FUNCTION_ARGS) +{ + int ret; + int64 result; + char * value; + SPIPlanPtr new_plan; + + ret=SPI_connect(); + if (ret != SPI_OK_CONNECT) { + elog(ERROR, "DB connexion failed ! \n"); + } + + { + new_plan = SPI_prepare("SELECT 1 FROM joseph_test.X", 0, NULL); + ret = SPI_execute_plan(new_plan, NULL, 0,false, 0); + if (ret != SPI_OK_SELECT) { + elog(ERROR, "SELECT FAILED %d !\n", ret); + } + + if (SPI_tuptable != NULL + && SPI_tuptable->vals != NULL + && SPI_tuptable->tupdesc != NULL) + { + value = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1); + result = atoi(value); + } + else + { + elog(ERROR, "EMPTY TABLE !\n"); + } + } + SPI_finish(); + PG_RETURN_INT64(result);// PG_RETURN_INT64(result); +} + + +//SELECT 1 FROM X +//V1 +Datum +pg_spi_select_from_x(PG_FUNCTION_ARGS) +{ + int ret; + char *query = "SELECT 1 FROM joseph_test.X"; + uint64 proc; + ret = SPI_connect(); + + if (ret != SPI_OK_CONNECT) + { + elog(ERROR, "SPI_connect failed"); + } + + ret = SPI_exec(query, 10); + proc = SPI_processed; + if (ret != SPI_OK_SELECT) + { + elog(ERROR, "SPI_exec failed"); + } + + SPI_finish(); + + PG_RETURN_INT64(proc); +} + +//INSERT INTO X VALUES (1) +Datum +pg_spi_insert_int(PG_FUNCTION_ARGS) +{ + int ret; + int nargs; + Oid argtypes[1]; + Datum values[1]; + char *query = "INSERT INTO joseph_test.X (a) VALUES ($1)"; + + ret = SPI_connect(); + if (ret != SPI_OK_CONNECT) + { + elog(ERROR, "SPI_connect failed"); + } + + nargs = 1; + argtypes[0] = INT4OID; + values[0] = Int32GetDatum(3); + + ret = SPI_execute_with_args(query, nargs, argtypes, values, NULL, false, 0); + if (ret != SPI_OK_INSERT) + { + elog(ERROR, "SPI_execute_with_args failed"); + } + + SPI_finish(); + + PG_RETURN_VOID(); +} + + +Datum +pg_spi_prepare_insert(PG_FUNCTION_ARGS) +{ + static SPIPlanPtr prepared_plan = NULL; + int ret; + int nargs; + Oid argtypes[1]; + Datum values[1]; + char *query = "INSERT INTO joseph_test.X (a) VALUES ($1)"; + SPIPlanPtr new_plan; + ret = SPI_connect(); + if (ret != SPI_OK_CONNECT) + { + elog(ERROR, "SPI_connect failed ! \n"); + } + if (prepared_plan == NULL) { + + argtypes[0] = INT4OID; + nargs = 1; + values[0] = Int32GetDatum(3); + new_plan = SPI_prepare(query, nargs, argtypes); + if (new_plan== NULL) + { + elog(ERROR, "SPI_prepare failed ! \n"); + } + prepared_plan = SPI_saveplan(new_plan); + if (prepared_plan == NULL) + { + elog(ERROR, "SPI_saveplan failed ! \n"); + } + } + + ret = SPI_execute_plan(prepared_plan, values, NULL, false, 0); + if (ret != SPI_OK_INSERT) + { + elog(ERROR, "SPI_execute_plan failed ! \n"); + } + + SPI_finish(); + + PG_RETURN_VOID(); +} +/* +Datum +pg_spi_prepare_insert_bytea(PG_FUNCTION_ARGS) +{ + static SPIPlanPtr prepared_plan = NULL; + int ret; + int nargs; + Oid argtypes[1]; + Datum values[1]; + Oid argtypes2[1]; + Datum val[1]; + char *query = "INSERT INTO joseph_test.X (a) VALUES ($1)"; + SPIPlanPtr new_plan; + ret = SPI_connect(); + if (ret != SPI_OK_CONNECT) + { + elog(ERROR, "SPI_connect failed ! \n"); + } + if (prepared_plan == NULL) { + argtypes2[0] = BOOLOID; + val[0] = BoolGetDatum(); + argtypes[0] = BYTEAOID; + nargs = 1; + values[0] = Int32GetDatum(3); + new_plan = SPI_prepare(query, nargs, argtypes); + if (new_plan== NULL) + { + elog(ERROR, "SPI_prepare failed ! \n"); + } + prepared_plan = SPI_saveplan(new_plan); + if (prepared_plan == NULL) + { + elog(ERROR, "SPI_saveplan failed ! \n"); + } + } + + ret = SPI_execute_plan(prepared_plan, values, NULL, false, 0); + if (ret != SPI_OK_INSERT) + { + elog(ERROR, "SPI_execute_plan failed ! \n"); + } + + SPI_finish(); + + PG_RETURN_VOID(); +} +*/ + +Datum +pg_spi_prepare_insert_without_saveplan(PG_FUNCTION_ARGS) +{ + int ret; + int nargs; + Oid argtypes[1]; + Datum values[1]; + char *query = "INSERT INTO joseph_test.X (a) VALUES ($1)"; + SPIPlanPtr new_plan; + ret = SPI_connect(); + if (ret != SPI_OK_CONNECT) + { + elog(ERROR, "SPI_connect failed"); + } + { + argtypes[0] = INT4OID; + nargs = 1; + values[0] = Int32GetDatum(3); + new_plan = SPI_prepare(query, nargs, argtypes); + if (new_plan== NULL) + { + elog(ERROR, "SPI_prepare failed"); + } + } + + ret = SPI_execute_plan(new_plan, values, NULL, false, 0); + if (ret != SPI_OK_INSERT) + { + elog(ERROR, "SPI_execute_plan failed"); + } + + SPI_finish(); + + PG_RETURN_VOID(); +} + + + + + + +/* +Datum +pg_spi_select_pair_from_y(PG_FUNCTION_ARGS) +{ + int ret; + valuest result; + bool isnull; + char *query = "SELECT 1,1 FROM joseph_test.Y"; + result.col1 = 0; + result.col2 = 0; + + if ((ret = SPI_connect()) < 0) { + fprintf(stderr, "SPI_connect returned %d\n", ret); + exit(1); + } + ret = SPI_exec(query, 0); + if (ret == SPI_OK_SELECT && SPI_processed > 0) { + int i; + SPITupleTable *tuptable = SPI_tuptable; + TupleDesc tupdesc = tuptable->tupdesc; + for (i = 0; i < SPI_processed; i++) { + HeapTuple tuple = tuptable->vals[i]; + result.col1 = SPI_getbinval(tuple, tupdesc, 1, &isnull); + result.col2 = SPI_getbinval(tuple, tupdesc, 2, &isnull); + } + } + SPI_finish(); + PG_RETURN_TEXT_P(result); +} +*/ + +//SELECT X FROM Y WHERE Z=$1 +/* +Datum +pg_spi_select_with_cond(PG_FUNCTION_ARGS) +{ + int ret; + char *query; + int nargs; + Oid argtypes[1]; + Datum values[1]; + uint64 proc; + query = "SELECT col1 FROM joseph_test.Y WHERE col2 = $1"; + + ret = SPI_connect(); + if (ret != SPI_OK_CONNECT) { + elog(ERROR, "SPI_connect failed: %d", ret); + } + nargs = 1; + argtypes[0] = INT4OID; + values[0] = Int32GetDatum(2); + + ret = SPI_execute_with_args(query, nargs, argtypes, values, NULL, false, 0); + proc = SPI_processed; + if (ret != SPI_OK_SELECT) + { + elog(ERROR, "SPI_execute_with_args failed"); + } + + SPI_finish(); + + + PG_RETURN_INT64(proc); + }*/ + +////////SELECT WITH COND +/* +Datum pg_spi_prepare_select_with_cond(PG_FUNCTION_ARGS) { + static SPIPlanPtr prepared_plan = NULL; + SPIPlanPtr new_plan; + int ret; + Datum values[1]; + uint64 proc; + int nargs; + Oid argtypes[1]; + char *query = "SELECT col1 FROM joseph_test.Y WHERE col1 = $1"; + int result = 0; + + ret = SPI_connect(); + if (ret != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed ! \n"); + + if (prepared_plan == NULL) { + + argtypes[0] = INT4OID; + nargs = 1; + values[0] = DatumGetByteaP(SPI_getbinval(tuptable->vals[0], tupdesc, 1, &isnull)); //Value col2 + + new_plan = SPI_prepare(query, nargs, argtypes); + if (new_plan == NULL) + elog(ERROR, "SPI_prepare failed ! \n"); + + prepared_plan = SPI_saveplan(new_plan); + if (prepared_plan == NULL) + elog(ERROR, "SPI_saveplan failed ! \n"); + } + + + ret = SPI_execute_plan(prepared_plan, values, NULL, false, 0); + + if (ret != SPI_OK_SELECT) { + elog(ERROR, "SPI_execute_plan failed: %d \n", ret); + } + + proc = SPI_processed; + + if (proc > 0) { + SPITupleTable *tuptable = SPI_tuptable; + TupleDesc tupdesc = tuptable->tupdesc; + HeapTuple tuple; + int i; + + for (i = 0; i < proc; i++) { + tuple = tuptable->vals[i]; + for (int j = 1; j <= tupdesc->natts; j++) { + char * value = SPI_getvalue(tuple, tupdesc, j); + result += atoi(value); + } + } + } + SPI_finish(); + PG_RETURN_INT64(result); +} +*/ + +Datum pg_spi_prepare_select_with_cond_without_saveplan(PG_FUNCTION_ARGS) { + + SPIPlanPtr new_plan; + int ret; + Datum values[1]; + uint64 proc; + int nargs; + Oid argtypes[1]; + char *query = "SELECT col1 FROM joseph_test.Y WHERE col2 = $1"; + int result = 0; + + ret = SPI_connect(); + if (ret != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed ! \n"); + + { + + argtypes[0] = INT4OID; + nargs = 1; + values[0] = Int32GetDatum(2); //Value col2 + + new_plan = SPI_prepare(query, nargs, argtypes); + if (new_plan == NULL) + elog(ERROR, "SPI_prepare failed ! \n"); + + } + + + ret = SPI_execute_plan(new_plan, values, NULL, false, 0); + + if (ret != SPI_OK_SELECT) { + elog(ERROR, "SPI_execute_plan failed: %d \n", ret); + } + + proc = SPI_processed; + + if (proc > 0) { + SPITupleTable *tuptable = SPI_tuptable; + TupleDesc tupdesc = tuptable->tupdesc; + HeapTuple tuple; + int i; + + for (i = 0; i < proc; i++) { + tuple = tuptable->vals[i]; + for (int j = 1; j <= tupdesc->natts; j++) { + char * value = SPI_getvalue(tuple, tupdesc, j); + result += atoi(value); + } + } + } + SPI_finish(); + PG_RETURN_INT64(result); +} + + + + +Datum +pg_spi_update_y(PG_FUNCTION_ARGS) +{ + int ret; + int nargs; + Oid argtypes[1]; + Datum values[1]; + char *query = "UPDATE joseph_test.Y SET col1 = 4 WHERE (col2 = $1)"; + + ret = SPI_connect(); + if (ret != SPI_OK_CONNECT) + { + elog(ERROR, "SPI_connect failed ! \n"); + } + + nargs = 1; + argtypes[0] = INT4OID; + values[0] = Int32GetDatum(0); + + ret = SPI_execute_with_args(query, nargs, argtypes, values, NULL, false, 0); + if (ret != SPI_OK_UPDATE) + { + elog(ERROR, "SPI_execute_with_args failed ! \n"); + } + + SPI_finish(); + + PG_RETURN_VOID(); +} + +Datum +pg_spi_prepare_update(PG_FUNCTION_ARGS) +{ + static SPIPlanPtr prepared_plan = NULL; + SPIPlanPtr new_plan; + int ret; + int nargs; + Oid argtypes[1]; + Datum values[1]; + char *query = "UPDATE joseph_test.Y SET col1 = 4 WHERE (col2 = $1)"; + + ret = SPI_connect(); + if (ret != SPI_OK_CONNECT) + { + elog(ERROR, "SPI_connect failed ! \n"); + } + + if ( prepared_plan == NULL) + { + argtypes[0] = INT4OID; + nargs = 1; + values[0] = Int32GetDatum(3); + //PREPARE + new_plan = SPI_prepare(query, nargs, argtypes); + if (new_plan == NULL) + elog(ERROR, "SPI_prepare failed ! \n"); + //SAVEPLAN + prepared_plan = SPI_saveplan(new_plan); + if(prepared_plan == NULL) + elog(ERROR, "SPI_saveplan failed ! \n"); + } + ret = SPI_execute_plan(prepared_plan, values, NULL, false, 0); + if (ret != SPI_OK_UPDATE) + elog(ERROR, "SPI_execute_plan failed ! \n"); + + SPI_finish(); + PG_RETURN_VOID(); +} +/* +Datum +pg_spi_prepare_update_without_saveplan(PG_FUNCTION_ARGS) +{}*/ +void _PG_fini(void) +{ +} + +/* + +*/ + +Datum +pg_spi_get_dep_ref_fees (PG_FUNCTION_ARGS) { + /* Define plan to save */ + static SPIPlanPtr deposit_plan; + static SPIPlanPtr ref_plan; + static SPIPlanPtr fees_plan; + static SPIPlanPtr dummy_plan; + /* Define variables to update */ + Timestamp refund_deadline = PG_GETARG_TIMESTAMP(0); + bytea *merchant_pub = PG_GETARG_BYTEA_P(1); + bytea *wire_target_h_payto = PG_GETARG_BYTEA_P(2); + bytea *wtid_raw = PG_GETARG_BYTEA_P(3); + bool is_null; + /* Define variables to store the results of each SPI query */ + uint64_t sum_deposit_val = 0; + uint32_t sum_deposit_frac = 0; + uint64_t s_refund_val = 0; + uint32_t s_refund_frac = 0; + uint64_t sum_dep_fee_val = 0; + uint32_t sum_dep_fee_frac = 0; + uint64_t norm_refund_val = 0; + uint32_t norm_refund_frac = 0; + uint64_t sum_refund_val = 0; + uint32_t sum_refund_frac = 0; + /* Define variables to store the Tuptable */ + SPITupleTable *dep_res; + SPITupleTable *ref_res; + SPITupleTable *ref_by_coin_res; + SPITupleTable *norm_ref_by_coin_res; + SPITupleTable *fully_refunded_coins_res; + SPITupleTable *fees_res; + SPITupleTable *dummys_res; + /* Define variable to update */ + Datum values_refund[2]; + Datum values_deposit[3]; + Datum values_fees[2]; + Datum values_dummys[2]; + TupleDesc tupdesc; + /* Define variables to replace some tables */ + bytea *ref_by_coin_coin_pub; + int64 ref_by_coin_deposit_serial_id = 0; + bytea *norm_ref_by_coin_coin_pub; + int64_t norm_ref_by_coin_deposit_serial_id = 0; + bytea *new_dep_coin_pub = NULL; + int res = SPI_connect(); + + /* Connect to SPI */ + if (res < 0) { + elog(ERROR, "Could not connect to SPI manager"); + } + if (deposit_plan == NULL) + { + const char *dep_sql; + SPIPlanPtr new_plan; + + // Execute first query and store results in variables + dep_sql = + "UPDATE deposits SET done=TRUE " + "WHERE NOT (done OR policy_blocked) " + "AND refund_deadline=$1 " + "AND merchant_pub=$2 " + "AND wire_target_h_payto=$3 " + "RETURNING " + "deposit_serial_id," + "coin_pub," + "amount_with_fee_val," + "amount_with_fee_frac;"; + fprintf(stderr, "dep sql %d\n", 1); + new_plan = + SPI_prepare(dep_sql, 4,(Oid[]){INT8OID, BYTEAOID, BYTEAOID}); + fprintf(stderr, "dep sql %d\n", 2); + if (new_plan == NULL) + elog(ERROR, "SPI_prepare failed for dep \n"); + deposit_plan = SPI_saveplan(new_plan); + if (deposit_plan == NULL) + elog(ERROR, "SPI_saveplan failed for dep \n"); + } + fprintf(stdout, "dep sql %d\n", 3); + + values_deposit[0] = Int64GetDatum(refund_deadline); + values_deposit[1] = PointerGetDatum(merchant_pub); + values_deposit[2] = PointerGetDatum(wire_target_h_payto); + + res = SPI_execute_plan (deposit_plan, + values_deposit, + NULL, + true, + 0); + fprintf(stdout, "dep sql %d\n", 4); + if (res != SPI_OK_UPDATE) + { + elog(ERROR, "Failed to execute subquery 1 \n"); + } + // STORE TUPTABLE deposit + dep_res = SPI_tuptable; + + for (unsigned int i = 0; i < SPI_processed; i++) { + int64 dep_deposit_serial_ids = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1, &is_null)); + bytea *dep_coin_pub = DatumGetByteaP(SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2, &is_null)); + int64 dep_amount_val = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 3, &is_null)); + int32 dep_amount_frac = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 4, &is_null)); + + if (is_null) + elog(ERROR, "Failed to retrive data from deposit \n"); + if (ref_plan == NULL) + { + // Execute second query with parameters from first query and store results in variables + const char * ref_sql = + "SELECT amount_with_fee_val, amount_with_fee_frac, coin_pub, deposit_serial_id " + "FROM refunds " + "WHERE coin_pub=$1 " + "AND deposit_serial_id=$2;"; + SPIPlanPtr new_plan = SPI_prepare(ref_sql, 3, (Oid[]){BYTEAOID, INT8OID}); + if (new_plan == NULL) + elog(ERROR, "SPI_prepare failed for refund\n"); + ref_plan = SPI_saveplan(new_plan); + if (ref_plan == NULL) + elog(ERROR, "SPI_saveplan failed for refund\n"); + } + values_refund[0] = PointerGetDatum(dep_coin_pub); + values_refund[1] = Int64GetDatum(dep_deposit_serial_ids); + res = SPI_execute_plan(ref_plan, + values_refund, + NULL, + false, + 0); + if (res != SPI_OK_SELECT) + elog(ERROR, "Failed to execute subquery 2\n"); + // STORE TUPTABLE refund + ref_res = SPI_tuptable; + for (unsigned int j = 0; j < SPI_processed; j++) { + int64 ref_refund_val = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 1, &is_null)); + int32 ref_refund_frac = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 2, &is_null)); + bytea *ref_coin_pub = DatumGetByteaP(SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 3, &is_null)); + int64 ref_deposit_serial_id = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 4, &is_null)); + // Execute third query with parameters from second query and store results in variables + ref_by_coin_coin_pub = ref_coin_pub; + ref_by_coin_deposit_serial_id = ref_deposit_serial_id; + // LOOP TO GET THE SUM FROM REFUND BY COIN + for (unsigned int i = 0; ivals[i], SPI_tuptable->tupdesc, 1, &is_null))) + && + (ref_by_coin_deposit_serial_id == + DatumGetUInt64(SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2, &is_null))) + ) + { + sum_refund_val += ref_refund_val; + sum_refund_frac += ref_refund_frac; + norm_ref_by_coin_coin_pub = ref_by_coin_coin_pub; + norm_ref_by_coin_deposit_serial_id = ref_by_coin_deposit_serial_id; + } + }// END SUM CALCULATION + //NORMALIZE REFUND VAL FRAC + norm_refund_val = + (sum_refund_val + sum_refund_frac ) / 100000000; + norm_refund_frac = + sum_refund_frac % 100000000; + // Get refund values + s_refund_val += sum_refund_val; + s_refund_frac = sum_refund_frac; + }//END REFUND + if (norm_ref_by_coin_coin_pub == dep_coin_pub + && ref_by_coin_deposit_serial_id == dep_deposit_serial_ids + && norm_refund_val == dep_amount_val + && norm_refund_frac == dep_amount_frac) + { + new_dep_coin_pub = dep_coin_pub; + } + // Ensure we get the fee for each coin and not only once per denomination + if (fees_plan == NULL ) + { + const char * fees_sql = + "SELECT " + " denom.fee_deposit_val AS fee_val, " + " denom.fee_deposit_frac AS fee_frac, " + "FROM known_coins kc" + "JOIN denominations denom USING (denominations_serial) " + "WHERE kc.coin_pub = $1 AND kc.coin_pub != $2;"; + SPIPlanPtr new_plan = SPI_prepare(fees_sql, 3, (Oid[]){BYTEAOID, BYTEAOID}); + if (new_plan == NULL) + { + elog(ERROR, "SPI_prepare for fees failed ! \n"); + } + fees_plan = SPI_saveplan(new_plan); + if (fees_plan == NULL) + { + elog(ERROR, "SPI_saveplan for fees failed ! \n"); + } + } + values_fees[0] = PointerGetDatum(dep_coin_pub); + values_fees[1] = PointerGetDatum(new_dep_coin_pub); + res = SPI_execute_plan(fees_plan, values_fees, NULL, false, 0); + if (res != SPI_OK_SELECT) + elog(ERROR, "SPI_execute_plan failed for fees \n"); + fees_res = SPI_tuptable; + tupdesc = fees_res->tupdesc; + for (unsigned int i = 0; ivals[i]; + bool is_null; + uint64_t fee_val = DatumGetUInt64(SPI_getbinval(tuple, tupdesc, 1, &is_null)); + uint32_t fee_frac = DatumGetUInt32(SPI_getbinval(tuple, tupdesc, 2, &is_null)); + uint64_t fees_deposit_serial_id = DatumGetUInt64(SPI_getbinval(tuple, tupdesc, 3, &is_null)); + if (dummy_plan == NULL) + { + const char *insert_dummy_sql = + "INSERT INTO " + "aggregation_tracking(deposit_serial_id, wtid_raw)" + " VALUES ($1, $2)"; + + SPIPlanPtr new_plan = SPI_prepare(insert_dummy_sql, 2, (Oid[]){INT8OID, BYTEAOID}); + if (new_plan == NULL) + elog(ERROR, "FAILED to prepare aggregation tracking \n"); + dummy_plan = SPI_saveplan(new_plan); + if ( dummy_plan == NULL ) + elog(ERROR, "FAILED to saveplan aggregation tracking\n"); + } + values_dummys[0] = Int64GetDatum(dep_deposit_serial_ids); + values_dummys[1] = PointerGetDatum(wtid_raw); + res = SPI_execute_plan(dummy_plan, values_dummys, NULL, false, 0); + if (res != SPI_OK_INSERT) + elog(ERROR, "Failed to insert dummy\n"); + dummys_res = SPI_tuptable; + // Calculation of deposit fees for not fully refunded deposits + sum_dep_fee_val += fee_val; + sum_dep_fee_frac += fee_frac; + } + // Get deposit values + sum_deposit_val += dep_amount_val; + sum_deposit_frac += dep_amount_frac; + }//END DEPOSIT + SPI_finish(); + PG_RETURN_VOID(); +} diff --git a/src/exchangedb/spi/own_test.control b/src/exchangedb/spi/own_test.control new file mode 100644 index 000000000..4e73e207f --- /dev/null +++ b/src/exchangedb/spi/own_test.control @@ -0,0 +1,4 @@ +comment = 'Example extension for testing purposes' +default_version = '1.0' +module_pathname = '$libdir/own_test' +relocatable = true diff --git a/src/exchangedb/spi/own_test.so b/src/exchangedb/spi/own_test.so new file mode 100755 index 000000000..fda70c9d0 Binary files /dev/null and b/src/exchangedb/spi/own_test.so differ diff --git a/src/exchangedb/spi/own_test.sql b/src/exchangedb/spi/own_test.sql new file mode 100644 index 000000000..369c56a60 --- /dev/null +++ b/src/exchangedb/spi/own_test.sql @@ -0,0 +1,216 @@ + +DROP TABLE joseph_test.X; +CREATE TABLE joseph_test.X ( + a integer +); + +INSERT INTO joseph_test.X (a) VALUES (1), (2), (3), (4), (5), (6), (7); + +DROP TABLE joseph_test.Y; +CREATE TABLE joseph_test.Y (col1 INT, col2 INT); +INSERT INTO joseph_test.Y (col1,col2) VALUES (1,2), (2,0), (0,4), (4,0), (0,6), (6,7), (7,8); + +DROP TABLE joseph_test.Z; +CREATE TABLE joseph_test.Z(col1 BYTEA); +DROP TABLE deposits; +/*CREATE TABLE deposits( + deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY + ,shard INT8 NOT NULL + ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) + ,known_coin_id INT8 NOT NULL + ,amount_with_fee_val INT8 NOT NULL + ,amount_with_fee_frac INT4 NOT NULL + ,wallet_timestamp INT8 NOT NULL + ,exchange_timestamp INT8 NOT NULL + ,refund_deadline INT8 NOT NULL + ,wire_deadline INT8 NOT NULL + ,merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32) + ,h_contract_terms BYTEA NOT NULL CHECK (LENGTH(h_contract_terms)=64) + ,coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64) + ,wire_salt BYTEA NOT NULL CHECK (LENGTH(wire_salt)=16) + ,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32) + ,done BOOLEAN NOT NULL DEFAULT FALSE + ,policy_blocked BOOLEAN NOT NULL DEFAULT FALSE + ,policy_details_serial_id INT8); +*/ +--INSERT INTO deposits VALUES (); + + + +CREATE TABLE deposits( + deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY + ,shard INT8 NOT NULL + ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) + ,known_coin_id INT8 NOT NULL + ,amount_with_fee_val INT8 NOT NULL + ,amount_with_fee_frac INT4 NOT NULL + ,wallet_timestamp INT8 NOT NULL + ,exchange_timestamp INT8 NOT NULL + ,refund_deadline INT8 NOT NULL + ,wire_deadline INT8 NOT NULL + ,merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32) + ,h_contract_terms BYTEA NOT NULL CHECK (LENGTH(h_contract_terms)=64) + ,coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64) + ,wire_salt BYTEA NOT NULL CHECK (LENGTH(wire_salt)=16) + ,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32) + ,done BOOLEAN NOT NULL DEFAULT FALSE + ,policy_blocked BOOLEAN NOT NULL DEFAULT FALSE + ,policy_details_serial_id INT8); + + +CREATE OR REPLACE FUNCTION pg_spi_insert_int() + RETURNS VOID + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_insert_int'; +DROP FUNCTION pg_spi_select_from_x(); +CREATE OR REPLACE FUNCTION pg_spi_select_from_x() + RETURNS INT8 + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_select_from_x'; + +/*DROP FUNCTION pg_spi_select_pair_from_y(); +CREATE OR REPLACE FUNCTION pg_spi_select_pair_from_y() + RETURNS valuest + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_select_pair_from_y'; +*/ +/*CREATE OR REPLACE FUNCTION pg_spi_select_with_cond() + RETURNS INT8 + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_select_with_cond'; +*/ +DROP FUNCTION pg_spi_update_y(); +CREATE OR REPLACE FUNCTION pg_spi_update_y() + RETURNS VOID + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_update_y'; +DROP FUNCTION pg_spi_prepare_example(); + +CREATE OR REPLACE FUNCTION pg_spi_prepare_example() + RETURNS INT8 + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_prepare_example'; + +DROP FUNCTION pg_spi_prepare_example_without_saveplan(); +CREATE OR REPLACE FUNCTION pg_spi_prepare_example_without_saveplan() + RETURNS INT8 + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_prepare_example_without_saveplan'; + +CREATE OR REPLACE FUNCTION pg_spi_prepare_insert() + RETURNS VOID + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_prepare_insert'; + +CREATE OR REPLACE FUNCTION pg_spi_prepare_insert_without_saveplan() + RETURNS VOID + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_prepare_insert_without_saveplan'; + +/*DROP FUNCTION pg_spi_prepare_select_with_cond(); +CREATE OR REPLACE FUNCTION pg_spi_prepare_select_with_cond() + RETURNS INT8 + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_prepare_select_with_cond'; +*/ +DROP FUNCTION pg_spi_prepare_select_with_cond_without_saveplan(); +CREATE OR REPLACE FUNCTION pg_spi_prepare_select_with_cond_without_saveplan() + RETURNS INT8 + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_prepare_select_with_cond_without_saveplan'; + +DROP FUNCTION pg_spi_prepare_update(); +CREATE OR REPLACE FUNCTION pg_spi_prepare_update() + RETURNS VOID + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_prepare_update'; + +DROP FUNCTION pg_spi_get_dep_ref_fees( + IN in_timestamp INT8 + ,IN merchant_pub BYTEA + ,IN wire_target_h_payto BYTEA + ,IN wtid BYTEA); +CREATE OR REPLACE FUNCTION pg_spi_get_dep_ref_fees( + IN in_timestamp INT8 + ,IN merchant_pub BYTEA + ,IN wire_target_h_payto BYTEA + ,IN wtid BYTEA +) + RETURNS VOID + LANGUAGE c COST 100 +AS '$libdir/own_test', 'pg_spi_get_dep_ref_fees'; + +CREATE OR REPLACE FUNCTION update_pg_spi_get_dep_ref_fees( + IN in_refund_deadline INT8, + IN in_merchant_pub BYTEA, + IN in_wire_target_h_payto BYTEA +) +RETURNS SETOF record +LANGUAGE plpgsql VOLATILE +AS $$ +DECLARE + +BEGIN +RETURN QUERY + UPDATE deposits + SET done = TRUE + WHERE NOT (done OR policy_blocked) + AND refund_deadline < in_refund_deadline + AND merchant_pub = in_merchant_pub + AND wire_target_h_payto = in_wire_target_h_payto + RETURNING + deposit_serial_id, + coin_pub, + amount_with_fee_val, + amount_with_fee_frac; +END $$; + +CREATE OR REPLACE FUNCTION stored_procedure_update( +IN in_number INT8 +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + UPDATE joseph_test.Y + SET col1 = 4 + WHERE col2 = in_number; +END $$; + +CREATE OR REPLACE FUNCTION stored_procedure_select(OUT out_value INT8) +RETURNS INT8 +LANGUAGE plpgsql +AS $$ +BEGIN + SELECT 1 + INTO out_value + FROM joseph_test.X; + RETURN; +END $$; + + +CREATE OR REPLACE FUNCTION stored_procedure_insert( +IN in_number INT8, +OUT out_number INT8) +RETURNS INT8 +LANGUAGE plpgsql +AS $$ +BEGIN + INSERT INTO joseph_test.X (a) + VALUES (in_number) + RETURNING a INTO out_number; +END $$; + +CREATE OR REPLACE FUNCTION stored_procedure_select_with_cond( +IN in_number INT8, +OUT out_number INT8 +) +RETURNS INT8 +LANGUAGE plpgsql +AS $$ +BEGIN + SELECT col1 INTO out_number + FROM joseph_test.Y + WHERE col2 = in_number; + RETURN; +END $$; diff --git a/src/exchangedb/spi/perf_own_test.c b/src/exchangedb/spi/perf_own_test.c new file mode 100644 index 000000000..92be2235e --- /dev/null +++ b/src/exchangedb/spi/perf_own_test.c @@ -0,0 +1,25 @@ +/* + This file is part of TALER + Copyright (C) 2014-2023 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see +*/ +/** + * @file exchangedb/spi/perf_own_test.c + * @brief benchmark for 'own_test' + * @author Joseph Xu + */ +#include "exchangedb/platform.h" +#include "exchangedb/taler_exchangedb_lib.h" +#include "exchangedb/taler_json_lib.h" +#include "exchangedb/taler_exchangedb_plugin.h" +#include "own_test.sql" diff --git a/src/exchangedb/spi/pg_aggregate.c b/src/exchangedb/spi/pg_aggregate.c new file mode 100644 index 000000000..262100ce8 --- /dev/null +++ b/src/exchangedb/spi/pg_aggregate.c @@ -0,0 +1,389 @@ +#include "postgres.h" +#include "fmgr.h" +#include "utils/numeric.h" +#include "utils/builtins.h" +#include "executor/spi.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(get_deposit_summary); + +Datum get_deposit_summary(PG_FUNCTION_ARGS) +{ + + static SPIPlanPtr deposit_plan; + static SPIPlanPtr refund_plan; + static SPIPlanPtr refund_by_coin_plan; + static SPIPlanPtr norm_refund_by_coin_plan; + static SPIPlanPtr fully_refunded_by_coins_plan; + static SPIPlanPtr fees_plan; + + int shard = PG_GETARG_INT32(0); + char * sql; + char *merchant_pub = text_to_cstring(PG_GETARG_TEXT_P(1)); + char *wire_target_h_payto = text_to_cstring(PG_GETARG_TEXT_P(2)); + char *wtid_raw = text_to_cstring(PG_GETARG_TEXT_P(3)); + int refund_deadline = PG_GETARG_INT32(4); + int conn = SPI_connect(); + if (conn != SPI_OK_CONNECT) + { + elog(ERROR, "DB connexion failed ! \n"); + } + + if ( deposit_plan == NULL + || refund_plan == NULL + || refund_by_coin_plan == NULL + || norm_refund_by_coin_plan = NULL + || fully_refunded_coins_plan = NULL + || fees_plan == NULL ) + { + if (deposit_plan == NULL) + { + int nargs = 3; + Oid argtypes[3]; + argtypes[0] = INT8OID; + argtypes[1] = BYTEAOID; + argtypes[2] = BYTEAOID; + const char *dep_sql = + " UPDATE deposits" + " SET done=TRUE" + " WHERE NOT (done OR policy_blocked)" + " AND refund_deadline < $1" + " AND merchant_pub = $2" + " AND wire_target_h_payto = $3" + " RETURNING" + " deposit_serial_id" + " ,coin_pub" + " ,amount_with_fee_val AS amount_val" + " ,amount_with_fee_frac AS amount_frac"; + SPIPlanPtr new_plan = + SPI_prepare(dep_sql, 4, argtypes}); + if (new_plan == NULL) + { + elog(ERROR, "SPI_prepare for deposit failed ! \n"); + } + deposit_plan = SPI_saveplan(new_plan); + if (deposit_plan == NULL) + { + elog(ERROR, "SPI_saveplan for deposit failed ! \n"); + } + } + + Datum values[4]; + values[0] = Int64GetDatum(refund_deadline); + values[1] = CStringGetDatum(merchant_pub); + values[2] = CStringGetDatum(wire_target_h_payto); + int ret = SPI_execute_plan (deposit_plan, + values, + NULL, + true, + 0); + if (ret != SPI_OK_UPDATE) + { + elog(ERROR, "Failed to execute subquery 1\n"); + } + uint64_t *dep_deposit_serial_ids = palloc(sizeof(uint64_t) * SPI_processed); + BYTEA **dep_coin_pubs = palloc(sizeof(BYTEA *) * SPI_processed); + uint64_t *dep_amount_vals = palloc(sizeof(uint64_t) * SPI_processed); + uint32_t *dep_amount_fracs = palloc(sizeof(uint32_t) * SPI_processed); + for (unsigned int i = 0; i < SPI_processed; i++) { + HeapTuple tuple = SPI_tuptable->vals[i]; + dep_deposit_serial_ids[i] = + DatumGetInt64(SPI_getbinval(tuple, SPI_tuptable->tupdesc, 1, &ret)); + dep_coin_pubs[i] = + DatumGetByteaP(SPI_getbinval(tuple, SPI_tuptable->tupdesc, 2, &ret)); + dep_amount_vals[i] = + DatumGetInt64(SPI_getbinval(tuple, SPI_tuptable->tupdesc, 3, &ret)); + dep_amount_fracs[i] = + DatumGetInt32(SPI_getbinval(tuple, SPI_tuptable->tupdesc, 4, &ret)); + } + + + if (refund_plan == NULL) + { + const char *ref_sql = + "ref AS (" + " SELECT" + " amount_with_fee_val AS refund_val" + " ,amount_with_fee_frac AS refund_frac" + " ,coin_pub" + " ,deposit_serial_id" + " FROM refunds" + " WHERE coin_pub IN (SELECT coin_pub FROM dep)" + " AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep)) "; + SPIPlanPtr new_plan = SPI_prepare(ref_sql, 0, NULL); + if (new_plan == NULL) + elog (ERROR, "SPI_prepare for refund failed ! \n"); + refund_plan = SPI_saveplan(new_plan); + if (refund_plan == NULL) + { + elog(ERROR, "SPI_saveplan for refund failed ! \n"); + } + } + + int64t_t *ref_deposit_serial_ids = palloc(sizeof(int64_t) * SPI_processed); + + int res = SPI_execute_plan (refund_plan, NULL, NULL, false, 0); + if (res != SPI_OK_SELECT) + { + elog(ERROR, "Failed to execute subquery 2\n"); + } + SPITupleTable *tuptable = SPI_tuptable; + TupleDesc tupdesc = tuptable->tupdesc; + for (unsigned int i = 0; i < SPI_processed; i++) + { + HeapTuple tuple = tuptable->vals[i]; + Datum refund_val = SPI_getbinval(tuple, tupdesc, 1, &refund_val_isnull); + Datum refund_frac = SPI_getbinval(tuple, tupdesc, 2, &refund_frac_isnull); + Datum coin_pub = SPI_getbinval(tuple, tupdesc, 3, &coin_pub_isnull); + Datum deposit_serial_id = SPI_getbinval(tuple, tupdesc, 4, &deposit_serial_id_isnull); + if (refund_val_isnull + || refund_frac_isnull + || coin_pub_isnull + || deposit_serial_id_isnull ) + { + elog(ERROR, "Failed to retrieve data from subquery 2"); + } + uint64_t refund_val_int = DatumGetUInt64(refund_val); + uint32_t refund_frac_int = DatumGetUInt32(refund_frac); + BYTEA coin_pub = DatumGetByteaP(coin_pub); + ref_deposit_serial_ids = DatumGetInt64(deposit_serial_id); + + refund *new_refund = (refund*) palloc(sizeof(refund)); + new_refund->coin_pub = coin_pub_str; + new_refund->deposit_serial_id = deposit_serial_id_int; + new_refund->amount_with_fee_val = refund_val_int; + new_refund->amount_with_fee_frac = refund_frac_int; + } + + + if (refund_by_coin_plan == NULL) + { + const char *ref_by_coin_sql = + "ref_by_coin AS (" + " SELECT" + " SUM(refund_val) AS sum_refund_val" + " ,SUM(refund_frac) AS sum_refund_frac" + " ,coin_pub" + " ,deposit_serial_id" + " FROM ref" + " GROUP BY coin_pub, deposit_serial_id) "; + SPIPlanPtr new_plan = SPI_prepare (ref_by_coin_sql, 0, NULL); + if (new_plan == NULL) + elog(ERROR, "SPI_prepare for refund by coin failed ! \n"); + refund_by_coin_plan = SPI_saveplan (new_plan); + if (refund_by_coin_plan == NULL) + elog(ERROR, "SPI_saveplan for refund failed"); + } + + + int res = SPI_execute_plan (refund_by_coin_plan, NULL, NULL, false, 0); + if (res != SPI_OK_SELECT) + { + elog(ERROR, "Failed to execute subquery 2\n"); + } + + SPITupleTable *tuptable = SPI_tuptable; + TupleDesc tupdesc = tuptable->tupdesc; + for (unsigned int i = 0; i < SPI_processed; i++) + { + HeapTuple tuple = tuptable->vals[i]; + Datum sum_refund_val = SPI_getbinval(tuple, tupdesc, 1, &refund_val_isnull); + Datum sum_refund_frac = SPI_getbinval(tuple, tupdesc, 2, &refund_frac_isnull); + Datum coin_pub = SPI_getbinval(tuple, tupdesc, 3, &coin_pub_isnull); + Datum deposit_serial_id_int = SPI_getbinval(tuple, tupdesc, 4, &deposit_serial_id_isnull); + if (refund_val_isnull + || refund_frac_isnull + || coin_pub_isnull + || deposit_serial_id_isnull ) + { + elog(ERROR, "Failed to retrieve data from subquery 2"); + } + uint64_t s_refund_val_int = DatumGetUInt64(sum_refund_val); + uint32_t s_refund_frac_int = DatumGetUInt32(sum_refund_frac); + BYTEA coin_pub = DatumGetByteaP(coin_pub); + uint64_t deposit_serial_id_int = DatumGetInt64(deposit_serial_id_int); + refund *new_refund_by_coin = (refund*) palloc(sizeof(refund)); + new_refund_by_coin->coin_pub = coin_pub; + new_refund_by_coin->deposit_serial_id = deposit_serial_id_int; + new_refund_by_coin->refund_amount_with_fee_val = s_refund_val_int; + new_refund_by_coin->refund_amount_with_fee_frac = s_refund_frac_int; + } + + + if (norm_refund_by_coin_plan == NULL) + { + const char *norm_ref_by_coin_sql = + "norm_ref_by_coin AS (" + " SELECT" + " coin_pub" + " ,deposit_serial_id" + " FROM ref_by_coin) "; + SPIPlanPtr new_plan = SPI_prepare (norm_ref_by_coin_sql, 0, NULL); + if (new_plan == NULL) + elog(ERROR, "SPI_prepare for norm refund by coin failed ! \n"); + norm_refund_by_coin_plan = SPI_saveplan(new_plan); + if (norm_refund_by_coin_plan == NULL) + elog(ERROR, "SPI_saveplan for norm refund by coin failed ! \n"); + } + + double norm_refund_val = + ((double)new_refund_by_coin->refund_amount_with_fee_val + + (double)new_refund_by_coin->refund_amount_with_fee_frac) / 100000000; + double norm_refund_frac = + (double)new_refund_by_coin->refund_amount_with_fee_frac % 100000000; + + if (fully_refunded_coins_plan == NULL) + { + const char *fully_refunded_coins_sql = + "fully_refunded_coins AS (" + " SELECT" + " dep.coin_pub" + " FROM norm_ref_by_coin norm" + " JOIN dep" + " ON (norm.coin_pub = dep.coin_pub" + " AND norm.deposit_serial_id = dep.deposit_serial_id" + " AND norm.norm_refund_val = dep.amount_val" + " AND norm.norm_refund_frac = dep.amount_frac)) "; + SPIPlanPtr new_plan = + SPI_prepare(fully_refunded_coins_sql, 0, NULL); + if (new_plan == NULL) + elog (ERROR, "SPI_prepare for fully refunded coins failed ! \n"); + fully_refunded_coins_plan = SPI_saveplan(new_plan); + if (fully_refunded_coins_plan == NULL) + elog (ERROR, "SPI_saveplan for fully refunded coins failed ! \n"); + } + + int res = SPI_execute_plan(fully_refunded_coins_sql); + if ( res != SPI_OK_SELECT) + elog(ERROR, "Failed to execute subquery 4\n"); + SPITupleTable * tuptable = SPI_tuptable; + TupleDesc tupdesc = tuptable->tupdesc; + + BYTEA coin_pub = SPI_getbinval(tuple, tupdesc, 1, &coin_pub_isnull); + if (fees_plan == NULL) + { + const char *fees_sql = + "SELECT " + " denom.fee_deposit_val AS fee_val, " + " denom.fee_deposit_frac AS fee_frac, " + " cs.deposit_serial_id " + "FROM dep cs " + "JOIN known_coins kc USING (coin_pub) " + "JOIN denominations denom USING (denominations_serial) " + "WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins)"; + SPIPlanPtr new_plan = + SPI_prepare(fees_sql, 0, NULL); + if (new_plan == NULL) + { + elog(ERROR, "SPI_prepare for fees failed ! \n"); + } + fees_plan = SPI_saveplan(new_plan); + if (fees_plan == NULL) + { + elog(ERROR, "SPI_saveplan for fees failed ! \n"); + } + } + } + int fees_ntuples; + SPI_execute(fees_sql, true, 0); + if (SPI_result_code() != SPI_OK_SELECT) + { + ereport( + ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("deposit fee query failed: error code %d \n", SPI_result_code()))); + } + fees_ntuples = SPI_processed; + + if (fees_ntuples > 0) + { + for (i = 0; i < fees_ntuples; i++) + { + Datum fee_val_datum = + SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1, &fee_null); + Datum fee_frac_datum = + SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2, &fee_null); + Datum deposit_id_datum = + SPI_getbinval(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 3, &deposit_null); + if (!fee_null && !deposit_null) + { + int64 fee_val = DatumGetInt64(fee_val_datum); + int32 fee_frac = DatumGetInt32(fee_frac_datum); + int64 deposit_id = DatumGetInt64(deposit_id_datum); + sum_fee_value += fee_val; + sum_fee_fraction += fee_frac; + char *insert_agg_sql = + psprintf( + "INSERT INTO " + "aggregation_tracking(deposit_serial_id, wtid_raw)" + " VALUES (%lld, '%s')", + deposit_id, wtid_raw); + SPI_execute(insert_agg_sql, false, 0); + } + } + } + + TupleDesc tupdesc; + SPITupleTable *tuptable = SPI_tuptable; + HeapTuple tuple; + Datum result; + + if (tuptable == NULL || SPI_processed != 1) + { + ereport( + ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("Unexpected result \n"))); + } + tupdesc = SPI_tuptable->tupdesc; + tuple = SPI_tuptable->vals[0]; + result = HeapTupleGetDatum(tuple); + + TupleDesc result_desc = CreateTemplateTupleDesc(6, false); + TupleDescInitEntry(result_desc, (AttrNumber) 1, "sum_deposit_value", INT8OID, -1, 0); + TupleDescInitEntry(result_desc, (AttrNumber) 2, "sum_deposit_fraction", INT4OID, -1, 0); + TupleDescInitEntry(result_desc, (AttrNumber) 3, "sum_refund_value", INT8OID, -1, 0); + TupleDescInitEntry(result_desc, (AttrNumber) 4, "sum_refund_fraction", INT4OID, -1, 0); + TupleDescInitEntry(result_desc, (AttrNumber) 5, "sum_fee_value", INT8OID, -1, 0); + TupleDescInitEntry(result_desc, (AttrNumber) 6, "sum_fee_fraction", INT4OID, -1, 0); + + int ret = SPI_prepare(sql, 4, argtypes); + if (ret != SPI_OK_PREPARE) + { + elog(ERROR, "Failed to prepare statement: %s \n", sql); + } + + ret = SPI_execute_plan(plan, args, nulls, true, 0); + if (ret != SPI_OK_SELECT) + { + elog(ERROR, "Failed to execute statement: %s \n", sql); + } + + if (SPI_processed > 0) + { + HeapTuple tuple; + Datum values[6]; + bool nulls[6] = {false}; + values[0] = + SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &nulls[0]); + values[1] = + SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 2, &nulls[1]); + values[2] = + SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 3, &nulls[2]); + values[3] = + SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 4, &nulls[3]); + values[4] = + SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 5, &nulls[4]); + values[5] = + SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 6, &nulls[5]); + tuple = heap_form_tuple(result_desc, values, nulls); + PG_RETURN_DATUM(HeapTupleGetDatum(tuple)); + } + SPI_finish(); + + PG_RETURN_NULL(); +} + + + -- cgit v1.2.3