Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/concurrency/concurrency_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class ConcurrencyManager {
~ConcurrencyManager();

auto create_index(const std::string &name, const IndexType &index_type, int dimension) -> int;
auto create_index(const std::string &name, const IndexType &index_type, int dimension,
const IndexParameters& params) -> int;
auto create_index(const std::string &name, int dimension) -> int;

auto drop_index(const std::string &name) -> bool;
Expand Down
13 changes: 13 additions & 0 deletions include/function/join_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ namespace sageFlow {
lastEmitted = -1;
}

auto getWindowSize() const -> int64_t {
std::lock_guard<std::mutex> lock(mutex_);
return windowSize;
}

auto getStepSize() const -> int64_t {
std::lock_guard<std::mutex> lock(mutex_);
return stepSize;
}

auto windowTimeLimit(int64_t timestamp) const -> int64_t {
std::lock_guard<std::mutex> lock(mutex_);
return timestamp - windowSize;
Expand Down Expand Up @@ -118,6 +128,9 @@ namespace sageFlow {
auto setOtherStream(std::shared_ptr<Stream> other_plan) -> void;

auto setWindow(int64_t time_window, int64_t stepsize) -> void;

auto getWindowSize() const -> int64_t;
auto getStepSize() const -> int64_t;

SlidingWindow windowL, windowR;
ThreadSafeSlidingWindow threadSafeWindowL, threadSafeWindowR;
Expand Down
20 changes: 20 additions & 0 deletions include/index/index.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <string>
#include <utility>
#include <vector>
#include <variant>
#include <optional>

#include "compute_engine/compute_engine.h"
#include "storage/storage_manager.h"
Expand All @@ -18,6 +20,24 @@ enum class IndexType { // NOLINT
Vectraflow
};

// Index-specific parameter structures
struct IVFParameters {
int nlist = 1000;
double rebuild_threshold = 1.5;
int nprobes = 10;
};

struct HNSWParameters {
int m = 20;
int ef_construction = 100;
int ef_search = 40;
};

struct NoParameters {};

// Variant to hold any index parameters
using IndexParameters = std::variant<NoParameters, IVFParameters, HNSWParameters>;

class Index {
public:
// data
Expand Down
1 change: 1 addition & 0 deletions include/operator/join_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ namespace sageFlow {

void initializeIVFIndexes(int nlist, double rebuild_threshold, int nprobes); // 保留现有接口(暂未用到额外参数)
bool createIndexPair(IndexType type, const std::string& prefix);
bool createIndexPair(IndexType type, const std::string& prefix, const IndexParameters& params);

// 线程安全的窗口更新方法(容器改为 deque)
auto updateSideThreadSafe(
Expand Down
44 changes: 44 additions & 0 deletions src/concurrency/concurrency_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,50 @@ auto sageFlow::ConcurrencyManager::create_index(const std::string& name, const I
return index->index_id_;
}

auto sageFlow::ConcurrencyManager::create_index(const std::string& name, const IndexType& index_type, int dimension,
const IndexParameters& params) -> int {
std::shared_ptr<Index> index = nullptr;
switch (index_type) {
case IndexType::None:
return -1;
case IndexType::IVF:
if (auto* ivf_params = std::get_if<IVFParameters>(&params)) {
index = std::make_shared<Ivf>(ivf_params->nlist, ivf_params->rebuild_threshold, ivf_params->nprobes);
} else {
// Use default parameters if wrong type provided
index = std::make_shared<Ivf>();
}
break;
case IndexType::HNSW:
if (auto* hnsw_params = std::get_if<HNSWParameters>(&params)) {
index = std::make_shared<HNSW>(hnsw_params->m, hnsw_params->ef_construction, hnsw_params->ef_search);
} else {
// Use default parameters if wrong type provided
index = std::make_shared<HNSW>();
}
break;
case IndexType::Vectraflow:
index = std::make_shared<VectraFlow>();
break;
case IndexType::BruteForce:
default:
index = std::make_shared<Knn>();
break;
}
index->index_id_ = index_id_counter_++;
index->index_type_ = index_type;
index->dimension_ = dimension;

index->storage_manager_ = storage_;
storage_->engine_ = std::make_shared<ComputeEngine>();

const auto blank_controller = std::make_shared<BlankController>(index);

controller_map_[index->index_id_] = blank_controller;
index_map_[name] = IdWithType{.id_ = index->index_id_, .index_type_ = index_type};
return index->index_id_;
}

auto sageFlow::ConcurrencyManager::create_index(const std::string& name, int dimension) -> int {
return create_index(name, IndexType::BruteForce, dimension);
}
Expand Down
8 changes: 8 additions & 0 deletions src/function/join_function.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,12 @@ auto sageFlow::JoinFunction::setWindow(int64_t windowsize, int64_t stepsize) ->
windowR.setWindow(windowsize, stepsize);
threadSafeWindowL.setWindow(windowsize, stepsize);
threadSafeWindowR.setWindow(windowsize, stepsize);
}

auto sageFlow::JoinFunction::getWindowSize() const -> int64_t {
return threadSafeWindowL.getWindowSize();
}

auto sageFlow::JoinFunction::getStepSize() const -> int64_t {
return threadSafeWindowL.getStepSize();
}
8 changes: 4 additions & 4 deletions src/index/ivf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ void Ivf::debugDumpStateUnlocked() {
size_t total_in_lists = 0;
for (auto &kv : inverted_lists_) total_in_lists += kv.second.size();
SAGEFLOW_LOG_WARN("INDEX", "DEBUG_DUMP size_={} total_in_lists={} deleted_uids={} nlists={} attempts={} success={} missing={} miss_in_storage={} miss_not_in_storage={} underflow={} ",
size_.load(), total_in_lists, deleted_uids_.size(), inverted_lists_.size(),
erase_attempts_.load(), erase_success_.load(), erase_missing_.load(),
erase_missing_in_storage_.load(), erase_missing_not_in_storage_.load(), erase_underflow_.load());
size_.load(), total_in_lists, deleted_uids_.size(), inverted_lists_.size(),
erase_attempts_.load(), erase_success_.load(), erase_missing_.load(),
erase_missing_in_storage_.load(), erase_missing_not_in_storage_.load(), erase_underflow_.load());
// 采样输出前几个非空列表的部分内容
int printed = 0;
for (auto &kv : inverted_lists_) {
if (kv.second.empty()) continue;
std::string sample;
size_t limit = std::min<size_t>(kv.second.size(), 5);
for (size_t i = 0; i < limit; ++i) { sample += std::to_string(kv.second[i]); sample.push_back(','); }
SAGEFLOW_LOG_DEBUG("INDEX", "list_id={} size={} sample=[{}]", kv.first, kv.second.size(), sample);
SAGEFLOW_LOG_DEBUG("INDEX", "list_id={} size={} sample=[{}]", kv.first, kv.second.size(), sample);
if (++printed >= 5) break;
}
}
Expand Down
39 changes: 33 additions & 6 deletions src/operator/join_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <cassert>
#include <iostream>
#include <algorithm>
#include <cmath>

#include "utils/logger.h"

Expand All @@ -28,6 +29,13 @@ bool JoinOperator::createIndexPair(IndexType type, const std::string& prefix) {
return left_index_id_ != -1 && right_index_id_ != -1;
}

bool JoinOperator::createIndexPair(IndexType type, const std::string& prefix, const IndexParameters& params) {
if (!concurrency_manager_) return false;
left_index_id_ = concurrency_manager_->create_index(prefix + "_left", type, join_func_->getDim(), params);
right_index_id_ = concurrency_manager_->create_index(prefix + "_right", type, join_func_->getDim(), params);
return left_index_id_ != -1 && right_index_id_ != -1;
}

static inline std::string to_lower_copy(std::string v) {
std::transform(v.begin(), v.end(), v.begin(), [](unsigned char c){return char(std::tolower(c));});
return v;
Expand Down Expand Up @@ -66,31 +74,50 @@ JoinOperator::JoinOperator(std::unique_ptr<Function> &join_func,

if (algo == "ivf") {
index_kind_ = InternalIndexKind::IVF;
if (createIndexPair(IndexType::IVF, "join_ivf")) {
// Calculate IVF parameters based on window size
// nlist = 4 * sqrt(window_size/step_size), rebuild_threshold = 1.5, nprobes = 10
Copy link

Copilot AI Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment states the formula uses window_size/step_size, but the actual implementation on line 82 uses vector_count which is calculated differently. The comment should accurately reflect that vector_count represents the actual number of vectors in the window, not just the raw division result.

Suggested change
// nlist = 4 * sqrt(window_size/step_size), rebuild_threshold = 1.5, nprobes = 10
// nlist = 4 * sqrt(vector_count), where vector_count = (window_size / step_size) if step_size > 0, else window_size

Copilot uses AI. Check for mistakes.
int64_t window_size = join_func_->getWindowSize();
int64_t step_size = join_func_->getStepSize();
// Calculate actual vector count in window
int64_t vector_count = (step_size > 0) ? (window_size / step_size) : window_size;
Copy link

Copilot AI Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fallback calculation vector_count = window_size when step_size <= 0 is incorrect. When step_size is 0 or negative, the window configuration is invalid, and this should either return an error or use a more appropriate default. Using window_size directly as vector count doesn't align with the formula's intent of calculating the number of vectors based on the step interval.

Copilot uses AI. Check for mistakes.
int nlist = static_cast<int>(4.0 * std::sqrt(static_cast<double>(vector_count)));
// Ensure nlist is at least 1
if (nlist < 1) nlist = 1;

IVFParameters ivf_params{
.nlist = nlist,
.rebuild_threshold = 1.5,
.nprobes = 10
};

if (createIndexPair(IndexType::IVF, "join_ivf", ivf_params)) {
use_index_ = true;
join_method_ = std::make_unique<IvfJoinMethod>(left_index_id_, right_index_id_,
join_similarity_threshold_, concurrency_manager_);
join_similarity_threshold_, concurrency_manager_);
} else {
index_kind_ = InternalIndexKind::NONE;
use_index_ = false;
join_method_ = std::make_unique<BruteForceJoinMethod>(-1, -1, join_similarity_threshold_, concurrency_manager_);
join_method_ = std::make_unique<BruteForceJoinMethod>(
-1, -1, join_similarity_threshold_, concurrency_manager_);
}
} else if (algo == "bruteforce" || algo == "bf" ) {
index_kind_ = InternalIndexKind::BRUTEFORCE;
if (createIndexPair(IndexType::BruteForce, "join_bf")) {
use_index_ = true;
join_method_ = std::make_unique<BruteForceJoinMethod>(left_index_id_, right_index_id_,
join_similarity_threshold_, concurrency_manager_);
join_similarity_threshold_, concurrency_manager_);
} else {
index_kind_ = InternalIndexKind::NONE;
use_index_ = false;
join_method_ = std::make_unique<BruteForceJoinMethod>(-1, -1, join_similarity_threshold_, concurrency_manager_);
join_method_ = std::make_unique<BruteForceJoinMethod>(
-1, -1, join_similarity_threshold_, concurrency_manager_);
}
} else {
index_kind_ = InternalIndexKind::NONE;
use_index_ = false;
is_eager_ = false;
join_method_ = std::make_unique<BruteForceJoinMethod>(-1, -1, join_similarity_threshold_, concurrency_manager_);
join_method_ = std::make_unique<BruteForceJoinMethod>(
-1, -1, join_similarity_threshold_, concurrency_manager_);
}
}

Expand Down
12 changes: 6 additions & 6 deletions src/utils/monitoring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ void PerformanceMonitor::StartProfiling() {
if (!profiling_) {
ProfilerStart(profile_output_file_.c_str());
profiling_ = true;
SAGEFLOW_LOG_INFO("MONITOR", "profiling_started file={} ", profile_output_file_);
SAGEFLOW_LOG_INFO("MONITOR", "profiling_started file={} ", profile_output_file_);
} else {
SAGEFLOW_LOG_WARN("MONITOR", "profiling_already_running file={} ", profile_output_file_);
SAGEFLOW_LOG_WARN("MONITOR", "profiling_already_running file={} ", profile_output_file_);
}
#else
std::cerr << "Profiling not available: gperftools not found." << '\n';
SAGEFLOW_LOG_ERROR("MONITOR", "Profiling not available: gperftools not found.");
#endif
}

Expand All @@ -36,12 +36,12 @@ void PerformanceMonitor::StopProfiling() {
if (profiling_) {
ProfilerStop();
profiling_ = false;
SAGEFLOW_LOG_INFO("MONITOR", "profiling_stopped file={} ", profile_output_file_);
SAGEFLOW_LOG_INFO("MONITOR", "profiling_stopped file={} ", profile_output_file_);
} else {
SAGEFLOW_LOG_WARN("MONITOR", "profiling_not_running file={} ", profile_output_file_);
SAGEFLOW_LOG_WARN("MONITOR", "profiling_not_running file={} ", profile_output_file_);
}
#else
std::cerr << "Profiling not available: gperftools not found." << '\n';
SAGEFLOW_LOG_ERROR("MONITOR", "Profiling not available: gperftools not found.");
#endif
}

Expand Down