Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](bloom filter)Fix drop column with bloom filter index (#44361) #44480

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,12 @@ private boolean processDropColumn(DropColumnClause alterClause, OlapTable olapTa
// drop bloom filter column
Set<String> bfCols = olapTable.getCopiedBfColumns();
if (bfCols != null) {
Set<String> newBfCols = new HashSet<>();
Set<String> newBfCols = null;
for (String bfCol : bfCols) {
if (!bfCol.equalsIgnoreCase(dropColName)) {
if (newBfCols == null) {
newBfCols = Sets.newHashSet();
}
newBfCols.add(bfCol);
}
}
Expand Down Expand Up @@ -2736,6 +2739,25 @@ public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable o
LOG.info("finished modify table's add or drop or modify columns. table: {}, job: {}, is replay: {}",
olapTable.getName(), jobId, isReplay);
}
// for bloom filter, rebuild bloom filter info by table schema in replay
if (isReplay) {
Set<String> bfCols = olapTable.getCopiedBfColumns();
if (bfCols != null) {
List<Column> columns = olapTable.getBaseSchema();
Set<String> newBfCols = null;
for (String bfCol : bfCols) {
for (Column column : columns) {
if (column.getName().equalsIgnoreCase(bfCol)) {
if (newBfCols == null) {
newBfCols = Sets.newHashSet();
}
newBfCols.add(column.getName());
}
}
}
olapTable.setBloomFilterInfo(newBfCols, olapTable.getBfFpp());
}
}
}

public void replayModifyTableLightSchemaChange(TableAddOrDropColumnsInfo info) throws MetaNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
1 1
1 1 1

-- !select --
1 \N
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,93 @@ suite("test_bloom_filter_drop_column") {

sql """CREATE TABLE IF NOT EXISTS ${table_name} (
`a` varchar(150) NULL,
`c1` varchar(10)
`c1` varchar(10),
`c2` varchar(10)
) ENGINE=OLAP
DUPLICATE KEY(`a`)
DISTRIBUTED BY HASH(`a`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"bloom_filter_columns" = "c1",
"bloom_filter_columns" = "c1, c2",
"in_memory" = "false",
"storage_format" = "V2"
)"""
def timeout = 60000
def delta_time = 1000
def alter_res = "null"
def useTime = 0

sql """INSERT INTO ${table_name} values ('1', '1')"""
// Polls the most recent column-alter job of `tableName` (one SHOW ALTER TABLE COLUMN
// query per `delta_time` ms) until the returned row contains "FINISHED", and fails the
// test if that has not happened after `OpTimeout` ms worth of polling.
//   tableName - table whose latest alter job is awaited
//   OpTimeout - upper bound, in milliseconds, on the accumulated polling interval
// Side effects: updates the enclosing `alter_res` and `useTime` variables.
def wait_for_latest_op_on_table_finish = { tableName, OpTimeout ->
    // Completion is tracked explicitly: the loop counter never exceeds OpTimeout, so
    // comparing the elapsed time against OpTimeout after the loop cannot detect a timeout.
    boolean finished = false
    for (int t = delta_time; t <= OpTimeout; t += delta_time) {
        alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${tableName}" ORDER BY CreateTime DESC LIMIT 1;"""
        alter_res = alter_res.toString()
        if (alter_res.contains("FINISHED")) {
            finished = true
            sleep(3000) // wait change table state to normal
            // Log the table that was actually polled (the parameter, not an outer variable).
            logger.info(tableName + " latest alter job finished, detail: " + alter_res)
            break
        }
        useTime = t
        sleep(delta_time)
    }
    assertTrue(finished, "wait_for_latest_op_on_table_finish timeout")
}

// Repeatedly runs SHOW CREATE TABLE on `tableName` until the returned DDL text
// (row 0, column 1) either contains `expectedCondition` (when `contains` is true) or
// does not contain it (when `contains` is false). Makes at most `maxRetries` attempts,
// pausing `waitSeconds` seconds between them; if none succeeds, queries one last time
// and asserts on that result so the failure message shows the actual DDL.
def assertShowCreateTableWithRetry = { tableName, expectedCondition, contains, maxRetries, waitSeconds ->
    for (int attempt = 0; attempt < maxRetries; attempt++) {
        def res = sql """SHOW CREATE TABLE ${tableName}"""
        log.info("Attempt ${attempt + 1}: show table: ${res}")

        // Only inspect the DDL text when the query actually returned a row.
        boolean conditionMet = false
        if (res && res.size() > 0) {
            def ddl = res[0][1]
            conditionMet = contains ? ddl.contains(expectedCondition) : !ddl.contains(expectedCondition)
        }

        if (conditionMet) {
            logger.info("Attempt ${attempt + 1}: Condition met.")
            return
        }
        logger.warn("Attempt ${attempt + 1}: Condition not met. Retrying after ${waitSeconds} second(s)...")

        // No pause after the last attempt; fall through to the final check instead.
        if (attempt + 1 < maxRetries) {
            sleep(waitSeconds * 1000)
        }
    }

    // All retries exhausted: query once more and fail with a descriptive message.
    def finalRes = sql """SHOW CREATE TABLE ${tableName}"""
    log.info("Final attempt: show table: ${finalRes}")
    assertTrue(finalRes && finalRes.size() > 0, "SHOW CREATE TABLE return empty or null")
    if (!contains) {
        assertTrue(!finalRes[0][1].contains(expectedCondition), "expected not to contain \"${expectedCondition}\", actual: ${finalRes[0][1]}")
    } else {
        assertTrue(finalRes[0][1].contains(expectedCondition), "expected to contain \"${expectedCondition}\", actual: ${finalRes[0][1]}")
    }
}

sql """INSERT INTO ${table_name} values ('1', '1', '1')"""
sql "sync"

qt_select """select * from ${table_name} order by a"""

assertShowCreateTableWithRetry(table_name, "\"bloom_filter_columns\" = \"c1, c2\"", true, 3, 30)
// drop column c1
sql """ALTER TABLE ${table_name} DROP COLUMN c1"""
// show create table
def res = sql """SHOW CREATE TABLE ${table_name}"""
assert res[0][1].contains("\"bloom_filter_columns\" = \"\"")
wait_for_latest_op_on_table_finish(table_name, timeout)
sql "sync"

// show create table with retry logic
assertShowCreateTableWithRetry(table_name, "\"bloom_filter_columns\" = \"c2\"", true, 3, 30)

// drop column c2
sql """ALTER TABLE ${table_name} DROP COLUMN c2"""
wait_for_latest_op_on_table_finish(table_name, timeout)
sql "sync"

// show create table with retry logic
assertShowCreateTableWithRetry(table_name, "\"bloom_filter_columns\" = \"\"", false, 3, 30)

// add new column c1
sql """ALTER TABLE ${table_name} ADD COLUMN c1 ARRAY<STRING>"""
wait_for_latest_op_on_table_finish(table_name, timeout)
sql "sync"

// insert data
sql """INSERT INTO ${table_name} values ('2', null)"""
sql "sync"
// select data
qt_select """select * from ${table_name} order by a"""
}
Loading