Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/ros2_medkit_gateway/src/http/handlers/data_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ void DataHandlers::handle_list_data(const httplib::Request & req, httplib::Respo
ext.ros2_type(topic_type);
try {
auto type_info = type_introspection->get_type_info(topic_type);
ext.type_info(type_info.schema);
json type_info_obj;
type_info_obj["schema"] = type_info.schema;
type_info_obj["default_value"] = type_info.default_value;
ext.type_info(type_info_obj);
} catch (const std::exception & e) {
RCLCPP_DEBUG(HandlerContext::logger(), "Could not get type info for topic '%s': %s", topic.name.c_str(),
e.what());
Expand Down Expand Up @@ -176,7 +179,10 @@ void DataHandlers::handle_get_data_item(const httplib::Request & req, httplib::R
auto type_introspection = data_access_mgr->get_type_introspection();
try {
auto type_info = type_introspection->get_type_info(sample.message_type);
ext.type_info(type_info.schema);
json type_info_obj;
type_info_obj["schema"] = type_info.schema;
type_info_obj["default_value"] = type_info.default_value;
ext.type_info(type_info_obj);
} catch (const std::exception & e) {
RCLCPP_DEBUG(HandlerContext::logger(), "Could not get type info for topic '%s': %s", full_topic_path.c_str(),
e.what());
Expand Down
117 changes: 116 additions & 1 deletion src/ros2_medkit_gateway/src/http/handlers/fault_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <filesystem>
#include <fstream>
#include <iomanip>
#include <set>
#include <sstream>
#include <vector>

Expand Down Expand Up @@ -89,6 +90,35 @@ std::string validate_fault_code(const std::string & fault_code) {
return "";
}

/// Helper to filter faults JSON array by a set of namespace prefixes
/// Keeps faults where any reporting_source starts with any of the given prefixes
json filter_faults_by_sources(const json & faults_array, const std::set<std::string> & source_prefixes) {
json filtered = json::array();
for (const auto & fault : faults_array) {
if (!fault.contains("reporting_sources")) {
continue;
}
const auto & sources = fault["reporting_sources"];
bool matches = false;
for (const auto & src : sources) {
const std::string src_str = src.get<std::string>();
for (const auto & prefix : source_prefixes) {
if (src_str.rfind(prefix, 0) == 0) {
matches = true;
break;
}
}
if (matches) {
break;
}
}
if (matches) {
filtered.push_back(fault);
}
}
return filtered;
}

} // namespace

void FaultHandlers::handle_list_all_faults(const httplib::Request & req, httplib::Response & res) {
Expand Down Expand Up @@ -170,7 +200,6 @@ void FaultHandlers::handle_list_faults(const httplib::Request & req, httplib::Re
{{"entity_id", entity_id}});
return;
}
std::string namespace_path = entity_info.namespace_path;

auto filter = parse_fault_status_param(req);
if (!filter.is_valid) {
Expand All @@ -188,6 +217,92 @@ void FaultHandlers::handle_list_faults(const httplib::Request & req, httplib::Re
bool include_clusters = req.get_param_value("include_clusters") == "true";

auto fault_mgr = ctx_.node()->get_fault_manager();

// For Functions, aggregate faults from all host apps
// Functions don't have a single namespace_path - they host apps from potentially different namespaces
if (entity_info.type == EntityType::FUNCTION) {
// Get all faults (no namespace filter)
auto result = fault_mgr->get_faults("", filter.include_pending, filter.include_confirmed, filter.include_cleared,
include_muted, include_clusters);

if (!result.success) {
HandlerContext::send_error(res, StatusCode::ServiceUnavailable_503, ERR_SERVICE_UNAVAILABLE,
"Failed to get faults",
{{"details", result.error_message}, {entity_info.id_field, entity_id}});
return;
}

// Collect host app FQNs for filtering
const auto & cache = ctx_.node()->get_thread_safe_cache();
auto agg_configs = cache.get_entity_configurations(entity_id);
std::set<std::string> host_fqns;
for (const auto & node : agg_configs.nodes) {
if (!node.node_fqn.empty()) {
host_fqns.insert(node.node_fqn);
}
}

// Filter faults to only those from function's host apps
json filtered_faults = filter_faults_by_sources(result.data["faults"], host_fqns);

// Build response
json response = {{"items", filtered_faults}};

XMedkit ext;
ext.entity_id(entity_id);
ext.add("aggregation_level", "function");
ext.add("count", filtered_faults.size());
ext.add("host_count", host_fqns.size());

response["x-medkit"] = ext.build();
HandlerContext::send_json(res, response);
return;
}

// For Components, aggregate faults from all hosted apps
// Components group Apps, so we filter by the apps' FQNs rather than namespace (which is too broad)
if (entity_info.type == EntityType::COMPONENT) {
// Get all faults (no namespace filter)
auto result = fault_mgr->get_faults("", filter.include_pending, filter.include_confirmed, filter.include_cleared,
include_muted, include_clusters);

if (!result.success) {
HandlerContext::send_error(res, StatusCode::ServiceUnavailable_503, ERR_SERVICE_UNAVAILABLE,
"Failed to get faults",
{{"details", result.error_message}, {entity_info.id_field, entity_id}});
return;
}

// Collect hosted app FQNs for filtering
const auto & cache = ctx_.node()->get_thread_safe_cache();
auto app_ids = cache.get_apps_for_component(entity_id);
std::set<std::string> app_fqns;
for (const auto & app_id : app_ids) {
auto app = cache.get_app(app_id);
if (app && app->bound_fqn.has_value() && !app->bound_fqn->empty()) {
app_fqns.insert(app->bound_fqn.value());
}
}

// Filter faults to only those from component's hosted apps
json filtered_faults = filter_faults_by_sources(result.data["faults"], app_fqns);

// Build response
json response = {{"items", filtered_faults}};

XMedkit ext;
ext.entity_id(entity_id);
ext.add("aggregation_level", "component");
ext.add("count", filtered_faults.size());
ext.add("app_count", app_fqns.size());

response["x-medkit"] = ext.build();
HandlerContext::send_json(res, response);
return;
}

// For other entity types (App, Area), use namespace_path filtering
std::string namespace_path = entity_info.namespace_path;
auto result = fault_mgr->get_faults(namespace_path, filter.include_pending, filter.include_confirmed,
filter.include_cleared, include_muted, include_clusters);

Expand Down
4 changes: 2 additions & 2 deletions src/ros2_medkit_gateway/src/http/handlers/handler_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ EntityInfo HandlerContext::get_entity_info(const std::string & entity_id) const
// Search areas (O(1) lookup)
if (auto area = cache.get_area(entity_id)) {
info.type = EntityType::AREA;
info.namespace_path = ""; // Areas don't have namespace_path
info.fqn = "";
info.namespace_path = area->namespace_path; // Use area's namespace for fault filtering
info.fqn = area->namespace_path;
info.id_field = "area_id";
info.error_name = "Area";
return info;
Expand Down
25 changes: 9 additions & 16 deletions src/ros2_medkit_gateway/src/native_topic_sampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,24 +280,17 @@ TopicSampleResult NativeTopicSampler::sample_topic(const std::string & topic_nam

RCLCPP_DEBUG(node_->get_logger(), "sample_topic: Created GenericSubscription for '%s'", topic_name.c_str());

// Spin with timeout
// Wait for message using future with timeout
// The main executor will deliver callbacks - we just wait for the result
// Note: This works because HTTP requests are handled in a separate thread,
// while the main executor processes ROS callbacks
const auto timeout = std::chrono::duration<double>(timeout_sec);
const auto start_time = std::chrono::steady_clock::now();

while (!received.load()) {
auto elapsed = std::chrono::steady_clock::now() - start_time;
if (elapsed >= timeout) {
RCLCPP_DEBUG(node_->get_logger(), "sample_topic: Timeout waiting for message on '%s'", topic_name.c_str());
result.has_data = false;
return result;
}

// Spin with small timeout to allow checking received flag
rclcpp::spin_some(node_->get_node_base_interface());
auto future_status = message_future.wait_for(timeout);

if (!received.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
if (future_status == std::future_status::timeout) {
RCLCPP_DEBUG(node_->get_logger(), "sample_topic: Timeout waiting for message on '%s'", topic_name.c_str());
result.has_data = false;
return result;
}

// Deserialize message using JsonSerializer
Expand Down