Add ALM Data Pipeline tutorial and stages #1419

sarahyurick merged 28 commits into NVIDIA-NeMo:main from mohammadaaftabv:alm_data_build
Conversation
Greptile Summary

This PR introduces the ALM (Audio Language Model) data curation pipeline for NeMo Curator, adding four new stages (ALMManifestReaderStage, ALMDataBuilderStage, ALMDataOverlapStage, ALMManifestWriterStage).

Key observations:
Confidence Score: 2/5
Sequence Diagram

```mermaid
sequenceDiagram
    participant Driver
    participant FPS as FilePartitioningStage
    participant AMRS as ALMManifestReaderStage
    participant ALMB as ALMDataBuilderStage
    participant ALMO as ALMDataOverlapStage
    participant ALMW as ALMManifestWriterStage
    Driver->>FPS: EmptyTask (manifest_path)
    FPS-->>AMRS: FileGroupTask (list of .jsonl paths)
    AMRS-->>ALMB: AudioBatch per entry (1 entry each, fan-out)
    Note over AMRS,ALMB: One AudioBatch per JSONL line
    ALMB-->>ALMO: AudioBatch with windows[] + stats{}
    Note over ALMB,ALMO: Windows filtered by SR/BW/speakers/duration
    ALMO-->>ALMW: AudioBatch with filtered_windows[] + filtered_dur
    Note over ALMO,ALMW: Overlapping windows removed, closest to target kept
    ALMW-->>Driver: FileGroupTask (output .jsonl path)
```
ayushdg left a comment:

A few comments:
- Can you add a benchmarking script to benchmarks and share a representative dataset that can be used to run an ALM pipeline?
- You are already logging many statistics in the stages here; is it possible to also use `_log_metrics`, like done in some of the text stages, to log some of these timing metrics so that they can be tracked better to catch regressions?
https://github.com/mohammadaaftabv/Curator/tree/alm_data_build/tests/fixtures/audio/alm is the representative dataset. I am assuming that by benchmarks you mean the result of running both processors on the representative data; in that case, ALM data build should produce 181 windows based on the config in the test file, and ALM data overlap applied to those 181 windows, allowing at most 50% overlap, gives 3035.5 seconds of total output. All of this is covered in the test cases here.
Added `_log_metrics` calls to both stages, following the pattern in the text stages, to track these timing metrics.
```python
# Calculate statistics
# Stage 1 output: total_dur_list_window contains the original window count
stage1_windows = sum(len(e.get("total_dur_list_window", e.get("windows", []))) for e in output_entries)
```
I guess these make sense, but also take a look at `Task._metadata` and `Task._stage_perf_stats` to see if there is anything relevant there.
```python
self._drop_fields_set = {f.strip() for f in self.drop_fields.split(",") if f.strip()}
self._drop_fields_top_level_set = {f.strip() for f in self.drop_fields_top_level.split(",") if f.strip()}


def process_dataset_entry(self, data_entry: dict[str, Any]) -> list[AudioBatch]:
```
Is it intentional that we operate on a single manifest entry at a time? Can any of this be vectorized? Same question for the other stages.
Yes, this is intentional: it follows the LegacySpeechStage pattern used by all other audio stages (GetAudioDurationStage, PreserveByValueStage, etc.), where process() iterates over task.data and calls process_dataset_entry() per entry.
Parallelism is handled at the executor level instead. In benchmark testing (10,000 entries, XennaExecutor on 8-core i9-9900KF), the autoscaler allocated 4 workers to the Builder stage, achieving ~1,460 entries/sec aggregate throughput (365 entries/sec/worker) with 86% CPU utilization. The Overlap stage ran 3 workers at ~5,650 entries/sec. Full pipeline completed in 90s.
If we want batch-level optimization in the future, it would need to happen at the LegacySpeechStage base class level, which would affect all audio stages.
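As a rough illustration of the pattern described above, here is a minimal self-contained sketch (the class and field names other than process/process_dataset_entry are invented for illustration; this is not the real NeMo Curator base class):

```python
class LegacySpeechStageSketch:
    """Illustrative stand-in for the LegacySpeechStage base class."""

    def process_dataset_entry(self, entry: dict) -> list[dict]:
        # Subclasses implement only the per-entry transform.
        raise NotImplementedError

    def process(self, task_data: list[dict]) -> list[dict]:
        # The base class loops over the task's entries; parallelism comes
        # from the executor running many stage workers, not from batching here.
        out: list[dict] = []
        for entry in task_data:
            out.extend(self.process_dataset_entry(entry))
        return out


class DoubleDurationStage(LegacySpeechStageSketch):
    """Toy subclass: doubles each entry's duration field."""

    def process_dataset_entry(self, entry: dict) -> list[dict]:
        return [{**entry, "duration": entry["duration"] * 2}]
```

Batch-level vectorization would then live in `process()` of the base class, which is why it would affect every audio stage at once.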
```python
    }
)

return [AudioBatch(data=[result])]
```
Each time we return a Task, you must pass along its parent task's metadata and stage_perf_stats. In such a fan-out implementation this becomes hard to reason about.
Yeah, the `_stage_perf` stats are supposedly propagated via the base LegacySpeechStage. I would be curious to look at the benchmark results for this PR, though, to get an even better understanding of how the existing audio curation code can be refactored.
- Add benchmarking script (alm_pipeline_benchmark.py) with repeat-factor for scale testing, verified end-to-end in Docker (10K entries, 90s)
- Add alm-benchmark.yaml config for the benchmarking framework
- Reuse create_pipeline_from_yaml from nemo_curator.config.run (rename processors -> stages in YAML and tutorial)
- Add TaskPerfUtils per-stage stats to main.py output
- Remove unnecessary `from __future__ import annotations`
- Propagate parent Task._metadata and _stage_perf in both stages
- Refactor alm_data_builder: extract BuilderStats dataclass and 4 helper functions (_get_bandwidth, _compute_speaker_durations, _truncate_segment, _record_window_loss); remove noqa suppressions
- Refactor alm_data_overlap: extract 9 methods as module-level functions; the class now only contains process/entry/filter logic
- Restructure tests into class-based pattern (TestX + TestXIntegration)
- Move shared fixtures to tests/stages/audio/alm/conftest.py
- Update README with a benchmarking section including results and machine specs

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Move manifest I/O from the driver to a worker by adding ALMManifestReaderStage (ProcessingStage[_EmptyTask, AudioBatch]). This follows the Developers Guide recommendation to keep the driver lightweight, matching the pattern used by FilePartitioningStage and CreateInitialManifestFleursStage.

- New stage reads JSONL on the worker with fsspec (cloud-path compatible)
- Sets is_fanout_stage=True and num_workers_per_node=1 per the guide
- Pipeline YAML now has 3 stages: reader -> builder -> overlap
- main.py no longer loads the manifest on the driver or passes initial_tasks
- Removed load_manifest() and the AudioBatch import from main.py

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
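A simplified sketch of the per-line fan-out read described in that commit, using the stdlib instead of fsspec (the function name is illustrative; the real stage handles cloud paths like s3:// via fsspec):

```python
import json


def read_manifest_fanout(manifest_path: str) -> list[list[dict]]:
    """Read a JSONL manifest and emit one single-entry batch per line,
    mirroring the one-AudioBatch-per-JSONL-line fan-out in the diagram."""
    batches: list[list[dict]] = []
    with open(manifest_path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines in the manifest
            batches.append([json.loads(line)])
    return batches
```

Because the read happens inside a worker stage rather than on the driver, the driver only submits an empty seed task and stays lightweight.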
…ord_window_loss

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
- Fix README: use stages.0.manifest_path instead of input_manifest, correct the stage indices (0=reader, 1=builder, 2=overlap), and update the expected output and benchmark results to match actual values
- Fix overlap stage: stop overwriting the windows field with filtered results so the builder window count is preserved (181 vs 25)
- Fix main.py: remove duplicate manual stats; use only per-stage metrics from TaskPerfUtils as the single source of truth
- Fix alm-benchmark.yaml: add the required model_weights_path field
- Fix builder: remove a redundant bandwidth check in the truncation block

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
- Refactor LegacySpeechStage.process() to propagate _metadata and _stage_perf, removing the process() overrides from the ALM stages
- Add ALMManifestWriterStage (single-worker, append-safe JSONL writer)
- Support list[str] manifest_path in ALMManifestReaderStage
- Promote manifest_path to a top-level pipeline.yaml argument
- Move the ALM benchmark into nightly-benchmark.yaml; delete alm-benchmark.yaml
- Move the soundfile dep to the pyproject.toml audio_cpu group; delete requirements.txt
- Empty tests/__init__.py, inline FIXTURE_PATH, remove test_default_values
- Update README install instructions and pipeline docs

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
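A minimal sketch of what an append-safe, single-writer JSONL stage might do (illustrative only, not the actual ALMManifestWriterStage implementation):

```python
import json


def append_manifest(entries: list[dict], output_path: str) -> str:
    """Append entries as JSON lines. Because the writer stage runs on a
    single worker, appends from successive tasks cannot interleave."""
    with open(output_path, "a", encoding="utf-8") as f:
        for entry in entries:
            f.write(json.dumps(entry) + "\n")
    return output_path
```

Restricting the stage to one worker is what makes plain append mode safe here; with multiple writers, lines from concurrent tasks could be interleaved or lost.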
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
…rk updates

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>

Signed-off-by: V Mohammad Aaftab <aaftabv@nvidia.com>
Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>

…tage

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>

…uppressions, /tmp usage

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
Introduces _RepeatEntriesStage, which fans out entries in-memory after the manifest reader, avoiding redundant file I/O. Updates benchmark results accordingly.

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
- Fix RUF003: replace the Unicode multiplication sign with ASCII x in test comments
- Fix I001: reorder imports in the benchmark script
- Auto-format 6 files with ruff format
- Merge upstream/main and regenerate uv.lock
- alm_data_overlap: use entry.copy() in the empty-windows path; add 4 missing fields
- alm_manifest_reader: fix the cumulative log to show the per-manifest count
- Update benchmark results

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
soundfile is only used by GetAudioDurationStage, not by LegacySpeechStage. Moving the import inside the method prevents a ModuleNotFoundError when Ray workers install the base package without the audio extras.

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
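This is the standard deferred-import pattern; a sketch using the stdlib `wave` module as a stand-in for soundfile (class and field names are illustrative):

```python
class DurationStageSketch:
    """Importing this module never touches the audio dependency;
    only calling process_dataset_entry on a worker does."""

    def process_dataset_entry(self, entry: dict) -> dict:
        # Deferred import: resolved at call time on the worker, so an
        # environment without the audio extras can still import this module.
        import wave  # stand-in for soundfile

        with wave.open(entry["audio_filepath"], "rb") as f:
            entry["duration"] = f.getnframes() / f.getframerate()
        return entry
```

The trade-off is a tiny per-call import lookup (cached after the first call) in exchange for a much smaller mandatory dependency surface at module load.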
Pre-initialize Ray via a shared_ray_client fixture so that XennaExecutor's ray.init(ignore_reinit_error=True) becomes a no-op; workers then use the cluster default env, which has all extras installed. Also restores the soundfile import to module level in common.py.

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
…st_common.py

Ray auto-packages the working directory when connecting to a cluster, creating a fresh worker venv with only the base deps (no soundfile). Move `import soundfile` inside GetAudioDurationStage.process_dataset_entry so it is only imported at runtime, not at module load. Fix the mock path in test_common.py to patch soundfile.SoundFileError directly. Remove the ineffective shared_ray_client fixture from the integration test.

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
/ok to test 64a0281
The multimodal_mint1t dataset and both multimodal_mint1t_xenna/materialize benchmark entries were accidentally removed during a rebase conflict. This restores them and keeps the new alm_pipeline_xenna entry appended at the end.

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
/ok to test 111827a
_RepeatEntriesStage and ALMManifestReaderStage were passing _stage_perf by direct reference when creating multiple output tasks, causing all siblings to share the same list. Downstream stage perf entries would accumulate across all tasks, producing misleading metrics and bloated tasks.pkl files. Changed to list(task._stage_perf) to give each child an independent copy, matching LegacySpeechStage behavior.

Signed-off-by: aaftaabv@gmail.com <aaftaabv@gmail.com>
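The fix amounts to giving each fan-out child its own shallow copy of the perf list. A self-contained sketch with a minimal stand-in task class (not the real Curator Task):

```python
from dataclasses import dataclass, field


@dataclass
class TaskSketch:
    """Minimal stand-in for a Curator task carrying metadata and perf stats."""
    data: object
    _metadata: dict = field(default_factory=dict)
    _stage_perf: list = field(default_factory=list)


def fan_out(parent: TaskSketch, entries: list) -> list[TaskSketch]:
    # dict(...) / list(...) make shallow copies, so appending a perf entry
    # to one child no longer mutates its siblings (the bug described above).
    return [
        TaskSketch(
            data=e,
            _metadata=dict(parent._metadata),
            _stage_perf=list(parent._stage_perf),
        )
        for e in entries
    ]
```

With the buggy version (`_stage_perf=parent._stage_perf`), every downstream append would land in the single shared list carried by all siblings.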
/ok to test 8f6b28c
Add new NeMo Curator stages for ALM (Audio Language Model) data curation:

- ALMManifestReaderStage
- ALMDataBuilderStage
- ALMDataOverlapStage
- ALMManifestWriterStage
Add complete tutorial with:
Tested with sample data:
Description

Usage

# Add snippet demonstrating usage

Checklist