Commit 564b040
Implement chunk comparison and selective extraction
Add tests. Change logic for chunk comparison and extraction.
Refactor `compare_and_extract_chunks` and improve test coverage.

To do:
- Remove additional comments after approval.

Thank you for helping me with my first file system operations contribution!

Refactor `compare_and_extract_chunks`: remove unnecessary `st` parameter.
Fix file state tracking with `st = None` after unlink/rmdir.
1 parent b9498ca commit 564b040

2 files changed: +227, -2 lines
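
The core idea of the change, before the diffs: hash the file on disk chunk-by-chunk at the archived chunk boundaries, and fetch only the chunks whose IDs differ. A minimal standalone sketch of that comparison (SHA-256 stands in for borg's `key.id_hash`; `chunks_to_fetch` and its arguments are illustrative, not borg API):

import hashlib

def chunks_to_fetch(fs_bytes, archived_chunks):
    """archived_chunks: list of (chunk_id, size) in file order.
    Return the indices of archived chunks whose on-disk bytes differ."""
    needed, offset = [], 0
    for i, (chunk_id, size) in enumerate(archived_chunks):
        data = fs_bytes[offset : offset + size]  # read at archived boundaries
        offset += size
        if hashlib.sha256(data).digest() != chunk_id:  # stand-in for key.id_hash
            needed.append(i)
    return needed

# Example: the second 4-byte chunk differs, so only index 1 needs fetching.
archived = [(hashlib.sha256(b"1111").digest(), 4), (hashlib.sha256(b"2222").digest(), 4)]
assert chunks_to_fetch(b"1111XXXX", archived) == [1]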

src/borg/archive.py

Lines changed: 63 additions & 1 deletion
@@ -719,6 +719,63 @@ def extract_helper(self, item, path, hlm, *, dry_run=False):
             # In this case, we *want* to extract twice, because there is no other way.
             pass
 
+    def compare_and_extract_chunks(self, item, fs_path, *, st, pi=None):
+        """Compare file chunks and patch if needed. Returns True if patching succeeded."""
+        if st is None:
+            return False
+        try:
+            # First pass: build the list of chunks as they currently exist on disk,
+            # hashing the on-disk data at the archived chunk boundaries.
+            fs_chunks = []
+            with backup_io("open"):
+                fs_file = open(fs_path, "rb")
+            with fs_file:
+                for chunk in item.chunks:
+                    with backup_io("read"):
+                        data = fs_file.read(chunk.size)
+                    fs_chunks.append(ChunkListEntry(id=self.key.id_hash(data), size=len(data)))
+
+            # Compare the chunk lists and collect the archived chunks that differ.
+            needed_chunks = []
+            for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
+                if fs_chunk != item_chunk:
+                    needed_chunks.append(item_chunk)
+
+            # Fetch all needed chunks; they are yielded in the order they were requested.
+            chunk_data_iter = self.pipeline.fetch_many([chunk.id for chunk in needed_chunks], ro_type=ROBJ_FILE_STREAM)
+
+            # Second pass: update the file in place, writing only the differing chunks.
+            with backup_io("open"):
+                fs_file = open(fs_path, "rb+")
+            with fs_file:
+                for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
+                    if fs_chunk == item_chunk:
+                        with backup_io("seek"):
+                            fs_file.seek(item_chunk.size, 1)
+                    else:
+                        chunk_data = next(chunk_data_iter)
+                        with backup_io("write"):
+                            fs_file.write(chunk_data)
+                        if pi:
+                            pi.show(increase=len(chunk_data), info=[remove_surrogates(item.path)])
+
+                final_size = fs_file.tell()
+                with backup_io("truncate_and_attrs"):
+                    fs_file.truncate(item.size)
+                    fs_file.flush()
+                    self.restore_attrs(fs_path, item, fd=fs_file.fileno())
+
+            if "size" in item and item.size != final_size:
+                raise BackupError(f"Size inconsistency detected: size {item.size}, chunks size {final_size}")
+
+            if "chunks_healthy" in item and not item.chunks_healthy:
+                raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")
+
+            return True
+        except OSError:
+            return False
+
     def extract_item(
         self,
         item,
@@ -802,12 +859,14 @@ def same_item(item, st):
                 return  # done! we already have fully extracted this file in a previous run.
             elif stat.S_ISDIR(st.st_mode):
                 os.rmdir(path)
+                st = None
             else:
                 os.unlink(path)
+                st = None
         except UnicodeEncodeError:
             raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
         except OSError:
-            pass
+            st = None
 
     def make_parent(path):
         parent_dir = os.path.dirname(path)
@@ -821,6 +880,9 @@ def make_parent(path):
             with self.extract_helper(item, path, hlm) as hardlink_set:
                 if hardlink_set:
                     return
+                if self.compare_and_extract_chunks(item, path, st=st, pi=pi):
+                    return
+
                 with backup_io("open"):
                     fd = open(path, "wb")
                 with fd:
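
The second pass above uses a standard in-place patching pattern: open the existing file with mode "rb+", skip regions that already match via a relative seek, overwrite only the stale regions, then truncate to the archived size. A minimal sketch of that pattern under simplified assumptions (no `backup_io` error mapping; the `patch_file` helper and its arguments are hypothetical, for illustration only):

import os

def patch_file(path, regions, final_size):
    """Overwrite only the differing regions of an existing file.

    regions: list of (matches, size, data) tuples in file order;
    data is only consulted when matches is False.
    """
    with open(path, "rb+") as f:
        for matches, size, data in regions:
            if matches:
                f.seek(size, os.SEEK_CUR)  # skip a region that is already correct
            else:
                f.write(data)  # overwrite a stale region
        f.truncate(final_size)  # drop any trailing bytes

# Example: keep the first 4 bytes, replace the next 4, final size 8.
with open("demo.bin", "wb") as f:
    f.write(b"1111XXXX??")
patch_file("demo.bin", [(True, 4, None), (False, 4, b"2222")], 8)
with open("demo.bin", "rb") as f:
    assert f.read() == b"11112222"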

src/borg/testsuite/archive_test.py

Lines changed: 164 additions & 1 deletion
@@ -12,7 +12,7 @@
 from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
 from ..archive import BackupOSError, backup_io, backup_io_iter, get_item_uid_gid
 from ..helpers import msgpack
-from ..item import Item, ArchiveItem
+from ..item import Item, ArchiveItem, ChunkListEntry
 from ..manifest import Manifest
 from ..platform import uid2user, gid2group, is_win32

@@ -132,6 +132,11 @@ def add_chunk(self, id, meta, data, stats=None, wait=True, ro_type=None):
         self.objects[id] = data
         return id, len(data)
 
+    def fetch_many(self, ids, ro_type=None):
+        """Mock implementation of fetch_many"""
+        for id in ids:
+            yield self.objects[id]
+
 
 def test_cache_chunk_buffer():
     data = [Item(path="p1"), Item(path="p2")]
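
The mock preserves the one property `compare_and_extract_chunks` depends on: `fetch_many` yields chunk data in exactly the order the IDs were requested, so one `next()` per mismatching chunk stays aligned with the file position. A toy illustration of that contract (names are illustrative):

store = {b"a": b"AAAA", b"b": b"BBBB"}

def fetch_many(ids):
    # Yield payloads in request order, regardless of storage order.
    for id in ids:
        yield store[id]

it = fetch_many([b"b", b"a"])
assert next(it) == b"BBBB"
assert next(it) == b"AAAA"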
@@ -402,3 +407,161 @@ def test_reject_non_sanitized_item():
     for path in rejected_dotdot_paths:
         with pytest.raises(ValueError, match="unexpected '..' element in path"):
             Item(path=path, user="root", group="root")
+
+
+@pytest.fixture
+def setup_extractor(tmpdir):
+    """Set up common test infrastructure."""
+
+    class MockCache:
+        def __init__(self):
+            self.objects = {}
+
+    repository = Mock()
+    key = PlaintextKey(repository)
+    manifest = Manifest(key, repository)
+    cache = MockCache()
+
+    extractor = Archive(manifest=manifest, name="test", create=True)
+    extractor.pipeline = cache
+    extractor.key = key
+    extractor.cwd = str(tmpdir)
+    extractor.restore_attrs = Mock()
+
+    # Track fetched chunks across tests
+    fetched_chunks = []
+
+    def create_mock_chunks(item_data, chunk_size=4):
+        chunks = []
+        for i in range(0, len(item_data), chunk_size):
+            chunk_data = item_data[i : i + chunk_size]
+            chunk_id = key.id_hash(chunk_data)
+            chunks.append(ChunkListEntry(id=chunk_id, size=len(chunk_data)))
+            cache.objects[chunk_id] = chunk_data
+
+        item = Mock(spec=["chunks", "size", "__contains__", "get"])
+        item.chunks = chunks
+        item.size = len(item_data)
+        item.__contains__ = lambda self, item: item == "size"
+
+        return item, str(tmpdir.join("test.txt"))
+
+    def mock_fetch_many(chunk_ids, ro_type=None):
+        fetched_chunks.extend(chunk_ids)
+        return iter([cache.objects[chunk_id] for chunk_id in chunk_ids])
+
+    def clear_fetched_chunks():
+        fetched_chunks.clear()
+
+    def get_fetched_chunks():
+        return fetched_chunks
+
+    cache.fetch_many = mock_fetch_many
+
+    return extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks
+
+
+@pytest.mark.parametrize(
+    "name, item_data, fs_data, expected_fetched_chunks",
+    [
+        (
+            "no_changes",
+            b"1111",  # one complete chunk, no changes needed
+            b"1111",  # identical content
+            0,  # no chunks should be fetched
+        ),
+        (
+            "single_chunk_change",
+            b"11112222",  # two chunks
+            b"1111XXXX",  # second chunk differs
+            1,  # only the second chunk should be fetched
+        ),
+        (
+            "cross_boundary_change",
+            b"11112222",  # two chunks
+            b"111XX22",  # change crosses the chunk boundary
+            2,  # both chunks need updating
+        ),
+        (
+            "exact_multiple_chunks",
+            b"11112222333",  # three chunks (the last one partial)
+            b"1111XXXX333",  # middle chunk differs
+            1,  # only the middle chunk should be fetched
+        ),
+        (
+            "first_chunk_change",
+            b"11112222",  # two chunks
+            b"XXXX2222",  # first chunk differs
+            1,  # only the first chunk should be fetched
+        ),
+        (
+            "all_chunks_different",
+            b"11112222",  # two chunks
+            b"XXXXYYYY",  # both chunks differ
+            2,  # both chunks should be fetched
+        ),
+        (
+            "partial_last_chunk",
+            b"111122",  # one full chunk plus a partial one
+            b"1111XX",  # the partial chunk differs
+            1,  # only the second chunk should be fetched
+        ),
+        (
+            "fs_file_shorter",
+            b"11112222",  # two chunks in the archive
+            b"111122",  # shorter on disk: part of the second chunk is missing
+            1,  # the second chunk should be fetched
+        ),
+        (
+            "fs_file_longer",
+            b"11112222",  # two chunks in the archive
+            b"1111222233",  # longer on disk
+            0,  # nothing fetched: content matches up to the archived length
+        ),
+        (
+            "empty_archive_file",
+            b"",  # empty in the archive
+            b"11112222",  # content on disk
+            0,  # no chunks to compare means no chunks to fetch
+        ),
+        (
+            "empty_fs_file",
+            b"11112222",  # two chunks in the archive
+            b"",  # empty on disk
+            2,  # all chunks should be fetched since the file is empty
+        ),
+    ],
+)
+def test_compare_and_extract_chunks(setup_extractor, name, item_data, fs_data, expected_fetched_chunks):
+    """Test chunk comparison and selective extraction."""
+    extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks = setup_extractor
+    clear_fetched_chunks()
+
+    chunk_size = 4
+    item, target_path = create_mock_chunks(item_data, chunk_size=chunk_size)
+
+    original_chunk_ids = [chunk.id for chunk in item.chunks]
+
+    with open(target_path, "wb") as f:
+        f.write(fs_data)
+
+    st = os.stat(target_path)
+    result = extractor.compare_and_extract_chunks(item, target_path, st=st)
+    assert result
+
+    fetched_chunks = get_fetched_chunks()
+    assert len(fetched_chunks) == expected_fetched_chunks
+
+    # For single-chunk changes, verify that the correct chunk was fetched.
+    if expected_fetched_chunks == 1:
+        item_chunks = [item_data[i : i + chunk_size] for i in range(0, len(item_data), chunk_size)]
+        fs_chunks = [fs_data[i : i + chunk_size] for i in range(0, len(fs_data), chunk_size)]
+
+        # Find which chunk changed by comparing item_data with fs_data.
+        for i, (item_chunk, fs_chunk) in enumerate(zip(item_chunks, fs_chunks)):
+            if item_chunk != fs_chunk:
+                assert fetched_chunks[0] == original_chunk_ids[i]
+                break
+
+    with open(target_path, "rb") as f:
+        assert f.read() == item_data