/*
This file is part of TALER
Copyright (C) 2023 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
/**
* @file taler-merchant-wirewatch.c
* @brief Process that imports information about incoming bank transfers into the merchant backend
* @author Christian Grothoff
*/
#include "platform.h"
#include
#include
#include
#include
#include "taler_merchant_bank_lib.h"
#include "taler_merchantdb_lib.h"
#include "taler_merchantdb_plugin.h"
/**
 * Initial long-polling timeout used when talking to the bank.  Chosen
 * rather long: we rely on long-polling and do not want to wake up too
 * often.
 */
#define BANK_TIMEOUT \
        GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
/**
 * Information about a watch job, that is the import of incoming bank
 * transfers for one bank account of one merchant instance.
 */
struct Watch
{
/**
 * Next entry; watches are kept in a DLL (see #w_head / #w_tail).
 */
struct Watch *next;
/**
 * Previous entry; watches are kept in a DLL (see #w_head / #w_tail).
 */
struct Watch *prev;
/**
 * Next task to run, if any.  Points to the scheduled do_work() task
 * and is NULL while no such task is pending.
 */
struct GNUNET_SCHEDULER_Task *task;
/**
 * Dynamically adjusted long polling time-out.  Starts at #BANK_TIMEOUT
 * and is lowered (but never below one second) when the bank answers
 * with a gateway timeout.
 */
struct GNUNET_TIME_Relative bank_timeout;
/**
 * For which instance are we importing bank transfers?
 * Copy owned by the watch, released in end_watch().
 */
char *instance_id;
/**
 * For which account are we importing bank transfers?
 * Normalized payto URI owned by the watch, released in end_watch().
 */
char *payto_uri;
/**
 * Bank history request; NULL while no request is active.
 */
struct TALER_MERCHANT_BANK_CreditHistoryHandle *hh;
/**
 * Start row for the bank interaction. Exclusive.  Advanced to the
 * serial of every transfer we have looked at and persisted by save().
 */
uint64_t start_row;
/**
 * Artificial delay to use between API calls. Used to
 * throttle on failures; reset to zero once a transfer was
 * imported successfully.
 */
struct GNUNET_TIME_Relative delay;
/**
 * When did we start our last HTTP request?  Used to measure how long
 * the request took when the bank reports a gateway timeout.
 */
struct GNUNET_TIME_Absolute start_time;
/**
 * How long should long-polling take at least?  Absolute time computed
 * when a request is started; if the bank reports "no content" earlier
 * and nothing was found, we wait out the remaining time.
 */
struct GNUNET_TIME_Absolute long_poll_timeout;
/**
 * Login data for the bank.  Released in end_watch().
 */
struct TALER_MERCHANT_BANK_AuthenticationData ad;
/**
 * Set to true if we found a transaction in the last iteration.
 * Cleared at the start of every do_work() run.
 */
bool found;
};
/**
 * Head of the DLL of active watches.
 */
static struct Watch *w_head;
/**
 * Tail of the DLL of active watches.
 */
static struct Watch *w_tail;
/**
 * The merchant's configuration.  Set in run(), cleared on shutdown.
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
 * Our database plugin.  Loaded in run(), unloaded on shutdown.
 */
static struct TALER_MERCHANTDB_Plugin *db_plugin;
/**
 * Handle to the context for interacting with the bank.
 */
static struct GNUNET_CURL_Context *ctx;
/**
 * Scheduler context for running the @e ctx.
 */
static struct GNUNET_CURL_RescheduleContext *rc;
/**
 * Event handler to learn that the configuration changed
 * and we should shutdown (to be restarted).
 */
static struct GNUNET_DB_EventHandler *eh;
/**
 * Value to return from main(). 0 on success, non-zero on errors.
 */
static int global_ret;
/**
 * How many transactions should we fetch at most per batch?
 * Passed to the bank history request in do_work().
 */
static unsigned int batch_size = 32;
/**
 * #GNUNET_YES if we are in test mode and should exit when idle.
 * Set via the "-t" command-line flag.
 */
static int test_mode;
/**
 * #GNUNET_YES if we are in persistent mode and do
 * not exit on #config_changed.  Set via the "-p" command-line flag;
 * main() then re-runs the program after a configuration change.
 */
static int persist_mode;
/**
 * Set to true if we are shutting down due to a
 * configuration change.
 */
