Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pathwaysutils/debug/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
70 changes: 70 additions & 0 deletions pathwaysutils/debug/timing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Timing utilities.

This module provides utilities for timing code with decorators and context
managers.
"""

import functools
import logging
import time
from typing import Any, Callable


_logger = logging.getLogger(__name__)


class Timer:
"""Timer context manager.

Attributes:
name: The name of the timer.
start: The start time of the timer.
stop: The stop time of the timer.
duration: The elapsed time of the timer.
"""

def __init__(self, name: str):
self.name = name

def __enter__(self):
self.start = time.time()
return self

def __exit__(self, exc_type, exc_value, tb):
self.stop = time.time()
self.duration = self.stop - self.start
_logger.debug(str(self))

def __str__(self):
return f"{self.name} elapsed {self.duration:.4f} seconds."


def timeit(func: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator to time a function.

Args:
func: The function to time.

Returns:
The decorated function.
"""

@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any):
with Timer(getattr(func, "__name__", "Unknown")):
return func(*args, **kwargs)

return wrapper
89 changes: 89 additions & 0 deletions pathwaysutils/debug/watchdog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Watchdog context manager.

This context manager is used to monitor the progress of a long running process.
If the process takes longer than the specified timeout, it will print the stack
trace of all threads.
"""

import contextlib
import logging
import os
import sys
import threading
import traceback


_logger = logging.getLogger(__name__)


def _log_thread_stack(thread: threading.Thread):
_logger.debug("Thread: %s", thread.ident)
_logger.debug(
"".join(
traceback.format_stack(
sys._current_frames().get( # pylint: disable=protected-access
thread.ident, []
)
)
)
)


@contextlib.contextmanager
def watchdog(timeout: float, repeat: bool = True):
"""Watchdog context manager.

Prints the stack trace of all threads after `timeout` seconds.

Args:
timeout: The timeout in seconds. If the timeout is reached, the stack trace
of all threads will be printed.
repeat: Whether to repeat the watchdog after the timeout. If False, the
process will be aborted after the first timeout.

Yields:
None
"""
event = threading.Event()

def handler():
count = 0
while not event.wait(timeout):
_logger.debug(
"Watchdog thread dump every %s seconds. Count: %s", timeout, count
)
try:
for thread in threading.enumerate():
try:
_log_thread_stack(thread)
except Exception: # pylint: disable=broad-exception-caught
_logger.debug("Error print traceback for thread: %s", thread.ident)
finally:
if not repeat:
_logger.critical("Timeout from watchdog!")
os.abort()

count += 1

_logger.debug("Registering watchdog")
watchdog_thread = threading.Thread(target=handler, name="watchdog")
watchdog_thread.start()
try:
yield
finally:
event.set()
watchdog_thread.join()
_logger.debug("Deregistering watchdog")
13 changes: 13 additions & 0 deletions pathwaysutils/test/debug/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
86 changes: 86 additions & 0 deletions pathwaysutils/test/debug/timing_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Timing tests."""

import logging
import time
from unittest import mock

from pathwaysutils.debug import timing

from absl.testing import absltest
from absl.testing import parameterized


class TimingTest(parameterized.TestCase):

def test_timer_context_manager(self):
with mock.patch.object(
time,
"time",
side_effect=[1, 8.9],
autospec=True,
):
with timing.Timer("test_timer") as timer:
pass

self.assertEqual(timer.name, "test_timer")
self.assertEqual(timer.start, 1)
self.assertEqual(timer.stop, 8.9)
self.assertEqual(timer.duration, 7.9)
self.assertEqual(str(timer), "test_timer elapsed 7.9000 seconds.")

def test_timeit_log(self):

@timing.timeit
def my_function():
pass

with mock.patch.object(
time,
"time",
side_effect=[1, 8.9, 0], # Third time is used for logging.
autospec=True,
):
with self.assertLogs(timing._logger, logging.DEBUG) as log_output:
my_function()

self.assertEqual(
log_output.output,
[
"DEBUG:pathwaysutils.debug.timing:my_function"
" elapsed 7.9000 seconds."
],
)

def test_timeit_return_value(self):

@timing.timeit
def my_function():
return "test"

self.assertEqual(my_function(), "test")

def test_timeit_exception(self):

@timing.timeit
def my_function():
raise ValueError("test")

with self.assertRaises(ValueError):
my_function()


if __name__ == "__main__":
absltest.main()
96 changes: 96 additions & 0 deletions pathwaysutils/test/debug/watchdog_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Watchdog tests."""

import logging
import sys
import threading
import traceback
from unittest import mock

from pathwaysutils.debug import watchdog

from absl.testing import absltest
from absl.testing import parameterized


class WatchdogTest(parameterized.TestCase):
def test_watchdog_start_join(self):
with (
mock.patch.object(
threading.Thread,
"start",
autospec=True,
) as mock_start,
mock.patch.object(threading.Thread, "join", autospec=True) as mock_join,
):
with watchdog.watchdog(timeout=1):
mock_start.assert_called_once()
mock_join.assert_not_called()

mock_start.assert_called_once()
mock_join.assert_called_once()

@parameterized.named_parameters([
(
"thread 1",
1,
[
"DEBUG:pathwaysutils.debug.watchdog:Thread: 1",
"DEBUG:pathwaysutils.debug.watchdog:examplestack1",
],
),
(
"thread 2",
2,
[
"DEBUG:pathwaysutils.debug.watchdog:Thread: 2",
"DEBUG:pathwaysutils.debug.watchdog:examplestack2",
],
),
(
"thread 3",
3,
[
"DEBUG:pathwaysutils.debug.watchdog:Thread: 3",
"DEBUG:pathwaysutils.debug.watchdog:",
],
),
])
def test_log_thread_strack_succes(self, thread_ident, expected_log_output):
with (
mock.patch.object(
sys,
"_current_frames",
return_value={1: ["example", "stack1"], 2: ["example", "stack2"]},
autospec=True,
),
mock.patch.object(
traceback,
"format_stack",
side_effect=lambda stack_str_list: stack_str_list,
autospec=True,
),
):
mock_thread = mock.create_autospec(threading.Thread, instance=True)
mock_thread.ident = thread_ident

with self.assertLogs(watchdog._logger, logging.DEBUG) as log_output:
watchdog._log_thread_stack(mock_thread)

self.assertEqual(log_output.output, expected_log_output)


if __name__ == "__main__":
absltest.main()