-
Notifications
You must be signed in to change notification settings - Fork 7
Nemo curator clean #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1cc4cfc
a365047
1ac48ed
cff3de0
c03c70d
4780560
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| # NeMo Curator Image Deduplication Example | ||
| # Uses CUDA 12.8 for GPU-accelerated processing | ||
| FROM anyscale/ray:2.54.0-slim-py312-cu128 | ||
|
|
||
| # Note: Cache busting for git clone is done via CURATOR_CACHE_BUST arg below | ||
|
|
||
| # Install system dependencies | ||
| RUN sudo apt-get update && \ | ||
| sudo apt-get install -y --no-install-recommends \ | ||
| build-essential \ | ||
| unzip \ | ||
| wget \ | ||
| curl \ | ||
| git && \ | ||
| sudo apt-get clean && \ | ||
| sudo rm -rf /var/lib/apt/lists/* | ||
|
|
||
| # Install uv for fast package management | ||
| RUN curl -LsSf https://astral.sh/uv/install.sh | sh | ||
|
|
||
| # Install Python dependencies | ||
| # Use uv pip install --system to install into the base anaconda environment | ||
| # so all Ray workers (not just the driver) have these packages | ||
| RUN python -m pip install --upgrade pip setuptools wheel | ||
|
|
||
| # IMPORTANT: Uninstall any pre-existing RAPIDS/cuML packages from the base image | ||
| # The base image may have incompatible versions that conflict with scikit-learn | ||
| RUN python -m pip uninstall -y cuml-cu12 cudf-cu12 cugraph-cu12 pylibraft-cu12 raft-dask-cu12 rmm-cu12 || true && \ | ||
| echo "Cleaned up pre-existing RAPIDS packages" | ||
|
|
||
| # Install NeMo-Curator from the fork branch with CUDA dependencies (DALI, cuML, RAFT, etc.) | ||
| ARG CURATOR_REPO=https://github.com/avigyabb/Curator.git | ||
| ARG CURATOR_REF=avi-test | ||
| RUN git clone --depth 1 -b ${CURATOR_REF} ${CURATOR_REPO} /home/ray/NeMo-Curator && \ | ||
| uv pip install --system /home/ray/NeMo-Curator[image_cuda12] | ||
|
|
||
| # Re-upgrade scikit-learn AFTER nemo-curator in case it was downgraded | ||
| # cuML 25.6.* needs sklearn >= 1.5 (has _get_default_requests) | ||
| RUN uv pip install --system "scikit-learn>=1.5,<1.6" && \ | ||
| python -c "import sklearn; print(f'Final scikit-learn version: {sklearn.__version__}')" | ||
|
|
||
| # Additional dependencies for image downloading and processing | ||
| RUN uv pip install --system \ | ||
| loguru \ | ||
| Pillow \ | ||
| aiohttp \ | ||
| tqdm \ | ||
| pandas \ | ||
| pyarrow \ | ||
| huggingface_hub \ | ||
| transformers \ | ||
| webdataset \ | ||
| pydantic-settings | ||
|
|
||
| # Set environment variable for model directory | ||
| ENV MODEL_DIR=/home/ray/model_weights | ||
|
|
||
| # NCCL diagnostic output — baked into the image so every worker node has it. | ||
| # (job.yaml env_vars only reach the driver; these ENV lines reach all nodes.) | ||
| ENV NCCL_DEBUG=WARN | ||
|
|
||
| # Pre-download CLIP model weights into the image so workers never race on download | ||
| RUN python -c "\ | ||
| from nemo_curator.models.clip import CLIPImageEmbeddings; \ | ||
| CLIPImageEmbeddings.download_weights_on_node('/home/ray/model_weights')" | ||
|
|
||
| # Create output directories | ||
| RUN mkdir -p /home/ray/data/webdataset \ | ||
| /home/ray/data/results \ | ||
| /home/ray/data/embeddings \ | ||
| /home/ray/data/removal_ids | ||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,124 @@ | ||
| # Image Semantic Deduplication with NeMo Curator | ||
|
|
||
| This example uses [NVIDIA NeMo Curator](https://github.com/NVIDIA-NeMo/Curator) to perform GPU-accelerated semantic deduplication on image datasets. It reads image URLs from a HuggingFace parquet dataset, downloads them into [WebDataset](https://github.com/webdataset/webdataset) tar shards, generates CLIP embeddings on GPUs, clusters them with K-means + DBSCAN to find near-duplicates, and writes a clean deduplicated dataset. | ||
|
|
||
| ## Install the Anyscale CLI | ||
|
|
||
| (required version = 0.26.82+) | ||
|
|
||
| ```bash | ||
| pip install -U anyscale | ||
| anyscale login | ||
| ``` | ||
|
|
||
| ## Run the job | ||
|
|
||
| Clone the example from GitHub. | ||
|
|
||
| ```bash | ||
| git clone https://github.com/anyscale/examples.git | ||
| cd examples/nemo_curator_semantic_dedup | ||
| ``` | ||
|
|
||
| Submit the job. Use `--env` to forward your HuggingFace token for dataset access. | ||
|
|
||
| ```bash | ||
| anyscale job submit -f job.yaml --env HF_TOKEN=$HF_TOKEN | ||
| ``` | ||
|
|
||
| ## How it works | ||
|
|
||
| The entrypoint defined in [job.yaml](./job.yaml) runs [image_dedup_example.py](./image_dedup_example.py), which executes four steps: | ||
|
|
||
| ``` | ||
| HuggingFace parquet (URLs + captions) | ||
| │ | ||
| ▼ | ||
| Step 1: Download images → WebDataset tar shards | ||
| │ | ||
| ▼ | ||
| Step 2: Generate CLIP embeddings (GPU) | ||
| │ | ||
| ▼ | ||
| Step 3: Semantic deduplication (K-means + DBSCAN) | ||
| │ | ||
| ▼ | ||
| Step 4: Write deduplicated dataset (new tar shards without duplicates) | ||
| ``` | ||
|
|
||
| ### Step 1: Parquet to WebDataset | ||
|
|
||
| `main()` calls `parquet_to_webdataset_ray()` in [helper.py](./helper.py), which builds a Ray Data pipeline to download images and pack them into WebDataset tar shards: | ||
|
|
||
| ``` | ||
| read_parquet (HF) → repartition → map_batches(download) → flat_map(validate) → map_batches(write_tar) | ||
| ``` | ||
|
|
||
| | Ray Data operator | Function | What it does | | ||
| |-------------------|----------|-------------| | ||
| | `read_parquet` | — | Lazily reads `url` and `caption` columns from HuggingFace via `HfFileSystem`. | | ||
| | `repartition` | — | Splits large parquet blocks (~millions of rows) into ~1000-row blocks so Ray can parallelize downstream work across the cluster. | | ||
| | `map_batches` | `image_download_batch` | Downloads images in batches of 100. Within each Ray task, a `ThreadPoolExecutor` with 100 threads downloads URLs concurrently — so you get parallelism at two levels: Ray distributes batches across cluster CPUs, and threads parallelize I/O within each batch. | | ||
| | `flat_map` | `process_image` | Validates each image with Pillow (`.verify()`), converts to RGB JPEG, and drops failures by returning `[]`. Handles all image modes (RGBA, palette, grayscale, CMYK) by compositing onto a white background. | | ||
| | `map_batches` | `write_tar_batch` | Packs `ENTRIES_PER_TAR` images (default 500) into a single `.tar` shard on cluster storage. Each shard gets a unique name based on node ID + UUID to avoid collisions when multiple nodes write simultaneously. | | ||
|
|
||
| The entire pipeline streams end-to-end — Ray Data handles backpressure so fast stages don't overwhelm slow ones, and data flows through without loading the full dataset into memory. `.take_all()` at the end triggers execution. | ||
|
|
||
| ### Step 2: Image embedding pipeline | ||
|
|
||
| `create_image_embedding_pipeline()` builds a NeMo Curator `Pipeline` with five stages: | ||
|
|
||
| | Stage | What it does | | ||
| |-------|-------------| | ||
| | `FilePartitioningStage` | Discovers `.tar` files and groups them into partitions (`tar_files_per_partition=1`). Not a bottleneck at this scale — GPU throughput on the embedding stage is. | | ||
| | `ImageReaderStage` | Reads images from tar shards. DALI decodes JPEGs on the GPU. | | ||
| | `ImageEmbeddingStage` | Runs OpenAI CLIP ViT-L/14 to produce 768-dimensional embeddings for each image. | | ||
| | `ConvertImageBatchToDocumentBatchStage` | Converts from `ImageBatch` (numpy pixel arrays) to `DocumentBatch` (DataFrames), keeping only `image_id` and `embedding`. | | ||
| | `ParquetWriter` | Saves embeddings to parquet files on cluster storage. | | ||
|
|
||
| This pipeline runs on a `RayDataExecutor`, which treats the stages as a streaming dataflow — data flows through one batch at a time in a producer-consumer pattern. It's well suited for stages that process data independently without coordination between workers. | ||
|
|
||
| ### Step 3: Semantic deduplication workflow | ||
|
|
||
| `SemanticDeduplicationWorkflow` is where the actual deduplication happens: | ||
|
|
||
| 1. **K-means clustering** (`n_clusters=100`) — groups similar embeddings together using RAFT/NCCL across all GPU actors. | ||
| 2. **DBSCAN within clusters** (`eps=0.01`) — finds near-duplicate pairs based on cosine distance within each cluster. | ||
|
|
||
| This step runs on a `RayActorPoolExecutor`, which creates a fixed pool of long-lived Ray actors, each holding a GPU. This is needed because K-means is iterative: each actor holds a shard of the embeddings plus the current 100 cluster centroids in GPU memory, and on every iteration all actors reassign points, compute partial centroid sums, then synchronize via NCCL to update the shared centroids. That state (centroids + assignments) must persist across iterations until convergence — a stateless streaming executor, which processes each batch independently and forgets it, can't do that. | ||
|
|
||
| ### Step 4: Write deduplicated dataset | ||
|
|
||
| `create_image_deduplication_pipeline()` re-reads the original tar shards, filters out images whose IDs appear in the removal list, and writes new tar shards. You can't selectively remove files from a tar archive — tar is a flat concatenation of file records with no editable index — so writing new shards without the duplicates is the only option. `ImageWriterStage` re-encodes each image's numpy pixel array back to JPEG with Pillow, packs them into new `.tar` files (up to 1000 images per tar), and writes a companion `.parquet` with metadata (`image_id`, `tar_file`, `member_name`, `original_path`). | ||
|
|
||
| ## Cluster storage | ||
|
|
||
| All intermediate and output data lives under `/mnt/cluster_storage/`, a shared network filesystem (backed by S3) that is automatically mounted on every node in the cluster. This is necessary because Steps 2, 3, and 4 run as separate pipeline executions with different executors (`RayDataExecutor` for streaming stages, `RayActorPoolExecutor` for K-means). When one step finishes, its data is no longer in memory — the next step reads it back from disk. A shared filesystem ensures any node can read what any other node wrote, regardless of which step produced it. | ||
|
|
||
| | Step | Directory | Contents | | ||
| |------|-----------|----------| | ||
| | 1 | `webdataset/` | WebDataset `.tar` files (downloaded images) | | ||
| | 2 | `embeddings/` | Parquet files with `image_id` + embedding vectors | | ||
| | 3 | `removal_ids/` | K-means results, pairwise results, and `duplicates/` parquet listing IDs to remove | | ||
| | 4 | `results/` | Final deduplicated WebDataset `.tar` files | | ||
|
|
||
| ## Configuration | ||
|
|
||
| All configuration is done through environment variables in [job.yaml](./job.yaml). Override them at submit time: | ||
|
|
||
| ```bash | ||
| anyscale job submit -f job.yaml \ | ||
| --env HF_TOKEN=$HF_TOKEN | ||
| ``` | ||
|
|
||
| The [Dockerfile](./Dockerfile) builds a custom image with NeMo Curator's CUDA dependencies (`nemo-curator[image_cuda12]`), including DALI, cuML, and RAFT. The NeMo Curator Python package itself is overridden at runtime via `PYTHONPATH=Curator` in job.yaml, which points to a local `Curator/` directory uploaded with the working directory. | ||
|
|
||
| ## View the job | ||
|
|
||
| View the job in the [jobs tab](https://console.anyscale.com/jobs) of the Anyscale console. | ||
|
|
||
| ## Learn more | ||
|
|
||
| - [NeMo Curator Documentation](https://docs.nvidia.com/nemo/curator/latest/) | ||
| - [NeMo Curator Image Tutorials](https://github.com/NVIDIA-NeMo/Curator/tree/main/tutorials/image/getting-started) | ||
| - [Anyscale Jobs Documentation](https://docs.anyscale.com/platform/jobs/) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,172 @@ | ||
| # Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Helper functions for downloading and preparing image datasets as WebDataset tar files.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import io | ||
| import os | ||
| import uuid | ||
| from concurrent.futures import ThreadPoolExecutor | ||
| from typing import Any | ||
| import requests | ||
| from loguru import logger | ||
| from PIL import Image | ||
| import webdataset as wds | ||
|
|
||
|
|
||
| def download_single_image(url: str, session: requests.Session) -> bytes | None: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you try the ray download expr and bench the perf, there is a major update to this feature ray-project/ray#61735 that recently got merge that you should try |
||
| """Download a single image, returning bytes or None on failure.""" | ||
| try: | ||
| response = session.get(url, timeout=5, stream=True) | ||
| return response.content if response.status_code == 200 else None | ||
| except Exception: | ||
| return None | ||
|
|
||
|
|
||
| def image_download_batch(batch: dict[str, Any]) -> dict[str, Any]: | ||
| """Download a batch of images using ThreadPoolExecutor for parallelism.""" | ||
| session = requests.Session() | ||
| adapter = requests.adapters.HTTPAdapter( | ||
| pool_connections=100, pool_maxsize=100, max_retries=0, | ||
| ) | ||
| session.mount("http://", adapter) | ||
| session.mount("https://", adapter) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. use ray download expr and compare with this |
||
|
|
||
| # Use ThreadPoolExecutor for parallel downloads within this batch | ||
| # 50 threads means 50 concurrent downloads per Ray task | ||
| with ThreadPoolExecutor(max_workers=100) as executor: | ||
| batch["bytes"] = list(executor.map(lambda url: download_single_image(url, session), batch["url"])) | ||
|
|
||
| return batch | ||
|
|
||
|
|
||
| def process_image(row: dict[str, Any]) -> list[dict[str, Any]]: | ||
|
avigyabb marked this conversation as resolved.
|
||
| """Validate downloaded image bytes, convert to JPEG, and drop failures. | ||
|
|
||
| Returns a single-element list on success or an empty list to drop the row. | ||
| Designed for use with Ray Data's flat_map. | ||
| """ | ||
| image_bytes = row.get("bytes") | ||
| if not image_bytes: | ||
| return [] | ||
|
|
||
| try: | ||
| img = Image.open(io.BytesIO(image_bytes)) | ||
| img.verify() | ||
| img = Image.open(io.BytesIO(image_bytes)) | ||
|
|
||
| # Robust RGB conversion for ALL image modes (L, LA, P, PA, RGBA, CMYK, etc.) | ||
| # This ensures CLIP gets 3-channel images | ||
| if img.mode != "RGB": | ||
| if img.mode == "P": | ||
| img = img.convert("RGBA") | ||
| # For any mode with alpha, composite onto white background | ||
| if img.mode in ("RGBA", "LA", "PA"): | ||
| background = Image.new("RGB", img.size, (255, 255, 255)) | ||
| # Use alpha channel as mask | ||
| if img.mode == "LA": | ||
| img = img.convert("RGBA") | ||
| background.paste(img, mask=img.split()[-1]) | ||
| img = background | ||
| else: | ||
| img = img.convert("RGB") | ||
|
|
||
| if img.mode != "RGB" or img.size[0] < 3 or img.size[1] < 3: | ||
| return [] | ||
|
|
||
| jpeg_buffer = io.BytesIO() | ||
| img.save(jpeg_buffer, format="JPEG", quality=95) | ||
| row["jpeg_bytes"] = jpeg_buffer.getvalue() | ||
| return [row] | ||
| except Exception: | ||
| return [] | ||
|
|
||
|
|
||
| def write_tar_batch(batch: dict[str, Any], output_dir: str) -> dict[str, Any]: | ||
|
avigyabb marked this conversation as resolved.
|
||
| """Write a batch of images to a WebDataset tar shard.""" | ||
| import ray | ||
|
|
||
| node_id = ray.get_runtime_context().get_node_id()[:8] | ||
| shard_id = f"{node_id}_{uuid.uuid4().hex[:8]}" | ||
| tar_path = os.path.join(output_dir, f"{shard_id}.tar") | ||
|
|
||
| urls = batch["url"] | ||
| captions = batch["caption"] | ||
| jpeg_list = batch["jpeg_bytes"] | ||
| num_images = len(urls) | ||
|
|
||
| with wds.TarWriter(tar_path) as sink: | ||
| for i in range(num_images): | ||
| sink.write({ | ||
| "__key__": f"{shard_id}_{i:06d}", | ||
| "jpg": jpeg_list[i], | ||
| "txt": str(captions[i]), | ||
| "json": {"url": urls[i], "caption": captions[i]}, | ||
| }) | ||
|
|
||
| return {"shard_id": [shard_id], "success_count": [num_images], "total_count": [num_images]} | ||
|
|
||
|
|
||
| def parquet_to_webdataset_ray( | ||
| hf_dataset_path: str, | ||
| output_dir: str, | ||
| entries_per_tar: int = 1000, | ||
| max_entries: int | None = None, | ||
| concurrency: int | None = None, | ||
| ) -> dict[str, int]: | ||
| """Convert HuggingFace parquet dataset to WebDataset tar files using Ray Data.""" | ||
| import ray | ||
| import ray.data | ||
| from functools import partial | ||
| from huggingface_hub import HfFileSystem | ||
|
|
||
| os.makedirs(output_dir, exist_ok=True) | ||
|
|
||
| ds = ray.data.read_parquet( | ||
| hf_dataset_path, | ||
| file_extensions=["parquet"], | ||
| columns=["url", "caption"], | ||
| filesystem=HfFileSystem(token=os.environ["HF_TOKEN"]), | ||
| concurrency=10, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why the concurrency is 10, don't you pass concurrency as a param? |
||
| ) | ||
|
|
||
| if max_entries is not None: | ||
| ds = ds.limit(max_entries) | ||
| ds = ds.repartition(num_blocks=max(100, max_entries // 1000)) | ||
|
|
||
| if concurrency is None: | ||
| cluster_resources = ray.cluster_resources() | ||
| concurrency = max(4, int(cluster_resources.get("CPU", 4))) | ||
|
|
||
| # Download images, validate, convert to JPEG | ||
| ds = ds.map_batches(image_download_batch, batch_size=100, batch_format="numpy") | ||
| ds = ds.flat_map(process_image) | ||
|
|
||
| # Write tar shards | ||
| results = ds.map_batches( | ||
| partial(write_tar_batch, output_dir=output_dir), | ||
| batch_size=entries_per_tar, | ||
| batch_format="numpy", | ||
| concurrency=concurrency, | ||
| ).take_all() | ||
|
avigyabb marked this conversation as resolved.
|
||
|
|
||
| total_success = sum(r["success_count"] for r in results) | ||
| num_shards = len(results) | ||
| total_attempted = max_entries if max_entries is not None else total_success | ||
| success_rate = (total_success / total_attempted * 100) if total_attempted > 0 else 0 | ||
| logger.info(f"Download complete: {total_success} images in {num_shards} shards ({success_rate:.1f}% success rate)") | ||
|
|
||
| return {"total_success": total_success, "total_attempted": total_attempted, "num_shards": num_shards} | ||
Uh oh!
There was an error while loading. Please reload this page.