From 235e34248506efc4299bc493e1768bcfddb70c01 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 4 Jun 2026 22:32:41 +0000 Subject: [PATCH 1/5] wandb compat: route string/bool log values to Pluto config WandbRunWrapper.log() pre-filtered logged values and only forwarded Python int/float, torch scalar tensors, and wandb media to Pluto. String values had no branch (_convert_wandb_to_pluto returns None for str), so they were silently dropped. Concretely, a checkpoint-metadata logger that does: wandb.log({'checkpoint/step': step, # int -> reached Pluto 'checkpoint/epoch': epoch, # int -> reached Pluto 'checkpoint/r2_path': r2_path, # str -> DROPPED 'checkpoint/local_path': path}, # str -> DROPPED step=step) lost the two string paths while the numeric step/epoch came through fine. Those string paths are exactly what a resume flow needs to know which object to stage. Now str/bool values route to update_config() (latest-wins, queryable via get_run().config), mirroring where wandb places loose strings (run summary/overview). A resume skill can read the most recent checkpoint/r2_path off the run config instead of scanning storage. Also forward numpy scalars (np.generic, excluding np.bool_) as metrics via .item(). This is defensive hardening, not part of the above bug: the shim was stricter than Pluto's own log(), which already accepts anything exposing .item(). Frameworks that log np.int64/np.float32 were dropped here even though Pluto core would have kept them. --- pluto/compat/wandb.py | 55 ++++++++++++++++++++++++++++-- tests/test_wandb_compat.py | 68 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 3 deletions(-) diff --git a/pluto/compat/wandb.py b/pluto/compat/wandb.py index a9e702a..44a9a74 100644 --- a/pluto/compat/wandb.py +++ b/pluto/compat/wandb.py @@ -158,7 +158,17 @@ def _do_finish(): logger.debug(f'pluto.compat.wandb: Pluto finish timed out after {timeout}s') def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): - """Log metrics to both wandb and Pluto.""" + """Log metrics to both wandb and Pluto. + + Value routing for the Pluto side: + - int/float, torch & numpy scalars -> Pluto metrics (time-series) + - wandb media (Image/Video/Audio/Histogram/Table), and lists + thereof -> converted Pluto media + - str and bool -> Pluto config (latest-wins). Pluto has no + string/bool time-series metric, so these mirror wandb's + summary/overview placement and stay queryable via + get_run().config. + """ # Determine the step to use for Pluto. # When step is explicit, use it. Otherwise: # - Normal mode: read wandb's _step before log() increments it @@ -186,11 +196,25 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): # Pluto.log() natively supports lists, so we just need # to convert each element and pass the list through. pluto_data: Dict[str, Any] = {} + # String values have no time-series metric equivalent in + # Pluto (op._process_log_item_sync only keeps int/float/ + # tensor/File/Data). wandb puts loose strings in the run + # summary/overview; the closest Pluto analogue is config, + # which is latest-wins and queryable via get_run().config. + # This is what lets e.g. a resume skill read back the most + # recent checkpoint/r2_path for a run. + pluto_config: Dict[str, Any] = {} for key, value in data.items(): - if isinstance(value, (int, float)): + if isinstance(value, bool): + # bool is a subclass of int, but Pluto drops bool + # metrics — surface it as config so it isn't lost. + pluto_config[key] = value + elif isinstance(value, (int, float)): pluto_data[key] = value - elif _is_torch_tensor_scalar(value): + elif _is_torch_tensor_scalar(value) or _is_numpy_scalar(value): pluto_data[key] = value.item() + elif isinstance(value, str): + pluto_config[key] = value elif isinstance(value, (list, tuple)): # List of wandb media — convert each element. converted_items = [] @@ -211,6 +235,15 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): if actual_step is not None: log_kwargs['step'] = actual_step self._pluto_run.log(pluto_data, **log_kwargs) + + if pluto_config: + try: + self._pluto_run.update_config(pluto_config) + except Exception as e: + logger.debug( + f'pluto.compat.wandb: Failed to sync string/bool ' + f'values to Pluto config: {e}' + ) except Exception as e: logger.debug(f'pluto.compat.wandb: Failed to log metrics to Pluto: {e}') @@ -508,6 +541,22 @@ def _is_torch_tensor_scalar(value): return False +def _is_numpy_scalar(value): + """Check if value is a numpy scalar number (e.g. np.int64, np.float32). + + numpy scalars are NOT instances of Python int/float, so frameworks that + log ``np.int64(step)`` / ``np.float32(loss)`` would otherwise be dropped + by the numeric filter below — even though Pluto's own log() accepts + anything with .item(). Booleans are excluded (Pluto drops bool metrics). + """ + try: + import numpy as np + + return isinstance(value, np.generic) and not isinstance(value, np.bool_) + except ImportError: + return False + + def _is_torch_distributed() -> bool: """ Check if we're running in a torch.distributed environment. diff --git a/tests/test_wandb_compat.py b/tests/test_wandb_compat.py index 00eb7a1..2e6e078 100644 --- a/tests/test_wandb_compat.py +++ b/tests/test_wandb_compat.py @@ -236,3 +236,71 @@ def test_omegaconf_config_flows_through_shim_and_serializes(clean_env, monkeypat payload = make_compat_start_v1(native, Settings(), info=None) inner = json.loads(json.loads(payload.decode())['config']) assert inner['model']['full_name'] == 'resnet-v2' # interpolation resolved + + +# --------------------------------------------------------------------------- +# WandbRunWrapper.log value routing +# +# The shim pre-filters each logged value before forwarding to Pluto. These +# tests pin the routing that backs the /resume-crashed-run use case: string +# paths (e.g. checkpoint/r2_path) must reach Pluto as config (latest-wins, +# queryable via get_run().config), and numpy scalars must not be silently +# dropped the way plain str/np values were before. +# --------------------------------------------------------------------------- + + +def _make_wrapper(): + """Build a WandbRunWrapper with mock wandb/pluto runs (no atexit).""" + wandb_run = MagicMock() + wandb_run._step = 7 + pluto_run = MagicMock() + pluto_module = MagicMock() + # Avoid registering a real atexit handler during the test. + with mock.patch.object(wandb_compat.atexit, 'register'): + wrapper = wandb_compat.WandbRunWrapper( + wandb_run, pluto_run, pluto_module, wandb_disabled=False + ) + return wrapper, pluto_run + + +def test_log_routes_strings_to_config_not_metrics(): + """checkpoint/r2_path (a str) must land in Pluto config, not log().""" + wrapper, pluto_run = _make_wrapper() + + wrapper.log( + { + 'checkpoint/step': 100, + 'checkpoint/r2_path': 's3://bucket/run/ckpt-100.pt', + 'checkpoint/local_path': '/nfs/run/ckpt-100.pt', + } + ) + + # Strings forwarded to config (latest-wins, readable via get_run().config). + assert pluto_run.update_config.call_count == 1 + cfg = pluto_run.update_config.call_args.args[0] + assert cfg['checkpoint/r2_path'] == 's3://bucket/run/ckpt-100.pt' + assert cfg['checkpoint/local_path'] == '/nfs/run/ckpt-100.pt' + + # Numeric value still goes to metrics; strings must NOT be in log(). + logged = pluto_run.log.call_args.args[0] + assert logged == {'checkpoint/step': 100} + assert 'checkpoint/r2_path' not in logged + + +def test_log_forwards_numpy_scalars_as_metrics(): + """np.int64/np.float32 must reach Pluto metrics, not be dropped.""" + np = pytest.importorskip('numpy') + wrapper, pluto_run = _make_wrapper() + + wrapper.log( + { + 'checkpoint/step': np.int64(100), + 'loss': np.float32(0.5), + } + ) + + logged = pluto_run.log.call_args.args[0] + assert logged['checkpoint/step'] == 100 + assert isinstance(logged['checkpoint/step'], int) # .item() -> python int + assert abs(logged['loss'] - 0.5) < 1e-6 + assert isinstance(logged['loss'], float) From bf625a698b83b2a2fab209728776770c643212c6 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 5 Jun 2026 01:05:36 +0000 Subject: [PATCH 2/5] wandb compat: forward any .item() scalar as a metric (match Pluto core) Generalize the shim's numeric forwarding to accept any value exposing a callable .item() returning a number, replacing the narrower torch-tensor + numpy-generic checks. This mirrors Pluto's own log() (op._process_log_item_sync), which already forwards anything with .item(). Motivation: a checkpoint logger annotated as log_checkpoint(step: int, epoch: int, ...) can still pass a non-plain-int at runtime (np.int64, a 0-d tensor, a framework scalar). The old shim dropped those even though Pluto core would have kept them, so a logged 'epoch' could silently never reach Pluto. A failing .item() (multi-element array) is treated as not-a-scalar and ignored, same as Pluto would fail it. bool/str remain routed to config. --- pluto/compat/wandb.py | 49 ++++++++++++++++++++------------------ tests/test_wandb_compat.py | 36 ++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 23 deletions(-) diff --git a/pluto/compat/wandb.py b/pluto/compat/wandb.py index 44a9a74..34f054e 100644 --- a/pluto/compat/wandb.py +++ b/pluto/compat/wandb.py @@ -161,7 +161,8 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): """Log metrics to both wandb and Pluto. Value routing for the Pluto side: - - int/float, torch & numpy scalars -> Pluto metrics (time-series) + - int/float and any scalar exposing .item() (numpy/torch/etc.) + -> Pluto metrics (time-series), matching Pluto core's own log() - wandb media (Image/Video/Audio/Histogram/Table), and lists thereof -> converted Pluto media - str and bool -> Pluto config (latest-wins). Pluto has no @@ -211,8 +212,8 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): pluto_config[key] = value elif isinstance(value, (int, float)): pluto_data[key] = value - elif _is_torch_tensor_scalar(value) or _is_numpy_scalar(value): - pluto_data[key] = value.item() + elif (num := _as_scalar_number(value)) is not None: + pluto_data[key] = num elif isinstance(value, str): pluto_config[key] = value elif isinstance(value, (list, tuple)): @@ -531,30 +532,32 @@ def _resolve_wandb_to_pluto_run(wandb_run_id, project): return None -def _is_torch_tensor_scalar(value): - """Check if value is a scalar torch tensor.""" - try: - import torch - - return isinstance(value, torch.Tensor) and value.dim() == 0 - except ImportError: - return False +def _as_scalar_number(value): + """Return value as a python int/float if it's a scalar number, else None. + Mirrors Pluto's own log() (op._process_log_item_sync), which forwards + anything exposing a callable ``.item()``. The shim previously only + accepted plain int/float and torch scalar tensors, so a value logged as + a numpy scalar (``np.int64``), a 0-d numpy array, or a non-torch 0-d + tensor was dropped here even though Pluto core would have kept it — e.g. + an ``epoch`` that is ``np.int64`` rather than a plain ``int``. -def _is_numpy_scalar(value): - """Check if value is a numpy scalar number (e.g. np.int64, np.float32). - - numpy scalars are NOT instances of Python int/float, so frameworks that - log ``np.int64(step)`` / ``np.float32(loss)`` would otherwise be dropped - by the numeric filter below — even though Pluto's own log() accepts - anything with .item(). Booleans are excluded (Pluto drops bool metrics). + bool and str are excluded (Pluto drops bool metrics; str routes to + config). ``.item()`` on a multi-element array/tensor raises — we treat + that as "not a scalar" and return None, same as Pluto would fail it. """ + if isinstance(value, (bool, str)): + return None + item = getattr(value, 'item', None) + if not callable(item): + return None try: - import numpy as np - - return isinstance(value, np.generic) and not isinstance(value, np.bool_) - except ImportError: - return False + result = item() + except Exception: + return None + if isinstance(result, bool) or not isinstance(result, (int, float)): + return None + return result def _is_torch_distributed() -> bool: diff --git a/tests/test_wandb_compat.py b/tests/test_wandb_compat.py index 2e6e078..e2df6ab 100644 --- a/tests/test_wandb_compat.py +++ b/tests/test_wandb_compat.py @@ -304,3 +304,39 @@ def test_log_forwards_numpy_scalars_as_metrics(): assert isinstance(logged['checkpoint/step'], int) # .item() -> python int assert abs(logged['loss'] - 0.5) < 1e-6 assert isinstance(logged['loss'], float) + + +def test_log_forwards_any_item_scalar_like_pluto_core(): + """Any scalar exposing .item() is forwarded, matching Pluto's own log(). + + Guards against the shim being stricter than op._process_log_item_sync: + e.g. an ``epoch`` that arrives as a 0-d-tensor-like wrapper rather than a + plain int must still reach Pluto instead of being silently dropped. + """ + wrapper, pluto_run = _make_wrapper() + + class _ScalarLike: + def __init__(self, v): + self._v = v + + def item(self): + return self._v + + wrapper.log({'checkpoint/epoch': _ScalarLike(12)}) + + logged = pluto_run.log.call_args.args[0] + assert logged == {'checkpoint/epoch': 12} + + +def test_log_does_not_treat_failing_item_as_scalar(): + """A non-scalar whose .item() raises must not crash or produce a metric.""" + wrapper, pluto_run = _make_wrapper() + + class _MultiElement: + def item(self): + raise ValueError('can only convert an array of size 1') + + wrapper.log({'weird': _MultiElement()}) + + assert not pluto_run.log.called + assert not pluto_run.update_config.called From fcfaf6ac487b62b40fbe3a2caba13def1251cd2a Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 5 Jun 2026 23:03:42 +0000 Subject: [PATCH 3/5] wandb compat: stop silently dropping unforwardable log values MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The shim only forwarded recognized types (numbers, wandb media, str/bool) and silently dropped everything else — dicts, None, raw/multi-element tensors, numpy arrays, unconvertible wandb media (Html/Object3D/...), custom objects. Silent drops are what made missing data (e.g. a whole checkpoint metadata call) so hard to diagnose: no error, no warning, just absent. Add a 'preserve-what-we-can, otherwise fail loud' fallback in _handle_unforwardable(): - JSON-serializable leftovers (nested dicts/lists of primitives, None, etc.) are preserved as Pluto config, mirroring how wandb keeps loose values in the run summary. - Anything else warns ONCE per key (WARNING, naming key + type) instead of vanishing. The value still reaches W&B; only the Pluto copy is dropped, and now visibly. Never raises — wandb behavior is unaffected. --- pluto/compat/wandb.py | 61 ++++++++++++++++++++++++++++++++++++++ tests/test_wandb_compat.py | 35 ++++++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/pluto/compat/wandb.py b/pluto/compat/wandb.py index 34f054e..381cc8e 100644 --- a/pluto/compat/wandb.py +++ b/pluto/compat/wandb.py @@ -41,6 +41,7 @@ """ import atexit +import json import logging import os import threading @@ -91,6 +92,9 @@ def __init__(self, wandb_run, pluto_run, pluto_module, wandb_disabled=False): self._fallback_step = 0 # Used when wandb is disabled (_step won't increment) self._closed = False self._close_lock = threading.Lock() + # Keys we've already warned about being unforwardable to Pluto, so a + # value logged every step warns once rather than spamming the logs. + self._unforwardable_warned: set = set() if self._pluto_run: atexit.register(self._atexit_cleanup_pluto) @@ -169,6 +173,10 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): string/bool time-series metric, so these mirror wandb's summary/overview placement and stay queryable via get_run().config. + - anything else with no metric/media mapping -> preserved as + config if JSON-serializable, otherwise dropped with a ONE-TIME + warning per key (see _handle_unforwardable). Nothing is dropped + silently. """ # Determine the step to use for Pluto. # When step is explicit, use it. Otherwise: @@ -225,11 +233,19 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): converted_items.append(c) if converted_items: pluto_data[key] = converted_items + else: + # Not a media list (e.g. list of primitives) — + # preserve as config if possible, else warn. + self._handle_unforwardable(key, value, pluto_config) else: # Try to convert wandb data types to pluto equivalents converted = _convert_wandb_to_pluto(key, value, self._pluto) if converted is not None: pluto_data[key] = converted + else: + # No metric/media mapping — last-resort handling + # so the value is never silently dropped. + self._handle_unforwardable(key, value, pluto_config) if pluto_data: log_kwargs = {} @@ -250,6 +266,36 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): return result + def _handle_unforwardable(self, key, value, pluto_config: Dict[str, Any]) -> None: + """Last-resort handling for a value with no metric/media mapping. + + Pluto only stores numbers (metrics), media/structured data, and + config — so values outside those (dicts, None, raw/multi-element + tensors, numpy arrays, unconvertible wandb media like Html/Object3D, + custom objects) have nowhere to go. Rather than dropping them + silently — which is what made missing data so hard to diagnose — + we: + + 1. Preserve the value as config if it's JSON-serializable (mirrors + how wandb keeps loose values in the run summary). This covers + nested dicts/lists of primitives, None, etc. + 2. Otherwise warn ONCE per key (WARNING, not debug) naming the key + and type, so the drop is visible. The value still reached W&B; + only the Pluto copy is dropped. + """ + if _is_json_serializable(value): + pluto_config[key] = value + return + if key not in self._unforwardable_warned: + self._unforwardable_warned.add(key) + logger.warning( + 'pluto.compat.wandb: not forwarding %r to Pluto — value of ' + 'type %s has no metric/media/config mapping (it was still ' + 'logged to W&B). This warning is shown once per key.', + key, + type(value).__name__, + ) + def finish(self, exit_code=None, quiet=None): """Finish both wandb and Pluto runs.""" with self._close_lock: @@ -560,6 +606,21 @@ def _as_scalar_number(value): return result +def _is_json_serializable(value) -> bool: + """True if value can be stored as Pluto config (round-trips through JSON). + + Used as the last-resort fallback for values with no metric/media + mapping: serializable ones (str, bool, None, nested dicts/lists of + primitives) are preserved as config; everything else is warned about + and dropped. Mirrors the serialization Pluto config itself performs. + """ + try: + json.dumps(value) + return True + except (TypeError, ValueError): + return False + + def _is_torch_distributed() -> bool: """ Check if we're running in a torch.distributed environment. diff --git a/tests/test_wandb_compat.py b/tests/test_wandb_compat.py index e2df6ab..85eb416 100644 --- a/tests/test_wandb_compat.py +++ b/tests/test_wandb_compat.py @@ -340,3 +340,38 @@ def item(self): assert not pluto_run.log.called assert not pluto_run.update_config.called + + +def test_unforwardable_value_warns_once_not_silent(caplog): + """A value with no Pluto mapping must warn (once per key), never silently drop.""" + wrapper, pluto_run = _make_wrapper() + + class _Opaque: + """Not numeric, not media, not JSON-serializable.""" + + def item(self): + raise ValueError('not a scalar') + + with caplog.at_level('WARNING', logger='pluto.compat.wandb'): + wrapper.log({'mystery': _Opaque()}) + wrapper.log({'mystery': _Opaque()}) # second time: must NOT warn again + + warnings = [r for r in caplog.records if 'mystery' in r.getMessage()] + assert len(warnings) == 1, 'should warn exactly once per key' + assert 'mystery' in warnings[0].getMessage() + assert '_Opaque' in warnings[0].getMessage() # names the offending type + # Nothing forwarded — but it was loud about it. + assert not pluto_run.log.called + assert not pluto_run.update_config.called + + +def test_json_serializable_unmapped_value_falls_back_to_config(): + """A dict/None with no metric mapping is preserved as config, not dropped.""" + wrapper, pluto_run = _make_wrapper() + + wrapper.log({'meta/info': {'kind': 'resume', 'attempt': 3}, 'note': None}) + + cfg = pluto_run.update_config.call_args.args[0] + assert cfg['meta/info'] == {'kind': 'resume', 'attempt': 3} + assert cfg['note'] is None + assert not pluto_run.log.called # no numeric metrics in this call From 3d52d9ced60d6e9146d878e65a2d49d4488d614d Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 5 Jun 2026 23:05:32 +0000 Subject: [PATCH 4/5] wandb compat: report unforwardable values to Sentry, not to users MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous change warned users (WARNING log) when a value had no Pluto mapping. But that's a gap in OUR type coverage, not a user error — people migrating away from wandb shouldn't be nagged about types only the package maintainers can fix. Reroute the signal: genuinely unforwardable values (non-JSON-serializable, no metric/media mapping) now emit a Sentry telemetry alert via the SDK's isolated client (pluto/sentry.py), once per key, grouped by type name so we can see which unhandled types show up in the wild and add coverage. The local log drops to debug. JSON-serializable leftovers still fall back to config silently. Sentry honors PLUTO_DISABLE_TELEMETRY and swallows all errors, so this never affects the user or wandb. --- pluto/compat/wandb.py | 47 ++++++++++++++++++++++++++------------ tests/test_wandb_compat.py | 18 +++++++-------- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/pluto/compat/wandb.py b/pluto/compat/wandb.py index 381cc8e..594842f 100644 --- a/pluto/compat/wandb.py +++ b/pluto/compat/wandb.py @@ -174,9 +174,9 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): summary/overview placement and stay queryable via get_run().config. - anything else with no metric/media mapping -> preserved as - config if JSON-serializable, otherwise dropped with a ONE-TIME - warning per key (see _handle_unforwardable). Nothing is dropped - silently. + config if JSON-serializable, otherwise dropped and reported to + Sentry telemetry once per key (a maintainer-coverage signal, not + a user-facing warning). See _handle_unforwardable. """ # Determine the step to use for Pluto. # When step is explicit, use it. Otherwise: @@ -279,22 +279,41 @@ def _handle_unforwardable(self, key, value, pluto_config: Dict[str, Any]) -> Non 1. Preserve the value as config if it's JSON-serializable (mirrors how wandb keeps loose values in the run summary). This covers nested dicts/lists of primitives, None, etc. - 2. Otherwise warn ONCE per key (WARNING, not debug) naming the key - and type, so the drop is visible. The value still reached W&B; - only the Pluto copy is dropped. + 2. Otherwise drop the Pluto copy (it still reached W&B) and report + it as a maintainer-coverage signal via Sentry telemetry — once + per key. This is a gap in OUR type handling, not a user error, + so we deliberately do NOT emit a user-facing warning: people + migrating away from wandb shouldn't be nagged about types only + we can fix. The local log stays at debug for self-host + debugging. """ if _is_json_serializable(value): pluto_config[key] = value return - if key not in self._unforwardable_warned: - self._unforwardable_warned.add(key) - logger.warning( - 'pluto.compat.wandb: not forwarding %r to Pluto — value of ' - 'type %s has no metric/media/config mapping (it was still ' - 'logged to W&B). This warning is shown once per key.', - key, - type(value).__name__, + if key in self._unforwardable_warned: + return + self._unforwardable_warned.add(key) + type_name = type(value).__name__ + # Quiet locally (debug only) — not a user-actionable problem. + logger.debug( + 'pluto.compat.wandb: not forwarding %r to Pluto — type %s has no ' + 'metric/media/config mapping (still logged to W&B).', + key, + type_name, + ) + # Alert us (the maintainers) so we can add coverage for the type. + # Message is keyed on the type (not the run-specific key) so Sentry + # groups all occurrences of the same unhandled type together. + try: + from pluto import sentry + + sentry.capture_message( + f'wandb compat: unforwardable Pluto log value of type ' + f'{type_name!r} (no metric/media/config mapping)', + level='warning', ) + except Exception: + pass def finish(self, exit_code=None, quiet=None): """Finish both wandb and Pluto runs.""" diff --git a/tests/test_wandb_compat.py b/tests/test_wandb_compat.py index 85eb416..43e6cc1 100644 --- a/tests/test_wandb_compat.py +++ b/tests/test_wandb_compat.py @@ -342,8 +342,8 @@ def item(self): assert not pluto_run.update_config.called -def test_unforwardable_value_warns_once_not_silent(caplog): - """A value with no Pluto mapping must warn (once per key), never silently drop.""" +def test_unforwardable_value_alerts_sentry_once_not_user(): + """An unmappable value alerts Sentry (maintainers) once — not the user.""" wrapper, pluto_run = _make_wrapper() class _Opaque: @@ -352,15 +352,15 @@ class _Opaque: def item(self): raise ValueError('not a scalar') - with caplog.at_level('WARNING', logger='pluto.compat.wandb'): + with mock.patch('pluto.sentry.capture_message') as cap: wrapper.log({'mystery': _Opaque()}) - wrapper.log({'mystery': _Opaque()}) # second time: must NOT warn again + wrapper.log({'mystery': _Opaque()}) # second time: no duplicate alert - warnings = [r for r in caplog.records if 'mystery' in r.getMessage()] - assert len(warnings) == 1, 'should warn exactly once per key' - assert 'mystery' in warnings[0].getMessage() - assert '_Opaque' in warnings[0].getMessage() # names the offending type - # Nothing forwarded — but it was loud about it. + # Exactly one maintainer-facing Sentry alert, grouped by type name. + assert cap.call_count == 1 + assert '_Opaque' in cap.call_args.args[0] + assert cap.call_args.kwargs.get('level') == 'warning' + # Nothing forwarded to the user's run, and no user-facing exception. assert not pluto_run.log.called assert not pluto_run.update_config.called From b338a3487ffbb5e6e687c8cbda52fa2899f399d4 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 6 Jun 2026 00:14:45 +0000 Subject: [PATCH 5/5] =?UTF-8?q?wandb=20compat:=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20config=20independence,=20OmegaConf,=20dedup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three fixes from PR #120 review (Cursor Bugbot + Gemini): 1. Config no longer skipped on metric-log failure. update_config() shared the outer try with pluto_run.log(); a metric/media logging exception jumped to the handler and dropped str/bool config from the same call. Metrics and config now send in independent try blocks. 2. OmegaConf values are storable as config. The fallback gate used plain json.dumps, but update_config normalizes via to_native_config (which deep-converts DictConfig/ListConfig). A logged OmegaConf node was wrongly treated as unforwardable (dropped + Sentry-alerted). _config_storable_value now mirrors update_config's normalization, so OmegaConf is kept while tensors/ndarrays/custom objects still fall through to Sentry. 3. Skip redundant config writes. Logging an unchanged str/bool/config value every step re-triggered update_config (a SQLite write) each step. Dedup against the last synced value via self._last_logged_config, updated only on a successful update_config. Uses a _MISSING sentinel so a first-time None is still sent (None != missing). Tests: dedup skip/change behavior, OmegaConf-node fallback. --- pluto/compat/wandb.py | 92 +++++++++++++++++++++++++++----------- tests/test_wandb_compat.py | 36 +++++++++++++++ 2 files changed, 101 insertions(+), 27 deletions(-) diff --git a/pluto/compat/wandb.py b/pluto/compat/wandb.py index 594842f..8540e3c 100644 --- a/pluto/compat/wandb.py +++ b/pluto/compat/wandb.py @@ -56,6 +56,9 @@ logger = logging.getLogger(__name__) +# Distinct from None so config dedup can tell "never logged" from "logged None". +_MISSING = object() + _original_wandb_init = None _original_wandb_log = None _original_wandb_finish = None @@ -95,6 +98,10 @@ def __init__(self, wandb_run, pluto_run, pluto_module, wandb_disabled=False): # Keys we've already warned about being unforwardable to Pluto, so a # value logged every step warns once rather than spamming the logs. self._unforwardable_warned: set = set() + # Last config values we synced to Pluto, keyed by log key. Lets us skip + # redundant update_config() calls when a str/bool/config value is logged + # unchanged every step (a common pattern: phase/status/checkpoint paths). + self._last_logged_config: Dict[str, Any] = {} if self._pluto_run: atexit.register(self._atexit_cleanup_pluto) @@ -174,9 +181,13 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): summary/overview placement and stay queryable via get_run().config. - anything else with no metric/media mapping -> preserved as - config if JSON-serializable, otherwise dropped and reported to - Sentry telemetry once per key (a maintainer-coverage signal, not - a user-facing warning). See _handle_unforwardable. + config if it survives update_config's normalization (incl. + OmegaConf), otherwise dropped and reported to Sentry telemetry + once per key (a maintainer-coverage signal, not a user-facing + warning). See _handle_unforwardable. + + str/bool/config values are deduped against the last synced value, so + logging an unchanged value every step doesn't spam update_config. """ # Determine the step to use for Pluto. # When step is explicit, use it. Otherwise: @@ -217,13 +228,17 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): if isinstance(value, bool): # bool is a subclass of int, but Pluto drops bool # metrics — surface it as config so it isn't lost. - pluto_config[key] = value + # Skip if unchanged since last log (avoid redundant + # config writes when logged every step). + if self._last_logged_config.get(key, _MISSING) != value: + pluto_config[key] = value elif isinstance(value, (int, float)): pluto_data[key] = value elif (num := _as_scalar_number(value)) is not None: pluto_data[key] = num elif isinstance(value, str): - pluto_config[key] = value + if self._last_logged_config.get(key, _MISSING) != value: + pluto_config[key] = value elif isinstance(value, (list, tuple)): # List of wandb media — convert each element. converted_items = [] @@ -247,22 +262,32 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs): # so the value is never silently dropped. self._handle_unforwardable(key, value, pluto_config) + # Metrics and config are sent in independent try blocks: a + # failure logging metrics must NOT skip the config update (or + # vice versa) — str/bool from the same wandb.log() call live in + # config and would otherwise be silently lost. if pluto_data: - log_kwargs = {} - if actual_step is not None: - log_kwargs['step'] = actual_step - self._pluto_run.log(pluto_data, **log_kwargs) + try: + log_kwargs = {} + if actual_step is not None: + log_kwargs['step'] = actual_step + self._pluto_run.log(pluto_data, **log_kwargs) + except Exception as e: + logger.debug( + f'pluto.compat.wandb: Failed to log metrics to Pluto: {e}' + ) if pluto_config: try: self._pluto_run.update_config(pluto_config) + # Only remember as synced once the update succeeds. + self._last_logged_config.update(pluto_config) except Exception as e: logger.debug( - f'pluto.compat.wandb: Failed to sync string/bool ' - f'values to Pluto config: {e}' + f'pluto.compat.wandb: Failed to sync config to Pluto: {e}' ) except Exception as e: - logger.debug(f'pluto.compat.wandb: Failed to log metrics to Pluto: {e}') + logger.debug(f'pluto.compat.wandb: Failed to prepare Pluto data: {e}') return result @@ -276,9 +301,11 @@ def _handle_unforwardable(self, key, value, pluto_config: Dict[str, Any]) -> Non silently — which is what made missing data so hard to diagnose — we: - 1. Preserve the value as config if it's JSON-serializable (mirrors - how wandb keeps loose values in the run summary). This covers - nested dicts/lists of primitives, None, etc. + 1. Preserve the value as config if it survives update_config's own + normalization (mirrors how wandb keeps loose values in the run + summary). This covers nested dicts/lists of primitives, None, and + OmegaConf DictConfig/ListConfig nodes (which to_native_config + deep-converts). Skipped if unchanged since the last log. 2. Otherwise drop the Pluto copy (it still reached W&B) and report it as a maintainer-coverage signal via Sentry telemetry — once per key. This is a gap in OUR type handling, not a user error, @@ -287,8 +314,10 @@ def _handle_unforwardable(self, key, value, pluto_config: Dict[str, Any]) -> Non we can fix. The local log stays at debug for self-host debugging. """ - if _is_json_serializable(value): - pluto_config[key] = value + storable, native = _config_storable_value(value) + if storable: + if self._last_logged_config.get(key, _MISSING) != native: + pluto_config[key] = native return if key in self._unforwardable_warned: return @@ -625,19 +654,28 @@ def _as_scalar_number(value): return result -def _is_json_serializable(value) -> bool: - """True if value can be stored as Pluto config (round-trips through JSON). +def _config_storable_value(value): + """Return ``(storable, native)`` for the config fallback. - Used as the last-resort fallback for values with no metric/media - mapping: serializable ones (str, bool, None, nested dicts/lists of - primitives) are preserved as config; everything else is warned about - and dropped. Mirrors the serialization Pluto config itself performs. + Mirrors what ``update_config`` actually does — normalize via + ``to_native_config`` (which deep-converts OmegaConf ``DictConfig`` / + ``ListConfig`` to native containers), then check JSON-serializability. + Keeping the gate in lockstep with ``update_config`` means a logged + ``DictConfig`` is correctly stored as config, even though plain + ``json.dumps`` would reject it. Tensors / ndarrays / custom objects still + fail (``to_native_config`` leaves them as-is) and fall through to the + Sentry path. + + Returns ``(True, native_value)`` when storable, else ``(False, None)``. """ try: - json.dumps(value) - return True - except (TypeError, ValueError): - return False + from pluto.util import to_native_config + + native = to_native_config(value) + json.dumps(native) + return True, native + except Exception: + return False, None def _is_torch_distributed() -> bool: diff --git a/tests/test_wandb_compat.py b/tests/test_wandb_compat.py index 43e6cc1..ade9434 100644 --- a/tests/test_wandb_compat.py +++ b/tests/test_wandb_compat.py @@ -375,3 +375,39 @@ def test_json_serializable_unmapped_value_falls_back_to_config(): assert cfg['meta/info'] == {'kind': 'resume', 'attempt': 3} assert cfg['note'] is None assert not pluto_run.log.called # no numeric metrics in this call + + +def test_log_skips_redundant_config_updates(): + """An unchanged str/bool config value must not re-trigger update_config.""" + wrapper, pluto_run = _make_wrapper() + + # First log: config is synced. + wrapper.log({'phase': 'train', 'loss': 0.5}) + assert pluto_run.update_config.call_count == 1 + assert pluto_run.update_config.call_args.args[0] == {'phase': 'train'} + + # Same config value again: update_config must NOT be called. + pluto_run.update_config.reset_mock() + wrapper.log({'phase': 'train', 'loss': 0.4}) + assert pluto_run.update_config.call_count == 0 + + # Changed config value: update_config is called again, with only the change. + wrapper.log({'phase': 'val', 'loss': 0.3}) + assert pluto_run.update_config.call_count == 1 + assert pluto_run.update_config.call_args.args[0] == {'phase': 'val'} + + +def test_omegaconf_value_falls_back_to_config_not_dropped(): + """A logged OmegaConf node is storable as config (not Sentry-dropped).""" + OmegaConf = pytest.importorskip('omegaconf').OmegaConf + wrapper, pluto_run = _make_wrapper() + + cfg_node = OmegaConf.create({'lr': 0.01, 'sched': {'name': 'cosine'}}) + + with mock.patch('pluto.sentry.capture_message') as cap: + wrapper.log({'hparams': cfg_node}) + + # Stored as config, deep-converted to native containers; not dropped. + cfg = pluto_run.update_config.call_args.args[0] + assert cfg['hparams'] == {'lr': 0.01, 'sched': {'name': 'cosine'}} + assert not cap.called # OmegaConf is storable -> no maintainer alert