diff options
author | Fam Zheng <famz@redhat.com> | 2015-01-30 10:49:43 +0800 |
---|---|---|
committer | Stefan Hajnoczi <stefanha@redhat.com> | 2015-02-16 15:07:18 +0000 |
commit | a628daa42db50a3fc1203dd81bba5a2879b76656 (patch) | |
tree | eee3d12c1860bf98ab8b071da52b118326e99ab4 /scripts/qtest.py | |
parent | a91f9584565901635295b08f98d5f3048981c2f5 (diff) |
qtest: Add scripts/qtest.py
This adds scripts/qtest.py as a python library for qtest protocol.
This is a skeleton with a basic "cmd" method to execute a command,
reading and parsing of qtest output could be added later on demand.
Signed-off-by: Fam Zheng <famz@redhat.com>
Reviewed-by: Max Reitz <mreitz@redhat.com>
Message-id: 1422586186-9925-3-git-send-email-famz@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Diffstat (limited to 'scripts/qtest.py')
-rw-r--r-- | scripts/qtest.py | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/scripts/qtest.py b/scripts/qtest.py new file mode 100644 index 0000000000..a9714453a2 --- /dev/null +++ b/scripts/qtest.py @@ -0,0 +1,71 @@ +# QEMU qtest library +# +# Copyright (C) 2015 Red Hat Inc. +# +# Authors: +# Fam Zheng <famz@redhat.com> +# +# This work is licensed under the terms of the GNU GPL, version 2. See +# the COPYING file in the top-level directory. +# +# Based on qmp.py. +# + +import errno +import socket + +class QEMUQtestProtocol(object): + def __init__(self, address, server=False): + """ + Create a QEMUQtestProtocol object. + + @param address: QEMU address, can be either a unix socket path (string) + or a tuple in the form ( address, port ) for a TCP + connection + @param server: server mode, listens on the socket (bool) + @raise socket.error on socket connection errors + @note No connection is established, this is done by the connect() or + accept() methods + """ + self._address = address + self._sock = self._get_sock() + if server: + self._sock.bind(self._address) + self._sock.listen(1) + + def _get_sock(self): + if isinstance(self._address, tuple): + family = socket.AF_INET + else: + family = socket.AF_UNIX + return socket.socket(family, socket.SOCK_STREAM) + + def connect(self): + """ + Connect to the qtest socket. + + @raise socket.error on socket connection errors + """ + self._sock.connect(self._address) + + def accept(self): + """ + Await connection from QEMU. + + @raise socket.error on socket connection errors + """ + self._sock, _ = self._sock.accept() + + def cmd(self, qtest_cmd): + """ + Send a qtest command on the wire. + + @param qtest_cmd: qtest command text to be sent + """ + self._sock.sendall(qtest_cmd + "\n") + + def close(self): + self._sock.close() + + def settimeout(self, timeout): + self._sock.settimeout(timeout) |