Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/GraphCtrl/GraphElement/GElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,13 @@ GElementPtr GElement::setThreadPool(UThreadPoolPtr ptr) {
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(ptr)
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
this->thread_pool_ = ptr;
setThreadPoolEx(ptr);
return this;
}


GElementPtr GElement::setThreadPoolEx(UThreadPoolPtr ptr) {
(void)(ptr);
return this;
}

Expand Down
6 changes: 6 additions & 0 deletions src/GraphCtrl/GraphElement/GElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,12 @@ class GElement : public GElementObject,
*/
GElement* setThreadPool(UThreadPoolPtr ptr);

/**
* 设置线程池信息,供个别有 manager 的 group使用
* @return
*/
virtual GElement* setThreadPoolEx(UThreadPoolPtr ptr);

/**
* graphviz dump 逻辑
* @param oss
Expand Down
22 changes: 12 additions & 10 deletions src/GraphCtrl/GraphElement/GElementRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,26 @@ CStatus GElementRepository::pushAllState(const GElementState& state) {
}


CVoid GElementRepository::fetch(GElementManagerCPtr em) {
CVoid GElementRepository::fetchAll(GElementManagerCPtr em) {
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(em)
for (GElementPtr cur : em->manager_elements_) {
/**
* 从 pipeline 的 element manager 中,逐层添加查询
* 查询到如果pipeline中,存在没有注册到 repo 中element,则写入 repo中
* 主要针对 python 注册场景中 直接创建 element 放入group 的场景
*/
if (this->find(cur)) {
continue;
}
fetch(cur);
}
}


if (cur->isGGroup()) {
auto group = dynamic_cast<GGroupPtr>(cur);
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(group);
group->pushElements(elements_);
} else {
elements_.insert(cur);
CVoid GElementRepository::fetch(GElementPtr element) {
elements_.insert(element);
if (element->isGGroup()) {
auto group = dynamic_cast<GGroupPtr>(element);
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(group)
for (auto* cur : group->group_elements_arr_) {
fetch(cur);
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion src/GraphCtrl/GraphElement/GElementRepository.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,14 @@ class GElementRepository : public GElementObject {
* @return
* @notice 部分element 随着 group注册进来(特别是 python版本中),在这里做一次查缺补漏
*/
CVoid fetch(GElementManagerCPtr em);
CVoid fetchAll(GElementManagerCPtr em);

/**
* 递归同步所有element 的信息到 repo 中
* @param element
* @return
*/
CVoid fetch(GElementPtr element);

/**
* 用于判断是否是出于退出状态
Expand Down
8 changes: 0 additions & 8 deletions src/GraphCtrl/GraphElement/GGroup/GGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,6 @@ CBool GGroup::isSeparate(GElementCPtr a, GElementCPtr b) const {
}


CVoid GGroup::pushElements(GElementPtrSet& repo) {
repo.insert(this);
for (auto* cur : group_elements_arr_) {
repo.insert(cur);
}
}


CStatus GGroup::__addGElements_4py(const GElementPtrArr& elements) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false)
Expand Down
7 changes: 0 additions & 7 deletions src/GraphCtrl/GraphElement/GGroup/GGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,6 @@ class GGroup : public GElement {
*/
virtual CBool isSeparate(GElementCPtr a, GElementCPtr b) const;

/**
* 将group内部的所有element(包含子group中的)写入repo中
* @param repo
* @return
*/
CVoid pushElements(GElementPtrSet& repo);

private:
GElementPtrArr group_elements_arr_; // 存放 element的数组

Expand Down
7 changes: 7 additions & 0 deletions src/GraphCtrl/GraphElement/GGroup/GMutable/GMutable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ CBool GMutable::isSerializable() const {
}


GElementPtr GMutable::setThreadPoolEx(UThreadPoolPtr ptr) {
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(manager_, ptr)
manager_->setThreadPool(ptr);
return this;
}


CVoid GMutable::setup() {
for (auto* element : group_elements_arr_) {
element->run_before_.clear();
Expand Down
2 changes: 2 additions & 0 deletions src/GraphCtrl/GraphElement/GGroup/GMutable/GMutable.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class GMutable : public GGroup {
*/
CVoid setup();

GElementPtr setThreadPoolEx(UThreadPoolPtr ptr) final;

private:
GElementManagerPtr manager_ = nullptr;

Expand Down
11 changes: 8 additions & 3 deletions src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ GRegion::~GRegion() {
CStatus GRegion::init() {
CGRAPH_FUNCTION_BEGIN
// 在这里将初始化所有的节点信息,并且实现分析,联通等功能
CGRAPH_ASSERT_NOT_NULL(thread_pool_, manager_)
CGRAPH_ASSERT_NOT_NULL(manager_)

// 在region中,需要专门的调度逻辑
this->manager_->setThreadPool(thread_pool_);
status = this->manager_->init();
CGRAPH_FUNCTION_CHECK_STATUS

Expand Down Expand Up @@ -68,6 +66,13 @@ CStatus GRegion::addElementEx(GElementPtr element) {
}


GElementPtr GRegion::setThreadPoolEx(UThreadPoolPtr ptr) {
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(manager_, ptr)
manager_->setThreadPool(ptr);
return this;
}


GRegionPtr GRegion::setGEngineType(GEngineType type) {
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(manager_)
Expand Down
2 changes: 2 additions & 0 deletions src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class GRegion : public GGroup {

CStatus addElementEx(GElementPtr element) final;

GElementPtr setThreadPoolEx(UThreadPoolPtr ptr) final;

CBool isSeparate(GElementCPtr a, GElementCPtr b) const final;

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ CVoid GDynamicEngine::analysisDagType(const GSortedGElementPtrSet& elements) {

CVoid GDynamicEngine::analysisParallelMatrix() {
parallel_element_matrix_.clear();
const auto& config = thread_pool_->getConfig();
const auto& config = thread_pool_ ? thread_pool_->getConfig() : UThreadPoolConfig();
CSize thdSize = config.default_thread_size_ + config.secondary_thread_size_;
CGRAPH_THROW_EXCEPTION_BY_CONDITION(thdSize <= 0,
"default thread size cannot smaller than 1");
Expand Down
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphPipeline/GPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ CStatus GPipeline::initEnv() {
element_manager_->setThreadPool(tp);

// 设置所有的element 中的thread_pool
repository_.fetch(element_manager_);
repository_.fetchAll(element_manager_);
repository_.setThreadPool(tp);
status += repository_.init();
CGRAPH_FUNCTION_END
Expand Down
Loading