diff options
Diffstat (limited to 'src/auditor/taler-helper-auditor-wire.c')
-rw-r--r-- | src/auditor/taler-helper-auditor-wire.c | 244 |
1 files changed, 157 insertions, 87 deletions
diff --git a/src/auditor/taler-helper-auditor-wire.c b/src/auditor/taler-helper-auditor-wire.c index babf4bf28..fd920e87e 100644 --- a/src/auditor/taler-helper-auditor-wire.c +++ b/src/auditor/taler-helper-auditor-wire.c @@ -52,6 +52,12 @@ 15) +struct TALER_AUDITORDB_WireAccountProgressPoint +{ + uint64_t last_reserve_in_serial_id; + uint64_t last_wire_out_serial_id; +}; + /** * Information we keep for each supported account. */ @@ -93,9 +99,34 @@ struct WireAccount struct TALER_AUDITORDB_WireAccountProgressPoint start_pp; /** - * Where we are in the transaction history. + * Where we are in the inbound transaction history. + */ + uint64_t wire_off_in; + + /** + * Where we are in the outbound transaction history. */ - struct TALER_AUDITORDB_BankAccountProgressPoint wire_off; + uint64_t wire_off_out; + + /** + * Label under which we store our pp's reserve_in_serial_id. + */ + char *label_reserve_in_serial_id; + + /** + * Label under which we store our pp's reserve_in_serial_id. + */ + char *label_wire_out_serial_id; + + /** + * Label under which we store our wire_off_in. + */ + char *label_wire_off_in; + + /** + * Label under which we store our wire_off_out. + */ + char *label_wire_off_out; /** * Return value when we got this account's progress point. @@ -178,12 +209,9 @@ static enum GNUNET_DB_QueryStatus qsx_gwap; /** * Last reserve_in / wire_out serial IDs seen. */ -static struct TALER_AUDITORDB_WireProgressPoint pp; - -/** - * Last reserve_in / wire_out serial IDs seen. - */ -static struct TALER_AUDITORDB_WireProgressPoint start_pp; +static TALER_ARL_DEF_PP (wire_reserve_close_id); +static TALER_ARL_DEF_PP (wire_batch_deposit_id); +static TALER_ARL_DEF_PP (wire_aggregation_id); /** * Array of reports about row inconsistencies in wire_out table. @@ -302,17 +330,17 @@ static struct TALER_Amount total_wire_out; /** * Total amount of profits drained. */ -static struct TALER_Amount total_drained; +static TALER_ARL_DEF_AB (total_drained); /** - * Starting balance at the beginning of this iteration. + * Final balance at the end of this iteration. */ -static struct TALER_Amount start_balance; +static TALER_ARL_DEF_AB (final_balance); /** - * Final balance at the end of this iteration. + * Starting balance at the beginning of this iteration. */ -static struct TALER_Amount final_balance; +static struct TALER_Amount start_balance; /** * True if #start_balance was initialized. @@ -525,9 +553,9 @@ do_shutdown (void *cls) TALER_JSON_pack_amount ("total_wire_out", &total_wire_out), TALER_JSON_pack_amount ("total_drained", - &total_drained), + &TALER_ARL_USE_AB (total_drained)), TALER_JSON_pack_amount ("final_balance", - &final_balance), + &TALER_ARL_USE_AB (final_balance)), /* Tested in test-auditor.sh #1 */ TALER_JSON_pack_amount ("total_amount_lag", &total_amount_lag), @@ -549,17 +577,17 @@ do_shutdown (void *cls) TALER_JSON_pack_time_abs_human ("wire_auditor_end_time", GNUNET_TIME_absolute_get ()), GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_id", - start_pp.last_reserve_close_uuid), + 0 /* no longer supported */), GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_id", - pp.last_reserve_close_uuid), + TALER_ARL_USE_PP (wire_reserve_close_id)), GNUNET_JSON_pack_uint64 ("start_pp_last_batch_deposit_id", - start_pp.last_batch_deposit_uuid), + 0 /* no longer supported */), GNUNET_JSON_pack_uint64 ("end_pp_last_batch_deposit_id", - pp.last_batch_deposit_uuid), + TALER_ARL_USE_PP (wire_batch_deposit_id)), GNUNET_JSON_pack_uint64 ("start_pp_last_aggregation_serial_id", - start_pp.last_aggregation_serial), + 0 /* no longer supported */), GNUNET_JSON_pack_uint64 ("end_pp_last_aggregation_serial_id", - pp.last_aggregation_serial), + TALER_ARL_USE_PP (wire_aggregation_id)), GNUNET_JSON_pack_array_steal ("account_progress", report_account_progress))); report_wire_out_inconsistencies = NULL; @@ -617,6 +645,10 @@ do_shutdown (void *cls) GNUNET_CONTAINER_DLL_remove (wa_head, wa_tail, wa); + GNUNET_free (wa->label_reserve_in_serial_id); + GNUNET_free (wa->label_wire_out_serial_id); + GNUNET_free (wa->label_wire_off_in); + GNUNET_free (wa->label_wire_off_out); GNUNET_free (wa); } if (NULL != ctx) @@ -671,8 +703,8 @@ check_pending_rc (void *cls, &rc->wtid), GNUNET_JSON_pack_string ("account", rc->receiver_account))); - pp.last_reserve_close_uuid - = GNUNET_MIN (pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id) + = GNUNET_MIN (TALER_ARL_USE_PP (wire_reserve_close_id), rc->rowid); return GNUNET_OK; } @@ -724,30 +756,32 @@ commit (enum GNUNET_DB_QueryStatus qs) TALER_ARL_amount_add (&sum, &total_wire_in, &start_balance); - TALER_ARL_amount_subtract (&final_balance, + TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (final_balance), &sum, &total_wire_out); - qs = TALER_ARL_adb->update_predicted_result (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &final_balance, - &total_drained); + qs = TALER_ARL_adb->update_balance ( + TALER_ARL_adb->cls, + TALER_ARL_SET_AB (total_drained), + TALER_ARL_SET_AB (final_balance), + NULL); } else { - TALER_ARL_amount_subtract (&final_balance, + TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (final_balance), &total_wire_in, &total_wire_out); - qs = TALER_ARL_adb->insert_predicted_result (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &final_balance, - &total_drained); + qs = TALER_ARL_adb->insert_balance ( + TALER_ARL_adb->cls, + TALER_ARL_SET_AB (total_drained), + TALER_ARL_SET_AB (final_balance), + NULL); } } else { GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, - &final_balance)); + &TALER_ARL_USE_AB (final_balance))); } if (0 > qs) { @@ -781,19 +815,29 @@ commit (enum GNUNET_DB_QueryStatus qs) GNUNET_JSON_pack_uint64 ("end_wire_out", wa->pp.last_wire_out_serial_id)))); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == wa->qsx) - qs = TALER_ARL_adb->update_wire_auditor_account_progress ( + qs = TALER_ARL_adb->update_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - &wa->wire_off); + wa->label_reserve_in_serial_id, + wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + wa->wire_off_in, + wa->label_wire_off_out, + wa->wire_off_out, + NULL); else - qs = TALER_ARL_adb->insert_wire_auditor_account_progress ( + qs = TALER_ARL_adb->insert_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - &wa->wire_off); + wa->label_reserve_in_serial_id, + wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + wa->wire_off_in, + wa->label_wire_off_out, + wa->wire_off_out, + NULL); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -806,13 +850,19 @@ commit (enum GNUNET_DB_QueryStatus qs) &check_pending_rc, NULL); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx_gwap) - qs = TALER_ARL_adb->update_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qs = TALER_ARL_adb->update_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_SET_PP (wire_reserve_close_id), + TALER_ARL_SET_PP (wire_batch_deposit_id), + TALER_ARL_SET_PP (wire_aggregation_id), + NULL); else - qs = TALER_ARL_adb->insert_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qs = TALER_ARL_adb->insert_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_SET_PP (wire_reserve_close_id), + TALER_ARL_SET_PP (wire_batch_deposit_id), + TALER_ARL_SET_PP (wire_aggregation_id), + NULL); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -822,8 +872,8 @@ commit (enum GNUNET_DB_QueryStatus qs) } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Concluded audit step at %llu/%llu\n", - (unsigned long long) pp.last_aggregation_serial, - (unsigned long long) pp.last_batch_deposit_uuid); + (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id), + (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id)); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) { @@ -902,7 +952,6 @@ import_wire_missing_cb (void *cls, wc->max_batch_deposit_uuid = batch_deposit_serial_id; qs = TALER_ARL_adb->insert_pending_deposit ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, batch_deposit_serial_id, wire_target_h_payto, total_amount, @@ -1187,7 +1236,6 @@ clear_finished_transfer_cb ( ac->max_aggregation_serial = tracking_serial_id; qs = TALER_ARL_adb->delete_pending_deposit ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, batch_deposit_serial_id); if (0 == qs) { @@ -1208,7 +1256,7 @@ static void check_for_required_transfers (void) { struct ImportMissingWireContext wc = { - .max_batch_deposit_uuid = pp.last_batch_deposit_uuid, + .max_batch_deposit_uuid = TALER_ARL_USE_PP (wire_batch_deposit_id), .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT }; struct GNUNET_TIME_Absolute deadline; @@ -1217,13 +1265,13 @@ check_for_required_transfers (void) .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT }; struct AggregationContext ac = { - .max_aggregation_serial = pp.last_aggregation_serial, + .max_aggregation_serial = TALER_ARL_USE_PP (wire_aggregation_id), .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT }; qs = TALER_ARL_edb->select_batch_deposits_missing_wire ( TALER_ARL_edb->cls, - pp.last_batch_deposit_uuid, + TALER_ARL_USE_PP (wire_batch_deposit_id), &import_wire_missing_cb, &wc); if ( (0 > qs) || (0 > wc.err) ) @@ -1235,10 +1283,10 @@ check_for_required_transfers (void) GNUNET_SCHEDULER_shutdown (); return; } - pp.last_batch_deposit_uuid = wc.max_batch_deposit_uuid; + TALER_ARL_USE_PP (wire_batch_deposit_id) = wc.max_batch_deposit_uuid; qs = TALER_ARL_edb->select_aggregations_above_serial ( TALER_ARL_edb->cls, - pp.last_aggregation_serial, + TALER_ARL_USE_PP (wire_aggregation_id), &clear_finished_transfer_cb, &ac); if ( (0 > qs) || (0 > ac.err) ) @@ -1250,7 +1298,7 @@ check_for_required_transfers (void) GNUNET_SCHEDULER_shutdown (); return; } - pp.last_aggregation_serial = ac.max_aggregation_serial; + TALER_ARL_USE_PP (wire_aggregation_id) = ac.max_aggregation_serial; /* Subtract #GRACE_PERIOD, so we can be a bit behind in processing without immediately raising undue concern */ deadline = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), @@ -1259,7 +1307,6 @@ check_for_required_transfers (void) GNUNET_NO); qs = TALER_ARL_adb->select_pending_deposits ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, deadline, &report_wire_missing_cb, &rc); @@ -1735,8 +1782,8 @@ complain_out_not_found (void *cls, GNUNET_free (account_section); GNUNET_free (payto_uri); /* profit drain was correct */ - TALER_ARL_amount_add (&total_drained, - &total_drained, + TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_drained), + &TALER_ARL_USE_AB (total_drained), &amount); return GNUNET_OK; } @@ -1846,7 +1893,7 @@ history_debit_cb (void *cls, TALER_amount2s (&dd->amount), TALER_B2S (&dd->wtid)); /* Update offset */ - wa->wire_off.out_wire_off = dd->serial_id; + wa->wire_off_out = dd->serial_id; slen = strlen (dd->credit_account_uri) + 1; roi = GNUNET_malloc (sizeof (struct ReserveOutInfo) + slen); @@ -1942,7 +1989,7 @@ process_debits (void *cls) // (CG: used to be INT64_MAX, changed by MS to INT32_MAX, why? To be discussed with him!) wa->dhh = TALER_BANK_debit_history (ctx, wa->ai->auth, - wa->wire_off.out_wire_off, + wa->wire_off_out, INT32_MAX, GNUNET_TIME_UNIT_ZERO, &history_debit_cb, @@ -2180,7 +2227,7 @@ analyze_credit (struct WireAccount *wa, } /* Update offset */ - wa->wire_off.in_wire_off = details->serial_id; + wa->wire_off_in = details->serial_id; /* compare records with expected data */ if (0 != GNUNET_memcmp (&details->reserve_pub, &rii->details.reserve_pub)) @@ -2419,7 +2466,7 @@ process_credits (void *cls) // (CG: used to be INT64_MAX, changed by MS to INT32_MAX, why? To be discussed with him!) wa->chh = TALER_BANK_credit_history (ctx, wa->ai->auth, - wa->wire_off.in_wire_off, + wa->wire_off_in, INT32_MAX, GNUNET_TIME_UNIT_ZERO, &history_credit_cb, @@ -2506,8 +2553,8 @@ reserve_closed_cb (void *cls, return GNUNET_SYSERR; return GNUNET_OK; } - pp.last_reserve_close_uuid - = GNUNET_MAX (pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id) + = GNUNET_MAX (TALER_ARL_USE_PP (wire_reserve_close_id), rowid + 1); rc->receiver_account = GNUNET_strdup (receiver_account); rc->wtid = *wtid; @@ -2566,18 +2613,18 @@ begin_transaction (void) } GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, - &total_drained)); + &TALER_ARL_USE_AB (total_drained))); GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, &total_wire_in)); GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, &total_wire_out)); - qs = TALER_ARL_adb->get_predicted_balance ( + qs = TALER_ARL_adb->get_balance ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &start_balance, - &total_drained); + TALER_ARL_GET_AB (total_drained), + TALER_ARL_GET_AB (final_balance), + NULL); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -2597,12 +2644,33 @@ begin_transaction (void) NULL != wa; wa = wa->next) { - wa->qsx = TALER_ARL_adb->get_wire_auditor_account_progress ( + GNUNET_asprintf (&wa->label_reserve_in_serial_id, + "wire-%s-%s", + wa->ai->section_name, + "reserve_in_serial_id"); + GNUNET_asprintf (&wa->label_wire_out_serial_id, + "wire-%s-%s", + wa->ai->section_name, + "wire_out_serial_id"); + GNUNET_asprintf (&wa->label_wire_off_in, + "wire-%s-%s", + wa->ai->section_name, + "wire_off_in"); + GNUNET_asprintf (&wa->label_wire_off_out, + "wire-%s-%s", + wa->ai->section_name, + "wire_off_out"); + wa->qsx = TALER_ARL_adb->get_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - &wa->wire_off); + wa->label_reserve_in_serial_id, + &wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + &wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + &wa->wire_off_in, + wa->label_wire_off_out, + &wa->wire_off_out, + NULL); if (0 > wa->qsx) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == wa->qsx); @@ -2610,9 +2678,12 @@ begin_transaction (void) } wa->start_pp = wa->pp; } - qsx_gwap = TALER_ARL_adb->get_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qsx_gwap = TALER_ARL_adb->get_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_GET_PP (wire_reserve_close_id), + TALER_ARL_GET_PP (wire_batch_deposit_id), + TALER_ARL_GET_PP (wire_aggregation_id), + NULL); if (0 > qsx_gwap) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx_gwap); @@ -2625,12 +2696,11 @@ begin_transaction (void) } else { - start_pp = pp; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Resuming wire audit at %llu / %llu / %llu\n", - (unsigned long long) pp.last_reserve_close_uuid, - (unsigned long long) pp.last_batch_deposit_uuid, - (unsigned long long) pp.last_aggregation_serial); + (unsigned long long) TALER_ARL_USE_PP (wire_reserve_close_id), + (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id), + (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id)); } { @@ -2638,7 +2708,7 @@ begin_transaction (void) qs = TALER_ARL_edb->select_reserve_closed_above_serial_id ( TALER_ARL_edb->cls, - pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id), &reserve_closed_cb, NULL); if (0 > qs) |