[opt](inverted index) Inverted index file cache queue optimization
zzzxl1993 committed Dec 28, 2024
1 parent 6472174 commit 148112b
Showing 11 changed files with 213 additions and 40 deletions.
2 changes: 1 addition & 1 deletion be/src/clucene
41 changes: 30 additions & 11 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
@@ -57,17 +57,18 @@ namespace segment_v2 {
class CSIndexInput : public lucene::store::BufferedIndexInput {
private:
CL_NS(store)::IndexInput* base;
std::string file_name;
int64_t fileOffset;
int64_t _length;
const io::IOContext* _io_ctx = nullptr;
bool _is_index_file = false; // Indicates if the file is a TII file

protected:
void readInternal(uint8_t* /*b*/, const int32_t /*len*/) override;
void seekInternal(const int64_t /*pos*/) override {}

public:
CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset, const int64_t length,
CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name,
const int64_t fileOffset, const int64_t length,
const int32_t read_buffer_size = CL_NS(store)::BufferedIndexInput::BUFFER_SIZE);
CSIndexInput(const CSIndexInput& clone);
~CSIndexInput() override;
@@ -78,13 +79,14 @@ class CSIndexInput : public lucene::store::BufferedIndexInput {
const char* getObjectName() const override { return getClassName(); }
static const char* getClassName() { return "CSIndexInput"; }
void setIoContext(const void* io_ctx) override;
void setIndexFile(bool isIndexFile) override;
};

CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const int64_t fileOffset,
const int64_t length, const int32_t read_buffer_size)
CSIndexInput::CSIndexInput(CL_NS(store)::IndexInput* base, const std::string& file_name,
const int64_t fileOffset, const int64_t length,
const int32_t read_buffer_size)
: BufferedIndexInput(read_buffer_size) {
this->base = base;
this->file_name = file_name;
this->fileOffset = fileOffset;
this->_length = length;
}
@@ -101,7 +103,27 @@ void CSIndexInput::readInternal(uint8_t* b, const int32_t len) {
base->setIoContext(_io_ctx);
}

base->setIndexFile(_is_index_file);
DBUG_EXECUTE_IF("CSIndexInput.readInternal", {
for (const auto& entry : InvertedIndexDescriptor::index_file_info_map) {
if (file_name.find(entry.first) != std::string::npos) {
if (!static_cast<const io::IOContext*>(base->getIoContext())->is_index_data) {
_CLTHROWA(CL_ERR_IO,
"The 'is_index_data' flag should be true for inverted index meta "
"files.");
}
}
}
for (const auto& entry : InvertedIndexDescriptor::normal_file_info_map) {
if (file_name.find(entry.first) != std::string::npos) {
if (static_cast<const io::IOContext*>(base->getIoContext())->is_index_data) {
_CLTHROWA(CL_ERR_IO,
"The 'is_index_data' flag should be false for non-meta inverted "
"index files.");
}
}
}
});

base->seek(fileOffset + start);
bool read_from_buffer = true;
base->readBytes(b, len, read_from_buffer);
@@ -119,6 +141,7 @@ lucene::store::IndexInput* CSIndexInput::clone() const {

CSIndexInput::CSIndexInput(const CSIndexInput& clone) : BufferedIndexInput(clone) {
this->base = clone.base;
this->file_name = clone.file_name;
this->fileOffset = clone.fileOffset;
this->_length = clone._length;
}
@@ -129,10 +152,6 @@ void CSIndexInput::setIoContext(const void* io_ctx) {
_io_ctx = static_cast<const io::IOContext*>(io_ctx);
}

void CSIndexInput::setIndexFile(bool isIndexFile) {
_is_index_file = isIndexFile;
}

DorisCompoundReader::DorisCompoundReader(CL_NS(store)::IndexInput* stream, int32_t read_buffer_size)
: _ram_dir(new lucene::store::RAMDirectory()),
_stream(stream),
@@ -299,7 +318,7 @@ bool DorisCompoundReader::openInput(const char* name, lucene::store::IndexInput*
bufferSize = _read_buffer_size;
}

ret = _CLNEW CSIndexInput(_stream, entry->offset, entry->length, bufferSize);
ret = _CLNEW CSIndexInput(_stream, entry->file_name, entry->offset, entry->length, bufferSize);
return true;
}

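For reference, the validation injected behind the `CSIndexInput.readInternal` debug point reduces to matching the compound-file entry name against the two tables in InvertedIndexDescriptor and comparing the result with the is_index_data flag carried by the stream's IOContext. A minimal, self-contained sketch of that check follows; the helper name expected_is_index_data and the local copies of the tables are illustrative only and not part of the patch:

#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>

// Illustrative copies of the lookup tables defined in inverted_index_desc.cpp.
static const std::unordered_map<std::string, int32_t> kIndexMetaFiles = {
        {"null_bitmap", 1}, {"segments.gen", 2}, {"segments_", 3}, {"fnm", 4},
        {"tii", 5},         {"bkd_meta", 6},     {"bkd_index", 7}};
static const std::unordered_map<std::string, int32_t> kNormalFiles = {
        {"tis", 1}, {"frq", 2}, {"prx", 3}};

// Hypothetical helper: the is_index_data value a read of `file_name` should carry,
// or std::nullopt when the name matches neither table.
std::optional<bool> expected_is_index_data(const std::string& file_name) {
    for (const auto& entry : kIndexMetaFiles) {
        if (file_name.find(entry.first) != std::string::npos) {
            return true; // meta files (segments, field names, term index, bkd meta) are index data
        }
    }
    for (const auto& entry : kNormalFiles) {
        if (file_name.find(entry.first) != std::string::npos) {
            return false; // term/frequency/position data is read as normal file data
        }
    }
    return std::nullopt;
}

Under the debug point a mismatch raises CL_ERR_IO, which is what the new test_index_file_cache_fault_injection suite relies on to verify the flag is set correctly for each file type.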
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.cpp
@@ -28,6 +28,9 @@ const std::unordered_map<std::string, int32_t> InvertedIndexDescriptor::index_fi
{"null_bitmap", 1}, {"segments.gen", 2}, {"segments_", 3}, {"fnm", 4},
{"tii", 5}, {"bkd_meta", 6}, {"bkd_index", 7}};

const std::unordered_map<std::string, int32_t> InvertedIndexDescriptor::normal_file_info_map = {
{"tis", 1}, {"frq", 2}, {"prx", 3}};

// {tmp_dir}/{rowset_id}_{seg_id}_{index_id}@{suffix}
std::string InvertedIndexDescriptor::get_temporary_index_path(std::string_view tmp_dir_path,
std::string_view rowset_id,
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_desc.h
@@ -30,6 +30,7 @@ namespace segment_v2 {
class InvertedIndexDescriptor {
public:
static const std::unordered_map<std::string, int32_t> index_file_info_map;
static const std::unordered_map<std::string, int32_t> normal_file_info_map;

static constexpr std::string_view segment_suffix = ".dat";
static constexpr std::string_view index_suffix = ".idx";
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_file_reader.cpp
@@ -41,6 +41,7 @@ Status InvertedIndexFileReader::init(int32_t read_buffer_size, const io::IOConte
if (_storage_format == InvertedIndexStorageFormatPB::V2) {
if (_stream) {
_stream->setIoContext(io_ctx);
_stream->setIndexFile(true);
}
}
}
@@ -83,6 +84,7 @@ Status InvertedIndexFileReader::_init_from(int32_t read_buffer_size, const io::I
}
_stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
_stream->setIoContext(io_ctx);
_stream->setIndexFile(true);

// 3. read file
int32_t version = _stream->readInt(); // Read version number
16 changes: 0 additions & 16 deletions be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp
@@ -168,22 +168,6 @@ lucene::store::IndexInput* DorisFSDirectory::FSIndexInput::clone() const {
}
void DorisFSDirectory::FSIndexInput::close() {
BufferedIndexInput::close();
/*if (_handle != nullptr) {
std::mutex* lock = _handle->_shared_lock;
bool ref = false;
{
std::lock_guard<std::mutex> wlock(*lock);
//determine if we are about to delete the handle...
ref = (_LUCENE_ATOMIC_INT_GET(_handle->__cl_refcount) > 1);
//decdelete (deletes if refcount is down to 0
_CLDECDELETE(_handle);
}
//if _handle is not ref by other FSIndexInput, try to release mutex lock, or it will be leaked.
if (!ref) {
delete lock;
}
}*/
}

void DorisFSDirectory::FSIndexInput::setIoContext(const void* io_ctx) {
4 changes: 3 additions & 1 deletion be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -245,7 +245,9 @@ Status InvertedIndexReader::create_index_searcher(lucene::store::Directory* dir,
*searcher = searcher_result;

// When the meta information has been read, the ioContext needs to be reset to prevent it from being used by other queries.
static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput()->setIoContext(nullptr);
auto stream = static_cast<DorisCompoundReader*>(dir)->getDorisIndexInput();
stream->setIoContext(nullptr);
stream->setIndexFile(false);

// NOTE: before the mem_tracker hook becomes active, we calculate the reader memory size by hand.
mem_tracker->consume(index_searcher_builder->get_reader_size());
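Taken together with the InvertedIndexFileReader changes above, the flag now has a bracketed lifecycle around a V2 compound stream: it is set to true (together with the IOContext) before the index meta is read, and both are cleared once the searcher has been built so that later reads through the same stream are not tagged as index meta. A compressed sketch of that sequence using a mock stream (the real object is a CL_NS(store)::IndexInput; the mock type and main() here are purely illustrative):

#include <cassert>
#include <iostream>

// Mock of the stream state the patch toggles; stands in for CL_NS(store)::IndexInput.
struct MockIndexInput {
    const void* io_ctx = nullptr;
    bool is_index_file = false;
    void setIoContext(const void* ctx) { io_ctx = ctx; }
    void setIndexFile(bool v) { is_index_file = v; }
};

int main() {
    int dummy_io_ctx = 0; // stands in for io::IOContext
    MockIndexInput stream;

    // InvertedIndexFileReader::init / _init_from: about to read the V2 footer and meta.
    stream.setIoContext(&dummy_io_ctx);
    stream.setIndexFile(true);
    assert(stream.is_index_file);

    // ... version, directory entries and index meta are read here ...

    // InvertedIndexReader::create_index_searcher: meta consumed, reset both so the
    // stream state is not reused by other queries.
    stream.setIoContext(nullptr);
    stream.setIndexFile(false);
    assert(!stream.is_index_file && stream.io_ctx == nullptr);

    std::cout << "flag lifecycle ok" << std::endl;
    return 0;
}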
30 changes: 19 additions & 11 deletions be/test/olap/rowset/segment_v2/inverted_index_file_writer_test.cpp
@@ -335,19 +335,23 @@ TEST_F(InvertedIndexFileWriterTest, PrepareSortedFilesTest) {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(local_fs_index_path).ok());
EXPECT_TRUE(io::global_local_filesystem()->create_directory(local_fs_index_path).ok());
mock_dir->init(_fs, local_fs_index_path.c_str());
std::vector<std::string> files = {"0.segments", "0.fnm", "0.tii", "nullbitmap", "write.lock"};
std::vector<std::string> files = {"segments_0", "segments.gen", "0.fnm",
"0.tii", "null_bitmap", "write.lock"};
for (auto& file : files) {
auto out_file_1 =
std::unique_ptr<lucene::store::IndexOutput>(mock_dir->createOutput(file.c_str()));
out_file_1->writeString("test1");
out_file_1->close();
}

EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.segments")))
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments_0")))
.WillOnce(testing::Return(1000));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("segments.gen")))
.WillOnce(testing::Return(1200));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.fnm"))).WillOnce(testing::Return(2000));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("0.tii"))).WillOnce(testing::Return(1500));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("nullbitmap"))).WillOnce(testing::Return(500));
EXPECT_CALL(*mock_dir, fileLength(testing::StrEq("null_bitmap")))
.WillOnce(testing::Return(500));

InvertedIndexFileWriter writer(_fs, _index_path_prefix, _rowset_id, _seg_id,
InvertedIndexStorageFormatPB::V2);
@@ -362,24 +366,28 @@
std::vector<FileInfo> sorted_files =
writer.prepare_sorted_files(writer._indices_dirs[std::make_pair(1, "suffix1")].get());

// 1. 0.segments (priority 1, size 1000)
// 2. 0.fnm (priority 2, size 2000)
// 3. 0.tii (priority 3, size 1500)
// 4. nullbitmap (priority 4, size 500)
// 1. null_bitmap (priority 1, size 500)
// 2. segments.gen (priority 2, size 1200)
// 3. segments_0 (priority 3, size 1000)
// 4. 0.fnm (priority 4, size 2000)
// 5. 0.tii (priority 5, size 1500)

std::vector<std::string> expected_order = {"0.segments", "0.fnm", "0.tii", "nullbitmap"};
std::vector<std::string> expected_order = {"null_bitmap", "segments.gen", "segments_0", "0.fnm",
"0.tii"};
ASSERT_EQ(sorted_files.size(), expected_order.size());

for (size_t i = 0; i < expected_order.size(); ++i) {
EXPECT_EQ(sorted_files[i].filename, expected_order[i]);
if (sorted_files[i].filename == "0.segments") {
if (sorted_files[i].filename == "null_bitmap") {
EXPECT_EQ(sorted_files[i].filesize, 500);
} else if (sorted_files[i].filename == "segments.gen") {
EXPECT_EQ(sorted_files[i].filesize, 1200);
} else if (sorted_files[i].filename == "segments_0") {
EXPECT_EQ(sorted_files[i].filesize, 1000);
} else if (sorted_files[i].filename == "0.fnm") {
EXPECT_EQ(sorted_files[i].filesize, 2000);
} else if (sorted_files[i].filename == "0.tii") {
EXPECT_EQ(sorted_files[i].filesize, 1500);
} else if (sorted_files[i].filename == "nullbitmap") {
EXPECT_EQ(sorted_files[i].filesize, 500);
}
}
}
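The reordered expectations come straight from the priorities in index_file_info_map: prepare_sorted_files appears to place the meta files in ascending priority order, with write.lock excluded. A small sketch that reproduces the expected order under that assumption (the tie-breaking and the handling of unmapped files here are guesses, not the exact implementation):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
    // Meta-file priorities as defined in inverted_index_desc.cpp.
    const std::unordered_map<std::string, int32_t> priority = {
            {"null_bitmap", 1}, {"segments.gen", 2}, {"segments_", 3}, {"fnm", 4}, {"tii", 5}};

    auto rank = [&](const std::string& name) -> int32_t {
        for (const auto& entry : priority) {
            if (name.find(entry.first) != std::string::npos) return entry.second;
        }
        return INT32_MAX; // assumption: files outside the map sort after the meta files
    };

    std::vector<std::string> files = {"segments_0", "segments.gen", "0.fnm", "0.tii",
                                      "null_bitmap"};
    std::stable_sort(files.begin(), files.end(),
                     [&](const std::string& a, const std::string& b) { return rank(a) < rank(b); });

    for (const auto& f : files) std::cout << f << '\n';
    // Prints: null_bitmap, segments.gen, segments_0, 0.fnm, 0.tii -- the expected_order above.
}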
@@ -0,0 +1,52 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1790

-- !sql --
2000

-- !sql --
4

-- !sql --
58

-- !sql --
0

-- !sql --
16

-- !sql --
12

-- !sql --
16

-- !sql --
12

-- !sql --
10

-- !sql --
88

-- !sql --
648

-- !sql --
386

-- !sql --
78

-- !sql --
746

-- !sql --
476

-- !sql --
2000

@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_index_file_cache_fault_injection", "nonConcurrent") {
def indexTbName = "test_index_file_cache_fault_injection"

sql "DROP TABLE IF EXISTS ${indexTbName}"
sql """
CREATE TABLE ${indexTbName} (
`@timestamp` int(11) NULL COMMENT "",
`clientip` varchar(20) NULL COMMENT "",
`request` text NULL COMMENT "",
`status` int(11) NULL COMMENT "",
`size` int(11) NULL COMMENT "",
INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT '',
) ENGINE=OLAP
DUPLICATE KEY(`@timestamp`)
COMMENT "OLAP"
DISTRIBUTED BY RANDOM BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "true"
);
"""

def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false,
expected_succ_rows = -1, load_to_single_tablet = 'true' ->

// load the json data
streamLoad {
table "${table_name}"

// set http request header params
set 'label', label + "_" + UUID.randomUUID().toString()
set 'read_json_by_line', read_flag
set 'format', format_flag
file file_name // import json file
time 10000 // limit inflight 10s
if (expected_succ_rows >= 0) {
set 'max_filter_ratio', '1'
}

        // If a check callback is declared, the default check condition is ignored,
        // so you must verify every condition yourself.
check { result, exception, startTime, endTime ->
if (ignore_failure && expected_succ_rows < 0) { return }
if (exception != null) {
throw exception
}
log.info("Stream load result: ${result}".toString())
def json = parseJson(result)
}
}
}

try {
load_httplogs_data.call(indexTbName, 'test_index_file_cache_fault_injection', 'true', 'json', 'documents-1000.json')
load_httplogs_data.call(indexTbName, 'test_index_file_cache_fault_injection', 'true', 'json', 'documents-1000.json')

sql "sync"
sql """ set enable_common_expr_pushdown = true; """

try {
GetDebugPoint().enableDebugPointForAllBEs("CSIndexInput.readInternal")
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '0'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '1'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '2'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '3'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '4'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '5'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '6'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '7'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '8'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix '9'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'a'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'b'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'c'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'd'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'e'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'f'; """
qt_sql """ select count() from ${indexTbName} where request match_phrase_prefix 'g'; """
} finally {
GetDebugPoint().disableDebugPointForAllBEs("CSIndexInput.readInternal")
}
} finally {
}
}
@@ -71,6 +71,7 @@ suite("test_inverted_index_cache", "nonConcurrent") {
load_httplogs_data.call(indexTbName, 'test_index_inlist_fault_injection', 'true', 'json', 'documents-1000.json')
sql "sync"

sql """ set enable_common_expr_pushdown = true; """
qt_sql """ select count() from ${indexTbName} where (request match 'images'); """

// query cache hit