Irregular chunked Zarr Arrays for High-Energy Physics Analysis #3948
hi @pfackeldey, I confess I don't see the connection between the awkward / jagged array data structure and the newly released rectilinear chunks feature. Data types (what the array is made of) and chunking (how big, in array elements, each chunk is) are two completely separate layers of the Zarr stack.

Starting with the data type for awkward / jagged arrays: if I understand this correctly, we would model this in Zarr as a 1-dimensional array of variable-length scalar elements. Although we do have variable-length string and variable-length raw bytes data types, Zarr V3 doesn't have a variable-length array data type. This has been requested before, but the idea probably requires a specification document before we add support for it here; see the tracking issue.

As for the general workload you describe (a lot of small chunks aggregated into larger, variable-length chunks): the variable-length chunks metadata can definitely express the outcome of this process, but how you reach that outcome still requires solving some coordination issues for your dask tasks. Zarr Python does not provide any locking mechanism to guard against multiple workers writing to the same chunk, so your N input files -> M output files step (where N >> M) has to guarantee that two workers never attempt to write to the same output file at the same time. And even if you can guarantee that workers write in order, each addition to an existing chunk requires a full read -> write, which is less efficient than writing the entire output chunk at once.

When possible, the most efficient thing is to iterate over the output chunks: each worker is tasked with gathering the input files required to completely fill one chunk, and there are no race conditions. But this assumes you know the output chunking structure.
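To illustrate the chunk-aligned pattern, here is a rough sketch assuming a regular output chunk size is known up front (the store name and the in-memory "input files" are made up for the example; real workers would read their inputs from storage):

```python
import numpy as np
import zarr

# Fake "input files": in practice each of these would be read from disk by a worker.
inputs = [np.arange(3), np.arange(4), np.arange(2), np.arange(5)]
total = sum(len(x) for x in inputs)
chunk_size = 7  # output chunking decided before anything is written

root = zarr.group("skims.zarr")
out = root.create_array(name="values", shape=(total,), chunks=(chunk_size,), dtype="int64")

# Simulate one worker per output chunk: gather whatever inputs are needed to fill
# that chunk, then write the chunk-aligned slice exactly once.
data = np.concatenate(inputs)  # stand-in for "gather the inputs for this chunk"
for start in range(0, total, chunk_size):
    stop = min(start + chunk_size, total)
    out[start:stop] = data[start:stop]  # no read-modify-write, no shared chunks
```

Because the chunk boundaries are fixed before any write happens, each iteration of the loop could run as an independent worker without ever colliding on a chunk.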
The use case you describe would probably be handled better by a collection of 1D Zarr arrays laid out as part of a group rather than by irregular chunking. Each 1D Zarr array could be sized or resized independently of the others. Irregular chunking defined by a single grid does not seem appropriate here, because it would require coordination whenever a single component of the grid is revised. Another approach would be to store the chunk shape as part of the chunk itself rather than describing it in the chunk grid; the recently added n5_default codec does store the chunk shape in the header of the chunk.
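A minimal sketch of that layout (array names and sizes are only illustrative): each component lives in its own 1D array, so growing one of them never requires coordinating with the others.

```python
import numpy as np
import zarr

root = zarr.group("components.zarr")
offsets = root.create_array(name="offsets", shape=(4,), chunks=(1024,), dtype="int64")
values = root.create_array(name="values", shape=(5,), chunks=(1024,), dtype="int64")
offsets[:] = np.array([0, 3, 3, 5])
values[:] = np.array([1, 2, 3, 4, 5])

# Grow only the values array; the offsets array is left untouched.
extra = np.array([6, 7, 8])
values.resize((values.shape[0] + extra.shape[0],))
values[-extra.shape[0]:] = extra
```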
Using a zarr group was also my idea to store these 1D arrays, i.e. (below is a runnable example):

```python
import awkward as ak
import zarr
import collections
# Let's take the jagged array from Martin's example above:
# Awkward Array: `jagged_array = ak.Array([[1, 2, 3], [], [4, 5]])`
# -> offsets: [0, 3, 3, 5]
# -> values: [1, 2, 3, 4, 5]
#
# Now let's store this in a Zarr group...
# First the full awkward array
jagged_array = ak.Array([[1, 2, 3], [], [4, 5]])
# decompose the ak.Array into metadata and 1D buffers (offsets and values)
form, length, buffers = ak.to_buffers(jagged_array)
# just for understanding purposes
# print(buffers)
# -> {'node0-offsets': array([0, 3, 3, 5]), 'node1-data': array([1, 2, 3, 4, 5])}
root = zarr.group("data.zarr")
# first the offsets
offsets_1darray = buffers['node0-offsets']
offsets = root.create_array(
    name='node0-offsets', shape=offsets_1darray.shape, chunks=offsets_1darray.shape, dtype=offsets_1darray.dtype
)
offsets[:] = offsets_1darray
# then the values
values_1darray = buffers['node1-data']
values = root.create_array(
    name='node1-data', shape=values_1darray.shape, chunks=values_1darray.shape, dtype=values_1darray.dtype
)
values[:] = values_1darray
# reconstruct the original jagged array using the zarr group now,
# first define a little helper that loads and decompresses upon getitem access:
class ZarrGroupWrapper(collections.abc.Mapping):
    def __init__(self, group):
        self.group = group

    def __getitem__(self, key):
        return self.group[key][...]  # load

    def __iter__(self):
        return iter(self.group)

    def __len__(self):
        return len(self.group)
reconstructed = ak.from_buffers(form, length, ZarrGroupWrapper(root)) # <- notice the Zarr group
print(reconstructed) # works!
# -> <Array [[1, 2, 3], [], [4, 5]] type='3 * var * int64'>
```

Now, it is clear to me how that part works (the same would work with plain HDF5). In an improved version one would also store the metadata (form and length, they're JSON serializable) in the Zarr group.

But we didn't really make use of any chunking yet (notice that shape and chunks are equal above). Instead, we're interested in storing many such jagged arrays in the above Zarr group while preserving their individual top-level lengths, by storing all offsets and values concatenated with different chunk sizes. Each jagged array may have a different number of offsets and values. Something in the direction of (the following doesn't work, just pseudo-code):

```python
root = zarr.group("data.zarr")
# offsets = [0, 3, 3, 5] (shape=4)
# -> chunk0: [0, 3, 3] (chunksize=3)
# -> chunk1: [5] (chunksize=1) ...this is not entirely correct as the offsets should be [3, 5] instead
offsets = root.create_array(
    name='node0-offsets', shape=(4,), chunks=[3, 1], dtype=offsets_1darray.dtype
)
offsets[:] = offsets_1darray
# values = [1, 2, 3, 4, 5] (shape=5)
# -> chunk0: [1, 2, 3] (chunksize=3)
# -> chunk1: [4, 5] (chunksize=2)
values = root.create_array(
    name='node1-data', shape=(5,), chunks=[3, 2], dtype=values_1darray.dtype
)
values[:] = values_1darray
# The following doesn't work yet with Awkward Array, but that's what I like to achieve:
# reconstruct the chunked array:
reconstructed = ak.from_buffers(form, length, ZarrGroupWrapper(root))
# when slicing the first 2, the idea is that awkward's slicing will "request" exactly the slice for
# the first chunk of offsets and values, i.e., the following would only load and decompress chunk0:
chunk0 = reconstructed[:2]
# similarly, the following would only load and decompress chunk1:
chunk1 = reconstructed[2:]
```

All of this requires, however, that we can efficiently write a large Zarr group in the first place from many concurrent dask workers, where each dask task writes one jagged array into the group (appending its offsets and values to the group's offsets and values arrays, and appending the additional chunk size). This step is append-only, so no "+="-like logic is needed (we do N -> N, but the N outputs are just smaller arrays). That way, we could lazily construct one awkward array that uses all chunks internally, but only load and decompress a small subset when, e.g., looking at only the first 100 top-level elements with a slice like `reconstructed[:100]`.
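For completeness, a small sketch of the offset bookkeeping such an append needs (pure numpy/awkward, no Zarr involved): the appended offsets have to be shifted by the number of values already stored, and their leading 0 dropped.

```python
import awkward as ak
import numpy as np

a = ak.Array([[1, 2, 3], [], [4, 5]])
b = ak.Array([[6], [7, 8]])
_, _, buf_a = ak.to_buffers(a)
_, _, buf_b = ak.to_buffers(b)

# values are simply concatenated
values = np.concatenate([buf_a["node1-data"], buf_b["node1-data"]])
# appended offsets: drop the leading 0 and shift by the previous last offset
offsets = np.concatenate(
    [buf_a["node0-offsets"], buf_b["node0-offsets"][1:] + buf_a["node0-offsets"][-1]]
)
# offsets -> [0, 3, 3, 5, 6, 8], values -> [1, 2, 3, 4, 5, 6, 7, 8]
# i.e. the buffers of ak.Array([[1, 2, 3], [], [4, 5], [6], [7, 8]])
```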
Dear Zarr experts,
It's great to see the new support for irregular chunking of Zarr arrays: https://www.earthmover.io/blog/zarr-variable-length-chunks 🎉
This opens up the possibility to efficiently store jagged Awkward Arrays with Zarr, which is a primary use case for high-energy physics (HEP) analyses. I think there's now a straightforward way to provide Zarr serialization for single Awkward Arrays; however, we're very much interested in how to use this at scale when writing many (O(100)k) differently sized jagged Awkward Arrays concurrently from a dask cluster and subsequently reading them again.
Our workflow is roughly as follows:
We run O(10-100)k embarrassingly parallel dask tasks on a cluster, where each task reads a ROOT file (https://root.cern) containing e.g. 100k LHC collision events, each of which contains a variable number of particles (and their properties, e.g. Electron momentum). The total input data size is typically O(1-10)TB. Inside each task we select and reject certain collision events based on our physics knowledge. The output of each task is a smaller Awkward Array that again has multiple fields of differently sized particle properties (a jagged array). This step is what we typically refer to as "skimming": reducing the total number of events that are relevant for further analysis. Currently, we write these skimmed event collections out as individual files (typically in ROOT or parquet format): 1 file per dask task. The total output size of such skims is roughly O(100)GB - O(1)TB, split into many small files.
Now, with irregular chunking support in Zarr, we could also write them out as Zarr arrays, where the Zarr array is dynamically grown (with different chunk sizes) along axis=0 (the 'event axis'). That way, subsequent analysis steps that want to read again e.g. 100k events per dask task could just slice 100k events, and that slice would read the corresponding skimmed particle collections (e.g. multiple irregular chunks of the Zarr array). In my understanding this would eliminate the explicit file-merging step that we currently do when writing 1 file per skimming task (the files are often very small, so we have to merge many of them to end up with at least 100k events again), handling it transparently by slicing across multiple irregular chunks. Is my understanding here correct?
The mental model I currently struggle with is that we do not know, before doing the skimming, what output shape and chunk sizes we want to write; we only know them per skimmed event chunk, after doing our selection in each dask task. This would mean I need to dynamically grow a Zarr array every time any dask task finishes, extending the Zarr array and then appending that task's skimmed event chunk along the event axis (axis=0). Do you have tips on how to achieve this as efficiently as possible?
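To make the question concrete, here is a rough sketch (store name, array name, and sizes are made up) of the per-task append I have in mind; the open question is how to do this efficiently and safely when many dask tasks finish concurrently:

```python
import numpy as np
import zarr

root = zarr.group("skim.zarr")
values = root.create_array(name="values", shape=(0,), chunks=(1000,), dtype="float64")

def append_skim(array, skimmed):
    """Grow `array` along axis=0 and write one finished task's output at the end."""
    old = array.shape[0]
    array.resize((old + skimmed.shape[0],))
    array[old:] = skimmed

# pretend three dask tasks finished, each with a differently sized output
for n in (3, 5, 2):
    append_skim(values, np.random.random(n))
```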
I had a brief offline discussion with @martindurant (thanks!) who suggested the following for the 'end' of each dask task:
This would write the bytes into individual files and collect the chunk_size from each dask task, which could let us build a Zarr array based on each bytes + chunk size pair. @martindurant suggested reaching out to you to understand whether this is the recommended/most efficient way to build a "dynamically growable" Zarr array with irregular chunking for our full skimmed event dataset.
Thank you a lot in advance for your help and sorry for the wall of text 😅!
Best, Peter