22from __future__ import annotations
33
44import enum
5+ from concurrent .futures import Future
56from concurrent .futures import ProcessPoolExecutor
67from concurrent .futures import ThreadPoolExecutor
8+ from typing import Any
9+ from typing import Callable
10+
11+ import cloudpickle
12+
13+
14+ def deserialize_and_run_with_cloudpickle (
15+ fn : Callable [..., Any ], kwargs : dict [str , Any ]
16+ ) -> Any :
17+ """Deserialize and execute a function and keyword arguments."""
18+ deserialized_fn = cloudpickle .loads (fn )
19+ deserialized_kwargs = cloudpickle .loads (kwargs )
20+ return deserialized_fn (** deserialized_kwargs )
21+
22+
23+ class CloudpickleProcessPoolExecutor (ProcessPoolExecutor ):
24+ """Patches the standard executor to serialize functions with cloudpickle."""
25+
26+ # The type signature is wrong for version above Py3.7. Fix when 3.7 is deprecated.
27+ def submit ( # type: ignore[override]
28+ self , fn : Callable [..., Any ], * args : Any , ** kwargs : Any # noqa: ARG002
29+ ) -> Future [Any ]:
30+ """Submit a new task."""
31+ return super ().submit (
32+ deserialize_and_run_with_cloudpickle ,
33+ fn = cloudpickle .dumps (fn ),
34+ kwargs = cloudpickle .dumps (kwargs ),
35+ )
736
837
938try :
@@ -20,7 +49,7 @@ class ParallelBackendChoices(enum.Enum):
2049 PARALLEL_BACKENDS_DEFAULT = ParallelBackendChoices .PROCESSES
2150
2251 PARALLEL_BACKENDS = {
23- ParallelBackendChoices .PROCESSES : ProcessPoolExecutor ,
52+ ParallelBackendChoices .PROCESSES : CloudpickleProcessPoolExecutor ,
2453 ParallelBackendChoices .THREADS : ThreadPoolExecutor ,
2554 }
2655
@@ -36,7 +65,7 @@ class ParallelBackendChoices(enum.Enum): # type: ignore[no-redef]
3665 PARALLEL_BACKENDS_DEFAULT = ParallelBackendChoices .PROCESSES
3766
3867 PARALLEL_BACKENDS = {
39- ParallelBackendChoices .PROCESSES : ProcessPoolExecutor ,
68+ ParallelBackendChoices .PROCESSES : CloudpickleProcessPoolExecutor ,
4069 ParallelBackendChoices .THREADS : ThreadPoolExecutor ,
4170 ParallelBackendChoices .LOKY : ( # type: ignore[attr-defined]
4271 get_reusable_executor
0 commit comments