4 changes: 2 additions & 2 deletions .github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "pr": "36271",
-  "modification": 37
+  "pr": "37360",
+  "modification": 38
 }
@@ -0,0 +1,5 @@
+{
+  "comment": "Modify this file in a trivial way to cause this test suite to run.",
+  "pr": "37360",
+  "modification": 1
+}
60 changes: 45 additions & 15 deletions sdks/python/apache_beam/runners/portability/stager.py
@@ -213,13 +213,15 @@ def create_job_resources(
     # if we know we are using a dependency pre-installed sdk container image.
     if not skip_prestaged_dependencies:
       requirements_cache_path = (
-          os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache') if
+          os.path.join(tempfile.gettempdir(), 'beam-requirements-cache') if
           (setup_options.requirements_cache
            is None) else setup_options.requirements_cache)
       if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE and
           not os.path.exists(requirements_cache_path)):
         os.makedirs(requirements_cache_path, exist_ok=True)
 
+      # Track packages to stage for this specific run.
+      packages_to_stage = set()
       # Stage a requirements file if present.
       if setup_options.requirements_file is not None:
         if not os.path.isfile(setup_options.requirements_file):
@@ -245,12 +247,16 @@ def create_job_resources(
             'such as --requirements_file. ')
 
         if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
-          (
+          populate_cache_callable = (
              populate_requirements_cache if populate_requirements_cache else
-              Stager._populate_requirements_cache)(
-              setup_options.requirements_file,
-              requirements_cache_path,
-              setup_options.requirements_cache_only_sources)
+              Stager._populate_requirements_cache)
+
+          downloaded_packages = populate_cache_callable(
+              setup_options.requirements_file,
+              requirements_cache_path,
+              setup_options.requirements_cache_only_sources)
+          if downloaded_packages:
+            packages_to_stage.update(downloaded_packages)
 
         if pypi_requirements:
           tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
Expand All @@ -260,18 +266,23 @@ def create_job_resources(
# Populate cache with packages from PyPI requirements and stage
# the files in the cache.
if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
(
populate_cache_callable = (
populate_requirements_cache if populate_requirements_cache else
Stager._populate_requirements_cache)(
tf.name,
requirements_cache_path,
setup_options.requirements_cache_only_sources)
Stager._populate_requirements_cache)
downloaded_packages = populate_cache_callable(
tf.name,
requirements_cache_path,
setup_options.requirements_cache_only_sources)
if downloaded_packages:
packages_to_stage.update(downloaded_packages)

if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE) and (
setup_options.requirements_file is not None or pypi_requirements):
for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
resources.append(
Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))
for pkg in packages_to_stage:
pkg_path = os.path.join(requirements_cache_path, pkg)
if os.path.exists(pkg_path):
resources.append(
Stager._create_file_stage_to_artifact(pkg_path, pkg))

# Handle a setup file if present.
# We will build the setup package locally and then copy it to the staging
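Taken together, these hunks replace "stage every file in the cache directory" with "stage only the packages reported for this run". A minimal sketch of the new flow, where populate_cache and the (path, name) result tuples are illustrative stand-ins rather than Beam API:

    import os

    def stage_for_run(requirements_file, cache_path, populate_cache):
      # populate_cache downloads into cache_path and returns the set of
      # package filenames that this requirements file actually needs.
      packages_to_stage = set()
      downloaded = populate_cache(requirements_file, cache_path)
      if downloaded:
        packages_to_stage.update(downloaded)

      resources = []
      # Old behavior: glob.glob(os.path.join(cache_path, '*')) staged every
      # cached file, including leftovers from earlier runs with different
      # requirements. New behavior: stage only what this run reported.
      for pkg in packages_to_stage:
        pkg_path = os.path.join(cache_path, pkg)
        if os.path.exists(pkg_path):
          resources.append((pkg_path, pkg))  # (local path, artifact name)
      return resources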
@@ -741,19 +752,26 @@ def _populate_requirements_cache(
     # the requirements file and will download package dependencies.
 
     # The apache-beam dependency is excluded from requirements cache population
-    # because we stage the SDK separately.
+    # because we stage the SDK separately.
     with tempfile.TemporaryDirectory() as temp_directory:
       tmp_requirements_filepath = Stager._remove_dependency_from_requirements(
           requirements_file=requirements_file,
           dependency_to_remove='apache-beam',
           temp_directory_path=temp_directory)
 
+      # Download to a temporary directory first, then copy to cache.
+      # This allows us to track exactly which packages are needed for this
+      # requirements file.
+      download_dir = tempfile.mkdtemp(dir=temp_directory)
+
       cmd_args = [
           Stager._get_python_executable(),
           '-m',
           'pip',
           'download',
           '--dest',
+          download_dir,
+          '--find-links',
           cache_dir,
           '-r',
           tmp_requirements_filepath,
@@ -781,6 +799,18 @@ def _populate_requirements_cache(
       _LOGGER.info('Executing command: %s', cmd_args)
       processes.check_output(cmd_args, stderr=processes.STDOUT)
 
+      # Get list of downloaded packages and copy them to the cache
+      downloaded_packages = set()
+      for pkg_file in os.listdir(download_dir):
+        downloaded_packages.add(pkg_file)
+        src_path = os.path.join(download_dir, pkg_file)
+        dest_path = os.path.join(cache_dir, pkg_file)
+        # Only copy if not already in cache
+        if not os.path.exists(dest_path):
+          shutil.copy2(src_path, dest_path)
+
+      return downloaded_packages
 
   @staticmethod
   def _build_setup_package(
       setup_file: str,
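The cache-population pattern above can be sketched standalone: download into a per-run scratch directory while letting pip reuse the long-lived cache via --find-links, then copy anything new back and report exactly what was downloaded. The pip flags shown (download, --dest, --find-links, -r) are real pip options; the helper below is an illustrative simplification that calls subprocess directly instead of Beam's processes wrapper:

    import os
    import shutil
    import subprocess
    import sys
    import tempfile

    def populate_cache(requirements_file, cache_dir):
      """Download deps for requirements_file, reusing cache_dir via
      --find-links, and return the set of filenames this run needs."""
      with tempfile.TemporaryDirectory() as temp_directory:
        # Scratch directory that holds only this run's downloads.
        download_dir = tempfile.mkdtemp(dir=temp_directory)
        subprocess.check_output([
            sys.executable, '-m', 'pip', 'download',
            '--dest', download_dir,     # write downloads to the scratch dir
            '--find-links', cache_dir,  # reuse files already in the cache
            '-r', requirements_file,
        ], stderr=subprocess.STDOUT)

        downloaded_packages = set()
        for pkg_file in os.listdir(download_dir):
          downloaded_packages.add(pkg_file)
          dest_path = os.path.join(cache_dir, pkg_file)
          if not os.path.exists(dest_path):  # copy only files not yet cached
            shutil.copy2(os.path.join(download_dir, pkg_file), dest_path)
        return downloaded_packages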
89 changes: 89 additions & 0 deletions sdks/python/apache_beam/runners/portability/stager_test.py
@@ -100,6 +100,8 @@ def populate_requirements_cache(
     _ = requirements_file
     self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
     self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
+    # Return the packages that were "downloaded" for this requirements file.
+    return {'abc.txt', 'def.txt'}
 
   @mock.patch('apache_beam.runners.portability.stager.open')
   @mock.patch('apache_beam.runners.portability.stager.get_new_http')
@@ -810,10 +812,14 @@ def test_remove_dependency_from_requirements(self):
 
   def _populate_requitements_cache_fake(
       self, requirements_file, temp_dir, populate_cache_with_sdists):
+    packages = set()
     if not populate_cache_with_sdists:
       self.create_temp_file(os.path.join(temp_dir, 'nothing.whl'), 'Fake whl')
+      packages.add('nothing.whl')
     self.create_temp_file(
         os.path.join(temp_dir, 'nothing.tar.gz'), 'Fake tarball')
+    packages.add('nothing.tar.gz')
+    return packages
 
   # requirements cache will popultated with bdist/whl if present
   # else source would be downloaded.
@@ -913,6 +919,89 @@ def test_populate_requirements_cache_with_local_files(self):
     self.assertNotIn('fake_pypi', extra_packages_contents)
     self.assertIn('local_package', extra_packages_contents)
 
+  def test_only_required_packages_staged_from_cache(self):
+    """Test that only packages needed for current requirements are staged.
+
+    This test verifies the fix for the issue where the entire cache directory
+    was being staged, even when some packages weren't needed for the current
+    workflow.
+    """
+    staging_dir = self.make_temp_dir()
+    requirements_cache_dir = self.make_temp_dir()
+    source_dir = self.make_temp_dir()
+
+    # Pre-populate cache with packages from a "previous run"
+    self.create_temp_file(
+        os.path.join(requirements_cache_dir, 'old_package.whl'), 'old package')
+    self.create_temp_file(
+        os.path.join(requirements_cache_dir, 'another_old.tar.gz'), 'another')
+
+    options = PipelineOptions()
+    self.update_options(options)
+    options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
+    options.view_as(SetupOptions).requirements_file = os.path.join(
+        source_dir, stager.REQUIREMENTS_FILE)
+    self.create_temp_file(
+        os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'new_package')
+
+    def populate_cache_for_new_requirements(
+        requirements_file, cache_dir, populate_cache_with_sdists=False):
+      # Simulate downloading only the packages needed for new requirements
+      self.create_temp_file(
+          os.path.join(cache_dir, 'new_package.whl'), 'new package content')
+      return {'new_package.whl'}
+
+    resources = self.stager.create_and_stage_job_resources(
+        options,
+        populate_requirements_cache=populate_cache_for_new_requirements,
+        staging_location=staging_dir)[1]
+
+    # Verify only new_package.whl is staged, not old packages
+    self.assertIn('new_package.whl', resources)
+    self.assertNotIn('old_package.whl', resources)
+    self.assertNotIn('another_old.tar.gz', resources)
+
+    # Verify the file exists in staging
+    self.assertTrue(
+        os.path.isfile(os.path.join(staging_dir, 'new_package.whl')))
+    # Verify old packages are NOT in staging
+    self.assertFalse(
+        os.path.isfile(os.path.join(staging_dir, 'old_package.whl')))
+    self.assertFalse(
+        os.path.isfile(os.path.join(staging_dir, 'another_old.tar.gz')))
+
+  def test_populate_requirements_cache_uses_find_links(self):
+    """Test that _populate_requirements_cache uses --find-links to reuse cache.
+
+    This test verifies that pip download is called with --find-links pointing
+    to the cache directory, so packages already in cache are reused instead
+    of being re-downloaded from PyPI.
+    """
+    requirements_cache_dir = self.make_temp_dir()
+    source_dir = self.make_temp_dir()
+
+    # Create a requirements file
+    requirements_file = os.path.join(source_dir, 'requirements.txt')
+    self.create_temp_file(requirements_file, 'some_package==1.0.0')
+
+    captured_cmd_args = []
+
+    def mock_check_output(cmd_args, **kwargs):
+      captured_cmd_args.extend(cmd_args)
+      return b''
+
+    with mock.patch(
+        'apache_beam.runners.portability.stager.processes.check_output',
+        side_effect=mock_check_output):
+      stager.Stager._populate_requirements_cache(
+          requirements_file, requirements_cache_dir)
+
+    # Verify --find-links is in the command with the cache directory
+    self.assertIn('--find-links', captured_cmd_args)
+    find_links_index = captured_cmd_args.index('--find-links')
+    self.assertEqual(
+        requirements_cache_dir, captured_cmd_args[find_links_index + 1])
+
+
 class TestStager(stager.Stager):
   def stage_artifact(self, local_path_to_artifact, artifact_name, sha256):
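One consequence for test doubles, visible in the fakes above: a populate_requirements_cache callable passed to create_and_stage_job_resources must now return the set of filenames it wrote into the cache, since only returned names are staged. A minimal illustrative fake under that contract (the file name is arbitrary):

    import os

    def fake_populate_cache(
        requirements_file, cache_dir, populate_cache_with_sdists=False):
      # Write a placeholder wheel into the cache, as a real download would,
      # then report exactly which file this requirements file needs.
      pkg = 'new_package.whl'
      with open(os.path.join(cache_dir, pkg), 'w') as f:
        f.write('fake wheel contents')
      return {pkg}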