44import inspect
55import sys
66import time
7+ from concurrent .futures import Future
8+ from types import TracebackType
79from typing import Any
810
911import cloudpickle
1214from pytask import ExecutionReport
1315from pytask import hookimpl
1416from pytask import remove_internal_traceback_frames_from_exc_info
17+ from pytask import Session
18+ from pytask import Task
1519from pytask_parallel .backends import PARALLEL_BACKENDS
1620from rich .console import ConsoleOptions
1721from rich .traceback import Traceback
1822
1923
2024@hookimpl
21- def pytask_post_parse (config ) :
25+ def pytask_post_parse (config : dict [ str , Any ]) -> None :
2226 """Register the parallel backend."""
2327 if config ["parallel_backend" ] in ("loky" , "processes" ):
2428 config ["pm" ].register (ProcessesNameSpace )
@@ -27,7 +31,7 @@ def pytask_post_parse(config):
2731
2832
2933@hookimpl (tryfirst = True )
30- def pytask_execute_build (session ) :
34+ def pytask_execute_build (session : Session ) -> bool | None :
3135 """Execute tasks with a parallel backend.
3236
3337 There are three phases while the scheduler has tasks which need to be executed.
@@ -40,7 +44,7 @@ def pytask_execute_build(session):
4044 """
4145 if session .config ["n_workers" ] > 1 :
4246 reports = session .execution_reports
43- running_tasks = {}
47+ running_tasks : dict [ str , Future [ Any ]] = {}
4448
4549 parallel_backend = PARALLEL_BACKENDS [session .config ["parallel_backend" ]]
4650
@@ -137,13 +141,15 @@ def pytask_execute_build(session):
137141 break
138142
139143 return True
144+ return None
140145
141146
142147class ProcessesNameSpace :
143148 """The name space for hooks related to processes."""
144149
150+ @staticmethod
145151 @hookimpl (tryfirst = True )
146- def pytask_execute_task (session , task ): # noqa: N805
152+ def pytask_execute_task (session : Session , task : Task ) -> Future [ Any ] | None :
147153 """Execute a task.
148154
149155 Take a task, pickle it and send the bytes over to another process.
@@ -162,11 +168,15 @@ def pytask_execute_task(session, task): # noqa: N805
162168 show_locals = session .config ["show_locals" ],
163169 console_options = console .options ,
164170 )
171+ return None
165172
166173
167174def _unserialize_and_execute_task (
168- bytes_function , bytes_kwargs , show_locals , console_options
169- ):
175+ bytes_function : bytes ,
176+ bytes_kwargs : bytes ,
177+ show_locals : bool ,
178+ console_options : ConsoleOptions ,
179+ ) -> tuple [type [BaseException ], BaseException , str ] | None :
170180 """Unserialize and execute task.
171181
172182 This function receives bytes and unpickles them to a task which is them execute
@@ -184,11 +194,14 @@ def _unserialize_and_execute_task(
184194 exc_info = sys .exc_info ()
185195 processed_exc_info = _process_exception (exc_info , show_locals , console_options )
186196 return processed_exc_info
197+ return None
187198
188199
189200def _process_exception (
190- exc_info : tuple [Any ], show_locals : bool , console_options : ConsoleOptions
191- ) -> tuple [Any ]:
201+ exc_info : tuple [type [BaseException ], BaseException , TracebackType | None ],
202+ show_locals : bool ,
203+ console_options : ConsoleOptions ,
204+ ) -> tuple [type [BaseException ], BaseException , str ]:
192205 """Process the exception and convert the traceback to a string."""
193206 exc_info = remove_internal_traceback_frames_from_exc_info (exc_info )
194207 traceback = Traceback .from_exception (* exc_info , show_locals = show_locals )
@@ -200,8 +213,9 @@ def _process_exception(
200213class DefaultBackendNameSpace :
201214 """The name space for hooks related to threads."""
202215
216+ @staticmethod
203217 @hookimpl (tryfirst = True )
204- def pytask_execute_task (session , task ): # noqa: N805
218+ def pytask_execute_task (session : Session , task : Task ) -> Future [ Any ] | None :
205219 """Execute a task.
206220
207221 Since threads have shared memory, it is not necessary to pickle and unpickle the
@@ -211,9 +225,11 @@ def pytask_execute_task(session, task): # noqa: N805
211225 if session .config ["n_workers" ] > 1 :
212226 kwargs = _create_kwargs_for_task (task )
213227 return session .executor .submit (task .execute , ** kwargs )
228+ else :
229+ return None
214230
215231
216- def _create_kwargs_for_task (task ) :
232+ def _create_kwargs_for_task (task : Task ) -> dict [ Any , Any ] :
217233 """Create kwargs for task function."""
218234 kwargs = {** task .kwargs }
219235
0 commit comments