Skip to content

Commit

Permalink
Merge pull request #142 from dbussink/add-vector-type
Browse files Browse the repository at this point in the history
Add vector type added in MySQL 9.0
  • Loading branch information
blackbeam authored Aug 10, 2024
2 parents 146ee55 + 88edfdd commit 3d6bd8e
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 4 deletions.
3 changes: 3 additions & 0 deletions src/binlog/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,8 @@ pub enum OptionalMetadataFieldType {
ENUM_AND_SET_COLUMN_CHARSET,
/// A flag that indicates column visibility attribute.
COLUMN_VISIBILITY,
/// Vector column dimensionality.
VECTOR_DIMENSIONALITY,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
Expand Down Expand Up @@ -605,6 +607,7 @@ impl TryFrom<u8> for OptionalMetadataFieldType {
10 => Ok(Self::ENUM_AND_SET_DEFAULT_CHARSET),
11 => Ok(Self::ENUM_AND_SET_COLUMN_CHARSET),
12 => Ok(Self::COLUMN_VISIBILITY),
13 => Ok(Self::VECTOR_DIMENSIONALITY),
x => Err(UnknownOptionalMetadataFieldType(x)),
}
}
Expand Down
62 changes: 62 additions & 0 deletions src/binlog/events/table_map_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,6 +960,62 @@ impl MySerialize for GeometryTypes<'_> {
}
}

/// Holds the number of dimensions for every vector column.
///
/// The field value is stored undecoded; individual values are produced on demand by
/// [`VectorDimensionalities::iter_dimensionalities`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct VectorDimensionalities<'a> {
    /// Raw, not yet decoded field value.
    dimensionalities: RawBytes<'a, EofBytes>,
}

impl<'a> VectorDimensionalities<'a> {
    /// Creates an iterator that decodes the stored dimensionalities one by one.
    ///
    /// Every item is an `io::Result`: a malformed field value is reported as an error,
    /// after which the iteration stops.
    pub fn iter_dimensionalities(&'a self) -> IterVectorDimensionalities<'a> {
        let raw = self.dimensionalities.as_bytes();
        let buf = ParseBuf(raw);
        IterVectorDimensionalities { buf }
    }
}

/// Iterator over the dimensionalities stored in a [`VectorDimensionalities`] value.
///
/// Created by [`VectorDimensionalities::iter_dimensionalities`]. Items are
/// `io::Result<u64>`, so a malformed value surfaces as an error item.
pub struct IterVectorDimensionalities<'a> {
/// Bytes of the field value that are still to be parsed.
buf: ParseBuf<'a>,
}

impl<'a> Iterator for IterVectorDimensionalities<'a> {
    type Item = io::Result<u64>;

    /// Parses the next dimensionality from the remaining bytes.
    ///
    /// Returns `None` once the buffer is empty. A parse failure is yielded once as
    /// `Some(Err(_))`; the buffer is then cleared so the following call ends iteration.
    fn next(&mut self) -> Option<Self::Item> {
        if self.buf.is_empty() {
            return None;
        }

        let item = match self.buf.parse::<RawInt<LenEnc>>(()) {
            Ok(raw) => Ok(raw.0),
            Err(err) => {
                // Discard whatever is left: nothing after a malformed value is trusted.
                self.buf = ParseBuf(b"");
                Err(err)
            }
        };

        Some(item)
    }
}

impl<'de> MyDeserialize<'de> for VectorDimensionalities<'de> {
const SIZE: Option<usize> = None;
type Ctx = ();

fn deserialize((): Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
Ok(Self {
dimensionalities: buf.parse(())?,
})
}
}

impl MySerialize for VectorDimensionalities<'_> {
    /// Serializes the stored field value into `buf` by delegating to the inner value.
    fn serialize(&self, buf: &mut Vec<u8>) {
        let Self { dimensionalities } = self;
        dimensionalities.serialize(buf);
    }
}

/// Contains a sequence of PK column indexes where PK doesn't have a prefix.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SimplePrimaryKey<'a> {
Expand Down Expand Up @@ -1146,6 +1202,8 @@ pub enum OptionalMetadataField<'a> {
/// Flags indicating visibility for every column.
&'a BitSlice<u8, Msb0>,
),
/// See [`OptionalMetadataFieldType::VECTOR_DIMENSIONALITY`].
Dimensionality(VectorDimensionalities<'a>),
}

