/** * @file bufqueueimp.h * @brief 双缓冲队列实现头文件 * Copyright(c) 2007,上海第九城市游戏研发部 * All rights reserved * 文件名称: bufqueueimp.h * 摘 要: 双缓冲队列实现头文件 * 作 者: dzj * 完成日期: 2007.11.20 */ #pragma once #include "BufQueue.h" #include #ifdef DS_EPOLL #include "../../Test/testthreadque_wait/dslock.h" //使用DSEvent; #endif //DS_EPOLL ///双缓冲队列实现类,模板参数为队列中的消息类型; ///MSG_TYPE:队列中的消息类型; ///READ_SIG_TYPE:信号量类型,其定义的信号量指示缓冲中是否有消息可读; MsgToPut* PopMsgToPutFromList( list* pList ); #define BATPKG_NUM 500 class CBufQueue : public IBufQueue { typedef ACE_Thread_Mutex READCOND_MUTEX; private: CBufQueue( const CBufQueue& inBufQueue ); //屏蔽这两个操作; CBufQueue& operator = ( const CBufQueue& inBufQueue );//屏蔽这两个操作; public: #ifdef DS_EPOLL explicit CBufQueue( DsEvent& inEvent, int signo = MYSIGREAD ) : m_DsEvent( inEvent ) #else //DS_EPOLL explicit CBufQueue( int signo = MYSIGREAD ) #endif //DS_EPOLL { TRY_BEGIN; m_pInQueueEx = NEW list; m_pOutQueueEx = NEW list; m_pInQueueEx->clear(); m_pOutQueueEx->clear(); m_ReadSig = signo; #ifndef DS_EPOLL pReadCond = NEW ACE_Condition(condMutex); #endif //DS_EPOLL TRY_END; } virtual ~CBufQueue(); public: ////////////////////////////////////////////////////////////////////////// ///写信息使用; /** * 置可读信号,促使等待读线程进行读操作,由写信息线程在特定时刻调用; * 本函数仅在this用作主逻辑线程向网络线程发消息缓冲时有用,!!! 目前暂时的策略,如果队列长度达到一定程度,则自动激活写事件; * 用作网络线程发消息到主逻辑线程缓冲时不必使用,因为主逻辑线程会定时进行读网络消息的操作; * 成功返回0,其它返回值表明失败; */ ///设置读信号; virtual int SetReadEvent( bool isForce = false/*是否强制设置读事件,用于终止读线程*/ ); ///调用以等待读信号; virtual void WaitReadEvent(); ///调用以等待读信号或者超时,等到信号时isReadConFulfil=true,否则,如果是超时返回,则isReadConFulfil=false; virtual void WaitReadEventByTimeOut( bool& isReadConFulfil ); ///将信息放入缓冲; virtual int PushMsg( MsgToPut* pInMsg ); ///写信息使用; ////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////// ///读信息使用 /** * 准备进行一系列读,应在每次执行一系列读操作之前进行一次 * 内部操作为:将之前填满的输入队列变成输出队列,同时将之前空的输出队列转成新的输入队列; * 例: pBufQueue->PreSerialRead(); pMsg = pBufQueue->PopMsg(); while ( NULL != pMsg ) { ...process pMsg; delete pMsg; pMsg = pBufQueue->PopMsg(); } ...other process; */ virtual list* PreSerialRead( list* inQueue ); // virtual list* SwitchOutMsgQueue( list* inQueue ) // { // TRY_BEGIN; // // list* pTmpQueue = NULL; //#ifdef ACE_WIN32 // pTmpQueue = (list*) ::InterlockedExchangePointer( (void**)&m_pOutQueueEx, inQueue ); //#else //ACE_WIN32 // ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex, guard, m_MsgSwitchLock, NULL ); // pTmpQueue = m_pOutQueueEx; // m_pOutQueueEx = inQueue; //#endif //ACE_WIN32 // // return pTmpQueue; // TRY_END; // return NULL; // } /** * 清可读信号,防止反复收到可读消息; * 本函数仅在this用作主逻辑线程向网络线程发消息缓冲时有用,由网络读信号响应函数调用; * 用作网络线程发消息到主逻辑线程缓冲时不必使用,因为主逻辑线程会定时进行读网络消息的操作; */ virtual int ResetReadEvent(); ///从缓冲中取信息; virtual MsgToPut* PopMsg(); ///读信息使用 ////////////////////////////////////////////////////////////////////////// ///取缓冲可读信号量; virtual READ_SIG_TYPE& GetReadEvent(); private: ///用于输入的队列; //ACE_Message_Queue* m_pInQueue; list* m_pInQueueEx; ///用于输出的队列; //ACE_Message_Queue* m_pOutQueue; list* m_pOutQueueEx; ///交换队列时(或析构时)用到的读写锁; ACE_RW_Thread_Mutex m_MsgSwitchLock; ///指示本缓存有信息可读的信号; READ_SIG_TYPE m_ReadSig; #ifdef DS_EPOLL DsEvent& m_DsEvent;//框架线程等待事件; #else //DS_EPOLL READCOND_MUTEX condMutex; ACE_Condition* pReadCond; #endif //DS_EPOLL }; /////双缓冲队列实现类,模板参数为队列中的消息类型; /////MSG_TYPE:队列中的消息类型; /////READ_SIG_TYPE:信号量类型,其定义的信号量指示缓冲中是否有消息可读; //class CBufQueue : public IBufQueue //{ //public: // CBufQueue( int signo = MYSIGREAD ) // { // m_pInQueueEx = NEW ACE_Message_Queue_Ex; // m_pOutQueueEx = NEW ACE_Message_Queue_Ex; // // m_ReadSig = signo; // } // // ~CBufQueue() // { ////#ifdef ACE_WIN32 //// m_ReadSig.reset(); //// //CloseHandle( m_ReadSig ); ////#else //ACE_WIN32 // m_ReadSig = MYSIGREAD; ////#endif //ACE_WIN32 // ACE_WRITE_GUARD( ACE_RW_Thread_Mutex, guard, m_MsgSwitchLock ); // //把尚未处理的包全部删掉; // if ( NULL != m_pInQueueEx ) // { // MsgToPut* pOutMsg = NULL; // m_pInQueueEx->dequeue_head( pOutMsg ); // while ( NULL != pOutMsg ) // { // g_poolMsgToPut->Release( pOutMsg ); // pOutMsg = NULL; // m_pInQueueEx->dequeue_head( pOutMsg ); // } // //ACE_Message_Block* pPopedMsgBlock = NULL;//如果队列中没有了消息块,则dequeue_head不会自动将传入指针置空; // //m_pInQueue->dequeue_head( pPopedMsgBlock ); // //while ( NULL != pPopedMsgBlock ) // //{ // // MsgToPut* pOutMsg = NULL; // // memcpy( &pOutMsg, pPopedMsgBlock->rd_ptr(), sizeof(MsgToPut*) );//!!!注意:此处的指针内容拷贝以后修改; // // g_poolMsgToPut->Release( pOutMsg ); // // delete pPopedMsgBlock; pPopedMsgBlock = NULL;//如果队列中没有了消息块,则dequeue_head不会自动将传入指针置空;//内存池修改 // // m_pInQueue->dequeue_head( pPopedMsgBlock ); // //} // } // if ( NULL != m_pOutQueueEx ) // { // MsgToPut* pOutMsg = NULL; // m_pOutQueueEx->dequeue_head( pOutMsg ); // while ( NULL != pOutMsg ) // { // g_poolMsgToPut->Release( pOutMsg ); // pOutMsg = NULL; // m_pOutQueueEx->dequeue_head( pOutMsg ); // } // //ACE_Message_Block* pPopedMsgBlock = NULL; // //m_pOutQueue->dequeue_head( pPopedMsgBlock ); // //while ( NULL != pPopedMsgBlock ) // //{ // // MsgToPut* pOutMsg = NULL; // // memcpy( &pOutMsg, pPopedMsgBlock->rd_ptr(), sizeof(MsgToPut*) );//!!!注意:此处的指针内容拷贝以后修改; // // g_poolMsgToPut->Release( pOutMsg ); // // delete pPopedMsgBlock; pPopedMsgBlock = NULL;//如果队列中没有了消息块,则dequeue_head不会自动将传入指针置空;//内存池修改 // // m_pOutQueue->dequeue_head( pPopedMsgBlock ); // //} // } // m_pInQueueEx->close(); // m_pOutQueueEx->close(); // delete m_pInQueueEx; m_pInQueueEx = NULL; // delete m_pOutQueueEx; m_pOutQueueEx = NULL; // } // //public: // // ////////////////////////////////////////////////////////////////////////// // ///写信息使用; // /** // * 置可读信号,促使等待读线程进行读操作,由写信息线程在特定时刻调用; // * 本函数仅在this用作主逻辑线程向网络线程发消息缓冲时有用,!!! 目前暂时的策略,如果队列长度达到一定程度,则自动激活写事件; // * 用作网络线程发消息到主逻辑线程缓冲时不必使用,因为主逻辑线程会定时进行读网络消息的操作; // * 成功返回0,其它返回值表明失败; // */ // virtual int SetReadEvent() // { ////#ifdef ACE_WIN32 //// return m_ReadSig.signal(); ////#else //ACE_WIN32 // return raise( m_ReadSig ); ////#endif //ACE_WIN32 // }; // ///将信息放入缓冲; // virtual int PushMsg( MsgToPut* pInMsg ) // { // //读写信息时获取m_MsgSwitchLock的读锁; // if ( NULL == pInMsg ) // { // //空信息也认为操作成功; // return 0; // } // if ( NULL == m_pInQueueEx ) // { // //内部队列空,直接将收包删掉; // g_poolMsgToPut->Release( pInMsg ); // return -1; // } // ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex, guard, m_MsgSwitchLock, -1 ); // if ( -1 == m_pInQueueEx->enqueue_tail( pInMsg )) // { // if ( errno == EWOULDBLOCK ) // { // int jjj = 0; // } // D_WARNING( "入缓冲队列错误;\n" );//入缓冲队列错误; // return -1; // } // return 0; // }; // ///写信息使用; // ////////////////////////////////////////////////////////////////////////// // // ////////////////////////////////////////////////////////////////////////// // ///读信息使用 // /** // * 准备进行一系列读,应在每次执行一系列读操作之前进行一次 // * 内部操作为:将之前填满的输入队列变成输出队列,同时将之前空的输出队列转成新的输入队列; // * 例: // pBufQueue->PreSerialRead(); // pMsg = pBufQueue->PopMsg(); // while ( NULL != pMsg ) // { // ...process pMsg; // delete pMsg; // pMsg = pBufQueue->PopMsg(); // } // ...other process; // */ // virtual int PreSerialRead() // { // //交换队列时获取m_MsgSwitchLock的写锁; // ACE_WRITE_GUARD_RETURN( ACE_RW_Thread_Mutex, guard, m_MsgSwitchLock, -1 ); // //交换两个队列指针,是否要判断各队列是否空?? // ACE_Message_Queue_Ex< MsgToPut, ACE_NULL_SYNCH >* tmpPtr = m_pInQueueEx; // m_pInQueueEx = m_pOutQueueEx; // m_pOutQueueEx = tmpPtr; // //ACE_Message_Queue< ACE_NULL_SYNCH >* tmpPtr = m_pInQueue; // //m_pInQueue = m_pOutQueue; // //m_pOutQueue = tmpPtr; // return 0; // } // // /** // * 清可读信号,防止反复收到可读消息; // * 本函数仅在this用作主逻辑线程向网络线程发消息缓冲时有用,由网络读信号响应函数调用; // * 用作网络线程发消息到主逻辑线程缓冲时不必使用,因为主逻辑线程会定时进行读网络消息的操作; // */ // virtual int ResetReadEvent() // { // //m_ReadSig.reset(); // return 0; // }; // // ///从缓冲中取信息; // virtual MsgToPut* PopMsg() // { // if ( NULL == m_pOutQueueEx ) // { // return NULL; // } // // ACE_READ_GUARD_RETURN( ACE_RW_Thread_Mutex, guard, m_MsgSwitchLock, NULL ); // MsgToPut* tmpOut = NULL; // m_pOutQueueEx->dequeue_head( tmpOut ); // return tmpOut; // //ACE_Message_Block* pPopedMsgBlock = NULL; // //m_pOutQueue->dequeue_head( pPopedMsgBlock );//!!!!!!!!!出队列; // //if ( NULL != pPopedMsgBlock ) // //{ // // MsgToPut* pOutMsg = NULL; // // memcpy( &pOutMsg, pPopedMsgBlock->rd_ptr(), sizeof(MsgToPut*) );//!!!注意:此处的指针内容拷贝以后修改; // // delete pPopedMsgBlock; pPopedMsgBlock = NULL; // // return pOutMsg;//返回之前在信息块中保存的信息指针地址; // //} // //return NULL; // }; // ///读信息使用 // ////////////////////////////////////////////////////////////////////////// // // ///取缓冲可读信号量; // virtual READ_SIG_TYPE& GetReadEvent() // { // return m_ReadSig; // }; // //private: // ///用于输入的队列; // //ACE_Message_Queue* m_pInQueue; // ACE_Message_Queue_Ex* m_pInQueueEx; // // ///用于输出的队列; // //ACE_Message_Queue* m_pOutQueue; // ACE_Message_Queue_Ex* m_pOutQueueEx; // // ///交换队列时(或析构时)用到的读写锁; // ACE_RW_Thread_Mutex m_MsgSwitchLock; // ///指示本缓存有信息可读的信号; // READ_SIG_TYPE m_ReadSig; //};