Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 57 additions & 53 deletions bindings/python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,25 @@ pub struct File(FileState);

enum FileState {
Reader(ocore::blocking::StdReader),
Writer(ocore::blocking::StdWriter),
Writer(FileWriter),
Closed,
}

struct FileWriter {
writer: ocore::blocking::StdWriter,
position: u64,
}

impl File {
pub fn new_reader(reader: ocore::blocking::StdReader) -> Self {
Self(FileState::Reader(reader))
}

pub fn new_writer(writer: ocore::blocking::Writer) -> Self {
Self(FileState::Writer(writer.into_std_write()))
Self(FileState::Writer(FileWriter {
writer: writer.into_std_write(),
position: 0,
}))
}
}

Expand Down Expand Up @@ -247,24 +255,21 @@ impl File {
&mut self,
#[gen_stub(override_type(type_repr = "builtins.bytes", imports=("builtins")))] bs: &[u8],
) -> PyResult<usize> {
let writer = match &mut self.0 {
FileState::Reader(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on read only file.",
));
}
FileState::Writer(w) => w,
FileState::Closed => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
match &mut self.0 {
FileState::Reader(_) => Err(PyIOError::new_err(
"I/O operation failed for reading on read only file.",
)),
FileState::Writer(w) => {
w.writer
.write_all(bs)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
w.position += bs.len() as u64;
Ok(bs.len())
}
};

writer
.write_all(bs)
.map(|_| bs.len())
.map_err(|err| PyIOError::new_err(err.to_string()))
FileState::Closed => Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
)),
}
}

/// Change the position of this file to the given byte offset.
Expand Down Expand Up @@ -323,11 +328,7 @@ impl File {
pub fn tell(&mut self) -> PyResult<u64> {
let reader = match &mut self.0 {
FileState::Reader(r) => r,
FileState::Writer(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on write only file.",
));
}
FileState::Writer(w) => return Ok(w.position),
FileState::Closed => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
Expand All @@ -349,7 +350,7 @@ impl File {
/// A closed file cannot be used for further I/O operations.
fn close(&mut self) -> PyResult<()> {
if let FileState::Writer(w) = &mut self.0 {
w.close().map_err(format_pyerr_from_io_error)?;
w.writer.close().map_err(format_pyerr_from_io_error)?;
};
self.0 = FileState::Closed;
Ok(())
Expand Down Expand Up @@ -382,7 +383,7 @@ impl File {
if matches!(self.0, FileState::Reader(_)) {
Ok(())
} else if let FileState::Writer(w) = &mut self.0 {
match w.flush() {
match w.writer.flush() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
Expand Down Expand Up @@ -449,17 +450,27 @@ pub struct AsyncFile(Arc<Mutex<AsyncFileState>>);

enum AsyncFileState {
Reader(ocore::FuturesAsyncReader),
Writer(ocore::FuturesAsyncWriter),
Writer(AsyncFileWriter),
Closed,
}

struct AsyncFileWriter {
writer: ocore::FuturesAsyncWriter,
position: u64,
}

impl AsyncFile {
pub fn new_reader(reader: ocore::FuturesAsyncReader) -> Self {
Self(Arc::new(Mutex::new(AsyncFileState::Reader(reader))))
}

pub fn new_writer(writer: ocore::FuturesAsyncWriter) -> Self {
Self(Arc::new(Mutex::new(AsyncFileState::Writer(writer))))
Self(Arc::new(Mutex::new(AsyncFileState::Writer(
AsyncFileWriter {
writer,
position: 0,
},
))))
}
}

Expand Down Expand Up @@ -561,26 +572,23 @@ impl AsyncFile {

future_into_py(py, async move {
let mut guard = state.lock().await;
let writer = match guard.deref_mut() {
AsyncFileState::Reader(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on read only file.",
));
}
AsyncFileState::Writer(w) => w,
_ => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
));
match guard.deref_mut() {
AsyncFileState::Reader(_) => Err(PyIOError::new_err(
"I/O operation failed for reading on read only file.",
)),
AsyncFileState::Writer(w) => {
let len = bs.len();
w.writer
.write_all(&bs)
.await
.map_err(|err| PyIOError::new_err(err.to_string()))?;
w.position += len as u64;
Ok(len)
}
};

let len = bs.len();
writer
.write_all(&bs)
.await
.map(|_| len)
.map_err(|err| PyIOError::new_err(err.to_string()))
_ => Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
)),
}
})
}

Expand Down Expand Up @@ -660,11 +668,7 @@ impl AsyncFile {
let mut guard = state.lock().await;
let reader = match guard.deref_mut() {
AsyncFileState::Reader(r) => r,
AsyncFileState::Writer(_) => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on write only file.",
));
}
AsyncFileState::Writer(w) => return Ok(w.position),
_ => {
return Err(PyIOError::new_err(
"I/O operation failed for reading on closed file.",
Expand Down Expand Up @@ -697,7 +701,7 @@ impl AsyncFile {
future_into_py(py, async move {
let mut state = state.lock().await;
if let AsyncFileState::Writer(w) = &mut *state {
w.close().await.map_err(format_pyerr_from_io_error)?;
w.writer.close().await.map_err(format_pyerr_from_io_error)?;
}
*state = AsyncFileState::Closed;
Ok(())
Expand Down
27 changes: 27 additions & 0 deletions bindings/python/tests/test_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@ async def test_async_writer(service_name, operator, async_operator):
await async_operator.stat(filename)


@pytest.mark.asyncio
@pytest.mark.need_capability("write", "delete")
async def test_async_writer_tell_tracks_written_bytes(
service_name, operator, async_operator
):
filename = f"test_file_{str(uuid4())}.txt"
content = b"hello"
f = await async_operator.open(filename, "wb")
assert await f.tell() == 0
assert await f.write(content) == len(content)
assert await f.tell() == len(content)
await f.close()
await async_operator.delete(filename)


@pytest.mark.asyncio
@pytest.mark.need_capability("write", "delete", "write_with_if_not_exists")
async def test_async_writer_options(service_name, operator, async_operator):
Expand Down Expand Up @@ -181,6 +196,18 @@ def test_sync_writer(service_name, operator, async_operator):
operator.stat(filename)


@pytest.mark.need_capability("write", "delete")
def test_sync_writer_tell_tracks_written_bytes(service_name, operator, async_operator):
filename = f"test_file_{str(uuid4())}.txt"
content = b"hello"
f = operator.open(filename, "wb")
assert f.tell() == 0
assert f.write(content) == len(content)
assert f.tell() == len(content)
f.close()
operator.delete(filename)


@pytest.mark.need_capability("write", "delete", "write_with_if_not_exists")
def test_sync_writer_options(service_name, operator, async_operator):
size = randint(1, 1024)
Expand Down
Loading