diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 327 |
1 files changed, 167 insertions, 160 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 9b1c7e3ee..b84c837e5 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -287,7 +287,8 @@ static int reserves_idle; * Note: do not change here, Postgres requires us to hard-code the * LIMIT in the prepared statement. */ -static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT; +static unsigned int aggregation_limit = + TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT; @@ -347,13 +348,13 @@ update_fees (struct WireAccount *wa, p = p->next) { qs = db_plugin->insert_wire_fee (db_plugin->cls, - session, - wa->wire_plugin->method, - p->start_date, - p->end_date, - &p->wire_fee, - &p->closing_fee, - &p->master_sig); + session, + wa->wire_plugin->method, + p->start_date, + p->end_date, + &p->wire_fee, + &p->closing_fee, + &p->master_sig); if (qs < 0) { TALER_EXCHANGEDB_fees_free (wa->af); @@ -486,8 +487,9 @@ shutdown_task (void *cls) { if (NULL != wpd->eh) { - wpd->wa->wire_plugin->execute_wire_transfer_cancel (wpd->wa->wire_plugin->cls, - wpd->eh); + wpd->wa->wire_plugin->execute_wire_transfer_cancel ( + wpd->wa->wire_plugin->cls, + wpd->eh); wpd->eh = NULL; } db_plugin->rollback (db_plugin->cls, @@ -499,8 +501,9 @@ shutdown_task (void *cls) { if (NULL != au->ph) { - au->wa->wire_plugin->prepare_wire_transfer_cancel (au->wa->wire_plugin->cls, - au->ph); + au->wa->wire_plugin->prepare_wire_transfer_cancel ( + au->wa->wire_plugin->cls, + au->ph); au->ph = NULL; } db_plugin->rollback (db_plugin->cls, @@ -509,8 +512,9 @@ shutdown_task (void *cls) } if (NULL != ctc) { - ctc->wa->wire_plugin->prepare_wire_transfer_cancel (ctc->wa->wire_plugin->cls, - ctc->ph); + ctc->wa->wire_plugin->prepare_wire_transfer_cancel ( + ctc->wa->wire_plugin->cls, + ctc->ph); ctc->ph = NULL; db_plugin->rollback (db_plugin->cls, ctc->session); @@ -613,12 +617,12 @@ exchange_serve_process_config () */ static int refund_by_coin_cb (void *cls, - const struct TALER_MerchantPublicKeyP *merchant_pub, - const struct TALER_MerchantSignatureP *merchant_sig, - const struct GNUNET_HashCode *h_contract, - uint64_t rtransaction_id, - const struct TALER_Amount *amount_with_fee, - const struct TALER_Amount *refund_fee) + const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_MerchantSignatureP *merchant_sig, + const struct GNUNET_HashCode *h_contract, + uint64_t rtransaction_id, + const struct TALER_Amount *amount_with_fee, + const struct TALER_Amount *refund_fee) { struct AggregationUnit *aux = cls; @@ -632,8 +636,8 @@ refund_by_coin_cb (void *cls, return GNUNET_OK; /* different contract */ if (GNUNET_OK != TALER_amount_subtract (&aux->total_amount, - &aux->total_amount, - amount_with_fee)) + &aux->total_amount, + amount_with_fee)) { GNUNET_break (0); return GNUNET_SYSERR; @@ -680,17 +684,17 @@ deposit_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Fatally malformed record at row %llu over %s\n", (unsigned long long) row_id, - TALER_amount2s (amount_with_fee)); + TALER_amount2s (amount_with_fee)); return GNUNET_DB_STATUS_HARD_ERROR; } au->row_id = row_id; au->h_contract = h_contract_terms; qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - au->session, - coin_pub, - &refund_by_coin_cb, - au); + au->session, + coin_pub, + &refund_by_coin_cb, + au); au->h_contract = NULL; if (0 > qs) { @@ -738,8 +742,8 @@ deposit_cb (void *cls, au->execution_time = GNUNET_TIME_absolute_get (); (void) GNUNET_TIME_round_abs (&au->execution_time); qs = update_fees (au->wa, - au->execution_time, - au->session); + au->execution_time, + au->session); if (qs <= 0) { if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) @@ -750,17 +754,17 @@ deposit_cb (void *cls, au->wire_fee = au->wa->af->wire_fee; qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - au->session, - &au->wtid, - row_id); + au->session, + &au->wtid, + row_id); if (qs <= 0) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return qs; } qs = db_plugin->mark_deposit_done (db_plugin->cls, - au->session, - row_id); + au->session, + row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); @@ -811,13 +815,13 @@ aggregate_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Fatally malformed record at %llu over amount %s\n", (unsigned long long) row_id, - TALER_amount2s (amount_with_fee)); + TALER_amount2s (amount_with_fee)); return GNUNET_DB_STATUS_HARD_ERROR; } /* add to total */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding transaction amount %s to aggregation\n", - TALER_amount2s (&delta)); + "Adding transaction amount %s to aggregation\n", + TALER_amount2s (&delta)); if (GNUNET_OK != TALER_amount_add (&au->total_amount, &au->total_amount, @@ -832,10 +836,10 @@ aggregate_cb (void *cls, au->h_contract = h_contract_terms; qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - au->session, - coin_pub, - &refund_by_coin_cb, - au); + au->session, + coin_pub, + &refund_by_coin_cb, + au); au->h_contract = NULL; if (0 > qs) { @@ -857,25 +861,25 @@ aggregate_cb (void *cls, au->additional_rows[au->rows_offset++] = row_id; /* insert into aggregation tracking table */ qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - au->session, - &au->wtid, - row_id); + au->session, + &au->wtid, + row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return qs; } qs = db_plugin->mark_deposit_done (db_plugin->cls, - au->session, - row_id); + au->session, + row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return qs; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Added row %llu with %s to aggregation\n", - (unsigned long long) row_id, + "Added row %llu with %s to aggregation\n", + (unsigned long long) row_id, TALER_amount2s (&delta)); return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } @@ -958,8 +962,8 @@ commit_or_warn (struct TALER_EXCHANGEDB_Session *session) */ static void prepare_close_cb (void *cls, - const char *buf, - size_t buf_size) + const char *buf, + size_t buf_size) { enum GNUNET_DB_QueryStatus qs; @@ -984,10 +988,10 @@ prepare_close_cb (void *cls, /* Commit our intention to execute the wire transfer! */ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, - ctc->session, - ctc->method, - buf, - buf_size); + ctc->session, + ctc->method, + buf, + buf_size); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_break (0); @@ -1021,7 +1025,7 @@ prepare_close_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Reserve closure committed, running transfer\n"); task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); + NULL); } @@ -1059,10 +1063,10 @@ struct ExpiredReserveContext */ static enum GNUNET_DB_QueryStatus expired_reserve_cb (void *cls, - const struct TALER_ReservePublicKeyP *reserve_pub, - const struct TALER_Amount *left, - const char *account_details, - struct GNUNET_TIME_Absolute expiration_date) + const struct TALER_ReservePublicKeyP *reserve_pub, + const struct TALER_Amount *left, + const char *account_details, + struct GNUNET_TIME_Absolute expiration_date) { struct ExpiredReserveContext *erc = cls; struct TALER_EXCHANGEDB_Session *session = erc->session; @@ -1090,8 +1094,8 @@ expired_reserve_cb (void *cls, /* lookup `closing_fee` */ qs = update_fees (wa, - now, - session); + now, + session); if (qs <= 0) { if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) @@ -1106,8 +1110,8 @@ expired_reserve_cb (void *cls, /* calculate transfer amount */ ret = TALER_amount_subtract (&amount_without_fee, - left, - closing_fee); + left, + closing_fee); if ( (GNUNET_SYSERR == ret) || (GNUNET_NO == ret) ) { @@ -1122,21 +1126,21 @@ expired_reserve_cb (void *cls, /* NOTE: sizeof (*reserve_pub) == sizeof (wtid) right now, but to be future-compatible, we use the memset + min construction */ memset (&wtid, - 0, - sizeof (wtid)); + 0, + sizeof (wtid)); memcpy (&wtid, - reserve_pub, - GNUNET_MIN (sizeof (wtid), - sizeof (*reserve_pub))); + reserve_pub, + GNUNET_MIN (sizeof (wtid), + sizeof (*reserve_pub))); qs = db_plugin->insert_reserve_closed (db_plugin->cls, - session, - reserve_pub, - now, - account_details, - &wtid, - left, - closing_fee); + session, + reserve_pub, + now, + account_details, + &wtid, + left, + closing_fee); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Closing reserve %s over %s (%d, %d)\n", @@ -1149,8 +1153,8 @@ expired_reserve_cb (void *cls, { /* success, perform wire transfer */ if (GNUNET_SYSERR == - wa->wire_plugin->amount_round (wa->wire_plugin->cls, - &amount_without_fee)) + wa->wire_plugin->amount_round (wa->wire_plugin->cls, + &amount_without_fee)) { GNUNET_break (0); global_ret = GNUNET_SYSERR; @@ -1164,12 +1168,12 @@ expired_reserve_cb (void *cls, ctc->ph = wa->wire_plugin->prepare_wire_transfer (wa->wire_plugin->cls, wa->section_name, - account_details, - &amount_without_fee, - exchange_base_url, - &wtid, - &prepare_close_cb, - ctc); + account_details, + &amount_without_fee, + exchange_base_url, + &wtid, + &prepare_close_cb, + ctc); if (NULL == ctc->ph) { GNUNET_break (0); @@ -1233,7 +1237,7 @@ run_reserve_closures (void *cls) session); if (GNUNET_OK != db_plugin->start (db_plugin->cls, - session, + session, "aggregator reserve closures")) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -1247,10 +1251,10 @@ run_reserve_closures (void *cls) now = GNUNET_TIME_absolute_get (); (void) GNUNET_TIME_round_abs (&now); qs = db_plugin->get_expired_reserves (db_plugin->cls, - session, - now, - &expired_reserve_cb, - &erc); + session, + now, + &expired_reserve_cb, + &erc); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -1264,7 +1268,7 @@ run_reserve_closures (void *cls) db_plugin->rollback (db_plugin->cls, session); task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, - NULL); + NULL); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1273,14 +1277,14 @@ run_reserve_closures (void *cls) db_plugin->rollback (db_plugin->cls, session); task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + NULL); return; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: if (GNUNET_YES == erc.async_cont) break; (void) commit_or_warn (session); task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, - NULL); + NULL); return; } } @@ -1307,7 +1311,7 @@ run_aggregation (void *cls) if (0 == (++swap % 2)) { task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, - NULL); + NULL); return; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1333,9 +1337,9 @@ run_aggregation (void *cls) au = GNUNET_new (struct AggregationUnit); au->session = session; qs = db_plugin->get_ready_deposit (db_plugin->cls, - session, - &deposit_cb, - au); + session, + &deposit_cb, + au); if (0 >= qs) { cleanup_au (); @@ -1354,7 +1358,7 @@ run_aggregation (void *cls) /* should re-try immediately */ swap--; /* do not count failed attempts */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + NULL); return; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1368,15 +1372,15 @@ run_aggregation (void *cls) else { if ( (GNUNET_NO == reserves_idle) || - (GNUNET_YES == test_mode) ) - /* Possibly more to on reserves, go for it immediately */ - task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, - NULL); + (GNUNET_YES == test_mode) ) + /* Possibly more to on reserves, go for it immediately */ + task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, + NULL); else - /* nothing to do, sleep for a minute and try again */ - task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, - &run_aggregation, - NULL); + /* nothing to do, sleep for a minute and try again */ + task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &run_aggregation, + NULL); } return; } @@ -1386,12 +1390,12 @@ run_aggregation (void *cls) "Found ready deposit for %s, aggregating\n", TALER_B2S (&au->merchant_pub)); qs = db_plugin->iterate_matching_deposits (db_plugin->cls, - session, - &au->h_wire, - &au->merchant_pub, - &aggregate_cb, - au, - aggregation_limit); + session, + &au->h_wire, + &au->merchant_pub, + &aggregate_cb, + au, + aggregation_limit); if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) || (GNUNET_YES == au->failed) ) { @@ -1412,7 +1416,7 @@ run_aggregation (void *cls) db_plugin->rollback (db_plugin->cls, session); task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + NULL); return; } @@ -1431,8 +1435,8 @@ run_aggregation (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Aggregate value too low for transfer (%d/%s)\n", - qs, - TALER_amount2s (&au->final_amount)); + qs, + TALER_amount2s (&au->final_amount)); /* Rollback ongoing transaction, as we will not use the respective WTID and thus need to remove the tracking data */ db_plugin->rollback (db_plugin->cls, @@ -1454,35 +1458,35 @@ run_aggregation (void *cls) } /* Mark transactions by row_id as minor */ qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - session, - au->row_id); + session, + au->row_id); if (0 <= qs) { - for (unsigned int i=0;i<au->rows_offset;i++) + for (unsigned int i = 0; i<au->rows_offset; i++) { qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - session, - au->additional_rows[i]); - if (0 > qs) - break; + session, + au->additional_rows[i]); + if (0 > qs) + break; } } if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Serialization issue, trying again later!\n"); + "Serialization issue, trying again later!\n"); db_plugin->rollback (db_plugin->cls, - session); + session); cleanup_au (); /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + NULL); return; } if (GNUNET_DB_STATUS_HARD_ERROR == qs) { db_plugin->rollback (db_plugin->cls, - session); + session); cleanup_au (); GNUNET_SCHEDULER_shutdown (); return; @@ -1509,14 +1513,15 @@ run_aggregation (void *cls) char *url; url = TALER_JSON_wire_to_payto (au->wire); - au->ph = au->wa->wire_plugin->prepare_wire_transfer (au->wa->wire_plugin->cls, - au->wa->section_name, - url, - &au->final_amount, - exchange_base_url, - &au->wtid, - &prepare_cb, - au); + au->ph = au->wa->wire_plugin->prepare_wire_transfer ( + au->wa->wire_plugin->cls, + au->wa->section_name, + url, + &au->final_amount, + exchange_base_url, + &au->wtid, + &prepare_cb, + au); GNUNET_free (url); } if (NULL == au->ph) @@ -1563,24 +1568,24 @@ prepare_cb (void *cls, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Storing %u bytes of wire prepare data\n", - (unsigned int) buf_size); + "Storing %u bytes of wire prepare data\n", + (unsigned int) buf_size); /* Commit our intention to execute the wire transfer! */ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, - session, - au->wa->wire_plugin->method, - buf, - buf_size); + session, + au->wa->wire_plugin->method, + buf, + buf_size); /* Commit the WTID data to 'wire_out' to finally satisfy aggregation table constraints */ if (qs >= 0) qs = db_plugin->store_wire_transfer_out (db_plugin->cls, - session, - au->execution_time, - &au->wtid, - au->wire, + session, + au->execution_time, + &au->wtid, + au->wire, au->wa->section_name, - &au->final_amount); + &au->final_amount); cleanup_au (); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { @@ -1672,8 +1677,8 @@ wire_confirm_cb (void *cls, return; } qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, - session, - wpd->row_id); + session, + wpd->row_id); if (0 >= qs) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); @@ -1683,7 +1688,7 @@ wire_confirm_cb (void *cls, { /* try again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + NULL); } else { @@ -1759,11 +1764,12 @@ wire_prepare_cb (void *cls, wpd = NULL; return; } - wpd->eh = wpd->wa->wire_plugin->execute_wire_transfer (wpd->wa->wire_plugin->cls, - buf, - buf_size, - &wire_confirm_cb, - NULL); + wpd->eh = wpd->wa->wire_plugin->execute_wire_transfer ( + wpd->wa->wire_plugin->cls, + buf, + buf_size, + &wire_confirm_cb, + NULL); if (NULL == wpd->eh) { GNUNET_break (0); /* why? how to best recover? */ @@ -1821,13 +1827,13 @@ run_transfers (void *cls) wpd = GNUNET_new (struct WirePrepareData); wpd->session = session; qs = db_plugin->wire_prepare_data_get (db_plugin->cls, - session, - &wire_prepare_cb, - NULL); + session, + &wire_prepare_cb, + NULL); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) return; /* continues in #wire_prepare_cb() */ db_plugin->rollback (db_plugin->cls, - session); + session); GNUNET_free (wpd); wpd = NULL; switch (qs) @@ -1840,7 +1846,7 @@ run_transfers (void *cls) case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); + NULL); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: /* no more prepared wire transfers, go back to aggregation! */ @@ -1910,9 +1916,9 @@ main (int argc, { struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_option_flag ('t', - "test", - "run in test mode and exit when idle", - &test_mode), + "test", + "run in test mode and exit when idle", + &test_mode), GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; @@ -1923,7 +1929,8 @@ main (int argc, if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "taler-exchange-aggregator", - gettext_noop ("background process that aggregates and executes wire transfers to merchants"), + gettext_noop ( + "background process that aggregates and executes wire transfers to merchants"), options, &run, NULL)) { |