64 changes: 63 additions & 1 deletion src/borg/archive.py
@@ -719,6 +719,63 @@
                # In this case, we *want* to extract twice, because there is no other way.
                pass

    def compare_and_extract_chunks(self, item, fs_path, *, st, pi=None):
        """Compare file chunks and patch if needed. Returns True if patching succeeded."""
        if st is None:
            return False
        try:
            # First pass: build the list of chunks as they exist on the filesystem
            fs_chunks = []
            with backup_io("open"):
                fs_file = open(fs_path, "rb")
            with fs_file:
                for chunk in item.chunks:
                    with backup_io("read"):
                        data = fs_file.read(chunk.size)

                    fs_chunks.append(ChunkListEntry(id=self.key.id_hash(data), size=len(data)))

            # Compare chunks and collect needed chunk IDs
            needed_chunks = []
            for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
                if fs_chunk != item_chunk:
                    needed_chunks.append(item_chunk)

            # Fetch all needed chunks and iterate through ALL of them
            chunk_data_iter = self.pipeline.fetch_many([chunk.id for chunk in needed_chunks], ro_type=ROBJ_FILE_STREAM)

            # Second pass: update the file in place
            with backup_io("open"):
                fs_file = open(fs_path, "rb+")
            with fs_file:
                for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
                    if fs_chunk == item_chunk:
                        with backup_io("seek"):
                            fs_file.seek(item_chunk.size, 1)
                    else:
                        chunk_data = next(chunk_data_iter)

                        with backup_io("write"):
                            fs_file.write(chunk_data)
                        if pi:
                            pi.show(increase=len(chunk_data), info=[remove_surrogates(item.path)])

                final_size = fs_file.tell()
                with backup_io("truncate_and_attrs"):
                    fs_file.truncate(item.size)
                    fs_file.flush()
                    self.restore_attrs(fs_path, item, fd=fs_file.fileno())
Member:
You could create a method clear_attrs close to the definition of restore_attrs and move the code you added above into it.
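For illustration only, a rough sketch of what such a clear_attrs helper might look like; the name, placement, and xattr-only scope are assumptions, and a real version would go through borg's platform layer and also handle ACLs and fs flags:

    import os

    def clear_attrs(path):
        # Hypothetical helper (not part of this PR): remove extended attributes
        # already present on an existing file, so that restore_attrs() can start
        # from the fresh state it expects after a normal create-and-extract.
        try:
            # os.listxattr / os.removexattr are stdlib calls, but Linux-only.
            for name in os.listxattr(path, follow_symlinks=False):
                os.removexattr(path, name, follow_symlinks=False)
        except OSError:
            pass  # e.g. filesystem without xattr support - nothing to clear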


if "size" in item and item.size != final_size:
raise BackupError(f"Size inconsistency detected: size {item.size}, chunks size {final_size}")

if "chunks_healthy" in item and not item.chunks_healthy:
raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")

            return True
        except OSError:
            return False

    def extract_item(
        self,
        item,
@@ -802,12 +859,14 @@
                return  # done! we already have fully extracted this file in a previous run.
            elif stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
                st = None
            else:
                os.unlink(path)
                st = None
Member:
Did you think about this?

Contributor Author (alighazi288):
@ThomasWaldmann I didn't realize that this would break the continue_extraction functionality. The issue is compare_and_extract_chunks still tries to use stale st info after the file is unlinked.

I have tried:

  • Tracking unlink state with flags
  • Checking inode/link count
  • Modifying comparison logic

The only fix I can think of is an extra OS call to check the file's existence. Maybe I'm missing something?

Member:
If the fs file is a normal file, your code requires it to be there, so it can be "updated" - thus it must not be removed.

Member:
And we also need to think about what to do with existing metadata, like acls, xattrs, fs flags, ... - the current code assumes that there is no existing metadata and just adds the stuff from the archive item.

Contributor Author (alighazi288), Jan 21, 2025:
@ThomasWaldmann another solution I can think of is:

try:
    st = os.stat(path, follow_symlinks=False)
    if continue_extraction and same_item(item, st):
        return  # done! we already have fully extracted this file in a previous run.
    if stat.S_ISREG(st.st_mode) and not continue_extraction:
        if self.compare_and_extract_chunks(item, path, st=st, pi=pi):
            return
    elif stat.S_ISDIR(st.st_mode):
        os.rmdir(path)
    else:
        os.unlink(path)

This way we try an in-place update before any removal/recreation. If the function returns True, we're done; otherwise, we fall back to the original remove/recreate behavior.

Also, since I'm using restore_attrs() just like the existing code and not handling metadata directly at all, shouldn't it be consistent with how Borg already works?

Member:
As I already said: the restore_attrs code expects a fresh state of the file (like newly created, no acls, no xattrs) and just adds the stuff from the archived item.

But if you are updating an existing file, there can already be acls or xattrs that do not match what's in the archive (and what should be the final state).
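To make that concrete: in the in-place update path, something like the hypothetical clear_attrs helper sketched above would have to run before restore_attrs. A sketch only, not code from this PR:

    with backup_io("truncate_and_attrs"):
        fs_file.truncate(item.size)
        fs_file.flush()
        # strip pre-existing xattrs/ACLs/flags first (hypothetical helper), so
        # restore_attrs() starts from the fresh state it assumes
        self.clear_attrs(fs_path)
        self.restore_attrs(fs_path, item, fd=fs_file.fileno())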

        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass
            st = None

        def make_parent(path):
            parent_dir = os.path.dirname(path)
@@ -821,6 +880,9 @@
            with self.extract_helper(item, path, hlm) as hardlink_set:
                if hardlink_set:
                    return
                if self.compare_and_extract_chunks(item, path, st=st, pi=pi):
                    return

with backup_io("open"):
fd = open(path, "wb")
with fd:
165 changes: 164 additions & 1 deletion src/borg/testsuite/archive_test.py
@@ -12,7 +12,7 @@
from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
from ..archive import BackupOSError, backup_io, backup_io_iter, get_item_uid_gid
from ..helpers import msgpack
from ..item import Item, ArchiveItem
from ..item import Item, ArchiveItem, ChunkListEntry
from ..manifest import Manifest
from ..platform import uid2user, gid2group, is_win32

@@ -132,6 +132,11 @@ def add_chunk(self, id, meta, data, stats=None, wait=True, ro_type=None):
        self.objects[id] = data
        return id, len(data)

    def fetch_many(self, ids, ro_type=None):
        """Mock implementation of fetch_many"""
        for id in ids:
            yield self.objects[id]


def test_cache_chunk_buffer():
    data = [Item(path="p1"), Item(path="p2")]
@@ -402,3 +407,161 @@ def test_reject_non_sanitized_item():
    for path in rejected_dotdot_paths:
        with pytest.raises(ValueError, match="unexpected '..' element in path"):
            Item(path=path, user="root", group="root")


@pytest.fixture
def setup_extractor(tmpdir):
    """Setup common test infrastructure"""

    class MockCache:
        def __init__(self):
            self.objects = {}

    repository = Mock()
    key = PlaintextKey(repository)
    manifest = Manifest(key, repository)
    cache = MockCache()

    extractor = Archive(manifest=manifest, name="test", create=True)
    extractor.pipeline = cache
    extractor.key = key
    extractor.cwd = str(tmpdir)
    extractor.restore_attrs = Mock()

    # Track fetched chunks across tests
    fetched_chunks = []

    def create_mock_chunks(item_data, chunk_size=4):
        chunks = []
        for i in range(0, len(item_data), chunk_size):
            chunk_data = item_data[i : i + chunk_size]
            chunk_id = key.id_hash(chunk_data)
            chunks.append(ChunkListEntry(id=chunk_id, size=len(chunk_data)))
            cache.objects[chunk_id] = chunk_data

        item = Mock(spec=["chunks", "size", "__contains__", "get"])
        item.chunks = chunks
        item.size = len(item_data)
        item.__contains__ = lambda self, item: item == "size"

        return item, str(tmpdir.join("test.txt"))

    def mock_fetch_many(chunk_ids, ro_type=None):
        fetched_chunks.extend(chunk_ids)
        return iter([cache.objects[chunk_id] for chunk_id in chunk_ids])

    def clear_fetched_chunks():
        fetched_chunks.clear()

    def get_fetched_chunks():
        return fetched_chunks

    cache.fetch_many = mock_fetch_many

    return extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks


@pytest.mark.parametrize(
    "name, item_data, fs_data, expected_fetched_chunks",
    [
        (
            "no_changes",
            b"1111",  # One complete chunk, no changes needed
            b"1111",  # Identical content
            0,  # No chunks should be fetched
        ),
        (
            "single_chunk_change",
            b"11112222",  # Two chunks
            b"1111XXXX",  # Second chunk different
            1,  # Only second chunk should be fetched
        ),
        (
            "cross_boundary_change",
            b"11112222",  # Two chunks
            b"111XX22",  # Change crosses chunk boundary
            2,  # Both chunks need update
        ),
        (
            "exact_multiple_chunks",
            b"11112222333",  # Three chunks (last one partial)
            b"1111XXXX333",  # Middle chunk different
            1,  # Only middle chunk fetched
        ),
        (
            "first_chunk_change",
            b"11112222",  # Two chunks
            b"XXXX2222",  # First chunk different
            1,  # Only first chunk should be fetched
        ),
        (
            "all_chunks_different",
            b"11112222",  # Two chunks
            b"XXXXYYYY",  # Both chunks different
            2,  # Both chunks should be fetched
        ),
        (
            "partial_last_chunk",
            b"111122",  # One full chunk + partial
            b"1111XX",  # Partial chunk different
            1,  # Only second chunk should be fetched
        ),
        (
            "fs_file_shorter",
            b"11112222",  # Two chunks in archive
            b"111122",  # Shorter on disk - missing part of second chunk
            1,  # Should fetch second chunk
        ),
        (
            "fs_file_longer",
            b"11112222",  # Two chunks in archive
            b"1111222233",  # Longer on disk
            0,  # Should fetch no chunks since content matches up to archive length
        ),
        (
            "empty_archive_file",
            b"",  # Empty in archive
            b"11112222",  # Content on disk
            0,  # No chunks to compare = no chunks to fetch
        ),
        (
            "empty_fs_file",
            b"11112222",  # Two chunks in archive
            b"",  # Empty on disk
            2,  # Should fetch all chunks since file is empty
        ),
    ],
)
def test_compare_and_extract_chunks(setup_extractor, name, item_data, fs_data, expected_fetched_chunks):
    """Test chunk comparison and extraction"""
    extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks = setup_extractor
    clear_fetched_chunks()

    chunk_size = 4
    item, target_path = create_mock_chunks(item_data, chunk_size=chunk_size)

    original_chunk_ids = [chunk.id for chunk in item.chunks]

    with open(target_path, "wb") as f:
        f.write(fs_data)

    st = os.stat(target_path)
    result = extractor.compare_and_extract_chunks(item, target_path, st=st)
    assert result

    fetched_chunks = get_fetched_chunks()
    assert len(fetched_chunks) == expected_fetched_chunks

    # For single chunk changes, verify it's the correct chunk
    if expected_fetched_chunks == 1:
        item_chunks = [item_data[i : i + chunk_size] for i in range(0, len(item_data), chunk_size)]
        fs_chunks = [fs_data[i : i + chunk_size] for i in range(0, len(fs_data), chunk_size)]

        # Find which chunk should have changed by comparing item_data with fs_data
        for i, (item_chunk, fs_chunk) in enumerate(zip(item_chunks, fs_chunks)):
            if item_chunk != fs_chunk:
                assert fetched_chunks[0] == original_chunk_ids[i]
                break

    with open(target_path, "rb") as f:
        assert f.read() == item_data