Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature](Schema Change)light schema change support add key column #42911

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 22 additions & 23 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,29 +139,6 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
// not call validate request here, because realtime load does not
// contain version info

Status res;
// check delete condition if push for delete
std::queue<DeletePredicatePB> del_preds;
if (push_type == PushType::PUSH_FOR_DELETE) {
DeletePredicatePB del_pred;
TabletSchema tablet_schema;
tablet_schema.copy_from(*tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
tablet_schema.clear_columns();
for (const auto& column_desc : request.columns_desc) {
tablet_schema.append_column(TabletColumn(column_desc));
}
}
res = DeleteHandler::generate_delete_predicate(tablet_schema, request.delete_conditions,
&del_pred);
del_preds.push(del_pred);
if (!res.ok()) {
LOG(WARNING) << "fail to generate delete condition. res=" << res
<< ", tablet=" << tablet->tablet_id();
return res;
}
}

// check if version number exceed limit
if (tablet->exceed_version_limit(config::max_tablet_version_num)) {
return Status::Status::Error<TOO_MANY_VERSION>(
Expand All @@ -182,6 +159,7 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
config::tablet_meta_serialize_size_limit, tablet->tablet_id());
}

// set tablet schema
auto tablet_schema = std::make_shared<TabletSchema>();
tablet_schema->copy_from(*tablet->tablet_schema());
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
Expand All @@ -190,7 +168,28 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
for (const auto& column_desc : request.columns_desc) {
tablet_schema->append_column(TabletColumn(column_desc));
}
if (!request.__isset.schema_version) {
return Status::InternalError("No valid schema version in request, tablet_id={}",
tablet->tablet_id());
}
tablet_schema->set_schema_version(request.schema_version);
}

Status res;
// check delete condition if push for delete
std::queue<DeletePredicatePB> del_preds;
if (push_type == PushType::PUSH_FOR_DELETE) {
DeletePredicatePB del_pred;
res = DeleteHandler::generate_delete_predicate(*tablet_schema, request.delete_conditions,
&del_pred);
del_preds.push(del_pred);
if (!res.ok()) {
LOG(WARNING) << "fail to generate delete condition. res=" << res
<< ", tablet=" << tablet->tablet_id();
return res;
}
}

