Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions benchmarks/tpc/engines/comet-hashjoin.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Engine profile: Comet with sort-merge joins replaced by hash joins.
[engine]
name = "comet-hashjoin"

# Environment variables that must be set before this engine can run.
# TOML performs no interpolation, so the "$COMET_JAR" placeholders below
# are presumably substituted by the benchmark harness -- verify against
# the runner.
[env]
required = ["COMET_JAR"]

[spark_submit]
jars = ["$COMET_JAR"]
driver_class_path = ["$COMET_JAR"]

# Spark properties passed through verbatim. The keys are deliberately
# quoted so the dots stay part of the literal property name instead of
# creating nested TOML tables.
[spark_conf]
"spark.driver.extraClassPath" = "$COMET_JAR"
"spark.executor.extraClassPath" = "$COMET_JAR"
"spark.plugins" = "org.apache.spark.CometPlugin"
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
"spark.comet.scan.impl" = "native_datafusion"
"spark.comet.exec.replaceSortMergeJoin" = "true"
"spark.comet.expression.Cast.allowIncompatible" = "true"
48 changes: 48 additions & 0 deletions benchmarks/tpc/engines/comet-iceberg-hashjoin.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Engine profile: Comet reading Iceberg tables, with sort-merge joins
# replaced by hash joins.
[engine]
name = "comet-iceberg-hashjoin"

# Environment variables that must be set before this engine can run.
# TOML performs no interpolation, so the "$VAR" and "${VAR}" placeholders
# below are presumably substituted by the benchmark harness -- verify
# against the runner.
[env]
required = ["COMET_JAR", "ICEBERG_JAR", "ICEBERG_WAREHOUSE"]

# Fallback values used when the variable is not set in the environment.
[env.defaults]
ICEBERG_CATALOG = "local"

[spark_submit]
jars = ["$COMET_JAR", "$ICEBERG_JAR"]
driver_class_path = ["$COMET_JAR", "$ICEBERG_JAR"]

# Spark properties passed through verbatim. The keys are deliberately
# quoted so the dots stay part of the literal property name instead of
# creating nested TOML tables.
[spark_conf]
"spark.driver.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
"spark.executor.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
"spark.plugins" = "org.apache.spark.CometPlugin"
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
"spark.comet.exec.replaceSortMergeJoin" = "true"
"spark.comet.expression.Cast.allowIncompatible" = "true"
"spark.comet.enabled" = "true"
"spark.comet.exec.enabled" = "true"
"spark.comet.scan.icebergNative.enabled" = "true"
"spark.comet.explainFallback.enabled" = "true"
"spark.sql.catalog.${ICEBERG_CATALOG}" = "org.apache.iceberg.spark.SparkCatalog"
"spark.sql.catalog.${ICEBERG_CATALOG}.type" = "hadoop"
"spark.sql.catalog.${ICEBERG_CATALOG}.warehouse" = "$ICEBERG_WAREHOUSE"
"spark.sql.defaultCatalog" = "${ICEBERG_CATALOG}"

# Extra flags forwarded to the tpcbench driver.
[tpcbench_args]
use_iceberg = true
1 change: 0 additions & 1 deletion benchmarks/tpc/engines/comet-iceberg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ driver_class_path = ["$COMET_JAR", "$ICEBERG_JAR"]
"spark.executor.extraClassPath" = "$COMET_JAR:$ICEBERG_JAR"
"spark.plugins" = "org.apache.spark.CometPlugin"
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
"spark.comet.exec.replaceSortMergeJoin" = "true"
"spark.comet.expression.Cast.allowIncompatible" = "true"
"spark.comet.enabled" = "true"
"spark.comet.exec.enabled" = "true"
Expand Down
1 change: 0 additions & 1 deletion benchmarks/tpc/engines/comet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,4 @@ driver_class_path = ["$COMET_JAR"]
"spark.plugins" = "org.apache.spark.CometPlugin"
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
"spark.comet.scan.impl" = "native_datafusion"
"spark.comet.exec.replaceSortMergeJoin" = "true"
"spark.comet.expression.Cast.allowIncompatible" = "true"
130 changes: 102 additions & 28 deletions benchmarks/tpc/generate-comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,88 @@

