
These two functions are doing the same thing, only one of them is sending the packets immediately and the other "queues" them to be sent later. The first one is better as in case of errors, the backtrace will point straight to the place that caused them. Modify the first method to avoid duplication, and ten standardize on it.
173 lines
6.1 KiB
Python
173 lines
6.1 KiB
Python
from __future__ import print_function
|
|
|
|
import gdbremote_testcase
|
|
import random
|
|
import select
|
|
import socket
|
|
from lldbsuite.test.decorators import *
|
|
from lldbsuite.test.lldbtest import *
|
|
from lldbgdbserverutils import Server
|
|
import lldbsuite.test.lldbplatformutil
|
|
|
|
if lldbplatformutil.getHostPlatform() == "windows":
|
|
import ctypes
|
|
import ctypes.wintypes
|
|
from ctypes.wintypes import (BOOL, DWORD, HANDLE, LPCWSTR, LPDWORD, LPVOID)
|
|
|
|
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
|
|
|
|
PIPE_ACCESS_INBOUND = 1
|
|
FILE_FLAG_FIRST_PIPE_INSTANCE = 0x00080000
|
|
FILE_FLAG_OVERLAPPED = 0x40000000
|
|
PIPE_TYPE_BYTE = 0
|
|
PIPE_REJECT_REMOTE_CLIENTS = 8
|
|
INVALID_HANDLE_VALUE = -1
|
|
ERROR_ACCESS_DENIED = 5
|
|
ERROR_IO_PENDING = 997
|
|
|
|
|
|
class OVERLAPPED(ctypes.Structure):
|
|
_fields_ = [("Internal", LPVOID), ("InternalHigh", LPVOID), ("Offset",
|
|
DWORD), ("OffsetHigh", DWORD), ("hEvent", HANDLE)]
|
|
|
|
def __init__(self):
|
|
super(OVERLAPPED, self).__init__(Internal=0, InternalHigh=0,
|
|
Offset=0, OffsetHigh=0, hEvent=None)
|
|
LPOVERLAPPED = ctypes.POINTER(OVERLAPPED)
|
|
|
|
CreateNamedPipe = kernel32.CreateNamedPipeW
|
|
CreateNamedPipe.restype = HANDLE
|
|
CreateNamedPipe.argtypes = (LPCWSTR, DWORD, DWORD, DWORD, DWORD, DWORD,
|
|
DWORD, LPVOID)
|
|
|
|
ConnectNamedPipe = kernel32.ConnectNamedPipe
|
|
ConnectNamedPipe.restype = BOOL
|
|
ConnectNamedPipe.argtypes = (HANDLE, LPOVERLAPPED)
|
|
|
|
CreateEvent = kernel32.CreateEventW
|
|
CreateEvent.restype = HANDLE
|
|
CreateEvent.argtypes = (LPVOID, BOOL, BOOL, LPCWSTR)
|
|
|
|
GetOverlappedResultEx = kernel32.GetOverlappedResultEx
|
|
GetOverlappedResultEx.restype = BOOL
|
|
GetOverlappedResultEx.argtypes = (HANDLE, LPOVERLAPPED, LPDWORD, DWORD,
|
|
BOOL)
|
|
|
|
ReadFile = kernel32.ReadFile
|
|
ReadFile.restype = BOOL
|
|
ReadFile.argtypes = (HANDLE, LPVOID, DWORD, LPDWORD, LPOVERLAPPED)
|
|
|
|
CloseHandle = kernel32.CloseHandle
|
|
CloseHandle.restype = BOOL
|
|
CloseHandle.argtypes = (HANDLE,)
|
|
|
|
class Pipe(object):
|
|
def __init__(self, prefix):
|
|
while True:
|
|
self.name = "lldb-" + str(random.randrange(1e10))
|
|
full_name = "\\\\.\\pipe\\" + self.name
|
|
self._handle = CreateNamedPipe(full_name, PIPE_ACCESS_INBOUND |
|
|
FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
|
|
PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS, 1, 4096,
|
|
4096, 0, None)
|
|
if self._handle != INVALID_HANDLE_VALUE:
|
|
break
|
|
if ctypes.get_last_error() != ERROR_ACCESS_DENIED:
|
|
raise ctypes.WinError(ctypes.get_last_error())
|
|
|
|
self._overlapped = OVERLAPPED()
|
|
self._overlapped.hEvent = CreateEvent(None, True, False, None)
|
|
result = ConnectNamedPipe(self._handle, self._overlapped)
|
|
assert result == 0
|
|
if ctypes.get_last_error() != ERROR_IO_PENDING:
|
|
raise ctypes.WinError(ctypes.get_last_error())
|
|
|
|
def finish_connection(self, timeout):
|
|
if not GetOverlappedResultEx(self._handle, self._overlapped,
|
|
ctypes.byref(DWORD(0)), timeout*1000, True):
|
|
raise ctypes.WinError(ctypes.get_last_error())
|
|
|
|
def read(self, size, timeout):
|
|
buf = ctypes.create_string_buffer(size)
|
|
if not ReadFile(self._handle, ctypes.byref(buf), size, None,
|
|
self._overlapped):
|
|
if ctypes.get_last_error() != ERROR_IO_PENDING:
|
|
raise ctypes.WinError(ctypes.get_last_error())
|
|
read = DWORD(0)
|
|
if not GetOverlappedResultEx(self._handle, self._overlapped,
|
|
ctypes.byref(read), timeout*1000, True):
|
|
raise ctypes.WinError(ctypes.get_last_error())
|
|
return buf.raw[0:read.value]
|
|
|
|
def close(self):
|
|
CloseHandle(self._overlapped.hEvent)
|
|
CloseHandle(self._handle)
|
|
|
|
|
|
else:
|
|
class Pipe(object):
|
|
def __init__(self, prefix):
|
|
self.name = os.path.join(prefix, "stub_port_number")
|
|
os.mkfifo(self.name)
|
|
self._fd = os.open(self.name, os.O_RDONLY | os.O_NONBLOCK)
|
|
|
|
def finish_connection(self, timeout):
|
|
pass
|
|
|
|
def read(self, size, timeout):
|
|
(readers, _, _) = select.select([self._fd], [], [], timeout)
|
|
if self._fd not in readers:
|
|
raise TimeoutError
|
|
return os.read(self._fd, size)
|
|
|
|
def close(self):
|
|
os.close(self._fd)
|
|
|
|
|
|
class TestGdbRemoteConnection(gdbremote_testcase.GdbRemoteTestCaseBase):
|
|
|
|
mydir = TestBase.compute_mydir(__file__)
|
|
|
|
@skipIfRemote # reverse connect is not a supported use case for now
|
|
def test_reverse_connect(self):
|
|
# Reverse connect is the default connection method.
|
|
self.connect_to_debug_monitor()
|
|
# Verify we can do the handshake. If that works, we'll call it good.
|
|
self.do_handshake()
|
|
|
|
@skipIfRemote
|
|
def test_named_pipe(self):
|
|
family, type, proto, _, addr = socket.getaddrinfo(
|
|
self.stub_hostname, 0, proto=socket.IPPROTO_TCP)[0]
|
|
self.sock = socket.socket(family, type, proto)
|
|
self.sock.settimeout(self.DEFAULT_TIMEOUT)
|
|
|
|
self.addTearDownHook(lambda: self.sock.close())
|
|
|
|
pipe = Pipe(self.getBuildDir())
|
|
|
|
self.addTearDownHook(lambda: pipe.close())
|
|
|
|
args = self.debug_monitor_extra_args
|
|
if lldb.remote_platform:
|
|
args += ["*:0"]
|
|
else:
|
|
args += ["localhost:0"]
|
|
|
|
args += ["--named-pipe", pipe.name]
|
|
|
|
server = self.spawnSubprocess(
|
|
self.debug_monitor_exe,
|
|
args,
|
|
install_remote=False)
|
|
|
|
pipe.finish_connection(self.DEFAULT_TIMEOUT)
|
|
port = pipe.read(10, self.DEFAULT_TIMEOUT)
|
|
# Trim null byte, convert to int
|
|
addr = (addr[0], int(port[:-1]))
|
|
self.sock.connect(addr)
|
|
self._server = Server(self.sock, server)
|
|
|
|
# Verify we can do the handshake. If that works, we'll call it good.
|
|
self.do_handshake()
|