Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- TODO: Remove hardcoded table name
-- Selects the SEARCH_INDEXES metadata rows for the benchmark table whose
-- coverage_percentage equals 100; no rows come back until then.
SELECT *
FROM `p3rf-bq-search`.`search_index_dataset`.INFORMATION_SCHEMA.SEARCH_INDEXES
WHERE table_name = 'prototype_CUJ2_benchmark_100GB_and_growing'
  AND coverage_percentage = 100;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- TODO: Remove hardcoded table name
-- Creates a search index over all columns of the benchmark table, unless an
-- index with this name already exists.
CREATE SEARCH INDEX IF NOT EXISTS
  prototype_CUJ2_benchmark_index
ON
  prototype_CUJ2_benchmark_100GB_and_growing (ALL COLUMNS);
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- TODO: Remove hardcoded table name
-- Builds the CUJ2 benchmark table from at most 133782700 rows of the CUJ1
-- benchmark table.
CREATE TABLE prototype_CUJ2_benchmark_100GB_and_growing AS
SELECT *
FROM prototype_CUJ1_benchmark_100GB
LIMIT 133782700
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- TODO: Remove hardcoded table name
-- Drops the benchmark search index; a no-op when the index does not exist.
DROP SEARCH INDEX IF EXISTS
  prototype_CUJ2_benchmark_index
ON
  prototype_CUJ2_benchmark_100GB_and_growing;
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- TODO: Remove hardcoded table name
-- Drops the benchmark table. IF EXISTS keeps the statement from failing when
-- the table was never created, matching the DROP SEARCH INDEX IF EXISTS query.
DROP TABLE IF EXISTS prototype_CUJ2_benchmark_100GB_and_growing
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- TODO: Remove hardcoded table name
-- Appends a copy of every row currently in the benchmark table back into the
-- same table.
INSERT INTO
  prototype_CUJ2_benchmark_100GB_and_growing
SELECT *
FROM prototype_CUJ2_benchmark_100GB_and_growing
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- TODO: Remove hardcoded table name
-- Lists the INFORMATION_SCHEMA.TABLES rows for the benchmark table; an empty
-- result means no table with that name was found.
SELECT
  *
FROM
  INFORMATION_SCHEMA.TABLES
WHERE
  TABLE_NAME = 'prototype_CUJ2_benchmark_100GB_and_growing';
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
# Copyright 2025 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Benchmark for measuring search index building time under data ingestion

The benchmark first creates a table and builds a search index on it, waits for
the index to fully cover the table, then inserts new data into the table and
measures how long it takes for the index to return to 100% coverage.


TODO: Set up SF tables and queries and test if they work.
"""

import logging
import os
import time
from typing import Any
from absl import flags
from perfkitbenchmarker import benchmark_spec as bm_spec
from perfkitbenchmarker import configs
from perfkitbenchmarker import edw_service
from perfkitbenchmarker import sample


BENCHMARK_NAME = 'edw_index_building_ingestion_benchmark'
# Default YAML config passed to configs.LoadConfig. The nesting is
# significant: all settings sit under the benchmark-name key, type and
# cluster_identifier belong to edw_service, and vm_spec belongs to the
# client VM group.
BENCHMARK_CONFIG = """
edw_index_building_ingestion_benchmark:
  description: Benchmark for checking how fast is index propogation under ingestion.
  edw_service:
    type: bigquery
    cluster_identifier: _cluster_id_
  vm_groups:
    client:
      vm_spec: *default_dual_core
