// Include files #include "iocpudpsend.h" // Local definitions #pragma warning( disable: 4127 ) #define SafeCloseHandle(P) if (P!=NULL) { CloseHandle(P); (P)=NULL; } // Çڵ鸦 ¾ÈÀüÇÏ°Ô ÇØÁ¦ #define SafeDelete(P) if (P!=NULL) { delete(P); (P)=NULL; } // " // Global data // cIocpUdpSend Constructor cIocpUdpSend::cIocpUdpSend(void) { // Critical Section¸¦ ÃʱâÈ­. InitializeCriticalSectionAndSpinCount( &mCs, 2000 ); mIocp = NULL; mSocket = INVALID_SOCKET; mIocpWorkerThreadNumber = 0; mIoContextPool = NULL; mIocpBackendThread = NULL; mIoContextFrontBuffer = NULL; mIoContextBackBuffer = NULL; mRunServer = false; mEndServer = false; } // ~cIocpUdpSend Destructor. cIocpUdpSend::~cIocpUdpSend(void) { // Á¾·á Shutdown( ); // Critical Section¸¦ ÇØÁ¦. DeleteCriticalSection( &mCs ); } // Initialize Method bool cIocpUdpSend::Initialize(unsigned short numWorkerThreads, unsigned int bufferLength) { // CreateIoCompletionPort¸¦ ÀÌ¿ëÇÏ¿© completion port ¿ÀºêÁ§Æ®¸¦ »ý¼ºÇÒ ¶§ ÀϹÝÀûÀ¸·Î ½Å°æ½á¾ß ÇÒ ÆÄ // ¶ó¸ÞÅÍ´Â NumberOfConcurrentThreads ÀÌ´Ù. óÀ½ 3°³ÀÇ ÆÄ¶ó¸ÞÅÍ´Â °ªÀ» ¼³Á¤ÇÏÁö ¾Ê´Â´Ù. // // NumberOfConcurrentThreads´Â ÇöÀç completion port¿¡ ´ëÇÏ¿© ¸î°³ÀÇ ½º·¹µå¸¦ ½ÇÇàÇÒ °ÍÀΰ¡¸¦ °áÁ¤ÇÑ // ´Ù. °¡Àå ÀÌ»óÀûÀÎ °æ¿ì´Â ½º·¹µåÀÇ context switchingÀ» ¹æÁöÇϱâ À§ÇÏ¿© ¼­ºñ½º¿¡ ´ëÇÏ¿© ÇϳªÀÇ ÇÁ·Î // ¼¼¼­´ç ÇϳªÀÇ ½º·¹µå¸¸ µ¿ÀÛÇϵµ·Ï ÇÏ´Â °ÍÀÌ´Ù. NumberOfConcurrentThreadsÀÇ °ªÀ» 0À¸·Î ¼³Á¤ÇÏ¸é ½Ã // ½ºÅÛÀº ÇÁ·Î¼¼¼­¿¡ ÀûÇÕÇÑ ½º·¹µå °³¼ö¸¦ ÇÒ´çÇÏ¿© »ç¿ëÇÑ´Ù. // (ÇϳªÀÇ CPU¿¡ ´ëÇÏ¿© IOCP Çϳª´ç ÇϳªÀÇ ½º·¹µå¸¦ »ý¼º) mIocp = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 ); if ( mIocp == NULL ) return false; // CreateIoCompletionPort ¿ÀºêÁ§Æ®¸¦ »ý¼ºÇÑ µÚ¿¡ ÀÛ¾÷½º·¹µå¸¦ »ý¼ºÇÏ°í ¼ÒÄÏÇÚµéÀ» ÁöÁ¤ÇÒ ¼ö Àִµ¥, // À̶§ »ý¼ºÇÏ´Â ÀÛ¾÷½º·¹µå´Â, ½º·¹µå°¡ ºí·ÏµÉ °¡´É¼ºÀÌ ÀÖ´Ù¸é, NumberOfConcurrentThreads ÀÇ °¹¼öº¸ // ´Ù ¸¹ÀÌ »ý¼ºÇÏ´Â °ÍÀÌ ÁÁ´Ù. NumberOfConcurrentThreads¸¸Å­ ½º·¹µå°¡ Ç×»ó ½ÇÇàµÇ°Ô ÇÒ ¼ö Àֱ⠶§¹® // ¿¡, ºí·ÏµÈ ½º·¹µå°¡ ÀÖ´Ù¸é ´Ù¸¥ ½º·¹µå°¡ ´ë½Å µ¿ÀÛÇÒ °ÍÀ̱⠶§¹®ÀÌ´Ù. // SYSTEM_INFO si; DWORD threadId; // ½Ã½ºÅÛ¿¡ ¸î °³ÀÇ ÇÁ·Î¼¼¼­°¡ ÀÖ´ÂÁö È®ÀÎÇÑ´Ù. GetSystemInfo( &si ); mIocpWorkerThreadNumber = min( si.dwNumberOfProcessors * numWorkerThreads, IOCP_MAX_WORKER_THREAD ); for ( int i = 0; i < mIocpWorkerThreadNumber; i++ ) { mIocpWorkerThread[i] = CreateThread( NULL, 0, WorkerThreadStartingPoint, (LPVOID)this, 0, &threadId ); if ( mIocpWorkerThread[i] == NULL ) return false; } // I/O Context Pool¸¦ »ý¼ºÇÑ´Ù. // 0xffff: UDP(65,535) ÃÖ´ë ÆÐŶ »çÀÌÁî // 0x0044: IPv4(60) + UDP(8) ÀÇ ÇØ´õ »çÀÌÁî mIoContextPool = new cIoContextPool( bufferLength ); if ( mIoContextPool == NULL ) return false; // I/O Context Buffer¸¦ »ý¼ºÇÑ´Ù. mIoContextFrontBuffer = (IoContextBuffer*)GlobalAlloc( GPTR, sizeof(IoContextBuffer) ); mIoContextBackBuffer = (IoContextBuffer*)GlobalAlloc( GPTR, sizeof(IoContextBuffer) ); if ( !mIoContextFrontBuffer && !mIoContextBackBuffer ) return false; // connectÇÒ Å¬¶óÀÌ¾ðÆ® ¼ÒÄÏÀ» »ý¼ºÇÑ´Ù. mSocket = WSASocket( AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED ); if ( mSocket == INVALID_SOCKET ) return false; // Disable send buffering on the socket. Setting SO_SNDBUF to 0 causes winsock to stop bufferring // sends and perform sends directly from our buffers, thereby reducing CPU usage. // // SO_SNDBUF¸¦ 0À¸·ÎÇÏ¿© ¼Û½Å¹öÆÛ¸¦ »ç¿ëÇÏÁö ¾Ê´Â´Ù. ¼Û½Å¹öÆÛ°¡ 0ÀÌ ¾Æ´Ï¸é ºÒÇÊ¿äÇÏ°Ô ÀÌ ¼Û½Å¹öÆÛ¸¦ // °ÅÃļ­ overlapped I/O¸¦ È£ÃâÇßÀ» ¶§ Á¦°øÇÑ ¹öÆÛ·Î º¹»ç°¡ ÀÌ·ç¾îÁø´Ù. // ¼Û½Å¹öÆÛ °ø°£ÀÌ ¾øÀ¸¸é overlapped I/O¸¦ È£ÃâÇÞÀ» ¶§ Á¦°øÇÑ ¹öÆÛ·Î ½Ã½ºÅÛ¿¡¼­ Á÷Á¢ µ¥ÀÌÅͰ¡ º¹»çµÈ // ´Ù. // int zero = DEF_UDP_PACKET_SIZE; if ( setsockopt( mSocket, SOL_SOCKET, SO_SNDBUF, (char*)&zero, sizeof(zero) ) == SOCKET_ERROR ) { closesocket( mSocket ); return false; } // ºê·Îµåij½ºÆ® µ¥ÀÌÅ͸¦ ¼Û½ÅÇϱâ À§Çؼ­´Â SO_BROADCAST¸¦ ¼³Á¤ÇØ¾ß ÇÑ´Ù. int broadcast = TRUE; if ( setsockopt( mSocket, SOL_SOCKET, SO_BROADCAST, (char*)&broadcast, sizeof(broadcast) ) == SOCKET_ERROR ) { closesocket( mSocket ); return false; } // »ý¼ºµÈ ¼ÒÄÏ ÇÚµéÀ» completion port¿¡ ÇÒ´çÇÑ´Ù. if ( CreateIoCompletionPort( (HANDLE)mSocket, mIocp, (ULONG_PTR)NULL, 0 ) == NULL ) { closesocket( mSocket ); return false; } mEndServer = false; mRunServer = true; // Backend Thread¸¦ »ý¼ºÇÑ´Ù. mIocpBackendThread = CreateThread( NULL, 0, BackendThreadStartingPoint, (LPVOID)this, 0, &threadId ); if ( mIocpBackendThread == NULL ) return false; return true; } // Shutdown Method void cIocpUdpSend::Shutdown(DWORD maxWait) { // ¾ÈÀüÇÏ°Ô Á¾·áÇÑ´Ù. mEndServer = true; mRunServer = false; // Backend Thread°¡ Á¾·áµÉ¶§±îÁö ´ë±â if ( mIocpBackendThread != NULL ) { WaitForSingleObjectEx( mIocpBackendThread, maxWait, TRUE ); CloseHandle( mIocpBackendThread ); mIocpBackendThread = NULL; } // ¾ÈÀüÇÏ°Ô Á¾·áÇÑ´Ù. SOCKET sockTemp = mSocket;; mSocket = INVALID_SOCKET; if ( sockTemp != INVALID_SOCKET ) { closesocket( sockTemp ); } // Cause worker threads to exit if ( mIocp != NULL ) { for ( int i = 0; i < mIocpWorkerThreadNumber; i++ ) { PostQueuedCompletionStatus( mIocp, 0, 0, IOCP_SHUTDOWN ); } } // Make sure worker threads exits. for ( int i = 0; i < mIocpWorkerThreadNumber; i++ ) { if ( WaitForSingleObject( mIocpWorkerThread[i], 60000 ) != WAIT_OBJECT_0 ) { DWORD exitCode; GetExitCodeThread( mIocpWorkerThread[i], &exitCode); if ( exitCode == STILL_ACTIVE ) { TerminateThread( mIocpWorkerThread[i], 0 ); } } CloseHandle( mIocpWorkerThread[i] ); mIocpWorkerThread[i] = NULL; } // Overlapped I/O Model Pool¹× Socket Context Pool¸¦ ÇØÁ¦ÇÑ´Ù. if ( mIoContextFrontBuffer ) { GlobalFree( mIoContextFrontBuffer ); mIoContextFrontBuffer = NULL; } if ( mIoContextBackBuffer ) { GlobalFree( mIoContextBackBuffer ); mIoContextBackBuffer = NULL; } // Overlapped I/O Model Pool¸¦ ÇØÁ¦ÇÑ´Ù. SafeDelete( mIoContextPool ); // completion port¸¦ ÇØÁ¦ÇÑ´Ù. SafeCloseHandle( mIocp ); } // GetIoPoolUsage Method void cIocpUdpSend::GetIoPoolUsage(SIZE_T& quotaPagedPoolUsage, SIZE_T& quotaNonePagedPoolUsage, SIZE_T& workingSetSize) { if ( mIoContextPool != NULL ) { mIoContextPool->GetProcessMemoryInfo( quotaPagedPoolUsage, quotaNonePagedPoolUsage, workingSetSize ); } } // QueueRequest Method BOOL cIocpUdpSend::QueueRequest(ULONG_PTR completionKey, LPOVERLAPPED overlapped, DWORD bytesTransferred) { return PostQueuedCompletionStatus( mIocp, bytesTransferred, completionKey, overlapped ); } // SendExec Method bool cIocpUdpSend::SendExec(PerIoContext* perIoContext) { WSABUF wsaBuf; DWORD sendBytes = 0; DWORD flags = 0; int addrlen = sizeof(SOCKADDR_IN); int retcode; if ( perIoContext->offset > perIoContext->length ) { PostQueuedCompletionStatus( mIocp, 0, (ULONG_PTR)NULL, (LPOVERLAPPED)perIoContext ); return false; } wsaBuf.len = perIoContext->offset; wsaBuf.buf = perIoContext->buffer; retcode = WSASendTo( perIoContext->socket, &wsaBuf, 1, &sendBytes, flags, (SOCKADDR*)&perIoContext->addr, addrlen, &(perIoContext->wsaOverlapped), NULL ); if ( (retcode == SOCKET_ERROR) && (WSAGetLastError() != WSA_IO_PENDING) ) { PostQueuedCompletionStatus( mIocp, 0, (ULONG_PTR)NULL, (LPOVERLAPPED)perIoContext ); return false; } return true; } // SendPost Method bool cIocpUdpSend::SendPost(PerIoContext* perIoContext) { cCSLock lock( &mCs ); PerIoContext** buffer = mIoContextBackBuffer->buffer; long& offset = mIoContextBackBuffer->offset; if ( offset < MAX_IO_CONTEXT_BUFFER_LEN ) { buffer[ offset ] = perIoContext; offset++; return true; } mIoContextPool->ReleaseIoContext( perIoContext ); return false; } /*-- IoContextPresent Method */ void cIocpUdpSend::IoContextPresent( ) { IoContextBuffer* temp; // Double Buffering ó¸®. CSBlock( &mCs ) { temp = mIoContextBackBuffer; mIoContextBackBuffer = mIoContextFrontBuffer; mIoContextFrontBuffer = temp; } // ¹ß½ÅµÈ µ¥ÀÌÅÍ Ã³¸®. PerIoContext** buffer = mIoContextFrontBuffer->buffer; long& offset = mIoContextFrontBuffer->offset; while ( offset > 0 ) { SendExec( (*buffer) ); (*buffer) = NULL; buffer++; offset--; Sleep( 5 ); } } // SendComplete Method bool cIocpUdpSend::SendComplete(ULONG_PTR /*completionKey*/, PerIoContext* perIoContext, DWORD /*bytesTransferred*/) { // »ç¿ëÀÌ ¿Ï·áµÈ I/O Context´Â ȸ¼öÇÑ´Ù. mIoContextPool->ReleaseIoContext( perIoContext ); return true; } // CallbackComplete Method bool cIocpUdpSend::CallbackComplete(ULONG_PTR /*completionKey*/, PerIoContext* perIoContext, DWORD /*bytesTransferred*/) { // »ç¿ëÀÌ ¿Ï·áµÈ I/O Context´Â ȸ¼öÇÑ´Ù. mIoContextPool->ReleaseIoContext( perIoContext ); return true; } // WorkerThread Method DWORD cIocpUdpSend::WorkerThread( ) { DWORD bytesTransferred; ULONG_PTR completionKey; OVERLAPPED* overlapped; BOOL retValue; PerIoContext* perIoContext; while ( true ) { // completion port¿¡ ÇÒ´çµÈ ¸ðµç ¼ÒÄÏÀÇ I/O ¿Ï·á¸¦ ±â´Ù¸°´Ù. retValue = GetQueuedCompletionStatus( mIocp, &bytesTransferred, &completionKey, &overlapped, INFINITE ); // Shutdown if ( overlapped == IOCP_SHUTDOWN ) return 0; perIoContext = (PerIoContext*)overlapped; if ( retValue == FALSE || bytesTransferred == 0 ) { if ( perIoContext != NULL ) { // »ç¿ëÀÌ ¿Ï·áµÈ I/O Context´Â ȸ¼öÇÑ´Ù. perIoContext->offset = max( perIoContext->offset, bytesTransferred ); mIoContextPool->ReleaseIoContext( perIoContext ); } continue; } switch ( perIoContext->requestType ) { case IOCP_REQUEST_WRITE: SendComplete( completionKey, perIoContext, bytesTransferred ); break; // Send ¿Ï·á Çڵ鷯 ÇÔ¼ö. case IOCP_REQUEST_CALLBACK: CallbackComplete( completionKey, perIoContext, bytesTransferred ); break; // Callback ¿Ï·á Çڵ鷯 ÇÔ¼ö. } } return 0; } // BackendThread Method DWORD cIocpUdpSend::BackendThread( ) { while ( true ) { // ¼­¹öÁ¾·á, Thread¸¦ ³ª°£´Ù. if ( mEndServer == true ) break; IoContextPresent( ); Sleep( 50 ); } IoContextPresent( ); return 0; } // WorkerThreadStartingPoint Method ( Worker Thread ½ÃÀÛ Æ÷ÀÎÆ® ) DWORD cIocpUdpSend::WorkerThreadStartingPoint(void* ptr) { cIocpUdpSend* iocpUdpSend = (cIocpUdpSend*)ptr; return iocpUdpSend->WorkerThread( ); } // BackendThreadStartingPoint Method ( Backend Thread ½ÃÀÛ Æ÷ÀÎÆ® ) DWORD cIocpUdpSend::BackendThreadStartingPoint(void* ptr) { cIocpUdpSend* iocpUdpSend = (cIocpUdpSend*)ptr; return iocpUdpSend->BackendThread( ); }