Skip to content

Commit

Permalink
light schema change support add key column
Browse files Browse the repository at this point in the history
  • Loading branch information
wangqingtao6 committed Nov 5, 2024
1 parent 8d741b6 commit 72f1191
Show file tree
Hide file tree
Showing 5 changed files with 648 additions and 22 deletions.
9 changes: 5 additions & 4 deletions be/src/vec/olap/block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.

#include "vec/olap/block_reader.h"

#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <stdint.h>
Expand All @@ -27,6 +25,8 @@
#include <ostream>
#include <string>

#include "vec/olap/block_reader.h"

// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "cloud/config.h"
#include "common/compiler_util.h" // IWYU pragma: keep
Expand Down Expand Up @@ -198,7 +198,8 @@ Status BlockReader::init(const ReaderParams& read_params) {
auto cid = read_params.origin_return_columns->at(i);
for (int j = 0; j < read_params.return_columns.size(); ++j) {
if (read_params.return_columns[j] == cid) {
if (j < _tablet->num_key_columns() || _tablet->keys_type() != AGG_KEYS) {
if (j < _tablet_schema->num_key_columns() ||
_tablet_schema->keys_type() != AGG_KEYS) {
_normal_columns_idx.emplace_back(j);
} else {
_agg_columns_idx.emplace_back(j);
Expand Down Expand Up @@ -466,7 +467,7 @@ size_t BlockReader::_copy_agg_data() {
for (auto idx : _agg_columns_idx) {
auto& dst_column = _stored_data_columns[idx];
if (_stored_has_variable_length_tag[idx]) {
//variable length type should replace ordered
// variable length type should replace ordered
dst_column->clear();
for (size_t i = 0; i < copy_size; i++) {
auto& ref = _stored_row_ref[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,11 @@ private boolean processAddColumn(AddColumnClause alterClause, OlapTable olapTabl

Set<String> newColNameSet = Sets.newHashSet(column.getName());

return addColumnInternal(olapTable, column, columnPos, targetIndexId, baseIndexId, indexSchemaMap,
boolean lightSchemaChange = true;
addColumnInternal(olapTable, column, columnPos, targetIndexId, baseIndexId, indexSchemaMap,
newColNameSet, false, colUniqueIdSupplierMap);
lightSchemaChange = checkLightSchemaChange(olapTable, column, indexSchemaMap);
return lightSchemaChange;
}

private void processAddColumn(AddColumnClause alterClause, Table externalTable, List<Column> newSchema)
Expand Down Expand Up @@ -246,8 +249,9 @@ public boolean processAddColumns(AddColumnsClause alterClause, OlapTable olapTab

boolean lightSchemaChange = true;
for (Column column : columns) {
boolean result = addColumnInternal(olapTable, column, null, targetIndexId, baseIndexId, indexSchemaMap,
addColumnInternal(olapTable, column, null, targetIndexId, baseIndexId, indexSchemaMap,
newColNameSet, ignoreSameColumn, colUniqueIdSupplierMap);
boolean result = checkLightSchemaChange(olapTable, column, indexSchemaMap);
if (!result) {
lightSchemaChange = false;
}
Expand Down Expand Up @@ -928,18 +932,16 @@ private void addColumnInternal(Column newColumn, ColumnPosition columnPos, List<
* @param newColNameSet
* @param ignoreSameColumn
* @param colUniqueIdSupplierMap
* @return true: can light schema change, false: cannot
* @return void
* @throws DdlException
*/
private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnPosition columnPos,
private void addColumnInternal(OlapTable olapTable, Column newColumn, ColumnPosition columnPos,
long targetIndexId, long baseIndexId,
Map<Long, LinkedList<Column>> indexSchemaMap,
Set<String> newColNameSet, boolean ignoreSameColumn,
Map<Long, IntSupplier> colUniqueIdSupplierMap)
throws DdlException {

//only new table generate ColUniqueId, exist table do not.
boolean lightSchemaChange = olapTable.getEnableLightSchemaChange();
String newColName = newColumn.getName();

if (newColumn.isAutoInc()) {
Expand Down Expand Up @@ -1008,14 +1010,6 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP
throw new DdlException("BITMAP_UNION must be used in AGG_KEYS");
}

//type key column do not allow light schema change.
if (newColumn.isKey()) {
if (LOG.isDebugEnabled()) {
LOG.debug("newColumn: {}, isKey()==true", newColumn);
}
lightSchemaChange = false;
}

// check if the new column already exist in base schema.
// do not support adding new column which already exist in base schema.
List<Column> baseSchema = olapTable.getBaseSchema(true);
Expand Down Expand Up @@ -1084,7 +1078,7 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP
checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, true,
baseIndexNewColumnUniqueId);
if (targetIndexId == -1L) {
return lightSchemaChange;
return;
}
// 2. add to rollup
modIndexSchema = indexSchemaMap.get(targetIndexId);
Expand All @@ -1105,7 +1099,7 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP
checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, true,
baseIndexNewColumnUniqueId);
// no specified target index. return
return lightSchemaChange;
return;
} else {
// add to rollup index
List<Column> modIndexSchema = indexSchemaMap.get(targetIndexId);
Expand Down Expand Up @@ -1139,7 +1133,7 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP

if (targetIndexId == -1L) {
// no specified target index. return
return lightSchemaChange;
return;
}

// 2. add to rollup index
Expand All @@ -1149,6 +1143,64 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP
modIndexSchema = indexSchemaMap.get(targetIndexId);
checkAndAddColumn(modIndexSchema, newColumn, columnPos, newColNameSet, false, rollUpNewColumnUniqueId);
}
return;
}

/**
* @param olapTable
* @param newColumn
* @param indexSchemaMap
* @return true: can light schema change, false: cannot
* @throws DdlException
*/
private boolean checkLightSchemaChange(OlapTable olapTable, Column newColumn,
Map<Long, LinkedList<Column>> indexSchemaMap) {
// only new table generate ColUniqueId, exist table do not.
boolean lightSchemaChange = olapTable.getEnableLightSchemaChange();
if (!lightSchemaChange || !newColumn.isKey()) {
return lightSchemaChange;
}

long baseIndexId = olapTable.getBaseIndexId();
Set<String> baseColumnNames = Sets.newHashSet();

// check light schema change with add key column
for (Long alterIndexId : indexSchemaMap.keySet()) {
List<Column> alterSchema = indexSchemaMap.get(alterIndexId);
int newColumnPos = -1;
for (int i = 0; i < alterSchema.size(); ++i) {
if (alterSchema.get(i).getName() == newColumn.getName()) {
newColumnPos = i;
}
}

if (newColumnPos >= 0) {
// add key column in short key columns
MaterializedIndexMeta currentIndexMeta = olapTable.getIndexMetaByIndexId(alterIndexId);
if (newColumnPos < currentIndexMeta.getShortKeyColumnCount()) {
return false;
}

// not support add key column for mv index
if (alterIndexId != baseIndexId) {
if (baseColumnNames.isEmpty()) {
for (Column col : olapTable.getBaseSchemaKeyColumns()) {
baseColumnNames.add(col.getName());
}
}
for (Column col : olapTable.getKeyColumnsByIndexId(alterIndexId)) {
if (null != col.getDefineExpr() || !baseColumnNames.contains(col.getName())) {
return false;
}
}
}

// unique key merge on write
if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
return false;
}
}
}
return lightSchemaChange;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sc --
1 2 2017-10-01 Beijing 10 1 1 20 \N \N

-- !sc --
1 2017-10-01 Beijing 10 1 20

-- !21_agg_multi_rowset --
1 3 40 3 3

-- !21_agg_multi_rowset --
1 2 30 2 2

-- !21_agg_multi_rowset --
1 1 40 1 1

-- !21_agg_compaction --
1 3 40 3 3

-- !21_agg_compaction --
1 2 30 2 2

-- !21_agg_compaction --
1 1 40 1 1

-- !22_agg_drop_multi_rowset --
1 3 40 3 3

-- !22_agg_drop_multi_rowset --
1 2 30 2 2

-- !22_agg_drop_multi_rowset --
1 1 40 1 1

-- !22_agg_drop_compaction --
1 3 40 3 3

-- !22_agg_drop_compaction --
1 2 30 2 2

-- !22_agg_drop_compaction --
1 1 40 1 1

-- !23_base_table_multi_rowset --
1 3 40 3 3

-- !23_base_table_multi_rowset --
1 2 30 2 2

-- !23_base_table_multi_rowset --
1 1 40 1 1

-- !23_base_table_compaction --
1 3 40 3 3

-- !23_base_table_compaction --
1 2 30 2 2

-- !23_base_table_compaction --
1 1 40 1 1

-- !23_rollup_multi_rowset --
1 3

-- !23_rollup_multi_rowset --
1 2

-- !23_rollup_multi_rowset --
1 1

-- !23_agg_rollup_compaction --
1 3

-- !23_agg_rollup_compaction --
1 2

-- !23_agg_rollup_compaction --
1 1

-- !31_duplicate_multi_rowset --
2017-10-01T10:00 1 0 2 10 2017-10-01T12:00
2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00
2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00

-- !31_duplicate_multi_rowset --
2017-10-01T10:00 1 0 2 10 2017-10-01T12:00
2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00

-- !31_duplicate_multi_rowset --
2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00

-- !31_duplicate_compaction --
2017-10-01T10:00 1 0 2 10 2017-10-01T12:00
2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00
2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00

-- !31_duplicate_compaction --
2017-10-01T10:00 1 0 2 10 2017-10-01T12:00
2017-10-01T10:00 1 0 2 none 10 2017-10-01T12:00

-- !31_duplicate_compaction --
2017-10-01T10:00 1 0 1 none 10 2017-10-01T12:00

-- !41_unique_multi_rowset --
1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00

-- !41_unique_multi_rowset --

-- !41_unique_multi_rowset --
1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00

-- !41_unique_compaction --
1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00

-- !41_unique_compaction --

-- !41_unique_compaction --
1 Jone Beijing 1 10 1 10010 Haidian 2017-10-01T14:00

Loading

0 comments on commit 72f1191

Please sign in to comment.