Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
"""
Git guardrails for detecting and classifying destructive git operations.

Provides command validation to identify dangerous git and GitHub CLI/API
operations that could cause irreversible damage (branch deletion, force
pushes, history rewriting, etc.).

These checks are used by the system prompt builder to inject safety
instructions and can be used by future hook-based enforcement layers.
"""

import logging
import re

logger = logging.getLogger(__name__)


class GitGuardrailViolation:
"""Describes a guardrail violation found in a command."""

def __init__(self, rule: str, severity: str, command: str, explanation: str) -> None:
self.rule = rule
self.severity = severity # "block" or "warn"
self.command = command
self.explanation = explanation

def __repr__(self) -> str:
return f"GitGuardrailViolation(rule={self.rule!r}, severity={self.severity!r})"


# ---------------------------------------------------------------------------
# Destructive command patterns
# ---------------------------------------------------------------------------

# Patterns that should be blocked outright (irreversible / high-blast-radius)
_BLOCKED_PATTERNS: list[tuple[str, str, re.Pattern[str]]] = [
(
"delete_remote_ref",
"Deleting a remote branch/ref can permanently close associated PRs",
re.compile(
r"""
(?:gh\s+api|curl) # GitHub API call via gh or curl
.* # any intervening flags/args
-X\s*DELETE # HTTP DELETE method
.* # any intervening text
/git/refs/ # targeting a git ref
""",
re.VERBOSE | re.IGNORECASE,
),
),
(
"api_force_update_ref",
"Force-updating a remote ref via the GitHub API bypasses git safety mechanisms",
re.compile(
r"""
(?:gh\s+api|curl) # GitHub API call
.* # any intervening flags/args
(?:PATCH|PUT) # HTTP update method
.* # any intervening text
/git/refs/ # targeting a git ref
.* # any intervening text
["\']?force["\']?\s* # force parameter
:\s*true # set to true
""",
re.VERBOSE | re.IGNORECASE,
),
),
(
"api_create_commit_on_ref",
"Creating commits directly via the GitHub API bypasses local git safeguards",
re.compile(
r"""
(?:gh\s+api|curl) # GitHub API call
.* # any intervening flags/args
(?:POST|PATCH|PUT) # HTTP write method
.* # any intervening text
/git/(?:commits|trees|blobs) # low-level git data API
""",
re.VERBOSE | re.IGNORECASE,
),
),
(
"force_push",
"Force pushing overwrites remote history and can destroy others' work",
re.compile(
r"""
git\s+push\s+ # git push command
.* # any flags/args
--force(?!\-with\-lease) # --force but NOT --force-with-lease
""",
re.VERBOSE,
),
),
(
"force_push_short",
"Force pushing (-f) overwrites remote history and can destroy others' work",
re.compile(
r"""
git\s+push\s+ # git push command
.* # any flags/args
\s-[a-zA-Z]*f # short flag containing -f
""",
re.VERBOSE,
),
),
(
"push_to_main",
"Pushing directly to main/master can corrupt the default branch",
re.compile(
r"""
git\s+push\s+ # git push command
.* # remote name and flags
\s(?:main|master)\b # targeting main or master branch
""",
re.VERBOSE,
),
),
(
"reset_hard",
"git reset --hard discards all uncommitted changes irreversibly",
re.compile(
r"""
git\s+reset\s+ # git reset command
.* # any flags
--hard # hard reset flag
""",
re.VERBOSE,
),
),
(
"clean_force",
"git clean -fd permanently deletes untracked files and directories",
re.compile(
r"""
git\s+clean\s+ # git clean command
.* # any flags
-[a-zA-Z]*f # force flag (required for clean to run)
""",
re.VERBOSE,
),
),
(
"checkout_discard",
"git checkout -- . discards all unstaged changes irreversibly",
re.compile(
r"""
git\s+checkout\s+ # git checkout command
--\s+\. # discard all changes
""",
re.VERBOSE,
),
),
(
"branch_delete_remote",
"Deleting a remote branch can permanently close associated PRs",
re.compile(
r"""
git\s+push\s+ # git push command
\S+\s+ # remote name
--delete\s+ # delete flag
""",
re.VERBOSE,
),
),
(
"branch_delete_remote_colon",
"Deleting a remote branch via :branch syntax can permanently close associated PRs",
re.compile(
r"""
git\s+push\s+ # git push command
\S+\s+ # remote name
:\S+ # :branch (delete syntax)
""",
re.VERBOSE,
),
),
]

# Patterns that should generate warnings (risky but sometimes necessary)
_WARN_PATTERNS: list[tuple[str, str, re.Pattern[str]]] = [
(
"rebase",
"Rebasing rewrites commit history; create a backup branch first",
re.compile(
r"""
git\s+rebase\s+ # git rebase command
""",
re.VERBOSE,
),
),
(
"force_with_lease",
"Force push with lease is safer but still overwrites remote history",
re.compile(
r"""
git\s+push\s+ # git push command
.* # any flags/args
--force-with-lease # safer force push
""",
re.VERBOSE,
),
),
(
"amend_commit",
"Amending commits rewrites history; avoid if already pushed",
re.compile(
r"""
git\s+commit\s+ # git commit command
.* # any flags
--amend # amend flag
""",
re.VERBOSE,
),
),
]


def check_command(command: str) -> list[GitGuardrailViolation]:
"""Check a shell command for git guardrail violations.

Args:
command: The shell command string to validate.

Returns:
List of violations found (empty if command is safe).
"""
if not command or not command.strip():
return []

