Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ press Ctrl-C to unmount.
Options:

- `--basename NAME` — basename for the plink fileset (defaults to the VCZ stem).
- `--backend-storage {fsspec,obstore,icechunk}` — backend for remote VCZ URLs.
- `--access-log PATH` — record every read as a JSONL row to PATH (useful for
characterising consumer access patterns).
- `-v` / `-vv` — increase logging verbosity.
- The bcftools-view-style filtering options
(`-r`/`-R`/`-s`/`-S`/`-t`/`-T`/`-i`/`-e`/`-v`/`-V`/`-m`/`-M`),
the storage options (`--backend-storage`, `--storage-option`) and the
logging options (`--log-level`, `--log-file`)
are all inherited from `vcztools view-bed`. Run `biofuse mount-plink --help`
or see `vcztools view-bed --help` for the full reference.

Example:

Expand Down
51 changes: 20 additions & 31 deletions biofuse/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,13 @@

import click
import trio
from vcztools import cli as vcztools_cli

from biofuse import access_log, fuse_adapter, plink_client, plink_ops

logger = logging.getLogger(__name__)


def _setup_logging(verbosity: int) -> None:
levels = [logging.WARNING, logging.INFO, logging.DEBUG]
level = levels[min(verbosity, len(levels) - 1)]
logging.basicConfig(
level=level,
format="%(asctime)s %(name)s %(levelname)s: %(message)s",
datefmt="%H:%M:%S",
)


def handle_exception(func):
@wraps(func)
def wrapper(*args, **kwargs):
Expand All @@ -46,18 +37,6 @@ def _default_basename(vcz_url: str) -> str:
name = stem


# Reusable click option decorator for a repeatable ``-v`` / ``--verbose``
# flag (``count=True``); per its help text, ``-v`` selects info and ``-vv``
# selects debug logging.
verbose = click.option(
"-v",
"--verbose",
count=True,
help="Increase logging verbosity (-v info, -vv debug).",
)
# Reusable click option decorator for ``--backend-storage``. Only the three
# listed choices are accepted; the value is ``None`` when the option is
# omitted.
backend_storage = click.option(
"--backend-storage",
type=click.Choice(["fsspec", "obstore", "icechunk"]),
default=None,
help="Backend storage to use for remote VCZ URLs.",
)
access_log_opt = click.option(
"--access-log",
"access_log_path",
Expand Down Expand Up @@ -86,23 +65,28 @@ def biofuse_main():
type=click.Path(file_okay=False, dir_okay=True, path_type=str),
)
@basename_opt
@backend_storage
@vcztools_cli.view_bed_options
@access_log_opt
@verbose
@vcztools_cli.log_options
@handle_exception
def mount_plink(
vcz_url, mount_dir, basename, backend_storage, access_log_path, verbose
):
def mount_plink(vcz_url, mount_dir, basename, access_log_path, **kwargs):
"""Mount a PLINK 1.9 view of VCZ_URL at MOUNT_DIR.

Spawns a plink-server subprocess that owns the ``VczReader`` and
serves ``.bed`` reads over an ``AF_UNIX`` socket. ``.bim`` and
``.fam`` are precomputed once at mount time and held in the FUSE
process's memory; only ``.bed`` reads cross the wire.

The bcftools-view-style filter / backend / log options are inherited
from ``vcztools view-bed``; see ``vcztools view-bed --help`` for the
full reference.

The mount runs in the foreground until interrupted with Ctrl-C.
"""
_setup_logging(verbose)
log_config = vcztools_cli.LogConfig.pop_from_click_kwargs(kwargs)
reader_options = vcztools_cli.ViewBedOptions.pop_from_click_kwargs(kwargs)
assert kwargs == {}, kwargs
log_config.apply()

