diff --git a/CMakeLists.txt b/CMakeLists.txt index e281a85..c1b9d87 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -211,7 +211,7 @@ endif() set(HEADERS ${PROJECT_SOURCE_DIR}/flatnav/distances/InnerProductDistance.h ${PROJECT_SOURCE_DIR}/flatnav/distances/SquaredL2Distance.h - ${PROJECT_SOURCE_DIR}/flatnav/util/ExplicitSet.h + ${PROJECT_SOURCE_DIR}/flatnav/util/VisitedNodesHandler.h ${PROJECT_SOURCE_DIR}/flatnav/util/GorderPriorityQueue.h ${PROJECT_SOURCE_DIR}/flatnav/util/Reordering.h ${PROJECT_SOURCE_DIR}/flatnav/util/SIMDDistanceSpecializations.h diff --git a/flatnav/Index.h b/flatnav/Index.h index cde510a..4e71904 100644 --- a/flatnav/Index.h +++ b/flatnav/Index.h @@ -10,10 +10,10 @@ #include #include #include -#include #include #include #include +#include #include #include #include @@ -43,12 +43,10 @@ struct IndexParameterConfig { size_t _max_node_count; // Determines size of internal pre-allocated memory size_t _cur_num_nodes; std::mutex _cur_num_nodes_global_lock; - std::condition_variable _cur_num_nodes_global_cv; std::atomic _current_node_inserted = false; // Remembers which nodes we've visited, to avoid re-computing distances. - // Might be a caching problem in beamSearch - needs to be profiled. - ShardedExplicitSet *_sharded_visited_nodes; + // ThreadSafeVisitedNodesHandler *_visited_nodes; uint32_t _num_threads; }; @@ -61,9 +59,6 @@ template class Index { typedef uint32_t node_id_t; typedef std::pair dist_node_t; - typedef ExplicitSet VisitedSet; - typedef ShardedExplicitSet ShardedVisitedSet; - // NOTE: by default this is a max-heap. We could make this a min-heap // by using std::greater, but we want to use the queue as both a max-heap and // min-heap depending on the context. @@ -80,24 +75,21 @@ template class Index { // after benchmarking - it's slightly more cache-efficient than others. size_t _node_size_bytes; size_t _max_node_count; // Determines size of internal pre-allocated memory - std::atomic _cur_num_nodes; + size_t _cur_num_nodes; std::shared_ptr> _distance; - std::mutex _cur_num_nodes_global_lock; - std::condition_variable _cur_num_nodes_global_cv; - std::atomic _current_node_inserted = false; - - // Remembers which nodes we've visited, to avoid re-computing distances. - // Might be a caching problem in beamSearch - needs to be profiled. - VisitedSet _visited_nodes; - ShardedVisitedSet *_sharded_visited_nodes; + std::mutex _index_data_guard; uint32_t _num_threads; + // Remembers which nodes we've visited, to avoid re-computing distances. + ThreadSafeVisitedNodesHandler *_visited_nodes_handlers; + std::vector _node_links_mutexes; + friend class cereal::access; template void serialize(Archive &archive) { - archive(_M, _data_size_bytes, _node_size_bytes, _max_node_count, *_distance, - _visited_nodes, *_sharded_visited_nodes); + archive(_M, _data_size_bytes, _node_size_bytes, _max_node_count, + _cur_num_nodes, *_distance); // Serialize the allocated memory for the index & query. archive( @@ -117,11 +109,12 @@ template class Index { Index(std::shared_ptr> dist, int dataset_size, int max_edges_per_node) : _M(max_edges_per_node), _max_node_count(dataset_size), - _cur_num_nodes(0), _distance(dist), _visited_nodes(dataset_size + 1), + _cur_num_nodes(0), _distance(dist), _num_threads(std::thread::hardware_concurrency()), - _sharded_visited_nodes(new ShardedVisitedSet( - /* total_size = */ dataset_size + 1, - /* num_shards = */ std::thread::hardware_concurrency())) { + _visited_nodes_handlers(new ThreadSafeVisitedNodesHandler( + /* initial_pool_size = */ 1, + /* num_elements = */ dataset_size)), + _node_links_mutexes(dataset_size) { _data_size_bytes = _distance->dataSize(); _node_size_bytes = @@ -137,7 +130,7 @@ template class Index { ~Index() { delete[] _index_memory; - delete _sharded_visited_nodes; + delete _visited_nodes_handlers; } /** @@ -154,153 +147,59 @@ template class Index { * @param num_initializations The number of random initializations to use. * This determines the number of nodes to search before adding the new node. */ - void add(void *data, label_t &label, int ef_construction, - int num_initializations = 100) { - node_id_t new_node_id; - node_id_t entry_node = initializeSearch(data, num_initializations); - - // make space for the new node - allocateNode(data, label, new_node_id); - - // search graph for neighbors of new node, connect to them - if (new_node_id > 0) { - PriorityQueue neighbors = concurrentBeamSearch( - /* query = */ data, /* entry_node = */ entry_node, - /* buffer_size = */ ef_construction); - selectNeighbors(/* neighbors = */ neighbors); - connectNeighbors(neighbors, new_node_id); - } - } - - void setNumThreads(uint32_t num_threads) { - if (!num_threads) { - _num_threads = std::thread::hardware_concurrency(); - return; - } - _num_threads = num_threads; - } - void addParallel(void *data, std::vector &labels, int ef_construction, int num_initializations = 100) { if (num_initializations <= 0) { throw std::invalid_argument( "num_initializations must be greater than 0."); } - uint32_t thread_count = - _num_threads <= 0 ? std::thread::hardware_concurrency() : _num_threads; uint32_t total_num_nodes = labels.size(); uint32_t data_dimension = _distance->dimension(); + if (_num_threads == 1) { + for (uint32_t row_id = 0; row_id < total_num_nodes; row_id++) { + void *vector = (float *)data + (row_id * data_dimension); + label_t label = labels[row_id]; + concurrentAdd(vector, label, ef_construction, num_initializations); + } + return; + } + parallelFor(/* start = */ 0, /* end = */ total_num_nodes, - /* num_threads = */ thread_count, /* fn = */ - [&](uint32_t row, uint32_t thread_id) { - void *vector = (float *)data + (row * data_dimension); - label_t label = labels[row]; + /* num_threads = */ _num_threads, /* fn = */ + [&](uint32_t row_index) { + void *vector = (float *)data + (row_index * data_dimension); + label_t label = labels[row_index]; concurrentAdd(vector, label, ef_construction, num_initializations); }); } void concurrentAdd(void *data, label_t &label, int ef_construction, - int num_initializations = 100) { - // Lock the global counter to prevent multiple threads from - // trying to insert the same node. - // std::unique_lock lock(_cur_num_nodes_global_lock); + int num_initializations) { if (_cur_num_nodes >= _max_node_count) { throw std::runtime_error("Maximum number of nodes reached. Consider " "increasing the `max_node_count` parameter to " "create a larger index."); } + _index_data_guard.lock(); auto entry_node = initializeSearch(data, num_initializations); node_id_t new_node_id; allocateNode(data, label, new_node_id); + _index_data_guard.unlock(); - // if (new_node_id == 0) { - // return; - // } - - // PriorityQueue neighbors = beamSearch( - // /* query = */ data, /* entry_node = */ entry_node, - // /* buffer_size = */ ef_construction); - // selectNeighbors(/* neighbors = */ neighbors); - // connectNeighbors(neighbors, new_node_id); - } - - void addParallelBatch(void *batch, uint32_t batch_size, uint32_t label_start, - const std::vector &labels, int ef_construction, - int num_initializations = 100) { - - if (num_initializations <= 0) { - throw std::invalid_argument( - "num_initializations must be greater than 0."); + if (new_node_id == 0) { + return; } - uint32_t vec_dimension = _distance->dimension(); - { - std::unique_lock lock(_cur_num_nodes_global_lock); - - for (uint32_t vec_index = label_start; vec_index < batch_size; - vec_index++) { - void *vector = (float *)batch + (vec_index * vec_dimension); - label_t label = labels[vec_index]; - - // Lock from now on until we've inserted the new node into the index. - // This prevents multiple threads from trying to insert the same node. - // Use the condition variable to stop other threads from busy-waiting, - // which would be a waste of CPU cycles. - - // _cur_num_nodes_global_cv.wait(lock, - // [this] { return - // !_current_node_inserted; }); - - if (_cur_num_nodes >= _max_node_count) { - throw std::runtime_error( - "Maximum number of nodes reached. Consider " - "increasing the `max_node_count` parameter to " - "create a larger index."); - } - uint32_t step_size = _cur_num_nodes / num_initializations; - if (step_size <= 0) { - step_size = 1; - } - float min_dist = std::numeric_limits::max(); - node_id_t entry_node = 0; - for (node_id_t node = 0; node < _cur_num_nodes; node += step_size) { - float dist = _distance->distance(/* x = */ vector, /* y = */ - getNodeData(node), - /* asymmetric = */ true); - if (dist < min_dist) { - min_dist = dist; - entry_node = node; - } - } - node_id_t new_node_id; - allocateNode(vector, label, new_node_id); - - // Mark the node as inserted, notify other threads, and reset the flag. - // _current_node_inserted = true; - // _cur_num_nodes_global_cv.notify_all(); - // _current_node_inserted = false; - - // lock.unlock(); - - // search graph for neighbors of new no de, connect to them - if (new_node_id > 0) { - PriorityQueue neighbors = concurrentBeamSearch( - /* query = */ vector, /* entry_node = */ entry_node, - /* buffer_size = */ ef_construction); - selectNeighbors(/* neighbors = */ neighbors); - connectNeighbors(neighbors, new_node_id); - } - // _current_node_inserted = true; - // _cur_num_nodes_global_cv.notify_all(); - // _current_node_inserted = false; + auto neighbors = beamSearch( + /* query = */ data, /* entry_node = */ entry_node, + /* buffer_size = */ ef_construction); - // lock.unlock(); - } - } + selectNeighbors(/* neighbors = */ neighbors); + connectNeighbors(neighbors, new_node_id); } /*** @@ -314,10 +213,9 @@ template class Index { int ef_search, int num_initializations = 100) { node_id_t entry_node = initializeSearch(query, num_initializations); - PriorityQueue neighbors = - concurrentBeamSearch(/* query = */ query, - /* entry_node = */ entry_node, - /* buffer_size = */ ef_search); + PriorityQueue neighbors = beamSearch(/* query = */ query, + /* entry_node = */ entry_node, + /* buffer_size = */ ef_search); while (neighbors.size() > K) { neighbors.pop(); } @@ -380,21 +278,22 @@ template class Index { std::shared_ptr> dist = std::make_shared(); - ShardedVisitedSet *sharded_visited_nodes = new ShardedVisitedSet(); - // 1. Deserialize metadata archive(index->_M, index->_data_size_bytes, index->_node_size_bytes, - index->_max_node_count, *dist, index->_visited_nodes, - *sharded_visited_nodes); + index->_max_node_count, index->_cur_num_nodes, *dist); + index->_visited_nodes_handlers = new ThreadSafeVisitedNodesHandler( + /* initial_pool_size = */ 1, + /* num_elements = */ index->_max_node_count); index->_distance = dist; - index->_sharded_visited_nodes = sharded_visited_nodes; - index->_cur_num_nodes = 0; + index->_num_threads = std::thread::hardware_concurrency(); + index->_node_links_mutexes = + std::vector(index->_max_node_count); - // 3. Allocate memory using deserialized metadata + // 2. Allocate memory using deserialized metadata index->_index_memory = new char[index->_node_size_bytes * index->_max_node_count]; - // 4. Deserialize content into allocated memory + // 3. Deserialize content into allocated memory archive( cereal::binary_data(index->_index_memory, index->_node_size_bytes * index->_max_node_count)); @@ -413,6 +312,16 @@ template class Index { archive(*this); } + inline void setNumThreads(uint32_t num_threads) { + if (!num_threads) { + _num_threads = std::thread::hardware_concurrency(); + return; + } + _num_threads = num_threads; + } + + inline uint32_t getNumThreads() const { return _num_threads; } + inline size_t maxEdgesPerNode() const { return _M; } inline size_t dataSizeBytes() const { return _data_size_bytes; } @@ -432,7 +341,6 @@ template class Index { std::cout << "node_size_bytes: " << _node_size_bytes << std::endl; std::cout << "max_node_count: " << _max_node_count << std::endl; std::cout << "cur_num_nodes: " << _cur_num_nodes << std::endl; - std::cout << "visited_nodes size: " << _visited_nodes.size() << std::endl; _distance->printParams(); } @@ -458,30 +366,26 @@ template class Index { } /** - * @brief Allocates a new node in the index. + * @brief Store the new node in the global data structure. In a + * multi-threaded setting, the index data guard should be held by the caller + * with an exclusive lock. + * * @param data The vector to add. * @param label The label (meta-data) of the vector. * @param new_node_id The id of the new node. */ void allocateNode(void *data, label_t &label, node_id_t &new_node_id) { - { - std::cout << "allocateNode: " << new_node_id << std::endl; - std::unique_lock lock(_cur_num_nodes_global_lock); - new_node_id = _cur_num_nodes.fetch_add(1); - } - _distance->transformData( - /* destination = */ (void *)getNodeData(new_node_id), - /* src = */ data); - std::cout << "Setting label for node " << new_node_id << std::endl; + new_node_id = _cur_num_nodes; + _distance->transformData( + /* destination = */ (void *)getNodeData(new_node_id), + /* src = */ data); *(getNodeLabel(new_node_id)) = label; - + node_id_t *links = getNodeLinks(new_node_id); - std::cout << "Inserting links now" << std::endl; - for (uint32_t i = 0; i < _M; i++) { - links[i] = new_node_id; - } - std::cout << "Finished inserting links" << std::endl; + // Initialize all edges to self + std::fill_n(links, _M, new_node_id); + _cur_num_nodes++; } inline void swapNodes(node_id_t a, node_id_t b, void *temp_data, @@ -501,8 +405,6 @@ template class Index { std::memcpy(getNodeData(a), temp_data, _data_size_bytes); std::memcpy(getNodeLinks(a), temp_links, _M * sizeof(node_id_t)); std::memcpy(getNodeLabel(a), temp_label, sizeof(label_t)); - - return; } /** @@ -518,113 +420,81 @@ template class Index { */ PriorityQueue beamSearch(const void *query, const node_id_t entry_node, const int buffer_size) { + PriorityQueue neighbors; + PriorityQueue candidates; - // The query pointer should contain transformed data. - // returns an iterable list of node_id_t's, sorted by distance (ascending) - PriorityQueue neighbors; // W in the HNSW paper - PriorityQueue candidates; // C in the HNSW paper + auto *visited_nodes_handler = + _visited_nodes_handlers->pollAvailableHandler(); + visited_nodes_handler->clear(); - _visited_nodes.clear(); float dist = _distance->distance(/* x = */ query, /* y = */ getNodeData(entry_node), /* asymmetric = */ true); float max_dist = dist; - candidates.emplace(-dist, entry_node); neighbors.emplace(dist, entry_node); - _visited_nodes.insert(entry_node); + visited_nodes_handler->insert(entry_node); while (!candidates.empty()) { - // get nearest element from candidates dist_node_t d_node = candidates.top(); if ((-d_node.first) > max_dist) { break; } candidates.pop(); - node_id_t *d_node_links = getNodeLinks(d_node.second); - for (int i = 0; i < _M; i++) { - node_id_t neighbor_node_id = d_node_links[i]; - bool neighbor_is_visited = _visited_nodes[neighbor_node_id]; - if (!neighbor_is_visited) { - // If we haven't visited the node yet. - _visited_nodes.insert(neighbor_node_id); - - dist = _distance->distance(/* x = */ query, - /* y = */ getNodeData(neighbor_node_id), - /* asymmetric = */ true); - - // Include the node in the buffer if buffer isn't full or - // if the node is closer than a node already in the buffer. - if (neighbors.size() < buffer_size || dist < max_dist) { - candidates.emplace(-dist, neighbor_node_id); - neighbors.emplace(dist, neighbor_node_id); - if (neighbors.size() > buffer_size) { - neighbors.pop(); - } - if (!neighbors.empty()) { - max_dist = neighbors.top().first; - } - } - } - } + + processCandidateNode( + /* query = */ query, /* node = */ d_node.second, + /* max_dist = */ max_dist, /* buffer_size = */ buffer_size, + /* visited_nodes = */ visited_nodes_handler, + /* neighbors = */ neighbors, /* candidates = */ candidates); } + + _visited_nodes_handlers->pushHandler( + /* handler = */ visited_nodes_handler); + return neighbors; } - PriorityQueue concurrentBeamSearch(const void *query, - const node_id_t entry_node, - const int buffer_size) { - PriorityQueue neighbors; - PriorityQueue candidates; + void processCandidateNode(const void *query, node_id_t &node, float &max_dist, + const int buffer_size, + VisitedNodesHandler *visited_nodes, + PriorityQueue &neighbors, + PriorityQueue &candidates) { + // Lock all operations on this specific node + std::unique_lock lock(_node_links_mutexes[node]); + float dist = 0.f; - _sharded_visited_nodes->clearAll(); + node_id_t *neighbor_node_links = getNodeLinks(node); + for (uint32_t i = 0; i < _M; i++) { + node_id_t neighbor_node_id = neighbor_node_links[i]; + bool neighbor_is_visited = + visited_nodes->operator[](/* num = */ neighbor_node_id); - if (!_sharded_visited_nodes->allShardsHaveSameMark()) { - throw std::runtime_error("All shards must have the same mark."); - } + if (neighbor_is_visited) { + continue; + } - float dist = - _distance->distance(/* x = */ query, /* y = */ getNodeData(entry_node), - /* asymmetric = */ true); + visited_nodes->insert(/* num = */ neighbor_node_id); + dist = _distance->distance(/* x = */ query, + /* y = */ getNodeData(neighbor_node_id), + /* asymmetric = */ true); - float max_dist = dist; - candidates.emplace(-dist, entry_node); - neighbors.emplace(dist, entry_node); - _sharded_visited_nodes->insert(entry_node); + if (neighbors.size() < buffer_size || dist < max_dist) { + candidates.emplace(-dist, neighbor_node_id); + neighbors.emplace(dist, neighbor_node_id); - while (!candidates.empty()) { - dist_node_t d_node = candidates.top(); - if ((-d_node.first) > max_dist) { - break; - } - candidates.pop(); - node_id_t *d_node_links = getNodeLinks(d_node.second); - for (int i = 0; i < _M; i++) { - node_id_t neighbor_node_id = d_node_links[i]; - bool neighbor_is_visited = _sharded_visited_nodes->operator[]( - /* node_id = */ neighbor_node_id); - if (!neighbor_is_visited) { - _sharded_visited_nodes->insert(/* node_id = */ neighbor_node_id); - - dist = _distance->distance(/* x = */ query, - /* y = */ getNodeData(neighbor_node_id), - /* asymmetric = */ true); - - if (neighbors.size() < buffer_size || dist < max_dist) { - candidates.emplace(-dist, neighbor_node_id); - neighbors.emplace(dist, neighbor_node_id); - if (neighbors.size() > buffer_size) { - neighbors.pop(); - } - if (!neighbors.empty()) { - max_dist = neighbors.top().first; - } - } + if (neighbors.size() > buffer_size) { + neighbors.pop(); + } + if (!neighbors.empty()) { + max_dist = neighbors.top().first; } } } - return neighbors; + + // Release the lock(unnecessary since we are exiting the scope) + lock.unlock(); } /** @@ -683,6 +553,10 @@ template class Index { void connectNeighbors(PriorityQueue &neighbors, node_id_t new_node_id) { // connects neighbors according to the HSNW heuristic + + // Lock all operations on this node + std::unique_lock lock(_node_links_mutexes[new_node_id]); + node_id_t *new_node_links = getNodeLinks(new_node_id); int i = 0; // iterates through links for "new_node_id" @@ -691,6 +565,9 @@ template class Index { // add link to the current new node new_node_links[i] = neighbor_node_id; // now do the back-connections (a little tricky) + + std::unique_lock neighbor_lock( + _node_links_mutexes[neighbor_node_id]); node_id_t *neighbor_node_links = getNodeLinks(neighbor_node_id); bool is_inserted = false; for (size_t j = 0; j < _M; j++) { @@ -737,6 +614,10 @@ template class Index { j++; } } + + // Unlock the current node we are iterating on + neighbor_lock.unlock(); + // loop increments: i++; if (i >= _M) { @@ -744,6 +625,10 @@ template class Index { } neighbors.pop(); } + + // Release the lock. I don't think this is necessary since are actually + // exiting the function scope, but just in case + lock.unlock(); } /** @@ -762,7 +647,9 @@ template class Index { throw std::invalid_argument( "num_initializations must be greater than 0."); } - int step_size = _cur_num_nodes ? _cur_num_nodes / num_initializations : 1; + + int step_size = _cur_num_nodes / num_initializations; + step_size = step_size ? step_size : 1; float min_dist = std::numeric_limits::max(); node_id_t entry_node = 0; @@ -771,7 +658,6 @@ template class Index { float dist = _distance->distance(/* x = */ query, /* y = */ getNodeData(node), /* asymmetric = */ true); - if (dist < min_dist) { min_dist = dist; entry_node = node; @@ -794,37 +680,42 @@ template class Index { node_id_t *temp_links = new node_id_t[_M]; label_t *temp_label = new label_t; + auto *visited_nodes = _visited_nodes_handlers->pollAvailableHandler(); + // In this context, is_visited stores which nodes have been relocated // (it would be equivalent to name this variable "is_relocated"). - _visited_nodes.clear(); + visited_nodes->clear(); for (node_id_t n = 0; n < _cur_num_nodes; n++) { - if (!_visited_nodes[n]) { + if (visited_nodes->operator[](/* num = */ n)) { + continue; + } - node_id_t src = n; - node_id_t dest = P[src]; + node_id_t src = n; + node_id_t dest = P[src]; - // swap node at src with node at dest - swapNodes(src, dest, temp_data, temp_links, temp_label); + // swap node at src with node at dest + swapNodes(src, dest, temp_data, temp_links, temp_label); - // mark src as having been relocated - _visited_nodes.insert(src); + // mark src as having been relocated + visited_nodes->insert(src); - // recursively relocate the node from "dest" - while (!_visited_nodes[dest]) { - // mark node as having been relocated - _visited_nodes.insert(dest); - // the value of src remains the same. However, dest needs - // to change because the node located at src was previously - // located at dest, and must be relocated to P[dest]. - dest = P[dest]; + // recursively relocate the node from "dest" + while (!visited_nodes->operator[](/* num = */ dest)) { + // mark node as having been relocated + visited_nodes->insert(dest); + // the value of src remains the same. However, dest needs + // to change because the node located at src was previously + // located at dest, and must be relocated to P[dest]. + dest = P[dest]; - // swap node at src with node at dest - swapNodes(src, dest, temp_data, temp_links, temp_label); - } + // swap node at src with node at dest + swapNodes(src, dest, temp_data, temp_links, temp_label); } } + _visited_nodes_handlers->pushHandler(/* handler = */ visited_nodes); + delete[] temp_data; delete[] temp_links; delete temp_label; diff --git a/flatnav/util/ExplicitSet.h b/flatnav/util/ExplicitSet.h deleted file mode 100644 index 273dc6e..0000000 --- a/flatnav/util/ExplicitSet.h +++ /dev/null @@ -1,205 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace flatnav { - -class ExplicitSet { -private: - uint32_t _mark; - uint32_t *_table; - uint32_t _table_size; - - friend class cereal::access; - template void serialize(Archive &archive) { - archive(_mark, _table_size); - - if (Archive::is_loading::value) { - // If we are loading, allocate memory for the table and delete - // previously allocated memory if any. - delete[] _table; - _table = new uint32_t[_table_size]; - } - - archive(cereal::binary_data(_table, _table_size * sizeof(uint32_t))); - } - -public: - ExplicitSet() = default; - - ExplicitSet(const uint32_t size) : _mark(0), _table_size(size) { - // initialize values to 0 - _table = new uint32_t[_table_size](); - } - - inline void prefetch(const uint32_t num) const { -#ifdef USE_SSE - _mm_prefetch((char *)_table[num], _MM_HINT_T0); -#endif - } - - inline uint32_t getMark() const { return _mark; } - - inline void insert(const uint32_t num) { set(num); } - - inline void set(const uint32_t num) { _table[num] = _mark; } - - inline uint32_t size() const { return _table_size; } - - inline void reset(const uint32_t num) { _table[num] = _mark + 1; } - - inline void clear() { _mark++; } - - inline bool operator[](const uint32_t num) { return (_table[num] == _mark); } - - ~ExplicitSet() { delete[] _table; } - - // copy constructor - ExplicitSet(const ExplicitSet &other) { - _table_size = other._table_size; - _mark = other._mark; - delete[] _table; - _table = new uint32_t[_table_size]; - std::memcpy(other._table, _table, _table_size * sizeof(uint32_t)); - } - - // move constructor - ExplicitSet(ExplicitSet &&other) noexcept { - _table_size = other._table_size; - _mark = other._mark; - _table = other._table; - other._table = NULL; - other._table_size = 0; - other._mark = 0; - } - - // copy assignment - ExplicitSet &operator=(const ExplicitSet &other) { - return *this = ExplicitSet(other); - } - - // move assignment - ExplicitSet &operator=(ExplicitSet &&other) noexcept { - _table_size = other._table_size; - _mark = other._mark; - _table = other._table; - other._table = NULL; - other._table_size = 0; - other._mark = 0; - return *this; - } -}; - -class ShardedExplicitSet { - uint32_t _shard_size; - std::vector> _shards; - - // Mutexes for each shard - // We are using a std::unique_ptr here because std::mutex is neither - // copy-constructible nor move-constructible and std::vector requires - // its elements to be copy-constructible or move-constructible. - std::vector> _shard_mutexes; - -public: - // Constructor for cereal. Do not call except for serialization. - ShardedExplicitSet() = default; - - /** - * @brief Construct a new Sharded Explicit Set object - * TODO: Add exception checks in the constructor to make sure we take - * valid input arguments for total_size and num_shards. - * - * @param total_size Corresponds to the total number of elements across all - * shards. This is also the maximum number of nodes held by a flatnav index. - * @param num_shards Corresponds to the number of sharded regions. Each region - * (shard) is an ExplicitSet object. - */ - ShardedExplicitSet(uint32_t total_size, uint32_t num_shards) - : _shard_size(total_size / num_shards), _shards(num_shards), - _shard_mutexes(num_shards) { - for (uint32_t i = 0; i < num_shards; i++) { - _shards[i] = std::make_unique(_shard_size); - _shard_mutexes[i] = std::make_unique(); - } - } - - void insert(uint32_t node_id) { - uint32_t shard_id = node_id / _shard_size; - - { - std::lock_guard lock(*(_shard_mutexes[shard_id])); - uint32_t index_in_shard = node_id % _shard_size; - _shards[shard_id]->insert(index_in_shard); - } - } - - inline bool operator[](uint32_t node_id) { - uint32_t shard_id = node_id / _shard_size; - - std::lock_guard lock(*(_shard_mutexes[shard_id])); - uint32_t index_in_shard = node_id % _shard_size; - return _shards[shard_id]->operator[](index_in_shard); - } - - inline void clear(uint32_t node_id) { - uint32_t shard_id = node_id / _shard_size; - std::lock_guard lock(*(_shard_mutexes[shard_id])); - _shards[shard_id]->clear(); - } - - inline void clearAll() { - // Step 1: Acquire locks on all shards - std::vector> locks; - locks.reserve(_shard_mutexes.size()); - - for (auto &mutex : _shard_mutexes) { - locks.emplace_back(*mutex); - } - - // Step 2: Clear all shards - for (auto &shard : _shards) { - shard->clear(); - } - } - - inline bool allShardsHaveSameMark() { - uint32_t mark = _shards[0]->getMark(); - - for (uint32_t i = 1; i < _shards.size(); i++) { - if (_shards[i]->getMark() != mark) { - return false; - } - } - - return true; - } - - ~ShardedExplicitSet() = default; - -private: - friend class cereal::access; - template void serialize(Archive &archive) { - archive(_shard_size, _shards); - - if (Archive::is_loading::value) { - - _shard_mutexes.resize(_shards.size()); - for (uint32_t i = 0; i < _shards.size(); i++) { - _shard_mutexes[i] = std::make_unique(); - } - } - } -}; - -} // namespace flatnav \ No newline at end of file diff --git a/flatnav/util/ParallelConstructs.h b/flatnav/util/ParallelConstructs.h index c3a42b8..d6ea777 100644 --- a/flatnav/util/ParallelConstructs.h +++ b/flatnav/util/ParallelConstructs.h @@ -12,49 +12,30 @@ namespace flatnav { template void parallelFor(uint32_t start, uint32_t end, uint32_t num_threads, Function fn) { - if (num_threads <= 0) { + if (num_threads == 0) { throw std::invalid_argument("Invalid number of threads"); } - if (num_threads == 1) { - for (uint32_t i = start; i < end; i++) { - fn(i, 0); - } - return; - } - std::vector threads; + // This needs to be an atomic because mutliple threads will be + // modifying it concurrently. std::atomic current(start); + std::thread thread_objects[num_threads]; - std::exception_ptr last_exception = nullptr; - std::mutex last_exception_mutex; - - for (uint32_t thread_id = 0; thread_id < num_threads; thread_id++) { - threads.push_back(std::thread([&, thread_id] { - while (true) { - uint32_t current_value = current.fetch_add(1); - if (current_value >= end) { - break; - } - - try { - fn(current_value, thread_id); - } catch (...) { - std::unique_lock lock(last_exception_mutex); - last_exception = std::current_exception(); - - current = end; - break; - } + auto parallel_executor = [&] { + while (true) { + uint32_t current_vector_idx = current.fetch_add(1); + if (current_vector_idx >= end) { + break; } - })); - } + fn(current_vector_idx); + } + }; - for (auto &thread : threads) { - thread.join(); + for (uint32_t id = 0; id < num_threads; id++) { + thread_objects[id] = std::thread(parallel_executor); } - - if (last_exception) { - std::rethrow_exception(last_exception); + for (uint32_t id = 0; id < num_threads; id++) { + thread_objects[id].join(); } } diff --git a/flatnav/util/Reordering.h b/flatnav/util/Reordering.h index a5a3979..904f95d 100644 --- a/flatnav/util/Reordering.h +++ b/flatnav/util/Reordering.h @@ -1,7 +1,7 @@ #pragma once -#include "ExplicitSet.h" -#include "GorderPriorityQueue.h" +#include +#include #include #include @@ -137,7 +137,7 @@ rcmOrder(std::vector> &outdegree_table) { const std::pair &b) { return a.second < b.second; }); std::vector P; - ExplicitSet is_listed = ExplicitSet(cur_num_nodes); + VisitedNodesHandler is_listed = VisitedNodesHandler(cur_num_nodes); is_listed.clear(); for (int i = 0; i < sorted_nodes.size(); i++) { diff --git a/flatnav/util/VisitedNodesHandler.h b/flatnav/util/VisitedNodesHandler.h new file mode 100644 index 0000000..a3d55b1 --- /dev/null +++ b/flatnav/util/VisitedNodesHandler.h @@ -0,0 +1,167 @@ +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace flatnav { + +class VisitedNodesHandler { +private: + uint32_t _mark; + uint32_t *_table; + uint32_t _table_size; + + friend class cereal::access; + template void serialize(Archive &archive) { + archive(_mark, _table_size); + + if (Archive::is_loading::value) { + // If we are loading, allocate memory for the table and delete + // previously allocated memory if any. + delete[] _table; + _table = new uint32_t[_table_size]; + } + + archive(cereal::binary_data(_table, _table_size * sizeof(uint32_t))); + } + +public: + VisitedNodesHandler() = default; + + VisitedNodesHandler(const uint32_t size) : _mark(0), _table_size(size) { + // initialize values to 0 + _table = new uint32_t[_table_size](); + } + + inline void prefetch(const uint32_t num) const { +#ifdef USE_SSE + _mm_prefetch((char *)_table[num], _MM_HINT_T0); +#endif + } + + inline uint32_t getMark() const { return _mark; } + + inline void insert(const uint32_t num) { set(num); } + + inline void set(const uint32_t num) { _table[num] = _mark; } + + inline uint32_t size() const { return _table_size; } + + inline void reset(const uint32_t num) { _table[num] = _mark + 1; } + + inline void clear() { _mark++; } + + inline bool operator[](const uint32_t num) { return (_table[num] == _mark); } + + ~VisitedNodesHandler() { delete[] _table; } + + // copy constructor + VisitedNodesHandler(const VisitedNodesHandler &other) { + _table_size = other._table_size; + _mark = other._mark; + _table = new uint32_t[_table_size]; + std::memcpy(_table, other._table, _table_size * sizeof(uint32_t)); + } + + // move constructor + VisitedNodesHandler(VisitedNodesHandler &&other) noexcept { + _table_size = other._table_size; + _mark = other._mark; + _table = other._table; + other._table = NULL; + other._table_size = 0; + other._mark = 0; + } + + // copy assignment + VisitedNodesHandler &operator=(const VisitedNodesHandler &other) { + if (this != &other) { + delete[] _table; + _table_size = other._table_size; + _mark = other._mark; + _table = new uint32_t[_table_size]; + std::memcpy(_table, other._table, _table_size * sizeof(uint32_t)); + } + return *this; + } + + // move assignment + VisitedNodesHandler &operator=(VisitedNodesHandler &&other) noexcept { + _table_size = other._table_size; + _mark = other._mark; + _table = other._table; + other._table = NULL; + other._table_size = 0; + other._mark = 0; + return *this; + } +}; + +class ThreadSafeVisitedNodesHandler { + std::vector> _handler_pool; + std::mutex _pool_guard; + uint32_t _num_elements; + uint32_t _total_handlers_in_use; + + friend class cereal::access; + + template void serialize(Archive &archive) { + archive(_handler_pool, _num_elements, _total_handlers_in_use); + } + +public: + ThreadSafeVisitedNodesHandler() = default; + ThreadSafeVisitedNodesHandler(uint32_t initial_pool_size, + uint32_t num_elements) + : _handler_pool(initial_pool_size), _num_elements(num_elements), + _total_handlers_in_use(1) { + for (uint32_t handler_id = 0; handler_id < _handler_pool.size(); + handler_id++) { + _handler_pool[handler_id] = + std::make_unique(/* size = */ _num_elements); + } + } + + VisitedNodesHandler *pollAvailableHandler() { + std::unique_lock lock(_pool_guard); + + if (!_handler_pool.empty()) { + // NOTE: release() call is required here to ensure that we don't free + // the handler's memory before using it since it's under a unique pointer. + auto *handler = _handler_pool.back().release(); + _handler_pool.pop_back(); + return handler; + } else { + // TODO: This is not great because it assumes the caller is responsible + // enough to return this handler to the pool. If the caller doesn't return + // the handler to the pool, we will have a memory leak. This can be + // resolved by std::unique_ptr but I prefer to use a raw pointer here. + auto *handler = new VisitedNodesHandler(/* size = */ _num_elements); + _total_handlers_in_use++; + return handler; + } + } + + void pushHandler(VisitedNodesHandler *handler) { + std::unique_lock lock(_pool_guard); + + _handler_pool.push_back(std::make_unique(*handler)); + _handler_pool.shrink_to_fit(); + } + + inline uint32_t getPoolSize() { return _handler_pool.size(); } + + ~ThreadSafeVisitedNodesHandler() = default; +}; + +} // namespace flatnav \ No newline at end of file diff --git a/flatnav_python/python_bindings.cpp b/flatnav_python/python_bindings.cpp index f6bb0e2..a1f0020 100644 --- a/flatnav_python/python_bindings.cpp +++ b/flatnav_python/python_bindings.cpp @@ -69,39 +69,30 @@ template class PyIndex { throw std::invalid_argument("Data has incorrect dimensions."); } if (labels.is_none()) { - for (size_t vec_index = 0; vec_index < num_vectors; vec_index++) { - this->_index->add(/* data = */ (void *)data.data(vec_index), - /* label = */ label_id, - /* ef_construction = */ ef_construction, - /* num_initializations = */ 100); - if (_verbose && vec_index % NUM_LOG_STEPS == 0) { - std::clog << "." << std::flush; - } - label_id++; - } - std::clog << std::endl; + std::vector vec_labels(num_vectors); + std::iota(vec_labels.begin(), vec_labels.end(), 0); + + this->_index->addParallel( + /* data = */ (void *)data.data(0), /* labels = */ vec_labels, + /* ef_construction = */ ef_construction, + /* num_initializations = */ num_initializations); return; } // Use the provided labels now - py::array_t node_labels( - labels); - if (node_labels.ndim() != 1 || node_labels.shape(0) != num_vectors) { - throw std::invalid_argument("Labels have incorrect dimensions."); - } - - for (size_t vec_index = 0; vec_index < num_vectors; vec_index++) { - label_t label_id = *node_labels.data(vec_index); - this->_index->add(/* data = */ (void *)data.data(vec_index), - /* label = */ label_id, - /* ef_construction = */ ef_construction, - /* num_initializations = */ 100); - - if (_verbose && vec_index % NUM_LOG_STEPS == 0) { - std::clog << "." << std::flush; + try { + auto vec_labels = py::cast>(labels); + if (vec_labels.size() != num_vectors) { + throw std::invalid_argument("Incorrect numbe of labels."); } + + this->_index->addParallel( + /* data = */ (void *)data.data(0), /* labels = */ vec_labels, + /* ef_construction = */ ef_construction, + /* num_initializations = */ num_initializations); + } catch (const py::cast_error &error) { + throw std::invalid_argument("Invalid labels provided."); } - std::clog << std::endl; } DistancesLabelsPair @@ -205,6 +196,15 @@ void bindIndexMethods(py::class_ &index_class) { "Perform graph re-ordering based on the given sequence of " "re-ordering strategies. " "Supported re-ordering algorithms include `gorder` and `rcm`.") + .def_property( + "num_threads", + [](IndexType &index_type, uint32_t num_threads) { + index_type.getIndex()->setNumThreads( + /* num_threads = */ num_threads); + }, + [](IndexType &index_type) { index_type.getIndex()->getNumThreads(); }, + "Configure the desired number of threads. This is useful for " + "constructing the NSW graph in parallel.") .def_property_readonly( "max_edges_per_node", [](IndexType &index_type) { diff --git a/tools/construct_npy.cpp b/tools/construct_npy.cpp index 435ccdf..58c7faf 100644 --- a/tools/construct_npy.cpp +++ b/tools/construct_npy.cpp @@ -10,7 +10,6 @@ #include #include #include -#include #include #include #include @@ -31,33 +30,16 @@ void buildIndex(float *data, int M, int dim, int ef_construction, const std::string &save_file) { - std::cout << "Building index" << std::endl; - auto index = new Index( /* dist = */ std::move(distance), /* dataset_size = */ N, /* max_edges = */ M); - std::cout << "Index initialized" << std::endl; - auto start = std::chrono::high_resolution_clock::now(); - // for (int label = 0; label < N; label++) { - // float *element = data + (dim * label); - // index->add(/* data = */ (void *)element, /* label = */ label, - // /* ef_construction */ ef_construction); - // if (label % 10000 == 0) - // std::clog << "." << std::flush; - // } - - // Invoke addParallel() to add vectors in parallel. - std::cout << "Creating a vector of labels" << std::endl; std::vector labels(N); std::iota(labels.begin(), labels.end(), 0); - std::cout << "Adding vectors in parallel" << std::endl; index->addParallel(/* data = */ (void *)data, /* labels = */ labels, /* ef_construction */ ef_construction); - std::cout << "Done adding vectors in parallel" << std::endl; - std::clog << std::endl; auto stop = std::chrono::high_resolution_clock ::now(); auto duration = diff --git a/tools/query_npy.cpp b/tools/query_npy.cpp index 29da978..fad19f8 100644 --- a/tools/query_npy.cpp +++ b/tools/query_npy.cpp @@ -50,8 +50,6 @@ void run(float *queries, int *gtruth, const std::string &index_filename, float *q = queries + dim * i; int *g = gtruth + num_gtruth * i; - std::cout << "[INFO] Query " << i << std::endl; - std::vector> result = index->search(q, K, ef_search); diff --git a/tools/reorder_npy.cpp b/tools/reorder_npy.cpp deleted file mode 100644 index d8e046b..0000000 --- a/tools/reorder_npy.cpp +++ /dev/null @@ -1,151 +0,0 @@ -#include "../flatnav/Index.h" -#include "../flatnav/distances/InnerProductDistance.h" -#include "../flatnav/distances/SquaredL2Distance.h" -#include "cnpy.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using flatnav::Index; -using flatnav::InnerProductDistance; -using flatnav::SquaredL2Distance; - -int main(int argc, char **argv) { - - if (argc < 4) { - std::clog << "Usage: " << std::endl; - std::clog << "reorder_npy "; - std::clog << "[ ]"; - std::clog << "Positional arguments:" << std::endl; - std::clog << "\t : Filename for input index (float32 index)." - << std::endl; - std::clog - << "\t : Which reordering algorithm to use? 0:none " - "1:gorder 2:indegsort 3:outdegsort 4:RCM 5:hubsort 6:hubcluster " - "7:DBG 8:corder 91:profiled_gorder 94:profiled_rcm 41:RCM+gorder" - << std::endl; - std::clog << "\t : Filename for output index (float32 index)." - << std::endl; - std::clog << "Profiling arguments:" << std::endl; - std::clog << "\t : Filename for profiling queries (NPY file)." - << std::endl; - std::clog << "\t : Integer distance ID: 0 for L2 distance, 1 for " - "inner product (angular distance)." - << std::endl; - std::clog << "\t : Number of queries to use for profiling." - << std::endl; - std::clog - << "\t : Candidate list size for search with profiling." - << std::endl; - return -1; - } - - std::string infile(argv[1]); - std::string outfile(argv[3]); - - int reorder_ID = std::stoi(argv[2]); - bool is_profiling = ((reorder_ID >= 90) && (reorder_ID < 100)); - if (is_profiling) { - // We are doing profiling, so we need the extra arguments. - if (argc < 8) { - std::clog << "Incorrect arguments for profiled reordering. Run with no " - "arguments for help." - << std::endl; - return -1; - } - cnpy::NpyArray queryfile = cnpy::npy_load(argv[4]); - int num_queries_check = queryfile.shape[0]; - int dim = queryfile.shape[1]; - int num_queries = std::stoi(argv[6]); - int ef_search = std::stoi(argv[7]); - - std::clog << "Reading " << num_queries << " queries of " - << num_queries_check << " total queries of dimension " << dim - << "." << std::endl; - if (num_queries_check != num_queries) { - std::clog << "Warning: Using only " << num_queries << " points of total " - << num_queries_check << "." << std::endl; - } - float *queries = queryfile.data(); - std::clog << "Read " << num_queries << " queries for profiling." - << std::endl; - - int metric_id = std::stoi(argv[5]); - if (metric_id == 0) { - auto distance = std::make_unique(dim); - - } else if (metric_id == 1) { - auto distance = std::make_unique(dim); - } else { - throw std::invalid_argument("Provided metric ID " + - std::to_string(metric_id) + "is invalid."); - } - auto index = new Index(/* stream */ infile); - - // SpaceInterface* space; - // if (space_ID == 0){ - // space = new L2Space(dim); - // } else { - // space = new InnerProductSpace(dim); - // } - // Index index(space, infile); - - if (reorder_ID == 91) { - std::clog << "Using profile-based GORDER" << std::endl; - std::clog << "Reordering" << std::endl; - auto start_r = std::chrono::high_resolution_clock::now(); - index.profile_reorder(queries, num_queries, ef_search, - Index::ProfileOrder::GORDER); - auto stop_r = std::chrono::high_resolution_clock::now(); - auto duration_r = std::chrono::duration_cast( - stop_r - start_r); - std::clog << "Reorder time: " << (float)(duration_r.count()) / (1000.0) - << " seconds" << std::endl; - } else if (reorder_ID == 94) { - std::clog << "Using profile-based RCM" << std::endl; - std::clog << "Reordering" << std::endl; - auto start_r = std::chrono::high_resolution_clock::now(); - index.profile_reorder(queries, num_queries, ef_search, - Index::ProfileOrder::RCM); - auto stop_r = std::chrono::high_resolution_clock::now(); - auto duration_r = std::chrono::duration_cast( - stop_r - start_r); - std::clog << "Reorder time: " << (float)(duration_r.count()) / (1000.0) - << " seconds" << std::endl; - } else { - std::clog << "No reordering" << std::endl; - } - std::clog << "Saving index." << std::endl; - index.save(outfile); - } else { - // we can do reordering without knowing anything about the space or - // dimensions. - L2Space space(0); - Index index(&space, infile); - - if (reorder_ID == 1) { - std::clog << "Using GORDER" << std::endl; - std::clog << "Reordering: " << std::endl; - auto start_r = std::chrono::high_resolution_clock::now(); - index.reorder(Index::GraphOrder::GORDER); - auto stop_r = std::chrono::high_resolution_clock::now(); - auto duration_r = std::chrono::duration_cast( - stop_r - start_r); - std::clog << "Reorder time: " << (float)(duration_r.count()) / (1000.0) - << " seconds" << std::endl; - } else { - std::clog << "No reordering" << std::endl; - } - std::clog << "Saving index." << std::endl; - index.save(outfile); - } - - return 0; -}