/*
This file is part of TALER
(C) 2019-2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
/**
* @file taler-merchant-httpd_private-get-orders.c
* @brief implement GET /orders
* @author Christian Grothoff
*/
#include "platform.h"
#include "taler-merchant-httpd_private-get-orders.h"
#include
#include
/**
 * Sensible bound on TALER_MERCHANTDB_OrderFilter.delta.
 *
 * A "delta" query argument outside of [-MAX_DELTA, MAX_DELTA] is
 * rejected by the GET /orders handler with a "400 Bad Request".
 */
#define MAX_DELTA 1024
/**
 * A pending GET /orders request.
 *
 * One such object is allocated per request, stored as the handler
 * context of the request and released again by cleanup().
 */
struct TMH_PendingOrder
{
/**
 * Previous entry in the DLL (`po_head`/`po_tail` of @e mi).
 */
struct TMH_PendingOrder *prev;
/**
 * Next entry in the DLL (`po_head`/`po_tail` of @e mi).
 */
struct TMH_PendingOrder *next;
/**
 * Which connection was suspended (and needs to be resumed once we
 * have an answer or the long-polling timeout was reached).
 */
struct MHD_Connection *con;
/**
 * Associated node in the timeout heap; cleanup() only removes the
 * node from the heap if this is not NULL.
 */
struct GNUNET_CONTAINER_HeapNode *hn;
/**
 * Which instance is this client polling? This also defines
 * which DLL this struct is part of.
 */
struct TMH_MerchantInstance *mi;
/**
 * At what time does this request expire? If set in the future, we
 * may wait this long for a payment to arrive before responding.
 */
struct GNUNET_TIME_Absolute long_poll_timeout;
/**
 * Filter to apply. Only set for requests that are long polling; it is
 * then matched against incoming order change events.
 */
struct TALER_MERCHANTDB_OrderFilter of;
/**
 * The array of orders, returned to the client under the key "orders".
 */
json_t *pa;
/**
 * The name of the instance we are querying for. Points at the
 * `settings.id` of the instance; not released by cleanup().
 */
const char *instance_id;
/**
 * The result after adding the orders (#TALER_EC_NONE for okay, anything else for an error).
 */
enum TALER_ErrorCode result;
/**
 * Is the structure in the DLL of @e mi?
 */
bool in_dll;
};
/**
 * Task to timeout pending orders; runs order_timeout(). NULL if no
 * such task is currently scheduled.
 */
static struct GNUNET_SCHEDULER_Task *order_timeout_task;
/**
 * Heap for orders in long polling awaiting timeout. It is created on
 * demand as a min-heap by the GET /orders handler and destroyed (and
 * reset to NULL) by order_timeout() once it is found to be empty.
 */
static struct GNUNET_CONTAINER_Heap *order_timeout_heap;
/**
 * We are shutting down (or an instance is being deleted), force resume of all
 * GET /orders requests of the given instance.
 *
 * Every pending request of @a mi is taken out of the instance's DLL and
 * out of the (global) timeout heap and its connection is resumed.  The
 * event listener of the instance is cancelled.  The timeout heap and the
 * timeout task are shared between all instances, so they are only
 * released once no pending request (of any instance) is left in the heap.
 *
 * @param mi instance to force resuming for
 */
