Skip to content

Commit

Permalink
Handle client caching at hash table client
Browse files Browse the repository at this point in the history
  • Loading branch information
charles-typ committed Aug 20, 2019
1 parent 8d304af commit 3b70efd
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 12 deletions.
13 changes: 9 additions & 4 deletions libjiffy/src/jiffy/auto_scaling/auto_scaling_service_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,15 @@ void auto_scaling_service_handler::auto_scaling(const std::vector<std::string> &
}
UNLOCK;
auto dst_slot_range = string_utils::split(dst_old_name[0], '_', 2);
auto dst_name = (std::stoi(dst_slot_range[0]) == merge_range_end) ?
std::to_string(merge_range_beg) + "_" + dst_slot_range[1]:
dst_slot_range[0] + "_" + std::to_string(merge_range_end);
src->run_command({"update_partition", name, "exporting$" + name + "$" + exp_target});
std::string dst_name;
bool merge_direction = false; // 0 low, 1 high
if(std::stoi(dst_slot_range[0]) == merge_range_end) {
merge_direction = true;
dst_name = std::to_string(merge_range_beg) + "_" + dst_slot_range[1];
} else {
dst_name = dst_slot_range[0] + "_" + std::to_string(merge_range_end);
}
src->run_command({"update_partition", name, "exporting$" + name + "$" + exp_target + "$" + std::to_string(merge_direction)});
auto finish_update_partition_before = time_utils::now_us();

