/** * @file bufqueueimp.cpp * @brief 双缓冲队列实现 * Copyright(c) 2007,上海第九城市游戏研发部 * All rights reserved * 文件名称: bufqueueimp.cpp * 摘 要: 双缓冲队列实现 * 作 者: dzj * 完成日期: 2007.11.20 */ //#include "stdafx.h" #include "BufQueueImp.h" #ifdef USE_DSIOCP #include "iocp/dsiocp.h" //CRITICAL_SECTION CDsIocp::m_hAllInSocketSection;//m_arrAllInSockets互斥区; DsCS CDsIocp::m_hAllInSocketSection;//m_arrAllInSockets互斥区; set< CDsSocket* > CDsIocp::m_arrAllInSockets;//所有监听到的连接,用于群发消息(目前只有gatesrv向所有玩家广播消息时用到),之所以用set,是为了防止遍历时对同一元素遍历多次; #endif //USE_DSIOCP MsgToPut* PopMsgToPutFromList( list* pList ) { TRY_BEGIN; if ( NULL == pList ) { return NULL; } if ( pList->empty() ) { return NULL; } else { MsgToPut* tmpReturn = pList->front(); pList->pop_front(); return tmpReturn; } TRY_END; return NULL; } CBufQueue::~CBufQueue() { TRY_BEGIN; //#ifdef ACE_WIN32 // m_ReadSig.reset(); // //CloseHandle( m_ReadSig ); //#else //ACE_WIN32 m_ReadSig = MYSIGREAD; //#endif //ACE_WIN32 ACE_WRITE_GUARD( ACE_RW_Thread_Mutex, guard, m_MsgSwitchLock ); //把尚未处理的包全部删掉; if ( NULL != m_pInQueueEx ) { MsgToPut* pOutMsg = PopMsgToPutFromList( m_pInQueueEx ); while ( NULL != pOutMsg ) { if ( NULL != g_poolMsgToPut ) { g_poolMsgToPut->Release( pOutMsg ); pOutMsg = NULL; } pOutMsg = PopMsgToPutFromList( m_pInQueueEx ); } //ACE_Message_Block* pPopedMsgBlock = NULL;//如果队列中没有了消息块,则dequeue_head不会自动将传入指针置空; //m_pInQueue->dequeue_head( pPopedMsgBlock ); //while ( NULL != pPopedMsgBlock ) //{ // MsgToPut* pOutMsg = NULL; // memcpy( &pOutMsg, pPopedMsgBlock->rd_ptr(), sizeof(MsgToPut*) );//!!!注意:此处的指针内容拷贝以后修改; // g_poolMsgToPut->Release( pOutMsg ); // delete pPopedMsgBlock; pPopedMsgBlock = NULL;//如果队列中没有了消息块,则dequeue_head不会自动将传入指针置空;//内存池修改 // m_pInQueue->dequeue_head( pPopedMsgBlock ); //} } if ( NULL != m_pOutQueueEx ) { MsgToPut* pOutMsg = PopMsgToPutFromList( m_pOutQueueEx ); while ( NULL != pOutMsg ) { if ( NULL != g_poolMsgToPut ) { g_poolMsgToPut->Release( pOutMsg ); pOutMsg = NULL; } pOutMsg = PopMsgToPutFromList( m_pOutQueueEx ); } //ACE_Message_Block* pPopedMsgBlock = NULL; //m_pOutQueue->dequeue_head( pPopedMsgBlock ); //while ( NULL != pPopedMsgBlock ) //{ // MsgToPut* pOutMsg = NULL; // memcpy( &pOutMsg, pPopedMsgBlock->rd_ptr(), sizeof(MsgToPut*) );//!!!注意:此处的指针内容拷贝以后修改; // g_poolMsgToPut->Release( pOutMsg ); // delete pPopedMsgBlock; pPopedMsgBlock = NULL;//如果队列中没有了消息块,则dequeue_head不会自动将传入指针置空;//内存池修改 // m_pOutQueue->dequeue_head( pPopedMsgBlock ); //} } m_pInQueueEx->clear(); m_pOutQueueEx->clear(); delete m_pInQueueEx; m_pInQueueEx = NULL; delete m_pOutQueueEx; m_pOutQueueEx = NULL; #ifndef DS_EPOLL delete pReadCond; pReadCond = NULL; #endif //DS_EPOLL TRY_END; } ///设置读信号; int CBufQueue::SetReadEvent( bool isForce/*是否强制设置读事件,用于终止读线程*/ ) { #ifdef DS_EPOLL if ( ( !(m_pInQueueEx->empty()) ) || isForce ) { m_DsEvent.SignalEvent();//触发等待在此条件上的线程; } #else //DS_EPOLL condMutex.acquire(); if ( ( !(m_pInQueueEx->empty()) ) || isForce ) { pReadCond->signal();//触发等待在此条件上的线程; } condMutex.release(); #endif //DS_EPOLL return 0; }; ///调用以等待读信号或者超时,等到信号时isReadConFulfil=true,否则,如果是超时返回,则isReadConFulfil=false; void CBufQueue::WaitReadEventByTimeOut( bool& isReadConFulfil ) { #ifdef DS_EPOLL if ( m_pInQueueEx->size() < BATPKG_NUM ) { m_DsEvent.BlockWaitEventByTime( 3 ); } isReadConFulfil = false; if ( m_pInQueueEx->size() > 0 ) { isReadConFulfil = true; } #else //DS_EPOLL condMutex.acquire(); #ifdef ACE_WIN32 if ( m_pInQueueEx->empty() ) #else if ( m_pInQueueEx->size() < BATPKG_NUM ) #endif/*ACE_WIN32*/ { ACE_Time_Value curTime = ACE_OS::gettimeofday();//等待一会; static ACE_Time_Value waitTime( 0, 1000 );//固定等待1ms; curTime += waitTime; isReadConFulfil = ( -1 != ( pReadCond->wait(&curTime) ) );//是否超时; #ifdef ACE_WIN32 #else if ( !isReadConFulfil ) { isReadConFulfil = !(m_pInQueueEx->empty()); } #endif //ACE_WIN32 } else { isReadConFulfil = true; } condMutex.release(); #endif //DS_EPOLL } ///调用以等待读信号; void CBufQueue::WaitReadEvent() { #ifdef DS_EPOLL if ( m_pInQueueEx->size() < BATPKG_NUM ) { m_DsEvent.BlockWaitEventByTime( 3 ); } #else //DS_EPOLL condMutex.acquire(); #ifdef ACE_WIN32 if ( m_pInQueueEx->empty() ) #else if ( m_pInQueueEx->size() < BATPKG_NUM ) #endif/*ACE_WIN32*/ { pReadCond->wait(); } condMutex.release(); #endif //DS_EPOLL } ///将信息放入缓冲; int CBufQueue::PushMsg( MsgToPut* pInMsg ) { TRY_BEGIN; //读写信息时获取m_MsgSwitchLock的读锁; if ( NULL == pInMsg ) { //空信息也认为操作成功; return 0; } if ( NULL == m_pInQueueEx ) { D_ERROR( "PushMsg, m_pInQueueEx空\n" ); //内部队列空,直接将收包删掉; if ( NULL != g_poolMsgToPut ) { g_poolMsgToPut->Release( pInMsg ); pInMsg = NULL; } else { ///错误,g_poolMsgToPut过早释放了; } return -1; } bool issetread = false;//是否置读事件; if (true) { ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex, guard, m_MsgSwitchLock, -1 ); m_pInQueueEx->push_back( pInMsg );//尽量短使用锁; #ifdef ACE_WIN32 issetread = ( m_pInQueueEx->size() > 1 ); #else issetread = ( m_pInQueueEx->size() >= BATPKG_NUM ); #endif/*ACE_WIN32*/ } if ( issetread ) { SetReadEvent(); } return 0; TRY_END; return -1; }; /** * 准备进行一系列读,应在每次执行一系列读操作之前进行一次 * 内部操作为:将之前填满的输入队列变成输出队列,同时将之前空的输出队列转成新的输入队列; * 例: pBufQueue->PreSerialRead(); pMsg = pBufQueue->PopMsg(); while ( NULL != pMsg ) { ...process pMsg; delete pMsg; pMsg = pBufQueue->PopMsg(); } ...other process; */ list* CBufQueue::PreSerialRead( list* inQueue ) { TRY_BEGIN; //交换队列时获取m_MsgSwitchLock的写锁; ACE_WRITE_GUARD_RETURN( ACE_RW_Thread_Mutex, guard, m_MsgSwitchLock, NULL ); //交换两个队列指针,是否要判断各队列是否空?? list* tmpPtr = m_pInQueueEx; m_pInQueueEx = m_pOutQueueEx; m_pOutQueueEx = tmpPtr; tmpPtr = m_pOutQueueEx; m_pOutQueueEx = inQueue; return tmpPtr; ///////////////////////////////////////////////////////////////////////////////// ////int jjj, test //if ( m_pOutQueueEx->size() > 2000 ) //{ // D_DEBUG( "queue size : %d\n", m_pOutQueueEx->size() ); //} ////int jjj, test ///////////////////////////////////////////////////////////////////////////////// //ACE_Message_Queue< ACE_NULL_SYNCH >* tmpPtr = m_pInQueue; //m_pInQueue = m_pOutQueue; //m_pOutQueue = tmpPtr; //return 0; TRY_END; return 0; } /** * 清可读信号,防止反复收到可读消息; * 本函数仅在this用作主逻辑线程向网络线程发消息缓冲时有用,由网络读信号响应函数调用; * 用作网络线程发消息到主逻辑线程缓冲时不必使用,因为主逻辑线程会定时进行读网络消息的操作; */ int CBufQueue::ResetReadEvent() { //m_ReadSig.reset(); return 0; }; ///从缓冲中取信息; MsgToPut* CBufQueue::PopMsg() { TRY_BEGIN; if ( NULL == m_pOutQueueEx ) { return NULL; } ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex, guard, m_MsgSwitchLock, NULL ); MsgToPut* tmpOut = PopMsgToPutFromList( m_pOutQueueEx ); return tmpOut; //ACE_Message_Block* pPopedMsgBlock = NULL; //m_pOutQueue->dequeue_head( pPopedMsgBlock );//!!!!!!!!!出队列; //if ( NULL != pPopedMsgBlock ) //{ // MsgToPut* pOutMsg = NULL; // memcpy( &pOutMsg, pPopedMsgBlock->rd_ptr(), sizeof(MsgToPut*) );//!!!注意:此处的指针内容拷贝以后修改; // delete pPopedMsgBlock; pPopedMsgBlock = NULL; // return pOutMsg;//返回之前在信息块中保存的信息指针地址; //} //return NULL; TRY_END; return NULL; }; ///取缓冲可读信号量; READ_SIG_TYPE& CBufQueue::GetReadEvent() { return m_ReadSig; };