void
TMH_force_get_orders_resume (struct TMH_MerchantInstance *mi)
{
  struct TMH_PendingOrder *po;

  while (NULL != (po = mi->po_head))
  {
    GNUNET_assert (po->in_dll);
    GNUNET_CONTAINER_DLL_remove (mi->po_head,
                                 mi->po_tail,
                                 po);
    po->in_dll = false;
    if (NULL != po->hn)
    {
      /* Remove exactly this request's node: the heap root is the entry
         with the earliest timeout, which need not be the DLL head (and
         may even belong to a different instance). */
      GNUNET_assert (po ==
                     GNUNET_CONTAINER_heap_remove_node (po->hn));
      /* cleanup() must not try to remove the node a second time */
      po->hn = NULL;
    }
    MHD_resume_connection (po->con);
  }
  if (NULL != mi->po_eh)
  {
    TMH_db->event_listen_cancel (mi->po_eh);
    mi->po_eh = NULL;
  }
  if ( (NULL == order_timeout_heap) ||
       (0 == GNUNET_CONTAINER_heap_get_size (order_timeout_heap)) )
  {
    /* Nobody is waiting for a timeout anymore, release shared state.
       If other instances still have entries in the heap, the task and
       the heap must stay alive for them. */
    if (NULL != order_timeout_task)
    {
      GNUNET_SCHEDULER_cancel (order_timeout_task);
      order_timeout_task = NULL;
    }
    if (NULL != order_timeout_heap)
    {
      GNUNET_CONTAINER_heap_destroy (order_timeout_heap);
      order_timeout_heap = NULL;
    }
  }
}
/**
 * Scheduler task that enforces the long-polling deadlines of
 * GET /orders requests.
 *
 * Walks the timeout heap from its root: every request whose deadline
 * is no longer in the future is taken out of the heap and out of the
 * DLL of its instance and its connection is resumed.  As soon as a
 * request with a deadline in the future is found, the task re-arms
 * itself for that deadline.  If the heap runs empty, it is released.
 *
 * @param cls unused
 */
static void
order_timeout (void *cls)
{
  struct TMH_PendingOrder *head;

  (void) cls;
  order_timeout_task = NULL;
  for (head = GNUNET_CONTAINER_heap_peek (order_timeout_heap);
       NULL != head;
       head = GNUNET_CONTAINER_heap_peek (order_timeout_heap))
  {
    struct TMH_MerchantInstance *inst;

    if (GNUNET_TIME_absolute_is_future (head->long_poll_timeout))
    {
      /* earliest deadline not yet reached: come back at that time */
      order_timeout_task
        = GNUNET_SCHEDULER_add_at (head->long_poll_timeout,
                                   &order_timeout,
                                   NULL);
      return;
    }
    GNUNET_assert (head ==
                   GNUNET_CONTAINER_heap_remove_root (order_timeout_heap));
    head->hn = NULL;
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Resuming long polled job due to timeout\n");
    inst = head->mi;
    GNUNET_assert (head->in_dll);
    GNUNET_CONTAINER_DLL_remove (inst->po_head,
                                 inst->po_tail,
                                 head);
    if ( (NULL != inst->po_eh) &&
         (NULL == inst->po_head) )
    {
      /* last long poller of the instance is gone, stop listening */
      TMH_db->event_listen_cancel (inst->po_eh);
      inst->po_eh = NULL;
    }
    head->in_dll = false;
    MHD_resume_connection (head->con);
    TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */
  }
  /* heap is empty: release data structure, we don't need it right now */
  GNUNET_CONTAINER_heap_destroy (order_timeout_heap);
  order_timeout_heap = NULL;
}
/**
 * Release the per-request state of a GET /orders request, including
 * the JSON array we were building for the response.
 *
 * If the request is still linked into the DLL of its instance or into
 * the timeout heap, it is unlinked first.
 *
 * @param ctx context to clean up, must be a `struct TMH_PendingOrder *`
 */
