/* linux下socket; by dzj, 09.05.04 */ #ifndef LSOCKET_H #define LSOCKET_H #ifdef WIN32 #include "rcvbuf.h" #include "manframe/manframe.h" class CLSocket : public IFrameEle { public: CLSocket() { PoolObjInit(); }; ~CLSocket() { } virtual void DestorySelfIns() { delete this; } ;//对于LSocket,删除自身,只有监听端口可能是这样,CTestPlayer会从对象池分配,因此会释放回对象池; public: virtual void IssueDestory( bool isForce = false ) {}; public: ///关闭socket,并重新初始化; virtual bool InnerClose() { return true; }; public: //被CManFrame调用,一般是之前将自身设为活动的结果; bool OnActiveBeExplored() { NewLog( LOG_LEV_INFO, "CLSocket::OnActiveBeExplored,selfID:%d", GetSelfFrameID() ); return true; };//被CManFrame调用,一般是之前将自身设为活动的结果; ///被CManFrame调用,一般为定时任务; bool OnValidBeExplored() { NewLog( LOG_LEV_INFO, "CLSocket::OnValidBeExplored,selfID:%d", GetSelfFrameID() ); return true; };//被CManFrame调用,一般为定时任务; ///FrameEle元素需要的框架消息发送实现(应用线程上下文); virtual void CrdInsSendMsg( NewMsgToPut* pToSendMsg ) { if ( NULL == pToSendMsg ) { return; } return; }; public: PoolFlagDefine() { return; };//池对象需要; }; #endif //WIN32 #ifndef WIN32 #define USE_RCV_BUF #include "newcache.h" #include "dsepoll.h" #include "manframe/manframe.h" #include "rcvbuf.h" ///当前socket状态; enum LSocketStat { LS_LISTENING = 0, //监听端口监听中。。。,等待连接; LS_CONNECTING, //正在连接中; LS_CONNECTED, //已连接 LS_DISCONNING, //断开中 LS_DISCONNED, //已断开 LS_NEWSOCKET, //新建socket,还未进行任何操作; LS_INVALID //无效socket; }; #define LS_FIRSTQUE_SIZE 256 //优先发送队列大小; #define LS_RCVBUF_NUM 10 //接收缓存个数; class CLSocket : public IFrameEle { public: CLSocket() { m_pSendQueue = NEW LSOCKET_QUEUE( NULL/*无通知事件*/ );//待发送消息; m_pRcvQueue = NEW LSOCKET_QUEUE( NULL/*无通知事件*/ );//待处理接收消息; PoolObjInit(); }; ///除非之前已先关闭,并设置自身需要遍历中删去,否则不可删,因为有可能epoll线程正在使用此socket; /// 也就是说,断开socket时,不要直接删CLSocket,应该InnerClose,然后留待框架去删除 virtual ~CLSocket() { //主线程中不执行此操作,InnerClose();//关闭内部socket; ReleaseInnerQueue();//内部队列中未处理完毕的消息; delete m_pSendQueue; m_pSendQueue = NULL; delete m_pRcvQueue; m_pRcvQueue = NULL; if ( -1 != m_fdSocket ) { close( m_fdSocket ); } } virtual void DestorySelfIns() { OnFrameEleDestoryed(); delete this; } ;//对于LSocket,删除自身,只有监听端口可能是这样,CTestPlayer会从对象池分配,因此会释放回对象池; ///以下池对象需要函数; public: PoolFlagDefine() { #ifdef USE_CRYPT /////////////////////////////////////////////////////////// //初始化cryptins密; m_Crypt.InitCryptBF(); //unsigned char cipherkey[16]; //for ( int i=0; i<16; ++i ) //{ // cipherkey[i] = i; //} //m_Crypt.SetSendKey( (const char*)cipherkey, 16 ); //m_Crypt.SetRcvKey( (const cahr*)cipherkey, 16 ); //初始化cryptins密钥; /////////////////////////////////////////////////////////// #endif //USE_CRYPT m_brorcvid = 0;//测试广播消息 m_isConnedInitExeced = false;//初始尚未调用已连接初始化接口; m_lastrcvTick = GetTickCount(); m_bActiveClose = false; SetCurStat( LS_INVALID );//初始状态; memset( m_remoteAddr, 0, sizeof(m_remoteAddr) ); m_remotePort = 0; memset( m_localAddr, 0, sizeof(m_localAddr) ); m_localPort = 0; m_epfdArr = NULL; m_epfdArrSize = 0; m_fdSocket = -1; m_sendfirstSt = 0;//优先发送队列的发送起始位置 m_sendfirstEnd = 0;//优先发送队列的发送结束位置(<而不是<=) ReleaseInnerQueue();//内部队列中未处理完毕的消息; m_rcvBuf.Reset(); SocketChildPoolInit();//子类可能需要执行的相应初始化; return; };//池对象需要; protected: //子类可能需要执行的池对象初始化; virtual void SocketChildPoolInit() {}; private: ///释放内部队列中未处理完毕的消息; bool ReleaseInnerQueue(); protected: //请求销毁自身; virtual void IssueDestory( bool isForce = false /*好像有漏掉的情况,因此加上这一标记,在间隔太长时间时重新发一次AppSendPost,以求保险,最好能查到问题*/) { if ( ( m_bActiveClose ) && (!isForce) ) { //之前其它操作已置断连标记; return; } if ( !isForce ) { NewLog( LOG_LEV_DEBUG, "主动断连发起,socket%d", GetInnerSocket() ); } else { NewLog( LOG_LEV_DEBUG, "强制:主动断连发起,socket%d", GetInnerSocket() ); } m_bActiveClose = true; AppSendPost();//激活一次写,确保一次写EPoll事件,在该事件到来时会检测到m_bActiveClose从而执行SelfClose,继而触发框架活动扫描标记,在活动扫描时会检测到此socket已无效,从而删去自身; } private: bool m_bActiveClose;//主动关闭标记,epoll线程检测到此标记后应立即执行SelfClose,并快速返回,不再执行多余操作以免冲突,参见m_bIsToDelSelf; public: inline void OnEPollEvent( __uint32_t epollEvents ) { NewLog( LOG_LEV_INFO, "socket%d收到EPoll事件%d", GetInnerSocket(), epollEvents ); if ( m_bActiveClose ) { NewLog( LOG_LEV_DEBUG, "长时间断连debug,socket%d,OnEPollEvent_st_epoll_event_handle", GetInnerSocket() ); SelfClose(); return; } if ( EPOLLERR & epollEvents ) { //NewLog( LOG_LEV_INFO, "EPOLLERR" ); //if ( !OnEPollError( 0xffff ) )//由于-1!=waitrst,因此tmperr中不是真正的socket错误号,虽然多半是对端关闭,但如果一定要知道真正的socket错误号,可以在OnEPollError中再对此socket作一次操作。 //{ // NewLog( LOG_LEV_DEBUG, "EPOLLERR,socket%d准备断开", GetInnerSocket() ); // SelfClose(); // return; //} OnEPollError( 0xffff ); NewLog( LOG_LEV_DEBUG, "长时间断连debug,socket%d,OnEPollEvent_epollerr_handle", GetInnerSocket() ); SelfClose(); return; } if ( EPOLLOUT & epollEvents ) //EPollOut放在EPoll前,因为可能连接与数据一同到来 { //NewLog( LOG_LEV_INFO, "EPOLLOUT" ); if ( !OnEPollOut() ) { NewLog( LOG_LEV_INFO, "EPOLLOUT,socket%d准备断开", GetInnerSocket() ); NewLog( LOG_LEV_DEBUG, "长时间断连debug,socket%d,OnEPollEvent_epollout_handle", GetInnerSocket() ); SelfClose(); return; } } if ( EPOLLIN & epollEvents ) { //NewLog( LOG_LEV_INFO, "EPOLLIN" ); if ( !OnEPollIn() ) { NewLog( LOG_LEV_INFO, "EPOLLIN,socket%d准备断开", GetInnerSocket() ); NewLog( LOG_LEV_DEBUG, "长时间断连debug,socket%d,OnEPollEvent_epollin_handle", GetInnerSocket() ); SelfClose(); return; } } if ( EPOLLHUP & epollEvents ) { //NewLog( LOG_LEV_INFO, "EPOLLHUP" ); if ( !OnEPollHup() ) { NewLog( LOG_LEV_INFO, "EPOLLHUP,socket%d准备断开", GetInnerSocket() ); SelfClose(); NewLog( LOG_LEV_DEBUG, "长时间断连debug,socket%d,OnEPollEvent_epoll_hup_handle", GetInnerSocket() ); return; } } if ( m_bActiveClose ) { SelfClose(); NewLog( LOG_LEV_DEBUG, "长时间断连debug,socket%d,OnEPollEvent_end_poll_handle", GetInnerSocket() ); return; } return; } ///以下继承类需要定义的接口,这些接口都由主循环调用,由于可能直接NEW监听端口,因此给这些接口一些空实现; public: ///收到消息; #ifdef TEST_CODE virtual bool OnRcvedMsg( bool isBroPkg, unsigned int pkgid, unsigned int rcvLen ) { isBroPkg = isBroPkg; pkgid = pkgid; rcvLen = rcvLen; return true; };//测试代码,只传递是否广播包; #else //TEST_CODE virtual bool OnPkgRcved( unsigned int timeInfo, unsigned short wCmd, const char* pPkg, unsigned short wPkgLen ) { NewLog( LOG_LEV_ERROR, "错误,lsocket::OnPkgRcved自身不应处理,应交子类处理,%d:%s:%d", wCmd, pPkg, wPkgLen ); return true; };//gatesrv使用,传递消息内容供应用处理; #endif //TEST_CODE ///被框架遍历; virtual bool OnBeExploredByFrame() { return true; }; ///连接成功后,给对象一个初始化自身的机会,每连接只执行一次; virtual bool OnConnedInit() { return true; }; public: bool m_isConnedInitExeced;//对象连接后初始化接口OnConnedInit是否已调用; public: ///关闭socket,并重新初始化; virtual bool InnerClose(); ///以下管理框架需要函数; public: //被CManFrame主线程调用,一般是之前将自身设为活动的结果(//注意:有可能当前欲遍历句柄实际已无效,具体参见CManFrame中ClearActiveFlag中的说明); bool OnActiveBeExplored(); ///被CManFrame调用,一般为定时任务; bool OnValidBeExplored(); public: void SetEPollArr( CDsEPoll* epfdArr, //epoll句柄数组; int epfdArrSize //epoll句柄数组大小; ) { m_epfdArr = epfdArr; m_epfdArrSize = epfdArrSize; InitLSocket(); } ///使用已有socket建LSocket,用于监听socket接收新连接; void SetEPollArrWithFd( int sockfd , CDsEPoll* epfdArr //epoll句柄数组 , int epfdArrSize //epoll句柄数组大小 ) { m_epfdArr = epfdArr; m_epfdArrSize = epfdArrSize; InitLSocketByFd( sockfd ); } private: ///使用socket fd初始化自身; void InitLSocketByFd( int sockfd ) { //与InitLSocket的区别,不清远端地址与端口,因为当前为已连接,且调用前必定已设远端地址与端口; memset( m_localAddr, 0, sizeof(m_localAddr) ); m_localPort = 0; //已连接; memset( &m_careEvent, 0, sizeof(m_careEvent) ); m_careEvent.events = EPOLLIN|EPOLLERR|EPOLLONESHOT; m_careEvent.data.ptr = (void*)this; CreateInnerSocketByFd( sockfd ); } ///初始化socket; void InitLSocket() { memset( m_remoteAddr, 0, sizeof(m_remoteAddr) ); m_remotePort = 0; memset( m_localAddr, 0, sizeof(m_localAddr) ); m_localPort = 0; //已连接; memset( &m_careEvent, 0, sizeof(m_careEvent) ); m_careEvent.events = EPOLLIN|EPOLLERR|EPOLLONESHOT; m_careEvent.data.ptr = (void*)this; CreateInnerSocket(); } public: ///设置socket对应的远端地址; bool SetRemoteAddr( const char* remoteAddr, unsigned short remotePort ); ///socket相关操作; public: ///连接远端地址(应用线程上下文); bool LSConnect( const char* remoteAddr, unsigned short remotePort ); ///监听本地端口(应用线程上下文); bool LSListen( const char* localAddr, unsigned short localPort ); ///向远端发送系列消息(应用线程上下文); bool LSSendMsg( NewMsgToPut** pToSend, int toSendNum ); #ifdef USE_CRYPT bool LSSendMsgCryptInnerCpy( char* pToSend, int toSendLen );//信息加密发送,并且会在内部拷贝一份待发信息,因此pToSend可为自动变量,调用完毕后可被释放或重用; #endif //USE_CRYPT ///从远端收系列消息(应用线程上下文); bool LSRcvMsg( NewMsgToPut** pToRcv, const int rcvBufNum, int& rcvMsgNum ); ///FrameEle元素需要的框架消息发送实现(应用线程上下文); virtual void CrdInsSendMsg( NewMsgToPut* pToSendMsg ) { if ( NULL == pToSendMsg ) { return; } #ifdef USE_CRYPT //加密情形下,通过调用LSSendMsgCryptInnerCpy在本函数内部拷贝pToSendMsg,因为传入的pToSendMsg要被其它socket共用; if ( pToSendMsg->msgLen + sizeof(int) > sizeof(m_toEnc) ) { NewLog( LOG_LEV_DEBUG, "CrdInsSendMsg,socket%d,待发消息过长,msgLen:%d", GetInnerSocket(), pToSendMsg->msgLen ); return; } memcpy( m_toEnc+sizeof(int)/*空出放包序号的字节*/, pToSendMsg->msgBuf, pToSendMsg->msgLen ); unsigned int toenclen = pToSendMsg->msgLen + sizeof(int); LSSendMsgCryptInnerCpy( m_toEnc, toenclen ); #else //USE_CRYPT //非加密情形下,传入的是专为本socket分配的pToSendMsg,因此不需要拷贝; LSSendMsg( &pToSendMsg, 1 ); #endif //USE_CRYPT return; }; #ifdef USE_CRYPT ///设置发送加密开始; bool CliStSendCrypt() { if ( m_Crypt.IsStSendCrypt() ) { //重复设置加密开始; return false; } StEncDec tmpToSend; LSSendMsgCryptInnerCpy( (char*)&tmpToSend, sizeof(tmpToSend) ); return true; } bool CliSetSendRcvCryptKey( const char* cipher, int cipherlen ) { m_Crypt.SetSendKey( cipher, cipherlen ); m_Crypt.SetRcvKey( cipher, cipherlen ); return true; } inline bool IsCliStRcvCrypt() { return m_Crypt.IsStRcvCrypt(); } private: DsCryptBF m_Crypt; char m_toEnc[1024]; #endif //USE_CRYPT private: ///重连远端地址; bool InnerConnect(); ///监听本地地址; bool InnerListen(); ///处理新连入连接; bool InnerProcIncomeConn(); ///内部数据发送(真正向端口发送); bool InnerSend(); ///内部数据接收(真正从端口接收); bool InnerRcv(); ///发送优先队列中数据,如果全部发完,则返回真,否则返回假; bool SendFirstSendQue( bool& isNeedClose ); ///socket相关回调,返回值表明socket上是否发生错误需要关闭; public: ///出错回调(EPOLL线程上下文); bool OnEPollError( int errNum );//由于-1!=waitrst,因此tmperr中不是真正的socket错误号,虽然多半是对端关闭,但如果一定要知道真正的socket错误号,可以在OnEPollError中再对此socket作一次操作。 ///EPOLLHUP回调(EPOLL线程上下文); bool OnEPollHup(); ///EPOLLOUT回调(EPOLL线程上下文); bool OnEPollOut(); ///EPOLLIN(EPOLL线程上下文) bool OnEPollIn(); private: //是否已设关心EPollOut事件,主要为了防止多线程竞态(A线程置写标记,还未来得及写,就被B线程的只关心读标记覆盖,导致事件丢失,具体参见'todo'文档); bool m_bCareEPollOut; ///EPOLL关心事件操作; private: ///添加至EPOLL并设置关心事件; void AddToEPoll( int flag ); ///添加EPOLL关心事件; void SetEPollCareFlag( int flag ); ///显式关闭socket之上的后续消息,之所以需要这一函数,主要是因为:一系列epoll消息到来时,有可能处理第1个消息--close fd之前到来了新消息,从而使得返回后处理新消息时找不到对应的socket对象; void DelFromEPoll(); ///内部回调; private: ///新创建socket成功时回调; void OnNewCreated(); ///连接远端成功; void OnConnected( const char* remoteAddr, unsigned short remotePort ); private: ///建内部socket; void CreateInnerSocket(); ///使用输入socket作为内部socket; void CreateInnerSocketByFd( int sockfd ); ///根据内部socket的值取对应的epoll句柄位置; int HashSocketFD( int fdSocket ) { return fdSocket % m_epfdArrSize; } public: ///取内部socket句柄; inline int& GetInnerSocket() { return m_fdSocket; }; public: ////////////////////////////////////////////////////////////////////////// //新增NewInconnSetCurStat与NewOutconnSetCurStat,区分新建连接与连入连接, // 因为连入连接有可能还没来得及设好connected状态(在监听端口所属线程)便收到其它epoll事件, // 从而导致其它依赖于connected状态的事件处理失败... inline void NewInconnSetCurStat() { m_InnerStat = LS_CONNECTED; OnNewCreated(); OnConnected( m_remoteAddr, m_remotePort ); } inline void NewOutconnSetCurStat() { m_InnerStat = LS_NEWSOCKET; OnNewCreated(); } ///设置当前状态; inline void SetCurStat( LSocketStat curStat ) { m_InnerStat = curStat; if ( LS_CONNECTED == m_InnerStat ) { OnConnected( m_remoteAddr, m_remotePort ); } return; } //...新增NewInconnSetCurStat与NewOutconnSetCurStat,区分新建连接与连入连接, // 因为连入连接有可能还没来得及设好connected状态(在监听端口所属线程)便收到其它epoll事件, // 从而导致其它依赖于connected状态的事件处理失败 ////////////////////////////////////////////////////////////////////////// inline bool IsNewSocket() { return LS_NEWSOCKET == m_InnerStat; } inline bool IsConnecting() { return LS_CONNECTING == m_InnerStat; } inline bool IsConnected() { return LS_CONNECTED == m_InnerStat; } inline bool IsListening() { return LS_LISTENING == m_InnerStat; } private: CDsEPoll* m_epfdArr;//epoll句柄数组; int m_epfdArrSize;//epoll句柄数组大小; int m_epfdPos;//本socket使用的epfd(由m_fdSocket决定); private: int m_fdSocket;//内部描述符; LSocketStat m_InnerStat;//当前socket状态; private: LSOCKET_QUEUE* m_pSendQueue;//待发送消息; LSOCKET_QUEUE* m_pRcvQueue;//待处理接收消息; NewMsgToPut* m_arrSendFirst[LS_FIRSTQUE_SIZE];//优先发送队列(仅由EPOLL线程上下文使用修改); iovec m_arrSendFirstTmp[LS_FIRSTQUE_SIZE];//优先发送队列(仅由EPOLL线程上下文使用修改); int m_sendfirstSt;//优先发送队列的发送起始位置 int m_sendfirstEnd;//优先发送队列的发送结束位置(<而不是<=) CRcvBuf m_rcvBuf;//socket收包缓存; public: inline const char* GetLSocketRemoteAddr() { return &(m_remoteAddr[0]); } inline const unsigned int GetLSocketRemotePort() { return m_remotePort; } private: char m_remoteAddr[32];//远端地址(对accept用socket无效) unsigned int m_remotePort;//远端端口(对accept用socket无效) private: char m_localAddr[32];//本地地址(对accept用socket无效) unsigned int m_localPort;//本地地址(对accept用socket无效) private: epoll_event m_careEvent;//关心事件; private: unsigned int m_lastrcvTick;//上次收包tick,超过一定时间未收到包,删去此socket; private: unsigned int m_brorcvid;//测试广播消息 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /* 发送消息遗漏防止; ------------------------------------------------------- 应用写线程 | epoll发送线程 ------------------------------------------------------- push信息 | m_isCareNewOut = false m_isCareNewOut=true | 弹出信息 care EPOLLOUT | check m_isCareNewOut,若true,则再次care EPOLLOUT; ------------------------------------------------------- 这样处理,可以防止epoll线程弹出信息之后,应用写线程连续执行压信息并设置EPOLLOUT时,丢失EPOLLOUT,导致没有后续消息时的最后一部分消息丢失 漏洞是可能在epoll弹出了应用写线程所写消息的情况下,仍有可能再次care EPOLLOUT,但这种情况没有恶果,只是多消耗了计算资源 by dzj, 09.05.06 */ private: //epoll线程执行 void EPollSendPre() { m_isCareNewOut = false; } //epoll线程执行 void EPollSendBH() { if ( m_isCareNewOut ) { NewLog( LOG_LEV_INFO, "EPollSendBH,socket%d,下半部激活......", m_fdSocket ); SetEPollCareFlag( EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLONESHOT );//等待可发送通知事件; NewLog( LOG_LEV_INFO, "EPollSendBH,socket%d,care写", m_fdSocket ); } } //应用写线程Push信息后执行 void AppSendPost() { m_isCareNewOut = true; SetEPollCareFlag( EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLONESHOT );//等待可发送通知事件; NewLog( LOG_LEV_INFO, "AppSendPost,socket%d,care写", m_fdSocket ); } private: //是否在epoll线程执行期间有新的待发信息到来,每次epoll线程开始发送操作之前清此标记,发送操作完毕后再检查此标记以决定是否要重开OUT事件; bool m_isCareNewOut;//是否在epoll线程执行期间有新的待发信息到来,每次epoll线程开始发送操作之前清此标记,发送操作完毕后再检查此标记以决定是否要重开OUT事件; /*发送消息遗漏防止*/ //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// private: ///fd数量限制跟踪; void PrintFdNum(); }; #endif //WIN32 #endif //#ifndef LSOCKET_H