/** * @file dssocket.h * @brief socket封装 * Copyright(c) 2007,上海第九城市游戏研发部 * All rights reserved * 文件名称: dssocket.h * 摘 要: 用于dsiocp,参考:msdn,<> (second edition) * 作 者: dzj * 完成日期: 2008.05.16-2008.05.19 * */ #pragma once #ifndef DS_SOCKET_H #define DS_SOCKET_H #include "tcache/tcache.h" #ifdef USE_DSIOCP #include #include #include #include using namespace std; class CDsSocket; extern TCache< UniqueObj< CDsSocket > >* g_poolUniDssocket;//保存socket唯一对象的对象池; struct DsIOBuf { static const unsigned int IO_BUF_SIZE = 2048u;//20倍=10k; public: DsIOBuf() : nHandleID(0), nSessionID(0), nToDealLen(0) { //pUniqueSocket = NULL; } ~DsIOBuf() { PoolObjInit(); } PoolFlagDefine() { //if ( NULL != pUniqueSocket ) //{ // if ( NULL != g_poolUniDssocket ) // { // g_poolUniDssocket->Release( pUniqueSocket ); // pUniqueSocket = NULL; // } //} //pUniqueSocket = NULL; nHandleID = 0; nSessionID = 0; nToDealLen = 0; } public: bool PushBuf( const char* pMsgBlk, int nMsgLen ) { if ( ( nToDealLen + nMsgLen ) >= IO_BUF_SIZE ) { return false; } else { memcpy( &(pInnerBuf[nToDealLen]), pMsgBlk, nMsgLen ); nToDealLen += nMsgLen; return true; } } char* GetInnerBuf() { return pInnerBuf; } int GetInnerLen() { return nToDealLen; } void ResetInner() { nToDealLen = 0; } public: inline void SetHandleID( int inHandleID ) { nHandleID = inHandleID; }; inline void SetSessionID( int inSessionID ) { nSessionID = inSessionID; }; inline void SetDealLen( int inToDealLen ) { nToDealLen = inToDealLen; }; private: int nHandleID;/*与该包关联的句柄标识*/ int nSessionID;/*关联句柄校验信息(例如IP地址+端口)没有被中途替换)*/ int nToDealLen; char pInnerBuf[IO_BUF_SIZE];/*消息设为固定大小*/ // UniqueObj< CDsSocket >* pUniqueSocket;//该消息对应的socket; }; extern TCache< DsIOBuf >* g_poolDsIOBuf; ///初始化网络环境; bool InitNetEnv(); ///重置网络环境; void ResetNetEnv(); struct PerIOData; template < typename T_ToSend/*从此队列取待发送消息*/, typename T_RcvTo/*接收消息的存放队列*/, typename T_Socket/*socket类型*/ > class CDsIocp; class CDsSocket { private: CDsSocket( const CDsSocket& ); //屏蔽这两个操作; CDsSocket& operator = ( const CDsSocket& );//屏蔽这两个操作; public: static void Init() { m_globalSessionID = 5000; //SOCKET s = socket( AF_INET, SOCK_STREAM, IPPROTO_TCP ); //DWORD dwBytes = 0; //GUID GuidTransmitFile = WSAID_TRANSMITFILE; //WSAIoctl( s, SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidTransmitFile // , sizeof(GuidTransmitFile), &m_pfnTransmitFile, sizeof(m_pfnTransmitFile), &dwBytes, NULL, NULL ); //if ( NULL == m_pfnTransmitFile ) //{ // //error; // return; //} } public: void SetInitOutstandAcceptNum( long acceptNum ) { ::InterlockedExchange( &m_outstandAcceptNum, acceptNum ); } void DecAcceptNum() { ::InterlockedDecrement( &m_outstandAcceptNum ); } void IncAcceptNum() { ::InterlockedIncrement( &m_outstandAcceptNum ); } long GetAcceptNum() { return m_outstandAcceptNum; } ////1027.debug///初始化pending的send数; //void InitPendingSendNum() //{ // ::InterlockedExchange( &m_outstandSendNum, 0 ); //} ////1027.debug///pending的send数减1; //void DecPendingSendNum() //{ // ::InterlockedDecrement( &m_outstandSendNum ); //} ////1027.debug///pending的send数加1; //void IncPendingSendNum() //{ // ::InterlockedIncrement( &m_outstandSendNum ); //} ////1027.debug///取pending的send数; //long GetPendingSendNum() //{ // return m_outstandSendNum; //} #define PERSOCK_SENDBUF 1024*10 //10k; public: CDsSocket() { m_pListSendBuf.clear();//初始队列中无待发送数据; //m_pPerIOData = NULL; m_ulSessionID = 0; m_outstandAcceptNum = 0;//for listen socket; ///1027.debug,InitPendingSendNum(); m_hSocket = WSASocket( AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED ); } ~CDsSocket(); PoolFlagDefine(); public: SOCKET& GetSocket() { return m_hSocket; } public: /////设置与本socket关联的最后一个等待中每句柄数据,应该在post时设置,完成端口等待返回时置空,如果在这期间socket被回收,则pPerIOData也可以被回收; //void SetPerIOData( PerIOData* pPerIOData ) //{ // m_pPerIOData = pPerIOData; //} void SetSocketAddr( const char* sockAddr, unsigned short sPort ) { //if ( NULL != m_strAddr )\ //{\ // if ( sizeof(m_strAddr) > strlen(sockAddr) )\ // {\ // strcpy_s( m_strAddr, sizeof(m_strAddr), sockAddr );\ // }\ //} SafeStrCpy( m_strAddr, sockAddr ); m_sPort = sPort; } bool BindPort( unsigned short sPort ) { SOCKADDR_IN tmpAddr; tmpAddr.sin_family = AF_INET; tmpAddr.sin_addr.s_addr = htonl( INADDR_ANY ); tmpAddr.sin_port = htons( sPort ); return ( 0 == bind( m_hSocket, (PSOCKADDR)&tmpAddr, sizeof(tmpAddr) ) ); } public: //hard close socket; void SetCloseNoWait() { LINGER lingerval; lingerval.l_onoff = TRUE; lingerval.l_linger = 0; int issetok = setsockopt( m_hSocket, SOL_SOCKET, SO_LINGER, (const char*)&lingerval, sizeof(lingerval) ); if ( 0 != issetok ) { int nerr = GetLastError(); D_ERROR( "SetCloseNoWait, ERR:%d\n", nerr ); } } ///置为非阻塞; void SetNonBlocking() { unsigned long valul = 1ul; int nRet = ioctlsocket( m_hSocket, FIONBIO, (unsigned long*) &valul); if ( SOCKET_ERROR == nRet ) { int nerr = GetLastError(); D_ERROR( "SetNonBlocking, ERR:%d\n", nerr ); } } void SetBlocking() { unsigned long valul = 0ul; int nRet = ioctlsocket( m_hSocket, FIONBIO, (unsigned long*) &valul); if ( SOCKET_ERROR == nRet ) { DWORD nError = GetLastError(); D_ERROR( "SetBlocking, ERR:%d\n", nError ); } } public: //由于只有在回收时sessionid才会发生变化,暂时对这三个操作不加锁; //pending操作发出前保存本次pending操作对应socket的唯一ID号,以便在pending操作返回后进行校验; //改为每次retrieve后都立即保存, by dzj, 08.06.04; void RsvPendingSessionID() { m_pendingSessionID = m_ulSessionID; } //取pending操作发出之时的sessionID号; unsigned long GetPendingSessionID() { return m_pendingSessionID; } ///pending期间是否已被重用 bool IsReused() { return m_pendingSessionID != m_ulSessionID; } ///AddSendBuff时检查当前socket是否被重用,输入checkSendSessionID来自pUniqueSocket; bool AddSendBuffSessionIDCheck( unsigned long checkSendID ) { bool isCheckOK = ( GetUID() == checkSendID);//当前sessionID确为欲发送至的sessionID; m_checkSendID = checkSendID; return isCheckOK; } ///SendBuff时检查当前socket是否被重用,使用AddSendBuff时保存的m_checkSendSessionID进行校验; bool SendBuffSessionIDCheck() { return (GetUID() == m_checkSendID); } //设置本socket对应一个连入连接; void SetIsInConn() { m_bIsInConn = true;//设置本socket对应一个连入连接; } bool IsInConn() { //是否在被使用中; return m_bIsInConn;//是否为连入连接; } unsigned long GetSessionID() { return m_ulSessionID; } //只有主动连接时才手动置sessionID; void ManualSetSessionID( unsigned long inSessionID ) { m_ulSessionID = inSessionID; } //每次从池中分配时自动置sessionID; void AutoSetSessionID() { ::InterlockedIncrement( &m_globalSessionID ); if ( m_globalSessionID < 5000 )//尽量减少写锁m_sessionIDLock的使用; { //假设同时有3个线程经过Inc以后得到3个SessionID:1,2,3; ACE_GUARD( ACE_Mutex, guard, m_sessionIDLock );//sessionID写锁; if ( m_globalSessionID < 5000 ) { m_globalSessionID = 5000;//第一个进入写锁的人将globalSessionID设成5000; } else { ++m_globalSessionID;//后进的人分别得到5001,5002; } } m_ulSessionID = m_globalSessionID; m_checkSendID = 0;//只在发送数据时才有意义; } void InitIsDisConnecting() { m_isDisConnecting = FALSE; } void SetIsDisConnecting() { m_isDisConnecting = TRUE; } ///是否正在断连中。。。 BOOL IsDisConnecting() { return m_isDisConnecting; } private: ///断连中socket不要发送更多的信息(不需要太严格,主要为了防止消耗过多DSIOBUF,同时应用层也应作限制,即已发主动断连请求的socket不要继续向其写消息), by dzj, 08.10.28; BOOL m_isDisConnecting;//是否正在断连中,断连中socket不要发送更多的信息(不需要太严格,主要为了防止消耗过多DSIOBUF,同时应用层也应作限制,即已发主动断连请求的socket不要继续向其写消息); public: BOOL AddSendBuf( const char* pPkg, int nPkgLen, unsigned long checkSendSessionID/*来自UniqueSocket的checkID*/ ) { ACE_GUARD_RETURN( ACE_Thread_Mutex, guard, m_usingLock, FALSE ); TRY_BEGIN; //UniqueSocket的GetUniqueObj保证发送给目标对象,Socket内部的RsvSessionID保证目标对象没有被重用; if ( IsReused() ) { //socket已被回收,且暂时没被其它连接重用; return FALSE; } if ( IsDisConnecting() ) { //socket正在断开中,不要再写后续消息了; return FALSE; } if ( ! AddSendBuffSessionIDCheck( checkSendSessionID ) ) { //socket已被回收然后又被另一个连接重用; return FALSE; } DsIOBuf* pCurBuf = NULL; if ( !(m_pListSendBuf.empty()) ) { pCurBuf = m_pListSendBuf.back(); } if ( ( NULL == pCurBuf ) //当前发送缓存队列空 || ( !(pCurBuf->PushBuf(pPkg, nPkgLen)) ) //当前发送缓存放不下; ) { pCurBuf = g_poolDsIOBuf->RetrieveOrCreate(); if ( NULL == pCurBuf ) { TRY_BEGIN; D_ERROR( "AddSendBuf时,g_poolDsIOBuf,内存分配失败\n" ); TRY_END; return FALSE; } pCurBuf->SetHandleID( this->GetSessionID() ); pCurBuf->SetSessionID( this->GetSessionID() ); pCurBuf->SetDealLen( 0 ); if ( !(pCurBuf->PushBuf( pPkg, nPkgLen ) ) ) { D_ERROR( "AddSendBuf,不可能错误\n" ); g_poolDsIOBuf->Release( pCurBuf ); return FALSE; } m_pListSendBuf.push_back( pCurBuf );//新分配的发送缓存附到队列尾; } return TRUE; TRY_END; return FALSE; } private: void ReleseSendBuf() { if ( !(m_pListSendBuf.empty()) ) { for ( list< DsIOBuf* >::iterator iter=m_pListSendBuf.begin(); iter!=m_pListSendBuf.end(); ++iter ) { g_poolDsIOBuf->Release( *iter );//回收所有还未发送出去者; } } ResetSendList(); } void ResetSendList() { m_pListSendBuf.clear(); } public: void ResetAppNotified() { m_AppNotified = false; } void SetAppNotified() { m_AppNotified = true; } bool IsAppNotified() { return m_AppNotified; } private: bool m_AppNotified;//关于本socket的连接信息是否通知过了上层应用,如果通知过了,则断连时也要通知; private: list< DsIOBuf* > m_pListSendBuf; private: SOCKET m_hSocket; long m_outstandAcceptNum;//for listen socket; //1027.debug long m_outstandSendNum;//outstand的send数; // PerIOData* m_pPerIOData;//用于在socket删除时同时删去本socket上post出去的最后一个PerIOData; char m_strAddr[32]; unsigned short m_sPort; unsigned long m_ulSessionID;//本socket的唯一ID号; unsigned long m_pendingSessionID;//pending操作发出前本socket的唯一ID号,由于pending操作中该socket可能被回收也可能被重用,pending操作结束后,如果直接使用m_ulSessionID就会导致逻辑错,by dzj, 08.05.25; unsigned long m_checkSendID;//专用于发送操作的校验sessionID,用于防止在AddSendBuff与真正SendInnerBuff之间,socket被重用,从而导致向错误的socket发送数据,实际上,由于这期间的时间非常短,而socket池很大,理论上不太可能socket立即被重用。 bool m_bIsInConn;//本socket是否对应一个连入连接; static long m_globalSessionID; public: bool SendInnerBuf(); private: ///读写锁; ACE_Thread_Mutex m_usingLock;//防止多线程同时操作同一socket; static ACE_Mutex m_sessionIDLock;//全局sessionID写锁; }; #endif //USE_DSIOCP #endif //#ifndef DS_SOCKET_H