-
Notifications
You must be signed in to change notification settings - Fork 4
[ACIX-1353] Improve dda info owners code behavior when running out of repo root
#244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cfa4c0a
1c7387e
4516981
b510387
90a6d01
5edb0b2
8e108cb
121cea4
9c0027e
7271b5a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,23 +11,56 @@ | |
| from dda.utils.fs import Path | ||
|
|
||
| if TYPE_CHECKING: | ||
| import codeowners | ||
|
|
||
| from dda.cli.application import Application | ||
|
|
||
|
|
||
| def _find_existing_ancestor(path: Path) -> Path: | ||
| """Walk up from path to find the nearest existing ancestor.""" | ||
| check = path | ||
| while not check.exists(): | ||
| check = check.parent | ||
| return check | ||
|
|
||
|
|
||
| def _display_result(app: Application, res: dict[str, list[str | None]], *, json: bool) -> None: | ||
| if json: | ||
| from json import dumps | ||
|
|
||
| app.output(dumps(res)) | ||
| else: | ||
| # Note: paths here are in POSIX format, so they will use / even on Windows | ||
| display_res = {path: ", ".join(str(x) for x in owners) for path, owners in res.items()} | ||
| app.display_table(display_res, stderr=False) | ||
|
|
||
|
|
||
| def _load_owners(path: Path, owners_cache: dict[Path, codeowners.CodeOwners]) -> codeowners.CodeOwners: | ||
| if path in owners_cache: | ||
| return owners_cache[path] | ||
|
|
||
| import codeowners | ||
|
|
||
| # Can raise FileNotFoundError | ||
| owners = codeowners.CodeOwners(path.read_text(encoding="utf-8")) | ||
| owners_cache[path] = owners | ||
| return owners | ||
|
|
||
|
|
||
| @dynamic_command(short_help="Find code owners of files and directories", features=["codeowners"]) | ||
| @click.argument( | ||
| "paths", | ||
| type=click.Path(exists=True, path_type=Path), | ||
| type=click.Path(path_type=Path), | ||
| required=True, | ||
| nargs=-1, | ||
| ) | ||
| @click.option( | ||
| "--owners", | ||
| "-f", | ||
| "owners_filepath", | ||
| type=click.Path(exists=True, dir_okay=False, path_type=Path), | ||
| "owners_path_override", | ||
| type=click.Path(dir_okay=False, exists=True, path_type=Path), | ||
| help="Path to CODEOWNERS file", | ||
| default=".github/CODEOWNERS", | ||
| default=None, | ||
| ) | ||
| # TODO: Make this respect any --non-interactive flag or other way to detect CI environment | ||
| @click.option( | ||
|
|
@@ -36,26 +69,60 @@ | |
| help="Format the output as JSON", | ||
| ) | ||
| @pass_app | ||
| def cmd(app: Application, paths: tuple[Path, ...], *, owners_filepath: Path, json: bool) -> None: | ||
| def cmd(app: Application, paths: tuple[Path, ...], *, owners_path_override: Path | None, json: bool) -> None: | ||
| """ | ||
| Gets the code owners for the specified paths. | ||
| """ | ||
| import codeowners | ||
| import os | ||
|
|
||
| from dda.cli.info.owners.format import format_path_for_codeowners | ||
|
|
||
| owners = codeowners.CodeOwners(owners_filepath.read_text(encoding="utf-8")) | ||
| cwd = Path.cwd() | ||
|
|
||
| # The codeowners library expects paths to be in a specific format | ||
| res = { | ||
| (formatted_path := format_path_for_codeowners(path)): [owner[1] for owner in owners.of(formatted_path)] | ||
| for path in paths | ||
| } | ||
| if json: | ||
| from json import dumps | ||
| # Resolve explicit --owners path from CWD before any CWD changes | ||
| if owners_path_override is not None: | ||
| owners_path_override = owners_path_override.resolve() | ||
|
|
||
| app.output(dumps(res)) | ||
| else: | ||
| # Note: paths here are in POSIX format, so they will use / even on Windows | ||
| display_res = {path: ", ".join(owners) for path, owners in res.items()} | ||
| app.display_table(display_res, stderr=False) | ||
| # Process each path with dynamic repo root detection | ||
| res: dict[str, list[str | None]] = {} | ||
| owners_cache: dict[Path, codeowners.CodeOwners] = {} | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't really get the point of this cache: do we expect several
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fundamentally yes it's about avoiding opening the same file and instantiating the I didn't find a simpler way because nothing prevents you from querying multiple files in different repos, i.e. I guess we could prevent that but I wanted the CLI to be as flexible and intuitive as possible
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok if you want to support |
||
|
|
||
| errors: list[str] = [] | ||
| for path in paths: | ||
| # Avoid resolving symlinks as they might point outside the repo | ||
| abs_path = Path(os.path.normpath(cwd / path)) | ||
|
|
||
| # Determine repo root from the file's location | ||
| try: | ||
| repo_root = app.tools.git.get_repo_root(_find_existing_ancestor(abs_path)) | ||
| except ValueError: | ||
| msg = f"Could not determine repo root for path: {abs_path}" | ||
|
Ishirui marked this conversation as resolved.
|
||
| errors.append(msg) | ||
| continue | ||
|
|
||
| repo_relative = Path(abs_path.relative_to(repo_root)) | ||
|
|
||
| # Load CODEOWNERS file from repo root | ||
| try: | ||
| owners_path = owners_path_override or repo_root / ".github" / "CODEOWNERS" | ||
| owners = _load_owners(owners_path, owners_cache) | ||
| except FileNotFoundError: | ||
| msg = f"CODEOWNERS file not found for {abs_path}: {owners_path} does not exist" | ||
| errors.append(msg) | ||
| continue | ||
|
|
||
| with repo_root.as_cwd(): | ||
| try: | ||
| formatted_path = format_path_for_codeowners(repo_relative) | ||
| except ValueError as e: | ||
| errors.append(str(e)) | ||
| continue | ||
| resolved_owners = owners.of(formatted_path) | ||
| res[formatted_path] = [owner[1] for owner in resolved_owners] if resolved_owners else [None] | ||
|
|
||
|
Comment on lines
+114
to
+122
|
||
| if res: | ||
| _display_result(app, res, json=json) | ||
|
|
||
| if errors: | ||
| app.display_error("\n".join(errors), stderr=True) | ||
| app.abort(code=1) | ||
Uh oh!
There was an error while loading. Please reload this page.