aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2024-12-01 15:46:09 +0100
committerChristian Grothoff <christian@grothoff.org>2024-12-01 15:46:09 +0100
commitf89d3f3698ed52d0dd91fc38cce49896c9aac56c (patch)
tree696e8040387227b610e1cffc85f55afd918f3301
parent2fdfd1fc182f2645636eb3d253abf08fe5ccc606 (diff)
start to work on #9303
-rw-r--r--debian/etc-taler-auditor/taler-auditor/conf.d/auditor-system.conf13
-rw-r--r--src/exchange/taler-exchange-aggregator.c62
2 files changed, 43 insertions, 32 deletions
diff --git a/debian/etc-taler-auditor/taler-auditor/conf.d/auditor-system.conf b/debian/etc-taler-auditor/taler-auditor/conf.d/auditor-system.conf
new file mode 100644
index 000000000..0cc933309
--- /dev/null
+++ b/debian/etc-taler-auditor/taler-auditor/conf.d/auditor-system.conf
@@ -0,0 +1,13 @@
+# Configuration settings for system parameters of the auditor.
+#
+# Read secret sections into configuration, but only
+# if we have permission to do so.
+@inline-secret@ auditordb-postgres ../secrets/auditor-db.secret.conf
+
+[auditor]
+
+# Only supported database is Postgres right now.
+DATABASE = postgres
+
+SERVE = unix
+UNIXPATH_MODE = 666
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index eb117800e..79257449f 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -119,13 +119,6 @@ struct AggregationUnit
uint64_t requirement_row;
/**
- * Set to #GNUNET_OK during transient checking
- * while everything is OK. Otherwise see return
- * value of #do_aggregate().
- */
- enum GNUNET_GenericReturnValue ret;
-
- /**
* Do we have an entry in the transient table for
* this aggregation?
*/
@@ -515,6 +508,24 @@ fail_aggregation (struct AggregationUnit *au)
/**
+ * Run the next task with the given shard @a s.
+ *
+ * @param s shard to run, NULL to run more drain jobs
+ */
+static void
+run_task_with_shard (struct Shard *s)
+{
+ GNUNET_assert (NULL == task);
+ if (NULL == s)
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ else
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ s);
+}
+
+
+/**
* The aggregation process failed with a serialization
* issue. Rollback the transaction and try again.
*
@@ -527,9 +538,7 @@ rollback_aggregation (struct AggregationUnit *au)
cleanup_au (au);
db_plugin->rollback (db_plugin->cls);
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
+ run_task_with_shard (s);
}
@@ -543,9 +552,11 @@ commit_aggregation (struct AggregationUnit *au)
{
struct Shard *s = au->shard;
- cleanup_au (au);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Committing aggregation result\n");
+ "Committing aggregation result over %s to %s\n",
+ TALER_amount2s (&au->final_amount),
+ au->payto_uri.full_payto);
+ cleanup_au (au);
/* Now we can finally commit the overall transaction, as we are
again consistent if all of this passes. */
@@ -555,9 +566,7 @@ commit_aggregation (struct AggregationUnit *au)
/* try again */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Serialization issue on commit; trying again later!\n");
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
+ run_task_with_shard (s);
return;
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
@@ -569,9 +578,7 @@ commit_aggregation (struct AggregationUnit *au)
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Commit complete, going again\n");
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
+ run_task_with_shard (s);
return;
default:
GNUNET_break (0);
@@ -597,8 +604,7 @@ commit_to_transient (struct AggregationUnit *au)
enum GNUNET_DB_QueryStatus qs;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Not ready for wire transfer (%d/%s)\n",
- qs,
+ "Not ready for wire transfer (%s)\n",
TALER_amount2s (&au->final_amount));
if (au->have_transient)
qs = db_plugin->update_aggregation_transient (db_plugin->cls,
@@ -898,7 +904,7 @@ evaluate_lrs (
if (NULL == requirement)
{
TALER_KYCLOGIC_rules_free (lrs);
- commit_aggregation (au);
+ trigger_wire_transfer (au);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -1166,9 +1172,7 @@ run_aggregation (void *cls)
case GNUNET_DB_STATUS_SOFT_ERROR:
cleanup_au (au);
db_plugin->rollback (db_plugin->cls);
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
+ run_task_with_shard (s);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
{
@@ -1306,7 +1310,7 @@ run_shard (void *cls)
* @param total amount aggregated so far
* @return true to continue to iterate
*/
-static void
+static bool
handle_transient_cb (
void *cls,
const struct TALER_FullPayto payto_uri,
@@ -1316,11 +1320,6 @@ handle_transient_cb (
{
struct AggregationUnit *au = cls;
- if (GNUNET_OK != au->ret)
- {
- GNUNET_break (0);
- return false;
- }
au->payto_uri.full_payto
= GNUNET_strdup (payto_uri.full_payto);
TALER_full_payto_hash (payto_uri,
@@ -1405,12 +1404,11 @@ drain_kyc_alerts (void *cls)
break;
}
- au->ret = GNUNET_OK;
/* FIXME: should be replaced with a query that has a LIMIT 1... */
qs = db_plugin->find_aggregation_transient (db_plugin->cls,
&au->h_normalized_payto,
&handle_transient_cb,
- &au);
+ au);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR: