aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2023-04-28 15:03:59 +0200
committerChristian Grothoff <christian@grothoff.org>2023-04-28 15:04:27 +0200
commit5d59b885fd6683c248d89df55891009f0b460bf5 (patch)
tree42742004b22f6013abbcb82f1b089359d8de57fe
parentcc8b85435bc93b6660f62a8a885aa80aea570d28 (diff)
-rate limiting
-rw-r--r--src/backend/taler-merchant-exchange.c197
1 files changed, 147 insertions, 50 deletions
diff --git a/src/backend/taler-merchant-exchange.c b/src/backend/taler-merchant-exchange.c
index ab4bc03f..66893f03 100644
--- a/src/backend/taler-merchant-exchange.c
+++ b/src/backend/taler-merchant-exchange.c
@@ -35,6 +35,15 @@
GNUNET_TIME_UNIT_MINUTES, \
30)
+/**
+ * How many inquiries do we process concurrently at most.
+ */
+#define OPEN_INQUIRY_LIMIT 1024
+
+/**
+ * How many inquiries do we process concurrently per exchange at most.
+ */
+#define EXCHANGE_INQUIRY_LIMIT 16
/**
* Information about an exchange.
@@ -62,6 +71,16 @@ struct Exchange
struct TALER_EXCHANGE_Handle *conn;
/**
+ * Task where we retry fetching /keys from the exchange.
+ */
+ struct GNUNET_SCHEDULER_Task *retry_task;
+
+ /**
+ * How many active inquiries do we have right now with this exchange.
+ */
+ unsigned int exchange_inquiries;
+
+ /**
* How soon can may we, at the earliest, re-download /keys?
*/
struct GNUNET_TIME_Absolute first_retry;
@@ -72,11 +91,6 @@ struct Exchange
struct GNUNET_TIME_Relative retry_delay;
/**
- * Task where we retry fetching /keys from the exchange.
- */
- struct GNUNET_SCHEDULER_Task *retry_task;
-
- /**
* False to indicate that there is an ongoing
* /keys transfer we are waiting for;
* true to indicate that /keys data is up-to-date.
@@ -107,45 +121,45 @@ struct Inquiry
struct Exchange *exchange;
/**
- * When did the transfer happen?
+ * For which merchant instance is this tracking request?
*/
- struct GNUNET_TIME_Absolute execution_time;
+ char *instance_id;
/**
- * Argument for the /wire/transfers request.
+ * payto:// URI used for the transfer.
*/
- struct TALER_WireTransferIdentifierRawP wtid;
+ char *payto_uri;
/**
- * Amount of the wire transfer.
+ * Handle for the /wire/transfers request.
*/
- struct TALER_Amount total;
+ struct TALER_EXCHANGE_TransfersGetHandle *wdh;
/**
- * For which merchant instance is this tracking request?
+ * Pointer to the detail that we are currently
+ * checking in #check_transfer().
*/
- char *instance_id;
+ const struct TALER_TrackTransferDetails *current_detail;
/**
- * payto:// URI used for the transfer.
+ * When did the transfer happen?
*/
- char *payto_uri;
+ struct GNUNET_TIME_Absolute execution_time;
/**
- * Row of the wire transfer in our database.
+ * Argument for the /wire/transfers request.
*/
- uint64_t rowid;
+ struct TALER_WireTransferIdentifierRawP wtid;
/**
- * Handle for the /wire/transfers request.
+ * Amount of the wire transfer.
*/
- struct TALER_EXCHANGE_TransfersGetHandle *wdh;
+ struct TALER_Amount total;
/**
- * Pointer to the detail that we are currently
- * checking in #check_transfer().
+ * Row of the wire transfer in our database.
*/
- const struct TALER_TrackTransferDetails *current_detail;
+ uint64_t rowid;
/**
* Which transaction detail are we currently looking at?
@@ -220,6 +234,11 @@ static struct GNUNET_SCHEDULER_Task *task;
static struct GNUNET_DB_EventHandler *eh;
/**
+ * How many active inquiries do we have right now.
+ */
+static unsigned int active_inquiries;
+
+/**
* Value to return from main(). 0 on success, non-zero on errors.
*/
static int global_ret;
@@ -245,6 +264,38 @@ exchange_request (void *cls);
/**
+ * The exchange @a e is ready to handle more inquiries,
+ * prepare to launch them.
+ *
+ * @param e exchange to potentially launch inquiries on
+ */
+static void
+launch_inquiries_at_exchange (const struct Exchange *e)
+{
+ /* Note: this is an O(n) that should be optimized to an O(1) by tracking
+ inquiries per exchange at the exchange... */
+ for (struct Inquiry w = w_head;
+ NULL != w;
+ w = w->next)
+ {
+ if (w->exchange_done)
+ continue;
+ if (w->exchange != e)
+ continue;
+ if ( (NULL == w->task) &&
+ (NULL == w->wdh) )
+ {
+ /* Note: we should additionally count the number of exchange
+ requests launched here to not then run into the per-exchange
+ limit at exchange_request */
+ w->task = GNUNET_SCHEDULER_add_now (&exchange_request,
+ w);
+ }
+ }
+}
+
+
+/**
* Function called with information about who is auditing
* a particular exchange and what keys the exchange is using.
*
@@ -267,21 +318,7 @@ cert_cb (
{
case MHD_HTTP_OK:
e->ready = true;
- for (struct Inquiry w = w_head;
- NULL != w;
- w = w->next)
- {
- if (w->exchange_done)
- continue;
- if (w->exchange != e)
- continue;
- if ( (NULL == w->task) &&
- (NULL == w->wdh) )
- {
- w->task = GNUNET_SCHEDULER_add_now (&exchange_request,
- w);
- }
- }
+ launch_inquiries_at_exchange (e);
// FIXME: schedule retry?
break;
default:
@@ -292,6 +329,26 @@ cert_cb (
/**
+ * Updates the transaction status for inquiry @a w to the given values.
+ *
+ * @param w inquiry to update status for
+ * @param next_attempt when should we retry @a w (if ever)
+ * @param ec error code to use (if any)
+ * @param failed failure status (if ultimately failed)
+ * @param verified success status (if ultimately successful)
+ */
+static void
+update_transaction_status (const struct Inquiry *w,
+ struct GNUNET_TIME_Absolute next_attempt,
+ enum TALER_ErrorCode ec,
+ bool failed,
+ bool verified)
+{
+ // FIXME: DB update here. Log result, on failure shutdown.
+}
+
+
+/**
* Lookup our internal data structure for the given
* @a exchange_url or create one if we do not yet have
* one.
@@ -330,6 +387,8 @@ find_exchange (const char *exchange_url)
static void
end_inquiry (struct Inquiry *w)
{
+ GNUNET_assert (active_inquiries > 0);
+ active_inquiries--;
if (NULL != w->wdh)
{
TALER_EXCHANGE_transfers_get_cancel (w->wdh);
@@ -479,8 +538,12 @@ wire_transfer_cb (void *cls,
const struct TALER_EXCHANGE_TransferData *td)
{
struct Inquiry *w = cls;
+ struct Exchange *e = w->exchange;
enum GNUNET_DB_QueryStatus qs;
+ e->exchange_inquiries--;
+ if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries)
+ launch_inquiries_at_exchange (e);
w->wdh = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got response code %u from exchange for GET /transfers/$WTID\n",
@@ -490,10 +553,18 @@ wire_transfer_cb (void *cls,
case MHD_HTTP_OK:
break;
case MHD_HTTP_NOT_FOUND:
- // FIXME: record permanent failure in DB!
+ update_transaction_status (w,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ TALER_EC_MERCHANT_XXX,
+ true,
+ false);
return;
default:
- // FIXME: record transient failure in DB!
+ update_transaction_status (w,
+ GNUNET_TIME_relative_to_absolute (delay),
+ TALER_EC_MERCHANT_XXX,
+ false,
+ false);
return;
}
TMH_db->preflight (TMH_db->cls);
@@ -508,7 +579,8 @@ wire_transfer_cb (void *cls,
{
/* Always report on DB error as well to enable diagnostics */
GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs);
- // FIXME: shutdown?
+ global_ret = 1;
+ GNUNET_SCHEDULER_shutdown ();
return;
}
if (0 == qs)
@@ -521,10 +593,20 @@ wire_transfer_cb (void *cls,
TALER_amount_cmp (&td->total_amount,
&w->total))
{
- // FIXME: record inconsistency in DB!
+ /* record inconsistency in DB! (TODO: report to auditor!?) */
+ update_transaction_status (w,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ TALER_EC_MERCHANT_XXX,
+ true,
+ false);
return;
}
- // FIXME: record success in DB!
+ /* set transaction to successful */
+ update_transaction_status (w,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ TALER_EC_NONE,
+ false,
+ true);
}
@@ -537,21 +619,31 @@ static void
exchange_request (void *cls)
{
struct Inquiry *w = cls;
+ struct Exchange *e = w->exchange;
- GNUNET_assert (w->exchange->ready);
- // FIXME: rate-limit number of parallel requests per
- // exchange!
- w->wdh = TALER_EXCHANGE_transfers_get (w->exchange->conn,
+ GNUNET_assert (e->ready);
+ if (EXCHANGE_INQUIRY_LIMIT <= e->exchange_inquiries)
+ return; /* blocked by exchange rate limit */
+ w->wdh = TALER_EXCHANGE_transfers_get (e->conn,
&w->wtid,
&wire_transfer_cb,
w);
if (NULL == w->wdh)
{
GNUNET_break (0);
- /* FIXME: update DB: status: failed on exchange! */
+ update_transaction_status (w,
+ GNUNET_TIME_relative_to_absolute (delay),
+ TALER_EC_MERCHANT_XXX,
+ false,
+ false);
return;
}
- /* FIXME: update DB: status: waiting on exchange /transfers */
+ e->exchange_inquiries++;
+ update_transaction_status (w,
+ GNUNET_TIME_relative_to_absolute (delay),
+ TALER_EC_MERCHANT_EXCHANGE_GET_TRANSFER_IN_PROGRESS,
+ false,
+ false);
}
@@ -582,6 +674,7 @@ start_inquiry (
struct Inquiry *w;
(void) cls;
+ active_inquiries++;
w = GNUNET_new (struct Inquiry);
w->payto_uri = GNUNET_strdup (payto_uri);
w->instance_id = GNUNET_strdup (instance_id);
@@ -595,7 +688,11 @@ start_inquiry (
if (w->exchange->ready)
w->task = GNUNET_SCHEDULER_add_now (&exchange_request,
w);
- /* FIXME: update DB: status: waiting on exchange /keys */
+ update_transaction_status (w,
+ GNUNET_TIME_relative_to_absolute (delay),
+ TALER_EC_MERCHANT_EXCHANGE_KEYS_IN_PROGRESS,
+ false,
+ false);
}
@@ -614,7 +711,7 @@ find_work (void *cls)
// NOTE: SELECT WHERE confirmed AND NOT verified AND NOT failed?;
// FIXME: use LIMIT clause! => When do we try again if LIMIT applied?
qs = db_plugin->select_open_transfers (db_plugin->cls,
- LIMIT - active,
+ OPEN_INQUIRY_LIMIT - active_inquiries,
&start_inquiry,
NULL);
if (qs < 0)