diff options
Diffstat (limited to 'src/exchangedb')
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 102 |
1 files changed, 91 insertions, 11 deletions
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 1d05fb499..2d7ca0573 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -110,6 +110,39 @@ struct TALER_EXCHANGEDB_Session /** + * Event registration record. + */ +struct TALER_EXCHANGEDB_EventHandler +{ + /** + * Underlying GNUnet event handler. + */ + struct GNUNET_DB_EventHandler *geh; + + /** + * Entry in the heap. + */ + struct GNUNET_CONTAINER_HeapNode *hn; + + /** + * Our timeout. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Callback to invoke (on @e timeout). + */ + GNUNET_DB_EventCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; + +}; + + +/** * Type of the "cls" argument given to each of the functions in * our API. */ @@ -133,6 +166,12 @@ struct PostgresClosure char *sql_dir; /** + * Heap of `struct TALER_EXCHANGEDB_EventHandler` + * by timeout. + */ + struct GNUNET_CONTAINER_Heap *event_heap; + + /** * After how long should idle reserves be closed? */ struct GNUNET_TIME_Relative idle_reserve_expiration_time; @@ -2832,18 +2871,41 @@ handle_events (void *cls) } }; nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2; + struct TALER_EXCHANGEDB_EventHandler *r; GNUNET_assert (0 == pthread_mutex_lock (&pg->event_lock)); while (0 != pg->listener_count) { int ret; + int timeout = -1; /* no timeout */ GNUNET_assert (0 == pthread_mutex_unlock (&pg->event_lock)); + while (1) + { + r = GNUNET_CONTAINER_heap_peek (pg->event_heap); + if (NULL == r) + break; + if (GNUNET_TIME_absolute_is_future (r->timeout)) + break; + GNUNET_assert (r == + GNUNET_CONTAINER_heap_remove_root (pg->event_heap)); + r->hn = NULL; + r->cb (r->cb_cls, + NULL, + 0); + } + if (NULL != r) + { + struct GNUNET_TIME_Relative rem; + + rem = GNUNET_TIME_absolute_get_remaining (r->timeout); + timeout = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us; + } ret = poll (pfds, nfds, - -1 /* no timeout */); + timeout); if (-1 == ret) GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "poll"); @@ -2909,16 +2971,30 @@ pq_socket_cb (void *cls, * @param cb_cls closure for @a cb * @return handle useful to cancel the listener */ -static struct GNUNET_DB_EventHandler * +static struct TALER_EXCHANGEDB_EventHandler * postgres_event_listen (void *cls, - struct TALER_EXCHANGEDB_Session *session, + struct GNUNET_TIME_Relative timeout, const struct GNUNET_DB_EventHeaderP *es, GNUNET_DB_EventCallback cb, void *cb_cls) { struct PostgresClosure *pg = cls; - struct GNUNET_DB_EventHandler *eh; + struct TALER_EXCHANGEDB_EventHandler *eh; + struct TALER_EXCHANGEDB_Session *session; + session = postgres_get_session (pg); + eh = GNUNET_new (struct TALER_EXCHANGEDB_EventHandler); + eh->cb = cb; + eh->cb_cls = cb_cls; + eh->timeout = GNUNET_TIME_relative_to_absolute (timeout); + eh->geh = GNUNET_PQ_event_listen (session->conn, + es, + cb, + cb_cls); + GNUNET_assert (NULL != eh->geh); + eh->hn = GNUNET_CONTAINER_heap_insert (pg->event_heap, + eh, + eh->timeout.abs_value_us); GNUNET_assert (0 == pthread_mutex_lock (&pg->event_lock)); pg->listener_count++; @@ -2932,11 +3008,6 @@ postgres_event_listen (void *cls, } GNUNET_assert (0 == pthread_mutex_unlock (&pg->event_lock)); - eh = GNUNET_PQ_event_listen (session->conn, - es, - cb, - cb_cls); - GNUNET_assert (NULL != eh); return eh; } @@ -2949,7 +3020,7 @@ postgres_event_listen (void *cls, */ static void postgres_event_listen_cancel (void *cls, - struct GNUNET_DB_EventHandler *eh) + struct TALER_EXCHANGEDB_EventHandler *eh) { struct PostgresClosure *pg = cls; @@ -2971,7 +3042,13 @@ postgres_event_listen_cancel (void *cls, } GNUNET_assert (0 == pthread_mutex_unlock (&pg->event_lock)); - GNUNET_PQ_event_listen_cancel (eh); + if (NULL != eh->hn) + { + GNUNET_CONTAINER_heap_remove_node (eh->hn); + eh->hn = NULL; + } + GNUNET_PQ_event_listen_cancel (eh->geh); + GNUNET_free (eh); } @@ -10917,6 +10994,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) pg = GNUNET_new (struct PostgresClosure); pg->cfg = cfg; + pg->event_heap = GNUNET_CONTAINER_heap_create ( + GNUNET_CONTAINER_HEAP_ORDER_MIN); pg->main_self = pthread_self (); /* loaded while single-threaded! */ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (cfg, @@ -11166,6 +11245,7 @@ libtaler_plugin_exchangedb_postgres_done (void *cls) GNUNET_break (0 == close (pg->event_fd)); pthread_mutex_destroy (&pg->event_lock); + GNUNET_CONTAINER_heap_destroy (pg->event_heap); GNUNET_free (pg->sql_dir); GNUNET_free (pg->currency); GNUNET_free (pg); |