From 2e69b8e40085fd0f1b36e5d33a4310d1f2134120 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Mon, 26 Jan 2026 23:01:27 +0500 Subject: [PATCH 1/3] Add exception chaining to preserve error context - Add 'from e' to exception re-raises in CloudSQLEnrichmentHandler - Add exception chaining in processes.py for OSError and CalledProcessError - Improve logging in core.py to preserve traceback context This improves debuggability by preserving the full exception chain, following Python PEP 3134 best practices. Fixes #37422 --- sdks/python/apache_beam/transforms/core.py | 2 +- .../enrichment_handlers/cloudsql.py | 4 ++-- sdks/python/apache_beam/utils/processes.py | 24 +++++++++---------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index ea11bca9474d..86c5b8d7ac84 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2444,7 +2444,7 @@ def exception_handling_wrapper_do_fn_custom_process(self, *args, **kwargs): try: self._on_failure_callback(exn, args[0]) except Exception as e: - logging.warning('on_failure_callback failed with error: %s', e) + logging.warning('on_failure_callback failed with error: %s', e, exc_info=True) yield pvalue.TaggedOutput( self._dead_letter_tag, ( diff --git a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py index 3fe3a62f9546..ba0b8617f67b 100644 --- a/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py +++ b/sdks/python/apache_beam/transforms/enrichment_handlers/cloudsql.py @@ -395,11 +395,11 @@ def _execute_query( return data except Exception as e: transaction.rollback() - raise RuntimeError(f"Database operation failed: {e}") + raise RuntimeError(f"Database operation failed: {e}") from e except Exception as e: raise Exception( f'Could not execute the query. Please check if the query is properly ' - f'formatted and the table exists. {e}') + f'formatted and the table exists. {e}') from e finally: if connection: connection.close() diff --git a/sdks/python/apache_beam/utils/processes.py b/sdks/python/apache_beam/utils/processes.py index c7b9e240d961..f6daecea2125 100644 --- a/sdks/python/apache_beam/utils/processes.py +++ b/sdks/python/apache_beam/utils/processes.py @@ -49,18 +49,18 @@ def call(*args, **kwargs): kwargs['shell'] = True try: out = subprocess.call(*args, **kwargs) - except OSError: - raise RuntimeError("Executable {} not found".format(args[0])) + except OSError as e: + raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: if isinstance(args, tuple) and (args[0][2] == "pip"): raise RuntimeError( \ "Full traceback: {}\n Pip install failed for package: {} \ \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error. output)) + .format(traceback.format_exc(), args[0][6], error. output)) from error else: raise RuntimeError("Full trace: {}\ \n Output of the failed child process: {} " \ - .format(traceback.format_exc(), error.output)) + .format(traceback.format_exc(), error.output)) from error return out def check_call(*args, **kwargs): @@ -68,18 +68,18 @@ def check_call(*args, **kwargs): kwargs['shell'] = True try: out = subprocess.check_call(*args, **kwargs) - except OSError: - raise RuntimeError("Executable {} not found".format(args[0])) + except OSError as e: + raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: if isinstance(args, tuple) and (args[0][2] == "pip"): raise RuntimeError( \ "Full traceback: {} \n Pip install failed for package: {} \ \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error.output)) + .format(traceback.format_exc(), args[0][6], error.output)) from error else: raise RuntimeError("Full trace: {} \ \n Output of the failed child process: {}" \ - .format(traceback.format_exc(), error.output)) + .format(traceback.format_exc(), error.output)) from error return out def check_output(*args, **kwargs): @@ -87,18 +87,18 @@ def check_output(*args, **kwargs): kwargs['shell'] = True try: out = subprocess.check_output(*args, **kwargs) - except OSError: - raise RuntimeError("Executable {} not found".format(args[0])) + except OSError as e: + raise RuntimeError("Executable {} not found".format(args[0])) from e except subprocess.CalledProcessError as error: if isinstance(args, tuple) and (args[0][2] == "pip"): raise RuntimeError( \ "Full traceback: {} \n Pip install failed for package: {} \ \n Output from execution of subprocess: {}" \ - .format(traceback.format_exc(), args[0][6], error.output)) + .format(traceback.format_exc(), args[0][6], error.output)) from error else: raise RuntimeError("Full trace: {}, \ output of the failed child process {} "\ - .format(traceback.format_exc(), error.output)) + .format(traceback.format_exc(), error.output)) from error return out def Popen(*args, **kwargs): From f3b8a5fa760c1efece89bb72ec09a3efa00d14f7 Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Tue, 27 Jan 2026 00:50:42 +0500 Subject: [PATCH 2/3] Fix yapf formatting for logging.warning statement --- sdks/python/apache_beam/transforms/core.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 86c5b8d7ac84..151a920f3c84 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2444,7 +2444,10 @@ def exception_handling_wrapper_do_fn_custom_process(self, *args, **kwargs): try: self._on_failure_callback(exn, args[0]) except Exception as e: - logging.warning('on_failure_callback failed with error: %s', e, exc_info=True) + logging.warning( + 'on_failure_callback failed with error: %s', + e, + exc_info=True) yield pvalue.TaggedOutput( self._dead_letter_tag, ( From 6d55c0134d6488d1575faf7d2fb2239d16847c4a Mon Sep 17 00:00:00 2001 From: shaheeramjad Date: Tue, 27 Jan 2026 10:57:59 +0500 Subject: [PATCH 3/3] Fix yapf formatting: put logging arguments on single line --- CHANGES.md | 1 + sdks/python/apache_beam/transforms/core.py | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ff931802addf..e2dcf6e0f2ca 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,6 +68,7 @@ ## New Features / Improvements +* (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)). * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Breaking Changes diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 151a920f3c84..128a070e2acf 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2445,9 +2445,7 @@ def exception_handling_wrapper_do_fn_custom_process(self, *args, **kwargs): self._on_failure_callback(exn, args[0]) except Exception as e: logging.warning( - 'on_failure_callback failed with error: %s', - e, - exc_info=True) + 'on_failure_callback failed with error: %s', e, exc_info=True) yield pvalue.TaggedOutput( self._dead_letter_tag, (