11import asyncio
2- import time
3- from concurrent .futures import ThreadPoolExecutor
42from dataclasses import dataclass
53from datetime import timedelta
6- from threading import Thread
74
85from temporalio import activity , workflow
96from temporalio .client import Client
@@ -17,39 +14,42 @@ class ComposeGreetingInput:
1714
1815
1916class GreetingComposer :
20- def __init__ (self , client : Client , loop : asyncio . AbstractEventLoop ) -> None :
17+ def __init__ (self , client : Client ) -> None :
2118 self .client = client
22- self .loop = loop
2319
2420 @activity .defn
25- def compose_greeting (self , input : ComposeGreetingInput ) -> str :
26- # Make a thread to complete this externally . This could be done in
21+ async def compose_greeting (self , input : ComposeGreetingInput ) -> str :
22+ # Schedule a task to complete this asynchronously . This could be done in
2723 # a completely different process or system.
2824 print ("Completing activity asynchronously" )
29- Thread (
30- target = self .complete_greeting ,
31- args = (activity .info ().task_token , input ),
32- ).start ()
25+ # Tasks stored by asyncio are weak references and therefore can get GC'd
26+ # which can cause warnings like "Task was destroyed but it is pending!".
27+ # So we store the tasks ourselves.
28+ # See https://docs.python.org/3/library/asyncio-task.html#creating-tasks,
29+ # https://bugs.python.org/issue21163 and others.
30+ _ = asyncio .create_task (
31+ self .complete_greeting (activity .info ().task_token , input )
32+ )
3333
3434 # Raise the complete-async error which will complete this function but
3535 # does not consider the activity complete from the workflow perspective
3636 activity .raise_complete_async ()
3737
38- def complete_greeting (self , task_token : bytes , input : ComposeGreetingInput ) -> None :
38+ async def complete_greeting (
39+ self , task_token : bytes , input : ComposeGreetingInput
40+ ) -> None :
3941 # Let's wait three seconds, heartbeating each second. Note, heartbeating
4042 # during async activity completion is done via the client directly. It
4143 # is often important to heartbeat so the server can know when an
4244 # activity has crashed.
4345 handle = self .client .get_async_activity_handle (task_token = task_token )
4446 for _ in range (0 , 3 ):
4547 print ("Waiting one second..." )
46- asyncio . run_coroutine_threadsafe ( handle .heartbeat (), self . loop ). result ()
47- time .sleep (1 )
48+ await handle .heartbeat ()
49+ await asyncio .sleep (1 )
4850
4951 # Complete using the handle
50- asyncio .run_coroutine_threadsafe (
51- handle .complete (f"{ input .greeting } , { input .name } !" ), self .loop
52- ).result ()
52+ await handle .complete (f"{ input .greeting } , { input .name } !" )
5353
5454
5555@workflow .defn
@@ -70,16 +70,13 @@ async def main():
7070 # Start client
7171 client = await Client .connect ("localhost:7233" )
7272
73- loop = asyncio .get_event_loop ()
74-
7573 # Run a worker for the workflow
76- composer = GreetingComposer (client , loop )
74+ composer = GreetingComposer (client )
7775 async with Worker (
7876 client ,
7977 task_queue = "hello-async-activity-completion-task-queue" ,
8078 workflows = [GreetingWorkflow ],
8179 activities = [composer .compose_greeting ],
82- activity_executor = ThreadPoolExecutor (5 ),
8380 ):
8481
8582 # While the worker is running, use the client to run the workflow and
0 commit comments