32 changes: 21 additions & 11 deletions sdks/python/apache_beam/runners/portability/stager.py
@@ -165,7 +165,7 @@ def create_job_resources(
build_setup_args: Optional[List[str]] = None,
pypi_requirements: Optional[List[str]] = None,
populate_requirements_cache: Optional[Callable[[str, str, bool],
None]] = None,
List[str]]] = None,
skip_prestaged_dependencies: Optional[bool] = False,
log_submission_env_dependencies: Optional[bool] = True,
):
@@ -220,6 +220,7 @@ def create_job_resources(
not os.path.exists(requirements_cache_path)):
os.makedirs(requirements_cache_path)

downloaded_packages = []
# Stage a requirements file if present.
if setup_options.requirements_file is not None:
if not os.path.isfile(setup_options.requirements_file):
@@ -245,12 +246,14 @@
'such as --requirements_file. ')

if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
(
result = (
populate_requirements_cache if populate_requirements_cache else
Stager._populate_requirements_cache)(
setup_options.requirements_file,
requirements_cache_path,
setup_options.requirements_cache_only_sources)
if result is not None:
downloaded_packages.extend(result)

if pypi_requirements:
tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
@@ -260,18 +263,18 @@
# Populate cache with packages from PyPI requirements and stage
# the files in the cache.
if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
(
result = (
populate_requirements_cache if populate_requirements_cache else
Stager._populate_requirements_cache)(
tf.name,
requirements_cache_path,
setup_options.requirements_cache_only_sources)
if result is not None:
downloaded_packages.extend(result)

if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE) and (
setup_options.requirements_file is not None or pypi_requirements):
for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
resources.append(
Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))
for pkg in downloaded_packages:
resources.append(
Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))

# Handle a setup file if present.
# We will build the setup package locally and then copy it to the staging
@@ -431,7 +434,7 @@ def create_and_stage_job_resources(
temp_dir: Optional[str] = None,
pypi_requirements: Optional[List[str]] = None,
populate_requirements_cache: Optional[Callable[[str, str, bool],
None]] = None,
List[str]]] = None,
staging_location: Optional[str] = None):
"""For internal use only; no backwards-compatibility guarantees.

@@ -735,7 +738,9 @@ def _get_platform_for_default_sdk_container():
@retry.with_exponential_backoff(
num_retries=4, retry_filter=retry_on_non_zero_exit)
def _populate_requirements_cache(
requirements_file, cache_dir, populate_cache_with_sdists=False):
requirements_file,
cache_dir,
populate_cache_with_sdists=False) -> List[str]:
# The 'pip download' command will not download again if it finds the
# tarball with the proper version already present.
# It will get the packages downloaded in the order they are presented in
@@ -780,7 +785,12 @@ def _populate_requirements_cache(
platform_tag
])
_LOGGER.info('Executing command: %s', cmd_args)
processes.check_output(cmd_args, stderr=processes.STDOUT)
output = processes.check_output(cmd_args, stderr=processes.STDOUT)
downloaded_packages = []
for line in output.decode('utf-8').split('\n'):

Contributor:
what happens to packages that were previously in the requirements cache, hence not downloaded?

Contributor Author:
Packages already in the requirements cache will not appear in the output, because pip will not try to download them again.
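
For illustration, here is a minimal sketch (not part of this PR) of parsing both kinds of lines from the pip log. The 'Saved ' and 'File was already downloaded ' prefixes are assumptions based on recent pip versions; pip's log format is not a stable interface, so any parser like this is brittle.

import subprocess

def pip_download_and_classify(cmd_args):
    # Run 'pip download ...' capturing its log, then split the reported
    # files into newly downloaded packages and cache hits.
    output = subprocess.check_output(cmd_args, stderr=subprocess.STDOUT)
    downloaded, cached = [], []
    for line in output.decode('utf-8').splitlines():
        line = line.strip()
        if line.startswith('Saved '):
            downloaded.append(line[len('Saved '):])
        elif line.startswith('File was already downloaded '):
            cached.append(line[len('File was already downloaded '):])
    return downloaded, cached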

Contributor:
that means they won't be staged, right?

Contributor Author:
Exactly, only the newly downloaded packages get staged.

Contributor:
My point is that the packages already in the local cache dir but still required by requirements.txt won't get staged, and this is not as intended. Does this also match your understanding?

Contributor Author:
My initial understanding, as reflected in the current implementation, was that we only needed to stage newly downloaded or updated packages. I had assumed that since packages in the local cache are already available, staging them would be redundant.

My logic is as follows:

  1. Identify all packages required by the requirements.txt file.
  2. Identify all other required PyPI packages.
  3. Download any of these packages that are not already in the cache.
  4. Stage only the newly downloaded packages.

You've correctly pointed out that this means cached packages aren't staged. To make sure I get the fix right, could you help me understand the downstream process and why it's necessary to stage the cached packages as well?

Contributor (@tvalentyn), Oct 8, 2025:
When a user supplies a --requirements_file option, Beam stages packages to allow a runner to execute a pipeline even if the runner environment doesn't have access to PyPI to download the packages on the fly.

To stage packages, we download the packages into the local requirements_cache folder, and then stage the entire folder. The disadvantage is that over time the requirements_cache folder might accumulate packages that are no longer in requirements.txt. That can cause additional uploads of files that are not necessary. Possible solutions:

  • Clean the requirements cache folder periodically: rm -rf /tmp/dataflow-requirements-cache
  • Use a custom container image (--sdk_container_image) instead of the --requirements_file, and install the packages in your image. This is a recommended option to have self-contained reproducible pipeline environments.
  • Don't stage requirements cache with --requirements_cache=skip (pipeline will depend on PyPI at runtime).
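
As a concrete example of the last option, a minimal sketch of wiring this up through PipelineOptions (the requirements file name is illustrative); the 'skip' value corresponds to SKIP_REQUIREMENTS_CACHE in stager.py:

from apache_beam.options.pipeline_options import PipelineOptions

# Skip staging the requirements cache; workers will resolve
# requirements.txt against PyPI at runtime.
options = PipelineOptions([
    '--requirements_file=requirements.txt',
    '--requirements_cache=skip',
])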

Contributor (@tvalentyn), Oct 8, 2025:
Re how to improve the logic, I looked at the discussion we had on this topic:

https://lists.apache.org/thread/pqc2yl15kjdpxfp3pnocrrhkk3m6gsmh

and there are a couple of ideas:

  1. Parse log output to infer dependencies that were downloaded but also note down files that were skipped, because they already existed in the cache (likely this will be brittle because it depends on pip having certain output formats)
  2. Download twice (https://lists.apache.org/thread/v35bgj67hqrwl4ldymo8bqkybgt3z096), something like the following (haven't tested):
pip download --dest /tmp/dataflow_requirements_cache -r requirements.txt --exists-action i --no-deps

pip download --dest /tmp/temporary_folder_that_will_be_cleaned_up -r requirements.txt --find-links /tmp/dataflow_requirements_cache

then, stage deps from temporary_folder_that_will_be_cleaned_up.
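
For concreteness, an untested Python sketch of idea 2; the function name, paths, and the stage_fn callback are illustrative, and the pip invocations mirror the commands quoted above:

import glob
import os
import subprocess
import tempfile

def stage_packages_via_double_download(requirements_file, cache_dir, stage_fn):
    # First pass: top up the persistent cache, keeping files already there
    # ('--exists-action i') and skipping transitive deps, as in the command above.
    subprocess.check_call([
        'pip', 'download', '--dest', cache_dir, '-r', requirements_file,
        '--exists-action', 'i', '--no-deps'
    ])
    # Second pass: resolve into a scratch dir, preferring the local cache, so
    # the scratch dir ends up with exactly the files the pipeline needs.
    with tempfile.TemporaryDirectory() as scratch:
        subprocess.check_call([
            'pip', 'download', '--dest', scratch, '-r', requirements_file,
            '--find-links', cache_dir
        ])
        for pkg in glob.glob(os.path.join(scratch, '*')):
            stage_fn(pkg, os.path.basename(pkg))

Here stage_fn stands in for something like Stager._create_file_stage_to_artifact combined with the resources.append call in create_job_resources.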

if line.startswith('Saved '):
downloaded_packages.append(line.split(' ')[1])
return downloaded_packages

@staticmethod
def _build_setup_package(
20 changes: 15 additions & 5 deletions sdks/python/apache_beam/runners/portability/stager_test.py
@@ -98,8 +98,13 @@ def file_copy(self, from_path, to_path):
def populate_requirements_cache(
self, requirements_file, cache_dir, populate_cache_with_sdists=False):
_ = requirements_file
self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
_ = populate_cache_with_sdists
pkgs = [
os.path.join(cache_dir, 'abc.txt'), os.path.join(cache_dir, 'def.txt')
]
for pkg in pkgs:
self.create_temp_file(pkg, 'nothing')
return pkgs

@mock.patch('apache_beam.runners.portability.stager.open')
@mock.patch('apache_beam.runners.portability.stager.get_new_http')
@@ -807,10 +812,15 @@ def test_remove_dependency_from_requirements(self):

def _populate_requitements_cache_fake(
self, requirements_file, temp_dir, populate_cache_with_sdists):
paths = []
if not populate_cache_with_sdists:
self.create_temp_file(os.path.join(temp_dir, 'nothing.whl'), 'Fake whl')
self.create_temp_file(
os.path.join(temp_dir, 'nothing.tar.gz'), 'Fake tarball')
path = os.path.join(temp_dir, 'nothing.whl')
self.create_temp_file(path, 'nothing')
paths.append(path)
path = os.path.join(temp_dir, 'nothing.tar.gz')
self.create_temp_file(path, 'Fake tarball content')
paths.append(path)
return paths

# The requirements cache will be populated with a bdist/whl if present;
# otherwise the source distribution will be downloaded.