Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,9 +560,9 @@ class Executor
virtual void
spin_once_impl(std::chrono::nanoseconds timeout);

typedef std::map<rclcpp::CallbackGroup::WeakPtr,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
typedef std::map<
rclcpp::CallbackGroup::WeakPtr, rclcpp::GuardCondition::SharedPtr,
std::owner_less<rclcpp::CallbackGroup::WeakPtr>>
WeakCallbackGroupsToGuardConditionsMap;

/// maps callback groups to guard conditions
Expand Down
12 changes: 7 additions & 5 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ Executor::~Executor()
weak_groups_to_nodes_associated_with_executor_.clear();
weak_groups_to_nodes_.clear();
for (const auto & pair : weak_groups_to_guard_conditions_) {
auto guard_condition = pair.second;
memory_strategy_->remove_guard_condition(guard_condition);
memory_strategy_->remove_guard_condition(pair.second.get());
}
weak_groups_to_guard_conditions_.clear();

Expand Down Expand Up @@ -218,7 +217,10 @@ Executor::add_callback_group_to_map(
if (node_ptr->get_context()->is_valid()) {
auto callback_group_guard_condition =
group_ptr->get_notify_guard_condition(node_ptr->get_context());
weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition.get();
// Store shared_ptr to keep the guard condition alive while registered with the executor.
// This prevents the guard condition from being finalized (impl set to NULL) while the
// memory strategy still holds a raw pointer to it during wait_for_work().
weak_groups_to_guard_conditions_[weak_group_ptr] = callback_group_guard_condition;
// Add the callback_group's notify condition to the guard condition handles
memory_strategy_->add_guard_condition(*callback_group_guard_condition);
}
Expand Down Expand Up @@ -304,7 +306,7 @@ Executor::remove_callback_group_from_map(
{
auto iter = weak_groups_to_guard_conditions_.find(weak_group_ptr);
if (iter != weak_groups_to_guard_conditions_.end()) {
memory_strategy_->remove_guard_condition(iter->second);
memory_strategy_->remove_guard_condition(iter->second.get());
}
weak_groups_to_guard_conditions_.erase(weak_group_ptr);

Expand Down Expand Up @@ -730,7 +732,7 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout)
if (callback_guard_pair != weak_groups_to_guard_conditions_.end()) {
auto guard_condition = callback_guard_pair->second;
weak_groups_to_guard_conditions_.erase(group_ptr);
memory_strategy_->remove_guard_condition(guard_condition);
memory_strategy_->remove_guard_condition(guard_condition.get());
}
weak_groups_to_nodes_.erase(group_ptr);
});
Expand Down
89 changes: 89 additions & 0 deletions rclcpp/test/rclcpp/test_add_callback_groups_to_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <future>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -340,6 +343,92 @@ TYPED_TEST(TestAddCallbackGroupsToExecutor, subscriber_triggered_to_receive_mess
EXPECT_TRUE(received_message_future.get());
}

/*
* Test destroying the last strong callback group reference while the executor is spinning.
* This exercises the callback-group lifetime path from https://github.com/ros2/rclcpp/issues/2163.
*/
TYPED_TEST(TestAddCallbackGroupsToExecutor, callback_group_destroyed_while_spinning)
{
using ExecutorType = TypeParam;

ExecutorType executor;
auto node = std::make_shared<rclcpp::Node>("callback_group_destroyed_while_spinning", "/ns");
executor.add_node(node);

auto count_live_callback_groups = [&executor]() {
size_t count = 0;
for (const auto & weak_group : executor.get_all_callback_groups()) {
if (weak_group.lock()) {
++count;
}
}
return count;
};

auto wait_for_live_callback_groups =
[&count_live_callback_groups](size_t expected_count, std::chrono::milliseconds timeout) {
const auto deadline = std::chrono::steady_clock::now() + timeout;
while (std::chrono::steady_clock::now() < deadline) {
if (count_live_callback_groups() == expected_count) {
return true;
}
std::this_thread::sleep_for(1ms);
}
return count_live_callback_groups() == expected_count;
};

const auto initial_callback_group_count = count_live_callback_groups();

std::exception_ptr spin_exception;
std::thread spin_thread([&executor, &spin_exception]() {
try {
executor.spin();
} catch (...) {
spin_exception = std::current_exception();
}
});

auto heartbeat_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive, true);
auto heartbeat_timer = node->create_wall_timer(1ms, []() {}, heartbeat_group);

bool callback_groups_tracked =
wait_for_live_callback_groups(initial_callback_group_count + 1u, 2s);
const auto steady_state_callback_group_count = count_live_callback_groups();
bool callback_groups_cleaned_up = callback_groups_tracked;

for (size_t attempt = 0; attempt < 50 && callback_groups_cleaned_up; ++attempt) {
auto transient_group = node->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive, true);
auto transient_timer = node->create_wall_timer(1ms, []() {}, transient_group);

callback_groups_cleaned_up = wait_for_live_callback_groups(
steady_state_callback_group_count + 1u, 2s);

transient_timer.reset();
transient_group.reset();

callback_groups_cleaned_up = callback_groups_cleaned_up &&
wait_for_live_callback_groups(steady_state_callback_group_count, 2s);
}

executor.cancel();
spin_thread.join();

EXPECT_TRUE(callback_groups_tracked);
EXPECT_TRUE(callback_groups_cleaned_up);

if (spin_exception) {
try {
std::rethrow_exception(spin_exception);
} catch (const std::exception & exception) {
FAIL() << "executor.spin() threw: " << exception.what();
} catch (...) {
FAIL() << "executor.spin() threw a non-standard exception";
}
}
}

/*
* Test removing callback group from executor that its not associated with.
*/
Expand Down