Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Jan 27, 2025
1 parent 5174a94 commit 3ac2379
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 39 deletions.
3 changes: 2 additions & 1 deletion be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ if ((ARCH_AMD64 OR ARCH_AARCH64) AND OS_LINUX)
hadoop_hdfs
)
add_definitions(-DUSE_HADOOP_HDFS)
add_definitions(-DUSE_DORIS_HADOOP_HDFS)
else()
add_library(hdfs3 STATIC IMPORTED)
set_target_properties(hdfs3 PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/libhdfs3.a)
Expand Down Expand Up @@ -822,4 +823,4 @@ if (BUILD_BENCHMARK)
target_link_libraries(benchmark_test ${DORIS_LINK_LIBS})
message(STATUS "Add benchmark to build")
install(TARGETS benchmark_test DESTINATION ${OUTPUT_DIR}/lib)
endif()
endif()
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ DEFINE_mBool(allow_zero_date, "false");
DEFINE_Bool(allow_invalid_decimalv2_literal, "false");
DEFINE_mString(kerberos_ccache_path, "");
DEFINE_mString(kerberos_krb5_conf_path, "/etc/krb5.conf");
DEFINE_mInt32(kerberos_refresh_interval_second, "300");
DEFINE_mInt32(kerberos_refresh_interval_second, "3600");

DEFINE_mString(get_stack_trace_tool, "libunwind");
DEFINE_mString(dwarf_location_info_mode, "FAST");
Expand Down
110 changes: 73 additions & 37 deletions be/src/io/hdfs_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@
#include <vector>

#include "common/config.h"
#include "common/kerberos/kerberos_ticket_mgr.h"
#include "common/logging.h"
#ifdef USE_HADOOP_HDFS
#include "hadoop_hdfs/hdfs.h"
#endif
#include "io/fs/hdfs.h"
#include "runtime/exec_env.h"
#include "util/string_util.h"

