diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index 4ab78caf1c9c5..9c2a53defef34 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -291,7 +291,7 @@ build_exceptions! { ConstraintError(1133), } -// Sequence Errors [1124-1126, 3101] +// Sequence Errors [1124-1126, 3101-3102] build_exceptions! { /// Out of sequence range OutofSequenceRange(1124), @@ -301,6 +301,8 @@ build_exceptions! { UnknownSequence(1126), /// Sequence error SequenceError(3101), + /// AutoIncrement error + AutoIncrementError(3102), } // Virtual Column Errors [1128-1129] diff --git a/src/meta/api/src/auto_increment_api.rs b/src/meta/api/src/auto_increment_api.rs new file mode 100644 index 0000000000000..b21aee61dd2b7 --- /dev/null +++ b/src/meta/api/src/auto_increment_api.rs @@ -0,0 +1,27 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; + +use crate::errors::AutoIncrementError; +use crate::meta_txn_error::MetaTxnError; + +#[async_trait::async_trait] +pub trait AutoIncrementApi: Send + Sync { + async fn get_auto_increment_next_value( + &self, + req: GetAutoIncrementNextValueReq, + ) -> Result, MetaTxnError>; +} diff --git a/src/meta/api/src/auto_increment_api_test_suite.rs b/src/meta/api/src/auto_increment_api_test_suite.rs new file mode 100644 index 0000000000000..841e8d387a4c3 --- /dev/null +++ b/src/meta/api/src/auto_increment_api_test_suite.rs @@ -0,0 +1,258 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use chrono::Utc; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::AutoIncrementExpr; +use databend_common_expression::TableDataType; +use databend_common_expression::TableField; +use databend_common_expression::TableSchema; +use databend_common_meta_app::principal::AutoIncrementKey; +use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; +use databend_common_meta_app::schema::AutoIncrementStorageIdent; +use databend_common_meta_app::schema::CreateDatabaseReq; +use databend_common_meta_app::schema::CreateOption; +use databend_common_meta_app::schema::CreateTableReq; +use databend_common_meta_app::schema::DatabaseMeta; +use databend_common_meta_app::schema::GcDroppedTableReq; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; +use databend_common_meta_app::schema::ListDroppedTableReq; +use databend_common_meta_app::schema::TableMeta; +use databend_common_meta_app::schema::TableNameIdent; +use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_kvapi::kvapi; +use databend_common_meta_kvapi::kvapi::Key; +use databend_common_meta_kvapi::kvapi::KvApiExt; +use databend_common_meta_types::MetaError; +use fastrace::func_name; +use log::info; + +use crate::kv_pb_api::KVPbApi; +use crate::AutoIncrementApi; +use crate::DatabaseApi; +use crate::DatamaskApi; +use crate::GarbageCollectionApi; +use crate::RowAccessPolicyApi; +use crate::SchemaApi; +use crate::TableApi; + +/// Test suite of `AutoIncrementApi`. +/// +/// It is not used by this crate, but is used by other crate that impl `AutoIncrementApi`, +/// to ensure an impl works as expected, +/// such as `meta/embedded` and `metasrv`. +#[derive(Copy, Clone)] +pub struct AutoIncrementApiTestSuite {} + +impl AutoIncrementApiTestSuite { + /// Test AutoIncrementApi on a single node + pub async fn test_single_node(b: B) -> anyhow::Result<()> + where + B: kvapi::ApiBuilder, + MT: kvapi::KVApi + + SchemaApi + + DatamaskApi + + AutoIncrementApi + + RowAccessPolicyApi + + 'static, + { + let suite = AutoIncrementApiTestSuite {}; + suite.table_commit_table_meta(&b.build().await).await?; + Ok(()) + } + + #[fastrace::trace] + async fn table_commit_table_meta>( + &self, + mt: &MT, + ) -> anyhow::Result<()> { + let tenant_name = "table_commit_table_meta_tenant"; + let db_name = "db1"; + let tbl_name = "tb2"; + + info!("--- prepare db"); + let mut util = Util::new(mt, tenant_name, db_name, ""); + util.create_db().await?; + + let schema = || { + Arc::new(TableSchema::new(vec![TableField::new( + "number", + TableDataType::Number(NumberDataType::UInt64), + ) + .with_auto_increment_expr(Some(AutoIncrementExpr { + column_id: 0, + start: 0, + step: 1, + is_ordered: true, + }))])) + }; + let options = || maplit::btreemap! {"opt‐1".into() => "val-1".into()}; + + let drop_table_meta = |created_on| TableMeta { + schema: schema(), + engine: "JSON".to_string(), + options: options(), + created_on, + drop_on: Some(created_on), + ..TableMeta::default() + }; + let created_on = Utc::now(); + + // verify the auto increment will be vacuum + { + // use a new tenant and db do test + let db_name = "orphan_db"; + let tenant_name = "orphan_tenant"; + let mut orphan_util = Util::new(mt, tenant_name, db_name, ""); + orphan_util.create_db().await?; + let tenant = orphan_util.tenant(); + + let create_table_req = CreateTableReq { + create_option: CreateOption::CreateOrReplace, + catalog_name: Some("default".to_string()), + name_ident: TableNameIdent { + tenant: Tenant::new_or_err(tenant_name, func_name!())?, + db_name: db_name.to_string(), + table_name: tbl_name.to_string(), + }, + table_meta: drop_table_meta(created_on), + as_dropped: true, + table_properties: None, + table_partition: None, + }; + + let create_table_as_dropped_resp = mt.create_table(create_table_req.clone()).await?; + + let auto_increment_key = + AutoIncrementKey::new(create_table_as_dropped_resp.table_id, 0); + let auto_increment_sequence_storage = + AutoIncrementStorageIdent::new_generic(&tenant, auto_increment_key.clone()); + + // assert auto increment exists + let seqv = mt + .get_kv(&auto_increment_sequence_storage.to_string_key()) + .await?; + assert!(seqv.is_some() && seqv.unwrap().seq != 0); + + // auto increment next val + let expr = AutoIncrementExpr { + column_id: 0, + start: 0, + step: 1, + is_ordered: true, + }; + let next_val_req = GetAutoIncrementNextValueReq { + tenant: tenant.clone(), + expr, + key: auto_increment_key, + count: 1, + }; + mt.get_auto_increment_next_value(next_val_req) + .await? + .unwrap(); + + // assert auto increment current is 1 after next val + let seqv = mt.get_pb(&auto_increment_sequence_storage).await?; + assert!(seqv.as_ref().unwrap().seq != 0); + assert_eq!(seqv.as_ref().unwrap().data.inner().0, 1); + + // assert auto increment exists + let seqv = mt + .get_kv(&auto_increment_sequence_storage.to_string_key()) + .await?; + assert!(seqv.is_some() && seqv.unwrap().seq != 0); + + // vacuum drop table + let req = ListDroppedTableReq::new(&tenant); + let resp = mt.get_drop_table_infos(req).await?; + assert!(!resp.drop_ids.is_empty()); + + let req = GcDroppedTableReq { + tenant: Tenant::new_or_err(tenant_name, func_name!())?, + catalog: "default".to_string(), + drop_ids: resp.drop_ids.clone(), + }; + mt.gc_drop_tables(req).await?; + + // assert auto increment has been vacuum + let seqv = mt + .get_kv(&auto_increment_sequence_storage.to_string_key()) + .await?; + assert!(seqv.is_none()); + } + + Ok(()) + } +} + +struct Util<'a, MT> +// where MT: AutoIncrementApi +where MT: kvapi::KVApi + AutoIncrementApi +{ + tenant: Tenant, + db_name: String, + engine: String, + db_id: u64, + mt: &'a MT, +} + +impl<'a, MT> Util<'a, MT> +where MT: AutoIncrementApi + kvapi::KVApi +{ + fn new( + mt: &'a MT, + tenant: impl ToString, + db_name: impl ToString, + engine: impl ToString, + ) -> Self { + Self { + tenant: Tenant::new_or_err(tenant, func_name!()).unwrap(), + db_name: db_name.to_string(), + engine: engine.to_string(), + db_id: 0, + mt, + } + } + + fn tenant(&self) -> Tenant { + self.tenant.clone() + } + + fn engine(&self) -> String { + self.engine.clone() + } + + fn db_name(&self) -> String { + self.db_name.clone() + } + + async fn create_db(&mut self) -> anyhow::Result<()> { + let plan = CreateDatabaseReq { + create_option: CreateOption::Create, + catalog_name: None, + name_ident: DatabaseNameIdent::new(self.tenant(), self.db_name()), + meta: DatabaseMeta { + engine: self.engine(), + ..DatabaseMeta::default() + }, + }; + + let res = self.mt.create_database(plan).await?; + self.db_id = *res.db_id; + + Ok(()) + } +} diff --git a/src/meta/api/src/auto_increment_impl.rs b/src/meta/api/src/auto_increment_impl.rs new file mode 100644 index 0000000000000..95f280c882315 --- /dev/null +++ b/src/meta/api/src/auto_increment_impl.rs @@ -0,0 +1,55 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; +use databend_common_meta_kvapi::kvapi; +use databend_common_meta_types::MetaError; +use fastrace::func_name; +use log::debug; + +use crate::auto_increment_api::AutoIncrementApi; +use crate::auto_increment_nextval_impl::NextVal; +use crate::errors::AutoIncrementError; +use crate::meta_txn_error::MetaTxnError; + +#[async_trait::async_trait] +#[tonic::async_trait] +impl + ?Sized> AutoIncrementApi for KV { + async fn get_auto_increment_next_value( + &self, + req: GetAutoIncrementNextValueReq, + ) -> Result, MetaTxnError> { + debug!(req :? =(&req); "AutoIncrementApi: {}", func_name!()); + + let next_val = NextVal { + kv_api: self, + key: req.key.clone(), + expr: req.expr.clone(), + }; + + let (start, end) = match next_val.next_val(&req.tenant, req.count).await? { + Ok(resp) => (resp.before, resp.after), + Err(err) => { + return Ok(Err(err)); + } + }; + + Ok(Ok(GetAutoIncrementNextValueReply { + start, + step: req.expr.step, + end, + })) + } +} diff --git a/src/meta/api/src/auto_increment_nextval_impl.rs b/src/meta/api/src/auto_increment_nextval_impl.rs new file mode 100644 index 0000000000000..4d61e92d31a16 --- /dev/null +++ b/src/meta/api/src/auto_increment_nextval_impl.rs @@ -0,0 +1,87 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_expression::AutoIncrementExpr; +use databend_common_meta_app::principal::AutoIncrementKey; +use databend_common_meta_app::schema::AutoIncrementStorageIdent; +use databend_common_meta_app::tenant::ToTenant; +use databend_common_meta_kvapi::kvapi; +use databend_common_meta_kvapi::kvapi::Key; +use databend_common_meta_types::anyerror::func_name; +use databend_common_meta_types::protobuf::FetchAddU64Response; +use databend_common_meta_types::MetaError; +use databend_common_meta_types::TxnOp; +use databend_common_meta_types::TxnRequest; +use log::debug; + +use crate::errors::AutoIncrementError; +use crate::meta_txn_error::MetaTxnError; +use crate::send_txn; + +/// The implementation of `next_val` for sequence number. +pub(crate) struct NextVal<'a, KV> +where KV: kvapi::KVApi + ?Sized +{ + pub(crate) kv_api: &'a KV, + pub(crate) key: AutoIncrementKey, + pub(crate) expr: AutoIncrementExpr, +} + +impl<'a, KV> NextVal<'a, KV> +where KV: kvapi::KVApi + ?Sized +{ + /// AutoIncrement number stores the value in standalone key that support `FetchAddU64`. + pub(crate) async fn next_val( + self, + tenant: impl ToTenant, + count: u64, + ) -> Result, MetaTxnError> { + debug!("{}", func_name!()); + + // Key for the sequence number value. + let storage_ident = AutoIncrementStorageIdent::new_generic(tenant, self.key.clone()); + let storage_key = storage_ident.to_string_key(); + + let delta = count * self.expr.step as u64; + + let txn = TxnRequest::new(vec![], vec![TxnOp::fetch_add_u64( + &storage_key, + delta as i64, + )]); + + let (succ, responses) = send_txn(self.kv_api, txn).await?; + + debug!( + "{} txn result: succ: {succ}, ident: {}, update seq by {delta}", + func_name!(), + self.key, + ); + debug_assert!(succ); + + let resp = responses[0].try_as_fetch_add_u64().unwrap(); + let got_delta = resp.delta(); + + if got_delta < delta { + return Ok(Err(AutoIncrementError::OutOfAutoIncrementRange { + key: self.key, + context: format!( + "{}: count: {count}; expected delta: {delta}, but got: {resp}", + self.expr.step, + ), + })); + } + + Ok(Ok(resp.clone())) + } +} diff --git a/src/meta/api/src/errors.rs b/src/meta/api/src/errors.rs index b260e8e1c1efc..6fab53337bf28 100644 --- a/src/meta/api/src/errors.rs +++ b/src/meta/api/src/errors.rs @@ -13,6 +13,7 @@ // limitations under the License. use databend_common_exception::ErrorCode; +use databend_common_meta_app::principal::AutoIncrementKey; /// Table logic error, unrelated to the backend service providing Table management, or dependent component. #[derive(Clone, Debug, thiserror::Error)] @@ -37,3 +38,21 @@ impl From for ErrorCode { } } } + +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +pub enum AutoIncrementError { + #[error("OutOfAutoIncrementRange: `{key}` while `{context}`")] + OutOfAutoIncrementRange { + key: AutoIncrementKey, + context: String, + }, +} + +impl From for ErrorCode { + fn from(value: AutoIncrementError) -> Self { + let s = value.to_string(); + match value { + AutoIncrementError::OutOfAutoIncrementRange { .. } => ErrorCode::AutoIncrementError(s), + } + } +} diff --git a/src/meta/api/src/garbage_collection_api.rs b/src/meta/api/src/garbage_collection_api.rs index c46b244780a91..ce640e2b32196 100644 --- a/src/meta/api/src/garbage_collection_api.rs +++ b/src/meta/api/src/garbage_collection_api.rs @@ -21,11 +21,13 @@ use databend_common_base::vec_ext::VecExt; use databend_common_meta_app::app_error::AppError; use databend_common_meta_app::app_error::CleanDbIdTableNamesFailed; use databend_common_meta_app::app_error::MarkDatabaseMetaAsGCInProgressFailed; +use databend_common_meta_app::principal::AutoIncrementKey; use databend_common_meta_app::principal::OwnershipObject; use databend_common_meta_app::principal::TenantOwnershipObjectIdent; use databend_common_meta_app::schema::index_id_ident::IndexIdIdent; use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent; use databend_common_meta_app::schema::table_niv::TableNIV; +use databend_common_meta_app::schema::AutoIncrementStorageIdent; use databend_common_meta_app::schema::DBIdTableName; use databend_common_meta_app::schema::DatabaseId; use databend_common_meta_app::schema::DatabaseIdHistoryIdent; @@ -681,6 +683,20 @@ async fn remove_data_for_dropped_table( txn_delete_exact(txn, &id_to_name, seq_name.seq); } + // Remove table auto increment sequences + { + // clear the sequence associated with auto increment in the table field + let auto_increment_key = AutoIncrementKey::new(table_id.table_id, 0); + let dir_name = DirName::new(AutoIncrementStorageIdent::new_generic( + tenant, + auto_increment_key, + )); + let mut auto_increments = kv_api.list_pb_keys(&dir_name).await?; + + while let Some(auto_increment_ident) = auto_increments.try_next().await? { + txn.if_then.push(txn_op_del(&auto_increment_ident)); + } + } // Remove table ownership { let table_ownership = OwnershipObject::Table { diff --git a/src/meta/api/src/lib.rs b/src/meta/api/src/lib.rs index 233678ef11a45..b1a5c810354fe 100644 --- a/src/meta/api/src/lib.rs +++ b/src/meta/api/src/lib.rs @@ -51,6 +51,10 @@ pub mod txn_op_builder_util; pub mod txn_retry_util; pub mod util; +mod auto_increment_api; +mod auto_increment_api_test_suite; +mod auto_increment_impl; +pub(crate) mod auto_increment_nextval_impl; pub mod crud; mod errors; mod row_access_policy_api; @@ -58,6 +62,8 @@ mod row_access_policy_api_impl; mod sequence_api_impl; pub(crate) mod sequence_nextval_impl; +pub use auto_increment_api::AutoIncrementApi; +pub use auto_increment_api_test_suite::AutoIncrementApiTestSuite; pub use catalog_api::CatalogApi; pub use data_mask_api::DatamaskApi; // Re-export from new data_retention_util module for backward compatibility diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index c0df8cf4ff25e..081b7c9b7545e 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -5701,11 +5701,12 @@ impl SchemaApiTestSuite { mt.drop_sequence(req).await?; + let sequence_ident = SequenceIdent::new(&tenant, sequence_name); let req = SequenceIdent::new(&tenant, sequence_name); let resp = mt.get_sequence(&req).await?; assert!(resp.is_none()); - let storage_ident = SequenceStorageIdent::new_from(req); + let storage_ident = SequenceStorageIdent::new_from(sequence_ident); let got = mt.get_kv(&storage_ident.to_string_key()).await?; assert!( got.is_none(), diff --git a/src/meta/api/src/table_api.rs b/src/meta/api/src/table_api.rs index 7064a2a6f862d..25cabaf845f7a 100644 --- a/src/meta/api/src/table_api.rs +++ b/src/meta/api/src/table_api.rs @@ -45,9 +45,13 @@ use databend_common_meta_app::app_error::ViewAlreadyExists; use databend_common_meta_app::app_error::VirtualColumnIdOutBound; use databend_common_meta_app::app_error::VirtualColumnTooMany; use databend_common_meta_app::id_generator::IdGenerator; +use databend_common_meta_app::primitive::Id; +use databend_common_meta_app::principal::AutoIncrementKey; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent; use databend_common_meta_app::schema::table_niv::TableNIV; +use databend_common_meta_app::schema::AutoIncrementStorageIdent; +use databend_common_meta_app::schema::AutoIncrementStorageValue; use databend_common_meta_app::schema::CommitTableMetaReply; use databend_common_meta_app::schema::CommitTableMetaReq; use databend_common_meta_app::schema::CreateOption; @@ -142,6 +146,7 @@ use crate::txn_op_builder_util::txn_op_put_pb; use crate::txn_op_del; use crate::txn_op_get; use crate::txn_op_put; +use crate::txn_put_pb; use crate::util::IdempotentKVTxnResponse; use crate::util::IdempotentKVTxnSender; use crate::DEFAULT_MGET_SIZE; @@ -444,6 +449,21 @@ where .push(txn_op_put(&key_dbid_tbname, serialize_u64(table_id)?)) } + for table_field in req.table_meta.schema.fields() { + let Some(auto_increment_expr) = table_field.auto_increment_expr() else { + continue; + }; + + let auto_increment_key = + AutoIncrementKey::new(table_id, table_field.column_id()); + let storage_ident = + AutoIncrementStorageIdent::new_generic(req.tenant(), auto_increment_key); + let storage_value = + Id::new_typed(AutoIncrementStorageValue(auto_increment_expr.start)); + txn.if_then + .extend(vec![txn_put_pb(&storage_ident, &storage_value)?]); + } + let (succ, responses) = send_txn(self, txn).await?; debug!( diff --git a/src/meta/app/src/principal/auto_increment.rs b/src/meta/app/src/principal/auto_increment.rs new file mode 100644 index 0000000000000..6a50de57a976c --- /dev/null +++ b/src/meta/app/src/principal/auto_increment.rs @@ -0,0 +1,65 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; + +use databend_common_expression::ColumnId; +use databend_common_meta_types::MetaId; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] +pub struct AutoIncrementKey { + pub table_id: MetaId, + pub column_id: ColumnId, +} + +impl AutoIncrementKey { + pub fn new(table_id: MetaId, column_id: ColumnId) -> Self { + Self { + table_id, + column_id, + } + } +} + +impl Display for AutoIncrementKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "AutoIncrement({}/{})", self.table_id, self.column_id) + } +} + +mod kvapi_key_impl { + use databend_common_meta_kvapi::kvapi; + use databend_common_meta_kvapi::kvapi::KeyBuilder; + use databend_common_meta_kvapi::kvapi::KeyError; + use databend_common_meta_kvapi::kvapi::KeyParser; + + use crate::principal::auto_increment::AutoIncrementKey; + + impl kvapi::KeyCodec for AutoIncrementKey { + fn encode_key(&self, b: KeyBuilder) -> KeyBuilder { + b.push_u64(self.table_id).push_u64(self.column_id as u64) + } + + fn decode_key(parser: &mut KeyParser) -> Result + where Self: Sized { + let table_id = parser.next_u64()?; + let column_id = parser.next_u64()?; + + Ok(Self { + table_id, + column_id: column_id as u32, + }) + } + } +} diff --git a/src/meta/app/src/principal/mod.rs b/src/meta/app/src/principal/mod.rs index 08a2cef1a58dc..6d6c4c5083c0a 100644 --- a/src/meta/app/src/principal/mod.rs +++ b/src/meta/app/src/principal/mod.rs @@ -37,6 +37,7 @@ mod user_stage; mod ownership_object; +mod auto_increment; pub mod client_session; pub mod client_session_ident; pub mod connection_ident; @@ -59,6 +60,7 @@ pub mod user_stage_ident; pub mod user_token; pub mod user_token_ident; +pub use auto_increment::AutoIncrementKey; pub use connection::*; pub use file_format::*; pub use network_policy::NetworkPolicy; diff --git a/src/meta/app/src/schema/auto_increment.rs b/src/meta/app/src/schema/auto_increment.rs new file mode 100644 index 0000000000000..b08ae1c1171e6 --- /dev/null +++ b/src/meta/app/src/schema/auto_increment.rs @@ -0,0 +1,42 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_expression::AutoIncrementExpr; + +use crate::principal::AutoIncrementKey; +use crate::tenant::Tenant; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct GetAutoIncrementNextValueReq { + pub tenant: Tenant, + // Information describing the AutoIncrement + pub expr: AutoIncrementExpr, + // AutoIncrement unique key information, including table id and column id + pub key: AutoIncrementKey, + // get the number of steps at one time + pub count: u64, +} + +/// The collection of auto increment value in range `[start, end)`, e.g.: +/// `start + i * step` where `start + i + step < end`, +/// or `[start + 0 * step, start + 1 * step, ...)`. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct GetAutoIncrementNextValueReply { + /// The first value in the auto increment, inclusive. + pub start: u64, + // step has no use until now + pub step: i64, + /// The right bound, exclusive. + pub end: u64, +} diff --git a/src/meta/app/src/schema/auto_increment_storage.rs b/src/meta/app/src/schema/auto_increment_storage.rs new file mode 100644 index 0000000000000..42a608fd66eae --- /dev/null +++ b/src/meta/app/src/schema/auto_increment_storage.rs @@ -0,0 +1,85 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub use kvapi_impl::AutoIncrementStorageRsc; +pub use kvapi_impl::AutoIncrementStorageValue; + +use crate::principal::AutoIncrementKey; +use crate::tenant_key::ident::TIdent; + +/// Defines the meta-service key for sequence. +pub type AutoIncrementStorageIdent = TIdent; + +mod kvapi_impl { + + use databend_common_meta_kvapi::kvapi; + + use crate::primitive::Id; + use crate::tenant_key::resource::TenantResource; + + /// Defines the storage for autoincrement generator. + /// + /// [`AutoIncrementStorageRsc`] like ['SequenceStorageRsc'] just stores the metadata of autoincrement but not the counter. + /// This key stores the counter for the autoincrement name. + pub struct AutoIncrementStorageRsc; + impl TenantResource for AutoIncrementStorageRsc { + const PREFIX: &'static str = "__fd_autoincrement_storage"; + const HAS_TENANT: bool = true; + type ValueType = Id; + } + + impl kvapi::Value for Id { + type KeyType = super::AutoIncrementStorageIdent; + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } + + #[derive( + Debug, + Clone, + Copy, + Default, + PartialEq, + Eq, + derive_more::From, + derive_more::Deref, + derive_more::DerefMut, + )] + pub struct AutoIncrementStorageValue(pub u64); +} + +#[cfg(test)] +mod tests { + use databend_common_meta_kvapi::kvapi::Key; + + use crate::principal::AutoIncrementKey; + use crate::schema::auto_increment_storage::AutoIncrementStorageIdent; + use crate::tenant::Tenant; + + #[test] + fn test_auto_increment_storage_ident() { + let tenant = Tenant::new_literal("dummy"); + let key = AutoIncrementKey::new(0, 3); + let ident = AutoIncrementStorageIdent::new_generic(tenant, key); + + let key = ident.to_string_key(); + assert_eq!(key, "__fd_autoincrement_storage/dummy/0/3"); + + assert_eq!( + ident, + AutoIncrementStorageIdent::from_str_key(&key).unwrap() + ); + } +} diff --git a/src/meta/app/src/schema/mod.rs b/src/meta/app/src/schema/mod.rs index 7f5b95ca16506..8d19cbeb70d1a 100644 --- a/src/meta/app/src/schema/mod.rs +++ b/src/meta/app/src/schema/mod.rs @@ -14,39 +14,45 @@ //! Schema types +mod auto_increment; +mod auto_increment_storage; pub mod catalog; pub mod catalog_id_ident; pub mod catalog_id_to_name_ident; pub mod catalog_name_ident; +mod constraint; +mod create_option; +mod database; pub mod database_id; pub mod database_id_history_ident; pub mod database_name_ident; +mod dictionary; pub mod dictionary_id_ident; +mod dictionary_identity; pub mod dictionary_name_ident; +mod index; pub mod index_id_ident; pub mod index_id_to_name_ident; pub mod index_name_ident; +mod least_visible_time; pub mod least_visible_time_ident; +mod lock; pub mod marked_deleted_index_id; pub mod marked_deleted_index_ident; pub mod marked_deleted_table_index_id; pub mod marked_deleted_table_index_ident; -pub mod sequence_storage; -pub mod table_lock_ident; -pub mod table_niv; - -mod constraint; -mod create_option; -mod database; -mod dictionary; -mod dictionary_identity; -mod index; -mod least_visible_time; -mod lock; mod ownership; mod sequence; +pub mod sequence_storage; mod table; +pub mod table_lock_ident; +pub mod table_niv; +pub use auto_increment::GetAutoIncrementNextValueReply; +pub use auto_increment::GetAutoIncrementNextValueReq; +pub use auto_increment_storage::AutoIncrementStorageIdent; +pub use auto_increment_storage::AutoIncrementStorageRsc; +pub use auto_increment_storage::AutoIncrementStorageValue; pub use catalog::*; pub use catalog_id_ident::CatalogIdIdent; pub use catalog_id_to_name_ident::CatalogIdToNameIdent; diff --git a/src/meta/client/src/lib.rs b/src/meta/client/src/lib.rs index 2c35bc733489a..f5d0d68383232 100644 --- a/src/meta/client/src/lib.rs +++ b/src/meta/client/src/lib.rs @@ -168,7 +168,7 @@ use semver::Version; // Version: v1.2.257-nightly-188426e3e6-simd(1.75.0-nightly-2023-12-17T22:09:06.675156000Z) // ``` // Skip 1.2.258 use the next 1.2.259 -pub static MIN_METASRV_SEMVER: Version = Version::new(1, 2, 677); +pub static MIN_METASRV_SEMVER: Version = Version::new(1, 2, 768); pub fn to_digit_ver(v: &Version) -> u64 { v.major * 1_000_000 + v.minor * 1_000 + v.patch diff --git a/src/meta/proto-conv/src/schema_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/schema_from_to_protobuf_impl.rs index 12d7cd3861002..6d320b90b0179 100644 --- a/src/meta/proto-conv/src/schema_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/schema_from_to_protobuf_impl.rs @@ -78,6 +78,10 @@ impl FromToProto for ex::TableField { Some(computed_expr) => Some(ex::ComputedExpr::from_pb(computed_expr)?), None => None, }; + let auto_increment_expr = match p.auto_increment_expr { + Some(auto_increment_expr) => Some(ex::AutoIncrementExpr::from_pb(auto_increment_expr)?), + None => None, + }; let v = ex::TableField::new_from_column_id( &p.name, @@ -87,7 +91,8 @@ impl FromToProto for ex::TableField { p.column_id, ) .with_default_expr(p.default_expr) - .with_computed_expr(computed_expr); + .with_computed_expr(computed_expr) + .with_auto_increment_expr(auto_increment_expr); Ok(v) } @@ -96,6 +101,10 @@ impl FromToProto for ex::TableField { Some(computed_expr) => Some(computed_expr.to_pb()?), None => None, }; + let auto_increment_expr = match self.auto_increment_expr() { + Some(auto_increment_expr) => Some(auto_increment_expr.to_pb()?), + None => None, + }; let p = pb::DataField { ver: VER, min_reader_ver: MIN_READER_VER, @@ -104,6 +113,7 @@ impl FromToProto for ex::TableField { data_type: Some(self.data_type().to_pb()?), column_id: self.column_id(), computed_expr, + auto_increment_expr, }; Ok(p) } @@ -147,6 +157,37 @@ impl FromToProto for ex::ComputedExpr { } } +impl FromToProto for ex::AutoIncrementExpr { + type PB = pb::AutoIncrementExpr; + + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + + fn from_pb(p: Self::PB) -> Result + where Self: Sized { + reader_check_msg(p.ver, p.min_reader_ver)?; + + Ok(ex::AutoIncrementExpr { + column_id: p.column_id, + start: p.start, + step: p.step, + is_ordered: p.is_ordered, + }) + } + + fn to_pb(&self) -> Result { + Ok(pb::AutoIncrementExpr { + ver: VER, + min_reader_ver: MIN_READER_VER, + start: self.start, + step: self.step, + column_id: self.column_id, + is_ordered: self.is_ordered, + }) + } +} + impl FromToProto for ex::TableDataType { type PB = pb::DataType; fn get_pb_ver(p: &Self::PB) -> u64 { diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index f39de2c54d022..9d88d05d368c2 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -178,6 +178,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (146, "2025-09-01: Add: add Constraint on TableMeta"), (147, "2025-09-17: Add: Grant/OwnershipProcedureObject and UserPrivilegeType AccessProcedure, AccessProcedure"), (148, "2025-09-22: Add: virtual_data_type add Decimal, Binary, Date, Timestamp, Interval"), + (149, "2025-09-24: Add: add AutoIncrement name and display on TableField"), // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index 16d2385c987d1..3206157300076 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -140,3 +140,4 @@ mod v145_opaque_data_type; mod v146_constraint; mod v147_grant_object_procedure; mod v148_virtual_schema; +mod v149_field_auto_increment; diff --git a/src/meta/proto-conv/tests/it/v149_field_auto_increment.rs b/src/meta/proto-conv/tests/it/v149_field_auto_increment.rs new file mode 100644 index 0000000000000..170437eaac4ef --- /dev/null +++ b/src/meta/proto-conv/tests/it/v149_field_auto_increment.rs @@ -0,0 +1,65 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_expression as ce; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::AutoIncrementExpr; +use databend_common_expression::TableDataType; +use fastrace::func_name; + +use crate::common; + +// These bytes are built when a new version in introduced, +// and are kept for backward compatibility test. +// +// ************************************************************* +// * These messages should never be updated, * +// * only be added when a new version is added, * +// * or be removed when an old version is no longer supported. * +// ************************************************************* +// +// The message bytes are built from the output of `test_pb_from_to()` +#[test] +fn test_decode_v149_field_auto_increment() -> anyhow::Result<()> { + let table_schema_v149 = vec![ + 10, 44, 10, 1, 97, 26, 19, 154, 2, 9, 42, 0, 160, 6, 149, 1, 168, 6, 24, 160, 6, 149, 1, + 168, 6, 24, 50, 11, 8, 1, 16, 2, 160, 6, 149, 1, 168, 6, 24, 160, 6, 149, 1, 168, 6, 24, + 10, 33, 10, 1, 98, 26, 19, 154, 2, 9, 42, 0, 160, 6, 149, 1, 168, 6, 24, 160, 6, 149, 1, + 168, 6, 24, 32, 1, 160, 6, 149, 1, 168, 6, 24, 24, 2, 160, 6, 149, 1, 168, 6, 24, + ]; + + let want = || { + let mut field_a = ce::TableField::new("a", TableDataType::Number(NumberDataType::Int8)) + .with_auto_increment_expr(Some(AutoIncrementExpr { + column_id: 0, + start: 1, + step: 2, + is_ordered: false, + })); + let mut field_b = ce::TableField::new("b", TableDataType::Number(NumberDataType::Int8)); + + field_a.column_id = 0; + field_b.column_id = 1; + + ce::TableSchema { + fields: vec![field_a, field_b], + metadata: Default::default(), + next_column_id: 2, + } + }; + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), table_schema_v149.as_slice(), 149, want())?; + + Ok(()) +} diff --git a/src/meta/protos/proto/metadata.proto b/src/meta/protos/proto/metadata.proto index ca6d9eac7deb9..a9a9dd75a1bbe 100644 --- a/src/meta/protos/proto/metadata.proto +++ b/src/meta/protos/proto/metadata.proto @@ -43,6 +43,18 @@ message ComputedExpr { } } +// AutoIncrement expression +message AutoIncrementExpr { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + uint64 start = 1; + int64 step = 2; + bool is_ordered = 3; + // Used to concatenate AutoIncrementKey in `DefaultExprBinder::parse_and_bind` + uint32 column_id = 4; +} + // One field, AKA column message DataField { uint64 ver = 100; @@ -61,4 +73,6 @@ message DataField { uint32 column_id = 4; optional ComputedExpr computed_expr = 5; + + optional AutoIncrementExpr auto_increment_expr = 6; } diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_schema_api.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_schema_api.rs index 103d61cf3cc1b..c6464b3f431f6 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_schema_api.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_schema_api.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use std::sync::Mutex; +use databend_common_meta_api::AutoIncrementApiTestSuite; use databend_common_meta_api::SchemaApiTestSuite; use test_harness::test; @@ -31,6 +32,7 @@ async fn test_meta_grpc_client_single() -> anyhow::Result<()> { }; SchemaApiTestSuite::test_single_node(builder.clone()).await?; + AutoIncrementApiTestSuite::test_single_node(builder).await?; Ok(()) } diff --git a/src/query/ast/src/ast/statements/table.rs b/src/query/ast/src/ast/statements/table.rs index 4e7f8fa8e8684..59574a8188c4b 100644 --- a/src/query/ast/src/ast/statements/table.rs +++ b/src/query/ast/src/ast/statements/table.rs @@ -938,6 +938,11 @@ pub enum ColumnExpr { Default(Box), Virtual(Box), Stored(Box), + AutoIncrement { + start: u64, + step: i64, + is_ordered: bool, + }, } impl Display for ColumnExpr { @@ -952,6 +957,18 @@ impl Display for ColumnExpr { ColumnExpr::Stored(expr) => { write!(f, " AS ({expr}) STORED")?; } + ColumnExpr::AutoIncrement { + start, + step, + is_ordered, + } => { + write!(f, " AUTOINCREMENT ({}, {}) ", start, step)?; + if *is_ordered { + write!(f, "ORDER")?; + } else { + write!(f, "NOORDER")?; + } + } } Ok(()) } diff --git a/src/query/ast/src/parser/expr.rs b/src/query/ast/src/parser/expr.rs index cd90ed666a3bc..83c50b96c731e 100644 --- a/src/query/ast/src/parser/expr.rs +++ b/src/query/ast/src/parser/expr.rs @@ -1680,6 +1680,24 @@ pub fn literal_u64(i: Input) -> IResult { )(i) } +#[allow(clippy::from_str_radix_10)] +pub fn literal_i64(i: Input) -> IResult { + let decimal = map_res( + rule! { + LiteralInteger + }, + |token| i64::from_str_radix(token.text(), 10).map_err(|e| nom::Err::Failure(e.into())), + ); + let hex = map_res(literal_hex_str, |lit| { + i64::from_str_radix(lit, 16).map_err(|e| nom::Err::Failure(e.into())) + }); + + rule!( + #decimal + | #hex + )(i) +} + pub fn literal_number(i: Input) -> IResult { let decimal_uint = map_res( rule! { diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 8c9ac47f279cb..602dc658c71ee 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -3253,12 +3253,32 @@ pub fn column_def(i: Input) -> IResult { VirtualExpr(Box), StoredExpr(Box), CheckExpr(Box), + AutoIncrement { + start: u64, + step: i64, + is_ordered: bool, + }, } let nullable = alt(( value(ColumnConstraint::Nullable(true), rule! { NULL }), value(ColumnConstraint::Nullable(false), rule! { NOT ~ ^NULL }), )); + let identity_parmas = alt(( + map( + rule! { + "(" ~ ^#literal_u64 ~ ^"," ~ ^#literal_i64 ~ ^")" + }, + |(_, start, _, step, _)| (start, step), + ), + map( + rule! { + START ~ ^#literal_u64 ~ ^INCREMENT ~ ^#literal_i64 + }, + |(_, start, _, step)| (start, step), + ), + )); + let expr = alt(( map( rule! { @@ -3284,6 +3304,25 @@ pub fn column_def(i: Input) -> IResult { }, |(_, _, expr, _)| ColumnConstraint::CheckExpr(Box::new(expr)), ), + map( + rule! { + (AUTOINCREMENT | IDENTITY) + ~ #identity_parmas? + ~ (ORDER | NOORDER)? + }, + |(_, params, order_token)| { + let (start, step) = params.unwrap_or((0, 1)); + let is_ordered = order_token + .map(|token| token.text().eq_ignore_ascii_case("order")) + .unwrap_or(true); + + ColumnConstraint::AutoIncrement { + start, + step, + is_ordered, + } + }, + ), )); let comment = map( @@ -3331,6 +3370,14 @@ pub fn column_def(i: Input) -> IResult { } } ColumnConstraint::DefaultExpr(default_expr) => { + if matches!(def.expr, Some(ColumnExpr::AutoIncrement { .. })) { + return Err(nom::Err::Error(Error::from_error_kind( + i, + ErrorKind::Other( + "DEFAULT and AUTO INCREMENT cannot exist at the same time", + ), + ))); + } def.expr = Some(ColumnExpr::Default(default_expr)) } ColumnConstraint::VirtualExpr(virtual_expr) => { @@ -3340,6 +3387,29 @@ pub fn column_def(i: Input) -> IResult { def.expr = Some(ColumnExpr::Stored(stored_expr)) } ColumnConstraint::CheckExpr(check) => def.check = Some(*check), + ColumnConstraint::AutoIncrement { + start, + step, + is_ordered, + } => { + if matches!(def.expr, Some(ColumnExpr::Default(_))) { + return Err(nom::Err::Error(Error::from_error_kind( + i, + ErrorKind::Other("DEFAULT and AUTOINCREMENT cannot exist at the same time"), + ))); + } + if !is_ordered { + return Err(nom::Err::Error(Error::from_error_kind( + i, + ErrorKind::Other("AUTOINCREMENT only support ORDER now"), + ))); + } + def.expr = Some(ColumnExpr::AutoIncrement { + start, + step, + is_ordered, + }) + } } } diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs index cadc8c159f560..f7d6267110638 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -355,6 +355,8 @@ pub enum TokenKind { ARGS, #[token("AUTO", ignore(ascii_case))] AUTO, + #[token("AUTOINCREMENT", ignore(ascii_case))] + AUTOINCREMENT, #[token("SOME", ignore(ascii_case))] SOME, #[token("ALTER", ignore(ascii_case))] @@ -725,6 +727,8 @@ pub enum TokenKind { IDENTIFIED, #[token("IDENTIFIER", ignore(ascii_case))] IDENTIFIER, + #[token("IDENTITY", ignore(ascii_case))] + IDENTITY, #[token("IF", ignore(ascii_case))] IF, #[token("IMMUTABLE", ignore(ascii_case))] @@ -884,6 +888,8 @@ pub enum TokenKind { NO_PASSWORD, #[token("NONE", ignore(ascii_case))] NONE, + #[token("NOORDER", ignore(ascii_case))] + NOORDER, #[token("NOSCAN", ignore(ascii_case))] NOSCAN, #[token("NOT", ignore(ascii_case))] diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index c13fb31166014..d6a09faa47217 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -191,6 +191,8 @@ fn test_statement() { r#"CREATE TABLE t(c1 varbinary, c2 binary(10));"#, r#"CREATE TABLE t(c1 int default 1);"#, r#"create table abc as (select * from xyz limit 10)"#, + r#"create table a.b (c integer autoincrement (10, 20) ORDER)"#, + r#"create table a.b (c integer identity START 10 INCREMENT 20)"#, r#"ALTER USER u1 IDENTIFIED BY '123456';"#, r#"ALTER USER u1 WITH disabled = false;"#, r#"ALTER USER u1 WITH default_role = role1;"#, @@ -1012,6 +1014,7 @@ fn test_statement_error() { r#"CREATE TABLE t(c1 NULLABLE(int) NOT NULL);"#, r#"create table a (c1 decimal(38), c2 int) partition by ();"#, r#"CREATE TABLE t(c1 int, c2 int) partition by (c1, c2) PROPERTIES ("read.split.target-size"='134217728', "read.split.metadata-target-size"=33554432);"#, + r#"create table a.b (c integer autoincrement (10, 20) NOORDER)"#, r#"drop table if a.b"#, r#"show table"#, r#"show column"#, diff --git a/src/query/ast/tests/it/testdata/stmt-error.txt b/src/query/ast/tests/it/testdata/stmt-error.txt index 0acdcd780d646..259419b1f153e 100644 --- a/src/query/ast/tests/it/testdata/stmt-error.txt +++ b/src/query/ast/tests/it/testdata/stmt-error.txt @@ -5,7 +5,7 @@ error: --> SQL:1:38 | 1 | create table a.b (c integer not null 1, b float(10)) - | ------ ^ unexpected `1`, expecting `)`, `NULL`, `NOT`, `DEFAULT`, `GENERATED`, `AS`, `CHECK`, `COMMENT`, or `,` + | ------ ^ unexpected `1`, expecting `)`, `NULL`, `NOT`, `DEFAULT`, `GENERATED`, `AS`, `CHECK`, `AUTOINCREMENT`, `IDENTITY`, `COMMENT`, or `,` | | | while parsing `CREATE [OR REPLACE] TABLE [IF NOT EXISTS] [.] [] []` @@ -27,7 +27,7 @@ error: --> SQL:1:24 | 1 | create table a (c float(10)) - | ------ ^ unexpected `(`, expecting `)`, `NULL`, `NOT`, `DEFAULT`, `GENERATED`, `AS`, `CHECK`, `COMMENT`, or `,` + | ------ ^ unexpected `(`, expecting `)`, `NULL`, `NOT`, `DEFAULT`, `GENERATED`, `AS`, `CHECK`, `AUTOINCREMENT`, `IDENTITY`, `COMMENT`, or `,` | | | while parsing `CREATE [OR REPLACE] TABLE [IF NOT EXISTS] [.]
[] []` @@ -111,6 +111,18 @@ error: | while parsing `CREATE [OR REPLACE] TABLE [IF NOT EXISTS] [.]
[] []` +---------- Input ---------- +create table a.b (c integer autoincrement (10, 20) NOORDER) +---------- Output --------- +error: + --> SQL:1:59 + | +1 | create table a.b (c integer autoincrement (10, 20) NOORDER) + | ------ ^ AUTOINCREMENT only support ORDER now + | | + | while parsing `CREATE [OR REPLACE] TABLE [IF NOT EXISTS] [.]
[] []` + + ---------- Input ---------- drop table if a.b ---------- Output --------- diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 83aed60f959e9..beed80283a78a 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -5746,6 +5746,142 @@ CreateTable( ) +---------- Input ---------- +create table a.b (c integer autoincrement (10, 20) ORDER) +---------- Output --------- +CREATE TABLE a.b (c Int32 AUTOINCREMENT (10, 20) ORDER) +---------- AST ------------ +CreateTable( + CreateTableStmt { + create_option: Create, + catalog: None, + database: Some( + Identifier { + span: Some( + 13..14, + ), + name: "a", + quote: None, + ident_type: None, + }, + ), + table: Identifier { + span: Some( + 15..16, + ), + name: "b", + quote: None, + ident_type: None, + }, + source: Some( + Columns { + columns: [ + ColumnDefinition { + name: Identifier { + span: Some( + 18..19, + ), + name: "c", + quote: None, + ident_type: None, + }, + data_type: Int32, + expr: Some( + AutoIncrement { + start: 10, + step: 20, + is_ordered: true, + }, + ), + check: None, + comment: None, + }, + ], + opt_table_indexes: None, + opt_column_constraints: None, + opt_table_constraints: None, + }, + ), + engine: None, + uri_location: None, + cluster_by: None, + table_options: {}, + iceberg_table_partition: None, + table_properties: None, + as_query: None, + table_type: Normal, + }, +) + + +---------- Input ---------- +create table a.b (c integer identity START 10 INCREMENT 20) +---------- Output --------- +CREATE TABLE a.b (c Int32 AUTOINCREMENT (10, 20) ORDER) +---------- AST ------------ +CreateTable( + CreateTableStmt { + create_option: Create, + catalog: None, + database: Some( + Identifier { + span: Some( + 13..14, + ), + name: "a", + quote: None, + ident_type: None, + }, + ), + table: Identifier { + span: Some( + 15..16, + ), + name: "b", + quote: None, + ident_type: None, + }, + source: Some( + Columns { + columns: [ + ColumnDefinition { + name: Identifier { + span: Some( + 18..19, + ), + name: "c", + quote: None, + ident_type: None, + }, + data_type: Int32, + expr: Some( + AutoIncrement { + start: 10, + step: 20, + is_ordered: true, + }, + ), + check: None, + comment: None, + }, + ], + opt_table_indexes: None, + opt_column_constraints: None, + opt_table_constraints: None, + }, + ), + engine: None, + uri_location: None, + cluster_by: None, + table_options: {}, + iceberg_table_partition: None, + table_properties: None, + as_query: None, + table_type: Normal, + }, +) + + ---------- Input ---------- ALTER USER u1 IDENTIFIED BY '123456'; ---------- Output --------- diff --git a/src/query/catalog/src/catalog/interface.rs b/src/query/catalog/src/catalog/interface.rs index dfe090d6c0ee5..d7009a55198a2 100644 --- a/src/query/catalog/src/catalog/interface.rs +++ b/src/query/catalog/src/catalog/interface.rs @@ -54,6 +54,8 @@ use databend_common_meta_app::schema::DropTableReply; use databend_common_meta_app::schema::DroppedId; use databend_common_meta_app::schema::ExtendLockRevReq; use databend_common_meta_app::schema::GcDroppedTableReq; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; @@ -564,6 +566,11 @@ pub trait Catalog: DynClone + Send + Sync + Debug { async fn drop_sequence(&self, req: DropSequenceReq) -> Result; + async fn get_autoincrement_next_value( + &self, + req: GetAutoIncrementNextValueReq, + ) -> Result; + fn set_session_state(&self, _state: SessionState) -> Arc { unimplemented!() } diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs index 4a07c7ad2a471..e32f247f7d86d 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs @@ -25,8 +25,10 @@ use databend_common_meta_api::get_u64_value; use databend_common_meta_api::kv_pb_api::KVPbApi; use databend_common_meta_api::send_txn; use databend_common_meta_api::txn_core_util::txn_replace_exact; +use databend_common_meta_app::principal::AutoIncrementKey; use databend_common_meta_app::principal::OwnershipObject; use databend_common_meta_app::principal::TenantOwnershipObjectIdent; +use databend_common_meta_app::schema::AutoIncrementStorageIdent; use databend_common_meta_app::schema::DBIdTableName; use databend_common_meta_app::schema::DatabaseId; use databend_common_meta_app::schema::TableInfo; @@ -551,6 +553,102 @@ async fn test_remove_files_in_batch_do_not_swallow_errors() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn test_vacuum_dropped_table_clean_autoincrement() -> Result<()> { + // 1. Prepare local meta service + let meta = new_local_meta().await; + let endpoints = meta.endpoints.clone(); + + // Modify config to use local meta store + let mut ee_setup = EESetup::new(); + let config = ee_setup.config_mut(); + config.meta.endpoints = endpoints.clone(); + + // 2. Setup test fixture by using local meta store + let fixture = TestFixture::setup_with_custom(ee_setup).await?; + + // Adjust retention period to 0, so that dropped tables will be vacuumed immediately + let session = fixture.default_session(); + session.get_settings().set_data_retention_time_in_days(0)?; + + // 3. Prepare test db and table + let ctx = fixture.new_query_ctx().await?; + let db_name = "test_vacuum_clean_ownership"; + let tbl_name = "t"; + fixture + .execute_command(format!("create database {db_name}").as_str()) + .await?; + fixture + .execute_command( + format!("create table {db_name}.{tbl_name} (a int autoincrement)").as_str(), + ) + .await?; + + // 4. Ensure that table auto increment sequence exist right after table is created + let tenant = ctx.get_tenant(); + let table = ctx + .get_default_catalog()? + .get_table(&tenant, db_name, tbl_name) + .await?; + + let auto_increment_key_0 = AutoIncrementKey::new(table.get_id(), 0); + let sequence_storage_ident_0 = + AutoIncrementStorageIdent::new_generic(&tenant, auto_increment_key_0); + + let v = meta.get_pb(&sequence_storage_ident_0).await?; + assert!(v.is_some()); + + // 5. Ensure that table auto increment sequence exist right after table is replace + fixture + .execute_command( + format!("create or replace table {db_name}.{tbl_name} (a int autoincrement, b int autoincrement)").as_str(), + ) + .await?; + let table = ctx + .get_default_catalog()? + .get_table(&tenant, db_name, tbl_name) + .await?; + + let auto_increment_key_1 = AutoIncrementKey::new(table.get_id(), 0); + let sequence_storage_ident_1 = + AutoIncrementStorageIdent::new_generic(&tenant, auto_increment_key_1); + let auto_increment_key_2 = AutoIncrementKey::new(table.get_id(), 1); + let sequence_storage_ident_2 = + AutoIncrementStorageIdent::new_generic(&tenant, auto_increment_key_2); + + let v = meta.get_pb(&sequence_storage_ident_0).await?; + assert!(v.is_some()); + let v = meta.get_pb(&sequence_storage_ident_1).await?; + assert!(v.is_some()); + let v = meta.get_pb(&sequence_storage_ident_2).await?; + assert!(v.is_some()); + + // 6. Ensure that column b auto increment sequence exist right after column b is drop + fixture + .execute_command(format!("alter table {db_name}.{tbl_name} drop column b").as_str()) + .await?; + let v = meta.get_pb(&sequence_storage_ident_2).await?; + assert!(v.is_some()); + + // 7. Drop test table + fixture + .execute_command(format!("drop table {db_name}.{tbl_name}").as_str()) + .await?; + + // 8. Vacuum dropped tables + fixture.execute_command("vacuum drop table").await?; + + // 9. Ensure that table auto increment sequence is cleaned up + let v = meta.get_pb(&sequence_storage_ident_0).await?; + assert!(v.is_none()); + let v = meta.get_pb(&sequence_storage_ident_1).await?; + assert!(v.is_none()); + let v = meta.get_pb(&sequence_storage_ident_2).await?; + assert!(v.is_none()); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn test_vacuum_dropped_table_clean_ownership() -> Result<()> { // 1. Prepare local meta service diff --git a/src/query/expression/src/schema.rs b/src/query/expression/src/schema.rs index 6f7ab4136e608..d10f11593e202 100644 --- a/src/query/expression/src/schema.rs +++ b/src/query/expression/src/schema.rs @@ -15,6 +15,7 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::HashSet; +use std::ops::Add; use std::sync::Arc; use std::sync::LazyLock; @@ -252,6 +253,27 @@ pub struct DataField { default_expr: Option, data_type: DataType, computed_expr: Option, + auto_increment_expr: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct AutoIncrementExpr { + pub column_id: ColumnId, + pub start: u64, + pub step: i64, + // Ensure that the generated sequence values are distributed strictly in the order of insertion time + pub is_ordered: bool, +} + +impl AutoIncrementExpr { + pub fn to_sql_string(&self) -> String { + let string = format!("AUTOINCREMENT ({}, {}) ", self.start, self.step); + if self.is_ordered { + string.add("ORDER") + } else { + string.add("NOORDER") + } + } } fn uninit_column_id() -> ColumnId { @@ -276,6 +298,7 @@ pub struct TableField { #[serde(default = "uninit_column_id")] pub column_id: ColumnId, pub computed_expr: Option, + pub auto_increment_expr: Option, } /// DataType with more information that is only available for table field, e.g, the @@ -1079,6 +1102,7 @@ impl DataField { default_expr: None, data_type, computed_expr: None, + auto_increment_expr: None, } } @@ -1088,6 +1112,7 @@ impl DataField { default_expr: None, data_type: data_type.wrap_nullable(), computed_expr: None, + auto_increment_expr: None, } } @@ -1102,6 +1127,14 @@ impl DataField { self } + pub fn with_auto_increment_expr( + mut self, + auto_increment_expr: Option, + ) -> Self { + self.auto_increment_expr = auto_increment_expr; + self + } + pub fn name(&self) -> &String { &self.name } @@ -1118,6 +1151,10 @@ impl DataField { self.computed_expr.as_ref() } + pub fn auto_increment_expr(&self) -> Option<&AutoIncrementExpr> { + self.auto_increment_expr.as_ref() + } + #[inline] pub fn is_nullable(&self) -> bool { self.data_type.is_nullable() @@ -1137,6 +1174,7 @@ impl TableField { data_type, column_id: 0, computed_expr: None, + auto_increment_expr: None, } } @@ -1147,6 +1185,7 @@ impl TableField { data_type, column_id, computed_expr: None, + auto_increment_expr: None, } } @@ -1193,6 +1232,7 @@ impl TableField { data_type: self.data_type.clone(), column_id, computed_expr: self.computed_expr.clone(), + auto_increment_expr: self.auto_increment_expr.clone(), } } @@ -1238,6 +1278,14 @@ impl TableField { self } + pub fn with_auto_increment_expr( + mut self, + auto_increment_expr: Option, + ) -> Self { + self.auto_increment_expr = auto_increment_expr; + self + } + pub fn name(&self) -> &String { &self.name } @@ -1254,6 +1302,10 @@ impl TableField { self.computed_expr.as_ref() } + pub fn auto_increment_expr(&self) -> Option<&AutoIncrementExpr> { + self.auto_increment_expr.as_ref() + } + #[inline] pub fn is_nullable(&self) -> bool { self.data_type.is_nullable() @@ -1552,6 +1604,7 @@ impl From<&TableField> for DataField { DataField::new(&name, DataType::from(&data_type)) .with_default_expr(f.default_expr.clone()) .with_computed_expr(f.computed_expr.clone()) + .with_auto_increment_expr(f.auto_increment_expr.clone()) } } @@ -1575,6 +1628,7 @@ impl From<&DataField> for TableField { TableField::new(&name, ty) .with_default_expr(f.default_expr.clone()) .with_computed_expr(f.computed_expr.clone()) + .with_auto_increment_expr(f.auto_increment_expr.clone()) } } @@ -1647,7 +1701,8 @@ pub fn infer_table_schema(data_schema: &DataSchema) -> Result { fields.push( TableField::new(field.name(), field_type) .with_default_expr(field.default_expr.clone()) - .with_computed_expr(field.computed_expr.clone()), + .with_computed_expr(field.computed_expr.clone()) + .with_auto_increment_expr(field.auto_increment_expr.clone()), ); } Ok(TableSchemaRefExt::create(fields)) diff --git a/src/query/service/src/catalogs/default/database_catalog.rs b/src/query/service/src/catalogs/default/database_catalog.rs index d33108ea3680c..4e7546bc2d5da 100644 --- a/src/query/service/src/catalogs/default/database_catalog.rs +++ b/src/query/service/src/catalogs/default/database_catalog.rs @@ -59,6 +59,8 @@ use databend_common_meta_app::schema::DropTableReply; use databend_common_meta_app::schema::DroppedId; use databend_common_meta_app::schema::ExtendLockRevReq; use databend_common_meta_app::schema::GcDroppedTableReq; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; @@ -882,4 +884,11 @@ impl Catalog for DatabaseCatalog { async fn rename_dictionary(&self, req: RenameDictionaryReq) -> Result<()> { self.mutable_catalog.rename_dictionary(req).await } + + async fn get_autoincrement_next_value( + &self, + req: GetAutoIncrementNextValueReq, + ) -> Result { + self.mutable_catalog.get_autoincrement_next_value(req).await + } } diff --git a/src/query/service/src/catalogs/default/immutable_catalog.rs b/src/query/service/src/catalogs/default/immutable_catalog.rs index ebe1f3cf774da..7e194b3ff2394 100644 --- a/src/query/service/src/catalogs/default/immutable_catalog.rs +++ b/src/query/service/src/catalogs/default/immutable_catalog.rs @@ -50,6 +50,8 @@ use databend_common_meta_app::schema::DropTableByIdReq; use databend_common_meta_app::schema::DropTableIndexReq; use databend_common_meta_app::schema::DropTableReply; use databend_common_meta_app::schema::ExtendLockRevReq; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; @@ -593,4 +595,11 @@ impl Catalog for ImmutableCatalog { async fn rename_dictionary(&self, _req: RenameDictionaryReq) -> Result<()> { unimplemented!() } + + async fn get_autoincrement_next_value( + &self, + _req: GetAutoIncrementNextValueReq, + ) -> Result { + unimplemented!() + } } diff --git a/src/query/service/src/catalogs/default/mutable_catalog.rs b/src/query/service/src/catalogs/default/mutable_catalog.rs index 4435fd262285b..ecf71400662d9 100644 --- a/src/query/service/src/catalogs/default/mutable_catalog.rs +++ b/src/query/service/src/catalogs/default/mutable_catalog.rs @@ -25,6 +25,7 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_api::kv_app_error::KVAppError; use databend_common_meta_api::name_id_value_api::NameIdValueApiCompat; +use databend_common_meta_api::AutoIncrementApi; use databend_common_meta_api::DatabaseApi; use databend_common_meta_api::DictionaryApi; use databend_common_meta_api::GarbageCollectionApi; @@ -72,6 +73,8 @@ use databend_common_meta_app::schema::DropTableReply; use databend_common_meta_app::schema::DroppedId; use databend_common_meta_app::schema::ExtendLockRevReq; use databend_common_meta_app::schema::GcDroppedTableReq; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetDatabaseReq; use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; @@ -928,4 +931,13 @@ impl Catalog for MutableCatalog { let res = self.ctx.meta.rename_dictionary(req).await?; Ok(res) } + + #[async_backtrace::framed] + async fn get_autoincrement_next_value( + &self, + req: GetAutoIncrementNextValueReq, + ) -> Result { + let res = self.ctx.meta.get_auto_increment_next_value(req).await??; + Ok(res) + } } diff --git a/src/query/service/src/catalogs/default/session_catalog.rs b/src/query/service/src/catalogs/default/session_catalog.rs index c347c614a47f4..df1354aeeadfb 100644 --- a/src/query/service/src/catalogs/default/session_catalog.rs +++ b/src/query/service/src/catalogs/default/session_catalog.rs @@ -55,6 +55,8 @@ use databend_common_meta_app::schema::DropTableReply; use databend_common_meta_app::schema::DroppedId; use databend_common_meta_app::schema::ExtendLockRevReq; use databend_common_meta_app::schema::GcDroppedTableReq; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; @@ -764,6 +766,13 @@ impl Catalog for SessionCatalog { async fn rename_dictionary(&self, req: RenameDictionaryReq) -> Result<()> { self.inner.rename_dictionary(req).await } + + async fn get_autoincrement_next_value( + &self, + req: GetAutoIncrementNextValueReq, + ) -> Result { + self.inner.get_autoincrement_next_value(req).await + } } impl SessionCatalog { diff --git a/src/query/service/src/catalogs/iceberg/iceberg_catalog.rs b/src/query/service/src/catalogs/iceberg/iceberg_catalog.rs index 2caf122b71404..1b963deb7c371 100644 --- a/src/query/service/src/catalogs/iceberg/iceberg_catalog.rs +++ b/src/query/service/src/catalogs/iceberg/iceberg_catalog.rs @@ -52,6 +52,8 @@ use databend_common_meta_app::schema::DropTableByIdReq; use databend_common_meta_app::schema::DropTableIndexReq; use databend_common_meta_app::schema::DropTableReply; use databend_common_meta_app::schema::ExtendLockRevReq; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; @@ -582,4 +584,11 @@ impl Catalog for IcebergCatalog { async fn rename_dictionary(&self, _req: RenameDictionaryReq) -> Result<()> { unimplemented!() } + + async fn get_autoincrement_next_value( + &self, + _req: GetAutoIncrementNextValueReq, + ) -> Result { + unimplemented!() + } } diff --git a/src/query/service/src/interpreters/interpreter_table_add_column.rs b/src/query/service/src/interpreters/interpreter_table_add_column.rs index 6ef7db66f3abd..77e3abad918f9 100644 --- a/src/query/service/src/interpreters/interpreter_table_add_column.rs +++ b/src/query/service/src/interpreters/interpreter_table_add_column.rs @@ -111,6 +111,12 @@ impl Interpreter for AddTableColumnInterpreter { &self.plan.field.name, &self.plan.table ))); } + if self.plan.is_autoincrement && num_rows > 0 { + return Err(ErrorCode::AlterTableError(format!( + "Cannot add column '{}' with `AUTOINCREMENT` to non-empty table '{}'", + &self.plan.field.name, &self.plan.table + ))); + } if field.default_expr().is_some() { let _ = DefaultExprBinder::try_new(self.ctx.clone())?.get_scalar(&field)?; } diff --git a/src/query/service/src/interpreters/interpreter_table_show_create.rs b/src/query/service/src/interpreters/interpreter_table_show_create.rs index 10a388eeb09ed..ba903dd08e644 100644 --- a/src/query/service/src/interpreters/interpreter_table_show_create.rs +++ b/src/query/service/src/interpreters/interpreter_table_show_create.rs @@ -197,6 +197,12 @@ impl ShowCreateTableInterpreter { } _ => "".to_string(), }; + let auto_increment = if let Some(auto_increment_expr) = &field.auto_increment_expr { + format!(" {}", auto_increment_expr.to_sql_string()) + } else { + "".to_string() + }; + let comment = if field_comments.len() == schema.fields().len() && !field_comments[idx].is_empty() { @@ -212,8 +218,9 @@ impl ShowCreateTableInterpreter { sql_dialect, ); let data_type = field.data_type().sql_name_explicit_null(); - let column_str = - format!(" {ident} {data_type}{default_expr}{computed_expr}{comment}"); + let column_str = format!( + " {ident} {data_type}{default_expr}{computed_expr}{auto_increment}{comment}" + ); create_defs.push(column_str); } diff --git a/src/query/service/src/pipelines/builders/builder_fill_missing_columns.rs b/src/query/service/src/pipelines/builders/builder_fill_missing_columns.rs index 5298375d2d542..7a083d45a59da 100644 --- a/src/query/service/src/pipelines/builders/builder_fill_missing_columns.rs +++ b/src/query/service/src/pipelines/builders/builder_fill_missing_columns.rs @@ -47,7 +47,8 @@ impl PipelineBuilder { // Fill missing default columns and resort the columns. if source_schema != default_schema { - let mut default_expr_binder = DefaultExprBinder::try_new(ctx.clone())?; + let mut default_expr_binder = + DefaultExprBinder::try_new(ctx.clone())?.auto_increment_table_id(table.get_id()); if let Some((async_funcs, new_default_schema, new_default_schema_no_cast)) = default_expr_binder .split_async_default_exprs(source_schema.clone(), default_schema.clone())? diff --git a/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs b/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs index 82749b2ed9282..0b738a26ce979 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs @@ -18,10 +18,14 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use databend_common_base::base::tokio::sync::RwLock; +use databend_common_catalog::catalog::Catalog; use databend_common_exception::Result; use databend_common_expression::types::UInt64Type; +use databend_common_expression::AutoIncrementExpr; use databend_common_expression::DataBlock; use databend_common_expression::FromData; +use databend_common_meta_app::principal::AutoIncrementKey; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetSequenceNextValueReq; use databend_common_meta_app::schema::GetSequenceReq; use databend_common_meta_app::schema::SequenceIdent; @@ -119,11 +123,11 @@ impl TransformAsyncFunction { } // transform add sequence nextval column. - pub async fn transform_sequence( + pub async fn transform( ctx: Arc, data_block: &mut DataBlock, counter_lock: Arc>, - sequence_name: &String, + fetcher: T, ) -> Result<()> { let count = data_block.num_rows() as u64; let column = if count == 0 { @@ -149,7 +153,6 @@ impl TransformAsyncFunction { UInt64Type::from_data(range.collect::>()) } else { // We need to fetch more sequence numbers - let tenant = ctx.get_tenant(); let catalog = ctx.get_default_catalog()?; // Get current state of the counter @@ -160,34 +163,7 @@ impl TransformAsyncFunction { let remaining = max.saturating_sub(current); let to_fetch = count.saturating_sub(remaining); - let visibility_checker = if ctx - .get_settings() - .get_enable_experimental_sequence_privilege_check()? - { - Some(ctx.get_visibility_checker(false, Object::Sequence).await?) - } else { - None - }; - - let req = GetSequenceReq { - ident: SequenceIdent::new(&tenant, sequence_name), - }; - let resp = catalog.get_sequence(req, &visibility_checker).await?; - let step_size = resp.meta.step as u64; - - // Calculate batch size - take the larger of count or step_size - let batch_size = to_fetch.max(step_size); - - // Calculate batch size - take the larger of count or step_size - let req = GetSequenceNextValueReq { - ident: SequenceIdent::new(&tenant, sequence_name), - count: batch_size, - }; - - let resp = catalog - .get_sequence_next_value(req, &visibility_checker) - .await?; - let start = resp.start; + let (start, batch_size) = fetcher.fetch(&ctx, &catalog, to_fetch).await?; // If we have remaining numbers, use them first if remaining > 0 { @@ -229,6 +205,89 @@ impl TransformAsyncFunction { } } +pub trait NextValFetcher { + async fn fetch( + self, + ctx: &QueryContext, + catalog: &Arc, + to_fetch: u64, + ) -> Result<(u64 /* start */, u64 /* batch */)>; +} + +pub struct SequenceNextValFetcher { + pub(crate) sequence_ident: SequenceIdent, +} + +impl NextValFetcher for SequenceNextValFetcher { + async fn fetch( + self, + ctx: &QueryContext, + catalog: &Arc, + to_fetch: u64, + ) -> Result<(u64, u64)> { + let visibility_checker = if ctx + .get_settings() + .get_enable_experimental_sequence_privilege_check()? + { + Some(ctx.get_visibility_checker(false, Object::Sequence).await?) + } else { + None + }; + + let req = GetSequenceReq { + ident: self.sequence_ident.clone(), + }; + let resp = catalog.get_sequence(req, &visibility_checker).await?; + let step_size = resp.meta.step as u64; + + // Calculate batch size - take the larger of count or step_size + let batch_size = to_fetch.max(step_size); + + // Calculate batch size - take the larger of count or step_size + let req = GetSequenceNextValueReq { + ident: self.sequence_ident, + count: batch_size, + }; + + let resp = catalog + .get_sequence_next_value(req, &visibility_checker) + .await?; + Ok((resp.start, batch_size)) + } +} + +pub struct AutoIncrementNextValFetcher { + pub(crate) key: AutoIncrementKey, + pub(crate) expr: AutoIncrementExpr, +} + +impl NextValFetcher for AutoIncrementNextValFetcher { + async fn fetch( + self, + ctx: &QueryContext, + catalog: &Arc, + to_fetch: u64, + ) -> Result<(u64, u64)> { + let step_size = self.expr.step as u64; + + // Calculate batch size - take the larger of count or step_size + let batch_size = to_fetch.max(step_size); + + // Calculate batch size - take the larger of count or step_size + let req = GetAutoIncrementNextValueReq { + tenant: ctx.get_tenant(), + key: self.key, + expr: self.expr, + // FIXME: count * step + // count: batch_size, + count: 1, + }; + + let resp = catalog.get_autoincrement_next_value(req).await?; + Ok((resp.start, batch_size)) + } +} + #[async_trait::async_trait] impl AsyncTransform for TransformAsyncFunction { const NAME: &'static str = "AsyncFunction"; @@ -238,11 +297,28 @@ impl AsyncTransform for TransformAsyncFunction { for (i, async_func_desc) in self.async_func_descs.iter().enumerate() { match &async_func_desc.func_arg { AsyncFunctionArgument::SequenceFunction(sequence_name) => { - Self::transform_sequence( + Self::transform( + self.ctx.clone(), + &mut data_block, + self.sequence_counters[i].clone(), + SequenceNextValFetcher { + sequence_ident: SequenceIdent::new( + self.ctx.get_tenant(), + sequence_name, + ), + }, + ) + .await?; + } + AsyncFunctionArgument::AutoIncrement { key, expr } => { + Self::transform( self.ctx.clone(), &mut data_block, self.sequence_counters[i].clone(), - sequence_name, + AutoIncrementNextValFetcher { + key: key.clone(), + expr: expr.clone(), + }, ) .await?; } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_branched_async_function.rs b/src/query/service/src/pipelines/processors/transforms/transform_branched_async_function.rs index e6ee4bffac891..e8ec40ce6c332 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_branched_async_function.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_branched_async_function.rs @@ -15,14 +15,18 @@ use std::collections::HashMap; use std::sync::Arc; +use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; use databend_common_expression::SourceSchemaIndex; +use databend_common_meta_app::schema::SequenceIdent; use databend_common_pipeline_transforms::processors::AsyncTransform; use databend_common_sql::binder::AsyncFunctionDesc; +use crate::pipelines::processors::transforms::transform_async_function::AutoIncrementNextValFetcher; use crate::pipelines::processors::transforms::transform_async_function::SequenceCounters; +use crate::pipelines::processors::transforms::transform_async_function::SequenceNextValFetcher; use crate::pipelines::processors::transforms::TransformAsyncFunction; use crate::sessions::QueryContext; use crate::sql::plans::AsyncFunctionArgument; @@ -64,11 +68,29 @@ impl AsyncTransform for TransformBranchedAsyncFunction { match &async_func_desc.func_arg { AsyncFunctionArgument::SequenceFunction(sequence_name) => { let counter_lock = sequence_counters[i].clone(); - TransformAsyncFunction::transform_sequence( + TransformAsyncFunction::transform( self.ctx.clone(), &mut block, counter_lock, - sequence_name, + SequenceNextValFetcher { + sequence_ident: SequenceIdent::new( + self.ctx.get_tenant(), + sequence_name.clone(), + ), + }, + ) + .await?; + } + AsyncFunctionArgument::AutoIncrement { key, expr } => { + let counter_lock = sequence_counters[i].clone(); + TransformAsyncFunction::transform( + self.ctx.clone(), + &mut block, + counter_lock, + AutoIncrementNextValFetcher { + key: key.clone(), + expr: expr.clone(), + }, ) .await?; } diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index 3307f22e81912..6a488b69a3d37 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -85,6 +85,8 @@ use databend_common_meta_app::schema::DropTableByIdReq; use databend_common_meta_app::schema::DropTableIndexReq; use databend_common_meta_app::schema::DropTableReply; use databend_common_meta_app::schema::ExtendLockRevReq; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; @@ -471,6 +473,13 @@ impl Catalog for FakedCatalog { async fn rename_dictionary(&self, _req: RenameDictionaryReq) -> Result<()> { todo!() } + + async fn get_autoincrement_next_value( + &self, + _req: GetAutoIncrementNextValueReq, + ) -> Result { + todo!() + } } struct CtxDelegation { diff --git a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs index 59d931081d1c7..8b69fac3acaa3 100644 --- a/src/query/service/tests/it/storages/fuse/operations/alter_table.rs +++ b/src/query/service/tests/it/storages/fuse/operations/alter_table.rs @@ -186,6 +186,7 @@ async fn test_fuse_table_optimize_alter_table() -> Result<()> { option: AddColumnOption::End, is_deterministic: true, is_nextval: false, + is_autoincrement: false, }; let interpreter = AddTableColumnInterpreter::try_create(ctx.clone(), add_table_column_plan)?; let _ = interpreter.execute(ctx.clone()).await?; diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index f3c2bfcc0d3c4..b417f65973639 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -84,6 +84,8 @@ use databend_common_meta_app::schema::DropTableByIdReq; use databend_common_meta_app::schema::DropTableIndexReq; use databend_common_meta_app::schema::DropTableReply; use databend_common_meta_app::schema::ExtendLockRevReq; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; @@ -1221,4 +1223,11 @@ impl Catalog for FakedCatalog { async fn rename_dictionary(&self, _req: RenameDictionaryReq) -> Result<()> { todo!() } + + async fn get_autoincrement_next_value( + &self, + _req: GetAutoIncrementNextValueReq, + ) -> Result { + todo!() + } } diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 453d77ec18504..de510ca50a560 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -73,6 +73,7 @@ use databend_common_exception::Result; use databend_common_expression::infer_schema_type; use databend_common_expression::infer_table_schema; use databend_common_expression::types::DataType; +use databend_common_expression::AutoIncrementExpr; use databend_common_expression::ComputedExpr; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRefExt; @@ -1091,7 +1092,7 @@ impl Binder { .get_table(&catalog, &database, &table) .await? .schema(); - let (field, comment, is_deterministic, is_nextval) = + let (field, comment, is_deterministic, is_nextval, is_autoincrement) = self.analyze_add_column(column, schema).await?; let option = match ast_option { AstAddColumnOption::First => AddColumnOption::First, @@ -1110,6 +1111,7 @@ impl Binder { option, is_deterministic, is_nextval, + is_autoincrement, }))) } AlterTableAction::AddConstraint { constraint } => { @@ -1180,7 +1182,7 @@ impl Binder { .await? .schema(); for column in column_def_vec { - let (field, comment, _, _) = + let (field, comment, _, _, _) = self.analyze_add_column(column, schema.clone()).await?; field_and_comment.push((field, comment)); } @@ -1657,13 +1659,15 @@ impl Binder { &self, column: &ColumnDefinition, table_schema: TableSchemaRef, - ) -> Result<(TableField, String, bool, bool)> { + ) -> Result<(TableField, String, bool, bool, bool)> { let name = normalize_identifier(&column.name, &self.name_resolution_ctx).name; let not_null = self.is_column_not_null(); let data_type = resolve_type_name(&column.data_type, not_null)?; let mut is_deterministic = true; let mut is_nextval = false; + let mut is_autoincrement = false; let mut field = TableField::new(&name, data_type); + if let Some(expr) = &column.expr { match expr { ColumnExpr::Default(default_expr) => { @@ -1693,10 +1697,37 @@ impl Binder { field = field.with_computed_expr(Some(ComputedExpr::Stored(expr))); is_deterministic = false; } + ColumnExpr::AutoIncrement { + start, + step, + is_ordered, + } => { + if !matches!( + field.data_type().remove_nullable(), + TableDataType::Number(_) | TableDataType::Decimal(_) + ) { + return Err(ErrorCode::SemanticError( + "AUTO INCREMENT only supports Decimal or Numeric (e.g. INT32) types", + )); + } + field.auto_increment_expr = Some(AutoIncrementExpr { + column_id: table_schema.next_column_id(), + start: *start, + step: *step, + is_ordered: *is_ordered, + }); + is_autoincrement = true; + } } } let comment = column.comment.clone().unwrap_or_default(); - Ok((field, comment, is_deterministic, is_nextval)) + Ok(( + field, + comment, + is_deterministic, + is_nextval, + is_autoincrement, + )) } #[async_backtrace::framed] @@ -1705,6 +1736,7 @@ impl Binder { columns: &[ColumnDefinition], ) -> Result<(TableSchemaRef, Vec)> { let mut has_computed = false; + let mut has_autoincrement = false; let mut fields = Vec::with_capacity(columns.len()); let mut fields_comments = Vec::with_capacity(columns.len()); let not_null = self.is_column_not_null(); @@ -1721,13 +1753,34 @@ impl Binder { .parse_default_expr_to_string(&field, default_expr)?; field = field.with_default_expr(Some(expr)); } + ColumnExpr::AutoIncrement { + start, + step, + is_ordered, + } => { + if !matches!( + field.data_type().remove_nullable(), + TableDataType::Number(_) | TableDataType::Decimal(_) + ) { + return Err(ErrorCode::SemanticError( + "AUTO INCREMENT only supports Decimal or Numeric (e.g. INT32) types", + )); + } + has_autoincrement = true; + field.auto_increment_expr = Some(AutoIncrementExpr { + column_id: 0, + start: *start, + step: *step, + is_ordered: *is_ordered, + }); + } _ => has_computed = true, } } fields.push(field); } - let fields = if has_computed { + let mut fields = if has_computed { let mut source_fields = Vec::with_capacity(fields.len()); for (column, field) in columns.iter().zip(fields.iter()) { match &column.expr { @@ -1770,6 +1823,18 @@ impl Binder { } else { fields }; + // update auto increment expr column id + if has_autoincrement { + let table_schema = TableSchema::new(fields.clone()); + + for (i, table_field) in table_schema.fields().iter().enumerate() { + let Some(auto_increment_expr) = fields[i].auto_increment_expr.as_mut() else { + continue; + }; + + auto_increment_expr.column_id = table_field.column_id; + } + } let schema = TableSchemaRefExt::create(fields); Self::validate_create_table_schema(&schema)?; diff --git a/src/query/sql/src/planner/binder/default_expr.rs b/src/query/sql/src/planner/binder/default_expr.rs index 910916759f5a5..0caedd5307502 100644 --- a/src/query/sql/src/planner/binder/default_expr.rs +++ b/src/query/sql/src/planner/binder/default_expr.rs @@ -34,6 +34,8 @@ use databend_common_expression::RemoteDefaultExpr; use databend_common_expression::Scalar; use databend_common_expression::TableField; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_meta_app::principal::AutoIncrementKey; +use databend_common_meta_types::MetaId; use parking_lot::RwLock; use crate::binder::wrap_cast; @@ -52,6 +54,8 @@ use crate::MetadataRef; /// Helper for binding scalar expression with `BindContext`. pub struct DefaultExprBinder { + // the table id of the auto increment column processed by the binder + auto_increment_table_id: Option, bind_context: BindContext, ctx: Arc, dialect: Dialect, @@ -90,6 +94,7 @@ impl DefaultExprBinder { let rewriter = DefaultValueRewriter::new(); Ok(DefaultExprBinder { + auto_increment_table_id: None, bind_context, ctx, dialect, @@ -101,6 +106,11 @@ impl DefaultExprBinder { }) } + pub fn auto_increment_table_id(mut self, table_id: u64) -> Self { + self.auto_increment_table_id = Some(table_id); + self + } + fn evaluator(&self) -> Evaluator { Evaluator::new(&self.dummy_block, &self.func_ctx, &BUILTIN_FUNCTIONS) } @@ -189,6 +199,23 @@ impl DefaultExprBinder { } else { Ok(scalar_expr) } + } else if let (Some(table_id), Some(auto_increment_expr)) = + (&self.auto_increment_table_id, field.auto_increment_expr()) + { + let auto_increment_key = + AutoIncrementKey::new(*table_id, auto_increment_expr.column_id); + + Ok(ScalarExpr::AsyncFunctionCall(AsyncFunctionCall { + span: None, + func_name: "nextval".to_string(), + display_name: "".to_string(), + return_type: Box::new(field.data_type().clone()), + arguments: vec![], + func_arg: AsyncFunctionArgument::AutoIncrement { + key: auto_increment_key, + expr: auto_increment_expr.clone(), + }, + })) } else { Ok(ScalarExpr::ConstantExpr(ConstantExpr { span: None, @@ -225,6 +252,7 @@ impl DefaultExprBinder { schema: &DataSchema, ) -> Result { let scalar_expr = self.parse_and_bind(field)?; + scalar_expr .as_expr()? .project_column_ref(|col| schema.index_of(&col.index.to_string())) @@ -240,6 +268,9 @@ impl DefaultExprBinder { let expr = if let Some(async_func) = get_nextval(&scalar_expr) { let name = match async_func.func_arg { AsyncFunctionArgument::SequenceFunction(name) => name, + AsyncFunctionArgument::AutoIncrement { .. } => { + unreachable!("expect AsyncFunctionArgument::SequenceFunction") + } AsyncFunctionArgument::DictGetFunction(_) => { unreachable!("expect AsyncFunctionArgument::SequenceFunction") } @@ -265,30 +296,38 @@ impl DefaultExprBinder { let mut async_fields = vec![]; let mut async_fields_no_cast = vec![]; - for f in dest_schema.fields().iter() { - if !input_schema.has_field(f.name()) { - if let Some(default_expr) = f.default_expr() { - if default_expr.contains("nextval(") { - let scalar_expr = self.parse_and_bind(f)?; - if let Some(async_func) = get_nextval(&scalar_expr) { - async_func_descs.push(AsyncFunctionDesc { - func_name: async_func.func_name.clone(), - display_name: async_func.display_name.clone(), - // not used - output_column: 0, - arg_indices: vec![], - data_type: async_func.return_type.clone(), - func_arg: async_func.func_arg.clone(), - }); - async_fields.push(f.clone()); - async_fields_no_cast.push(DataField::new( - f.name(), - DataType::Number(NumberDataType::UInt64), - )); - } - } + let fn_check_auto_increment = |field: &DataField| { + if input_schema.has_field(field.name()) { + return false; + } + if let Some(default_expr) = field.default_expr() { + if default_expr.contains("nextval(") { + return true; } - }; + } + field.auto_increment_expr().is_some() + }; + for f in dest_schema.fields().iter() { + if !fn_check_auto_increment(f) { + continue; + } + let scalar_expr = self.parse_and_bind(f)?; + if let Some(async_func) = get_nextval(&scalar_expr) { + async_func_descs.push(AsyncFunctionDesc { + func_name: async_func.func_name.clone(), + display_name: async_func.display_name.clone(), + // not used + output_column: 0, + arg_indices: vec![], + data_type: async_func.return_type.clone(), + func_arg: async_func.func_arg.clone(), + }); + async_fields.push(f.clone()); + async_fields_no_cast.push(DataField::new( + f.name(), + DataType::Number(NumberDataType::UInt64), + )); + } } if async_func_descs.is_empty() { Ok(None) diff --git a/src/query/sql/src/planner/binder/insert.rs b/src/query/sql/src/planner/binder/insert.rs index d9dabbf053167..0cc84ce8e91d6 100644 --- a/src/query/sql/src/planner/binder/insert.rs +++ b/src/query/sql/src/planner/binder/insert.rs @@ -207,6 +207,7 @@ impl Binder { let default_exprs = if file_format_params.need_field_default() { Some( DefaultExprBinder::try_new(self.ctx.clone())? + .auto_increment_table_id(table.get_id()) .prepare_default_values(&required_values_schema)?, ) } else { diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index ef024a42a84bc..b39ed80f5bbfa 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -315,6 +315,7 @@ pub struct AddTableColumnPlan { pub option: AddColumnOption, pub is_deterministic: bool, pub is_nextval: bool, + pub is_autoincrement: bool, } impl AddTableColumnPlan { diff --git a/src/query/sql/src/planner/plans/scalar_expr.rs b/src/query/sql/src/planner/plans/scalar_expr.rs index 5d53302655ca7..2c6fcdd555361 100644 --- a/src/query/sql/src/planner/plans/scalar_expr.rs +++ b/src/query/sql/src/planner/plans/scalar_expr.rs @@ -30,12 +30,15 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_expression::types::NumberScalar; +use databend_common_expression::AutoIncrementExpr; use databend_common_expression::FunctionKind; use databend_common_expression::RemoteExpr; use databend_common_expression::Scalar; use databend_common_functions::aggregates::AggregateFunctionSortDesc; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_meta_app::principal::AutoIncrementKey; use databend_common_meta_app::principal::StageInfo; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetSequenceNextValueReq; use databend_common_meta_app::schema::SequenceIdent; use databend_common_meta_app::tenant::Tenant; @@ -1169,6 +1172,11 @@ pub enum AsyncFunctionArgument { // Used by `nextval` function to call meta's `get_sequence_next_value` api // to get incremental values. SequenceFunction(String), + // used for auto increment calling sequence + AutoIncrement { + key: AutoIncrementKey, + expr: AutoIncrementExpr, + }, // The dictionary argument is connection URL of remote source, like Redis, MySQL ... // Used by `dict_get` function to connect source and read data. DictGetFunction(DictGetFunctionArgument), @@ -1262,6 +1270,17 @@ impl AsyncFunctionCall { .await?; Ok(Scalar::Number(NumberScalar::UInt64(reply.start))) } + AsyncFunctionArgument::AutoIncrement { key, expr } => { + let req = GetAutoIncrementNextValueReq { + tenant, + expr: expr.clone(), + key: key.clone(), + count: 1, + }; + // Call meta's api to generate an incremental value. + let reply = catalog.get_autoincrement_next_value(req).await?; + Ok(Scalar::Number(NumberScalar::UInt64(reply.start))) + } AsyncFunctionArgument::DictGetFunction(_dict_get_function_argument) => { Err(ErrorCode::Internal("Cannot generate dict_get function")) } diff --git a/src/query/storages/common/table_meta/src/meta/v3/frozen/table_schema.rs b/src/query/storages/common/table_meta/src/meta/v3/frozen/table_schema.rs index 745dc54a93a1d..308743a6e92a5 100644 --- a/src/query/storages/common/table_meta/src/meta/v3/frozen/table_schema.rs +++ b/src/query/storages/common/table_meta/src/meta/v3/frozen/table_schema.rs @@ -100,6 +100,7 @@ mod converters { data_type: value.data_type.into(), column_id: value.column_id, computed_expr: None, + auto_increment_expr: None, } } } diff --git a/src/query/storages/hive/hive/src/hive_catalog.rs b/src/query/storages/hive/hive/src/hive_catalog.rs index 9f9d5b89224ff..7531c9b16dd78 100644 --- a/src/query/storages/hive/hive/src/hive_catalog.rs +++ b/src/query/storages/hive/hive/src/hive_catalog.rs @@ -57,6 +57,8 @@ use databend_common_meta_app::schema::DropTableByIdReq; use databend_common_meta_app::schema::DropTableIndexReq; use databend_common_meta_app::schema::DropTableReply; use databend_common_meta_app::schema::ExtendLockRevReq; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; @@ -749,4 +751,11 @@ impl Catalog for HiveCatalog { async fn rename_dictionary(&self, _req: RenameDictionaryReq) -> Result<()> { unimplemented!() } + + async fn get_autoincrement_next_value( + &self, + _req: GetAutoIncrementNextValueReq, + ) -> Result { + unimplemented!() + } } diff --git a/src/query/storages/iceberg/src/catalog.rs b/src/query/storages/iceberg/src/catalog.rs index d116a0fd6cdbd..207f5c7e4ccc1 100644 --- a/src/query/storages/iceberg/src/catalog.rs +++ b/src/query/storages/iceberg/src/catalog.rs @@ -59,6 +59,8 @@ use databend_common_meta_app::schema::DropTableByIdReq; use databend_common_meta_app::schema::DropTableIndexReq; use databend_common_meta_app::schema::DropTableReply; use databend_common_meta_app::schema::ExtendLockRevReq; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReply; +use databend_common_meta_app::schema::GetAutoIncrementNextValueReq; use databend_common_meta_app::schema::GetDictionaryReply; use databend_common_meta_app::schema::GetIndexReply; use databend_common_meta_app::schema::GetIndexReq; @@ -745,4 +747,11 @@ impl Catalog for IcebergMutableCatalog { async fn rename_dictionary(&self, _req: RenameDictionaryReq) -> Result<()> { unimplemented!() } + + async fn get_autoincrement_next_value( + &self, + _req: GetAutoIncrementNextValueReq, + ) -> Result { + unimplemented!() + } } diff --git a/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test b/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test index 3e6fc329ea980..db37ba1670bad 100644 --- a/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test +++ b/tests/sqllogictests/suites/base/05_ddl/05_0000_ddl_create_tables.test @@ -683,4 +683,4 @@ select '\'0\'' = a from t 1 statement error 1075 -create or replace table t(id int) compression='xxxx'; \ No newline at end of file +create or replace table t(id int) compression='xxxx'; diff --git a/tests/suites/0_stateless/02_ddl/02_0001_autoincrement.result b/tests/suites/0_stateless/02_ddl/02_0001_autoincrement.result new file mode 100644 index 0000000000000..0a747f13f52f1 --- /dev/null +++ b/tests/suites/0_stateless/02_ddl/02_0001_autoincrement.result @@ -0,0 +1,89 @@ +>>>> +create or replace table test_autoincrement (c1 int, c2 string autoincrement); + +Error: APIError: QueryFailed: [1065]AUTO INCREMENT only supports Decimal or Numeric (e.g. INT32) types +<<<< +>>>> +create or replace table test_autoincrement (c1 int, c2 int default 1 autoincrement); + +Error: APIError: QueryFailed: [1005]error: + --> SQL:1:83 + | +1 | create or replace table test_autoincrement (c1 int, c2 int default 1 autoincrement) + | ------ ^ DEFAULT and AUTOINCREMENT cannot exist at the same time + | | + | while parsing `CREATE [OR REPLACE] TABLE [IF NOT EXISTS] [.]
[] []` + + +<<<< +>>>> +create or replace table test_autoincrement (c1 int, c2 int autoincrement, c3 float autoincrement, c4 int identity (1,2), c5 int autoincrement start 1 increment 2, c6 decimal(20, 2) autoincrement); + +>>>> +insert into test_autoincrement (c1) values(0); + +1 +>>>> +insert into test_autoincrement (c1) values(0); + +1 +>>>> +insert into test_autoincrement (c1) values(0); + +1 +>>>> select * from test_autoincrement order by c2; +0 0 0 1 1 0.00 +0 1 1 3 3 1.00 +0 2 2 5 5 2.00 +<<<< +>>>> +alter table test_autoincrement add column c4 string autoincrement; + +Error: APIError: QueryFailed: [1065]AUTO INCREMENT only supports Decimal or Numeric (e.g. INT32) types +<<<< +>>>> +alter table test_autoincrement add column c4 int default autoincrement; + +Error: APIError: QueryFailed: [1065]error: + --> SQL:1:58 + | +1 | alter table test_autoincrement add column c4 int default autoincrement + | ^^^^^^^^^^^^^ column autoincrement doesn't exist + + +<<<< +>>>> +alter table test_autoincrement add column c7 int autoincrement; + +Error: APIError: QueryFailed: [1132]Cannot add column 'c7' with `AUTOINCREMENT` to non-empty table 'test_autoincrement' +<<<< +>>>> +alter table test_autoincrement drop column c3; + +>>>> select * from test_autoincrement order by c2; +0 0 1 1 0.00 +0 1 3 3 1.00 +0 2 5 5 2.00 +<<<< +>>>> +create or replace table test_empty_autoincrement (c1 int); + +>>>> +alter table test_empty_autoincrement add column c2 int autoincrement; + +>>>> +insert into test_empty_autoincrement (c1) values(0); + +1 +>>>> +insert into test_empty_autoincrement (c1) values(0); + +1 +>>>> select * from test_empty_autoincrement order by c2; +0 0 +0 1 +<<<< +>>>> +drop table test_empty_autoincrement; + +>>>> drop database test diff --git a/tests/suites/0_stateless/02_ddl/02_0001_autoincrement.sh b/tests/suites/0_stateless/02_ddl/02_0001_autoincrement.sh new file mode 100755 index 0000000000000..231f22c18793a --- /dev/null +++ b/tests/suites/0_stateless/02_ddl/02_0001_autoincrement.sh @@ -0,0 +1,71 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. "$CURDIR"/../../../shell_env.sh + +echo "create or replace database test" | $BENDSQL_CLIENT_CONNECT + + +stmt """ +create or replace table test_autoincrement (c1 int, c2 string autoincrement); +""" +stmt """ +create or replace table test_autoincrement (c1 int, c2 int default 1 autoincrement); +""" +stmt """ +create or replace table test_autoincrement (c1 int, c2 int autoincrement, c3 float autoincrement, c4 int identity (1,2), c5 int autoincrement start 1 increment 2, c6 decimal(20, 2) autoincrement); +""" + +stmt """ +insert into test_autoincrement (c1) values(0); +""" + +stmt """ +insert into test_autoincrement (c1) values(0); +""" + +stmt """ +insert into test_autoincrement (c1) values(0); +""" + +query "select * from test_autoincrement order by c2;" + +stmt """ +alter table test_autoincrement add column c4 string autoincrement; +""" +stmt """ +alter table test_autoincrement add column c4 int default autoincrement; +""" +stmt """ +alter table test_autoincrement add column c7 int autoincrement; +""" + +stmt """ +alter table test_autoincrement drop column c3; +""" + +query "select * from test_autoincrement order by c2;" + +# test add column success on empty table +stmt """ +create or replace table test_empty_autoincrement (c1 int); +""" + +stmt """ +alter table test_empty_autoincrement add column c2 int autoincrement; +""" + +stmt """ +insert into test_empty_autoincrement (c1) values(0); +""" +stmt """ +insert into test_empty_autoincrement (c1) values(0); +""" + +query "select * from test_empty_autoincrement order by c2;" + +stmt """ +drop table test_empty_autoincrement; +""" + +stmt "drop database test" \ No newline at end of file diff --git a/tests/suites/0_stateless/02_ddl/02_0001_show_create_table.result b/tests/suites/0_stateless/02_ddl/02_0001_show_create_table.result index da8f51a1b66b5..026caf76043ad 100755 --- a/tests/suites/0_stateless/02_ddl/02_0001_show_create_table.result +++ b/tests/suites/0_stateless/02_ddl/02_0001_show_create_table.result @@ -104,4 +104,14 @@ tc CREATE TABLE tc ( id4 INT NULL, id5 INT NULL ) ENGINE=FUSE +>>>> +create or replace table test.auto_increment(c1 int, c2 int autoincrement, c3 int identity (2, 3) order); + +>>>> show create table test.auto_increment; +auto_increment CREATE TABLE auto_increment ( + c1 INT NULL, + c2 INT NULL AUTOINCREMENT (0, 1) ORDER, + c3 INT NULL AUTOINCREMENT (2, 3) ORDER +) ENGINE=FUSE +<<<< >>>> drop database test diff --git a/tests/suites/0_stateless/02_ddl/02_0001_show_create_table.sh b/tests/suites/0_stateless/02_ddl/02_0001_show_create_table.sh index 48c4a9f8f6e58..d24668c85bdfc 100755 --- a/tests/suites/0_stateless/02_ddl/02_0001_show_create_table.sh +++ b/tests/suites/0_stateless/02_ddl/02_0001_show_create_table.sh @@ -89,4 +89,11 @@ set sql_dialect='MySQL'; show create table test.tc; """ | $BENDSQL_CLIENT_CONNECT +stmt """ +create or replace table test.auto_increment(c1 int, c2 int autoincrement, c3 int identity (2, 3) order); +""" + +query "show create table test.auto_increment;" + + stmt "drop database test" diff --git a/tests/suites/0_stateless/12_time_travel/12_0002_time_travel_show.result b/tests/suites/0_stateless/12_time_travel/12_0002_time_travel_show.result index 7af6fc4e959bb..574d87cd257c6 100644 --- a/tests/suites/0_stateless/12_time_travel/12_0002_time_travel_show.result +++ b/tests/suites/0_stateless/12_time_travel/12_0002_time_travel_show.result @@ -1,6 +1,6 @@ t BASE TABLE db12_0002 default account_admin FUSE yyyy-mm-dd HH:MM:SS.ssssss 0 0 0 0 v_t db12_0002 default NULL VIEW yyyy-mm-dd HH:MM:SS.ssssss SELECT * FROM db12_0002.t t yyyy-mm-dd HH:MM:SS.ssssss -seq 1 1 yyyy-mm-dd HH:MM:SS.ssssss yyyy-mm-dd HH:MM:SS.ssssss NULL -seq1 1 1 yyyy-mm-dd HH:MM:SS.ssssss yyyy-mm-dd HH:MM:SS.ssssss NULL +1 1 yyyy-mm-dd HH:MM:SS.ssssss yyyy-mm-dd HH:MM:SS.ssssss NULL +1 1 yyyy-mm-dd HH:MM:SS.ssssss yyyy-mm-dd HH:MM:SS.ssssss NULL seq 1 1 yyyy-mm-dd HH:MM:SS.ssssss yyyy-mm-dd HH:MM:SS.ssssss NULL diff --git a/tests/suites/0_stateless/12_time_travel/12_0002_time_travel_show.sql b/tests/suites/0_stateless/12_time_travel/12_0002_time_travel_show.sql index 3065fcd41883d..6ccd098da1b46 100644 --- a/tests/suites/0_stateless/12_time_travel/12_0002_time_travel_show.sql +++ b/tests/suites/0_stateless/12_time_travel/12_0002_time_travel_show.sql @@ -17,7 +17,7 @@ DROP SEQUENCE if exists seq; DROP SEQUENCE if exists seq1; CREATE SEQUENCE seq; CREATE SEQUENCE seq1; -show sequences; +select interval, current, created_on, updated_on, comment from show_sequences(); desc sequence seq; DROP SEQUENCE if exists seq; DROP SEQUENCE if exists seq1; diff --git a/tests/suites/0_stateless/18_rbac/18_0016_seq_rbac.sh b/tests/suites/0_stateless/18_rbac/18_0016_seq_rbac.sh index 385c2d0a7472c..8ad506a630dd2 100755 --- a/tests/suites/0_stateless/18_rbac/18_0016_seq_rbac.sh +++ b/tests/suites/0_stateless/18_rbac/18_0016_seq_rbac.sh @@ -8,6 +8,10 @@ export USER_A_CONNECT="bendsql --user=a --password=123 --host=${QUERY_MYSQL_HAND export USER_B_CONNECT="bendsql --user=b --password=123 --host=${QUERY_MYSQL_HANDLER_HOST} --port ${QUERY_HTTP_HANDLER_PORT}" export USER_C_CONNECT="bendsql --user=c --password=123 --host=${QUERY_MYSQL_HANDLER_HOST} --port ${QUERY_HTTP_HANDLER_PORT}" +for seq in $(echo "select name from show_sequences();" | $BENDSQL_CLIENT_CONNECT); do + echo "drop sequence if exists $seq;" | $BENDSQL_CLIENT_CONNECT +done + echo "=== OLD LOGIC: user has super privileges can operator all sequences with enable_experimental_sequence_privilege_check=0 ===" echo "=== TEST USER A WITH SUPER PRIVILEGES ===" echo "set global enable_experimental_sequence_privilege_check=0;" | $BENDSQL_CLIENT_CONNECT