Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 9 additions & 28 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,12 @@ class UThreadPrimary : public UThreadBase {


/**
* 依次push到任一队列里。如果都失败,则yield,然后重新push
* 依次push到队列里。如果失败,则yield,然后重新push
* @param task
* @return
*/
CVoid pushTask(UTask&& task) {
while (!(primary_queue_.tryPush(std::move(task))
|| secondary_queue_.tryPush(std::move(task)))) {
while (!wsq_.tryPush(std::move(task))) {
CGRAPH_YIELD();
}
cur_empty_epoch_ = 0;
Expand All @@ -147,7 +146,7 @@ class UThreadPrimary : public UThreadBase {
* @return
*/
CVoid pushTask(UTask&& task, const CBool enable, const CBool lockable) {
secondary_queue_.push(std::move(task), enable, lockable); // 通过 second 写入,主要是方便其他的thread 进行steal操作
wsq_.push(std::move(task), enable, lockable);
if (enable && !lockable) {
cur_empty_epoch_ = 0;
cv_.notify_one();
Expand All @@ -161,7 +160,7 @@ class UThreadPrimary : public UThreadBase {
* @return
*/
CBool popTask(UTaskRef task) {
return primary_queue_.tryPop(task) || secondary_queue_.tryPop(task);
return wsq_.tryPop(task);
}


Expand All @@ -171,13 +170,7 @@ class UThreadPrimary : public UThreadBase {
* @return
*/
CBool popTask(UTaskArrRef tasks) {
CBool result = primary_queue_.tryPop(tasks, config_->max_local_batch_size_);
const auto leftSize = config_->max_local_batch_size_ - tasks.size();
if (leftSize > 0) {
// 如果凑齐了,就不需要了。没凑齐的话,就继续
result |= (secondary_queue_.tryPop(tasks, leftSize));
}
return result;
return wsq_.tryPop(tasks, config_->max_local_batch_size_);
}


Expand All @@ -195,20 +188,14 @@ class UThreadPrimary : public UThreadBase {
return false;
}

/**
* 窃取的时候,仅从相邻的primary线程中窃取
* 待窃取相邻的数量,不能超过默认primary线程数
*/
CBool result = false;
for (const auto target : steal_targets_) {
/**
* 从线程中周围的thread中,窃取任务。
* 如果成功,则返回true,并且执行任务。
* steal 的时候,先从第二个队列里偷,从而降低触碰锁的概率
*/
if (likely((*pool_threads_)[target])
&& (((*pool_threads_)[target])->secondary_queue_.trySteal(task)
|| ((*pool_threads_)[target])->primary_queue_.trySteal(task))) {
&& ((*pool_threads_)[target])->wsq_.trySteal(task)) {
result = true;
break;
}
Expand All @@ -231,16 +218,11 @@ class UThreadPrimary : public UThreadBase {
CBool result = false;
for (const auto target : steal_targets_) {
if (likely((*pool_threads_)[target])) {
result = ((*pool_threads_)[target])->secondary_queue_.trySteal(tasks, config_->max_steal_batch_size_);
const auto leftSize = config_->max_steal_batch_size_ - tasks.size();
if (leftSize > 0) {
result |= (*pool_threads_)[target]->primary_queue_.trySteal(tasks, leftSize);
}

result = ((*pool_threads_)[target])->wsq_.trySteal(tasks, config_->max_steal_batch_size_);
if (result) {
/**
* 在这里,我们对模型进行了简化。实现的思路是:
* 尝试从邻居主线程(先secondary,再primary)中,获取 x(=max_steal_batch_size_) 个task,
* 尝试从邻居主线程中,获取 x(=max_steal_batch_size_) 个task,
* 如果从某一个邻居中,获取了 y(<=x) 个task,则也终止steal的流程
* 且如果如果有一次批量steal成功,就认定成功
*/
Expand Down Expand Up @@ -269,8 +251,7 @@ class UThreadPrimary : public UThreadBase {
private:
CInt index_ {0}; // 线程index
CInt cur_empty_epoch_ {0}; // 当前空转的轮数信息
UWorkStealingQueue<UTask> primary_queue_ {}; // 内部队列信息
UWorkStealingQueue<UTask> secondary_queue_ {}; // 第二个队列,用于减少触锁概率,提升性能
UWorkStealingQueue<UTask> wsq_ {}; // 内部队列信息
std::vector<UThreadPrimary *>* pool_threads_ {}; // 用于存放线程池中的线程信息
std::vector<CInt> steal_targets_ {}; // 被偷的目标信息

Expand Down
Loading