static bool config_changed_flag;
/**
* Save progress in DB.
*/
static void
save (struct Watch *w)
{
enum GNUNET_DB_QueryStatus qs;
qs = db_plugin->update_wirewatch_progress (db_plugin->cls,
w->instance_id,
w->payto_uri,
w->start_row);
if (qs < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to persist wirewatch progress for %s/%s (%d)\n",
w->instance_id,
w->payto_uri,
qs);
GNUNET_SCHEDULER_shutdown ();
global_ret = EXIT_FAILURE;
}
}
/**
 * Stop watch job @a w: cancel its pending task and bank request (if
 * any), remove it from the list of active watches and release all
 * memory it owns, including @a w itself.
 *
 * @param w watch job to terminate
 */
static void
end_watch (struct Watch *w)
{
  struct GNUNET_SCHEDULER_Task *pending = w->task;
  struct TALER_MERCHANT_BANK_CreditHistoryHandle *request = w->hh;

  if (NULL != pending)
  {
    GNUNET_SCHEDULER_cancel (pending);
    w->task = NULL;
  }
  if (NULL != request)
  {
    TALER_MERCHANT_BANK_credit_history_cancel (request);
    w->hh = NULL;
  }
  /* Release everything the watch owns, then unlink and free it. */
  GNUNET_free (w->instance_id);
  GNUNET_free (w->payto_uri);
  TALER_MERCHANT_BANK_auth_free (&w->ad);
  GNUNET_CONTAINER_DLL_remove (w_head,
                               w_tail,
                               w);
  GNUNET_free (w);
}
/**
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
*
* @param cls closure
*/
static void
shutdown_task (void *cls)
{
(void) cls;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Running shutdown\n");
while (NULL != w_head)
{
struct Watch *w = w_head;
save (w);
end_watch (w);
}
if (NULL != eh)
{
db_plugin->event_listen_cancel (eh);
eh = NULL;
}
TALER_MERCHANTDB_plugin_unload (db_plugin);
db_plugin = NULL;
cfg = NULL;
if (NULL != ctx)
{
GNUNET_CURL_fini (ctx);
ctx = NULL;
}
if (NULL != rc)
{
GNUNET_CURL_gnunet_rc_destroy (rc);
rc = NULL;
}
}
/**
 * Split the wire transfer @a subject into the wire transfer identifier
 * and the exchange base URL.
 *
 * The part before the first space must decode into exactly
 * `sizeof (*wtid)` bytes; the part after it must consist of valid URL
 * characters and start with "http://" or "https://" (case-insensitive).
 *
 * @param subject wire transfer subject to parse;
 *        format is "$WTID $URL"
 * @param[out] wtid wire transfer ID to extract
 * @param[out] exchange_url set to a newly allocated copy of the
 *        exchange URL on success; untouched otherwise
 * @return #GNUNET_OK on success, #GNUNET_NO if @a subject does not
 *         have the expected format
 */
static enum GNUNET_GenericReturnValue
parse_subject (const char *subject,
               struct TALER_WireTransferIdentifierRawP *wtid,
               char **exchange_url)
{
  static const char http_prefix[] = "http://";
  static const char https_prefix[] = "https://";
  const char *sep = strchr (subject, ' ');
  const char *url;

  if (NULL == sep)
    return GNUNET_NO;
  if (GNUNET_OK !=
      GNUNET_STRINGS_string_to_data (subject,
                                     sep - subject,
                                     wtid,
                                     sizeof (*wtid)))
    return GNUNET_NO;
  url = sep + 1;
  if (! TALER_url_valid_charset (url))
    return GNUNET_NO;
  if (0 != strncasecmp (http_prefix,
                        url,
                        strlen (http_prefix)))
  {
    /* not plain HTTP, so it has to be HTTPS */
    if (0 != strncasecmp (https_prefix,
                          url,
                          strlen (https_prefix)))
      return GNUNET_NO;
  }
  *exchange_url = GNUNET_strdup (url);
  return GNUNET_OK;
}
/**
 * Run next iteration of a watch job: start a new credit history
 * request at the bank for the account of the watch.  Declared here
 * because credit_cb() schedules it and it in turn registers
 * credit_cb() as the callback.
 *
 * @param cls a `struct Watch *`
 */
static void
do_work (void *cls);
/**
 * Callbacks of this type are used to serve the result of asking
 * the bank for the credit transaction history.
 *
 * For each reported transfer (#MHD_HTTP_OK) the wire subject is parsed
 * as "$WTID $URL".  Transfers that do not match are skipped; matching
 * ones are inserted into the database as confirmed and a
 * #TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_CONFIRMED event is raised.
 * In both cases the start row of the watch advances to @a serial_id.
 *
 * Any other status ends the current request: the delay of the watch
 * is adjusted and the next iteration is scheduled.  In test mode, a
 * watch that found no transaction in this iteration is ended instead,
 * and shutdown is requested once no watch remains.
 *
 * @param cls a `struct Watch *`
 * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request
 *        0 if the bank's reply is bogus (fails to follow the protocol),
 *        #MHD_HTTP_NO_CONTENT if there are no more results; on success the
 *        last callback is always of this status (even if `abs(num_results)` were
 *        already returned).
 * @param ec detailed error code
 * @param serial_id monotonically increasing counter corresponding to the transaction
 * @param details details about the wire transfer
 * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
 */
