Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions example/E01-AutoPilot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ struct DetectResultGParam : public GParam {
class CameraGDaemon : public GDaemon {
public:
CVoid daemonTask(GDaemonParamPtr param) override {
ImageMParam image;
image.frame_id_ = cur_index_;
std::shared_ptr<ImageMParam> image(new ImageMParam());
image->frame_id_ = cur_index_;
std::string info = "this is " + std::to_string(cur_index_) + " image";
memcpy(image.image_buf_, info.c_str(), info.length());
memcpy(image->image_buf_, info.c_str(), info.length());
cur_index_++;

CGRAPH_PUB_MPARAM(ImageMParam, EXAMPLE_IMAGE_TOPIC, image, GMessagePushStrategy::WAIT);
Expand All @@ -78,7 +78,7 @@ class LaneDetectorGNode : public GNode {
}

CStatus run() override {
std::unique_ptr<ImageMParam> image = nullptr;
std::shared_ptr<ImageMParam> image = nullptr;
auto status = CGRAPH_SUB_MPARAM(ImageMParam, conn_id_, image);
if (status.isErr()) {
return status;
Expand All @@ -105,7 +105,7 @@ class CarDetectorGNode : public GNode {
}

CStatus run() override {
std::unique_ptr<ImageMParam> image = nullptr;
std::shared_ptr<ImageMParam> image = nullptr;
auto status = CGRAPH_SUB_MPARAM(ImageMParam, conn_id_, image);
if (status.isErr()) {
return status;
Expand Down
8 changes: 4 additions & 4 deletions example/E03-ThirdFlow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class InputGNode : public GNode {
public:
CStatus run() override {
for (int i = 0; i < 30; i++) {
std::unique_ptr<InputMParam> input(new InputMParam());
std::shared_ptr<InputMParam> input(new InputMParam());
randomSleep(1, 5); // 间隔1~6ms,发送一次
input->num_ = std::abs((int)std::random_device{}()) % 5 + 1;
CGRAPH_SEND_MPARAM(InputMParam, INPUT_TOPIC_NAME, input, GMessagePushStrategy::WAIT);
Expand All @@ -52,14 +52,14 @@ class ProcessGNode : public GNode {
public:
CStatus run() override {
while (true) {
std::unique_ptr<InputMParam> input = nullptr;
std::shared_ptr<InputMParam> input = nullptr;
auto status = CGRAPH_RECV_MPARAM_WITH_TIMEOUT(InputMParam, INPUT_TOPIC_NAME, input, 1000);
if (status.isErr()) {
break; // 一阵子收不到消息了,就自动停止好了
}

int ms = randomSleep(1, 100); // 模拟处理流程,随机休息不超过 100ms
std::unique_ptr<ResultMParam> result(new ResultMParam);
std::shared_ptr<ResultMParam> result(new ResultMParam);
switch (input->num_) {
case 1: result->eng_info_ = "one"; break;
case 2: result->eng_info_ = "two"; break;
Expand All @@ -82,7 +82,7 @@ class ResultGNode : public GNode {
public:
CStatus run() override {
while (true) {
std::unique_ptr<ResultMParam> result = nullptr;
std::shared_ptr<ResultMParam> result = nullptr;
auto status = CGRAPH_RECV_MPARAM_WITH_TIMEOUT(ResultMParam, RESULT_TOPIC_NAME, result, 1000);
if (status.isErr()) {
break;
Expand Down
29 changes: 2 additions & 27 deletions src/GraphCtrl/GraphMessage/GMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,6 @@ class GMessage : public GMessageObject {
queue_.clear();
}

/**
* 写入参数
* @tparam TImpl
* @param value
* @param strategy
* @return
*/
template<class TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CVoid send(const TImpl& value, GMessagePushStrategy strategy) {
queue_.push(value, strategy);
}

/**
* 写入智能指针类型的参数
* @tparam TImpl
Expand All @@ -53,22 +40,10 @@ class GMessage : public GMessageObject {
*/
template<class TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CVoid send(std::unique_ptr<TImpl>& value, GMessagePushStrategy strategy) {
CVoid send(const std::shared_ptr<TImpl>& value, GMessagePushStrategy strategy) {
queue_.push(value, strategy);
}

/**
* 获取参数
* @param value
* @param timeout
* @return
*/
template<class TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CStatus recv(TImpl& value, CMSec timeout) {
return queue_.waitPopWithTimeout(value, timeout);
}

/**
* 通过智能指针的方式传递
* @tparam TImpl
Expand All @@ -78,7 +53,7 @@ class GMessage : public GMessageObject {
*/
template<class TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CStatus recv(std::unique_ptr<TImpl>& value, CMSec timeout) {
CStatus recv(std::shared_ptr<TImpl>& value, CMSec timeout) {
return queue_.waitPopWithTimeout(value, timeout);
}

Expand Down
83 changes: 4 additions & 79 deletions src/GraphCtrl/GraphMessage/GMessageManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,6 @@ class GMessageManager : public GMessageObject,
CGRAPH_FUNCTION_END
}

/**
* 根据传入的topic,获得信息
* @tparam TImpl
* @param topic
* @param value
* @param timeout
* @return
*/
template<typename TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CStatus recvTopicValue(const std::string& topic,
TImpl& value,
CMSec timeout = CGRAPH_MAX_BLOCK_TTL) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = internal::SEND_RECV_PREFIX + topic;
auto result = send_recv_message_map_.find(innerTopic);
if (result == send_recv_message_map_.end()) {
CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
}

auto message = (GMessagePtr<TImpl>)(result->second);
CGRAPH_ASSERT_NOT_NULL(message);

status = message->recv(value, timeout);
CGRAPH_FUNCTION_END
}

/**
* 根据传入的topic,获得信息。仅针对传入智能指针的情况
* @tparam TImpl
Expand All @@ -113,7 +86,7 @@ class GMessageManager : public GMessageObject,
template<typename TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CStatus recvTopicValue(const std::string& topic,
std::unique_ptr<TImpl>& value,
std::shared_ptr<TImpl>& value,
CMSec timeout = CGRAPH_MAX_BLOCK_TTL) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = internal::SEND_RECV_PREFIX + topic;
Expand All @@ -129,33 +102,6 @@ class GMessageManager : public GMessageObject,
CGRAPH_FUNCTION_END
}

/**
* 根据传入的topic,输入信息
* @tparam TImpl
* @param topic
* @param value
* @param strategy
* @return
*/
template<typename TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CStatus sendTopicValue(const std::string& topic,
const TImpl& value,
GMessagePushStrategy strategy) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = internal::SEND_RECV_PREFIX + topic;
auto result = send_recv_message_map_.find(innerTopic);
if (result == send_recv_message_map_.end()) {
CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
}

auto message = static_cast<GMessagePtr<T> >(result->second);
CGRAPH_ASSERT_NOT_NULL(message);

message->send(value, strategy);
CGRAPH_FUNCTION_END
}

/**
* 根据传入的topic,输入智能指针类型的信息
* @tparam TImpl
Expand All @@ -167,7 +113,7 @@ class GMessageManager : public GMessageObject,
template<typename TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CStatus sendTopicValue(const std::string& topic,
std::unique_ptr<TImpl>& value,
const std::shared_ptr<TImpl>& value,
GMessagePushStrategy strategy) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = internal::SEND_RECV_PREFIX + topic;
Expand Down Expand Up @@ -224,7 +170,7 @@ class GMessageManager : public GMessageObject,
template<typename TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CStatus pubTopicValue(const std::string& topic,
const TImpl& value,
const std::shared_ptr<TImpl>& value,
GMessagePushStrategy strategy) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = internal::PUB_SUB_PREFIX + topic;
Expand All @@ -240,27 +186,6 @@ class GMessageManager : public GMessageObject,
CGRAPH_FUNCTION_END
}

/**
* 根据传入的 connId信息,来获取对应的message信息
* @tparam TImpl
* @param connId
* @param value
* @param timeout
* @return
*/
template<typename TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CStatus subTopicValue(CIndex connId, TImpl& value, CMSec timeout = CGRAPH_MAX_BLOCK_TTL) {
CGRAPH_FUNCTION_BEGIN
if (conn_message_map_.end() == conn_message_map_.find(connId)) {
CGRAPH_RETURN_ERROR_STATUS("no find [" + std::to_string(connId) + "] connect");
}

auto message = (GMessagePtr<TImpl>)(conn_message_map_[connId]);
status = message->recv(value, timeout);
CGRAPH_FUNCTION_END
}

/**
* 根据传入的 connId信息,来获取对应的message信息。仅针对传入智能指针的情况
* @tparam TImpl
Expand All @@ -271,7 +196,7 @@ class GMessageManager : public GMessageObject,
*/
template<typename TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CStatus subTopicValue(CIndex connId, std::unique_ptr<TImpl>& value, CMSec timeout = CGRAPH_MAX_BLOCK_TTL) {
CStatus subTopicValue(CIndex connId, std::shared_ptr<TImpl>& value, CMSec timeout = CGRAPH_MAX_BLOCK_TTL) {
CGRAPH_FUNCTION_BEGIN
if (conn_message_map_.end() == conn_message_map_.find(connId)) {
CGRAPH_RETURN_ERROR_STATUS("no find [" + std::to_string(connId) + "] connect");
Expand Down
66 changes: 6 additions & 60 deletions src/UtilsCtrl/ThreadPool/Queue/UAtomicRingBufferQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <vector>
#include <atomic>
#include <chrono>
#include <memory>

#include "UQueueObject.h"

Expand Down Expand Up @@ -51,36 +52,6 @@ class UAtomicRingBufferQueue : public UQueueObject {
return capacity_;
}

/**
* 写入信息
* @tparam TImpl
* @param value
* @param strategy
* @return
*/
template<class TImpl = T>
CVoid push(const TImpl& value, URingBufferPushStrategy strategy) {
{
CGRAPH_UNIQUE_LOCK lk(mutex_);
if (isFull()) {
switch (strategy) {
case URingBufferPushStrategy::WAIT:
push_cv_.wait(lk, [this] { return !isFull(); });
break;
case URingBufferPushStrategy::REPLACE:
head_ = (head_ + 1) % capacity_;
break;
case URingBufferPushStrategy::DROP:
return; // 直接返回,不写入即可
}
}

ring_buffer_queue_[tail_] = std::move(c_make_unique<TImpl>(value));
tail_ = (tail_ + 1) % capacity_;
}
pop_cv_.notify_one();
}

/**
* 写入智能指针类型的信息
* @tparam TImpl
Expand All @@ -89,7 +60,7 @@ class UAtomicRingBufferQueue : public UQueueObject {
* @return
*/
template<class TImpl = T>
CVoid push(std::unique_ptr<TImpl>& value, URingBufferPushStrategy strategy) {
CVoid push(const std::shared_ptr<TImpl>& value, URingBufferPushStrategy strategy) {
{
CGRAPH_UNIQUE_LOCK lk(mutex_);
if (isFull()) {
Expand All @@ -105,37 +76,12 @@ class UAtomicRingBufferQueue : public UQueueObject {
}
}

ring_buffer_queue_[tail_] = std::move(value);
ring_buffer_queue_[tail_] = value;
tail_ = (tail_ + 1) % capacity_;
}
pop_cv_.notify_one();
}

/**
* 等待弹出信息
* @param value
* @param timeout
* @return
*/
template<class TImpl = T>
CStatus waitPopWithTimeout(TImpl& value, CMSec timeout) {
CGRAPH_FUNCTION_BEGIN
{
CGRAPH_UNIQUE_LOCK lk(mutex_);
if (isEmpty()
&& !pop_cv_.wait_for(lk, std::chrono::milliseconds(timeout),
[this] { return !isEmpty(); })) {
// 如果timeout的时间内,等不到消息,则返回错误信息
CGRAPH_RETURN_ERROR_STATUS("receive message timeout.")
}

value = *ring_buffer_queue_[head_]; // 这里直接进行值copy
head_ = (head_ + 1) % capacity_;
}
push_cv_.notify_one();
CGRAPH_FUNCTION_END
}

/**
* 等待弹出信息。ps:当入参为智能指针的情况
* @tparam TImpl
Expand All @@ -144,7 +90,7 @@ class UAtomicRingBufferQueue : public UQueueObject {
* @return
*/
template<class TImpl = T>
CStatus waitPopWithTimeout(std::unique_ptr<TImpl>& value, CMSec timeout) {
CStatus waitPopWithTimeout(std::shared_ptr<TImpl>& value, CMSec timeout) {
CGRAPH_FUNCTION_BEGIN
{
CGRAPH_UNIQUE_LOCK lk(mutex_);
Expand All @@ -159,7 +105,7 @@ class UAtomicRingBufferQueue : public UQueueObject {
* 当传入的内容,是智能指针的时候,
* 这里就直接通过 move转移过去好了,跟直接传值的方式,保持区别
*/
value = std::move(ring_buffer_queue_[head_]);
value = ring_buffer_queue_[head_];
head_ = (head_ + 1) % capacity_;
}
push_cv_.notify_one();
Expand Down Expand Up @@ -206,7 +152,7 @@ class UAtomicRingBufferQueue : public UQueueObject {
std::condition_variable push_cv_; // 写入的条件变量。为了保持语义完整,也考虑今后多入多出的可能性,不使用 父类中的 cv_了
std::condition_variable pop_cv_; // 读取的条件变量

std::vector<std::unique_ptr<T> > ring_buffer_queue_; // 环形缓冲区
std::vector<std::shared_ptr<T> > ring_buffer_queue_; // 环形缓冲区
};

CGRAPH_NAMESPACE_END
Expand Down
2 changes: 1 addition & 1 deletion test/Functional/test-functional-04.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void test_functional_04() {
const int halfArrSize = 32;
const int runTimes = 100000;
CGRAPH_CREATE_MESSAGE_TOPIC(TestGMessageParam, g_test_message_key, 100)
std::unique_ptr<TestGMessageParam> mp(new TestGMessageParam());
std::shared_ptr<TestGMessageParam> mp(new TestGMessageParam());
CGRAPH_SEND_MPARAM(TestGMessageParam, g_test_message_key, mp, GMessagePushStrategy::WAIT)

GPipelinePtr pipeline = GPipelineFactory::create();
Expand Down
4 changes: 2 additions & 2 deletions test/_Materials/TestGNodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ class TestAdd1ByParamGNode : public CGraph::GNode {
class TestRecvMessageGNode : public CGraph::GNode {
public:
CStatus run() override {
std::unique_ptr<TestGMessageParam> mp = nullptr;
std::shared_ptr<TestGMessageParam> mp = nullptr;
CStatus status = CGRAPH_RECV_MPARAM_WITH_TIMEOUT(TestGMessageParam, g_test_message_key, mp, 200);
if (status.isErr()) {
CGraph::CGRAPH_ECHO("error message is [%s]", status.getInfo().c_str());
return status;
}

mp.get()->num_++;
mp->num_++;
CGRAPH_SEND_MPARAM(TestGMessageParam, g_test_message_key, mp, CGraph::GMessagePushStrategy::WAIT)
return status;
}
Expand Down
Loading
Loading