aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-09-30 21:28:17 +0200
committerChristian Grothoff <christian@grothoff.org>2017-09-30 21:28:17 +0200
commit85a2d3dc0e6938ca751bd72cf556173645e09e78 (patch)
treea608b5920d66b6728323b91ae9cb5d842c41accb
parent6a4f6b183639b543a1406ebdb8a1111aee42fc72 (diff)
modify wire auditor to deal with asynchrony of WIRE plugin API
-rw-r--r--src/auditor/taler-wire-auditor.c363
1 files changed, 191 insertions, 172 deletions
diff --git a/src/auditor/taler-wire-auditor.c b/src/auditor/taler-wire-auditor.c
index 19a48bc60..c1323ea9d 100644
--- a/src/auditor/taler-wire-auditor.c
+++ b/src/auditor/taler-wire-auditor.c
@@ -88,10 +88,67 @@ static struct TALER_MasterPublicKeyP master_pub;
static struct TALER_WIRE_Plugin *wp;
/**
+ * Active wire request for the transaction history.
+ */
+static struct TALER_WIRE_HistoryHandle *hh;
+
+/**
+ * Query status for the incremental processing status in the auditordb.
+ */
+static enum GNUNET_DB_QueryStatus qsx;
+
+/**
* Last reserve_in / reserve_out serial IDs seen.
*/
static struct TALER_AUDITORDB_WireProgressPoint pp;
+/**
+ * Where we are in the inbound (CREDIT) transaction history.
+ */
+static void *in_wire_off;
+
+/**
+ * Where we are in the inbound (DEBIT) transaction history.
+ */
+static void *out_wire_off;
+
+/**
+ * Number of bytes in #in_wire_off and #out_wire_off.
+ */
+static size_t wire_off_size;
+
+
+/* ***************************** Shutdown **************************** */
+
+/**
+ * Task run on shutdown.
+ */
+static void
+do_shutdown ()
+{
+ if (NULL != hh)
+ {
+ wp->get_history_cancel (wp->cls,
+ hh);
+ hh = NULL;
+ }
+ if (NULL != wp)
+ {
+ TALER_WIRE_plugin_unload (wp);
+ wp = NULL;
+ }
+ if (NULL != adb)
+ {
+ TALER_AUDITORDB_plugin_unload (adb);
+ adb = NULL;
+ }
+ if (NULL != edb)
+ {
+ TALER_EXCHANGEDB_plugin_unload (edb);
+ edb = NULL;
+ }
+}
+
/* ***************************** Report logic **************************** */
@@ -140,107 +197,18 @@ report_row_minor_inconsistency (const char *table,
#endif
-/* ***************************** Analyze reserves_in ************************ */
-/* This logic checks the reserves_in table */
-
-/**
- * Analyze reserves for being well-formed.
- *
- * @param cls NULL
- * @return transaction status code
- */
-static enum GNUNET_DB_QueryStatus
-analyze_reserves_in (void *cls)
-{
- /* FIXME: #4958 */
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-}
-
-
-/* ***************************** Analyze reserves_out ************************ */
-/* This logic checks the reserves_out table */
-
-/**
- * Analyze reserves for being well-formed.
- *
- * @param cls NULL
- * @return transaction status code
- */
-static enum GNUNET_DB_QueryStatus
-analyze_reserves_out (void *cls)
-{
-#if 0
- // FIXME: start_off != rowid!
- hh = wp->get_history (wp->cls,
- TALER_BANK_DIRECTION_CREDIT,
- &start_off,
- sizeof (start_off),
- INT64_MAX,
- &history_cb,
- NULL);
-#endif
- /* FIXME: #4958 */
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-}
-
-
/* *************************** General transaction logic ****************** */
/**
- * Type of an analysis function. Each analysis function runs in
- * its own transaction scope and must thus be internally consistent.
- *
- * @param cls closure
- * @return transaction status code
- */
-typedef enum GNUNET_DB_QueryStatus
-(*Analysis)(void *cls);
-
-
-/**
- * Perform the given @a analysis incrementally, checkpointing our
- * progress in the auditor DB.
+ * Commit the transaction, checkpointing our progress in the auditor
+ * DB.
*
- * @param analysis analysis to run
- * @param analysis_cls closure for @a analysis
+ * @param qs transaction status so far
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
-incremental_processing (Analysis analysis,
- void *analysis_cls)
+commit (enum GNUNET_DB_QueryStatus qs)
{
- enum GNUNET_DB_QueryStatus qs;
- enum GNUNET_DB_QueryStatus qsx;
- void *in_wire_off;
- void *out_wire_off;
- size_t wire_off_size;
-
- qsx = adb->get_wire_auditor_progress (adb->cls,
- asession,
- &master_pub,
- &pp,
- &in_wire_off,
- &out_wire_off,
- &wire_off_size);
- if (0 > qsx)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx);
- return qsx;
- }
- if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsx)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
- _("First analysis using this auditor, starting audit from scratch\n"));
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _("Resuming audit at %llu/%llu\n"),
- (unsigned long long) pp.last_reserve_in_serial_id,
- (unsigned long long) pp.last_reserve_out_serial_id);
- }
- qs = analysis (analysis_cls);
- // FIXME: wire plugin does NOT support synchronous activity!
if (0 > qs)
{
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@@ -248,7 +216,11 @@ incremental_processing (Analysis analysis,
"Serialization issue, not recording progress\n");
else
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Hard database error, not recording progress\n");
+ "Hard error, not recording progress\n");
+ adb->rollback (adb->cls,
+ asession);
+ edb->rollback (edb->cls,
+ esession);
return qs;
}
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx)
@@ -279,43 +251,7 @@ incremental_processing (Analysis analysis,
_("Concluded audit step at %llu/%llu\n"),
(unsigned long long) pp.last_reserve_in_serial_id,
(unsigned long long) pp.last_reserve_out_serial_id);
- return qs;
-}
-
-
-/**
- * Perform the given @a analysis within a transaction scope.
- * Commit on success.
- *
- * @param analysis analysis to run
- * @param analysis_cls closure for @a analysis
- * @return #GNUNET_OK if @a analysis succeessfully committed,
- * #GNUNET_NO if we had an error on commit (retry may help)
- * #GNUNET_SYSERR on hard errors
- */
-static int
-transact (Analysis analysis,
- void *analysis_cls)
-{
- int ret;
- enum GNUNET_DB_QueryStatus qs;
- ret = adb->start (adb->cls,
- asession);
- if (GNUNET_OK != ret)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- ret = edb->start (edb->cls,
- esession);
- if (GNUNET_OK != ret)
- {
- GNUNET_break (0);
- return GNUNET_SYSERR;
- }
- qs = incremental_processing (analysis,
- analysis_cls);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
{
qs = edb->commit (edb->cls,
@@ -353,46 +289,45 @@ transact (Analysis analysis,
}
+/* ***************************** Analyze reserves_in ************************ */
+
+
/**
- * Initialize DB sessions and run the analysis.
+ * Callbacks of this type are used to serve the result of asking
+ * the bank for the transaction history.
+ *
+ * @param cls closure
+ * @param dir direction of the transfer
+ * @param row_off identification of the position at which we are querying
+ * @param row_off_size number of bytes in @a row_off
+ * @param details details about the wire transfer
+ * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
*/
-static void
-setup_sessions_and_run ()
+static int
+history_credit_cb (void *cls,
+ enum TALER_BANK_Direction dir,
+ const void *row_off,
+ size_t row_off_size,
+ const struct TALER_WIRE_TransferDetails *details)
{
- esession = edb->get_session (edb->cls);
- if (NULL == esession)
+ if (NULL == details)
{
- fprintf (stderr,
- "Failed to initialize exchange session.\n");
- global_ret = 1;
- return;
- }
- asession = adb->get_session (adb->cls);
- if (NULL == asession)
- {
- fprintf (stderr,
- "Failed to initialize auditor session.\n");
- global_ret = 1;
- return;
- }
- wp = TALER_WIRE_plugin_load (cfg,
- wire_plugin);
- if (NULL == wp)
- {
- fprintf (stderr,
- "Failed to load wire plugin `%s'\n",
- wire_plugin);
- global_ret = 1;
- return;
+ /* end of operation */
+ hh = NULL;
+ /* TODO: also check DEBITs! */
+ commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT);
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_SYSERR;
}
- // FIXME: wire plugin does NOT support synchronous activity!
- transact (&analyze_reserves_in,
- NULL);
- transact (&analyze_reserves_out,
- NULL);
+ /* TODO: implement actual checks! */
+ return GNUNET_OK;
}
+
+/* ***************************** Setup logic ************************ */
+
+
/**
* Main function that will be run.
*
@@ -407,6 +342,8 @@ run (void *cls,
const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *c)
{
+ int ret;
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Launching auditor\n");
cfg = c;
@@ -458,18 +395,100 @@ run (void *cls,
GNUNET_break (GNUNET_OK ==
adb->create_tables (adb->cls));
}
-
+ GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
+ NULL);
+ esession = edb->get_session (edb->cls);
+ if (NULL == esession)
+ {
+ fprintf (stderr,
+ "Failed to initialize exchange session.\n");
+ global_ret = 1;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ asession = adb->get_session (adb->cls);
+ if (NULL == asession)
+ {
+ fprintf (stderr,
+ "Failed to initialize auditor session.\n");
+ global_ret = 1;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ wp = TALER_WIRE_plugin_load (cfg,
+ wire_plugin);
+ if (NULL == wp)
+ {
+ fprintf (stderr,
+ "Failed to load wire plugin `%s'\n",
+ wire_plugin);
+ global_ret = 1;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting audit\n");
- setup_sessions_and_run ();
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Audit complete\n");
- if (NULL != wp)
- TALER_WIRE_plugin_unload (wp);
- if (NULL != adb)
- TALER_AUDITORDB_plugin_unload (adb);
- if (NULL != edb)
- TALER_EXCHANGEDB_plugin_unload (edb);
+ ret = adb->start (adb->cls,
+ asession);
+ if (GNUNET_OK != ret)
+ {
+ GNUNET_break (0);
+ global_ret = 1;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ ret = edb->start (edb->cls,
+ esession);
+ if (GNUNET_OK != ret)
+ {
+ GNUNET_break (0);
+ global_ret = 1;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ qsx = adb->get_wire_auditor_progress (adb->cls,
+ asession,
+ &master_pub,
+ &pp,
+ &in_wire_off,
+ &out_wire_off,
+ &wire_off_size);
+ if (0 > qsx)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx);
+ global_ret = 1;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsx)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
+ _("First analysis using this auditor, starting audit from scratch\n"));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Resuming audit at %llu/%llu\n"),
+ (unsigned long long) pp.last_reserve_in_serial_id,
+ (unsigned long long) pp.last_reserve_out_serial_id);
+ }
+
+ hh = wp->get_history (wp->cls,
+ TALER_BANK_DIRECTION_CREDIT,
+ in_wire_off,
+ wire_off_size,
+ INT64_MAX,
+ &history_credit_cb,
+ NULL);
+ if (NULL == hh)
+ {
+ fprintf (stderr,
+ "Failed to obtain bank transaction history\n");
+ commit (GNUNET_DB_STATUS_HARD_ERROR);
+ global_ret = 1;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
}