namespace doris {

#ifdef USE_HADOOP_HDFS
#ifdef USE_HADOOP_HDFS && USE_DORIS_HADOOP_HDFS
void err_log_message(const char* fmt, ...) {
va_list args;
va_start(args, fmt);
Expand Down Expand Up @@ -89,13 +91,13 @@ void va_err_log_message(const char* fmt, va_list ap) {

// Logger vtable handed to the HDFS client library so its error output is
// routed through the callbacks defined above (err_log_message /
// va_err_log_message) instead of stderr.
struct hdfsLogger logger = {.errLogMessage = err_log_message,
.vaErrLogMessage = va_err_log_message};
// NOTE(review): two #endif lines for the same guard — this looks like
// rendered-diff residue (the removed and the added variant of one line are
// both present); only one should survive in the real file.
#endif // #ifdef USE_HADOOP_HDFS
#endif // #ifdef USE_HADOOP_HDFS && USE_DORIS_HADOOP_HDFS

// Create the underlying libhdfs builder handle, installing the custom
// error-logging hooks once per process when the hadoop-based client is used.
Status HDFSCommonBuilder::init_hdfs_builder() {
// NOTE(review): two #ifdef lines for the same guard — apparent rendered-diff
// residue (removed vs. added variant); only one should survive.
// Separately, the added form is itself a defect: `#ifdef` tests exactly one
// macro name, so in `#ifdef USE_HADOOP_HDFS && USE_DORIS_HADOOP_HDFS` the
// `&& USE_DORIS_HADOOP_HDFS` part is "extra tokens" that the preprocessor
// warns about and ignores — USE_DORIS_HADOOP_HDFS is never checked. The
// intended form is:
//   #if defined(USE_HADOOP_HDFS) && defined(USE_DORIS_HADOOP_HDFS)
#ifdef USE_HADOOP_HDFS
#ifdef USE_HADOOP_HDFS && USE_DORIS_HADOOP_HDFS
// Install the logger exactly once even if multiple builders are created.
static std::once_flag flag;
std::call_once(flag, []() { hdfsSetLogger(&logger); });
#endif // #ifdef USE_HADOOP_HDFS
#endif // #ifdef USE_HADOOP_HDFS && USE_DORIS_HADOOP_HDFS

hdfs_builder = hdfsNewBuilder();
if (hdfs_builder == nullptr) {
Expand Down Expand Up @@ -123,6 +125,43 @@ Status HDFSCommonBuilder::check_krb_params() {
return Status::OK();
}

// Record a single HDFS configuration key/value pair in this builder's local
// map; a later call with the same key overwrites the earlier value. The pairs
// are pushed to the underlying hdfsBuilder by set_hdfs_conf_to_hdfs_builder().
void HDFSCommonBuilder::set_hdfs_conf(const std::string& key, const std::string& val) {
    hdfs_conf.insert_or_assign(key, val);
}

// Look up `key` in the locally recorded HDFS configuration map.
// Returns the stored value, or `default_val` when the key was never set.
std::string HDFSCommonBuilder::get_hdfs_conf_value(const std::string& key,
                                                   const std::string& default_val) const {
    const auto iter = hdfs_conf.find(key);
    return iter == hdfs_conf.end() ? default_val : iter->second;
}

// Flush every key/value pair recorded via set_hdfs_conf() into the underlying
// hdfsBuilder handle.
void HDFSCommonBuilder::set_hdfs_conf_to_hdfs_builder() {
    for (const auto& [conf_key, conf_value] : hdfs_conf) {
        hdfsBuilderConfSetStr(hdfs_builder, conf_key.c_str(), conf_value.c_str());
    }
}

// Obtain (or reuse) a kerberos ticket cache for this builder's
// principal/keytab via the process-global KerberosTicketMgr, and point the
// hdfsBuilder at that cache file.
// Returns an error when the ticket cache cannot be obtained.
// NOTE: statement order matters — `config` must be fully populated before it
// is handed to get_or_set_ticket_cache().
Status HDFSCommonBuilder::set_kerberos_ticket_cache_path() {
kerberos::KerberosConfig config;
config.set_principal_and_keytab(hdfs_kerberos_principal, hdfs_kerberos_keytab);
config.set_krb5_conf_path(config::kerberos_krb5_conf_path);
config.set_refresh_interval(config::kerberos_refresh_interval_second);
// 600: minimum time before a refresh is attempted — presumably seconds, to
// match kerberos_refresh_interval_second; TODO confirm and name the constant.
config.set_min_time_before_refresh(600);
kerberos::KerberosTicketMgr* ticket_mgr = ExecEnv::GetInstance()->kerberos_ticket_mgr();
// the life cycle of string "ticket_cache_file" must be same as hdfs_builder,
// so here we need to use the member ticket_cache_file instead of a temp string
// (hdfsBuilderSetKerbTicketCachePath keeps a pointer into this string).
RETURN_IF_ERROR(ticket_mgr->get_or_set_ticket_cache(config, &ticket_cache_file));
hdfsBuilderSetKerbTicketCachePath(hdfs_builder, ticket_cache_file.c_str());
// NOTE(review): presumably forces a fresh hdfsFS instance so the new ticket
// cache takes effect instead of a cached connection — verify against libhdfs.
hdfsBuilderSetForceNewInstance(hdfs_builder);
LOG(INFO) << "get kerberos ticket path: " << ticket_cache_file
<< " with principal: " << hdfs_kerberos_principal;
return Status::OK();
}

THdfsParams parse_properties(const std::map<std::string, std::string>& properties) {
StringCaseMap<std::string> prop(properties.begin(), properties.end());
std::vector<THdfsConf> hdfs_configs;
Expand Down Expand Up @@ -158,45 +197,42 @@ THdfsParams parse_properties(const std::map<std::string, std::string>& propertie
// Populate `builder` from the thrift HDFS params: namenode/fs_name, per-key
// hdfs conf, and authentication (kerberos ticket cache vs. simple user name).
//
// NOTE(review): this span is rendered-diff residue — the removed and the
// added side of the hunk are interleaved without +/- markers. Evidence:
// hdfsBuilderSetNameNode is called twice (with `fs_name` and with
// `builder->fs_name`), the fallback-to-simple-auth conf is set twice (via
// named constants and via a string literal), "// set other conf" appears
// twice, and kerberos is configured two incompatible ways (keytab file vs.
// ticket cache). Reconstruct from the actual commit before editing; the
// comments below mark which parts appear to be the removed (old) side.
Status create_hdfs_builder(const THdfsParams& hdfsParams, const std::string& fs_name,
HDFSCommonBuilder* builder) {
RETURN_IF_ERROR(builder->init_hdfs_builder());
// NOTE(review): from here through the `}` after the user/keytab reset below
// appears to be the REMOVED (pre-change) side of the diff: the old flow set
// krb5.conf + keytab directly on the hdfsBuilder.
hdfsBuilderSetNameNode(builder->get(), fs_name.c_str());
// set kerberos conf
if (hdfsParams.__isset.hdfs_kerberos_keytab) {
builder->kerberos_login = true;
builder->hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
#ifdef USE_HADOOP_HDFS
hdfsBuilderSetKerb5Conf(builder->get(), doris::config::kerberos_krb5_conf_path.c_str());
hdfsBuilderSetKeyTabFile(builder->get(), hdfsParams.hdfs_kerberos_keytab.c_str());
#endif
}
if (hdfsParams.__isset.hdfs_kerberos_principal) {
builder->kerberos_login = true;
builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
hdfsBuilderSetPrincipal(builder->get(), hdfsParams.hdfs_kerberos_principal.c_str());
} else if (hdfsParams.__isset.user) {
hdfsBuilderSetUserName(builder->get(), hdfsParams.user.c_str());
#ifdef USE_HADOOP_HDFS
hdfsBuilderSetKerb5Conf(builder->get(), nullptr);
hdfsBuilderSetKeyTabFile(builder->get(), nullptr);
#endif
}
// NOTE(review): the ADDED (post-change) side appears to start here — the
// fs_name is copied into the builder member so its lifetime matches the
// hdfsBuilder (see the header comment on these members).
// set other conf
builder->fs_name = fs_name;
hdfsBuilderSetNameNode(builder->get(), builder->fs_name.c_str());
LOG(INFO) << "set hdfs namenode: " << fs_name;

std::string auth_type = "simple";
// First, copy all hdfs conf and set to hdfs builder
if (hdfsParams.__isset.hdfs_conf) {
// set other conf
for (const THdfsConf& conf : hdfsParams.hdfs_conf) {
// NOTE(review): the next three lines (direct ConfSetStr + krb5.conf
// special-case) look like the removed side; the added side records conf
// via builder->set_hdfs_conf and detects the authentication type.
hdfsBuilderConfSetStr(builder->get(), conf.key.c_str(), conf.value.c_str());
LOG(INFO) << "set hdfs config: " << conf.key << ", value: " << conf.value;
#ifdef USE_HADOOP_HDFS
// Set krb5.conf, we should define java.security.krb5.conf in catalog properties
if (strcmp(conf.key.c_str(), "java.security.krb5.conf") == 0) {
hdfsBuilderSetKerb5Conf(builder->get(), conf.value.c_str());
builder->set_hdfs_conf(conf.key, conf.value);
LOG(INFO) << "set hdfs config key: " << conf.key << ", value: " << conf.value;
if (strcmp(conf.key.c_str(), "hadoop.security.authentication") == 0) {
auth_type = conf.value;
}
#endif
}
builder->set_hdfs_conf_to_hdfs_builder();
}
// NOTE(review): the is_kerberos()/check_krb_params() pair below looks like
// the removed side; the added side branches on the detected auth_type.
if (builder->is_kerberos()) {
RETURN_IF_ERROR(builder->check_krb_params());

if (auth_type == "kerberos") {
// set kerberos conf
if (!hdfsParams.__isset.hdfs_kerberos_principal ||
!hdfsParams.__isset.hdfs_kerberos_keytab) {
return Status::InvalidArgument("Must set both principal and keytab");
}
builder->kerberos_login = true;
builder->hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
builder->hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
RETURN_IF_ERROR(builder->set_kerberos_ticket_cache_path());
hdfsBuilderConfSetStr(builder->get(), FALLBACK_TO_SIMPLE_AUTH_ALLOWED.c_str(),
TRUE_VALUE.c_str());
} else {
if (hdfsParams.__isset.user) {
builder->hadoop_user = hdfsParams.user;
hdfsBuilderSetUserName(builder->get(), builder->hadoop_user.c_str());
}
}
// NOTE(review): unconditional string-literal form of the fallback conf —
// duplicates the constant-based call above; likely the removed side.
hdfsBuilderConfSetStr(builder->get(), "ipc.client.fallback-to-simple-auth-allowed", "true");
return Status::OK();
}

Status create_hdfs_builder(const std::map<std::string, std::string>& properties,
Expand Down
12 changes: 12 additions & 0 deletions be/src/io/hdfs_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,23 @@ class HDFSCommonBuilder {
// True when kerberos authentication has been requested for this builder.
bool is_kerberos() const { return kerberos_login; }
Status check_krb_params();

// Obtain a kerberos ticket cache via KerberosTicketMgr and point the
// hdfsBuilder at it (implemented in hdfs_builder.cpp).
Status set_kerberos_ticket_cache_path();
// Record one HDFS conf key/value pair; later flushed by
// set_hdfs_conf_to_hdfs_builder().
void set_hdfs_conf(const std::string& key, const std::string& val);
// Look up a recorded conf value, returning default_val when absent.
std::string get_hdfs_conf_value(const std::string& key, const std::string& default_val) const;
// Push every recorded conf pair into the underlying hdfsBuilder.
void set_hdfs_conf_to_hdfs_builder();

private:
hdfsBuilder* hdfs_builder = nullptr;
bool kerberos_login {false};

// Copies of the thrift-supplied strings are kept here so their lifetime
// matches hdfs_builder's — the C hdfsBuilder APIs retain raw pointers into
// these buffers rather than copying them.
std::string fs_name;
std::string hadoop_user;
std::string hdfs_kerberos_keytab;
std::string hdfs_kerberos_principal;
std::string ticket_cache_file;
std::unordered_map<std::string, std::string> hdfs_conf;
};

THdfsParams parse_properties(const std::map<std::string, std::string>& properties);
Expand Down

0 comments on commit 3ac2379

Please sign in to comment.