import argparse
import json
import logging
import matplotlib.pyplot as plt
import numpy as np

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def geomean(data):
    """Return the geometric mean of a sequence of positive numbers.

    Computed as exp(mean(log(data))) instead of prod(data) ** (1/n):
    the product form overflows/underflows for long sequences of large
    or small durations, silently producing inf or 0.

    Args:
        data: non-empty sequence (or array) of positive values.

    Returns:
        The geometric mean as a Python float.

    Raises:
        ValueError: if ``data`` is empty (the old code raised
            ZeroDivisionError via ``1 / len(data)``).
    """
    values = np.asarray(data, dtype=float)
    if values.size == 0:
        raise ValueError("geomean() requires at least one value")
    return float(np.exp(np.mean(np.log(values))))

def generate_query_rel_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str):
def get_durations(result, query_key):
    """Look up the timing samples recorded for one query.

    Supports both result-file formats: the legacy format stores a bare
    list of durations per query, while the newer format stores a dict
    with a "durations" entry (alongside metadata such as row_count).

    Args:
        result: one parsed result-file dict, keyed by query number string.
        query_key: query number as a string.

    Returns:
        The list of recorded durations for that query.
    """
    entry = result[query_key]
    return entry["durations"] if isinstance(entry, dict) else entry

def get_all_queries(results):
    """Return the sorted union of query numbers across all result sets.

    Keys that do not parse as integers are silently ignored, so any
    non-query metadata keys in a result file do not break the report.

    Args:
        results: list of parsed result-file dicts keyed by query string.

    Returns:
        Sorted list of query numbers (ints).
    """
    def as_query_number(key):
        # Query keys are integers serialized as strings; skip anything else.
        try:
            return int(key)
        except ValueError:
            return None

    seen = set()
    for result in results:
        seen |= set(result)
    numbers = {n for n in map(as_query_number, seen) if n is not None}
    return sorted(numbers)

def get_common_queries(results, labels):
    """Return the queries present in every result set.

    Queries that appear in only some files are logged as warnings and
    excluded, so downstream charts only compare like with like.

    Args:
        results: list of parsed result-file dicts keyed by query string.
        labels: display label for each result file, parallel to ``results``.

    Returns:
        Sorted list of query numbers (ints) present in all result sets.
    """
    common = []
    for query in get_all_queries(results):
        key = str(query)
        have = [label for label, result in zip(labels, results) if key in result]
        lack = [label for label, result in zip(labels, results) if key not in result]
        if lack:
            logger.warning(
                f"Query {query}: present in [{', '.join(have)}] "
                f"but missing from [{', '.join(lack)}]"
            )
        else:
            common.append(query)
    return common

def check_result_consistency(results, labels, benchmark):
    """Warn when result sets disagree on row counts or result hashes.

    Only the newer dict-format entries carry "row_count"/"result_hash"
    metadata; legacy list-format entries are skipped. ``benchmark`` is
    unused here but kept for interface parity with the other helpers.

    Args:
        results: list of parsed result-file dicts keyed by query string.
        labels: display label for each result file, parallel to ``results``.
        benchmark: benchmark name (currently unused).
    """
    for query in get_all_queries(results):
        key = str(query)
        counts = []
        digests = []
        for label, result in zip(labels, results):
            entry = result.get(key)
            if not isinstance(entry, dict):
                # Missing query or legacy list format: no metadata to compare.
                continue
            if "row_count" in entry:
                counts.append((label, entry["row_count"]))
            if "result_hash" in entry:
                digests.append((label, entry["result_hash"]))

        if len(counts) > 1 and len({value for _, value in counts}) > 1:
            details = ", ".join(f"{label}={value}" for label, value in counts)
            logger.warning(f"Query {query}: row count mismatch: {details}")

        if len(digests) > 1 and len({value for _, value in digests}) > 1:
            details = ", ".join(f"{label}={value}" for label, value in digests)
            logger.warning(f"Query {query}: result hash mismatch: {details}")

