// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
//    claim that you wrote the original software. If you use this software
//    in a product, an acknowledgement in the product documentation would be
//    appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
//    misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.

#include <assert.h>
#include <string.h>

#include "TaskScheduler.h"
#include "LockLessMultiReadPipe.h"

using namespace enki;

static const uint32_t PIPESIZE_LOG2              = 8;
static const uint32_t SPIN_COUNT                 = 100;
static const uint32_t SPIN_BACKOFF_MULTIPLIER    = 10;
static const uint32_t MAX_NUM_INITIAL_PARTITIONS = 8;

// each software thread gets its own copy of gtl_threadNum, so this is safe to use as a static variable
static THREAD_LOCAL uint32_t gtl_threadNum = 0;

namespace enki
{
    struct SubTaskSet
    {
        ITaskSet*           pTask;
        TaskSetPartition    partition;
    };

    // we derive class TaskPipe rather than typedef to get forward declaration working easily
    class TaskPipe : public LockLessMultiReadPipe<PIPESIZE_LOG2,SubTaskSet> {};

    struct ThreadArgs
    {
        uint32_t        threadNum;
        TaskScheduler*  pTaskScheduler;
    };
}

namespace
{
    SubTaskSet SplitTask( SubTaskSet& subTask_, uint32_t rangeToSplit_ )
    {
        SubTaskSet splitTask = subTask_;
        uint32_t rangeLeft = subTask_.partition.end - subTask_.partition.start;

        if( rangeToSplit_ > rangeLeft )
        {
            rangeToSplit_ = rangeLeft;
        }
        splitTask.partition.end  = subTask_.partition.start + rangeToSplit_;
        subTask_.partition.start = splitTask.partition.end;
        return splitTask;
    }

    #if defined _WIN32
        #if defined _M_IX86 || defined _M_X64
            #pragma intrinsic(_mm_pause)
            inline void Pause() { _mm_pause(); }
        #endif
    #elif defined __i386__ || defined __x86_64__
        inline void Pause() { __asm__ __volatile__("pause;"); }
    #else
        inline void Pause() { ; } // may have NOP or yield equiv
    #endif
}

static void SafeCallback(ProfilerCallbackFunc func_, uint32_t threadnum_)
{
    if( func_ )
    {
        func_(threadnum_);
    }
}

ProfilerCallbacks* TaskScheduler::GetProfilerCallbacks()
{
    return &m_ProfilerCallbacks;
}

THREADFUNC_DECL TaskScheduler::TaskingThreadFunction( void* pArgs )
{
    ThreadArgs args    = *(ThreadArgs*)pArgs;
    uint32_t threadNum = args.threadNum;
    TaskScheduler* pTS = args.pTaskScheduler;
    gtl_threadNum      = threadNum;
    SafeCallback( pTS->m_ProfilerCallbacks.threadStart, threadNum );

    uint32_t spinCount = 0;
    uint32_t hintPipeToCheck_io = threadNum + 1;    // does not need to be clamped.
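
    // Worker loop: repeatedly try to run a task; when none is found, spin with an
    // increasing Pause() backoff for up to SPIN_COUNT attempts before blocking in
    // WaitForTasks() on the semaphore until new work is signalled.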
    while( pTS->m_bRunning )
    {
        if( !pTS->TryRunTask( threadNum, hintPipeToCheck_io ) )
        {
            // no tasks, will spin then wait
            ++spinCount;
            if( spinCount > SPIN_COUNT )
            {
                pTS->WaitForTasks( threadNum );
                spinCount = 0;
            }
            else
            {
                uint32_t spinBackoffCount = spinCount * SPIN_BACKOFF_MULTIPLIER;
                while( spinBackoffCount )
                {
                    Pause();
                    --spinBackoffCount;
                }
            }
        }
        else
        {
            spinCount = 0;
        }
    }

    AtomicAdd( &pTS->m_NumThreadsRunning, -1 );
    SafeCallback( pTS->m_ProfilerCallbacks.threadStop, threadNum );

    return 0;
}

void TaskScheduler::StartThreads()
{
    if( m_bHaveThreads )
    {
        return;
    }
    m_bRunning = true;

    SemaphoreCreate( m_NewTaskSemaphore );

    // we create one less thread than m_NumThreads as the main thread counts as one
    m_pThreadNumStore = new ThreadArgs[m_NumThreads];
    m_pThreadIDs      = new threadid_t[m_NumThreads];
    m_pThreadNumStore[0].threadNum      = 0;
    m_pThreadNumStore[0].pTaskScheduler = this;
    m_pThreadIDs[0]     = 0;
    m_NumThreadsWaiting = 0;
    m_NumThreadsRunning = 1;    // account for main thread
    for( uint32_t thread = 1; thread < m_NumThreads; ++thread )
    {
        m_pThreadNumStore[thread].threadNum      = thread;
        m_pThreadNumStore[thread].pTaskScheduler = this;
        ThreadCreate( &m_pThreadIDs[thread], TaskingThreadFunction, &m_pThreadNumStore[thread] );
        ++m_NumThreadsRunning;
    }

    // ensure we have sufficient tasks to equally fill either all threads including main
    // or just the threads we've launched; this is outside the first-init check as we
    // want to be able to change it at runtime
    if( 1 == m_NumThreads )
    {
        m_NumPartitions        = 1;
        m_NumInitialPartitions = 1;
    }
    else
    {
        m_NumPartitions        = m_NumThreads * (m_NumThreads - 1);
        m_NumInitialPartitions = m_NumThreads - 1;
        if( m_NumInitialPartitions > MAX_NUM_INITIAL_PARTITIONS )
        {
            m_NumInitialPartitions = MAX_NUM_INITIAL_PARTITIONS;
        }
    }

    m_bHaveThreads = true;
}

void TaskScheduler::StopThreads( bool bWait_ )
{
    if( m_bHaveThreads )
    {
        // wait for threads to quit before deleting data
        m_bRunning = false;
        while( bWait_ && m_NumThreadsRunning > 1 )
        {
            // keep firing event to ensure all threads pick up state of m_bRunning
            SemaphoreSignal( m_NewTaskSemaphore, m_NumThreadsRunning );
        }

        for( uint32_t thread = 1; thread < m_NumThreads; ++thread )
        {
            ThreadTerminate( m_pThreadIDs[thread] );
        }

        m_NumThreads = 0;
        delete[] m_pThreadNumStore;
        delete[] m_pThreadIDs;
        m_pThreadNumStore = 0;
        m_pThreadIDs      = 0;
        SemaphoreClose( m_NewTaskSemaphore );

        m_bHaveThreads      = false;
        m_NumThreadsWaiting = 0;
        m_NumThreadsRunning = 0;
    }
}

bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t& hintPipeToCheck_io_ )
{
    // check for tasks
    SubTaskSet subTask;
    bool bHaveTask = m_pPipesPerThread[ threadNum ].WriterTryReadFront( &subTask );

    uint32_t threadToCheck = hintPipeToCheck_io_;
    uint32_t checkCount    = 0;
    while( !bHaveTask && checkCount < m_NumThreads )
    {
        threadToCheck = ( hintPipeToCheck_io_ + checkCount ) % m_NumThreads;
        if( threadToCheck != threadNum )
        {
            bHaveTask = m_pPipesPerThread[ threadToCheck ].ReaderTryReadBack( &subTask );
        }
        ++checkCount;
    }

    if( bHaveTask )
    {
        // update hint, will preserve value unless actually got task from another thread.
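        // If the partition is larger than the task's m_RangeToRun it is split below:
        // one m_RangeToRun-sized piece runs on this thread and the remainder is
        // re-queued via SplitAndAddTask for other threads to pick up.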
        hintPipeToCheck_io_ = threadToCheck;

        uint32_t partitionSize = subTask.partition.end - subTask.partition.start;
        if( subTask.pTask->m_RangeToRun < partitionSize )
        {
            SubTaskSet taskToRun = SplitTask( subTask, subTask.pTask->m_RangeToRun );
            SplitAndAddTask( gtl_threadNum, subTask, subTask.pTask->m_RangeToRun, 0 );
            taskToRun.pTask->ExecuteRange( taskToRun.partition, threadNum );
            AtomicAdd( &taskToRun.pTask->m_RunningCount, -1 );
        }
        else
        {
            // the task has already been divided up by AddTaskSetToPipe, so just run it
            subTask.pTask->ExecuteRange( subTask.partition, threadNum );
            AtomicAdd( &subTask.pTask->m_RunningCount, -1 );
        }
    }

    return bHaveTask;
}

void TaskScheduler::WaitForTasks( uint32_t threadNum )
{
    // We increment the number of threads waiting here in order
    // to ensure that the check for tasks occurs after the increment,
    // to prevent a task being added after the check but before the thread waits.
    // This will occasionally result in threads being mistakenly awoken,
    // but they will then go back to sleep.
    AtomicAdd( &m_NumThreadsWaiting, 1 );

    bool bHaveTasks = false;
    for( uint32_t thread = 0; thread < m_NumThreads; ++thread )
    {
        if( !m_pPipesPerThread[ thread ].IsPipeEmpty() )
        {
            bHaveTasks = true;
            break;
        }
    }
    if( !bHaveTasks )
    {
        SafeCallback( m_ProfilerCallbacks.waitStart, threadNum );
        SemaphoreWait( m_NewTaskSemaphore );
        SafeCallback( m_ProfilerCallbacks.waitStop, threadNum );
    }

    int32_t prev = AtomicAdd( &m_NumThreadsWaiting, -1 );
    assert( prev != 0 );
}

void TaskScheduler::WakeThreads()
{
    SemaphoreSignal( m_NewTaskSemaphore, m_NumThreadsWaiting );
}

void TaskScheduler::SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_,
                                     uint32_t rangeToSplit_, int32_t runningCountOffset_ )
{
    int32_t numAdded = 0;
    while( subTask_.partition.start != subTask_.partition.end )
    {
        SubTaskSet taskToAdd = SplitTask( subTask_, rangeToSplit_ );

        // add the partition to the pipe
        ++numAdded;
        if( !m_pPipesPerThread[ gtl_threadNum ].WriterTryWriteFront( taskToAdd ) )
        {
            // pipe is full, so wake other threads and run this partition ourselves
            if( numAdded > 1 )
            {
                WakeThreads();
            }
            // alter range to run the appropriate fraction
            if( taskToAdd.pTask->m_RangeToRun < rangeToSplit_ )
            {
                taskToAdd.partition.end  = taskToAdd.partition.start + taskToAdd.pTask->m_RangeToRun;
                subTask_.partition.start = taskToAdd.partition.end;
            }
            taskToAdd.pTask->ExecuteRange( taskToAdd.partition, threadNum_ );
            --numAdded;
        }
    }

    // increment running count by the number added plus the offset
    AtomicAdd( &subTask_.pTask->m_RunningCount, numAdded + runningCountOffset_ );

    WakeThreads();
}

void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet )
{
    // set running count to -1 to guarantee it won't be found complete until all subtasks added
    pTaskSet->m_RunningCount = -1;

    // divide task up and add to pipe
    pTaskSet->m_RangeToRun = pTaskSet->m_SetSize / m_NumPartitions;
    if( pTaskSet->m_RangeToRun < pTaskSet->m_MinRange )
    {
        pTaskSet->m_RangeToRun = pTaskSet->m_MinRange;
    }

    uint32_t rangeToSplit = pTaskSet->m_SetSize / m_NumInitialPartitions;
    if( rangeToSplit < pTaskSet->m_MinRange )
    {
        rangeToSplit = pTaskSet->m_MinRange;
    }

    SubTaskSet subTask;
    subTask.pTask           = pTaskSet;
    subTask.partition.start = 0;
    subTask.partition.end   = pTaskSet->m_SetSize;
    SplitAndAddTask( gtl_threadNum, subTask, rangeToSplit, 1 );
}

void TaskScheduler::WaitforTaskSet( const ITaskSet* pTaskSet )
{
    uint32_t hintPipeToCheck_io = gtl_threadNum + 1;    // does not need to be clamped.
    if( pTaskSet )
    {
        while( pTaskSet->m_RunningCount )
        {
            TryRunTask( gtl_threadNum, hintPipeToCheck_io );
            // should add a spin then wait for task completion event.
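            // Note: the waiting thread keeps executing queued work via TryRunTask
            // rather than blocking, which also helps drain the pipes while the
            // target task set completes.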
        }
    }
    else
    {
        TryRunTask( gtl_threadNum, hintPipeToCheck_io );
    }
}

void TaskScheduler::WaitforAll()
{
    bool bHaveTasks = true;
    uint32_t hintPipeToCheck_io = gtl_threadNum + 1;    // does not need to be clamped.
    int32_t threadsRunning = m_NumThreadsRunning - 1;
    while( bHaveTasks || m_NumThreadsWaiting < threadsRunning )
    {
        TryRunTask( gtl_threadNum, hintPipeToCheck_io );
        bHaveTasks = false;
        for( uint32_t thread = 0; thread < m_NumThreads; ++thread )
        {
            if( !m_pPipesPerThread[ thread ].IsPipeEmpty() )
            {
                bHaveTasks = true;
                break;
            }
        }
    }
}

void TaskScheduler::WaitforAllAndShutdown()
{
    WaitforAll();
    StopThreads( true );
    delete[] m_pPipesPerThread;
    m_pPipesPerThread = 0;
}

uint32_t TaskScheduler::GetNumTaskThreads() const
{
    return m_NumThreads;
}

TaskScheduler::TaskScheduler()
    : m_pPipesPerThread(NULL)
    , m_NumThreads(0)
    , m_pThreadNumStore(NULL)
    , m_pThreadIDs(NULL)
    , m_bRunning(false)
    , m_NumThreadsRunning(0)
    , m_NumThreadsWaiting(0)
    , m_NumPartitions(0)
    , m_bHaveThreads(false)
{
    memset(&m_ProfilerCallbacks, 0, sizeof(m_ProfilerCallbacks));
}

TaskScheduler::~TaskScheduler()
{
    StopThreads( true ); // Stops threads, waiting for them.

    delete[] m_pPipesPerThread;
    m_pPipesPerThread = 0;
}

void TaskScheduler::Initialize( uint32_t numThreads_ )
{
    assert( numThreads_ );
    StopThreads( true ); // Stops threads, waiting for them.
    delete[] m_pPipesPerThread;

    m_NumThreads = numThreads_;

    m_pPipesPerThread = new TaskPipe[ m_NumThreads ];

    StartThreads();
}

void TaskScheduler::Initialize()
{
    Initialize( GetNumHardwareThreads() );
}
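
// ----------------------------------------------------------------------------
// Illustrative usage sketch (a commented example, not part of the scheduler
// implementation). It assumes the ITaskSet interface declared in
// TaskScheduler.h exposes a virtual ExecuteRange( TaskSetPartition, uint32_t )
// and the public m_SetSize member used above; the DoubleValuesTask type and
// its members are hypothetical names for this example only.
//
//     struct DoubleValuesTask : public enki::ITaskSet
//     {
//         float* pData = NULL;
//         virtual void ExecuteRange( enki::TaskSetPartition range, uint32_t threadnum )
//         {
//             // each partition touches a disjoint index range, so no locking needed
//             for( uint32_t i = range.start; i < range.end; ++i ) { pData[i] *= 2.0f; }
//         }
//     };
//
//     enki::TaskScheduler ts;
//     ts.Initialize();                 // one task thread per hardware thread
//     DoubleValuesTask task;
//     task.pData     = data;
//     task.m_SetSize = dataCount;      // number of iterations to split across threads
//     ts.AddTaskSetToPipe( &task );    // partition the range and queue the pieces
//     ts.WaitforTaskSet( &task );      // run queued work until this set completes
//     ts.WaitforAllAndShutdown();
// ----------------------------------------------------------------------------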