diff options
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 279 |
1 files changed, 158 insertions, 121 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 96922eff2..3726d32fa 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -18,9 +18,6 @@ * @file taler-exchange-aggregator.c * @brief Process that aggregates outgoing transactions and executes them * @author Christian Grothoff - * - * TODO: - * - handle shutdown more nicely (call 'cancel' method on wire transfers) */ #include "platform.h" #include <gnunet/gnunet_util_lib.h> @@ -31,6 +28,102 @@ #include "taler_json_lib.h" #include "taler_wire_lib.h" + +/** + * Data we keep to #run_transfers(). There is at most + * one of these around at any given point in time. + */ +struct WirePrepareData +{ + + /** + * Database session for all of our transactions. + */ + struct TALER_EXCHANGEDB_Session *session; + + /** + * Wire execution handle. + */ + struct TALER_WIRE_ExecuteHandle *eh; + + /** + * Row ID of the transfer. + */ + unsigned long long row_id; + +}; + + +/** + * Information about one aggregation process to be executed. There is + * at most one of these around at any given point in time. + */ +struct AggregationUnit +{ + /** + * Public key of the merchant. + */ + struct TALER_MerchantPublicKeyP merchant_pub; + + /** + * Total amount to be transferred. + */ + struct TALER_Amount total_amount; + + /** + * Hash of @e wire. + */ + struct GNUNET_HashCode h_wire; + + /** + * Wire transfer identifier we use. + */ + struct TALER_WireTransferIdentifierRawP wtid; + + /** + * Row ID of the transaction that started it all. + */ + unsigned long long row_id; + + /** + * The current time. + */ + struct GNUNET_TIME_Absolute execution_time; + + /** + * Wire details of the merchant. + */ + json_t *wire; + + /** + * Database session for all of our transactions. + */ + struct TALER_EXCHANGEDB_Session *session; + + /** + * Wire preparation handle. + */ + struct TALER_WIRE_PrepareHandle *ph; + + /** + * Array of #aggregation_limit row_ids from the + * aggregation. + */ + unsigned long long *additional_rows; + + /** + * Offset specifying how many #additional_rows are in use. + */ + unsigned int rows_offset; + + /** + * Set to #GNUNET_YES if we have to abort due to failure. + */ + int failed; + +}; + + /** * Which currency is used by this exchange? */ @@ -62,11 +155,23 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; static struct TALER_WIRE_Plugin *wire_plugin; /** - * Task for the main #run() function. + * Next task to run, if any. */ static struct GNUNET_SCHEDULER_Task *task; /** + * If we are currently executing a transfer, information about + * the active transfer is here. Otherwise, this variable is NULL. + */ +static struct WirePrepareData *wpd; + +/** + * If we are currently aggregating transactions, information about the + * active aggregation is here. Otherwise, this variable is NULL. + */ +static struct AggregationUnit *au; + +/** * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR * on serious errors. */ @@ -104,7 +209,36 @@ shutdown_task (void *cls, GNUNET_SCHEDULER_cancel (task); task = NULL; } - /* FIXME: other shutdown stuff here! */ + if (NULL != wpd) + { + if (NULL != wpd->eh) + { + wire_plugin->execute_wire_transfer_cancel (wire_plugin->cls, + wpd->eh); + wpd->eh = NULL; + } + db_plugin->rollback (db_plugin->cls, + wpd->session); + GNUNET_free (wpd); + wpd = NULL; + } + if (NULL != au) + { + if (NULL != au->ph) + { + wire_plugin->prepare_wire_transfer_cancel (wire_plugin->cls, + au->ph); + au->ph = NULL; + } + db_plugin->rollback (db_plugin->cls, + au->session); + GNUNET_free_non_null (au->additional_rows); + if (NULL != au->wire) + json_decref (au->wire); + au = NULL; + GNUNET_free (au); + + } } @@ -184,80 +318,10 @@ exchange_serve_process_config (const char *exchange_directory) /** - * Information about one aggregation process to - * be executed. - */ -struct AggregationUnit -{ - /** - * Public key of the merchant. - */ - struct TALER_MerchantPublicKeyP merchant_pub; - - /** - * Total amount to be transferred. - */ - struct TALER_Amount total_amount; - - /** - * Hash of @e wire. - */ - struct GNUNET_HashCode h_wire; - - /** - * Wire transfer identifier we use. - */ - struct TALER_WireTransferIdentifierRawP wtid; - - /** - * Row ID of the transaction that started it all. - */ - unsigned long long row_id; - - /** - * The current time. - */ - struct GNUNET_TIME_Absolute execution_time; - - /** - * Wire details of the merchant. - */ - json_t *wire; - - /** - * Database session for all of our transactions. - */ - struct TALER_EXCHANGEDB_Session *session; - - /** - * Wire preparation handle. - */ - struct TALER_WIRE_PrepareHandle *ph; - - /** - * Array of #aggregation_limit row_ids from the - * aggregation. - */ - unsigned long long *additional_rows; - - /** - * Offset specifying how many #additional_rows are in use. - */ - unsigned int rows_offset; - - /** - * Set to #GNUNET_YES if we have to abort due to failure. - */ - int failed; - -}; - - -/** * Function called with details about deposits that have been made, * with the goal of executing the corresponding wire transaction. * - * @param cls closure with the `struct AggregationUnit` + * @param cls NULL * @param row_id identifies database entry * @param merchant_pub public key of the merchant * @param coin_pub public key of the coin @@ -282,8 +346,6 @@ deposit_cb (void *cls, struct GNUNET_TIME_Absolute wire_deadline, const json_t *wire) { - struct AggregationUnit *au = cls; - au->merchant_pub = *merchant_pub; if (GNUNET_OK != TALER_amount_subtract (&au->total_amount, @@ -337,7 +399,7 @@ deposit_cb (void *cls, * Function called with details about another deposit we * can aggregate into an existing aggregation unit. * - * @param cls closure with the `struct AggregationUnit` + * @param cls NULL * @param row_id identifies database entry * @param merchant_pub public key of the merchant * @param coin_pub public key of the coin @@ -362,7 +424,6 @@ aggregate_cb (void *cls, struct GNUNET_TIME_Absolute wire_deadline, const json_t *wire) { - struct AggregationUnit *au = cls; struct TALER_Amount delta; GNUNET_break (0 == @@ -459,7 +520,6 @@ run_aggregation (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct TALER_EXCHANGEDB_Session *session; - struct AggregationUnit *au; unsigned int i; int ret; @@ -493,6 +553,7 @@ run_aggregation (void *cls, if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; db_plugin->rollback (db_plugin->cls, session); if (0 != ret) @@ -533,6 +594,7 @@ run_aggregation (void *cls, if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; db_plugin->rollback (db_plugin->cls, session); global_ret = GNUNET_SYSERR; @@ -563,6 +625,7 @@ run_aggregation (void *cls, if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; return; } /* Mark transactions by row_id as minor */ @@ -591,6 +654,7 @@ run_aggregation (void *cls, if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); @@ -602,9 +666,6 @@ run_aggregation (void *cls, &au->wtid, &prepare_cb, au); - /* FIXME: currently we have no clean-up plan on - shutdown to call prepare_wire_transfer_cancel! - Maybe make 'au' global? */ if (NULL == au->ph) { GNUNET_break (0); /* why? how to best recover? */ @@ -613,6 +674,7 @@ run_aggregation (void *cls, GNUNET_free_non_null (au->additional_rows); if (NULL != au->wire) json_decref (au->wire); + au = NULL; GNUNET_free (au); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, @@ -638,7 +700,7 @@ run_transfers (void *cls, /** * Function to be called with the prepared transfer data. * - * @param cls closure with the `struct AggregationUnit` + * @param cls NULL * @param buf transaction data to persist, NULL on error * @param buf_size number of bytes in @a buf, 0 on error */ @@ -647,13 +709,13 @@ prepare_cb (void *cls, const char *buf, size_t buf_size) { - struct AggregationUnit *au = cls; struct TALER_EXCHANGEDB_Session *session = au->session; GNUNET_free_non_null (au->additional_rows); if (NULL != au->wire) json_decref (au->wire); GNUNET_free (au); + au = NULL; if (NULL == buf) { GNUNET_break (0); /* why? how to best recover? */ @@ -703,33 +765,9 @@ prepare_cb (void *cls, /** - * Data we keep to #run_transfers(). - */ -struct WirePrepareData -{ - - /** - * Database session for all of our transactions. - */ - struct TALER_EXCHANGEDB_Session *session; - - /** - * Wire execution handle. - */ - struct TALER_WIRE_ExecuteHandle *eh; - - /** - * Row ID of the transfer. - */ - unsigned long long row_id; - -}; - - -/** * Function called with the result from the execute step. * - * @param cls closure with the `struct WirePrepareData` + * @param cls NULL * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure * @param emsg NULL on success, otherwise an error message */ @@ -738,7 +776,6 @@ wire_confirm_cb (void *cls, int success, const char *emsg) { - struct WirePrepareData *wpd = cls; struct TALER_EXCHANGEDB_Session *session = wpd->session; wpd->eh = NULL; @@ -751,6 +788,7 @@ wire_confirm_cb (void *cls, session); global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } if (GNUNET_OK != @@ -763,9 +801,11 @@ wire_confirm_cb (void *cls, session); global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } GNUNET_free (wpd); + wpd = NULL; if (GNUNET_OK != db_plugin->commit (db_plugin->cls, session)) @@ -788,7 +828,7 @@ wire_confirm_cb (void *cls, /** * Callback with data about a prepared transaction. * - * @param cls closure with the `struct WirePrepareData` + * @param cls NULL * @param rowid row identifier used to mark prepared transaction as done * @param buf transaction data that was persisted, NULL on error * @param buf_size number of bytes in @a buf, 0 on error @@ -799,17 +839,12 @@ wire_prepare_cb (void *cls, const char *buf, size_t buf_size) { - struct WirePrepareData *wpd = cls; - wpd->row_id = rowid; wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, buf, buf_size, &wire_confirm_cb, - wpd); - /* FIXME: currently we have no clean-up plan on - shutdown to call execute_wire_transfer_cancel! - Maybe make 'wpd' global? */ + NULL); if (NULL == wpd->eh) { GNUNET_break (0); /* why? how to best recover? */ @@ -817,6 +852,7 @@ wire_prepare_cb (void *cls, wpd->session); global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } } @@ -834,7 +870,6 @@ run_transfers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { int ret; - struct WirePrepareData *wpd; struct TALER_EXCHANGEDB_Session *session; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) @@ -862,7 +897,7 @@ run_transfers (void *cls, session, exchange_wireformat, &wire_prepare_cb, - wpd); + NULL); if (GNUNET_SYSERR == ret) { GNUNET_break (0); /* why? how to best recover? */ @@ -870,6 +905,7 @@ run_transfers (void *cls, session); global_ret = GNUNET_SYSERR; GNUNET_free (wpd); + wpd = NULL; return; } if (GNUNET_NO == ret) @@ -880,6 +916,7 @@ run_transfers (void *cls, task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); GNUNET_free (wpd); + wpd = NULL; return; } /* otherwise, continues in #wire_prepare_cb() */ |