diff options
Diffstat (limited to 'net/colo-compare.c')
-rw-r--r-- | net/colo-compare.c | 155 |
1 files changed, 137 insertions, 18 deletions
diff --git a/net/colo-compare.c b/net/colo-compare.c index 103297b7f4..909dd6c6eb 100644 --- a/net/colo-compare.c +++ b/net/colo-compare.c @@ -83,11 +83,14 @@ typedef struct CompareState { char *pri_indev; char *sec_indev; char *outdev; + char *notify_dev; CharBackend chr_pri_in; CharBackend chr_sec_in; CharBackend chr_out; + CharBackend chr_notify_dev; SocketReadState pri_rs; SocketReadState sec_rs; + SocketReadState notify_rs; bool vnet_hdr; /* @@ -117,16 +120,33 @@ enum { SECONDARY_IN, }; -static void colo_compare_inconsistency_notify(void) -{ - notifier_list_notify(&colo_compare_notifiers, - migrate_get_current()); -} static int compare_chr_send(CompareState *s, const uint8_t *buf, uint32_t size, - uint32_t vnet_hdr_len); + uint32_t vnet_hdr_len, + bool notify_remote_frame); + +static void notify_remote_frame(CompareState *s) +{ + char msg[] = "DO_CHECKPOINT"; + int ret = 0; + + ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); + if (ret < 0) { + error_report("Notify Xen COLO-frame failed"); + } +} + +static void colo_compare_inconsistency_notify(CompareState *s) +{ + if (s->notify_dev) { + notify_remote_frame(s); + } else { + notifier_list_notify(&colo_compare_notifiers, + migrate_get_current()); + } +} static gint seq_sorter(Packet *a, Packet *b, gpointer data) { @@ -238,7 +258,8 @@ static void colo_release_primary_pkt(CompareState *s, Packet *pkt) ret = compare_chr_send(s, pkt->data, pkt->size, - pkt->vnet_hdr_len); + pkt->vnet_hdr_len, + false); if (ret < 0) { error_report("colo send primary packet failed"); } @@ -430,7 +451,7 @@ sec: qemu_hexdump((char *)spkt->data, stderr, "colo-compare spkt", spkt->size); - colo_compare_inconsistency_notify(); + colo_compare_inconsistency_notify(s); } } @@ -572,7 +593,7 @@ void colo_compare_unregister_notifier(Notifier *notify) } static int colo_old_packet_check_one_conn(Connection *conn, - void *user_data) + CompareState *s) { GList *result = NULL; int64_t check_time = REGULAR_PACKET_CHECK_MS; @@ -583,7 +604,7 @@ static int colo_old_packet_check_one_conn(Connection *conn, if (result) { /* Do checkpoint will flush old packet */ - colo_compare_inconsistency_notify(); + colo_compare_inconsistency_notify(s); return 0; } @@ -603,7 +624,7 @@ static void colo_old_packet_check(void *opaque) * If we find one old packet, stop finding job and notify * COLO frame do checkpoint. */ - g_queue_find_custom(&s->conn_list, NULL, + g_queue_find_custom(&s->conn_list, s, (GCompareFunc)colo_old_packet_check_one_conn); } @@ -632,7 +653,8 @@ static void colo_compare_packet(CompareState *s, Connection *conn, */ trace_colo_compare_main("packet different"); g_queue_push_head(&conn->primary_list, pkt); - colo_compare_inconsistency_notify(); + + colo_compare_inconsistency_notify(s); break; } } @@ -668,7 +690,8 @@ static void colo_compare_connection(void *opaque, void *user_data) static int compare_chr_send(CompareState *s, const uint8_t *buf, uint32_t size, - uint32_t vnet_hdr_len) + uint32_t vnet_hdr_len, + bool notify_remote_frame) { int ret = 0; uint32_t len = htonl(size); @@ -677,7 +700,14 @@ static int compare_chr_send(CompareState *s, return 0; } - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); + if (notify_remote_frame) { + ret = qemu_chr_fe_write_all(&s->chr_notify_dev, + (uint8_t *)&len, + sizeof(len)); + } else { + ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); + } + if (ret != sizeof(len)) { goto err; } @@ -688,13 +718,26 @@ static int compare_chr_send(CompareState *s, * know how to parse net packet correctly. */ len = htonl(vnet_hdr_len); - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); + + if (!notify_remote_frame) { + ret = qemu_chr_fe_write_all(&s->chr_out, + (uint8_t *)&len, + sizeof(len)); + } + if (ret != sizeof(len)) { goto err; } } - ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size); + if (notify_remote_frame) { + ret = qemu_chr_fe_write_all(&s->chr_notify_dev, + (uint8_t *)buf, + size); + } else { + ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size); + } + if (ret != size) { goto err; } @@ -744,6 +787,19 @@ static void compare_sec_chr_in(void *opaque, const uint8_t *buf, int size) } } +static void compare_notify_chr(void *opaque, const uint8_t *buf, int size) +{ + CompareState *s = COLO_COMPARE(opaque); + int ret; + + ret = net_fill_rstate(&s->notify_rs, buf, size); + if (ret == -1) { + qemu_chr_fe_set_handlers(&s->chr_notify_dev, NULL, NULL, NULL, NULL, + NULL, NULL, true); + error_report("colo-compare notify_dev error"); + } +} + /* * Check old packet regularly so it can watch for any packets * that the secondary hasn't produced equivalents of. @@ -831,6 +887,11 @@ static void colo_compare_iothread(CompareState *s) qemu_chr_fe_set_handlers(&s->chr_sec_in, compare_chr_can_read, compare_sec_chr_in, NULL, NULL, s, s->worker_context, true); + if (s->notify_dev) { + qemu_chr_fe_set_handlers(&s->chr_notify_dev, compare_chr_can_read, + compare_notify_chr, NULL, NULL, + s, s->worker_context, true); + } colo_compare_timer_init(s); s->event_bh = qemu_bh_new(colo_compare_handle_event, s); @@ -897,6 +958,21 @@ static void compare_set_vnet_hdr(Object *obj, s->vnet_hdr = value; } +static char *compare_get_notify_dev(Object *obj, Error **errp) +{ + CompareState *s = COLO_COMPARE(obj); + + return g_strdup(s->notify_dev); +} + +static void compare_set_notify_dev(Object *obj, const char *value, Error **errp) +{ + CompareState *s = COLO_COMPARE(obj); + + g_free(s->notify_dev); + s->notify_dev = g_strdup(value); +} + static void compare_pri_rs_finalize(SocketReadState *pri_rs) { CompareState *s = container_of(pri_rs, CompareState, pri_rs); @@ -907,7 +983,8 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs) compare_chr_send(s, pri_rs->buf, pri_rs->packet_len, - pri_rs->vnet_hdr_len); + pri_rs->vnet_hdr_len, + false); } else { /* compare packet in the specified connection */ colo_compare_connection(conn, s); @@ -927,6 +1004,27 @@ static void compare_sec_rs_finalize(SocketReadState *sec_rs) } } +static void compare_notify_rs_finalize(SocketReadState *notify_rs) +{ + CompareState *s = container_of(notify_rs, CompareState, notify_rs); + + /* Get Xen colo-frame's notify and handle the message */ + char *data = g_memdup(notify_rs->buf, notify_rs->packet_len); + char msg[] = "COLO_COMPARE_GET_XEN_INIT"; + int ret; + + if (!strcmp(data, "COLO_USERSPACE_PROXY_INIT")) { + ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); + if (ret < 0) { + error_report("Notify Xen COLO-frame INIT failed"); + } + } + + if (!strcmp(data, "COLO_CHECKPOINT")) { + /* colo-compare do checkpoint, flush pri packet and remove sec packet */ + g_queue_foreach(&s->conn_list, colo_flush_packets, s); + } +} /* * Return 0 is success. @@ -997,6 +1095,17 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp) net_socket_rs_init(&s->pri_rs, compare_pri_rs_finalize, s->vnet_hdr); net_socket_rs_init(&s->sec_rs, compare_sec_rs_finalize, s->vnet_hdr); + /* Try to enable remote notify chardev, currently just for Xen COLO */ + if (s->notify_dev) { + if (find_and_check_chardev(&chr, s->notify_dev, errp) || + !qemu_chr_fe_init(&s->chr_notify_dev, chr, errp)) { + return; + } + + net_socket_rs_init(&s->notify_rs, compare_notify_rs_finalize, + s->vnet_hdr); + } + QTAILQ_INSERT_TAIL(&net_compares, s, next); g_queue_init(&s->conn_list); @@ -1024,7 +1133,8 @@ static void colo_flush_packets(void *opaque, void *user_data) compare_chr_send(s, pkt->data, pkt->size, - pkt->vnet_hdr_len); + pkt->vnet_hdr_len, + false); packet_destroy(pkt, NULL); } while (!g_queue_is_empty(&conn->secondary_list)) { @@ -1057,6 +1167,10 @@ static void colo_compare_init(Object *obj) (Object **)&s->iothread, object_property_allow_set_link, OBJ_PROP_LINK_STRONG, NULL); + /* This parameter just for Xen COLO */ + object_property_add_str(obj, "notify_dev", + compare_get_notify_dev, compare_set_notify_dev, + NULL); s->vnet_hdr = false; object_property_add_bool(obj, "vnet_hdr_support", compare_get_vnet_hdr, @@ -1071,6 +1185,10 @@ static void colo_compare_finalize(Object *obj) qemu_chr_fe_deinit(&s->chr_pri_in, false); qemu_chr_fe_deinit(&s->chr_sec_in, false); qemu_chr_fe_deinit(&s->chr_out, false); + if (s->notify_dev) { + qemu_chr_fe_deinit(&s->chr_notify_dev, false); + } + if (s->iothread) { colo_compare_timer_del(s); } @@ -1103,6 +1221,7 @@ static void colo_compare_finalize(Object *obj) g_free(s->pri_indev); g_free(s->sec_indev); g_free(s->outdev); + g_free(s->notify_dev); } static const TypeInfo colo_compare_info = { |