From c7c2fc0e7faac0ae5859d0da87268b5df52090a2 Mon Sep 17 00:00:00 2001 From: Tanishq Date: Sun, 25 Jan 2026 10:09:17 +0530 Subject: [PATCH] #19711-Fix missing step_id in Python SDK worker logs during DoFn setup --- .../apache_beam/runners/worker/bundle_processor.py | 10 ++++++++-- sdks/python/apache_beam/runners/worker/log_handler.py | 9 ++++++--- sdks/python/apache_beam/runners/worker/sdk_worker.py | 3 ++- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index faa756d7c5c5..a08bca7dada9 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -1091,6 +1091,7 @@ def __init__( state_handler: sdk_worker.CachingStateHandler, data_channel_factory: data_plane.DataChannelFactory, data_sampler: Optional[data_sampler.DataSampler] = None, + instruction_id: Optional[str] = None, ) -> None: """Initialize a bundle processor. @@ -1159,8 +1160,13 @@ def __init__( from apache_beam.runners.worker.sdk_worker_main import terminate_sdk_harness terminate_sdk_harness() - for op in reversed(self.ops.values()): - op.setup(self.data_sampler) + if instruction_id: + with statesampler.instruction_id(instruction_id): + for op in reversed(self.ops.values()): + op.setup(self.data_sampler) + else: + for op in reversed(self.ops.values()): + op.setup(self.data_sampler) self.splitting_lock = threading.Lock() def create_execution_tree( diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py index 69815acc7194..eb2a153c2b43 100644 --- a/sdks/python/apache_beam/runners/worker/log_handler.py +++ b/sdks/python/apache_beam/runners/worker/log_handler.py @@ -134,9 +134,12 @@ def emit(self, record: logging.LogRecord) -> None: log_entry.timestamp.nanos = int(nanoseconds) if record.exc_info: log_entry.trace = ''.join(traceback.format_exception(*record.exc_info)) - instruction_id = statesampler.get_current_instruction_id() - if instruction_id: - log_entry.instruction_id = instruction_id + if hasattr(record, 'instruction_id'): + log_entry.instruction_id = record.instruction_id + if not log_entry.instruction_id: + instruction_id = statesampler.get_current_instruction_id() + if instruction_id: + log_entry.instruction_id = instruction_id tracker = statesampler.get_current_tracker() if tracker: current_state = tracker.current_state() diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py index 6060ff8d54a8..e03eb354bcb5 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py @@ -520,7 +520,8 @@ def get(self, instruction_id, bundle_descriptor_id): self.state_handler_factory.create_state_handler( pbd.state_api_service_descriptor), self.data_channel_factory, - self.data_sampler) + self.data_sampler, + instruction_id=instruction_id) with self._lock: self.active_bundle_processors[ instruction_id] = bundle_descriptor_id, processor