Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 58 additions & 16 deletions Snakefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
from spras import runner
import shutil
import json
import yaml
from pathlib import Path
from spras.containers import TimeoutError
from spras.dataset import Dataset
from spras.evaluation import Evaluation
from spras.analysis import ml, summary, cytoscape
Expand Down Expand Up @@ -262,6 +265,22 @@ def collect_prepared_input(wildcards):

return prepared_inputs

def mark_error(file, **details):
    """Marks a file as an error with associated details.

    Writes a JSON object of the form {"status": "error", ...details}
    to `file`, which `is_error` later recognizes.
    """
    payload = {"status": "error"}
    payload.update(details)
    with open(file, 'w') as handle:
        json.dump(payload, handle)

def is_error(file):
    """Checks if a file was produced by mark_error.

    Returns True only when `file` contains a JSON object whose "status"
    key equals "error"; anything unparsable or non-matching (e.g. a
    genuine raw pathway file) is treated as a successful output.
    """
    try:
        with open(file, 'r') as f:
            # Bug fix: the original evaluated this comparison without
            # `return`, so the function always returned None (falsy) and
            # error files were never detected/filtered.
            return json.load(f)["status"] == "error"
    except (ValueError, KeyError, TypeError):
        # ValueError: file is not JSON (includes json.JSONDecodeError).
        # KeyError/TypeError: file is JSON but not an object with a
        # "status" key (e.g. a bare number or list).
        return False

def filter_successful(files):
    """Convenience function that drops error-marked files from an iterable,
    returning a list of only the successfully produced files."""
    successful = []
    for candidate in files:
        if not is_error(candidate):
            successful.append(candidate)
    return successful

# Run the pathway reconstruction algorithm
rule reconstruct:
input: collect_prepared_input
Expand All @@ -270,25 +289,48 @@ rule reconstruct:
# Overwriting files can happen because the pathway reconstruction algorithms often generate output files with the
# same name regardless of the inputs or parameters, and these aren't renamed until after the container command
# terminates
output: pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt'])
output:
pathway_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']),
# Despite this being a 'log' file, we don't use the log directive as this rule doesn't actually throw errors.
resource_info = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'resource-log.json'])
Comment thread
tristan-f-r marked this conversation as resolved.
params:
# Get the timeout from the config and use it as an input.
# TODO: This has unexpected behavior when this rule succeeds but the timeout extends,
# making this rule run again.
timeout = lambda wildcards: _config.config.algorithm_timeouts[wildcards.algorithm]
run:
# Create a copy so that the updates are not written to the parameters logfile
params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
algorithm_params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
# Declare the input files as a dictionary.
inputs = dict(zip(runner.get_required_inputs(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm)), *{input}, strict=True))
# Remove the _spras_run_name parameter added for keeping track of the run name for parameters.yml
if '_spras_run_name' in params:
params.pop('_spras_run_name')
runner.run(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), inputs, output.pathway_file, params, container_settings)
if '_spras_run_name' in algorithm_params:
algorithm_params.pop('_spras_run_name')
try:
runner.run(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), inputs, output.pathway_file, algorithm_params, container_settings, params.timeout)
Path(output.resource_info).write_text(json.dumps({"status": "success"}))
except TimeoutError as err:
# We don't raise the error here (analogous to `--keep-going`, except we avoid unnecessarily re-running this rule.)
mark_error(output.resource_info, type="timeout", duration=params.timeout)
# We still touch pathway_file: Snakemake doesn't have optional output files, so we emit a 'resource info' file,
# which contains the status (success/failure) of specific Snakemake jobs.
# We filter for the successful files (such as ones that didn't time out) with the `filter_successful` function.
Path(output.pathway_file).touch()

