Skip to content

Commit e5f4e05

Browse files
committed
alternate customer service implementation
1 parent 47d4f70 commit e5f4e05

3 files changed

Lines changed: 128 additions & 54 deletions

File tree

openai_agents/run_customer_service_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ async def main():
6565
CustomerServiceWorkflow.process_user_message, message_input
6666
)
6767
history.extend(new_history)
68-
print(*new_history, sep="\n")
68+
print(*new_history[1:], sep="\n")
6969
except WorkflowUpdateFailedError:
7070
print("** Stale conversation. Reloading...")
7171
length = len(history)

openai_agents/workflows/customer_service.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import annotations as _annotations
22

3+
from typing import Dict, Tuple
4+
35
from agents import Agent, RunContextWrapper, function_tool, handoff
46
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
57
from pydantic import BaseModel
@@ -23,19 +25,20 @@ class AirlineAgentContext(BaseModel):
2325
description_override="Lookup frequently asked questions.",
2426
)
2527
async def faq_lookup_tool(question: str) -> str:
26-
if "bag" in question or "baggage" in question:
28+
question_lower = question.lower()
29+
if "bag" in question_lower or "baggage" in question_lower:
2730
return (
2831
"You are allowed to bring one bag on the plane. "
2932
"It must be under 50 pounds and 22 inches x 14 inches x 9 inches."
3033
)
31-
elif "seats" in question or "plane" in question:
34+
elif "seats" in question_lower or "plane" in question_lower:
3235
return (
3336
"There are 120 seats on the plane. "
3437
"There are 22 business class seats and 98 economy seats. "
3538
"Exit rows are rows 4 and 16. "
3639
"Rows 5-8 are Economy Plus, with extra legroom. "
3740
)
38-
elif "wifi" in question:
41+
elif "wifi" in question_lower:
3942
return "We have free wifi on the plane, join Airline-Wifi"
4043
return "I'm sorry, I don't know the answer to that question."
4144

