#ifndef WIN32 #include "lsocket.h" #ifdef TEST_CODE #include "testplayer.h" #else //TEST_CODE #include "../../GateSrv_IOCP/GateSrv/Player/Player.h" #endif //TEST_CODE #include "apppoolobjs.h" extern bool g_IsSrv;//服务端或客户端; ///显式关闭socket之上的后续消息,主要是一系列epoll消息到来时,有可能处理第1个消息,close fd之前到来了新消息,从而使得返回后继续处理新消息找不到对应的socket对象,虽然实际好象不太可能,但为防万一还是加下; void CLSocket::DelFromEPoll() { if ( NULL == m_epfdArr ) { NewLog( LOG_LEV_INFO, "DelFromEPoll,m_epfdArr数组空" ); return; } int delrst = epoll_ctl( m_epfdArr[m_epfdPos].GetInnerFd(), EPOLL_CTL_DEL, m_fdSocket, NULL ); if ( 0 != delrst ) { NewLog( LOG_LEV_INFO, "DelFromEPoll,从epoll中删去自身%d失败", m_fdSocket ); } return; } ///添加至EPOLL并设置关心事件; void CLSocket::AddToEPoll( int flag ) { if ( NULL == m_epfdArr ) { NewLog( LOG_LEV_INFO, "AddToEPoll,m_epfdArr数组空" ); return; } ///////////////////////////////////////// //写丢失事件防止,参见m_bCareEpollOut注释; m_bCareEPollOut = false; //写丢失事件防止,参见m_bCareEpollOut注释; ///////////////////////////////////////// //已连接; m_careEvent.events = flag; //真正写前清m_bCareEPollOut; //写丢失事件防止,参见m_bCareEpollOut注释; ///////////////////////////////////////// //NewLog( LOG_LEV_INFO, "加入epoll,关心socket%d的%x事件\n", m_fdSocket, flag ); epoll_ctl( m_epfdArr[m_epfdPos].GetInnerFd(), EPOLL_CTL_ADD, m_fdSocket, &m_careEvent ); //int ctlrst = epoll_ctl( m_epfdArr[m_epfdPos].GetInnerFd(), EPOLL_CTL_ADD, m_fdSocket, &m_careEvent ); //NewLog( LOG_LEV_DEBUG, "AddToEPoll, epoll_ctl结果%d,errno:%d", ctlrst, errno ); return; } ///添加EPOLL关心事件; void CLSocket::SetEPollCareFlag( int flag ) { ///////////////////////////////////////// //写丢失事件防止,参见m_bCareEpollOut注释; if ( flag & EPOLLOUT ) { m_bCareEPollOut = true; } //写丢失事件防止,参见m_bCareEpollOut注释; ///////////////////////////////////////// //已连接; m_careEvent.events = flag; //已连接; m_careEvent.events = flag; //NewLog( LOG_LEV_INFO, "关心socket%d的%x事件\n", m_fdSocket, flag ); int ctlrst = epoll_ctl( m_epfdArr[m_epfdPos].GetInnerFd(), EPOLL_CTL_MOD, m_fdSocket, &m_careEvent ); if ( 0!=ctlrst ) { int tmperr = errno; NewLog( LOG_LEV_DEBUG, "长时间断连debug,socket%d,epoll_ctrl_error_1:%d", m_fdSocket, tmperr ); } //int ctlrst = epoll_ctl( m_epfdArr[m_epfdPos].GetInnerFd(), EPOLL_CTL_MOD, m_fdSocket, &m_careEvent ); //NewLog( LOG_LEV_DEBUG, "SetEPollCareFlag,epoll_ctl结果%d,errno:%d", ctlrst, errno ); ///////////////////////////////////////// //写丢失事件防止,参见m_bCareEpollOut注释; if ( m_bCareEPollOut ) { m_careEvent.events |= EPOLLOUT; //NewLog( LOG_LEV_INFO, "关心socket%d的%x事件\n", m_fdSocket, flag ); int ctlrst = epoll_ctl( m_epfdArr[m_epfdPos].GetInnerFd(), EPOLL_CTL_MOD, m_fdSocket, &m_careEvent ); if ( 0!=ctlrst ) { int tmperr = errno; NewLog( LOG_LEV_DEBUG, "长时间断连debug,socket%d,epoll_ctrl_error_2:%d", m_fdSocket, tmperr ); } //int ctlrst = epoll_ctl( m_epfdArr[m_epfdPos].GetInnerFd(), EPOLL_CTL_MOD, m_fdSocket, &m_careEvent ); //NewLog( LOG_LEV_DEBUG, "SetEPollCareFlag,epoll_ctl结果%d,errno:%d", ctlrst, errno ); } //真正写前清m_bCareEPollOut; //写丢失事件防止,参见m_bCareEpollOut注释; ///////////////////////////////////////// return; } ///出错回调(EPOLL线程上下文); bool CLSocket::OnEPollError( int errNum ) { //由于-1!=waitrst,因此tmperr中不是真正的socket错误号,虽然多半是对端关闭,但如果一定要知道真正的socket错误号,可以在OnEPollError中再对此socket作一次操作。 //if ( IsConnecting() ) //{ // NewLog( LOG_LEV_DEBUG, "连接,socket%d,EPOLL_ERROR收到", m_fdSocket, errNum ); // return InnerConnect(); //} NewLog( LOG_LEV_INFO, "socket%d错%x,close...", m_fdSocket, errNum ); return false;//需要关闭; } ///EPOLLHUP回调(EPOLL线程上下文); bool CLSocket::OnEPollHup() { if ( IsNewSocket() ) { //加入epoll成功; NewLog( LOG_LEV_INFO, "socket%d成功加入epoll", m_fdSocket ); return true; } else { if ( IsConnecting() ) { //连接中socket; NewLog( LOG_LEV_INFO, "连接,socket%d EPOLLHUP收到", m_fdSocket ); return InnerConnect(); } else { //错误 NewLog( LOG_LEV_INFO, "OnEPollHup, socket%d", m_fdSocket ); return false; } } return false; } ///EPOLLIN(EPOLL线程上下文) bool CLSocket::OnEPollIn() { if ( IsListening() ) { //监听端口; NewLog( LOG_LEV_INFO, "监听端口epoll in..." ); if ( InnerProcIncomeConn() )//处理新连入连接; { SetEPollCareFlag( EPOLLIN | EPOLLERR | EPOLLONESHOT ); return true; } else { return false; } } //普通接收处理; if ( IsConnected() ) { return InnerRcv(); } return false; } ///EPOLLOUT回调(EPOLL线程上下文); bool CLSocket::OnEPollOut() { ///////////////////////////////////////// //写丢失事件防止,参见m_bCareEpollOut注释; m_bCareEPollOut = false; //写丢失事件防止,参见m_bCareEpollOut注释; ///////////////////////////////////////// if ( IsConnecting() ) { NewLog( LOG_LEV_INFO, "连接,socket%d EPOLLOUT收到", m_fdSocket ); return InnerConnect(); } else { //检查数据发送; if ( IsConnected() ) { NewLog( LOG_LEV_INFO, "socket%d EPOLLOUT,检查待发数据", m_fdSocket ); return InnerSend(); } else { NewLog( LOG_LEV_ERROR, "socket%d EPOLLOUT,未连接socket收到EPollOut", m_fdSocket ); return false; } } return false; } ///关闭socket,并重新初始化; bool CLSocket::InnerClose() { int tmpSocket = m_fdSocket; if ( -1 == m_fdSocket ) { //本来就不是有效的socket; return true; } DelFromEPoll();//首先从epoll中删去,不再接受该socket上到来的消息 int closerst = close( m_fdSocket ); if ( -1 == closerst ) { int tmperr = errno; NewLog( LOG_LEV_ERROR, "socket%d 关闭错误, 可能此socket已关闭,err : %d", tmpSocket, tmperr ); //bug 22修改,具体见'todo.doc', by dzj, 09.06.12. SetNeedActiveExplore();//将自身标记为需要遍历,以便由遍历线程删除自身,因为之前close socket者可能用过了active标记; return false; } else { m_fdSocket = -1; SetCurStat( LS_INVALID );//设置自身在下次遍历时需删除,参见'todo.doc'bug23; NewLog( LOG_LEV_DEBUG, "长时间断连debug,socket%d 端口关闭完成", tmpSocket ); } //bug 22修改,具体见'todo.doc', by dzj, 09.06.12. SetNeedActiveExplore();//将自身标记为需要遍历,以便由遍历线程删除自身; return true; } ///处理新连入连接; bool CLSocket::InnerProcIncomeConn() { sockaddr_in cliaddr; socklen_t addrlen = sizeof(cliaddr); while ( 1 ) { int newsock = accept( m_fdSocket, (struct sockaddr*)(&cliaddr), &addrlen ); if ( -1 == newsock ) { int tmperr = errno; if ( (EWOULDBLOCK == tmperr) || ( EINPROGRESS == tmperr ) ) { NewLog( LOG_LEV_INFO, "InnerProcIncomeConn,socket%d,care读", m_fdSocket ); return true;//没有新的待处理连接; } else { NewLog( LOG_LEV_ERROR, "监听端口:socket%d accept错%d", m_fdSocket, tmperr ); if ( EMFILE == tmperr ) { PrintFdNum(); } return false;//需要断开; } } //正常连入连接建立 CLSocket* newLSock = R_PLAYER::CreateSelfIns(); if ( NULL == newLSock ) { NewLog( LOG_LEV_ERROR, "InnerProcIncomeConn:无法建立新的LSocket,关闭新收到连接socket%d", newsock ); close( newsock ); continue; } newLSock->SetRemoteAddr( inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port) ); if ( !( newLSock->AddSelfToFrame( GetManFrame() ) //使用监听端口的ManFrame; ) ) { //delete newLSock; newLSock->DestorySelfIns(); newLSock = NULL; continue; } newLSock->SetEPollArrWithFd( newsock, m_epfdArr, m_epfdArrSize ); NewLog( LOG_LEV_INFO, "InnerProcIncomeConn,socket%d,位置%d", newLSock->GetInnerSocket(), newLSock->GetPosInfo() ); } return false;//不可能到此处; } ///监听本地地址; bool CLSocket::InnerListen() { if ( NULL == m_localAddr ) { NewLog( LOG_LEV_ERROR, "socket%d 绑定地址时,本地地址为空,无法绑定", m_fdSocket ); return false; } //将自身设为REUSEADDR int isReuseAddr = 1; int setreuse = setsockopt( m_fdSocket, SOL_SOCKET, SO_REUSEADDR, &isReuseAddr, sizeof(isReuseAddr) ); if ( 0 != setreuse ) { int tmperr = errno; NewLog( LOG_LEV_ERROR, "socket%d 设重用地址选项,错误:%d", m_fdSocket, tmperr ); return false; } int isNoDelay = 1; int setnodelay = setsockopt( m_fdSocket, IPPROTO_TCP, TCP_NODELAY, &isNoDelay, sizeof(isNoDelay) ); if ( 0 != setnodelay ) { int tmperr = errno; NewLog( LOG_LEV_ERROR, "socket%d 设TCP_NODELAY选项,错误:%d", m_fdSocket, tmperr ); return false; } in_addr bindadr;; sockaddr_in localAddr; localAddr.sin_family = AF_INET; localAddr.sin_port = htons( m_localPort ); unsigned long atorst = inet_aton( m_localAddr, &bindadr ); if ( INADDR_NONE==atorst ) { NewLog( LOG_LEV_ERROR, "InnerListen,socket%d,inet_aton错,无效地址%s", m_fdSocket, m_localAddr ); return false; } memcpy( &(localAddr.sin_addr), &(bindadr.s_addr), 4 ); int bindret = bind( m_fdSocket, (const sockaddr*)&localAddr, sizeof(localAddr) ); if ( -1 == bindret ) { int tmperr = errno; if ( tmperr == EADDRINUSE ) { NewLog( LOG_LEV_ERROR, "socket%d 绑定地址%s:%d,重复绑定,可能旧资源未完全释放,请稍等", m_fdSocket, m_localAddr, m_localPort ); } else { NewLog( LOG_LEV_ERROR, "socket%d 绑定地址%s:%d错误, err : %d", m_fdSocket, m_localAddr, m_localPort, tmperr ); } return false; } int listenret = listen( m_fdSocket, 1 ); if ( -1 == listenret ) { int tmperr = errno; NewLog( LOG_LEV_ERROR, "socket%d 监听地址%s:%d错误, err : %d", m_fdSocket, m_localAddr, m_localPort, tmperr ); return false; } SetCurStat( LS_LISTENING ); SetEPollCareFlag( EPOLLIN | EPOLLERR | EPOLLONESHOT ); NewLog( LOG_LEV_INFO, "InnerListen,socket%d,care读", m_fdSocket ); return true; } ///重连远端地址; bool CLSocket::InnerConnect() { in_addr remoteadr; sockaddr_in srvAddr; srvAddr.sin_family = AF_INET; srvAddr.sin_port = htons( m_remotePort ); unsigned long atorst = inet_aton( m_remoteAddr, &remoteadr ); if ( INADDR_NONE==atorst ) { NewLog( LOG_LEV_ERROR, "InnerConnect,socket%d,inet_aton错,无效地址%s", m_fdSocket, m_localAddr ); return false; } memcpy( &(srvAddr.sin_addr), &(remoteadr.s_addr), 4 ); int connret = connect( m_fdSocket, (const sockaddr*)&srvAddr, sizeof(srvAddr) ); if ( -1 == connret ) { int tmperr = errno; //连接失败 NewLog( LOG_LEV_INFO, "socket%d尝试连接, err:%d", m_fdSocket, tmperr ); if ( EINPROGRESS == tmperr ) { //非阻塞socket下的经常情形; SetCurStat( LS_CONNECTING ); NewLog( LOG_LEV_INFO, "socket%d,尝试连接置EPOLLOUT事件", m_fdSocket ); SetEPollCareFlag( EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLONESHOT ); NewLog( LOG_LEV_INFO, "InnerConnect,socket%d,care连入", m_fdSocket ); } else if ( EALREADY == tmperr ) { //已连接; NewLog( LOG_LEV_DEBUG, "socket%d,异步连接成功", m_fdSocket ); SetCurStat( LS_CONNECTED ); } else { //其它错误; NewLog( LOG_LEV_ERROR, "InnerConnect,连接%s:%d失败%d,close...", m_remoteAddr, m_remotePort, tmperr ); return false; } } else { //连接成功 NewLog( LOG_LEV_INFO, "socket%d,直接连接成功", m_fdSocket ); SetCurStat( LS_CONNECTED ); } return true; } ///内部数据发送(真正向端口发送); bool CLSocket::InnerSend() { if ( !IsConnected() ) { NewLog( LOG_LEV_ERROR, "InnerSend,非连接socket执行发送操作" ); return false; } EPollSendPre();//发送消息遗漏防止; NewLog( LOG_LEV_INFO, "InnerSend, socket%d发送数据", m_fdSocket ); bool isNeedClose = false;//是否检测到错误需要关闭socket; //先将上次未发完者发完,然后再尝试发送待发队列中数据; if ( !SendFirstSendQue( isNeedClose ) ) { //这次仍然没有发完,下次再发; return (!isNeedClose);//如果需要close,则返回失败,表明需要close; } //旧数据已全部发完,将待发数据取到优先发送队列; bool isNeedMoreSend = true; bool isSendOK = true; while ( isNeedMoreSend ) { //漏洞是如果最后一个LS_FIRSTQUE_SIZE位置正好是队中最后一个消息,则会多执行一次PopEle,但这也没有影响 isNeedMoreSend = false;//除非待发送数据非常多,超过了LS_FIRSTQUE_SIZE,且已取出的数据发送顺利 int popelenum = 0; m_pSendQueue->PopEle( m_arrSendFirst, LS_FIRSTQUE_SIZE, popelenum ); if ( popelenum <= 0 ) { break;//没有新的待发数据; } //取到了待发数据; m_sendfirstSt = 0; m_sendfirstEnd = popelenum;//优先队列末尾位置 isSendOK = SendFirstSendQue( isNeedClose ); if ( isNeedClose ) { return false; } if ( ( popelenum>= LS_FIRSTQUE_SIZE ) //除非待发送数据非常多,超过了LS_FIRSTQUE_SIZE,且已取出的数据发送顺利 && ( isSendOK ) ) { isNeedMoreSend = true; } } NewLog( LOG_LEV_INFO, "InnerSend, socket%d本次发送结束", m_fdSocket ); EPollSendBH();//发送消息遗漏防止; return true; } ///内部数据接收(真正从端口接收); bool CLSocket::InnerRcv() { /* NewMsgToPut m_arrRcv[LS_RCVBUF_NUM];//接收队列(仅由EPOLL线程上下文使用修改); iovec m_arrRcvTmp[LS_RCVBUF_NUM];//接收队列(仅由EPOLL线程上下文使用修改); int m_rcvfirstEnd;//接收队列的结束位置(<而不是<=) */ //使用m_arrRcvTmp接收,接收完成后分配新的NewMsgToPut往上层传; while ( true/*暂时不用isNeedCont,永远收到block为止*/ ) //一直收到无数据可收为止(底层可能还有需要收的数据,前次接收LS_RCVBUF_NUM中的接收缓存不够;); { ssize_t readlen = recv( m_fdSocket, m_rcvBuf.GetInnerBuf(), m_rcvBuf.GetInnerBufSize(), 0 ); if ( 0 == readlen ) { //远端断开; NewLog( LOG_LEV_INFO, "socket%d,收到0字节,close...", m_fdSocket ); return false;//需要断开连接; } else if ( 0 < readlen ) { NewLog( LOG_LEV_INFO, "InnerRcv,socket%d,接收到%d字节", m_fdSocket, readlen ); //改为在lsrecv中记录,以防止多线程操作此变量,m_lastrcvTick = GetTickCount();//记录最后收包时刻; m_rcvBuf.AddRcved( readlen ); bool isAppNeedNoti = false; #ifdef USE_CRYPT if ( ! m_rcvBuf.SubmitValidPkg( m_Crypt, m_pRcvQueue, isAppNeedNoti ) ) #else //USE_CRYPT if ( ! m_rcvBuf.SubmitValidPkg( m_pRcvQueue, isAppNeedNoti ) ) #endif //USE_CRYPT { return false; } if ( isAppNeedNoti ) { SetNeedActiveExplore(); } continue; } else { int tmperr = errno; //若为IOPENDING,则继续置读标记,否则为出错,断开socket if ( ( EWOULDBLOCK == tmperr ) || ( EAGAIN == tmperr ) ) { NewLog( LOG_LEV_INFO, "socket%d,EWOULDBLOCK置下次收事件", m_fdSocket ); SetEPollCareFlag( EPOLLIN | EPOLLERR | EPOLLONESHOT ); NewLog( LOG_LEV_INFO, "InnerRcv,socket%d,care下次读", m_fdSocket ); return true;//准备下次继续接收; } else { NewLog( LOG_LEV_INFO, "socket%d, read错误%d", m_fdSocket, tmperr ); return false;//出错需要断开 } } } return false;//不可能到此处; } ///发送优先队列中数据,如果全部发完,则返回真,否则返回假; bool CLSocket::SendFirstSendQue( bool& isNeedClose ) { isNeedClose = false;//只有发送时检测到错误才关闭; /* NewMsgToPut* m_arrSendFirst[LS_SENDFIRST_SIZE];//优先发送队列(仅由EPOLL线程上下文使用修改); int m_sendfirstSt;//优先发送队列的发送起始位置 int m_sendfirstEnd;//优先发送队列的发送结束位置(<而不是<=) */ if ( m_sendfirstEnd <= m_sendfirstSt ) { //无待发送数据; //NewLog( LOG_LEV_INFO, "socket%d, 优先队列无可发数据", m_fdSocket ); SetEPollCareFlag( EPOLLIN | EPOLLERR | EPOLLONESHOT ); NewLog( LOG_LEV_INFO, "SendFirstSendQue,socket%d,写完care下次读", m_fdSocket ); return true; } //优先发送队列中有待发数据 int tosendnum = m_sendfirstEnd-m_sendfirstSt; for ( int i=m_sendfirstSt; imsgBuf;//转到发送数组中去,以适应writev接口; m_arrSendFirstTmp[i].iov_len = m_arrSendFirst[i]->msgLen; if ( i>=tosendnum ) { //待发数据已拷贝完毕; break; } } ssize_t sendlen = writev( m_fdSocket, &(m_arrSendFirstTmp[m_sendfirstSt]), tosendnum ); if ( -1 == sendlen ) { int tmperr = errno; //若为IOPENDING,则继续置读标记,否则为出错,断开socket if ( ( EWOULDBLOCK == tmperr ) || ( EAGAIN == tmperr ) ) { //写失败,下次再写; int tmperr = errno; NewLog( LOG_LEV_INFO, "socket%d, 写失败%d,置下次可写通知", m_fdSocket, tmperr ); SetEPollCareFlag( EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLONESHOT ); NewLog( LOG_LEV_INFO, "SendFirstSendQue,socket%d,写失败care下次写", m_fdSocket ); return false; } else { NewLog( LOG_LEV_ERROR, "socket%d, write错误%d,可能socket已关闭", m_fdSocket, tmperr ); isNeedClose = true; return false; } } //以下写正常; NewLog( LOG_LEV_INFO, "socket%d, 已写数据长度:%d", m_fdSocket, sendlen ); bool isallsend = true;//是否优先队列中所有数据都已发送? //将已发送数据一个个删掉; for ( int i=m_sendfirstSt; imsgLen; if ( sendlen < curmsglen )//当前消息尚未发完; { int remained = curmsglen - sendlen;//尚未发完的消息长度; //当前消息还未发送完毕,new 一个NewMsgToPut存放未发完信息,替换已部分发送信息,然后返回; NewMsgToPut* newtosend = g_MsgToPutPool->DsRetrieve( MsgToPutNO ); if ( NULL == newtosend ) { NewLog( LOG_LEV_ERROR, "SendFirstSendQue, 分配NewMsgToPut内存失败" ); isNeedClose = true; //优先队列中信息已全部发送; m_sendfirstSt = m_sendfirstEnd = 0;//重置优先发送队列; return false; } //NewLog( LOG_LEV_DEBUG, "SendFirstSendQue,取NewMsgToPut成功" ); memcpy( newtosend->msgBuf, (char*)(m_arrSendFirst[i]->msgBuf)+sendlen, remained );//复制尚未发完消息; newtosend->msgLen = remained; //delete m_arrSendFirst[i]; g_MsgToPutPool->DsRelease( MsgToPutNO, m_arrSendFirst[i] ); m_arrSendFirst[i] = NULL; m_arrSendFirst[i] = newtosend;//剩余待发消息替换已部分发送消息; m_sendfirstSt = i;//下次的发送起点; isallsend = false; NewLog( LOG_LEV_INFO, "socket%d, 部分发送1,置下次再写通知", m_fdSocket ); SetEPollCareFlag( EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLONESHOT ); NewLog( LOG_LEV_INFO, "SendFirstSendQue,socket%d,部分写1care下次写", m_fdSocket ); return false;//只发送了一部分,未全部发送完毕; } //m_arrSendFirst[i]对应的消息已完全发送,删去此消息; //delete m_arrSendFirst[i]; g_MsgToPutPool->DsRelease( MsgToPutNO, m_arrSendFirst[i] ); m_arrSendFirst[i] = NULL; sendlen -= curmsglen;//已发送数据长度中减去m_arrSendFirst[i]的消息长度; m_sendfirstSt = i+1;//下次发送起点; } if ( !isallsend ) { NewLog( LOG_LEV_INFO, "socket%d, 部分发送2,置下次再写通知", m_fdSocket ); SetEPollCareFlag( EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLONESHOT ); NewLog( LOG_LEV_INFO, "SendFirstSendQue,socket%d,部分写2care下次写", m_fdSocket ); return false;//只发送了一部分,未全部发送完毕; } //优先队列中信息已全部发送; m_sendfirstSt = m_sendfirstEnd = 0;//重置优先发送队列; SetEPollCareFlag( EPOLLIN | EPOLLERR | EPOLLONESHOT ); NewLog( LOG_LEV_INFO, "SendFirstSendQue,socket%d,写成功care下次读", m_fdSocket ); return true; } ///监听本地端口(应用线程上下文); bool CLSocket::LSListen( const char* localAddr, unsigned short localPort ) { if ( NULL == localAddr ) { NewLog( LOG_LEV_ERROR, "LSListen,输入的本地地址空" ); return false; } if ( strlen( localAddr )+1 >= sizeof(m_localAddr) ) { NewLog( LOG_LEV_ERROR, "LSListen,输入的本地地址过长" ); return false; } memcpy( m_localAddr, localAddr, strlen(localAddr)+1 ); m_localPort = localPort; InnerListen(); return true; } ///设置socket对应的远端地址; bool CLSocket::SetRemoteAddr( const char* remoteAddr, unsigned short remotePort ) { if ( NULL == remoteAddr ) { NewLog( LOG_LEV_ERROR, "SetRemoteAddr,输入的远端地址空" ); return false; } if ( strlen( remoteAddr )+1 >= sizeof(m_remoteAddr) ) { NewLog( LOG_LEV_ERROR, "SetRemoteAddr,输入的远端地址过长" ); return false; } memcpy( m_remoteAddr, remoteAddr, strlen(remoteAddr)+1 ); m_remotePort = remotePort; return true; } ///连接远端地址(应用线程上下文); bool CLSocket::LSConnect( const char* remoteAddr, unsigned short remotePort ) { if ( NULL == remoteAddr ) { NewLog( LOG_LEV_ERROR, "LSConnect,输入的远端地址空" ); return false; } if ( strlen( remoteAddr )+1 >= sizeof(m_remoteAddr) ) { NewLog( LOG_LEV_ERROR, "LSConnect,输入的远端地址过长" ); return false; } if ( !SetRemoteAddr( remoteAddr, remotePort ) ) { NewLog( LOG_LEV_ERROR, "LSConnect,输入的远端地址错误" ); return false; } if ( !InnerConnect() ) { NewLog( LOG_LEV_WARNING, "socket%d连接远端(%s:%d)失败,准备断连", GetInnerSocket(), remoteAddr, remotePort ); IssueDestory(); return false; } return true; } #ifdef USE_CRYPT ///信息加密发送,并且会在内部拷贝一份待发信息,因此pToSend可为自动变量,调用完毕后可被释放或重用; bool CLSocket::LSSendMsgCryptInnerCpy( char* pToSend, int toSendLen ) { if ( ( NULL == pToSend ) || ( toSendLen <= 0 ) ) { NewLog( LOG_LEV_ERROR, "CLSocket::LSSendMsgCryptInnerCpy,socket%d,发送参数错误", m_fdSocket ); return false; } if ( ( !IsConnected() ) || ( NULL == m_pSendQueue ) || ( m_bActiveClose ) //已置断连标记,不再向外发送数据 ) { //由于pToSend由调用本函数者管理,因此不必与LSSendMsg一样释放这些待发数据; //NewLog( LOG_LEV_INFO, "CLSocket::LSSendMsg,socket%d,向未连接socket发送数据,或发送队列空", m_fdSocket ); return false; } NewMsgToPut* pMsg = g_MsgToPutPool->DsRetrieve( MsgToPutNO ); if ( NULL == pMsg ) { NewLog( LOG_LEV_ERROR, "CLSocket::LSSendMsgCryptInnerCpy,socket%d,分配发送用NewMsgToPut失败" ); return false; } //加密标志检测; bool isStSendCrypt = false; if ( sizeof(StEncDec) == toSendLen ) { if ( !(m_Crypt.IsStSendCrypt()) ) { //尚未开始发送加密,检测是否为开启发送加密指示; StEncDec* tmpSend = (StEncDec*) (pToSend); if ( tmpSend->IsValidStEncDec() ) { //往后发包为加密包,先以明文发送本包,本包发送完毕后,将设置加密开始,以后的包将加密发送; isStSendCrypt = true; } } } if ( !isStSendCrypt ) { m_Crypt.EncryptSendPkg( (const unsigned char*)pToSend, toSendLen, (unsigned char*)pMsg->msgBuf, sizeof(pMsg->msgBuf), (unsigned int&)pMsg->msgLen ); //if ( m_Crypt.IsStSendCrypt() ) //{ // NewLog( LOG_LEV_DEBUG, "发送加密包,%d:%d", toSendLen, pMsg->msgLen ); //} else { // NewLog( LOG_LEV_DEBUG, "发送明文包,%d:%d", toSendLen, pMsg->msgLen ); //} } else { memcpy( pMsg->msgBuf, pToSend, toSendLen );//加密标记原样发送; pMsg->msgLen = toSendLen; //NewLog( LOG_LEV_DEBUG, "发送加密开始标记,%d:%d", toSendLen, pMsg->msgLen ); m_Crypt.StSendCrypt(); } //将待发信息压入发送缓存,然后添加关心EPOLLOUT事件; bool isPushOK = false; int trynum = 0; do { ++trynum; if ( trynum > 500 ) { NewLog( LOG_LEV_WARNING, "CLSocket::LSSendMsg,socket%d,连续push失败,准备断连", m_fdSocket ); //释放待发数据; g_MsgToPutPool->DsRelease( MsgToPutNO, pMsg ); IssueDestory(); return false; } isPushOK = m_pSendQueue->PushEle( &pMsg, 1 ); } while ( !isPushOK ); //NewLog( LOG_LEV_INFO, "CLSocket::LSSendMsg,socket%d,置可写通知", m_fdSocket ); AppSendPost();//发送消息遗漏防止; return true; } #endif //USE_CRYPT ///向远端发送系列消息(应用线程上下文); bool CLSocket::LSSendMsg( NewMsgToPut** pToSend, int toSendNum ) { if ( ( NULL == pToSend ) || ( toSendNum <= 0 ) ) { NewLog( LOG_LEV_ERROR, "CLSocket::LSSendMsg,socket%d,发送参数错误", m_fdSocket ); return false; } if ( ( !IsConnected() ) || ( NULL == m_pSendQueue ) || ( m_bActiveClose ) //已置断连标记,不再向外发送数据 ) { //if ( IsListening() ) //{ // NewLog( LOG_LEV_ERROR, "CLSocket::LSSendMsg, 错误:监听端口试图发送数据" ); //} for ( int i=0; iDsRelease( MsgToPutNO, pToSend[i] ); pToSend[i] = NULL; } //NewLog( LOG_LEV_INFO, "CLSocket::LSSendMsg,socket%d,向未连接socket发送数据,或发送队列空", m_fdSocket ); return false; } //将待发队列压入发送缓存,然后添加关心EPOLLOUT事件; bool isPushOK = false; int trynum = 0; do { ++trynum; if ( trynum > 500 ) { NewLog( LOG_LEV_WARNING, "CLSocket::LSSendMsg,socket%d,连续push失败,准备断连", m_fdSocket ); for ( int i=0; iDsRelease( MsgToPutNO, pToSend[i] ); pToSend[i] = NULL; } IssueDestory(); return false; } isPushOK = m_pSendQueue->PushEle( pToSend, toSendNum ); } while ( !isPushOK ); //NewLog( LOG_LEV_INFO, "CLSocket::LSSendMsg,socket%d,置可写通知", m_fdSocket ); AppSendPost();//发送消息遗漏防止; return true; } ///从远端收系列消息(应用线程上下文); bool CLSocket::LSRcvMsg( NewMsgToPut** pToRcv, const int rcvBufNum, int& rcvMsgNum ) { //有可能是正常,即socket关闭之前收到了消息,此时应该将这些消息取出来递出,if ( !IsConnected() ) //{ // rcvMsgNum = 0; // return false; //} rcvMsgNum = 0; if ( ( NULL == pToRcv ) || ( rcvBufNum <= 0 ) ) { NewLog( LOG_LEV_ERROR, "CLSocket::LSRcvMsg,socket%d,接收参数错误", m_fdSocket ); rcvMsgNum = 0; return false; } if ( NULL == m_pRcvQueue ) { NewLog( LOG_LEV_ERROR, "CLSocket::LSRcvMsg,socket%d,接收队列空", m_fdSocket ); rcvMsgNum = 0; return false; } //从接收队列取尽可能多消息; m_pRcvQueue->PopEle( pToRcv, rcvBufNum, rcvMsgNum ); if ( rcvMsgNum > 0 ) { m_lastrcvTick = GetTickCount();//记录最后收包时刻; } return true; } ///新创建socket成功时回调; void CLSocket::OnNewCreated() { AddToEPoll( EPOLLIN|EPOLLERR|EPOLLONESHOT ); return; } ///连接远端成功; void CLSocket::OnConnected( const char* remoteAddr, unsigned short remotePort ) { NewLog( LOG_LEV_INFO, "socket%d,连接远端成功,%s:%d", m_fdSocket, remoteAddr, remotePort ); //已连接; //NewLog( LOG_LEV_INFO, "socket%d,连接成功置下次收事件", m_fdSocket ); SetEPollCareFlag( EPOLLIN | EPOLLERR | EPOLLONESHOT );//等待接收事件; NewLog( LOG_LEV_INFO, "OnConnected,socket%d,连成功care下次读", m_fdSocket ); SetNeedActiveExplore();//让应用可以检测到此连接成功事件; return; } #include void CLSocket::PrintFdNum() { struct rlimit rlp; getrlimit( RLIMIT_NOFILE, &rlp ); NewLog( LOG_LEV_DEBUG, "当前fd数目限制%d", rlp.rlim_max ); rlp.rlim_cur += 100; int rst = setrlimit( RLIMIT_NOFILE, &rlp ); NewLog( LOG_LEV_DEBUG, "新设fd数目限制为%d,结果%d", rlp.rlim_max, rst ); sleep( 2 ); exit(0); return; } ///使用输入socket作为内部socket; void CLSocket::CreateInnerSocketByFd( int sockfd ) { m_fdSocket = sockfd; int flags = 1; ioctl( m_fdSocket, FIONBIO, &flags ); m_epfdPos = HashSocketFD( m_fdSocket ); //置自身初始状态; NewInconnSetCurStat();//连入连接直接将状态设为已连接,并加入epoll; return; } ///建内部socket; void CLSocket::CreateInnerSocket() { //建连接用socket; m_fdSocket = socket( AF_INET, SOCK_STREAM, 0 ); int tmperr = errno; if ( -1 == m_fdSocket ) { NewLog( LOG_LEV_ERROR, "CLSocket::CreateInnerSocket,创建socket句柄失败%d", tmperr ); if ( EMFILE == tmperr ) { PrintFdNum(); } return; } int flags = 1; ioctl( m_fdSocket, FIONBIO, &flags ); //close no linger //矛盾:为了防止恶意者故意不进行close四步中的某一步,或对方因某种原因无法完成close,导致server端TCP底层不能迅速释放资源终止连接,需要noclinger, // 但这样可能会有被终止连接上的重复分节被不正确地递送到新化身的问题(RFC1337),'unix网络编程',page173; // 现优先考虑前种情况; struct linger tmplinger; tmplinger.l_onoff = 1; tmplinger.l_linger = 0; int rst = setsockopt( m_fdSocket, SOL_SOCKET, SO_LINGER, &tmplinger, sizeof(tmplinger) ); if ( 0 != rst ) { NewLog( LOG_LEV_ERROR, "socket:%d,置LINGER失败!", m_fdSocket ); } //int isNoDelay = 1; //int setnodelay = setsockopt( m_fdSocket, IPPROTO_TCP, TCP_NODELAY, &isNoDelay, sizeof(isNoDelay) ); //if ( 0 != setnodelay ) //{ // int tmperr = errno; // NewLog( LOG_LEV_ERROR, "socket%d 设TCP_NODELAY选项,错误:%d", m_fdSocket, tmperr ); // return; //} m_epfdPos = HashSocketFD( m_fdSocket ); NewOutconnSetCurStat();//新建socket将状态设为新socket,并加入epoll; return; } ///释放内部队列中未处理完毕的消息; bool CLSocket::ReleaseInnerQueue() { int arrSize = 10; NewMsgToPut** useArr = NEW NewMsgToPut*[arrSize]; int validNum = 0; if ( NULL != m_pRcvQueue ) { do { m_pRcvQueue->PopEle( useArr, arrSize, validNum ); if ( validNum > 0 ) { NewLog( LOG_LEV_INFO, "ReleaseInnerQueue,未处理完的接收队列%d条消息!", validNum ); for ( int i=0; iDsRelease( MsgToPutNO, useArr[i] ); useArr[i] = NULL; } } } while ( validNum == arrSize );//还有消息在队列中,则继续处理完; } if ( NULL != m_pSendQueue ) { do { m_pSendQueue->PopEle( useArr, arrSize, validNum ); if ( validNum > 0 ) { NewLog( LOG_LEV_INFO, "ReleaseInnerQueue,未处理完的发送队列%d条消息!", validNum ); for ( int i=0; iDsRelease( MsgToPutNO, useArr[i] ); useArr[i] = NULL; } } } while ( validNum == arrSize );//还有消息在队列中,则继续处理完; } delete [] useArr; useArr = NULL; for ( int i=m_sendfirstSt; iDsRelease( MsgToPutNO, m_arrSendFirst[i] ); m_arrSendFirst[i] = NULL; } m_sendfirstSt = m_sendfirstEnd = 0; return true; } ///被CManFrame主线程调用,一般是之前将自身设为活动的结果(//注意:有可能当前欲遍历句柄实际已无效,具体参见CManFrame中ClearActiveFlag中的说明); bool CLSocket::OnActiveBeExplored() { NewLog( LOG_LEV_INFO, "CLSocket::OnActiveBeExplored,自身在框架中ID:%d", GetSelfFrameID() ); int rcvArrSize = 10; NewMsgToPut** rcvArr = NEW NewMsgToPut*[rcvArrSize]; int rcvMsgNum = 0; if ( !IsConnected() ) { NewLog( LOG_LEV_INFO, "CLSocket::OnActiveBeExplored,未连接socket被遍历" ); do { LSRcvMsg( rcvArr, rcvArrSize, rcvMsgNum ); if ( rcvMsgNum > 0 ) { NewLog( LOG_LEV_INFO, "关闭时,socket%d已收到%d条消息,收完!", m_fdSocket, rcvMsgNum ); for ( int i=0; imsgBuf[2]) ); unsigned short wCmd = rcvArr[i]->msgCmd;//本次消息的命令字; unsigned int timeInfo = rcvArr[i]->timeInfo;//本次消息的客户端发出时间,时间协商之前为0; #ifdef TEST_CODE bool isBroPkg = false; unsigned int pkgid = 0; if ( g_IsSrv ) { //服务端,收到的应该为CGChat; if ( wCmd != CGChat::wCmd ) { NewLog( LOG_LEV_DEBUG, "服务器,OnActiveBeExplored,调试消息解析错1" ); } CGChat* rcvMsg = (CGChat*) (rcvArr[i]->msgBuf); pkgid = rcvMsg->targetPlayerID.dwPID;//包号; NewLog( LOG_LEV_INFO, "收到消息:%s", rcvMsg->strChat ); } else { //客户端,收到的应该为GCChat if ( wCmd != GCChat::wCmd ) { NewLog( LOG_LEV_DEBUG, "客户端,OnActiveBeExplored,调试消息解析错1" ); } GCChat* rcvMsg = (GCChat*) (rcvArr[i]->msgBuf); pkgid = rcvMsg->sourcePlayerID.dwPID;//包号; NewLog( LOG_LEV_INFO, "收到消息:%s", rcvMsg->strChat ); isBroPkg = rcvMsg->sourcePlayerID.wGID>0;//测试广播消息,是否广播包; if ( isBroPkg ) { if ( 0 == m_brorcvid ) { //新元素收到的第一个广播包; m_brorcvid = rcvMsg->sourcePlayerID.dwPID - 1;//赋初值,以通过下面的检查; } if ( (m_brorcvid+1) != rcvMsg->sourcePlayerID.dwPID ) { NewLog( LOG_LEV_ERROR, "关闭消息处理,广播消息收包序号%d(%d)错", pkgid, m_brorcvid ); sleep( 2 );//int jjj exit(0); } ++m_brorcvid; } } OnRcvedMsg( isBroPkg, pkgid, rcvArr[i]->msgLen ); #else //TEST_CODE OnPkgRcved( timeInfo, wCmd, rcvArr[i]->msgBuf, rcvArr[i]->msgLen );//直接上递; #endif //TEST_CODE //delete rcvArr[i]; g_MsgToPutPool->DsRelease( MsgToPutNO, rcvArr[i] ); rcvArr[i] = NULL; } } } while ( rcvMsgNum == rcvArrSize );//还有待收消息在队列中,则继续收完为止; delete [] rcvArr; rcvArr = NULL; return false;//指示ManFrame此socket已无效,应删去相应指针; } if ( !m_isConnedInitExeced ) { //初次连接成功,给应用对象一个执行相应初始化操作的机会; m_isConnedInitExeced = true; OnConnedInit(); } //以下正常遍历; do { LSRcvMsg( rcvArr, rcvArrSize, rcvMsgNum ); if ( rcvMsgNum > 0 ) { NewLog( LOG_LEV_INFO, "socket%d收到%d条消息, echo...", m_fdSocket, rcvMsgNum ); for ( int i=0; imsgBuf[2]) ); unsigned short wCmd = rcvArr[i]->msgCmd;//本次消息的命令字; unsigned int timeInfo = rcvArr[i]->timeInfo;//本次消息的客户端发出时间,时间协商之前为0; #ifdef TEST_CODE bool isBroPkg = false; unsigned int pkgid = 0; if ( g_IsSrv ) { //服务端; if ( wCmd != CGChat::wCmd ) { NewLog( LOG_LEV_DEBUG, "服务器,OnActiveBeExplored,调试消息解析错1" ); } CGChat* rcvMsg = (CGChat*) (rcvArr[i]->msgBuf); pkgid = rcvMsg->targetPlayerID.dwPID;//包号; NewLog( LOG_LEV_INFO, "收到消息:%s", rcvMsg->strChat ); } else { //客户端 if ( wCmd != GCChat::wCmd ) { NewLog( LOG_LEV_DEBUG, "客户端,OnActiveBeExplored,调试消息解析错1" ); } GCChat* rcvMsg = (GCChat*) (rcvArr[i]->msgBuf); pkgid = rcvMsg->sourcePlayerID.dwPID;//包号; NewLog( LOG_LEV_INFO, "收到消息:%s", rcvMsg->strChat ); isBroPkg = rcvMsg->sourcePlayerID.wGID>0;//测试广播消息,是否广播包; if ( isBroPkg ) { if ( 0 == m_brorcvid ) { //新元素收到的第一个广播包; m_brorcvid = rcvMsg->sourcePlayerID.dwPID - 1;//赋初值,以通过下面的检查; } if ( (m_brorcvid+1) != rcvMsg->sourcePlayerID.dwPID ) { NewLog( LOG_LEV_ERROR, "正常消息处理,广播消息收包序号%d(%d)错", pkgid, m_brorcvid ); sleep( 2 );//int jjj exit(0); } ++m_brorcvid; } } OnRcvedMsg( isBroPkg, pkgid, rcvArr[i]->msgLen ); #else //TEST_CODE OnPkgRcved( timeInfo, wCmd, rcvArr[i]->msgBuf, rcvArr[i]->msgLen );//直接上递; #endif //TEST_CODE //delete rcvArr[i]; g_MsgToPutPool->DsRelease( MsgToPutNO, rcvArr[i] ); rcvArr[i] = NULL;//不再直接echo,改为更加可控的定时发送; } //LSSendMsg( rcvArr, rcvMsgNum );//不再直接echo,改为更加可控的定时发送; } } while ( rcvMsgNum == rcvArrSize );//还有待收消息在队列中,则继续收完为止; delete [] rcvArr; rcvArr = NULL; return true; }; ///被CManFrame调用,一般为定时任务; bool CLSocket::OnValidBeExplored() { if ( IsListening() ) { #ifdef TEST_CODE if ( g_IsSrv ) { //服务器端,测试向客户端广播消息 static const char* sendmsg = "hello, just for test!hello, just for test!hello, just for test!hello, just for test!"; static unsigned broedid = 0; ++broedid; GCChat testmsg;//监听端口,服务器,发送GC信息 memset( &testmsg, 0, sizeof(testmsg) ); testmsg.sourcePlayerID.wGID = 1;//表明为广播包; testmsg.sourcePlayerID.dwPID = broedid;//最后一次的发包序号; strcpy( testmsg.strChat, sendmsg ); testmsg.chatSize = strlen(testmsg.strChat) + 1; NewMsgToPut* pToSendMsg = CNewPkgBuild::CreatePkg(&testmsg); if ( NULL == pToSendMsg ) { NewLog( LOG_LEV_ERROR, "OnValidBeExplored,创建测试发送消息失败" ); return false; } NewLog( LOG_LEV_INFO, "OnValidBeExplored,广播消息发送" ); IssueBrocastMsg( pToSendMsg ); } #endif //TEST_CODE return true;//监听socket; } unsigned int rcvpassed = GetTickCount() - m_lastrcvTick; #ifdef TEST_CODE //正常情况下,两端都检测不活动连接,如果仅仅测试广播包,则服务器端不进行此项检测,开此if判断;if ( !g_IsSrv )//测试广播消息,因为测试时只服务器向客户端发包; #endif //TEST_CODE { if ( rcvpassed > 1000*10/*10秒*/ ) { NewLog( LOG_LEV_ERROR, "OnValidBeExplored,socket%d隔太长时间(%d)未收到包,准备断连", GetInnerSocket(), rcvpassed ); if ( rcvpassed < 1000*10*5/*50秒*/ ) { IssueDestory(); } else { m_lastrcvTick = GetTickCount();//防止反复发断连请求,破坏后续逻辑(断开时刻,断开标记等等); NewLog( LOG_LEV_ERROR, "OnValidBeExplored,socket%d隔太长时间(%d)未收到包,可能已丢失断连请求,强制再发断连", GetInnerSocket(), rcvpassed ); IssueDestory( true );//强制再发断开请求 } return true; } } if ( IsConnected() ) { OnBeExploredByFrame(); } return true; } #endif //WIN32