diff --git a/news/12991.feature.rst b/news/12991.feature.rst new file mode 100644 index 00000000000..26c2df389ed --- /dev/null +++ b/news/12991.feature.rst @@ -0,0 +1,3 @@ +Add support to enable resuming incomplete downloads. + +Control the number of retry attempts using the ``--resume-retries`` flag. diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py index eeb7e651b79..1b292d585f7 100644 --- a/src/pip/_internal/cli/cmdoptions.py +++ b/src/pip/_internal/cli/cmdoptions.py @@ -1028,6 +1028,15 @@ def check_list_path_option(options: Values) -> None: help=("Enable deprecated functionality, that will be removed in the future."), ) +resume_retries: Callable[..., Option] = partial( + Option, + "--resume-retries", + dest="resume_retries", + type="int", + default=0, + help="Maximum number of resumption retries for incomplete downloads" + "(default %default times).", +) ########## # groups # @@ -1061,6 +1070,7 @@ def check_list_path_option(options: Values) -> None: no_python_version_warning, use_new_feature, use_deprecated_feature, + resume_retries, ], } diff --git a/src/pip/_internal/cli/progress_bars.py b/src/pip/_internal/cli/progress_bars.py index 3d9dde8ed88..533f9575176 100644 --- a/src/pip/_internal/cli/progress_bars.py +++ b/src/pip/_internal/cli/progress_bars.py @@ -26,6 +26,7 @@ def _rich_progress_bar( *, bar_type: str, size: Optional[int], + initial_progress: Optional[int] = None, ) -> Generator[bytes, None, None]: assert bar_type == "on", "This should only be used in the default mode." @@ -51,6 +52,8 @@ def _rich_progress_bar( progress = Progress(*columns, refresh_per_second=5) task_id = progress.add_task(" " * (get_indentation() + 2), total=total) + if initial_progress is not None: + progress.update(task_id, advance=initial_progress) with progress: for chunk in iterable: yield chunk @@ -61,12 +64,13 @@ def _raw_progress_bar( iterable: Iterable[bytes], *, size: Optional[int], + initial_progress: Optional[int] = None, ) -> Generator[bytes, None, None]: def write_progress(current: int, total: int) -> None: sys.stdout.write(f"Progress {current} of {total}\n") sys.stdout.flush() - current = 0 + current = initial_progress or 0 total = size or 0 rate_limiter = RateLimiter(0.25) @@ -80,15 +84,24 @@ def write_progress(current: int, total: int) -> None: def get_download_progress_renderer( - *, bar_type: str, size: Optional[int] = None + *, bar_type: str, size: Optional[int] = None, initial_progress: Optional[int] = None ) -> DownloadProgressRenderer: """Get an object that can be used to render the download progress. Returns a callable, that takes an iterable to "wrap". """ if bar_type == "on": - return functools.partial(_rich_progress_bar, bar_type=bar_type, size=size) + return functools.partial( + _rich_progress_bar, + bar_type=bar_type, + size=size, + initial_progress=initial_progress, + ) elif bar_type == "raw": - return functools.partial(_raw_progress_bar, size=size) + return functools.partial( + _raw_progress_bar, + size=size, + initial_progress=initial_progress, + ) else: return iter # no-op, when passed an iterator diff --git a/src/pip/_internal/cli/req_command.py b/src/pip/_internal/cli/req_command.py index 92900f94ff4..6d9431fc87c 100644 --- a/src/pip/_internal/cli/req_command.py +++ b/src/pip/_internal/cli/req_command.py @@ -142,6 +142,7 @@ def make_requirement_preparer( lazy_wheel=lazy_wheel, verbosity=verbosity, legacy_resolver=legacy_resolver, + resume_retries=options.resume_retries, ) @classmethod diff --git a/src/pip/_internal/exceptions.py b/src/pip/_internal/exceptions.py index 45a876a850d..36491ccc404 100644 --- a/src/pip/_internal/exceptions.py +++ b/src/pip/_internal/exceptions.py @@ -807,3 +807,24 @@ def __init__( ), hint_stmt="To proceed this package must be uninstalled.", ) + + +class IncompleteDownloadError(DiagnosticPipError): + """Raised when the downloader receives fewer bytes than advertised + in the Content-Length header.""" + + reference = "incomplete-download-error" + + def __init__(self, link: str, resume_retries: int) -> None: + message = ( + f"Download failed after {resume_retries} attempts because not enough" + " bytes were received. The incomplete file has been cleaned up." + ) + hint = "Use --resume-retries to configure resume retry limit." + + super().__init__( + message=message, + context=f"File: {link}\nResume retry limit: {resume_retries}", + hint_stmt=hint, + note_stmt="This is an issue with network connectivity, not pip.", + ) diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index 5c3bce3d2fd..5d2f2cbf41d 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -5,12 +5,14 @@ import logging import mimetypes import os -from typing import Iterable, Optional, Tuple +from http import HTTPStatus +from typing import BinaryIO, Iterable, Optional, Tuple from pip._vendor.requests.models import Response +from pip._vendor.urllib3.exceptions import ReadTimeoutError from pip._internal.cli.progress_bars import get_download_progress_renderer -from pip._internal.exceptions import NetworkConnectionError +from pip._internal.exceptions import IncompleteDownloadError, NetworkConnectionError from pip._internal.models.index import PyPI from pip._internal.models.link import Link from pip._internal.network.cache import is_from_cache @@ -28,13 +30,21 @@ def _get_http_response_size(resp: Response) -> Optional[int]: return None +def _get_http_response_etag_or_date(resp: Response) -> Optional[str]: + """ + Return either the ETag or Date header (or None if neither exists). + The return value can be used in an If-Range header. + """ + return resp.headers.get("etag", resp.headers.get("date")) + + def _prepare_download( resp: Response, link: Link, progress_bar: str, + total_length: Optional[int], + range_start: Optional[int] = 0, ) -> Iterable[bytes]: - total_length = _get_http_response_size(resp) - if link.netloc == PyPI.file_storage_domain: url = link.show_url else: @@ -43,10 +53,17 @@ def _prepare_download( logged_url = redact_auth_from_url(url) if total_length: - logged_url = f"{logged_url} ({format_size(total_length)})" + if range_start: + logged_url = ( + f"{logged_url} ({format_size(range_start)}/{format_size(total_length)})" + ) + else: + logged_url = f"{logged_url} ({format_size(total_length)})" if is_from_cache(resp): logger.info("Using cached %s", logged_url) + elif range_start: + logger.info("Resuming download %s", logged_url) else: logger.info("Downloading %s", logged_url) @@ -66,7 +83,9 @@ def _prepare_download( if not show_progress: return chunks - renderer = get_download_progress_renderer(bar_type=progress_bar, size=total_length) + renderer = get_download_progress_renderer( + bar_type=progress_bar, size=total_length, initial_progress=range_start + ) return renderer(chunks) @@ -113,10 +132,27 @@ def _get_http_response_filename(resp: Response, link: Link) -> str: return filename -def _http_get_download(session: PipSession, link: Link) -> Response: +def _http_get_download( + session: PipSession, + link: Link, + range_start: Optional[int] = 0, + if_range: Optional[str] = None, +) -> Response: target_url = link.url.split("#", 1)[0] - resp = session.get(target_url, headers=HEADERS, stream=True) - raise_for_status(resp) + headers = {**HEADERS} + # request a partial download + if range_start: + headers["Range"] = f"bytes={range_start}-" + # make sure the file hasn't changed + if if_range: + headers["If-Range"] = if_range + try: + resp = session.get(target_url, headers=headers, stream=True) + raise_for_status(resp) + except NetworkConnectionError as e: + assert e.response is not None + logger.critical("HTTP error %s while getting %s", e.response.status_code, link) + raise return resp @@ -125,63 +161,169 @@ def __init__( self, session: PipSession, progress_bar: str, + resume_retries: int, ) -> None: + assert ( + resume_retries >= 0 + ), "Number of max resume retries must be bigger or equal to zero" self._session = session self._progress_bar = progress_bar + self._resume_retries = resume_retries def __call__(self, link: Link, location: str) -> Tuple[str, str]: """Download the file given by link into location.""" - try: - resp = _http_get_download(self._session, link) - except NetworkConnectionError as e: - assert e.response is not None - logger.critical( - "HTTP error %s while getting %s", e.response.status_code, link - ) - raise + resp = _http_get_download(self._session, link) + total_length = _get_http_response_size(resp) filename = _get_http_response_filename(resp, link) filepath = os.path.join(location, filename) - chunks = _prepare_download(resp, link, self._progress_bar) + bytes_received = 0 with open(filepath, "wb") as content_file: - for chunk in chunks: - content_file.write(chunk) + bytes_received = self._process_response( + resp, link, content_file, bytes_received, total_length + ) + + if not total_length: + content_type = resp.headers.get("Content-Type", "") + return filepath, content_type + + if bytes_received < total_length: + self._attempt_resume( + resp, link, content_file, total_length, bytes_received, filepath + ) + content_type = resp.headers.get("Content-Type", "") return filepath, content_type + def _process_response( + self, + resp: Response, + link: Link, + content_file: BinaryIO, + bytes_received: int, + total_length: Optional[int], + ) -> int: + """Process the response and write the chunks to the file.""" + chunks = _prepare_download( + resp, link, self._progress_bar, total_length, range_start=bytes_received + ) + + bytes_received = self._write_chunks_to_file( + chunks, + content_file, + bytes_received, + total_length, + ) + + return bytes_received + + def _write_chunks_to_file( + self, + chunks: Iterable[bytes], + content_file: BinaryIO, + bytes_received: int, + total_length: Optional[int], + ) -> int: + """Write the chunks to the file and return the number of bytes received.""" + try: + for chunk in chunks: + bytes_received += len(chunk) + content_file.write(chunk) + except ReadTimeoutError as e: + if not total_length: + # Raise exception if the Content-Length header is not provided + # and the connection times out. + raise e + + # Ensuring bytes_received is returned to attempt resume + logger.warning("Connection timed out while downloading.") + + return bytes_received + + def _attempt_resume( + self, + resp: Response, + link: Link, + content_file: BinaryIO, + total_length: Optional[int], + bytes_received: int, + filepath: str, + ) -> int: + """Attempt to resume the download if connection was dropped.""" + etag_or_date = _get_http_response_etag_or_date(resp) + + attempts_left = self._resume_retries + while attempts_left and total_length and bytes_received < total_length: + attempts_left -= 1 + + logger.warning( + "Attempting to resume download with bytes received: %s/%s", + format_size(bytes_received), + format_size(total_length), + ) + + try: + resume_resp = _http_get_download( + self._session, + link, + range_start=bytes_received, + if_range=etag_or_date, + ) + + # If the server responded with 200 (e.g. when the file has been + # modified on the server or range requests are not supported by + # the server), reset the download to start from the beginning. + restart = resume_resp.status_code != HTTPStatus.PARTIAL_CONTENT + if restart: + bytes_received, total_length, etag_or_date = ( + self._reset_download_state(resume_resp, content_file) + ) + + bytes_received = self._process_response( + resume_resp, + link, + content_file, + bytes_received, + total_length, + ) + except (ConnectionError, ReadTimeoutError): + continue + + if total_length and bytes_received < total_length: + os.remove(filepath) + raise IncompleteDownloadError(str(link), self._resume_retries) + + return bytes_received + + def _reset_download_state( + self, + resp: Response, + content_file: BinaryIO, + ) -> Tuple[int, Optional[int], Optional[str]]: + """Reset the download state to restart downloading from the beginning.""" + content_file.seek(0) + content_file.truncate() + bytes_received = 0 + total_length = _get_http_response_size(resp) + etag_or_date = _get_http_response_etag_or_date(resp) + + return bytes_received, total_length, etag_or_date + class BatchDownloader: def __init__( self, session: PipSession, progress_bar: str, + resume_retries: int, ) -> None: - self._session = session - self._progress_bar = progress_bar + self._downloader = Downloader(session, progress_bar, resume_retries) def __call__( self, links: Iterable[Link], location: str ) -> Iterable[Tuple[Link, Tuple[str, str]]]: """Download the files given by links into location.""" for link in links: - try: - resp = _http_get_download(self._session, link) - except NetworkConnectionError as e: - assert e.response is not None - logger.critical( - "HTTP error %s while getting %s", - e.response.status_code, - link, - ) - raise - - filename = _get_http_response_filename(resp, link) - filepath = os.path.join(location, filename) - - chunks = _prepare_download(resp, link, self._progress_bar) - with open(filepath, "wb") as content_file: - for chunk in chunks: - content_file.write(chunk) - content_type = resp.headers.get("Content-Type", "") + filepath, content_type = self._downloader(link, location) yield link, (filepath, content_type) diff --git a/src/pip/_internal/operations/prepare.py b/src/pip/_internal/operations/prepare.py index e6aa3447200..1719799ba52 100644 --- a/src/pip/_internal/operations/prepare.py +++ b/src/pip/_internal/operations/prepare.py @@ -231,6 +231,7 @@ def __init__( lazy_wheel: bool, verbosity: int, legacy_resolver: bool, + resume_retries: int, ) -> None: super().__init__() @@ -238,8 +239,8 @@ def __init__( self.build_dir = build_dir self.build_tracker = build_tracker self._session = session - self._download = Downloader(session, progress_bar) - self._batch_download = BatchDownloader(session, progress_bar) + self._download = Downloader(session, progress_bar, resume_retries) + self._batch_download = BatchDownloader(session, progress_bar, resume_retries) self.finder = finder # Where still-packed archives should be written to. If None, they are diff --git a/tests/unit/test_network_download.py b/tests/unit/test_network_download.py index 14998d229bf..4250ca4c4ee 100644 --- a/tests/unit/test_network_download.py +++ b/tests/unit/test_network_download.py @@ -1,53 +1,79 @@ import logging import sys -from typing import Dict +from pathlib import Path +from typing import Dict, List, Optional, Tuple +from unittest.mock import MagicMock, call, patch import pytest +from pip._internal.exceptions import IncompleteDownloadError from pip._internal.models.link import Link from pip._internal.network.download import ( + Downloader, + _get_http_response_size, + _http_get_download, _prepare_download, parse_content_disposition, sanitize_content_filename, ) +from pip._internal.network.session import PipSession +from pip._internal.network.utils import HEADERS from tests.lib.requests_mocks import MockResponse @pytest.mark.parametrize( - "url, headers, from_cache, expected", + "url, headers, from_cache, range_start, expected", [ ( "http://example.com/foo.tgz", {}, False, + None, "Downloading http://example.com/foo.tgz", ), ( "http://example.com/foo.tgz", {"content-length": "2"}, False, + None, "Downloading http://example.com/foo.tgz (2 bytes)", ), ( "http://example.com/foo.tgz", {"content-length": "2"}, True, + None, "Using cached http://example.com/foo.tgz (2 bytes)", ), - ("https://files.pythonhosted.org/foo.tgz", {}, False, "Downloading foo.tgz"), + ( + "https://files.pythonhosted.org/foo.tgz", + {}, + False, + None, + "Downloading foo.tgz", + ), ( "https://files.pythonhosted.org/foo.tgz", {"content-length": "2"}, False, + None, "Downloading foo.tgz (2 bytes)", ), ( "https://files.pythonhosted.org/foo.tgz", {"content-length": "2"}, True, + None, "Using cached foo.tgz", ), + ( + "http://example.com/foo.tgz", + {"content-length": "200"}, + False, + 100, + "Resuming download http://example.com/foo.tgz (100 bytes/200 bytes)", + ), ], ) def test_prepare_download__log( @@ -55,6 +81,7 @@ def test_prepare_download__log( url: str, headers: Dict[str, str], from_cache: bool, + range_start: Optional[int], expected: str, ) -> None: caplog.set_level(logging.INFO) @@ -64,7 +91,14 @@ def test_prepare_download__log( if from_cache: resp.from_cache = from_cache link = Link(url) - _prepare_download(resp, link, progress_bar="on") + total_length = _get_http_response_size(resp) + _prepare_download( + resp, + link, + progress_bar="on", + total_length=total_length, + range_start=range_start, + ) assert len(caplog.records) == 1 record = caplog.records[0] @@ -114,6 +148,29 @@ def test_sanitize_content_filename__platform_dependent( assert sanitize_content_filename(filename) == expected +@pytest.mark.parametrize( + "range_start, if_range, expected_headers", + [ + (None, None, HEADERS), + (1234, None, {**HEADERS, "Range": "bytes=1234-"}), + (1234, '"etag"', {**HEADERS, "Range": "bytes=1234-", "If-Range": '"etag"'}), + ], +) +def test_http_get_download( + range_start: Optional[int], + if_range: Optional[str], + expected_headers: Dict[str, str], +) -> None: + session = PipSession() + session.get = MagicMock() + link = Link("http://example.com/foo.tgz") + with patch("pip._internal.network.download.raise_for_status"): + _http_get_download(session, link, range_start, if_range) + session.get.assert_called_once_with( + "http://example.com/foo.tgz", headers=expected_headers, stream=True + ) + + @pytest.mark.parametrize( "content_disposition, default_filename, expected", [ @@ -125,3 +182,174 @@ def test_parse_content_disposition( ) -> None: actual = parse_content_disposition(content_disposition, default_filename) assert actual == expected + + +@pytest.mark.parametrize( + "resume_retries,mock_responses,expected_resume_args,expected_bytes", + [ + # If content-length is not provided, the download will + # always "succeed" since we don't have a way to check if + # the download is complete. + ( + 0, + [({}, 200, b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89")], + [], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + # Complete download (content-length matches body) + ( + 0, + [({"content-length": "36"}, 200, b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89")], + [], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + # Incomplete download without resume retries + ( + 0, + [({"content-length": "36"}, 200, b"0cfa7e9d-1868-4dd7-9fb3-")], + [], + None, + ), + # Incomplete download with resume retries + ( + 5, + [ + ({"content-length": "36"}, 200, b"0cfa7e9d-1868-4dd7-9fb3-"), + ({"content-length": "12"}, 206, b"f2561d5dfd89"), + ], + [(24, None)], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + # If the server responds with 200 (e.g. no range header support or the file + # has changed between the requests) the downloader should restart instead of + # attempting to resume. The downloaded file should not be affected. + ( + 5, + [ + ({"content-length": "36"}, 200, b"0cfa7e9d-1868-4dd7-9fb3-"), + ({"content-length": "36"}, 200, b"0cfa7e9d-1868-"), + ( + {"content-length": "36"}, + 200, + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + ], + [(24, None), (14, None)], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + # File size could change between requests. Make sure this is handled correctly. + ( + 5, + [ + ({"content-length": "36"}, 200, b"0cfa7e9d-1868-4dd7-9fb3-"), + ( + {"content-length": "40"}, + 200, + b"new-0cfa7e9d-1868-4dd7-9fb3-f2561d5d", + ), + ({"content-length": "4"}, 206, b"fd89"), + ], + [(24, None), (36, None)], + b"new-0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + # The downloader should fail after n resume_retries attempts. + # This prevents the downloader from getting stuck if the connection + # is unstable and the server doesn't not support range requests. + ( + 1, + [ + ({"content-length": "36"}, 200, b"0cfa7e9d-1868-4dd7-9fb3-"), + ({"content-length": "36"}, 200, b"0cfa7e9d-1868-"), + ], + [(24, None)], + None, + ), + # The downloader should use If-Range header to make the range + # request conditional if it is possible to check for modifications + # (e.g. if we know the creation time of the initial response). + ( + 5, + [ + ( + {"content-length": "36", "date": "Wed, 21 Oct 2015 07:28:00 GMT"}, + 200, + b"0cfa7e9d-1868-4dd7-9fb3-", + ), + ( + {"content-length": "12", "date": "Wed, 21 Oct 2015 07:54:00 GMT"}, + 206, + b"f2561d5dfd89", + ), + ], + [(24, "Wed, 21 Oct 2015 07:28:00 GMT")], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + # ETag is preferred over Date for the If-Range condition. + ( + 5, + [ + ( + { + "content-length": "36", + "date": "Wed, 21 Oct 2015 07:28:00 GMT", + "etag": '"33a64df551425fcc55e4d42a148795d9f25f89d4"', + }, + 200, + b"0cfa7e9d-1868-4dd7-9fb3-", + ), + ( + { + "content-length": "12", + "date": "Wed, 21 Oct 2015 07:54:00 GMT", + "etag": '"33a64df551425fcc55e4d42a148795d9f25f89d4"', + }, + 206, + b"f2561d5dfd89", + ), + ], + [(24, '"33a64df551425fcc55e4d42a148795d9f25f89d4"')], + b"0cfa7e9d-1868-4dd7-9fb3-f2561d5dfd89", + ), + ], +) +def test_downloader( + resume_retries: int, + mock_responses: List[Tuple[Dict[str, str], int, bytes]], + # list of (range_start, if_range) + expected_resume_args: List[Tuple[Optional[int], Optional[int]]], + # expected_bytes is None means the download should fail + expected_bytes: Optional[bytes], + tmpdir: Path, +) -> None: + session = PipSession() + link = Link("http://example.com/foo.tgz") + downloader = Downloader(session, "on", resume_retries) + + responses = [] + for headers, status_code, body in mock_responses: + resp = MockResponse(body) + resp.headers = headers + resp.status_code = status_code + responses.append(resp) + _http_get_download = MagicMock(side_effect=responses) + + with patch("pip._internal.network.download._http_get_download", _http_get_download): + if expected_bytes is None: + remove = MagicMock(return_value=None) + with patch("os.remove", remove): + with pytest.raises(IncompleteDownloadError): + downloader(link, str(tmpdir)) + # Make sure the incomplete file is removed + remove.assert_called_once() + else: + filepath, _ = downloader(link, str(tmpdir)) + with open(filepath, "rb") as downloaded_file: + downloaded_bytes = downloaded_file.read() + assert downloaded_bytes == expected_bytes + + calls = [call(session, link)] # the initial request + for range_start, if_range in expected_resume_args: + calls.append(call(session, link, range_start=range_start, if_range=if_range)) + + # Make sure that the download makes additional requests for resumption + _http_get_download.assert_has_calls(calls) diff --git a/tests/unit/test_operations_prepare.py b/tests/unit/test_operations_prepare.py index 86e26c11801..bc6edcb3b9a 100644 --- a/tests/unit/test_operations_prepare.py +++ b/tests/unit/test_operations_prepare.py @@ -32,7 +32,7 @@ def _fake_session_get(*args: Any, **kwargs: Any) -> Dict[str, str]: session = Mock() session.get = _fake_session_get - download = Downloader(session, progress_bar="on") + download = Downloader(session, progress_bar="on", resume_retries=0) uri = data.packages.joinpath("simple-1.0.tar.gz").as_uri() link = Link(uri) @@ -78,7 +78,7 @@ def test_download_http_url__no_directory_traversal( "content-disposition": 'attachment;filename="../out_dir_file"', } session.get.return_value = resp - download = Downloader(session, progress_bar="on") + download = Downloader(session, progress_bar="on", resume_retries=0) download_dir = os.fspath(tmpdir.joinpath("download")) os.mkdir(download_dir) diff --git a/tests/unit/test_req.py b/tests/unit/test_req.py index e243a718725..327597a8c0f 100644 --- a/tests/unit/test_req.py +++ b/tests/unit/test_req.py @@ -109,6 +109,7 @@ def _basic_resolver( lazy_wheel=False, verbosity=0, legacy_resolver=True, + resume_retries=0, ) yield Resolver( preparer=preparer,