diff options
-rw-r--r-- | MAINTAINERS | 7 | ||||
-rw-r--r-- | Makefile | 5 | ||||
-rw-r--r-- | Makefile.objs | 2 | ||||
-rw-r--r-- | block/nbd.c | 319 | ||||
-rw-r--r-- | block/sheepdog.c | 250 | ||||
-rw-r--r-- | cutils.c | 111 | ||||
-rw-r--r-- | main-loop.h | 6 | ||||
-rw-r--r-- | nbd.c | 439 | ||||
-rw-r--r-- | nbd.h | 14 | ||||
-rw-r--r-- | os-posix.c | 42 | ||||
-rw-r--r-- | os-win32.c | 5 | ||||
-rw-r--r-- | osdep.c | 76 | ||||
-rw-r--r-- | oslib-posix.c | 43 | ||||
-rw-r--r-- | oslib-win32.c | 5 | ||||
-rw-r--r-- | qemu-common.h | 34 | ||||
-rw-r--r-- | qemu-coroutine-io.c | 96 | ||||
-rw-r--r-- | qemu-nbd.c | 120 | ||||
-rw-r--r-- | qemu-tool.c | 42 | ||||
-rw-r--r-- | qemu_socket.h | 1 |
19 files changed, 1087 insertions, 530 deletions
diff --git a/MAINTAINERS b/MAINTAINERS index e22bfa1a30..764c92dab6 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -473,6 +473,13 @@ M: Mark McLoughlin <markmc@redhat.com> S: Maintained F: net/ +Network Block Device (NBD) +M: Paolo Bonzini <pbonzini@redhat.com> +S: Odd Fixes +F: block/nbd.c +F: nbd.* +F: qemu-nbd.c + SLIRP M: Jan Kiszka <jan.kiszka@siemens.com> S: Maintained @@ -147,8 +147,9 @@ endif qemu-img.o: qemu-img-cmds.h qemu-img.o qemu-tool.o qemu-nbd.o qemu-io.o cmd.o qemu-ga.o: $(GENERATED_HEADERS) -tools-obj-y = qemu-tool.o $(oslib-obj-y) $(trace-obj-y) \ - qemu-timer-common.o cutils.o +tools-obj-y = $(oslib-obj-y) $(trace-obj-y) qemu-tool.o qemu-timer.o \ + qemu-timer-common.o main-loop.o notify.o iohandler.o cutils.o async.o +tools-obj-$(CONFIG_POSIX) += compatfd.o qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) diff --git a/Makefile.objs b/Makefile.objs index f753d838ff..8813673584 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -12,7 +12,7 @@ oslib-obj-$(CONFIG_POSIX) += oslib-posix.o qemu-thread-posix.o ####################################################################### # coroutines -coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o +coroutine-obj-y = qemu-coroutine.o qemu-coroutine-lock.o qemu-coroutine-io.o ifeq ($(CONFIG_UCONTEXT_COROUTINE),y) coroutine-obj-$(CONFIG_POSIX) += coroutine-ucontext.o else diff --git a/block/nbd.c b/block/nbd.c index 95212dac64..161b299855 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -46,14 +46,25 @@ #define logout(fmt, ...) ((void)0) #endif +#define MAX_NBD_REQUESTS 16 +#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs)) +#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs)) + typedef struct BDRVNBDState { - CoMutex lock; int sock; uint32_t nbdflags; off_t size; size_t blocksize; char *export_name; /* An NBD server may export several devices */ + CoMutex send_mutex; + CoMutex free_sema; + Coroutine *send_coroutine; + int in_flight; + + Coroutine *recv_coroutine[MAX_NBD_REQUESTS]; + struct nbd_reply reply; + /* If it begins with '/', this is a UNIX domain socket. Otherwise, * it's a string of the form <hostname|ip4|\[ip6\]>:port */ @@ -106,6 +117,130 @@ out: return err; } +static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request) +{ + int i; + + /* Poor man semaphore. The free_sema is locked when no other request + * can be accepted, and unlocked after receiving one reply. */ + if (s->in_flight >= MAX_NBD_REQUESTS - 1) { + qemu_co_mutex_lock(&s->free_sema); + assert(s->in_flight < MAX_NBD_REQUESTS); + } + s->in_flight++; + + for (i = 0; i < MAX_NBD_REQUESTS; i++) { + if (s->recv_coroutine[i] == NULL) { + s->recv_coroutine[i] = qemu_coroutine_self(); + break; + } + } + + assert(i < MAX_NBD_REQUESTS); + request->handle = INDEX_TO_HANDLE(s, i); +} + +static int nbd_have_request(void *opaque) +{ + BDRVNBDState *s = opaque; + + return s->in_flight > 0; +} + +static void nbd_reply_ready(void *opaque) +{ + BDRVNBDState *s = opaque; + int i; + + if (s->reply.handle == 0) { + /* No reply already in flight. Fetch a header. */ + if (nbd_receive_reply(s->sock, &s->reply) < 0) { + s->reply.handle = 0; + goto fail; + } + } + + /* There's no need for a mutex on the receive side, because the + * handler acts as a synchronization point and ensures that only + * one coroutine is called until the reply finishes. */ + i = HANDLE_TO_INDEX(s, s->reply.handle); + if (s->recv_coroutine[i]) { + qemu_coroutine_enter(s->recv_coroutine[i], NULL); + return; + } + +fail: + for (i = 0; i < MAX_NBD_REQUESTS; i++) { + if (s->recv_coroutine[i]) { + qemu_coroutine_enter(s->recv_coroutine[i], NULL); + } + } +} + +static void nbd_restart_write(void *opaque) +{ + BDRVNBDState *s = opaque; + qemu_coroutine_enter(s->send_coroutine, NULL); +} + +static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, + struct iovec *iov, int offset) +{ + int rc, ret; + + qemu_co_mutex_lock(&s->send_mutex); + s->send_coroutine = qemu_coroutine_self(); + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write, + nbd_have_request, NULL, s); + rc = nbd_send_request(s->sock, request); + if (rc != -1 && iov) { + ret = qemu_co_sendv(s->sock, iov, request->len, offset); + if (ret != request->len) { + errno = -EIO; + rc = -1; + } + } + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, + nbd_have_request, NULL, s); + s->send_coroutine = NULL; + qemu_co_mutex_unlock(&s->send_mutex); + return rc; +} + +static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request, + struct nbd_reply *reply, + struct iovec *iov, int offset) +{ + int ret; + + /* Wait until we're woken up by the read handler. TODO: perhaps + * peek at the next reply and avoid yielding if it's ours? */ + qemu_coroutine_yield(); + *reply = s->reply; + if (reply->handle != request->handle) { + reply->error = EIO; + } else { + if (iov && reply->error == 0) { + ret = qemu_co_recvv(s->sock, iov, request->len, offset); + if (ret != request->len) { + reply->error = EIO; + } + } + + /* Tell the read handler to read another header. */ + s->reply.handle = 0; + } +} + +static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request) +{ + int i = HANDLE_TO_INDEX(s, request->handle); + s->recv_coroutine[i] = NULL; + if (s->in_flight-- == MAX_NBD_REQUESTS) { + qemu_co_mutex_unlock(&s->free_sema); + } +} + static int nbd_establish_connection(BlockDriverState *bs) { BDRVNBDState *s = bs->opaque; @@ -135,8 +270,11 @@ static int nbd_establish_connection(BlockDriverState *bs) return -errno; } - /* Now that we're connected, set the socket to be non-blocking */ + /* Now that we're connected, set the socket to be non-blocking and + * kick the reply mechanism. */ socket_set_nonblock(sock); + qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, + nbd_have_request, NULL, s); s->sock = sock; s->size = size; @@ -152,11 +290,11 @@ static void nbd_teardown_connection(BlockDriverState *bs) struct nbd_request request; request.type = NBD_CMD_DISC; - request.handle = (uint64_t)(intptr_t)bs; request.from = 0; request.len = 0; nbd_send_request(s->sock, &request); + qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, NULL); closesocket(s->sock); } @@ -165,6 +303,9 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) BDRVNBDState *s = bs->opaque; int result; + qemu_co_mutex_init(&s->send_mutex); + qemu_co_mutex_init(&s->free_sema); + /* Pop the config into our state object. Exit if invalid. */ result = nbd_config(s, filename, flags); if (result != 0) { @@ -176,90 +317,146 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) */ result = nbd_establish_connection(bs); - qemu_co_mutex_init(&s->lock); return result; } -static int nbd_read(BlockDriverState *bs, int64_t sector_num, - uint8_t *buf, int nb_sectors) +static int nbd_co_readv_1(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov, + int offset) { BDRVNBDState *s = bs->opaque; struct nbd_request request; struct nbd_reply reply; request.type = NBD_CMD_READ; - request.handle = (uint64_t)(intptr_t)bs; request.from = sector_num * 512; request.len = nb_sectors * 512; - if (nbd_send_request(s->sock, &request) == -1) - return -errno; - - if (nbd_receive_reply(s->sock, &reply) == -1) - return -errno; - - if (reply.error !=0) - return -reply.error; - - if (reply.handle != request.handle) - return -EIO; - - if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len) - return -EIO; + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, NULL, 0) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, qiov->iov, offset); + } + nbd_coroutine_end(s, &request); + return -reply.error; - return 0; } -static int nbd_write(BlockDriverState *bs, int64_t sector_num, - const uint8_t *buf, int nb_sectors) +static int nbd_co_writev_1(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov, + int offset) { BDRVNBDState *s = bs->opaque; struct nbd_request request; struct nbd_reply reply; request.type = NBD_CMD_WRITE; - request.handle = (uint64_t)(intptr_t)bs; + if (!bdrv_enable_write_cache(bs) && (s->nbdflags & NBD_FLAG_SEND_FUA)) { + request.type |= NBD_CMD_FLAG_FUA; + } + request.from = sector_num * 512; request.len = nb_sectors * 512; - if (nbd_send_request(s->sock, &request) == -1) - return -errno; - - if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len) - return -EIO; - - if (nbd_receive_reply(s->sock, &reply) == -1) - return -errno; - - if (reply.error !=0) - return -reply.error; + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, qiov->iov, offset) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, NULL, 0); + } + nbd_coroutine_end(s, &request); + return -reply.error; +} - if (reply.handle != request.handle) - return -EIO; +/* qemu-nbd has a limit of slightly less than 1M per request. Try to + * remain aligned to 4K. */ +#define NBD_MAX_SECTORS 2040 - return 0; +static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) +{ + int offset = 0; + int ret; + while (nb_sectors > NBD_MAX_SECTORS) { + ret = nbd_co_readv_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset); + if (ret < 0) { + return ret; + } + offset += NBD_MAX_SECTORS * 512; + sector_num += NBD_MAX_SECTORS; + nb_sectors -= NBD_MAX_SECTORS; + } + return nbd_co_readv_1(bs, sector_num, nb_sectors, qiov, offset); } -static coroutine_fn int nbd_co_read(BlockDriverState *bs, int64_t sector_num, - uint8_t *buf, int nb_sectors) +static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num, + int nb_sectors, QEMUIOVector *qiov) { + int offset = 0; int ret; + while (nb_sectors > NBD_MAX_SECTORS) { + ret = nbd_co_writev_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset); + if (ret < 0) { + return ret; + } + offset += NBD_MAX_SECTORS * 512; + sector_num += NBD_MAX_SECTORS; + nb_sectors -= NBD_MAX_SECTORS; + } + return nbd_co_writev_1(bs, sector_num, nb_sectors, qiov, offset); +} + +static int nbd_co_flush(BlockDriverState *bs) +{ BDRVNBDState *s = bs->opaque; - qemu_co_mutex_lock(&s->lock); - ret = nbd_read(bs, sector_num, buf, nb_sectors); - qemu_co_mutex_unlock(&s->lock); - return ret; + struct nbd_request request; + struct nbd_reply reply; + + if (!(s->nbdflags & NBD_FLAG_SEND_FLUSH)) { + return 0; + } + + request.type = NBD_CMD_FLUSH; + if (s->nbdflags & NBD_FLAG_SEND_FUA) { + request.type |= NBD_CMD_FLAG_FUA; + } + + request.from = 0; + request.len = 0; + + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, NULL, 0) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, NULL, 0); + } + nbd_coroutine_end(s, &request); + return -reply.error; } -static coroutine_fn int nbd_co_write(BlockDriverState *bs, int64_t sector_num, - const uint8_t *buf, int nb_sectors) +static int nbd_co_discard(BlockDriverState *bs, int64_t sector_num, + int nb_sectors) { - int ret; BDRVNBDState *s = bs->opaque; - qemu_co_mutex_lock(&s->lock); - ret = nbd_write(bs, sector_num, buf, nb_sectors); - qemu_co_mutex_unlock(&s->lock); - return ret; + struct nbd_request request; + struct nbd_reply reply; + + if (!(s->nbdflags & NBD_FLAG_SEND_TRIM)) { + return 0; + } + request.type = NBD_CMD_TRIM; + request.from = sector_num * 512;; + request.len = nb_sectors * 512; + + nbd_coroutine_start(s, &request); + if (nbd_co_send_request(s, &request, NULL, 0) == -1) { + reply.error = errno; + } else { + nbd_co_receive_reply(s, &request, &reply, NULL, 0); + } + nbd_coroutine_end(s, &request); + return -reply.error; } static void nbd_close(BlockDriverState *bs) @@ -279,14 +476,16 @@ static int64_t nbd_getlength(BlockDriverState *bs) } static BlockDriver bdrv_nbd = { - .format_name = "nbd", - .instance_size = sizeof(BDRVNBDState), - .bdrv_file_open = nbd_open, - .bdrv_read = nbd_co_read, - .bdrv_write = nbd_co_write, - .bdrv_close = nbd_close, - .bdrv_getlength = nbd_getlength, - .protocol_name = "nbd", + .format_name = "nbd", + .instance_size = sizeof(BDRVNBDState), + .bdrv_file_open = nbd_open, + .bdrv_co_readv = nbd_co_readv, + .bdrv_co_writev = nbd_co_writev, + .bdrv_close = nbd_close, + .bdrv_co_flush_to_os = nbd_co_flush, + .bdrv_co_discard = nbd_co_discard, + .bdrv_getlength = nbd_getlength, + .protocol_name = "nbd", }; static void bdrv_nbd_init(void) diff --git a/block/sheepdog.c b/block/sheepdog.c index aa9707f2ae..17a79beb24 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -443,129 +443,6 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov, return acb; } -#ifdef _WIN32 - -struct msghdr { - struct iovec *msg_iov; - size_t msg_iovlen; -}; - -static ssize_t sendmsg(int s, const struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } - - ret = send(s, buf, size, flags); - - g_free(buf); - return ret; -} - -static ssize_t recvmsg(int s, struct msghdr *msg, int flags) -{ - size_t size = 0; - char *buf, *p; - int i, ret; - - /* count the msg size */ - for (i = 0; i < msg->msg_iovlen; i++) { - size += msg->msg_iov[i].iov_len; - } - buf = g_malloc(size); - - ret = qemu_recv(s, buf, size, flags); - if (ret < 0) { - goto out; - } - - p = buf; - for (i = 0; i < msg->msg_iovlen; i++) { - memcpy(msg->msg_iov[i].iov_base, p, msg->msg_iov[i].iov_len); - p += msg->msg_iov[i].iov_len; - } -out: - g_free(buf); - return ret; -} - -#endif - -/* - * Send/recv data with iovec buffers - * - * This function send/recv data from/to the iovec buffer directly. - * The first `offset' bytes in the iovec buffer are skipped and next - * `len' bytes are used. - * - * For example, - * - * do_send_recv(sockfd, iov, len, offset, 1); - * - * is equals to - * - * char *buf = malloc(size); - * iov_to_buf(iov, iovcnt, buf, offset, size); - * send(sockfd, buf, size, 0); - * free(buf); - */ -static int do_send_recv(int sockfd, struct iovec *iov, int len, int offset, - int write) -{ - struct msghdr msg; - int ret, diff; - - memset(&msg, 0, sizeof(msg)); - msg.msg_iov = iov; - msg.msg_iovlen = 1; - - len += offset; - - while (iov->iov_len < len) { - len -= iov->iov_len; - - iov++; - msg.msg_iovlen++; - } - - diff = iov->iov_len - len; - iov->iov_len -= diff; - - while (msg.msg_iov->iov_len <= offset) { - offset -= msg.msg_iov->iov_len; - - msg.msg_iov++; - msg.msg_iovlen--; - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base + offset; - msg.msg_iov->iov_len -= offset; - - if (write) { - ret = sendmsg(sockfd, &msg, 0); - } else { - ret = recvmsg(sockfd, &msg, 0); - } - - msg.msg_iov->iov_base = (char *) msg.msg_iov->iov_base - offset; - msg.msg_iov->iov_len += offset; - - iov->iov_len += diff; - return ret; -} - static int connect_to_sdog(const char *addr, const char *port) { char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; @@ -618,83 +495,19 @@ success: return fd; } -static int do_readv_writev(int sockfd, struct iovec *iov, int len, - int iov_offset, int write) -{ - int ret; -again: - ret = do_send_recv(sockfd, iov, len, iov_offset, write); - if (ret < 0) { - if (errno == EINTR) { - goto again; - } - if (errno == EAGAIN) { - if (qemu_in_coroutine()) { - qemu_coroutine_yield(); - } - goto again; - } - error_report("failed to recv a rsp, %s", strerror(errno)); - return 1; - } - - iov_offset += ret; - len -= ret; - if (len) { - goto again; - } - - return 0; -} - -static int do_readv(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 0); -} - -static int do_writev(int sockfd, struct iovec *iov, int len, int iov_offset) -{ - return do_readv_writev(sockfd, iov, len, iov_offset, 1); -} - -static int do_read_write(int sockfd, void *buf, int len, int write) -{ - struct iovec iov; - - iov.iov_base = buf; - iov.iov_len = len; - - return do_readv_writev(sockfd, &iov, len, 0, write); -} - -static int do_read(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 0); -} - -static int do_write(int sockfd, void *buf, int len) -{ - return do_read_write(sockfd, buf, len, 1); -} - static int send_req(int sockfd, SheepdogReq *hdr, void *data, unsigned int *wlen) { int ret; - struct iovec iov[2]; - iov[0].iov_base = hdr; - iov[0].iov_len = sizeof(*hdr); - - if (*wlen) { - iov[1].iov_base = data; - iov[1].iov_len = *wlen; + ret = qemu_send_full(sockfd, hdr, sizeof(*hdr), 0); + if (ret < sizeof(*hdr)) { + error_report("failed to send a req, %s", strerror(errno)); } - ret = do_writev(sockfd, iov, sizeof(*hdr) + *wlen, 0); - if (ret) { + ret = qemu_send_full(sockfd, data, *wlen, 0); + if (ret < *wlen) { error_report("failed to send a req, %s", strerror(errno)); - ret = -1; } return ret; @@ -705,16 +518,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, { int ret; + socket_set_block(sockfd); ret = send_req(sockfd, hdr, data, wlen); - if (ret) { - ret = -1; + if (ret < 0) { goto out; } - ret = do_read(sockfd, hdr, sizeof(*hdr)); - if (ret) { + ret = qemu_recv_full(sockfd, hdr, sizeof(*hdr), 0); + if (ret < sizeof(*hdr)) { error_report("failed to get a rsp, %s", strerror(errno)); - ret = -1; goto out; } @@ -723,15 +535,15 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data, } if (*rlen) { - ret = do_read(sockfd, data, *rlen); - if (ret) { + ret = qemu_recv_full(sockfd, data, *rlen, 0); + if (ret < *rlen) { error_report("failed to get the data, %s", strerror(errno)); - ret = -1; goto out; } } ret = 0; out: + socket_set_nonblock(sockfd); return ret; } @@ -793,8 +605,8 @@ static void coroutine_fn aio_read_response(void *opaque) } /* read a header */ - ret = do_read(fd, &rsp, sizeof(rsp)); - if (ret) { + ret = qemu_co_recv(fd, &rsp, sizeof(rsp)); + if (ret < 0) { error_report("failed to get the header, %s", strerror(errno)); goto out; } @@ -839,9 +651,9 @@ static void coroutine_fn aio_read_response(void *opaque) } break; case AIOCB_READ_UDATA: - ret = do_readv(fd, acb->qiov->iov, rsp.data_length, - aio_req->iov_offset); - if (ret) { + ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length, + aio_req->iov_offset); + if (ret < 0) { error_report("failed to get the data, %s", strerror(errno)); goto out; } @@ -890,22 +702,6 @@ static int aio_flush_request(void *opaque) return !QLIST_EMPTY(&s->outstanding_aio_head); } -#if !defined(SOL_TCP) || !defined(TCP_CORK) - -static int set_cork(int fd, int v) -{ - return 0; -} - -#else - -static int set_cork(int fd, int v) -{ - return setsockopt(fd, SOL_TCP, TCP_CORK, &v, sizeof(v)); -} - -#endif - static int set_nodelay(int fd) { int ret, opt; @@ -1111,26 +907,26 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, s->co_send = qemu_coroutine_self(); qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request, aio_flush_request, NULL, s); - set_cork(s->fd, 1); + socket_set_cork(s->fd, 1); /* send a header */ - ret = do_write(s->fd, &hdr, sizeof(hdr)); - if (ret) { + ret = qemu_co_send(s->fd, &hdr, sizeof(hdr)); + if (ret < 0) { qemu_co_mutex_unlock(&s->lock); error_report("failed to send a req, %s", strerror(errno)); return -EIO; } if (wlen) { - ret = do_writev(s->fd, iov, wlen, aio_req->iov_offset); - if (ret) { + ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset); + if (ret < 0) { qemu_co_mutex_unlock(&s->lock); error_report("failed to send a data, %s", strerror(errno)); return -EIO; } } - set_cork(s->fd, 0); + socket_set_cork(s->fd, 0); qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, aio_flush_request, NULL, s); qemu_co_mutex_unlock(&s->lock); @@ -25,6 +25,8 @@ #include "host-utils.h" #include <math.h> +#include "qemu_socket.h" + void pstrcpy(char *buf, int buf_size, const char *str) { int c; @@ -403,3 +405,112 @@ int qemu_parse_fd(const char *param) } return fd; } + +/* + * Send/recv data with iovec buffers + * + * This function send/recv data from/to the iovec buffer directly. + * The first `offset' bytes in the iovec buffer are skipped and next + * `len' bytes are used. + * + * For example, + * + * do_sendv_recvv(sockfd, iov, len, offset, 1); + * + * is equal to + * + * char *buf = malloc(size); + * iov_to_buf(iov, iovcnt, buf, offset, size); + * send(sockfd, buf, size, 0); + * free(buf); + */ +static int do_sendv_recvv(int sockfd, struct iovec *iov, int len, int offset, + int do_sendv) +{ + int ret, diff, iovlen; + struct iovec *last_iov; + + /* last_iov is inclusive, so count from one. */ + iovlen = 1; + last_iov = iov; + len += offset; + + while (last_iov->iov_len < len) { + len -= last_iov->iov_len; + + last_iov++; + iovlen++; + } + + diff = last_iov->iov_len - len; + last_iov->iov_len -= diff; + + while (iov->iov_len <= offset) { + offset -= iov->iov_len; + + iov++; + iovlen--; + } + + iov->iov_base = (char *) iov->iov_base + offset; + iov->iov_len -= offset; + + { +#if defined CONFIG_IOVEC && defined CONFIG_POSIX + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = iov; + msg.msg_iovlen = iovlen; + + do { + if (do_sendv) { + ret = sendmsg(sockfd, &msg, 0); + } else { + ret = recvmsg(sockfd, &msg, 0); + } + } while (ret == -1 && errno == EINTR); +#else + struct iovec *p = iov; + ret = 0; + while (iovlen > 0) { + int rc; + if (do_sendv) { + rc = send(sockfd, p->iov_base, p->iov_len, 0); + } else { + rc = qemu_recv(sockfd, p->iov_base, p->iov_len, 0); + } + if (rc == -1) { + if (errno == EINTR) { + continue; + } + if (ret == 0) { + ret = -1; + } + break; + } + if (rc == 0) { + break; + } + ret += rc; + iovlen--, p++; + } +#endif + } + + /* Undo the changes above */ + iov->iov_base = (char *) iov->iov_base - offset; + iov->iov_len += offset; + last_iov->iov_len += diff; + return ret; +} + +int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset) +{ + return do_sendv_recvv(sockfd, iov, len, iov_offset, 0); +} + +int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset) +{ + return do_sendv_recvv(sockfd, iov, len, iov_offset, 1); +} + diff --git a/main-loop.h b/main-loop.h index 876092dd15..f9710136c9 100644 --- a/main-loop.h +++ b/main-loop.h @@ -324,6 +324,9 @@ int qemu_add_child_watch(pid_t pid); * by threads other than the main loop thread when calling * qemu_bh_new(), qemu_set_fd_handler() and basically all other * functions documented in this file. + * + * NOTE: tools currently are single-threaded and qemu_mutex_lock_iothread + * is a no-op there. */ void qemu_mutex_lock_iothread(void); @@ -336,6 +339,9 @@ void qemu_mutex_lock_iothread(void); * as soon as possible by threads other than the main loop thread, * because it prevents the main loop from processing callbacks, * including timers and bottom halves. + * + * NOTE: tools currently are single-threaded and qemu_mutex_unlock_iothread + * is a no-op there. */ void qemu_mutex_unlock_iothread(void); @@ -18,6 +18,9 @@ #include "nbd.h" #include "block.h" +#include "block_int.h" + +#include "qemu-coroutine.h" #include <errno.h> #include <string.h> @@ -35,6 +38,7 @@ #endif #include "qemu_socket.h" +#include "qemu-queue.h" //#define DEBUG_NBD @@ -81,6 +85,14 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) { size_t offset = 0; + if (qemu_in_coroutine()) { + if (do_read) { + return qemu_co_recv(fd, buffer, size); + } else { + return qemu_co_send(fd, buffer, size); + } + } + while (offset < size) { ssize_t len; @@ -178,7 +190,7 @@ int unix_socket_outgoing(const char *path) Request (type == 2) */ -int nbd_negotiate(int csock, off_t size, uint32_t flags) +static int nbd_send_negotiate(int csock, off_t size, uint32_t flags) { char buf[8 + 8 + 8 + 128]; @@ -194,7 +206,9 @@ int nbd_negotiate(int csock, off_t size, uint32_t flags) memcpy(buf, "NBDMAGIC", 8); cpu_to_be64w((uint64_t*)(buf + 8), 0x00420281861253LL); cpu_to_be64w((uint64_t*)(buf + 16), size); - cpu_to_be32w((uint32_t*)(buf + 24), flags | NBD_FLAG_HAS_FLAGS); + cpu_to_be32w((uint32_t*)(buf + 24), + flags | NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM | + NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA); memset(buf + 28, 0, 124); if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) { @@ -348,6 +362,15 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, #ifdef __linux__ int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize) { + TRACE("Setting NBD socket"); + + if (ioctl(fd, NBD_SET_SOCK, csock) == -1) { + int serrno = errno; + LOG("Failed to set NBD socket"); + errno = serrno; + return -1; + } + TRACE("Setting block size to %lu", (unsigned long)blocksize); if (ioctl(fd, NBD_SET_BLKSIZE, blocksize) == -1) { @@ -386,24 +409,6 @@ int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize) return -1; } - TRACE("Clearing NBD socket"); - - if (ioctl(fd, NBD_CLEAR_SOCK) == -1) { - int serrno = errno; - LOG("Failed clearing NBD socket"); - errno = serrno; - return -1; - } - - TRACE("Setting NBD socket"); - - if (ioctl(fd, NBD_SET_SOCK, csock) == -1) { - int serrno = errno; - LOG("Failed to set NBD socket"); - errno = serrno; - return -1; - } - TRACE("Negotiation ended"); return 0; @@ -582,121 +587,369 @@ static int nbd_send_reply(int csock, struct nbd_reply *reply) return 0; } -int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, - off_t *offset, uint32_t nbdflags, uint8_t *data, int data_size) +#define MAX_NBD_REQUESTS 16 + +typedef struct NBDRequest NBDRequest; + +struct NBDRequest { + QSIMPLEQ_ENTRY(NBDRequest) entry; + NBDClient *client; + uint8_t *data; +}; + +struct NBDExport { + BlockDriverState *bs; + off_t dev_offset; + off_t size; + uint32_t nbdflags; + QSIMPLEQ_HEAD(, NBDRequest) requests; +}; + +struct NBDClient { + int refcount; + void (*close)(NBDClient *client); + + NBDExport *exp; + int sock; + + Coroutine *recv_coroutine; + + CoMutex send_lock; + Coroutine *send_coroutine; + + int nb_requests; +}; + +static void nbd_client_get(NBDClient *client) { - struct nbd_request request; - struct nbd_reply reply; + client->refcount++; +} - TRACE("Reading request."); +static void nbd_client_put(NBDClient *client) +{ + if (--client->refcount == 0) { + g_free(client); + } +} - if (nbd_receive_request(csock, &request) == -1) - return -1; +static void nbd_client_close(NBDClient *client) +{ + qemu_set_fd_handler2(client->sock, NULL, NULL, NULL, NULL); + close(client->sock); + client->sock = -1; + if (client->close) { + client->close(client); + } + nbd_client_put(client); +} + +static NBDRequest *nbd_request_get(NBDClient *client) +{ + NBDRequest *req; + NBDExport *exp = client->exp; - if (request.len + NBD_REPLY_SIZE > data_size) { + assert(client->nb_requests <= MAX_NBD_REQUESTS - 1); + client->nb_requests++; + + if (QSIMPLEQ_EMPTY(&exp->requests)) { + req = g_malloc0(sizeof(NBDRequest)); + req->data = qemu_blockalign(exp->bs, NBD_BUFFER_SIZE); + } else { + req = QSIMPLEQ_FIRST(&exp->requests); + QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry); + } + nbd_client_get(client); + req->client = client; + return req; +} + +static void nbd_request_put(NBDRequest *req) +{ + NBDClient *client = req->client; + QSIMPLEQ_INSERT_HEAD(&client->exp->requests, req, entry); + if (client->nb_requests-- == MAX_NBD_REQUESTS) { + qemu_notify_event(); + } + nbd_client_put(client); +} + +NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, + off_t size, uint32_t nbdflags) +{ + NBDExport *exp = g_malloc0(sizeof(NBDExport)); + QSIMPLEQ_INIT(&exp->requests); + exp->bs = bs; + exp->dev_offset = dev_offset; + exp->nbdflags = nbdflags; + exp->size = size == -1 ? exp->bs->total_sectors * 512 : size; + return exp; +} + +void nbd_export_close(NBDExport *exp) +{ + while (!QSIMPLEQ_EMPTY(&exp->requests)) { + NBDRequest *first = QSIMPLEQ_FIRST(&exp->requests); + QSIMPLEQ_REMOVE_HEAD(&exp->requests, entry); + qemu_vfree(first->data); + g_free(first); + } + + bdrv_close(exp->bs); + g_free(exp); +} + +static int nbd_can_read(void *opaque); +static void nbd_read(void *opaque); +static void nbd_restart_write(void *opaque); + +static int nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, + int len) +{ + NBDClient *client = req->client; + int csock = client->sock; + int rc, ret; + + qemu_co_mutex_lock(&client->send_lock); + qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, + nbd_restart_write, client); + client->send_coroutine = qemu_coroutine_self(); + + if (!len) { + rc = nbd_send_reply(csock, reply); + if (rc == -1) { + rc = -errno; + } + } else { + socket_set_cork(csock, 1); + rc = nbd_send_reply(csock, reply); + if (rc != -1) { + ret = qemu_co_send(csock, req->data, len); + if (ret != len) { + errno = EIO; + rc = -1; + } + } + if (rc == -1) { + rc = -errno; + } + socket_set_cork(csock, 0); + } + + client->send_coroutine = NULL; + qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client); + qemu_co_mutex_unlock(&client->send_lock); + return rc; +} + +static int nbd_co_receive_request(NBDRequest *req, struct nbd_request *request) +{ + NBDClient *client = req->client; + int csock = client->sock; + int rc; + + client->recv_coroutine = qemu_coroutine_self(); + if (nbd_receive_request(csock, request) == -1) { + rc = -EIO; + goto out; + } + + if (request->len > NBD_BUFFER_SIZE) { LOG("len (%u) is larger than max len (%u)", - request.len + NBD_REPLY_SIZE, data_size); - errno = EINVAL; - return -1; + request->len, NBD_BUFFER_SIZE); + rc = -EINVAL; + goto out; } - if ((request.from + request.len) < request.from) { + if ((request->from + request->len) < request->from) { LOG("integer overflow detected! " "you're probably being attacked"); - errno = EINVAL; - return -1; + rc = -EINVAL; + goto out; } - if ((request.from + request.len) > size) { - LOG("From: %" PRIu64 ", Len: %u, Size: %" PRIu64 - ", Offset: %" PRIu64 "\n", - request.from, request.len, (uint64_t)size, dev_offset); - LOG("requested operation past EOF--bad client?"); - errno = EINVAL; - return -1; + TRACE("Decoding type"); + + if ((request->type & NBD_CMD_MASK_COMMAND) == NBD_CMD_WRITE) { + TRACE("Reading %u byte(s)", request->len); + + if (qemu_co_recv(csock, req->data, request->len) != request->len) { + LOG("reading from socket failed"); + rc = -EIO; + goto out; + } } + rc = 0; - TRACE("Decoding type"); +out: + client->recv_coroutine = NULL; + return rc; +} + +static void nbd_trip(void *opaque) +{ + NBDClient *client = opaque; + NBDRequest *req = nbd_request_get(client); + NBDExport *exp = client->exp; + struct nbd_request request; + struct nbd_reply reply; + int ret; + + TRACE("Reading request."); + + ret = nbd_co_receive_request(req, &request); + if (ret == -EIO) { + goto out; + } reply.handle = request.handle; reply.error = 0; - switch (request.type) { + if (ret < 0) { + reply.error = -ret; + goto error_reply; + } + + if ((request.from + request.len) > exp->size) { + LOG("From: %" PRIu64 ", Len: %u, Size: %" PRIu64 + ", Offset: %" PRIu64 "\n", + request.from, request.len, + (uint64_t)exp->size, exp->dev_offset); + LOG("requested operation past EOF--bad client?"); + goto invalid_request; + } + + switch (request.type & NBD_CMD_MASK_COMMAND) { case NBD_CMD_READ: TRACE("Request type is READ"); - if (bdrv_read(bs, (request.from + dev_offset) / 512, - data + NBD_REPLY_SIZE, - request.len / 512) == -1) { + ret = bdrv_read(exp->bs, (request.from + exp->dev_offset) / 512, + req->data, request.len / 512); + if (ret < 0) { LOG("reading from file failed"); - errno = EINVAL; - return -1; + reply.error = -ret; + goto error_reply; } - *offset += request.len; TRACE("Read %u byte(s)", request.len); - - /* Reply - [ 0 .. 3] magic (NBD_REPLY_MAGIC) - [ 4 .. 7] error (0 == no error) - [ 7 .. 15] handle - */ - - cpu_to_be32w((uint32_t*)data, NBD_REPLY_MAGIC); - cpu_to_be32w((uint32_t*)(data + 4), reply.error); - cpu_to_be64w((uint64_t*)(data + 8), reply.handle); - - TRACE("Sending data to client"); - - if (write_sync(csock, data, - request.len + NBD_REPLY_SIZE) != - request.len + NBD_REPLY_SIZE) { - LOG("writing to socket failed"); - errno = EINVAL; - return -1; - } + if (nbd_co_send_reply(req, &reply, request.len) < 0) + goto out; break; case NBD_CMD_WRITE: TRACE("Request type is WRITE"); - TRACE("Reading %u byte(s)", request.len); - - if (read_sync(csock, data, request.len) != request.len) { - LOG("reading from socket failed"); - errno = EINVAL; - return -1; + if (exp->nbdflags & NBD_FLAG_READ_ONLY) { + TRACE("Server is read-only, return error"); + reply.error = EROFS; + goto error_reply; } - if (nbdflags & NBD_FLAG_READ_ONLY) { - TRACE("Server is read-only, return error"); - reply.error = 1; - } else { - TRACE("Writing to device"); + TRACE("Writing to device"); - if (bdrv_write(bs, (request.from + dev_offset) / 512, - data, request.len / 512) == -1) { - LOG("writing to file failed"); - errno = EINVAL; - return -1; - } + ret = bdrv_write(exp->bs, (request.from + exp->dev_offset) / 512, + req->data, request.len / 512); + if (ret < 0) { + LOG("writing to file failed"); + reply.error = -ret; + goto error_reply; + } - *offset += request.len; + if (request.type & NBD_CMD_FLAG_FUA) { + ret = bdrv_co_flush(exp->bs); + if (ret < 0) { + LOG("flush failed"); + reply.error = -ret; + goto error_reply; + } } - if (nbd_send_reply(csock, &reply) == -1) - return -1; + if (nbd_co_send_reply(req, &reply, 0) < 0) + goto out; break; case NBD_CMD_DISC: TRACE("Request type is DISCONNECT"); errno = 0; - return 1; + goto out; + case NBD_CMD_FLUSH: + TRACE("Request type is FLUSH"); + + ret = bdrv_co_flush(exp->bs); + if (ret < 0) { + LOG("flush failed"); + reply.error = -ret; + } + + if (nbd_co_send_reply(req, &reply, 0) < 0) + goto out; + break; + case NBD_CMD_TRIM: + TRACE("Request type is TRIM"); + ret = bdrv_co_discard(exp->bs, (request.from + exp->dev_offset) / 512, + request.len / 512); + if (ret < 0) { + LOG("discard failed"); + reply.error = -ret; + } + if (nbd_co_send_reply(req, &reply, 0) < 0) + goto out; + break; default: LOG("invalid request type (%u) received", request.type); - errno = EINVAL; - return -1; + invalid_request: + reply.error = -EINVAL; + error_reply: + if (nbd_co_send_reply(req, &reply, 0) == -1) + goto out; + break; } TRACE("Request/Reply complete"); - return 0; + nbd_request_put(req); + return; + +out: + nbd_request_put(req); + nbd_client_close(client); +} + +static int nbd_can_read(void *opaque) +{ + NBDClient *client = opaque; + + return client->recv_coroutine || client->nb_requests < MAX_NBD_REQUESTS; +} + +static void nbd_read(void *opaque) +{ + NBDClient *client = opaque; + + if (client->recv_coroutine) { + qemu_coroutine_enter(client->recv_coroutine, NULL); + } else { + qemu_coroutine_enter(qemu_coroutine_create(nbd_trip), client); + } +} + +static void nbd_restart_write(void *opaque) +{ + NBDClient *client = opaque; + + qemu_coroutine_enter(client->send_coroutine, NULL); +} + +NBDClient *nbd_client_new(NBDExport *exp, int csock, + void (*close)(NBDClient *)) +{ + NBDClient *client; + if (nbd_send_negotiate(csock, exp->size, exp->nbdflags) == -1) { + return NULL; + } + client = g_malloc0(sizeof(NBDClient)); + client->refcount = 1; + client->exp = exp; + client->sock = csock; + client->close = close; + qemu_co_mutex_init(&client->send_lock); + qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client); + return client; } @@ -57,6 +57,8 @@ enum { #define NBD_DEFAULT_PORT 10809 +#define NBD_BUFFER_SIZE (1024*1024) + size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read); int tcp_socket_outgoing(const char *address, uint16_t port); int tcp_socket_incoming(const char *address, uint16_t port); @@ -65,15 +67,21 @@ int tcp_socket_incoming_spec(const char *address_and_port); int unix_socket_outgoing(const char *path); int unix_socket_incoming(const char *path); -int nbd_negotiate(int csock, off_t size, uint32_t flags); int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, off_t *size, size_t *blocksize); int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize); int nbd_send_request(int csock, struct nbd_request *request); int nbd_receive_reply(int csock, struct nbd_reply *reply); -int nbd_trip(BlockDriverState *bs, int csock, off_t size, uint64_t dev_offset, - off_t *offset, uint32_t nbdflags, uint8_t *data, int data_size); int nbd_client(int fd); int nbd_disconnect(int fd); +typedef struct NBDExport NBDExport; +typedef struct NBDClient NBDClient; + +NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, + off_t size, uint32_t nbdflags); +void nbd_export_close(NBDExport *exp); +NBDClient *nbd_client_new(NBDExport *exp, int csock, + void (*close)(NBDClient *)); + #endif diff --git a/os-posix.c b/os-posix.c index dc4a6bb3ff..5c437ca12c 100644 --- a/os-posix.c +++ b/os-posix.c @@ -42,11 +42,6 @@ #ifdef CONFIG_LINUX #include <sys/prctl.h> -#include <sys/syscall.h> -#endif - -#ifdef CONFIG_EVENTFD -#include <sys/eventfd.h> #endif static struct passwd *user_pwd; @@ -333,34 +328,6 @@ void os_set_line_buffering(void) setvbuf(stdout, NULL, _IOLBF, 0); } -/* - * Creates an eventfd that looks like a pipe and has EFD_CLOEXEC set. - */ -int qemu_eventfd(int fds[2]) -{ -#ifdef CONFIG_EVENTFD - int ret; - - ret = eventfd(0, 0); - if (ret >= 0) { - fds[0] = ret; - qemu_set_cloexec(ret); - if ((fds[1] = dup(ret)) == -1) { - close(ret); - return -1; - } - qemu_set_cloexec(fds[1]); - return 0; - } - - if (errno != ENOSYS) { - return -1; - } -#endif - - return qemu_pipe(fds); -} - int qemu_create_pidfile(const char *filename) { char buffer[128]; @@ -384,12 +351,3 @@ int qemu_create_pidfile(const char *filename) close(fd); return 0; } - -int qemu_get_thread_id(void) -{ -#if defined (__linux__) - return syscall(SYS_gettid); -#else - return getpid(); -#endif -} diff --git a/os-win32.c b/os-win32.c index 8523d8d0c4..ad76370c7c 100644 --- a/os-win32.c +++ b/os-win32.c @@ -151,8 +151,3 @@ int qemu_create_pidfile(const char *filename) } return 0; } - -int qemu_get_thread_id(void) -{ - return GetCurrentThreadId(); -} @@ -48,6 +48,15 @@ extern int madvise(caddr_t, size_t, int); #include "trace.h" #include "qemu_socket.h" +int socket_set_cork(int fd, int v) +{ +#if defined(SOL_TCP) && defined(TCP_CORK) + return setsockopt(fd, SOL_TCP, TCP_CORK, &v, sizeof(v)); +#else + return 0; +#endif +} + int qemu_madvise(void *addr, size_t len, int advice) { if (advice == QEMU_MADV_INVALID) { @@ -166,3 +175,70 @@ int qemu_accept(int s, struct sockaddr *addr, socklen_t *addrlen) return ret; } + +/* + * A variant of send(2) which handles partial write. + * + * Return the number of bytes transferred, which is only + * smaller than `count' if there is an error. + * + * This function won't work with non-blocking fd's. + * Any of the possibilities with non-bloking fd's is bad: + * - return a short write (then name is wrong) + * - busy wait adding (errno == EAGAIN) to the loop + */ +ssize_t qemu_send_full(int fd, const void *buf, size_t count, int flags) +{ + ssize_t ret = 0; + ssize_t total = 0; + + while (count) { + ret = send(fd, buf, count, flags); + if (ret < 0) { + if (errno == EINTR) { + continue; + } + break; + } + + count -= ret; + buf += ret; + total += ret; + } + + return total; +} + +/* + * A variant of recv(2) which handles partial write. + * + * Return the number of bytes transferred, which is only + * smaller than `count' if there is an error. + * + * This function won't work with non-blocking fd's. + * Any of the possibilities with non-bloking fd's is bad: + * - return a short write (then name is wrong) + * - busy wait adding (errno == EAGAIN) to the loop + */ +ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags) +{ + ssize_t ret = 0; + ssize_t total = 0; + + while (count) { + ret = qemu_recv(fd, buf, count, flags); + if (ret <= 0) { + if (ret < 0 && errno == EINTR) { + continue; + } + break; + } + + count -= ret; + buf += ret; + total += ret; + } + + return total; +} + diff --git a/oslib-posix.c b/oslib-posix.c index ce755496b5..b6a3c7fc55 100644 --- a/oslib-posix.c +++ b/oslib-posix.c @@ -55,6 +55,21 @@ static int running_on_valgrind = -1; #else # define running_on_valgrind 0 #endif +#ifdef CONFIG_LINUX +#include <sys/syscall.h> +#endif +#ifdef CONFIG_EVENTFD +#include <sys/eventfd.h> +#endif + +int qemu_get_thread_id(void) +{ +#if defined(__linux__) + return syscall(SYS_gettid); +#else + return getpid(); +#endif +} int qemu_daemon(int nochdir, int noclose) { @@ -162,6 +177,34 @@ int qemu_pipe(int pipefd[2]) return ret; } +/* + * Creates an eventfd that looks like a pipe and has EFD_CLOEXEC set. + */ +int qemu_eventfd(int fds[2]) +{ +#ifdef CONFIG_EVENTFD + int ret; + + ret = eventfd(0, 0); + if (ret >= 0) { + fds[0] = ret; + fds[1] = dup(ret); + if (fds[1] == -1) { + close(ret); + return -1; + } + qemu_set_cloexec(ret); + qemu_set_cloexec(fds[1]); + return 0; + } + if (errno != ENOSYS) { + return -1; + } +#endif + + return qemu_pipe(fds); +} + int qemu_utimens(const char *path, const struct timespec *times) { struct timeval tv[2], tv_now; diff --git a/oslib-win32.c b/oslib-win32.c index 5e3de7dc8a..ce3021e6c7 100644 --- a/oslib-win32.c +++ b/oslib-win32.c @@ -118,3 +118,8 @@ int qemu_gettimeofday(qemu_timeval *tp) Do not set errno on error. */ return 0; } + +int qemu_get_thread_id(void) +{ + return GetCurrentThreadId(); +} diff --git a/qemu-common.h b/qemu-common.h index b2de015629..6ab7dfb1b9 100644 --- a/qemu-common.h +++ b/qemu-common.h @@ -173,6 +173,10 @@ void *qemu_oom_check(void *ptr); int qemu_open(const char *name, int flags, ...); ssize_t qemu_write_full(int fd, const void *buf, size_t count) QEMU_WARN_UNUSED_RESULT; +ssize_t qemu_send_full(int fd, const void *buf, size_t count, int flags) + QEMU_WARN_UNUSED_RESULT; +ssize_t qemu_recv_full(int fd, void *buf, size_t count, int flags) + QEMU_WARN_UNUSED_RESULT; void qemu_set_cloexec(int fd); #ifndef _WIN32 @@ -186,6 +190,9 @@ int qemu_pipe(int pipefd[2]); #define qemu_recv(sockfd, buf, len, flags) recv(sockfd, buf, len, flags) #endif +int qemu_recvv(int sockfd, struct iovec *iov, int len, int iov_offset); +int qemu_sendv(int sockfd, struct iovec *iov, int len, int iov_offset); + /* Error handling. */ void QEMU_NORETURN hw_error(const char *fmt, ...) GCC_FMT_ATTR(1, 2); @@ -272,6 +279,33 @@ struct qemu_work_item { void qemu_init_vcpu(void *env); #endif +/** + * Sends an iovec (or optionally a part of it) down a socket, yielding + * when the socket is full. + */ +int qemu_co_sendv(int sockfd, struct iovec *iov, + int len, int iov_offset); + +/** + * Receives data into an iovec (or optionally into a part of it) from + * a socket, yielding when there is no data in the socket. + */ +int qemu_co_recvv(int sockfd, struct iovec *iov, + int len, int iov_offset); + + +/** + * Sends a buffer down a socket, yielding when the socket is full. + */ +int qemu_co_send(int sockfd, void *buf, int len); + +/** + * Receives data into a buffer from a socket, yielding when there + * is no data in the socket. + */ +int qemu_co_recv(int sockfd, void *buf, int len); + + typedef struct QEMUIOVector { struct iovec *iov; int niov; diff --git a/qemu-coroutine-io.c b/qemu-coroutine-io.c new file mode 100644 index 0000000000..40fd514395 --- /dev/null +++ b/qemu-coroutine-io.c @@ -0,0 +1,96 @@ +/* + * Coroutine-aware I/O functions + * + * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation. + * Copyright (c) 2011, Red Hat, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +#include "qemu-common.h" +#include "qemu_socket.h" +#include "qemu-coroutine.h" + +int coroutine_fn qemu_co_recvv(int sockfd, struct iovec *iov, + int len, int iov_offset) +{ + int total = 0; + int ret; + while (len) { + ret = qemu_recvv(sockfd, iov, len, iov_offset + total); + if (ret < 0) { + if (errno == EAGAIN) { + qemu_coroutine_yield(); + continue; + } + if (total == 0) { + total = -1; + } + break; + } + if (ret == 0) { + break; + } + total += ret, len -= ret; + } + + return total; +} + +int coroutine_fn qemu_co_sendv(int sockfd, struct iovec *iov, + int len, int iov_offset) +{ + int total = 0; + int ret; + while (len) { + ret = qemu_sendv(sockfd, iov, len, iov_offset + total); + if (ret < 0) { + if (errno == EAGAIN) { + qemu_coroutine_yield(); + continue; + } + if (total == 0) { + total = -1; + } + break; + } + total += ret, len -= ret; + } + + return total; +} + +int coroutine_fn qemu_co_recv(int sockfd, void *buf, int len) +{ + struct iovec iov; + + iov.iov_base = buf; + iov.iov_len = len; + + return qemu_co_recvv(sockfd, &iov, len, 0); +} + +int coroutine_fn qemu_co_send(int sockfd, void *buf, int len) +{ + struct iovec iov; + + iov.iov_base = buf; + iov.iov_len = len; + + return qemu_co_sendv(sockfd, &iov, len, 0); +} diff --git a/qemu-nbd.c b/qemu-nbd.c index 291cba2eaa..155b05840b 100644 --- a/qemu-nbd.c +++ b/qemu-nbd.c @@ -35,13 +35,15 @@ #define SOCKET_PATH "/var/lock/qemu-nbd-%s" -#define NBD_BUFFER_SIZE (1024*1024) - -static int sigterm_wfd; +static NBDExport *exp; static int verbose; static char *device; static char *srcpath; static char *sockpath; +static bool sigterm_reported; +static bool nbd_started; +static int shared = 1; +static int nb_fds; static void usage(const char *name) { @@ -170,10 +172,8 @@ static int find_partition(BlockDriverState *bs, int partition, static void termsig_handler(int signum) { - static int sigterm_reported; - if (!sigterm_reported) { - sigterm_reported = (write(sigterm_wfd, "", 1) == 1); - } + sigterm_reported = true; + qemu_notify_event(); } static void *show_parts(void *arg) @@ -244,17 +244,38 @@ out: return (void *) EXIT_FAILURE; } +static int nbd_can_accept(void *opaque) +{ + return nb_fds < shared; +} + +static void nbd_client_closed(NBDClient *client) +{ + nb_fds--; + qemu_notify_event(); +} + +static void nbd_accept(void *opaque) +{ + int server_fd = (uintptr_t) opaque; + struct sockaddr_in addr; + socklen_t addr_len = sizeof(addr); + + int fd = accept(server_fd, (struct sockaddr *)&addr, &addr_len); + nbd_started = true; + if (fd != -1 && nbd_client_new(exp, fd, nbd_client_closed)) { + nb_fds++; + } +} + int main(int argc, char **argv) { BlockDriverState *bs; off_t dev_offset = 0; - off_t offset = 0; uint32_t nbdflags = 0; bool disconnect = false; const char *bindto = "0.0.0.0"; int port = NBD_DEFAULT_PORT; - struct sockaddr_in addr; - socklen_t addr_len = sizeof(addr); off_t fd_size; const char *sopt = "hVb:o:p:rsnP:c:dvk:e:t"; struct option lopt[] = { @@ -282,14 +303,7 @@ int main(int argc, char **argv) int flags = BDRV_O_RDWR; int partition = -1; int ret; - int shared = 1; - uint8_t *data; - fd_set fds; - int *sharing_fds; int fd; - int i; - int nb_fds = 0; - int max_fd; int persistent = 0; pthread_t client_thread; @@ -297,12 +311,6 @@ int main(int argc, char **argv) * handler ensures that "qemu-nbd -v -c" exits with a nice status code. */ struct sigaction sa_sigterm; - int sigterm_fd[2]; - if (qemu_pipe(sigterm_fd) == -1) { - err(EXIT_FAILURE, "Error setting up communication pipe"); - } - - sigterm_wfd = sigterm_fd[1]; memset(&sa_sigterm, 0, sizeof(sa_sigterm)); sa_sigterm.sa_handler = termsig_handler; sigaction(SIGTERM, &sa_sigterm, NULL); @@ -492,16 +500,17 @@ int main(int argc, char **argv) err(EXIT_FAILURE, "Could not find partition %d", partition); } - sharing_fds = g_malloc((shared + 1) * sizeof(int)); + exp = nbd_export_new(bs, dev_offset, fd_size, nbdflags); if (sockpath) { - sharing_fds[0] = unix_socket_incoming(sockpath); + fd = unix_socket_incoming(sockpath); } else { - sharing_fds[0] = tcp_socket_incoming(bindto, port); + fd = tcp_socket_incoming(bindto, port); } - if (sharing_fds[0] == -1) + if (fd == -1) { return 1; + } if (device) { int ret; @@ -516,60 +525,15 @@ int main(int argc, char **argv) memset(&client_thread, 0, sizeof(client_thread)); } - max_fd = sharing_fds[0]; - nb_fds++; - - data = qemu_blockalign(bs, NBD_BUFFER_SIZE); - if (data == NULL) { - errx(EXIT_FAILURE, "Cannot allocate data buffer"); - } + qemu_init_main_loop(); + qemu_set_fd_handler2(fd, nbd_can_accept, nbd_accept, NULL, + (void *)(uintptr_t)fd); do { - FD_ZERO(&fds); - FD_SET(sigterm_fd[0], &fds); - for (i = 0; i < nb_fds; i++) - FD_SET(sharing_fds[i], &fds); - - do { - ret = select(max_fd + 1, &fds, NULL, NULL, NULL); - } while (ret == -1 && errno == EINTR); - if (ret == -1 || FD_ISSET(sigterm_fd[0], &fds)) { - break; - } - - if (FD_ISSET(sharing_fds[0], &fds)) - ret--; - for (i = 1; i < nb_fds && ret; i++) { - if (FD_ISSET(sharing_fds[i], &fds)) { - if (nbd_trip(bs, sharing_fds[i], fd_size, dev_offset, - &offset, nbdflags, data, NBD_BUFFER_SIZE) != 0) { - close(sharing_fds[i]); - nb_fds--; - sharing_fds[i] = sharing_fds[nb_fds]; - i--; - } - ret--; - } - } - /* new connection ? */ - if (FD_ISSET(sharing_fds[0], &fds)) { - if (nb_fds < shared + 1) { - sharing_fds[nb_fds] = accept(sharing_fds[0], - (struct sockaddr *)&addr, - &addr_len); - if (sharing_fds[nb_fds] != -1 && - nbd_negotiate(sharing_fds[nb_fds], fd_size, nbdflags) != -1) { - if (sharing_fds[nb_fds] > max_fd) - max_fd = sharing_fds[nb_fds]; - nb_fds++; - } - } - } - } while (persistent || nb_fds > 1); - qemu_vfree(data); + main_loop_wait(false); + } while (!sigterm_reported && (persistent || !nbd_started || nb_fds > 0)); - close(sharing_fds[0]); - g_free(sharing_fds); + nbd_export_close(exp); if (sockpath) { unlink(sockpath); } diff --git a/qemu-tool.c b/qemu-tool.c index 5df7279745..226b6e890e 100644 --- a/qemu-tool.c +++ b/qemu-tool.c @@ -16,12 +16,12 @@ #include "qemu-timer.h" #include "qemu-log.h" #include "migration.h" +#include "main-loop.h" +#include "qemu_socket.h" +#include "slirp/libslirp.h" #include <sys/time.h> -QEMUClock *rt_clock; -QEMUClock *vm_clock; - FILE *logfile; struct QEMUBH @@ -57,41 +57,45 @@ void monitor_protocol_event(MonitorEvent event, QObject *data) { } -int qemu_set_fd_handler2(int fd, - IOCanReadHandler *fd_read_poll, - IOHandler *fd_read, - IOHandler *fd_write, - void *opaque) +int64 cpu_get_clock(void) { - return 0; + abort(); } -void qemu_notify_event(void) +int64 cpu_get_icount(void) { + abort(); } -QEMUTimer *qemu_new_timer(QEMUClock *clock, int scale, - QEMUTimerCB *cb, void *opaque) +void qemu_mutex_lock_iothread(void) { - return g_malloc(1); } -void qemu_free_timer(QEMUTimer *ts) +void qemu_mutex_unlock_iothread(void) { - g_free(ts); } -void qemu_del_timer(QEMUTimer *ts) +int use_icount; + +void qemu_clock_warp(QEMUClock *clock) { } -void qemu_mod_timer(QEMUTimer *ts, int64_t expire_time) +static void __attribute__((constructor)) init_main_loop(void) { + init_clocks(); + init_timer_alarm(); + qemu_clock_enable(vm_clock, false); } -int64_t qemu_get_clock_ns(QEMUClock *clock) +void slirp_select_fill(int *pnfds, fd_set *readfds, + fd_set *writefds, fd_set *xfds) +{ +} + +void slirp_select_poll(fd_set *readfds, fd_set *writefds, + fd_set *xfds, int select_error) { - return 0; } void migrate_add_blocker(Error *reason) diff --git a/qemu_socket.h b/qemu_socket.h index 9e32fac651..fe4cf6ca61 100644 --- a/qemu_socket.h +++ b/qemu_socket.h @@ -35,6 +35,7 @@ int inet_aton(const char *cp, struct in_addr *ia); /* misc helpers */ int qemu_socket(int domain, int type, int protocol); int qemu_accept(int s, struct sockaddr *addr, socklen_t *addrlen); +int socket_set_cork(int fd, int v); void socket_set_block(int fd); void socket_set_nonblock(int fd); int send_all(int fd, const void *buf, int len1); |