aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/taler_mintdb_plugin.h54
-rw-r--r--src/mint/taler-mint-aggregator.c21
-rw-r--r--src/mintdb/plugin_mintdb_postgres.c295
3 files changed, 278 insertions, 92 deletions
diff --git a/src/include/taler_mintdb_plugin.h b/src/include/taler_mintdb_plugin.h
index 65c694a72..11b305839 100644
--- a/src/include/taler_mintdb_plugin.h
+++ b/src/include/taler_mintdb_plugin.h
@@ -530,22 +530,24 @@ struct TALER_MINTDB_Session;
* corresponding wire transaction.
*
* @param cls closure
- * @param id transaction ID (used as future `min_id` to avoid
- * iterating over transactions more than once)
+ * @param rowid unique ID for the deposit in our DB, used for marking
+ * it as 'tiny' or 'done'
+ * @param merchant_pub public key of the merchant
+ * @param coin_pub public key of the coin
* @param amount_with_fee amount that was deposited including fee
* @param deposit_fee amount the mint gets to keep as transaction fees
* @param transaction_id unique transaction ID chosen by the merchant
* @param h_contract hash of the contract between merchant and customer
* @param wire_deadline by which the merchant adviced that he would like the
* wire transfer to be executed
- * @param wire wire details for the merchant
+ * @param wire wire details for the merchant, NULL from iterate_matching_deposits()
* @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
*/
typedef int
(*TALER_MINTDB_DepositIterator)(void *cls,
- // unsigned long long rowid, /* ? */
- // May also need/want Merchant pub!?
- uint64_t id,
+ unsigned long long rowid,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_CoinSpendPublicKeyP *coin_pub,
const struct TALER_Amount *amount_with_fee,
const struct TALER_Amount *deposit_fee,
uint64_t transaction_id,
@@ -940,10 +942,10 @@ struct TALER_MINTDB_Plugin
* #GNUNET_SYSERR on error
*/
int
- (*iterate_ready_deposits) (void *cls,
- struct TALER_MINTDB_Session *session,
- TALER_MINTDB_DepositIterator deposit_cb,
- void *deposit_cb_cls);
+ (*get_ready_deposit) (void *cls,
+ struct TALER_MINTDB_Session *session,
+ TALER_MINTDB_DepositIterator deposit_cb,
+ void *deposit_cb_cls);
/**
@@ -953,9 +955,10 @@ struct TALER_MINTDB_Plugin
* @param cls the @e cls of this struct with the plugin-specific state
* @param session connection to the database
* @param h_wire destination of the wire transfer
- * @param FIXME: do we also need merchant_pub here?
+ * @param merchant_pub public key of the merchant
* @param deposit_cb function to call for each deposit
* @param deposit_cb_cls closure for @a deposit_cb
+ * @param limit maximum number of matching deposits to return
* @return number of rows processed, 0 if none exist,
* #GNUNET_SYSERR on error
*/
@@ -963,33 +966,10 @@ struct TALER_MINTDB_Plugin
(*iterate_matching_deposits) (void *cls,
struct TALER_MINTDB_Session *session,
const struct GNUNET_HashCode *h_wire,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
TALER_MINTDB_DepositIterator deposit_cb,
- void *deposit_cb_cls);
-
-
- /**
- * Obtain information about deposits. Iterates over all deposits
- * above a certain ID. Use a @a min_id of 0 to start at the beginning.
- * This operation is executed in its own transaction in transaction
- * mode "REPEATABLE READ", i.e. we should only see valid deposits.
- *
- * @param cls the @e cls of this struct with the plugin-specific state
- * @param session connection to the database
- * @param min_id deposit to start at
- * @param limit maximum number of transactions to fetch
- * @param deposit_cb function to call for each deposit
- * @param deposit_cb_cls closure for @a deposit_cb
- * @return number of rows processed, 0 if none exist,
- * #GNUNET_SYSERR on error
- * @deprecated this is likely dead
- */
- int
- (*iterate_deposits) (void *cls,
- struct TALER_MINTDB_Session *session,
- uint64_t min_id,
- uint32_t limit,
- TALER_MINTDB_DepositIterator deposit_cb,
- void *deposit_cb_cls);
+ void *deposit_cb_cls,
+ uint32_t limit);
/**
diff --git a/src/mint/taler-mint-aggregator.c b/src/mint/taler-mint-aggregator.c
index 70a68d007..ee0f6ab22 100644
--- a/src/mint/taler-mint-aggregator.c
+++ b/src/mint/taler-mint-aggregator.c
@@ -148,8 +148,9 @@ mint_serve_process_config (const char *mint_directory)
* with the goal of executing the corresponding wire transaction.
*
* @param cls closure
- * @param id transaction ID (used as future `min_id` to avoid
- * iterating over transactions more than once)
+ * @param row_id identifies database entry
+ * @param merchant_pub public key of the merchant
+ * @param coin_pub public key of the coin
* @param amount_with_fee amount that was deposited including fee
* @param deposit_fee amount the mint gets to keep as transaction fees
* @param transaction_id unique transaction ID chosen by the merchant
@@ -161,7 +162,9 @@ mint_serve_process_config (const char *mint_directory)
*/
static int
deposit_cb (void *cls,
- uint64_t id,
+ unsigned long long row_id,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_CoinSpendPublicKeyP *coin_pub,
const struct TALER_Amount *amount_with_fee,
const struct TALER_Amount *deposit_fee,
uint64_t transaction_id,
@@ -204,12 +207,12 @@ run (void *cls,
*global_ret = GNUNET_SYSERR;
return;
}
- ret = db_plugin->iterate_deposits (db_plugin->cls,
- session,
- 0 /* FIXME: remove? */,
- 128 /* FIXME: make configurable? */,
- &deposit_cb,
- NULL);
+ ret = db_plugin->get_ready_deposit (db_plugin->cls,
+ session,
+ &deposit_cb,
+ NULL);
+ // FIXME: handle 0 == ret...
+
if (GNUNET_OK != ret)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
diff --git a/src/mintdb/plugin_mintdb_postgres.c b/src/mintdb/plugin_mintdb_postgres.c
index e9a9466b2..5ebf8dc7b 100644
--- a/src/mintdb/plugin_mintdb_postgres.c
+++ b/src/mintdb/plugin_mintdb_postgres.c
@@ -425,7 +425,7 @@ postgres_create_tables (void *cls,
/* This table contains the wire transfers the mint is supposed to
execute to transmit funds to the merchants (and manage refunds). */
SQLEXEC("CREATE TABLE IF NOT EXISTS deposits "
- "(serial_id BIGSERIAL"
+ "(serial_id BIGSERIAL PRIMARY KEY"
",coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)"
",denom_pub BYTEA NOT NULL REFERENCES denominations (pub)"
",denom_sig BYTEA NOT NULL"
@@ -444,6 +444,8 @@ postgres_create_tables (void *cls,
",h_wire BYTEA NOT NULL CHECK (LENGTH(h_wire)=64)"
",coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64)"
",wire TEXT NOT NULL"
+ ",tiny BOOLEAN NOT NULL DEFAULT false"
+ ",done BOOLEAN NOT NULL DEFAULT false"
")");
/* Index for get_deposit statement on coin_pub, transaction_id and merchant_pub */
SQLEXEC_INDEX("CREATE INDEX deposits_coin_pub_index "
@@ -899,8 +901,8 @@ postgres_prepare (PGconn *db_conn)
" )",
5, NULL);
- /* Used in #postgres_iterate_deposits() */
- PREPARE ("deposits_iterate",
+ /* Used in #postgres_get_ready_deposit() */
+ PREPARE ("deposits_get_ready",
"SELECT"
" serial_id"
",amount_with_fee_val"
@@ -913,11 +915,50 @@ postgres_prepare (PGconn *db_conn)
",transaction_id"
",h_contract"
",wire"
+ ",merchant_pub"
+ ",coin_pub"
" FROM deposits"
- " WHERE serial_id>=$1"
- " ORDER BY serial_id ASC"
- " LIMIT $2;",
- 2, NULL);
+ " WHERE"
+ " tiny=false AND"
+ " done=false"
+ " ORDER BY execution_time ASC"
+ " LIMIT 1;",
+ 0, NULL);
+ /* Used in #postgres_iterate_matching_deposits() */
+ PREPARE ("deposits_iterate_matching",
+ "SELECT"
+ " serial_id"
+ ",amount_with_fee_val"
+ ",amount_with_fee_frac"
+ ",amount_with_fee_curr"
+ ",deposit_fee_val"
+ ",deposit_fee_frac"
+ ",deposit_fee_curr"
+ ",wire_deadline"
+ ",transaction_id"
+ ",h_contract"
+ ",coin_pub"
+ " FROM deposits"
+ " WHERE"
+ " merchant_pub=$1 AND"
+ " h_wire=$2 AND"
+ " done=false"
+ " ORDER BY execution_time ASC"
+ " LIMIT $3",
+ 3, NULL);
+ /* Used in #postgres_mark_deposit_tiny() */
+ PREPARE ("mark_deposit_tiny",
+ "UPDATE deposits"
+ " SET tiny=true"
+ " WHERE serial_id=$1",
+ 1, NULL);
+ /* Used in #postgres_mark_deposit_done() */
+ PREPARE ("mark_deposit_done",
+ "UPDATE deposits"
+ " SET done=true"
+ " WHERE serial_id=$1",
+ 1, NULL);
+
/* Used in #postgres_get_coin_transactions() to obtain information
about how a coin has been spend with /deposit requests. */
PREPARE ("get_deposit_with_coin_pub",
@@ -2039,82 +2080,239 @@ postgres_have_deposit (void *cls,
/**
- * Obtain information about deposits. Iterates over all deposits
- * above a certain ID. Use a @a min_id of 0 to start at the beginning.
- * This operation is executed in its own transaction in transaction
- * mode "REPEATABLE READ", i.e. we should only see valid deposits.
+ * Mark a deposit as tiny, thereby declaring that it cannot be
+ * executed by itself and should no longer be returned by
+ * @e iterate_ready_deposits()
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param session connection to the database
- * @param min_id deposit to start at
- * @param limit maximum number of transactions to fetch
- * @param deposit_cb function to call for each deposit
+ * @param deposit_rowid identifies the deposit row to modify
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
+ */
+static int
+postgres_mark_deposit_tiny (void *cls,
+ struct TALER_MINTDB_Session *session,
+ unsigned long long rowid)
+{
+ uint64_t serial_id = rowid;
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_uint64 (&serial_id),
+ TALER_PQ_query_param_end
+ };
+ PGresult *result;
+
+ result = TALER_PQ_exec_prepared (session->conn,
+ "mark_deposit_tiny",
+ params);
+ if (PGRES_COMMAND_OK !=
+ PQresultStatus (result))
+ {
+ BREAK_DB_ERR (result);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ PQclear (result);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Mark a deposit as done, thereby declaring that it cannot be
+ * executed at all anymore, and should no longer be returned by
+ * @e iterate_ready_deposits() or @e iterate_matching_deposits().
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param session connection to the database
+ * @param deposit_rowid identifies the deposit row to modify
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
+ */
+static int
+postgres_mark_deposit_done (void *cls,
+ struct TALER_MINTDB_Session *session,
+ unsigned long long rowid)
+{
+ uint64_t serial_id = rowid;
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_uint64 (&serial_id),
+ TALER_PQ_query_param_end
+ };
+ PGresult *result;
+
+ result = TALER_PQ_exec_prepared (session->conn,
+ "mark_deposit_done",
+ params);
+ if (PGRES_COMMAND_OK !=
+ PQresultStatus (result))
+ {
+ BREAK_DB_ERR (result);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ PQclear (result);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Obtain information about deposits that are ready to be executed.
+ * Such deposits must not be marked as "tiny" or "done", and the
+ * execution time must be in the past.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param session connection to the database
+ * @param deposit_cb function to call for ONE such deposit
* @param deposit_cb_cls closure for @a deposit_cb
* @return number of rows processed, 0 if none exist,
* #GNUNET_SYSERR on error
*/
static int
-postgres_iterate_deposits (void *cls,
- struct TALER_MINTDB_Session *session,
- uint64_t min_id,
- uint32_t limit,
- TALER_MINTDB_DepositIterator deposit_cb,
- void *deposit_cb_cls)
+postgres_get_ready_deposit (void *cls,
+ struct TALER_MINTDB_Session *session,
+ TALER_MINTDB_DepositIterator deposit_cb,
+ void *deposit_cb_cls)
{
struct TALER_PQ_QueryParam params[] = {
- TALER_PQ_query_param_uint64 (&min_id),
- TALER_PQ_query_param_uint32 (&limit),
TALER_PQ_query_param_end
};
PGresult *result;
- unsigned int i;
unsigned int n;
+ int ret;
- if (GNUNET_OK !=
- postgres_start (cls, session))
- return GNUNET_SYSERR;
- result = PQexec (session->conn,
- "SET TRANSACTION REPEATABLE READ");
- if (PGRES_COMMAND_OK !=
+ result = TALER_PQ_exec_prepared (session->conn,
+ "deposits_get_ready",
+ params);
+ if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- TALER_LOG_ERROR ("Failed to set transaction to REPEATABL EREAD: %s\n",
- PQresultErrorMessage (result));
- GNUNET_break (0);
+ BREAK_DB_ERR (result);
PQclear (result);
return GNUNET_SYSERR;
}
+ if (0 == (n = PQntuples (result)))
+ {
+ PQclear (result);
+ return 0;
+ }
+ GNUNET_break (1 == n);
+ {
+ struct TALER_Amount amount_with_fee;
+ struct TALER_Amount deposit_fee;
+ struct GNUNET_TIME_Absolute wire_deadline;
+ struct GNUNET_HashCode h_contract;
+ struct TALER_MerchantPublicKeyP merchant_pub;
+ struct TALER_CoinSpendPublicKeyP coin_pub;
+ uint64_t transaction_id;
+ uint64_t serial_id;
+ json_t *wire;
+ struct TALER_PQ_ResultSpec rs[] = {
+ TALER_PQ_result_spec_uint64 ("serial_id",
+ &serial_id),
+ TALER_PQ_result_spec_uint64 ("transaction_id",
+ &transaction_id),
+ TALER_PQ_result_spec_amount ("amount_with_fee",
+ &amount_with_fee),
+ TALER_PQ_result_spec_amount ("deposit_fee",
+ &deposit_fee),
+ TALER_PQ_result_spec_absolute_time ("wire_deadline",
+ &wire_deadline),
+ TALER_PQ_result_spec_auto_from_type ("h_contract",
+ &h_contract),
+ TALER_PQ_result_spec_auto_from_type ("merchant_pub",
+ &merchant_pub),
+ TALER_PQ_result_spec_auto_from_type ("coin_pub",
+ &coin_pub),
+ TALER_PQ_result_spec_json ("wire",
+ &wire),
+ TALER_PQ_result_spec_end
+ };
+ if (GNUNET_OK !=
+ TALER_PQ_extract_result (result, rs, 0))
+ {
+ GNUNET_break (0);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ ret = deposit_cb (deposit_cb_cls,
+ serial_id,
+ &merchant_pub,
+ &coin_pub,
+ &amount_with_fee,
+ &deposit_fee,
+ transaction_id,
+ &h_contract,
+ wire_deadline,
+ wire);
+ TALER_PQ_cleanup_result (rs);
+ PQclear (result);
+ }
+ return (GNUNET_OK == ret) ? 1 : 0;
+}
+
+
+/**
+ * Obtain information about other pending deposits for the same
+ * destination. Those deposits must not already be "done".
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param session connection to the database
+ * @param h_wire destination of the wire transfer
+ * @param merchant_pub public key of the merchant
+ * @param deposit_cb function to call for each deposit
+ * @param deposit_cb_cls closure for @a deposit_cb
+ * @param limit maximum number of matching deposits to return
+ * @return number of rows processed, 0 if none exist,
+ * #GNUNET_SYSERR on error
+ */
+static int
+postgres_iterate_matching_deposits (void *cls,
+ struct TALER_MINTDB_Session *session,
+ const struct GNUNET_HashCode *h_wire,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ TALER_MINTDB_DepositIterator deposit_cb,
+ void *deposit_cb_cls,
+ uint32_t limit)
+{
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_auto_from_type (merchant_pub),
+ TALER_PQ_query_param_auto_from_type (h_wire),
+ TALER_PQ_query_param_uint32 (&limit),
+ TALER_PQ_query_param_end
+ };
+ PGresult *result;
+ unsigned int i;
+ unsigned int n;
result = TALER_PQ_exec_prepared (session->conn,
- "deposits_iterate",
+ "deposits_iterate_matching",
params);
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
BREAK_DB_ERR (result);
PQclear (result);
- postgres_rollback (cls, session);
return GNUNET_SYSERR;
}
if (0 == (n = PQntuples (result)))
{
PQclear (result);
- postgres_rollback (cls, session);
return 0;
}
+ if (n > limit)
+ n = limit;
for (i=0;i<n;i++)
{
struct TALER_Amount amount_with_fee;
struct TALER_Amount deposit_fee;
struct GNUNET_TIME_Absolute wire_deadline;
struct GNUNET_HashCode h_contract;
- json_t *wire;
+ struct TALER_MerchantPublicKeyP merchant_pub;
+ struct TALER_CoinSpendPublicKeyP coin_pub;
uint64_t transaction_id;
- uint64_t id;
+ uint64_t serial_id;
int ret;
struct TALER_PQ_ResultSpec rs[] = {
- TALER_PQ_result_spec_uint64 ("id",
- &id),
+ TALER_PQ_result_spec_uint64 ("serial_id",
+ &serial_id),
TALER_PQ_result_spec_uint64 ("transaction_id",
&transaction_id),
TALER_PQ_result_spec_amount ("amount_with_fee",
@@ -2125,8 +2323,10 @@ postgres_iterate_deposits (void *cls,
&wire_deadline),
TALER_PQ_result_spec_auto_from_type ("h_contract",
&h_contract),
- TALER_PQ_result_spec_json ("wire",
- &wire),
+ TALER_PQ_result_spec_auto_from_type ("merchant_pub",
+ &merchant_pub),
+ TALER_PQ_result_spec_auto_from_type ("coin_pub",
+ &coin_pub),
TALER_PQ_result_spec_end
};
if (GNUNET_OK !=
@@ -2134,23 +2334,23 @@ postgres_iterate_deposits (void *cls,
{
GNUNET_break (0);
PQclear (result);
- postgres_rollback (cls, session);
return GNUNET_SYSERR;
}
ret = deposit_cb (deposit_cb_cls,
- id,
+ serial_id,
+ &merchant_pub,
+ &coin_pub,
&amount_with_fee,
&deposit_fee,
transaction_id,
&h_contract,
wire_deadline,
- wire);
+ NULL);
TALER_PQ_cleanup_result (rs);
PQclear (result);
if (GNUNET_OK != ret)
break;
}
- postgres_rollback (cls, session);
return i;
}
@@ -3838,7 +4038,10 @@ libtaler_plugin_mintdb_postgres_init (void *cls)
plugin->get_reserve_history = &postgres_get_reserve_history;
plugin->free_reserve_history = &common_free_reserve_history;
plugin->have_deposit = &postgres_have_deposit;
- plugin->iterate_deposits = &postgres_iterate_deposits;
+ plugin->mark_deposit_tiny = &postgres_mark_deposit_tiny;
+ plugin->mark_deposit_done = &postgres_mark_deposit_done;
+ plugin->get_ready_deposit = &postgres_get_ready_deposit;
+ plugin->iterate_matching_deposits = &postgres_iterate_matching_deposits;
plugin->insert_deposit = &postgres_insert_deposit;
plugin->get_refresh_session = &postgres_get_refresh_session;
plugin->create_refresh_session = &postgres_create_refresh_session;