diff options
author | Christian Grothoff <christian@grothoff.org> | 2024-09-06 15:52:15 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2024-09-06 15:52:15 +0200 |
commit | c3a9329d6e841e72b16273fff64743cba73f70e2 (patch) | |
tree | 4038d5646119c4689dd37008b92aa6a26de31da2 /src/backend/taler-merchant-reconciliation.c | |
parent | 7732fee20314a6deb2559632593ddd27f2c21b4c (diff) |
renaming taler-merchant-exchange to taler-merchant-reconciliation, preparing ground for new taler-merchant-kyccheck and taler-merchant-exchangekeyupdate helpers
Diffstat (limited to 'src/backend/taler-merchant-reconciliation.c')
-rw-r--r-- | src/backend/taler-merchant-reconciliation.c | 1309 |
1 files changed, 1309 insertions, 0 deletions
diff --git a/src/backend/taler-merchant-reconciliation.c b/src/backend/taler-merchant-reconciliation.c new file mode 100644 index 00000000..23c9a926 --- /dev/null +++ b/src/backend/taler-merchant-reconciliation.c @@ -0,0 +1,1309 @@ +/* + This file is part of TALER + Copyright (C) 2023 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ +/** + * @file taler-merchant-reconciliation.c + * @brief Process that reconciles information about incoming bank transfers with orders by asking the exchange + * @author Christian Grothoff + */ +#include "platform.h" +#include <gnunet/gnunet_util_lib.h> +#include <jansson.h> +#include <pthread.h> +#include <taler/taler_dbevents.h> +#include "taler_merchant_bank_lib.h" +#include "taler_merchantdb_lib.h" +#include "taler_merchantdb_plugin.h" + +/** + * Timeout for the exchange interaction. Rather long as we should do + * long-polling and do not want to wake up too often. + */ +#define EXCHANGE_TIMEOUT GNUNET_TIME_relative_multiply ( \ + GNUNET_TIME_UNIT_MINUTES, \ + 30) + +/** + * How many inquiries do we process concurrently at most. + */ +#define OPEN_INQUIRY_LIMIT 1024 + +/** + * How many inquiries do we process concurrently per exchange at most. + */ +#define EXCHANGE_INQUIRY_LIMIT 16 + + +/** + * Information about an inquiry job. + */ +struct Inquiry; + + +/** + * Information about an exchange. + */ +struct Exchange +{ + /** + * Kept in a DLL. + */ + struct Exchange *next; + + /** + * Kept in a DLL. + */ + struct Exchange *prev; + + /** + * Head of active inquiries. + */ + struct Inquiry *w_head; + + /** + * Tail of active inquiries. + */ + struct Inquiry *w_tail; + + /** + * Which exchange are we tracking here. + */ + char *exchange_url; + + /** + * A connection to this exchange + */ + struct TALER_EXCHANGE_GetKeysHandle *conn; + + /** + * The keys of this exchange + */ + struct TALER_EXCHANGE_Keys *keys; + + /** + * Task where we retry fetching /keys from the exchange. + */ + struct GNUNET_SCHEDULER_Task *retry_task; + + /** + * How many active inquiries do we have right now with this exchange. + */ + unsigned int exchange_inquiries; + + /** + * How soon can may we, at the earliest, re-download /keys? + */ + struct GNUNET_TIME_Absolute first_retry; + + /** + * How long should we wait between the next retry? + */ + struct GNUNET_TIME_Relative retry_delay; + + /** + * How long should we wait between requests + * for transfer details? + */ + struct GNUNET_TIME_Relative transfer_delay; + + /** + * False to indicate that there is an ongoing + * /keys transfer we are waiting for; + * true to indicate that /keys data is up-to-date. + */ + bool ready; + +}; + + +/** + * Information about an inquiry job. + */ +struct Inquiry +{ + /** + * Kept in a DLL. + */ + struct Inquiry *next; + + /** + * Kept in a DLL. + */ + struct Inquiry *prev; + + /** + * Handle to the exchange that made the transfer. + */ + struct Exchange *exchange; + + /** + * Task where we retry fetching transfer details from the exchange. + */ + struct GNUNET_SCHEDULER_Task *task; + + /** + * For which merchant instance is this tracking request? + */ + char *instance_id; + + /** + * payto:// URI used for the transfer. + */ + char *payto_uri; + + /** + * Handle for the /wire/transfers request. + */ + struct TALER_EXCHANGE_TransfersGetHandle *wdh; + + /** + * When did the transfer happen? + */ + struct GNUNET_TIME_Timestamp execution_time; + + /** + * Argument for the /wire/transfers request. + */ + struct TALER_WireTransferIdentifierRawP wtid; + + /** + * Amount of the wire transfer. + */ + struct TALER_Amount total; + + /** + * Row of the wire transfer in our database. + */ + uint64_t rowid; + +}; + + +/** + * Head of known exchanges. + */ +static struct Exchange *e_head; + +/** + * Tail of known exchanges. + */ +static struct Exchange *e_tail; + +/** + * The merchant's configuration. + */ +static const struct GNUNET_CONFIGURATION_Handle *cfg; + +/** + * Our database plugin. + */ +static struct TALER_MERCHANTDB_Plugin *db_plugin; + +/** + * Handle to the context for interacting with the bank. + */ +static struct GNUNET_CURL_Context *ctx; + +/** + * Scheduler context for running the @e ctx. + */ +static struct GNUNET_CURL_RescheduleContext *rc; + +/** + * Main task for #find_work(). + */ +static struct GNUNET_SCHEDULER_Task *task; + +/** + * Event handler to learn that there are new transfers + * to check. + */ +static struct GNUNET_DB_EventHandler *eh; + +/** + * How many active inquiries do we have right now. + */ +static unsigned int active_inquiries; + +/** + * Set to true if we ever encountered any problem. + */ +static bool found_problem; + +/** + * Value to return from main(). 0 on success, non-zero on errors. + */ +static int global_ret; + +/** + * #GNUNET_YES if we are in test mode and should exit when idle. + */ +static int test_mode; + +/** + * True if the last DB query was limited by the + * #OPEN_INQUIRY_LIMIT and we thus should check again + * as soon as we are substantially below that limit, + * and not only when we get a DB notification. + */ +static bool at_limit; + + +/** + * Initiate download from exchange. + * + * @param cls a `struct Inquiry *` + */ +static void +exchange_request (void *cls); + + +/** + * The exchange @a e is ready to handle more inquiries, + * prepare to launch them. + * + * @param[in,out] e exchange to potentially launch inquiries on + */ +static void +launch_inquiries_at_exchange (struct Exchange *e) +{ + for (struct Inquiry *w = e->w_head; + NULL != w; + w = w->next) + { + if (e->exchange_inquiries > EXCHANGE_INQUIRY_LIMIT) + break; + if ( (NULL == w->task) && + (NULL == w->wdh) ) + { + e->exchange_inquiries++; + w->task = GNUNET_SCHEDULER_add_now (&exchange_request, + w); + } + } +} + + +/** + * Function that initiates a /keys download. + * + * @param cls a `struct Exchange *` + */ +static void +download_keys (void *cls); + + +/** + * Function called with information about who is auditing + * a particular exchange and what keys the exchange is using. + * + * @param cls closure with a `struct Exchange *` + * @param kr response data + * @param[in] keys the keys of the exchange + */ +static void +cert_cb ( + void *cls, + const struct TALER_EXCHANGE_KeysResponse *kr, + struct TALER_EXCHANGE_Keys *keys) +{ + struct Exchange *e = cls; + struct GNUNET_TIME_Absolute n; + + e->conn = NULL; + switch (kr->hr.http_status) + { + case MHD_HTTP_OK: + e->ready = true; + TALER_EXCHANGE_keys_decref (e->keys); + e->keys = keys; + launch_inquiries_at_exchange (e); + /* Reset back-off */ + e->retry_delay = GNUNET_TIME_UNIT_ZERO; + /* Success: rate limit at once per minute */ + e->first_retry = GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_UNIT_MINUTES); + n = GNUNET_TIME_absolute_max (e->first_retry, + keys->key_data_expiration.abs_time); + if (NULL != e->retry_task) + GNUNET_SCHEDULER_cancel (e->retry_task); + e->retry_task = GNUNET_SCHEDULER_add_at (n, + &download_keys, + e); + break; + default: + e->retry_delay + = GNUNET_TIME_STD_BACKOFF (e->retry_delay); + e->first_retry + = GNUNET_TIME_relative_to_absolute (e->retry_delay); + if (NULL != e->retry_task) + GNUNET_SCHEDULER_cancel (e->retry_task); + e->retry_task = GNUNET_SCHEDULER_add_delayed (e->retry_delay, + &download_keys, + e); + break; + } +} + + +static void +download_keys (void *cls) +{ + struct Exchange *e = cls; + struct GNUNET_TIME_Relative n; + + /* If we do not hear back again soon, try again automatically */ + n = GNUNET_TIME_STD_BACKOFF (e->retry_delay); + n = GNUNET_TIME_relative_max (n, + GNUNET_TIME_UNIT_MINUTES); + e->retry_task = GNUNET_SCHEDULER_add_delayed (n, + &download_keys, + e); + if ( (NULL == e->keys) || + (GNUNET_TIME_absolute_is_past (e->keys->key_data_expiration.abs_time)) ) + e->conn = TALER_EXCHANGE_get_keys (ctx, + e->exchange_url, + e->keys, + &cert_cb, + e); +} + + +/** + * Updates the transaction status for inquiry @a w to the given values. + * + * @param w inquiry to update status for + * @param next_attempt when should we retry @a w (if ever) + * @param ec error code to use (if any) + * @param failed failure status (if ultimately failed) + * @param verified success status (if ultimately successful) + */ +static void +update_transaction_status (const struct Inquiry *w, + struct GNUNET_TIME_Absolute next_attempt, + enum TALER_ErrorCode ec, + bool failed, + bool verified) +{ + enum GNUNET_DB_QueryStatus qs; + + if (failed) + found_problem = true; + qs = db_plugin->update_transfer_status (db_plugin->cls, + w->exchange->exchange_url, + &w->wtid, + next_attempt, + ec, + failed, + verified); + if (qs < 0) + { + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } +} + + +/** + * Lookup our internal data structure for the given + * @a exchange_url or create one if we do not yet have + * one. + * + * @param exchange_url base URL of the exchange + * @return our state for this exchange + */ +static struct Exchange * +find_exchange (const char *exchange_url) +{ + struct Exchange *e; + + for (e = e_head; NULL != e; e = e->next) + if (0 == strcmp (exchange_url, + e->exchange_url)) + return e; + e = GNUNET_new (struct Exchange); + e->exchange_url = GNUNET_strdup (exchange_url); + GNUNET_CONTAINER_DLL_insert (e_head, + e_tail, + e); + e->retry_task = GNUNET_SCHEDULER_add_now (&download_keys, + e); + return e; +} + + +/** + * Finds new transfers that require work in the merchant database. + * + * @param cls NULL + */ +static void +find_work (void *cls); + + +/** + * Free resources of @a w. + * + * @param[in] w inquiry job to terminate + */ +static void +end_inquiry (struct Inquiry *w) +{ + struct Exchange *e = w->exchange; + + GNUNET_assert (active_inquiries > 0); + active_inquiries--; + if (NULL != w->wdh) + { + TALER_EXCHANGE_transfers_get_cancel (w->wdh); + w->wdh = NULL; + } + GNUNET_free (w->instance_id); + GNUNET_free (w->payto_uri); + GNUNET_CONTAINER_DLL_remove (e->w_head, + e->w_tail, + w); + GNUNET_free (w); + if ( (active_inquiries < OPEN_INQUIRY_LIMIT / 2) && + (NULL == task) && + (at_limit) ) + { + at_limit = false; + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&find_work, + NULL); + } + if ( (NULL == task) && + (! at_limit) && + (0 == active_inquiries) && + (test_mode) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No more open inquiries and in test mode. Existing.\n"); + GNUNET_SCHEDULER_shutdown (); + return; + } +} + + +/** + * We're being aborted with CTRL-C (or SIGTERM). Shut down. + * + * @param cls closure (NULL) + */ +static void +shutdown_task (void *cls) +{ + (void) cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Running shutdown\n"); + while (NULL != e_head) + { + struct Exchange *e = e_head; + + while (NULL != e->w_head) + { + struct Inquiry *w = e->w_head; + + end_inquiry (w); + } + GNUNET_free (e->exchange_url); + if (NULL != e->conn) + { + TALER_EXCHANGE_get_keys_cancel (e->conn); + e->conn = NULL; + } + if (NULL != e->keys) + { + TALER_EXCHANGE_keys_decref (e->keys); + e->keys = NULL; + } + if (NULL != e->retry_task) + { + GNUNET_SCHEDULER_cancel (e->retry_task); + e->retry_task = NULL; + } + GNUNET_CONTAINER_DLL_remove (e_head, + e_tail, + e); + GNUNET_free (e); + } + if (NULL != eh) + { + db_plugin->event_listen_cancel (eh); + eh = NULL; + } + if (NULL != task) + { + GNUNET_SCHEDULER_cancel (task); + task = NULL; + } + TALER_MERCHANTDB_plugin_unload (db_plugin); + db_plugin = NULL; + cfg = NULL; + if (NULL != ctx) + { + GNUNET_CURL_fini (ctx); + ctx = NULL; + } + if (NULL != rc) + { + GNUNET_CURL_gnunet_rc_destroy (rc); + rc = NULL; + } +} + + +/** + * Check that the given @a wire_fee is what the @a e should charge + * at the @a execution_time. If the fee is correct (according to our + * database), return #GNUNET_OK. If we do not have the fee structure in our + * DB, we just accept it and return #GNUNET_NO; if we have proof that the fee + * is bogus, we respond with the proof to the client and return + * #GNUNET_SYSERR. + * + * @param w inquiry to check fees of + * @param execution_time time of the wire transfer + * @param wire_fee fee claimed by the exchange + * @return #GNUNET_SYSERR if we returned hard proof of + * missbehavior from the exchange to the client + */ +static enum GNUNET_GenericReturnValue +check_wire_fee (struct Inquiry *w, + struct GNUNET_TIME_Timestamp execution_time, + const struct TALER_Amount *wire_fee) +{ + struct Exchange *e = w->exchange; + const struct TALER_EXCHANGE_Keys *keys = e->keys; + struct TALER_WireFeeSet fees; + struct TALER_MasterSignatureP master_sig; + struct GNUNET_TIME_Timestamp start_date; + struct GNUNET_TIME_Timestamp end_date; + enum GNUNET_DB_QueryStatus qs; + char *wire_method; + + if (NULL == keys) + { + GNUNET_break (0); + return GNUNET_NO; + } + wire_method = TALER_payto_get_method (w->payto_uri); + qs = db_plugin->lookup_wire_fee (db_plugin->cls, + &keys->master_pub, + wire_method, + execution_time, + &fees, + &start_date, + &end_date, + &master_sig); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_free (wire_method); + return GNUNET_SYSERR; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_free (wire_method); + return GNUNET_NO; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to find wire fee for `%s' and method `%s' at %s in DB, accepting blindly that the fee is %s\n", + TALER_B2S (&keys->master_pub), + wire_method, + GNUNET_TIME_timestamp2s (execution_time), + TALER_amount2s (wire_fee)); + GNUNET_free (wire_method); + return GNUNET_OK; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + break; + } + if ( (GNUNET_OK != + TALER_amount_cmp_currency (&fees.wire, + wire_fee)) || + (0 > TALER_amount_cmp (&fees.wire, + wire_fee)) ) + { + GNUNET_break_op (0); + GNUNET_free (wire_method); + return GNUNET_SYSERR; /* expected_fee >= wire_fee */ + } + GNUNET_free (wire_method); + return GNUNET_OK; +} + + +/** + * Closure for #check_transfer() + */ +struct CheckTransferContext +{ + + /** + * Pointer to the detail that we are currently + * checking in #check_transfer(). + */ + const struct TALER_TrackTransferDetails *current_detail; + + /** + * Which transaction detail are we currently looking at? + */ + unsigned int current_offset; + + /** + * #GNUNET_NO if we did not find a matching coin. + * #GNUNET_SYSERR if we found a matching coin, but the amounts do not match. + * #GNUNET_OK if we did find a matching coin. + */ + enum GNUNET_GenericReturnValue check_transfer_result; + + /** + * Set to error code, if any. + */ + enum TALER_ErrorCode ec; + + /** + * Set to true if @e ec indicates a permanent failure. + */ + bool failure; +}; + + +/** + * This function checks that the information about the coin which + * was paid back by _this_ wire transfer matches what _we_ (the merchant) + * knew about this coin. + * + * @param cls closure with our `struct CheckTransferContext *` + * @param exchange_url URL of the exchange that issued @a coin_pub + * @param amount_with_fee amount the exchange will transfer for this coin + * @param deposit_fee fee the exchange will charge for this coin + * @param refund_fee fee the exchange will charge for refunding this coin + * @param wire_fee paid wire fee + * @param h_wire hash of merchant's wire details + * @param deposit_timestamp when did the exchange receive the deposit + * @param refund_deadline until when are refunds allowed + * @param exchange_sig signature by the exchange + * @param exchange_pub exchange signing key used for @a exchange_sig + */ +static void +check_transfer (void *cls, + const char *exchange_url, + const struct TALER_Amount *amount_with_fee, + const struct TALER_Amount *deposit_fee, + const struct TALER_Amount *refund_fee, + const struct TALER_Amount *wire_fee, + const struct TALER_MerchantWireHashP *h_wire, + struct GNUNET_TIME_Timestamp deposit_timestamp, + struct GNUNET_TIME_Timestamp refund_deadline, + const struct TALER_ExchangeSignatureP *exchange_sig, + const struct TALER_ExchangePublicKeyP *exchange_pub) +{ + struct CheckTransferContext *ctc = cls; + const struct TALER_TrackTransferDetails *ttd = ctc->current_detail; + + if (GNUNET_SYSERR == ctc->check_transfer_result) + { + GNUNET_break (0); + return; /* already had a serious issue; odd that we're called more than once as well... */ + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking coin with value %s\n", + TALER_amount2s (amount_with_fee)); + if ( (GNUNET_OK != + TALER_amount_cmp_currency (amount_with_fee, + &ttd->coin_value)) || + (0 != TALER_amount_cmp (amount_with_fee, + &ttd->coin_value)) ) + { + /* Disagreement between the exchange and us about how much this + coin is worth! */ + GNUNET_break_op (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Disagreement about coin value %s\n", + TALER_amount2s (amount_with_fee)); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Exchange gave it a value of %s\n", + TALER_amount2s (&ttd->coin_value)); + ctc->check_transfer_result = GNUNET_SYSERR; + /* Build the `TrackTransferConflictDetails` */ + ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; + ctc->failure = true; + /* FIXME: this should be reported to the auditor (once the auditor has an API for this) */ + return; + } + if ( (GNUNET_OK != + TALER_amount_cmp_currency (deposit_fee, + &ttd->coin_fee)) || + (0 != TALER_amount_cmp (deposit_fee, + &ttd->coin_fee)) ) + { + /* Disagreement between the exchange and us about how much this + coin is worth! */ + GNUNET_break_op (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Expected fee is %s\n", + TALER_amount2s (&ttd->coin_fee)); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Fee claimed by exchange is %s\n", + TALER_amount2s (deposit_fee)); + ctc->check_transfer_result = GNUNET_SYSERR; + /* Build the `TrackTransferConflictDetails` */ + ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; + ctc->failure = true; + /* FIXME: this should be reported to the auditor (once the auditor has an API for this) */ + return; + } + ctc->check_transfer_result = GNUNET_OK; +} + + +/** + * Function called with detailed wire transfer data, including all + * of the coin transactions that were combined into the wire transfer. + * + * @param cls closure a `struct Inquiry *` + * @param tgr response details + */ +static void +wire_transfer_cb (void *cls, + const struct TALER_EXCHANGE_TransfersGetResponse *tgr) +{ + struct Inquiry *w = cls; + struct Exchange *e = w->exchange; + const struct TALER_EXCHANGE_TransferData *td = NULL; + + e->exchange_inquiries--; + w->wdh = NULL; + if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries) + launch_inquiries_at_exchange (e); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got response code %u from exchange for GET /transfers/$WTID\n", + tgr->hr.http_status); + switch (tgr->hr.http_status) + { + case MHD_HTTP_OK: + td = &tgr->details.ok.td; + w->execution_time = td->execution_time; + e->transfer_delay = GNUNET_TIME_UNIT_ZERO; + break; + case MHD_HTTP_BAD_REQUEST: + case MHD_HTTP_FORBIDDEN: + update_transaction_status (w, + GNUNET_TIME_UNIT_FOREVER_ABS, + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_HARD_FAILURE, + true, + false); + end_inquiry (w); + return; + case MHD_HTTP_NOT_FOUND: + update_transaction_status (w, + GNUNET_TIME_UNIT_FOREVER_ABS, + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_FATAL_NOT_FOUND, + true, + false); + end_inquiry (w); + return; + case MHD_HTTP_INTERNAL_SERVER_ERROR: + case MHD_HTTP_BAD_GATEWAY: + case MHD_HTTP_GATEWAY_TIMEOUT: + e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); + update_transaction_status (w, + GNUNET_TIME_relative_to_absolute ( + e->transfer_delay), + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, + false, + false); + end_inquiry (w); + return; + default: + e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Unexpected HTTP status %u\n", + tgr->hr.http_status); + update_transaction_status (w, + GNUNET_TIME_relative_to_absolute ( + e->transfer_delay), + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, + false, + false); + end_inquiry (w); + return; + } + db_plugin->preflight (db_plugin->cls); + + { + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->insert_transfer_details (db_plugin->cls, + w->instance_id, + w->exchange->exchange_url, + w->payto_uri, + &w->wtid, + td); + if (0 > qs) + { + /* Always report on DB error as well to enable diagnostics */ + GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transfer already known. Ignoring duplicate.\n"); + return; + } + } + + { + struct CheckTransferContext ctc = { + .ec = TALER_EC_NONE, + .failure = false + }; + + for (unsigned int i = 0; i<td->details_length; i++) + { + const struct TALER_TrackTransferDetails *ttd = &td->details[i]; + enum GNUNET_DB_QueryStatus qs; + + if (TALER_EC_NONE != ctc.ec) + break; /* already encountered an error */ + ctc.current_offset = i; + ctc.current_detail = ttd; + /* Set the coin as "never seen" before. */ + ctc.check_transfer_result = GNUNET_NO; + qs = db_plugin->lookup_deposits_by_contract_and_coin ( + db_plugin->cls, + w->instance_id, + &ttd->h_contract_terms, + &ttd->coin_pub, + &check_transfer, + &ctc); + switch (qs) + { + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_break (0); + ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; + break; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; + break; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* The exchange says we made this deposit, but WE do not + recall making it (corrupted / unreliable database?)! + Well, let's say thanks and accept the money! */ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to find payment data in DB\n"); + ctc.check_transfer_result = GNUNET_OK; + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + break; + } + switch (ctc.check_transfer_result) + { + case GNUNET_NO: + /* Internal error: how can we have called #check_transfer() + but still have no result? */ + GNUNET_break (0); + ctc.ec = TALER_EC_GENERIC_INTERNAL_INVARIANT_FAILURE; + return; + case GNUNET_SYSERR: + /* #check_transfer() failed, report conflict! */ + GNUNET_break_op (0); + GNUNET_assert (TALER_EC_NONE != ctc.ec); + break; + case GNUNET_OK: + break; + } + } + if (TALER_EC_NONE != ctc.ec) + { + update_transaction_status ( + w, + ctc.failure + ? GNUNET_TIME_UNIT_FOREVER_ABS + : GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_UNIT_MINUTES), + ctc.ec, + ctc.failure, + false); + end_inquiry (w); + return; + } + } + + if (GNUNET_SYSERR == + check_wire_fee (w, + td->execution_time, + &td->wire_fee)) + { + GNUNET_break_op (0); + update_transaction_status (w, + GNUNET_TIME_UNIT_FOREVER_ABS, + TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_BAD_WIRE_FEE, + true, + false); + end_inquiry (w); + return; + } + + if ( (GNUNET_OK != + TALER_amount_cmp_currency (&td->total_amount, + &w->total)) || + (0 != + TALER_amount_cmp (&td->total_amount, + &w->total)) ) + { + GNUNET_break_op (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Wire transfer total value was %s\n", + TALER_amount2s (&w->total)); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Exchange claimed total value to be %s\n", + TALER_amount2s (&td->total_amount)); + update_transaction_status (w, + GNUNET_TIME_UNIT_FOREVER_ABS, + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_CONFLICTING_TRANSFERS, + true, + false); + end_inquiry (w); + return; + } + /* set transaction to successful */ + update_transaction_status (w, + GNUNET_TIME_UNIT_FOREVER_ABS, + TALER_EC_NONE, + false, + true); + end_inquiry (w); +} + + +/** + * Initiate download from an exchange for a given inquiry. + * + * @param cls a `struct Inquiry *` + */ +static void +exchange_request (void *cls) +{ + struct Inquiry *w = cls; + struct Exchange *e = w->exchange; + + w->task = NULL; + GNUNET_assert (e->ready); + w->wdh = TALER_EXCHANGE_transfers_get ( + ctx, + e->exchange_url, + e->keys, + &w->wtid, + &wire_transfer_cb, + w); + if (NULL == w->wdh) + { + GNUNET_break (0); + e->exchange_inquiries--; + e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); + update_transaction_status (w, + GNUNET_TIME_relative_to_absolute ( + e->transfer_delay), + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, + false, + false); + end_inquiry (w); + return; + } + /* Wait at least 1m for the network transfer */ + update_transaction_status (w, + GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_UNIT_MINUTES), + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_LIST, + false, + false); +} + + +/** + * Function called with information about a transfer we + * should ask the exchange about. + * + * @param cls closure (NULL) + * @param rowid row of the transfer in the merchant database + * @param instance_id instance that received the transfer + * @param exchange_url base URL of the exchange that initiated the transfer + * @param payto_uri account of the merchant that received the transfer + * @param wtid wire transfer subject identifying the aggregation + * @param total total amount that was wired + * @param next_attempt when should we next try to interact with the exchange + */ +static void +start_inquiry ( + void *cls, + uint64_t rowid, + const char *instance_id, + const char *exchange_url, + const char *payto_uri, + const struct TALER_WireTransferIdentifierRawP *wtid, + const struct TALER_Amount *total, + struct GNUNET_TIME_Absolute next_attempt) +{ + struct Exchange *e; + struct Inquiry *w; + + (void) cls; + if (GNUNET_TIME_absolute_is_future (next_attempt)) + { + if (NULL == task) + task = GNUNET_SCHEDULER_add_at (next_attempt, + &find_work, + NULL); + return; + } + e = find_exchange (exchange_url); + for (w = e->w_head; NULL != w; w = w->next) + { + if (0 == GNUNET_memcmp (&w->wtid, + wtid)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Already processing inquiry. Aborting ongoing inquiry\n"); + end_inquiry (w); + break; + } + } + + active_inquiries++; + w = GNUNET_new (struct Inquiry); + w->payto_uri = GNUNET_strdup (payto_uri); + w->instance_id = GNUNET_strdup (instance_id); + w->rowid = rowid; + w->wtid = *wtid; + w->total = *total; + GNUNET_CONTAINER_DLL_insert (e->w_head, + e->w_tail, + w); + w->exchange = e; + if (w->exchange->ready) + w->task = GNUNET_SCHEDULER_add_now (&exchange_request, + w); + /* Wait at least 1 minute for /keys */ + update_transaction_status (w, + GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_UNIT_MINUTES), + TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_KEYS, + false, + false); +} + + +static void +find_work (void *cls) +{ + enum GNUNET_DB_QueryStatus qs; + int limit; + + (void) cls; + task = NULL; + GNUNET_assert (OPEN_INQUIRY_LIMIT >= active_inquiries); + limit = OPEN_INQUIRY_LIMIT - active_inquiries; + if (0 == limit) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Not looking for work: at limit\n"); + at_limit = true; + return; + } + at_limit = false; + qs = db_plugin->select_open_transfers (db_plugin->cls, + limit, + &start_inquiry, + NULL); + if (qs < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain open transfers from database\n"); + GNUNET_SCHEDULER_shutdown (); + return; + } + if (qs == limit) + { + /* DB limited response, re-trigger DB interaction + the moment we significantly fall below the + limit */ + at_limit = true; + } + if (0 == active_inquiries) + { + if (test_mode) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No more open inquiries and in test mode. Existing.\n"); + GNUNET_SCHEDULER_shutdown (); + return; + } + GNUNET_log ( + GNUNET_ERROR_TYPE_INFO, + "No open inquiries found, waiting for notification to resume\n"); + } +} + + +/** + * Function called when transfers are added to the merchant database. We look + * for more work. + * + * @param cls closure (NULL) + * @param extra additional event data provided + * @param extra_size number of bytes in @a extra + */ +static void +transfer_added (void *cls, + const void *extra, + size_t extra_size) +{ + (void) cls; + (void) extra; + (void) extra_size; + if (active_inquiries > OPEN_INQUIRY_LIMIT / 2) + { + /* Trigger DB only once we are substantially below the limit */ + at_limit = true; + return; + } + if (NULL != task) + return; + task = GNUNET_SCHEDULER_add_now (&find_work, + NULL); +} + + +/** + * First task. + * + * @param cls closure, NULL + * @param args remaining command-line arguments + * @param cfgfile name of the configuration file used (for saving, can be NULL!) + * @param c configuration + */ +static void +run (void *cls, + char *const *args, + const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *c) +{ + (void) args; + (void) cfgfile; + + cfg = c; + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); + ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, + &rc); + rc = GNUNET_CURL_gnunet_rc_create (ctx); + if (NULL == ctx) + { + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_FAILURE; + return; + } + if (NULL == + (db_plugin = TALER_MERCHANTDB_plugin_load (cfg))) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to initialize DB subsystem\n"); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_NOTCONFIGURED; + return; + } + if (GNUNET_OK != + db_plugin->connect (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to connect to database\n"); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_FAILURE; + return; + } + { + struct GNUNET_DB_EventHeaderP es = { + .size = htons (sizeof (es)), + .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_CONFIRMED) + }; + + eh = db_plugin->event_listen (db_plugin->cls, + &es, + GNUNET_TIME_UNIT_FOREVER_REL, + &transfer_added, + NULL); + } + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&find_work, + NULL); +} + + +/** + * The main function of taler-merchant-reconciliation + * + * @param argc number of arguments from the command line + * @param argv command line arguments + * @return 0 ok, 1 on error + */ +int +main (int argc, + char *const *argv) +{ + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_timetravel ('T', + "timetravel"), + GNUNET_GETOPT_option_flag ('t', + "test", + "run in test mode and exit when idle", + &test_mode), + GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), + GNUNET_GETOPT_OPTION_END + }; + enum GNUNET_GenericReturnValue ret; + + if (GNUNET_OK != + GNUNET_STRINGS_get_utf8_args (argc, argv, + &argc, &argv)) + return EXIT_INVALIDARGUMENT; + TALER_OS_init (); + ret = GNUNET_PROGRAM_run ( + argc, argv, + "taler-merchant-reconciliation", + gettext_noop ( + "background process that reconciles bank transfers with orders by asking the exchange"), + options, + &run, NULL); + GNUNET_free_nz ((void *) argv); + if (GNUNET_SYSERR == ret) + return EXIT_INVALIDARGUMENT; + if (GNUNET_NO == ret) + return EXIT_SUCCESS; + if ( (found_problem) && + (0 == global_ret) ) + global_ret = 7; + return global_ret; +} + + +/* end of taler-merchant-reconciliation.c */ |