From 9316629b1d7da9a2a0bade1182bf7b5640b22a45 Mon Sep 17 00:00:00 2001 From: syaojun Date: Tue, 3 Mar 2026 08:22:10 +0800 Subject: [PATCH] feat(cpp): implement LRU cache for chunk management and update readers --- cpp/src/graphar/arrow/chunk_reader.cc | 116 +++++++++++----- cpp/src/graphar/arrow/chunk_reader.h | 43 ++++-- cpp/src/graphar/lru_cache.h | 104 +++++++++++++++ cpp/src/graphar/reader_util.h | 9 +- cpp/test/test_lru_cache.cc | 184 ++++++++++++++++++++++++++ 5 files changed, 412 insertions(+), 44 deletions(-) create mode 100644 cpp/src/graphar/lru_cache.h create mode 100644 cpp/test/test_lru_cache.cc diff --git a/cpp/src/graphar/arrow/chunk_reader.cc b/cpp/src/graphar/arrow/chunk_reader.cc index 030d595d4..1c73e0ea2 100644 --- a/cpp/src/graphar/arrow/chunk_reader.cc +++ b/cpp/src/graphar/arrow/chunk_reader.cc @@ -170,6 +170,7 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader( seek_id_(0), schema_(nullptr), chunk_table_(nullptr), + chunk_cache_(options.cache_capacity), filter_options_(options) { GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); GAR_ASSIGN_OR_RAISE_ERROR(auto pg_path_prefix, @@ -194,6 +195,7 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader( seek_id_(0), schema_(nullptr), chunk_table_(nullptr), + chunk_cache_(options.cache_capacity), filter_options_(options) { GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); @@ -211,9 +213,8 @@ Status VertexPropertyArrowChunkReader::seek(IdType id) { IdType pre_chunk_index = chunk_index_; chunk_index_ = id / vertex_info_->GetChunkSize(); if (chunk_index_ != pre_chunk_index) { - // TODO(@acezen): use a cache to avoid reloading the same chunk, could use - // a LRU cache. - chunk_table_.reset(); + auto* cached = chunk_cache_.Get(chunk_index_); + chunk_table_ = cached ? *cached : nullptr; } if (chunk_index_ >= chunk_num_) { return Status::IndexError("Internal vertex id ", id, " is out of range [0,", @@ -260,6 +261,7 @@ VertexPropertyArrowChunkReader::GetChunkV2() { GAR_RETURN_NOT_OK( CastTableWithSchema(chunk_table_, schema_, &chunk_table_)); } + chunk_cache_.Put(chunk_index_, chunk_table_); } IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize(); return chunk_table_->Slice(row_offset); @@ -304,6 +306,7 @@ VertexPropertyArrowChunkReader::GetChunkV1() { GAR_RETURN_NOT_OK( CastTableWithSchema(chunk_table_, schema_, &chunk_table_)); } + chunk_cache_.Put(chunk_index_, chunk_table_); } IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize(); return chunk_table_->Slice(row_offset); @@ -340,6 +343,7 @@ VertexPropertyArrowChunkReader::GetLabelChunk() { // GAR_RETURN_NOT_OK( // CastTableWithSchema(chunk_table_, schema_, &chunk_table_)); // } + chunk_cache_.Put(chunk_index_, chunk_table_); } IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize(); return chunk_table_->Slice(row_offset); @@ -352,17 +356,22 @@ Status VertexPropertyArrowChunkReader::next_chunk() { vertex_info_->GetType(), " chunk num ", chunk_num_); } seek_id_ = chunk_index_ * vertex_info_->GetChunkSize(); - chunk_table_.reset(); + auto* cached = chunk_cache_.Get(chunk_index_); + chunk_table_ = cached ? *cached : nullptr; return Status::OK(); } void VertexPropertyArrowChunkReader::Filter(util::Filter filter) { filter_options_.filter = filter; + chunk_table_ = nullptr; + chunk_cache_.Disable(); } void VertexPropertyArrowChunkReader::Select(util::ColumnNames column_names) { filter_options_.columns = column_names; + chunk_table_ = nullptr; + chunk_cache_.Clear(); } Result> @@ -517,7 +526,7 @@ VertexPropertyArrowChunkReader::MakeForLabels( AdjListArrowChunkReader::AdjListArrowChunkReader( const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix) + const std::string& prefix, size_t cache_capacity) : edge_info_(edge_info), adj_list_type_(adj_list_type), prefix_(prefix), @@ -525,6 +534,7 @@ AdjListArrowChunkReader::AdjListArrowChunkReader( chunk_index_(0), seek_offset_(0), chunk_table_(nullptr), + chunk_cache_(cache_capacity), chunk_num_(-1) /* -1 means uninitialized */ { GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); GAR_ASSIGN_OR_RAISE_ERROR(auto adj_list_path_prefix, @@ -544,6 +554,7 @@ AdjListArrowChunkReader::AdjListArrowChunkReader( chunk_index_(other.chunk_index_), seek_offset_(other.seek_offset_), chunk_table_(nullptr), + chunk_cache_(other.chunk_cache_.capacity()), vertex_chunk_num_(other.vertex_chunk_num_), chunk_num_(other.chunk_num_), base_dir_(other.base_dir_), @@ -585,7 +596,9 @@ Status AdjListArrowChunkReader::seek_src(IdType id) { // initialize or update chunk_num_ vertex_chunk_index_ = new_vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (adj_list_type_ == AdjListType::unordered_by_source) { @@ -618,7 +631,9 @@ Status AdjListArrowChunkReader::seek_dst(IdType id) { // initialize or update chunk_num_ vertex_chunk_index_ = new_vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (adj_list_type_ == AdjListType::unordered_by_dest) { @@ -636,7 +651,9 @@ Status AdjListArrowChunkReader::seek(IdType offset) { IdType pre_chunk_index = chunk_index_; chunk_index_ = offset / edge_info_->GetChunkSize(); if (chunk_index_ != pre_chunk_index) { - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (chunk_num_ < 0) { // initialize chunk_num_ @@ -666,6 +683,8 @@ Result> AdjListArrowChunkReader::GetChunk() { std::string path = prefix_ + chunk_file_path; auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType(); GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type)); + chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_), + chunk_table_); } IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize(); return chunk_table_->Slice(row_offset); @@ -688,21 +707,26 @@ Status AdjListArrowChunkReader::next_chunk() { GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); } seek_offset_ = chunk_index_ * edge_info_->GetChunkSize(); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; return Status::OK(); } Status AdjListArrowChunkReader::seek_chunk_index(IdType vertex_chunk_index, IdType chunk_index) { + bool changed = false; if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) { vertex_chunk_index_ = vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + changed = true; } - if (chunk_index_ != chunk_index) { + if (chunk_index_ != chunk_index || changed) { chunk_index_ = chunk_index; seek_offset_ = chunk_index * edge_info_->GetChunkSize(); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } return Status::OK(); } @@ -715,32 +739,35 @@ Result AdjListArrowChunkReader::GetRowNumOfChunk() { std::string path = prefix_ + chunk_file_path; auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType(); GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type)); + chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_), + chunk_table_); } return chunk_table_->num_rows(); } Result> AdjListArrowChunkReader::Make( const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix) { + const std::string& prefix, size_t cache_capacity) { if (!edge_info->HasAdjacentListType(adj_list_type)) { return Status::KeyError( "The adjacent list type ", AdjListTypeToString(adj_list_type), " doesn't exist in edge ", edge_info->GetEdgeType(), "."); } return std::make_shared(edge_info, adj_list_type, - prefix); + prefix, cache_capacity); } Result> AdjListArrowChunkReader::Make( const std::shared_ptr& graph_info, const std::string& src_type, const std::string& edge_type, const std::string& dst_type, - AdjListType adj_list_type) { + AdjListType adj_list_type, size_t cache_capacity) { auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type); if (!edge_info) { return Status::KeyError("The edge ", src_type, " ", edge_type, " ", dst_type, " doesn't exist."); } - return Make(edge_info, adj_list_type, graph_info->GetPrefix()); + return Make(edge_info, adj_list_type, graph_info->GetPrefix(), + cache_capacity); } Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() { @@ -752,13 +779,14 @@ Status AdjListArrowChunkReader::initOrUpdateEdgeChunkNum() { AdjListOffsetArrowChunkReader::AdjListOffsetArrowChunkReader( const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix) + const std::string& prefix, size_t cache_capacity) : edge_info_(std::move(edge_info)), adj_list_type_(adj_list_type), prefix_(prefix), chunk_index_(0), seek_id_(0), - chunk_table_(nullptr) { + chunk_table_(nullptr), + chunk_cache_(cache_capacity) { std::string base_dir; GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); GAR_ASSIGN_OR_RAISE_ERROR(auto dir_path, @@ -785,7 +813,8 @@ Status AdjListOffsetArrowChunkReader::seek(IdType id) { IdType pre_chunk_index = chunk_index_; chunk_index_ = id / vertex_chunk_size_; if (chunk_index_ != pre_chunk_index) { - chunk_table_.reset(); + auto* cached = chunk_cache_.Get(chunk_index_); + chunk_table_ = cached ? *cached : nullptr; } if (chunk_index_ >= vertex_chunk_num_) { return Status::IndexError("Internal vertex id ", id, "is out of range [0,", @@ -806,6 +835,7 @@ AdjListOffsetArrowChunkReader::GetChunk() { std::string path = prefix_ + chunk_file_path; auto file_type = edge_info_->GetAdjacentList(adj_list_type_)->GetFileType(); GAR_ASSIGN_OR_RAISE(chunk_table_, fs_->ReadFileToTable(path, file_type)); + chunk_cache_.Put(chunk_index_, chunk_table_); } IdType row_offset = seek_id_ - chunk_index_ * vertex_chunk_size_; return chunk_table_->Slice(row_offset)->column(0)->chunk(0); @@ -820,7 +850,8 @@ Status AdjListOffsetArrowChunkReader::next_chunk() { AdjListTypeToString(adj_list_type_), "."); } seek_id_ = chunk_index_ * vertex_chunk_size_; - chunk_table_.reset(); + auto* cached = chunk_cache_.Get(chunk_index_); + chunk_table_ = cached ? *cached : nullptr; return Status::OK(); } @@ -828,27 +859,29 @@ Status AdjListOffsetArrowChunkReader::next_chunk() { Result> AdjListOffsetArrowChunkReader::Make(const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix) { + const std::string& prefix, + size_t cache_capacity) { if (!edge_info->HasAdjacentListType(adj_list_type)) { return Status::KeyError( "The adjacent list type ", AdjListTypeToString(adj_list_type), " doesn't exist in edge ", edge_info->GetEdgeType(), "."); } - return std::make_shared(edge_info, - adj_list_type, prefix); + return std::make_shared( + edge_info, adj_list_type, prefix, cache_capacity); } Result> AdjListOffsetArrowChunkReader::Make( const std::shared_ptr& graph_info, const std::string& src_type, const std::string& edge_type, const std::string& dst_type, - AdjListType adj_list_type) { + AdjListType adj_list_type, size_t cache_capacity) { auto edge_info = graph_info->GetEdgeInfo(src_type, edge_type, dst_type); if (!edge_info) { return Status::KeyError("The edge ", src_type, " ", edge_type, " ", dst_type, " doesn't exist."); } - return Make(edge_info, adj_list_type, graph_info->GetPrefix()); + return Make(edge_info, adj_list_type, graph_info->GetPrefix(), + cache_capacity); } AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader( @@ -865,6 +898,7 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader( seek_offset_(0), schema_(nullptr), chunk_table_(nullptr), + chunk_cache_(options.cache_capacity), filter_options_(options), chunk_num_(-1) /* -1 means uninitialized */ { GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); @@ -890,6 +924,7 @@ AdjListPropertyArrowChunkReader::AdjListPropertyArrowChunkReader( seek_offset_(other.seek_offset_), schema_(other.schema_), chunk_table_(nullptr), + chunk_cache_(other.chunk_cache_.capacity()), filter_options_(other.filter_options_), vertex_chunk_num_(other.vertex_chunk_num_), chunk_num_(other.chunk_num_), @@ -935,7 +970,9 @@ Status AdjListPropertyArrowChunkReader::seek_src(IdType id) { if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) { vertex_chunk_index_ = new_vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (adj_list_type_ == AdjListType::unordered_by_source) { @@ -967,7 +1004,9 @@ Status AdjListPropertyArrowChunkReader::seek_dst(IdType id) { if (chunk_num_ < 0 || vertex_chunk_index_ != new_vertex_chunk_index) { vertex_chunk_index_ = new_vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (adj_list_type_ == AdjListType::unordered_by_dest) { @@ -985,7 +1024,9 @@ Status AdjListPropertyArrowChunkReader::seek(IdType offset) { seek_offset_ = offset; chunk_index_ = offset / edge_info_->GetChunkSize(); if (chunk_index_ != pre_chunk_index) { - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } if (chunk_num_ < 0) { // initialize chunk_num_ @@ -1024,6 +1065,8 @@ AdjListPropertyArrowChunkReader::GetChunk() { GAR_RETURN_NOT_OK( CastTableWithSchema(chunk_table_, schema_, &chunk_table_)); } + chunk_cache_.Put(std::make_pair(vertex_chunk_index_, chunk_index_), + chunk_table_); } IdType row_offset = seek_offset_ - chunk_index_ * edge_info_->GetChunkSize(); return chunk_table_->Slice(row_offset); @@ -1049,31 +1092,40 @@ Status AdjListPropertyArrowChunkReader::next_chunk() { GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); } seek_offset_ = chunk_index_ * edge_info_->GetChunkSize(); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; return Status::OK(); } Status AdjListPropertyArrowChunkReader::seek_chunk_index( IdType vertex_chunk_index, IdType chunk_index) { + bool changed = false; if (chunk_num_ < 0 || vertex_chunk_index_ != vertex_chunk_index) { vertex_chunk_index_ = vertex_chunk_index; GAR_RETURN_NOT_OK(initOrUpdateEdgeChunkNum()); - chunk_table_.reset(); + changed = true; } - if (chunk_index_ != chunk_index) { + if (chunk_index_ != chunk_index || changed) { chunk_index_ = chunk_index; seek_offset_ = chunk_index * edge_info_->GetChunkSize(); - chunk_table_.reset(); + auto key = std::make_pair(vertex_chunk_index_, chunk_index_); + auto* cached = chunk_cache_.Get(key); + chunk_table_ = cached ? *cached : nullptr; } return Status::OK(); } void AdjListPropertyArrowChunkReader::Filter(util::Filter filter) { filter_options_.filter = filter; + chunk_table_ = nullptr; + chunk_cache_.Disable(); } void AdjListPropertyArrowChunkReader::Select(util::ColumnNames column_names) { filter_options_.columns = column_names; + chunk_table_ = nullptr; + chunk_cache_.Clear(); } Result> diff --git a/cpp/src/graphar/arrow/chunk_reader.h b/cpp/src/graphar/arrow/chunk_reader.h index 779f85968..19a9d9caf 100644 --- a/cpp/src/graphar/arrow/chunk_reader.h +++ b/cpp/src/graphar/arrow/chunk_reader.h @@ -25,6 +25,7 @@ #include #include "graphar/fwd.h" +#include "graphar/lru_cache.h" #include "graphar/reader_util.h" #include "graphar/status.h" @@ -67,7 +68,10 @@ class VertexPropertyArrowChunkReader { const std::vector& property_names, const std::string& prefix, const util::FilterOptions& options = {}); - VertexPropertyArrowChunkReader() : vertex_info_(nullptr), prefix_("") {} + VertexPropertyArrowChunkReader() + : vertex_info_(nullptr), + prefix_(""), + chunk_cache_(util::FilterOptions::kDefaultCacheCapacity) {} /** * @brief Initialize the VertexPropertyArrowChunkReader. @@ -267,6 +271,7 @@ class VertexPropertyArrowChunkReader { IdType vertex_num_; std::shared_ptr schema_; std::shared_ptr chunk_table_; + LRUCache> chunk_cache_; util::FilterOptions filter_options_; std::shared_ptr fs_; }; @@ -283,9 +288,12 @@ class AdjListArrowChunkReader { * @param edge_info The edge info that describes the edge type. * @param adj_list_type The adj list type for the edge. * @param prefix The absolute prefix. + * @param cache_capacity The capacity of the LRU chunk cache. */ - AdjListArrowChunkReader(const std::shared_ptr& edge_info, - AdjListType adj_list_type, const std::string& prefix); + AdjListArrowChunkReader( + const std::shared_ptr& edge_info, AdjListType adj_list_type, + const std::string& prefix, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); /** * @brief Copy constructor. @@ -355,10 +363,12 @@ class AdjListArrowChunkReader { * @param edge_info The edge info. * @param adj_list_type The adj list type for the edges. * @param prefix The absolute prefix of the graph. + * @param cache_capacity The capacity of the LRU chunk cache. Default is 4. */ static Result> Make( const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix); + const std::string& prefix, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); /** * @brief Create an AdjListArrowChunkReader instance from graph info. @@ -368,11 +378,13 @@ class AdjListArrowChunkReader { * @param edge_type The edge type. * @param dst_type The destination vertex type. * @param adj_list_type The adj list type for the edges. + * @param cache_capacity The capacity of the LRU chunk cache. Default is 4. */ static Result> Make( const std::shared_ptr& graph_info, const std::string& src_type, const std::string& edge_type, const std::string& dst_type, - AdjListType adj_list_type); + AdjListType adj_list_type, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); private: Status initOrUpdateEdgeChunkNum(); @@ -384,6 +396,8 @@ class AdjListArrowChunkReader { IdType vertex_chunk_index_, chunk_index_; IdType seek_offset_; std::shared_ptr chunk_table_; + LRUCache, std::shared_ptr, PairHash> + chunk_cache_; IdType vertex_chunk_num_, chunk_num_; std::string base_dir_; std::shared_ptr fs_; @@ -403,10 +417,12 @@ class AdjListOffsetArrowChunkReader { * Note that the adj list type must be AdjListType::ordered_by_source * or AdjListType::ordered_by_dest. * @param prefix The absolute prefix. + * @param cache_capacity The capacity of the LRU chunk cache. */ - AdjListOffsetArrowChunkReader(const std::shared_ptr& edge_info, - AdjListType adj_list_type, - const std::string& prefix); + AdjListOffsetArrowChunkReader( + const std::shared_ptr& edge_info, AdjListType adj_list_type, + const std::string& prefix, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); /** * @brief Sets chunk position indicator for reader by internal vertex id. @@ -441,10 +457,12 @@ class AdjListOffsetArrowChunkReader { * @param edge_info The edge info. * @param adj_list_type The adj list type for the edges. * @param prefix The absolute prefix of the graph. + * @param cache_capacity The capacity of the LRU chunk cache. */ static Result> Make( const std::shared_ptr& edge_info, AdjListType adj_list_type, - const std::string& prefix); + const std::string& prefix, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); /** * @brief Create an AdjListOffsetArrowChunkReader instance from graph info. @@ -454,11 +472,13 @@ class AdjListOffsetArrowChunkReader { * @param edge_type The edge type. * @param dst_type The destination vertex type. * @param adj_list_type The adj list type for the edges. + * @param cache_capacity The capacity of the LRU chunk cache. Default is 4. */ static Result> Make( const std::shared_ptr& graph_info, const std::string& src_type, const std::string& edge_type, const std::string& dst_type, - AdjListType adj_list_type); + AdjListType adj_list_type, + size_t cache_capacity = util::FilterOptions::kDefaultCacheCapacity); private: std::shared_ptr edge_info_; @@ -467,6 +487,7 @@ class AdjListOffsetArrowChunkReader { IdType chunk_index_; IdType seek_id_; std::shared_ptr chunk_table_; + LRUCache> chunk_cache_; IdType vertex_chunk_num_; IdType vertex_chunk_size_; std::string base_dir_; @@ -633,6 +654,8 @@ class AdjListPropertyArrowChunkReader { IdType seek_offset_; std::shared_ptr schema_; std::shared_ptr chunk_table_; + LRUCache, std::shared_ptr, PairHash> + chunk_cache_; util::FilterOptions filter_options_; IdType vertex_chunk_num_, chunk_num_; std::string base_dir_; diff --git a/cpp/src/graphar/lru_cache.h b/cpp/src/graphar/lru_cache.h new file mode 100644 index 000000000..f82f33a6e --- /dev/null +++ b/cpp/src/graphar/lru_cache.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace graphar { + +template > +class LRUCache { + public: + explicit LRUCache(size_t capacity) : capacity_(capacity) {} + + Value* Get(const Key& key) { + auto it = map_.find(key); + if (it == map_.end()) { + return nullptr; + } + items_.splice(items_.begin(), items_, it->second); + return &it->second->second; + } + + void Put(const Key& key, Value value) { + if (capacity_ == 0) { + return; + } + auto it = map_.find(key); + if (it != map_.end()) { + it->second->second = std::move(value); + items_.splice(items_.begin(), items_, it->second); + return; + } + + if (map_.size() >= capacity_) { + auto& back = items_.back(); + map_.erase(back.first); + items_.pop_back(); + } + items_.emplace_front(key, std::move(value)); + map_[key] = items_.begin(); + } + + void Clear() { + map_.clear(); + items_.clear(); + } + + size_t Size() const { return map_.size(); } + + size_t capacity() const { return capacity_; } + + void SetCapacity(size_t capacity) { + capacity_ = capacity; + while (map_.size() > capacity_) { + auto& back = items_.back(); + map_.erase(back.first); + items_.pop_back(); + } + } + + void Disable() { SetCapacity(0); } + + private: + size_t capacity_{0}; + std::list> items_; + std::unordered_map>::iterator, + Hash> + map_; +}; + +struct PairHash { + template + size_t operator()(const std::pair& p) const { + size_t h1 = std::hash{}(p.first); + size_t h2 = std::hash{}(p.second); + // inspired by boost::hash_combine + constexpr size_t kMul = static_cast(0x9e3779b97f4a7c15ULL); + h1 ^= h2 + kMul + (h1 << 6) + (h1 >> 2); + return h1; + } +}; + +} // namespace graphar diff --git a/cpp/src/graphar/reader_util.h b/cpp/src/graphar/reader_util.h index e80cbfa28..9da96051d 100644 --- a/cpp/src/graphar/reader_util.h +++ b/cpp/src/graphar/reader_util.h @@ -28,14 +28,19 @@ namespace graphar::util { struct FilterOptions { + static constexpr size_t kDefaultCacheCapacity = 4; + // The row filter to apply to the table. Filter filter = nullptr; // The columns to include in the table. Select all columns by default. ColumnNames columns = std::nullopt; + // The number of chunks to cache in the LRU cache. + size_t cache_capacity = kDefaultCacheCapacity; FilterOptions() {} - FilterOptions(Filter filter, ColumnNames columns) - : filter(filter), columns(columns) {} + FilterOptions(Filter filter, ColumnNames columns, + size_t cache_capacity = kDefaultCacheCapacity) + : filter(filter), columns(columns), cache_capacity(cache_capacity) {} }; Status CheckFilterOptions( diff --git a/cpp/test/test_lru_cache.cc b/cpp/test/test_lru_cache.cc new file mode 100644 index 000000000..46bba6adb --- /dev/null +++ b/cpp/test/test_lru_cache.cc @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "graphar/lru_cache.h" + +#include + +namespace graphar { + +TEST_CASE("LRUCache basic operations") { + LRUCache cache(3); + + SECTION("Empty cache returns nullptr") { + REQUIRE(cache.Get(1) == nullptr); + REQUIRE(cache.Size() == 0); + } + + SECTION("Put and Get") { + cache.Put(1, "one"); + auto* value = cache.Get(1); + REQUIRE(value != nullptr); + REQUIRE(*value == "one"); + REQUIRE(cache.Size() == 1); + } + + SECTION("Multiple Put and Get") { + cache.Put(1, "one"); + cache.Put(2, "two"); + cache.Put(3, "three"); + + auto* v1 = cache.Get(1); + auto* v2 = cache.Get(2); + auto* v3 = cache.Get(3); + + REQUIRE(v1 != nullptr); + REQUIRE(v2 != nullptr); + REQUIRE(v3 != nullptr); + REQUIRE(*v1 == "one"); + REQUIRE(*v2 == "two"); + REQUIRE(*v3 == "three"); + REQUIRE(cache.Size() == 3); + } + + SECTION("Update existing key") { + cache.Put(1, "one"); + cache.Put(1, "updated"); + + auto* value = cache.Get(1); + REQUIRE(value != nullptr); + REQUIRE(*value == "updated"); + REQUIRE(cache.Size() == 1); + } + + SECTION("Get updates recency") { + cache.Put(1, "one"); + cache.Put(2, "two"); + cache.Put(3, "three"); + + cache.Get(1); + cache.Put(4, "four"); + + REQUIRE(cache.Get(1) != nullptr); + REQUIRE(cache.Get(2) == nullptr); + REQUIRE(cache.Get(3) != nullptr); + REQUIRE(cache.Get(4) != nullptr); + REQUIRE(cache.Size() == 3); + } +} + +TEST_CASE("LRUCache eviction") { + SECTION("Evict when exceeding capacity") { + LRUCache cache(2); + + cache.Put(1, "one"); + cache.Put(2, "two"); + cache.Put(3, "three"); + + REQUIRE(cache.Get(1) == nullptr); + REQUIRE(cache.Get(2) != nullptr); + REQUIRE(cache.Get(3) != nullptr); + REQUIRE(cache.Size() == 2); + } + + SECTION("Evict least recently used") { + LRUCache cache(3); + + cache.Put(1, "one"); + cache.Put(2, "two"); + cache.Put(3, "three"); + + cache.Get(1); + cache.Get(2); + cache.Put(4, "four"); + + REQUIRE(cache.Get(1) != nullptr); + REQUIRE(cache.Get(2) != nullptr); + REQUIRE(cache.Get(3) == nullptr); + REQUIRE(cache.Get(4) != nullptr); + } +} + +TEST_CASE("LRUCache Clear") { + LRUCache cache(3); + + cache.Put(1, "one"); + cache.Put(2, "two"); + cache.Clear(); + + REQUIRE(cache.Get(1) == nullptr); + REQUIRE(cache.Get(2) == nullptr); + REQUIRE(cache.Size() == 0); +} + +TEST_CASE("LRUCache with string keys") { + LRUCache cache(2); + + cache.Put("one", 1); + cache.Put("two", 2); + + auto* v1 = cache.Get("one"); + auto* v2 = cache.Get("two"); + + REQUIRE(v1 != nullptr); + REQUIRE(v2 != nullptr); + REQUIRE(*v1 == 1); + REQUIRE(*v2 == 2); +} + +TEST_CASE("LRUCache with PairHash") { + LRUCache, std::string, PairHash> cache(2); + + cache.Put({1, 2}, "value1"); + cache.Put({3, 4}, "value2"); + + auto* v1 = cache.Get({1, 2}); + auto* v2 = cache.Get({3, 4}); + + REQUIRE(v1 != nullptr); + REQUIRE(v2 != nullptr); + REQUIRE(*v1 == "value1"); + REQUIRE(*v2 == "value2"); + + REQUIRE(cache.Get({5, 6}) == nullptr); +} + +TEST_CASE("LRUCache move semantics") { + LRUCache cache(2); + + std::string value = "test_value"; + cache.Put(1, std::move(value)); + + auto* cached = cache.Get(1); + REQUIRE(cached != nullptr); + REQUIRE(*cached == "test_value"); +} + +TEST_CASE("LRUCache zero capacity edge case") { + LRUCache cache(0); + + cache.Put(1, "one"); + + REQUIRE(cache.Size() == 0); + REQUIRE(cache.Get(1) == nullptr); +} + +} // namespace graphar