branch-3.0: [feature](load) new ingestion load #45937 #46079

Open: wants to merge 1 commit into branch-3.0
53 changes: 39 additions & 14 deletions be/src/olap/push_handler.cpp
@@ -34,15 +34,16 @@
#include <new>
#include <queue>
#include <shared_mutex>
#include <type_traits>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/hdfs_builder.h"
#include "olap/delete_handler.h"
#include "olap/olap_define.h"
#include "olap/rowset/pending_rowset_helper.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/schema.h"
@@ -53,10 +54,11 @@
#include "olap/txn_manager.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/simple_function_factory.h"
@@ -352,8 +354,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
_file_params.expr_of_dest_slot = _params.expr_of_dest_slot;
_file_params.dest_sid_to_src_sid_without_trans = _params.dest_sid_to_src_sid_without_trans;
_file_params.strict_mode = _params.strict_mode;
_file_params.__isset.broker_addresses = true;
_file_params.broker_addresses = t_scan_range.broker_addresses;
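// For HDFS input the connection info is built from the load properties; other file types still go through broker addresses.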
if (_ranges[0].file_type == TFileType::FILE_HDFS) {
_file_params.hdfs_params = parse_properties(_params.properties);
} else {
_file_params.__isset.broker_addresses = true;
_file_params.broker_addresses = t_scan_range.broker_addresses;
}

for (const auto& range : _ranges) {
TFileRangeDesc file_range;
Expand Down Expand Up @@ -482,17 +488,36 @@ Status PushBrokerReader::_cast_to_input_block() {
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
// remove nullable here, let the get_function decide whether nullable
auto return_type = slot_desc->get_data_type_ptr();
vectorized::ColumnsWithTypeAndName arguments {
arg,
{vectorized::DataTypeString().create_column_const(
arg.column->size(), remove_nullable(return_type)->get_family_name()),
std::make_shared<vectorized::DataTypeString>(), ""}};
auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
"CAST", arguments, return_type);
idx = _src_block_name_to_idx[slot_desc->col_name()];
RETURN_IF_ERROR(
func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
// bitmap conversion: src -> to_base64 -> bitmap_from_base64
if (slot_desc->type().is_bitmap_type()) {
auto base64_return_type = vectorized::DataTypeFactory::instance().create_data_type(
vectorized::DataTypeString().get_type_as_type_descriptor(),
slot_desc->is_nullable());
auto func_to_base64 = vectorized::SimpleFunctionFactory::instance().get_function(
"to_base64", {arg}, base64_return_type);
RETURN_IF_ERROR(func_to_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(base64_return_type);
auto& arg_base64 = _src_block_ptr->get_by_name(slot_desc->col_name());
auto func_bitmap_from_base64 =
vectorized::SimpleFunctionFactory::instance().get_function(
"bitmap_from_base64", {arg_base64}, return_type);
RETURN_IF_ERROR(func_bitmap_from_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
arg_base64.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
} else {
vectorized::ColumnsWithTypeAndName arguments {
arg,
{vectorized::DataTypeString().create_column_const(
arg.column->size(), remove_nullable(return_type)->get_family_name()),
std::make_shared<vectorized::DataTypeString>(), ""}};
auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
"CAST", arguments, return_type);
RETURN_IF_ERROR(
func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
}
}
return Status::OK();
}
@@ -633,6 +633,10 @@ public class Config extends ConfigBase {
@ConfField(description = {"Yarn 配置文件的路径", "Yarn config path"})
public static String yarn_config_dir = System.getenv("DORIS_HOME") + "/lib/yarn-config";

@ConfField(mutable = true, masterOnly = true, description = {"Ingestion load 的默认超时时间,单位是秒。",
"Default timeout for ingestion load job, in seconds."})
public static int ingestion_load_default_timeout_second = 86400; // 1 day

@ConfField(mutable = true, masterOnly = true, description = {"Sync job 的最大提交间隔,单位是秒。",
"Maximal intervals between two sync job's commits."})
public static long sync_commit_interval_second = 10;
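Usage note (not part of the patch): since the new setting is declared mutable and masterOnly, the one-day default can be overridden, for example with an entry like the following in the master FE's fe.conf; the value shown is illustrative.

# illustrative fe.conf entry: allow ingestion load jobs to run for up to two days
ingestion_load_default_timeout_second = 172800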
@@ -371,14 +371,17 @@ public static class EtlIndex implements Serializable {
public String indexType;
@SerializedName(value = "isBaseIndex")
public boolean isBaseIndex;
@SerializedName(value = "schemaVersion")
public int schemaVersion;

public EtlIndex(long indexId, List<EtlColumn> etlColumns, int schemaHash,
String indexType, boolean isBaseIndex) {
String indexType, boolean isBaseIndex, int schemaVersion) {
this.indexId = indexId;
this.columns = etlColumns;
this.schemaHash = schemaHash;
this.indexType = indexType;
this.isBaseIndex = isBaseIndex;
this.schemaVersion = schemaVersion;
}

public EtlColumn getColumn(String name) {
@@ -398,6 +401,7 @@ public String toString() {
+ ", schemaHash=" + schemaHash
+ ", indexType='" + indexType + '\''
+ ", isBaseIndex=" + isBaseIndex
+ ", schemaVersion=" + schemaVersion
+ '}';
}
}
@@ -70,6 +70,7 @@
*
* DROP RESOURCE "spark0";
*/
@Deprecated
public class SparkResource extends Resource {
private static final Logger LOG = LogManager.getLogger(SparkResource.class);

211 changes: 211 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -27,13 +27,21 @@
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.httpv2.rest.manager.HttpUtils;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.StreamLoadHandler;
import org.apache.doris.load.loadv2.IngestionLoadJob;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.GroupCommitPlanner;
@@ -45,9 +53,14 @@
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.transaction.BeginTransactionException;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.base.Strings;
import io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -59,10 +72,14 @@
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;

import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
@@ -694,4 +711,198 @@ private Backend selectBackendForGroupCommit(String clusterName, HttpServletReque
}
return backend;
}

/**
* Request body example:
* {
* "label": "test",
* "tableToPartition": {
* "tbl_test_spark_load": ["p1","p2"]
* },
* "properties": {
* "strict_mode": "true",
* "timeout": 3600000
* }
* }
*
*/
@RequestMapping(path = "/api/ingestion_load/{" + CATALOG_KEY + "}/{" + DB_KEY
+ "}/_create", method = RequestMethod.POST)
public Object createIngestionLoad(HttpServletRequest request, HttpServletResponse response,
@PathVariable(value = CATALOG_KEY) String catalog,
@PathVariable(value = DB_KEY) String db) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}

executeCheckPassword(request, response);

if (!InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalog)) {
return ResponseEntityBuilder.okWithCommonError("Only support internal catalog. "
+ "Current catalog is " + catalog);
}

Object redirectView = redirectToMaster(request, response);
if (redirectView != null) {
return redirectView;
}

String fullDbName = getFullDbName(db);

Map<String, Object> resultMap = new HashMap<>();

try {

String body = HttpUtils.getBody(request);
JsonMapper mapper = JsonMapper.builder().build();
JsonNode jsonNode = mapper.reader().readTree(body);

String label = jsonNode.get("label").asText();
Map<String, List<String>> tableToPartition = mapper.reader()
.readValue(jsonNode.get("tableToPartition").traverse(),
new TypeReference<Map<String, List<String>>>() {
});
List<String> tableNames = new LinkedList<>(tableToPartition.keySet());
for (String tableName : tableNames) {
checkTblAuth(ConnectContext.get().getCurrentUserIdentity(), fullDbName, tableName, PrivPredicate.LOAD);
}

Map<String, String> properties = new HashMap<>();
if (jsonNode.hasNonNull("properties")) {
properties = mapper.readValue(jsonNode.get("properties").traverse(),
new TypeReference<HashMap<String, String>>() {
});
}

executeCreateAndStartIngestionLoad(fullDbName, label, tableNames, properties, tableToPartition, resultMap,
ConnectContext.get().getCurrentUserIdentity());

} catch (Exception e) {
LOG.warn("create ingestion load job failed, db: {}, err: {}", db, e.getMessage());
return ResponseEntityBuilder.okWithCommonError(e.getMessage());
}

return ResponseEntityBuilder.ok(resultMap);

}

private void executeCreateAndStartIngestionLoad(String dbName, String label, List<String> tableNames,
Map<String, String> properties,
Map<String, List<String>> tableToPartition,
Map<String, Object> resultMap, UserIdentity userInfo)
throws DdlException, BeginTransactionException, MetaNotFoundException, AnalysisException,
QuotaExceedException, LoadException {

long loadId = -1;
try {
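// Create the job, begin its transaction, hand the load metadata back to the caller, then start the ETL stage; any failure below cancels the job before rethrowing.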

LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
loadId = loadManager.createIngestionLoadJob(dbName, label, tableNames, properties, userInfo);
IngestionLoadJob loadJob = (IngestionLoadJob) loadManager.getLoadJob(loadId);
resultMap.put("loadId", loadId);

long txnId = loadJob.beginTransaction();
resultMap.put("txnId", txnId);

Map<String, Object> loadMeta = loadJob.getLoadMeta(tableToPartition);
resultMap.put("dbId", loadMeta.get("dbId"));
resultMap.put("signature", loadMeta.get("signature"));
resultMap.put("tableMeta", loadMeta.get("tableMeta"));

loadJob.startEtlJob();

} catch (DdlException | BeginTransactionException | MetaNotFoundException | AnalysisException
| QuotaExceedException | LoadException e) {
LOG.warn("create ingestion load job failed, db: {}, load id: {}, err: {}", dbName, loadId, e.getMessage());
if (loadId != -1L) {
try {
Env.getCurrentEnv().getLoadManager().getLoadJob(loadId).cancelJob(
new FailMsg(FailMsg.CancelType.UNKNOWN, StringUtils.defaultIfBlank(e.getMessage(), "")));
} catch (DdlException ex) {
LOG.warn("cancel ingestion load failed, db: {}, load id: {}, err: {}", dbName, loadId,
e.getMessage());
}
}
throw e;
}

}

/**
* Request body example:
* {
* "statusInfo": {
* "msg": "",
* "hadoopProperties": "{\"fs.defaultFS\":\"hdfs://hadoop01:8020\",\"hadoop.username\":\"hadoop\"}",
* "appId": "local-1723088141438",
* "filePathToSize": "{\"hdfs://hadoop01:8020/spark-load/jobs/25054/test/36019/dpp_result.json\":179,
* \"hdfs://hadoop01:8020/spark-load/jobs/25054/test/36019/load_meta.json\":3441,\"hdfs://hadoop01:8020
* /spark-load/jobs/25054/test/36019/V1.test.25056.29373.25057.0.366242211.parquet\":5745}",
* "dppResult": "{\"isSuccess\":true,\"failedReason\":\"\",\"scannedRows\":10,\"fileNumber\":1,
* \"fileSize\":2441,\"normalRows\":10,\"abnormalRows\":0,\"unselectRows\":0,\"partialAbnormalRows\":\"[]\",
* \"scannedBytes\":0}",
* "status": "SUCCESS"
* },
* "loadId": 36018
* }
*
*/
@RequestMapping(path = "/api/ingestion_load/{" + CATALOG_KEY + "}/{" + DB_KEY
+ "}/_update", method = RequestMethod.POST)
public Object updateIngestionLoad(HttpServletRequest request, HttpServletResponse response,
@PathVariable(value = CATALOG_KEY) String catalog,
@PathVariable(value = DB_KEY) String db) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}

executeCheckPassword(request, response);

if (!InternalCatalog.INTERNAL_CATALOG_NAME.equals(catalog)) {
return ResponseEntityBuilder.okWithCommonError("Only support internal catalog. "
+ "Current catalog is " + catalog);
}

Object redirectView = redirectToMaster(request, response);
if (redirectView != null) {
return redirectView;
}

String fullDbName = getFullDbName(db);

long loadId = -1;
try {

String body = HttpUtils.getBody(request);
JsonMapper mapper = JsonMapper.builder().build();
JsonNode jsonNode = mapper.readTree(body);
LoadJob loadJob = null;

if (jsonNode.hasNonNull("loadId")) {
loadId = jsonNode.get("loadId").asLong();
loadJob = Env.getCurrentEnv().getLoadManager().getLoadJob(loadId);
}

if (loadJob == null) {
return ResponseEntityBuilder.okWithCommonError("load job not exists, load id: " + loadId);
}

IngestionLoadJob ingestionLoadJob = (IngestionLoadJob) loadJob;
Set<String> tableNames = ingestionLoadJob.getTableNames();
for (String tableName : tableNames) {
checkTblAuth(ConnectContext.get().getCurrentUserIdentity(), fullDbName, tableName, PrivPredicate.LOAD);
}
Map<String, String> statusInfo = mapper.readValue(jsonNode.get("statusInfo").traverse(),
new TypeReference<HashMap<String, String>>() {
});
ingestionLoadJob.updateJobStatus(statusInfo);
} catch (IOException | MetaNotFoundException | UnauthorizedException e) {
LOG.warn("cancel ingestion load job failed, db: {}, load id: {}, err: {}", db, loadId, e.getMessage());
return ResponseEntityBuilder.okWithCommonError(e.getMessage());
}

return ResponseEntityBuilder.ok();

}

}
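For orientation, below is a minimal client-side sketch of calling the new _create endpoint; it is not part of the patch. The FE address 127.0.0.1:8030, the root user with an empty password, the test_db database and the label value are illustrative assumptions, while the URL path, the body layout and the response keys follow the handler above (the catalog segment must name the internal catalog).

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class IngestionLoadCreateExample {
    public static void main(String[] args) throws Exception {
        // Illustrative values: FE HTTP address, credentials, database and table names.
        String feHttpAddress = "127.0.0.1:8030";
        String auth = Base64.getEncoder()
                .encodeToString("root:".getBytes(StandardCharsets.UTF_8));

        // Body mirrors the Javadoc example: label, tableToPartition, optional properties.
        String body = "{"
                + "\"label\": \"test_ingestion_load\","
                + "\"tableToPartition\": {\"tbl_test_spark_load\": [\"p1\", \"p2\"]},"
                + "\"properties\": {\"strict_mode\": \"true\", \"timeout\": \"3600\"}"
                + "}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://" + feHttpAddress
                        + "/api/ingestion_load/internal/test_db/_create"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Basic " + auth)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        // On success the data field of the JSON response carries loadId, txnId, dbId,
        // signature and tableMeta, which the external ETL job needs later.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}

Once the external ETL job has finished writing its output, it (rather than the end user) is expected to post the statusInfo payload shown in the second Javadoc block to the matching _update endpoint under the same catalog and database.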
@@ -28,5 +28,6 @@ public enum EtlJobType {
LOCAL_FILE,
// create by job scheduler,inner use
INSERT_JOB,
INGESTION,
UNKNOWN
}