Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,24 @@ When `use_clock=True` (default), the replay framework will:
When `use_clock=False`, the replay will:
- Skip `/clock` topic publishing

#### Termination Condition

By default, the replay will run until all messages from the input MCAP are played back.
After that, the `run` stops immediately and the `analyze` step is executed.

To wait for `run` to finish instead of stopping immediately after playback, you can set a `ignore_playback_finish` flag in `ReplayRunParams`:

```python
from replay_testing import ReplayRunParams

@run.default(params=ReplayRunParams(name='default', ignore_playback_finish=True))
class Run:
def generate_launch_description(self) -> LaunchDescription:
# Your launch description here
# Note: Every node in the launch description must finish on its own. Otherwise, the test will hang.
pass
```

### Analyze `@analyze`

The analyze step is run after the mcap from the `run` is recorded and written. It is a basic wrapper over `unittest.TestCase`, so any `unittest` assertions are built in.
Expand Down
4 changes: 2 additions & 2 deletions replay_testing/decorators/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,5 @@ def parameterize(parameters: list[ReplayRunParams]):
return run(parameters=parameters)

@staticmethod
def default(params: dict = {}):
return run(parameters=[ReplayRunParams(name='default', params=params)])
def default(params: ReplayRunParams = ReplayRunParams(name='default')):
return run(parameters=[params])
3 changes: 2 additions & 1 deletion replay_testing/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ class RunnerArgs(BaseModel):

class ReplayRunParams(BaseModel):
name: str
params: dict
params: dict = {}
runner_args: RunnerArgs = RunnerArgs()
ignore_playback_finish: bool = False


class Mcap(BaseModel):
Expand Down
28 changes: 16 additions & 12 deletions replay_testing/replay_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,30 @@ def _create_run_launch_description(
output='screen',
)

# Event handler to gracefully exit when the process finishes
on_exit_handler = RegisterEventHandler(
OnProcessExit(
target_action=player_action,
# Shutdown the launch service
on_exit=[launch.actions.EmitEvent(event=Shutdown())],
)
)

# Launch description with the event handler
return LaunchDescription([
# Launch description
ld = LaunchDescription([
ExecuteProcess(
cmd=['ros2', 'bag', 'record', '-s', 'mcap', '-o', str(run_fixture.path), '--all'],
output='screen',
),
test_ld,
player_action, # Add the MCAP playback action
on_exit_handler, # Add the event handler to shutdown after playback finishes
])

if not params.ignore_playback_finish:
# Event handler to gracefully exit when the process finishes
on_exit_handler = RegisterEventHandler(
OnProcessExit(
target_action=player_action,
# Shutdown the launch service
on_exit=[launch.actions.EmitEvent(event=Shutdown())],
)
)

ld.add_action(on_exit_handler)

return ld

def filter_fixtures(self) -> list[ReplayFixture]:
self._log_stage_start(ReplayTestingPhase.FIXTURES)

Expand Down
79 changes: 79 additions & 0 deletions test/replay_tests/basic_replay_finish_condition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright (c) 2025-present Polymath Robotics, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pathlib

from launch import LaunchDescription
from launch.actions import EmitEvent, ExecuteProcess, RegisterEventHandler
from launch.event_handlers import OnProcessExit
from launch.events import Shutdown

from replay_testing import (
LocalFixture,
ReplayRunParams,
analyze,
fixtures,
read_messages,
run,
)

cmd_vel_only_fixture = pathlib.Path(__file__).parent.parent / 'fixtures' / 'cmd_vel_only.mcap'


@fixtures.parameterize([LocalFixture(path=cmd_vel_only_fixture)])
class FilterFixtures:
required_input_topics = ['/vehicle/cmd_vel']
expected_output_topics = ['/user/cmd_vel']


@run.default(params=ReplayRunParams(name='default', ignore_playback_finish=True))
class Run:
def generate_launch_description(self) -> LaunchDescription:
target_process = ExecuteProcess(
cmd=[
'ros2',
'topic',
'pub',
'-r',
'10',
'-t',
'20',
'/user/cmd_vel',
'geometry_msgs/msg/Twist',
'{linear: {x: 1.0}, angular: {z: 0.5}}',
],
name='topic_pub',
output='screen',
)
on_exit_handler = RegisterEventHandler(
OnProcessExit(
target_action=target_process,
# Shutdown the launch service
on_exit=[EmitEvent(event=Shutdown())],
)
)
return LaunchDescription([
target_process,
on_exit_handler,
])


@analyze
class AnalyzeBasicReplay:
def test_expected_failure(self):
msgs_it = read_messages(self.reader, topics=['/user/cmd_vel'])

msgs = [(topic_name, msg, timestamp) for topic_name, msg, timestamp in msgs_it]
assert len(msgs) == 20