diff options
-rw-r--r-- | block/sheepdog.c | 80 |
1 files changed, 66 insertions, 14 deletions
diff --git a/block/sheepdog.c b/block/sheepdog.c index 5311fb18a9..fd1447e906 100644 --- a/block/sheepdog.c +++ b/block/sheepdog.c @@ -304,6 +304,8 @@ struct SheepdogAIOCB { }; typedef struct BDRVSheepdogState { + BlockDriverState *bs; + SheepdogInode inode; uint32_t min_dirty_data_idx; @@ -323,8 +325,11 @@ typedef struct BDRVSheepdogState { Coroutine *co_recv; uint32_t aioreq_seq_num; + + /* Every aio request must be linked to either of these queues. */ QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head; QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head; + QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head; } BDRVSheepdogState; static const char * sd_strerror(int err) @@ -611,6 +616,8 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, enum AIOCBState aiocb_type); static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req); static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag); +static int get_sheep_fd(BDRVSheepdogState *s); +static void co_write_request(void *opaque); static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid) { @@ -652,6 +659,51 @@ static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid) } } +static coroutine_fn void reconnect_to_sdog(void *opaque) +{ + BDRVSheepdogState *s = opaque; + AIOReq *aio_req, *next; + + qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL); + close(s->fd); + s->fd = -1; + + /* Wait for outstanding write requests to be completed. */ + while (s->co_send != NULL) { + co_write_request(opaque); + } + + /* Try to reconnect the sheepdog server every one second. */ + while (s->fd < 0) { + s->fd = get_sheep_fd(s); + if (s->fd < 0) { + DPRINTF("Wait for connection to be established\n"); + co_aio_sleep_ns(bdrv_get_aio_context(s->bs), QEMU_CLOCK_REALTIME, + 1000000000ULL); + } + }; + + /* + * Now we have to resend all the request in the inflight queue. However, + * resend_aioreq() can yield and newly created requests can be added to the + * inflight queue before the coroutine is resumed. To avoid mixing them, we + * have to move all the inflight requests to the failed queue before + * resend_aioreq() is called. + */ + QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) { + QLIST_REMOVE(aio_req, aio_siblings); + QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings); + } + + /* Resend all the failed aio requests. */ + while (!QLIST_EMPTY(&s->failed_aio_head)) { + aio_req = QLIST_FIRST(&s->failed_aio_head); + QLIST_REMOVE(aio_req, aio_siblings); + QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings); + resend_aioreq(s, aio_req); + } +} + /* * Receive responses of the I/O requests. * @@ -668,15 +720,11 @@ static void coroutine_fn aio_read_response(void *opaque) SheepdogAIOCB *acb; uint64_t idx; - if (QLIST_EMPTY(&s->inflight_aio_head)) { - goto out; - } - /* read a header */ ret = qemu_co_recv(fd, &rsp, sizeof(rsp)); if (ret != sizeof(rsp)) { error_report("failed to get the header, %s", strerror(errno)); - goto out; + goto err; } /* find the right aio_req from the inflight aio list */ @@ -687,7 +735,7 @@ static void coroutine_fn aio_read_response(void *opaque) } if (!aio_req) { error_report("cannot find aio_req %x", rsp.id); - goto out; + goto err; } acb = aio_req->aiocb; @@ -727,7 +775,7 @@ static void coroutine_fn aio_read_response(void *opaque) aio_req->iov_offset, rsp.data_length); if (ret != rsp.data_length) { error_report("failed to get the data, %s", strerror(errno)); - goto out; + goto err; } break; case AIOCB_FLUSH_CACHE: @@ -761,10 +809,9 @@ static void coroutine_fn aio_read_response(void *opaque) if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) { ret = reload_inode(s, 0, ""); if (ret < 0) { - goto out; + goto err; } } - if (is_data_obj(aio_req->oid)) { aio_req->oid = vid_to_data_oid(s->inode.vdi_id, data_oid_to_idx(aio_req->oid)); @@ -792,6 +839,10 @@ static void coroutine_fn aio_read_response(void *opaque) } out: s->co_recv = NULL; + return; +err: + s->co_recv = NULL; + reconnect_to_sdog(opaque); } static void co_read_response(void *opaque) @@ -1083,22 +1134,20 @@ static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req, /* send a header */ ret = qemu_co_send(s->fd, &hdr, sizeof(hdr)); if (ret != sizeof(hdr)) { - qemu_co_mutex_unlock(&s->lock); error_report("failed to send a req, %s", strerror(errno)); - return -errno; + goto out; } if (wlen) { ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen); if (ret != wlen) { - qemu_co_mutex_unlock(&s->lock); error_report("failed to send a data, %s", strerror(errno)); - return -errno; } } - +out: socket_set_cork(s->fd, 0); qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, s); + s->co_send = NULL; qemu_co_mutex_unlock(&s->lock); return 0; @@ -1276,6 +1325,8 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags, Error *local_err = NULL; const char *filename; + s->bs = bs; + opts = qemu_opts_create_nofail(&runtime_opts); qemu_opts_absorb_qdict(opts, options, &local_err); if (error_is_set(&local_err)) { @@ -1289,6 +1340,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags, QLIST_INIT(&s->inflight_aio_head); QLIST_INIT(&s->pending_aio_head); + QLIST_INIT(&s->failed_aio_head); s->fd = -1; memset(vdi, 0, sizeof(vdi)); |