Robert O'Callahan d5e1de6da9
[lldb] Implement basic support for reverse-continue (#99736)
This commit only adds support for the
`SBProcess::ReverseContinue()` API. A user-accessible command for this
will follow in a later commit.

This feature depends on a gdbserver implementation (e.g. `rr`) providing
support for the `bc` and `bs` packets. `lldb-server` does not support
those packets, and there is no plan to change that. So, for testing
purposes, `lldbreverse.py` wraps `lldb-server` with a Python
implementation of *very limited* record-and-replay functionality for use
by *tests only*.

The majority of this PR is test infrastructure (about 700 of the 950
lines added).
2024-10-10 13:01:47 -07:00

419 lines
16 KiB
Python

import os
import os.path
import lldb
from lldbsuite.test.lldbtest import *
from lldbsuite.test.gdbclientutils import *
from lldbsuite.test.lldbgdbproxy import *
import lldbgdbserverutils
import re
class ThreadSnapshot:
def __init__(self, thread_id, registers):
self.thread_id = thread_id
self.registers = registers
class MemoryBlockSnapshot:
def __init__(self, address, data):
self.address = address
self.data = data
class StateSnapshot:
def __init__(self, thread_snapshots, memory):
self.thread_snapshots = thread_snapshots
self.memory = memory
self.thread_id = None
class RegisterInfo:
def __init__(self, lldb_index, bitsize, little_endian):
self.lldb_index = lldb_index
self.bitsize = bitsize
self.little_endian = little_endian
BELOW_STACK_POINTER = 16384
ABOVE_STACK_POINTER = 4096
BLOCK_SIZE = 1024
SOFTWARE_BREAKPOINTS = 0
HARDWARE_BREAKPOINTS = 1
WRITE_WATCHPOINTS = 2
class ReverseTestBase(GDBProxyTestBase):
"""
Base class for tests that need reverse execution.
This class uses a gdbserver proxy to add very limited reverse-
execution capability to lldb-server/debugserver for testing
purposes only.
To use this class, run the inferior forward until some stopping point.
Then call `start_recording()` and execute forward again until reaching
a software breakpoint; this class records the state before each execution executes.
At that point, the server will accept "bc" and "bs" packets to step
backwards through the state.
When executing during recording, we only allow single-step and continue without
delivering a signal, and only software breakpoint stops are allowed.
We assume that while recording is enabled, the only effects of instructions
are on general-purpose registers (read/written by the 'g' and 'G' packets)
and on memory bytes between [SP - BELOW_STACK_POINTER, SP + ABOVE_STACK_POINTER).
"""
"""
A list of StateSnapshots in time order.
There is one snapshot per single-stepped instruction,
representing the state before that instruction was
executed. The last snapshot in the list is the
snapshot before the last instruction was executed.
This is an undo log; we snapshot a superset of the state that may have
been changed by the instruction's execution.
"""
snapshots = None
recording_enabled = False
breakpoints = None
pid = None
pc_register_info = None
sp_register_info = None
general_purpose_register_info = None
def __init__(self, *args, **kwargs):
GDBProxyTestBase.__init__(self, *args, **kwargs)
self.breakpoints = [set(), set(), set(), set(), set()]
def respond(self, packet):
if not packet:
raise ValueError("Invalid empty packet")
if packet == self.server.PACKET_INTERRUPT:
# Don't send a response. We'll just run to completion.
return []
if self.is_command(packet, "qSupported", ":"):
reply = self.pass_through(packet)
return reply + ";ReverseStep+;ReverseContinue+"
if self.is_command(packet, "vCont", ";"):
if self.recording_enabled:
return self.continue_with_recording(packet)
snapshots = []
if packet[0] == "c" or packet[0] == "s" or packet[0] == "C" or packet[0] == "S":
raise ValueError("LLDB should not be sending old-style continuation packets")
if packet == "bc":
return self.reverse_continue()
if packet == "bs":
return self.reverse_step()
if packet == 'jThreadsInfo':
# Suppress this because it contains thread stop reasons which we might
# need to modify, and we don't want to have to implement that.
return ""
if packet[0] == "z" or packet[0] == "Z":
reply = self.pass_through(packet)
if reply == "OK":
self.update_breakpoints(packet)
return reply
return GDBProxyTestBase.respond(self, packet)
def start_recording(self):
self.recording_enabled = True
self.snapshots = []
def stop_recording(self):
"""
Don't record when executing foward.
Reverse execution is still supported until the next forward continue.
"""
self.recording_enabled = False
def is_command(self, packet, cmd, follow_token):
return packet == cmd or packet[0:len(cmd) + 1] == cmd + follow_token
def update_breakpoints(self, packet):
m = re.match("([zZ])([01234]),([0-9a-f]+),([0-9a-f]+)", packet)
if m is None:
raise ValueError("Invalid breakpoint packet: " + packet)
t = int(m.group(2))
addr = int(m.group(3), 16)
kind = int(m.group(4), 16)
if m.group(1) == 'Z':
self.breakpoints[t].add((addr, kind))
else:
self.breakpoints[t].discard((addr, kind))
def breakpoint_triggered_at(self, pc):
if any(addr == pc for addr, kind in self.breakpoints[SOFTWARE_BREAKPOINTS]):
return True
if any(addr == pc for addr, kind in self.breakpoints[HARDWARE_BREAKPOINTS]):
return True
return False
def watchpoint_triggered(self, new_value_block, current_contents):
"""Returns the address or None."""
for watch_addr, kind in breakpoints[WRITE_WATCHPOINTS]:
for offset in range(0, kind):
addr = watch_addr + offset
if (addr >= new_value_block.address and
addr < new_value_block.address + len(new_value_block.data)):
index = addr - new_value_block.address
if new_value_block.data[index*2:(index + 1)*2] != current_contents[index*2:(index + 1)*2]:
return watch_addr
return None
def continue_with_recording(self, packet):
self.logger.debug("Continue with recording enabled")
step_packet = "vCont;s"
if packet == "vCont":
requested_step = False
else:
m = re.match("vCont;(c|s)(.*)", packet)
if m is None:
raise ValueError("Unsupported vCont packet: " + packet)
requested_step = m.group(1) == 's'
step_packet += m.group(2)
while True:
snapshot = self.capture_snapshot()
reply = self.pass_through(step_packet)
(stop_signal, stop_pairs) = self.parse_stop(reply)
if stop_signal != 5:
raise ValueError("Unexpected stop signal: " + reply)
is_swbreak = False
thread_id = None
for key, value in stop_pairs.items():
if key == "thread":
thread_id = self.parse_thread_id(value)
continue
if re.match('[0-9a-f]+', key):
continue
if key == "swbreak" or (key == "reason" and value == "breakpoint"):
is_swbreak = True
continue
if key in ["name", "threads", "thread-pcs", "reason"]:
continue
raise ValueError(f"Unknown stop key '{key}' in {reply}")
if is_swbreak:
self.logger.debug("Recording stopped")
return reply
if thread_id is None:
return ValueError("Expected thread ID: " + reply)
snapshot.thread_id = thread_id
self.snapshots.append(snapshot)
if requested_step:
self.logger.debug("Recording stopped for step")
return reply
def parse_stop(self, reply):
result = {}
if not reply:
raise ValueError("Invalid empty packet")
if reply[0] == "T" and len(reply) >= 3:
result = {k:v for k, v in self.parse_pairs(reply[3:])}
return (int(reply[1:3], 16), result)
raise "Unsupported stop reply: " + reply
def parse_pairs(self, text):
for pair in text.split(";"):
if not pair:
continue
m = re.match("([^:]+):(.*)", pair)
if m is None:
raise ValueError("Invalid pair text: " + text)
yield (m.group(1), m.group(2))
def capture_snapshot(self):
"""Snapshot all threads and their stack memories."""
self.ensure_register_info()
current_thread = self.get_current_thread()
thread_snapshots = []
memory = []
for thread_id in self.get_thread_list():
registers = {}
for index in sorted(self.general_purpose_register_info.keys()):
reply = self.pass_through(f"p{index:x};thread:{thread_id:x};")
if reply == "" or reply[0] == 'E':
raise ValueError("Can't read register")
registers[index] = reply
thread_snapshot = ThreadSnapshot(thread_id, registers)
thread_sp = self.get_register(self.sp_register_info, thread_snapshot.registers)
memory += self.read_memory(thread_sp - BELOW_STACK_POINTER, thread_sp + ABOVE_STACK_POINTER)
thread_snapshots.append(thread_snapshot)
self.set_current_thread(current_thread)
return StateSnapshot(thread_snapshots, memory)
def restore_snapshot(self, snapshot):
"""
Restore the snapshot during reverse execution.
If this triggers a breakpoint or watchpoint, return the stop reply,
otherwise None.
"""
current_thread = self.get_current_thread()
stop_reasons = []
for thread_snapshot in snapshot.thread_snapshots:
thread_id = thread_snapshot.thread_id
for lldb_index in sorted(thread_snapshot.registers.keys()):
data = thread_snapshot.registers[lldb_index]
reply = self.pass_through(f"P{lldb_index:x}={data};thread:{thread_id:x};")
if reply != "OK":
raise ValueError("Can't restore thread register")
if thread_id == snapshot.thread_id:
new_pc = self.get_register(self.pc_register_info, thread_snapshot.registers)
if self.breakpoint_triggered_at(new_pc):
stop_reasons.append([("reason", "breakpoint")])
self.set_current_thread(current_thread)
for block in snapshot.memory:
current_memory = self.pass_through(f"m{block.address:x},{(len(block.data)/2):x}")
if not current_memory or current_memory[0] == 'E':
raise ValueError("Can't read back memory")
reply = self.pass_through(f"M{block.address:x},{len(block.data)/2:x}:" + block.data)
if reply != "OK":
raise ValueError("Can't restore memory")
watch_addr = self.watchpoint_triggered(block, current_memory[1:])
if watch_addr is not None:
stop_reasons.append([("reason", "watchpoint"), ("watch", f"{watch_addr:x}")])
if stop_reasons:
pairs = ";".join(f"{key}:{value}" for key, value in stop_reasons[0])
return f"T05thread:{self.pid:x}.{snapshot.thread_id:x};{pairs};"
return None
def reverse_step(self):
if not self.snapshots:
self.logger.debug("Reverse-step at history boundary")
return self.history_boundary_reply(self.get_current_thread())
self.logger.debug("Reverse-step started")
snapshot = self.snapshots.pop()
stop_reply = self.restore_snapshot(snapshot)
self.set_current_thread(snapshot.thread_id)
self.logger.debug("Reverse-step stopped")
if stop_reply is None:
return self.singlestep_stop_reply(snapshot.thread_id)
return stop_reply
def reverse_continue(self):
self.logger.debug("Reverse-continue started")
thread_id = None
while self.snapshots:
snapshot = self.snapshots.pop()
stop_reply = self.restore_snapshot(snapshot)
thread_id = snapshot.thread_id
if stop_reply is not None:
self.set_current_thread(thread_id)
self.logger.debug("Reverse-continue stopped")
return stop_reply
if thread_id is None:
thread_id = self.get_current_thread()
else:
self.set_current_thread(snapshot.thread_id)
self.logger.debug("Reverse-continue stopped at history boundary")
return self.history_boundary_reply(thread_id)
def get_current_thread(self):
reply = self.pass_through("qC")
return self.parse_thread_id(reply[2:])
def parse_thread_id(self, thread_id):
m = re.match("(p([0-9a-f]+)[.])?([0-9a-f]+)$", thread_id)
if m is None:
raise ValueError("Invalid thread ID: " + thread_id)
if self.pid is None:
self.pid = int(m.group(2), 16)
return int(m.group(3), 16)
def history_boundary_reply(self, thread_id):
return f"T00thread:{self.pid:x}.{thread_id:x};replaylog:begin;"
def singlestep_stop_reply(self, thread_id):
return f"T05thread:{self.pid:x}.{thread_id:x};"
def set_current_thread(self, thread_id):
"""
Set current thread in inner gdbserver.
"""
if thread_id >= 0:
self.pass_through(f"Hg{self.pid:x}.{thread_id:x}")
self.pass_through(f"Hc{self.pid:x}.{thread_id:x}")
else:
self.pass_through(f"Hc-1.-1")
self.pass_through(f"Hg-1.-1")
def get_register(self, register_info, registers):
if register_info.bitsize % 8 != 0:
raise ValueError("Register size must be a multiple of 8 bits")
if register_info.lldb_index not in registers:
raise ValueError("Register value not captured")
data = registers[register_info.lldb_index]
num_bytes = register_info.bitsize//8
bytes = []
for i in range(0, num_bytes):
bytes.append(int(data[i*2:(i + 1)*2], 16))
if register_info.little_endian:
bytes.reverse()
result = 0
for byte in bytes:
result = (result << 8) + byte
return result
def read_memory(self, start_addr, end_addr):
"""
Read a region of memory from the target.
Some of the addresses may extend into invalid virtual memory;
skip those areas.
Return a list of blocks containing the valid area(s) in the
requested range.
"""
regions = []
start_addr = start_addr & (BLOCK_SIZE - 1)
end_addr = (end_addr + BLOCK_SIZE - 1) & (BLOCK_SIZE - 1)
for addr in range(start_addr, end_addr, BLOCK_SIZE):
reply = self.pass_through(f"m{addr:x},{(BLOCK_SIZE - 1):x}")
if reply and reply[0] != 'E':
block = MemoryBlockSnapshot(addr, reply[1:])
regions.append(block)
return regions
def ensure_register_info(self):
if self.general_purpose_register_info is not None:
return
reply = self.pass_through("qHostInfo")
little_endian = any(kv == ("endian", "little") for kv in self.parse_pairs(reply))
self.general_purpose_register_info = {}
lldb_index = 0
while True:
reply = self.pass_through(f"qRegisterInfo{lldb_index:x}")
if not reply or reply[0] == 'E':
break
info = {k:v for k, v in self.parse_pairs(reply)}
reg_info = RegisterInfo(lldb_index, int(info["bitsize"]), little_endian)
if info["set"] == "General Purpose Registers" and not "container-regs" in info:
self.general_purpose_register_info[lldb_index] = reg_info
if "generic" in info:
if info["generic"] == "pc":
self.pc_register_info = reg_info
elif info["generic"] == "sp":
self.sp_register_info = reg_info
lldb_index += 1
if self.pc_register_info is None or self.sp_register_info is None:
raise ValueError("Can't find generic pc or sp register")
def get_thread_list(self):
threads = []
reply = self.pass_through("qfThreadInfo")
while True:
if not reply:
raise ValueError("Missing reply packet")
if reply[0] == 'm':
for id in reply[1:].split(","):
threads.append(self.parse_thread_id(id))
elif reply[0] == 'l':
return threads
reply = self.pass_through("qsThreadInfo")