{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# ⚡ Dask: Faster-than-Pandas (CPU Benchmark)\n",
"\n",
"### **Template Review**\n",
"This template demonstrates how to handle **larger-than-memory** datasets using **Dask** on a standard CPU environment. Optimized for **Saturn Cloud Jupyter Notebooks**, it provides a direct performance comparison between standard Pandas (single-core) and Dask (multi-core parallel processing).\n",
"\n",
"**Core Workflow:** We will generate a massive **2GB+ Synthetic Dataset** locally with complex data types (Dates, Strings). This ensures the workload is heavy enough to make Pandas struggle, highlighting Dask's ability to process data in chunks without crashing memory.\n",
"\n",
"### **Tech Stack**\n",
"* **Dask**: Parallel computing library for scaling Python.\n",
"* **Pandas**: Standard data analysis library (the baseline).\n",
"* **Infrastructure**: [Saturn Cloud](https://saturncloud.io/) CPU Jupyter Instance."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Install Dask with complete dependencies (Dashboard + Distributed)\n",
"# Quotes are used to prevent ZSH/Shell errors with brackets\n",
"!pip install \"dask[complete]\" pandas numpy -q"
]
},
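{
"cell_type": "markdown",
"metadata": {},
"source": [
"*Optional:* confirm which versions were installed before benchmarking; timings can differ between Dask and Pandas releases."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check: print the installed library versions\n",
"import dask\n",
"import pandas as pd\n",
"import numpy as np\n",
"\n",
"print(f\"Dask:   {dask.__version__}\")\n",
"print(f\"Pandas: {pd.__version__}\")\n",
"print(f\"NumPy:  {np.__version__}\")"
]
},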
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Step 1: Generate Massive Synthetic Data (2GB)**\n",
"We generate **20 Million rows** of complex data. We include **Dates** and **String Categories** because these data types are computationally expensive and slow down Pandas significantly."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"import os\n",
"from datetime import datetime, timedelta\n",
"\n",
"FILENAME = \"heavy_data.csv\"\n",
"NUM_ROWS = 20_000_000 # 20 Million Rows\n",
"CHUNK_SIZE = 1_000_000 # Process in chunks to save memory during creation\n",
"\n",
"def generate_data():\n",
" print(f\"🔨 Generating {NUM_ROWS} rows (Approx 2GB)... Please wait.\")\n",
" \n",
" # Categories for string complexity (Strings are slower than Ints)\n",
" categories = ['Electronics', 'Furniture', 'Clothing', 'Food', 'Auto']\n",
" start_date = datetime(2020, 1, 1)\n",
" \n",
" # Remove old file if exists\n",
" if os.path.exists(FILENAME):\n",
" os.remove(FILENAME)\n",
" \n",
" for i in range(0, NUM_ROWS, CHUNK_SIZE):\n",
" # Create complex data: Dates + Strings + Floats\n",
" df = pd.DataFrame({\n",
" 'transaction_id': np.arange(i, i + CHUNK_SIZE),\n",
" 'date': [start_date + timedelta(days=x % 365) for x in range(CHUNK_SIZE)],\n",
" 'category': np.random.choice(categories, CHUNK_SIZE),\n",
" 'amount': np.random.uniform(10.0, 500.0, CHUNK_SIZE),\n",
" 'discount': np.random.uniform(0.0, 0.3, CHUNK_SIZE)\n",
" })\n",
" \n",
" # Append to CSV\n",
" mode = 'a' if i > 0 else 'w'\n",
" header = (i == 0)\n",
" df.to_csv(FILENAME, index=False, mode=mode, header=header)\n",
" \n",
" if (i // CHUNK_SIZE) % 5 == 0:\n",
" print(f\"... Written {(i + CHUNK_SIZE) / 1_000_000:.0f} Million rows\")\n",
" \n",
" print(f\"✅ Done! File size: {os.path.getsize(FILENAME) / (1024**3):.2f} GB\")\n",
"\n",
"# Check if file exists and is big enough (>1.5GB)\n",
"if not os.path.exists(FILENAME) or os.path.getsize(FILENAME) < 1.5 * 1024**3:\n",
" generate_data()\n",
"else:\n",
" print(\"✅ Large file already exists. Skipping generation.\")"
]
},
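{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before benchmarking, it is worth a quick peek at what was just written. `nrows=5` reads only the first five rows, so this is instant even on a multi-gigabyte CSV."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Quick look at the generated file without loading it fully\n",
"sample = pd.read_csv(FILENAME, nrows=5)\n",
"print(sample)\n",
"print(f\"\\nFile size on disk: {os.path.getsize(FILENAME) / (1024**3):.2f} GB\")"
]
},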
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Step 2: Initialize Dask Client**\n",
"Dask uses a \"Client\" to manage parallel workers. Running this cell will provide a **Dashboard Link** where you can watch your CPU cores light up in real-time."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client\n",
"import dask.dataframe as dd\n",
"import time\n",
"\n",
"# Start a local Dask cluster using all available CPU cores\n",
"client = Client()\n",
"print(f\"🚀 Dask Dashboard available at: {client.dashboard_link}\")\n",
"client"
]
},
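{
"cell_type": "markdown",
"metadata": {},
"source": [
"By default, `Client()` sizes the local cluster automatically from the machine it runs on. If you want explicit control (for example, to leave RAM headroom for the Pandas benchmark), you can pin the worker count and memory limit yourself. The values below are examples only; match them to your Saturn Cloud instance size."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: re-create the cluster with an explicit size.\n",
"# The numbers below are examples only; tune them to your instance.\n",
"CUSTOMIZE_CLUSTER = False  # flip to True to apply the explicit settings\n",
"\n",
"if CUSTOMIZE_CLUSTER:\n",
"    client.close()  # shut down the auto-sized cluster first\n",
"    client = Client(n_workers=4, threads_per_worker=2, memory_limit=\"4GB\")\n",
"    print(f\"🚀 New dashboard: {client.dashboard_link}\")"
]
},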
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Step 3: The Pandas Benchmark (The Struggle)**\n",
"We attempt to read the 2GB file and parse dates into memory. \n",
"\n",
"> **Note:** We use `parse_dates` specifically because it is an expensive operation that forces Pandas to work hard. This step usually takes **60-90 seconds** or triggers a MemoryError on smaller machines."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start_time = time.time()\n",
"print(\"🐢 Pandas: Reading & Processing...\")\n",
"\n",
"try:\n",
" # We force date parsing to ensure CPU load is high\n",
" df_pd = pd.read_csv(FILENAME, parse_dates=['date'])\n",
" \n",
" # Complex GroupBy Operation\n",
" print(\"🐢 Pandas: Grouping & Aggregating...\")\n",
" res_pd = df_pd.groupby('category')['amount'].mean()\n",
" \n",
" pd_duration = time.time() - start_time\n",
" print(f\"⏱️ Pandas Time: {pd_duration:.2f} seconds\")\n",
"except MemoryError:\n",
" print(\"❌ Pandas Crashed (MemoryError)! This proves the data is too big for RAM.\")\n",
" pd_duration = float('inf')"
]
},
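{
"cell_type": "markdown",
"metadata": {},
"source": [
"If the Pandas load succeeded, it is worth checking how much RAM the DataFrame actually occupies: `memory_usage(deep=True)` counts the Python string objects in the `category` column, which is where much of the cost hides. Freeing the DataFrame afterwards also keeps the Dask benchmark from competing with it for memory."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Only meaningful if the Pandas load above succeeded\n",
"if 'df_pd' in globals():\n",
"    mem = df_pd.memory_usage(deep=True)  # deep=True counts string objects\n",
"    print(mem)\n",
"    print(f\"\\nTotal in-memory size: {mem.sum() / (1024**3):.2f} GB\")\n",
"    # Optionally free the RAM before the Dask benchmark:\n",
"    # del df_pd\n",
"else:\n",
"    print(\"df_pd is not defined (Pandas likely crashed above).\")"
]
},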
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### **Step 4: The Dask Benchmark (The Solution)**\n",
"Dask processes this **lazily**. It scans the file structure instantly and then processes chunks in parallel across all your CPU cores. It never loads the entire file at once."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"start_time = time.time()\n",
"print(\"🐇 Dask: Lazy Read & Parallel Compute...\")\n",
"\n",
"# Dask handles date parsing efficiently across cores\n",
"ddf = dd.read_csv(FILENAME, parse_dates=['date'])\n",
"\n",
"# .compute() triggers the actual parallel execution\n",
"res_dask = ddf.groupby('category')['amount'].mean().compute()\n",
"\n",
"dask_duration = time.time() - start_time\n",
"print(f\"⏱️ Dask Time: {dask_duration:.2f} seconds\")\n",
"\n",
"# Calculate Speedup Factor\n",
"if pd_duration != float('inf'):\n",
" print(f\"\\n🚀 Speedup: {pd_duration / dask_duration:.2f}x Faster\")\n",
"else:\n",
" print(\"\\n🏆 Dask Wins (Pandas Crashed)\")"
]
},
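{
"cell_type": "markdown",
"metadata": {},
"source": [
"Speed only matters if the numbers agree. The check below compares the Dask result against the Pandas result (when it exists) and shows how many partitions Dask split the CSV into. The `blocksize` argument of `dd.read_csv` controls that partitioning; the `64MB` used here is just an example value."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 1) Correctness: Dask should match Pandas on the per-category means\n",
"if 'res_pd' in globals():\n",
"    pd.testing.assert_series_equal(\n",
"        res_pd.sort_index(),\n",
"        res_dask.sort_index(),\n",
"        check_exact=False,  # allow tiny floating-point differences\n",
"    )\n",
"    print(\"✅ Pandas and Dask results match.\")\n",
"else:\n",
"    print(\"Skipping comparison (Pandas result not available).\")\n",
"\n",
"# 2) Parallelism: how many chunks (partitions) did Dask create?\n",
"print(f\"Default partitions: {ddf.npartitions}\")\n",
"\n",
"# blocksize controls the chunk size on disk; 64MB here is just an example\n",
"ddf_64mb = dd.read_csv(FILENAME, parse_dates=['date'], blocksize=\"64MB\")\n",
"print(f\"Partitions with 64MB blocks: {ddf_64mb.npartitions}\")"
]
},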
{
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"## 🏁 Conclusion & Next Steps\n",
"We have successfully demonstrated that **Dask** can handle heavy datasets that slow down standard Pandas workflows. By parallelizing the `read_csv` and `groupby` operations, Dask maximizes the utility of your **Saturn Cloud CPU** instance.\n",
"\n",
"### **Resources & Backlinks**\n",
"* **Cloud Infrastructure**: [Deploy on Saturn Cloud](https://saturncloud.io/)\n",
"* **Dask Documentation**: [Best Practices](https://docs.dask.org/en/stable/best-practices.html)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "cpu-plotly-env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}