Diffstat (limited to 'block')
-rw-r--r--  block/backup.c          |   2
-rw-r--r--  block/blkdebug.c        |   9
-rwxr-xr-x  block/blkreplay.c       |   2
-rw-r--r--  block/block-backend.c   |  13
-rw-r--r--  block/curl.c            |  44
-rw-r--r--  block/gluster.c         |   9
-rw-r--r--  block/io.c              |  42
-rw-r--r--  block/iscsi.c           |  15
-rw-r--r--  block/linux-aio.c       |  10
-rw-r--r--  block/mirror.c          |  12
-rw-r--r--  block/nbd-client.c      | 119
-rw-r--r--  block/nbd-client.h      |   2
-rw-r--r--  block/nfs.c             |   9
-rw-r--r--  block/qcow2-cluster.c   |   4
-rw-r--r--  block/qed-cluster.c     |   2
-rw-r--r--  block/qed-table.c       |  12
-rw-r--r--  block/qed.c             |  58
-rw-r--r--  block/qed.h             |   3
-rw-r--r--  block/sheepdog.c        |  31
-rw-r--r--  block/ssh.c             |  29
-rw-r--r--  block/throttle-groups.c |   4
-rw-r--r--  block/win32-aio.c       |   9
22 files changed, 234 insertions(+), 206 deletions(-)
diff --git a/block/backup.c b/block/backup.c
index ea38733849..fe010e78e3 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -64,7 +64,7 @@ static void coroutine_fn wait_for_overlapping_requests(BackupBlockJob *job,
         retry = false;
         QLIST_FOREACH(req, &job->inflight_reqs, list) {
             if (end > req->start && start < req->end) {
-                qemu_co_queue_wait(&req->wait_queue);
+                qemu_co_queue_wait(&req->wait_queue, NULL);
                 retry = true;
                 break;
             }
diff --git a/block/blkdebug.c b/block/blkdebug.c
index acccf85666..d8eee1b9b4 100644
--- a/block/blkdebug.c
+++ b/block/blkdebug.c
@@ -405,12 +405,6 @@ out:
     return ret;
 }
 
-static void error_callback_bh(void *opaque)
-{
-    Coroutine *co = opaque;
-    qemu_coroutine_enter(co);
-}
-
 static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
 {
     BDRVBlkdebugState *s = bs->opaque;
@@ -423,8 +417,7 @@ static int inject_error(BlockDriverState *bs, BlkdebugRule *rule)
     }
 
     if (!immediately) {
-        aio_bh_schedule_oneshot(bdrv_get_aio_context(bs), error_callback_bh,
-                                qemu_coroutine_self());
+        aio_co_schedule(qemu_get_current_aio_context(), qemu_coroutine_self());
         qemu_coroutine_yield();
     }
 
diff --git a/block/blkreplay.c b/block/blkreplay.c
index a741654d35..cfc8c5be02 100755
--- a/block/blkreplay.c
+++ b/block/blkreplay.c
@@ -60,7 +60,7 @@ static int64_t blkreplay_getlength(BlockDriverState *bs)
 static void blkreplay_bh_cb(void *opaque)
 {
     Request *req = opaque;
-    qemu_coroutine_enter(req->co);
+    aio_co_wake(req->co);
     qemu_bh_delete(req->bh);
     g_free(req);
 }
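The three conversions above share one idea: instead of scheduling a bottom half whose only job is to call qemu_coroutine_enter() (the removed error_callback_bh() trampoline), the code now uses aio_co_schedule() and aio_co_wake(), which route the wakeup through the coroutine's home AioContext and are safe to call from another thread. A minimal sketch of the self-rescheduling idiom that blkdebug now uses, with my_delay_point() as a hypothetical name:

    #include "qemu/osdep.h"
    #include "qemu/coroutine.h"
    #include "block/aio.h"

    /* Park the calling coroutine and resume it on the next iteration
     * of the current AioContext's event loop. */
    static void coroutine_fn my_delay_point(void)
    {
        aio_co_schedule(qemu_get_current_aio_context(),
                        qemu_coroutine_self());
        qemu_coroutine_yield();
    }

aio_co_wake(co), used by blkreplay above, is the variant for waking somebody else: it reads co's AioContext and, when that context is not the caller's, defers the reentry through that context's bottom half. Later sketches assume the same includes.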
diff --git a/block/block-backend.c b/block/block-backend.c
index efbf398bb5..819f27213a 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -880,7 +880,6 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
 {
     QEMUIOVector qiov;
     struct iovec iov;
-    Coroutine *co;
     BlkRwCo rwco;
 
     iov = (struct iovec) {
@@ -897,9 +896,14 @@ static int blk_prw(BlockBackend *blk, int64_t offset, uint8_t *buf,
         .ret    = NOT_DONE,
     };
 
-    co = qemu_coroutine_create(co_entry, &rwco);
-    qemu_coroutine_enter(co);
-    BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
+    if (qemu_in_coroutine()) {
+        /* Fast-path if already in coroutine context */
+        co_entry(&rwco);
+    } else {
+        Coroutine *co = qemu_coroutine_create(co_entry, &rwco);
+        qemu_coroutine_enter(co);
+        BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
+    }
 
     return rwco.ret;
 }
@@ -979,7 +983,6 @@ static void blk_aio_complete(BlkAioEmAIOCB *acb)
 static void blk_aio_complete_bh(void *opaque)
 {
     BlkAioEmAIOCB *acb = opaque;
-    assert(acb->has_returned);
     blk_aio_complete(acb);
 }
 
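blk_prw() now separates its two kinds of callers: coroutine callers run the request body directly, while plain callers spawn a coroutine and poll for completion. A sketch of the shape, where MyRwCo, my_co_entry() and my_prw() are placeholders for the static helpers in block-backend.c:

    #define NOT_DONE 0x7fffffff     /* completion sentinel for this sketch */

    typedef struct MyRwCo {
        BlockBackend *blk;
        int ret;                    /* NOT_DONE until the body finishes */
    } MyRwCo;

    static void coroutine_fn my_co_entry(void *opaque)
    {
        MyRwCo *rwco = opaque;

        /* ... perform the actual I/O ... */
        rwco->ret = 0;
    }

    static int my_prw(BlockBackend *blk)
    {
        MyRwCo rwco = { .blk = blk, .ret = NOT_DONE };

        if (qemu_in_coroutine()) {
            /* Fast path: we are already a coroutine, so calling the
             * entry function inline is both legal and cheaper than
             * spawning a nested coroutine and polling for it. */
            my_co_entry(&rwco);
        } else {
            Coroutine *co = qemu_coroutine_create(my_co_entry, &rwco);
            qemu_coroutine_enter(co);
            BDRV_POLL_WHILE(blk_bs(blk), rwco.ret == NOT_DONE);
        }
        return rwco.ret;
    }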
diff --git a/block/curl.c b/block/curl.c
index 792fef8269..2939cc77e9 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -386,9 +386,8 @@ static void curl_multi_check_completion(BDRVCURLState *s)
     }
 }
 
-static void curl_multi_do(void *arg)
+static void curl_multi_do_locked(CURLState *s)
 {
-    CURLState *s = (CURLState *)arg;
     CURLSocket *socket, *next_socket;
     int running;
     int r;
@@ -406,12 +405,23 @@ static void curl_multi_do(void *arg)
     }
 }
 
+static void curl_multi_do(void *arg)
+{
+    CURLState *s = (CURLState *)arg;
+
+    aio_context_acquire(s->s->aio_context);
+    curl_multi_do_locked(s);
+    aio_context_release(s->s->aio_context);
+}
+
 static void curl_multi_read(void *arg)
 {
     CURLState *s = (CURLState *)arg;
 
-    curl_multi_do(arg);
+    aio_context_acquire(s->s->aio_context);
+    curl_multi_do_locked(s);
     curl_multi_check_completion(s->s);
+    aio_context_release(s->s->aio_context);
 }
 
 static void curl_multi_timeout_do(void *arg)
@@ -424,9 +434,11 @@ static void curl_multi_timeout_do(void *arg)
         return;
     }
 
+    aio_context_acquire(s->aio_context);
     curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
 
     curl_multi_check_completion(s);
+    aio_context_release(s->aio_context);
 #else
     abort();
 #endif
@@ -784,13 +796,18 @@ static void curl_readv_bh_cb(void *p)
 {
     CURLState *state;
     int running;
+    int ret = -EINPROGRESS;
 
     CURLAIOCB *acb = p;
-    BDRVCURLState *s = acb->common.bs->opaque;
+    BlockDriverState *bs = acb->common.bs;
+    BDRVCURLState *s = bs->opaque;
+    AioContext *ctx = bdrv_get_aio_context(bs);
 
     size_t start = acb->sector_num * BDRV_SECTOR_SIZE;
     size_t end;
 
+    aio_context_acquire(ctx);
+
     // In case we have the requested data already (e.g. read-ahead),
     // we can just call the callback and be done.
     switch (curl_find_buf(s, start, acb->nb_sectors * BDRV_SECTOR_SIZE, acb)) {
@@ -798,7 +815,7 @@ static void curl_readv_bh_cb(void *p)
         qemu_aio_unref(acb);
         // fall through
     case FIND_RET_WAIT:
-        return;
+        goto out;
     default:
         break;
     }
@@ -806,9 +823,8 @@ static void curl_readv_bh_cb(void *p)
     // No cache found, so let's start a new request
     state = curl_init_state(acb->common.bs, s);
     if (!state) {
-        acb->common.cb(acb->common.opaque, -EIO);
-        qemu_aio_unref(acb);
-        return;
+        ret = -EIO;
+        goto out;
     }
 
     acb->start = 0;
@@ -822,9 +838,8 @@ static void curl_readv_bh_cb(void *p)
     state->orig_buf = g_try_malloc(state->buf_len);
     if (state->buf_len && state->orig_buf == NULL) {
         curl_clean_state(state);
-        acb->common.cb(acb->common.opaque, -ENOMEM);
-        qemu_aio_unref(acb);
-        return;
+        ret = -ENOMEM;
+        goto out;
     }
     state->acb[0] = acb;
 
@@ -837,6 +852,13 @@ static void curl_readv_bh_cb(void *p)
 
     /* Tell curl it needs to kick things off */
     curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, &running);
+
+out:
+    aio_context_release(ctx);
+    if (ret != -EINPROGRESS) {
+        acb->common.cb(acb->common.opaque, ret);
+        qemu_aio_unref(acb);
+    }
 }
 
 static BlockAIOCB *curl_aio_readv(BlockDriverState *bs,
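Every libcurl entry point above (socket read, socket write, timeout) now brackets its work with aio_context_acquire()/aio_context_release(). The underlying rule: handlers invoked directly by aio_poll() hold no lock, so once a BlockDriverState can live in an IOThread, its handlers must take the AioContext lock themselves. A generic sketch, with MyState and my_process_locked() as stand-ins for the curl driver's types:

    typedef struct MyState {
        AioContext *ctx;
        int fd;
    } MyState;

    static void my_process_locked(MyState *s)
    {
        /* ... touch state that the AioContext lock protects ... */
    }

    /* Read handler registered with aio_set_fd_handler(); called
     * straight from aio_poll() with no lock held. */
    static void my_fd_read(void *opaque)
    {
        MyState *s = opaque;

        aio_context_acquire(s->ctx);
        my_process_locked(s);
        aio_context_release(s->ctx);
    }

    static void my_register(MyState *s)
    {
        aio_set_fd_handler(s->ctx, s->fd, false,
                           my_fd_read, NULL, NULL, s);
    }

Splitting curl_multi_do() into a locked core plus thin locking wrappers also lets curl_multi_read() take the lock exactly once around both the socket work and the completion scan.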
Reconnecting..."); iscsilun->request_timed_out = true; } else if (iscsi_nop_out_async(iscsilun->iscsi, NULL, NULL, 0, NULL) != 0) { error_report("iSCSI: failed to sent NOP-Out. Disabling NOP messages."); - return; + goto out; } timer_mod(iscsilun->nop_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + NOP_INTERVAL); iscsi_set_events(iscsilun); + +out: + aio_context_release(iscsilun->aio_context); } static void iscsi_readcapacity_sync(IscsiLun *iscsilun, Error **errp) diff --git a/block/linux-aio.c b/block/linux-aio.c index 03ab741d37..88b8d55ec7 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -54,10 +54,10 @@ struct LinuxAioState { io_context_t ctx; EventNotifier e; - /* io queue for submit at batch */ + /* io queue for submit at batch. Protected by AioContext lock. */ LaioQueue io_q; - /* I/O completion processing */ + /* I/O completion processing. Only runs in I/O thread. */ QEMUBH *completion_bh; int event_idx; int event_max; @@ -100,7 +100,7 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb) * that! */ if (!qemu_coroutine_entered(laiocb->co)) { - qemu_coroutine_enter(laiocb->co); + aio_co_wake(laiocb->co); } } else { laiocb->common.cb(laiocb->common.opaque, ret); @@ -234,9 +234,12 @@ static void qemu_laio_process_completions(LinuxAioState *s) static void qemu_laio_process_completions_and_submit(LinuxAioState *s) { qemu_laio_process_completions(s); + + aio_context_acquire(s->aio_context); if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { ioq_submit(s); } + aio_context_release(s->aio_context); } static void qemu_laio_completion_bh(void *opaque) @@ -455,6 +458,7 @@ void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context) { aio_set_event_notifier(old_context, &s->e, false, NULL, NULL); qemu_bh_delete(s->completion_bh); + s->aio_context = NULL; } void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context) diff --git a/block/mirror.c b/block/mirror.c index 301ba9219a..698a54e50f 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -132,6 +132,8 @@ static void mirror_write_complete(void *opaque, int ret) { MirrorOp *op = opaque; MirrorBlockJob *s = op->s; + + aio_context_acquire(blk_get_aio_context(s->common.blk)); if (ret < 0) { BlockErrorAction action; @@ -142,12 +144,15 @@ static void mirror_write_complete(void *opaque, int ret) } } mirror_iteration_done(op, ret); + aio_context_release(blk_get_aio_context(s->common.blk)); } static void mirror_read_complete(void *opaque, int ret) { MirrorOp *op = opaque; MirrorBlockJob *s = op->s; + + aio_context_acquire(blk_get_aio_context(s->common.blk)); if (ret < 0) { BlockErrorAction action; @@ -158,10 +163,11 @@ static void mirror_read_complete(void *opaque, int ret) } mirror_iteration_done(op, ret); - return; + } else { + blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov, + 0, mirror_write_complete, op); } - blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov, - 0, mirror_write_complete, op); + aio_context_release(blk_get_aio_context(s->common.blk)); } static inline void mirror_clip_sectors(MirrorBlockJob *s, diff --git a/block/nbd-client.c b/block/nbd-client.c index 06f1532805..0dc12c2d67 100644 --- a/block/nbd-client.c +++ b/block/nbd-client.c @@ -33,8 +33,9 @@ #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs)) #define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs)) -static void nbd_recv_coroutines_enter_all(NBDClientSession *s) +static void nbd_recv_coroutines_enter_all(BlockDriverState *bs) { + 
diff --git a/block/iscsi.c b/block/iscsi.c
index 1860f1bc91..2561be90de 100644
--- a/block/iscsi.c
+++ b/block/iscsi.c
@@ -165,8 +165,9 @@ iscsi_schedule_bh(IscsiAIOCB *acb)
 static void iscsi_co_generic_bh_cb(void *opaque)
 {
     struct IscsiTask *iTask = opaque;
+
     iTask->complete = 1;
-    qemu_coroutine_enter(iTask->co);
+    aio_co_wake(iTask->co);
 }
 
 static void iscsi_retry_timer_expired(void *opaque)
@@ -174,7 +175,7 @@ static void iscsi_retry_timer_expired(void *opaque)
     struct IscsiTask *iTask = opaque;
     iTask->complete = 1;
     if (iTask->co) {
-        qemu_coroutine_enter(iTask->co);
+        aio_co_wake(iTask->co);
     }
 }
 
@@ -394,8 +395,10 @@ iscsi_process_read(void *arg)
     IscsiLun *iscsilun = arg;
     struct iscsi_context *iscsi = iscsilun->iscsi;
 
+    aio_context_acquire(iscsilun->aio_context);
     iscsi_service(iscsi, POLLIN);
     iscsi_set_events(iscsilun);
+    aio_context_release(iscsilun->aio_context);
 }
 
 static void
@@ -404,8 +407,10 @@ iscsi_process_write(void *arg)
     IscsiLun *iscsilun = arg;
     struct iscsi_context *iscsi = iscsilun->iscsi;
 
+    aio_context_acquire(iscsilun->aio_context);
     iscsi_service(iscsi, POLLOUT);
     iscsi_set_events(iscsilun);
+    aio_context_release(iscsilun->aio_context);
 }
 
 static int64_t sector_lun2qemu(int64_t sector, IscsiLun *iscsilun)
@@ -1392,16 +1397,20 @@ static void iscsi_nop_timed_event(void *opaque)
 {
     IscsiLun *iscsilun = opaque;
 
+    aio_context_acquire(iscsilun->aio_context);
     if (iscsi_get_nops_in_flight(iscsilun->iscsi) >= MAX_NOP_FAILURES) {
         error_report("iSCSI: NOP timeout. Reconnecting...");
         iscsilun->request_timed_out = true;
     } else if (iscsi_nop_out_async(iscsilun->iscsi, NULL, NULL, 0, NULL) != 0) {
         error_report("iSCSI: failed to sent NOP-Out. Disabling NOP messages.");
-        return;
+        goto out;
     }
 
     timer_mod(iscsilun->nop_timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + NOP_INTERVAL);
     iscsi_set_events(iscsilun);
+
+out:
+    aio_context_release(iscsilun->aio_context);
 }
 
 static void iscsi_readcapacity_sync(IscsiLun *iscsilun, Error **errp)
diff --git a/block/linux-aio.c b/block/linux-aio.c
index 03ab741d37..88b8d55ec7 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -54,10 +54,10 @@ struct LinuxAioState {
     io_context_t ctx;
     EventNotifier e;
 
-    /* io queue for submit at batch */
+    /* io queue for submit at batch.  Protected by AioContext lock. */
     LaioQueue io_q;
 
-    /* I/O completion processing */
+    /* I/O completion processing.  Only runs in I/O thread.  */
    QEMUBH *completion_bh;
     int event_idx;
     int event_max;
@@ -100,7 +100,7 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
          * that!
          */
         if (!qemu_coroutine_entered(laiocb->co)) {
-            qemu_coroutine_enter(laiocb->co);
+            aio_co_wake(laiocb->co);
         }
     } else {
         laiocb->common.cb(laiocb->common.opaque, ret);
@@ -234,9 +234,12 @@ static void qemu_laio_process_completions(LinuxAioState *s)
 static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
 {
     qemu_laio_process_completions(s);
+
+    aio_context_acquire(s->aio_context);
     if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
         ioq_submit(s);
     }
+    aio_context_release(s->aio_context);
 }
 
 static void qemu_laio_completion_bh(void *opaque)
@@ -455,6 +458,7 @@ void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
 {
     aio_set_event_notifier(old_context, &s->e, false, NULL, NULL);
     qemu_bh_delete(s->completion_bh);
+    s->aio_context = NULL;
 }
 
 void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
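iscsi (and nfs further down) now set the complete flag before calling aio_co_wake(). Because the wakeup may be deferred through a bottom half, the flag rather than the control transfer is the completion signal, and the waiting side must tolerate spurious wakeups. A sketch, with MyTask and the my_* names invented for illustration:

    typedef struct MyTask {
        Coroutine *co;
        bool complete;
        int status;
    } MyTask;

    static void my_task_done(void *opaque, int status)
    {
        MyTask *task = opaque;

        task->status = status;
        task->complete = true;      /* publish state first... */
        if (task->co) {
            aio_co_wake(task->co);  /* ...then wake, possibly via BH */
        }
    }

    static int coroutine_fn my_task_wait(MyTask *task)
    {
        task->co = qemu_coroutine_self();
        while (!task->complete) {
            qemu_coroutine_yield(); /* recheck after every wakeup */
        }
        return task->status;
    }

The linux-aio hunks apply the same discipline from the other side: completion processing stays in the I/O thread as the new comments document, and only the requeue of pending submissions (ioq_submit) runs under the AioContext lock.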
diff --git a/block/mirror.c b/block/mirror.c
index 301ba9219a..698a54e50f 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -132,6 +132,8 @@ static void mirror_write_complete(void *opaque, int ret)
 {
     MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
+
+    aio_context_acquire(blk_get_aio_context(s->common.blk));
     if (ret < 0) {
         BlockErrorAction action;
 
@@ -142,12 +144,15 @@ static void mirror_write_complete(void *opaque, int ret)
         }
     }
     mirror_iteration_done(op, ret);
+    aio_context_release(blk_get_aio_context(s->common.blk));
 }
 
 static void mirror_read_complete(void *opaque, int ret)
 {
     MirrorOp *op = opaque;
     MirrorBlockJob *s = op->s;
+
+    aio_context_acquire(blk_get_aio_context(s->common.blk));
     if (ret < 0) {
         BlockErrorAction action;
 
@@ -158,10 +163,11 @@ static void mirror_read_complete(void *opaque, int ret)
         }
 
         mirror_iteration_done(op, ret);
-        return;
+    } else {
+        blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov,
+                        0, mirror_write_complete, op);
     }
-    blk_aio_pwritev(s->target, op->sector_num * BDRV_SECTOR_SIZE, &op->qiov,
-                    0, mirror_write_complete, op);
+    aio_context_release(blk_get_aio_context(s->common.blk));
 }
 
 static inline void mirror_clip_sectors(MirrorBlockJob *s,
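mirror_read_complete() is reshaped from early-return style into a single if/else chain so that the aio_context_release() added at the bottom is reached on every path, the same single-exit motive behind curl's goto out above. The shape, with MyJob and the helper names as placeholders:

    typedef struct MyJob {
        BlockBackend *blk;
    } MyJob;

    static void my_handle_error(MyJob *job, int ret) { /* ... */ }
    static void my_start_next_stage(MyJob *job)      { /* ... */ }

    /* AIO completion callback: runs with no lock held, so it takes
     * the backend's AioContext lock around all job-state updates. */
    static void my_read_complete(void *opaque, int ret)
    {
        MyJob *job = opaque;
        AioContext *ctx = blk_get_aio_context(job->blk);

        aio_context_acquire(ctx);
        if (ret < 0) {
            my_handle_error(job, ret);  /* no early return... */
        } else {
            my_start_next_stage(job);
        }
        aio_context_release(ctx);       /* ...so this always runs */
    }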
diff --git a/block/nbd-client.c b/block/nbd-client.c
index 06f1532805..0dc12c2d67 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -33,8 +33,9 @@
 #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
 #define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))
 
-static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
+static void nbd_recv_coroutines_enter_all(BlockDriverState *bs)
 {
+    NBDClientSession *s = nbd_get_client_session(bs);
     int i;
 
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
@@ -42,6 +43,7 @@ static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
             qemu_coroutine_enter(s->recv_coroutine[i]);
         }
     }
+    BDRV_POLL_WHILE(bs, s->read_reply_co);
 }
 
 static void nbd_teardown_connection(BlockDriverState *bs)
@@ -56,7 +58,7 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     qio_channel_shutdown(client->ioc,
                          QIO_CHANNEL_SHUTDOWN_BOTH,
                          NULL);
-    nbd_recv_coroutines_enter_all(client);
+    nbd_recv_coroutines_enter_all(bs);
 
     nbd_client_detach_aio_context(bs);
     object_unref(OBJECT(client->sioc));
@@ -65,54 +67,43 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     client->ioc = NULL;
 }
 
-static void nbd_reply_ready(void *opaque)
+static coroutine_fn void nbd_read_reply_entry(void *opaque)
 {
-    BlockDriverState *bs = opaque;
-    NBDClientSession *s = nbd_get_client_session(bs);
+    NBDClientSession *s = opaque;
     uint64_t i;
     int ret;
 
-    if (!s->ioc) { /* Already closed */
-        return;
-    }
-
-    if (s->reply.handle == 0) {
-        /* No reply already in flight.  Fetch a header.  It is possible
-         * that another thread has done the same thing in parallel, so
-         * the socket is not readable anymore.
-         */
+    for (;;) {
+        assert(s->reply.handle == 0);
         ret = nbd_receive_reply(s->ioc, &s->reply);
-        if (ret == -EAGAIN) {
-            return;
-        }
         if (ret < 0) {
-            s->reply.handle = 0;
-            goto fail;
+            break;
         }
-    }
 
-    /* There's no need for a mutex on the receive side, because the
-     * handler acts as a synchronization point and ensures that only
-     * one coroutine is called until the reply finishes. */
-    i = HANDLE_TO_INDEX(s, s->reply.handle);
-    if (i >= MAX_NBD_REQUESTS) {
-        goto fail;
-    }
+        /* There's no need for a mutex on the receive side, because the
+         * handler acts as a synchronization point and ensures that only
+         * one coroutine is called until the reply finishes.
+         */
+        i = HANDLE_TO_INDEX(s, s->reply.handle);
+        if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
+            break;
+        }
 
-    if (s->recv_coroutine[i]) {
-        qemu_coroutine_enter(s->recv_coroutine[i]);
-        return;
+        /* We're woken up by the recv_coroutine itself.  Note that there
+         * is no race between yielding and reentering read_reply_co.  This
+         * is because:
+         *
+         * - if recv_coroutine[i] runs on the same AioContext, it is only
+         *   entered after we yield
+         *
+         * - if recv_coroutine[i] runs on a different AioContext, reentering
+         *   read_reply_co happens through a bottom half, which can only
+         *   run after we yield.
+         */
+        aio_co_wake(s->recv_coroutine[i]);
+        qemu_coroutine_yield();
     }
-
-fail:
-    nbd_teardown_connection(bs);
-}
-
-static void nbd_restart_write(void *opaque)
-{
-    BlockDriverState *bs = opaque;
-
-    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
+    s->read_reply_co = NULL;
 }
 
 static int nbd_co_send_request(BlockDriverState *bs,
@@ -120,7 +111,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
                                QEMUIOVector *qiov)
 {
     NBDClientSession *s = nbd_get_client_session(bs);
-    AioContext *aio_context;
     int rc, ret, i;
 
     qemu_co_mutex_lock(&s->send_mutex);
@@ -141,11 +131,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
         return -EPIPE;
     }
 
-    s->send_coroutine = qemu_coroutine_self();
-    aio_context = bdrv_get_aio_context(bs);
-
-    aio_set_fd_handler(aio_context, s->sioc->fd, false,
-                       nbd_reply_ready, nbd_restart_write, NULL, bs);
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
@@ -160,9 +145,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
     } else {
         rc = nbd_send_request(s->ioc, request);
     }
-    aio_set_fd_handler(aio_context, s->sioc->fd, false,
-                       nbd_reply_ready, NULL, NULL, bs);
-    s->send_coroutine = NULL;
     qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
 }
@@ -174,8 +156,7 @@ static void nbd_co_receive_reply(NBDClientSession *s,
 {
     int ret;
 
-    /* Wait until we're woken up by the read handler.  TODO: perhaps
-     * peek at the next reply and avoid yielding if it's ours?  */
+    /* Wait until we're woken up by nbd_read_reply_entry.  */
     qemu_coroutine_yield();
     *reply = s->reply;
     if (reply->handle != request->handle ||
@@ -201,7 +182,7 @@ static void nbd_coroutine_start(NBDClientSession *s,
     /* Poor man semaphore.  The free_sema is locked when no other request
      * can be accepted, and unlocked after receiving one reply.  */
     if (s->in_flight == MAX_NBD_REQUESTS) {
-        qemu_co_queue_wait(&s->free_sema);
+        qemu_co_queue_wait(&s->free_sema, NULL);
         assert(s->in_flight < MAX_NBD_REQUESTS);
     }
     s->in_flight++;
@@ -209,13 +190,19 @@ static void nbd_coroutine_start(NBDClientSession *s,
     /* s->recv_coroutine[i] is set as soon as we get the send_lock.  */
 }
 
-static void nbd_coroutine_end(NBDClientSession *s,
+static void nbd_coroutine_end(BlockDriverState *bs,
                               NBDRequest *request)
 {
+    NBDClientSession *s = nbd_get_client_session(bs);
     int i = HANDLE_TO_INDEX(s, request->handle);
+
     s->recv_coroutine[i] = NULL;
-    if (s->in_flight-- == MAX_NBD_REQUESTS) {
-        qemu_co_queue_next(&s->free_sema);
+    s->in_flight--;
+    qemu_co_queue_next(&s->free_sema);
+
+    /* Kick the read_reply_co to get the next reply.  */
+    if (s->read_reply_co) {
+        aio_co_wake(s->read_reply_co);
     }
 }
 
@@ -241,7 +228,7 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
     } else {
         nbd_co_receive_reply(client, &request, &reply, qiov);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -271,7 +258,7 @@ int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -306,7 +293,7 @@ int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -331,7 +318,7 @@ int nbd_client_co_flush(BlockDriverState *bs)
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -357,23 +344,23 @@ int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
 void nbd_client_detach_aio_context(BlockDriverState *bs)
 {
-    aio_set_fd_handler(bdrv_get_aio_context(bs),
-                       nbd_get_client_session(bs)->sioc->fd,
-                       false, NULL, NULL, NULL, NULL);
+    NBDClientSession *client = nbd_get_client_session(bs);
+
+    qio_channel_detach_aio_context(QIO_CHANNEL(client->sioc));
 }
 
 void nbd_client_attach_aio_context(BlockDriverState *bs,
                                    AioContext *new_context)
 {
-    aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
-                       false, nbd_reply_ready, NULL, NULL, bs);
+    NBDClientSession *client = nbd_get_client_session(bs);
+
+    qio_channel_attach_aio_context(QIO_CHANNEL(client->sioc), new_context);
+    aio_co_schedule(new_context, client->read_reply_co);
 }
 
 void nbd_client_close(BlockDriverState *bs)
@@ -434,7 +421,7 @@ int nbd_client_init(BlockDriverState *bs,
     /* Now that we're connected, set the socket to be non-blocking and
      * kick the reply mechanism.  */
     qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
-
+    client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
     nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
 
     logout("Established connection with NBD server\n");
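This is the centrepiece of the series: the per-socket read handler nbd_reply_ready() with its EAGAIN bookkeeping is replaced by one long-lived read_reply_co coroutine that blocks inside nbd_receive_reply() and hands each reply to the request coroutine owning the matching slot. A skeleton of that dispatcher shape, where MySession, my_read_header() and MY_MAX_REQS are placeholders for the NBD client types:

    #define MY_MAX_REQS 16

    typedef struct MySession {
        Coroutine *read_reply_co;
        Coroutine *request_co[MY_MAX_REQS];
    } MySession;

    /* Hypothetical helper: read one reply header and return the
     * request slot it belongs to, or -1 on error.  Yields inside the
     * channel while the socket has no data, instead of returning
     * EAGAIN like the old fd-handler version. */
    static int coroutine_fn my_read_header(MySession *s);

    static void coroutine_fn my_read_reply_entry(void *opaque)
    {
        MySession *s = opaque;
        int i;

        for (;;) {
            i = my_read_header(s);
            if (i < 0 || !s->request_co[i]) {
                break;              /* connection error: tear down */
            }
            /* Wake the slot owner, then sleep until it has consumed
             * the reply and kicks us for the next header. */
            aio_co_wake(s->request_co[i]);
            qemu_coroutine_yield();
        }
        s->read_reply_co = NULL;    /* watched by BDRV_POLL_WHILE */
    }

The comment in the patch explains why the wake/yield pair cannot race: a same-context recv coroutine only runs after this coroutine yields, and a foreign-context one reenters read_reply_co through a bottom half, which likewise runs only after the yield.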
diff --git a/block/nbd-client.h b/block/nbd-client.h
index f8d6006849..8cdfc92e94 100644
--- a/block/nbd-client.h
+++ b/block/nbd-client.h
@@ -25,7 +25,7 @@ typedef struct NBDClientSession {
 
     CoMutex send_mutex;
     CoQueue free_sema;
-    Coroutine *send_coroutine;
+    Coroutine *read_reply_co;
     int in_flight;
 
     Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
diff --git a/block/nfs.c b/block/nfs.c
index 689eaa792e..08b43dd189 100644
--- a/block/nfs.c
+++ b/block/nfs.c
@@ -208,15 +208,21 @@ static void nfs_set_events(NFSClient *client)
 static void nfs_process_read(void *arg)
 {
     NFSClient *client = arg;
+
+    aio_context_acquire(client->aio_context);
     nfs_service(client->context, POLLIN);
     nfs_set_events(client);
+    aio_context_release(client->aio_context);
 }
 
 static void nfs_process_write(void *arg)
 {
     NFSClient *client = arg;
+
+    aio_context_acquire(client->aio_context);
     nfs_service(client->context, POLLOUT);
     nfs_set_events(client);
+    aio_context_release(client->aio_context);
 }
 
 static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
@@ -231,8 +237,9 @@ static void nfs_co_init_task(BlockDriverState *bs, NFSRPC *task)
 static void nfs_co_generic_bh_cb(void *opaque)
 {
     NFSRPC *task = opaque;
+
     task->complete = 1;
-    qemu_coroutine_enter(task->co);
+    aio_co_wake(task->co);
 }
 
 static void
diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index 928c1e298d..78c11d4948 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -932,9 +932,7 @@ static int handle_dependencies(BlockDriverState *bs, uint64_t guest_offset,
             if (bytes == 0) {
                 /* Wait for the dependency to complete. We need to recheck
                  * the free/allocated clusters when we continue. */
-                qemu_co_mutex_unlock(&s->lock);
-                qemu_co_queue_wait(&old_alloc->dependent_requests);
-                qemu_co_mutex_lock(&s->lock);
+                qemu_co_queue_wait(&old_alloc->dependent_requests, &s->lock);
                 return -EAGAIN;
             }
         }
diff --git a/block/qed-cluster.c b/block/qed-cluster.c
index c24e75616a..8f5da74c4d 100644
--- a/block/qed-cluster.c
+++ b/block/qed-cluster.c
@@ -83,6 +83,7 @@ static void qed_find_cluster_cb(void *opaque, int ret)
     unsigned int index;
     unsigned int n;
 
+    qed_acquire(s);
     if (ret) {
         goto out;
     }
@@ -109,6 +110,7 @@ static void qed_find_cluster_cb(void *opaque, int ret)
 
 out:
     find_cluster_cb->cb(find_cluster_cb->opaque, ret, offset, len);
+    qed_release(s);
     g_free(find_cluster_cb);
 }
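The CoMutex parameter added to qemu_co_queue_wait() makes "unlock, queue, sleep" atomic: the coroutine adds itself to the CoQueue before dropping the mutex, closing the window in the old unlock/wait/lock sequence where a wakeup from another context could slip by unseen. Passing NULL, as the other call sites in this series do, keeps the old no-lock behaviour. A sketch of the paired producer and consumer, with MyAlloc and the my_* names invented here:

    typedef struct MyAlloc {
        CoQueue dependent_requests;
    } MyAlloc;

    static CoMutex my_lock;

    /* Called with my_lock held; returns with my_lock held again. */
    static void coroutine_fn my_wait_for_dependency(MyAlloc *old)
    {
        /* The queue insertion happens before the mutex is released,
         * so a concurrent restart cannot be missed. */
        qemu_co_queue_wait(&old->dependent_requests, &my_lock);
    }

    static void coroutine_fn my_finish_allocation(MyAlloc *old)
    {
        qemu_co_mutex_lock(&my_lock);
        /* ... update allocation state ... */
        qemu_co_queue_restart_all(&old->dependent_requests);
        qemu_co_mutex_unlock(&my_lock);
    }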
diff --git a/block/qed-table.c b/block/qed-table.c
index ed443e2b70..b12c298a8a 100644
--- a/block/qed-table.c
+++ b/block/qed-table.c
@@ -31,6 +31,7 @@ static void qed_read_table_cb(void *opaque, int ret)
 {
     QEDReadTableCB *read_table_cb = opaque;
     QEDTable *table = read_table_cb->table;
+    BDRVQEDState *s = read_table_cb->s;
     int noffsets = read_table_cb->qiov.size / sizeof(uint64_t);
     int i;
 
@@ -40,13 +41,15 @@ static void qed_read_table_cb(void *opaque, int ret)
     }
 
     /* Byteswap offsets */
+    qed_acquire(s);
     for (i = 0; i < noffsets; i++) {
         table->offsets[i] = le64_to_cpu(table->offsets[i]);
     }
+    qed_release(s);
 
 out:
     /* Completion */
-    trace_qed_read_table_cb(read_table_cb->s, read_table_cb->table, ret);
+    trace_qed_read_table_cb(s, read_table_cb->table, ret);
     gencb_complete(&read_table_cb->gencb, ret);
 }
 
@@ -84,8 +87,9 @@ typedef struct {
 static void qed_write_table_cb(void *opaque, int ret)
 {
     QEDWriteTableCB *write_table_cb = opaque;
+    BDRVQEDState *s = write_table_cb->s;
 
-    trace_qed_write_table_cb(write_table_cb->s,
+    trace_qed_write_table_cb(s,
                              write_table_cb->orig_table,
                              write_table_cb->flush,
                              ret);
@@ -97,8 +101,10 @@ static void qed_write_table_cb(void *opaque, int ret)
     if (write_table_cb->flush) {
         /* We still need to flush first */
         write_table_cb->flush = false;
+        qed_acquire(s);
         bdrv_aio_flush(write_table_cb->s->bs, qed_write_table_cb,
                        write_table_cb);
+        qed_release(s);
         return;
     }
 
@@ -213,6 +219,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret)
     CachedL2Table *l2_table = request->l2_table;
     uint64_t l2_offset = read_l2_table_cb->l2_offset;
 
+    qed_acquire(s);
     if (ret) {
         /* can't trust loaded L2 table anymore */
         qed_unref_l2_cache_entry(l2_table);
@@ -228,6 +235,7 @@ static void qed_read_l2_table_cb(void *opaque, int ret)
         request->l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset);
         assert(request->l2_table != NULL);
     }
+    qed_release(s);
 
     gencb_complete(&read_l2_table_cb->gencb, ret);
 }
diff --git a/block/qed.c b/block/qed.c
index 1a7ef0a9ce..0b62c7799e 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -273,7 +273,19 @@ static CachedL2Table *qed_new_l2_table(BDRVQEDState *s)
     return l2_table;
 }
 
-static void qed_aio_next_io(void *opaque, int ret);
+static void qed_aio_next_io(QEDAIOCB *acb, int ret);
+
+static void qed_aio_start_io(QEDAIOCB *acb)
+{
+    qed_aio_next_io(acb, 0);
+}
+
+static void qed_aio_next_io_cb(void *opaque, int ret)
+{
+    QEDAIOCB *acb = opaque;
+
+    qed_aio_next_io(acb, ret);
+}
 
 static void qed_plug_allocating_write_reqs(BDRVQEDState *s)
 {
@@ -292,7 +304,7 @@ static void qed_unplug_allocating_write_reqs(BDRVQEDState *s)
 
     acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs);
     if (acb) {
-        qed_aio_next_io(acb, 0);
+        qed_aio_start_io(acb);
     }
 }
 
@@ -333,10 +345,22 @@ static void qed_need_check_timer_cb(void *opaque)
 
     trace_qed_need_check_timer_cb(s);
 
+    qed_acquire(s);
     qed_plug_allocating_write_reqs(s);
 
     /* Ensure writes are on disk before clearing flag */
     bdrv_aio_flush(s->bs->file->bs, qed_clear_need_check, s);
+    qed_release(s);
+}
+
+void qed_acquire(BDRVQEDState *s)
+{
+    aio_context_acquire(bdrv_get_aio_context(s->bs));
+}
+
+void qed_release(BDRVQEDState *s)
+{
+    aio_context_release(bdrv_get_aio_context(s->bs));
 }
 
 static void qed_start_need_check_timer(BDRVQEDState *s)
@@ -721,7 +745,7 @@ static void qed_is_allocated_cb(void *opaque, int ret, uint64_t offset, size_t l
     }
 
     if (cb->co) {
-        qemu_coroutine_enter(cb->co);
+        aio_co_wake(cb->co);
     }
 }
 
@@ -918,6 +942,7 @@ static void qed_update_l2_table(BDRVQEDState *s, QEDTable *table, int index,
 static void qed_aio_complete_bh(void *opaque)
 {
     QEDAIOCB *acb = opaque;
+    BDRVQEDState *s = acb_to_s(acb);
     BlockCompletionFunc *cb = acb->common.cb;
     void *user_opaque = acb->common.opaque;
     int ret = acb->bh_ret;
@@ -925,7 +950,9 @@ static void qed_aio_complete_bh(void *opaque)
     qemu_aio_unref(acb);
 
     /* Invoke callback */
+    qed_acquire(s);
     cb(user_opaque, ret);
+    qed_release(s);
 }
 
 static void qed_aio_complete(QEDAIOCB *acb, int ret)
@@ -959,7 +986,7 @@ static void qed_aio_complete(QEDAIOCB *acb, int ret)
         QSIMPLEQ_REMOVE_HEAD(&s->allocating_write_reqs, next);
         acb = QSIMPLEQ_FIRST(&s->allocating_write_reqs);
         if (acb) {
-            qed_aio_next_io(acb, 0);
+            qed_aio_start_io(acb);
         } else if (s->header.features & QED_F_NEED_CHECK) {
             qed_start_need_check_timer(s);
         }
@@ -984,7 +1011,7 @@ static void qed_commit_l2_update(void *opaque, int ret)
     acb->request.l2_table = qed_find_l2_cache_entry(&s->l2_cache, l2_offset);
     assert(acb->request.l2_table != NULL);
 
-    qed_aio_next_io(opaque, ret);
+    qed_aio_next_io(acb, ret);
 }
 
 /**
@@ -1032,11 +1059,11 @@ static void qed_aio_write_l2_update(QEDAIOCB *acb, int ret, uint64_t offset)
     if (need_alloc) {
         /* Write out the whole new L2 table */
         qed_write_l2_table(s, &acb->request, 0, s->table_nelems, true,
-                            qed_aio_write_l1_update, acb);
+                           qed_aio_write_l1_update, acb);
     } else {
         /* Write out only the updated part of the L2 table */
         qed_write_l2_table(s, &acb->request, index, acb->cur_nclusters, false,
-                            qed_aio_next_io, acb);
+                           qed_aio_next_io_cb, acb);
     }
     return;
 
@@ -1088,7 +1115,7 @@ static void qed_aio_write_main(void *opaque, int ret)
     }
 
     if (acb->find_cluster_ret == QED_CLUSTER_FOUND) {
-        next_fn = qed_aio_next_io;
+        next_fn = qed_aio_next_io_cb;
     } else {
         if (s->bs->backing) {
             next_fn = qed_aio_write_flush_before_l2_update;
@@ -1201,7 +1228,7 @@ static void qed_aio_write_alloc(QEDAIOCB *acb, size_t len)
     if (acb->flags & QED_AIOCB_ZERO) {
         /* Skip ahead if the clusters are already zero */
         if (acb->find_cluster_ret == QED_CLUSTER_ZERO) {
-            qed_aio_next_io(acb, 0);
+            qed_aio_start_io(acb);
             return;
         }
 
@@ -1321,18 +1348,18 @@ static void qed_aio_read_data(void *opaque, int ret,
     /* Handle zero cluster and backing file reads */
     if (ret == QED_CLUSTER_ZERO) {
         qemu_iovec_memset(&acb->cur_qiov, 0, 0, acb->cur_qiov.size);
-        qed_aio_next_io(acb, 0);
+        qed_aio_start_io(acb);
         return;
     } else if (ret != QED_CLUSTER_FOUND) {
         qed_read_backing_file(s, acb->cur_pos, &acb->cur_qiov,
-                              &acb->backing_qiov, qed_aio_next_io, acb);
+                              &acb->backing_qiov, qed_aio_next_io_cb, acb);
         return;
     }
 
     BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO);
     bdrv_aio_readv(bs->file, offset / BDRV_SECTOR_SIZE,
                    &acb->cur_qiov, acb->cur_qiov.size / BDRV_SECTOR_SIZE,
-                   qed_aio_next_io, acb);
+                   qed_aio_next_io_cb, acb);
     return;
 
 err:
@@ -1342,9 +1369,8 @@ err:
 /**
  * Begin next I/O or complete the request
  */
-static void qed_aio_next_io(void *opaque, int ret)
+static void qed_aio_next_io(QEDAIOCB *acb, int ret)
 {
-    QEDAIOCB *acb = opaque;
     BDRVQEDState *s = acb_to_s(acb);
     QEDFindClusterFunc *io_fn = (acb->flags & QED_AIOCB_WRITE) ?
                                 qed_aio_write_data : qed_aio_read_data;
@@ -1400,7 +1426,7 @@ static BlockAIOCB *qed_aio_setup(BlockDriverState *bs,
     qemu_iovec_init(&acb->cur_qiov, qiov->niov);
 
     /* Start request */
-    qed_aio_next_io(acb, 0);
+    qed_aio_start_io(acb);
     return &acb->common;
 }
 
@@ -1436,7 +1462,7 @@ static void coroutine_fn qed_co_pwrite_zeroes_cb(void *opaque, int ret)
     cb->done = true;
     cb->ret = ret;
     if (cb->co) {
-        qemu_coroutine_enter(cb->co);
+        aio_co_wake(cb->co);
     }
 }
diff --git a/block/qed.h b/block/qed.h
index 9676ab9479..ce8c314089 100644
--- a/block/qed.h
+++ b/block/qed.h
@@ -198,6 +198,9 @@ enum {
  */
 typedef void QEDFindClusterFunc(void *opaque, int ret, uint64_t offset, size_t len);
 
+void qed_acquire(BDRVQEDState *s);
+void qed_release(BDRVQEDState *s);
+
 /**
  * Generic callback for chaining async callbacks
  */
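Beyond the qed_acquire()/qed_release() wrappers, the qed patch retypes qed_aio_next_io() to take a QEDAIOCB * directly and adds two small entry points: qed_aio_start_io() for kicking a fresh request and qed_aio_next_io_cb() as the void * adapter that is still handed out as a BlockCompletionFunc. The idiom keeps internal calls type-checked while callback slots get the generic signature; the my_* names below are illustrative:

    typedef struct MyAIOCB MyAIOCB;

    /* Typed core: internal callers get compile-time checking. */
    static void my_next_io(MyAIOCB *acb, int ret)
    {
        /* ... advance the request state machine ... */
    }

    static void my_start_io(MyAIOCB *acb)
    {
        my_next_io(acb, 0);
    }

    /* Only this shim is registered as a completion callback. */
    static void my_next_io_cb(void *opaque, int ret)
    {
        MyAIOCB *acb = opaque;

        my_next_io(acb, ret);
    }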
diff --git a/block/sheepdog.c b/block/sheepdog.c
index f757157cea..860ba61502 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -486,7 +486,7 @@ static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb)
 retry:
     QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
         if (AIOCBOverlapping(acb, cb)) {
-            qemu_co_queue_wait(&s->overlapping_queue);
+            qemu_co_queue_wait(&s->overlapping_queue, NULL);
             goto retry;
         }
     }
@@ -575,13 +575,6 @@ static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
     return ret;
 }
 
-static void restart_co_req(void *opaque)
-{
-    Coroutine *co = opaque;
-
-    qemu_coroutine_enter(co);
-}
-
 typedef struct SheepdogReqCo {
     int sockfd;
     BlockDriverState *bs;
@@ -592,12 +585,19 @@ typedef struct SheepdogReqCo {
     unsigned int *rlen;
     int ret;
     bool finished;
+    Coroutine *co;
 } SheepdogReqCo;
 
+static void restart_co_req(void *opaque)
+{
+    SheepdogReqCo *srco = opaque;
+
+    aio_co_wake(srco->co);
+}
+
 static coroutine_fn void do_co_req(void *opaque)
 {
     int ret;
-    Coroutine *co;
     SheepdogReqCo *srco = opaque;
     int sockfd = srco->sockfd;
     SheepdogReq *hdr = srco->hdr;
@@ -605,9 +605,9 @@ static coroutine_fn void do_co_req(void *opaque)
     unsigned int *wlen = srco->wlen;
     unsigned int *rlen = srco->rlen;
 
-    co = qemu_coroutine_self();
+    srco->co = qemu_coroutine_self();
     aio_set_fd_handler(srco->aio_context, sockfd, false,
-                       NULL, restart_co_req, NULL, co);
+                       NULL, restart_co_req, NULL, srco);
 
     ret = send_co_req(sockfd, hdr, data, wlen);
     if (ret < 0) {
@@ -615,7 +615,7 @@ static coroutine_fn void do_co_req(void *opaque)
     }
 
     aio_set_fd_handler(srco->aio_context, sockfd, false,
-                       restart_co_req, NULL, NULL, co);
+                       restart_co_req, NULL, NULL, srco);
 
     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
     if (ret != sizeof(*hdr)) {
@@ -643,6 +643,7 @@ out:
     aio_set_fd_handler(srco->aio_context, sockfd, false,
                        NULL, NULL, NULL, NULL);
 
+    srco->co = NULL;
     srco->ret = ret;
     srco->finished = true;
     if (srco->bs) {
@@ -866,7 +867,7 @@ static void coroutine_fn aio_read_response(void *opaque)
          * We've finished all requests which belong to the AIOCB, so
          * we can switch back to sd_co_readv/writev now.
          */
-        qemu_coroutine_enter(acb->coroutine);
+        aio_co_wake(acb->coroutine);
     }
 
     return;
@@ -883,14 +884,14 @@ static void co_read_response(void *opaque)
         s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
     }
 
-    qemu_coroutine_enter(s->co_recv);
+    aio_co_wake(s->co_recv);
 }
 
 static void co_write_request(void *opaque)
 {
     BDRVSheepdogState *s = opaque;
 
-    qemu_coroutine_enter(s->co_send);
+    aio_co_wake(s->co_send);
 }
 
 /*
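sheepdog moves the coroutine pointer into SheepdogReqCo and passes that struct, rather than the bare Coroutine *, as the fd-handler opaque; the handler then wakes whatever coroutine the request currently records, and the field is cleared when the request finishes. The pattern, with MyReqCo standing in:

    typedef struct MyReqCo {
        int sockfd;
        AioContext *ctx;
        Coroutine *co;              /* NULL once the request is done */
    } MyReqCo;

    static void my_restart_req(void *opaque)
    {
        MyReqCo *srco = opaque;

        aio_co_wake(srco->co);
    }

    static void coroutine_fn my_do_req(MyReqCo *srco)
    {
        srco->co = qemu_coroutine_self();
        aio_set_fd_handler(srco->ctx, srco->sockfd, false,
                           my_restart_req, NULL, NULL, srco);
        /* ... send request, yield until my_restart_req fires ... */
        qemu_coroutine_yield();
        aio_set_fd_handler(srco->ctx, srco->sockfd, false,
                           NULL, NULL, NULL, NULL);
        srco->co = NULL;
    }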
diff --git a/block/ssh.c b/block/ssh.c
index e0edf20f78..835932e6a4 100644
--- a/block/ssh.c
+++ b/block/ssh.c
@@ -889,10 +889,14 @@ static void restart_coroutine(void *opaque)
 
     DPRINTF("co=%p", co);
 
-    qemu_coroutine_enter(co);
+    aio_co_wake(co);
 }
 
-static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
+/* A non-blocking call returned EAGAIN, so yield, ensuring the
+ * handlers are set up so that we'll be rescheduled when there is an
+ * interesting event on the socket.
+ */
+static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
 {
     int r;
     IOHandler *rd_handler = NULL, *wr_handler = NULL;
@@ -912,25 +916,10 @@ static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
 
     aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
                        false, rd_handler, wr_handler, NULL, co);
-}
-
-static coroutine_fn void clear_fd_handler(BDRVSSHState *s,
-                                          BlockDriverState *bs)
-{
-    DPRINTF("s->sock=%d", s->sock);
-    aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
-                       false, NULL, NULL, NULL, NULL);
-}
-
-/* A non-blocking call returned EAGAIN, so yield, ensuring the
- * handlers are set up so that we'll be rescheduled when there is an
- * interesting event on the socket.
- */
-static coroutine_fn void co_yield(BDRVSSHState *s, BlockDriverState *bs)
-{
-    set_fd_handler(s, bs);
     qemu_coroutine_yield();
-    clear_fd_handler(s, bs);
+    DPRINTF("s->sock=%d - back", s->sock);
+    aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, false,
+                       NULL, NULL, NULL, NULL);
 }
 
 /* SFTP has a function `libssh2_sftp_seek64' which seeks to a position
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index 17b2efb7c7..b73e7a800b 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -326,7 +326,7 @@ void coroutine_fn throttle_group_co_io_limits_intercept(BlockBackend *blk,
     if (must_wait || blkp->pending_reqs[is_write]) {
         blkp->pending_reqs[is_write]++;
         qemu_mutex_unlock(&tg->lock);
-        qemu_co_queue_wait(&blkp->throttled_reqs[is_write]);
+        qemu_co_queue_wait(&blkp->throttled_reqs[is_write], NULL);
         qemu_mutex_lock(&tg->lock);
         blkp->pending_reqs[is_write]--;
     }
@@ -416,7 +416,9 @@ static void timer_cb(BlockBackend *blk, bool is_write)
     qemu_mutex_unlock(&tg->lock);
 
     /* Run the request that was waiting for this timer */
+    aio_context_acquire(blk_get_aio_context(blk));
     empty_queue = !qemu_co_enter_next(&blkp->throttled_reqs[is_write]);
+    aio_context_release(blk_get_aio_context(blk));
 
     /* If the request queue was empty then we have to take care of
      * scheduling the next one */
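ssh folds set_fd_handler()/clear_fd_handler() into co_yield() itself (register handlers, yield, unregister on resume), and throttle-groups wraps qemu_co_enter_next() in the backend's AioContext lock because timer callbacks, like fd handlers, run bare from the event loop. A sketch of the throttling side, with my_timer_cb() as an invented name:

    /* Restart one throttled coroutine from a timer callback. */
    static void my_timer_cb(BlockBackend *blk, CoQueue *queue)
    {
        AioContext *ctx = blk_get_aio_context(blk);
        bool woke;

        aio_context_acquire(ctx);
        woke = qemu_co_enter_next(queue);   /* false if queue empty */
        aio_context_release(ctx);

        if (!woke) {
            /* nothing was waiting; leave scheduling of the next
             * timer to the rest of the throttle group */
        }
    }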
diff --git a/block/win32-aio.c b/block/win32-aio.c
index 8cdf73b00d..3be8f458fa 100644
--- a/block/win32-aio.c
+++ b/block/win32-aio.c
@@ -41,7 +41,7 @@ struct QEMUWin32AIOState {
     HANDLE hIOCP;
     EventNotifier e;
     int count;
-    bool is_aio_context_attached;
+    AioContext *aio_ctx;
 };
 
 typedef struct QEMUWin32AIOCB {
@@ -87,7 +87,6 @@ static void win32_aio_process_completion(QEMUWin32AIOState *s,
         qemu_vfree(waiocb->buf);
     }
 
-
     waiocb->common.cb(waiocb->common.opaque, ret);
     qemu_aio_unref(waiocb);
 }
@@ -176,13 +175,13 @@ void win32_aio_detach_aio_context(QEMUWin32AIOState *aio,
                                   AioContext *old_context)
 {
     aio_set_event_notifier(old_context, &aio->e, false, NULL, NULL);
-    aio->is_aio_context_attached = false;
+    aio->aio_ctx = NULL;
 }
 
 void win32_aio_attach_aio_context(QEMUWin32AIOState *aio,
                                   AioContext *new_context)
 {
-    aio->is_aio_context_attached = true;
+    aio->aio_ctx = new_context;
     aio_set_event_notifier(new_context, &aio->e, false,
                            win32_aio_completion_cb, NULL);
 }
@@ -212,7 +211,7 @@ out_free_state:
 
 void win32_aio_cleanup(QEMUWin32AIOState *aio)
 {
-    assert(!aio->is_aio_context_attached);
+    assert(!aio->aio_ctx);
     CloseHandle(aio->hIOCP);
     event_notifier_cleanup(&aio->e);
     g_free(aio);
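Replacing the is_aio_context_attached flag with the AioContext pointer itself costs nothing and gives detach and cleanup a checkable invariant. The lifecycle, with MyAioBackend as a stand-in for QEMUWin32AIOState:

    typedef struct MyAioBackend {
        EventNotifier e;
        AioContext *aio_ctx;        /* NULL whenever detached */
    } MyAioBackend;

    static void my_attach(MyAioBackend *b, AioContext *ctx)
    {
        b->aio_ctx = ctx;
        /* ... register b->e with ctx ... */
    }

    static void my_detach(MyAioBackend *b, AioContext *old_ctx)
    {
        /* ... unregister b->e from old_ctx ... */
        b->aio_ctx = NULL;
    }

    static void my_cleanup(MyAioBackend *b)
    {
        /* must be detached first; the pointer is the assertion */
        assert(!b->aio_ctx);
    }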