diff --git a/e2e_test/sink/sink_into_table/specify_column.slt b/e2e_test/sink/sink_into_table/specify_column.slt new file mode 100644 index 0000000000000..2eefd8abf8fb0 --- /dev/null +++ b/e2e_test/sink/sink_into_table/specify_column.slt @@ -0,0 +1,54 @@ +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +create table s (a int, b int, c int) append only; + +statement ok +create table t (a int, b int default 900, c int default 9000); + +statement error +create sink ss into t(aaa) as select a from s with(type = 'append-only'); + +statement error +create sink ss into t(a) as select a, b from s with(type = 'append-only'); + +statement error +create sink ss into t(a, b) as select b from s with(type = 'append-only'); + +statement error +create sink ss into t(a, b, c, a) as select a, b from s with(type = 'append-only'); + +statement ok +create sink s1 into t(a,B,c) as select c, b, a from s with(type = 'append-only'); + +statement ok +create sink s2 into t(a,B) as select 2*c, 2*b from s with(type = 'append-only'); + +statement ok +create sink s3 into t(c) as select 3*a from s with(type = 'append-only'); + +statement ok +insert into s values(10, 100, 1000); + +query III rowsort +select * from t order by a; +---- +1000 100 10 +2000 200 9000 +NULL 900 30 + +statement ok +drop sink s1; + +statement ok +drop sink s2; + +statement ok +drop sink s3; + +statement ok +drop table s; + +statement ok +drop table t; diff --git a/src/frontend/src/catalog/table_catalog.rs b/src/frontend/src/catalog/table_catalog.rs index f5f95861d29cd..9b02402571d0f 100644 --- a/src/frontend/src/catalog/table_catalog.rs +++ b/src/frontend/src/catalog/table_catalog.rs @@ -450,6 +450,20 @@ impl TableCatalog { .map(|(i, _)| i) } + pub fn default_column_expr(&self, col_idx: usize) -> ExprImpl { + if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { expr, .. 
})) = self + .columns[col_idx] + .column_desc + .generated_or_default_column + .as_ref() + { + ExprImpl::from_expr_proto(expr.as_ref().unwrap()) + .expect("expr in default columns corrupted") + } else { + ExprImpl::literal_null(self.columns[col_idx].data_type().clone()) + } + } + + pub fn default_columns(&self) -> impl Iterator<Item = (usize, ExprImpl)> + '_ { self.columns.iter().enumerate().filter_map(|(i, c)| { if let Some(GeneratedOrDefaultColumn::DefaultColumn(DefaultColumnDesc { diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index 94ff4eebbc66e..ceaa92e06b71a 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -37,21 +37,24 @@ use crate::scheduler::streaming_manager::CreatingStreamingJobInfo; use crate::session::SessionImpl; use crate::stream_fragmenter::build_graph; +pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> { + if columns.is_empty() { + None + } else { + Some(columns.iter().map(|v| v.real_value()).collect()) + } +} + +/// If columns is empty, it means that the user did not specify the column names. +/// In this case, we extract the column names from the query. +/// If columns is not empty, it means that the user specified the column names and the user +/// should guarantee that the number of column names is consistent with the query. pub(super) fn get_column_names( bound: &BoundQuery, session: &SessionImpl, columns: Vec<Ident>, ) -> Result<Option<Vec<String>>> { - // If columns is empty, it means that the user did not specify the column names. - // In this case, we extract the column names from the query. - // If columns is not empty, it means that user specify the column names and the user - // should guarantee that the column names number are consistent with the query.
- let col_names: Option> = if columns.is_empty() { - None - } else { - Some(columns.iter().map(|v| v.real_value()).collect()) - }; - + let col_names = parse_column_names(&columns); if let BoundSetExpr::Select(select) = &bound.body { // `InputRef`'s alias will be implicitly assigned in `bind_project`. // If user provide columns name (col_names.is_some()), we don't need alias. diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index bed409de178f1..cf0bd0968d079 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::rc::Rc; use std::sync::{Arc, LazyLock}; @@ -22,9 +22,8 @@ use either::Either; use itertools::Itertools; use maplit::{convert_args, hashmap}; use pgwire::pg_response::{PgResponse, StatementType}; -use risingwave_common::catalog::{ConnectionId, DatabaseId, SchemaId, TableId, UserId}; -use risingwave_common::types::{DataType, Datum}; -use risingwave_common::util::value_encoding::DatumFromProtoExt; +use risingwave_common::catalog::{ConnectionId, DatabaseId, Schema, SchemaId, TableId, UserId}; +use risingwave_common::types::DataType; use risingwave_common::{bail, catalog}; use risingwave_connector::sink::catalog::{SinkCatalog, SinkFormatDesc, SinkType}; use risingwave_connector::sink::iceberg::{IcebergConfig, ICEBERG_SINK}; @@ -33,7 +32,6 @@ use risingwave_connector::sink::{ }; use risingwave_pb::catalog::{PbSource, Table}; use risingwave_pb::ddl_service::ReplaceTablePlan; -use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatcherType, MergeNode, StreamFragmentGraph, StreamNode}; @@ 
-50,8 +48,9 @@ use crate::binder::Binder; use crate::catalog::catalog_service::CatalogReadGuard; use crate::catalog::source_catalog::SourceCatalog; use crate::error::{ErrorCode, Result, RwError}; -use crate::expr::{ExprImpl, InputRef, Literal}; +use crate::expr::{ExprImpl, InputRef}; use crate::handler::alter_table_column::fetch_table_catalog_for_alter; +use crate::handler::create_mv::parse_column_names; use crate::handler::create_table::{generate_stream_graph_for_table, ColumnIdGenerator}; use crate::handler::privilege::resolve_query_privileges; use crate::handler::util::SourceSchemaCompatExt; @@ -80,6 +79,7 @@ pub fn gen_sink_plan( stmt: CreateSinkStatement, partition_info: Option, ) -> Result { + let user_specified_columns = !stmt.columns.is_empty(); let db_name = session.database(); let (sink_schema_name, sink_table_name) = Binder::resolve_schema_qualified_name(db_name, stmt.sink_name.clone())?; @@ -118,8 +118,12 @@ pub fn gen_sink_plan( let check_items = resolve_query_privileges(&bound); session.check_privileges(&check_items)?; - // If column names not specified, use the name in materialized view. - let col_names = get_column_names(&bound, session, stmt.columns)?; + let col_names = if sink_into_table_name.is_some() { + parse_column_names(&stmt.columns) + } else { + // If column names not specified, use the name in the bound query, which is equal with the plan root's original field name. + get_column_names(&bound, session, stmt.columns)? 
+ }; let mut with_options = context.with_options().clone(); @@ -172,8 +176,8 @@ pub fn gen_sink_plan( }; let mut plan_root = Planner::new(context).plan_query(bound)?; - if let Some(col_names) = col_names { - plan_root.set_out_names(col_names)?; + if let Some(col_names) = &col_names { + plan_root.set_out_names(col_names.clone())?; }; let without_backfill = match with_options.remove(SINK_WITHOUT_BACKFILL) { @@ -196,6 +200,25 @@ pub fn gen_sink_plan( .map(|table_name| fetch_table_catalog_for_alter(session, table_name)) .transpose()?; + if let Some(target_table_catalog) = &target_table_catalog { + if let Some(col_names) = col_names { + let target_table_columns = target_table_catalog + .columns() + .iter() + .map(|c| c.name()) + .collect::>(); + for c in col_names { + if !target_table_columns.contains(c.as_str()) { + return Err(RwError::from(ErrorCode::BindError(format!( + "Column {} not found in table {}", + c, + target_table_catalog.name() + )))); + } + } + } + } + let target_table = target_table_catalog.as_ref().map(|catalog| catalog.id()); let sink_plan = plan_root.gen_sink_plan( @@ -251,7 +274,12 @@ pub fn gen_sink_plan( ))); } - let exprs = derive_default_column_project_for_sink(&sink_catalog, table_catalog)?; + let exprs = derive_default_column_project_for_sink( + &sink_catalog, + sink_plan.schema(), + table_catalog, + user_specified_columns, + )?; let logical_project = generic::Project::new(exprs, sink_plan); @@ -632,66 +660,78 @@ pub(crate) fn insert_merger_to_union(node: &mut StreamNode) { insert_merger_to_union(input); } } + +fn derive_sink_to_table_expr( + sink_schema: &Schema, + idx: usize, + target_type: &DataType, +) -> Result { + let input_type = &sink_schema.fields()[idx].data_type; + + if target_type != input_type { + bail!( + "column type mismatch: {:?} vs {:?}", + target_type, + input_type + ); + } else { + Ok(ExprImpl::InputRef(Box::new(InputRef::new( + idx, + input_type.clone(), + )))) + } +} + fn derive_default_column_project_for_sink( sink: 
&SinkCatalog, + sink_schema: &Schema, target_table_catalog: &Arc, + user_specified_columns: bool, ) -> Result> { + assert_eq!(sink.full_schema().len(), sink_schema.len()); + let mut exprs = vec![]; - let sink_visible_columns = sink + let sink_visible_col_idxes = sink .full_columns() .iter() - .enumerate() - .filter(|(_i, c)| !c.is_hidden()) + .positions(|c| !c.is_hidden()) .collect_vec(); + let sink_visible_col_idxes_by_name = sink + .full_columns() + .iter() + .enumerate() + .filter(|(_, c)| !c.is_hidden()) + .map(|(i, c)| (c.name(), i)) + .collect::>(); for (idx, table_column) in target_table_catalog.columns().iter().enumerate() { if table_column.is_generated() { continue; } - let data_type = table_column.data_type(); - - if idx < sink_visible_columns.len() { - let (sink_col_idx, sink_column) = sink_visible_columns[idx]; - - let sink_col_type = sink_column.data_type(); + let default_col_expr = || -> ExprImpl { target_table_catalog.default_column_expr(idx) }; + let sink_col_expr = |sink_col_idx: usize| -> Result { + derive_sink_to_table_expr(sink_schema, sink_col_idx, table_column.data_type()) + }; - if data_type != sink_col_type { - bail!( - "column type mismatch: {:?} vs {:?}", - data_type, - sink_col_type - ); + // If users specified the columns to be inserted e.g. `CREATE SINK s INTO t(a, b)`, the expressions of `Project` will be generated accordingly. + // The missing columns will be filled with default value (`null` if not explicitly defined). + // Otherwise, e.g. `CREATE SINK s INTO t`, the columns will be matched by their order in `select` query and the target table. 
+ #[allow(clippy::collapsible_else_if)] + if user_specified_columns { + if let Some(idx) = sink_visible_col_idxes_by_name.get(table_column.name()) { + exprs.push(sink_col_expr(*idx)?); } else { - exprs.push(ExprImpl::InputRef(Box::new(InputRef::new( - sink_col_idx, - data_type.clone(), - )))); + exprs.push(default_col_expr()); } } else { - let data = match table_column - .column_desc - .generated_or_default_column - .as_ref() - { - // default column with default value - Some(GeneratedOrDefaultColumn::DefaultColumn(default_column)) => { - Datum::from_protobuf(default_column.get_snapshot_value().unwrap(), data_type) - .unwrap() - } - // default column with no default value - None => None, - - // generated column is unreachable - _ => unreachable!(), + if idx < sink_visible_col_idxes.len() { + exprs.push(sink_col_expr(sink_visible_col_idxes[idx])?); + } else { + exprs.push(default_col_expr()); }; - - exprs.push(ExprImpl::Literal(Box::new(Literal::new( - data, - data_type.clone(), - )))); - }; + } } Ok(exprs) } diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index e0bd1eb225e07..9d9646ac3a2ce 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -179,6 +179,7 @@ impl StreamSink { .into_stream() .expect("input should be stream plan") .clone_with_new_plan_id(); + Self { base, input, diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs index 3f437d6d60347..70f0859e95c20 100644 --- a/src/sqlparser/src/ast/statement.rs +++ b/src/sqlparser/src/ast/statement.rs @@ -476,7 +476,6 @@ impl fmt::Display for CreateSink { } } } - // sql_grammar!(CreateSinkStatement { // if_not_exists => [Keyword::IF, Keyword::NOT, Keyword::EXISTS], // sink_name: Ident, @@ -521,7 +520,7 @@ impl ParseTo for CreateSinkStatement { p.expected("FROM or AS after CREATE SINK sink_name", p.peek_token())? 
}; - let emit_mode = p.parse_emit_mode()?; + let emit_mode: Option = p.parse_emit_mode()?; // This check cannot be put into the `WithProperties::parse_to`, since other // statements may not need the with properties.