"""

# Directory the query files are read from before being pushed to the client
# VM; the empty default leaves the query names as given.
flags.DEFINE_string(
    name='edw_index_building_ingestion_query_dir',
    default='',
    help=(
        'Optional local directory containing all query files. '
        'Can be absolute or relative to the executable.'
    ),
)

FLAGS = flags.FLAGS


def GetConfig(user_config: dict[Any, Any]) -> dict[Any, Any]:
  """Returns the configuration for this benchmark.

  Args:
    user_config: User-supplied configuration (flags and config file).

  Returns:
    The configuration produced by configs.LoadConfig from BENCHMARK_CONFIG,
    user_config and BENCHMARK_NAME.
  """
  loaded_config = configs.LoadConfig(
      BENCHMARK_CONFIG, user_config, BENCHMARK_NAME
  )
  return loaded_config


def Prepare(benchmark_spec: bm_spec.BenchmarkSpec) -> None:
  """Prepares the EDW client interface and pushes query files to the client VM.

  Marks the spec so Cleanup is always called, prepares the client interface
  with the 'edw_common' package, then pushes every query file named in
  FLAGS.edw_power_queries (comma separated) to the first VM.

  Args:
    benchmark_spec: The benchmark specification.
  """
  benchmark_spec.always_call_cleanup = True
  vm = benchmark_spec.vms[0]

  client_interface = benchmark_spec.edw_service.GetClientInterface()
  client_interface.SetProvisionedAttributes(benchmark_spec)
  client_interface.Prepare('edw_common')

  query_dir = FLAGS.edw_index_building_ingestion_query_dir
  for raw_name in FLAGS.edw_power_queries.split(','):
    query_name = raw_name.strip()
    if not query_name:
      # A trailing or doubled comma would otherwise push the directory itself.
      continue
    # Explicit loop on purpose: any() over the pushes would stop at the first
    # truthy return value and silently skip the remaining files.
    vm.PushDataFile(os.path.join(query_dir, query_name))


def _EnsureNoTable(
    client_interface: edw_service.EdwClientInterface, timeout: int = 30
) -> None:
  """Drops the benchmark table until the verification query finds no rows.

  Args:
    client_interface: The EDW client interface.
    timeout: The maximum time (in seconds) to keep retrying before giving up
      and logging an error.
  """
  began = time.time()
  while time.time() - began <= timeout:
    _, info = client_interface.ExecuteQuery('verify_no_table_query')
    table_exists = bool(info) and info.get('rows_returned', 0) > 0
    if not table_exists:
      return
    client_interface.ExecuteQuery('delete_table_query')
    # Pause before re-checking that the table is gone.
    time.sleep(1)
  logging.error('Timed out waiting for table to be deleted')
  # TODO: find a way to stop the benchmark in case of timeout


def _EnsureFullyIndexed(
    client_interface: edw_service.EdwClientInterface,
    results: list[sample.Sample],
    timeout: int = 1200,
) -> None:
  """Waits until the coverage query reports the index fully covers the table.

  Polls 'check_index_coverage_query' once per second. When it returns rows, an
  'initial_index_fully_available_time' sample is appended to results. On
  timeout an error is logged and no sample is added.

  Args:
    client_interface: The EDW client interface.
    results: The list of samples to append to.
    timeout: The maximum time (in seconds) to wait for the index to fully
      cover the table.
  """
  start_time = time.time()
  while True:
    if time.time() - start_time > timeout:
      logging.error('Timed out waiting for initial index to fully cover.')
      # TODO: find a way to stop the benchmark in case of timeout
      break
    _, metadata = client_interface.ExecuteQuery('check_index_coverage_query')
    if metadata and metadata.get('rows_returned', 0) > 0:
      # Measure after the query returns so the sample includes the latency of
      # the check that observed full coverage, as _MeasureCoveringTime does.
      time_elapsed = time.time() - start_time
      results.append(
          sample.Sample(
              'initial_index_fully_available_time',
              time_elapsed,
              'seconds',
              metadata,
          )
      )
      break
    time.sleep(1)


def _CreateTable(
    client_interface: edw_service.EdwClientInterface,
    results: list[sample.Sample],
) -> None:
  """Runs 'create_table_query' and records its execution time.

  Args:
    client_interface: The EDW client interface.
    results: The list of samples to append to.
  """
  elapsed, query_metadata = client_interface.ExecuteQuery('create_table_query')
  creation_sample = sample.Sample(
      'table_creation_time', elapsed, 'seconds', query_metadata
  )
  results.append(creation_sample)

def _CreateIndex(
    client_interface: edw_service.EdwClientInterface,
    results: list[sample.Sample],
) -> None:
  """Runs 'create_index_query' and records its execution time.

  Args:
    client_interface: The EDW client interface.
    results: The list of samples to append to.
  """
  elapsed, query_metadata = client_interface.ExecuteQuery('create_index_query')
  index_sample = sample.Sample(
      'search_index_creation_time', elapsed, 'seconds', query_metadata
  )
  results.append(index_sample)

def _IngestData(
    client_interface: edw_service.EdwClientInterface,
    results: list[sample.Sample],
) -> None:
  """Runs 'ingest_data_query' and records its execution time.

  Args:
    client_interface: The EDW client interface.
    results: The list of samples to append to.
  """
  elapsed, query_metadata = client_interface.ExecuteQuery('ingest_data_query')
  ingestion_sample = sample.Sample(
      'data_ingestion_completion_time', elapsed, 'seconds', query_metadata
  )
  results.append(ingestion_sample)

def _MeasureCoveringTime(
    client_interface: edw_service.EdwClientInterface,
    results: list[sample.Sample],
    timeout: int = 1500,
) -> None:
  """Times how long the index takes to reach 100% coverage.

  Polls 'check_index_coverage_query' once per second. When it returns rows, a
  'search_index_fully_available_time' sample is appended to results. If the
  timeout passes first, an error is logged and no sample is added.

  Args:
    client_interface: The EDW client interface.
    results: The list of samples to append to.
    timeout: The maximum time (in seconds) to wait for the index to be fully
      built.
  """
  began = time.time()
  while True:
    _, info = client_interface.ExecuteQuery('check_index_coverage_query')
    waited = time.time() - began
    fully_covered = info and info.get('rows_returned', 0) > 0
    if fully_covered:
      coverage_sample = sample.Sample(
          'search_index_fully_available_time', waited, 'seconds', info
      )
      results.append(coverage_sample)
      return
    if waited > timeout:
      logging.error('Timed out waiting for index to fully cover the table.')
      # TODO: find a way to stop the benchmark in case of timeout
      return
    time.sleep(1)


def _DeleteTable(client_interface: edw_service.EdwClientInterface) -> None:
  """Runs 'delete_table_query'; the query result is discarded.

  Args:
    client_interface: The EDW client interface.
  """
  _ = client_interface.ExecuteQuery('delete_table_query')


def _DeleteIndex(client_interface: edw_service.EdwClientInterface) -> None:
  """Runs 'delete_index_query'; the query result is discarded.

  Args:
    client_interface: The EDW client interface.
  """
  _ = client_interface.ExecuteQuery('delete_index_query')



def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> list[sample.Sample]:
  """Runs the benchmark steps in order and collects their samples.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample objects.
  """
  client_interface = benchmark_spec.edw_service.GetClientInterface()
  results: list[sample.Sample] = []

  # Start from a clean slate; this step produces no samples.
  _EnsureNoTable(client_interface)

  # Each remaining step appends its own sample(s) to results.
  measured_steps = (
      _CreateTable,
      _CreateIndex,
      _EnsureFullyIndexed,
      _IngestData,
      _MeasureCoveringTime,
  )
  for step in measured_steps:
    step(client_interface, results)

  return results


def Cleanup(benchmark_spec: bm_spec.BenchmarkSpec) -> None:
  """Runs the EDW service cleanup, then drops the search index and the table.

  Args:
    benchmark_spec: The benchmark specification.
  """
  edw_service_instance = benchmark_spec.edw_service
  edw_service_instance.Cleanup()

  # NOTE(review): the drop queries run after the service-level Cleanup();
  # confirm the client interface is still usable at that point.
  for drop_resource in (_DeleteIndex, _DeleteTable):
    drop_resource(edw_service_instance.GetClientInterface())