Skip to content

Commit 2343a27

Browse files
committed
Add logic
1 parent 4516e90 commit 2343a27

File tree

11 files changed

+484
-46
lines changed

11 files changed

+484
-46
lines changed

examples/demos/procurement_agent/manifest.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,10 @@ agent:
106106

107107
# Optional: Set Environment variables for running your agent locally as well
108108
# as for deployment later on
109-
# env:
110-
# OPENAI_API_KEY: "<YOUR_OPENAI_API_KEY_HERE>"
111-
# OPENAI_BASE_URL: "<YOUR_OPENAI_BASE_URL_HERE>"
112-
# OPENAI_ORG_ID: "<YOUR_OPENAI_ORG_ID_HERE>"
109+
env:
110+
OPENAI_API_KEY: ""
111+
# OPENAI_BASE_URL: "<YOUR_OPENAI_BASE_URL_HERE>"
112+
OPENAI_ORG_ID: ""
113113

114114

115115
# Deployment Configuration

examples/demos/procurement_agent/project/acp.py

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
import os
22
import sys
3+
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
4+
from datetime import timedelta
5+
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
6+
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
7+
TemporalStreamingModelProvider,
8+
)
39

410
# === DEBUG SETUP (AgentEx CLI Debug Support) ===
511
if os.getenv("AGENTEX_DEBUG_ENABLED") == "true":
@@ -36,29 +42,16 @@
3642
from agentex.lib.sdk.fastacp.fastacp import FastACP
3743
from agentex.lib.types.fastacp import TemporalACPConfig
3844

45+
context_interceptor = ContextInterceptor()
46+
streaming_model_provider = TemporalStreamingModelProvider()
3947

