Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 256 additions & 23 deletions packages/testing/src/execution_testing/cli/hasher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

import hashlib
import json
import sys
from dataclasses import dataclass, field
from enum import IntEnum, auto
from pathlib import Path
from typing import Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional, TypeVar

import click
from rich.console import Console
from rich.markup import escape as rich_escape


class HashableItemType(IntEnum):
Expand Down Expand Up @@ -42,26 +45,43 @@ def hash(self) -> bytes:
all_hash_bytes += item_hash_bytes
return hashlib.sha256(all_hash_bytes).digest()

def print(
def format_lines(
self,
*,
name: str,
level: int = 0,
print_type: Optional[HashableItemType] = None,
) -> None:
"""Print the hash of the item and sub-items."""
max_depth: Optional[int] = None,
) -> List[str]:
"""Return the hash lines for the item and sub-items."""
lines: List[str] = []
next_level = level
print_name = name

if level == 0 and self.parents:
separator = "::" if self.type == HashableItemType.TEST else "/"
print_name = f"{'/'.join(self.parents)}{separator}{name}"

if print_type is None or self.type >= print_type:
next_level += 1
print(f"{' ' * level}{print_name}: 0x{self.hash().hex()}")
lines.append(f"{' ' * level}{print_name}: 0x{self.hash().hex()}")

# Stop recursion if we've reached max_depth
if max_depth is not None and next_level > max_depth:
return lines

if self.items is not None:
for key, item in sorted(self.items.items()):
item.print(name=key, level=next_level, print_type=print_type)
lines.extend(
item.format_lines(
name=key,
level=next_level,
print_type=print_type,
max_depth=max_depth,
)
)

return lines

