Skip to content
21 changes: 19 additions & 2 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,22 @@ def create_harness(environment, dry_run=False):
else:
fn_log_handler = None

pipeline_options_dict = _load_pipeline_options(
environment.get('PIPELINE_OPTIONS'))
options_json = environment.get('PIPELINE_OPTIONS')

#We check if options are stored in the file.
if 'PIPELINE_OPTIONS_FILE' in environment:
options_file = environment['PIPELINE_OPTIONS_FILE']
try:
with open(options_file, 'r') as f:
options_json = f.read()
_LOGGER.info('Load pipeline options from file: %s', options_file)
except:
_LOGGER.error(
'Failed to load pipeline options from file: %s', options_file)
raise

pipeline_options_dict = _load_pipeline_options(options_json)

default_log_level = _get_log_level_from_options_dict(pipeline_options_dict)
logging.getLogger().setLevel(default_log_level)
_set_log_level_overrides(pipeline_options_dict)
Expand Down Expand Up @@ -239,6 +253,7 @@ def terminate_sdk_harness():


def _load_pipeline_options(options_json):
"""Deserialize the pipeline options from a JSON string into a dictionary."""
if options_json is None:
return {}
options = json.loads(options_json)
Expand All @@ -256,6 +271,8 @@ def _load_pipeline_options(options_json):


def _parse_pipeline_options(options_json):
"""Parses the pipeline options from a JSON string into a PipelineOptions
object."""
return PipelineOptions.from_dictionary(_load_pipeline_options(options_json))


Expand Down
6 changes: 5 additions & 1 deletion sdks/python/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,11 @@ func launchSDKProcess() error {

// (3) Invoke python

os.Setenv("PIPELINE_OPTIONS", options)
// Write the JSON string of pipeline options into a file to prevent "argument list too long" error.
if err := tools.MakePipelineOptionsFileAndEnvVar(options); err != nil {
logger.Fatalf(ctx, "Failed to load pipeline options to worker: %v", err)
}

os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *loggingEndpoint}).String())
os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", (&pipepb.ApiServiceDescriptor{Url: *controlEndpoint}).String())
Expand Down
Loading