/** * @file dsiocp.h * @brief iocp通信模块 * Copyright(c) 2007,上海第九城市游戏研发部 * All rights reserved * 文件名称: dsiocp.h * 摘 要: 与bufqueue通信,参考:msdn,<> (second edition) * 作 者: dzj * 完成日期: 2008.05.16-2008.05.19 * */ #pragma once #ifndef DS_IOCP_H #define DS_IOCP_H #include "dssocket.h" #include "PkgProc/MsgToPut.h" #include #include #ifdef USE_DSIOCP #include /* _beginthread, _endthread */ typedef unsigned (__stdcall * PTHREAD_START) (void*); #define HXBCBEGINTHREADEX( psa, cbStack, pfnStartAddr\ , pvParam, fdwCreate, pdwThreadID)\ ((HANDLE) _beginthreadex(\ (void*) (psa),\ (unsigned) (cbStack),\ (PTHREAD_START)(pfnStartAddr),\ (void*) (pvParam),\ (unsigned) (fdwCreate),\ (unsigned*) (pdwThreadID))) using namespace std; extern TCache< CDsSocket >* g_poolDsSocket;//socket句柄池; extern TCache< UniqueObj< CDsSocket > >* g_poolUniDssocket;//保存socket唯一对象的对象池; //typedef BOOL (*PFN_ACCEPTEX)( SOCKET sListenSocket, SOCKET sAcceptSocket, PVOID pOutputBuffer // , DWORD dwRcvDataLen, DWORD dwLocalAddrLen, DWORD dwRemoteAddrLen, LPDWORD dwBytesRcved, LPOVERLAPPED pOverlapped ); //class DsMutex //{ //private: // DsMutex(); //public: // explicit DsMutex( CRITICAL_SECTION& inMutex ) : m_refMutex( inMutex ) // { // ::EnterCriticalSection( &m_refMutex ); // } // ~DsMutex() // { // ::LeaveCriticalSection( &m_refMutex ); // } //private: // CRITICAL_SECTION& m_refMutex; //}; struct ConnInfo { char remoteAddr[64]; unsigned short remotePort; CDsSocket* pDsSocket; }; enum IO_OP_TYPE { IOT_READ = 0, IOT_WRITE, IOT_ACCEPT, IOT_DISCONNECT, //post recv时检测到socket断开,通知工作者线程调用OnDisconnect; IOT_NOTI_APP, /*通知应用*/ IOT_INVALID }; struct PerIOData { PoolFlagDefine() { opType = IOT_INVALID; //pMsgToPut = NULL; pDsIOBuf = NULL; } void Init( IO_OP_TYPE inOpType, /*MsgToPut* inMsgToPut*/ DsIOBuf* inDsIOBuf ) { //pMsgToPut = inMsgToPut; pDsIOBuf = inDsIOBuf; opType = inOpType; } OVERLAPPED overLapped; IO_OP_TYPE opType; WSABUF refSendBuf;//指向pMsgToPut,方便WSASend调用; //MsgToPut* pMsgToPut; DsIOBuf* pDsIOBuf; CDsSocket* pAssociSocket; }; extern TCache< PerIOData >* g_poolPerIOData; template < typename T_ToSend/*从此队列取待发送消息*/, typename T_RcvTo/*接收消息的存放队列*/, typename T_Socket/*socket类型*/ > class CDsIocp { public: CDsIocp() { m_bIsStopThread = false; m_pSendQueue = NULL;//发送消息队列; m_pRcvQueue = NULL;//接收消息队列; m_hIocp = INVALID_HANDLE_VALUE; m_hAppIocp = INVALID_HANDLE_VALUE; m_hConnEvent = INVALID_HANDLE_VALUE; m_pToSwitchQueueRead = NEW list; } ~CDsIocp() { StopAllThread(); } public: void Init( int socketPoolSize, T_RcvTo* pReadQueue/*外界从此队列读已收消息*/, T_ToSend* pWriteQueue/*外界向此队列写待发送消息*/ ) { m_pendingAccept.clear(); m_bIsStopThread = false; m_hConnEvent = CreateEvent( NULL, TRUE/*手动重置*/, FALSE/*初始状态未触发*/, NULL ); //InitializeCriticalSection( &m_hConnQueueSection ); //InitializeCriticalSection( &m_hAllInSocketSection );//m_arrAllInSockets互斥区; m_pRcvQueue = pReadQueue;//外界从此队列读已收消息; m_pSendQueue = pWriteQueue;//外界向此队列写待发送消息; m_hIocp = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );//建完成端口句柄; m_hAppIocp = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );//建完成端口句柄; //get m_pfnAcceptEx SOCKET s = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP ); DWORD dwBytes = 0; GUID GuidAcceptEx = WSAID_ACCEPTEX; WSAIoctl( s, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx , sizeof(GuidAcceptEx), &m_pfnAcceptEx, sizeof(m_pfnAcceptEx), &dwBytes, NULL, NULL ); if ( NULL == m_pfnAcceptEx ) { //error; return; } GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS; WSAIoctl( s, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidGetAcceptExSockAddrs , sizeof(GuidGetAcceptExSockAddrs), &m_pfnGetAcceptExSockaddrs, sizeof(m_pfnGetAcceptExSockaddrs), &dwBytes, NULL, NULL ); if ( NULL == m_pfnGetAcceptExSockaddrs ) { //error; return; } GUID GuidTransmitFile = WSAID_TRANSMITFILE; WSAIoctl( s, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidTransmitFile , sizeof(GuidTransmitFile), &m_pfnTransmitFile, sizeof(m_pfnTransmitFile), &dwBytes, NULL, NULL ); if ( NULL == m_pfnTransmitFile ) { //error; return; } StartValThread();//启动工作; } BOOL AddListenByAddr( const char* listenAddr, unsigned short sListenPort ) { if ( NULL == listenAddr ) { D_ERROR( "AddListenByAddr,输入监听地址空\n" ); return FALSE; } T_Socket* pListenSocket = g_poolDsSocket->Retrieve(); if ( NULL == pListenSocket ) { D_ERROR( "AddListen,没有足够的socket\n" ); return FALSE; } pListenSocket->RsvPendingSessionID();//置使用标记; AssociaIocp( pListenSocket ); //绑定本地地址以及端口号sListenPort; hostent* thisHost = gethostbyname(""); char* ip = inet_ntoa (*(struct in_addr *)(*(thisHost->h_addr_list)));; sockaddr_in serviceaddr; serviceaddr.sin_family = AF_INET; serviceaddr.sin_addr.s_addr = inet_addr( listenAddr );//htonl(INADDR_ANY); serviceaddr.sin_port = htons(sListenPort); pListenSocket->SetSocketAddr( ip, sListenPort ); if ( SOCKET_ERROR == bind( pListenSocket->GetSocket(), (SOCKADDR*) &serviceaddr, sizeof(serviceaddr)) ) { D_ERROR( "监听%s端口%d绑定失败\n", listenAddr, sListenPort ); return FALSE; } if ( listen( pListenSocket->GetSocket(), 0/*SOMAXCONN*/ ) == SOCKET_ERROR) { D_ERROR( "监听端口%d失败\n", sListenPort ); if ( NULL != g_poolDsSocket ) { g_poolDsSocket->Release( pListenSocket ); pListenSocket = NULL; } return FALSE; } CheckAcceptPost( pListenSocket ); return TRUE; } ///增加一个监听端口; BOOL AddListen( unsigned short sListenPort ) { T_Socket* pListenSocket = g_poolDsSocket->Retrieve(); if ( NULL == pListenSocket ) { D_ERROR( "AddListen,没有足够的socket\n" ); return FALSE; } pListenSocket->RsvPendingSessionID();//置使用标记; AssociaIocp( pListenSocket ); //绑定本地地址以及端口号sListenPort; hostent* thisHost = gethostbyname(""); char* ip = inet_ntoa (*(struct in_addr *)(*(thisHost->h_addr_list)));; sockaddr_in serviceaddr; serviceaddr.sin_family = AF_INET; serviceaddr.sin_addr.s_addr = htonl(INADDR_ANY); serviceaddr.sin_port = htons(sListenPort); pListenSocket->SetSocketAddr( ip, sListenPort ); if ( SOCKET_ERROR == bind( pListenSocket->GetSocket(), (SOCKADDR*) &serviceaddr, sizeof(serviceaddr)) ) { D_ERROR( "监听端口%d绑定失败\n", sListenPort ); return FALSE; } if ( listen( pListenSocket->GetSocket(), 0/*SOMAXCONN*/ ) == SOCKET_ERROR) { D_ERROR( "监听端口%d失败\n", sListenPort ); if ( NULL != g_poolDsSocket ) { g_poolDsSocket->Release( pListenSocket ); pListenSocket = NULL; } return FALSE; } CheckAcceptPost( pListenSocket ); return TRUE; } private: ///启动工作者线程与发送消息监听线程; void StartValThread() { //建工作者线程; SYSTEM_INFO systemInfo; GetSystemInfo( &systemInfo ); m_arrThreads = NEW HANDLE[4];//线程句柄; m_arrThreads[0] = HXBCBEGINTHREADEX( NULL, 0, (WorkerThread), this, 0, NULL ); //m_arrThreads[1] = HXBCBEGINTHREADEX( NULL, 0, (WorkerThread), this, 0, NULL ); //建立发送消息监视线程; m_arrThreads[1] = HXBCBEGINTHREADEX( NULL, 0, (SendMonitor), this, 0, NULL ); //建向上层应用传消息的监视线程; m_arrThreads[2] = HXBCBEGINTHREADEX( NULL, 0, (PushMsgToAppThread), this, 0, NULL ); //建连接发起监视线程; m_arrThreads[3] = HXBCBEGINTHREADEX( NULL, 0, (ConnThread), this, 0, NULL ); } ///发停止所有线程命令,并等待各线程停止; void StopAllThread() { //以下先后顺序不要错; Sleep( 2000 );//确保各线程都已启动,否则如果线程没启动之前就置了各停止事件,则线程真正启动之后可能反而等不到停止事件从而陷入永久等待; //先停止监听,以防止在这些端口上等待的accept socket泄漏(关闭各监听端口以及在这些端口上等待的accept端口); for ( set< T_Socket* >::iterator iter=m_arrListenSockets.begin(); iter!=m_arrListenSockets.end(); ++iter ) { if ( NULL != g_poolDsSocket ) { g_poolDsSocket->Release( *iter ); *iter = NULL; } } Sleep( 2000 );//等待工作者线程处理前面这些socket的关闭事件,否则直接到析构函数中delete m_pHandlePool时会导致在监听端口上等待的socket泄漏; //置供各线程检测的关闭标记; m_bIsStopThread = true; //促使等待发送队列的发送线程停止; m_pSendQueue->SetReadEvent( true ); //促使等待连接信号的连接线程停止; SetEvent( m_hConnEvent ); //促使工作者线程停止; PostQueuedCompletionStatus( m_hIocp, 0, 0, 0 ); //促使向上层应用传消息线程停止; PostQueuedCompletionStatus( m_hAppIocp, 0, 0, 0 ); //等待前述各线程结束; ::WaitForMultipleObjects( 4, m_arrThreads, true, INFINITE );//等待所有线程结束; delete [] m_arrThreads; m_arrThreads = NULL; for ( list< ConnInfo* >::iterator iter=m_ConnQueue.begin(); iter!=m_ConnQueue.end(); ++iter ) { if ( NULL != *iter ) { delete *iter; *iter = NULL; } } CloseHandle( m_hConnEvent ); //DeleteCriticalSection( &m_hAllInSocketSection );//m_arrAllInSockets互斥区; //DeleteCriticalSection( &m_hConnQueueSection ); m_arrAllInSockets.clear();//清进入连接列表; if ( !(m_pendingAccept.empty()) ) { for ( set::iterator iter=m_pendingAccept.begin(); iter!=m_pendingAccept.end(); ++iter ) { if ( NULL != g_poolPerIOData ) { if ( NULL != (*iter)->pDsIOBuf )//发生错误时要回收DsIOBuf,因为没有应用去使用并释放它; { g_poolDsIOBuf->Release( (*iter)->pDsIOBuf ); (*iter)->pDsIOBuf = NULL; } g_poolPerIOData->Release( *iter ); *iter = NULL; } } m_pendingAccept.clear(); } if ( NULL != m_pToSwitchQueueRead ) { for ( list::iterator iter=m_pToSwitchQueueRead->begin(); iter!=m_pToSwitchQueueRead->end(); ++iter ) { delete *iter; *iter = NULL; } m_pToSwitchQueueRead->clear(); delete m_pToSwitchQueueRead; m_pToSwitchQueueRead = NULL; } return; } public: void AddPendingAccept( PerIOData* pendAccept ) { //by dzj, 08.08.18,由于从池中分配,因此留最后池释放时释放,m_pendingAccept.insert( pendAccept ); } void DelPendingAccept( PerIOData* pendAccept ) { //by dzj, 08.08.18,由于从池中分配,因此留最后池释放时释放,m_pendingAccept.erase( pendAccept ); } private: set m_pendingAccept; public: ///取连接触发信号句柄 HANDLE& GetConnSignal() { return m_hConnEvent; } ///取连接队列互斥区; //CRITICAL_SECTION& GetConnQueueSection() { return m_hConnQueueSection; } ///取完成端口句柄; HANDLE& GetIocpHandle() { return m_hIocp; } ///取用于向上层应用发数据的IOCP; HANDLE& GetAppIocpHandle() { return m_hAppIocp; } ///取外发消息队列; T_ToSend* GetSendQueue() { return m_pSendQueue; } ///取收包消息存放队列; T_RcvTo* GetRcvQueue() { return m_pRcvQueue; } ///主线程是否发出了停止命令; bool IsStopThread() { return m_bIsStopThread; } public://尝试连接以及尝试发送函数,分别由对应的线程在信号触发时循环调用; ///尝试连接,由连接发起监听线程调用; BOOL TryConn( BOOL& isNeedTryAgain ) { isNeedTryAgain = FALSE; DsMutex tmpMutex( m_hConnQueueSection );//进入连接队列互斥区; ResetEvent( m_hConnEvent );//准备下次接受连接触发; T_Socket* pDsSocket = NULL; ConnInfo* pConnInfo = NULL; for ( list< ConnInfo* >::iterator iter=m_ConnQueue.begin(); iter!=m_ConnQueue.end(); ) { pConnInfo = *iter; pDsSocket = pConnInfo->pDsSocket; //pDsSocket->SetBlocking(); sockaddr_in clientService; clientService.sin_family = AF_INET; clientService.sin_addr.s_addr = inet_addr( pConnInfo->remoteAddr ); if ( INADDR_NONE == clientService.sin_addr.s_addr ) { //无效的IP地址信息; if ( NULL != g_poolDsSocket ) { g_poolDsSocket->Release( pDsSocket );//回收socket; pDsSocket = NULL; } delete pConnInfo; pConnInfo = NULL; m_ConnQueue.erase( iter++ ); continue; } clientService.sin_port = htons( pConnInfo->remotePort ); int connRet = connect( pDsSocket->GetSocket(), (SOCKADDR*) &clientService, sizeof(clientService) ); int nErrno = 0; if ( 0 != connRet ) { nErrno = GetLastError(); } if ( ( 0 == connRet ) || ( WSAEISCONN == nErrno ) ) { //连接已成功。。。 pDsSocket->SetSocketAddr( pConnInfo->remoteAddr, pConnInfo->remotePort );//置socket地址; //关联完成端口, post recv; AssociaIocp( pDsSocket ); //通知应用 MsgToPut* pMsgToPut = g_poolMsgToPut->RetrieveOrCreate(); if ( NULL == pMsgToPut ) { TRY_BEGIN; D_ERROR( "TryConn,g_poolMsgToPut,内存分配失败\n" ); TRY_END; break; } pMsgToPut->pUniqueSocket = g_poolUniDssocket->RetrieveOrCreate(); if ( NULL == pMsgToPut->pUniqueSocket ) { TRY_BEGIN; D_ERROR( "TryConn,g_poolUniDssocket,内存分配失败\n" ); TRY_END; break; } pMsgToPut->pUniqueSocket->Init(pDsSocket); pMsgToPut->nHandleID = pDsSocket->GetSessionID(); pMsgToPut->nSessionID = pDsSocket->GetSessionID(); TRY_BEGIN; D_INFO( "0926dbg,连接成功,对应的sessionid:%d\n", pMsgToPut->nSessionID ); TRY_END; pMsgToPut->nMsgLen = -1;//新连接建立; PushAppMsgToPut( pMsgToPut ); pDsSocket->SetAppNotified();//设已通知应用标记,以便在连接断开时也能通知应用; //在已连接端口上post recv; PostRcv( pDsSocket ); delete pConnInfo; pConnInfo = NULL; m_ConnQueue.erase( iter++ ); } else { if ( nErrno != EWOULDBLOCK ) { D_INFO( "连接出错,以后不再尝试进行该连接:(%s-%d)\n", pConnInfo->remoteAddr, pConnInfo->remotePort); //连接出错,通知应用?? 以后不再尝试进行该连接; if ( NULL != g_poolDsSocket ) { g_poolDsSocket->Release( pDsSocket );//回收socket; pDsSocket = NULL; } delete pConnInfo; pConnInfo = NULL; m_ConnQueue.erase( iter++ ); } else { //阻塞,暂时连不上,下次再连; isNeedTryAgain = TRUE;//告知调用者,还有连接没有成功,需要在稍后重试; //D_INFO( "阻塞,暂时连不上,下次再连:(%s-%d)\n", pConnInfo->remoteAddr, pConnInfo->remotePort); ++iter; }//if ( nerr != EWOULDBLOCK ) }//if conn 成功; }// for 各连接请求; return true; } void PostDisConnect( T_Socket* pInSocket, PerIOData* pPerIOData ) { if ( NULL != pPerIOData ) { if ( NULL != pPerIOData->pDsIOBuf ) { //回收MsgToPut; if ( NULL != g_poolDsIOBuf ) { g_poolDsIOBuf->Release( pPerIOData->pDsIOBuf ); pPerIOData->pDsIOBuf = NULL; } } } if ( NULL == pInSocket ) { return; } if ( pInSocket->IsReused() )//已被重用,无需再发起断开; { return; } if ( pInSocket->IsDisConnecting() ) { //已在断连中,无需再次发起断连; return; } pInSocket->SetIsDisConnecting();//置断开中标记,后续消息不再发送; PerIOData* pPerIoData = g_poolPerIOData->RetrieveOrCreate();//分配一个断连用periodata; if ( NULL == pPerIoData ) { TRY_BEGIN; D_ERROR( "PostDisConnect,NEW PerIOData,内存分配失败\n" ); TRY_END; return; } pPerIoData->opType = IOT_DISCONNECT; memset( &(pPerIoData->overLapped), 0, sizeof( pPerIoData->overLapped ) ); D_INFO( "PostDisConnect on sessionid : %d\n", pInSocket->GetPendingSessionID() ); BOOL isPost = m_pfnTransmitFile( pInSocket->GetSocket()/*待断开端口*/, NULL, 0, 0, &(pPerIoData->overLapped), NULL, TF_DISCONNECT | TF_REUSE_SOCKET ); if ( !isPost ) { int nerr = GetLastError(); if ( ERROR_IO_PENDING != nerr ) { TRY_BEGIN; D_ERROR( "PostDisConnect, nerr = %d\n", nerr ); TRY_END; } } return; } ///从发送队列中取MsgToPut一个个发送出去; void TrySendMsg() { TRY_BEGIN; T_Socket* pDsSocket = NULL; set< T_Socket* > tmpSendQueue; tmpSendQueue.clear(); MsgToPut* pMsgToPut = NULL; m_pToSwitchQueueRead = m_pSendQueue->PreSerialRead( m_pToSwitchQueueRead ); //m_pSendQueue->PreSerialRead(); //m_pToSwitchQueueRead = m_pSendQueue->SwitchOutMsgQueue( m_pToSwitchQueueRead ); if ( m_pToSwitchQueueRead->empty() ) { //队列空; return; } bool isInConnBrocast = false;//是否向所有连入连接广播; for ( list::iterator iter=m_pToSwitchQueueRead->begin(); iter!=m_pToSwitchQueueRead->end(); ++iter ) { pMsgToPut = *iter; if ( NULL == pMsgToPut ) { continue; } //1、检查是否为新建连接请求; //2、检查是否为断开连接请求; //3、执行数据发送; isInConnBrocast = false;//重置广播标记; //1、检查是否为新建连接; if ( NULL == pMsgToPut->pUniqueSocket ) { //对应socket空,必定为新建连接; if( (pMsgToPut->nMsgLen < 0) && (pMsgToPut->nSessionID <= SRV_SID_MAX) ) { //连接发起,取连接目标信息; if ( strlen( pMsgToPut->pMsg ) <= 0 ) { D_ERROR( "TrySendMsg,请求连接的地址无效,放弃连接企图!\n" ); continue; } ACE_INET_Addr remoteAddr( pMsgToPut->pMsg ); if ( remoteAddr.is_any() ) { D_ERROR( "TrySendMsg,连接地址%s错误,放弃连接企图!\n", pMsgToPut->pMsg ); continue; } unsigned short sPort = remoteAddr.get_port_number(); const char* szAddr = remoteAddr.get_host_addr(); //unsigned short sPort = 8801; //const char* szAddr = "127.0.0.1"; AddConnect( szAddr, sPort, pMsgToPut->nSessionID );//将连接请求加到队列中去并激活连接线程; continue; } else if ( GATE_PLAYER_BROCASTSID == pMsgToPut->nSessionID ) { //向所有sessionID>=1000的连接群发消息; isInConnBrocast = true; //不用continue,稍后执行广播; } else { //error; D_ERROR( "TrySendMsg,消息对应socket空,消息长度%d, sessionID:%d!\n", pMsgToPut->nMsgLen, pMsgToPut->nSessionID ); continue; } }//1、检查是否为新建连接;if ( NULL == pMsgToPut->pUniqueSocket ) //2、检查是否为断开连接请求; if ( pMsgToPut->nMsgLen == 0 ) { //断开连接请求; if ( NULL != pMsgToPut->pUniqueSocket ) { pDsSocket = pMsgToPut->pUniqueSocket->GetUniqueObj(); if ( NULL != pDsSocket ) { TRY_BEGIN; D_INFO( "0926dbg,发起主动断连, sessionid:%d\n", pDsSocket->GetPendingSessionID() ); TRY_END; PostDisConnect( pDsSocket, NULL ); } else { //socket已经断开 } } else { D_ERROR( "断开连接请求中没有指定pUniqueSocket\n" ); } continue; } //3、真正发送数据; if ( !isInConnBrocast ) //是否向所有连入连接广播消息; { //单socket发送数据; //UniqueSocket的GetUniqueObj保证发送给目标对象,Socket内部的RsvSessionID保证目标对象没有被重用; pDsSocket = pMsgToPut->pUniqueSocket->GetUniqueObj(); if ( NULL == pDsSocket ) { //对应的socket已经无效,忽略此发送,以及此发送上的所有待发缓冲; continue; } pDsSocket->AddSendBuf( pMsgToPut->pMsg, pMsgToPut->nMsgLen, pMsgToPut->pUniqueSocket->GetExpectUID() );//暂存待发送数据以备一次性发出去,这样虽然在一定程度上打乱了顺序,但对于单个socket而言顺序仍然是保证的; tmpSendQueue.insert( pDsSocket );//记录此pDsSocket有数据待发送; } else { //广播发送给所有连入连接; DsMutex tmpMutex( m_hAllInSocketSection );//进入m_hAllInSocketSection互斥区; for ( set< T_Socket* >::iterator inconniter=m_arrAllInSockets.begin(); inconniter!=m_arrAllInSockets.end(); ) { pDsSocket = *inconniter; if ( NULL == pDsSocket ) { inconniter = m_arrAllInSockets.erase( inconniter ); continue; } if ( pDsSocket->IsInConn() ) { //只要该连接当前为连入连接,则向该连接发消息,而不管在此过程中是否旧连接已被重用; pDsSocket->AddSendBuf( pMsgToPut->pMsg, pMsgToPut->nMsgLen, pDsSocket->GetUID()/*只要是连入连接就可以,没有确定的期望socket*/ );//暂存待发送数据以备一次性发出去,这样虽然在一定程度上打乱了顺序,但对于单个socket而言顺序仍然是保证的; tmpSendQueue.insert( pDsSocket );//记录此pDsSocket有数据待发送; ++inconniter; } else { //连接已被回收或重用,已经不再对应一个连入连接; inconniter = m_arrAllInSockets.erase( inconniter ); } } } ////对应的socket仍然有效,执行发送, pMsgToPut传给PerIOData; //if ( ( 0x1028 != pMsgToPut->msgcmd ) // && ( pDsSocket->GetSessionID() > 1000 ) // ) //{ // D_DEBUG( "向socket%d发送消息%x\n", pDsSocket->GetSessionID(), pMsgToPut->msgcmd ); //} }//while ( pMsgToPut != NULL ) if ( NULL != g_poolMsgToPut ) { for ( list::iterator iter=m_pToSwitchQueueRead->begin(); iter!=m_pToSwitchQueueRead->end(); ++iter ) { pMsgToPut = *iter; if ( NULL != pMsgToPut ) { g_poolMsgToPut->Release( pMsgToPut ); } } } m_pToSwitchQueueRead->clear();//清,准备下次读; //集中发送; if ( !(tmpSendQueue.empty()) ) { list< DsIOBuf* >* pTmpSendList = NULL; T_Socket* pTmpSendSocket = NULL; DsIOBuf* pTmpSendBuf = NULL; for ( set::iterator iter=tmpSendQueue.begin(); iter!=tmpSendQueue.end(); ++iter ) { pTmpSendSocket = *iter; if ( NULL == pTmpSendSocket ) { continue; } if ( !(pTmpSendSocket->SendInnerBuf()) ) { //发送失败,断开连接; //OnDisconnected( pTmpSendSocket, NULL ); //让recv去检测断开; continue; } }//依次发送各端口; tmpSendQueue.clear(); }//if ( tmpSendQueue.size() > 0 ) return; TRY_END; return; } public://各线程回调函数; ///accept操作结束后检测是否要post新的acceptex; bool CheckAcceptPost( CDsSocket* pListenSocket ) { if ( NULL == pListenSocket ) { return false; } //如果outstand acceptex不够,则发出一批acceptex; pListenSocket->DecAcceptNum(); int curleft = pListenSocket->GetAcceptNum(); if ( curleft<32 ) { //一次性post多个acceptex; PostAcceptEx( pListenSocket, 64-curleft ); } return true; } ///连接已接受时回调,socket不回收,msgtoput读完后回收; void OnAccepted( T_Socket* pListenSocket, PerIOData* pPerIOData ) { if ( NULL == pPerIOData ) { //error; return; } if ( NULL == pPerIOData->pDsIOBuf ) { //error; return; } //连接成功,取远端地址; sockaddr_in* localAddr; sockaddr_in* remoteAddr; int localAddrlen = 0; int remoteAddrlen = 0; m_pfnGetAcceptExSockaddrs( pPerIOData->pDsIOBuf->GetInnerBuf(), 0/*dwReceiveDataLength*/, sizeof(sockaddr_in) + 16/*dwLocalAddressLength*/ , sizeof(sockaddr_in) + 16/*dwRemoteAddressLength*/, (LPSOCKADDR*)&localAddr/*LocalSockaddr*/, &localAddrlen/*LocalSockaddrLength*/ , (LPSOCKADDR*)&remoteAddr/*RemoteSockaddr*/, &remoteAddrlen/*RemoteSockaddrLength*/ ); //置新连接的地址信息; T_Socket* pClientSocket = pPerIOData->pAssociSocket; pClientSocket->SetSocketAddr( inet_ntoa(remoteAddr->sin_addr), ntohs(remoteAddr->sin_port) ); //读取远端地址后,回收pDsIOBuf; if ( NULL != g_poolDsIOBuf ) { g_poolDsIOBuf->Release( pPerIOData->pDsIOBuf ); pPerIOData->pDsIOBuf = NULL; } //以下:1、通知应用连接进入,2、关联完成端口,3、post recv,4、如果outstand acceptex不够,则继续post acceptex; //2、关联完成端口; AssociaIocp( pClientSocket ); //3、post recv; PostRcv( pClientSocket ); //加入进入连接列表; if ( true ) { pClientSocket->SetIsInConn();//设置该socket的连入连接标记; //进入m_arrAllInSockets互斥区; DsMutex tmpMutex( m_hAllInSocketSection ); m_arrAllInSockets.insert( pClientSocket );//加入所有监听到的连接列表,用于群发消息(目前只有gatesrv向所有玩家广播消息时用到); //防止这一列表被已回收socket填满,变得过大; if ( m_arrAllInSockets.size() > 4000 ) { T_Socket* pDsSocket = NULL; for ( set< T_Socket* >::iterator inconniter=m_arrAllInSockets.begin(); inconniter!=m_arrAllInSockets.end(); ) { pDsSocket = *inconniter; if ( ( NULL == pDsSocket ) || ( !(pDsSocket->IsInConn()) ) ) { inconniter = m_arrAllInSockets.erase( inconniter ); continue; } ++inconniter; } // for set } // if m_arrAll } //1、通知应用; MsgToPut* pMsgToPut = g_poolMsgToPut->RetrieveOrCreate(); if ( NULL == pMsgToPut ) { TRY_BEGIN; D_ERROR( "OnAccepted, g_poolMsgToPut内存分配失败\n" ); TRY_END; return; } pMsgToPut->pUniqueSocket = g_poolUniDssocket->RetrieveOrCreate(); if ( NULL == pMsgToPut->pUniqueSocket ) { TRY_BEGIN; D_ERROR( "SendInnerBuf:g_poolUniDssocket,内存分配失败\n" ); TRY_END; return; } pMsgToPut->pUniqueSocket->Init(pClientSocket); pMsgToPut->nHandleID = pClientSocket->GetSessionID(); pMsgToPut->nSessionID = pClientSocket->GetSessionID(); pMsgToPut->nMsgLen = -1;//通知应用新连接建立; PushAppMsgToPut( pMsgToPut ); pClientSocket->SetAppNotified();//设已通知应用标记,以便在连接断开时也能通知应用; return; } ///连接断开时回调,socket回收,如果有msgtoput则回收之; void OnDisconnected( T_Socket* pDsSocket, PerIOData* pPerIOData ) { if ( NULL != pPerIOData ) { if ( NULL != pPerIOData->pDsIOBuf ) { //回收MsgToPut; if ( NULL != g_poolDsIOBuf ) { g_poolDsIOBuf->Release( pPerIOData->pDsIOBuf ); pPerIOData->pDsIOBuf = NULL; } } } if ( ( NULL != pDsSocket ) && (!( pDsSocket->IsReused() ))//之前未被重用,则通知应用,否则,在第一次重用时就已经通知过了,无需再通知; ) { TRY_BEGIN; D_INFO( "0926dbg, 通知应用连接断开, sessionid:%d\n", pDsSocket->GetPendingSessionID() ); TRY_END; //如果之前没有通知过应用连接建立,则现在也不必通知应用连接断开; if ( pDsSocket->IsAppNotified() ) { //通知应用连接断开; MsgToPut* pMsgToPut = g_poolMsgToPut->RetrieveOrCreate(); if ( NULL == pMsgToPut ) { TRY_BEGIN; D_ERROR( "OnDisconnected:g_poolMsgToPut,内存分配失败\n" ); TRY_END; return; } pMsgToPut->pUniqueSocket = NULL; pMsgToPut->nHandleID = pDsSocket->GetPendingSessionID(); pMsgToPut->nSessionID = pDsSocket->GetPendingSessionID();//因为是断开,因此ID号多半发生了变化,必须要取断开前保存的ID号; pMsgToPut->nMsgLen = 0;//断开; PushAppMsgToPut( pMsgToPut ); pDsSocket->ResetAppNotified();//已通知应用断开,后续消息无需再通知应用; } //回收CDsSocket; if ( NULL != g_poolDsSocket ) { g_poolDsSocket->Release( pDsSocket ); pDsSocket = NULL; } } return; } ///数据接收到时回调, socket与msgtoput都不回收,传给应用; void OnRecved( T_Socket* pDsSocket, PerIOData* pPerIOData ) { if ( NULL == pPerIOData ) { return; } //立即在本socket上post一个新的recv; PostRcv( pDsSocket ); //通知应用; DsIOBuf* pDsIOBuf = pPerIOData->pDsIOBuf; if ( NULL == pDsIOBuf ) { TRY_BEGIN; D_ERROR( "OnRecved,pDsIOBuf空\n" ); TRY_END; return; } if ( NULL != pDsSocket ) { if ( !(pDsSocket->IsAppNotified()) ) { TRY_BEGIN; D_ERROR( "OnRecved,!!!,未通知应用的pDsSocket收到了消息\n" ); TRY_END; return; } //该socket仍然有效,则将相应的消息上达; const char* pRcved = pDsIOBuf->GetInnerBuf(); int nRcved = pDsIOBuf->GetInnerLen(); if ( nRcved > 0 ) { //将大包拆开一个个通知应用; MsgToPut* pMsgToPut = NULL; int prosize = (int)(sizeof( pMsgToPut->pMsg )); int msgtoputNeed = (nRcved / prosize) + 1;//需要的msgtoput数目; int lastlen = nRcved % prosize;//最后一个msgtoput存放的信息量; int shouldcpy = 0;//每次应该拷贝的信息量; int cppos = 0;//每次拷贝的起始位置; int cpyed = 0; for ( int i=0; iRetrieveOrCreate(); if ( NULL == pMsgToPut ) { TRY_BEGIN; D_ERROR( "OnRecved, g_poolMsgToPut,内存分配失败\n" ); TRY_END; break; } memcpy( pMsgToPut->pMsg, &(pRcved[cppos]), shouldcpy );//内容填充; pMsgToPut->nMsgLen = shouldcpy;//有效长度填充; pMsgToPut->pUniqueSocket = g_poolUniDssocket->RetrieveOrCreate(); if ( NULL == pMsgToPut->pUniqueSocket ) { TRY_BEGIN; D_ERROR( "OnRecved, g_poolUniDssocket,内存分配失败\n" ); TRY_END; break; } pMsgToPut->pUniqueSocket->Init(pDsSocket);//对应的socket信息; pMsgToPut->nHandleID = pDsSocket->GetPendingSessionID(); pMsgToPut->nSessionID = pDsSocket->GetPendingSessionID(); PushAppMsgToPut( pMsgToPut );//传给应用; }//拆成小包传给应用程序; }//接收的数据量大于0; }//相应socket仍然有效,if ( NULL != pDsSocket ) g_poolDsIOBuf->Release( pDsIOBuf );//信息都已拷至msgtoput,回收本pDsIOBuf; } ///数据已发送时回调,socket不回收,msgtoput回收; void OnSent( T_Socket* pDsSocket, PerIOData* pPerIOData ) { if ( NULL == pPerIOData ) { return; } ////1027.debug,if ( NULL != pDsSocket ) //{ // //pDsSocket->DecPendingSendNum(); //} //忽略; if ( NULL != pPerIOData->pDsIOBuf ) { //回收已发送数据; if ( NULL != g_poolDsIOBuf ) { g_poolDsIOBuf->Release( pPerIOData->pDsIOBuf ); pPerIOData->pDsIOBuf = NULL; } } } private://内部调用函数; ///post acceptex; BOOL PostAcceptEx( T_Socket* pListenSocket, int nPostNum ) { for ( int i=0; iRetrieve();//得到一个socket用于接受到的连接; if ( NULL == pClientSocket ) { D_ERROR( "没有足够的CDsSocket*可供PostAcceptEx使用\n" ); return TRUE; } pClientSocket->RsvPendingSessionID(); DsIOBuf* pDsIOBuf = g_poolDsIOBuf->RetrieveOrCreate(); if ( NULL == pDsIOBuf ) { TRY_BEGIN; D_ERROR( "PostAcceptEx, g_poolDsIOBuf,内存分配失败\n" ); TRY_END; return FALSE; } pDsIOBuf->SetHandleID( pClientSocket->GetSessionID() ); pDsIOBuf->SetSessionID( pClientSocket->GetSessionID() ); PerIOData* pPerIoData = g_poolPerIOData->RetrieveOrCreate(); if ( NULL == pPerIoData ) { TRY_BEGIN; D_ERROR( "PostAcceptEx, g_poolPerIOData,内存分配失败\n" ); TRY_END; return FALSE; } pPerIoData->opType = IOT_ACCEPT; memset( &(pPerIoData->overLapped), 0, sizeof( pPerIoData->overLapped ) ); pPerIoData->pAssociSocket = pClientSocket;//只有接受连接时,periodata中的associsocket才有意义; pPerIoData->pDsIOBuf = pDsIOBuf; DWORD dwBytes = 0; pListenSocket->IncAcceptNum(); //pListenSocket->RsvPendingSessionID();//异步操作前保存ID号,如果异步操作完成时ID号已发生变化,则说明此过程中socket已被重用; //pPerIoData->pAssociSocket->RsvPendingSessionID();//异步操作前保存ID号,如果异步操作完成时ID号已发生变化,则说明此过程中socket已被重用; AddPendingAccept( pPerIoData ); //这一pPerIoData将保存在listensocket身上直至连接被接受为止,如果一直没被接受,则在最后删除pListenSocket时会将其删去; BOOL isAcceptOK = m_pfnAcceptEx( pListenSocket->GetSocket()/*监听端口*/, pPerIoData->pAssociSocket->GetSocket()/*接受端口*/ , pPerIoData->pDsIOBuf->GetInnerBuf()/*地址接受缓存*/, 0, sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16 , &dwBytes, &(pPerIoData->overLapped) ); if ( !isAcceptOK ) { unsigned long nerr = GetLastError(); if ( ERROR_IO_PENDING != nerr ) { TRY_BEGIN; D_ERROR( "PostAcceptEx, nerr = %d\n", nerr ); TRY_END; pListenSocket->DecAcceptNum(); } } } return true; }; ///增加一个连接请求,并触发连接; BOOL AddConnect( const char* remoteAddr, unsigned short sRemotePort, unsigned long assignedSessionID ) { DsMutex tmpMutex( m_hConnQueueSection );//进入连接请求互斥区; ConnInfo* pConnInfo = NEW ConnInfo; SafeStrCpy( pConnInfo->remoteAddr, remoteAddr ); pConnInfo->remotePort = sRemotePort; pConnInfo->pDsSocket = g_poolDsSocket->Retrieve(); if ( NULL == pConnInfo->pDsSocket ) { D_ERROR( "连接地址%s:%d时,取不到可用DsSocket,放弃连接\n", remoteAddr, sRemotePort ); delete pConnInfo; pConnInfo = NULL; return FALSE; } pConnInfo->pDsSocket->ManualSetSessionID( assignedSessionID );//在socket上保存的sessionID号,表明连接类型; pConnInfo->pDsSocket->RsvPendingSessionID();//置使用标记; m_ConnQueue.push_back( pConnInfo ); SetEvent( m_hConnEvent );//激活连接线程; return TRUE; } private: ///关联完成端口 void AssociaIocp( T_Socket* pDsSocket ) { CreateIoCompletionPort( (HANDLE)(pDsSocket->GetSocket()), m_hIocp, (ULONG_PTR)(pDsSocket), 0 ); return; } ///向应用传递消息 void PushAppMsgToPut( MsgToPut* pMsgToPut ) { PostQueuedCompletionStatus( m_hAppIocp, 0, (ULONG_PTR)(pMsgToPut), 0 ); return; } ///在pSocket上post一个rcv; void PostRcv( T_Socket* pDsSocket ) { if ( NULL == pDsSocket ) { TRY_BEGIN; D_ERROR( "PostRcv时,传入的pDsSocket空\n" ); TRY_END; return; } PerIOData* pPerIoData = g_poolPerIOData->RetrieveOrCreate(); if ( NULL == pPerIoData ) { TRY_BEGIN; D_ERROR( "PostRcv时,g_poolPerIOData,内存分配失败\n" ); TRY_END; return; } pPerIoData->opType = IOT_READ; memset( &(pPerIoData->overLapped), 0, sizeof( pPerIoData->overLapped ) ); pPerIoData->pAssociSocket = pDsSocket;//实际不用,periodata中的关联端口只在accept返回时使用; //pDsSocket->SetPerIOData( pPerIoData );//现在设上,一旦完成端口返回则置空,以便在最后回收socket时同时回收最后一个未完成的PostRcv PerIOData; pPerIoData->pDsIOBuf = g_poolDsIOBuf->RetrieveOrCreate(); if ( NULL == pPerIoData->pDsIOBuf ) { TRY_BEGIN; g_poolPerIOData->Release( pPerIoData ); D_ERROR( "PostRcv时,NEW DsIOBuf,内存分配失败\n" ); TRY_END; return; } //pPerIoData->pDsIOBuf->pUniqueSocket = g_poolUniDssocket->RetrieveOrCreate(); //if ( NULL == pPerIoData->pDsIOBuf->pUniqueSocket ) //{ // g_poolDsIOBuf->Release( pPerIoData->pDsIOBuf ); // g_poolPerIOData->Release( pPerIoData ); // D_ERROR( "PostRcv时,NEW UniqueSocket失败\n" ); // return; //} //pPerIoData->pDsIOBuf->pUniqueSocket->Init(pDsSocket); pPerIoData->pDsIOBuf->SetHandleID( pDsSocket->GetSessionID() ); pPerIoData->pDsIOBuf->SetSessionID( pDsSocket->GetSessionID() ); pPerIoData->refSendBuf.len = DsIOBuf::IO_BUF_SIZE; pPerIoData->refSendBuf.buf = pPerIoData->pDsIOBuf->GetInnerBuf(); DWORD bytesRcved = 0; DWORD tmpFlags = 0; //pDsSocket->RsvPendingSessionID();//异步操作前保存ID号,如果异步操作完成时ID号已发生变化,则说明此过程中socket已被重用; int tmpret = WSARecv( pDsSocket->GetSocket(), &(pPerIoData->refSendBuf), 1, &bytesRcved, &tmpFlags, &(pPerIoData->overLapped), NULL ); int nerr = 0; if ( 0 != tmpret ) { nerr = GetLastError(); int nerr = WSAGetLastError(); if ( WSA_IO_PENDING != nerr ) { //D_ERROR( "WSARecv错误%d, socketID:%d\n", nerr, pDsSocket->GetPendingSessionID() ); //g_poolUniDssocket->Release( pPerIoData->pDsIOBuf->pUniqueSocket ); g_poolDsIOBuf->Release( pPerIoData->pDsIOBuf ); pPerIoData->pDsIOBuf = NULL; g_poolPerIOData->Release( pPerIoData ); pPerIoData = NULL; pPerIoData = g_poolPerIOData->RetrieveOrCreate();//分配一个断连用periodata; if ( NULL == pPerIoData ) { TRY_BEGIN; D_ERROR( "PostRcv不成功断连时,NEW PerIOData失败\n" ); TRY_END; return; } TRY_BEGIN; D_INFO( "0926dbg,PostRcv不成功主动断连, sessionid:%d, err:%d\n", pDsSocket->GetPendingSessionID(), nerr ); TRY_END; pPerIoData->opType = IOT_DISCONNECT; memset( &(pPerIoData->overLapped), 0, sizeof( pPerIoData->overLapped ) ); //pDsSocket->RsvPendingSessionID(); PostQueuedCompletionStatus( m_hIocp, 0, (ULONG_PTR)pDsSocket, &(pPerIoData->overLapped) ); //OnDisconnected( pDsSocket, NULL ); } } return; } private: list* m_pToSwitchQueueRead; bool m_bIsStopThread; HANDLE m_hIocp;//完成端口句柄; HANDLE m_hAppIocp;//用于通知上层应用的完成端口; HANDLE* m_arrThreads;//线程句柄; T_ToSend* m_pSendQueue;//发送消息队列; T_RcvTo* m_pRcvQueue;//接收消息队列; set< T_Socket* > m_arrListenSockets;//所有监听端口; //static CRITICAL_SECTION m_hAllInSocketSection;//m_arrAllInSockets互斥区; static DsCS m_hAllInSocketSection;//m_arrAllInSockets互斥区; static set< T_Socket* > m_arrAllInSockets;//所有监听到的连接,用于群发消息(目前只有gatesrv向所有玩家广播消息时用到),之所以用set,是为了防止遍历时对同一元素遍历多次; HANDLE m_hConnEvent;//连接线程触发事件; //CRITICAL_SECTION m_hConnQueueSection;//连接队列互斥区; DsCS m_hConnQueueSection;//连接队列互斥区; list< ConnInfo* > m_ConnQueue;//连接请求队列; set m_pendingIOData; private: LPFN_ACCEPTEX m_pfnAcceptEx; LPFN_GETACCEPTEXSOCKADDRS m_pfnGetAcceptExSockaddrs; LPFN_TRANSMITFILE m_pfnTransmitFile; }; //用于向应用传递消息的工作者线程,本线程只能启一个,且等待在专用的完成端口上,因为接收消息队列不能多个线程同时写; template< typename T_ToSend/*从此队列取待发送消息*/, typename T_RcvTo/*接收消息的存放队列*/, typename T_Socket/*socket类型*/ > DWORD WINAPI PushMsgToAppThread( LPVOID pOwner ) { TRY_BEGIN; CDsIocp* pDsIocp = ( CDsIocp* ) pOwner; HANDLE hIocp = pDsIocp->GetAppIocpHandle(); DWORD bytesTransferred = 0; OVERLAPPED* pOverlapped = NULL; MsgToPut* pMsgToPut = NULL; T_RcvTo* pRcvQueue = pDsIocp->GetRcvQueue(); BOOL iocpRet = 0; DWORD dwError = 0; while ( true ) { iocpRet = GetQueuedCompletionStatus( hIocp, &bytesTransferred, (LPDWORD)&pMsgToPut, (LPOVERLAPPED*)&pOverlapped, INFINITE ); if ( pDsIocp->IsStopThread() ) { //主动退出 break; } if ( NULL == pMsgToPut ) { continue; } if ( pRcvQueue->PushMsg( pMsgToPut ) < 0 )//收到的消息push到接收队列中去; { D_ERROR( "0926dbg, PushMsgToAppThread, PushMsg失败\n" ); } } return 0; TRY_END; return 0; } //发送消息队列监视线程; template< typename T_ToSend/*从此队列取待发送消息*/, typename T_RcvTo/*接收消息的存放队列*/, typename T_Socket/*socket类型*/ > DWORD WINAPI SendMonitor( LPVOID pOwner ) { TRY_BEGIN; CDsIocp* pDsIocp = ( CDsIocp* ) pOwner; T_ToSend* pSendQueue = pDsIocp->GetSendQueue(); while ( true ) { pSendQueue->WaitReadEvent();//等待队列中有消息可读,或者主线程主动置消息可读事件以便中止本线程; if ( pDsIocp->IsStopThread() ) { break; } //D_DEBUG( "收到发送触发信号...\n" ); pDsIocp->TrySendMsg(); } return 0; TRY_END; return 0; } //连接线程,监测连接需求,发起连接; template< typename T_ToSend/*从此队列取待发送消息*/, typename T_RcvTo/*接收消息的存放队列*/, typename T_Socket/*socket类型*/ > DWORD WINAPI ConnThread( LPVOID pOwner ) { TRY_BEGIN; CDsIocp* pDsIocp = ( CDsIocp* ) pOwner; BOOL isRetry = TRUE; while ( true ) { ::WaitForSingleObject( pDsIocp->GetConnSignal(), INFINITE ); if ( pDsIocp->IsStopThread() ) { break; } while ( isRetry ) { if ( pDsIocp->IsStopThread() ) { break; } isRetry = FALSE; pDsIocp->TryConn( isRetry ); Sleep( 2000 );//等5秒后再试; } //当前的连接请求都已完毕,返回等待下次连接请求触发; } return 0; TRY_END; return 0; } #define RELEASE_IODATAS_IOBUF(pTgtIOData) \ {\ if ( NULL != pTgtIOData )\ {\ if ( NULL != pTgtIOData->pDsIOBuf )\ {\ if ( NULL != g_poolDsIOBuf )\ {\ g_poolDsIOBuf->Release( pTgtIOData->pDsIOBuf );\ pTgtIOData->pDsIOBuf = NULL;\ }\ }\ }\ } //工作者线程,连接接受,数据接收与发送等; template< typename T_ToSend/*从此队列取待发送消息*/, typename T_RcvTo/*接收消息的存放队列*/, typename T_Socket/*socket类型*/ > DWORD WINAPI WorkerThread( LPVOID pOwner ) { TRY_BEGIN; CDsIocp* pDsIocp = ( CDsIocp* ) pOwner; HANDLE hIocp = pDsIocp->GetIocpHandle(); DWORD bytesTransferred = 0; OVERLAPPED* pOverlapped = NULL; T_Socket* pDsSocket = NULL; PerIOData* pPerIoData = NULL; BOOL iocpRet = 0; DWORD dwError = 0; while ( true ) { iocpRet = GetQueuedCompletionStatus( hIocp, &bytesTransferred, (LPDWORD)&pDsSocket, (LPOVERLAPPED*)&pOverlapped, INFINITE ); if ( pDsIocp->IsStopThread() ) { //主动退出 break; } if ( NULL == pDsSocket ) { TRY_BEGIN; D_ERROR( "0926dbg,IOCP工作者线程, pDsSocket == NULL \n" ); TRY_END; break; } pPerIoData = NULL; if ( NULL != pOverlapped ) { pPerIoData = CONTAINING_RECORD( pOverlapped, PerIOData, overLapped );//取pPerIoData; if ( ( NULL != pPerIoData ) && ( IOT_ACCEPT == pPerIoData->opType ) //如果是accept操作,则检查是否要抛出一批acceptex; ) { pDsIocp->CheckAcceptPost( pDsSocket ); } } if ( ( ERROR_SUCCESS == iocpRet ) || ( 0 == iocpRet ) ) { dwError = GetLastError(); if ( NULL != pPerIoData ) { if ( IOT_ACCEPT != pPerIoData->opType ) { //非accept操作,应直接关对应的socket; TRY_BEGIN; D_INFO( "0926dbg,非accept操作(%d)ERROR_SUCCESS, err:%d,sessionid:%d\n", pPerIoData->opType, dwError, pDsSocket->GetPendingSessionID() ); TRY_END; //pDsIocp->OnDisconnected( pDsSocket, pPerIoData ); //rcv处理,其余操作无需抛断连?试验, by dzj, 09.02.11 if ( IOT_DISCONNECT != pPerIoData->opType ) { D_INFO( "ERROR_SUCCESS,尝试发起断开连接,sessionid:%d", pDsSocket->GetPendingSessionID() ); pDsIocp->PostDisConnect( pDsSocket, pPerIoData ); }//由于IOT_DISCONNECT操作发起时没有分配DS_IOBUF,因此不需要else中去显式释放; //RELEASE_IODATAS_IOBUF( pPerIoData );//非accept操作,留给recv去检测; } else { //accept操作,应该关accept用socket,而不是关pListenSocket; TRY_BEGIN; if ( NULL != pPerIoData->pAssociSocket ) { D_INFO( "0926dbg,accept操作ERROR_SUCCESS, err:%d,断开回收处理,sessionid:%d\n", dwError, pPerIoData->pAssociSocket->GetPendingSessionID() ); } else { D_INFO( "0926dbg,accept操作ERROR_SUCCESS, err:%d,断开回收处理,操作对应socket空\n", dwError ); } TRY_END; //pDsIocp->OnDisconnected( pPerIoData->pAssociSocket, pPerIoData ); pDsIocp->PostDisConnect( pPerIoData->pAssociSocket, pPerIoData );//accept操作,因为没有recv,因此直接重用; } if ( NULL != g_poolPerIOData ) { g_poolPerIOData->Release( pPerIoData ); pPerIoData = NULL; } continue; } else { //主动退出,or some error occur; TRY_BEGIN; D_INFO( "0926dbg, NULL == pPerIoData,, err:%d, 退出工作者线程,sessionid:%d\n", dwError, pDsSocket->GetPendingSessionID() ); TRY_END; //pDsIocp->PostDisConnect( pDsSocket, NULL );//留给recv去检测,并且没有periodata中的dsiobuf需要释放; //pDsIocp->OnDisconnected( pDsSocket, NULL ); break; } //处理socket关闭; continue; } if ( NULL == pPerIoData ) { //退出事件; TRY_BEGIN; D_ERROR( "0926dbg, 空指针错误2,退出工作者线程,NULL == pPerIoData\n" ); TRY_END; break; } if ( pDsSocket->IsReused() ) { //socket已在操作pending期间被重用,原连接已断开; TRY_BEGIN; D_INFO( "0926dbg, socket已在操作pending期间被重用,sessionid:%d\n", pDsSocket->GetPendingSessionID() ); TRY_END; //pDsIocp->PostDisConnect( pDsSocket, pPerIoData ); RELEASE_IODATAS_IOBUF( pPerIoData );//已重用,只回收pPerIoData中的dsiobuf而不关socket; //pDsIocp->OnDisconnected( pDsSocket, pPerIoData ); if ( NULL != g_poolPerIOData ) { g_poolPerIOData->Release( pPerIoData ); pPerIoData = NULL; } continue; } if ( IOT_DISCONNECT == pPerIoData->opType ) { //之前的recv错,或者是主动断开连接; TRY_BEGIN; D_INFO( "0926dbg,post recv错或主动断连, sessionid:%d\n", pDsSocket->GetPendingSessionID() ); TRY_END; pDsIocp->OnDisconnected( pDsSocket, pPerIoData ); if ( NULL != g_poolPerIOData ) { g_poolPerIOData->Release( pPerIoData ); pPerIoData = NULL; } continue; } /* enum IO_OP_TYPE { IOT_READ = 0, IOT_WRITE, IOT_ACCEPT }; */ switch ( pPerIoData->opType ) { case IOT_READ: if ( 0 == bytesTransferred ) { //远端断开; TRY_BEGIN; D_INFO( "0926dbg,recv到字节数0, sessionid:%d\n", pDsSocket->GetPendingSessionID() ); TRY_END; //pDsIocp->OnDisconnected( pDsSocket, pPerIoData ); pDsIocp->PostDisConnect( pDsSocket, pPerIoData ); } else { //正常接收; pPerIoData->pDsIOBuf->SetDealLen( bytesTransferred ); pDsIocp->OnRecved( pDsSocket, pPerIoData ); } break; case IOT_WRITE: if ( 0 == bytesTransferred ) { //远端断开; TRY_BEGIN; D_INFO( "0926dbg,write字节数0, sessionid:%d\n", pDsSocket->GetPendingSessionID() ); TRY_END; //pDsIocp->OnDisconnected( pDsSocket, pPerIoData ); //pDsIocp->PostDisConnect( pDsSocket, pPerIoData );//留给recv操作去检测,这里只释放periodata中的dsiobuf; RELEASE_IODATAS_IOBUF( pPerIoData );//已重用,只回收pPerIoData中的dsiobuf而不关socket; } else { pDsIocp->OnSent( pDsSocket, pPerIoData ); } break; case IOT_ACCEPT: pDsIocp->DelPendingAccept( pPerIoData );//清除未决accept与listensocket的绑定; pDsIocp->OnAccepted( pDsSocket, pPerIoData ); break; default: { D_ERROR( "should not get here, WorkerThread_IOT_:%d\n", pPerIoData->opType ); } } if ( NULL != g_poolPerIOData ) { g_poolPerIOData->Release( pPerIoData );//pMsgToPut留待应用去回收,因为其可能要被用于传递给上层应用; pPerIoData = NULL; } }//工作者线程大循环,while ( true ) return 0; TRY_END; return 0; } #endif //USE_DSIOCP #endif //#ifndef DS_IOCP_H