Skip to content

Commit c084897

Browse files
committed
2
1 parent 36fa6fb commit c084897

10 files changed

+306
-53
lines changed

be/src/common/kerberos/kerberos_config.cpp

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,11 @@
2020
#include <filesystem>
2121

2222
#include "common/config.h"
23+
#include "util/md5.h"
2324

2425
namespace doris::kerberos {
2526

2627
KerberosConfig::KerberosConfig()
2728
: _refresh_interval_second(300), _min_time_before_refresh_second(600) {}
2829

29-
void KerberosConfig::set_cache_file_path(const std::string& path) {
30-
const std::string prefix = doris::config::kerberos_ccache_path;
31-
std::filesystem::path full_path = std::filesystem::path(prefix) / path;
32-
full_path = std::filesystem::weakly_canonical(full_path);
33-
_cache_file_path = full_path.string();
34-
}
35-
3630
} // namespace doris::kerberos

be/src/common/kerberos/kerberos_config.h

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,31 @@
2020
#include <chrono>
2121
#include <string>
2222

23+
#include "common/status.h"
24+
2325
namespace doris::kerberos {
2426

2527
class KerberosConfig {
2628
public:
2729
KerberosConfig();
2830

29-
void set_keytab_path(const std::string& path) { _keytab_path = path; }
30-
void set_principal(const std::string& principal) { _principal = principal; }
31-
void set_cache_file_path(const std::string& path);
31+
void set_principal_and_keytab(const std::string& principal, const std::string& keytab) {
32+
_principal = principal;
33+
_keytab_path = keytab;
34+
}
3235
void set_krb5_conf_path(const std::string& path) { _krb5_conf_path = path; }
3336
void set_refresh_interval(int32_t interval) { _refresh_interval_second = interval; }
3437
void set_min_time_before_refresh(int32_t time) { _min_time_before_refresh_second = time; }
3538

36-
const std::string& get_keytab_path() const { return _keytab_path; }
3739
const std::string& get_principal() const { return _principal; }
38-
const std::string& get_cache_file_path() const { return _cache_file_path; }
40+
const std::string& get_keytab_path() const { return _keytab_path; }
3941
const std::string& get_krb5_conf_path() const { return _krb5_conf_path; }
4042
int32_t get_refresh_interval_second() const { return _refresh_interval_second; }
4143
int32_t get_min_time_before_refresh_second() const { return _min_time_before_refresh_second; }
4244

4345
private:
44-
std::string _keytab_path;
4546
std::string _principal;
46-
std::string _cache_file_path;
47+
std::string _keytab_path;
4748
std::string _krb5_conf_path;
4849
int32_t _refresh_interval_second;
4950
int32_t _min_time_before_refresh_second;

be/src/common/kerberos/kerberos_ticket_cache.cpp

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,76 @@
1818
#include "common/kerberos/kerberos_ticket_cache.h"
1919

2020
#include <chrono>
21+
#include <filesystem>
2122
#include <sstream>
2223
#include <thread>
2324

25+
#include "common/config.h"
26+
#include "util/md5.h"
27+
2428
namespace doris::kerberos {
2529

2630
KerberosTicketCache::KerberosTicketCache(const KerberosConfig& config) : _config(config) {}
2731

2832
KerberosTicketCache::~KerberosTicketCache() {
2933
stop_periodic_refresh();
3034
_cleanup_context();
31-
LOG(INFO) << "destroy kerberos ticket cache with principal: " << _config.get_principal();
35+
if (std::filesystem::exists(_ticket_cache_path)) {
36+
std::filesystem::remove(_ticket_cache_path);
37+
}
38+
LOG(INFO) << "destroy kerberos ticket cache " << _ticket_cache_path
39+
<< " with principal: " << _config.get_principal();
3240
}
3341

3442
Status KerberosTicketCache::initialize() {
3543
std::lock_guard<std::mutex> lock(_mutex);
44+
RETURN_IF_ERROR(_init_ticket_cache_path());
3645
Status st = _initialize_context();
37-
LOG(INFO) << "initialized kerberos ticket cache with principal: " << _config.get_principal()
38-
<< ": " << st.to_string();
46+
LOG(INFO) << "initialized kerberos ticket cache " << _ticket_cache_path
47+
<< " with principal: " << _config.get_principal() << ": " << st.to_string();
3948
return st;
4049
}
4150

51+
Status KerberosTicketCache::_init_ticket_cache_path() {
52+
// use md5(principal + keytab_path) as ticket cache file name.
53+
// so that same (principal + keytab_path) will have same name.
54+
std::string combined = _config.get_principal() + _config.get_keytab_path();
55+
Md5Digest digest;
56+
digest.update(combined.c_str(), combined.length());
57+
digest.digest();
58+
std::string cache_file_md5 = digest.hex();
59+
60+
// The path should be with prefix "config::kerberos_ccache_path"
61+
const std::string prefix = doris::config::kerberos_ccache_path;
62+
std::filesystem::path full_path = std::filesystem::path(prefix) / cache_file_md5;
63+
full_path = std::filesystem::weakly_canonical(full_path);
64+
std::filesystem::path parent_path = full_path.parent_path();
65+
66+
try {
67+
if (!std::filesystem::exists(parent_path)) {
68+
// Create the parent dir if not exists
69+
std::filesystem::create_directories(parent_path);
70+
} else {
71+
// Delete the ticket cache file if exists
72+
if (std::filesystem::exists(full_path)) {
73+
std::filesystem::remove(full_path);
74+
}
75+
}
76+
77+
_ticket_cache_path = full_path.string();
78+
return Status::OK();
79+
} catch (const std::filesystem::filesystem_error& e) {
80+
return Status::InternalError("Error when setting kerberos ticket cache file: {}, {}",
81+
full_path.native(), e.what());
82+
} catch (const std::exception& e) {
83+
return Status::InternalError("Exception when setting kerberos ticket cache file: {}, {}",
84+
full_path.native(), e.what());
85+
} catch (...) {
86+
return Status::InternalError("Unknown error when setting kerberos ticket cache file: {}",
87+
full_path.native());
88+
}
89+
}
90+
4291
Status KerberosTicketCache::login() {
4392
std::lock_guard<std::mutex> lock(_mutex);
4493

@@ -51,7 +100,7 @@ Status KerberosTicketCache::login() {
51100
RETURN_IF_ERROR(_check_error(code, "Failed to resolve keytab"));
52101

53102
// init ccache
54-
code = krb5_cc_resolve(_context, _config.get_cache_file_path().c_str(), &_ccache);
103+
code = krb5_cc_resolve(_context, _ticket_cache_path.c_str(), &_ccache);
55104
RETURN_IF_ERROR(_check_error(code, "Failed to resolve credential cache"));
56105

57106
// get init creds
@@ -89,8 +138,7 @@ Status KerberosTicketCache::login() {
89138

90139
Status KerberosTicketCache::login_with_cache() {
91140
std::lock_guard<std::mutex> lock(_mutex);
92-
krb5_error_code code =
93-
krb5_cc_resolve(_context, _config.get_cache_file_path().c_str(), &_ccache);
141+
krb5_error_code code = krb5_cc_resolve(_context, _ticket_cache_path.c_str(), &_ccache);
94142
return _check_error(code, "Failed to resolve credential cache");
95143
}
96144

@@ -139,8 +187,7 @@ void KerberosTicketCache::start_periodic_refresh() {
139187
// ignore and continue
140188
LOG(WARNING) << st.to_string();
141189
} else {
142-
LOG(INFO) << "refresh kerberos ticket cache: "
143-
<< _config.get_cache_file_path();
190+
LOG(INFO) << "refresh kerberos ticket cache: " << _ticket_cache_path;
144191
}
145192
}
146193
}

be/src/common/kerberos/kerberos_ticket_cache.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,20 @@ class KerberosTicketCache {
6161
void start_periodic_refresh();
6262
void stop_periodic_refresh();
6363

64+
const KerberosConfig& get_config() const { return _config; }
65+
66+
const std::string get_ticket_cache_path() const { return _ticket_cache_path; }
67+
6468
private:
69+
Status _init_ticket_cache_path();
6570
Status _initialize_context();
6671
void _cleanup_context();
6772
Status _check_error(krb5_error_code code, const char* message);
6873
bool _needs_refresh() const;
6974

75+
private:
7076
KerberosConfig _config;
77+
std::string _ticket_cache_path;
7178
krb5_context _context {nullptr};
7279
krb5_ccache _ccache {nullptr};
7380
krb5_principal _principal {nullptr};
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "common/kerberos/kerberos_ticket_mgr.h"
19+
20+
#include <iomanip>
21+
#include <sstream>
22+
23+
#include "util/md5.h"
24+
25+
namespace doris::kerberos {
26+
27+
KerberosTicketMgr::KerberosTicketMgr() {
28+
_start_cleanup_thread();
29+
}
30+
31+
KerberosTicketMgr::~KerberosTicketMgr() {
32+
_stop_cleanup_thread();
33+
}
34+
35+
std::string KerberosTicketMgr::_generate_key(const std::string& principal,
36+
const std::string& keytab_path) {
37+
std::string combined = principal + keytab_path;
38+
Md5Digest digest;
39+
digest.update(combined.c_str(), combined.length());
40+
digest.digest();
41+
return digest.hex();
42+
}
43+
44+
Status KerberosTicketMgr::get_or_set_ticket_cache(const KerberosConfig& config,
45+
std::string* cache_path) {
46+
std::string key = _generate_key(config.get_principal(), config.get_keytab_path());
47+
48+
std::lock_guard<std::mutex> lock(_mutex);
49+
50+
// Check if already exists
51+
auto it = _ticket_caches.find(key);
52+
if (it != _ticket_caches.end()) {
53+
it->second.last_access_time = std::chrono::steady_clock::now();
54+
*cache_path = it->second.cache->get_ticket_cache_path();
55+
return Status::OK();
56+
}
57+
58+
// Create new ticket cache
59+
auto ticket_cache = std::make_unique<KerberosTicketCache>(config);
60+
RETURN_IF_ERROR(ticket_cache->initialize());
61+
RETURN_IF_ERROR(ticket_cache->login());
62+
RETURN_IF_ERROR(ticket_cache->write_ticket_cache());
63+
ticket_cache->start_periodic_refresh();
64+
65+
// Add to map
66+
KerberosTicketEntry entry {.cache = std::move(ticket_cache),
67+
.last_access_time = std::chrono::steady_clock::now()};
68+
69+
auto [inserted_it, success] = _ticket_caches.emplace(key, std::move(entry));
70+
if (!success) {
71+
return Status::InternalError("Failed to insert ticket cache into map");
72+
}
73+
74+
*cache_path = inserted_it->second.cache->get_ticket_cache_path();
75+
LOG(INFO) << "create new kerberos ticket cache: " << *cache_path;
76+
return Status::OK();
77+
}
78+
79+
Status KerberosTicketMgr::get_cache_file_path(const std::string& principal,
80+
const std::string& keytab_path,
81+
std::string* cache_path) {
82+
std::string key = _generate_key(principal, keytab_path);
83+
84+
std::lock_guard<std::mutex> lock(_mutex);
85+
86+
auto it = _ticket_caches.find(key);
87+
if (it == _ticket_caches.end()) {
88+
return Status::NotFound("Kerberos ticket cache not found for principal: " + principal);
89+
}
90+
91+
// Update last access time
92+
it->second.last_access_time = std::chrono::steady_clock::now();
93+
94+
*cache_path = it->second.cache->get_ticket_cache_path();
95+
return Status::OK();
96+
}
97+
98+
void KerberosTicketMgr::_cleanup_thread_func() {
99+
// Use a shorter sleep interval for quicker shutdown
100+
static constexpr std::chrono::seconds SLEEP_INTERVAL {1};
101+
auto last_cleanup_time = std::chrono::steady_clock::now();
102+
103+
while (!_should_stop) {
104+
std::this_thread::sleep_for(SLEEP_INTERVAL);
105+
auto now = std::chrono::steady_clock::now();
106+
if (now - last_cleanup_time < CLEANUP_INTERVAL && !_should_stop) {
107+
continue;
108+
}
109+
110+
// Store expired caches here to destroy them after releasing the lock
111+
std::vector<std::unique_ptr<KerberosTicketCache>> expired_caches;
112+
{
113+
std::lock_guard<std::mutex> lock(_mutex);
114+
// Remove expired entries
115+
for (auto it = _ticket_caches.begin(); it != _ticket_caches.end();) {
116+
auto time_since_last_access = now - it->second.last_access_time;
117+
if (time_since_last_access > EXPIRATION_TIME) {
118+
// Move the unique_ptr to our expired_caches vector
119+
expired_caches.push_back(std::move(it->second.cache));
120+
it = _ticket_caches.erase(it);
121+
} else {
122+
++it;
123+
}
124+
}
125+
last_cleanup_time = now;
126+
}
127+
128+
// expired_caches will be destroyed here, outside the lock
129+
}
130+
}
131+
132+
void KerberosTicketMgr::_start_cleanup_thread() {
133+
_should_stop = false;
134+
_cleanup_thread = std::make_unique<std::thread>(&KerberosTicketMgr::_cleanup_thread_func, this);
135+
}
136+
137+
void KerberosTicketMgr::_stop_cleanup_thread() {
138+
if (_cleanup_thread) {
139+
_should_stop = true;
140+
_cleanup_thread->join();
141+
_cleanup_thread.reset();
142+
}
143+
}
144+
145+
} // namespace doris::kerberos

0 commit comments

Comments
 (0)