diff options
author | Peter Xu <peterx@redhat.com> | 2018-10-09 14:27:13 +0800 |
---|---|---|
committer | Markus Armbruster <armbru@redhat.com> | 2018-12-12 09:55:57 +0100 |
commit | 9ab84470ffc2781df3acd4607bc6d2ae64d6d7e3 (patch) | |
tree | 0096dc70b9747e76ee7d144f057fcabe626ab235 /monitor.c | |
parent | 34f1f3e06d8824740d3bc41556f8300f0fb463cf (diff) |
monitor: Suspend monitor instead dropping commands
When a QMP client sends in-band commands more quickly that we can
process them, we can either queue them without limit (QUEUE), drop
commands when the queue is full (DROP), or suspend receiving commands
when the queue is full (SUSPEND). None of them is ideal:
* QUEUE lets a misbehaving client make QEMU eat memory without bounds.
Not such a hot idea.
* With DROP, the client has to cope with dropped in-band commands. To
inform the client, we send a COMMAND_DROPPED event then. The event is
flawed by design in two ways: it's ambiguous (see commit d621cfe0a17),
and it brings back the "eat memory without bounds" problem.
* With SUSPEND, the client has to manage the flow of in-band commands to
keep the monitor available for out-of-band commands.
We currently DROP. Switch to SUSPEND.
Managing the flow of in-band commands to keep the monitor available for
out-of-band commands isn't really hard: just count the number of
"outstanding" in-band commands (commands sent minus replies received),
and if it exceeds the limit, hold back additional ones until it drops
below the limit again.
Note that we need to be careful pairing the suspend with a resume, or
else the monitor will hang, possibly forever. And here since we need to
make sure both:
(1) popping request from the req queue, and
(2) reading length of the req queue
will be in the same critical section, we let the pop function take the
corresponding queue lock when there is a request, then we release the
lock from the caller.
Reviewed-by: Marc-André Lureau <marcandre.lureau@redhat.com>
Signed-off-by: Peter Xu <peterx@redhat.com>
Message-Id: <20181009062718.1914-2-peterx@redhat.com>
Signed-off-by: Markus Armbruster <armbru@redhat.com>
Diffstat (limited to 'monitor.c')
-rw-r--r-- | monitor.c | 52 |
1 files changed, 23 insertions, 29 deletions
@@ -4110,8 +4110,12 @@ static void monitor_qmp_dispatch(Monitor *mon, QObject *req, QObject *id) * processing commands only on a very busy monitor. To achieve that, * when we process one request on a specific monitor, we put that * monitor to the end of mon_list queue. + * + * Note: if the function returned with non-NULL, then the caller will + * be with mon->qmp.qmp_queue_lock held, and the caller is responsible + * to release it. */ -static QMPRequest *monitor_qmp_requests_pop_any(void) +static QMPRequest *monitor_qmp_requests_pop_any_with_lock(void) { QMPRequest *req_obj = NULL; Monitor *mon; @@ -4121,10 +4125,11 @@ static QMPRequest *monitor_qmp_requests_pop_any(void) QTAILQ_FOREACH(mon, &mon_list, entry) { qemu_mutex_lock(&mon->qmp.qmp_queue_lock); req_obj = g_queue_pop_head(mon->qmp.qmp_requests); - qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); if (req_obj) { + /* With the lock of corresponding queue held */ break; } + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); } if (req_obj) { @@ -4143,30 +4148,34 @@ static QMPRequest *monitor_qmp_requests_pop_any(void) static void monitor_qmp_bh_dispatcher(void *data) { - QMPRequest *req_obj = monitor_qmp_requests_pop_any(); + QMPRequest *req_obj = monitor_qmp_requests_pop_any_with_lock(); QDict *rsp; bool need_resume; + Monitor *mon; if (!req_obj) { return; } + mon = req_obj->mon; /* qmp_oob_enabled() might change after "qmp_capabilities" */ - need_resume = !qmp_oob_enabled(req_obj->mon); + need_resume = !qmp_oob_enabled(mon) || + mon->qmp.qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1; + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); if (req_obj->req) { trace_monitor_qmp_cmd_in_band(qobject_get_try_str(req_obj->id) ?: ""); - monitor_qmp_dispatch(req_obj->mon, req_obj->req, req_obj->id); + monitor_qmp_dispatch(mon, req_obj->req, req_obj->id); } else { assert(req_obj->err); rsp = qmp_error_response(req_obj->err); req_obj->err = NULL; - monitor_qmp_respond(req_obj->mon, rsp, NULL); + monitor_qmp_respond(mon, rsp, NULL); qobject_unref(rsp); } if (need_resume) { /* Pairs with the monitor_suspend() in handle_qmp_command() */ - monitor_resume(req_obj->mon); + monitor_resume(mon); } qmp_request_free(req_obj); @@ -4174,8 +4183,6 @@ static void monitor_qmp_bh_dispatcher(void *data) qemu_bh_schedule(qmp_dispatcher_bh); } -#define QMP_REQ_QUEUE_LEN_MAX (8) - static void handle_qmp_command(void *opaque, QObject *req, Error *err) { Monitor *mon = opaque; @@ -4217,28 +4224,14 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err) qemu_mutex_lock(&mon->qmp.qmp_queue_lock); /* - * If OOB is not enabled on the current monitor, we'll emulate the - * old behavior that we won't process the current monitor any more - * until it has responded. This helps make sure that as long as - * OOB is not enabled, the server will never drop any command. + * Suspend the monitor when we can't queue more requests after + * this one. Dequeuing in monitor_qmp_bh_dispatcher() will resume + * it. Note that when OOB is disabled, we queue at most one + * command, for backward compatibility. */ - if (!qmp_oob_enabled(mon)) { + if (!qmp_oob_enabled(mon) || + mon->qmp.qmp_requests->length == QMP_REQ_QUEUE_LEN_MAX - 1) { monitor_suspend(mon); - } else { - /* Drop the request if queue is full. */ - if (mon->qmp.qmp_requests->length >= QMP_REQ_QUEUE_LEN_MAX) { - qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); - /* - * FIXME @id's scope is just @mon, and broadcasting it is - * wrong. If another monitor's client has a command with - * the same ID in flight, the event will incorrectly claim - * that command was dropped. - */ - qapi_event_send_command_dropped(id, - COMMAND_DROP_REASON_QUEUE_FULL); - qmp_request_free(req_obj); - return; - } } /* @@ -4246,6 +4239,7 @@ static void handle_qmp_command(void *opaque, QObject *req, Error *err) * handled in time order. Ownership for req_obj, req, id, * etc. will be delivered to the handler side. */ + assert(mon->qmp.qmp_requests->length < QMP_REQ_QUEUE_LEN_MAX); g_queue_push_tail(mon->qmp.qmp_requests, req_obj); qemu_mutex_unlock(&mon->qmp.qmp_queue_lock); |