/// Iterator over fields of an optional metadata.
Expand Down Expand Up @@ -1260,6 +1318,9 @@ impl<'a> Iterator for OptionalMetadataIter<'a> {
let flags = &flags[..num_columns];
Ok(OptionalMetadataField::ColumnVisibility(flags))
}
VECTOR_DIMENSIONALITY => {
Ok(OptionalMetadataField::Dimensionality(v.parse(())?))
}
},
Err(_) => Err(io::Error::new(
io::ErrorKind::InvalidData,
Expand Down Expand Up @@ -1325,6 +1386,7 @@ impl<'a> OptionalMetaExtractor<'a> {
this.enum_and_set_column_charset = Some(x);
}
OptionalMetadataField::ColumnVisibility(_) => (),
OptionalMetadataField::Dimensionality(_) => (),
}
}

Expand Down
94 changes: 91 additions & 3 deletions src/binlog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ impl ColumnType {
| Self::MYSQL_TYPE_TIME2
| Self::MYSQL_TYPE_DATETIME2
| Self::MYSQL_TYPE_TIMESTAMP2
| Self::MYSQL_TYPE_JSON => ptr.get(..1).map(|x| (x, 1)),
| Self::MYSQL_TYPE_JSON
| Self::MYSQL_TYPE_VECTOR => ptr.get(..1).map(|x| (x, 1)),
Self::MYSQL_TYPE_VARCHAR => {
if is_array {
ptr.get(..3).map(|x| (x, 3))
Expand All @@ -302,7 +303,21 @@ impl ColumnType {
.ok()?
.get_metadata(ptr.get(1..)?, true)
.map(|(x, n)| (x, n + 1)),
_ => Some((&[], 0)),
Self::MYSQL_TYPE_DECIMAL
| Self::MYSQL_TYPE_TINY
| Self::MYSQL_TYPE_SHORT
| Self::MYSQL_TYPE_LONG
| Self::MYSQL_TYPE_NULL
| Self::MYSQL_TYPE_TIMESTAMP
| Self::MYSQL_TYPE_LONGLONG
| Self::MYSQL_TYPE_INT24
| Self::MYSQL_TYPE_DATE
| Self::MYSQL_TYPE_TIME
| Self::MYSQL_TYPE_DATETIME
| Self::MYSQL_TYPE_YEAR
| Self::MYSQL_TYPE_NEWDATE
| Self::MYSQL_TYPE_UNKNOWN
| Self::MYSQL_TYPE_VAR_STRING => Some((&[], 0)),
}
}
}
Expand All @@ -311,6 +326,7 @@ impl ColumnType {
mod tests {
use std::{
collections::HashMap,
convert::TryFrom,
io,
iter::{once, repeat},
};
Expand All @@ -322,10 +338,14 @@ mod tests {
};

use crate::{
binlog::{events::RowsEventData, value::BinlogValue},
binlog::{
events::{OptionalMetadataField, RowsEventData},
value::BinlogValue,
},
collations::CollationId,
constants::ColumnFlags,
proto::MySerialize,
row::convert::from_row,
value::Value,
};

Expand Down Expand Up @@ -719,10 +739,12 @@ mod tests {
let file_data = std::fs::read(dbg!(&file_path))?;
let mut binlog_file = BinlogFile::new(BinlogVersion::Version4, &file_data[..])?;

let mut i = 0;
let mut ev_pos = 4;
let mut table_map_events = HashMap::new();

while let Some(ev) = binlog_file.next() {
i += 1;
let ev = ev?;
let _ = dbg!(ev.header().event_type());
let ev_end = ev_pos + ev.header().event_size() as usize;
Expand Down Expand Up @@ -832,6 +854,72 @@ mod tests {
}
}

if file_path.file_name().unwrap() == "vector.binlog" {
let event_data = ev.read_data().unwrap();
match event_data {
Some(EventData::TableMapEvent(ev)) => {
let optional_meta = ev.iter_optional_meta();
match ev.table_name().as_ref() {
"foo" => {
for meta in optional_meta {
match meta.unwrap() {
OptionalMetadataField::Dimensionality(x) => assert_eq!(
x.iter_dimensionalities()
.collect::<Result<Vec<_>, _>>()
.unwrap(),
vec![3],
),
_ => (),
}
}
}
"bar" => {
for meta in optional_meta {
match meta.unwrap() {
OptionalMetadataField::Dimensionality(x) => assert_eq!(
x.iter_dimensionalities()
.collect::<Result<Vec<_>, _>>()
.unwrap(),
vec![2, 4],
),
_ => (),
}
}
}
_ => (),
}
}
Some(EventData::RowsEvent(ev)) if i == 12 => {
let table_map_event =
binlog_file.reader().get_tme(ev.table_id()).unwrap();
let mut rows = ev.rows(table_map_event);

let (None, Some(after)) = rows.next().unwrap().unwrap() else {
panic!("Unexpected data");
};
let (id, vector_column): (u8, Vec<u8>) =
from_row(crate::Row::try_from(after).unwrap());
assert_eq!(id, 1);
assert_eq!(
vector_column,
vec![205, 204, 140, 63, 205, 204, 12, 64, 51, 51, 83, 64]
);

let (None, Some(after)) = rows.next().unwrap().unwrap() else {
panic!("Unexpected data");
};
let (id, vector_column): (u8, Vec<u8>) =
from_row(crate::Row::try_from(after).unwrap());
assert_eq!(id, 2);
assert_eq!(
vector_column,
vec![0, 0, 128, 63, 0, 0, 128, 191, 0, 0, 0, 0]
);
}
_ => (),
}
}

if file_path.file_name().unwrap() == "mysql-enum-string-set.000001" {
if let Some(EventData::RowsEvent(data)) = ev.read_data().unwrap() {
let table_map_event =
Expand Down
3 changes: 2 additions & 1 deletion src/binlog/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ impl<'de> MyDeserialize<'de> for BinlogValue<'de> {
| MYSQL_TYPE_MEDIUM_BLOB
| MYSQL_TYPE_LONG_BLOB
| MYSQL_TYPE_BLOB
| MYSQL_TYPE_GEOMETRY => {
| MYSQL_TYPE_GEOMETRY
| MYSQL_TYPE_VECTOR => {
let nbytes = match col_meta[0] {
1 => *buf.parse::<RawInt<u8>>(())? as usize,
2 => *buf.parse::<RawInt<LeU16>>(())? as usize,
Expand Down
6 changes: 6 additions & 0 deletions src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ pub enum ColumnType {
MYSQL_TYPE_DATETIME2,
MYSQL_TYPE_TIME2,
MYSQL_TYPE_TYPED_ARRAY, // Used for replication only
MYSQL_TYPE_VECTOR = 242,
MYSQL_TYPE_UNKNOWN = 243,
MYSQL_TYPE_JSON = 245,
MYSQL_TYPE_NEWDECIMAL = 246,
Expand Down Expand Up @@ -671,6 +672,10 @@ impl ColumnType {
pub fn is_geometry_type(&self) -> bool {
matches!(self, ColumnType::MYSQL_TYPE_GEOMETRY)
}

pub fn is_vector_type(&self) -> bool {
matches!(self, ColumnType::MYSQL_TYPE_VECTOR)
}
}

impl TryFrom<u8> for ColumnType {
Expand Down Expand Up @@ -698,6 +703,7 @@ impl TryFrom<u8> for ColumnType {
0x12_u8 => Ok(ColumnType::MYSQL_TYPE_DATETIME2),
0x13_u8 => Ok(ColumnType::MYSQL_TYPE_TIME2),
0x14_u8 => Ok(ColumnType::MYSQL_TYPE_TYPED_ARRAY),
0xf2_u8 => Ok(ColumnType::MYSQL_TYPE_VECTOR),
0xf3_u8 => Ok(ColumnType::MYSQL_TYPE_UNKNOWN),
0xf5_u8 => Ok(ColumnType::MYSQL_TYPE_JSON),
0xf6_u8 => Ok(ColumnType::MYSQL_TYPE_NEWDECIMAL),
Expand Down
3 changes: 3 additions & 0 deletions src/value/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ impl Value {
| ColumnType::MYSQL_TYPE_BIT
| ColumnType::MYSQL_TYPE_NEWDECIMAL
| ColumnType::MYSQL_TYPE_GEOMETRY
| ColumnType::MYSQL_TYPE_VECTOR
| ColumnType::MYSQL_TYPE_JSON => Ok(Bytes(
buf.checked_eat_lenenc_str()
.ok_or_else(unexpected_buf_eof)?
Expand Down Expand Up @@ -576,6 +577,7 @@ mod test {
Value::Bytes(b"MYSQL_TYPE_STRING".to_vec()),
Value::NULL,
Value::Bytes(b"MYSQL_TYPE_GEOMETRY".to_vec()),
Value::Bytes(b"MYSQL_TYPE_VECTOR".to_vec()),
];

let (body, _) = ComStmtExecuteRequestBuilder::new(0).build(&*values);
Expand Down Expand Up @@ -627,6 +629,7 @@ mod test {
Value::Bytes(b"MYSQL_TYPE_STRING".to_vec()),
Value::NULL,
Value::Bytes(b"MYSQL_TYPE_GEOMETRY".to_vec()),
Value::Bytes(b"MYSQL_TYPE_VECTOR".to_vec()),
];

let (body, _) = ComStmtExecuteRequestBuilder::new(0).build(&*values);
Expand Down
Binary file added test-data/binlogs/vector.binlog
Binary file not shown.

0 comments on commit 3d6bd8e

Please sign in to comment.