72 changes: 72 additions & 0 deletions nemo_curator_semantic_dedup/Dockerfile
# NeMo Curator Image Deduplication Example
# Uses CUDA 12.8 for GPU-accelerated processing
FROM anyscale/ray:2.54.0-slim-py312-cu128

# Note: The NeMo-Curator source is pinned via the CURATOR_REPO/CURATOR_REF build args below

# Install system dependencies
RUN sudo apt-get update && \
    sudo apt-get install -y --no-install-recommends \
        build-essential \
        unzip \
        wget \
        curl \
        git && \
    sudo apt-get clean && \
    sudo rm -rf /var/lib/apt/lists/*

# Install uv for fast package management
RUN curl -LsSf https://astral.sh/uv/install.sh | sh

# Install Python dependencies
# Use uv pip install --system to install into the base anaconda environment
# so all Ray workers (not just the driver) have these packages
RUN python -m pip install --upgrade pip setuptools wheel

# IMPORTANT: Uninstall any pre-existing RAPIDS/cuML packages from the base image
# The base image may have incompatible versions that conflict with scikit-learn
RUN python -m pip uninstall -y cuml-cu12 cudf-cu12 cugraph-cu12 pylibraft-cu12 raft-dask-cu12 rmm-cu12 || true && \
    echo "Cleaned up pre-existing RAPIDS packages"

# Install NeMo-Curator from the fork branch with CUDA dependencies (DALI, cuML, RAFT, etc.)
ARG CURATOR_REPO=https://github.com/avigyabb/Curator.git
ARG CURATOR_REF=avi-test
RUN git clone --depth 1 -b ${CURATOR_REF} ${CURATOR_REPO} /home/ray/NeMo-Curator && \
    uv pip install --system "/home/ray/NeMo-Curator[image_cuda12]"

# Re-upgrade scikit-learn AFTER nemo-curator in case it was downgraded
# cuML 25.6.* needs sklearn >= 1.5 (has _get_default_requests)
RUN uv pip install --system "scikit-learn>=1.5,<1.6" && \
    python -c "import sklearn; print(f'Final scikit-learn version: {sklearn.__version__}')"

# Additional dependencies for image downloading and processing
RUN uv pip install --system \
    loguru \
    Pillow \
    aiohttp \
    tqdm \
    pandas \
    pyarrow \
    huggingface_hub \
    transformers \
    webdataset \
    pydantic-settings

# Set environment variable for model directory
ENV MODEL_DIR=/home/ray/model_weights

# NCCL diagnostic output — baked into the image so every worker node has it.
# (job.yaml env_vars only reach the driver; these ENV lines reach all nodes.)
ENV NCCL_DEBUG=WARN

# Pre-download CLIP model weights into the image so workers never race on download
RUN python -c "\
from nemo_curator.models.clip import CLIPImageEmbeddings; \
CLIPImageEmbeddings.download_weights_on_node('/home/ray/model_weights')"

# Create output directories
RUN mkdir -p /home/ray/data/webdataset \
    /home/ray/data/results \
    /home/ray/data/embeddings \
    /home/ray/data/removal_ids

124 changes: 124 additions & 0 deletions nemo_curator_semantic_dedup/README.md
# Image Semantic Deduplication with NeMo Curator

This example uses [NVIDIA NeMo Curator](https://github.com/NVIDIA-NeMo/Curator) to perform GPU-accelerated semantic deduplication on image datasets. It reads image URLs from a HuggingFace parquet dataset, downloads them into [WebDataset](https://github.com/webdataset/webdataset) tar shards, generates CLIP embeddings on GPUs, clusters them with K-means + DBSCAN to find near-duplicates, and writes a clean deduplicated dataset.

## Install the Anyscale CLI

Requires Anyscale CLI version 0.26.82 or later.

```bash
pip install -U anyscale
anyscale login
```

## Run the job

Clone the example from GitHub.

```bash
git clone https://github.com/anyscale/examples.git
cd examples/nemo_curator_semantic_dedup
```

Submit the job. Use `--env` to forward your HuggingFace token for dataset access.

```bash
anyscale job submit -f job.yaml --env HF_TOKEN=$HF_TOKEN
```

## How it works

The entrypoint defined in [job.yaml](./job.yaml) runs [image_dedup_example.py](./image_dedup_example.py), which executes four steps:

```
HuggingFace parquet (URLs + captions)
Step 1: Download images → WebDataset tar shards
Step 2: Generate CLIP embeddings (GPU)
Step 3: Semantic deduplication (K-means + DBSCAN)
Step 4: Write deduplicated dataset (new tar shards without duplicates)
```

### Step 1: Parquet to WebDataset

`main()` calls `parquet_to_webdataset_ray()` in [helper.py](./helper.py), which builds a Ray Data pipeline to download images and pack them into WebDataset tar shards:

```
read_parquet (HF) → repartition → map_batches(download) → flat_map(validate) → map_batches(write_tar)
```

| Ray Data operator | Function | What it does |
|-------------------|----------|-------------|
| `read_parquet` | — | Lazily reads `url` and `caption` columns from HuggingFace via `HfFileSystem`. |
| `repartition` | — | Splits large parquet blocks (~millions of rows) into ~1000-row blocks so Ray can parallelize downstream work across the cluster. |
| `map_batches` | `image_download_batch` | Downloads images in batches of 100. Within each Ray task, a `ThreadPoolExecutor` with 100 threads downloads URLs concurrently — so you get parallelism at two levels: Ray distributes batches across cluster CPUs, and threads parallelize I/O within each batch. |
| `flat_map` | `process_image` | Validates each image with Pillow (`.verify()`), converts to RGB JPEG, and drops failures by returning `[]`. Handles all image modes (RGBA, palette, grayscale, CMYK) by compositing onto a white background. |
| `map_batches` | `write_tar_batch` | Packs `ENTRIES_PER_TAR` images (default 500) into a single `.tar` shard on cluster storage. Each shard gets a unique name based on node ID + UUID to avoid collisions when multiple nodes write simultaneously. |

The entire pipeline streams end-to-end — Ray Data handles backpressure so fast stages don't overwhelm slow ones, and data flows through without loading the full dataset into memory. `.take_all()` at the end triggers execution.
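The streaming pattern can be sketched in plain Python. This is a toy illustration of the pull-based dataflow, not Ray Data code; the function names are hypothetical stand-ins for the real helpers, and Ray Data adds distribution, batching, and backpressure on top of this pattern.

```python
# Illustrative only: plain-Python generators mimic the pull-based streaming
# that Ray Data performs across a cluster.

def read_rows(n):
    # stand-in for read_parquet: yields rows lazily, one at a time
    for i in range(n):
        yield {"url": f"https://example.com/{i}.jpg"}

def download(rows):
    # stand-in for the download stage: pretend every URL returns JPEG bytes
    for row in rows:
        row["bytes"] = b"\xff\xd8"
        yield row

def validate(rows):
    # stand-in for flat_map(process_image): dropping a row = not yielding it
    for row in rows:
        if row["bytes"]:
            yield row

# Nothing executes until the sink pulls; each row flows through the whole
# chain one at a time instead of materializing the full dataset in memory.
processed = list(validate(download(read_rows(5))))
assert len(processed) == 5
```

The same shape holds at cluster scale: `.take_all()` plays the role of the final `list(...)` pull that triggers execution.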

### Step 2: Image embedding pipeline

`create_image_embedding_pipeline()` builds a NeMo Curator `Pipeline` with five stages:

| Stage | What it does |
|-------|-------------|
| `FilePartitioningStage` | Discovers `.tar` files and groups them into partitions (`tar_files_per_partition=1`). Not a bottleneck at this scale — GPU throughput on the embedding stage is. |
| `ImageReaderStage` | Reads images from tar shards. DALI decodes JPEGs on the GPU. |
| `ImageEmbeddingStage` | Runs OpenAI CLIP ViT-L/14 to produce 768-dimensional embeddings for each image. |
| `ConvertImageBatchToDocumentBatchStage` | Converts from `ImageBatch` (numpy pixel arrays) to `DocumentBatch` (DataFrames), keeping only `image_id` and `embedding`. |
| `ParquetWriter` | Saves embeddings to parquet files on cluster storage. |

This pipeline runs on a `RayDataExecutor`, which treats the stages as a streaming dataflow — data flows through one batch at a time in a producer-consumer pattern. It's well suited for stages that process data independently without coordination between workers.

### Step 3: Semantic deduplication workflow

`SemanticDeduplicationWorkflow` is where the actual deduplication happens:

1. **K-means clustering** (`n_clusters=100`) — groups similar embeddings together using RAFT/NCCL across all GPU actors.
2. **DBSCAN within clusters** (`eps=0.01`) — finds near-duplicate pairs based on cosine distance within each cluster.

This step runs on a `RayActorPoolExecutor`, which creates a fixed pool of long-lived Ray actors, each holding a GPU. This is needed because K-means is iterative: each actor holds a shard of the embeddings plus the current 100 cluster centroids in GPU memory, and on every iteration all actors reassign points, compute partial centroid sums, then synchronize via NCCL to update the shared centroids. That state (centroids + assignments) must persist across iterations until convergence — a stateless streaming executor, which processes each batch independently and forgets it, can't do that.
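A minimal single-process sketch of that iterative pattern, using toy numpy vectors in place of CLIP embeddings (the real workflow shards the points across GPU actors and merges centroid updates over NCCL; the array sizes and seed here are arbitrary):

```python
import numpy as np

rng = np.random.default_rng(0)
emb = rng.normal(size=(200, 8)).astype(np.float32)
emb /= np.linalg.norm(emb, axis=1, keepdims=True)  # unit vectors, toy CLIP stand-ins

k = 4
# Persistent state: the centroids survive across iterations until convergence,
# which is why a stateless streaming executor cannot run this step.
centroids = emb[rng.choice(len(emb), size=k, replace=False)]

for _ in range(10):
    # reassign every point to its nearest centroid (squared Euclidean distance)
    assign = ((emb[:, None, :] - centroids[None, :, :]) ** 2).sum(-1).argmin(1)
    for c in range(k):
        members = emb[assign == c]
        if len(members):
            centroids[c] = members.mean(0)  # update reused by the next iteration

# DBSCAN stand-in: flag near-duplicate pairs within one cluster whose
# cosine distance falls below eps
members = emb[assign == 0]
sims = members @ members.T
pairs = np.argwhere(1.0 - sims < 0.01)
dup_pairs = pairs[pairs[:, 0] < pairs[:, 1]]  # keep each pair once
```

With random vectors `dup_pairs` is usually empty; on real image embeddings, rows with cosine similarity above `1 - eps` are the near-duplicates that feed the removal list.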

### Step 4: Write deduplicated dataset

`create_image_deduplication_pipeline()` re-reads the original tar shards, filters out images whose IDs appear in the removal list, and writes new tar shards. You can't selectively remove files from a tar archive — tar is a flat concatenation of file records with no editable index — so writing new shards without the duplicates is the only option. `ImageWriterStage` re-encodes each image's numpy pixel array back to JPEG with Pillow, packs them into new `.tar` files (up to 1000 images per tar), and writes a companion `.parquet` with metadata (`image_id`, `tar_file`, `member_name`, `original_path`).
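The rewrite-rather-than-edit step can be sketched with the standard library's `tarfile` module. This is a simplified stand-in: the real `ImageWriterStage` re-encodes pixel arrays back to JPEG, whereas this just copies raw members, and `rewrite_without` is a hypothetical name.

```python
import os
import tarfile

def rewrite_without(src_tar: str, dst_tar: str, remove_keys: set) -> int:
    """Copy src_tar to dst_tar, skipping members whose key is in remove_keys."""
    kept = 0
    with tarfile.open(src_tar) as src, tarfile.open(dst_tar, "w") as dst:
        for member in src.getmembers():
            key = os.path.splitext(member.name)[0]  # "abc_000001.jpg" -> "abc_000001"
            if key in remove_keys:
                continue  # a known duplicate: leave it out of the new shard
            dst.addfile(member, src.extractfile(member))
            kept += 1
    return kept
```

Rewriting copies every surviving member, which is why Step 4 is a full pass over the original shards rather than an in-place edit.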

## Cluster storage

All intermediate and output data lives under `/mnt/cluster_storage/`, a shared network filesystem (backed by S3) that is automatically mounted on every node in the cluster. This is necessary because Steps 2, 3, and 4 run as separate pipeline executions with different executors (`RayDataExecutor` for streaming stages, `RayActorPoolExecutor` for K-means). When one step finishes, its data is no longer in memory — the next step reads it back from disk. A shared filesystem ensures any node can read what any other node wrote, regardless of which step produced it.

| Step | Directory | Contents |
|------|-----------|----------|
| 1 | `webdataset/` | WebDataset `.tar` files (downloaded images) |
| 2 | `embeddings/` | Parquet files with `image_id` + embedding vectors |
| 3 | `removal_ids/` | K-means results, pairwise results, and `duplicates/` parquet listing IDs to remove |
| 4 | `results/` | Final deduplicated WebDataset `.tar` files |

## Configuration

All configuration is done through environment variables in [job.yaml](./job.yaml). Override them at submit time:

```bash
anyscale job submit -f job.yaml \
--env HF_TOKEN=$HF_TOKEN
```

The [Dockerfile](./Dockerfile) builds a custom image with NeMo Curator's CUDA dependencies (`nemo-curator[image_cuda12]`), including DALI, cuML, and RAFT. The NeMo Curator Python package itself is overridden at runtime via `PYTHONPATH=Curator` in job.yaml, which points to a local `Curator/` directory uploaded with the working directory.

## View the job

View the job in the [jobs tab](https://console.anyscale.com/jobs) of the Anyscale console.

## Learn more

- [NeMo Curator Documentation](https://docs.nvidia.com/nemo/curator/latest/)
- [NeMo Curator Image Tutorials](https://github.com/NVIDIA-NeMo/Curator/tree/main/tutorials/image/getting-started)
- [Anyscale Jobs Documentation](https://docs.anyscale.com/platform/jobs/)
172 changes: 172 additions & 0 deletions nemo_curator_semantic_dedup/helper.py
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helper functions for downloading and preparing image datasets as WebDataset tar files."""

from __future__ import annotations

import io
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any
import requests
from loguru import logger
from PIL import Image
import webdataset as wds


def download_single_image(url: str, session: requests.Session) -> bytes | None:
    """Download a single image, returning bytes or None on failure."""
    try:
        response = session.get(url, timeout=5, stream=True)
        return response.content if response.status_code == 200 else None
    except Exception:
        return None


def image_download_batch(batch: dict[str, Any]) -> dict[str, Any]:
    """Download a batch of images using ThreadPoolExecutor for parallelism."""
    session = requests.Session()
    adapter = requests.adapters.HTTPAdapter(
        pool_connections=100, pool_maxsize=100, max_retries=0,
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    # Use ThreadPoolExecutor for parallel downloads within this batch
    # 100 threads means 100 concurrent downloads per Ray task
    with ThreadPoolExecutor(max_workers=100) as executor:
        batch["bytes"] = list(executor.map(lambda url: download_single_image(url, session), batch["url"]))

    return batch


def process_image(row: dict[str, Any]) -> list[dict[str, Any]]:
    """Validate downloaded image bytes, convert to JPEG, and drop failures.

    Returns a single-element list on success or an empty list to drop the row.
    Designed for use with Ray Data's flat_map.
    """
    image_bytes = row.get("bytes")
    if not image_bytes:
        return []

    try:
        img = Image.open(io.BytesIO(image_bytes))
        img.verify()
        # verify() leaves the file object unusable, so reopen before decoding
        img = Image.open(io.BytesIO(image_bytes))

        # Robust RGB conversion for ALL image modes (L, LA, P, PA, RGBA, CMYK, etc.)
        # This ensures CLIP gets 3-channel images
        if img.mode != "RGB":
            if img.mode == "P":
                img = img.convert("RGBA")
            # For any mode with alpha, composite onto a white background
            if img.mode in ("RGBA", "LA", "PA"):
                if img.mode != "RGBA":
                    img = img.convert("RGBA")
                background = Image.new("RGB", img.size, (255, 255, 255))
                # Use the alpha channel as the paste mask
                background.paste(img, mask=img.split()[-1])
                img = background
            else:
                img = img.convert("RGB")

        if img.mode != "RGB" or img.size[0] < 3 or img.size[1] < 3:
            return []

        jpeg_buffer = io.BytesIO()
        img.save(jpeg_buffer, format="JPEG", quality=95)
        row["jpeg_bytes"] = jpeg_buffer.getvalue()
        return [row]
    except Exception:
        return []


def write_tar_batch(batch: dict[str, Any], output_dir: str) -> dict[str, Any]:
    """Write a batch of images to a WebDataset tar shard."""
    import ray

    node_id = ray.get_runtime_context().get_node_id()[:8]
    shard_id = f"{node_id}_{uuid.uuid4().hex[:8]}"
    tar_path = os.path.join(output_dir, f"{shard_id}.tar")

    urls = batch["url"]
    captions = batch["caption"]
    jpeg_list = batch["jpeg_bytes"]
    num_images = len(urls)

    with wds.TarWriter(tar_path) as sink:
        for i in range(num_images):
            sink.write({
                "__key__": f"{shard_id}_{i:06d}",
                "jpg": jpeg_list[i],
                "txt": str(captions[i]),
                "json": {"url": urls[i], "caption": captions[i]},
            })

    return {"shard_id": [shard_id], "success_count": [num_images], "total_count": [num_images]}


def parquet_to_webdataset_ray(
    hf_dataset_path: str,
    output_dir: str,
    entries_per_tar: int = 1000,
    max_entries: int | None = None,
    concurrency: int | None = None,
) -> dict[str, int]:
    """Convert a HuggingFace parquet dataset to WebDataset tar files using Ray Data."""
    import ray
    import ray.data
    from functools import partial
    from huggingface_hub import HfFileSystem

    os.makedirs(output_dir, exist_ok=True)

    ds = ray.data.read_parquet(
        hf_dataset_path,
        file_extensions=["parquet"],
        columns=["url", "caption"],
        filesystem=HfFileSystem(token=os.environ["HF_TOKEN"]),
        concurrency=10,  # cap concurrent read tasks against the HF filesystem
    )

    if max_entries is not None:
        ds = ds.limit(max_entries)
        ds = ds.repartition(num_blocks=max(100, max_entries // 1000))

    if concurrency is None:
        cluster_resources = ray.cluster_resources()
        concurrency = max(4, int(cluster_resources.get("CPU", 4)))

    # Download images, validate, convert to JPEG
    ds = ds.map_batches(image_download_batch, batch_size=100, batch_format="numpy")
    ds = ds.flat_map(process_image)

    # Write tar shards
    results = ds.map_batches(
        partial(write_tar_batch, output_dir=output_dir),
        batch_size=entries_per_tar,
        batch_format="numpy",
        concurrency=concurrency,
    ).take_all()

    total_success = sum(r["success_count"] for r in results)
    num_shards = len(results)
    total_attempted = max_entries if max_entries is not None else total_success
    success_rate = (total_success / total_attempted * 100) if total_attempted > 0 else 0
    logger.info(f"Download complete: {total_success} images in {num_shards} shards ({success_rate:.1f}% success rate)")

    return {"total_success": total_success, "total_attempted": total_attempted, "num_shards": num_shards}