Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 186 additions & 16 deletions pluto/compat/wandb.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"""

import atexit
import json
import logging
import os
import threading
Expand All @@ -55,6 +56,9 @@

logger = logging.getLogger(__name__)

# Distinct from None so config dedup can tell "never logged" from "logged None".
_MISSING = object()

_original_wandb_init = None
_original_wandb_log = None
_original_wandb_finish = None
Expand Down Expand Up @@ -91,6 +95,13 @@ def __init__(self, wandb_run, pluto_run, pluto_module, wandb_disabled=False):
self._fallback_step = 0 # Used when wandb is disabled (_step won't increment)
self._closed = False
self._close_lock = threading.Lock()
# Keys we've already warned about being unforwardable to Pluto, so a
# value logged every step warns once rather than spamming the logs.
self._unforwardable_warned: set = set()
Comment on lines +98 to +100

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Performance Bottleneck: Redundant Config Updates

Logging string or boolean values (such as training phase, status, or checkpoint paths) at every step is a very common pattern in machine learning training loops. Currently, every call to log() containing a string or boolean will trigger a call to self._pluto_run.update_config(pluto_config).

If the sync process is disabled or not yet initialized, update_config performs a synchronous, blocking HTTP POST request to the server. Even when the sync process is enabled, it triggers a synchronous write to the local SQLite database. Doing this at every single step will severely degrade training performance due to network or disk I/O bottlenecks.

To prevent this, we should cache the last logged config values and only send updates when a value actually changes.

Suggested change
# Keys we've already warned about being unforwardable to Pluto, so a
# value logged every step warns once rather than spamming the logs.
self._unforwardable_warned: set = set()
# Keys we've already warned about being unforwardable to Pluto, so a
# value logged every step warns once rather than spamming the logs.
self._unforwardable_warned: set = set()
# Cache of the last logged config values to avoid redundant updates.
self._last_logged_config: Dict[str, Any] = {}

# Last config values we synced to Pluto, keyed by log key. Lets us skip
# redundant update_config() calls when a str/bool/config value is logged
# unchanged every step (a common pattern: phase/status/checkpoint paths).
self._last_logged_config: Dict[str, Any] = {}

if self._pluto_run:
atexit.register(self._atexit_cleanup_pluto)
Expand Down Expand Up @@ -158,7 +169,26 @@ def _do_finish():
logger.debug(f'pluto.compat.wandb: Pluto finish timed out after {timeout}s')

def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs):
"""Log metrics to both wandb and Pluto."""
"""Log metrics to both wandb and Pluto.

Value routing for the Pluto side:
- int/float and any scalar exposing .item() (numpy/torch/etc.)
-> Pluto metrics (time-series), matching Pluto core's own log()
- wandb media (Image/Video/Audio/Histogram/Table), and lists
thereof -> converted Pluto media
- str and bool -> Pluto config (latest-wins). Pluto has no
string/bool time-series metric, so these mirror wandb's
summary/overview placement and stay queryable via
get_run().config.
- anything else with no metric/media mapping -> preserved as
config if it survives update_config's normalization (incl.
OmegaConf), otherwise dropped and reported to Sentry telemetry
once per key (a maintainer-coverage signal, not a user-facing
warning). See _handle_unforwardable.