static void
cleanup (void *ctx)
{
  struct TMH_PendingOrder *pending = ctx;

  if (pending->in_dll)
  {
    struct TMH_MerchantInstance *inst = pending->mi;

    GNUNET_CONTAINER_DLL_remove (inst->po_head,
                                 inst->po_tail,
                                 pending);
  }
  if (NULL != pending->hn)
  {
    GNUNET_assert (pending ==
                   GNUNET_CONTAINER_heap_remove_node (pending->hn));
  }
  json_decref (pending->pa);
  GNUNET_free (pending);
}
/**
 * Callback invoked once per refund of an order.  Adds the amount of
 * the refund to the running total passed in via the closure; all
 * other details of the refund are ignored.
 *
 * @param cls a `struct TALER_Amount *` with the total refunded so far
 * @param refund_serial unique serial number of the refund
 * @param timestamp time of the refund (for grouping of refunds in the wallet UI)
 * @param coin_pub public coin from which the refund comes from
 * @param exchange_url URL of the exchange that issued @a coin_pub
 * @param rtransaction_id identificator of the refund
 * @param reason human-readable explanation of the refund
 * @param refund_amount refund amount which is being taken from @a coin_pub
 * @param pending true if the this refund was not yet processed by the wallet/exchange
 */
static void
process_refunds_cb (void *cls,
                    uint64_t refund_serial,
                    struct GNUNET_TIME_Absolute timestamp,
                    const struct TALER_CoinSpendPublicKeyP *coin_pub,
                    const char *exchange_url,
                    uint64_t rtransaction_id,
                    const char *reason,
                    const struct TALER_Amount *refund_amount,
                    bool pending)
{
  struct TALER_Amount *sum = cls;

  (void) refund_serial;
  (void) timestamp;
  (void) coin_pub;
  (void) exchange_url;
  (void) rtransaction_id;
  (void) reason;
  (void) pending;
  /* sum += refund_amount; a negative return value signals failure */
  GNUNET_assert (TALER_amount_add (sum,
                                   sum,
                                   refund_amount) >= 0);
}
/**
 * Add order details to our JSON array.
 *
 * Looks up the status and the contract terms of the order (falling back
 * to the order table if there are no contract terms for it yet),
 * extracts "amount", "refund_deadline" and "summary" from the terms,
 * determines whether the order is still refundable and appends an
 * entry describing the order to the `pa` array of the pending request.
 * On failure, nothing is appended and the `result` of the pending
 * request is set to the respective error code.
 *
 * @param cls the `struct TMH_PendingOrder` to add the order to
 * @param orig_order_id the order this is about, NULL if not known
 *        (as is the case when we are called for a database event)
 * @param order_serial serial ID of the order
 * @param creation_time when was the order created
 */
static void
add_order (void *cls,
           const char *orig_order_id,
           uint64_t order_serial,
           struct GNUNET_TIME_Absolute creation_time)
{
  struct TMH_PendingOrder *po = cls;
  /* Must start out as NULL: the error path below calls json_decref()
     on it even if none of the lookups ever assigned it. */
  json_t *contract_terms = NULL;
  struct GNUNET_HashCode h_contract_terms;
  enum GNUNET_DB_QueryStatus qs;
  const char *summary;
  char *order_id = NULL;
  bool refundable = false;
  bool paid;
  struct TALER_Amount order_amount;

  qs = TMH_db->lookup_order_status_by_serial (TMH_db->cls,
                                              po->instance_id,
                                              order_serial,
                                              &order_id,
                                              &h_contract_terms,
                                              &paid);
  if (qs < 0)
  {
    GNUNET_break (0);
    po->result = TALER_EC_GENERIC_DB_FETCH_FAILED;
    return;
  }
  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
  {
    /* Contract terms don't exist, so the order cannot be paid. */
    paid = false;
    if (NULL == orig_order_id)
    {
      /* cannot be via DB trigger, and the other code
         path should have passed an orig_order_id */
      GNUNET_break (0);
      po->result = TALER_EC_GENERIC_DB_FETCH_FAILED;
      return;
    }
    order_id = GNUNET_strdup (orig_order_id);
  }