def generate_query_rel_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str, common_queries=None):
if common_queries is None:
common_queries = range(1, query_count(benchmark)+1)
results = []
for query in range(1, query_count(benchmark)+1):
if query == 999:
continue
a = np.median(np.array(baseline[str(query)]))
b = np.median(np.array(comparison[str(query)]))
for query in common_queries:
a = np.median(np.array(get_durations(baseline, str(query))))
b = np.median(np.array(get_durations(comparison, str(query))))
if a > b:
speedup = a/b-1
else:
Expand Down Expand Up @@ -80,13 +149,13 @@ def generate_query_rel_speedup_chart(baseline, comparison, label1: str, label2:
# Save the plot as an image file
plt.savefig(f'{benchmark}_queries_speedup_rel.png', format='png')

def generate_query_abs_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str):
def generate_query_abs_speedup_chart(baseline, comparison, label1: str, label2: str, benchmark: str, title: str, common_queries=None):
if common_queries is None:
common_queries = range(1, query_count(benchmark)+1)
results = []
for query in range(1, query_count(benchmark)+1):
if query == 999:
continue
a = np.median(np.array(baseline[str(query)]))
b = np.median(np.array(comparison[str(query)]))
for query in common_queries:
a = np.median(np.array(get_durations(baseline, str(query))))
b = np.median(np.array(get_durations(comparison, str(query))))
speedup = a-b
results.append(("q" + str(query), round(speedup, 1)))

Expand Down Expand Up @@ -130,17 +199,17 @@ def generate_query_abs_speedup_chart(baseline, comparison, label1: str, label2:
# Save the plot as an image file
plt.savefig(f'{benchmark}_queries_speedup_abs.png', format='png')

def generate_query_comparison_chart(results, labels, benchmark: str, title: str):
def generate_query_comparison_chart(results, labels, benchmark: str, title: str, common_queries=None):
if common_queries is None:
common_queries = range(1, query_count(benchmark)+1)
queries = []
benches = []
for _ in results:
benches.append([])
for query in range(1, query_count(benchmark)+1):
if query == 999:
continue
for query in common_queries:
queries.append("q" + str(query))
for i in range(0, len(results)):
benches[i].append(np.median(np.array(results[i][str(query)])))
benches[i].append(np.median(np.array(get_durations(results[i], str(query)))))

# Define the width of the bars
bar_width = 0.3
Expand Down Expand Up @@ -168,25 +237,25 @@ def generate_query_comparison_chart(results, labels, benchmark: str, title: str)
# Save the plot as an image file
plt.savefig(f'{benchmark}_queries_compare.png', format='png')

def generate_summary(results, labels, benchmark: str, title: str):
def generate_summary(results, labels, benchmark: str, title: str, common_queries=None):
if common_queries is None:
common_queries = range(1, query_count(benchmark)+1)
timings = []
for _ in results:
timings.append(0)

num_queries = query_count(benchmark)
for query in range(1, num_queries + 1):
if query == 999:
continue
num_queries = len([q for q in common_queries])
for query in common_queries:
for i in range(0, len(results)):
timings[i] += np.median(np.array(results[i][str(query)]))
timings[i] += np.median(np.array(get_durations(results[i], str(query))))

# Create figure and axis
fig, ax = plt.subplots()
fig.set_size_inches(10, 6)

# Add title and labels
ax.set_title(title)
ax.set_ylabel(f'Time in seconds to run all {num_queries} {benchmark} queries (lower is better)')
ax.set_ylabel(f'Time in seconds to run {num_queries} {benchmark} queries (lower is better)')

times = [round(x,0) for x in timings]

Expand All @@ -213,11 +282,16 @@ def main(files, labels, benchmark: str, title: str):
for filename in files:
with open(filename) as f:
results.append(json.load(f))
generate_summary(results, labels, benchmark, title)
generate_query_comparison_chart(results, labels, benchmark, title)
check_result_consistency(results, labels, benchmark)
common_queries = get_common_queries(results, labels)
if not common_queries:
logger.error("No queries found in common across all result files")
return
generate_summary(results, labels, benchmark, title, common_queries)
generate_query_comparison_chart(results, labels, benchmark, title, common_queries)
if len(files) == 2:
generate_query_abs_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title)
generate_query_rel_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title)
generate_query_abs_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title, common_queries)
generate_query_rel_speedup_chart(results[0], results[1], labels[0], labels[1], benchmark, title, common_queries)

