aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaolo Bonzini <pbonzini@redhat.com>2011-09-08 13:46:25 +0200
committerPaolo Bonzini <pbonzini@redhat.com>2011-12-22 11:53:53 +0100
commit8c5135f90e2dcf1d5c3d03106e0ac6e371ccb572 (patch)
tree5c5976aeb886b20ea1f564cc828b448e56fcbebd
parent993295fedc5fefaefee9ac80e057d8bf08026ef4 (diff)
sheepdog: move coroutine send/recv function to generic code
Outside coroutines, avoid busy waiting on EAGAIN by temporarily making the socket blocking. The API of qemu_recvv/qemu_sendv is slightly different from do_readv/do_writev because they do not handle coroutines. It returns the number of bytes written before encountering an EAGAIN. The specificity of yielding on EAGAIN is entirely in qemu-coroutine.c. Reviewed-by: MORITA Kazutaka <morita.kazutaka@lab.ntt.co.jp> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
-rw-r--r--Makefile.objs2
-rw-r--r--block/sheepdog.c230
-rw-r--r--cutils.c111
-rw-r--r--qemu-common.h32
-rw-r--r--qemu-coroutine-io.c96
5 files changed, 260 insertions, 211 deletions
diff --git a/Makefile.objs b/Makefile.objs
index f753d838ff..8813673584 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -12,7 +12,7 @@ oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o
#######################################################################
# coroutines
-coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o
+coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o
ifeq ($(CONFIG_UCONTEXT_COROUTINE),y)
coroutine-obj-$(CONFIG_POSIX) += coroutine-ucontext.o
else
diff --git a/block/sheepdog.c b/block/sheepdog.c
index aa9707f2ae..00ea5a0acc 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -443,129 +443,6 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
return acb;
}
-#ifdef _WIN32
-
-struct msghdr {
- struct iovec *msg_iov;
- size_t msg_iovlen;
-};
-
-static ssize_t sendmsg(int s, const struct msghdr *msg, int flags)
-{
- size_t size = 0;
- char *buf, *p;
- int i, ret;
-
- /* count the msg size */
- for (i = 0; i < msg->msg_iovlen; i++) {
- size += msg->msg_iov[i].iov_len;
- }
- buf = g_malloc(size);
-
- p = buf;
- for (i = 0; i < msg->msg_iovlen; i++) {
- memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len);
- p += msg->msg_iov[i].iov_len;
- }
-
- ret = send(s, buf, size, flags);
-
- g_free(buf);
- return ret;
-}
-
-static ssize_t recvmsg(int s, struct msghdr *msg, int flags)
-{
- size_t size = 0;
- char *buf, *p;
- int i, ret;
-
- /* count the msg size */
- for (i = 0; i < msg->msg_iovlen; i++) {
- size += msg->msg_iov[i].iov_len;
- }
- buf = g_malloc(size);
-
- ret = qemu_recv(s, buf, size, flags);
- if (ret < 0) {
- goto out;
- }
-
- p = buf;
- for (i = 0; i < msg->msg_iovlen; i++) {
- memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len);
- p += msg->msg_iov[i].iov_len;
- }
-out:
- g_free(buf);
- return ret;
-}
-
-#endif
-
-/*
- * Send/recv data with iovec buffers
- *
- * This function send/recv data from/to the iovec buffer directly.
- * The first `offset' bytes in the iovec buffer are skipped and next
- * `len' bytes are used.
- *
- * For example,
- *
- * do_send_recv(sockfd, iov, len, offset, 1);
- *
- * is equals to
- *
- * char *buf = malloc(size);
- * iov_to_buf(iov, iovcnt, buf, offset, size);
- * send(sockfd, buf, size, 0);
- * free(buf);
- */
-static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset,
- int write)
-{
- struct msghdr msg;
- int ret, diff;
-
- memset(&msg, 0, sizeof(msg));
- msg.msg_iov = iov;
- msg.msg_iovlen = 1;
-
- len += offset;
-
- while (iov->iov_len < len) {
- len -= iov->iov_len;
-
- iov++;
- msg.msg_iovlen++;
- }
-
- diff = iov->iov_len - len;
- iov->iov_len -= diff;
-
- while (msg.msg_iov->iov_len <= offset) {
- offset -= msg.msg_iov->iov_len;
-
- msg.msg_iov++;
- msg.msg_iovlen--;
- }
-
- msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset;
- msg.msg_iov->iov_len -= offset;
-
- if (write) {
- ret = sendmsg(sockfd, &msg, 0);
- } else {
- ret = recvmsg(sockfd, &msg, 0);
- }
-
- msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset;
- msg.msg_iov->iov_len += offset;
-
- iov->iov_len += diff;
- return ret;
-}
-
static int connect_to_sdog(const char *addr, const char *port)
{
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
@@ -618,83 +495,19 @@ success:
return fd;
}
-static int do_readv_writev(int sockfd, struct iovec *iov, int len,
- int iov_offset, int write)
-{
- int ret;
-again:
- ret = do_send_recv(sockfd, iov, len, iov_offset, write);
- if (ret < 0) {
- if (errno == EINTR) {
- goto again;
- }
- if (errno == EAGAIN) {
- if (qemu_in_coroutine()) {
- qemu_coroutine_yield();
- }
- goto again;
- }
- error_report("failed to recv a rsp, %s", strerror(errno));
- return 1;
- }
-
- iov_offset += ret;
- len -= ret;
- if (len) {
- goto again;
- }
-
- return 0;
-}
-
-static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset)
-{
- return do_readv_writev(sockfd, iov, len, iov_offset, 0);
-}
-
-static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset)
-{
- return do_readv_writev(sockfd, iov, len, iov_offset, 1);
-}
-
-static int do_read_write(int sockfd, void *buf, int len, int write)
-{
- struct iovec iov;
-
- iov.iov_base = buf;
- iov.iov_len = len;
-
- return do_readv_writev(sockfd, &iov, len, 0, write);
-}
-
-static int do_read(int sockfd, void *buf, int len)
-{
- return do_read_write(sockfd, buf, len, 0);
-}
-
-static int do_write(int sockfd, void *buf, int len)
-{
- return do_read_write(sockfd, buf, len, 1);
-}
-
static int send_req(int sockfd, SheepdogReq *hdr, void *data,
unsigned int *wlen)
{
int ret;
- struct iovec iov[2];
- iov[0].iov_base = hdr;
- iov[0].iov_len = sizeof(*hdr);
-
- if (*wlen) {
- iov[1].iov_base = data;
- iov[1].iov_len = *wlen;
+ ret = qemu_send_full(sockfd, hdr, sizeof(*hdr), 0);
+ if (ret < sizeof(*hdr)) {
+ error_report("failed to send a req, %s", strerror(errno));
}
- ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0);
- if (ret) {
+ ret = qemu_send_full(sockfd, data, *wlen, 0);
+ if (ret < *wlen) {
error_report("failed to send a req, %s", strerror(errno));
- ret = -1;
}
return ret;
@@ -705,16 +518,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
{
int ret;
+ socket_set_block(sockfd);
ret = send_req(sockfd, hdr, data, wlen);
- if (ret) {
- ret = -1;
+ if (ret < 0) {
goto out;
}
- ret = do_read(sockfd, hdr, sizeof(*hdr));
- if (ret) {
+ ret = qemu_recv_full(sockfd, hdr, sizeof(*hdr), 0);
+ if (ret < sizeof(*hdr)) {
error_report("failed to get a rsp, %s", strerror(errno));
- ret = -1;
goto out;
}
@@ -723,15 +535,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
}
if (*rlen) {
- ret = do_read(sockfd, data, *rlen);
- if (ret) {
+ ret = qemu_recv_full(sockfd, data, *rlen, 0);
+ if (ret < *rlen) {
error_report("failed to get the data, %s", strerror(errno));
- ret = -1;
goto out;
}
}
ret = 0;
out:
+ socket_set_nonblock(sockfd);
return ret;
}
@@ -793,8 +605,8 @@ static void coroutine_fn aio_read_response(void *opaque)
}
/* read a header */
- ret = do_read(fd, &rsp, sizeof(rsp));
- if (ret) {
+ ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
+ if (ret < 0) {
error_report("failed to get the header, %s", strerror(errno));
goto out;
}
@@ -839,9 +651,9 @@ static void coroutine_fn aio_read_response(void *opaque)
}
break;
case AIOCB_READ_UDATA:
- ret = do_readv(fd, acb->qiov->iov, rsp.data_length,
- aio_req->iov_offset);
- if (ret) {
+ ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length,
+ aio_req->iov_offset);
+ if (ret < 0) {
error_report("failed to get the data, %s", strerror(errno));
goto out;
}
@@ -1114,16 +926,16 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
set_cork(s->fd, 1);
/* send a header */
- ret = do_write(s->fd, &hdr, sizeof(hdr));
- if (ret) {
+ ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
+ if (ret < 0) {
qemu_co_mutex_unlock(&s->lock);
error_report("failed to send a req, %s", strerror(errno));
return -EIO;
}
if (wlen) {
- ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset);
- if (ret) {
+ ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset);
+ if (ret < 0) {
qemu_co_mutex_unlock(&s->lock);
error_report("failed to send a data, %s", strerror(errno));
return -EIO;
diff --git a/cutils.c b/cutils.c
index 24b3fe355b..a6ffd46445 100644
--- a/cutils.c
+++ b/cutils.c
@@ -25,6 +25,8 @@
#include "host-utils.h"
#include <math.h>
+#include "qemu_socket.h"
+
void pstrcpy(char *buf, int buf_size, const char *str)
{
int c;
@@ -403,3 +405,112 @@ int qemu_parse_fd(const char *param)
}
return fd;
}
+
+/*
+ * Send/recv data with iovec buffers
+ *
+ * This function send/recv data from/to the iovec buffer directly.
+ * The first `offset' bytes in the iovec buffer are skipped and next
+ * `len' bytes are used.
+ *
+ * For example,
+ *
+ * do_sendv_recvv(sockfd, iov, len, offset, 1);
+ *
+ * is equal to
+ *
+ * char *buf = malloc(size);
+ * iov_to_buf(iov, iovcnt, buf, offset, size);
+ * send(sockfd, buf, size, 0);
+ * free(buf);
+ */
+static int do_sendv_recvv(int sockfd, struct iovec *iov, int len, int offset,
+ int do_sendv)
+{
+ int ret, diff, iovlen;
+ struct iovec *last_iov;
+
+ /* last_iov is inclusive, so count from one. */
+ iovlen = 1;
+ last_iov = iov;
+ len += offset;
+
+ while (last_iov->iov_len < len) {
+ len -= last_iov->iov_len;
+
+ last_iov++;
+ iovlen++;
+ }
+
+ diff = last_iov->iov_len - len;
+ last_iov->iov_len -= diff;
+
+ while (iov->iov_len <= offset) {
+ offset -= iov->iov_len;
+
+ iov++;
+ iovlen--;
+ }
+
+ iov->iov_base = (char *) iov->iov_base + offset;
+ iov->iov_len -= offset;
+
+ {
+#if defined CONFIG_IOVEC && defined CONFIG_POSIX
+ struct msghdr msg;
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_iov = iov;
+ msg.msg_iovlen = iovlen;
+
+ do {
+ if (do_sendv) {
+ ret = sendmsg(sockfd, &msg, 0);
+ } else {
+ ret = recvmsg(sockfd, &msg, 0);
+ }
+ } while (ret == -1 && errno == EINTR);
+#else
+ struct iovec *p = iov;
+ ret = 0;
+ while (iovlen > 0) {
+ int rc;
+ if (do_sendv) {
+ rc = send(sockfd, p->iov_base, p->iov_len, 0);
+ } else {
+ rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0);
+ }
+ if (rc == -1) {
+ if (errno == EINTR) {
+ continue;
+ }
+ if (ret == 0) {
+ ret = -1;
+ }
+ break;
+ }
+ if (rc == 0) {
+ break;
+ }
+ ret += rc;
+ iovlen--, p++;
+ }
+#endif
+ }
+
+ /* Undo the changes above */
+ iov->iov_base = (char *) iov->iov_base - offset;
+ iov->iov_len += offset;
+ last_iov->iov_len += diff;
+ return ret;
+}
+
+int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset)
+{
+ return do_sendv_recvv(sockfd, iov, len, iov_offset, 0);
+}
+
+int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset)
+{
+ return do_sendv_recvv(sockfd, iov, len, iov_offset, 1);
+}
+
diff --git a/qemu-common.h b/qemu-common.h
index 5c3f3af6aa..6ab7dfb1b9 100644
--- a/qemu-common.h
+++ b/qemu-common.h
@@ -175,7 +175,7 @@ ssize_t qemu_write_full(int fd, const void *buf, size_t count)
QEMU_WARN_UNUSED_RESULT;
ssize_t qemu_send_full(int fd, const void *buf, size_t count, int flags)
QEMU_WARN_UNUSED_RESULT;
-ssize_t qemu_recv_full(int fd, const void *buf, size_t count, int flags)
+ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags)
QEMU_WARN_UNUSED_RESULT;
void qemu_set_cloexec(int fd);
@@ -190,6 +190,9 @@ int qemu_pipe(int pipefd[2]);
#define qemu_recv(sockfd, buf, len, flags) recv(sockfd, buf, len, flags)
#endif
+int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset);
+int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset);
+
/* Error handling. */
void QEMU_NORETURN hw_error(const char *fmt, ...) GCC_FMT_ATTR(1, 2);
@@ -276,6 +279,33 @@ struct qemu_work_item {
void qemu_init_vcpu(void *env);
#endif
+/**
+ * Sends an iovec (or optionally a part of it) down a socket, yielding
+ * when the socket is full.
+ */
+int qemu_co_sendv(int sockfd, struct iovec *iov,
+ int len, int iov_offset);
+
+/**
+ * Receives data into an iovec (or optionally into a part of it) from
+ * a socket, yielding when there is no data in the socket.
+ */
+int qemu_co_recvv(int sockfd, struct iovec *iov,
+ int len, int iov_offset);
+
+
+/**
+ * Sends a buffer down a socket, yielding when the socket is full.
+ */
+int qemu_co_send(int sockfd, void *buf, int len);
+
+/**
+ * Receives data into a buffer from a socket, yielding when there
+ * is no data in the socket.
+ */
+int qemu_co_recv(int sockfd, void *buf, int len);
+
+
typedef struct QEMUIOVector {
struct iovec *iov;
int niov;
diff --git a/qemu-coroutine-io.c b/qemu-coroutine-io.c
new file mode 100644
index 0000000000..40fd514395
--- /dev/null
+++ b/qemu-coroutine-io.c
@@ -0,0 +1,96 @@
+/*
+ * Coroutine-aware I/O functions
+ *
+ * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
+ * Copyright (c) 2011, Red Hat, Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+#include "qemu-common.h"
+#include "qemu_socket.h"
+#include "qemu-coroutine.h"
+
+int coroutine_fn qemu_co_recvv(int sockfd, struct iovec *iov,
+ int len, int iov_offset)
+{
+ int total = 0;
+ int ret;
+ while (len) {
+ ret = qemu_recvv(sockfd, iov, len, iov_offset + total);
+ if (ret < 0) {
+ if (errno == EAGAIN) {
+ qemu_coroutine_yield();
+ continue;
+ }
+ if (total == 0) {
+ total = -1;
+ }
+ break;
+ }
+ if (ret == 0) {
+ break;
+ }
+ total += ret, len -= ret;
+ }
+
+ return total;
+}
+
+int coroutine_fn qemu_co_sendv(int sockfd, struct iovec *iov,
+ int len, int iov_offset)
+{
+ int total = 0;
+ int ret;
+ while (len) {
+ ret = qemu_sendv(sockfd, iov, len, iov_offset + total);
+ if (ret < 0) {
+ if (errno == EAGAIN) {
+ qemu_coroutine_yield();
+ continue;
+ }
+ if (total == 0) {
+ total = -1;
+ }
+ break;
+ }
+ total += ret, len -= ret;
+ }
+
+ return total;
+}
+
+int coroutine_fn qemu_co_recv(int sockfd, void *buf, int len)
+{
+ struct iovec iov;
+
+ iov.iov_base = buf;
+ iov.iov_len = len;
+
+ return qemu_co_recvv(sockfd, &iov, len, 0);
+}
+
+int coroutine_fn qemu_co_send(int sockfd, void *buf, int len)
+{
+ struct iovec iov;
+
+ iov.iov_base = buf;
+ iov.iov_len = len;
+
+ return qemu_co_sendv(sockfd, &iov, len, 0);
+}