Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/source/dev_api_wfinstances.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ wfcommons.wfinstances.logs.makeflow
:private-members:
:noindex:

wfcommons.wfinstances.logs.taskvine
-----------------------------------

.. automodule:: wfcommons.wfinstances.logs.taskvine
:members:
:undoc-members:
:show-inheritance:
:private-members:
:noindex:

wfcommons.wfinstances.logs.nextflow
-----------------------------------

Expand Down
8 changes: 8 additions & 0 deletions docs/source/user_api_wfbench.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ wfcommons.wfbench.translator.cwl
:undoc-members:
:show-inheritance:

wfcommons.wfbench.translator.streamflow
---------------------------------------

.. automodule:: wfcommons.wfbench.translator.streamflow
:members:
:undoc-members:
:show-inheritance:

wfcommons.wfbench.translator.dask
---------------------------------

Expand Down
1 change: 1 addition & 0 deletions tests/translators_loggers/Dockerfile.streamflow
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ RUN apt-get -y install vim --fix-missing
RUN apt-get -y install gcc
RUN apt-get -y install gcc-multilib
RUN apt-get -y install graphviz libgraphviz-dev
RUN apt-get -y install zip



Expand Down
10 changes: 8 additions & 2 deletions tests/translators_loggers/test_translators_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@ def run_workflow_streamflow(container, num_tasks, str_dirpath):
uuid = output.decode().splitlines()[1].strip().split(" ")[0]
exit_code, output = container.exec_run(cmd=f"streamflow prov {uuid}",
user="wfcommons", stdout=True, stderr=True)
exit_code, output = container.exec_run(cmd=f"mkdir RO-Crate",
user="wfcommons", stdout=True, stderr=True)
exit_code, output = container.exec_run(cmd=f"unzip *.zip -d ./RO-Crate",
user="wfcommons", stdout=True, stderr=True)

def run_workflow_pegasus(container, num_tasks, str_dirpath):
# Run the workflow!
Expand Down Expand Up @@ -330,8 +334,10 @@ def test_translator(self, backend) -> None:
parser = TaskVineLogsParser(dirpath / "vine-run-info/most-recent/vine-logs", filenames_to_ignore=["cpu-benchmark","stress-ng", "wfbench"])
elif backend == "makeflow":
parser = MakeflowLogsParser(execution_dir = dirpath, resource_monitor_logs_dir = dirpath / "monitor_data/")
# elif backend == "streamflow":
# parser =ROCrateLogsParser(dirpath / "work/wfcommons/most-recent/wfbench")
elif backend == "streamflow":
parser = ROCrateLogsParser(dirpath / "RO-Crate",
steps_to_ignore=["main.cwl#compile_output_files", "main.cwl#compile_log_files"],
file_extensions_to_ignore=[".out", ".err"])

if parser is not None:
sys.stderr.write(f"[{backend}] Parsing the logs...\n")
Expand Down
1,509 changes: 0 additions & 1,509 deletions uv.lock

This file was deleted.

6 changes: 6 additions & 0 deletions wfcommons/wfbench/translator/streamflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ def __init__(self,
super().__init__(workflow, logger)

def translate(self, output_folder: pathlib.Path) -> None:
"""
Translate a workflow benchmark description (WfFormat) into an actual workflow application.

:param output_folder: The path to the folder in which the workflow benchmark will be generated.
:type output_folder: pathlib.Path
"""
# Perform the CWL translation (which will create the output folder)
from wfcommons.wfbench import CWLTranslator
cwl_translator = CWLTranslator(workflow=self.workflow, logger=self.logger)
Expand Down
102 changes: 81 additions & 21 deletions wfcommons/wfinstances/logs/ro_crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,18 @@ class ROCrateLogsParser(LogsParser):
def __init__(self,
crate_dir: pathlib.Path,
description: Optional[str] = None,
logger: Optional[Logger] = None) -> None:
logger: Optional[Logger] = None,
steps_to_ignore: Optional[list[str]]=[],
file_extensions_to_ignore: Optional[list[str]]=[],
) -> None:
"""Create an object of the RO crate parser."""

# TODO: Decide if these should be RO crate or Streamflow or whatever
super().__init__('Streamflow-ROCrate', 'https://w3id.org/workflowhub/workflow-ro-crate/1.0', description, logger)

# Sanity check
if steps_to_ignore is None:
steps_to_ignore = []
if not crate_dir.is_dir():
raise OSError(f'The provided path does not exist or is not a folder: {crate_dir}')

