diff --git a/src/main/java/com/starrocks/connector/spark/exception/NotSupportedOperationException.java b/src/main/java/com/starrocks/connector/spark/exception/NotSupportedOperationException.java new file mode 100644 index 00000000..bb5dacab --- /dev/null +++ b/src/main/java/com/starrocks/connector/spark/exception/NotSupportedOperationException.java @@ -0,0 +1,26 @@ +// Modifications Copyright 2021 StarRocks Limited. +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.starrocks.connector.spark.exception; + +public class NotSupportedOperationException extends StarRocksException { + public NotSupportedOperationException(String msg) { + super(msg); + } +} diff --git a/src/main/java/com/starrocks/connector/spark/rest/models/PartitionType.java b/src/main/java/com/starrocks/connector/spark/rest/models/PartitionType.java new file mode 100644 index 00000000..78581106 --- /dev/null +++ b/src/main/java/com/starrocks/connector/spark/rest/models/PartitionType.java @@ -0,0 +1,24 @@ +// Modifications Copyright 2021 StarRocks Limited. +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package com.starrocks.connector.spark.rest.models; + +public enum PartitionType { + NONE, LIST, RANGE, EXPRESSION +} diff --git a/src/main/java/com/starrocks/connector/spark/sql/StarRocksTable.java b/src/main/java/com/starrocks/connector/spark/sql/StarRocksTable.java index d1cc815c..ee45128d 100644 --- a/src/main/java/com/starrocks/connector/spark/sql/StarRocksTable.java +++ b/src/main/java/com/starrocks/connector/spark/sql/StarRocksTable.java @@ -26,11 +26,7 @@ import com.starrocks.connector.spark.sql.conf.WriteStarRocksConfig; import com.starrocks.connector.spark.sql.schema.StarRocksSchema; import com.starrocks.connector.spark.sql.write.StarRocksWriteBuilder; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.SupportsRead; -import org.apache.spark.sql.connector.catalog.SupportsWrite; -import org.apache.spark.sql.connector.catalog.Table; -import org.apache.spark.sql.connector.catalog.TableCapability; +import org.apache.spark.sql.connector.catalog.*; import org.apache.spark.sql.connector.read.ScanBuilder; import org.apache.spark.sql.connector.write.LogicalWriteInfo; import org.apache.spark.sql.connector.write.WriteBuilder; @@ -105,7 +101,8 @@ private void checkWriteParameter(WriteStarRocksConfig config) { } private static final Set TABLE_CAPABILITY_SET = new HashSet<>( - Arrays.asList(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE)); + Arrays.asList(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.OVERWRITE_DYNAMIC, + TableCapability.STREAMING_WRITE, TableCapability.OVERWRITE_BY_FILTER)); @Override public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { diff --git a/src/main/java/com/starrocks/connector/spark/sql/conf/WriteStarRocksConfig.java b/src/main/java/com/starrocks/connector/spark/sql/conf/WriteStarRocksConfig.java index 0f30f0f9..933af8e2 100644 --- a/src/main/java/com/starrocks/connector/spark/sql/conf/WriteStarRocksConfig.java +++ b/src/main/java/com/starrocks/connector/spark/sql/conf/WriteStarRocksConfig.java @@ -26,26 +26,19 @@ import com.starrocks.data.load.stream.StreamLoadUtils; import com.starrocks.data.load.stream.properties.StreamLoadProperties; import com.starrocks.data.load.stream.properties.StreamLoadTableProperties; -import org.apache.spark.sql.types.ArrayType; -import org.apache.spark.sql.types.ByteType; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.IntegerType; -import org.apache.spark.sql.types.LongType; -import org.apache.spark.sql.types.ShortType; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.sql.sources.Filter; +import org.apache.spark.sql.types.*; import org.apache.spark.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; public class WriteStarRocksConfig extends StarRocksConfigBase { + private static final Logger LOG = LoggerFactory.getLogger(WriteStarRocksConfig.class); private static final long serialVersionUID = 1L; public static final String WRITE_PREFIX = PREFIX + "write."; @@ -77,6 +70,8 @@ public class WriteStarRocksConfig extends StarRocksConfigBase { private static final String KEY_NUM_PARTITIONS = WRITE_PREFIX + "num.partitions"; private 
static final String KEY_PARTITION_COLUMNS = WRITE_PREFIX + "partition.columns"; + private static final String KEY_OVERWRITE_PARTITION_PREFIX = WRITE_PREFIX + "overwrite.partitions."; + private String labelPrefix = "spark"; private int socketTimeoutMs = -1; private int waitForContinueTimeoutMs = 30000; @@ -106,6 +101,56 @@ public class WriteStarRocksConfig extends StarRocksConfigBase { private String[] streamLoadColumnNames; private final Set<String> starRocksJsonColumnNames; + private boolean overwrite; + private Filter[] filters; + private String tempTableName; + + // partition name -> partition value expression + private Map<String, String> overwritePartitions; + + // temporary partition name -> original partition name + private Map<String, String> overwriteTempPartitionMappings; + // temporary partition name -> partition value expression + private Map<String, String> overwriteTempPartitions; + + public static final String TEMPORARY_PARTITION_SUFFIX = "_created_by_sr_spark_connector_"; + + public Map<String, String> getOverwritePartitions() { + return overwritePartitions; + } + + public Map<String, String> getOverwriteTempPartitionMappings() { + return overwriteTempPartitionMappings; + } + + public Map<String, String> getOverwriteTempPartitions() { + return overwriteTempPartitions; + } + + public void setTempTableName(String tempTableName) { + this.tempTableName = tempTableName; + } + + public String getTempTableName() { + return tempTableName; + } + + public void setOverwrite(boolean overwrite) { + this.overwrite = overwrite; + } + + public void setFilters(Filter[] filters) { + this.filters = filters; + } + + public Filter[] getFilters() { + return filters; + } + + public boolean isOverwrite() { + return overwrite; + } + public WriteStarRocksConfig(Map<String, String> originOptions, StructType sparkSchema, StarRocksSchema starRocksSchema) { super(originOptions); load(sparkSchema); @@ -139,6 +184,40 @@ private void load(StructType sparkSchema) { Map.Entry::getValue ) ); + overwritePartitions = originOptions.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(KEY_OVERWRITE_PARTITION_PREFIX)) + .peek(entry -> { + if (StringUtils.isEmpty(entry.getValue())) { + throw new IllegalArgumentException("value of `" + entry.getKey() + "` cannot be empty"); + } + }) + .collect( + Collectors.toMap( + entry -> entry.getKey().replaceFirst(KEY_OVERWRITE_PARTITION_PREFIX, ""), + Map.Entry::getValue + ) + ); + overwriteTempPartitionMappings = overwritePartitions.entrySet().stream() + .collect( + Collectors.toMap( + entry -> entry.getKey() + TEMPORARY_PARTITION_SUFFIX + System.currentTimeMillis(), + Map.Entry::getKey + ) + ); + overwriteTempPartitions = overwriteTempPartitionMappings.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> overwritePartitions.get(entry.getValue()) + )); + if (!overwritePartitions.isEmpty()) { + String temporaryPartitionsList = String.join(",", overwriteTempPartitionMappings.keySet()); + String temporaryPartitions = "temporary_partitions"; + String oldSetting = properties.get(temporaryPartitions); + if (StringUtils.isNotEmpty(oldSetting)) { + LOG.warn("replacing temporary_partitions value ({}) with {}", oldSetting, temporaryPartitionsList); + } + properties.put(temporaryPartitions, temporaryPartitionsList); + } format = originOptions.getOrDefault(KEY_PROPS_FORMAT, "CSV"); rowDelimiter = DelimiterParser.convertDelimiter( originOptions.getOrDefault(KEY_PROPS_ROW_DELIMITER, "\n")); @@ -255,10 +334,15 @@ public boolean isPartialUpdate() { public StreamLoadProperties toStreamLoadProperties() { StreamLoadDataFormat dataFormat = "json".equalsIgnoreCase(format) ?
StreamLoadDataFormat.JSON : new StreamLoadDataFormat.CSVFormat(rowDelimiter); - + String table; + if (isOverwrite() && getTempTableName() != null) { + table = getTempTableName(); + } else { + table = getTable(); + } StreamLoadTableProperties tableProperties = StreamLoadTableProperties.builder() .database(getDatabase()) - .table(getTable()) + .table(table) .columns(streamLoadColumnProperty) .streamLoadDataFormat(dataFormat) .chunkLimit(chunkLimit) diff --git a/src/main/java/com/starrocks/connector/spark/sql/connect/StarRocksConnector.java b/src/main/java/com/starrocks/connector/spark/sql/connect/StarRocksConnector.java index 67e76ec6..356e7d93 100644 --- a/src/main/java/com/starrocks/connector/spark/sql/connect/StarRocksConnector.java +++ b/src/main/java/com/starrocks/connector/spark/sql/connect/StarRocksConnector.java @@ -19,26 +19,19 @@ package com.starrocks.connector.spark.sql.connect; +import com.google.common.annotations.VisibleForTesting; import com.starrocks.connector.spark.exception.StarRocksException; +import com.starrocks.connector.spark.rest.models.PartitionType; import com.starrocks.connector.spark.sql.conf.StarRocksConfig; +import com.starrocks.connector.spark.sql.conf.WriteStarRocksConfig; import com.starrocks.connector.spark.sql.schema.StarRocksField; import com.starrocks.connector.spark.sql.schema.StarRocksSchema; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.sql.*; +import java.util.*; public class StarRocksConnector { private static Logger logger = LoggerFactory.getLogger(StarRocksConnector.class); @@ -81,6 +74,43 @@ public static StarRocksSchema getSchema(StarRocksConfig config) { return new StarRocksSchema(columns, pks); } + public static PartitionType getPartitionType(StarRocksConfig config) { + String showCreateTableDDL = String.format("SHOW CREATE TABLE `%s`.`%s`", config.getDatabase(), config.getTable()); + String createTableDDL = ""; + try (Connection conn = createJdbcConnection(config.getFeJdbcUrl(), config.getUsername(), config.getPassword()); + PreparedStatement ps = conn.prepareStatement(showCreateTableDDL)) { + ResultSet rs = ps.executeQuery(); + if (rs.next()) { + createTableDDL = rs.getString(2); + } + rs.close(); + } catch (Exception e) { + throw new IllegalStateException("show create table ddl by sql error, " + e.getMessage(), e); + } + return createTableDDL.contains("PARTITION BY RANGE(") ? + PartitionType.RANGE: + createTableDDL.contains("PARTITION BY LIST(") ? + PartitionType.LIST: + createTableDDL.contains("PARTITION BY") ? 
+ PartitionType.EXPRESSION : PartitionType.NONE; + } + + public static boolean isDynamicPartitionTable(StarRocksConfig config) { + String showCreateTableDDL = String.format("SHOW CREATE TABLE `%s`.`%s`", config.getDatabase(), config.getTable()); + String createTableDDL = ""; + try (Connection conn = createJdbcConnection(config.getFeJdbcUrl(), config.getUsername(), config.getPassword()); + PreparedStatement ps = conn.prepareStatement(showCreateTableDDL)) { + ResultSet rs = ps.executeQuery(); + if (rs.next()) { + createTableDDL = rs.getString(2); + } + rs.close(); + } catch (Exception e) { + throw new IllegalStateException("show create table ddl by sql error, " + e.getMessage(), e); + } + return createTableDDL.contains("\"dynamic_partition.enable\" = \"true\""); + } + public static List getDatabases(StarRocksConfig config) { List> dbs = extractColumnValuesBySql(config, ALL_DBS_QUERY, Arrays.asList()); List dbNames = new ArrayList<>(); @@ -128,7 +158,8 @@ public static Map getTables(StarRocksConfig config, List return table2Db; } - private static Connection createJdbcConnection(String jdbcUrl, String username, String password) throws Exception { + @VisibleForTesting + public static Connection createJdbcConnection(String jdbcUrl, String username, String password) throws Exception { try { Class.forName(MYSQL_80_DRIVER_NAME); } catch (ClassNotFoundException e) { @@ -182,4 +213,57 @@ private static List> extractColumnValuesBySql(StarRocksConfi return columnValues; } + private static boolean executeSql(StarRocksConfig config, String sql, String errorMsg) { + try (Connection conn = createJdbcConnection(config.getFeJdbcUrl(), config.getUsername(), config.getPassword()); + Statement statement = conn.createStatement()) { + return statement.execute(sql); + } catch (Exception e) { + throw new IllegalStateException(errorMsg + " , sql: " + sql + " , " + e.getMessage(), e); + } + } + + public static boolean createTableBySql(StarRocksConfig config, String sql) { + return executeSql(config, sql, "create table by sql error"); + } + + public static boolean createTemporaryPartitionBySql(StarRocksConfig config, String sql) { + return executeSql(config, sql, "create temporary partition by sql error"); + } + + public static boolean createPartitionBySql(StarRocksConfig config, String sql) { + return executeSql(config, sql, "create partition by sql error"); + } + + public static boolean dropAndCreatePartitionBySql(StarRocksConfig config, String sql, String overwritePartition) { + String queryPartitionDDL = "SELECT DB_NAME, TABLE_NAME, PARTITION_NAME, PARTITION_KEY, PARTITION_VALUE FROM `information_schema`.`partitions_meta` WHERE IS_TEMP = 1 AND " + + "DB_NAME = ? AND TABLE_NAME = ? 
AND PARTITION_NAME LIKE ?"; + List<Map<String, String>> existsPartitions = extractColumnValuesBySql(config, queryPartitionDDL, + Arrays.asList(config.getDatabase(), config.getTable(), overwritePartition + WriteStarRocksConfig.TEMPORARY_PARTITION_SUFFIX + "%")); + existsPartitions.forEach(partition -> { + String partitionName = partition.get("PARTITION_NAME"); + String partitionValue = partition.get("PARTITION_VALUE"); + logger.info("temporary partition {} already exists with value {}, dropping it", partitionName, partitionValue); + String dropTempPartitionDDL = String.format("ALTER TABLE `%s`.`%s` DROP TEMPORARY PARTITION IF EXISTS %s", + config.getDatabase(), config.getTable(), partitionName); + dropTempPartitionBySql(config, dropTempPartitionDDL); + } + ); + return executeSql(config, sql, "create temporary partition by sql error"); + } + + public static boolean swapTableBySql(StarRocksConfig config, String sql) { + return executeSql(config, sql, "swap table by sql error"); + } + + public static boolean replacePartitionBySql(StarRocksConfig config, String sql) { + return executeSql(config, sql, "replace partition by sql error"); + } + + public static boolean dropTempPartitionBySql(StarRocksConfig config, String sql) { + return executeSql(config, sql, "drop temporary partition by sql error"); + } + + public static boolean dropTableBySql(StarRocksConfig config, String sql) { + return executeSql(config, sql, "drop table by sql error"); + } } diff --git a/src/main/java/com/starrocks/connector/spark/sql/schema/InferSchema.java b/src/main/java/com/starrocks/connector/spark/sql/schema/InferSchema.java index 9346b3e7..191332fa 100644 --- a/src/main/java/com/starrocks/connector/spark/sql/schema/InferSchema.java +++ b/src/main/java/com/starrocks/connector/spark/sql/schema/InferSchema.java @@ -127,6 +127,7 @@ static DataType inferDataType(StarRocksField field) { return DataTypes.createDecimalType(field.getPrecision(), field.getScale()); case "char": case "varchar": + case "varbinary": case "string": case "json": return DataTypes.StringType; diff --git a/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksDataWriter.java b/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksDataWriter.java index 950a84e9..90d6f76f 100644 --- a/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksDataWriter.java +++ b/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksDataWriter.java @@ -47,6 +47,9 @@ public class StarRocksDataWriter implements DataWriter, Serializabl private final RowStringConverter converter; private final StreamLoadManager manager; + private final String database; + private final String table; + public StarRocksDataWriter(WriteStarRocksConfig config, StructType schema, int partitionId, @@ -65,6 +68,9 @@ public StarRocksDataWriter(WriteStarRocksConfig config, throw new RuntimeException("Unsupported format " + config.getFormat()); } this.manager = new StreamLoadManagerV2(config.toStreamLoadProperties(), true); + this.database = config.getDatabase(); + this.table = (config.isOverwrite() && config.getTempTableName() != null ?
+ config.getTempTableName(): config.getTable()); } public void open() { @@ -76,8 +82,7 @@ public void open() { @Override public void write(InternalRow internalRow) throws IOException { String data = converter.fromRow(internalRow); - manager.write(null, config.getDatabase(), config.getTable(), data); - + manager.write(null, this.database, this.table, data); log.debug("partitionId: {}, taskId: {}, epochId: {}, receive raw row: {}", partitionId, taskId, epochId, internalRow); log.debug("partitionId: {}, taskId: {}, epochId: {}, receive converted row: {}", diff --git a/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksWrite.java b/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksWrite.java index c03204f8..29d2220d 100644 --- a/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksWrite.java +++ b/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksWrite.java @@ -19,17 +19,21 @@ package com.starrocks.connector.spark.sql.write; +import com.starrocks.connector.spark.exception.NotSupportedOperationException; +import com.starrocks.connector.spark.rest.models.PartitionType; import com.starrocks.connector.spark.sql.conf.WriteStarRocksConfig; -import org.apache.spark.sql.connector.write.BatchWrite; -import org.apache.spark.sql.connector.write.DataWriterFactory; -import org.apache.spark.sql.connector.write.LogicalWriteInfo; -import org.apache.spark.sql.connector.write.PhysicalWriteInfo; -import org.apache.spark.sql.connector.write.WriterCommitMessage; +import com.starrocks.connector.spark.sql.connect.StarRocksConnector; +import org.apache.spark.sql.connector.write.*; import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory; import org.apache.spark.sql.connector.write.streaming.StreamingWrite; +import org.apache.spark.sql.sources.AlwaysTrue; +import org.apache.spark.sql.sources.Filter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.text.SimpleDateFormat; +import java.util.Date; + public class StarRocksWrite implements BatchWrite, StreamingWrite { private static final Logger log = LoggerFactory.getLogger(StarRocksWrite.class); @@ -44,9 +48,65 @@ public StarRocksWrite(LogicalWriteInfo logicalInfo, WriteStarRocksConfig config) @Override public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) { + createTemporaryPartitionOrTable(config); return new StarRocksWriterFactory(logicalInfo.schema(), config); } + private void createTemporaryPartitionOrTable(WriteStarRocksConfig config) { + if (!config.isOverwrite()) { + return; + } + String database = config.getDatabase(); + String table = config.getTable(); + + Filter[] filters = config.getFilters(); + if (filters.length == 1 && filters[0] instanceof AlwaysTrue + && config.getOverwriteTempPartitions().isEmpty()) { + SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss"); + String tempTable = table + WriteStarRocksConfig.TEMPORARY_PARTITION_SUFFIX + format.format(new Date()); + String createTempTableDDL = "CREATE TABLE " + database + "." + tempTable +" LIKE " + database + "." 
+ table; + log.info("prepare to execute: " + createTempTableDDL); + StarRocksConnector.createTableBySql(config, createTempTableDDL); + config.setTempTableName(tempTable); + } else if (config.isOverwrite() && !config.getOverwriteTempPartitions().isEmpty()) { + PartitionType partitionType = StarRocksConnector.getPartitionType(config); + if (PartitionType.NONE.equals(partitionType)) { + throw new NotSupportedOperationException("Overwriting partitions only supports list/range partitioned tables," + + " non-partitioned tables are not supported"); + } else if (PartitionType.EXPRESSION.equals(partitionType)) { + throw new NotSupportedOperationException("Overwriting partitions only supports list/range partitioned tables," + + " expression/automatic partitioning is not supported"); + } + config.getOverwriteTempPartitions().forEach((tempPartition, partitionExpr) -> { + String addTempPartitionDDL = getAddTemporaryPartitionDDL(config, tempPartition, partitionExpr, partitionType); + try { + log.info("prepare to execute: " + addTempPartitionDDL); + StarRocksConnector.createTemporaryPartitionBySql(config, addTempPartitionDDL); + } catch (Exception e) { + if (e.getMessage().contains("Duplicate partition")) { + String overwritePartition = config.getOverwriteTempPartitionMappings().get(tempPartition); + StarRocksConnector.dropAndCreatePartitionBySql(config, addTempPartitionDDL, overwritePartition); + } else { + throw e; + } + } + }); + } + } + + private static String getAddTemporaryPartitionDDL( + WriteStarRocksConfig config, String tempPartition, String partitionExpr, PartitionType partitionType) { + String addTemporaryPartitionDDL; + if (PartitionType.LIST.equals(partitionType)) { // list partition + addTemporaryPartitionDDL = String.format("ALTER TABLE `%s`.`%s` ADD TEMPORARY PARTITION %s VALUES IN %s", + config.getDatabase(), config.getTable(), tempPartition, partitionExpr); + } else { // range partition + addTemporaryPartitionDDL = String.format("ALTER TABLE `%s`.`%s` ADD TEMPORARY PARTITION %s VALUES %s", + config.getDatabase(), config.getTable(), tempPartition, partitionExpr); + } + return addTemporaryPartitionDDL; + } + @Override public boolean useCommitCoordinator() { return true; @@ -55,11 +115,54 @@ public boolean useCommitCoordinator() { @Override public void commit(WriterCommitMessage[] messages) { log.info("batch query `{}` commit", logicalInfo.queryId()); + if (config.isOverwrite() && config.getTempTableName() != null) { + StarRocksConnector.swapTableBySql(config, + "ALTER TABLE " + config.getDatabase() + "." + config.getTable() + " SWAP WITH " + config.getTempTableName()); + StarRocksConnector.dropTableBySql(config, + "DROP TABLE IF EXISTS " + config.getDatabase() + "." 
+ config.getTempTableName() + " FORCE"); + } else if (config.isOverwrite() && !config.getOverwritePartitions().isEmpty()) { + PartitionType partitionType = StarRocksConnector.getPartitionType(config); + boolean dynamicPartitionTable = StarRocksConnector.isDynamicPartitionTable(config); + if (!dynamicPartitionTable) { + config.getOverwritePartitions().forEach((partitionName, partitionValue) -> { + String addPartitionDDL; + if (PartitionType.LIST.equals(partitionType)) { + addPartitionDDL = String.format("ALTER TABLE `%s`.`%s` ADD PARTITION IF NOT EXISTS %s VALUES IN %s", + config.getDatabase(), config.getTable(), partitionName, partitionValue); + } else { + addPartitionDDL = String.format("ALTER TABLE `%s`.`%s` ADD PARTITION IF NOT EXISTS %s VALUES %s", + config.getDatabase(), config.getTable(), partitionName, partitionValue); + } + log.info("prepare to execute: " + addPartitionDDL); + StarRocksConnector.createPartitionBySql(config, addPartitionDDL); + }); + } else { + log.info("no need to create partitions for a dynamic partition table"); + } + config.getOverwriteTempPartitionMappings().forEach((tempPartitionName, partitionName) -> { + String replacePartitionDDL = String.format( + "ALTER TABLE `%s`.`%s` REPLACE PARTITION (`%s`) WITH TEMPORARY PARTITION (`%s`)", + config.getDatabase(), config.getTable(), partitionName, tempPartitionName); + log.info("prepare to execute: " + replacePartitionDDL); + StarRocksConnector.replacePartitionBySql(config, replacePartitionDDL); + }); + } } @Override public void abort(WriterCommitMessage[] messages) { log.info("batch query `{}` abort", logicalInfo.queryId()); + if (config.isOverwrite() && config.getTempTableName() != null) { + StarRocksConnector.dropTableBySql(config, + "DROP TABLE IF EXISTS " + config.getDatabase() + "." 
+ config.getTempTableName() + " FORCE"); + } else if (config.isOverwrite() && !config.getOverwritePartitions().isEmpty()) { + config.getOverwriteTempPartitions().keySet().forEach(tempPartition -> { + String dropTempPartitionDDL = String.format( + "ALTER TABLE `%s`.`%s` DROP TEMPORARY PARTITION IF EXISTS %s", + config.getDatabase(), config.getTable(), tempPartition); + StarRocksConnector.dropTempPartitionBySql(config, dropTempPartitionDDL); + }); + } } @Override diff --git a/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksWriteBuilder.java b/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksWriteBuilder.java index 4ae4418e..5a3a3eb7 100644 --- a/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksWriteBuilder.java +++ b/src/main/java/com/starrocks/connector/spark/sql/write/StarRocksWriteBuilder.java @@ -25,14 +25,16 @@ import org.apache.spark.sql.connector.expressions.Expression; import org.apache.spark.sql.connector.expressions.Expressions; import org.apache.spark.sql.connector.expressions.SortOrder; -import org.apache.spark.sql.connector.write.BatchWrite; -import org.apache.spark.sql.connector.write.LogicalWriteInfo; -import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering; -import org.apache.spark.sql.connector.write.Write; -import org.apache.spark.sql.connector.write.WriteBuilder; +import org.apache.spark.sql.connector.write.*; import org.apache.spark.sql.connector.write.streaming.StreamingWrite; +import org.apache.spark.sql.sources.AlwaysTrue; +import org.apache.spark.sql.sources.Filter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class StarRocksWriteBuilder implements WriteBuilder { +public class StarRocksWriteBuilder implements WriteBuilder, SupportsOverwrite, SupportsDynamicOverwrite { + + private Logger logger = LoggerFactory.getLogger(getClass()); private final LogicalWriteInfo info; private final WriteStarRocksConfig config; @@ -46,6 +48,22 @@ public Write build() { return new StarRocksWriteImpl(info, config); } + @Override + public WriteBuilder overwrite(Filter[] filters) { + logger.info("invoke overwrite ...."); + config.setOverwrite(true); + config.setFilters(filters); + return this; + } + + @Override + public WriteBuilder overwriteDynamicPartitions() { + logger.info("invoke overwriteDynamicPartitions ...."); + config.setOverwrite(true); + config.setFilters(new Filter[]{new AlwaysTrue()}); + return this; + } + private static class StarRocksWriteImpl implements Write, RequiresDistributionAndOrdering { private final LogicalWriteInfo info; diff --git a/src/test/scala/com/starrocks/connector/spark/sql/TestOverwrite.scala b/src/test/scala/com/starrocks/connector/spark/sql/TestOverwrite.scala new file mode 100644 index 00000000..703f9a5a --- /dev/null +++ b/src/test/scala/com/starrocks/connector/spark/sql/TestOverwrite.scala @@ -0,0 +1,1005 @@ +// Modifications Copyright 2021 StarRocks Limited. +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.starrocks.connector.spark.sql + +import com.starrocks.connector.spark.exception.NotSupportedOperationException +import com.starrocks.connector.spark.sql.conf.WriteStarRocksConfig +import com.starrocks.connector.spark.sql.connect.StarRocksConnector +import org.apache.spark.sql.SparkSession +import org.junit.jupiter.api.{Assertions, Test} +import org.slf4j.LoggerFactory + +import java.sql.{Connection, ResultSet, Statement} +import java.text.SimpleDateFormat +import java.util.Date + +/** + * test on local docker environment, spark version 3.3, 3.4, 3.5 + * docker run -p 9030:9030 -p 8030:8030 -p 8040:8040 --name quickstart-3.1.14 -itd starrocks/allin1-ubuntu:3.1.14 + * docker run -p 9030:9030 -p 8030:8030 -p 8040:8040 --name quickstart-3.2.12 -itd starrocks/allin1-ubuntu:3.2.12 + * docker run -p 9030:9030 -p 8030:8030 -p 8040:8040 --name quickstart-3.3.2 -itd starrocks/allin1-ubuntu:3.3.2 + */ +class TestOverwrite extends ExpectedExceptionTest { + private lazy val logger = LoggerFactory.getLogger(classOf[TestOverwrite]) + + private val STARROCKS_FE_HTTP_URL = "127.0.0.1:8030" + private val STARROCKS_FE_JDBC_URL = "jdbc:mysql://127.0.0.1:9030" + private val STARROCKS_FE_JDBC_USER = "root" + private val STARROCKS_FE_JDBC_PASSWORD = "" + + @Test + def testFullTableOverwrite(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`score_board`") + val createTableDDL = + """ + |CREATE TABLE IF NOT EXISTS `test`.`score_board` + |( + | `id` int(11) NOT NULL COMMENT "", + | `name` varchar(65533) NULL DEFAULT "" COMMENT "", + | `score` int(11) NOT NULL DEFAULT "0" COMMENT "" + |) + |ENGINE=OLAP + |PRIMARY KEY(`id`) + |COMMENT "OLAP" + |DISTRIBUTED BY HASH(`id`) + |""".stripMargin + statement.execute(createTableDDL) + statement.execute("insert into `test`.`score_board` values (1, 'spark', 100)") + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. + val data = Seq((5, "starrocks", 103), (6, "spark", 103)) + val df = data.toDF("id", "name", "score") + + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. 
+ df.write.format("starrocks") + .option("starrocks.fe.http.url", STARROCKS_FE_HTTP_URL) + .option("starrocks.fe.jdbc.url", STARROCKS_FE_JDBC_URL) + .option("starrocks.user", STARROCKS_FE_JDBC_USER) + .option("starrocks.password", STARROCKS_FE_JDBC_PASSWORD) + .option("starrocks.table.identifier", "test.score_board") + .mode("overwrite") + .save() + + rs = statement.executeQuery("select id, name, score from test.score_board order by id asc") + if (rs.next()) { + Assertions.assertEquals(5, rs.getInt("id")) + Assertions.assertEquals("starrocks", rs.getString("name")) + Assertions.assertEquals(103, rs.getInt("score")) + } + + if (rs.next()) { + Assertions.assertEquals(6, rs.getInt("id")) + Assertions.assertEquals("spark", rs.getString("name")) + Assertions.assertEquals(103, rs.getInt("score")) + } + } finally { + try { + dropTable(statement, "`test`.`score_board`") + } finally { + releaseConn(conn, statement, rs) + } + } + } + + def dropTable(statement: Statement, table: String): Unit = { + if (statement != null) { + statement.execute(s"drop table if exists $table") + } + } + def releaseConn(conn: Connection, statement: Statement, rs: ResultSet): Unit = { + if (conn != null) { + try { + conn.close() + } catch { + case e: Throwable => logger.error("close connection error", e) + } + } + + if (statement != null) { + try { + statement.close() + } catch { + case e: Throwable => logger.error("close statement error", e) + } + } + + if (rs != null) { + try { + rs.close() + } catch { + case e: Throwable => logger.error("close resultSet error", e) + } + } + } + + @Test + def testFullTableOverwriteWithEx(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`score_board`") + val createTableDDL = + """ + |CREATE TABLE IF NOT EXISTS `test`.`score_board` + |( + | `id` int(11) NOT NULL COMMENT "", + | `name` varchar(65533) NULL DEFAULT "" COMMENT "", + | `score` int(11) NOT NULL DEFAULT "0" COMMENT "" + |) + |ENGINE=OLAP + |PRIMARY KEY(`id`) + |COMMENT "OLAP" + |DISTRIBUTED BY HASH(`id`) + |""".stripMargin + statement.execute(createTableDDL) + statement.execute("insert into `test`.`score_board` values (1, 'spark', 100)") + try { + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. + val data = Seq((5, "starrocks", 103), (6, "spark", 103)) + val frame = data.toDF("id", "name", "score") + val encoder = frame.encoder + val df = frame.map(x => { + if (x.getInt(0) == 6) { + throw new RuntimeException() + } + x + })(encoder) + + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. 
+ df.write.format("starrocks") + .option("starrocks.fe.http.url", STARROCKS_FE_HTTP_URL) + .option("starrocks.fe.jdbc.url", STARROCKS_FE_JDBC_URL) + .option("starrocks.user", STARROCKS_FE_JDBC_USER) + .option("starrocks.password", STARROCKS_FE_JDBC_PASSWORD) + .option("starrocks.table.identifier", "test.score_board") + .mode("overwrite") + .save() + } catch { + case e: Throwable => logger.error("error occurs", e) + } + + rs = statement.executeQuery("select id, name, score from test.score_board order by id asc") + if (rs.next()) { + Assertions.assertEquals(1, rs.getInt("id")) + Assertions.assertEquals("spark", rs.getString("name")) + Assertions.assertEquals(100, rs.getInt("score")) + } + } finally { + try { + dropTable(statement, "`test`.`score_board`") + } finally { + releaseConn(conn, statement, rs) + } + } + } + + @Test + def testPartitionOverwrite(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`t_recharge_detail1`") + val createTableDDL = + """ + |CREATE TABLE `test`.`t_recharge_detail1` ( + | id bigint, + | user_id bigint, + | city varchar(20) not null, + | dt varchar(20) not null + |) + |DUPLICATE KEY(id) + |PARTITION BY LIST (city) ( + | PARTITION pLos_Angeles VALUES IN ("Los Angeles"), + | PARTITION pSan_Francisco VALUES IN ("San Francisco") + |) + |DISTRIBUTED BY HASH(`id`); + |""".stripMargin + statement.execute(createTableDDL) + statement.execute("insert into `test`.`t_recharge_detail1` values (1, 1, 'Los Angeles', '20241107')," + + " (2, 2, 'San Francisco', '20241101')") + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. + // + val data = Seq((3, 3, "Los Angeles", "20241107"), (2, 2, "Los Angeles", "20241106")) + val df = data.toDF("id", "user_id", "city", "dt") + + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. 
+ df.write.format("starrocks") + .option("starrocks.fe.http.url", STARROCKS_FE_HTTP_URL) + .option("starrocks.fe.jdbc.url", STARROCKS_FE_JDBC_URL) + .option("starrocks.user", STARROCKS_FE_JDBC_USER) + .option("starrocks.password", STARROCKS_FE_JDBC_PASSWORD) + .option("starrocks.table.identifier", "test.t_recharge_detail1") + .option("starrocks.write.overwrite.partitions.pLos_Angeles", "(\"Los Angeles\")") + .mode("overwrite") + .save() + + rs = statement.executeQuery( + "select id, user_id, city, dt from test.t_recharge_detail1 where city = 'Los Angeles' order by id asc") + if (rs.next()) { + Assertions.assertEquals(2, rs.getInt("id")) + Assertions.assertEquals(2, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241106", rs.getString("dt")) + } + + if (rs.next()) { + Assertions.assertEquals(3, rs.getInt("id")) + Assertions.assertEquals(3, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241107", rs.getString("dt")) + } + } finally { + try { + dropTable(statement, "test.t_recharge_detail1") + } finally { + releaseConn(conn, statement, rs) + } + } + } + + @Test + def testPartitionOverwriteViaSql(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`t_recharge_detail1`") + val createTableDDL = + """ + |CREATE TABLE `test`.`t_recharge_detail1` ( + | id bigint, + | user_id bigint, + | city varchar(20) not null, + | dt varchar(20) not null + |) + |DUPLICATE KEY(id) + |PARTITION BY LIST (city) ( + | PARTITION pLos_Angeles VALUES IN ("Los Angeles"), + | PARTITION pSan_Francisco VALUES IN ("San Francisco") + |) + |DISTRIBUTED BY HASH(`id`); + |""".stripMargin + statement.execute(createTableDDL) + statement.execute("insert into `test`.`t_recharge_detail1` values (1, 1, 'Los Angeles', '20241107')," + + " (2, 2, 'San Francisco', '20241101')") + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. + // + val data = Seq((3, 3, "Los Angeles", "20241107"), (2, 2, "Los Angeles", "20241106")) + val df = data.toDF("id", "user_id", "city", "dt") + val view = s""" + |CREATE OR REPLACE TEMPORARY VIEW `sr_connector_performance_test` + |USING starrocks + |OPTIONS( + |"starrocks.fe.http.url" = "${STARROCKS_FE_HTTP_URL}", + |"starrocks.fe.jdbc.url" = "${STARROCKS_FE_JDBC_URL}", + |"starrocks.table.identifier" = "test.t_recharge_detail1", + |"starrocks.user" = "${STARROCKS_FE_JDBC_USER}", + |"starrocks.password" = "${STARROCKS_FE_JDBC_PASSWORD}", + |"starrocks.write.overwrite.partitions.pLos_Angeles" = "(\\"Los Angeles\\")" + |); + |""".stripMargin + spark.sql(view) + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. 
+ df.registerTempTable("test123") + spark.sql("insert overwrite sr_connector_performance_test select * from test123") + + rs = statement.executeQuery( + "select id, user_id, city, dt from test.t_recharge_detail1 where city = 'Los Angeles' order by id asc") + if (rs.next()) { + Assertions.assertEquals(2, rs.getInt("id")) + Assertions.assertEquals(2, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241106", rs.getString("dt")) + } + + if (rs.next()) { + Assertions.assertEquals(3, rs.getInt("id")) + Assertions.assertEquals(3, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241107", rs.getString("dt")) + } + } finally { + try { + dropTable(statement, "test.t_recharge_detail1") + } finally { + releaseConn(conn, statement, rs) + } + } + } + + @Test + def testPartitionOverwriteMultiPartitions(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`t_recharge_detail1`") + val createTableDDL = + """ + |CREATE TABLE `test`.`t_recharge_detail1` ( + | id bigint, + | user_id bigint, + | city varchar(20) not null, + | dt varchar(20) not null + |) + |DUPLICATE KEY(id) + |PARTITION BY LIST (city) ( + | PARTITION pLos_Angeles VALUES IN ("Los Angeles"), + | PARTITION pSan_Francisco VALUES IN ("San Francisco") + |) + |DISTRIBUTED BY HASH(`id`); + |""".stripMargin + statement.execute(createTableDDL) + statement.execute("insert into `test`.`t_recharge_detail1` values (1, 1, 'Los Angeles', '20241107')," + + " (2, 2, 'San Francisco', '20241101')") + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. + // + val data = Seq( + (3, 3, "Los Angeles", "20241107"), + (2, 2, "Los Angeles", "20241106"), + (5, 5, "San Francisco", "20241108")) + val df = data.toDF("id", "user_id", "city", "dt") + + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. 
+ df.write.format("starrocks") + .option("starrocks.fe.http.url", STARROCKS_FE_HTTP_URL) + .option("starrocks.fe.jdbc.url", STARROCKS_FE_JDBC_URL) + .option("starrocks.user", STARROCKS_FE_JDBC_USER) + .option("starrocks.password", STARROCKS_FE_JDBC_PASSWORD) + .option("starrocks.table.identifier", "test.t_recharge_detail1") + .option("starrocks.write.overwrite.partitions.pLos_Angeles", "(\"Los Angeles\")") + .option("starrocks.write.overwrite.partitions.pSan_Francisco", "(\"San Francisco\")") + .mode("overwrite") + .save() + + rs = statement.executeQuery( + "select id, user_id, city, dt from test.t_recharge_detail1 where city = 'Los Angeles' order by id asc") + if (rs.next()) { + Assertions.assertEquals(2, rs.getInt("id")) + Assertions.assertEquals(2, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241106", rs.getString("dt")) + } + + if (rs.next()) { + Assertions.assertEquals(3, rs.getInt("id")) + Assertions.assertEquals(3, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241107", rs.getString("dt")) + } + + if (rs.next()) { + Assertions.assertEquals(5, rs.getInt("id")) + Assertions.assertEquals(5, rs.getInt("user_id")) + Assertions.assertEquals("San Francisco", rs.getString("city")) + Assertions.assertEquals("20241108", rs.getString("dt")) + } + } finally { + try { + dropTable(statement, "test.t_recharge_detail1") + } finally { + releaseConn(conn, statement, rs) + } + } + } + + @Test + def testPartitionOverwriteMultiPartitionsViaSql(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`t_recharge_detail1`") + val createTableDDL = + """ + |CREATE TABLE `test`.`t_recharge_detail1` ( + | id bigint, + | user_id bigint, + | city varchar(20) not null, + | dt varchar(20) not null + |) + |DUPLICATE KEY(id) + |PARTITION BY LIST (city) ( + | PARTITION pLos_Angeles VALUES IN ("Los Angeles"), + | PARTITION pSan_Francisco VALUES IN ("San Francisco") + |) + |DISTRIBUTED BY HASH(`id`); + |""".stripMargin + statement.execute(createTableDDL) + statement.execute("insert into `test`.`t_recharge_detail1` values (1, 1, 'Los Angeles', '20241107')," + + " (2, 2, 'San Francisco', '20241101')") + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. + // + val data = Seq( + (3, 3, "Los Angeles", "20241107"), + (2, 2, "Los Angeles", "20241106"), + (5, 5, "San Francisco", "20241108")) + val df = data.toDF("id", "user_id", "city", "dt") + + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. 
+ val view = s""" + |CREATE OR REPLACE TEMPORARY VIEW `sr_connector_performance_test` + |USING starrocks + |OPTIONS( + |"starrocks.fe.http.url" = "${STARROCKS_FE_HTTP_URL}", + |"starrocks.fe.jdbc.url" = "${STARROCKS_FE_JDBC_URL}", + |"starrocks.table.identifier" = "test.t_recharge_detail1", + |"starrocks.user" = "${STARROCKS_FE_JDBC_USER}", + |"starrocks.password" = "${STARROCKS_FE_JDBC_PASSWORD}", + |"starrocks.write.overwrite.partitions.pLos_Angeles" = "(\\"Los Angeles\\")", + |"starrocks.write.overwrite.partitions.pSan_Francisco" = "(\\"San Francisco\\")" + |); + |""".stripMargin + spark.sql(view) + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. + df.registerTempTable("test123") + spark.sql("insert overwrite sr_connector_performance_test select * from test123") + + rs = statement.executeQuery( + "select id, user_id, city, dt from test.t_recharge_detail1 where city = 'Los Angeles' order by id asc") + if (rs.next()) { + Assertions.assertEquals(2, rs.getInt("id")) + Assertions.assertEquals(2, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241106", rs.getString("dt")) + } + + if (rs.next()) { + Assertions.assertEquals(3, rs.getInt("id")) + Assertions.assertEquals(3, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241107", rs.getString("dt")) + } + + if (rs.next()) { + Assertions.assertEquals(5, rs.getInt("id")) + Assertions.assertEquals(5, rs.getInt("user_id")) + Assertions.assertEquals("San Francisco", rs.getString("city")) + Assertions.assertEquals("20241108", rs.getString("dt")) + } + } finally { + try { + dropTable(statement, "test.t_recharge_detail1") + } finally { + releaseConn(conn, statement, rs) + } + } + } + + @Test + def testPartitionOverwriteWithEx(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`t_recharge_detail1`") + val createTableDDL = + """ + |CREATE TABLE `test`.`t_recharge_detail1` ( + | id bigint, + | user_id bigint, + | city varchar(20) not null, + | dt varchar(20) not null + |) + |DUPLICATE KEY(id) + |PARTITION BY LIST (city) ( + | PARTITION pLos_Angeles VALUES IN ("Los Angeles"), + | PARTITION pSan_Francisco VALUES IN ("San Francisco") + |) + |DISTRIBUTED BY HASH(`id`); + |""".stripMargin + statement.execute(createTableDDL) + statement.execute("insert into `test`.`t_recharge_detail1` values (1, 1, 'Los Angeles', '20241107')," + + " (2, 2, 'San Francisco', '20241101')") + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. + // + try { + val data = Seq((3, 3, "Los Angeles", "20241107"), (2, 2, "Los Angeles", "20241106")) + val frame = data.toDF("id", "user_id", "city", "dt") + val encoder = frame.encoder + val df = frame.map(x => { + if (x.getInt(0) == 2) { + throw new RuntimeException() + } + x + })(encoder) + + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. 
+ df.write.format("starrocks") + .option("starrocks.fe.http.url", STARROCKS_FE_HTTP_URL) + .option("starrocks.fe.jdbc.url", STARROCKS_FE_JDBC_URL) + .option("starrocks.user", STARROCKS_FE_JDBC_USER) + .option("starrocks.password", STARROCKS_FE_JDBC_PASSWORD) + .option("starrocks.table.identifier", "test.t_recharge_detail1") + .option("starrocks.write.overwrite.partitions.pLos_Angeles", "(\"Los Angeles\")") + .mode("overwrite") + .save() + } catch { + case e: Throwable => logger.error("error occurs", e) + } + + rs = statement.executeQuery( + "select id, user_id, city, dt from test.t_recharge_detail1 where city = 'Los Angeles' order by id asc") + if (rs.next()) { + Assertions.assertEquals(1, rs.getInt("id")) + Assertions.assertEquals(1, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241107", rs.getString("dt")) + } + } finally { + try { + dropTable(statement, "test.t_recharge_detail1") + } finally { + releaseConn(conn, statement, rs) + } + } + } + + @Test + def testPartitionOverwriteWithExistsTemporaryPartition(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`t_recharge_detail1`") + val createTableDDL = + """ + |CREATE TABLE `test`.`t_recharge_detail1` ( + | id bigint, + | user_id bigint, + | city varchar(20) not null, + | dt varchar(20) not null + |) + |DUPLICATE KEY(id) + |PARTITION BY LIST (city) ( + | PARTITION pLos_Angeles VALUES IN ("Los Angeles"), + | PARTITION pSan_Francisco VALUES IN ("San Francisco") + |) + |DISTRIBUTED BY HASH(`id`); + |""".stripMargin + statement.execute(createTableDDL) + statement.execute("insert into `test`.`t_recharge_detail1` values (1, 1, 'Los Angeles', '20241107')," + + " (2, 2, 'San Francisco', '20241101')") + statement.execute("ALTER TABLE `test`.`t_recharge_detail1` ADD TEMPORARY PARTITION pLos_Angeles" + WriteStarRocksConfig.TEMPORARY_PARTITION_SUFFIX + + System.currentTimeMillis() + " VALUES IN (\"Los Angeles\")") + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. + // + val data = Seq((3, 3, "Los Angeles", "20241107"), (2, 2, "Los Angeles", "20241106")) + val df = data.toDF("id", "user_id", "city", "dt") + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. 
+ df.write.format("starrocks") + .option("starrocks.fe.http.url", STARROCKS_FE_HTTP_URL) + .option("starrocks.fe.jdbc.url", STARROCKS_FE_JDBC_URL) + .option("starrocks.user", STARROCKS_FE_JDBC_USER) + .option("starrocks.password", STARROCKS_FE_JDBC_PASSWORD) + .option("starrocks.table.identifier", "test.t_recharge_detail1") + .option("starrocks.write.overwrite.partitions.pLos_Angeles", "(\"Los Angeles\")") + .mode("overwrite") + .save() + + rs = statement.executeQuery( + "select id, user_id, city, dt from test.t_recharge_detail1 where city = 'Los Angeles' order by id asc") + if (rs.next()) { + Assertions.assertEquals(2, rs.getInt("id")) + Assertions.assertEquals(2, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241106", rs.getString("dt")) + } + + if (rs.next()) { + Assertions.assertEquals(3, rs.getInt("id")) + Assertions.assertEquals(3, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241107", rs.getString("dt")) + } + } finally { + try { + dropTable(statement, "test.t_recharge_detail1") + } finally { + releaseConn(conn, statement, rs) + } + } + } + + @Test + def testPartitionOverwriteWithExistsTemporaryPartitionViaSql(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`t_recharge_detail1`") + val createTableDDL = + """ + |CREATE TABLE `test`.`t_recharge_detail1` ( + | id bigint, + | user_id bigint, + | city varchar(20) not null, + | dt varchar(20) not null + |) + |DUPLICATE KEY(id) + |PARTITION BY LIST (city) ( + | PARTITION pLos_Angeles VALUES IN ("Los Angeles"), + | PARTITION pSan_Francisco VALUES IN ("San Francisco") + |) + |DISTRIBUTED BY HASH(`id`); + |""".stripMargin + statement.execute(createTableDDL) + statement.execute("insert into `test`.`t_recharge_detail1` values (1, 1, 'Los Angeles', '20241107')," + + " (2, 2, 'San Francisco', '20241101')") + statement.execute("ALTER TABLE `test`.`t_recharge_detail1` ADD TEMPORARY PARTITION pLos_Angeles" + WriteStarRocksConfig.TEMPORARY_PARTITION_SUFFIX + + System.currentTimeMillis() + " VALUES IN (\"Los Angeles\")") + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. + // + val data = Seq((3, 3, "Los Angeles", "20241107"), (2, 2, "Los Angeles", "20241106")) + val df = data.toDF("id", "user_id", "city", "dt") + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. + val view = s""" + |CREATE OR REPLACE TEMPORARY VIEW `sr_connector_performance_test` + |USING starrocks + |OPTIONS( + |"starrocks.fe.http.url" = "${STARROCKS_FE_HTTP_URL}", + |"starrocks.fe.jdbc.url" = "${STARROCKS_FE_JDBC_URL}", + |"starrocks.table.identifier" = "test.t_recharge_detail1", + |"starrocks.user" = "${STARROCKS_FE_JDBC_USER}", + |"starrocks.password" = "${STARROCKS_FE_JDBC_PASSWORD}", + |"starrocks.write.overwrite.partitions.pLos_Angeles" = "(\\"Los Angeles\\")" + |); + |""".stripMargin + spark.sql(view) + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. 
+ df.registerTempTable("test123") + spark.sql("insert overwrite sr_connector_performance_test select * from test123") + + + rs = statement.executeQuery( + "select id, user_id, city, dt from test.t_recharge_detail1 where city = 'Los Angeles' order by id asc") + if (rs.next()) { + Assertions.assertEquals(2, rs.getInt("id")) + Assertions.assertEquals(2, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241106", rs.getString("dt")) + } + + if (rs.next()) { + Assertions.assertEquals(3, rs.getInt("id")) + Assertions.assertEquals(3, rs.getInt("user_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city")) + Assertions.assertEquals("20241107", rs.getString("dt")) + } + } finally { + try { + dropTable(statement, "test.t_recharge_detail1") + } finally { + releaseConn(conn, statement, rs) + } + } + } + + private def parse(dt: Date) = { + val format = new SimpleDateFormat("yyyy-MM-dd") + val times = format.format(dt).split("-") + (times(0), times(1), times(2)) + } + + @Test + def testPartitionOverwriteWithDynamicPartition(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`site_access`") + val now = new Date() + val yes = new Date(now.getTime - 24 * 3600 * 1000) + val future = new Date(now.getTime + 24 * 3600 * 1000) + val yesm1 = new Date(now.getTime - 2 * 24 * 3600 * 1000) + val yesm2 = new Date(now.getTime - 3 * 24 * 3600 * 1000) + val (fy, fm, fd) = parse(future) + val (ny, nm, nd) = parse(now) + val (yesy, yesm, yesd) = parse(yes) + val (yesm1y, yesm1m, yesm1d) = parse(yesm1) + val (yesm2y, yesm2m, yesm2d) = parse(yesm2) + val createTableDDL = + s""" + |CREATE TABLE `test`.`site_access`( + | event_day DATE, + | site_id INT DEFAULT '10', + | city_code VARCHAR(100), + | user_name VARCHAR(32) DEFAULT '', + | pv BIGINT DEFAULT '0' + |) + |DUPLICATE KEY(event_day, site_id, city_code, user_name) + |PARTITION BY RANGE(event_day)( + | PARTITION p${yesm2y}${yesm2m}${yesm2d} VALUES LESS THAN ("${yesm1y}-${yesm1m}-${yesm1d}"), + | PARTITION p${yesm1y}${yesm1m}${yesm1d} VALUES LESS THAN ("${yesy}-${yesm}-${yesd}"), + | PARTITION p${yesy}${yesm}${yesd} VALUES LESS THAN ("${ny}-${nm}-${nd}"), + | PARTITION p${ny}${nm}${nd} VALUES LESS THAN ("${fy}-${fm}-${fd}") + |) + |DISTRIBUTED BY HASH(event_day, site_id) + |PROPERTIES( + | "dynamic_partition.enable" = "true", + | "dynamic_partition.time_unit" = "DAY", + | "dynamic_partition.start" = "-3", + | "dynamic_partition.end" = "3", + | "dynamic_partition.prefix" = "p", + | "dynamic_partition.buckets" = "32", + | "dynamic_partition.history_partition_num" = "0" + |); + |""".stripMargin + statement.execute(createTableDDL) + statement.execute(s"insert into `test`.`site_access`(event_day, site_id, city_code, user_name, pv) values ('${yesy}-${yesm}-${yesd}', 1, 'Los Angeles', 'jack', 10)") + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. 
+ val data = Seq((s"${yesy}-${yesm}-${yesd} 12:12:23", 10, "Los Angeles", "jack", 30), (s"${yesy}-${yesm}-${yesd} 08:12:23", 20, "Los Angeles", "jack", 20)) + var df = data.toDF("event_day", "site_id", "city_code", "user_name", "pv") + df.createOrReplaceTempView("test_view") + df = spark.sql("select cast(event_day as date) as event_day, site_id, city_code, user_name, pv from test_view") + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. + df.write.format("starrocks") + .option("starrocks.fe.http.url", STARROCKS_FE_HTTP_URL) + .option("starrocks.fe.jdbc.url", STARROCKS_FE_JDBC_URL) + .option("starrocks.user", STARROCKS_FE_JDBC_USER) + .option("starrocks.password", STARROCKS_FE_JDBC_PASSWORD) + .option("starrocks.table.identifier", "test.site_access") + .option(s"starrocks.write.overwrite.partitions.p${yesy}${yesm}${yesd}", s"[('${yesy}-${yesm}-${yesd}'),('${ny}-${nm}-${nd}'))") + .mode("overwrite") + .save() + + rs = statement.executeQuery( + "select event_day, site_id, city_code, user_name, pv from test.site_access order by site_id asc") + if (rs.next()) { + Assertions.assertEquals(s"${yesy}-${yesm}-${yesd}", rs.getDate("event_day").toString) + Assertions.assertEquals(10, rs.getInt("site_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city_code")) + Assertions.assertEquals("jack", rs.getString("user_name")) + Assertions.assertEquals(30, rs.getInt("pv")) + } + + if (rs.next()) { + Assertions.assertEquals(s"${yesy}-${yesm}-${yesd}", rs.getDate("event_day").toString) + Assertions.assertEquals(20, rs.getInt("site_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city_code")) + Assertions.assertEquals("jack", rs.getString("user_name")) + Assertions.assertEquals(20, rs.getInt("pv")) + } + } finally { + try { + dropTable(statement, "`test`.`site_access`") + } finally { + releaseConn(conn, statement, rs) + } + } + } + + @Test + def testPartitionOverwriteWithDynamicPartitionViaSql(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`site_access`") + val now = new Date() + val yes = new Date(now.getTime - 24 * 3600 * 1000) + val future = new Date(now.getTime + 24 * 3600 * 1000) + val yesm1 = new Date(now.getTime - 2 * 24 * 3600 * 1000) + val yesm2 = new Date(now.getTime - 3 * 24 * 3600 * 1000) + val (fy, fm, fd) = parse(future) + val (ny, nm, nd) = parse(now) + val (yesy, yesm, yesd) = parse(yes) + val (yesm1y, yesm1m, yesm1d) = parse(yesm1) + val (yesm2y, yesm2m, yesm2d) = parse(yesm2) + val createTableDDL = + s""" + |CREATE TABLE `test`.`site_access`( + | event_day DATE, + | site_id INT DEFAULT '10', + | city_code VARCHAR(100), + | user_name VARCHAR(32) DEFAULT '', + | pv BIGINT DEFAULT '0' + |) + |DUPLICATE KEY(event_day, site_id, city_code, user_name) + |PARTITION BY RANGE(event_day)( + | PARTITION p${yesm2y}${yesm2m}${yesm2d} VALUES LESS THAN ("${yesm1y}-${yesm1m}-${yesm1d}"), + | PARTITION p${yesm1y}${yesm1m}${yesm1d} VALUES LESS THAN ("${yesy}-${yesm}-${yesd}"), + | PARTITION p${yesy}${yesm}${yesd} VALUES LESS THAN ("${ny}-${nm}-${nd}"), + | PARTITION p${ny}${nm}${nd} VALUES LESS THAN ("${fy}-${fm}-${fd}") + |) + |DISTRIBUTED BY HASH(event_day, site_id) + |PROPERTIES( + | 
"dynamic_partition.enable" = "true", + | "dynamic_partition.time_unit" = "DAY", + | "dynamic_partition.start" = "-3", + | "dynamic_partition.end" = "3", + | "dynamic_partition.prefix" = "p", + | "dynamic_partition.buckets" = "32", + | "dynamic_partition.history_partition_num" = "0" + |); + |""".stripMargin + statement.execute(createTableDDL) + statement.execute(s"insert into `test`.`site_access`(event_day, site_id, city_code, user_name, pv) values ('${yesy}-${yesm}-${yesd}', 1, 'Los Angeles', 'jack', 10)") + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. + val data = Seq((s"${yesy}-${yesm}-${yesd} 12:12:23", 10, "Los Angeles", "jack", 30), (s"${yesy}-${yesm}-${yesd} 08:12:23", 20, "Los Angeles", "jack", 20)) + var df = data.toDF("event_day", "site_id", "city_code", "user_name", "pv") + df.createOrReplaceTempView("test_view") + df = spark.sql("select cast(event_day as date) as event_day, site_id, city_code, user_name, pv from test_view") + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. + val view = s""" + |CREATE OR REPLACE TEMPORARY VIEW `sr_connector_performance_test` + |USING starrocks + |OPTIONS( + |"starrocks.fe.http.url" = "${STARROCKS_FE_HTTP_URL}", + |"starrocks.fe.jdbc.url" = "${STARROCKS_FE_JDBC_URL}", + |"starrocks.table.identifier" = "test.site_access", + |"starrocks.user" = "${STARROCKS_FE_JDBC_USER}", + |"starrocks.password" = "${STARROCKS_FE_JDBC_PASSWORD}", + |"starrocks.write.overwrite.partitions.p${yesy}${yesm}${yesd}" = "[('${yesy}-${yesm}-${yesd}'),('${ny}-${nm}-${nd}'))" + |); + |""".stripMargin + spark.sql(view) + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. 
+ df.registerTempTable("test123") + spark.sql("insert overwrite sr_connector_performance_test select * from test123") + + rs = statement.executeQuery( + "select event_day, site_id, city_code, user_name, pv from test.site_access order by site_id asc") + if (rs.next()) { + Assertions.assertEquals(s"${yesy}-${yesm}-${yesd}", rs.getDate("event_day").toString) + Assertions.assertEquals(10, rs.getInt("site_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city_code")) + Assertions.assertEquals("jack", rs.getString("user_name")) + Assertions.assertEquals(30, rs.getInt("pv")) + } + + if (rs.next()) { + Assertions.assertEquals(s"${yesy}-${yesm}-${yesd}", rs.getDate("event_day").toString) + Assertions.assertEquals(20, rs.getInt("site_id")) + Assertions.assertEquals("Los Angeles", rs.getString("city_code")) + Assertions.assertEquals("jack", rs.getString("user_name")) + Assertions.assertEquals(20, rs.getInt("pv")) + } + } finally { + try { + dropTable(statement, "`test`.`site_access`") + } finally { + releaseConn(conn, statement, rs) + } + } + } + + @Test + def testPartitionOverwriteWithExpressionPartitioning(): Unit = { + val conn = StarRocksConnector.createJdbcConnection(STARROCKS_FE_JDBC_URL, + STARROCKS_FE_JDBC_USER, STARROCKS_FE_JDBC_PASSWORD) + val statement = conn.createStatement() + var rs: ResultSet = null + try { + statement.execute("CREATE DATABASE IF NOT EXISTS `test`") + statement.execute("DROP TABLE IF EXISTS `test`.`site_access1`") + val createTableDDL = + """ + |CREATE TABLE `test`.`site_access1` ( + | event_day DATETIME NOT NULL, + | site_id INT DEFAULT '10', + | city_code VARCHAR(100), + | user_name VARCHAR(32) DEFAULT '', + | pv BIGINT DEFAULT '0' + |) + |DUPLICATE KEY(event_day, site_id, city_code, user_name) + |PARTITION BY date_trunc('day', event_day) + |DISTRIBUTED BY HASH(event_day, site_id); + |""".stripMargin + statement.execute(createTableDDL) + statement.execute("insert into `test`.`site_access1`(event_day, site_id, city_code, user_name, pv)" + + " values ('2023-02-26 20:12:04',2,'New York','Sam Smith',1)," + + " ('2023-02-27 21:06:54',1,'Los Angeles','Taylor Swift',1)") + val spark = SparkSession.builder().master("local[2]").getOrCreate() + import spark.implicits._ + // 1. Create a DataFrame from a sequence. + val data = Seq(("2023-02-26 12:12:23", 10, "Los Angeles", "jack", 30), ("2023-02-26 08:12:23", 20, "Los Angeles", "jack", 20)) + var df = data.toDF("event_day", "site_id", "city_code", "user_name", "pv") + df.createOrReplaceTempView("test_view1") + df = spark.sql("select cast(event_day as timestamp) as event_day, site_id, city_code, user_name, pv from test_view1") + // 2. Write to StarRocks by configuring the format as "starrocks" and the following options. + // You need to modify the options according your own environment. 
+ try { + df.write.format("starrocks") + .option("starrocks.fe.http.url", STARROCKS_FE_HTTP_URL) + .option("starrocks.fe.jdbc.url", STARROCKS_FE_JDBC_URL) + .option("starrocks.user", STARROCKS_FE_JDBC_USER) + .option("starrocks.password", STARROCKS_FE_JDBC_PASSWORD) + .option("starrocks.table.identifier", "test.site_access1") + .option("starrocks.write.overwrite.partitions.p20230226", "[(\"2022-02-26 00:00:00\"),(\"2024-02-27 00:00:00\"))") + .mode("overwrite") + .save() + } catch { + case e: Throwable => Assertions.assertTrue(e.isInstanceOf[NotSupportedOperationException] + && e.getMessage.equals( + "Overwriting partition only supports list/range partitioning, not support expression/automatic partitioning !!!")) + } + } finally { + try { + dropTable(statement, "`test`.`site_access1`") + } finally { + releaseConn(conn, statement, rs) + } + } + } +}
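For context, the DataFrame-API path exercised by these tests reduces to a small standalone job. The sketch below is illustrative only: the FE addresses and credentials are placeholders, and it assumes the connector jar is on the classpath and that test.t_recharge_detail1 exists with a pLos_Angeles list partition as created in the tests above.

import org.apache.spark.sql.SparkSession

object PartitionOverwriteSketch {
  def main(args: Array[String]): Unit = {
    // Local session, mirroring the master("local[2]") setup used by the tests above.
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    // Rows destined for the pLos_Angeles partition only.
    val df = Seq((2, 2, "Los Angeles", "20241106"), (3, 3, "Los Angeles", "20241107"))
      .toDF("id", "user_id", "city", "dt")

    df.write.format("starrocks")
      .option("starrocks.fe.http.url", "127.0.0.1:8030")              // placeholder FE HTTP address
      .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030") // placeholder FE JDBC address
      .option("starrocks.user", "root")                               // placeholder credentials
      .option("starrocks.password", "")
      .option("starrocks.table.identifier", "test.t_recharge_detail1")
      // Restrict the overwrite to the pLos_Angeles list partition.
      .option("starrocks.write.overwrite.partitions.pLos_Angeles", "(\"Los Angeles\")")
      .mode("overwrite")
      .save()

    spark.stop()
  }
}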