Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions docs/user/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,16 @@ Common optional keys:

- `dark_frame_file` + `blank_frame_file` (both or neither)
- `fit.max_points`, `fit.seed`, `fit.loss`, `fit.f_scale`, `fit.max_nfev`
- `fit.initial_C`, `fit.initial_S_i_hat`
- `fit.s3_identifiability_threshold`, `fit.prior_weight`, `fit.c_relative_bounds`
- `fit.initial_C`, `fit.prior_weight`, `fit.c_relative_bounds`
- `output_profile`, `output_report`, `output_diagnostics`

Required fit key:

- `fit.S_i_hat` as an explicit 3-component fixed source state `[S1_hat, S2_hat, S3_hat]`

`fit.S_i_hat` is treated as a fixed source state during calibration; the
optimiser only fits `C`. Calibration does not provide a default `S_i_hat`.

Load step input notes:

- Each `load_steps[i].image_file` may be either:
Expand Down
164 changes: 40 additions & 124 deletions photoelastimetry/calibrate.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""
Calibration workflows for photoelastimetry experiments.

This module fits per-wavelength stress-optic coefficients (C), incoming
polarisation state (S_i_hat), and optional detector blank correction from a
multi-load calibration sequence.
This module fits per-wavelength stress-optic coefficients (C) for an explicit
fixed incoming polarisation state (S_i_hat), and optional detector blank
correction from a multi-load calibration sequence.

Supported calibration methods:
- ``brazilian_disk``: diametrically-loaded disk with analytical stress field.
Expand Down Expand Up @@ -357,7 +357,7 @@ def validate_calibration_config(config):
if n_no_load < 1:
warnings.warn(
"Calibration usually benefits from at least one no-load step (load ≈ 0); "
"continuing without it will use a default initial S_i_hat guess unless one is provided in fit.initial_S_i_hat.",
"continuing without it may reduce robustness.",
stacklevel=2,
)
if n_loaded < 3:
Expand All @@ -368,13 +368,23 @@ def validate_calibration_config(config):
)

fit_cfg = dict(config.get("fit", {}))
if "initial_S_i_hat" in fit_cfg:
raise ValueError("fit.initial_S_i_hat is no longer supported; use fit.S_i_hat.")
if "S_i_hat" not in fit_cfg:
raise ValueError(
"Calibration config must include fit.S_i_hat as a 3-component array " "[S1_hat, S2_hat, S3_hat]."
)
fit_s_i_hat = np.asarray(fit_cfg["S_i_hat"], dtype=float)
if fit_s_i_hat.shape != (3,):
raise ValueError(f"fit.S_i_hat must have shape (3,), got {fit_s_i_hat.shape}.")

fit_cfg.setdefault("max_points", 6000)
fit_cfg.setdefault("loss", "soft_l1")
fit_cfg.setdefault("f_scale", 0.05)
fit_cfg.setdefault("max_nfev", 300)
fit_cfg.setdefault("seed", 0)
fit_cfg.setdefault("s3_identifiability_threshold", 0.02)
fit_cfg.setdefault("prior_weight", 0.0)
fit_cfg["S_i_hat"] = fit_s_i_hat.tolist()
Comment on lines 370 to +387

output_profile = config.get("output_profile", "calibration_profile.json5")
output_report = config.get("output_report", "calibration_report.md")
Expand Down Expand Up @@ -739,27 +749,6 @@ def _prepare_sampling_points(roi_mask, max_points, seed):
return y_idx, x_idx


def _initial_s_i_hat_from_noload(measured_noload):
"""Estimate initial incoming Stokes state from no-load measurements."""
s1 = float(np.median(measured_noload[..., 0]))
s2 = float(np.median(measured_noload[..., 1]))
magnitude_sq = s1**2 + s2**2
s3 = float(np.sqrt(max(0.0, 1.0 - magnitude_sq)))
s_i = np.array([s1, s2, s3], dtype=float)

norm = np.linalg.norm(s_i)
if norm < 1e-12:
return np.array([1.0, 0.0, 0.0], dtype=float)
if norm > 1.0:
s_i = s_i / norm
return s_i


def _default_initial_s_i_hat():
"""Return a conservative default incoming Stokes state guess."""
return np.array([1.0, 0.0, 0.0], dtype=float)