Expand All @@ -63,6 +68,12 @@ def __init__(self,

self.file_objects = {}

self.task_id_name_map: dict[str, str] = {}
self.data_file_id_name_map: dict[str, str] = {}

self.steps_to_ignore = steps_to_ignore
self.file_extensions_to_ignore = file_extensions_to_ignore


def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
"""
Expand All @@ -89,16 +100,29 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
# Dictionary of ro-crate objects by "@id"
self.lookup = {item["@id"]: item for item in self.graph_data}

# Dictionary of application data files
self._construct_data_file_id_name_map()

# Find id of the main workflow
overview = self.lookup.get("./")
main_workflow_id = overview.get("mainEntity").get("@id")

create_actions = list(filter((lambda x: x.get('@type') == "CreateAction"), self.graph_data))
self._create_tasks(create_actions, main_workflow_id)

return self.workflow


return self.workflow
def _construct_data_file_id_name_map(self):
for item in self.graph_data:
if item["@type"] != "File":
continue
id = item["@id"]
if "alternateName" not in item:
continue
alternate_name = item["alternateName"]
self.data_file_id_name_map[id] = alternate_name


def _create_tasks(self, create_actions, main_workflow_id):
# Object to track dependencies between tasks based on files
Expand All @@ -113,6 +137,18 @@ def _create_tasks(self, create_actions, main_workflow_id):
self._process_main_workflow(create_action)
continue

create_action['name'] = create_action['name'].removeprefix("Run of workflow/")
# print("***************************************")
# print("DEALING WITH TASK:", create_action['name'])

# Below would remove the "file.cwl#" tag, which runs the risk
# of non-uniqueness of action names perhaps
# create_action['name'] = create_action['name'].split('#', 1)[-1]

# Check if we should ignore this step
if create_action["name"] in self.steps_to_ignore:
continue

# Get all input & output for the create_action
input = [obj['@id'] for obj in create_action['object']]
output = [obj['@id'] for obj in create_action['result']]
Expand All @@ -121,17 +157,19 @@ def _create_tasks(self, create_actions, main_workflow_id):
input_files = self._filter_file_ids(input)
output_files = self._filter_file_ids(output)

create_action['name'] = create_action['name'].removeprefix("Run of workflow/")

task = Task(name=create_action['name'],
task_id=create_action['@id'],
task_id=create_action['name'],
# task_id=create_action['name'] + "_" + create_action['@id'],
task_type=TaskType.COMPUTE,
runtime=self._time_diff(create_action['startTime'], create_action['endTime']),
executed_at=create_action['startTime'],
input_files=self._get_file_objects(input_files),
output_files=self._get_file_objects(output_files),
logger=self.logger)
self.workflow.add_task(task)
self.task_id_name_map[create_action['@id']] = create_action['name']
# self.task_id_name_map[create_action['@id']] = create_action['name'] + "_" + create_action['@id']

# For each file, track which task(s) it is in/output for
for infile in input_files:
Expand Down Expand Up @@ -159,23 +197,32 @@ def _create_tasks(self, create_actions, main_workflow_id):
self._add_dependencies(files, instruments)

def _add_dependencies(self, files, instruments):

# File dependencies
for file in files.values():
for parent in file.get('out', []):
for child in file.get('in', []):
self.workflow.add_dependency(parent, child)

# Assumes
parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
for parameter_connection in parameter_connections:
source = parameter_connection["sourceParameter"]["@id"]
source = source.rsplit("#", 1)[0] # Trim to get instrument
self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])

# THIS IS COMMENTED OUT AS IT SEEMS TO ADD TONS OF NON-EXISTING DEPENDENCIES ON WORKFLOW BENCHMARKS
# (FOR INSTANCE, IT TOTALLY BREAKS THE BENCHMARK WORKFLOW DUE TO ALL OF THEM USING shell.cwl#output_files)
# parameter_connections = list(filter((lambda x: x.get('@type') == "ParameterConnection"), self.graph_data))
# for parameter_connection in parameter_connections:
# # parameter_connection["sourceParameter"] is either a single dict or a list of dicts,
# # which is bad design but whatever
# source_parameters = parameter_connection["sourceParameter"]
# if not isinstance(source_parameters, list):
# source_parameters = [source_parameters]
# source = item["@id"]
# source = source.rsplit("#", 1)[0] # Trim to get instrument
#
# target = parameter_connection["targetParameter"]["@id"]
# target = target.rsplit("#", 1)[0] # Trim to get instrument
#
# for parent in instruments.get(source, []):
# for child in instruments.get(target, []):
# self.workflow.add_dependency(self.task_id_name_map[parent], self.task_id_name_map[child])

target = parameter_connection["targetParameter"]["@id"]
target = target.rsplit("#", 1)[0] # Trim to get instrument

for parent in instruments.get(source, []):
for child in instruments.get(target, []):
self.workflow.add_dependency(parent, child)

def _time_diff(self, start_time, end_time):
diff = datetime.fromisoformat(end_time) - datetime.fromisoformat(start_time)
Expand All @@ -186,19 +233,20 @@ def _get_file_objects(self, files):
output = []
for file in files:
if file not in self.file_objects:
self.file_objects[file] = File(file_id=file,
self.file_objects[file] = File(file_id=self.data_file_id_name_map[file],
size=os.path.getsize(f"{self.crate_dir}/{file}"),
logger=self.logger)
output.append(self.file_objects[file])
return output

def _filter_file_ids(self, ids):
# Given a list of "@id"s, returns those with the File type as well as unpacks PropertyValue into Files.
file_ids = list(filter(lambda x: self.lookup.get(x)['@type'] == 'File', ids))

file_ids = list(filter(lambda x: self.lookup.get(x)['@type'] == 'File', ids))
property_value_ids = list(filter(lambda x: self.lookup.get(x)['@type'] == 'PropertyValue', ids))
for property_value_id in property_value_ids:
property_values = self.lookup.get(property_value_id)['value']
if isinstance(property_values, dict):
property_values = [property_values]

# Filter out values without "@id"s (i.e. int values, etc.)
pv_contained_ids = list(filter(lambda x: isinstance(x, dict) and "@id" in x, property_values))
Expand All @@ -210,7 +258,19 @@ def _filter_file_ids(self, ids):
# Filter duplicates while adding
file_ids = list(set(file_ids + pv_filtered_ids))

return file_ids
# Removing files based on file extensions
to_return = []
for file_id in file_ids:
to_ignore = False
for suffix in self.file_extensions_to_ignore:
if self.data_file_id_name_map[file_id].endswith(suffix):
to_ignore = True
break
if not to_ignore:
to_return.append(file_id)

return to_return

def _process_main_workflow(self, main_workflow):
self.workflow.makespan = self._time_diff(main_workflow['startTime'], main_workflow['endTime'])
self.workflow.executed_at = main_workflow['startTime']