From d64ce2baa96b8a9753754fb56c1e72e93a813f59 Mon Sep 17 00:00:00 2001
From: Cadenana <768972002@qq.com>
Date: Tue, 19 Nov 2024 19:52:04 +0800
Subject: [PATCH 1/3] StarRocks StageMode use
Signed-off-by: Cadenana <768972002@qq.com>
---
.idea/.gitignore | 10 -
...7\347\224\250\346\226\207\346\241\2431.md" | 138 +++++++++
pom.xml | 48 ++--
.../connector/spark/catalog/SRCatalog.java | 237 +++++++++++++++
.../connector/spark/catalog/SRColumn.java | 181 ++++++++++++
.../connector/spark/catalog/SRTable.java | 202 +++++++++++++
.../spark/cfg/ConfigurationOptions.java | 2 +-
.../spark/exception/CatalogException.java | 42 +++
.../connector/spark/sql/StarRocksTable.java | 2 +-
.../spark/sql/conf/StarRocksConfigBase.java | 25 +-
.../spark/sql/conf/WriteStarRocksConfig.java | 94 +++++-
.../spark/sql/connect/StarRocksConnector.java | 132 +++++++--
.../spark/sql/schema/StarRocksSchema.java | 4 +-
.../spark/sql/write/StarRocksDataWriter.java | 9 +-
.../spark/sql/write/StarRocksWrite.java | 272 +++++++++++++++++-
.../sql/write/StarRocksWriteBuilder.java | 20 +-
.../sql/write/StarRocksWriterFactory.java | 1 -
.../spark/catalog/StarRocksCatalog.scala | 1 +
.../connector/spark/examples/SimpleWrite.java | 12 +-
.../connector/spark/sql/ITTestBase.java | 7 +-
.../connector/spark/sql/ReadWriteITTest.java | 262 ++++++++++++++---
21 files changed, 1570 insertions(+), 131 deletions(-)
delete mode 100644 .idea/.gitignore
create mode 100644 "docs/StarRocks-Stage\346\250\241\345\274\217\344\275\277\347\224\250\346\226\207\346\241\2431.md"
create mode 100644 src/main/java/com/starrocks/connector/spark/catalog/SRCatalog.java
create mode 100644 src/main/java/com/starrocks/connector/spark/catalog/SRColumn.java
create mode 100644 src/main/java/com/starrocks/connector/spark/catalog/SRTable.java
create mode 100644 src/main/java/com/starrocks/connector/spark/exception/CatalogException.java
diff --git a/.idea/.gitignore b/.idea/.gitignore
deleted file mode 100644
index 0a8642fa..00000000
--- a/.idea/.gitignore
+++ /dev/null
@@ -1,10 +0,0 @@
-# Default ignored files
-/shelf/
-/workspace.xml
-# Editor-based HTTP Client requests
-/httpRequests/
-# Datasource local storage ignored files
-/dataSources/
-/dataSources.local.xml
-# Zeppelin ignored files
-/ZeppelinRemoteNotebooks/
diff --git "a/docs/StarRocks-Stage\346\250\241\345\274\217\344\275\277\347\224\250\346\226\207\346\241\2431.md" "b/docs/StarRocks-Stage\346\250\241\345\274\217\344\275\277\347\224\250\346\226\207\346\241\2431.md"
new file mode 100644
index 00000000..61d6b359
--- /dev/null
+++ "b/docs/StarRocks-Stage\346\250\241\345\274\217\344\275\277\347\224\250\346\226\207\346\241\2431.md"
@@ -0,0 +1,138 @@
+## 列模式部分更新-暂存区模式
+
+在列模式部分更新的场景中,一般涉及少数列大量行,暂存区模式适用于某一列或多列更新的数据行占比很大,甚至近似于全列更新的场景。
+
+配置参数starrocks.write.stage.use用于控制暂存区模式的使用,支持取值为
+
+always
+
+指定使用部分列更新时采用暂存区模式,适用于更新时行占比很大的场景
+
+auto
+
+系统会根据更新数据涉及的列以及目标表的列数,行占比ratio决定是否使用暂存区模式
+
+更新数据的行占比超过60%,且更新列数少于4个
+
+更新列数占所有列数的百分比小于25%
+
+反之,系统不会使用暂存区模式
+
+starrocks.write.stage.columns.update.ratio:被更新数据列的行占比,可指定占比,默认20
+
+never(默认值)
+
+指定不使用暂存区模式,涉及行占比不大时采用
+
+
+
+**sql配置**
+
+暂存区模式采用在StarRocks中创建临时表实现,使用暂存区模式更新导入数据时需要执行update语句,可以使用StarRocks系统变量配置sql执行的参数,对应参数为starrocks.write.stage.session.*
+
+参数映射:exec_mem_limit->exec.mem.limit //todo
+
+比如:.option ("starrocks.write.stage.session.exec.mem.limit","8589934592") ,可以实现update执行时的StarRocks内存限制。
+
+**暂存表清理**
+
+正常流程中,在执行完update后会将暂存表清除,完成整个部分列更新流程。一些特殊情况下暂存表未被正确删除,会占用数据库空间。可以定期执行SparkDrop作业,删除存在时长超过一天的暂存表实现对冗余暂存表的清理。
+
+参数:
+
+1. FE MySQL Server 端口(默认127.0.0.1:9030)
+
+2.用户名(username)
+
+3.密码(password)
+
+4.Spark作业执行环境(默认local[*]) 用--master指定也可(优先级更高)
+
+用例
+
+```
+spark-submit \
+ --class com.TableDrop.SparkDrop \
+ --master local[*] \
+/opt/SparkTask/original-SparkDrop-1.0-SNAPSHOT.jar 192.168.181.1:9030 root "password" local[*]
+```
+
+**使用示例**
+
+建表语句:
+
+```
+CREATE TABLE `test`.`stage_test`
+(
+ `id` BIGINT NOT NULL COMMENT "主键",
+ `name` VARCHAR(65533) NULL ,
+ `score` INT NOT NULL ,
+ `is_active` BOOLEAN NULL ,
+ `decimal_score` DECIMAL(10, 2) NULL ,
+ `double_value` DOUBLE NULL ,
+ `float_value` FLOAT NULL ,
+ `largeint_value` LARGEINT NULL ,
+ `smallint_value` SMALLINT NULL ,
+ `tinyint_value` TINYINT NULL ,
+ `char_value` CHAR(10) NULL ,
+ `string_value` STRING NULL ,
+ `create_date` DATE NULL
+)
+ENGINE=OLAP
+PRIMARY KEY(`id`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`id`)
+BUCKETS 4;
+
+```
+
+插入数据
+
+```
+INSERT INTO `test`.`stage_test` (`id`, `name`, `score`, `is_active`, `decimal_score`, `double_value`, `float_value`, `largeint_value`, `smallint_value`, `tinyint_value`, `char_value`, `string_value`, `create_date`)
+VALUES
+ (1, 'Alice', 95, true, 95.50, 95.5, 95.5, 10000000000, 100, 10, 'A', 'Alice String', '2024-01-01'),
+ (2, 'Bob', 88, true, 88.00, 88.0, 88.0, 20000000000, 90, 9, 'B', 'Bob String', '2024-01-02'),
+ (3, 'Charlie', 76, false, 76.75, 76.75, 76.75, 30000000000, 80, 8, 'C', 'Charlie String', '2024-01-03'),
+ (4, 'David', 82, true, 82.25, 82.25, 82.25, 40000000000, 70, 7, 'D', 'David String', '2024-01-04'),
+ (5, 'Eva', 90, true, 90.00, 90.0, 90.0, 50000000000, 60, 6, 'E', 'Eva String', '2024-01-05'),
+ (6, 'Frank', 65, false, 65.50, 65.5, 65.5, 60000000000, 50, 5, 'F', 'Frank String', '2024-01-06'),
+ (7, 'Grace', 70, true, 70.00, 70.0, 70.0, 70000000000, 40, 4, 'G', 'Grace String', '2024-01-07'),
+ (8, 'Heidi', 80, true, 80.00, 80.0, 80.0, 80000000000, 30, 3, 'H', 'Heidi String', '2024-01-08'),
+ (9, 'Ivan', 92, true, 92.00, 92.0, 92.0, 90000000000, 20, 2, 'I', 'Ivan String', '2024-01-09'),
+ (10, 'Judy', 85, false, 85.00, 85.0, 85.0, 10000000000, 10, 1, 'J', 'Judy String', '2024-01-10');
+
+```
+
+以Scala语言为例,在Spark-Shell中运行
+
+```
+import org.apache.spark.sql.SparkSession
+
+val data = Seq((1, "starrocks", 124578), (2, "spark", 235689))
+val df = data.toDF("id", "name", "score")
+
+df.write.format("starrocks")
+ .option("starrocks.fe.http.url", "http://127.0.0.1:8030")
+ .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
+ .option("starrocks.table.identifier", "test.stage_test")
+ .option("starrocks.user", "root")
+ .option("starrocks.password", "")
+ .option("starrocks.write.stage.use","always")
+ .option("starrocks.write.properties.partial_update_mode","column")
+ .option("starrocks.write.properties.partial_update","true")
+ .option("starrocks.columns", "id,name,score")
+ .option("starrocks.write.stage.session.query.timeout","309")
+ .option("starrocks.write.stage.session.query.mem.limit","100000000")
+ .option("starrocks.write.stage.session.exec.mem.limit","8589934592")
+ .option("starrocks.write.stage.columns.update.ratio","80")
+ .mode("append")
+ .save()
+```
+
+运行后会在temp_db下创建具有"id", "name", "score"字段的临时表,并更新test.stage_test,更新完成后删除临时表。
+
+
+
+
+
diff --git a/pom.xml b/pom.xml
index aaa28ecc..d5d51d7c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -497,9 +497,9 @@
src/test/resources/sql/**
**/target/**
**/*.md
-
-
-
+
+
+
.github/**
@@ -641,25 +641,25 @@
-
- ossrh
- https://s01.oss.sonatype.org/content/repositories/snapshots
-
-
- ossrh
- https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/
-
-
-
-
-
- org.apache.maven.plugins
- maven-checkstyle-plugin
- 3.1.0
-
- checkstyle.xml
-
-
-
-
+
+ ossrh
+ https://s01.oss.sonatype.org/content/repositories/snapshots
+
+
+ ossrh
+ https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 3.1.0
+
+ checkstyle.xml
+
+
+
+
diff --git a/src/main/java/com/starrocks/connector/spark/catalog/SRCatalog.java b/src/main/java/com/starrocks/connector/spark/catalog/SRCatalog.java
new file mode 100644
index 00000000..c997ee2b
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/spark/catalog/SRCatalog.java
@@ -0,0 +1,237 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.starrocks.connector.spark.catalog;
+
+import com.google.common.base.Preconditions;
+import com.starrocks.connector.spark.exception.CatalogException;
+import com.starrocks.connector.spark.sql.connect.StarRocksConnector;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class SRCatalog implements Serializable {
+// Supplies basic SQL helper methods for the BatchWrite stage mode.
+
+private static final long serialVersionUID = 1L;
+
+private static final Logger LOG = LoggerFactory.getLogger(SRCatalog.class);
+
+ private final String jdbcUrl;
+ private final String username;
+ private final String password;
+ public SRCatalog(String jdbcUrl, String username, String password) {
+ this.jdbcUrl = jdbcUrl;
+ this.username = username;
+ this.password = password;
+ }
+ public Connection getJdbcConnection() throws Exception {
+ Connection jdbcConnection = StarRocksConnector.createJdbcConnection(jdbcUrl, username, password);
+ return jdbcConnection;
+ }
+
+public void executeDDLBySql(String ddlSql) throws Exception {
+ try (Connection connection = getJdbcConnection()) {
+ Statement statement = connection.createStatement();
+ statement.execute(ddlSql);
+ }
+ catch (Exception e) {
+ throw new IllegalStateException("execute ddl error, " + e.getMessage(), e);
+ }
+}
+
+ //create db if not exists
+ public void createDatabase (String dbNames,boolean ignoreIfExists) {
+ if (StringUtils.isEmpty(dbNames)) {
+ throw new CatalogException("create db error");
+ }
+ try {
+ String buildSql = buildCreateDatabaseSql(dbNames, ignoreIfExists);
+ executeDDLBySql(buildSql);
+ }
+ catch (Exception e) {
+
+ throw new CatalogException("create db error", e);
+ }
+ LOG.info("successfully created database {}", dbNames);
+ }
+
+ public void createTable(SRTable table, boolean ignoreIfExists)
+ {
+ try {
+ String sql = buildCreateTableSql(table, ignoreIfExists);
+ executeDDLBySql(sql);
+ }
+ catch (Exception e) {
+ throw new CatalogException("create table error", e);
+ }
+ LOG.info("successfully created {}", table);
+ }
+
+ public void dropTable(String dbName,String tableName) {
+ String sql = String.format(
+ "SELECT TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'",
+ dbName, tableName
+ );
+ try (Connection connection = getJdbcConnection(); Statement stmt = connection.createStatement()) {
+ ResultSet resultSet = stmt.executeQuery(sql);
+ if (resultSet.next()) {
+ stmt.executeUpdate("DROP TABLE `" + dbName + "`.`" + tableName + "`");
+ }
+ }
+catch (Exception e) {
+ throw new CatalogException("drop table error", e);
+}
+ LOG.info("successfully dropped {}", tableName);
+ }
+
+ private String buildCreateDatabaseSql(String databaseName, boolean ignoreIfExists) {
+ StringBuilder sql = new StringBuilder("CREATE DATABASE ");
+ if (ignoreIfExists) {
+ sql.append("IF NOT EXISTS ");
+ }
+ sql.append(databaseName);
+ return sql.toString();
+ }
+
+ private String buildCreateTableSql(SRTable table, boolean ignoreIfExists) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(
+ String.format(
+ "CREATE TABLE %s`%s`.`%s`",
+ ignoreIfExists ? "IF NOT EXISTS " : "",
+ table.getDatabaseName(),
+ table.getTableName()));
+ builder.append(" (\n");
+
+ List tableKeys = table.getTableKeys().orElse(Collections.emptyList());
+
+ String columnsStmt =
+ table.getColumns().stream()
+ .map(column -> buildColumnStmt(column, tableKeys))
+ .collect(Collectors.joining(",\n"));
+
+ builder.append(columnsStmt);
+ builder.append("\n) ");
+
+ String keyModel;
+ switch (table.getTableType()) {
+ case DUPLICATE_KEYS:
+ keyModel = "DUPLICATE KEY";
+ break;
+ case PRIMARY_KEYS:
+ keyModel = "PRIMARY KEY";
+ break;
+ case UNIQUE_KEYS:
+ keyModel = "UNIQUE KEY";
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Not support to build create table sql for table type " + table.getTableType());
+ }
+
+ builder.append(String.format("%s (%s)\n", keyModel, String.join(", ", tableKeys)));
+
+ if (!table.getDistributionKeys().isPresent()) {
+ Preconditions.checkArgument(
+ table.getTableType() == SRTable.TableType.DUPLICATE_KEYS,
+ "Can't build create table sql because there is no distribution keys");
+ } else {
+ String distributionKeys =
+ table.getDistributionKeys().get().stream()
+ .map(key -> "`" + key + "`")
+ .collect(Collectors.joining(", "));
+ builder.append(String.format("DISTRIBUTED BY HASH (%s)", distributionKeys));
+ }
+
+ if (table.getNumBuckets().isPresent()) {
+ builder.append(" BUCKETS ");
+ builder.append(table.getNumBuckets().get());
+ }
+
+ if (!table.getProperties().isEmpty()) {
+ builder.append("\nPROPERTIES (\n");
+ String properties =
+ table.getProperties().entrySet().stream()
+ .map(entry -> String.format("\"%s\" = \"%s\"", entry.getKey(), entry.getValue()))
+ .collect(Collectors.joining(",\n"));
+ builder.append(properties);
+ builder.append("\n)");
+ }
+
+ builder.append(";");
+ return builder.toString();
+ }
+
+ private String buildColumnStmt(SRColumn column, List tableKeys) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("`");
+ builder.append(column.getColumnName());
+ builder.append("` ");
+
+ builder.append(
+ getFullColumnType(
+ column.getDataType(), column.getColumnSize(), column.getDecimalDigits()));
+ builder.append(" ");
+
+ if (tableKeys.contains(column.getColumnName())) {
+ builder.append("NOT NULL");
+ } else {
+ builder.append(column.isNullable() ? "NULL" : "NOT NULL");
+ }
+
+ if (column.getDefaultValue().isPresent()) {
+ builder.append(String.format(" DEFAULT \"%s\"", column.getDefaultValue().get()));
+ }
+
+ if (column.getColumnComment().isPresent()) {
+ builder.append(String.format(" COMMENT \"%s\"", column.getColumnComment().get()));
+ }
+
+ return builder.toString();
+ }
+
+ private String getFullColumnType(
+ String type, Optional columnSize, Optional decimalDigits) {
+ String dataType = type.toUpperCase();
+ switch (dataType) {
+ case "DECIMAL":
+ Preconditions.checkArgument(
+ columnSize.isPresent(), "DECIMAL type must have column size");
+ Preconditions.checkArgument(
+ decimalDigits.isPresent(), "DECIMAL type must have decimal digits");
+ return String.format("DECIMAL(%d, %s)", columnSize.get(), decimalDigits.get());
+ case "CHAR":
+ case "VARCHAR":
+ Preconditions.checkArgument(
+ columnSize.isPresent(), type + " type must have column size");
+ return String.format("%s(%d)", dataType, columnSize.get());
+ default:
+ return dataType;
+ }
+ }
+}
diff --git a/src/main/java/com/starrocks/connector/spark/catalog/SRColumn.java b/src/main/java/com/starrocks/connector/spark/catalog/SRColumn.java
new file mode 100644
index 00000000..95f03c6d
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/spark/catalog/SRColumn.java
@@ -0,0 +1,181 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.starrocks.connector.spark.catalog;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.Optional;
+//to describe a column of SR, info from information_schema.COLUMNS
+public class SRColumn implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String columnName;
+ //column's position
+ private final int ordinalPosition;
+ //column type
+ private final String dataType;
+ //can be null or not
+ private final boolean isNullable;
+ // column's defaultValue
+ @Nullable
+ private final String defaultValue;
+ //column' maxSize
+ @Nullable
+ private final Integer columnSize;
+    // decimal digits (scale) of the column, e.g. 0 or 1; null if the column is not a numeric type
+ @Nullable
+ private final Integer decimalDigits;
+ //comment about a column
+ @Nullable
+ private final String columnComment;
+ private SRColumn(
+ String columnName,
+ int ordinalPosition,
+ String dataType,
+ boolean isNullable,
+ @Nullable String defaultValue,
+ @Nullable Integer columnSize,
+ @Nullable Integer decimalDigits,
+ @Nullable String columnComment) {
+ this.columnName = Preconditions.checkNotNull(columnName);
+ this.ordinalPosition = ordinalPosition;
+ this.dataType = Preconditions.checkNotNull(dataType);
+ this.isNullable = isNullable;
+ this.defaultValue = defaultValue;
+ this.columnSize = columnSize;
+ this.decimalDigits = decimalDigits;
+ this.columnComment = columnComment;
+ }
+ public String getColumnName() {
+ return columnName;
+ }
+ public int getOrdinalPosition() {
+ return ordinalPosition;
+ }
+ public String getDataType() {
+ return dataType;
+ }
+ public boolean isNullable() {
+ return isNullable;
+ }
+ public Optional getDefaultValue() {
+ return Optional.ofNullable(defaultValue);
+ }
+ public Optional getColumnSize() {
+ return Optional.ofNullable(columnSize);
+ }
+ public Optional getDecimalDigits() {
+ return Optional.ofNullable(decimalDigits);
+ }
+ public Optional getColumnComment() {
+ return Optional.ofNullable(columnComment);
+ }
+ @Override
+ public String toString() {
+ return "StarRocksColumn{" +
+ "columnName='" + columnName + '\'' +
+ ", ordinalPosition=" + ordinalPosition +
+ ", dataType='" + dataType + '\'' +
+ ", isNullable=" + isNullable +
+ ", defaultValue='" + defaultValue + '\'' +
+ ", columnSize=" + columnSize +
+ ", decimalDigits=" + decimalDigits +
+ ", columnComment='" + columnComment + '\'' +
+ '}';
+ }
+ public static class Builder {
+ private String columnName;
+ private int ordinalPosition;
+ private String dataType;
+ private boolean isNullable = true;
+ private String defaultValue;
+ private Integer columnSize;
+ private Integer decimalDigits;
+ private String columnComment;
+ public SRColumn.Builder setColumnName(String columnName) {
+ this.columnName = columnName;
+ return this;
+ }
+ public SRColumn.Builder setOrdinalPosition(int ordinalPosition) {
+ this.ordinalPosition = ordinalPosition;
+ return this;
+ }
+ public SRColumn.Builder setDataType(String dataType,Integer size) {
+ if ("tinyint".equalsIgnoreCase(dataType) && size == null) {
+ this.dataType = "boolean";
+ }
+ else {
+ this.dataType = dataType;
+ }
+
+ return this;
+ }
+ public SRColumn.Builder setDataType(String dataType) {
+ this.dataType = dataType;
+ return this;
+ }
+ public SRColumn.Builder setNullable(String isNULL) {
+
+ if ("YES".equalsIgnoreCase(isNULL)) {
+ this.isNullable = true;
+ }
+ else
+ {
+ this.isNullable = false;
+ }
+ return this;
+ }
+ public SRColumn.Builder setDefaultValue(String defaultValue) {
+ this.defaultValue = defaultValue;
+ return this;
+ }
+ public SRColumn.Builder setColumnSize(Integer columnSize) {
+ this.columnSize = columnSize;
+ return this;
+ }
+ public SRColumn.Builder setDecimalDigits(String decimalDigits) {
+ if (Objects.isNull(decimalDigits)) {
+ this.decimalDigits = null;
+ }
+ else
+ {
+ this.decimalDigits = Integer.parseInt(decimalDigits);
+ }
+ return this;
+ }
+
+ public SRColumn.Builder setColumnComment(String columnComment) {
+ this.columnComment = columnComment;
+ return this;
+ }
+ public SRColumn build() {
+ return new SRColumn(
+ columnName,
+ ordinalPosition,
+ dataType,
+ isNullable,
+ defaultValue,
+ columnSize,
+ decimalDigits,
+ columnComment);
+ }
+ }
+}
diff --git a/src/main/java/com/starrocks/connector/spark/catalog/SRTable.java b/src/main/java/com/starrocks/connector/spark/catalog/SRTable.java
new file mode 100644
index 00000000..34a257b5
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/spark/catalog/SRTable.java
@@ -0,0 +1,202 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.starrocks.connector.spark.catalog;
+import com.google.common.base.Preconditions;
+
+
+import javax.annotation.Nullable;
+import java.util.*;
+
+// Describes a table of SR; info from information_schema.tables
+
+public class SRTable {
+
+
+ public enum TableType {
+ UNKNOWN,
+ DUPLICATE_KEYS,
+ AGGREGATES,
+ UNIQUE_KEYS,
+ PRIMARY_KEYS
+ }
+ //database's name
+ private final String databaseName;
+ //table's name
+ private final String tableName;
+    // SR table's type; only PRIMARY_KEYS tables can use stage mode
+ private final TableType tableType;
+ //columns describe
+ private final List columns;
+    // table's primary key(s) for a PKS table; there can be more than one
+ @Nullable
+ private final List tableKeys;
+ //table properties
+ private final Map properties;
+ @Nullable
+ private final List distributionKeys;
+ @Nullable
+ private final Integer numBuckets;
+ //comment about table
+ @Nullable private final String comment;
+ private SRTable(
+ String databaseName,
+ String tableName,
+ TableType tableType,
+ List columns,
+ @Nullable List tableKeys,
+ @Nullable List distributionKeys,
+ @Nullable Integer numBuckets,
+ @Nullable String comment,
+ Map properties) {
+ Preconditions.checkNotNull(databaseName);
+ Preconditions.checkNotNull(tableName);
+ Preconditions.checkNotNull(tableType);
+ Preconditions.checkArgument(columns != null && !columns.isEmpty());
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ this.tableType = tableType;
+ this.columns = columns;
+ this.tableKeys = tableKeys;
+ this.distributionKeys = distributionKeys;
+ this.numBuckets = numBuckets;
+ this.comment = comment;
+ this.properties = Preconditions.checkNotNull(properties);
+ }
+ public String getDatabaseName() {
+ return databaseName;
+ }
+ public String getTableName() {
+ return tableName;
+ }
+ public TableType getTableType() {
+ return tableType;
+ }
+ public List getColumns() {
+ return columns;
+ }
+
+ public Optional> getTableKeys() {
+ return Optional.ofNullable(tableKeys);
+ }
+ public Optional> getDistributionKeys() {
+ return Optional.ofNullable(distributionKeys);
+ }
+ public Optional getNumBuckets() {
+ return Optional.ofNullable(numBuckets);
+ }
+ public Optional getComment() {
+ return Optional.ofNullable(comment);
+ }
+ public Map getProperties() {
+ return properties;
+ }
+
+ @Override
+ public String toString() {
+ return "StarRocksTable{" +
+ "databaseName='" + databaseName + '\'' +
+ ", tableName='" + tableName + '\'' +
+ ", tableType=" + tableType +
+ ", columns=" + columns +
+ ", tableKeys=" + tableKeys +
+ ", comment='" + comment + '\'' +
+ ", properties=" + properties +
+ '}';
+ }
+ public static class Builder {
+ private String databaseName;
+ private String tableName;
+ private TableType tableType;
+ private List columns = new ArrayList<>();
+ private List tableKeys;
+ private List distributionKeys;
+ private Integer numBuckets;
+ private String comment;
+ private Map properties = new HashMap<>();
+ public Builder setDatabaseName(String databaseName) {
+ this.databaseName = databaseName;
+ return this;
+ }
+ public Builder setTableName(String tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+ public Builder setTableType(String tableType) {
+ switch (tableType) {
+ case "DUPLICATE_KEYS" :
+ this.tableType = TableType.DUPLICATE_KEYS;
+ break;
+ case "AGGREGATES":
+ this.tableType = TableType.AGGREGATES;
+ break;
+ case "UNIQUE_KEYS":
+ this.tableType = TableType.UNIQUE_KEYS;
+ break;
+ case "PRIMARY_KEYS":
+ this.tableType = TableType.PRIMARY_KEYS;
+ break;
+ default:
+ this.tableType = TableType.UNKNOWN;
+ }
+ return this;
+ }
+ public Builder setTableType(TableType tableType)
+ {
+ this.tableType = tableType;
+ return this;
+ }
+ public Builder setColumns(List columns) {
+ this.columns = columns;
+ return this;
+ }
+ public Builder setTableKeys(List tableKeys) {
+ this.tableKeys = tableKeys;
+ return this;
+ }
+ public Builder setDistributionKeys(List distributionKeys) {
+ this.distributionKeys = distributionKeys;
+ return this;
+ }
+ public Builder setNumBuckets(Integer numBuckets) {
+ this.numBuckets = numBuckets;
+ return this;
+ }
+ public Builder setComment(String comment) {
+ this.comment = comment;
+ return this;
+ }
+ public Builder setTableProperties(Map properties) {
+ this.properties = properties;
+ return this;
+ }
+ public SRTable build() {
+ return new SRTable(
+ databaseName,
+ tableName,
+ tableType,
+ columns,
+ tableKeys,
+ distributionKeys,
+ numBuckets,
+ comment,
+ properties);
+ }
+ }
+
+}
diff --git a/src/main/java/com/starrocks/connector/spark/cfg/ConfigurationOptions.java b/src/main/java/com/starrocks/connector/spark/cfg/ConfigurationOptions.java
index 40c0d7c3..242b3cf4 100644
--- a/src/main/java/com/starrocks/connector/spark/cfg/ConfigurationOptions.java
+++ b/src/main/java/com/starrocks/connector/spark/cfg/ConfigurationOptions.java
@@ -79,7 +79,7 @@ public interface ConfigurationOptions {
String STARROCKS_DESERIALIZE_QUEUE_SIZE = "starrocks.deserialize.queue.size";
int STARROCKS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
-
+ String STARROCKS_TEMP_DBNAME="temp_db";
static Map makeWriteCompatibleWithRead(Map options) {
// user and password compatible
Map configMap = new HashMap(options);
diff --git a/src/main/java/com/starrocks/connector/spark/exception/CatalogException.java b/src/main/java/com/starrocks/connector/spark/exception/CatalogException.java
new file mode 100644
index 00000000..1ad45fe2
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/spark/exception/CatalogException.java
@@ -0,0 +1,42 @@
+// Copyright 2021-present StarRocks, Inc. All rights reserved.
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package com.starrocks.connector.spark.exception;
+
+public class CatalogException extends RuntimeException {
+
+ public CatalogException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param cause the cause.
+ */
+ public CatalogException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * @param message the detail message.
+ * @param cause the cause.
+ */
+ public CatalogException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/src/main/java/com/starrocks/connector/spark/sql/StarRocksTable.java b/src/main/java/com/starrocks/connector/spark/sql/StarRocksTable.java
index d1cc815c..31a550f3 100644
--- a/src/main/java/com/starrocks/connector/spark/sql/StarRocksTable.java
+++ b/src/main/java/com/starrocks/connector/spark/sql/StarRocksTable.java
@@ -75,7 +75,7 @@ public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
WriteStarRocksConfig writeConfig = new WriteStarRocksConfig(properties.getPropertyMap(), schema, starRocksSchema);
checkWriteParameter(writeConfig);
- return new StarRocksWriteBuilder(info, writeConfig);
+ return new StarRocksWriteBuilder(info, writeConfig, starRocksSchema);
}
@Override
diff --git a/src/main/java/com/starrocks/connector/spark/sql/conf/StarRocksConfigBase.java b/src/main/java/com/starrocks/connector/spark/sql/conf/StarRocksConfigBase.java
index db74c565..7e03eafa 100644
--- a/src/main/java/com/starrocks/connector/spark/sql/conf/StarRocksConfigBase.java
+++ b/src/main/java/com/starrocks/connector/spark/sql/conf/StarRocksConfigBase.java
@@ -61,14 +61,14 @@ public abstract class StarRocksConfigBase implements StarRocksConfig {
// data type mapping instead of all columns
public static final String KEY_COLUMN_TYPES = PREFIX + "column.types";
- protected final Map originOptions;
+ protected Map originOptions;
private String[] feHttpUrls;
private String feJdbcUrl;
private String username;
private String password;
- private String database;
- private String table;
+ protected String database;
+ protected String table;
@Nullable
private String[] columns;
@Nullable
@@ -77,7 +77,24 @@ public abstract class StarRocksConfigBase implements StarRocksConfig {
private int httpRequestConnectTimeoutMs;
private int httpRequestSocketTimeoutMs;
private ZoneId timeZone;
-
+ // Just for copy()
+ protected StarRocksConfigBase() {
+ }
+ protected void copy(StarRocksConfigBase other) {
+ other.originOptions = new HashMap<>(originOptions);
+ other.feHttpUrls = feHttpUrls;
+ other.feJdbcUrl = feJdbcUrl;
+ other.username = username;
+ other.password = password;
+ other.database = database;
+ other.table = table;
+ other.columns = columns;
+ other.columnTypes = columnTypes;
+ other.httpRequestRetries = httpRequestRetries;
+ other.httpRequestConnectTimeoutMs = httpRequestConnectTimeoutMs;
+ other.httpRequestSocketTimeoutMs = httpRequestSocketTimeoutMs;
+ other.timeZone = timeZone;
+ }
public StarRocksConfigBase(Map options) {
this.originOptions = new HashMap<>(options);
load();
diff --git a/src/main/java/com/starrocks/connector/spark/sql/conf/WriteStarRocksConfig.java b/src/main/java/com/starrocks/connector/spark/sql/conf/WriteStarRocksConfig.java
index 0f30f0f9..6b5007e3 100644
--- a/src/main/java/com/starrocks/connector/spark/sql/conf/WriteStarRocksConfig.java
+++ b/src/main/java/com/starrocks/connector/spark/sql/conf/WriteStarRocksConfig.java
@@ -36,12 +36,7 @@
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.Utils;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.stream.Collectors;
public class WriteStarRocksConfig extends StarRocksConfigBase {
@@ -65,6 +60,7 @@ public class WriteStarRocksConfig extends StarRocksConfigBase {
private static final String KEY_BUFFER_SIZE = WRITE_PREFIX + "buffer.size";
// The number of rows buffered before sending to StarRocks.
private static final String KEY_BUFFER_ROWS = WRITE_PREFIX + "buffer.rows";
+
// Flush interval of the row batch in millisecond
private static final String KEY_FLUSH_INTERVAL = WRITE_PREFIX + "flush.interval.ms";
private static final String KEY_MAX_RETIES = WRITE_PREFIX + "max.retries";
@@ -76,6 +72,17 @@ public class WriteStarRocksConfig extends StarRocksConfigBase {
private static final String KEY_NUM_PARTITIONS = WRITE_PREFIX + "num.partitions";
private static final String KEY_PARTITION_COLUMNS = WRITE_PREFIX + "partition.columns";
+ // Whether stage mode is enabled at all; with "never", stage mode is not used.
+ // Decides when stage mode applies: always, auto, or never.
+ private static final String STAGE_MODE_USE = WRITE_PREFIX + "stage.use";
+ // stage config header
+ private static final String STAGE_CONFIG = WRITE_PREFIX + "stage.";
+
+ public enum StageUse {
+ ALWAYS,
+ AUTO,
+ NEVER
+ }
private String labelPrefix = "spark";
private int socketTimeoutMs = -1;
@@ -95,7 +102,13 @@ public class WriteStarRocksConfig extends StarRocksConfigBase {
private String rowDelimiter = "\n";
private String columnSeparator = "\t";
private boolean supportTransactionStreamLoad = true;
+ private StageUse stageUse = StageUse.NEVER;
+ private Map stageConfig;
+
+ // starrocks.write.stage.columns.update.ratio:
+ // a rough number (1-100) describing the ratio of rows to be updated.
+ // If any column in stageUpdateColumn is to be updated, use stage mode; default "".
// According to Spark RequiresDistributionAndOrdering#requiredNumPartitions(),
// any value less than 1 mean no requirement
private int numPartitions = 0;
@@ -104,11 +117,17 @@ public class WriteStarRocksConfig extends StarRocksConfigBase {
private String streamLoadColumnProperty;
private String[] streamLoadColumnNames;
- private final Set starRocksJsonColumnNames;
+ private Set starRocksJsonColumnNames;
+ private StructType sparkSchema;
+
+ public StructType getSparkSchema() {
+ return sparkSchema;
+ }
public WriteStarRocksConfig(Map originOptions, StructType sparkSchema, StarRocksSchema starRocksSchema) {
super(originOptions);
load(sparkSchema);
+ this.sparkSchema = sparkSchema;
genStreamLoadColumns(sparkSchema, starRocksSchema);
this.starRocksJsonColumnNames = new HashSet<>();
for (StarRocksField column : starRocksSchema.getColumns()) {
@@ -130,6 +149,16 @@ private void load(StructType sparkSchema) {
flushInterval = getInt(KEY_FLUSH_INTERVAL, 300000);
maxRetries = getInt(KEY_MAX_RETIES, 3);
retryIntervalInMs = getInt(KEY_RETRY_INTERVAL_MS, 10000);
+ stageConfig= originOptions.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(STAGE_CONFIG))
+ .collect(
+ Collectors.toMap(
+ entry -> entry.getKey().replaceFirst(STAGE_CONFIG, ""),
+ Map.Entry::getValue
+ )
+ );
+
+ stageUse =StageUse.valueOf(get(STAGE_MODE_USE, "never").toUpperCase());
properties = originOptions.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(PROPS_PREFIX))
@@ -168,6 +197,39 @@ private void load(StructType sparkSchema) {
supportTransactionStreamLoad = StreamLoadUtils.isStarRocksSupportTransactionLoad(
Arrays.asList(getFeHttpUrls()), getHttpRequestConnectTimeoutMs(), getUsername(), getPassword());
}
+ private WriteStarRocksConfig() {
+ super();
+ }
+ public WriteStarRocksConfig copy(String dataBase, String table, List excludeStreamLoadProperties) {
+ WriteStarRocksConfig copyConfig = new WriteStarRocksConfig();
+
+ super.copy(copyConfig);
+ copyConfig.database = dataBase;
+ copyConfig.table = table;
+ copyConfig.labelPrefix = labelPrefix;
+ copyConfig.waitForContinueTimeoutMs = waitForContinueTimeoutMs;
+ copyConfig.ioThreadCount = ioThreadCount;
+ copyConfig.chunkLimit = chunkLimit;
+ copyConfig.scanFrequencyInMs = scanFrequencyInMs;
+ copyConfig.enableTransactionStreamLoad = enableTransactionStreamLoad;
+ copyConfig.bufferSize = bufferSize;
+ copyConfig.bufferRows = bufferRows;
+ copyConfig.flushInterval = flushInterval;
+ copyConfig.maxRetries = maxRetries;
+ copyConfig.retryIntervalInMs = retryIntervalInMs;
+ copyConfig.properties = new HashMap<>(properties);
+ excludeStreamLoadProperties.forEach(copyConfig.properties::remove);
+ copyConfig.format = format;
+ copyConfig.rowDelimiter = rowDelimiter;
+ copyConfig.columnSeparator = columnSeparator;
+ copyConfig.supportTransactionStreamLoad = supportTransactionStreamLoad;
+ copyConfig.numPartitions = numPartitions;
+ copyConfig.partitionColumns = partitionColumns;
+ copyConfig.streamLoadColumnProperty = streamLoadColumnProperty;
+ copyConfig.streamLoadColumnNames = streamLoadColumnNames;
+
+ return copyConfig;
+ }
private void genStreamLoadColumns(StructType sparkSchema, StarRocksSchema starRocksSchema) {
streamLoadColumnNames = new String[sparkSchema.length()];
@@ -223,6 +285,15 @@ private String getBitmapFunction(StructField field) {
}
}
+
+ public StageUse getStageUse() {
+ return stageUse;
+ }
+
+
+ public Map getStageConfig() {
+ return stageConfig;
+ }
public String getFormat() {
return format;
}
@@ -247,11 +318,18 @@ public Set getStarRocksJsonColumnNames() {
return starRocksJsonColumnNames;
}
+
+
+
+
public boolean isPartialUpdate() {
String val = properties.get("partial_update");
return val != null && val.equalsIgnoreCase("true");
}
-
+ public boolean isPartialUpdateColumnMode() {
+ String val = properties.get("partial_update_mode");
+ return val != null && val.equalsIgnoreCase("column");
+ }
public StreamLoadProperties toStreamLoadProperties() {
StreamLoadDataFormat dataFormat = "json".equalsIgnoreCase(format) ?
StreamLoadDataFormat.JSON : new StreamLoadDataFormat.CSVFormat(rowDelimiter);
diff --git a/src/main/java/com/starrocks/connector/spark/sql/connect/StarRocksConnector.java b/src/main/java/com/starrocks/connector/spark/sql/connect/StarRocksConnector.java
index 67e76ec6..aec5d610 100644
--- a/src/main/java/com/starrocks/connector/spark/sql/connect/StarRocksConnector.java
+++ b/src/main/java/com/starrocks/connector/spark/sql/connect/StarRocksConnector.java
@@ -19,6 +19,9 @@
package com.starrocks.connector.spark.sql.connect;
+import com.starrocks.connector.spark.catalog.SRColumn;
+import com.starrocks.connector.spark.catalog.SRTable;
+import com.starrocks.connector.spark.exception.CatalogException;
import com.starrocks.connector.spark.exception.StarRocksException;
import com.starrocks.connector.spark.sql.conf.StarRocksConfig;
import com.starrocks.connector.spark.sql.schema.StarRocksField;
@@ -26,32 +29,33 @@
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
public class StarRocksConnector {
- private static Logger logger = LoggerFactory.getLogger(StarRocksConnector.class);
+ private static final Logger logger = LoggerFactory.getLogger(StarRocksConnector.class);
private static final String TABLE_SCHEMA_QUERY =
"SELECT `COLUMN_NAME`, `ORDINAL_POSITION`, `COLUMN_KEY`, `DATA_TYPE`, `COLUMN_SIZE`, `DECIMAL_DIGITS` "
+ "FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`=? AND `TABLE_NAME`=?;";
private static final String ALL_DBS_QUERY = "show databases;";
+
private static final String LOAD_DB_QUERY =
"select SCHEMA_NAME from information_schema.schemata where SCHEMA_NAME in (?) AND CATALOG_NAME = 'def';";
+
private static final String ALL_TABLES_QUERY = "select TABLE_SCHEMA, TABLE_NAME from information_schema.tables "
+ "where TABLE_TYPE = 'BASE TABLE' AND TABLE_SCHEMA in (?) ;";
-
+ private static final String DB_EXISTS =
+ "select SCHEMA_NAME from information_schema.schemata where SCHEMA_NAME =? AND CATALOG_NAME = 'def';";
+ private static final String Table_COLUMNS_QUERY=
+ "select COLUMN_NAME,ORDINAL_POSITION,DATA_TYPE,COLUMN_SIZE,DECIMAL_DIGITS,COLUMN_DEFAULT,IS_NULLABLE,COLUMN_COMMENT from INFORMATION_SCHEMA.columns where TABLE_SCHEMA=? and TABLE_NAME=?";
+ private static final String TABLES_QUERY =
+ "select TABLE_SCHEMA,TABLE_NAME,TABLE_COMMENT from INFORMATION_SCHEMA.tables where TABLE_SCHEMA=? AND TABLE_NAME = ?";
+ private static final String TABLE_CONFIG_QUERY="select TABLE_MODEL,PRIMARY_KEY,DISTRIBUTE_KEY,DISTRIBUTE_BUCKET,PROPERTIES from INFORMATION_SCHEMA.tables_config where TABLE_SCHEMA=? AND TABLE_NAME = ?";
// Driver name for mysql connector 5.1 which is deprecated in 8.0
private static final String MYSQL_51_DRIVER_NAME = "com.mysql.jdbc.Driver";
// Driver name for mysql connector 8.0
@@ -110,12 +114,11 @@ public static Map loadDatabase(StarRocksConfig config, List getTables(StarRocksConfig config, List dbNames) {
- List parameters = Arrays.asList(String.join(",", dbNames));
+ List parameters = Arrays.asList(java.lang.String.join(",", dbNames));
List