Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ crate-type = ["cdylib", "rlib"]

[dependencies]
pyo3 = { version = "0.27.1", features = ["abi3-py311"] }
zarrs = { version = "0.23.6", features = ["async", "zlib", "pcodec", "bz2"] }
zarrs = { version = "0.23.7", features = ["async", "zlib", "pcodec", "bz2"] }
rayon_iter_concurrent_limit = "0.2.0"
rayon = "1.10.0"
# fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ The `ZarrsCodecPipeline` specific options are:
- Defaults to `False`.
- `codec_pipeline.strict`: raise exceptions for unsupported operations instead of falling back to the default codec pipeline of `zarr-python`.
- Defaults to `False`.
- `codec_pipeline.subchunk_write_order`: Tells `zarrs` in what order to write subchunks within a shard. One of `"C"` or `"random"`. `"C"` ordering is `numpy`-speak for [row-major](https://en.wikipedia.org/wiki/Row-_and_column-major_order). `"random"` is a slight misnomer: no deliberate ordering is applied — subchunks are simply written in whatever order `rayon` schedules them, which carries no ordering guarantee.
- Defaults to `random`.

For example:
```python
Expand All @@ -63,8 +65,9 @@ zarr.config.set({
"chunk_concurrent_maximum": None,
"chunk_concurrent_minimum": 4,
"direct_io": False,
"strict": False
}
"strict": False,
"subchunk_write_order": "C",
},
})
```

Expand Down
1 change: 1 addition & 0 deletions python/zarrs/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class CodecPipelineImpl:
chunk_concurrent_maximum: builtins.int | None = None,
num_threads: builtins.int | None = None,
direct_io: builtins.bool = False,
subchunk_write_order: typing.Literal["C", "random"] = "random",
) -> CodecPipelineImpl: ...
def retrieve_chunks_and_apply_index(
self,
Expand Down
3 changes: 3 additions & 0 deletions python/zarrs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ def get_codec_pipeline_impl(
),
num_threads=config.get("threading.max_workers", None),
direct_io=config.get("codec_pipeline.direct_io", False),
subchunk_write_order=config.get(
"codec_pipeline.subchunk_write_order", "random"
),
)
except TypeError as e:
if strict:
Expand Down
43 changes: 27 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon_iter_concurrent_limit::iter_concurrent_limit;
use unsafe_cell_slice::UnsafeCellSlice;
use utils::is_whole_chunk;
use zarrs::array::codec::{ShardingCodecOptions, SubchunkWriteOrder};
use zarrs::array::{
ArrayBytes, ArrayBytesDecodeIntoTarget, ArrayBytesFixedDisjointView, ArrayMetadata,
ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecChain, CodecOptions, DataType,
FillValue, StoragePartialDecoder, copy_fill_value_into, update_array_bytes,
ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecChain, CodecOptions,
CodecSpecificOptions, DataType, FillValue, StoragePartialDecoder, copy_fill_value_into,
update_array_bytes,
};
use zarrs::config::global_config;
use zarrs::convert::array_metadata_v2_to_v3;
Expand All @@ -38,7 +40,7 @@ mod utils;

use crate::concurrency::ChunkConcurrentLimitAndCodecOptions;
use crate::store::StoreConfig;
use crate::utils::{PyCodecErrExt, PyErrExt as _};
use crate::utils::{PyCodecErrExt, PyErrExt as _, SubchunkWriteOrderWrapper};

