// Include files #include "threadpool.h" // Local definitions // Global data // cThreadPool Constructor cThreadPool::cThreadPool(void) : mRequestQueue(NULL), mWorkerThreadNumber(0) { } // ~cThreadPool Destructor cThreadPool::~cThreadPool(void) { Shutdown( ); } // Initialize Method bool cThreadPool::Initialize(HANDLE completion, int numThreads, int numWorkerThreads) { // CreateIoCompletionPort¸¦ ÀÌ¿ëÇÏ¿© completion port ¿ÀºêÁ§Æ®¸¦ »ý¼ºÇÒ ¶§ ÀϹÝÀûÀ¸·Î ½Å°æ½á¾ß ÇÒ ÆÄ // ¶ó¸ÞÅÍ´Â NumberOfConcurrentThreads ÀÌ´Ù. óÀ½ 3°³ÀÇ ÆÄ¶ó¸ÞÅÍ´Â °ªÀ» ¼³Á¤ÇÏÁö ¾Ê´Â´Ù. // // NumberOfConcurrentThreads´Â ÇöÀç completion port¿¡ ´ëÇÏ¿© ¸î°³ÀÇ ½º·¹µå¸¦ ½ÇÇàÇÒ °ÍÀΰ¡¸¦ °áÁ¤ÇÑ // ´Ù. °¡Àå ÀÌ»óÀûÀÎ °æ¿ì´Â ½º·¹µåÀÇ context switchingÀ» ¹æÁöÇϱâ À§ÇÏ¿© ¼­ºñ½º¿¡ ´ëÇÏ¿© ÇϳªÀÇ ÇÁ·Î // ¼¼¼­´ç ÇϳªÀÇ ½º·¹µå¸¸ µ¿ÀÛÇϵµ·Ï ÇÏ´Â °ÍÀÌ´Ù. NumberOfConcurrentThreadsÀÇ °ªÀ» 0À¸·Î ¼³Á¤ÇÏ¸é ½Ã // ½ºÅÛÀº ÇÁ·Î¼¼¼­¿¡ ÀûÇÕÇÑ ½º·¹µå °³¼ö¸¦ ÇÒ´çÇÏ¿© »ç¿ëÇÑ´Ù. // (ÇϳªÀÇ CPU¿¡ ´ëÇÏ¿© IOCP Çϳª´ç ÇϳªÀÇ ½º·¹µå¸¦ »ý¼º) mRequestQueue = CreateIoCompletionPort( completion, NULL, 0, numThreads ); // CreateIoCompletionPort ¿ÀºêÁ§Æ®¸¦ »ý¼ºÇÑ µÚ¿¡ ÀÛ¾÷½º·¹µå¸¦ »ý¼ºÇÑ´Ù. À̶§ »ý¼ºÇÏ´Â ÀÛ¾÷½º·¹µå´Â, // ½º·¹µå°¡ ºí·ÏµÉ °¡´É¼ºÀÌ ÀÖ´Ù¸é, NumberOfConcurrentThreads ÀÇ °¹¼öº¸´Ù ¸¹ÀÌ »ý¼ºÇÏ´Â °ÍÀÌ ÁÁ´Ù. // NumberOfConcurrentThreads¸¸Å­ ½º·¹µå°¡ Ç×»ó ½ÇÇàµÇ°Ô ÇÒ ¼ö Àֱ⠶§¹®¿¡, ºí·ÏµÈ ½º·¹µå°¡ ÀÖ´Ù¸é ´Ù // ¸¥ ½º·¹µå°¡ ´ë½Å µ¿ÀÛÇÒ °ÍÀ̱⠶§¹®ÀÌ´Ù. SYSTEM_INFO si; DWORD threadId; GetSystemInfo(&si); mWorkerThreadNumber = min( si.dwNumberOfProcessors * numWorkerThreads, MAX_WORKER_THREAD ); for ( int i = 0; i < mWorkerThreadNumber; i++ ) { mWorkerThread[i] = CreateThread( NULL, 0, WorkerThreadStartingPoint, (LPVOID)this, 0, &threadId ); } return (mRequestQueue != NULL) ? true : false; } // Shutdown Method void cThreadPool::Shutdown( ) { // Not initialized if ( mRequestQueue == NULL ) return; // Cause worker threads to exit if ( mRequestQueue != NULL ) { for ( int i = 0; i < mWorkerThreadNumber; i++ ) { PostQueuedCompletionStatus( mRequestQueue, 0, 0, POOL_SHUTDOWN ); } } // Make sure worker threads exits. for ( int i = 0; i < mWorkerThreadNumber; i++ ) { if ( WaitForSingleObject( mWorkerThread[i], 60000 ) != WAIT_OBJECT_0 ) { DWORD exitCode; GetExitCodeThread( mWorkerThread[i], &exitCode); if ( exitCode == STILL_ACTIVE ) { TerminateThread( mWorkerThread[i], 0 ); } } CloseHandle( mWorkerThread[i] ); mWorkerThread[i] = NULL; } // Close the request queue handle CloseHandle( mRequestQueue ); // Clear the queue handle mRequestQueue = NULL; } // QueueRequest Method BOOL cThreadPool::QueueRequest(ULONG_PTR completionKey, LPOVERLAPPED overlapped, DWORD bytesTransferred) { return PostQueuedCompletionStatus( mRequestQueue, bytesTransferred, completionKey, overlapped ); } // WorkerThread Method DWORD cThreadPool::WorkerThread( ) { DWORD bytesTransferred; ULONG_PTR completionKey; OVERLAPPED* overlapped; while ( GetQueuedCompletionStatus( mRequestQueue, &bytesTransferred, &completionKey, &overlapped, INFINITE) ) { // Shut down if ( overlapped == POOL_SHUTDOWN ) break; } return 0L; } // WorkerThreadStartingPoint Method DWORD cThreadPool::WorkerThreadStartingPoint(void* ptr) { cThreadPool* threadPool = (cThreadPool*)ptr; return threadPool->WorkerThread( ); }