diff options
author | Christian Grothoff <christian@grothoff.org> | 2020-11-15 16:17:57 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2020-11-15 16:17:57 +0100 |
commit | 736997ffe89afac91351606369b652e3b1c5d77b (patch) | |
tree | 9bf5fca6df2438fcba9d24b1e7f40aaebd915108 /src | |
parent | 9f60c8c041d9d86ebb701a2532e4d849ca9a9d82 (diff) |
add worker thread logic
Diffstat (limited to 'src')
-rw-r--r-- | src/util/taler-helper-crypto-rsa.c | 315 |
1 files changed, 312 insertions, 3 deletions
diff --git a/src/util/taler-helper-crypto-rsa.c b/src/util/taler-helper-crypto-rsa.c index 4c35370c9..272025b02 100644 --- a/src/util/taler-helper-crypto-rsa.c +++ b/src/util/taler-helper-crypto-rsa.c @@ -38,13 +38,15 @@ * assigned and collected by the main thread). * * TODO: - * - actual networking + * - networking: sending signature replies * - actual signing */ #include "platform.h" #include "taler_util.h" #include "taler-helper-crypto-rsa.h" #include <gcrypt.h> +#include <pthread.h> +#include <sys/eventfd.h> /** @@ -161,6 +163,12 @@ struct Denomination /** + * Actively worked on client request. + */ +struct WorkItem; + + +/** * Information we keep for a client connected to us. */ struct Client @@ -177,6 +185,11 @@ struct Client struct Client *prev; /** + * Work created by this client, NULL for none. + */ + struct WorkItem *work; + + /** * Client socket. */ struct GNUNET_NETWORK_Handle *sock; @@ -186,6 +199,44 @@ struct Client */ struct GNUNET_SCHEDULER_Task *task; + /** + * Flag set to true if this client has disconnected. Used + * by the workers to detect that they must free the client + * instead of returning the result. + */ + bool gone; + +}; + + +struct WorkItem +{ + + /** + * Kept in a DLL. + */ + struct WorkItem *next; + + /** + * Kept in a DLL. + */ + struct WorkItem *prev; + + /** + * The client that created the request. + */ + struct Client *client; + + /** + * Key to be used for this operation. + */ + struct DenominationKey *dk; + + /** + * Hash of the value to sign (FDH still to be computed!). + */ + struct GNUNET_HashCode h_message; + }; @@ -274,6 +325,138 @@ static struct Client *clients_head; */ static struct Client *clients_tail; +/** + * Head of DLL with pending signing operations. + */ +static struct WorkItem *work_head; + +/** + * Tail of DLL with pending signing operations. + */ +static struct WorkItem *work_tail; + +/** + * Lock for the work queue. + */ +static pthread_mutex_t work_lock; + +/** + * Condition variable for the semaphore of the work queue. + */ +static pthread_cond_t work_cond = PTHREAD_COND_INITIALIZER; + +/** + * Number of items in the work queue. Also used as the semaphore counter. + */ +static unsigned long long work_counter; + +/** + * Head of DLL with completed signing operations. + */ +static struct WorkItem *done_head; + +/** + * Tail of DLL with completed signing operations. + */ +static struct WorkItem *done_tail; + +/** + * Lock for the done queue. + */ +static pthread_mutex_t done_lock; + +/** + * Task waiting for work to be done. + */ +static struct GNUNET_SCHEDULER_Task *done_task; + +/** + * Signal used by threads to notify the #done_task that they + * completed work that is now in the done queue. + */ +static struct GNUNET_NETWORK_Handle *done_signal; + +/** + * Set once we are in shutdown and workers should terminate. + */ +static volatile bool in_shutdown; + +/** + * Array of #num_worker sign_worker() threads. + */ +static pthread_t *workers; + +/** + * Length of the #workers array. + */ +static unsigned int num_workers; + + +/** + * Function that performs the actual signature for the work @a wi + * + * @param[in,out] wi signature work we should do + */ +static void +do_sign (struct WorkItem *wi) +{ + // FIXME! +} + + +/** + * Main function of a worker thread that signs. + * + * @param cls NULL + * @return NULL + */ +static void * +sign_worker (void *cls) +{ + (void) cls; + GNUNET_assert (0 == pthread_mutex_lock (&work_lock)); + while (! in_shutdown) + { + struct WorkItem *wi; + + while (NULL != (wi = work_head)) + { + /* take work from queue */ + GNUNET_CONTAINER_DLL_remove (work_head, + work_tail, + wi); + work_counter--; + GNUNET_assert (0 == pthread_mutex_unlock (&work_lock)); + do_sign (wi); + /* put completed work into done queue */ + GNUNET_assert (0 == pthread_mutex_lock (&done_lock)); + GNUNET_CONTAINER_DLL_insert (done_head, + done_tail, + wi); + GNUNET_assert (0 == pthread_mutex_unlock (&done_lock)); + { + uint64_t val = GNUNET_htonll (1); + + /* raise #done_signal */ + if (sizeof(val) != + GNUNET_NETWORK_socket_send (done_signal, + &val, + sizeof (val))) + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, + "send(eventfd)"); + } + GNUNET_assert (0 == pthread_mutex_lock (&work_lock)); + } + /* queue is empty, wait for work */ + GNUNET_assert (0 == + pthread_cond_wait (&work_cond, + &work_lock)); + } + GNUNET_assert (0 == + pthread_mutex_unlock (&work_lock)); + return NULL; +} + /** * Free @a client, releasing all (remaining) state. @@ -289,10 +472,14 @@ free_client (struct Client *client) client->task = NULL; } GNUNET_NETWORK_socket_close (client->sock); + client->sock = NULL; GNUNET_CONTAINER_DLL_remove (clients_head, clients_tail, client); - GNUNET_free (client); + if (NULL != client->work) + client->gone = true; + else + GNUNET_free (client); } @@ -337,13 +524,84 @@ free_dk (struct DenominationKey *dk) /** + * Process completed tasks that are in the #done_head queue, sending + * the result back to the client (and resuming the client). + * + * @param cls NULL + */ +static void +handle_done (void *cls) +{ + uint64_t data; + (void) cls; + + /* consume #done_signal */ + if (sizeof (data) != + GNUNET_NETWORK_socket_recv (done_signal, + &data, + sizeof (data))) + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, + "recv(eventfd)"); + done_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, + done_signal, + &handle_done, + NULL); + GNUNET_assert (0 == pthread_mutex_lock (&done_lock)); + while (NULL != done_head) + { + struct WorkItem *wi = done_head; + + GNUNET_CONTAINER_DLL_remove (done_head, + done_tail, + wi); + GNUNET_assert (0 == pthread_mutex_unlock (&done_lock)); + // FIXME: send response to client! + GNUNET_free (wi); + GNUNET_assert (0 == pthread_mutex_lock (&done_lock)); + } + GNUNET_assert (0 == pthread_mutex_unlock (&done_lock)); + +} + + +/** + * Handle @a client request @a sr to create signature. Create the + * signature using the respective key and return the result to + * the client. * + * @param client the client making the request + * @param sr the request details */ static void handle_sign_request (struct Client *client, const struct TALER_CRYPTO_SignRequest *sr) { - // FIXME ... + struct DenominationKey *dk; + struct WorkItem *wi; + + dk = GNUNET_CONTAINER_multihashmap_get (keys, + &sr->h_denom_pub); + if (NULL == dk) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Signing request failed, denomination key unknown\n"); + // FIXME: send failure response to client! + client_next (client); + return; + } + + wi = GNUNET_new (struct WorkItem); + wi->client = client; + wi->dk = dk; + dk->rc++; + wi->h_message = sr->h_message; + GNUNET_assert (0 == pthread_mutex_lock (&work_lock)); + work_counter++; + GNUNET_CONTAINER_DLL_insert (work_head, + work_tail, + wi); + GNUNET_assert (0 == pthread_mutex_unlock (&work_lock)); + GNUNET_assert (0 == pthread_cond_signal (&work_cond)); } @@ -1364,6 +1622,25 @@ do_shutdown (void *cls) GNUNET_SCHEDULER_cancel (keygen_task); keygen_task = NULL; } + if (NULL != done_task) + { + GNUNET_SCHEDULER_cancel (done_task); + done_task = NULL; + } + /* shut down worker threads */ + GNUNET_assert (0 == pthread_mutex_lock (&work_lock)); + in_shutdown = true; + GNUNET_assert (0 == pthread_cond_broadcast (&work_cond)); + GNUNET_assert (0 == pthread_mutex_unlock (&work_lock)); + for (unsigned int i = 0; i<num_workers; i++) + GNUNET_assert (0 == pthread_join (workers[i], + NULL)); + if (NULL != done_signal) + { + GNUNET_break (GNUNET_OK == + GNUNET_NETWORK_socket_close (done_signal)); + done_signal = NULL; + } } @@ -1518,6 +1795,38 @@ run (void *cls, /* start job to keep keys up-to-date */ keygen_task = GNUNET_SCHEDULER_add_now (&update_denominations, NULL); + + /* start job to handle completed work */ + { + int fd; + + fd = eventfd (0, + EFD_NONBLOCK | EFD_CLOEXEC); + if (-1 == fd) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, + "eventfd"); + global_ret = 6; + GNUNET_SCHEDULER_shutdown (); + return; + } + done_signal = GNUNET_NETWORK_socket_box_native (fd); + } + done_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, + done_signal, + &handle_done, + NULL); + + /* start crypto workers */ + num_workers = 1; // for now... + workers = GNUNET_new_array (num_workers, + pthread_t); + for (unsigned int i = 0; i<num_workers; i++) + GNUNET_assert (0 == + pthread_create (&workers[i], + NULL, + &sign_worker, + NULL)); } |