// Include files #include "iocpudprecv.h" // Local definitions #pragma warning( disable: 4127 ) #define SafeCloseHandle(P) if (P!=NULL) { CloseHandle(P); (P)=NULL; } // Çڵ鸦 ¾ÈÀüÇÏ°Ô ÇØÁ¦ #define SafeDelete(P) if (P!=NULL) { delete(P); (P)=NULL; } // " // Global data // IocpUdpRecv Constructor IocpUdpRecv::IocpUdpRecv(void) { // Critical Section¸¦ ÃʱâÈ­. InitializeCriticalSectionAndSpinCount( &m_cs, 2000 ); m_iocp = NULL; m_socket = INVALID_SOCKET; m_iocpWorkerThreadNumber = 0; m_ioContextPool = NULL; m_iocpBackendThread = NULL; m_ioContextFrontBuffer = NULL; m_ioContextBackBuffer = NULL; m_runServer = false; m_endServer = false; } // ~IocpUdpRecv Destructor. IocpUdpRecv::~IocpUdpRecv(void) { // Á¾·á Shutdown( ); // Critical Section¸¦ ÇØÁ¦. DeleteCriticalSection( &m_cs ); } // Initialize Method bool IocpUdpRecv::Initialize(char* ipAddr, unsigned short port, unsigned short numWorkerThreads, unsigned int bufferLength) { // CreateIoCompletionPort¸¦ ÀÌ¿ëÇÏ¿© completion port ¿ÀºêÁ§Æ®¸¦ »ý¼ºÇÒ ¶§ ÀϹÝÀûÀ¸·Î ½Å°æ½á¾ß ÇÒ ÆÄ // ¶ó¸ÞÅÍ´Â NumberOfConcurrentThreads ÀÌ´Ù. óÀ½ 3°³ÀÇ ÆÄ¶ó¸ÞÅÍ´Â °ªÀ» ¼³Á¤ÇÏÁö ¾Ê´Â´Ù. // NumberOfConcurrentThreads´Â ÇöÀç completion port¿¡ ´ëÇÏ¿© ¸î°³ÀÇ ½º·¹µå¸¦ ½ÇÇàÇÒ °ÍÀΰ¡¸¦ °áÁ¤ÇÑ // ´Ù. °¡Àå ÀÌ»óÀûÀÎ °æ¿ì´Â ½º·¹µåÀÇ context switchingÀ» ¹æÁöÇϱâ À§ÇÏ¿© ¼­ºñ½º¿¡ ´ëÇÏ¿© ÇϳªÀÇ ÇÁ·Î // ¼¼¼­´ç ÇϳªÀÇ ½º·¹µå¸¸ µ¿ÀÛÇϵµ·Ï ÇÏ´Â °ÍÀÌ´Ù. NumberOfConcurrentThreadsÀÇ °ªÀ» 0À¸·Î ¼³Á¤ÇÏ¸é ½Ã // ½ºÅÛÀº ÇÁ·Î¼¼¼­¿¡ ÀûÇÕÇÑ ½º·¹µå °³¼ö¸¦ ÇÒ´çÇÏ¿© »ç¿ëÇÑ´Ù. // (ÇϳªÀÇ CPU¿¡ ´ëÇÏ¿© IOCP Çϳª´ç ÇϳªÀÇ ½º·¹µå¸¦ »ý¼º) m_iocp = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 ); if ( m_iocp == NULL ) return false; // CreateIoCompletionPort ¿ÀºêÁ§Æ®¸¦ »ý¼ºÇÑ µÚ¿¡ ÀÛ¾÷½º·¹µå¸¦ »ý¼ºÇÏ°í ¼ÒÄÏÇÚµéÀ» ÁöÁ¤ÇÒ ¼ö Àִµ¥, // À̶§ »ý¼ºÇÏ´Â ÀÛ¾÷½º·¹µå´Â, ½º·¹µå°¡ ºí·ÏµÉ °¡´É¼ºÀÌ ÀÖ´Ù¸é, NumberOfConcurrentThreads ÀÇ °¹¼öº¸ // ´Ù ¸¹ÀÌ »ý¼ºÇÏ´Â °ÍÀÌ ÁÁ´Ù. NumberOfConcurrentThreads¸¸Å­ ½º·¹µå°¡ Ç×»ó ½ÇÇàµÇ°Ô ÇÒ ¼ö Àֱ⠶§¹® // ¿¡, ºí·ÏµÈ ½º·¹µå°¡ ÀÖ´Ù¸é ´Ù¸¥ ½º·¹µå°¡ ´ë½Å µ¿ÀÛÇÒ °ÍÀ̱⠶§¹®ÀÌ´Ù. SYSTEM_INFO si; DWORD threadId; // ½Ã½ºÅÛ¿¡ ¸î °³ÀÇ ÇÁ·Î¼¼¼­°¡ ÀÖ´ÂÁö È®ÀÎÇÑ´Ù. GetSystemInfo( &si ); m_iocpWorkerThreadNumber = min( si.dwNumberOfProcessors * numWorkerThreads, IOCP_MAX_WORKER_THREAD ); for ( int i = 0; i < m_iocpWorkerThreadNumber; i++ ) { m_iocpWorkerThread[i] = CreateThread( NULL, 0, WorkerThreadStartingPoint, (LPVOID)this, 0, &threadId ); if ( m_iocpWorkerThread[i] == NULL ) return false; } // I/O Context Pool¸¦ »ý¼ºÇÑ´Ù. // 0xffff: UDP(65,535) ÃÖ´ë ÆÐŶ »çÀÌÁî // 0x0044: IPv4(60) + UDP(8) ÀÇ ÇØ´õ »çÀÌÁî m_ioContextPool = new cIoContextPool( bufferLength ); if ( m_ioContextPool == NULL ) return false; // I/O Context Buffer¸¦ »ý¼ºÇÑ´Ù. m_ioContextFrontBuffer = (IoContextBuffer*)GlobalAlloc( GPTR, sizeof(IoContextBuffer) ); m_ioContextBackBuffer = (IoContextBuffer*)GlobalAlloc( GPTR, sizeof(IoContextBuffer) ); if ( !m_ioContextFrontBuffer && !m_ioContextBackBuffer ) return false; // IPv4 ÁÖ¼Ò¸¦ ¸¸µç´Ù. ZeroMemory( (void*)&m_addr, sizeof(SOCKADDR_IN) ); m_addr.sin_family = AF_INET; m_addr.sin_port = htons( port ); m_addr.sin_addr.s_addr = inet_addr( ipAddr ); if ( m_addr.sin_addr.s_addr == INADDR_NONE ) { PHOSTENT phe; // the host name for the server is not in dot format, therefore try it just as a string if ( (phe = gethostbyname( ipAddr )) != NULL ) CopyMemory( &m_addr.sin_addr, phe->h_addr_list[0], phe->h_length ); else return false; } // connectÇÒ Å¬¶óÀÌ¾ðÆ® ¼ÒÄÏÀ» »ý¼ºÇÑ´Ù. m_socket = WSASocket( AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED ); if ( m_socket == INVALID_SOCKET ) return false; if ( bind( m_socket, (LPSOCKADDR)&m_addr, sizeof(SOCKADDR_IN) ) == SOCKET_ERROR ) return false; // Disable receive buffering on the socket. Setting SO_RCVBUF to 0 causes winsock to stop bufferring // receive and perform receives directly from our buffers, thereby reducing CPU usage. // SO_RCVBUF¸¦ 0À¸·ÎÇÏ¿© ¼ö½Å¹öÆÛ¸¦ »ç¿ëÇÏÁö ¾Ê´Â´Ù. ¼ö½Å¹öÆÛ°¡ 0ÀÌ ¾Æ´Ï¸é ºÒÇÊ¿äÇÏ°Ô ÀÌ ¼ö½Å¹öÆÛ¸¦ // °ÅÃļ­ overlapped I/O¸¦ È£ÃâÇßÀ» ¶§ Á¦°øÇÑ ¹öÆÛ·Î º¹»ç°¡ ÀÌ·ç¾îÁø´Ù. // ¼ö½Å¹öÆÛ °ø°£ÀÌ ¾øÀ¸¸é overlapped I/O¸¦ È£ÃâÇßÀ» ¶§ Á¦°øÇÑ ¹öÆÛ·Î ½Ã½ºÅÛ¿¡¼­ Á÷Á¢ µ¥ÀÌÅͰ¡ º¹»çµÈ // ´Ù. int zero = DEF_UDP_PACKET_SIZE; if ( setsockopt( m_socket, SOL_SOCKET, SO_RCVBUF, (char*)&zero, sizeof(zero)) == SOCKET_ERROR ) { closesocket( m_socket ); return false; } // »ý¼ºµÈ ¼ÒÄÏ ÇÚµéÀ» completion port¿¡ ÇÒ´çÇÑ´Ù. if ( CreateIoCompletionPort( (HANDLE)m_socket, m_iocp, (ULONG_PTR)NULL, 0 ) == NULL ) { closesocket( m_socket ); return false; } // ¼ö½ÅÀ» À§ÇØ I/O Context¸¦ ÁغñÇÑ´Ù. PerIoContext* perIoContext = m_ioContextPool->GetIoContext( m_socket, IOCP_REQUEST_READ ); RecvPost( NULL, perIoContext ); m_endServer = false; m_runServer = true; // Backend Thread¸¦ »ý¼ºÇÑ´Ù. m_iocpBackendThread = CreateThread( NULL, 0, BackendThreadStartingPoint, (LPVOID)this, 0, &threadId ); if ( m_iocpBackendThread == NULL ) return false; return true; } // Shutdown Method void IocpUdpRecv::Shutdown(DWORD maxWait) { // ¾ÈÀüÇÏ°Ô Á¾·áÇÑ´Ù. m_endServer = true; m_runServer = false; // Backend Thread°¡ Á¾·áµÉ¶§±îÁö ´ë±â if ( m_iocpBackendThread != NULL ) { WaitForSingleObjectEx( m_iocpBackendThread, maxWait, TRUE ); CloseHandle( m_iocpBackendThread ); m_iocpBackendThread = NULL; } // ¾ÈÀüÇÏ°Ô Á¾·áÇÑ´Ù. SOCKET sockTemp = m_socket; if ( sockTemp != INVALID_SOCKET ) { closesocket( sockTemp ); } // Cause worker threads to exit if ( m_iocp != NULL ) { for ( int i = 0; i < m_iocpWorkerThreadNumber; i++ ) { PostQueuedCompletionStatus( m_iocp, 0, 0, IOCP_SHUTDOWN ); } } // Make sure worker threads exits. for ( int i = 0; i < m_iocpWorkerThreadNumber; i++ ) { if ( WaitForSingleObject( m_iocpWorkerThread[ i ], 60000 ) != WAIT_OBJECT_0 ) { DWORD exitCode; GetExitCodeThread( m_iocpWorkerThread[ i ], &exitCode); if ( exitCode == STILL_ACTIVE ) { TerminateThread( m_iocpWorkerThread[ i ], 0 ); } } CloseHandle( m_iocpWorkerThread[ i ] ); m_iocpWorkerThread[i] = NULL; } // ¼ÒÄÏ ÃʱâÈ­. m_socket = INVALID_SOCKET; // Overlapped I/O Model Pool¹× Socket Context Pool¸¦ ÇØÁ¦ÇÑ´Ù. if ( m_ioContextFrontBuffer ) { GlobalFree( m_ioContextFrontBuffer ); m_ioContextFrontBuffer = NULL; } if ( m_ioContextBackBuffer ) { GlobalFree( m_ioContextBackBuffer ); m_ioContextBackBuffer = NULL; } // Overlapped I/O Model Pool¸¦ ÇØÁ¦ÇÑ´Ù. SafeDelete( m_ioContextPool ); // completion port¸¦ ÇØÁ¦ÇÑ´Ù. SafeCloseHandle( m_iocp ); } // GetIoPoolUsage Method void IocpUdpRecv::GetIoPoolUsage(SIZE_T& quotaPagedPoolUsage, SIZE_T& quotaNonePagedPoolUsage, SIZE_T& workingSetSize) { if ( m_ioContextPool != NULL ) { m_ioContextPool->GetProcessMemoryInfo( quotaPagedPoolUsage, quotaNonePagedPoolUsage, workingSetSize ); } } // QueueRequest Method BOOL IocpUdpRecv::QueueRequest(ULONG_PTR completionKey, LPOVERLAPPED overlapped, DWORD bytesTransferred) { return PostQueuedCompletionStatus( m_iocp, bytesTransferred, completionKey, overlapped ); } // RecvPost Method bool IocpUdpRecv::RecvPost(ULONG_PTR completionKey, PerIoContext* perIoContext) { WSABUF wsaBuf; DWORD recvBytes = 0; DWORD flags = 0; int addrlen = sizeof(SOCKADDR_IN); int retcode; wsaBuf.len = (perIoContext->length - perIoContext->offset); wsaBuf.buf = (perIoContext->buffer + perIoContext->offset); retcode = WSARecvFrom( perIoContext->socket, &wsaBuf, 1, &recvBytes, &flags, (SOCKADDR*)&perIoContext->addr, &addrlen, &(perIoContext->wsaOverlapped), NULL ); if ( (retcode == SOCKET_ERROR) && (WSAGetLastError() != WSA_IO_PENDING) ) { PostQueuedCompletionStatus( m_iocp, 0, (ULONG_PTR)completionKey, (LPOVERLAPPED)perIoContext ); return false; } return true; } // CallbackPost Method bool IocpUdpRecv::CallbackPost(ULONG_PTR completionKey, PerIoContext* perIoContext) { int retcode; retcode = PostQueuedCompletionStatus( m_iocp, perIoContext->offset, completionKey, (LPOVERLAPPED)perIoContext ); if ( retcode == 0 ) { return false; } return true;; } // RecvComplete Method (Default Echo Server) bool IocpUdpRecv::RecvComplete(ULONG_PTR completionKey, PerIoContext* perIoContext, DWORD bytesTransferred) { cCSLock lock( &m_cs ); // »ç¿ëÀÌ ¿Ï·áµÈ I/O Context´Â ȸ¼öÇÑ´Ù. perIoContext->offset = bytesTransferred; m_ioContextPool->ReleaseIoContext( perIoContext ); // ¼ö½ÅÀ» À§ÇØ I/O Context¸¦ ÁغñÇÑ´Ù. perIoContext = m_ioContextPool->GetIoContext( m_socket, IOCP_REQUEST_READ ); return RecvPost( completionKey, perIoContext ); } // CallbackComplete Method bool IocpUdpRecv::CallbackComplete(ULONG_PTR /*completionKey*/, PerIoContext* perIoContext, DWORD /*bytesTransferred*/) { // »ç¿ëÀÌ ¿Ï·áµÈ I/O Context´Â ȸ¼öÇÑ´Ù. m_ioContextPool->ReleaseIoContext( perIoContext ); return true; } // WorkerThread Method DWORD IocpUdpRecv::WorkerThread( ) { DWORD bytesTransferred; ULONG_PTR completionKey; OVERLAPPED* overlapped; BOOL retValue; PerIoContext* perIoContext; while ( true ) { // completion port¿¡ ÇÒ´çµÈ ¸ðµç ¼ÒÄÏÀÇ I/O ¿Ï·á¸¦ ±â´Ù¸°´Ù. retValue = GetQueuedCompletionStatus( m_iocp, &bytesTransferred, &completionKey, &overlapped, INFINITE ); // Shutdown if ( overlapped == IOCP_SHUTDOWN ) return 0; perIoContext = (PerIoContext*)overlapped; if ( retValue == FALSE || bytesTransferred == 0 ) { // »ç¿ëÀÌ ¿Ï·áµÈ I/O Context´Â ȸ¼öÇÑ´Ù. perIoContext->offset = max( perIoContext->offset, bytesTransferred ); m_ioContextPool->ReleaseIoContext( perIoContext ); continue; } switch ( perIoContext->requestType ) { case IOCP_REQUEST_READ: RecvComplete( completionKey, perIoContext, bytesTransferred ); break; // Receive ¿Ï·á Çڵ鷯 ÇÔ¼ö. case IOCP_REQUEST_CALLBACK: CallbackComplete( completionKey, perIoContext, bytesTransferred ); break; // Callback ¿Ï·á Çڵ鷯 ÇÔ¼ö. } } return 0; } // BackendThread Method DWORD IocpUdpRecv::BackendThread( ) { while ( true ) { // ¼­¹öÁ¾·á, Thread¸¦ ³ª°£´Ù. if ( m_endServer == true ) break; Sleep( 50 ); } return 0; } // WorkerThreadStartingPoint Method ( Worker Thread ½ÃÀÛ Æ÷ÀÎÆ® ) DWORD IocpUdpRecv::WorkerThreadStartingPoint(void* ptr) { IocpUdpRecv* iocpUdpRecv = (IocpUdpRecv*)ptr; return iocpUdpRecv->WorkerThread( ); } // BackendThreadStartingPoint Method ( Backend Thread ½ÃÀÛ Æ÷ÀÎÆ® ) DWORD IocpUdpRecv::BackendThreadStartingPoint(void* ptr) { IocpUdpRecv* iocpUdpRecv = (IocpUdpRecv*)ptr; return iocpUdpRecv->BackendThread( ); }