diff options
Diffstat (limited to 'src/backenddb/plugin_merchantdb_postgres.c')
-rw-r--r-- | src/backenddb/plugin_merchantdb_postgres.c | 526 |
1 files changed, 524 insertions, 2 deletions
diff --git a/src/backenddb/plugin_merchantdb_postgres.c b/src/backenddb/plugin_merchantdb_postgres.c index b52f058b..e9ddabe7 100644 --- a/src/backenddb/plugin_merchantdb_postgres.c +++ b/src/backenddb/plugin_merchantdb_postgres.c @@ -7113,7 +7113,7 @@ postgres_delete_webhook (void *cls, * Insert details about a particular webhook. * * @param cls closure - * @param instance_id instance to insert template for + * @param instance_id instance to insert webhook for * @param webhook_id webhook identifier of webhook to insert * @param wb the webhook details to insert * @return database result code @@ -7149,7 +7149,7 @@ postgres_insert_webhook (void *cls, * * @param cls closure * @param instance_id instance to update template for - * @param webhook_id template to update + * @param webhook_id webhook to update * @param wb update to the webhook details on success, can be NULL * (in that case we only want to check if the webhook exists) * @return database result code, #GNUNET_DB_STATUS_SUCCESS_NO_RESULTS if the webhook @@ -7349,6 +7349,432 @@ postgres_lookup_webhook (void *cls, /** + * Context used for postgres_lookup_webhook(). + */ +struct LookupWebhookDetailContext +{ + /** + * Function to call with the results. + */ + TALER_MERCHANTDB_WebhookDetailCallback cb; + + /** + * Closure for @a cb. + */ + void *cb_cls; + + /** + * Did database result extraction fail? + */ + bool extract_failed; +}; + +/** + * Function to be called with the results of a SELECT statement + * that has returned @a num_results results about webhook. + * + * @param[in,out] cls of type `struct LookupPendingWebhookContext *` + * @param result the postgres result + * @param num_results the number of results in @a result + */ +static void +lookup_webhook_by_event_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct LookupWebhookDetailContext *wlc = cls; + + for (unsigned int i = 0; i < num_results; i++) + { + uint64_t webhook_serial; + char *event_type; + char *url; + char *http_method; + char *header_template; + char *body_template; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("webhook_serial", + &webhook_serial), + GNUNET_PQ_result_spec_string ("event_type", + &event_type), + GNUNET_PQ_result_spec_string ("url", + &url), + GNUNET_PQ_result_spec_string ("http_method", + &http_method), + GNUNET_PQ_result_spec_string ("header_template", + &header_template), + GNUNET_PQ_result_spec_string ("body_template", + &body_template), + GNUNET_PQ_result_spec_end + }; + + if (GNUNET_OK != + GNUNET_PQ_extract_result (result, + rs, + i)) + { + GNUNET_break (0); + wlc->extract_failed = true; + return; + } + wlc->cb (wlc->cb_cls, + webhook_serial, + event_type, + url, + http_method, + header_template, + body_template); + GNUNET_PQ_cleanup_result (rs); + } +} + +/** + * Lookup webhook by event + * + * @param cls closure + * @param instance_id instance to lookup webhook for + * @param event_type event that we need to put in the pending webhook + * @param[out] cb set to the webhook details on success + * @param cb_cls callback closure + * @return database result code + */ +static enum GNUNET_DB_QueryStatus +postgres_lookup_webhook_by_event(void *cls, + const char *instance_id, + const char *event_type, + TALER_MERCHANTDB_WebhookDetailCallback cb, + void *cb_cls) +{ + struct PostgresClosure *pg = cls; + struct LookupWebhookDetailContext wlc = { + .cb = cb, + .cb_cls = cb_cls, + .extract_failed = false, + }; + + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (instance_id), + GNUNET_PQ_query_param_string (event_type), + GNUNET_PQ_query_param_end + }; + enum GNUNET_DB_QueryStatus qs; + + check_connection (pg); + qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, + "lookup_webhook_by_event", + params, + &lookup_webhook_by_event_cb, + &wlc); + + if (wlc.extract_failed) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} + +/** + * Insert webhook in the pending webhook. + * + * @param cls closure + * @param instance_id instance to insert webhook for + * @param webhook_serial webhook to insert in the pending webhook + * @param url to make the request to + * @param http_method for the webhook + * @param header of the webhook + * @param body of the webhook + * @return database result code + */ +static enum GNUNET_DB_QueryStatus +postgres_insert_pending_webhook(void *cls, + const char *instance_id, + uint64_t webhook_serial, + const char *url, + const char *http_method, + const char *header, + const char *body) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (instance_id), + GNUNET_PQ_query_param_uint64 (&webhook_serial), + GNUNET_PQ_query_param_string (url), + GNUNET_PQ_query_param_string (http_method), + NULL == header + ? GNUNET_PQ_query_param_null () + : GNUNET_PQ_query_param_string (header), + NULL == body + ? GNUNET_PQ_query_param_null () + : GNUNET_PQ_query_param_string (body), + GNUNET_PQ_query_param_end + }; + check_connection (pg); + return GNUNET_PQ_eval_prepared_non_select (pg->conn, + "insert_pending_webhook", + params); +} + +/** + * Context used for postgres_lookup_future_webhook(). + */ +struct LookupPendingWebhookContext +{ + /** + * Function to call with the results. + */ + TALER_MERCHANTDB_PendingWebhooksCallback cb; + + /** + * Closure for @a cb. + */ + void *cb_cls; + + /** + * Did database result extraction fail? + */ + bool extract_failed; +}; + +/** + * Function to be called with the results of a SELECT statement + * that has returned @a num_results results about webhook. + * + * @param[in,out] cls of type `struct LookupPendingWebhookContext *` + * @param result the postgres result + * @param num_results the number of results in @a result + */ +static void +lookup_pending_webhooks_cb (void *cls, + PGresult *result, + unsigned int num_results) +{ + struct LookupPendingWebhookContext *pwlc = cls; + + for (unsigned int i = 0; i < num_results; i++) + { + uint64_t webhook_serial; + struct GNUNET_TIME_Absolute next_attempt; + uint32_t retries; + char *url; + char *http_method; + char *header; + char *body; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("webhook_serial", + &webhook_serial), + GNUNET_PQ_result_spec_absolute_time ("next_attempt", + &next_attempt), + GNUNET_PQ_result_spec_uint32 ("retries", + &retries), + GNUNET_PQ_result_spec_string ("url", + &url), + GNUNET_PQ_result_spec_string ("http_method", + &http_method), + GNUNET_PQ_result_spec_string ("header", + &header), + GNUNET_PQ_result_spec_string ("body", + &body), + GNUNET_PQ_result_spec_end + }; + + if (GNUNET_OK != + GNUNET_PQ_extract_result (result, + rs, + i)) + { + GNUNET_break (0); + pwlc->extract_failed = true; + return; + } + pwlc->cb (pwlc->cb_cls, + webhook_serial, + next_attempt, + retries, + url, + http_method, + header, + body); + GNUNET_PQ_cleanup_result (rs); + } +} + +/** + * Lookup the webhook that need to be send in priority. + * send. + * + * @param cls closure + * @param cb pending webhook callback + * @param cb_cls callback closure + */ +// WHERE next_attempt <= now ORDER BY next_attempt ASC +static enum GNUNET_DB_QueryStatus +postgres_lookup_pending_webhook(void *cls, + TALER_MERCHANTDB_PendingWebhooksCallback cb, + void *cb_cls) +{ + struct PostgresClosure *pg = cls; + struct LookupPendingWebhookContext pwlc = { + .cb = cb, + .cb_cls = cb_cls, + .extract_failed = false, + }; + struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); + struct GNUNET_PQ_QueryParam params_null[] = { + GNUNET_PQ_query_param_absolute_time (&now), + GNUNET_PQ_query_param_end + }; + + enum GNUNET_DB_QueryStatus qs; + + check_connection (pg); + qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, + "lookup_pending_webhook", + params_null, + &lookup_pending_webhooks_cb, + &pwlc); + + if (pwlc.extract_failed) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} + + +/** + * Lookup future webhook in the pending webhook that need to be send. + * With that we can know how long the system can 'sleep'. + * + * @param cls closure + * @param cb pending webhook callback + * @param cb_cls callback closure + */ +// ORDER BY next_attempt ASC LIMIT 1 +static enum GNUNET_DB_QueryStatus +postgres_lookup_future_webhook(void *cls, + TALER_MERCHANTDB_PendingWebhooksCallback cb, + void *cb_cls) +{ + struct PostgresClosure *pg = cls; + struct LookupPendingWebhookContext pwlc = { + .cb = cb, + .cb_cls = cb_cls, + .extract_failed = false, + }; + struct GNUNET_PQ_QueryParam params_null[] = { + GNUNET_PQ_query_param_end + }; + + enum GNUNET_DB_QueryStatus qs; + + check_connection (pg); + qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, + "lookup_future_webhook", + params_null, + &lookup_pending_webhooks_cb, + &pwlc); + + if (pwlc.extract_failed) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} + + /** + * Lookup all the webhooks in the pending webhook. + * Use by the administrator + * + * @param cls closure + * @param instance_id to lookup webhooks for this instance particularly + * @param min_row to see the list of the pending webhook that it is started with this minimum row. + * @param max_results to see the list of the pending webhook that it is end with this max results. + * @param cb pending webhook callback + * @param cb_cls callback closure + */ + // WHERE webhook_pending_serial > min_row ORDER BY webhook_pending_serial ASC LIMIT max_results + static enum GNUNET_DB_QueryStatus + postgres_lookup_all_webhooks(void *cls, + const char *instance_id, + uint64_t min_row, + uint32_t max_results, + TALER_MERCHANTDB_PendingWebhooksCallback cb, + void *cb_cls) + { + struct PostgresClosure *pg = cls; + struct LookupPendingWebhookContext pwlc = { + .cb = cb, + .cb_cls = cb_cls, + .extract_failed = false, + }; + uint64_t max_results64 = max_results; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (instance_id), + GNUNET_PQ_query_param_uint64 (&min_row), + GNUNET_PQ_query_param_uint64 (&max_results64), + GNUNET_PQ_query_param_end + }; + + enum GNUNET_DB_QueryStatus qs; + + check_connection (pg); + qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, + "lookup_all_webhooks", + params, + &lookup_pending_webhooks_cb, + &pwlc); + + if (pwlc.extract_failed) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} + +/** + * Update the pending webhook. It is use if the webhook can't be send. + * + * @param cls closure + * @param webhook_serial webhook that need to be update + * @param next_attempt when we should make the next request to the webhook + * @return database result code + */ + static enum GNUNET_DB_QueryStatus + postgres_update_pending_webhook(void *cls, + uint64_t webhook_serial, + struct GNUNET_TIME_Absolute next_attempt) + // maybe add: http status of failure? +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_uint64 (&webhook_serial), + GNUNET_PQ_query_param_absolute_time (&next_attempt), + GNUNET_PQ_query_param_end + + }; + + check_connection (pg); + return GNUNET_PQ_eval_prepared_non_select (pg->conn, + "update_pending_webhook", + params); +} + +/** + * Delete a webhook in the pending webhook if it is successfull + * + * @param cls closure + * @param webhook_serial webhook that need to be delete in the pending webhook + * @return database result code + */ + static enum GNUNET_DB_QueryStatus + postgres_delete_pending_webhook(void *cls, + uint64_t webhook_serial) + { + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_uint64 (&webhook_serial), + GNUNET_PQ_query_param_end + }; + check_connection (pg); + return GNUNET_PQ_eval_prepared_non_select (pg->conn, + "delete_pending_webhook", + params); +} + + +/** * Establish connection to the database. * * @param cls plugin context @@ -9906,6 +10332,95 @@ postgres_connect (void *cls) " FROM merchant_instances" " WHERE merchant_id=$1)" " AND webhook_id=$2"), + /* for postgres_lookup_webhook_by_event() */ + GNUNET_PQ_make_prepare ("lookup_webhook_by_event", + "SELECT" + " webhook_serial" + ",event_type" + ",url" + ",http_method" + ",header_template" + ",body_template" + " FROM merchant_webhook" + " JOIN merchant_instances" + " USING (merchant_serial)" + " WHERE merchant_instances.merchant_id=$1" + " AND event_type=$2"), + /* for postgres_delete_pending_webhook() */ + GNUNET_PQ_make_prepare ("delete_pending_webhook", + "DELETE" + " FROM merchant_pending_webhooks" + " WHERE merchant_pending_webhooks.webhook_serial=" + " (SELECT webhook_serial " + " FROM merchant_webhook" + " WHERE webhook_serial=$1)"), + /* for postgres_insert_pending_webhook() */ + GNUNET_PQ_make_prepare ("insert_pending_webhook", + "INSERT INTO merchant_pending_webhooks" + "(merchant_serial" + ",webhook_serial" + ",url" + ",http_method" + ",header" + ",body" + ")" + " SELECT mi.merchant_serial," + " $2, $3, $4, $5, $6" + " FROM merchant_instances mi" + " WHERE mi.merchant_id=$1"), + /* for postgres_update_pending_webhook() */ + GNUNET_PQ_make_prepare ("update_pending_webhook", + "UPDATE merchant_pending_webhooks SET" + " retries=retries+1" + ",next_attempt=$2" + " WHERE webhook_serial=" + " (SELECT webhook_serial" + " FROM merchant_webhook" + " WHERE webhook_serial=$1)"), + /* for postgres_lookup_pending_webhook() */ + GNUNET_PQ_make_prepare ("lookup_pending_webhook", + "SELECT" + " webhook_serial" + ",next_attempt" + ",retries" + ",url" + ",http_method" + ",header" + ",body" + " FROM merchant_pending_webhooks" + " WHERE next_attempt <= $1" + " ORDER BY next_attempt ASC" + ), + /* for postgres_lookup_future_webhook() */ + GNUNET_PQ_make_prepare ("lookup_future_webhook", + "SELECT" + " webhook_serial" + ",next_attempt" + ",retries" + ",url" + ",http_method" + ",header" + ",body" + " FROM merchant_pending_webhooks" + " ORDER BY next_attempt ASC LIMIT 1" + ), + /* for postgres_lookup_all_webhooks() */ + GNUNET_PQ_make_prepare ("lookup_all_webhooks", + " SELECT" + " webhook_serial" + ",next_attempt" + ",retries" + ",url" + ",http_method" + ",header" + ",body" + " FROM merchant_pending_webhooks" + " JOIN merchant_instances" + " USING (merchant_serial)" + " WHERE merchant_instances.merchant_id=$1" + " AND webhook_serial > $2" + " ORDER BY webhook_serial" + " ASC LIMIT $3"), GNUNET_PQ_PREPARED_STATEMENT_END }; struct GNUNET_PQ_ExecuteStatement es[] = { @@ -10065,6 +10580,13 @@ libtaler_plugin_merchantdb_postgres_init (void *cls) plugin->delete_webhook = &postgres_delete_webhook; plugin->insert_webhook = &postgres_insert_webhook; plugin->update_webhook = &postgres_update_webhook; + plugin->lookup_webhook_by_event = &postgres_lookup_webhook_by_event; + plugin->lookup_all_webhooks = &postgres_lookup_all_webhooks; + plugin->lookup_future_webhook = &postgres_lookup_future_webhook; + plugin->lookup_pending_webhook = &postgres_lookup_pending_webhook; + plugin->delete_pending_webhook = &postgres_delete_pending_webhook; + plugin->insert_pending_webhook = &postgres_insert_pending_webhook; + plugin->update_pending_webhook = &postgres_update_pending_webhook; return plugin; } |