Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/query/expression/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ pub use self::bitmap::BitmapType;
pub use self::boolean::Bitmap;
pub use self::boolean::BooleanType;
pub use self::boolean::MutableBitmap;
pub use self::compute_view::StringConvert;
pub use self::date::DateType;
pub use self::decimal::*;
pub use self::empty_array::EmptyArrayType;
Expand Down
131 changes: 0 additions & 131 deletions src/query/expression/src/types/compute_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,10 @@ use std::ops::Range;
use databend_common_exception::Result;
use num_traits::AsPrimitive;

use super::column_type_error;
use super::domain_type_error;
use super::scalar_type_error;
use super::string::StringDomain;
use super::string::StringIterator;
use super::AccessType;
use super::AnyType;
use super::ArgType;
use super::Number;
use super::NumberType;
use super::SimpleDomain;
use super::StringColumn;
use super::StringType;
use crate::display::scalar_ref_to_string;
use crate::Column;
use crate::Domain;
use crate::ScalarRef;
Expand Down Expand Up @@ -177,124 +167,3 @@ where
SimpleDomain { min, max }
}
}

/// For number convert
pub type StringConvertView = ComputeView<StringConvert, AnyType, OwnedStringType>;

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OwnedStringType;

impl AccessType for OwnedStringType {
type Scalar = String;
type ScalarRef<'a> = String;
type Column = StringColumn;
type Domain = StringDomain;
type ColumnIterator<'a> = std::iter::Map<StringIterator<'a>, fn(&str) -> String>;

fn to_owned_scalar(scalar: Self::ScalarRef<'_>) -> Self::Scalar {
scalar.to_string()
}

fn to_scalar_ref(scalar: &Self::Scalar) -> Self::ScalarRef<'_> {
scalar.clone()
}

fn try_downcast_scalar<'a>(scalar: &ScalarRef<'a>) -> Result<Self::ScalarRef<'a>> {
scalar
.as_string()
.map(|s| s.to_string())
.ok_or_else(|| scalar_type_error::<Self>(scalar))
}

fn try_downcast_column(col: &Column) -> Result<Self::Column> {
col.as_string()
.cloned()
.ok_or_else(|| column_type_error::<Self>(col))
}

fn try_downcast_domain(domain: &Domain) -> Result<Self::Domain> {
domain
.as_string()
.cloned()
.ok_or_else(|| domain_type_error::<Self>(domain))
}

fn column_len(col: &Self::Column) -> usize {
col.len()
}

fn index_column(col: &Self::Column, index: usize) -> Option<Self::ScalarRef<'_>> {
col.index(index).map(str::to_string)
}

#[inline]
unsafe fn index_column_unchecked(col: &Self::Column, index: usize) -> Self::ScalarRef<'_> {
col.value_unchecked(index).to_string()
}

fn slice_column(col: &Self::Column, range: Range<usize>) -> Self::Column {
col.clone().sliced(range.start, range.end - range.start)
}

fn iter_column(col: &Self::Column) -> Self::ColumnIterator<'_> {
col.iter().map(str::to_string)
}

fn scalar_memory_size(scalar: &Self::ScalarRef<'_>) -> usize {
scalar.len()
}

fn column_memory_size(col: &Self::Column) -> usize {
col.memory_size()
}

