diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-04-07 09:33:04 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-04-07 09:33:04 +0200 |
commit | 148dda09d4218d40e2e0cc9d8fe9212aede969f5 (patch) | |
tree | f06e626d5171235923a205654296d459fec509f8 | |
parent | 8df75214f459fd39ce43540dc604733c1a47515e (diff) | |
parent | a96c7177aae60c37041406ff0879992e58ef2f50 (diff) |
Merge branch 'master' of git+ssh://taler.net/var/git/exchange
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 436 | ||||
-rw-r--r-- | src/exchange/test_taler_exchange_aggregator.c | 22 | ||||
-rw-r--r-- | src/exchangedb/Makefile.am | 1 | ||||
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 26 | ||||
-rw-r--r-- | src/exchangedb/test_exchangedb.c | 81 | ||||
-rw-r--r-- | src/include/taler_exchangedb_plugin.h | 15 | ||||
-rw-r--r-- | src/include/taler_json_lib.h | 2 | ||||
-rw-r--r-- | src/json/json.c | 2 | ||||
-rw-r--r-- | src/wire/plugin_wire_test.c | 2 |
10 files changed, 386 insertions, 203 deletions
diff --git a/.gitignore b/.gitignore index 81be5b2f5..ce73b4d26 100644 --- a/.gitignore +++ b/.gitignore @@ -38,7 +38,7 @@ src/exchange-tools/taler-exchange-dbinit src/exchange-tools/taler-exchange-keycheck src/exchange-tools/taler-exchange-keyup src/exchange-tools/taler-exchange-reservemod -src/exchange-tools/taler-exchange-sepa +src/exchange-tools/taler-exchange-wire src/exchangedb/perf-exchangedb src/json/test_json src/wire/test_sepa_wireformat diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index a6b7de211..def6d0be4 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -18,10 +18,6 @@ * @file taler-exchange-aggregator.c * @brief Process that aggregates outgoing transactions and executes them * @author Christian Grothoff - * - * TODO: - * - simplify global_ret: make it a global! - * - handle shutdown more nicely (call 'cancel' method on wire transfers) */ #include "platform.h" #include <gnunet/gnunet_util_lib.h> @@ -32,6 +28,102 @@ #include "taler_json_lib.h" #include "taler_wire_lib.h" + +/** + * Data we keep to #run_transfers(). There is at most + * one of these around at any given point in time. + */ +struct WirePrepareData +{ + + /** + * Database session for all of our transactions. + */ + struct TALER_EXCHANGEDB_Session *session; + + /** + * Wire execution handle. + */ + struct TALER_WIRE_ExecuteHandle *eh; + + /** + * Row ID of the transfer. + */ + unsigned long long row_id; + +}; + + +/** + * Information about one aggregation process to be executed. There is + * at most one of these around at any given point in time. + */ +struct AggregationUnit +{ + /** + * Public key of the merchant. + */ + struct TALER_MerchantPublicKeyP merchant_pub; + + /** + * Total amount to be transferred. + */ + struct TALER_Amount total_amount; + + /** + * Hash of @e wire. + */ + struct GNUNET_HashCode h_wire; + + /** + * Wire transfer identifier we use. + */ + struct TALER_WireTransferIdentifierRawP wtid; + + /** + * Row ID of the transaction that started it all. + */ + unsigned long long row_id; + + /** + * The current time. + */ + struct GNUNET_TIME_Absolute execution_time; + + /** + * Wire details of the merchant. + */ + json_t *wire; + + /** + * Database session for all of our transactions. + */ + struct TALER_EXCHANGEDB_Session *session; + + /** + * Wire preparation handle. + */ + struct TALER_WIRE_PrepareHandle *ph; + + /** + * Array of #aggregation_limit row_ids from the + * aggregation. + */ + unsigned long long *additional_rows; + + /** + * Offset specifying how many #additional_rows are in use. + */ + unsigned int rows_offset; + + /** + * Set to #GNUNET_YES if we have to abort due to failure. + */ + int failed; + +}; + + /** * Which currency is used by this exchange? */ @@ -63,11 +155,29 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; static struct TALER_WIRE_Plugin *wire_plugin; /** - * Task for the main #run() function. + * Next task to run, if any. */ static struct GNUNET_SCHEDULER_Task *task; /** + * If we are currently executing a transfer, information about + * the active transfer is here. Otherwise, this variable is NULL. + */ +static struct WirePrepareData *wpd; + +/** + * If we are currently aggregating transactions, information about the + * active aggregation is here. Otherwise, this variable is NULL. + */ +static struct AggregationUnit *au; + +/** + * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR + * on serious errors. + */ +static int global_ret; + +/** * #GNUNET_YES if we are in test mode and are using temporary tables. */ static int test_mode; @@ -78,9 +188,59 @@ static int test_mode; * of the smallest possible unit are aggregated, they do surpass the * "tiny" threshold beyond which we never trigger a wire transaction! * - * TODO: make configurable (via config file or command line option) + * Note: do not change here, Postgres requires us to hard-code the + * LIMIT in the prepared statement. */ -static unsigned int aggregation_limit = 10000; +static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT; + + +/** + * We're being aborted with CTRL-C (or SIGTERM). Shut down. + * + * @param cls closure + * @param tc scheduler context + */ +static void +shutdown_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + if (NULL != task) + { + GNUNET_SCHEDULER_cancel (task); + task = NULL; + } + if (NULL != wpd) + { + if (NULL != wpd->eh) + { + wire_plugin->execute_wire_transfer_cancel (wire_plugin->cls, + wpd->eh); + wpd->eh = NULL; + } + db_plugin->rollback (db_plugin->cls, + wpd->session); + GNUNET_free (wpd); + wpd = NULL; + } + if (NULL != au) + { + if (NULL != au->ph) + { + wire_plugin->prepare_wire_transfer_cancel (wire_plugin->cls, + au->ph); + au->ph = NULL; + } + db_plugin->rollback (db_plugin->cls, + au->session); + GNUNET_free_non_null (au->additional_rows); + if (NULL != au->wire) + json_decref (au->wire); + au = NULL; + GNUNET_free (au); + } + TALER_EXCHANGEDB_plugin_unload (db_plugin); + TALER_WIRE_plugin_unload (wire_plugin); +} /** @@ -159,85 +319,10 @@ exchange_serve_process_config (const char *exchange_directory) /** - * Information about one aggregation process to - * be executed. - */ -struct AggregationUnit -{ - /** - * Public key of the merchant. - */ - struct TALER_MerchantPublicKeyP merchant_pub; - - /** - * Total amount to be transferred. - */ - struct TALER_Amount total_amount; - - /** - * Hash of @e wire. - */ - struct GNUNET_HashCode h_wire; - - /** - * Wire transfer identifier we use. - */ - struct TALER_WireTransferIdentifierRawP wtid; - - /** - * Row ID of the transaction that started it all. - */ - unsigned long long row_id; - - /** - * The current time. - */ - struct GNUNET_TIME_Absolute execution_time; - - /** - * Wire details of the merchant. - */ - json_t *wire; - - /** - * Database session for all of our transactions. - */ - struct TALER_EXCHANGEDB_Session *session; - - /** - * Wire preparation handle. - */ - struct TALER_WIRE_PrepareHandle *ph; - - /** - * Array of #aggregation_limit row_ids from the - * aggregation. - */ - unsigned long long *additional_rows; - - /** - * Pointer to global return value. Closure for #run(). - */ - int *global_ret; - - /** - * Offset specifying how many #additional_rows are in use. - */ - unsigned int rows_offset; - - /** - * Set to #GNUNET_YES if we have to abort due to failure. - */ - int failed; - -}; - - -/** * Function called with details about deposits that have been made, * with the goal of executing the corresponding wire transaction. * - * @param cls closure with the `struct AggregationUnit` + * @param cls NULL * @param row_id identifies database entry * @param merchant_pub public key of the merchant * @param coin_pub public key of the coin @@ -262,8 +347,6 @@ deposit_cb (void *cls, struct GNUNET_TIME_Absolute wire_deadline, const json_t *wire) { - struct AggregationUnit *au = cls; - au->merchant_pub = *merchant_pub; if (GNUNET_OK != TALER_amount_subtract (&au->total_amount, @@ -284,6 +367,9 @@ deposit_cb (void *cls, GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, &au->wtid, sizeof (au->wtid)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting aggregation under WTID %s\n", + TALER_B2S (&au->wtid)); if (GNUNET_OK != db_plugin->insert_aggregation_tracking (db_plugin->cls, au->session, @@ -313,12 +399,11 @@ deposit_cb (void *cls, } - /** * Function called with details about another deposit we * can aggregate into an existing aggregation unit. * - * @param cls closure with the `struct AggregationUnit` + * @param cls NULL * @param row_id identifies database entry * @param merchant_pub public key of the merchant * @param coin_pub public key of the coin @@ -343,7 +428,6 @@ aggregate_cb (void *cls, struct GNUNET_TIME_Absolute wire_deadline, const json_t *wire) { - struct AggregationUnit *au = cls; struct TALER_Amount delta; GNUNET_break (0 == @@ -432,27 +516,28 @@ prepare_cb (void *cls, * Main work function that queries the DB and aggregates transactions * into larger wire transfers. * - * @param cls pointer to an `int` which we will return from main() + * @param cls NULL * @param tc scheduler context */ static void run_aggregation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - int *global_ret = cls; struct TALER_EXCHANGEDB_Session *session; - struct AggregationUnit *au; unsigned int i; int ret; + task = NULL; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking for ready deposits to aggregate\n"); if (NULL == (session = db_plugin->get_session (db_plugin->cls, test_mode))) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database session!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } if (GNUNET_OK != @@ -461,7 +546,7 @@ run_aggregation (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } au = GNUNET_new (struct AggregationUnit); @@ -475,15 +560,18 @@ run_aggregation (void *cls, if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; db_plugin->rollback (db_plugin->cls, session); if (0 != ret) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute deposit iteration!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No more ready deposits, going to sleep\n"); if (GNUNET_YES == test_mode) { /* in test mode, shutdown if we end up being idle */ @@ -494,11 +582,14 @@ run_aggregation (void *cls, /* nothing to do, sleep for a minute and try again */ task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &run_aggregation, - global_ret); + NULL); } return; } /* Now try to find other deposits to aggregate */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Found ready deposit for %s, aggregating\n", + TALER_B2S (&au->merchant_pub)); ret = db_plugin->iterate_matching_deposits (db_plugin->cls, session, &au->h_wire, @@ -515,9 +606,10 @@ run_aggregation (void *cls, if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; db_plugin->rollback (db_plugin->cls, session); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } /* Round to the unit supported by the wire transfer method */ @@ -528,6 +620,8 @@ run_aggregation (void *cls, if ( (0 == au->total_amount.value) && (0 == au->total_amount.fraction) ) { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Aggregate value too low for transfer\n"); /* Rollback ongoing transaction, as we will not use the respective WTID and thus need to remove the tracking data */ db_plugin->rollback (db_plugin->cls, @@ -540,11 +634,12 @@ run_aggregation (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; GNUNET_free_non_null (au->additional_rows); if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; return; } /* Mark transactions by row_id as minor */ @@ -573,21 +668,27 @@ run_aggregation (void *cls, if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } - au->global_ret = global_ret; + { + char *amount_s; + + amount_s = TALER_amount_to_string (&au->total_amount); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Preparing wire transfer of %s to %s\n", + amount_s, + TALER_B2S (&au->merchant_pub)); + } au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls, au->wire, &au->total_amount, &au->wtid, &prepare_cb, au); - /* FIXME: currently we have no clean-up plan on - shutdown to call prepare_wire_transfer_cancel! - Maybe make 'au' global? */ if (NULL == au->ph) { GNUNET_break (0); /* why? how to best recover? */ @@ -596,10 +697,11 @@ run_aggregation (void *cls, GNUNET_free_non_null (au->additional_rows); if (NULL != au->wire) json_decref (au->wire); + au = NULL; GNUNET_free (au); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } /* otherwise we continue with #prepare_cb(), see below */ @@ -621,7 +723,7 @@ run_transfers (void *cls, /** * Function to be called with the prepared transfer data. * - * @param cls closure with the `struct AggregationUnit` + * @param cls NULL * @param buf transaction data to persist, NULL on error * @param buf_size number of bytes in @a buf, 0 on error */ @@ -630,14 +732,13 @@ prepare_cb (void *cls, const char *buf, size_t buf_size) { - struct AggregationUnit *au = cls; - int *global_ret = au->global_ret; struct TALER_EXCHANGEDB_Session *session = au->session; GNUNET_free_non_null (au->additional_rows); if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; if (NULL == buf) { GNUNET_break (0); /* why? how to best recover? */ @@ -645,7 +746,7 @@ prepare_cb (void *cls, session); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } @@ -662,7 +763,7 @@ prepare_cb (void *cls, session); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } @@ -676,50 +777,21 @@ prepare_cb (void *cls, "Failed to commit database transaction!\n"); /* try again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } - + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Preparation complete, switching to transfer mode\n"); /* run alternative task: actually do wire transfer! */ task = GNUNET_SCHEDULER_add_now (&run_transfers, - &global_ret); + NULL); } /** - * Data we keep to #run_transfers(). - */ -struct WirePrepareData -{ - - /** - * Database session for all of our transactions. - */ - struct TALER_EXCHANGEDB_Session *session; - - /** - * Wire execution handle. - */ - struct TALER_WIRE_ExecuteHandle *eh; - - /** - * Pointer to global return value. Closure for #run(). - */ - int *global_ret; - - - /** - * Row ID of the transfer. - */ - unsigned long long row_id; - -}; - - -/** * Function called with the result from the execute step. * - * @param cls closure with the `struct WirePrepareData` + * @param cls NULL * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure * @param emsg NULL on success, otherwise an error message */ @@ -728,8 +800,6 @@ wire_confirm_cb (void *cls, int success, const char *emsg) { - struct WirePrepareData *wpd = cls; - int *global_ret = wpd->global_ret; struct TALER_EXCHANGEDB_Session *session = wpd->session; wpd->eh = NULL; @@ -740,8 +810,9 @@ wire_confirm_cb (void *cls, emsg); db_plugin->rollback (db_plugin->cls, session); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } if (GNUNET_OK != @@ -752,11 +823,13 @@ wire_confirm_cb (void *cls, GNUNET_break (0); /* why!? */ db_plugin->rollback (db_plugin->cls, session); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } GNUNET_free (wpd); + wpd = NULL; if (GNUNET_OK != db_plugin->commit (db_plugin->cls, session)) @@ -765,13 +838,15 @@ wire_confirm_cb (void *cls, "Failed to commit database transaction!\n"); /* try again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); return; } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Wire transfer complete\n"); /* continue with #run_transfers(), just to guard against the unlikely case that there are more. */ task = GNUNET_SCHEDULER_add_now (&run_transfers, - &global_ret); + NULL); } @@ -779,7 +854,7 @@ wire_confirm_cb (void *cls, /** * Callback with data about a prepared transaction. * - * @param cls closure with the `struct WirePrepareData` + * @param cls NULL * @param rowid row identifier used to mark prepared transaction as done * @param buf transaction data that was persisted, NULL on error * @param buf_size number of bytes in @a buf, 0 on error @@ -790,25 +865,23 @@ wire_prepare_cb (void *cls, const char *buf, size_t buf_size) { - struct WirePrepareData *wpd = cls; - int *global_ret = wpd->global_ret; - wpd->row_id = rowid; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting wire transfer %llu\n", + rowid); wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, buf, buf_size, &wire_confirm_cb, - wpd); - /* FIXME: currently we have no clean-up plan on - shutdown to call execute_wire_transfer_cancel! - Maybe make 'wpd' global? */ + NULL); if (NULL == wpd->eh) { GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, wpd->session); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } } @@ -818,18 +891,19 @@ wire_prepare_cb (void *cls, * Execute the wire transfers that we have committed to * do. * - * @param cls pointer to an `int` which we will return from main() + * @param cls NULL * @param tc scheduler context */ static void run_transfers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - int *global_ret = cls; int ret; - struct WirePrepareData *wpd; struct TALER_EXCHANGEDB_Session *session; + task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking for pending wire transfers\n"); if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; if (NULL == (session = db_plugin->get_session (db_plugin->cls, @@ -837,7 +911,7 @@ run_transfers (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database session!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } if (GNUNET_OK != @@ -846,34 +920,37 @@ run_transfers (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; return; } wpd = GNUNET_new (struct WirePrepareData); wpd->session = session; - wpd->global_ret = global_ret; ret = db_plugin->wire_prepare_data_get (db_plugin->cls, session, exchange_wireformat, &wire_prepare_cb, - wpd); + NULL); if (GNUNET_SYSERR == ret) { GNUNET_break (0); /* why? how to best recover? */ db_plugin->rollback (db_plugin->cls, session); - *global_ret = GNUNET_SYSERR; + global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } if (GNUNET_NO == ret) { /* no more prepared wire transfers, go back to aggregation! */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No more pending wire transfers, starting aggregation\n"); db_plugin->rollback (db_plugin->cls, session); task = GNUNET_SCHEDULER_add_now (&run_aggregation, - global_ret); + NULL); GNUNET_free (wpd); + wpd = NULL; return; } /* otherwise, continues in #wire_prepare_cb() */ @@ -881,6 +958,24 @@ run_transfers (void *cls, /** + * First task. + * + * @param cls closure, NULL + * @param tc scheduler context + */ +static void +run (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, + &shutdown_task, + cls); +} + + +/** * The main function of the taler-exchange-httpd server ("the exchange"). * * @param argc number of arguments from the command line @@ -906,7 +1001,6 @@ main (int argc, GNUNET_GETOPT_OPTION_VERSION (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; - int ret = GNUNET_OK; GNUNET_assert (GNUNET_OK == GNUNET_log_setup ("taler-exchange-aggregator", @@ -928,12 +1022,10 @@ main (int argc, { return 1; } + global_ret = GNUNET_OK; + GNUNET_SCHEDULER_run (&run, NULL); - GNUNET_SCHEDULER_run (&run_transfers, &ret); - - TALER_EXCHANGEDB_plugin_unload (db_plugin); - TALER_WIRE_plugin_unload (wire_plugin); - return (GNUNET_SYSERR == ret) ? 1 : 0; + return (GNUNET_SYSERR == global_ret) ? 1 : 0; } /* end of taler-exchange-aggregator.c */ diff --git a/src/exchange/test_taler_exchange_aggregator.c b/src/exchange/test_taler_exchange_aggregator.c index 22bc8e277..f34dea837 100644 --- a/src/exchange/test_taler_exchange_aggregator.c +++ b/src/exchange/test_taler_exchange_aggregator.c @@ -668,25 +668,19 @@ run_test () .label = "run-aggregator-deposit-1" }, - /* The above step is already known to fail (with an error message) - right now, so we skip the rest of the test. */ - { - .opcode = OPCODE_TERMINATE_SKIP, - .label = "testcase-incomplete-terminating-with-skip" - }, - - { .opcode = OPCODE_EXPECT_TRANSACTION, .label = "expect-deposit-1", - .details.expect_transaction.debit_account = 1, + .details.expect_transaction.debit_account = 3, .details.expect_transaction.credit_account = 4, .details.expect_transaction.amount = "EUR:1" }, + { .opcode = OPCODE_EXPECT_TRANSACTIONS_EMPTY, .label = "expect-empty-transactions-on-start" }, + /* test idempotency: run again on transactions already done */ { .opcode = OPCODE_DATABASE_DEPOSIT, @@ -704,6 +698,11 @@ run_test () }, { + .opcode = OPCODE_TERMINATE_SUCCESS, + .label = "testcase-incomplete-terminating-with-skip" + }, + + { .opcode = OPCODE_TERMINATE_SKIP, .label = "testcase-incomplete-terminating-with-skip" }, @@ -804,7 +803,6 @@ handle_mhd_request (void *cls, GNUNET_break_op (0); return MHD_NO; } - /* FIXME: to be implemented! */ pr = GNUNET_JSON_post_parser (REQUEST_BUFFER_MAX, con_cls, upload_data, @@ -848,6 +846,10 @@ handle_mhd_request (void *cls, transactions_tail, t); } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Receiving incoming wire transfer: %llu->%llu\n", + (unsigned long long) t->debit_account, + (unsigned long long) t->credit_account); json_decref (json); resp = MHD_create_response_from_buffer (0, "", MHD_RESPMEM_PERSISTENT); ret = MHD_queue_response (connection, diff --git a/src/exchangedb/Makefile.am b/src/exchangedb/Makefile.am index b3637346b..914153f6f 100644 --- a/src/exchangedb/Makefile.am +++ b/src/exchangedb/Makefile.am @@ -80,6 +80,7 @@ test_exchangedb_postgres_SOURCES = \ test_exchangedb.c test_exchangedb_postgres_LDADD = \ libtalerexchangedb.la \ + $(top_builddir)/src/json/libtalerjson.la \ $(top_srcdir)/src/util/libtalerutil.la \ $(top_srcdir)/src/pq/libtalerpq.la \ -lgnunetutil -ljansson diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 6807e7563..c38c0827f 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -952,7 +952,7 @@ postgres_prepare (PGconn *db_conn) " tiny=false AND" " done=false" " ORDER BY wire_deadline ASC" - " LIMIT 1;", + " LIMIT 1", 0, NULL); /* Used in #postgres_iterate_matching_deposits() */ @@ -975,8 +975,8 @@ postgres_prepare (PGconn *db_conn) " h_wire=$2 AND" " done=false" " ORDER BY wire_deadline ASC" - " LIMIT $3", - 3, NULL); + " LIMIT " TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT_STR, + 2, NULL); /* Used in #postgres_mark_deposit_tiny() */ PREPARE ("mark_deposit_tiny", @@ -2336,7 +2336,6 @@ postgres_iterate_matching_deposits (void *cls, struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_auto_from_type (merchant_pub), GNUNET_PQ_query_param_auto_from_type (h_wire), - GNUNET_PQ_query_param_uint32 (&limit), GNUNET_PQ_query_param_end }; PGresult *result; @@ -2344,8 +2343,8 @@ postgres_iterate_matching_deposits (void *cls, unsigned int n; result = GNUNET_PQ_exec_prepared (session->conn, - "deposits_iterate_matching", - params); + "deposits_iterate_matching", + params); if (PGRES_TUPLES_OK != PQresultStatus (result)) { @@ -2366,28 +2365,25 @@ postgres_iterate_matching_deposits (void *cls, struct TALER_Amount deposit_fee; struct GNUNET_TIME_Absolute wire_deadline; struct GNUNET_HashCode h_contract; - struct TALER_MerchantPublicKeyP merchant_pub; struct TALER_CoinSpendPublicKeyP coin_pub; uint64_t transaction_id; uint64_t serial_id; int ret; struct GNUNET_PQ_ResultSpec rs[] = { GNUNET_PQ_result_spec_uint64 ("serial_id", - &serial_id), + &serial_id), GNUNET_PQ_result_spec_uint64 ("transaction_id", - &transaction_id), + &transaction_id), TALER_PQ_result_spec_amount ("amount_with_fee", &amount_with_fee), TALER_PQ_result_spec_amount ("deposit_fee", &deposit_fee), GNUNET_PQ_result_spec_absolute_time ("wire_deadline", - &wire_deadline), + &wire_deadline), GNUNET_PQ_result_spec_auto_from_type ("h_contract", - &h_contract), - GNUNET_PQ_result_spec_auto_from_type ("merchant_pub", - &merchant_pub), + &h_contract), GNUNET_PQ_result_spec_auto_from_type ("coin_pub", - &coin_pub), + &coin_pub), GNUNET_PQ_result_spec_end }; if (GNUNET_OK != @@ -2399,7 +2395,7 @@ postgres_iterate_matching_deposits (void *cls, } ret = deposit_cb (deposit_cb_cls, serial_id, - &merchant_pub, + merchant_pub, &coin_pub, &amount_with_fee, &deposit_fee, diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index f33bb18c3..8d06c0072 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -20,6 +20,7 @@ */ #include "platform.h" #include "taler_exchangedb_lib.h" +#include "taler_json_lib.h" #include "taler_exchangedb_plugin.h" static int result; @@ -547,6 +548,70 @@ cb_wtid_check (void *cls, /** + * Function called with details about deposits that + * have been made. Called in the test on the + * deposit given in @a cls. + * + * @param cls closure a `struct TALER_EXCHANGEDB_Deposit *` + * @param rowid unique ID for the deposit in our DB, used for marking + * it as 'tiny' or 'done' + * @param merchant_pub public key of the merchant + * @param coin_pub public key of the coin + * @param amount_with_fee amount that was deposited including fee + * @param deposit_fee amount the exchange gets to keep as transaction fees + * @param transaction_id unique transaction ID chosen by the merchant + * @param h_contract hash of the contract between merchant and customer + * @param wire_deadline by which the merchant adviced that he would like the + * wire transfer to be executed + * @param wire wire details for the merchant, NULL from iterate_matching_deposits() + * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR if deposit does + * not match our expectations + */ +static int +deposit_cb (void *cls, + unsigned long long rowid, + const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_CoinSpendPublicKeyP *coin_pub, + const struct TALER_Amount *amount_with_fee, + const struct TALER_Amount *deposit_fee, + uint64_t transaction_id, + const struct GNUNET_HashCode *h_contract, + struct GNUNET_TIME_Absolute wire_deadline, + const json_t *wire) +{ + struct TALER_EXCHANGEDB_Deposit *deposit = cls; + struct GNUNET_HashCode h_wire; + + if (NULL != wire) + TALER_JSON_hash (wire, &h_wire); + if ( (0 != memcmp (merchant_pub, + &deposit->merchant_pub, + sizeof (struct TALER_MerchantPublicKeyP))) || + (0 != TALER_amount_cmp (amount_with_fee, + &deposit->amount_with_fee)) || + (0 != TALER_amount_cmp (deposit_fee, + &deposit->deposit_fee)) || + (0 != memcmp (h_contract, + &deposit->h_contract, + sizeof (struct GNUNET_HashCode))) || + (0 != memcmp (coin_pub, + &deposit->coin.coin_pub, + sizeof (struct TALER_CoinSpendPublicKeyP))) || + (transaction_id != deposit->transaction_id) || + ( (NULL != wire) && + (0 != memcmp (&h_wire, + &deposit->h_wire, + sizeof (struct GNUNET_HashCode))) ) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** * Main function that will be run by the scheduler. * * @param cls closure with config @@ -739,14 +804,16 @@ run (void *cls, RND_BLK (&deposit.csig); RND_BLK (&deposit.merchant_pub); RND_BLK (&deposit.h_contract); - RND_BLK (&deposit.h_wire); wire = json_loads (json_wire_str, 0, NULL); + TALER_JSON_hash (wire, + &deposit.h_wire); deposit.wire = wire; deposit.transaction_id = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); deposit.amount_with_fee = value; GNUNET_assert (GNUNET_OK == TALER_amount_get_zero (CURRENCY, &deposit.deposit_fee)); + result = 8; FAILIF (GNUNET_OK != plugin->insert_deposit (plugin->cls, session, &deposit)); @@ -754,6 +821,15 @@ run (void *cls, plugin->have_deposit (plugin->cls, session, &deposit)); + result = 9; + FAILIF (1 != + plugin->iterate_matching_deposits (plugin->cls, + session, + &deposit.h_wire, + &deposit.merchant_pub, + &deposit_cb, &deposit, + 2)); + result = 10; deposit2 = deposit; deposit2.transaction_id++; /* should fail if transaction id is different */ FAILIF (GNUNET_NO != @@ -880,6 +956,9 @@ main (int argc, GNUNET_break (0); return -1; } + GNUNET_log_setup (argv[0], + "WARNING", + NULL); plugin_name++; (void) GNUNET_asprintf (&testname, "test-exchange-db-%s", plugin_name); diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index cb1dcb344..3646981cd 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -947,6 +947,17 @@ struct TALER_EXCHANGEDB_Plugin void *deposit_cb_cls); +/** + * Maximum number of results we return from iterate_matching_deposits(). + * + * Limit on the number of transactions we aggregate at once. Note + * that the limit must be big enough to ensure that when transactions + * of the smallest possible unit are aggregated, they do surpass the + * "tiny" threshold beyond which we never trigger a wire transaction! + */ +#define TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT 10000 +#define TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT_STR "10000" + /** * Obtain information about other pending deposits for the same * destination. Those deposits must not already be "done". @@ -957,7 +968,9 @@ struct TALER_EXCHANGEDB_Plugin * @param merchant_pub public key of the merchant * @param deposit_cb function to call for each deposit * @param deposit_cb_cls closure for @a deposit_cb - * @param limit maximum number of matching deposits to return + * @param limit maximum number of matching deposits to return; should + * be #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT, larger values + * are not supported, smaller values would be inefficient. * @return number of rows processed, 0 if none exist, * #GNUNET_SYSERR on error */ diff --git a/src/include/taler_json_lib.h b/src/include/taler_json_lib.h index 3dd661a91..79589dba7 100644 --- a/src/include/taler_json_lib.h +++ b/src/include/taler_json_lib.h @@ -89,7 +89,7 @@ TALER_JSON_spec_denomination_signature (const char *field, * @return #GNUNET_OK on success, #GNUNET_SYSERR on error */ int -TALER_JSON_hash (json_t *json, +TALER_JSON_hash (const json_t *json, struct GNUNET_HashCode *hc); #endif /* TALER_JSON_LIB_H_ */ diff --git a/src/json/json.c b/src/json/json.c index 1fbc59a73..da9e8b9f8 100644 --- a/src/json/json.c +++ b/src/json/json.c @@ -32,7 +32,7 @@ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error */ int -TALER_JSON_hash (json_t *json, +TALER_JSON_hash (const json_t *json, struct GNUNET_HashCode *hc) { char *wire_enc; diff --git a/src/wire/plugin_wire_test.c b/src/wire/plugin_wire_test.c index 357449af6..9d0667680 100644 --- a/src/wire/plugin_wire_test.c +++ b/src/wire/plugin_wire_test.c @@ -178,7 +178,7 @@ context_task (void *cls, rs, ws, &context_task, - cls); + tc); GNUNET_NETWORK_fdset_destroy (rs); GNUNET_NETWORK_fdset_destroy (ws); } |