[Example] Isaac Lab RNN PPO with compact memory + knowledge-base notes#3760
[Example] Isaac Lab RNN PPO with compact memory + knowledge-base notes#3760vmoens wants to merge 1 commit into
Conversation
🔗 Helpful Links🧪 See artifacts and rendered test results at hud.pytorch.org/pr/pytorch/rl/3760
Note: Links to docs will display an error until the docs builds have been completed. ❗ 1 Active SEVsThere are 1 currently active SEVs. If your PR is affected, please view them below: ❌ 4 New Failures, 15 PendingAs of commit a179af1 with merge base 0a01ee8 ( NEW FAILURES - The following jobs have failed:
This comment was automatically generated by Dr. CI and updates every 15 minutes. |
|
| Prefix | Label Applied | Example |
|---|---|---|
[BugFix] |
BugFix | [BugFix] Fix memory leak in collector |
[Feature] |
Feature | [Feature] Add new optimizer |
[Doc] or [Docs] |
Documentation | [Doc] Update installation guide |
[Refactor] |
Refactoring | [Refactor] Clean up module imports |
[CI] |
CI | [CI] Fix workflow permissions |
[Test] or [Tests] |
Tests | [Tests] Add unit tests for buffer |
[Environment] or [Environments] |
Environments | [Environments] Add Gymnasium support |
[Data] |
Data | [Data] Fix replay buffer sampling |
[Performance] or [Perf] |
Performance | [Performance] Optimize tensor ops |
[BC-Breaking] |
bc breaking | [BC-Breaking] Remove deprecated API |
[Deprecation] |
Deprecation | [Deprecation] Mark old function |
[Quality] |
Quality | [Quality] Fix typos and add codespell |
Note: Common variations like singular/plural are supported (e.g., [Doc] or [Docs]).
1 similar comment
|
| Prefix | Label Applied | Example |
|---|---|---|
[BugFix] |
BugFix | [BugFix] Fix memory leak in collector |
[Feature] |
Feature | [Feature] Add new optimizer |
[Doc] or [Docs] |
Documentation | [Doc] Update installation guide |
[Refactor] |
Refactoring | [Refactor] Clean up module imports |
[CI] |
CI | [CI] Fix workflow permissions |
[Test] or [Tests] |
Tests | [Tests] Add unit tests for buffer |
[Environment] or [Environments] |
Environments | [Environments] Add Gymnasium support |
[Data] |
Data | [Data] Fix replay buffer sampling |
[Performance] or [Perf] |
Performance | [Performance] Optimize tensor ops |
[BC-Breaking] |
bc breaking | [BC-Breaking] Remove deprecated API |
[Deprecation] |
Deprecation | [Deprecation] Mark old function |
[Quality] |
Quality | [Quality] Fix typos and add codespell |
Note: Common variations like singular/plural are supported (e.g., [Doc] or [Docs]).
|
| Prefix | Label Applied | Example |
|---|---|---|
[BugFix] |
BugFix | [BugFix] Fix memory leak in collector |
[Feature] |
Feature | [Feature] Add new optimizer |
[Doc] or [Docs] |
Documentation | [Doc] Update installation guide |
[Refactor] |
Refactoring | [Refactor] Clean up module imports |
[CI] |
CI | [CI] Fix workflow permissions |
[Test] or [Tests] |
Tests | [Tests] Add unit tests for buffer |
[Environment] or [Environments] |
Environments | [Environments] Add Gymnasium support |
[Data] |
Data | [Data] Fix replay buffer sampling |
[Performance] or [Perf] |
Performance | [Performance] Optimize tensor ops |
[BC-Breaking] |
bc breaking | [BC-Breaking] Remove deprecated API |
[Deprecation] |
Deprecation | [Deprecation] Mark old function |
[Quality] |
Quality | [Quality] Fix typos and add codespell |
Note: Common variations like singular/plural are supported (e.g., [Doc] or [Docs]).
| launch_args = sys.argv[1:] | ||
| if not any(arg == "--headless" or arg.startswith("--headless=") for arg in launch_args): | ||
| launch_args = [*launch_args, "--headless"] | ||
| args_cli, _ = _parser().parse_known_args(launch_args) | ||
| app_launcher = AppLauncher(args_cli) | ||
| simulation_app = app_launcher.app | ||
|
|
||
|
|
||
| def _restore_local_editable_paths() -> None: | ||
| for module_name in ("tensordict", "torchrl"): | ||
| module = sys.modules.get(module_name) | ||
| if module is not None and getattr(module, "__file__", None) is None: | ||
| del sys.modules[module_name] | ||
|
|
||
| for path in ("/root/tensordict", "/root/td", "/root/rl-isaac"): | ||
| if Path(path).exists() and path not in sys.path: | ||
| sys.path.insert(0, path) |
There was a problem hiding this comment.
that looks terribly hacky and ad-hoc, remove at all cost
|
|
||
| os.environ.setdefault("TORCHRL_PROFILING", "1") | ||
|
|
||
| from isaaclab.app import AppLauncher |
There was a problem hiding this comment.
isaac should only run in the sub-process. Let's ditch the regular collector and just use MultiCollector with sync true or false. No isaac in-process here.
| from isaaclab.app import AppLauncher | ||
|
|
||
|
|
||
| def _parser() -> argparse.ArgumentParser: |
There was a problem hiding this comment.
a proper script is imports -> methods and classes -> execution. This is all mixed up
| backbone = make_backbone( | ||
| args.obs_dim, | ||
| args.hidden_size, | ||
| args.rnn_backend, | ||
| train_device, | ||
| ) | ||
| actor_head = make_actor_head(args.action_dim, args.hidden_size, train_device) | ||
| actor = make_actor_from_modules( | ||
| backbone, | ||
| actor_head, | ||
| args.action_dim, | ||
| train_device, | ||
| ) | ||
| critic = make_critic_head(args.hidden_size, train_device) |
There was a problem hiding this comment.
this could be bundled in a single "make_models" util
| if gae_rnn_backend == "auto": | ||
| gae_rnn_backend = ( | ||
| "cudnn" if args.rnn_backend in ("scan", "triton") else args.rnn_backend | ||
| ) | ||
| elif gae_rnn_backend == "same": | ||
| gae_rnn_backend = args.rnn_backend |
There was a problem hiding this comment.
GAE should be same, always. If that causes an issue we need to solve the issue.
| gae_backbone = None | ||
| gae_full_value = full_value | ||
| if gae_rnn_backend != args.rnn_backend: | ||
| gae_backbone = make_backbone( | ||
| args.obs_dim, | ||
| args.hidden_size, | ||
| gae_rnn_backend, | ||
| train_device, | ||
| ) | ||
| gae_full_value = make_full_value(gae_backbone, critic) | ||
| compile_gae_value_network = compile_gae and gae_backbone is None | ||
| gae_precompile_calls = 2 if compile_gae_value_network else 1 |
There was a problem hiding this comment.
my previous comment should make this a bit redundant.
| def compute_advantages(batch: TensorDictBase): | ||
| with torch.no_grad(), set_recurrent_mode(True): | ||
| if gae_backbone is not None: | ||
| gae_backbone.load_state_dict(backbone.state_dict()) | ||
| return adv_module(batch) |
There was a problem hiding this comment.
no load_state_dict, are we mad? Let's have one single copy of our weights in a given process.
| def compute_loss(batch: TensorDictBase): | ||
| with set_recurrent_mode(True): | ||
| return loss_module(batch) |
There was a problem hiding this comment.
the added value of a function like this is 0 or negative
| actor_grad_norm = torch.nn.utils.clip_grad_norm_( | ||
| actor_params, max_norm=float("inf") | ||
| ) | ||
| critic_grad_norm = torch.nn.utils.clip_grad_norm_( | ||
| critic_params, max_norm=float("inf") | ||
| ) |
There was a problem hiding this comment.
clip grad norm to 1 or 10 or something, use the args for that.
| def _make_warmup_minibatch() -> TensorDictBase: | ||
| mini_batch = _make_fake_training_batch( | ||
| batch_size=sample_num_slices, | ||
| seq_len=sample_seq_len, | ||
| obs_dim=args.obs_dim, | ||
| action_dim=args.action_dim, | ||
| hidden_size=args.hidden_size, | ||
| device=train_device, | ||
| ) | ||
| with torch.no_grad(), set_recurrent_mode(True): | ||
| value_data = full_value(mini_batch.clone()) | ||
| state_value = value_data.get(loss_module.tensor_keys.value, None) | ||
| if state_value is None: | ||
| raise KeyError( | ||
| f"Could not find value key {loss_module.tensor_keys.value} in " | ||
| "the fake warmup value-network output." | ||
| ) | ||
| mini_batch.set(loss_module.tensor_keys.value_target, state_value.detach()) | ||
| mini_batch.set(loss_module.tensor_keys.advantage, torch.zeros_like(state_value)) | ||
| return _canonicalize_batch(mini_batch) |
There was a problem hiding this comment.
we should be able not to use that now. We've solved the issue that made it necessary separately. Let's just run update() in the training loop.
| def _make_warmup_rollout() -> TensorDictBase: | ||
| fake_rollout = _make_fake_training_batch( | ||
| batch_size=args.num_envs, | ||
| seq_len=args.rollout_steps, | ||
| obs_dim=args.obs_dim, | ||
| action_dim=args.action_dim, | ||
| hidden_size=args.hidden_size, | ||
| device=train_device, | ||
| ) | ||
| if args.compact_obs: | ||
| next_td = fake_rollout.get("next", None) | ||
| if next_td is not None and "policy" in next_td.keys(): | ||
| next_td.del_("policy") | ||
| return _canonicalize_batch(fake_rollout) | ||
|
|
There was a problem hiding this comment.
ditto looks useless to me now
| def _restore_after_precompile( | ||
| param_snapshots: tuple[torch.Tensor, ...], | ||
| ) -> None: | ||
| with torch.no_grad(): | ||
| for param, snapshot in zip(model_params, param_snapshots): | ||
| param.copy_(snapshot) | ||
| for state in optim.state.values(): | ||
| for key, value in state.items(): | ||
| if torch.is_tensor(value): | ||
| value.zero_() | ||
| elif isinstance(value, int): | ||
| state[key] = 0 | ||
| elif isinstance(value, float): | ||
| state[key] = 0.0 | ||
| optim.zero_grad(set_to_none=True) |
| state[key] = 0.0 | ||
| optim.zero_grad(set_to_none=True) | ||
|
|
||
| def _precompile_training() -> int: |
There was a problem hiding this comment.
let's not precompile anything. Compile and execute and that's it. We'll swallow the compile time in the training loop, amen
| if args.double_buffer_collector: | ||
| collector_replay_buffer = ReplayBuffer( | ||
| storage=_make_storage( | ||
| args, | ||
| torch.device("cpu"), | ||
| args.num_envs, | ||
| storage_kind=args.collector_buffer_storage, | ||
| name="collector_replay_buffer", | ||
| ), | ||
| batch_size=args.num_envs, | ||
| ) |
There was a problem hiding this comment.
let's remove the double buffer option. We can just use storing_device=cpu. It already uses a buffer in shared mem to pass the data which is a bit like what our double buffer does but more clearly.
| max_episode_steps=args.max_episode_steps, | ||
| device=str(collector_device), | ||
| ) | ||
| collector_rnn_backend = "cudnn" |
There was a problem hiding this comment.
if set_recurrent_mode is False, the rnn backend should always be cudnn. That should be documented. We should not set it here, it should be automatic
| if args.collector_backend == "single": | ||
| collector = Collector( | ||
| make_env_fn, | ||
| policy_factory=collector_policy_factory, | ||
| frames_per_batch=frames_per_batch, | ||
| total_frames=-1, | ||
| policy_device=collector_device, | ||
| no_cuda_sync=True, | ||
| trust_policy=True, | ||
| storing_device=storing_device, | ||
| compact_obs=args.compact_obs, | ||
| auto_register_policy_transforms=True, | ||
| track_policy_version=True, | ||
| replay_buffer=collector_replay_buffer, | ||
| extend_buffer=True, | ||
| ) |
There was a problem hiding this comment.
as pointed above this can be removed
| config = vars(args).copy() | ||
| config.update( | ||
| { | ||
| "frames_per_batch": frames_per_batch, | ||
| "per_collector_envs": per_collector_envs, | ||
| "collector_rnn_backend": collector_rnn_backend, | ||
| "training_rnn_backend": args.rnn_backend, | ||
| "training_gae_rnn_backend": gae_rnn_backend, | ||
| "updates_per_epoch": updates_per_epoch, | ||
| "storing_device": str(storing_device) | ||
| if storing_device is not None | ||
| else None, | ||
| "double_buffer_collector": args.double_buffer_collector, | ||
| "collector_replay_buffer": args.double_buffer_collector, | ||
| "collector_policy_sync": "before_next", | ||
| "collector_policy_version_tracking": True, | ||
| "collector_weight_sync_scheme": "MultiProcessWeightSyncScheme" | ||
| if args.collector_backend != "single" | ||
| else "single_process_fallback", | ||
| "debug_data_flow": args.debug_data_flow, | ||
| "training_precompile": args.precompile_training, | ||
| "training_compile_gae_requested": compile_gae, | ||
| "training_compile_gae": compile_gae_value_network, | ||
| "training_compile_gae_target": "value_network" | ||
| if compile_gae_value_network | ||
| else "none", | ||
| "training_precompile_gae_calls": gae_precompile_calls, | ||
| "training_precompile_update_calls": precompile_update_calls, | ||
| } | ||
| ) |
There was a problem hiding this comment.
do we need all of this? Doesn't make the code more readable
| collector.update_policy_weights_( | ||
| weights=TensorDict.from_module(actor).data | ||
| ) | ||
| collector.increment_version() |
There was a problem hiding this comment.
that should be called automatically by update_policy_weights_ IMO - if not we need to patch.
| collector.update_policy_weights_( | ||
| weights=TensorDict.from_module(actor).data | ||
| ) |
There was a problem hiding this comment.
this looks very standard, we should be able to just pass the actor no? Doesn't update_policy_weights_ do that for us under the hood?
| if collector_replay_buffer is None: | ||
| if collected_batch is None: | ||
| raise RuntimeError( | ||
| "collector returned None but no replay buffer was provided" | ||
| ) | ||
| data = collected_batch |
There was a problem hiding this comment.
this path can be removed
| args.rollout_steps, | ||
| "gae_batch", | ||
| ) | ||
| epoch_data = _canonicalize_batch(data.to(train_device)) |
There was a problem hiding this comment.
_canonicalize_batch should be called in the RNN if anything, not in the loop. Users should not care about this.
| if args.debug_data_flow and epoch == 0: | ||
| stored_data = train_buffer[:] | ||
| _add_data_flow_metrics( | ||
| batch_metrics, "data_flow/train_buffer", stored_data | ||
| ) | ||
| _add_buffer_compare_metrics( | ||
| batch_metrics, epoch_data, stored_data | ||
| ) |
There was a problem hiding this comment.
remove all debug paths from this file
| mini_batch = _canonicalize_batch( | ||
| mini_batch.to(train_device) | ||
| ) |
There was a problem hiding this comment.
ditto this looks like something users will be distracted by. I believe the RNN does this op for us so we can spare it here
Stack from ghstack (oldest at bottom):
End-to-end recurrent PPO example targeting Isaac Lab, exercising the
collector / GAE features added below in the stack:
compact_obs=Trueto drop redundant("next", obs)from eachrollout step and keep the buffer affordable for vectorized envs.
final_obs=Trueto carry the true window-boundary observationunder
("final", k), soGAE(shifted=True)bootstraps withV(s_T)rather thanV(s_{T-1})at every soft boundary.recurrent_backend="scan"(or"triton") for the LSTM, with thecanonical-stride and cuDNN flat-storage fixes in place.
timeit.mark_start/mark_endto instrument the worker loop.track_policy_version=Trueon the MultiCollector so each batch istagged with the policy version at collection time.
isaaclab_rnn_ppo_memory_utils.pyfactors the network builder, thecollector factory, the GAE+critic warmup, and the cudagraph fake batch
plumbing so the main script reads as a thin orchestration loop. CLI
flags surface the compile / cudagraph / vectorized-GAE / cudagraph-trees
knobs and a configurable warmup-iterations floor (which auto-bumps to
cudagraph_warmup + 1).knowledge_base/ISAACLAB.mdadds the pip-based container flow we useon the cluster (uv venv when ensurepip is missing, IsaacLab pip wheels,
the EULA / privacy-consent env vars to make Kit non-interactive) and a
recurrent-PPO-on-Isaac recipe pinning the collector-owned policy_factory,
the weight-sync scheme, and the compact_obs + shifted-GAE pairing.