Replace select() with poll().

This commit is contained in:
Bartosz Taudul 2019-02-10 15:45:23 +01:00
parent d18c3432a4
commit c7e64bb8a8
4 changed files with 37 additions and 69 deletions

View File

@ -982,12 +982,8 @@ void Profiler::Worker()
} }
{ {
timeval tv;
tv.tv_sec = 2;
tv.tv_usec = 0;
char shibboleth[HandshakeShibbolethSize]; char shibboleth[HandshakeShibbolethSize];
auto res = m_sock->ReadRaw( shibboleth, HandshakeShibbolethSize, &tv ); auto res = m_sock->ReadRaw( shibboleth, HandshakeShibbolethSize, 2000 );
if( !res || memcmp( shibboleth, HandshakeShibboleth, HandshakeShibbolethSize ) != 0 ) if( !res || memcmp( shibboleth, HandshakeShibboleth, HandshakeShibbolethSize ) != 0 )
{ {
m_sock->~Socket(); m_sock->~Socket();
@ -996,7 +992,7 @@ void Profiler::Worker()
} }
uint32_t protocolVersion; uint32_t protocolVersion;
res = m_sock->ReadRaw( &protocolVersion, sizeof( protocolVersion ), &tv ); res = m_sock->ReadRaw( &protocolVersion, sizeof( protocolVersion ), 2000 );
if( !res ) if( !res )
{ {
m_sock->~Socket(); m_sock->~Socket();
@ -1105,12 +1101,8 @@ void Profiler::Worker()
m_sock = listen.Accept(); m_sock = listen.Accept();
if( m_sock ) if( m_sock )
{ {
timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
char shibboleth[HandshakeShibbolethSize]; char shibboleth[HandshakeShibbolethSize];
auto res = m_sock->ReadRaw( shibboleth, HandshakeShibbolethSize, &tv ); auto res = m_sock->ReadRaw( shibboleth, HandshakeShibbolethSize, 1000 );
if( !res || memcmp( shibboleth, HandshakeShibboleth, HandshakeShibbolethSize ) != 0 ) if( !res || memcmp( shibboleth, HandshakeShibboleth, HandshakeShibbolethSize ) != 0 )
{ {
m_sock->~Socket(); m_sock->~Socket();
@ -1119,7 +1111,7 @@ void Profiler::Worker()
} }
uint32_t protocolVersion; uint32_t protocolVersion;
res = m_sock->ReadRaw( &protocolVersion, sizeof( protocolVersion ), &tv ); res = m_sock->ReadRaw( &protocolVersion, sizeof( protocolVersion ), 1000 );
if( !res ) if( !res )
{ {
m_sock->~Socket(); m_sock->~Socket();
@ -1523,15 +1515,11 @@ static bool DontExit() { return false; }
bool Profiler::HandleServerQuery() bool Profiler::HandleServerQuery()
{ {
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10000;
uint8_t type; uint8_t type;
if( !m_sock->Read( &type, sizeof( type ), &tv, DontExit ) ) return false; if( !m_sock->Read( &type, sizeof( type ), 10, DontExit ) ) return false;
uint64_t ptr; uint64_t ptr;
if( !m_sock->Read( &ptr, sizeof( ptr ), &tv, DontExit ) ) return false; if( !m_sock->Read( &ptr, sizeof( ptr ), 10, DontExit ) ) return false;
switch( type ) switch( type )
{ {

View File

@ -16,10 +16,12 @@
# pragma warning(disable:4244) # pragma warning(disable:4244)
# pragma warning(disable:4267) # pragma warning(disable:4267)
# endif # endif
# define poll WSAPoll
#else #else
# include <sys/socket.h> # include <sys/socket.h>
# include <netdb.h> # include <netdb.h>
# include <unistd.h> # include <unistd.h>
# include <poll.h>
#endif #endif
#ifndef MSG_NOSIGNAL #ifndef MSG_NOSIGNAL
@ -147,7 +149,7 @@ int Socket::Send( const void* _buf, int len )
return int( buf - start ); return int( buf - start );
} }
int Socket::RecvBuffered( void* buf, int len, const timeval* tv ) int Socket::RecvBuffered( void* buf, int len, int timeout )
{ {
if( len <= m_bufLeft ) if( len <= m_bufLeft )
{ {
@ -165,9 +167,9 @@ int Socket::RecvBuffered( void* buf, int len, const timeval* tv )
return ret; return ret;
} }
if( len >= BufSize ) return Recv( buf, len, tv ); if( len >= BufSize ) return Recv( buf, len, timeout );
m_bufLeft = Recv( m_buf, BufSize, tv ); m_bufLeft = Recv( m_buf, BufSize, timeout );
if( m_bufLeft <= 0 ) return m_bufLeft; if( m_bufLeft <= 0 ) return m_bufLeft;
const auto sz = std::min( len, m_bufLeft ); const auto sz = std::min( len, m_bufLeft );
@ -177,21 +179,15 @@ int Socket::RecvBuffered( void* buf, int len, const timeval* tv )
return sz; return sz;
} }
int Socket::Recv( void* _buf, int len, const timeval* tv ) int Socket::Recv( void* _buf, int len, int timeout )
{ {
auto buf = (char*)_buf; auto buf = (char*)_buf;
fd_set fds; struct pollfd fd;
FD_ZERO( &fds ); fd.fd = (socket_t)m_sock;
FD_SET( static_cast<socket_t>(m_sock), &fds ); fd.events = POLLIN;
#if !defined _WIN32 || defined __MINGW32__ if( poll( &fd, 1, timeout ) > 0 )
timeval _tv = *tv;
select( m_sock+1, &fds, nullptr, nullptr, &_tv );
#else
select( m_sock+1, &fds, nullptr, nullptr, tv );
#endif
if( FD_ISSET( m_sock, &fds ) )
{ {
return recv( m_sock, buf, len, 0 ); return recv( m_sock, buf, len, 0 );
} }
@ -201,14 +197,14 @@ int Socket::Recv( void* _buf, int len, const timeval* tv )
} }
} }
bool Socket::Read( void* _buf, int len, const timeval* tv, std::function<bool()> exitCb ) bool Socket::Read( void* _buf, int len, int timeout, std::function<bool()> exitCb )
{ {
auto buf = (char*)_buf; auto buf = (char*)_buf;
while( len > 0 ) while( len > 0 )
{ {
if( exitCb() ) return false; if( exitCb() ) return false;
const auto sz = RecvBuffered( buf, len, tv ); const auto sz = RecvBuffered( buf, len, timeout );
switch( sz ) switch( sz )
{ {
case 0: case 0:
@ -231,12 +227,12 @@ bool Socket::Read( void* _buf, int len, const timeval* tv, std::function<bool()>
return true; return true;
} }
bool Socket::ReadRaw( void* _buf, int len, const timeval* tv ) bool Socket::ReadRaw( void* _buf, int len, int timeout )
{ {
auto buf = (char*)_buf; auto buf = (char*)_buf;
while( len > 0 ) while( len > 0 )
{ {
const auto sz = Recv( buf, len, tv ); const auto sz = Recv( buf, len, timeout );
if( sz <= 0 ) return false; if( sz <= 0 ) return false;
len -= sz; len -= sz;
buf += sz; buf += sz;
@ -248,14 +244,11 @@ bool Socket::HasData()
{ {
if( m_bufLeft > 0 ) return true; if( m_bufLeft > 0 ) return true;
struct timeval tv; struct pollfd fd;
memset( &tv, 0, sizeof( tv ) ); fd.fd = (socket_t)m_sock;
fd.events = POLLIN;
fd_set fds; return poll( &fd, 1, 0 ) > 0;
FD_ZERO( &fds );
FD_SET( static_cast<socket_t>(m_sock), &fds );
return select( m_sock+1, &fds, nullptr, nullptr, &tv ) > 0;
} }
@ -303,16 +296,11 @@ Socket* ListenSocket::Accept()
struct sockaddr_storage remote; struct sockaddr_storage remote;
socklen_t sz = sizeof( remote ); socklen_t sz = sizeof( remote );
struct timeval tv; struct pollfd fd;
tv.tv_sec = 0; fd.fd = (socket_t)m_sock;
tv.tv_usec = 10000; fd.events = POLLIN;
fd_set fds; if( poll( &fd, 1, 10 ) > 0 )
FD_ZERO( &fds );
FD_SET( static_cast<socket_t>(m_sock), &fds );
select( m_sock+1, &fds, nullptr, nullptr, &tv );
if( FD_ISSET( m_sock, &fds ) )
{ {
int sock = accept( m_sock, (sockaddr*)&remote, &sz); int sock = accept( m_sock, (sockaddr*)&remote, &sz);
#if defined __APPLE__ #if defined __APPLE__

View File

@ -3,8 +3,6 @@
#include <functional> #include <functional>
struct timeval;
namespace tracy namespace tracy
{ {
@ -26,8 +24,8 @@ public:
int Send( const void* buf, int len ); int Send( const void* buf, int len );
bool Read( void* buf, int len, const timeval* tv, std::function<bool()> exitCb ); bool Read( void* buf, int len, int timeout, std::function<bool()> exitCb );
bool ReadRaw( void* buf, int len, const timeval* tv ); bool ReadRaw( void* buf, int len, int timeout );
bool HasData(); bool HasData();
Socket( const Socket& ) = delete; Socket( const Socket& ) = delete;
@ -36,8 +34,8 @@ public:
Socket& operator=( Socket&& ) = delete; Socket& operator=( Socket&& ) = delete;
private: private:
int RecvBuffered( void* buf, int len, const timeval* tv ); int RecvBuffered( void* buf, int len, int timeout );
int Recv( void* buf, int len, const timeval* tv ); int Recv( void* buf, int len, int timeout );
char* m_buf; char* m_buf;
char* m_bufPtr; char* m_bufPtr;

View File

@ -1,7 +1,5 @@
#ifdef _WIN32 #ifdef _WIN32
# include <winsock2.h> # include <winsock2.h>
#else
# include <sys/time.h>
#endif #endif
#ifdef _WIN32 #ifdef _WIN32
@ -1471,10 +1469,6 @@ uint16_t Worker::CompressThreadNew( uint64_t thread )
void Worker::Exec() void Worker::Exec()
{ {
timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 10000;
auto ShouldExit = [this] auto ShouldExit = [this]
{ {
return m_shutdown.load( std::memory_order_relaxed ); return m_shutdown.load( std::memory_order_relaxed );
@ -1497,7 +1491,7 @@ void Worker::Exec()
uint32_t protocolVersion = ProtocolVersion; uint32_t protocolVersion = ProtocolVersion;
m_sock.Send( &protocolVersion, sizeof( protocolVersion ) ); m_sock.Send( &protocolVersion, sizeof( protocolVersion ) );
HandshakeStatus handshake; HandshakeStatus handshake;
if( !m_sock.Read( &handshake, sizeof( handshake ), &tv, ShouldExit ) ) goto close; if( !m_sock.Read( &handshake, sizeof( handshake ), 10, ShouldExit ) ) goto close;
m_handshake.store( handshake, std::memory_order_relaxed ); m_handshake.store( handshake, std::memory_order_relaxed );
switch( handshake ) switch( handshake )
{ {
@ -1522,7 +1516,7 @@ void Worker::Exec()
{ {
WelcomeMessage welcome; WelcomeMessage welcome;
if( !m_sock.Read( &welcome, sizeof( welcome ), &tv, ShouldExit ) ) goto close; if( !m_sock.Read( &welcome, sizeof( welcome ), 10, ShouldExit ) ) goto close;
m_timerMul = welcome.timerMul; m_timerMul = welcome.timerMul;
const auto initEnd = TscTime( welcome.initEnd ); const auto initEnd = TscTime( welcome.initEnd );
m_data.framesBase->frames.push_back( FrameEvent{ TscTime( welcome.initBegin ), -1 } ); m_data.framesBase->frames.push_back( FrameEvent{ TscTime( welcome.initBegin ), -1 } );
@ -1547,7 +1541,7 @@ void Worker::Exec()
if( welcome.onDemand != 0 ) if( welcome.onDemand != 0 )
{ {
OnDemandPayloadMessage onDemand; OnDemandPayloadMessage onDemand;
if( !m_sock.Read( &onDemand, sizeof( onDemand ), &tv, ShouldExit ) ) goto close; if( !m_sock.Read( &onDemand, sizeof( onDemand ), 10, ShouldExit ) ) goto close;
m_data.frameOffset = onDemand.frames; m_data.frameOffset = onDemand.frames;
} }
} }
@ -1565,8 +1559,8 @@ void Worker::Exec()
auto buf = m_buffer + m_bufferOffset; auto buf = m_buffer + m_bufferOffset;
lz4sz_t lz4sz; lz4sz_t lz4sz;
if( !m_sock.Read( &lz4sz, sizeof( lz4sz ), &tv, ShouldExit ) ) goto close; if( !m_sock.Read( &lz4sz, sizeof( lz4sz ), 10, ShouldExit ) ) goto close;
if( !m_sock.Read( lz4buf.get(), lz4sz, &tv, ShouldExit ) ) goto close; if( !m_sock.Read( lz4buf.get(), lz4sz, 10, ShouldExit ) ) goto close;
bytes += sizeof( lz4sz ) + lz4sz; bytes += sizeof( lz4sz ) + lz4sz;
auto sz = LZ4_decompress_safe_continue( m_stream, lz4buf.get(), buf, lz4sz, TargetFrameSize ); auto sz = LZ4_decompress_safe_continue( m_stream, lz4buf.get(), buf, lz4sz, TargetFrameSize );