# Original pathway reconstruction output to universal output
# Use PRRunner as a wrapper to call the algorithm-specific parse_output
rule parse_output:
input:
raw_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'raw-pathway.txt']),
# We propagate up the resource_info error if it exists.
resource_info = rules.reconstruct.output.resource_info,
raw_file = rules.reconstruct.output.pathway_file,
dataset_file = SEP.join([out_dir, 'dataset-{dataset}-merged.pickle'])
output: standardized_file = SEP.join([out_dir, '{dataset}-{algorithm}-{params}', 'pathway.txt'])
run:
Comment thread
tristan-f-r marked this conversation as resolved.
if is_error(input.resource_info):
mark_error(output.standardized_file)
Comment thread
tristan-f-r marked this conversation as resolved.
return

params = reconstruction_params(wildcards.algorithm, wildcards.params).copy()
params['dataset'] = input.dataset_file
runner.parse_output(detach_spras_revision(_config.config.immutable_files, wildcards.algorithm), input.raw_file, output.standardized_file, params)
Expand All @@ -310,7 +352,7 @@ rule viz_cytoscape:
output:
session = SEP.join([out_dir, '{dataset}-cytoscape.cys'])
run:
cytoscape.run_cytoscape(input.pathways, output.session, container_settings)
cytoscape.run_cytoscape(filter_successful(input.pathways), output.session, container_settings)
Comment thread
tristan-f-r marked this conversation as resolved.


# Write a single summary table for all pathways for each dataset
Expand All @@ -323,7 +365,7 @@ rule summary_table:
run:
# Load the node table from the pickled dataset file
node_table = Dataset.from_file(input.dataset_file).node_table
summary_df = summary.summarize_networks(input.pathways, node_table, algorithm_params, algorithms_with_params)
summary_df = summary.summarize_networks(filter_successful(input.pathways), node_table, algorithm_params, algorithms_with_params)
summary_df.to_csv(output.summary_table, sep='\t', index=False)

# Cluster the output pathways for each dataset
Expand All @@ -339,7 +381,7 @@ rule ml_analysis:
hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-horizontal.png']),
hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', 'hac-clusters-horizontal.txt']),
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params)
ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params)
ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params)
Expand All @@ -353,7 +395,7 @@ rule jaccard_similarity:
jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', 'jaccard-matrix.txt']),
jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', 'jaccard-heatmap.png'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap)


Expand All @@ -364,7 +406,7 @@ rule ensemble:
output:
ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', 'ensemble-pathway.txt'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.ensemble_network(summary_df, output.ensemble_network_file)

# Returns all pathways for a specific algorithm
Expand All @@ -386,7 +428,7 @@ rule ml_analysis_aggregate_algo:
hac_image_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-horizontal.png']),
hac_clusters_horizontal = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-hac-clusters-horizontal.txt']),
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.hac_vertical(summary_df, output.hac_image_vertical, output.hac_clusters_vertical, **hac_params)
ml.hac_horizontal(summary_df, output.hac_image_horizontal, output.hac_clusters_horizontal, **hac_params)
ml.pca(summary_df, output.pca_image, output.pca_variance, output.pca_coordinates, **pca_params)
Expand All @@ -398,7 +440,7 @@ rule ensemble_per_algo:
output:
ensemble_network_file = SEP.join([out_dir,'{dataset}-ml', '{algorithm}-ensemble-pathway.txt'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.ensemble_network(summary_df, output.ensemble_network_file)

# Calculated Jaccard similarity between output pathways for each dataset per algorithm
Expand All @@ -409,7 +451,7 @@ rule jaccard_similarity_per_algo:
jaccard_similarity_matrix = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-matrix.txt']),
jaccard_similarity_heatmap = SEP.join([out_dir, '{dataset}-ml', '{algorithm}-jaccard-heatmap.png'])
run:
summary_df = ml.summarize_networks(input.pathways)
summary_df = ml.summarize_networks(filter_successful(input.pathways))
ml.jaccard_similarity_eval(summary_df, output.jaccard_similarity_matrix, output.jaccard_similarity_heatmap)

# Return the gold standard pickle file for a specific gold standard
Expand Down Expand Up @@ -440,7 +482,7 @@ rule evaluation_pr_per_pathways:
node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-nodes.png']),
run:
node_table = Evaluation.from_file(input.node_gold_standard_file).node_table
pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table)
pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table)
Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png)