  {
    /* First try to find the order in the contracts */
    uint64_t os;

    qs = TMH_db->lookup_contract_terms (TMH_db->cls,
                                        po->instance_id,
                                        order_id,
                                        &contract_terms,
                                        &os,
                                        NULL);
  }
  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
  {
    /* Might still be unclaimed, so try order table */
    struct GNUNET_HashCode unused;

    qs = TMH_db->lookup_order (TMH_db->cls,
                               po->instance_id,
                               order_id,
                               NULL,
                               &unused,
                               &contract_terms);
  }
  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
  {
    GNUNET_break (0); /* Failed: #7025 */
    po->result = TALER_EC_GENERIC_DB_FETCH_FAILED;
    json_decref (contract_terms); /* may still be NULL, which is fine */
    GNUNET_free (order_id);
    return;
  }

  {
    struct GNUNET_TIME_Absolute rd;
    struct GNUNET_JSON_Specification spec[] = {
      TALER_JSON_spec_amount ("amount",
                              TMH_currency,
                              &order_amount),
      GNUNET_JSON_spec_absolute_time ("refund_deadline",
                                      &rd),
      GNUNET_JSON_spec_string ("summary",
                               &summary),
      GNUNET_JSON_spec_end ()
    };

    if (GNUNET_OK !=
        GNUNET_JSON_parse (contract_terms,
                           spec,
                           NULL, NULL))
    {
      GNUNET_break (0);
      po->result = TALER_EC_MERCHANT_GENERIC_DB_CONTRACT_CONTENT_INVALID;
      json_decref (contract_terms);
      GNUNET_free (order_id);
      return;
    }

    /* Only paid orders with a refund deadline in the future can be
       refundable; they are if less than the full amount was refunded. */
    if (GNUNET_TIME_absolute_is_future (rd) &&
        paid)
    {
      struct TALER_Amount refund_amount;

      GNUNET_assert (GNUNET_OK ==
                     TALER_amount_set_zero (TMH_currency,
                                            &refund_amount));
      qs = TMH_db->lookup_refunds_detailed (TMH_db->cls,
                                            po->instance_id,
                                            &h_contract_terms,
                                            &process_refunds_cb,
                                            &refund_amount);
      if (0 > qs)
      {
        GNUNET_break (0);
        po->result = TALER_EC_GENERIC_DB_FETCH_FAILED;
        json_decref (contract_terms);
        GNUNET_free (order_id);
        return;
      }
      if (0 > TALER_amount_cmp (&refund_amount,
                                &order_amount))
        refundable = true;
    }
  }

  /* 'summary' points into 'contract_terms', so the terms must only be
     released after the entry was packed. */
  GNUNET_assert (0 ==
                 json_array_append_new (
                   po->pa,
                   GNUNET_JSON_PACK (
                     GNUNET_JSON_pack_string ("order_id",
                                              order_id),
                     GNUNET_JSON_pack_uint64 ("row_id",
                                              order_serial),
                     GNUNET_JSON_pack_time_abs ("timestamp",
                                                creation_time),
                     TALER_JSON_pack_amount ("amount",
                                             &order_amount),
                     GNUNET_JSON_pack_string ("summary",
                                              summary),
                     GNUNET_JSON_pack_bool ("refundable",
                                            refundable),
                     GNUNET_JSON_pack_bool ("paid",
                                            paid))));
  json_decref (contract_terms);
  GNUNET_free (order_id);
}
/**
 * We have received a trigger from the database
 * that we should (possibly) resume some requests.
 *
 * Every long-polling request of the instance whose filter matches the
 * changed order gets the order added to its response and is resumed.
 * Once no long-polling request is left for the instance, the event
 * listener is cancelled.
 *
 * @param cls a `struct TMH_MerchantInstance`
 * @param extra a `struct TMH_OrderChangeEventDetailsP`
 * @param extra_size number of bytes in @a extra
 */
