101 changes: 89 additions & 12 deletions benchmarks/tpc/README.md
@@ -30,6 +30,15 @@ For full instructions on running these benchmarks on an EC2 instance, see the [C

TPC queries are bundled in `benchmarks/tpc/queries/` (derived from TPC-H/DS under the TPC Fair Use Policy).

Create a virtual environment and install dependencies:

```shell
cd benchmarks/tpc
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```

## Usage

All benchmarks are run via `run.py`:
@@ -38,15 +47,17 @@ All benchmarks are run via `run.py`:
python3 run.py --engine <engine> --benchmark <tpch|tpcds> [options]
```

| Option | Description |
| -------------------- | ---------------------------------------------------- |
| `--engine` | Engine name (matches a TOML file in `engines/`) |
| `--benchmark` | `tpch` or `tpcds` |
| `--iterations` | Number of iterations (default: 1) |
| `--output` | Output directory (default: `.`) |
| `--query` | Run a single query number |
| `--no-restart` | Skip Spark master/worker restart |
| `--dry-run` | Print the spark-submit command without executing |
| `--profile` | Enable executor metrics profiling via Spark REST API |
| `--profile-interval` | Profiling poll interval in seconds (default: 2.0) |

Available engines: `spark`, `comet`, `comet-iceberg`, `gluten`

@@ -103,6 +114,20 @@ Generating charts:
python3 generate-comparison.py --benchmark tpch --labels "Spark 3.5.3" "Comet 0.9.0" "Gluten 1.4.0" --title "TPC-H @ 100 GB (single executor, 8 cores, local Parquet files)" spark-tpch-1752338506381.json comet-tpch-1752337818039.json gluten-tpch-1752337474344.json
```

## Profiling

Use `--profile` to collect executor memory metrics from the Spark REST API during the benchmark run.
A background thread polls `/api/v1/applications/{appId}/executors` at a configurable interval and
writes the time-series data to a CSV file alongside the benchmark results.

```shell
python3 run.py --engine comet --benchmark tpch --profile
python3 run.py --engine comet --benchmark tpch --profile --profile-interval 1.0
```

The output CSV is written to `{output}/{name}-{benchmark}-metrics.csv` and contains per-executor
columns including `memoryUsed`, `maxMemory`, heap/off-heap storage metrics, and peak memory metrics.
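The background poller described above can be sketched as follows. This is a simplified, hypothetical version for illustration; the actual implementation lives in `profiling.py`, and the exact column set and REST field names may differ:

```python
import csv
import json
import threading
import time
import urllib.request

# Subset of columns for illustration; the real CSV also carries
# heap/off-heap storage and peak memory metrics.
FIELDS = ["timestamp_ms", "id", "memoryUsed", "maxMemory"]

def executor_row(executor, ts_ms):
    """Flatten one executor record from the REST response into a CSV row."""
    return [ts_ms] + [executor.get(name, 0) for name in FIELDS[1:]]

def poll_executors(ui_url, app_id, out_path, interval=2.0, stop_event=None):
    """Poll /api/v1/applications/{appId}/executors until stop_event is set."""
    stop_event = stop_event or threading.Event()
    url = f"{ui_url}/api/v1/applications/{app_id}/executors"
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(FIELDS)
        while not stop_event.is_set():
            with urllib.request.urlopen(url) as resp:
                executors = json.load(resp)
            ts_ms = int(time.time() * 1000)
            for executor in executors:
                writer.writerow(executor_row(executor, ts_ms))
            f.flush()  # keep the CSV readable while the run is in progress
            stop_event.wait(interval)
```

Running the loop on a daemon thread and setting the event when the benchmark query finishes yields one CSV row per executor per poll.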

## Engine Configuration

Each engine is defined by a TOML file in `engines/`. The config specifies JARs, Spark conf overrides,
@@ -268,7 +293,8 @@ between versions by changing the path and restarting.
Use `docker compose run --rm` to execute benchmarks. The `--rm` flag removes the
container when it exits, preventing port conflicts on subsequent runs. Pass
`--no-restart` since the cluster is already managed by Compose, and `--output /results`
so that output files land in the mounted results directory (alongside cgroup metrics
CSVs):

```shell
docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml \
@@ -315,7 +341,7 @@ docker compose -f benchmarks/tpc/infra/docker/docker-compose.yml \
> `sun.misc.Unsafe` errors. Unset `BENCH_JAVA_HOME` (or switch it back to Java 17)
> and restart the cluster before running Comet or Spark benchmarks.

### Memory limits and metrics

Two compose files are provided for different hardware profiles:

@@ -335,7 +361,32 @@

Configure via environment variables: `WORKER_MEM_LIMIT` (default: 32g per worker),
`BENCH_MEM_LIMIT` (default: 10g), `WORKER_MEMORY` (default: 16g, Spark executor memory),
`WORKER_CORES` (default: 8), `METRICS_INTERVAL` (default: 1 second).

A metrics collector runs inside each worker container, so that it reads the worker's own
cgroup memory stats rather than those of a separate container. Raw cgroup metrics are
continuously written to `$RESULTS_DIR/container-metrics-spark-worker-{1,2}.csv`. These
files are overwritten each time the cluster restarts.

When `--profile` is used, the profiler automatically snapshots the cgroup data for the
benchmark time window, producing per-engine files:

- `{name}-{benchmark}-metrics.csv` -- JVM executor metrics
- `{name}-{benchmark}-container-metrics-spark-worker-1.csv` -- cgroup snapshot for worker 1
- `{name}-{benchmark}-container-metrics-spark-worker-2.csv` -- cgroup snapshot for worker 2

This ensures each engine run has its own paired JVM + cgroup dataset even when multiple
engines are benchmarked on the same cluster.

Use `visualize-metrics.py` to generate memory charts from these files:

```shell
python3 visualize-metrics.py \
--jvm-metrics /tmp/bench-results/comet-tpch-metrics.csv \
--cgroup-metrics /tmp/bench-results/comet-tpch-container-metrics-spark-worker-1.csv \
/tmp/bench-results/comet-tpch-container-metrics-spark-worker-2.csv \
--output-dir /tmp/comet-charts --title "Comet TPC-H"
```

### Running on a laptop with small scale factors

@@ -353,6 +404,32 @@ The benchmark scripts request 2 executor instances and 16 max cores by default
(`run.py`). Spark will simply use whatever resources are available on the single worker,
so no script changes are needed.

### Quick start with docker-bench.sh

The `docker-bench.sh` wrapper script orchestrates the full Docker lifecycle (start
cluster, run benchmark, tear down) in a single command. The cluster is always torn
down on exit, even on Ctrl-C or errors.

```shell
export DATA_DIR=/mnt/bigdata/tpch/sf100
export COMET_JAR=/path/to/comet-spark-spark3.5_2.12-0.10.0.jar

# Run a benchmark (starts cluster, runs benchmark, tears down)
./benchmarks/tpc/docker-bench.sh --engine comet --benchmark tpch

# With profiling
./benchmarks/tpc/docker-bench.sh --engine comet --benchmark tpch --profile

# Laptop mode (single worker, reduced memory)
./benchmarks/tpc/docker-bench.sh --laptop --engine comet --benchmark tpch

# Dry run (prints spark-submit command without executing)
./benchmarks/tpc/docker-bench.sh --engine comet --benchmark tpch --dry-run
```

All arguments except `--laptop` are passed through to `run.py`. The script
automatically adds `--output /results` and `--no-restart`.

### Comparing Parquet vs Iceberg performance

Run both benchmarks and compare:
101 changes: 101 additions & 0 deletions benchmarks/tpc/docker-bench.sh
@@ -0,0 +1,101 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Wrapper script for running TPC benchmarks in a Docker Compose Spark cluster.
#
# Orchestrates the full lifecycle: start cluster, run benchmark, tear down.
# All arguments (except --laptop) are passed through to run.py.
#
# Usage:
# ./benchmarks/tpc/docker-bench.sh --engine comet --benchmark tpch
# ./benchmarks/tpc/docker-bench.sh --laptop --engine comet --benchmark tpch
# ./benchmarks/tpc/docker-bench.sh --engine comet --benchmark tpch --profile
#
# Required environment variables:
# DATA_DIR - Host path to TPC data
#
# Engine-specific environment variables (set the one matching --engine):
# COMET_JAR - Host path to Comet JAR
# GLUTEN_JAR - Host path to Gluten JAR
# ICEBERG_JAR - Host path to Iceberg Spark runtime JAR
#
# Optional environment variables:
# RESULTS_DIR - Host path for results (default: /tmp/bench-results)
# BENCH_IMAGE - Docker image name (default: comet-bench)
# BENCH_JAVA_HOME - Java home inside container
# WORKER_MEM_LIMIT - Memory limit per worker container
# BENCH_MEM_LIMIT - Memory limit for bench container
# WORKER_MEMORY - Spark executor memory
# WORKER_CORES - Spark executor cores
# METRICS_INTERVAL - Metrics collection interval in seconds

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
COMPOSE_DIR="$SCRIPT_DIR/infra/docker"

# --- Parse our own flags (--laptop), pass the rest to run.py ---
LAPTOP=false
RUN_ARGS=()

for arg in "$@"; do
if [ "$arg" = "--laptop" ]; then
LAPTOP=true
else
RUN_ARGS+=("$arg")
fi
done

if [ "$LAPTOP" = true ]; then
COMPOSE_FILE="$COMPOSE_DIR/docker-compose-laptop.yml"
else
COMPOSE_FILE="$COMPOSE_DIR/docker-compose.yml"
fi

# --- Validate environment ---
if [ -z "${DATA_DIR:-}" ]; then
echo "Error: DATA_DIR is not set. Set it to the host path of your TPC data." >&2
exit 1
fi

RESULTS_DIR="${RESULTS_DIR:-/tmp/bench-results}"

# --- Ensure results directories exist ---
mkdir -p "$RESULTS_DIR/spark-events"

# --- Cleanup on exit (Ctrl-C, errors, normal exit) ---
cleanup() {
echo ""
echo "Stopping Docker Compose cluster..."
docker compose -f "$COMPOSE_FILE" down
}
trap cleanup EXIT

# --- Start the cluster ---
echo "Starting Spark cluster with: $COMPOSE_FILE"
docker compose -f "$COMPOSE_FILE" up -d

# --- Run the benchmark ---
echo "Running benchmark: run.py ${RUN_ARGS[*]:-}"
docker compose -f "$COMPOSE_FILE" \
run --rm -p 4040:4040 bench \
python3 /opt/benchmarks/run.py \
--output /results --no-restart \
${RUN_ARGS[@]+"${RUN_ARGS[@]}"} # expands to nothing when no args were given (safe under set -u on bash < 4.4)
6 changes: 6 additions & 0 deletions benchmarks/tpc/infra/docker/Dockerfile
@@ -43,10 +43,16 @@ ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk
# Copy the benchmark scripts into the image.
COPY benchmarks/tpc/run.py /opt/benchmarks/run.py
COPY benchmarks/tpc/tpcbench.py /opt/benchmarks/tpcbench.py
COPY benchmarks/tpc/profiling.py /opt/benchmarks/profiling.py
COPY benchmarks/tpc/engines /opt/benchmarks/engines
COPY benchmarks/tpc/queries /opt/benchmarks/queries
COPY benchmarks/tpc/create-iceberg-tables.py /opt/benchmarks/create-iceberg-tables.py
COPY benchmarks/tpc/generate-comparison.py /opt/benchmarks/generate-comparison.py
COPY benchmarks/tpc/visualize-metrics.py /opt/benchmarks/visualize-metrics.py

# Copy the metrics collector script.
COPY benchmarks/tpc/infra/docker/collect-metrics.sh /opt/benchmarks/collect-metrics.sh
RUN chmod +x /opt/benchmarks/collect-metrics.sh

# Engine JARs are bind-mounted or copied in at runtime via --jars.
# Data and query paths are also bind-mounted.
103 changes: 103 additions & 0 deletions benchmarks/tpc/infra/docker/collect-metrics.sh
@@ -0,0 +1,103 @@
#!/bin/sh
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Container-level memory metrics collector.
#
# Polls cgroup memory stats at a fixed interval and writes a CSV with
# columns: timestamp, memory_usage_bytes, memory_limit_bytes, rss_bytes,
# cache_bytes, swap_bytes.
#
# Works with both cgroup v1 and v2.
#
# Usage:
# collect-metrics.sh [INTERVAL_SECS] [OUTPUT_CSV]
#
# Defaults: interval=1, output=/results/container-metrics.csv

set -e

INTERVAL="${1:-1}"
OUTPUT="${2:-/results/container-metrics.csv}"

# Detect cgroup version
if [ -f /sys/fs/cgroup/memory/memory.usage_in_bytes ]; then
CGROUP_VERSION=1
elif [ -f /sys/fs/cgroup/memory.current ]; then
CGROUP_VERSION=2
else
echo "Warning: cannot detect cgroup memory files; polling disabled" >&2
# Still write a header so downstream tools don't break on a missing file.
echo "timestamp_ms,memory_usage_bytes,memory_limit_bytes,rss_bytes,cache_bytes,swap_bytes" > "$OUTPUT"
# Sleep forever so the container stays up (compose expects it to keep running).
exec sleep infinity
fi

# ---- helpers ----

read_file() {
# Return the contents of a file, or "0" if it doesn't exist.
if [ -f "$1" ]; then cat "$1"; else echo "0"; fi
}

read_stat() {
# Extract a named field from memory.stat ("key value" format), printing 0
# when the key (or the file) is missing so the CSV field is never empty.
# Note: a plain `grep ... | awk ... || echo 0` would print nothing on a
# missing key, because awk exits 0 even when grep matched no lines.
awk -v key="$1" '$1 == key { print $2; found = 1 } END { if (!found) print 0 }' "$2" 2>/dev/null || echo "0"
}

poll_v1() {
local usage limit rss cache swap
usage=$(read_file /sys/fs/cgroup/memory/memory.usage_in_bytes)
limit=$(read_file /sys/fs/cgroup/memory/memory.limit_in_bytes)
local stat=/sys/fs/cgroup/memory/memory.stat
rss=$(read_stat total_rss "$stat")
cache=$(read_stat total_cache "$stat")
swap=$(read_file /sys/fs/cgroup/memory/memory.memsw.usage_in_bytes)
# swap file reports memory+swap; subtract memory to get swap only
if [ "$swap" != "0" ]; then
swap=$((swap - usage))
[ "$swap" -lt 0 ] && swap=0
fi
echo "$usage,$limit,$rss,$cache,$swap"
}

poll_v2() {
local usage limit rss cache swap
usage=$(read_file /sys/fs/cgroup/memory.current)
limit=$(read_file /sys/fs/cgroup/memory.max)
[ "$limit" = "max" ] && limit=0
local stat=/sys/fs/cgroup/memory.stat
rss=$(read_stat anon "$stat")
cache=$(read_stat file "$stat")
swap=$(read_file /sys/fs/cgroup/memory.swap.current)
echo "$usage,$limit,$rss,$cache,$swap"
}

# ---- main loop ----

echo "timestamp_ms,memory_usage_bytes,memory_limit_bytes,rss_bytes,cache_bytes,swap_bytes" > "$OUTPUT"
echo "Collecting container memory metrics every ${INTERVAL}s -> ${OUTPUT} (cgroup v${CGROUP_VERSION})" >&2

while true; do
# %3N (milliseconds) is a GNU date extension; non-GNU date may print the
# literal "%3N" with a zero exit status, so validate that the output is
# purely numeric before falling back to python3.
ts=$(date +%s%3N 2>/dev/null || true)
case "$ts" in
'' | *[!0-9]*) ts=$(python3 -c 'import time; print(int(time.time()*1000))') ;;
esac
if [ "$CGROUP_VERSION" = "1" ]; then
vals=$(poll_v1)
else
vals=$(poll_v2)
fi
echo "${ts},${vals}" >> "$OUTPUT"
sleep "$INTERVAL"
done
12 changes: 11 additions & 1 deletion benchmarks/tpc/infra/docker/docker-compose.yml
@@ -31,6 +31,7 @@
# ICEBERG_JAR - Host path to Iceberg Spark runtime JAR
# WORKER_MEM_LIMIT - Hard memory limit per worker container (default: 32g)
# BENCH_MEM_LIMIT - Hard memory limit for the bench runner (default: 10g)
# METRICS_INTERVAL - Metrics collection interval in seconds (default: 1)
# BENCH_JAVA_HOME - Java home inside container (default: /usr/lib/jvm/java-17-openjdk)
# Set to /usr/lib/jvm/java-8-openjdk for Gluten

@@ -47,7 +48,16 @@ x-worker: &worker
image: ${BENCH_IMAGE:-comet-bench}
depends_on:
- spark-master
# The metrics collector runs inside the worker container so that it
# reads the worker's own cgroup memory stats (a separate sidecar
# container would only see its own cgroup, not the worker's).
command:
- sh
- -c
- >-
/opt/benchmarks/collect-metrics.sh ${METRICS_INTERVAL:-1}
/results/container-metrics-$${HOSTNAME}.csv &
exec /opt/spark/sbin/start-worker.sh spark://spark-master:7077
volumes: *volumes
environment:
- JAVA_HOME=${BENCH_JAVA_HOME:-/usr/lib/jvm/java-17-openjdk}