""" Test lldb Python event APIs. """ import re import lldb from lldbsuite.test.decorators import * from lldbsuite.test.lldbtest import * from lldbsuite.test import lldbutil @skipIfLinux # llvm.org/pr25924, sometimes generating SIGSEGV class EventAPITestCase(TestBase): NO_DEBUG_INFO_TESTCASE = True def setUp(self): # Call super's setUp(). TestBase.setUp(self) # Find the line number to of function 'c'. self.line = line_number( "main.c", '// Find the line number of function "c" here.' ) @expectedFailureAll( oslist=["linux"], bugnumber="llvm.org/pr23730 Flaky, fails ~1/10 cases" ) @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 @skipIfNetBSD def test_listen_for_and_print_event(self): """Exercise SBEvent API.""" self.build() exe = self.getBuildArtifact("a.out") self.dbg.SetAsync(True) # Create a target by the debugger. target = self.dbg.CreateTarget(exe) self.assertTrue(target, VALID_TARGET) # Now create a breakpoint on main.c by name 'c'. breakpoint = target.BreakpointCreateByName("c", "a.out") listener = lldb.SBListener("my listener") # Now launch the process, and do not stop at the entry point. error = lldb.SBError() flags = target.GetLaunchInfo().GetLaunchFlags() process = target.Launch( listener, None, # argv None, # envp None, # stdin_path None, # stdout_path None, # stderr_path None, # working directory flags, # launch flags False, # Stop at entry error, ) # error self.assertEqual(process.GetState(), lldb.eStateStopped, PROCESS_STOPPED) # Create an empty event object. event = lldb.SBEvent() traceOn = self.TraceOn() if traceOn: lldbutil.print_stacktraces(process) # Create MyListeningThread class to wait for any kind of event. import threading class MyListeningThread(threading.Thread): def run(self): count = 0 # Let's only try at most 4 times to retrieve any kind of event. # After that, the thread exits. while not count > 3: if traceOn: print("Try wait for event...") if listener.WaitForEvent(5, event): if traceOn: desc = lldbutil.get_description(event) print("Event description:", desc) print("Event data flavor:", event.GetDataFlavor()) print( "Process state:", lldbutil.state_type_to_str(process.GetState()), ) print() else: if traceOn: print("timeout occurred waiting for event...") count = count + 1 listener.Clear() return # Let's start the listening thread to retrieve the events. my_thread = MyListeningThread() my_thread.start() # Use Python API to continue the process. The listening thread should be # able to receive the state changed events. process.Continue() # Use Python API to kill the process. The listening thread should be # able to receive the state changed event, too. process.Kill() # Wait until the 'MyListeningThread' terminates. my_thread.join() # Shouldn't we be testing against some kind of expectation here? @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 @skipIfNetBSD def test_wait_for_event(self): """Exercise SBListener.WaitForEvent() API.""" self.build() exe = self.getBuildArtifact("a.out") self.dbg.SetAsync(True) # Create a target by the debugger. target = self.dbg.CreateTarget(exe) self.assertTrue(target, VALID_TARGET) # Now create a breakpoint on main.c by name 'c'. breakpoint = target.BreakpointCreateByName("c", "a.out") self.trace("breakpoint:", breakpoint) self.assertTrue( breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT ) # Get the debugger listener. listener = self.dbg.GetListener() # Now launch the process, and do not stop at entry point. error = lldb.SBError() flags = target.GetLaunchInfo().GetLaunchFlags() process = target.Launch( listener, None, # argv None, # envp None, # stdin_path None, # stdout_path None, # stderr_path None, # working directory flags, # launch flags False, # Stop at entry error, ) # error self.assertTrue(error.Success() and process, PROCESS_IS_VALID) # Create an empty event object. event = lldb.SBEvent() self.assertFalse(event, "Event should not be valid initially") # Create MyListeningThread to wait for any kind of event. import threading class MyListeningThread(threading.Thread): def run(self): count = 0 # Let's only try at most 3 times to retrieve any kind of event. while not count > 3: if listener.WaitForEvent(5, event): self.context.trace("Got a valid event:", event) self.context.trace("Event data flavor:", event.GetDataFlavor()) self.context.trace( "Event type:", lldbutil.state_type_to_str(event.GetType()) ) listener.Clear() return count = count + 1 print("Timeout: listener.WaitForEvent") listener.Clear() return # Use Python API to kill the process. The listening thread should be # able to receive a state changed event. process.Kill() # Let's start the listening thread to retrieve the event. my_thread = MyListeningThread() my_thread.context = self my_thread.start() # Wait until the 'MyListeningThread' terminates. my_thread.join() self.assertTrue(event, "My listening thread successfully received an event") @expectedFailureAll( oslist=["linux"], bugnumber="llvm.org/pr23617 Flaky, fails ~1/10 cases" ) @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 @expectedFailureNetBSD def test_add_listener_to_broadcaster(self): """Exercise some SBBroadcaster APIs.""" self.build() exe = self.getBuildArtifact("a.out") self.dbg.SetAsync(True) # Create a target by the debugger. target = self.dbg.CreateTarget(exe) self.assertTrue(target, VALID_TARGET) # Now create a breakpoint on main.c by name 'c'. breakpoint = target.BreakpointCreateByName("c", "a.out") self.trace("breakpoint:", breakpoint) self.assertTrue( breakpoint and breakpoint.GetNumLocations() == 1, VALID_BREAKPOINT ) listener = lldb.SBListener("my listener") # Now launch the process, and do not stop at the entry point. error = lldb.SBError() flags = target.GetLaunchInfo().GetLaunchFlags() process = target.Launch( listener, None, # argv None, # envp None, # stdin_path None, # stdout_path None, # stderr_path None, # working directory flags, # launch flags False, # Stop at entry error, ) # error # Create an empty event object. event = lldb.SBEvent() self.assertFalse(event, "Event should not be valid initially") # The finite state machine for our custom listening thread, with an # initial state of None, which means no event has been received. # It changes to 'connected' after 'connected' event is received (for remote platforms) # It changes to 'running' after 'running' event is received (should happen only if the # currentstate is either 'None' or 'connected') # It changes to 'stopped' if a 'stopped' event is received (should happen only if the # current state is 'running'.) self.state = None # Create MyListeningThread to wait for state changed events. # By design, a "running" event is expected following by a "stopped" # event. import threading class MyListeningThread(threading.Thread): def run(self): self.context.trace("Running MyListeningThread:", self) # Regular expression pattern for the event description. pattern = re.compile("data = {.*, state = (.*)}$") # Let's only try at most 6 times to retrieve our events. count = 0 while True: if listener.WaitForEvent(5, event): desc = lldbutil.get_description(event) self.context.trace("Event description:", desc) match = pattern.search(desc) if not match: break if match.group(1) == "connected": # When debugging remote targets with lldb-server, we # first get the 'connected' event. self.context.assertTrue(self.context.state is None) self.context.state = "connected" continue elif match.group(1) == "running": self.context.assertTrue( self.context.state is None or self.context.state == "connected" ) self.context.state = "running" continue elif match.group(1) == "stopped": self.context.assertTrue(self.context.state == "running") # Whoopee, both events have been received! self.context.state = "stopped" break else: break print("Timeout: listener.WaitForEvent") count = count + 1 if count > 6: break listener.Clear() return # Use Python API to continue the process. The listening thread should be # able to receive the state changed events. process.Continue() # Start the listening thread to receive the "running" followed by the # "stopped" events. my_thread = MyListeningThread() # Supply the enclosing context so that our listening thread can access # the 'state' variable. my_thread.context = self my_thread.start() # Wait until the 'MyListeningThread' terminates. my_thread.join() # The final judgement. :-) self.assertEqual( self.state, "stopped", "Both expected state changed events received" ) def wait_for_next_event(self, expected_state, test_shadow=False): """Wait for an event from self.primary & self.shadow listener. If test_shadow is true, we also check that the shadow listener only receives events AFTER the primary listener does.""" # Waiting on the shadow listener shouldn't have events yet because # we haven't fetched them for the primary listener yet: event = lldb.SBEvent() if test_shadow: success = self.shadow_listener.WaitForEvent(1, event) self.assertFalse(success, "Shadow listener doesn't pull events") # But there should be an event for the primary listener: success = self.primary_listener.WaitForEvent(5, event) self.assertTrue(success, "Primary listener got the event") state = lldb.SBProcess.GetStateFromEvent(event) restart = False if state == lldb.eStateStopped: restart = lldb.SBProcess.GetRestartedFromEvent(event) if expected_state != None: self.assertEqual( state, expected_state, "Primary thread got the correct event" ) # And after pulling that one there should be an equivalent event for the shadow # listener: success = self.shadow_listener.WaitForEvent(5, event) self.assertTrue(success, "Shadow listener got event too") self.assertEqual( state, lldb.SBProcess.GetStateFromEvent(event), "It was the same event" ) self.assertEqual( restart, lldb.SBProcess.GetRestartedFromEvent(event), "It was the same restarted", ) return state, restart @expectedFlakeyLinux("llvm.org/pr23730") # Flaky, fails ~1/100 cases @skipIfWindows # This is flakey on Windows AND when it fails, it hangs: llvm.org/pr38373 @skipIfNetBSD def test_shadow_listener(self): self.build() exe = self.getBuildArtifact("a.out") # Create a target by the debugger. target = self.dbg.CreateTarget(exe) self.assertTrue(target, VALID_TARGET) # Now create a breakpoint on main.c by name 'c'. bkpt1 = target.BreakpointCreateByName("c", "a.out") self.trace("breakpoint:", bkpt1) self.assertTrue(bkpt1.GetNumLocations() == 1, VALID_BREAKPOINT) self.primary_listener = lldb.SBListener("my listener") self.shadow_listener = lldb.SBListener("shadow listener") self.cur_thread = None error = lldb.SBError() launch_info = target.GetLaunchInfo() launch_info.SetListener(self.primary_listener) launch_info.SetShadowListener(self.shadow_listener) self.runCmd( "settings set target.process.extra-startup-command QSetLogging:bitmask=LOG_PROCESS|LOG_EXCEPTIONS|LOG_RNB_PACKETS|LOG_STEP;" ) self.dbg.SetAsync(True) self.process = target.Launch(launch_info, error) self.assertSuccess(error, "Process launched successfully") # Keep fetching events from the primary to trigger the do on removal and # then from the shadow listener, and make sure they match: # Events in the launch sequence might be platform dependent, so don't # expect any particular event till we get the stopped: state = lldb.eStateInvalid while state != lldb.eStateStopped: state, restart = self.wait_for_next_event(None, False) # Okay, we're now at a good stop, so try a next: self.cur_thread = self.process.threads[0] # Make sure we're at our expected breakpoint: self.assertTrue(self.cur_thread.IsValid(), "Got a zeroth thread") self.assertEqual(self.cur_thread.stop_reason, lldb.eStopReasonBreakpoint) self.assertEqual( self.cur_thread.GetStopReasonDataCount(), 2, "Only one breakpoint/loc here" ) self.assertEqual( bkpt1.GetID(), self.cur_thread.GetStopReasonDataAtIndex(0), "Hit the right breakpoint", ) # Disable the first breakpoint so it doesn't get in the way... bkpt1.SetEnabled(False) self.cur_thread.StepOver() # We'll run the test for "shadow listener blocked by primary listener # for the first couple rounds, then we'll skip the 1 second pause... self.wait_for_next_event(lldb.eStateRunning, True) self.wait_for_next_event(lldb.eStateStopped, True) # Next try an auto-continue breakpoint and make sure the shadow listener got # the resumed info as well. Note that I'm not explicitly counting # running events here. At the point when I wrote this lldb sometimes # emits two running events in a row. Apparently the code to coalesce running # events isn't working. But that's not what this test is testing, we're really # testing that the primary & shadow listeners hear the same thing and in the # right order. main_spec = lldb.SBFileSpec("main.c") bkpt2 = target.BreakpointCreateBySourceRegex("b.2. returns %d", main_spec) self.assertTrue(bkpt2.GetNumLocations() > 0, "BP2 worked") bkpt2.SetAutoContinue(True) bkpt3 = target.BreakpointCreateBySourceRegex("a.3. returns %d", main_spec) self.assertTrue(bkpt3.GetNumLocations() > 0, "BP3 worked") state = lldb.eStateStopped restarted = False # Put in a counter to make sure we don't spin forever if there is some # error in the logic. counter = 0 while state != lldb.eStateExited: counter += 1 self.assertLess( counter, 50, "Took more than 50 events to hit two breakpoints." ) if state == lldb.eStateStopped and not restarted: self.process.Continue() state, restarted = self.wait_for_next_event(None, False)