aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/exchange/taler-exchange-aggregator.c279
1 files changed, 158 insertions, 121 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 96922eff2..3726d32fa 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -18,9 +18,6 @@
* @file taler-exchange-aggregator.c
* @brief Process that aggregates outgoing transactions and executes them
* @author Christian Grothoff
- *
- * TODO:
- * - handle shutdown more nicely (call 'cancel' method on wire transfers)
*/
#include "platform.h"
#include <gnunet/gnunet_util_lib.h>
@@ -31,6 +28,102 @@
#include "taler_json_lib.h"
#include "taler_wire_lib.h"
+
+/**
+ * Data we keep to #run_transfers(). There is at most
+ * one of these around at any given point in time.
+ */
+struct WirePrepareData
+{
+
+ /**
+ * Database session for all of our transactions.
+ */
+ struct TALER_EXCHANGEDB_Session *session;
+
+ /**
+ * Wire execution handle.
+ */
+ struct TALER_WIRE_ExecuteHandle *eh;
+
+ /**
+ * Row ID of the transfer.
+ */
+ unsigned long long row_id;
+
+};
+
+
+/**
+ * Information about one aggregation process to be executed. There is
+ * at most one of these around at any given point in time.
+ */
+struct AggregationUnit
+{
+ /**
+ * Public key of the merchant.
+ */
+ struct TALER_MerchantPublicKeyP merchant_pub;
+
+ /**
+ * Total amount to be transferred.
+ */
+ struct TALER_Amount total_amount;
+
+ /**
+ * Hash of @e wire.
+ */
+ struct GNUNET_HashCode h_wire;
+
+ /**
+ * Wire transfer identifier we use.
+ */
+ struct TALER_WireTransferIdentifierRawP wtid;
+
+ /**
+ * Row ID of the transaction that started it all.
+ */
+ unsigned long long row_id;
+
+ /**
+ * The current time.
+ */
+ struct GNUNET_TIME_Absolute execution_time;
+
+ /**
+ * Wire details of the merchant.
+ */
+ json_t *wire;
+
+ /**
+ * Database session for all of our transactions.
+ */
+ struct TALER_EXCHANGEDB_Session *session;
+
+ /**
+ * Wire preparation handle.
+ */
+ struct TALER_WIRE_PrepareHandle *ph;
+
+ /**
+ * Array of #aggregation_limit row_ids from the
+ * aggregation.
+ */
+ unsigned long long *additional_rows;
+
+ /**
+ * Offset specifying how many #additional_rows are in use.
+ */
+ unsigned int rows_offset;
+
+ /**
+ * Set to #GNUNET_YES if we have to abort due to failure.
+ */
+ int failed;
+
+};
+
+
/**
* Which currency is used by this exchange?
*/
@@ -62,11 +155,23 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
static struct TALER_WIRE_Plugin *wire_plugin;
/**
- * Task for the main #run() function.
+ * Next task to run, if any.
*/
static struct GNUNET_SCHEDULER_Task *task;
/**
+ * If we are currently executing a transfer, information about
+ * the active transfer is here. Otherwise, this variable is NULL.
+ */
+static struct WirePrepareData *wpd;
+
+/**
+ * If we are currently aggregating transactions, information about the
+ * active aggregation is here. Otherwise, this variable is NULL.
+ */
+static struct AggregationUnit *au;
+
+/**
* Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
* on serious errors.
*/
@@ -104,7 +209,36 @@ shutdown_task (void *cls,
GNUNET_SCHEDULER_cancel (task);
task = NULL;
}
- /* FIXME: other shutdown stuff here! */
+ if (NULL != wpd)
+ {
+ if (NULL != wpd->eh)
+ {
+ wire_plugin->execute_wire_transfer_cancel (wire_plugin->cls,
+ wpd->eh);
+ wpd->eh = NULL;
+ }
+ db_plugin->rollback (db_plugin->cls,
+ wpd->session);
+ GNUNET_free (wpd);
+ wpd = NULL;
+ }
+ if (NULL != au)
+ {
+ if (NULL != au->ph)
+ {
+ wire_plugin->prepare_wire_transfer_cancel (wire_plugin->cls,
+ au->ph);
+ au->ph = NULL;
+ }
+ db_plugin->rollback (db_plugin->cls,
+ au->session);
+ GNUNET_free_non_null (au->additional_rows);
+ if (NULL != au->wire)
+ json_decref (au->wire);
+ au = NULL;
+ GNUNET_free (au);
+
+ }
}
@@ -184,80 +318,10 @@ exchange_serve_process_config (const char *exchange_directory)
/**
- * Information about one aggregation process to
- * be executed.
- */
-struct AggregationUnit
-{
- /**
- * Public key of the merchant.
- */
- struct TALER_MerchantPublicKeyP merchant_pub;
-
- /**
- * Total amount to be transferred.
- */
- struct TALER_Amount total_amount;
-
- /**
- * Hash of @e wire.
- */
- struct GNUNET_HashCode h_wire;
-
- /**
- * Wire transfer identifier we use.
- */
- struct TALER_WireTransferIdentifierRawP wtid;
-
- /**
- * Row ID of the transaction that started it all.
- */
- unsigned long long row_id;
-
- /**
- * The current time.
- */
- struct GNUNET_TIME_Absolute execution_time;
-
- /**
- * Wire details of the merchant.
- */
- json_t *wire;
-
- /**
- * Database session for all of our transactions.
- */
- struct TALER_EXCHANGEDB_Session *session;
-
- /**
- * Wire preparation handle.
- */
- struct TALER_WIRE_PrepareHandle *ph;
-
- /**
- * Array of #aggregation_limit row_ids from the
- * aggregation.
- */
- unsigned long long *additional_rows;
-
- /**
- * Offset specifying how many #additional_rows are in use.
- */
- unsigned int rows_offset;
-
- /**
- * Set to #GNUNET_YES if we have to abort due to failure.
- */
- int failed;
-
-};
-
-
-/**
* Function called with details about deposits that have been made,
* with the goal of executing the corresponding wire transaction.
*
- * @param cls closure with the `struct AggregationUnit`
+ * @param cls NULL
* @param row_id identifies database entry
* @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin
@@ -282,8 +346,6 @@ deposit_cb (void *cls,
struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire)
{
- struct AggregationUnit *au = cls;
-
au->merchant_pub = *merchant_pub;
if (GNUNET_OK !=
TALER_amount_subtract (&au->total_amount,
@@ -337,7 +399,7 @@ deposit_cb (void *cls,
* Function called with details about another deposit we
* can aggregate into an existing aggregation unit.
*
- * @param cls closure with the `struct AggregationUnit`
+ * @param cls NULL
* @param row_id identifies database entry
* @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin
@@ -362,7 +424,6 @@ aggregate_cb (void *cls,
struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire)
{
- struct AggregationUnit *au = cls;
struct TALER_Amount delta;
GNUNET_break (0 ==
@@ -459,7 +520,6 @@ run_aggregation (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct TALER_EXCHANGEDB_Session *session;
- struct AggregationUnit *au;
unsigned int i;
int ret;
@@ -493,6 +553,7 @@ run_aggregation (void *cls,
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
+ au = NULL;
db_plugin->rollback (db_plugin->cls,
session);
if (0 != ret)
@@ -533,6 +594,7 @@ run_aggregation (void *cls,
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
+ au = NULL;
db_plugin->rollback (db_plugin->cls,
session);
global_ret = GNUNET_SYSERR;
@@ -563,6 +625,7 @@ run_aggregation (void *cls,
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
+ au = NULL;
return;
}
/* Mark transactions by row_id as minor */
@@ -591,6 +654,7 @@ run_aggregation (void *cls,
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
+ au = NULL;
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
@@ -602,9 +666,6 @@ run_aggregation (void *cls,
&au->wtid,
&prepare_cb,
au);
- /* FIXME: currently we have no clean-up plan on
- shutdown to call prepare_wire_transfer_cancel!
- Maybe make 'au' global? */
if (NULL == au->ph)
{
GNUNET_break (0); /* why? how to best recover? */
@@ -613,6 +674,7 @@ run_aggregation (void *cls,
GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire)
json_decref (au->wire);
+ au = NULL;
GNUNET_free (au);
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
@@ -638,7 +700,7 @@ run_transfers (void *cls,
/**
* Function to be called with the prepared transfer data.
*
- * @param cls closure with the `struct AggregationUnit`
+ * @param cls NULL
* @param buf transaction data to persist, NULL on error
* @param buf_size number of bytes in @a buf, 0 on error
*/
@@ -647,13 +709,13 @@ prepare_cb (void *cls,
const char *buf,
size_t buf_size)
{
- struct AggregationUnit *au = cls;
struct TALER_EXCHANGEDB_Session *session = au->session;
GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
+ au = NULL;
if (NULL == buf)
{
GNUNET_break (0); /* why? how to best recover? */
@@ -703,33 +765,9 @@ prepare_cb (void *cls,
/**
- * Data we keep to #run_transfers().
- */
-struct WirePrepareData
-{
-
- /**
- * Database session for all of our transactions.
- */
- struct TALER_EXCHANGEDB_Session *session;
-
- /**
- * Wire execution handle.
- */
- struct TALER_WIRE_ExecuteHandle *eh;
-
- /**
- * Row ID of the transfer.
- */
- unsigned long long row_id;
-
-};
-
-
-/**
* Function called with the result from the execute step.
*
- * @param cls closure with the `struct WirePrepareData`
+ * @param cls NULL
* @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure
* @param emsg NULL on success, otherwise an error message
*/
@@ -738,7 +776,6 @@ wire_confirm_cb (void *cls,
int success,
const char *emsg)
{
- struct WirePrepareData *wpd = cls;
struct TALER_EXCHANGEDB_Session *session = wpd->session;
wpd->eh = NULL;
@@ -751,6 +788,7 @@ wire_confirm_cb (void *cls,
session);
global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
+ wpd = NULL;
return;
}
if (GNUNET_OK !=
@@ -763,9 +801,11 @@ wire_confirm_cb (void *cls,
session);
global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
+ wpd = NULL;
return;
}
GNUNET_free (wpd);
+ wpd = NULL;
if (GNUNET_OK !=
db_plugin->commit (db_plugin->cls,
session))
@@ -788,7 +828,7 @@ wire_confirm_cb (void *cls,
/**
* Callback with data about a prepared transaction.
*
- * @param cls closure with the `struct WirePrepareData`
+ * @param cls NULL
* @param rowid row identifier used to mark prepared transaction as done
* @param buf transaction data that was persisted, NULL on error
* @param buf_size number of bytes in @a buf, 0 on error
@@ -799,17 +839,12 @@ wire_prepare_cb (void *cls,
const char *buf,
size_t buf_size)
{
- struct WirePrepareData *wpd = cls;
-
wpd->row_id = rowid;
wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls,
buf,
buf_size,
&wire_confirm_cb,
- wpd);
- /* FIXME: currently we have no clean-up plan on
- shutdown to call execute_wire_transfer_cancel!
- Maybe make 'wpd' global? */
+ NULL);
if (NULL == wpd->eh)
{
GNUNET_break (0); /* why? how to best recover? */
@@ -817,6 +852,7 @@ wire_prepare_cb (void *cls,
wpd->session);
global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
+ wpd = NULL;
return;
}
}
@@ -834,7 +870,6 @@ run_transfers (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
int ret;
- struct WirePrepareData *wpd;
struct TALER_EXCHANGEDB_Session *session;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
@@ -862,7 +897,7 @@ run_transfers (void *cls,
session,
exchange_wireformat,
&wire_prepare_cb,
- wpd);
+ NULL);
if (GNUNET_SYSERR == ret)
{
GNUNET_break (0); /* why? how to best recover? */
@@ -870,6 +905,7 @@ run_transfers (void *cls,
session);
global_ret = GNUNET_SYSERR;
GNUNET_free (wpd);
+ wpd = NULL;
return;
}
if (GNUNET_NO == ret)
@@ -880,6 +916,7 @@ run_transfers (void *cls,
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
GNUNET_free (wpd);
+ wpd = NULL;
return;
}
/* otherwise, continues in #wire_prepare_cb() */