From 6e5092d83473dc1b0200d82744cf0f0056b0c110 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 14 Sep 2023 17:37:53 +0200 Subject: more work towards auditor support for AML/KYC --- src/auditor/taler-helper-auditor-wire.c | 451 ++++++++++++++++++++++++++------ 1 file changed, 375 insertions(+), 76 deletions(-) (limited to 'src/auditor/taler-helper-auditor-wire.c') diff --git a/src/auditor/taler-helper-auditor-wire.c b/src/auditor/taler-helper-auditor-wire.c index 71775bd4b..2d2bdbcc2 100644 --- a/src/auditor/taler-helper-auditor-wire.c +++ b/src/auditor/taler-helper-auditor-wire.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2017-2022 Taler Systems SA + Copyright (C) 2017-2023 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -548,14 +548,18 @@ do_shutdown (void *cls) start_time), TALER_JSON_pack_time_abs_human ("wire_auditor_end_time", GNUNET_TIME_absolute_get ()), - GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_uuid", + GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_id", start_pp.last_reserve_close_uuid), - GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_uuid", + GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_id", pp.last_reserve_close_uuid), - TALER_JSON_pack_time_abs_human ("start_pp_last_timestamp", - start_pp.last_timestamp.abs_time), - TALER_JSON_pack_time_abs_human ("end_pp_last_timestamp", - pp.last_timestamp.abs_time), + GNUNET_JSON_pack_uint64 ("start_pp_last_batch_deposit_id", + start_pp.last_batch_deposit_uuid), + GNUNET_JSON_pack_uint64 ("end_pp_last_batch_deposit_id", + pp.last_batch_deposit_uuid), + GNUNET_JSON_pack_uint64 ("start_pp_last_aggregation_serial_id", + start_pp.last_aggregation_serial), + GNUNET_JSON_pack_uint64 ("end_pp_last_aggregation_serial_id", + pp.last_aggregation_serial), GNUNET_JSON_pack_array_steal ("account_progress", report_account_progress))); report_wire_out_inconsistencies = NULL; @@ -773,8 +777,7 @@ commit (enum GNUNET_DB_QueryStatus qs) GNUNET_JSON_pack_uint64 ("end_reserve_in", wa->pp.last_reserve_in_serial_id), GNUNET_JSON_pack_uint64 ("start_wire_out", - wa->start_pp. - last_wire_out_serial_id), + wa->start_pp.last_wire_out_serial_id), GNUNET_JSON_pack_uint64 ("end_wire_out", wa->pp.last_wire_out_serial_id)))); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == wa->qsx) @@ -818,8 +821,9 @@ commit (enum GNUNET_DB_QueryStatus qs) return qs; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Concluded audit step at %s\n", - GNUNET_TIME_timestamp2s (pp.last_timestamp)); + "Concluded audit step at %llu/%llu\n", + (unsigned long long) pp.last_aggregation_serial, + (unsigned long long) pp.last_batch_deposit_uuid); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) { @@ -856,60 +860,207 @@ commit (enum GNUNET_DB_QueryStatus qs) /* ***************************** Analyze required transfers ************************ */ /** - * Function called on deposits that are past their due date - * and have not yet seen a wire transfer. + * Closure for import_wire_missing_cb(). + */ +struct ImportMissingWireContext +{ + /** + * Set to maximum row ID encountered. + */ + uint64_t max_batch_deposit_uuid; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Function called on deposits that need to be checked for their + * wire transfer. * - * @param cls closure, points to a `struct GNUNET_TIME_Timestamp` + * @param cls closure, points to a `struct ImportMissingWireContext` + * @param batch_deposit_serial_id serial of the entry in the batch deposits table * @param total_amount value of the missing deposits, including fee - * @param payto_uri where should the funds be wired + * @param wire_target_h_payto where should the funds be wired * @param deadline what was the earliest requested wire transfer deadline - * @param kyc_pending NULL if no KYC requirement is pending, otherwise text describing the missing KYC requirement - * @param aml_status status of AML possibly blocking the transfer - * @param aml_limit current monthly AML limit */ static void -wire_missing_cb (void *cls, - const struct TALER_Amount *total_amount, - const char *payto_uri, - struct GNUNET_TIME_Timestamp deadline, - const char *kyc_pending, - enum TALER_AmlDecisionState status, - const struct TALER_Amount *aml_limit) +import_wire_missing_cb (void *cls, + uint64_t batch_deposit_serial_id, + const struct TALER_Amount *total_amount, + const struct TALER_PaytoHashP *wire_target_h_payto, + struct GNUNET_TIME_Timestamp deadline) { - struct GNUNET_TIME_Timestamp *nt = cls; - json_t *rep; + struct ImportMissingWireContext *wc = cls; + enum GNUNET_DB_QueryStatus qs; + + if (wc->err < 0) + return; /* already failed */ + GNUNET_assert (batch_deposit_serial_id > wc->max_batch_deposit_uuid); + wc->max_batch_deposit_uuid = batch_deposit_serial_id; + qs = TALER_ARL_adb->insert_pending_deposit ( + TALER_ARL_adb->cls, + &TALER_ARL_master_pub, + batch_deposit_serial_id, + wire_target_h_payto, + total_amount, + deadline); + if (qs < 0) + wc->err = qs; +} + + +/** + * Information about a delayed wire transfer and the possible + * reasons for the delay. + */ +struct ReasonDetail +{ + /** + * Total amount that should have been transferred. + */ + struct TALER_Amount total_amount; + + /** + * Earliest deadline for an expected transfer to the account. + */ + struct GNUNET_TIME_Timestamp deadline; + + /** + * Target account, NULL if even that is not known (due to + * exchange lacking required entry in wire_targets table). + */ + char *payto_uri; + + /** + * Reasons due to pending KYC requests. + */ + char *kyc_pending; + + /** + * AML decision state for the target account. + */ + enum TALER_AmlDecisionState status; + + /** + * Current AML threshold for the account, may be an invalid account if the + * default threshold applies. + */ + struct TALER_Amount aml_limit; +}; + +/** + * Closure for report_wire_missing_cb(). + */ +struct ReportMissingWireContext +{ + /** + * Map from wire_target_h_payto to `struct ReasonDetail`. + */ + struct GNUNET_CONTAINER_MultiShortmap *map; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Closure for #clear_finished_transfer_cb(). + */ +struct AggregationContext +{ + /** + * Set to maximum row ID encountered. + */ + uint64_t max_aggregation_serial; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Free memory allocated in @a value. + * + * @param cls unused + * @param key unused + * @param value must be a `struct ReasonDetail` + * @return #GNUNET_YES if we should continue to + * iterate, + * #GNUNET_NO if not. + */ +static enum GNUNET_GenericReturnValue +free_report_entry (void *cls, + const struct GNUNET_ShortHashCode *key, + void *value) +{ + struct ReasonDetail *rd = value; + + GNUNET_free (rd->kyc_pending); + GNUNET_free (rd->payto_uri); + GNUNET_free (rd); + return GNUNET_YES; +} + + +/** + * We had an entry in our map of wire transfers that + * should have been performed. Generate report. + * + * @param cls unused + * @param key unused + * @param value must be a `struct ReasonDetail` + * @return #GNUNET_YES if we should continue to + * iterate, + * #GNUNET_NO if not. + */ +static enum GNUNET_GenericReturnValue +generate_report (void *cls, + const struct GNUNET_ShortHashCode *key, + void *value) +{ + struct ReasonDetail *rd = value; - *nt = GNUNET_TIME_timestamp_min (deadline, - *nt); - (void) cls; - // TODO: maybe split up by category? - TALER_ARL_amount_add (&total_amount_lag, - &total_amount_lag, - total_amount); /* For now, we simplify and only check that the amount was tiny */ - if (0 > TALER_amount_cmp (total_amount, + if (0 > TALER_amount_cmp (&rd->total_amount, &tiny_amount)) - return; /* acceptable, amount was tiny */ - if (NULL != kyc_pending) + return free_report_entry (cls, + key, + value); /* acceptable, amount was tiny */ + // TODO: maybe split total_amount_lag up by category below? + TALER_ARL_amount_add (&total_amount_lag, + &total_amount_lag, + &rd->total_amount); + if (NULL != rd->kyc_pending) { + json_t *rep; + rep = GNUNET_JSON_PACK ( TALER_JSON_pack_amount ("total_amount", - total_amount), + &rd->total_amount), TALER_JSON_pack_time_abs_human ("deadline", - deadline.abs_time), + rd->deadline.abs_time), GNUNET_JSON_pack_string ("kyc_pending", - kyc_pending), - GNUNET_JSON_pack_string ("account", - payto_uri)); + rd->kyc_pending), + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); TALER_ARL_report (report_kyc_lags, rep); } - else if (TALER_AML_NORMAL != status) + else if (TALER_AML_NORMAL != rd->status) { const char *sstatus; + json_t *rep; - switch (status) + switch (rd->status) { case TALER_AML_NORMAL: GNUNET_assert (0); @@ -923,30 +1074,128 @@ wire_missing_cb (void *cls, } rep = GNUNET_JSON_PACK ( TALER_JSON_pack_amount ("total_amount", - total_amount), - TALER_JSON_pack_amount ("aml_limit", - aml_limit), + &rd->total_amount), + GNUNET_JSON_pack_allow_null ( + TALER_JSON_pack_amount ("aml_limit", + TALER_amount_is_valid (&rd->aml_limit) + ? &rd->aml_limit + : NULL)), TALER_JSON_pack_time_abs_human ("deadline", - deadline.abs_time), + rd->deadline.abs_time), GNUNET_JSON_pack_string ("aml_status", sstatus), - GNUNET_JSON_pack_string ("account", - payto_uri)); + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); TALER_ARL_report (report_aml_lags, rep); } else { + json_t *rep; + rep = GNUNET_JSON_PACK ( TALER_JSON_pack_amount ("total_amount", - total_amount), + &rd->total_amount), TALER_JSON_pack_time_abs_human ("deadline", - deadline.abs_time), - GNUNET_JSON_pack_string ("account", - payto_uri)); + rd->deadline.abs_time), + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); TALER_ARL_report (report_lags, rep); } + + return free_report_entry (cls, + key, + value); +} + + +/** + * Function called on deposits that are past their due date + * and have not yet seen a wire transfer. + * + * @param cls closure, points to a `struct ReportMissingWireContext` + * @param total_amount value of the missing deposits, including fee + * @param payto_uri where should the funds be wired + * @param deadline what was the earliest requested wire transfer deadline + */ +static void +report_wire_missing_cb (void *cls, + uint64_t batch_deposit_serial_id, + const struct TALER_Amount *total_amount, + const struct TALER_PaytoHashP *wire_target_h_payto, + struct GNUNET_TIME_Timestamp deadline) +{ + struct ReportMissingWireContext *rc = cls; + struct ReasonDetail *rd; + + rd = GNUNET_CONTAINER_multishortmap_get (rc->map, + &wire_target_h_payto->hash); + if (NULL == rd) + { + rd = GNUNET_new (struct ReasonDetail); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multishortmap_put ( + rc->map, + &wire_target_h_payto->hash, + rd, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + rc->err = TALER_ARL_edb->select_justification_for_missing_wire ( + TALER_ARL_edb->cls, + wire_target_h_payto, + &rd->payto_uri, + &rd->kyc_pending, + &rd->status, + &rd->aml_limit); + rd->total_amount = *total_amount; + rd->deadline = deadline; + } + else + { + TALER_ARL_amount_add (&rd->total_amount, + &rd->total_amount, + total_amount); + rd->deadline = GNUNET_TIME_timestamp_min (rd->deadline, + deadline); + } +} + + +/** + * Function called on aggregations that were done for + * a (batch) deposit. + * + * @param cls closure + * @param tracking_serial_id where in the table are we + * @param batch_deposit_serial_id which batch deposit was aggregated + */ +static void +clear_finished_transfer_cb ( + void *cls, + uint64_t tracking_serial_id, + uint64_t batch_deposit_serial_id) +{ + struct AggregationContext *ac = cls; + enum GNUNET_DB_QueryStatus qs; + + if (0 > ac->err) + return; /* already failed */ + GNUNET_assert (ac->max_aggregation_serial < tracking_serial_id); + ac->max_aggregation_serial = tracking_serial_id; + qs = TALER_ARL_adb->delete_pending_deposit ( + TALER_ARL_adb->cls, + &TALER_ARL_master_pub, + batch_deposit_serial_id); + if (0 == qs) + { + /* Aggregated something twice or other error, report! */ + GNUNET_break (0); + // FIXME: report more nicely! + } + if (0 > qs) + ac->err = qs; } @@ -957,31 +1206,79 @@ wire_missing_cb (void *cls, static void check_for_required_transfers (void) { - struct GNUNET_TIME_Timestamp next_timestamp; + struct ImportMissingWireContext wc = { + .max_batch_deposit_uuid = pp.last_batch_deposit_uuid, + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + struct GNUNET_TIME_Absolute deadline; enum GNUNET_DB_QueryStatus qs; + struct ReportMissingWireContext rc = { + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + struct AggregationContext ac = { + .max_aggregation_serial = pp.last_aggregation_serial, + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + qs = TALER_ARL_edb->select_batch_deposits_missing_wire ( + TALER_ARL_edb->cls, + pp.last_batch_deposit_uuid, + &import_wire_missing_cb, + &wc); + if ( (0 > qs) || (0 > wc.err) ) + { + GNUNET_break (0); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == wc.err) ); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + pp.last_batch_deposit_uuid = wc.max_batch_deposit_uuid; + qs = TALER_ARL_edb->select_aggregations_above_serial ( + TALER_ARL_edb->cls, + pp.last_aggregation_serial, + &clear_finished_transfer_cb, + &ac); + if ( (0 > qs) || (0 > ac.err) ) + { + GNUNET_break (0); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == ac.err) ); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + pp.last_aggregation_serial = ac.max_aggregation_serial; /* Subtract #GRACE_PERIOD, so we can be a bit behind in processing without immediately raising undue concern */ - next_timestamp = GNUNET_TIME_absolute_to_timestamp ( - GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), - GRACE_PERIOD)); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Analyzing exchange's unfinished deposits (deadline: %s)\n", - GNUNET_TIME_timestamp2s (next_timestamp)); - qs = TALER_ARL_edb->select_batch_deposits_missing_wire (TALER_ARL_edb->cls, - pp.last_timestamp, - next_timestamp, - &wire_missing_cb, - &next_timestamp); - if (0 > qs) + deadline = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), + GRACE_PERIOD); + rc.map = GNUNET_CONTAINER_multishortmap_create (1024, + GNUNET_NO); + qs = TALER_ARL_adb->select_pending_deposits ( + TALER_ARL_adb->cls, + &TALER_ARL_master_pub, + deadline, + &report_wire_missing_cb, + &rc); + if ( (0 > qs) || (0 > rc.err) ) { GNUNET_break (0); - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == rc.err) ); + GNUNET_CONTAINER_multishortmap_iterate (rc.map, + &free_report_entry, + NULL); + GNUNET_CONTAINER_multishortmap_destroy (rc.map); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } - pp.last_timestamp = next_timestamp; + GNUNET_CONTAINER_multishortmap_iterate (rc.map, + &generate_report, + NULL); + GNUNET_CONTAINER_multishortmap_destroy (rc.map); /* conclude with success */ commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT); GNUNET_SCHEDULER_shutdown (); @@ -2275,10 +2572,11 @@ begin_transaction (void) GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, &total_wire_out)); - qs = TALER_ARL_adb->get_predicted_balance (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &start_balance, - &total_drained); + qs = TALER_ARL_adb->get_predicted_balance ( + TALER_ARL_adb->cls, + &TALER_ARL_master_pub, + &start_balance, + &total_drained); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -2328,9 +2626,10 @@ begin_transaction (void) { start_pp = pp; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Resuming wire audit at %s / %llu\n", - GNUNET_TIME_timestamp2s (pp.last_timestamp), - (unsigned long long) pp.last_reserve_close_uuid); + "Resuming wire audit at %llu / %llu / %llu\n", + (unsigned long long) pp.last_reserve_close_uuid, + (unsigned long long) pp.last_batch_deposit_uuid, + (unsigned long long) pp.last_aggregation_serial); } { -- cgit v1.2.3