From 9c5d868651db0d59c3b6bfee6545a5c2e3c08ac8 Mon Sep 17 00:00:00 2001 From: "mathijsdeelen ." Date: Wed, 21 Jan 2026 16:06:12 -0500 Subject: [PATCH 01/10] Support large pipeline options in Python SDK #37370 --- .../runners/worker/sdk_worker_main.py | 24 ++++++++++++-- sdks/python/container/boot.go | 31 ++++++++++++++++++- 2 files changed, 52 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index e4dd6cc2121f..c8885067442f 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -92,9 +92,29 @@ def create_harness(environment, dry_run=False): fn_log_handler = None else: fn_log_handler = None + + options_json = environment.get('PIPELINE_OPTIONS') + + + #We check if options are stored in the file. + if 'PIPELINE_OPTIONS_FILE' in environment: + options_file = environment['PIPELINE_OPTIONS_FILE'] + try: + with open(options_file, 'r') as f: + options_json = f.read() + _LOGGER.info('Load pipeline options from file: %s', options_file) + except: + _LOGGER.error('Failed to load pipeline options from file: %s', options_file) + raise + + pipeline_options_dict = _load_pipeline_options(options_json) + + #pipeline_options_dict = _load_pipeline_options( + # environment.get('PIPELINE_OPTIONS')) + + + - pipeline_options_dict = _load_pipeline_options( - environment.get('PIPELINE_OPTIONS')) default_log_level = _get_log_level_from_options_dict(pipeline_options_dict) logging.getLogger().setLevel(default_log_level) _set_log_level_overrides(pipeline_options_dict) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 847325d4f83c..622c1fa5667d 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -220,7 +220,18 @@ func launchSDKProcess() error { // (3) Invoke python - os.Setenv("PIPELINE_OPTIONS", options) + //Commented out --> + //os.Setenv("PIPELINE_OPTIONS", options) + + //To prevent crashes if options > ARG_MAX --> + // --> Write the large JSON content into a file on disk + // --> environment only receives the short filename + if optionsFile, err := MakePipelineOptionsFileAndEnvVar(options); err != nil { + logger.Fatalf(ctx, "Failed to create a pipeline options file: %v", err) + } else { + os.Setenv("PIPELINE_OPTIONS_FILE", optionsFile) + } + os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir) os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir) os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String()) os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String()) @@ -503,3 +514,21 @@ func logSubmissionEnvDependencies(ctx context.Context, bufLogger *tools.Buffered bufLogger.Printf(ctx, "%s", string(content)) return nil } + +// MakePipelineOptionsFileAndEnvVar writes the pipeline options to a file. +// Assumes the options string is JSON formatted. +// +// Stores the file name in question in PIPELINE_OPTIONS_FILE for access by the SDK. +func MakePipelineOptionsFileAndEnvVar(options string) (string, error) { + fn := "pipeline_options.json" + f, err := os.Create(fn) + if err != nil { + return "", fmt.Errorf("unable to create %v: %w", fn, err) + } + defer f.Close() + if _, err := f.WriteString(options); err != nil { + return "", fmt.Errorf("error writing %v: %w", f.Name(), err) + } + os.Setenv("PIPELINE_OPTIONS_FILE", f.Name()) + return fn, nil +} From 0707d08484f0f4b0031fd5f8063e4f3e7b8ff162 Mon Sep 17 00:00:00 2001 From: "mathijsdeelen ." Date: Wed, 21 Jan 2026 17:06:43 -0500 Subject: [PATCH 02/10] reformatted code to match the project's standards to pass checks --- .../apache_beam/runners/worker/sdk_worker_main.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index c8885067442f..084b26c41fc1 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -29,8 +29,6 @@ import time import traceback -from google.protobuf import text_format - from apache_beam.internal import pickler from apache_beam.io import filesystems from apache_beam.options.pipeline_options import DebugOptions @@ -46,6 +44,7 @@ from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler from apache_beam.runners.worker.sdk_worker import SdkHarness from apache_beam.utils import profiler +from google.protobuf import text_format _LOGGER = logging.getLogger(__name__) _ENABLE_GOOGLE_CLOUD_PROFILER = 'enable_google_cloud_profiler' @@ -92,9 +91,8 @@ def create_harness(environment, dry_run=False): fn_log_handler = None else: fn_log_handler = None - - options_json = environment.get('PIPELINE_OPTIONS') + options_json = environment.get('PIPELINE_OPTIONS') #We check if options are stored in the file. if 'PIPELINE_OPTIONS_FILE' in environment: @@ -104,16 +102,14 @@ def create_harness(environment, dry_run=False): options_json = f.read() _LOGGER.info('Load pipeline options from file: %s', options_file) except: - _LOGGER.error('Failed to load pipeline options from file: %s', options_file) + _LOGGER.error( + 'Failed to load pipeline options from file: %s', options_file) raise pipeline_options_dict = _load_pipeline_options(options_json) #pipeline_options_dict = _load_pipeline_options( # environment.get('PIPELINE_OPTIONS')) - - - default_log_level = _get_log_level_from_options_dict(pipeline_options_dict) logging.getLogger().setLevel(default_log_level) @@ -271,7 +267,8 @@ def _load_pipeline_options(options_json): return { re.match(portable_option_regex, k).group('key') if re.match( portable_option_regex, k) else k: v - for k, v in options.items() + for k, + v in options.items() } From 258ddf8eb0520543e513bdfbec7b3122cb06b836 Mon Sep 17 00:00:00 2001 From: "mathijsdeelen ." Date: Wed, 21 Jan 2026 17:26:59 -0500 Subject: [PATCH 03/10] Fixed PythonFormatterPreCommitscript error" --- sdks/python/apache_beam/runners/worker/sdk_worker_main.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 084b26c41fc1..80359a500499 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -267,8 +267,7 @@ def _load_pipeline_options(options_json): return { re.match(portable_option_regex, k).group('key') if re.match( portable_option_regex, k) else k: v - for k, - v in options.items() + for k, v in options.items() } From 6a9cd2ff66cacc17060a9f95b23328bad2c9818f Mon Sep 17 00:00:00 2001 From: "mathijsdeelen ." Date: Wed, 21 Jan 2026 17:53:26 -0500 Subject: [PATCH 04/10] Fix import order: move google.protobuf above apache_beam --- sdks/python/apache_beam/runners/worker/sdk_worker_main.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 80359a500499..f836d4be5dd6 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -29,6 +29,9 @@ import time import traceback + +from google.protobuf import text_format + from apache_beam.internal import pickler from apache_beam.io import filesystems from apache_beam.options.pipeline_options import DebugOptions @@ -44,7 +47,6 @@ from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler from apache_beam.runners.worker.sdk_worker import SdkHarness from apache_beam.utils import profiler -from google.protobuf import text_format _LOGGER = logging.getLogger(__name__) _ENABLE_GOOGLE_CLOUD_PROFILER = 'enable_google_cloud_profiler' From 937fd5451fda85841dcaf694811d5266c2ff2161 Mon Sep 17 00:00:00 2001 From: "mathijsdeelen ." Date: Wed, 21 Jan 2026 18:11:36 -0500 Subject: [PATCH 05/10] Removed blank line between import and google import --- sdks/python/apache_beam/runners/worker/sdk_worker_main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index f836d4be5dd6..56d172faa7db 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -29,7 +29,6 @@ import time import traceback - from google.protobuf import text_format from apache_beam.internal import pickler From f1aa586e7acb642879638e5696539bef1d7fd65a Mon Sep 17 00:00:00 2001 From: "mathijsdeelen ." Date: Wed, 28 Jan 2026 15:19:41 -0500 Subject: [PATCH 06/10] Addressed the review comments: cleaned up comments, align error message with JAVA and GO SDKs --- .../runners/worker/sdk_worker_main.py | 4 +-- sdks/python/container/boot.go | 33 +++---------------- 2 files changed, 6 insertions(+), 31 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 56d172faa7db..0847e2241877 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -109,8 +109,6 @@ def create_harness(environment, dry_run=False): pipeline_options_dict = _load_pipeline_options(options_json) - #pipeline_options_dict = _load_pipeline_options( - # environment.get('PIPELINE_OPTIONS')) default_log_level = _get_log_level_from_options_dict(pipeline_options_dict) logging.getLogger().setLevel(default_log_level) @@ -256,6 +254,7 @@ def terminate_sdk_harness(): def _load_pipeline_options(options_json): + """Deserialize the pipeline options from a JSON string into a dictionary.""" if options_json is None: return {} options = json.loads(options_json) @@ -273,6 +272,7 @@ def _load_pipeline_options(options_json): def _parse_pipeline_options(options_json): + """Parses the pipeline options from a JSON string into a PipelineOptions object.""" return PipelineOptions.from_dictionary(_load_pipeline_options(options_json)) diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go index 622c1fa5667d..73f38e50e108 100644 --- a/sdks/python/container/boot.go +++ b/sdks/python/container/boot.go @@ -220,18 +220,11 @@ func launchSDKProcess() error { // (3) Invoke python - //Commented out --> - //os.Setenv("PIPELINE_OPTIONS", options) - - //To prevent crashes if options > ARG_MAX --> - // --> Write the large JSON content into a file on disk - // --> environment only receives the short filename - if optionsFile, err := MakePipelineOptionsFileAndEnvVar(options); err != nil { - logger.Fatalf(ctx, "Failed to create a pipeline options file: %v", err) - } else { - os.Setenv("PIPELINE_OPTIONS_FILE", optionsFile) + // Write the JSON string of pipeline options into a file to prevent "argument list too long" error. + if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil { + logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err) } - os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir) + os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir) os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String()) os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String()) @@ -514,21 +507,3 @@ func logSubmissionEnvDependencies(ctx context.Context, bufLogger *tools.Buffered bufLogger.Printf(ctx, "%s", string(content)) return nil } - -// MakePipelineOptionsFileAndEnvVar writes the pipeline options to a file. -// Assumes the options string is JSON formatted. -// -// Stores the file name in question in PIPELINE_OPTIONS_FILE for access by the SDK. -func MakePipelineOptionsFileAndEnvVar(options string) (string, error) { - fn := "pipeline_options.json" - f, err := os.Create(fn) - if err != nil { - return "", fmt.Errorf("unable to create %v: %w", fn, err) - } - defer f.Close() - if _, err := f.WriteString(options); err != nil { - return "", fmt.Errorf("error writing %v: %w", f.Name(), err) - } - os.Setenv("PIPELINE_OPTIONS_FILE", f.Name()) - return fn, nil -} From 171cfadb6a51e98bc5e474fc4a6fc54fe11042f8 Mon Sep 17 00:00:00 2001 From: "mathijsdeelen ." Date: Wed, 28 Jan 2026 15:36:37 -0500 Subject: [PATCH 07/10] Fixed Formatting Error --- sdks/python/apache_beam/runners/worker/sdk_worker_main.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 0847e2241877..91159d241cca 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -109,7 +109,6 @@ def create_harness(environment, dry_run=False): pipeline_options_dict = _load_pipeline_options(options_json) - default_log_level = _get_log_level_from_options_dict(pipeline_options_dict) logging.getLogger().setLevel(default_log_level) _set_log_level_overrides(pipeline_options_dict) From 4666684dd7022122aeec469e5a52d3c49c5842aa Mon Sep 17 00:00:00 2001 From: "mathijsdeelen ." Date: Wed, 28 Jan 2026 16:11:04 -0500 Subject: [PATCH 08/10] Fixed line lenght that causes lint error --- sdks/python/apache_beam/runners/worker/sdk_worker_main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 91159d241cca..0a85c6ddc157 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -271,7 +271,8 @@ def _load_pipeline_options(options_json): def _parse_pipeline_options(options_json): - """Parses the pipeline options from a JSON string into a PipelineOptions object.""" + """Parses the pipeline options from a JSON string into a PipelineOptions + object.""" return PipelineOptions.from_dictionary(_load_pipeline_options(options_json)) From e64900c7756823e5bf4f3383733c0d6574479057 Mon Sep 17 00:00:00 2001 From: "mathijsdeelen ." Date: Wed, 28 Jan 2026 17:46:12 -0500 Subject: [PATCH 09/10] rerun tests From 86cfb0c95a3a944961daddbcfacc9528d8ffc0ae Mon Sep 17 00:00:00 2001 From: "mathijsdeelen ." Date: Wed, 28 Jan 2026 18:26:46 -0500 Subject: [PATCH 10/10] Rerun tests