diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/auditor/taler-helper-auditor-wire.c | 302 | ||||
-rw-r--r-- | src/bank-lib/bank_api_credit.c | 120 | ||||
-rw-r--r-- | src/bank-lib/bank_api_debit.c | 124 | ||||
-rw-r--r-- | src/bank-lib/taler-exchange-wire-gateway-client.c | 232 | ||||
-rw-r--r-- | src/exchange/taler-exchange-httpd_batch-deposit.c | 1 | ||||
-rw-r--r-- | src/exchange/taler-exchange-httpd_deposit.c | 36 | ||||
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 936 | ||||
-rw-r--r-- | src/exchangedb/pg_insert_deposit.c | 10 | ||||
-rw-r--r-- | src/include/taler_bank_service.h | 167 | ||||
-rw-r--r-- | src/testing/testing_api_cmd_bank_history_credit.c | 137 | ||||
-rw-r--r-- | src/testing/testing_api_cmd_bank_history_debit.c | 139 | ||||
-rw-r--r-- | src/testing/testing_api_cmd_common.c | 39 | ||||
-rw-r--r-- | src/testing/testing_api_cmd_exec_wirewatch.c | 1 |
13 files changed, 1120 insertions, 1124 deletions
diff --git a/src/auditor/taler-helper-auditor-wire.c b/src/auditor/taler-helper-auditor-wire.c index eb85eec9e..852c219cf 100644 --- a/src/auditor/taler-helper-auditor-wire.c +++ b/src/auditor/taler-helper-auditor-wire.c @@ -1469,92 +1469,92 @@ check_exchange_wire_out (struct WireAccount *wa) * transactions). * * @param cls `struct WireAccount` with current wire account to process - * @param http_status_code http status of the request - * @param ec error code in case something went wrong - * @param row_off identification of the position at which we are querying - * @param details details about the wire transfer - * @param json original response in JSON format - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + * @param dhr HTTP response details */ -static enum GNUNET_GenericReturnValue +static void history_debit_cb (void *cls, - unsigned int http_status_code, - enum TALER_ErrorCode ec, - uint64_t row_off, - const struct TALER_BANK_DebitDetails *details, - const json_t *json) + const struct TALER_BANK_DebitHistoryResponse *dhr) { struct WireAccount *wa = cls; struct ReserveOutInfo *roi; size_t slen; - (void) json; - if (NULL == details) + wa->dhh = NULL; + switch (dhr->http_status) { - wa->dhh = NULL; - if ( (TALER_EC_NONE != ec) && - ( (! ignore_account_404) || - (MHD_HTTP_NOT_FOUND != http_status_code) ) ) + case MHD_HTTP_OK: + for (unsigned int i = 0; i<dhr->details.success.details_length; i++) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error fetching debit history of account %s: %u/%u!\n", - wa->ai->section_name, - http_status_code, - (unsigned int) ec); - commit (GNUNET_DB_STATUS_HARD_ERROR); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + const struct TALER_BANK_DebitDetails *dd + = &dhr->details.success.details[i]; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Analyzing bank DEBIT at %s of %s with WTID %s\n", + GNUNET_TIME_timestamp2s (dd->execution_date), + TALER_amount2s (&dd->amount), + TALER_B2S (&dd->wtid)); + /* Update offset */ + wa->wire_off.out_wire_off = dd->serial_id; + slen = strlen (dd->credit_account_uri) + 1; + roi = GNUNET_malloc (sizeof (struct ReserveOutInfo) + + slen); + GNUNET_CRYPTO_hash (&dd->wtid, + sizeof (dd->wtid), + &roi->subject_hash); + roi->details.amount = dd->amount; + roi->details.execution_date = dd->execution_date; + roi->details.wtid = dd->wtid; + roi->details.credit_account_uri = (const char *) &roi[1]; + memcpy (&roi[1], + dd->credit_account_uri, + slen); + if (GNUNET_OK != + GNUNET_CONTAINER_multihashmap_put (out_map, + &roi->subject_hash, + roi, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) + { + char *diagnostic; + + GNUNET_asprintf (&diagnostic, + "duplicate subject hash `%s'", + TALER_B2S (&roi->subject_hash)); + TALER_ARL_amount_add (&total_wire_format_amount, + &total_wire_format_amount, + &dd->amount); + TALER_ARL_report (report_wire_format_inconsistencies, + GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("amount", + &dd->amount), + GNUNET_JSON_pack_uint64 ("wire_offset", + dd->serial_id), + GNUNET_JSON_pack_string ("diagnostic", + diagnostic))); + GNUNET_free (diagnostic); + } } check_exchange_wire_out (wa); - return GNUNET_OK; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Analyzing bank DEBIT at %s of %s with WTID %s\n", - GNUNET_TIME_timestamp2s (details->execution_date), - TALER_amount2s (&details->amount), - TALER_B2S (&details->wtid)); - /* Update offset */ - wa->wire_off.out_wire_off = row_off; - slen = strlen (details->credit_account_uri) + 1; - roi = GNUNET_malloc (sizeof (struct ReserveOutInfo) - + slen); - GNUNET_CRYPTO_hash (&details->wtid, - sizeof (details->wtid), - &roi->subject_hash); - roi->details.amount = details->amount; - roi->details.execution_date = details->execution_date; - roi->details.wtid = details->wtid; - roi->details.credit_account_uri = (const char *) &roi[1]; - memcpy (&roi[1], - details->credit_account_uri, - slen); - if (GNUNET_OK != - GNUNET_CONTAINER_multihashmap_put (out_map, - &roi->subject_hash, - roi, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) - { - char *diagnostic; - - GNUNET_asprintf (&diagnostic, - "duplicate subject hash `%s'", - TALER_B2S (&roi->subject_hash)); - TALER_ARL_amount_add (&total_wire_format_amount, - &total_wire_format_amount, - &details->amount); - TALER_ARL_report (report_wire_format_inconsistencies, - GNUNET_JSON_PACK ( - TALER_JSON_pack_amount ("amount", - &details->amount), - GNUNET_JSON_pack_uint64 ("wire_offset", - row_off), - GNUNET_JSON_pack_string ("diagnostic", - diagnostic))); - GNUNET_free (diagnostic); - return GNUNET_OK; + return; + case MHD_HTTP_NO_CONTENT: + check_exchange_wire_out (wa); + return; + case MHD_HTTP_NOT_FOUND: + if (ignore_account_404) + { + check_exchange_wire_out (wa); + return; + } + break; + default: + break; } - return GNUNET_OK; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error fetching debit history of account %s: %u/%u!\n", + wa->ai->section_name, + dhr->http_status, + (unsigned int) dhr->ec); + commit (GNUNET_DB_STATUS_HARD_ERROR); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); } @@ -1764,69 +1764,49 @@ process_credits (void *cls); /** - * This function is called for all transactions that - * are credited to the exchange's account (incoming - * transactions). + * We got all of the incoming transactions for @a wa, + * finish processing the account. * - * @param cls `struct WireAccount` we are processing - * @param http_status HTTP status returned by the bank - * @param ec error code in case something went wrong - * @param row_off identification of the position at which we are querying - * @param details details about the wire transfer - * @param json raw response - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + * @param[in,out] wa wire account to process */ -static enum GNUNET_GenericReturnValue -history_credit_cb (void *cls, - unsigned int http_status, - enum TALER_ErrorCode ec, - uint64_t row_off, - const struct TALER_BANK_CreditDetails *details, - const json_t *json) +static void +conclude_account (struct WireAccount *wa) +{ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Reconciling CREDIT processing of account `%s'\n", + wa->ai->section_name); + GNUNET_CONTAINER_multihashmap_iterate (in_map, + &complain_in_not_found, + wa); + /* clean up before 2nd phase */ + GNUNET_CONTAINER_multihashmap_iterate (in_map, + &free_rii, + NULL); + process_credits (wa->next); +} + + +/** + * Analyze credit transation @a details into @a wa. + * + * @param[in,out] wa account that received the transfer + * @param details transfer details + * @return true on success, false to stop loop at this point + */ +static bool +analyze_credit (struct WireAccount *wa, + const struct TALER_BANK_CreditDetails *details) { - struct WireAccount *wa = cls; struct ReserveInInfo *rii; struct GNUNET_HashCode key; - (void) json; - if (NULL == details) - { - wa->chh = NULL; - if ( (TALER_EC_NONE != ec) && - ( (! ignore_account_404) || - (MHD_HTTP_NOT_FOUND != http_status) ) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error fetching credit history of account %s: %u/%s!\n", - wa->ai->section_name, - http_status, - TALER_ErrorCode_get_hint (ec)); - commit (GNUNET_DB_STATUS_HARD_ERROR); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; - } - /* end of operation */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Reconciling CREDIT processing of account `%s'\n", - wa->ai->section_name); - GNUNET_CONTAINER_multihashmap_iterate (in_map, - &complain_in_not_found, - wa); - /* clean up before 2nd phase */ - GNUNET_CONTAINER_multihashmap_iterate (in_map, - &free_rii, - NULL); - process_credits (wa->next); - return GNUNET_OK; - } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Analyzing bank CREDIT at %s of %s with Reserve-pub %s\n", GNUNET_TIME_timestamp2s (details->execution_date), TALER_amount2s (&details->amount), TALER_B2S (&details->reserve_pub)); - GNUNET_CRYPTO_hash (&row_off, - sizeof (row_off), + GNUNET_CRYPTO_hash (&details->serial_id, + sizeof (details->serial_id), &key); rii = GNUNET_CONTAINER_multihashmap_get (in_map, &key); @@ -1835,13 +1815,12 @@ history_credit_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Failed to find wire transfer at `%s' in exchange database. Audit ends at this point in time.\n", GNUNET_TIME_timestamp2s (details->execution_date)); - wa->chh = NULL; process_credits (wa->next); - return GNUNET_SYSERR; /* not an error, just end of processing */ + return false; /* not an error, just end of processing */ } /* Update offset */ - wa->wire_off.in_wire_off = row_off; + wa->wire_off.in_wire_off = details->serial_id; /* compare records with expected data */ if (0 != GNUNET_memcmp (&details->reserve_pub, &rii->details.reserve_pub)) @@ -1852,7 +1831,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), TALER_JSON_pack_amount ("amount_exchange_expected", &rii->details.amount), TALER_JSON_pack_amount ("amount_wired", @@ -1872,7 +1851,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), TALER_JSON_pack_amount ("amount_exchange_expected", &zero), TALER_JSON_pack_amount ("amount_wired", @@ -1898,7 +1877,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), TALER_JSON_pack_amount ("amount_exchange_expected", &rii->details.amount), TALER_JSON_pack_amount ("amount_wired", @@ -1946,7 +1925,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), GNUNET_JSON_pack_data_auto ( "reserve_pub", &rii->details.reserve_pub))); @@ -1965,7 +1944,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), GNUNET_JSON_pack_string ("diagnostic", "execution date mismatch"))); } @@ -1974,7 +1953,60 @@ cleanup: free_rii (NULL, &key, rii)); - return GNUNET_OK; + return true; +} + + +/** + * This function is called for all transactions that + * are credited to the exchange's account (incoming + * transactions). + * + * @param cls `struct WireAccount` we are processing + * @param chr HTTP response returned by the bank + */ +static void +history_credit_cb (void *cls, + const struct TALER_BANK_CreditHistoryResponse *chr) +{ + struct WireAccount *wa = cls; + + wa->chh = NULL; + switch (chr->http_status) + { + case MHD_HTTP_OK: + for (unsigned int i = 0; i<chr->details.success.details_length; i++) + { + const struct TALER_BANK_CreditDetails *cd + = &chr->details.success.details[i]; + + if (! analyze_credit (wa, + cd)) + break; + } + conclude_account (wa); + return; + case MHD_HTTP_NO_CONTENT: + conclude_account (wa); + return; + case MHD_HTTP_NOT_FOUND: + if (ignore_account_404) + { + conclude_account (wa); + return; + } + break; + default: + break; + } + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error fetching credit history of account %s: %u/%s!\n", + wa->ai->section_name, + chr->http_status, + TALER_ErrorCode_get_hint (chr->ec)); + commit (GNUNET_DB_STATUS_HARD_ERROR); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); } diff --git a/src/bank-lib/bank_api_credit.c b/src/bank-lib/bank_api_credit.c index 36cab3d51..127ae057d 100644 --- a/src/bank-lib/bank_api_credit.c +++ b/src/bank-lib/bank_api_credit.c @@ -77,6 +77,11 @@ static enum GNUNET_GenericReturnValue parse_account_history (struct TALER_BANK_CreditHistoryHandle *hh, const json_t *history) { + struct TALER_BANK_CreditHistoryResponse chr = { + .http_status = MHD_HTTP_OK, + .ec = TALER_EC_NONE, + .response = history + }; json_t *history_array; if (NULL == (history_array = json_object_get (history, @@ -90,49 +95,45 @@ parse_account_history (struct TALER_BANK_CreditHistoryHandle *hh, GNUNET_break_op (0); return GNUNET_SYSERR; } - for (unsigned int i = 0; i<json_array_size (history_array); i++) { - struct TALER_BANK_CreditDetails td; - uint64_t row_id; - struct GNUNET_JSON_Specification hist_spec[] = { - TALER_JSON_spec_amount_any ("amount", - &td.amount), - GNUNET_JSON_spec_timestamp ("date", - &td.execution_date), - GNUNET_JSON_spec_uint64 ("row_id", - &row_id), - GNUNET_JSON_spec_fixed_auto ("reserve_pub", - &td.reserve_pub), - GNUNET_JSON_spec_string ("debit_account", - &td.debit_account_uri), - GNUNET_JSON_spec_string ("credit_account", - &td.credit_account_uri), - GNUNET_JSON_spec_end () - }; - json_t *transaction = json_array_get (history_array, - i); + size_t len = json_array_size (history_array); + struct TALER_BANK_CreditDetails cd[len]; - if (GNUNET_OK != - GNUNET_JSON_parse (transaction, - hist_spec, - NULL, NULL)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (GNUNET_OK != - hh->hcb (hh->hcb_cls, - MHD_HTTP_OK, - TALER_EC_NONE, - row_id, - &td, - transaction)) + for (unsigned int i = 0; i<json_array_size (history_array); i++) { - hh->hcb = NULL; - GNUNET_JSON_parse_free (hist_spec); - return GNUNET_OK; + struct TALER_BANK_CreditDetails *td = &cd[i]; + struct GNUNET_JSON_Specification hist_spec[] = { + TALER_JSON_spec_amount_any ("amount", + &td->amount), + GNUNET_JSON_spec_timestamp ("date", + &td->execution_date), + GNUNET_JSON_spec_uint64 ("row_id", + &td->serial_id), + GNUNET_JSON_spec_fixed_auto ("reserve_pub", + &td->reserve_pub), + GNUNET_JSON_spec_string ("debit_account", + &td->debit_account_uri), + GNUNET_JSON_spec_string ("credit_account", + &td->credit_account_uri), + GNUNET_JSON_spec_end () + }; + json_t *transaction = json_array_get (history_array, + i); + + if (GNUNET_OK != + GNUNET_JSON_parse (transaction, + hist_spec, + NULL, + NULL)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } } - GNUNET_JSON_parse_free (hist_spec); + chr.details.success.details_length = len; + chr.details.success.details = cd; + hh->hcb (hh->hcb_cls, + &chr); } return GNUNET_OK; } @@ -152,72 +153,67 @@ handle_credit_history_finished (void *cls, const void *response) { struct TALER_BANK_CreditHistoryHandle *hh = cls; - const json_t *j = response; - enum TALER_ErrorCode ec; + struct TALER_BANK_CreditHistoryResponse chr = { + .http_status = MHD_HTTP_OK, + .response = response + }; hh->job = NULL; switch (response_code) { case 0: - ec = TALER_EC_GENERIC_INVALID_RESPONSE; + chr.ec = TALER_EC_GENERIC_INVALID_RESPONSE; break; case MHD_HTTP_OK: if (GNUNET_OK != parse_account_history (hh, - j)) + chr.response)) { GNUNET_break_op (0); - json_dumpf (j, + json_dumpf (chr.response, stderr, JSON_INDENT (2)); - response_code = 0; - ec = TALER_EC_GENERIC_INVALID_RESPONSE; + chr.http_status = 0; + chr.ec = TALER_EC_GENERIC_INVALID_RESPONSE; break; } - response_code = MHD_HTTP_NO_CONTENT; /* signal end of list */ - ec = TALER_EC_NONE; - break; + TALER_BANK_credit_history_cancel (hh); + return; case MHD_HTTP_NO_CONTENT: - ec = TALER_EC_NONE; break; case MHD_HTTP_BAD_REQUEST: /* This should never happen, either us or the bank is buggy (or API version conflict); just pass JSON reply to the application */ GNUNET_break_op (0); - ec = TALER_JSON_get_error_code (j); + chr.ec = TALER_JSON_get_error_code (chr.response); break; case MHD_HTTP_UNAUTHORIZED: /* Nothing really to verify, bank says the HTTP Authentication failed. May happen if HTTP authentication is used and the user supplied a wrong username/password combination. */ - ec = TALER_JSON_get_error_code (j); + chr.ec = TALER_JSON_get_error_code (chr.response); break; case MHD_HTTP_NOT_FOUND: /* Nothing really to verify: the bank is either unaware of the endpoint (not a bank), or of the account. We should pass the JSON (?) reply to the application */ - ec = TALER_JSON_get_error_code (j); + chr.ec = TALER_JSON_get_error_code (chr.response); break; case MHD_HTTP_INTERNAL_SERVER_ERROR: /* Server had an internal issue; we should retry, but this API leaves this to the application */ - ec = TALER_JSON_get_error_code (j); + chr.ec = TALER_JSON_get_error_code (chr.response); break; default: /* unexpected response code */ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unexpected response code %u\n", (unsigned int) response_code); - ec = TALER_JSON_get_error_code (j); + chr.ec = TALER_JSON_get_error_code (chr.response); break; } - if (NULL != hh->hcb) - hh->hcb (hh->hcb_cls, - response_code, - ec, - 0LLU, - NULL, - j); + hh->hcb (hh->hcb_cls, + &chr); TALER_BANK_credit_history_cancel (hh); } diff --git a/src/bank-lib/bank_api_debit.c b/src/bank-lib/bank_api_debit.c index 2a76495bd..6642dda81 100644 --- a/src/bank-lib/bank_api_debit.c +++ b/src/bank-lib/bank_api_debit.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2017--2021 Taler Systems SA + Copyright (C) 2017--2022 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License @@ -77,6 +77,11 @@ static enum GNUNET_GenericReturnValue parse_account_history (struct TALER_BANK_DebitHistoryHandle *hh, const json_t *history) { + struct TALER_BANK_DebitHistoryResponse dhr = { + .http_status = MHD_HTTP_OK, + .ec = TALER_EC_NONE, + .response = history + }; json_t *history_array; if (NULL == (history_array = json_object_get (history, @@ -90,51 +95,47 @@ parse_account_history (struct TALER_BANK_DebitHistoryHandle *hh, GNUNET_break_op (0); return GNUNET_SYSERR; } - for (unsigned int i = 0; i<json_array_size (history_array); i++) { - struct TALER_BANK_DebitDetails td; - uint64_t row_id; - struct GNUNET_JSON_Specification hist_spec[] = { - TALER_JSON_spec_amount_any ("amount", - &td.amount), - GNUNET_JSON_spec_timestamp ("date", - &td.execution_date), - GNUNET_JSON_spec_uint64 ("row_id", - &row_id), - GNUNET_JSON_spec_fixed_auto ("wtid", - &td.wtid), - GNUNET_JSON_spec_string ("credit_account", - &td.credit_account_uri), - GNUNET_JSON_spec_string ("debit_account", - &td.debit_account_uri), - GNUNET_JSON_spec_string ("exchange_base_url", - &td.exchange_base_url), - GNUNET_JSON_spec_end () - }; - json_t *transaction = json_array_get (history_array, - i); + size_t len = json_array_size (history_array); + struct TALER_BANK_DebitDetails dd[len]; - if (GNUNET_OK != - GNUNET_JSON_parse (transaction, - hist_spec, - NULL, NULL)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - if (GNUNET_OK != - hh->hcb (hh->hcb_cls, - MHD_HTTP_OK, - TALER_EC_NONE, - row_id, - &td, - transaction)) + for (unsigned int i = 0; i<len; i++) { - hh->hcb = NULL; - GNUNET_JSON_parse_free (hist_spec); - return GNUNET_OK; + struct TALER_BANK_DebitDetails *td = &dd[i]; + struct GNUNET_JSON_Specification hist_spec[] = { + TALER_JSON_spec_amount_any ("amount", + &td->amount), + GNUNET_JSON_spec_timestamp ("date", + &td->execution_date), + GNUNET_JSON_spec_uint64 ("row_id", + &td->serial_id), + GNUNET_JSON_spec_fixed_auto ("wtid", + &td->wtid), + GNUNET_JSON_spec_string ("credit_account", + &td->credit_account_uri), + GNUNET_JSON_spec_string ("debit_account", + &td->debit_account_uri), + GNUNET_JSON_spec_string ("exchange_base_url", + &td->exchange_base_url), + GNUNET_JSON_spec_end () + }; + json_t *transaction = json_array_get (history_array, + i); + + if (GNUNET_OK != + GNUNET_JSON_parse (transaction, + hist_spec, + NULL, + NULL)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } } - GNUNET_JSON_parse_free (hist_spec); + dhr.details.success.details_length = len; + dhr.details.success.details = dd; + hh->hcb (hh->hcb_cls, + &dhr); } return GNUNET_OK; } @@ -154,69 +155,64 @@ handle_debit_history_finished (void *cls, const void *response) { struct TALER_BANK_DebitHistoryHandle *hh = cls; - const json_t *j = response; - enum TALER_ErrorCode ec; + struct TALER_BANK_DebitHistoryResponse dhr = { + .http_status = MHD_HTTP_OK, + .response = response + }; hh->job = NULL; switch (response_code) { case 0: - ec = TALER_EC_GENERIC_INVALID_RESPONSE; + dhr.ec = TALER_EC_GENERIC_INVALID_RESPONSE; break; case MHD_HTTP_OK: if (GNUNET_OK != parse_account_history (hh, - j)) + dhr.response)) { GNUNET_break_op (0); - response_code = 0; - ec = TALER_EC_GENERIC_INVALID_RESPONSE; + dhr.http_status = 0; + dhr.ec = TALER_EC_GENERIC_INVALID_RESPONSE; break; } - response_code = MHD_HTTP_NO_CONTENT; /* signal end of list */ - ec = TALER_EC_NONE; - break; + TALER_BANK_debit_history_cancel (hh); + return; case MHD_HTTP_NO_CONTENT: - ec = TALER_EC_NONE; break; case MHD_HTTP_BAD_REQUEST: /* This should never happen, either us or the bank is buggy (or API version conflict); just pass JSON reply to the application */ GNUNET_break_op (0); - ec = TALER_JSON_get_error_code (j); + dhr.ec = TALER_JSON_get_error_code (dhr.response); break; case MHD_HTTP_UNAUTHORIZED: /* Nothing really to verify, bank says the HTTP Authentication failed. May happen if HTTP authentication is used and the user supplied a wrong username/password combination. */ - ec = TALER_JSON_get_error_code (j); + dhr.ec = TALER_JSON_get_error_code (dhr.response); break; case MHD_HTTP_NOT_FOUND: /* Nothing really to verify: the bank is either unaware of the endpoint (not a bank), or of the account. We should pass the JSON (?) reply to the application */ - ec = TALER_JSON_get_error_code (j); + dhr.ec = TALER_JSON_get_error_code (dhr.response); break; case MHD_HTTP_INTERNAL_SERVER_ERROR: /* Server had an internal issue; we should retry, but this API leaves this to the application */ - ec = TALER_JSON_get_error_code (j); + dhr.ec = TALER_JSON_get_error_code (dhr.response); break; default: /* unexpected response code */ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unexpected response code %u\n", (unsigned int) response_code); - ec = TALER_JSON_get_error_code (j); + dhr.ec = TALER_JSON_get_error_code (dhr.response); break; } - if (NULL != hh->hcb) - hh->hcb (hh->hcb_cls, - response_code, - ec, - 0LLU, - NULL, - j); + hh->hcb (hh->hcb_cls, + &dhr); TALER_BANK_debit_history_cancel (hh); } diff --git a/src/bank-lib/taler-exchange-wire-gateway-client.c b/src/bank-lib/taler-exchange-wire-gateway-client.c index ab16573a7..5bfd9311d 100644 --- a/src/bank-lib/taler-exchange-wire-gateway-client.c +++ b/src/bank-lib/taler-exchange-wire-gateway-client.c @@ -152,83 +152,72 @@ do_shutdown (void *cls) /** - * Callback used to process ONE entry in the transaction + * Callback used to process the transaction * history returned by the bank. * * @param cls closure - * @param http_status HTTP status code from server - * @param ec taler error code - * @param serial_id identification of the position at - * which we are returning data - * @param details details about the wire transfer - * @param json original full response from server - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to - * abort iteration + * @param reply response we got from the bank */ -static enum GNUNET_GenericReturnValue +static void credit_history_cb (void *cls, - unsigned int http_status, - enum TALER_ErrorCode ec, - uint64_t serial_id, - const struct TALER_BANK_CreditDetails *details, - const json_t *json) + const struct TALER_BANK_CreditHistoryResponse *reply) { (void) cls; chh = NULL; - if (MHD_HTTP_OK != http_status) + switch (reply->http_status) { - if ( (MHD_HTTP_NO_CONTENT != http_status) || - (TALER_EC_NONE != ec) ) + case 0: + fprintf (stderr, + "Failed to obtain HTTP reply from `%s'\n", + auth.wire_gateway_url); + global_ret = 2; + break; + case MHD_HTTP_NO_CONTENT: + fprintf (stdout, + "No transactions.\n"); + global_ret = 0; + break; + case MHD_HTTP_OK: + for (unsigned int i = 0; i<reply->details.success.details_length; i++) { - if (0 == http_status) - { - fprintf (stderr, - "Failed to obtain HTTP reply from `%s'\n", - auth.wire_gateway_url); - } - else - { - fprintf (stderr, - "Failed to obtain credit history from `%s': HTTP status %u (%s)\n", - auth.wire_gateway_url, - http_status, - TALER_ErrorCode_get_hint (ec)); - } - if (NULL != json) - json_dumpf (json, - stderr, - JSON_INDENT (2)); - global_ret = 2; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_NO; + const struct TALER_BANK_CreditDetails *cd = + &reply->details.success.details[i]; + + /* If credit/debit accounts were specified, use as a filter */ + if ( (NULL != credit_account) && + (0 != strcasecmp (credit_account, + cd->credit_account_uri) ) ) + continue; + if ( (NULL != debit_account) && + (0 != strcasecmp (debit_account, + cd->debit_account_uri) ) ) + continue; + fprintf (stdout, + "%llu: %s->%s (%s) over %s at %s\n", + (unsigned long long) cd->serial_id, + cd->debit_account_uri, + cd->credit_account_uri, + TALER_B2S (&cd->reserve_pub), + TALER_amount2s (&cd->amount), + GNUNET_TIME_timestamp2s (cd->execution_date)); } - fprintf (stdout, - "End of transactions list.\n"); global_ret = 0; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_NO; + break; + default: + fprintf (stderr, + "Failed to obtain credit history from `%s': HTTP status %u (%s)\n", + auth.wire_gateway_url, + reply->http_status, + TALER_ErrorCode_get_hint (reply->ec)); + if (NULL != reply->response) + json_dumpf (reply->response, + stderr, + JSON_INDENT (2)); + global_ret = 2; + break; } - - /* If credit/debit accounts were specified, use as a filter */ - if ( (NULL != credit_account) && - (0 != strcasecmp (credit_account, - details->credit_account_uri) ) ) - return GNUNET_OK; - if ( (NULL != debit_account) && - (0 != strcasecmp (debit_account, - details->debit_account_uri) ) ) - return GNUNET_OK; - - fprintf (stdout, - "%llu: %s->%s (%s) over %s at %s\n", - (unsigned long long) serial_id, - details->debit_account_uri, - details->credit_account_uri, - TALER_B2S (&details->reserve_pub), - TALER_amount2s (&details->amount), - GNUNET_TIME_timestamp2s (details->execution_date)); - return GNUNET_OK; + GNUNET_SCHEDULER_shutdown (); } @@ -264,84 +253,71 @@ execute_credit_history (void) /** - * Function with the debit debit transaction history. + * Function with the debit transaction history. * * @param cls closure - * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request - * 0 if the bank's reply is bogus (fails to follow the protocol), - * #MHD_HTTP_NO_CONTENT if there are no more results; on success the - * last callback is always of this status (even if `abs(num_results)` were - * already returned). - * @param ec detailed error code - * @param serial_id monotonically increasing counter corresponding to the transaction - * @param details details about the wire transfer - * @param json detailed response from the HTTPD, or NULL if reply was not in JSON - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + * @param reply response details */ -static enum GNUNET_GenericReturnValue +static void debit_history_cb (void *cls, - unsigned int http_status, - enum TALER_ErrorCode ec, - uint64_t serial_id, - const struct TALER_BANK_DebitDetails *details, - const json_t *json) + const struct TALER_BANK_DebitHistoryResponse *reply) { (void) cls; dhh = NULL; - if (MHD_HTTP_OK != http_status) + switch (reply->http_status) { - if ( (MHD_HTTP_NO_CONTENT != http_status) || - (TALER_EC_NONE != ec) ) + case 0: + fprintf (stderr, + "Failed to obtain HTTP reply from `%s'\n", + auth.wire_gateway_url); + global_ret = 2; + break; + case MHD_HTTP_NO_CONTENT: + fprintf (stdout, + "No transactions.\n"); + global_ret = 0; + break; + case MHD_HTTP_OK: + for (unsigned int i = 0; i<reply->details.success.details_length; i++) { - if (0 == http_status) - { - fprintf (stderr, - "Failed to obtain HTTP reply from `%s'\n", - auth.wire_gateway_url); - } - else - { - fprintf (stderr, - "Failed to obtain debit history from `%s': HTTP status %u (%s)\n", - auth.wire_gateway_url, - http_status, - TALER_ErrorCode_get_hint (ec)); - } - if (NULL != json) - json_dumpf (json, - stderr, - JSON_INDENT (2)); - global_ret = 2; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_NO; + const struct TALER_BANK_DebitDetails *dd = + &reply->details.success.details[i]; + + /* If credit/debit accounts were specified, use as a filter */ + if ( (NULL != credit_account) && + (0 != strcasecmp (credit_account, + dd->credit_account_uri) ) ) + continue; + if ( (NULL != debit_account) && + (0 != strcasecmp (debit_account, + dd->debit_account_uri) ) ) + continue; + fprintf (stdout, + "%llu: %s->%s (%s) over %s at %s\n", + (unsigned long long) dd->serial_id, + dd->debit_account_uri, + dd->credit_account_uri, + TALER_B2S (&dd->wtid), + TALER_amount2s (&dd->amount), + GNUNET_TIME_timestamp2s (dd->execution_date)); } - fprintf (stdout, - "End of transactions list.\n"); global_ret = 0; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_NO; + break; + default: + fprintf (stderr, + "Failed to obtain debit history from `%s': HTTP status %u (%s)\n", + auth.wire_gateway_url, + reply->http_status, + TALER_ErrorCode_get_hint (reply->ec)); + if (NULL != reply->response) + json_dumpf (reply->response, + stderr, + JSON_INDENT (2)); + global_ret = 2; + break; } - - /* If credit/debit accounts were specified, use as a filter */ - if ( (NULL != credit_account) && - (0 != strcasecmp (credit_account, - details->credit_account_uri) ) ) - return GNUNET_OK; - if ( (NULL != debit_account) && - (0 != strcasecmp (debit_account, - details->debit_account_uri) ) ) - return GNUNET_OK; - - fprintf (stdout, - "%llu: %s->%s (%s) over %s at %s\n", - (unsigned long long) serial_id, - details->debit_account_uri, - details->credit_account_uri, - TALER_B2S (&details->wtid), - TALER_amount2s (&details->amount), - GNUNET_TIME_timestamp2s (details->execution_date)); - return GNUNET_OK; + GNUNET_SCHEDULER_shutdown (); } diff --git a/src/exchange/taler-exchange-httpd_batch-deposit.c b/src/exchange/taler-exchange-httpd_batch-deposit.c index 7a3ea0fa4..0545c393b 100644 --- a/src/exchange/taler-exchange-httpd_batch-deposit.c +++ b/src/exchange/taler-exchange-httpd_batch-deposit.c @@ -290,7 +290,6 @@ batch_deposit_transaction (void *cls, mhd_ret); if (qs < 0) return qs; - qs = TEH_plugin->do_deposit ( TEH_plugin->cls, deposit, diff --git a/src/exchange/taler-exchange-httpd_deposit.c b/src/exchange/taler-exchange-httpd_deposit.c index 455888a89..740db7c1f 100644 --- a/src/exchange/taler-exchange-httpd_deposit.c +++ b/src/exchange/taler-exchange-httpd_deposit.c @@ -74,20 +74,20 @@ reply_deposit_success ( struct TALER_ExchangeSignatureP sig; enum TALER_ErrorCode ec; - if (TALER_EC_NONE != - (ec = TALER_exchange_online_deposit_confirmation_sign ( - &TEH_keys_exchange_sign_, - h_contract_terms, - h_wire, - h_policy, - exchange_timestamp, - wire_deadline, - refund_deadline, - amount_without_fee, - coin_pub, - merchant, - &pub, - &sig))) + ec = TALER_exchange_online_deposit_confirmation_sign ( + &TEH_keys_exchange_sign_, + h_contract_terms, + h_wire, + h_policy, + exchange_timestamp, + wire_deadline, + refund_deadline, + amount_without_fee, + coin_pub, + merchant, + &pub, + &sig); + if (TALER_EC_NONE != ec) { return TALER_MHD_reply_with_ec (connection, ec, @@ -187,8 +187,6 @@ deposit_transaction (void *cls, mhd_ret); if (qs < 0) return qs; - - /* If the deposit has a policy associated to it, persist it. This will * insert or update the record. */ if (dc->has_policy) @@ -203,16 +201,14 @@ deposit_transaction (void *cls, if (qs < 0) return qs; } - - qs = TEH_plugin->do_deposit ( TEH_plugin->cls, dc->deposit, dc->known_coin_id, &dc->h_payto, (dc->has_policy) - ? &dc->policy_details_serial_id - : NULL, + ? &dc->policy_details_serial_id + : NULL, &dc->exchange_timestamp, &balance_ok, &in_conflict); diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 146f2ee72..e9b28030b 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -13,7 +13,6 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ - /** * @file taler-exchange-wirewatch.c * @brief Process that watches for wire transfers to the exchange's bank account @@ -43,122 +42,88 @@ #define MAXIMUM_BATCH_SIZE 1024 /** - * Information we keep for each supported account. + * Information about our account. */ -struct WireAccount -{ - /** - * Accounts are kept in a DLL. - */ - struct WireAccount *next; - - /** - * Plugins are kept in a DLL. - */ - struct WireAccount *prev; - - /** - * Information about this account. - */ - const struct TALER_EXCHANGEDB_AccountInfo *ai; - - /** - * Active request for history. - */ - struct TALER_BANK_CreditHistoryHandle *hh; - - /** - * Until when is processing this wire plugin delayed? - */ - struct GNUNET_TIME_Absolute delayed_until; - - /** - * Encoded offset in the wire transfer list from where - * to start the next query with the bank. - */ - uint64_t batch_start; - - /** - * Latest row offset seen in this transaction, becomes - * the new #batch_start upon commit. - */ - uint64_t latest_row_off; - - /** - * Maximum row offset this transaction may yield. If we got the - * maximum number of rows, we must not @e delay before running - * the next transaction. - */ - uint64_t max_row_off; - - /** - * Offset where our current shard begins (inclusive). - */ - uint64_t shard_start; - - /** - * Offset where our current shard ends (exclusive). - */ - uint64_t shard_end; - - /** - * When did we start with the shard? - */ - struct GNUNET_TIME_Absolute shard_start_time; - - /** - * For how long did we lock the shard? - */ - struct GNUNET_TIME_Absolute shard_end_time; - - /** - * How long did we take to finish the last shard - * for this account? - */ - struct GNUNET_TIME_Relative shard_delay; - - /** - * Name of our job in the shard table. - */ - char *job_name; - - /** - * How many transactions do we retrieve per batch? - */ - unsigned int batch_size; - - /** - * How much do we increment @e batch_size on success? - */ - unsigned int batch_thresh; - - /** - * Should we delay the next request to the wire plugin a bit? Set to - * false if we actually did some work. - */ - bool delay; - - /** - * Did we start a transaction yet? - */ - bool started_transaction; - - /** - * Is this shard still open for processing. - */ - bool shard_open; -}; +static const struct TALER_EXCHANGEDB_AccountInfo *ai; + +/** + * Active request for history. + */ +static struct TALER_BANK_CreditHistoryHandle *hh; + +/** + * Until when is processing this wire plugin delayed? + */ +static struct GNUNET_TIME_Absolute delayed_until; +/** + * Encoded offset in the wire transfer list from where + * to start the next query with the bank. + */ +static uint64_t batch_start; /** - * Head of list of loaded wire plugins. + * Latest row offset seen in this transaction, becomes + * the new #batch_start upon commit. */ -static struct WireAccount *wa_head; +static uint64_t latest_row_off; /** - * Tail of list of loaded wire plugins. + * Offset where our current shard begins (inclusive). */ -static struct WireAccount *wa_tail; +static uint64_t shard_start; + +/** + * Offset where our current shard ends (exclusive). + */ +static uint64_t shard_end; + +/** + * When did we start with the shard? + */ +static struct GNUNET_TIME_Absolute shard_start_time; + +/** + * For how long did we lock the shard? + */ +static struct GNUNET_TIME_Absolute shard_end_time; + +/** + * How long did we take to finish the last shard + * for this account? + */ +static struct GNUNET_TIME_Relative shard_delay; + +/** + * Name of our job in the shard table. + */ +static char *job_name; + +/** + * How many transactions do we retrieve per batch? + */ +static unsigned int batch_size; + +/** + * How much do we increment @e batch_size on success? + */ +static unsigned int batch_thresh; + +/** + * Did work remain in the transaction queue? Set to true + * if we did some work and thus there might be more. + */ +static bool progress; + +/** + * Did we start a transaction yet? + */ +static bool started_transaction; + +/** + * Is this shard still open for processing. + */ +static bool shard_open; /** * Handle to the context for interacting with the bank. @@ -227,6 +192,10 @@ static int ignore_account_404; */ static struct GNUNET_SCHEDULER_Task *task; +/** + * Name of the configuration section with the account we should watch. + */ +static char *account_section; /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. @@ -236,38 +205,27 @@ static struct GNUNET_SCHEDULER_Task *task; static void shutdown_task (void *cls) { + enum GNUNET_DB_QueryStatus qs; (void) cls; - { - struct WireAccount *wa; - while (NULL != (wa = wa_head)) - { - enum GNUNET_DB_QueryStatus qs; - - if (NULL != wa->hh) - { - TALER_BANK_credit_history_cancel (wa->hh); - wa->hh = NULL; - } - GNUNET_CONTAINER_DLL_remove (wa_head, - wa_tail, - wa); - if (wa->started_transaction) - { - db_plugin->rollback (db_plugin->cls); - wa->started_transaction = false; - } - qs = db_plugin->abort_shard (db_plugin->cls, - wa->job_name, - wa->shard_start, - wa->shard_end); - if (qs <= 0) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to abort work shard on shutdown\n"); - GNUNET_free (wa->job_name); - GNUNET_free (wa); - } + if (NULL != hh) + { + TALER_BANK_credit_history_cancel (hh); + hh = NULL; } + if (started_transaction) + { + db_plugin->rollback (db_plugin->cls); + started_transaction = false; + } + qs = db_plugin->abort_shard (db_plugin->cls, + job_name, + shard_start, + shard_end); + if (qs <= 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to abort work shard on shutdown\n"); + GNUNET_free (job_name); if (NULL != ctx) { GNUNET_CURL_fini (ctx); @@ -295,28 +253,36 @@ shutdown_task (void *cls) * account to our list (if it is enabled and we can load the plugin). * * @param cls closure, NULL - * @param ai account information + * @param in_ai account information */ static void add_account_cb (void *cls, - const struct TALER_EXCHANGEDB_AccountInfo *ai) + const struct TALER_EXCHANGEDB_AccountInfo *in_ai) { - struct WireAccount *wa; - (void) cls; - if (! ai->credit_enabled) + if (! in_ai->credit_enabled) + return; /* not enabled for us, skip */ + if ( (NULL != account_section) && + (0 != strcasecmp (ai->section_name, + account_section)) ) return; /* not enabled for us, skip */ - wa = GNUNET_new (struct WireAccount); - wa->ai = ai; - GNUNET_asprintf (&wa->job_name, + if (NULL != ai) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Multiple accounts enabled (%s and %s), use '-a' command-line option to select one!\n", + ai->section_name, + in_ai->section_name); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_INVALIDARGUMENT; + return; + } + ai = in_ai; + GNUNET_asprintf (&job_name, "wirewatch-%s", ai->section_name); - wa->batch_size = MAXIMUM_BATCH_SIZE; - if (0 != shard_size % wa->batch_size) - wa->batch_size = shard_size; - GNUNET_CONTAINER_DLL_insert (wa_head, - wa_tail, - wa); + batch_size = MAXIMUM_BATCH_SIZE; + if (0 != shard_size % batch_size) + batch_size = shard_size; } @@ -360,7 +326,16 @@ exchange_serve_process_config (void) } TALER_EXCHANGEDB_find_accounts (&add_account_cb, NULL); - GNUNET_assert (NULL != wa_head); + if (NULL == ai) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No accounts enabled for credit!\n"); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_INVALIDARGUMENT; + TALER_EXCHANGEDB_plugin_unload (db_plugin); + db_plugin = NULL; + return GNUNET_SYSERR; + } return GNUNET_OK; } @@ -368,210 +343,276 @@ exchange_serve_process_config (void) /** * Lock a shard and then begin to query for incoming wire transfers. * - * @param cls a `struct WireAccount` to operate on + * @param cls NULL */ static void lock_shard (void *cls); /** - * Continue with the credit history of the shard - * reserved as @a wa. + * Continue with the credit history of the shard. * - * @param[in,out] cls `struct WireAccount *` account with shard to continue processing + * @param cls NULL */ static void continue_with_shard (void *cls); /** - * We encountered a serialization error. - * Rollback the transaction and try again - * - * @param wa account we are transacting on + * We encountered a serialization error. Rollback the transaction and try + * again. */ static void -handle_soft_error (struct WireAccount *wa) +handle_soft_error (void) { db_plugin->rollback (db_plugin->cls); - wa->started_transaction = false; - if (1 < wa->batch_size) + started_transaction = false; + if (1 < batch_size) { - wa->batch_thresh = wa->batch_size; - wa->batch_size /= 2; + batch_thresh = batch_size; + batch_size /= 2; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Reduced batch size to %llu due to serialization issue\n", - (unsigned long long) wa->batch_size); + (unsigned long long) batch_size); } /* Reset to beginning of transaction, and go again from there. */ - wa->latest_row_off = wa->batch_start; + latest_row_off = batch_start; GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - wa); + NULL); } /** - * Schedule the #lock_shard() operation for - * @a wa. If @a wa is NULL, start with #wa_head. - * - * @param wa account to schedule #lock_shard() for, - * possibly NULL (!). + * Schedule the #lock_shard() operation. */ static void -schedule_transfers (struct WireAccount *wa) +schedule_transfers (void) { - if (NULL == wa) - { - wa = wa_head; - GNUNET_assert (NULL != wa); - } - if (wa->shard_open) + if (shard_open) GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Will retry my shard (%llu,%llu] of %s in %s\n", - (unsigned long long) wa->shard_start, - (unsigned long long) wa->shard_end, - wa->job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, + job_name, GNUNET_STRINGS_relative_time_to_string ( - GNUNET_TIME_absolute_get_remaining (wa->delayed_until), - GNUNET_YES)); + GNUNET_TIME_absolute_get_remaining (delayed_until), + true)); else GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Will try to lock next shard of %s in %s\n", - wa->job_name, + job_name, GNUNET_STRINGS_relative_time_to_string ( - GNUNET_TIME_absolute_get_remaining (wa->delayed_until), - GNUNET_YES)); + GNUNET_TIME_absolute_get_remaining (delayed_until), + true)); GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_at (wa->delayed_until, + task = GNUNET_SCHEDULER_add_at (delayed_until, &lock_shard, - wa); + NULL); } /** - * We are done with the work that is possible on @a wa right now (and the - * transaction was committed, if there was one to commit). Move on to the next - * account. - * - * @param wa wire account for which we completed a shard + * We are done with the work that is possible right now (and the transaction + * was committed, if there was one to commit). Move on to the next shard. */ static void -account_completed (struct WireAccount *wa) +transaction_completed (void) { - GNUNET_assert (! wa->started_transaction); - if ( (wa->batch_start + wa->batch_size == - wa->latest_row_off) && - (wa->batch_size < MAXIMUM_BATCH_SIZE) ) + GNUNET_assert (! started_transaction); + if ( (batch_start + batch_size == + latest_row_off) && + (batch_size < MAXIMUM_BATCH_SIZE) ) { /* The current batch size worked without serialization issues, and we are allowed to grow. Do so slowly. */ int delta; - delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4; + delta = ((int) batch_thresh - (int) batch_size) / 4; if (delta < 0) delta = -delta; - wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, - wa->batch_size + delta + 1); + batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, + batch_size + delta + 1); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Increasing batch size to %llu\n", - (unsigned long long) wa->batch_size); + (unsigned long long) batch_size); } - if (wa->delay) + if ( (! progress) && test_mode) { - /* This account was finished, block this one for the - #wirewatch_idle_sleep_interval and move on to the next one. */ - wa->delayed_until - = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); - wa = wa->next; + /* Transaction list was drained and we are in + test mode. So we are done. */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transaction list drained and in test mode. Exiting\n"); + GNUNET_SCHEDULER_shutdown (); + return; } GNUNET_assert (NULL == task); - schedule_transfers (wa); + schedule_transfers (); } /** - * Check if we are finished with the current shard. If so, update the - * database, marking the shard as finished. + * We got incoming transaction details from the bank. Add them + * to the database. * - * @param wa wire account to commit for - * @return true if we were indeed done with the shard + * @param details array of transaction details + * @param details_length length of the @a details array */ -static bool -check_shard_done (struct WireAccount *wa) +static void +process_reply (const struct TALER_BANK_CreditDetails *details, + unsigned int details_length) { enum GNUNET_DB_QueryStatus qs; + bool shard_done; + uint64_t lroff = latest_row_off; - if (wa->shard_end > wa->latest_row_off) + if (0 == details_length) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Shard %s (%llu,%llu] at %llu\n", - wa->job_name, - (unsigned long long) wa->shard_start, - (unsigned long long) wa->shard_end, - (unsigned long long) wa->latest_row_off); - return false; /* actually, not done! */ + /* Server should have used 204, not 200! */ + GNUNET_break_op (0); + transaction_completed (); + return; } - /* shard is complete, mark this as well */ - qs = db_plugin->complete_shard (db_plugin->cls, - wa->job_name, - wa->shard_start, - wa->shard_end); - switch (qs) + /* check serial IDs for range constraints */ + for (unsigned int i = 0; i<details_length; i++) { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); + const struct TALER_BANK_CreditDetails *cd = &details[i]; + + if (cd->serial_id < lroff) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Serial ID %llu not monotonic (got %llu before). Failing!\n", + (unsigned long long) cd->serial_id, + (unsigned long long) lroff); + db_plugin->rollback (db_plugin->cls); + GNUNET_SCHEDULER_shutdown (); + return; + } + if (cd->serial_id > shard_end) + { + /* we are *past* the current shard (likely because the serial_id of the + shard_end happens to not exist in the DB). So commit and stop this + iteration! */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serial ID %llu past shard end at %llu, ending iteration early!\n", + (unsigned long long) cd->serial_id, + (unsigned long long) shard_end); + details_length = i; + progress = true; + lroff = cd->serial_id - 1; + break; + } + lroff = cd->serial_id; + } + if (GNUNET_OK != + db_plugin->start_read_committed (db_plugin->cls, + "wirewatch check for incoming wire transfers")) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to start database transaction!\n"); + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); - return false; - case GNUNET_DB_STATUS_SOFT_ERROR: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for complete_shard. Rolling back.\n"); - handle_soft_error (wa); - return false; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - GNUNET_break (0); - /* Not expected, but let's just continue */ - break; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* normal case */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Completed shard %s (%llu,%llu] after %s\n", - wa->job_name, - (unsigned long long) wa->shard_start, - (unsigned long long) wa->shard_end, - GNUNET_STRINGS_relative_time_to_string ( - GNUNET_TIME_absolute_get_duration (wa->shard_start_time), - GNUNET_YES)); - break; + return; } - return true; -} - - -/** - * We are finished with the current transaction, try - * to commit and then schedule the next iteration. - * - * @param wa wire account to commit for - */ -static void -do_commit (struct WireAccount *wa) -{ - enum GNUNET_DB_QueryStatus qs; - bool shard_done; + started_transaction = true; - GNUNET_assert (NULL == task); - shard_done = check_shard_done (wa); - wa->started_transaction = false; + for (unsigned int i = 0; i<details_length; i++) + { + const struct TALER_BANK_CreditDetails *cd = &details[i]; + + /* FIXME #7276: Consider using Postgres multi-valued insert here, + for up to 15x speed-up according to + https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006 + (Note: this may require changing both the + plugin API as well as modifying how this function is called.) */ + qs = db_plugin->reserves_in_insert (db_plugin->cls, + &cd->reserve_pub, + &cd->amount, + cd->execution_date, + cd->debit_account_uri, + ai->section_name, + cd->serial_id); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for reserves_in_insert. Rolling back.\n"); + handle_soft_error (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* Either wirewatch was freshly started after the system was + shutdown and we're going over an incomplete shard again + after being restarted, or the shard lock period was too + short (number of workers set incorrectly?) and a 2nd + wirewatcher has been stealing our work while we are still + at it. */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Attempted to import transaction %llu (%s) twice. " + "This should happen rarely (if not, ask for support).\n", + (unsigned long long) cd->serial_id, + job_name); + db_plugin->rollback (db_plugin->cls); + started_transaction = false; + /* already existed, ok, let's just continue */ + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Imported transaction %llu.", + (unsigned long long) cd->serial_id); + /* normal case */ + break; + } + progress = true; + } + latest_row_off = lroff; + shard_done = (shard_end <= latest_row_off); + if (shard_done) + { + /* shard is complete, mark this as well */ + qs = db_plugin->complete_shard (db_plugin->cls, + job_name, + shard_start, + shard_end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for complete_shard. Rolling back.\n"); + handle_soft_error (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); + /* Not expected, but let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Completed shard %s (%llu,%llu] after %s\n", + job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, + GNUNET_STRINGS_relative_time_to_string ( + GNUNET_TIME_absolute_get_duration (shard_start_time), + true)); + break; + } + } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Committing %s progress (%llu,%llu] at %llu\n (%s)", - wa->job_name, - (unsigned long long) wa->shard_start, - (unsigned long long) wa->shard_end, - (unsigned long long) wa->latest_row_off, + job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, + (unsigned long long) latest_row_off, shard_done ? "shard done" : "shard incomplete"); @@ -584,24 +625,24 @@ do_commit (struct WireAccount *wa) return; case GNUNET_DB_STATUS_SOFT_ERROR: /* reduce transaction size to reduce rollback probability */ - handle_soft_error (wa); + handle_soft_error (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + started_transaction = false; /* normal case */ break; } if (shard_done) { - wa->shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); - wa->shard_open = false; - account_completed (wa); - } - else - { - task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - wa); + shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time); + shard_open = false; + transaction_completed (); + return; } + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + NULL); } @@ -609,208 +650,75 @@ do_commit (struct WireAccount *wa) * Callbacks of this type are used to serve the result of asking * the bank for the transaction history. * - * @param cls closure with the `struct WioreAccount *` we are processing - * @param http_status HTTP status code from the server - * @param ec taler error code - * @param serial_id identification of the position at which we are querying - * @param details details about the wire transfer - * @param json raw JSON response - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + * @param cls NULL + * @param reply response we got from the bank */ -static enum GNUNET_GenericReturnValue +static void history_cb (void *cls, - unsigned int http_status, - enum TALER_ErrorCode ec, - uint64_t serial_id, - const struct TALER_BANK_CreditDetails *details, - const json_t *json) + const struct TALER_BANK_CreditHistoryResponse *reply) { - struct WireAccount *wa = cls; - enum GNUNET_DB_QueryStatus qs; - - (void) json; + (void) cls; GNUNET_assert (NULL == task); - if (NULL == details) - { - wa->hh = NULL; - if ( (! ( (MHD_HTTP_NOT_FOUND == http_status) && - (ignore_account_404) ) ) && - ( (MHD_HTTP_NO_CONTENT != http_status) && - ( (TALER_EC_NONE != ec) || - (MHD_HTTP_OK != http_status) ) ) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error fetching history: %s (%u)\n", - TALER_ErrorCode_get_hint (ec), - http_status); - if (! (exit_on_error || test_mode) ) - { - account_completed (wa); - return GNUNET_OK; - } - GNUNET_SCHEDULER_shutdown (); - return GNUNET_OK; - } - if (wa->started_transaction) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "End of list. Committing progress on %s of (%llu,%llu]!\n", - wa->job_name, - (unsigned long long) wa->batch_start, - (unsigned long long) wa->latest_row_off); - do_commit (wa); - return GNUNET_OK; /* will be ignored anyway */ - } - /* We did not even start a transaction. */ - if ( (wa->delay) && - (test_mode) && - (NULL == wa->next) ) - { - /* We exit on idle */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Shutdown due to test mode!\n"); - GNUNET_SCHEDULER_shutdown (); - return GNUNET_OK; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "No transactions in history response, moving on.\n"); - account_completed (wa); - return GNUNET_OK; /* will be ignored anyway */ - } - - /* We did get 'details' from the bank. Do sanity checks before inserting. */ - if (serial_id < wa->latest_row_off) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Serial ID %llu not monotonic (got %llu before). Failing!\n", - (unsigned long long) serial_id, - (unsigned long long) wa->latest_row_off); - GNUNET_SCHEDULER_shutdown (); - wa->hh = NULL; - return GNUNET_SYSERR; - } - /* If we got 'limit' transactions back from the bank, - we should not introduce any delay before the next - call. */ - if (serial_id >= wa->max_row_off) - wa->delay = false; - if (serial_id > wa->shard_end) - { - /* we are *past* the current shard (likely because the serial_id of the - shard_end happens to not exist in the DB). So commit and stop this - iteration! */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Serial ID %llu past shard end at %llu, ending iteration early!\n", - (unsigned long long) serial_id, - (unsigned long long) wa->shard_end); - wa->latest_row_off = serial_id - 1; /* excluding serial_id! */ - wa->hh = NULL; - if (wa->started_transaction) - { - GNUNET_assert (NULL == task); - do_commit (wa); - } - else - { - GNUNET_assert (NULL == task); - if (check_shard_done (wa)) - account_completed (wa); - else - task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - wa); - } - return GNUNET_SYSERR; - } - if (! wa->started_transaction) + hh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "History request returned with HTTP status %u\n", + reply->http_status); + switch (reply->http_status) { - if (GNUNET_OK != - db_plugin->start_read_committed (db_plugin->cls, - "wirewatch check for incoming wire transfers")) + case MHD_HTTP_OK: + process_reply (reply->details.success.details, + reply->details.success.details_length); + return; + case MHD_HTTP_NO_CONTENT: + transaction_completed (); + return; + case MHD_HTTP_NOT_FOUND: + if (ignore_account_404) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - wa->hh = NULL; - return GNUNET_SYSERR; + transaction_completed (); + return; } - wa->started_transaction = true; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding wire transfer over %s with (hashed) subject `%s'\n", - TALER_amount2s (&details->amount), - TALER_B2S (&details->reserve_pub)); - /* FIXME #7276: Consider using Postgres multi-valued insert here, - for up to 15x speed-up according to - https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006 - (Note: this may require changing both the - plugin API as well as modifying how this function is called.) */ - qs = db_plugin->reserves_in_insert (db_plugin->cls, - &details->reserve_pub, - &details->amount, - details->execution_date, - details->debit_account_uri, - wa->ai->section_name, - serial_id); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); - wa->started_transaction = false; - GNUNET_SCHEDULER_shutdown (); - wa->hh = NULL; - return GNUNET_SYSERR; - case GNUNET_DB_STATUS_SOFT_ERROR: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for reserves_in_insert. Rolling back.\n"); - handle_soft_error (wa); - wa->hh = NULL; - return GNUNET_SYSERR; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* Either wirewatch was freshly started after the system was - shutdown and we're going over an incomplete shard again - after being restarted, or the shard lock period was too - short (number of workers set incorrectly?) and a 2nd - wirewatcher has been stealing our work while we are still - at it. */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Attempted to import transaction %llu (%s) twice. " - "This should happen rarely (if not, ask for support).\n", - (unsigned long long) serial_id, - wa->job_name); - /* already existed, ok, let's just continue */ break; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* normal case */ + default: break; } - wa->latest_row_off = serial_id; - return GNUNET_OK; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error fetching history: %s (%u)\n", + TALER_ErrorCode_get_hint (reply->ec), + reply->http_status); + if (! exit_on_error) + { + transaction_completed (); + return; + } + GNUNET_SCHEDULER_shutdown (); } static void continue_with_shard (void *cls) { - struct WireAccount *wa = cls; unsigned int limit; + (void) cls; task = NULL; - limit = GNUNET_MIN (wa->batch_size, - wa->shard_end - wa->latest_row_off); - wa->max_row_off = wa->latest_row_off + limit; - GNUNET_assert (NULL == wa->hh); - wa->hh = TALER_BANK_credit_history (ctx, - wa->ai->auth, - wa->latest_row_off, - limit, - test_mode - ? GNUNET_TIME_UNIT_ZERO - : LONGPOLL_TIMEOUT, - &history_cb, - wa); - if (NULL == wa->hh) + GNUNET_assert (shard_end > latest_row_off); + limit = GNUNET_MIN (batch_size, + shard_end - latest_row_off); + GNUNET_assert (NULL == hh); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Requesting credit history staring from %llu\n", + (unsigned long long) latest_row_off); + hh = TALER_BANK_credit_history (ctx, + ai->auth, + latest_row_off, + limit, + test_mode + ? GNUNET_TIME_UNIT_ZERO + : LONGPOLL_TIMEOUT, + &history_cb, + NULL); + if (NULL == hh) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start request for account history!\n"); @@ -824,12 +732,12 @@ continue_with_shard (void *cls) static void lock_shard (void *cls) { - struct WireAccount *wa = cls; enum GNUNET_DB_QueryStatus qs; struct GNUNET_TIME_Relative delay; - uint64_t last_shard_start = wa->shard_start; - uint64_t last_shard_end = wa->shard_end; + uint64_t last_shard_start = shard_start; + uint64_t last_shard_end = shard_end; + (void) cls; task = NULL; if (GNUNET_SYSERR == db_plugin->preflight (db_plugin->cls)) @@ -840,17 +748,16 @@ lock_shard (void *cls) GNUNET_SCHEDULER_shutdown (); return; } - if ( (wa->shard_open) && - (GNUNET_TIME_absolute_is_future (wa->shard_end_time)) ) + if ( (shard_open) && + (GNUNET_TIME_absolute_is_future (shard_end_time)) ) { - wa->delay = true; /* default is to delay, unless - we find out that we're really busy */ - wa->batch_start = wa->latest_row_off; + progress = false; + batch_start = latest_row_off; task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - wa); + NULL); return; } - if (wa->shard_open) + if (shard_open) GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Shard not completed in time, will try to re-acquire\n"); /* How long we lock a shard depends on the number of @@ -863,15 +770,15 @@ lock_shard (void *cls) GNUNET_CRYPTO_QUALITY_WEAK, 4 * GNUNET_TIME_relative_max ( wirewatch_idle_sleep_interval, - GNUNET_TIME_relative_multiply (wa->shard_delay, + GNUNET_TIME_relative_multiply (shard_delay, max_workers)).rel_value_us); - wa->shard_start_time = GNUNET_TIME_absolute_get (); + shard_start_time = GNUNET_TIME_absolute_get (); qs = db_plugin->begin_shard (db_plugin->cls, - wa->job_name, + job_name, delay, shard_size, - &wa->shard_start, - &wa->shard_end); + &shard_start, + &shard_end); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -888,52 +795,51 @@ lock_shard (void *cls) rdelay = GNUNET_TIME_randomize (wirewatch_idle_sleep_interval); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Serialization error tying to obtain shard %s, will try again in %s!\n", - wa->job_name, + job_name, GNUNET_STRINGS_relative_time_to_string (rdelay, - GNUNET_YES)); - wa->delayed_until = GNUNET_TIME_relative_to_absolute (rdelay); + true)); + delayed_until = GNUNET_TIME_relative_to_absolute (rdelay); } GNUNET_assert (NULL == task); - schedule_transfers (wa->next); + schedule_transfers (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_break (0); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "No shard available, will try again for %s in %s!\n", - wa->job_name, + job_name, GNUNET_STRINGS_relative_time_to_string ( wirewatch_idle_sleep_interval, GNUNET_YES)); - wa->delayed_until = GNUNET_TIME_relative_to_absolute ( + delayed_until = GNUNET_TIME_relative_to_absolute ( wirewatch_idle_sleep_interval); - wa->shard_open = false; + shard_open = false; GNUNET_assert (NULL == task); - schedule_transfers (wa->next); + schedule_transfers (); return; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: /* continued below */ break; } - wa->shard_end_time = GNUNET_TIME_relative_to_absolute (delay); + shard_end_time = GNUNET_TIME_relative_to_absolute (delay); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting with shard %s at (%llu,%llu] locked for %s\n", - wa->job_name, - (unsigned long long) wa->shard_start, - (unsigned long long) wa->shard_end, + job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, GNUNET_STRINGS_relative_time_to_string (delay, - GNUNET_YES)); - wa->delay = true; /* default is to delay, unless - we find out that we're really busy */ - wa->batch_start = wa->shard_start; - if ( (wa->shard_open) && - (wa->shard_start == last_shard_start) && - (wa->shard_end == last_shard_end) ) - GNUNET_break (wa->latest_row_off >= wa->batch_start); /* resume where we left things */ + true)); + progress = false; + batch_start = shard_start; + if ( (shard_open) && + (shard_start == last_shard_start) && + (shard_end == last_shard_end) ) + GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */ else - wa->latest_row_off = wa->batch_start; - wa->shard_open = true; + latest_row_off = batch_start; + shard_open = true; task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - wa); + NULL); } @@ -956,14 +862,15 @@ run (void *cls, (void) cfgfile; cfg = c; + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + cls); if (GNUNET_OK != exchange_serve_process_config ()) { global_ret = EXIT_NOTCONFIGURED; + GNUNET_SCHEDULER_shutdown (); return; } - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - cls); ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, &rc); if (NULL == ctx) @@ -973,9 +880,7 @@ run (void *cls, return; } rc = GNUNET_CURL_gnunet_rc_create (ctx); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&lock_shard, - wa_head); + schedule_transfers (); } @@ -991,6 +896,11 @@ main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_string ('a', + "account", + "SECTION_NAME", + "name of the configuration section with the account we should watch (needed if more than one is enabled for crediting)", + &account_section), GNUNET_GETOPT_option_flag ('e', "exit-on-error", "terminate wirewatch if we failed to download information from the bank", diff --git a/src/exchangedb/pg_insert_deposit.c b/src/exchangedb/pg_insert_deposit.c index 09247e282..ec4d49bf9 100644 --- a/src/exchangedb/pg_insert_deposit.c +++ b/src/exchangedb/pg_insert_deposit.c @@ -26,18 +26,20 @@ #include "pg_helper.h" #include "pg_setup_wire_target.h" #include "pg_compute_shard.h" + + enum GNUNET_DB_QueryStatus TEH_PG_insert_deposit (void *cls, - struct GNUNET_TIME_Timestamp exchange_timestamp, - const struct TALER_EXCHANGEDB_Deposit *deposit) + struct GNUNET_TIME_Timestamp exchange_timestamp, + const struct TALER_EXCHANGEDB_Deposit *deposit) { struct PostgresClosure *pg = cls; struct TALER_PaytoHashP h_payto; enum GNUNET_DB_QueryStatus qs; qs = TEH_PG_setup_wire_target (pg, - deposit->receiver_wire_account, - &h_payto); + deposit->receiver_wire_account, + &h_payto); if (qs < 0) return qs; if (GNUNET_TIME_timestamp_cmp (deposit->wire_deadline, diff --git a/src/include/taler_bank_service.h b/src/include/taler_bank_service.h index bb7f3d33b..7b26bceab 100644 --- a/src/include/taler_bank_service.h +++ b/src/include/taler_bank_service.h @@ -261,6 +261,11 @@ struct TALER_BANK_CreditHistoryHandle; struct TALER_BANK_CreditDetails { /** + * Serial ID of the wire transfer. + */ + uint64_t serial_id; + + /** * Amount that was transferred */ struct TALER_Amount amount; @@ -271,49 +276,85 @@ struct TALER_BANK_CreditDetails struct GNUNET_TIME_Timestamp execution_date; /** - * Reserve public key encoded in the wire - * transfer subject. + * Reserve public key encoded in the wire transfer subject. */ struct TALER_ReservePublicKeyP reserve_pub; /** - * payto://-URL of the source account that - * send the funds. + * payto://-URL of the source account that send the funds. */ const char *debit_account_uri; /** - * payto://-URL of the target account that - * received the funds. + * payto://-URL of the target account that received the funds. */ const char *credit_account_uri; }; /** + * Response details for a history request. + */ +struct TALER_BANK_CreditHistoryResponse +{ + + /** + * HTTP status. Note that #MHD_HTTP_OK and #MHD_HTTP_NO_CONTENT are both + * successful replies, but @e details will only contain @e success information + * if this is set to #MHD_HTTP_OK. + */ + unsigned int http_status; + + /** + * Taler error code, #TALER_EC_NONE on success. + */ + enum TALER_ErrorCode ec; + + /** + * Full response, NULL if body was not in JSON format. + */ + const json_t *response; + + /** + * Details returned depending on the @e http_status. + */ + union + { + + /** + * Details if status was #MHD_HTTP_OK + */ + struct + { + + /** + * Array of transactions recevied. + */ + const struct TALER_BANK_CreditDetails *details; + + /** + * Length of the @e details array. + */ + unsigned int details_length; + + } success; + + } details; + +}; + + +/** * Callbacks of this type are used to serve the result of asking * the bank for the credit transaction history. * * @param cls closure - * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request - * 0 if the bank's reply is bogus (fails to follow the protocol), - * #MHD_HTTP_NO_CONTENT if there are no more results; on success the - * last callback is always of this status (even if `abs(num_results)` were - * already returned). - * @param ec detailed error code - * @param serial_id monotonically increasing counter corresponding to the transaction - * @param details details about the wire transfer - * @param json detailed response from the HTTPD, or NULL if reply was not in JSON - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + * @param reply details about the response */ -typedef enum GNUNET_GenericReturnValue +typedef void (*TALER_BANK_CreditHistoryCallback)( void *cls, - unsigned int http_status, - enum TALER_ErrorCode ec, - uint64_t serial_id, - const struct TALER_BANK_CreditDetails *details, - const json_t *json); + const struct TALER_BANK_CreditHistoryResponse *reply); /** @@ -370,6 +411,11 @@ struct TALER_BANK_DebitHistoryHandle; struct TALER_BANK_DebitDetails { /** + * Serial ID of the wire transfer. + */ + uint64_t serial_id; + + /** * Amount that was transferred */ struct TALER_Amount amount; @@ -390,14 +436,12 @@ struct TALER_BANK_DebitDetails const char *exchange_base_url; /** - * payto://-URI of the source account that - * send the funds. + * payto://-URI of the source account that send the funds. */ const char *debit_account_uri; /** - * payto://-URI of the target account that - * received the funds. + * payto://-URI of the target account that received the funds. */ const char *credit_account_uri; @@ -405,29 +449,68 @@ struct TALER_BANK_DebitDetails /** + * Response details for a history request. + */ +struct TALER_BANK_DebitHistoryResponse +{ + + /** + * HTTP status. Note that #MHD_HTTP_OK and #MHD_HTTP_NO_CONTENT are both + * successful replies, but @e details will only contain @e success information + * if this is set to #MHD_HTTP_OK. + */ + unsigned int http_status; + + /** + * Taler error code, #TALER_EC_NONE on success. + */ + enum TALER_ErrorCode ec; + + /** + * Full response, NULL if body was not in JSON format. + */ + const json_t *response; + + /** + * Details returned depending on the @e http_status. + */ + union + { + + /** + * Details if status was #MHD_HTTP_OK + */ + struct + { + + /** + * Array of transactions initiated. + */ + const struct TALER_BANK_DebitDetails *details; + + /** + * Length of the @e details array. + */ + unsigned int details_length; + + } success; + + } details; + +}; + + +/** * Callbacks of this type are used to serve the result of asking * the bank for the debit transaction history. * * @param cls closure - * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request - * 0 if the bank's reply is bogus (fails to follow the protocol), - * #MHD_HTTP_NO_CONTENT if there are no more results; on success the - * last callback is always of this status (even if `abs(num_results)` were - * already returned). - * @param ec detailed error code - * @param serial_id monotonically increasing counter corresponding to the transaction - * @param details details about the wire transfer - * @param json detailed response from the HTTPD, or NULL if reply was not in JSON - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + * @param reply details about the response */ -typedef enum GNUNET_GenericReturnValue +typedef void (*TALER_BANK_DebitHistoryCallback)( void *cls, - unsigned int http_status, - enum TALER_ErrorCode ec, - uint64_t serial_id, - const struct TALER_BANK_DebitDetails *details, - const json_t *json); + const struct TALER_BANK_DebitHistoryResponse *reply); /** diff --git a/src/testing/testing_api_cmd_bank_history_credit.c b/src/testing/testing_api_cmd_bank_history_credit.c index c65c84c13..9a61d6d53 100644 --- a/src/testing/testing_api_cmd_bank_history_credit.c +++ b/src/testing/testing_api_cmd_bank_history_credit.c @@ -370,99 +370,86 @@ check_result (struct History *h, * finally check it against what the bank returned. * * @param cls closure. - * @param http_status HTTP response code, #MHD_HTTP_OK (200) - * for successful status request 0 if the bank's reply is - * bogus (fails to follow the protocol), - * #MHD_HTTP_NO_CONTENT if there are no more results; on - * success the last callback is always of this status - * (even if `abs(num_results)` were already returned). - * @param ec taler status code. - * @param row_id monotonically increasing counter corresponding to - * the transaction. - * @param details details about the wire transfer. - * @param json detailed response from the HTTPD, or NULL if - * reply was not in JSON. - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + * @param chr http response details */ -static enum GNUNET_GenericReturnValue +static void history_cb (void *cls, - unsigned int http_status, - enum TALER_ErrorCode ec, - uint64_t row_id, - const struct TALER_BANK_CreditDetails *details, - const json_t *json) + const struct TALER_BANK_CreditHistoryResponse *chr) { struct TALER_TESTING_Interpreter *is = cls; struct HistoryState *hs = is->commands[is->ip].cls; - (void) row_id; - if (NULL == details) + hs->hh = NULL; + switch (chr->http_status) { - hs->hh = NULL; - if ( (MHD_HTTP_NOT_FOUND == http_status) && - (0 == hs->total) ) + case 0: + GNUNET_break (0); + goto error; + case MHD_HTTP_OK: + for (unsigned int i = 0; i<chr->details.success.details_length; i++) + { + const struct TALER_BANK_CreditDetails *cd = + &chr->details.success.details[i]; + + /* check current element */ + if (GNUNET_OK != + check_result (hs->h, + hs->total, + hs->results_obtained, + cd)) + { + GNUNET_break (0); + json_dumpf (chr->response, + stderr, + JSON_COMPACT); + hs->failed = true; + hs->hh = NULL; + TALER_TESTING_interpreter_fail (is); + return; + } + hs->results_obtained++; + } + TALER_TESTING_interpreter_next (is); + return; + case MHD_HTTP_NO_CONTENT: + if (0 == hs->total) { /* not found is OK for empty history */ TALER_TESTING_interpreter_next (is); - return GNUNET_OK; + return; } - if ( (hs->results_obtained != hs->total) || - (hs->failed) || - (MHD_HTTP_NO_CONTENT != http_status) ) + GNUNET_break (0); + goto error; + case MHD_HTTP_NOT_FOUND: + if (0 == hs->total) { - GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Expected history of length %u, got %llu;" - " HTTP status code: %u/%d, failed: %d\n", - hs->total, - (unsigned long long) hs->results_obtained, - http_status, - (int) ec, - hs->failed ? 1 : 0); - print_expected (hs->h, - hs->total, - UINT_MAX); - TALER_TESTING_interpreter_fail (is); - return GNUNET_SYSERR; + /* not found is OK for empty history */ + TALER_TESTING_interpreter_next (is); + return; } - TALER_TESTING_interpreter_next (is); - return GNUNET_OK; - } - if (MHD_HTTP_OK != http_status) - { + GNUNET_break (0); + goto error; + default: hs->hh = NULL; GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unwanted response code from /history/incoming: %u\n", - http_status); - TALER_TESTING_interpreter_fail (is); - return GNUNET_SYSERR; - } - - /* check current element */ - if (GNUNET_OK != - check_result (hs->h, - hs->total, - hs->results_obtained, - details)) - { - char *acc; - - GNUNET_break (0); - acc = json_dumps (json, - JSON_COMPACT); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Result %u was `%s'\n", - (unsigned int) hs->results_obtained++, - acc); - if (NULL != acc) - free (acc); - hs->failed = true; - hs->hh = NULL; + chr->http_status); TALER_TESTING_interpreter_fail (is); - return GNUNET_SYSERR; + return; } - hs->results_obtained++; - return GNUNET_OK; +error: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Expected history of length %u, got %llu;" + " HTTP status code: %u/%d, failed: %d\n", + hs->total, + (unsigned long long) hs->results_obtained, + chr->http_status, + (int) chr->ec, + hs->failed ? 1 : 0); + print_expected (hs->h, + hs->total, + UINT_MAX); + TALER_TESTING_interpreter_fail (is); } diff --git a/src/testing/testing_api_cmd_bank_history_debit.c b/src/testing/testing_api_cmd_bank_history_debit.c index cb3f68097..33b212ad0 100644 --- a/src/testing/testing_api_cmd_bank_history_debit.c +++ b/src/testing/testing_api_cmd_bank_history_debit.c @@ -362,99 +362,86 @@ check_result (struct History *h, * finally check it against what the bank returned. * * @param cls closure. - * @param http_status HTTP response code, #MHD_HTTP_OK (200) - * for successful status request 0 if the bank's reply is - * bogus (fails to follow the protocol), - * #MHD_HTTP_NO_CONTENT if there are no more results; on - * success the last callback is always of this status - * (even if `abs(num_results)` were already returned). - * @param ec taler status code. - * @param row_id monotonically increasing counter corresponding to - * the transaction. - * @param details details about the wire transfer. - * @param json detailed response from the HTTPD, or NULL if - * reply was not in JSON. - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + * @param dhr http response details */ -static enum GNUNET_GenericReturnValue +static void history_cb (void *cls, - unsigned int http_status, - enum TALER_ErrorCode ec, - uint64_t row_id, - const struct TALER_BANK_DebitDetails *details, - const json_t *json) + const struct TALER_BANK_DebitHistoryResponse *dhr) { struct TALER_TESTING_Interpreter *is = cls; struct HistoryState *hs = is->commands[is->ip].cls; - (void) row_id; - if (NULL == details) + hs->hh = NULL; + switch (dhr->http_status) { - hs->hh = NULL; - if ( (MHD_HTTP_NOT_FOUND == http_status) && - (0 == hs->total) ) + case 0: + GNUNET_break (0); + goto error; + case MHD_HTTP_OK: + for (unsigned int i = 0; i<dhr->details.success.details_length; i++) + { + const struct TALER_BANK_DebitDetails *dd = + &dhr->details.success.details[i]; + + /* check current element */ + if (GNUNET_OK != + check_result (hs->h, + hs->total, + hs->results_obtained, + dd)) + { + GNUNET_break (0); + json_dumpf (dhr->response, + stderr, + JSON_COMPACT); + hs->failed = true; + hs->hh = NULL; + TALER_TESTING_interpreter_fail (is); + return; + } + hs->results_obtained++; + } + TALER_TESTING_interpreter_next (is); + return; + case MHD_HTTP_NO_CONTENT: + if (0 == hs->total) { /* not found is OK for empty history */ TALER_TESTING_interpreter_next (is); - return GNUNET_OK; + return; } - if ( (hs->results_obtained != hs->total) || - (GNUNET_YES == hs->failed) || - (MHD_HTTP_NO_CONTENT != http_status) ) + GNUNET_break (0); + goto error; + case MHD_HTTP_NOT_FOUND: + if (0 == hs->total) { - GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Expected history of length %u, got %llu;" - " HTTP status code: %u/%d, failed: %d\n", - hs->total, - (unsigned long long) hs->results_obtained, - http_status, - (int) ec, - hs->failed); - print_expected (hs->h, - hs->total, - UINT_MAX); - TALER_TESTING_interpreter_fail (is); - return GNUNET_SYSERR; + /* not found is OK for empty history */ + TALER_TESTING_interpreter_next (is); + return; } - TALER_TESTING_interpreter_next (is); - return GNUNET_OK; - } - if (MHD_HTTP_OK != http_status) - { - hs->hh = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Unwanted response code from /history/outgoing: %u\n", - http_status); - TALER_TESTING_interpreter_fail (is); - return GNUNET_SYSERR; - } - - /* check current element */ - if (GNUNET_OK != - check_result (hs->h, - hs->total, - hs->results_obtained, - details)) - { - char *acc; - GNUNET_break (0); - acc = json_dumps (json, - JSON_COMPACT); - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Result %u was `%s'\n", - (unsigned int) hs->results_obtained++, - acc); - if (NULL != acc) - free (acc); - hs->failed = GNUNET_YES; + goto error; + default: hs->hh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Unwanted response code from /history/incoming: %u\n", + dhr->http_status); TALER_TESTING_interpreter_fail (is); - return GNUNET_SYSERR; + return; } - hs->results_obtained++; - return GNUNET_OK; +error: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Expected history of length %u, got %llu;" + " HTTP status code: %u/%d, failed: %d\n", + hs->total, + (unsigned long long) hs->results_obtained, + dhr->http_status, + (int) dhr->ec, + hs->failed ? 1 : 0); + print_expected (hs->h, + hs->total, + UINT_MAX); + TALER_TESTING_interpreter_fail (is); } diff --git a/src/testing/testing_api_cmd_common.c b/src/testing/testing_api_cmd_common.c index bb0eb3f04..91138f361 100644 --- a/src/testing/testing_api_cmd_common.c +++ b/src/testing/testing_api_cmd_common.c @@ -90,8 +90,7 @@ TALER_TESTING_history_entry_cmp ( (GNUNET_TIME_timestamp_cmp ( h1->details.history_details.request_timestamp, ==, - h2->details.history_details. - request_timestamp)) && + h2->details.history_details.request_timestamp)) && (0 == GNUNET_memcmp (&h1->details.history_details.reserve_sig, &h2->details.history_details.reserve_sig)) ) @@ -131,10 +130,42 @@ TALER_TESTING_history_entry_cmp ( return 0; return 1; case TALER_EXCHANGE_RTT_OPEN: - // FIXME: verify response... + if ( (0 == + TALER_amount_cmp (&h1->amount, + &h2->amount)) && + (GNUNET_TIME_timestamp_cmp ( + h1->details.open_request.request_timestamp, + ==, + h2->details.open_request.request_timestamp)) && + (GNUNET_TIME_timestamp_cmp ( + h1->details.open_request.reserve_expiration, + ==, + h2->details.open_request.reserve_expiration)) && + (h1->details.open_request.purse_limit == + h2->details.open_request.purse_limit) && + (0 == + TALER_amount_cmp (&h1->details.open_request.reserve_payment, + &h2->details.open_request.reserve_payment)) && + (0 == + GNUNET_memcmp (&h1->details.open_request.reserve_sig, + &h2->details.open_request.reserve_sig)) ) + return 0; return 1; case TALER_EXCHANGE_RTT_CLOSE: - // FIXME: verify response... + if ( (0 == + TALER_amount_cmp (&h1->amount, + &h2->amount)) && + (GNUNET_TIME_timestamp_cmp ( + h1->details.close_request.request_timestamp, + ==, + h2->details.close_request.request_timestamp)) && + (0 == + GNUNET_memcmp (&h1->details.close_request.target_account_h_payto, + &h2->details.close_request.target_account_h_payto)) && + (0 == + GNUNET_memcmp (&h1->details.close_request.reserve_sig, + &h2->details.close_request.reserve_sig)) ) + return 0; return 1; } GNUNET_assert (0); diff --git a/src/testing/testing_api_cmd_exec_wirewatch.c b/src/testing/testing_api_cmd_exec_wirewatch.c index cd31688d5..2517bf74d 100644 --- a/src/testing/testing_api_cmd_exec_wirewatch.c +++ b/src/testing/testing_api_cmd_exec_wirewatch.c @@ -71,6 +71,7 @@ wirewatch_run (void *cls, "-S", "1", "-w", "0", "-t", /* exit when done */ + "-L", "DEBUG", NULL); if (NULL == ws->wirewatch_proc) { |