// TODO: Use a OnceLock for store with get_or_try_init when stabilised?
#[gen_stub_pyclass]
Expand Down Expand Up @@ -209,6 +211,7 @@ impl CodecPipelineImpl {
#[gen_stub_pymethods]
#[pymethods]
impl CodecPipelineImpl {
#[allow(clippy::too_many_arguments)] // python functions can have defaults
#[pyo3(signature = (
array_metadata,
store_config,
Expand All @@ -218,6 +221,7 @@ impl CodecPipelineImpl {
chunk_concurrent_maximum=None,
num_threads=None,
direct_io=false,
subchunk_write_order=SubchunkWriteOrderWrapper(SubchunkWriteOrder::Random),
))]
#[new]
fn new(
Expand All @@ -228,6 +232,7 @@ impl CodecPipelineImpl {
chunk_concurrent_maximum: Option<usize>,
num_threads: Option<usize>,
direct_io: bool,
subchunk_write_order: SubchunkWriteOrderWrapper,
) -> PyResult<Self> {
store_config.direct_io(direct_io);
let metadata = serde_json::from_str(array_metadata).map_py_err::<PyTypeError>()?;
Expand All @@ -237,8 +242,16 @@ impl CodecPipelineImpl {
}
ArrayMetadata::V3(v3) => Cow::Borrowed(v3),
};
let codec_chain =
Arc::new(CodecChain::from_metadata(&metadata_v3.codecs).map_py_err::<PyTypeError>()?);
let codec_chain = Arc::new(
CodecChain::from_metadata(&metadata_v3.codecs)
.map_py_err::<PyTypeError>()?
.with_codec_specific_options(
&CodecSpecificOptions::default().with_option(
ShardingCodecOptions::default()
.with_subchunk_write_order(subchunk_write_order.0),
),
),
);
let codec_options = CodecOptions::default().with_validate_checksums(validate_checksums);

let chunk_concurrent_minimum =
Expand All @@ -254,17 +267,15 @@ impl CodecPipelineImpl {
DataType::from_metadata(&metadata_v3.data_type).map_py_err::<PyTypeError>()?;
let fill_value = data_type
.fill_value(&metadata_v3.fill_value, ZarrVersion::V3)
.or_else(|_| {
Err(match &metadata {
ArrayMetadata::V2(metadata) => format!(
"incompatible fill value metadata: dtype={}, fill_value={}",
metadata.dtype, metadata.fill_value
),
ArrayMetadata::V3(metadata) => format!(
"incompatible fill value metadata: data_type={}, fill_value={}",
metadata.data_type, metadata.fill_value
),
})
.map_err(|_| match &metadata {
ArrayMetadata::V2(metadata) => format!(
"incompatible fill value metadata: dtype={}, fill_value={}",
metadata.dtype, metadata.fill_value
),
ArrayMetadata::V3(metadata) => format!(
"incompatible fill value metadata: data_type={}, fill_value={}",
metadata.data_type, metadata.fill_value
),
})
.map_py_err::<PyTypeError>()?;

Expand Down
2 changes: 1 addition & 1 deletion src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<'py> FromPyObject<'_, 'py> for StoreConfig {
}

impl StoreConfig {
pub fn direct_io(&mut self, flag: bool) -> () {
pub fn direct_io(&mut self, flag: bool) {
match self {
StoreConfig::Filesystem(config) => config.direct_io(flag),
StoreConfig::Http(_config) => (),
Expand Down
2 changes: 1 addition & 1 deletion src/store/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl FilesystemStoreConfig {
}
}

pub fn direct_io(&mut self, flag: bool) -> () {
/// Toggle direct (unbuffered) I/O by forwarding the flag to the underlying
/// filesystem store options.
pub fn direct_io(&mut self, flag: bool) {
self.opts.direct_io(flag);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::CodecPipelineImpl;

#[test]
fn test_nparray_to_unsafe_cell_slice_empty() -> PyResult<()> {
pyo3::prepare_freethreaded_python();
Python::with_gil(|py| {
Python::initialize();
Python::attach(|py| {
let arr: Bound<'_, PyUntypedArray> = PyModule::from_code(
py,
c_str!(
Expand Down
46 changes: 44 additions & 2 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::fmt::Display;

use pyo3::{PyErr, PyResult, PyTypeInfo};
use zarrs::array::CodecError;
use pyo3::{
Borrowed, Bound, FromPyObject, IntoPyObject, PyAny, PyErr, PyResult, PyTypeInfo, Python,
exceptions::PyValueError, types::PyString,
};
use zarrs::array::{CodecError, codec::SubchunkWriteOrder};

use crate::ChunkItem;

Expand Down Expand Up @@ -41,3 +44,42 @@ pub fn is_whole_chunk(item: &ChunkItem) -> bool {
item.chunk_subset.start().iter().all(|&o| o == 0)
&& item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(&item.shape)
}

/// Newtype around [`SubchunkWriteOrder`] so the pyo3 conversion traits
/// (`IntoPyObject`/`FromPyObject`) can be implemented locally — the orphan
/// rule forbids implementing a foreign trait directly for the foreign
/// `SubchunkWriteOrder` type.
#[derive(Debug, Clone, Copy)]
pub struct SubchunkWriteOrderWrapper(pub SubchunkWriteOrder);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is required because of the orphan rule, right? SubchunkWriteOrder is not our type and IntoPyObject isn’t either. Did you try implementing SubchunkWriteOrderExt instead, for which you then implement IntoPyObject? The orphan rule probably prevents that too, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is required because of the orphan rule, right? SubchunkWriteOrder is not our type and IntoPyObject isn’t either.

So went my thinking, although I think IntoPyObject not being our trait has nothing to do with this — it's just that we don't define SubchunkWriteOrder here.

Did you try implementing SubchunkWriteOrderExt instead for which you then implement IntoPyObject?

Is that different than what we have here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it’d be a trait instead of a type, which would let us use the unwrapped type. But as said, probably doesn’t work.


impl<'py> IntoPyObject<'py> for SubchunkWriteOrderWrapper {
    type Target = PyString;
    type Output = Bound<'py, PyString>;
    type Error = PyErr;

    /// Convert the wrapped [`SubchunkWriteOrder`] into the Python-facing
    /// string spelling (`"C"` or `"random"`) used by the
    /// `codec_pipeline.subchunk_write_order` config key.
    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
        // Resolve the variant to its string name first, then build the
        // PyString once. Any other variant (the original also guards against
        // these — presumably the upstream enum may grow) is rejected.
        let name = match self.0 {
            SubchunkWriteOrder::C => "C",
            SubchunkWriteOrder::Random => "random",
            _ => {
                return Err(PyValueError::new_err(
                    "Unrecognized subchunk write order for converting to python object, only `C` and `random` allowed.",
                ));
            }
        };
        Ok(name.into_pyobject(py)?)
    }
}

impl<'py> FromPyObject<'_, 'py> for SubchunkWriteOrderWrapper {
    type Error = PyErr;

    /// Parse a Python string (`"C"` or `"random"`) into the wrapper; any
    /// other value is rejected with a `ValueError`.
    fn extract(option: Borrowed<'_, 'py, PyAny>) -> PyResult<SubchunkWriteOrderWrapper> {
        let order = option.extract::<&str>()?;
        if order == "C" {
            Ok(SubchunkWriteOrderWrapper(SubchunkWriteOrder::C))
        } else if order == "random" {
            Ok(SubchunkWriteOrderWrapper(SubchunkWriteOrder::Random))
        } else {
            Err(PyValueError::new_err(
                "Unrecognized subchunk write order while extracting to rust, only `C` and `random` allowed.",
            ))
        }
    }
}

impl pyo3_stub_gen::PyStubType for SubchunkWriteOrderWrapper {
fn type_output() -> pyo3_stub_gen::TypeInfo {
pyo3_stub_gen::TypeInfo::with_module("typing.Literal['C', 'random']", "typing".into())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah duh!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the parameter is called `name`, so it’s not clear that it can accept arbitrary type expressions like a `typing.Literal`.

}
}
36 changes: 35 additions & 1 deletion tests/test_sharding.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from typing import Any
from typing import Any, Literal

import numpy as np
import numpy.typing as npt
import pytest
import zarr
from zarr import Array, AsyncArray
from zarr.abc.store import Store
from zarr.codecs import (
Expand Down Expand Up @@ -367,3 +368,36 @@ async def test_sharding_with_empty_inner_chunk(
print("read data")
data_read = await a.getitem(...)
assert np.array_equal(data_read, data)


@pytest.mark.parametrize(
    "index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end]
)
@pytest.mark.parametrize("subchunk_write_order", ["C", "random"])
async def test_sharding_subchunk_write_order(
    store: Store,
    index_location: ShardingCodecIndexLocation,
    subchunk_write_order: Literal["C", "random"],
) -> None:
    """Verify that ``codec_pipeline.subchunk_write_order`` controls the on-disk
    order of subchunks within a shard: with ``"C"`` the shard-index offsets are
    monotonically increasing, while with ``"random"`` (rayon's unordered
    writes) they should not already be sorted.
    """
    with zarr.config.set({"codec_pipeline.subchunk_write_order": subchunk_write_order}):
        # Unique path per parametrization so runs cannot collide in the store.
        # (The original reused the previous test's "sharding_with_empty_inner_chunk"
        # path, which was misleading and shared across write-order params.)
        path = f"sharding_subchunk_write_order_{index_location}_{subchunk_write_order}"
        spath = StorePath(store, path)
        codec = ShardingCodec(chunk_shape=(2, 2), index_location=index_location)
        # One 16x16 shard containing an 8x8 grid of 2x2 inner chunks.
        a = await AsyncArray.create(
            spath,
            shape=(16, 16),
            chunk_shape=(16, 16),
            dtype="uint32",
            fill_value=0,
            codecs=[codec],
        )
        await a.setitem(..., np.arange(16 * 16).reshape((16, 16)))
        # The shard index stores (offset, length) pairs per inner chunk;
        # keep only the offsets via the [::2] stride.
        index = await codec._load_shard_index(a.store_path / "/c/0/0", (8, 8))
        index_offsets = index.offsets_and_lengths[index.get_full_chunk_map()].ravel()[
            ::2
        ]
        assert len(index_offsets) == 64  # 8 * 8 inner chunks
        if subchunk_write_order == "C":
            np.testing.assert_equal(np.sort(index_offsets), index_offsets)
        else:
            # Unordered writes: 64 offsets are (with overwhelming probability)
            # not already in sorted order.
            assert not np.array_equal(np.sort(index_offsets), index_offsets)
Loading