aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaolo Bonzini <pbonzini@redhat.com>2011-09-08 14:28:59 +0200
committerPaolo Bonzini <pbonzini@redhat.com>2011-12-22 11:53:57 +0100
commitae255e523c256cf0708f1c16cb946ff96340a800 (patch)
treefbc4e2309f43d337ed7ce0c1c7e3582a6008fa5c
parent8c5135f90e2dcf1d5c3d03106e0ac6e371ccb572 (diff)
nbd: switch to asynchronous operation
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
-rw-r--r--block/nbd.c188
-rw-r--r--nbd.c8
2 files changed, 131 insertions, 65 deletions
diff --git a/block/nbd.c b/block/nbd.c
index 95212dac64..bea7acd213 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -47,13 +47,17 @@
#endif
typedef struct BDRVNBDState {
- CoMutex lock;
int sock;
uint32_t nbdflags;
off_t size;
size_t blocksize;
char *export_name; /* An NBD server may export several devices */
+ CoMutex mutex;
+ Coroutine *coroutine;
+
+ struct nbd_reply reply;
+
/* If it begins with '/', this is a UNIX domain socket. Otherwise,
* it's a string of the form <hostname|ip4|\[ip6\]>:port
*/
@@ -106,6 +110,95 @@ out:
return err;
}
+static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request)
+{
+ qemu_co_mutex_lock(&s->mutex);
+ s->coroutine = qemu_coroutine_self();
+ request->handle = (uint64_t)(intptr_t)s;
+}
+
+static int nbd_have_request(void *opaque)
+{
+ BDRVNBDState *s = opaque;
+
+ return !!s->coroutine;
+}
+
+static void nbd_reply_ready(void *opaque)
+{
+ BDRVNBDState *s = opaque;
+
+ if (s->reply.handle == 0) {
+ /* No reply already in flight. Fetch a header. */
+ if (nbd_receive_reply(s->sock, &s->reply) < 0) {
+ s->reply.handle = 0;
+ }
+ }
+
+ /* There's no need for a mutex on the receive side, because the
+ * handler acts as a synchronization point and ensures that only
+ * one coroutine is called until the reply finishes. */
+ if (s->coroutine) {
+ qemu_coroutine_enter(s->coroutine, NULL);
+ }
+}
+
+static void nbd_restart_write(void *opaque)
+{
+ BDRVNBDState *s = opaque;
+ qemu_coroutine_enter(s->coroutine, NULL);
+}
+
+static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request,
+ struct iovec *iov, int offset)
+{
+ int rc, ret;
+
+ qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write,
+ nbd_have_request, NULL, s);
+ rc = nbd_send_request(s->sock, request);
+ if (rc != -1 && iov) {
+ ret = qemu_co_sendv(s->sock, iov, request->len, offset);
+ if (ret != request->len) {
+ errno = -EIO;
+ rc = -1;
+ }
+ }
+ qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
+ nbd_have_request, NULL, s);
+ return rc;
+}
+
+static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request,
+ struct nbd_reply *reply,
+ struct iovec *iov, int offset)
+{
+ int ret;
+
+ /* Wait until we're woken up by the read handler. */
+ qemu_coroutine_yield();
+ *reply = s->reply;
+ if (reply->handle != request->handle) {
+ reply->error = EIO;
+ } else {
+ if (iov && reply->error == 0) {
+ ret = qemu_co_recvv(s->sock, iov, request->len, offset);
+ if (ret != request->len) {
+ reply->error = EIO;
+ }
+ }
+
+ /* Tell the read handler to read another header. */
+ s->reply.handle = 0;
+ }
+}
+
+static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request)
+{
+ s->coroutine = NULL;
+ qemu_co_mutex_unlock(&s->mutex);
+}
+
static int nbd_establish_connection(BlockDriverState *bs)
{
BDRVNBDState *s = bs->opaque;
@@ -135,8 +228,11 @@ static int nbd_establish_connection(BlockDriverState *bs)
return -errno;
}
- /* Now that we're connected, set the socket to be non-blocking */
+ /* Now that we're connected, set the socket to be non-blocking and
+ * kick the reply mechanism. */
socket_set_nonblock(sock);
+ qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
+ nbd_have_request, NULL, s);
s->sock = sock;
s->size = size;
@@ -152,11 +248,11 @@ static void nbd_teardown_connection(BlockDriverState *bs)
struct nbd_request request;
request.type = NBD_CMD_DISC;
- request.handle = (uint64_t)(intptr_t)bs;
request.from = 0;
request.len = 0;
nbd_send_request(s->sock, &request);
+ qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL);
closesocket(s->sock);
}
@@ -165,6 +261,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
BDRVNBDState *s = bs->opaque;
int result;
+ qemu_co_mutex_init(&s->mutex);
+
/* Pop the config into our state object. Exit if invalid. */
result = nbd_config(s, filename, flags);
if (result != 0) {
@@ -176,90 +274,50 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
*/
result = nbd_establish_connection(bs);
- qemu_co_mutex_init(&s->lock);
return result;
}
-static int nbd_read(BlockDriverState *bs, int64_t sector_num,
- uint8_t *buf, int nb_sectors)
+static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors, QEMUIOVector *qiov)
{
BDRVNBDState *s = bs->opaque;
struct nbd_request request;
struct nbd_reply reply;
request.type = NBD_CMD_READ;
- request.handle = (uint64_t)(intptr_t)bs;
request.from = sector_num * 512;
request.len = nb_sectors * 512;
- if (nbd_send_request(s->sock, &request) == -1)
- return -errno;
-
- if (nbd_receive_reply(s->sock, &reply) == -1)
- return -errno;
-
- if (reply.error !=0)
- return -reply.error;
-
- if (reply.handle != request.handle)
- return -EIO;
-
- if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
- return -EIO;
+ nbd_coroutine_start(s, &request);
+ if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
+ reply.error = errno;
+ } else {
+ nbd_co_receive_reply(s, &request, &reply, qiov->iov, 0);
+ }
+ nbd_coroutine_end(s, &request);
+ return -reply.error;
- return 0;
}
-static int nbd_write(BlockDriverState *bs, int64_t sector_num,
- const uint8_t *buf, int nb_sectors)
+static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors, QEMUIOVector *qiov)
{
BDRVNBDState *s = bs->opaque;
struct nbd_request request;
struct nbd_reply reply;
request.type = NBD_CMD_WRITE;
- request.handle = (uint64_t)(intptr_t)bs;
request.from = sector_num * 512;
request.len = nb_sectors * 512;
- if (nbd_send_request(s->sock, &request) == -1)
- return -errno;
-
- if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
- return -EIO;
-
- if (nbd_receive_reply(s->sock, &reply) == -1)
- return -errno;
-
- if (reply.error !=0)
- return -reply.error;
-
- if (reply.handle != request.handle)
- return -EIO;
-
- return 0;
-}
-
-static coroutine_fn int nbd_co_read(BlockDriverState *bs, int64_t sector_num,
- uint8_t *buf, int nb_sectors)
-{
- int ret;
- BDRVNBDState *s = bs->opaque;
- qemu_co_mutex_lock(&s->lock);
- ret = nbd_read(bs, sector_num, buf, nb_sectors);
- qemu_co_mutex_unlock(&s->lock);
- return ret;
-}
-
-static coroutine_fn int nbd_co_write(BlockDriverState *bs, int64_t sector_num,
- const uint8_t *buf, int nb_sectors)
-{
- int ret;
- BDRVNBDState *s = bs->opaque;
- qemu_co_mutex_lock(&s->lock);
- ret = nbd_write(bs, sector_num, buf, nb_sectors);
- qemu_co_mutex_unlock(&s->lock);
- return ret;
+ nbd_coroutine_start(s, &request);
+ if (nbd_co_send_request(s, &request, qiov->iov, 0) == -1) {
+ reply.error = errno;
+ } else {
+ nbd_co_receive_reply(s, &request, &reply, NULL, 0);
+ }
+ nbd_coroutine_end(s, &request);
+ return -reply.error;
}
static void nbd_close(BlockDriverState *bs)
@@ -282,8 +340,8 @@ static BlockDriver bdrv_nbd = {
.format_name = "nbd",
.instance_size = sizeof(BDRVNBDState),
.bdrv_file_open = nbd_open,
- .bdrv_read = nbd_co_read,
- .bdrv_write = nbd_co_write,
+ .bdrv_co_readv = nbd_co_readv,
+ .bdrv_co_writev = nbd_co_writev,
.bdrv_close = nbd_close,
.bdrv_getlength = nbd_getlength,
.protocol_name = "nbd",
diff --git a/nbd.c b/nbd.c
index de880fe3c6..ff701d3dc8 100644
--- a/nbd.c
+++ b/nbd.c
@@ -81,6 +81,14 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
{
size_t offset = 0;
+ if (qemu_in_coroutine()) {
+ if (do_read) {
+ return qemu_co_recv(fd, buffer, size);
+ } else {
+ return qemu_co_send(fd, buffer, size);
+ }
+ }
+
while (offset < size) {
ssize_t len;