Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions biofuse/access_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class AccessRecord:
size: int
t_start: float
t_end: float
kind: str = "read"


class AccessLogger:
Expand Down Expand Up @@ -73,14 +74,43 @@ def record(
difference is the read's wall-clock duration; overlap between
records with distinct ``fh`` indicates concurrent execution.
"""
rec = AccessRecord(
path=path,
fh=fh,
offset=offset,
size=size,
t_start=t_start,
t_end=time.monotonic(),
self._write(
AccessRecord(
path=path,
fh=fh,
offset=offset,
size=size,
t_start=t_start,
t_end=time.monotonic(),
)
)

def record_event(
self,
kind: str,
path: str,
fh: int,
t_start: float,
t_end: float | None = None,
) -> None:
"""Record a non-read lifecycle event (open / release / limiter_wait /
aclose). ``offset`` and ``size`` are zero for these events; the
``[t_start, t_end]`` interval is what matters."""
if t_end is None:
t_end = time.monotonic()
self._write(
AccessRecord(
path=path,
fh=fh,
offset=0,
size=0,
t_start=t_start,
t_end=t_end,
kind=kind,
)
)

def _write(self, rec: AccessRecord) -> None:
with self._lock:
if self._fh is not None:
self._fh.write(json.dumps(asdict(rec)) + "\n")
Expand Down
113 changes: 92 additions & 21 deletions biofuse/plink_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import multiprocessing as mp
import pathlib
import socket
import time
from collections.abc import Callable

import trio

Expand All @@ -25,6 +27,15 @@
_CONNECT_RETRY_SLEEP_S = 0.05
_CONNECT_DEADLINE_S = 10.0

# Per-operation deadlines for the parent → server protocol. The FUSE
# handler must never await indefinitely on the worker; on expiry we
# surface an ``OSError`` to the FUSE layer so the kernel sees a real
# I/O error and unblocks the consumer's syscall instead of pinning it
# in uninterruptible sleep.
_REQUEST_TIMEOUT_S = 30.0
_OPEN_TIMEOUT_S = 5.0
_ACLOSE_TIMEOUT_S = 2.0


class BedConnection:
"""One ``.bed`` reader: a dedicated socket to the plink-server.
Expand All @@ -36,36 +47,77 @@ class BedConnection:
threads and do not contend with each other.
"""

def __init__(self, stream: trio.SocketStream) -> None:
def __init__(
self,
stream: trio.SocketStream,
*,
on_aclose: Callable[[float, float], None] | None = None,
) -> None:
self._stream = stream
self._lock = trio.Lock()
self._closed = False
self._on_aclose = on_aclose

async def read(self, off: int, size: int) -> bytes:
if self._closed:
raise OSError(errno.EIO, "bed connection is closed")
request = plink_protocol.pack_read_request(off, size)
async with self._lock:
await self._stream.send_all(request)
status_buf = await _recv_exact(
self._stream, plink_protocol.REPLY_STATUS_SIZE
)
status = plink_protocol.parse_status(status_buf)
if status < 0:
raise plink_protocol.status_to_error(status)
if status == 0:
return b""
return await _recv_exact(self._stream, status)
with trio.move_on_after(_REQUEST_TIMEOUT_S) as cs:
async with self._lock:
if self._closed:
raise OSError(errno.EIO, "bed connection is closed")
await self._stream.send_all(request)
status_buf = await _recv_exact(
self._stream, plink_protocol.REPLY_STATUS_SIZE
)
status = plink_protocol.parse_status(status_buf)
if status < 0:
raise plink_protocol.status_to_error(status)
if status == 0:
return b""
return await _recv_exact(self._stream, status)
# Reached only if ``move_on_after`` caught a Cancelled — the
# inner block always returns or raises through. Mark the
# connection dead so other tasks queued on ``self._lock`` wake
# to an immediate EIO instead of repeating the wait against a
# known-broken socket.
if not cs.cancelled_caught: # pragma: no cover - defensive
raise RuntimeError("plink-server read fall-through")
self._closed = True
with trio.CancelScope(shield=True):
with trio.move_on_after(_ACLOSE_TIMEOUT_S):
try:
await self._stream.aclose()
except (trio.BrokenResourceError, OSError) as exc:
logger.debug("aclose after timeout raised: %s", exc)
raise OSError(errno.EIO, "plink-server request timed out")

async def aclose(self) -> None:
if self._closed:
return
self._closed = True
try:
await self._stream.send_eof()
except (trio.ClosedResourceError, trio.BrokenResourceError, OSError) as exc:
logger.debug("send_eof on bed connection raised: %s", exc)
await self._stream.aclose()
t_start = time.monotonic()
with trio.CancelScope(shield=True):
with trio.move_on_after(_ACLOSE_TIMEOUT_S) as cs:
try:
await self._stream.send_eof()
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
OSError,
) as exc:
logger.debug("send_eof on bed connection raised: %s", exc)
await self._stream.aclose()
if cs.cancelled_caught:
logger.debug(
"bed connection aclose timed out after %.1fs",
_ACLOSE_TIMEOUT_S,
)
if self._on_aclose is not None:
try:
self._on_aclose(t_start, time.monotonic())
except Exception as exc: # noqa: BLE001 - never let logging blow up cleanup
logger.debug("on_aclose hook raised: %s", exc)


class PlinkClient:
Expand Down Expand Up @@ -95,13 +147,19 @@ async def start(
socket_path: pathlib.Path,
*,
backend_storage: str | None = None,
log_level: int | None = None,
) -> "PlinkClient":
"""Spawn the server, run the metadata handshake, return client.

The parent creates the listener and the stop-signal socketpair
itself, then hands both to the child. Multiprocessing's socket
reduction dups the fds across the spawn boundary; the parent
closes its own copies once the child has started.

``log_level`` is forwarded to the subprocess so its
``logger.debug`` / ``info`` output appears in the parent's
log sink. If ``None``, the subprocess uses its default
(WARNING).
"""
socket_path = pathlib.Path(socket_path)
socket_path.parent.mkdir(parents=True, exist_ok=True)
Expand All @@ -112,9 +170,11 @@ async def start(
listener.listen(64)
parent_stop, child_stop = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
ctx = mp.get_context("spawn")
if log_level is None:
log_level = logging.getLogger().getEffectiveLevel()
proc: mp.process.BaseProcess = ctx.Process(
target=plink_server._server_main,
args=(listener, child_stop, vcz_url, backend_storage),
args=(listener, child_stop, vcz_url, backend_storage, log_level),
name="biofuse-plink-server",
)
try:
Expand All @@ -140,10 +200,14 @@ async def __aenter__(self) -> "PlinkClient":
async def __aexit__(self, exc_type, exc, tb) -> None:
await self.aclose()

async def open_bed(self) -> BedConnection:
async def open_bed(
self,
*,
on_aclose: Callable[[float, float], None] | None = None,
) -> BedConnection:
"""Open a new dedicated socket for one ``.bed`` reader."""
stream = await self._connect_stream()
return BedConnection(stream)
return BedConnection(stream, on_aclose=on_aclose)

async def aclose(self) -> None:
"""Tear down the server. Idempotent.
Expand Down Expand Up @@ -214,8 +278,15 @@ async def _connect_stream(
sock.setblocking(False)
trio_sock = trio.socket.from_stdlib_socket(sock)
try:
await trio_sock.connect(path)
with trio.fail_after(_OPEN_TIMEOUT_S):
await trio_sock.connect(path)
return trio.SocketStream(trio_sock)
except trio.TooSlowError as exc:
trio_sock.close()
raise OSError(
errno.EIO,
f"plink-server connect timed out after {_OPEN_TIMEOUT_S:.1f}s",
) from exc
except (FileNotFoundError, ConnectionRefusedError, OSError) as exc:
trio_sock.close()
last_exc = exc
Expand Down
39 changes: 36 additions & 3 deletions biofuse/plink_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
_FILE_MODE = stat.S_IFREG | 0o444
_DEFAULT_MAX_OPEN_BED = 16

# Maximum time a FUSE_OPEN may wait at the per-mount ``.bed`` capacity
# limiter. On expiry the open returns ``EAGAIN`` to the kernel rather
# than blocking forever — this guards against a leaked limiter slot
# permanently wedging open().
_LIMITER_TIMEOUT_S = 30.0


class _BedConnectionProto(Protocol):
async def read(self, off: int, size: int) -> bytes: ...
Expand Down Expand Up @@ -180,14 +186,22 @@ async def open(self, inode, flags, ctx=None):
kind = self._name_to_kind[name]
fh = self._next_fh
self._next_fh += 1
t_open_start = time.monotonic()
if kind == "bed":
# Use the fh itself as the limiter borrower: each open is a
# distinct logical owner, even when several share the same
# trio task (true under direct PlinkOps tests, and cheap
# under pyfuse3 where each request is its own task).
await self._bed_limiter.acquire_on_behalf_of(fh)
t_limiter_start = time.monotonic()
with trio.move_on_after(_LIMITER_TIMEOUT_S) as cs:
await self._bed_limiter.acquire_on_behalf_of(fh)
t_limiter_end = time.monotonic()
if cs.cancelled_caught:
raise pyfuse3.FUSEError(errno.EAGAIN)
self._record_event("limiter_wait", name, fh, t_limiter_start, t_limiter_end)
try:
conn = await self._client.open_bed()
on_aclose = self._make_aclose_recorder(name, fh)
conn = await self._client.open_bed(on_aclose=on_aclose)
except OSError as exc:
self._bed_limiter.release_on_behalf_of(fh)
raise pyfuse3.FUSEError(exc.errno or errno.EIO) from exc
Expand All @@ -197,8 +211,25 @@ async def open(self, inode, flags, ctx=None):
self._fh_to_conn[fh] = conn
self._fh_to_kind[fh] = kind
self._fh_to_name[fh] = name
self._record_event("open", name, fh, t_open_start)
return pyfuse3.FileInfo(fh=fh)

def _record_event(
self, kind: str, name: str, fh: int, t_start: float, t_end=None
) -> None:
if self._access_logger is not None:
self._access_logger.record_event(kind, name, fh, t_start, t_end)

def _make_aclose_recorder(self, name, fh):
if self._access_logger is None:
return None
access_logger = self._access_logger

def hook(t_start: float, t_end: float) -> None:
access_logger.record_event("aclose", name, fh, t_start, t_end)

return hook

async def read(self, fh, off, size):
kind = self._fh_to_kind.get(fh)
name = self._fh_to_name.get(fh)
Expand Down Expand Up @@ -231,10 +262,11 @@ def _read_static(data: bytes, off: int, size: int) -> bytes:

async def release(self, fh):
kind = self._fh_to_kind.pop(fh, None)
self._fh_to_name.pop(fh, None)
name = self._fh_to_name.pop(fh, None)
conn = self._fh_to_conn.pop(fh, None)
if kind is None:
return
t_release_start = time.monotonic()
try:
if conn is not None:
try:
Expand All @@ -244,6 +276,7 @@ async def release(self, fh):
finally:
if kind == "bed":
self._bed_limiter.release_on_behalf_of(fh)
self._record_event("release", name, fh, t_release_start)

async def forget(self, inode_list):
return
Expand Down
Loading