Skip to content

Commit 5808b61

Browse files
committed
Moved CustomerServiceWorkflow logic into the update handler for clarity.
1 parent bbf6269 commit 5808b61

2 files changed

Lines changed: 58 additions & 40 deletions

File tree

openai_agents/customer_service_client.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import argparse
22
import asyncio
3+
from ftplib import print_line
34

4-
from temporalio.client import Client, WorkflowQueryRejectedError
5+
from temporalio.client import Client, WorkflowQueryRejectedError, WorkflowUpdateFailedError
56
from temporalio.common import WorkflowIDReusePolicy, QueryRejectCondition
67
from temporalio.service import RPCError, RPCStatusCode
78

8-
from openai_agents.workflows.customer_service_workflow import CustomerServiceWorkflow
99
from openai_agents.adapters.open_ai_converter import open_ai_data_converter
10+
from openai_agents.workflows.customer_service_workflow import CustomerServiceWorkflow, ProcessUserMessageInput
1011

1112

1213
async def main():
@@ -42,8 +43,17 @@ async def main():
4243
# Loop to send messages to the workflow
4344
while True:
4445
user_input = input("Enter your message: ")
45-
new_history = await handle.execute_update(CustomerServiceWorkflow.process_user_message, user_input)
46-
print(*new_history, sep="\n")
46+
message_input = ProcessUserMessageInput(user_input=user_input, chat_length=len(history))
47+
try:
48+
new_history = await handle.execute_update(CustomerServiceWorkflow.process_user_message, message_input)
49+
history.extend(new_history)
50+
print(*new_history, sep="\n")
51+
except WorkflowUpdateFailedError:
52+
print_line("** Stale conversation. Reloading...")
53+
length = len(history)
54+
history = await handle.query(CustomerServiceWorkflow.get_chat_history,
55+
reject_condition=QueryRejectCondition.NOT_OPEN)
56+
print(*history[length:], sep="\n")
4757

4858

4959
if __name__ == "__main__":

openai_agents/workflows/customer_service_workflow.py

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
from temporalio import workflow
44

5-
65
with workflow.unsafe.imports_passed_through():
76
from pydantic import BaseModel
87
from openai_agents.adapters.activity_model import ModelStubProvider
@@ -136,55 +135,64 @@ def init_agents() -> Agent[AirlineAgentContext]:
136135
return triage_agent
137136

138137

139-
### RUN
138+
class ProcessUserMessageInput(BaseModel):
139+
user_input: str
140+
chat_length: int
141+
140142

141143
@workflow.defn(sandboxed=False)
142144
class CustomerServiceWorkflow:
143145

144-
def __init__(self):
146+
def __init__(self, input_items: list[TResponseInputItem] = None):
145147
self.run_config = RunConfig(model_provider=ModelStubProvider())
146148
self.chat_history = []
147-
self.user_input = None
149+
self.current_agent: Agent[AirlineAgentContext] = init_agents()
150+
self.context = AirlineAgentContext()
151+
self.input_items = [] if input_items is None else input_items
148152

149153
@workflow.run
150154
async def run(self, input_items: list[TResponseInputItem] = None):
151-
input_items = [] #input_items or []
152-
current_agent: Agent[AirlineAgentContext] = init_agents()
153-
context = AirlineAgentContext()
154-
while not workflow.info().is_continue_as_new_suggested():
155-
await workflow.wait_condition(lambda: self.user_input is not None)
156-
self.chat_history.append(f"User: {self.user_input}")
157-
with trace("Customer service", group_id=workflow.info().workflow_id):
158-
input_items.append({"content": self.user_input, "role": "user"})
159-
result = await Runner.run(current_agent, input_items, context=context, run_config=self.run_config)
160-
161-
for new_item in result.new_items:
162-
agent_name = new_item.agent.name
163-
if isinstance(new_item, MessageOutputItem):
164-
self.chat_history.append(f"{agent_name}: {ItemHelpers.text_message_output(new_item)}")
165-
elif isinstance(new_item, HandoffOutputItem):
166-
self.chat_history.append(
167-
f"Handed off from {new_item.source_agent.name} to {new_item.target_agent.name}"
168-
)
169-
elif isinstance(new_item, ToolCallItem):
170-
self.chat_history.append(f"{agent_name}: Calling a tool")
171-
elif isinstance(new_item, ToolCallOutputItem):
172-
self.chat_history.append(f"{agent_name}: Tool call output: {new_item.output}")
173-
else:
174-
self.chat_history.append(f"{agent_name}: Skipping item: {new_item.__class__.__name__}")
175-
input_items = result.to_input_list()
176-
current_agent = result.last_agent
177-
self.user_input = None
178-
workflow.continue_as_new(input_items)
155+
await workflow.wait_condition(
156+
lambda: workflow.info().is_continue_as_new_suggested() and workflow.all_handlers_finished())
157+
workflow.continue_as_new(self.input_items)
179158

180159
@workflow.query
181160
def get_chat_history(self) -> list[str]:
182161
return self.chat_history
183162

184163
@workflow.update
185-
async def process_user_message(self, user_input: str) -> list[str]:
186-
await workflow.wait_condition(lambda: self.user_input is None)
187-
self.user_input = user_input
164+
async def process_user_message(self, input: ProcessUserMessageInput) -> list[str]:
188165
length = len(self.chat_history)
189-
await workflow.wait_condition(lambda: self.user_input is None)
166+
self.chat_history.append(f"User: {input.user_input}")
167+
with trace("Customer service", group_id=workflow.info().workflow_id):
168+
self.input_items.append({"content": input.user_input, "role": "user"})
169+
result = await Runner.run(self.current_agent, self.input_items, context=self.context,
170+
run_config=self.run_config)
171+
172+
for new_item in result.new_items:
173+
agent_name = new_item.agent.name
174+
if isinstance(new_item, MessageOutputItem):
175+
self.chat_history.append(f"{agent_name}: {ItemHelpers.text_message_output(new_item)}")
176+
elif isinstance(new_item, HandoffOutputItem):
177+
self.chat_history.append(
178+
f"Handed off from {new_item.source_agent.name} to {new_item.target_agent.name}"
179+
)
180+
elif isinstance(new_item, ToolCallItem):
181+
self.chat_history.append(f"{agent_name}: Calling a tool")
182+
elif isinstance(new_item, ToolCallOutputItem):
183+
self.chat_history.append(f"{agent_name}: Tool call output: {new_item.output}")
184+
else:
185+
self.chat_history.append(f"{agent_name}: Skipping item: {new_item.__class__.__name__}")
186+
self.input_items = result.to_input_list()
187+
self.current_agent = result.last_agent
188+
workflow.set_current_details("\n\n".join(self.chat_history))
190189
return self.chat_history[length:]
190+
191+
@process_user_message.validator
192+
def validate_process_user_message(self, input: ProcessUserMessageInput) -> None:
193+
if not input.user_input:
194+
raise ValueError("User input cannot be empty.")
195+
if len(input.user_input) > 1000:
196+
raise ValueError("User input is too long. Please limit to 1000 characters.")
197+
if input.chat_length != len(self.chat_history):
198+
raise ValueError("Stale chat history. Please refresh the chat.")

0 commit comments

Comments
 (0)