89 changes: 70 additions & 19 deletions src/meta/api/src/garbage_collection_api.rs
@@ -42,6 +42,7 @@ use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::txn_op::Request;
use databend_common_meta_types::txn_op_response::Response;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::TxnRequest;
@@ -77,19 +78,26 @@ where
Self: Send + Sync,
Self: kvapi::KVApi<Error = MetaError>,
{
/// Garbage collect dropped tables.
///
/// Returns the approximate number of metadata keys removed.
/// Note: DeleteByPrefix operations count as 1 but may remove multiple keys.
#[fastrace::trace]
async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<(), KVAppError> {
async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<usize, KVAppError> {
let mut num_meta_key_removed = 0;
for drop_id in req.drop_ids {
match drop_id {
DroppedId::Db { db_id, db_name } => {
gc_dropped_db_by_id(self, db_id, &req.tenant, &req.catalog, db_name).await?
num_meta_key_removed +=
gc_dropped_db_by_id(self, db_id, &req.tenant, &req.catalog, db_name).await?
}
DroppedId::Table { name, id } => {
gc_dropped_table_by_id(self, &req.tenant, &req.catalog, &name, &id).await?
num_meta_key_removed +=
gc_dropped_table_by_id(self, &req.tenant, &req.catalog, &name, &id).await?
}
}
}
Ok(())
Ok(num_meta_key_removed)
}
}
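A hypothetical caller of the updated API (names such as `meta_api`, `tenant`, and `drop_ids` are illustrative, not taken from this PR) would consume the new return value like this; because `DeleteByPrefix` operations count as one, the figure is a lower bound on the keys actually removed:

```rust
// Sketch only: assumes a `meta_api` implementing GarbageCollectionApi, a
// `tenant: Tenant`, and a `drop_ids: Vec<DroppedId>` collected elsewhere.
let req = GcDroppedTableReq {
    tenant: tenant.clone(),
    catalog: "default".to_string(),
    drop_ids,
};
// Approximate count: each DeleteByPrefix operation is counted once.
let removed = meta_api.gc_drop_tables(req).await?;
info!("gc_drop_tables removed approximately {} meta keys", removed);
```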

@@ -107,12 +115,15 @@ pub const ORPHAN_POSTFIX: &str = "orphan";
///
/// A dropped table cannot be accessed by any query,
/// so it is safe to remove all of its copied files in multiple sub-transactions.
///
/// Returns the number of copied file entries removed.
async fn remove_copied_files_for_dropped_table(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
table_id: &TableId,
) -> Result<(), MetaError> {
) -> Result<usize, MetaError> {
let batch_size = 1024;

let mut num_removed_copied_files = 0;
// Loop until:
// - all cleaned
// - or table is removed from meta-service
@@ -122,7 +133,7 @@ async fn remove_copied_files_for_dropped_table(

let seq_meta = kv_api.get_pb(table_id).await?;
let Some(seq_table_meta) = seq_meta else {
return Ok(());
return Ok(num_removed_copied_files);
};

// TODO: enable this check. Currently when gc db, the table may not be dropped.
@@ -145,7 +156,7 @@ async fn remove_copied_files_for_dropped_table(
let copied_files = key_stream.take(batch_size).try_collect::<Vec<_>>().await?;

if copied_files.is_empty() {
return Ok(());
return Ok(num_removed_copied_files);
}

for copied_ident in copied_files.iter() {
@@ -163,6 +174,8 @@
copied_files.display()
);

num_removed_copied_files += copied_files.len();

// Txn failures are ignored for simplicity: since copied-file kv pairs are put with a TTL,
// they will not be leaked permanently and will be cleaned up eventually.
send_txn(kv_api, txn).await?;
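The doc comment above leans on two properties: a dropped table is invisible to queries, and the copied-file entries carry a TTL. A condensed sketch of the batched clean-up loop follows; helper names are the ones visible in the diff, while `dir_name` (the copied-file key prefix of the table) is built in a collapsed part of the hunk and is assumed to be in scope:

```rust
// Sketch of the loop in remove_copied_files_for_dropped_table, assuming the
// surrounding context (kv_api, table_id, dir_name, batch_size).
let mut num_removed_copied_files = 0;
loop {
    // Stop if the table meta has already been removed from the meta-service.
    if kv_api.get_pb(table_id).await?.is_none() {
        return Ok(num_removed_copied_files);
    }

    // Take at most one batch of copied-file keys under the table's prefix.
    let key_stream = kv_api.list_pb_keys(&dir_name).await?;
    let copied_files = key_stream.take(batch_size).try_collect::<Vec<_>>().await?;
    if copied_files.is_empty() {
        return Ok(num_removed_copied_files);
    }

    // Delete the batch in one best-effort transaction. Failures are tolerated:
    // the entries are written with a TTL and will expire on their own.
    let mut txn = TxnRequest::default();
    for copied_ident in copied_files.iter() {
        txn.if_then.push(txn_op_del(copied_ident));
    }
    num_removed_copied_files += copied_files.len();
    send_txn(kv_api, txn).await?;
}
```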
@@ -322,37 +335,45 @@ pub async fn get_history_tables_for_gc(

/// Permanently remove a dropped database from the meta-service,
/// then remove all **dropped and non-dropped** tables in the database.
///
/// Returns the approximate number of metadata keys removed.
/// Note: DeleteByPrefix operations count as 1 but may remove multiple keys.
async fn gc_dropped_db_by_id(
kv_api: &(impl GarbageCollectionApi + IndexApi + ?Sized),
db_id: u64,
tenant: &Tenant,
catalog: &String,
db_name: String,
) -> Result<(), KVAppError> {
) -> Result<usize, KVAppError> {
let mut num_meta_keys_removed = 0;
// List tables by tenant, db_id, table_name.
let db_id_history_ident = DatabaseIdHistoryIdent::new(tenant, db_name.clone());
let Some(seq_dbid_list) = kv_api.get_pb(&db_id_history_ident).await? else {
return Ok(());
info!("db_id_history_ident not found for db_id {}", db_id);
return Ok(num_meta_keys_removed);
};

let mut db_id_list = seq_dbid_list.data;

// If the db_id is not in the list, return.
if db_id_list.id_list.remove_first(&db_id).is_none() {
return Ok(());
info!("db_id_history_ident of db_id {} is empty", db_id);
return Ok(num_meta_keys_removed);
}

let dbid = DatabaseId { db_id };
let Some(seq_db_meta) = kv_api.get_pb(&dbid).await? else {
return Ok(());
info!("database meta of db_id {} is empty", db_id);
return Ok(num_meta_keys_removed);
};

if seq_db_meta.drop_on.is_none() {
// If db is not marked as dropped, just ignore the gc request and return directly.
// In subsequent KV transactions, we also verify that db_meta hasn't changed
// to ensure we don't reclaim metadata of the given database that might have been
// successfully undropped in a parallel operation.
return Ok(());
info!("database of db_id {} is not marked as dropped", db_id);
return Ok(num_meta_keys_removed);
}

// Mark database meta as gc_in_progress if necessary
@@ -397,13 +418,15 @@ async fn gc_dropped_db_by_id(
};
let dir_name = DirName::new_with_level(db_id_table_name, 1);

let mut num_db_id_table_name_keys_removed = 0;
let batch_size = 1024;
let key_stream = kv_api.list_pb_keys(&dir_name).await?;
let mut chunks = key_stream.chunks(batch_size);
while let Some(targets) = chunks.next().await {
let mut txn = TxnRequest::default();
use itertools::Itertools;
let targets: Vec<DBIdTableName> = targets.into_iter().try_collect()?;
num_db_id_table_name_keys_removed += targets.len();
for target in &targets {
txn.if_then.push(txn_op_del(target));
}
@@ -423,11 +446,17 @@
);
}
}
info!(
"{} DbIdTableNames cleaned for database {}[{}]",
num_db_id_table_name_keys_removed, db_name, db_id,
);
num_meta_keys_removed += num_db_id_table_name_keys_removed;
}

let id_to_name = DatabaseIdToName { db_id };
let Some(seq_name) = kv_api.get_pb(&id_to_name).await? else {
return Ok(());
info!("id_to_name not found for db_id {}", db_id);
return Ok(num_meta_keys_removed);
};

let table_history_ident = TableIdHistoryIdent {
@@ -444,7 +473,8 @@
for tb_id in table_history.id_list.iter() {
let table_id_ident = TableId { table_id: *tb_id };

remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?;
let num_removed_copied_files =
remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?;
let _ = remove_data_for_dropped_table(
kv_api,
tenant,
Expand All @@ -454,7 +484,7 @@ async fn gc_dropped_db_by_id(
&mut txn,
)
.await?;
remove_index_for_dropped_table(kv_api, tenant, &table_id_ident, &mut txn).await?;
num_meta_keys_removed += num_removed_copied_files;
}

txn.condition
@@ -481,24 +511,35 @@ async fn gc_dropped_db_by_id(
.push(txn_cond_eq_seq(&id_to_name, seq_name.seq));
txn.if_then.push(txn_op_del(&id_to_name));

// Count removed keys (approximate for DeleteByPrefix operations)
for op in &txn.if_then {
if let Some(Request::Delete(_) | Request::DeleteByPrefix(_)) = &op.request {
num_meta_keys_removed += 1;
}
}

let _resp = kv_api.transaction(txn).await?;

Ok(())
Ok(num_meta_keys_removed)
}
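The comment earlier in the function explains why the deletes are guarded: the database might have been un-dropped concurrently. Below is a minimal sketch of that guard pattern, using only helpers and values visible in the diff (`dbid`, `seq_db_meta`, `id_to_name`, `seq_name`); the full set of conditions in the PR is partly collapsed in this view, so this is illustrative rather than the exact transaction built above:

```rust
// Sketch only: couple every delete with a seq-equality condition so the whole
// transaction fails if the database meta changed (e.g. it was un-dropped)
// between the read and the delete.
let mut txn = TxnRequest::default();

// The database meta and the id->name mapping must still be at the seq we read.
txn.condition.push(txn_cond_eq_seq(&dbid, seq_db_meta.seq));
txn.condition.push(txn_cond_eq_seq(&id_to_name, seq_name.seq));

// Only then remove them.
txn.if_then.push(txn_op_del(&dbid));
txn.if_then.push(txn_op_del(&id_to_name));

let (succ, _responses) = send_txn(kv_api, txn).await?;
if !succ {
    // Another writer touched the database meta; skip the removal and let a
    // later vacuum run handle it.
    return Ok(num_meta_keys_removed);
}
```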

/// Permanently remove a dropped table from the meta-service.
///
/// The data of the table should already have been removed before calling this method.
///
/// Returns the approximate number of metadata keys removed.
/// Note: DeleteByPrefix operations count as 1 but may remove multiple keys.
async fn gc_dropped_table_by_id(
kv_api: &(impl GarbageCollectionApi + IndexApi + ?Sized),
tenant: &Tenant,
catalog: &String,
db_id_table_name: &DBIdTableName,
table_id_ident: &TableId,
) -> Result<(), KVAppError> {
) -> Result<usize, KVAppError> {
// First remove all copied files for the dropped table.
// These markers are not part of the table and can be removed in separate transactions.
remove_copied_files_for_dropped_table(kv_api, table_id_ident).await?;
let num_removed_copied_files =
remove_copied_files_for_dropped_table(kv_api, table_id_ident).await?;

let mut trials = txn_backoff(None, func_name!());
loop {
Expand Down Expand Up @@ -532,12 +573,22 @@ async fn gc_dropped_table_by_id(
.await?;

// 3)

remove_index_for_dropped_table(kv_api, tenant, table_id_ident, &mut txn).await?;

// Count removed keys (approximate for DeleteByPrefix operations)
let mut num_meta_keys_removed = 0;
for op in &txn.if_then {
if let Some(Request::Delete(_) | Request::DeleteByPrefix(_)) = &op.request {
num_meta_keys_removed += 1;
}
}
num_meta_keys_removed += num_removed_copied_files;

let (succ, _responses) = send_txn(kv_api, txn).await?;

if succ {
return Ok(());
return Ok(num_meta_keys_removed);
}
}
}
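The same counting loop over `txn.if_then` appears in both `gc_dropped_db_by_id` and `gc_dropped_table_by_id`. A hypothetical helper (not part of this PR) could factor it out, using the `Request` enum imported at the top of the file:

```rust
/// Count the delete operations queued in a transaction's `if_then` branch.
///
/// A DeleteByPrefix operation still counts as 1 even though it may remove
/// many keys, so the result is approximate (a lower bound).
fn count_removed_keys(txn: &TxnRequest) -> usize {
    txn.if_then
        .iter()
        .filter(|op| {
            matches!(
                &op.request,
                Some(Request::Delete(_) | Request::DeleteByPrefix(_))
            )
        })
        .count()
}
```

Either call site could then do `num_meta_keys_removed += count_removed_keys(&txn);` just before sending the transaction.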
6 changes: 5 additions & 1 deletion src/query/catalog/src/catalog/interface.rs
@@ -324,7 +324,11 @@ pub trait Catalog: DynClone + Send + Sync + Debug {
))
}

async fn gc_drop_tables(&self, _req: GcDroppedTableReq) -> Result<()> {
/// Garbage collect dropped tables and databases.
///
/// Returns the approximate number of metadata keys removed.
/// Note: DeleteByPrefix operations count as 1 but may remove multiple keys.
async fn gc_drop_tables(&self, _req: GcDroppedTableReq) -> Result<usize> {
Err(ErrorCode::Unimplemented("'gc_drop_tables' not implemented"))
}

2 changes: 1 addition & 1 deletion src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
@@ -32,7 +32,7 @@ use databend_common_meta_app::schema::DatabaseId;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::storage::StorageParams;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::KvApiExt;
use databend_common_meta_store::MetaStore;
use databend_common_meta_store::MetaStoreProvider;
use databend_common_meta_types::TxnRequest;
2 changes: 1 addition & 1 deletion src/query/service/src/catalogs/default/database_catalog.rs
@@ -761,7 +761,7 @@ impl Catalog for DatabaseCatalog {
self.mutable_catalog.get_drop_table_infos(req).await
}

async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<()> {
async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<usize> {
self.mutable_catalog.gc_drop_tables(req).await
}

2 changes: 1 addition & 1 deletion src/query/service/src/catalogs/default/mutable_catalog.rs
@@ -637,7 +637,7 @@ impl Catalog for MutableCatalog {
Ok((tables, drop_ids))
}

async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<()> {
async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<usize> {
let meta = self.ctx.meta.clone();
let resp = meta.gc_drop_tables(req).await?;
Ok(resp)
2 changes: 1 addition & 1 deletion src/query/service/src/catalogs/default/session_catalog.rs
@@ -421,7 +421,7 @@ impl Catalog for SessionCatalog {
self.inner.get_drop_table_infos(req).await
}

async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<()> {
async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<usize> {
self.inner.gc_drop_tables(req).await
}

@@ -52,11 +52,14 @@ impl VacuumDropTablesInterpreter {
Ok(VacuumDropTablesInterpreter { ctx, plan })
}

/// Vacuum metadata of dropped tables and databases.
///
/// Returns the approximate number of metadata keys removed.
async fn gc_drop_tables(
&self,
catalog: Arc<dyn Catalog>,
drop_ids: Vec<DroppedId>,
) -> Result<()> {
) -> Result<usize> {
info!(
"vacuum metadata of dropped table from db {:?}",
self.plan.database,
@@ -83,6 +86,7 @@

let chunk_size = 50;

let mut num_meta_keys_removed = 0;
// first gc drop table ids
for c in drop_db_table_ids.chunks(chunk_size) {
info!("vacuum drop {} table ids: {:?}", c.len(), c);
@@ -91,7 +95,7 @@
catalog: self.plan.catalog.clone(),
drop_ids: c.to_vec(),
};
catalog.gc_drop_tables(req).await?;
num_meta_keys_removed += catalog.gc_drop_tables(req).await?;
}

// then gc drop db ids
@@ -102,10 +106,10 @@
catalog: self.plan.catalog.clone(),
drop_ids: c.to_vec(),
};
catalog.gc_drop_tables(req).await?;
num_meta_keys_removed += catalog.gc_drop_tables(req).await?;
}

Ok(())
Ok(num_meta_keys_removed)
}
}

@@ -224,6 +228,7 @@ impl Interpreter for VacuumDropTablesInterpreter {
.map(|id| *containing_db.get(id).unwrap())
.collect::<HashSet<_>>();

let mut num_meta_keys_removed = 0;
// gc metadata only when not dry run
if self.plan.option.dry_run.is_none() {
let mut success_dropped_ids = vec![];
@@ -253,15 +258,15 @@
info!("failed table ids: {:?}", failed_tables);
}

self.gc_drop_tables(catalog, success_dropped_ids).await?;
num_meta_keys_removed = self.gc_drop_tables(catalog, success_dropped_ids).await?;
}

let success_count = tables_count as u64 - failed_tables.len() as u64;
let failed_count = failed_tables.len() as u64;

info!(
"=== VACUUM DROP TABLE COMPLETED === success: {}, failed: {}, total: {}",
success_count, failed_count, tables_count
"=== VACUUM DROP TABLE COMPLETED === success: {}, failed: {}, total: {}, num_meta_keys_removed: {}",
success_count, failed_count, tables_count, num_meta_keys_removed
);

match files_opt {