/* This file is part of TALER Copyright (C) 2014-2016 GNUnet e.V. TALER is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. TALER is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with TALER; see the file COPYING. If not, see */ /** * @file plugin_auditordb_postgres.c * @brief Low-level (statement-level) Postgres database access for the auditor * @author Florian Dold * @author Christian Grothoff * @author Sree Harsha Totakura */ #include "platform.h" #include "taler_pq_lib.h" #include "taler_auditordb_plugin.h" #include #include /** * Log a query error. * * @param result PQ result object of the query that failed */ #define QUERY_ERR(result) \ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Query failed at %s:%u: %s (%s)\n", __FILE__, __LINE__, PQresultErrorMessage (result), PQresStatus (PQresultStatus (result))) /** * Log a really unexpected PQ error. * * @param result PQ result object of the PQ operation that failed */ #define BREAK_DB_ERR(result) do { \ GNUNET_break (0); \ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Database failure: %s (%s)\n", PQresultErrorMessage (result), PQresStatus (PQresultStatus (result))); \ } while (0) /** * Shorthand for exit jumps. Logs the current line number * and jumps to the "EXITIF_exit" label. * * @param cond condition that must be TRUE to exit with an error */ #define EXITIF(cond) \ do { \ if (cond) { GNUNET_break (0); goto EXITIF_exit; } \ } while (0) /** * Execute an SQL statement and log errors on failure. Must be * run in a function that has an "SQLEXEC_fail" label to jump * to in case the SQL statement failed. * * @param conn database connection * @param sql SQL statement to run */ #define SQLEXEC_(conn, sql) \ do { \ PGresult *result = PQexec (conn, sql); \ if (PGRES_COMMAND_OK != PQresultStatus (result)) \ { \ BREAK_DB_ERR (result); \ PQclear (result); \ goto SQLEXEC_fail; \ } \ PQclear (result); \ } while (0) /** * Run an SQL statement, ignoring errors and clearing the result. * * @param conn database connection * @param sql SQL statement to run */ #define SQLEXEC_IGNORE_ERROR_(conn, sql) \ do { \ PGresult *result = PQexec (conn, sql); \ PQclear (result); \ } while (0) /** * Handle for a database session (per-thread, for transactions). */ struct TALER_AUDITORDB_Session { /** * Postgres connection handle. */ PGconn *conn; }; /** * Type of the "cls" argument given to each of the functions in * our API. */ struct PostgresClosure { /** * Thread-local database connection. * Contains a pointer to `PGconn` or NULL. */ pthread_key_t db_conn_threadlocal; /** * Database connection string, as read from * the configuration. */ char *connection_cfg_str; }; /** * Function called by libpq whenever it wants to log something. * We already log whenever we care, so this function does nothing * and merely exists to silence the libpq logging. * * @param arg NULL * @param res information about some libpq event */ static void pq_notice_receiver_cb (void *arg, const PGresult *res) { /* do nothing, intentionally */ } /** * Function called by libpq whenever it wants to log something. * We log those using the Taler logger. * * @param arg NULL * @param message information about some libpq event */ static void pq_notice_processor_cb (void *arg, const char *message) { GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "pq", "%s", message); } /** * Establish connection to the Postgres database * and initialize callbacks for logging. * * @param pc configuration to use * @return NULL on error */ static PGconn * connect_to_postgres (struct PostgresClosure *pc) { PGconn *conn; conn = PQconnectdb (pc->connection_cfg_str); if (CONNECTION_OK != PQstatus (conn)) { TALER_LOG_ERROR ("Database connection failed: %s\n", PQerrorMessage (conn)); GNUNET_break (0); return NULL; } PQsetNoticeReceiver (conn, &pq_notice_receiver_cb, NULL); PQsetNoticeProcessor (conn, &pq_notice_processor_cb, NULL); return conn; } /** * Drop all Taler tables. This should only be used by testcases. * * @param cls the `struct PostgresClosure` with the plugin-specific state * @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure */ static int postgres_drop_tables (void *cls) { struct PostgresClosure *pc = cls; PGconn *conn; conn = connect_to_postgres (pc); if (NULL == conn) return GNUNET_SYSERR; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Dropping ALL tables\n"); SQLEXEC_ (conn, "DROP TABLE IF EXISTS test;"); PQfinish (conn); return GNUNET_OK; SQLEXEC_fail: PQfinish (conn); return GNUNET_SYSERR; } /** * Create the necessary tables if they are not present * * @param cls the `struct PostgresClosure` with the plugin-specific state * @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure */ static int postgres_create_tables (void *cls) { struct PostgresClosure *pc = cls; PGconn *conn; conn = connect_to_postgres (pc); if (NULL == conn) return GNUNET_SYSERR; #define SQLEXEC(sql) SQLEXEC_(conn, sql); #define SQLEXEC_INDEX(sql) SQLEXEC_IGNORE_ERROR_(conn, sql); SQLEXEC ("CREATE TABLE IF NOT EXISTS test" "(test_pub BYTEA NOT NULL CHECK (LENGTH(master_pub)=32" ")"); SQLEXEC_INDEX("CREATE INDEX testx " "ON test(test_pub)"); #undef SQLEXEC #undef SQLEXEC_INDEX PQfinish (conn); return GNUNET_OK; SQLEXEC_fail: PQfinish (conn); return GNUNET_SYSERR; } /** * Setup prepared statements. * * @param db_conn connection handle to initialize * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure */ static int postgres_prepare (PGconn *db_conn) { PGresult *result; #define PREPARE(name, sql, ...) \ do { \ result = PQprepare (db_conn, name, sql, __VA_ARGS__); \ if (PGRES_COMMAND_OK != PQresultStatus (result)) \ { \ BREAK_DB_ERR (result); \ PQclear (result); result = NULL; \ return GNUNET_SYSERR; \ } \ PQclear (result); result = NULL; \ } while (0); /* Used in #postgres_XXX() */ PREPARE ("test_insert", "INSERT INTO test " "(test_pub" ") VALUES " "($1);", 1, NULL); return GNUNET_OK; #undef PREPARE } /** * Close thread-local database connection when a thread is destroyed. * * @param cls closure we get from pthreads (the db handle) */ static void db_conn_destroy (void *cls) { struct TALER_AUDITORDB_Session *session = cls; PGconn *db_conn = session->conn; if (NULL != db_conn) PQfinish (db_conn); GNUNET_free (session); } /** * Get the thread-local database-handle. * Connect to the db if the connection does not exist yet. * * @param cls the `struct PostgresClosure` with the plugin-specific state * @return the database connection, or NULL on error */ static struct TALER_AUDITORDB_Session * postgres_get_session (void *cls) { struct PostgresClosure *pc = cls; PGconn *db_conn; struct TALER_AUDITORDB_Session *session; if (NULL != (session = pthread_getspecific (pc->db_conn_threadlocal))) return session; db_conn = connect_to_postgres (pc); if (NULL == db_conn) return NULL; if (GNUNET_OK != postgres_prepare (db_conn)) { GNUNET_break (0); PQfinish (db_conn); return NULL; } session = GNUNET_new (struct TALER_AUDITORDB_Session); session->conn = db_conn; if (0 != pthread_setspecific (pc->db_conn_threadlocal, session)) { GNUNET_break (0); PQfinish (db_conn); GNUNET_free (session); return NULL; } return session; } /** * Start a transaction. * * @param cls the `struct PostgresClosure` with the plugin-specific state * @param session the database connection * @return #GNUNET_OK on success */ static int postgres_start (void *cls, struct TALER_AUDITORDB_Session *session) { PGresult *result; result = PQexec (session->conn, "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"); if (PGRES_COMMAND_OK != PQresultStatus (result)) { TALER_LOG_ERROR ("Failed to start transaction: %s\n", PQresultErrorMessage (result)); GNUNET_break (0); PQclear (result); return GNUNET_SYSERR; } PQclear (result); return GNUNET_OK; } /** * Roll back the current transaction of a database connection. * * @param cls the `struct PostgresClosure` with the plugin-specific state * @param session the database connection * @return #GNUNET_OK on success */ static void postgres_rollback (void *cls, struct TALER_AUDITORDB_Session *session) { PGresult *result; result = PQexec (session->conn, "ROLLBACK"); GNUNET_break (PGRES_COMMAND_OK == PQresultStatus (result)); PQclear (result); } /** * Commit the current transaction of a database connection. * * @param cls the `struct PostgresClosure` with the plugin-specific state * @param session the database connection * @return #GNUNET_OK on success */ static int postgres_commit (void *cls, struct TALER_AUDITORDB_Session *session) { PGresult *result; result = PQexec (session->conn, "COMMIT"); if (PGRES_COMMAND_OK != PQresultStatus (result)) { const char *sqlstate; sqlstate = PQresultErrorField (result, PG_DIAG_SQLSTATE); if (NULL == sqlstate) { /* very unexpected... */ GNUNET_break (0); PQclear (result); return GNUNET_SYSERR; } /* 40P01: deadlock, 40001: serialization failure */ if ( (0 == strcmp (sqlstate, "40P01")) || (0 == strcmp (sqlstate, "40001")) ) { /* These two can be retried and have a fair chance of working the next time */ PQclear (result); return GNUNET_NO; } GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Database commit failure: %s\n", sqlstate); PQclear (result); return GNUNET_SYSERR; } PQclear (result); return GNUNET_OK; } /** * Function called to perform "garbage collection" on the * database, expiring records we no longer require. * * @param cls closure * @return #GNUNET_OK on success, * #GNUNET_SYSERR on DB errors */ static int postgres_gc (void *cls) { struct PostgresClosure *pc = cls; struct GNUNET_TIME_Absolute now; struct GNUNET_PQ_QueryParam params_time[] = { GNUNET_PQ_query_param_absolute_time (&now), GNUNET_PQ_query_param_end }; PGconn *conn; PGresult *result; now = GNUNET_TIME_absolute_get (); conn = connect_to_postgres (pc); if (NULL == conn) return GNUNET_SYSERR; if (GNUNET_OK != postgres_prepare (conn)) { PQfinish (conn); return GNUNET_SYSERR; } result = GNUNET_PQ_exec_prepared (conn, "gc_auditor", params_time); if (PGRES_COMMAND_OK != PQresultStatus (result)) { BREAK_DB_ERR (result); PQclear (result); PQfinish (conn); return GNUNET_SYSERR; } PQclear (result); PQfinish (conn); return GNUNET_OK; } /** * Initialize Postgres database subsystem. * * @param cls a configuration instance * @return NULL on error, otherwise a `struct TALER_AUDITORDB_Plugin` */ void * libtaler_plugin_auditordb_postgres_init (void *cls) { struct GNUNET_CONFIGURATION_Handle *cfg = cls; struct PostgresClosure *pg; struct TALER_AUDITORDB_Plugin *plugin; const char *ec; pg = GNUNET_new (struct PostgresClosure); if (0 != pthread_key_create (&pg->db_conn_threadlocal, &db_conn_destroy)) { TALER_LOG_ERROR ("Cannnot create pthread key.\n"); GNUNET_free (pg); return NULL; } ec = getenv ("TALER_AUDITORDB_POSTGRES_CONFIG"); if (NULL != ec) { pg->connection_cfg_str = GNUNET_strdup (ec); } else { if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "auditordb-postgres", "db_conn_str", &pg->connection_cfg_str)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "auditordb-postgres", "db_conn_str"); GNUNET_free (pg); return NULL; } } plugin = GNUNET_new (struct TALER_AUDITORDB_Plugin); plugin->cls = pg; plugin->get_session = &postgres_get_session; plugin->drop_tables = &postgres_drop_tables; plugin->create_tables = &postgres_create_tables; plugin->start = &postgres_start; plugin->commit = &postgres_commit; plugin->rollback = &postgres_rollback; plugin->gc = &postgres_gc; return plugin; } /** * Shutdown Postgres database subsystem. * * @param cls a `struct TALER_AUDITORDB_Plugin` * @return NULL (always) */ void * libtaler_plugin_auditordb_postgres_done (void *cls) { struct TALER_AUDITORDB_Plugin *plugin = cls; struct PostgresClosure *pg = plugin->cls; GNUNET_free (pg->connection_cfg_str); GNUNET_free (pg); GNUNET_free (plugin); return NULL; } /* end of plugin_auditordb_postgres.c */