Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions bench/ndarray/multithreaded_matmul_bench.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""Benchmark blosc2.matmul against NumPy on large square operands.

Measures, on N x N float64 operands:
  1. np.matmul on in-memory NumPy copies (baseline, also used to verify results).
  2. blosc2.matmul with explicit output chunks/blocks (multithreaded prefilter path).
  3. blosc2.matmul with default output partitioning.
  4. blosc2.matmul where blosc2 also picks the operand chunks/blocks.
"""
import time

import numpy as np

import blosc2

N = 10000
ndim = 2
ashape = (N,) * ndim
bshape = ashape
dtype = np.float64


def timed_matmul(x, y, label, reference=None, **kwargs):
    """Run blosc2.matmul(x, y, **kwargs), print wall time, and, when a
    *reference* ndarray is given, verify the result against it."""
    tic = time.perf_counter()
    res = blosc2.matmul(x, y, **kwargs)
    print(f"{label} finished in {time.perf_counter() - tic:.3f} s")
    if reference is not None:
        np.testing.assert_allclose(res[:], reference)
    return res


# Explicit operand partitioning; B's chunks/blocks are the transpose of A's.
achunks = (1000, 1000)
bchunks = (achunks[1], achunks[0])
ablocks = (200, 200)
bblocks = (ablocks[1], ablocks[0])
outblocks = (ablocks[0], bblocks[1])
outchunks = (achunks[0], bchunks[1])
# Alternative non-constant operands:
#   blosc2.linspace(0, 1, dtype=dtype, shape=..., chunks=..., blocks=...)
a = blosc2.ones(dtype=dtype, shape=ashape, chunks=achunks, blocks=ablocks)
b = blosc2.full(fill_value=2, dtype=dtype, shape=bshape, chunks=bchunks, blocks=bblocks)

a_np = a[:]
b_np = b[:]
tic = time.perf_counter()
np_res = np.matmul(a_np, b_np)
print(f"numpy finished in {time.perf_counter() - tic:.3f} s")

timed_matmul(a, b, "blosc2 multithreaded", reference=np_res,
             blocks=outblocks, chunks=outchunks)
timed_matmul(a, b, "blosc2 normal", reference=np_res)

# Now let blosc2 choose all partitioning (chunks/blocks default to None).
a = blosc2.ones(dtype=dtype, shape=ashape)
b = blosc2.full(fill_value=2, dtype=dtype, shape=bshape)
timed_matmul(a, b, "blosc2 normal with default chunks etc.", reference=np_res)
2 changes: 1 addition & 1 deletion bench/ndarray/stringops_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import time
import numpy as np
import blosc2
from blosc2.lazyexpr import _toggle_miniexpr
from blosc2.utils import _toggle_miniexpr

# nparr = np.random.randint(low=0, high=128, size=(N, 10), dtype=np.uint32)
# nparr = nparr.view('S40').astype('U10')
Expand Down
252 changes: 249 additions & 3 deletions src/blosc2/blosc2_ext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ cimport numpy as np

np.import_array()


cdef extern from "<stdint.h>":
ctypedef signed char int8_t
ctypedef signed short int16_t
Expand All @@ -53,6 +52,12 @@ cdef extern from "<stdint.h>":
ctypedef unsigned int uint32_t
ctypedef unsigned long long uint64_t

# Element types with a typed matmul kernel specialization (see
# matmul_block_kernel); one of these is selected at runtime from the
# output array's dtype kind ('f'/'i') and itemsize in matmul_prefilter.
ctypedef fused T:
    float
    double
    int32_t
    int64_t

cdef extern from "<stdio.h>":
int printf(const char *format, ...) nogil

Expand Down Expand Up @@ -669,6 +674,13 @@ ctypedef struct me_udata:
int64_t blocks_in_chunk[B2ND_MAX_DIM]
me_expr* miniexpr_handle

# Per-operation user data handed to matmul_prefilter through the
# prefilter params (filled in by NDArray._fill_mm_udata).
# The stride tables are indexed [0]=output array, [1]=input A, [2]=input B.
ctypedef struct mm_udata:
    b2nd_array_t** inputs                      # the two matmul operands (A, B)
    b2nd_array_t* array                        # output array being computed
    int64_t chunks_strides[3][B2ND_MAX_DIM]    # chunk-grid strides per dim
    int64_t blocks_strides[3][B2ND_MAX_DIM]    # block-grid strides within a chunk
    int64_t el_strides[3][B2ND_MAX_DIM]        # element strides within a block

MAX_TYPESIZE = BLOSC2_MAXTYPESIZE
MAX_BUFFERSIZE = BLOSC2_MAX_BUFFERSIZE
MAX_BLOCKSIZE = BLOSC2_MAXBLOCKSIZE
Expand Down Expand Up @@ -982,7 +994,7 @@ cdef _check_cparams(blosc2_cparams *cparams):
if ufilters[i] and cparams.filters[i] in blosc2.ufilters_registry.keys():
raise ValueError("Cannot use multi-threading with user defined Python filters")

if cparams.prefilter != NULL and cparams.prefilter != <blosc2_prefilter_fn>miniexpr_prefilter:
if cparams.prefilter != NULL and (cparams.prefilter != <blosc2_prefilter_fn>miniexpr_prefilter and cparams.prefilter != <blosc2_prefilter_fn>matmul_prefilter):
# Note: miniexpr_prefilter uses miniexpr C API which is thread-friendly,
raise ValueError("`nthreads` must be 1 when a prefilter is set")

Expand Down Expand Up @@ -2017,7 +2029,6 @@ cdef int aux_miniexpr(me_udata *udata, int64_t nchunk, int32_t nblock,
cdef b2nd_array_t* ndarr
cdef int rc
cdef void** input_buffers = <void**> malloc(udata.ninputs * sizeof(uint8_t*))
cdef float *buf
cdef uint8_t* src
cdef uint8_t* chunk
cdef c_bool needs_free
Expand Down Expand Up @@ -2143,6 +2154,165 @@ cdef int aux_miniexpr(me_udata *udata, int64_t nchunk, int32_t nblock,

return 0

cdef int matmul_block_kernel(T* A, T* B, T* C, int M, int K, int N) nogil:
    # Accumulate the product of an M x K block A and a K x N block B into
    # the M x N output block C (C += A @ B).  All blocks are row-major and
    # contiguous.  Uses the i-k-j loop order so the innermost loop streams
    # a row of B and a row of C sequentially.
    cdef int i, j, kk
    cdef int a_row, b_row, c_row
    cdef T a_ik
    for i in range(M):
        a_row = i * K
        c_row = i * N
        for kk in range(K):
            a_ik = A[a_row + kk]
            b_row = kk * N
            for j in range(N):
                C[c_row + j] += <T>(a_ik * B[b_row + j])
    return 0

cdef int aux_matmul(mm_udata *udata, int64_t nchunk, int32_t nblock, void *params_output, int32_t typesize, int typecode) nogil:
    # Compute one output block of a (possibly batched) matmul inside the
    # compression prefilter: decode the matching A and B blocks from the
    # compressed operands and accumulate their product into params_output.
    # nchunk/nblock identify the output chunk/block being produced; typesize
    # and typecode (0 = float, 1 = int, set by matmul_prefilter) select the
    # matmul_block_kernel specialization.
    #
    # NOTE(review): the error messages below say "miniexpr:" — looks like a
    # copy-paste from aux_miniexpr; they should presumably say "matmul:".
    # NOTE(review): most `raise` statements here are not wrapped in
    # `with gil:` although this is a nogil function (contrast the
    # "Unsupported dtype" raise below) — confirm this compiles/behaves as
    # intended.
    # NOTE(review): on any error path, input_buffers/src/dctx allocated
    # below are leaked — consider a cleanup path.
    # Declare all C variables at the beginning
    cdef b2nd_array_t* out_arr
    cdef b2nd_array_t* ndarr
    cdef c_bool first_run
    cdef int rc, p, q, r
    cdef void** input_buffers = <void**> malloc(2 * sizeof(uint8_t*))
    cdef uint8_t** src = <uint8_t**> malloc(2 * sizeof(uint8_t*))
    cdef int32_t chunk_nbytes[2]
    cdef int32_t chunk_cbytes[2]
    cdef int32_t block_nbytes[2]
    cdef int blocknitems[2]
    cdef int startA, startB, expected_blocknitems
    cdef blosc2_context* dctx
    cdef int i, j, block_i, block_j, ncols, block_ncols, Bblock_ncols, Bncols
    cdef int nchunkA = 0, nchunkB = 0, nblockA = 0, nblockB = 0, offsetA = 0, offsetB = 0, offset = 0
    out_arr = udata.array
    cdef int ndim = out_arr.ndim
    cdef int nchunk_ = nchunk
    cdef int coord, batch, batch_, batches = 1

    # Number of batch (leading, non-matrix) elements in one output block.
    # batches = sum(strides[i]*elcoords[i])
    for i in range(ndim - 2):
        batches *= out_arr.blockshape[i]

    # Map the output chunk index to the corresponding base chunk indices in
    # A and B over the batch dimensions.
    # nchunk = sum(strides[i]*chunkcoords[i])
    for i in range(ndim - 2):
        coord = nchunk_ // udata.chunks_strides[0][i]
        nchunk_ = nchunk_ % udata.chunks_strides[0][i]
        nchunkA += coord * udata.chunks_strides[1][i]
        nchunkB += coord * udata.chunks_strides[2][i]

    # Chunk-grid widths of the output (ncols) and of B (Bncols) matrix dims.
    ncols = udata.chunks_strides[0][ndim - 2]
    Bncols = udata.chunks_strides[2][ndim - 2]

    # (i, j) position of this output chunk in the output's matrix chunk grid.
    i = nchunk_ // ncols # ncols * i + j
    j = nchunk_ % ncols
    chunk_startA = nchunkA + i * ncols
    chunk_startB = nchunkB + j

    # Same decomposition for the block index inside the chunk.
    # nblock = sum(strides[i]*blockcoords[i])
    cdef int nblock_ = nblock
    for i in range(ndim - 2):
        coord = nblock_ // udata.blocks_strides[0][i]
        nblock_ = nblock_ % udata.blocks_strides[0][i]
        nblockA += coord * udata.blocks_strides[1][i]
        nblockB += coord * udata.blocks_strides[2][i]

    block_ncols = udata.blocks_strides[0][ndim - 2]
    Bblock_ncols = udata.blocks_strides[2][ndim - 2]

    block_i = nblock_ // block_ncols
    block_j = nblock_ % block_ncols
    block_startA = nblockA + block_i * block_ncols
    block_startB = nblockB + block_j

    # Per-call decompression context (thread-local; one per prefilter call).
    dctx = blosc2_create_dctx(BLOSC2_DPARAMS_DEFAULTS)

    first_run = True
    nchunkA = chunk_startA
    nchunkB = chunk_startB
    while True: # chunk loop: walk A's row of chunks against B's column of chunks
        # Fetch the compressed chunk of each operand and (on the first pass)
        # allocate the per-operand decompression buffers.
        for i in range(2):
            chunk_idx = nchunkA if i == 0 else nchunkB
            ndarr = udata.inputs[i]
            # NOTE(review): `ndim` is reassigned here to the operand's ndim
            # and later reused in the batch loop — confirm all arrays share
            # the same rank, otherwise the batch decomposition below uses
            # the last operand's rank.
            ndim = ndarr.ndim
            src[i] = ndarr.sc.data[chunk_idx]
            rc = blosc2_cbuffer_sizes(src[i], &chunk_nbytes[i], &chunk_cbytes[i], &block_nbytes[i])
            if rc < 0:
                raise ValueError("miniexpr: error getting cbuffer sizes")
            if block_nbytes[i] <= 0:
                raise ValueError("miniexpr: invalid block size")
            if first_run:
                # Kernel dimensions: A blocks are p x q, B blocks are q x r.
                if i == 0:
                    q = ndarr.blockshape[ndim - 1]
                    p = ndarr.blockshape[ndim - 2]
                else: # i = 1
                    r = ndarr.blockshape[ndim - 1]
                input_buffers[i] = malloc(block_nbytes[i])
                if input_buffers[i] == NULL:
                    raise MemoryError("miniexpr: cannot allocate input block buffer")
                blocknitems[i] = block_nbytes[i] // <int> ndarr.sc.typesize
                if i == 0:
                    expected_blocknitems = blocknitems[i]
                elif blocknitems[i] != expected_blocknitems:
                    raise ValueError("miniexpr: inconsistent block element counts across inputs")

        first_run = False
        nblockA = block_startA
        nblockB = block_startB
        while True: # block loop: walk the K dimension block by block
            startA = nblockA * blocknitems[0]
            startB = nblockB * blocknitems[1]
            # Decompress exactly one block of each operand into its buffer.
            rc = blosc2_getitem_ctx(dctx, src[0], chunk_cbytes[0], startA, blocknitems[0],
                                    input_buffers[0], block_nbytes[0])
            if rc < 0:
                raise ValueError("matmul: error decompressing the A chunk")
            rc = blosc2_getitem_ctx(dctx, src[1], chunk_cbytes[1], startB, blocknitems[1],
                                    input_buffers[1], block_nbytes[1])
            if rc < 0:
                raise ValueError("matmul: error decompressing the B chunk")
            # Run the typed kernel once per batch element in the block.
            batch = 0
            offsetA = 0
            offsetB = 0
            offset = 0
            while batch < batches:
                batch_ = batch
                for i in range(ndim - 2):
                    # NOTE(review): `coord` is derived from `batch`, not
                    # `batch_` — for ndim > 3 this differs from the
                    # mixed-radix decomposition used for nchunk_/nblock_
                    # above; confirm intended.
                    coord = batch // udata.el_strides[0][i]
                    batch_ = batch_ % udata.el_strides[0][i]
                    # NOTE(review): offsetA/offsetB/offset are only reset
                    # before the batch loop, so they accumulate across
                    # batch iterations — verify this is the intended
                    # incremental addressing.
                    offsetA += coord * udata.el_strides[1][i]
                    offsetB += coord * udata.el_strides[2][i]
                    offset += coord * udata.el_strides[0][i]
                # Dispatch to the kernel specialization matching the dtype.
                if typecode == 0:
                    if typesize == 4:
                        rc = matmul_block_kernel[float](<float*>input_buffers[0] + offsetA, <float*>input_buffers[1] + offsetB, <float*>params_output + offset, p, q, r)
                    else:
                        rc = matmul_block_kernel[double](<double*>input_buffers[0] + offsetA, <double*>input_buffers[1] + offsetB, <double*>params_output + offset, p, q, r)
                elif typecode == 1:
                    if typesize == 4:
                        rc = matmul_block_kernel[int32_t](<int32_t*>input_buffers[0] + offsetA, <int32_t*>input_buffers[1] + offsetB, <int32_t*>params_output + offset, p, q, r)
                    else:
                        rc = matmul_block_kernel[int64_t](<int64_t*>input_buffers[0] + offsetA, <int64_t*>input_buffers[1] + offsetB, <int64_t*>params_output + offset, p, q, r)
                else:
                    with gil:
                        raise ValueError("Unsupported dtype")
                batch += 1
            # Advance along K: next block of A's row, next block of B's column.
            nblockA += 1
            nblockB += Bblock_ncols
            if (nblockA % block_ncols == 0):
                break
        # Advance along K at chunk granularity.
        nchunkA += 1
        nchunkB += Bncols
        if (nchunkA % ncols == 0):
            break


    blosc2_free_ctx(dctx)
    # Free resources
    for i in range(2):
        free(input_buffers[i])
    free(input_buffers)
    free(src)

    return 0

# Aux function for prefilter and postfilter udf
cdef int aux_udf(udf_udata *udata, int64_t nchunk, int32_t nblock,
Expand Down Expand Up @@ -2221,6 +2391,19 @@ cdef int miniexpr_prefilter(blosc2_prefilter_params *params):
return aux_miniexpr(<me_udata *> params.user_data, params.nchunk, params.nblock, False,
params.output, params.output_typesize)

cdef int matmul_prefilter(blosc2_prefilter_params *params):
    # Prefilter callback installed by NDArray._set_pref_matmul: computes one
    # output block of the matmul by delegating to aux_matmul, selecting the
    # typed kernel from the output array's dtype.
    cdef int typecode

    cdef mm_udata* udata = <mm_udata *> params.user_data
    cdef b2nd_array_t* out_arr = udata.array
    # Index 1 of the serialized dtype string is taken as the NumPy kind
    # character (e.g. 'f' in "<f8") — assumes a byte-order-prefixed
    # descriptor; TODO confirm for dtypes serialized without one.
    cdef char dtype_kind = out_arr.dtype[1]
    if dtype_kind == 'f':
        typecode = 0  # floating point; float32/float64 chosen by typesize
    elif dtype_kind == 'i':
        typecode = 1  # signed integer; int32/int64 chosen by typesize
    else:
        # NOTE(review): raising from a `cdef int` callback with no `except`
        # clause cannot propagate the exception to the C caller — confirm
        # how this error is surfaced.
        raise ValueError("Unsupported dtype")
    return aux_matmul(udata, params.nchunk, params.nblock, params.output, params.output_typesize, typecode)

cdef int general_udf_prefilter(blosc2_prefilter_params *params):
cdef udf_udata *udata = <udf_udata *> params.user_data
Expand Down Expand Up @@ -3068,6 +3251,49 @@ cdef class NDArray:

return udata

cdef mm_udata *_fill_mm_udata(self, inputs):
    # Allocate and populate the mm_udata passed to matmul_prefilter:
    # operand pointers plus precomputed chunk/block/element stride tables
    # (index [0] = this output array, [1] = input 'x1', [2] = input 'x2').
    # NOTE(review): ownership of the malloc'ed udata/inputs_ is transferred
    # to the caller (_set_pref_matmul) — confirm where they are freed.
    cdef mm_udata *udata = <mm_udata *> malloc(sizeof(mm_udata))
    cdef int cstrides, bstrides, estrides
    cdef b2nd_array_t* inp
    cdef b2nd_array_t** inputs_ = <b2nd_array_t**> malloc(2 * sizeof(b2nd_array_t*))
    for i in range(2):
        operand = inputs['x1'] if i == 0 else inputs['x2']
        inputs_[i] = <b2nd_array_t*><uintptr_t>operand.c_array
        # Invalidate any cached chunk on the operands before reading them.
        inputs_[i].chunk_cache.nchunk = -1
        inputs_[i].chunk_cache.data = NULL
    udata.inputs = inputs_
    udata.array = self.array

    # Save these in mm_udata to avoid computing them for each block
    # Last (fastest-varying) dimension always has stride 1.
    for i in range(3):
        udata.chunks_strides[i][self.array.ndim - 1] = 1
        udata.blocks_strides[i][self.array.ndim - 1] = 1
        udata.el_strides[i][self.array.ndim - 1] = 1
    # Output strides, built right-to-left as running products of the
    # number of chunks / blocks / elements along each trailing dimension.
    for idx in range(2, self.array.ndim + 1):
        i = self.array.ndim - idx
        udata.chunks_strides[0][i] = udata.chunks_strides[0][i + 1] * udata.array.extshape[i + 1] // udata.array.chunkshape[i + 1]
        udata.blocks_strides[0][i] = udata.blocks_strides[0][i + 1] * udata.array.extchunkshape[i + 1] // udata.array.blockshape[i + 1]
        udata.el_strides[0][i] = udata.el_strides[0][i + 1] * udata.array.blockshape[i + 1]

    # Operand strides; a stride of 0 broadcasts a size-1 (or missing)
    # batch dimension.
    for j in range(1, 3):
        inp = inputs_[j - 1]
        cstrides = bstrides = estrides = 1
        for idx in range(2, self.array.ndim + 1):
            i = inp.ndim - idx
            # NOTE(review): `inp.shape[i + 1]` is evaluated before the
            # `i < 0` guard, so for i <= -2 this indexes a C array with a
            # negative index; and when i < 0 the assignments below also
            # write to negative indices — confirm inputs with fewer dims
            # than the output are handled intentionally here.
            if inp.shape[i + 1] == 1 or i < 0:
                udata.chunks_strides[j][i] = 0
                udata.blocks_strides[j][i] = 0
                udata.el_strides[j][i] = 0
            else:
                bstrides *= inp.extchunkshape[i + 1] // inp.blockshape[i + 1]
                cstrides *= inp.extshape[i + 1] // inp.chunkshape[i + 1]
                estrides *= inp.blockshape[i + 1]
                udata.chunks_strides[j][i] = cstrides
                udata.blocks_strides[j][i] = bstrides
                udata.el_strides[j][i] = estrides

    return udata

def _set_pref_expr(self, expression, inputs, fp_accuracy, aux_reduc=None, jit=None):
# Set prefilter for miniexpr
cdef blosc2_cparams* cparams = self.array.sc.storage.cparams
Expand Down Expand Up @@ -3153,6 +3379,26 @@ cdef class NDArray:
if self.array.sc.cctx == NULL:
raise RuntimeError("Could not create compression context")

def _set_pref_matmul(self, inputs, fp_accuracy):
    """Install the matmul prefilter on this (output) array's compression
    context so that compressing it computes inputs['x1'] @ inputs['x2'].

    Parameters
    ----------
    inputs : dict
        Mapping with keys 'x1' and 'x2' holding the two operand arrays.
    fp_accuracy
        Unused here.  # NOTE(review): dead parameter — confirm whether it
        # should influence the kernel like in _set_pref_expr.
    """
    # Set prefilter for matmul
    cdef blosc2_cparams* cparams = self.array.sc.storage.cparams
    cparams.prefilter = <blosc2_prefilter_fn> matmul_prefilter

    cdef mm_udata* udata = self._fill_mm_udata(inputs)
    # NOTE(review): out_arr is unused; udata/preparams are malloc'ed here
    # and handed to the context — confirm they are freed on teardown.
    cdef b2nd_array_t* out_arr = udata.array
    cdef blosc2_prefilter_params* preparams = <blosc2_prefilter_params *> calloc(1, sizeof(blosc2_prefilter_params))
    preparams.user_data = udata
    preparams.output_is_disposable = False
    cparams.preparams = preparams
    _check_cparams(cparams)

    if self.array.sc.cctx != NULL:
        # Freeing NULL context can lead to segmentation fault
        blosc2_free_ctx(self.array.sc.cctx)
    # Rebuild the compression context so it picks up the new prefilter.
    self.array.sc.cctx = blosc2_create_cctx(dereference(cparams))
    if self.array.sc.cctx == NULL:
        raise RuntimeError("Could not create compression context")

def _set_pref_udf(self, func, inputs_id):
if self.array.sc.storage.cparams.nthreads > 1:
raise AttributeError("compress `nthreads` must be 1 when assigning a prefilter")
Expand Down
9 changes: 1 addition & 8 deletions src/blosc2/lazyexpr.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
process_key,
reducers,
safe_numpy_globals,
try_miniexpr,
)

if not blosc2.IS_WASM:
Expand All @@ -76,14 +77,6 @@
global safe_blosc2_globals
safe_blosc2_globals = {}

# Set this to False if miniexpr should not be tried out
try_miniexpr = not blosc2.IS_WASM or getattr(blosc2, "_WASM_MINIEXPR_ENABLED", False)


def _toggle_miniexpr(FLAG):
global try_miniexpr
try_miniexpr = FLAG


def ne_evaluate(expression, local_dict=None, **kwargs):
"""Safely evaluate expressions using numexpr when possible, falling back to numpy."""
Expand Down
Loading
Loading