Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Include/fuse_common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ cdef extern from * nogil: # fuse_common.h should not be included
struct fuse_chan:
pass

struct fuse_pollhandle:
pass

void fuse_pollhandle_destroy(fuse_pollhandle *ph)

struct fuse_loop_config:
int clone_fd
unsigned max_idle_threads
Expand Down
4 changes: 4 additions & 0 deletions Include/fuse_lowlevel.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ cdef extern from "<fuse_lowlevel.h>" nogil:
off_t offset, off_t length, fuse_file_info *fi) except *
void (*readdirplus) (fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,
fuse_file_info *fi) except *
void (*poll) (fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi,
fuse_pollhandle *ph) except *


# Reply functions
Expand All @@ -137,6 +139,7 @@ cdef extern from "<fuse_lowlevel.h>" nogil:
fuse_buf_copy_flags flags)
int fuse_reply_statfs(fuse_req_t req, statvfs *stbuf)
int fuse_reply_xattr(fuse_req_t req, size_t count)
int fuse_reply_poll(fuse_req_t req, unsigned revents)

size_t fuse_add_direntry(fuse_req_t req, const_char *buf, size_t bufsize,
const_char *name, struct_stat *stbuf,
Expand All @@ -157,6 +160,7 @@ cdef extern from "<fuse_lowlevel.h>" nogil:
fuse_buf_copy_flags flags)
int fuse_lowlevel_notify_retrieve(fuse_session *se, fuse_ino_t ino,
size_t size, off_t offset, void *cookie)
int fuse_lowlevel_notify_poll(fuse_pollhandle *ph)

# Utility functions
void *fuse_req_userdata(fuse_req_t req)
Expand Down
4 changes: 4 additions & 0 deletions src/pyfuse3/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ class FUSEError(Exception):
def __init__(self, errno: int) -> None: ...
def __str__(self) -> str: ...

class PollHandle:
def __getstate__(self) -> None: ...
def notify(self) -> None: ...

def listdir(path: str) -> List[str]: ...
def syncfs(path: str) -> str: ...
def setxattr(path: str, name: str, value: bytes, namespace: NamespaceT = ...) -> None: ...
Expand Down
72 changes: 72 additions & 0 deletions src/pyfuse3/__init__.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,78 @@ cdef class FUSEError(Exception):
return strerror(self.errno_)


@cython.freelist(10)
cdef class PollHandle:
'''
Opaque handle for delivering poll(2) readiness notifications.

Instances of this class are created by pyfuse3 and passed to
`Operations.poll`. The filesystem may keep a reference and later
call `PollHandle.notify` on the handle to wake up any process currently
blocked in :manpage:`poll(2)`, :manpage:`select(2)` or
:manpage:`epoll_wait(2)` for the corresponding file descriptor.

A single notification is sufficient to clear all pending waiters;
filesystems should normally discard the handle after notifying.

The underlying ``fuse_pollhandle`` is automatically destroyed when
the Python object is garbage collected, so filesystems should simply
drop the reference when the notification is no longer needed.
'''

cdef fuse_pollhandle *_ph

def __cinit__(self):
self._ph = NULL

def __init__(self):
raise TypeError('PollHandle cannot be instantiated directly')

@staticmethod
cdef PollHandle from_ptr(fuse_pollhandle *ph):
cdef PollHandle self

if ph == NULL:
raise ValueError('NULL fuse_pollhandle')

self = PollHandle.__new__(PollHandle)
self._ph = ph
return self

def __dealloc__(self):
if self._ph is not NULL:
fuse_pollhandle_destroy(self._ph)
self._ph = NULL

def __getstate__(self):
raise PicklingError("PollHandle instances can't be pickled")

def notify(self):
'''
Notify IO readiness for this poll handle.

After this returns, any process waiting in :manpage:`poll(2)`,
:manpage:`select(2)` or :manpage:`epoll_wait(2)` on the
corresponding file descriptor will be woken so it can re-poll
the filesystem for the current readiness mask.

Each `PollHandle` is intended for a single notification. After a
successful call, the filesystem should not call `notify_poll` again on
the same handle and should discard it.
'''