RowsetSharedPtr rowset_to_add;
// writes
res = _convert_v2(tablet, &rowset_to_add, tablet_schema, push_type);
Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/olap/block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ Status BlockReader::init(const ReaderParams& read_params) {
auto cid = read_params.origin_return_columns->at(i);
for (int j = 0; j < read_params.return_columns.size(); ++j) {
if (read_params.return_columns[j] == cid) {
if (j < _tablet->num_key_columns() || _tablet->keys_type() != AGG_KEYS) {
if (j < _tablet_schema->num_key_columns() ||
_tablet_schema->keys_type() != AGG_KEYS) {
_normal_columns_idx.emplace_back(j);
} else {
_agg_columns_idx.emplace_back(j);
Expand Down Expand Up @@ -475,7 +476,7 @@ size_t BlockReader::_copy_agg_data() {
for (auto idx : _agg_columns_idx) {
auto& dst_column = _stored_data_columns[idx];
if (_stored_has_variable_length_tag[idx]) {
//variable length type should replace ordered
// variable length type should replace ordered
dst_column->clear();
for (size_t i = 0; i < copy_size; i++) {
auto& ref = _stored_row_ref[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,11 @@ private boolean processAddColumn(AddColumnClause alterClause, OlapTable olapTabl

Set<String> newColNameSet = Sets.newHashSet(column.getName());

return addColumnInternal(olapTable, column, columnPos, targetIndexId, baseIndexId, indexSchemaMap,
boolean lightSchemaChange = true;
addColumnInternal(olapTable, column, columnPos, targetIndexId, baseIndexId, indexSchemaMap,
newColNameSet, false, colUniqueIdSupplierMap);
lightSchemaChange = checkLightSchemaChange(olapTable, column, indexSchemaMap);
return lightSchemaChange;
}

private void processAddColumn(AddColumnClause alterClause, Table externalTable, List<Column> newSchema)
Expand Down Expand Up @@ -247,8 +250,9 @@ public boolean processAddColumns(AddColumnsClause alterClause, OlapTable olapTab

boolean lightSchemaChange = true;
for (Column column : columns) {
boolean result = addColumnInternal(olapTable, column, null, targetIndexId, baseIndexId, indexSchemaMap,
addColumnInternal(olapTable, column, null, targetIndexId, baseIndexId, indexSchemaMap,
newColNameSet, ignoreSameColumn, colUniqueIdSupplierMap);
boolean result = checkLightSchemaChange(olapTable, column, indexSchemaMap);
if (!result) {
lightSchemaChange = false;
}
Expand Down Expand Up @@ -932,18 +936,16 @@ private void addColumnInternal(Column newColumn, ColumnPosition columnPos, List<
* @param newColNameSet
* @param ignoreSameColumn
* @param colUniqueIdSupplierMap
* @return true: can light schema change, false: cannot
* @return void
* @throws DdlException
*/
private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnPosition columnPos,
private void addColumnInternal(OlapTable olapTable, Column newColumn, ColumnPosition columnPos,
long targetIndexId, long baseIndexId,
Map<Long, LinkedList<Column>> indexSchemaMap,
Set<String> newColNameSet, boolean ignoreSameColumn,
Map<Long, IntSupplier> colUniqueIdSupplierMap)
throws DdlException {

//only new table generate ColUniqueId, exist table do not.
boolean lightSchemaChange = olapTable.getEnableLightSchemaChange();
String newColName = newColumn.getName();

if (newColumn.isAutoInc()) {
Expand Down Expand Up @@ -1012,14 +1014,6 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP
throw new DdlException("BITMAP_UNION must be used in AGG_KEYS");
}

//type key column do not allow light schema change.
if (newColumn.isKey()) {
if (LOG.isDebugEnabled()) {
LOG.debug("newColumn: {}, isKey()==true", newColumn);
}
lightSchemaChange = false;
}

// check if the new column already exist in base schema.
// do not support adding new column which already exist in base schema.
List<Column> baseSchema = olapTable.getBaseSchema(true);
Expand Down Expand Up @@ -1088,7 +1082,7 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP
checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, true,
baseIndexNewColumnUniqueId);
if (targetIndexId == -1L) {
return lightSchemaChange;
return;
}
// 2. add to rollup
modIndexSchema = indexSchemaMap.get(targetIndexId);
Expand All @@ -1109,7 +1103,7 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP
checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, true,
baseIndexNewColumnUniqueId);
// no specified target index. return
return lightSchemaChange;
return;
} else {
// add to rollup index
List<Column> modIndexSchema = indexSchemaMap.get(targetIndexId);
Expand Down Expand Up @@ -1143,7 +1137,7 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP

if (targetIndexId == -1L) {
// no specified target index. return
return lightSchemaChange;
return;
}

// 2. add to rollup index
Expand All @@ -1153,6 +1147,47 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP
modIndexSchema = indexSchemaMap.get(targetIndexId);
checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, false, rollUpNewColumnUniqueId);
}
return;
}

/**
* @param olapTable
* @param newColumn
* @param indexSchemaMap
* @return true: can light schema change, false: cannot
* @throws DdlException
*/
private boolean checkLightSchemaChange(OlapTable olapTable, Column newColumn,
Map<Long, LinkedList<Column>> indexSchemaMap) {
// only new table generate ColUniqueId, exist table do not.
boolean lightSchemaChange = olapTable.getEnableLightSchemaChange();
if (!lightSchemaChange || !newColumn.isKey()) {
return lightSchemaChange;
}

// check light schema change with add key column
for (Long alterIndexId : indexSchemaMap.keySet()) {
List<Column> alterSchema = indexSchemaMap.get(alterIndexId);
int newColumnPos = -1;
for (int i = 0; i < alterSchema.size(); ++i) {
if (alterSchema.get(i).getName() == newColumn.getName()) {
newColumnPos = i;
}
}

if (newColumnPos >= 0) {
// add key column in short key columns
MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(alterIndexId);
if (newColumnPos < currentIndexMeta.getShortKeyColumnCount()) {
return false;
}

// unique key merge on write
if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
return false;
}
}
}
return lightSchemaChange;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sc_11 --
1 2 2017-10-01 Beijing 10 1 1 20 \N \N

-- !sc_12 --
1 2017-10-01 Beijing 10 1 20

-- !21_agg_multi_1 --
1 3 40 3 3

-- !21_agg_multi_2 --
1 2 30 2 2

-- !21_agg_multi_3 --
1 1 40 1 1

-- !21_agg_multi_1_compaction --
1 3 40 3 3

-- !21_agg_multi_2_compaction --
1 2 30 2 2

-- !21_agg_multi_3_compaction --
1 1 40 1 1

-- !23_base_table_multi_1 --
1 3 40 3 3

-- !23_base_table_multi_2 --
1 2 30 2 2

-- !23_base_table_multi_3 --
1 1 40 1 1

-- !23_base_table_multi_1_compaction --
1 3 40 3 3

-- !23_base_table_multi_2_compaction --
1 2 30 2 2

-- !23_base_table_multi_3_compaction --
1 1 40 1 1

-- !23_rollup_multi_1 --
1 3

-- !23_rollup_multi_2 --
1 2

-- !23_rollup_multi_3 --
1 1

-- !23_rollup_multi_1_compaction --
1 3

-- !23_rollup_multi_2_compaction --
1 2

-- !23_rollup_multi_3_compaction --
1 1

-- !23_agg_drop_1 --
1 3 40

-- !23_agg_drop_2 --
1 2 30

-- !23_agg_drop_3 --
1 1 40

-- !23_agg_drop_1_compaction --
1 3 40

-- !23_agg_drop_2_compaction --
1 2 30

-- !23_agg_drop_3_compaction --
1 1 40

-- !23_agg_condition_del_1 --
1 1 40

-- !23_agg_condition_del_1_compaction --
1 1 40

-- !23_agg_drop_and_add_1 --
1 2017-10-01 Beijing 10 new_key 1 1 15
1 2017-10-01 Beijing 10 new_key 3 1 40

-- !23_agg_drop_and_add_2 --
1 2 40

-- !23_agg_drop_and_add_3 --
1 new_key 1 1 15
1 new_key 3 1 40

-- !23_agg_drop_and_add_1_compaction --
1 2017-10-01 Beijing 10 new_key 1 1 15
1 2017-10-01 Beijing 10 new_key 3 1 40

-- !23_agg_drop_and_add_2_compaction --
1 2 40

-- !23_agg_drop_and_add_3_compaction --
1 new_key 1 1 15
1 new_key 3 1 40

-- !31_duplicate_multi_1 --
2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00
2017-10-01T10:00 1 0 2 10 2017-10-01T12:00
2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00

-- !31_duplicate_multi_2 --
2017-10-01T10:00 1 0 2 10 2017-10-01T12:00
2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00

-- !31_duplicate_multi_3 --
2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00

-- !31_duplicate_multi_1_compaction --
2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00
2017-10-01T10:00 1 0 2 10 2017-10-01T12:00
2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00

-- !31_duplicate_multi_2_compaction --
2017-10-01T10:00 1 0 2 10 2017-10-01T12:00
2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00

-- !31_duplicate_multi_3_compaction --
2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00

-- !41_unique_multi_1 --
1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00
1 Jone Beijing 2 10 1 10010 Haidian 2017-10-01T13:00

-- !41_unique_multi_2 --
1 Jone Beijing 2 10 1 10010 Haidian 2017-10-01T13:00

-- !41_unique_multi_3 --
1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00

-- !41_unique_multi_1_compaction --
1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00
1 Jone Beijing 2 10 1 10010 Haidian 2017-10-01T13:00

-- !41_unique_multi_2_compaction --
1 Jone Beijing 2 10 1 10010 Haidian 2017-10-01T13:00

-- !41_unique_multi_3_compaction --
1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00

Loading
Loading