Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 44 additions & 43 deletions config/perf_join_datasource_modes.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
# Test Mode 1: Generate-Save-Load (generate data, save to file, then load from file)
name = "perf_join_gen_save_load_random"
mode = "generate_save_load"
methods = ["bruteforce_eager", "ivf_eager"]
sizes = [2000]
#methods = ["bruteforce_eager", "ivf_eager"]
methods = ["ivf_eager"]
sizes = [1000]
records_count = 10
vector_dim = 64
parallelism = [1,2,4]
parallelism = [1,2,4,8,16,32]
window_time_ms = [10000]
window_trigger_ms = 50
time_interval = 10
Expand All @@ -19,43 +20,43 @@ type = "random"
[performance_test.storage]
format = "fvecs" # Options: "fvecs", "json"
file_path = "test/data/generated_test_data.fvecs"

[[performance_test]]
# Test Mode 2: Direct Load (load data directly from existing file)
name = "perf_join_direct_load_sift"
mode = "direct_load"
methods = ["bruteforce_eager", "ivf_eager"]
sizes = [1000]
records_count = 10
vector_dim = 128
parallelism = [1,2]
window_time_ms = [10000]
window_trigger_ms = 50
time_interval = 10
similarity_threshold = 0.8

[performance_test.data_source]
type = "dataset"
file_path = "data/siftsmall/siftsmall_query.fvecs"
expected_dim = 128
loop = true

[[performance_test]]
# Test Mode 3: Generate and Use Directly (no file I/O)
name = "perf_join_direct_use_random"
mode = "generate_direct_use"
methods = ["bruteforce_eager", "ivf_eager"]
sizes = [2000]
records_count = 10
vector_dim = 64
parallelism = [1,2,4]
window_time_ms = [10000]
window_trigger_ms = 50
time_interval = 10
similarity_threshold = 0.8
seed = 42

[performance_test.data_source]
type = "random"

