aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-06-22 13:15:50 +0200
committerChristian Grothoff <christian@grothoff.org>2021-06-22 13:15:50 +0200
commit0caf3ac2b7716b55da43b492d0a24e33694724c5 (patch)
treeafc09b24e7c8eb62639292e24302d10a5139bba4
parentc9a928fe357a9f2c9e9b679ea18f3b394b492031 (diff)
-fix up wirewatch logic
-rw-r--r--src/exchange/taler-exchange-wirewatch.c19
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c112
2 files changed, 113 insertions, 18 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 5d35eba57..03f6e9e87 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -116,7 +116,7 @@ struct WireAccount
/**
* How much do we incremnt @e batch_size on success?
*/
- unsigned int batch_increment;
+ unsigned int batch_thresh;
/**
* How many transactions did we see in the current batch?
@@ -375,8 +375,8 @@ handle_soft_error (struct WireAccount *wa)
wa->session);
if (1 < wa->batch_size)
{
+ wa->batch_thresh = wa->batch_size;
wa->batch_size /= 2;
- wa->batch_increment = 0;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Reduced batch size to %llu due to serialization issue\n",
(unsigned long long) wa->batch_size);
@@ -451,9 +451,13 @@ do_commit (struct WireAccount *wa)
wa->session = NULL; /* should not be needed */
if (wa->batch_size < MAXIMUM_BATCH_SIZE)
{
- wa->batch_increment++;
+ int delta;
+
+ delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4;
+ if (delta < 0)
+ delta = -delta;
wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
- wa->batch_size + wa->batch_increment);
+ wa->batch_size + delta + 1);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Increasing batch size to %llu\n",
(unsigned long long) wa->batch_size);
@@ -669,9 +673,9 @@ find_transfers (void *cls)
}
}
if (GNUNET_OK !=
- db_plugin->start (db_plugin->cls,
- session,
- "wirewatch check for incoming wire transfers"))
+ db_plugin->start_read_committed (db_plugin->cls,
+ session,
+ "wirewatch check for incoming wire transfers"))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
@@ -679,6 +683,7 @@ find_transfers (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
+
limit = GNUNET_MIN (wa_pos->batch_size,
wa_pos->shard_end - wa_pos->batch_start);
GNUNET_assert (NULL == wa_pos->hh);
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index d06fc6a1e..886d26ed4 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -426,7 +426,6 @@ postgres_get_session (void *cls)
" WHERE reserve_pub=$1"
" LIMIT 1;",
1),
- /* Used in #postgres_reserves_in_insert() when the reserve is new */
GNUNET_PQ_make_prepare ("reserve_create",
"INSERT INTO reserves "
"(reserve_pub"
@@ -2571,6 +2570,47 @@ postgres_start (void *cls,
/**
+ * Start a READ COMMITTED transaction.
+ *
+ * @param cls the `struct PostgresClosure` with the plugin-specific state
+ * @param session the database connection
+ * @param name unique name identifying the transaction (for debugging)
+ * must point to a constant
+ * @return #GNUNET_OK on success
+ */
+static int
+postgres_start_read_committed (void *cls,
+ struct TALER_EXCHANGEDB_Session *session,
+ const char *name)
+{
+ struct GNUNET_PQ_ExecuteStatement es[] = {
+ GNUNET_PQ_make_execute ("START TRANSACTION ISOLATION LEVEL READ COMMITTED"),
+ GNUNET_PQ_EXECUTE_STATEMENT_END
+ };
+
+ (void) cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting transaction named: %s\n",
+ name);
+ postgres_preflight (cls,
+ session);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting transaction on %p\n",
+ session->conn);
+ if (GNUNET_OK !=
+ GNUNET_PQ_exec_statements (session->conn,
+ es))
+ {
+ TALER_LOG_ERROR ("Failed to start transaction\n");
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ session->transaction_name = name;
+ return GNUNET_OK;
+}
+
+
+/**
* Roll back the current transaction of a database connection.
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
@@ -3446,11 +3486,18 @@ postgres_reserves_in_insert (void *cls,
enum GNUNET_DB_QueryStatus qs1;
struct TALER_EXCHANGEDB_Reserve reserve;
struct GNUNET_TIME_Absolute expiry;
+ struct GNUNET_TIME_Absolute gc;
+ struct GNUNET_TIME_Absolute now;
+ now = GNUNET_TIME_absolute_get ();
+ (void) GNUNET_TIME_round_abs (&now);
reserve.pub = *reserve_pub;
expiry = GNUNET_TIME_absolute_add (execution_time,
pg->idle_reserve_expiration_time);
(void) GNUNET_TIME_round_abs (&expiry);
+ gc = GNUNET_TIME_absolute_add (now,
+ pg->legal_reserve_expiration_time);
+ (void) GNUNET_TIME_round_abs (&gc);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Creating reserve %s with expiration in %s\n",
TALER_B2S (reserve_pub),
@@ -3467,7 +3514,7 @@ postgres_reserves_in_insert (void *cls,
GNUNET_PQ_query_param_string (sender_account_details),
TALER_PQ_query_param_amount (balance),
TALER_PQ_query_param_absolute_time (&expiry),
- TALER_PQ_query_param_absolute_time (&expiry),
+ TALER_PQ_query_param_absolute_time (&gc),
GNUNET_PQ_query_param_end
};
@@ -3505,6 +3552,13 @@ postgres_reserves_in_insert (void *cls,
}
if (0 >= qs2)
{
+ if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs2) &&
+ (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1) )
+ {
+ GNUNET_break (0); /* should be impossible: reserve was fresh,
+ but transaction already known */
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
/* Transaction was already known or error. We are finished. */
return qs2;
}
@@ -3515,6 +3569,22 @@ postgres_reserves_in_insert (void *cls,
/* we were wrong with our optimistic assumption:
reserve does exist, need to do an update instead */
{
+ enum GNUNET_DB_QueryStatus cs;
+
+ cs = postgres_commit (cls,
+ session);
+ if (cs < 0)
+ return cs;
+ if (GNUNET_OK !=
+ postgres_start (cls,
+ session,
+ "reserve-update-serializable"))
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ }
+ {
enum GNUNET_DB_QueryStatus reserve_exists;
reserve_exists = postgres_reserves_get (cls,
@@ -3560,7 +3630,7 @@ postgres_reserves_in_insert (void *cls,
updated_reserve.expiry = GNUNET_TIME_absolute_max (expiry,
reserve.expiry);
(void) GNUNET_TIME_round_abs (&updated_reserve.expiry);
- updated_reserve.gc = GNUNET_TIME_absolute_max (updated_reserve.expiry,
+ updated_reserve.gc = GNUNET_TIME_absolute_max (gc,
reserve.gc);
(void) GNUNET_TIME_round_abs (&updated_reserve.gc);
qs3 = reserves_update (cls,
@@ -3581,8 +3651,26 @@ postgres_reserves_in_insert (void *cls,
/* continued below */
break;
}
- return qs3;
}
+
+ /* Go back to original transaction mode */
+ {
+ enum GNUNET_DB_QueryStatus cs;
+
+ cs = postgres_commit (cls,
+ session);
+ if (cs < 0)
+ return cs;
+ if (GNUNET_OK !=
+ postgres_start_read_committed (cls,
+ session,
+ "reserve-insert-continued"))
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ }
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
@@ -3698,7 +3786,7 @@ postgres_insert_withdraw_info (
struct PostgresClosure *pg = cls;
struct TALER_EXCHANGEDB_Reserve reserve;
struct GNUNET_TIME_Absolute now;
- struct GNUNET_TIME_Absolute expiry;
+ struct GNUNET_TIME_Absolute gc;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&collectable->h_coin_envelope),
GNUNET_PQ_query_param_auto_from_type (&collectable->denom_pub_hash),
@@ -3749,9 +3837,9 @@ postgres_insert_withdraw_info (
TALER_B2S (&collectable->reserve_pub));
return GNUNET_DB_STATUS_HARD_ERROR;
}
- expiry = GNUNET_TIME_absolute_add (now,
- pg->legal_reserve_expiration_time);
- reserve.gc = GNUNET_TIME_absolute_max (expiry,
+ gc = GNUNET_TIME_absolute_add (now,
+ pg->legal_reserve_expiration_time);
+ reserve.gc = GNUNET_TIME_absolute_max (gc,
reserve.gc);
(void) GNUNET_TIME_round_abs (&reserve.gc);
qs = reserves_update (cls,
@@ -8474,6 +8562,7 @@ postgres_insert_recoup_request (
{
struct PostgresClosure *pg = cls;
struct GNUNET_TIME_Absolute expiry;
+ struct GNUNET_TIME_Absolute gc;
struct TALER_EXCHANGEDB_Reserve reserve;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&coin->coin_pub),
@@ -8517,9 +8606,9 @@ postgres_insert_recoup_request (
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Inserting recoup for coin %s\n",
TALER_B2S (&coin->coin_pub));
- expiry = GNUNET_TIME_absolute_add (timestamp,
- pg->legal_reserve_expiration_time);
- reserve.gc = GNUNET_TIME_absolute_max (expiry,
+ gc = GNUNET_TIME_absolute_add (timestamp,
+ pg->legal_reserve_expiration_time);
+ reserve.gc = GNUNET_TIME_absolute_max (gc,
reserve.gc);
(void) GNUNET_TIME_round_abs (&reserve.gc);
expiry = GNUNET_TIME_absolute_add (timestamp,
@@ -10549,6 +10638,7 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
plugin->drop_tables = &postgres_drop_tables;
plugin->create_tables = &postgres_create_tables;
plugin->start = &postgres_start;
+ plugin->start_read_committed = &postgres_start_read_committed;
plugin->commit = &postgres_commit;
plugin->preflight = &postgres_preflight;
plugin->rollback = &postgres_rollback;