Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pandaserver/workflow/psnakemake_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ def main():
logging.info(f"{os.path.basename(__file__)}: workflow_file = {workflow_file}")
parser = Parser(workflow_file, level=logging.DEBUG)
nodes, root_in = parser.parse_nodes()
_ = parser.parse_code()
dot_data = parser.get_dot_data()
logging.info(f"dot data ={os.linesep}{dot_data}")
s_id, t_nodes, nodes = resolve_nodes(nodes, root_in, data, 0, set(), sys.argv[2], logging)
Expand Down
58 changes: 26 additions & 32 deletions pandaserver/workflow/snakeparser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
import pathlib
import re
from itertools import chain
from pathlib import Path
from types import SimpleNamespace

import snakemake.dag
import snakemake.parser
import snakemake.persistence
import snakemake.workflow
from snakemake.api import (
OutputSettings,
ResourceSettings,
SnakemakeApi,
StorageSettings,
)

from pandaserver.workflow.snakeparser.utils import ParamRule, param_of
from pandaserver.workflow.workflow_utils import ConditionItem, Node

Expand Down Expand Up @@ -59,32 +63,39 @@ def __init__(self, workflow_file, level=None, logger=None):
snakefile = os.path.abspath(workflow_file)
workdir = os.path.dirname(snakefile)
self._logger.debug("create workflow")
self._workflow = snakemake.workflow.Workflow(snakefile=snakefile, overwrite_workdir=None)
# create workflow through API
with SnakemakeApi(
OutputSettings(
verbose=False,
show_failed_logs=True,
),
) as snakemake_api:
workflow_api = snakemake_api.workflow(
storage_settings=StorageSettings(),
resource_settings=ResourceSettings(),
snakefile=Path(snakefile),
)
dag_api = workflow_api.dag()
self._workflow = workflow_api._workflow
self._workflow.default_target = "all"
self._workflow.overwrite_workdir = None
current_workdir = os.getcwd()
try:
inject()
self._workflow.workdir(workdir)
self._workflow.include(self._workflow.main_snakefile, overwrite_default_target=True)
finally:
if current_workdir:
os.chdir(current_workdir)
# build DAG
dag_api.unlock()
self._dag = self._workflow.dag

@property
def jobs(self):
if self._dag is None:
return list()
return self._dag.jobs

def parse_code(self):
if self._workflow is None:
return None
code, _, __ = snakemake.parser.parse(
snakemake.workflow.GenericSourceFile(self._workflow.main_snakefile),
self._workflow,
)
return code

def parse_nodes(self, in_loop=False):
try:
return self._parse_nodes(in_loop)
Expand All @@ -102,8 +113,6 @@ def parse_nodes(self, in_loop=False):
raise ex

def _parse_nodes(self, in_loop):
if self._dag is None:
self._build_dag()
root_job = next(filter(lambda o: o.rule.name == self._workflow.default_target, self.jobs))
root_inputs = {Parser._extract_job_id(self._define_id(name)): value for name, value in root_job.params.items()}
root_outputs = set(
Expand Down Expand Up @@ -269,23 +278,8 @@ def verify_workflow(self):
return True

def get_dot_data(self):
if self._dag is None:
self._build_dag()
return str(self._dag)

def _build_dag(self):
target_rules = list(filter(lambda o: o.name == self._workflow.default_target, self._workflow.rules))
self._dag = snakemake.dag.DAG(
self._workflow,
rules=self._workflow.rules,
targetrules=target_rules,
targetfiles=set(),
)
self._workflow.persistence = snakemake.persistence.Persistence(dag=self._dag)
self._dag.init()
self._dag.update_checkpoint_dependencies()
self._dag.check_dynamic()

@staticmethod
def _extract_job_id(job_id):
if not job_id:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ dependencies = [
'ruamel.yaml',
'cwl-utils>=0.13',
'packaging',
'snakemake==7.30.1',
'snakemake>=9.14.5',
'numpy',
'scipy',
'werkzeug',
Expand Down