mount_dir_path = pathlib.Path(mount_dir)
if not mount_dir_path.is_dir():
Expand All @@ -118,7 +102,8 @@ def mount_plink(
vcz_url,
str(mount_dir_path),
resolved_basename,
backend_storage,
reader_options,
log_config,
log_path,
sock_path,
)
Expand All @@ -128,12 +113,16 @@ async def _amount(
vcz_url: str,
mount_dir: str,
basename: str,
backend_storage: str | None,
reader_options: vcztools_cli.ViewBedOptions,
log_config: vcztools_cli.LogConfig,
log_path: pathlib.Path | None,
sock_path: pathlib.Path,
) -> None:
async with await plink_client.PlinkClient.start(
vcz_url, sock_path, backend_storage=backend_storage
vcz_url,
sock_path,
reader_options=reader_options,
log_config=log_config,
) as client:
with access_log.AccessLogger(log_path) as access_logger:
ops = plink_ops.PlinkOps(client, basename, access_logger=access_logger)
Expand Down
22 changes: 13 additions & 9 deletions biofuse/plink_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from collections.abc import Callable

import trio
from vcztools import cli as vcztools_cli

from biofuse import plink_protocol, plink_server

Expand Down Expand Up @@ -146,8 +147,8 @@ async def start(
vcz_url: str,
socket_path: pathlib.Path,
*,
backend_storage: str | None = None,
log_level: int | None = None,
reader_options: vcztools_cli.ViewBedOptions | None = None,
log_config: vcztools_cli.LogConfig | None = None,
) -> "PlinkClient":
"""Spawn the server, run the metadata handshake, return client.

Expand All @@ -156,11 +157,16 @@ async def start(
reduction dups the fds across the spawn boundary; the parent
closes its own copies once the child has started.

``log_level`` is forwarded to the subprocess so its
``logger.debug`` / ``info`` output appears in the parent's
log sink. If ``None``, the subprocess uses its default
(WARNING).
``reader_options`` carries the bcftools-view-style filtering
options forwarded to vcztools' ``make_reader`` in the worker;
``log_config`` configures logging in the worker so its
``logger.debug`` / ``info`` output reaches the parent's sink.
Both default to the empty / WARNING configuration.
"""
if reader_options is None:
reader_options = vcztools_cli.ViewBedOptions()
if log_config is None:
log_config = vcztools_cli.LogConfig()
socket_path = pathlib.Path(socket_path)
socket_path.parent.mkdir(parents=True, exist_ok=True)
if socket_path.exists():
Expand All @@ -170,11 +176,9 @@ async def start(
listener.listen(64)
parent_stop, child_stop = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
ctx = mp.get_context("spawn")
if log_level is None:
log_level = logging.getLogger().getEffectiveLevel()
proc: mp.process.BaseProcess = ctx.Process(
target=plink_server._server_main,
args=(listener, child_stop, vcz_url, backend_storage, log_level),
args=(listener, child_stop, vcz_url, reader_options, log_config),
name="biofuse-plink-server",
)
try:
Expand Down
26 changes: 9 additions & 17 deletions biofuse/plink_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ def _server_main(
listener_sock: socket.socket,
stop_sock: socket.socket,
vcz_url: str,
backend_storage: str | None,
log_level: int = logging.WARNING,
reader_options: vcztools_cli.ViewBedOptions,
log_config: vcztools_cli.LogConfig,
) -> None:
"""Subprocess entry point invoked via ``multiprocessing.Process``.

Expand All @@ -224,22 +224,14 @@ def _server_main(
(one pool per reader, drawn on by every ``BedEncoder`` /
``ReadaheadPipeline``) is drained on the way out.

``log_level`` matches the parent's verbosity so the subprocess's
own ``logger.debug`` / ``logger.info`` output reaches the same
sink as the parent.
``log_config`` matches the parent's verbosity so the subprocess's
own ``logger.debug`` / ``logger.info`` output reaches the same sink
as the parent. ``reader_options`` carries the bcftools-style
filtering options (regions, samples, …) that vcztools'
``make_reader`` consumes.
"""
# ``force=True`` so the explicitly-passed ``log_level`` wins even
# if some upstream import in this subprocess (or the parent's
# logging state, replayed via ``spawn``) has already configured
# the root logger. Without it, ``basicConfig`` is a no-op and the
# subprocess silently keeps the wrong level.
logging.basicConfig(
level=log_level,
format="%(asctime)s %(name)s %(levelname)s: %(message)s",
datefmt="%H:%M:%S",
force=True,
)
with vcztools_cli.make_reader(vcz_url, backend_storage=backend_storage) as reader:
log_config.apply()
with vcztools_cli.make_reader_from_options(vcz_url, reader_options) as reader:
session = _ServerSession(reader)
try:
serve_forever(listener_sock, stop_sock, session)
Expand Down
64 changes: 51 additions & 13 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,6 @@ def test_nonexistent_mount_dir_fails(self, tmp_path):
assert result.exit_code != 0
assert "mount directory does not exist" in result.output

def test_invalid_backend_storage_fails(self, tmp_path):
    """A ``--backend-storage`` value of ``bogus`` makes the CLI exit non-zero."""
    argv = [
        "mount-plink",
        "x.vcz",
        str(tmp_path),
        "--backend-storage",
        "bogus",
    ]
    outcome = CliRunner().invoke(cli.biofuse_main, argv)
    assert outcome.exit_code != 0


class TestEndToEndMount:
"""Spawn the CLI as a subprocess, wait for mount, read files, terminate."""
Expand Down Expand Up @@ -119,6 +106,57 @@ def test_access_log_written(self, tmp_path, fx_small_vcz):
paths = {line.split('"path":')[1].split('"')[1] for line in lines}
assert "small.bed" in paths

def test_log_level_accepted(self, tmp_path, fx_small_vcz):
    """Mounting with ``--log-level info`` still exposes a non-empty ``small.bed``."""
    mount_point = tmp_path / "mnt"
    mount_point.mkdir()
    argv = [
        sys.executable,
        "-m",
        "biofuse.cli",
        "mount-plink",
        str(fx_small_vcz.path),
        str(mount_point),
        "--log-level",
        "info",
    ]
    cli_proc = subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    try:
        self._wait_for_mount(mount_point, cli_proc, timeout=15)
        bed_size = (mount_point / "small.bed").stat().st_size
        assert bed_size > 0
    finally:
        # Always tear the mount down, even when the assertions fail.
        self._terminate(cli_proc, mount_point)

def test_samples_filter(self, tmp_path, fx_small_vcz):
    """--samples reaches the worker subprocess and shrinks .fam.

    With ``--samples tsk_0,tsk_1`` the mounted ``small.fam`` must hold
    exactly two lines, naming ``tsk_0`` and ``tsk_1`` but not ``tsk_2``.
    """
    mount_point = tmp_path / "mnt"
    mount_point.mkdir()
    argv = [
        sys.executable,
        "-m",
        "biofuse.cli",
        "mount-plink",
        str(fx_small_vcz.path),
        str(mount_point),
        "--samples",
        "tsk_0,tsk_1",
    ]
    cli_proc = subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    try:
        self._wait_for_mount(mount_point, cli_proc, timeout=15)
        fam_text = (mount_point / "small.fam").read_text()
        # One newline-terminated row per retained sample.
        assert fam_text.count("\n") == 2
        for kept in ("tsk_0", "tsk_1"):
            assert kept in fam_text
        assert "tsk_2" not in fam_text
    finally:
        # Always tear the mount down, even when the assertions fail.
        self._terminate(cli_proc, mount_point)

def test_basename_override(self, tmp_path, fx_small_vcz):
mnt = tmp_path / "mnt"
mnt.mkdir()
Expand Down
9 changes: 8 additions & 1 deletion tests/test_plink_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import time

import pytest
from vcztools import cli as vcztools_cli
from vcztools.cli import make_reader
from vcztools.plink import write_plink

Expand Down Expand Up @@ -316,7 +317,13 @@ def test_spawn_metadata_handshake(self, fx_small_vcz, tmp_path):
ctx = mp.get_context("spawn")
proc = ctx.Process(
target=plink_server._server_main,
args=(listener, child_stop, str(fx_small_vcz.path), None),
args=(
listener,
child_stop,
str(fx_small_vcz.path),
vcztools_cli.ViewBedOptions(),
vcztools_cli.LogConfig(),
),
)
proc.start()
listener.close()
Expand Down
10 changes: 5 additions & 5 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading