/*
  This file is part of TALER
  Copyright (C) 2023 Taler Systems SA

  TALER is free software; you can redistribute it and/or modify it under the
  terms of the GNU Affero General Public License as published by the Free Software
  Foundation; either version 3, or (at your option) any later version.

  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.

  You should have received a copy of the GNU Affero General Public License along with
  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
*/
/**
 * @file taler-merchant-webhook.c
 * @brief Process that runs webhooks triggered by the merchant backend
 * @author Priscilla HUANG
 */
#include "platform.h"
/* NOTE(review): the names of the angle-bracket headers were lost in the
   copy of this file that was reviewed; the four below were reconstructed
   from the APIs used in this file -- confirm against the build. */
#include <gnunet/gnunet_util_lib.h>
#include <jansson.h>
#include <pthread.h>
#include "taler_merchantdb_lib.h"
#include "taler_merchantdb_plugin.h"
#include <taler/taler_dbevents.h>


/**
 * State of one webhook HTTP request that is currently in flight.
 * Entries are kept in the doubly-linked list #w_head / #w_tail.
 */
struct WorkResponse
{
  /**
   * Next entry in the list of in-flight requests.
   */
  struct WorkResponse *next;

  /**
   * Previous entry in the list of in-flight requests.
   */
  struct WorkResponse *prev;

  /**
   * Handle of the running curl job, NULL once the job completed.
   */
  struct GNUNET_CURL_Job *job;

  /**
   * Serial of the pending webhook this request belongs to; used to
   * delete or re-schedule the webhook in the database.
   */
  uint64_t webhook_pending_serial;

  /**
   * Private copy of the request body (handed to curl via
   * CURLOPT_POSTFIELDS), NULL if the webhook has no body.
   */
  char *body;

  /**
   * HTTP headers of the request, NULL if the webhook has none.
   */
  struct curl_slist *job_headers;
};


/**
 * Head of the list of in-flight webhook requests.
 */
static struct WorkResponse *w_head;

/**
 * Tail of the list of in-flight webhook requests.
 */
static struct WorkResponse *w_tail;

/**
 * Handle for the database event subscription that tells us
 * about newly pending webhooks.
 */
static struct GNUNET_DB_EventHandler *eh;

/**
 * The merchant's configuration.
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;

/**
 * Our database plugin.
 */
static struct TALER_MERCHANTDB_Plugin *db_plugin;

/**
 * Next task to run, if any.
 */
static struct GNUNET_SCHEDULER_Task *task;

/**
 * Curl context used to issue the webhook HTTP requests.
 */
static struct GNUNET_CURL_Context *ctx;

/**
 * Scheduler context for running the @e ctx.
 */
static struct GNUNET_CURL_RescheduleContext *rc;

/**
 * Value to return from main(). 0 on success, non-zero on errors.
 */
static int global_ret;

/**
 * #GNUNET_YES if we are in test mode and should exit when idle.
 */
static int test_mode;


/**
 * We're being aborted with CTRL-C (or SIGTERM). Shut down.
 *
 * Cancels the event subscription, the scheduled task and all
 * in-flight requests, then releases the database plugin and
 * the curl contexts.  Safe to run even if run() failed before
 * the database plugin was loaded.
 *
 * @param cls closure
 */
static void
shutdown_task (void *cls)
{
  struct WorkResponse *w;

  (void) cls;
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Running shutdown\n");
  if (NULL != eh)
  {
    db_plugin->event_listen_cancel (eh);
    eh = NULL;
  }
  if (NULL != task)
  {
    GNUNET_SCHEDULER_cancel (task);
    task = NULL;
  }
  while (NULL != (w = w_head))
  {
    GNUNET_CONTAINER_DLL_remove (w_head,
                                 w_tail,
                                 w);
    GNUNET_CURL_job_cancel (w->job);
    curl_slist_free_all (w->job_headers);
    GNUNET_free (w->body);
    GNUNET_free (w);
  }
  /* run() schedules a shutdown when loading the plugin (or setting
     up curl) fails; in that case there is no plugin to touch. */
  if (NULL != db_plugin)
  {
    db_plugin->rollback (db_plugin->cls); /* just in case */
    TALER_MERCHANTDB_plugin_unload (db_plugin);
    db_plugin = NULL;
  }
  cfg = NULL;
  if (NULL != ctx)
  {
    GNUNET_CURL_fini (ctx);
    ctx = NULL;
  }
  if (NULL != rc)
  {
    GNUNET_CURL_gnunet_rc_destroy (rc);
    rc = NULL;
  }
}


/**
 * Select webhook to process.
 *
 * @param cls NULL
 */
static void
select_work (void *cls);


