diff options
-rw-r--r-- | migration/rdma.c | 374 |
1 files changed, 275 insertions, 99 deletions
diff --git a/migration/rdma.c b/migration/rdma.c index f8578b919e..51bafc702b 100644 --- a/migration/rdma.c +++ b/migration/rdma.c @@ -2,10 +2,12 @@ * RDMA protocol and interfaces * * Copyright IBM, Corp. 2010-2013 + * Copyright Red Hat, Inc. 2015-2016 * * Authors: * Michael R. Hines <mrhines@us.ibm.com> * Jiuxing Liu <jl@us.ibm.com> + * Daniel P. Berrange <berrange@redhat.com> * * This work is licensed under the terms of the GNU GPL, version 2 or * later. See the COPYING file in the top-level directory. @@ -374,14 +376,20 @@ typedef struct RDMAContext { GHashTable *blockmap; } RDMAContext; -/* - * Interface to the rest of the migration call stack. - */ -typedef struct QEMUFileRDMA { +#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma" +#define QIO_CHANNEL_RDMA(obj) \ + OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA) + +typedef struct QIOChannelRDMA QIOChannelRDMA; + + +struct QIOChannelRDMA { + QIOChannel parent; RDMAContext *rdma; + QEMUFile *file; size_t len; - void *file; -} QEMUFileRDMA; + bool blocking; /* XXX we don't actually honour this yet */ +}; /* * Main structure for IB Send/Recv control messages. @@ -2518,15 +2526,19 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp) * SEND messages for control only. * VM's ram is handled with regular RDMA messages. */ -static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, - int64_t pos, size_t size) -{ - QEMUFileRDMA *r = opaque; - QEMUFile *f = r->file; - RDMAContext *rdma = r->rdma; - size_t remaining = size; - uint8_t * data = (void *) buf; +static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, + const struct iovec *iov, + size_t niov, + int *fds, + size_t nfds, + Error **errp) +{ + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); + QEMUFile *f = rioc->file; + RDMAContext *rdma = rioc->rdma; int ret; + ssize_t done = 0; + size_t i; CHECK_ERROR_STATE(); @@ -2540,27 +2552,31 @@ static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, return ret; } - while (remaining) { - RDMAControlHeader head; + for (i = 0; i < niov; i++) { + size_t remaining = iov[i].iov_len; + uint8_t * data = (void *)iov[i].iov_base; + while (remaining) { + RDMAControlHeader head; - r->len = MIN(remaining, RDMA_SEND_INCREMENT); - remaining -= r->len; + rioc->len = MIN(remaining, RDMA_SEND_INCREMENT); + remaining -= rioc->len; - /* Guaranteed to fit due to RDMA_SEND_INCREMENT MIN above */ - head.len = (uint32_t)r->len; - head.type = RDMA_CONTROL_QEMU_FILE; + head.len = rioc->len; + head.type = RDMA_CONTROL_QEMU_FILE; - ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); + ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL); - if (ret < 0) { - rdma->error_state = ret; - return ret; - } + if (ret < 0) { + rdma->error_state = ret; + return ret; + } - data += r->len; + data += rioc->len; + done += rioc->len; + } } - return size; + return done; } static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, @@ -2585,41 +2601,74 @@ static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf, * RDMA links don't use bytestreams, so we have to * return bytes to QEMUFile opportunistically. */ -static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf, - int64_t pos, size_t size) -{ - QEMUFileRDMA *r = opaque; - RDMAContext *rdma = r->rdma; +static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, + const struct iovec *iov, + size_t niov, + int **fds, + size_t *nfds, + Error **errp) +{ + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); + RDMAContext *rdma = rioc->rdma; RDMAControlHeader head; int ret = 0; + ssize_t i; + size_t done = 0; CHECK_ERROR_STATE(); - /* - * First, we hold on to the last SEND message we - * were given and dish out the bytes until we run - * out of bytes. - */ - r->len = qemu_rdma_fill(r->rdma, buf, size, 0); - if (r->len) { - return r->len; - } + for (i = 0; i < niov; i++) { + size_t want = iov[i].iov_len; + uint8_t *data = (void *)iov[i].iov_base; - /* - * Once we run out, we block and wait for another - * SEND message to arrive. - */ - ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); + /* + * First, we hold on to the last SEND message we + * were given and dish out the bytes until we run + * out of bytes. + */ + ret = qemu_rdma_fill(rioc->rdma, data, want, 0); + done += ret; + want -= ret; + /* Got what we needed, so go to next iovec */ + if (want == 0) { + continue; + } - if (ret < 0) { - rdma->error_state = ret; - return ret; - } + /* If we got any data so far, then don't wait + * for more, just return what we have */ + if (done > 0) { + break; + } - /* - * SEND was received with new bytes, now try again. - */ - return qemu_rdma_fill(r->rdma, buf, size, 0); + + /* We've got nothing at all, so lets wait for + * more to arrive + */ + ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE); + + if (ret < 0) { + rdma->error_state = ret; + return ret; + } + + /* + * SEND was received with new bytes, now try again. + */ + ret = qemu_rdma_fill(rioc->rdma, data, want, 0); + done += ret; + want -= ret; + + /* Still didn't get enough, so lets just return */ + if (want) { + if (done == 0) { + return QIO_CHANNEL_ERR_BLOCK; + } else { + break; + } + } + } + rioc->len = done; + return rioc->len; } /* @@ -2646,15 +2695,122 @@ static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma) return 0; } -static int qemu_rdma_close(void *opaque) + +static int qio_channel_rdma_set_blocking(QIOChannel *ioc, + bool blocking, + Error **errp) +{ + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); + /* XXX we should make readv/writev actually honour this :-) */ + rioc->blocking = blocking; + return 0; +} + + +typedef struct QIOChannelRDMASource QIOChannelRDMASource; +struct QIOChannelRDMASource { + GSource parent; + QIOChannelRDMA *rioc; + GIOCondition condition; +}; + +static gboolean +qio_channel_rdma_source_prepare(GSource *source, + gint *timeout) +{ + QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; + RDMAContext *rdma = rsource->rioc->rdma; + GIOCondition cond = 0; + *timeout = -1; + + if (rdma->wr_data[0].control_len) { + cond |= G_IO_IN; + } + cond |= G_IO_OUT; + + return cond & rsource->condition; +} + +static gboolean +qio_channel_rdma_source_check(GSource *source) +{ + QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; + RDMAContext *rdma = rsource->rioc->rdma; + GIOCondition cond = 0; + + if (rdma->wr_data[0].control_len) { + cond |= G_IO_IN; + } + cond |= G_IO_OUT; + + return cond & rsource->condition; +} + +static gboolean +qio_channel_rdma_source_dispatch(GSource *source, + GSourceFunc callback, + gpointer user_data) +{ + QIOChannelFunc func = (QIOChannelFunc)callback; + QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source; + RDMAContext *rdma = rsource->rioc->rdma; + GIOCondition cond = 0; + + if (rdma->wr_data[0].control_len) { + cond |= G_IO_IN; + } + cond |= G_IO_OUT; + + return (*func)(QIO_CHANNEL(rsource->rioc), + (cond & rsource->condition), + user_data); +} + +static void +qio_channel_rdma_source_finalize(GSource *source) +{ + QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source; + + object_unref(OBJECT(ssource->rioc)); +} + +GSourceFuncs qio_channel_rdma_source_funcs = { + qio_channel_rdma_source_prepare, + qio_channel_rdma_source_check, + qio_channel_rdma_source_dispatch, + qio_channel_rdma_source_finalize +}; + +static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc, + GIOCondition condition) { + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); + QIOChannelRDMASource *ssource; + GSource *source; + + source = g_source_new(&qio_channel_rdma_source_funcs, + sizeof(QIOChannelRDMASource)); + ssource = (QIOChannelRDMASource *)source; + + ssource->rioc = rioc; + object_ref(OBJECT(rioc)); + + ssource->condition = condition; + + return source; +} + + +static int qio_channel_rdma_close(QIOChannel *ioc, + Error **errp) +{ + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); trace_qemu_rdma_close(); - QEMUFileRDMA *r = opaque; - if (r->rdma) { - qemu_rdma_cleanup(r->rdma); - g_free(r->rdma); + if (rioc->rdma) { + qemu_rdma_cleanup(rioc->rdma); + g_free(rioc->rdma); + rioc->rdma = NULL; } - g_free(r); return 0; } @@ -2696,8 +2852,8 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, ram_addr_t block_offset, ram_addr_t offset, size_t size, uint64_t *bytes_sent) { - QEMUFileRDMA *rfile = opaque; - RDMAContext *rdma = rfile->rdma; + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); + RDMAContext *rdma = rioc->rdma; int ret; CHECK_ERROR_STATE(); @@ -2951,8 +3107,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) }; RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, .repeat = 1 }; - QEMUFileRDMA *rfile = opaque; - RDMAContext *rdma = rfile->rdma; + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); + RDMAContext *rdma = rioc->rdma; RDMALocalBlocks *local = &rdma->local_ram_blocks; RDMAControlHeader head; RDMARegister *reg, *registers; @@ -3207,9 +3363,10 @@ out: * We've already built our local RAMBlock list, but not yet sent the list to * the source. */ -static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name) +static int +rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name) { - RDMAContext *rdma = rfile->rdma; + RDMAContext *rdma = rioc->rdma; int curr; int found = -1; @@ -3251,8 +3408,8 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data) static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, uint64_t flags, void *data) { - QEMUFileRDMA *rfile = opaque; - RDMAContext *rdma = rfile->rdma; + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); + RDMAContext *rdma = rioc->rdma; CHECK_ERROR_STATE(); @@ -3271,8 +3428,8 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, uint64_t flags, void *data) { Error *local_err = NULL, **errp = &local_err; - QEMUFileRDMA *rfile = opaque; - RDMAContext *rdma = rfile->rdma; + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); + RDMAContext *rdma = rioc->rdma; RDMAControlHeader head = { .len = 0, .repeat = 1 }; int ret = 0; @@ -3368,55 +3525,74 @@ err: return ret; } -static int qemu_rdma_get_fd(void *opaque) -{ - QEMUFileRDMA *rfile = opaque; - RDMAContext *rdma = rfile->rdma; - - return rdma->comp_channel->fd; -} - -static const QEMUFileOps rdma_read_ops = { - .get_buffer = qemu_rdma_get_buffer, - .get_fd = qemu_rdma_get_fd, - .close = qemu_rdma_close, -}; - static const QEMUFileHooks rdma_read_hooks = { .hook_ram_load = rdma_load_hook, }; -static const QEMUFileOps rdma_write_ops = { - .put_buffer = qemu_rdma_put_buffer, - .close = qemu_rdma_close, -}; - static const QEMUFileHooks rdma_write_hooks = { .before_ram_iterate = qemu_rdma_registration_start, .after_ram_iterate = qemu_rdma_registration_stop, .save_page = qemu_rdma_save_page, }; -static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) + +static void qio_channel_rdma_finalize(Object *obj) +{ + QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj); + if (rioc->rdma) { + qemu_rdma_cleanup(rioc->rdma); + g_free(rioc->rdma); + rioc->rdma = NULL; + } +} + +static void qio_channel_rdma_class_init(ObjectClass *klass, + void *class_data G_GNUC_UNUSED) +{ + QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass); + + ioc_klass->io_writev = qio_channel_rdma_writev; + ioc_klass->io_readv = qio_channel_rdma_readv; + ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking; + ioc_klass->io_close = qio_channel_rdma_close; + ioc_klass->io_create_watch = qio_channel_rdma_create_watch; +} + +static const TypeInfo qio_channel_rdma_info = { + .parent = TYPE_QIO_CHANNEL, + .name = TYPE_QIO_CHANNEL_RDMA, + .instance_size = sizeof(QIOChannelRDMA), + .instance_finalize = qio_channel_rdma_finalize, + .class_init = qio_channel_rdma_class_init, +}; + +static void qio_channel_rdma_register_types(void) +{ + type_register_static(&qio_channel_rdma_info); +} + +type_init(qio_channel_rdma_register_types); + +static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) { - QEMUFileRDMA *r; + QIOChannelRDMA *rioc; if (qemu_file_mode_is_not_valid(mode)) { return NULL; } - r = g_new0(QEMUFileRDMA, 1); - r->rdma = rdma; + rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); + rioc->rdma = rdma; if (mode[0] == 'w') { - r->file = qemu_fopen_ops(r, &rdma_write_ops); - qemu_file_set_hooks(r->file, &rdma_write_hooks); + rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc)); + qemu_file_set_hooks(rioc->file, &rdma_write_hooks); } else { - r->file = qemu_fopen_ops(r, &rdma_read_ops); - qemu_file_set_hooks(r->file, &rdma_read_hooks); + rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc)); + qemu_file_set_hooks(rioc->file, &rdma_read_hooks); } - return r->file; + return rioc->file; } static void rdma_accept_incoming_migration(void *opaque) |