-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathtest_workflows.py
More file actions
195 lines (165 loc) · 7.57 KB
/
test_workflows.py
File metadata and controls
195 lines (165 loc) · 7.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
"""
Workflow Unit Testing Example
==============================
This module demonstrates how to write unit tests for Conductor workflows and workers.
Key Concepts:
-------------
1. **Worker Testing**: Test worker functions independently as regular Python functions
2. **Workflow Testing**: Test complete workflows end-to-end with mocked task outputs
3. **Mock Outputs**: Simulate task execution results without running actual workers
4. **Retry Simulation**: Test retry logic by providing multiple outputs (failed then succeeded)
5. **Decision Testing**: Verify switch/decision logic with different input scenarios
Test Types:
-----------
- **Unit Test (test_greetings_worker)**: Tests a single worker function in isolation
- **Integration Test (test_workflow_execution)**: Tests complete workflow with mocked dependencies
Running Tests:
--------------
python3 -m unittest discover --verbose --start-directory=./
python3 -m unittest examples.test_workflows.WorkflowUnitTest
Use Cases:
----------
- Validate workflow logic before deployment
- Test error handling and retry behavior
- Verify decision/switch conditions
- CI/CD pipeline integration
- Regression testing for workflow changes
"""
import unittest
from conductor.client.configuration.configuration import Configuration
from conductor.client.http.models.workflow_test_request import WorkflowTestRequest
from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.http_task import HttpTask
from conductor.client.workflow.task.simple_task import SimpleTask
from conductor.client.workflow.task.switch_task import SwitchTask
from examples.helloworld.greetings_worker import greet
class WorkflowUnitTest(unittest.TestCase):
    """
    Unit tests for Conductor workflows and workers.

    This test suite demonstrates:
    - Testing individual worker functions as plain Python callables
    - Testing complete workflow execution with mocked task outputs
    - Simulating task failures and retries
    - Validating workflow decision (switch) logic
    """

    @classmethod
    def setUpClass(cls) -> None:
        # Build shared Orkes clients once for the whole suite.
        # Configuration() reads server URL / credentials from the environment.
        api_config = Configuration()
        clients = OrkesClients(configuration=api_config)
        cls.workflow_executor = clients.get_workflow_executor()
        cls.workflow_client = clients.get_workflow_client()

    def test_greetings_worker(self):
        """
        Unit test for a worker function.

        Demonstrates:
        - Worker functions are regular Python functions that can be tested directly
        - No need to start worker processes or connect to a Conductor server
        - Fast, isolated testing of business logic
        - Can use standard Python testing tools (unittest, pytest, etc.)

        This approach is ideal for:
        - Testing worker logic in isolation
        - Running tests in CI/CD pipelines
        - Test-driven development (TDD)
        - Quick feedback during development
        """
        name = 'test'
        result = greet(name=name)
        self.assertEqual(f'Hello {name}', result)

    def test_workflow_execution(self):
        """
        Integration test for a complete workflow with mocked task outputs.

        Demonstrates:
        - Testing workflow logic without running actual workers
        - Mocking task outputs to simulate different scenarios
        - Testing retry behavior (task failure followed by success)
        - Testing decision/switch logic with different inputs
        - Validating workflow execution paths

        Key Benefits:
        - Fast execution (no actual task execution)
        - Deterministic results (mocked outputs)
        - No external dependencies (no worker processes)
        - Test error scenarios safely
        - Validate workflow structure and logic

        Workflow Structure:
        -------------------
        1. HTTP task (always succeeds)
        2. task1 (fails first, succeeds on retry with city='NYC')
        3. Switch decision based on task1.output('city')
        4. If city='NYC': execute task2
        5. Otherwise: execute task3

        Expected Flow:
        --------------
        HTTP -> task1 (FAILED) -> task1 (RETRY, COMPLETED) -> switch -> task2
        """
        # Create workflow with tasks
        wf = ConductorWorkflow(name='unit_testing_example', version=1, executor=self.workflow_executor)
        task1 = SimpleTask(task_def_name='hello', task_reference_name='hello_ref_1')
        task2 = SimpleTask(task_def_name='hello', task_reference_name='hello_ref_2')
        task3 = SimpleTask(task_def_name='hello', task_reference_name='hello_ref_3')

        # Switch decision: if city='NYC' -> task2, else -> task3
        decision = SwitchTask(task_ref_name='switch_ref', case_expression=task1.output('city'))
        decision.switch_case('NYC', task2)
        decision.default_case(task3)

        # HTTP task to simulate an external API call
        http = HttpTask(task_ref_name='http', http_input={'uri': 'https://orkes-api-tester.orkesconductor.com/api'})
        wf >> http
        wf >> task1 >> decision

        # Mock outputs for each task reference name.
        # task3 intentionally has no mock: the expected path never reaches it.
        task_ref_to_mock_output = {}

        # task1 has two attempts: first fails, second succeeds.
        # This exercises retry behavior.
        task_ref_to_mock_output[task1.task_reference_name] = [{
            'status': 'FAILED',
            'output': {
                'key': 'failed'
            }
        },
            {
                'status': 'COMPLETED',
                'output': {
                    'city': 'NYC'  # This triggers the switch to execute task2
                }
            }
        ]

        # task2 succeeds (executed because city='NYC')
        task_ref_to_mock_output[task2.task_reference_name] = [
            {
                'status': 'COMPLETED',
                'output': {
                    'key': 'task2.output'
                }
            }
        ]

        # HTTP task succeeds
        task_ref_to_mock_output[http.task_reference_name] = [
            {
                'status': 'COMPLETED',
                'output': {
                    'key': 'http.output'
                }
            }
        ]

        # Execute the workflow test with mocked outputs
        test_request = WorkflowTestRequest(name=wf.name, version=wf.version,
                                           task_ref_to_mock_output=task_ref_to_mock_output,
                                           workflow_def=wf.to_workflow_def())
        run = self.workflow_client.test_workflow(test_request=test_request)

        # Verify the workflow completed successfully
        print('completed the test run')
        print(f'status: {run.status}')
        self.assertEqual(run.status, 'COMPLETED')

        # Verify the HTTP task executed first
        # FIX: label the logged value correctly (it is the task type, not the status)
        print(f'first task type: {run.tasks[0].task_type}')
        self.assertEqual(run.tasks[0].task_type, 'HTTP')

        # Verify task1 failed on the first attempt (retry test)
        print(f'{run.tasks[1].reference_task_name} status: {run.tasks[1].status} (expected to be FAILED)')
        self.assertEqual(run.tasks[1].status, 'FAILED')

        # Verify task1 succeeded on retry
        # FIX: closing parenthesis was missing inside the message string
        print(f'{run.tasks[2].reference_task_name} status: {run.tasks[2].status} (expected to be COMPLETED)')
        self.assertEqual(run.tasks[2].status, 'COMPLETED')

        # Verify the switch decision executed task2 (because city='NYC').
        # Index 4: tasks[3] is the switch itself; tasks[4] is the chosen branch.
        # FIX: closing parenthesis was missing inside the message string
        print(f'{run.tasks[4].reference_task_name} status: {run.tasks[4].status} (expected to be COMPLETED)')
        self.assertEqual(run.tasks[4].status, 'COMPLETED')

        # Verify the correct branch was taken (task2, not task3)
        self.assertEqual(run.tasks[4].reference_task_name, task2.task_reference_name)