4048
# Create the ACP server
4149
acp = FastACP.create(
4250
acp_type="async",
4351
config=TemporalACPConfig(
44-
# When deployed to the cluster, the Temporal address will automatically be set to the cluster address
45-
# For local development, we set the address manually to talk to the local Temporal service set up via docker compose
4652
type="temporal",
47-
temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233")
53+
temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
54+
plugins=[OpenAIAgentsPlugin(model_provider=streaming_model_provider)],
55+
interceptors=[context_interceptor]
4856
)
49-
)
50-
51-
52-
# Notice that we don't need to register any handlers when we use type="temporal"
53-
# If you look at the code in agentex.sdk.fastacp.impl.temporal_acp
54-
# You can see that these handlers are automatically registered when the ACP is created
55-
56-
# @acp.on_task_create
57-
# This will be handled by the method in your workflow that is decorated with @workflow.run
58-
59-
# @acp.on_task_event_send
60-
# This will be handled by the method in your workflow that is decorated with @workflow.signal(name=SignalName.RECEIVE_MESSAGE)
61-
62-
# @acp.on_task_cancel
63-
# This does not need to be handled by your workflow.
64-
# It is automatically handled by the temporal client which cancels the workflow directly
57+
)
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from temporalio import activity
2+
import asyncio
3+
from temporalio import workflow
4+
from datetime import timedelta
5+
from agentex.lib.utils.logging import make_logger
6+
from project.events import SubmitalApprovalEvent, ShipmentDepartedFactoryEvent, ShipmentArrivedSiteEvent, InspectionFailedEvent
7+
import uuid
8+
from datetime import datetime
9+
10+
logger = make_logger(__name__)
11+
12+
@activity.defn
13+
async def issue_purchase_order(event: SubmitalApprovalEvent) -> str:
14+
"""
15+
Issues a purchase order for construction materials.
16+
17+
Call this when:
18+
- A submittal is approved (Submittal_Approved event)
19+
- Human feedback requests reissuing a purchase order
20+
"""
21+
uuid_purchase_order = str(uuid.uuid4())
22+
# wait for 5 seconds as if we were calling an API to issue a purchase order
23+
await asyncio.sleep(5)
24+
logger.info(f"Issuing purchase order: {event}")
25+
logger.info(f"Purchase order ID: {uuid_purchase_order}")
26+
27+
return f"Successfully issued purchase order with ID: {uuid_purchase_order}"
28+
29+
@activity.defn
30+
async def flag_potential_issue(event: ShipmentDepartedFactoryEvent) -> str:
31+
"""
32+
Flags a potential issue with a delivery date.
33+
34+
Call this when:
35+
- A shipment departure creates timeline concerns (Shipment_Departed_Factory event)
36+
- Human feedback identifies a potential delivery issue
37+
"""
38+
logger.info(f"Flagging potential issue: {event}")
39+
logger.info(f"Potential issue flagged with delivery date: {event.eta}")
40+
# imagine this is a call to an API to flag a potential issue, perhaps a notification to a team member
41+
await asyncio.sleep(1)
42+
return f"Potential issue flagged with delivery date: {event.eta}"
43+
44+
@activity.defn
45+
async def notify_team_shipment_arrived(event: ShipmentDepartedFactoryEvent) -> str:
46+
"""
47+
Notifies the team that a shipment has arrived.
48+
49+
Call this when:
50+
- A shipment arrives at the site (Shipment_Arrived_Site event)
51+
- Human feedback requests team notification
52+
"""
53+
logger.info(f"Notifying team that shipment has arrived: {event.item}")
54+
logger.info(f"Team notification sent for arrival of: {event.item}")
55+
# imagine this is a call to an API to notify the team that a shipment has arrived, perhaps a notification to a team member
56+
await asyncio.sleep(1)
57+
58+
return f"Notifying team that shipment has arrived: {event.item}"
59+
60+
@activity.defn
61+
async def schedule_inspection(event: ShipmentDepartedFactoryEvent) -> str:
62+
"""
63+
Schedules an inspection for delivered materials.
64+
65+
Call this when:
66+
- A shipment arrives at the site (Shipment_Arrived_Site event)
67+
- Human feedback requests scheduling an inspection
68+
"""
69+
inspection_date = datetime.now() + timedelta(days=1)
70+
logger.info(f"Scheduling inspection for: {event.item} on {inspection_date}")
71+
# imagine this is a call to an API to schedule an inspection
72+
await asyncio.sleep(1)
73+
return f"Scheduling inspection for {event.item} on {inspection_date}"
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
"""Event agent for processing procurement events and taking actions."""
2+
3+
from agents import Agent, function_tool
4+
from datetime import datetime, timedelta
5+
from temporalio.contrib import openai_agents
6+
from project.activities import (
7+
issue_purchase_order,
8+
flag_potential_issue,
9+
notify_team_shipment_arrived,
10+
schedule_inspection,
11+
)
12+
from temporalio import workflow
13+
14+
15+
@function_tool
16+
async def wait_for_human(recommended_action: str) -> str:
17+
"""
18+
When the we are stuck and need to ask a human for help, call this tool. Please provide the recommended action to the human, should be yes or no recommendation.
19+
20+
Args:
21+
recommended_action: The recommended action to take
22+
23+
Returns:
24+
The human's response
25+
"""
26+
workflow_instance = workflow.instance()
27+
28+
await workflow.wait_condition(
29+
lambda: not workflow_instance.human_queue.empty(),
30+
timeout=None,
31+
)
32+
33+
34+
while not workflow_instance.human_queue.empty():
35+
human_input = await workflow_instance.human_queue.get()
36+
print(f"[WORKFLOW] Processing human message from queue")
37+
38+
return human_input
39+
40+
41+
def new_procurement_agent(event_log: list, master_construction_schedule: str, human_input_learnings: list) -> Agent:
42+
"""
43+
Create an agent that processes procurement events and takes actions.
44+
45+
Args:
46+
event_log: History of events that have occurred
47+
master_construction_schedule: Current construction schedule
48+
human_input_learnings: Past escalations and human decisions
49+
50+
Returns:
51+
Agent configured to process events and call tools
52+
"""
53+
instructions = f"""
54+
You are a procurement agent for a commercial building construction project.
55+
56+
Your role is to monitor procurement events, take appropriate actions, and escalate critical issues to a human with a recommended action.
57+
58+
If the user says no or has feedback, please come up with another solution and call the wait_for_human tool again (you can call it as many times as needed).
59+
60+
## Context
61+
62+
Master Construction Schedule:
63+
{master_construction_schedule}
64+
65+
Event History:
66+
{event_log}
67+
68+
Past Learnings from Escalations:
69+
{human_input_learnings}
70+
71+
Current Date: {datetime.now().isoformat()}
72+
"""
73+
74+
start_to_close_timeout = timedelta(days=1)
75+
76+
return Agent(
77+
name="Procurement Event Agent",
78+
instructions=instructions,
79+
model="gpt-4o",
80+
tools=[
81+
openai_agents.workflow.activity_as_tool(
82+
issue_purchase_order, start_to_close_timeout=start_to_close_timeout
83+
),
84+
openai_agents.workflow.activity_as_tool(
85+
flag_potential_issue, start_to_close_timeout=start_to_close_timeout
86+
),
87+
openai_agents.workflow.activity_as_tool(
88+
notify_team_shipment_arrived,
89+
start_to_close_timeout=start_to_close_timeout,
90+
),
91+
openai_agents.workflow.activity_as_tool(
92+
schedule_inspection, start_to_close_timeout=start_to_close_timeout
93+
),
94+
wait_for_human, # function_tool runs in workflow context
95+
],
96+
)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from enum import Enum
2+
from pydantic import BaseModel, Field
3+
from datetime import datetime
4+
5+
class EventType(Enum):
6+
SUBMITTAL_APPROVED = "Submittal_Approved"
7+
SHIPMENT_DEPARTED_FACTORY = "Shipment_Departed_Factory"
8+
SHIPMENT_ARRIVED_SITE = "Shipment_Arrived_Site"
9+
INSPECTION_FAILED = "Inspection_Failed"
10+
HUMAN_INPUT = "Human_Input"
11+
12+
class SubmitalApprovalEvent(BaseModel):
13+
event_type: EventType = Field(default=EventType.SUBMITTAL_APPROVED)
14+
item: str
15+
16+
class ShipmentDepartedFactoryEvent(BaseModel):
17+
event_type: EventType = Field(default=EventType.SHIPMENT_DEPARTED_FACTORY)
18+
item: str
19+
eta: datetime
20+
21+
class ShipmentArrivedSiteEvent(BaseModel):
22+
event_type: EventType = Field(default=EventType.SHIPMENT_ARRIVED_SITE)
23+
item: str
24+
date_arrived: datetime
25+
26+
class InspectionFailedEvent(BaseModel):
27+
event_type: EventType = Field(default=EventType.INSPECTION_FAILED)
28+
item: str
29+
inspection_date: datetime
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""Master construction schedule data."""
2+
3+
MASTER_CONSTRUCTION_SCHEDULE = """
4+
{
5+
"project": {
6+
"name": "Small Office Renovation",
7+
"start_date": "2026-02-01",
8+
"end_date": "2026-05-31"
9+
},
10+
"deliveries": [
11+
{
12+
"item": "Steel Beams",
13+
"required_by": "2026-02-15",
14+
"buffer_days": 5
15+
},
16+
{
17+
"item": "HVAC Units",
18+
"required_by": "2026-03-01",
19+
"buffer_days": 7
20+
},
21+
{
22+
"item": "Windows",
23+
"required_by": "2026-03-15",
24+
"buffer_days": 10
25+
},
26+
{
27+
"item": "Flooring Materials",
28+
"required_by": "2026-04-01",
29+
"buffer_days": 3
30+
},
31+
{
32+
"item": "Electrical Panels",
33+
"required_by": "2026-04-15",
34+
"buffer_days": 5
35+
}
36+
]
37+
}
38+
"""

examples/demos/procurement_agent/project/run_worker.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@
55
from agentex.lib.utils.logging import make_logger
66
from agentex.lib.utils.debug import setup_debug_if_enabled
77
from agentex.lib.environment_variables import EnvironmentVariables
8-
8+
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
9+
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
10+
from agentex.lib.core.temporal.plugins.openai_agents.hooks.activities import stream_lifecycle_content
11+
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
12+
TemporalStreamingModelProvider,
13+
)
914
from project.workflow import ProcurementAgentWorkflow
10-
15+
from project.activities import issue_purchase_order, flag_potential_issue, notify_team_shipment_arrived, schedule_inspection
1116

1217
environment_variables = EnvironmentVariables.refresh()
1318

@@ -22,11 +27,16 @@ async def main():
2227
if task_queue_name is None:
2328
raise ValueError("WORKFLOW_TASK_QUEUE is not set")
2429

25-
all_activities = get_all_activities() + [] # add your own activities here
30+
all_activities = get_all_activities() + [stream_lifecycle_content, issue_purchase_order, flag_potential_issue, notify_team_shipment_arrived, schedule_inspection]
31+
32+
context_interceptor = ContextInterceptor()
33+
streaming_model_provider = TemporalStreamingModelProvider()
2634

2735
# Create a worker with automatic tracing
2836
worker = AgentexWorker(
2937
task_queue=task_queue_name,
38+
plugins=[OpenAIAgentsPlugin(model_provider=streaming_model_provider)],
39+
interceptors=[context_interceptor],
3040
)
3141

3242
await worker.run(

0 commit comments

Comments
 (0)