Skip to content

Commit ec57990

Browse files
committed
added reversal coordinator
1 parent 4a20e2c commit ec57990

2 files changed

Lines changed: 87 additions & 0 deletions

File tree

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from temporalio import workflow
2+
3+
4+
@workflow.defn
5+
class ReversalCoordinator:
6+
7+
def __init__(self, list_of_ids: list):
8+
9+
self.list_of_ids = list_of_ids
10+
self.exit = False
11+
12+
@workflow.run
13+
async def run(self):
14+
15+
await condition(self.exit)
16+
17+
@workflow.signal
18+
def registerworkflowid(self, workflow_id: str):
19+
self.list_of_ids.append(workflow_id)
20+
21+
if continue_as_new_recommended
22+
continue_as_new(self.list_of_ids)
23+
24+
25+
@workflow.signal
26+
def decision_made(self, decision: bool):
27+
for id in self.list_of_ids:
28+
handle = get_workflow_handle(id)
29+
handle.send_signal("decision_made", decision)
30+
31+
self.exit = True
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from temporalio import workflow, activity
2+
3+
4+
@workflow.defn
5+
class ReversalWorkflow:
6+
7+
def __init__(self, merchant):
8+
self.merchant = merchant
9+
self.decision_received = False
10+
self.decision_should_continue_reversal = False
11+
12+
@workflow.run
13+
async def run(self):
14+
15+
if rate_limited():
16+
reversal_coordinator_handle = get_or_make_reversal_coordinator()
17+
18+
register_with_handler(reversal_coordinator_handle)
19+
20+
await condition(self.decision_received)
21+
22+
if self.decision_should_continue_reversal:
23+
# continue reversal
24+
pass
25+
else:
26+
# complete
27+
pass
28+
29+
@activity.defn
30+
def get_or_make_reversal_coordinator(self):
31+
32+
while True:
33+
try:
34+
workflow_handle = get_workflow_handle("reversal-coordinator-" + self.merchant)
35+
return workflow_handle
36+
except WorkflowExecutionNotFound:
37+
try:
38+
workflow_handle = start_child_workflow(
39+
ReversalCoordinator.run,
40+
arg=[],
41+
id="reversal-coordinator-" + self.merchant,
42+
task_queue="reversal-coordinator-task-queue",
43+
parent_close_policy='abandon'
44+
)
45+
return workflow_handle
46+
except WorkflowExecutionAlreadyStarted:
47+
continue
48+
49+
@activity.defn
50+
def register_with_handler(self, handle):
51+
handle.signal("registerworkflowid", workflow.id)
52+
53+
@workflow.signal
54+
def decision_made(self, decision_result: bool):
55+
self.decision_received = True
56+
self.decision_should_continue_reversal = decision_result

0 commit comments

Comments
 (0)