static void
resume_by_event (void *cls,
                 const void *extra,
                 size_t extra_size)
{
  struct TMH_MerchantInstance *mi = cls;
  const struct TMH_OrderChangeEventDetailsP *oce = extra;
  struct TMH_PendingOrder *pn;
  enum TMH_OrderStateFlags osf;
  uint64_t order_serial_id;
  struct GNUNET_TIME_Absolute date;

  if (sizeof (*oce) != extra_size)
  {
    GNUNET_break (0);
    return;
  }
  /* event details are transmitted in network byte order */
  osf = (enum TMH_OrderStateFlags) ntohl (oce->order_state);
  order_serial_id = GNUNET_ntohll (oce->order_serial_id);
  date = GNUNET_TIME_absolute_ntoh (oce->execution_date);
  for (struct TMH_PendingOrder *po = mi->po_head;
       NULL != po;
       po = pn)
  {
    /* remember successor now, 'po' may be removed from the DLL below */
    pn = po->next;
    /* Skip unless, for each of paid/refunded/wired, the filter either
       accepts everything or agrees with the state of the order. */
    if (! ( ( ((TALER_EXCHANGE_YNA_YES == po->of.paid) ==
               (0 != (osf & TMH_OSF_PAID))) ||
              (TALER_EXCHANGE_YNA_ALL == po->of.paid) ) &&
            ( ((TALER_EXCHANGE_YNA_YES == po->of.refunded) ==
               (0 != (osf & TMH_OSF_REFUNDED))) ||
              (TALER_EXCHANGE_YNA_ALL == po->of.refunded) ) &&
            ( ((TALER_EXCHANGE_YNA_YES == po->of.wired) ==
               (0 != (osf & TMH_OSF_WIRED))) ||
              (TALER_EXCHANGE_YNA_ALL == po->of.wired) ) ) )
      continue;
    if (po->of.delta > 0)
    {
      /* ascending query: order must not be before row/date limits */
      if (order_serial_id < po->of.start_row)
        continue;
      if (date.abs_value_us < po->of.date.abs_value_us)
        continue;
      po->of.delta--;
    }
    else
    {
      /* descending query: order must not be after row/date limits */
      if (order_serial_id > po->of.start_row)
        continue;
      if (date.abs_value_us > po->of.date.abs_value_us)
        continue;
      po->of.delta++;
    }
    add_order (po,
               NULL,
               order_serial_id,
               date);
    GNUNET_assert (po->in_dll);
    GNUNET_CONTAINER_DLL_remove (mi->po_head,
                                 mi->po_tail,
                                 po);
    po->in_dll = false;
    GNUNET_assert (po ==
                   GNUNET_CONTAINER_heap_remove_node (po->hn));
    /* The heap node is gone now; clear the reference so that cleanup()
       does not try to remove it from the heap a second time. */
    po->hn = NULL;
    MHD_resume_connection (po->con);
    TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */
  }
  if ( (NULL == mi->po_head) &&
       (NULL != mi->po_eh) )
  {
    /* nobody left to notify, stop listening */
    TMH_db->event_listen_cancel (mi->po_eh);
    mi->po_eh = NULL;
  }
}
/**
 * An order was added or its state changed.  Publish a
 * #TALER_DBEVENT_MERCHANT_ORDERS_CHANGE event for the instance via the
 * database, so that long-polling clients that may have been waiting
 * for this event can be woken up.
 *
 * @param mi the instance where the order changed
 * @param osf order state flags
 * @param date execution date of the order
 * @param order_serial_id serial ID of the order in the database
 */
