diff --git a/cpp/test.cpp b/cpp/test.cpp
index 4e03bfb8..9fedfcc9 100644
--- a/cpp/test.cpp
+++ b/cpp/test.cpp
@@ -1096,6 +1096,108 @@ template void test_replacing_update() {
     expect_eq(final_search[2].member.key, 44);
 }
 
+/**
+ * @brief Tests merging.
+ */
+void test_merge() {
+    using index_t = index_gt<>;
+    using distance_t = typename index_t::distance_t;
+    using key_t = typename index_t::key_t;
+    using compressed_slot_t = typename index_t::compressed_slot_t;
+    using member_ref_t = typename index_t::member_ref_t;
+    using member_cref_t = typename index_t::member_cref_t;
+    using member_citerator_t = typename index_t::member_citerator_t;
+    using add_result_t = typename index_t::add_result_t;
+
+    using value_t = float;
+
+    auto create_index = []() {
+        auto index_result = index_t::make();
+        expect(index_result);
+        return std::move(index_result.index);
+    };
+
+    struct metric_t {
+        std::unordered_map<compressed_slot_t, value_t> values;
+
+        metric_t() : values() {}
+        distance_t compute(value_t const& a, value_t const& b) {
+            if (b > a) {
+                return b - a;
+            } else {
+                return a - b;
+            }
+        }
+        distance_t operator()(value_t const& a, member_cref_t const& b) { return compute(a, values.at(get_slot(b))); }
+        distance_t operator()(value_t const& a, member_citerator_t const& b) {
+            return compute(a, values.at(get_slot(b)));
+        }
+        distance_t operator()(member_citerator_t const& a, member_citerator_t const& b) {
+            return compute(values.at(get_slot(a)), values.at(get_slot(b)));
+        }
+    };
+
+    auto add = [](index_t& index, key_t const key, value_t const value, metric_t& metric) {
+        auto on_success = [&](member_ref_t member) { metric.values[member.slot] = value; };
+        add_result_t result = index.add(key, value, metric, {}, on_success);
+        expect(result);
+    };
+
+    // Prepare index 1
+    auto index1 = create_index();
+    metric_t metric1;
+    expect(index1.reserve(3));
+    add(index1, 11, 1.1f, metric1);
+    add(index1, 12, 2.1f, metric1);
+    add(index1, 13, 3.1f, metric1);
+    expect_eq(index1.size(), 3);
+
+    // Prepare index 2
+    auto index2 = create_index();
+    metric_t metric2;
+    expect(index2.reserve(4));
+    add(index2, 21, -1.1f, metric2);
+    add(index2, 22, -2.1f, metric2);
+    add(index2, 23, -3.1f, metric2);
+    add(index2, 24, -4.1f, metric2);
+    expect_eq(index2.size(), 4);
+
+    // Merge indexes
+    char const* merge_file_path = "merge.usearch";
+    auto merged_index = create_index();
+    expect(merged_index.save(merge_file_path));
+    memory_mapped_file_t file{merge_file_path, true};
+    expect(merged_index.load(std::move(file)));
+    metric_t merged_metric;
+    auto merge_on_success = [&](member_ref_t member, value_t const& value) {
+        merged_metric.values[member.slot] = value;
+    };
+    auto get_value1 = [&](member_cref_t member) -> value_t& { return metric1.values[member.slot]; };
+    expect(merged_index.merge(index1, get_value1, merged_metric, {}, merge_on_success));
+    auto get_value2 = [&](member_cref_t member) -> value_t& { return metric2.values[member.slot]; };
+    expect(merged_index.merge(index2, get_value2, merged_metric, {}, merge_on_success));
+
+    // Assert
+    expect_eq(merged_index.size(), 7);
+    auto search = merged_index.search(0.75f, 3, merged_metric);
+    expect_eq(search.size(), 3);
+    expect_eq(static_cast<key_t>(search[0].member.key), 11);
+    expect_eq(static_cast<key_t>(search[1].member.key), 12);
+    expect_eq(static_cast<key_t>(search[2].member.key), 21);
+
+    // Re-load the merged index
+    merged_index.reset();
+    merged_index.load(merge_file_path);
+
+    // Assert
+    expect_eq(merged_index.size(), 7);
+    search = merged_index.search(0.75f, 3, merged_metric);
+    expect_eq(search.size(), 3);
+    expect_eq(static_cast<key_t>(search[0].member.key), 11);
+    expect_eq(static_cast<key_t>(search[1].member.key), 12);
+    expect_eq(static_cast<key_t>(search[2].member.key), 21);
+}
+
 int main(int, char**) {
     test_uint40();
     test_cosine(10, 10);
@@ -1163,5 +1265,9 @@ int main(int, char**) {
     test_sets(set_size, 20, 30);
     test_strings();
 
+    // Test merge
+    std::printf("Testing merge\n");
+    test_merge();
+
     return 0;
 }
diff --git a/include/usearch/index.hpp b/include/usearch/index.hpp
index 581ecd37..309ae93f 100644
--- a/include/usearch/index.hpp
+++ b/include/usearch/index.hpp
@@ -85,18 +85,20 @@
 #endif
 
 // STL includes
-#include <algorithm> // `std::sort_heap`
-#include <atomic>    // `std::atomic`
-#include <bitset>    // `std::bitset`
-#include <climits>   // `CHAR_BIT`
-#include <cmath>     // `std::sqrt`
-#include <cstring>   // `std::memset`
-#include <iterator>  // `std::reverse_iterator`
-#include <mutex>     // `std::unique_lock` - replacement candidate
-#include <random>    // `std::default_random_engine` - replacement candidate
-#include <stdexcept> // `std::runtime_exception`
-#include <thread>    // `std::thread`
-#include <utility>   // `std::pair`
+#include <algorithm>  // `std::sort_heap`
+#include <atomic>     // `std::atomic`
+#include <bitset>     // `std::bitset`
+#include <climits>    // `CHAR_BIT`
+#include <cmath>      // `std::sqrt`
+#include <cstring>    // `std::memset`
+#include <filesystem> // `std::filesystem`
+#include <fstream>    // `std::ofstream`
+#include <iterator>   // `std::reverse_iterator`
+#include <mutex>      // `std::unique_lock` - replacement candidate
+#include <random>     // `std::default_random_engine` - replacement candidate
+#include <stdexcept>  // `std::runtime_exception`
+#include <thread>     // `std::thread`
+#include <utility>    // `std::pair`
 
 // Helper macros for concatenation and stringification
 #define usearch_concat_helper_m(a, b) a##b
@@ -1491,6 +1493,17 @@ struct dummy_callback_t {
     template <typename member_at> void operator()(member_at&&) const noexcept {}
 };
 
+/**
+ * @brief An example of what a USearch-compatible ad-hoc operation on in-flight entries during merging should look like.
+ *
+ * This kind of callback is used when the engine is merging and you want to patch
+ * the entries while they are still under locks - limiting concurrent access and providing
+ * consistency.
+ */
+struct dummy_merge_callback_t {
+    template <typename member_at, typename value_at> void operator()(member_at&&, value_at const&) const noexcept {}
+};
+
 /**
  * @brief An example of what a USearch-compatible progress-bar should look like.
  *
@@ -1720,9 +1733,10 @@ class input_file_t {
  * The class automatically closes the file when the object is destroyed.
  */
 class memory_mapped_file_t {
-    char const* path_{}; /**< The path to the file to be memory-mapped. */
-    void* ptr_{};        /**< A pointer to the memory-mapping. */
-    size_t length_{};    /**< The length of the memory-mapped file in bytes. */
+    char const* path_{};      /**< The path to the file to be memory-mapped. */
+    void* ptr_{};             /**< A pointer to the memory-mapping. */
+    size_t length_{};         /**< The length of the memory-mapped file in bytes. */
+    bool is_writable_{false}; /**< Whether the memory-mapped file is writable or not. */
 
 #if defined(USEARCH_DEFINED_WINDOWS)
     HANDLE file_handle_{}; /**< The file handle on Windows. */
@@ -1736,13 +1750,15 @@ class memory_mapped_file_t {
     byte_t* data() noexcept { return reinterpret_cast<byte_t*>(ptr_); }
     byte_t const* data() const noexcept { return reinterpret_cast<byte_t const*>(ptr_); }
     std::size_t size() const noexcept { return static_cast<std::size_t>(length_); }
+    bool is_writable() const noexcept { return is_writable_; }
 
     memory_mapped_file_t() noexcept {}
-    memory_mapped_file_t(char const* path) noexcept : path_(path) {}
+    memory_mapped_file_t(char const* path, bool is_writable = false) noexcept
+        : path_(path), is_writable_(is_writable) {}
     ~memory_mapped_file_t() noexcept { close(); }
 
     memory_mapped_file_t(memory_mapped_file_t&& other) noexcept
         : path_(exchange(other.path_, nullptr)), ptr_(exchange(other.ptr_, nullptr)),
-          length_(exchange(other.length_, 0)),
+          length_(exchange(other.length_, 0)), is_writable_(other.is_writable_),
 #if defined(USEARCH_DEFINED_WINDOWS)
           file_handle_(exchange(other.file_handle_, nullptr)), mapping_handle_(exchange(other.mapping_handle_, nullptr))
 #else
@@ -1757,6 +1773,7 @@
         std::swap(path_, other.path_);
         std::swap(ptr_, other.ptr_);
         std::swap(length_, other.length_);
+        std::swap(is_writable_, other.is_writable_);
 #if defined(USEARCH_DEFINED_WINDOWS)
         std::swap(file_handle_, other.file_handle_);
         std::swap(mapping_handle_, other.mapping_handle_);
 #else
@@ -1773,19 +1790,31 @@
 
 #if defined(USEARCH_DEFINED_WINDOWS)
 
+        DWORD file_desired_access = GENERIC_READ;
+        DWORD share_mode = FILE_SHARE_READ;
+        DWORD creation_disposition = OPEN_EXISTING;
+        if (is_writable_) {
+            file_desired_access |= GENERIC_WRITE;
+            share_mode |= FILE_SHARE_WRITE;
+        }
         HANDLE file_handle =
-            CreateFile(path_, GENERIC_READ, FILE_SHARE_READ, 0, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0);
+            CreateFile(path_, file_desired_access, share_mode, 0, creation_disposition, FILE_ATTRIBUTE_NORMAL, 0);
         if (file_handle == INVALID_HANDLE_VALUE)
             return result.failed("Opening file failed!");
 
         std::size_t file_length = GetFileSize(file_handle, 0);
-        HANDLE mapping_handle = CreateFileMapping(file_handle, 0, PAGE_READONLY, 0, 0, 0);
+        DWORD protect = is_writable_ ? PAGE_READWRITE : PAGE_READONLY;
+        HANDLE mapping_handle = CreateFileMapping(file_handle, 0, protect, 0, 0, 0);
         if (mapping_handle == 0) {
             CloseHandle(file_handle);
             return result.failed("Mapping file failed!");
         }
 
-        byte_t* file = (byte_t*)MapViewOfFile(mapping_handle, FILE_MAP_READ, 0, 0, file_length);
+        DWORD map_desired_access = FILE_MAP_READ;
+        if (is_writable_) {
+            map_desired_access |= FILE_MAP_WRITE;
+        }
+        byte_t* file = (byte_t*)MapViewOfFile(mapping_handle, map_desired_access, 0, 0, file_length);
         if (file == 0) {
             CloseHandle(mapping_handle);
             CloseHandle(file_handle);
@@ -1797,11 +1826,11 @@
         length_ = file_length;
 #else
 
+        int open_flags = is_writable_ ? O_RDWR : O_RDONLY;
 #if defined(USEARCH_DEFINED_LINUX)
-        int descriptor = open(path_, O_RDONLY | O_NOATIME);
-#else
-        int descriptor = open(path_, O_RDONLY);
+        open_flags |= O_NOATIME;
 #endif
+        int descriptor = open(path_, open_flags);
 
         if (descriptor < 0)
             return result.failed(std::strerror(errno));
@@ -1814,7 +1843,10 @@
         }
 
         // Map the entire file
-        byte_t* file = (byte_t*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, descriptor, 0);
+        int map_prot = PROT_READ;
+        if (is_writable_)
+            map_prot |= PROT_WRITE;
+        byte_t* file = (byte_t*)mmap(NULL, file_stat.st_size, map_prot, MAP_SHARED, descriptor, 0);
         if (file == MAP_FAILED) {
             ::close(descriptor);
             return result.failed(std::strerror(errno));
@@ -1846,6 +1878,24 @@
         ptr_ = nullptr;
         length_ = 0;
     }
+
+    /**
+     * @brief Increases the file size.
+     *
+     * This creates a sparse file if possible.
+     *
+     * The memory-mapped file must be closed and writable.
+     */
+    void reserve(std::size_t new_size) noexcept {
+        if (!path_) {
+            return;
+        }
+        if (!is_writable_) {
+            return;
+        }
+        // Make this file sparse if possible.
+        std::filesystem::resize_file(path_, new_size);
+    }
 };
 
 /**
@@ -2243,7 +2293,15 @@ class index_gt {
     tape_allocator_t tape_allocator_{};
     precomputed_constants_t pre_{};
-    memory_mapped_file_t viewed_file_{};
+    /// @brief The associated memory-mapped file.
+    memory_mapped_file_t mapped_file_{};
+    /// @brief The offset specified when the mapped file is loaded.
+    std::size_t mapped_file_offset_{0};
+    /// @brief The offset that will be used when the next node is allocated from
+    /// the mapped file by `node_malloc_()`.
+    std::size_t mapped_file_next_node_offset_{0};
+    /// @brief Whether the index is mutable or not.
+    bool is_mutable_{true};
 
     /// @brief Controls access to `max_level_` and `entry_slot_`.
     /// If any thread is updating those values, no other threads can `add()` or `search()`.
@@ -2275,7 +2333,7 @@ class index_gt {
     std::size_t max_level() const noexcept { return nodes_count_ ? static_cast<std::size_t>(max_level_) : 0; }
     index_config_t const& config() const noexcept { return config_; }
     index_limits_t const& limits() const noexcept { return limits_; }
-    bool is_immutable() const noexcept { return bool(viewed_file_); }
+    bool is_immutable() const noexcept { return !is_mutable_; }
     explicit operator bool() const noexcept { return config_.is_valid(); }
 
     /**
@@ -2431,9 +2489,27 @@ class index_gt {
      *
      * Will change both `size()` and `capacity()` to zero.
      * Will deallocate all threads/contexts.
+     * If the index is memory-mapped and mutable - writes the header and levels back to the file.
      * If the index is memory-mapped - releases the mapping and the descriptor.
      */
     void reset() noexcept {
+        if (mapped_file_ && is_mutable_) {
+            // Write the header and levels back, because the header information may have
+            // changed in memory and the levels in `nodes_` may have changed.
+            index_serialized_header_t header{};
+            header.size = nodes_count_;
+            header.connectivity = config_.connectivity;
+            header.connectivity_base = config_.connectivity_base;
+            header.max_level = max_level_;
+            header.entry_slot = entry_slot_;
+            std::memcpy(mapped_file_.data() + mapped_file_offset_, &header, sizeof(header));
+            misaligned_ptr_gt<level_t> levels{mapped_file_.data() + mapped_file_offset_ + sizeof(header)};
+            for (std::size_t i = 0; i != nodes_count_; ++i) {
+                level_t level = nodes_[i].level();
+                levels[i] = level;
+            }
+        }
+
         clear();
 
         nodes_ = {};
@@ -2441,7 +2517,10 @@
         nodes_mutexes_ = {};
         limits_ = index_limits_t{0, 0};
         nodes_capacity_ = 0;
-        viewed_file_ = memory_mapped_file_t{};
+        mapped_file_ = memory_mapped_file_t{};
+        mapped_file_offset_ = 0;
+        mapped_file_next_node_offset_ = 0;
+        is_mutable_ = true;
         tape_allocator_ = {};
     }
 
@@ -2454,7 +2533,10 @@
         std::swap(dynamic_allocator_, other.dynamic_allocator_);
         std::swap(tape_allocator_, other.tape_allocator_);
         std::swap(pre_, other.pre_);
-        std::swap(viewed_file_, other.viewed_file_);
+        std::swap(mapped_file_, other.mapped_file_);
+        std::swap(mapped_file_offset_, other.mapped_file_offset_);
+        std::swap(mapped_file_next_node_offset_, other.mapped_file_next_node_offset_);
+        std::swap(is_mutable_, other.is_mutable_);
         std::swap(max_level_, other.max_level_);
         std::swap(entry_slot_, other.entry_slot_);
         std::swap(nodes_, other.nodes_);
@@ -2498,11 +2580,60 @@ class index_gt {
         if (nodes_)
             std::memcpy(new_nodes.data(), nodes_.data(), sizeof(node_t) * size());
 
+        if (mapped_file_ && is_mutable_ && limits.members > size()) {
+            // Extend the associated memory-mapped file while the index is mutable.
+            if (!mapped_file_.open_if_not()) {
+                return false;
+            }
+            // Compute offsets and the new file size.
+            std::size_t base_offset = mapped_file_offset_ + sizeof(index_serialized_header_t);
+            std::size_t nodes_block_offset = base_offset + size() * sizeof(level_t);
+            std::size_t nodes_block_end_offset = nodes_block_offset;
+            std::size_t new_nodes_block_offset = base_offset + limits.members * sizeof(level_t);
+            std::size_t new_file_size = new_nodes_block_offset;
+            for (std::size_t i = 0; i != size(); ++i) {
+                std::size_t node_size = node_bytes_(node_at_(i).level());
+                new_file_size += node_size;
+                nodes_block_end_offset += node_size;
+            }
+            new_file_size += (limits.members - size()) * (node_bytes_(max_level()) + sizeof(level_t));
+
+            // Close before we extend the associated memory-mapped file.
+            mapped_file_.close();
+            // Extend the associated memory-mapped file.
+            mapped_file_.reserve(new_file_size);
+            // Re-open the associated memory-mapped file to update `new_nodes`.
+            if (!mapped_file_.open_if_not()) {
+                return false;
+            }
+
+            // Update `new_nodes` with the new addresses.
+            if (nodes_block_end_offset != nodes_block_offset) {
+                // Move the old node data into the new node data space:
+                //
+                // Before: | header | level_t * old_size | old_nodes |
+                // After:  | header | level_t * new_size | new_nodes |
+                //                                  |<-->|
+                // The node block must be shifted by this gap.
+                std::memmove(mapped_file_.data() + new_nodes_block_offset, mapped_file_.data() + nodes_block_offset,
+                             nodes_block_end_offset - nodes_block_offset);
+            }
+            // Update node addresses and the next node offset for `node_malloc_()`.
+            misaligned_ptr_gt<level_t> levels{mapped_file_.data() + base_offset};
+            std::size_t node_offset = new_nodes_block_offset;
+            for (std::size_t i = 0; i != size(); ++i) {
+                new_nodes[i] = node_t{mapped_file_.data() + node_offset};
+                node_offset += node_bytes_(levels[i]);
+            }
+            mapped_file_next_node_offset_ = node_offset;
+        }
+
         limits_ = limits;
         nodes_capacity_ = limits.members;
         nodes_ = std::move(new_nodes);
         contexts_ = std::move(new_contexts);
         nodes_mutexes_ = std::move(new_mutexes);
+
         return true;
     }
 
@@ -2929,6 +3060,52 @@ class index_gt {
         return result;
     }
 
+    /**
+     * @brief Merges another index into this one.
+     *
+     * @param[in] index The index that has nodes to be merged.
+     * @param[in] get_value Callable object getting a value from `index`.
+     * @param[in] metric Callable object measuring distance between values and present objects.
+     * @param[in] update_config Configuration options for this specific operation.
+     * @param[in] callback On-success callback, executed while the `member_ref_t` is still under lock.
+     */
+    template <                                               //
+        typename metric_at,                                  //
+        typename get_value_at,                               //
+        typename merge_callback_at = dummy_merge_callback_t, //
+        typename prefetch_at = dummy_prefetch_t              //
+        >
+    add_result_t merge(                                                      //
+        index_gt const& index, get_value_at&& get_value, metric_at&& metric, //
+        index_update_config_t update_config = {}, merge_callback_at&& callback = merge_callback_at{},
+        prefetch_at&& prefetch = prefetch_at{}) noexcept {
+        // Reserve space for the combined size.
+        std::size_t new_members = size() + index.size();
+        index_limits_t limits;
+        limits.members = new_members;
+        if (!reserve(limits)) {
+            add_result_t result;
+            return result.failed("Out of memory");
+        }
+
+        // Add all values in `index` to this index.
+        add_result_t merge_result;
+        for (const auto& member : index) {
+            auto& value = get_value(member);
+            auto merge_callback = [&](member_ref_t m) { callback(m, value); };
+            add_result_t result = add(get_key(member), value, metric, update_config, merge_callback, prefetch);
+            if (!result) {
+                return result;
+            }
+        }
+        merge_result.new_size = new_members;
+        // Should we set more members in merge_result? Or should we
+        // define a merge_result_t that only has new_size?
+
+        return merge_result;
+    }
+
     /**
      * @brief Searches for the closest elements to the given ::query. Thread-safe.
      *
@@ -3163,7 +3340,7 @@
      */
     std::size_t memory_usage(std::size_t allocator_entry_bytes = default_allocator_entry_bytes()) const noexcept {
         std::size_t total = 0;
-        if (!viewed_file_) {
+        if (!mapped_file_) {
             stats_t s = stats();
             total += s.allocated_bytes;
             total += s.nodes * allocator_entry_bytes;
@@ -3272,7 +3449,6 @@ class index_gt {
         // We are loading an empty index, no more work to do
         if (!header.size) {
-            reset();
             return result;
         }
 
@@ -3409,6 +3585,8 @@
 
     /**
      * @brief Loads the serialized binary index representation from disk to RAM.
+     *        If the file is writable, the index is loaded @b without copying
+     *        data into RAM, fetching it on-demand like view() does.
      *        Adjusts the configuration properties of the constructed index to
      *        match the settings in the file.
     */
@@ -3416,21 +3594,29 @@
     serialization_result_t load(memory_mapped_file_t file, std::size_t offset = 0,
                                 progress_at&& progress = {}) noexcept {
 
-        serialization_result_t io_result = file.open_if_not();
-        if (!io_result)
-            return io_result;
+        if (file.is_writable()) {
+            serialization_result_t result = view(std::move(file), offset, std::forward<progress_at>(progress));
+            if (!result)
+                return result;
 
-        serialization_result_t stream_result = load_from_stream(
-            [&](void* buffer, std::size_t length) {
-                if (offset + length > file.size())
-                    return false;
-                std::memcpy(buffer, file.data() + offset, length);
-                offset += length;
-                return true;
-            },
-            std::forward<progress_at>(progress));
-
-        return stream_result;
+            is_mutable_ = true;
+            return {};
+        } else {
+            serialization_result_t io_result = file.open_if_not();
+            if (!io_result)
+                return io_result;
+
+            serialization_result_t stream_result = load_from_stream(
+                [&](void* buffer, std::size_t length) {
+                    if (offset + length > file.size())
+                        return false;
+                    std::memcpy(buffer, file.data() + offset, length);
+                    offset += length;
+                    return true;
+                },
+                std::forward<progress_at>(progress));
+            return stream_result;
+        }
     }
 
     /**
@@ -3455,8 +3641,16 @@
             return result.failed("File is corrupted and lacks a header");
 
         std::memcpy(&header, file.data() + offset, sizeof(header));
+        // Keep the offset for re-use later.
+        mapped_file_offset_ = offset;
+        // Viewed index is immutable.
+        is_mutable_ = false;
+
         if (!header.size) {
-            reset();
+            // This is one of the success paths, so keep the associated file.
+            mapped_file_ = std::move(file);
+            // No levels space exists for an empty index.
+            mapped_file_next_node_offset_ = offset + sizeof(header);
             return result;
         }
 
@@ -3504,7 +3698,7 @@
             if (!progress(i + 1, header.size))
                 return result.failed("Terminated by user");
         }
-        viewed_file_ = std::move(file);
+        mapped_file_ = std::move(file);
         return {};
     }
 
@@ -3690,7 +3884,15 @@
     span_bytes_t node_malloc_(level_t level) noexcept {
         std::size_t node_bytes = node_bytes_(level);
-        byte_t* data = (byte_t*)tape_allocator_.allocate(node_bytes);
+        byte_t* data;
+        if (mapped_file_) {
+            // A mutable memory-mapped index allocates node space from
+            // the next free node offset in the mapped file.
+            data = mapped_file_.data() + mapped_file_next_node_offset_;
+            mapped_file_next_node_offset_ += node_bytes;
+        } else {
+            data = (byte_t*)tape_allocator_.allocate(node_bytes);
+        }
         return data ? span_bytes_t{data, node_bytes} : span_bytes_t{};
     }
 
@@ -3715,7 +3917,7 @@
     }
 
     void node_free_(std::size_t idx) noexcept {
-        if (viewed_file_)
+        if (mapped_file_)
             return;
 
         node_t& node = nodes_[idx];
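
Reviewer note: the snippet below condenses the call sequence that test_merge() above exercises for the new writable-mapping load() and merge() APIs. create_index, metric_t, metric1, index1, value_t, member_ref_t/member_cref_t, and the expect helpers are the test's own fixtures from test_merge(), not library API, so treat this as an illustrative sketch rather than a standalone program.

    // Back an index with a writable memory mapping: nodes are then allocated
    // directly inside the file by node_malloc_().
    auto merged_index = create_index();
    expect(merged_index.save("merge.usearch"));
    memory_mapped_file_t file{"merge.usearch", /*is_writable=*/true};
    expect(merged_index.load(std::move(file))); // writable file => the index stays mutable

    // Merge a previously built in-memory index into the mapped one.
    metric_t merged_metric;
    auto merge_on_success = [&](member_ref_t member, value_t const& value) {
        merged_metric.values[member.slot] = value;
    };
    auto get_value1 = [&](member_cref_t member) -> value_t& { return metric1.values[member.slot]; };
    expect(merged_index.merge(index1, get_value1, merged_metric, {}, merge_on_success));

    // reset() flushes the updated header and levels back to the mapped file.
    merged_index.reset();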