Skip to content

Commit 51a67dd

Browse files
authored
fix: vacuum tables that are dropped by create or replace statement (#18751)
* fix: vacuum tables dropped by `create or replace` statement. Tables implicitly dropped by a `create or replace` statement were not handled correctly: the vacuum operation collects candidate tables according to the `drop_on` mark in the table metadata, but during `create or replace` the table being replaced is NOT marked with `drop_on`. With this PR, table ids that have no `drop_on` mark are also selected as candidates for vacuum, except for the largest table id (of the same table name), which is the live table and must be kept. * fix typo * tweak integration test * fix lint * refactor: include all tables of a given dropped db
1 parent ee0b686 commit 51a67dd

File tree

5 files changed

+166
-14
lines changed

5 files changed

+166
-14
lines changed

โ€ŽCargo.lockโ€Ž

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

โ€Žsrc/meta/api/src/garbage_collection_api.rsโ€Ž

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashSet;
1516
use std::ops::Range;
1617

1718
use chrono::DateTime;
@@ -179,6 +180,7 @@ pub async fn get_history_tables_for_gc(
179180
drop_time_range: Range<Option<DateTime<Utc>>>,
180181
db_id: u64,
181182
limit: usize,
183+
db_is_dropped: bool,
182184
) -> Result<Vec<TableNIV>, KVAppError> {
183185
info!(
184186
"get_history_tables_for_gc: db_id {}, limit {}",
@@ -194,9 +196,37 @@ pub async fn get_history_tables_for_gc(
194196

195197
let mut args = vec![];
196198

199+
// For table ids of each table with the same name, we keep the largest one, e.g.
200+
// ```
201+
// create or replace table t ... ; -- table_id 1
202+
// create or replace table t ... ; -- table_id 2
203+
// ```
204+
// table_id 2 will be kept for table t (we do not care about the table name though),
205+
//
206+
// Please note that the largest table id might be "invisible", e.g.
207+
// ```
208+
// create or replace table t ... ; -- table_id 1
209+
// create or replace table t ... ; -- table_id 2
210+
// drop table t ... ;
211+
// ```
212+
// In this case, table_id 2 is marked as dropped.
213+
214+
let mut latest_table_ids = HashSet::with_capacity(table_history_kvs.len());
215+
197216
for (ident, table_history) in table_history_kvs {
198-
for table_id in table_history.id_list.iter() {
199-
args.push((TableId::new(*table_id), ident.table_name.clone()));
217+
let id_list = &table_history.id_list;
218+
if !id_list.is_empty() {
219+
// Make sure that the last table id is also the max one (of each table name).
220+
let last_id = id_list.last().unwrap();
221+
{
222+
let max_id = id_list.iter().max().unwrap();
223+
assert_eq!(max_id, last_id);
224+
}
225+
latest_table_ids.insert(*last_id);
226+
227+
for table_id in id_list.iter() {
228+
args.push((TableId::new(*table_id), ident.table_name.clone()));
229+
}
200230
}
201231
}
202232

@@ -228,10 +258,39 @@ pub async fn get_history_tables_for_gc(
228258
continue;
229259
};
230260

231-
if !drop_time_range.contains(&seq_meta.data.drop_on) {
232-
debug!("table {:?} is not in drop_time_range", seq_meta.data);
233-
num_out_of_time_range += 1;
234-
continue;
261+
if !db_is_dropped {
262+
// We shall not vacuum it if the table id is the largest table id of some table, but not marked with drop_on,
263+
{
264+
let is_visible_last_active_table = seq_meta.data.drop_on.is_none()
265+
&& latest_table_ids.contains(&table_id.table_id);
266+
267+
if is_visible_last_active_table {
268+
debug!(
269+
"Table id {:?} of {} is the last visible one, not available for vacuum",
270+
table_id, table_name,
271+
);
272+
num_out_of_time_range += 1;
273+
continue;
274+
}
275+
}
276+
277+
// Now the table id is
278+
// - Either marked with drop_on
279+
// We use its `drop_on` to do the time range filtering
280+
// - Or has no drop_on, but is not the last visible one of the table
281+
// It is still available for vacuum, and we use its `updated_on` to do the time range filtering
282+
283+
let time_point = if seq_meta.drop_on.is_none() {
284+
&Some(seq_meta.updated_on)
285+
} else {
286+
&seq_meta.drop_on
287+
};
288+
289+
if !drop_time_range.contains(time_point) {
290+
debug!("table {:?} is not in drop_time_range", seq_meta.data);
291+
num_out_of_time_range += 1;
292+
continue;
293+
}
235294
}
236295

237296
filter_tb_infos.push(TableNIV::new(

โ€Žsrc/meta/api/src/table_api.rsโ€Ž

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1601,6 +1601,7 @@ where
16011601
table_drop_time_range,
16021602
db_info.database_id.db_id,
16031603
capacity,
1604+
vacuum_db,
16041605
)
16051606
.await?;
16061607

@@ -1647,9 +1648,14 @@ where
16471648
};
16481649

16491650
let database_id = seq_db_id.data;
1650-
let table_nivs =
1651-
get_history_tables_for_gc(self, drop_time_range.clone(), database_id.db_id, the_limit)
1652-
.await?;
1651+
let table_nivs = get_history_tables_for_gc(
1652+
self,
1653+
drop_time_range.clone(),
1654+
database_id.db_id,
1655+
the_limit,
1656+
false,
1657+
)
1658+
.await?;
16531659

16541660
let mut drop_ids = vec![];
16551661
let mut vacuum_tables = vec![];

โ€Žsrc/query/ee/Cargo.tomlโ€Ž

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ uuid = { workspace = true }
6666

6767
[dev-dependencies]
6868
databend-common-functions = { workspace = true }
69+
databend-common-meta-kvapi = { workspace = true }
6970
jsonb = { workspace = true }
7071
tantivy = { workspace = true }
7172
walkdir = { workspace = true }

โ€Žsrc/query/ee/tests/it/storages/fuse/operations/vacuum.rsโ€Ž

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use databend_common_meta_app::schema::DatabaseId;
3232
use databend_common_meta_app::schema::TableInfo;
3333
use databend_common_meta_app::schema::TableMeta;
3434
use databend_common_meta_app::storage::StorageParams;
35+
use databend_common_meta_kvapi::kvapi::KVApi;
3536
use databend_common_meta_store::MetaStore;
3637
use databend_common_meta_store::MetaStoreProvider;
3738
use databend_common_meta_types::TxnRequest;
@@ -713,17 +714,101 @@ async fn test_gc_in_progress_db_not_undroppable() -> Result<()> {
713714
Ok(())
714715
}
715716

717+
#[tokio::test(flavor = "multi_thread")]
718+
async fn test_vacuum_drop_create_or_replace() -> Result<()> {
719+
// vacuum dropped tables by specific database names
720+
test_vacuum_drop_create_or_replace_impl(&[
721+
"vacuum drop table from db1",
722+
"vacuum drop table from db2",
723+
])
724+
.await?;
725+
726+
// vacuum dropped tables all
727+
test_vacuum_drop_create_or_replace_impl(&["vacuum drop table"]).await?;
728+
Ok(())
729+
}
730+
731+
async fn test_vacuum_drop_create_or_replace_impl(vacuum_stmts: &[&str]) -> Result<()> {
732+
// Setup
733+
let meta = new_local_meta().await;
734+
let endpoints = meta.endpoints.clone();
735+
736+
let mut ee_setup = EESetup::new();
737+
let config = ee_setup.config_mut();
738+
config.meta.endpoints = endpoints.clone();
739+
740+
let fixture = TestFixture::setup_with_custom(ee_setup).await?;
741+
742+
// Adjust retention period to 0, so that dropped tables will be available for vacuum immediately
743+
let session = fixture.default_session();
744+
session.get_settings().set_data_retention_time_in_days(0)?;
745+
746+
// Prepare test dbs and tables
747+
// - 2 db ids
748+
// - 6 table ids
749+
// - db2.t1 explicitly dropped
750+
751+
let sqls = vec![
752+
"create database db1",
753+
"create or replace table db1.t1 (a int)",
754+
"create or replace table db1.t1 (a int) as select 1",
755+
"create or replace table db1.t1 (a int) as select 2",
756+
"create database db2",
757+
"create or replace table db2.t1 (a int) as select 1",
758+
"create or replace table db2.t1 (a int) as select 2",
759+
"create or replace table db2.t1 (a int) as select 3",
760+
"DROP TABLE DB2.T1",
761+
];
762+
763+
for sql in sqls {
764+
fixture.execute_command(sql).await?;
765+
}
766+
767+
// there should be 6 table ids : create or replace table 6 times
768+
let prefix = "__fd_table_by_id";
769+
let items = meta.list_kv_collect(prefix).await?;
770+
assert_eq!(items.len(), 6);
771+
772+
// there should be ownerships for 2 db ids and 5 table ids, one of the ownership is revoked by drop table
773+
let prefix = "__fd_object_owners";
774+
let items = meta.list_kv_collect(prefix).await?;
775+
assert_eq!(items.len(), 7);
776+
777+
for sql in vacuum_stmts {
778+
fixture.execute_command(sql).await?;
779+
}
780+
781+
// After vacuum, 1 table ids left
782+
let prefix = "__fd_table_by_id";
783+
let items = meta.list_kv_collect(prefix).await?;
784+
assert_eq!(items.len(), 1);
785+
786+
let prefix = "__fd_table_id_to_name";
787+
let items = meta.list_kv_collect(prefix).await?;
788+
assert_eq!(items.len(), 1);
789+
790+
// There are ownership objs of 2 dbs and 1 tables
791+
let prefix = "__fd_object_owners";
792+
let items = meta.list_kv_collect(prefix).await?;
793+
assert_eq!(items.len(), 3);
794+
795+
// db1.t1 should still be accessible
796+
fixture.execute_command("select * from db1.t1").await?;
797+
// db2.t1 should not exist
798+
assert!(fixture
799+
.execute_command("select * from db2.t1")
800+
.await
801+
.is_err());
802+
Ok(())
803+
}
804+
716805
async fn new_local_meta() -> MetaStore {
717806
let version = &BUILD_INFO;
718807
let meta_config = MetaConfig::default();
719808
let meta = {
720809
let config = meta_config.to_meta_grpc_client_conf(version);
721810
let provider = Arc::new(MetaStoreProvider::new(config));
722-
provider
723-
.create_meta_store()
724-
.await
725-
.map_err(|e| ErrorCode::MetaServiceError(format!("Failed to create meta store: {}", e)))
726-
.unwrap()
811+
provider.create_meta_store().await.unwrap()
727812
};
728813
meta
729814
}

0 commit comments

Comments
(0)