Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 31 additions & 7 deletions cassandra/numpy_parser.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ as numpy is an optional dependency.
include "ioutils.pyx"

cimport cython
from libc.stdint cimport uint64_t, uint8_t
from libc.stdint cimport uint64_t
from libc.string cimport memset
from cpython.ref cimport Py_INCREF, PyObject
Comment on lines 27 to 30

from cassandra.bytesio cimport BytesIOReader
Expand All @@ -52,12 +53,14 @@ ctypedef struct ArrDesc:
int stride # should be large enough as we allocate contiguous arrays
int is_object
Py_uintptr_t mask_ptr
int mask_stride

arrDescDtype = np.dtype(
[ ('buf_ptr', np.uintp)
, ('stride', np.dtype('i'))
, ('is_object', np.dtype('i'))
, ('mask_ptr', np.uintp)
, ('mask_stride', np.dtype('i'))
], align=True)

_cqltype_to_numpy = {
Expand All @@ -71,8 +74,6 @@ _cqltype_to_numpy = {

obj_dtype = np.dtype('O')

cdef uint8_t mask_true = 0x01

cdef class NumpyParser(ColumnParser):
"""Decode a ResultMessage into a bunch of NumPy arrays"""

Expand Down Expand Up @@ -112,7 +113,7 @@ def make_arrays(ParseDesc desc, array_size):
(e.g. this can be fed into pandas.DataFrame)
"""
array_descs = np.empty((desc.rowsize,), arrDescDtype)
arrays = []
arrays = [None] * desc.rowsize

for i, coltype in enumerate(desc.coltypes):
arr = make_array(coltype, array_size)
Expand All @@ -121,17 +122,35 @@ def make_arrays(ParseDesc desc, array_size):
array_descs[i]['is_object'] = arr.dtype is obj_dtype
try:
array_descs[i]['mask_ptr'] = arr.mask.ctypes.data
array_descs[i]['mask_stride'] = arr.mask.strides[0]
except AttributeError:
array_descs[i]['mask_ptr'] = 0
arrays.append(arr)
array_descs[i]['mask_stride'] = 1
arrays[i] = arr

return array_descs, arrays


def make_array(coltype, array_size):
"""
Allocate a new NumPy array of the given column type and size.
For VectorType, creates a 2D array (array_size x vector_dimension).
"""
# Check if this is a VectorType
if issubclass(coltype, cqltypes.VectorType):
# VectorType - create 2D array (rows x vector_dimension)
vector_size = coltype.vector_size
subtype = coltype.subtype
try:
dtype = _cqltype_to_numpy[subtype]
a = np.ma.empty((array_size, vector_size), dtype=dtype)
a.mask = np.zeros((array_size, vector_size), dtype=bool)
except KeyError:
# Unsupported vector subtype - fall back to object array
a = np.empty((array_size,), dtype=obj_dtype)
return a

# Scalar types
try:
a = np.ma.empty((array_size,), dtype=_cqltype_to_numpy[coltype])
a.mask = np.zeros((array_size,), dtype=bool)
Expand Down Expand Up @@ -160,20 +179,25 @@ cdef inline int unpack_row(
Py_INCREF(val)
(<PyObject **> arr.buf_ptr)[0] = <PyObject *> val
elif buf.size >= 0:
if buf.size > arr.stride:
raise ValueError(
"Column %d: received %d bytes but array stride is %d" %
(i, buf.size, arr.stride))
memcpy(<char *> arr.buf_ptr, buf.ptr, buf.size)
else:
memcpy(<char *>arr.mask_ptr, &mask_true, 1)
memset(<char *>arr.mask_ptr, 1, arr.mask_stride)
Comment on lines 181 to +188

# Update the pointer into the array for the next time
arrays[i].buf_ptr += arr.stride
arrays[i].mask_ptr += 1
arrays[i].mask_ptr += arr.mask_stride

return 0


def make_native_byteorder(arr):
"""
Make sure all values have a native endian in the NumPy arrays.
Handles both 1D (scalar types) and 2D (VectorType) arrays.
"""
if is_little_endian and not arr.dtype.kind == 'O':
# We have arrays in big-endian order. First swap the bytes
Expand Down
Loading
Loading