From 945821cbc842644f9d10de4ef36e8ef03d2d2253 Mon Sep 17 00:00:00 2001 From: Joseph Date: Thu, 10 Nov 2022 10:37:28 -0500 Subject: move few more functions --- src/exchangedb/pg_begin_revolving_shard.c | 263 ++++++++++++++++++++++++++++++ 1 file changed, 263 insertions(+) create mode 100644 src/exchangedb/pg_begin_revolving_shard.c (limited to 'src/exchangedb/pg_begin_revolving_shard.c') diff --git a/src/exchangedb/pg_begin_revolving_shard.c b/src/exchangedb/pg_begin_revolving_shard.c new file mode 100644 index 000000000..888d7fd20 --- /dev/null +++ b/src/exchangedb/pg_begin_revolving_shard.c @@ -0,0 +1,263 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see + */ +/** + * @file exchangedb/pg_begin_revolving_shard.c + * @brief Implementation of the begin_revolving_shard function for Postgres + * @author Christian Grothoff + */ +#include "platform.h" +#include "taler_error_codes.h" +#include "taler_dbevents.h" +#include "taler_pq_lib.h" +#include "pg_begin_revolving_shard.h" +#include "pg_commit.h" +#include "pg_helper.h" +#include "pg_start.h" +#include "pg_rollback.h" + +enum GNUNET_DB_QueryStatus +TEH_PG_begin_revolving_shard (void *cls, + const char *job_name, + uint32_t shard_size, + uint32_t shard_limit, + uint32_t *start_row, + uint32_t *end_row) +{ + struct PostgresClosure *pg = cls; + + GNUNET_assert (shard_limit <= 1U + (uint32_t) INT_MAX); + GNUNET_assert (shard_limit > 0); + GNUNET_assert (shard_size > 0); + for (unsigned int retries = 0; retries<3; retries++) + { + if (GNUNET_OK != + TEH_PG_start (pg, + "begin_revolving_shard")) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + + /* First, find last 'end_row' */ + { + enum GNUNET_DB_QueryStatus qs; + uint32_t last_end; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint32 ("end_row", + &last_end), + GNUNET_PQ_result_spec_end + }; + /* Used in #postgres_begin_revolving_shard() */ + PREPARE(pg, + "get_last_revolving_shard", + "SELECT" + " end_row" + " FROM revolving_work_shards" + " WHERE job_name=$1" + " ORDER BY end_row DESC" + " LIMIT 1;"); + qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "get_last_revolving_shard", + params, + rs); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + TEH_PG_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + TEH_PG_rollback (pg); + continue; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + *start_row = 1U + last_end; + break; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + *start_row = 0; /* base-case: no shards yet */ + break; /* continued below */ + } + } /* get_last_shard */ + + if (*start_row < shard_limit) + { + /* Claim fresh shard */ + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_TIME_Absolute now; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_absolute_time (&now), + GNUNET_PQ_query_param_uint32 (start_row), + GNUNET_PQ_query_param_uint32 (end_row), + GNUNET_PQ_query_param_end + }; + + *end_row = GNUNET_MIN (shard_limit, + *start_row + shard_size - 1); + now = GNUNET_TIME_absolute_get (); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Trying to claim shard %llu-%llu\n", + (unsigned long long) *start_row, + (unsigned long long) *end_row); + + /* Used in #postgres_claim_revolving_shard() */ + PREPARE (pg, + "create_revolving_shard", + "INSERT INTO revolving_work_shards" + "(job_name" + ",last_attempt" + ",start_row" + ",end_row" + ",active" + ") VALUES " + "($1, $2, $3, $4, TRUE);"); + qs = GNUNET_PQ_eval_prepared_non_select (pg->conn, + "create_revolving_shard", + params); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + TEH_PG_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + TEH_PG_rollback (pg); + continue; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* continued below (with commit) */ + break; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* someone else got this shard already, + try again */ + TEH_PG_rollback (pg); + continue; + } + } /* end create fresh reovlving shard */ + else + { + /* claim oldest existing shard */ + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint32 ("start_row", + start_row), + GNUNET_PQ_result_spec_uint32 ("end_row", + end_row), + GNUNET_PQ_result_spec_end + }; + /* Used in #postgres_begin_revolving_shard() */ + PREPARE (pg, + "get_open_revolving_shard", + "SELECT" + " start_row" + ",end_row" + " FROM revolving_work_shards" + " WHERE job_name=$1" + " AND active=FALSE" + " ORDER BY last_attempt ASC" + " LIMIT 1;"); + qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "get_open_revolving_shard", + params, + rs); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + TEH_PG_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + TEH_PG_rollback (pg); + continue; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* no open shards available */ + TEH_PG_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + { + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_TIME_Timestamp now; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_timestamp (&now), + GNUNET_PQ_query_param_uint32 (start_row), + GNUNET_PQ_query_param_uint32 (end_row), + GNUNET_PQ_query_param_end + }; + + now = GNUNET_TIME_timestamp_get (); + + /* Used in #postgres_begin_revolving_shard() */ + PREPARE (pg, + "reclaim_revolving_shard", + "UPDATE revolving_work_shards" + " SET last_attempt=$2" + " ,active=TRUE" + " WHERE job_name=$1" + " AND start_row=$3" + " AND end_row=$4"); + qs = GNUNET_PQ_eval_prepared_non_select (pg->conn, + "reclaim_revolving_shard", + params); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + TEH_PG_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + TEH_PG_rollback (pg); + continue; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + break; /* continue with commit */ + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); /* logic error, should be impossible */ + TEH_PG_rollback (pg); + return GNUNET_DB_STATUS_HARD_ERROR; + } + } + break; /* continue with commit */ + } + } /* end claim oldest existing shard */ + + /* commit */ + { + enum GNUNET_DB_QueryStatus qs; + + qs = TEH_PG_commit (pg); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + TEH_PG_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + TEH_PG_rollback (pg); + continue; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + } + } + } /* retry 'for' loop */ + return GNUNET_DB_STATUS_SOFT_ERROR; +} -- cgit v1.2.3