aboutsummaryrefslogtreecommitdiff
path: root/block/sheepdog.c
diff options
context:
space:
mode:
Diffstat (limited to 'block/sheepdog.c')
-rw-r--r--block/sheepdog.c267
1 files changed, 140 insertions, 127 deletions
diff --git a/block/sheepdog.c b/block/sheepdog.c
index f46ca8fb69..a04ad99ead 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -259,8 +259,7 @@ typedef struct AIOReq {
uint8_t flags;
uint32_t id;
- QLIST_ENTRY(AIOReq) outstanding_aio_siblings;
- QLIST_ENTRY(AIOReq) aioreq_siblings;
+ QLIST_ENTRY(AIOReq) aio_siblings;
} AIOReq;
enum AIOCBState {
@@ -283,8 +282,7 @@ struct SheepdogAIOCB {
void (*aio_done_func)(SheepdogAIOCB *);
int canceled;
-
- QLIST_HEAD(aioreq_head, AIOReq) aioreq_head;
+ int nr_pending;
};
typedef struct BDRVSheepdogState {
@@ -307,7 +305,8 @@ typedef struct BDRVSheepdogState {
Coroutine *co_recv;
uint32_t aioreq_seq_num;
- QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
+ QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
+ QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
} BDRVSheepdogState;
static const char * sd_strerror(int err)
@@ -358,7 +357,7 @@ static const char * sd_strerror(int err)
* Sheepdog I/O handling:
*
* 1. In sd_co_rw_vector, we send the I/O requests to the server and
- * link the requests to the outstanding_list in the
+ * link the requests to the inflight_list in the
* BDRVSheepdogState. The function exits without waiting for
* receiving the response.
*
@@ -386,21 +385,18 @@ static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
aio_req->flags = flags;
aio_req->id = s->aioreq_seq_num++;
- QLIST_INSERT_HEAD(&s->outstanding_aio_head, aio_req,
- outstanding_aio_siblings);
- QLIST_INSERT_HEAD(&acb->aioreq_head, aio_req, aioreq_siblings);
-
+ acb->nr_pending++;
return aio_req;
}
-static inline int free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
+static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
{
SheepdogAIOCB *acb = aio_req->aiocb;
- QLIST_REMOVE(aio_req, outstanding_aio_siblings);
- QLIST_REMOVE(aio_req, aioreq_siblings);
+
+ QLIST_REMOVE(aio_req, aio_siblings);
g_free(aio_req);
- return !QLIST_EMPTY(&acb->aioreq_head);
+ acb->nr_pending--;
}
static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
@@ -446,7 +442,7 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
acb->canceled = 0;
acb->coroutine = qemu_coroutine_self();
acb->ret = 0;
- QLIST_INIT(&acb->aioreq_head);
+ acb->nr_pending = 0;
return acb;
}
@@ -502,56 +498,65 @@ success:
return fd;
}
-static int send_req(int sockfd, SheepdogReq *hdr, void *data,
- unsigned int *wlen)
+static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
+ unsigned int *wlen)
{
int ret;
- ret = qemu_send_full(sockfd, hdr, sizeof(*hdr), 0);
+ ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
if (ret < sizeof(*hdr)) {
error_report("failed to send a req, %s", strerror(errno));
- return -errno;
+ return ret;
}
- ret = qemu_send_full(sockfd, data, *wlen, 0);
+ ret = qemu_co_send(sockfd, data, *wlen);
if (ret < *wlen) {
error_report("failed to send a req, %s", strerror(errno));
- ret = -errno;
}
return ret;
}
-static int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
- unsigned int *wlen)
+static void restart_co_req(void *opaque)
{
- int ret;
+ Coroutine *co = opaque;
- ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
- if (ret < sizeof(*hdr)) {
- error_report("failed to send a req, %s", strerror(errno));
- return ret;
- }
+ qemu_coroutine_enter(co, NULL);
+}
- ret = qemu_co_send(sockfd, data, *wlen);
- if (ret < *wlen) {
- error_report("failed to send a req, %s", strerror(errno));
- }
+typedef struct SheepdogReqCo {
+ int sockfd;
+ SheepdogReq *hdr;
+ void *data;
+ unsigned int *wlen;
+ unsigned int *rlen;
+ int ret;
+ bool finished;
+} SheepdogReqCo;
- return ret;
-}
-static int do_req(int sockfd, SheepdogReq *hdr, void *data,
- unsigned int *wlen, unsigned int *rlen)
+static coroutine_fn void do_co_req(void *opaque)
{
int ret;
+ Coroutine *co;
+ SheepdogReqCo *srco = opaque;
+ int sockfd = srco->sockfd;
+ SheepdogReq *hdr = srco->hdr;
+ void *data = srco->data;
+ unsigned int *wlen = srco->wlen;
+ unsigned int *rlen = srco->rlen;
+
+ co = qemu_coroutine_self();
+ qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, NULL, co);
socket_set_block(sockfd);
- ret = send_req(sockfd, hdr, data, wlen);
+ ret = send_co_req(sockfd, hdr, data, wlen);
if (ret < 0) {
goto out;
}
- ret = qemu_recv_full(sockfd, hdr, sizeof(*hdr), 0);
+ qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, NULL, co);
+
+ ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
if (ret < sizeof(*hdr)) {
error_report("failed to get a rsp, %s", strerror(errno));
ret = -errno;
@@ -563,7 +568,7 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
}
if (*rlen) {
- ret = qemu_recv_full(sockfd, data, *rlen, 0);
+ ret = qemu_co_recv(sockfd, data, *rlen);
if (ret < *rlen) {
error_report("failed to get the data, %s", strerror(errno));
ret = -errno;
@@ -572,76 +577,79 @@ static int do_req(int sockfd, SheepdogReq *hdr, void *data,
}
ret = 0;
out:
+ qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL, NULL);
socket_set_nonblock(sockfd);
- return ret;
+
+ srco->ret = ret;
+ srco->finished = true;
}
-static int do_co_req(int sockfd, SheepdogReq *hdr, void *data,
- unsigned int *wlen, unsigned int *rlen)
+static int do_req(int sockfd, SheepdogReq *hdr, void *data,
+ unsigned int *wlen, unsigned int *rlen)
{
- int ret;
-
- socket_set_block(sockfd);
- ret = send_co_req(sockfd, hdr, data, wlen);
- if (ret < 0) {
- goto out;
- }
-
- ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
- if (ret < sizeof(*hdr)) {
- error_report("failed to get a rsp, %s", strerror(errno));
- ret = -errno;
- goto out;
- }
-
- if (*rlen > hdr->data_length) {
- *rlen = hdr->data_length;
- }
+ Coroutine *co;
+ SheepdogReqCo srco = {
+ .sockfd = sockfd,
+ .hdr = hdr,
+ .data = data,
+ .wlen = wlen,
+ .rlen = rlen,
+ .ret = 0,
+ .finished = false,
+ };
- if (*rlen) {
- ret = qemu_co_recv(sockfd, data, *rlen);
- if (ret < *rlen) {
- error_report("failed to get the data, %s", strerror(errno));
- ret = -errno;
- goto out;
+ if (qemu_in_coroutine()) {
+ do_co_req(&srco);
+ } else {
+ co = qemu_coroutine_create(do_co_req);
+ qemu_coroutine_enter(co, &srco);
+ while (!srco.finished) {
+ qemu_aio_wait();
}
}
- ret = 0;
-out:
- socket_set_nonblock(sockfd);
- return ret;
+
+ return srco.ret;
}
static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
struct iovec *iov, int niov, int create,
enum AIOCBState aiocb_type);
+
+static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
+{
+ AIOReq *aio_req;
+
+ QLIST_FOREACH(aio_req, &s->pending_aio_head, aio_siblings) {
+ if (aio_req->oid == oid) {
+ return aio_req;
+ }
+ }
+
+ return NULL;
+}
+
/*
* This function searchs pending requests to the object `oid', and
* sends them.
*/
-static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid, uint32_t id)
+static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
{
- AIOReq *aio_req, *next;
+ AIOReq *aio_req;
SheepdogAIOCB *acb;
int ret;
- QLIST_FOREACH_SAFE(aio_req, &s->outstanding_aio_head,
- outstanding_aio_siblings, next) {
- if (id == aio_req->id) {
- continue;
- }
- if (aio_req->oid != oid) {
- continue;
- }
-
+ while ((aio_req = find_pending_req(s, oid)) != NULL) {
acb = aio_req->aiocb;
+ /* move aio_req from pending list to inflight one */
+ QLIST_REMOVE(aio_req, aio_siblings);
+ QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
ret = add_aio_request(s, aio_req, acb->qiov->iov,
acb->qiov->niov, 0, acb->aiocb_type);
if (ret < 0) {
error_report("add_aio_request is failed");
free_aio_req(s, aio_req);
- if (QLIST_EMPTY(&acb->aioreq_head)) {
+ if (!acb->nr_pending) {
sd_finish_aiocb(acb);
}
}
@@ -662,10 +670,9 @@ static void coroutine_fn aio_read_response(void *opaque)
int ret;
AIOReq *aio_req = NULL;
SheepdogAIOCB *acb;
- int rest;
unsigned long idx;
- if (QLIST_EMPTY(&s->outstanding_aio_head)) {
+ if (QLIST_EMPTY(&s->inflight_aio_head)) {
goto out;
}
@@ -676,8 +683,8 @@ static void coroutine_fn aio_read_response(void *opaque)
goto out;
}
- /* find the right aio_req from the outstanding_aio list */
- QLIST_FOREACH(aio_req, &s->outstanding_aio_head, outstanding_aio_siblings) {
+ /* find the right aio_req from the inflight aio list */
+ QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
if (aio_req->id == rsp.id) {
break;
}
@@ -715,12 +722,12 @@ static void coroutine_fn aio_read_response(void *opaque)
* create requests are not allowed, so we search the
* pending requests here.
*/
- send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx), rsp.id);
+ send_pending_req(s, vid_to_data_oid(s->inode.vdi_id, idx));
}
break;
case AIOCB_READ_UDATA:
- ret = qemu_co_recvv(fd, acb->qiov->iov, rsp.data_length,
- aio_req->iov_offset);
+ ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
+ aio_req->iov_offset, rsp.data_length);
if (ret < 0) {
error_report("failed to get the data, %s", strerror(errno));
goto out;
@@ -733,8 +740,8 @@ static void coroutine_fn aio_read_response(void *opaque)
error_report("%s", sd_strerror(rsp.result));
}
- rest = free_aio_req(s, aio_req);
- if (!rest) {
+ free_aio_req(s, aio_req);
+ if (!acb->nr_pending) {
/*
* We've finished all requests which belong to the AIOCB, so
* we can switch back to sd_co_readv/writev now.
@@ -767,7 +774,8 @@ static int aio_flush_request(void *opaque)
{
BDRVSheepdogState *s = opaque;
- return !QLIST_EMPTY(&s->outstanding_aio_head);
+ return !QLIST_EMPTY(&s->inflight_aio_head) ||
+ !QLIST_EMPTY(&s->pending_aio_head);
}
static int set_nodelay(int fd)
@@ -992,7 +1000,7 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
}
if (wlen) {
- ret = qemu_co_sendv(s->fd, iov, wlen, aio_req->iov_offset);
+ ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
if (ret < 0) {
qemu_co_mutex_unlock(&s->lock);
error_report("failed to send a data, %s", strerror(errno));
@@ -1084,7 +1092,8 @@ static int sd_open(BlockDriverState *bs, const char *filename, int flags)
strstart(filename, "sheepdog:", (const char **)&filename);
- QLIST_INIT(&s->outstanding_aio_head);
+ QLIST_INIT(&s->inflight_aio_head);
+ QLIST_INIT(&s->pending_aio_head);
s->fd = -1;
memset(vdi, 0, sizeof(vdi));
@@ -1446,6 +1455,7 @@ static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
iov.iov_len = sizeof(s->inode);
aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
data_len, offset, 0, 0, offset);
+ QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
ret = add_aio_request(s, aio_req, &iov, 1, 0, AIOCB_WRITE_UDATA);
if (ret) {
free_aio_req(s, aio_req);
@@ -1514,7 +1524,7 @@ out:
* Send I/O requests to the server.
*
* This function sends requests to the server, links the requests to
- * the outstanding_list in BDRVSheepdogState, and exits without
+ * the inflight_list in BDRVSheepdogState, and exits without
* waiting the response. The responses are received in the
* `aio_read_response' function which is called from the main loop as
* a fd handler.
@@ -1546,6 +1556,12 @@ static int coroutine_fn sd_co_rw_vector(void *p)
}
}
+ /*
+ * Make sure we don't free the aiocb before we are done with all requests.
+ * This additional reference is dropped at the end of this function.
+ */
+ acb->nr_pending++;
+
while (done != total) {
uint8_t flags = 0;
uint64_t old_oid = 0;
@@ -1555,37 +1571,40 @@ static int coroutine_fn sd_co_rw_vector(void *p)
len = MIN(total - done, SD_DATA_OBJ_SIZE - offset);
- if (!inode->data_vdi_id[idx]) {
- if (acb->aiocb_type == AIOCB_READ_UDATA) {
+ switch (acb->aiocb_type) {
+ case AIOCB_READ_UDATA:
+ if (!inode->data_vdi_id[idx]) {
+ qemu_iovec_memset(acb->qiov, done, 0, len);
goto done;
}
-
- create = 1;
- } else if (acb->aiocb_type == AIOCB_WRITE_UDATA
- && !is_data_obj_writable(inode, idx)) {
- /* Copy-On-Write */
- create = 1;
- old_oid = oid;
- flags = SD_FLAG_CMD_COW;
+ break;
+ case AIOCB_WRITE_UDATA:
+ if (!inode->data_vdi_id[idx]) {
+ create = 1;
+ } else if (!is_data_obj_writable(inode, idx)) {
+ /* Copy-On-Write */
+ create = 1;
+ old_oid = oid;
+ flags = SD_FLAG_CMD_COW;
+ }
+ break;
+ default:
+ break;
}
if (create) {
- dprintf("update ino (%" PRIu32") %" PRIu64 " %" PRIu64
- " %" PRIu64 "\n", inode->vdi_id, oid,
+ dprintf("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
+ inode->vdi_id, oid,
vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
oid = vid_to_data_oid(inode->vdi_id, idx);
- dprintf("new oid %lx\n", oid);
+ dprintf("new oid %" PRIx64 "\n", oid);
}
aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
if (create) {
AIOReq *areq;
- QLIST_FOREACH(areq, &s->outstanding_aio_head,
- outstanding_aio_siblings) {
- if (areq == aio_req) {
- continue;
- }
+ QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
if (areq->oid == oid) {
/*
* Sheepdog cannot handle simultaneous create
@@ -1595,11 +1614,14 @@ static int coroutine_fn sd_co_rw_vector(void *p)
*/
aio_req->flags = 0;
aio_req->base_oid = 0;
+ QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
+ aio_siblings);
goto done;
}
}
}
+ QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
create, acb->aiocb_type);
if (ret < 0) {
@@ -1614,7 +1636,7 @@ static int coroutine_fn sd_co_rw_vector(void *p)
done += len;
}
out:
- if (QLIST_EMPTY(&acb->aioreq_head)) {
+ if (!--acb->nr_pending) {
return acb->ret;
}
return 1;
@@ -1627,7 +1649,6 @@ static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
int ret;
if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
- /* TODO: shouldn't block here */
ret = sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE);
if (ret < 0) {
return ret;
@@ -1654,20 +1675,12 @@ static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
int nb_sectors, QEMUIOVector *qiov)
{
SheepdogAIOCB *acb;
- int i, ret;
+ int ret;
acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
acb->aiocb_type = AIOCB_READ_UDATA;
acb->aio_done_func = sd_finish_aiocb;
- /*
- * TODO: we can do better; we don't need to initialize
- * blindly.
- */
- for (i = 0; i < qiov->niov; i++) {
- memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
- }
-
ret = sd_co_rw_vector(acb);
if (ret <= 0) {
qemu_aio_release(acb);
@@ -1695,7 +1708,7 @@ static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
hdr.opcode = SD_OP_FLUSH_VDI;
hdr.oid = vid_to_vdi_oid(inode->vdi_id);
- ret = do_co_req(s->flush_fd, (SheepdogReq *)&hdr, NULL, &wlen, &rlen);
+ ret = do_req(s->flush_fd, (SheepdogReq *)&hdr, NULL, &wlen, &rlen);
if (ret) {
error_report("failed to send a request to the sheep");
return ret;
@@ -1725,7 +1738,7 @@ static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
SheepdogInode *inode;
unsigned int datalen;
- dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %d "
+ dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
"is_snapshot %d\n", sn_info->name, sn_info->id_str,
s->name, sn_info->vm_state_size, s->is_snapshot);