-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathkitchensink.py
More file actions
135 lines (112 loc) · 4.96 KB
/
kitchensink.py
File metadata and controls
135 lines (112 loc) · 4.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
Kitchen Sink Example
====================
Comprehensive example demonstrating all major workflow task types and patterns.
What it does:
-------------
- HTTP Task: Make external API calls
- JavaScript Task: Execute inline JavaScript code
- JSON JQ Task: Transform JSON using JQ queries
- Switch Task: Conditional branching based on values
- Wait Task: Pause workflow execution
- Set Variable Task: Store values in workflow variables
- Terminate Task: End workflow with specific status
- Custom Worker Task: Execute Python business logic
Use Cases:
----------
- Learning all available task types
- Building complex workflows with multiple task patterns
- Testing different control flow mechanisms (switch, terminate)
- Understanding how to combine system tasks with custom workers
Key Concepts:
-------------
- System Tasks: Built-in tasks (HTTP, JavaScript, JQ, Wait, etc.)
- Control Flow: Switch for branching, Terminate for early exit
- Data Transformation: JQ for JSON manipulation
- Worker Integration: Mix system tasks with custom Python workers
- Variable Management: Set and use workflow variables
This example is a "kitchen sink" showing all major features in one workflow.
"""
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.orkes_clients import OrkesClients
from conductor.client.worker.worker_task import worker_task
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.http_task import HttpTask
from conductor.client.workflow.task.javascript_task import JavascriptTask
from conductor.client.workflow.task.json_jq_task import JsonJQTask
from conductor.client.workflow.task.set_variable_task import SetVariableTask
from conductor.client.workflow.task.switch_task import SwitchTask
from conductor.client.workflow.task.terminate_task import TerminateTask, WorkflowStatus
from conductor.client.workflow.task.wait_task import WaitTask
@worker_task(task_definition_name='route')
def route(country: str) -> str:
return f'routing the packages to {country}'
def start_workers(api_config):
task_handler = TaskHandler(
workers=[],
configuration=api_config,
scan_for_annotated_workers=True
)
task_handler.start_processes()
return task_handler
def main():
api_config = Configuration()
clients = OrkesClients(configuration=api_config)
workflow_executor = clients.get_workflow_executor()
task_handler = start_workers(api_config)
wf = ConductorWorkflow(name='kitchensink2', version=1, executor=workflow_executor)
say_hello_js = """
function greetings() {
return {
"text": "hello " + $.name,
"url": "https://orkes-api-tester.orkesconductor.com/api"
}
}
greetings();
"""
js = JavascriptTask(task_ref_name='hello_script', script=say_hello_js, bindings={'name': '${workflow.input.name}'})
# If using Orkes, remove the line
js.input_parameter('evaluatorType', 'javascript')
http_call = HttpTask(task_ref_name='call_remote_api', http_input={
'uri': 'https://orkes-api-tester.orkesconductor.com/api'
})
sub_workflow = ConductorWorkflow(name='sub0', executor=workflow_executor)
sub_workflow >> HttpTask(task_ref_name='call_remote_api', http_input={
'uri': sub_workflow.input('uri')
}) >> WaitTask(task_ref_name="wait_forever", wait_for_seconds=2)
sub_workflow.input_parameters({
'uri': js.output('url')
})
wait_for_two_sec = WaitTask(task_ref_name='wait_for_2_sec', wait_for_seconds=2)
jq_script = """
{ key3: (.key1.value1 + .key2.value2) }
"""
jq = JsonJQTask(task_ref_name='jq_process', script=jq_script)
jq.input_parameters.update({
'key1': {'value1': ['a', 'b']},
'key2': {'value2': ['d', 'e']}
})
set_wf_var = SetVariableTask(task_ref_name='set_wf_var_ref')
set_wf_var.input_parameters.update({
'var1': 'value1',
'var2': 42,
'var3': ['a', 'b', 'c']
})
switch = SwitchTask(task_ref_name='decide', case_expression=wf.input('country'))
switch.switch_case('US', route(task_ref_name='us_routing', country=wf.input('country')))
switch.switch_case('CA', route(task_ref_name='ca_routing', country=wf.input('country')))
switch.default_case(TerminateTask(task_ref_name='bad_country_Ref', termination_reason='unsupported country',
status=WorkflowStatus.TERMINATED))
wf >> js >> [sub_workflow, [http_call, wait_for_two_sec]] >> jq >> set_wf_var >> switch
wf.output_parameters({
'greetings': js.output()
})
result = wf.execute(workflow_input={'name': 'Orkes', 'country': 'US'})
op = result.output
print(f'\n\nWorkflow output: {op}\n\n')
print(f'\n\nWorkflow status: {result.status}\n\n')
print(f'See the execution at {api_config.ui_host}/execution/{result.workflow_id}')
task_handler.stop_processes()
if __name__ == '__main__':
main()