#include "HandlerEx.h" //#include "HandlerManager.h" #include "PkgProc/MsgToPut.h" #include "tcache/tcache.h" //#include "CacheMessageBlock.h" #include "Utility.h" #include "SessionManager.h" CHandlerEx::CHandlerEx(void) :m_pReadQueue(NULL), m_pSendQueue(NULL), m_uiSessionID(0), m_iovCount(0), m_iovOffset(0), m_bDeferredClose(false), m_bIsUsed(false), m_bActiveConnect(false), m_bExistDataToSend(false) { } CHandlerEx::CHandlerEx(unsigned int uiPeerBufferSize) :m_pReadQueue(NULL), m_pSendQueue(NULL), m_uiSessionID(0), m_iovCount(0), m_iovOffset(0), m_bDeferredClose(false), m_bIsUsed(false), m_bActiveConnect(false), m_bExistDataToSend(false) { ACE_UNUSED_ARG(uiPeerBufferSize); } CHandlerEx::~CHandlerEx(void) { } int CHandlerEx::open(void *arg) { ACE_UNUSED_ARG(arg); // 具体初始化 return OnLinkUp(); } int CHandlerEx::close(u_long flags) { ACE_UNUSED_ARG(flags); // 重置 Reset(1); return 0; } int CHandlerEx::handle_input(ACE_HANDLE fd) { ACE_UNUSED_ARG(fd); // 临时空间(MsgToPut的整倍数) char szbuf[20* MsgToPut::MSG_MAX_SIZE] = {0}; int iret = 0; do { iret = m_peer.recv(szbuf, sizeof(szbuf)); if(iret == 0) { // 对方关闭连接,则通知主逻辑线程 OnError(0, 0); return -1; } else if(iret == -1) { if(ACE_OS::last_error() == EWOULDBLOCK) { // 阻塞 return 0; } else { // 出错; OnError(1, ACE_OS::last_error()); return -1; } } else { // 读到数据 int iLeft = iret; const char *pszOffset = szbuf; while(iLeft > 0) { // 获取一个msg块 MsgToPut *pMsgToPut =g_poolMsgToPut->RetrieveOrCreate(); if(NULL == pMsgToPut) { D_DEBUG("CHandlerT::handle_input获取块出错.\n"); return 0; } if(iLeft < (int)(MsgToPut::MSG_MAX_SIZE)) { ACE_OS::memcpy(pMsgToPut->pMsg, pszOffset, iLeft); pMsgToPut->nMsgLen = iLeft; } else { ACE_OS::memcpy(pMsgToPut->pMsg, pszOffset, MsgToPut::MSG_MAX_SIZE); pMsgToPut->nMsgLen = MsgToPut::MSG_MAX_SIZE; } iLeft = iLeft - MsgToPut::MSG_MAX_SIZE; pszOffset = pszOffset + MsgToPut::MSG_MAX_SIZE; // 递交给公共队列 HandlePackage((char *)pMsgToPut, sizeof(MsgToPut)); } }//if }while(1); return 0; } int CHandlerEx::handle_output (ACE_HANDLE fd) { ACE_UNUSED_ARG(fd); //sendmsg发送返回值(0:全部发送 1:部分发送 -1:出错) int iRet = 0; while(true) { /////////////旧的/////////////////////////////// ////是否获取后续新的数据? //if((m_iovCount == 0) && (m_iovOffset == 0)) //{ // for(int i = 0; i < UIO_SIZE; i++) // m_msgArray[i] = 0; // // { // //缩小锁定区域 // ACE_GUARD_RETURN(ACE_Thread_Mutex, mon, m_mutex, 0); // // while(!m_messageQueue.empty()) // { // m_msgArray[m_iovCount++] = m_messageQueue.front(); // m_messageQueue.pop(); // if(UIO_SIZE == m_iovCount) // break; // } // if(m_iovCount == 0) // { // //this->reactor()->mask_ops(this, ACE_Event_Handler::WRITE_MASK, ACE_Reactor::CLR_MASK);//没有缓冲数据,清除写事件 // this->reactor()->remove_handler(this, ACE_Event_Handler::WRITE_MASK|ACE_Event_Handler::DONT_CALL); // return 0; // } // } // //D_DEBUG("ChandlerEx::handle_output, get msg, 实际获取:%d, 最大:%d, 取后队列当前大小:%d, \n", m_iovCount, UIO_SIZE, m_messageQueue.size()); // if(m_iovCount > 0) // { // for(int i = 0; i < m_iovCount; i++) // { // m_iov[i].iov_base = m_msgArray[i]->pMsg; // m_iov[i].iov_len = m_msgArray[i]->nMsgLen; // } // } //} //if(m_iovCount > 0) //{ // iRet = SendMsg(); // if(-1 == iRet) // { // D_DEBUG("handle_out, sendmsg return -1\n"); // return -1;//出错回调remove_handler, 进而回调handle_close // } // else if(1 == iRet) // return 0;//等待下次再次调用 // else // { // if(m_bDeferredClose) // { // ACE_GUARD_RETURN(ACE_Thread_Mutex, mon, m_mutex, 0); // if(m_messageQueue.empty()) // return -1;//延迟关闭后发送全部数据,直接返回-1(回调remove_handler, 进而回调handle_close) // } // } //} //////////////////////新的//////////////////////////////// if((m_iovCount == 0) && (m_iovOffset == 0)) { if(m_messageQueue.empty()) { ACE_GUARD_RETURN(ACE_Thread_Mutex, mon, m_mutex, 0); if(m_messageQueue.empty()) { m_bExistDataToSend = false; if(!m_bDeferredClose) { this->reactor()->mask_ops(this, ACE_Event_Handler::WRITE_MASK, ACE_Reactor::CLR_MASK); return 0;//临时取消写事件 } else { //OnError(); return -1;//永远取消写事件,并销毁 } } } else { // 可以考虑优化 for(int i = 0; i < UIO_SIZE; i++) m_msgArray[i] = 0; { //缩小锁定区域 ACE_GUARD_RETURN(ACE_Thread_Mutex, mon, m_mutex, 0); while(!m_messageQueue.empty()) { m_msgArray[m_iovCount++] = m_messageQueue.front(); m_messageQueue.pop(); if(UIO_SIZE == m_iovCount) break; } } //D_DEBUG("ChandlerEx::handle_output, get msg, 实际获取:%d, 最大:%d, 取后队列当前大小:%d, \n", m_iovCount, UIO_SIZE, m_messageQueue.size()); for(int i = 0; i < m_iovCount; i++) { if ( (m_msgArray[i]->nMsgLen <= 0) || ( (unsigned int)(m_msgArray[i]->nMsgLen) > ARRAY_SIZE(m_msgArray[i]->pMsg) )) { D_ERROR( "CHandlerEx::handle_output,错误的msglen:%d, msgHandleID:%d, sessionID:%d\n" , m_msgArray[i]->nMsgLen, (unsigned long)this, m_uiSessionID ); continue; } m_iov[i].iov_base = m_msgArray[i]->pMsg; m_iov[i].iov_len = m_msgArray[i]->nMsgLen; } } } iRet = SendMsg(); if(-1 == iRet) { OnError(2, 0); return -1;//出错回调remove_handler, 进而回调handle_close } else if(1 == iRet) return 0;//等待下次再次调用 } return 0;//继续回调handle_output } int CHandlerEx::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask) { ACE_UNUSED_ARG(handle); bool bCloseNow = false;//立刻关闭 if(ACE_Event_Handler::READ_MASK == close_mask) { //////旧的////// //ACE_GUARD_RETURN(ACE_Thread_Mutex, mont, this->m_mutex, -1); //if(!m_messageQueue.empty() || (m_iovCount != 0) || (m_iovOffset != 0) ) // m_bDeferredClose = true; //else // bCloseNow = true;//没有待发送的缓存信息 //////新的////// //if(!m_bExistDataToSend) //{ // ACE_GUARD_RETURN(ACE_Thread_Mutex, mont, this->m_mutex, -1); // if(!m_bExistDataToSend) // { // bCloseNow = true;//没有待发送的缓存信息 // } //} //else // m_bDeferredClose = true; m_bDeferredClose = true; this->reactor()->register_handler(this, ACE_Event_Handler::WRITE_MASK); //if(!m_bExistDataToSend) //{ // ACE_GUARD_RETURN(ACE_Thread_Mutex, mont, this->m_mutex, -1); // if(!m_bExistDataToSend) // { // bCloseNow = true;//没有待发送的缓存信息 // } //} //else // m_bDeferredClose = true; } if(ACE_BIT_ENABLED(close_mask, ACE_Event_Handler::WRITE_MASK))//1.写出错2.延缓关闭为真 { this->reactor()->remove_handler(this, ACE_Event_Handler::ALL_EVENTS_MASK|ACE_Event_Handler::DONT_CALL); bCloseNow = true; } if(bCloseNow) this->Reset(0);// 关闭连接,释放资源,重置相关数据 return 0; } ACE_SOCK_Stream & CHandlerEx::peer(void) const { return (ACE_SOCK_Stream&)this->m_peer; } ACE_HANDLE CHandlerEx::get_handle(void) const { return m_peer.get_handle(); } IBufQueue * CHandlerEx::ReadQueue(void) { return m_pReadQueue; } void CHandlerEx::ReadQueue(IBufQueue *pReadQueue) { m_pReadQueue = pReadQueue; } IBufQueue * CHandlerEx::SendQueue(void) { return m_pSendQueue; } void CHandlerEx::SendQueue(IBufQueue *pSendQueue) { m_pSendQueue = pSendQueue; } unsigned int CHandlerEx::SessionID(void) { return m_uiSessionID; } void CHandlerEx::SessionID(unsigned int uiSessionID) { m_uiSessionID = uiSessionID; } int CHandlerEx::AppendMsg(MsgToPut *pMsgToPut) { if(NULL == pMsgToPut) { D_ERROR("CHandlerEx::AppendMsg中(NULL == pMsgToPut)\n"); return -1; } if((unsigned int)(pMsgToPut->nSessionID) != this->m_uiSessionID) { D_ERROR("CHandlerEx::AppendMsg中(pMsgToPut->nSessionID != this->m_uiSessionID) ,pMsgToPut->nSessionID=%d, this->m_uiSessionID=%d\n", pMsgToPut->nSessionID, this->m_uiSessionID); return -1; } ////////旧的///////////////////////////// //// 是否为关闭消息? //if(IsCloseMsg(pMsgToPut)) //{ // //通知主应用关闭 // OnError(); // ACE_GUARD_RETURN(ACE_Thread_Mutex, mon, m_mutex, -1); // if(m_messageQueue.empty() && (m_iovCount == 0) && (m_iovOffset == 0) ) // this->reactor()->remove_handler(this, ACE_Event_Handler::WRITE_MASK);//直接关闭 // else // m_bDeferredClose = true;//延迟关闭 // return -1; //} //else //{ // bool bflag = false; // ACE_GUARD_RETURN(ACE_Thread_Mutex, mon, m_mutex, -1); // // if(m_messageQueue.empty() && (m_iovCount == 0) && (m_iovOffset == 0)) // bflag = true; // m_messageQueue.push(pMsgToPut); // if(bflag) // //this->reactor()->mask_ops(this, ACE_Event_Handler::WRITE_MASK, ACE_Reactor::ADD_MASK); // this->reactor()->register_handler(this, ACE_Event_Handler::WRITE_MASK); // //m_messageQueue.push(pMsgToPut); // //D_DEBUG("Appendmsg(), m_messageQueue.size() = %d\n", m_messageQueue.size()); //} /////新的//////// // 是否为关闭消息? if(IsCloseMsg(pMsgToPut)) { //通知主应用关闭 OnError(3, 0); //if(!m_bExistDataToSend) //{ // ACE_GUARD_RETURN(ACE_Thread_Mutex, mon, m_mutex, -1); // if(!m_bExistDataToSend) // { // this->reactor()->remove_handler(this, ACE_Event_Handler::WRITE_MASK);//直接关闭 // return -1; // } //} // //m_bDeferredClose = true;//延迟关闭 //return -1; ACE_GUARD_RETURN(ACE_Thread_Mutex, mon, m_mutex, -1); if(!m_bExistDataToSend) this->reactor()->remove_handler(this, ACE_Event_Handler::WRITE_MASK);//直接关闭 else m_bDeferredClose = true;//延迟关闭 return -1; } else { /*if(!m_bExistDataToSend) { ACE_GUARD_RETURN(ACE_Thread_Mutex, mon, m_mutex, -1); if(!m_bExistDataToSend) { this->reactor()->mask_ops(this, ACE_Event_Handler::WRITE_MASK, ACE_Reactor::ADD_MASK); m_messageQueue.push(pMsgToPut); } } else { ACE_GUARD_RETURN(ACE_Thread_Mutex, mon, m_mutex, -1); m_messageQueue.push(pMsgToPut); } *///D_DEBUG("Appendmsg(), m_messageQueue.size() = %d\n", m_messageQueue.size()); ACE_GUARD_RETURN(ACE_Thread_Mutex, mon, m_mutex, -1); m_messageQueue.push(pMsgToPut); if(!m_bExistDataToSend) { m_bExistDataToSend = true; this->reactor()->mask_ops(this, ACE_Event_Handler::WRITE_MASK, ACE_Reactor::ADD_MASK); } } return 0; } int CHandlerEx::OnLinkUp(void) { //m_messageQueue.clear(); for(int i = 0; i < UIO_SIZE; i++) m_msgArray[i] = 0; //iovec m_iov[UIO_SIZE]; m_iovCount = 0; m_iovOffset = 0; // 打印连接对端信息 ACE_INET_Addr remoteAddr; GetRemoteAddress(remoteAddr); D_DEBUG("连接成功,客户信息(%s:%d)\n", remoteAddr.get_host_addr(), remoteAddr.get_port_number()); // 注册网络读事件 if(this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1) { D_ERROR("CHandlerEx::open中register_handler出错\n"); return -1; } // 网络层通知应用层 MsgToPut *pMsgToPut =g_poolMsgToPut->RetrieveOrCreate(); if(NULL == pMsgToPut) { D_DEBUG("CHandlerT::sendMsg获取块出错.\n"); return -1; } #ifdef ACE_WIN32 #pragma warning( push ) #pragma warning( disable: 4311 ) //"type cast" pointer truncation from ??* to ?? pMsgToPut->nHandleID = (int)this; #pragma warning( pop ) #else //ACE_WIN32 pMsgToPut->nHandleID = (int)this; #endif //ACE_WIN32 pMsgToPut->nSessionID = m_uiSessionID; pMsgToPut->nMsgLen = -1; // 发送连接建立消息 this->ReadQueue()->PushMsg(pMsgToPut); //debug //D_DEBUG("CHandlerEx::OnLinkUp, m_uiSessionID = %d\n", m_uiSessionID); //debug return 0; } int CHandlerEx::OnLinkDown(u_long flags) { // 打印连接对端信息 ACE_INET_Addr remoteAddr; GetRemoteAddress(remoteAddr); D_DEBUG("连接断开,客户信息(%s:%d)\n", remoteAddr.get_host_addr(), remoteAddr.get_port_number()); if(flags != 0) { // 取消已注册的处理器事件 this->reactor()->remove_handler(this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL); } // 关闭底层句柄 m_peer.close(); return 0; } void CHandlerEx::OnError(int closeType, int arg) { ////////////////////////////////////////////////////////////////////////// D_DEBUG("CHandlerEx::OnError():sessionid=%d, closeType=%d, arg=%d\n", m_uiSessionID, closeType, arg); ////////////////////////////////////////////////////////////////////////// MsgToPut *pErrorMsg =g_poolMsgToPut->RetrieveOrCreate(); if(NULL == pErrorMsg) { D_DEBUG("CHandlerT::sendMsg获取块出错.\n"); return; } // 通知主逻辑模块 pErrorMsg->nMsgLen = 0; HandlePackage((char *)pErrorMsg, sizeof(MsgToPut)); return; } int CHandlerEx::HandlePackage(char* pszPack, unsigned int uiPakcLen) { ACE_UNUSED_ARG(uiPakcLen); if(pszPack == NULL) return -1; // 设置附加信息 MsgToPut *pMsgToPut = (MsgToPut *)pszPack; #ifdef ACE_WIN32 #pragma warning( push ) #pragma warning( disable: 4311 ) pMsgToPut->nHandleID = (int)this; #pragma warning( pop ) #else /*ACE_WIN32*/ pMsgToPut->nHandleID = (int)this; #endif/*ACE_WIN32*/ pMsgToPut->nSessionID = this->SessionID(); // 插入已读队列 this->ReadQueue()->PushMsg(pMsgToPut); return 0; } int CHandlerEx::GetRemoteAddress(ACE_INET_Addr & remoteAddress) { return m_peer.get_remote_addr(remoteAddress); } bool CHandlerEx::IsCloseMsg(MsgToPut *pMsgToPut) { if(NULL == pMsgToPut) return false; if(pMsgToPut->nMsgLen > 0) return false; return true; } bool CHandlerEx::IsUsed(void) { return m_bIsUsed; } void CHandlerEx::IsUsed(bool bIsUsed) { m_bIsUsed = bIsUsed; } bool CHandlerEx::ActiveConnect(void) { return m_bActiveConnect; } void CHandlerEx::ActiveConnect(bool bActiveConnect) { m_bActiveConnect = bActiveConnect; } void CHandlerEx::Reset(u_long flags) { if(m_peer.get_handle() != ACE_INVALID_HANDLE) { // 已连接则关闭连接 OnLinkDown(flags); } // 重置队列 m_pReadQueue = NULL; m_pSendQueue = NULL; // 重置SessionID m_uiSessionID = 0; m_iovCount = 0; m_iovOffset = 0; // 重置延迟关闭 m_bDeferredClose = false; // 回收处理器 m_bIsUsed = false; // 是否主动连接 m_bActiveConnect = false; // 重置待发标志 m_bExistDataToSend = false; } int CHandlerEx::SendMsg(void) { // 发送m_iov while(m_iovOffset < m_iovCount) { ssize_t n = peer().sendv(&m_iov[m_iovOffset], m_iovCount - m_iovOffset); ////检查 EOF //if (n == 0) // return 0; if (n == -1)// 是否有错误? { if (errno == EWOULDBLOCK || errno == ENOBUFS) return 1; // 阻塞? else return -1;// 其他错误 } while((m_iovOffset < m_iovCount) && (n >= static_cast (m_iov[m_iovOffset].iov_len))) { n -= m_iov[m_iovOffset].iov_len; g_poolMsgToPut->Release(m_msgArray[m_iovOffset]); m_iovOffset++; } if(n != 0) { m_iov[m_iovOffset].iov_base = (char *)(m_iov[m_iovOffset].iov_base) + n; m_iov[m_iovOffset].iov_len = m_iov[m_iovOffset].iov_len - n; } } m_iovCount = 0; m_iovOffset = 0; return 0;//全部发出 }