/** * @file CManCommHandler.h * @brief 管理到本机的所有连接信息 * Copyright(c) 2007,上海第九城市游戏研发部 * All rights reserved * 文件名称: CManCommHandler.h * 摘 要: 管理到本机的所有连接信息; * 作 者: dzj * 完成日期: 2007.11.29 * */ #pragma once #include "../tcache/tcache.h" #include "../BufQueue/BufQueue.h" //双缓冲队列,从缓冲队列中收包; #include "../dscontainer/dshashtb.h" #include #include using namespace std; //template < typename T_PkgHandler > //class TestLinuxTemplate //不能给map的模板参数传模板? //{ //public: // void TestMap() // { // //class map; // //typename map::iterator; // //typedef typename map MYMAP; // //typedef typename map::iterator MYITER; // map m_mapRcvPkgHandlers1;//所有的连接句柄处理器; // //T_PkgHandler tmpHandler1 = m_mapRcvPkgHandlers1.end()->second; // for ( typename map::iterator tmpIter = m_mapRcvPkgHandlers1.begin(); tmpIter!=m_mapRcvPkgHandlers1.end(); ++tmpIter ) // { // T_PkgHandler tmpHandler = tmpIter->second; // } // vector m_mapRcvPkgHandlers;//所有的连接句柄处理器; // T_PkgHandler tmpHandler = m_mapRcvPkgHandlers.back(); // m_mapRcvPkgHandlers.pop_back(); // return; // } //}; ////test linux template,不能给map的模板参数传模板??; //TestLinuxTemplate< IPkgProcer/*int*//*CRcvPkgHandler**/ > testLinuxMap; //TCache* testLinux; /** * 管理所有的连接信息; * 新来连接时将连接添加到本管理器, * 收包时在本管理器中找到相应收包处理器进行处理 */ template < typename T_PkgHandler > class CManCommHandler { private: CManCommHandler( const CManCommHandler& ); //屏蔽这两个操作; CManCommHandler& operator = ( const CManCommHandler& );//屏蔽这两个操作; public: CManCommHandler() : m_pReadNetMsgQueue(NULL), m_pHandlePool(NULL) { #ifdef USE_SELF_HASH m_pHashPkgHandlers = NEW DsHashTable< HashTbPtrEle, 3000, 2 >; #else //USE_SELF_HASH m_mapPkgHandlers.clear(); #endif //USE_SELF_HASH m_pToSwitchQueueRead = NEW list; }; ~CManCommHandler() { TRY_BEGIN; if ( NULL != m_pToSwitchQueueRead ) { for ( list::iterator iter=m_pToSwitchQueueRead->begin(); iter!=m_pToSwitchQueueRead->end(); ++iter ) { delete *iter; *iter = NULL; } m_pToSwitchQueueRead->clear(); delete m_pToSwitchQueueRead; m_pToSwitchQueueRead = NULL; } //清内部句柄指针; #ifdef USE_SELF_HASH if ( ( NULL != m_pHashPkgHandlers ) && ( NULL != m_pHandlePool ) ) { HashTbPtrEle* pFound = NULL; m_pHashPkgHandlers->InitExplore();//准备遍历; while ( NULL != ( pFound = m_pHashPkgHandlers->ExploreNext() ) ) { m_pHandlePool->Release( pFound->GetInnerPtr() );//依次回收; } delete m_pHashPkgHandlers; m_pHashPkgHandlers = NULL; } #else //USE_SELF_HASH for ( typename map< int, T_PkgHandler* >::iterator tmpIter = m_mapPkgHandlers.begin(); tmpIter != m_mapPkgHandlers.end(); ++tmpIter ) { if ( NULL != m_pHandlePool ) { m_pHandlePool->Release( tmpIter->second ); tmpIter->second = NULL; } } m_mapPkgHandlers.clear(); #endif //USE_SELF_HASH delete m_pHandlePool; m_pHandlePool = NULL; TRY_END; }; ///置网络消息队列,从这些队列中收消息或将消息发送出去; void SetNetQueue( IBufQueue* pReadNetMsgQueue ) { m_pReadNetMsgQueue = pReadNetMsgQueue; } ///置句柄池对象 void SetHandlePoolSize( int poolSize ) { if ( ( poolSize <=0 ) || ( poolSize > 60000 ) ) { D_ERROR( "SetHandlePoolSize, poolSize:%d非法\n", poolSize ); return;//非法通信句柄数量; } m_pHandlePool = NEW TCache< T_PkgHandler >(poolSize); } public: ///通信时钟处理,定时调用以处理网络收包; void CommTimerProc() { TRY_BEGIN; if ( NULL == m_pReadNetMsgQueue ) { //收包队列未初始化; return; } RecvAllMsg(); return; TRY_END; return; } //int WriteNetMsg( MsgToPut* pPkg ) //{ // //MsgToPut*由组包类完成; // m_pSendNetMsgQueue->PushMsg( pPkg ); // return 0; //} private: ///从网络收包; void RecvAllMsg() { TRY_BEGIN; //从缓冲队列中收包; MsgToPut* pMsgToPut = NULL; bool isReadReady = false; m_pReadNetMsgQueue->WaitReadEventByTimeOut( isReadReady );//???,邓子建检查,08.08.19; if ( !isReadReady ) { //没有待读消息; return; } m_pToSwitchQueueRead = m_pReadNetMsgQueue->PreSerialRead( m_pToSwitchQueueRead ); //m_pReadNetMsgQueue->PreSerialRead();//准备读; //m_pToSwitchQueueRead = m_pReadNetMsgQueue->SwitchOutMsgQueue( m_pToSwitchQueueRead ); if ( m_pToSwitchQueueRead->empty() ) { return; } for ( list::iterator iter=m_pToSwitchQueueRead->begin(); iter!=m_pToSwitchQueueRead->end(); ++iter ) { pMsgToPut = *iter; if ( NULL == pMsgToPut ) { continue; } //处理到达的网络消息; ProcOneMsg( pMsgToPut ); } if ( NULL != g_poolMsgToPut ) { //处理器在AddPkgContent时会进行拷贝,因此在这里应该删除; for ( list::iterator iter=m_pToSwitchQueueRead->begin(); iter!=m_pToSwitchQueueRead->end(); ++iter ) { pMsgToPut = *iter; if ( NULL != pMsgToPut ) { g_poolMsgToPut->Release( pMsgToPut ); } } } m_pToSwitchQueueRead->clear();//清,准备下次读; return; TRY_END; return; } //处理一个网络收包,将收包转PkgHandler处理; void ProcOneMsg( MsgToPut* pMsg ) { TRY_BEGIN; if ( NULL == pMsg ) { return; } if ( 0 == pMsg->nMsgLen ) { //长度为0的消息表明连接断开; DeletePkgHandle( pMsg->nHandleID, pMsg->nSessionID ); return; } #ifdef USE_DSIOCP T_PkgHandler* tmpHandler = FindPkgHandler( pMsg->nHandleID, pMsg->nSessionID, pMsg->pUniqueSocket ); #else //USE_DSIOCP T_PkgHandler* tmpHandler = FindPkgHandler( pMsg->nHandleID, pMsg->nSessionID ); #endif //USE_DSIOCP if ( NULL != tmpHandler ) { if ( 0 == pMsg->nMsgLen ) { //长度为0的消息表明连接断开; DeletePkgHandle( pMsg->nHandleID, pMsg->nSessionID ); return; } if ( pMsg->nMsgLen < 0 ) { //长度小于0消息表明连接建立成功,之前FindPkgHandler时已经作了相应的建立handler以及连接初始化工作,后续的消息内容无效,忽略; return; } tmpHandler->AddPkgContent( pMsg->pMsg, pMsg->nMsgLen ); } else { //错误; D_WARNING( "网络收包异常, ProcOneMsg()函数FindPkgHandler失败\n" ); } ////int jjj, test //if ( pMsg->nSessionID == 700 ) //{ // static char lastMsg[512];/*消息设为固定大小*/ // static int lastMsgLen = pMsg->nMsgLen; // memcpy( lastMsg, pMsg->pMsg, pMsg->nMsgLen ); //} return; TRY_END; return; } bool CheckHandle( T_PkgHandler* pHandler, int nHandleID, int nSessionID ) { if ( NULL == pHandler ) { D_WARNING( "CheckHandle, NULL == pHandler\n" ); return false; } int orgHandleID = 0; int orgSessionID = 0; pHandler->GetHandleInfo( orgHandleID, orgSessionID ); if ( ( nSessionID == orgSessionID ) && ( nHandleID == orgHandleID ) ) { return true; } return false; } #ifdef USE_SELF_HASH ///根据消息句柄ID标识和session找到该消息的合适处理器; #ifdef USE_DSIOCP T_PkgHandler* FindPkgHandler( int nHandleID, int nSessionID, UniqueObj< CDsSocket >* pUniqueSocket ) #else //USE_DSIOCP T_PkgHandler* FindPkgHandler( int nHandleID, int nSessionID ) #endif //USE_DSIOCP { TRY_BEGIN; HashTbPtrEle* pFound = NULL; bool isFound = m_pHashPkgHandlers->FindEle( (unsigned int) nHandleID, pFound ); if ( isFound ) { if ( NULL == pFound ) { D_ERROR( "FindPkgHandler,FindEle成功,但pFound==空\n" ); return NULL; } T_PkgHandler* pFoundHandle = pFound->GetInnerPtr(); //原来就有该通信句柄,检查sessionID是否一致; if ( NULL == pFoundHandle ) { //错误,无通信处理器; //应该新建处理器; D_WARNING( "原来有通信句柄,但无相应处理器,nHandleID%d, sessionid:%d\n", nHandleID, nSessionID ); #ifdef USE_DSIOCP return CreatePkgHandle( nHandleID, nSessionID, pUniqueSocket ); #else //USE_DSIOCP return CreatePkgHandle( nHandleID, nSessionID ); #endif //USE_DSIOCP } if ( CheckHandle( pFoundHandle, nHandleID, nSessionID ) ) { //句柄ID及sessionID都正确,返回相应处理器指针; return pFoundHandle; } else { //句柄已被重用,应该释放旧处理器,并新建处理器; if ( NULL != m_pHandlePool ) { m_pHandlePool->Release( pFoundHandle ); } m_pHashPkgHandlers->DelEle( (unsigned int) nHandleID ); D_WARNING( "原来有通信句柄及处理器,但旧sessionid与新的sessionid:%d不一致\n", nSessionID ); } } //原来没有该通信句柄,新建相应处理器; //D_WARNING( "原来无该通信句柄:%d,新建之\n", nSessionID ); #ifdef USE_DSIOCP return CreatePkgHandle( nHandleID, nSessionID, pUniqueSocket ); #else //USE_DSIOCP return CreatePkgHandle( nHandleID, nSessionID ); #endif //USE_DSIOCP TRY_END; return NULL; } ///删除特定的处理器,原因:相应连接断开; void DeletePkgHandle( int nHandleID, int nSessionID ) { TRY_BEGIN; if ( NULL == m_pHashPkgHandlers ) { return; } HashTbPtrEle* pFound = NULL; bool isFound = m_pHashPkgHandlers->FindEle( (unsigned int) nHandleID, pFound ); if ( ( !isFound ) || (NULL == pFound) ) { return;//没有对应nHandleID的处理器; } T_PkgHandler* pFoundHandle = pFound->GetInnerPtr(); if ( NULL == pFoundHandle ) { D_WARNING( "DeletePkgHandle, NULL == pFoundHandle\n" ); return;//对应处理器空 } if ( ! CheckHandle( pFoundHandle, nHandleID, nSessionID ) ) { //句柄已被重用,忽略删除操作; return; } //句柄ID及sessionID都正确,的确是要删去该处理器,回收,在map中释放相应项; if ( NULL != m_pHandlePool ) { m_pHandlePool->Release( pFoundHandle ); } m_pHashPkgHandlers->DelEle( (unsigned int) nHandleID ); TRY_END; } ///新建一个处理器,并将该处理器添加到map; #ifdef USE_DSIOCP T_PkgHandler* CreatePkgHandle( int nHandleID, int nSessionID, UniqueObj< CDsSocket >* pUniqueSocket ) #else //USE_DSIOCP T_PkgHandler* CreatePkgHandle( int nHandleID, int nSessionID ) #endif //USE_DSIOCP { TRY_BEGIN; //新建处理器,设置合适的句柄以及session信息; T_PkgHandler* tmpHandler = m_pHandlePool->Retrieve();//保证handler总是可访问,并且可以进行有效性校验; if ( NULL == tmpHandler ) { D_ERROR( "CreatePkgHandle, m_pHandlePool->Retrieve() == NULL! \n" ); return NULL; } #ifdef USE_DSIOCP tmpHandler->SetHandleInfo( nHandleID, nSessionID, pUniqueSocket ); #else //USE_DSIOCP tmpHandler->SetHandleInfo( nHandleID, nSessionID ); #endif //USE_DSIOCP m_pHashPkgHandlers->PushEle( HashTbPtrEle( (unsigned int)nHandleID, tmpHandler ) ); return tmpHandler; TRY_END; return NULL; } #else //USE_SELF_HASH ///根据消息句柄ID标识和session找到该消息的合适处理器; #ifdef USE_DSIOCP T_PkgHandler* FindPkgHandler( int nHandleID, int nSessionID, UniqueObj< CDsSocket >* pUniqueSocket ) #else //USE_DSIOCP T_PkgHandler* FindPkgHandler( int nHandleID, int nSessionID ) #endif //USE_DSIOCP { TRY_BEGIN; typename map< int, T_PkgHandler* >::iterator tmpIter = m_mapPkgHandlers.find( nHandleID ); if ( tmpIter != m_mapPkgHandlers.end() ) { //原来就有该通信句柄,检查sessionID是否一致; if (NULL == tmpIter->second) { //错误,无通信处理器; m_mapPkgHandlers.erase( tmpIter ); //应该新建处理器; D_WARNING( "原来有通信句柄,但无相应处理器,新建处理器,sessionid:%d\n", nSessionID ); //return CreatePkgHandle( nHandleID, nSessionID ); #ifdef USE_DSIOCP return CreatePkgHandle( nHandleID, nSessionID, pUniqueSocket ); #else //USE_DSIOCP return CreatePkgHandle( nHandleID, nSessionID ); #endif //USE_DSIOCP } if ( CheckHandle( tmpIter->second, nHandleID, nSessionID ) ) { //句柄ID及sessionID都正确,返回相应处理器指针; return tmpIter->second; } else { //句柄已被重用,应该释放旧处理器,并新建处理器; if ( NULL != m_pHandlePool ) { m_pHandlePool->Release( tmpIter->second ); tmpIter->second = NULL; } m_mapPkgHandlers.erase( tmpIter ); D_WARNING( "原来有通信句柄及处理器,但旧sessionid与新的sessionid:%d不一致\n", nSessionID ); //return CreatePkgHandle( nHandleID, nSessionID ); #ifdef USE_DSIOCP return CreatePkgHandle( nHandleID, nSessionID, pUniqueSocket ); #else //USE_DSIOCP return CreatePkgHandle( nHandleID, nSessionID ); #endif //USE_DSIOCP } } //原来没有该通信句柄,新建相应处理器; //D_WARNING( "原来无该通信句柄:%d,新建之\n", nSessionID ); #ifdef USE_DSIOCP return CreatePkgHandle( nHandleID, nSessionID, pUniqueSocket ); #else //USE_DSIOCP return CreatePkgHandle( nHandleID, nSessionID ); #endif //USE_DSIOCP TRY_END; return NULL; } ///删除特定的处理器,原因:相应连接断开; void DeletePkgHandle( int nHandleID, int nSessionID ) { TRY_BEGIN; typename map< int, T_PkgHandler* >::iterator tmpIter = m_mapPkgHandlers.find( nHandleID ); if ( tmpIter != m_mapPkgHandlers.end() ) { //原来就有该通信句柄,检查sessionID是否一致; if (NULL == tmpIter->second) { //错误,无通信处理器; m_mapPkgHandlers.erase( tmpIter ); return; } if ( CheckHandle( tmpIter->second, nHandleID, nSessionID ) ) { //句柄ID及sessionID都正确,的确是要删去该处理器,回收,在map中释放相应项; if ( NULL != m_pHandlePool ) { m_pHandlePool->Release( tmpIter->second ); tmpIter->second = NULL; } m_mapPkgHandlers.erase( tmpIter ); return; } else { //句柄已被重用,忽略删除操作; } } else { //找不到处理器,忽略删除操作,可能底层之前还没有通知过应用该连接的相关信息(虽然不合理,但属于正常,以后改进); } TRY_END; } ///新建一个处理器,并将该处理器添加到map; #ifdef USE_DSIOCP T_PkgHandler* CreatePkgHandle( int nHandleID, int nSessionID, UniqueObj< CDsSocket >* pUniqueSocket ) #else //USE_DSIOCP T_PkgHandler* CreatePkgHandle( int nHandleID, int nSessionID ) #endif //USE_DSIOCP { TRY_BEGIN; //新建处理器,设置合适的句柄以及session信息; T_PkgHandler* tmpHandler = m_pHandlePool->Retrieve();//保证handler总是可访问,并且可以进行有效性校验; if ( NULL == tmpHandler ) { D_ERROR( "CManCommHandler.h::CreatePkgHandle, m_pHandlePool->Retrieve() == NULL! \n" ); return NULL; } #ifdef USE_DSIOCP tmpHandler->SetHandleInfo( nHandleID, nSessionID, pUniqueSocket ); #else //USE_DSIOCP tmpHandler->SetHandleInfo( nHandleID, nSessionID ); #endif //USE_DSIOCP m_mapPkgHandlers.insert( pair(nHandleID, tmpHandler) ); return tmpHandler; TRY_END; return NULL; } #endif //USE_SELF_HASH private: list* m_pToSwitchQueueRead; #ifdef USE_SELF_HASH DsHashTable< HashTbPtrEle, 3000, 2 >* m_pHashPkgHandlers; #else //USE_SELF_HASH ///句柄处理器map; map m_mapPkgHandlers;//所有的连接句柄处理器; #endif //USE_SELF_HASH ///读网络消息来源; IBufQueue* m_pReadNetMsgQueue; ///句柄池对象; ///!!!注意:T_PkgHandler必须从全局池中分配,不可自行new,不可定义临时变量,不可定义静态变量,不可memset; ///因为所有的对象最后都要通过对象池回收,不遵守上述约定会造成内存错误; TCache< T_PkgHandler >* m_pHandlePool; };