log.level = "debug"
#
#[[performance_test]]
## Test Mode 2: Direct Load (load data directly from existing file)
#name = "perf_join_direct_load_sift"
#mode = "direct_load"
#methods = ["bruteforce_eager", "ivf_eager"]
#sizes = [2000]
#records_count = 10
#vector_dim = 128
#parallelism = [1,2,4]
#window_time_ms = [10000]
#window_trigger_ms = 50
#time_interval = 10
#similarity_threshold = 0.8
#
#[performance_test.data_source]
#type = "dataset"
#file_path = "data/siftsmall/siftsmall_query.fvecs"
#expected_dim = 128
#loop = true
#
#[[performance_test]]
## Test Mode 3: Generate and Use Directly (no file I/O)
#name = "perf_join_direct_use_random"
#mode = "generate_direct_use"
#methods = ["bruteforce_eager", "ivf_eager"]
#sizes = [2000]
#records_count = 10
#vector_dim = 64
#parallelism = [1,2,4]
#window_time_ms = [10000]
#window_trigger_ms = 50
#time_interval = 10
#similarity_threshold = 0.8
#seed = 42
#
#[performance_test.data_source]
#type = "random"
#
#log.level = "debug"
2 changes: 2 additions & 0 deletions include/concurrency/concurrency_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class ConcurrencyManager {
~ConcurrencyManager();

// Creates an index of `index_type` for vectors of `dimension`; returns the index id.
auto create_index(const std::string &name, const IndexType &index_type, int dimension) -> int;
// Overload that also takes index-specific parameters. Its definition returns -1
// for IndexType::None and default-constructs an IVF/HNSW index when `params`
// does not hold the parameter struct matching `index_type`.
auto create_index(const std::string &name, const IndexType &index_type, int dimension,
const IndexParameters& params) -> int;
// Convenience overload: its definition forwards to the three-argument overload
// with IndexType::BruteForce.
auto create_index(const std::string &name, int dimension) -> int;

auto drop_index(const std::string &name) -> bool;
Expand Down
13 changes: 13 additions & 0 deletions include/function/join_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ namespace sageFlow {
lastEmitted = -1;
}

/// Returns the current windowSize, read while holding mutex_.
auto getWindowSize() const -> int64_t {
  const std::lock_guard<std::mutex> guard(mutex_);
  const int64_t current_size = windowSize;
  return current_size;
}

/// Returns the current stepSize, read while holding mutex_.
auto getStepSize() const -> int64_t {
  const std::lock_guard<std::mutex> guard(mutex_);
  const int64_t current_step = stepSize;
  return current_step;
}

auto windowTimeLimit(int64_t timestamp) const -> int64_t {
std::lock_guard<std::mutex> lock(mutex_);
return timestamp - windowSize;
Expand Down Expand Up @@ -118,6 +128,9 @@ namespace sageFlow {
auto setOtherStream(std::shared_ptr<Stream> other_plan) -> void;

auto setWindow(int64_t time_window, int64_t stepsize) -> void;

// Accessors for the window configuration; the definitions read both values
// from threadSafeWindowL.
auto getWindowSize() const -> int64_t;
auto getStepSize() const -> int64_t;

SlidingWindow windowL, windowR;
ThreadSafeSlidingWindow threadSafeWindowL, threadSafeWindowR;
Expand Down
20 changes: 20 additions & 0 deletions include/index/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <string>
#include <utility>
#include <vector>
#include <variant>
#include <optional>

#include "compute_engine/compute_engine.h"
#include "storage/storage_manager.h"
Expand All @@ -18,6 +20,24 @@ enum class IndexType { // NOLINT
Vectraflow
};

// Index-specific parameter structures
/// Settings for an IVF index. ConcurrencyManager::create_index forwards the
/// three fields, in this order, to the Ivf constructor.
struct IVFParameters {
  int nlist{1000};
  double rebuild_threshold{1.5};
  int nprobes{10};
};

/// Settings for an HNSW index. ConcurrencyManager::create_index forwards the
/// three fields, in this order, to the HNSW constructor.
struct HNSWParameters {
  int m{20};
  int ef_construction{100};
  int ef_search{40};
};

// Empty tag type: the alternative that carries no index-specific settings.
struct NoParameters {};

// Variant to hold any index parameters.
// NoParameters is listed first, so a default-constructed IndexParameters holds it.
using IndexParameters = std::variant<NoParameters, IVFParameters, HNSWParameters>;

class Index {
public:
// data
Expand Down
4 changes: 4 additions & 0 deletions include/operator/join_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ struct JoinMetrics {
std::atomic<uint64_t> total_records_left{0};
std::atomic<uint64_t> total_records_right{0};
std::atomic<uint64_t> total_emits{0};
std::atomic<uint64_t> window_records_left_completed{0};
std::atomic<uint64_t> window_records_right_completed{0};

// New: apply() processing time and end-to-end latency (unit: nanoseconds; both are running totals, each with an accompanying count)
std::atomic<uint64_t> apply_processing_ns{0};
Expand All @@ -33,6 +35,7 @@ struct JoinMetrics {
// Zeroes every counter in the struct. Each group below corresponds to one of
// the original chained assignments, and the stores are issued in the order
// those chains evaluated (rightmost member first).
void reset() {
  // Per-stage timing accumulators.
  lock_wait_ns.store(0);
  emit_ns.store(0);
  join_function_ns.store(0);
  similarity_ns.store(0);
  candidate_fetch_ns.store(0);
  expire_ns.store(0);
  index_insert_ns.store(0);
  window_insert_ns.store(0);

  // Record and emit totals.
  total_emits.store(0);
  total_records_right.store(0);
  total_records_left.store(0);

  // Per-side completed-window record counters.
  window_records_right_completed.store(0);
  window_records_left_completed.store(0);

  // apply() processing time and end-to-end latency, with their counts.
  e2e_latency_count.store(0);
  e2e_latency_ns.store(0);
  apply_processing_count.store(0);
  apply_processing_ns.store(0);
}
void dump_tsv(const std::string& path) {
Expand All @@ -43,6 +46,7 @@ struct JoinMetrics {
#define EMIT(m) ofs<<#m"\t"<<m.load()<<"\n";
EMIT(window_insert_ns) EMIT(index_insert_ns) EMIT(expire_ns) EMIT(candidate_fetch_ns) EMIT(similarity_ns)
EMIT(join_function_ns) EMIT(emit_ns) EMIT(lock_wait_ns) EMIT(total_records_left) EMIT(total_records_right) EMIT(total_emits)
EMIT(window_records_left_completed) EMIT(window_records_right_completed)
EMIT(apply_processing_ns) EMIT(apply_processing_count) EMIT(e2e_latency_ns) EMIT(e2e_latency_count)
#undef EMIT
}
Expand Down
19 changes: 19 additions & 0 deletions include/operator/join_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ namespace sageFlow {

void initializeIVFIndexes(int nlist, double rebuild_threshold, int nprobes); // Existing interface kept as-is (the extra parameters are not used yet)
// NOTE(review): presumably creates one index per join side, named from `prefix` — confirm against the definition.
bool createIndexPair(IndexType type, const std::string& prefix);
// Overload that additionally accepts index-specific parameters (IndexParameters).
bool createIndexPair(IndexType type, const std::string& prefix, const IndexParameters& params);

// Thread-safe window update method (container changed to deque)
auto updateSideThreadSafe(
Expand All @@ -55,6 +56,10 @@ namespace sageFlow {
std::vector<std::unique_ptr<VectorRecord>> getCandidates(
const std::unique_ptr<VectorRecord>& data_ptr, int slot);

// Helper that fetches candidates (assumes the locks on both windows are already held)
std::vector<std::unique_ptr<VectorRecord>> getCandidatesWithLocksHeld(
const std::unique_ptr<VectorRecord>& data_ptr, int slot);

// Helper that checks whether a candidate is in the given window (container changed to deque)
bool validateCandidateInWindow(
const std::unique_ptr<VectorRecord>& candidate,
Expand All @@ -67,12 +72,26 @@ namespace sageFlow {
int slot,
std::vector<std::pair<int, std::unique_ptr<VectorRecord>>>& local_return_pool);

// Helper that performs the join for the candidates (assumes the opposite window's lock is already held)
void executeJoinForCandidatesWithLockHeld(
const std::vector<std::unique_ptr<VectorRecord>>& candidates,
const std::unique_ptr<VectorRecord>& data_ptr,
int slot,
const std::deque<std::unique_ptr<VectorRecord>>& opposite_window,
std::vector<std::pair<int, std::unique_ptr<VectorRecord>>>& local_return_pool);

// Helper that performs the join in lazy mode
void executeLazyJoin(
const std::vector<std::unique_ptr<VectorRecord>>& candidates,
int slot,
std::vector<std::pair<int, std::unique_ptr<VectorRecord>>>& local_return_pool);

// Helper that performs the join in lazy mode (assumes the locks on both windows are already held)
void executeLazyJoinWithLocksHeld(
const std::vector<std::unique_ptr<VectorRecord>>& candidates,
int slot,
std::vector<std::pair<int, std::unique_ptr<VectorRecord>>>& local_return_pool);

std::unique_ptr<JoinFunction> join_func_;
std::shared_ptr<Operator> mother_;
std::unique_ptr<BaseMethod> join_method_;
Expand Down
44 changes: 44 additions & 0 deletions src/concurrency/concurrency_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,50 @@ auto sageFlow::ConcurrencyManager::create_index(const std::string& name, const I
return index->index_id_;
}

// Creates an index of `index_type`, configured from `params` when they match
// that type, and records it under `name`.
//
//  - IndexType::None: nothing is created; returns -1.
//  - IndexType::IVF / IndexType::HNSW: built from IVFParameters / HNSWParameters
//    when `params` holds that alternative, otherwise default-constructed.
//  - IndexType::Vectraflow: default-constructed VectraFlow.
//  - Anything else (including IndexType::BruteForce): Knn.
//
// Returns the id assigned to the new index.
auto sageFlow::ConcurrencyManager::create_index(const std::string& name, const IndexType& index_type, int dimension,
                                                const IndexParameters& params) -> int {
  if (index_type == IndexType::None) {
    return -1;
  }

  std::shared_ptr<Index> created;
  if (index_type == IndexType::IVF) {
    const auto* ivf = std::get_if<IVFParameters>(&params);
    if (ivf != nullptr) {
      created = std::make_shared<Ivf>(ivf->nlist, ivf->rebuild_threshold, ivf->nprobes);
    } else {
      // `params` holds a different alternative: use Ivf's own defaults.
      created = std::make_shared<Ivf>();
    }
  } else if (index_type == IndexType::HNSW) {
    const auto* hnsw = std::get_if<HNSWParameters>(&params);
    if (hnsw != nullptr) {
      created = std::make_shared<HNSW>(hnsw->m, hnsw->ef_construction, hnsw->ef_search);
    } else {
      // `params` holds a different alternative: use HNSW's own defaults.
      created = std::make_shared<HNSW>();
    }
  } else if (index_type == IndexType::Vectraflow) {
    created = std::make_shared<VectraFlow>();
  } else {
    // BruteForce and any remaining type use the Knn index.
    created = std::make_shared<Knn>();
  }

  // Assign identity and attach the storage manager.
  created->index_id_ = index_id_counter_++;
  created->index_type_ = index_type;
  created->dimension_ = dimension;
  created->storage_manager_ = storage_;

  // The engine of storage_ is replaced with a fresh ComputeEngine on every call.
  storage_->engine_ = std::make_shared<ComputeEngine>();

  // Register a controller for the index and record the name -> (id, type) mapping.
  const auto controller = std::make_shared<BlankController>(created);
  controller_map_[created->index_id_] = controller;
  index_map_[name] = IdWithType{.id_ = created->index_id_, .index_type_ = index_type};
  return created->index_id_;
}

auto sageFlow::ConcurrencyManager::create_index(const std::string& name, int dimension) -> int {
return create_index(name, IndexType::BruteForce, dimension);
}
Expand Down
8 changes: 8 additions & 0 deletions src/function/join_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,12 @@ auto sageFlow::JoinFunction::setWindow(int64_t windowsize, int64_t stepsize) ->
windowR.setWindow(windowsize, stepsize);
threadSafeWindowL.setWindow(windowsize, stepsize);
threadSafeWindowR.setWindow(windowsize, stepsize);
}

// Reports the window size as held by the left thread-safe window.
auto sageFlow::JoinFunction::getWindowSize() const -> int64_t {
  const auto& reference_window = threadSafeWindowL;
  return reference_window.getWindowSize();
}

// Reports the step size as held by the left thread-safe window.
auto sageFlow::JoinFunction::getStepSize() const -> int64_t {
  const auto& reference_window = threadSafeWindowL;
  return reference_window.getStepSize();
}
37 changes: 21 additions & 16 deletions src/index/ivf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ void Ivf::debugDumpStateUnlocked() {
size_t total_in_lists = 0;
for (auto &kv : inverted_lists_) total_in_lists += kv.second.size();
SAGEFLOW_LOG_WARN("INDEX", "DEBUG_DUMP size_={} total_in_lists={} deleted_uids={} nlists={} attempts={} success={} missing={} miss_in_storage={} miss_not_in_storage={} underflow={} ",
size_.load(), total_in_lists, deleted_uids_.size(), inverted_lists_.size(),
erase_attempts_.load(), erase_success_.load(), erase_missing_.load(),
erase_missing_in_storage_.load(), erase_missing_not_in_storage_.load(), erase_underflow_.load());
size_.load(), total_in_lists, deleted_uids_.size(), inverted_lists_.size(),
erase_attempts_.load(), erase_success_.load(), erase_missing_.load(),
erase_missing_in_storage_.load(), erase_missing_not_in_storage_.load(), erase_underflow_.load());
// Log a sample of the contents of the first few non-empty lists
int printed = 0;
for (auto &kv : inverted_lists_) {
if (kv.second.empty()) continue;
std::string sample;
size_t limit = std::min<size_t>(kv.second.size(), 5);
for (size_t i = 0; i < limit; ++i) { sample += std::to_string(kv.second[i]); sample.push_back(','); }
SAGEFLOW_LOG_DEBUG("INDEX", "list_id={} size={} sample=[{}]", kv.first, kv.second.size(), sample);
SAGEFLOW_LOG_DEBUG("INDEX", "list_id={} size={} sample=[{}]", kv.first, kv.second.size(), sample);
if (++printed >= 5) break;
}
}
Expand Down Expand Up @@ -491,18 +491,23 @@ auto Ivf::query_for_join(const VectorRecord &record, double join_similarity_thre
while (!probe_indices.empty()) {
auto cluster_idx = probe_indices.top().second;
probe_indices.pop();
std::shared_lock<std::shared_mutex> list_lock(list_mutexes_[cluster_idx]);
if (inverted_lists_.find(cluster_idx) != inverted_lists_.end()) {
const auto& candidate_ids = inverted_lists_.at(cluster_idx);
for (const auto& id_val : candidate_ids) {
if (local_deleted_uids.find(id_val) != local_deleted_uids.end()) {
continue;
}
if (auto candidate = storage_manager_->getVectorByUid(id_val)) {
double similarity = storage_manager_->engine_->Similarity(record.data_, candidate->data_);
if (similarity - join_similarity_threshold > epsilon) {
results.emplace_back(id_val);
}
// Copy the candidate list under lock to avoid race condition
std::vector<uint64_t> candidate_ids_copy;
{
std::shared_lock<std::shared_mutex> list_lock(list_mutexes_[cluster_idx]);
if (inverted_lists_.find(cluster_idx) != inverted_lists_.end()) {
candidate_ids_copy = inverted_lists_.at(cluster_idx);
}
}
// Process candidates without holding the lock
for (const auto& id_val : candidate_ids_copy) {
if (local_deleted_uids.find(id_val) != local_deleted_uids.end()) {
continue;
}
if (auto candidate = storage_manager_->getVectorByUid(id_val)) {
double similarity = storage_manager_->engine_->Similarity(record.data_, candidate->data_);
if (similarity - join_similarity_threshold > epsilon) {
results.emplace_back(id_val);
}
}
}
Expand Down
Loading