Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/foundation/models/capability.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class PolicyReasonCode(StrEnum):
SIDE_EFFECT_REQUIRES_APPROVAL = "side_effect_requires_approval"
UNDECLARED_SIDE_EFFECT = "undeclared_side_effect"
PATH_OUT_OF_SCOPE = "path_out_of_scope"
SCOPE_ESCALATION = "scope_escalation"
NETWORK_OUT_OF_SCOPE = "network_out_of_scope"
INVOCATION_LIMIT_EXCEEDED = "invocation_limit_exceeded"
RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"
Expand Down
7 changes: 6 additions & 1 deletion src/foundation/models/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

from pydantic import BaseModel, ConfigDict, Field, PositiveInt, field_validator, model_validator

from foundation.models.capability import CapabilitySnapshot, PolicyEvaluationRecord
from foundation.models.capability import (
CapabilitySnapshot,
PolicyEvaluationRecord,
PolicyReasonCode,
)


class StrictModel(BaseModel):
Expand Down Expand Up @@ -320,6 +324,7 @@ class PolicyDecision(StrictModel):
risk_categories: list[str] = Field(default_factory=list)
command_preview: str | None = None
paths: list[str] = Field(default_factory=list)
reason_codes: list[PolicyReasonCode] = Field(default_factory=list)


