Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ CVoid GDynamicEngine::parallelRunAll() {

CVoid GDynamicEngine::parallelRunOne(GElementPtr element) {
if (unlikely(cur_status_.isErr())) {
CGRAPH_UNIQUE_LOCK lock(locker_.mtx_);
locker_.cv_.notify_one();
return;
}

Expand Down
14 changes: 7 additions & 7 deletions src/UtilsCtrl/ThreadPool/UThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,38 +64,38 @@ class UThreadPool : public UThreadObject {
/**
* 提交任务信息
* @tparam FunctionType
* @param task
* @param func
* @param index
* @return
*/
template<typename FunctionType>
auto commit(const FunctionType& task,
auto commit(const FunctionType& func,
CIndex index = CGRAPH_DEFAULT_TASK_STRATEGY)
-> std::future<decltype(std::declval<FunctionType>()())>;

/**
* 向特定的线程id中,提交任务信息
* @tparam FunctionType
* @param task
* @param func
* @param tid 线程id。如果超出主线程个数范围,则默认写入pool的通用队列中
* @param enable 是否启用上锁/解锁功能
* @param lockable 上锁(true) / 解锁(false)
* @return
*/
template<typename FunctionType>
auto commitWithTid(const FunctionType& task, CIndex tid, CBool enable, CBool lockable)
auto commitWithTid(const FunctionType& func, CIndex tid, CBool enable, CBool lockable)
-> std::future<decltype(std::declval<FunctionType>()())>;

/**
* 根据优先级,执行任务
* @tparam FunctionType
* @param task
* @param func
* @param priority 优先级别。自然序从大到小依次执行
* @return
* @notice 建议,priority 范围在 [-100, 100] 之间
*/
template<typename FunctionType>
auto commitWithPriority(const FunctionType& task,
auto commitWithPriority(const FunctionType& func,
int priority)
-> std::future<decltype(std::declval<FunctionType>()())>;

Expand All @@ -106,7 +106,7 @@ class UThreadPool : public UThreadObject {
* @param index
*/
template<typename FunctionType>
CVoid execute(const FunctionType& task,
CVoid execute(FunctionType&& task,
CIndex index = CGRAPH_DEFAULT_TASK_STRATEGY);

/**
Expand Down
48 changes: 17 additions & 31 deletions src/UtilsCtrl/ThreadPool/UThreadPool.inl
Original file line number Diff line number Diff line change
Expand Up @@ -14,75 +14,61 @@
CGRAPH_NAMESPACE_BEGIN

template<typename FunctionType>
auto UThreadPool::commit(const FunctionType& task, CIndex index)
auto UThreadPool::commit(const FunctionType& func, CIndex index)
-> std::future<decltype(std::declval<FunctionType>()())> {
using ResultType = decltype(std::declval<FunctionType>()());

std::packaged_task<ResultType()> runnableTask(std::move(task));
std::future<ResultType> result(runnableTask.get_future());
std::packaged_task<ResultType()> task(func);
std::future<ResultType> result(task.get_future());

CIndex realIndex = dispatch(index);
if (realIndex >= 0 && realIndex < config_.default_thread_size_) {
// 如果返回的结果,在主线程数量之间,则放到主线程的queue中执行
primary_threads_[realIndex]->pushTask(std::move(runnableTask));
} else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) {
/**
* 如果是长时间任务,则交给特定的任务队列,仅由辅助线程处理
* 目的是防止有很多长时间任务,将所有运行的线程均阻塞
* 长任务程序,默认优先级较低
**/
priority_task_queue_.push(std::move(runnableTask), CGRAPH_LONG_TIME_TASK_STRATEGY);
} else {
// 返回其他结果,放到pool的queue中执行
task_queue_.push(std::move(runnableTask));
}
execute(std::move(task), index);
return result;
}


template<typename FunctionType>
auto UThreadPool::commitWithTid(const FunctionType& task, CIndex tid, CBool enable, CBool lockable)
auto UThreadPool::commitWithTid(const FunctionType& func, CIndex tid, CBool enable, CBool lockable)
-> std::future<decltype(std::declval<FunctionType>()())> {
using ResultType = decltype(std::declval<FunctionType>()());
std::packaged_task<ResultType()> runnableTask(std::move(task));
std::future<ResultType> result(runnableTask.get_future());
std::packaged_task<ResultType()> task(std::move(func));
std::future<ResultType> result(task.get_future());

if (tid >= 0 && tid < config_.default_thread_size_) {
primary_threads_[tid]->pushTask(std::move(runnableTask), enable, lockable);
primary_threads_[tid]->pushTask(std::move(task), enable, lockable);
} else {
// 如果超出主线程的范围,则默认写入 pool 通用的任务队列中
task_queue_.push(task);
task_queue_.push(func);
}
return result;
}


template<typename FunctionType>
auto UThreadPool::commitWithPriority(const FunctionType& task, int priority)
auto UThreadPool::commitWithPriority(const FunctionType& func, int priority)
-> std::future<decltype(std::declval<FunctionType>()())> {
using ResultType = decltype(std::declval<FunctionType>()());

std::packaged_task<ResultType()> runnableTask(task);
std::future<ResultType> result(runnableTask.get_future());
std::packaged_task<ResultType()> task(func);
std::future<ResultType> result(task.get_future());

if (secondary_threads_.empty()) {
createSecondaryThread(1); // 如果没有开启辅助线程,则直接开启一个
}

priority_task_queue_.push(std::move(runnableTask), priority);
priority_task_queue_.push(std::move(task), priority);
return result;
}


template<typename FunctionType>
CVoid UThreadPool::execute(const FunctionType& task, CIndex index) {
CVoid UThreadPool::execute(FunctionType&& task, CIndex index) {
CIndex realIndex = dispatch(index);
if (realIndex >= 0 && realIndex < config_.default_thread_size_) {
primary_threads_[realIndex]->pushTask(std::move(task));
primary_threads_[realIndex]->pushTask(std::forward<FunctionType>(task));
} else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) {
priority_task_queue_.push(std::move(task), CGRAPH_LONG_TIME_TASK_STRATEGY);
priority_task_queue_.push(std::forward<FunctionType>(task), CGRAPH_LONG_TIME_TASK_STRATEGY);
} else {
task_queue_.push(std::move(task));
task_queue_.push(std::forward<FunctionType>(task));
}
}

Expand Down
Loading