1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
|
/*
* Graph lock: rwlock to protect block layer graph manipulations (add/remove
* edges and nodes)
*
* Copyright (c) 2022 Red Hat
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
#include "qemu/osdep.h"
#include "qemu/main-loop.h"
#include "block/graph-lock.h"
#include "block/block.h"
#include "block/block_int.h"
/* Dummy lock object to use for Thread Safety Analysis (TSA) */
BdrvGraphLock graph_lock;
/* Protects the list of aiocontext and orphaned_reader_count */
static QemuMutex aio_context_list_lock;
/* Written and read with atomic operations. */
static int has_writer;
/*
* A reader coroutine could move from an AioContext to another.
* If this happens, there is no problem from the point of view of
* counters. The problem is that the total count becomes
* unbalanced if one of the two AioContexts gets deleted.
* The count of readers must remain correct, so the AioContext's
* balance is transferred to this glboal variable.
* Protected by aio_context_list_lock.
*/
static uint32_t orphaned_reader_count;
/* Queue of readers waiting for the writer to finish */
static CoQueue reader_queue;
struct BdrvGraphRWlock {
/* How many readers are currently reading the graph. */
uint32_t reader_count;
/*
* List of BdrvGraphRWlock kept in graph-lock.c
* Protected by aio_context_list_lock
*/
QTAILQ_ENTRY(BdrvGraphRWlock) next_aio;
};
/*
* List of BdrvGraphRWlock. This list ensures that each BdrvGraphRWlock
* can safely modify only its own counter, avoid reading/writing
* others and thus improving performances by avoiding cacheline bounces.
*/
static QTAILQ_HEAD(, BdrvGraphRWlock) aio_context_list =
QTAILQ_HEAD_INITIALIZER(aio_context_list);
static void __attribute__((__constructor__)) bdrv_init_graph_lock(void)
{
qemu_mutex_init(&aio_context_list_lock);
qemu_co_queue_init(&reader_queue);
}
void register_aiocontext(AioContext *ctx)
{
ctx->bdrv_graph = g_new0(BdrvGraphRWlock, 1);
QEMU_LOCK_GUARD(&aio_context_list_lock);
assert(ctx->bdrv_graph->reader_count == 0);
QTAILQ_INSERT_TAIL(&aio_context_list, ctx->bdrv_graph, next_aio);
}
void unregister_aiocontext(AioContext *ctx)
{
QEMU_LOCK_GUARD(&aio_context_list_lock);
orphaned_reader_count += ctx->bdrv_graph->reader_count;
QTAILQ_REMOVE(&aio_context_list, ctx->bdrv_graph, next_aio);
g_free(ctx->bdrv_graph);
}
static uint32_t reader_count(void)
{
BdrvGraphRWlock *brdv_graph;
uint32_t rd;
QEMU_LOCK_GUARD(&aio_context_list_lock);
/* rd can temporarily be negative, but the total will *always* be >= 0 */
rd = orphaned_reader_count;
QTAILQ_FOREACH(brdv_graph, &aio_context_list, next_aio) {
rd += qatomic_read(&brdv_graph->reader_count);
}
/* shouldn't overflow unless there are 2^31 readers */
assert((int32_t)rd >= 0);
return rd;
}
void bdrv_graph_wrlock(BlockDriverState *bs)
{
AioContext *ctx = NULL;
GLOBAL_STATE_CODE();
assert(!qatomic_read(&has_writer));
/*
* Release only non-mainloop AioContext. The mainloop often relies on the
* BQL and doesn't lock the main AioContext before doing things.
*/
if (bs) {
ctx = bdrv_get_aio_context(bs);
if (ctx != qemu_get_aio_context()) {
aio_context_release(ctx);
} else {
ctx = NULL;
}
}
/* Make sure that constantly arriving new I/O doesn't cause starvation */
bdrv_drain_all_begin_nopoll();
/*
* reader_count == 0: this means writer will read has_reader as 1
* reader_count >= 1: we don't know if writer read has_writer == 0 or 1,
* but we need to wait.
* Wait by allowing other coroutine (and possible readers) to continue.
*/
do {
/*
* has_writer must be 0 while polling, otherwise we get a deadlock if
* any callback involved during AIO_WAIT_WHILE() tries to acquire the
* reader lock.
*/
qatomic_set(&has_writer, 0);
AIO_WAIT_WHILE_UNLOCKED(NULL, reader_count() >= 1);
qatomic_set(&has_writer, 1);
/*
* We want to only check reader_count() after has_writer = 1 is visible
* to other threads. That way no more readers can sneak in after we've
* determined reader_count() == 0.
*/
smp_mb();
} while (reader_count() >= 1);
bdrv_drain_all_end();
if (ctx) {
aio_context_acquire(bdrv_get_aio_context(bs));
}
}
void bdrv_graph_wrunlock(void)
{
GLOBAL_STATE_CODE();
assert(qatomic_read(&has_writer));
WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
/*
* No need for memory barriers, this works in pair with
* the slow path of rdlock() and both take the lock.
*/
qatomic_store_release(&has_writer, 0);
/* Wake up all coroutines that are waiting to read the graph */
qemu_co_enter_all(&reader_queue, &aio_context_list_lock);
}
/*
* Run any BHs that were scheduled during the wrlock section and that
* callers might expect to have finished (in particular, this is important
* for bdrv_schedule_unref()).
*
* Do this only after restarting coroutines so that nested event loops in
* BHs don't deadlock if their condition relies on the coroutine making
* progress.
*/
aio_bh_poll(qemu_get_aio_context());
}
void coroutine_fn bdrv_graph_co_rdlock(void)
{
BdrvGraphRWlock *bdrv_graph;
bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
for (;;) {
qatomic_set(&bdrv_graph->reader_count,
bdrv_graph->reader_count + 1);
/* make sure writer sees reader_count before we check has_writer */
smp_mb();
/*
* has_writer == 0: this means writer will read reader_count as >= 1
* has_writer == 1: we don't know if writer read reader_count == 0
* or > 0, but we need to wait anyways because
* it will write.
*/
if (!qatomic_read(&has_writer)) {
break;
}
/*
* Synchronize access with reader_count() in bdrv_graph_wrlock().
* Case 1:
* If this critical section gets executed first, reader_count will
* decrease and the reader will go to sleep.
* Then the writer will read reader_count that does not take into
* account this reader, and if there's no other reader it will
* enter the write section.
* Case 2:
* If reader_count() critical section gets executed first,
* then writer will read reader_count >= 1.
* It will wait in AIO_WAIT_WHILE(), but once it releases the lock
* we will enter this critical section and call aio_wait_kick().
*/
WITH_QEMU_LOCK_GUARD(&aio_context_list_lock) {
/*
* Additional check when we use the above lock to synchronize
* with bdrv_graph_wrunlock().
* Case 1:
* If this gets executed first, has_writer is still 1, so we reduce
* reader_count and go to sleep.
* Then the writer will set has_writer to 0 and wake up all readers,
* us included.
* Case 2:
* If bdrv_graph_wrunlock() critical section gets executed first,
* then it will set has_writer to 0 and wake up all other readers.
* Then we execute this critical section, and therefore must check
* again for has_writer, otherwise we sleep without any writer
* actually running.
*/
if (!qatomic_read(&has_writer)) {
return;
}
/* slow path where reader sleeps */
bdrv_graph->reader_count--;
aio_wait_kick();
qemu_co_queue_wait(&reader_queue, &aio_context_list_lock);
}
}
}
void coroutine_fn bdrv_graph_co_rdunlock(void)
{
BdrvGraphRWlock *bdrv_graph;
bdrv_graph = qemu_get_current_aio_context()->bdrv_graph;
qatomic_store_release(&bdrv_graph->reader_count,
bdrv_graph->reader_count - 1);
/* make sure writer sees reader_count before we check has_writer */
smp_mb();
/*
* has_writer == 0: this means reader will read reader_count decreased
* has_writer == 1: we don't know if writer read reader_count old or
* new. Therefore, kick again so on next iteration
* writer will for sure read the updated value.
*/
if (qatomic_read(&has_writer)) {
aio_wait_kick();
}
}
void bdrv_graph_rdlock_main_loop(void)
{
GLOBAL_STATE_CODE();
assert(!qemu_in_coroutine());
}
void bdrv_graph_rdunlock_main_loop(void)
{
GLOBAL_STATE_CODE();
assert(!qemu_in_coroutine());
}
void assert_bdrv_graph_readable(void)
{
/* reader_count() is slow due to aio_context_list_lock lock contention */
#ifdef CONFIG_DEBUG_GRAPH_LOCK
assert(qemu_in_main_thread() || reader_count());
#endif
}
void assert_bdrv_graph_writable(void)
{
assert(qemu_in_main_thread());
assert(qatomic_read(&has_writer));
}
|