# Returns all pathways for a specific algorithm and dataset
Expand All @@ -459,7 +501,7 @@ rule evaluation_per_algo_pr_per_pathways:
node_pr_png = SEP.join([out_dir, '{dataset_gold_standard_pair}-eval', 'pr-per-pathway-for-{algorithm}-nodes.png']),
run:
node_table = Evaluation.from_file(input.node_gold_standard_file).node_table
pr_df = Evaluation.node_precision_and_recall(input.pathways, node_table)
pr_df = Evaluation.node_precision_and_recall(filter_successful(input.pathways), node_table)
Evaluation.precision_and_recall_per_pathway(pr_df, output.node_pr_file, output.node_pr_png, include_aggregate_algo_eval)

# Return pathway summary file per dataset
Expand Down
1 change: 1 addition & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ algorithms:

- name: "allpairs"
include: true
timeout: 1d
Comment thread
tristan-f-r marked this conversation as resolved.

- name: "domino"
include: true
Expand Down
21 changes: 21 additions & 0 deletions docs/design/errors.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
########
Errors
########

By default, whenever SPRAS runs into a container error (i.e. an internal
algorithm error), the full workflow will fail. However, there are
certain designated errors where we don't want this to be the case (at
the moment, these designated errors are only container timeouts, but
this may be extended to heuristics in the future).

Due to the following design constraints:

- Snakemake does not have support for such errors (the closest being
  ``--keep-going``, which unnecessarily re-runs failed jobs)
- SPRAS occasionally outputs empty files as genuine output
- We need to log errors that happen for user knowledge

we opt to use a ``resource-info.json`` file, which keeps track of the
success/failure status at certain failable parts of the workflow. This
file contains whether or not this part of the workflow succeeded, and if
it failed, how it failed.
7 changes: 7 additions & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ reconstruction methods (PRMs) to omics data.

output
htcondor
timeout

.. toctree::
:maxdepth: 1
Expand All @@ -62,6 +63,12 @@ reconstruction methods (PRMs) to omics data.
contributing/patching
contributing/design

.. toctree::
:maxdepth: 1
:caption: Internal Designs

design/errors

.. toctree::
:maxdepth: 1
:caption: Tutorials
Expand Down
20 changes: 20 additions & 0 deletions docs/timeout.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
##########
Timeouts
##########

SPRAS allows for per-algorithm timeouts, specified under the global
configuration file. For example, to give the AllPairs algorithm a 1 day
timeout:

.. code:: yaml

- name: "allpairs"
include: true
timeout: 1d

The timeout string parsing is delegated to `pytimeparse
<https://pypi.org/project/pytimeparse/>`__, which allows for more
complicated timeout strings, such as ``3d2h32m``.

**NOTE**: This feature only works with docker and apptainer/singularity
at the time of writing.
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies:
- scikit-learn=1.7.0
- seaborn=0.13.2
- spython=0.3.14
- pytimeparse=1.1.8

# conda-specific for dsub
- python-dateutil=2.9.0
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies = [
"scikit-learn==1.7.0",
"seaborn==0.13.2",
"spython==0.3.14",
"pytimeparse==1.1.8",

# toolchain deps
"pip==25.3",
Expand Down
5 changes: 3 additions & 2 deletions spras/allpairs.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def generate_inputs(data: Dataset, filename_map):
header=["#Interactor1", "Interactor2", "Weight"])

@staticmethod
def run(inputs, output_file, args=None, container_settings=None):
def run(inputs, output_file, args=None, container_settings=None, timeout=None):
if not container_settings: container_settings = ProcessedContainerSettings()
AllPairs.validate_required_run_args(inputs)

