diff options
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 167 |
1 files changed, 103 insertions, 64 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index a417342a2..eb0449942 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -362,6 +362,7 @@ static void handle_soft_error (struct WireAccount *wa) { db_plugin->rollback (db_plugin->cls); + wa->started_transaction = false; if (1 < wa->batch_size) { wa->batch_thresh = wa->batch_size; @@ -377,61 +378,13 @@ handle_soft_error (struct WireAccount *wa) /** - * We are finished with the current transaction, try - * to commit and then schedule the next iteration. + * We are done with a shard, move on to the next one. * - * @param wa wire account to commit for + * @param wa wire account for which we completed a shard */ static void -do_commit (struct WireAccount *wa) +shard_completed (struct WireAccount *wa) { - enum GNUNET_DB_QueryStatus qs; - - if (wa->shard_end <= wa->latest_row_off) - { - /* shard is complete, mark this as well */ - qs = db_plugin->complete_shard (db_plugin->cls, - wa->job_name, - wa->shard_start, - wa->shard_end); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for complete_shard. Rolling back.\n"); - handle_soft_error (wa); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* already existed, ok, let's just continue */ - break; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* normal case */ - shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); - - break; - } - } - qs = db_plugin->commit (db_plugin->cls); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - /* reduce transaction size to reduce rollback probability */ - handle_soft_error (wa); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* normal case */ - break; - } /* transaction success, update #last_row_off */ wa->batch_start = wa->latest_row_off; if (wa->batch_size < MAXIMUM_BATCH_SIZE) @@ -447,15 +400,6 @@ do_commit (struct WireAccount *wa) "Increasing batch size to %llu\n", (unsigned long long) wa->batch_size); } - if ( (wa->delay) && - (test_mode) && - (NULL == wa->next) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Shutdown due to test mode!\n"); - GNUNET_SCHEDULER_shutdown (); - return; - } if (wa->delay) { wa->delayed_until @@ -473,6 +417,83 @@ do_commit (struct WireAccount *wa) /** + * We are finished with the current shard. Update the database, marking the + * shard as finished. + * + * @param wa wire account to commit for + * @return true on success + */ +static bool +mark_shard_done (struct WireAccount *wa) +{ + enum GNUNET_DB_QueryStatus qs; + + if (wa->shard_end > wa->latest_row_off) + return false; /* actually, not done! */ + /* shard is complete, mark this as well */ + qs = db_plugin->complete_shard (db_plugin->cls, + wa->job_name, + wa->shard_start, + wa->shard_end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls); + GNUNET_SCHEDULER_shutdown (); + return false; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for complete_shard. Rolling back.\n"); + handle_soft_error (wa); + return false; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* already existed, ok, let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); + + break; + } + return true; +} + + +/** + * We are finished with the current transaction, try + * to commit and then schedule the next iteration. + * + * @param wa wire account to commit for + */ +static void +do_commit (struct WireAccount *wa) +{ + enum GNUNET_DB_QueryStatus qs; + + wa->started_transaction = false; + mark_shard_done (wa); + qs = db_plugin->commit (db_plugin->cls); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* reduce transaction size to reduce rollback probability */ + handle_soft_error (wa); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + break; + } + shard_completed (wa); +} + + +/** * Callbacks of this type are used to serve the result of asking * the bank for the transaction history. * @@ -510,9 +531,20 @@ history_cb (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "End of list. Committing progress!\n"); - wa->started_transaction = false; do_commit (wa); } + else + { + if ( (wa->delay) && + (test_mode) && + (NULL == wa->next) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Shutdown due to test mode!\n"); + GNUNET_SCHEDULER_shutdown (); + return GNUNET_OK; + } + } return GNUNET_OK; /* will be ignored anyway */ } if (serial_id < wa->latest_row_off) @@ -542,11 +574,13 @@ history_cb (void *cls, wa->delay = false; if (wa->started_transaction) { - wa->started_transaction = false; do_commit (wa); } else - GNUNET_break (0); /* how did this happen */ + { + if (mark_shard_done (wa)) + shard_completed (wa); + } wa->hh = NULL; return GNUNET_SYSERR; } @@ -587,6 +621,7 @@ history_cb (void *cls, case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); db_plugin->rollback (db_plugin->cls); + wa->started_transaction = false; GNUNET_SCHEDULER_shutdown (); wa->hh = NULL; return GNUNET_SYSERR; @@ -703,7 +738,11 @@ find_transfers (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start request for account history!\n"); - db_plugin->rollback (db_plugin->cls); + if (wa_pos->started_transaction) + { + db_plugin->rollback (db_plugin->cls); + wa_pos->started_transaction = false; + } global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; |