violations: list[GitGuardrailViolation] = []

for rule, explanation, pattern in _BLOCKED_PATTERNS:
if pattern.search(command):
violations.append(
GitGuardrailViolation(
rule=rule,
severity="block",
command=command,
explanation=explanation,
)
)

for rule, explanation, pattern in _WARN_PATTERNS:
if pattern.search(command):
violations.append(
GitGuardrailViolation(
rule=rule,
severity="warn",
command=command,
explanation=explanation,
)
)

return violations


def has_blocking_violation(command: str) -> bool:
"""Return True if the command contains any blocking git guardrail violation."""
violations = check_command(command)
return any(v.severity == "block" for v in violations)


def format_violations(violations: list[GitGuardrailViolation]) -> str:
"""Format violations into a human-readable message."""
if not violations:
return ""

lines = ["Git guardrail violations detected:"]
for v in violations:
marker = "BLOCKED" if v.severity == "block" else "WARNING"
lines.append(f" [{marker}] {v.rule}: {v.explanation}")
return "\n".join(lines)


# ---------------------------------------------------------------------------
# Token redaction helpers
# ---------------------------------------------------------------------------

# Patterns that match common token/secret formats in commands
_TOKEN_PATTERNS: list[re.Pattern[str]] = [
# GitHub PATs (classic and fine-grained)
re.compile(r"ghp_[A-Za-z0-9]{36,}"),
re.compile(r"github_pat_[A-Za-z0-9_]{36,}"),
# GitLab tokens
re.compile(r"glpat-[A-Za-z0-9\-_]{20,}"),
# Generic Bearer/token in URLs
re.compile(r"(?<=://)([^:]+):([^@]+)@", re.IGNORECASE),
]


def redact_tokens_in_command(command: str) -> str:
"""Redact known token patterns in a command string.

Args:
command: The command string that may contain tokens.

Returns:
Command with tokens replaced by [REDACTED].
"""
result = command
for pattern in _TOKEN_PATTERNS:
if pattern.groups:
# For patterns with groups (like URL credentials), replace the whole match
result = pattern.sub("[REDACTED]@", result)
else:
result = pattern.sub("[REDACTED]", result)
return result
46 changes: 46 additions & 0 deletions components/runners/ambient-runner/ambient_runner/platform/prompts.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,48 @@
"the feature branch (`{branch}`). If push fails, do NOT fall back to main.\n\n"
)

GIT_SAFETY_INSTRUCTIONS = (
"## Git Safety Guardrails\n\n"
"You MUST follow these rules when performing git operations. Violations can "
"cause **irreversible data loss** including destroyed PRs, lost review history, "
"and corrupted branches.\n\n"
"### Hard Rules (NEVER violate)\n\n"
"1. **NEVER delete remote branches or refs** — deleting a remote branch "
"permanently closes any associated PR and makes it unrestorable. Do NOT use "
"`git push --delete`, `git push origin :branch`, or "
"`gh api -X DELETE .../git/refs/...`.\n\n"
"2. **NEVER manipulate git refs via the GitHub/GitLab REST API** — if "
"`git push` fails, report the failure to the user and stop. Do NOT "
"circumvent push failures by using `gh api` or `curl` to PATCH/POST/DELETE "
"refs, or to create commits/trees/blobs directly via the Git Data API.\n\n"
"3. **NEVER force push** — do not use `git push --force` or `git push -f`. "
"If you must update a remote branch after a rebase, use "
"`git push --force-with-lease` and ONLY after getting explicit user approval.\n\n"
"4. **NEVER modify the user's default/main branch** — treat `main` and "
"`master` as read-only. Never push commits to them, never reset them, "
"never rebase onto them with a force push.\n\n"
"5. **NEVER run destructive local operations without a backup** — before "
"running `git reset --hard`, `git clean -fd`, `git checkout -- .`, or "
"any rebase, ALWAYS create a backup branch first:\n"
" ```\n"
" git branch backup-$(date +%s)\n"
" ```\n\n"
"6. **NEVER embed tokens or credentials in commands** — do not include "
"PATs, API keys, or passwords in git remote URLs, curl commands, or any "
"shell command. Use environment variables (e.g. `$GITHUB_TOKEN`) instead.\n\n"
"### Escalation Protocol\n\n"
"When a git operation fails, you MUST follow this protocol:\n"
"1. **Stop** — do not retry with a more aggressive variant.\n"
"2. **Diagnose** — read the error message and identify the root cause "
"(auth scope, permissions, branch protection, etc.).\n"
"3. **Report** — tell the user what failed and why.\n"
"4. **Wait** — let the user decide the next step. Do NOT autonomously "
"escalate to force pushes, API workarounds, or destructive operations.\n\n"
"Violating these rules can permanently destroy user work, close PRs, "
"and lose review history. When in doubt, ask the user.\n\n"
)


RUBRIC_EVALUATION_HEADER = "## Rubric Evaluation\n\n"

RUBRIC_EVALUATION_INTRO = (
Expand Down Expand Up @@ -215,6 +257,10 @@ def build_workspace_context_prompt(
prompt += f"- **repos/{repo_name}/**\n"
prompt += GIT_PUSH_STEPS.format(branch=push_branch)

# Git safety guardrails (always included when repos are present)
if repos_cfg:
prompt += GIT_SAFETY_INSTRUCTIONS

# Human-in-the-loop instructions
prompt += HUMAN_INPUT_INSTRUCTIONS

Expand Down
Loading
Loading