diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-08-24 22:49:35 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-08-24 22:49:35 +0200 |
commit | 71ae493c7a7601b418de4f5c83159d46abf677b8 (patch) | |
tree | 30f4438e383326d36178ed62124ca2dac1e483fd /src/auditor/taler-wire-auditor.c | |
parent | b5dd2bcdbb25cd6af1897b652c05d782a64dac5f (diff) |
refactor wire auditor to properly handle multiple accounts
Diffstat (limited to 'src/auditor/taler-wire-auditor.c')
-rw-r--r-- | src/auditor/taler-wire-auditor.c | 735 |
1 files changed, 434 insertions, 301 deletions
diff --git a/src/auditor/taler-wire-auditor.c b/src/auditor/taler-wire-auditor.c index 3b84a6412..16104a64b 100644 --- a/src/auditor/taler-wire-auditor.c +++ b/src/auditor/taler-wire-auditor.c @@ -67,6 +67,31 @@ struct WireAccount char *section_name; /** + * Active wire request for the transaction history. + */ + struct TALER_WIRE_HistoryHandle *hh; + + /** + * Progress point for this account. + */ + struct TALER_AUDITORDB_WireAccountProgressPoint pp; + + /** + * Where we are in the inbound (CREDIT) transaction history. + */ + void *in_wire_off; + + /** + * Where we are in the inbound (DEBIT) transaction history. + */ + void *out_wire_off; + + /** + * Number of bytes in #in_wire_off and #out_wire_off. + */ + size_t wire_off_size; + + /** * We should check for inbound transactions to this account. */ int watch_credit; @@ -148,22 +173,6 @@ static struct WireAccount *wa_head; static struct WireAccount *wa_tail; /** - * Handle to the wire plugin for wire operations. - */ -static struct TALER_WIRE_Plugin *wp; - -/** - * Name of the section that configures the account - * we are currently processing (matches #wp). - */ -static char *wp_section_name; - -/** - * Active wire request for the transaction history. - */ -static struct TALER_WIRE_HistoryHandle *hh; - -/** * Query status for the incremental processing status in the auditordb. */ static enum GNUNET_DB_QueryStatus qsx; @@ -174,21 +183,6 @@ static enum GNUNET_DB_QueryStatus qsx; static struct TALER_AUDITORDB_WireProgressPoint pp; /** - * Where we are in the inbound (CREDIT) transaction history. - */ -static void *in_wire_off; - -/** - * Where we are in the inbound (DEBIT) transaction history. - */ -static void *out_wire_off; - -/** - * Number of bytes in #in_wire_off and #out_wire_off. - */ -static size_t wire_off_size; - -/** * Array of reports about row inconsitencies in wire_out table. */ static json_t *report_wire_out_inconsistencies; @@ -226,6 +220,11 @@ static json_t *report_row_minor_inconsistencies; static json_t *report_lags; /** + * Amount that is considered "tiny" + */ +static struct TALER_Amount tiny_amount; + +/** * Total amount that was transferred too much from the exchange. */ static struct TALER_Amount total_bad_amount_out_plus; @@ -434,12 +433,6 @@ do_shutdown (void *cls) report_lags = NULL; report_wire_format_inconsistencies = NULL; } - if (NULL != hh) - { - wp->get_history_cancel (wp->cls, - hh); - hh = NULL; - } if (NULL != in_map) { GNUNET_CONTAINER_multihashmap_iterate (in_map, @@ -456,23 +449,23 @@ do_shutdown (void *cls) GNUNET_CONTAINER_multihashmap_destroy (out_map); out_map = NULL; } - if (NULL != wp) - { - TALER_WIRE_plugin_unload (wp); - wp = NULL; - } - if (NULL != wp_section_name) - { - GNUNET_free (wp_section_name); - wp_section_name = NULL; - } while (NULL != (wa = wa_head)) { + if (NULL != wa->hh) + { + struct TALER_WIRE_Plugin *wp = wa->wire_plugin; + + wp->get_history_cancel (wp->cls, + wa->hh); + wa->hh = NULL; + } GNUNET_CONTAINER_DLL_remove (wa_head, wa_tail, wa); TALER_WIRE_plugin_unload (wa->wire_plugin); GNUNET_free (wa->section_name); + GNUNET_free_non_null (wa->in_wire_off); + GNUNET_free_non_null (wa->out_wire_off); GNUNET_free (wa); } if (NULL != adb) @@ -534,25 +527,46 @@ commit (enum GNUNET_DB_QueryStatus qs) esession); return qs; } + for (struct WireAccount *wa = wa_head; + NULL != wa; + wa = wa->next) + { + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx) + qs = adb->update_wire_auditor_account_progress (adb->cls, + asession, + &master_pub, + wa->section_name, + &wa->pp, + wa->in_wire_off, + wa->out_wire_off, + wa->wire_off_size); + else + qs = adb->insert_wire_auditor_account_progress (adb->cls, + asession, + &master_pub, + wa->section_name, + &wa->pp, + wa->in_wire_off, + wa->out_wire_off, + wa->wire_off_size); + if (0 >= qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Failed to update auditor DB, not recording progress\n"); + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; + } + } if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx) qs = adb->update_wire_auditor_progress (adb->cls, asession, &master_pub, - wp_section_name, - &pp, - in_wire_off, - out_wire_off, - wire_off_size); + &pp); else qs = adb->insert_wire_auditor_progress (adb->cls, asession, &master_pub, - wp_section_name, - &pp, - in_wire_off, - out_wire_off, - wire_off_size); - + &pp); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -561,9 +575,8 @@ commit (enum GNUNET_DB_QueryStatus qs) return qs; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Concluded audit step at %llu/%llu\n"), - (unsigned long long) pp.last_reserve_in_serial_id, - (unsigned long long) pp.last_wire_out_serial_id); + "Concluded audit step at %s\n", + GNUNET_STRINGS_absolute_time_to_string (pp.last_timestamp)); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) { @@ -602,16 +615,111 @@ commit (enum GNUNET_DB_QueryStatus qs) } +/* ***************************** Analyze required transfers ************************ */ + +/** + * Function called on deposits that are past their due date + * and have not yet seen a wire transfer. + * + * @param cls closure + * @param rowid deposit table row of the coin's deposit + * @param coin_pub public key of the coin + * @param amount value of the deposit, including fee + * @param wire where should the funds be wired + * @param deadline what was the requested wire transfer deadline + * @param tiny did the exchange defer this transfer because it is too small? + * @param done did the exchange claim that it made a transfer? + */ +static void +wire_missing_cb (void *cls, + uint64_t rowid, + const struct TALER_CoinSpendPublicKeyP *coin_pub, + const struct TALER_Amount *amount, + const json_t *wire, + struct GNUNET_TIME_Absolute deadline, + /* bool? */ int tiny, + /* bool? */ int done) +{ + GNUNET_break (GNUNET_OK == + TALER_amount_add (&total_amount_lag, + &total_amount_lag, + amount)); + if ( (GNUNET_YES == tiny) && + (0 > TALER_amount_cmp (amount, + &tiny_amount)) ) + return; /* acceptable, amount was tiny */ + report (report_lags, + json_pack ("{s:I, s:o, s:s, s:s, s:o, s:O}", + "row", (json_int_t) rowid, + "amount", TALER_JSON_from_amount (amount), + "deadline", GNUNET_STRINGS_absolute_time_to_string (deadline), + "claimed_done", (done) ? "yes" : "no", + "coin_pub", GNUNET_JSON_from_data_auto (coin_pub), + "account", wire)); + +} + + +/** + * Checks that all wire transfers that should have happened + * (based on deposits) have indeed happened. + * + * FIXME: this check _might_ rather belong with the + * taler-auditor logic. + */ +static void +check_for_required_transfers () +{ + struct GNUNET_TIME_Absolute next_timestamp; + enum GNUNET_DB_QueryStatus qs; + + next_timestamp = GNUNET_TIME_absolute_get (); + /* Subtract #GRACE_PERIOD, so we can be a bit behind in processing + without immediately raising undue concern */ + next_timestamp = GNUNET_TIME_absolute_subtract (next_timestamp, + GRACE_PERIOD); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Analyzing exchange's unfinished deposits\n"); + qs = edb->select_deposits_missing_wire (edb->cls, + esession, + pp.last_timestamp, + next_timestamp, + &wire_missing_cb, + &next_timestamp); + if (0 > qs) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } + pp.last_timestamp = next_timestamp; + /* conclude with success */ + commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT); +} + + /* ***************************** Analyze reserves_out ************************ */ +/** + * Clean up after processing wire out data. + */ +static void +conclude_wire_out () +{ + GNUNET_CONTAINER_multihashmap_destroy (out_map); + out_map = NULL; + check_for_required_transfers (); +} + /** * Function called with details about outgoing wire transfers * as claimed by the exchange DB. * - * @param cls NULL + * @param cls a `struct WireAccount` * @param rowid unique serial ID for the refresh session in our DB - * @param date timestamp of the wire transfer (roughly) + * @param date timestamp of the transfer (roughly) * @param wtid wire transfer subject * @param wire wire transfer details of the receiver * @param amount amount that was wired @@ -625,6 +733,7 @@ wire_out_cb (void *cls, const json_t *wire, const struct TALER_Amount *amount) { + struct WireAccount *wa = cls; struct GNUNET_HashCode key; struct ReserveOutInfo *roi; @@ -757,6 +866,7 @@ cleanup: free_roi (NULL, &key, roi)); + wa->pp.last_wire_out_serial_id = rowid + 1; return GNUNET_OK; } @@ -765,7 +875,7 @@ cleanup: * Complain that we failed to match an entry from #out_map. This * means a wire transfer was made without proper justification. * - * @param cls NULL + * @param cls a `struct WireAccount` * @param key unused key * @param value the `struct ReserveOutInfo` to report * @return #GNUNET_OK @@ -775,8 +885,10 @@ complain_out_not_found (void *cls, const struct GNUNET_HashCode *key, void *value) { + struct WireAccount *wa = cls; struct ReserveOutInfo *roi = value; + (void) wa; // FIXME: log which account is affected... report (report_wire_out_inconsistencies, json_pack ("{s:I, s:o, s:o, s:o, s:s, s:s}", "row", (json_int_t) 0, @@ -796,84 +908,36 @@ complain_out_not_found (void *cls, /** - * Function called on deposits that are past their due date - * and have not yet seen a wire transfer. - * - * @param cls closure - * @param rowid deposit table row of the coin's deposit - * @param coin_pub public key of the coin - * @param amount value of the deposit, including fee - * @param wire where should the funds be wired - * @param deadline what was the requested wire transfer deadline - * @param tiny did the exchange defer this transfer because it is too small? - * @param done did the exchange claim that it made a transfer? - */ -static void -wire_missing_cb (void *cls, - uint64_t rowid, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount, - const json_t *wire, - struct GNUNET_TIME_Absolute deadline, - /* bool? */ int tiny, - /* bool? */ int done) -{ - GNUNET_break (GNUNET_OK == - TALER_amount_add (&total_amount_lag, - &total_amount_lag, - amount)); - if (GNUNET_YES == tiny) - { - struct TALER_Amount rounded; - - rounded = *amount; - GNUNET_break (GNUNET_SYSERR != - wp->amount_round (wp->cls, - &rounded)); - if (0 == TALER_amount_cmp (&rounded, - &zero)) - return; /* acceptable, amount was tiny */ - } - report (report_lags, - json_pack ("{s:I, s:o, s:s, s:s, s:o, s:O}", - "row", (json_int_t) rowid, - "amount", TALER_JSON_from_amount (amount), - "deadline", GNUNET_STRINGS_absolute_time_to_string (deadline), - "claimed_done", (done) ? "yes" : "no", - "coin_pub", GNUNET_JSON_from_data_auto (coin_pub), - "account", wire)); - -} - - -/** - * Start processing the next wire account. - * Shuts down if we are done. + * Main function for processing 'reserves_out' data. We start by going over + * the DEBIT transactions this time, and then verify that all of them are + * justified by 'reserves_out'. * - * @param cls NULL + * @param cls `struct WireAccount` with a wire account list to process */ static void -process_next_account (void *cls); +process_debits (void *cls); /** * Go over the "wire_out" table of the exchange and * verify that all wire outs are in that table. + * + * @param wa wire account we are processing */ static void -check_exchange_wire_out () +check_exchange_wire_out (struct WireAccount *wa) { enum GNUNET_DB_QueryStatus qs; - struct GNUNET_TIME_Absolute next_timestamp; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Analyzing exchange's wire OUT table\n"); + "Analyzing exchange's wire OUT table for account `%s'\n", + wa->section_name); qs = edb->select_wire_out_above_serial_id_by_account (edb->cls, esession, - wp_section_name, - pp.last_wire_out_serial_id, + wa->section_name, + wa->pp.last_wire_out_serial_id, &wire_out_cb, - NULL); + wa); if (0 > qs) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); @@ -882,41 +946,13 @@ check_exchange_wire_out () return; } GNUNET_CONTAINER_multihashmap_iterate (out_map, - &complain_out_not_found, - NULL); + &complain_out_not_found, + wa); /* clean up */ GNUNET_CONTAINER_multihashmap_iterate (out_map, - &free_roi, - NULL); - GNUNET_CONTAINER_multihashmap_destroy (out_map); - out_map = NULL; - - /* now check that all wire transfers that should have happened, - have indeed happened */ - next_timestamp = GNUNET_TIME_absolute_get (); - /* Subtract #GRACE_PERIOD, so we can be a bit behind in processing - without immediately raising undue concern */ - next_timestamp = GNUNET_TIME_absolute_subtract (next_timestamp, - GRACE_PERIOD); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Analyzing exchange's unfinished deposits\n"); - qs = edb->select_deposits_missing_wire (edb->cls, - esession, - pp.last_timestamp, - next_timestamp, - &wire_missing_cb, - &next_timestamp); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - global_ret = 1; - GNUNET_SCHEDULER_shutdown (); - return; - } - pp.last_timestamp = next_timestamp; - - /* continue with next account: */ - process_next_account (NULL); + &free_roi, + NULL); + process_debits (wa->next); } @@ -925,7 +961,7 @@ check_exchange_wire_out () * are credited to the exchange's account (incoming * transactions). * - * @param cls closure + * @param cls `struct WireAccount` with current wire account to process * @param ec error code in case something went wrong * @param dir direction of the transfer * @param row_off identification of the position at which we are querying @@ -941,10 +977,10 @@ history_debit_cb (void *cls, size_t row_off_size, const struct TALER_WIRE_TransferDetails *details) { + struct WireAccount *wa = cls; struct ReserveOutInfo *roi; struct GNUNET_HashCode rowh; - if (TALER_BANK_DIRECTION_NONE == dir) { if (TALER_EC_NONE != ec) @@ -954,10 +990,8 @@ history_debit_cb (void *cls, "Error fetching history: %u!\n", (unsigned int) ec); } - /* end of iteration, now check wire_out to see - if it matches #out_map */ - hh = NULL; - check_exchange_wire_out (); + wa->hh = NULL; + check_exchange_wire_out (wa); return GNUNET_OK; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -987,19 +1021,38 @@ history_debit_cb (void *cls, GNUNET_free (diagnostic); return GNUNET_OK; } + + /* Update offset */ + if (NULL == wa->out_wire_off) + { + wa->wire_off_size = row_off_size; + wa->out_wire_off = GNUNET_malloc (row_off_size); + } + if (wa->wire_off_size != row_off_size) + { + GNUNET_break (0); + commit (GNUNET_DB_STATUS_HARD_ERROR); + wa->hh = NULL; + GNUNET_SCHEDULER_shutdown (); + return GNUNET_SYSERR; + } + memcpy (wa->out_wire_off, + row_off, + row_off_size); + roi = GNUNET_new (struct ReserveOutInfo); GNUNET_CRYPTO_hash (&details->wtid, - sizeof (details->wtid), - &roi->subject_hash); + sizeof (details->wtid), + &roi->subject_hash); roi->details.amount = details->amount; roi->details.execution_date = details->execution_date; roi->details.wtid = details->wtid; roi->details.account_url = GNUNET_strdup (details->account_url); if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_put (out_map, - &roi->subject_hash, - roi, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) + &roi->subject_hash, + roi, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) { char *diagnostic; @@ -1026,31 +1079,43 @@ history_debit_cb (void *cls, /** - * Main function for processing 'reserves_out' data. - * We start by going over the DEBIT transactions this - * time, and then verify that all of them are justified - * by 'reserves_out'. + * Main function for processing 'reserves_out' data. We start by going over + * the DEBIT transactions this time, and then verify that all of them are + * justified by 'reserves_out'. + * + * @param cls `struct WireAccount` with a wire account list to process */ static void -process_debits () +process_debits (void *cls) { + struct WireAccount *wa = cls; + struct TALER_WIRE_Plugin *wp; + + if (NULL == wa) + { + /* end of iteration, now check wire_out to see + if it matches #out_map */ + conclude_wire_out (); + return; + } GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Checking bank DEBIT records\n"); - GNUNET_assert (NULL == hh); - out_map = GNUNET_CONTAINER_multihashmap_create (1024, - GNUNET_YES); - hh = wp->get_history (wp->cls, - wp_section_name, - TALER_BANK_DIRECTION_DEBIT, - out_wire_off, - wire_off_size, - INT64_MAX, - &history_debit_cb, - NULL); - if (NULL == hh) + "Checking bank DEBIT records of account `%s'\n", + wa->section_name); + GNUNET_assert (NULL == wa->hh); + wp = wa->wire_plugin; + wa->hh = wp->get_history (wp->cls, + wa->section_name, + TALER_BANK_DIRECTION_DEBIT, + wa->out_wire_off, + wa->wire_off_size, + INT64_MAX, + &history_debit_cb, + wa); + if (NULL == wa->hh) { fprintf (stderr, - "Failed to obtain bank transaction history\n"); + "Failed to obtain bank transaction history for `%s'\n", + wa->section_name); commit (GNUNET_DB_STATUS_HARD_ERROR); global_ret = 1; GNUNET_SCHEDULER_shutdown (); @@ -1059,14 +1124,40 @@ process_debits () } +/** + * Begin analyzing wire_out. + */ +static void +begin_debit_audit () +{ + out_map = GNUNET_CONTAINER_multihashmap_create (1024, + GNUNET_YES); + process_debits (wa_head); +} + + /* ***************************** Analyze reserves_in ************************ */ +/** + * Conclude the credit history check by logging entries that + * were not found and freeing resources. Then move on to + * processing debits. + */ +static void +conclude_credit_history () +{ + GNUNET_CONTAINER_multihashmap_destroy (in_map); + in_map = NULL; + /* credit done, now check debits */ + begin_debit_audit (); +} + /** * Function called with details about incoming wire transfers * as claimed by the exchange DB. * - * @param cls NULL + * @param cls a `struct WireAccount` we are processing * @param rowid unique serial ID for the refresh session in our DB * @param reserve_pub public key of the reserve (also the WTID) * @param credit amount that was received @@ -1086,6 +1177,7 @@ reserve_in_cb (void *cls, size_t wire_reference_size, struct GNUNET_TIME_Absolute execution_date) { + struct WireAccount *wa = cls; struct ReserveInInfo *rii; GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1125,7 +1217,7 @@ reserve_in_cb (void *cls, GNUNET_free (rii); return GNUNET_OK; } - pp.last_reserve_in_serial_id = rowid + 1; + wa->pp.last_reserve_in_serial_id = rowid + 1; return GNUNET_OK; } @@ -1133,7 +1225,7 @@ reserve_in_cb (void *cls, /** * Complain that we failed to match an entry from #in_map. * - * @param cls NULL + * @param cls a `struct WireAccount` * @param key unused key * @param value the `struct ReserveInInfo` to free * @return #GNUNET_OK @@ -1143,8 +1235,10 @@ complain_in_not_found (void *cls, const struct GNUNET_HashCode *key, void *value) { + struct WireAccount *wa = cls; struct ReserveInInfo *rii = value; + (void) wa; // FIXME: log which account is affected... report (report_reserve_in_inconsistencies, json_pack ("{s:I, s:o, s:o, s:o, s:s, s:s}", "row", (json_int_t) rii->rowid, @@ -1162,24 +1256,13 @@ complain_in_not_found (void *cls, /** - * Conclude the credit history check by logging entries that - * were not found and freeing resources. Then move on to - * processing debits. + * Start processing the next wire account. + * Shuts down if we are done. + * + * @param cls `struct WireAccount` with a wire account list to process */ static void -conclude_credit_history () -{ - GNUNET_CONTAINER_multihashmap_iterate (in_map, - &complain_in_not_found, - NULL); - /* clean up before 2nd phase */ - GNUNET_CONTAINER_multihashmap_iterate (in_map, - &free_rii, - NULL); - GNUNET_CONTAINER_multihashmap_destroy (in_map); - in_map = NULL; - process_debits (); -} +process_credits (void *cls); /** @@ -1187,7 +1270,7 @@ conclude_credit_history () * are credited to the exchange's account (incoming * transactions). * - * @param cls closure + * @param cls `struct WireAccount` we are processing * @param ec error code in case something went wrong * @param dir direction of the transfer * @param row_off identification of the position at which we are querying @@ -1203,6 +1286,7 @@ history_credit_cb (void *cls, size_t row_off_size, const struct TALER_WIRE_TransferDetails *details) { + struct WireAccount *wa = cls; struct ReserveInInfo *rii; struct GNUNET_HashCode key; @@ -1216,8 +1300,18 @@ history_credit_cb (void *cls, (unsigned int) ec); } /* end of operation */ - hh = NULL; - conclude_credit_history (); + wa->hh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Reconciling CREDIT processing of account `%s'\n", + wa->section_name); + GNUNET_CONTAINER_multihashmap_iterate (in_map, + &complain_in_not_found, + wa); + /* clean up before 2nd phase */ + GNUNET_CONTAINER_multihashmap_iterate (in_map, + &free_rii, + NULL); + process_credits (wa->next); return GNUNET_OK; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1235,29 +1329,29 @@ history_credit_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Failed to find wire transfer at `%s' in exchange database. Audit ends at this point in time.\n", GNUNET_STRINGS_absolute_time_to_string (details->execution_date)); - hh = NULL; - conclude_credit_history (); + wa->hh = NULL; + process_credits (wa->next); return GNUNET_SYSERR; /* not an error, just end of processing */ } /* Update offset */ - if (NULL == in_wire_off) + if (NULL == wa->in_wire_off) { - wire_off_size = row_off_size; - in_wire_off = GNUNET_malloc (row_off_size); + wa->wire_off_size = row_off_size; + wa->in_wire_off = GNUNET_malloc (row_off_size); } - if (wire_off_size != row_off_size) + if (wa->wire_off_size != row_off_size) { GNUNET_break (0); commit (GNUNET_DB_STATUS_HARD_ERROR); GNUNET_SCHEDULER_shutdown (); - hh = NULL; return GNUNET_SYSERR; } - memcpy (in_wire_off, + memcpy (wa->in_wire_off, row_off, row_off_size); + /* compare records with expected data */ if (row_off_size != rii->row_off_size) { @@ -1384,42 +1478,83 @@ history_credit_cb (void *cls, * Start processing the next wire account. * Shuts down if we are done. * - * @param cls NULL + * @param cls `struct WireAccount` with a wire account list to process */ static void -process_next_account (void *cls) +process_credits (void *cls) { - struct WireAccount *wa; + struct WireAccount *wa = cls; + struct TALER_WIRE_Plugin *wp; enum GNUNET_DB_QueryStatus qs; - int ret; - - (void) cls; - // FIXME: this logic is broken at a high level, - // as it iterates over the exchange's incoming - // transactions once PER bank account, so for - // multiple bank accounts this cannot work! - if (NULL == (wa = wa_head)) + + if (NULL == wa) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Finished with all accounts, shutting down\n"); - if (NULL != wp) - commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT); + /* done with all accounts, conclude check */ + conclude_credit_history (); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Analyzing exchange's wire IN table for account `%s'\n", + wa->section_name); + qs = edb->select_reserves_in_above_serial_id_by_account (edb->cls, + esession, + wa->section_name, + wa->pp.last_reserve_in_serial_id, + &reserve_in_cb, + wa); + if (0 > qs) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + global_ret = 1; GNUNET_SCHEDULER_shutdown (); return; } - GNUNET_CONTAINER_DLL_remove (wa_head, - wa_tail, - wa); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Starting audit of account `%s'\n", + "Starting bank CREDIT history of account `%s'\n", wa->section_name); - /* setup globals */ - if (NULL != wp) - TALER_WIRE_plugin_unload (wp); wp = wa->wire_plugin; - GNUNET_free_non_null (wp_section_name); - wp_section_name = wa->section_name; - GNUNET_free (wa); + wa->hh = wp->get_history (wp->cls, + wa->section_name, + TALER_BANK_DIRECTION_CREDIT, + wa->in_wire_off, + wa->wire_off_size, + INT64_MAX, + &history_credit_cb, + wa); + if (NULL == wa->hh) + { + fprintf (stderr, + "Failed to obtain bank transaction history\n"); + commit (GNUNET_DB_STATUS_HARD_ERROR); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } +} + + +/** + * Begin audit of CREDITs to the exchange. + */ +static void +begin_credit_audit () +{ + in_map = GNUNET_CONTAINER_multihashmap_create (1024, + GNUNET_YES); + /* now go over all bank accounts and check delta with in_map */ + process_credits (wa_head); +} + + +/** + * Start the database transactions and begin the audit. + */ +static void +begin_transaction () +{ + enum GNUNET_DB_QueryStatus qsx; + int ret; ret = adb->start (adb->cls, asession); @@ -1442,14 +1577,30 @@ process_next_account (void *cls) GNUNET_SCHEDULER_shutdown (); return; } + for (struct WireAccount *wa = wa_head; + NULL != wa; + wa = wa->next) + { + qsx = adb->get_wire_auditor_account_progress (adb->cls, + asession, + &master_pub, + wa->section_name, + &wa->pp, + &wa->in_wire_off, + &wa->out_wire_off, + &wa->wire_off_size); + if (0 > qsx) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx); + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); + return; + } + } qsx = adb->get_wire_auditor_progress (adb->cls, asession, &master_pub, - wp_section_name, - &pp, - &in_wire_off, - &out_wire_off, - &wire_off_size); + &pp); if (0 > qsx) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx); @@ -1465,54 +1616,10 @@ process_next_account (void *cls) else { GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Resuming audit at %llu/%llu\n"), - (unsigned long long) pp.last_reserve_in_serial_id, - (unsigned long long) pp.last_wire_out_serial_id); - } - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Analyzing exchange's wire IN table\n"); - in_map = GNUNET_CONTAINER_multihashmap_create (1024, - GNUNET_YES); - qs = edb->select_reserves_in_above_serial_id_by_account (edb->cls, - esession, - wp_section_name, - pp.last_reserve_in_serial_id, - &reserve_in_cb, - NULL); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - global_ret = 1; - GNUNET_SCHEDULER_shutdown (); - return; - } - if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, - "No new incoming transactions available, skipping CREDIT phase\n"); - process_debits (); - return; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Checking bank CREDIT records\n"); - hh = wp->get_history (wp->cls, - wp_section_name, - TALER_BANK_DIRECTION_CREDIT, - in_wire_off, - wire_off_size, - INT64_MAX, - &history_credit_cb, - NULL); - if (NULL == hh) - { - fprintf (stderr, - "Failed to obtain bank transaction history\n"); - commit (GNUNET_DB_STATUS_HARD_ERROR); - global_ret = 1; - GNUNET_SCHEDULER_shutdown (); - return; + "Resuming audit at %s\n", + GNUNET_STRINGS_absolute_time_to_string (pp.last_timestamp)); } + begin_credit_audit (); } @@ -1568,10 +1675,36 @@ run (void *cls, const struct GNUNET_CONFIGURATION_Handle *c) { static const struct TALER_MasterPublicKeyP zeromp; + char *tinys; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Launching auditor\n"); cfg = c; + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_string (cfg, + "auditor", + "TINY_AMOUNT", + &tinys)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "auditor", + "TINY_AMOUNT"); + global_ret = 1; + return; + } + if (GNUNET_OK != + TALER_string_to_amount (tinys, + &tiny_amount)) + { + GNUNET_free (tinys); + GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR, + "auditor", + "TINY_AMOUNT", + "invalid amount"); + global_ret = 1; + return; + } + GNUNET_free (tinys); if (0 == GNUNET_memcmp (&zeromp, &master_pub)) { @@ -1716,7 +1849,7 @@ run (void *cls, TALER_EXCHANGEDB_find_accounts (cfg, &process_account_cb, NULL); - process_next_account (NULL); + begin_transaction (); } @@ -1755,12 +1888,12 @@ main (int argc, NULL)); if (GNUNET_OK != GNUNET_PROGRAM_run (argc, - argv, + argv, "taler-wire-auditor", - "Audit exchange database for consistency with the bank's wire transfers", - options, - &run, - NULL)) + "Audit exchange database for consistency with the bank's wire transfers", + options, + &run, + NULL)) return 1; return global_ret; } |