static enum GNUNET_GenericReturnValue
credit_cb (
  void *cls,
  unsigned int http_status,
  enum TALER_ErrorCode ec,
  uint64_t serial_id,
  const struct TALER_MERCHANT_BANK_CreditDetails *details)
{
  struct Watch *w = cls;

  switch (http_status)
  {
  case 0:
    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                "Invalid HTTP response (HTTP status: 0, %d) from bank\n",
                ec);
    w->delay = GNUNET_TIME_STD_BACKOFF (w->delay);
    break;
  case MHD_HTTP_OK:
    {
      enum GNUNET_DB_QueryStatus qs;
      char *exchange_url;
      struct TALER_WireTransferIdentifierRawP wtid;
      char *credit_payto;

      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                  "Received wire transfer `%s' over %s\n",
                  details->wire_subject,
                  TALER_amount2s (&details->amount));
      w->found = true;
      if (GNUNET_OK !=
          parse_subject (details->wire_subject,
                         &wtid,
                         &exchange_url))
      {
        /* Not in "$WTID $URL" format: skip it, but remember that
           we have seen this row. */
        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                    "Skipping transfer %llu (%s): not from exchange\n",
                    (unsigned long long) serial_id,
                    details->wire_subject);
        w->start_row = serial_id;
        return GNUNET_OK;
      }
      /* FIXME-Performance-Optimization: consider grouping multiple inserts
         into one bigger transaction with just one notify. */
      credit_payto = TALER_payto_normalize (details->credit_account_uri);
      qs = db_plugin->insert_transfer (db_plugin->cls,
                                       w->instance_id,
                                       exchange_url,
                                       &wtid,
                                       &details->amount,
                                       credit_payto,
                                       true /* confirmed */);
      if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
      {
        /* Nothing was inserted; look up the transfer to tell the
           operator why. */
        struct TALER_Amount total;
        struct TALER_Amount wfee;
        struct TALER_Amount eamount;
        struct GNUNET_TIME_Timestamp timestamp;
        bool have_esig;
        bool verified;

        qs = db_plugin->lookup_transfer (db_plugin->cls,
                                         w->instance_id,
                                         exchange_url,
                                         &wtid,
                                         &total,
                                         &wfee,
                                         &eamount,
                                         &timestamp,
                                         &have_esig,
                                         &verified);
        if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
        {
          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                      "Inserting transfer for %s into database failed. Is the credit account %s configured correctly?\n",
                      w->instance_id,
                      credit_payto);
        }
        if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
        {
          if (0 !=
              TALER_amount_cmp (&total,
                                &details->amount))
          {
            GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                        "Inserting transfer for %s into database failed. An entry exists for a different transfer amount (%s)!\n",
                        w->instance_id,
                        TALER_amount2s (&total));
          }
          else
          {
            GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                        "Inserting transfer for %s into database failed. An equivalent entry already exists.\n",
                        w->instance_id);
          }
        }
      }
      GNUNET_free (credit_payto);
      GNUNET_free (exchange_url);
      if (qs < 0)
      {
        /* Database error: give up and abort the iteration.  The start
           row is deliberately NOT advanced for this transfer.
           NOTE(review): unlike save(), this path does not set the
           global exit status to a failure value -- confirm intended.
           Presumably the bank library releases the request itself
           when we return #GNUNET_SYSERR, hence the handle is only
           forgotten here -- TODO confirm. */
        GNUNET_break (0);
        GNUNET_SCHEDULER_shutdown ();
        w->hh = NULL;
        return GNUNET_SYSERR;
      }
      /* Success => reset back-off timer! */
      w->delay = GNUNET_TIME_UNIT_ZERO;
      {
        struct GNUNET_DB_EventHeaderP es = {
          .size = htons (sizeof (es)),
          .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_CONFIRMED)
        };

        db_plugin->event_notify (db_plugin->cls,
                                 &es,
                                 NULL,
                                 0);
      }
    }
    w->start_row = serial_id;
    return GNUNET_OK;
  case MHD_HTTP_NO_CONTENT:
    save (w);
    /* Delay artificially if server returned before long-poll timeout */
    if (! w->found)
      w->delay = GNUNET_TIME_absolute_get_remaining (w->long_poll_timeout);
    break;
  case MHD_HTTP_NOT_FOUND:
    /* configuration likely wrong, wait at least 1 minute, backoff up to 15 minutes! */
    w->delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MINUTES,
                                         GNUNET_TIME_STD_BACKOFF (w->delay));
    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                "Bank claims account is unknown, waiting for %s before trying again\n",
                GNUNET_TIME_relative2s (w->delay,
                                        true));
    break;
  case MHD_HTTP_GATEWAY_TIMEOUT:
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Gateway timeout, adjusting long polling threshold\n");
    /* Limit new timeout at request delay */
    w->bank_timeout
      = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_duration (
                                    w->start_time),
                                  w->bank_timeout);
    /* set the timeout a bit earlier */
    w->bank_timeout
      = GNUNET_TIME_relative_subtract (w->bank_timeout,
                                       GNUNET_TIME_UNIT_SECONDS);
    /* do not allow it to go to zero */
    w->bank_timeout
      = GNUNET_TIME_relative_max (w->bank_timeout,
                                  GNUNET_TIME_UNIT_SECONDS);
    w->delay = GNUNET_TIME_STD_BACKOFF (w->delay);
    break;
  default:
    /* Something went wrong, try again, but with back-off */
    w->delay = GNUNET_TIME_STD_BACKOFF (w->delay);
    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                "Unexpected HTTP status code %u(%d) from bank\n",
                http_status,
                ec);
    break;
  }
  /* The request is finished; forget the handle before deciding how to
     continue, so end_watch() below does not try to cancel it. */
  w->hh = NULL;
  if (test_mode && (! w->found))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "No transactions found and in test mode. Ending watch!\n");
    end_watch (w);
    if (NULL == w_head)
      GNUNET_SCHEDULER_shutdown ();
    return GNUNET_OK;
  }
  w->task = GNUNET_SCHEDULER_add_delayed (w->delay,
                                          &do_work,
                                          w);
  return GNUNET_OK;
}
static void
do_work (void *cls)
{
struct Watch *w = cls;
w->task = NULL;
w->found = false;
w->long_poll_timeout
= GNUNET_TIME_relative_to_absolute (w->bank_timeout);
w->start_time
= GNUNET_TIME_absolute_get ();
w->hh = TALER_MERCHANT_BANK_credit_history (ctx,
&w->ad,
w->start_row,
batch_size,
test_mode
? GNUNET_TIME_UNIT_ZERO
: w->bank_timeout,
&credit_cb,
w);
if (NULL == w->hh)
{
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
return;
}
}
/**
 * Function called with information about an account the wirewatcher
 * should monitor.  Creates a watch job for the account, adds it to the
 * list of active watches and schedules its first iteration.  If the
 * credentials cannot be parsed, no watch is created, shutdown is
 * requested and #global_ret is set to EXIT_NOTCONFIGURED.
 *
 * @param cls closure (NULL)
 * @param instance instance that owns the account
 * @param payto_uri account URI
 * @param credit_facade_url URL for the credit facade
 * @param credit_facade_credentials account access credentials
 * @param last_serial last transaction serial (inclusive) we have seen from this account
 */
