/** * @file RcvPkgHandler.h * @brief 收包处理以及接口定义及实现头文件 * Copyright(c) 2007,上海第九城市游戏研发部 * All rights reserved * 文件名称:RcvPkgHandler.h * 摘 要:本文件用于收包处理接口类; * 作 者:dzj * 修改日期:2007.11.09,部分注释方式修改;07.11.28,改为pkghandler,用于即时处理; * ////////////////////////////////////////////////////////////////////////// * //包缓存 * //by dzj, 06.10.07; * ////////////////////////////////////////////////////////////////////////// */ #pragma once #include "../tcache/tcache.h" #include "CliProtocol.h" #ifdef USE_DSIOCP #include "iocp/dsiocp.h" #endif //USE_DSIOCP #include "../../Test/testthreadque_wait/sigexception/sigexception.h" using namespace MUX_PROTO; extern const unsigned char SELF_PROTO_TYPE;//自身为GateSrv,定义见srvProtocol.h; //////////////////////////////////////////////////////////// //COMM_BUG ///包缓存大小; const unsigned int DS_PKG_BUFSIZE = MAX_MSG_SIZE*2;//包缓存大小,由于单个包不可能大于1024,因此可以设成这样,每个包来了都即时处理; //const unsigned int DS_PKG_BUFSIZE = 20480;//包缓存大小,由于单个包不可能大于1024,因此可以设成这样,每个包来了都即时处理; //COMM_BUG //////////////////////////////////////////////////////////// ///收包处理器定义; ///发包时,也根据每个连接的nHandleID以及nSessionID向网络模块请求发送消息; template class CRcvPkgHandler { public: ///构造,定义缓存大小; CRcvPkgHandler() : m_dwTotalSize( DS_PKG_BUFSIZE ), m_dwToProc(0), m_dwDataEnd(0) { }; ///析构,释放内部缓存; virtual ~CRcvPkgHandler(void) { PoolObjInit(); }; PoolFlagDefine() { TRY_BEGIN; m_bIsCurrentValid = true; m_dwTotalSize = DS_PKG_BUFSIZE; m_dwToProc = 0; m_dwDataEnd = 0; m_PkgProcer.OnDestoryed(); TRY_END; } ///建立通信句柄时设置句柄相关信息,以便发包时使用; #ifdef USE_DSIOCP void SetHandleInfo( int nHandleID, int nSessionID, UniqueObj< CDsSocket >* pUniqueSocket ) #else //USE_DSIOCP void SetHandleInfo( int nHandleID, int nSessionID ) #endif //USE_DSIOCP { TRY_BEGIN; m_nHandleID = nHandleID; m_nSessionID = nSessionID; m_PkgProcer.OnCreated(); #ifdef USE_DSIOCP m_PkgProcer.SetUniqueDsSocket( pUniqueSocket ); //由于SetHandleInfo内部可能用到此pUniqueSocket,因此本句要先执行; #endif //USE_DSIOCP m_PkgProcer.SetHandleInfo( nHandleID, nSessionID ); TRY_END; } ///取句柄信息; int GetHandleInfo( int& nHandleID, int& nSessionID ) { nHandleID = m_nHandleID; nSessionID = m_nSessionID; return 0; } /////为重用本缓存准备; //void ReUse() //{ // m_dwToProc = 0; // m_dwDataEnd = 0; //}; ///添加待处理包; bool AddPkgContent( const char* pBuf, int nSize ) { TRY_BEGIN; if ( nSize <= 0 ) { return true; } if ( !pBuf ) { return true; } if ( !m_bIsCurrentValid ) { //本句柄已经无效,不再处理后续消息; return false; } //if ( (m_dwTotalSize>=m_dwDataEnd) // && (m_dwTotalSize-m_dwDataEnd) <= (unsigned int)nSize )//!!这两个参数不可优化; //{ // //缓存不够,网络层传来的收包太大,错误; // return false; //} //以下将包添加到缓存; if ( m_dwTotalSize < m_dwDataEnd ) { D_WARNING( "解析包错误,AddPkgContent1, sessionID:%d", m_nSessionID ); m_PkgProcer.OnPkgError(); m_bIsCurrentValid = false; return false; } /* |invalid... |data... |invalic... | | |toProc |dataEnd&toWrite | */ //05.18, by dzj, if ( ( m_dwTotalSize - m_dwDataEnd ) > (unsigned int)nSize ) { //剩余空间足够,则拷入剩余空间; memcpy( &(m_pPkgBuf[m_dwDataEnd]), pBuf, nSize ); m_dwDataEnd += nSize; } else { ////剩余空间不够,则先处理已收数据中的可能包,然后将已收数据移至缓存区首,最后再次试着写入; //TryBatchProcPkgs();//先处理已收数据; //将剩余数据移至缓存区首 int curBufSize = m_dwDataEnd - m_dwToProc; memmove( m_pPkgBuf, &(m_pPkgBuf[m_dwToProc]), curBufSize ); m_dwToProc = 0; m_dwDataEnd = curBufSize; //然后再次尝试将新收数据附至尾部; if ( ( m_dwTotalSize - m_dwDataEnd ) > (unsigned int)nSize ) { //剩余空间足够,则拷入剩余空间; memcpy( &(m_pPkgBuf[m_dwDataEnd]), pBuf, nSize ); m_dwDataEnd += nSize; } else { //这样还不够,只能说明原来收到了无效数据,或者新收数据超过了缓存容量; //有可能是正常情况,例如:4个512的包,先收到256字节,然后一并收到256+2*512字节,这样,当前缓存内容中不够一个有效包,同时新收的包也不能一次性放入缓存 //为此,如果新收数据一次放不下,则应先放入一部分,尝试处理完毕后,再依次将剩余数据放入并处理之 //如果缓存放满时仍然不是一个有效包,则说明该连接发来的数据无效,应该断开此连接,并记错误日志; int tocp = nSize;//最初待处理数据大小; int tocppos = 0;//从最初位置开始处理数据; while ( tocp > 0 ) { //将剩余数据移至缓存区首 int curBufSize = m_dwDataEnd - m_dwToProc; memmove( m_pPkgBuf, &(m_pPkgBuf[m_dwToProc]), curBufSize ); m_dwToProc = 0; m_dwDataEnd = curBufSize; //拷一部分处理一部分; int curspaceleft = m_dwTotalSize-m_dwDataEnd; int cped = (tocp 0 ) { //还有数据没收完,但已收数据却不是完整包; D_WARNING( "解析包错误,AddPkgContent2, sessionID:%d", m_nSessionID ); m_PkgProcer.OnPkgError(); m_bIsCurrentValid = false; return true; } break; } } } } //////////////////////////////////////////////////////////// //COMM_BUG TryBatchProcPkgs();//如果不缓存,则每次收包后都立即尝试处理; //COMM_BUG //////////////////////////////////////////////////////////// return true; TRY_END; return false; } private: /** * 缓存中是否包含一个有效包,dwPkgLen为包长,包格式: * 1-2 3 4-5 不定长度 * 包长度 包类型 命令字 协议结构内容 * wPkgLen byPkgType wCmd pContent */ bool IsPkgValid( unsigned short& nPkgLen ) { TRY_BEGIN; //包头是一个表明本包长度的WORD字段,其中长度不包含WORD的2个字节; if ( m_dwToProc >= m_dwDataEnd ) { return false;//没有缓存数据; } unsigned int curBufed = m_dwDataEnd - m_dwToProc; if ( curBufed < sizeof(unsigned short)+sizeof(char)+sizeof(unsigned short) ) //包头长度字段为WORD,包类型字段BYTE, 命令字字段为WORD { return false;//包不完整; } unsigned short pkgLen = 0; //获得实际包长度 memcpy( &pkgLen, &(m_pPkgBuf[m_dwToProc]), sizeof(pkgLen) ); if ( pkgLen < sizeof(unsigned short)+sizeof(char)+sizeof(unsigned short) ) //包头长度字段为WORD,包类型字段BYTE, 命令字字段为WORD { //包头指明的长度小于最小长度; //解析包错误(应该断开连接) D_WARNING( "解析包错误,IsPkgValid1, sessionID:%d\n", m_nSessionID ); m_PkgProcer.OnPkgError(); m_bIsCurrentValid = false; return false; } if ( pkgLen >= MAX_MSG_SIZE ) { //解析包错误(应该断开连接) D_WARNING( "解析包错误,IsPkgValid2, sessionID:%d\n", m_nSessionID ); m_PkgProcer.OnPkgError(); m_bIsCurrentValid = false; return false; } if ( pkgLen > curBufed ) { return false; } nPkgLen = pkgLen;//缓存中有一个完整包; return true; TRY_END; return false; };//缓存中是否包含一个有效包,dwPkgLen为包长; ///处理一个完整包,不管该包怎样处理,底层都已经将其删去了,如果处理包时发生错误,由错误发生点即时处理; virtual void ProcOnePkg( const char* pBuf, const int& nSize ) { //TRY_BEGIN; if ( NULL == pBuf ) { //传入包空; return; } unsigned short wPkgLen = *((unsigned short*)pBuf); if ( wPkgLen != (unsigned short)nSize ) { //错误! return; } // 使用协议工具,不对bypkgType进行验证,cmd唯一区别消息2008.7.15 //char byPkgType = pBuf[2]; //if ( (byPkgType & 0x0f) != SELF_PROTO_TYPE ) //{ // //包类型不对,直接返回不处理,或者断开??? // return; //} unsigned short wCmd = *( (unsigned short*)( &(pBuf[3]) ) );//取命令字; ////int jjj, test, //if ( ( 0x10c != wCmd ) // && ( m_PkgProcer.GetSessionID() > 1000 ) // ) //{ // D_DEBUG( "收到socket%d消息%x\n", m_PkgProcer.GetSessionID(), wCmd ); //} ST_SIG_CATCH { if (m_nHandleID != 700) { if (wCmd != 0x5100 && wCmd != 0x7100) { D_DEBUG("\n----------ProcOnePkg:HandleID-%d,SessionID-%d,Cmd-%x\n",m_nHandleID,m_nSessionID,wCmd); } } m_PkgProcer.OnPkgRcved( wCmd, &(pBuf[5]), wPkgLen - 5 ); } END_SIG_CATCH; return; //return; //TRY_END; //return; };//处理一个完整包; ///尝试处理缓存中的一个包,返回值表明是否有包被处理过; bool TryProcOnePkg() //返回值表明是否有包被处理过; { TRY_BEGIN; unsigned short nPkgLen; nPkgLen = 0; if ( !IsPkgValid(nPkgLen) ) { return false; } ProcOnePkg( &(m_pPkgBuf[m_dwToProc]), nPkgLen ); m_dwToProc += nPkgLen; if ( m_dwToProc >= m_dwDataEnd ) { m_dwDataEnd = 0; m_dwToProc = 0; } //////////////////////////////////////////////////////////////////////////// ////这一段如果放到处理包之后,则会重复进入,郁闷! //unsigned int tmpPos = m_dwToProc; //m_dwToProc += nPkgLen;//新的待处理位置; //if ( m_dwToProc >= m_dwDataEnd ) //缓存的包都已被处理完毕; //{ // m_dwDataEnd = 0; // m_dwToProc = 0; //} ////这一段如果放到处理包之后,则会重复进入,郁闷! //////////////////////////////////////////////////////////////////////////// //ProcOnePkg( &(m_pPkgBuf[tmpPos]), nPkgLen ); return true; TRY_END; return false; } ///尝试批处理缓存中的包,nMaxCount为最多处理多少个包,返回值表明是否有包被处理过; bool TryBatchProcPkgs() //尝试批处理缓存中的包,nMaxCount为最多处理多少个包,返回值表明是否有包被处理过; { TRY_BEGIN; bool isSomePkgProced = false; while (true) { if ( !(TryProcOnePkg()) ) { break; } else { isSomePkgProced = true; } } return isSomePkgProced; TRY_END; return false; } private: ///包缓存; char m_pPkgBuf[DS_PKG_BUFSIZE]; ///缓存位置指示; volatile unsigned int m_dwTotalSize, m_dwToProc, m_dwDataEnd; ///包处理者:该处理者实现OnPkgRcved( wCmd, &(pBuf[5]), wPkgLen - 5 ),其中对包进行处理; T_PkgProcer m_PkgProcer; bool m_bIsCurrentValid;//本句柄当前是否有效,初始时总是有效的,除非解析包发生了错误,一旦发生错误,则本句柄不再处理任何包直至本句柄被删除; ///通信句柄标识,来自MSG_TO_PUT; int m_nHandleID; ///通信句柄sessionid,来自MSG_TO_PUT,防止两次通信中间通信句柄被重用; int m_nSessionID; };