Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/en/s01-the-agent-loop.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ One exit condition controls the entire flow. The loop runs until the model stops
1. User prompt becomes the first message.

```python
# Start the conversation with the user's request.
# The model only sees what we store in `messages`.
messages.append({"role": "user", "content": query})
```

2. Send messages + tool definitions to the LLM.

```python
# Send the entire conversation state plus the tool definitions.
# `tools=TOOLS` is what tells the model which actions it may call.
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
Expand All @@ -45,41 +49,55 @@ response = client.messages.create(
3. Append the assistant response. Check `stop_reason` -- if the model didn't call a tool, we're done.

```python
# Preserve the assistant turn exactly as returned.
# `response.content` may contain text blocks and tool calls together.
messages.append({"role": "assistant", "content": response.content})
# If the model is done thinking with tools, exit the loop.
if response.stop_reason != "tool_use":
return
```

4. Execute each tool call, collect results, append as a user message. Loop back to step 2.

```python
# Gather every tool result from this assistant turn into one payload.
results = []
for block in response.content:
    # A single response can contain multiple content blocks
    # (text blocks and tool calls together); only `tool_use`
    # blocks should be executed locally.
    if block.type == "tool_use":
        # Read the command proposed by the model and run it.
        output = run_bash(block.input["command"])
        results.append({
            # `tool_result` links this output back to the original
            # tool call via the matching `tool_use_id`.
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": output,
        })
# Feed the tool outputs back so the model can continue reasoning.
messages.append({"role": "user", "content": results})
```

Assembled into one function:

```python
def agent_loop(query):
# Begin with a fresh conversation containing only the current task.
messages = [{"role": "user", "content": query}]
while True:
# Ask the model what to do next given the conversation so far.
response = client.messages.create(
model=MODEL, system=SYSTEM, messages=messages,
tools=TOOLS, max_tokens=8000,
)
# Save the assistant turn before inspecting it.
messages.append({"role": "assistant", "content": response.content})

# No tool call means the agent has reached its final answer.
if response.stop_reason != "tool_use":
return

# Otherwise, execute each requested tool and collect the outputs.
results = []
for block in response.content:
if block.type == "tool_use":
Expand All @@ -89,6 +107,7 @@ def agent_loop(query):
"tool_use_id": block.id,
"content": output,
})
# Turn tool outputs into the next user message, then loop again.
messages.append({"role": "user", "content": results})
```

Expand Down
9 changes: 9 additions & 0 deletions docs/en/s02-tool-use.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,18 @@ One lookup replaces any if/elif chain.

```python
def safe_path(p: str) -> Path:
    """Resolve *p* inside the workspace root, rejecting any escape attempt.

    Raises ValueError when the resolved path (after following ../ and
    symlink-style tricks via resolve()) lands outside WORKDIR.
    """
    resolved = (WORKDIR / p).resolve()
    # Happy path first: the resolved location must stay under the sandbox root.
    if resolved.is_relative_to(WORKDIR):
        return resolved
    raise ValueError(f"Path escapes workspace: {p}")

def run_read(path: str, limit: int = None) -> str:
    """Read a workspace file, optionally capped to *limit* lines and 50k chars."""
    # safe_path enforces the workspace sandbox before any filesystem access.
    all_lines = safe_path(path).read_text().splitlines()
    # A falsy limit (None or 0) means "no line cap"; otherwise trim large
    # files so one read does not flood the model context.
    selected = all_lines[:limit] if limit and limit < len(all_lines) else all_lines
    # Hard character ceiling regardless of the line count.
    return "\n".join(selected)[:50000]
Expand All @@ -52,6 +56,8 @@ def run_read(path: str, limit: int = None) -> str:

