aboutsummaryrefslogtreecommitdiff
path: root/src/bank-lib/fakebank.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-08-12 19:07:28 +0200
committerChristian Grothoff <christian@grothoff.org>2021-08-12 19:07:28 +0200
commit777dd74b16064c91068e617a6fd39b2800fc0588 (patch)
tree85c7281309b197d98cdd9464901a530088b4ebf8 /src/bank-lib/fakebank.c
parent0dadc24adac1b9604e1337e542572febd8e6329f (diff)
implement long-polling in fakebank
Diffstat (limited to 'src/bank-lib/fakebank.c')
-rw-r--r--src/bank-lib/fakebank.c716
1 files changed, 636 insertions, 80 deletions
diff --git a/src/bank-lib/fakebank.c b/src/bank-lib/fakebank.c
index ecb5934e6..580012b02 100644
--- a/src/bank-lib/fakebank.c
+++ b/src/bank-lib/fakebank.c
@@ -21,11 +21,12 @@
* @brief library that fakes being a Taler bank for testcases
* @author Christian Grothoff <christian@grothoff.org>
*/
-// TODO: support long polling
// TODO: support adding WAD transfers
#include "platform.h"
#include <pthread.h>
+#include <poll.h>
+#include <sys/eventfd.h>
#include "taler_fakebank_lib.h"
#include "taler_bank_service.h"
#include "taler_mhd_lib.h"
@@ -44,6 +45,73 @@
#define MAX_URL_LEN 64
/**
+ * Per account information.
+ */
+struct Account;
+
+
+/**
+ * Types of long polling activities.
+ */
+enum LongPollType
+{
+ /**
+ * Transfer TO the exchange.
+ */
+ LP_CREDIT,
+
+ /**
+ * Transfer FROM the exchange.
+ */
+ LP_DEBIT
+
+};
+
+/**
+ * Client waiting for activity on this account.
+ */
+struct LongPoller
+{
+
+ /**
+ * Kept in a DLL.
+ */
+ struct LongPoller *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct LongPoller *prev;
+
+ /**
+ * Account this long poller is waiting on.
+ */
+ struct Account *account;
+
+ /**
+ * Entry in the heap for this long poller.
+ */
+ struct GNUNET_CONTAINER_HeapNode *hn;
+
+ /**
+ * Client that is waiting for transactions.
+ */
+ struct MHD_Connection *conn;
+
+ /**
+ * When will this long poller time out?
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
+ /**
+ * What does the @e connection wait for?
+ */
+ enum LongPollType type;
+
+};
+
+
+/**
* Details about a transcation we (as the simulated bank) received.
*/
struct Transaction;
@@ -75,6 +143,16 @@ struct Account
struct Transaction *out_tail;
/**
+ * Kept in a DLL.
+ */
+ struct LongPoller *lp_head;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct LongPoller *lp_tail;
+
+ /**
* Account name (string, not payto!)
*/
char *account_name;
@@ -257,6 +335,23 @@ struct TALER_FAKEBANK_Handle
struct GNUNET_SCHEDULER_Task *mhd_task;
/**
+ * Task for expiring long-polling connections,
+ * unless we are using a thread pool (then NULL).
+ */
+ struct GNUNET_SCHEDULER_Task *lp_task;
+
+ /**
+ * Task for expiring long-polling connections, unless we are using the
+ * GNUnet scheduler (then NULL).
+ */
+ pthread_t lp_thread;
+
+ /**
+ * MIN-heap of long pollers, sorted by timeout.
+ */
+ struct GNUNET_CONTAINER_Heap *lp_heap;
+
+ /**
* Hashmap of reserve public keys to
* `struct Transaction` with that reserve public
* key. Used to prevent public-key re-use.
@@ -319,6 +414,17 @@ struct TALER_FAKEBANK_Handle
*/
uint16_t port;
+ /**
+ * Event FD to signal @a lp_thread a change in
+ * @a lp_heap.
+ */
+ int lp_event;
+
+ /**
+ * Set to true once we are shutting down.
+ */
+ bool in_shutdown;
+
#if EPOLL_SUPPORT
/**
* Boxed @e mhd_fd.
@@ -334,6 +440,145 @@ struct TALER_FAKEBANK_Handle
/**
+ * Special address "con_cls" can point to to indicate that the handler has
+ * been called more than once already (was previously suspended).
+ */
+static int special_ptr;
+
+
+/**
+ * Task run whenever HTTP server operations are pending.
+ *
+ * @param cls the `struct TALER_FAKEBANK_Handle`
+ */
+static void
+run_mhd (void *cls);
+
+
+/**
+ * Trigger the @a lp. Frees associated resources,
+ * except the entry of @a lp in the timeout heap.
+ * Must be called while the ``big lock`` is held.
+ *
+ * @param[in] lp long poller to trigger
+ * @param[in,out] h fakebank handle
+ */
+static void
+lp_trigger (struct LongPoller *lp,
+ struct TALER_FAKEBANK_Handle *h)
+{
+ struct Account *acc = lp->account;
+
+ GNUNET_CONTAINER_DLL_remove (acc->lp_head,
+ acc->lp_tail,
+ lp);
+ MHD_resume_connection (lp->conn);
+ GNUNET_free (lp);
+ if (NULL != h->mhd_task)
+ GNUNET_SCHEDULER_cancel (h->mhd_task);
+ h->mhd_task =
+ GNUNET_SCHEDULER_add_now (&run_mhd,
+ h);
+}
+
+
+/**
+ * Thread that is run to wake up connections that have hit their timeout. Runs
+ * until in_shutdown is set to true. Must be send signals via lp_event on
+ * shutdown and/or whenever the heap changes to an earlier timeout.
+ *
+ * @param cls a `struct TALER_FAKEBANK_Handle *`
+ * @return NULL
+ */
+static void *
+lp_expiration_thread (void *cls)
+{
+ struct TALER_FAKEBANK_Handle *h = cls;
+
+ GNUNET_assert (0 ==
+ pthread_mutex_lock (&h->big_lock));
+ while (! h->in_shutdown)
+ {
+ struct LongPoller *lp;
+ int timeout_ms;
+
+ lp = GNUNET_CONTAINER_heap_peek (h->lp_heap);
+ while ( (NULL != lp) &&
+ GNUNET_TIME_absolute_is_past (lp->timeout))
+ {
+ GNUNET_assert (lp ==
+ GNUNET_CONTAINER_heap_remove_root (h->lp_heap));
+ GNUNET_assert (0 ==
+ pthread_mutex_lock (&h->big_lock));
+ lp_trigger (lp,
+ h);
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->big_lock));
+ lp = GNUNET_CONTAINER_heap_peek (h->lp_heap);
+ }
+ if (NULL != lp)
+ {
+ struct GNUNET_TIME_Relative rem;
+ unsigned long long left_ms;
+
+ rem = GNUNET_TIME_absolute_get_remaining (lp->timeout);
+ left_ms = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us;
+ if (left_ms > INT_MAX)
+ timeout_ms = INT_MAX;
+ else
+ timeout_ms = (int) left_ms;
+ }
+ else
+ {
+ timeout_ms = -1; /* infinity */
+ }
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->big_lock));
+ {
+ struct pollfd p = {
+ .fd = h->lp_event,
+ .events = POLLIN
+ };
+ int ret;
+
+ ret = poll (&p,
+ 1,
+ timeout_ms);
+ if (-1 == ret)
+ {
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+ "poll");
+ }
+ else if (1 == ret)
+ {
+ /* clear event */
+ uint64_t ev;
+ ssize_t iret;
+
+ iret = read (h->lp_event,
+ &ev,
+ sizeof (ev));
+ if (-1 == iret)
+ {
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+ "read");
+ }
+ else
+ {
+ GNUNET_break (sizeof (uint64_t) == iret);
+ }
+ }
+ }
+ GNUNET_assert (0 ==
+ pthread_mutex_lock (&h->big_lock));
+ }
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->big_lock));
+ return NULL;
+}
+
+
+/**
* Lookup account with @a name, and if it does not exist, create it.
*
* @param[in,out] h bank to lookup account at
@@ -626,6 +871,36 @@ post_transaction (struct TALER_FAKEBANK_Handle *h,
ca->in_tail,
old);
}
+ {
+ struct LongPoller *nxt;
+
+ for (struct LongPoller *lp = debit_acc->lp_head;
+ NULL != lp;
+ lp = nxt)
+ {
+ nxt = lp->next;
+ if (LP_DEBIT == lp->type)
+ {
+ GNUNET_assert (lp ==
+ GNUNET_CONTAINER_heap_remove_node (lp->hn));
+ lp_trigger (lp,
+ h);
+ }
+ }
+ for (struct LongPoller *lp = credit_acc->lp_head;
+ NULL != lp;
+ lp = nxt)
+ {
+ nxt = lp->next;
+ if (LP_CREDIT == lp->type)
+ {
+ GNUNET_assert (lp ==
+ GNUNET_CONTAINER_heap_remove_node (lp->hn));
+ lp_trigger (lp,
+ h);
+ }
+ }
+ }
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->big_lock));
if ( (NULL != old) &&
@@ -884,6 +1159,7 @@ free_account (void *cls,
{
struct Account *account = val;
+ GNUNET_assert (NULL == account->lp_head);
GNUNET_free (account->account_name);
GNUNET_free (account);
return GNUNET_OK;
@@ -898,6 +1174,11 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h)
GNUNET_SCHEDULER_cancel (h->mhd_task);
h->mhd_task = NULL;
}
+ if (NULL != h->lp_task)
+ {
+ GNUNET_SCHEDULER_cancel (h->lp_task);
+ h->lp_task = NULL;
+ }
#if EPOLL_SUPPORT
if (NULL != h->mhd_rfd)
{
@@ -910,6 +1191,39 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h)
MHD_stop_daemon (h->mhd_bank);
h->mhd_bank = NULL;
}
+ if (-1 != h->lp_event)
+ {
+ uint64_t val = 1;
+ void *ret;
+ struct LongPoller *lp;
+
+ GNUNET_assert (0 ==
+ pthread_mutex_lock (&h->big_lock));
+ h->in_shutdown = true;
+ while (NULL != (lp = GNUNET_CONTAINER_heap_remove_root (h->lp_heap)))
+ lp_trigger (lp,
+ h);
+ GNUNET_break (sizeof (val) ==
+ write (h->lp_event,
+ &val,
+ sizeof (val)));
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->big_lock));
+ GNUNET_break (0 ==
+ pthread_join (h->lp_thread,
+ &ret));
+ GNUNET_break (NULL == ret);
+ GNUNET_break (0 == close (h->lp_event));
+ h->lp_event = -1;
+ }
+ else
+ {
+ struct LongPoller *lp;
+
+ while (NULL != (lp = GNUNET_CONTAINER_heap_remove_root (h->lp_heap)))
+ lp_trigger (lp,
+ h);
+ }
if (NULL != h->accounts)
{
GNUNET_CONTAINER_multihashmap_iterate (h->accounts,
@@ -919,6 +1233,7 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h)
}
GNUNET_CONTAINER_multihashmap_destroy (h->uuid_map);
GNUNET_CONTAINER_multipeermap_destroy (h->rpubs);
+ GNUNET_CONTAINER_heap_destroy (h->lp_heap);
GNUNET_assert (0 ==
pthread_mutex_destroy (&h->big_lock));
GNUNET_assert (0 ==
@@ -960,6 +1275,10 @@ handle_mhd_completion_callback (void *cls,
(void) cls;
(void) connection;
(void) toe;
+ if (NULL == *con_cls)
+ return;
+ if (&special_ptr == *con_cls)
+ return;
GNUNET_JSON_post_parser_cleanup (*con_cls);
*con_cls = NULL;
}
@@ -988,7 +1307,6 @@ handle_admin_add_incoming (struct TALER_FAKEBANK_Handle *h,
json_t *json;
uint64_t row_id;
struct GNUNET_TIME_Absolute timestamp;
- enum GNUNET_GenericReturnValue ret;
pr = GNUNET_JSON_post_parser (REQUEST_BUFFER_MAX,
connection,
@@ -1017,6 +1335,7 @@ handle_admin_add_incoming (struct TALER_FAKEBANK_Handle *h,
struct TALER_Amount amount;
struct TALER_ReservePublicKeyP reserve_pub;
char *debit;
+ enum GNUNET_GenericReturnValue ret;
struct GNUNET_JSON_Specification spec[] = {
GNUNET_JSON_spec_fixed_auto ("reserve_pub",
&reserve_pub),
@@ -1029,14 +1348,13 @@ handle_admin_add_incoming (struct TALER_FAKEBANK_Handle *h,
};
if (GNUNET_OK !=
- GNUNET_JSON_parse (json,
- spec,
- NULL, NULL))
+ (ret = TALER_MHD_parse_json_data (connection,
+ json,
+ spec)))
{
- GNUNET_break (0);
+ GNUNET_break_op (0);
json_decref (json);
- /* We're fakebank, no need for nice error handling */
- return MHD_NO;
+ return (GNUNET_NO == ret) ? MHD_YES : MHD_NO;
}
if (0 != strcasecmp (amount.currency,
h->currency))
@@ -1141,6 +1459,7 @@ handle_transfer (struct TALER_FAKEBANK_Handle *h,
char *credit;
const char *base_url;
struct TALER_Amount amount;
+ enum GNUNET_GenericReturnValue ret;
struct GNUNET_JSON_Specification spec[] = {
GNUNET_JSON_spec_fixed_auto ("request_uid",
&uuid),
@@ -1157,14 +1476,13 @@ handle_transfer (struct TALER_FAKEBANK_Handle *h,
};
if (GNUNET_OK !=
- GNUNET_JSON_parse (json,
- spec,
- NULL, NULL))
+ (ret = TALER_MHD_parse_json_data (connection,
+ json,
+ spec)))
{
- GNUNET_break (0);
+ GNUNET_break_op (0);
json_decref (json);
- /* We are fakebank, no need for nice error handling */
- return MHD_NO;
+ return (GNUNET_NO == ret) ? MHD_YES : MHD_NO;
}
{
int ret;
@@ -1223,20 +1541,17 @@ handle_transfer (struct TALER_FAKEBANK_Handle *h,
*
* @param h the fakebank handle
* @param connection the connection
- * @param con_cls place to store state, not used
* @return MHD result code
*/
static MHD_RESULT
handle_home_page (struct TALER_FAKEBANK_Handle *h,
- struct MHD_Connection *connection,
- void **con_cls)
+ struct MHD_Connection *connection)
{
MHD_RESULT ret;
struct MHD_Response *resp;
#define HELLOMSG "Hello, Fakebank!"
(void) h;
- (void) con_cls;
resp = MHD_create_response_from_buffer (
strlen (HELLOMSG),
HELLOMSG,
@@ -1292,9 +1607,11 @@ struct HistoryArgs
* @param h bank handle to work on
* @param connection MHD connection.
* @param[out] ha will contain the parsed values.
- * @return #GNUNET_OK only if the parsing succeeds.
+ * @return #GNUNET_OK only if the parsing succeeds,
+ * #GNUNET_SYSERR if it failed,
+ * #GNUNET_NO if it failed and an error was returned
*/
-static int
+static enum GNUNET_GenericReturnValue
parse_history_common_args (const struct TALER_FAKEBANK_Handle *h,
struct MHD_Connection *connection,
struct HistoryArgs *ha)
@@ -1305,6 +1622,7 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h,
unsigned long long lp_timeout;
unsigned long long sval;
long long d;
+ char dummy;
start = MHD_lookup_connection_value (connection,
MHD_GET_ARGUMENT_KIND,
@@ -1319,23 +1637,60 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h,
lp_timeout = 0;
if ( (NULL == delta) ||
(1 != sscanf (delta,
- "%lld",
- &d)) ||
- ( (NULL != long_poll_ms) &&
- (1 != sscanf (long_poll_ms,
- "%llu",
- &lp_timeout)) ) ||
- ( (NULL != start) &&
- (1 != sscanf (start,
- "%llu",
- &sval)) ) )
+ "%lld%c",
+ &d,
+ &dummy)) )
{
/* Fail if one of the above failed. */
/* Invalid request, given that this is fakebank we impolitely
* just kill the connection instead of returning a nice error.
*/
- GNUNET_break (0);
- return GNUNET_NO;
+ GNUNET_break_op (0);
+ return (MHD_YES ==
+ TALER_MHD_reply_with_error (connection,
+ MHD_HTTP_BAD_REQUEST,
+ TALER_EC_GENERIC_PARAMETER_MALFORMED,
+ "delta"))
+ ? GNUNET_NO
+ : GNUNET_SYSERR;
+ }
+ if ( (NULL != long_poll_ms) &&
+ (1 != sscanf (long_poll_ms,
+ "%llu%c",
+ &lp_timeout,
+ &dummy)) )
+ {
+ /* Fail if one of the above failed. */
+ /* Invalid request, given that this is fakebank we impolitely
+ * just kill the connection instead of returning a nice error.
+ */
+ GNUNET_break_op (0);
+ return (MHD_YES ==
+ TALER_MHD_reply_with_error (connection,
+ MHD_HTTP_BAD_REQUEST,
+ TALER_EC_GENERIC_PARAMETER_MALFORMED,
+ "long_poll_ms"))
+ ? GNUNET_NO
+ : GNUNET_SYSERR;
+ }
+ if ( (NULL != start) &&
+ (1 != sscanf (start,
+ "%llu%c",
+ &sval,
+ &dummy)) )
+ {
+ /* Fail if one of the above failed. */
+ /* Invalid request, given that this is fakebank we impolitely
+ * just kill the connection instead of returning a nice error.
+ */
+ GNUNET_break_op (0);
+ return (MHD_YES ==
+ TALER_MHD_reply_with_error (connection,
+ MHD_HTTP_BAD_REQUEST,
+ TALER_EC_GENERIC_PARAMETER_MALFORMED,
+ "start"))
+ ? GNUNET_NO
+ : GNUNET_SYSERR;
}
if (NULL == start)
ha->start_idx = (d > 0) ? 0 : h->serial_counter;
@@ -1344,8 +1699,14 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h,
ha->delta = (int64_t) d;
if (0 == ha->delta)
{
- GNUNET_break (0);
- return GNUNET_NO;
+ GNUNET_break_op (0);
+ return (MHD_YES ==
+ TALER_MHD_reply_with_error (connection,
+ MHD_HTTP_BAD_REQUEST,
+ TALER_EC_GENERIC_PARAMETER_MALFORMED,
+ "delta"))
+ ? GNUNET_NO
+ : GNUNET_SYSERR;
}
ha->lp_timeout
= GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
@@ -1359,33 +1720,146 @@ parse_history_common_args (const struct TALER_FAKEBANK_Handle *h,
/**
+ * Task run when a long poller is about to time out.
+ * Only used in single-threaded mode.
+ *
+ * @param cls a `struct TALER_FAKEBANK_Handle *`
+ */
+static void
+lp_timeout (void *cls)
+{
+ struct TALER_FAKEBANK_Handle *h = cls;
+ struct LongPoller *lp;
+
+ h->lp_task = NULL;
+ while (NULL != (lp = GNUNET_CONTAINER_heap_peek (h->lp_heap)))
+ {
+ if (GNUNET_TIME_absolute_is_future (lp->timeout))
+ break;
+ GNUNET_assert (lp ==
+ GNUNET_CONTAINER_heap_remove_root (h->lp_heap));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Timeout reached for long poller %p\n",
+ lp->conn);
+ lp_trigger (lp,
+ h);
+ }
+ if (NULL == lp)
+ return;
+ h->lp_task = GNUNET_SCHEDULER_add_at (lp->timeout,
+ &lp_timeout,
+ h);
+}
+
+
+/**
+ * Reschedule the timeout task of @a h for time @a t.
+ *
+ * @param h fakebank handle
+ * @param t when will the next connection timeout expire
+ */
+static void
+reschedule_lp_timeout (struct TALER_FAKEBANK_Handle *h,
+ struct GNUNET_TIME_Absolute t)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Scheduling timeout task for %s\n",
+ GNUNET_STRINGS_absolute_time_to_string (t));
+ if (-1 != h->lp_event)
+ {
+ uint64_t num = 1;
+
+ GNUNET_break (sizeof (num) ==
+ write (h->lp_event,
+ &num,
+ sizeof (num)));
+ }
+ else
+ {
+ if (NULL != h->lp_task)
+ GNUNET_SCHEDULER_cancel (h->lp_task);
+ h->lp_task = GNUNET_SCHEDULER_add_at (t,
+ &lp_timeout,
+ h);
+ }
+}
+
+
+/**
+ * Start long-polling for @a connection and @a acc
+ * for transfers in @a dir. Must be called with the
+ * "big lock" held.
+ *
+ * @param[in,out] h fakebank handle
+ * @param[in,out] connection to suspend
+ * @param[in,out] acc account affected
+ * @param lp_timeout how long to suspend
+ * @param dir direction of transfers to watch for
+ */
+static void
+start_lp (struct TALER_FAKEBANK_Handle *h,
+ struct MHD_Connection *connection,
+ struct Account *acc,
+ struct GNUNET_TIME_Relative lp_timeout,
+ enum LongPollType dir)
+{
+ struct LongPoller *lp;
+ bool toc;
+
+ lp = GNUNET_new (struct LongPoller);
+ lp->account = acc;
+ lp->conn = connection;
+ lp->timeout = GNUNET_TIME_relative_to_absolute (lp_timeout);
+ lp->type = dir;
+ lp->hn = GNUNET_CONTAINER_heap_insert (h->lp_heap,
+ lp,
+ lp->timeout.abs_value_us);
+ toc = (lp ==
+ GNUNET_CONTAINER_heap_peek (h->lp_heap));
+ GNUNET_CONTAINER_DLL_insert (acc->lp_head,
+ acc->lp_tail,
+ lp);
+ MHD_suspend_connection (connection);
+ if (toc)
+ reschedule_lp_timeout (h,
+ lp->timeout);
+
+}
+
+
+/**
* Handle incoming HTTP request for /history/outgoing
*
* @param h the fakebank handle
* @param connection the connection
* @param account which account the request is about
- * @return MHD result code
+ * @param con_cls closure for request (NULL or &special_ptr)
*/
static MHD_RESULT
handle_debit_history (struct TALER_FAKEBANK_Handle *h,
struct MHD_Connection *connection,
- const char *account)
+ const char *account,
+ void **con_cls)
{
struct HistoryArgs ha;
struct Account *acc;
struct Transaction *pos;
json_t *history;
char *debit_payto;
+ enum GNUNET_GenericReturnValue ret;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Handling /history/outgoing connection %p\n",
+ connection);
if (GNUNET_OK !=
- parse_history_common_args (h,
- connection,
- &ha))
+ (ret = parse_history_common_args (h,
+ connection,
+ &ha)))
{
- GNUNET_break (0);
- return MHD_NO;
+ return (GNUNET_SYSERR == ret) ? MHD_NO : MHD_YES;
}
-
+ if (&special_ptr == *con_cls)
+ ha.lp_timeout = GNUNET_TIME_UNIT_ZERO;
acc = lookup_account (h,
account);
GNUNET_asprintf (&debit_payto,
@@ -1430,16 +1904,29 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
if ( (NULL == t) ||
overflow)
{
+ GNUNET_free (debit_payto);
+ if (GNUNET_TIME_relative_is_zero (ha.lp_timeout) &&
+ (0 < ha.delta))
+ {
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->big_lock));
+ return TALER_MHD_REPLY_JSON_PACK (
+ connection,
+ MHD_HTTP_OK,
+ GNUNET_JSON_pack_array_steal (
+ "outgoing_transactions",
+ history));
+ }
+ *con_cls = &special_ptr;
+ start_lp (h,
+ connection,
+ acc,
+ ha.lp_timeout,
+ LP_DEBIT);
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->big_lock));
- GNUNET_free (debit_payto);
- /* FIXME: suspend for long-polling instead */
- return TALER_MHD_REPLY_JSON_PACK (
- connection,
- MHD_HTTP_OK,
- GNUNET_JSON_pack_array_steal (
- "outgoing_transactions",
- history));
+ json_decref (history);
+ return MHD_YES;
}
if (t->debit_account != acc)
{
@@ -1524,6 +2011,21 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
if (0 < ha.delta)
pos = pos->next_out;
}
+ if ( (0 == json_array_size (history)) &&
+ (! GNUNET_TIME_relative_is_zero (ha.lp_timeout)) &&
+ (0 < ha.delta))
+ {
+ *con_cls = &special_ptr;
+ start_lp (h,
+ connection,
+ acc,
+ ha.lp_timeout,
+ LP_DEBIT);
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->big_lock));
+ json_decref (history);
+ return MHD_YES;
+ }
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->big_lock));
GNUNET_free (debit_payto);
@@ -1546,22 +2048,29 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
static MHD_RESULT
handle_credit_history (struct TALER_FAKEBANK_Handle *h,
struct MHD_Connection *connection,
- const char *account)
+ const char *account,
+ void **con_cls)
{
struct HistoryArgs ha;
struct Account *acc;
const struct Transaction *pos;
json_t *history;
char *credit_payto;
+ enum GNUNET_GenericReturnValue ret;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Handling /history/incoming connection %p\n",
+ connection);
if (GNUNET_OK !=
- parse_history_common_args (h,
- connection,
- &ha))
+ (ret = parse_history_common_args (h,
+ connection,
+ &ha)))
{
- GNUNET_break (0);
- return MHD_NO;
+ return (GNUNET_SYSERR == ret) ? MHD_NO : MHD_YES;
}
+ if (&special_ptr == *con_cls)
+ ha.lp_timeout = GNUNET_TIME_UNIT_ZERO;
+ *con_cls = &special_ptr;
acc = lookup_account (h,
account);
history = json_array ();
@@ -1601,15 +2110,28 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
if ( (NULL == t) ||
overflow)
{
+ GNUNET_free (credit_payto);
+ if (GNUNET_TIME_relative_is_zero (ha.lp_timeout) &&
+ (0 < ha.delta))
+ {
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->big_lock));
+ return TALER_MHD_REPLY_JSON_PACK (connection,
+ MHD_HTTP_OK,
+ GNUNET_JSON_pack_array_steal (
+ "incoming_transactions",
+ history));
+ }
+ *con_cls = &special_ptr;
+ start_lp (h,
+ connection,
+ acc,
+ ha.lp_timeout,
+ LP_CREDIT);
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->big_lock));
- GNUNET_free (credit_payto);
- /* FIXME: suspend for long-polling instead */
- return TALER_MHD_REPLY_JSON_PACK (connection,
- MHD_HTTP_OK,
- GNUNET_JSON_pack_array_steal (
- "incoming_transactions",
- history));
+ json_decref (history);
+ return MHD_YES;
}
if (skip)
{
@@ -1681,6 +2203,21 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
if (0 < ha.delta)
pos = pos->next_in;
}
+ if ( (0 == json_array_size (history)) &&
+ (! GNUNET_TIME_relative_is_zero (ha.lp_timeout)) &&
+ (0 < ha.delta))
+ {
+ *con_cls = &special_ptr;
+ start_lp (h,
+ connection,
+ acc,
+ ha.lp_timeout,
+ LP_CREDIT);
+ GNUNET_assert (0 ==
+ pthread_mutex_unlock (&h->big_lock));
+ json_decref (history);
+ return MHD_YES;
+ }
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->big_lock));
GNUNET_free (credit_payto);
@@ -1702,7 +2239,7 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
* @param account which account should process the request
* @param upload_data request data
* @param upload_data_size size of @a upload_data in bytes
- * @param con_cls closure for request (a `struct Buffer *`)
+ * @param con_cls closure
* @return MHD result code
*/
static MHD_RESULT
@@ -1727,18 +2264,19 @@ serve (struct TALER_FAKEBANK_Handle *h,
(NULL != account) )
return handle_credit_history (h,
connection,
- account);
+ account,
+ con_cls);
if ( (0 == strcmp (url,
"/history/outgoing")) &&
(NULL != account) )
return handle_debit_history (h,
connection,
- account);
+ account,
+ con_cls);
if (0 == strcmp (url,
"/"))
return handle_home_page (h,
- connection,
- con_cls);
+ connection);
}
else if (0 == strcasecmp (method,
MHD_HTTP_METHOD_POST))
@@ -1762,12 +2300,15 @@ serve (struct TALER_FAKEBANK_Handle *h,
con_cls);
}
/* Unexpected URL path, just close the connection. */
- /* We're rather impolite here, but it's a testcase. */
TALER_LOG_ERROR ("Breaking URL: %s %s\n",
method,
url);
GNUNET_break_op (0);
- return MHD_NO;
+ return TALER_MHD_reply_with_error (
+ connection,
+ MHD_HTTP_NOT_FOUND,
+ TALER_EC_GENERIC_ENDPOINT_UNKNOWN,
+ url);
}
@@ -1781,7 +2322,7 @@ serve (struct TALER_FAKEBANK_Handle *h,
* @param version HTTP version (ignored)
* @param upload_data request data
* @param upload_data_size size of @a upload_data in bytes
- * @param con_cls closure for request (a `struct Buffer *`)
+ * @param con_cls closure for request
* @return MHD result code
*/
static MHD_RESULT
@@ -1823,15 +2364,6 @@ handle_mhd_request (void *cls,
}
-/**
- * Task run whenever HTTP server operations are pending.
- *
- * @param cls the `struct TALER_FAKEBANK_Handle`
- */
-static void
-run_mhd (void *cls);
-
-
#if EPOLL_SUPPORT
/**
* Schedule MHD. This function should be called initially when an
@@ -1982,6 +2514,7 @@ TALER_FAKEBANK_start2 (uint16_t port,
}
GNUNET_assert (strlen (currency) < TALER_CURRENCY_LEN);
h = GNUNET_new (struct TALER_FAKEBANK_Handle);
+ h->lp_event = -1;
h->port = port;
h->ram_limit = ram_limit;
h->serial_counter = 0;
@@ -2027,6 +2560,7 @@ TALER_FAKEBANK_start2 (uint16_t port,
TALER_FAKEBANK_stop (h);
return NULL;
}
+ h->lp_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
h->currency = GNUNET_strdup (currency);
GNUNET_asprintf (&h->my_baseurl,
"http://localhost:%u/",
@@ -2061,6 +2595,28 @@ TALER_FAKEBANK_start2 (uint16_t port,
}
else
{
+ h->lp_event = eventfd (0,
+ EFD_CLOEXEC);
+ if (-1 == h->lp_event)
+ {
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
+ "eventfd");
+ TALER_FAKEBANK_stop (h);
+ return NULL;
+ }
+ if (0 !=
+ pthread_create (&h->lp_thread,
+ NULL,
+ &lp_expiration_thread,
+ h))
+ {
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
+ "pthread_create");
+ GNUNET_break (0 == close (h->lp_event));
+ h->lp_event = -1;
+ TALER_FAKEBANK_stop (h);
+ return NULL;
+ }
h->mhd_bank = MHD_start_daemon (MHD_USE_DEBUG
| MHD_USE_AUTO_INTERNAL_THREAD
| MHD_ALLOW_SUSPEND_RESUME