Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 79 additions & 13 deletions src/streamers/image_transport_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <exception>
#include <functional>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

Expand All @@ -54,8 +55,11 @@
#include "image_transport/image_transport.hpp"
#include "image_transport/transport_hints.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/logger.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/qos.hpp"
#include "rmw/qos_profiles.h"
#include "rmw/types.h"
#include "sensor_msgs/msg/image.hpp"

#include "web_video_server/streamer.hpp"
Expand Down Expand Up @@ -84,6 +88,78 @@ std::vector<std::string> get_image_topics(rclcpp::Node & node)
return result;
}

std::optional<rmw_qos_profile_t> detect_publisher_qos(
const rclcpp::Node::SharedPtr & node,
const rclcpp::Logger & logger,
const std::string & topic)
{
RCLCPP_INFO(logger, "Attempting to auto-detect QoS for topic: %s", topic.c_str());

const auto topic_endpoint_info_array = node->get_publishers_info_by_topic(topic);
if (topic_endpoint_info_array.empty()) {
RCLCPP_WARN(logger, "No publishers found for topic: %s", topic.c_str());
return std::nullopt;
}

// Use the first publisher's QoS as reference.
const auto & endpoint_info = topic_endpoint_info_array.front();
const auto qos_profile = endpoint_info.qos_profile();

const std::string reliability =
(qos_profile.reliability() == rclcpp::ReliabilityPolicy::Reliable) ? "RELIABLE" : "BEST_EFFORT";
const std::string durability =
(qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal) ?
"TRANSIENT_LOCAL" : "VOLATILE";

RCLCPP_INFO(
logger, "Detected QoS - Reliability: %s, Durability: %s, History depth: %zu",
reliability.c_str(), durability.c_str(), qos_profile.depth());

// Convert rclcpp QoS to rmw QoS profile.
auto rmw_qos = rmw_qos_profile_default;
rmw_qos.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT;
rmw_qos.durability = RMW_QOS_POLICY_DURABILITY_VOLATILE;
rmw_qos.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST;
rmw_qos.depth = qos_profile.depth();

if (qos_profile.reliability() == rclcpp::ReliabilityPolicy::Reliable) {
rmw_qos.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE;
}
if (qos_profile.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
rmw_qos.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL;
}

return rmw_qos;
}

std::optional<rmw_qos_profile_t> get_qos_profile(
const rclcpp::Node::SharedPtr & node,
const rclcpp::Logger & logger,
const std::string & profile_name,
const std::string & topic)
{
if (profile_name == "auto") {
auto qos_profile = detect_publisher_qos(node, logger, topic);
if (!qos_profile) {
RCLCPP_WARN(
logger, "Could not auto-detect QoS for topic %s. Using default profile.", topic.c_str());
return rmw_qos_profile_default;
}
RCLCPP_INFO(logger, "Using auto-detected QoS profile for topic %s", topic.c_str());
return qos_profile;
}

RCLCPP_INFO(
logger, "Using specified QoS profile %s for topic %s", profile_name.c_str(), topic.c_str());
auto qos_profile = web_video_server::get_qos_profile_from_name(profile_name);
if (!qos_profile) {
RCLCPP_ERROR(
logger, "Invalid QoS profile %s specified. Using default profile.", profile_name.c_str());
return rmw_qos_profile_default;
}
return qos_profile;
}

} // namespace

ImageTransportStreamerBase::ImageTransportStreamerBase(
Expand All @@ -97,7 +173,7 @@ ImageTransportStreamerBase::ImageTransportStreamerBase(
output_height_ = request.get_query_param_value_or_default<int>("height", -1);
invert_ = request.has_query_param("invert");
default_transport_ = request.get_query_param_value_or_default("default_transport", "raw");
qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "default");
qos_profile_name_ = request.get_query_param_value_or_default("qos_profile", "auto");
}

ImageTransportStreamerBase::~ImageTransportStreamerBase()
Expand Down Expand Up @@ -133,18 +209,8 @@ void ImageTransportStreamerBase::start()
}
}

// Get QoS profile from query parameter
RCLCPP_INFO(
logger_, "Streaming topic %s with QoS profile %s", topic_.c_str(),
qos_profile_name_.c_str());
auto qos_profile = get_qos_profile_from_name(qos_profile_name_);
if (!qos_profile) {
qos_profile = rmw_qos_profile_default;
RCLCPP_ERROR(
logger_,
"Invalid QoS profile %s specified. Using default profile.",
qos_profile_name_.c_str());
}
// Get QoS profile based on user selection or auto-detect.
const auto qos_profile = get_qos_profile(node, logger_, qos_profile_name_, topic_);

// Create subscriber
image_sub_ = image_transport::create_subscription(
Expand Down
Loading