aboutsummaryrefslogtreecommitdiff
path: root/src/exchangedb
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchangedb')
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c102
1 files changed, 91 insertions, 11 deletions
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 1d05fb499..2d7ca0573 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -110,6 +110,39 @@ struct TALER_EXCHANGEDB_Session
/**
+ * Event registration record.
+ */
+struct TALER_EXCHANGEDB_EventHandler
+{
+ /**
+ * Underlying GNUnet event handler.
+ */
+ struct GNUNET_DB_EventHandler *geh;
+
+ /**
+ * Entry in the heap.
+ */
+ struct GNUNET_CONTAINER_HeapNode *hn;
+
+ /**
+ * Our timeout.
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
+ /**
+ * Callback to invoke (on @e timeout).
+ */
+ GNUNET_DB_EventCallback cb;
+
+ /**
+ * Closure for @e cb.
+ */
+ void *cb_cls;
+
+};
+
+
+/**
* Type of the "cls" argument given to each of the functions in
* our API.
*/
@@ -133,6 +166,12 @@ struct PostgresClosure
char *sql_dir;
/**
+ * Heap of `struct TALER_EXCHANGEDB_EventHandler`
+ * by timeout.
+ */
+ struct GNUNET_CONTAINER_Heap *event_heap;
+
+ /**
* After how long should idle reserves be closed?
*/
struct GNUNET_TIME_Relative idle_reserve_expiration_time;
@@ -2832,18 +2871,41 @@ handle_events (void *cls)
}
};
nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2;
+ struct TALER_EXCHANGEDB_EventHandler *r;
GNUNET_assert (0 ==
pthread_mutex_lock (&pg->event_lock));
while (0 != pg->listener_count)
{
int ret;
+ int timeout = -1; /* no timeout */
GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock));
+ while (1)
+ {
+ r = GNUNET_CONTAINER_heap_peek (pg->event_heap);
+ if (NULL == r)
+ break;
+ if (GNUNET_TIME_absolute_is_future (r->timeout))
+ break;
+ GNUNET_assert (r ==
+ GNUNET_CONTAINER_heap_remove_root (pg->event_heap));
+ r->hn = NULL;
+ r->cb (r->cb_cls,
+ NULL,
+ 0);
+ }
+ if (NULL != r)
+ {
+ struct GNUNET_TIME_Relative rem;
+
+ rem = GNUNET_TIME_absolute_get_remaining (r->timeout);
+ timeout = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us;
+ }
ret = poll (pfds,
nfds,
- -1 /* no timeout */);
+ timeout);
if (-1 == ret)
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
"poll");
@@ -2909,16 +2971,30 @@ pq_socket_cb (void *cls,
* @param cb_cls closure for @a cb
* @return handle useful to cancel the listener
*/
-static struct GNUNET_DB_EventHandler *
+static struct TALER_EXCHANGEDB_EventHandler *
postgres_event_listen (void *cls,
- struct TALER_EXCHANGEDB_Session *session,
+ struct GNUNET_TIME_Relative timeout,
const struct GNUNET_DB_EventHeaderP *es,
GNUNET_DB_EventCallback cb,
void *cb_cls)
{
struct PostgresClosure *pg = cls;
- struct GNUNET_DB_EventHandler *eh;
+ struct TALER_EXCHANGEDB_EventHandler *eh;
+ struct TALER_EXCHANGEDB_Session *session;
+ session = postgres_get_session (pg);
+ eh = GNUNET_new (struct TALER_EXCHANGEDB_EventHandler);
+ eh->cb = cb;
+ eh->cb_cls = cb_cls;
+ eh->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+ eh->geh = GNUNET_PQ_event_listen (session->conn,
+ es,
+ cb,
+ cb_cls);
+ GNUNET_assert (NULL != eh->geh);
+ eh->hn = GNUNET_CONTAINER_heap_insert (pg->event_heap,
+ eh,
+ eh->timeout.abs_value_us);
GNUNET_assert (0 ==
pthread_mutex_lock (&pg->event_lock));
pg->listener_count++;
@@ -2932,11 +3008,6 @@ postgres_event_listen (void *cls,
}
GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock));
- eh = GNUNET_PQ_event_listen (session->conn,
- es,
- cb,
- cb_cls);
- GNUNET_assert (NULL != eh);
return eh;
}
@@ -2949,7 +3020,7 @@ postgres_event_listen (void *cls,
*/
static void
postgres_event_listen_cancel (void *cls,
- struct GNUNET_DB_EventHandler *eh)
+ struct TALER_EXCHANGEDB_EventHandler *eh)
{
struct PostgresClosure *pg = cls;
@@ -2971,7 +3042,13 @@ postgres_event_listen_cancel (void *cls,
}
GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock));
- GNUNET_PQ_event_listen_cancel (eh);
+ if (NULL != eh->hn)
+ {
+ GNUNET_CONTAINER_heap_remove_node (eh->hn);
+ eh->hn = NULL;
+ }
+ GNUNET_PQ_event_listen_cancel (eh->geh);
+ GNUNET_free (eh);
}
@@ -10917,6 +10994,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
pg = GNUNET_new (struct PostgresClosure);
pg->cfg = cfg;
+ pg->event_heap = GNUNET_CONTAINER_heap_create (
+ GNUNET_CONTAINER_HEAP_ORDER_MIN);
pg->main_self = pthread_self (); /* loaded while single-threaded! */
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_filename (cfg,
@@ -11166,6 +11245,7 @@ libtaler_plugin_exchangedb_postgres_done (void *cls)
GNUNET_break (0 ==
close (pg->event_fd));
pthread_mutex_destroy (&pg->event_lock);
+ GNUNET_CONTAINER_heap_destroy (pg->event_heap);
GNUNET_free (pg->sql_dir);
GNUNET_free (pg->currency);
GNUNET_free (pg);