/**
 * Completion callback for the requests started by
 * `pending_webhooks_cb`.  Depending on the response code we either
 * delete the pending webhook (2xx) or update the time of its next
 * attempt.  Once the last in-flight request is done, `select_work`
 * is scheduled again.
 *
 * @param cls closure, the `struct WorkResponse` of the request
 * @param response_code HTTP response code from server, 0 on hard error
 * @param body http body of the response
 * @param body_size number of bytes in @a body
 */
static void
handle_webhook_response (void *cls,
                         long response_code,
                         const void *body,
                         size_t body_size)
{
  struct WorkResponse *w = cls;
  uint64_t serial = w->webhook_pending_serial;
  enum GNUNET_DB_QueryStatus qs;

  (void) body;
  (void) body_size;
  /* The job is finished: release everything tied to the request. */
  w->job = NULL;
  GNUNET_CONTAINER_DLL_remove (w_head,
                               w_tail,
                               w);
  GNUNET_free (w->body);
  curl_slist_free_all (w->job_headers);
  GNUNET_free (w);
  if (NULL == w_head)
    task = GNUNET_SCHEDULER_add_now (&select_work,
                                     NULL);
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Webhook %llu returned with status %ld\n",
              (unsigned long long) serial,
              response_code);

  if (2 == response_code / 100) /* any 2xx http status code is OK! */
  {
    qs = db_plugin->delete_pending_webhook (db_plugin->cls,
                                            serial);
    switch (qs)
    {
    case GNUNET_DB_STATUS_HARD_ERROR:
    case GNUNET_DB_STATUS_SOFT_ERROR:
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Failed to delete webhook, delete returned: %d\n",
                  qs);
      global_ret = EXIT_FAILURE;
      GNUNET_SCHEDULER_shutdown ();
      return;
    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                  "Delete returned: %d\n",
                  qs);
      return;
    }
    GNUNET_assert (0);
  }

  {
    struct GNUNET_TIME_Relative next_attempt;

    switch (response_code)
    {
    case MHD_HTTP_BAD_REQUEST:
      /* never try again */
      next_attempt = GNUNET_TIME_UNIT_FOREVER_REL;
      break;
    case MHD_HTTP_INTERNAL_SERVER_ERROR:
    case MHD_HTTP_FORBIDDEN:
      next_attempt = GNUNET_TIME_UNIT_MINUTES;
      break;
    default:
      next_attempt = GNUNET_TIME_UNIT_HOURS;
      break;
    }
    qs = db_plugin->update_pending_webhook (
      db_plugin->cls,
      serial,
      GNUNET_TIME_relative_to_absolute (next_attempt));
    switch (qs)
    {
    case GNUNET_DB_STATUS_HARD_ERROR:
    case GNUNET_DB_STATUS_SOFT_ERROR:
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Failed to update pending webhook to next in %s Rval: %d\n",
                  GNUNET_TIME_relative2s (next_attempt,
                                          true),
                  qs);
      global_ret = EXIT_FAILURE;
      GNUNET_SCHEDULER_shutdown ();
      return;
    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                  "Next in %s Rval: %d\n",
                  GNUNET_TIME_relative2s (next_attempt,
                                          true),
                  qs);
      return;
    }
    GNUNET_assert (0);
  }
}


/**
 * Called by the database (via `select_work`) for each pending
 * webhook.  Builds the HTTP request and starts it; the outcome is
 * processed in `handle_webhook_response`.
 *
 * @param cls closure, unused (NULL)
 * @param webhook_pending_serial reference to the configured webhook template.
 * @param next_attempt is the time we should make the next request to the webhook.
 * @param retries how often have we tried this request to the webhook.
 * @param url to make request to
 * @param http_method use for the webhook
 * @param header of the webhook, newline-separated lines, can be NULL
 * @param body of the webhook, can be NULL
 */