#[inline(always)]
fn compare(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> Ordering {
left.cmp(&right)
}

#[inline(always)]
fn equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool {
left == right
}

#[inline(always)]
fn not_equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool {
left != right
}

#[inline(always)]
fn greater_than(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool {
left > right
}

#[inline(always)]
fn greater_than_equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool {
left >= right
}

#[inline(always)]
fn less_than(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool {
left < right
}

#[inline(always)]
fn less_than_equal(left: Self::ScalarRef<'_>, right: Self::ScalarRef<'_>) -> bool {
left <= right
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StringConvert;

impl Compute<AnyType, OwnedStringType> for StringConvert {
fn compute<'a>(
value: <AnyType as AccessType>::ScalarRef<'a>,
) -> <OwnedStringType as AccessType>::ScalarRef<'a> {
scalar_ref_to_string(&value)
}

fn compute_domain(_: &<AnyType as AccessType>::Domain) -> StringDomain {
StringType::full_domain()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use itertools::Itertools;
use super::batch_merge1;
use super::batch_serialize1;
use super::AggregateFunctionSortDesc;
use super::SerializeInfo;
use super::StateSerde;

#[derive(Debug, Clone)]
Expand All @@ -52,7 +53,7 @@ pub struct SortAggState {
}

impl StateSerde for SortAggState {
fn serialize_type(_: Option<&dyn super::FunctionData>) -> Vec<StateSerdeItem> {
fn serialize_type(_: Option<&dyn SerializeInfo>) -> Vec<StateSerdeItem> {
vec![StateSerdeItem::Binary(None)]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use super::AggregateFunction;
use super::AggregateFunctionDescription;
use super::AggregateFunctionSortDesc;
use super::AggregateUnaryFunction;
use super::FunctionData;
use super::SerializeInfo;
use super::StateSerde;
use super::UnaryState;

Expand All @@ -48,11 +48,7 @@ where
T: ValueType,
T::Scalar: Hash,
{
fn add(
&mut self,
other: T::ScalarRef<'_>,
_function_data: Option<&dyn FunctionData>,
) -> Result<()> {
fn add(&mut self, other: T::ScalarRef<'_>, _: &Self::FunctionInfo) -> Result<()> {
self.add_object(&T::to_owned_scalar(other));
Ok(())
}
Expand All @@ -65,15 +61,15 @@ where
fn merge_result(
&mut self,
mut builder: BuilderMut<'_, UInt64Type>,
_function_data: Option<&dyn FunctionData>,
_: &Self::FunctionInfo,
) -> Result<()> {
builder.push(self.count() as u64);
Ok(())
}
}

impl<const HLL_P: usize> StateSerde for AggregateApproxCountDistinctState<HLL_P> {
fn serialize_type(_function_data: Option<&dyn FunctionData>) -> Vec<StateSerdeItem> {
fn serialize_type(_: Option<&dyn SerializeInfo>) -> Vec<StateSerdeItem> {
vec![StateSerdeItem::Binary(None)]
}

Expand Down Expand Up @@ -144,39 +140,39 @@ fn create_templated<const P: usize>(
let return_type = DataType::Number(NumberDataType::UInt64);
with_number_mapped_type!(|NUM_TYPE| match &arguments[0] {
DataType::Number(NumberDataType::NUM_TYPE) => {
AggregateUnaryFunction::<HyperLogLog<P>, NumberType<NUM_TYPE>, UInt64Type>::create(
AggregateUnaryFunction::<HyperLogLog<P>, NumberType<NUM_TYPE>, UInt64Type>::new(
display_name,
return_type,
)
.with_need_drop(true)
.finish()
}
DataType::String => {
AggregateUnaryFunction::<HyperLogLog<P>, StringType, UInt64Type>::create(
AggregateUnaryFunction::<HyperLogLog<P>, StringType, UInt64Type>::new(
display_name,
return_type,
)
.with_need_drop(true)
.finish()
}
DataType::Date => {
AggregateUnaryFunction::<HyperLogLog<P>, DateType, UInt64Type>::create(
AggregateUnaryFunction::<HyperLogLog<P>, DateType, UInt64Type>::new(
display_name,
return_type,
)
.with_need_drop(true)
.finish()
}
DataType::Timestamp => {
AggregateUnaryFunction::<HyperLogLog<P>, TimestampType, UInt64Type>::create(
AggregateUnaryFunction::<HyperLogLog<P>, TimestampType, UInt64Type>::new(
display_name,
return_type,
)
.with_need_drop(true)
.finish()
}
_ => {
AggregateUnaryFunction::<HyperLogLog<P>, AnyType, UInt64Type>::create(
AggregateUnaryFunction::<HyperLogLog<P>, AnyType, UInt64Type>::new(
display_name,
return_type,
)
Expand Down
16 changes: 8 additions & 8 deletions src/query/functions/src/aggregates/aggregate_array_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use super::AggrStateLoc;
use super::AggregateFunction;
use super::AggregateFunctionDescription;
use super::AggregateFunctionSortDesc;
use super::FunctionData;
use super::SerializeInfo;
use super::StateAddr;
use super::StateSerde;

Expand Down Expand Up @@ -132,8 +132,8 @@ where
Self: ScalarStateFunc<T>,
T: ValueType,
{
fn serialize_type(function_data: Option<&dyn FunctionData>) -> Vec<StateSerdeItem> {
let return_type = function_data
fn serialize_type(info: Option<&dyn SerializeInfo>) -> Vec<StateSerdeItem> {
let return_type = info
.and_then(|data| data.as_any().downcast_ref::<DataType>())
.cloned()
.unwrap();
Expand Down Expand Up @@ -234,8 +234,8 @@ where T: SimpleType + Debug
impl<T> StateSerde for ArrayAggStateSimple<T>
where T: SimpleType
{
fn serialize_type(function_data: Option<&dyn FunctionData>) -> Vec<StateSerdeItem> {
let data_type = function_data
fn serialize_type(info: Option<&dyn SerializeInfo>) -> Vec<StateSerdeItem> {
let data_type = info
.and_then(|data| data.as_any().downcast_ref::<DataType>())
.and_then(|ty| ty.as_array())
.unwrap()
Expand Down Expand Up @@ -336,7 +336,7 @@ where V: ZeroSizeType
}

impl<const IS_NULL: bool> StateSerde for ArrayAggStateZST<IS_NULL> {
fn serialize_type(_function_data: Option<&dyn super::FunctionData>) -> Vec<StateSerdeItem> {
fn serialize_type(_: Option<&dyn SerializeInfo>) -> Vec<StateSerdeItem> {
vec![ArrayType::<BooleanType>::data_type().into()]
}

Expand Down Expand Up @@ -464,8 +464,8 @@ where T: ArgType + Debug + std::marker::Send
impl<T> StateSerde for ArrayAggStateBinary<T>
where T: ArgType + std::marker::Send
{
fn serialize_type(function_data: Option<&dyn FunctionData>) -> Vec<StateSerdeItem> {
let data_type = function_data
fn serialize_type(info: Option<&dyn SerializeInfo>) -> Vec<StateSerdeItem> {
let data_type = info
.and_then(|data| data.as_any().downcast_ref::<DataType>())
.and_then(|ty| ty.as_array())
.unwrap()
Expand Down
5 changes: 3 additions & 2 deletions src/query/functions/src/aggregates/aggregate_array_moving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use super::AggregateFunction;
use super::AggregateFunctionDescription;
use super::AggregateFunctionRef;
use super::AggregateFunctionSortDesc;
use super::SerializeInfo;
use super::StateAddr;
use super::StateSerde;

Expand Down Expand Up @@ -221,7 +222,7 @@ where
Self: SumState,
T: Number,
{
fn serialize_type(_function_data: Option<&dyn super::FunctionData>) -> Vec<StateSerdeItem> {
fn serialize_type(_: Option<&dyn SerializeInfo>) -> Vec<StateSerdeItem> {
vec![ArrayType::<NumberType<T>>::data_type().into()]
}

Expand Down Expand Up @@ -452,7 +453,7 @@ where
Self: SumState,
T: Decimal,
{
fn serialize_type(_function_data: Option<&dyn super::FunctionData>) -> Vec<StateSerdeItem> {
fn serialize_type(_: Option<&dyn SerializeInfo>) -> Vec<StateSerdeItem> {
vec![DataType::Array(Box::new(DataType::Decimal(T::default_decimal_size()))).into()]
}

Expand Down
Loading
Loading