/*
  This file is part of TALER
  Copyright (C) 2023-2024 Taler Systems SA

  TALER is free software; you can redistribute it and/or modify it under the
  terms of the GNU Affero General Public License as published by the Free Software
  Foundation; either version 3, or (at your option) any later version.

  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.

  You should have received a copy of the GNU Affero General Public License along with
  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
*/
/**
 * @file taler-merchant-reconciliation.c
 * @brief Process that reconciles information about incoming bank transfers with orders by asking the exchange
 * @author Christian Grothoff
 */
#include "platform.h"
/* NOTE(review): the names of the following four system headers were lost
   (the text between '<' and '>' was stripped); they are reconstructed from
   the symbols used in this file -- TODO confirm against the build. */
#include <gnunet/gnunet_util_lib.h>
#include <jansson.h>
#include <pthread.h>
#include <taler/taler_dbevents.h>
#include "taler_merchant_bank_lib.h"
#include "taler_merchantdb_lib.h"
#include "taler_merchantdb_plugin.h"

/**
 * Timeout for the exchange interaction.  Rather long as we should do
 * long-polling and do not want to wake up too often.
 */
#define EXCHANGE_TIMEOUT GNUNET_TIME_relative_multiply ( \
          GNUNET_TIME_UNIT_MINUTES, \
          30)

/**
 * How many inquiries do we process concurrently at most.
 */
#define OPEN_INQUIRY_LIMIT 1024

/**
 * How many inquiries do we process concurrently per exchange at most.
 */
#define EXCHANGE_INQUIRY_LIMIT 16


/**
 * Information about an inquiry job.
 */
struct Inquiry;


/**
 * Information about an exchange.
 */
struct Exchange
{
  /**
   * Kept in a DLL.
   */
  struct Exchange *next;

  /**
   * Kept in a DLL.
   */
  struct Exchange *prev;

  /**
   * Head of active inquiries.
   */
  struct Inquiry *w_head;

  /**
   * Tail of active inquiries.
   */
  struct Inquiry *w_tail;

  /**
   * Which exchange are we tracking here.
   */
  char *exchange_url;

  /**
   * The keys of this exchange
   */
  struct TALER_EXCHANGE_Keys *keys;

  /**
   * How many active inquiries do we have right now with this exchange.
   */
  unsigned int exchange_inquiries;

  /**
   * How long should we wait between requests
   * for transfer details?
   */
  struct GNUNET_TIME_Relative transfer_delay;

};


/**
 * Information about an inquiry job.
 */
struct Inquiry
{
  /**
   * Kept in a DLL.
   */
  struct Inquiry *next;

  /**
   * Kept in a DLL.
   */
  struct Inquiry *prev;

  /**
   * Handle to the exchange that made the transfer.
   */
  struct Exchange *exchange;

  /**
   * Task where we retry fetching transfer details from the exchange.
   */
  struct GNUNET_SCHEDULER_Task *task;

  /**
   * For which merchant instance is this tracking request?
   */
  char *instance_id;

  /**
   * payto:// URI used for the transfer.
   */
  struct TALER_FullPayto payto_uri;

  /**
   * Handle for the /wire/transfers request.
   */
  struct TALER_EXCHANGE_TransfersGetHandle *wdh;

  /**
   * When did the transfer happen?
   */
  struct GNUNET_TIME_Timestamp execution_time;

  /**
   * Argument for the /wire/transfers request.
   */
  struct TALER_WireTransferIdentifierRawP wtid;

  /**
   * Amount of the wire transfer.
   */
  struct TALER_Amount total;

  /**
   * Row of the wire transfer in our database.
   */
  uint64_t rowid;

};


/**
 * Head of known exchanges.
 */
static struct Exchange *e_head;

/**
 * Tail of known exchanges.
 */
static struct Exchange *e_tail;

/**
 * The merchant's configuration.
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;

/**
 * Our database plugin.
 */
static struct TALER_MERCHANTDB_Plugin *db_plugin;

/**
 * Handle to the context for interacting with the bank.
 */
static struct GNUNET_CURL_Context *ctx;