void
TMH_notify_order_change (struct TMH_MerchantInstance *mi,
                         enum TMH_OrderStateFlags osf,
                         struct GNUNET_TIME_Absolute date,
                         uint64_t order_serial_id)
{
  /* what kind of event this is and which instance it is about */
  struct TMH_OrderChangeEventP event_spec = {
    .merchant_pub = mi->merchant_pub,
    .header.size = htons (sizeof (event_spec)),
    .header.type = htons (TALER_DBEVENT_MERCHANT_ORDERS_CHANGE)
  };
  /* payload of the event, all fields in network byte order */
  struct TMH_OrderChangeEventDetailsP details = {
    .order_state = htonl (osf),
    .execution_date = GNUNET_TIME_absolute_hton (date),
    .order_serial_id = GNUNET_htonll (order_serial_id)
  };

  TMH_db->event_notify (TMH_db->cls,
                        &event_spec.header,
                        &details,
                        sizeof (details));
}
/**
 * Handle a GET "/orders" request.
 *
 * On the first invocation for a request, the query arguments "paid",
 * "refunded", "wired", "delta", "date_ms", "start" and "timeout_ms"
 * are parsed into an order filter and the matching orders are looked
 * up in the database.  If there are no matching orders and a timeout
 * was given, the connection is suspended (long polling) until either
 * a matching order change event arrives or the timeout is reached.
 * When the handler is invoked again for a resumed request, the answer
 * that was prepared in the meantime is returned.
 *
 * @param rh context of the handler
 * @param connection the MHD connection to handle
 * @param[in,out] hc context with further information about the request
 * @return MHD result code
 */
MHD_RESULT
TMH_private_get_orders (const struct TMH_RequestHandler *rh,
                        struct MHD_Connection *connection,
                        struct TMH_HandlerContext *hc)
{
  struct TMH_PendingOrder *po = hc->ctx;
  enum GNUNET_DB_QueryStatus qs;
  struct TALER_MERCHANTDB_OrderFilter of;

  (void) rh;
  if (NULL != po)
  {
    /* resumed from long-polling, return answer we already have
       in 'hc->ctx' */
    if (TALER_EC_NONE != po->result)
    {
      GNUNET_break (0);
      return TALER_MHD_reply_with_error (connection,
                                         MHD_HTTP_INTERNAL_SERVER_ERROR,
                                         po->result,
                                         NULL);
    }
    return TALER_MHD_REPLY_JSON_PACK (
      connection,
      MHD_HTTP_OK,
      GNUNET_JSON_pack_array_incref ("orders",
                                     po->pa));
  }

  /* yes/no/all filters, each defaulting to "all" */
  if (! (TALER_arg_to_yna (connection,
                           "paid",
                           TALER_EXCHANGE_YNA_ALL,
                           &of.paid)) )
    return TALER_MHD_reply_with_error (connection,
                                       MHD_HTTP_BAD_REQUEST,
                                       TALER_EC_GENERIC_PARAMETER_MALFORMED,
                                       "paid");
  if (! (TALER_arg_to_yna (connection,
                           "refunded",
                           TALER_EXCHANGE_YNA_ALL,
                           &of.refunded)) )
    return TALER_MHD_reply_with_error (connection,
                                       MHD_HTTP_BAD_REQUEST,
                                       TALER_EC_GENERIC_PARAMETER_MALFORMED,
                                       "refunded");
  if (! (TALER_arg_to_yna (connection,
                           "wired",
                           TALER_EXCHANGE_YNA_ALL,
                           &of.wired)) )
    return TALER_MHD_reply_with_error (connection,
                                       MHD_HTTP_BAD_REQUEST,
                                       TALER_EC_GENERIC_PARAMETER_MALFORMED,
                                       "wired");

  /* "delta": signed number of results, defaults to -20 */
  {
    const char *delta_str;

    delta_str = MHD_lookup_connection_value (connection,
                                             MHD_GET_ARGUMENT_KIND,
                                             "delta");
    if (NULL == delta_str)
    {
      of.delta = -20;
    }
    else
    {
      char dummy;
      long long ll;

      /* the trailing "%c" detects garbage after the number */
      if (1 !=
          sscanf (delta_str,
                  "%lld%c",
                  &ll,
                  &dummy))
      {
        GNUNET_break_op (0);
        return TALER_MHD_reply_with_error (connection,
                                           MHD_HTTP_BAD_REQUEST,
                                           TALER_EC_GENERIC_PARAMETER_MALFORMED,
                                           "delta");
      }
      of.delta = (int64_t) ll;
      if ( (-MAX_DELTA > of.delta) ||
           (of.delta > MAX_DELTA) )
      {
        GNUNET_break_op (0);
        return TALER_MHD_reply_with_error (connection,
                                           MHD_HTTP_BAD_REQUEST,
                                           TALER_EC_GENERIC_PARAMETER_MALFORMED,
                                           "delta");
      }
    }
  }