class ExecutionResult(StrictModel):
Expand Down
90 changes: 89 additions & 1 deletion src/foundation/services/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
PolicyDecision,
PolicyDecisionType,
PolicyEvaluationRecord,
PolicyReasonCode,
QuestionAction,
ShellAction,
ToolCall,
Expand Down Expand Up @@ -68,6 +69,7 @@
from foundation.services.git_service import GitService
from foundation.services.guardrails import GuardrailPolicyEngine
from foundation.services.observer import ObserverService
from foundation.services.scope_grants import ScopeGrantStore
from foundation.services.shell import (
ExecutionMode,
OutputCallback,
Expand Down Expand Up @@ -125,6 +127,7 @@ def __init__(
file_service: FileService | None = None,
git_service: GitService | None = None,
question_callback: Callable[[QuestionAction], str | None] | None = None,
grant_store: ScopeGrantStore | None = None,
) -> None:
self._workspace_root = Path(workspace_root).expanduser().resolve()
self._shell_runtime = shell_runtime
Expand All @@ -137,6 +140,7 @@ def __init__(
self._file_service = file_service
self._git_service = git_service
self._question_callback = question_callback
self._grant_store = grant_store

def execute(
self,
Expand Down Expand Up @@ -230,6 +234,67 @@ def _handle_question(
artifact={**artifact, "answer": answer},
)

def _handle_scope_escalation(
self,
action: PlannedAction,
decision: PolicyDecision,
*,
request_id: str,
session_id: str | None,
) -> ExecutionResult | None:
"""Ask the user to allow an out-of-scope read; grant on approval.

Returns a BLOCKED result if the user declines (or cannot be prompted),
or None if access was granted and execution should proceed.
"""
paths = list(decision.paths)
display = ", ".join(paths) if paths else "a path outside your workspace"
question = QuestionAction(
prompt=(
f"The agent wants to read {display}, which is outside your workspace "
"root. Allow read access for this session?"
),
options=["Allow for this session", "Deny"],
allow_free_text=False,
)
self._observer.emit(
EVENT_QUESTION_ASKED,
payload={
"request_id": request_id,
"action_id": action.id,
"prompt": question.prompt,
"kind": "scope_escalation",
},
session_id=session_id,
logger_name="foundation.services.executor",
)
answer = self._question_callback(question) if self._question_callback is not None else None
self._observer.emit(
EVENT_QUESTION_ANSWERED,
payload={
"request_id": request_id,
"action_id": action.id,
"granted": answer == "Allow for this session",
},
session_id=session_id,
logger_name="foundation.services.executor",
)
if answer != "Allow for this session":
return ExecutionResult(
action_id=action.id,
status=ExecutionStatus.BLOCKED,
summary="Out-of-scope read was not approved.",
error="Out-of-scope read was not approved.",
)
if self._grant_store is not None:
for raw in paths:
target = Path(raw).expanduser()
if not target.is_absolute():
target = self._workspace_root / target
root = target if target.is_dir() else target.parent
self._grant_store.grant(root)
return None

def _handle_action(
self,
action: PlannedAction,
Expand Down Expand Up @@ -264,7 +329,30 @@ def _handle_action(
None,
)

if decision.decision is PolicyDecisionType.REQUIRE_APPROVAL:
if (
decision.decision is PolicyDecisionType.REQUIRE_APPROVAL
and PolicyReasonCode.SCOPE_ESCALATION in decision.reason_codes
):
blocked = self._handle_scope_escalation(
action,
decision,
request_id=request_id,
session_id=session_id,
)
if blocked is not None:
return (blocked, None, None)
# Granted: fall through to normal execution with the grant recorded.
decision = PolicyDecision(
action_id=decision.action_id,
decision=PolicyDecisionType.ALLOW,
reason="Out-of-scope read approved for this session.",
risk_categories=list(decision.risk_categories),
command_preview=decision.command_preview,
paths=list(decision.paths),
)
approval_request = None
approval_resolution = None
elif decision.decision is PolicyDecisionType.REQUIRE_APPROVAL:
if policy_evaluation is None:
raise RuntimeError(
f"Approval-required action {action.id!r} is missing a policy evaluation."
Expand Down
39 changes: 36 additions & 3 deletions src/foundation/services/file_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
FileServiceError,
FileWriteRequest,
)
from foundation.services.scope_grants import ScopeGrantStore
from foundation.services.staging import WorkspaceRewriteStager

_MAX_READ_BYTES = 256 * 1024 # 256 KB
Expand Down Expand Up @@ -231,8 +232,15 @@ def _norm(s: str) -> str:
class FileService:
"""Workspace-bound text file operations for v3 file capabilities."""

def __init__(self, *, workspace_root: Path, state_dir: Path) -> None:
def __init__(
self,
*,
workspace_root: Path,
state_dir: Path,
read_grant_store: ScopeGrantStore | None = None,
) -> None:
self._workspace_root = Path(workspace_root).expanduser().resolve()
self._read_grant_store = read_grant_store
self._stager = WorkspaceRewriteStager(
workspace_root=self._workspace_root,
state_dir=state_dir,
Expand All @@ -257,6 +265,31 @@ def _resolve_path(self, raw_path: str) -> Path:
)
return resolved

def _resolve_read_path(self, raw_path: str) -> Path:
"""Resolve a read path, also allowing session-granted out-of-scope roots.

Reads may target the workspace or any directory the user approved via a
scope escalation. Writes never use this — they stay workspace-confined.
"""
candidate = Path(raw_path)
resolved = (
candidate.resolve()
if candidate.is_absolute()
else (self._workspace_root / candidate).resolve()
)
try:
resolved.relative_to(self._workspace_root)
return resolved
except ValueError:
pass
if self._read_grant_store is not None and self._read_grant_store.is_granted(resolved):
return resolved
_raise(
FileErrorCode.PATH_OUTSIDE_WORKSPACE,
"Path escapes the workspace boundary.",
path=raw_path,
)

# -- raw I/O helpers ----------------------------------------------------

def _read_raw(self, resolved: Path) -> tuple[str, str]:
Expand Down Expand Up @@ -309,7 +342,7 @@ def _atomic_write(self, target: Path, content: str) -> None:

def read(self, request: FileReadRequest) -> FileReadResult:
"""Read one workspace text file up to 256 KB."""
resolved = self._resolve_path(request.path)
resolved = self._resolve_read_path(request.path)
if not resolved.exists():
_raise(FileErrorCode.FILE_NOT_FOUND, "File does not exist.", path=request.path)
file_size = resolved.stat().st_size
Expand All @@ -332,7 +365,7 @@ def read(self, request: FileReadRequest) -> FileReadResult:

def read_chunk(self, request: FileReadChunkRequest) -> FileReadChunkResult:
"""Read a line-based chunk from a workspace text file."""
resolved = self._resolve_path(request.path)
resolved = self._resolve_read_path(request.path)
if not resolved.exists():
_raise(FileErrorCode.FILE_NOT_FOUND, "File does not exist.", path=request.path)

Expand Down
35 changes: 35 additions & 0 deletions src/foundation/services/guardrails.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
TrustTier,
)
from foundation.services.capabilities import SHELL_CAPABILITY_ID, CapabilityRegistry
from foundation.services.scope_grants import ScopeGrantStore
from foundation.settings import ApprovalMode

_SIMPLE_NO_ARG_COMMANDS = {"date", "pwd", "uname", "whoami"}
Expand Down Expand Up @@ -118,9 +119,11 @@ def __init__(
*,
workspace_root: Path,
capability_registry: CapabilityRegistry | None = None,
grant_store: ScopeGrantStore | None = None,
) -> None:
self._workspace_root = Path(workspace_root).expanduser().resolve()
self._capability_registry = capability_registry
self._grant_store = grant_store
self._invocation_counts: dict[str, int] = defaultdict(int)
self._invocation_log: dict[str, deque[float]] = defaultdict(deque)

Expand Down Expand Up @@ -211,6 +214,7 @@ def to_policy_decision(self, evaluation: PolicyEvaluationRecord) -> PolicyDecisi
risk_categories=self._risk_categories_for_evaluation(evaluation),
command_preview=evaluation.policy_input.command_preview,
paths=list(evaluation.policy_input.requested_paths),
reason_codes=list(evaluation.verdict.reason_codes),
)

def register_invocation(self, evaluation: PolicyEvaluationRecord) -> None:
Expand Down Expand Up @@ -549,6 +553,16 @@ def _evaluate_input(
policy_input,
)
if not self._paths_in_scope(policy_input):
if self._can_escalate_scope(policy_input):
return CapabilityPolicyVerdict(
outcome=CapabilityPolicyOutcome.REQUIRE_APPROVAL,
summary=(
"Requested path is outside the workspace; reading it needs "
"your approval."
),
reason_codes=[PolicyReasonCode.SCOPE_ESCALATION],
constraints=policy_input.constraints.model_copy(deep=True),
)
return self._blocked_verdict(
"Requested paths are outside the capability's declared path scope.",
PolicyReasonCode.PATH_OUT_OF_SCOPE,
Expand Down Expand Up @@ -618,6 +632,23 @@ def _evaluate_input(
constraints=effective_constraints,
)

def _can_escalate_scope(self, policy_input: CapabilityPolicyInput) -> bool:
"""Whether an out-of-scope path may be escalated to the user.

Only read-only typed file reads are eligible: a single grant can be
honored by the file service, and reads are the lowest-risk escalation.
Writes, shell, and discovery stay hard-blocked.
"""
if self._grant_store is None:
return False
if policy_input.runtime_endpoint not in {
"builtin.file.read",
"builtin.file.read_chunk",
}:
return False
effects = set(policy_input.requested_side_effects)
return effects == {"filesystem_read"}

def _blocked_verdict(
self,
summary: str,
Expand Down Expand Up @@ -694,6 +725,10 @@ def _path_matches_rules(
if not rules:
return False
resolved_path = path.resolve()
# A session-granted out-of-scope read root counts as in-scope, so a
# second read under an already-approved root does not re-prompt.
if self._grant_store is not None and self._grant_store.is_granted(resolved_path):
return True
for rule in rules:
if rule.kind is CapabilityScopeKind.ANY:
return True
Expand Down
6 changes: 6 additions & 0 deletions src/foundation/services/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
from foundation.services.observer import EventSink, ObserverService
from foundation.services.planner import PlannerService, PlanningError
from foundation.services.provider import ProviderAdapter, ProviderError
from foundation.services.scope_grants import ScopeGrantStore
from foundation.services.shell import OutputCallback, ShellRuntime
from foundation.services.tools import LocalToolService
from foundation.settings import ApprovalMode
Expand Down Expand Up @@ -452,9 +453,12 @@ def __init__(
store=CapabilityStore(store_root),
tool_service=self._tool_service,
)
# Shared, session-scoped read grants for out-of-workspace escalation.
self._grant_store = ScopeGrantStore()
self._policy_engine = policy_engine or GuardrailPolicyEngine(
workspace_root=self._workspace_root,
capability_registry=self._capability_registry,
grant_store=self._grant_store,
)
self._approval_service = approval_service or ApprovalService(mode=approval_mode)
self._history_store = history_store
Expand Down Expand Up @@ -488,9 +492,11 @@ def __init__(
file_service=FileService(
workspace_root=self._workspace_root,
state_dir=state_dir,
read_grant_store=self._grant_store,
),
git_service=self._git_service,
question_callback=question_callback,
grant_store=self._grant_store,
)

def set_event_sink(self, event_sink: EventSink | None) -> None:
Expand Down
4 changes: 4 additions & 0 deletions src/foundation/services/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ def _base_plan_messages(
"reading, search, or file edits. "
"Use shell actions for running tests, builds, linters, and "
"environment inspection only. "
"To read a file outside the workspace root, issue a normal "
"foundation.file.read with its absolute path; the user will be asked "
"to approve out-of-scope read access rather than it being silently "
"blocked. Do not refuse preemptively. "
"Shell args are passed directly to the target binary via execve, "
"NOT interpreted by a shell. Do NOT wrap args in single or double "
"quotes, do NOT expect glob expansion or variable substitution, "
Expand Down
34 changes: 34 additions & 0 deletions src/foundation/services/scope_grants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Session-scoped, read-only out-of-workspace access grants.

When the user approves an out-of-scope read escalation, the granted directory
root is recorded here. Both the guardrail policy engine and the file service
consult the same store so a single grant unblocks reads under that root for the
rest of the session. Grants are read-only and in-memory (never persisted).
"""

from __future__ import annotations

from pathlib import Path


class ScopeGrantStore:
"""A set of additional directory roots the user has approved for reading."""

def __init__(self) -> None:
self._roots: set[Path] = set()

def grant(self, root: Path) -> None:
"""Record a directory root as readable for the rest of the session."""
self._roots.add(Path(root).expanduser().resolve())

def is_granted(self, path: Path) -> bool:
"""Return whether ``path`` lies within any granted root."""
resolved = Path(path).expanduser().resolve()
for root in self._roots:
if resolved == root or resolved.is_relative_to(root):
return True
return False

@property
def roots(self) -> frozenset[Path]:
return frozenset(self._roots)
Loading
Loading