def _build_dataset(config):
"""Load calibration images and construct regression dataset."""
steps = config["load_steps"]
Expand Down Expand Up @@ -814,8 +803,6 @@ def _build_dataset(config):
sigma_yy_steps = []
sigma_xy_steps = []

no_load_tolerance = config["load_zero_tolerance"]
no_load_measurements = []
diagnostic_step_index = int(np.argmax(np.abs(loads)))
diagnostic_image = corrected_images[diagnostic_step_index]

Expand Down Expand Up @@ -845,20 +832,6 @@ def _build_dataset(config):
sigma_yy_steps.append(sigma_yy)
sigma_xy_steps.append(sigma_xy)

if abs(load) <= no_load_tolerance:
no_load_measurements.append(measured)

if len(no_load_measurements) == 0:
warnings.warn(
"No no-load measurements were found after processing load steps; "
"using a default initial S_i_hat guess of [1, 0, 0].",
stacklevel=2,
)
initial_s_i_hat = _default_initial_s_i_hat()
else:
noload = np.concatenate(no_load_measurements, axis=0)
initial_s_i_hat = _initial_s_i_hat_from_noload(noload)

dataset = {
"method": config["method"],
"wavelengths": config["wavelengths"],
Expand All @@ -876,7 +849,6 @@ def _build_dataset(config):
# Backward-compatible key retained for previous diagnostics consumers.
"disk_mask": model_mask,
"blank_correction": blank_correction,
"initial_s_i_hat": initial_s_i_hat,
"X": X,
"Y": Y,
"geometry": config["geometry"],
Expand All @@ -898,38 +870,35 @@ def _normalise_s_i_hat(s_i_hat):
return s_i


def _decode_params(params, n_channels, fixed_s3=None):
"""Decode parameter vector into C and S_i_hat."""
C = np.asarray(params[:n_channels], dtype=float)
if fixed_s3 is None:
s_i_hat = np.asarray(params[n_channels : n_channels + 3], dtype=float)
else:
s_i_hat = np.array([params[n_channels], params[n_channels + 1], fixed_s3], dtype=float)
s_i_hat = _normalise_s_i_hat(s_i_hat)
return C, s_i_hat
def _resolve_fixed_s_i_hat(fit_config):
"""Resolve the fixed incoming Stokes state used during calibration."""
s_i_hat = np.asarray(fit_config["S_i_hat"], dtype=float)
if s_i_hat.shape != (3,):
raise ValueError(f"fit.S_i_hat must have shape (3,), got {s_i_hat.shape}.")
return _normalise_s_i_hat(s_i_hat)
Comment on lines +873 to +878


def calibration_residuals(params, dataset, fixed_s3=None):
def calibration_residuals(params, dataset, fixed_s_i_hat):
"""
Compute calibration residuals for least-squares fitting.

Parameters
----------
params : ndarray
Parameter vector containing C values and S_i_hat components.
1D parameter vector containing one C value per wavelength channel.
dataset : dict
Dataset dictionary from `_build_dataset`.
fixed_s3 : float, optional
If provided, S3 is fixed and only S1/S2 are optimised.
fixed_s_i_hat : array-like
Fixed source state used for the forward model.

Returns
-------
ndarray
1D residual vector.
"""
wavelengths = dataset["wavelengths"]
n_channels = wavelengths.size
C, s_i_hat = _decode_params(params, n_channels, fixed_s3=fixed_s3)
C = np.asarray(params[: wavelengths.size], dtype=float)
s_i_hat = _normalise_s_i_hat(fixed_s_i_hat)

residual_chunks = []
for measured, sigma_xx, sigma_yy, sigma_xy in zip(
Expand Down Expand Up @@ -959,7 +928,7 @@ def calibration_residuals(params, dataset, fixed_s3=None):

def fit_calibration_parameters(dataset, fit_config):
"""
Fit C and S_i_hat from calibration dataset.
Fit C from calibration dataset with fixed S_i_hat.

Parameters
----------
Expand All @@ -971,21 +940,15 @@ def fit_calibration_parameters(dataset, fit_config):
Returns
-------
dict
Fit results containing estimated C, S_i_hat, metrics, and optimizer state.
Fit results containing estimated C, fixed S_i_hat, metrics, and optimizer state.
"""
wavelengths = dataset["wavelengths"]
n_channels = wavelengths.size

init_c = _as_float_array(
fit_config.get("initial_C", np.full(n_channels, 3e-9)), expected_length=n_channels, name="initial_C"
)
init_s = np.asarray(fit_config.get("initial_S_i_hat", dataset["initial_s_i_hat"]), dtype=float)
if init_s.size == 2:
init_s = np.append(init_s, 0.0)
if init_s.size != 3:
raise ValueError(f"fit.initial_S_i_hat must have length 2 or 3, got {init_s.size}.")

init_s = _normalise_s_i_hat(init_s)
fixed_s_i_hat = _resolve_fixed_s_i_hat(fit_config)

c_relative_bounds = fit_config.get("c_relative_bounds")
if c_relative_bounds is not None:
Expand All @@ -1001,70 +964,26 @@ def fit_calibration_parameters(dataset, fit_config):
c_lower = np.full(n_channels, 1e-15)
c_upper = np.full(n_channels, 1e-4)

x0_full = np.concatenate([init_c, init_s], axis=0)
lb_full = np.concatenate([c_lower, np.full(3, -1.0)])
ub_full = np.concatenate([c_upper, np.full(3, 1.0)])
x0 = init_c.copy()
prior_weight = float(fit_config.get("prior_weight", 0.0))

def residual_full(params):
residual = calibration_residuals(params, dataset, fixed_s3=None)
def residual_c_only(params):
residual = calibration_residuals(params, dataset, fixed_s_i_hat=fixed_s_i_hat)
if prior_weight > 0:
prior = prior_weight * (params - x0_full)
prior = prior_weight * (params - x0)
return np.concatenate([residual, prior], axis=0)
return residual

result_full = least_squares(
residual_full,
x0_full,
bounds=(lb_full, ub_full),
result = least_squares(
residual_c_only,
x0,
bounds=(c_lower, c_upper),
loss=fit_config["loss"],
f_scale=float(fit_config["f_scale"]),
max_nfev=int(fit_config["max_nfev"]),
)

col_norms = (
np.linalg.norm(result_full.jac, axis=0) if result_full.jac.size > 0 else np.zeros_like(x0_full)
)
reference_norm = np.median(col_norms[:-1]) if col_norms.size > 1 else 0.0
s3_ratio = float(col_norms[-1] / max(reference_norm, 1e-12)) if col_norms.size > 0 else 0.0

fallback = False
fallback_reason = None
threshold = float(fit_config.get("s3_identifiability_threshold", 0.02))

if (not result_full.success) or (s3_ratio < threshold):
fallback = True
fallback_reason = "s3_identifiability" if s3_ratio < threshold else "full_fit_failed"

if fallback_reason == "s3_identifiability":
# If S3 is unidentifiable, restart fixed-S3 fit from the configured
# initial guess to avoid inheriting unstable full-model updates.
x0_fix = np.concatenate([init_c, init_s[:2]], axis=0)
else:
c_guess, s_guess = _decode_params(result_full.x, n_channels, fixed_s3=None)
x0_fix = np.concatenate([c_guess, s_guess[:2]], axis=0)
lb_fix = np.concatenate([c_lower, np.full(2, -1.0)])
ub_fix = np.concatenate([c_upper, np.full(2, 1.0)])

def residual_fixed(params):
residual = calibration_residuals(params, dataset, fixed_s3=0.0)
if prior_weight > 0:
prior = prior_weight * (params - x0_fix)
return np.concatenate([residual, prior], axis=0)
return residual

result = least_squares(
residual_fixed,
x0_fix,
bounds=(lb_fix, ub_fix),
loss=fit_config["loss"],
f_scale=float(fit_config["f_scale"]),
max_nfev=int(fit_config["max_nfev"]),
)
C_fit, S_fit = _decode_params(result.x, n_channels, fixed_s3=0.0)
else:
result = result_full
C_fit, S_fit = _decode_params(result.x, n_channels, fixed_s3=None)
C_fit = np.asarray(result.x[:n_channels], dtype=float)
S_fit = fixed_s_i_hat

residual = result.fun
fit_metrics = {
Expand All @@ -1077,9 +996,6 @@ def residual_fixed(params):
"n_residuals": int(residual.size),
"n_samples": int(dataset["sample_y"].size),
"n_load_steps": int(dataset["loads"].size),
"fallback_used": bool(fallback),
"fallback_reason": fallback_reason,
"s3_identifiability_ratio": float(s3_ratio),
}

return {
Expand Down
Loading
Loading