John Harrison 13eca5248c
[lldb-dap] Re-land refactor of DebugCommunication. (#147787)
Originally committed in 362b9d78b4ee9107da2b5e90b3764b0f0fa610fe and then
reverted in cb63b75e32a415c9bfc298ed7fdcd67e8d9de54c.

This re-lands a subset of the changes to
dap_server.py/DebugCommunication and addresses the python3.10
compatibility issue.

This includes fewer type annotations since those were the reason for the
failures on that specific version of python.

I've done additional testing on python3.8, python3.10 and python3.13 to
further validate these changes.
2025-08-21 10:20:01 -07:00

95 lines
3.6 KiB
Python

"""
Test lldb-dap cancel request
"""
import time
from typing import Optional

from lldbsuite.test.decorators import *
from lldbsuite.test.lldbtest import *
import lldbdap_testcase
class TestDAP_cancel(lldbdap_testcase.DAPTestCaseBase):
    """
    Tests for the lldb-dap "cancel" request.

    Requests are sent without waiting for their responses so that a cancel
    request can be issued while another request is still queued or running.
    """

    def send_async_req(self, command: str, arguments: Optional[dict] = None) -> int:
        """
        Sends a request packet without waiting for its response.

        Args:
            command: The name of the DAP command, e.g. "evaluate" or "cancel".
            arguments: The arguments of the request. When omitted, a fresh
                empty dict is sent.

        Returns:
            The value returned by `dap_server.send_packet`, which the tests in
            this class use as the sequence number of the request.
        """
        # Build a new dict per call rather than using a mutable default, which
        # would be shared by every request sent without explicit arguments.
        if arguments is None:
            arguments = {}
        return self.dap_server.send_packet(
            {
                "type": "request",
                "command": command,
                "arguments": arguments,
            }
        )

    def async_blocking_request(self, duration: float) -> int:
        """
        Sends an evaluate request that will sleep for the specified duration to
        block the request handling thread.

        The evaluated script prints "starting sleep" to the debugger's output
        file handle before sleeping, so a test can detect when the sleep has
        begun.

        Args:
            duration: How long the script sleeps; passed to `time.sleep`.

        Returns:
            The result of `send_async_req` for the evaluate request.
        """
        return self.send_async_req(
            command="evaluate",
            arguments={
                "expression": '`script import time; print("starting sleep", file=lldb.debugger.GetOutputFileHandle()); time.sleep({})'.format(
                    duration
                ),
                "context": "repl",
            },
        )

    def async_cancel(self, requestId: int) -> int:
        """
        Sends a cancel request targeting the request identified by `requestId`.

        Returns:
            The result of `send_async_req` for the cancel request.
        """
        return self.send_async_req(command="cancel", arguments={"requestId": requestId})

    def test_pending_request(self):
        """
        Tests cancelling a pending request.
        """
        program = self.getBuildArtifact("a.out")
        self.build_and_launch(program)

        # Use a relatively short sleep since this is only to ensure the
        # following request is queued.
        blocking_seq = self.async_blocking_request(duration=1.0)
        # Use a longer sleep to ensure we catch if the request was interrupted
        # properly.
        pending_seq = self.async_blocking_request(duration=self.DEFAULT_TIMEOUT / 2)
        cancel_seq = self.async_cancel(requestId=pending_seq)

        # The first request was not the cancel target and should succeed.
        blocking_resp = self.dap_server.receive_response(blocking_seq)
        self.assertEqual(blocking_resp["request_seq"], blocking_seq)
        self.assertEqual(blocking_resp["command"], "evaluate")
        self.assertEqual(blocking_resp["success"], True)

        # The second request should be reported as cancelled.
        pending_resp = self.dap_server.receive_response(pending_seq)
        self.assertEqual(pending_resp["request_seq"], pending_seq)
        self.assertEqual(pending_resp["command"], "evaluate")
        self.assertEqual(pending_resp["success"], False)
        self.assertEqual(pending_resp["message"], "cancelled")

        # The cancel request itself should succeed.
        cancel_resp = self.dap_server.receive_response(cancel_seq)
        self.assertEqual(cancel_resp["request_seq"], cancel_seq)
        self.assertEqual(cancel_resp["command"], "cancel")
        self.assertEqual(cancel_resp["success"], True)
        self.continue_to_exit()

    def test_inflight_request(self):
        """
        Tests cancelling an inflight request.
        """
        program = self.getBuildArtifact("a.out")
        self.build_and_launch(program)

        blocking_seq = self.async_blocking_request(duration=self.DEFAULT_TIMEOUT / 2)
        # Wait for the sleep to start to cancel the inflight request.
        self.collect_console(pattern="starting sleep")
        cancel_seq = self.async_cancel(requestId=blocking_seq)

        # The running request should be reported as cancelled.
        blocking_resp = self.dap_server.receive_response(blocking_seq)
        self.assertEqual(blocking_resp["request_seq"], blocking_seq)
        self.assertEqual(blocking_resp["command"], "evaluate")
        self.assertEqual(blocking_resp["success"], False)
        self.assertEqual(blocking_resp["message"], "cancelled")

        # The cancel request itself should succeed.
        cancel_resp = self.dap_server.receive_response(cancel_seq)
        self.assertEqual(cancel_resp["request_seq"], cancel_seq)
        self.assertEqual(cancel_resp["command"], "cancel")
        self.assertEqual(cancel_resp["success"], True)
        self.continue_to_exit()