92 changes: 92 additions & 0 deletions src/borg/archive.py
@@ -719,6 +719,94 @@ def extract_helper(self, item, path, hlm, *, dry_run=False):
# In this case, we *want* to extract twice, because there is no other way.
pass

def compare_and_extract_chunks(self, item, fs_path, *, st, pi=None):
"""Compare file chunks and patch if needed. Returns True if patching succeeded."""
if st is None:
return False
try:
# First pass: Build fs chunks list
fs_chunks = []
with backup_io("open"):
fs_file = open(fs_path, "rb")
with fs_file:
for chunk in item.chunks:
with backup_io("read"):
data = fs_file.read(chunk.size)

fs_chunks.append(ChunkListEntry(id=self.key.id_hash(data), size=len(data)))

# Compare chunks and collect needed chunk IDs
needed_chunks = []
for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
if fs_chunk != item_chunk:
needed_chunks.append(item_chunk)

# Fetch all needed chunks and iterate through ALL of them
chunk_data_iter = self.pipeline.fetch_many([chunk.id for chunk in needed_chunks], ro_type=ROBJ_FILE_STREAM)

# Second pass: Update file
with backup_io("open"):
fs_file = open(fs_path, "rb+")
for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
if fs_chunk == item_chunk:
with backup_io("seek"):
fs_file.seek(item_chunk.size, os.SEEK_CUR)
size = item_chunk.size
else:
chunk_data = next(chunk_data_iter)

with backup_io("write"):
fs_file.write(chunk_data)
size = len(chunk_data)
if pi:
pi.show(increase=size, info=[remove_surrogates(item.path)])

with backup_io("truncate_and_attrs"):
pos = item_chunks_size = fs_file.tell()
fs_file.truncate(pos)
fs_file.flush()

if not self.noacls:
try:
# Clear ACLs by setting empty ACL entries
acl_set(fs_path, {"acl_entries": []}, self.numeric_ids, fd=fs_file.fileno())
Member: you didn't read the acl_set code, did you?

except OSError as e:
if e.errno != errno.ENOTSUP: # Ignore if ACLs not supported
raise
if not self.noflags:
try:
# Clear all BSD flags
set_flags(fs_path, 0, fd=fs_file.fileno())
except OSError:
pass
if not self.noxattrs:
try:
# Clear ALL xattrs
attrs = xattr.listxattr(fs_file.fileno(), follow_symlinks=False)
for attr in attrs:
try:
xattr.setxattr(fs_file.fileno(), attr, b"", follow_symlinks=False)
Member: that does not do what you intended. setting an xattr to an empty bytestring is not the same as removing an xattr.

Member: you are calling fs_file.fileno() way too often.

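To illustrate the difference, a minimal sketch (not part of this PR's diff): it uses plain os.listxattr / os.removexattr, which accept a file descriptor on Linux, instead of borg's xattr wrapper, and it looks the fd up only once.

    import errno
    import os

    def remove_all_xattrs(fd):
        """Remove every extended attribute from the file referred to by fd (Linux)."""
        try:
            for name in os.listxattr(fd):
                try:
                    os.removexattr(fd, name)  # actually removes the attr, unlike setting it to b""
                except OSError as e:
                    if e.errno != errno.ENODATA:  # already gone -> ignore
                        raise
        except OSError as e:
            if e.errno != errno.ENOTSUP:  # filesystem without xattr support -> ignore
                raise

In compare_and_extract_chunks the fd would be obtained once (fd = fs_file.fileno()) and then reused for this helper as well as for acl_set, set_flags and restore_attrs.
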
except OSError as e:
if e.errno != xattr.ENOATTR: # Ignore if already removed
raise
except OSError as e:
if e.errno != errno.ENOTSUP: # Ignore if xattrs not supported
raise
self.restore_attrs(fs_path, item, fd=fs_file.fileno())
Member: you could create a method clear_attrs close to the definition of restore_attrs and move that code you added above to there.

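A hypothetical shape for such a method (illustrative only, not part of the diff; it mirrors the restore_attrs signature, assumes the module-level imports already present in archive.py (os, errno, xattr, set_flags), reuses the flag/xattr handling from above, and leaves ACL clearing to whatever acl_set actually expects):

    def clear_attrs(self, path, fd=None):
        """Best-effort removal of BSD flags and xattrs before restore_attrs re-applies them."""
        if not self.noflags:
            try:
                set_flags(path, 0, fd=fd)  # clear all BSD flags
            except OSError:
                pass
        if not self.noxattrs:
            try:
                for name in xattr.listxattr(fd, follow_symlinks=False):
                    os.removexattr(fd, name)  # assumes Linux; see the xattr sketch above
            except OSError as e:
                if e.errno != errno.ENOTSUP:
                    raise
        # ACL clearing would go here as well, once the correct acl_set input is used.

compare_and_extract_chunks would then shrink to self.clear_attrs(fs_path, fd=fd) followed by self.restore_attrs(fs_path, item, fd=fd).
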

if "size" in item:
item_size = item.size
if item_size != item_chunks_size:
raise BackupError(f"Size inconsistency detected: size {item.size}, chunks size {item_chunks_size}")

has_damaged_chunks = "chunks_healthy" in item
if has_damaged_chunks:
raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")

return True
except OSError:
return False

def extract_item(
self,
item,
@@ -800,6 +888,9 @@ def same_item(item, st):
st = os.stat(path, follow_symlinks=False)
if continue_extraction and same_item(item, st):
return # done! we already have fully extracted this file in a previous run.
if stat.S_ISREG(st.st_mode) and not continue_extraction:
if self.compare_and_extract_chunks(item, path, st=st, pi=pi):
return
elif stat.S_ISDIR(st.st_mode):
os.rmdir(path)
else:
@@ -821,6 +912,7 @@ def make_parent(path):
with self.extract_helper(item, path, hlm) as hardlink_set:
if hardlink_set:
return

with backup_io("open"):
fd = open(path, "wb")
with fd:
165 changes: 164 additions & 1 deletion src/borg/testsuite/archive_test.py
@@ -12,7 +12,7 @@
from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
from ..archive import BackupOSError, backup_io, backup_io_iter, get_item_uid_gid
from ..helpers import msgpack
from ..item import Item, ArchiveItem
from ..item import Item, ArchiveItem, ChunkListEntry
from ..manifest import Manifest
from ..platform import uid2user, gid2group, is_win32

@@ -132,6 +132,11 @@ def add_chunk(self, id, meta, data, stats=None, wait=True, ro_type=None):
self.objects[id] = data
return id, len(data)

def fetch_many(self, ids, ro_type=None):
"""Mock implementation of fetch_many"""
for id in ids:
yield self.objects[id]


def test_cache_chunk_buffer():
data = [Item(path="p1"), Item(path="p2")]
@@ -402,3 +407,161 @@ def test_reject_non_sanitized_item():
for path in rejected_dotdot_paths:
with pytest.raises(ValueError, match="unexpected '..' element in path"):
Item(path=path, user="root", group="root")


@pytest.fixture
def setup_extractor(tmpdir):
"""Setup common test infrastructure"""

class MockCache:
def __init__(self):
self.objects = {}

repository = Mock()
key = PlaintextKey(repository)
manifest = Manifest(key, repository)
cache = MockCache()

extractor = Archive(manifest=manifest, name="test", create=True)
extractor.pipeline = cache
extractor.key = key
extractor.cwd = str(tmpdir)
extractor.restore_attrs = Mock()

# Track fetched chunks across tests
fetched_chunks = []

def create_mock_chunks(item_data, chunk_size=4):
chunks = []
for i in range(0, len(item_data), chunk_size):
chunk_data = item_data[i : i + chunk_size]
chunk_id = key.id_hash(chunk_data)
chunks.append(ChunkListEntry(id=chunk_id, size=len(chunk_data)))
cache.objects[chunk_id] = chunk_data

item = Mock(spec=["chunks", "size", "__contains__", "get"])
item.chunks = chunks
item.size = len(item_data)
item.__contains__ = lambda self, item: item == "size"

return item, str(tmpdir.join("test.txt"))

def mock_fetch_many(chunk_ids, ro_type=None):
fetched_chunks.extend(chunk_ids)
return iter([cache.objects[chunk_id] for chunk_id in chunk_ids])

def clear_fetched_chunks():
fetched_chunks.clear()

def get_fetched_chunks():
return fetched_chunks

cache.fetch_many = mock_fetch_many

return extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks


@pytest.mark.parametrize(
"name, item_data, fs_data, expected_fetched_chunks",
[
(
"no_changes",
b"1111", # One complete chunk, no changes needed
b"1111", # Identical content
0, # No chunks should be fetched
),
(
"single_chunk_change",
b"11112222", # Two chunks
b"1111XXXX", # Second chunk different
1, # Only second chunk should be fetched
),
(
"cross_boundary_change",
b"11112222", # Two chunks
b"111XX22", # Change crosses chunk boundary
2, # Both chunks need update
),
(
"exact_multiple_chunks",
b"11112222333", # Three chunks (last one partial)
b"1111XXXX333", # Middle chunk different
1, # Only middle chunk fetched
),
(
"first_chunk_change",
b"11112222", # Two chunks
b"XXXX2222", # First chunk different
1, # Only first chunk should be fetched
),
(
"all_chunks_different",
b"11112222", # Two chunks
b"XXXXYYYY", # Both chunks different
2, # Both chunks should be fetched
),
(
"partial_last_chunk",
b"111122", # One full chunk + partial
b"1111XX", # Partial chunk different
1, # Only second chunk should be fetched
),
(
"fs_file_shorter",
b"11112222", # Two chunks in archive
b"111122", # Shorter on disk - missing part of second chunk
1, # Should fetch second chunk
),
(
"fs_file_longer",
b"11112222", # Two chunks in archive
b"1111222233", # Longer on disk
0, # Should fetch no chunks since content matches up to archive length
),
(
"empty_archive_file",
b"", # Empty in archive
b"11112222", # Content on disk
0, # No chunks to compare = no chunks to fetch
),
(
"empty_fs_file",
b"11112222", # Two chunks in archive
b"", # Empty on disk
2, # Should fetch all chunks since file is empty
),
],
)
def test_compare_and_extract_chunks(setup_extractor, name, item_data, fs_data, expected_fetched_chunks):
"""Test chunk comparison and extraction"""
extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks = setup_extractor
clear_fetched_chunks()

chunk_size = 4
item, target_path = create_mock_chunks(item_data, chunk_size=chunk_size)

original_chunk_ids = [chunk.id for chunk in item.chunks]

with open(target_path, "wb") as f:
f.write(fs_data)

st = os.stat(target_path)
result = extractor.compare_and_extract_chunks(item, target_path, st=st)
assert result

fetched_chunks = get_fetched_chunks()
assert len(fetched_chunks) == expected_fetched_chunks

# For single chunk changes, verify it's the correct chunk
if expected_fetched_chunks == 1:
item_chunks = [item_data[i : i + chunk_size] for i in range(0, len(item_data), chunk_size)]
fs_chunks = [fs_data[i : i + chunk_size] for i in range(0, len(fs_data), chunk_size)]

# Find which chunk should have changed by comparing item_data with fs_data
for i, (item_chunk, fs_chunk) in enumerate(zip(item_chunks, fs_chunks)):
if item_chunk != fs_chunk:
assert fetched_chunks[0] == original_chunk_ids[i]
break

with open(target_path, "rb") as f:
assert f.read() == item_data