diff options
Diffstat (limited to 'src/exchangedb/plugin_exchangedb_postgres.c')
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 1396 |
1 files changed, 750 insertions, 646 deletions
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 4c94c2d79..4e0ba2940 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -30,38 +30,6 @@ #include "plugin_exchangedb_common.c" -/** - * Error code returned by Postgres for deadlock. - */ -#define PQ_DIAG_SQLSTATE_DEADLOCK "40P01" - -/** - * Error code returned by Postgres for uniqueness violation. - */ -#define PQ_DIAG_SQLSTATE_UNIQUE_VIOLATION "23505" - -/** - * Error code returned by Postgres on serialization failure. - */ -#define PQ_DIAG_SQLSTATE_SERIALIZATION_FAILURE "40001" - - -/** - * Log a query error. - * - * @param result PQ result object of the query that failed - * @param conn SQL connection that was used - */ -#define QUERY_ERR(result,conn) \ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, \ - "Query failed at %s:%u: %s/%s/%s/%s/%s\n", \ - __FILE__, __LINE__, \ - PQresultErrorField (result, PG_DIAG_MESSAGE_PRIMARY), \ - PQresultErrorField (result, PG_DIAG_MESSAGE_DETAIL), \ - PQresultErrorMessage (result), \ - PQresStatus (PQresultStatus (result)), \ - PQerrorMessage (conn)); - /** * Log a really unexpected PQ error with all the details we can get hold of. @@ -2870,67 +2838,31 @@ postgres_iterate_matching_deposits (void *cls, * @param session the database session handle * @param coin_pub the public key of the coin to search for * @param coin_info place holder for the returned coin information object - * @return #GNUNET_SYSERR upon error; #GNUNET_NO if no coin is found; #GNUNET_OK - * if upon succesfullying retrieving the record data info @a - * coin_info + * @return transaction status code */ -// FIXME: #5010 -static int +static enum GNUNET_DB_QueryStatus get_known_coin (void *cls, struct TALER_EXCHANGEDB_Session *session, const struct TALER_CoinSpendPublicKeyP *coin_pub, struct TALER_CoinPublicInfo *coin_info) { - PGresult *result; struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_auto_from_type (coin_pub), GNUNET_PQ_query_param_end }; - int nrows; - - result = GNUNET_PQ_exec_prepared (session->conn, - "get_known_coin", - params); - if (PGRES_TUPLES_OK != PQresultStatus (result)) - { - BREAK_DB_ERR (result, session->conn); - PQclear (result); - return GNUNET_SYSERR; - } - nrows = PQntuples (result); - if (0 == nrows) - { - PQclear (result); - return GNUNET_NO; - } - GNUNET_assert (1 == nrows); /* due to primary key */ - if (NULL == coin_info) - { - PQclear (result); - return GNUNET_YES; - } - { - struct GNUNET_PQ_ResultSpec rs[] = { - GNUNET_PQ_result_spec_rsa_public_key ("denom_pub", - &coin_info->denom_pub.rsa_public_key), - GNUNET_PQ_result_spec_rsa_signature ("denom_sig", - &coin_info->denom_sig.rsa_signature), - GNUNET_PQ_result_spec_end - }; - - if (GNUNET_OK != - GNUNET_PQ_extract_result (result, - rs, - 0)) - { - PQclear (result); - GNUNET_break (0); - return GNUNET_SYSERR; - } - } - PQclear (result); + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_rsa_public_key ("denom_pub", + &coin_info->denom_pub.rsa_public_key), + GNUNET_PQ_result_spec_rsa_signature ("denom_sig", + &coin_info->denom_sig.rsa_signature), + GNUNET_PQ_result_spec_end + }; + coin_info->coin_pub = *coin_pub; - return GNUNET_OK; + return GNUNET_PQ_eval_prepared_singleton_select (session->conn, + "get_known_coin", + params, + rs); } @@ -2966,6 +2898,50 @@ insert_known_coin (void *cls, /** + * Make sure the given @a coin is known to the database. + * + * @param cls database connection plugin state + * @param session database session + * @param coin the coin that must be made known + * @return database transaction status, non-negative on success + */ +static enum GNUNET_DB_QueryStatus +ensure_coin_known (struct PostgresClosure *cls, + struct TALER_EXCHANGEDB_Session *session, + const struct TALER_CoinPublicInfo *coin) +{ + enum GNUNET_DB_QueryStatus qs; + struct TALER_CoinPublicInfo known_coin; + + /* check if the coin is already known */ + qs = get_known_coin (cls, + session, + &coin->coin_pub, + &known_coin); + if (0 > qs) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return GNUNET_SYSERR; + } + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) + return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS; /* no change! */ + GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs); + /* if not known, insert it */ + qs = insert_known_coin (cls, + session, + coin); + if (0 >= qs) + { + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + qs = GNUNET_DB_STATUS_HARD_ERROR; /* should be impossible */ + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; + } + return qs; +} + + +/** * Insert information about deposited coin into the database. * * @param cls the `struct PostgresClosure` with the plugin-specific state @@ -2978,7 +2954,6 @@ postgres_insert_deposit (void *cls, struct TALER_EXCHANGEDB_Session *session, const struct TALER_EXCHANGEDB_Deposit *deposit) { - int ret; enum GNUNET_DB_QueryStatus qs; struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_auto_from_type (&deposit->coin.coin_pub), @@ -2994,28 +2969,10 @@ postgres_insert_deposit (void *cls, GNUNET_PQ_query_param_end }; - /* check if the coin is already known */ - ret = get_known_coin (cls, - session, - &deposit->coin.coin_pub, - NULL); - if (GNUNET_SYSERR == ret) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - if (GNUNET_NO == ret) /* if not, insert it */ - { - qs = insert_known_coin (cls, - session, - &deposit->coin); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - } - + if (0 > (qs = ensure_coin_known (cls, + session, + &deposit->coin))) + return qs; return GNUNET_PQ_eval_prepared_non_select (session->conn, "insert_deposit", params); @@ -3098,14 +3055,14 @@ postgres_get_refresh_session (void *cls, params, rs); if ( (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) && - (GNUNET_OK != - get_known_coin (cls, - session, - &refresh_session->melt.coin.coin_pub, - &refresh_session->melt.coin)) ) + (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + (qs = get_known_coin (cls, + session, + &refresh_session->melt.coin.coin_pub, + &refresh_session->melt.coin)) ) ) { - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; } refresh_session->melt.session_hash = *session_hash; return qs; @@ -3136,31 +3093,12 @@ postgres_create_refresh_session (void *cls, GNUNET_PQ_query_param_uint16 (&refresh_session->noreveal_index), GNUNET_PQ_query_param_end }; - int ret; enum GNUNET_DB_QueryStatus qs; - /* check if the coin is already known (FIXME: #5010) */ - ret = get_known_coin (cls, - session, - &refresh_session->melt.coin.coin_pub, - NULL); - if (GNUNET_SYSERR == ret) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - if (GNUNET_NO == ret) /* if not, insert it */ - { - qs = insert_known_coin (cls, - session, - &refresh_session->melt.coin); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - } - + if (0 > (qs = ensure_coin_known (cls, + session, + &refresh_session->melt.coin))) + return qs; return GNUNET_PQ_eval_prepared_non_select (session->conn, "insert_refresh_session", params); @@ -3769,9 +3707,9 @@ struct CoinHistoryContext struct TALER_EXCHANGEDB_Session *session; /** - * Set to #GNUNET_SYSERR on errors + * Set to transaction status. */ - int status; + enum GNUNET_DB_QueryStatus status; }; @@ -3794,6 +3732,7 @@ add_coin_deposit (void *cls, { struct TALER_EXCHANGEDB_Deposit *deposit; struct TALER_EXCHANGEDB_TransactionList *tl; + enum GNUNET_DB_QueryStatus qs; deposit = GNUNET_new (struct TALER_EXCHANGEDB_Deposit); { @@ -3826,7 +3765,7 @@ add_coin_deposit (void *cls, { GNUNET_break (0); GNUNET_free (deposit); - chc->status = GNUNET_SYSERR; + chc->status = GNUNET_DB_STATUS_HARD_ERROR; return; } deposit->coin.coin_pub = *chc->coin_pub; @@ -3835,14 +3774,15 @@ add_coin_deposit (void *cls, tl->next = chc->head; tl->type = TALER_EXCHANGEDB_TT_DEPOSIT; tl->details.deposit = deposit; - if (GNUNET_SYSERR == get_known_coin (chc->db_cls, - chc->session, - chc->coin_pub, - &deposit->coin)) + qs = get_known_coin (chc->db_cls, + chc->session, + chc->coin_pub, + &deposit->coin); + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { - GNUNET_break (0); + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); GNUNET_free (deposit); - chc->status = GNUNET_SYSERR; + chc->status = qs; return; } chc->head = tl; @@ -3869,6 +3809,7 @@ add_coin_melt (void *cls, { struct TALER_EXCHANGEDB_RefreshMelt *melt; struct TALER_EXCHANGEDB_TransactionList *tl; + enum GNUNET_DB_QueryStatus qs; melt = GNUNET_new (struct TALER_EXCHANGEDB_RefreshMelt); { @@ -3892,7 +3833,7 @@ add_coin_melt (void *cls, { GNUNET_break (0); GNUNET_free (melt); - chc->status = GNUNET_SYSERR; + chc->status = GNUNET_DB_STATUS_HARD_ERROR; return; } melt->coin.coin_pub = *chc->coin_pub; @@ -3901,14 +3842,15 @@ add_coin_melt (void *cls, tl->next = chc->head; tl->type = TALER_EXCHANGEDB_TT_REFRESH_MELT; tl->details.melt = melt; - if (GNUNET_SYSERR == get_known_coin (chc->db_cls, - chc->session, - chc->coin_pub, - &melt->coin)) + qs = get_known_coin (chc->db_cls, + chc->session, + chc->coin_pub, + &melt->coin); + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { - GNUNET_break (0); + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); GNUNET_free (melt); - chc->status = GNUNET_SYSERR; + chc->status = qs; return; } chc->head = tl; @@ -3935,6 +3877,7 @@ add_coin_refund (void *cls, { struct TALER_EXCHANGEDB_Refund *refund; struct TALER_EXCHANGEDB_TransactionList *tl; + enum GNUNET_DB_QueryStatus qs; refund = GNUNET_new (struct TALER_EXCHANGEDB_Refund); { @@ -3961,7 +3904,7 @@ add_coin_refund (void *cls, { GNUNET_break (0); GNUNET_free (refund); - chc->status = GNUNET_SYSERR; + chc->status = GNUNET_DB_STATUS_HARD_ERROR; return; } refund->coin.coin_pub = *chc->coin_pub; @@ -3970,15 +3913,15 @@ add_coin_refund (void *cls, tl->next = chc->head; tl->type = TALER_EXCHANGEDB_TT_REFUND; tl->details.refund = refund; - if (GNUNET_SYSERR == - get_known_coin (chc->db_cls, - chc->session, - chc->coin_pub, - &refund->coin)) + qs = get_known_coin (chc->db_cls, + chc->session, + chc->coin_pub, + &refund->coin); + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { - GNUNET_break (0); + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); GNUNET_free (refund); - chc->status = GNUNET_SYSERR; + chc->status = qs; return; } chc->head = tl; @@ -4033,7 +3976,7 @@ add_coin_payback (void *cls, { GNUNET_break (0); GNUNET_free (payback); - chc->status = GNUNET_SYSERR; + chc->status = GNUNET_DB_STATUS_HARD_ERROR; return; } payback->coin.coin_pub = *chc->coin_pub; @@ -4096,7 +4039,7 @@ postgres_get_coin_transactions (void *cls, }; chc.head = NULL; - chc.status = GNUNET_OK; + chc.status = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; chc.coin_pub = coin_pub; chc.session = session; chc.db_cls = cls; @@ -4108,14 +4051,14 @@ postgres_get_coin_transactions (void *cls, work[i].cb, &chc); if ( (0 > qs) || - (GNUNET_OK != chc.status) ) + (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != chc.status) ) { if (NULL != chc.head) common_free_coin_transaction_list (cls, chc.head); *tlp = NULL; - if (GNUNET_OK != chc.status) - qs = GNUNET_DB_STATUS_HARD_ERROR; + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != chc.status) + qs = chc.status; return qs; } } @@ -4918,102 +4861,70 @@ postgres_gc (void *cls) GNUNET_PQ_query_param_end }; PGconn *conn; - PGresult *result; - + int ret; + now = GNUNET_TIME_absolute_get (); conn = GNUNET_PQ_connect (pc->connection_cfg_str); if (NULL == conn) return GNUNET_SYSERR; - if (GNUNET_OK != - postgres_prepare (conn)) - { - PQfinish (conn); - return GNUNET_SYSERR; - } - result = GNUNET_PQ_exec_prepared (conn, - "gc_prewire", - params_none); - if (PGRES_COMMAND_OK != PQresultStatus (result)) - { - BREAK_DB_ERR (result, conn); - PQclear (result); - PQfinish (conn); - return GNUNET_SYSERR; - } - PQclear (result); - result = GNUNET_PQ_exec_prepared (conn, - "gc_denominations", - params_time); - if (PGRES_COMMAND_OK != PQresultStatus (result)) + ret = postgres_prepare (conn); + if (GNUNET_OK == ret) { - BREAK_DB_ERR (result, conn); - PQclear (result); - PQfinish (conn); - return GNUNET_SYSERR; - } - PQclear (result); - result = GNUNET_PQ_exec_prepared (conn, - "gc_reserves", - params_time); - if (PGRES_COMMAND_OK != PQresultStatus (result)) - { - BREAK_DB_ERR (result, conn); - PQclear (result); - PQfinish (conn); - return GNUNET_SYSERR; + if ( (0 > GNUNET_PQ_eval_prepared_non_select (conn, + "gc_prewire", + params_none)) || + (0 > GNUNET_PQ_eval_prepared_non_select (conn, + "gc_denominations", + params_time)) || + (0 > GNUNET_PQ_eval_prepared_non_select (conn, + "gc_reserves", + params_time)) ) + ret = GNUNET_SYSERR; } - PQclear (result); PQfinish (conn); - return GNUNET_OK; + return ret; } /** - * Select deposits above @a serial_id in monotonically increasing - * order. - * - * @param cls closure - * @param session database connection - * @param serial_id highest serial ID to exclude (select strictly larger) - * @param cb function to call on each result - * @param cb_cls closure for @a cb - * @return #GNUNET_OK on success, - * #GNUNET_SYSERR on DB errors + * Closure for #deposit_serial_helper_cb(). */ -static int -postgres_select_deposits_above_serial_id (void *cls, - struct TALER_EXCHANGEDB_Session *session, - uint64_t serial_id, - TALER_EXCHANGEDB_DepositCallback cb, - void *cb_cls) +struct DepositSerialContext { - struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_uint64 (&serial_id), - GNUNET_PQ_query_param_end - }; - PGresult *result; - result = GNUNET_PQ_exec_prepared (session->conn, - "audit_get_deposits_incr", - params); - if (PGRES_TUPLES_OK != - PQresultStatus (result)) - { - BREAK_DB_ERR (result, session->conn); - PQclear (result); - return GNUNET_SYSERR; - } - int nrows; - int ret; - nrows = PQntuples (result); - if (0 == nrows) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "select_deposits_above_serial_id() returned 0 matching rows\n"); - PQclear (result); - return GNUNET_NO; - } - for (int i=0;i<nrows;i++) + /** + * Callback to call. + */ + TALER_EXCHANGEDB_DepositCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; + + /** + * Status code, set to #GNUNET_SYSERR on hard errors. + */ + int status; +}; + + +/** + * Helper function to be called with the results of a SELECT statement + * that has returned @a num_results results. + * + * @param cls closure of type `struct DepositSerialContext` + * @param result the postgres result + * @param num_result the number of results in @a result + */ +static void +deposit_serial_helper_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct DepositSerialContext *dsc = cls; + + for (unsigned int i=0;i<num_results;i++) { struct TALER_EXCHANGEDB_Deposit deposit; struct TALER_DenominationPublicKey denom_pub; @@ -5046,39 +4957,39 @@ postgres_select_deposits_above_serial_id (void *cls, &rowid), GNUNET_PQ_result_spec_end }; + int ret; + if (GNUNET_OK != GNUNET_PQ_extract_result (result, rs, i)) { GNUNET_break (0); - PQclear (result); - return GNUNET_SYSERR; + dsc->status = GNUNET_SYSERR; + return; } - ret = cb (cb_cls, - rowid, - deposit.timestamp, - &deposit.merchant_pub, - &denom_pub, - &deposit.coin.coin_pub, - &deposit.csig, - &deposit.amount_with_fee, - &deposit.h_contract_terms, - deposit.refund_deadline, - deposit.wire_deadline, - deposit.receiver_wire_account, - done); + ret = dsc->cb (dsc->cb_cls, + rowid, + deposit.timestamp, + &deposit.merchant_pub, + &denom_pub, + &deposit.coin.coin_pub, + &deposit.csig, + &deposit.amount_with_fee, + &deposit.h_contract_terms, + deposit.refund_deadline, + deposit.wire_deadline, + deposit.receiver_wire_account, + done); GNUNET_PQ_cleanup_result (rs); if (GNUNET_OK != ret) break; } - PQclear (result); - return GNUNET_OK; } /** - * Select refresh sessions above @a serial_id in monotonically increasing + * Select deposits above @a serial_id in monotonically increasing * order. * * @param cls closure @@ -5086,47 +4997,76 @@ postgres_select_deposits_above_serial_id (void *cls, * @param serial_id highest serial ID to exclude (select strictly larger) * @param cb function to call on each result * @param cb_cls closure for @a cb - * @return #GNUNET_OK on success, - * #GNUNET_SYSERR on DB errors + * @return transaction status code */ -static int -postgres_select_refreshs_above_serial_id (void *cls, +static enum GNUNET_DB_QueryStatus +postgres_select_deposits_above_serial_id (void *cls, struct TALER_EXCHANGEDB_Session *session, uint64_t serial_id, - TALER_EXCHANGEDB_RefreshSessionCallback cb, + TALER_EXCHANGEDB_DepositCallback cb, void *cb_cls) { struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_uint64 (&serial_id), GNUNET_PQ_query_param_end }; - PGresult *result; - int nrows; - int i; - int ret; + struct DepositSerialContext dsc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK + }; + enum GNUNET_DB_QueryStatus qs; - result = GNUNET_PQ_exec_prepared (session->conn, - "audit_get_refresh_sessions_incr", - params); + qs = GNUNET_PQ_eval_prepared_multi_select (session->conn, + "audit_get_deposits_incr", + params, + &deposit_serial_helper_cb, + &dsc); + if (GNUNET_OK != dsc.status) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} - if (PGRES_TUPLES_OK != - PQresultStatus (result)) - { - BREAK_DB_ERR (result, session->conn); - PQclear (result); - return GNUNET_SYSERR; - } - nrows = PQntuples (result); - if (0 == nrows) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "select_refreshs_above_serial_id() returned 0 matching rows\n"); - PQclear (result); - return GNUNET_NO; - } +/** + * Closure for #refreshs_serial_helper_cb(). + */ +struct RefreshsSerialContext +{ + + /** + * Callback to call. + */ + TALER_EXCHANGEDB_RefreshSessionCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; + + /** + * Status code, set to #GNUNET_SYSERR on hard errors. + */ + int status; +}; - for (i=0;i<nrows;i++) + +/** + * Helper function to be called with the results of a SELECT statement + * that has returned @a num_results results. + * + * @param cls closure of type `struct RefreshsSerialContext` + * @param result the postgres result + * @param num_result the number of results in @a result + */ +static void +refreshs_serial_helper_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct RefreshsSerialContext *rsc = cls; + + for (unsigned int i=0;i<num_results;i++) { struct TALER_DenominationPublicKey denom_pub; struct TALER_CoinSpendPublicKeyP coin_pub; @@ -5136,7 +5076,6 @@ postgres_select_refreshs_above_serial_id (void *cls, uint16_t noreveal_index; uint64_t rowid; struct GNUNET_HashCode session_hash; - struct GNUNET_PQ_ResultSpec rs[] = { GNUNET_PQ_result_spec_rsa_public_key ("denom_pub", &denom_pub.rsa_public_key), @@ -5156,35 +5095,35 @@ postgres_select_refreshs_above_serial_id (void *cls, &session_hash), GNUNET_PQ_result_spec_end }; + int ret; + if (GNUNET_OK != GNUNET_PQ_extract_result (result, rs, i)) { GNUNET_break (0); - PQclear (result); - return GNUNET_SYSERR; + rsc->status = GNUNET_SYSERR; + return; } - ret = cb (cb_cls, - rowid, - &denom_pub, - &coin_pub, - &coin_sig, - &amount_with_fee, - num_newcoins, - noreveal_index, - &session_hash); + ret = rsc->cb (rsc->cb_cls, + rowid, + &denom_pub, + &coin_pub, + &coin_sig, + &amount_with_fee, + num_newcoins, + noreveal_index, + &session_hash); GNUNET_PQ_cleanup_result (rs); if (GNUNET_OK != ret) break; } - PQclear (result); - return GNUNET_OK; } /** - * Select refunds above @a serial_id in monotonically increasing + * Select refresh sessions above @a serial_id in monotonically increasing * order. * * @param cls closure @@ -5192,49 +5131,80 @@ postgres_select_refreshs_above_serial_id (void *cls, * @param serial_id highest serial ID to exclude (select strictly larger) * @param cb function to call on each result * @param cb_cls closure for @a cb - * @return #GNUNET_OK on success, - * #GNUNET_SYSERR on DB errors + * @return transaction status code */ -static int -postgres_select_refunds_above_serial_id (void *cls, - struct TALER_EXCHANGEDB_Session *session, - uint64_t serial_id, - TALER_EXCHANGEDB_RefundCallback cb, - void *cb_cls) +static enum GNUNET_DB_QueryStatus +postgres_select_refreshs_above_serial_id (void *cls, + struct TALER_EXCHANGEDB_Session *session, + uint64_t serial_id, + TALER_EXCHANGEDB_RefreshSessionCallback cb, + void *cb_cls) { struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_uint64 (&serial_id), GNUNET_PQ_query_param_end }; - PGresult *result; - int nrows; - int ret; + struct RefreshsSerialContext rsc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK + }; + enum GNUNET_DB_QueryStatus qs; - result = GNUNET_PQ_exec_prepared (session->conn, - "audit_get_refunds_incr", - params); - if (PGRES_TUPLES_OK != - PQresultStatus (result)) - { - BREAK_DB_ERR (result, session->conn); - PQclear (result); - return GNUNET_SYSERR; - } + qs = GNUNET_PQ_eval_prepared_multi_select (session->conn, + "audit_get_refresh_sessions_incr", + params, + &refreshs_serial_helper_cb, + &rsc); + if (GNUNET_OK != rsc.status) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} - nrows = PQntuples (result); - if (0 == nrows) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "select_refunds_above_serial_id() returned 0 matching rows\n"); - PQclear (result); - return GNUNET_NO; - } - for (int i=0;i<nrows;i++) + +/** + * Closure for #refunds_serial_helper_cb(). + */ +struct RefundsSerialContext +{ + + /** + * Callback to call. + */ + TALER_EXCHANGEDB_RefundCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; + + /** + * Status code, set to #GNUNET_SYSERR on hard errors. + */ + int status; +}; + + +/** + * Helper function to be called with the results of a SELECT statement + * that has returned @a num_results results. + * + * @param cls closure of type `struct RefundsSerialContext` + * @param result the postgres result + * @param num_result the number of results in @a result + */ +static void +refunds_serial_helper_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct RefundsSerialContext *rsc = cls; + + for (unsigned int i=0;i<num_results;i++) { struct TALER_EXCHANGEDB_Refund refund; struct TALER_DenominationPublicKey denom_pub; uint64_t rowid; - struct GNUNET_PQ_ResultSpec rs[] = { GNUNET_PQ_result_spec_auto_from_type ("merchant_pub", &refund.merchant_pub), @@ -5254,80 +5224,112 @@ postgres_select_refunds_above_serial_id (void *cls, &rowid), GNUNET_PQ_result_spec_end }; + int ret; + if (GNUNET_OK != GNUNET_PQ_extract_result (result, rs, i)) { GNUNET_break (0); - PQclear (result); - return GNUNET_SYSERR; + rsc->status = GNUNET_SYSERR; + return; } - ret = cb (cb_cls, - rowid, - &denom_pub, - &refund.coin.coin_pub, - &refund.merchant_pub, - &refund.merchant_sig, - &refund.h_contract_terms, - refund.rtransaction_id, - &refund.refund_amount); + ret = rsc->cb (rsc->cb_cls, + rowid, + &denom_pub, + &refund.coin.coin_pub, + &refund.merchant_pub, + &refund.merchant_sig, + &refund.h_contract_terms, + refund.rtransaction_id, + &refund.refund_amount); GNUNET_PQ_cleanup_result (rs); if (GNUNET_OK != ret) break; } - PQclear (result); - return GNUNET_OK; } /** - * Select inbound wire transfers into reserves_in above @a serial_id - * in monotonically increasing order. + * Select refunds above @a serial_id in monotonically increasing + * order. * * @param cls closure * @param session database connection * @param serial_id highest serial ID to exclude (select strictly larger) * @param cb function to call on each result * @param cb_cls closure for @a cb - * @return #GNUNET_OK on success, - * #GNUNET_SYSERR on DB errors + * @return transaction status code */ -static int -postgres_select_reserves_in_above_serial_id (void *cls, - struct TALER_EXCHANGEDB_Session *session, - uint64_t serial_id, - TALER_EXCHANGEDB_ReserveInCallback cb, - void *cb_cls) +static enum GNUNET_DB_QueryStatus +postgres_select_refunds_above_serial_id (void *cls, + struct TALER_EXCHANGEDB_Session *session, + uint64_t serial_id, + TALER_EXCHANGEDB_RefundCallback cb, + void *cb_cls) { struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_uint64 (&serial_id), GNUNET_PQ_query_param_end }; - PGresult *result; - result = GNUNET_PQ_exec_prepared (session->conn, - "audit_reserves_in_get_transactions_incr", - params); - if (PGRES_TUPLES_OK != - PQresultStatus (result)) - { - BREAK_DB_ERR (result, session->conn); - PQclear (result); - return GNUNET_SYSERR; - } - int nrows; - int ret; + struct RefundsSerialContext rsc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK + }; + enum GNUNET_DB_QueryStatus qs; - nrows = PQntuples (result); - if (0 == nrows) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "select_reserves_in_above_serial_id() returned 0 matching rows\n"); - PQclear (result); - return GNUNET_NO; - } + qs = GNUNET_PQ_eval_prepared_multi_select (session->conn, + "audit_get_refunds_incr", + params, + &refunds_serial_helper_cb, + &rsc); + if (GNUNET_OK != rsc.status) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} + + +/** + * Closure for #reserves_in_serial_helper_cb(). + */ +struct ReservesInSerialContext +{ - for (int i=0;i<nrows;i++) + /** + * Callback to call. + */ + TALER_EXCHANGEDB_ReserveInCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; + + /** + * Status code, set to #GNUNET_SYSERR on hard errors. + */ + int status; +}; + + +/** + * Helper function to be called with the results of a SELECT statement + * that has returned @a num_results results. + * + * @param cls closure of type `struct ReservesInSerialContext` + * @param result the postgres result + * @param num_result the number of results in @a result + */ +static void +reserves_in_serial_helper_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct ReservesInSerialContext *risc = cls; + + for (unsigned int i=0;i<num_results;i++) { struct TALER_ReservePublicKeyP reserve_pub; struct TALER_Amount credit; @@ -5336,7 +5338,6 @@ postgres_select_reserves_in_above_serial_id (void *cls, uint64_t rowid; void *wire_reference; size_t wire_reference_size; - struct GNUNET_PQ_ResultSpec rs[] = { GNUNET_PQ_result_spec_auto_from_type ("reserve_pub", &reserve_pub), @@ -5353,6 +5354,7 @@ postgres_select_reserves_in_above_serial_id (void *cls, &rowid), GNUNET_PQ_result_spec_end }; + int ret; if (GNUNET_OK != GNUNET_PQ_extract_result (result, @@ -5360,29 +5362,26 @@ postgres_select_reserves_in_above_serial_id (void *cls, i)) { GNUNET_break (0); - PQclear (result); - return GNUNET_SYSERR; + risc->status = GNUNET_SYSERR; + return; } - ret = cb (cb_cls, - rowid, - &reserve_pub, - &credit, - sender_account_details, - wire_reference, - wire_reference_size, - execution_date); + ret = risc->cb (risc->cb_cls, + rowid, + &reserve_pub, + &credit, + sender_account_details, + wire_reference, + wire_reference_size, + execution_date); GNUNET_PQ_cleanup_result (rs); if (GNUNET_OK != ret) break; } - - PQclear (result); - return GNUNET_OK; } /** - * Select withdraw operations from reserves_out above @a serial_id + * Select inbound wire transfers into reserves_in above @a serial_id * in monotonically increasing order. * * @param cls closure @@ -5390,44 +5389,76 @@ postgres_select_reserves_in_above_serial_id (void *cls, * @param serial_id highest serial ID to exclude (select strictly larger) * @param cb function to call on each result * @param cb_cls closure for @a cb - * @return #GNUNET_OK on success, - * #GNUNET_NO if no records were found - * #GNUNET_SYSERR on DB errors + * @return transaction status code */ -static int -postgres_select_reserves_out_above_serial_id (void *cls, - struct TALER_EXCHANGEDB_Session *session, - uint64_t serial_id, - TALER_EXCHANGEDB_WithdrawCallback cb, - void *cb_cls) +static enum GNUNET_DB_QueryStatus +postgres_select_reserves_in_above_serial_id (void *cls, + struct TALER_EXCHANGEDB_Session *session, + uint64_t serial_id, + TALER_EXCHANGEDB_ReserveInCallback cb, + void *cb_cls) { struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_uint64 (&serial_id), GNUNET_PQ_query_param_end }; - PGresult *result; - result = GNUNET_PQ_exec_prepared (session->conn, - "audit_get_reserves_out_incr", - params); - if (PGRES_TUPLES_OK != - PQresultStatus (result)) - { - BREAK_DB_ERR (result, session->conn); - PQclear (result); - return GNUNET_SYSERR; - } - int nrows; - int ret; + struct ReservesInSerialContext risc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK + }; + enum GNUNET_DB_QueryStatus qs; + + qs = GNUNET_PQ_eval_prepared_multi_select (session->conn, + "audit_reserves_in_get_transactions_incr", + params, + &reserves_in_serial_helper_cb, + &risc); + if (GNUNET_OK != risc.status) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} - nrows = PQntuples (result); - if (0 == nrows) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "select_reserves_out_above_serial_id() returned 0 matching rows\n"); - PQclear (result); - return GNUNET_NO; - } - for (int i=0;i<nrows;i++) + +/** + * Closure for #reserves_out_serial_helper_cb(). + */ +struct ReservesOutSerialContext +{ + + /** + * Callback to call. + */ + TALER_EXCHANGEDB_WithdrawCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; + + /** + * Status code, set to #GNUNET_SYSERR on hard errors. + */ + int status; +}; + + +/** + * Helper function to be called with the results of a SELECT statement + * that has returned @a num_results results. + * + * @param cls closure of type `struct ReservesOutSerialContext` + * @param result the postgres result + * @param num_result the number of results in @a result + */ +static void +reserves_out_serial_helper_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct ReservesOutSerialContext *rosc = cls; + + for (unsigned int i=0;i<num_results;i++) { struct GNUNET_HashCode h_blind_ev; struct TALER_DenominationPublicKey denom_pub; @@ -5437,7 +5468,6 @@ postgres_select_reserves_out_above_serial_id (void *cls, struct GNUNET_TIME_Absolute execution_date; struct TALER_Amount amount_with_fee; uint64_t rowid; - struct GNUNET_PQ_ResultSpec rs[] = { GNUNET_PQ_result_spec_auto_from_type ("h_blind_ev", &h_blind_ev), @@ -5457,89 +5487,118 @@ postgres_select_reserves_out_above_serial_id (void *cls, &rowid), GNUNET_PQ_result_spec_end }; + int ret; + if (GNUNET_OK != GNUNET_PQ_extract_result (result, rs, i)) { GNUNET_break (0); - PQclear (result); - return GNUNET_SYSERR; + rosc->status = GNUNET_SYSERR; + return; } - ret = cb (cb_cls, - rowid, - &h_blind_ev, - &denom_pub, - &denom_sig, - &reserve_pub, - &reserve_sig, - execution_date, - &amount_with_fee); + ret = rosc->cb (rosc->cb_cls, + rowid, + &h_blind_ev, + &denom_pub, + &denom_sig, + &reserve_pub, + &reserve_sig, + execution_date, + &amount_with_fee); GNUNET_PQ_cleanup_result (rs); if (GNUNET_OK != ret) break; } - - PQclear (result); - return GNUNET_OK; } /** - * Function called to select all wire transfers the exchange - * executed. + * Select withdraw operations from reserves_out above @a serial_id + * in monotonically increasing order. * * @param cls closure * @param session database connection * @param serial_id highest serial ID to exclude (select strictly larger) - * @param cb function to call for ONE unfinished item + * @param cb function to call on each result * @param cb_cls closure for @a cb - * @return #GNUNET_OK on success, - * #GNUNET_NO if there are no entries, - * #GNUNET_SYSERR on DB errors + * @return transaction status code */ -static int -postgres_select_wire_out_above_serial_id (void *cls, - struct TALER_EXCHANGEDB_Session *session, - uint64_t serial_id, - TALER_EXCHANGEDB_WireTransferOutCallback cb, - void *cb_cls) +static enum GNUNET_DB_QueryStatus +postgres_select_reserves_out_above_serial_id (void *cls, + struct TALER_EXCHANGEDB_Session *session, + uint64_t serial_id, + TALER_EXCHANGEDB_WithdrawCallback cb, + void *cb_cls) { struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_uint64 (&serial_id), GNUNET_PQ_query_param_end }; - PGresult *result; - int nrows; - int ret; + struct ReservesOutSerialContext rosc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK + }; + enum GNUNET_DB_QueryStatus qs; - result = GNUNET_PQ_exec_prepared (session->conn, - "audit_get_wire_incr", - params); - if (PGRES_TUPLES_OK != - PQresultStatus (result)) - { - BREAK_DB_ERR (result, session->conn); - PQclear (result); - return GNUNET_SYSERR; - } + qs = GNUNET_PQ_eval_prepared_multi_select (session->conn, + "audit_get_reserves_out_incr", + params, + &reserves_out_serial_helper_cb, + &rosc); + if (GNUNET_OK != rosc.status) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} - nrows = PQntuples (result); - if (0 == nrows) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "select_prepare_above_serial_id() returned 0 matching rows\n"); - PQclear (result); - return GNUNET_NO; - } - for (int i=0;i<nrows;i++) + +/** + * Closure for #wire_out_serial_helper_cb(). + */ +struct WireOutSerialContext +{ + + /** + * Callback to call. + */ + TALER_EXCHANGEDB_WireTransferOutCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; + + /** + * Status code, set to #GNUNET_SYSERR on hard errors. + */ + int status; +}; + + +/** + * Helper function to be called with the results of a SELECT statement + * that has returned @a num_results results. + * + * @param cls closure of type `struct WireOutSerialContext` + * @param result the postgres result + * @param num_result the number of results in @a result + */ +static void +wire_out_serial_helper_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct WireOutSerialContext *wosc = cls; + + for (unsigned int i=0;i<num_results;i++) { uint64_t rowid; struct GNUNET_TIME_Absolute date; struct TALER_WireTransferIdentifierRawP wtid; json_t *wire; struct TALER_Amount amount; - struct GNUNET_PQ_ResultSpec rs[] = { GNUNET_PQ_result_spec_uint64 ("wireout_uuid", &rowid), @@ -5553,6 +5612,7 @@ postgres_select_wire_out_above_serial_id (void *cls, &amount), GNUNET_PQ_result_spec_end }; + int ret; if (GNUNET_OK != GNUNET_PQ_extract_result (result, @@ -5560,73 +5620,101 @@ postgres_select_wire_out_above_serial_id (void *cls, i)) { GNUNET_break (0); - PQclear (result); - return GNUNET_SYSERR; + wosc->status = GNUNET_SYSERR; + return; } - - ret = cb (cb_cls, - rowid, - date, - &wtid, - wire, - &amount); + ret = wosc->cb (wosc->cb_cls, + rowid, + date, + &wtid, + wire, + &amount); GNUNET_PQ_cleanup_result (rs); if (GNUNET_OK != ret) break; } - - PQclear (result); - return GNUNET_OK; } /** - * Function called to select payback requests the exchange - * received, ordered by serial ID (monotonically increasing). + * Function called to select all wire transfers the exchange + * executed. * * @param cls closure * @param session database connection - * @param serial_id lowest serial ID to include (select larger or equal) + * @param serial_id highest serial ID to exclude (select strictly larger) * @param cb function to call for ONE unfinished item * @param cb_cls closure for @a cb - * @return #GNUNET_OK on success, - * #GNUNET_NO if there are no entries, - * #GNUNET_SYSERR on DB errors + * @return transaction status code */ -static int -postgres_select_payback_above_serial_id (void *cls, - struct TALER_EXCHANGEDB_Session *session, - uint64_t serial_id, - TALER_EXCHANGEDB_PaybackCallback cb, - void *cb_cls) +static enum GNUNET_DB_QueryStatus +postgres_select_wire_out_above_serial_id (void *cls, + struct TALER_EXCHANGEDB_Session *session, + uint64_t serial_id, + TALER_EXCHANGEDB_WireTransferOutCallback cb, + void *cb_cls) { struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_uint64 (&serial_id), GNUNET_PQ_query_param_end }; - PGresult *result; - result = GNUNET_PQ_exec_prepared (session->conn, - "payback_get_incr", - params); - if (PGRES_TUPLES_OK != - PQresultStatus (result)) - { - BREAK_DB_ERR (result, session->conn); - PQclear (result); - return GNUNET_SYSERR; - } - int nrows; - int ret; + struct WireOutSerialContext wosc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK + }; + enum GNUNET_DB_QueryStatus qs; - nrows = PQntuples (result); - if (0 == nrows) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "select_prepare_above_serial_id() returned 0 matching rows\n"); - PQclear (result); - return GNUNET_NO; - } - for (int i=0;i<nrows;i++) + qs = GNUNET_PQ_eval_prepared_multi_select (session->conn, + "audit_get_wire_incr", + params, + &wire_out_serial_helper_cb, + &wosc); + if (GNUNET_OK != wosc.status) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} + + +/** + * Closure for #payback_serial_helper_cb(). + */ +struct PaybackSerialContext +{ + + /** + * Callback to call. + */ + TALER_EXCHANGEDB_PaybackCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; + + /** + * Status code, set to #GNUNET_SYSERR on hard errors. + */ + int status; +}; + + +/** + * Helper function to be called with the results of a SELECT statement + * that has returned @a num_results results. + * + * @param cls closure of type `struct PaybackSerialContext` + * @param result the postgres result + * @param num_result the number of results in @a result + */ +static void +payback_serial_helper_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct PaybackSerialContext *psc = cls; + + for (unsigned int i=0;i<num_results;i++) { uint64_t rowid; struct TALER_ReservePublicKeyP reserve_pub; @@ -5659,6 +5747,7 @@ postgres_select_payback_above_serial_id (void *cls, &amount), GNUNET_PQ_result_spec_end }; + int ret; if (GNUNET_OK != GNUNET_PQ_extract_result (result, @@ -5666,74 +5755,103 @@ postgres_select_payback_above_serial_id (void *cls, i)) { GNUNET_break (0); - PQclear (result); - return GNUNET_SYSERR; + psc->status = GNUNET_SYSERR; + return; } - ret = cb (cb_cls, - rowid, - timestamp, - &amount, - &reserve_pub, - &coin, - &coin_sig, - &coin_blind); + ret = psc->cb (psc->cb_cls, + rowid, + timestamp, + &amount, + &reserve_pub, + &coin, + &coin_sig, + &coin_blind); GNUNET_PQ_cleanup_result (rs); if (GNUNET_OK != ret) break; } - - PQclear (result); - return GNUNET_OK; } /** - * Function called to select reserve close operations the aggregator - * triggered, ordered by serial ID (monotonically increasing). + * Function called to select payback requests the exchange + * received, ordered by serial ID (monotonically increasing). * * @param cls closure * @param session database connection * @param serial_id lowest serial ID to include (select larger or equal) * @param cb function to call for ONE unfinished item * @param cb_cls closure for @a cb - * @return #GNUNET_OK on success, - * #GNUNET_NO if there are no entries, - * #GNUNET_SYSERR on DB errors + * @return transaction status code */ -static int -postgres_select_reserve_closed_above_serial_id (void *cls, - struct TALER_EXCHANGEDB_Session *session, - uint64_t serial_id, - TALER_EXCHANGEDB_ReserveClosedCallback cb, - void *cb_cls) +static enum GNUNET_DB_QueryStatus +postgres_select_payback_above_serial_id (void *cls, + struct TALER_EXCHANGEDB_Session *session, + uint64_t serial_id, + TALER_EXCHANGEDB_PaybackCallback cb, + void *cb_cls) { struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_uint64 (&serial_id), GNUNET_PQ_query_param_end }; - PGresult *result; - result = GNUNET_PQ_exec_prepared (session->conn, - "reserves_close_get_incr", - params); - if (PGRES_TUPLES_OK != - PQresultStatus (result)) - { - BREAK_DB_ERR (result, session->conn); - PQclear (result); - return GNUNET_SYSERR; - } - int nrows; - int ret; + struct PaybackSerialContext psc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK + }; + enum GNUNET_DB_QueryStatus qs; + + qs = GNUNET_PQ_eval_prepared_multi_select (session->conn, + "payback_get_incr", + params, + &payback_serial_helper_cb, + &psc); + if (GNUNET_OK != psc.status) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} - nrows = PQntuples (result); - if (0 == nrows) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "select_reserve_closed_above_serial_id() returned 0 matching rows\n"); - PQclear (result); - return GNUNET_NO; - } - for (int i=0;i<nrows;i++) + +/** + * Closure for #reserve_closed_serial_helper_cb(). + */ +struct ReserveClosedSerialContext +{ + + /** + * Callback to call. + */ + TALER_EXCHANGEDB_ReserveClosedCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; + + /** + * Status code, set to #GNUNET_SYSERR on hard errors. + */ + int status; +}; + + +/** + * Helper function to be called with the results of a SELECT statement + * that has returned @a num_results results. + * + * @param cls closure of type `struct ReserveClosedSerialContext` + * @param result the postgres result + * @param num_result the number of results in @a result + */ +static void +reserve_closed_serial_helper_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct ReserveClosedSerialContext *rcsc = cls; + + for (unsigned int i=0;i<num_results;i++) { uint64_t rowid; struct TALER_ReservePublicKeyP reserve_pub; @@ -5759,6 +5877,7 @@ postgres_select_reserve_closed_above_serial_id (void *cls, &closing_fee), GNUNET_PQ_result_spec_end }; + int ret; if (GNUNET_OK != GNUNET_PQ_extract_result (result, @@ -5766,24 +5885,61 @@ postgres_select_reserve_closed_above_serial_id (void *cls, i)) { GNUNET_break (0); - PQclear (result); - return GNUNET_SYSERR; + rcsc->status = GNUNET_SYSERR; + return; } - ret = cb (cb_cls, - rowid, - execution_date, - &amount_with_fee, - &closing_fee, - &reserve_pub, - receiver_account, - &wtid); + ret = rcsc->cb (rcsc->cb_cls, + rowid, + execution_date, + &amount_with_fee, + &closing_fee, + &reserve_pub, + receiver_account, + &wtid); GNUNET_PQ_cleanup_result (rs); if (GNUNET_OK != ret) break; } +} - PQclear (result); - return GNUNET_OK; + +/** + * Function called to select reserve close operations the aggregator + * triggered, ordered by serial ID (monotonically increasing). + * + * @param cls closure + * @param session database connection + * @param serial_id lowest serial ID to include (select larger or equal) + * @param cb function to call for ONE unfinished item + * @param cb_cls closure for @a cb + * @return transaction status code + */ +static enum GNUNET_DB_QueryStatus +postgres_select_reserve_closed_above_serial_id (void *cls, + struct TALER_EXCHANGEDB_Session *session, + uint64_t serial_id, + TALER_EXCHANGEDB_ReserveClosedCallback cb, + void *cb_cls) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_uint64 (&serial_id), + GNUNET_PQ_query_param_end + }; + struct ReserveClosedSerialContext rcsc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK + }; + enum GNUNET_DB_QueryStatus qs; + + qs = GNUNET_PQ_eval_prepared_multi_select (session->conn, + "reserves_close_get_incr", + params, + &reserve_closed_serial_helper_cb, + &rcsc); + if (GNUNET_OK != rcsc.status) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; } @@ -5828,32 +5984,13 @@ postgres_insert_payback_request (void *cls, GNUNET_PQ_query_param_auto_from_type (h_blind_ev), GNUNET_PQ_query_param_end }; - int ret; enum GNUNET_DB_QueryStatus qs; /* check if the coin is already known */ - // FIXME: #5010! - ret = get_known_coin (cls, - session, - &coin->coin_pub, - NULL); - if (GNUNET_SYSERR == ret) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - if (GNUNET_NO == ret) /* if not, insert it */ - { - qs = insert_known_coin (cls, - session, - coin); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - } - + if (0 > (qs = ensure_coin_known (cls, + session, + coin))) + return qs; /* now store actual payback information */ qs = GNUNET_PQ_eval_prepared_non_select (session->conn, "payback_insert", @@ -5968,11 +6105,9 @@ postgres_insert_denomination_revocation (void *cls, * @param denom_pub_hash hash of the revoked denomination key * @param[out] master_sig signature affirming the revocation * @param[out] rowid row where the information is stored - * @return #GNUNET_OK on success, - * #GNUNET_NO no such entry exists - * #GNUNET_SYSERR on DB errors + * @return transaction status code */ -static int +static enum GNUNET_DB_QueryStatus postgres_get_denomination_revocation (void *cls, struct TALER_EXCHANGEDB_Session *session, const struct GNUNET_HashCode *denom_pub_hash, @@ -5988,42 +6123,11 @@ postgres_get_denomination_revocation (void *cls, GNUNET_PQ_result_spec_uint64 ("denom_revocations_serial_id", rowid), GNUNET_PQ_result_spec_end }; - PGresult *result; - int nrows; - result = GNUNET_PQ_exec_prepared (session->conn, - "denomination_revocation_get", - params); - if (PGRES_TUPLES_OK != - PQresultStatus (result)) - { - BREAK_DB_ERR (result, session->conn); - PQclear (result); - return GNUNET_SYSERR; - } - nrows = PQntuples (result); - if (0 == nrows) - { - /* no matches found */ - PQclear (result); - return GNUNET_NO; - } - if (1 != nrows) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - if (GNUNET_OK != - GNUNET_PQ_extract_result (result, - rs, - 0)) - { - PQclear (result); - GNUNET_break (0); - return GNUNET_SYSERR; - } - PQclear (result); - return GNUNET_OK; + return GNUNET_PQ_eval_prepared_singleton_select (session->conn, + "denomination_revocation_get", + params, + rs); } @@ -6143,7 +6247,7 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) plugin->select_reserves_out_above_serial_id = &postgres_select_reserves_out_above_serial_id; plugin->select_wire_out_above_serial_id = &postgres_select_wire_out_above_serial_id; plugin->select_payback_above_serial_id = &postgres_select_payback_above_serial_id; - plugin->select_reserve_closed_above_serial_id = &postgres_select_reserve_closed_above_serial_id; + plugin->select_reserve_closed_above_serial_id = &postgres_select_reserve_closed_above_serial_id; plugin->insert_payback_request = &postgres_insert_payback_request; plugin->get_reserve_by_h_blind = &postgres_get_reserve_by_h_blind; plugin->insert_denomination_revocation = &postgres_insert_denomination_revocation; |