diff --git a/cassandra/numpy_parser.pyx b/cassandra/numpy_parser.pyx index 0ad34f66e2..1740997032 100644 --- a/cassandra/numpy_parser.pyx +++ b/cassandra/numpy_parser.pyx @@ -25,7 +25,8 @@ as numpy is an optional dependency. include "ioutils.pyx" cimport cython -from libc.stdint cimport uint64_t, uint8_t +from libc.stdint cimport uint64_t +from libc.string cimport memset from cpython.ref cimport Py_INCREF, PyObject from cassandra.bytesio cimport BytesIOReader @@ -52,12 +53,14 @@ ctypedef struct ArrDesc: int stride # should be large enough as we allocate contiguous arrays int is_object Py_uintptr_t mask_ptr + int mask_stride arrDescDtype = np.dtype( [ ('buf_ptr', np.uintp) , ('stride', np.dtype('i')) , ('is_object', np.dtype('i')) , ('mask_ptr', np.uintp) + , ('mask_stride', np.dtype('i')) ], align=True) _cqltype_to_numpy = { @@ -71,8 +74,6 @@ _cqltype_to_numpy = { obj_dtype = np.dtype('O') -cdef uint8_t mask_true = 0x01 - cdef class NumpyParser(ColumnParser): """Decode a ResultMessage into a bunch of NumPy arrays""" @@ -112,7 +113,7 @@ def make_arrays(ParseDesc desc, array_size): (e.g. this can be fed into pandas.DataFrame) """ array_descs = np.empty((desc.rowsize,), arrDescDtype) - arrays = [] + arrays = [None] * desc.rowsize for i, coltype in enumerate(desc.coltypes): arr = make_array(coltype, array_size) @@ -121,9 +122,11 @@ def make_arrays(ParseDesc desc, array_size): array_descs[i]['is_object'] = arr.dtype is obj_dtype try: array_descs[i]['mask_ptr'] = arr.mask.ctypes.data + array_descs[i]['mask_stride'] = arr.mask.strides[0] except AttributeError: array_descs[i]['mask_ptr'] = 0 - arrays.append(arr) + array_descs[i]['mask_stride'] = 1 + arrays[i] = arr return array_descs, arrays @@ -131,7 +134,23 @@ def make_arrays(ParseDesc desc, array_size): def make_array(coltype, array_size): """ Allocate a new NumPy array of the given column type and size. + For VectorType, creates a 2D array (array_size x vector_dimension). 
""" + # Check if this is a VectorType + if issubclass(coltype, cqltypes.VectorType): + # VectorType - create 2D array (rows x vector_dimension) + vector_size = coltype.vector_size + subtype = coltype.subtype + try: + dtype = _cqltype_to_numpy[subtype] + a = np.ma.empty((array_size, vector_size), dtype=dtype) + a.mask = np.zeros((array_size, vector_size), dtype=bool) + except KeyError: + # Unsupported vector subtype - fall back to object array + a = np.empty((array_size,), dtype=obj_dtype) + return a + + # Scalar types try: a = np.ma.empty((array_size,), dtype=_cqltype_to_numpy[coltype]) a.mask = np.zeros((array_size,), dtype=bool) @@ -160,13 +179,17 @@ cdef inline int unpack_row( Py_INCREF(val) ( arr.buf_ptr)[0] = val elif buf.size >= 0: + if buf.size > arr.stride: + raise ValueError( + "Column %d: received %d bytes but array stride is %d" % + (i, buf.size, arr.stride)) memcpy( arr.buf_ptr, buf.ptr, buf.size) else: - memcpy(arr.mask_ptr, &mask_true, 1) + memset(arr.mask_ptr, 1, arr.mask_stride) # Update the pointer into the array for the next time arrays[i].buf_ptr += arr.stride - arrays[i].mask_ptr += 1 + arrays[i].mask_ptr += arr.mask_stride return 0 @@ -174,6 +197,7 @@ cdef inline int unpack_row( def make_native_byteorder(arr): """ Make sure all values have a native endian in the NumPy arrays. + Handles both 1D (scalar types) and 2D (VectorType) arrays. """ if is_little_endian and not arr.dtype.kind == 'O': # We have arrays in big-endian order. First swap the bytes diff --git a/tests/unit/test_numpy_parser.py b/tests/unit/test_numpy_parser.py new file mode 100644 index 0000000000..4a1b0c3253 --- /dev/null +++ b/tests/unit/test_numpy_parser.py @@ -0,0 +1,362 @@ +# Copyright ScyllaDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import struct
import unittest

try:
    import numpy as np
    from cassandra.numpy_parser import NumpyParser, make_array
    from cassandra.bytesio import BytesIOReader
    from cassandra.parsing import ParseDesc
    from cassandra.deserializers import obj_array

    HAVE_NUMPY = True
except ImportError:
    HAVE_NUMPY = False

from cassandra import cqltypes


@unittest.skipUnless(HAVE_NUMPY, "NumPy not available")
class TestNumpyParserVectorType(unittest.TestCase):
    """Tests for VectorType support in NumpyParser.

    Every test builds a raw result payload by hand (a big-endian int32 row
    count followed, per cell, by a big-endian int32 byte length and the cell
    bytes; a length of -1 encodes NULL), feeds it to
    ``NumpyParser.parse_rows`` and checks the arrays that come back.
    """

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _create_vector_type(self, subtype, vector_size):
        """Return a new VectorType subclass with the given subtype and size."""
        return type(
            f"VectorType({vector_size})",
            (cqltypes.VectorType,),
            {"vector_size": vector_size, "subtype": subtype},
        )

    def _serialize_vectors(self, vectors, format_char):
        """Serialize *vectors* as a single-column result payload.

        :param vectors: list of rows, each row a sequence of numbers.
        :param format_char: ``struct`` format character of one element
            (e.g. ``"f"``, ``"d"``, ``"i"``); elements are packed big-endian.
        :return: ``bytes`` holding the row count followed by each
            length-prefixed vector.
        """
        # The element size does not depend on the row; compute it once.
        item_size = struct.calcsize(f">{format_char}")
        chunks = [struct.pack(">i", len(vectors))]  # row count
        for vector in vectors:
            # The length prefix counts only the payload bytes that follow it,
            # not the 4-byte prefix itself.
            chunks.append(struct.pack(">i", len(vector) * item_size))
            chunks.append(struct.pack(f">{len(vector)}{format_char}", *vector))
        return b"".join(chunks)

    def _parse(self, colnames, coltypes, payload):
        """Run ``NumpyParser.parse_rows`` over *payload*.

        :param colnames: column names, one per column.
        :param coltypes: CQL type classes, parallel to *colnames*.
        :param payload: serialized rows (``bytes``).
        :return: whatever ``parse_rows`` returns (indexed by column name in
            the tests below).
        """
        desc = ParseDesc(
            colnames=colnames,
            coltypes=coltypes,
            column_encryption_policy=None,
            coldescs=None,
            # One (absent) deserializer slot per column.
            deserializers=obj_array([None] * len(coltypes)),
            protocol_version=5,
        )
        return NumpyParser().parse_rows(BytesIOReader(payload), desc)

    def _assert_vector_roundtrip(self, subtype, format_char, np_dtype,
                                 vectors, colname="vec"):
        """Serialize *vectors*, parse them back, and compare with the input.

        The vector dimension is taken from the first row. Floating-point
        dtypes are compared approximately, integer dtypes exactly.
        """
        vector_size = len(vectors[0])
        vector_type = self._create_vector_type(subtype, vector_size)
        payload = self._serialize_vectors(vectors, format_char)

        result = self._parse([colname], [vector_type], payload)

        self.assertIn(colname, result)
        arr = result[colname]
        self.assertEqual(arr.ndim, 2)
        self.assertEqual(arr.shape, (len(vectors), vector_size))

        expected = np.array(vectors, dtype=np_dtype)
        if np.issubdtype(expected.dtype, np.floating):
            np.testing.assert_array_almost_equal(arr, expected)
        else:
            np.testing.assert_array_equal(arr, expected)

    # ------------------------------------------------------------------
    # Fixed-width numeric subtypes -> 2D arrays
    # ------------------------------------------------------------------

    def test_vector_float_2d_array(self):
        """vector<float, 4> is parsed into a (rows, 4) float32 array."""
        vectors = [
            [1.0, 2.0, 3.0, 4.0],
            [5.0, 6.0, 7.0, 8.0],
            [9.0, 10.0, 11.0, 12.0],
        ]
        self._assert_vector_roundtrip(
            cqltypes.FloatType, "f", np.float32, vectors, colname="vec")

    def test_vector_double_2d_array(self):
        """vector<double, 3> is parsed into a (rows, 3) float64 array."""
        vectors = [
            [1.5, 2.5, 3.5],
            [4.5, 5.5, 6.5],
        ]
        self._assert_vector_roundtrip(
            cqltypes.DoubleType, "d", np.float64, vectors, colname="embedding")

    def test_vector_int32_2d_array(self):
        """vector<int, 128> is parsed into a (rows, 128) int32 array."""
        vectors = [
            list(range(0, 128)),
            list(range(128, 256)),
        ]
        self._assert_vector_roundtrip(
            cqltypes.Int32Type, "i", np.int32, vectors, colname="features")

    def test_vector_int64_2d_array(self):
        """vector<bigint, 5> is parsed into a (rows, 5) int64 array."""
        vectors = [
            [100, 200, 300, 400, 500],
            [600, 700, 800, 900, 1000],
        ]
        self._assert_vector_roundtrip(
            cqltypes.LongType, "q", np.int64, vectors, colname="ids")

    def test_vector_int16_2d_array(self):
        """vector<smallint, 8> is parsed into a (rows, 8) int16 array."""
        vectors = [
            [1, 2, 3, 4, 5, 6, 7, 8],
            [9, 10, 11, 12, 13, 14, 15, 16],
        ]
        self._assert_vector_roundtrip(
            cqltypes.ShortType, "h", np.int16, vectors, colname="small_vec")

    def test_large_vector_dimensions(self):
        """A 384-dimensional float vector (embedding-sized) round-trips."""
        vectors = [[float(i) for i in range(384)]]
        self._assert_vector_roundtrip(
            cqltypes.FloatType, "f", np.float32, vectors, colname="embedding")

    # ------------------------------------------------------------------
    # Mixed columns, NULLs, fallbacks, malformed input
    # ------------------------------------------------------------------

    def test_mixed_columns_with_vectors(self):
        """A scalar int column and a vector column are parsed side by side."""
        vector_type = self._create_vector_type(cqltypes.FloatType, 3)

        rows = [
            (1, (1.0, 2.0, 3.0)),
            (2, (4.0, 5.0, 6.0)),
        ]
        chunks = [struct.pack(">i", len(rows))]  # row count
        for row_id, vec in rows:
            # int32 cell: 4-byte length prefix, then the value.
            chunks.append(struct.pack(">ii", 4, row_id))
            # vector cell: length prefix (3 floats * 4 bytes), then the floats.
            chunks.append(struct.pack(">i3f", 12, *vec))

        result = self._parse(
            ["id", "vec"],
            [cqltypes.Int32Type, vector_type],
            b"".join(chunks),
        )

        # Scalar column stays 1D.
        self.assertEqual(result["id"].shape, (2,))
        np.testing.assert_array_equal(
            result["id"], np.array([1, 2], dtype=np.int32))

        # Vector column is 2D.
        self.assertEqual(result["vec"].shape, (2, 3))
        expected_vecs = np.array(
            [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], dtype=np.float32)
        np.testing.assert_array_almost_equal(result["vec"], expected_vecs)

    def test_null_vector_sets_mask(self):
        """A NULL vector (length prefix -1) masks its entire row."""
        vector_type = self._create_vector_type(cqltypes.FloatType, 3)

        payload = b"".join([
            struct.pack(">i", 2),                    # row count
            struct.pack(">i3f", 12, 1.0, 2.0, 3.0),  # row 1: valid vector
            struct.pack(">i", -1),                   # row 2: NULL
        ])

        arr = self._parse(["vec"], [vector_type], payload)["vec"]
        self.assertEqual(arr.shape, (2, 3))

        # First row holds data and must not be masked.
        self.assertFalse(arr.mask[0].any())
        np.testing.assert_array_almost_equal(
            arr[0], np.array([1.0, 2.0, 3.0], dtype=np.float32)
        )

        # Second row is NULL, so every element of it must be masked.
        self.assertTrue(arr.mask[1].all())

    def test_unsupported_subtype_falls_back_to_object_array(self):
        """A vector whose subtype has no NumPy dtype yields a 1D object array."""
        vector_type = self._create_vector_type(cqltypes.UTF8Type, 2)

        # For an unsupported subtype make_array should hand back a 1D object
        # array rather than a 2D numeric one.
        arr = make_array(vector_type, 5)
        self.assertEqual(arr.ndim, 1)
        self.assertEqual(arr.dtype, np.dtype("O"))
        self.assertEqual(arr.shape, (5,))

    def test_oversized_vector_raises_value_error(self):
        """A cell larger than one array row is rejected instead of copied.

        Exercises the ``buf.size > arr.stride`` guard in ``unpack_row``: the
        column is declared as 3 floats (12 bytes) but the payload carries 4
        floats (16 bytes).
        """
        vector_type = self._create_vector_type(cqltypes.FloatType, 3)
        payload = self._serialize_vectors([[1.0, 2.0, 3.0, 4.0]], "f")

        with self.assertRaises(ValueError):
            self._parse(["vec"], [vector_type], payload)


if __name__ == "__main__":
    unittest.main()