static void
pending_webhooks_cb (void *cls,
                     uint64_t webhook_pending_serial,
                     struct GNUNET_TIME_Absolute next_attempt,
                     uint32_t retries,
                     const char *url,
                     const char *http_method,
                     const char *header,
                     const char *body)
{
  struct WorkResponse *w = GNUNET_new (struct WorkResponse);
  struct curl_slist *job_headers = NULL;
  CURL *easy; /* not named 'eh': that is the file-scope event handler */

  (void) cls;
  (void) next_attempt;
  (void) retries;
  w->webhook_pending_serial = webhook_pending_serial;
  GNUNET_CONTAINER_DLL_insert (w_head,
                               w_tail,
                               w);
  easy = curl_easy_init ();
  GNUNET_assert (NULL != easy);
  GNUNET_assert (CURLE_OK ==
                 curl_easy_setopt (easy,
                                   CURLOPT_CUSTOMREQUEST,
                                   http_method));
  GNUNET_assert (CURLE_OK ==
                 curl_easy_setopt (easy,
                                   CURLOPT_URL,
                                   url));
  GNUNET_assert (CURLE_OK ==
                 curl_easy_setopt (easy,
                                   CURLOPT_VERBOSE,
                                   0L));

  /* conversion body data */
  if (NULL != body)
  {
    /* keep our own copy in 'w' so it lives as long as the request */
    w->body = GNUNET_strdup (body);
    GNUNET_assert (CURLE_OK ==
                   curl_easy_setopt (easy,
                                     CURLOPT_POSTFIELDS,
                                     w->body));
  }

  /* conversion header to job_headers data */
  if (NULL != header)
  {
    char *header_copy = GNUNET_strdup (header);

    /* extract all "Key: value" lines from 'header_copy' */
    for (const char *tok = strtok (header_copy, "\n");
         NULL != tok;
         tok = strtok (NULL, "\n"))
    {
      struct curl_slist *grown;

      /* curl_slist_append() returns NULL on failure and leaves the
         old list untouched; do not lose the list in that case. */
      grown = curl_slist_append (job_headers,
                                 tok);
      GNUNET_assert (NULL != grown);
      job_headers = grown;
    }
    GNUNET_free (header_copy);
    GNUNET_assert (CURLE_OK ==
                   curl_easy_setopt (easy,
                                     CURLOPT_HTTPHEADER,
                                     job_headers));
    w->job_headers = job_headers;
  }

  /* curl_easy_setopt() is variadic: numeric options must be 'long' */
  GNUNET_assert (CURLE_OK ==
                 curl_easy_setopt (easy,
                                   CURLOPT_MAXREDIRS,
                                   5L));
  GNUNET_assert (CURLE_OK ==
                 curl_easy_setopt (easy,
                                   CURLOPT_FOLLOWLOCATION,
                                   1L));
  w->job = GNUNET_CURL_job_add_raw (ctx,
                                    easy,
                                    job_headers,
                                    &handle_webhook_response,
                                    w);
  if (NULL == w->job)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to start the curl job for pending webhook #%llu\n",
                (unsigned long long) webhook_pending_serial);
    curl_slist_free_all (w->job_headers);
    GNUNET_free (w->body);
    GNUNET_CONTAINER_DLL_remove (w_head,
                                 w_tail,
                                 w);
    GNUNET_free (w);
    /* make sure the process does not report success after this error */
    global_ret = EXIT_FAILURE;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
}


/**
 * Function called on events received from Postgres.
 *
 * Triggers `select_work` right away, unless requests are still in
 * flight: in that case no task is scheduled (so there is nothing to
 * cancel) and `handle_webhook_response` will run `select_work` once
 * the last request completed.
 *
 * @param cls closure, NULL
 * @param extra additional event data provided
 * @param extra_size number of bytes in @a extra
 */
static void
db_notify (void *cls,
           const void *extra,
           size_t extra_size)
{
  (void) cls;
  (void) extra;
  (void) extra_size;

  if (NULL != w_head)
    return; /* busy; select_work() runs again when the queue drains */
  if (NULL != task)
    GNUNET_SCHEDULER_cancel (task);
  task = GNUNET_SCHEDULER_add_now (&select_work,
                                   NULL);
}


/**
 * Called by the database (via `select_work`) with the webhook
 * returned by the future-webhook lookup.  Schedules `select_work`
 * to run at the time of that webhook's next attempt.
 *
 * @param cls closure, unused (NULL)
 * @param webhook_pending_serial reference to the configured webhook template.
 * @param next_attempt is the time we should make the next request to the webhook.
 * @param retries how often have we tried this request to the webhook.
 * @param url to make request to
 * @param http_method use for the webhook
 * @param header of the webhook
 * @param body of the webhook
 */
static void
future_webhook_cb (void *cls,
                   uint64_t webhook_pending_serial,
                   struct GNUNET_TIME_Absolute next_attempt,
                   uint32_t retries,
                   const char *url,
                   const char *http_method,
                   const char *header,
                   const char *body)
{
  (void) cls;
  (void) webhook_pending_serial;
  (void) retries;
  (void) url;
  (void) http_method;
  (void) header;
  (void) body;

  task = GNUNET_SCHEDULER_add_at (next_attempt,
                                  &select_work,
                                  NULL);
}


/**
 * Select webhook to process.
 *
 * Starts requests for all webhooks that are pending.  If there are
 * none, either shuts down (test mode) or goes to sleep until the
 * next future webhook is due (or for 5 minutes if there is none).
 *
 * @param cls NULL
 */
