diff options
Diffstat (limited to 'buffered_file.c')
-rw-r--r-- | buffered_file.c | 129 |
1 files changed, 46 insertions, 83 deletions
diff --git a/buffered_file.c b/buffered_file.c index f170aa046f..ed92df1053 100644 --- a/buffered_file.c +++ b/buffered_file.c @@ -23,11 +23,7 @@ typedef struct QEMUFileBuffered { - BufferedPutFunc *put_buffer; - BufferedPutReadyFunc *put_ready; - BufferedWaitForUnfreezeFunc *wait_for_unfreeze; - BufferedCloseFunc *close; - void *opaque; + MigrationState *migration_state; QEMUFile *file; int freeze_output; size_t bytes_xfer; @@ -50,70 +46,60 @@ static void buffered_append(QEMUFileBuffered *s, const uint8_t *buf, size_t size) { if (size > (s->buffer_capacity - s->buffer_size)) { - void *tmp; - DPRINTF("increasing buffer capacity from %zu by %zu\n", s->buffer_capacity, size + 1024); s->buffer_capacity += size + 1024; - tmp = g_realloc(s->buffer, s->buffer_capacity); - if (tmp == NULL) { - fprintf(stderr, "qemu file buffer expansion failed\n"); - exit(1); - } - - s->buffer = tmp; + s->buffer = g_realloc(s->buffer, s->buffer_capacity); } memcpy(s->buffer + s->buffer_size, buf, size); s->buffer_size += size; } -static void buffered_flush(QEMUFileBuffered *s) +static ssize_t buffered_flush(QEMUFileBuffered *s) { size_t offset = 0; - int error; - - error = qemu_file_get_error(s->file); - if (error != 0) { - DPRINTF("flush when error, bailing: %s\n", strerror(-error)); - return; - } + ssize_t ret = 0; DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size); - while (offset < s->buffer_size) { - ssize_t ret; + while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) { - ret = s->put_buffer(s->opaque, s->buffer + offset, - s->buffer_size - offset); + ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset, + s->buffer_size - offset); if (ret == -EAGAIN) { DPRINTF("backend not ready, freezing\n"); + ret = 0; s->freeze_output = 1; break; } if (ret <= 0) { DPRINTF("error flushing data, %zd\n", ret); - qemu_file_set_error(s->file, ret); break; } else { DPRINTF("flushed %zd byte(s)\n", ret); offset += ret; + s->bytes_xfer += ret; } } DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size); memmove(s->buffer, s->buffer + offset, s->buffer_size - offset); s->buffer_size -= offset; + + if (ret < 0) { + return ret; + } + return offset; } static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size) { QEMUFileBuffered *s = opaque; - int offset = 0, error; - ssize_t ret; + ssize_t error; DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos); @@ -126,65 +112,54 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in DPRINTF("unfreezing output\n"); s->freeze_output = 0; - buffered_flush(s); - - while (!s->freeze_output && offset < size) { - if (s->bytes_xfer > s->xfer_limit) { - DPRINTF("transfer limit exceeded when putting\n"); - break; - } - - ret = s->put_buffer(s->opaque, buf + offset, size - offset); - if (ret == -EAGAIN) { - DPRINTF("backend not ready, freezing\n"); - s->freeze_output = 1; - break; - } - - if (ret <= 0) { - DPRINTF("error putting\n"); - qemu_file_set_error(s->file, ret); - offset = -EINVAL; - break; - } - - DPRINTF("put %zd byte(s)\n", ret); - offset += ret; - s->bytes_xfer += ret; + if (size > 0) { + DPRINTF("buffering %d bytes\n", size - offset); + buffered_append(s, buf, size); } - if (offset >= 0) { - DPRINTF("buffering %d bytes\n", size - offset); - buffered_append(s, buf + offset, size - offset); - offset = size; + error = buffered_flush(s); + if (error < 0) { + DPRINTF("buffered flush error. bailing: %s\n", strerror(-error)); + return error; } if (pos == 0 && size == 0) { DPRINTF("file is ready\n"); - if (s->bytes_xfer <= s->xfer_limit) { + if (!s->freeze_output && s->bytes_xfer < s->xfer_limit) { DPRINTF("notifying client\n"); - s->put_ready(s->opaque); + migrate_fd_put_ready(s->migration_state); } } - return offset; + return size; } static int buffered_close(void *opaque) { QEMUFileBuffered *s = opaque; - int ret; + ssize_t ret = 0; + int ret2; DPRINTF("closing\n"); + s->xfer_limit = INT_MAX; while (!qemu_file_get_error(s->file) && s->buffer_size) { - buffered_flush(s); - if (s->freeze_output) - s->wait_for_unfreeze(s->opaque); + ret = buffered_flush(s); + if (ret < 0) { + break; + } + if (s->freeze_output) { + ret = migrate_fd_wait_for_unfreeze(s->migration_state); + if (ret < 0) { + break; + } + } } - ret = s->close(s->opaque); - + ret2 = migrate_fd_close(s->migration_state); + if (ret >= 0) { + ret = ret2; + } qemu_del_timer(s->timer); qemu_free_timer(s->timer); g_free(s->buffer); @@ -256,29 +231,17 @@ static void buffered_rate_tick(void *opaque) s->bytes_xfer = 0; - buffered_flush(s); - - /* Add some checks around this */ - s->put_ready(s->opaque); + buffered_put_buffer(s, NULL, 0, 0); } -QEMUFile *qemu_fopen_ops_buffered(void *opaque, - size_t bytes_per_sec, - BufferedPutFunc *put_buffer, - BufferedPutReadyFunc *put_ready, - BufferedWaitForUnfreezeFunc *wait_for_unfreeze, - BufferedCloseFunc *close) +QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state) { QEMUFileBuffered *s; s = g_malloc0(sizeof(*s)); - s->opaque = opaque; - s->xfer_limit = bytes_per_sec / 10; - s->put_buffer = put_buffer; - s->put_ready = put_ready; - s->wait_for_unfreeze = wait_for_unfreeze; - s->close = close; + s->migration_state = migration_state; + s->xfer_limit = migration_state->bandwidth_limit / 10; s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, buffered_close, buffered_rate_limit, |