Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@

## New Features / Improvements

* (Python) Added exception chaining to preserve error context in CloudSQLEnrichmentHandler, processes utilities, and core transforms ([#37422](https://github.com/apache/beam/issues/37422)).
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Breaking Changes
Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2444,7 +2444,8 @@ def exception_handling_wrapper_do_fn_custom_process(self, *args, **kwargs):
try:
self._on_failure_callback(exn, args[0])
except Exception as e:
logging.warning('on_failure_callback failed with error: %s', e)
logging.warning(
'on_failure_callback failed with error: %s', e, exc_info=True)
yield pvalue.TaggedOutput(
self._dead_letter_tag,
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,11 @@ def _execute_query(
return data
except Exception as e:
transaction.rollback()
raise RuntimeError(f"Database operation failed: {e}")
raise RuntimeError(f"Database operation failed: {e}") from e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're chaining the exception, do we still need to include the exception in the RuntimeError message?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question exists below in this file as well

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With from e, the cause is in cause and the traceback, so we don’t need to include the exception in the message.
I kept it in because in many environments only the top-level exception message is logged or shown (e.g. in alerts or dashboards). In those cases, "Database operation failed" alone doesn’t tell you the real error, while f"Database operation failed: {e}" stays useful. It’s also a common Python pattern to add context and still include the original error in the wrapper message.
If you prefer to rely on the chain and keep messages minimal, I’m happy to change to e.g. raise RuntimeError("Database operation failed") from e and remove {e} from the message. Happy to follow whatever convention you prefer for this project.

except Exception as e:
raise Exception(
f'Could not execute the query. Please check if the query is properly '
f'formatted and the table exists. {e}')
f'formatted and the table exists. {e}') from e
finally:
if connection:
connection.close()
Expand Down
24 changes: 12 additions & 12 deletions sdks/python/apache_beam/utils/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,56 +49,56 @@ def call(*args, **kwargs):
kwargs['shell'] = True
try:
out = subprocess.call(*args, **kwargs)
except OSError:
raise RuntimeError("Executable {} not found".format(args[0]))
except OSError as e:
raise RuntimeError("Executable {} not found".format(args[0])) from e
except subprocess.CalledProcessError as error:
if isinstance(args, tuple) and (args[0][2] == "pip"):
raise RuntimeError( \
"Full traceback: {}\n Pip install failed for package: {} \
\n Output from execution of subprocess: {}" \
.format(traceback.format_exc(), args[0][6], error. output))
.format(traceback.format_exc(), args[0][6], error. output)) from error
else:
raise RuntimeError("Full trace: {}\
\n Output of the failed child process: {} " \
.format(traceback.format_exc(), error.output))
.format(traceback.format_exc(), error.output)) from error
return out

def check_call(*args, **kwargs):
if force_shell:
kwargs['shell'] = True
try:
out = subprocess.check_call(*args, **kwargs)
except OSError:
raise RuntimeError("Executable {} not found".format(args[0]))
except OSError as e:
raise RuntimeError("Executable {} not found".format(args[0])) from e
except subprocess.CalledProcessError as error:
if isinstance(args, tuple) and (args[0][2] == "pip"):
raise RuntimeError( \
"Full traceback: {} \n Pip install failed for package: {} \
\n Output from execution of subprocess: {}" \
.format(traceback.format_exc(), args[0][6], error.output))
.format(traceback.format_exc(), args[0][6], error.output)) from error
else:
raise RuntimeError("Full trace: {} \
\n Output of the failed child process: {}" \
.format(traceback.format_exc(), error.output))
.format(traceback.format_exc(), error.output)) from error
return out

def check_output(*args, **kwargs):
if force_shell:
kwargs['shell'] = True
try:
out = subprocess.check_output(*args, **kwargs)
except OSError:
raise RuntimeError("Executable {} not found".format(args[0]))
except OSError as e:
raise RuntimeError("Executable {} not found".format(args[0])) from e
except subprocess.CalledProcessError as error:
if isinstance(args, tuple) and (args[0][2] == "pip"):
raise RuntimeError( \
"Full traceback: {} \n Pip install failed for package: {} \
\n Output from execution of subprocess: {}" \
.format(traceback.format_exc(), args[0][6], error.output))
.format(traceback.format_exc(), args[0][6], error.output)) from error
else:
raise RuntimeError("Full trace: {}, \
output of the failed child process {} "\
.format(traceback.format_exc(), error.output))
.format(traceback.format_exc(), error.output)) from error
return out

def Popen(*args, **kwargs):
Expand Down
Loading