str/bool/config values are deduped against the last synced value, so
logging an unchanged value every step doesn't spam update_config.
"""
# Determine the step to use for Pluto.
# When step is explicit, use it. Otherwise:
# - Normal mode: read wandb's _step before log() increments it
Expand Down Expand Up @@ -186,11 +216,29 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs):
# Pluto.log() natively supports lists, so we just need
# to convert each element and pass the list through.
pluto_data: Dict[str, Any] = {}
# String values have no time-series metric equivalent in
# Pluto (op._process_log_item_sync only keeps int/float/
# tensor/File/Data). wandb puts loose strings in the run
# summary/overview; the closest Pluto analogue is config,
# which is latest-wins and queryable via get_run().config.
# This is what lets e.g. a resume skill read back the most
# recent checkpoint/r2_path for a run.
pluto_config: Dict[str, Any] = {}
for key, value in data.items():
if isinstance(value, (int, float)):
if isinstance(value, bool):
# bool is a subclass of int, but Pluto drops bool
# metrics — surface it as config so it isn't lost.
# Skip if unchanged since last log (avoid redundant
# config writes when logged every step).
if self._last_logged_config.get(key, _MISSING) != value:
pluto_config[key] = value
elif isinstance(value, (int, float)):
pluto_data[key] = value
elif _is_torch_tensor_scalar(value):
pluto_data[key] = value.item()
elif (num := _as_scalar_number(value)) is not None:
pluto_data[key] = num
elif isinstance(value, str):
if self._last_logged_config.get(key, _MISSING) != value:
pluto_config[key] = value
elif isinstance(value, (list, tuple)):
# List of wandb media — convert each element.
converted_items = []
Expand All @@ -200,22 +248,102 @@ def log(self, data: Dict[str, Any], step=None, commit=None, **kwargs):
converted_items.append(c)
if converted_items:
pluto_data[key] = converted_items
else:
# Not a media list (e.g. list of primitives) —
# preserve as config if possible, else report once via telemetry.
self._handle_unforwardable(key, value, pluto_config)
else:
# Try to convert wandb data types to pluto equivalents
converted = _convert_wandb_to_pluto(key, value, self._pluto)
if converted is not None:
pluto_data[key] = converted

else:
# No metric/media mapping — last-resort handling
# so the value is never silently dropped.
self._handle_unforwardable(key, value, pluto_config)

# Metrics and config are sent in independent try blocks: a
# failure logging metrics must NOT skip the config update (or
# vice versa) — str/bool from the same wandb.log() call live in
# config and would otherwise be silently lost.
if pluto_data:
log_kwargs = {}
if actual_step is not None:
log_kwargs['step'] = actual_step
self._pluto_run.log(pluto_data, **log_kwargs)
try:
log_kwargs = {}
if actual_step is not None:
log_kwargs['step'] = actual_step
self._pluto_run.log(pluto_data, **log_kwargs)
except Exception as e:
logger.debug(
f'pluto.compat.wandb: Failed to log metrics to Pluto: {e}'
)

if pluto_config:
try:
self._pluto_run.update_config(pluto_config)
# Only remember as synced once the update succeeds.
self._last_logged_config.update(pluto_config)
except Exception as e:
logger.debug(
f'pluto.compat.wandb: Failed to sync config to Pluto: {e}'
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pluto log failure skips config

Medium Severity

update_config runs in the same outer try as pluto_run.log. If metric/media logging raises, execution jumps to the broad handler and string or bool config updates from the same wandb.log call are never sent to Pluto.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 3d52d9c. Configure here.

Comment on lines +280 to +288

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Update the local config cache self._last_logged_config once the config has been successfully synced.

Suggested change
if pluto_config:
try:
self._pluto_run.update_config(pluto_config)
except Exception as e:
logger.debug(
f'pluto.compat.wandb: Failed to sync string/bool '
f'values to Pluto config: {e}'
)
if pluto_config:
try:
self._pluto_run.update_config(pluto_config)
self._last_logged_config.update(pluto_config)
except Exception as e:
logger.debug(
f'pluto.compat.wandb: Failed to sync string/bool '
f'values to Pluto config: {e}'
)

except Exception as e:
logger.debug(f'pluto.compat.wandb: Failed to log metrics to Pluto: {e}')
logger.debug(f'pluto.compat.wandb: Failed to prepare Pluto data: {e}')

return result

def _handle_unforwardable(self, key, value, pluto_config: Dict[str, Any]) -> None:
    """Route a value that has no Pluto metric/media equivalent.

    Two outcomes are possible:

    * The value passes ``_config_storable_value``: its normalized form is
      queued in ``pluto_config`` (mutated in place) unless it equals the
      entry already recorded for ``key`` in ``self._last_logged_config``.
      Nothing is logged or reported in this case.
    * The value is not storable: the Pluto copy is dropped. The first time
      this happens for a given ``key`` a debug line is logged and a
      warning-level message is sent through ``pluto.sentry``; later
      occurrences of the same key return immediately.

    No user-facing warning is emitted on purpose: an unhandled type is a
    coverage gap for the maintainers to close, not something the caller
    can act on.

    Args:
        key: The log key the value was submitted under.
        value: The raw value passed to ``log()``.
        pluto_config: Pending config updates for the current ``log()``
            call; storable values are added to it.
    """
    is_storable, normalized = _config_storable_value(value)
    if is_storable:
        # Dedup against the last synced value; the _MISSING sentinel keeps
        # "never logged" distinct from a previously logged None.
        previous = self._last_logged_config.get(key, _MISSING)
        if previous != normalized:
            pluto_config[key] = normalized
        return

    # Report each key at most once so a value logged every step stays quiet.
    if key in self._unforwardable_warned:
        return
    self._unforwardable_warned.add(key)

    type_name = type(value).__name__

    # Local log stays at debug level: not a user-actionable problem.
    logger.debug(
        'pluto.compat.wandb: not forwarding %r to Pluto — type %s has no '
        'metric/media/config mapping (still logged to W&B).',
        key,
        type_name,
    )

    # The telemetry message is built from the type name rather than the
    # run-specific key so occurrences of the same unhandled type share
    # one message text.
    try:
        from pluto import sentry

        message = (
            f'wandb compat: unforwardable Pluto log value of type '
            f'{type_name!r} (no metric/media/config mapping)'
        )
        sentry.capture_message(message, level='warning')
    except Exception:
        # Telemetry is best-effort and must never break the caller's log().
        pass

def finish(self, exit_code=None, quiet=None):
"""Finish both wandb and Pluto runs."""
with self._close_lock:
Expand Down Expand Up @@ -498,14 +626,56 @@ def _resolve_wandb_to_pluto_run(wandb_run_id, project):
return None


def _is_torch_tensor_scalar(value):
"""Check if value is a scalar torch tensor."""
def _as_scalar_number(value):
    """Return ``value`` as a Python int/float if it is a scalar number, else None.

    Accepts anything exposing a callable ``.item()`` whose result is an
    int or float (for example a numpy scalar such as ``np.int64``, a 0-d
    numpy array, or a 0-d tensor), so such values are not dropped just
    because they are not a plain ``int``/``float``. Plain Python numbers,
    which have no ``.item()``, are passed through unchanged so the helper
    honours its contract without relying on the caller to pre-filter them.

    ``bool`` and ``str`` are always rejected (bools are not kept as
    metrics; strings are routed to config by the caller). If ``.item()``
    raises — as it does for multi-element arrays/tensors — the value is
    treated as "not a scalar".

    Args:
        value: Any object passed to ``log()``.

    Returns:
        The int/float value, or ``None`` when ``value`` is not a scalar
        number.
    """
    if isinstance(value, (bool, str)):
        return None

    item = getattr(value, 'item', None)
    if not callable(item):
        # No ``.item()``: only plain Python numbers qualify.
        return value if isinstance(value, (int, float)) else None

    try:
        result = item()
    except Exception:
        # Multi-element containers (and anything else that fails) are not scalars.
        return None

    # ``.item()`` may yield a bool (e.g. from a boolean scalar) or a
    # non-numeric object; neither is a metric value.
    if isinstance(result, bool) or not isinstance(result, (int, float)):
        return None
    return result


def _config_storable_value(value):
    """Return ``(storable, native)`` for the config fallback.

    Mirrors what ``update_config`` does: normalize via
    ``to_native_config`` (which deep-converts OmegaConf ``DictConfig`` /
    ``ListConfig`` to native containers), then check that the result is
    JSON-serializable. Keeping this gate in lockstep with
    ``update_config`` means a logged ``DictConfig`` is stored as config
    even though plain ``json.dumps`` would reject it. Tensors, ndarrays
    and custom objects still fail (``to_native_config`` leaves them
    as-is) and are reported by the caller instead.

    Args:
        value: The raw value passed to ``log()``.

    Returns:
        ``(True, native_value)`` when the normalized value can be stored
        as config, otherwise ``(False, None)``. Any failure — including
        the import or the normalization itself — yields ``(False, None)``
        rather than raising, so a bad value can never break ``log()``.
    """
    try:
        from pluto.util import to_native_config

        native = to_native_config(value)
        # Serialization is only a probe; the encoded string is discarded.
        json.dumps(native)
        return True, native
    except Exception:
        return False, None


def _is_torch_distributed() -> bool:
Expand Down
Loading