/* 单线程读单线程写队列 by dzj, 09.04.08 //两千万测试(忽略linux机器与windows机器的差异): // 1、wait false时windows一般为20单位,比linux下d版本23单位略快(注:这个时间,与简单地分配释放两千万个int的开销基本一样,因此可以认为开销为0) // 2、wait true时windows一般为35单位,比linux下d版本25-31单位略慢, // 3、综合1/2,估计:windows下的SetEvent与WaitForSingleObject效率比linux下的互斥量+条件变量低,可能与两个系统的睡眠激活效率有关。 // 4、windows下d版本与r版本执行时间相差非常大,linux下d版本与r版本没有明显的执行时间差别,但linux下的cond->signal是相对耗时的,应尽量减少 // 5、windows下的wait版本和nowait版本对内部数组大小都不太敏感 // linux下的wait和nowait版本对该内部数组的大小则非常敏感, // 其中nowait版本随内部数组大小的性能提升可能是log(size/100)(测试值2-5000),至于wait版本则更加大,大约是log(size/10)(测试值2-5000); // 这可能说明:linux下的signal和线程切换相对耗时,因为内部数组小的一个结果就是等待次数与线程切换次数的直接增长,但这与3的估计相矛盾,费解。 // // 总体上windows下r版本与linux下r/d版本性能较接近,比使用互斥锁的方法约快一倍; // // 09.04.10,走之前发现windows下wait/nowait差别不大,总体上比linux下的nowait差一点, // 约相当于linux下nowait速度的一半(不考虑机器差异,也不考虑linux上运行了两个测试程序,而windows机器上跑了些其它程序,而且稍后可能要跑测试机器人 // 09.04.13,区分nowait,waitinfinite以及waittime后(同时添加了通知触发阈值以减小signal次数),测试表明,不考虑机器差异,waittime在windows下速度为linux下速度的约2/3。 */ #include "dslock.h" template class CThreadRWQue { public: CThreadRWQue() : m_isstop(false),m_head(0), m_tail(0) #ifdef WIN32 , m_sleepevent( INVALID_HANDLE_VALUE ) #endif //WIN32 , m_writefailnum( 0 ) { #ifndef WIN32 pthread_mutex_init( &(m_foolock), NULL ); pthread_cond_init( &(m_sleepcond), NULL ); #else //WIN32 m_sleepevent = ::CreateEvent( NULL, FALSE/*自动重置*/, FALSE/*初始状态未触发*/, NULL ); #endif //WIN32 }; ~CThreadRWQue() { NotifyReadStop();//通知读停止; #ifndef WIN32 pthread_mutex_destroy( &(m_foolock) ); pthread_cond_destroy( &(m_sleepcond) ); #else //WIN32 ::CloseHandle( m_sleepevent ); #endif //WIN32 }; public: ///仅由单个写线程执行,通知睡眠的读线程有可读元素,仅应在读线程可能睡眠于空队列时使用; inline bool NotifyReadAvailable() { #ifndef WIN32 pthread_cond_signal( &(m_sleepcond) );//通知可能等在条件变量上的读线程; #else //WIN32 ::SetEvent( m_sleepevent ); #endif //WIN32 return true; } ///通知读线程停止读,只用于永远睡眠的读线程; inline bool NotifyReadStop() { m_isstop = true; #ifndef WIN32 pthread_cond_signal( &(m_sleepcond) );//通知可能等在条件变量上的读线程; sleep( 2 ); #else //WIN32 ::SetEvent( m_sleepevent ); ::Sleep( 2000 );//等待2秒; #endif //WIN32 return true; } ///仅由单个写线程执行,压入元素; inline bool PushEle( T_Ele* pToPush ) { ////////////////////////////////////////////////////////////////////////////////////// //保留当前快照,防止值在此期间修改(>=INNER_SIZE,但还未来得及修正为0的瞬时) //只需存m_head,因为m_tail只会由本线程修改... unsigned int curhead = m_head; curhead = (curhead < INNER_SIZE) ? curhead:0; //由读线程保证,pop操作不会读过界(++m_tail之前,读线程不可能取走m_tail的当前元素); ////////////////////////////////////////////////////////////////////////////////////// unsigned int nexttail = m_tail + 1;//由本写线程防止push操作写过界; nexttail = (nexttail < INNER_SIZE) ? nexttail:0; if ( nexttail == curhead ) { //内部数组已填满; return false; } m_innerArr[m_tail] = pToPush; ++m_tail; m_tail = (m_tail < INNER_SIZE) ? m_tail:0; return true; } ///仅由单个写线程执行,压入元素,压入时通知可能的等待线程,只有读线程可能等待时才调用此方法,否则应调用PushEle; inline bool PushEleNotifyRead( T_Ele* pToPush ) { ////////////////////////////////////////////////////////////////////////////////////// //保留当前快照,防止值在此期间修改(>=INNER_SIZE,但还未来得及修正为0的瞬时) //只需存m_head,因为m_tail只会由本线程修改... unsigned int curhead = m_head; curhead = (curhead < INNER_SIZE) ? curhead:0; //由读线程保证,pop操作不会读过界(++m_tail之前,读线程不可能取走m_tail的当前元素); ////////////////////////////////////////////////////////////////////////////////////// unsigned int nexttail = m_tail + 1;//由本写线程防止push操作写过界; nexttail = (nexttail < INNER_SIZE) ? nexttail:0; if ( nexttail == curhead ) { //内部数组已填满; ++m_writefailnum; if ( m_writefailnum > 500 )//累积500次写失败; { //此时必须再发次消息,以防止此时读线程处于空队列等待状态,而同时本写线程因为已写满而不再写消息,导致死锁; m_writefailnum = 0; #ifndef WIN32 pthread_cond_signal( &(m_sleepcond) );//通知可能等在条件变量上的读线程; #else //WIN32 ::SetEvent( m_sleepevent ); #endif //WIN32 } return false; } m_innerArr[m_tail] = pToPush; ++m_tail; m_tail = (m_tail < INNER_SIZE) ? m_tail:0; int cursize = ( curhead <= m_tail ) ? (m_tail-curhead) : ( m_tail + (INNER_SIZE-curhead) ); if ( cursize >= NOTIFY_BIAS ) { //当前元素数目超过通知限, // 使用此方法时,必须有另外的触发(例如NotifyReadAvailable)确保读线程不会永远等待阈值到达,从而丢失已在队列中的少量元素; // 或者只使用PopEleNullWaitTime,而不要使用PopEleNullWaitInfinite; //通知可能在等待的读线程; #ifndef WIN32 pthread_cond_signal( &(m_sleepcond) );//通知可能等在条件变量上的读线程; #else //WIN32 ::SetEvent( m_sleepevent ); #endif //WIN32 } return true; } ///仅由单个读线程执行,弹出元素,队列空时即时返回空; inline T_Ele* PopEle() { ////////////////////////////////////////////////////////////////////////////////////// //保留当前快照,防止值在此期间修改(>=INNER_SIZE,但还未来得及修正为0的瞬时) //只需存m_tail,因为m_head只会由本线程修改... unsigned int curtail = m_tail; curtail = (curtail < INNER_SIZE) ? curtail:0; //由写线程保证,push操作不会读过界(++m_head之前,写线程不可能写m_head的当前元素); ////////////////////////////////////////////////////////////////////////////////////// if ( m_head == curtail ) { //内部空,无存储元素; return NULL; } T_Ele* pToPop = m_innerArr[m_head]; ++m_head; m_head = (m_head < INNER_SIZE) ? m_head:0; return pToPop; } ///仅由单个读线程执行,弹出元素,队列空时无限期等待, /// 注意:使用此函数时,必须确保写线程会执行PushEleNotifyRead或NotifyReadAvailable,否则读线程会阻死; inline T_Ele* PopEleNullWaitInfinite() { ////////////////////////////////////////////////////////////////////////////////////// //保留当前快照,防止值在此期间修改(>=INNER_SIZE,但还未来得及修正为0的瞬时) //只需存m_tail,因为m_head只会由本线程修改... unsigned int curtail = m_tail; curtail = (curtail < INNER_SIZE) ? curtail:0; //由写线程保证,push操作不会读过界(++m_head之前,写线程不可能写m_head的当前元素); ////////////////////////////////////////////////////////////////////////////////////// //空读睡眠 while ( m_head == curtail ) { if ( m_isstop ) { return NULL;//停止读标记已设置,读线程不应阻在此处; } #ifndef WIN32 pthread_mutex_lock( &(m_foolock) ); pthread_cond_wait( &(m_sleepcond), &(m_foolock) ); pthread_mutex_unlock( &(m_foolock) ); #else //WIN32 ::WaitForSingleObject( m_sleepevent, INFINITE ); #endif //WIN32 ////////////////////////////////////////////////////////////////////////////////////// //保留当前快照,防止值在此期间修改(>=INNER_SIZE,但还未来得及修正为0的瞬时) //只需存m_tail,因为m_head只会由本线程修改... curtail = m_tail; curtail = (curtail < INNER_SIZE) ? curtail:0; //由写线程保证,push操作不会读过界(++m_head之前,写线程不可能写m_head的当前元素); ////////////////////////////////////////////////////////////////////////////////////// } T_Ele* pToPop = m_innerArr[m_head]; ++m_head; m_head = (m_head < INNER_SIZE) ? m_head:0; return pToPop; } ///仅由单个读线程执行,弹出元素,队列空时等待一最小时间间隔后返回以免阻死读线程; /// 注意:使用此函数时,必须确保写线程会执行PushEleNotifyRead或NotifyReadAvailable,否则不能即时在有消息时返回; inline T_Ele* PopEleNullWaitTime() { ////////////////////////////////////////////////////////////////////////////////////// //保留当前快照,防止值在此期间修改(>=INNER_SIZE,但还未来得及修正为0的瞬时) //只需存m_tail,因为m_head只会由本线程修改... unsigned int curtail = m_tail; curtail = (curtail < INNER_SIZE) ? curtail:0; //由写线程保证,push操作不会读过界(++m_head之前,写线程不可能写m_head的当前元素); ////////////////////////////////////////////////////////////////////////////////////// //空读睡眠 if ( m_head == curtail ) { #ifndef WIN32 struct timespec waittotime;//等待至时刻; struct timeval now; gettimeofday(&now, NULL); waittotime.tv_sec = now.tv_sec; waittotime.tv_nsec = now.tv_usec * 1000; waittotime.tv_nsec += 3*1000;/*等待3毫秒*/ pthread_mutex_lock( &(m_foolock) ); pthread_cond_timedwait( &(m_sleepcond), &(m_foolock), &(waittotime) ); pthread_mutex_unlock( &(m_foolock) ); #else //WIN32 ::WaitForSingleObject( m_sleepevent, 3 ); #endif //WIN32 ////////////////////////////////////////////////////////////////////////////////////// //保留当前快照,防止值在此期间修改(>=INNER_SIZE,但还未来得及修正为0的瞬时) //只需存m_tail,因为m_head只会由本线程修改... curtail = m_tail; curtail = (curtail < INNER_SIZE) ? curtail:0; //由写线程保证,push操作不会读过界(++m_head之前,写线程不可能写m_head的当前元素); ////////////////////////////////////////////////////////////////////////////////////// //再次判断队列是否空,若空则返回; if ( m_head == curtail ) { return NULL; } } T_Ele* pToPop = m_innerArr[m_head]; ++m_head; m_head = (m_head < INNER_SIZE) ? m_head:0; return pToPop; } private: bool m_isstop;//停止读和写,准备销毁自身,仅用于永远等待的读线程; private: CThreadRWQue( const CThreadRWQue& ); //屏蔽这两个操作; CThreadRWQue& operator = ( const CThreadRWQue& );//屏蔽这两个操作; private: unsigned int m_head;//头,只由读线程修改; unsigned int m_tail;//尾,只由写线程修改; T_Ele* m_innerArr[INNER_SIZE]; private://用于队列空时,读线程睡眠 #ifndef WIN32 pthread_mutex_t m_foolock;//永远不会锁等待,只是为了使用条件变量; pthread_cond_t m_sleepcond;//等待用条件变量; #else //WIN32 HANDLE m_sleepevent;//等待用事件; #endif //WIN32 private: unsigned int m_writefailnum;//写失败次数,失败次数达到一定程度时,怀疑读线程在等可读信号; };