diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
index e43868bf4f24..d36d0db940c8 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "pr": "36271",
-  "modification": 37
+  "pr": "37360",
+  "modification": 38
 }
\ No newline at end of file
diff --git a/.github/trigger_files/beam_PostCommit_Python_Examples_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_Examples_Dataflow.json
new file mode 100644
index 000000000000..b8513bdfc7b7
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_Python_Examples_Dataflow.json
@@ -0,0 +1,5 @@
+{
+  "comment": "Modify this file in a trivial way to cause this test suite to run.",
+  "pr": "37360",
+  "modification": 1
+}
\ No newline at end of file
diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py
index 17cf6514cace..668477ce1461 100644
--- a/sdks/python/apache_beam/runners/portability/stager.py
+++ b/sdks/python/apache_beam/runners/portability/stager.py
@@ -213,13 +213,15 @@ def create_job_resources(
     # if we know we are using a dependency pre-installed sdk container image.
     if not skip_prestaged_dependencies:
       requirements_cache_path = (
-          os.path.join(tempfile.gettempdir(), 'dataflow-requirements-cache') if
+          os.path.join(tempfile.gettempdir(), 'beam-requirements-cache') if
           (setup_options.requirements_cache is None) else
           setup_options.requirements_cache)
       if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE and
           not os.path.exists(requirements_cache_path)):
         os.makedirs(requirements_cache_path, exist_ok=True)
 
+      # Track packages to stage for this specific run.
+      packages_to_stage = set()
       # Stage a requirements file if present.
       if setup_options.requirements_file is not None:
         if not os.path.isfile(setup_options.requirements_file):
@@ -245,12 +247,16 @@ def create_job_resources(
             'such as --requirements_file. ')
 
         if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
-          (
+          populate_cache_callable = (
               populate_requirements_cache if populate_requirements_cache else
-              Stager._populate_requirements_cache)(
-                  setup_options.requirements_file,
-                  requirements_cache_path,
-                  setup_options.requirements_cache_only_sources)
+              Stager._populate_requirements_cache)
+
+          downloaded_packages = populate_cache_callable(
+              setup_options.requirements_file,
+              requirements_cache_path,
+              setup_options.requirements_cache_only_sources)
+          if downloaded_packages:
+            packages_to_stage.update(downloaded_packages)
 
       if pypi_requirements:
         tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
@@ -260,18 +266,23 @@ def create_job_resources(
         # Populate cache with packages from PyPI requirements and stage
         # the files in the cache.
         if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
-          (
+          populate_cache_callable = (
               populate_requirements_cache if populate_requirements_cache else
-              Stager._populate_requirements_cache)(
-                  tf.name,
-                  requirements_cache_path,
-                  setup_options.requirements_cache_only_sources)
+              Stager._populate_requirements_cache)
+          downloaded_packages = populate_cache_callable(
+              tf.name,
+              requirements_cache_path,
+              setup_options.requirements_cache_only_sources)
+          if downloaded_packages:
+            packages_to_stage.update(downloaded_packages)
 
       if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE) and (
           setup_options.requirements_file is not None or pypi_requirements):
-        for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
-          resources.append(
-              Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))
+        for pkg in packages_to_stage:
+          pkg_path = os.path.join(requirements_cache_path, pkg)
+          if os.path.exists(pkg_path):
+            resources.append(
+                Stager._create_file_stage_to_artifact(pkg_path, pkg))
 
       # Handle a setup file if present.
       # We will build the setup package locally and then copy it to the staging
