From 57760ef3956c75ce6a80e1766e7b58f2f5f2d5a0 Mon Sep 17 00:00:00 2001
From: alighazi288 <51366992+alighazi288@users.noreply.github.com>
Date: Sat, 11 Jan 2025 23:53:12 -0500
Subject: [PATCH 1/2] Implement chunk comparison and selective extraction

- Add compare_and_extract_chunks functionality
- Add comprehensive test coverage
- Fix file state tracking with st parameter
---
 src/borg/archive.py                |  64 ++++++++++-
 src/borg/testsuite/archive_test.py | 165 ++++++++++++++++++++++++++++-
 2 files changed, 227 insertions(+), 2 deletions(-)

diff --git a/src/borg/archive.py b/src/borg/archive.py
index 01d1617d17..7461e8ada1 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -719,6 +719,63 @@ def extract_helper(self, item, path, hlm, *, dry_run=False):
             # In this case, we *want* to extract twice, because there is no other way.
             pass
 
+    def compare_and_extract_chunks(self, item, fs_path, *, st, pi=None):
+        """Compare file chunks and patch if needed. Returns True if patching succeeded."""
+        if st is None:
+            return False
+        try:
+            # First pass: Build fs chunks list
+            fs_chunks = []
+            with backup_io("open"):
+                fs_file = open(fs_path, "rb")
+            with fs_file:
+                for chunk in item.chunks:
+                    with backup_io("read"):
+                        data = fs_file.read(chunk.size)
+
+                    fs_chunks.append(ChunkListEntry(id=self.key.id_hash(data), size=len(data)))
+
+            # Compare chunks and collect needed chunk IDs
+            needed_chunks = []
+            for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
+                if fs_chunk != item_chunk:
+                    needed_chunks.append(item_chunk)
+
+            # Fetch all needed chunks and iterate through ALL of them
+            chunk_data_iter = self.pipeline.fetch_many([chunk.id for chunk in needed_chunks], ro_type=ROBJ_FILE_STREAM)
+
+            # Second pass: Update file
+            with backup_io("open"):
+                fs_file = open(fs_path, "rb+")
+            with fs_file:
+                for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
+                    if fs_chunk == item_chunk:
+                        with backup_io("seek"):
+                            fs_file.seek(item_chunk.size, 1)
+                    else:
+                        chunk_data = next(chunk_data_iter)
+
+                        with backup_io("write"):
+                            fs_file.write(chunk_data)
+                        if pi:
+                            pi.show(increase=len(chunk_data), info=[remove_surrogates(item.path)])
+
+                final_size = fs_file.tell()
+                with backup_io("truncate_and_attrs"):
+                    fs_file.truncate(item.size)
+                    fs_file.flush()
+                    self.restore_attrs(fs_path, item, fd=fs_file.fileno())
+
+            if "size" in item and item.size != final_size:
+                raise BackupError(f"Size inconsistency detected: size {item.size}, chunks size {final_size}")
+
+            if "chunks_healthy" in item and not item.chunks_healthy:
+                raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")
+
+            return True
+        except OSError:
+            return False
+
     def extract_item(
         self,
         item,
@@ -802,12 +859,14 @@ def same_item(item, st):
                     return  # done! we already have fully extracted this file in a previous run.
             elif stat.S_ISDIR(st.st_mode):
                 os.rmdir(path)
+                st = None
             else:
                 os.unlink(path)
+                st = None
         except UnicodeEncodeError:
             raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
         except OSError:
-            pass
+            st = None
 
         def make_parent(path):
             parent_dir = os.path.dirname(path)
@@ -821,6 +880,9 @@ def make_parent(path):
             with self.extract_helper(item, path, hlm) as hardlink_set:
                 if hardlink_set:
                     return
+                if self.compare_and_extract_chunks(item, path, st=st, pi=pi):
+                    return
+
                 with backup_io("open"):
                     fd = open(path, "wb")
                 with fd:
diff --git a/src/borg/testsuite/archive_test.py b/src/borg/testsuite/archive_test.py
index 1157994d7d..3aee648ab4 100644
--- a/src/borg/testsuite/archive_test.py
+++ b/src/borg/testsuite/archive_test.py
@@ -12,7 +12,7 @@
 from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
 from ..archive import BackupOSError, backup_io, backup_io_iter, get_item_uid_gid
 from ..helpers import msgpack
-from ..item import Item, ArchiveItem
+from ..item import Item, ArchiveItem, ChunkListEntry
 from ..manifest import Manifest
 from ..platform import uid2user, gid2group, is_win32
 
@@ -132,6 +132,11 @@ def add_chunk(self, id, meta, data, stats=None, wait=True, ro_type=None):
         self.objects[id] = data
         return id, len(data)
 
+    def fetch_many(self, ids, ro_type=None):
+        """Mock implementation of fetch_many"""
+        for id in ids:
+            yield self.objects[id]
+
 
 def test_cache_chunk_buffer():
     data = [Item(path="p1"), Item(path="p2")]
@@ -402,3 +407,161 @@ def test_reject_non_sanitized_item():
     for path in rejected_dotdot_paths:
         with pytest.raises(ValueError, match="unexpected '..' element in path"):
             Item(path=path, user="root", group="root")
+
+
+@pytest.fixture
+def setup_extractor(tmpdir):
+    """Set up common test infrastructure"""
+
+    class MockCache:
+        def __init__(self):
+            self.objects = {}
+
+    repository = Mock()
+    key = PlaintextKey(repository)
+    manifest = Manifest(key, repository)
+    cache = MockCache()
+
+    extractor = Archive(manifest=manifest, name="test", create=True)
+    extractor.pipeline = cache
+    extractor.key = key
+    extractor.cwd = str(tmpdir)
+    extractor.restore_attrs = Mock()
+
+    # Track fetched chunks across tests
+    fetched_chunks = []
+
+    def create_mock_chunks(item_data, chunk_size=4):
+        chunks = []
+        for i in range(0, len(item_data), chunk_size):
+            chunk_data = item_data[i : i + chunk_size]
+            chunk_id = key.id_hash(chunk_data)
+            chunks.append(ChunkListEntry(id=chunk_id, size=len(chunk_data)))
+            cache.objects[chunk_id] = chunk_data
+
+        item = Mock(spec=["chunks", "size", "__contains__", "get"])
+        item.chunks = chunks
+        item.size = len(item_data)
+        item.__contains__ = lambda self, item: item == "size"
+
+        return item, str(tmpdir.join("test.txt"))
+
+    def mock_fetch_many(chunk_ids, ro_type=None):
+        fetched_chunks.extend(chunk_ids)
+        return iter([cache.objects[chunk_id] for chunk_id in chunk_ids])
+
+    def clear_fetched_chunks():
+        fetched_chunks.clear()
+
+    def get_fetched_chunks():
+        return fetched_chunks
+
+    cache.fetch_many = mock_fetch_many
+
+    return extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks
+
+
+@pytest.mark.parametrize(
+    "name, item_data, fs_data, expected_fetched_chunks",
+    [
+        (
+            "no_changes",
+            b"1111",  # One complete chunk, no changes needed
+            b"1111",  # Identical content
+            0,  # No chunks should be fetched
+        ),
+        (
+            "single_chunk_change",
+            b"11112222",  # Two chunks
+            b"1111XXXX",  # Second chunk different
+            1,  # Only second chunk should be fetched
+        ),
+        (
+            "cross_boundary_change",
+            b"11112222",  # Two chunks
+            b"111XX22",  # Change crosses chunk boundary
+            2,  # Both chunks need update
+        ),
+        (
+            "exact_multiple_chunks",
+            b"11112222333",  # Three chunks (last one partial)
+            b"1111XXXX333",  # Middle chunk different
+            1,  # Only middle chunk fetched
+        ),
+        (
+            "first_chunk_change",
+            b"11112222",  # Two chunks
+            b"XXXX2222",  # First chunk different
+            1,  # Only first chunk should be fetched
+        ),
+        (
+            "all_chunks_different",
+            b"11112222",  # Two chunks
+            b"XXXXYYYY",  # Both chunks different
+            2,  # Both chunks should be fetched
+        ),
+        (
+            "partial_last_chunk",
+            b"111122",  # One full chunk + partial
+            b"1111XX",  # Partial chunk different
+            1,  # Only second chunk should be fetched
+        ),
+        (
+            "fs_file_shorter",
+            b"11112222",  # Two chunks in archive
+            b"111122",  # Shorter on disk - missing part of second chunk
+            1,  # Should fetch second chunk
+        ),
+        (
+            "fs_file_longer",
+            b"11112222",  # Two chunks in archive
+            b"1111222233",  # Longer on disk
+            0,  # Should fetch no chunks since content matches up to archive length
+        ),
+        (
+            "empty_archive_file",
+            b"",  # Empty in archive
+            b"11112222",  # Content on disk
+            0,  # No chunks to compare = no chunks to fetch
+        ),
+        (
+            "empty_fs_file",
+            b"11112222",  # Two chunks in archive
+            b"",  # Empty on disk
+            2,  # Should fetch all chunks since file is empty
+        ),
+    ],
+)
+def test_compare_and_extract_chunks(setup_extractor, name, item_data, fs_data, expected_fetched_chunks):
+    """Test chunk comparison and extraction"""
+    extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks = setup_extractor
+    clear_fetched_chunks()
+
+    chunk_size = 4
+    item, target_path = create_mock_chunks(item_data, chunk_size=chunk_size)
+
+    original_chunk_ids = [chunk.id for chunk in item.chunks]
+
+    with open(target_path, "wb") as f:
+        f.write(fs_data)
+
+    st = os.stat(target_path)
+    result = extractor.compare_and_extract_chunks(item, target_path, st=st)
+    assert result
+
+    fetched_chunks = get_fetched_chunks()
+    assert len(fetched_chunks) == expected_fetched_chunks
+
+    # For single chunk changes, verify it's the correct chunk
+    if expected_fetched_chunks == 1:
+        item_chunks = [item_data[i : i + chunk_size] for i in range(0, len(item_data), chunk_size)]
+        fs_chunks = [fs_data[i : i + chunk_size] for i in range(0, len(fs_data), chunk_size)]
+
+        # Find which chunk should have changed by comparing item_data with fs_data
+        for i, (item_chunk, fs_chunk) in enumerate(zip(item_chunks, fs_chunks)):
+            if item_chunk != fs_chunk:
+                assert fetched_chunks[0] == original_chunk_ids[i]
+                break
+
+    with open(target_path, "rb") as f:
+        assert f.read() == item_data

From 80862e58ff1175aed4bbf2c1c935d7ff62add512 Mon Sep 17 00:00:00 2001
From: alighazi288 <51366992+alighazi288@users.noreply.github.com>
Date: Sun, 26 Jan 2025 18:57:46 -0500
Subject: [PATCH 2/2] Refactor `compare_and_extract_chunks`

---
 src/borg/archive.py | 56 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 43 insertions(+), 13 deletions(-)

diff --git a/src/borg/archive.py b/src/borg/archive.py
index 7461e8ada1..e590263c9b 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -747,29 +747,60 @@ def compare_and_extract_chunks(self, item, fs_path, *, st, pi=None):
             # Second pass: Update file
             with backup_io("open"):
                 fs_file = open(fs_path, "rb+")
-            with fs_file:
                 for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
                     if fs_chunk == item_chunk:
                         with backup_io("seek"):
-                            fs_file.seek(item_chunk.size, 1)
+                            fs_file.seek(item_chunk.size, os.SEEK_CUR)
+                            size = item_chunk.size
                     else:
                         chunk_data = next(chunk_data_iter)
 
                         with backup_io("write"):
                             fs_file.write(chunk_data)
+                            size = len(chunk_data)
                         if pi:
-                            pi.show(increase=len(chunk_data), info=[remove_surrogates(item.path)])
+                            pi.show(increase=size, info=[remove_surrogates(item.path)])
 
-                final_size = fs_file.tell()
                 with backup_io("truncate_and_attrs"):
-                    fs_file.truncate(item.size)
+                    pos = item_chunks_size = fs_file.tell()
+                    fs_file.truncate(pos)
                     fs_file.flush()
+
+                    if not self.noacls:
+                        try:
+                            # Clear ACLs by setting empty ACL entries
+                            acl_set(fs_path, {"acl_entries": []}, self.numeric_ids, fd=fs_file.fileno())
+                        except OSError as e:
+                            if e.errno != errno.ENOTSUP:  # Ignore if ACLs not supported
+                                raise
+                    if not self.noflags:
+                        try:
+                            # Clear all BSD flags
+                            set_flags(fs_path, 0, fd=fs_file.fileno())
+                        except OSError:
+                            pass
+                    if not self.noxattrs:
+                        try:
+                            # Clear ALL xattrs
+                            attrs = xattr.listxattr(fs_file.fileno(), follow_symlinks=False)
+                            for attr in attrs:
+                                try:
+                                    xattr.setxattr(fs_file.fileno(), attr, b"", follow_symlinks=False)
+                                except OSError as e:
+                                    if e.errno != xattr.ENOATTR:  # Ignore if already removed
+                                        raise
+                        except OSError as e:
+                            if e.errno != errno.ENOTSUP:  # Ignore if xattrs not supported
+                                raise
                     self.restore_attrs(fs_path, item, fd=fs_file.fileno())
 
-            if "size" in item and item.size != final_size:
-                raise BackupError(f"Size inconsistency detected: size {item.size}, chunks size {final_size}")
+            if "size" in item:
+                item_size = item.size
+                if item_size != item_chunks_size:
+                    raise BackupError(f"Size inconsistency detected: size {item.size}, chunks size {item_chunks_size}")
 
-            if "chunks_healthy" in item and not item.chunks_healthy:
+            has_damaged_chunks = "chunks_healthy" in item
+            if has_damaged_chunks:
                 raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")
 
             return True
@@ -857,16 +888,17 @@ def same_item(item, st):
             st = os.stat(path, follow_symlinks=False)
             if continue_extraction and same_item(item, st):
                 return  # done! we already have fully extracted this file in a previous run.
+            if stat.S_ISREG(st.st_mode) and not continue_extraction:
+                if self.compare_and_extract_chunks(item, path, st=st, pi=pi):
+                    return
             elif stat.S_ISDIR(st.st_mode):
                 os.rmdir(path)
-                st = None
             else:
                 os.unlink(path)
-                st = None
         except UnicodeEncodeError:
             raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
         except OSError:
-            st = None
+            pass
 
         def make_parent(path):
             parent_dir = os.path.dirname(path)
@@ -880,8 +912,6 @@ def make_parent(path):
             with self.extract_helper(item, path, hlm) as hardlink_set:
                 if hardlink_set:
                     return
-                if self.compare_and_extract_chunks(item, path, st=st, pi=pi):
-                    return
 
                 with backup_io("open"):
                     fd = open(path, "wb")
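
Note (illustration only, not part of the patches above): the core idea of compare_and_extract_chunks is to hash the existing file in the archived chunk sizes and refetch only the chunks whose hash or length differs. The standalone sketch below restates that first pass; hashlib.sha256, ChunkEntry and find_mismatched_chunks are stand-ins chosen here for illustration, not borg's keyed id_hash, ChunkListEntry or any API the patches add.

    import hashlib
    from typing import NamedTuple


    class ChunkEntry(NamedTuple):
        """Stand-in for borg's ChunkListEntry: (chunk id, chunk size)."""
        id: bytes
        size: int


    def find_mismatched_chunks(fs_path, archived_chunks):
        """Return indices of archived chunks whose on-disk bytes differ.

        Mirrors the 'first pass' above: the file is read in the archived
        chunk sizes, and a chunk matches only if both hash and length are
        identical.
        """
        mismatched = []
        with open(fs_path, "rb") as f:
            for i, chunk in enumerate(archived_chunks):
                data = f.read(chunk.size)
                on_disk = ChunkEntry(id=hashlib.sha256(data).digest(), size=len(data))
                if on_disk != chunk:
                    mismatched.append(i)
        return mismatched

For example, with archived_chunks built from b"1111" and b"2222" via the same sha256 hashing and an on-disk file containing b"1111XXXX", only index 1 is returned, matching the single_chunk_change test case; a shorter on-disk file shows up as tail mismatches, while extra trailing bytes are not refetched and are left to truncation, matching fs_file_shorter and fs_file_longer.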