Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
d1bc0f4
Initial implementation for BatchArray
FrancescAlted Mar 16, 2026
37837d9
Add .info to BatchArray and VLArray; fancier .info for others too
FrancescAlted Mar 17, 2026
f3ef361
Update to latest c-blosc2 in vlblocks branch
FrancescAlted Mar 17, 2026
65ea44b
Update to latest c-blosc2 in vlblocks branch
FrancescAlted Mar 17, 2026
4172c3f
Enable dicts by default in BatchArray and VLArray when using zstd
FrancescAlted Mar 17, 2026
e799739
Update to latest c-blosc2
FrancescAlted Mar 17, 2026
e640474
Update to latest c-blosc2
FrancescAlted Mar 17, 2026
07ec9d2
Update the nthreads to the default value in this machine
FrancescAlted Mar 17, 2026
4fe44e8
New meanings for .chunksize and .blocksize properties
FrancescAlted Mar 18, 2026
bad8411
Fix for newer versions of torch
FrancescAlted Mar 18, 2026
4fb6882
BatchArray -> ObjectArray
FrancescAlted Mar 18, 2026
bf1b935
Remove legacy fallback
FrancescAlted Mar 18, 2026
93378e5
Update to latest c-blosc2
FrancescAlted Mar 18, 2026
99be797
ObjectArray -> ObjectStore
FrancescAlted Mar 18, 2026
244c86d
chunksize -> batchsize
FrancescAlted Mar 18, 2026
e999d1d
batchsize is always constant
FrancescAlted Mar 18, 2026
afc8436
ObjectStore -> BatchStore
FrancescAlted Mar 18, 2026
be41f85
More consistent naming
FrancescAlted Mar 18, 2026
c6540e6
batchsize is not immutable anymore (and neither blocksize)
FrancescAlted Mar 19, 2026
e60e31c
Implemented block-only reads for improved random read access
FrancescAlted Mar 19, 2026
4eed97c
New cache for the last block read
FrancescAlted Mar 19, 2026
f51c430
New iter_objects for iterating over objects in batch store
FrancescAlted Mar 19, 2026
ab8b495
Recognize .b2b extension as BatchStore in DictStore
FrancescAlted Mar 19, 2026
d21ed20
blocksize_max -> max_blocksize. also, this is persisted in metalayer …
FrancescAlted Mar 19, 2026
9c177bd
Adapt max_blocksize depending on the clevel
FrancescAlted Mar 19, 2026
a65cb69
Revamped BatchStore. Add arrow as an optional serializer, and much mo…
FrancescAlted Mar 20, 2026
0ab340d
Use metadata-based DictStore discovery and warn on leaf mismatches
FrancescAlted Mar 20, 2026
79cd7ab
Add a BatchStore.items accessor
FrancescAlted Mar 20, 2026
c6cccdd
Undo an unnecessary workaround
FrancescAlted Mar 20, 2026
20a958c
Start using L2 cache size for clevel==5
FrancescAlted Mar 20, 2026
c8c1632
Allow using a filename in .b2z as a single argument
FrancescAlted Mar 20, 2026
5a1cd0f
Adapt test to new blocksize thresholds
FrancescAlted Mar 20, 2026
62dc717
Fix BatchStore metadata preservation paths
FrancescAlted Mar 20, 2026
b379a2c
Update to latest c-blosc2
FrancescAlted Mar 20, 2026
80f2b5f
Merge branch 'main' into batch-store
FrancescAlted Mar 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions bench/batch_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#######################################################################
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
#######################################################################

from __future__ import annotations

import argparse
import random
import statistics
import time

import blosc2


# --- Benchmark configuration -------------------------------------------
# File that the persistent (on-disk) BatchStore is written to and read back from.
URLPATH = "bench_batch_store.b2b"
# Number of batches appended to the store.
NBATCHES = 10_000
# Number of dict entries generated for every batch.
OBJECTS_PER_BATCH = 100
# Total number of entries across all batches.
TOTAL_OBJECTS = NBATCHES * OBJECTS_PER_BATCH
# Passed as ``max_blocksize`` whenever the BatchStore is created or reopened.
# NOTE(review): the unit (items vs. bytes) is not visible in this file -- confirm
# against the BatchStore documentation.
BLOCKSIZE_MAX = 32
# Number of timed random single-entry reads.
N_RANDOM_READS = 1_000


def make_rgb(batch_index: int, item_index: int) -> dict[str, int]:
    """Build the synthetic RGB record for one position in the store.

    Args:
        batch_index: Index of the batch the record belongs to.
        item_index: Position of the record inside that batch.

    Returns:
        A dict with keys ``"red"`` (the batch index), ``"green"`` (the item
        index) and ``"blue"`` (the flat index of the record when batches of
        ``OBJECTS_PER_BATCH`` records are laid out one after another).
    """
    flat_position = item_index + batch_index * OBJECTS_PER_BATCH
    return dict(red=batch_index, green=item_index, blue=flat_position)


