From d9dfe9efd888181df3dd1441ea63fd2ed1d52824 Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Thu, 5 Feb 2026 23:42:27 +0200 Subject: [PATCH 1/2] (improvement)Add VectorType support to numpy_parser for 2D array parsing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend NumpyParser to handle VectorType columns by creating 2D NumPy arrays (rows × vector_dimension) instead of object arrays. This enables zero-copy parsing for vector embeddings in ML/AI workloads. Features: - Detects VectorType via vector_size and subtype attributes - Creates 2D masked arrays for numeric vector subtypes (float, double, int32, int64, int16) - Falls back to object arrays for unsupported vector subtypes - Handles endianness conversion for both 1D and 2D arrays - Pre-allocates result arrays for efficiency Supported vector types: - Vector<float> → 2D float32 array - Vector<double> → 2D float64 array - Vector<int> → 2D int32 array - Vector<bigint> → 2D int64 array - Vector<smallint> → 2D int16 array Adds comprehensive test coverage for all supported vector types, mixed column queries, and large vector dimensions (384-element embeddings). 
Signed-off-by: Yaniv Kaul --- cassandra/numpy_parser.pyx | 30 +++- tests/unit/test_numpy_parser.py | 305 ++++++++++++++++++++++++++++++++ 2 files changed, 331 insertions(+), 4 deletions(-) create mode 100644 tests/unit/test_numpy_parser.py diff --git a/cassandra/numpy_parser.pyx b/cassandra/numpy_parser.pyx index 0ad34f66e2..83a82bb7b0 100644 --- a/cassandra/numpy_parser.pyx +++ b/cassandra/numpy_parser.pyx @@ -26,6 +26,7 @@ include "ioutils.pyx" cimport cython from libc.stdint cimport uint64_t, uint8_t +from libc.string cimport memset from cpython.ref cimport Py_INCREF, PyObject from cassandra.bytesio cimport BytesIOReader @@ -52,12 +53,14 @@ ctypedef struct ArrDesc: int stride # should be large enough as we allocate contiguous arrays int is_object Py_uintptr_t mask_ptr + int mask_stride arrDescDtype = np.dtype( [ ('buf_ptr', np.uintp) , ('stride', np.dtype('i')) , ('is_object', np.dtype('i')) , ('mask_ptr', np.uintp) + , ('mask_stride', np.dtype('i')) ], align=True) _cqltype_to_numpy = { @@ -112,7 +115,7 @@ def make_arrays(ParseDesc desc, array_size): (e.g. this can be fed into pandas.DataFrame) """ array_descs = np.empty((desc.rowsize,), arrDescDtype) - arrays = [] + arrays = [None] * desc.rowsize for i, coltype in enumerate(desc.coltypes): arr = make_array(coltype, array_size) @@ -121,9 +124,11 @@ def make_arrays(ParseDesc desc, array_size): array_descs[i]['is_object'] = arr.dtype is obj_dtype try: array_descs[i]['mask_ptr'] = arr.mask.ctypes.data + array_descs[i]['mask_stride'] = arr.mask.strides[0] except AttributeError: array_descs[i]['mask_ptr'] = 0 - arrays.append(arr) + array_descs[i]['mask_stride'] = 1 + arrays[i] = arr return array_descs, arrays @@ -131,7 +136,23 @@ def make_arrays(ParseDesc desc, array_size): def make_array(coltype, array_size): """ Allocate a new NumPy array of the given column type and size. + For VectorType, creates a 2D array (array_size x vector_dimension). 
""" + # Check if this is a VectorType + if issubclass(coltype, cqltypes.VectorType): + # VectorType - create 2D array (rows x vector_dimension) + vector_size = coltype.vector_size + subtype = coltype.subtype + try: + dtype = _cqltype_to_numpy[subtype] + a = np.ma.empty((array_size, vector_size), dtype=dtype) + a.mask = np.zeros((array_size, vector_size), dtype=bool) + except KeyError: + # Unsupported vector subtype - fall back to object array + a = np.empty((array_size,), dtype=obj_dtype) + return a + + # Scalar types try: a = np.ma.empty((array_size,), dtype=_cqltype_to_numpy[coltype]) a.mask = np.zeros((array_size,), dtype=bool) @@ -162,11 +183,11 @@ cdef inline int unpack_row( elif buf.size >= 0: memcpy( arr.buf_ptr, buf.ptr, buf.size) else: - memcpy(arr.mask_ptr, &mask_true, 1) + memset(arr.mask_ptr, 1, arr.mask_stride) # Update the pointer into the array for the next time arrays[i].buf_ptr += arr.stride - arrays[i].mask_ptr += 1 + arrays[i].mask_ptr += arr.mask_stride return 0 @@ -174,6 +195,7 @@ cdef inline int unpack_row( def make_native_byteorder(arr): """ Make sure all values have a native endian in the NumPy arrays. + Handles both 1D (scalar types) and 2D (VectorType) arrays. """ if is_little_endian and not arr.dtype.kind == 'O': # We have arrays in big-endian order. First swap the bytes diff --git a/tests/unit/test_numpy_parser.py b/tests/unit/test_numpy_parser.py new file mode 100644 index 0000000000..0c9cb2e0ff --- /dev/null +++ b/tests/unit/test_numpy_parser.py @@ -0,0 +1,305 @@ +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import struct +import unittest +from unittest.mock import Mock + +try: + import numpy as np + from cassandra.numpy_parser import NumpyParser + from cassandra.bytesio import BytesIOReader + from cassandra.parsing import ParseDesc + from cassandra.deserializers import obj_array + HAVE_NUMPY = True +except ImportError: + HAVE_NUMPY = False + +from cassandra import cqltypes + + +@unittest.skipUnless(HAVE_NUMPY, "NumPy not available") +class TestNumpyParserVectorType(unittest.TestCase): + """Tests for VectorType support in NumpyParser""" + + def _create_vector_type(self, subtype, vector_size): + """Helper to create a VectorType class""" + return type( + f'VectorType({vector_size})', + (cqltypes.VectorType,), + {'vector_size': vector_size, 'subtype': subtype} + ) + + def _serialize_vectors(self, vectors, format_char): + """Serialize a list of vectors using struct.pack""" + buffer = bytearray() + # Write row count + buffer.extend(struct.pack('>i', len(vectors))) + # Write each vector + for vector in vectors: + # Write byte size of vector (doesn't include size prefix in CQL) + byte_size = len(vector) * struct.calcsize(f'>{format_char}') + buffer.extend(struct.pack('>i', byte_size)) + # Write vector elements + buffer.extend(struct.pack(f'>{len(vector)}{format_char}', *vector)) + return bytes(buffer) + + def test_vector_float_2d_array(self): + """Test that VectorType creates and populates a 2D NumPy array""" + vector_size = 4 + vector_type = self._create_vector_type(cqltypes.FloatType, vector_size) + + # Create test data: 3 rows of 4-dimensional float vectors + vectors = [ + [1.0, 2.0, 3.0, 4.0], + [5.0, 6.0, 7.0, 8.0], + [9.0, 10.0, 11.0, 12.0], + ] + + # Serialize the data + serialized = self._serialize_vectors(vectors, 'f') + + # Parse with NumpyParser + parser = NumpyParser() + reader = BytesIOReader(serialized) + + desc = ParseDesc( + colnames=['vec'], + 
coltypes=[vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None]), + protocol_version=5 + ) + + result = parser.parse_rows(reader, desc) + + # Verify result structure + self.assertIn('vec', result) + arr = result['vec'] + + # Verify it's a 2D array with correct shape + self.assertEqual(arr.ndim, 2) + self.assertEqual(arr.shape, (3, 4)) + + # Verify the data + expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" + vector_size = 3 + vector_type = self._create_vector_type(cqltypes.DoubleType, vector_size) + + # Create test data: 2 rows of 3-dimensional double vectors + vectors = [ + [1.5, 2.5, 3.5], + [4.5, 5.5, 6.5], + ] + + serialized = self._serialize_vectors(vectors, 'd') + + parser = NumpyParser() + reader = BytesIOReader(serialized) + + desc = ParseDesc( + colnames=['embedding'], + coltypes=[vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None]), + protocol_version=5 + ) + + result = parser.parse_rows(reader, desc) + + arr = result['embedding'] + self.assertEqual(arr.shape, (2, 3)) + + expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" + vector_size = 128 + vector_type = self._create_vector_type(cqltypes.Int32Type, vector_size) + + # Create test data: 2 rows of 128-dimensional int vectors + vectors = [ + list(range(0, 128)), + list(range(128, 256)), + ] + + serialized = self._serialize_vectors(vectors, 'i') + + parser = NumpyParser() + reader = BytesIOReader(serialized) + + desc = ParseDesc( + colnames=['features'], + coltypes=[vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None]), + protocol_version=5 + ) + + result = parser.parse_rows(reader, desc) + + arr = result['features'] + self.assertEqual(arr.shape, (2, 128)) + + expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" + vector_size = 5 + vector_type = self._create_vector_type(cqltypes.LongType, 
vector_size) + + vectors = [ + [100, 200, 300, 400, 500], + [600, 700, 800, 900, 1000], + ] + + serialized = self._serialize_vectors(vectors, 'q') + + parser = NumpyParser() + reader = BytesIOReader(serialized) + + desc = ParseDesc( + colnames=['ids'], + coltypes=[vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None]), + protocol_version=5 + ) + + result = parser.parse_rows(reader, desc) + + arr = result['ids'] + self.assertEqual(arr.shape, (2, 5)) + + expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" + vector_size = 8 + vector_type = self._create_vector_type(cqltypes.ShortType, vector_size) + + vectors = [ + [1, 2, 3, 4, 5, 6, 7, 8], + [9, 10, 11, 12, 13, 14, 15, 16], + ] + + serialized = self._serialize_vectors(vectors, 'h') + + parser = NumpyParser() + reader = BytesIOReader(serialized) + + desc = ParseDesc( + colnames=['small_vec'], + coltypes=[vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None]), + protocol_version=5 + ) + + result = parser.parse_rows(reader, desc) + + arr = result['small_vec'] + self.assertEqual(arr.shape, (2, 8)) + + expected = np.array(vectors, dtype='i', 2)) # row count + + # Row 1: id=1, vec=[1.0, 2.0, 3.0] + buffer.extend(struct.pack('>i', 4)) # int32 size + buffer.extend(struct.pack('>i', 1)) # id value + buffer.extend(struct.pack('>i', 12)) # vector size (3 floats) + buffer.extend(struct.pack('>3f', 1.0, 2.0, 3.0)) + + # Row 2: id=2, vec=[4.0, 5.0, 6.0] + buffer.extend(struct.pack('>i', 4)) + buffer.extend(struct.pack('>i', 2)) + buffer.extend(struct.pack('>i', 12)) + buffer.extend(struct.pack('>3f', 4.0, 5.0, 6.0)) + + parser = NumpyParser() + reader = BytesIOReader(bytes(buffer)) + + desc = ParseDesc( + colnames=['id', 'vec'], + coltypes=[cqltypes.Int32Type, vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None, None]), + protocol_version=5 + ) + + result = 
parser.parse_rows(reader, desc) + + # Verify id column (1D array) + self.assertEqual(result['id'].shape, (2,)) + np.testing.assert_array_equal(result['id'], np.array([1, 2], dtype=' Date: Fri, 20 Mar 2026 18:33:56 +0200 Subject: [PATCH 2/2] Address review feedback: harden numpy_parser and improve tests - Add buffer size guard before memcpy in unpack_row() to prevent overflow - Remove dead mask_true constant and unused uint8_t cimport - Fix copyright header (DataStax -> ScyllaDB) - Replace hard-coded little-endian dtypes with native numpy dtypes - Remove unused Mock import - Add test for NULL vector mask handling - Add test for unsupported subtype fallback to object array --- cassandra/numpy_parser.pyx | 8 +- tests/unit/test_numpy_parser.py | 271 +++++++++++++++++++------------- 2 files changed, 169 insertions(+), 110 deletions(-) diff --git a/cassandra/numpy_parser.pyx b/cassandra/numpy_parser.pyx index 83a82bb7b0..1740997032 100644 --- a/cassandra/numpy_parser.pyx +++ b/cassandra/numpy_parser.pyx @@ -25,7 +25,7 @@ as numpy is an optional dependency. 
include "ioutils.pyx" cimport cython -from libc.stdint cimport uint64_t, uint8_t +from libc.stdint cimport uint64_t from libc.string cimport memset from cpython.ref cimport Py_INCREF, PyObject @@ -74,8 +74,6 @@ _cqltype_to_numpy = { obj_dtype = np.dtype('O') -cdef uint8_t mask_true = 0x01 - cdef class NumpyParser(ColumnParser): """Decode a ResultMessage into a bunch of NumPy arrays""" @@ -181,6 +179,10 @@ cdef inline int unpack_row( Py_INCREF(val) ( arr.buf_ptr)[0] = val elif buf.size >= 0: + if buf.size > arr.stride: + raise ValueError( + "Column %d: received %d bytes but array stride is %d" % + (i, buf.size, arr.stride)) memcpy( arr.buf_ptr, buf.ptr, buf.size) else: memset(arr.mask_ptr, 1, arr.mask_stride) diff --git a/tests/unit/test_numpy_parser.py b/tests/unit/test_numpy_parser.py index 0c9cb2e0ff..4a1b0c3253 100644 --- a/tests/unit/test_numpy_parser.py +++ b/tests/unit/test_numpy_parser.py @@ -1,4 +1,4 @@ -# Copyright DataStax, Inc. +# Copyright ScyllaDB, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -14,7 +14,6 @@ import struct import unittest -from unittest.mock import Mock try: import numpy as np @@ -22,6 +21,7 @@ from cassandra.bytesio import BytesIOReader from cassandra.parsing import ParseDesc from cassandra.deserializers import obj_array + HAVE_NUMPY = True except ImportError: HAVE_NUMPY = False @@ -36,270 +36,327 @@ class TestNumpyParserVectorType(unittest.TestCase): def _create_vector_type(self, subtype, vector_size): """Helper to create a VectorType class""" return type( - f'VectorType({vector_size})', + f"VectorType({vector_size})", (cqltypes.VectorType,), - {'vector_size': vector_size, 'subtype': subtype} + {"vector_size": vector_size, "subtype": subtype}, ) def _serialize_vectors(self, vectors, format_char): """Serialize a list of vectors using struct.pack""" buffer = bytearray() # Write row count - buffer.extend(struct.pack('>i', len(vectors))) + buffer.extend(struct.pack(">i", len(vectors))) # Write each vector for vector in vectors: # Write byte size of vector (doesn't include size prefix in CQL) - byte_size = len(vector) * struct.calcsize(f'>{format_char}') - buffer.extend(struct.pack('>i', byte_size)) + byte_size = len(vector) * struct.calcsize(f">{format_char}") + buffer.extend(struct.pack(">i", byte_size)) # Write vector elements - buffer.extend(struct.pack(f'>{len(vector)}{format_char}', *vector)) + buffer.extend(struct.pack(f">{len(vector)}{format_char}", *vector)) return bytes(buffer) def test_vector_float_2d_array(self): """Test that VectorType creates and populates a 2D NumPy array""" vector_size = 4 vector_type = self._create_vector_type(cqltypes.FloatType, vector_size) - + # Create test data: 3 rows of 4-dimensional float vectors vectors = [ [1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0], [9.0, 10.0, 11.0, 12.0], ] - + # Serialize the data - serialized = self._serialize_vectors(vectors, 'f') - + serialized = self._serialize_vectors(vectors, "f") + # Parse with NumpyParser parser = NumpyParser() reader = BytesIOReader(serialized) - + 
desc = ParseDesc( - colnames=['vec'], + colnames=["vec"], coltypes=[vector_type], column_encryption_policy=None, coldescs=None, deserializers=obj_array([None]), - protocol_version=5 + protocol_version=5, ) - + result = parser.parse_rows(reader, desc) - + # Verify result structure - self.assertIn('vec', result) - arr = result['vec'] - + self.assertIn("vec", result) + arr = result["vec"] + # Verify it's a 2D array with correct shape self.assertEqual(arr.ndim, 2) self.assertEqual(arr.shape, (3, 4)) - + # Verify the data - expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" vector_size = 3 vector_type = self._create_vector_type(cqltypes.DoubleType, vector_size) - + # Create test data: 2 rows of 3-dimensional double vectors vectors = [ [1.5, 2.5, 3.5], [4.5, 5.5, 6.5], ] - - serialized = self._serialize_vectors(vectors, 'd') - + + serialized = self._serialize_vectors(vectors, "d") + parser = NumpyParser() reader = BytesIOReader(serialized) - + desc = ParseDesc( - colnames=['embedding'], + colnames=["embedding"], coltypes=[vector_type], column_encryption_policy=None, coldescs=None, deserializers=obj_array([None]), - protocol_version=5 + protocol_version=5, ) - + result = parser.parse_rows(reader, desc) - - arr = result['embedding'] + + arr = result["embedding"] self.assertEqual(arr.shape, (2, 3)) - - expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" vector_size = 128 vector_type = self._create_vector_type(cqltypes.Int32Type, vector_size) - + # Create test data: 2 rows of 128-dimensional int vectors vectors = [ list(range(0, 128)), list(range(128, 256)), ] - - serialized = self._serialize_vectors(vectors, 'i') - + + serialized = self._serialize_vectors(vectors, "i") + parser = NumpyParser() reader = BytesIOReader(serialized) - + desc = ParseDesc( - colnames=['features'], + colnames=["features"], coltypes=[vector_type], column_encryption_policy=None, coldescs=None, deserializers=obj_array([None]), - protocol_version=5 
+ protocol_version=5, ) - + result = parser.parse_rows(reader, desc) - - arr = result['features'] + + arr = result["features"] self.assertEqual(arr.shape, (2, 128)) - - expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" vector_size = 5 vector_type = self._create_vector_type(cqltypes.LongType, vector_size) - + vectors = [ [100, 200, 300, 400, 500], [600, 700, 800, 900, 1000], ] - - serialized = self._serialize_vectors(vectors, 'q') - + + serialized = self._serialize_vectors(vectors, "q") + parser = NumpyParser() reader = BytesIOReader(serialized) - + desc = ParseDesc( - colnames=['ids'], + colnames=["ids"], coltypes=[vector_type], column_encryption_policy=None, coldescs=None, deserializers=obj_array([None]), - protocol_version=5 + protocol_version=5, ) - + result = parser.parse_rows(reader, desc) - - arr = result['ids'] + + arr = result["ids"] self.assertEqual(arr.shape, (2, 5)) - - expected = np.array(vectors, dtype=' creates and populates a 2D NumPy array""" vector_size = 8 vector_type = self._create_vector_type(cqltypes.ShortType, vector_size) - + vectors = [ [1, 2, 3, 4, 5, 6, 7, 8], [9, 10, 11, 12, 13, 14, 15, 16], ] - - serialized = self._serialize_vectors(vectors, 'h') - + + serialized = self._serialize_vectors(vectors, "h") + parser = NumpyParser() reader = BytesIOReader(serialized) - + desc = ParseDesc( - colnames=['small_vec'], + colnames=["small_vec"], coltypes=[vector_type], column_encryption_policy=None, coldescs=None, deserializers=obj_array([None]), - protocol_version=5 + protocol_version=5, ) - + result = parser.parse_rows(reader, desc) - - arr = result['small_vec'] + + arr = result["small_vec"] self.assertEqual(arr.shape, (2, 8)) - - expected = np.array(vectors, dtype='i', 2)) # row count - + buffer.extend(struct.pack(">i", 2)) # row count + # Row 1: id=1, vec=[1.0, 2.0, 3.0] - buffer.extend(struct.pack('>i', 4)) # int32 size - buffer.extend(struct.pack('>i', 1)) # id value - buffer.extend(struct.pack('>i', 12)) # vector 
size (3 floats) - buffer.extend(struct.pack('>3f', 1.0, 2.0, 3.0)) - + buffer.extend(struct.pack(">i", 4)) # int32 size + buffer.extend(struct.pack(">i", 1)) # id value + buffer.extend(struct.pack(">i", 12)) # vector size (3 floats) + buffer.extend(struct.pack(">3f", 1.0, 2.0, 3.0)) + # Row 2: id=2, vec=[4.0, 5.0, 6.0] - buffer.extend(struct.pack('>i', 4)) - buffer.extend(struct.pack('>i', 2)) - buffer.extend(struct.pack('>i', 12)) - buffer.extend(struct.pack('>3f', 4.0, 5.0, 6.0)) - + buffer.extend(struct.pack(">i", 4)) + buffer.extend(struct.pack(">i", 2)) + buffer.extend(struct.pack(">i", 12)) + buffer.extend(struct.pack(">3f", 4.0, 5.0, 6.0)) + parser = NumpyParser() reader = BytesIOReader(bytes(buffer)) - + desc = ParseDesc( - colnames=['id', 'vec'], + colnames=["id", "vec"], coltypes=[cqltypes.Int32Type, vector_type], column_encryption_policy=None, coldescs=None, deserializers=obj_array([None, None]), - protocol_version=5 + protocol_version=5, ) - + result = parser.parse_rows(reader, desc) - + # Verify id column (1D array) - self.assertEqual(result['id'].shape, (2,)) - np.testing.assert_array_equal(result['id'], np.array([1, 2], dtype='i", 2)) # row count + + # Row 1: valid vector [1.0, 2.0, 3.0] + buffer.extend(struct.pack(">i", 12)) # byte size (3 floats * 4 bytes) + buffer.extend(struct.pack(">3f", 1.0, 2.0, 3.0)) + + # Row 2: NULL vector + buffer.extend(struct.pack(">i", -1)) # -1 signals NULL + + parser = NumpyParser() + reader = BytesIOReader(bytes(buffer)) + + desc = ParseDesc( + colnames=["vec"], + coltypes=[vector_type], + column_encryption_policy=None, + coldescs=None, + deserializers=obj_array([None]), + protocol_version=5, + ) + + result = parser.parse_rows(reader, desc) + + arr = result["vec"] + self.assertEqual(arr.shape, (2, 3)) + + # First row should not be masked + self.assertFalse(arr.mask[0].any()) + np.testing.assert_array_almost_equal( + arr[0], np.array([1.0, 2.0, 3.0], dtype=np.float32) + ) + + # Second row should be fully masked (NULL) 
+ self.assertTrue(arr.mask[1].all()) + + def test_unsupported_subtype_falls_back_to_object_array(self): + """Test that an unsupported vector subtype falls back to an object array""" + vector_size = 2 + vector_type = self._create_vector_type(cqltypes.UTF8Type, vector_size) + + # For an unsupported subtype, make_array should produce a 1D object + # array (not a 2D numeric array), and parsing goes through the + # deserializer/object path instead of the memcpy fast-path. + from cassandra.numpy_parser import make_array + + arr = make_array(vector_type, 5) + self.assertEqual(arr.ndim, 1) + self.assertEqual(arr.dtype, np.dtype("O")) + self.assertEqual(arr.shape, (5,)) + -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()