@@ -74,7 +77,9 @@ async def on_seat_booking_handoff(
7477
### AGENTS
7578

7679

77-
def init_agents() -> Agent[AirlineAgentContext]:
80+
def init_agents() -> Tuple[
81+
Agent[AirlineAgentContext], Dict[str, Agent[AirlineAgentContext]]
82+
]:
7883
"""
7984
Initialize the agents for the airline customer service workflow.
8085
:return: triage agent
@@ -121,7 +126,9 @@ def init_agents() -> Agent[AirlineAgentContext]:
121126

122127
faq_agent.handoffs.append(triage_agent)
123128
seat_booking_agent.handoffs.append(triage_agent)
124-
return triage_agent
129+
return triage_agent, {
130+
agent.name: agent for agent in [faq_agent, seat_booking_agent, triage_agent]
131+
}
125132

126133

127134
class ProcessUserMessageInput(BaseModel):
Lines changed: 115 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
from __future__ import annotations as _annotations
22

3+
import asyncio
4+
from datetime import timedelta
5+
36
from agents import (
4-
Agent,
7+
HandoffCallItem,
58
HandoffOutputItem,
69
ItemHelpers,
710
MessageOutputItem,
@@ -12,6 +15,7 @@
1215
TResponseInputItem,
1316
trace,
1417
)
18+
from pydantic import BaseModel, dataclasses
1519
from temporalio import workflow
1620

1721
from openai_agents.workflows.customer_service import (
@@ -21,71 +25,134 @@
2125
)
2226

2327

28+
@dataclasses.dataclass
29+
class CustomerServiceWorkflowState:
30+
printed_history: list[str]
31+
current_agent_name: str
32+
context: AirlineAgentContext
33+
input_items: list[dict] # Store as plain dictionaries to avoid serialization issues
34+
35+
2436
@workflow.defn
2537
class CustomerServiceWorkflow:
2638
@workflow.init
27-
def __init__(self, input_items: list[TResponseInputItem] | None = None):
39+
def __init__(
40+
self, customer_service_state: CustomerServiceWorkflowState | None = None
41+
):
2842
self.run_config = RunConfig()
29-
self.chat_history: list[str] = []
30-
self.current_agent: Agent[AirlineAgentContext] = init_agents()
31-
self.context = AirlineAgentContext()
32-
self.input_items = [] if input_items is None else input_items
3343

34-
@workflow.run
35-
async def run(self, input_items: list[TResponseInputItem] | None = None):
36-
await workflow.wait_condition(
37-
lambda: workflow.info().is_continue_as_new_suggested()
38-
and workflow.all_handlers_finished()
44+
starting_agent, self.agent_map = init_agents()
45+
self.current_agent = (
46+
self.agent_map[customer_service_state.current_agent_name]
47+
if customer_service_state
48+
else starting_agent
49+
)
50+
self.context = (
51+
customer_service_state.context
52+
if customer_service_state
53+
else AirlineAgentContext()
3954
)
40-
workflow.continue_as_new(self.input_items)
55+
56+
self.printed_history: list[str] = (
57+
customer_service_state.printed_history if customer_service_state else []
58+
)
59+
60+
self.input_items = (
61+
customer_service_state.input_items if customer_service_state else []
62+
)
63+
64+
# Communication channels
65+
self.user_input_queue: asyncio.Queue[str] = asyncio.Queue()
66+
self.update_condition: asyncio.Condition = asyncio.Condition()
67+
68+
@workflow.run
69+
async def run(
70+
self, customer_service_state: CustomerServiceWorkflowState | None = None
71+
):
72+
while True:
73+
with trace("Customer service", group_id=workflow.info().workflow_id):
74+
user_input = await self.user_input_queue.get()
75+
self.input_items.append({"content": user_input, "role": "user"})
76+
result = await Runner.run(
77+
self.current_agent,
78+
self.input_items,
79+
context=self.context,
80+
run_config=self.run_config,
81+
)
82+
self.printed_history.append(f"Enter your message: {user_input}")
83+
for new_item in result.new_items:
84+
agent_name = new_item.agent.name
85+
if isinstance(new_item, MessageOutputItem):
86+
self.printed_history.append(
87+
f"{agent_name}: {ItemHelpers.text_message_output(new_item)}"
88+
)
89+
elif isinstance(new_item, HandoffOutputItem):
90+
self.printed_history.append(
91+
f"Handed off from {new_item.source_agent.name} to {new_item.target_agent.name}"
92+
)
93+
elif isinstance(new_item, HandoffCallItem):
94+
self.printed_history.append(
95+
f"{agent_name}: Handed off to tool {new_item.raw_item.name}"
96+
)
97+
elif isinstance(new_item, ToolCallItem):
98+
self.printed_history.append(f"{agent_name}: Calling a tool")
99+
elif isinstance(new_item, ToolCallOutputItem):
100+
self.printed_history.append(
101+
f"{agent_name}: Tool call output: {new_item.output}"
102+
)
103+
else:
104+
self.printed_history.append(
105+
f"{agent_name}: Skipping item: {new_item.__class__.__name__}"
106+
)
107+
self.input_items = result.to_input_list()
108+
self.current_agent = result.last_agent
109+
async with self.update_condition:
110+
self.update_condition.notify_all()
111+
112+
if workflow.info().is_continue_as_new_suggested():
113+
await workflow.wait_condition(
114+
lambda: workflow.all_handlers_finished(),
115+
timeout=timedelta(seconds=10),
116+
timeout_summary="Continue as new timeout - deadlock avoidance",
117+
)
118+
119+
# Convert input_items to plain dictionaries for serialization
120+
serializable_input_items = []
121+
for item in self.input_items:
122+
if hasattr(item, "model_dump"):
123+
# Convert Pydantic objects to dictionaries
124+
serializable_input_items.append(item.model_dump())
125+
else:
126+
# Already a plain Python object
127+
serializable_input_items.append(item)
128+
workflow.continue_as_new(
129+
CustomerServiceWorkflowState(
130+
printed_history=self.printed_history,
131+
current_agent_name=self.current_agent.name,
132+
context=self.context,
133+
input_items=serializable_input_items,
134+
)
135+
)
41136

42137
@workflow.query
43138
def get_chat_history(self) -> list[str]:
44-
return self.chat_history
139+
return self.printed_history
45140

46141
@workflow.update
47142
async def process_user_message(self, input: ProcessUserMessageInput) -> list[str]:
48-
length = len(self.chat_history)
49-
self.chat_history.append(f"User: {input.user_input}")
50-
with trace("Customer service", group_id=workflow.info().workflow_id):
51-
self.input_items.append({"content": input.user_input, "role": "user"})
52-
result = await Runner.run(
53-
self.current_agent,
54-
self.input_items,
55-
context=self.context,
56-
run_config=self.run_config,
143+
length = len(self.printed_history)
144+
self.user_input_queue.put_nowait(input.user_input)
145+
async with self.update_condition:
146+
await self.update_condition.wait_for(
147+
lambda: len(self.printed_history) > length
57148
)
58-
59-
for new_item in result.new_items:
60-
agent_name = new_item.agent.name
61-
if isinstance(new_item, MessageOutputItem):
62-
self.chat_history.append(
63-
f"{agent_name}: {ItemHelpers.text_message_output(new_item)}"
64-
)
65-
elif isinstance(new_item, HandoffOutputItem):
66-
self.chat_history.append(
67-
f"Handed off from {new_item.source_agent.name} to {new_item.target_agent.name}"
68-
)
69-
elif isinstance(new_item, ToolCallItem):
70-
self.chat_history.append(f"{agent_name}: Calling a tool")
71-
elif isinstance(new_item, ToolCallOutputItem):
72-
self.chat_history.append(
73-
f"{agent_name}: Tool call output: {new_item.output}"
74-
)
75-
else:
76-
self.chat_history.append(
77-
f"{agent_name}: Skipping item: {new_item.__class__.__name__}"
78-
)
79-
self.input_items = result.to_input_list()
80-
self.current_agent = result.last_agent
81-
workflow.set_current_details("\n\n".join(self.chat_history))
82-
return self.chat_history[length:]
149+
return self.printed_history[length:]
83150

84151
@process_user_message.validator
85152
def validate_process_user_message(self, input: ProcessUserMessageInput) -> None:
86153
if not input.user_input:
87154
raise ValueError("User input cannot be empty.")
88155
if len(input.user_input) > 1000:
89156
raise ValueError("User input is too long. Please limit to 1000 characters.")
90-
if input.chat_length != len(self.chat_history):
157+
if input.chat_length != len(self.printed_history):
91158
raise ValueError("Stale chat history. Please refresh the chat.")

0 commit comments

Comments
 (0)