cdef int ret

if self._ph == NULL:
raise RuntimeError('PollHandle is no longer valid')

with nogil:
ret = fuse_lowlevel_notify_poll(self._ph)

if ret != 0:
raise OSError(-ret, 'fuse_lowlevel_notify_poll returned: ' + strerror(-ret))


def listdir(path):
'''Like `os.listdir`, but releases the GIL.

Expand Down
39 changes: 39 additions & 0 deletions src/pyfuse3/_pyfuse3.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
EntryAttributes,
FileInfo,
FUSEError,
PollHandle,
ReaddirToken,
RequestContext,
SetattrFields,
Expand Down Expand Up @@ -451,6 +452,44 @@ async def fsync(self, fh: FileHandleT, datasync: bool) -> None:

raise FUSEError(errno.ENOSYS)

async def poll(
self,
inode: InodeT,
fh: FileHandleT,
poll_handle: Optional["PollHandle"],
ctx: "RequestContext",
) -> int:
'''Check IO readiness on an open file.

This method is called when a process performs :manpage:`poll(2)`,
:manpage:`select(2)` or :manpage:`epoll_wait(2)` on a file descriptor
backed by *fh* (returned by a prior `open` or `create` call). *inode*
identifies the inode that *fh* refers to.

The method will return the bitwise-or of the currently active poll
events, for example `select.POLLIN`, `select.POLLOUT` or
`select.POLLPRI`. If no events are currently ready, it will return `0`.

If *poll_handle* is `None`, the kernel has not provided a notification
handle for this request. The filesystem should only return the current
readiness mask and must not attempt to store a handle or arrange a later
`PollHandle.notify` call for this poll request.

If *poll_handle* is not `None`, the kernel has provided a notification
handle that may be used to wake waiters if readiness changes after this
method returns. The filesystem may store the handle and later call
`PollHandle.notify` when a relevant event becomes available. Each
`~Operations.poll` call produces a fresh handle; storing a new handle
should replace any previously held one, allowing the old handle to be
destroyed.

If this method raises `FUSEError(errno.ENOSYS)` (the default), the
kernel will fall back to a default poll implementation and will not call
this handler again for the lifetime of the mount.
'''

raise FUSEError(errno.ENOSYS)

async def opendir(self, inode: InodeT, ctx: "RequestContext") -> FileHandleT:
'''Open the directory with inode *inode*.

Expand Down
33 changes: 33 additions & 0 deletions src/pyfuse3/handlers.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,39 @@ async def fuse_access_async (_Container c):



cdef void fuse_poll (fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi,
fuse_pollhandle *ph):
cdef _Container c = _Container()
cdef object py_ph
c.req = req
c.ino = ino
if fi is NULL:
c.fh = 0
else:
c.fh = fi.fh
if ph == NULL:
py_ph = None
else:
py_ph = PollHandle.from_ptr(ph)
save_retval(fuse_poll_async(c, py_ph))

async def fuse_poll_async (_Container c, object py_ph):
cdef int ret
cdef unsigned revents

ctx = get_request_context(c.req)
try:
result = await operations.poll(c.ino, c.fh, py_ph, ctx)
except FUSEError as e:
ret = fuse_reply_err(c.req, e.errno)
else:
revents = <unsigned> (result if result is not None else 0)
ret = fuse_reply_poll(c.req, revents)

if ret != 0:
log.error('fuse_poll(): fuse_reply_* failed with %s', strerror(-ret))


cdef void fuse_create (fuse_req_t req, fuse_ino_t parent, const_char *name,
mode_t mode, fuse_file_info *fi):
cdef _Container c = _Container()
Expand Down
1 change: 1 addition & 0 deletions src/pyfuse3/internal.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ cdef void init_fuse_ops():
fuse_ops.create = fuse_create
fuse_ops.forget_multi = fuse_forget_multi
fuse_ops.write_buf = fuse_write_buf
fuse_ops.poll = fuse_poll

