/* 包含两个线程的对象,线程间使用cirqueue通信 by dzj, 10.08.03 */ #include "Utility.h" #include "../../Test/testthreadque_wait/cirqueue.h" #include "../../Test/testthreadque_wait/dslock.h" #include "../../Test/testthreadque_wait/buflog.h" #include "../MysqlWrapper.h" template< typename T_InnerMsg, unsigned int CIR_INNER_SIZE > class CDsTwoThread { public: CDsTwoThread() { StructMemSet( m_sqlConnStr, 0, sizeof(m_sqlConnStr) ); m_pAssistSqlConn = NULL; m_isInited = false; m_quitFlag = false; m_inMsgQueue = NULL; m_outMsgQueue = NULL; } virtual ~CDsTwoThread() { ReleaseDsTwoThread(); } public: //添加一个DB任务; bool AddOneAssistTask( T_InnerMsg* pTask ) { return MainPassToAssist( &pTask, 1 ); } //定时调用时钟,处理可能的DB相关结果; bool MainThreadTimer() { MainTryProc(); return true; } protected: //真正工作之前调用; bool WorkPre( stConnectionString* pszConnectionString ) { return InitDsTwoThread( pszConnectionString ); } //所有工作完毕之后调用; bool WorkPost() { return ReleaseDsTwoThread(); } private: //主线程主动调用以初始化(对象构造完整后调用); bool InitDsTwoThread( stConnectionString* pszConnectionString ) { if ( NULL == pszConnectionString ) { NewLog( LOG_LEV_ERROR, "InitDsTwoThread, NULL == pszConnectionString!"); return false; } StructMemCpy( m_sqlConnStr, pszConnectionString, sizeof(m_sqlConnStr) );//内部保存connStr; m_quitFlag = false; m_inMsgQueue = NEW CirQueue< T_InnerMsg, CIR_INNER_SIZE, 9999999 >(NULL/*不使用可读通知*/);//等待辅助线程处理队列; m_outMsgQueue = NEW CirQueue< T_InnerMsg, CIR_INNER_SIZE, 9999999 >(NULL/*不使用可读通知*/);//等待主线程处理队列; m_isInited = true; //启动写IO线程; #ifndef WIN32 unsigned long selfid = pthread_self(); int err = pthread_create( &m_iothread, NULL, &AssistThread, this ); if ( 0 != err ) { NewLog( LOG_LEV_ERROR, "%x:CDsTwoThread创建辅助线程失败", selfid ); return false; } else { NewLog( LOG_LEV_DEBUG, "%x:CDsTwoThread创建辅助线程%x成功\n", selfid, (unsigned long)m_iothread ); } #else //WIN32 unsigned long selfid = ::GetCurrentThreadId(); unsigned long createid = 0; m_iothread = HXBCBEGINTHREADEX( NULL, 0, &AssistThread, this, 0, &createid ); if ( NULL == m_iothread ) { NewLog( LOG_LEV_ERROR, "应用线程%x:CDsTwoThread创建辅助线程失败\n", selfid ); return false; } else { NewLog( LOG_LEV_ERROR, "应用线程%x:CDsTwoThread创建辅助线程成功\n", selfid, createid ); } #endif //WIN32 return true; } //主线程主动调用以释放; bool ReleaseDsTwoThread() { if ( !m_isInited ) { return true; } m_quitFlag = true; #ifdef WIN32 ::WaitForSingleObject( m_iothread, INFINITE );//等待IO线程结束; unsigned long selfid = ::GetCurrentThreadId(); #else //WIN32 pthread_join( m_iothread, NULL ); unsigned long selfid = pthread_self(); #endif //WIN32 MainTryProc();//辅助线程结束之后再尝试处理一次m_outMsgQueue,确保待处理队列全处理完毕,以防止遗漏; if ( NULL != m_inMsgQueue ) { delete m_inMsgQueue; m_inMsgQueue = NULL; } if ( NULL != m_outMsgQueue ) { delete m_outMsgQueue; m_outMsgQueue = NULL; } m_isInited = false; return true; } private: //主线程执行,尝试处理辅助线程返回的各个结果; bool MainTryProc() { if ( NULL == m_outMsgQueue ) { NewLog( LOG_LEV_ERROR, "CDsTwoThread::MainTryProc,NULL == m_outMsgQueue!" ); return false; } T_InnerMsg* tmpDataArr[256]; int toProcNum = 0; bool isGot = false; isGot = m_outMsgQueue->PopEle( (T_InnerMsg**)&tmpDataArr, ARRAY_SIZE(tmpDataArr), toProcNum ); if ( !isGot ) { return true; } if ( toProcNum > 0 ) { if ( toProcNum >= ARRAY_SIZE(tmpDataArr) ) { NewLog( LOG_LEV_ERROR, "CDsTwoThread::MainTryProc, 错误的toProcNum:%d!", toProcNum ); return false; } for ( int i=0; i* pOrgIns = (CDsTwoThread< T_InnerMsg, CIR_INNER_SIZE >*) pThis; if ( NULL == pOrgIns ) { NewLog( LOG_LEV_ERROR, "CDsTwoThread::AssistThread, NULL == pOrgIns!" ); ThreadEndTls(); return NULL; } //连接DB; if ( !pOrgIns->AssistConnToDB() ) { NewLog( LOG_LEV_ERROR, "CDsTwoThread::AssistThread, 辅助线程连接数据库失败,辅助线程不再继续运行!" ); ThreadEndTls(); return NULL; } CirQueue< T_InnerMsg, CIR_INNER_SIZE, 9999999 >* inMsgQueue = pOrgIns->GetInQueue(); T_InnerMsg* tmpDataArr[256]; int toProcNum = 0; bool isGot = false; unsigned int dbKeepAliveCounter = 0; while ( !(pOrgIns->IsQuitFlagSet()) ) { #ifdef WIN32 Sleep( 16 ); #else //WIN32 sleep( 6 ); #endif//WIN32 ++dbKeepAliveCounter; if ( dbKeepAliveCounter > 1000 ) { //每隔一段时间重新ping一次DB以保持连接; dbKeepAliveCounter = 0; pOrgIns->AssistPingDB(); } toProcNum = 0; isGot = false; isGot = inMsgQueue->PopEle( (T_InnerMsg**)&tmpDataArr, ARRAY_SIZE(tmpDataArr), toProcNum ); if ( !isGot ) { continue; } if ( toProcNum > 0 ) { if ( toProcNum >= ARRAY_SIZE(tmpDataArr) ) { NewLog( LOG_LEV_ERROR, "CDsTwoThread::AssistThread, 错误的toProcNum:%d!", toProcNum ); continue; } for ( int i=0; iAssistTwoThreadProc( tmpDataArr[i] ); } } } pOrgIns->AssistDisconnFromDB(); ThreadEndTls(); return NULL; } private: //连接DB; bool AssistConnToDB() { if ( NULL != m_pAssistSqlConn ) { NewLog( LOG_LEV_ERROR, "CDsTwoThread::AssistConnToDB, NULL != m_pAssistSqlConn" ); return false; } m_pAssistSqlConn = NEW CConnection; bool isConnOK = (0 == m_pAssistSqlConn->Open( &m_sqlConnStr )); if ( !isConnOK ) { NewLog( LOG_LEV_ERROR, "CDsTwoThread::AssistConnToDB,连DB失败,%s|%s|%s|%d", m_sqlConnStr.szHost, m_sqlConnStr.szUser, m_sqlConnStr.szDb, m_sqlConnStr.port ); delete m_pAssistSqlConn; m_pAssistSqlConn = NULL; } else { NewLog( LOG_LEV_ERROR, "CDsTwoThread::AssistConnToDB,连DB成功,%s|%s|%s|%d", m_sqlConnStr.szHost, m_sqlConnStr.szUser, m_sqlConnStr.szDb, m_sqlConnStr.port ); } return isConnOK; } bool AssistPingDB() { if ( NULL == m_pAssistSqlConn ) { return false; } m_pAssistSqlConn->Ping(); return true; } //从DB断开; bool AssistDisconnFromDB() { if ( NULL == m_pAssistSqlConn ) { NewLog( LOG_LEV_ERROR, "CDsTwoThread::AssistDisconnFromDB, NULL == m_pAssistSqlConn" ); return false; } delete m_pAssistSqlConn; m_pAssistSqlConn = NULL; return true; } private: stConnectionString m_sqlConnStr;//连接字符串; CConnection* m_pAssistSqlConn;//属于辅助线程的mysql连接; private: bool AssistTwoThreadProc( T_InnerMsg* pToProcMsg ) { if ( NULL == pToProcMsg ) { NewLog( LOG_LEV_ERROR, "CRankDBIns::AssistTwoThreadProc, NULL == pToProcMsg" ); return false; } pToProcMsg->AssistProcMsg( m_pAssistSqlConn ); AssistPassToMain( &pToProcMsg, 1 );//处理完后的消息压入输出队列,以便稍后由主线程继续处理; return true; };//辅助线程中处理一个消息; bool MainProcMsg( T_InnerMsg* pToProcMsg ) { if ( NULL == pToProcMsg ) { NewLog( LOG_LEV_ERROR, "CRankDBIns::MainProcMsg, NULL == pToProcMsg" ); return false; } pToProcMsg->MainRstProcMsg(); T_InnerMsg::DelRankDBTask( pToProcMsg );//处理完毕后回收; return true; };//主线程中处理一个消息; private: //只可由辅线程执行,添加等待主线程处理者,若返回false,则可能是内部队列满,此时辅线程应暂时缓存待处理消息,稍后再试; inline bool AssistPassToMain( T_InnerMsg** pMsgArr/*输入待处理消息数组*/, const unsigned int arrNum/*输入数组中有效元素个数*/ ) { if ( NULL == m_outMsgQueue ) { NewLog( LOG_LEV_ERROR, "CDsTwoThread::AssistPassToMain,NULL == m_outMsgQueue!" ); return false; } return m_outMsgQueue->PushEle( pMsgArr, arrNum ); } //只可由主线程执行,添加等待辅助线程处理者,若返回false,则可能是内部队列满,此时主线程应暂时缓存待处理消息,稍后再试; inline bool MainPassToAssist( T_InnerMsg** pMsgArr/*输入待处理消息数组*/, const unsigned int arrNum/*输入数组中有效元素个数*/ ) { if ( m_quitFlag ) { NewLog( LOG_LEV_ERROR, "线程:%x,已发起终止,但仍试图执行, MainPassToAssist" ); return false; } if ( NULL == m_inMsgQueue ) { NewLog( LOG_LEV_ERROR, "MainPassToAssist,NULL == m_inMsgQueue" ); return false; } return m_inMsgQueue->PushEle( pMsgArr, arrNum ); } private: CirQueue< T_InnerMsg, CIR_INNER_SIZE, 9999999 >* m_inMsgQueue;//等待辅助线程处理队列; CirQueue< T_InnerMsg, CIR_INNER_SIZE, 9999999 >* m_outMsgQueue;//等待主线程处理队列; public: inline bool IsQuitFlagSet() { return m_quitFlag; } inline CirQueue< T_InnerMsg, CIR_INNER_SIZE, 9999999 >* GetInQueue() { return m_inMsgQueue; }; private: bool m_isInited; bool m_quitFlag; #ifdef WIN32 HANDLE m_iothread; #else //WIN32 pthread_t m_iothread; #endif //WIN32 };//class CDsTwoThread