Expand Down Expand Up @@ -109,7 +109,8 @@ def run(inputs, output_file, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
container_settings)
container_settings,
timeout)

@staticmethod
def parse_output(raw_pathway_file, standardized_pathway_file, params):
Expand Down
1 change: 1 addition & 0 deletions spras/analysis/cytoscape.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,6 @@ def run_cytoscape(pathways: List[Union[str, PurePath]], output_file: str, contai
# (https://github.com/Reed-CompBio/spras/pull/390/files#r2485100875)
None,
container_settings,
None,
Comment thread
tristan-f-r marked this conversation as resolved.
env)
rmtree(cytoscape_output_dir)
5 changes: 3 additions & 2 deletions spras/btb.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def generate_inputs(data, filename_map):

# Skips parameter validation step
@staticmethod
def run(inputs, output_file, args=None, container_settings=None):
def run(inputs, output_file, args=None, container_settings=None, timeout=None):
if not container_settings: container_settings = ProcessedContainerSettings()
BowTieBuilder.validate_required_run_args(inputs)

Expand Down Expand Up @@ -119,7 +119,8 @@ def run(inputs, output_file, args=None, container_settings=None):
volumes,
work_dir,
out_dir,
container_settings)
container_settings,
timeout)
# Output is already written to raw-pathway.txt file


Expand Down
3 changes: 2 additions & 1 deletion spras/config/algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""
import ast
import copy
from typing import Annotated, Any, Callable, Literal, Union, cast, get_args
from typing import Annotated, Any, Callable, Literal, Optional, Union, cast, get_args

import numpy as np
from pydantic import (
Expand Down Expand Up @@ -167,6 +167,7 @@ def construct_algorithm_model(name: str, model: type[BaseModel]) -> type[BaseMod
return create_model(
f'{name}Model',
name=Literal[name],
timeout=(Optional[str], None),
include=bool,
# For algorithms that have a default parameter config, we allow arbitrarily running an algorithm
# if no runs are specified. For example, the following config
Expand Down
15 changes: 14 additions & 1 deletion spras/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
import itertools as it
import warnings
from pathlib import Path
from typing import Any
from typing import Any, Optional

import numpy as np
import yaml
from pytimeparse import parse

from spras.config.container_schema import ProcessedContainerSettings
from spras.config.revision import attach_spras_revision, spras_revision
Expand Down Expand Up @@ -61,6 +62,8 @@ def __init__(self, raw_config: dict[str, Any]):
self.hash_length = parsed_raw_config.hash_length
# Container settings used by PRMs.
self.container_settings = ProcessedContainerSettings.from_container_settings(parsed_raw_config.containers, self.hash_length)
# Dictionary of algorithms to their respective timeout in seconds
self.algorithm_timeouts: dict[str, Optional[int]] = dict()
# A nested dict mapping algorithm names to dicts that map parameter hashes to parameter combinations.
# Only includes algorithms that are set to be run with 'include: true'.
self.algorithm_params: dict[str, dict[str, Any]] = dict()
Expand Down Expand Up @@ -173,6 +176,16 @@ def process_algorithms(self, raw_config: RawConfig):
# Do not parse the rest of the parameters for this algorithm if it is not included
continue

if alg.timeout:
# `parse` may return a float, so coerce the result to an `int` below.
timeout = parse(alg.timeout, granularity='seconds')
Comment thread
tristan-f-r marked this conversation as resolved.
if not timeout: raise RuntimeError(f"Algorithm {alg} has unparsable timeout string {alg.timeout}.")
self.algorithm_timeouts[alg.name] = int(timeout)
else:
# As per the type signature, we still want to say explicitly that this algorithm's timeout
# is uninhabited.
self.algorithm_timeouts[alg.name] = None

runs: dict[str, Any] = alg.runs

# Each set of runs should be 1 level down in the config file
Expand Down
Loading
Loading