Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ repos:
args: [--wrap, "88"]
files: (README\.md)
- repo: https://github.com/crate-ci/typos
rev: v1
rev: v1.46.0
hooks:
- id: typos
- repo: meta
Expand Down
4 changes: 4 additions & 0 deletions docs/source/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ chronological order. Releases follow [semantic versioning](https://semver.org/)
releases are available on [PyPI](https://pypi.org/project/pytask-parallel) and
[Anaconda.org](https://anaconda.org/conda-forge/pytask-parallel).

## 0.5.3 - 2026-05-01

- {pull}`153` adds compatibility with the new pytask 0.6.0 release.

## 0.5.2 - 2026-02-06

- {pull}`129` drops support for Python 3.8 and 3.9 and adds support for Python 3.14.
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ unsafe-fixes = true

[tool.ruff.lint]
extend-ignore = [
"ANN401", # flake8-annotate typing.Any
"COM812", # Comply with ruff-format.
"ISC001", # Comply with ruff-format.
]
Expand Down
6 changes: 3 additions & 3 deletions src/pytask_parallel/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _configure_worker(root: str | None) -> None:
sys.path.insert(0, root)


def _deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any:
def _deserialize_and_run_with_cloudpickle(fn: bytes, kwargs: bytes) -> Any: # noqa: ANN401
"""Deserialize and execute a function and keyword arguments."""
deserialized_fn = cloudpickle.loads(fn)
deserialized_kwargs = cloudpickle.loads(kwargs)
Expand All @@ -77,8 +77,8 @@ def submit(
self,
fn: Callable[..., Any],
/,
*args: Any, # noqa: ARG002
**kwargs: Any,
*args: Any, # noqa: ANN401, ARG002
**kwargs: Any, # noqa: ANN401
) -> Future[Any]:
"""Submit a new task."""
return super().submit(
Expand Down
35 changes: 25 additions & 10 deletions src/pytask_parallel/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@
from pytask_parallel.wrappers import WrapperResult


def _get_task_from_dag(session: Session, task_name: str) -> PTask:
"""Get a task from the pre- and post-pytask 0.6 DAG representations for compat."""
node = session.dag.nodes[task_name]
task = node["task"] if isinstance(node, dict) else node

if not isinstance(task, PTask):
msg = f"Expected {task_name!r} to resolve to a task."
raise TypeError(msg)

return task


@hookimpl
def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR0912, PLR0915
"""Execute tasks with a parallel backend.
Expand Down Expand Up @@ -68,8 +80,13 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
with session.config["_parallel_executor"]:
sleeper = _Sleeper()

scheduler = session.scheduler
if scheduler is None:
msg = "Expected the scheduler to be initialized before executing tasks."
raise RuntimeError(msg)

i = 0
while session.scheduler.is_active():
while scheduler.is_active():
try:
newly_collected_reports = []

Expand All @@ -88,13 +105,11 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
n_new_tasks = session.config["n_workers"] - len(running_tasks)

ready_tasks = (
list(session.scheduler.get_ready(n_new_tasks))
if n_new_tasks >= 1
else []
list(scheduler.get_ready(n_new_tasks)) if n_new_tasks >= 1 else []
)

for task_name in ready_tasks:
task = session.dag.nodes[task_name]["task"]
task = _get_task_from_dag(session, task_name)
session.hook.pytask_execute_task_log_start(
session=session, task=task
)
Expand All @@ -111,7 +126,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
task, sys.exc_info()
)
newly_collected_reports.append(report)
session.scheduler.done(task_name)
scheduler.done(task_name)

if not ready_tasks:
sleeper.increment()
Expand All @@ -133,17 +148,17 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091
)

if wrapper_result.exc_info is not None:
task = session.dag.nodes[task_name]["task"]
task = _get_task_from_dag(session, task_name)
newly_collected_reports.append(
ExecutionReport.from_task_and_exception(
task,
cast("Any", wrapper_result.exc_info),
)
)
running_tasks.pop(task_name)
session.scheduler.done(task_name)
scheduler.done(task_name)
else:
task = session.dag.nodes[task_name]["task"]
task = _get_task_from_dag(session, task_name)
_update_carry_over_products(
task, wrapper_result.carry_over_products
)
Expand All @@ -161,7 +176,7 @@ def pytask_execute_build(session: Session) -> bool | None: # noqa: C901, PLR091

running_tasks.pop(task_name)
newly_collected_reports.append(report)
session.scheduler.done(task_name)
scheduler.done(task_name)

for report in newly_collected_reports:
session.hook.pytask_execute_task_process_report(
Expand Down
2 changes: 1 addition & 1 deletion src/pytask_parallel/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,6 @@ def load(self, is_product: bool = False) -> Path: # noqa: ARG002, FBT001, FBT00
self.node.path = path
return self.node.load(is_product=self.is_product)

def save(self, value: Any) -> None:
def save(self, value: Any) -> None: # noqa: ANN401
"""Save strings or bytes to file."""
self.value = value
4 changes: 2 additions & 2 deletions src/pytask_parallel/typing.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Contains functions related to typing."""

from pathlib import Path
from pathlib import PosixPath
from pathlib import WindowsPath
from typing import Any
from typing import NamedTuple

from pytask import PTask
Expand All @@ -16,7 +16,7 @@ def is_coiled_function(task: PTask) -> bool:
return "coiled_kwargs" in task.attributes


def is_local_path(path: Path) -> bool:
def is_local_path(path: Any) -> bool: # noqa: ANN401
"""Check if a path is local."""
return isinstance(path, (FilePath, PosixPath, WindowsPath))

Expand Down
2 changes: 1 addition & 1 deletion src/pytask_parallel/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def _safe_load(
*,
is_product: bool,
remote: bool,
) -> Any:
) -> Any: # noqa: ANN401
"""Load a node and catch exceptions."""
_rich_traceback_guard = True
# Get the argument name like "path" or "return" for function returns.
Expand Down
13 changes: 8 additions & 5 deletions src/pytask_parallel/wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class WrapperResult:
stderr: str


def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperResult:
def wrap_task_in_thread(task: PTask, *, remote: bool, **kwargs: Any) -> WrapperResult: # noqa: ANN401
"""Mock execution function such that it returns the same as for processes.

The function for processes returns ``warning_reports`` and an ``exception``. With
Expand Down Expand Up @@ -191,7 +191,7 @@ def rewrap_task_with_coiled_function(task: PTask) -> CoiledFunction:
return cast("CoiledFunction", decorated)


def _raise_exception_on_breakpoint(*args: Any, **kwargs: Any) -> None: # noqa: ARG001
def _raise_exception_on_breakpoint(*args: Any, **kwargs: Any) -> None: # noqa: ANN401, ARG001
msg = (
"You cannot use 'breakpoint()' or 'pdb.set_trace()' while parallelizing the "
"execution of tasks with pytask-parallel. Please, remove the breakpoint or run "
Expand Down Expand Up @@ -223,11 +223,14 @@ def _render_traceback_to_string(
traceback = Traceback(exc_info, show_locals=show_locals)
segments = console.render(cast("Any", traceback), options=console_options)
text = "".join(segment.text for segment in segments)
return (*exc_info[:2], text)
return exc_info[0], exc_info[1], text


def _handle_function_products(
task: PTask, out: Any, *, remote: bool = False
task: PTask,
out: Any, # noqa: ANN401
*,
remote: bool = False,
) -> PyTree[CarryOverPath | PythonNode | None]:
"""Handle the products of the task.

Expand Down Expand Up @@ -310,7 +313,7 @@ def _delete_local_files_on_remote(kwargs: dict[str, PyTree[Any]]) -> None:

"""

def _delete(potential_node: Any) -> None:
def _delete(potential_node: Any) -> None: # noqa: ANN401
if isinstance(potential_node, RemotePathNode):
with suppress(OSError):
os.close(potential_node.fd)
Expand Down