diff options
author | Christian Grothoff <christian@grothoff.org> | 2021-06-22 13:15:50 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2021-06-22 13:15:50 +0200 |
commit | 0caf3ac2b7716b55da43b492d0a24e33694724c5 (patch) | |
tree | afc09b24e7c8eb62639292e24302d10a5139bba4 | |
parent | c9a928fe357a9f2c9e9b679ea18f3b394b492031 (diff) |
-fix up wirewatch logic
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 19 | ||||
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 112 |
2 files changed, 113 insertions, 18 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 5d35eba57..03f6e9e87 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -116,7 +116,7 @@ struct WireAccount /** * How much do we incremnt @e batch_size on success? */ - unsigned int batch_increment; + unsigned int batch_thresh; /** * How many transactions did we see in the current batch? @@ -375,8 +375,8 @@ handle_soft_error (struct WireAccount *wa) wa->session); if (1 < wa->batch_size) { + wa->batch_thresh = wa->batch_size; wa->batch_size /= 2; - wa->batch_increment = 0; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Reduced batch size to %llu due to serialization issue\n", (unsigned long long) wa->batch_size); @@ -451,9 +451,13 @@ do_commit (struct WireAccount *wa) wa->session = NULL; /* should not be needed */ if (wa->batch_size < MAXIMUM_BATCH_SIZE) { - wa->batch_increment++; + int delta; + + delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4; + if (delta < 0) + delta = -delta; wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, - wa->batch_size + wa->batch_increment); + wa->batch_size + delta + 1); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Increasing batch size to %llu\n", (unsigned long long) wa->batch_size); @@ -669,9 +673,9 @@ find_transfers (void *cls) } } if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - session, - "wirewatch check for incoming wire transfers")) + db_plugin->start_read_committed (db_plugin->cls, + session, + "wirewatch check for incoming wire transfers")) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); @@ -679,6 +683,7 @@ find_transfers (void *cls) GNUNET_SCHEDULER_shutdown (); return; } + limit = GNUNET_MIN (wa_pos->batch_size, wa_pos->shard_end - wa_pos->batch_start); GNUNET_assert (NULL == wa_pos->hh); diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index d06fc6a1e..886d26ed4 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -426,7 +426,6 @@ postgres_get_session (void *cls) " WHERE reserve_pub=$1" " LIMIT 1;", 1), - /* Used in #postgres_reserves_in_insert() when the reserve is new */ GNUNET_PQ_make_prepare ("reserve_create", "INSERT INTO reserves " "(reserve_pub" @@ -2571,6 +2570,47 @@ postgres_start (void *cls, /** + * Start a READ COMMITTED transaction. + * + * @param cls the `struct PostgresClosure` with the plugin-specific state + * @param session the database connection + * @param name unique name identifying the transaction (for debugging) + * must point to a constant + * @return #GNUNET_OK on success + */ +static int +postgres_start_read_committed (void *cls, + struct TALER_EXCHANGEDB_Session *session, + const char *name) +{ + struct GNUNET_PQ_ExecuteStatement es[] = { + GNUNET_PQ_make_execute ("START TRANSACTION ISOLATION LEVEL READ COMMITTED"), + GNUNET_PQ_EXECUTE_STATEMENT_END + }; + + (void) cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Starting transaction named: %s\n", + name); + postgres_preflight (cls, + session); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Starting transaction on %p\n", + session->conn); + if (GNUNET_OK != + GNUNET_PQ_exec_statements (session->conn, + es)) + { + TALER_LOG_ERROR ("Failed to start transaction\n"); + GNUNET_break (0); + return GNUNET_SYSERR; + } + session->transaction_name = name; + return GNUNET_OK; +} + + +/** * Roll back the current transaction of a database connection. * * @param cls the `struct PostgresClosure` with the plugin-specific state @@ -3446,11 +3486,18 @@ postgres_reserves_in_insert (void *cls, enum GNUNET_DB_QueryStatus qs1; struct TALER_EXCHANGEDB_Reserve reserve; struct GNUNET_TIME_Absolute expiry; + struct GNUNET_TIME_Absolute gc; + struct GNUNET_TIME_Absolute now; + now = GNUNET_TIME_absolute_get (); + (void) GNUNET_TIME_round_abs (&now); reserve.pub = *reserve_pub; expiry = GNUNET_TIME_absolute_add (execution_time, pg->idle_reserve_expiration_time); (void) GNUNET_TIME_round_abs (&expiry); + gc = GNUNET_TIME_absolute_add (now, + pg->legal_reserve_expiration_time); + (void) GNUNET_TIME_round_abs (&gc); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating reserve %s with expiration in %s\n", TALER_B2S (reserve_pub), @@ -3467,7 +3514,7 @@ postgres_reserves_in_insert (void *cls, GNUNET_PQ_query_param_string (sender_account_details), TALER_PQ_query_param_amount (balance), TALER_PQ_query_param_absolute_time (&expiry), - TALER_PQ_query_param_absolute_time (&expiry), + TALER_PQ_query_param_absolute_time (&gc), GNUNET_PQ_query_param_end }; @@ -3505,6 +3552,13 @@ postgres_reserves_in_insert (void *cls, } if (0 >= qs2) { + if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs2) && + (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1) ) + { + GNUNET_break (0); /* should be impossible: reserve was fresh, + but transaction already known */ + return GNUNET_DB_STATUS_HARD_ERROR; + } /* Transaction was already known or error. We are finished. */ return qs2; } @@ -3515,6 +3569,22 @@ postgres_reserves_in_insert (void *cls, /* we were wrong with our optimistic assumption: reserve does exist, need to do an update instead */ { + enum GNUNET_DB_QueryStatus cs; + + cs = postgres_commit (cls, + session); + if (cs < 0) + return cs; + if (GNUNET_OK != + postgres_start (cls, + session, + "reserve-update-serializable")) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + } + { enum GNUNET_DB_QueryStatus reserve_exists; reserve_exists = postgres_reserves_get (cls, @@ -3560,7 +3630,7 @@ postgres_reserves_in_insert (void *cls, updated_reserve.expiry = GNUNET_TIME_absolute_max (expiry, reserve.expiry); (void) GNUNET_TIME_round_abs (&updated_reserve.expiry); - updated_reserve.gc = GNUNET_TIME_absolute_max (updated_reserve.expiry, + updated_reserve.gc = GNUNET_TIME_absolute_max (gc, reserve.gc); (void) GNUNET_TIME_round_abs (&updated_reserve.gc); qs3 = reserves_update (cls, @@ -3581,8 +3651,26 @@ postgres_reserves_in_insert (void *cls, /* continued below */ break; } - return qs3; } + + /* Go back to original transaction mode */ + { + enum GNUNET_DB_QueryStatus cs; + + cs = postgres_commit (cls, + session); + if (cs < 0) + return cs; + if (GNUNET_OK != + postgres_start_read_committed (cls, + session, + "reserve-insert-continued")) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + } + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } @@ -3698,7 +3786,7 @@ postgres_insert_withdraw_info ( struct PostgresClosure *pg = cls; struct TALER_EXCHANGEDB_Reserve reserve; struct GNUNET_TIME_Absolute now; - struct GNUNET_TIME_Absolute expiry; + struct GNUNET_TIME_Absolute gc; struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_auto_from_type (&collectable->h_coin_envelope), GNUNET_PQ_query_param_auto_from_type (&collectable->denom_pub_hash), @@ -3749,9 +3837,9 @@ postgres_insert_withdraw_info ( TALER_B2S (&collectable->reserve_pub)); return GNUNET_DB_STATUS_HARD_ERROR; } - expiry = GNUNET_TIME_absolute_add (now, - pg->legal_reserve_expiration_time); - reserve.gc = GNUNET_TIME_absolute_max (expiry, + gc = GNUNET_TIME_absolute_add (now, + pg->legal_reserve_expiration_time); + reserve.gc = GNUNET_TIME_absolute_max (gc, reserve.gc); (void) GNUNET_TIME_round_abs (&reserve.gc); qs = reserves_update (cls, @@ -8474,6 +8562,7 @@ postgres_insert_recoup_request ( { struct PostgresClosure *pg = cls; struct GNUNET_TIME_Absolute expiry; + struct GNUNET_TIME_Absolute gc; struct TALER_EXCHANGEDB_Reserve reserve; struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_auto_from_type (&coin->coin_pub), @@ -8517,9 +8606,9 @@ postgres_insert_recoup_request ( GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Inserting recoup for coin %s\n", TALER_B2S (&coin->coin_pub)); - expiry = GNUNET_TIME_absolute_add (timestamp, - pg->legal_reserve_expiration_time); - reserve.gc = GNUNET_TIME_absolute_max (expiry, + gc = GNUNET_TIME_absolute_add (timestamp, + pg->legal_reserve_expiration_time); + reserve.gc = GNUNET_TIME_absolute_max (gc, reserve.gc); (void) GNUNET_TIME_round_abs (&reserve.gc); expiry = GNUNET_TIME_absolute_add (timestamp, @@ -10549,6 +10638,7 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) plugin->drop_tables = &postgres_drop_tables; plugin->create_tables = &postgres_create_tables; plugin->start = &postgres_start; + plugin->start_read_committed = &postgres_start_read_committed; plugin->commit = &postgres_commit; plugin->preflight = &postgres_preflight; plugin->rollback = &postgres_rollback; |