```python
TOOL_HANDLERS = {
# Keys match tool names exposed to the model.
# Values adapt JSON tool input into normal Python calls.
"bash": lambda **kw: run_bash(kw["command"]),
"read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
"write_file": lambda **kw: run_write(kw["path"], kw["content"]),
Expand All @@ -65,10 +71,13 @@ TOOL_HANDLERS = {
```python
for block in response.content:
if block.type == "tool_use":
# Route by tool name instead of hardcoding one branch per tool.
handler = TOOL_HANDLERS.get(block.name)
# Unknown tools return an error string instead of crashing the loop.
output = handler(**block.input) if handler \
else f"Unknown tool: {block.name}"
results.append({
# The model needs both the call id and the tool output.
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
Expand Down
6 changes: 6 additions & 0 deletions docs/en/s03-todo-write.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,16 @@ On multi-step tasks, the model loses track. It repeats work, skips steps, or wan
```python
class TodoManager:
def update(self, items: list) -> str:
# Normalize incoming items and count how many tasks are actively being worked.
validated, in_progress_count = [], 0
for item in items:
status = item.get("status", "pending")
# Missing status means a new task defaults to pending.
if status == "in_progress":
in_progress_count += 1
validated.append({"id": item["id"], "text": item["text"],
"status": status})
# This rule forces the model to focus on one step at a time.
if in_progress_count > 1:
raise ValueError("Only one task can be in_progress")
self.items = validated
Expand All @@ -57,6 +60,7 @@ class TodoManager:
```python
TOOL_HANDLERS = {
    # ...base tools...
    # Planning plugs into dispatch exactly like any other tool: the model
    # sends {"items": [...]} and receives the validation result as a string.
    "todo": lambda **kw: TODO.update(kw["items"]),
}
```
Expand All @@ -66,9 +70,11 @@ TOOL_HANDLERS = {
```python
if rounds_since_todo >= 3 and messages:
    last = messages[-1]
    # Only inject reminders into a user turn that already carries tool
    # results (list-valued content); plain-text user turns are left alone.
    if last["role"] == "user" and isinstance(last.get("content"), list):
        last["content"].insert(0, {
            "type": "text",
            # Prepend a lightweight nudge before the normal tool_result payload.
            "text": "<reminder>Update your todos.</reminder>",
        })
```
Expand Down
7 changes: 7 additions & 0 deletions docs/en/s04-subagent.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Parent context stays clean. Subagent context is discarded.

```python
PARENT_TOOLS = CHILD_TOOLS + [
# Only the parent can delegate.
# Child agents keep the base tools but do not get `task` again.
{"name": "task",
"description": "Spawn a subagent with fresh context.",
"input_schema": {
Expand All @@ -46,26 +48,31 @@ PARENT_TOOLS = CHILD_TOOLS + [

```python
def run_subagent(prompt: str) -> str:
    """Run a delegated subtask in an isolated conversation.

    Returns only the child's final text summary; the detailed transcript
    is discarded so the parent context stays clean.

    Fix: guard against unknown tool names — previously `handler` could be
    None and `handler(**block.input)` raised TypeError, killing the child
    loop. This mirrors the dispatch guard used in the parent loop.
    """
    # Start a completely fresh conversation for the delegated subtask.
    sub_messages = [{"role": "user", "content": prompt}]
    for _ in range(30):  # safety limit on child rounds
        # The child uses the same loop shape, but only with CHILD_TOOLS.
        response = client.messages.create(
            model=MODEL, system=SUBAGENT_SYSTEM,
            messages=sub_messages,
            tools=CHILD_TOOLS, max_tokens=8000,
        )
        sub_messages.append({"role": "assistant",
                             "content": response.content})
        # No more tool calls means the child is ready to summarize.
        if response.stop_reason != "tool_use":
            break
        results = []
        for block in response.content:
            if block.type == "tool_use":
                # Reuse the normal handlers inside the child context;
                # unknown tools become an error string instead of a crash.
                handler = TOOL_HANDLERS.get(block.name)
                output = handler(**block.input) if handler \
                    else f"Unknown tool: {block.name}"
                results.append({"type": "tool_result",
                                "tool_use_id": block.id,
                                "content": str(output)[:50000]})
        sub_messages.append({"role": "user", "content": results})
    # Return only the final text; discard the child's detailed transcript.
    return "".join(
        b.text for b in response.content if hasattr(b, "text")
    ) or "(no summary)"
Expand Down
6 changes: 6 additions & 0 deletions docs/en/s05-skill-loading.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,32 +52,38 @@ class SkillLoader:
def __init__(self, skills_dir: Path):
self.skills = {}
for f in sorted(skills_dir.rglob("SKILL.md")):
# Read one markdown file, then split metadata from instruction body.
text = f.read_text()
meta, body = self._parse_frontmatter(text)
# Frontmatter `name` wins; folder name is the fallback identifier.
name = meta.get("name", f.parent.name)
self.skills[name] = {"meta": meta, "body": body}

def get_descriptions(self) -> str:
lines = []
for name, skill in self.skills.items():
desc = skill["meta"].get("description", "")
# These short summaries are cheap enough to keep in the system prompt.
lines.append(f" - {name}: {desc}")
return "\n".join(lines)

def get_content(self, name: str) -> str:
skill = self.skills.get(name)
if not skill:
return f"Error: Unknown skill '{name}'."
# Wrap the full body so the model can recognize the injected skill payload.
return f"<skill name=\"{name}\">\n{skill['body']}\n</skill>"
```

3. Layer 1 goes into the system prompt. Layer 2 is just another tool handler.

```python
# Keep only lightweight skill descriptions in the always-on prompt.
SYSTEM = f"""You are a coding agent at {WORKDIR}.
Skills available:
{SKILL_LOADER.get_descriptions()}"""

# The expensive instruction body is fetched on demand through a tool call.
TOOL_HANDLERS = {
# ...base tools...
"load_skill": lambda **kw: SKILL_LOADER.get_content(kw["name"]),
Expand Down
10 changes: 9 additions & 1 deletion docs/en/s06-context-compact.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,19 @@ continue [Layer 2: auto_compact]

```python
def micro_compact(messages: list, keep_recent=None) -> list:
    """Layer 1 compaction: shrink old, bulky tool_result payloads in place.

    keep_recent: how many of the newest tool results to keep verbatim;
    defaults to the module-level KEEP_RECENT. Returns the same list.

    Fixes: the placeholder previously interpolated an undefined `tool_name`
    (NameError the first time compaction fired — tool_result blocks do not
    carry the tool name), and a keep count of 0 selected nothing because
    `[:-0]` is the empty slice.
    """
    keep = KEEP_RECENT if keep_recent is None else keep_recent
    # Collect every tool_result so we can decide which ones are safe to shrink.
    tool_results = []
    for i, msg in enumerate(messages):
        if msg["role"] == "user" and isinstance(msg.get("content"), list):
            for j, part in enumerate(msg["content"]):
                if isinstance(part, dict) and part.get("type") == "tool_result":
                    tool_results.append((i, j, part))
    # Keep the newest tool outputs verbatim for local continuity.
    if len(tool_results) <= keep:
        return messages
    stale = tool_results[:-keep] if keep else tool_results
    for _, _, part in stale:
        if len(part.get("content", "")) > 100:
            # Old, bulky output becomes a placeholder instead of raw transcript.
            part["content"] = "[Previous tool output removed]"
    return messages
```
Expand All @@ -71,15 +74,17 @@ def auto_compact(messages: list) -> list:
with open(transcript_path, "w") as f:
for msg in messages:
f.write(json.dumps(msg, default=str) + "\n")
# LLM summarizes
# Ask the model to compress the conversation into a shorter continuity note.
response = client.messages.create(
model=MODEL,
messages=[{"role": "user", "content":
# Bound the payload so compaction itself does not overflow context.
"Summarize this conversation for continuity..."
+ json.dumps(messages, default=str)[:80000]}],
max_tokens=2000,
)
return [
# Replace the long transcript with a compact handoff summary.
{"role": "user", "content": f"[Compressed]\n\n{response.content[0].text}"},
{"role": "assistant", "content": "Understood. Continuing."},
]
Expand All @@ -92,12 +97,15 @@ def auto_compact(messages: list) -> list:
```python
def agent_loop(messages: list):
    """Illustrative loop showing where the three compaction layers hook in.

    Sketch only: the LLM call arguments, tool execution, and the source of
    `manual_compact` are elided here and shown in earlier sections.
    """
    while True:
        # Layer 1: silently shrink stale tool output every round.
        micro_compact(messages) # Layer 1
        if estimate_tokens(messages) > THRESHOLD:
            # Layer 2: reset the active transcript when token pressure is too high.
            messages[:] = auto_compact(messages) # Layer 2
        response = client.messages.create(...)
        # ... tool execution ...
        if manual_compact:
            # Layer 3: let the model request compaction explicitly after a milestone.
            messages[:] = auto_compact(messages) # Layer 3
```

Expand Down
5 changes: 5 additions & 0 deletions docs/en/s07-task-system.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ class TaskManager:
    def __init__(self, tasks_dir: Path):
        """Bind the manager to *tasks_dir*, creating the directory if needed."""
        self.dir = tasks_dir
        self.dir.mkdir(exist_ok=True)
        # Recover the next ID from disk so restarts do not overwrite tasks.
        self._next_id = self._max_id() + 1

def create(self, subject, description=""):
task = {"id": self._next_id, "subject": subject,
"status": "pending", "blockedBy": [],
"blocks": [], "owner": ""}
# Persist immediately so the board survives compaction or process exit.
self._save(task)
self._next_id += 1
return json.dumps(task, indent=2)
Expand All @@ -72,6 +74,7 @@ class TaskManager:
def _clear_dependency(self, completed_id):
for f in self.dir.glob("task_*.json"):
task = json.loads(f.read_text())
# Remove the finished task from every dependent's waiting list.
if completed_id in task.get("blockedBy", []):
task["blockedBy"].remove(completed_id)
self._save(task)
Expand All @@ -84,6 +87,7 @@ def update(self, task_id, status=None,
add_blocked_by=None, add_blocks=None):
task = self._load(task_id)
if status:
# Status changes are the trigger point for graph side effects.
task["status"] = status
if status == "completed":
self._clear_dependency(task_id)
Expand All @@ -95,6 +99,7 @@ def update(self, task_id, status=None,
```python
TOOL_HANDLERS = {
# ...base tools...
# Task graph operations become first-class tools the model can call directly.
"task_create": lambda **kw: TASKS.create(kw["subject"]),
"task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status")),
"task_list": lambda **kw: TASKS.list_all(),
Expand Down
6 changes: 6 additions & 0 deletions docs/en/s08-background-tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Agent --[spawn A]--[spawn B]--[other work]----
class BackgroundManager:
    """Tracks background jobs and queues their results for the agent loop."""

    def __init__(self):
        # Bookkeeping for spawned jobs, keyed by task id.
        self.tasks = {}
        # Finished background work waits here until the main loop injects it.
        self._notification_queue = []
        # Guards queue access shared with worker threads.
        self._lock = threading.Lock()
```
Expand All @@ -46,8 +47,10 @@ class BackgroundManager:

```python
def run(self, command: str) -> str:
# Give each background job a stable ID for later status checks.
task_id = str(uuid.uuid4())[:8]
self.tasks[task_id] = {"status": "running", "command": command}
# Daemon threads let long-running subprocesses overlap with agent thinking.
thread = threading.Thread(
target=self._execute, args=(task_id, command), daemon=True)
thread.start()
Expand All @@ -65,6 +68,7 @@ def _execute(self, task_id, command):
except subprocess.TimeoutExpired:
output = "Error: Timeout (300s)"
with self._lock:
# Queue a short completion notice for the next loop iteration.
self._notification_queue.append({
"task_id": task_id, "result": output[:500]})
```
Expand All @@ -74,11 +78,13 @@ def _execute(self, task_id, command):
```python
def agent_loop(messages: list):
while True:
# Pull completed background jobs into context before the next LLM call.
notifs = BG.drain_notifications()
if notifs:
notif_text = "\n".join(
f"[bg:{n['task_id']}] {n['result']}" for n in notifs)
messages.append({"role": "user",
# Structured tags help the model recognize async updates.
"content": f"<background-results>\n{notif_text}\n"
f"</background-results>"})
messages.append({"role": "assistant",
Expand Down
Loading