static void
select_work (void *cls)
{
  enum GNUNET_DB_QueryStatus qs;
  struct GNUNET_TIME_Relative rel;

  (void) cls;
  task = NULL;
  db_plugin->preflight (db_plugin->cls);
  qs = db_plugin->lookup_pending_webhooks (db_plugin->cls,
                                           &pending_webhooks_cb,
                                           NULL);
  switch (qs)
  {
  case GNUNET_DB_STATUS_HARD_ERROR:
  case GNUNET_DB_STATUS_SOFT_ERROR:
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to lookup pending webhooks!\n");
    global_ret = EXIT_FAILURE;
    GNUNET_SCHEDULER_shutdown ();
    return;
  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    if (test_mode)
    {
      GNUNET_SCHEDULER_shutdown ();
      return;
    }
    qs = db_plugin->lookup_future_webhook (db_plugin->cls,
                                           &future_webhook_cb,
                                           NULL);
    switch (qs)
    {
    case GNUNET_DB_STATUS_HARD_ERROR:
    case GNUNET_DB_STATUS_SOFT_ERROR:
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Failed to lookup future webhook!\n");
      global_ret = EXIT_FAILURE;
      GNUNET_SCHEDULER_shutdown ();
      return;
    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
      /* future_webhook_cb() already scheduled the next run */
      return;
    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
      /* wait 5 min */
      /* Note: this should not even be necessary if all webhooks
         use the events properly... */
      rel = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES,
                                           5);
      task = GNUNET_SCHEDULER_add_delayed (rel,
                                           &select_work,
                                           NULL);
      return;
    }
    /* any other status value: nothing to schedule */
    return;
  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
  default:
    return; /* wait for completion, then select more work. */
  }
}


/**
 * First task.
 *
 * Sets up the curl context, loads and connects the database plugin,
 * subscribes to the "webhook pending" database event and schedules
 * the first run of `select_work`.  On failure, sets #global_ret and
 * triggers a shutdown.
 *
 * @param cls closure, NULL
 * @param args remaining command-line arguments
 * @param cfgfile name of the configuration file used (for saving, can be NULL!)
 * @param c configuration
 */
static void
run (void *cls,
     char *const *args,
     const char *cfgfile,
     const struct GNUNET_CONFIGURATION_Handle *c)
{
  (void) cls;
  (void) args;
  (void) cfgfile;

  cfg = c;
  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
                                 NULL);
  ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
                          &rc);
  if (NULL == ctx)
  {
    GNUNET_break (0);
    global_ret = EXIT_FAILURE;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  /* only build the reschedule context around a valid curl context */
  rc = GNUNET_CURL_gnunet_rc_create (ctx);
  if (NULL ==
      (db_plugin = TALER_MERCHANTDB_plugin_load (cfg)))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to initialize DB subsystem\n");
    global_ret = EXIT_NOTCONFIGURED;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  if (GNUNET_OK !=
      db_plugin->connect (db_plugin->cls))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to connect to database\n");
    global_ret = EXIT_FAILURE;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  {
    struct GNUNET_DB_EventHeaderP es = {
      .size = htons (sizeof (es)),
      .type = htons (TALER_DBEVENT_MERCHANT_WEBHOOK_PENDING)
    };

    eh = db_plugin->event_listen (db_plugin->cls,
                                  &es,
                                  GNUNET_TIME_UNIT_FOREVER_REL,
                                  &db_notify,
                                  NULL);
  }
  GNUNET_assert (NULL == task);
  task = GNUNET_SCHEDULER_add_now (&select_work,
                                   NULL);
}


/**
 * The main function of the taler-merchant-webhook
 *
 * @param argc number of arguments from the command line
 * @param argv command line arguments
 * @return EXIT_INVALIDARGUMENT if the arguments could not be
 *         processed, EXIT_SUCCESS if GNUNET_PROGRAM_run() returned
 *         #GNUNET_NO, otherwise the value of #global_ret
 *         (0 on success, non-zero on errors)
 */
int
main (int argc,
      char *const *argv)
{
  struct GNUNET_GETOPT_CommandLineOption options[] = {
    GNUNET_GETOPT_option_flag ('t',
                               "test",
                               "run in test mode and exit when idle",
                               &test_mode),
    GNUNET_GETOPT_option_timetravel ('T',
                                     "timetravel"),
    GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
    GNUNET_GETOPT_OPTION_END
  };
  enum GNUNET_GenericReturnValue ret;

  if (GNUNET_OK !=
      GNUNET_STRINGS_get_utf8_args (argc, argv,
                                    &argc, &argv))
    return EXIT_INVALIDARGUMENT;
  TALER_OS_init ();
  ret = GNUNET_PROGRAM_run (
    argc, argv,
    "taler-merchant-webhook",
    gettext_noop (
      "background process that executes webhooks"),
    options,
    &run, NULL);
  GNUNET_free_nz ((void *) argv);
  if (GNUNET_SYSERR == ret)
    return EXIT_INVALIDARGUMENT;
  if (GNUNET_NO == ret)
    return EXIT_SUCCESS;
  return global_ret;
}


/* end of taler-merchant-webhook.c */