diff options
author | Christian Grothoff <christian@grothoff.org> | 2021-06-20 16:41:04 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2021-06-20 16:41:04 +0200 |
commit | 108bf57d048a135cb71f9453540c9d6579ae2028 (patch) | |
tree | 6a67bbcf7cb1ab049a5d9ba426e878b1b493dd10 /src | |
parent | 0271e848138a94e27f472196f5341879fd3ab8ba (diff) |
preparations for sharded wirewatch
Diffstat (limited to 'src')
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 22 | ||||
-rw-r--r-- | src/exchangedb/exchange-0002.sql | 34 | ||||
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 297 | ||||
-rw-r--r-- | src/include/taler_exchangedb_plugin.h | 41 |
4 files changed, 382 insertions, 12 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 760dbe10b..28fa81e7e 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -528,8 +528,20 @@ find_transfers (void *cls) GNUNET_SCHEDULER_shutdown (); return; } + wa_pos->delay = true; + wa_pos->current_batch_size = 0; /* reset counter */ + wa_pos->session = session; + if (wa_pos->shard_end == wa_pos->last_row_off) + { + /* advance to next shard */ + // FIXME: if other processes are running in parallel, + // update 'last_row_off' to next free shard! + wa_pos->shard_end = wa_pos->last_row_off + shard_size; + } if (! wa_pos->reset_mode) { + // FIXME: need good way to fetch + // shard data here! qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls, session, wa_pos->section_name, @@ -553,16 +565,8 @@ find_transfers (void *cls) NULL); return; } - wa_pos->reset_mode = GNUNET_NO; - } - wa_pos->delay = true; - wa_pos->current_batch_size = 0; /* reset counter */ - wa_pos->session = session; - if (wa_pos->shard_end == wa_pos->last_row_off) - { - /* advance to next shard */ - wa_pos->shard_end += shard_size; } + wa_pos->reset_mode = true; limit = GNUNET_MIN (wa_pos->batch_size, wa_pos->shard_end - wa_pos->last_row_off); GNUNET_assert (NULL == wa_pos->hh); diff --git a/src/exchangedb/exchange-0002.sql b/src/exchangedb/exchange-0002.sql index b03a7b512..361b69b8d 100644 --- a/src/exchangedb/exchange-0002.sql +++ b/src/exchangedb/exchange-0002.sql @@ -1,6 +1,6 @@ -- -- This file is part of TALER --- Copyright (C) 2020 Taler Systems SA +-- Copyright (C) 2020-2021 Taler Systems SA -- -- TALER is free software; you can redistribute it and/or modify it under the -- terms of the GNU General Public License as published by the Free Software @@ -374,5 +374,37 @@ COMMENT ON TABLE signkey_revocations IS 'remembering which online signing keys have been revoked'; + +CREATE TABLE IF NOT EXISTS work_shards + (shard_serial_id BIGSERIAL UNIQUE + ,last_attempt INT8 NOT NULL + ,start_row INT8 NOT NULL + ,end_row INT8 NOT NULL + ,completed BOOLEAN NOT NULL + ,job_name VARCHAR NOT NULL + ,PRIMARY KEY (job_name, start_row) + ); +CREATE INDEX IF NOT EXISTS work_shards_index + ON work_shards + (job_name + ,completed + ,last_attempt + ); +COMMENT ON TABLE work_shards + IS 'coordinates work between multiple processes working on the same job'; +COMMENT ON COLUMN work_shards.shard_serial_id + IS 'unique serial number identifying the shard'; +COMMENT ON COLUMN work_shards.last_attempt + IS 'last time a worker attempted to work on the shard'; +COMMENT ON COLUMN work_shards.completed + IS 'set to TRUE once the shard is finished by a worker'; +COMMENT ON COLUMN work_shards.start_row + IS 'row at which the shard scope starts, inclusive'; +COMMENT ON COLUMN work_shards.end_row + IS 'row at which the shard scope ends, exclusive'; +COMMENT ON COLUMN work_shards.job_name + IS 'unique name of the job the workers on this shard are performing'; + + -- Complete transaction COMMIT; diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 04dc03cdd..e61a1ac7a 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2014--2020 Taler Systems SA + Copyright (C) 2014--2021 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -2438,6 +2438,52 @@ postgres_get_session (void *cls) ") VALUES " "($1, $2, $3, $4, $5, $6, $7, $8);", 8), + + /* Used in #postgres_begin_shard() */ + GNUNET_PQ_make_prepare ("get_open_shard", + "SELECT" + " start_row" + ",end_row" + " FROM work_shards" + " WHERE job_name=$1" + " AND last_attempt<$2" + " AND completed=FALSE" + " ORDER BY last_attempt ASC" + " LIMIT 1;", + 2), + GNUNET_PQ_make_prepare ("reclaim_shard", + "UPDATE work_shards" + " SET last_attempt=$2" + " WHERE job_name=$1" + " AND start_row=$3" + " AND end_row=$4", + 4), + GNUNET_PQ_make_prepare ("get_last_shard", + "SELECT" + " end_row" + " FROM work_shards" + " WHERE job_name=$1" + " AND completed=FALSE" + " ORDER BY end_row DESC" + " LIMIT 1;", + 1), + GNUNET_PQ_make_prepare ("claim_next_shard", + "INSERT INTO work_shards" + "(job_name" + ",last_attempt" + ",start_row" + ",end_row" + ") VALUES " + "($1, $2, $3, $4);", + 4), + /* Used in #postgres_complete_shard() */ + GNUNET_PQ_make_prepare ("complete_shard", + "UPDATE work_shards" + " SET completed=TRUE" + " WHERE job_name=$1" + " AND start_row=$2" + " AND end_row=$3", + 3), GNUNET_PQ_PREPARED_STATEMENT_END }; @@ -10150,6 +10196,251 @@ postgres_insert_records_by_table (void *cls, /** + * Function called to grab a work shard on an operation @a op. Runs in its + * own transaction (hence no session provided). + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param job_name name of the operation to grab a word shard for + * @param delay minimum age of a shard to grab + * @param size desired shard size + * @param[out] start_row inclusive start row of the shard (returned) + * @param[out] end_row exclusive end row of the shard (returned) + * @return transaction status code + */ +static enum GNUNET_DB_QueryStatus +postgres_begin_shard (void *cls, + const char *job_name, + struct GNUNET_TIME_Relative delay, + uint64_t shard_size, + uint64_t *start_row, + uint64_t *end_row) +{ + struct TALER_EXCHANGEDB_Session *session; + + session = postgres_get_session (cls); + if (NULL == session) + return GNUNET_DB_STATUS_HARD_ERROR; + for (unsigned int retries = 0; retries<3; retries++) + { + if (GNUNET_OK != + postgres_start (cls, + session, + "begin_shard")) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + + { + struct GNUNET_TIME_Absolute past; + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_absolute_time (&past), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("start_row", + start_row), + GNUNET_PQ_result_spec_uint64 ("end_row", + end_row), + GNUNET_PQ_result_spec_end + }; + + past = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), + delay); + qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn, + "get_open_shard", + params, + rs); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + postgres_rollback (cls, + session); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + postgres_rollback (cls, + session); + continue; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + { + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_TIME_Absolute now; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_absolute_time (&now), + GNUNET_PQ_query_param_uint64 (start_row), + GNUNET_PQ_query_param_uint64 (end_row), + GNUNET_PQ_query_param_end + }; + + now = GNUNET_TIME_absolute_get (); + qs = GNUNET_PQ_eval_prepared_non_select (session->conn, + "reclaim_shard", + params); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + postgres_rollback (cls, + session); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + postgres_rollback (cls, + session); + continue; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + goto commit; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); /* logic error, should be impossible */ + postgres_rollback (cls, + session); + return GNUNET_DB_STATUS_HARD_ERROR; + } + } + break; /* actually unreachable */ + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + break; /* continued below */ + } + } /* get_open_shard */ + + /* No open shard, find last 'end_row' */ + { + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("end_row", + start_row), + GNUNET_PQ_result_spec_end + }; + + qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn, + "get_last_shard", + params, + rs); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + postgres_rollback (cls, + session); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + postgres_rollback (cls, + session); + continue; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + break; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + *start_row = 0; /* base-case: no shards yet */ + break; /* continued below */ + } + *end_row = *start_row + shard_size; + } /* get_last_shard */ + + /* Claim fresh shard */ + { + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_TIME_Absolute now; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_absolute_time (&now), + GNUNET_PQ_query_param_uint64 (start_row), + GNUNET_PQ_query_param_uint64 (end_row), + GNUNET_PQ_query_param_end + }; + + now = GNUNET_TIME_absolute_get (); + qs = GNUNET_PQ_eval_prepared_non_select (session->conn, + "claim_next_shard", + params); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + postgres_rollback (cls, + session); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + postgres_rollback (cls, + session); + continue; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* continued below */ + break; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); + postgres_rollback (cls, + session); + continue; + } + } /* claim_next_shard */ + + /* commit */ +commit: + { + enum GNUNET_DB_QueryStatus qs; + + qs = postgres_commit (cls, + session); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + postgres_rollback (cls, + session); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + postgres_rollback (cls, + session); + continue; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + } + } + } /* retry 'for' loop */ + return GNUNET_DB_STATUS_SOFT_ERROR; +} + + +/** + * Function called to persist that work on a shard was completed. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param session a session + * @param job_name name of the operation to grab a word shard for + * @param start_row inclusive start row of the shard + * @param end_row exclusive end row of the shard + * @return transaction status code + */ +enum GNUNET_DB_QueryStatus +postgres_complete_shard (void *cls, + struct TALER_EXCHANGEDB_Session *session, + const char *job_name, + uint64_t start_row, + uint64_t end_row) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_uint64 (&start_row), + GNUNET_PQ_query_param_uint64 (&end_row), + GNUNET_PQ_query_param_end + }; + + (void) cls; + return GNUNET_PQ_eval_prepared_non_select (session->conn, + "complete_shard", + params); +} + + +/** * Initialize Postgres database subsystem. * * @param cls a configuration instance @@ -10353,6 +10644,10 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) = &postgres_lookup_records_by_table; plugin->insert_records_by_table = &postgres_insert_records_by_table; + plugin->begin_shard + = &postgres_begin_shard; + plugin->complete_shard + = &postgres_complete_shard; return plugin; } diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index 686edb867..75e8f8bb6 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2014-2020 Taler Systems SA + Copyright (C) 2014-2021 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -3820,6 +3820,45 @@ struct TALER_EXCHANGEDB_Plugin struct TALER_EXCHANGEDB_Session *session, const struct TALER_EXCHANGEDB_TableData *td); + + /** + * Function called to grab a work shard on an operation @a op. Runs in its + * own transaction (hence no session provided). + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param job_name name of the operation to grab a word shard for + * @param delay minimum age of a shard to grab + * @param size desired shard size + * @param[out] start_row inclusive start row of the shard (returned) + * @param[out] end_row exclusive end row of the shard (returned) + * @return transaction status code + */ + enum GNUNET_DB_QueryStatus + (*begin_shard)(void *cls, + const char *job_name, + struct GNUNET_TIME_Relative delay, + uint64_t shard_size, + uint64_t *start_row, + uint64_t *end_row); + + + /** + * Function called to persist that work on a shard was completed. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param session a session + * @param job_name name of the operation to grab a word shard for + * @param start_row inclusive start row of the shard + * @param end_row exclusive end row of the shard + * @return transaction status code + */ + enum GNUNET_DB_QueryStatus + (*complete_shard)(void *cls, + struct TALER_EXCHANGEDB_Session *session, + const char *job_name, + uint64_t start_row, + uint64_t end_row); + }; #endif /* _TALER_EXCHANGE_DB_H */ |