aboutsummaryrefslogtreecommitdiff
path: root/src/auditor/taler-wire-auditor.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2019-08-24 22:49:35 +0200
committerChristian Grothoff <christian@grothoff.org>2019-08-24 22:49:35 +0200
commit71ae493c7a7601b418de4f5c83159d46abf677b8 (patch)
tree30f4438e383326d36178ed62124ca2dac1e483fd /src/auditor/taler-wire-auditor.c
parentb5dd2bcdbb25cd6af1897b652c05d782a64dac5f (diff)
refactor wire auditor to properly handle multiple accounts
Diffstat (limited to 'src/auditor/taler-wire-auditor.c')
-rw-r--r--src/auditor/taler-wire-auditor.c735
1 files changed, 434 insertions, 301 deletions
diff --git a/src/auditor/taler-wire-auditor.c b/src/auditor/taler-wire-auditor.c
index 3b84a6412..16104a64b 100644
--- a/src/auditor/taler-wire-auditor.c
+++ b/src/auditor/taler-wire-auditor.c
@@ -67,6 +67,31 @@ struct WireAccount
char *section_name;
/**
+ * Active wire request for the transaction history.
+ */
+ struct TALER_WIRE_HistoryHandle *hh;
+
+ /**
+ * Progress point for this account.
+ */
+ struct TALER_AUDITORDB_WireAccountProgressPoint pp;
+
+ /**
+ * Where we are in the inbound (CREDIT) transaction history.
+ */
+ void *in_wire_off;
+
+ /**
+ * Where we are in the inbound (DEBIT) transaction history.
+ */
+ void *out_wire_off;
+
+ /**
+ * Number of bytes in #in_wire_off and #out_wire_off.
+ */
+ size_t wire_off_size;
+
+ /**
* We should check for inbound transactions to this account.
*/
int watch_credit;
@@ -148,22 +173,6 @@ static struct WireAccount *wa_head;
static struct WireAccount *wa_tail;
/**
- * Handle to the wire plugin for wire operations.
- */
-static struct TALER_WIRE_Plugin *wp;
-
-/**
- * Name of the section that configures the account
- * we are currently processing (matches #wp).
- */
-static char *wp_section_name;
-
-/**
- * Active wire request for the transaction history.
- */
-static struct TALER_WIRE_HistoryHandle *hh;
-
-/**
* Query status for the incremental processing status in the auditordb.
*/
static enum GNUNET_DB_QueryStatus qsx;
@@ -174,21 +183,6 @@ static enum GNUNET_DB_QueryStatus qsx;
static struct TALER_AUDITORDB_WireProgressPoint pp;
/**
- * Where we are in the inbound (CREDIT) transaction history.
- */
-static void *in_wire_off;
-
-/**
- * Where we are in the inbound (DEBIT) transaction history.
- */
-static void *out_wire_off;
-
-/**
- * Number of bytes in #in_wire_off and #out_wire_off.
- */
-static size_t wire_off_size;
-
-/**
* Array of reports about row inconsitencies in wire_out table.
*/
static json_t *report_wire_out_inconsistencies;
@@ -226,6 +220,11 @@ static json_t *report_row_minor_inconsistencies;
static json_t *report_lags;
/**
+ * Amount that is considered "tiny"
+ */
+static struct TALER_Amount tiny_amount;
+
+/**
* Total amount that was transferred too much from the exchange.
*/
static struct TALER_Amount total_bad_amount_out_plus;
@@ -434,12 +433,6 @@ do_shutdown (void *cls)
report_lags = NULL;
report_wire_format_inconsistencies = NULL;
}
- if (NULL != hh)
- {
- wp->get_history_cancel (wp->cls,
- hh);
- hh = NULL;
- }
if (NULL != in_map)
{
GNUNET_CONTAINER_multihashmap_iterate (in_map,
@@ -456,23 +449,23 @@ do_shutdown (void *cls)
GNUNET_CONTAINER_multihashmap_destroy (out_map);
out_map = NULL;
}
- if (NULL != wp)
- {
- TALER_WIRE_plugin_unload (wp);
- wp = NULL;
- }
- if (NULL != wp_section_name)
- {
- GNUNET_free (wp_section_name);
- wp_section_name = NULL;
- }
while (NULL != (wa = wa_head))
{
+ if (NULL != wa->hh)
+ {
+ struct TALER_WIRE_Plugin *wp = wa->wire_plugin;
+
+ wp->get_history_cancel (wp->cls,
+ wa->hh);
+ wa->hh = NULL;
+ }
GNUNET_CONTAINER_DLL_remove (wa_head,
wa_tail,
wa);
TALER_WIRE_plugin_unload (wa->wire_plugin);
GNUNET_free (wa->section_name);
+ GNUNET_free_non_null (wa->in_wire_off);
+ GNUNET_free_non_null (wa->out_wire_off);
GNUNET_free (wa);
}
if (NULL != adb)
@@ -534,25 +527,46 @@ commit (enum GNUNET_DB_QueryStatus qs)
esession);
return qs;
}
+ for (struct WireAccount *wa = wa_head;
+ NULL != wa;
+ wa = wa->next)
+ {
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx)
+ qs = adb->update_wire_auditor_account_progress (adb->cls,
+ asession,
+ &master_pub,
+ wa->section_name,
+ &wa->pp,
+ wa->in_wire_off,
+ wa->out_wire_off,
+ wa->wire_off_size);
+ else
+ qs = adb->insert_wire_auditor_account_progress (adb->cls,
+ asession,
+ &master_pub,
+ wa->section_name,
+ &wa->pp,
+ wa->in_wire_off,
+ wa->out_wire_off,
+ wa->wire_off_size);
+ if (0 >= qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Failed to update auditor DB, not recording progress\n");
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
+ }
+ }
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx)
qs = adb->update_wire_auditor_progress (adb->cls,
asession,
&master_pub,
- wp_section_name,
- &pp,
- in_wire_off,
- out_wire_off,
- wire_off_size);
+ &pp);
else
qs = adb->insert_wire_auditor_progress (adb->cls,
asession,
&master_pub,
- wp_section_name,
- &pp,
- in_wire_off,
- out_wire_off,
- wire_off_size);
-
+ &pp);
if (0 >= qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -561,9 +575,8 @@ commit (enum GNUNET_DB_QueryStatus qs)
return qs;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _("Concluded audit step at %llu/%llu\n"),
- (unsigned long long) pp.last_reserve_in_serial_id,
- (unsigned long long) pp.last_wire_out_serial_id);
+ "Concluded audit step at %s\n",
+ GNUNET_STRINGS_absolute_time_to_string (pp.last_timestamp));
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
{
@@ -602,16 +615,111 @@ commit (enum GNUNET_DB_QueryStatus qs)
}
+/* ***************************** Analyze required transfers ************************ */
+
+/**
+ * Function called on deposits that are past their due date
+ * and have not yet seen a wire transfer.
+ *
+ * @param cls closure
+ * @param rowid deposit table row of the coin's deposit
+ * @param coin_pub public key of the coin
+ * @param amount value of the deposit, including fee
+ * @param wire where should the funds be wired
+ * @param deadline what was the requested wire transfer deadline
+ * @param tiny did the exchange defer this transfer because it is too small?
+ * @param done did the exchange claim that it made a transfer?
+ */
+static void
+wire_missing_cb (void *cls,
+ uint64_t rowid,
+ const struct TALER_CoinSpendPublicKeyP *coin_pub,
+ const struct TALER_Amount *amount,
+ const json_t *wire,
+ struct GNUNET_TIME_Absolute deadline,
+ /* bool? */ int tiny,
+ /* bool? */ int done)
+{
+ GNUNET_break (GNUNET_OK ==
+ TALER_amount_add (&total_amount_lag,
+ &total_amount_lag,
+ amount));
+ if ( (GNUNET_YES == tiny) &&
+ (0 > TALER_amount_cmp (amount,
+ &tiny_amount)) )
+ return; /* acceptable, amount was tiny */
+ report (report_lags,
+ json_pack ("{s:I, s:o, s:s, s:s, s:o, s:O}",
+ "row", (json_int_t) rowid,
+ "amount", TALER_JSON_from_amount (amount),
+ "deadline", GNUNET_STRINGS_absolute_time_to_string (deadline),
+ "claimed_done", (done) ? "yes" : "no",
+ "coin_pub", GNUNET_JSON_from_data_auto (coin_pub),
+ "account", wire));
+
+}
+
+
+/**
+ * Checks that all wire transfers that should have happened
+ * (based on deposits) have indeed happened.
+ *
+ * FIXME: this check _might_ rather belong with the
+ * taler-auditor logic.
+ */
+static void
+check_for_required_transfers ()
+{
+ struct GNUNET_TIME_Absolute next_timestamp;
+ enum GNUNET_DB_QueryStatus qs;
+
+ next_timestamp = GNUNET_TIME_absolute_get ();
+ /* Subtract #GRACE_PERIOD, so we can be a bit behind in processing
+ without immediately raising undue concern */
+ next_timestamp = GNUNET_TIME_absolute_subtract (next_timestamp,
+ GRACE_PERIOD);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Analyzing exchange's unfinished deposits\n");
+ qs = edb->select_deposits_missing_wire (edb->cls,
+ esession,
+ pp.last_timestamp,
+ next_timestamp,
+ &wire_missing_cb,
+ &next_timestamp);
+ if (0 > qs)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ global_ret = 1;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ pp.last_timestamp = next_timestamp;
+ /* conclude with success */
+ commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT);
+}
+
+
/* ***************************** Analyze reserves_out ************************ */
+/**
+ * Clean up after processing wire out data.
+ */
+static void
+conclude_wire_out ()
+{
+ GNUNET_CONTAINER_multihashmap_destroy (out_map);
+ out_map = NULL;
+ check_for_required_transfers ();
+}
+
/**
* Function called with details about outgoing wire transfers
* as claimed by the exchange DB.
*
- * @param cls NULL
+ * @param cls a `struct WireAccount`
* @param rowid unique serial ID for the refresh session in our DB
- * @param date timestamp of the wire transfer (roughly)
+ * @param date timestamp of the transfer (roughly)
* @param wtid wire transfer subject
* @param wire wire transfer details of the receiver
* @param amount amount that was wired
@@ -625,6 +733,7 @@ wire_out_cb (void *cls,
const json_t *wire,
const struct TALER_Amount *amount)
{
+ struct WireAccount *wa = cls;
struct GNUNET_HashCode key;
struct ReserveOutInfo *roi;
@@ -757,6 +866,7 @@ cleanup:
free_roi (NULL,
&key,
roi));
+ wa->pp.last_wire_out_serial_id = rowid + 1;
return GNUNET_OK;
}
@@ -765,7 +875,7 @@ cleanup:
* Complain that we failed to match an entry from #out_map. This
* means a wire transfer was made without proper justification.
*
- * @param cls NULL
+ * @param cls a `struct WireAccount`
* @param key unused key
* @param value the `struct ReserveOutInfo` to report
* @return #GNUNET_OK
@@ -775,8 +885,10 @@ complain_out_not_found (void *cls,
const struct GNUNET_HashCode *key,
void *value)
{
+ struct WireAccount *wa = cls;
struct ReserveOutInfo *roi = value;
+ (void) wa; // FIXME: log which account is affected...
report (report_wire_out_inconsistencies,
json_pack ("{s:I, s:o, s:o, s:o, s:s, s:s}",
"row", (json_int_t) 0,
@@ -796,84 +908,36 @@ complain_out_not_found (void *cls,
/**
- * Function called on deposits that are past their due date
- * and have not yet seen a wire transfer.
- *
- * @param cls closure
- * @param rowid deposit table row of the coin's deposit
- * @param coin_pub public key of the coin
- * @param amount value of the deposit, including fee
- * @param wire where should the funds be wired
- * @param deadline what was the requested wire transfer deadline
- * @param tiny did the exchange defer this transfer because it is too small?
- * @param done did the exchange claim that it made a transfer?
- */
-static void
-wire_missing_cb (void *cls,
- uint64_t rowid,
- const struct TALER_CoinSpendPublicKeyP *coin_pub,
- const struct TALER_Amount *amount,
- const json_t *wire,
- struct GNUNET_TIME_Absolute deadline,
- /* bool? */ int tiny,
- /* bool? */ int done)
-{
- GNUNET_break (GNUNET_OK ==
- TALER_amount_add (&total_amount_lag,
- &total_amount_lag,
- amount));
- if (GNUNET_YES == tiny)
- {
- struct TALER_Amount rounded;
-
- rounded = *amount;
- GNUNET_break (GNUNET_SYSERR !=
- wp->amount_round (wp->cls,
- &rounded));
- if (0 == TALER_amount_cmp (&rounded,
- &zero))
- return; /* acceptable, amount was tiny */
- }
- report (report_lags,
- json_pack ("{s:I, s:o, s:s, s:s, s:o, s:O}",
- "row", (json_int_t) rowid,
- "amount", TALER_JSON_from_amount (amount),
- "deadline", GNUNET_STRINGS_absolute_time_to_string (deadline),
- "claimed_done", (done) ? "yes" : "no",
- "coin_pub", GNUNET_JSON_from_data_auto (coin_pub),
- "account", wire));
-
-}
-
-
-/**
- * Start processing the next wire account.
- * Shuts down if we are done.
+ * Main function for processing 'reserves_out' data. We start by going over
+ * the DEBIT transactions this time, and then verify that all of them are
+ * justified by 'reserves_out'.
*
- * @param cls NULL
+ * @param cls `struct WireAccount` with a wire account list to process
*/
static void
-process_next_account (void *cls);
+process_debits (void *cls);
/**
* Go over the "wire_out" table of the exchange and
* verify that all wire outs are in that table.
+ *
+ * @param wa wire account we are processing
*/
static void
-check_exchange_wire_out ()
+check_exchange_wire_out (struct WireAccount *wa)
{
enum GNUNET_DB_QueryStatus qs;
- struct GNUNET_TIME_Absolute next_timestamp;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Analyzing exchange's wire OUT table\n");
+ "Analyzing exchange's wire OUT table for account `%s'\n",
+ wa->section_name);
qs = edb->select_wire_out_above_serial_id_by_account (edb->cls,
esession,
- wp_section_name,
- pp.last_wire_out_serial_id,
+ wa->section_name,
+ wa->pp.last_wire_out_serial_id,
&wire_out_cb,
- NULL);
+ wa);
if (0 > qs)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
@@ -882,41 +946,13 @@ check_exchange_wire_out ()
return;
}
GNUNET_CONTAINER_multihashmap_iterate (out_map,
- &complain_out_not_found,
- NULL);
+ &complain_out_not_found,
+ wa);
/* clean up */
GNUNET_CONTAINER_multihashmap_iterate (out_map,
- &free_roi,
- NULL);
- GNUNET_CONTAINER_multihashmap_destroy (out_map);
- out_map = NULL;
-
- /* now check that all wire transfers that should have happened,
- have indeed happened */
- next_timestamp = GNUNET_TIME_absolute_get ();
- /* Subtract #GRACE_PERIOD, so we can be a bit behind in processing
- without immediately raising undue concern */
- next_timestamp = GNUNET_TIME_absolute_subtract (next_timestamp,
- GRACE_PERIOD);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Analyzing exchange's unfinished deposits\n");
- qs = edb->select_deposits_missing_wire (edb->cls,
- esession,
- pp.last_timestamp,
- next_timestamp,
- &wire_missing_cb,
- &next_timestamp);
- if (0 > qs)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- global_ret = 1;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- pp.last_timestamp = next_timestamp;
-
- /* continue with next account: */
- process_next_account (NULL);
+ &free_roi,
+ NULL);
+ process_debits (wa->next);
}
@@ -925,7 +961,7 @@ check_exchange_wire_out ()
* are credited to the exchange's account (incoming
* transactions).
*
- * @param cls closure
+ * @param cls `struct WireAccount` with current wire account to process
* @param ec error code in case something went wrong
* @param dir direction of the transfer
* @param row_off identification of the position at which we are querying
@@ -941,10 +977,10 @@ history_debit_cb (void *cls,
size_t row_off_size,
const struct TALER_WIRE_TransferDetails *details)
{
+ struct WireAccount *wa = cls;
struct ReserveOutInfo *roi;
struct GNUNET_HashCode rowh;
-
if (TALER_BANK_DIRECTION_NONE == dir)
{
if (TALER_EC_NONE != ec)
@@ -954,10 +990,8 @@ history_debit_cb (void *cls,
"Error fetching history: %u!\n",
(unsigned int) ec);
}
- /* end of iteration, now check wire_out to see
- if it matches #out_map */
- hh = NULL;
- check_exchange_wire_out ();
+ wa->hh = NULL;
+ check_exchange_wire_out (wa);
return GNUNET_OK;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -987,19 +1021,38 @@ history_debit_cb (void *cls,
GNUNET_free (diagnostic);
return GNUNET_OK;
}
+
+ /* Update offset */
+ if (NULL == wa->out_wire_off)
+ {
+ wa->wire_off_size = row_off_size;
+ wa->out_wire_off = GNUNET_malloc (row_off_size);
+ }
+ if (wa->wire_off_size != row_off_size)
+ {
+ GNUNET_break (0);
+ commit (GNUNET_DB_STATUS_HARD_ERROR);
+ wa->hh = NULL;
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_SYSERR;
+ }
+ memcpy (wa->out_wire_off,
+ row_off,
+ row_off_size);
+
roi = GNUNET_new (struct ReserveOutInfo);
GNUNET_CRYPTO_hash (&details->wtid,
- sizeof (details->wtid),
- &roi->subject_hash);
+ sizeof (details->wtid),
+ &roi->subject_hash);
roi->details.amount = details->amount;
roi->details.execution_date = details->execution_date;
roi->details.wtid = details->wtid;
roi->details.account_url = GNUNET_strdup (details->account_url);
if (GNUNET_OK !=
GNUNET_CONTAINER_multihashmap_put (out_map,
- &roi->subject_hash,
- roi,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
+ &roi->subject_hash,
+ roi,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
{
char *diagnostic;
@@ -1026,31 +1079,43 @@ history_debit_cb (void *cls,
/**
- * Main function for processing 'reserves_out' data.
- * We start by going over the DEBIT transactions this
- * time, and then verify that all of them are justified
- * by 'reserves_out'.
+ * Main function for processing 'reserves_out' data. We start by going over
+ * the DEBIT transactions this time, and then verify that all of them are
+ * justified by 'reserves_out'.
+ *
+ * @param cls `struct WireAccount` with a wire account list to process
*/
static void
-process_debits ()
+process_debits (void *cls)
{
+ struct WireAccount *wa = cls;
+ struct TALER_WIRE_Plugin *wp;
+
+ if (NULL == wa)
+ {
+ /* end of iteration, now check wire_out to see
+ if it matches #out_map */
+ conclude_wire_out ();
+ return;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Checking bank DEBIT records\n");
- GNUNET_assert (NULL == hh);
- out_map = GNUNET_CONTAINER_multihashmap_create (1024,
- GNUNET_YES);
- hh = wp->get_history (wp->cls,
- wp_section_name,
- TALER_BANK_DIRECTION_DEBIT,
- out_wire_off,
- wire_off_size,
- INT64_MAX,
- &history_debit_cb,
- NULL);
- if (NULL == hh)
+ "Checking bank DEBIT records of account `%s'\n",
+ wa->section_name);
+ GNUNET_assert (NULL == wa->hh);
+ wp = wa->wire_plugin;
+ wa->hh = wp->get_history (wp->cls,
+ wa->section_name,
+ TALER_BANK_DIRECTION_DEBIT,
+ wa->out_wire_off,
+ wa->wire_off_size,
+ INT64_MAX,
+ &history_debit_cb,
+ wa);
+ if (NULL == wa->hh)
{
fprintf (stderr,
- "Failed to obtain bank transaction history\n");
+ "Failed to obtain bank transaction history for `%s'\n",
+ wa->section_name);
commit (GNUNET_DB_STATUS_HARD_ERROR);
global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
@@ -1059,14 +1124,40 @@ process_debits ()
}
+/**
+ * Begin analyzing wire_out.
+ */
+static void
+begin_debit_audit ()
+{
+ out_map = GNUNET_CONTAINER_multihashmap_create (1024,
+ GNUNET_YES);
+ process_debits (wa_head);
+}
+
+
/* ***************************** Analyze reserves_in ************************ */
+/**
+ * Conclude the credit history check by logging entries that
+ * were not found and freeing resources. Then move on to
+ * processing debits.
+ */
+static void
+conclude_credit_history ()
+{
+ GNUNET_CONTAINER_multihashmap_destroy (in_map);
+ in_map = NULL;
+ /* credit done, now check debits */
+ begin_debit_audit ();
+}
+
/**
* Function called with details about incoming wire transfers
* as claimed by the exchange DB.
*
- * @param cls NULL
+ * @param cls a `struct WireAccount` we are processing
* @param rowid unique serial ID for the refresh session in our DB
* @param reserve_pub public key of the reserve (also the WTID)
* @param credit amount that was received
@@ -1086,6 +1177,7 @@ reserve_in_cb (void *cls,
size_t wire_reference_size,
struct GNUNET_TIME_Absolute execution_date)
{
+ struct WireAccount *wa = cls;
struct ReserveInInfo *rii;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -1125,7 +1217,7 @@ reserve_in_cb (void *cls,
GNUNET_free (rii);
return GNUNET_OK;
}
- pp.last_reserve_in_serial_id = rowid + 1;
+ wa->pp.last_reserve_in_serial_id = rowid + 1;
return GNUNET_OK;
}
@@ -1133,7 +1225,7 @@ reserve_in_cb (void *cls,
/**
* Complain that we failed to match an entry from #in_map.
*
- * @param cls NULL
+ * @param cls a `struct WireAccount`
* @param key unused key
* @param value the `struct ReserveInInfo` to free
* @return #GNUNET_OK
@@ -1143,8 +1235,10 @@ complain_in_not_found (void *cls,
const struct GNUNET_HashCode *key,
void *value)
{
+ struct WireAccount *wa = cls;
struct ReserveInInfo *rii = value;
+ (void) wa; // FIXME: log which account is affected...
report (report_reserve_in_inconsistencies,
json_pack ("{s:I, s:o, s:o, s:o, s:s, s:s}",
"row", (json_int_t) rii->rowid,
@@ -1162,24 +1256,13 @@ complain_in_not_found (void *cls,
/**
- * Conclude the credit history check by logging entries that
- * were not found and freeing resources. Then move on to
- * processing debits.
+ * Start processing the next wire account.
+ * Shuts down if we are done.
+ *
+ * @param cls `struct WireAccount` with a wire account list to process
*/
static void
-conclude_credit_history ()
-{
- GNUNET_CONTAINER_multihashmap_iterate (in_map,
- &complain_in_not_found,
- NULL);
- /* clean up before 2nd phase */
- GNUNET_CONTAINER_multihashmap_iterate (in_map,
- &free_rii,
- NULL);
- GNUNET_CONTAINER_multihashmap_destroy (in_map);
- in_map = NULL;
- process_debits ();
-}
+process_credits (void *cls);
/**
@@ -1187,7 +1270,7 @@ conclude_credit_history ()
* are credited to the exchange's account (incoming
* transactions).
*
- * @param cls closure
+ * @param cls `struct WireAccount` we are processing
* @param ec error code in case something went wrong
* @param dir direction of the transfer
* @param row_off identification of the position at which we are querying
@@ -1203,6 +1286,7 @@ history_credit_cb (void *cls,
size_t row_off_size,
const struct TALER_WIRE_TransferDetails *details)
{
+ struct WireAccount *wa = cls;
struct ReserveInInfo *rii;
struct GNUNET_HashCode key;
@@ -1216,8 +1300,18 @@ history_credit_cb (void *cls,
(unsigned int) ec);
}
/* end of operation */
- hh = NULL;
- conclude_credit_history ();
+ wa->hh = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Reconciling CREDIT processing of account `%s'\n",
+ wa->section_name);
+ GNUNET_CONTAINER_multihashmap_iterate (in_map,
+ &complain_in_not_found,
+ wa);
+ /* clean up before 2nd phase */
+ GNUNET_CONTAINER_multihashmap_iterate (in_map,
+ &free_rii,
+ NULL);
+ process_credits (wa->next);
return GNUNET_OK;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -1235,29 +1329,29 @@ history_credit_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Failed to find wire transfer at `%s' in exchange database. Audit ends at this point in time.\n",
GNUNET_STRINGS_absolute_time_to_string (details->execution_date));
- hh = NULL;
- conclude_credit_history ();
+ wa->hh = NULL;
+ process_credits (wa->next);
return GNUNET_SYSERR; /* not an error, just end of processing */
}
/* Update offset */
- if (NULL == in_wire_off)
+ if (NULL == wa->in_wire_off)
{
- wire_off_size = row_off_size;
- in_wire_off = GNUNET_malloc (row_off_size);
+ wa->wire_off_size = row_off_size;
+ wa->in_wire_off = GNUNET_malloc (row_off_size);
}
- if (wire_off_size != row_off_size)
+ if (wa->wire_off_size != row_off_size)
{
GNUNET_break (0);
commit (GNUNET_DB_STATUS_HARD_ERROR);
GNUNET_SCHEDULER_shutdown ();
- hh = NULL;
return GNUNET_SYSERR;
}
- memcpy (in_wire_off,
+ memcpy (wa->in_wire_off,
row_off,
row_off_size);
+
/* compare records with expected data */
if (row_off_size != rii->row_off_size)
{
@@ -1384,42 +1478,83 @@ history_credit_cb (void *cls,
* Start processing the next wire account.
* Shuts down if we are done.
*
- * @param cls NULL
+ * @param cls `struct WireAccount` with a wire account list to process
*/
static void
-process_next_account (void *cls)
+process_credits (void *cls)
{
- struct WireAccount *wa;
+ struct WireAccount *wa = cls;
+ struct TALER_WIRE_Plugin *wp;
enum GNUNET_DB_QueryStatus qs;
- int ret;
-
- (void) cls;
- // FIXME: this logic is broken at a high level,
- // as it iterates over the exchange's incoming
- // transactions once PER bank account, so for
- // multiple bank accounts this cannot work!
- if (NULL == (wa = wa_head))
+
+ if (NULL == wa)
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Finished with all accounts, shutting down\n");
- if (NULL != wp)
- commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT);
+ /* done with all accounts, conclude check */
+ conclude_credit_history ();
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Analyzing exchange's wire IN table for account `%s'\n",
+ wa->section_name);
+ qs = edb->select_reserves_in_above_serial_id_by_account (edb->cls,
+ esession,
+ wa->section_name,
+ wa->pp.last_reserve_in_serial_id,
+ &reserve_in_cb,
+ wa);
+ if (0 > qs)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
return;
}
- GNUNET_CONTAINER_DLL_remove (wa_head,
- wa_tail,
- wa);
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Starting audit of account `%s'\n",
+ "Starting bank CREDIT history of account `%s'\n",
wa->section_name);
- /* setup globals */
- if (NULL != wp)
- TALER_WIRE_plugin_unload (wp);
wp = wa->wire_plugin;
- GNUNET_free_non_null (wp_section_name);
- wp_section_name = wa->section_name;
- GNUNET_free (wa);
+ wa->hh = wp->get_history (wp->cls,
+ wa->section_name,
+ TALER_BANK_DIRECTION_CREDIT,
+ wa->in_wire_off,
+ wa->wire_off_size,
+ INT64_MAX,
+ &history_credit_cb,
+ wa);
+ if (NULL == wa->hh)
+ {
+ fprintf (stderr,
+ "Failed to obtain bank transaction history\n");
+ commit (GNUNET_DB_STATUS_HARD_ERROR);
+ global_ret = 1;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+}
+
+
+/**
+ * Begin audit of CREDITs to the exchange.
+ */
+static void
+begin_credit_audit ()
+{
+ in_map = GNUNET_CONTAINER_multihashmap_create (1024,
+ GNUNET_YES);
+ /* now go over all bank accounts and check delta with in_map */
+ process_credits (wa_head);
+}
+
+
+/**
+ * Start the database transactions and begin the audit.
+ */
+static void
+begin_transaction ()
+{
+ enum GNUNET_DB_QueryStatus qsx;
+ int ret;
ret = adb->start (adb->cls,
asession);
@@ -1442,14 +1577,30 @@ process_next_account (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
+ for (struct WireAccount *wa = wa_head;
+ NULL != wa;
+ wa = wa->next)
+ {
+ qsx = adb->get_wire_auditor_account_progress (adb->cls,
+ asession,
+ &master_pub,
+ wa->section_name,
+ &wa->pp,
+ &wa->in_wire_off,
+ &wa->out_wire_off,
+ &wa->wire_off_size);
+ if (0 > qsx)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx);
+ global_ret = 1;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ }
qsx = adb->get_wire_auditor_progress (adb->cls,
asession,
&master_pub,
- wp_section_name,
- &pp,
- &in_wire_off,
- &out_wire_off,
- &wire_off_size);
+ &pp);
if (0 > qsx)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx);
@@ -1465,54 +1616,10 @@ process_next_account (void *cls)
else
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _("Resuming audit at %llu/%llu\n"),
- (unsigned long long) pp.last_reserve_in_serial_id,
- (unsigned long long) pp.last_wire_out_serial_id);
- }
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Analyzing exchange's wire IN table\n");
- in_map = GNUNET_CONTAINER_multihashmap_create (1024,
- GNUNET_YES);
- qs = edb->select_reserves_in_above_serial_id_by_account (edb->cls,
- esession,
- wp_section_name,
- pp.last_reserve_in_serial_id,
- &reserve_in_cb,
- NULL);
- if (0 > qs)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- global_ret = 1;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
- "No new incoming transactions available, skipping CREDIT phase\n");
- process_debits ();
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Checking bank CREDIT records\n");
- hh = wp->get_history (wp->cls,
- wp_section_name,
- TALER_BANK_DIRECTION_CREDIT,
- in_wire_off,
- wire_off_size,
- INT64_MAX,
- &history_credit_cb,
- NULL);
- if (NULL == hh)
- {
- fprintf (stderr,
- "Failed to obtain bank transaction history\n");
- commit (GNUNET_DB_STATUS_HARD_ERROR);
- global_ret = 1;
- GNUNET_SCHEDULER_shutdown ();
- return;
+ "Resuming audit at %s\n",
+ GNUNET_STRINGS_absolute_time_to_string (pp.last_timestamp));
}
+ begin_credit_audit ();
}
@@ -1568,10 +1675,36 @@ run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *c)
{
static const struct TALER_MasterPublicKeyP zeromp;
+ char *tinys;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Launching auditor\n");
cfg = c;
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_string (cfg,
+ "auditor",
+ "TINY_AMOUNT",
+ &tinys))
+ {
+ GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
+ "auditor",
+ "TINY_AMOUNT");
+ global_ret = 1;
+ return;
+ }
+ if (GNUNET_OK !=
+ TALER_string_to_amount (tinys,
+ &tiny_amount))
+ {
+ GNUNET_free (tinys);
+ GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
+ "auditor",
+ "TINY_AMOUNT",
+ "invalid amount");
+ global_ret = 1;
+ return;
+ }
+ GNUNET_free (tinys);
if (0 == GNUNET_memcmp (&zeromp,
&master_pub))
{
@@ -1716,7 +1849,7 @@ run (void *cls,
TALER_EXCHANGEDB_find_accounts (cfg,
&process_account_cb,
NULL);
- process_next_account (NULL);
+ begin_transaction ();
}
@@ -1755,12 +1888,12 @@ main (int argc,
NULL));
if (GNUNET_OK !=
GNUNET_PROGRAM_run (argc,
- argv,
+ argv,
"taler-wire-auditor",
- "Audit exchange database for consistency with the bank's wire transfers",
- options,
- &run,
- NULL))
+ "Audit exchange database for consistency with the bank's wire transfers",
+ options,
+ &run,
+ NULL))
return 1;
return global_ret;
}