diff --git a/src/meta/api/src/garbage_collection_api.rs b/src/meta/api/src/garbage_collection_api.rs index d8553b85de1d8..c46b244780a91 100644 --- a/src/meta/api/src/garbage_collection_api.rs +++ b/src/meta/api/src/garbage_collection_api.rs @@ -42,6 +42,7 @@ use databend_common_meta_app::tenant::Tenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; use databend_common_meta_kvapi::kvapi::Key; +use databend_common_meta_types::txn_op::Request; use databend_common_meta_types::txn_op_response::Response; use databend_common_meta_types::MetaError; use databend_common_meta_types::TxnRequest; @@ -77,19 +78,26 @@ where Self: Send + Sync, Self: kvapi::KVApi, { + /// Garbage collect dropped tables. + /// + /// Returns the approximate number of metadata keys removed. + /// Note: DeleteByPrefix operations count as 1 but may remove multiple keys. #[fastrace::trace] - async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<(), KVAppError> { + async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result { + let mut num_meta_key_removed = 0; for drop_id in req.drop_ids { match drop_id { DroppedId::Db { db_id, db_name } => { - gc_dropped_db_by_id(self, db_id, &req.tenant, &req.catalog, db_name).await? + num_meta_key_removed += + gc_dropped_db_by_id(self, db_id, &req.tenant, &req.catalog, db_name).await? } DroppedId::Table { name, id } => { - gc_dropped_table_by_id(self, &req.tenant, &req.catalog, &name, &id).await? + num_meta_key_removed += + gc_dropped_table_by_id(self, &req.tenant, &req.catalog, &name, &id).await? } } } - Ok(()) + Ok(num_meta_key_removed) } } @@ -107,12 +115,15 @@ pub const ORPHAN_POSTFIX: &str = "orphan"; /// /// Dropped table can not be accessed by any query, /// so it is safe to remove all the copied files in multiple sub transactions. +/// +/// Returns the number of copied file entries removed. async fn remove_copied_files_for_dropped_table( kv_api: &(impl kvapi::KVApi + ?Sized), table_id: &TableId, -) -> Result<(), MetaError> { +) -> Result { let batch_size = 1024; + let mut num_removed_copied_files = 0; // Loop until: // - all cleaned // - or table is removed from meta-service @@ -122,7 +133,7 @@ async fn remove_copied_files_for_dropped_table( let seq_meta = kv_api.get_pb(table_id).await?; let Some(seq_table_meta) = seq_meta else { - return Ok(()); + return Ok(num_removed_copied_files); }; // TODO: enable this check. Currently when gc db, the table may not be dropped. @@ -145,7 +156,7 @@ async fn remove_copied_files_for_dropped_table( let copied_files = key_stream.take(batch_size).try_collect::>().await?; if copied_files.is_empty() { - return Ok(()); + return Ok(num_removed_copied_files); } for copied_ident in copied_files.iter() { @@ -163,6 +174,8 @@ async fn remove_copied_files_for_dropped_table( copied_files.display() ); + num_removed_copied_files += copied_files.len(); + // Txn failures are ignored for simplicity, since copied files kv pairs are put with ttl, // they will not be leaked permanently, will be cleaned eventually. send_txn(kv_api, txn).await?; @@ -322,29 +335,36 @@ pub async fn get_history_tables_for_gc( /// Permanently remove a dropped database from the meta-service. /// then remove all **dropped and non-dropped** tables in the database. +/// +/// Returns the approximate number of metadata keys removed. +/// Note: DeleteByPrefix operations count as 1 but may remove multiple keys. async fn gc_dropped_db_by_id( kv_api: &(impl GarbageCollectionApi + IndexApi + ?Sized), db_id: u64, tenant: &Tenant, catalog: &String, db_name: String, -) -> Result<(), KVAppError> { +) -> Result { + let mut num_meta_keys_removed = 0; // List tables by tenant, db_id, table_name. let db_id_history_ident = DatabaseIdHistoryIdent::new(tenant, db_name.clone()); let Some(seq_dbid_list) = kv_api.get_pb(&db_id_history_ident).await? else { - return Ok(()); + info!("db_id_history_ident not found for db_id {}", db_id); + return Ok(num_meta_keys_removed); }; let mut db_id_list = seq_dbid_list.data; // If the db_id is not in the list, return. if db_id_list.id_list.remove_first(&db_id).is_none() { - return Ok(()); + info!("db_id_history_ident of db_id {} is empty", db_id); + return Ok(num_meta_keys_removed); } let dbid = DatabaseId { db_id }; let Some(seq_db_meta) = kv_api.get_pb(&dbid).await? else { - return Ok(()); + info!("database meta of db_id {} is empty", db_id); + return Ok(num_meta_keys_removed); }; if seq_db_meta.drop_on.is_none() { @@ -352,7 +372,8 @@ async fn gc_dropped_db_by_id( // In subsequent KV transactions, we also verify that db_meta hasn't changed // to ensure we don't reclaim metadata of the given database that might have been // successfully undropped in a parallel operation. - return Ok(()); + info!("database of db_id {} is not marked as dropped", db_id); + return Ok(num_meta_keys_removed); } // Mark database meta as gc_in_progress if necessary @@ -397,6 +418,7 @@ async fn gc_dropped_db_by_id( }; let dir_name = DirName::new_with_level(db_id_table_name, 1); + let mut num_db_id_table_name_keys_removed = 0; let batch_size = 1024; let key_stream = kv_api.list_pb_keys(&dir_name).await?; let mut chunks = key_stream.chunks(batch_size); @@ -404,6 +426,7 @@ async fn gc_dropped_db_by_id( let mut txn = TxnRequest::default(); use itertools::Itertools; let targets: Vec = targets.into_iter().try_collect()?; + num_db_id_table_name_keys_removed += targets.len(); for target in &targets { txn.if_then.push(txn_op_del(target)); } @@ -423,11 +446,17 @@ async fn gc_dropped_db_by_id( ); } } + info!( + "{} DbIdTableNames cleaned for database {}[{}]", + num_db_id_table_name_keys_removed, db_name, db_id, + ); + num_meta_keys_removed += num_db_id_table_name_keys_removed; } let id_to_name = DatabaseIdToName { db_id }; let Some(seq_name) = kv_api.get_pb(&id_to_name).await? else { - return Ok(()); + info!("id_to_name not found for db_id {}", db_id); + return Ok(num_meta_keys_removed); }; let table_history_ident = TableIdHistoryIdent { @@ -444,7 +473,8 @@ async fn gc_dropped_db_by_id( for tb_id in table_history.id_list.iter() { let table_id_ident = TableId { table_id: *tb_id }; - remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?; + let num_removed_copied_files = + remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?; let _ = remove_data_for_dropped_table( kv_api, tenant, @@ -454,7 +484,7 @@ async fn gc_dropped_db_by_id( &mut txn, ) .await?; - remove_index_for_dropped_table(kv_api, tenant, &table_id_ident, &mut txn).await?; + num_meta_keys_removed += num_removed_copied_files; } txn.condition @@ -481,24 +511,35 @@ async fn gc_dropped_db_by_id( .push(txn_cond_eq_seq(&id_to_name, seq_name.seq)); txn.if_then.push(txn_op_del(&id_to_name)); + // Count removed keys (approximate for DeleteByPrefix operations) + for op in &txn.if_then { + if let Some(Request::Delete(_) | Request::DeleteByPrefix(_)) = &op.request { + num_meta_keys_removed += 1; + } + } + let _resp = kv_api.transaction(txn).await?; - Ok(()) + Ok(num_meta_keys_removed) } /// Permanently remove a dropped table from the meta-service. /// /// The data of the table should already have been removed before calling this method. +/// +/// Returns the approximate number of metadata keys removed. +/// Note: DeleteByPrefix operations count as 1 but may remove multiple keys. async fn gc_dropped_table_by_id( kv_api: &(impl GarbageCollectionApi + IndexApi + ?Sized), tenant: &Tenant, catalog: &String, db_id_table_name: &DBIdTableName, table_id_ident: &TableId, -) -> Result<(), KVAppError> { +) -> Result { // First remove all copied files for the dropped table. // These markers are not part of the table and can be removed in separate transactions. - remove_copied_files_for_dropped_table(kv_api, table_id_ident).await?; + let num_removed_copied_files = + remove_copied_files_for_dropped_table(kv_api, table_id_ident).await?; let mut trials = txn_backoff(None, func_name!()); loop { @@ -532,12 +573,22 @@ async fn gc_dropped_table_by_id( .await?; // 3) + remove_index_for_dropped_table(kv_api, tenant, table_id_ident, &mut txn).await?; + // Count removed keys (approximate for DeleteByPrefix operations) + let mut num_meta_keys_removed = 0; + for op in &txn.if_then { + if let Some(Request::Delete(_) | Request::DeleteByPrefix(_)) = &op.request { + num_meta_keys_removed += 1; + } + } + num_meta_keys_removed += num_removed_copied_files; + let (succ, _responses) = send_txn(kv_api, txn).await?; if succ { - return Ok(()); + return Ok(num_meta_keys_removed); } } } diff --git a/src/query/catalog/src/catalog/interface.rs b/src/query/catalog/src/catalog/interface.rs index bf7ba22394167..dfe090d6c0ee5 100644 --- a/src/query/catalog/src/catalog/interface.rs +++ b/src/query/catalog/src/catalog/interface.rs @@ -324,7 +324,11 @@ pub trait Catalog: DynClone + Send + Sync + Debug { )) } - async fn gc_drop_tables(&self, _req: GcDroppedTableReq) -> Result<()> { + /// Garbage collect dropped tables and databases. + /// + /// Returns the approximate number of metadata keys removed. + /// Note: DeleteByPrefix operations count as 1 but may remove multiple keys. + async fn gc_drop_tables(&self, _req: GcDroppedTableReq) -> Result { Err(ErrorCode::Unimplemented("'gc_drop_tables' not implemented")) } diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs index a90b31a708c7c..4a07c7ad2a471 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs @@ -32,7 +32,7 @@ use databend_common_meta_app::schema::DatabaseId; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::storage::StorageParams; -use databend_common_meta_kvapi::kvapi::KVApi; +use databend_common_meta_kvapi::kvapi::KvApiExt; use databend_common_meta_store::MetaStore; use databend_common_meta_store::MetaStoreProvider; use databend_common_meta_types::TxnRequest; diff --git a/src/query/service/src/catalogs/default/database_catalog.rs b/src/query/service/src/catalogs/default/database_catalog.rs index 68724d2b5c619..d33108ea3680c 100644 --- a/src/query/service/src/catalogs/default/database_catalog.rs +++ b/src/query/service/src/catalogs/default/database_catalog.rs @@ -761,7 +761,7 @@ impl Catalog for DatabaseCatalog { self.mutable_catalog.get_drop_table_infos(req).await } - async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<()> { + async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result { self.mutable_catalog.gc_drop_tables(req).await } diff --git a/src/query/service/src/catalogs/default/mutable_catalog.rs b/src/query/service/src/catalogs/default/mutable_catalog.rs index d653dd329f0e8..4435fd262285b 100644 --- a/src/query/service/src/catalogs/default/mutable_catalog.rs +++ b/src/query/service/src/catalogs/default/mutable_catalog.rs @@ -637,7 +637,7 @@ impl Catalog for MutableCatalog { Ok((tables, drop_ids)) } - async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<()> { + async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result { let meta = self.ctx.meta.clone(); let resp = meta.gc_drop_tables(req).await?; Ok(resp) diff --git a/src/query/service/src/catalogs/default/session_catalog.rs b/src/query/service/src/catalogs/default/session_catalog.rs index c387aa7ad0997..c347c614a47f4 100644 --- a/src/query/service/src/catalogs/default/session_catalog.rs +++ b/src/query/service/src/catalogs/default/session_catalog.rs @@ -421,7 +421,7 @@ impl Catalog for SessionCatalog { self.inner.get_drop_table_infos(req).await } - async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<()> { + async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result { self.inner.gc_drop_tables(req).await } diff --git a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs index 54d37582edfcf..468c01f4dbdea 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs @@ -52,11 +52,14 @@ impl VacuumDropTablesInterpreter { Ok(VacuumDropTablesInterpreter { ctx, plan }) } + /// Vacuum metadata of dropped tables and databases. + /// + /// Returns the approximate number of metadata keys removed. async fn gc_drop_tables( &self, catalog: Arc, drop_ids: Vec, - ) -> Result<()> { + ) -> Result { info!( "vacuum metadata of dropped table from db {:?}", self.plan.database, @@ -83,6 +86,7 @@ impl VacuumDropTablesInterpreter { let chunk_size = 50; + let mut num_meta_keys_removed = 0; // first gc drop table ids for c in drop_db_table_ids.chunks(chunk_size) { info!("vacuum drop {} table ids: {:?}", c.len(), c); @@ -91,7 +95,7 @@ impl VacuumDropTablesInterpreter { catalog: self.plan.catalog.clone(), drop_ids: c.to_vec(), }; - catalog.gc_drop_tables(req).await?; + num_meta_keys_removed += catalog.gc_drop_tables(req).await?; } // then gc drop db ids @@ -102,10 +106,10 @@ impl VacuumDropTablesInterpreter { catalog: self.plan.catalog.clone(), drop_ids: c.to_vec(), }; - catalog.gc_drop_tables(req).await?; + num_meta_keys_removed += catalog.gc_drop_tables(req).await?; } - Ok(()) + Ok(num_meta_keys_removed) } } @@ -224,6 +228,7 @@ impl Interpreter for VacuumDropTablesInterpreter { .map(|id| *containing_db.get(id).unwrap()) .collect::>(); + let mut num_meta_keys_removed = 0; // gc metadata only when not dry run if self.plan.option.dry_run.is_none() { let mut success_dropped_ids = vec![]; @@ -253,15 +258,15 @@ impl Interpreter for VacuumDropTablesInterpreter { info!("failed table ids: {:?}", failed_tables); } - self.gc_drop_tables(catalog, success_dropped_ids).await?; + num_meta_keys_removed = self.gc_drop_tables(catalog, success_dropped_ids).await?; } let success_count = tables_count as u64 - failed_tables.len() as u64; let failed_count = failed_tables.len() as u64; info!( - "=== VACUUM DROP TABLE COMPLETED === success: {}, failed: {}, total: {}", - success_count, failed_count, tables_count + "=== VACUUM DROP TABLE COMPLETED === success: {}, failed: {}, total: {}, num_meta_keys_removed: {}", + success_count, failed_count, tables_count, num_meta_keys_removed ); match files_opt {