diff options
author | Christian Grothoff <christian@grothoff.org> | 2023-04-28 15:03:59 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2023-04-28 15:04:27 +0200 |
commit | 5d59b885fd6683c248d89df55891009f0b460bf5 (patch) | |
tree | 42742004b22f6013abbcb82f1b089359d8de57fe | |
parent | cc8b85435bc93b6660f62a8a885aa80aea570d28 (diff) |
-rate limiting
-rw-r--r-- | src/backend/taler-merchant-exchange.c | 197 |
1 files changed, 147 insertions, 50 deletions
diff --git a/src/backend/taler-merchant-exchange.c b/src/backend/taler-merchant-exchange.c index ab4bc03f..66893f03 100644 --- a/src/backend/taler-merchant-exchange.c +++ b/src/backend/taler-merchant-exchange.c @@ -35,6 +35,15 @@ GNUNET_TIME_UNIT_MINUTES, \ 30) +/** + * How many inquiries do we process concurrently at most. + */ +#define OPEN_INQUIRY_LIMIT 1024 + +/** + * How many inquiries do we process concurrently per exchange at most. + */ +#define EXCHANGE_INQUIRY_LIMIT 16 /** * Information about an exchange. @@ -62,6 +71,16 @@ struct Exchange struct TALER_EXCHANGE_Handle *conn; /** + * Task where we retry fetching /keys from the exchange. + */ + struct GNUNET_SCHEDULER_Task *retry_task; + + /** + * How many active inquiries do we have right now with this exchange. + */ + unsigned int exchange_inquiries; + + /** * How soon can may we, at the earliest, re-download /keys? */ struct GNUNET_TIME_Absolute first_retry; @@ -72,11 +91,6 @@ struct Exchange struct GNUNET_TIME_Relative retry_delay; /** - * Task where we retry fetching /keys from the exchange. - */ - struct GNUNET_SCHEDULER_Task *retry_task; - - /** * False to indicate that there is an ongoing * /keys transfer we are waiting for; * true to indicate that /keys data is up-to-date. @@ -107,45 +121,45 @@ struct Inquiry struct Exchange *exchange; /** - * When did the transfer happen? + * For which merchant instance is this tracking request? */ - struct GNUNET_TIME_Absolute execution_time; + char *instance_id; /** - * Argument for the /wire/transfers request. + * payto:// URI used for the transfer. */ - struct TALER_WireTransferIdentifierRawP wtid; + char *payto_uri; /** - * Amount of the wire transfer. + * Handle for the /wire/transfers request. */ - struct TALER_Amount total; + struct TALER_EXCHANGE_TransfersGetHandle *wdh; /** - * For which merchant instance is this tracking request? + * Pointer to the detail that we are currently + * checking in #check_transfer(). */ - char *instance_id; + const struct TALER_TrackTransferDetails *current_detail; /** - * payto:// URI used for the transfer. + * When did the transfer happen? */ - char *payto_uri; + struct GNUNET_TIME_Absolute execution_time; /** - * Row of the wire transfer in our database. + * Argument for the /wire/transfers request. */ - uint64_t rowid; + struct TALER_WireTransferIdentifierRawP wtid; /** - * Handle for the /wire/transfers request. + * Amount of the wire transfer. */ - struct TALER_EXCHANGE_TransfersGetHandle *wdh; + struct TALER_Amount total; /** - * Pointer to the detail that we are currently - * checking in #check_transfer(). + * Row of the wire transfer in our database. */ - const struct TALER_TrackTransferDetails *current_detail; + uint64_t rowid; /** * Which transaction detail are we currently looking at? @@ -220,6 +234,11 @@ static struct GNUNET_SCHEDULER_Task *task; static struct GNUNET_DB_EventHandler *eh; /** + * How many active inquiries do we have right now. + */ +static unsigned int active_inquiries; + +/** * Value to return from main(). 0 on success, non-zero on errors. */ static int global_ret; @@ -245,6 +264,38 @@ exchange_request (void *cls); /** + * The exchange @a e is ready to handle more inquiries, + * prepare to launch them. + * + * @param e exchange to potentially launch inquiries on + */ +static void +launch_inquiries_at_exchange (const struct Exchange *e) +{ + /* Note: this is an O(n) that should be optimized to an O(1) by tracking + inquiries per exchange at the exchange... */ + for (struct Inquiry w = w_head; + NULL != w; + w = w->next) + { + if (w->exchange_done) + continue; + if (w->exchange != e) + continue; + if ( (NULL == w->task) && + (NULL == w->wdh) ) + { + /* Note: we should additionally count the number of exchange + requests launched here to not then run into the per-exchange + limit at exchange_request */ + w->task = GNUNET_SCHEDULER_add_now (&exchange_request, + w); + } + } +} + + +/** * Function called with information about who is auditing * a particular exchange and what keys the exchange is using. * @@ -267,21 +318,7 @@ cert_cb ( { case MHD_HTTP_OK: e->ready = true; - for (struct Inquiry w = w_head; - NULL != w; - w = w->next) - { - if (w->exchange_done) - continue; - if (w->exchange != e) - continue; - if ( (NULL == w->task) && - (NULL == w->wdh) ) - { - w->task = GNUNET_SCHEDULER_add_now (&exchange_request, - w); - } - } + launch_inquiries_at_exchange (e); // FIXME: schedule retry? break; default: @@ -292,6 +329,26 @@ cert_cb ( /** + * Updates the transaction status for inquiry @a w to the given values. + * + * @param w inquiry to update status for + * @param next_attempt when should we retry @a w (if ever) + * @param ec error code to use (if any) + * @param failed failure status (if ultimately failed) + * @param verified success status (if ultimately successful) + */ +static void +update_transaction_status (const struct Inquiry *w, + struct GNUNET_TIME_Absolute next_attempt, + enum TALER_ErrorCode ec, + bool failed, + bool verified) +{ + // FIXME: DB update here. Log result, on failure shutdown. +} + + +/** * Lookup our internal data structure for the given * @a exchange_url or create one if we do not yet have * one. @@ -330,6 +387,8 @@ find_exchange (const char *exchange_url) static void end_inquiry (struct Inquiry *w) { + GNUNET_assert (active_inquiries > 0); + active_inquiries--; if (NULL != w->wdh) { TALER_EXCHANGE_transfers_get_cancel (w->wdh); @@ -479,8 +538,12 @@ wire_transfer_cb (void *cls, const struct TALER_EXCHANGE_TransferData *td) { struct Inquiry *w = cls; + struct Exchange *e = w->exchange; enum GNUNET_DB_QueryStatus qs; + e->exchange_inquiries--; + if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries) + launch_inquiries_at_exchange (e); w->wdh = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got response code %u from exchange for GET /transfers/$WTID\n", @@ -490,10 +553,18 @@ wire_transfer_cb (void *cls, case MHD_HTTP_OK: break; case MHD_HTTP_NOT_FOUND: - // FIXME: record permanent failure in DB! + update_transaction_status (w, + GNUNET_TIME_UNIT_FOREVER_ABS, + TALER_EC_MERCHANT_XXX, + true, + false); return; default: - // FIXME: record transient failure in DB! + update_transaction_status (w, + GNUNET_TIME_relative_to_absolute (delay), + TALER_EC_MERCHANT_XXX, + false, + false); return; } TMH_db->preflight (TMH_db->cls); @@ -508,7 +579,8 @@ wire_transfer_cb (void *cls, { /* Always report on DB error as well to enable diagnostics */ GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs); - // FIXME: shutdown? + global_ret = 1; + GNUNET_SCHEDULER_shutdown (); return; } if (0 == qs) @@ -521,10 +593,20 @@ wire_transfer_cb (void *cls, TALER_amount_cmp (&td->total_amount, &w->total)) { - // FIXME: record inconsistency in DB! + /* record inconsistency in DB! (TODO: report to auditor!?) */ + update_transaction_status (w, + GNUNET_TIME_UNIT_FOREVER_ABS, + TALER_EC_MERCHANT_XXX, + true, + false); return; } - // FIXME: record success in DB! + /* set transaction to successful */ + update_transaction_status (w, + GNUNET_TIME_UNIT_FOREVER_ABS, + TALER_EC_NONE, + false, + true); } @@ -537,21 +619,31 @@ static void exchange_request (void *cls) { struct Inquiry *w = cls; + struct Exchange *e = w->exchange; - GNUNET_assert (w->exchange->ready); - // FIXME: rate-limit number of parallel requests per - // exchange! - w->wdh = TALER_EXCHANGE_transfers_get (w->exchange->conn, + GNUNET_assert (e->ready); + if (EXCHANGE_INQUIRY_LIMIT <= e->exchange_inquiries) + return; /* blocked by exchange rate limit */ + w->wdh = TALER_EXCHANGE_transfers_get (e->conn, &w->wtid, &wire_transfer_cb, w); if (NULL == w->wdh) { GNUNET_break (0); - /* FIXME: update DB: status: failed on exchange! */ + update_transaction_status (w, + GNUNET_TIME_relative_to_absolute (delay), + TALER_EC_MERCHANT_XXX, + false, + false); return; } - /* FIXME: update DB: status: waiting on exchange /transfers */ + e->exchange_inquiries++; + update_transaction_status (w, + GNUNET_TIME_relative_to_absolute (delay), + TALER_EC_MERCHANT_EXCHANGE_GET_TRANSFER_IN_PROGRESS, + false, + false); } @@ -582,6 +674,7 @@ start_inquiry ( struct Inquiry *w; (void) cls; + active_inquiries++; w = GNUNET_new (struct Inquiry); w->payto_uri = GNUNET_strdup (payto_uri); w->instance_id = GNUNET_strdup (instance_id); @@ -595,7 +688,11 @@ start_inquiry ( if (w->exchange->ready) w->task = GNUNET_SCHEDULER_add_now (&exchange_request, w); - /* FIXME: update DB: status: waiting on exchange /keys */ + update_transaction_status (w, + GNUNET_TIME_relative_to_absolute (delay), + TALER_EC_MERCHANT_EXCHANGE_KEYS_IN_PROGRESS, + false, + false); } @@ -614,7 +711,7 @@ find_work (void *cls) // NOTE: SELECT WHERE confirmed AND NOT verified AND NOT failed?; // FIXME: use LIMIT clause! => When do we try again if LIMIT applied? qs = db_plugin->select_open_transfers (db_plugin->cls, - LIMIT - active, + OPEN_INQUIRY_LIMIT - active_inquiries, &start_inquiry, NULL); if (qs < 0) |