22import copy
33import functools
44import subprocess
5+ from pathlib import Path
56from typing import Iterable
67from typing import Optional
78from typing import Union
1213from _pytask .nodes import FilePathNode
1314from _pytask .nodes import PythonFunctionTask
1415from _pytask .parametrize import _copy_func
15- from _pytask .shared import to_list
1616
1717
1818def r (options : Optional [Union [str , Iterable [str ]]] = None ):
@@ -31,10 +31,9 @@ def r(options: Optional[Union[str, Iterable[str]]] = None):
3131 return options
3232
3333
34- def run_r_script (depends_on , r ):
34+ def run_r_script (r ):
3535 """Run an R script."""
36- script = to_list (depends_on )[0 ]
37- subprocess .run (["Rscript" , script .as_posix (), * r ], check = True )
36+ subprocess .run (r , check = True )
3837
3938
4039@hookimpl
@@ -50,32 +49,37 @@ def pytask_collect_task(session, path, name, obj):
5049 task = PythonFunctionTask .from_path_name_function_session (
5150 path , name , obj , session
5251 )
52+
53+ return task
54+
55+
56+ @hookimpl
57+ def pytask_collect_task_teardown (session , task ):
58+ """Perform some checks."""
59+ if get_specific_markers_from_task (task , "r" ):
60+ source = _get_node_from_dictionary (task .depends_on , "source" )
61+ if isinstance (source , FilePathNode ) and source .value .suffix not in [".r" , ".R" ]:
62+ raise ValueError (
63+ "The first dependency of an R task must be the executable script."
64+ )
65+
5366 r_function = _copy_func (run_r_script )
5467 r_function .pytaskmark = copy .deepcopy (task .function .pytaskmark )
5568
5669 merged_marks = _merge_all_markers (task )
5770 args = r (* merged_marks .args , ** merged_marks .kwargs )
58- r_function = functools .partial (r_function , r = args )
71+ options = _prepare_cmd_options (session , task , args )
72+ r_function = functools .partial (r_function , r = options )
5973
6074 task .function = r_function
6175
62- return task
63-
6476
65- @hookimpl
66- def pytask_collect_task_teardown (task ):
67- """Perform some checks.
68-
69- Remove is task is none check with pytask 0.0.9.
70-
71- """
72- if task is not None and get_specific_markers_from_task (task , "r" ):
73- if isinstance (task .depends_on [0 ], FilePathNode ) and task .depends_on [
74- 0
75- ].value .suffix not in [".r" , ".R" ]:
76- raise ValueError (
77- "The first dependency of an R task must be the executable script."
78- )
77+ def _get_node_from_dictionary (obj , key , fallback = 0 ):
78+ if isinstance (obj , Path ):
79+ pass
80+ elif isinstance (obj , dict ):
81+ obj = obj .get (key ) or obj .get (fallback )
82+ return obj
7983
8084
8185def _merge_all_markers (task ):
@@ -85,3 +89,14 @@ def _merge_all_markers(task):
8589 for mark_ in r_marks [1 :]:
8690 mark = mark .combined_with (mark_ )
8791 return mark
92+
93+
94+ def _prepare_cmd_options (session , task , args ):
95+ """Prepare the command line arguments to execute the do-file.
96+
97+ The last entry changes the name of the log file. We take the task id as a name which
98+ is unique and does not cause any errors when parallelizing the execution.
99+
100+ """
101+ source = _get_node_from_dictionary (task .depends_on , session .config ["r_source_key" ])
102+ return ["Rscript" , source .value .as_posix (), * args ]
0 commit comments