diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-09-30 21:28:17 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-09-30 21:28:17 +0200 |
commit | 85a2d3dc0e6938ca751bd72cf556173645e09e78 (patch) | |
tree | a608b5920d66b6728323b91ae9cb5d842c41accb | |
parent | 6a4f6b183639b543a1406ebdb8a1111aee42fc72 (diff) |
modify wire auditor to deal with asynchrony of WIRE plugin API
-rw-r--r-- | src/auditor/taler-wire-auditor.c | 363 |
1 files changed, 191 insertions, 172 deletions
diff --git a/src/auditor/taler-wire-auditor.c b/src/auditor/taler-wire-auditor.c index 19a48bc60..c1323ea9d 100644 --- a/src/auditor/taler-wire-auditor.c +++ b/src/auditor/taler-wire-auditor.c @@ -88,10 +88,67 @@ static struct TALER_MasterPublicKeyP master_pub; static struct TALER_WIRE_Plugin *wp; /** + * Active wire request for the transaction history. + */ +static struct TALER_WIRE_HistoryHandle *hh; + +/** + * Query status for the incremental processing status in the auditordb. + */ +static enum GNUNET_DB_QueryStatus qsx; + +/** * Last reserve_in / reserve_out serial IDs seen. */ static struct TALER_AUDITORDB_WireProgressPoint pp; +/** + * Where we are in the inbound (CREDIT) transaction history. + */ +static void *in_wire_off; + +/** + * Where we are in the inbound (DEBIT) transaction history. + */ +static void *out_wire_off; + +/** + * Number of bytes in #in_wire_off and #out_wire_off. + */ +static size_t wire_off_size; + + +/* ***************************** Shutdown **************************** */ + +/** + * Task run on shutdown. + */ +static void +do_shutdown () +{ + if (NULL != hh) + { + wp->get_history_cancel (wp->cls, + hh); + hh = NULL; + } + if (NULL != wp) + { + TALER_WIRE_plugin_unload (wp); + wp = NULL; + } + if (NULL != adb) + { + TALER_AUDITORDB_plugin_unload (adb); + adb = NULL; + } + if (NULL != edb) + { + TALER_EXCHANGEDB_plugin_unload (edb); + edb = NULL; + } +} + /* ***************************** Report logic **************************** */ @@ -140,107 +197,18 @@ report_row_minor_inconsistency (const char *table, #endif -/* ***************************** Analyze reserves_in ************************ */ -/* This logic checks the reserves_in table */ - -/** - * Analyze reserves for being well-formed. - * - * @param cls NULL - * @return transaction status code - */ -static enum GNUNET_DB_QueryStatus -analyze_reserves_in (void *cls) -{ - /* FIXME: #4958 */ - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -} - - -/* ***************************** Analyze reserves_out ************************ */ -/* This logic checks the reserves_out table */ - -/** - * Analyze reserves for being well-formed. - * - * @param cls NULL - * @return transaction status code - */ -static enum GNUNET_DB_QueryStatus -analyze_reserves_out (void *cls) -{ -#if 0 - // FIXME: start_off != rowid! - hh = wp->get_history (wp->cls, - TALER_BANK_DIRECTION_CREDIT, - &start_off, - sizeof (start_off), - INT64_MAX, - &history_cb, - NULL); -#endif - /* FIXME: #4958 */ - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -} - - /* *************************** General transaction logic ****************** */ /** - * Type of an analysis function. Each analysis function runs in - * its own transaction scope and must thus be internally consistent. - * - * @param cls closure - * @return transaction status code - */ -typedef enum GNUNET_DB_QueryStatus -(*Analysis)(void *cls); - - -/** - * Perform the given @a analysis incrementally, checkpointing our - * progress in the auditor DB. + * Commit the transaction, checkpointing our progress in the auditor + * DB. * - * @param analysis analysis to run - * @param analysis_cls closure for @a analysis + * @param qs transaction status so far * @return transaction status code */ static enum GNUNET_DB_QueryStatus -incremental_processing (Analysis analysis, - void *analysis_cls) +commit (enum GNUNET_DB_QueryStatus qs) { - enum GNUNET_DB_QueryStatus qs; - enum GNUNET_DB_QueryStatus qsx; - void *in_wire_off; - void *out_wire_off; - size_t wire_off_size; - - qsx = adb->get_wire_auditor_progress (adb->cls, - asession, - &master_pub, - &pp, - &in_wire_off, - &out_wire_off, - &wire_off_size); - if (0 > qsx) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx); - return qsx; - } - if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsx) - { - GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, - _("First analysis using this auditor, starting audit from scratch\n")); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Resuming audit at %llu/%llu\n"), - (unsigned long long) pp.last_reserve_in_serial_id, - (unsigned long long) pp.last_reserve_out_serial_id); - } - qs = analysis (analysis_cls); - // FIXME: wire plugin does NOT support synchronous activity! if (0 > qs) { if (GNUNET_DB_STATUS_SOFT_ERROR == qs) @@ -248,7 +216,11 @@ incremental_processing (Analysis analysis, "Serialization issue, not recording progress\n"); else GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Hard database error, not recording progress\n"); + "Hard error, not recording progress\n"); + adb->rollback (adb->cls, + asession); + edb->rollback (edb->cls, + esession); return qs; } if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx) @@ -279,43 +251,7 @@ incremental_processing (Analysis analysis, _("Concluded audit step at %llu/%llu\n"), (unsigned long long) pp.last_reserve_in_serial_id, (unsigned long long) pp.last_reserve_out_serial_id); - return qs; -} - - -/** - * Perform the given @a analysis within a transaction scope. - * Commit on success. - * - * @param analysis analysis to run - * @param analysis_cls closure for @a analysis - * @return #GNUNET_OK if @a analysis succeessfully committed, - * #GNUNET_NO if we had an error on commit (retry may help) - * #GNUNET_SYSERR on hard errors - */ -static int -transact (Analysis analysis, - void *analysis_cls) -{ - int ret; - enum GNUNET_DB_QueryStatus qs; - ret = adb->start (adb->cls, - asession); - if (GNUNET_OK != ret) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - ret = edb->start (edb->cls, - esession); - if (GNUNET_OK != ret) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - qs = incremental_processing (analysis, - analysis_cls); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) { qs = edb->commit (edb->cls, @@ -353,46 +289,45 @@ transact (Analysis analysis, } +/* ***************************** Analyze reserves_in ************************ */ + + /** - * Initialize DB sessions and run the analysis. + * Callbacks of this type are used to serve the result of asking + * the bank for the transaction history. + * + * @param cls closure + * @param dir direction of the transfer + * @param row_off identification of the position at which we are querying + * @param row_off_size number of bytes in @a row_off + * @param details details about the wire transfer + * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration */ -static void -setup_sessions_and_run () +static int +history_credit_cb (void *cls, + enum TALER_BANK_Direction dir, + const void *row_off, + size_t row_off_size, + const struct TALER_WIRE_TransferDetails *details) { - esession = edb->get_session (edb->cls); - if (NULL == esession) + if (NULL == details) { - fprintf (stderr, - "Failed to initialize exchange session.\n"); - global_ret = 1; - return; - } - asession = adb->get_session (adb->cls); - if (NULL == asession) - { - fprintf (stderr, - "Failed to initialize auditor session.\n"); - global_ret = 1; - return; - } - wp = TALER_WIRE_plugin_load (cfg, - wire_plugin); - if (NULL == wp) - { - fprintf (stderr, - "Failed to load wire plugin `%s'\n", - wire_plugin); - global_ret = 1; - return; + /* end of operation */ + hh = NULL; + /* TODO: also check DEBITs! */ + commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT); + GNUNET_SCHEDULER_shutdown (); + return GNUNET_SYSERR; } - // FIXME: wire plugin does NOT support synchronous activity! - transact (&analyze_reserves_in, - NULL); - transact (&analyze_reserves_out, - NULL); + /* TODO: implement actual checks! */ + return GNUNET_OK; } + +/* ***************************** Setup logic ************************ */ + + /** * Main function that will be run. * @@ -407,6 +342,8 @@ run (void *cls, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *c) { + int ret; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Launching auditor\n"); cfg = c; @@ -458,18 +395,100 @@ run (void *cls, GNUNET_break (GNUNET_OK == adb->create_tables (adb->cls)); } - + GNUNET_SCHEDULER_add_shutdown (&do_shutdown, + NULL); + esession = edb->get_session (edb->cls); + if (NULL == esession) + { + fprintf (stderr, + "Failed to initialize exchange session.\n"); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } + asession = adb->get_session (adb->cls); + if (NULL == asession) + { + fprintf (stderr, + "Failed to initialize auditor session.\n"); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } + wp = TALER_WIRE_plugin_load (cfg, + wire_plugin); + if (NULL == wp) + { + fprintf (stderr, + "Failed to load wire plugin `%s'\n", + wire_plugin); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting audit\n"); - setup_sessions_and_run (); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Audit complete\n"); - if (NULL != wp) - TALER_WIRE_plugin_unload (wp); - if (NULL != adb) - TALER_AUDITORDB_plugin_unload (adb); - if (NULL != edb) - TALER_EXCHANGEDB_plugin_unload (edb); + ret = adb->start (adb->cls, + asession); + if (GNUNET_OK != ret) + { + GNUNET_break (0); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } + ret = edb->start (edb->cls, + esession); + if (GNUNET_OK != ret) + { + GNUNET_break (0); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } + qsx = adb->get_wire_auditor_progress (adb->cls, + asession, + &master_pub, + &pp, + &in_wire_off, + &out_wire_off, + &wire_off_size); + if (0 > qsx) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsx) + { + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + _("First analysis using this auditor, starting audit from scratch\n")); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Resuming audit at %llu/%llu\n"), + (unsigned long long) pp.last_reserve_in_serial_id, + (unsigned long long) pp.last_reserve_out_serial_id); + } + + hh = wp->get_history (wp->cls, + TALER_BANK_DIRECTION_CREDIT, + in_wire_off, + wire_off_size, + INT64_MAX, + &history_credit_cb, + NULL); + if (NULL == hh) + { + fprintf (stderr, + "Failed to obtain bank transaction history\n"); + commit (GNUNET_DB_STATUS_HARD_ERROR); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } } |