aboutsummaryrefslogtreecommitdiff
path: root/block/throttle-groups.c
diff options
context:
space:
mode:
Diffstat (limited to 'block/throttle-groups.c')
-rw-r--r--block/throttle-groups.c214
1 files changed, 210 insertions, 4 deletions
diff --git a/block/throttle-groups.c b/block/throttle-groups.c
index 352077f98c..da8c70c4a6 100644
--- a/block/throttle-groups.c
+++ b/block/throttle-groups.c
@@ -23,6 +23,9 @@
*/
#include "block/throttle-groups.h"
+#include "qemu/queue.h"
+#include "qemu/thread.h"
+#include "sysemu/qtest.h"
/* The ThrottleGroup structure (with its ThrottleState) is shared
* among different BlockDriverState and it's independent from
@@ -160,6 +163,153 @@ static BlockDriverState *throttle_group_next_bs(BlockDriverState *bs)
return next;
}
+/* Return the next BlockDriverState in the round-robin sequence with
+ * pending I/O requests.
+ *
+ * This assumes that tg->lock is held.
+ *
+ * @bs: the current BlockDriverState
+ * @is_write: the type of operation (read/write)
+ * @ret: the next BlockDriverState with pending requests, or bs
+ * if there is none.
+ */
+static BlockDriverState *next_throttle_token(BlockDriverState *bs,
+ bool is_write)
+{
+ ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+ BlockDriverState *token, *start;
+
+ start = token = tg->tokens[is_write];
+
+ /* get next bs round in round robin style */
+ token = throttle_group_next_bs(token);
+ while (token != start && !token->pending_reqs[is_write]) {
+ token = throttle_group_next_bs(token);
+ }
+
+ /* If no IO are queued for scheduling on the next round robin token
+ * then decide the token is the current bs because chances are
+ * the current bs get the current request queued.
+ */
+ if (token == start && !token->pending_reqs[is_write]) {
+ token = bs;
+ }
+
+ return token;
+}
+
+/* Check if the next I/O request for a BlockDriverState needs to be
+ * throttled or not. If there's no timer set in this group, set one
+ * and update the token accordingly.
+ *
+ * This assumes that tg->lock is held.
+ *
+ * @bs: the current BlockDriverState
+ * @is_write: the type of operation (read/write)
+ * @ret: whether the I/O request needs to be throttled or not
+ */
+static bool throttle_group_schedule_timer(BlockDriverState *bs,
+ bool is_write)
+{
+ ThrottleState *ts = bs->throttle_state;
+ ThrottleTimers *tt = &bs->throttle_timers;
+ ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
+ bool must_wait;
+
+ /* Check if any of the timers in this group is already armed */
+ if (tg->any_timer_armed[is_write]) {
+ return true;
+ }
+
+ must_wait = throttle_schedule_timer(ts, tt, is_write);
+
+ /* If a timer just got armed, set bs as the current token */
+ if (must_wait) {
+ tg->tokens[is_write] = bs;
+ tg->any_timer_armed[is_write] = true;
+ }
+
+ return must_wait;
+}
+
+/* Look for the next pending I/O request and schedule it.
+ *
+ * This assumes that tg->lock is held.
+ *
+ * @bs: the current BlockDriverState
+ * @is_write: the type of operation (read/write)
+ */
+static void schedule_next_request(BlockDriverState *bs, bool is_write)
+{
+ ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+ bool must_wait;
+ BlockDriverState *token;
+
+ /* Check if there's any pending request to schedule next */
+ token = next_throttle_token(bs, is_write);
+ if (!token->pending_reqs[is_write]) {
+ return;
+ }
+
+ /* Set a timer for the request if it needs to be throttled */
+ must_wait = throttle_group_schedule_timer(token, is_write);
+
+ /* If it doesn't have to wait, queue it for immediate execution */
+ if (!must_wait) {
+ /* Give preference to requests from the current bs */
+ if (qemu_in_coroutine() &&
+ qemu_co_queue_next(&bs->throttled_reqs[is_write])) {
+ token = bs;
+ } else {
+ ThrottleTimers *tt = &token->throttle_timers;
+ int64_t now = qemu_clock_get_ns(tt->clock_type);
+ timer_mod(tt->timers[is_write], now + 1);
+ tg->any_timer_armed[is_write] = true;
+ }
+ tg->tokens[is_write] = token;
+ }
+}
+
+/* Check if an I/O request needs to be throttled, wait and set a timer
+ * if necessary, and schedule the next request using a round robin
+ * algorithm.
+ *
+ * @bs: the current BlockDriverState
+ * @bytes: the number of bytes for this I/O
+ * @is_write: the type of operation (read/write)
+ */
+void coroutine_fn throttle_group_co_io_limits_intercept(BlockDriverState *bs,
+ unsigned int bytes,
+ bool is_write)
+{
+ bool must_wait;
+ BlockDriverState *token;
+
+ ThrottleGroup *tg = container_of(bs->throttle_state, ThrottleGroup, ts);
+ qemu_mutex_lock(&tg->lock);
+
+ /* First we check if this I/O has to be throttled. */
+ token = next_throttle_token(bs, is_write);
+ must_wait = throttle_group_schedule_timer(token, is_write);
+
+ /* Wait if there's a timer set or queued requests of this type */
+ if (must_wait || bs->pending_reqs[is_write]) {
+ bs->pending_reqs[is_write]++;
+ qemu_mutex_unlock(&tg->lock);
+ qemu_co_queue_wait(&bs->throttled_reqs[is_write]);
+ qemu_mutex_lock(&tg->lock);
+ bs->pending_reqs[is_write]--;
+ }
+
+ /* The I/O will be executed, so do the accounting */
+ throttle_account(bs->throttle_state, is_write, bytes);
+
+ /* Schedule the next request */
+ schedule_next_request(bs, is_write);
+
+ qemu_mutex_unlock(&tg->lock);
+}
+
/* Update the throttle configuration for a particular group. Similar
* to throttle_config(), but guarantees atomicity within the
* throttling group.
@@ -195,9 +345,49 @@ void throttle_group_get_config(BlockDriverState *bs, ThrottleConfig *cfg)
qemu_mutex_unlock(&tg->lock);
}
-/* Register a BlockDriverState in the throttling group, also updating
- * its throttle_state pointer to point to it. If a throttling group
- * with that name does not exist yet, it will be created.
+/* ThrottleTimers callback. This wakes up a request that was waiting
+ * because it had been throttled.
+ *
+ * @bs: the BlockDriverState whose request had been throttled
+ * @is_write: the type of operation (read/write)
+ */
+static void timer_cb(BlockDriverState *bs, bool is_write)
+{
+ ThrottleState *ts = bs->throttle_state;
+ ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
+ bool empty_queue;
+
+ /* The timer has just been fired, so we can update the flag */
+ qemu_mutex_lock(&tg->lock);
+ tg->any_timer_armed[is_write] = false;
+ qemu_mutex_unlock(&tg->lock);
+
+ /* Run the request that was waiting for this timer */
+ empty_queue = !qemu_co_enter_next(&bs->throttled_reqs[is_write]);
+
+ /* If the request queue was empty then we have to take care of
+ * scheduling the next one */
+ if (empty_queue) {
+ qemu_mutex_lock(&tg->lock);
+ schedule_next_request(bs, is_write);
+ qemu_mutex_unlock(&tg->lock);
+ }
+}
+
+static void read_timer_cb(void *opaque)
+{
+ timer_cb(opaque, false);
+}
+
+static void write_timer_cb(void *opaque)
+{
+ timer_cb(opaque, true);
+}
+
+/* Register a BlockDriverState in the throttling group, also
+ * initializing its timers and updating its throttle_state pointer to
+ * point to it. If a throttling group with that name does not exist
+ * yet, it will be created.
*
* @bs: the BlockDriverState to insert
* @groupname: the name of the group
@@ -206,6 +396,12 @@ void throttle_group_register_bs(BlockDriverState *bs, const char *groupname)
{
int i;
ThrottleGroup *tg = throttle_group_incref(groupname);
+ int clock_type = QEMU_CLOCK_REALTIME;
+
+ if (qtest_enabled()) {
+ /* For testing block IO throttling only */
+ clock_type = QEMU_CLOCK_VIRTUAL;
+ }
bs->throttle_state = &tg->ts;
@@ -218,11 +414,20 @@ void throttle_group_register_bs(BlockDriverState *bs, const char *groupname)
}
QLIST_INSERT_HEAD(&tg->head, bs, round_robin);
+
+ throttle_timers_init(&bs->throttle_timers,
+ bdrv_get_aio_context(bs),
+ clock_type,
+ read_timer_cb,
+ write_timer_cb,
+ bs);
+
qemu_mutex_unlock(&tg->lock);
}
/* Unregister a BlockDriverState from its group, removing it from the
- * list and setting the throttle_state pointer to NULL.
+ * list, destroying the timers and setting the throttle_state pointer
+ * to NULL.
*
* The group will be destroyed if it's empty after this operation.
*
@@ -247,6 +452,7 @@ void throttle_group_unregister_bs(BlockDriverState *bs)
/* remove the current bs from the list */
QLIST_REMOVE(bs, round_robin);
+ throttle_timers_destroy(&bs->throttle_timers);
qemu_mutex_unlock(&tg->lock);
throttle_group_unref(tg);