diff options
-rw-r--r-- | docs/COLO-FT.txt | 34 | ||||
-rw-r--r-- | hw/net/e1000.c | 16 | ||||
-rw-r--r-- | hw/net/ne2000.c | 4 | ||||
-rw-r--r-- | hw/net/pcnet.c | 4 | ||||
-rw-r--r-- | hw/net/rtl8139.c | 8 | ||||
-rw-r--r-- | hw/net/trace-events | 3 | ||||
-rw-r--r-- | hw/virtio/virtio.c | 2 | ||||
-rw-r--r-- | include/exec/ram_addr.h | 1 | ||||
-rw-r--r-- | include/migration/colo.h | 11 | ||||
-rw-r--r-- | include/net/filter.h | 5 | ||||
-rw-r--r-- | migration/Makefile.objs | 2 | ||||
-rw-r--r-- | migration/colo-comm.c | 76 | ||||
-rw-r--r-- | migration/colo-failover.c | 2 | ||||
-rw-r--r-- | migration/colo.c | 212 | ||||
-rw-r--r-- | migration/migration.c | 46 | ||||
-rw-r--r-- | migration/ram.c | 166 | ||||
-rw-r--r-- | migration/ram.h | 4 | ||||
-rw-r--r-- | migration/savevm.c | 53 | ||||
-rw-r--r-- | migration/savevm.h | 5 | ||||
-rw-r--r-- | migration/trace-events | 3 | ||||
-rw-r--r-- | net/colo-compare.c | 115 | ||||
-rw-r--r-- | net/colo-compare.h | 24 | ||||
-rw-r--r-- | net/colo.c | 10 | ||||
-rw-r--r-- | net/colo.h | 11 | ||||
-rw-r--r-- | net/filter-rewriter.c | 166 | ||||
-rw-r--r-- | net/filter.c | 17 | ||||
-rw-r--r-- | net/net.c | 26 | ||||
-rw-r--r-- | qapi/migration.json | 80 | ||||
-rw-r--r-- | qemu-options.hx | 2 | ||||
-rw-r--r-- | vl.c | 2 |
30 files changed, 958 insertions, 152 deletions
diff --git a/docs/COLO-FT.txt b/docs/COLO-FT.txt index 70cfb9ce7d..6302469d0e 100644 --- a/docs/COLO-FT.txt +++ b/docs/COLO-FT.txt @@ -110,6 +110,40 @@ Note: HeartBeat has not been implemented yet, so you need to trigger failover process by using 'x-colo-lost-heartbeat' command. +== COLO operation status == + ++-----------------+ +| | +| Start COLO | +| | ++--------+--------+ + | + | Main qmp command: + | migrate-set-capabilities with x-colo + | migrate + | + v ++--------+--------+ +| | +| COLO running | +| | ++--------+--------+ + | + | Main qmp command: + | x-colo-lost-heartbeat + | or + | some error happened + v ++--------+--------+ +| | send qmp event: +| COLO failover | COLO_EXIT +| | ++-----------------+ + +COLO use the qmp command to switch and report operation status. +The diagram just shows the main qmp command, you can get the detail +in test procedure. + == Test procedure == 1. Startup qemu Primary: diff --git a/hw/net/e1000.c b/hw/net/e1000.c index 13a9494a8d..5e144cb4e4 100644 --- a/hw/net/e1000.c +++ b/hw/net/e1000.c @@ -36,6 +36,7 @@ #include "qemu/range.h" #include "e1000x_common.h" +#include "trace.h" static const uint8_t bcast[] = {0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; @@ -847,6 +848,15 @@ static uint64_t rx_desc_base(E1000State *s) return (bah << 32) + bal; } +static void +e1000_receiver_overrun(E1000State *s, size_t size) +{ + trace_e1000_receiver_overrun(size, s->mac_reg[RDH], s->mac_reg[RDT]); + e1000x_inc_reg_if_not_full(s->mac_reg, RNBC); + e1000x_inc_reg_if_not_full(s->mac_reg, MPC); + set_ics(s, 0, E1000_ICS_RXO); +} + static ssize_t e1000_receive_iov(NetClientState *nc, const struct iovec *iov, int iovcnt) { @@ -916,8 +926,8 @@ e1000_receive_iov(NetClientState *nc, const struct iovec *iov, int iovcnt) desc_offset = 0; total_size = size + e1000x_fcs_len(s->mac_reg); if (!e1000_has_rxbufs(s, total_size)) { - set_ics(s, 0, E1000_ICS_RXO); - return -1; + e1000_receiver_overrun(s, total_size); + return -1; } do { desc_size = total_size - desc_offset; @@ -969,7 +979,7 @@ e1000_receive_iov(NetClientState *nc, const struct iovec *iov, int iovcnt) rdh_start >= s->mac_reg[RDLEN] / sizeof(desc)) { DBGOUT(RXERR, "RDH wraparound @%x, RDT %x, RDLEN %x\n", rdh_start, s->mac_reg[RDT], s->mac_reg[RDLEN]); - set_ics(s, 0, E1000_ICS_RXO); + e1000_receiver_overrun(s, total_size); return -1; } } while (desc_offset < total_size); diff --git a/hw/net/ne2000.c b/hw/net/ne2000.c index 07d79e317f..869518ee06 100644 --- a/hw/net/ne2000.c +++ b/hw/net/ne2000.c @@ -174,7 +174,7 @@ static int ne2000_buffer_full(NE2000State *s) ssize_t ne2000_receive(NetClientState *nc, const uint8_t *buf, size_t size_) { NE2000State *s = qemu_get_nic_opaque(nc); - int size = size_; + size_t size = size_; uint8_t *p; unsigned int total_len, next, avail, len, index, mcast_idx; uint8_t buf1[60]; @@ -182,7 +182,7 @@ ssize_t ne2000_receive(NetClientState *nc, const uint8_t *buf, size_t size_) { 0xff, 0xff, 0xff, 0xff, 0xff, 0xff }; #if defined(DEBUG_NE2000) - printf("NE2000: received len=%d\n", size); + printf("NE2000: received len=%zu\n", size); #endif if (s->cmd & E8390_STOP || ne2000_buffer_full(s)) diff --git a/hw/net/pcnet.c b/hw/net/pcnet.c index 0c44554168..d9ba04bdfc 100644 --- a/hw/net/pcnet.c +++ b/hw/net/pcnet.c @@ -988,14 +988,14 @@ ssize_t pcnet_receive(NetClientState *nc, const uint8_t *buf, size_t size_) uint8_t buf1[60]; int remaining; int crc_err = 0; - int size = size_; + size_t size = size_; if (CSR_DRX(s) || CSR_STOP(s) || CSR_SPND(s) || !size || (CSR_LOOP(s) && !s->looptest)) { return -1; } #ifdef PCNET_DEBUG - printf("pcnet_receive size=%d\n", size); + printf("pcnet_receive size=%zu\n", size); #endif /* if too small buffer, then expand it */ diff --git a/hw/net/rtl8139.c b/hw/net/rtl8139.c index 46daa16202..2342a095e3 100644 --- a/hw/net/rtl8139.c +++ b/hw/net/rtl8139.c @@ -817,7 +817,7 @@ static ssize_t rtl8139_do_receive(NetClientState *nc, const uint8_t *buf, size_t RTL8139State *s = qemu_get_nic_opaque(nc); PCIDevice *d = PCI_DEVICE(s); /* size is the length of the buffer passed to the driver */ - int size = size_; + size_t size = size_; const uint8_t *dot1q_buf = NULL; uint32_t packet_header = 0; @@ -826,7 +826,7 @@ static ssize_t rtl8139_do_receive(NetClientState *nc, const uint8_t *buf, size_t static const uint8_t broadcast_macaddr[6] = { 0xff, 0xff, 0xff, 0xff, 0xff, 0xff }; - DPRINTF(">>> received len=%d\n", size); + DPRINTF(">>> received len=%zu\n", size); /* test if board clock is stopped */ if (!s->clock_enabled) @@ -1035,7 +1035,7 @@ static ssize_t rtl8139_do_receive(NetClientState *nc, const uint8_t *buf, size_t if (size+4 > rx_space) { - DPRINTF("C+ Rx mode : descriptor %d size %d received %d + 4\n", + DPRINTF("C+ Rx mode : descriptor %d size %d received %zu + 4\n", descriptor, rx_space, size); s->IntrStatus |= RxOverflow; @@ -1148,7 +1148,7 @@ static ssize_t rtl8139_do_receive(NetClientState *nc, const uint8_t *buf, size_t if (avail != 0 && RX_ALIGN(size + 8) >= avail) { DPRINTF("rx overflow: rx buffer length %d head 0x%04x " - "read 0x%04x === available 0x%04x need 0x%04x\n", + "read 0x%04x === available 0x%04x need 0x%04zx\n", s->RxBufferSize, s->RxBufAddr, s->RxBufPtr, avail, size + 8); s->IntrStatus |= RxOverflow; diff --git a/hw/net/trace-events b/hw/net/trace-events index c1dea4b156..9d49f62fa1 100644 --- a/hw/net/trace-events +++ b/hw/net/trace-events @@ -98,6 +98,9 @@ net_rx_pkt_rss_ip6_ex(void) "Calculating IPv6/EX RSS hash" net_rx_pkt_rss_hash(size_t rss_length, uint32_t rss_hash) "RSS hash for %zu bytes: 0x%X" net_rx_pkt_rss_add_chunk(void* ptr, size_t size, size_t input_offset) "Add RSS chunk %p, %zu bytes, RSS input offset %zu bytes" +# hw/net/e1000.c +e1000_receiver_overrun(size_t s, uint32_t rdh, uint32_t rdt) "Receiver overrun: dropped packet of %zu bytes, RDH=%u, RDT=%u" + # hw/net/e1000x_common.c e1000x_rx_can_recv_disabled(bool link_up, bool rx_enabled, bool pci_master) "link_up: %d, rx_enabled %d, pci_master %d" e1000x_vlan_is_vlan_pkt(bool is_vlan_pkt, uint16_t eth_proto, uint16_t vet) "Is VLAN packet: %d, ETH proto: 0x%X, VET: 0x%X" diff --git a/hw/virtio/virtio.c b/hw/virtio/virtio.c index 4e61944f14..4136d239dd 100644 --- a/hw/virtio/virtio.c +++ b/hw/virtio/virtio.c @@ -1611,6 +1611,8 @@ void virtio_del_queue(VirtIODevice *vdev, int n) vdev->vq[n].vring.num = 0; vdev->vq[n].vring.num_default = 0; + vdev->vq[n].handle_output = NULL; + vdev->vq[n].handle_aio_output = NULL; } static void virtio_set_isr(VirtIODevice *vdev, int value) diff --git a/include/exec/ram_addr.h b/include/exec/ram_addr.h index 3abb639056..9ecd911c3e 100644 --- a/include/exec/ram_addr.h +++ b/include/exec/ram_addr.h @@ -27,6 +27,7 @@ struct RAMBlock { struct rcu_head rcu; struct MemoryRegion *mr; uint8_t *host; + uint8_t *colo_cache; /* For colo, VM's ram cache */ ram_addr_t offset; ram_addr_t used_length; ram_addr_t max_length; diff --git a/include/migration/colo.h b/include/migration/colo.h index 2fe48ad353..99ce17aca7 100644 --- a/include/migration/colo.h +++ b/include/migration/colo.h @@ -16,14 +16,21 @@ #include "qemu-common.h" #include "qapi/qapi-types-migration.h" +enum colo_event { + COLO_EVENT_NONE, + COLO_EVENT_CHECKPOINT, + COLO_EVENT_FAILOVER, +}; + void colo_info_init(void); void migrate_start_colo_process(MigrationState *s); bool migration_in_colo_state(void); /* loadvm */ -bool migration_incoming_enable_colo(void); -void migration_incoming_exit_colo(void); +void migration_incoming_enable_colo(void); +void migration_incoming_disable_colo(void); +bool migration_incoming_colo_enabled(void); void *colo_process_incoming_thread(void *opaque); bool migration_incoming_in_colo_state(void); diff --git a/include/net/filter.h b/include/net/filter.h index 435acd6f82..49da666ac0 100644 --- a/include/net/filter.h +++ b/include/net/filter.h @@ -38,6 +38,8 @@ typedef ssize_t (FilterReceiveIOV)(NetFilterState *nc, typedef void (FilterStatusChanged) (NetFilterState *nf, Error **errp); +typedef void (FilterHandleEvent) (NetFilterState *nf, int event, Error **errp); + typedef struct NetFilterClass { ObjectClass parent_class; @@ -45,6 +47,7 @@ typedef struct NetFilterClass { FilterSetup *setup; FilterCleanup *cleanup; FilterStatusChanged *status_changed; + FilterHandleEvent *handle_event; /* mandatory */ FilterReceiveIOV *receive_iov; } NetFilterClass; @@ -77,4 +80,6 @@ ssize_t qemu_netfilter_pass_to_next(NetClientState *sender, int iovcnt, void *opaque); +void colo_notify_filters_event(int event, Error **errp); + #endif /* QEMU_NET_FILTER_H */ diff --git a/migration/Makefile.objs b/migration/Makefile.objs index c83ec47ba8..a4f3bafd86 100644 --- a/migration/Makefile.objs +++ b/migration/Makefile.objs @@ -1,6 +1,6 @@ common-obj-y += migration.o socket.o fd.o exec.o common-obj-y += tls.o channel.o savevm.o -common-obj-y += colo-comm.o colo.o colo-failover.o +common-obj-y += colo.o colo-failover.o common-obj-y += vmstate.o vmstate-types.o page_cache.o common-obj-y += qemu-file.o global_state.o common-obj-y += qemu-file-channel.o diff --git a/migration/colo-comm.c b/migration/colo-comm.c deleted file mode 100644 index df26e4dfe7..0000000000 --- a/migration/colo-comm.c +++ /dev/null @@ -1,76 +0,0 @@ -/* - * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) - * (a.k.a. Fault Tolerance or Continuous Replication) - * - * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD. - * Copyright (c) 2016 FUJITSU LIMITED - * Copyright (c) 2016 Intel Corporation - * - * This work is licensed under the terms of the GNU GPL, version 2 or - * later. See the COPYING file in the top-level directory. - * - */ - -#include "qemu/osdep.h" -#include "migration.h" -#include "migration/colo.h" -#include "migration/vmstate.h" -#include "trace.h" - -typedef struct { - bool colo_requested; -} COLOInfo; - -static COLOInfo colo_info; - -COLOMode get_colo_mode(void) -{ - if (migration_in_colo_state()) { - return COLO_MODE_PRIMARY; - } else if (migration_incoming_in_colo_state()) { - return COLO_MODE_SECONDARY; - } else { - return COLO_MODE_UNKNOWN; - } -} - -static int colo_info_pre_save(void *opaque) -{ - COLOInfo *s = opaque; - - s->colo_requested = migrate_colo_enabled(); - - return 0; -} - -static bool colo_info_need(void *opaque) -{ - return migrate_colo_enabled(); -} - -static const VMStateDescription colo_state = { - .name = "COLOState", - .version_id = 1, - .minimum_version_id = 1, - .pre_save = colo_info_pre_save, - .needed = colo_info_need, - .fields = (VMStateField[]) { - VMSTATE_BOOL(colo_requested, COLOInfo), - VMSTATE_END_OF_LIST() - }, -}; - -void colo_info_init(void) -{ - vmstate_register(NULL, 0, &colo_state, &colo_info); -} - -bool migration_incoming_enable_colo(void) -{ - return colo_info.colo_requested; -} - -void migration_incoming_exit_colo(void) -{ - colo_info.colo_requested = false; -} diff --git a/migration/colo-failover.c b/migration/colo-failover.c index 0ae0c41221..4854a96c92 100644 --- a/migration/colo-failover.c +++ b/migration/colo-failover.c @@ -77,7 +77,7 @@ FailoverStatus failover_get_state(void) void qmp_x_colo_lost_heartbeat(Error **errp) { - if (get_colo_mode() == COLO_MODE_UNKNOWN) { + if (get_colo_mode() == COLO_MODE_NONE) { error_setg(errp, QERR_FEATURE_DISABLED, "colo"); return; } diff --git a/migration/colo.c b/migration/colo.c index 88936f5962..956ac236b7 100644 --- a/migration/colo.c +++ b/migration/colo.c @@ -25,8 +25,16 @@ #include "qemu/error-report.h" #include "migration/failover.h" #include "replication.h" +#include "net/colo-compare.h" +#include "net/colo.h" +#include "block/block.h" +#include "qapi/qapi-events-migration.h" +#include "qapi/qmp/qerror.h" +#include "sysemu/cpus.h" +#include "net/filter.h" static bool vmstate_loading; +static Notifier packets_compare_notifier; #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024) @@ -53,6 +61,7 @@ static void secondary_vm_do_failover(void) { int old_state; MigrationIncomingState *mis = migration_incoming_get_current(); + Error *local_err = NULL; /* Can not do failover during the process of VM's loading VMstate, Or * it will break the secondary VM. @@ -70,6 +79,17 @@ static void secondary_vm_do_failover(void) migrate_set_state(&mis->state, MIGRATION_STATUS_COLO, MIGRATION_STATUS_COMPLETED); + replication_stop_all(true, &local_err); + if (local_err) { + error_report_err(local_err); + } + + /* Notify all filters of all NIC to do checkpoint */ + colo_notify_filters_event(COLO_EVENT_FAILOVER, &local_err); + if (local_err) { + error_report_err(local_err); + } + if (!autostart) { error_report("\"-S\" qemu option will be ignored in secondary side"); /* recover runstate to normal migration finish state */ @@ -107,9 +127,15 @@ static void primary_vm_do_failover(void) { MigrationState *s = migrate_get_current(); int old_state; + Error *local_err = NULL; migrate_set_state(&s->state, MIGRATION_STATUS_COLO, MIGRATION_STATUS_COMPLETED); + /* + * kick COLO thread which might wait at + * qemu_sem_wait(&s->colo_checkpoint_sem). + */ + colo_checkpoint_notify(migrate_get_current()); /* * Wake up COLO thread which may blocked in recv() or send(), @@ -130,10 +156,28 @@ static void primary_vm_do_failover(void) FailoverStatus_str(old_state)); return; } + + replication_stop_all(true, &local_err); + if (local_err) { + error_report_err(local_err); + local_err = NULL; + } + /* Notify COLO thread that failover work is finished */ qemu_sem_post(&s->colo_exit_sem); } +COLOMode get_colo_mode(void) +{ + if (migration_in_colo_state()) { + return COLO_MODE_PRIMARY; + } else if (migration_incoming_in_colo_state()) { + return COLO_MODE_SECONDARY; + } else { + return COLO_MODE_NONE; + } +} + void colo_do_failover(MigrationState *s) { /* Make sure VM stopped while failover happened. */ @@ -207,6 +251,26 @@ void qmp_xen_colo_do_checkpoint(Error **errp) #endif } +COLOStatus *qmp_query_colo_status(Error **errp) +{ + COLOStatus *s = g_new0(COLOStatus, 1); + + s->mode = get_colo_mode(); + + switch (failover_get_state()) { + case FAILOVER_STATUS_NONE: + s->reason = COLO_EXIT_REASON_NONE; + break; + case FAILOVER_STATUS_REQUIRE: + s->reason = COLO_EXIT_REASON_REQUEST; + break; + default: + s->reason = COLO_EXIT_REASON_ERROR; + } + + return s; +} + static void colo_send_message(QEMUFile *f, COLOMessage msg, Error **errp) { @@ -343,21 +407,42 @@ static int colo_do_checkpoint_transaction(MigrationState *s, goto out; } + colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err); + if (local_err) { + goto out; + } + /* Disable block migration */ migrate_set_block_enabled(false, &local_err); - qemu_savevm_state_header(fb); - qemu_savevm_state_setup(fb); qemu_mutex_lock_iothread(); - qemu_savevm_state_complete_precopy(fb, false, false); - qemu_mutex_unlock_iothread(); - - qemu_fflush(fb); + replication_do_checkpoint_all(&local_err); + if (local_err) { + qemu_mutex_unlock_iothread(); + goto out; + } colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err); if (local_err) { + qemu_mutex_unlock_iothread(); + goto out; + } + /* Note: device state is saved into buffer */ + ret = qemu_save_device_state(fb); + + qemu_mutex_unlock_iothread(); + if (ret < 0) { goto out; } /* + * Only save VM's live state, which not including device state. + * TODO: We may need a timeout mechanism to prevent COLO process + * to be blocked here. + */ + qemu_savevm_live_state(s->to_dst_file); + + qemu_fflush(fb); + + /* * We need the size of the VMstate data in Secondary side, * With which we can decide how much data should be read. */ @@ -400,6 +485,11 @@ out: return ret; } +static void colo_compare_notify_checkpoint(Notifier *notifier, void *data) +{ + colo_checkpoint_notify(data); +} + static void colo_process_checkpoint(MigrationState *s) { QIOChannelBuffer *bioc; @@ -416,6 +506,9 @@ static void colo_process_checkpoint(MigrationState *s) goto out; } + packets_compare_notifier.notify = colo_compare_notify_checkpoint; + colo_compare_register_notifier(&packets_compare_notifier); + /* * Wait for Secondary finish loading VM states and enter COLO * restore. @@ -430,6 +523,12 @@ static void colo_process_checkpoint(MigrationState *s) object_unref(OBJECT(bioc)); qemu_mutex_lock_iothread(); + replication_start_all(REPLICATION_MODE_PRIMARY, &local_err); + if (local_err) { + qemu_mutex_unlock_iothread(); + goto out; + } + vm_start(); qemu_mutex_unlock_iothread(); trace_colo_vm_state_change("stop", "run"); @@ -445,6 +544,9 @@ static void colo_process_checkpoint(MigrationState *s) qemu_sem_wait(&s->colo_checkpoint_sem); + if (s->state != MIGRATION_STATUS_COLO) { + goto out; + } ret = colo_do_checkpoint_transaction(s, bioc, fb); if (ret < 0) { goto out; @@ -461,11 +563,38 @@ out: qemu_fclose(fb); } - timer_del(s->colo_delay_timer); + /* + * There are only two reasons we can get here, some error happened + * or the user triggered failover. + */ + switch (failover_get_state()) { + case FAILOVER_STATUS_NONE: + qapi_event_send_colo_exit(COLO_MODE_PRIMARY, + COLO_EXIT_REASON_ERROR); + break; + case FAILOVER_STATUS_REQUIRE: + qapi_event_send_colo_exit(COLO_MODE_PRIMARY, + COLO_EXIT_REASON_REQUEST); + break; + default: + abort(); + } /* Hope this not to be too long to wait here */ qemu_sem_wait(&s->colo_exit_sem); qemu_sem_destroy(&s->colo_exit_sem); + + /* + * It is safe to unregister notifier after failover finished. + * Besides, colo_delay_timer and colo_checkpoint_sem can't be + * released befor unregister notifier, or there will be use-after-free + * error. + */ + colo_compare_unregister_notifier(&packets_compare_notifier); + timer_del(s->colo_delay_timer); + timer_free(s->colo_delay_timer); + qemu_sem_destroy(&s->colo_checkpoint_sem); + /* * Must be called after failover BH is completed, * Or the failover BH may shutdown the wrong fd that @@ -533,6 +662,7 @@ void *colo_process_incoming_thread(void *opaque) uint64_t total_size; uint64_t value; Error *local_err = NULL; + int ret; rcu_register_thread(); qemu_sem_init(&mis->colo_incoming_sem, 0); @@ -559,6 +689,16 @@ void *colo_process_incoming_thread(void *opaque) fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc)); object_unref(OBJECT(bioc)); + qemu_mutex_lock_iothread(); + replication_start_all(REPLICATION_MODE_SECONDARY, &local_err); + if (local_err) { + qemu_mutex_unlock_iothread(); + goto out; + } + vm_start(); + trace_colo_vm_state_change("stop", "run"); + qemu_mutex_unlock_iothread(); + colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY, &local_err); if (local_err) { @@ -578,6 +718,11 @@ void *colo_process_incoming_thread(void *opaque) goto out; } + qemu_mutex_lock_iothread(); + vm_stop_force_state(RUN_STATE_COLO); + trace_colo_vm_state_change("run", "stop"); + qemu_mutex_unlock_iothread(); + /* FIXME: This is unnecessary for periodic checkpoint mode */ colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY, &local_err); @@ -591,6 +736,16 @@ void *colo_process_incoming_thread(void *opaque) goto out; } + qemu_mutex_lock_iothread(); + cpu_synchronize_all_pre_loadvm(); + ret = qemu_loadvm_state_main(mis->from_src_file, mis); + qemu_mutex_unlock_iothread(); + + if (ret < 0) { + error_report("Load VM's live state (ram) error"); + goto out; + } + value = colo_receive_message_value(mis->from_src_file, COLO_MESSAGE_VMSTATE_SIZE, &local_err); if (local_err) { @@ -622,15 +777,37 @@ void *colo_process_incoming_thread(void *opaque) } qemu_mutex_lock_iothread(); - qemu_system_reset(SHUTDOWN_CAUSE_NONE); vmstate_loading = true; - if (qemu_loadvm_state(fb) < 0) { - error_report("COLO: loadvm failed"); + ret = qemu_load_device_state(fb); + if (ret < 0) { + error_report("COLO: load device state failed"); + qemu_mutex_unlock_iothread(); + goto out; + } + + replication_get_error_all(&local_err); + if (local_err) { + qemu_mutex_unlock_iothread(); + goto out; + } + /* discard colo disk buffer */ + replication_do_checkpoint_all(&local_err); + if (local_err) { + qemu_mutex_unlock_iothread(); + goto out; + } + + /* Notify all filters of all NIC to do checkpoint */ + colo_notify_filters_event(COLO_EVENT_CHECKPOINT, &local_err); + + if (local_err) { qemu_mutex_unlock_iothread(); goto out; } vmstate_loading = false; + vm_start(); + trace_colo_vm_state_change("stop", "run"); qemu_mutex_unlock_iothread(); if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) { @@ -654,6 +831,19 @@ out: error_report_err(local_err); } + switch (failover_get_state()) { + case FAILOVER_STATUS_NONE: + qapi_event_send_colo_exit(COLO_MODE_SECONDARY, + COLO_EXIT_REASON_ERROR); + break; + case FAILOVER_STATUS_REQUIRE: + qapi_event_send_colo_exit(COLO_MODE_SECONDARY, + COLO_EXIT_REASON_REQUEST); + break; + default: + abort(); + } + if (fb) { qemu_fclose(fb); } @@ -665,7 +855,7 @@ out: if (mis->to_src_file) { qemu_fclose(mis->to_src_file); } - migration_incoming_exit_colo(); + migration_incoming_disable_colo(); rcu_unregister_thread(); return NULL; diff --git a/migration/migration.c b/migration/migration.c index d6ae879dc8..7696729340 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -76,10 +76,8 @@ /* Migration XBZRLE default cache size */ #define DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE (64 * 1024 * 1024) -/* The delay time (in ms) between two COLO checkpoints - * Note: Please change this default value to 10000 when we support hybrid mode. - */ -#define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY 200 +/* The delay time (in ms) between two COLO checkpoints */ +#define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY (200 * 100) #define DEFAULT_MIGRATE_MULTIFD_CHANNELS 2 #define DEFAULT_MIGRATE_MULTIFD_PAGE_COUNT 16 @@ -298,6 +296,22 @@ int migrate_send_rp_req_pages(MigrationIncomingState *mis, const char *rbname, return migrate_send_rp_message(mis, msg_type, msglen, bufc); } +static bool migration_colo_enabled; +bool migration_incoming_colo_enabled(void) +{ + return migration_colo_enabled; +} + +void migration_incoming_disable_colo(void) +{ + migration_colo_enabled = false; +} + +void migration_incoming_enable_colo(void) +{ + migration_colo_enabled = true; +} + void qemu_start_incoming_migration(const char *uri, Error **errp) { const char *p; @@ -388,6 +402,7 @@ static void process_incoming_migration_co(void *opaque) MigrationIncomingState *mis = migration_incoming_get_current(); PostcopyState ps; int ret; + Error *local_err = NULL; assert(mis->from_src_file); mis->migration_incoming_co = qemu_coroutine_self(); @@ -419,7 +434,21 @@ static void process_incoming_migration_co(void *opaque) } /* we get COLO info, and know if we are in COLO mode */ - if (!ret && migration_incoming_enable_colo()) { + if (!ret && migration_incoming_colo_enabled()) { + /* Make sure all file formats flush their mutable metadata */ + bdrv_invalidate_cache_all(&local_err); + if (local_err) { + migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE, + MIGRATION_STATUS_FAILED); + error_report_err(local_err); + exit(EXIT_FAILURE); + } + + if (colo_init_ram_cache() < 0) { + error_report("Init ram cache failed"); + exit(EXIT_FAILURE); + } + qemu_thread_create(&mis->colo_incoming_thread, "COLO incoming", colo_process_incoming_thread, mis, QEMU_THREAD_JOINABLE); mis->have_colo_incoming_thread = true; @@ -427,6 +456,8 @@ static void process_incoming_migration_co(void *opaque) /* Wait checkpoint incoming thread exit before free resource */ qemu_thread_join(&mis->colo_incoming_thread); + /* We hold the global iothread lock, so it is safe here */ + colo_release_ram_cache(); } if (ret < 0) { @@ -3017,6 +3048,11 @@ static void *migration_thread(void *opaque) qemu_savevm_send_postcopy_advise(s->to_dst_file); } + if (migrate_colo_enabled()) { + /* Notify migration destination that we enable COLO */ + qemu_savevm_send_colo_enable(s->to_dst_file); + } + qemu_savevm_state_setup(s->to_dst_file); s->setup_time = qemu_clock_get_ms(QEMU_CLOCK_HOST) - setup_start; diff --git a/migration/ram.c b/migration/ram.c index bc38d98cc3..7e7deec4d8 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -3447,6 +3447,29 @@ static inline void *host_from_ram_block_offset(RAMBlock *block, return block->host + offset; } +static inline void *colo_cache_from_block_offset(RAMBlock *block, + ram_addr_t offset) +{ + if (!offset_in_ramblock(block, offset)) { + return NULL; + } + if (!block->colo_cache) { + error_report("%s: colo_cache is NULL in block :%s", + __func__, block->idstr); + return NULL; + } + + /* + * During colo checkpoint, we need bitmap of these migrated pages. + * It help us to decide which pages in ram cache should be flushed + * into VM's RAM later. + */ + if (!test_and_set_bit(offset >> TARGET_PAGE_BITS, block->bmap)) { + ram_state->migration_dirty_pages++; + } + return block->colo_cache + offset; +} + /** * ram_handle_compressed: handle the zero page case * @@ -3651,6 +3674,88 @@ static void decompress_data_with_multi_threads(QEMUFile *f, qemu_mutex_unlock(&decomp_done_lock); } +/* + * colo cache: this is for secondary VM, we cache the whole + * memory of the secondary VM, it is need to hold the global lock + * to call this helper. + */ +int colo_init_ram_cache(void) +{ + RAMBlock *block; + + rcu_read_lock(); + RAMBLOCK_FOREACH_MIGRATABLE(block) { + block->colo_cache = qemu_anon_ram_alloc(block->used_length, + NULL, + false); + if (!block->colo_cache) { + error_report("%s: Can't alloc memory for COLO cache of block %s," + "size 0x" RAM_ADDR_FMT, __func__, block->idstr, + block->used_length); + goto out_locked; + } + memcpy(block->colo_cache, block->host, block->used_length); + } + rcu_read_unlock(); + /* + * Record the dirty pages that sent by PVM, we use this dirty bitmap together + * with to decide which page in cache should be flushed into SVM's RAM. Here + * we use the same name 'ram_bitmap' as for migration. + */ + if (ram_bytes_total()) { + RAMBlock *block; + + RAMBLOCK_FOREACH_MIGRATABLE(block) { + unsigned long pages = block->max_length >> TARGET_PAGE_BITS; + + block->bmap = bitmap_new(pages); + bitmap_set(block->bmap, 0, pages); + } + } + ram_state = g_new0(RAMState, 1); + ram_state->migration_dirty_pages = 0; + memory_global_dirty_log_start(); + + return 0; + +out_locked: + + RAMBLOCK_FOREACH_MIGRATABLE(block) { + if (block->colo_cache) { + qemu_anon_ram_free(block->colo_cache, block->used_length); + block->colo_cache = NULL; + } + } + + rcu_read_unlock(); + return -errno; +} + +/* It is need to hold the global lock to call this helper */ +void colo_release_ram_cache(void) +{ + RAMBlock *block; + + memory_global_dirty_log_stop(); + RAMBLOCK_FOREACH_MIGRATABLE(block) { + g_free(block->bmap); + block->bmap = NULL; + } + + rcu_read_lock(); + + RAMBLOCK_FOREACH_MIGRATABLE(block) { + if (block->colo_cache) { + qemu_anon_ram_free(block->colo_cache, block->used_length); + block->colo_cache = NULL; + } + } + + rcu_read_unlock(); + g_free(ram_state); + ram_state = NULL; +} + /** * ram_load_setup: Setup RAM for migration incoming side * @@ -3667,6 +3772,7 @@ static int ram_load_setup(QEMUFile *f, void *opaque) xbzrle_load_setup(); ramblock_recv_map_init(); + return 0; } @@ -3687,6 +3793,7 @@ static int ram_load_cleanup(void *opaque) g_free(rb->receivedmap); rb->receivedmap = NULL; } + return 0; } @@ -3869,6 +3976,46 @@ static bool postcopy_is_running(void) return ps >= POSTCOPY_INCOMING_LISTENING && ps < POSTCOPY_INCOMING_END; } +/* + * Flush content of RAM cache into SVM's memory. + * Only flush the pages that be dirtied by PVM or SVM or both. + */ +static void colo_flush_ram_cache(void) +{ + RAMBlock *block = NULL; + void *dst_host; + void *src_host; + unsigned long offset = 0; + + memory_global_dirty_log_sync(); + rcu_read_lock(); + RAMBLOCK_FOREACH_MIGRATABLE(block) { + migration_bitmap_sync_range(ram_state, block, 0, block->used_length); + } + rcu_read_unlock(); + + trace_colo_flush_ram_cache_begin(ram_state->migration_dirty_pages); + rcu_read_lock(); + block = QLIST_FIRST_RCU(&ram_list.blocks); + + while (block) { + offset = migration_bitmap_find_dirty(ram_state, block, offset); + + if (offset << TARGET_PAGE_BITS >= block->used_length) { + offset = 0; + block = QLIST_NEXT_RCU(block, next); + } else { + migration_bitmap_clear_dirty(ram_state, block, offset); + dst_host = block->host + (offset << TARGET_PAGE_BITS); + src_host = block->colo_cache + (offset << TARGET_PAGE_BITS); + memcpy(dst_host, src_host, TARGET_PAGE_SIZE); + } + } + + rcu_read_unlock(); + trace_colo_flush_ram_cache_end(); +} + static int ram_load(QEMUFile *f, void *opaque, int version_id) { int flags = 0, ret = 0, invalid_flags = 0; @@ -3924,13 +4071,24 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) { RAMBlock *block = ram_block_from_stream(f, flags); - host = host_from_ram_block_offset(block, addr); + /* + * After going into COLO, we should load the Page into colo_cache. + */ + if (migration_incoming_in_colo_state()) { + host = colo_cache_from_block_offset(block, addr); + } else { + host = host_from_ram_block_offset(block, addr); + } if (!host) { error_report("Illegal RAM offset " RAM_ADDR_FMT, addr); ret = -EINVAL; break; } - ramblock_recv_bitmap_set(block, host); + + if (!migration_incoming_in_colo_state()) { + ramblock_recv_bitmap_set(block, host); + } + trace_ram_load_loop(block->idstr, (uint64_t)addr, flags, host); } @@ -4034,6 +4192,10 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) ret |= wait_for_decompress_done(); rcu_read_unlock(); trace_ram_load_complete(ret, seq_iter); + + if (!ret && migration_incoming_in_colo_state()) { + colo_flush_ram_cache(); + } return ret; } diff --git a/migration/ram.h b/migration/ram.h index a139066846..83ff1bc11a 100644 --- a/migration/ram.h +++ b/migration/ram.h @@ -71,4 +71,8 @@ int64_t ramblock_recv_bitmap_send(QEMUFile *file, const char *block_name); int ram_dirty_bitmap_reload(MigrationState *s, RAMBlock *rb); +/* ram cache */ +int colo_init_ram_cache(void); +void colo_release_ram_cache(void); + #endif diff --git a/migration/savevm.c b/migration/savevm.c index 2d10e45582..e4caff9a6a 100644 --- a/migration/savevm.c +++ b/migration/savevm.c @@ -56,6 +56,7 @@ #include "io/channel-file.h" #include "sysemu/replay.h" #include "qjson.h" +#include "migration/colo.h" #ifndef ETH_P_RARP #define ETH_P_RARP 0x8035 @@ -82,6 +83,7 @@ enum qemu_vm_cmd { were previously sent during precopy but are dirty. */ MIG_CMD_PACKAGED, /* Send a wrapped stream within this stream */ + MIG_CMD_ENABLE_COLO, /* Enable COLO */ MIG_CMD_POSTCOPY_RESUME, /* resume postcopy on dest */ MIG_CMD_RECV_BITMAP, /* Request for recved bitmap on dst */ MIG_CMD_MAX @@ -841,6 +843,12 @@ static void qemu_savevm_command_send(QEMUFile *f, qemu_fflush(f); } +void qemu_savevm_send_colo_enable(QEMUFile *f) +{ + trace_savevm_send_colo_enable(); + qemu_savevm_command_send(f, MIG_CMD_ENABLE_COLO, 0, NULL); +} + void qemu_savevm_send_ping(QEMUFile *f, uint32_t value) { uint32_t buf; @@ -1370,13 +1378,21 @@ done: return ret; } -static int qemu_save_device_state(QEMUFile *f) +void qemu_savevm_live_state(QEMUFile *f) { - SaveStateEntry *se; + /* save QEMU_VM_SECTION_END section */ + qemu_savevm_state_complete_precopy(f, true, false); + qemu_put_byte(f, QEMU_VM_EOF); +} - qemu_put_be32(f, QEMU_VM_FILE_MAGIC); - qemu_put_be32(f, QEMU_VM_FILE_VERSION); +int qemu_save_device_state(QEMUFile *f) +{ + SaveStateEntry *se; + if (!migration_in_colo_state()) { + qemu_put_be32(f, QEMU_VM_FILE_MAGIC); + qemu_put_be32(f, QEMU_VM_FILE_VERSION); + } cpu_synchronize_all_states(); QTAILQ_FOREACH(se, &savevm_state.handlers, entry) { @@ -1432,8 +1448,6 @@ enum LoadVMExitCodes { LOADVM_QUIT = 1, }; -static int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis); - /* ------ incoming postcopy messages ------ */ /* 'advise' arrives before any transfers just to tell us that a postcopy * *might* happen - it might be skipped if precopy transferred everything @@ -1922,6 +1936,12 @@ static int loadvm_handle_recv_bitmap(MigrationIncomingState *mis, return 0; } +static int loadvm_process_enable_colo(MigrationIncomingState *mis) +{ + migration_incoming_enable_colo(); + return colo_init_ram_cache(); +} + /* * Process an incoming 'QEMU_VM_COMMAND' * 0 just a normal return @@ -2001,6 +2021,9 @@ static int loadvm_process_command(QEMUFile *f) case MIG_CMD_RECV_BITMAP: return loadvm_handle_recv_bitmap(mis, len); + + case MIG_CMD_ENABLE_COLO: + return loadvm_process_enable_colo(mis); } return 0; @@ -2230,7 +2253,7 @@ static bool postcopy_pause_incoming(MigrationIncomingState *mis) return true; } -static int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis) +int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis) { uint8_t section_type; int ret = 0; @@ -2401,6 +2424,22 @@ int qemu_loadvm_state(QEMUFile *f) return ret; } +int qemu_load_device_state(QEMUFile *f) +{ + MigrationIncomingState *mis = migration_incoming_get_current(); + int ret; + + /* Load QEMU_VM_SECTION_FULL section */ + ret = qemu_loadvm_state_main(f, mis); + if (ret < 0) { + error_report("Failed to load device state: %d", ret); + return ret; + } + + cpu_synchronize_all_post_init(); + return 0; +} + int save_snapshot(const char *name, Error **errp) { BlockDriverState *bs, *bs1; diff --git a/migration/savevm.h b/migration/savevm.h index a5e65b8ae3..51a4b9caa8 100644 --- a/migration/savevm.h +++ b/migration/savevm.h @@ -55,8 +55,13 @@ void qemu_savevm_send_postcopy_ram_discard(QEMUFile *f, const char *name, uint16_t len, uint64_t *start_list, uint64_t *length_list); +void qemu_savevm_send_colo_enable(QEMUFile *f); +void qemu_savevm_live_state(QEMUFile *f); +int qemu_save_device_state(QEMUFile *f); int qemu_loadvm_state(QEMUFile *f); void qemu_loadvm_state_cleanup(void); +int qemu_loadvm_state_main(QEMUFile *f, MigrationIncomingState *mis); +int qemu_load_device_state(QEMUFile *f); #endif diff --git a/migration/trace-events b/migration/trace-events index 9430f3cbe0..bd2d0cd25a 100644 --- a/migration/trace-events +++ b/migration/trace-events @@ -37,6 +37,7 @@ savevm_send_ping(uint32_t val) "0x%x" savevm_send_postcopy_listen(void) "" savevm_send_postcopy_run(void) "" savevm_send_postcopy_resume(void) "" +savevm_send_colo_enable(void) "" savevm_send_recv_bitmap(char *name) "%s" savevm_state_setup(void) "" savevm_state_resume_prepare(void) "" @@ -101,6 +102,8 @@ ram_dirty_bitmap_sync_start(void) "" ram_dirty_bitmap_sync_wait(void) "" ram_dirty_bitmap_sync_complete(void) "" ram_state_resume_prepare(uint64_t v) "%" PRId64 +colo_flush_ram_cache_begin(uint64_t dirty_pages) "dirty_pages %" PRIu64 +colo_flush_ram_cache_end(void) "" # migration/migration.c await_return_path_close_on_source_close(void) "" diff --git a/net/colo-compare.c b/net/colo-compare.c index dd745a491b..a39191d522 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -27,11 +27,20 @@ #include "qemu/sockets.h" #include "colo.h" #include "sysemu/iothread.h" +#include "net/colo-compare.h" +#include "migration/colo.h" +#include "migration/migration.h" #define TYPE_COLO_COMPARE "colo-compare" #define COLO_COMPARE(obj) \ OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) +static QTAILQ_HEAD(, CompareState) net_compares = + QTAILQ_HEAD_INITIALIZER(net_compares); + +static NotifierList colo_compare_notifiers = + NOTIFIER_LIST_INITIALIZER(colo_compare_notifiers); + #define COMPARE_READ_LEN_MAX NET_BUFSIZE #define MAX_QUEUE_SIZE 1024 @@ -41,6 +50,10 @@ /* TODO: Should be configurable */ #define REGULAR_PACKET_CHECK_MS 3000 +static QemuMutex event_mtx; +static QemuCond event_complete_cond; +static int event_unhandled_count; + /* * + CompareState ++ * | | @@ -87,6 +100,11 @@ typedef struct CompareState { IOThread *iothread; GMainContext *worker_context; QEMUTimer *packet_check_timer; + + QEMUBH *event_bh; + enum colo_event event; + + QTAILQ_ENTRY(CompareState) next; } CompareState; typedef struct CompareClass { @@ -98,6 +116,12 @@ enum { SECONDARY_IN, }; +static void colo_compare_inconsistency_notify(void) +{ + notifier_list_notify(&colo_compare_notifiers, + migrate_get_current()); +} + static int compare_chr_send(CompareState *s, const uint8_t *buf, uint32_t size, @@ -413,10 +437,7 @@ sec: qemu_hexdump((char *)spkt->data, stderr, "colo-compare spkt", spkt->size); - /* - * colo_compare_inconsistent_notify(); - * TODO: notice to checkpoint(); - */ + colo_compare_inconsistency_notify(); } } @@ -547,8 +568,18 @@ static int colo_old_packet_check_one(Packet *pkt, int64_t *check_time) } } +void colo_compare_register_notifier(Notifier *notify) +{ + notifier_list_add(&colo_compare_notifiers, notify); +} + +void colo_compare_unregister_notifier(Notifier *notify) +{ + notifier_remove(notify); +} + static int colo_old_packet_check_one_conn(Connection *conn, - void *user_data) + void *user_data) { GList *result = NULL; int64_t check_time = REGULAR_PACKET_CHECK_MS; @@ -559,10 +590,7 @@ static int colo_old_packet_check_one_conn(Connection *conn, if (result) { /* Do checkpoint will flush old packet */ - /* - * TODO: Notify colo frame to do checkpoint. - * colo_compare_inconsistent_notify(); - */ + colo_compare_inconsistency_notify(); return 0; } @@ -606,11 +634,12 @@ static void colo_compare_packet(CompareState *s, Connection *conn, /* * If one packet arrive late, the secondary_list or * primary_list will be empty, so we can't compare it - * until next comparison. + * until next comparison. If the packets in the list are + * timeout, it will trigger a checkpoint request. */ trace_colo_compare_main("packet different"); g_queue_push_head(&conn->primary_list, pkt); - /* TODO: colo_notify_checkpoint();*/ + colo_compare_inconsistency_notify(); break; } } @@ -736,6 +765,25 @@ static void check_old_packet_regular(void *opaque) REGULAR_PACKET_CHECK_MS); } +/* Public API, Used for COLO frame to notify compare event */ +void colo_notify_compares_event(void *opaque, int event, Error **errp) +{ + CompareState *s; + + qemu_mutex_lock(&event_mtx); + QTAILQ_FOREACH(s, &net_compares, next) { + s->event = event; + qemu_bh_schedule(s->event_bh); + event_unhandled_count++; + } + /* Wait all compare threads to finish handling this event */ + while (event_unhandled_count > 0) { + qemu_cond_wait(&event_complete_cond, &event_mtx); + } + + qemu_mutex_unlock(&event_mtx); +} + static void colo_compare_timer_init(CompareState *s) { AioContext *ctx = iothread_get_aio_context(s->iothread); @@ -756,6 +804,30 @@ static void colo_compare_timer_del(CompareState *s) } } +static void colo_flush_packets(void *opaque, void *user_data); + +static void colo_compare_handle_event(void *opaque) +{ + CompareState *s = opaque; + + switch (s->event) { + case COLO_EVENT_CHECKPOINT: + g_queue_foreach(&s->conn_list, colo_flush_packets, s); + break; + case COLO_EVENT_FAILOVER: + break; + default: + break; + } + + assert(event_unhandled_count > 0); + + qemu_mutex_lock(&event_mtx); + event_unhandled_count--; + qemu_cond_broadcast(&event_complete_cond); + qemu_mutex_unlock(&event_mtx); +} + static void colo_compare_iothread(CompareState *s) { object_ref(OBJECT(s->iothread)); @@ -769,6 +841,7 @@ static void colo_compare_iothread(CompareState *s) s, s->worker_context, true); colo_compare_timer_init(s); + s->event_bh = qemu_bh_new(colo_compare_handle_event, s); } static char *compare_get_pri_indev(Object *obj, Error **errp) @@ -926,8 +999,13 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr); net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr); + QTAILQ_INSERT_TAIL(&net_compares, s, next); + g_queue_init(&s->conn_list); + qemu_mutex_init(&event_mtx); + qemu_cond_init(&event_complete_cond); + s->connection_track_table = g_hash_table_new_full(connection_key_hash, connection_key_equal, g_free, @@ -990,6 +1068,7 @@ static void colo_compare_init(Object *obj) static void colo_compare_finalize(Object *obj) { CompareState *s = COLO_COMPARE(obj); + CompareState *tmp = NULL; qemu_chr_fe_deinit(&s->chr_pri_in, false); qemu_chr_fe_deinit(&s->chr_sec_in, false); @@ -997,6 +1076,16 @@ static void colo_compare_finalize(Object *obj) if (s->iothread) { colo_compare_timer_del(s); } + + qemu_bh_delete(s->event_bh); + + QTAILQ_FOREACH(tmp, &net_compares, next) { + if (tmp == s) { + QTAILQ_REMOVE(&net_compares, s, next); + break; + } + } + /* Release all unhandled packets after compare thead exited */ g_queue_foreach(&s->conn_list, colo_flush_packets, s); @@ -1009,6 +1098,10 @@ static void colo_compare_finalize(Object *obj) if (s->iothread) { object_unref(OBJECT(s->iothread)); } + + qemu_mutex_destroy(&event_mtx); + qemu_cond_destroy(&event_complete_cond); + g_free(s->pri_indev); g_free(s->sec_indev); g_free(s->outdev); diff --git a/net/colo-compare.h b/net/colo-compare.h new file mode 100644 index 0000000000..22ddd512e2 --- /dev/null +++ b/net/colo-compare.h @@ -0,0 +1,24 @@ +/* + * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO) + * (a.k.a. Fault Tolerance or Continuous Replication) + * + * Copyright (c) 2017 HUAWEI TECHNOLOGIES CO., LTD. + * Copyright (c) 2017 FUJITSU LIMITED + * Copyright (c) 2017 Intel Corporation + * + * Authors: + * zhanghailiang <zhang.zhanghailiang@huawei.com> + * Zhang Chen <zhangckid@gmail.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or + * later. See the COPYING file in the top-level directory. + */ + +#ifndef QEMU_COLO_COMPARE_H +#define QEMU_COLO_COMPARE_H + +void colo_notify_compares_event(void *opaque, int event, Error **errp); +void colo_compare_register_notifier(Notifier *notify); +void colo_compare_unregister_notifier(Notifier *notify); + +#endif /* QEMU_COLO_COMPARE_H */ diff --git a/net/colo.c b/net/colo.c index 6dda4ed66e..49176bf07b 100644 --- a/net/colo.c +++ b/net/colo.c @@ -137,7 +137,7 @@ Connection *connection_new(ConnectionKey *key) conn->ip_proto = key->ip_proto; conn->processing = false; conn->offset = 0; - conn->syn_flag = 0; + conn->tcp_state = TCPS_CLOSED; conn->pack = 0; conn->sack = 0; g_queue_init(&conn->primary_list); @@ -221,3 +221,11 @@ Connection *connection_get(GHashTable *connection_track_table, return conn; } + +bool connection_has_tracked(GHashTable *connection_track_table, + ConnectionKey *key) +{ + Connection *conn = g_hash_table_lookup(connection_track_table, key); + + return conn ? true : false; +} diff --git a/net/colo.h b/net/colo.h index da6c36dcf7..11c5226488 100644 --- a/net/colo.h +++ b/net/colo.h @@ -18,6 +18,7 @@ #include "slirp/slirp.h" #include "qemu/jhash.h" #include "qemu/timer.h" +#include "slirp/tcp.h" #define HASHTABLE_MAX_SIZE 16384 @@ -81,11 +82,9 @@ typedef struct Connection { uint32_t sack; /* offset = secondary_seq - primary_seq */ tcp_seq offset; - /* - * we use this flag update offset func - * run once in independent tcp connection - */ - int syn_flag; + + int tcp_state; /* TCP FSM state */ + tcp_seq fin_ack_seq; /* the seq of 'fin=1,ack=1' */ } Connection; uint32_t connection_key_hash(const void *opaque); @@ -99,6 +98,8 @@ void connection_destroy(void *opaque); Connection *connection_get(GHashTable *connection_track_table, ConnectionKey *key, GQueue *conn_list); +bool connection_has_tracked(GHashTable *connection_track_table, + ConnectionKey *key); void connection_hashtable_reset(GHashTable *connection_track_table); Packet *packet_new(const void *data, int size, int vnet_hdr_len); void packet_destroy(void *opaque, void *user_data); diff --git a/net/filter-rewriter.c b/net/filter-rewriter.c index f584e4eba4..bb8f4d93b1 100644 --- a/net/filter-rewriter.c +++ b/net/filter-rewriter.c @@ -20,11 +20,15 @@ #include "qemu/main-loop.h" #include "qemu/iov.h" #include "net/checksum.h" +#include "net/colo.h" +#include "migration/colo.h" #define FILTER_COLO_REWRITER(obj) \ OBJECT_CHECK(RewriterState, (obj), TYPE_FILTER_REWRITER) #define TYPE_FILTER_REWRITER "filter-rewriter" +#define FAILOVER_MODE_ON true +#define FAILOVER_MODE_OFF false typedef struct RewriterState { NetFilterState parent_obj; @@ -32,8 +36,14 @@ typedef struct RewriterState { /* hashtable to save connection */ GHashTable *connection_track_table; bool vnet_hdr; + bool failover_mode; } RewriterState; +static void filter_rewriter_failover_mode(RewriterState *s) +{ + s->failover_mode = FAILOVER_MODE_ON; +} + static void filter_rewriter_flush(NetFilterState *nf) { RewriterState *s = FILTER_COLO_REWRITER(nf); @@ -59,9 +69,9 @@ static int is_tcp_packet(Packet *pkt) } /* handle tcp packet from primary guest */ -static int handle_primary_tcp_pkt(NetFilterState *nf, +static int handle_primary_tcp_pkt(RewriterState *rf, Connection *conn, - Packet *pkt) + Packet *pkt, ConnectionKey *key) { struct tcphdr *tcp_pkt; @@ -74,23 +84,28 @@ static int handle_primary_tcp_pkt(NetFilterState *nf, trace_colo_filter_rewriter_conn_offset(conn->offset); } + if (((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == (TH_ACK | TH_SYN)) && + conn->tcp_state == TCPS_SYN_SENT) { + conn->tcp_state = TCPS_ESTABLISHED; + } + if (((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == TH_SYN)) { /* * we use this flag update offset func * run once in independent tcp connection */ - conn->syn_flag = 1; + conn->tcp_state = TCPS_SYN_RECEIVED; } if (((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == TH_ACK)) { - if (conn->syn_flag) { + if (conn->tcp_state == TCPS_SYN_RECEIVED) { /* * offset = secondary_seq - primary seq * ack packet sent by guest from primary node, * so we use th_ack - 1 get primary_seq */ conn->offset -= (ntohl(tcp_pkt->th_ack) - 1); - conn->syn_flag = 0; + conn->tcp_state = TCPS_ESTABLISHED; } if (conn->offset) { /* handle packets to the secondary from the primary */ @@ -99,15 +114,66 @@ static int handle_primary_tcp_pkt(NetFilterState *nf, net_checksum_calculate((uint8_t *)pkt->data + pkt->vnet_hdr_len, pkt->size - pkt->vnet_hdr_len); } + + /* + * Passive close step 3 + */ + if ((conn->tcp_state == TCPS_LAST_ACK) && + (ntohl(tcp_pkt->th_ack) == (conn->fin_ack_seq + 1))) { + conn->tcp_state = TCPS_CLOSED; + g_hash_table_remove(rf->connection_track_table, key); + } + } + + if ((tcp_pkt->th_flags & TH_FIN) == TH_FIN) { + /* + * Passive close. + * Step 1: + * The *server* side of this connect is VM, *client* tries to close + * the connection. We will into CLOSE_WAIT status. + * + * Step 2: + * In this step we will into LAST_ACK status. + * + * We got 'fin=1, ack=1' packet from server side, we need to + * record the seq of 'fin=1, ack=1' packet. + * + * Step 3: + * We got 'ack=1' packets from client side, it acks 'fin=1, ack=1' + * packet from server side. From this point, we can ensure that there + * will be no packets in the connection, except that, some errors + * happen between the path of 'filter object' and vNIC, if this rare + * case really happen, we can still create a new connection, + * So it is safe to remove the connection from connection_track_table. + * + */ + if (conn->tcp_state == TCPS_ESTABLISHED) { + conn->tcp_state = TCPS_CLOSE_WAIT; + } + + /* + * Active close step 2. + */ + if (conn->tcp_state == TCPS_FIN_WAIT_1) { + conn->tcp_state = TCPS_TIME_WAIT; + /* + * For simplify implementation, we needn't wait 2MSL time + * in filter rewriter. Because guest kernel will track the + * TCP status and wait 2MSL time, if client resend the FIN + * packet, guest will apply the last ACK too. + */ + conn->tcp_state = TCPS_CLOSED; + g_hash_table_remove(rf->connection_track_table, key); + } } return 0; } /* handle tcp packet from secondary guest */ -static int handle_secondary_tcp_pkt(NetFilterState *nf, +static int handle_secondary_tcp_pkt(RewriterState *rf, Connection *conn, - Packet *pkt) + Packet *pkt, ConnectionKey *key) { struct tcphdr *tcp_pkt; @@ -121,7 +187,8 @@ static int handle_secondary_tcp_pkt(NetFilterState *nf, trace_colo_filter_rewriter_conn_offset(conn->offset); } - if (((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == (TH_ACK | TH_SYN))) { + if (conn->tcp_state == TCPS_SYN_RECEIVED && + ((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == (TH_ACK | TH_SYN))) { /* * save offset = secondary_seq and then * in handle_primary_tcp_pkt make offset @@ -130,6 +197,12 @@ static int handle_secondary_tcp_pkt(NetFilterState *nf, conn->offset = ntohl(tcp_pkt->th_seq); } + /* VM active connect */ + if (conn->tcp_state == TCPS_CLOSED && + ((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == TH_SYN)) { + conn->tcp_state = TCPS_SYN_SENT; + } + if ((tcp_pkt->th_flags & (TH_ACK | TH_SYN)) == TH_ACK) { /* Only need to adjust seq while offset is Non-zero */ if (conn->offset) { @@ -141,6 +214,32 @@ static int handle_secondary_tcp_pkt(NetFilterState *nf, } } + /* + * Passive close step 2: + */ + if (conn->tcp_state == TCPS_CLOSE_WAIT && + (tcp_pkt->th_flags & (TH_ACK | TH_FIN)) == (TH_ACK | TH_FIN)) { + conn->fin_ack_seq = ntohl(tcp_pkt->th_seq); + conn->tcp_state = TCPS_LAST_ACK; + } + + /* + * Active close + * + * Step 1: + * The *server* side of this connect is VM, *server* tries to close + * the connection. + * + * Step 2: + * We will into CLOSE_WAIT status. + * We simplify the TCPS_FIN_WAIT_2, TCPS_TIME_WAIT and + * CLOSING status. + */ + if (conn->tcp_state == TCPS_ESTABLISHED && + (tcp_pkt->th_flags & (TH_ACK | TH_FIN)) == TH_FIN) { + conn->tcp_state = TCPS_FIN_WAIT_1; + } + return 0; } @@ -184,13 +283,20 @@ static ssize_t colo_rewriter_receive_iov(NetFilterState *nf, */ reverse_connection_key(&key); } + + /* After failover we needn't change new TCP packet */ + if (s->failover_mode && + !connection_has_tracked(s->connection_track_table, &key)) { + goto out; + } + conn = connection_get(s->connection_track_table, &key, NULL); if (sender == nf->netdev) { /* NET_FILTER_DIRECTION_TX */ - if (!handle_primary_tcp_pkt(nf, conn, pkt)) { + if (!handle_primary_tcp_pkt(s, conn, pkt, &key)) { qemu_net_queue_send(s->incoming_queue, sender, 0, (const uint8_t *)pkt->data, pkt->size, NULL); packet_destroy(pkt, NULL); @@ -203,7 +309,7 @@ static ssize_t colo_rewriter_receive_iov(NetFilterState *nf, } } else { /* NET_FILTER_DIRECTION_RX */ - if (!handle_secondary_tcp_pkt(nf, conn, pkt)) { + if (!handle_secondary_tcp_pkt(s, conn, pkt, &key)) { qemu_net_queue_send(s->incoming_queue, sender, 0, (const uint8_t *)pkt->data, pkt->size, NULL); packet_destroy(pkt, NULL); @@ -217,11 +323,49 @@ static ssize_t colo_rewriter_receive_iov(NetFilterState *nf, } } +out: packet_destroy(pkt, NULL); pkt = NULL; return 0; } +static void reset_seq_offset(gpointer key, gpointer value, gpointer user_data) +{ + Connection *conn = (Connection *)value; + + conn->offset = 0; +} + +static gboolean offset_is_nonzero(gpointer key, + gpointer value, + gpointer user_data) +{ + Connection *conn = (Connection *)value; + + return conn->offset ? true : false; +} + +static void colo_rewriter_handle_event(NetFilterState *nf, int event, + Error **errp) +{ + RewriterState *rs = FILTER_COLO_REWRITER(nf); + + switch (event) { + case COLO_EVENT_CHECKPOINT: + g_hash_table_foreach(rs->connection_track_table, + reset_seq_offset, NULL); + break; + case COLO_EVENT_FAILOVER: + if (!g_hash_table_find(rs->connection_track_table, + offset_is_nonzero, NULL)) { + filter_rewriter_failover_mode(rs); + } + break; + default: + break; + } +} + static void colo_rewriter_cleanup(NetFilterState *nf) { RewriterState *s = FILTER_COLO_REWRITER(nf); @@ -265,6 +409,7 @@ static void filter_rewriter_init(Object *obj) RewriterState *s = FILTER_COLO_REWRITER(obj); s->vnet_hdr = false; + s->failover_mode = FAILOVER_MODE_OFF; object_property_add_bool(obj, "vnet_hdr_support", filter_rewriter_get_vnet_hdr, filter_rewriter_set_vnet_hdr, NULL); @@ -277,6 +422,7 @@ static void colo_rewriter_class_init(ObjectClass *oc, void *data) nfc->setup = colo_rewriter_setup; nfc->cleanup = colo_rewriter_cleanup; nfc->receive_iov = colo_rewriter_receive_iov; + nfc->handle_event = colo_rewriter_handle_event; } static const TypeInfo colo_rewriter_info = { diff --git a/net/filter.c b/net/filter.c index 2fd7d7d663..c9f9e5fa08 100644 --- a/net/filter.c +++ b/net/filter.c @@ -17,6 +17,8 @@ #include "net/vhost_net.h" #include "qom/object_interfaces.h" #include "qemu/iov.h" +#include "net/colo.h" +#include "migration/colo.h" static inline bool qemu_can_skip_netfilter(NetFilterState *nf) { @@ -245,11 +247,26 @@ static void netfilter_finalize(Object *obj) g_free(nf->netdev_id); } +static void default_handle_event(NetFilterState *nf, int event, Error **errp) +{ + switch (event) { + case COLO_EVENT_CHECKPOINT: + break; + case COLO_EVENT_FAILOVER: + object_property_set_str(OBJECT(nf), "off", "status", errp); + break; + default: + break; + } +} + static void netfilter_class_init(ObjectClass *oc, void *data) { UserCreatableClass *ucc = USER_CREATABLE_CLASS(oc); + NetFilterClass *nfc = NETFILTER_CLASS(oc); ucc->complete = netfilter_complete; + nfc->handle_event = default_handle_event; } static const TypeInfo netfilter_info = { @@ -712,10 +712,15 @@ ssize_t qemu_deliver_packet_iov(NetClientState *sender, void *opaque) { NetClientState *nc = opaque; + size_t size = iov_size(iov, iovcnt); int ret; + if (size > INT_MAX) { + return size; + } + if (nc->link_down) { - return iov_size(iov, iovcnt); + return size; } if (nc->receive_disabled) { @@ -1335,6 +1340,25 @@ void hmp_info_network(Monitor *mon, const QDict *qdict) } } +void colo_notify_filters_event(int event, Error **errp) +{ + NetClientState *nc; + NetFilterState *nf; + NetFilterClass *nfc = NULL; + Error *local_err = NULL; + + QTAILQ_FOREACH(nc, &net_clients, next) { + QTAILQ_FOREACH(nf, &nc->filters, next) { + nfc = NETFILTER_GET_CLASS(OBJECT(nf)); + nfc->handle_event(nf, event, &local_err); + if (local_err) { + error_propagate(errp, local_err); + return; + } + } + } +} + void qmp_set_link(const char *name, bool up, Error **errp) { NetClientState *ncs[MAX_QUEUE_NUM]; diff --git a/qapi/migration.json b/qapi/migration.json index 6e8c21258a..0928f4b727 100644 --- a/qapi/migration.json +++ b/qapi/migration.json @@ -923,18 +923,18 @@ ## # @COLOMode: # -# The colo mode +# The COLO current mode. # -# @unknown: unknown mode +# @none: COLO is disabled. # -# @primary: master side +# @primary: COLO node in primary side. # -# @secondary: slave side +# @secondary: COLO node in slave side. # # Since: 2.8 ## { 'enum': 'COLOMode', - 'data': [ 'unknown', 'primary', 'secondary'] } + 'data': [ 'none', 'primary', 'secondary'] } ## # @FailoverStatus: @@ -957,6 +957,44 @@ 'data': [ 'none', 'require', 'active', 'completed', 'relaunch' ] } ## +# @COLO_EXIT: +# +# Emitted when VM finishes COLO mode due to some errors happening or +# at the request of users. +# +# @mode: report COLO mode when COLO exited. +# +# @reason: describes the reason for the COLO exit. +# +# Since: 3.1 +# +# Example: +# +# <- { "timestamp": {"seconds": 2032141960, "microseconds": 417172}, +# "event": "COLO_EXIT", "data": {"mode": "primary", "reason": "request" } } +# +## +{ 'event': 'COLO_EXIT', + 'data': {'mode': 'COLOMode', 'reason': 'COLOExitReason' } } + +## +# @COLOExitReason: +# +# The reason for a COLO exit +# +# @none: no failover has ever happened. This can't occur in the +# COLO_EXIT event, only in the result of query-colo-status. +# +# @request: COLO exit is due to an external request +# +# @error: COLO exit is due to an internal error +# +# Since: 3.1 +## +{ 'enum': 'COLOExitReason', + 'data': [ 'none', 'request', 'error' ] } + +## # @x-colo-lost-heartbeat: # # Tell qemu that heartbeat is lost, request it to do takeover procedures. @@ -1270,6 +1308,38 @@ { 'command': 'xen-colo-do-checkpoint' } ## +# @COLOStatus: +# +# The result format for 'query-colo-status'. +# +# @mode: COLO running mode. If COLO is running, this field will return +# 'primary' or 'secondary'. +# +# @reason: describes the reason for the COLO exit. +# +# Since: 3.0 +## +{ 'struct': 'COLOStatus', + 'data': { 'mode': 'COLOMode', 'reason': 'COLOExitReason' } } + +## +# @query-colo-status: +# +# Query COLO status while the vm is running. +# +# Returns: A @COLOStatus object showing the status. +# +# Example: +# +# -> { "execute": "query-colo-status" } +# <- { "return": { "mode": "primary", "active": true, "reason": "request" } } +# +# Since: 3.0 +## +{ 'command': 'query-colo-status', + 'returns': 'COLOStatus' } + +## # @migrate-recover: # # Provide a recovery migration stream URI. diff --git a/qemu-options.hx b/qemu-options.hx index f139459e80..0f2f65ee40 100644 --- a/qemu-options.hx +++ b/qemu-options.hx @@ -2256,7 +2256,7 @@ qemu-system-i386 linux.img \ -netdev socket,id=n2,mcast=230.0.0.1:1234 # launch yet another QEMU instance on same "bus" qemu-system-i386 linux.img \ - -device e1000,netdev=n3,macaddr=52:54:00:12:34:58 \ + -device e1000,netdev=n3,mac=52:54:00:12:34:58 \ -netdev socket,id=n3,mcast=230.0.0.1:1234 @end example @@ -4365,8 +4365,6 @@ int main(int argc, char **argv, char **envp) #endif } - colo_info_init(); - if (net_init_clients(&err) < 0) { error_report_err(err); exit(1); |