Med Ismail Bennani 6cf668016e [lldb] Add an example of interactive scripted process debugging
This patch is a proof of concept that shows how a scripted process could
be used with real process to perform interactive debugging.

In this example, we run a process that spawns 10 threads.
That process gets launched by an intermediary scripted process who's job
is to intercept all of it's process events and dispatching them
back either to the real process or to other child scripted processes.

In this example, we have 2 child scripted processes, with even and odd
thread indices. The goal is to be able to do thread filtering and
explore the various interactive debugging approaches, by letting a child
process running when stopping the other process and inspecting it.
Another approach would be to have the child processes execution in-sync
to force running every child process when one of them starts running.

Differential Revision: https://reviews.llvm.org/D145297

Signed-off-by: Med Ismail Bennani <medismail.bennani@gmail.com>
2023-04-25 15:02:34 -07:00

123 lines
5.2 KiB
Python

"""
Test the functionality of interactive scripted processes
"""
import lldb
import lldbsuite.test.lldbutil as lldbutil
from lldbsuite.test.lldbtest import *
import json, os
class TestInteractiveScriptedProcess(TestBase):
NO_DEBUG_INFO_TESTCASE = True
def test_passthrough_launch(self):
"""Test a simple pass-through process launch"""
self.build()
self.runCmd("file " + self.getBuildArtifact("a.out"), CURRENT_EXECUTABLE_SET)
self.main_source_file = lldb.SBFileSpec("main.cpp")
self.script_module = "interactive_scripted_process"
self.script_file = self.script_module + ".py"
self.passthrough_launch()
def duplicate_target(self, driving_target):
exe = driving_target.executable.fullpath
triple = driving_target.triple
return self.dbg.CreateTargetWithFileAndTargetTriple(exe, triple)
def get_launch_info(self, class_name, script_dict):
structured_data = lldb.SBStructuredData()
structured_data.SetFromJSON(json.dumps(script_dict))
launch_info = lldb.SBLaunchInfo(None)
launch_info.SetProcessPluginName("ScriptedProcess")
launch_info.SetScriptedProcessClassName(class_name)
launch_info.SetScriptedProcessDictionary(structured_data)
return launch_info
def passthrough_launch(self):
"""Test that a simple passthrough wrapper functions correctly"""
# First build the real target:
self.assertEqual(self.dbg.GetNumTargets(), 1)
real_target_id = 0
real_target = self.dbg.GetTargetAtIndex(real_target_id)
lldbutil.run_break_set_by_source_regexp(self, "Break here")
self.assertEqual(real_target.GetNumBreakpoints(), 1)
# Now source in the scripted module:
script_path = os.path.join(self.getSourceDir(), self.script_file)
self.runCmd(f"command script import '{script_path}'")
mux_target = self.duplicate_target(real_target)
self.assertTrue(mux_target.IsValid(), "duplicate target succeeded")
mux_class = f"{self.script_module}.MultiplexerScriptedProcess"
script_dict = {"driving_target_idx": real_target_id}
mux_launch_info = self.get_launch_info(mux_class, script_dict)
mux_process_listener = lldb.SBListener(
"lldb.test.interactive-scripted-process.listener"
)
mux_launch_info.SetPassthroughListener(mux_process_listener)
self.dbg.SetAsync(True)
error = lldb.SBError()
mux_process = mux_target.Launch(mux_launch_info, error)
self.assertSuccess(error, "Launched multiplexer scripted process")
self.assertTrue(mux_process.IsValid(), "Got a valid process")
# Check that the mux process started running
event = lldbutil.fetch_next_event(
self, mux_process_listener, mux_process.GetBroadcaster(), timeout=60 * 5
)
self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
# Check that the real process started running
event = lldbutil.fetch_next_event(
self, self.dbg.GetListener(), mux_process.GetBroadcaster()
)
self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
# Check that the real process stopped
event = lldbutil.fetch_next_event(
self, self.dbg.GetListener(), mux_process.GetBroadcaster(), timeout=60 * 5
)
self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
# Check that the mux process stopped
event = lldbutil.fetch_next_event(
self, mux_process_listener, mux_process.GetBroadcaster(), timeout=60 * 5
)
self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)
real_process = real_target.GetProcess()
self.assertTrue(real_process.IsValid(), "Got a valid process")
self.assertState(
real_process.GetState(), lldb.eStateStopped, "Process is stopped"
)
# This is a passthrough, so the two processes should have the same state:
# Check that we got the right threads:
self.assertEqual(
len(real_process.threads),
len(mux_process.threads),
"Same number of threads",
)
for id in range(len(real_process.threads)):
real_pc = real_process.threads[id].frame[0].pc
mux_pc = mux_process.threads[id].frame[0].pc
self.assertEqual(real_pc, mux_pc, f"PC's equal for {id}")
lldbutil.run_break_set_by_source_regexp(self, "also break here")
self.assertEqual(mux_target.GetNumBreakpoints(), 1)
error = mux_process.Continue()
self.assertSuccess(error, "Resuming multiplexer scripted process")
self.assertTrue(mux_process.IsValid(), "Got a valid process")
event = lldbutil.fetch_next_event(
self, mux_process_listener, mux_process.GetBroadcaster()
)
self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateRunning)
event = lldbutil.fetch_next_event(
self, mux_process_listener, mux_process.GetBroadcaster()
)
self.assertState(lldb.SBProcess.GetStateFromEvent(event), lldb.eStateStopped)