diff options
author | Christian Grothoff <christian@grothoff.org> | 2021-08-25 17:23:35 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2021-08-25 17:23:35 +0200 |
commit | 696278ce80c7613e39c24e138dd6c99116080adb (patch) | |
tree | 31e85cf7bb872d79ac6f2c4d50dc7831a20d7692 /src/backend/taler-merchant-httpd_private-get-orders.c | |
parent | 5672080efcaca68ed2d301adf1673d45f7638a16 (diff) |
complete implementation of #6956: long polling triggers via database backend
Diffstat (limited to 'src/backend/taler-merchant-httpd_private-get-orders.c')
-rw-r--r-- | src/backend/taler-merchant-httpd_private-get-orders.c | 153 |
1 files changed, 120 insertions, 33 deletions
diff --git a/src/backend/taler-merchant-httpd_private-get-orders.c b/src/backend/taler-merchant-httpd_private-get-orders.c index 0f9e9f13..e2796610 100644 --- a/src/backend/taler-merchant-httpd_private-get-orders.c +++ b/src/backend/taler-merchant-httpd_private-get-orders.c @@ -21,6 +21,7 @@ #include "platform.h" #include "taler-merchant-httpd_private-get-orders.h" #include <taler/taler_json_lib.h> +#include <taler/taler_dbevents.h> /** @@ -127,6 +128,11 @@ TMH_force_get_orders_resume (struct TMH_MerchantInstance *mi) MHD_resume_connection (po->con); po->in_dll = false; } + if (NULL != mi->po_eh) + { + TMH_db->event_listen_cancel (mi->po_eh); + mi->po_eh = NULL; + } if (NULL != order_timeout_task) { GNUNET_SCHEDULER_cancel (order_timeout_task); @@ -175,6 +181,12 @@ order_timeout (void *cls) GNUNET_CONTAINER_DLL_remove (mi->po_head, mi->po_tail, po); + if ( (NULL == mi->po_head) && + (NULL != mi->po_eh) ) + { + TMH_db->event_listen_cancel (mi->po_eh); + mi->po_eh = NULL; + } po->in_dll = false; MHD_resume_connection (po->con); TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ @@ -249,14 +261,14 @@ process_refunds_cb (void *cls, /** * Add order details to our JSON array. * - * @param[in,out] cls a `json_t *` JSON array to build - * @param order_id ID of the order + * @param cls some closure + * @param orig_order_id the order this is about * @param order_serial serial ID of the order * @param creation_time when was the order created */ static void add_order (void *cls, - const char *order_id, + const char *orig_order_id, uint64_t order_serial, struct GNUNET_TIME_Absolute creation_time) { @@ -265,24 +277,37 @@ add_order (void *cls, struct GNUNET_HashCode h_contract_terms; enum GNUNET_DB_QueryStatus qs; const char *summary; + char *order_id = NULL; bool refundable = false; bool paid; struct TALER_Amount order_amount; - qs = TMH_db->lookup_order_status (TMH_db->cls, - po->instance_id, - order_id, - &h_contract_terms, - &paid); - /* qs == 0: contract terms don't exist, so the order cannot be paid. */ - if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) - paid = false; + qs = TMH_db->lookup_order_status_by_serial (TMH_db->cls, + po->instance_id, + order_serial, + &order_id, + &h_contract_terms, + &paid); if (qs < 0) { GNUNET_break (0); po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; return; } + /* qs == 0: contract terms don't exist, so the order cannot be paid. */ + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + paid = false; + if (NULL == orig_order_id) + { + /* cannot be via DB trigger, and the other code + path should have passed an orig_order_id */ + GNUNET_break (0); + po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; + return; + } + order_id = GNUNET_strdup (orig_order_id); + } if (paid) { @@ -314,6 +339,7 @@ add_order (void *cls, GNUNET_break (0); po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; json_decref (contract_terms); + GNUNET_free (order_id); return; } @@ -338,6 +364,7 @@ add_order (void *cls, GNUNET_break (0); po->result = TALER_EC_MERCHANT_GENERIC_DB_CONTRACT_CONTENT_INVALID; json_decref (contract_terms); + GNUNET_free (order_id); return; } @@ -359,6 +386,7 @@ add_order (void *cls, GNUNET_break (0); po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; json_decref (contract_terms); + GNUNET_free (order_id); return; } if (0 > TALER_amount_cmp (&refund_amount, @@ -386,42 +414,51 @@ add_order (void *cls, GNUNET_JSON_pack_bool ("paid", paid)))); json_decref (contract_terms); + GNUNET_free (order_id); } /** - * There has been a change or addition of a new @a order_id. Wake up - * long-polling clients that may have been waiting for this event. + * We have received a trigger from the database + * that we should (possibly) resume some requests. * - * @param mi the instance where the order changed - * @param order_id the order that changed - * @param paid is the order paid by the customer? - * @param refunded was the order refunded? - * @param wired was the merchant paid via wire transfer? - * @param date execution date of the order - * @param order_serial_id serial ID of the order in the database + * @param cls a `struct TMH_MerchantInstance` + * @param extra a `struct TMH_OrderChangeEventP` + * @param extra_size number of bytes in @a extra */ -void -TMH_notify_order_change (struct TMH_MerchantInstance *mi, - const char *order_id, - bool paid, - bool refunded, - bool wired, - struct GNUNET_TIME_Absolute date, - uint64_t order_serial_id) +static void +resume_by_event (void *cls, + const void *extra, + size_t extra_size) { + struct TMH_MerchantInstance *mi = cls; + const struct TMH_OrderChangeEventDetailsP *oce = extra; struct TMH_PendingOrder *pn; + enum TMH_OrderStateFlags osf; + uint64_t order_serial_id; + struct GNUNET_TIME_Absolute date; + if (sizeof (*oce) != extra_size) + { + GNUNET_break (0); + return; + } + osf = (enum TMH_OrderStateFlags) ntohl (oce->order_state); + order_serial_id = GNUNET_ntohll (oce->order_serial_id); + date = GNUNET_TIME_absolute_ntoh (oce->execution_date); for (struct TMH_PendingOrder *po = mi->po_head; NULL != po; po = pn) { pn = po->next; - if (! ( ( ((TALER_EXCHANGE_YNA_YES == po->of.paid) == paid) || + if (! ( ( ((TALER_EXCHANGE_YNA_YES == po->of.paid) == + (0 != (osf & TMH_OSF_PAID))) || (TALER_EXCHANGE_YNA_ALL == po->of.paid) ) && - ( ((TALER_EXCHANGE_YNA_YES == po->of.refunded) == refunded) || + ( ((TALER_EXCHANGE_YNA_YES == po->of.refunded) == + (0 != (osf & TMH_OSF_REFUNDED))) || (TALER_EXCHANGE_YNA_ALL == po->of.refunded) ) && - ( ((TALER_EXCHANGE_YNA_YES == po->of.wired) == wired) || + ( ((TALER_EXCHANGE_YNA_YES == po->of.wired) == + (0 != (osf & TMH_OSF_WIRED))) || (TALER_EXCHANGE_YNA_ALL == po->of.wired) ) ) ) continue; if (po->of.delta > 0) @@ -441,7 +478,7 @@ TMH_notify_order_change (struct TMH_MerchantInstance *mi, po->of.delta++; } add_order (po, - order_id, + NULL, order_serial_id, date); GNUNET_assert (po->in_dll); @@ -454,6 +491,44 @@ TMH_notify_order_change (struct TMH_MerchantInstance *mi, MHD_resume_connection (po->con); TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ } + if (NULL == mi->po_head) + { + TMH_db->event_listen_cancel (mi->po_eh); + mi->po_eh = NULL; + } +} + + +/** + * There has been a change or addition of a new @a order_id. Wake up + * long-polling clients that may have been waiting for this event. + * + * @param mi the instance where the order changed + * @param osf order state flags + * @param date execution date of the order + * @param order_serial_id serial ID of the order in the database + */ +void +TMH_notify_order_change (struct TMH_MerchantInstance *mi, + enum TMH_OrderStateFlags osf, + struct GNUNET_TIME_Absolute date, + uint64_t order_serial_id) +{ + struct TMH_OrderChangeEventDetailsP oce = { + .order_serial_id = GNUNET_htonll (order_serial_id), + .execution_date = GNUNET_TIME_absolute_hton (date), + .order_state = htonl (osf) + }; + struct TMH_OrderChangeEventP eh = { + .header.type = htons (TALER_DBEVENT_MERCHANT_ORDERS_CHANGE), + .header.size = htons (sizeof (eh)), + .merchant_pub = mi->merchant_pub + }; + + TMH_db->event_notify (TMH_db->cls, + &eh.header, + &oce, + sizeof (oce)); } @@ -688,7 +763,6 @@ TMH_private_get_orders (const struct TMH_RequestHandler *rh, GNUNET_assert (NULL != po->pa); po->instance_id = hc->instance->settings.id; po->mi = hc->instance; - qs = TMH_db->lookup_orders (TMH_db->cls, po->instance_id, &of, @@ -724,6 +798,19 @@ TMH_private_get_orders (const struct TMH_RequestHandler *rh, mi->po_tail, po); po->in_dll = true; + if (NULL == mi->po_eh) + { + struct GNUNET_DB_EventHeaderP change_eh = { + .type = htons (TALER_DBEVENT_MERCHANT_ORDERS_CHANGE), + .size = htons (sizeof (change_eh)) + }; + + mi->po_eh = TMH_db->event_listen (TMH_db->cls, + &change_eh, + GNUNET_TIME_UNIT_FOREVER_REL, + &resume_by_event, + mi); + } MHD_suspend_connection (connection); { struct TMH_PendingOrder *pot; |