Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/GraphCtrl/GraphElement/GElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ CVoid GElement::dumpElement(std::ostream& oss) {
}


CVoid GElement::dumpElementHeader(std::ostream& oss) {
CVoid GElement::dumpElementHeader(std::ostream& oss) const {
oss << 'p' << this << "[label=\"";
if (this->name_.empty()) {
oss << 'p' << this; // 如果没有名字,则通过当前指针位置来代替
Expand All @@ -423,7 +423,7 @@ CVoid GElement::dumpElementHeader(std::ostream& oss) {
}


CVoid GElement::dumpPerfInfo(std::ostream& oss) {
CVoid GElement::dumpPerfInfo(std::ostream& oss) const {
if (perf_info_ && perf_info_->loop_ > 0) {
// 包含 perf信息的情况
oss << "\n";
Expand Down
4 changes: 2 additions & 2 deletions src/GraphCtrl/GraphElement/GElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,14 @@ class GElement : public GElementObject,
* @param oss
* @return
*/
CVoid dumpElementHeader(std::ostream& oss);
CVoid dumpElementHeader(std::ostream& oss) const;

/**
* graphviz dump perf逻辑
* @param oss
* @return
*/
CVoid dumpPerfInfo(std::ostream& oss);
CVoid dumpPerfInfo(std::ostream& oss) const;

/**
* 判断是否进入 suspend 状态。如果是的话,则等待恢复。未进入 suspend 状态,则继续运行
Expand Down
6 changes: 3 additions & 3 deletions src/UtilsCtrl/Container/USmallVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

CGRAPH_NAMESPACE_BEGIN

template<class T, CSize CAPACITY=8>
template<class T, CSize CAPACITY=4>
class USmallVector : public UtilsObject {
public:
explicit USmallVector() {
Expand Down Expand Up @@ -51,7 +51,7 @@ class USmallVector : public UtilsObject {
data_ = curData;
capacity_ = curCapacity;
}
data_[cur_index_++] = val;
data_[cur_index_++] = std::move(val);
}

CSize size() const {
Expand Down Expand Up @@ -107,7 +107,7 @@ class USmallVector : public UtilsObject {
* @return
*/
std::vector<T> asVector() const {
std::vector<T> vec;
std::vector<T> vec{};
if (!data_) {
return vec;
}
Expand Down
2 changes: 1 addition & 1 deletion src/UtilsCtrl/ThreadPool/Queue/UWorkStealingQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class UWorkStealingQueue : public UQueueObject {
* @param lockable
* @return
*/
CVoid push(T&& value, CBool enable, CBool lockable) {
CVoid push(T&& value, const CBool enable, const CBool lockable) {
if (enable && lockable) {
mutex_.lock();
}
Expand Down
12 changes: 6 additions & 6 deletions src/UtilsCtrl/ThreadPool/Task/UTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,23 @@ class UTask : public CStruct {
// 退化以获得实际类型,修改思路参考:https://github.com/ChunelFeng/CThreadPool/pull/3
template<typename F, typename T = typename std::decay<F>::type>
struct TaskDerided : TaskBased {
T func_;
T func_ {};
explicit TaskDerided(F&& func) : func_(std::forward<F>(func)) {}
CVoid call() final { func_(); }
};

public:
template<typename F>
UTask(F&& func, int priority = 0)
UTask(F&& func, const int priority = 0)
: impl_(new TaskDerided<F>(std::forward<F>(func)))
, priority_(priority) {}

CVoid operator()() {
CVoid operator()() const {
// impl_ 理论上不可能为空
impl_->call();
}

UTask() = default;
explicit UTask() = default;

UTask(UTask&& task) noexcept:
impl_(std::move(task.impl_)),
Expand All @@ -66,8 +66,8 @@ class UTask : public CStruct {
CGRAPH_NO_ALLOWED_COPY(UTask)

private:
std::unique_ptr<TaskBased> impl_ = nullptr;
CInt priority_ = 0; // 任务的优先级信息
std::unique_ptr<TaskBased> impl_ { nullptr };
CInt priority_ { 0 }; // 任务的优先级信息
};


Expand Down
4 changes: 2 additions & 2 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class UThreadBase : public UThreadObject {
* 执行单个任务
* @param task
*/
CVoid runTask(UTask& task) {
CVoid runTask(const UTask& task) {
is_running_ = true;
task();
total_task_num_++;
Expand All @@ -91,7 +91,7 @@ class UThreadBase : public UThreadObject {
* 批量执行任务
* @param tasks
*/
CVoid runTasks(UTaskArr& tasks) {
CVoid runTasks(const UTaskArr& tasks) {
is_running_ = true;
for (auto& task : tasks) {
task();
Expand Down
33 changes: 16 additions & 17 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ class UThreadPrimary : public UThreadBase {
* @param poolThreads
* @param config
*/
CStatus setThreadPoolInfo(int index,
CStatus setThreadPoolInfo(const int index,
UAtomicQueue<UTask>* poolTaskQueue,
std::vector<UThreadPrimary *>* poolThreads,
UThreadPoolConfigPtr config) {
const UThreadPoolConfigPtr config) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false) // 初始化之前,设置参数
CGRAPH_ASSERT_NOT_NULL(poolTaskQueue, poolThreads, config)
Expand Down Expand Up @@ -77,7 +77,7 @@ class UThreadPrimary : public UThreadBase {
* 理论不会走到这个判断逻辑里面
*/
if (std::any_of(pool_threads_->begin(), pool_threads_->end(),
[](UThreadPrimary* thd) {
[](const UThreadPrimary* thd) {
return nullptr == thd;
})) {
CGRAPH_RETURN_ERROR_STATUS("primary thread is null")
Expand Down Expand Up @@ -146,7 +146,7 @@ class UThreadPrimary : public UThreadBase {
* @param lockable true 的时候需要上锁,false 的时候会解锁
* @return
*/
CVoid pushTask(UTask&& task, CBool enable, CBool lockable) {
CVoid pushTask(UTask&& task, const CBool enable, const CBool lockable) {
secondary_queue_.push(std::move(task), enable, lockable); // 通过 second 写入,主要是方便其他的thread 进行steal操作
if (enable && !lockable) {
cur_empty_epoch_ = 0;
Expand All @@ -161,8 +161,7 @@ class UThreadPrimary : public UThreadBase {
* @return
*/
CBool popTask(UTaskRef task) {
const auto& result = primary_queue_.tryPop(task) || secondary_queue_.tryPop(task);
return result;
return primary_queue_.tryPop(task) || secondary_queue_.tryPop(task);
}


Expand All @@ -173,7 +172,7 @@ class UThreadPrimary : public UThreadBase {
*/
CBool popTask(UTaskArrRef tasks) {
CBool result = primary_queue_.tryPop(tasks, config_->max_local_batch_size_);
const auto& leftSize = config_->max_local_batch_size_ - tasks.size();
const auto leftSize = config_->max_local_batch_size_ - tasks.size();
if (leftSize > 0) {
// 如果凑齐了,就不需要了。没凑齐的话,就继续
result |= (secondary_queue_.tryPop(tasks, leftSize));
Expand Down Expand Up @@ -201,7 +200,7 @@ class UThreadPrimary : public UThreadBase {
* 待窃取相邻的数量,不能超过默认primary线程数
*/
CBool result = false;
for (auto& target : steal_targets_) {
for (const auto target : steal_targets_) {
/**
* 从线程中周围的thread中,窃取任务。
* 如果成功,则返回true,并且执行任务。
Expand Down Expand Up @@ -230,12 +229,12 @@ class UThreadPrimary : public UThreadBase {
}

CBool result = false;
for (const auto& target : steal_targets_) {
for (const auto target : steal_targets_) {
if (likely((*pool_threads_)[target])) {
result = ((*pool_threads_)[target])->secondary_queue_.trySteal(tasks, config_->max_steal_batch_size_);
const auto& leftSize = config_->max_steal_batch_size_ - tasks.size();
const auto leftSize = config_->max_steal_batch_size_ - tasks.size();
if (leftSize > 0) {
result |= ((*pool_threads_)[target])->primary_queue_.trySteal(tasks, leftSize);
result |= (*pool_threads_)[target]->primary_queue_.trySteal(tasks, leftSize);
}

if (result) {
Expand Down Expand Up @@ -268,12 +267,12 @@ class UThreadPrimary : public UThreadBase {
}

private:
CInt index_ {0}; // 线程index
CInt cur_empty_epoch_ {0}; // 当前空转的轮数信息
UWorkStealingQueue<UTask> primary_queue_; // 内部队列信息
UWorkStealingQueue<UTask> secondary_queue_; // 第二个队列,用于减少触锁概率,提升性能
std::vector<UThreadPrimary *>* pool_threads_; // 用于存放线程池中的线程信息
std::vector<CInt> steal_targets_; // 被偷的目标信息
CInt index_ {0}; // 线程index
CInt cur_empty_epoch_ {0}; // 当前空转的轮数信息
UWorkStealingQueue<UTask> primary_queue_ {}; // 内部队列信息
UWorkStealingQueue<UTask> secondary_queue_ {}; // 第二个队列,用于减少触锁概率,提升性能
std::vector<UThreadPrimary *>* pool_threads_ {}; // 用于存放线程池中的线程信息
std::vector<CInt> steal_targets_ {}; // 被偷的目标信息

friend class UThreadPool;
friend class CAllocator;
Expand Down
8 changes: 4 additions & 4 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class UThreadSecondary : public UThreadBase {
*/
CStatus setThreadPoolInfo(UAtomicQueue<UTask>* poolTaskQueue,
UAtomicPriorityQueue<UTask>* poolPriorityTaskQueue,
UThreadPoolConfigPtr config) {
const UThreadPoolConfigPtr config) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false) // 初始化之前,设置参数
CGRAPH_ASSERT_NOT_NULL(poolTaskQueue, poolPriorityTaskQueue, config)
Expand Down Expand Up @@ -92,8 +92,8 @@ class UThreadSecondary : public UThreadBase {
* @return
* @notice 目的是降低cpu的占用率
*/
CVoid waitRunTask(CMSec ms) {
auto task = this->pool_task_queue_->popWithTimeout(ms);
CVoid waitRunTask(const CMSec ms) {
const auto& task = pool_task_queue_->popWithTimeout(ms);
if (nullptr != task) {
runTask(*task);
}
Expand All @@ -116,7 +116,7 @@ class UThreadSecondary : public UThreadBase {
}

private:
CSec cur_ttl_ = 0; // 当前最大生存周期
CSec cur_ttl_ { 0 }; // 当前最大生存周期

friend class UThreadPool;
};
Expand Down
30 changes: 16 additions & 14 deletions src/UtilsCtrl/ThreadPool/UThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

CGRAPH_NAMESPACE_BEGIN

UThreadPool::UThreadPool(CBool autoInit, const UThreadPoolConfig& config) noexcept {
UThreadPool::UThreadPool(const CBool autoInit, const UThreadPoolConfig& config) noexcept {
cur_index_ = 0;
is_init_ = false;
this->setConfig(config); // setConfig 函数,用在 is_init_ 设定之后
Expand Down Expand Up @@ -55,7 +55,7 @@ CStatus UThreadPool::init() {
monitor_thread_ = std::thread(&UThreadPool::monitor, this);
}
thread_record_map_.clear();
thread_record_map_[(CSize)std::hash<std::thread::id>{}(std::this_thread::get_id())] = CGRAPH_MAIN_THREAD_ID;
thread_record_map_[std::hash<std::thread::id>{}(std::this_thread::get_id())] = CGRAPH_MAIN_THREAD_ID;
task_queue_.setup();
primary_threads_.reserve(config_.default_thread_size_);
for (int i = 0; i < config_.default_thread_size_; i++) {
Expand All @@ -72,8 +72,8 @@ CStatus UThreadPool::init() {
*/
for (int i = 0; i < config_.default_thread_size_; i++) {
status += primary_threads_[i]->init();
thread_record_map_[(CSize)std::hash<std::thread::id>{}(primary_threads_[i]->thread_.get_id())] = i;
}
thread_record_map_[std::hash<std::thread::id>{}(primary_threads_[i]->thread_.get_id())] = i;
}
CGRAPH_FUNCTION_CHECK_STATUS

/**
Expand All @@ -89,7 +89,7 @@ CStatus UThreadPool::init() {
}


CStatus UThreadPool::submit(const UTaskGroup& taskGroup, CMSec ttl) {
CStatus UThreadPool::submit(const UTaskGroup& taskGroup, const CMSec ttl) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(true)

Expand All @@ -100,7 +100,7 @@ CStatus UThreadPool::submit(const UTaskGroup& taskGroup, CMSec ttl) {
}

// 计算最终运行时间信息
auto deadline = std::chrono::steady_clock::now()
const auto& deadline = std::chrono::steady_clock::now()
+ std::chrono::milliseconds(std::min(taskGroup.getTtl(), ttl));

for (auto& fut : futures) {
Expand All @@ -121,15 +121,15 @@ CStatus UThreadPool::submit(const UTaskGroup& taskGroup, CMSec ttl) {
}


CStatus UThreadPool::submit(CGRAPH_DEFAULT_CONST_FUNCTION_REF func, CMSec ttl,
CStatus UThreadPool::submit(CGRAPH_DEFAULT_CONST_FUNCTION_REF func, const CMSec ttl,
CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished) {
return submit(UTaskGroup(func, ttl, onFinished));
}


CIndex UThreadPool::getThreadIndex(CSize tid) {
CIndex UThreadPool::getThreadIndex(const CSize tid) {
int index = CGRAPH_SECONDARY_THREAD_COMMON_ID;
auto result = thread_record_map_.find(tid);
const auto result = thread_record_map_.find(tid);
if (result != thread_record_map_.end()) {
index = result->second;
}
Expand All @@ -145,7 +145,8 @@ CStatus UThreadPool::destroy() {
}

// primary 线程是普通指针,需要delete
for (auto &pt : primary_threads_) {
for (const auto& pt : primary_threads_) {
CGRAPH_ASSERT_NOT_NULL(pt);
status += pt->destroy();
}
CGRAPH_FUNCTION_CHECK_STATUS
Expand All @@ -165,6 +166,7 @@ CStatus UThreadPool::destroy() {
// secondary 线程是智能指针,不需要delete
task_queue_.reset();
for (auto &st : secondary_threads_) {
CGRAPH_ASSERT_NOT_NULL(st.get());
status += st->destroy();
}
CGRAPH_FUNCTION_CHECK_STATUS
Expand Down Expand Up @@ -204,7 +206,7 @@ CStatus UThreadPool::releaseSecondaryThread(CInt size) {
}


CIndex UThreadPool::dispatch(CIndex origIndex) {
CIndex UThreadPool::dispatch(const CIndex origIndex) {
CIndex realIndex = 0;
if (CGRAPH_DEFAULT_TASK_STRATEGY == origIndex) {
realIndex = cur_index_++;
Expand All @@ -219,7 +221,7 @@ CIndex UThreadPool::dispatch(CIndex origIndex) {
}


CStatus UThreadPool::createSecondaryThread(CInt size) {
CStatus UThreadPool::createSecondaryThread(const CInt size) {
CGRAPH_FUNCTION_BEGIN

const int leftSize = static_cast<int>(config_.max_thread_size_ - config_.default_thread_size_ - secondary_threads_.size());
Expand Down Expand Up @@ -250,7 +252,7 @@ CVoid UThreadPool::monitor() {
}

// 如果 primary线程都在执行,则表示忙碌
bool busy = !primary_threads_.empty() && std::all_of(primary_threads_.begin(), primary_threads_.end(),
const bool busy = !primary_threads_.empty() && std::all_of(primary_threads_.begin(), primary_threads_.end(),
[](UThreadPrimaryPtr ptr) { return ptr && ptr->is_running_; });

// 如果忙碌或者priority_task_queue_中有任务,则需要添加 secondary线程
Expand All @@ -267,7 +269,7 @@ CVoid UThreadPool::monitor() {
}


CVoid UThreadPool::wakeupAllThread() {
CVoid UThreadPool::wakeupAllThread() const {
for (auto& pt : primary_threads_) {
pt->wakeup();
}
Expand Down
3 changes: 1 addition & 2 deletions src/UtilsCtrl/ThreadPool/UThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include <map>
#include <future>
#include <thread>
#include <algorithm>
#include <memory>
#include <functional>

Expand Down Expand Up @@ -180,7 +179,7 @@ class UThreadPool : public UThreadObject {
* 通知所有thread 开启
* @return
*/
CVoid wakeupAllThread();
CVoid wakeupAllThread() const;

protected:
/**
Expand Down
4 changes: 2 additions & 2 deletions src/UtilsCtrl/ThreadPool/UThreadPool.inl
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ auto UThreadPool::commitWithPriority(const FunctionType& func, int priority)

template<typename FunctionType>
CVoid UThreadPool::execute(FunctionType&& task, const CIndex index) {
const CIndex& realIndex = dispatch(index);
if (realIndex >= 0 && realIndex < config_.default_thread_size_) {
const CIndex realIndex = dispatch(index);
if (likely(realIndex >= 0 && realIndex < config_.default_thread_size_)) {
primary_threads_[realIndex]->pushTask(std::forward<FunctionType>(task));
} else if (CGRAPH_LONG_TIME_TASK_STRATEGY == realIndex) {
priority_task_queue_.push(std::forward<FunctionType>(task), CGRAPH_LONG_TIME_TASK_STRATEGY);
Expand Down
Loading