static void
start_watch (
  void *cls,
  const char *instance,
  const char *payto_uri,
  const char *credit_facade_url,
  const json_t *credit_facade_credentials,
  uint64_t last_serial)
{
  struct Watch *w;

  (void) cls;
  w = GNUNET_new (struct Watch);
  w->bank_timeout = BANK_TIMEOUT;
  if (GNUNET_OK !=
      TALER_MERCHANT_BANK_auth_parse_json (credit_facade_credentials,
                                           credit_facade_url,
                                           &w->ad))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to parse authentication data of `%s/%s'\n",
                instance,
                payto_uri);
    /* the watch was never linked into the list, so just free it */
    GNUNET_free (w);
    GNUNET_SCHEDULER_shutdown ();
    global_ret = EXIT_NOTCONFIGURED;
    return;
  }
  w->start_row = last_serial;
  w->instance_id = GNUNET_strdup (instance);
  w->payto_uri = TALER_payto_normalize (payto_uri);
  GNUNET_CONTAINER_DLL_insert (w_head,
                               w_tail,
                               w);
  w->task = GNUNET_SCHEDULER_add_now (&do_work,
                                      w);
}
/**
 * Function called on configuration change events received from the
 * database.  Records that the shutdown is due to a configuration
 * change and requests shutdown; whether the process then exits or
 * starts over is decided in main() based on #persist_mode.
 *
 * @param cls closure (NULL)
 * @param extra additional event data provided
 * @param extra_size number of bytes in @a extra
 */
static void
config_changed (void *cls,
                const void *extra,
                size_t extra_size)
{
  const char *reaction;

  (void) cls;
  (void) extra;
  (void) extra_size;
  if (0 == persist_mode)
    reaction = "restarting";
  else
    reaction = "reinitializing";
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Configuration changed, %s\n",
              reaction);
  config_changed_flag = true;
  GNUNET_SCHEDULER_shutdown ();
}
/**
* First task.
*
* @param cls closure, NULL
* @param args remaining command-line arguments
* @param cfgfile name of the configuration file used (for saving, can be NULL!)
* @param c configuration
*/
static void
run (void *cls,
char *const *args,
const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *c)
{
(void) args;
(void) cfgfile;
cfg = c;
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
NULL);
ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
&rc);
rc = GNUNET_CURL_gnunet_rc_create (ctx);
if (NULL == ctx)
{
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
global_ret = EXIT_NO_RESTART;
return;
}
if (NULL ==
(db_plugin = TALER_MERCHANTDB_plugin_load (cfg)))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to initialize DB subsystem\n");
GNUNET_SCHEDULER_shutdown ();
global_ret = EXIT_NOTCONFIGURED;
return;
}
if (GNUNET_OK !=
db_plugin->connect (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to connect to database\n");
GNUNET_SCHEDULER_shutdown ();
global_ret = EXIT_NO_RESTART;
return;
}
{
struct GNUNET_DB_EventHeaderP es = {
.size = htons (sizeof (es)),
.type = htons (TALER_DBEVENT_MERCHANT_ACCOUNTS_CHANGED)
};
eh = db_plugin->event_listen (db_plugin->cls,
&es,
GNUNET_TIME_UNIT_FOREVER_REL,
&config_changed,
NULL);
}
{
enum GNUNET_DB_QueryStatus qs;
qs = db_plugin->select_wirewatch_accounts (db_plugin->cls,
&start_watch,
NULL);
if (qs < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain wirewatch accounts from database\n");
GNUNET_SCHEDULER_shutdown ();
global_ret = EXIT_NO_RESTART;
return;
}
if ( (NULL == w_head) &&
(GNUNET_YES == test_mode) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No active wirewatch accounts in database and in test mode. Exiting.\n");
GNUNET_SCHEDULER_shutdown ();
global_ret = EXIT_SUCCESS;
return;
}
}
}
/**
 * The main function of taler-merchant-wirewatch.  Parses the command
 * line and runs the program; in persist mode the program is run again
 * whenever it stopped because of a configuration change.
 *
 * @param argc number of arguments from the command line
 * @param argv command line arguments
 * @return EXIT_INVALIDARGUMENT if the arguments could not be processed
 *         or GNUNET_PROGRAM_run() returned #GNUNET_SYSERR,
 *         EXIT_SUCCESS if it returned #GNUNET_NO,
 *         otherwise the value of #global_ret
 */
int
main (int argc,
      char *const *argv)
{
  struct GNUNET_GETOPT_CommandLineOption options[] = {
    GNUNET_GETOPT_option_flag ('p',
                               "persist",
                               "run in persist mode and do not exit on configuration changes",
                               &persist_mode),
    GNUNET_GETOPT_option_timetravel ('T',
                                     "timetravel"),
    GNUNET_GETOPT_option_flag ('t',
                               "test",
                               "run in test mode and exit when idle",
                               &test_mode),
    GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
    GNUNET_GETOPT_OPTION_END
  };
  enum GNUNET_GenericReturnValue ret;

  if (GNUNET_OK !=
      GNUNET_STRINGS_get_utf8_args (argc, argv,
                                    &argc, &argv))
    return EXIT_INVALIDARGUMENT;
  TALER_OS_init ();
  for (;;)
  {
    config_changed_flag = false;
    ret = GNUNET_PROGRAM_run (
      argc, argv,
      "taler-merchant-wirewatch",
      gettext_noop (
        "background process that watches for incoming wire transfers to the merchant bank account"),
      options,
      &run, NULL);
    /* only start over in persist mode after a configuration change */
    if (1 != persist_mode)
      break;
    if (! config_changed_flag)
      break;
  }
  GNUNET_free_nz ((void *) argv);
  switch (ret)
  {
  case GNUNET_SYSERR:
    return EXIT_INVALIDARGUMENT;
  case GNUNET_NO:
    return EXIT_SUCCESS;
  default:
    return global_ret;
  }
}
/* end of taler-merchant-wirewatch.c */