fastdo  0.5.12
threads.hpp
浏览该文件的文档.
1 #ifndef __THREADS_HPP__
2 #define __THREADS_HPP__
3 
4 namespace winux
5 {
6 
7 //=========================================================================================
11 class ThreadSysError : public Error
12 {
13 public:
14  ThreadSysError( int errCode, AnsiString const & errStr ) throw() : Error( errCode, errStr ) { }
16  int getErrCode() const throw() { return this->getErrType(); }
17 };
18 
19 //=========================================================================================
20 // 线程相关错误返回值为errno.h定义的错误码
21 //=========================================================================================
22 
25 {
26 public:
28  explicit ThreadAttr( bool isCreate = true );
29  ~ThreadAttr();
30 
31  ThreadAttr( ThreadAttr && other );
32  ThreadAttr & operator = ( ThreadAttr && other );
33 
35  int create();
36 
38  int destroy();
39 
41  operator bool() const;
42 
43 public:
44  //attributes:
45 
48  {
49  threadCreateJoinable = 0,
50  threadCreateDetached = 1,
51  };
53  void setDetachState( DetachStateType detachState = threadCreateJoinable );
55  DetachStateType getDetachState() const;
56 
57  void setStackSize( size_t stackSize );
58  size_t getStackSize() const;
59 
60 private:
62  friend class Thread;
63 
65 };
66 
69 {
70 public:
71  ThreadId();
72  ~ThreadId();
73  ThreadId( ThreadId const & other );
74  ThreadId & operator = ( ThreadId const & other );
75  ThreadId( ThreadId && other );
76  ThreadId & operator = ( ThreadId && other );
77 
78  bool operator == ( ThreadId const & other ) const;
79  bool operator != ( ThreadId const & other ) const { return !this->operator == (other); }
80 
81 private:
83 
84  friend class Thread;
85 };
86 
87 class ThreadGroup;
90 {
91 public:
92 
94  typedef void * (* ThreadFuncPtr)( void * param );
95 
96 public:
97 
99  static ThreadId Self();
100 
102  static void Exit( void * exitVal = NULL );
103 
107  static void * Join( Thread & otherThread );
108 
109 private:
110  // 默认线程处理函数, param被传入Thread对象, 调用thObj->run()
111  static void * _ThreadDefaultFunc( void * param );
112 
113 public:
118  explicit Thread( bool isStartup = false ) : _attr(false), _exitVal(NULL), _deleter(NULL), _group(NULL), _isRunning(false)
119  {
120  if ( isStartup ) this->startup();
121  }
122 
126  template < typename _Fx, typename... _ArgType >
127  explicit Thread( bool isStartup, _Fx routine, _ArgType&&... arg ) : _attr(false), _exitVal(NULL), _deleter(NULL), _group(NULL), _isRunning(false)
128  {
129  this->setRunable( routine, std::forward<_ArgType>(arg)... );
130  if ( isStartup ) this->startup();
131  }
132 
134  virtual ~Thread() { }
135 
137  Thread( Thread && other );
138 
140  Thread & operator = ( Thread && other );
141 
146  int startup();
147 
149  template < typename _Fx, typename... _ArgType >
150  int startup( _Fx routine, _ArgType&&... arg )
151  {
152  this->setRunable( routine, std::forward<_ArgType>(arg)... );
153  return this->startup();
154  }
155 
157  int startupEx( ThreadFuncPtr startRoutine, void * param );
158 
160  void * joined();
161 
168  int detach();
169 
170 public:
171  //attributes:
172 
174  ThreadAttr & attr();
175 
177  template < typename _Fx, typename... _ArgType >
178  Thread & setRunable( _Fx routine, _ArgType&&... arg )
179  {
180  this->_runable.attachNew( NewRunable( routine, std::forward<_ArgType>(arg)... ) );
181  return *this;
182  }
183 
185  Thread & setDeleter( SimpleDeleterContext * deleter = NULL )
186  {
187  _deleter = deleter;
188  return *this;
189  }
191  Thread & setDefaultDeleter() { return this->setDeleter( new SimpleDefaultDeleterContext<Thread*>(this) ); }
193  template < typename _Dt >
194  Thread & setCustomDeleter( _Dt fn ) { return this->setDeleter( new SimpleCustomDeleterContext< Thread*, _Dt >( this, fn ) ); }
195 
197  void * getExitVal() const { return _exitVal; }
199  Thread & setExitVal( void * exitVal )
200  {
201  _exitVal = exitVal;
202  return *this;
203  }
204 
206  ThreadId get() const
207  {
208  return _threadId;
209  }
210 
211 protected:
213  virtual void run();
214 
215 private:
216  ThreadAttr _attr;
217  void * _exitVal;
218  SimpleDeleterContext * _deleter;
219  ThreadGroup * _group;
220  bool _isRunning;
221  ThreadId _threadId;
222  SimplePointer<Runable> _runable;
223 
224  friend class ThreadGroup;
226 };
227 
228 //=========================================================================================
231 {
232 public:
233  explicit MutexAttr( bool isCreate = true );
234  ~MutexAttr();
235 
236  MutexAttr( MutexAttr && other );
237  MutexAttr & operator = ( MutexAttr && other );
238 
240  int create();
241 
243  int destroy();
244 
246  operator bool() const;
247 
248 private:
250  friend class Mutex;
251 
253 };
254 
257 {
258 public:
259  explicit Mutex( bool isCreate = false );
260  ~Mutex();
261 
262  Mutex( Mutex && other );
263  Mutex & operator = ( Mutex && other );
264 
266  int create();
267 
269  int destroy();
270 
271  bool lock();
272  bool tryLock();
273  bool unlock();
274 
276  MutexAttr & attr();
277 private:
278  MutexAttr _attr;
280  friend class Condition;
281 
283 };
284 
285 //=========================================================================================
288 {
289 public:
290  explicit ConditionAttr( bool isCreate = true );
291  ~ConditionAttr();
292 
293  ConditionAttr( ConditionAttr && other );
294  ConditionAttr & operator = ( ConditionAttr && other );
295 
297  int create();
298 
300  int destroy();
301 
303  operator bool() const;
304 
305 private:
307  friend class Condition;
308 
310 };
311 
314 {
315 public:
316  explicit Condition( bool isCreate = false );
317  ~Condition();
318 
319  Condition( Condition && other );
320  Condition & operator = ( Condition && other );
321 
323  int create();
324 
326  int destroy();
327 
332  bool wait( Mutex & mutex, double sec = -1 );
333 
338  template < typename _Predicate >
339  bool waitUntil( _Predicate pred, Mutex & mutex, double sec = -1 )
340  {
341  while ( !pred() )
342  if ( !this->wait( mutex, sec ) )
343  return pred();
344  return true;
345  }
346 
348  int notify();
349 
351  int notifyAll();
352 
353  ConditionAttr & attr();
354 
355 private:
356  ConditionAttr _attr;
359 };
360 
361 //===========================================================================================
364 {
365 public:
366  explicit TlsKey( void (*destructor)( void *pv ) = NULL );
367  ~TlsKey();
368 
369  TlsKey( TlsKey && other );
370  TlsKey & operator = ( TlsKey && other );
371 
373  int create( void (*destructor)( void *pv ) = NULL );
374 
376  int destroy();
377 
379  void * get() const;
380 
381 private:
383 
384  friend class TlsVar;
386 };
387 
390 {
391 public:
392  explicit TlsVar( TlsKey & tlsKey );
393  ~TlsVar();
394 
395  TlsVar( TlsVar && other );
396  TlsVar & operator = ( TlsVar && other );
397 
398  void * get();
399  void * get() const;
400  void set( void * v );
401 
402  template < typename _Ty >
403  _Ty get() { return reinterpret_cast<_Ty>( this->get() ); }
404  template < typename _Ty >
405  _Ty get() const { return reinterpret_cast<_Ty>( this->get() ); }
406 
407  template < typename _Ty >
408  void set( _Ty v ) { this->set( reinterpret_cast<void*>(v) ); }
409 
410  template < typename _Ty >
411  _Ty & ref() { return *reinterpret_cast<_Ty*>( this->get() ); }
412  template < typename _Ty >
413  _Ty const & ref() const { return *reinterpret_cast<_Ty*>( this->get() ); }
414 
415 private:
416  TlsKey * _tlsKey;
418 };
419 
420 //===========================================================================================
421 
426 {
427 public:
428 
430  ThreadGroup() : _mtxGroup(true), _cdtGroup(true)
431  {
432  }
433 
435  template < typename _Fx, typename... _ArgType >
436  ThreadGroup( int count, _Fx fn, _ArgType&&... arg ) : _mtxGroup(true), _cdtGroup(true)
437  {
438  int i;
439  for ( i = 0; i < count; i++ )
440  {
441  Thread * p = new Thread( false, fn, std::forward<_ArgType>(arg)... );
442  p->_group = this;
443  _threads.emplace_back(p);
444  }
445  }
446 
447  virtual ~ThreadGroup()
448  {
449  }
450 
451  ThreadGroup( ThreadGroup && other ) : _mtxGroup( std::move(other._mtxGroup) ), _cdtGroup( std::move(other._cdtGroup) ), _threads( std::move(other._threads) )
452  {
453  }
454 
455  ThreadGroup & operator = ( ThreadGroup && other )
456  {
457  if ( this != &other )
458  {
459  // 先释放自身
460  this->wait();
461 
462  _mtxGroup = std::move(other._mtxGroup);
463  _cdtGroup = std::move(other._cdtGroup);
464  _threads = std::move(other._threads);
465  }
466  return *this;
467  }
468 
470  ThreadGroup & destroy();
471 
473  template < typename _Fx, typename... _ArgType >
474  ThreadGroup & create( int count, _Fx fn, _ArgType&&... arg )
475  {
476  this->destroy();
477 
478  int i;
479  for ( i = 0; i < count; i++ )
480  {
481  Thread * p = new Thread( false, fn, std::forward<_ArgType>(arg)... );
482  p->_group = this;
483  _threads.emplace_back(p);
484  }
485  return *this;
486  }
487 
489  template < class _ThreadCls >
490  ThreadGroup & create( int count )
491  {
492  this->destroy();
493 
494  int i;
495  for ( i = 0; i < count; i++ )
496  {
497  Thread * p = new _ThreadCls();
498  p->_group = this;
499  _threads.emplace_back(p);
500  }
501  return *this;
502  }
503 
505  ThreadGroup & startup();
506 
510  bool wait( double sec = -1 );
511 
512 private:
513  static void * _ThreadGroupDefaultFunc( void * param );
514 
515  Mutex _mtxGroup; // 互斥量保护数据
516  Condition _cdtGroup; // 用于判断组中线程是否全部运行完毕
517  std::vector< SimplePointer<Thread> > _threads;
518 
520 };
521 
523 
524 template < typename _Ty >
525 class Task;
526 class ThreadPool;
527 
529 struct TaskCtx
530 {
532  {
535  taskStop
536  };
537 
538  Mutex mtxTask; // 互斥量保护数据
539  Condition cdtTask; // 用于判断运行状态
540  TaskStatus status; // 运行状态
541  ThreadPool * pool; // 相关线程池
543  SimplePointer<Runable> poolRunable; // 投递到线程池的例程
544 
546  bool wait( double sec = -1 )
547  {
548  ScopeGuard guard(mtxTask);
549  return cdtTask.waitUntil( [this] () { return this->status == TaskCtx::taskStop; }, mtxTask, sec );
550  }
551 
553  void updateStatus( TaskStatus st, bool isNotifyAll = false )
554  {
555  ScopeGuard guard(mtxTask);
556  this->status = st;
557  if ( isNotifyAll ) cdtTask.notifyAll(); // 唤醒所有等待此任务的线程
558  }
559 
561  void rePost();
562 protected:
563  TaskCtx() : mtxTask(true), cdtTask(true), status(taskPending), pool(nullptr) { }
564  virtual ~TaskCtx() { }
565 };
566 
567 template < typename _Ty >
568 struct TaskCtxT : public TaskCtx
569 {
570  _Ty val;
571 
572  template < typename... _ArgType >
573  static SharedPointer<TaskCtxT> Create( _ArgType&& ... arg )
574  {
575  SharedPointer<TaskCtxT> p( new TaskCtxT( std::forward<_ArgType>(arg)... ) );
576  p->weakThis = p;
577  return p;
578  }
579 
580  _Ty get()
581  {
582  this->wait();
583  return val;
584  }
585 
586  void exec( RunableInvoker<_Ty> * ivk )
587  {
588  this->val = ivk->invoke();
589  }
590 protected:
591  TaskCtxT() { }
592 };
593 
594 template <>
595 struct TaskCtxT<void> : public TaskCtx
596 {
597  template < typename... _ArgType >
598  static SharedPointer<TaskCtxT> Create( _ArgType&& ... arg )
599  {
600  SharedPointer<TaskCtxT> p( new TaskCtxT( std::forward<_ArgType>(arg)... ) );
601  p->weakThis = p;
602  return p;
603  }
604 
605  void get()
606  {
607  this->wait();
608  }
609 
610  void exec( RunableInvoker<void> * ivk )
611  {
612  ivk->invoke();
613  }
614 protected:
615  TaskCtxT() { }
616 };
617 
620 {
621 public:
622  // 线程池任务等待模式
624  {
626  modeWaitTimeRePost
627  };
633  explicit ThreadPool( int threadCount = 4, ThreadPoolMode mode = modeWaitKeep, double durationSec = 0.1 ) : _mtxPool(true), _cdtPool(true), _poolStop(false), _mode(mode), _durationSec(durationSec)
634  {
635  _group.create( threadCount, [this] () {
636  while ( !_poolStop )
637  {
638  SharedPointer<TaskCtx> taskCtx;
639  {
640  ScopeGuard guard(_mtxPool);
641  while ( _queueTask.empty() && !_poolStop )
642  {
643  _cdtPool.wait(_mtxPool);
644  }
645  if ( !_queueTask.empty() ) // 如果队列不空
646  {
647  taskCtx = _queueTask.front();
648  _queueTask.pop();
649 
650  _cdtPool.notifyAll(); // 唤醒所有线程池线程
651  }
652  }
653 
654  if ( taskCtx )
655  {
656  taskCtx->updateStatus( TaskCtx::taskRunning, false );
657  taskCtx->poolRunable->run();
658  }
659  }
660  } );
661  _group.startup(); // 启动线程组的线程
662  }
663 
664  virtual ~ThreadPool()
665  {
666  this->whenEmptyStopAndWait();
667  }
668 
670  template < typename _Fx, typename... _ArgType >
671  Task<typename FuncTraits<_Fx>::ReturnType> task( _Fx fn, _ArgType&&... arg )
672  {
673  return Task<typename FuncTraits<_Fx>::ReturnType>( this, fn, std::forward<_ArgType>(arg)... );
674  }
675 
677  void stop()
678  {
679  ScopeGuard guard(_mtxPool);
680  _poolStop = true;
681  _cdtPool.notifyAll(); // 唤醒所有线程池线程
682  }
683 
685  bool wait( double sec = -1 )
686  {
687  return _group.wait(sec); // 等待全部线程退出
688  }
689 
692  {
693  if ( true )
694  {
695  ScopeGuard guard(_mtxPool);
696  _cdtPool.waitUntil( [this] () { return _queueTask.empty(); }, _mtxPool );
697  }
698 
699  this->stop();
700  this->wait();
701  }
702 
704  size_t getTaskCount() const
705  {
706  ScopeGuard guard( const_cast<Mutex &>(_mtxPool) );
707  return _queueTask.size();
708  }
709 
711  double getDurationSec() const { return _durationSec; }
712 
713 private:
714  // 投递一个任务
715  void _postTask( SharedPointer<TaskCtx> taskCtx )
716  {
717  ScopeGuard guard(_mtxPool);
718  _queueTask.push(taskCtx);
719  _cdtPool.notify(); // 通知一个等待中的线程
720  }
721 
722  Mutex _mtxPool;
723  Condition _cdtPool; // 用于判断队列是否有数据
724  bool _poolStop; // 线程池停止
725  ThreadPoolMode _mode; // 等待子任务的模式
726  double _durationSec; // 等待的时间
727  ThreadGroup _group;
728  std::queue< SharedPointer<TaskCtx> > _queueTask;
729 
730  friend struct TaskCtx;
731  template < typename _Ty0 >
732  friend class Task;
733 
735 };
736 
737 // partial TaskCtx
738 inline void TaskCtx::rePost()
739 {
740  if ( !weakThis.expired() )
741  {
742  SharedPointer<TaskCtx> p = weakThis.lock();
743  p->status = taskPending;
744  pool->_postTask(p);
745  }
746 }
747 
749 template < typename _Ty >
750 class Task
751 {
752 public:
753  using ReturnType = _Ty;
754 
756  template < typename _Fx, typename... _ArgType >
757  Task( ThreadPool * pool, _Fx fnRoutine, _ArgType&& ... argRoutine )
758  {
759  static_assert( std::is_same< ReturnType, typename FuncTraits<_Fx>::ReturnType >::value , "FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
760  _taskCtx = TaskCtxT<ReturnType>::Create();
761  _taskCtx->pool = pool;
762  _taskCtx->status = TaskCtx::taskPending;
763 
764  auto routine = MakeSimple( NewRunable( fnRoutine, std::forward<_ArgType>(argRoutine)... ) );
765  _taskCtx->poolRunable.attachNew( NewRunable( [routine] ( TaskCtxT<ReturnType> * taskCtx ) {
766  taskCtx->exec( routine.get() );
767  // 更新运行状态并通知唤醒等待在此任务的线程
768  taskCtx->updateStatus( TaskCtx::taskStop, true );
769  }, _taskCtx.get() ) );
770 
771  // 添加任务到线程池
772  _taskCtx->pool->_postTask(_taskCtx);
773  }
774 
776  template < typename _Fx, typename... _ArgType >
777  Task( SharedPointer< TaskCtxT<void> > waitTaskCtx, _Fx fnRoutine, _ArgType&& ... argRoutine )
778  {
779  static_assert( std::is_same< ReturnType, typename FuncTraits<_Fx>::ReturnType >::value , "FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
780  _taskCtx = TaskCtxT<ReturnType>::Create();
781  _taskCtx->pool = waitTaskCtx->pool;
782  _taskCtx->status = TaskCtx::taskPending;
783 
784  auto routine = MakeSimple( NewRunable( fnRoutine, std::forward<_ArgType>(argRoutine)... ) );
785  _taskCtx->poolRunable.attachNew( NewRunable( [routine] ( SharedPointer< TaskCtxT<void> > waitTaskCtx, TaskCtxT<ReturnType> * taskCtx ) {
786  ThreadPool * pool = taskCtx->pool;
787  if ( pool->_mode == ThreadPool::modeWaitKeep )
788  {
789  waitTaskCtx->wait();
790  taskCtx->exec( routine.get() );
791  // 更新运行状态并通知唤醒等待在此任务的线程
792  taskCtx->updateStatus( TaskCtx::taskStop, true );
793  }
794  else if ( pool->_mode == ThreadPool::modeWaitTimeRePost )
795  {
796  if ( waitTaskCtx->wait(pool->_durationSec) )
797  {
798  taskCtx->exec( routine.get() );
799  // 更新运行状态并通知唤醒等待在此任务的线程
800  taskCtx->updateStatus( TaskCtx::taskStop, true );
801  }
802  else
803  {
804  // 再投递
805  taskCtx->rePost();
806  }
807  }
808  }, waitTaskCtx, _taskCtx.get() ) );
809 
810  // 添加任务到线程池
811  _taskCtx->pool->_postTask(_taskCtx);
812  }
813 
815  template < typename _Fx, typename... _ArgType >
816  Task( SharedPointer< TaskCtxT<void> > waitTaskCtx, _Fx fnRoutine, typename FuncTraits<_Fx>::ClassType * obj, _ArgType&& ... argRoutine )
817  {
818  static_assert( std::is_same< ReturnType, typename FuncTraits<_Fx>::ReturnType >::value , "FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
819  _taskCtx = TaskCtxT<ReturnType>::Create();
820  _taskCtx->pool = waitTaskCtx->pool;
821  _taskCtx->status = TaskCtx::taskPending;
822 
823  auto routine = MakeSimple( NewRunable( fnRoutine, obj, std::forward<_ArgType>(argRoutine)... ) );
824  _taskCtx->poolRunable.attachNew( NewRunable( [routine] ( SharedPointer< TaskCtxT<void> > waitTaskCtx, TaskCtxT<ReturnType> * taskCtx ) {
825  ThreadPool * pool = taskCtx->pool;
826  if ( pool->_mode == ThreadPool::modeWaitKeep )
827  {
828  waitTaskCtx->wait();
829  taskCtx->exec( routine.get() );
830  // 更新运行状态并通知唤醒等待在此任务的线程
831  taskCtx->updateStatus( TaskCtx::taskStop, true );
832  }
833  else if ( pool->_mode == ThreadPool::modeWaitTimeRePost )
834  {
835  if ( waitTaskCtx->wait(pool->_durationSec) )
836  {
837  taskCtx->exec( routine.get() );
838  // 更新运行状态并通知唤醒等待在此任务的线程
839  taskCtx->updateStatus( TaskCtx::taskStop, true );
840  }
841  else
842  {
843  // 再投递
844  taskCtx->rePost();
845  }
846  }
847  }, waitTaskCtx, _taskCtx.get() ) );
848 
849  // 添加任务到线程池
850  _taskCtx->pool->_postTask(_taskCtx);
851  }
852 
854  template < typename _Ty2, typename _Fx, typename... _ArgType >
855  Task( SharedPointer< TaskCtxT<_Ty2> > waitTaskCtx, _Fx fnRoutine, _ArgType&& ... argRoutine )
856  {
857  static_assert( std::is_same< ReturnType, typename FuncTraits<_Fx>::ReturnType >::value , "FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
858  _taskCtx = TaskCtxT<ReturnType>::Create();
859  _taskCtx->pool = waitTaskCtx->pool;
860  _taskCtx->status = TaskCtx::taskPending;
861 
862  auto routine = MakeSimple( NewRunable( fnRoutine, _Ty2(), std::forward<_ArgType>(argRoutine)... ) );
863  _taskCtx->poolRunable.attachNew( NewRunable( [routine] ( SharedPointer< TaskCtxT<_Ty2> > waitTaskCtx, TaskCtxT<ReturnType> * taskCtx ) {
864  ThreadPool * pool = taskCtx->pool;
865  if ( pool->_mode == ThreadPool::modeWaitKeep )
866  {
867  waitTaskCtx->wait();
868  std::get<0>(routine->_tuple) = std::move(waitTaskCtx->val);
869  taskCtx->exec( routine.get() );
870  // 更新运行状态并通知唤醒等待在此任务的线程
871  taskCtx->updateStatus( TaskCtx::taskStop, true );
872  }
873  else if ( pool->_mode == ThreadPool::modeWaitTimeRePost )
874  {
875  if ( waitTaskCtx->wait(pool->_durationSec) )
876  {
877  std::get<0>(routine->_tuple) = std::move(waitTaskCtx->val);
878  taskCtx->exec( routine.get() );
879  // 更新运行状态并通知唤醒等待在此任务的线程
880  taskCtx->updateStatus( TaskCtx::taskStop, true );
881  }
882  else
883  {
884  // 再投递
885  taskCtx->rePost();
886  }
887  }
888  }, waitTaskCtx, _taskCtx.get() ) );
889 
890  // 添加任务到线程池
891  _taskCtx->pool->_postTask(_taskCtx);
892  }
893 
895  template < typename _Ty2, typename _Fx, typename... _ArgType >
896  Task( SharedPointer< TaskCtxT<_Ty2> > waitTaskCtx, _Fx fnRoutine, typename FuncTraits<_Fx>::ClassType * obj, _ArgType&& ... argRoutine )
897  {
898  static_assert( std::is_same< ReturnType, typename FuncTraits<_Fx>::ReturnType >::value , "FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
899  _taskCtx = TaskCtxT<ReturnType>::Create();
900  _taskCtx->pool = waitTaskCtx->pool;
901  _taskCtx->status = TaskCtx::taskPending;
902 
903  auto routine = MakeSimple( NewRunable( fnRoutine, obj, _Ty2(), std::forward<_ArgType>(argRoutine)... ) );
904  _taskCtx->poolRunable.attachNew( NewRunable( [routine] ( SharedPointer< TaskCtxT<_Ty2> > waitTaskCtx, TaskCtxT<ReturnType> * taskCtx ) {
905  ThreadPool * pool = taskCtx->pool;
906  if ( pool->_mode == ThreadPool::modeWaitKeep )
907  {
908  waitTaskCtx->wait();
909  std::get<1>(routine->_tuple) = std::move(waitTaskCtx->val);
910  taskCtx->exec( routine.get() );
911  // 更新运行状态并通知唤醒等待在此任务的线程
912  taskCtx->updateStatus( TaskCtx::taskStop, true );
913  }
914  else if ( pool->_mode == ThreadPool::modeWaitTimeRePost )
915  {
916  if ( waitTaskCtx->wait(pool->_durationSec) )
917  {
918  std::get<1>(routine->_tuple) = std::move(waitTaskCtx->val);
919  taskCtx->exec( routine.get() );
920  // 更新运行状态并通知唤醒等待在此任务的线程
921  taskCtx->updateStatus( TaskCtx::taskStop, true );
922  }
923  else
924  {
925  // 再投递
926  taskCtx->rePost();
927  }
928  }
929  }, waitTaskCtx, _taskCtx.get() ) );
930 
931  // 添加任务到线程池
932  _taskCtx->pool->_postTask(_taskCtx);
933  }
934 
935  virtual ~Task()
936  {
937  }
938 
940  template < typename _Fx, typename... _ArgType >
941  Task<typename FuncTraits<_Fx>::ReturnType> then( _Fx fn, _ArgType&& ... arg )
942  {
943  return Task<typename FuncTraits<_Fx>::ReturnType>( _taskCtx, fn, std::forward<_ArgType>(arg)... );
944  }
945 
947  void wait( double sec = -1 )
948  {
949  _taskCtx->wait(sec);
950  }
951 
954  {
955  return _taskCtx->get();
956  }
957 
958 private:
960 
961  template < typename _Ty0 >
962  friend class Task;
963 };
964 
965 
966 }
967 
968 #endif // __THREADS_HPP__
Thread(bool isStartup, _Fx routine, _ArgType &&...arg)
构造函数2
Definition: threads.hpp:127
_Ty ReturnType
Definition: threads.hpp:753
ThreadGroup(ThreadGroup &&other)
Definition: threads.hpp:451
同步锁对象接口
Definition: system.hpp:164
ThreadGroup & create(int count, _Fx fn, _ArgType &&...arg)
按指定的线程处理例程,创建一定数量的线程
Definition: threads.hpp:474
SimplePointer< Runable > poolRunable
Definition: threads.hpp:543
void stop()
主动停止线程池运行
Definition: threads.hpp:677
ThreadGroup(int count, _Fx fn, _ArgType &&...arg)
构造函数2 提供一个线程处理例程,并指定创建的线程数量
Definition: threads.hpp:436
void whenEmptyStopAndWait()
当任务队列为空,就停止线程池运行,并等待线程组线程正常退出
Definition: threads.hpp:691
Condition cdtTask
Definition: threads.hpp:539
Simple默认删除器场景
Definition: smartptr.hpp:45
#define WINUX_DLL
Definition: utilities.hpp:57
std::basic_string< char > AnsiString
Definition: utilities.hpp:165
size_t getTaskCount() const
队列里的任务数
Definition: threads.hpp:704
void * getExitVal() const
取得退出值
Definition: threads.hpp:197
ThreadPool * pool
Definition: threads.hpp:541
Simple自定义删除器场景
Definition: smartptr.hpp:58
一直等待上一个任务
Definition: threads.hpp:625
static SharedPointer< TaskCtxT > Create(_ArgType &&...arg)
Definition: threads.hpp:598
Task(SharedPointer< TaskCtxT< _Ty2 > > waitTaskCtx, _Fx fnRoutine, typename FuncTraits< _Fx >::ClassType *obj, _ArgType &&...argRoutine)
Ctor3-2 等待一个任务结束并把其返回值移动给一个新的任务
Definition: threads.hpp:896
Thread & setCustomDeleter(_Dt fn)
设置自定义删除器场景以便默认线程函数删除Thread对象自己
Definition: threads.hpp:194
virtual ~TaskCtx()
Definition: threads.hpp:564
Task(SharedPointer< TaskCtxT< void > > waitTaskCtx, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor2-1 等待一个任务结束并创建一个新的任务
Definition: threads.hpp:777
Task(ThreadPool *pool, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor1 创建一个任务,需要提供一个线程池
Definition: threads.hpp:757
等待一定时间后重投
Definition: threads.hpp:626
Thread & setDefaultDeleter()
设置默认删除器场景以便默认线程函数删除Thread对象自己
Definition: threads.hpp:191
STL namespace.
bool wait(double sec=-1)
wait(sec>0)等待一定的时间长用于等待任务运行。当调用stop()后,wait(sec<0)等待线程组线程全部正常退出 ...
Definition: threads.hpp:685
ThreadGroup & create(int count)
创建一定数量指定的派生类线程
Definition: threads.hpp:490
Thread(bool isStartup=false)
构造函数1
Definition: threads.hpp:118
Task< typename FuncTraits< _Fx >::ReturnType > task(_Fx fn, _ArgType &&...arg)
创建一个新任务
Definition: threads.hpp:671
任务数据场景
Definition: threads.hpp:529
条件变量
Definition: threads.hpp:313
void updateStatus(TaskStatus st, bool isNotifyAll=false)
更新运行状态
Definition: threads.hpp:553
_Ty & ref()
Definition: threads.hpp:411
Simple删除器场景基类
Definition: smartptr.hpp:23
void exec(RunableInvoker< void > *ivk)
Definition: threads.hpp:610
#define DISABLE_OBJECT_COPY(clsname)
Definition: utilities.hpp:78
virtual ~ThreadGroup()
Definition: threads.hpp:447
Task< typename FuncTraits< _Fx >::ReturnType > then(_Fx fn, _ArgType &&...arg)
等待本任务结束并开启下一个新任务,把返回值传给新任务作参数
Definition: threads.hpp:941
互斥量
Definition: threads.hpp:256
互斥量属性
Definition: threads.hpp:230
bool wait(double sec=-1)
等待任务结束
Definition: threads.hpp:546
double getDurationSec() const
等待的时间(秒)
Definition: threads.hpp:711
ThreadPool(int threadCount=4, ThreadPoolMode mode=modeWaitKeep, double durationSec=0.1)
构造函数1
Definition: threads.hpp:633
条件变量属性
Definition: threads.hpp:287
作用域范围保护
Definition: system.hpp:174
virtual ~ThreadPool()
Definition: threads.hpp:664
线程组
Definition: threads.hpp:425
virtual ~Task()
Definition: threads.hpp:935
TaskStatus status
Definition: threads.hpp:540
bool waitUntil(_Predicate pred, Mutex &mutex, double sec=-1)
等待谓词条件达成
Definition: threads.hpp:339
ThreadSysError(int errCode, AnsiString const &errStr)
Definition: threads.hpp:14
ThreadGroup()
构造函数1 默认
Definition: threads.hpp:430
TLS Var.
Definition: threads.hpp:389
线程
Definition: threads.hpp:89
Thread & setRunable(_Fx routine, _ArgType &&...arg)
设置Runable,run()默认会调用它
Definition: threads.hpp:178
线程池,创建一组线程等待着从任务队列中获取任务执行
Definition: threads.hpp:619
线程相关错误
Definition: threads.hpp:11
线程ID
Definition: threads.hpp:68
代表投递到线程池的一个任务,用于等待执行完毕获取返回值或者接着投递下一个任务
Definition: threads.hpp:525
int getErrCode() const
获取错误代码
Definition: threads.hpp:16
Thread & setExitVal(void *exitVal)
设置退出值,可在run()中调用
Definition: threads.hpp:199
virtual int getErrType() const
Definition: utilities.hpp:422
static SharedPointer< TaskCtxT > Create(_ArgType &&...arg)
Definition: threads.hpp:573
void wait(double sec=-1)
等待任务执行完毕
Definition: threads.hpp:947
错误类
Definition: utilities.hpp:413
SimplePointer< _Ty > MakeSimple(_Ty *newObj)
Definition: smartptr.hpp:908
int notifyAll()
通知所有正在wait()中的线程醒来
TLS Key.
Definition: threads.hpp:363
Thread & setDeleter(SimpleDeleterContext *deleter=NULL)
设置删除器场景以便默认线程函数删除Thread对象自己
Definition: threads.hpp:185
WeakPointer< TaskCtx > weakThis
Definition: threads.hpp:542
Task(SharedPointer< TaskCtxT< void > > waitTaskCtx, _Fx fnRoutine, typename FuncTraits< _Fx >::ClassType *obj, _ArgType &&...argRoutine)
Ctor2-2 等待一个任务结束并创建一个新的任务
Definition: threads.hpp:816
Task(SharedPointer< TaskCtxT< _Ty2 > > waitTaskCtx, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor3-1 等待一个任务结束并把其返回值移动给一个新的任务
Definition: threads.hpp:855
DetachStateType
分离状态类型
Definition: threads.hpp:47
virtual ~Thread()
析构函数
Definition: threads.hpp:134
void rePost()
重新投入线程池队列中
Definition: threads.hpp:738
void exec(RunableInvoker< _Ty > *ivk)
Definition: threads.hpp:586
_Ty const & ref() const
Definition: threads.hpp:413
线程属性
Definition: threads.hpp:24
跨平台基础功能库
Definition: archives.hpp:7
int startup(_Fx routine, _ArgType &&...arg)
实际创建一个线程,提供你自己的处理例程
Definition: threads.hpp:150