  /* "date_ms": date limit; default depends on the sign of delta */
  {
    const char *date_ms_str;

    date_ms_str = MHD_lookup_connection_value (connection,
                                               MHD_GET_ARGUMENT_KIND,
                                               "date_ms");
    if (NULL == date_ms_str)
    {
      if (of.delta > 0)
        of.date = GNUNET_TIME_UNIT_ZERO_ABS;
      else
        of.date = GNUNET_TIME_UNIT_FOREVER_ABS;
    }
    else
    {
      char dummy;
      unsigned long long ll;

      if (1 !=
          sscanf (date_ms_str,
                  "%llu%c",
                  &ll,
                  &dummy))
      {
        GNUNET_break_op (0);
        return TALER_MHD_reply_with_error (connection,
                                           MHD_HTTP_BAD_REQUEST,
                                           TALER_EC_GENERIC_PARAMETER_MALFORMED,
                                           "date_ms");
      }
      of.date = GNUNET_TIME_absolute_from_ms (ll);
      if (GNUNET_TIME_absolute_is_never (of.date))
      {
        GNUNET_break_op (0);
        return TALER_MHD_reply_with_error (connection,
                                           MHD_HTTP_BAD_REQUEST,
                                           TALER_EC_GENERIC_PARAMETER_MALFORMED,
                                           "date_ms");
      }
    }
  }

  /* "start": row limit; default depends on the sign of delta */
  {
    const char *start_row_str;

    start_row_str = MHD_lookup_connection_value (connection,
                                                 MHD_GET_ARGUMENT_KIND,
                                                 "start");
    if (NULL == start_row_str)
    {
      if (of.delta > 0)
        of.start_row = 0;
      else
        of.start_row = INT64_MAX;
    }
    else
    {
      char dummy;
      unsigned long long ull;

      if (1 !=
          sscanf (start_row_str,
                  "%llu%c",
                  &ull,
                  &dummy))
      {
        GNUNET_break_op (0);
        return TALER_MHD_reply_with_error (connection,
                                           MHD_HTTP_BAD_REQUEST,
                                           TALER_EC_GENERIC_PARAMETER_MALFORMED,
                                           "start");
      }
      of.start_row = (uint64_t) ull;
      if (INT64_MAX < of.start_row)
      {
        GNUNET_break_op (0);
        return TALER_MHD_reply_with_error (connection,
                                           MHD_HTTP_BAD_REQUEST,
                                           TALER_EC_GENERIC_PARAMETER_MALFORMED,
                                           "start");
      }
    }
  }

