# DExTer : Debugging Experience Tester
# ~~~~~~   ~         ~~         ~   ~~
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
"""Conditional Controller Class for DExTer."""


import os
import time
from collections import defaultdict
from itertools import chain

from dex.debugger.DebuggerControllers.ControllerHelpers import (
    in_source_file,
    update_step_watches,
)
from dex.debugger.DebuggerControllers.DebuggerControllerBase import (
    DebuggerControllerBase,
)
from dex.debugger.DebuggerBase import DebuggerBase
from dex.utils.Exceptions import DebuggerException
from dex.utils.Timeout import Timeout


class BreakpointRange:
    """A range of breakpoints and a set of conditions.

    The leading breakpoint (on line `range_from`) is always active.

    When the leading breakpoint is hit the trailing range should be activated
    when `expression` evaluates to any value in `values`. If there are no
    conditions (`expression` is None) then the trailing breakpoint range should
    always be activated upon hitting the leading breakpoint.

    Args:
        expression: None for no conditions, or a str expression to compare
            against `values`.
        path: Source file path handed to the debugger when setting
            breakpoints for this range.
        range_from: Line of the leading breakpoint.
        range_to: Last line (inclusive) of the trailing breakpoint range.
        values: Values that `expression` is compared against; one
            conditional expression is generated per value.
        hit_count: None for no limit, or int to set the number of times the
            leading breakpoint is triggered before it is removed.
        finish_on_remove: Flag read by the controller when the leading
            breakpoint expires; if True the controller stops its run loop.
    """

    def __init__(
        self,
        expression: str,
        path: str,
        range_from: int,
        range_to: int,
        values: list,
        hit_count: int,
        finish_on_remove: bool,
    ):
        self.expression = expression
        self.path = path
        self.range_from = range_from
        self.range_to = range_to
        self.conditional_values = values
        self.max_hit_count = hit_count
        self.current_hit_count = 0
        self.finish_on_remove = finish_on_remove

    def has_conditions(self) -> bool:
        """Return True if this range has a conditional expression."""
        return self.expression is not None

    def get_conditional_expression_list(self) -> list:
        """Return one `(<expression>) == (<value>)` string per value."""
        return [
            f"({self.expression}) == ({value})" for value in self.conditional_values
        ]

    def add_hit(self) -> None:
        """Record one more hit of the leading breakpoint."""
        self.current_hit_count += 1

    def should_be_removed(self) -> bool:
        """Return True once the hit count has reached the configured limit.

        A `max_hit_count` of None means "no limit", so the leading breakpoint
        is never removed.
        """
        if self.max_hit_count is None:
            return False
        return self.current_hit_count >= self.max_hit_count


