aboutsummaryrefslogtreecommitdiff
path: root/block/nbd.c
diff options
context:
space:
mode:
Diffstat (limited to 'block/nbd.c')
-rw-r--r--block/nbd.c69
1 files changed, 56 insertions, 13 deletions
diff --git a/block/nbd.c b/block/nbd.c
index 9d661c1a92..3f693e3ffa 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -46,6 +46,10 @@
#define logout(fmt, ...) ((void)0)
#endif
+#define MAX_NBD_REQUESTS 16
+#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
+#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs))
+
typedef struct BDRVNBDState {
int sock;
uint32_t nbdflags;
@@ -53,9 +57,12 @@ typedef struct BDRVNBDState {
size_t blocksize;
char *export_name; /* An NBD server may export several devices */
- CoMutex mutex;
- Coroutine *coroutine;
+ CoMutex send_mutex;
+ CoMutex free_sema;
+ Coroutine *send_coroutine;
+ int in_flight;
+ Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
struct nbd_reply reply;
/* If it begins with '/', this is a UNIX domain socket. Otherwise,
@@ -112,41 +119,68 @@ out:
static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request)
{
- qemu_co_mutex_lock(&s->mutex);
- s->coroutine = qemu_coroutine_self();
- request->handle = (uint64_t)(intptr_t)s;
+ int i;
+
+ /* Poor man semaphore. The free_sema is locked when no other request
+ * can be accepted, and unlocked after receiving one reply. */
+ if (s->in_flight >= MAX_NBD_REQUESTS - 1) {
+ qemu_co_mutex_lock(&s->free_sema);
+ assert(s->in_flight < MAX_NBD_REQUESTS);
+ }
+ s->in_flight++;
+
+ for (i = 0; i < MAX_NBD_REQUESTS; i++) {
+ if (s->recv_coroutine[i] == NULL) {
+ s->recv_coroutine[i] = qemu_coroutine_self();
+ break;
+ }
+ }
+
+ assert(i < MAX_NBD_REQUESTS);
+ request->handle = INDEX_TO_HANDLE(s, i);
}
static int nbd_have_request(void *opaque)
{
BDRVNBDState *s = opaque;
- return !!s->coroutine;
+ return s->in_flight > 0;
}
static void nbd_reply_ready(void *opaque)
{
BDRVNBDState *s = opaque;
+ int i;
if (s->reply.handle == 0) {
/* No reply already in flight. Fetch a header. */
if (nbd_receive_reply(s->sock, &s->reply) < 0) {
s->reply.handle = 0;
+ goto fail;
}
}
/* There's no need for a mutex on the receive side, because the
* handler acts as a synchronization point and ensures that only
* one coroutine is called until the reply finishes. */
- if (s->coroutine) {
- qemu_coroutine_enter(s->coroutine, NULL);
+ i = HANDLE_TO_INDEX(s, s->reply.handle);
+ if (s->recv_coroutine[i]) {
+ qemu_coroutine_enter(s->recv_coroutine[i], NULL);
+ return;
+ }
+
+fail:
+ for (i = 0; i < MAX_NBD_REQUESTS; i++) {
+ if (s->recv_coroutine[i]) {
+ qemu_coroutine_enter(s->recv_coroutine[i], NULL);
+ }
}
}
static void nbd_restart_write(void *opaque)
{
BDRVNBDState *s = opaque;
- qemu_coroutine_enter(s->coroutine, NULL);
+ qemu_coroutine_enter(s->send_coroutine, NULL);
}
static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request,
@@ -154,6 +188,8 @@ static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request,
{
int rc, ret;
+ qemu_co_mutex_lock(&s->send_mutex);
+ s->send_coroutine = qemu_coroutine_self();
qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write,
nbd_have_request, NULL, s);
rc = nbd_send_request(s->sock, request);
@@ -166,6 +202,8 @@ static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request,
}
qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
nbd_have_request, NULL, s);
+ s->send_coroutine = NULL;
+ qemu_co_mutex_unlock(&s->send_mutex);
return rc;
}
@@ -175,7 +213,8 @@ static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request,
{
int ret;
- /* Wait until we're woken up by the read handler. */
+ /* Wait until we're woken up by the read handler. TODO: perhaps
+ * peek at the next reply and avoid yielding if it's ours? */
qemu_coroutine_yield();
*reply = s->reply;
if (reply->handle != request->handle) {
@@ -195,8 +234,11 @@ static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request,
static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request)
{
- s->coroutine = NULL;
- qemu_co_mutex_unlock(&s->mutex);
+ int i = HANDLE_TO_INDEX(s, request->handle);
+ s->recv_coroutine[i] = NULL;
+ if (s->in_flight-- == MAX_NBD_REQUESTS) {
+ qemu_co_mutex_unlock(&s->free_sema);
+ }
}
static int nbd_establish_connection(BlockDriverState *bs)
@@ -261,7 +303,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
BDRVNBDState *s = bs->opaque;
int result;
- qemu_co_mutex_init(&s->mutex);
+ qemu_co_mutex_init(&s->send_mutex);
+ qemu_co_mutex_init(&s->free_sema);
/* Pop the config into our state object. Exit if invalid. */
result = nbd_config(s, filename, flags);