diff --git a/bitsail-base/src/main/java/com/bytedance/bitsail/base/extension/SchemaAlignmentable.java b/bitsail-base/src/main/java/com/bytedance/bitsail/base/extension/SchemaAlignmentable.java new file mode 100644 index 000000000..2d1804bff --- /dev/null +++ b/bitsail-base/src/main/java/com/bytedance/bitsail/base/extension/SchemaAlignmentable.java @@ -0,0 +1,25 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.base.extension; + +import java.io.Serializable; + +public interface SchemaAlignmentable extends Serializable { + + void align(T currentSchema, T targetSchema) throws Exception; + +} diff --git a/bitsail-common/src/main/java/com/bytedance/bitsail/common/option/WriterOptions.java b/bitsail-common/src/main/java/com/bytedance/bitsail/common/option/WriterOptions.java index fe16b550c..a08bf49f9 100644 --- a/bitsail-common/src/main/java/com/bytedance/bitsail/common/option/WriterOptions.java +++ b/bitsail-common/src/main/java/com/bytedance/bitsail/common/option/WriterOptions.java @@ -108,5 +108,12 @@ interface BaseWriterOptions { ConfigOption WRITE_MODE = key(WRITER_PREFIX + "write_mode") .defaultValue("overwrite"); + + /** + * Whether to align schema during data writing + */ + ConfigOption SCHEMA_ALIGN = + key(WRITER_PREFIX + "schema_align") + .defaultValue(false); } } diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/error/KuduErrorCode.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/error/KuduErrorCode.java index 3718613c3..cc199b45d 100644 --- a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/error/KuduErrorCode.java +++ b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/error/KuduErrorCode.java @@ -27,7 +27,8 @@ public enum KuduErrorCode implements ErrorCode { UNSUPPORTED_TYPE("Kudu-04", "Type is not supported"), ILLEGAL_VALUE("Kudu-05", "Value type is illegal"), SPLIT_ERROR("Kudu-06", "Something wrong with creating splits."), - PREDICATE_ERROR("Kudu-07", "Something wrong with kudu predicates."); + PREDICATE_ERROR("Kudu-07", "Something wrong with kudu predicates."), + ALTER_TABLE_ERROR("Kudu-08", "Something wrong when alter table."); private final String code; diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/sink/KuduSink.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/sink/KuduSink.java index 5de20ca60..b65bd329a 100644 --- a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/sink/KuduSink.java +++ b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/sink/KuduSink.java @@ -27,17 +27,40 @@ import com.bytedance.bitsail.connector.kudu.core.KuduConstants; import com.bytedance.bitsail.connector.kudu.core.KuduFactory; import com.bytedance.bitsail.connector.kudu.option.KuduWriterOptions; +import com.bytedance.bitsail.connector.kudu.sink.schema.KuduSchemaAlignment; import com.bytedance.bitsail.connector.kudu.util.KuduSchemaUtils; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Type; import org.apache.kudu.client.KuduTable; import java.io.Serializable; +import java.util.HashMap; import java.util.List; +import java.util.stream.Collectors; public class KuduSink implements Sink { private BitSailConfiguration writerConf; + private static final HashMap TYPE_MAPPINGS = new HashMap<>(); + + static { + TYPE_MAPPINGS.put("int8", Type.INT8); + TYPE_MAPPINGS.put("int16", Type.INT16); + TYPE_MAPPINGS.put("int32", Type.INT32); + TYPE_MAPPINGS.put("int64", Type.INT64); + TYPE_MAPPINGS.put("binary", Type.BINARY); + TYPE_MAPPINGS.put("string", Type.STRING); + TYPE_MAPPINGS.put("bool", Type.BOOL); + TYPE_MAPPINGS.put("float", Type.FLOAT); + TYPE_MAPPINGS.put("double", Type.DOUBLE); + TYPE_MAPPINGS.put("unixtime_micros", Type.UNIXTIME_MICROS); + TYPE_MAPPINGS.put("decimal", Type.DECIMAL); + TYPE_MAPPINGS.put("varchar", Type.VARCHAR); + TYPE_MAPPINGS.put("date", Type.DATE); + } + @Override public String getWriterName() { return KuduConstants.KUDU_CONNECTOR_NAME; @@ -51,9 +74,20 @@ public void configure(BitSailConfiguration commonConfiguration, BitSailConfigura KuduFactory kuduFactory = KuduFactory.initWriterFactory(writerConf); KuduTable kuduTable = kuduFactory.getTable(tableName); - // todo: add schema ddl List columns = writerConf.get(KuduWriterOptions.COLUMNS); - KuduSchemaUtils.checkColumnsExist(kuduTable, columns); + Boolean schemaAlign = writerConf.get(KuduWriterOptions.SCHEMA_ALIGN); + if (schemaAlign) { + List targetSchema = columns.stream().map(column -> new ColumnSchema.ColumnSchemaBuilder(column.getName(), + TYPE_MAPPINGS.get(column.getType().trim().toLowerCase())) + .defaultValue(column.getDefaultValue()) + .comment(column.getComment()) + .build()).collect(Collectors.toList()); + KuduSchemaAlignment kuduSchemaAlignment = new KuduSchemaAlignment(kuduFactory.getClient(), kuduTable); + kuduSchemaAlignment.align(kuduTable.getSchema().getColumns(), targetSchema); + } else { + KuduSchemaUtils.checkColumnsExist(kuduTable, columns); + } + } @Override diff --git a/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/sink/schema/KuduSchemaAlignment.java b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/sink/schema/KuduSchemaAlignment.java new file mode 100644 index 000000000..3fb2f89fe --- /dev/null +++ b/bitsail-connectors/connector-kudu/src/main/java/com/bytedance/bitsail/connector/kudu/sink/schema/KuduSchemaAlignment.java @@ -0,0 +1,53 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.bytedance.bitsail.connector.kudu.sink.schema; + +import com.bytedance.bitsail.base.extension.SchemaAlignmentable; +import com.bytedance.bitsail.common.BitSailException; +import com.bytedance.bitsail.connector.kudu.error.KuduErrorCode; + +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.client.AlterTableOptions; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduTable; + +import java.util.List; + +public class KuduSchemaAlignment implements SchemaAlignmentable> { + private final transient KuduClient kuduClient; + private final transient KuduTable kuduTable; + + public KuduSchemaAlignment(KuduClient kuduClient, KuduTable kuduTable) { + this.kuduClient = kuduClient; + this.kuduTable = kuduTable; + } + + @Override + public void align(List currentColumnSchema, List targetColumnSchema) { + try { + AlterTableOptions alterTableOptions = new AlterTableOptions(); + for (ColumnSchema newColumn : targetColumnSchema) { + if (currentColumnSchema.stream().noneMatch(column -> column.getName().equals(newColumn.getName()))) { + alterTableOptions.addColumn(newColumn); + } + } + kuduClient.alterTable(kuduTable.getName(), alterTableOptions); + } catch (Exception e) { + throw BitSailException.asBitSailException(KuduErrorCode.ALTER_TABLE_ERROR, e); + } + } +}