@@ -741,19 +752,26 @@ def _populate_requirements_cache(
     # the requirements file and will download package dependencies.
 
     # The apache-beam dependency is excluded from requirements cache population
-    # because we stage the SDK separately.
+    # because we stage the SDK separately.
     with tempfile.TemporaryDirectory() as temp_directory:
       tmp_requirements_filepath = Stager._remove_dependency_from_requirements(
           requirements_file=requirements_file,
           dependency_to_remove='apache-beam',
           temp_directory_path=temp_directory)
 
+      # Download to a temporary directory first, then copy to cache.
+      # This allows us to track exactly which packages are needed for this
+      # requirements file.
+      download_dir = tempfile.mkdtemp(dir=temp_directory)
+
       cmd_args = [
           Stager._get_python_executable(),
           '-m',
           'pip',
           'download',
           '--dest',
+          download_dir,
+          '--find-links',
           cache_dir,
           '-r',
           tmp_requirements_filepath,
@@ -781,6 +799,18 @@ def _populate_requirements_cache(
       _LOGGER.info('Executing command: %s', cmd_args)
       processes.check_output(cmd_args, stderr=processes.STDOUT)
 
+      # Get list of downloaded packages and copy them to the cache
+      downloaded_packages = set()
+      for pkg_file in os.listdir(download_dir):
+        downloaded_packages.add(pkg_file)
+        src_path = os.path.join(download_dir, pkg_file)
+        dest_path = os.path.join(cache_dir, pkg_file)
+        # Only copy if not already in cache
+        if not os.path.exists(dest_path):
+          shutil.copy2(src_path, dest_path)
+
+      return downloaded_packages
+
   @staticmethod
   def _build_setup_package(
       setup_file: str,
diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py
index 233e0c3dcea1..4ec1c697fbff 100644
--- a/sdks/python/apache_beam/runners/portability/stager_test.py
+++ b/sdks/python/apache_beam/runners/portability/stager_test.py
@@ -100,6 +100,8 @@ def populate_requirements_cache(
     _ = requirements_file
     self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
     self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
+    # Return the packages that were "downloaded" for this requirements file.
+    return {'abc.txt', 'def.txt'}
 
   @mock.patch('apache_beam.runners.portability.stager.open')
   @mock.patch('apache_beam.runners.portability.stager.get_new_http')
@@ -810,10 +812,14 @@ def test_remove_dependency_from_requirements(self):
 
   def _populate_requitements_cache_fake(
       self, requirements_file, temp_dir, populate_cache_with_sdists):
+    packages = set()
     if not populate_cache_with_sdists:
       self.create_temp_file(os.path.join(temp_dir, 'nothing.whl'), 'Fake whl')
+      packages.add('nothing.whl')
     self.create_temp_file(
         os.path.join(temp_dir, 'nothing.tar.gz'), 'Fake tarball')
+    packages.add('nothing.tar.gz')
+    return packages
 
   # requirements cache will popultated with bdist/whl if present
   # else source would be downloaded.
@@ -913,6 +919,89 @@ def test_populate_requirements_cache_with_local_files(self):
     self.assertNotIn('fake_pypi', extra_packages_contents)
     self.assertIn('local_package', extra_packages_contents)
 
+  def test_only_required_packages_staged_from_cache(self):
+    """Test that only packages needed for current requirements are staged.
+
+    This test verifies the fix for the issue where the entire cache directory
+    was being staged, even when some packages weren't needed for the current
+    workflow.
+ """ + staging_dir = self.make_temp_dir() + requirements_cache_dir = self.make_temp_dir() + source_dir = self.make_temp_dir() + + # Pre-populate cache with packages from a "previous run" + self.create_temp_file( + os.path.join(requirements_cache_dir, 'old_package.whl'), 'old package') + self.create_temp_file( + os.path.join(requirements_cache_dir, 'another_old.tar.gz'), 'another') + + options = PipelineOptions() + self.update_options(options) + options.view_as(SetupOptions).requirements_cache = requirements_cache_dir + options.view_as(SetupOptions).requirements_file = os.path.join( + source_dir, stager.REQUIREMENTS_FILE) + self.create_temp_file( + os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'new_package') + + def populate_cache_for_new_requirements( + requirements_file, cache_dir, populate_cache_with_sdists=False): + # Simulate downloading only the packages needed for new requirements + self.create_temp_file( + os.path.join(cache_dir, 'new_package.whl'), 'new package content') + return {'new_package.whl'} + + resources = self.stager.create_and_stage_job_resources( + options, + populate_requirements_cache=populate_cache_for_new_requirements, + staging_location=staging_dir)[1] + + # Verify only new_package.whl is staged, not old packages + self.assertIn('new_package.whl', resources) + self.assertNotIn('old_package.whl', resources) + self.assertNotIn('another_old.tar.gz', resources) + + # Verify the file exists in staging + self.assertTrue( + os.path.isfile(os.path.join(staging_dir, 'new_package.whl'))) + # Verify old packages are NOT in staging + self.assertFalse( + os.path.isfile(os.path.join(staging_dir, 'old_package.whl'))) + self.assertFalse( + os.path.isfile(os.path.join(staging_dir, 'another_old.tar.gz'))) + + def test_populate_requirements_cache_uses_find_links(self): + """Test that _populate_requirements_cache uses --find-links to reuse cache. + + This test verifies that pip download is called with --find-links pointing + to the cache directory, so packages already in cache are reused instead + of being re-downloaded from PyPI. + """ + requirements_cache_dir = self.make_temp_dir() + source_dir = self.make_temp_dir() + + # Create a requirements file + requirements_file = os.path.join(source_dir, 'requirements.txt') + self.create_temp_file(requirements_file, 'some_package==1.0.0') + + captured_cmd_args = [] + + def mock_check_output(cmd_args, **kwargs): + captured_cmd_args.extend(cmd_args) + return b'' + + with mock.patch( + 'apache_beam.runners.portability.stager.processes.check_output', + side_effect=mock_check_output): + stager.Stager._populate_requirements_cache( + requirements_file, requirements_cache_dir) + + # Verify --find-links is in the command with the cache directory + self.assertIn('--find-links', captured_cmd_args) + find_links_index = captured_cmd_args.index('--find-links') + self.assertEqual( + requirements_cache_dir, captured_cmd_args[find_links_index + 1]) + class TestStager(stager.Stager): def stage_artifact(self, local_path_to_artifact, artifact_name, sha256):