class ConditionalController(DebuggerControllerBase):
    """Controller that drives the debugger using leading/trailing breakpoints.

    Breakpoint ranges are built from the `DexLimitSteps` commands (required)
    and the `DexFinishTest` commands (optional) found in the step collection.
    """

    def __init__(self, context, step_collection):
        """Initialise controller state and build the breakpoint ranges.

        Raises:
            DebuggerException: if there are no `DexLimitSteps` commands.
        """
        self._bp_ranges = None
        self._watches = set()
        self._step_index = 0
        self._pause_between_steps = context.options.pause_between_steps
        self._max_steps = context.options.max_steps
        # Map {id: BreakpointRange}
        self._leading_bp_handles = {}
        super().__init__(context, step_collection)
        self._build_bp_ranges()

    def _build_bp_ranges(self):
        """Populate `self._bp_ranges` from the step collection's commands.

        Raises:
            DebuggerException: if there are no `DexLimitSteps` commands.
        """
        commands = self.step_collection.commands
        self._bp_ranges = []

        # Only the lookup is guarded: a KeyError raised while constructing a
        # range must not be misreported as "missing DexLimitSteps commands".
        try:
            limit_commands = commands["DexLimitSteps"]
        except KeyError as err:
            raise DebuggerException(
                "Missing DexLimitSteps commands, cannot conditionally step."
            ) from err

        for lc in limit_commands:
            self._bp_ranges.append(
                BreakpointRange(
                    lc.expression,
                    lc.path,
                    lc.from_line,
                    lc.to_line,
                    lc.values,
                    lc.hit_count,
                    False,
                )
            )

        if "DexFinishTest" in commands:
            for ic in commands["DexFinishTest"]:
                # A finish command covers a single line, so the range starts
                # and ends on `on_line`; it is flagged finish_on_remove so the
                # run loop exits when its leading breakpoint expires.
                self._bp_ranges.append(
                    BreakpointRange(
                        ic.expression,
                        ic.path,
                        ic.on_line,
                        ic.on_line,
                        ic.values,
                        ic.hit_count + 1,
                        True,
                    )
                )

    def _set_leading_bps(self):
        """Set a leading breakpoint for each BreakpointRange.

        Builds the map of {leading bp id: BreakpointRange} in
        `self._leading_bp_handles`.
        """
        for bpr in self._bp_ranges:
            if bpr.has_conditions():
                # Add a conditional breakpoint for each condition.
                for cond_expr in bpr.get_conditional_expression_list():
                    bp_id = self.debugger.add_conditional_breakpoint(
                        bpr.path, bpr.range_from, cond_expr
                    )
                    self._leading_bp_handles[bp_id] = bpr
            else:
                # Add an unconditional breakpoint.
                bp_id = self.debugger.add_breakpoint(bpr.path, bpr.range_from)
                self._leading_bp_handles[bp_id] = bpr

    def _run_debugger_custom(self, cmdline):
        """Launch the debuggee and collect steps until finished or timed out.

        Args:
            cmdline: Passed through unchanged to `self.debugger.launch`.

        Raises:
            DebuggerException: if the debugger's name is "dbgeng".
        """
        # TODO: Add conditional and unconditional breakpoint support to dbgeng.
        if self.debugger.get_name() == "dbgeng":
            raise DebuggerException(
                "DexLimitSteps commands are not supported by dbgeng"
            )

        self.step_collection.clear_steps()
        self._set_leading_bps()

        # Gather the watches of every command.
        for command_obj in chain.from_iterable(self.step_collection.commands.values()):
            self._watches.update(command_obj.get_watches())

        self.debugger.launch(cmdline)
        time.sleep(self._pause_between_steps)

        exit_desired = False
        timed_out = False
        total_timeout = Timeout(self.context.options.timeout_total)

        while not self.debugger.is_finished:
            # The per-breakpoint timeout is restarted on every iteration; the
            # total timeout spans the whole session.
            breakpoint_timeout = Timeout(self.context.options.timeout_breakpoint)
            while self.debugger.is_running and not timed_out:
                # Check to see whether we've timed out while we're waiting.
                if total_timeout.timed_out():
                    self.context.logger.error(
                        "Debugger session has been "
                        f"running for {total_timeout.elapsed}s, timeout reached!"
                    )
                    timed_out = True
                if breakpoint_timeout.timed_out():
                    self.context.logger.error(
                        f"Debugger session has not "
                        f"hit a breakpoint for {breakpoint_timeout.elapsed}s, timeout "
                        "reached!"
                    )
                    timed_out = True

            if timed_out:
                break

            step_info = self.debugger.get_step_info(self._watches, self._step_index)
            # Only record a step when the debugger reports a current frame.
            if step_info.current_frame:
                self._step_index += 1
                update_step_watches(
                    step_info, self._watches, self.step_collection.commands
                )
                self.step_collection.new_step(self.context, step_info)

            bp_to_delete = []
            for bp_id in self.debugger.get_triggered_breakpoint_ids():
                try:
                    # See if this is one of our leading breakpoints.
                    bpr = self._leading_bp_handles[bp_id]
                except KeyError:
                    # This is a trailing bp. Mark it for removal.
                    bp_to_delete.append(bp_id)
                    continue

                bpr.add_hit()
                if bpr.should_be_removed():
                    if bpr.finish_on_remove:
                        exit_desired = True
                    bp_to_delete.append(bp_id)
                    del self._leading_bp_handles[bp_id]
                # Add a range of trailing breakpoints covering the lines
                # requested in the DexLimitSteps command. Ignore first line as
                # that's covered by the leading bp we just hit and include the
                # final line.
                for line in range(bpr.range_from + 1, bpr.range_to + 1):
                    self.debugger.add_breakpoint(bpr.path, line)

            # Remove any trailing or expired leading breakpoints we just hit.
            self.debugger.delete_breakpoints(bp_to_delete)

            if exit_desired:
                break
            self.debugger.go()
            time.sleep(self._pause_between_steps)