Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,22 @@ repos:
subworkflows/nf-core/.*|
.*\.snap$
)$
- repo: local
hooks:
- id: ban-moduledir-templates-shell-call
name: Ban "python3 ${moduleDir}/templates/" shell-call invocation
description: |
The pattern `python3 ${moduleDir}/templates/<script>.py` (introduced by
PR #154 in May 2026) breaks on Seqera Platform (Tower) running AWS
Batch: ${moduleDir} interpolates to a head-node path (typically
/.nextflow/assets/<user>/<repo>/...) that does not exist on the worker
containers, so every task fails at start with
"python3: can't open file '/.nextflow/...': No such file or directory".
Use top-level bin/ scripts invoked by name via PATH (canonical
nf-core pattern), or declare the template as a `path` input so
Nextflow stages it to the worker. See PR #154 discussion for
empirical evidence from the 2026-05-28 Atera compatibility evaluation.
language: pygrep
types: [text]
files: ^modules/local/.*\.nf$
entry: '\$\{moduleDir\}/templates/'
58 changes: 35 additions & 23 deletions bin/utility_parquet_to_csv.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,52 @@
#!/usr/bin/env python3
"""
Convert a Parquet file to CSV format.
Stream-convert a Parquet file to CSV format using pyarrow batched I/O.

Reads a Parquet file and writes it as CSV, optionally gzip-compressed.
Pure format transformation: every input row maps to one output row.
Memory usage is bounded by --batch-size (default 200_000 rows) rather
than the full row count, so this works on large bundles (e.g. the 10x
Atera WTA panel: 236M rows x 13 cols) within ~200 MB of RAM rather
than the ~30 GB the previous pandas-based eager loader required.
"""

import argparse
import gzip
from pathlib import Path

import pandas as pd
import pyarrow.csv as pa_csv
import pyarrow.parquet as pq


def convert_parquet(
def stream_parquet_to_csv(
transcripts: str,
extension: str = ".csv",
prefix: str = "",
extension: str,
prefix: str,
batch_size: int,
) -> None:
"""
Convert a Parquet file to CSV or CSV.GZ format.

Args:
transcripts: Filename of the input parquet file
extension: Output extension ('.csv' or '.gz' for gzip)
prefix: Output directory prefix
"""
df = pd.read_parquet(transcripts, engine="pyarrow")

"""Stream a Parquet file to CSV (optionally gzip-compressed)."""
Path(prefix).mkdir(parents=True, exist_ok=True)
pf = pq.ParquetFile(transcripts)

if extension == ".gz":
output = transcripts.replace(".parquet", ".csv.gz")
df.to_csv(f"{prefix}/{output}", compression="gzip", index=False)
out_path = f"{prefix}/" + transcripts.replace(".parquet", ".csv.gz")
# pyarrow's CSVWriter writes bytes; wrap a gzip stream.
sink = gzip.open(out_path, "wb")
else:
output = transcripts.replace(".parquet", ".csv")
df.to_csv(f"{prefix}/{output}", index=False)
out_path = f"{prefix}/" + transcripts.replace(".parquet", ".csv")
sink = open(out_path, "wb")

return None
try:
with pa_csv.CSVWriter(sink, pf.schema_arrow) as writer:
for batch in pf.iter_batches(batch_size=batch_size):
writer.write_batch(batch)
finally:
sink.close()


def parse_args() -> argparse.Namespace:
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="Convert a Parquet file to CSV format."
description="Stream-convert a Parquet file to CSV format."
)
parser.add_argument(
"--transcripts",
Expand All @@ -58,13 +63,20 @@ def parse_args() -> argparse.Namespace:
required=True,
help="Output directory prefix (sample ID)",
)
parser.add_argument(
"--batch-size",
type=int,
default=200_000,
help="Rows per batch (default 200000). Memory ~= batch_size * row_size.",
)
return parser.parse_args()


if __name__ == "__main__":
args = parse_args()
convert_parquet(
stream_parquet_to_csv(
transcripts=args.transcripts,
extension=args.extension,
prefix=args.prefix,
batch_size=args.batch_size,
)
36 changes: 23 additions & 13 deletions workflows/spatialaxe.nf
Original file line number Diff line number Diff line change
Expand Up @@ -333,24 +333,34 @@ workflow SPATIALAXE {

// get gene_panel.json if provided with --gene_panel, sets relabel_genes to true
def do_relabel = gene_panel ? true : relabel_genes
if (gene_panel) {

def gene_panel_file = file(gene_panel, checkIfExists: true)
ch_gene_panel = ch_input.map { meta, _bundle, _image ->
return [meta, gene_panel_file]
// Only construct ch_gene_panel when relabel will actually run.
// The .map { file(..., checkIfExists: true) } closure is evaluated eagerly
// by Nextflow as soon as ch_input emits — even if ch_gene_panel is never
// consumed downstream — so leaving this block unconditional breaks every
// bundle that ships without gene_panel.json (e.g. 10x Atera / Xenium Gen2),
// even in modes that never invoke XENIUMRANGER_RELABEL_RESEGMENT.
if (do_relabel) {
if (gene_panel) {

def gene_panel_file = file(gene_panel, checkIfExists: true)
ch_gene_panel = ch_input.map { meta, _bundle, _image ->
return [meta, gene_panel_file]
}
}
}
else {
else {

// gene panel to use if only --relabel_genes is provided
ch_gene_panel = ch_input.map { meta, bundle, _image ->
def gene_panel_file = file(
bundle.toString().replaceFirst(/\/$/, '') + "/gene_panel.json",
checkIfExists: true
)
return [meta, gene_panel_file]
// gene panel to use if only --relabel_genes is provided
ch_gene_panel = ch_input.map { meta, bundle, _image ->
def gene_panel_file = file(
bundle.toString().replaceFirst(/\/$/, '') + "/gene_panel.json",
checkIfExists: true
)
return [meta, gene_panel_file]
}
}
}
// else: ch_gene_panel keeps its initial channel.empty() value, assigned earlier in this file (line 112 at the time of writing)

/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
Loading