1 #ifndef __THREADS_HPP__ 2 #define __THREADS_HPP__ 41 operator bool()
const;
49 threadCreateJoinable = 0,
50 threadCreateDetached = 1,
53 void setDetachState(
DetachStateType detachState = threadCreateJoinable );
57 void setStackSize(
size_t stackSize );
58 size_t getStackSize()
const;
78 bool operator == (
ThreadId const & other )
const;
79 bool operator != (
ThreadId const & other )
const {
return !this->operator == (other); }
94 typedef void * (* ThreadFuncPtr)(
void * param );
102 static void Exit(
void * exitVal = NULL );
107 static void * Join(
Thread & otherThread );
111 static void * _ThreadDefaultFunc(
void * param );
118 explicit Thread(
bool isStartup =
false ) : _attr(false), _exitVal(NULL), _deleter(NULL), _group(NULL), _isRunning(false)
120 if ( isStartup ) this->startup();
126 template <
typename _Fx,
typename... _ArgType >
127 explicit Thread(
bool isStartup, _Fx routine, _ArgType&&... arg ) : _attr(false), _exitVal(NULL), _deleter(NULL), _group(NULL), _isRunning(false)
129 this->setRunable( routine, std::forward<_ArgType>(arg)... );
130 if ( isStartup ) this->startup();
149 template <
typename _Fx,
typename... _ArgType >
152 this->setRunable( routine, std::forward<_ArgType>(arg)... );
153 return this->startup();
157 int startupEx( ThreadFuncPtr startRoutine,
void * param );
177 template <
typename _Fx,
typename... _ArgType >
180 this->_runable.attachNew( NewRunable( routine, std::forward<_ArgType>(arg)... ) );
193 template <
typename _Dt >
233 explicit MutexAttr(
bool isCreate =
true );
246 operator bool()
const;
259 explicit Mutex(
bool isCreate =
false );
303 operator bool()
const;
316 explicit Condition(
bool isCreate =
false );
332 bool wait(
Mutex & mutex,
double sec = -1 );
338 template <
typename _Predicate >
342 if ( !this->wait( mutex, sec ) )
366 explicit TlsKey(
void (*destructor)(
void *pv ) = NULL );
373 int create(
void (*destructor)(
void *pv ) = NULL );
400 void set(
void * v );
402 template <
typename _Ty >
403 _Ty
get() {
return reinterpret_cast<_Ty
>( this->
get() ); }
404 template <
typename _Ty >
405 _Ty
get()
const {
return reinterpret_cast<_Ty
>( this->
get() ); }
407 template <
typename _Ty >
408 void set( _Ty v ) { this->
set(
reinterpret_cast<void*
>(v) ); }
410 template <
typename _Ty >
411 _Ty &
ref() {
return *
reinterpret_cast<_Ty*
>( this->
get() ); }
412 template <
typename _Ty >
413 _Ty
const &
ref()
const {
return *
reinterpret_cast<_Ty*
>( this->
get() ); }
435 template <
typename _Fx,
typename... _ArgType >
436 ThreadGroup(
int count, _Fx fn, _ArgType&&... arg ) : _mtxGroup(true), _cdtGroup(true)
439 for ( i = 0; i < count; i++ )
441 Thread * p =
new Thread(
false, fn, std::forward<_ArgType>(arg)... );
443 _threads.emplace_back(p);
457 if (
this != &other )
462 _mtxGroup = std::move(other._mtxGroup);
463 _cdtGroup = std::move(other._cdtGroup);
464 _threads = std::move(other._threads);
473 template <
typename _Fx,
typename... _ArgType >
479 for ( i = 0; i < count; i++ )
481 Thread * p =
new Thread(
false, fn, std::forward<_ArgType>(arg)... );
483 _threads.emplace_back(p);
489 template <
class _ThreadCls >
495 for ( i = 0; i < count; i++ )
497 Thread * p =
new _ThreadCls();
499 _threads.emplace_back(p);
510 bool wait(
double sec = -1 );
513 static void * _ThreadGroupDefaultFunc(
void * param );
517 std::vector< SimplePointer<Thread> > _threads;
524 template <
typename _Ty >
563 TaskCtx() : mtxTask(true), cdtTask(true), status(taskPending), pool(nullptr) { }
567 template <
typename _Ty >
572 template <
typename... _ArgType >
586 void exec( RunableInvoker<_Ty> * ivk )
588 this->val = ivk->invoke();
597 template <
typename... _ArgType >
610 void exec( RunableInvoker<void> * ivk )
633 explicit ThreadPool(
int threadCount = 4,
ThreadPoolMode mode = modeWaitKeep,
double durationSec = 0.1 ) : _mtxPool(true), _cdtPool(true), _poolStop(false), _mode(mode), _durationSec(durationSec)
635 _group.create( threadCount, [
this] () {
641 while ( _queueTask.empty() && !_poolStop )
643 _cdtPool.wait(_mtxPool);
645 if ( !_queueTask.empty() )
647 taskCtx = _queueTask.front();
650 _cdtPool.notifyAll();
657 taskCtx->poolRunable->run();
666 this->whenEmptyStopAndWait();
670 template <
typename _Fx,
typename... _ArgType >
681 _cdtPool.notifyAll();
687 return _group.wait(sec);
696 _cdtPool.waitUntil( [
this] () {
return _queueTask.empty(); }, _mtxPool );
706 ScopeGuard guard( const_cast<Mutex &>(_mtxPool) );
707 return _queueTask.size();
718 _queueTask.push(taskCtx);
728 std::queue< SharedPointer<TaskCtx> > _queueTask;
731 template <
typename _Ty0 >
740 if ( !weakThis.expired() )
743 p->status = taskPending;
749 template <
typename _Ty >
756 template <
typename _Fx,
typename... _ArgType >
759 static_assert( std::is_same<
ReturnType,
typename FuncTraits<_Fx>::ReturnType >::value ,
"FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
761 _taskCtx->pool = pool;
764 auto routine =
MakeSimple( NewRunable( fnRoutine, std::forward<_ArgType>(argRoutine)... ) );
766 taskCtx->
exec( routine.get() );
769 }, _taskCtx.get() ) );
772 _taskCtx->pool->_postTask(_taskCtx);
776 template <
typename _Fx,
typename... _ArgType >
779 static_assert( std::is_same<
ReturnType,
typename FuncTraits<_Fx>::ReturnType >::value ,
"FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
781 _taskCtx->pool = waitTaskCtx->pool;
784 auto routine =
MakeSimple( NewRunable( fnRoutine, std::forward<_ArgType>(argRoutine)... ) );
790 taskCtx->exec( routine.get() );
792 taskCtx->updateStatus( TaskCtx::taskStop, true );
796 if ( waitTaskCtx->wait(pool->_durationSec) )
798 taskCtx->exec( routine.get() );
800 taskCtx->updateStatus( TaskCtx::taskStop, true );
808 }, waitTaskCtx, _taskCtx.get() ) );
811 _taskCtx->pool->_postTask(_taskCtx);
815 template <
typename _Fx,
typename... _ArgType >
818 static_assert( std::is_same<
ReturnType,
typename FuncTraits<_Fx>::ReturnType >::value ,
"FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
820 _taskCtx->pool = waitTaskCtx->pool;
823 auto routine =
MakeSimple( NewRunable( fnRoutine, obj, std::forward<_ArgType>(argRoutine)... ) );
829 taskCtx->exec( routine.get() );
831 taskCtx->updateStatus( TaskCtx::taskStop, true );
835 if ( waitTaskCtx->wait(pool->_durationSec) )
837 taskCtx->exec( routine.get() );
839 taskCtx->updateStatus( TaskCtx::taskStop, true );
847 }, waitTaskCtx, _taskCtx.get() ) );
850 _taskCtx->pool->_postTask(_taskCtx);
854 template <
typename _Ty2,
typename _Fx,
typename... _ArgType >
857 static_assert( std::is_same<
ReturnType,
typename FuncTraits<_Fx>::ReturnType >::value ,
"FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
859 _taskCtx->pool = waitTaskCtx->pool;
862 auto routine =
MakeSimple( NewRunable( fnRoutine, _Ty2(), std::forward<_ArgType>(argRoutine)... ) );
868 std::get<0>(routine->_tuple) = std::move(waitTaskCtx->val);
869 taskCtx->exec( routine.get() );
871 taskCtx->updateStatus( TaskCtx::taskStop, true );
875 if ( waitTaskCtx->wait(pool->_durationSec) )
877 std::get<0>(routine->_tuple) = std::move(waitTaskCtx->val);
878 taskCtx->exec( routine.get() );
880 taskCtx->updateStatus( TaskCtx::taskStop, true );
888 }, waitTaskCtx, _taskCtx.get() ) );
891 _taskCtx->pool->_postTask(_taskCtx);
895 template <
typename _Ty2,
typename _Fx,
typename... _ArgType >
898 static_assert( std::is_same<
ReturnType,
typename FuncTraits<_Fx>::ReturnType >::value ,
"FuncTraits<_Fx>::ReturnType is not match Task<_Ty>." );
900 _taskCtx->pool = waitTaskCtx->pool;
903 auto routine =
MakeSimple( NewRunable( fnRoutine, obj, _Ty2(), std::forward<_ArgType>(argRoutine)... ) );
909 std::get<1>(routine->_tuple) = std::move(waitTaskCtx->val);
910 taskCtx->exec( routine.get() );
912 taskCtx->updateStatus( TaskCtx::taskStop, true );
916 if ( waitTaskCtx->wait(pool->_durationSec) )
918 std::get<1>(routine->_tuple) = std::move(waitTaskCtx->val);
919 taskCtx->exec( routine.get() );
921 taskCtx->updateStatus( TaskCtx::taskStop, true );
929 }, waitTaskCtx, _taskCtx.get() ) );
932 _taskCtx->pool->_postTask(_taskCtx);
940 template <
typename _Fx,
typename... _ArgType >
955 return _taskCtx->get();
961 template <
typename _Ty0 >
968 #endif // __THREADS_HPP__
Thread(bool isStartup, _Fx routine, _ArgType &&...arg)
构造函数2
ThreadGroup(ThreadGroup &&other)
ThreadGroup & create(int count, _Fx fn, _ArgType &&...arg)
按指定的线程处理例程,创建一定数量的线程
SimplePointer< Runable > poolRunable
ThreadGroup(int count, _Fx fn, _ArgType &&...arg)
构造函数2 提供一个线程处理例程,并指定创建的线程数量
void whenEmptyStopAndWait()
当任务队列为空,就停止线程池运行,并等待线程组线程正常退出
std::basic_string< char > AnsiString
size_t getTaskCount() const
队列里的任务数
void * getExitVal() const
取得退出值
static SharedPointer< TaskCtxT > Create(_ArgType &&...arg)
Task(SharedPointer< TaskCtxT< _Ty2 > > waitTaskCtx, _Fx fnRoutine, typename FuncTraits< _Fx >::ClassType *obj, _ArgType &&...argRoutine)
Ctor3-2 等待一个任务结束并把其返回值移动给一个新的任务
Thread & setCustomDeleter(_Dt fn)
设置自定义删除器场景以便默认线程函数删除Thread对象自己
Task(SharedPointer< TaskCtxT< void > > waitTaskCtx, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor2-1 等待一个任务结束并创建一个新的任务
Task(ThreadPool *pool, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor1 创建一个任务,需要提供一个线程池
Thread & setDefaultDeleter()
设置默认删除器场景以便默认线程函数删除Thread对象自己
bool wait(double sec=-1)
wait(sec>0)等待一定的时间长用于等待任务运行。当调用stop()后,wait(sec<0)等待线程组线程全部正常退出 ...
ThreadGroup & create(int count)
创建一定数量指定的派生类线程
Thread(bool isStartup=false)
构造函数1
Task< typename FuncTraits< _Fx >::ReturnType > task(_Fx fn, _ArgType &&...arg)
创建一个新任务
void updateStatus(TaskStatus st, bool isNotifyAll=false)
更新运行状态
void exec(RunableInvoker< void > *ivk)
#define DISABLE_OBJECT_COPY(clsname)
Task< typename FuncTraits< _Fx >::ReturnType > then(_Fx fn, _ArgType &&...arg)
等待本任务结束并开启下一个新任务,把返回值传给新任务作参数
bool wait(double sec=-1)
等待任务结束
double getDurationSec() const
等待的时间(秒)
ThreadPool(int threadCount=4, ThreadPoolMode mode=modeWaitKeep, double durationSec=0.1)
构造函数1
bool waitUntil(_Predicate pred, Mutex &mutex, double sec=-1)
等待谓词条件达成
ThreadSysError(int errCode, AnsiString const &errStr)
Thread & setRunable(_Fx routine, _ArgType &&...arg)
设置Runable,run()默认会调用它
线程池,创建一组线程等待着从任务队列中获取任务执行
代表投递到线程池的一个任务,用于等待执行完毕获取返回值或者接着投递下一个任务
int getErrCode() const
获取错误代码
Thread & setExitVal(void *exitVal)
设置退出值,可在run()中调用
virtual int getErrType() const
static SharedPointer< TaskCtxT > Create(_ArgType &&...arg)
void wait(double sec=-1)
等待任务执行完毕
SimplePointer< _Ty > MakeSimple(_Ty *newObj)
int notifyAll()
通知所有正在wait()中的线程醒来
Thread & setDeleter(SimpleDeleterContext *deleter=NULL)
设置删除器场景以便默认线程函数删除Thread对象自己
WeakPointer< TaskCtx > weakThis
Task(SharedPointer< TaskCtxT< void > > waitTaskCtx, _Fx fnRoutine, typename FuncTraits< _Fx >::ClassType *obj, _ArgType &&...argRoutine)
Ctor2-2 等待一个任务结束并创建一个新的任务
Task(SharedPointer< TaskCtxT< _Ty2 > > waitTaskCtx, _Fx fnRoutine, _ArgType &&...argRoutine)
Ctor3-1 等待一个任务结束并把其返回值移动给一个新的任务
void exec(RunableInvoker< _Ty > *ivk)
int startup(_Fx routine, _ArgType &&...arg)
实际创建一个线程,提供你自己的处理例程