From ad12d74cb553c2962f35ba34d53acb8a5b9cd139 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Wed, 15 Apr 2026 14:42:05 +0800 Subject: [PATCH 01/29] feat: add post-run data integrity audit with reference source --- src/backtest/runner.py | 229 ++++++++++++++++++++++++++++++++++ src/config.py | 199 ++++++++++++++++++++++++++++- tests/test_backtest_runner.py | 108 ++++++++++++++++ tests/test_config.py | 69 ++++++++++ 4 files changed, 604 insertions(+), 1 deletion(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 43916ead..efb3fab2 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -20,6 +20,7 @@ CollectionConfig, Config, ResultConsistencyConfig, + ResultConsistencyDataIntegrityAuditConfig, ResultConsistencyExecutionPriceVarianceConfig, ResultConsistencyTransactionCostBreakevenConfig, ResultConsistencyTransactionCostRobustnessConfig, @@ -235,6 +236,7 @@ class BacktestRunner: "result_consistency.outlier_dependency", "result_consistency.execution_price_variance", "result_consistency.lookahead_shuffle_test", + "result_consistency.data_integrity_audit", "result_consistency.transaction_cost_robustness", ) @@ -451,6 +453,20 @@ def _serialize_lookahead_shuffle_test_profile( ), } + @staticmethod + def _serialize_data_integrity_audit_profile( + data_integrity_audit: Any, + ) -> dict[str, Any] | None: + if data_integrity_audit is None: + return None + return { + "min_overlap_ratio": getattr(data_integrity_audit, "min_overlap_ratio", None), + "max_median_ohlc_diff_bps": getattr( + data_integrity_audit, "max_median_ohlc_diff_bps", None + ), + "max_p95_ohlc_diff_bps": getattr(data_integrity_audit, "max_p95_ohlc_diff_bps", None), + } + @staticmethod def _serialize_transaction_cost_breakeven_profile( breakeven: Any, @@ -561,6 +577,9 @@ def _serialize_result_consistency_profile(result_consistency: Any) -> dict[str, "lookahead_shuffle_test": BacktestRunner._serialize_lookahead_shuffle_test_profile( getattr(result_consistency, 
"lookahead_shuffle_test", None) ), + "data_integrity_audit": BacktestRunner._serialize_data_integrity_audit_profile( + getattr(result_consistency, "data_integrity_audit", None) + ), "transaction_cost_robustness": BacktestRunner._serialize_transaction_cost_robustness_profile( getattr(result_consistency, "transaction_cost_robustness", None) ), @@ -613,6 +632,8 @@ def _active_result_consistency_gates(result_consistency: Any) -> set[str]: active.add("result_consistency.execution_price_variance") if getattr(result_consistency, "lookahead_shuffle_test", None) is not None: active.add("result_consistency.lookahead_shuffle_test") + if getattr(result_consistency, "data_integrity_audit", None) is not None: + active.add("result_consistency.data_integrity_audit") if getattr(result_consistency, "transaction_cost_robustness", None) is not None: active.add("result_consistency.transaction_cost_robustness") return active @@ -1726,6 +1747,19 @@ def _load_lookahead_shuffle_test_policy( return getattr(resolved_rc, "lookahead_shuffle_test", None) return None + def _load_data_integrity_audit_policy( + self, collection: CollectionConfig + ) -> ResultConsistencyDataIntegrityAuditConfig | None: + if not collection.reference_source: + return None + collection_validation = getattr(collection, "validation", None) + resolved_rc: ResultConsistencyConfig | None = ( + getattr(collection_validation, "result_consistency", None) if collection_validation else None + ) + if resolved_rc is None: + return None + return getattr(resolved_rc, "data_integrity_audit", None) + def _load_transaction_cost_robustness_policy( self, collection: CollectionConfig ) -> ResultConsistencyTransactionCostRobustnessConfig | None: @@ -3908,6 +3942,9 @@ def _strategy_validate_results_common(self, context: ValidationContext) -> GateD # Fail fast on cheap result-consistency gates before expensive shuffle checks. 
return self._strategy_validation_reject_or_continue(reasons) self._run_lookahead_shuffle_validation(context, plan, outcome, reasons) + if reasons: + return self._strategy_validation_reject_or_continue(reasons) + self._run_data_integrity_audit_validation(context, outcome, reasons) if reasons: return self._strategy_validation_reject_or_continue(reasons) self._run_transaction_cost_robustness_validation(context, plan, outcome, reasons) @@ -3945,6 +3982,198 @@ def _run_lookahead_shuffle_validation( if lookahead_reason is not None: reasons.append(lookahead_reason) + def _run_data_integrity_audit_validation( + self, + context: ValidationContext, + outcome: StrategyEvalOutcome, + reasons: list[str], + ) -> None: + policy = self._load_data_integrity_audit_policy(context.job.collection) + if policy is None: + return + audit_reason, audit_meta = self._data_integrity_audit_result(context, policy) + self._attach_post_run_meta(outcome, "data_integrity_audit", audit_meta) + if audit_reason is not None: + reasons.append(audit_reason) + + @staticmethod + def _data_integrity_audit_indeterminate( + reason: str, + *, + collection: CollectionConfig, + policy: ResultConsistencyDataIntegrityAuditConfig, + details: dict[str, Any] | None = None, + ) -> tuple[str, dict[str, Any]]: + meta: dict[str, Any] = { + "is_complete": False, + "status": "indeterminate", + "reason": reason, + "source": collection.source, + "reference_source": collection.reference_source, + "min_overlap_ratio": policy.min_overlap_ratio, + "max_median_ohlc_diff_bps": policy.max_median_ohlc_diff_bps, + "max_p95_ohlc_diff_bps": policy.max_p95_ohlc_diff_bps, + } + if details: + meta.update(details) + return f"data_integrity_audit_indeterminate(reason={reason})", meta + + @staticmethod + def _data_integrity_audit_reference_collection(collection: CollectionConfig) -> CollectionConfig | None: + if not collection.reference_source: + return None + return CollectionConfig( + name=collection.name, + 
source=collection.reference_source, + symbols=list(collection.symbols), + reference_source=collection.reference_source, + exchange=collection.exchange, + currency=collection.currency, + quote=collection.quote, + fees=collection.fees, + slippage=collection.slippage, + validation=collection.validation, + ) + + @staticmethod + def _data_integrity_ohlc_diff_metrics( + primary: pd.DataFrame, + reference: pd.DataFrame, + ) -> dict[str, float]: + eps = 1e-12 + columns = ["Open", "High", "Low", "Close"] + diffs: list[np.ndarray] = [] + for column in columns: + lhs = primary[column].to_numpy(dtype=float) + rhs = reference[column].to_numpy(dtype=float) + rel = np.abs(lhs - rhs) / np.maximum(np.abs(rhs), eps) + diffs.append(rel * 10000.0) + all_diffs = np.concatenate(diffs) if diffs else np.array([], dtype=float) + if all_diffs.size == 0: + return { + "median_ohlc_diff_bps": float("nan"), + "p95_ohlc_diff_bps": float("nan"), + "max_ohlc_diff_bps": float("nan"), + } + return { + "median_ohlc_diff_bps": float(np.nanmedian(all_diffs)), + "p95_ohlc_diff_bps": float(np.nanpercentile(all_diffs, 95)), + "max_ohlc_diff_bps": float(np.nanmax(all_diffs)), + } + + def _data_integrity_audit_result( + self, + context: ValidationContext, + policy: ResultConsistencyDataIntegrityAuditConfig, + ) -> tuple[str | None, dict[str, Any]]: + validated_data = context.validated_data + if validated_data is None: + return self._data_integrity_audit_indeterminate( + "missing_validated_data", + collection=context.job.collection, + policy=policy, + ) + reference_collection = self._data_integrity_audit_reference_collection(context.job.collection) + if reference_collection is None: + return self._data_integrity_audit_indeterminate( + "missing_reference_source", + collection=context.job.collection, + policy=policy, + ) + _, _, _, _, _, _, _, _, _, _, calendar_timezone = self._load_data_quality_policy( + context.job.collection + ) + try: + reference_source = self._make_source(reference_collection) + 
reference_raw_df = reference_source.fetch(context.job.symbol, context.job.timeframe, only_cached=False) + reference_df, reference_canonicalization = self._canonicalize_validation_frame( + reference_raw_df, + calendar_timezone=calendar_timezone, + ) + except Exception as exc: + return self._data_integrity_audit_indeterminate( + "reference_fetch_failed", + collection=context.job.collection, + policy=policy, + details={"error": str(exc)}, + ) + primary_df = validated_data.raw_df + if primary_df.empty or reference_df.empty: + return self._data_integrity_audit_indeterminate( + "empty_frame", + collection=context.job.collection, + policy=policy, + details={ + "primary_bars": int(len(primary_df)), + "reference_bars": int(len(reference_df)), + }, + ) + required_columns = ["Open", "High", "Low", "Close"] + missing_columns = [ + name + for name in required_columns + if name not in primary_df.columns or name not in reference_df.columns + ] + if missing_columns: + return self._data_integrity_audit_indeterminate( + "missing_ohlc_columns", + collection=context.job.collection, + policy=policy, + details={"missing_columns": missing_columns}, + ) + overlap_index = primary_df.index.intersection(reference_df.index) + primary_bars = int(len(primary_df)) + reference_bars = int(len(reference_df)) + overlap_bars = int(len(overlap_index)) + overlap_ratio = float(overlap_bars / primary_bars) if primary_bars > 0 else 0.0 + missing_primary_bar_pct = float((1.0 - overlap_ratio) * 100.0) + overlap_primary = primary_df.loc[overlap_index, required_columns] + overlap_reference = reference_df.loc[overlap_index, required_columns] + divergence = self._data_integrity_ohlc_diff_metrics(overlap_primary, overlap_reference) + max_median = float(policy.max_median_ohlc_diff_bps or 0.0) + max_p95 = float(policy.max_p95_ohlc_diff_bps or 0.0) + min_overlap = float(policy.min_overlap_ratio or 0.0) + failed_checks: list[str] = [] + if overlap_ratio < min_overlap: + failed_checks.append( + 
"overlap_ratio_below_threshold(" + f"required={min_overlap}, available={overlap_ratio}, overlap_bars={overlap_bars}, " + f"primary_bars={primary_bars})" + ) + median_diff = divergence["median_ohlc_diff_bps"] + if np.isfinite(median_diff) and median_diff > max_median: + failed_checks.append( + "median_ohlc_diff_bps_exceeded(" + f"max_allowed={max_median}, available={median_diff})" + ) + p95_diff = divergence["p95_ohlc_diff_bps"] + if np.isfinite(p95_diff) and p95_diff > max_p95: + failed_checks.append( + "p95_ohlc_diff_bps_exceeded(" + f"max_allowed={max_p95}, available={p95_diff})" + ) + meta: dict[str, Any] = { + "is_complete": True, + "status": "complete", + "source": context.job.collection.source, + "reference_source": context.job.collection.reference_source, + "primary_bars": primary_bars, + "reference_bars": reference_bars, + "overlap_bars": overlap_bars, + "overlap_ratio": overlap_ratio, + "missing_primary_bar_pct": missing_primary_bar_pct, + "min_overlap_ratio": min_overlap, + "max_median_ohlc_diff_bps": max_median, + "max_p95_ohlc_diff_bps": max_p95, + "reference_canonicalization": reference_canonicalization, + **divergence, + "failed_checks": list(failed_checks), + } + if failed_checks: + reason = "data_integrity_audit_failed(" + "; ".join(failed_checks) + ")" + return reason, meta + return None, meta + def _run_transaction_cost_robustness_validation( self, context: ValidationContext, diff --git a/src/config.py b/src/config.py index 56ebea90..a2ab7db7 100644 --- a/src/config.py +++ b/src/config.py @@ -22,6 +22,7 @@ class CollectionConfig: name: str source: str # yfinance, ccxt, custom symbols: list[str] + reference_source: str | None = None exchange: str | None = None # for ccxt currency: str | None = None quote: str | None = None # for ccxt symbols e.g., USDT @@ -151,6 +152,13 @@ class ResultConsistencyTransactionCostRobustnessConfig: breakeven: ResultConsistencyTransactionCostBreakevenConfig | None = None +@dataclass +class 
ResultConsistencyDataIntegrityAuditConfig: + min_overlap_ratio: float | None = None + max_median_ohlc_diff_bps: float | None = None + max_p95_ohlc_diff_bps: float | None = None + + @dataclass class ResultConsistencyConfig: min_metric: float | None = None @@ -159,6 +167,7 @@ class ResultConsistencyConfig: execution_price_variance: ResultConsistencyExecutionPriceVarianceConfig | None = None lookahead_shuffle_test: ValidationLookaheadShuffleTestConfig | None = None transaction_cost_robustness: ResultConsistencyTransactionCostRobustnessConfig | None = None + data_integrity_audit: ResultConsistencyDataIntegrityAuditConfig | None = None @dataclass @@ -195,6 +204,10 @@ class Config: LOOKAHEAD_SHUFFLE_TEST_SEED_MIN = 0 LOOKAHEAD_SHUFFLE_TEST_FAILED_PERMUTATIONS_MIN = 0 LOOKAHEAD_SHUFFLE_TEST_CONFIG_PREFIX = "validation.result_consistency.lookahead_shuffle_test" +DATA_INTEGRITY_AUDIT_CONFIG_PREFIX = "validation.result_consistency.data_integrity_audit" +DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT = 0.99 +DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT = 5.0 +DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT = 20.0 TRANSACTION_COST_ROBUSTNESS_MODE_ANALYTICS = "analytics" TRANSACTION_COST_ROBUSTNESS_MODE_ENFORCE = "enforce" TRANSACTION_COST_ROBUSTNESS_MODES = { @@ -591,6 +604,99 @@ def _apply_result_consistency_execution_price_variance_defaults( ) +def _normalize_result_consistency_data_integrity_audit_config( + cfg: ResultConsistencyDataIntegrityAuditConfig | None, + prefix: str, +) -> ResultConsistencyDataIntegrityAuditConfig | None: + if cfg is None: + return None + min_overlap_ratio_raw = getattr(cfg, "min_overlap_ratio", None) + min_overlap_ratio = ( + _coerce_float(min_overlap_ratio_raw, f"{prefix}.min_overlap_ratio") + if min_overlap_ratio_raw is not None + else None + ) + if min_overlap_ratio is not None and not ( + VALIDATION_PROBABILITY_MIN <= min_overlap_ratio <= VALIDATION_PROBABILITY_MAX + ): + raise ValueError( + f"`{prefix}.min_overlap_ratio` must be 
between {VALIDATION_PROBABILITY_MIN} and " + f"{VALIDATION_PROBABILITY_MAX}" + ) + max_median_ohlc_diff_bps_raw = getattr(cfg, "max_median_ohlc_diff_bps", None) + max_median_ohlc_diff_bps = ( + _coerce_float(max_median_ohlc_diff_bps_raw, f"{prefix}.max_median_ohlc_diff_bps") + if max_median_ohlc_diff_bps_raw is not None + else None + ) + if ( + max_median_ohlc_diff_bps is not None + and max_median_ohlc_diff_bps < VALIDATION_NON_NEGATIVE_FLOAT_MIN + ): + raise ValueError( + f"`{prefix}.max_median_ohlc_diff_bps` must be >= {VALIDATION_NON_NEGATIVE_FLOAT_MIN}" + ) + max_p95_ohlc_diff_bps_raw = getattr(cfg, "max_p95_ohlc_diff_bps", None) + max_p95_ohlc_diff_bps = ( + _coerce_float(max_p95_ohlc_diff_bps_raw, f"{prefix}.max_p95_ohlc_diff_bps") + if max_p95_ohlc_diff_bps_raw is not None + else None + ) + if max_p95_ohlc_diff_bps is not None and max_p95_ohlc_diff_bps < VALIDATION_NON_NEGATIVE_FLOAT_MIN: + raise ValueError( + f"`{prefix}.max_p95_ohlc_diff_bps` must be >= {VALIDATION_NON_NEGATIVE_FLOAT_MIN}" + ) + if ( + max_median_ohlc_diff_bps is not None + and max_p95_ohlc_diff_bps is not None + and max_p95_ohlc_diff_bps < max_median_ohlc_diff_bps + ): + raise ValueError( + f"`{prefix}.max_p95_ohlc_diff_bps` must be >= `{prefix}.max_median_ohlc_diff_bps`" + ) + return ResultConsistencyDataIntegrityAuditConfig( + min_overlap_ratio=min_overlap_ratio, + max_median_ohlc_diff_bps=max_median_ohlc_diff_bps, + max_p95_ohlc_diff_bps=max_p95_ohlc_diff_bps, + ) + + +def _apply_result_consistency_data_integrity_audit_defaults( + cfg: ResultConsistencyDataIntegrityAuditConfig, +) -> ResultConsistencyDataIntegrityAuditConfig: + min_overlap_ratio = ( + cfg.min_overlap_ratio + if cfg.min_overlap_ratio is not None + else DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT + ) + max_median_ohlc_diff_bps = ( + cfg.max_median_ohlc_diff_bps + if cfg.max_median_ohlc_diff_bps is not None + else DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT + ) + max_p95_ohlc_diff_bps = ( + 
cfg.max_p95_ohlc_diff_bps + if cfg.max_p95_ohlc_diff_bps is not None + else DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT + ) + if max_p95_ohlc_diff_bps < max_median_ohlc_diff_bps: + raise ValueError( + f"`{DATA_INTEGRITY_AUDIT_CONFIG_PREFIX}.max_p95_ohlc_diff_bps` must be >= " + f"`{DATA_INTEGRITY_AUDIT_CONFIG_PREFIX}.max_median_ohlc_diff_bps`" + ) + return ResultConsistencyDataIntegrityAuditConfig( + min_overlap_ratio=min_overlap_ratio, + max_median_ohlc_diff_bps=max_median_ohlc_diff_bps, + max_p95_ohlc_diff_bps=max_p95_ohlc_diff_bps, + ) + + +def _default_data_integrity_audit_config() -> ResultConsistencyDataIntegrityAuditConfig: + return _apply_result_consistency_data_integrity_audit_defaults( + ResultConsistencyDataIntegrityAuditConfig() + ) + + def _normalize_transaction_cost_breakeven_config( cfg: ResultConsistencyTransactionCostBreakevenConfig | None, prefix: str, @@ -918,6 +1024,10 @@ def _normalize_result_consistency_config( getattr(cfg, "lookahead_shuffle_test", None), f"{prefix}.lookahead_shuffle_test", ) + data_integrity_audit = _normalize_result_consistency_data_integrity_audit_config( + getattr(cfg, "data_integrity_audit", None), + f"{prefix}.data_integrity_audit", + ) transaction_cost_robustness = _normalize_transaction_cost_robustness_config( getattr(cfg, "transaction_cost_robustness", None), f"{prefix}.transaction_cost_robustness", @@ -926,12 +1036,13 @@ def _normalize_result_consistency_config( outlier_dependency is None and execution_price_variance is None and lookahead_shuffle_test is None + and data_integrity_audit is None and transaction_cost_robustness is None ): raise ValueError( f"Invalid `{prefix}`: expected at least one configured module " "(`outlier_dependency`, `execution_price_variance`, `lookahead_shuffle_test`, " - "or `transaction_cost_robustness`)" + "`data_integrity_audit`, or `transaction_cost_robustness`)" ) min_metric_raw = getattr(cfg, "min_metric", None) min_metric = _coerce_float(min_metric_raw, f"{prefix}.min_metric") 
if min_metric_raw is not None else None @@ -945,6 +1056,7 @@ def _normalize_result_consistency_config( outlier_dependency=outlier_dependency, execution_price_variance=execution_price_variance, lookahead_shuffle_test=lookahead_shuffle_test, + data_integrity_audit=data_integrity_audit, transaction_cost_robustness=transaction_cost_robustness, ) @@ -971,6 +1083,11 @@ def _apply_result_consistency_defaults(cfg: ResultConsistencyConfig) -> ResultCo if cfg.lookahead_shuffle_test is not None else None ), + data_integrity_audit=( + _apply_result_consistency_data_integrity_audit_defaults(cfg.data_integrity_audit) + if cfg.data_integrity_audit is not None + else None + ), transaction_cost_robustness=( _apply_transaction_cost_robustness_defaults(cfg.transaction_cost_robustness) if cfg.transaction_cost_robustness is not None @@ -1060,6 +1177,10 @@ def _merge_result_consistency_config( getattr(base, "lookahead_shuffle_test", None), getattr(override, "lookahead_shuffle_test", None), ), + data_integrity_audit=_merge_result_consistency_data_integrity_audit_config( + getattr(base, "data_integrity_audit", None), + getattr(override, "data_integrity_audit", None), + ), transaction_cost_robustness=_merge_transaction_cost_robustness_config( getattr(base, "transaction_cost_robustness", None), getattr(override, "transaction_cost_robustness", None), @@ -1069,6 +1190,7 @@ def _merge_result_consistency_config( merged.outlier_dependency is None and merged.execution_price_variance is None and merged.lookahead_shuffle_test is None + and merged.data_integrity_audit is None and merged.transaction_cost_robustness is None ): return None @@ -1102,6 +1224,19 @@ def _merge_result_consistency_execution_price_variance_config( ) +def _merge_result_consistency_data_integrity_audit_config( + base: ResultConsistencyDataIntegrityAuditConfig | None, + override: ResultConsistencyDataIntegrityAuditConfig | None, +) -> ResultConsistencyDataIntegrityAuditConfig | None: + if base is None and override is None: + 
return None + return ResultConsistencyDataIntegrityAuditConfig( + min_overlap_ratio=_merged_field(base, override, "min_overlap_ratio"), + max_median_ohlc_diff_bps=_merged_field(base, override, "max_median_ohlc_diff_bps"), + max_p95_ohlc_diff_bps=_merged_field(base, override, "max_p95_ohlc_diff_bps"), + ) + + def _merge_transaction_cost_breakeven_config( base: ResultConsistencyTransactionCostBreakevenConfig | None, override: ResultConsistencyTransactionCostBreakevenConfig | None, @@ -1496,6 +1631,23 @@ def resolve_validation_overrides(cfg: Config) -> None: global_result_consistency_policy, getattr(collection_validation, "result_consistency", None), ) + if collection.reference_source: + base_policy = ( + resolved_result_consistency + if resolved_result_consistency is not None + else ResultConsistencyConfig() + ) + if getattr(base_policy, "data_integrity_audit", None) is None: + base_policy = ResultConsistencyConfig( + min_metric=base_policy.min_metric, + min_trades=base_policy.min_trades, + outlier_dependency=base_policy.outlier_dependency, + execution_price_variance=base_policy.execution_price_variance, + lookahead_shuffle_test=base_policy.lookahead_shuffle_test, + transaction_cost_robustness=base_policy.transaction_cost_robustness, + data_integrity_audit=_default_data_integrity_audit_config(), + ) + resolved_result_consistency = _merge_result_consistency_config(base_policy, None) if ( resolved_data_quality is None and resolved_optimization is None @@ -2090,6 +2242,17 @@ def _parse_result_consistency(raw: Any, prefix: str) -> ResultConsistencyConfig if isinstance(lookahead_shuffle_test_raw, dict) else None ) + data_integrity_audit_raw = parsed_raw.get("data_integrity_audit") + if data_integrity_audit_raw is not None and not isinstance(data_integrity_audit_raw, dict): + raise ValueError(f"Invalid `{prefix}.data_integrity_audit`: expected a mapping") + data_integrity_audit = ( + _parse_result_consistency_data_integrity_audit( + data_integrity_audit_raw, + 
f"{prefix}.data_integrity_audit", + ) + if isinstance(data_integrity_audit_raw, dict) + else None + ) transaction_cost_robustness_raw = parsed_raw.get("transaction_cost_robustness") if ( transaction_cost_robustness_raw is not None @@ -2114,6 +2277,7 @@ def _parse_result_consistency(raw: Any, prefix: str) -> ResultConsistencyConfig outlier_dependency=outlier_dependency, execution_price_variance=execution_price_variance, lookahead_shuffle_test=lookahead_shuffle_test, + data_integrity_audit=data_integrity_audit, transaction_cost_robustness=transaction_cost_robustness, ), prefix, @@ -2170,6 +2334,36 @@ def _parse_result_consistency_execution_price_variance( ) +def _parse_result_consistency_data_integrity_audit( + raw: Any, + prefix: str, +) -> ResultConsistencyDataIntegrityAuditConfig | None: + if raw is None: + return None + parsed_raw = require_mapping(raw, prefix) + return ResultConsistencyDataIntegrityAuditConfig( + min_overlap_ratio=parse_optional_float( + parsed_raw, + prefix, + "min_overlap_ratio", + min_value=VALIDATION_PROBABILITY_MIN, + max_value=VALIDATION_PROBABILITY_MAX, + ), + max_median_ohlc_diff_bps=parse_optional_float( + parsed_raw, + prefix, + "max_median_ohlc_diff_bps", + min_value=VALIDATION_NON_NEGATIVE_FLOAT_MIN, + ), + max_p95_ohlc_diff_bps=parse_optional_float( + parsed_raw, + prefix, + "max_p95_ohlc_diff_bps", + min_value=VALIDATION_NON_NEGATIVE_FLOAT_MIN, + ), + ) + + def _parse_result_consistency_transaction_cost_breakeven( raw: Any, prefix: str ) -> ResultConsistencyTransactionCostBreakevenConfig | None: @@ -2289,6 +2483,9 @@ def _parse_collections(raw_collections: Any) -> list[CollectionConfig]: name=str(collection_raw["name"]).strip(), source=str(collection_raw["source"]).strip(), symbols=[str(symbol).strip() for symbol in symbols_raw], + reference_source=parse_optional_str( + collection_raw, "reference_source", normalize=False + ), exchange=parse_optional_str(collection_raw, "exchange", normalize=False), 
currency=parse_optional_str(collection_raw, "currency", normalize=False), quote=parse_optional_str(collection_raw, "quote", normalize=False), diff --git a/tests/test_backtest_runner.py b/tests/test_backtest_runner.py index 3e566265..ead81c35 100644 --- a/tests/test_backtest_runner.py +++ b/tests/test_backtest_runner.py @@ -292,6 +292,7 @@ def _result_consistency_config(**overrides) -> ResultConsistencyConfig: "outlier_dependency": None, "execution_price_variance": None, "lookahead_shuffle_test": None, + "data_integrity_audit": None, "transaction_cost_robustness": None, } payload.update(overrides) @@ -616,6 +617,11 @@ def test_serialize_result_consistency_profile_keeps_schema_and_key_order(): seed=1337, max_failed_permutations=2, ), + data_integrity_audit=SimpleNamespace( + min_overlap_ratio=0.99, + max_median_ohlc_diff_bps=5.0, + max_p95_ohlc_diff_bps=20.0, + ), transaction_cost_robustness=SimpleNamespace( mode="analytics", stress_multipliers=[2.0, 5.0], @@ -638,6 +644,7 @@ def test_serialize_result_consistency_profile_keeps_schema_and_key_order(): "outlier_dependency", "execution_price_variance", "lookahead_shuffle_test", + "data_integrity_audit", "transaction_cost_robustness", ] assert payload["min_metric"] == pytest.approx(0.5) @@ -654,6 +661,11 @@ def test_serialize_result_consistency_profile_keeps_schema_and_key_order(): "seed": 1337, "max_failed_permutations": 2, } + assert payload["data_integrity_audit"] == { + "min_overlap_ratio": 0.99, + "max_median_ohlc_diff_bps": 5.0, + "max_p95_ohlc_diff_bps": 20.0, + } assert payload["transaction_cost_robustness"] == { "mode": "analytics", "stress_multipliers": [2.0, 5.0], @@ -1851,6 +1863,28 @@ def fetch(self, symbol, timeframe, only_cached=False): monkeypatch.setattr(BacktestRunner, "_make_source", lambda self, col: _Source()) +def _patch_primary_and_reference_sources( + monkeypatch, + *, + primary_df: pd.DataFrame, + reference_df: pd.DataFrame, + reference_source: str = "alphavantage", +) -> None: + class _Source: + 
def __init__(self, df: pd.DataFrame): + self._df = df + + def fetch(self, symbol, timeframe, only_cached=False): + return self._df.copy() + + def _make_source(self, col): + if col.source == reference_source: + return _Source(reference_df) + return _Source(primary_df) + + monkeypatch.setattr(BacktestRunner, "_make_source", _make_source) + + def _lookahead_shuffle_test_config( *, permutations: int = 100, @@ -3274,6 +3308,80 @@ def test_run_all_lookahead_shuffle_test_does_not_mutate_cached_stats_payload( assert post_run_meta["lookahead_shuffle_test"]["is_complete"] is True +def test_run_all_data_integrity_audit_passes_and_attaches_meta(tmp_path, monkeypatch): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + primary = _make_trending_ohlcv(30) + reference = primary.copy() + _patch_primary_and_reference_sources( + monkeypatch, + primary_df=primary, + reference_df=reference, + reference_source="alphavantage", + ) + eval_calls = _patch_pybroker_simulation(monkeypatch) + + results = runner.run_all() + + assert len(results) == 1 + assert eval_calls["count"] == 2 + post_run_meta = results[0].stats.get("post_run_meta") + assert post_run_meta is not None + audit_meta = post_run_meta["data_integrity_audit"] + assert audit_meta["is_complete"] is True + assert audit_meta["status"] == "complete" + assert audit_meta["overlap_ratio"] == pytest.approx(1.0) + assert audit_meta["median_ohlc_diff_bps"] == pytest.approx(0.0) + assert "result_consistency.data_integrity_audit" in runner.active_validation_gates + + +def test_run_all_data_integrity_audit_rejects_on_ohlc_drift(tmp_path, monkeypatch): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + primary = _make_trending_ohlcv(30) + reference = primary.copy() + reference[["Open", "High", "Low", "Close"]] = reference[["Open", "High", "Low", "Close"]] * 1.02 + 
_patch_primary_and_reference_sources( + monkeypatch, + primary_df=primary, + reference_df=reference, + reference_source="alphavantage", + ) + eval_calls = _patch_pybroker_simulation(monkeypatch) + + results = runner.run_all() + + assert results == [] + assert eval_calls["count"] == 2 + assert len(runner.failures) == 1 + failure = runner.failures[0] + assert failure["stage"] == "strategy_validation" + assert "data_integrity_audit_failed(" in failure["error"] + assert "median_ohlc_diff_bps_exceeded" in failure["error"] + + +def test_run_all_data_integrity_audit_rejects_on_low_overlap(tmp_path, monkeypatch): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + primary = _make_trending_ohlcv(30) + reference = primary.iloc[::2].copy() + _patch_primary_and_reference_sources( + monkeypatch, + primary_df=primary, + reference_df=reference, + reference_source="alphavantage", + ) + + results = runner.run_all() + + assert results == [] + assert len(runner.failures) == 1 + failure = runner.failures[0] + assert failure["stage"] == "strategy_validation" + assert "data_integrity_audit_failed(" in failure["error"] + assert "overlap_ratio_below_threshold" in failure["error"] + + def test_transaction_cost_robustness_result_attaches_meta_without_cache_pollution( tmp_path, monkeypatch ): diff --git a/tests/test_config.py b/tests/test_config.py index 047cb411..a1bb0ef9 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -400,6 +400,75 @@ def test_load_config_lookahead_shuffle_test_defaults(tmp_path: Path): ) +def test_load_config_reference_source_enables_data_integrity_audit_defaults(tmp_path: Path): + config_text = """ +collections: + - name: test + source: yfinance + reference_source: alphavantage + symbols: ['AAPL'] +timeframes: ['1d'] +metric: sharpe +""" + path = tmp_path / "config.yaml" + path.write_text(config_text) + + cfg = load_config(path) + assert cfg.collections[0].reference_source == 
"alphavantage" + assert cfg.collections[0].validation is not None + assert cfg.collections[0].validation.result_consistency is not None + audit = cfg.collections[0].validation.result_consistency.data_integrity_audit + assert audit is not None + assert audit.min_overlap_ratio == pytest.approx(0.99) + assert audit.max_median_ohlc_diff_bps == pytest.approx(5.0) + assert audit.max_p95_ohlc_diff_bps == pytest.approx(20.0) + + +def test_load_config_data_integrity_audit_rejects_non_mapping(tmp_path: Path): + with pytest.raises( + ValueError, + match=r"validation\.result_consistency\.data_integrity_audit", + ): + _load_from_blocks( + tmp_path, + validation_block={ + "result_consistency": _result_consistency_block(data_integrity_audit=True) + }, + ) + + +def test_load_config_data_integrity_audit_collection_override_inherits_global(tmp_path: Path): + config_text = """ +collections: + - name: test + source: yfinance + reference_source: alphavantage + symbols: ['AAPL'] + validation: + result_consistency: + data_integrity_audit: + min_overlap_ratio: 0.97 +timeframes: ['1d'] +metric: sharpe +validation: + result_consistency: + data_integrity_audit: + max_median_ohlc_diff_bps: 2.0 + max_p95_ohlc_diff_bps: 10.0 +""" + path = tmp_path / "config.yaml" + path.write_text(config_text) + + cfg = load_config(path) + assert cfg.collections[0].validation is not None + assert cfg.collections[0].validation.result_consistency is not None + audit = cfg.collections[0].validation.result_consistency.data_integrity_audit + assert audit is not None + assert audit.min_overlap_ratio == pytest.approx(0.97) + assert audit.max_median_ohlc_diff_bps == pytest.approx(2.0) + assert audit.max_p95_ohlc_diff_bps == pytest.approx(10.0) + + def test_load_config_transaction_cost_robustness_inherits_global_overrides(tmp_path: Path): cfg = _load_from_blocks( tmp_path, From 2fbbc8de313bdea44df9a66d60a0c45a7a9f1d84 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Wed, 15 Apr 2026 14:53:24 +0800 Subject: [PATCH 
02/29] docs: add new module to README.md and example.yaml --- README.md | 10 +++++++++- config/example.yaml | 6 ++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 38fbd030..196a48d5 100644 --- a/README.md +++ b/README.md @@ -254,6 +254,14 @@ See new collection examples under `config/collections/` for FX intraday via Finn - collection-level overrides are supported via `collections[].validation.optimization` and are resolved against global `validation.optimization` during config loading. - `validation.result_consistency` controls strategy-result concentration checks: + - `data_integrity_audit` (optional thresholds module; gate is active when `collections[].reference_source` is set): + - purpose: compare canonicalized bars from the primary `source` and a secondary `reference_source` + to catch bad prints / ghost bars before accepting strategy results + - `min_overlap_ratio` (optional, default `0.99`, `0..1`): minimum timestamp overlap required between sources + - `max_median_ohlc_diff_bps` (optional, default `5.0`, `>=0`): maximum allowed median OHLC drift (bps) + - `max_p95_ohlc_diff_bps` (optional, default `20.0`, `>=0`): maximum allowed p95 OHLC drift (bps) + - action: fixed to `reject_result` when overlap/drift thresholds are breached (or comparison is indeterminate) + - diagnostics are attached under `post_run_meta.data_integrity_audit` - `outlier_dependency` (optional module; active when configured): - `slices` (required, `>=2`): number of equal time-slices used for diagnostics - `profit_share_threshold` (required, `0..1`) @@ -297,7 +305,7 @@ Structured logs reflect this directly via gate actions: - `data_validation_gate` can emit `skip_optimization` (job-level optimization disable). - `strategy_optimization_gate` can emit `baseline_only` (strategy-level baseline fallback) or `skip_job`. 
- `strategy_validation_gate` can emit `reject_result` for outlier dependency, - execution price variance, and lookahead shuffle testing. + execution price variance, lookahead shuffle testing, data integrity audit, and transaction-cost robustness. Numeric config parsing follows `src/config.py` coercion helpers: - numeric fields are strict types: use YAML numbers, not quoted numeric strings diff --git a/config/example.yaml b/config/example.yaml index f721821f..66418cda 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -54,6 +54,10 @@ validation: result_consistency: min_metric: 0.5 # fail fast: require at least this metric before expensive checks min_trades: 20 # fail fast: require at least this many closed trades + data_integrity_audit: + min_overlap_ratio: 0.99 # min shared timestamps between source and reference_source + max_median_ohlc_diff_bps: 5.0 # median OHLC drift tolerance (bps) + max_p95_ohlc_diff_bps: 20.0 # tail OHLC drift tolerance (bps) outlier_dependency: slices: 5 # split trade history into N equal time-slices for diagnostics profit_share_threshold: 0.80 @@ -80,6 +84,7 @@ collections: # Stocks (large-cap growth) - name: stocks_large_cap_growth source: yfinance + reference_source: twelvedata # optional golden source for post-run data-integrity audit symbols: ["CNDX.L", "AAPL", "MSFT", "NVDA"] fees: 0.0005 # approx IBKR slippage: 0.0005 @@ -101,6 +106,7 @@ collections: # Crypto (Binance via ccxt) - name: crypto source: binance + reference_source: bybit # optional golden source; activates data_integrity_audit defaults if unset exchange: binance quote: USDT symbols: ["BTC/USDT", "ETH/USDT", "BNB/USDT", "SOL/USDT"] From 87881fc13100f832861f14851f96b2d8ea721507 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 20 Apr 2026 02:05:04 +0000 Subject: [PATCH 03/29] Fix audit reference collection self-reference --- src/backtest/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backtest/runner.py 
b/src/backtest/runner.py index efb3fab2..7c10c0bb 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4026,7 +4026,7 @@ def _data_integrity_audit_reference_collection(collection: CollectionConfig) -> name=collection.name, source=collection.reference_source, symbols=list(collection.symbols), - reference_source=collection.reference_source, + reference_source=None, exchange=collection.exchange, currency=collection.currency, quote=collection.quote, From cdddccf042606f3e9e5600721325003f3c6b4010 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 20 Apr 2026 02:15:31 +0000 Subject: [PATCH 04/29] Fix reference exchange resolution in data integrity audit --- src/backtest/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 7c10c0bb..e8abb61f 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4027,7 +4027,7 @@ def _data_integrity_audit_reference_collection(collection: CollectionConfig) -> source=collection.reference_source, symbols=list(collection.symbols), reference_source=None, - exchange=collection.exchange, + exchange=None, currency=collection.currency, quote=collection.quote, fees=collection.fees, From 40cbcebc416f294130dd2a62b4bbfa9b2e03b123 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 10:16:44 +0800 Subject: [PATCH 05/29] fix: active data_integrity gate tracking now on reference_source as enforced by the runner --- src/backtest/runner.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index efb3fab2..e7cd5ea7 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -618,7 +618,11 @@ def _active_optimization_gates(optimization: Any) -> set[str]: return {"optimization.feasibility"} @staticmethod - def _active_result_consistency_gates(result_consistency: Any) -> set[str]: + def _active_result_consistency_gates( + result_consistency: Any,
*, + has_reference_source: bool = False, + ) -> set[str]: if result_consistency is None: return set() active: set[str] = set() @@ -632,7 +636,8 @@ def _active_result_consistency_gates(result_consistency: Any) -> set[str]: active.add("result_consistency.execution_price_variance") if getattr(result_consistency, "lookahead_shuffle_test", None) is not None: active.add("result_consistency.lookahead_shuffle_test") - if getattr(result_consistency, "data_integrity_audit", None) is not None: + # Data integrity audit activation is collection-scoped via reference_source. + if has_reference_source: active.add("result_consistency.data_integrity_audit") if getattr(result_consistency, "transaction_cost_robustness", None) is not None: active.add("result_consistency.transaction_cost_robustness") @@ -650,7 +655,10 @@ def _build_validation_metadata(self) -> dict[str, Any]: collection_active = self._active_data_quality_gates(collection_dq) collection_active.update(self._active_optimization_gates(collection_optimization)) collection_active.update( - self._active_result_consistency_gates(collection_result_consistency) + self._active_result_consistency_gates( + collection_result_consistency, + has_reference_source=bool(getattr(collection, "reference_source", None)), + ) ) active_gates_union.update(collection_active) collection_profiles.append( From 320f99d043aefe07bb665792e745438508fc5e25 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 10:26:11 +0800 Subject: [PATCH 06/29] perf: cache data integrity audit per job --- src/backtest/runner.py | 27 ++++++++++++++++++++- tests/test_backtest_runner.py | 45 +++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 915166d3..ddeca37b 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -281,6 +281,10 @@ def __init__( self._runtime_signal_error_counts: dict[tuple[str, str, str, str], int] = {} 
self._runtime_signal_error_capped: set[tuple[str, str, str, str]] = set() self._strategy_fingerprint_cache: dict[type[BaseStrategy], str] = {} + self._data_integrity_audit_cache: dict[ + tuple[str, str, str, str, str], + tuple[str | None, dict[str, Any]], + ] = {} self.validation_metadata: dict[str, Any] = {} self.active_validation_gates: list[str] = [] self.inactive_validation_gates: list[str] = [] @@ -3999,11 +4003,31 @@ def _run_data_integrity_audit_validation( policy = self._load_data_integrity_audit_policy(context.job.collection) if policy is None: return - audit_reason, audit_meta = self._data_integrity_audit_result(context, policy) + cache_key = self._data_integrity_audit_cache_key(context, policy) + cached = self._data_integrity_audit_cache.get(cache_key) + if cached is None: + audit_reason, audit_meta = self._data_integrity_audit_result(context, policy) + self._data_integrity_audit_cache[cache_key] = (audit_reason, copy.deepcopy(audit_meta)) + else: + audit_reason, cached_meta = cached + audit_meta = copy.deepcopy(cached_meta) self._attach_post_run_meta(outcome, "data_integrity_audit", audit_meta) if audit_reason is not None: reasons.append(audit_reason) + @staticmethod + def _data_integrity_audit_cache_key( + context: ValidationContext, + _policy: ResultConsistencyDataIntegrityAuditConfig, + ) -> tuple[str, str, str, str, str]: + return ( + context.job.collection.name, + context.job.symbol, + context.job.timeframe, + str(context.job.collection.source), + str(context.job.collection.reference_source), + ) + @staticmethod def _data_integrity_audit_indeterminate( reason: str, @@ -4394,6 +4418,7 @@ def run_all(self, only_cached: bool = False) -> list[BestResult]: self._evaluation_cache_write_failures = 0 self._runtime_signal_error_counts = {} self._runtime_signal_error_capped = set() + self._data_integrity_audit_cache = {} self._evaluator = None self._strategy_overrides = ( {s.name: s.params for s in self.cfg.strategies} if self.cfg.strategies else {} diff 
--git a/tests/test_backtest_runner.py b/tests/test_backtest_runner.py index ead81c35..c532c070 100644 --- a/tests/test_backtest_runner.py +++ b/tests/test_backtest_runner.py @@ -3382,6 +3382,51 @@ def test_run_all_data_integrity_audit_rejects_on_low_overlap(tmp_path, monkeypat assert "overlap_ratio_below_threshold" in failure["error"] +def test_run_all_data_integrity_audit_reuses_job_level_cache_across_strategies(tmp_path, monkeypatch): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + + class _AltStrategy(BaseStrategy): + name = "alt" + + def param_grid(self) -> dict[str, list[int]]: + return {} + + def generate_signals(self, df: pd.DataFrame, params: dict) -> tuple[pd.Series, pd.Series]: + entries = pd.Series([True] + [False] * (len(df.index) - 1), index=df.index) + exits = pd.Series([False] * (len(df.index) - 1) + [True], index=df.index) + return entries, exits + + runner.external_index = {"dummy": _DummyStrategy, "alt": _AltStrategy} + primary = _make_trending_ohlcv(30) + reference = primary.copy() + fetch_counts = {"primary": 0, "reference": 0} + + class _Source: + def __init__(self, df: pd.DataFrame, *, is_reference: bool): + self._df = df + self._is_reference = is_reference + + def fetch(self, symbol, timeframe, only_cached=False): + key = "reference" if self._is_reference else "primary" + fetch_counts[key] += 1 + return self._df.copy() + + def _make_source(self, col): + if col.source == "alphavantage": + return _Source(reference, is_reference=True) + return _Source(primary, is_reference=False) + + monkeypatch.setattr(BacktestRunner, "_make_source", _make_source) + _patch_pybroker_simulation(monkeypatch) + + results = runner.run_all() + + assert len(results) == 2 + assert fetch_counts["primary"] == 1 + assert fetch_counts["reference"] == 1 + + def test_transaction_cost_robustness_result_attaches_meta_without_cache_pollution( tmp_path, monkeypatch ): From 
bd4939b53ad48002502f0a45353a4c5115a36b31 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 10:31:52 +0800 Subject: [PATCH 07/29] refactor: reduce parser and audit complexity --- src/backtest/runner.py | 113 ++++++++++++++++++++++++++++++++--------- src/config.py | 46 ++++++++--------- 2 files changed, 109 insertions(+), 50 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index ddeca37b..cc7ababc 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4105,6 +4105,45 @@ def _data_integrity_audit_result( collection=context.job.collection, policy=policy, ) + reference_outcome = self._load_reference_frame_for_data_integrity(context, policy) + if not isinstance(reference_outcome[0], pd.DataFrame): + return reference_outcome + reference_df, reference_canonicalization = reference_outcome + primary_df = validated_data.raw_df + invalid_input = self._validate_data_integrity_inputs(primary_df, reference_df, context, policy) + if invalid_input is not None: + return invalid_input + overlap_details = self._data_integrity_overlap_details(primary_df, reference_df) + divergence = overlap_details["divergence"] + threshold_details = self._data_integrity_threshold_details(policy) + failed_checks = self._data_integrity_failed_checks(overlap_details, divergence, threshold_details) + meta: dict[str, Any] = { + "is_complete": True, + "status": "complete", + "source": context.job.collection.source, + "reference_source": context.job.collection.reference_source, + "primary_bars": overlap_details["primary_bars"], + "reference_bars": overlap_details["reference_bars"], + "overlap_bars": overlap_details["overlap_bars"], + "overlap_ratio": overlap_details["overlap_ratio"], + "missing_primary_bar_pct": overlap_details["missing_primary_bar_pct"], + "min_overlap_ratio": threshold_details["min_overlap_ratio"], + "max_median_ohlc_diff_bps": threshold_details["max_median_ohlc_diff_bps"], + "max_p95_ohlc_diff_bps": 
threshold_details["max_p95_ohlc_diff_bps"], + "reference_canonicalization": reference_canonicalization, + **divergence, + "failed_checks": list(failed_checks), + } + if failed_checks: + reason = "data_integrity_audit_failed(" + "; ".join(failed_checks) + ")" + return reason, meta + return None, meta + + def _load_reference_frame_for_data_integrity( + self, + context: ValidationContext, + policy: ResultConsistencyDataIntegrityAuditConfig, + ) -> tuple[pd.DataFrame, dict[str, int]] | tuple[str | None, dict[str, Any]]: reference_collection = self._data_integrity_audit_reference_collection(context.job.collection) if reference_collection is None: return self._data_integrity_audit_indeterminate( @@ -4129,7 +4168,15 @@ def _data_integrity_audit_result( policy=policy, details={"error": str(exc)}, ) - primary_df = validated_data.raw_df + return reference_df, reference_canonicalization + + def _validate_data_integrity_inputs( + self, + primary_df: pd.DataFrame, + reference_df: pd.DataFrame, + context: ValidationContext, + policy: ResultConsistencyDataIntegrityAuditConfig, + ) -> tuple[str | None, dict[str, Any]] | None: if primary_df.empty or reference_df.empty: return self._data_integrity_audit_indeterminate( "empty_frame", @@ -4153,6 +4200,14 @@ def _data_integrity_audit_result( policy=policy, details={"missing_columns": missing_columns}, ) + return None + + def _data_integrity_overlap_details( + self, + primary_df: pd.DataFrame, + reference_df: pd.DataFrame, + ) -> dict[str, Any]: + required_columns = ["Open", "High", "Low", "Close"] overlap_index = primary_df.index.intersection(reference_df.index) primary_bars = int(len(primary_df)) reference_bars = int(len(reference_df)) @@ -4162,10 +4217,36 @@ def _data_integrity_audit_result( overlap_primary = primary_df.loc[overlap_index, required_columns] overlap_reference = reference_df.loc[overlap_index, required_columns] divergence = self._data_integrity_ohlc_diff_metrics(overlap_primary, overlap_reference) - max_median = 
float(policy.max_median_ohlc_diff_bps or 0.0) - max_p95 = float(policy.max_p95_ohlc_diff_bps or 0.0) - min_overlap = float(policy.min_overlap_ratio or 0.0) + return { + "primary_bars": primary_bars, + "reference_bars": reference_bars, + "overlap_bars": overlap_bars, + "overlap_ratio": overlap_ratio, + "missing_primary_bar_pct": missing_primary_bar_pct, + "divergence": divergence, + } + + @staticmethod + def _data_integrity_threshold_details( + policy: ResultConsistencyDataIntegrityAuditConfig, + ) -> dict[str, float]: + return { + "max_median_ohlc_diff_bps": float(policy.max_median_ohlc_diff_bps or 0.0), + "max_p95_ohlc_diff_bps": float(policy.max_p95_ohlc_diff_bps or 0.0), + "min_overlap_ratio": float(policy.min_overlap_ratio or 0.0), + } + + @staticmethod + def _data_integrity_failed_checks( + overlap_details: dict[str, Any], + divergence: dict[str, float], + thresholds: dict[str, float], + ) -> list[str]: failed_checks: list[str] = [] + overlap_ratio = float(overlap_details["overlap_ratio"]) + min_overlap = float(thresholds["min_overlap_ratio"]) + overlap_bars = int(overlap_details["overlap_bars"]) + primary_bars = int(overlap_details["primary_bars"]) if overlap_ratio < min_overlap: failed_checks.append( "overlap_ratio_below_threshold(" @@ -4173,38 +4254,20 @@ def _data_integrity_audit_result( f"primary_bars={primary_bars})" ) median_diff = divergence["median_ohlc_diff_bps"] + max_median = float(thresholds["max_median_ohlc_diff_bps"]) if np.isfinite(median_diff) and median_diff > max_median: failed_checks.append( "median_ohlc_diff_bps_exceeded(" f"max_allowed={max_median}, available={median_diff})" ) p95_diff = divergence["p95_ohlc_diff_bps"] + max_p95 = float(thresholds["max_p95_ohlc_diff_bps"]) if np.isfinite(p95_diff) and p95_diff > max_p95: failed_checks.append( "p95_ohlc_diff_bps_exceeded(" f"max_allowed={max_p95}, available={p95_diff})" ) - meta: dict[str, Any] = { - "is_complete": True, - "status": "complete", - "source": context.job.collection.source, - 
"reference_source": context.job.collection.reference_source, - "primary_bars": primary_bars, - "reference_bars": reference_bars, - "overlap_bars": overlap_bars, - "overlap_ratio": overlap_ratio, - "missing_primary_bar_pct": missing_primary_bar_pct, - "min_overlap_ratio": min_overlap, - "max_median_ohlc_diff_bps": max_median, - "max_p95_ohlc_diff_bps": max_p95, - "reference_canonicalization": reference_canonicalization, - **divergence, - "failed_checks": list(failed_checks), - } - if failed_checks: - reason = "data_integrity_audit_failed(" + "; ".join(failed_checks) + ")" - return reason, meta - return None, meta + return failed_checks def _run_transaction_cost_robustness_validation( self, diff --git a/src/config.py b/src/config.py index ed44324d..c43819e2 100644 --- a/src/config.py +++ b/src/config.py @@ -2246,19 +2246,15 @@ def _parse_result_consistency(raw: Any, prefix: str) -> ResultConsistencyConfig min_value=RESULT_CONSISTENCY_MIN_TRADES_MIN, ) - outlier_dependency_raw = parsed_raw.get("outlier_dependency") - if outlier_dependency_raw is not None and not isinstance(outlier_dependency_raw, dict): - raise ValueError(f"Invalid `{prefix}.outlier_dependency`: expected a mapping") - execution_price_variance_raw = parsed_raw.get("execution_price_variance") - if execution_price_variance_raw is not None and not isinstance(execution_price_variance_raw, dict): - raise ValueError(f"Invalid `{prefix}.execution_price_variance`: expected a mapping") + outlier_dependency_raw = _optional_mapping_field(parsed_raw, prefix, "outlier_dependency") + execution_price_variance_raw = _optional_mapping_field(parsed_raw, prefix, "execution_price_variance") outlier_dependency = ( _parse_result_consistency_outlier_dependency( outlier_dependency_raw, f"{prefix}.outlier_dependency", ) - if isinstance(outlier_dependency_raw, dict) + if outlier_dependency_raw is not None else None ) execution_price_variance = ( @@ -2266,45 +2262,36 @@ def _parse_result_consistency(raw: Any, prefix: str) -> 
ResultConsistencyConfig execution_price_variance_raw, f"{prefix}.execution_price_variance", ) - if isinstance(execution_price_variance_raw, dict) + if execution_price_variance_raw is not None else None ) - lookahead_shuffle_test_raw = parsed_raw.get("lookahead_shuffle_test") - if lookahead_shuffle_test_raw is not None and not isinstance(lookahead_shuffle_test_raw, dict): - raise ValueError(f"Invalid `{prefix}.lookahead_shuffle_test`: expected a mapping") + lookahead_shuffle_test_raw = _optional_mapping_field(parsed_raw, prefix, "lookahead_shuffle_test") lookahead_shuffle_test = ( _parse_lookahead_shuffle_test( lookahead_shuffle_test_raw, f"{prefix}.lookahead_shuffle_test", ) - if isinstance(lookahead_shuffle_test_raw, dict) + if lookahead_shuffle_test_raw is not None else None ) - data_integrity_audit_raw = parsed_raw.get("data_integrity_audit") - if data_integrity_audit_raw is not None and not isinstance(data_integrity_audit_raw, dict): - raise ValueError(f"Invalid `{prefix}.data_integrity_audit`: expected a mapping") + data_integrity_audit_raw = _optional_mapping_field(parsed_raw, prefix, "data_integrity_audit") data_integrity_audit = ( _parse_result_consistency_data_integrity_audit( data_integrity_audit_raw, f"{prefix}.data_integrity_audit", ) - if isinstance(data_integrity_audit_raw, dict) + if data_integrity_audit_raw is not None else None ) - transaction_cost_robustness_raw = parsed_raw.get("transaction_cost_robustness") - if ( - transaction_cost_robustness_raw is not None - and not isinstance(transaction_cost_robustness_raw, dict) - ): - raise ValueError( - f"Invalid `{prefix}.transaction_cost_robustness`: expected a mapping" - ) + transaction_cost_robustness_raw = _optional_mapping_field( + parsed_raw, prefix, "transaction_cost_robustness" + ) transaction_cost_robustness = ( _parse_result_consistency_transaction_cost_robustness( transaction_cost_robustness_raw, f"{prefix}.transaction_cost_robustness", ) - if isinstance(transaction_cost_robustness_raw, dict) 
+ if transaction_cost_robustness_raw is not None else None ) @@ -2322,6 +2309,15 @@ def _parse_result_consistency(raw: Any, prefix: str) -> ResultConsistencyConfig ) +def _optional_mapping_field(raw: dict[str, Any], prefix: str, key: str) -> dict[str, Any] | None: + value = raw.get(key) + if value is None: + return None + if not isinstance(value, dict): + raise ValueError(f"Invalid `{prefix}.{key}`: expected a mapping") + return cast(dict[str, Any], value) + + def _parse_result_consistency_outlier_dependency( raw: Any, prefix: str ) -> ResultConsistencyOutlierDependencyConfig | None: From 72522c9f7551c97e9fe92caf5e9b87e45a55adfa Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 20 Apr 2026 02:47:35 +0000 Subject: [PATCH 08/29] Fix data integrity threshold fallbacks for None --- src/backtest/runner.py | 21 ++++++++++++++++++--- tests/test_backtest_runner.py | 20 ++++++++++++++++++++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index cc7ababc..25ccde44 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -19,6 +19,9 @@ from ..config import ( CollectionConfig, Config, + DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT, + DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT, + DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT, ResultConsistencyConfig, ResultConsistencyDataIntegrityAuditConfig, ResultConsistencyExecutionPriceVarianceConfig, @@ -4231,9 +4234,21 @@ def _data_integrity_threshold_details( policy: ResultConsistencyDataIntegrityAuditConfig, ) -> dict[str, float]: return { - "max_median_ohlc_diff_bps": float(policy.max_median_ohlc_diff_bps or 0.0), - "max_p95_ohlc_diff_bps": float(policy.max_p95_ohlc_diff_bps or 0.0), - "min_overlap_ratio": float(policy.min_overlap_ratio or 0.0), + "max_median_ohlc_diff_bps": float( + policy.max_median_ohlc_diff_bps + if policy.max_median_ohlc_diff_bps is not None + else DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT + ), + 
"max_p95_ohlc_diff_bps": float( + policy.max_p95_ohlc_diff_bps + if policy.max_p95_ohlc_diff_bps is not None + else DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT + ), + "min_overlap_ratio": float( + policy.min_overlap_ratio + if policy.min_overlap_ratio is not None + else DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT + ), } @staticmethod diff --git a/tests/test_backtest_runner.py b/tests/test_backtest_runner.py index c532c070..a03caffa 100644 --- a/tests/test_backtest_runner.py +++ b/tests/test_backtest_runner.py @@ -26,8 +26,12 @@ from src.config import ( CollectionConfig, Config, + DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT, + DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT, + DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT, OptimizationPolicyConfig, ResultConsistencyConfig, + ResultConsistencyDataIntegrityAuditConfig, ResultConsistencyExecutionPriceVarianceConfig, ResultConsistencyTransactionCostBreakevenConfig, ResultConsistencyTransactionCostRobustnessConfig, @@ -3427,6 +3431,22 @@ def _make_source(self, col): assert fetch_counts["reference"] == 1 +def test_data_integrity_threshold_details_uses_defaults_for_none_values(): + policy = ResultConsistencyDataIntegrityAuditConfig( + min_overlap_ratio=None, + max_median_ohlc_diff_bps=None, + max_p95_ohlc_diff_bps=None, + ) + + thresholds = BacktestRunner._data_integrity_threshold_details(policy) + + assert thresholds == { + "min_overlap_ratio": DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT, + "max_median_ohlc_diff_bps": DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT, + "max_p95_ohlc_diff_bps": DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT, + } + + def test_transaction_cost_robustness_result_attaches_meta_without_cache_pollution( tmp_path, monkeypatch ): From 9c7ee77d9ed06f7b11fbe1baa61de7c6bf8f3313 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 11:03:07 +0800 Subject: [PATCH 09/29] fix: preserve reference exchange for integrity audit source --- 
src/backtest/runner.py | 2 +- tests/test_backtest_runner.py | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index cc7ababc..eca9f1b2 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4059,7 +4059,7 @@ def _data_integrity_audit_reference_collection(collection: CollectionConfig) -> source=collection.reference_source, symbols=list(collection.symbols), reference_source=None, - exchange=None, + exchange=collection.exchange, currency=collection.currency, quote=collection.quote, fees=collection.fees, diff --git a/tests/test_backtest_runner.py b/tests/test_backtest_runner.py index c532c070..493e8f31 100644 --- a/tests/test_backtest_runner.py +++ b/tests/test_backtest_runner.py @@ -3427,6 +3427,28 @@ def _make_source(self, col): assert fetch_counts["reference"] == 1 +def test_data_integrity_reference_collection_preserves_exchange_for_reference_source(): + collection = CollectionConfig( + name="integration_crypto", + source="ccxt", + symbols=["BTC/USDT"], + reference_source="ccxt", + exchange="binance", + currency=None, + quote="USDT", + fees=0.0, + slippage=0.0, + validation=None, + ) + + reference = BacktestRunner._data_integrity_audit_reference_collection(collection) + + assert reference is not None + assert reference.source == "ccxt" + assert reference.exchange == "binance" + assert reference.reference_source is None + + def test_transaction_cost_robustness_result_attaches_meta_without_cache_pollution( tmp_path, monkeypatch ): From 0e49ab49907b88741b45c15b344108e485b36f59 Mon Sep 17 00:00:00 2001 From: AlexanderPietsch Date: Mon, 20 Apr 2026 10:04:29 +0700 Subject: [PATCH 10/29] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: AlexanderPietsch --- src/backtest/runner.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/backtest/runner.py 
b/src/backtest/runner.py index 25ccde44..f8055ef8 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4021,14 +4021,17 @@ def _run_data_integrity_audit_validation( @staticmethod def _data_integrity_audit_cache_key( context: ValidationContext, - _policy: ResultConsistencyDataIntegrityAuditConfig, - ) -> tuple[str, str, str, str, str]: + policy: ResultConsistencyDataIntegrityAuditConfig, + ) -> tuple[str, str, str, str, str, str, str, str]: return ( context.job.collection.name, context.job.symbol, context.job.timeframe, str(context.job.collection.source), str(context.job.collection.reference_source), + repr(policy.min_overlap_ratio), + repr(policy.max_median_ohlc_diff_bps), + repr(policy.max_p95_ohlc_diff_bps), ) @staticmethod From 6a094fcce5faf5b114b71ebddeb4287f47a1daf2 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 11:06:03 +0800 Subject: [PATCH 11/29] fix: normalize audit thresholds in indeterminate metadata --- src/backtest/runner.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index eca9f1b2..c6b71af9 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4036,15 +4036,16 @@ def _data_integrity_audit_indeterminate( policy: ResultConsistencyDataIntegrityAuditConfig, details: dict[str, Any] | None = None, ) -> tuple[str, dict[str, Any]]: + thresholds = BacktestRunner._data_integrity_threshold_details(policy) meta: dict[str, Any] = { "is_complete": False, "status": "indeterminate", "reason": reason, "source": collection.source, "reference_source": collection.reference_source, - "min_overlap_ratio": policy.min_overlap_ratio, - "max_median_ohlc_diff_bps": policy.max_median_ohlc_diff_bps, - "max_p95_ohlc_diff_bps": policy.max_p95_ohlc_diff_bps, + "min_overlap_ratio": thresholds["min_overlap_ratio"], + "max_median_ohlc_diff_bps": thresholds["max_median_ohlc_diff_bps"], + "max_p95_ohlc_diff_bps": thresholds["max_p95_ohlc_diff_bps"], } if 
details: meta.update(details) From a696bf0532ba5f9597605a51ea430599f2bd67f9 Mon Sep 17 00:00:00 2001 From: AlexanderPietsch Date: Mon, 20 Apr 2026 10:22:27 +0700 Subject: [PATCH 12/29] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: AlexanderPietsch --- src/backtest/runner.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index cb2df111..be3da7d8 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4048,10 +4048,8 @@ def _data_integrity_audit_indeterminate( "status": "indeterminate", "reason": reason, "source": collection.source, - "reference_source": collection.reference_source, - "min_overlap_ratio": thresholds["min_overlap_ratio"], - "max_median_ohlc_diff_bps": thresholds["max_median_ohlc_diff_bps"], - "max_p95_ohlc_diff_bps": thresholds["max_p95_ohlc_diff_bps"], + } + meta.update(Runner._data_integrity_threshold_details(policy)) } if details: meta.update(details) From 808d64859b0397af5f4aea6af322830f686bfaf1 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 11:25:34 +0800 Subject: [PATCH 13/29] fix: fix the previously bot commit --- src/backtest/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index be3da7d8..9c239c3b 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4049,8 +4049,8 @@ def _data_integrity_audit_indeterminate( "reason": reason, "source": collection.source, } - meta.update(Runner._data_integrity_threshold_details(policy)) - } + meta.update(BacktestRunner._data_integrity_threshold_details(policy)) + if details: meta.update(details) return f"data_integrity_audit_indeterminate(reason={reason})", meta From 165af243d52bf4f6fa7fee3e9081d11764f7cc49 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 11:25:50 +0800 Subject: [PATCH 
14/29] fix: normalize data integrity audit cache key thresholds --- src/backtest/runner.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 9c239c3b..719e7928 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -285,7 +285,7 @@ def __init__( self._runtime_signal_error_capped: set[tuple[str, str, str, str]] = set() self._strategy_fingerprint_cache: dict[type[BaseStrategy], str] = {} self._data_integrity_audit_cache: dict[ - tuple[str, str, str, str, str], + tuple[str, str, str, str, str, float, float, float], tuple[str | None, dict[str, Any]], ] = {} self.validation_metadata: dict[str, Any] = {} @@ -4022,16 +4022,17 @@ def _run_data_integrity_audit_validation( def _data_integrity_audit_cache_key( context: ValidationContext, policy: ResultConsistencyDataIntegrityAuditConfig, - ) -> tuple[str, str, str, str, str, str, str, str]: + ) -> tuple[str, str, str, str, str, float, float, float]: + thresholds = BacktestRunner._data_integrity_threshold_details(policy) return ( context.job.collection.name, context.job.symbol, context.job.timeframe, str(context.job.collection.source), str(context.job.collection.reference_source), - repr(policy.min_overlap_ratio), - repr(policy.max_median_ohlc_diff_bps), - repr(policy.max_p95_ohlc_diff_bps), + thresholds["min_overlap_ratio"], + thresholds["max_median_ohlc_diff_bps"], + thresholds["max_p95_ohlc_diff_bps"], ) @staticmethod From 413826c4755b3bc48abdc2820631ae9fed794382 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 11:28:31 +0800 Subject: [PATCH 15/29] docs: clarify data integrity audit cache key semantics --- src/backtest/runner.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 719e7928..bb7330b0 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -284,6 +284,9 @@ def __init__( self._runtime_signal_error_counts: 
dict[tuple[str, str, str, str], int] = {} self._runtime_signal_error_capped: set[tuple[str, str, str, str]] = set() self._strategy_fingerprint_cache: dict[type[BaseStrategy], str] = {} + # Cache data-integrity audit outcomes per job/source pair and effective + # threshold values. This preserves reuse across strategies while + # preventing stale hits when audit thresholds differ. self._data_integrity_audit_cache: dict[ tuple[str, str, str, str, str, float, float, float], tuple[str | None, dict[str, Any]], @@ -4023,6 +4026,7 @@ def _data_integrity_audit_cache_key( context: ValidationContext, policy: ResultConsistencyDataIntegrityAuditConfig, ) -> tuple[str, str, str, str, str, float, float, float]: + """Return cache key: job/source identity + normalized audit thresholds.""" thresholds = BacktestRunner._data_integrity_threshold_details(policy) return ( context.job.collection.name, @@ -4043,7 +4047,6 @@ def _data_integrity_audit_indeterminate( policy: ResultConsistencyDataIntegrityAuditConfig, details: dict[str, Any] | None = None, ) -> tuple[str, dict[str, Any]]: - thresholds = BacktestRunner._data_integrity_threshold_details(policy) meta: dict[str, Any] = { "is_complete": False, "status": "indeterminate", From a728241703d8f0162fc4c9fc411762cf45a4ca14 Mon Sep 17 00:00:00 2001 From: AlexanderPietsch Date: Mon, 20 Apr 2026 10:38:53 +0700 Subject: [PATCH 16/29] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: AlexanderPietsch --- src/backtest/runner.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index bb7330b0..eb31727b 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4160,9 +4160,14 @@ def _load_reference_frame_for_data_integrity( collection=context.job.collection, policy=policy, ) - _, _, _, _, _, _, _, _, _, _, calendar_timezone = self._load_data_quality_policy( - 
context.job.collection - ) + only_cached = bool(getattr(context, "only_cached", getattr(self, "only_cached", False))) + try: + reference_source = self._make_source(reference_collection) + reference_raw_df = reference_source.fetch( + context.job.symbol, + context.job.timeframe, + only_cached=only_cached, + ) try: reference_source = self._make_source(reference_collection) reference_raw_df = reference_source.fetch(context.job.symbol, context.job.timeframe, only_cached=False) From 91a84ee85fb64fe2411f0054269245cbfff76ffa Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 11:43:34 +0800 Subject: [PATCH 17/29] chore: extra lines --- tests/test_backtest_runner.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_backtest_runner.py b/tests/test_backtest_runner.py index 85cf9d89..8917dc2c 100644 --- a/tests/test_backtest_runner.py +++ b/tests/test_backtest_runner.py @@ -3451,6 +3451,8 @@ def test_data_integrity_reference_collection_preserves_exchange_for_reference_so assert reference.source == "ccxt" assert reference.exchange == "binance" assert reference.reference_source is None + + def test_data_integrity_threshold_details_uses_defaults_for_none_values(): policy = ResultConsistencyDataIntegrityAuditConfig( min_overlap_ratio=None, From e887eb11ebb6fe4f393a280ea4a006bfa9c67ab0 Mon Sep 17 00:00:00 2001 From: AlexanderPietsch Date: Mon, 20 Apr 2026 10:53:39 +0700 Subject: [PATCH 18/29] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: AlexanderPietsch --- src/backtest/runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index eb31727b..31311510 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4054,7 +4054,6 @@ def _data_integrity_audit_indeterminate( "source": collection.source, } meta.update(BacktestRunner._data_integrity_threshold_details(policy)) - if details: 
meta.update(details) return f"data_integrity_audit_indeterminate(reason={reason})", meta From 3260342689174aba0ee8f10074c8a74cf9ed3591 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 11:56:42 +0800 Subject: [PATCH 19/29] fix: honor cache-only mode in reference audit fetch --- src/backtest/runner.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 31311510..357f9283 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4159,7 +4159,10 @@ def _load_reference_frame_for_data_integrity( collection=context.job.collection, policy=policy, ) - only_cached = bool(getattr(context, "only_cached", getattr(self, "only_cached", False))) + only_cached = bool(getattr(context, "only_cached", self._run_only_cached)) + _, _, _, _, _, _, _, _, _, _, calendar_timezone = self._load_data_quality_policy( + context.job.collection + ) try: reference_source = self._make_source(reference_collection) reference_raw_df = reference_source.fetch( @@ -4167,9 +4170,6 @@ def _load_reference_frame_for_data_integrity( context.job.timeframe, only_cached=only_cached, ) - try: - reference_source = self._make_source(reference_collection) - reference_raw_df = reference_source.fetch(context.job.symbol, context.job.timeframe, only_cached=False) reference_df, reference_canonicalization = self._canonicalize_validation_frame( reference_raw_df, calendar_timezone=calendar_timezone, From 883c8baf531d5d5f7a1ee498d2b15c94914b5443 Mon Sep 17 00:00:00 2001 From: AlexanderPietsch Date: Mon, 20 Apr 2026 11:05:12 +0700 Subject: [PATCH 20/29] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: AlexanderPietsch --- src/backtest/runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 357f9283..355e8d4d 100644 --- a/src/backtest/runner.py +++ 
b/src/backtest/runner.py @@ -4052,6 +4052,7 @@ def _data_integrity_audit_indeterminate( "status": "indeterminate", "reason": reason, "source": collection.source, + "reference_source": collection.reference_source, } meta.update(BacktestRunner._data_integrity_threshold_details(policy)) if details: From 110644c758974b63fda4d34766e40fd95297f638 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 12:07:30 +0800 Subject: [PATCH 21/29] fix: align integrity gate activation with resolved policy --- src/backtest/runner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 355e8d4d..8045fdc4 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -646,8 +646,9 @@ def _active_result_consistency_gates( active.add("result_consistency.execution_price_variance") if getattr(result_consistency, "lookahead_shuffle_test", None) is not None: active.add("result_consistency.lookahead_shuffle_test") - # Data integrity audit activation is collection-scoped via reference_source. - if has_reference_source: + # Data integrity audit activation is collection-scoped via reference_source + # and requires a resolved policy to keep gate reporting aligned with execution. 
+ if has_reference_source and getattr(result_consistency, "data_integrity_audit", None) is not None: active.add("result_consistency.data_integrity_audit") if getattr(result_consistency, "transaction_cost_robustness", None) is not None: active.add("result_consistency.transaction_cost_robustness") return active From 5ea496ff1326f28c94bd974d2c73b3481187f6ec Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 12:11:37 +0800 Subject: [PATCH 22/29] fix: reintroduce _run_only_cached flag that got lost during merges --- src/backtest/runner.py | 4 ++- tests/test_backtest_runner.py | 47 +++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 8045fdc4..5552b592 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -295,6 +295,7 @@ def __init__( self.active_validation_gates: list[str] = [] self.inactive_validation_gates: list[str] = [] self._evaluator: BacktestEvaluator | None = None + self._run_only_cached = False def _ensure_pybroker(self) -> tuple[Any, ...]: if self._pybroker_components is None: @@ -4161,7 +4162,7 @@ def _load_reference_frame_for_data_integrity( collection=context.job.collection, policy=policy, ) - only_cached = bool(getattr(context, "only_cached", self._run_only_cached)) + only_cached = bool(getattr(context, "only_cached", getattr(self, "_run_only_cached", False))) _, _, _, _, _, _, _, _, _, _, calendar_timezone = self._load_data_quality_policy( context.job.collection ) @@ -4509,6 +4510,7 @@ def run_all(self, only_cached: bool = False) -> list[BestResult]: self._runtime_signal_error_counts = {} self._runtime_signal_error_capped = set() self._data_integrity_audit_cache = {} + self._run_only_cached = only_cached self._evaluator = None self._strategy_overrides = ( {s.name: s.params for s in self.cfg.strategies} if self.cfg.strategies else {} ) diff --git a/tests/test_backtest_runner.py b/tests/test_backtest_runner.py index 8917dc2c..6803aa97 100644 ---
a/tests/test_backtest_runner.py +++ b/tests/test_backtest_runner.py @@ -3386,6 +3386,53 @@ def test_run_all_data_integrity_audit_rejects_on_low_overlap(tmp_path, monkeypat assert "overlap_ratio_below_threshold" in failure["error"] +def test_run_all_data_integrity_audit_indeterminate_rejects_and_attaches_meta( + tmp_path, monkeypatch +): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + primary = _make_trending_ohlcv(30) + + class _Source: + def __init__(self, df: pd.DataFrame | None, *, should_fail: bool): + self._df = df + self._should_fail = should_fail + + def fetch(self, symbol, timeframe, only_cached=False): + if self._should_fail: + raise RuntimeError("reference fetch boom") + assert self._df is not None + return self._df.copy() + + def _make_source(self, col): + if col.source == "alphavantage": + return _Source(None, should_fail=True) + return _Source(primary, should_fail=False) + + captured_meta: dict[str, object] = {} + original_attach = runner._attach_post_run_meta + + def _capture_attach(self, outcome, key, meta): + if key == "data_integrity_audit": + captured_meta.update(meta) + return original_attach(outcome, key, meta) + + monkeypatch.setattr(BacktestRunner, "_make_source", _make_source) + monkeypatch.setattr(runner, "_attach_post_run_meta", MethodType(_capture_attach, runner)) + _patch_pybroker_simulation(monkeypatch) + + results = runner.run_all() + + assert results == [] + assert len(runner.failures) == 1 + failure = runner.failures[0] + assert failure["stage"] == "strategy_validation" + assert "data_integrity_audit_indeterminate(reason=reference_fetch_failed)" in failure["error"] + assert captured_meta["status"] == "indeterminate" + assert captured_meta["is_complete"] is False + assert captured_meta["reason"] == "reference_fetch_failed" + + def test_run_all_data_integrity_audit_reuses_job_level_cache_across_strategies(tmp_path, monkeypatch): runner = 
_make_runner(tmp_path, monkeypatch, patch_source=False) runner.cfg.collections[0].reference_source = "alphavantage" From 1650b09a0a963d0d76d309f36baa81a991494b79 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 12:15:18 +0800 Subject: [PATCH 23/29] docs: clarify min_overlap_ratio overlap definition --- README.md | 2 +- config/example.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 196a48d5..7a3cce3f 100644 --- a/README.md +++ b/README.md @@ -257,7 +257,7 @@ See new collection examples under `config/collections/` for FX intraday via Finn - `data_integrity_audit` (optional thresholds module; gate is active when `collections[].reference_source` is set): - purpose: compare canonicalized bars from the primary `source` and a secondary `reference_source` to catch bad prints / ghost bars before accepting strategy results - - `min_overlap_ratio` (optional, default `0.99`, `0..1`): minimum timestamp overlap required between sources + - `min_overlap_ratio` (optional, default `0.99`, `0..1`): minimum fraction of primary-source bars that must have matching reference-source timestamps (`overlap_bars / primary_bars`) - `max_median_ohlc_diff_bps` (optional, default `5.0`, `>=0`): maximum allowed median OHLC drift (bps) - `max_p95_ohlc_diff_bps` (optional, default `20.0`, `>=0`): maximum allowed p95 OHLC drift (bps) - action: fixed to `reject_result` when overlap/drift thresholds are breached (or comparison is indeterminate) diff --git a/config/example.yaml b/config/example.yaml index 66418cda..e23b0811 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -55,7 +55,7 @@ validation: min_metric: 0.5 # fail fast: require at least this metric before expensive checks min_trades: 20 # fail fast: require at least this many closed trades data_integrity_audit: - min_overlap_ratio: 0.99 # min shared timestamps between source and reference_source + min_overlap_ratio: 0.99 # min fraction of primary bars 
covered by reference timestamps (overlap_bars / primary_bars) max_median_ohlc_diff_bps: 5.0 # median OHLC drift tolerance (bps) max_p95_ohlc_diff_bps: 20.0 # tail OHLC drift tolerance (bps) outlier_dependency: From e94964c282de2378da759f1f319db940c9e21abe Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 12:20:03 +0800 Subject: [PATCH 24/29] fix: sanitize non-finite audit divergence metadata --- src/backtest/runner.py | 14 +++++++++++++- tests/test_backtest_runner.py | 16 ++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 5552b592..95a10e2e 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4126,6 +4126,9 @@ def _data_integrity_audit_result( return invalid_input overlap_details = self._data_integrity_overlap_details(primary_df, reference_df) divergence = overlap_details["divergence"] + # Keep raw divergence values for gate checks; sanitize only metadata payload + # so persisted/report JSON remains strict and downstream-safe. 
+ divergence_meta = self._sanitize_non_finite_metrics_for_json(divergence) threshold_details = self._data_integrity_threshold_details(policy) failed_checks = self._data_integrity_failed_checks(overlap_details, divergence, threshold_details) meta: dict[str, Any] = { @@ -4142,7 +4145,7 @@ def _data_integrity_audit_result( "max_median_ohlc_diff_bps": threshold_details["max_median_ohlc_diff_bps"], "max_p95_ohlc_diff_bps": threshold_details["max_p95_ohlc_diff_bps"], "reference_canonicalization": reference_canonicalization, - **divergence, + **divergence_meta, "failed_checks": list(failed_checks), } if failed_checks: @@ -4242,6 +4245,15 @@ def _data_integrity_overlap_details( "divergence": divergence, } + @staticmethod + def _sanitize_non_finite_metrics_for_json(values: dict[str, float]) -> dict[str, float | None]: + """Convert non-finite float metrics to JSON-safe nulls.""" + sanitized: dict[str, float | None] = {} + for key, value in values.items(): + parsed = float(value) + sanitized[key] = parsed if np.isfinite(parsed) else None + return sanitized + @staticmethod def _data_integrity_threshold_details( policy: ResultConsistencyDataIntegrityAuditConfig, diff --git a/tests/test_backtest_runner.py b/tests/test_backtest_runner.py index 6803aa97..829298a9 100644 --- a/tests/test_backtest_runner.py +++ b/tests/test_backtest_runner.py @@ -3516,6 +3516,22 @@ def test_data_integrity_threshold_details_uses_defaults_for_none_values(): } +def test_data_integrity_sanitize_non_finite_metrics_for_json_converts_to_none(): + values = { + "median_ohlc_diff_bps": float("nan"), + "p95_ohlc_diff_bps": float("inf"), + "max_ohlc_diff_bps": 1.25, + } + + sanitized = BacktestRunner._sanitize_non_finite_metrics_for_json(values) + + assert sanitized == { + "median_ohlc_diff_bps": None, + "p95_ohlc_diff_bps": None, + "max_ohlc_diff_bps": 1.25, + } + + def test_transaction_cost_robustness_result_attaches_meta_without_cache_pollution( tmp_path, monkeypatch ): From 
36f0dac45d743048c3d77e603fa6fdb4e0f87c7a Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 12:36:33 +0800 Subject: [PATCH 25/29] fix: reject audit on non-finite divergence metrics --- src/backtest/runner.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 95a10e2e..e9f9b376 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4129,6 +4129,25 @@ def _data_integrity_audit_result( # Keep raw divergence values for gate checks; sanitize only metadata payload # so persisted/report JSON remains strict and downstream-safe. divergence_meta = self._sanitize_non_finite_metrics_for_json(divergence) + non_finite_divergence = [ + name for name, value in divergence.items() if not np.isfinite(float(value)) + ] + if non_finite_divergence: + return self._data_integrity_audit_indeterminate( + "non_finite_divergence_metrics", + collection=context.job.collection, + policy=policy, + details={ + "non_finite_metrics": sorted(non_finite_divergence), + "primary_bars": overlap_details["primary_bars"], + "reference_bars": overlap_details["reference_bars"], + "overlap_bars": overlap_details["overlap_bars"], + "overlap_ratio": overlap_details["overlap_ratio"], + "missing_primary_bar_pct": overlap_details["missing_primary_bar_pct"], + "reference_canonicalization": reference_canonicalization, + **divergence_meta, + }, + ) threshold_details = self._data_integrity_threshold_details(policy) failed_checks = self._data_integrity_failed_checks(overlap_details, divergence, threshold_details) meta: dict[str, Any] = { From ba7123371d53e56a273c3f3250968c4deb9b44a1 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 12:39:34 +0800 Subject: [PATCH 26/29] test: cover non-finite divergence indeterminate audit path --- tests/test_backtest_runner.py | 38 +++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/test_backtest_runner.py 
b/tests/test_backtest_runner.py index 829298a9..8fe6f33c 100644 --- a/tests/test_backtest_runner.py +++ b/tests/test_backtest_runner.py @@ -3433,6 +3433,44 @@ def _capture_attach(self, outcome, key, meta): assert captured_meta["reason"] == "reference_fetch_failed" +def test_run_all_data_integrity_audit_indeterminate_on_non_finite_divergence( + tmp_path, monkeypatch +): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + primary = _make_trending_ohlcv(30) + reference = primary.copy() + primary[["Open", "High", "Low", "Close"]] = np.nan + reference[["Open", "High", "Low", "Close"]] = np.nan + _patch_primary_and_reference_sources( + monkeypatch, + primary_df=primary, + reference_df=reference, + reference_source="alphavantage", + ) + captured_meta: dict[str, object] = {} + original_attach = runner._attach_post_run_meta + + def _capture_attach(self, outcome, key, meta): + if key == "data_integrity_audit": + captured_meta.update(meta) + return original_attach(outcome, key, meta) + + monkeypatch.setattr(runner, "_attach_post_run_meta", MethodType(_capture_attach, runner)) + _patch_pybroker_simulation(monkeypatch) + + results = runner.run_all() + + assert results == [] + assert len(runner.failures) == 1 + failure = runner.failures[0] + assert failure["stage"] == "strategy_validation" + assert "data_integrity_audit_indeterminate(reason=non_finite_divergence_metrics)" in failure["error"] + assert captured_meta["status"] == "indeterminate" + assert captured_meta["is_complete"] is False + assert captured_meta["reason"] == "non_finite_divergence_metrics" + + def test_run_all_data_integrity_audit_reuses_job_level_cache_across_strategies(tmp_path, monkeypatch): runner = _make_runner(tmp_path, monkeypatch, patch_source=False) runner.cfg.collections[0].reference_source = "alphavantage" From 21fb4705703e5d243234553c3f9fa31dc0f786e6 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 
12:42:29 +0800 Subject: [PATCH 27/29] fix: avoid all-nan warnings in audit divergence metrics --- src/backtest/runner.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index e9f9b376..8396571b 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -4098,10 +4098,17 @@ def _data_integrity_ohlc_diff_metrics( "p95_ohlc_diff_bps": float("nan"), "max_ohlc_diff_bps": float("nan"), } + finite_diffs = all_diffs[np.isfinite(all_diffs)] + if finite_diffs.size == 0: + return { + "median_ohlc_diff_bps": float("nan"), + "p95_ohlc_diff_bps": float("nan"), + "max_ohlc_diff_bps": float("nan"), + } return { - "median_ohlc_diff_bps": float(np.nanmedian(all_diffs)), - "p95_ohlc_diff_bps": float(np.nanpercentile(all_diffs, 95)), - "max_ohlc_diff_bps": float(np.nanmax(all_diffs)), + "median_ohlc_diff_bps": float(np.median(finite_diffs)), + "p95_ohlc_diff_bps": float(np.percentile(finite_diffs, 95)), + "max_ohlc_diff_bps": float(np.max(finite_diffs)), } def _data_integrity_audit_result( From 858a4954af721271b7bad0b3694b380f43739560 Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 12:44:06 +0800 Subject: [PATCH 28/29] fix: adjust gate order flow for performance --- src/backtest/runner.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 8396571b..2077bb62 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -3959,12 +3959,12 @@ def _strategy_validate_results_common(self, context: ValidationContext) -> GateD reasons = self._collect_strategy_validation_reasons(context, outcome) if reasons: - # Fail fast on cheap result-consistency gates before expensive shuffle checks. + # Fail fast on result-consistency gates before expensive shuffle checks. 
return self._strategy_validation_reject_or_continue(reasons) - self._run_lookahead_shuffle_validation(context, plan, outcome, reasons) + self._run_data_integrity_audit_validation(context, outcome, reasons) if reasons: return self._strategy_validation_reject_or_continue(reasons) - self._run_data_integrity_audit_validation(context, outcome, reasons) + self._run_lookahead_shuffle_validation(context, plan, outcome, reasons) if reasons: return self._strategy_validation_reject_or_continue(reasons) self._run_transaction_cost_robustness_validation(context, plan, outcome, reasons) From e9e1075bf3b8d3a3861a0ff7dfe68133db63a60a Mon Sep 17 00:00:00 2001 From: Alexander Pietsch Date: Mon, 20 Apr 2026 12:54:38 +0800 Subject: [PATCH 29/29] feat: support reference_exchange in data integrity audit --- README.md | 8 ++++++++ config/example.yaml | 8 ++++++-- src/backtest/runner.py | 19 ++++++++++++------- src/config.py | 6 +++++- tests/test_backtest_runner.py | 23 +++++++++++++++++++++++ tests/test_config.py | 2 ++ 6 files changed, 56 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 7a3cce3f..0dedc8c6 100644 --- a/README.md +++ b/README.md @@ -257,6 +257,14 @@ See new collection examples under `config/collections/` for FX intraday via Finn - `data_integrity_audit` (optional thresholds module; gate is active when `collections[].reference_source` is set): - purpose: compare canonicalized bars from the primary `source` and a secondary `reference_source` to catch bad prints / ghost bars before accepting strategy results + - source routing: + - primary fetch uses `collections[].source` (+ `collections[].exchange` for ccxt) + - reference fetch uses `collections[].reference_source` + - for ccxt-vs-ccxt venue comparisons, set: + - `reference_source: ccxt` + - `reference_exchange: ` + - when `reference_source` is ccxt and `reference_exchange` is unset, the runner falls back to + `collections[].exchange` - `min_overlap_ratio` (optional, default `0.99`, `0..1`): minimum 
fraction of primary-source bars that must have matching reference-source timestamps (`overlap_bars / primary_bars`) - `max_median_ohlc_diff_bps` (optional, default `5.0`, `>=0`): maximum allowed median OHLC drift (bps) - `max_p95_ohlc_diff_bps` (optional, default `20.0`, `>=0`): maximum allowed p95 OHLC drift (bps) diff --git a/config/example.yaml b/config/example.yaml index e23b0811..f47184f9 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -105,9 +105,13 @@ collections: # Crypto (Binance via ccxt) - name: crypto - source: binance - reference_source: bybit # optional golden source; activates data_integrity_audit defaults if unset + source: ccxt + # For ccxt collections, `exchange` selects the primary venue adapter. + # Set `reference_source: ccxt` + `reference_exchange` to compare venues + # in data_integrity_audit (for example Binance vs Bybit). + reference_source: ccxt exchange: binance + reference_exchange: bybit quote: USDT symbols: ["BTC/USDT", "ETH/USDT", "BNB/USDT", "SOL/USDT"] fees: 0.0006 # approx Bybit/Binance taker diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 2077bb62..1ef2abc1 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -284,11 +284,11 @@ def __init__( self._runtime_signal_error_counts: dict[tuple[str, str, str, str], int] = {} self._runtime_signal_error_capped: set[tuple[str, str, str, str]] = set() self._strategy_fingerprint_cache: dict[type[BaseStrategy], str] = {} - # Cache data-integrity audit outcomes per job/source pair and effective - # threshold values. This preserves reuse across strategies while - # preventing stale hits when audit thresholds differ. + # Cache data-integrity audit outcomes per job/source+exchange identity + # and effective threshold values. This preserves reuse across strategies + # while preventing stale hits when source routing or thresholds differ. 
self._data_integrity_audit_cache: dict[ - tuple[str, str, str, str, str, float, float, float], + tuple[str, str, str, str, str, str, str, float, float, float], tuple[str | None, dict[str, Any]], ] = {} self.validation_metadata: dict[str, Any] = {} @@ -4027,8 +4027,8 @@ def _run_data_integrity_audit_validation( def _data_integrity_audit_cache_key( context: ValidationContext, policy: ResultConsistencyDataIntegrityAuditConfig, - ) -> tuple[str, str, str, str, str, float, float, float]: - """Return cache key: job/source identity + normalized audit thresholds.""" + ) -> tuple[str, str, str, str, str, str, str, float, float, float]: + """Return cache key: job/source+exchange identity + normalized thresholds.""" thresholds = BacktestRunner._data_integrity_threshold_details(policy) return ( context.job.collection.name, @@ -4036,6 +4036,8 @@ def _data_integrity_audit_cache_key( context.job.timeframe, str(context.job.collection.source), str(context.job.collection.reference_source), + str(context.job.collection.exchange), + str(context.job.collection.reference_exchange), thresholds["min_overlap_ratio"], thresholds["max_median_ohlc_diff_bps"], thresholds["max_p95_ohlc_diff_bps"], @@ -4055,6 +4057,8 @@ def _data_integrity_audit_indeterminate( "reason": reason, "source": collection.source, "reference_source": collection.reference_source, + "exchange": collection.exchange, + "reference_exchange": collection.reference_exchange, } meta.update(BacktestRunner._data_integrity_threshold_details(policy)) if details: @@ -4070,7 +4074,8 @@ def _data_integrity_audit_reference_collection(collection: CollectionConfig) -> source=collection.reference_source, symbols=list(collection.symbols), reference_source=None, - exchange=collection.exchange, + exchange=collection.reference_exchange or collection.exchange, + reference_exchange=None, currency=collection.currency, quote=collection.quote, fees=collection.fees, diff --git a/src/config.py b/src/config.py index c43819e2..be2d6391 100644 --- 
a/src/config.py +++ b/src/config.py @@ -20,10 +20,11 @@ class StrategyConfig: @dataclass class CollectionConfig: name: str - source: str # yfinance, ccxt, custom + source: str # yfinance, ccxt, custom (also supports ccxt exchange aliases like binance/bybit) symbols: list[str] reference_source: str | None = None exchange: str | None = None # for ccxt + reference_exchange: str | None = None # for ccxt reference_source currency: str | None = None quote: str | None = None # for ccxt symbols e.g., USDT fees: float | None = None @@ -2521,6 +2522,9 @@ def _parse_collections(raw_collections: Any) -> list[CollectionConfig]: collection_raw, "reference_source", normalize=False ), exchange=parse_optional_str(collection_raw, "exchange", normalize=False), + reference_exchange=parse_optional_str( + collection_raw, "reference_exchange", normalize=False + ), currency=parse_optional_str(collection_raw, "currency", normalize=False), quote=parse_optional_str(collection_raw, "quote", normalize=False), fees=( diff --git a/tests/test_backtest_runner.py b/tests/test_backtest_runner.py index 8fe6f33c..1be31b36 100644 --- a/tests/test_backtest_runner.py +++ b/tests/test_backtest_runner.py @@ -3538,6 +3538,29 @@ def test_data_integrity_reference_collection_preserves_exchange_for_reference_so assert reference.reference_source is None +def test_data_integrity_reference_collection_uses_reference_exchange_override(): + collection = CollectionConfig( + name="integration_crypto", + source="ccxt", + symbols=["BTC/USDT"], + reference_source="ccxt", + exchange="binance", + reference_exchange="bybit", + currency=None, + quote="USDT", + fees=0.0, + slippage=0.0, + validation=None, + ) + + reference = BacktestRunner._data_integrity_audit_reference_collection(collection) + + assert reference is not None + assert reference.source == "ccxt" + assert reference.exchange == "bybit" + assert reference.reference_source is None + + def test_data_integrity_threshold_details_uses_defaults_for_none_values(): policy 
= ResultConsistencyDataIntegrityAuditConfig( min_overlap_ratio=None, diff --git a/tests/test_config.py b/tests/test_config.py index a1bb0ef9..63ba8527 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -406,6 +406,7 @@ def test_load_config_reference_source_enables_data_integrity_audit_defaults(tmp_ - name: test source: yfinance reference_source: alphavantage + reference_exchange: bybit symbols: ['AAPL'] timeframes: ['1d'] metric: sharpe @@ -415,6 +416,7 @@ def test_load_config_reference_source_enables_data_integrity_audit_defaults(tmp_ cfg = load_config(path) assert cfg.collections[0].reference_source == "alphavantage" + assert cfg.collections[0].reference_exchange == "bybit" assert cfg.collections[0].validation is not None assert cfg.collections[0].validation.result_consistency is not None audit = cfg.collections[0].validation.result_consistency.data_integrity_audit