def make_batch(batch_index: int) -> list[dict[str, int]]:
    """Return the ``OBJECTS_PER_BATCH`` records that make up batch ``batch_index``.

    Records are produced by ``make_rgb`` in increasing item order.
    """
    positions = range(OBJECTS_PER_BATCH)
    return [
        make_rgb(batch_index, position)
        for position in positions
    ]


def expected_entry(batch_index: int, item_index: int) -> dict[str, int]:
    """Return the record a read at ``(batch_index, item_index)`` must yield.

    Used to validate values read back from the store: ``"red"`` is the batch
    index, ``"green"`` the item index and ``"blue"`` the flat index computed
    with ``OBJECTS_PER_BATCH`` records per batch.
    """
    blue = batch_index * OBJECTS_PER_BATCH + item_index
    return {"red": batch_index, "green": item_index, "blue": blue}


def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the benchmark.

    Options: ``--codec`` (any ``blosc2.Codec`` member name), ``--clevel``,
    ``--serializer`` (``msgpack`` or ``arrow``), ``--use-dict`` and
    ``--in-mem``. Defaults are shown in ``--help`` output.
    """
    codec_names = [member.name for member in blosc2.Codec]

    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description="Benchmark BatchStore single-entry reads.",
    )

    # Valued options.
    cli.add_argument("--codec", type=str, default="ZSTD", choices=codec_names)
    cli.add_argument("--clevel", type=int, default=5)
    cli.add_argument("--serializer", type=str, default="msgpack", choices=["msgpack", "arrow"])

    # Boolean switches (off unless given).
    cli.add_argument(
        "--use-dict",
        action="store_true",
        help="Enable dictionaries for ZSTD/LZ4/LZ4HC codecs.",
    )
    cli.add_argument(
        "--in-mem",
        action="store_true",
        help="Keep the BatchStore purely in memory.",
    )
    return cli


def build_store(
    codec: blosc2.Codec, clevel: int, use_dict: bool, serializer: str, in_mem: bool
) -> blosc2.BatchStore | None:
    """Create a BatchStore and fill it with ``NBATCHES`` synthetic batches.

    Args:
        codec: Compression codec placed in the compression parameters.
        clevel: Compression level placed in the compression parameters.
        use_dict: Request dictionaries; only passed on as true when ``codec``
            is ZSTD, LZ4 or LZ4HC.
        serializer: Serializer name forwarded to ``BatchStore``.
        in_mem: When true, build the store without a ``urlpath``; otherwise
            write it to ``URLPATH`` (removing any previous file first).

    Returns:
        The populated store when ``in_mem`` is true. For the persistent case
        the store is used as a context manager while writing and ``None`` is
        returned; callers reopen it from ``URLPATH``.
    """
    dict_capable_codecs = (blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC)
    compression = {
        "codec": codec,
        "clevel": clevel,
        "use_dict": use_dict and codec in dict_capable_codecs,
    }

    if not in_mem:
        # Persistent store: start from a clean file and write through a context manager.
        blosc2.remove_urlpath(URLPATH)
        disk_storage = blosc2.Storage(urlpath=URLPATH, mode="w", contiguous=True)
        with blosc2.BatchStore(
            storage=disk_storage,
            max_blocksize=BLOCKSIZE_MAX,
            serializer=serializer,
            cparams=compression,
        ) as disk_store:
            for number in range(NBATCHES):
                disk_store.append(make_batch(number))
        return None

    # In-memory store: no urlpath, and the live object is handed back to the caller.
    memory_store = blosc2.BatchStore(
        storage=blosc2.Storage(mode="w"),
        max_blocksize=BLOCKSIZE_MAX,
        serializer=serializer,
        cparams=compression,
    )
    for number in range(NBATCHES):
        memory_store.append(make_batch(number))
    return memory_store


def measure_random_reads(store: blosc2.BatchStore) -> tuple[list[tuple[int, int, int, dict[str, int]]], list[int]]:
    """Time ``N_RANDOM_READS`` single-entry reads at pseudo-random positions.

    Positions come from a ``random.Random`` seeded with 2024, so the sequence
    is repeatable. Only the ``store[batch][item]`` access is inside the timed
    region; validation against ``expected_entry`` happens afterwards.

    Args:
        store: The store to read from.

    Returns:
        ``(samples, timings_ns)`` where each sample is
        ``(elapsed_ns, batch_index, item_index, value)`` and ``timings_ns``
        holds the elapsed nanoseconds of every read, in order.

    Raises:
        RuntimeError: If a value read back differs from the expected record.
    """
    generator = random.Random(2024)
    nbatches = len(store)
    timings_ns: list[int] = []
    samples: list[tuple[int, int, int, dict[str, int]]] = []

    for _ in range(N_RANDOM_READS):
        batch_index = generator.randrange(nbatches)
        item_index = generator.randrange(OBJECTS_PER_BATCH)

        # Timed region: exactly one scalar read.
        started = time.perf_counter_ns()
        value = store[batch_index][item_index]
        elapsed_ns = time.perf_counter_ns() - started

        timings_ns.append(elapsed_ns)
        if value != expected_entry(batch_index, item_index):
            raise RuntimeError(f"Value mismatch at batch={batch_index}, item={item_index}")
        samples.append((elapsed_ns, batch_index, item_index, value))

    return samples, timings_ns


def main() -> None:
    """Run the BatchStore benchmark end to end and print a report.

    Steps:
      1. Parse the command line (see ``build_parser``).
      2. Build the store (in memory or at ``URLPATH``) and time the build.
      3. Time ``N_RANDOM_READS`` random single-entry reads.
      4. Time a full pass over ``iter_items()``, summing the ``"blue"`` field
         as a checksum.
      5. Print timings, the store's ``info`` and the first five sample reads.

    Raises:
        RuntimeError: If an in-memory build unexpectedly returns no store, or
            (from ``measure_random_reads``) if a value read back is wrong.
    """
    parser = build_parser()
    args = parser.parse_args()
    codec = blosc2.Codec[args.codec]
    # Dictionaries are only requested for these codecs; for any other codec the
    # flag is reported (and passed on) as False.
    use_dict = args.use_dict and codec in (blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC)

    mode_label = "in-memory" if args.in_mem else "persistent"
    article = "an" if args.in_mem else "a"
    # Counts come from the module constants so the banner cannot drift from
    # the values the benchmark actually uses.
    print(
        f"Building {article} {mode_label} BatchStore with {TOTAL_OBJECTS:,} RGB dicts "
        f"and timing {N_RANDOM_READS:,} random scalar reads..."
    )
    print(f"  codec: {codec.name}")
    print(f"  clevel: {args.clevel}")
    print(f"  serializer: {args.serializer}")
    print(f"  use_dict: {use_dict}")
    print(f"  in_mem: {args.in_mem}")
    t0 = time.perf_counter()
    store = build_store(
        codec=codec, clevel=args.clevel, use_dict=use_dict, serializer=args.serializer, in_mem=args.in_mem
    )
    build_time_s = time.perf_counter() - t0
    if args.in_mem:
        # Explicit check instead of ``assert`` so it still runs under ``python -O``.
        if store is None:
            raise RuntimeError("build_store() returned no store for an in-memory build")
        read_store = store
    else:
        # The persistent store was written and released by build_store(); reopen it read-only.
        read_store = blosc2.BatchStore(urlpath=URLPATH, mode="r", contiguous=True, max_blocksize=BLOCKSIZE_MAX)
    samples, timings_ns = measure_random_reads(read_store)
    t0 = time.perf_counter()
    checksum = 0
    nitems = 0
    for item in read_store.iter_items():
        checksum += item["blue"]
        nitems += 1
    iter_time_s = time.perf_counter() - t0

    print()
    print("BatchStore benchmark")
    print(f"  build time: {build_time_s:.3f} s")
    print(f"  batches: {len(read_store)}")
    print(f"  items: {TOTAL_OBJECTS}")
    print(f"  max_blocksize: {read_store.max_blocksize}")
    print()
    print(read_store.info)
    print(f"Random scalar reads: {N_RANDOM_READS}")
    # Timings are collected in nanoseconds and reported in microseconds.
    print(f"  mean: {statistics.fmean(timings_ns) / 1_000:.2f} us")
    print(f"  max: {max(timings_ns) / 1_000:.2f} us")
    print(f"  min: {min(timings_ns) / 1_000:.2f} us")
    print(f"Item iteration via iter_items(): {iter_time_s:.3f} s")
    print(f"  per item: {iter_time_s * 1_000_000 / nitems:.2f} us")
    print(f"  checksum: {checksum}")
    print("Sample reads:")
    for timing_ns, batch_index, item_index, value in samples[:5]:
        print(f"  {timing_ns / 1_000:.2f} us -> read_store[{batch_index}][{item_index}] = {value}")
    if args.in_mem:
        print("BatchStore kept in memory")
    else:
        print(f"BatchStore file at: {read_store.urlpath}")


# Run the benchmark only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
Loading
Loading