  /* "timeout_ms": how long to long-poll, defaults to not at all */
  {
    const char *timeout_ms_str;

    timeout_ms_str = MHD_lookup_connection_value (connection,
                                                  MHD_GET_ARGUMENT_KIND,
                                                  "timeout_ms");
    if (NULL == timeout_ms_str)
    {
      of.timeout = GNUNET_TIME_UNIT_ZERO;
    }
    else
    {
      char dummy;
      unsigned long long ull;

      /* "%llu" (not "%lld"): the conversion must match the unsigned
         type of 'ull' */
      if (1 !=
          sscanf (timeout_ms_str,
                  "%llu%c",
                  &ull,
                  &dummy))
      {
        GNUNET_break_op (0);
        return TALER_MHD_reply_with_error (connection,
                                           MHD_HTTP_BAD_REQUEST,
                                           TALER_EC_GENERIC_PARAMETER_MALFORMED,
                                           "timeout_ms");
      }
      of.timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
                                                  ull);
      if (GNUNET_TIME_relative_is_forever (of.timeout))
      {
        GNUNET_break_op (0);
        return TALER_MHD_reply_with_error (connection,
                                           MHD_HTTP_BAD_REQUEST,
                                           TALER_EC_GENERIC_PARAMETER_MALFORMED,
                                           "timeout_ms");
      }
    }
    if ( (0 >= of.delta) &&
         (! GNUNET_TIME_relative_is_zero (of.timeout)) )
    {
      /* a timeout is only honored for positive deltas; ignore it */
      GNUNET_break_op (0);
      of.timeout = GNUNET_TIME_UNIT_ZERO;
    }
  }

  po = GNUNET_new (struct TMH_PendingOrder);
  hc->ctx = po;
  hc->cc = &cleanup;
  po->con = connection;
  po->pa = json_array ();
  GNUNET_assert (NULL != po->pa);
  po->instance_id = hc->instance->settings.id;
  po->mi = hc->instance;
  qs = TMH_db->lookup_orders (TMH_db->cls,
                              po->instance_id,
                              &of,
                              &add_order,
                              po);
  if (0 > qs)
  {
    GNUNET_break (0);
    po->result = TALER_EC_GENERIC_DB_FETCH_FAILED;
  }
  if (TALER_EC_NONE != po->result)
  {
    GNUNET_break (0);
    return TALER_MHD_reply_with_error (connection,
                                       MHD_HTTP_INTERNAL_SERVER_ERROR,
                                       po->result,
                                       NULL);
  }
  if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) &&
       (! GNUNET_TIME_relative_is_zero (of.timeout)) )
  {
    struct TMH_MerchantInstance *mi = hc->instance;

    /* setup timeout heap (if not yet exists) */
    if (NULL == order_timeout_heap)
      order_timeout_heap
        = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
    /* The deadline must be known before inserting into the heap, as
       it is the cost by which the heap is ordered. */
    po->long_poll_timeout = GNUNET_TIME_relative_to_absolute (of.timeout);
    po->hn = GNUNET_CONTAINER_heap_insert (order_timeout_heap,
                                           po,
                                           po->long_poll_timeout.abs_value_us);
    po->of = of;
    GNUNET_CONTAINER_DLL_insert (mi->po_head,
                                 mi->po_tail,
                                 po);
    po->in_dll = true;
    if (NULL == mi->po_eh)
    {
      /* Must be the very same event specification (including the
         public key of the instance) that TMH_notify_order_change()
         uses for its notifications. */
      struct TMH_OrderChangeEventP change_eh = {
        .header.type = htons (TALER_DBEVENT_MERCHANT_ORDERS_CHANGE),
        .header.size = htons (sizeof (change_eh)),
        .merchant_pub = mi->merchant_pub
      };

      mi->po_eh = TMH_db->event_listen (TMH_db->cls,
                                        &change_eh.header,
                                        GNUNET_TIME_UNIT_FOREVER_REL,
                                        &resume_by_event,
                                        mi);
    }
    MHD_suspend_connection (connection);
    {
      struct TMH_PendingOrder *pot;

      /* (re)start timeout task for the earliest deadline in the heap */
      pot = GNUNET_CONTAINER_heap_peek (order_timeout_heap);
      if (NULL != order_timeout_task)
        GNUNET_SCHEDULER_cancel (order_timeout_task);
      order_timeout_task = GNUNET_SCHEDULER_add_at (pot->long_poll_timeout,
                                                    &order_timeout,
                                                    NULL);
    }
    return MHD_YES;
  }
  return TALER_MHD_REPLY_JSON_PACK (
    connection,
    MHD_HTTP_OK,
    GNUNET_JSON_pack_array_incref ("orders",
                                   po->pa));
}
/* end of taler-merchant-httpd_private-get-orders.c */