#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
This takes a crashing qtest trace and tries to remove superfluous operations
"""

import sys
import os
import shlex
import subprocess
import time
import struct

QEMU_ARGS = None
QEMU_PATH = None
TIMEOUT = 5
CRASH_TOKEN = None

# Maps the suffix of a write{b,w,l,q} command to
# (access size in bytes, struct format character).
write_suffix_lookup = {"b": (1, "B"),
                       "w": (2, "H"),
                       "l": (4, "L"),
                       "q": (8, "Q")}


def usage():
    """Terminate the script, using the usage text as the exit message."""
    sys.exit("""\
Usage: QEMU_PATH="/path/to/qemu" QEMU_ARGS="args" {} input_trace output_trace
By default, will try to use the second-to-last line in the output to identify
whether the crash occred. Optionally, manually set a string that idenitifes the
crash by setting CRASH_TOKEN=
""".format((sys.argv[0])))


deduplication_note = """\n\
Note: While trimming the input, sometimes the mutated trace triggers a different
type crash but indicates the same bug. Under this situation, our minimizer is
incapable of recognizing and stopped from removing it.
In the future, we may use a more sophisticated crash case deduplication method.
\n"""


def _crash_token_from_output(outs):
    """Derive a crash token from captured QEMU output.

    The token is the first three whitespace-separated words of the
    second-to-last output line.  Returns None when the output has fewer than
    two lines or when that line is blank: an empty token would be a substring
    of every line and make every trace look like a crash.
    """
    lines = outs.splitlines()
    if len(lines) < 2:
        return None
    token = " ".join(lines[-2].split()[0:3])
    return token or None


def _format_write(addr, size, data_hex):
    """Build a `write addr len 0xdata` trace line (newline-terminated)."""
    return "write {addr} {size} 0x{data}\n".format(
        addr=hex(addr), size=hex(size), data=data_hex)


def check_if_trace_crashes(trace, path):
    """Write `trace` to `path`, feed it to QEMU and report whether it crashed.

    trace: list of trace lines; they are concatenated and written to `path`.
    path:  file that receives the trace and is redirected into QEMU's stdin.

    On the first call with CRASH_TOKEN unset, the token is derived from the
    QEMU output and stored in the CRASH_TOKEN global; that call returns True
    if a token could be derived and False otherwise (including on timeout).
    On later calls, returns True as soon as an output line contains
    CRASH_TOKEN, and False when a line contains "CLOSED" first or the output
    ends without either.
    """
    global CRASH_TOKEN

    with open(path, "w") as tracefile:
        tracefile.write("".join(trace))

    # QEMU_PATH/QEMU_ARGS are deliberately left for the shell to split; only
    # the trace path is quoted so that paths containing spaces or shell
    # metacharacters still work.
    command = ("timeout -s 9 {timeout}s {qemu_path} {qemu_args} 2>&1"
               " < {trace_path}").format(timeout=TIMEOUT,
                                         qemu_path=QEMU_PATH,
                                         qemu_args=QEMU_ARGS,
                                         trace_path=shlex.quote(path))

    # The context manager closes both pipes and reaps the child on every
    # return path, so probes do not accumulate zombies or leaked descriptors.
    with subprocess.Popen(command,
                          shell=True,
                          stdin=subprocess.PIPE,
                          stdout=subprocess.PIPE,
                          encoding="utf-8") as rc:
        if CRASH_TOKEN is None:
            try:
                outs, _ = rc.communicate(timeout=5)
            except subprocess.TimeoutExpired:
                print("subprocess.TimeoutExpired")
                return False
            token = _crash_token_from_output(outs)
            if token is None:
                print("Could not derive a crash token from the QEMU output; "
                      "set CRASH_TOKEN manually.")
                return False
            CRASH_TOKEN = token
            print("Identifying Crashes by this string: {}".format(CRASH_TOKEN))
            print(deduplication_note)
            return True

        for line in rc.stdout:
            # NOTE(review): "CLOSED" is presumably what QEMU logs when the
            # qtest input ended without a crash -- confirm against qtest.
            if "CLOSED" in line:
                return False
            if CRASH_TOKEN in line:
                return True

    print("\nWarning:")
    print(" There is no 'CLOSED'or CRASH_TOKEN in the stdout of subprocess.")
    print(" Usually this indicates a different type of crash.\n")
    return False


def minimize_trace(inpath, outpath):
    """Minimize the crashing qtest trace in `inpath`, writing to `outpath`.

    `outpath` is rewritten for every candidate trace that is tested; the last
    thing written to it is the minimized trace.  Exits the script if the
    unmodified input trace is not reported as crashing.  The TIMEOUT global is
    reset to five times the duration of the initial reproduction.
    """
    global TIMEOUT
    with open(inpath) as f:
        trace = f.readlines()
    start = time.time()
    if not check_if_trace_crashes(trace, outpath):
        sys.exit("The input qtest trace didn't cause a crash...")
    end = time.time()
    print("Crashed in {} seconds".format(end-start))
    TIMEOUT = (end-start)*5
    print("Setting the timeout for {} seconds".format(TIMEOUT))

    i = 0
    newtrace = trace[:]
    # For each line
    while i < len(newtrace):
        # 1.) Try to remove it completely and reproduce the crash. If it works,
        # we're done.
        prior = newtrace[i]
        print("Trying to remove {}".format(newtrace[i]))
        # Try to remove the line completely
        newtrace[i] = ""
        if check_if_trace_crashes(newtrace, outpath):
            i += 1
            continue
        newtrace[i] = prior

        # 2.) Try to replace write{bwlq} commands with a write addr, len
        # command. Since this can require swapping endianness, try both LE and
        # BE options. We do this, so we can "trim" the writes in (3)
        if prior.startswith("write") and not prior.startswith("write "):
            fields = prior.split()
            suffix = fields[0][-1]
            # Lines that do not look like `write{b,w,l,q} addr value` are left
            # untouched instead of aborting the whole minimization.
            if suffix in write_suffix_lookup and len(fields) >= 3:
                size, code = write_suffix_lookup[suffix]
                addr = int(fields[1], 16)
                # Mask to the access width so struct.pack cannot raise on an
                # oversized value; the rewritten line is only kept if the
                # crash still reproduces.
                value = int(fields[2], 16) & ((1 << (8 * size)) - 1)
                for endianness in ("<", ">"):
                    data = struct.pack(endianness + code, value)
                    newtrace[i] = _format_write(addr, size, data.hex())
                    if check_if_trace_crashes(newtrace, outpath):
                        break
                else:
                    newtrace[i] = prior

        # 3.) If it is a qtest write command: write addr len data, try to split
        # it into two separate write commands. If splitting the write down the
        # middle does not work, try to move the pivot "left" and retry, until
        # there is no space left. The idea is to prune unnecessary bytes from
        # long writes, while accommodating arbitrary MemoryRegion access sizes
        # and alignments.
        if newtrace[i].startswith("write "):
            fields = newtrace[i].split()
            if len(fields) >= 4:
                addr = int(fields[1], 16)
                length = int(fields[2], 16)
                data = fields[3][2:]  # hex digits without the "0x" prefix
                if length > 1:
                    leftlength = length // 2
                    rightlength = length - leftlength
                    newtrace.insert(i+1, "")
                    while leftlength > 0:
                        newtrace[i] = _format_write(
                            addr, leftlength, data[:leftlength*2])
                        newtrace[i+1] = _format_write(
                            addr+leftlength, rightlength, data[leftlength*2:])
                        if check_if_trace_crashes(newtrace, outpath):
                            # The split reproduces the crash: step back so the
                            # left half is revisited (removed or split again)
                            # on the next iteration.
                            i -= 1
                            break
                        leftlength -= 1
                        rightlength += 1
                    else:
                        # No pivot worked: undo the split.
                        newtrace[i] = prior
                        del newtrace[i+1]
        i += 1
    # Leave the final, minimized trace in outpath.
    check_if_trace_crashes(newtrace, outpath)


if __name__ == '__main__':
    if len(sys.argv) < 3:
        usage()

    QEMU_PATH = os.getenv("QEMU_PATH")
    QEMU_ARGS = os.getenv("QEMU_ARGS")
    if QEMU_PATH is None or QEMU_ARGS is None:
        usage()
    # if "accel" not in QEMU_ARGS:
    #     QEMU_ARGS += " -accel qtest"
    # An empty CRASH_TOKEN would match every output line, so treat it as unset.
    CRASH_TOKEN = os.getenv("CRASH_TOKEN") or None
    QEMU_ARGS += " -qtest stdio -monitor none -serial none "
    minimize_trace(sys.argv[1], sys.argv[2])