Skip to content

Commit

Permalink
making progress
Browse files Browse the repository at this point in the history
  • Loading branch information
blaise-muhirwa committed Dec 5, 2023
1 parent a65a665 commit ef7d6aa
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 161 deletions.
242 changes: 149 additions & 93 deletions flatnav/Index.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,33 @@

namespace flatnav {

/**
* The following struct is a configuration of parameters needed for the index.
* It also exposes a builder pattern for constructing the index.
*/

struct IndexParameterConfig {

IndexParameterConfig() = default;
size_t _M;
// size of one data point (does not support variable-size data, strings)
size_t _data_size_bytes;
// Node consists of: ([data] [M links] [data label]). This layout was chosen
// after benchmarking - it's slightly more cache-efficient than others.
size_t _node_size_bytes;
size_t _max_node_count; // Determines size of internal pre-allocated memory
size_t _cur_num_nodes;
std::mutex _cur_num_nodes_global_lock;
std::condition_variable _cur_num_nodes_global_cv;
std::atomic<bool> _current_node_inserted = false;

// Remembers which nodes we've visited, to avoid re-computing distances.
// Might be a caching problem in beamSearch - needs to be profiled.
ShardedExplicitSet *_sharded_visited_nodes;

uint32_t _num_threads;
};

// dist_t: A distance function implementing DistanceInterface.
// label_t: A fixed-width data type for the label (meta-data) of each point.
template <typename dist_t, typename label_t> class Index {
Expand Down Expand Up @@ -69,7 +96,8 @@ template <typename dist_t, typename label_t> class Index {

template <typename Archive> void serialize(Archive &archive) {
archive(_M, _data_size_bytes, _node_size_bytes, _max_node_count,
_cur_num_nodes, *_distance, _visited_nodes);
_cur_num_nodes, *_distance, _visited_nodes,
*_sharded_visited_nodes);

// Serialize the allocated memory for the index & query.
archive(
Expand All @@ -90,15 +118,14 @@ template <typename dist_t, typename label_t> class Index {
int max_edges_per_node)
: _M(max_edges_per_node), _max_node_count(dataset_size),
_cur_num_nodes(0), _distance(dist), _visited_nodes(dataset_size + 1),
_num_threads(0),
_num_threads(std::thread::hardware_concurrency()),
_sharded_visited_nodes(new ShardedVisitedSet(
/* total_size = */ dataset_size + 1,
/* num_shards = */ std::thread::hardware_concurrency())) {

_data_size_bytes = _distance->dataSize();
_node_size_bytes =
_data_size_bytes + (sizeof(node_id_t) * _M) + sizeof(label_t);

size_t index_memory_size = _node_size_bytes * _max_node_count;

_index_memory = new char[index_memory_size];
Expand All @@ -108,8 +135,8 @@ template <typename dist_t, typename label_t> class Index {
// this class.
Index() = default;

~Index() {
delete[] _index_memory;
~Index() {
delete[] _index_memory;
delete _sharded_visited_nodes;
}

Expand Down Expand Up @@ -137,9 +164,9 @@ template <typename dist_t, typename label_t> class Index {

// search graph for neighbors of new node, connect to them
if (new_node_id > 0) {
PriorityQueue neighbors =
beamSearch(/* query = */ data, /* entry_node = */ entry_node,
/* buffer_size = */ ef_construction);
PriorityQueue neighbors = concurrentBeamSearch(
/* query = */ data, /* entry_node = */ entry_node,
/* buffer_size = */ ef_construction);
selectNeighbors(/* neighbors = */ neighbors);
connectNeighbors(neighbors, new_node_id);
}
Expand All @@ -162,78 +189,95 @@ template <typename dist_t, typename label_t> class Index {
std::vector<std::thread> thread_pool(thread_count);
uint32_t batch_size = labels.size() / thread_count;

std::cout << "Starting parallel add"
<< "\n"
<< std::flush;

for (uint32_t thread_id = 0; thread_id < thread_count; thread_id++) {
void *current_batch =
(float *)data + (thread_id * batch_size * _data_size_bytes);
thread_pool[thread_id] =
std::thread(&addParallelBatch, current_batch, batch_size,
std::ref(labels), ef_construction, num_initializations);
uint32_t label_start = thread_id * batch_size;
thread_pool[thread_id] = std::thread(
&Index::addParallelBatch, this, current_batch, batch_size,
label_start, std::ref(labels), ef_construction, num_initializations);
}
// Do the actual work

for (uint32_t thread_id = 0; thread_id < thread_count; thread_id++) {
thread_pool[thread_id].join();
}
}

void addParallelBatch(void *batch, uint32_t batch_size,
void addParallelBatch(void *batch, uint32_t batch_size, uint32_t label_start,
const std::vector<label_t> &labels, int ef_construction,
int num_initializations = 100) {

if (num_initializations <= 0) {
throw std::invalid_argument(
"num_initializations must be greater than 0.");
}

for (uint32_t vec_index = 0; vec_index < batch_size; vec_index++) {
void *vector = (float *)batch + (vec_index * _data_size_bytes);
label_t label = labels[vec_index];

// Lock from now on until we've inserted the new node into the index.
// This prevents multiple threads from trying to insert the same node.
// Use the condition variable to stop other threads from busy-waiting,
// which would be a waste of CPU cycles.
uint32_t vec_dimension = _distance->dimension();
{
std::unique_lock<std::mutex> lock(_cur_num_nodes_global_lock);
_cur_num_nodes_global_cv.wait(lock,
[this] { return !_current_node_inserted; });

if (_cur_num_nodes >= _max_node_count) {
throw std::runtime_error("Maximum number of nodes reached. Consider "
"increasing the `max_node_count` parameter to "
"create a larger index.");
}
for (uint32_t vec_index = label_start; vec_index < batch_size;
vec_index++) {
void *vector = (float *)batch + (vec_index * vec_dimension);
label_t label = labels[vec_index];

// Lock from now on until we've inserted the new node into the index.
// This prevents multiple threads from trying to insert the same node.
// Use the condition variable to stop other threads from busy-waiting,
// which would be a waste of CPU cycles.

// _cur_num_nodes_global_cv.wait(lock,
// [this] { return
// !_current_node_inserted; });

if (_cur_num_nodes >= _max_node_count) {
throw std::runtime_error(
"Maximum number of nodes reached. Consider "
"increasing the `max_node_count` parameter to "
"create a larger index.");
}

uint32_t step_size = _cur_num_nodes / num_initializations;
if (step_size <= 0) {
step_size = 1;
}
float min_dist = std::numeric_limits<float>::max();
node_id_t entry_node = 0;
for (node_id_t node = 0; node < _cur_num_nodes; node += step_size) {
float dist =
_distance->distance(/* x = */ vector, /* y = */ getNodeData(node),
/* asymmetric = */ true);
if (dist < min_dist) {
min_dist = dist;
entry_node = node;
uint32_t step_size = _cur_num_nodes / num_initializations;
if (step_size <= 0) {
step_size = 1;
}
}
node_id_t new_node_id;
allocateNode(vector, label, new_node_id);

// Mark the node as inserted, notify other threads, and reset the flag.
_current_node_inserted = true;
_cur_num_nodes_global_cv.notify_all();
_current_node_inserted = false;

lock.unlock();

// search graph for neighbors of new no de, connect to them
if (new_node_id > 0) {
PriorityQueue neighbors =
beamSearch(/* query = */ vector, /* entry_node = */ entry_node,
/* buffer_size = */ ef_construction);
selectNeighbors(/* neighbors = */ neighbors);
connectNeighbors(neighbors, new_node_id);
float min_dist = std::numeric_limits<float>::max();
node_id_t entry_node = 0;
for (node_id_t node = 0; node < _cur_num_nodes; node += step_size) {
float dist = _distance->distance(/* x = */ vector, /* y = */
getNodeData(node),
/* asymmetric = */ true);
if (dist < min_dist) {
min_dist = dist;
entry_node = node;
}
}
node_id_t new_node_id;
allocateNode(vector, label, new_node_id);

// Mark the node as inserted, notify other threads, and reset the flag.
// _current_node_inserted = true;
// _cur_num_nodes_global_cv.notify_all();
// _current_node_inserted = false;

// lock.unlock();

// search graph for neighbors of new no de, connect to them
if (new_node_id > 0) {
PriorityQueue neighbors = concurrentBeamSearch(
/* query = */ vector, /* entry_node = */ entry_node,
/* buffer_size = */ ef_construction);
selectNeighbors(/* neighbors = */ neighbors);
connectNeighbors(neighbors, new_node_id);
}
// _current_node_inserted = true;
// _cur_num_nodes_global_cv.notify_all();
// _current_node_inserted = false;

// lock.unlock();
}
}
}
Expand All @@ -246,24 +290,21 @@ template <typename dist_t, typename label_t> class Index {
* @param num_initializations The number of random initializations to use.
*/
std::vector<dist_label_t> search(const void *query, const int K,
int ef_search,
int num_initializations = 100) {

int ef_search,
int num_initializations = 100) {
node_id_t entry_node = initializeSearch(query, num_initializations);
PriorityQueue neighbors = beamSearch(/* query = */ query,
/* entry_node = */ entry_node,
/* buffer_size = */ ef_search);
std::vector<dist_label_t> results;

PriorityQueue neighbors =
concurrentBeamSearch(/* query = */ query,
/* entry_node = */ entry_node,
/* buffer_size = */ ef_search);
while (neighbors.size() > K) {
neighbors.pop();
}

auto size = neighbors.size();
results.reserve(size);
std::vector<dist_label_t> results;
results.reserve(K);
while (neighbors.size() > 0) {
results.push_back(std::make_pair(neighbors.top().first,
*getNodeLabel(neighbors.top().second)));
results.emplace_back(neighbors.top().first,
*getNodeLabel(neighbors.top().second));
neighbors.pop();
}
std::sort(results.begin(), results.end(),
Expand Down Expand Up @@ -318,11 +359,14 @@ template <typename dist_t, typename label_t> class Index {
std::shared_ptr<DistanceInterface<dist_t>> dist =
std::make_shared<dist_t>();

ShardedVisitedSet *sharded_visited_nodes = new ShardedVisitedSet();

// 1. Deserialize metadata
archive(index->_M, index->_data_size_bytes, index->_node_size_bytes,
index->_max_node_count, index->_cur_num_nodes, *dist,
index->_visited_nodes);
index->_visited_nodes, *sharded_visited_nodes);
index->_distance = dist;
index->_sharded_visited_nodes = sharded_visited_nodes;

// 3. Allocate memory using deserialized metadata
index->_index_memory =
Expand Down Expand Up @@ -374,6 +418,9 @@ template <typename dist_t, typename label_t> class Index {
private:
char *getNodeData(const node_id_t &n) const {
char *location = _index_memory + (n * _node_size_bytes);
if (location == nullptr) {
throw std::runtime_error("getNodeData: pointer to node data is null.");
}
return location;
}

Expand Down Expand Up @@ -402,8 +449,9 @@ template <typename dist_t, typename label_t> class Index {
}
new_node_id = _cur_num_nodes;

_distance->transformData(/* destination = */ getNodeData(new_node_id),
/* src = */ data);
_distance->transformData(
/* destination = */ (void *)getNodeData(new_node_id),
/* src = */ data);
*(getNodeLabel(_cur_num_nodes)) = label;

node_id_t *links = getNodeLinks(_cur_num_nodes);
Expand Down Expand Up @@ -474,19 +522,21 @@ template <typename dist_t, typename label_t> class Index {
candidates.pop();
node_id_t *d_node_links = getNodeLinks(d_node.second);
for (int i = 0; i < _M; i++) {
if (!_visited_nodes[d_node_links[i]]) {
node_id_t neighbor_node_id = d_node_links[i];
bool neighbor_is_visited = _visited_nodes[neighbor_node_id];
if (!neighbor_is_visited) {
// If we haven't visited the node yet.
_visited_nodes.insert(d_node_links[i]);
_visited_nodes.insert(neighbor_node_id);

dist = _distance->distance(/* x = */ query,
/* y = */ getNodeData(d_node_links[i]),
/* y = */ getNodeData(neighbor_node_id),
/* asymmetric = */ true);

// Include the node in the buffer if buffer isn't full or
// if the node is closer than a node already in the buffer.
if (neighbors.size() < buffer_size || dist < max_dist) {
candidates.emplace(-dist, d_node_links[i]);
neighbors.emplace(dist, d_node_links[i]);
candidates.emplace(-dist, neighbor_node_id);
neighbors.emplace(dist, neighbor_node_id);
if (neighbors.size() > buffer_size) {
neighbors.pop();
}
Expand All @@ -500,13 +550,17 @@ template <typename dist_t, typename label_t> class Index {
return neighbors;
}

void concurrentBeamSearch(const void *query, const node_id_t entry_node,
const int buffer_size) {
PriorityQueue concurrentBeamSearch(const void *query,
const node_id_t entry_node,
const int buffer_size) {
PriorityQueue neighbors;
PriorityQueue candidates;

// Maybe this is supposed to be clearAll()?
_sharded_visited_nodes->clear(entry_node);
_sharded_visited_nodes->clearAll();

if (!_sharded_visited_nodes->allShardsHaveSameMark()) {
throw std::runtime_error("All shards must have the same mark.");
}

float dist =
_distance->distance(/* x = */ query, /* y = */ getNodeData(entry_node),
Expand All @@ -517,24 +571,27 @@ template <typename dist_t, typename label_t> class Index {
neighbors.emplace(dist, entry_node);
_sharded_visited_nodes->insert(entry_node);

while(!candidates.empty()) {
dist_node_it d_node = candidates.top();
while (!candidates.empty()) {
dist_node_t d_node = candidates.top();
if ((-d_node.first) > max_dist) {
break;
}
candidates.pop();
node_id_t *d_node_links = getNodeLinks(d_node.second);
for (int i = 0; i < _M; i++) {
if (!_sharded_visited_nodes[d_node_links[i]]) {
_sharded_visited_nodes->insert(d_node_links[i]);
node_id_t neighbor_node_id = d_node_links[i];
bool neighbor_is_visited = _sharded_visited_nodes->operator[](
/* node_id = */ neighbor_node_id);
if (!neighbor_is_visited) {
_sharded_visited_nodes->insert(/* node_id = */ neighbor_node_id);

dist = _distance->distance(/* x = */ query,
/* y = */ getNodeData(d_node_links[i]),
/* y = */ getNodeData(neighbor_node_id),
/* asymmetric = */ true);

if (neighbors.size() < buffer_size || dist < max_dist) {
candidates.emplace(-dist, d_node_links[i]);
neighbors.emplace(dist, d_node_links[i]);
candidates.emplace(-dist, neighbor_node_id);
neighbors.emplace(dist, neighbor_node_id);
if (neighbors.size() > buffer_size) {
neighbors.pop();
}
Expand Down Expand Up @@ -693,7 +750,6 @@ template <typename dist_t, typename label_t> class Index {
node_id_t entry_node = 0;

for (node_id_t node = 0; node < _cur_num_nodes; node += step_size) {

float dist =
_distance->distance(/* x = */ query, /* y = */ getNodeData(node),
/* asymmetric = */ true);
Expand Down
Loading

0 comments on commit ef7d6aa

Please sign in to comment.