cdef make_fuse_args(args, fuse_args* f_args):
cdef char* arg
Expand Down
95 changes: 92 additions & 3 deletions test/test_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import logging
import multiprocessing
import os
import select
import stat
import threading
import time
Expand All @@ -34,6 +35,7 @@
FileInfo,
FUSEError,
InodeT,
PollHandle,
ReaddirToken,
RequestContext,
)
Expand All @@ -59,11 +61,20 @@ def get_mp():

@pytest.fixture()
def testfs(tmpdir):
yield from _mount_fs(tmpdir, Fs)


@pytest.fixture()
def pollfs(tmpdir):
yield from _mount_fs(tmpdir, PollTestFs)


def _mount_fs(tmpdir, fs_class):
mnt_dir = str(tmpdir)
mp = get_mp()
with mp.Manager() as mgr:
cross_process = mgr.Namespace()
mount_process = mp.Process(target=run_fs, args=(mnt_dir, cross_process))
mount_process = mp.Process(target=run_fs, args=(mnt_dir, cross_process, fs_class))

mount_process.start()
try:
Expand Down Expand Up @@ -118,6 +129,38 @@ def test_notify_store(testfs):
assert not fs_state.read_called


def test_notify_poll(pollfs):
(mnt_dir, fs_state) = pollfs
path = os.path.join(mnt_dir, 'message')

with open(path, 'rb', buffering=0) as fh:
poller = select.poll()
poller.register(fh.fileno(), select.POLLPRI)

events = []

def poll_wait():
events.extend(poller.poll(5000))

thread = threading.Thread(target=poll_wait)
thread.start()

deadline = time.monotonic() + 5
while time.monotonic() < deadline and not fs_state.poll_handle_received:
time.sleep(0.01)

assert fs_state.poll_called
assert fs_state.poll_handle_received
assert not events

pyfuse3.setxattr(path, 'command', b'poll_ready')
thread.join(5)
assert not thread.is_alive()
assert events
assert events[0][0] == fh.fileno()
assert events[0][1] & select.POLLPRI


def test_entry_timeout(testfs):
(mnt_dir, fs_state) = testfs
fs_state.entry_timeout = 1
Expand Down Expand Up @@ -267,11 +310,57 @@ async def setxattr(self, inode, name, value, ctx):

elif value == b'terminate':
pyfuse3.terminate()

else:
raise FUSEError(errno.EINVAL)


def run_fs(mountpoint, cross_process):
class PollTestFs(Fs):
def __init__(self, cross_process):
super().__init__(cross_process)
self.poll_handle: PollHandle | None = None
self.status.poll_called = False
self.status.poll_handle_received = False
self.status.poll_ready = False

async def poll(
self,
inode: InodeT,
fh: FileHandleT,
poll_handle: PollHandle | None,
ctx: RequestContext,
) -> int:
assert inode == self.hello_inode
assert fh == self.hello_inode

self.status.poll_called = True

if poll_handle is not None:
self.poll_handle = poll_handle
self.status.poll_handle_received = True

if self.status.poll_ready:
return select.POLLPRI

return 0

async def setxattr(self, inode, name, value, ctx):
if value != b"poll_ready":
return await super().setxattr(inode, name, value, ctx)

if inode != self.hello_inode or name != b"command":
raise FUSEError(errno.ENOTSUP)

self.status.poll_ready = True

if self.poll_handle is None:
raise FUSEError(errno.EINVAL)

self.poll_handle.notify()
self.poll_handle = None


def run_fs(mountpoint, cross_process, fs_class=Fs):
# Logging (note that we run in a new process, so we can't
# rely on direct log capture and instead print to stdout)
root_logger = logging.getLogger()
Expand All @@ -285,7 +374,7 @@ def run_fs(mountpoint, cross_process):
root_logger.addHandler(handler)
root_logger.setLevel(logging.DEBUG)

testfs = Fs(cross_process)
testfs = fs_class(cross_process)
fuse_options = set(pyfuse3.default_options)
fuse_options.add('fsname=pyfuse3_testfs')
pyfuse3.init(testfs, mountpoint, fuse_options)
Expand Down
6 changes: 3 additions & 3 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading