aboutsummaryrefslogtreecommitdiff
path: root/src/auditor
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-01-11 23:02:22 +0100
committerChristian Grothoff <christian@grothoff.org>2021-01-11 23:02:22 +0100
commit2518da8f4581d868dd8eafabc54e6b2ddcc998d4 (patch)
treef2ec4cc7cab30dec29107818feb35ee2dab9b4e5 /src/auditor
parente3156e88a783e5fdfc5e31bcc6f357d33af7f634 (diff)
taler-auditor-sync WiP
Diffstat (limited to 'src/auditor')
-rw-r--r--src/auditor/taler-auditor-sync.c203
1 files changed, 176 insertions, 27 deletions
diff --git a/src/auditor/taler-auditor-sync.c b/src/auditor/taler-auditor-sync.c
index a76c9a0be..fae3d2188 100644
--- a/src/auditor/taler-auditor-sync.c
+++ b/src/auditor/taler-auditor-sync.c
@@ -52,6 +52,182 @@ static unsigned int transaction_size = 512;
*/
static unsigned int actual_size;
+static struct Table
+{
+ enum TALER_EXCHANGEDB_ReplicatedTable rt;
+ uint64_t start_serial;
+ uint64_t end_serial;
+ bool end;
+} tables[] = {
+ { .rt = TALER_EXCHANGEDB_RT_DENOMINATIONS},
+ { .rt = TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS},
+ { .rt = TALER_EXCHANGEDB_RT_RESERVES},
+ { .rt = TALER_EXCHANGEDB_RT_RESERVES_IN},
+ { .rt = TALER_EXCHANGEDB_RT_RESERVES_CLOSE},
+ { .rt = TALER_EXCHANGEDB_RT_RESERVES_OUT},
+ { .rt = TALER_EXCHANGEDB_RT_AUDITORS},
+ { .rt = TALER_EXCHANGEDB_RT_AUDITOR_DENOM_SIGS},
+ { .rt = TALER_EXCHANGEDB_RT_EXCHANGE_SIGN_KEYS},
+ { .rt = TALER_EXCHANGEDB_RT_SIGNKEY_REVOCATIONS},
+ { .rt = TALER_EXCHANGEDB_RT_KNOWN_COINS},
+ { .rt = TALER_EXCHANGEDB_RT_REFRESH_COMMITMENTS},
+ { .rt = TALER_EXCHANGEDB_RT_REFRESH_REVEALED_COINS},
+ { .rt = TALER_EXCHANGEDB_RT_REFRESH_TRANSFER_KEYS},
+ { .rt = TALER_EXCHANGEDB_RT_DEPOSITS},
+ { .rt = TALER_EXCHANGEDB_RT_REFUNDS},
+ { .rt = TALER_EXCHANGEDB_RT_WIRE_OUT},
+ { .rt = TALER_EXCHANGEDB_RT_AGGREGATION_TRACKING},
+ { .rt = TALER_EXCHANGEDB_RT_WIRE_FEE},
+ { .rt = TALER_EXCHANGEDB_RT_RECOUP},
+ { .rt = TALER_EXCHANGEDB_RT_RECOUP_REFRESH },
+ { .end = true }
+};
+
+
+/**
+ * Function called on data to replicate in the auditor's database.
+ *
+ * @param cls closure
+ * @param td record from an exchange table
+ * @return #GNUNET_OK to continue to iterate,
+ * #GNUNET_SYSERR to fail with an error
+ */
+static int
+do_insert (void *cls,
+ const struct TALER_EXCHANGEDB_TableData *td)
+{
+ // FIXME ...
+}
+
+
+/**
+ * Run one replication transaction.
+ *
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR to rollback
+ */
+static int
+transact (struct TALER_EXCHANGEDB_Session *ss,
+ struct TALER_EXCHANGEDB_Session *ds)
+{
+ if (GNUNET_OK !=
+ src->start (src->cls,
+ ss,
+ "lookup src serials"))
+ return GNUNET_SYSERR;
+ for (unsigned int i = 0; ! tables[i].end; i++)
+ src->lookup_serial_by_table (src->cls,
+ ss,
+ tables[i].rt,
+ &tables[i].end_serial);
+ if (GNUNET_OK !=
+ src->commit (src->cls,
+ ss))
+ return GNUNET_SYSERR;
+ if (GNUNET_OK !=
+ dst->start (src->cls,
+ ds,
+ "lookup dst serials"))
+ return GNUNET_SYSERR;
+ for (unsigned int i = 0; ! tables[i].end; i++)
+ dst->lookup_serial_by_table (dst->cls,
+ ds,
+ tables[i].rt,
+ &tables[i].start_serial);
+ if (GNUNET_OK !=
+ dst->commit (dst->cls,
+ ds))
+ return GNUNET_SYSERR;
+ for (unsigned int i = 0; ! tables[i].end; i++)
+ {
+ printf ("%d ", i);
+ fflush (stdout);
+ while (tables[i].start_serial < tables[i].end_serial)
+ {
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (GNUNET_OK !=
+ src->start (src->cls,
+ ss,
+ "copy table (src)"))
+ return GNUNET_SYSERR;
+ if (GNUNET_OK !=
+ dst->start (dst->cls,
+ ds,
+ "copy table (dst)"))
+ return GNUNET_SYSERR;
+ qs = src->lookup_records_by_table (src->cls,
+ ss,
+ tables[i].rt,
+ tables[i].start_serial,
+ &do_insert,
+ ds);
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+ {
+ global_ret = 3;
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ return GNUNET_SYSERR; /* will retry */
+ if (0 == qs)
+ {
+ GNUNET_break (0); /* should be impossible */
+ global_ret = 4;
+ return GNUNET_SYSERR;
+ }
+ }
+ }
+ /* we do not care about conflicting UPDATEs to src table, so safe to just rollback */
+ src->rollback (src->cls,
+ ss);
+ if (GNUNET_OK !=
+ dst->commit (dst->cls,
+ ds))
+ return GNUNET_SYSERR;
+ printf ("\n");
+ return GNUNET_OK;
+}
+
+
+/**
+ * Task to do the actual synchronization work.
+ *
+ * @param cls NULL, unused
+ */
+static void
+do_sync (void *cls)
+{
+ struct GNUNET_TIME_Relative delay;
+ struct TALER_EXCHANGEDB_Session *ss;
+ struct TALER_EXCHANGEDB_Session *ds;
+
+ sync_task = NULL;
+ actual_size = 0;
+ ss = src->get_session (src->cls);
+ ds = dst->get_session (dst->cls);
+ if (GNUNET_OK !=
+ transact (ss,
+ ds))
+ {
+ src->rollback (src->cls,
+ ss);
+ dst->rollback (dst->cls,
+ ds);
+ }
+ if (0 != global_ret)
+ return;
+ if (actual_size < transaction_size / 2)
+ {
+ delay = GNUNET_TIME_STD_BACKOFF (delay);
+ }
+ else if (actual_size >= transaction_size)
+ {
+ delay = GNUNET_TIME_UNIT_ZERO;
+ }
+ sync_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &do_sync,
+ NULL);
+}
+
/**
* Set an option of type 'char *' from the command line with
@@ -151,33 +327,6 @@ load_config (const char *cfgfile)
/**
- * Task to do the actual synchronization work.
- *
- * @param cls NULL, unused
- */
-static void
-do_sync (void *cls)
-{
- struct GNUNET_TIME_Relative delay;
-
- sync_task = NULL;
- actual_size = 0;
- // FIXME: do real work here!
- if (actual_size < transaction_size / 2)
- {
- delay = GNUNET_TIME_STD_BACKOFF (delay);
- }
- else if (actual_size >= transaction_size)
- {
- delay = GNUNET_TIME_UNIT_ZERO;
- }
- sync_task = GNUNET_SCHEDULER_add_delayed (delay,
- &do_sync,
- NULL);
-}
-
-
-/**
* Shutdown task.
*
* @param cls NULL, unused