forked from anthropics/claude-agent-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreaming_mode_ipython.py
More file actions
229 lines (175 loc) · 7.84 KB
/
streaming_mode_ipython.py
File metadata and controls
229 lines (175 loc) · 7.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
#!/usr/bin/env python3
"""
IPython-friendly code snippets for ClaudeSDKClient streaming mode.
These examples are designed to be copy-pasted directly into IPython.
Each example is self-contained and can be run independently.
The queries are intentionally simplistic. In practice, a query can be a more
complex task that the Claude SDK accomplishes using its agentic capabilities
and tools (e.g. running bash commands, editing files, searching the web,
fetching web content).
"""
# ============================================================================
# BASIC STREAMING
# ============================================================================
from claude_agent_sdk import AssistantMessage, ClaudeSDKClient, ResultMessage, TextBlock
async with ClaudeSDKClient() as client:
print("User: What is 2+2?")
await client.query("What is 2+2?")
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
# ============================================================================
# STREAMING WITH REAL-TIME DISPLAY
# ============================================================================
import asyncio
from claude_agent_sdk import AssistantMessage, ClaudeSDKClient, TextBlock
async with ClaudeSDKClient() as client:
async def send_and_receive(prompt):
print(f"User: {prompt}")
await client.query(prompt)
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
await send_and_receive("Tell me a short joke")
print("\n---\n")
await send_and_receive("Now tell me a fun fact")
# ============================================================================
# PERSISTENT CLIENT FOR MULTIPLE QUESTIONS
# ============================================================================
from claude_agent_sdk import AssistantMessage, ClaudeSDKClient, TextBlock
# Create client
client = ClaudeSDKClient()
await client.connect()
# Helper to get response
async def get_response():
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
# Use it multiple times
print("User: What's 2+2?")
await client.query("What's 2+2?")
await get_response()
print("User: What's 10*10?")
await client.query("What's 10*10?")
await get_response()
# Don't forget to disconnect when done
await client.disconnect()
# ============================================================================
# WITH INTERRUPT CAPABILITY
# ============================================================================
# IMPORTANT: Interrupts require active message consumption. You must be
# consuming messages from the client for the interrupt to be processed.
from claude_agent_sdk import AssistantMessage, ClaudeSDKClient, TextBlock
async with ClaudeSDKClient() as client:
print("\n--- Sending initial message ---\n")
# Send a long-running task
print("User: Count from 1 to 100, run bash sleep for 1 second in between")
await client.query("Count from 1 to 100, run bash sleep for 1 second in between")
# Create a background task to consume messages
messages_received = []
interrupt_sent = False
async def consume_messages():
async for msg in client.receive_messages():
messages_received.append(msg)
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
# Check if we got a result after interrupt
if isinstance(msg, ResultMessage) and interrupt_sent:
break
# Start consuming messages in the background
consume_task = asyncio.create_task(consume_messages())
# Wait a bit then send interrupt
await asyncio.sleep(10)
print("\n--- Sending interrupt ---\n")
interrupt_sent = True
await client.interrupt()
# Wait for the consume task to finish
await consume_task
# Send a new message after interrupt
print("\n--- After interrupt, sending new message ---\n")
await client.query("Just say 'Hello! I was interrupted.'")
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
# ============================================================================
# ERROR HANDLING PATTERN
# ============================================================================
from claude_agent_sdk import AssistantMessage, ClaudeSDKClient, TextBlock
try:
async with ClaudeSDKClient() as client:
print("User: Run a bash sleep command for 60 seconds")
await client.query("Run a bash sleep command for 60 seconds")
# Timeout after 20 seconds
messages = []
async with asyncio.timeout(20.0):
async for msg in client.receive_response():
messages.append(msg)
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
except asyncio.TimeoutError:
print("Request timed out after 20 seconds")
except Exception as e:
print(f"Error: {e}")
# ============================================================================
# SENDING ASYNC ITERABLE OF MESSAGES
# ============================================================================
from claude_agent_sdk import AssistantMessage, ClaudeSDKClient, TextBlock
async def message_generator():
    """Generate multiple messages as an async iterable.

    Each prompt is echoed to stdout as ``User: <prompt>`` immediately before
    its message is yielded.

    Yields:
        One user-message dict per prompt, each carrying the same
        ``session_id`` and a ``parent_tool_use_id`` of ``None``.
    """
    prompts = (
        "I have two math questions.",
        "What is 25 * 4?",
        "What is 100 / 5?",
    )
    # A single loop replaces three copy-pasted message literals; the
    # print-then-yield order per prompt is unchanged.
    for prompt in prompts:
        print(f"User: {prompt}")
        yield {
            "type": "user",
            "message": {"role": "user", "content": prompt},
            "parent_tool_use_id": None,
            "session_id": "math-session",
        }
async with ClaudeSDKClient() as client:
# Send async iterable instead of string
await client.query(message_generator())
async for msg in client.receive_response():
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
# ============================================================================
# COLLECTING ALL MESSAGES INTO A LIST
# ============================================================================
from claude_agent_sdk import AssistantMessage, ClaudeSDKClient, TextBlock
async with ClaudeSDKClient() as client:
print("User: What are the primary colors?")
await client.query("What are the primary colors?")
# Collect all messages into a list
messages = [msg async for msg in client.receive_response()]
# Process them afterwards
for msg in messages:
if isinstance(msg, AssistantMessage):
for block in msg.content:
if isinstance(block, TextBlock):
print(f"Claude: {block.text}")
elif isinstance(msg, ResultMessage):
print(f"Total messages: {len(messages)}")