import random from lldbsuite.test.decorators import * from lldbsuite.test.lldbtest import * from fork_testbase import GdbRemoteForkTestBase class TestGdbRemoteFork(GdbRemoteForkTestBase): def setUp(self): GdbRemoteForkTestBase.setUp(self) if self.getPlatform() == "linux" and self.getArchitecture() in [ "arm", "aarch64", ]: self.skipTest("Unsupported for Arm/AArch64 Linux") @add_test_categories(["fork"]) def test_fork_multithreaded(self): _, _, child_pid, _ = self.start_fork_test(["thread:new"] * 2 + ["fork"]) # detach the forked child self.test_sequence.add_log_lines( [ "read packet: $D;{}#00".format(child_pid), "send packet: $OK#00", "read packet: $k#00", ], True, ) self.expect_gdbremote_sequence() @add_test_categories(["fork"]) def test_fork(self): parent_pid, _ = self.fork_and_detach_test("fork") # resume the parent self.test_sequence.add_log_lines( [ "read packet: $c#00", "send packet: $W00;process:{}#00".format(parent_pid), ], True, ) self.expect_gdbremote_sequence() @add_test_categories(["fork"]) def test_vfork(self): parent_pid, parent_tid = self.fork_and_detach_test("vfork") # resume the parent self.test_sequence.add_log_lines( [ "read packet: $c#00", { "direction": "send", "regex": r"[$]T[0-9a-fA-F]{{2}}thread:p{}[.]{}.*vforkdone.*".format( parent_pid, parent_tid ), }, "read packet: $c#00", "send packet: $W00;process:{}#00".format(parent_pid), ], True, ) self.expect_gdbremote_sequence() @add_test_categories(["fork"]) def test_fork_follow(self): self.fork_and_follow_test("fork") @add_test_categories(["fork"]) def test_vfork_follow(self): self.fork_and_follow_test("vfork") @add_test_categories(["fork"]) def test_select_wrong_pid(self): self.build() self.prep_debug_monitor_and_inferior() self.add_qSupported_packets(["multiprocess+"]) ret = self.expect_gdbremote_sequence() self.assertIn("multiprocess+", ret["qSupported_response"]) self.reset_test_sequence() # get process pid self.test_sequence.add_log_lines( [ "read packet: $qC#00", { "direction": "send", "regex": "[$]QCp([0-9a-f]+).([0-9a-f]+)#.*", "capture": {1: "pid", 2: "tid"}, }, ], True, ) ret = self.expect_gdbremote_sequence() pid, tid = (int(ret[x], 16) for x in ("pid", "tid")) self.reset_test_sequence() self.test_sequence.add_log_lines( [ # try switching to correct pid "read packet: $Hgp{:x}.{:x}#00".format(pid, tid), "send packet: $OK#00", "read packet: $Hcp{:x}.{:x}#00".format(pid, tid), "send packet: $OK#00", # try switching to invalid tid "read packet: $Hgp{:x}.{:x}#00".format(pid, tid + 1), "send packet: $E15#00", "read packet: $Hcp{:x}.{:x}#00".format(pid, tid + 1), "send packet: $E15#00", # try switching to invalid pid "read packet: $Hgp{:x}.{:x}#00".format(pid + 1, tid), "send packet: $Eff#00", "read packet: $Hcp{:x}.{:x}#00".format(pid + 1, tid), "send packet: $Eff#00", ], True, ) self.expect_gdbremote_sequence() @add_test_categories(["fork"]) def test_detach_current(self): self.build() self.prep_debug_monitor_and_inferior() self.add_qSupported_packets(["multiprocess+"]) ret = self.expect_gdbremote_sequence() self.assertIn("multiprocess+", ret["qSupported_response"]) self.reset_test_sequence() # get process pid self.test_sequence.add_log_lines( [ "read packet: $qC#00", { "direction": "send", "regex": "[$]QCp([0-9a-f]+).[0-9a-f]+#.*", "capture": {1: "pid"}, }, ], True, ) ret = self.expect_gdbremote_sequence() pid = ret["pid"] self.reset_test_sequence() # detach the process self.test_sequence.add_log_lines( [ "read packet: $D;{}#00".format(pid), "send packet: $OK#00", "read packet: $qC#00", "send packet: $E44#00", ], True, ) self.expect_gdbremote_sequence() @add_test_categories(["fork"]) def test_detach_all(self): self.detach_all_test() @add_test_categories(["fork"]) def test_kill_all(self): parent_pid, _, child_pid, _ = self.start_fork_test(["fork"]) exit_regex = "[$]X09;process:([0-9a-f]+)#.*" self.test_sequence.add_log_lines( [ # kill all processes "read packet: $k#00", {"direction": "send", "regex": exit_regex, "capture": {1: "pid1"}}, {"direction": "send", "regex": exit_regex, "capture": {1: "pid2"}}, ], True, ) ret = self.expect_gdbremote_sequence() self.assertEqual(set([ret["pid1"], ret["pid2"]]), set([parent_pid, child_pid])) @add_test_categories(["fork"]) def test_vkill_child(self): self.vkill_test(kill_child=True) @add_test_categories(["fork"]) def test_vkill_parent(self): self.vkill_test(kill_parent=True) @add_test_categories(["fork"]) def test_vkill_both(self): self.vkill_test(kill_parent=True, kill_child=True) @add_test_categories(["fork"]) def test_c_parent(self): self.resume_one_test(run_order=["parent", "parent"]) @add_test_categories(["fork"]) def test_c_child(self): self.resume_one_test(run_order=["child", "child"]) @add_test_categories(["fork"]) def test_c_parent_then_child(self): self.resume_one_test(run_order=["parent", "parent", "child", "child"]) @add_test_categories(["fork"]) def test_c_child_then_parent(self): self.resume_one_test(run_order=["child", "child", "parent", "parent"]) @add_test_categories(["fork"]) def test_c_interspersed(self): self.resume_one_test(run_order=["parent", "child", "parent", "child"]) @add_test_categories(["fork"]) def test_vCont_parent(self): self.resume_one_test(run_order=["parent", "parent"], use_vCont=True) @add_test_categories(["fork"]) def test_vCont_child(self): self.resume_one_test(run_order=["child", "child"], use_vCont=True) @add_test_categories(["fork"]) def test_vCont_parent_then_child(self): self.resume_one_test( run_order=["parent", "parent", "child", "child"], use_vCont=True ) @add_test_categories(["fork"]) def test_vCont_child_then_parent(self): self.resume_one_test( run_order=["child", "child", "parent", "parent"], use_vCont=True ) @add_test_categories(["fork"]) def test_vCont_interspersed(self): self.resume_one_test( run_order=["parent", "child", "parent", "child"], use_vCont=True ) @add_test_categories(["fork"]) def test_vCont_two_processes(self): parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( ["fork", "stop"] ) self.test_sequence.add_log_lines( [ # try to resume both processes "read packet: $vCont;c:p{}.{};c:p{}.{}#00".format( parent_pid, parent_tid, child_pid, child_tid ), "send packet: $E03#00", ], True, ) self.expect_gdbremote_sequence() @add_test_categories(["fork"]) def test_vCont_all_processes_explicit(self): self.start_fork_test(["fork", "stop"]) self.test_sequence.add_log_lines( [ # try to resume all processes implicitly "read packet: $vCont;c:p-1.-1#00", "send packet: $E03#00", ], True, ) self.expect_gdbremote_sequence() @add_test_categories(["fork"]) def test_vCont_all_processes_implicit(self): self.start_fork_test(["fork", "stop"]) self.test_sequence.add_log_lines( [ # try to resume all processes implicitly "read packet: $vCont;c#00", "send packet: $E03#00", ], True, ) self.expect_gdbremote_sequence() @add_test_categories(["fork"]) def test_threadinfo(self): parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( ["fork", "thread:new", "stop"] ) pidtids = [ (parent_pid, parent_tid), (child_pid, child_tid), ] self.add_threadinfo_collection_packets() ret = self.expect_gdbremote_sequence() prev_pidtids = set(self.parse_threadinfo_packets(ret)) self.assertEqual( prev_pidtids, frozenset((int(pid, 16), int(tid, 16)) for pid, tid in pidtids), ) self.reset_test_sequence() for pidtid in pidtids: self.test_sequence.add_log_lines( [ "read packet: $Hcp{}.{}#00".format(*pidtid), "send packet: $OK#00", "read packet: $c#00", { "direction": "send", "regex": self.stop_regex.format(*pidtid), }, ], True, ) self.add_threadinfo_collection_packets() ret = self.expect_gdbremote_sequence() self.reset_test_sequence() new_pidtids = set(self.parse_threadinfo_packets(ret)) added_pidtid = new_pidtids - prev_pidtids prev_pidtids = new_pidtids # verify that we've got exactly one new thread, and that # the PID matches self.assertEqual(len(added_pidtid), 1) self.assertEqual(added_pidtid.pop()[0], int(pidtid[0], 16)) for pidtid in new_pidtids: self.test_sequence.add_log_lines( [ "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), "send packet: $OK#00", ], True, ) self.expect_gdbremote_sequence() @add_test_categories(["fork"]) def test_memory_read_write(self): self.build() INITIAL_DATA = "Initial message" self.prep_debug_monitor_and_inferior( inferior_args=[ "set-message:{}".format(INITIAL_DATA), "get-data-address-hex:g_message", "fork", "print-message:", "stop", ] ) self.add_qSupported_packets(["multiprocess+", "fork-events+"]) ret = self.expect_gdbremote_sequence() self.assertIn("fork-events+", ret["qSupported_response"]) self.reset_test_sequence() # continue and expect fork self.test_sequence.add_log_lines( [ "read packet: $c#00", { "type": "output_match", "regex": self.maybe_strict_output_regex( r"data address: 0x([0-9a-fA-F]+)\r\n" ), "capture": {1: "addr"}, }, { "direction": "send", "regex": self.fork_regex.format("fork"), "capture": self.fork_capture, }, ], True, ) ret = self.expect_gdbremote_sequence() pidtids = { "parent": (ret["parent_pid"], ret["parent_tid"]), "child": (ret["child_pid"], ret["child_tid"]), } addr = ret["addr"] self.reset_test_sequence() for name, pidtid in pidtids.items(): self.test_sequence.add_log_lines( [ "read packet: $Hgp{}.{}#00".format(*pidtid), "send packet: $OK#00", # read the current memory contents "read packet: $m{},{:x}#00".format(addr, len(INITIAL_DATA) + 1), { "direction": "send", "regex": r"^[$](.+)#.*$", "capture": {1: "data"}, }, # write a new value "read packet: $M{},{:x}:{}#00".format( addr, len(name) + 1, seven.hexlify(name + "\0") ), "send packet: $OK#00", # resume the process and wait for the trap "read packet: $Hcp{}.{}#00".format(*pidtid), "send packet: $OK#00", "read packet: $c#00", { "type": "output_match", "regex": self.maybe_strict_output_regex(r"message: (.*)\r\n"), "capture": {1: "printed_message"}, }, { "direction": "send", "regex": self.stop_regex.format(*pidtid), }, ], True, ) ret = self.expect_gdbremote_sequence() data = seven.unhexlify(ret["data"]) self.assertEqual(data, INITIAL_DATA + "\0") self.assertEqual(ret["printed_message"], name) self.reset_test_sequence() # we do the second round separately to make sure that initial data # is correctly preserved while writing into the first process for name, pidtid in pidtids.items(): self.test_sequence.add_log_lines( [ "read packet: $Hgp{}.{}#00".format(*pidtid), "send packet: $OK#00", # read the current memory contents "read packet: $m{},{:x}#00".format(addr, len(name) + 1), { "direction": "send", "regex": r"^[$](.+)#.*$", "capture": {1: "data"}, }, ], True, ) ret = self.expect_gdbremote_sequence() self.assertIsNotNone(ret.get("data")) data = seven.unhexlify(ret.get("data")) self.assertEqual(data, name + "\0") self.reset_test_sequence() @add_test_categories(["fork"]) def test_register_read_write(self): parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( ["fork", "thread:new", "stop"] ) pidtids = [ (parent_pid, parent_tid), (child_pid, child_tid), ] for pidtid in pidtids: self.test_sequence.add_log_lines( [ "read packet: $Hcp{}.{}#00".format(*pidtid), "send packet: $OK#00", "read packet: $c#00", { "direction": "send", "regex": self.stop_regex.format(*pidtid), }, ], True, ) self.add_threadinfo_collection_packets() ret = self.expect_gdbremote_sequence() self.reset_test_sequence() pidtids = set(self.parse_threadinfo_packets(ret)) self.assertEqual(len(pidtids), 4) # first, save register values from all the threads thread_regs = {} for pidtid in pidtids: for regno in range(256): self.test_sequence.add_log_lines( [ "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), "send packet: $OK#00", "read packet: $p{:x}#00".format(regno), { "direction": "send", "regex": r"^[$](.+)#.*$", "capture": {1: "data"}, }, ], True, ) ret = self.expect_gdbremote_sequence() data = ret.get("data") self.assertIsNotNone(data) # ignore registers shorter than 32 bits (this also catches # "Exx" errors) if len(data) >= 8: break else: self.skipTest("no usable register found") thread_regs[pidtid] = (regno, data) vals = set(x[1] for x in thread_regs.values()) # NB: cheap hack to make the loop below easier new_val = next(iter(vals)) # then, start altering them and verify that we don't unexpectedly # change the value from another thread for pidtid in pidtids: old_val = thread_regs[pidtid] regno = old_val[0] old_val_length = len(old_val[1]) # generate a unique new_val while new_val in vals: new_val = "{{:0{}x}}".format(old_val_length).format( random.getrandbits(old_val_length * 4) ) vals.add(new_val) self.test_sequence.add_log_lines( [ "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), "send packet: $OK#00", "read packet: $p{:x}#00".format(regno), { "direction": "send", "regex": r"^[$](.+)#.*$", "capture": {1: "data"}, }, "read packet: $P{:x}={}#00".format(regno, new_val), "send packet: $OK#00", ], True, ) ret = self.expect_gdbremote_sequence() data = ret.get("data") self.assertIsNotNone(data) self.assertEqual(data, old_val[1]) thread_regs[pidtid] = (regno, new_val) # finally, verify that new values took effect for pidtid in pidtids: old_val = thread_regs[pidtid] self.test_sequence.add_log_lines( [ "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), "send packet: $OK#00", "read packet: $p{:x}#00".format(old_val[0]), { "direction": "send", "regex": r"^[$](.+)#.*$", "capture": {1: "data"}, }, ], True, ) ret = self.expect_gdbremote_sequence() data = ret.get("data") self.assertIsNotNone(data) self.assertEqual(data, old_val[1]) @add_test_categories(["fork"]) def test_qC(self): parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( ["fork", "thread:new", "stop"] ) pidtids = [ (parent_pid, parent_tid), (child_pid, child_tid), ] for pidtid in pidtids: self.test_sequence.add_log_lines( [ "read packet: $Hcp{}.{}#00".format(*pidtid), "send packet: $OK#00", "read packet: $c#00", { "direction": "send", "regex": self.stop_regex.format(*pidtid), }, ], True, ) self.add_threadinfo_collection_packets() ret = self.expect_gdbremote_sequence() self.reset_test_sequence() pidtids = set(self.parse_threadinfo_packets(ret)) self.assertEqual(len(pidtids), 4) for pidtid in pidtids: self.test_sequence.add_log_lines( [ "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), "send packet: $OK#00", "read packet: $qC#00", "send packet: $QCp{:x}.{:x}#00".format(*pidtid), ], True, ) self.expect_gdbremote_sequence() @add_test_categories(["fork"]) def test_T(self): parent_pid, parent_tid, child_pid, child_tid = self.start_fork_test( ["fork", "thread:new", "stop"] ) pidtids = [ (parent_pid, parent_tid), (child_pid, child_tid), ] for pidtid in pidtids: self.test_sequence.add_log_lines( [ "read packet: $Hcp{}.{}#00".format(*pidtid), "send packet: $OK#00", "read packet: $c#00", { "direction": "send", "regex": self.stop_regex.format(*pidtid), }, ], True, ) self.add_threadinfo_collection_packets() ret = self.expect_gdbremote_sequence() self.reset_test_sequence() pidtids = set(self.parse_threadinfo_packets(ret)) self.assertEqual(len(pidtids), 4) max_pid = max(pid for pid, tid in pidtids) max_tid = max(tid for pid, tid in pidtids) bad_pidtids = ( (max_pid, max_tid + 1, "E02"), (max_pid + 1, max_tid, "E01"), (max_pid + 1, max_tid + 1, "E01"), ) for pidtid in pidtids: self.test_sequence.add_log_lines( [ # test explicit PID+TID "read packet: $Tp{:x}.{:x}#00".format(*pidtid), "send packet: $OK#00", # test implicit PID via Hg "read packet: $Hgp{:x}.{:x}#00".format(*pidtid), "send packet: $OK#00", "read packet: $T{:x}#00".format(max_tid + 1), "send packet: $E02#00", "read packet: $T{:x}#00".format(pidtid[1]), "send packet: $OK#00", ], True, ) for pid, tid, expected in bad_pidtids: self.test_sequence.add_log_lines( [ "read packet: $Tp{:x}.{:x}#00".format(pid, tid), "send packet: ${}#00".format(expected), ], True, ) self.expect_gdbremote_sequence()