if __name__ == '__main__':
argparse = argparse.ArgumentParser(description='Generate comparison')
Expand Down
1 change: 1 addition & 0 deletions benchmarks/tpc/infra/docker/docker-compose-laptop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,6 @@ services:
- ICEBERG_JAR=/jars/iceberg.jar
- TPCH_DATA=/data
- TPCDS_DATA=/data
- SPARK_EVENT_LOG_DIR=/results/spark-events
mem_limit: 4g
memswap_limit: 4g
1 change: 1 addition & 0 deletions benchmarks/tpc/infra/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ services:
- ICEBERG_JAR=/jars/iceberg.jar
- TPCH_DATA=/data
- TPCDS_DATA=/data
- SPARK_EVENT_LOG_DIR=/results/spark-events
mem_limit: ${BENCH_MEM_LIMIT:-10g}
memswap_limit: ${BENCH_MEM_LIMIT:-10g}

Expand Down
2 changes: 1 addition & 1 deletion benchmarks/tpc/queries/tpcds/q12.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ where
and i_category in ('Jewelry', 'Books', 'Women')
and ws_sold_date_sk = d_date_sk
and d_date between cast('2002-03-22' as date)
and (cast('2002-03-22' as date) + 30 days)
and (cast('2002-03-22' as date) + INTERVAL '30 DAYS')
group by
i_item_id
,i_item_desc
Expand Down
8 changes: 4 additions & 4 deletions benchmarks/tpc/queries/tpcds/q16.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@
-- TPC-DS queries are Copyright 2021 Transaction Processing Performance Council.
-- This query was generated at scale factor 1.
select
count(distinct cs_order_number) as "order count"
,sum(cs_ext_ship_cost) as "total shipping cost"
,sum(cs_net_profit) as "total net profit"
count(distinct cs_order_number) as `order count`
,sum(cs_ext_ship_cost) as `total shipping cost`
,sum(cs_net_profit) as `total net profit`
from
catalog_sales cs1
,date_dim
,customer_address
,call_center
where
d_date between '1999-5-01' and
(cast('1999-5-01' as date) + 60 days)
(cast('1999-5-01' as date) + INTERVAL '60 DAYS')
and cs1.cs_ship_date_sk = d_date_sk
and cs1.cs_ship_addr_sk = ca_address_sk
and ca_state = 'ID'
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/tpc/queries/tpcds/q20.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ select i_item_id
and i_category in ('Children', 'Sports', 'Music')
and cs_sold_date_sk = d_date_sk
and d_date between cast('2002-04-01' as date)
and (cast('2002-04-01' as date) + 30 days)
and (cast('2002-04-01' as date) + INTERVAL '30 DAYS')
group by i_item_id
,i_item_desc
,i_category
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/tpc/queries/tpcds/q21.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ select *
and i_item_sk = inv_item_sk
and inv_warehouse_sk = w_warehouse_sk
and inv_date_sk = d_date_sk
and d_date between (cast ('2000-05-19' as date) - 30 days)
and (cast ('2000-05-19' as date) + 30 days)
and d_date between (cast ('2000-05-19' as date) - INTERVAL '30 DAYS')
and (cast ('2000-05-19' as date) + INTERVAL '30 DAYS')
group by w_warehouse_name, i_item_id) x
where (case when inv_before > 0
then inv_after / inv_before
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/tpc/queries/tpcds/q32.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-- CometBench-DS query 32 derived from TPC-DS query 32 under the terms of the TPC Fair Use Policy.
-- TPC-DS queries are Copyright 2021 Transaction Processing Performance Council.
-- This query was generated at scale factor 1.
select sum(cs_ext_discount_amt) as "excess discount amount"
select sum(cs_ext_discount_amt) as `excess discount amount`
from
catalog_sales
,item
Expand All @@ -10,7 +10,7 @@ where
i_manufact_id = 283
and i_item_sk = cs_item_sk
and d_date between '1999-02-22' and
(cast('1999-02-22' as date) + 90 days)
(cast('1999-02-22' as date) + INTERVAL '90 DAYS')
and d_date_sk = cs_sold_date_sk
and cs_ext_discount_amt
> (
Expand All @@ -22,7 +22,7 @@ and cs_ext_discount_amt
where
cs_item_sk = i_item_sk
and d_date between '1999-02-22' and
(cast('1999-02-22' as date) + 90 days)
(cast('1999-02-22' as date) + INTERVAL '90 DAYS')
and d_date_sk = cs_sold_date_sk
)
LIMIT 100;
Expand Down
Loading
Loading