@classmethod
def from_json_file(
Expand Down Expand Up @@ -126,34 +146,247 @@ def from_folder(
return cls(type=HashableItemType.FOLDER, items=items, parents=parents)


@click.command()
def render_hash_report(
folder: Path,
*,
files: bool,
tests: bool,
root: bool,
name_override: Optional[str] = None,
max_depth: Optional[int] = None,
) -> List[str]:
"""Return canonical output lines for a folder."""
item = HashableItem.from_folder(folder_path=folder)
if root:
return [f"0x{item.hash().hex()}"]
print_type: Optional[HashableItemType] = None
if files:
print_type = HashableItemType.FILE
elif tests:
print_type = HashableItemType.TEST
name = name_override if name_override is not None else folder.name
return item.format_lines(
name=name, print_type=print_type, max_depth=max_depth
)


def collect_hashes(
item: HashableItem,
*,
path: str = "",
print_type: Optional[HashableItemType] = None,
max_depth: Optional[int] = None,
depth: int = 0,
) -> Dict[str, str]:
"""Collect hashes from item tree as {path: hash_hex}."""
result: Dict[str, str] = {}

if print_type is None or item.type >= print_type:
if path:
result[path] = f"0x{item.hash().hex()}"
depth += 1
if max_depth is not None and depth > max_depth:
return result

if item.items:
for name, child in sorted(item.items.items()):
child_path = f"{path}/{name}" if path else name
result.update(
collect_hashes(
child,
path=child_path,
print_type=print_type,
max_depth=max_depth,
depth=depth,
)
)

return result


def display_diff(
left: Dict[str, str],
right: Dict[str, str],
*,
left_label: str,
right_label: str,
) -> None:
"""Render diff showing only changed hashes."""
differences: List[tuple[str, str, str]] = []

for path in left:
right_hash = right.get(path, "<missing>")
if left[path] != right_hash:
differences.append((path, left[path], right_hash))

for path in right:
if path not in left:
differences.append((path, "<missing>", right[path]))

if not differences:
return

console = Console()
console.print("── Fixture Hash Differences ──", style="bold")
console.print(f"[dim]--- {left_label}[/dim]")
console.print(f"[dim]+++ {right_label}[/dim]")
console.print()

for path, left_hash, right_hash in differences:
depth = path.count("/")
indent = " " * (depth + 1)
console.print(f"{indent}[bold]{rich_escape(path)}[/bold]")
console.print(f"{indent} [red]- {left_hash}[/red]")
console.print(f"{indent} [green]+ {right_hash}[/green]")
console.print()


class DefaultGroup(click.Group):
"""Click group with a default command fallback."""

def __init__(
self, *args: Any, default_cmd_name: str = "hash", **kwargs: Any
):
super().__init__(*args, **kwargs)
self.default_cmd_name = default_cmd_name

def resolve_command(
self, ctx: click.Context, args: List[str]
) -> tuple[Optional[str], Optional[click.Command], List[str]]:
"""Resolve command, inserting default if no subcommand given."""
first_arg_idx = next(
(i for i, a in enumerate(args) if not a.startswith("-")), None
)
if (
first_arg_idx is not None
and args[first_arg_idx] not in self.commands
):
args = list(args)
args.insert(first_arg_idx, self.default_cmd_name)
return super().resolve_command(ctx, args)


F = TypeVar("F", bound=Callable[..., None])


def hash_options(func: F) -> F:
"""Decorator for common hash options."""
func = click.option(
"--root", "-r", is_flag=True, help="Only print hash of root folder"
)(func)
func = click.option(
"--tests", "-t", is_flag=True, help="Print hash of tests"
)(func)
func = click.option(
"--files", "-f", is_flag=True, help="Print hash of files"
)(func)
return func


@click.group(
cls=DefaultGroup,
default_cmd_name="hash",
context_settings={"help_option_names": ["-h", "--help"]},
)
def hasher() -> None:
"""Hash folders of JSON fixtures and compare them."""
pass


@hasher.command(name="hash")
@click.argument(
"folder_path_str",
type=click.Path(
exists=True, file_okay=False, dir_okay=True, readable=True
),
)
@click.option("--files", "-f", is_flag=True, help="Print hash of files")
@click.option("--tests", "-t", is_flag=True, help="Print hash of tests")
@hash_options
def hash_cmd(
folder_path_str: str, files: bool, tests: bool, root: bool
) -> None:
"""Hash folders of JSON fixtures and print their hashes."""
lines = render_hash_report(
Path(folder_path_str), files=files, tests=tests, root=root
)
for line in lines:
print(line)


@hasher.command(name="compare")
@click.argument(
"left_folder",
type=click.Path(
exists=True, file_okay=False, dir_okay=True, readable=True
),
)
@click.argument(
"right_folder",
type=click.Path(
exists=True, file_okay=False, dir_okay=True, readable=True
),
)
@click.option(
"--root", "-r", is_flag=True, help="Only print hash of root folder"
"--depth",
"-d",
type=int,
default=None,
help="Limit to N levels (0=root, 1=folders, 2=files, 3=tests).",
)
def main(folder_path_str: str, files: bool, tests: bool, root: bool) -> None:
"""Hash folders of JSON fixtures and print their hashes."""
folder_path: Path = Path(folder_path_str)
item = HashableItem.from_folder(folder_path=folder_path)
@hash_options
def compare_cmd(
left_folder: str,
right_folder: str,
files: bool,
tests: bool,
root: bool,
depth: Optional[int],
) -> None:
"""Compare two fixture directories and show differences."""
try:
left_item = HashableItem.from_folder(folder_path=Path(left_folder))
right_item = HashableItem.from_folder(folder_path=Path(right_folder))

if root:
print(f"0x{item.hash().hex()}")
return
if root:
if left_item.hash() == right_item.hash():
sys.exit(0)
left_hashes = {"root": f"0x{left_item.hash().hex()}"}
right_hashes = {"root": f"0x{right_item.hash().hex()}"}
else:
print_type: Optional[HashableItemType] = None
if files:
print_type = HashableItemType.FILE
elif tests:
print_type = HashableItemType.TEST

left_hashes = collect_hashes(
left_item, print_type=print_type, max_depth=depth
)
right_hashes = collect_hashes(
right_item, print_type=print_type, max_depth=depth
)

if left_hashes == right_hashes:
sys.exit(0)

display_diff(
left_hashes,
right_hashes,
left_label=left_folder,
right_label=right_folder,
)
sys.exit(1)
except PermissionError as e:
click.echo(f"Error: Permission denied - {e}", err=True)
sys.exit(2)
except (json.JSONDecodeError, KeyError, TypeError) as e:
click.echo(f"Error: Invalid fixture format - {e}", err=True)
sys.exit(2)
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(2)

print_type: Optional[HashableItemType] = None
if files:
print_type = HashableItemType.FILE
elif tests:
print_type = HashableItemType.TEST

item.print(name=folder_path.name, print_type=print_type)
main = hasher # Entry point alias


if __name__ == "__main__":
Expand Down
Loading
Loading