// Transfer data from source to destination
Expand Down
22 changes: 21 additions & 1 deletion libjiffy/src/jiffy/storage/client/hash_table_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,34 @@ void hash_table_client::handle_redirect(std::vector<std::string> &_return, const
if (it == redirect_blocks_.end()) {
auto chain = directory::replica_chain(string_utils::split(_return[1], '!'));
auto client = std::make_shared<replica_chain_client>(fs_, path_, chain, HT_OPS, 0);
redirect_blocks_.emplace(std::make_pair(_return[0] + _return[1], client));
redirect_blocks_.emplace(std::make_pair(_return[0] + _return[1], client_cache_slot(_return[2], std::stoi(_return[3]), std::stoi(_return[4]), client)));
_return = client->run_command_redirected(args);
} else {
_return = it->second->run_command_redirected(args);
}

}
if (_return[0] == "!block_moved") {
auto hash = hash_slot::get(args[1]);
for(const auto &x : redirect_blocks_) {
auto slot_range = string_utils::split(x.second.slot_range_, '_', 2);
if(hash >= std::stoi(slot_range[0]) && hash < std::stoi(slot_range[1])) {
auto it = blocks_.find(std::stoi(slot_range[0]));
if(it == blocks_.end() && !x.second.merging_) {
blocks_.emplace(std::make_pair(std::stoi(slot_range[0]), x.second.client_));
} else if(x.second.merging_) {
if(x.second.merge_direction_) {
auto node = *(std::next(it));
blocks_.erase(std::next(it));
blocks_.insert(std::make_pair(std::stoi(slot_range[0]), node.second));
}
blocks_.erase(it);
} else {
continue;
}
throw redo_error();
}
}
refresh();
throw redo_error();
}
Expand Down
16 changes: 15 additions & 1 deletion libjiffy/src/jiffy/storage/client/hash_table_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@
namespace jiffy {
namespace storage {

struct client_cache_slot {
std::string slot_range_;
int merging_;
int merge_direction_;
std::shared_ptr<replica_chain_client> client_;

client_cache_slot(std::string slot_range, int merging, int merge_direction, std::shared_ptr<replica_chain_client> & client) {
slot_range_ = slot_range;
merging_ = merging;
merge_direction_ = merge_direction;
client_ = client;
}
};

/* Hash table client */
class hash_table_client : public data_structure_client {
public:
Expand Down Expand Up @@ -89,7 +103,7 @@ class hash_table_client : public data_structure_client {
std::map<int32_t, std::shared_ptr<replica_chain_client>> blocks_;

/* Caching created connections */
std::map<std::string, std::shared_ptr<replica_chain_client>> redirect_blocks_;
std::map<std::string, client_cache_slot>> redirect_blocks_;
};

}
Expand Down
15 changes: 9 additions & 6 deletions libjiffy/src/jiffy/storage/hashtable/hash_table_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void hash_table_partition::put(response &_return, const arg_list &args) {
auto hash = hash_slot::get(args[1]);
if (in_slot_range(hash) || (in_import_slot_range(hash) && args[3] == "!redirected")) {
if (metadata_ == "exporting" && in_export_slot_range(hash)) {
RETURN_ERR("!exporting", export_target_str_);
RETURN_ERR("!exporting", export_target_str_, export_slot_range(), std::to_string(scaling_down_), std::to_string(merge_direction_));
}
if (storage_size() + args[1].size() + args[2].size() > storage_capacity()) {
RETURN_ERR("!full");
Expand All @@ -78,7 +78,7 @@ void hash_table_partition::upsert(response &_return, const arg_list &args) {
auto hash = hash_slot::get(args[1]);
if (in_slot_range(hash) || (in_import_slot_range(hash) && args[3] == "!redirected")) {
if (metadata_ == "exporting" && in_export_slot_range(hash)) {
RETURN_ERR("!exporting", export_target_str_);
RETURN_ERR("!exporting", export_target_str_, export_slot_range(), std::to_string(scaling_down_), std::to_string(merge_direction_));
}
block_.upsert(make_binary(args[1]), [&](value_type &v) {
v = make_binary(args[2]);
Expand All @@ -98,7 +98,7 @@ void hash_table_partition::exists(response &_return, const arg_list &args) {
RETURN_OK("true");
}
if (metadata_ == "exporting" && in_export_slot_range(hash)) {
RETURN_ERR("!exporting", export_target_str_);
RETURN_ERR("!exporting", export_target_str_, export_slot_range(), std::to_string(scaling_down_), std::to_string(merge_direction_));
}
RETURN_OK("false");
}
Expand All @@ -115,7 +115,7 @@ void hash_table_partition::get(response &_return, const arg_list &args) {
RETURN_OK(to_string(block_.find(args[1])));
} catch (std::out_of_range &e) {
if (metadata_ == "exporting" && in_export_slot_range(hash)) {
RETURN_ERR("!exporting", export_target_str_);
RETURN_ERR("!exporting", export_target_str_, export_slot_range(), std::to_string(scaling_down_), std::to_string(merge_direction_));
}
RETURN_ERR("!key_not_found");
}
Expand All @@ -130,7 +130,7 @@ void hash_table_partition::update(response &_return, const arg_list &args) {
auto hash = hash_slot::get(args[1]);
if (in_slot_range(hash) || (in_import_slot_range(hash) && args[3] == "!redirected")) {
if (metadata_ == "exporting" && in_export_slot_range(hash)) {
RETURN_ERR("!exporting", export_target_str_);
RETURN_ERR("!exporting", export_target_str_, export_slot_range(), std::to_string(scaling_down_), std::to_string(merge_direction_));
}
std::string old_val;
if (block_.update_fn(args[1], [&](value_type &v) {
Expand All @@ -154,7 +154,7 @@ void hash_table_partition::remove(response &_return, const arg_list &args) {
auto hash = hash_slot::get(args[1]);
if (in_slot_range(hash) || (in_import_slot_range(hash) && args[2] == "!redirected")) {
if (metadata_ == "exporting" && in_export_slot_range(hash)) {
RETURN_ERR("!exporting", export_target_str_);
RETURN_ERR("!exporting", export_target_str_, export_slot_range(), std::to_string(scaling_down_), std::to_string(merge_direction_));
}
std::string old_val;
if (block_.erase_fn(args[1], [&](value_type &value) {
Expand Down Expand Up @@ -240,6 +240,9 @@ void hash_table_partition::update_partition(response &_return, const arg_list &a
export_target(s[2]);
auto range = utils::string_utils::split(s[1], '_');
export_slot_range(std::stoi(range[0]), std::stoi(range[1]));
if(s.size() == 4) {
merge_direction_ = std::stoi(s[3]);
}
} else if (status == "importing") {
if ((metadata() != "regular" && !(metadata() == "split_importing" && s[1] == name())) || new_name != name()
|| scaling_up_ || scaling_down_) {
Expand Down
10 changes: 10 additions & 0 deletions libjiffy/src/jiffy/storage/hashtable/hash_table_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ class hash_table_partition : public chain_module {
export_slot_range_.second = slot_end;
};

/**
* @brief Fetch export slot range
* @return Export slot range
*/
std::string export_slot_range() {
return std::to_string(export_slot_range_.first) + "_" + std::to_string(export_slot_range_.second);
}

/**
* @brief Check if slot is within export slot range
* @param slot Slot
Expand Down Expand Up @@ -375,6 +383,8 @@ class hash_table_partition : public chain_module {
/* Data update mutex, we want only one update function happen at a time */
std::mutex update_lock_;

/* Merge direction */
int merge_direction_;
};

}
Expand Down

0 comments on commit 3b70efd

Please sign in to comment.