Skip to content

Commit a018c7a

Browse files
committed
(improvement)Add VectorType support to numpy_parser for 2D array parsing
Extend NumpyParser to handle VectorType columns by creating 2D NumPy arrays (rows × vector_dimension) instead of object arrays. This enables zero-copy parsing for vector embeddings in ML/AI workloads. Features: - Detects VectorType via vector_size and subtype attributes - Creates 2D masked arrays for numeric vector subtypes (float, double, int32, int64, int16) - Falls back to object arrays for unsupported vector subtypes - Handles endianness conversion for both 1D and 2D arrays - Pre-allocates result arrays for efficiency Supported vector types: - Vector<float> → 2D float32 array - Vector<double> → 2D float64 array - Vector<int> → 2D int32 array - Vector<bigint> → 2D int64 array - Vector<smallint> → 2D int16 array Adds comprehensive test coverage for all supported vector types, mixed column queries, and large vector dimensions (384-element embeddings). Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
1 parent 1bab091 commit a018c7a

2 files changed

Lines changed: 324 additions & 2 deletions

File tree

cassandra/numpy_parser.pyx

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ def make_arrays(ParseDesc desc, array_size):
112112
(e.g. this can be fed into pandas.DataFrame)
113113
"""
114114
array_descs = np.empty((desc.rowsize,), arrDescDtype)
115-
arrays = []
115+
arrays = [None] * desc.rowsize
116116

117117
for i, coltype in enumerate(desc.coltypes):
118118
arr = make_array(coltype, array_size)
@@ -123,15 +123,31 @@ def make_arrays(ParseDesc desc, array_size):
123123
array_descs[i]['mask_ptr'] = arr.mask.ctypes.data
124124
except AttributeError:
125125
array_descs[i]['mask_ptr'] = 0
126-
arrays.append(arr)
126+
arrays[i] = arr
127127

128128
return array_descs, arrays
129129

130130

131131
def make_array(coltype, array_size):
132132
"""
133133
Allocate a new NumPy array of the given column type and size.
134+
For VectorType, creates a 2D array (array_size x vector_dimension).
134135
"""
136+
# Check if this is a VectorType
137+
if hasattr(coltype, 'vector_size') and hasattr(coltype, 'subtype'):
138+
# VectorType - create 2D array (rows x vector_dimension)
139+
vector_size = coltype.vector_size
140+
subtype = coltype.subtype
141+
try:
142+
dtype = _cqltype_to_numpy[subtype]
143+
a = np.ma.empty((array_size, vector_size), dtype=dtype)
144+
a.mask = np.zeros((array_size, vector_size), dtype=bool)
145+
except KeyError:
146+
# Unsupported vector subtype - fall back to object array
147+
a = np.empty((array_size,), dtype=obj_dtype)
148+
return a
149+
150+
# Scalar types
135151
try:
136152
a = np.ma.empty((array_size,), dtype=_cqltype_to_numpy[coltype])
137153
a.mask = np.zeros((array_size,), dtype=bool)
@@ -174,6 +190,7 @@ cdef inline int unpack_row(
174190
def make_native_byteorder(arr):
175191
"""
176192
Make sure all values have a native endian in the NumPy arrays.
193+
Handles both 1D (scalar types) and 2D (VectorType) arrays.
177194
"""
178195
if is_little_endian and not arr.dtype.kind == 'O':
179196
# We have arrays in big-endian order. First swap the bytes

tests/unit/test_numpy_parser.py

Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
# Copyright DataStax, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import struct
16+
import unittest
17+
from unittest.mock import Mock
18+
19+
try:
20+
import numpy as np
21+
from cassandra.numpy_parser import NumpyParser
22+
from cassandra.bytesio import BytesIOReader
23+
from cassandra.parsing import ParseDesc
24+
from cassandra.deserializers import obj_array
25+
HAVE_NUMPY = True
26+
except ImportError:
27+
HAVE_NUMPY = False
28+
29+
from cassandra import cqltypes
30+
31+
32+
@unittest.skipUnless(HAVE_NUMPY, "NumPy not available")
33+
class TestNumpyParserVectorType(unittest.TestCase):
34+
"""Tests for VectorType support in NumpyParser"""
35+
36+
def _create_vector_type(self, subtype, vector_size):
37+
"""Helper to create a VectorType class"""
38+
return type(
39+
f'VectorType({vector_size})',
40+
(cqltypes.VectorType,),
41+
{'vector_size': vector_size, 'subtype': subtype}
42+
)
43+
44+
def _serialize_vectors(self, vectors, format_char):
45+
"""Serialize a list of vectors using struct.pack"""
46+
buffer = bytearray()
47+
# Write row count
48+
buffer.extend(struct.pack('>i', len(vectors)))
49+
# Write each vector
50+
for vector in vectors:
51+
# Write byte size of vector (doesn't include size prefix in CQL)
52+
byte_size = len(vector) * struct.calcsize(f'>{format_char}')
53+
buffer.extend(struct.pack('>i', byte_size))
54+
# Write vector elements
55+
buffer.extend(struct.pack(f'>{len(vector)}{format_char}', *vector))
56+
return bytes(buffer)
57+
58+
def test_vector_float_2d_array(self):
59+
"""Test that VectorType<float> creates and populates a 2D NumPy array"""
60+
vector_size = 4
61+
vector_type = self._create_vector_type(cqltypes.FloatType, vector_size)
62+
63+
# Create test data: 3 rows of 4-dimensional float vectors
64+
vectors = [
65+
[1.0, 2.0, 3.0, 4.0],
66+
[5.0, 6.0, 7.0, 8.0],
67+
[9.0, 10.0, 11.0, 12.0],
68+
]
69+
70+
# Serialize the data
71+
serialized = self._serialize_vectors(vectors, 'f')
72+
73+
# Parse with NumpyParser
74+
parser = NumpyParser()
75+
reader = BytesIOReader(serialized)
76+
77+
desc = ParseDesc(
78+
colnames=['vec'],
79+
coltypes=[vector_type],
80+
column_encryption_policy=None,
81+
coldescs=None,
82+
deserializers=obj_array([None]),
83+
protocol_version=5
84+
)
85+
86+
result = parser.parse_rows(reader, desc)
87+
88+
# Verify result structure
89+
self.assertIn('vec', result)
90+
arr = result['vec']
91+
92+
# Verify it's a 2D array with correct shape
93+
self.assertEqual(arr.ndim, 2)
94+
self.assertEqual(arr.shape, (3, 4))
95+
96+
# Verify the data
97+
expected = np.array(vectors, dtype='<f4') # little-endian after conversion
98+
np.testing.assert_array_almost_equal(arr, expected)
99+
100+
def test_vector_double_2d_array(self):
101+
"""Test that VectorType<double> creates and populates a 2D NumPy array"""
102+
vector_size = 3
103+
vector_type = self._create_vector_type(cqltypes.DoubleType, vector_size)
104+
105+
# Create test data: 2 rows of 3-dimensional double vectors
106+
vectors = [
107+
[1.5, 2.5, 3.5],
108+
[4.5, 5.5, 6.5],
109+
]
110+
111+
serialized = self._serialize_vectors(vectors, 'd')
112+
113+
parser = NumpyParser()
114+
reader = BytesIOReader(serialized)
115+
116+
desc = ParseDesc(
117+
colnames=['embedding'],
118+
coltypes=[vector_type],
119+
column_encryption_policy=None,
120+
coldescs=None,
121+
deserializers=obj_array([None]),
122+
protocol_version=5
123+
)
124+
125+
result = parser.parse_rows(reader, desc)
126+
127+
arr = result['embedding']
128+
self.assertEqual(arr.shape, (2, 3))
129+
130+
expected = np.array(vectors, dtype='<f8')
131+
np.testing.assert_array_almost_equal(arr, expected)
132+
133+
def test_vector_int32_2d_array(self):
134+
"""Test that VectorType<int> creates and populates a 2D NumPy array"""
135+
vector_size = 128
136+
vector_type = self._create_vector_type(cqltypes.Int32Type, vector_size)
137+
138+
# Create test data: 2 rows of 128-dimensional int vectors
139+
vectors = [
140+
list(range(0, 128)),
141+
list(range(128, 256)),
142+
]
143+
144+
serialized = self._serialize_vectors(vectors, 'i')
145+
146+
parser = NumpyParser()
147+
reader = BytesIOReader(serialized)
148+
149+
desc = ParseDesc(
150+
colnames=['features'],
151+
coltypes=[vector_type],
152+
column_encryption_policy=None,
153+
coldescs=None,
154+
deserializers=obj_array([None]),
155+
protocol_version=5
156+
)
157+
158+
result = parser.parse_rows(reader, desc)
159+
160+
arr = result['features']
161+
self.assertEqual(arr.shape, (2, 128))
162+
163+
expected = np.array(vectors, dtype='<i4')
164+
np.testing.assert_array_equal(arr, expected)
165+
166+
def test_vector_int64_2d_array(self):
167+
"""Test that VectorType<bigint> creates and populates a 2D NumPy array"""
168+
vector_size = 5
169+
vector_type = self._create_vector_type(cqltypes.LongType, vector_size)
170+
171+
vectors = [
172+
[100, 200, 300, 400, 500],
173+
[600, 700, 800, 900, 1000],
174+
]
175+
176+
serialized = self._serialize_vectors(vectors, 'q')
177+
178+
parser = NumpyParser()
179+
reader = BytesIOReader(serialized)
180+
181+
desc = ParseDesc(
182+
colnames=['ids'],
183+
coltypes=[vector_type],
184+
column_encryption_policy=None,
185+
coldescs=None,
186+
deserializers=obj_array([None]),
187+
protocol_version=5
188+
)
189+
190+
result = parser.parse_rows(reader, desc)
191+
192+
arr = result['ids']
193+
self.assertEqual(arr.shape, (2, 5))
194+
195+
expected = np.array(vectors, dtype='<i8')
196+
np.testing.assert_array_equal(arr, expected)
197+
198+
def test_vector_int16_2d_array(self):
199+
"""Test that VectorType<smallint> creates and populates a 2D NumPy array"""
200+
vector_size = 8
201+
vector_type = self._create_vector_type(cqltypes.ShortType, vector_size)
202+
203+
vectors = [
204+
[1, 2, 3, 4, 5, 6, 7, 8],
205+
[9, 10, 11, 12, 13, 14, 15, 16],
206+
]
207+
208+
serialized = self._serialize_vectors(vectors, 'h')
209+
210+
parser = NumpyParser()
211+
reader = BytesIOReader(serialized)
212+
213+
desc = ParseDesc(
214+
colnames=['small_vec'],
215+
coltypes=[vector_type],
216+
column_encryption_policy=None,
217+
coldescs=None,
218+
deserializers=obj_array([None]),
219+
protocol_version=5
220+
)
221+
222+
result = parser.parse_rows(reader, desc)
223+
224+
arr = result['small_vec']
225+
self.assertEqual(arr.shape, (2, 8))
226+
227+
expected = np.array(vectors, dtype='<i2')
228+
np.testing.assert_array_equal(arr, expected)
229+
230+
def test_mixed_columns_with_vectors(self):
231+
"""Test parsing multiple columns including VectorType"""
232+
vector_type = self._create_vector_type(cqltypes.FloatType, 3)
233+
234+
# Serialize: int32 column, vector column
235+
buffer = bytearray()
236+
buffer.extend(struct.pack('>i', 2)) # row count
237+
238+
# Row 1: id=1, vec=[1.0, 2.0, 3.0]
239+
buffer.extend(struct.pack('>i', 4)) # int32 size
240+
buffer.extend(struct.pack('>i', 1)) # id value
241+
buffer.extend(struct.pack('>i', 12)) # vector size (3 floats)
242+
buffer.extend(struct.pack('>3f', 1.0, 2.0, 3.0))
243+
244+
# Row 2: id=2, vec=[4.0, 5.0, 6.0]
245+
buffer.extend(struct.pack('>i', 4))
246+
buffer.extend(struct.pack('>i', 2))
247+
buffer.extend(struct.pack('>i', 12))
248+
buffer.extend(struct.pack('>3f', 4.0, 5.0, 6.0))
249+
250+
parser = NumpyParser()
251+
reader = BytesIOReader(bytes(buffer))
252+
253+
desc = ParseDesc(
254+
colnames=['id', 'vec'],
255+
coltypes=[cqltypes.Int32Type, vector_type],
256+
column_encryption_policy=None,
257+
coldescs=None,
258+
deserializers=obj_array([None, None]),
259+
protocol_version=5
260+
)
261+
262+
result = parser.parse_rows(reader, desc)
263+
264+
# Verify id column (1D array)
265+
self.assertEqual(result['id'].shape, (2,))
266+
np.testing.assert_array_equal(result['id'], np.array([1, 2], dtype='<i4'))
267+
268+
# Verify vec column (2D array)
269+
self.assertEqual(result['vec'].shape, (2, 3))
270+
expected_vecs = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], dtype='<f4')
271+
np.testing.assert_array_almost_equal(result['vec'], expected_vecs)
272+
273+
def test_large_vector_dimensions(self):
274+
"""Test VectorType with large dimensions (e.g., 384 for embeddings)"""
275+
vector_size = 384
276+
vector_type = self._create_vector_type(cqltypes.FloatType, vector_size)
277+
278+
# Create one row with a 384-dimensional vector
279+
vectors = [[float(i) for i in range(384)]]
280+
281+
serialized = self._serialize_vectors(vectors, 'f')
282+
283+
parser = NumpyParser()
284+
reader = BytesIOReader(serialized)
285+
286+
desc = ParseDesc(
287+
colnames=['embedding'],
288+
coltypes=[vector_type],
289+
column_encryption_policy=None,
290+
coldescs=None,
291+
deserializers=obj_array([None]),
292+
protocol_version=5
293+
)
294+
295+
result = parser.parse_rows(reader, desc)
296+
297+
arr = result['embedding']
298+
self.assertEqual(arr.shape, (1, 384))
299+
300+
expected = np.array(vectors, dtype='<f4')
301+
np.testing.assert_array_almost_equal(arr, expected)
302+
303+
304+
if __name__ == '__main__':
305+
unittest.main()

0 commit comments

Comments
 (0)