34 changes: 16 additions & 18 deletions codeflash/languages/python/parse_xml.py
@@ -9,7 +9,7 @@

import os
import re
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from junitparser.xunit2 import JUnitXml

@@ -48,7 +48,7 @@
)


def _parse_func(file_path: Path):
def _parse_func(file_path: Path) -> Any:
from lxml.etree import XMLParser, parse

xml_parser = XMLParser(huge_tree=True)
@@ -59,13 +59,22 @@ def parse_python_test_xml(
test_xml_file_path: Path,
test_files: TestFiles,
test_config: TestConfig,
run_result: subprocess.CompletedProcess | None = None,
run_result: subprocess.CompletedProcess[str] | None = None,
) -> TestResults:
from codeflash.verification.parse_test_output import resolve_test_file_from_class_path

test_results = TestResults()
if not test_xml_file_path.exists():
logger.warning(f"No test results for {test_xml_file_path} found.")
if run_result is not None and run_result.returncode != 0:
stderr_snippet = (run_result.stderr or "")[:500]
stdout_snippet = (run_result.stdout or "")[:500]
logger.warning(
f"No test results for {test_xml_file_path} found. "
f"Subprocess exited with code {run_result.returncode}.\n"
f"stdout: {stdout_snippet}\nstderr: {stderr_snippet}"
)
else:
logger.warning(f"No test results for {test_xml_file_path} found.")
console.rule()
return test_results
try:
@@ -87,12 +96,7 @@
):
logger.info("Test failed to load, skipping it.")
if run_result is not None:
if isinstance(run_result.stdout, str) and isinstance(run_result.stderr, str):
logger.info(f"Test log - STDOUT : {run_result.stdout} \n STDERR : {run_result.stderr}")
else:
logger.info(
f"Test log - STDOUT : {run_result.stdout.decode()} \n STDERR : {run_result.stderr.decode()}"
)
logger.info(f"Test log - STDOUT : {run_result.stdout} \n STDERR : {run_result.stderr}")
return test_results

test_class_path = testcase.classname
@@ -159,7 +163,7 @@ def parse_python_test_xml(
sys_stdout = testcase.system_out or ""

begin_matches = list(matches_re_start.finditer(sys_stdout))
end_matches: dict[tuple, re.Match] = {}
end_matches: dict[tuple[str, ...], re.Match[str]] = {}
for match in matches_re_end.finditer(sys_stdout):
groups = match.groups()
if len(groups[5].split(":")) > 1:
@@ -234,11 +238,5 @@ def parse_python_test_xml(
f"Tests '{[test_file.original_file_path for test_file in test_files.test_files]}' failed to run, skipping"
)
if run_result is not None:
stdout, stderr = "", ""
try:
stdout = run_result.stdout.decode()
stderr = run_result.stderr.decode()
except AttributeError:
stdout = run_result.stderr
logger.debug(f"Test log - STDOUT : {stdout} \n STDERR : {stderr}")
logger.debug(f"Test log - STDOUT : {run_result.stdout} \n STDERR : {run_result.stderr}")
return test_results
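
A note on the `CompletedProcess[str]` annotation and the simplified logging above: because the runner now always invokes `subprocess.run(..., text=True, capture_output=True)` (see the test_runner.py hunk below), `stdout` and `stderr` arrive as `str`, so the old `isinstance`/`decode()` branches became dead code. A minimal sketch of the typed pattern, with illustrative messages only:

```python
import subprocess
import sys

# text=True decodes the child's streams, so the result type is CompletedProcess[str].
result: subprocess.CompletedProcess[str] = subprocess.run(
    [sys.executable, "-c", "import sys; print('out'); print('err', file=sys.stderr)"],
    capture_output=True,
    text=True,
    check=False,
)
# Mirror the 500-character snippets logged when no XML report is produced.
stdout_snippet = (result.stdout or "")[:500]
stderr_snippet = (result.stderr or "")[:500]
print(f"exit={result.returncode} stdout={stdout_snippet!r} stderr={stderr_snippet!r}")
```
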
34 changes: 14 additions & 20 deletions codeflash/languages/python/support.py
@@ -15,11 +15,11 @@
CodeContext,
FunctionFilterCriteria,
HelperFunction,
Language,
ReferenceInfo,
TestInfo,
TestResult,
)
from codeflash.languages.language_enum import Language
from codeflash.languages.registry import register_language
from codeflash.models.function_types import FunctionParent

@@ -48,8 +48,8 @@ def function_sources_to_helpers(sources: list[FunctionSource]) -> list[HelperFunction]:
qualified_name=fs.qualified_name,
file_path=fs.file_path,
source_code=fs.source_code,
start_line=fs.jedi_definition.line if fs.jedi_definition else 1,
end_line=fs.jedi_definition.line if fs.jedi_definition else 1,
start_line=getattr(getattr(fs, "jedi_definition", None), "line", 1),
end_line=getattr(getattr(fs, "jedi_definition", None), "line", 1),
)
for fs in sources
]
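
The nested `getattr` fallback above works because `getattr` with a default never raises: when `jedi_definition` is absent or `None`, the inner call yields `None` and the outer call returns the default line number `1`. A quick sketch with hypothetical stand-in classes:

```python
class Definition:
    line = 42

class WithDefinition:
    jedi_definition = Definition()

class WithoutDefinition:
    jedi_definition = None

for fs in (WithDefinition(), WithoutDefinition()):
    # getattr(None, "line", 1) returns 1 instead of raising AttributeError.
    print(getattr(getattr(fs, "jedi_definition", None), "line", 1))  # 42, then 1
```
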
@@ -119,7 +119,7 @@ def visit_FunctionDef(self, node: cst.FunctionDef) -> None:
)


@register_language
@register_language # type: ignore[arg-type] # PythonSupport satisfies LanguageSupport protocol structurally
class PythonSupport:
"""Python language support implementation.

@@ -214,6 +214,7 @@ def load_coverage(
) -> Any:
from codeflash.verification.coverage_utils import CoverageUtils

assert coverage_config_file is not None
return CoverageUtils.load_from_sqlite_database(
database_path=coverage_database_file,
config_path=coverage_config_file,
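
The added `assert coverage_config_file is not None` is presumably a type-narrowing guard: after the assert, checkers such as mypy treat the `Path | None` parameter as a plain `Path`. A minimal sketch of the idiom (function and names are illustrative):

```python
from pathlib import Path


def load_coverage(config_path: Path | None) -> str:
    assert config_path is not None  # narrows Path | None to Path for type checkers
    return config_path.name


print(load_coverage(Path("coverage.rc")))  # -> coverage.rc
```
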
@@ -854,7 +855,7 @@ def compare_test_results(
candidate_results_path: Path,
project_root: Path | None = None,
project_classpath: str | None = None,
) -> tuple[bool, list]:
) -> tuple[bool, list[Any]]:
"""Compare test results between original and candidate code.

Args:
@@ -1017,7 +1018,7 @@ def instrument_source_for_line_profiler(
# This is handled through the existing infrastructure
return True

def parse_line_profile_results(self, line_profiler_output_file: Path) -> dict:
def parse_line_profile_results(self, line_profiler_output_file: Path) -> dict[str, Any]:
"""Parse line profiler output for Python.

Args:
@@ -1078,7 +1079,7 @@ def run_behavioral_tests(
from codeflash.code_utils.config_consts import TOTAL_LOOPING_TIME_EFFECTIVE
from codeflash.languages.python.static_analysis.coverage_utils import prepare_coverage_files
from codeflash.languages.python.test_runner import execute_test_subprocess
from codeflash.models.models import TestType
from codeflash.models.test_type import TestType

blocklisted_plugins = ["benchmark", "codspeed", "xdist", "sugar"]

@@ -1110,7 +1111,7 @@ def run_behavioral_tests(
common_pytest_args.append(f"--timeout={timeout}")

result_file_path = get_run_tmp_file(Path("pytest_results.xml"))
result_args = [f"--junitxml={result_file_path.as_posix()}", "-o", "junit_logging=all"]
result_args = [f"--junitxml={result_file_path}", "-o", "junit_logging=all"]

pytest_test_env = test_env.copy()
pytest_test_env["PYTEST_PLUGINS"] = "codeflash.verification.pytest_plugin"
@@ -1137,14 +1138,7 @@
shlex.split(f"{SAFE_SYS_EXECUTABLE} -m coverage erase"), cwd=cwd, env=pytest_test_env, timeout=30
)
logger.debug(cov_erase)
coverage_cmd = [
SAFE_SYS_EXECUTABLE,
"-m",
"coverage",
"run",
f"--rcfile={coverage_config_file.as_posix()}",
"-m",
]
coverage_cmd = [SAFE_SYS_EXECUTABLE, "-m", "coverage", "run", f"--rcfile={coverage_config_file}", "-m"]
coverage_cmd.extend(self.pytest_cmd_tokens(IS_POSIX))

blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins if plugin != "cov"]
@@ -1201,7 +1195,7 @@ def run_benchmarking_tests(
pytest_args.append(f"--timeout={timeout}")

result_file_path = get_run_tmp_file(Path("pytest_results.xml"))
result_args = [f"--junitxml={result_file_path.as_posix()}", "-o", "junit_logging=all"]
result_args = [f"--junitxml={result_file_path}", "-o", "junit_logging=all"]
pytest_test_env = test_env.copy()
pytest_test_env["PYTEST_PLUGINS"] = "codeflash.verification.pytest_plugin"
blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins]
@@ -1243,7 +1237,7 @@ def run_line_profile_tests(
if timeout is not None:
pytest_args.append(f"--timeout={timeout}")
result_file_path = get_run_tmp_file(Path("pytest_results.xml"))
result_args = [f"--junitxml={result_file_path.as_posix()}", "-o", "junit_logging=all"]
result_args = [f"--junitxml={result_file_path}", "-o", "junit_logging=all"]
pytest_test_env = test_env.copy()
pytest_test_env["PYTEST_PLUGINS"] = "codeflash.verification.pytest_plugin"
blocklist_args = [f"-p no:{plugin}" for plugin in blocklisted_plugins]
@@ -1258,7 +1252,7 @@

def generate_concolic_tests(
self, test_cfg: Any, project_root: Path, function_to_optimize: FunctionToOptimize, function_to_optimize_ast: Any
) -> tuple[dict, str]:
) -> tuple[dict[str, Any], str]:
import ast
import importlib.util
import subprocess
@@ -1281,7 +1275,7 @@ def generate_concolic_tests(
crosshair_available = importlib.util.find_spec("crosshair") is not None

start_time = time.perf_counter()
function_to_concolic_tests: dict = {}
function_to_concolic_tests: dict[str, Any] = {}
concolic_test_suite_code = ""

if not crosshair_available:
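
Several hunks above replace `path.as_posix()` with plain f-string interpolation. The only difference is separator style: `f"{path}"` goes through `str(path)` and keeps native separators, while `as_posix()` forces forward slashes; pytest accepts either form as a `--junitxml` value. A small demonstration:

```python
from pathlib import PurePosixPath, PureWindowsPath

win = PureWindowsPath("tmp") / "pytest_results.xml"
posix = PurePosixPath("tmp") / "pytest_results.xml"

print(f"--junitxml={win}")             # --junitxml=tmp\pytest_results.xml (native)
print(f"--junitxml={win.as_posix()}")  # --junitxml=tmp/pytest_results.xml (forced POSIX)
print(f"--junitxml={posix}")           # --junitxml=tmp/pytest_results.xml either way
```
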
8 changes: 3 additions & 5 deletions codeflash/languages/python/test_runner.py
@@ -10,7 +10,6 @@

from codeflash.cli_cmds.console import logger
from codeflash.code_utils.code_utils import custom_addopts
from codeflash.code_utils.shell_utils import get_cross_platform_subprocess_run_args
from codeflash.languages.registry import get_language_support

# Pattern to extract timing from stdout markers: !######...:<duration_ns>######!
@@ -92,11 +91,10 @@ def _ensure_runtime_files(project_root: Path, language: str = "javascript") -> None:

def execute_test_subprocess(
cmd_list: list[str], cwd: Path, env: dict[str, str] | None, timeout: int = 600
) -> subprocess.CompletedProcess:
) -> subprocess.CompletedProcess[str]:
"""Execute a subprocess with the given command list, working directory, environment variables, and timeout."""
logger.debug(f"executing test run with command: {' '.join(cmd_list)}")
with custom_addopts():
        run_args = get_cross_platform_subprocess_run_args(
            cwd=cwd, env=env, timeout=timeout, check=False, text=True, capture_output=True
        )
        return subprocess.run(cmd_list, **run_args)  # noqa: PLW1510
        return subprocess.run(
            cmd_list, cwd=cwd, env=env, timeout=timeout, check=False, text=True, capture_output=True, close_fds=False
        )

Contributor review comment on the new subprocess.run call: will it work on Windows? @KRRT7
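
On the Windows question above: since Python 3.7, `subprocess.run` accepts `close_fds` on every platform, and `close_fds=False` simply lets the child inherit the parent's open descriptors/handles, so the call itself is portable; whether that inheritance is wanted on Windows is a separate review judgment. A runnable sketch:

```python
import subprocess
import sys

# close_fds=False lets the child inherit open handles/descriptors.
# Accepted on all platforms in Python 3.7+, including Windows.
result = subprocess.run(
    [sys.executable, "-c", "print('ok')"],
    capture_output=True,
    text=True,
    check=False,
    close_fds=False,
)
print(result.stdout.strip())  # -> ok
```
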
11 changes: 7 additions & 4 deletions codeflash/models/models.py
@@ -429,13 +429,16 @@ class TestFile(BaseModel):

class TestFiles(BaseModel):
test_files: list[TestFile]
_seen_paths: set[Path] = PrivateAttr(default_factory=set)

Contributor review comment: why PrivateAttr? @KRRT7


def model_post_init(self, __context: Any, /) -> None:
self._seen_paths = {tf.instrumented_behavior_file_path for tf in self.test_files}

def add(self, test_file: TestFile) -> None:
if test_file not in self.test_files:
key = test_file.instrumented_behavior_file_path
if key not in self._seen_paths:
self._seen_paths.add(key)
self.test_files.append(test_file)
else:
msg = "Test file already exists in the list"
raise ValueError(msg)

def get_by_original_file_path(self, file_path: Path) -> TestFile | None:
normalized = self._normalize_path_for_comparison(file_path)
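
On the `PrivateAttr` question above: Pydantic validates, serializes, and compares declared fields, so a derived cache like `_seen_paths` is better kept as a private attribute, with `model_post_init` rebuilding it from the real field. A hedged sketch of the pattern (the `Registry` class is a hypothetical stand-in for `TestFiles`):

```python
from pathlib import Path
from typing import Any

from pydantic import BaseModel, PrivateAttr


class Registry(BaseModel):  # hypothetical stand-in for TestFiles
    paths: list[Path]
    _seen: set[Path] = PrivateAttr(default_factory=set)

    def model_post_init(self, __context: Any, /) -> None:
        self._seen = set(self.paths)

    def add(self, path: Path) -> None:
        # O(1) membership check instead of scanning the list on every add.
        if path not in self._seen:
            self._seen.add(path)
            self.paths.append(path)


r = Registry(paths=[Path("/tmp/a.py")])
r.add(Path("/tmp/a.py"))  # duplicate: silently skipped
r.add(Path("/tmp/b.py"))
print(r.model_dump())  # _seen is excluded from validation and serialization
```
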
44 changes: 44 additions & 0 deletions tests/test_test_files_add.py
@@ -0,0 +1,44 @@
from pathlib import Path

from codeflash.models.models import TestFile, TestFiles
from codeflash.models.test_type import TestType


class TestTestFilesAdd:
def test_add_unique_test_file(self) -> None:
tf = TestFiles(test_files=[])
test_file = TestFile(
instrumented_behavior_file_path=Path("/tmp/test_behavior.py"),
benchmarking_file_path=Path("/tmp/test_perf.py"),
test_type=TestType.GENERATED_REGRESSION,
)
tf.add(test_file)
assert len(tf.test_files) == 1
assert tf.test_files[0] is test_file

def test_add_duplicate_is_noop(self) -> None:
tf = TestFiles(test_files=[])
test_file = TestFile(
instrumented_behavior_file_path=Path("/tmp/test_behavior.py"),
benchmarking_file_path=Path("/tmp/test_perf.py"),
test_type=TestType.GENERATED_REGRESSION,
)
tf.add(test_file)
tf.add(test_file) # silent skip — first write wins
assert len(tf.test_files) == 1

def test_add_many_files_performance(self) -> None:
tf = TestFiles(test_files=[])
for i in range(100):
test_file = TestFile(
instrumented_behavior_file_path=Path(f"/tmp/test_behavior_{i}.py"),
benchmarking_file_path=Path(f"/tmp/test_perf_{i}.py"),
test_type=TestType.GENERATED_REGRESSION,
)
tf.add(test_file)

assert len(tf.test_files) == 100
assert len(tf._seen_paths) == 100
# Verify all paths are unique in the set
expected_paths = {Path(f"/tmp/test_behavior_{i}.py") for i in range(100)}
assert tf._seen_paths == expected_paths