#ifndef NET_TASK_EX_CPP #define NET_TASK_EX_CPP #include "NetTaskEx.h" #include "SessionManager.h" #include "Utility.h" #include "../Test/testthreadque_wait/sigexception/sigexception.h" template CTask::CTask(IBufQueue *pReadQueue, IBufQueue *pSendQueue, ACE_Reactor *pReactor, CHandlerManagerEx *pHandlerManager) :m_pReadQueue(pReadQueue), m_pSendQueue(pSendQueue), m_pReactor(pReactor), m_pHandlerManager(pHandlerManager), m_pAcceptor(NULL) { } template CTask::~CTask(void) { if(NULL != m_pAcceptor) { delete m_pAcceptor; m_pAcceptor = NULL; } } template int CTask::open(void *args) { if(args == NULL) return -1; // 新建接收器 m_pAcceptor = NEW CAcceptorEx< SVC_HANDLER, ACE_SOCK_Acceptor, ACE_LOCK >(m_pReadQueue, m_pSendQueue, 0, m_pHandlerManager); if(NULL == m_pAcceptor) return -1; int iRet = 0; // 初始化接收器 ACE_INET_Addr *pAddr = (ACE_INET_Addr *)args; if((iRet = m_pAcceptor->open(*pAddr, m_pReactor, ACE_NONBLOCK)) == -1) return -1; // 激活任务 return activate(THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, 1); } template int CTask::svc(void) { D_DEBUG("CTask::svc() begin.\n"); int retcode = 0; ThreadInitTls(); // 事件循环处理 m_pReactor->owner(ACE_OS::thr_self()); /*m_pReactor->run_reactor_event_loop();用如下循环替换*/ while (1) { ST_SIG_CATCH { int result = m_pReactor->handle_events (); /*if(result == -1 && m_pReactor->implementation()->deactivate ()) { retcode = 0; break; } else */ if (result == -1) { retcode = -1; break; } } END_SIG_CATCH; } ThreadEndTls(); D_DEBUG("CTask::svc() end.\n"); return retcode; } template int CTask::Quit(void) { m_pReactor->end_reactor_event_loop(); return 0; } template CTaskHelper::CTaskHelper(IBufQueue *pReadQueue, IBufQueue *pSendQueue, ACE_Reactor *pReactor, CHandlerManagerEx *pHandlerManager) :m_pReadQueue(pReadQueue), m_pSendQueue(pSendQueue), m_pReactor(pReactor), m_pHandlerManager(pHandlerManager), m_pConnector(NULL), m_pToSwitchQueueRead(NULL), m_bQuit(false) { } template CTaskHelper::~CTaskHelper(void) { if(NULL != m_pToSwitchQueueRead) { if ( NULL != m_pToSwitchQueueRead ) { for ( list::iterator iter=m_pToSwitchQueueRead->begin(); iter!=m_pToSwitchQueueRead->end(); ++iter ) { delete *iter; *iter = NULL; } m_pToSwitchQueueRead->clear(); delete m_pToSwitchQueueRead; m_pToSwitchQueueRead = NULL; } } if(NULL != m_pConnector) { delete m_pConnector; m_pConnector = NULL; } } template int CTaskHelper::open(void *args ) { ACE_UNUSED_ARG(args); int iRet = 0; m_pToSwitchQueueRead = NEW std::list; if(NULL == m_pToSwitchQueueRead) return -1; // 新建连接器 m_pConnector = NEW CConnectorEx(m_pReadQueue, m_pSendQueue, 0, m_pHandlerManager); if(NULL == m_pConnector) return -1; // 初始化连接器 if((iRet = m_pConnector->open(m_pReactor, ACE_NONBLOCK)) == -1) return -1; return this->activate(); } template int CTaskHelper::svc(void) { int retcode = 0; ThreadInitTls(); while(1) { ST_SIG_CATCH { if(m_bQuit) break; // 等待 /*bool bflag = false; m_pSendQueue->WaitReadEventByTimeOut(bflag); if(!bflag) continue;*/ m_pSendQueue->WaitReadEvent(); // 交换队列 m_pToSwitchQueueRead = m_pSendQueue->PreSerialRead(m_pToSwitchQueueRead); MsgToPut* pTempMsg = NULL; // 调试 //D_DEBUG("m_pToSwitchQueueRead->size = %d\n", m_pToSwitchQueueRead->size()); if(m_pToSwitchQueueRead->size() > 0) { for(list::iterator iter=m_pToSwitchQueueRead->begin(); iter!=m_pToSwitchQueueRead->end(); ++iter ) { pTempMsg = *iter; if ( NULL == pTempMsg ) { D_ERROR( "CTaskHelper::svc,NULL == pTempMsg\n" ); continue; } if ( (pTempMsg->nMsgLen < 0) || ( (unsigned int)(pTempMsg->nMsgLen) > ARRAY_SIZE(pTempMsg->pMsg) ) ) { if ( -1 != pTempMsg->nMsgLen ) { D_ERROR( "CTaskHelper::svc,错误的msglen:%d, msgHandleID:%d, sessionID:%d\n" , pTempMsg->nMsgLen, pTempMsg->nHandleID, pTempMsg->nSessionID ); continue; } } ST_SIG_CATCH { if(pTempMsg->nMsgLen >= 0) { if(GATE_PLAYER_BROCASTSID != pTempMsg->nSessionID)//普通信息 { SVC_HANDLER *pHandler = (SVC_HANDLER *)pTempMsg->nHandleID; if(NULL != pHandler) { if(pHandler->AppendMsg(pTempMsg) == -1) g_poolMsgToPut->Release( pTempMsg ); } } else//广播信息 { SVC_HANDLER *pHandler = NULL; MsgToPut *pMsg = NULL; std::set handlers; m_pHandlerManager->Handlers(handlers); for(typename std::set::iterator iter = handlers.begin(); iter != handlers.end(); iter++) { pHandler = *iter; if((pHandler != NULL) && (!pHandler->ActiveConnect())) { pMsg = g_poolMsgToPut->RetrieveOrCreate(); if(pMsg != NULL) { pMsg->nHandleID = (int)pHandler; pMsg->nSessionID = pHandler->SessionID(); pMsg->nMsgLen = pTempMsg->nMsgLen; memcpy(pMsg->pMsg, pTempMsg->pMsg, sizeof(pMsg->pMsg)); if(pHandler->AppendMsg(pMsg) == -1) g_poolMsgToPut->Release( pMsg ); } } } //回收 g_poolMsgToPut->Release(pTempMsg); } } else { if((pTempMsg->nMsgLen < 0) && (pTempMsg->nSessionID <= SRV_SID_MAX)) { SVC_HANDLER *pConnectHandler = m_pHandlerManager->GetHandler(pTempMsg->nSessionID); if(NULL != pConnectHandler) { //设置主动连接 pConnectHandler->ActiveConnect(true); // 设置socket的缓存 if(pConnectHandler->peer().open(SOCK_STREAM, AF_INET, 0, 0) != -1) { int iNewRcvbuf = 512 * 1024; pConnectHandler->peer().set_option(SOL_SOCKET, SO_RCVBUF, &iNewRcvbuf, sizeof(iNewRcvbuf)); pConnectHandler->peer().set_option(SOL_SOCKET, SO_SNDBUF, &iNewRcvbuf, sizeof(iNewRcvbuf)); /*// 设置pConnectHandler GetHandler中已经设置,pConnectHandler->SessionID(pTempMsg->nSessionID);*/ // 地址信息必须为"tango.cs.wustl.edu:1234" or "128.252.166.57:1234"两者之一 ACE_INET_Addr remoteAddr(pTempMsg->pMsg); // 连接 m_pConnector->connect(pConnectHandler, remoteAddr); g_poolMsgToPut->Release( pTempMsg );//释放? } } } } } } DS_CATCH_SIG { std::set handlers; m_pHandlerManager->Handlers(handlers); NewLog( LOG_LEV_DEBUG, "DS_CATCH_SIG, CTaskHelper::svc, nHandleID=%d,nSessionID=%d,nMsgLen=%d, hadlersnum:%d" , pTempMsg->nHandleID, pTempMsg->nSessionID, pTempMsg->nMsgLen, handlers.size() ); if(handlers.size()>0) { for(typename std::set::iterator iter = handlers.begin(); iter != handlers.end(); iter++) { SVC_HANDLER *pHandler = *iter; if (pHandler != NULL) { NewLog( LOG_LEV_DEBUG, "handerid:%d, isActiveConnect:%d", (unsigned int)pHandler, pHandler->ActiveConnect()?1:0 ); } } } continue; } DS_CATCH_NORMAL { std::set handlers; m_pHandlerManager->Handlers(handlers); NewLog( LOG_LEV_DEBUG, "DS_CATCH_NORMAL, CTaskHelper::svc, nHandleID=%d,nSessionID=%d,nMsgLen=%d", pTempMsg->nHandleID, pTempMsg->nSessionID, pTempMsg->nMsgLen ); if(handlers.size()>0) { for(typename std::set::iterator iter = handlers.begin(); iter != handlers.end(); iter++) { SVC_HANDLER *pHandler = *iter; if (pHandler != NULL) { NewLog( LOG_LEV_DEBUG, "handerid:%d, isActiveConnect:%d", (unsigned int)pHandler, pHandler->ActiveConnect()?1:0 ); } } } continue; } DS_END_TRY; } m_pToSwitchQueueRead->clear(); } } END_SIG_CATCH; } ThreadEndTls(); //return 0; return retcode; } template void CTaskHelper::Quit(void) { m_bQuit = true; } template CHandlerManagerEx CNetTaskEx::m_handlerManager; template < class SVC_HANDLER, class ACE_LOCK > CNetTaskEx::CNetTaskEx(IBufQueue *pReadQueue, IBufQueue *pSendQueue) :m_pReadQueue(pReadQueue), m_pSendQueue(pSendQueue), m_pReactor(NULL), m_pTask(NULL), m_pTaskHelper(NULL) { } template CNetTaskEx::~CNetTaskEx(void) { if(NULL != m_pTaskHelper) { delete m_pTaskHelper; m_pTaskHelper = NULL; } if(NULL != m_pTask) { delete m_pTask; m_pTask = NULL; } if(NULL != m_pReactor) { delete m_pReactor; m_pReactor = NULL; } } template int CNetTaskEx::Open(void *args) { if(NULL == args) return -1; if((NULL == m_pReadQueue) || (NULL == m_pSendQueue)) return -1; // 新建反应器 ACE_Reactor_Impl *pImpl = NULL; #if defined(ACE_WIN32) #if defined(USE_SELECT_REACTOR) ACE_NEW_RETURN(pImpl, ACE_Select_Reactor, -1); #else ACE_NEW_RETURN(pImpl, ACE_WFMO_Reactor, -1); #endif/*USE_SELECT_REACTOR*/ #else long maxOpenFiles = ACE_OS::sysconf(_SC_OPEN_MAX); if(maxOpenFiles < 5000) { D_ERROR("进程打开的最大文件数小于5000\n"); return -1; } #if defined(USE_SELECT_REACTOR) //ACE_NEW_RETURN(pImpl, ACE_Select_Reactor, -1); ACE_NEW_RETURN(pImpl, ACE_Dev_Poll_Reactor(/*ACE::max_handles()*/5000, 1), -1); #else ACE_NEW_RETURN(pImpl, ACE_Dev_Poll_Reactor(/*ACE::max_handles()*/5000, 1), -1); #endif/*USE_SELECT_REACTOR*/ #endif/*ACE_WIN32*/ ACE_NEW_RETURN(m_pReactor, ACE_Reactor(pImpl, 1), -1); if(m_handlerManager.Open(5000) == -1) return -1; m_pTask = NEW CTask(m_pReadQueue, m_pSendQueue, m_pReactor, &m_handlerManager); if(NULL == m_pTask) return -1; m_pTaskHelper = NEW CTaskHelper(m_pReadQueue, m_pSendQueue, m_pReactor, &m_handlerManager); if(NULL == m_pTaskHelper) return -1; if(-1 == m_pTask->open(args)) return -1; if(-1 == m_pTaskHelper->open(NULL)) return -1; return 0; } template void CNetTaskEx::Quit(void) { m_pTaskHelper->Quit(); m_pTask->Quit(); } template int CNetTaskEx::Wait(void) { m_pTaskHelper->wait(); m_pTask->wait(); return 0; } #endif/*NET_TASK_EX_CPP*/