/*
This file is part of TALER
Copyright (C) 2022-2023 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
TALER; see the file COPYING. If not, see
*/
/**
* @file exchangedb/pg_reserves_in_insert.c
* @brief Implementation of the reserves_in_insert function for Postgres
* @author Christian Grothoff
* @author Joseph Xu
*/
#include "platform.h"
#include "taler_error_codes.h"
#include "taler_dbevents.h"
#include "taler_pq_lib.h"
#include "pg_reserves_in_insert.h"
#include "pg_helper.h"
#include "pg_start.h"
#include "pg_start_read_committed.h"
#include "pg_commit.h"
#include "pg_preflight.h"
#include "pg_rollback.h"
#include "pg_reserves_get.h"
#include "pg_reserves_update.h"
#include "pg_setup_wire_target.h"
#include "pg_event_notify.h"
/**
* Generate event notification for the reserve change.
*
* @param reserve_pub reserve to notfiy on
* @return string to pass to postgres for the notification
*/
static char *
compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
{
struct TALER_ReserveEventP rep = {
.header.size = htons (sizeof (rep)),
.header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
.reserve_pub = *reserve_pub
};
return GNUNET_PQ_get_event_notify_channel (&rep.header);
}
/**
* Closure for our helper_cb()
*/
struct Context
{
/**
* Array of reserve UUIDs to initialize.
*/
uint64_t *reserve_uuids;
/**
* Array with entries set to 'true' for duplicate transactions.
*/
bool *transaction_duplicates;
/**
* Array with entries set to 'true' for rows with conflicts.
*/
bool *conflicts;
/**
* Set to #GNUNET_SYSERR on failures.
*/
enum GNUNET_GenericReturnValue status;
/**
* Single value (no array) set to true if we need
* to follow-up with an update.
*/
bool needs_update;
};
/**
* Helper function to be called with the results of a SELECT statement
* that has returned @a num_results results.
*
* @param cls closure of type `struct Context *`
* @param result the postgres result
* @param num_results the number of results in @a result
*/
static void
helper_cb (void *cls,
PGresult *result,
unsigned int num_results)
{
struct Context *ctx = cls;
for (unsigned int i = 0; itransaction_duplicates[i]),
GNUNET_PQ_result_spec_allow_null (
GNUNET_PQ_result_spec_uint64 ("ruuid",
&ctx->reserve_uuids[i]),
&ctx->conflicts[i]),
GNUNET_PQ_result_spec_end
};
if (GNUNET_OK !=
GNUNET_PQ_extract_result (result,
rs,
i))
{
GNUNET_break (0);
ctx->status = GNUNET_SYSERR;
return;
}
if (! ctx->transaction_duplicates[i])
ctx->needs_update |= ctx->conflicts[i];
}
}
enum GNUNET_DB_QueryStatus
TEH_PG_reserves_in_insert (
void *cls,
const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
unsigned int reserves_length,
enum GNUNET_DB_QueryStatus *results)
{
struct PostgresClosure *pg = cls;
unsigned int dups = 0;
struct TALER_PaytoHashP h_paytos[GNUNET_NZL (reserves_length)];
char *notify_s[GNUNET_NZL (reserves_length)];
struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)];
struct TALER_Amount balances[GNUNET_NZL (reserves_length)];
struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)];
const char *sender_account_details[GNUNET_NZL (reserves_length)];
const char *exchange_account_names[GNUNET_NZL (reserves_length)];
uint64_t wire_references[GNUNET_NZL (reserves_length)];
uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
bool transaction_duplicates[GNUNET_NZL (reserves_length)];
bool conflicts[GNUNET_NZL (reserves_length)];
struct GNUNET_TIME_Timestamp reserve_expiration
= GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
struct GNUNET_TIME_Timestamp gc
= GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
enum GNUNET_DB_QueryStatus qs;
bool need_update;
for (unsigned int i = 0; isender_account_details,
&h_paytos[i]);
notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
reserve_pubs[i] = *reserve->reserve_pub;
balances[i] = *reserve->balance;
execution_times[i] = reserve->execution_time;
sender_account_details[i] = reserve->sender_account_details;
exchange_account_names[i] = reserve->exchange_account_name;
wire_references[i] = reserve->wire_reference;
}
/* NOTE: kind-of pointless to explicitly start a transaction here... */
if (GNUNET_OK !=
TEH_PG_preflight (pg))
{
GNUNET_break (0);
qs = GNUNET_DB_STATUS_HARD_ERROR;
goto finished;
}
if (GNUNET_OK !=
TEH_PG_start_read_committed (pg,
"READ_COMMITED"))
{
GNUNET_break (0);
qs = GNUNET_DB_STATUS_HARD_ERROR;
goto finished;
}
PREPARE (pg,
"reserves_insert_with_array",
"SELECT"
" transaction_duplicate"
",ruuid"
" FROM exchange_do_array_reserves_insert"
" ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10);");
{
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_timestamp (&gc),
GNUNET_PQ_query_param_timestamp (&reserve_expiration),
GNUNET_PQ_query_param_array_auto_from_type (reserves_length,
reserve_pubs,
pg->conn),
GNUNET_PQ_query_param_array_uint64 (reserves_length,
wire_references,
pg->conn),
TALER_PQ_query_param_array_amount (
reserves_length,
balances,
pg->conn),
GNUNET_PQ_query_param_array_ptrs_string (
reserves_length,
(const char **) exchange_account_names,
pg->conn),
GNUNET_PQ_query_param_array_timestamp (
reserves_length,
execution_times,
pg->conn),
GNUNET_PQ_query_param_array_auto_from_type (
reserves_length,
h_paytos,
pg->conn),
GNUNET_PQ_query_param_array_ptrs_string (
reserves_length,
(const char **) sender_account_details,
pg->conn),
GNUNET_PQ_query_param_array_ptrs_string (
reserves_length,
(const char **) notify_s,
pg->conn),
GNUNET_PQ_query_param_end
};
struct Context ctx = {
.reserve_uuids = reserve_uuids,
.transaction_duplicates = transaction_duplicates,
.conflicts = conflicts,
.needs_update = false,
.status = GNUNET_OK
};
qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
"reserves_insert_with_array",
params,
&helper_cb,
&ctx);
GNUNET_PQ_cleanup_query_params_closures (params);
if ( (qs < 0) ||
(GNUNET_OK != ctx.status) )
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to insert into reserves (%d)\n",
qs);
goto finished;
}
need_update = ctx.needs_update;
}
{
enum GNUNET_DB_QueryStatus cs;
cs = TEH_PG_commit (pg);
if (cs < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to commit\n");
qs = cs;
goto finished;
}
}
for (unsigned int i = 0; iconn,
&balances[i]),
GNUNET_PQ_query_param_string (exchange_account_names[i]),
GNUNET_PQ_query_param_auto_from_type (&h_paytos[i]),
GNUNET_PQ_query_param_string (notify_s[i]),
GNUNET_PQ_query_param_end
};
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_bool ("duplicate",
&duplicate),
GNUNET_PQ_result_spec_end
};
enum GNUNET_DB_QueryStatus qsi;
qsi = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
"reserves_update",
params,
rs);
if (qsi < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to update reserves (%d)\n",
qsi);
results[i] = qsi;
goto finished;
}
results[i] = duplicate
? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
: GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
}
{
enum GNUNET_DB_QueryStatus cs;
cs = TEH_PG_commit (pg);
if (cs < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to commit\n");
qs = cs;
goto finished;
}
}
finished:
for (unsigned int i = 0; iconn);
if (0 != dups)
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n",
dups,
reserves_length);
return qs;
}