diff options
author | Christian Grothoff <christian@grothoff.org> | 2021-09-05 15:25:46 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2021-09-05 15:25:57 +0200 |
commit | ae8d481e1ce9f694a42619809d2c9b6e6acf3497 (patch) | |
tree | 1b0554139c53f7dde177d5cd74a9b3800b3adb33 /src/exchangedb | |
parent | adc6c53b5c7e08ff6eba180f872a8a8f8f94cd65 (diff) |
implement taler-exchange-transfer DB sharding logic
Diffstat (limited to 'src/exchangedb')
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 123 | ||||
-rw-r--r-- | src/exchangedb/test_exchangedb.c | 6 |
2 files changed, 100 insertions, 29 deletions
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index d66370a25..817c1a18c 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -1188,11 +1188,12 @@ prepare_statements (struct PostgresClosure *pg) ",type" ",buf" " FROM prewire" - " WHERE finished=FALSE" + " WHERE prewire_uuid >= $1" + " AND finished=FALSE" " AND failed=FALSE" " ORDER BY prewire_uuid ASC" - " LIMIT 1;", - 0), + " LIMIT $2;", + 2), /* Used in #postgres_select_deposits_missing_wire */ GNUNET_PQ_make_prepare ("deposits_get_overdue", "SELECT" @@ -6985,51 +6986,115 @@ postgres_wire_prepare_data_mark_failed ( /** + * Closure for #prewire_cb(). + */ +struct PrewireContext +{ + /** + * Function to call on each result. + */ + TALER_EXCHANGEDB_WirePreparationIterator cb; + + /** + * Closure for @a cb. + */ + void *cb_cls; + + /** + * #GNUNET_OK if everything went fine. + */ + enum GNUNET_GenericReturnValue status; +}; + + +/** + * Invoke the callback for each result. + * + * @param cls a `struct MissingWireContext *` + * @param result SQL result + * @param num_results number of rows in @a result + */ +static void +prewire_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct PrewireContext *pc = cls; + + for (unsigned int i = 0; i < num_results; i++) + { + uint64_t prewire_uuid; + char *type; + void *buf = NULL; + size_t buf_size; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("prewire_uuid", + &prewire_uuid), + GNUNET_PQ_result_spec_string ("type", + &type), + GNUNET_PQ_result_spec_variable_size ("buf", + &buf, + &buf_size), + GNUNET_PQ_result_spec_end + }; + + if (GNUNET_OK != + GNUNET_PQ_extract_result (result, + rs, + i)) + { + GNUNET_break (0); + pc->status = GNUNET_SYSERR; + return; + } + pc->cb (pc->cb_cls, + prewire_uuid, + type, + buf, + buf_size); + GNUNET_PQ_cleanup_result (rs); + } +} + + +/** * Function called to get an unfinished wire transfer * preparation data. Fetches at most one item. * * @param cls closure + * @param start_row offset to query table at + * @param limit maximum number of results to return * @param cb function to call for ONE unfinished item * @param cb_cls closure for @a cb * @return transaction status code */ static enum GNUNET_DB_QueryStatus postgres_wire_prepare_data_get (void *cls, + uint64_t start_row, + uint64_t limit, TALER_EXCHANGEDB_WirePreparationIterator cb, void *cb_cls) { struct PostgresClosure *pg = cls; - enum GNUNET_DB_QueryStatus qs; struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_uint64 (&start_row), + GNUNET_PQ_query_param_uint64 (&limit), GNUNET_PQ_query_param_end }; - uint64_t prewire_uuid; - char *type; - void *buf = NULL; - size_t buf_size; - struct GNUNET_PQ_ResultSpec rs[] = { - GNUNET_PQ_result_spec_uint64 ("prewire_uuid", - &prewire_uuid), - GNUNET_PQ_result_spec_string ("type", - &type), - GNUNET_PQ_result_spec_variable_size ("buf", - &buf, - &buf_size), - GNUNET_PQ_result_spec_end + struct PrewireContext pc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK }; + enum GNUNET_DB_QueryStatus qs; - qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, - "wire_prepare_data_get", - params, - rs); - if (0 >= qs) - return qs; - cb (cb_cls, - prewire_uuid, - type, - buf, - buf_size); - GNUNET_PQ_cleanup_result (rs); + qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, + "wire_prepare_data_get", + params, + &prewire_cb, + &pc); + if (GNUNET_OK != pc.status) + return GNUNET_DB_STATUS_HARD_ERROR; return qs; } diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index 8478fac0d..b332cd6d2 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -117,6 +117,8 @@ test_wire_prepare (void) { FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != plugin->wire_prepare_data_get (plugin->cls, + 0, + 1, &dead_prepare_cb, NULL)); FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != @@ -126,10 +128,14 @@ test_wire_prepare (void) 11)); FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != plugin->wire_prepare_data_get (plugin->cls, + 0, + 1, &mark_prepare_cb, NULL)); FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != plugin->wire_prepare_data_get (plugin->cls, + 0, + 1, &dead_prepare_cb, NULL)); return GNUNET_OK; |