/**
 * Scheduler context for running the @e ctx.
 */
static struct GNUNET_CURL_RescheduleContext *rc;

/**
 * Main task for #find_work().
 */
static struct GNUNET_SCHEDULER_Task *task;

/**
 * Event handler to learn that there are new transfers
 * to check.
 */
static struct GNUNET_DB_EventHandler *eh;

/**
 * Event handler to learn that there may be new exchange
 * keys to check.
 */
static struct GNUNET_DB_EventHandler *eh_keys;

/**
 * How many active inquiries do we have right now.
*/ static unsigned int active_inquiries; /** * Set to true if we ever encountered any problem. */ static bool found_problem; /** * Value to return from main(). 0 on success, non-zero on errors. */ static int global_ret; /** * #GNUNET_YES if we are in test mode and should exit when idle. */ static int test_mode; /** * True if the last DB query was limited by the * #OPEN_INQUIRY_LIMIT and we thus should check again * as soon as we are substantially below that limit, * and not only when we get a DB notification. */ static bool at_limit; /** * Initiate download from exchange. * * @param cls a `struct Inquiry *` */ static void exchange_request (void *cls); /** * The exchange @a e is ready to handle more inquiries, * prepare to launch them. * * @param[in,out] e exchange to potentially launch inquiries on */ static void launch_inquiries_at_exchange (struct Exchange *e) { for (struct Inquiry *w = e->w_head; NULL != w; w = w->next) { if (e->exchange_inquiries > EXCHANGE_INQUIRY_LIMIT) break; if ( (NULL == w->task) && (NULL == w->wdh) ) { e->exchange_inquiries++; w->task = GNUNET_SCHEDULER_add_now (&exchange_request, w); } } } /** * Updates the transaction status for inquiry @a w to the given values. 
* * @param w inquiry to update status for * @param next_attempt when should we retry @a w (if ever) * @param ec error code to use (if any) * @param failed failure status (if ultimately failed) * @param verified success status (if ultimately successful) */ static void update_transaction_status (const struct Inquiry *w, struct GNUNET_TIME_Absolute next_attempt, enum TALER_ErrorCode ec, bool failed, bool verified) { enum GNUNET_DB_QueryStatus qs; if (failed) found_problem = true; qs = db_plugin->update_transfer_status (db_plugin->cls, w->exchange->exchange_url, &w->wtid, next_attempt, ec, failed, verified); if (qs < 0) { GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } } /** * Interact with the database to get the current set * of exchange keys known to us. * * @param e the exchange to check */ static void sync_keys (struct Exchange *e) { enum GNUNET_DB_QueryStatus qs; struct TALER_EXCHANGE_Keys *keys; qs = db_plugin->select_exchange_keys (db_plugin->cls, e->exchange_url, &keys); if (qs < 0) { GNUNET_break (0); return; } if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) { GNUNET_break (0); return; } TALER_EXCHANGE_keys_decref (e->keys); e->keys = keys; launch_inquiries_at_exchange (e); } /** * Lookup our internal data structure for the given * @a exchange_url or create one if we do not yet have * one. * * @param exchange_url base URL of the exchange * @return our state for this exchange */ static struct Exchange * find_exchange (const char *exchange_url) { struct Exchange *e; for (e = e_head; NULL != e; e = e->next) if (0 == strcmp (exchange_url, e->exchange_url)) return e; e = GNUNET_new (struct Exchange); e->exchange_url = GNUNET_strdup (exchange_url); GNUNET_CONTAINER_DLL_insert (e_head, e_tail, e); sync_keys (e); return e; } /** * Finds new transfers that require work in the merchant database. * * @param cls NULL */ static void find_work (void *cls); /** * Free resources of @a w. 
* * @param[in] w inquiry job to terminate */ static void end_inquiry (struct Inquiry *w) { struct Exchange *e = w->exchange; GNUNET_assert (active_inquiries > 0); active_inquiries--; if (NULL != w->wdh) { TALER_EXCHANGE_transfers_get_cancel (w->wdh); w->wdh = NULL; } GNUNET_free (w->instance_id); GNUNET_free (w->payto_uri.full_payto); GNUNET_CONTAINER_DLL_remove (e->w_head, e->w_tail, w); GNUNET_free (w); if ( (active_inquiries < OPEN_INQUIRY_LIMIT / 2) && (NULL == task) && (at_limit) ) { at_limit = false; GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&find_work, NULL); } if ( (NULL == task) && (! at_limit) && (0 == active_inquiries) && (test_mode) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No more open inquiries and in test mode. Existing.\n"); GNUNET_SCHEDULER_shutdown (); return; } } /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. * * @param cls closure (NULL) */ static void shutdown_task (void *cls) { (void) cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running shutdown\n"); while (NULL != e_head) { struct Exchange *e = e_head; while (NULL != e->w_head) { struct Inquiry *w = e->w_head; end_inquiry (w); } GNUNET_free (e->exchange_url); if (NULL != e->keys) { TALER_EXCHANGE_keys_decref (e->keys); e->keys = NULL; } GNUNET_CONTAINER_DLL_remove (e_head, e_tail, e); GNUNET_free (e); } if (NULL != eh) { db_plugin->event_listen_cancel (eh); eh = NULL; } if (NULL != eh_keys) { db_plugin->event_listen_cancel (eh_keys); eh_keys = NULL; } if (NULL != task) { GNUNET_SCHEDULER_cancel (task); task = NULL; } TALER_MERCHANTDB_plugin_unload (db_plugin); db_plugin = NULL; cfg = NULL; if (NULL != ctx) { GNUNET_CURL_fini (ctx); ctx = NULL; } if (NULL != rc) { GNUNET_CURL_gnunet_rc_destroy (rc); rc = NULL; } } /** * Check that the given @a wire_fee is what the @a e should charge * at the @a execution_time. If the fee is correct (according to our * database), return #GNUNET_OK. 
If we do not have the fee structure in our * DB, we just accept it and return #GNUNET_NO; if we have proof that the fee * is bogus, we respond with the proof to the client and return * #GNUNET_SYSERR. * * @param w inquiry to check fees of * @param execution_time time of the wire transfer * @param wire_fee fee claimed by the exchange * @return #GNUNET_SYSERR if we returned hard proof of * missbehavior from the exchange to the client */ static enum GNUNET_GenericReturnValue check_wire_fee (struct Inquiry *w, struct GNUNET_TIME_Timestamp execution_time, const struct TALER_Amount *wire_fee) { struct Exchange *e = w->exchange; const struct TALER_EXCHANGE_Keys *keys = e->keys; struct TALER_WireFeeSet fees; struct TALER_MasterSignatureP master_sig; struct GNUNET_TIME_Timestamp start_date; struct GNUNET_TIME_Timestamp end_date; enum GNUNET_DB_QueryStatus qs; char *wire_method; if (NULL == keys) { GNUNET_break (0); return GNUNET_NO; } wire_method = TALER_payto_get_method (w->payto_uri.full_payto); qs = db_plugin->lookup_wire_fee (db_plugin->cls, &keys->master_pub, wire_method, execution_time, &fees, &start_date, &end_date, &master_sig); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); GNUNET_free (wire_method); return GNUNET_SYSERR; case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_free (wire_method); return GNUNET_NO; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to find wire fee for `%s' and method `%s' at %s in DB, accepting blindly that the fee is %s\n", TALER_B2S (&keys->master_pub), wire_method, GNUNET_TIME_timestamp2s (execution_time), TALER_amount2s (wire_fee)); GNUNET_free (wire_method); return GNUNET_OK; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: break; } if ( (GNUNET_OK != TALER_amount_cmp_currency (&fees.wire, wire_fee)) || (0 > TALER_amount_cmp (&fees.wire, wire_fee)) ) { GNUNET_break_op (0); GNUNET_free (wire_method); return GNUNET_SYSERR; /* expected_fee >= wire_fee */ } GNUNET_free (wire_method); return 
GNUNET_OK; } /** * Closure for #check_transfer() */ struct CheckTransferContext { /** * Pointer to the detail that we are currently * checking in #check_transfer(). */ const struct TALER_TrackTransferDetails *current_detail; /** * Which transaction detail are we currently looking at? */ unsigned int current_offset; /** * #GNUNET_NO if we did not find a matching coin. * #GNUNET_SYSERR if we found a matching coin, but the amounts do not match. * #GNUNET_OK if we did find a matching coin. */ enum GNUNET_GenericReturnValue check_transfer_result; /** * Set to error code, if any. */ enum TALER_ErrorCode ec; /** * Set to true if @e ec indicates a permanent failure. */ bool failure; }; /** * This function checks that the information about the coin which * was paid back by _this_ wire transfer matches what _we_ (the merchant) * knew about this coin. * * @param cls closure with our `struct CheckTransferContext *` * @param exchange_url URL of the exchange that issued @a coin_pub * @param amount_with_fee amount the exchange will transfer for this coin * @param deposit_fee fee the exchange will charge for this coin * @param refund_fee fee the exchange will charge for refunding this coin * @param wire_fee paid wire fee * @param h_wire hash of merchant's wire details * @param deposit_timestamp when did the exchange receive the deposit * @param refund_deadline until when are refunds allowed * @param exchange_sig signature by the exchange * @param exchange_pub exchange signing key used for @a exchange_sig */ static void check_transfer (void *cls, const char *exchange_url, const struct TALER_Amount *amount_with_fee, const struct TALER_Amount *deposit_fee, const struct TALER_Amount *refund_fee, const struct TALER_Amount *wire_fee, const struct TALER_MerchantWireHashP *h_wire, struct GNUNET_TIME_Timestamp deposit_timestamp, struct GNUNET_TIME_Timestamp refund_deadline, const struct TALER_ExchangeSignatureP *exchange_sig, const struct TALER_ExchangePublicKeyP *exchange_pub) { struct 
CheckTransferContext *ctc = cls; const struct TALER_TrackTransferDetails *ttd = ctc->current_detail; if (GNUNET_SYSERR == ctc->check_transfer_result) { GNUNET_break (0); return; /* already had a serious issue; odd that we're called more than once as well... */ } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Checking coin with value %s\n", TALER_amount2s (amount_with_fee)); if ( (GNUNET_OK != TALER_amount_cmp_currency (amount_with_fee, &ttd->coin_value)) || (0 != TALER_amount_cmp (amount_with_fee, &ttd->coin_value)) ) { /* Disagreement between the exchange and us about how much this coin is worth! */ GNUNET_break_op (0); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Disagreement about coin value %s\n", TALER_amount2s (amount_with_fee)); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Exchange gave it a value of %s\n", TALER_amount2s (&ttd->coin_value)); ctc->check_transfer_result = GNUNET_SYSERR; /* Build the `TrackTransferConflictDetails` */ ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; ctc->failure = true; /* FIXME: this should be reported to the auditor (once the auditor has an API for this) */ return; } if ( (GNUNET_OK != TALER_amount_cmp_currency (deposit_fee, &ttd->coin_fee)) || (0 != TALER_amount_cmp (deposit_fee, &ttd->coin_fee)) ) { /* Disagreement between the exchange and us about how much this coin is worth! 
*/ GNUNET_break_op (0); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Expected fee is %s\n", TALER_amount2s (&ttd->coin_fee)); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Fee claimed by exchange is %s\n", TALER_amount2s (deposit_fee)); ctc->check_transfer_result = GNUNET_SYSERR; /* Build the `TrackTransferConflictDetails` */ ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; ctc->failure = true; /* FIXME: this should be reported to the auditor (once the auditor has an API for this) */ return; } ctc->check_transfer_result = GNUNET_OK; } /** * Function called with detailed wire transfer data, including all * of the coin transactions that were combined into the wire transfer. * * @param cls closure a `struct Inquiry *` * @param tgr response details */ static void wire_transfer_cb (void *cls, const struct TALER_EXCHANGE_TransfersGetResponse *tgr) { struct Inquiry *w = cls; struct Exchange *e = w->exchange; const struct TALER_EXCHANGE_TransferData *td = NULL; e->exchange_inquiries--; w->wdh = NULL; if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries) launch_inquiries_at_exchange (e); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got response code %u from exchange for GET /transfers/$WTID\n", tgr->hr.http_status); switch (tgr->hr.http_status) { case MHD_HTTP_OK: td = &tgr->details.ok.td; w->execution_time = td->execution_time; e->transfer_delay = GNUNET_TIME_UNIT_ZERO; break; case MHD_HTTP_BAD_REQUEST: case MHD_HTTP_FORBIDDEN: update_transaction_status (w, GNUNET_TIME_UNIT_FOREVER_ABS, TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_HARD_FAILURE, true, false); end_inquiry (w); return; case MHD_HTTP_NOT_FOUND: update_transaction_status (w, GNUNET_TIME_UNIT_FOREVER_ABS, TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_FATAL_NOT_FOUND, true, false); end_inquiry (w); return; case MHD_HTTP_INTERNAL_SERVER_ERROR: case MHD_HTTP_BAD_GATEWAY: case MHD_HTTP_GATEWAY_TIMEOUT: e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); update_transaction_status (w, 
GNUNET_TIME_relative_to_absolute ( e->transfer_delay), TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, false, false); end_inquiry (w); return; default: e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unexpected HTTP status %u\n", tgr->hr.http_status); update_transaction_status (w, GNUNET_TIME_relative_to_absolute ( e->transfer_delay), TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, false, false); end_inquiry (w); return; } db_plugin->preflight (db_plugin->cls); { enum GNUNET_DB_QueryStatus qs; qs = db_plugin->insert_transfer_details (db_plugin->cls, w->instance_id, w->exchange->exchange_url, w->payto_uri, &w->wtid, td); if (0 > qs) { /* Always report on DB error as well to enable diagnostics */ GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Transfer already known. Ignoring duplicate.\n"); return; } } { struct CheckTransferContext ctc = { .ec = TALER_EC_NONE, .failure = false }; for (unsigned int i = 0; idetails_length; i++) { const struct TALER_TrackTransferDetails *ttd = &td->details[i]; enum GNUNET_DB_QueryStatus qs; if (TALER_EC_NONE != ctc.ec) break; /* already encountered an error */ ctc.current_offset = i; ctc.current_detail = ttd; /* Set the coin as "never seen" before. */ ctc.check_transfer_result = GNUNET_NO; qs = db_plugin->lookup_deposits_by_contract_and_coin ( db_plugin->cls, w->instance_id, &ttd->h_contract_terms, &ttd->coin_pub, &check_transfer, &ctc); switch (qs) { case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_break (0); ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; break; case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; break; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: /* The exchange says we made this deposit, but WE do not recall making it (corrupted / unreliable database?)! 
Well, let's say thanks and accept the money! */ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to find payment data in DB\n"); ctc.check_transfer_result = GNUNET_OK; break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: break; } switch (ctc.check_transfer_result) { case GNUNET_NO: /* Internal error: how can we have called #check_transfer() but still have no result? */ GNUNET_break (0); ctc.ec = TALER_EC_GENERIC_INTERNAL_INVARIANT_FAILURE; return; case GNUNET_SYSERR: /* #check_transfer() failed, report conflict! */ GNUNET_break_op (0); GNUNET_assert (TALER_EC_NONE != ctc.ec); break; case GNUNET_OK: break; } } if (TALER_EC_NONE != ctc.ec) { update_transaction_status ( w, ctc.failure ? GNUNET_TIME_UNIT_FOREVER_ABS : GNUNET_TIME_relative_to_absolute ( GNUNET_TIME_UNIT_MINUTES), ctc.ec, ctc.failure, false); end_inquiry (w); return; } } if (GNUNET_SYSERR == check_wire_fee (w, td->execution_time, &td->wire_fee)) { GNUNET_break_op (0); update_transaction_status (w, GNUNET_TIME_UNIT_FOREVER_ABS, TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_BAD_WIRE_FEE, true, false); end_inquiry (w); return; } if ( (GNUNET_OK != TALER_amount_cmp_currency (&td->total_amount, &w->total)) || (0 != TALER_amount_cmp (&td->total_amount, &w->total)) ) { GNUNET_break_op (0); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Wire transfer total value was %s\n", TALER_amount2s (&w->total)); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Exchange claimed total value to be %s\n", TALER_amount2s (&td->total_amount)); update_transaction_status (w, GNUNET_TIME_UNIT_FOREVER_ABS, TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_CONFLICTING_TRANSFERS, true, false); end_inquiry (w); return; } /* set transaction to successful */ update_transaction_status (w, GNUNET_TIME_UNIT_FOREVER_ABS, TALER_EC_NONE, false, true); end_inquiry (w); } /** * Initiate download from an exchange for a given inquiry. 
* * @param cls a `struct Inquiry *` */ static void exchange_request (void *cls) { struct Inquiry *w = cls; struct Exchange *e = w->exchange; w->task = NULL; if (NULL == e->keys) return; w->wdh = TALER_EXCHANGE_transfers_get ( ctx, e->exchange_url, e->keys, &w->wtid, &wire_transfer_cb, w); if (NULL == w->wdh) { GNUNET_break (0); e->exchange_inquiries--; e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); update_transaction_status (w, GNUNET_TIME_relative_to_absolute ( e->transfer_delay), TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, false, false); end_inquiry (w); return; } /* Wait at least 1m for the network transfer */ update_transaction_status (w, GNUNET_TIME_relative_to_absolute ( GNUNET_TIME_UNIT_MINUTES), TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_LIST, false, false); } /** * Function called with information about a transfer we * should ask the exchange about. * * @param cls closure (NULL) * @param rowid row of the transfer in the merchant database * @param instance_id instance that received the transfer * @param exchange_url base URL of the exchange that initiated the transfer * @param payto_uri account of the merchant that received the transfer * @param wtid wire transfer subject identifying the aggregation * @param total total amount that was wired * @param next_attempt when should we next try to interact with the exchange */ static void start_inquiry ( void *cls, uint64_t rowid, const char *instance_id, const char *exchange_url, struct TALER_FullPayto payto_uri, const struct TALER_WireTransferIdentifierRawP *wtid, const struct TALER_Amount *total, struct GNUNET_TIME_Absolute next_attempt) { struct Exchange *e; struct Inquiry *w; (void) cls; if (GNUNET_TIME_absolute_is_future (next_attempt)) { if (NULL == task) task = GNUNET_SCHEDULER_add_at (next_attempt, &find_work, NULL); return; } e = find_exchange (exchange_url); for (w = e->w_head; NULL != w; w = w->next) { if (0 == GNUNET_memcmp (&w->wtid, wtid)) { GNUNET_log 
(GNUNET_ERROR_TYPE_WARNING, "Already processing inquiry. Aborting ongoing inquiry\n"); end_inquiry (w); break; } } active_inquiries++; w = GNUNET_new (struct Inquiry); w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto); w->instance_id = GNUNET_strdup (instance_id); w->rowid = rowid; w->wtid = *wtid; w->total = *total; GNUNET_CONTAINER_DLL_insert (e->w_head, e->w_tail, w); w->exchange = e; if (NULL != w->exchange->keys) w->task = GNUNET_SCHEDULER_add_now (&exchange_request, w); /* Wait at least 1 minute for /keys */ update_transaction_status (w, GNUNET_TIME_relative_to_absolute ( GNUNET_TIME_UNIT_MINUTES), TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_KEYS, false, false); } static void find_work (void *cls) { enum GNUNET_DB_QueryStatus qs; int limit; (void) cls; task = NULL; GNUNET_assert (OPEN_INQUIRY_LIMIT >= active_inquiries); limit = OPEN_INQUIRY_LIMIT - active_inquiries; if (0 == limit) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Not looking for work: at limit\n"); at_limit = true; return; } at_limit = false; qs = db_plugin->select_open_transfers (db_plugin->cls, limit, &start_inquiry, NULL); if (qs < 0) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain open transfers from database\n"); GNUNET_SCHEDULER_shutdown (); return; } if (qs == limit) { /* DB limited response, re-trigger DB interaction the moment we significantly fall below the limit */ at_limit = true; } if (0 == active_inquiries) { if (test_mode) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No more open inquiries and in test mode. Existing.\n"); GNUNET_SCHEDULER_shutdown (); return; } GNUNET_log ( GNUNET_ERROR_TYPE_INFO, "No open inquiries found, waiting for notification to resume\n"); } } /** * Function called when transfers are added to the merchant database. We look * for more work. 
* * @param cls closure (NULL) * @param extra additional event data provided * @param extra_size number of bytes in @a extra */ static void transfer_added (void *cls, const void *extra, size_t extra_size) { (void) cls; (void) extra; (void) extra_size; if (active_inquiries > OPEN_INQUIRY_LIMIT / 2) { /* Trigger DB only once we are substantially below the limit */ at_limit = true; return; } if (NULL != task) return; task = GNUNET_SCHEDULER_add_now (&find_work, NULL); } /** * Function called when keys were changed in the * merchant database. Updates ours. * * @param cls closure (NULL) * @param extra additional event data provided * @param extra_size number of bytes in @a extra */ static void keys_changed (void *cls, const void *extra, size_t extra_size) { const char *url = extra; struct Exchange *e; (void) cls; if ( (NULL == extra) || (0 == extra_size) ) { GNUNET_break (0); return; } if ('\0' != url[extra_size - 1]) { GNUNET_break (0); return; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Received keys change notification: reload `%s'\n", url); e = find_exchange (url); sync_keys (e); } /** * First task. * * @param cls closure, NULL * @param args remaining command-line arguments * @param cfgfile name of the configuration file used (for saving, can be NULL!) 
* @param c configuration */ static void run (void *cls, char *const *args, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *c) { (void) args; (void) cfgfile; cfg = c; GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, &rc); rc = GNUNET_CURL_gnunet_rc_create (ctx); if (NULL == ctx) { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); global_ret = EXIT_FAILURE; return; } if (NULL == (db_plugin = TALER_MERCHANTDB_plugin_load (cfg))) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to initialize DB subsystem\n"); GNUNET_SCHEDULER_shutdown (); global_ret = EXIT_NOTCONFIGURED; return; } if (GNUNET_OK != db_plugin->connect (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to connect to database. Consider running taler-merchant-dbinit!\n"); GNUNET_SCHEDULER_shutdown (); global_ret = EXIT_FAILURE; return; } { struct GNUNET_DB_EventHeaderP es = { .size = htons (sizeof (es)), .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_CONFIRMED) }; eh = db_plugin->event_listen (db_plugin->cls, &es, GNUNET_TIME_UNIT_FOREVER_REL, &transfer_added, NULL); } { struct GNUNET_DB_EventHeaderP es = { .size = htons (sizeof (es)), .type = htons (TALER_DBEVENT_MERCHANT_EXCHANGE_KEYS) }; eh_keys = db_plugin->event_listen (db_plugin->cls, &es, GNUNET_TIME_UNIT_FOREVER_REL, &keys_changed, NULL); } GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&find_work, NULL); } /** * The main function of taler-merchant-reconciliation * * @param argc number of arguments from the command line * @param argv command line arguments * @return 0 ok, 1 on error */ int main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_option_flag ('t', "test", "run in test mode and exit when idle", &test_mode), GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue 
ret; TALER_OS_init (); ret = GNUNET_PROGRAM_run ( argc, argv, "taler-merchant-reconciliation", gettext_noop ( "background process that reconciles bank transfers with orders by asking the exchange"), options, &run, NULL); if (GNUNET_SYSERR == ret) return EXIT_INVALIDARGUMENT; if (GNUNET_NO == ret) return EXIT_SUCCESS; if ( (found_problem) && (0 == global_ret) ) global_ret = 7; return global_ret; } /* end of taler-merchant-reconciliation.c */