diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 575b002b2f6086..99568d47298aaa 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -34,15 +34,16 @@
#include
#include
#include
+#include
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
+#include "io/hdfs_builder.h"
#include "olap/delete_handler.h"
#include "olap/olap_define.h"
#include "olap/rowset/pending_rowset_helper.h"
-#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/schema.h"
@@ -53,10 +54,11 @@
#include "olap/txn_manager.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
-#include "util/runtime_profile.h"
#include "util/time.h"
#include "vec/core/block.h"
+#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type_factory.hpp"
+#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/simple_function_factory.h"
@@ -352,8 +354,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
_file_params.expr_of_dest_slot = _params.expr_of_dest_slot;
_file_params.dest_sid_to_src_sid_without_trans = _params.dest_sid_to_src_sid_without_trans;
_file_params.strict_mode = _params.strict_mode;
- _file_params.__isset.broker_addresses = true;
- _file_params.broker_addresses = t_scan_range.broker_addresses;
+ if (_ranges[0].file_type == TFileType::FILE_HDFS) {
+ _file_params.hdfs_params = parse_properties(_params.properties);
+ } else {
+ _file_params.__isset.broker_addresses = true;
+ _file_params.broker_addresses = t_scan_range.broker_addresses;
+ }
for (const auto& range : _ranges) {
TFileRangeDesc file_range;
@@ -482,17 +488,36 @@ Status PushBrokerReader::_cast_to_input_block() {
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
// remove nullable here, let the get_function decide whether nullable
auto return_type = slot_desc->get_data_type_ptr();
- vectorized::ColumnsWithTypeAndName arguments {
- arg,
- {vectorized::DataTypeString().create_column_const(
- arg.column->size(), remove_nullable(return_type)->get_family_name()),
- std::make_shared<vectorized::DataTypeString>(), ""}};
- auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
- "CAST", arguments, return_type);
idx = _src_block_name_to_idx[slot_desc->col_name()];
- RETURN_IF_ERROR(
- func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
- _src_block_ptr->get_by_position(idx).type = std::move(return_type);
+ // bitmap conversion is done in two steps: src -> to_base64 -> bitmap_from_base64
+ if (slot_desc->type().is_bitmap_type()) {
+ auto base64_return_type = vectorized::DataTypeFactory::instance().create_data_type(
+ vectorized::DataTypeString().get_type_as_type_descriptor(),
+ slot_desc->is_nullable());
+ auto func_to_base64 = vectorized::SimpleFunctionFactory::instance().get_function(
+ "to_base64", {arg}, base64_return_type);
+ RETURN_IF_ERROR(func_to_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
+ arg.column->size()));
+ _src_block_ptr->get_by_position(idx).type = std::move(base64_return_type);
+ auto& arg_base64 = _src_block_ptr->get_by_name(slot_desc->col_name());
+ auto func_bitmap_from_base64 =
+ vectorized::SimpleFunctionFactory::instance().get_function(
+ "bitmap_from_base64", {arg_base64}, return_type);
+ RETURN_IF_ERROR(func_bitmap_from_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
+ arg_base64.column->size()));
+ _src_block_ptr->get_by_position(idx).type = std::move(return_type);
+ } else {
+ vectorized::ColumnsWithTypeAndName arguments {
+ arg,
+ {vectorized::DataTypeString().create_column_const(
+ arg.column->size(), remove_nullable(return_type)->get_family_name()),
+ std::make_shared<vectorized::DataTypeString>(), ""}};
+ auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
+ "CAST", arguments, return_type);
+ RETURN_IF_ERROR(
+ func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
+ _src_block_ptr->get_by_position(idx).type = std::move(return_type);
+ }
}
return Status::OK();
}
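
For context on the FILE_HDFS branch above: when the scan range points at an HDFS file, the BE now builds its HDFS parameters from the plain key/value properties forwarded by the FE instead of using broker addresses. A minimal sketch of such a property map, assuming the usual fs.defaultFS / hadoop.username style keys (the concrete keys accepted by parse_properties are an assumption here, not defined by this patch):

import java.util.HashMap;
import java.util.Map;

public class HdfsLoadPropertiesExample {
    // Illustrative only: a property map of the kind the FE could forward in the
    // scan range; the BE-side parse_properties turns such pairs into hdfs_params.
    public static Map<String, String> example() {
        Map<String, String> props = new HashMap<>();
        props.put("fs.defaultFS", "hdfs://namenode:8020"); // assumed key/value
        props.put("hadoop.username", "doris");             // assumed key/value
        return props;
    }
}
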
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 01d981efdd97b4..a270501560d743 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -633,6 +633,10 @@ public class Config extends ConfigBase {
@ConfField(description = {"Yarn 配置文件的路径", "Yarn config path"})
public static String yarn_config_dir = System.getenv("DORIS_HOME") + "/lib/yarn-config";
+ @ConfField(mutable = true, masterOnly = true, description = {"Ingestion load 的默认超时时间,单位是秒。",
+ "Default timeout for ingestion load job, in seconds."})
+ public static int ingestion_load_default_timeout_second = 86400; // 1 day
+
@ConfField(mutable = true, masterOnly = true, description = {"Sync job 的最大提交间隔,单位是秒。",
"Maximal intervals between two sync job's commits."})
public static long sync_commit_interval_second = 10;
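
The new ingestion_load_default_timeout_second config is only meant to apply when a job is created without an explicit timeout. A minimal sketch of that fallback, using a hypothetical helper that is not part of this patch and a properties map shaped like the one in the ingestion load request body:

import java.util.Map;

import org.apache.doris.common.Config;

public class IngestionTimeoutResolverSketch {
    // Hypothetical helper: fall back to the new FE config value when the request
    // properties do not carry a "timeout" entry (both treated as seconds here).
    public static long resolveTimeoutSecond(Map<String, String> properties) {
        String timeout = properties == null ? null : properties.get("timeout");
        if (timeout == null || timeout.isEmpty()) {
            return Config.ingestion_load_default_timeout_second;
        }
        return Long.parseLong(timeout);
    }
}
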
diff --git a/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java b/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java
index c59901d383b648..8d9d5de54b59f1 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/sparkdpp/EtlJobConfig.java
@@ -371,14 +371,17 @@ public static class EtlIndex implements Serializable {
public String indexType;
@SerializedName(value = "isBaseIndex")
public boolean isBaseIndex;
+ @SerializedName(value = "schemaVersion")
+ public int schemaVersion;
public EtlIndex(long indexId, List<EtlColumn> etlColumns, int schemaHash,
- String indexType, boolean isBaseIndex) {
+ String indexType, boolean isBaseIndex, int schemaVersion) {
this.indexId = indexId;
this.columns = etlColumns;
this.schemaHash = schemaHash;
this.indexType = indexType;
this.isBaseIndex = isBaseIndex;
+ this.schemaVersion = schemaVersion;
}
public EtlColumn getColumn(String name) {
@@ -398,6 +401,7 @@ public String toString() {
+ ", schemaHash=" + schemaHash
+ ", indexType='" + indexType + '\''
+ ", isBaseIndex=" + isBaseIndex
+ + ", schemaVersion=" + schemaVersion
+ '}';
}
}
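
Because EtlIndex gained a schemaVersion argument, every producer of the ETL job config now has to pass the index's schema version alongside its schema hash. A minimal construction sketch with illustrative ids and an externally supplied column list (values are not taken from this patch):

import java.util.List;

import org.apache.doris.sparkdpp.EtlJobConfig;

public class EtlIndexExample {
    // Builds an EtlIndex using the extended constructor from this change.
    public static EtlJobConfig.EtlIndex build(List<EtlJobConfig.EtlColumn> columns) {
        long indexId = 10001L;       // illustrative index id
        int schemaHash = 1462104941; // illustrative schema hash
        int schemaVersion = 2;       // newly required schema version
        return new EtlJobConfig.EtlIndex(indexId, columns, schemaHash,
                "DUPLICATE", /* isBaseIndex */ true, schemaVersion);
    }
}
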
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index 704d8e512d7f3c..2af2a9b4a90df2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -70,6 +70,7 @@
*
* DROP RESOURCE "spark0";
*/
+@Deprecated
public class SparkResource extends Resource {
private static final Logger LOG = LogManager.getLogger(SparkResource.class);
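
The LoadAction.java changes below add a POST /api/ingestion_load/{catalog}/{db}/_create endpoint whose request body is documented in its Javadoc. A minimal client sketch, assuming the default FE HTTP port, the internal catalog, and the root user with an empty password; host, credentials, and response handling are illustrative:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class IngestionLoadCreateExample {
    public static void main(String[] args) throws Exception {
        // Path shape and body fields follow the new endpoint's mapping and Javadoc;
        // host, port, database, and credentials are placeholders.
        String url = "http://fe_host:8030/api/ingestion_load/internal/test_db/_create";
        String body = "{\"label\": \"test\","
                + " \"tableToPartition\": {\"tbl_test_spark_load\": [\"p1\", \"p2\"]},"
                + " \"properties\": {\"strict_mode\": \"true\", \"timeout\": 3600000}}";
        String auth = Base64.getEncoder().encodeToString("root:".getBytes());

        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", "Basic " + auth)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
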
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 2f9efc1ed1b1bf..b62ac7832e7a88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -27,13 +27,21 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
+import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
+import org.apache.doris.httpv2.rest.manager.HttpUtils;
+import org.apache.doris.load.FailMsg;
import org.apache.doris.load.StreamLoadHandler;
+import org.apache.doris.load.loadv2.IngestionLoadJob;
+import org.apache.doris.load.loadv2.LoadJob;
+import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.GroupCommitPlanner;
@@ -45,9 +53,14 @@
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.transaction.BeginTransactionException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.base.Strings;
import io.netty.handler.codec.http.HttpHeaderNames;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -59,10 +72,14 @@
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
@@ -694,4 +711,198 @@ private Backend selectBackendForGroupCommit(String clusterName, HttpServletReque
}
return backend;
}
+
+ /**
+ * Request body example:
+ * {
+ * "label": "test",
+ * "tableToPartition": {
+ * "tbl_test_spark_load": ["p1","p2"]
+ * },
+ * "properties": {
+ * "strict_mode": "true",
+ * "timeout": 3600000
+ * }
+ * }
+ *
+ */
+ @RequestMapping(path = "/api/ingestion_load/{" + CATALOG_KEY + "}/{" + DB_KEY
+ + "}/_create", method = RequestMethod.POST)
+ public Object createIngestionLoad(HttpServletRequest request, HttpServletResponse response,
+ @PathVariable(value = CATALOG_KEY) String catalog,
+ @PathVariable(value = DB_KEY) String db) {
+ if (needRedirect(request.getScheme())) {
+ return redirectToHttps(request);
+ }
+
+ executeCheckPassword(request, response);
+
+ if (!InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalog)) {
+ return ResponseEntityBuilder.okWithCommonError("Only support internal catalog. "
+ + "Current catalog is " + catalog);
+ }
+
+ Object redirectView = redirectToMaster(request, response);
+ if (redirectView != null) {
+ return redirectView;
+ }
+
+ String fullDbName = getFullDbName(db);
+
+ Map<String, Object> resultMap = new HashMap<>();
+
+ try {
+
+ String body = HttpUtils.getBody(request);
+ JsonMapper mapper = JsonMapper.builder().build();
+ JsonNode jsonNode = mapper.reader().readTree(body);
+
+ String label = jsonNode.get("label").asText();
+ Map<String, List<String>> tableToPartition = mapper.reader()
+ .readValue(jsonNode.get("tableToPartition").traverse(),
+ new TypeReference
+ * Loads data files which have been pre-processed by an outside system.
+ *
+ * There are 4 steps in an IngestionLoadJob:
+ * Step1: The outside system executes the ingestion ETL job.
+ * Step2: LoadEtlChecker checks the ingestion ETL job status periodically
+ * and sends push tasks to BE when the ingestion ETL job is finished.
+ * Step3: LoadLoadingChecker checks the loading status periodically and commits the transaction when push tasks are finished.
+ * Step4: PublishVersionDaemon sends publish version tasks to BE and finishes the transaction.
+ */
+public class IngestionLoadJob extends LoadJob {
+
+ public static final Logger LOG = LogManager.getLogger(IngestionLoadJob.class);
+
+ @Setter
+ @SerializedName("ests")
+ private EtlStatus etlStatus;
+
+ // members below are updated when the job state changes to LOADING
+ // { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, etlFileSize) }
+ @SerializedName(value = "tm2fi")
+ private final Map<String, Pair<String, Long>> tabletMetaToFileInfo = Maps.newHashMap();
+
+ @SerializedName(value = "hp")
+ private final Map<String, String> hadoopProperties = new HashMap<>();
+
+ @SerializedName(value = "i2sv")
+ private final Map<Long, Integer> indexToSchemaVersion = new HashMap<>();
+
+ private final Map<Long, Integer> indexToSchemaHash = Maps.newHashMap();
+
+ private final Map<String, Long> filePathToSize = new HashMap<>();
+
+ private final Set<Long> finishedReplicas = Sets.newHashSet();
+ private final Set<Long> quorumTablets = Sets.newHashSet();
+ private final Set<Long> fullTablets = Sets.newHashSet();
+
+ private final List<TabletCommitInfo> commitInfos = Lists.newArrayList();
+
+ private final Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap();
+
+ private final Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = Maps.newHashMap();
+
+ private long etlStartTimestamp = -1;
+
+ private long quorumFinishTimestamp = -1;
+
+ private List<Long> loadTableIds = new ArrayList<>();
+
+ public IngestionLoadJob() {
+ super(EtlJobType.INGESTION);
+ }
+
+ public IngestionLoadJob(long dbId, String label, List<String> tableNames, UserIdentity userInfo)
+ throws LoadException {
+ super(EtlJobType.INGESTION, dbId, label);
+ this.loadTableIds = getLoadTableIds(tableNames);
+ this.userInfo = userInfo;
+ }
+
+ @Override
+ public Set<String> getTableNamesForShow() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Set<String> getTableNames() throws MetaNotFoundException {
+ Set<String> result = Sets.newHashSet();
+ Database database = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
+ for (long tableId : loadTableIds) {
+ Table table = database.getTableOrMetaException(tableId);
+ result.add(table.getName());
+ }
+ return result;
+ }
+
+ @Override
+ public void afterVisible(TransactionState txnState, boolean txnOperated) {
+ super.afterVisible(txnState, txnOperated);
+ clearJob();
+ }
+
+ @Override
+ public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
+ throws UserException {
+ super.afterAborted(txnState, txnOperated, txnStatusChangeReason);
+ clearJob();
+ }
+
+ @Override
+ public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) {
+ super.cancelJobWithoutCheck(failMsg, abortTxn, needLog);
+ clearJob();
+ }
+
+ @Override
+ public void cancelJob(FailMsg failMsg) throws DdlException {
+ super.cancelJob(failMsg);
+ clearJob();
+ }
+
+ private List<Long> getLoadTableIds(List<String> tableNames) throws LoadException {
+ Database db = Env.getCurrentInternalCatalog()
+ .getDbOrException(dbId, s -> new LoadException("db does not exist. id: " + s));
+ List<Long> list = new ArrayList<>(tableNames.size());
+ for (String tableName : tableNames) {
+ OlapTable olapTable = (OlapTable) db.getTableOrException(tableName,
+ s -> new LoadException("table does not exist. id: " + s));
+ list.add(olapTable.getId());
+ }
+ return list;
+ }
+
+ @Override
+ protected long getEtlStartTimestamp() {
+ return etlStartTimestamp;
+ }
+
+ public long beginTransaction()
+ throws BeginTransactionException, MetaNotFoundException, AnalysisException, QuotaExceedException,
+ LabelAlreadyUsedException, DuplicatedRequestException {
+ this.transactionId = Env.getCurrentGlobalTransactionMgr()
+ .beginTransaction(dbId, loadTableIds, label, null,
+ new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
+ FrontendOptions.getLocalHostAddress(), ExecuteEnv.getInstance().getStartupTime()),
+ TransactionState.LoadJobSourceType.FRONTEND, id, getTimeout());
+ return transactionId;
+ }
+
+ public Map<String, Object> getLoadMeta(Map<String, List<String>> tableToPartitionMap)
+ throws LoadException {
+
+ if (tableToPartitionMap == null || tableToPartitionMap.isEmpty()) {
+ throw new IllegalArgumentException("tableToPartitionMap is empty");
+ }
+
+ Database db = Env.getCurrentInternalCatalog()
+ .getDbOrException(dbId, s -> new LoadException("db does not exist. id: " + s));
+ Map<String, Object> loadMeta = new HashMap<>();
+ loadMeta.put("dbId", db.getId());
+ Long signature = Env.getCurrentEnv().getNextId();
+ loadMeta.put("signature", signature);
+
+ List<Table> tables;
+ try {
+ tables = db.getTablesOnIdOrderOrThrowException(loadTableIds);
+ } catch (MetaNotFoundException e) {
+ throw new LoadException(e.getMessage());
+ }
+
+ MetaLockUtils.readLockTables(tables);
+ try {
+ Map<String, Map<String, Object>> tableMeta = new HashMap<>(tableToPartitionMap.size());
+ for (Map.Entry<String, List<String>> entry : tableToPartitionMap.entrySet()) {
+ String tableName = entry.getKey();
+ Map<String, Object> meta = tableMeta.getOrDefault(tableName, new HashMap<>());
+ OlapTable olapTable = (OlapTable) db.getTableOrException(tableName,
+ s -> new LoadException("table does not exist. id: " + s));
+ meta.put("id", olapTable.getId());
+ List<EtlJobConfig.EtlIndex> indices = createEtlIndexes(olapTable);
+ meta.put("indexes", indices);
+ List<String> partitionNames = entry.getValue();
+ Set<Long> partitionIds;
+ if (partitionNames != null && !partitionNames.isEmpty()) {
+ partitionIds = new HashSet<>(partitionNames.size());
+ for (String partitionName : partitionNames) {
+ Partition partition = olapTable.getPartition(partitionName);
+ if (partition == null) {
+ throw new LoadException(String.format("partition %s does not exist", partitionName));
+ }
+ partitionIds.add(partition.getId());
+ }
+ } else {
+ partitionIds =
+ olapTable.getAllPartitions().stream().map(Partition::getId).collect(Collectors.toSet());
+ }
+ EtlJobConfig.EtlPartitionInfo etlPartitionInfo = createEtlPartitionInfo(olapTable, partitionIds);
+ meta.put("partitionInfo", etlPartitionInfo);
+ tableMeta.put(tableName, meta);
+
+ if (tableToLoadPartitions.containsKey(olapTable.getId())) {
+ tableToLoadPartitions.get(olapTable.getId()).addAll(partitionIds);
+ } else {
+ tableToLoadPartitions.put(olapTable.getId(), partitionIds);
+ }
+
+ }
+ loadMeta.put("tableMeta", tableMeta);
+ } finally {
+ MetaLockUtils.readUnlockTables(tables);
+ }
+ return loadMeta;
+
+ }
+
+ private List<EtlJobConfig.EtlIndex> createEtlIndexes(OlapTable table) throws LoadException {
+ List<EtlJobConfig.EtlIndex> etlIndexes = Lists.newArrayList();
+
+ for (Map.Entry<Long, List<Column>> entry : table.getIndexIdToSchema().entrySet()) {
+ long indexId = entry.getKey();
+ // todo(liheng): get schema hash and version from materialized index meta directly
+ MaterializedIndexMeta indexMeta = table.getIndexMetaByIndexId(indexId);
+ int schemaHash = indexMeta.getSchemaHash();
+ int schemaVersion = indexMeta.getSchemaVersion();
+
+ boolean changeAggType = table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS)
+ && table.getTableProperty().getEnableUniqueKeyMergeOnWrite();
+
+ // columns
+ List<EtlJobConfig.EtlColumn> etlColumns = Lists.newArrayList();
+ for (Column column : entry.getValue()) {
+ etlColumns.add(createEtlColumn(column, changeAggType));
+ }
+
+ // check distribution type
+ DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
+ if (distributionInfo.getType() != DistributionInfo.DistributionInfoType.HASH) {
+ // RANDOM not supported
+ String errMsg = "Unsupported distribution type. type: " + distributionInfo.getType().name();
+ LOG.warn(errMsg);
+ throw new LoadException(errMsg);
+ }
+
+ // index type
+ String indexType;
+ KeysType keysType = table.getKeysTypeByIndexId(indexId);
+ switch (keysType) {
+ case DUP_KEYS:
+ indexType = "DUPLICATE";
+ break;
+ case AGG_KEYS:
+ indexType = "AGGREGATE";
+ break;
+ case UNIQUE_KEYS:
+ indexType = "UNIQUE";
+ break;
+ default:
+ String errMsg = "unknown keys type. type: " + keysType.name();
+ LOG.warn(errMsg);
+ throw new LoadException(errMsg);
+ }
+
+ indexToSchemaVersion.put(indexId, schemaVersion);
+
+ etlIndexes.add(new EtlJobConfig.EtlIndex(indexId, etlColumns, schemaHash, indexType,
+ indexId == table.getBaseIndexId(), schemaVersion));
+ }
+
+ return etlIndexes;
+ }
+
+ private EtlJobConfig.EtlColumn createEtlColumn(Column column, boolean changeAggType) {
+ // column name
+ String name = column.getName().toLowerCase(Locale.ROOT);
+ // column type
+ PrimitiveType type = column.getDataType();
+ String columnType = column.getDataType().toString();
+ // is allow null
+ boolean isAllowNull = column.isAllowNull();
+ // is key
+ boolean isKey = column.isKey();
+
+ // aggregation type
+ String aggregationType = null;
+ if (column.getAggregationType() != null) {
+ if (changeAggType && !column.isKey()) {
+ aggregationType = AggregateType.REPLACE.toSql();
+ } else {
+ aggregationType = column.getAggregationType().toString();
+ }
+ }
+
+ // default value
+ String defaultValue = null;
+ if (column.getDefaultValue() != null) {
+ defaultValue = column.getDefaultValue();
+ }
+ if (column.isAllowNull() && column.getDefaultValue() == null) {
+ defaultValue = "\\N";
+ }
+
+ // string length
+ int stringLength = 0;
+ if (type.isStringType()) {
+ stringLength = column.getStrLen();
+ }
+
+ // decimal precision scale
+ int precision = 0;
+ int scale = 0;
+ if (type.isDecimalV2Type() || type.isDecimalV3Type()) {
+ precision = column.getPrecision();
+ scale = column.getScale();
+ }
+
+ return new EtlJobConfig.EtlColumn(name, columnType, isAllowNull, isKey, aggregationType, defaultValue,
+ stringLength, precision, scale);
+ }
+
+ private EtlJobConfig.EtlPartitionInfo createEtlPartitionInfo(OlapTable table, Set<Long> partitionIds)
+ throws LoadException {
+ PartitionType type = table.getPartitionInfo().getType();
+
+ List<String> partitionColumnRefs = Lists.newArrayList();
+ List<EtlJobConfig.EtlPartition> etlPartitions = Lists.newArrayList();
+ if (type == PartitionType.RANGE) {
+ RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo();
+ for (Column column : rangePartitionInfo.getPartitionColumns()) {
+ partitionColumnRefs.add(column.getName());
+ }
+
+ for (Map.Entry<Long, PartitionItem> entry : rangePartitionInfo.getAllPartitionItemEntryList(true)) {
+ long partitionId = entry.getKey();
+ if (!partitionIds.contains(partitionId)) {
+ continue;
+ }
+
+ Partition partition = table.getPartition(partitionId);
+ if (partition == null) {
+ throw new LoadException("partition does not exist. id: " + partitionId);
+ }
+
+ // bucket num
+ int bucketNum = partition.getDistributionInfo().getBucketNum();
+
+ // is max partition
+ Range<PartitionKey> range = entry.getValue().getItems();
+ boolean isMaxPartition = range.upperEndpoint().isMaxValue();
+
+ // start keys
+ List<LiteralExpr> rangeKeyExprs = range.lowerEndpoint().getKeys();
+ List