diff --git a/README.md b/README.md index 38fbd030..0dedc8c6 100644 --- a/README.md +++ b/README.md @@ -254,6 +254,22 @@ See new collection examples under `config/collections/` for FX intraday via Finn - collection-level overrides are supported via `collections[].validation.optimization` and are resolved against global `validation.optimization` during config loading. - `validation.result_consistency` controls strategy-result concentration checks: + - `data_integrity_audit` (optional thresholds module; gate is active when `collections[].reference_source` is set): + - purpose: compare canonicalized bars from the primary `source` and a secondary `reference_source` + to catch bad prints / ghost bars before accepting strategy results + - source routing: + - primary fetch uses `collections[].source` (+ `collections[].exchange` for ccxt) + - reference fetch uses `collections[].reference_source` + - for ccxt-vs-ccxt venue comparisons, set: + - `reference_source: ccxt` + - `reference_exchange: <exchange-id>` (for example `bybit`) + - when `reference_source` is ccxt and `reference_exchange` is unset, the runner falls back to + `collections[].exchange` + - `min_overlap_ratio` (optional, default `0.99`, `0..1`): minimum fraction of primary-source bars that must have matching reference-source timestamps (`overlap_bars / primary_bars`) + - `max_median_ohlc_diff_bps` (optional, default `5.0`, `>=0`): maximum allowed median OHLC drift (bps) + - `max_p95_ohlc_diff_bps` (optional, default `20.0`, `>=0`): maximum allowed p95 OHLC drift (bps) + - action: fixed to `reject_result` when overlap/drift thresholds are breached (or comparison is indeterminate) + - diagnostics are attached under `post_run_meta.data_integrity_audit` - `outlier_dependency` (optional module; active when configured): - `slices` (required, `>=2`): number of equal time-slices used for diagnostics - `profit_share_threshold` (required, `0..1`) @@ -297,7 +313,7 @@ Structured logs reflect this directly via gate actions: - `data_validation_gate` can emit 
`skip_optimization` (job-level optimization disable). - `strategy_optimization_gate` can emit `baseline_only` (strategy-level baseline fallback) or `skip_job`. - `strategy_validation_gate` can emit `reject_result` for outlier dependency, - execution price variance, and lookahead shuffle testing. + execution price variance, lookahead shuffle testing, data integrity audit, and transaction-cost robustness. Numeric config parsing follows `src/config.py` coercion helpers: - numeric fields are strict types: use YAML numbers, not quoted numeric strings diff --git a/config/example.yaml b/config/example.yaml index f721821f..f47184f9 100644 --- a/config/example.yaml +++ b/config/example.yaml @@ -54,6 +54,10 @@ validation: result_consistency: min_metric: 0.5 # fail fast: require at least this metric before expensive checks min_trades: 20 # fail fast: require at least this many closed trades + data_integrity_audit: + min_overlap_ratio: 0.99 # min fraction of primary bars covered by reference timestamps (overlap_bars / primary_bars) + max_median_ohlc_diff_bps: 5.0 # median OHLC drift tolerance (bps) + max_p95_ohlc_diff_bps: 20.0 # tail OHLC drift tolerance (bps) outlier_dependency: slices: 5 # split trade history into N equal time-slices for diagnostics profit_share_threshold: 0.80 @@ -80,6 +84,7 @@ collections: # Stocks (large-cap growth) - name: stocks_large_cap_growth source: yfinance + reference_source: twelvedata # optional golden source for post-run data-integrity audit symbols: ["CNDX.L", "AAPL", "MSFT", "NVDA"] fees: 0.0005 # approx IBKR slippage: 0.0005 @@ -100,8 +105,13 @@ collections: # Crypto (Binance via ccxt) - name: crypto - source: binance + source: ccxt + # For ccxt collections, `exchange` selects the primary venue adapter. + # Set `reference_source: ccxt` + `reference_exchange` to compare venues + # in data_integrity_audit (for example Binance vs Bybit). 
+ reference_source: ccxt exchange: binance + reference_exchange: bybit quote: USDT symbols: ["BTC/USDT", "ETH/USDT", "BNB/USDT", "SOL/USDT"] fees: 0.0006 # approx Bybit/Binance taker diff --git a/src/backtest/runner.py b/src/backtest/runner.py index 43916ead..1ef2abc1 100644 --- a/src/backtest/runner.py +++ b/src/backtest/runner.py @@ -19,7 +19,11 @@ from ..config import ( CollectionConfig, Config, + DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT, + DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT, + DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT, ResultConsistencyConfig, + ResultConsistencyDataIntegrityAuditConfig, ResultConsistencyExecutionPriceVarianceConfig, ResultConsistencyTransactionCostBreakevenConfig, ResultConsistencyTransactionCostRobustnessConfig, @@ -235,6 +239,7 @@ class BacktestRunner: "result_consistency.outlier_dependency", "result_consistency.execution_price_variance", "result_consistency.lookahead_shuffle_test", + "result_consistency.data_integrity_audit", "result_consistency.transaction_cost_robustness", ) @@ -279,10 +284,18 @@ def __init__( self._runtime_signal_error_counts: dict[tuple[str, str, str, str], int] = {} self._runtime_signal_error_capped: set[tuple[str, str, str, str]] = set() self._strategy_fingerprint_cache: dict[type[BaseStrategy], str] = {} + # Cache data-integrity audit outcomes per job/source+exchange identity + # and effective threshold values. This preserves reuse across strategies + # while preventing stale hits when source routing or thresholds differ. 
+ self._data_integrity_audit_cache: dict[ + tuple[str, str, str, str, str, str, str, float, float, float], + tuple[str | None, dict[str, Any]], + ] = {} self.validation_metadata: dict[str, Any] = {} self.active_validation_gates: list[str] = [] self.inactive_validation_gates: list[str] = [] self._evaluator: BacktestEvaluator | None = None + self._run_only_cached = False def _ensure_pybroker(self) -> tuple[Any, ...]: if self._pybroker_components is None: @@ -451,6 +464,20 @@ def _serialize_lookahead_shuffle_test_profile( ), } + @staticmethod + def _serialize_data_integrity_audit_profile( + data_integrity_audit: Any, + ) -> dict[str, Any] | None: + if data_integrity_audit is None: + return None + return { + "min_overlap_ratio": getattr(data_integrity_audit, "min_overlap_ratio", None), + "max_median_ohlc_diff_bps": getattr( + data_integrity_audit, "max_median_ohlc_diff_bps", None + ), + "max_p95_ohlc_diff_bps": getattr(data_integrity_audit, "max_p95_ohlc_diff_bps", None), + } + @staticmethod def _serialize_transaction_cost_breakeven_profile( breakeven: Any, @@ -561,6 +588,9 @@ def _serialize_result_consistency_profile(result_consistency: Any) -> dict[str, "lookahead_shuffle_test": BacktestRunner._serialize_lookahead_shuffle_test_profile( getattr(result_consistency, "lookahead_shuffle_test", None) ), + "data_integrity_audit": BacktestRunner._serialize_data_integrity_audit_profile( + getattr(result_consistency, "data_integrity_audit", None) + ), "transaction_cost_robustness": BacktestRunner._serialize_transaction_cost_robustness_profile( getattr(result_consistency, "transaction_cost_robustness", None) ), @@ -599,7 +629,11 @@ def _active_optimization_gates(optimization: Any) -> set[str]: return {"optimization.feasibility"} @staticmethod - def _active_result_consistency_gates(result_consistency: Any) -> set[str]: + def _active_result_consistency_gates( + result_consistency: Any, + *, + has_reference_source: bool = False, + ) -> set[str]: if result_consistency is None: 
return set() active: set[str] = set() @@ -613,6 +647,10 @@ def _active_result_consistency_gates(result_consistency: Any) -> set[str]: active.add("result_consistency.execution_price_variance") if getattr(result_consistency, "lookahead_shuffle_test", None) is not None: active.add("result_consistency.lookahead_shuffle_test") + # Data integrity audit activation is collection-scoped via reference_source + # and requires a resolved policy to keep gate reporting aligned with execution. + if has_reference_source and getattr(result_consistency, "data_integrity_audit", None) is not None: + active.add("result_consistency.data_integrity_audit") if getattr(result_consistency, "transaction_cost_robustness", None) is not None: active.add("result_consistency.transaction_cost_robustness") return active @@ -629,7 +667,10 @@ def _build_validation_metadata(self) -> dict[str, Any]: collection_active = self._active_data_quality_gates(collection_dq) collection_active.update(self._active_optimization_gates(collection_optimization)) collection_active.update( - self._active_result_consistency_gates(collection_result_consistency) + self._active_result_consistency_gates( + collection_result_consistency, + has_reference_source=bool(getattr(collection, "reference_source", None)), + ) ) active_gates_union.update(collection_active) collection_profiles.append( @@ -1726,6 +1767,19 @@ def _load_lookahead_shuffle_test_policy( return getattr(resolved_rc, "lookahead_shuffle_test", None) return None + def _load_data_integrity_audit_policy( + self, collection: CollectionConfig + ) -> ResultConsistencyDataIntegrityAuditConfig | None: + if not collection.reference_source: + return None + collection_validation = getattr(collection, "validation", None) + resolved_rc: ResultConsistencyConfig | None = ( + getattr(collection_validation, "result_consistency", None) if collection_validation else None + ) + if resolved_rc is None: + return None + return getattr(resolved_rc, "data_integrity_audit", None) + def 
_load_transaction_cost_robustness_policy( self, collection: CollectionConfig ) -> ResultConsistencyTransactionCostRobustnessConfig | None: @@ -3905,7 +3959,10 @@ def _strategy_validate_results_common(self, context: ValidationContext) -> GateD reasons = self._collect_strategy_validation_reasons(context, outcome) if reasons: - # Fail fast on cheap result-consistency gates before expensive shuffle checks. + # Fail fast on result-consistency gates before expensive shuffle checks. + return self._strategy_validation_reject_or_continue(reasons) + self._run_data_integrity_audit_validation(context, outcome, reasons) + if reasons: return self._strategy_validation_reject_or_continue(reasons) self._run_lookahead_shuffle_validation(context, plan, outcome, reasons) if reasons: @@ -3945,6 +4002,344 @@ def _run_lookahead_shuffle_validation( if lookahead_reason is not None: reasons.append(lookahead_reason) + def _run_data_integrity_audit_validation( + self, + context: ValidationContext, + outcome: StrategyEvalOutcome, + reasons: list[str], + ) -> None: + policy = self._load_data_integrity_audit_policy(context.job.collection) + if policy is None: + return + cache_key = self._data_integrity_audit_cache_key(context, policy) + cached = self._data_integrity_audit_cache.get(cache_key) + if cached is None: + audit_reason, audit_meta = self._data_integrity_audit_result(context, policy) + self._data_integrity_audit_cache[cache_key] = (audit_reason, copy.deepcopy(audit_meta)) + else: + audit_reason, cached_meta = cached + audit_meta = copy.deepcopy(cached_meta) + self._attach_post_run_meta(outcome, "data_integrity_audit", audit_meta) + if audit_reason is not None: + reasons.append(audit_reason) + + @staticmethod + def _data_integrity_audit_cache_key( + context: ValidationContext, + policy: ResultConsistencyDataIntegrityAuditConfig, + ) -> tuple[str, str, str, str, str, str, str, float, float, float]: + """Return cache key: job/source+exchange identity + normalized thresholds.""" + thresholds 
= BacktestRunner._data_integrity_threshold_details(policy) + return ( + context.job.collection.name, + context.job.symbol, + context.job.timeframe, + str(context.job.collection.source), + str(context.job.collection.reference_source), + str(context.job.collection.exchange), + str(context.job.collection.reference_exchange), + thresholds["min_overlap_ratio"], + thresholds["max_median_ohlc_diff_bps"], + thresholds["max_p95_ohlc_diff_bps"], + ) + + @staticmethod + def _data_integrity_audit_indeterminate( + reason: str, + *, + collection: CollectionConfig, + policy: ResultConsistencyDataIntegrityAuditConfig, + details: dict[str, Any] | None = None, + ) -> tuple[str, dict[str, Any]]: + meta: dict[str, Any] = { + "is_complete": False, + "status": "indeterminate", + "reason": reason, + "source": collection.source, + "reference_source": collection.reference_source, + "exchange": collection.exchange, + "reference_exchange": collection.reference_exchange, + } + meta.update(BacktestRunner._data_integrity_threshold_details(policy)) + if details: + meta.update(details) + return f"data_integrity_audit_indeterminate(reason={reason})", meta + + @staticmethod + def _data_integrity_audit_reference_collection(collection: CollectionConfig) -> CollectionConfig | None: + if not collection.reference_source: + return None + return CollectionConfig( + name=collection.name, + source=collection.reference_source, + symbols=list(collection.symbols), + reference_source=None, + exchange=collection.reference_exchange or collection.exchange, + reference_exchange=None, + currency=collection.currency, + quote=collection.quote, + fees=collection.fees, + slippage=collection.slippage, + validation=collection.validation, + ) + + @staticmethod + def _data_integrity_ohlc_diff_metrics( + primary: pd.DataFrame, + reference: pd.DataFrame, + ) -> dict[str, float]: + eps = 1e-12 + columns = ["Open", "High", "Low", "Close"] + diffs: list[np.ndarray] = [] + for column in columns: + lhs = 
primary[column].to_numpy(dtype=float) + rhs = reference[column].to_numpy(dtype=float) + rel = np.abs(lhs - rhs) / np.maximum(np.abs(rhs), eps) + diffs.append(rel * 10000.0) + all_diffs = np.concatenate(diffs) if diffs else np.array([], dtype=float) + if all_diffs.size == 0: + return { + "median_ohlc_diff_bps": float("nan"), + "p95_ohlc_diff_bps": float("nan"), + "max_ohlc_diff_bps": float("nan"), + } + finite_diffs = all_diffs[np.isfinite(all_diffs)] + if finite_diffs.size == 0: + return { + "median_ohlc_diff_bps": float("nan"), + "p95_ohlc_diff_bps": float("nan"), + "max_ohlc_diff_bps": float("nan"), + } + return { + "median_ohlc_diff_bps": float(np.median(finite_diffs)), + "p95_ohlc_diff_bps": float(np.percentile(finite_diffs, 95)), + "max_ohlc_diff_bps": float(np.max(finite_diffs)), + } + + def _data_integrity_audit_result( + self, + context: ValidationContext, + policy: ResultConsistencyDataIntegrityAuditConfig, + ) -> tuple[str | None, dict[str, Any]]: + validated_data = context.validated_data + if validated_data is None: + return self._data_integrity_audit_indeterminate( + "missing_validated_data", + collection=context.job.collection, + policy=policy, + ) + reference_outcome = self._load_reference_frame_for_data_integrity(context, policy) + if not isinstance(reference_outcome[0], pd.DataFrame): + return reference_outcome + reference_df, reference_canonicalization = reference_outcome + primary_df = validated_data.raw_df + invalid_input = self._validate_data_integrity_inputs(primary_df, reference_df, context, policy) + if invalid_input is not None: + return invalid_input + overlap_details = self._data_integrity_overlap_details(primary_df, reference_df) + divergence = overlap_details["divergence"] + # Keep raw divergence values for gate checks; sanitize only metadata payload + # so persisted/report JSON remains strict and downstream-safe. 
+ divergence_meta = self._sanitize_non_finite_metrics_for_json(divergence) + non_finite_divergence = [ + name for name, value in divergence.items() if not np.isfinite(float(value)) + ] + if non_finite_divergence: + return self._data_integrity_audit_indeterminate( + "non_finite_divergence_metrics", + collection=context.job.collection, + policy=policy, + details={ + "non_finite_metrics": sorted(non_finite_divergence), + "primary_bars": overlap_details["primary_bars"], + "reference_bars": overlap_details["reference_bars"], + "overlap_bars": overlap_details["overlap_bars"], + "overlap_ratio": overlap_details["overlap_ratio"], + "missing_primary_bar_pct": overlap_details["missing_primary_bar_pct"], + "reference_canonicalization": reference_canonicalization, + **divergence_meta, + }, + ) + threshold_details = self._data_integrity_threshold_details(policy) + failed_checks = self._data_integrity_failed_checks(overlap_details, divergence, threshold_details) + meta: dict[str, Any] = { + "is_complete": True, + "status": "complete", + "source": context.job.collection.source, + "reference_source": context.job.collection.reference_source, + "primary_bars": overlap_details["primary_bars"], + "reference_bars": overlap_details["reference_bars"], + "overlap_bars": overlap_details["overlap_bars"], + "overlap_ratio": overlap_details["overlap_ratio"], + "missing_primary_bar_pct": overlap_details["missing_primary_bar_pct"], + "min_overlap_ratio": threshold_details["min_overlap_ratio"], + "max_median_ohlc_diff_bps": threshold_details["max_median_ohlc_diff_bps"], + "max_p95_ohlc_diff_bps": threshold_details["max_p95_ohlc_diff_bps"], + "reference_canonicalization": reference_canonicalization, + **divergence_meta, + "failed_checks": list(failed_checks), + } + if failed_checks: + reason = "data_integrity_audit_failed(" + "; ".join(failed_checks) + ")" + return reason, meta + return None, meta + + def _load_reference_frame_for_data_integrity( + self, + context: ValidationContext, + policy: 
ResultConsistencyDataIntegrityAuditConfig, + ) -> tuple[pd.DataFrame, dict[str, int]] | tuple[str | None, dict[str, Any]]: + reference_collection = self._data_integrity_audit_reference_collection(context.job.collection) + if reference_collection is None: + return self._data_integrity_audit_indeterminate( + "missing_reference_source", + collection=context.job.collection, + policy=policy, + ) + only_cached = bool(getattr(context, "only_cached", getattr(self, "_run_only_cached", False))) + _, _, _, _, _, _, _, _, _, _, calendar_timezone = self._load_data_quality_policy( + context.job.collection + ) + try: + reference_source = self._make_source(reference_collection) + reference_raw_df = reference_source.fetch( + context.job.symbol, + context.job.timeframe, + only_cached=only_cached, + ) + reference_df, reference_canonicalization = self._canonicalize_validation_frame( + reference_raw_df, + calendar_timezone=calendar_timezone, + ) + except Exception as exc: + return self._data_integrity_audit_indeterminate( + "reference_fetch_failed", + collection=context.job.collection, + policy=policy, + details={"error": str(exc)}, + ) + return reference_df, reference_canonicalization + + def _validate_data_integrity_inputs( + self, + primary_df: pd.DataFrame, + reference_df: pd.DataFrame, + context: ValidationContext, + policy: ResultConsistencyDataIntegrityAuditConfig, + ) -> tuple[str | None, dict[str, Any]] | None: + if primary_df.empty or reference_df.empty: + return self._data_integrity_audit_indeterminate( + "empty_frame", + collection=context.job.collection, + policy=policy, + details={ + "primary_bars": int(len(primary_df)), + "reference_bars": int(len(reference_df)), + }, + ) + required_columns = ["Open", "High", "Low", "Close"] + missing_columns = [ + name + for name in required_columns + if name not in primary_df.columns or name not in reference_df.columns + ] + if missing_columns: + return self._data_integrity_audit_indeterminate( + "missing_ohlc_columns", + 
collection=context.job.collection, + policy=policy, + details={"missing_columns": missing_columns}, + ) + return None + + def _data_integrity_overlap_details( + self, + primary_df: pd.DataFrame, + reference_df: pd.DataFrame, + ) -> dict[str, Any]: + required_columns = ["Open", "High", "Low", "Close"] + overlap_index = primary_df.index.intersection(reference_df.index) + primary_bars = int(len(primary_df)) + reference_bars = int(len(reference_df)) + overlap_bars = int(len(overlap_index)) + overlap_ratio = float(overlap_bars / primary_bars) if primary_bars > 0 else 0.0 + missing_primary_bar_pct = float((1.0 - overlap_ratio) * 100.0) + overlap_primary = primary_df.loc[overlap_index, required_columns] + overlap_reference = reference_df.loc[overlap_index, required_columns] + divergence = self._data_integrity_ohlc_diff_metrics(overlap_primary, overlap_reference) + return { + "primary_bars": primary_bars, + "reference_bars": reference_bars, + "overlap_bars": overlap_bars, + "overlap_ratio": overlap_ratio, + "missing_primary_bar_pct": missing_primary_bar_pct, + "divergence": divergence, + } + + @staticmethod + def _sanitize_non_finite_metrics_for_json(values: dict[str, float]) -> dict[str, float | None]: + """Convert non-finite float metrics to JSON-safe nulls.""" + sanitized: dict[str, float | None] = {} + for key, value in values.items(): + parsed = float(value) + sanitized[key] = parsed if np.isfinite(parsed) else None + return sanitized + + @staticmethod + def _data_integrity_threshold_details( + policy: ResultConsistencyDataIntegrityAuditConfig, + ) -> dict[str, float]: + return { + "max_median_ohlc_diff_bps": float( + policy.max_median_ohlc_diff_bps + if policy.max_median_ohlc_diff_bps is not None + else DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT + ), + "max_p95_ohlc_diff_bps": float( + policy.max_p95_ohlc_diff_bps + if policy.max_p95_ohlc_diff_bps is not None + else DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT + ), + "min_overlap_ratio": float( + 
policy.min_overlap_ratio + if policy.min_overlap_ratio is not None + else DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT + ), + } + + @staticmethod + def _data_integrity_failed_checks( + overlap_details: dict[str, Any], + divergence: dict[str, float], + thresholds: dict[str, float], + ) -> list[str]: + failed_checks: list[str] = [] + overlap_ratio = float(overlap_details["overlap_ratio"]) + min_overlap = float(thresholds["min_overlap_ratio"]) + overlap_bars = int(overlap_details["overlap_bars"]) + primary_bars = int(overlap_details["primary_bars"]) + if overlap_ratio < min_overlap: + failed_checks.append( + "overlap_ratio_below_threshold(" + f"required={min_overlap}, available={overlap_ratio}, overlap_bars={overlap_bars}, " + f"primary_bars={primary_bars})" + ) + median_diff = divergence["median_ohlc_diff_bps"] + max_median = float(thresholds["max_median_ohlc_diff_bps"]) + if np.isfinite(median_diff) and median_diff > max_median: + failed_checks.append( + "median_ohlc_diff_bps_exceeded(" + f"max_allowed={max_median}, available={median_diff})" + ) + p95_diff = divergence["p95_ohlc_diff_bps"] + max_p95 = float(thresholds["max_p95_ohlc_diff_bps"]) + if np.isfinite(p95_diff) and p95_diff > max_p95: + failed_checks.append( + "p95_ohlc_diff_bps_exceeded(" + f"max_allowed={max_p95}, available={p95_diff})" + ) + return failed_checks + def _run_transaction_cost_robustness_validation( self, context: ValidationContext, @@ -4157,6 +4552,8 @@ def run_all(self, only_cached: bool = False) -> list[BestResult]: self._evaluation_cache_write_failures = 0 self._runtime_signal_error_counts = {} self._runtime_signal_error_capped = set() + self._data_integrity_audit_cache = {} + self._run_only_cached = only_cached self._evaluator = None self._strategy_overrides = ( {s.name: s.params for s in self.cfg.strategies} if self.cfg.strategies else {} diff --git a/src/config.py b/src/config.py index e8dd1bd9..be2d6391 100644 --- a/src/config.py +++ b/src/config.py @@ -20,9 +20,11 @@ class 
StrategyConfig: @dataclass class CollectionConfig: name: str - source: str # yfinance, ccxt, custom + source: str # yfinance, ccxt, custom (also supports ccxt exchange aliases like binance/bybit) symbols: list[str] + reference_source: str | None = None exchange: str | None = None # for ccxt + reference_exchange: str | None = None # for ccxt reference_source currency: str | None = None quote: str | None = None # for ccxt symbols e.g., USDT fees: float | None = None @@ -151,6 +153,13 @@ class ResultConsistencyTransactionCostRobustnessConfig: breakeven: ResultConsistencyTransactionCostBreakevenConfig | None = None +@dataclass +class ResultConsistencyDataIntegrityAuditConfig: + min_overlap_ratio: float | None = None + max_median_ohlc_diff_bps: float | None = None + max_p95_ohlc_diff_bps: float | None = None + + @dataclass class ResultConsistencyConfig: min_metric: float | None = None @@ -159,6 +168,7 @@ class ResultConsistencyConfig: execution_price_variance: ResultConsistencyExecutionPriceVarianceConfig | None = None lookahead_shuffle_test: ValidationLookaheadShuffleTestConfig | None = None transaction_cost_robustness: ResultConsistencyTransactionCostRobustnessConfig | None = None + data_integrity_audit: ResultConsistencyDataIntegrityAuditConfig | None = None @dataclass @@ -195,6 +205,10 @@ class Config: LOOKAHEAD_SHUFFLE_TEST_SEED_MIN = 0 LOOKAHEAD_SHUFFLE_TEST_FAILED_PERMUTATIONS_MIN = 0 LOOKAHEAD_SHUFFLE_TEST_CONFIG_PREFIX = "validation.result_consistency.lookahead_shuffle_test" +DATA_INTEGRITY_AUDIT_CONFIG_PREFIX = "validation.result_consistency.data_integrity_audit" +DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT = 0.99 +DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT = 5.0 +DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT = 20.0 TRANSACTION_COST_ROBUSTNESS_MODE_ANALYTICS = "analytics" TRANSACTION_COST_ROBUSTNESS_MODE_ENFORCE = "enforce" TRANSACTION_COST_ROBUSTNESS_MODES = { @@ -591,6 +605,99 @@ def 
_apply_result_consistency_execution_price_variance_defaults( ) +def _normalize_result_consistency_data_integrity_audit_config( + cfg: ResultConsistencyDataIntegrityAuditConfig | None, + prefix: str, +) -> ResultConsistencyDataIntegrityAuditConfig | None: + if cfg is None: + return None + min_overlap_ratio_raw = getattr(cfg, "min_overlap_ratio", None) + min_overlap_ratio = ( + _coerce_float(min_overlap_ratio_raw, f"{prefix}.min_overlap_ratio") + if min_overlap_ratio_raw is not None + else None + ) + if min_overlap_ratio is not None and not ( + VALIDATION_PROBABILITY_MIN <= min_overlap_ratio <= VALIDATION_PROBABILITY_MAX + ): + raise ValueError( + f"`{prefix}.min_overlap_ratio` must be between {VALIDATION_PROBABILITY_MIN} and " + f"{VALIDATION_PROBABILITY_MAX}" + ) + max_median_ohlc_diff_bps_raw = getattr(cfg, "max_median_ohlc_diff_bps", None) + max_median_ohlc_diff_bps = ( + _coerce_float(max_median_ohlc_diff_bps_raw, f"{prefix}.max_median_ohlc_diff_bps") + if max_median_ohlc_diff_bps_raw is not None + else None + ) + if ( + max_median_ohlc_diff_bps is not None + and max_median_ohlc_diff_bps < VALIDATION_NON_NEGATIVE_FLOAT_MIN + ): + raise ValueError( + f"`{prefix}.max_median_ohlc_diff_bps` must be >= {VALIDATION_NON_NEGATIVE_FLOAT_MIN}" + ) + max_p95_ohlc_diff_bps_raw = getattr(cfg, "max_p95_ohlc_diff_bps", None) + max_p95_ohlc_diff_bps = ( + _coerce_float(max_p95_ohlc_diff_bps_raw, f"{prefix}.max_p95_ohlc_diff_bps") + if max_p95_ohlc_diff_bps_raw is not None + else None + ) + if max_p95_ohlc_diff_bps is not None and max_p95_ohlc_diff_bps < VALIDATION_NON_NEGATIVE_FLOAT_MIN: + raise ValueError( + f"`{prefix}.max_p95_ohlc_diff_bps` must be >= {VALIDATION_NON_NEGATIVE_FLOAT_MIN}" + ) + if ( + max_median_ohlc_diff_bps is not None + and max_p95_ohlc_diff_bps is not None + and max_p95_ohlc_diff_bps < max_median_ohlc_diff_bps + ): + raise ValueError( + f"`{prefix}.max_p95_ohlc_diff_bps` must be >= `{prefix}.max_median_ohlc_diff_bps`" + ) + return 
ResultConsistencyDataIntegrityAuditConfig( + min_overlap_ratio=min_overlap_ratio, + max_median_ohlc_diff_bps=max_median_ohlc_diff_bps, + max_p95_ohlc_diff_bps=max_p95_ohlc_diff_bps, + ) + + +def _apply_result_consistency_data_integrity_audit_defaults( + cfg: ResultConsistencyDataIntegrityAuditConfig, +) -> ResultConsistencyDataIntegrityAuditConfig: + min_overlap_ratio = ( + cfg.min_overlap_ratio + if cfg.min_overlap_ratio is not None + else DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT + ) + max_median_ohlc_diff_bps = ( + cfg.max_median_ohlc_diff_bps + if cfg.max_median_ohlc_diff_bps is not None + else DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT + ) + max_p95_ohlc_diff_bps = ( + cfg.max_p95_ohlc_diff_bps + if cfg.max_p95_ohlc_diff_bps is not None + else DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT + ) + if max_p95_ohlc_diff_bps < max_median_ohlc_diff_bps: + raise ValueError( + f"`{DATA_INTEGRITY_AUDIT_CONFIG_PREFIX}.max_p95_ohlc_diff_bps` must be >= " + f"`{DATA_INTEGRITY_AUDIT_CONFIG_PREFIX}.max_median_ohlc_diff_bps`" + ) + return ResultConsistencyDataIntegrityAuditConfig( + min_overlap_ratio=min_overlap_ratio, + max_median_ohlc_diff_bps=max_median_ohlc_diff_bps, + max_p95_ohlc_diff_bps=max_p95_ohlc_diff_bps, + ) + + +def _default_data_integrity_audit_config() -> ResultConsistencyDataIntegrityAuditConfig: + return _apply_result_consistency_data_integrity_audit_defaults( + ResultConsistencyDataIntegrityAuditConfig() + ) + + def _normalize_transaction_cost_breakeven_config( cfg: ResultConsistencyTransactionCostBreakevenConfig | None, prefix: str, @@ -918,6 +1025,10 @@ def _normalize_result_consistency_config( getattr(cfg, "lookahead_shuffle_test", None), f"{prefix}.lookahead_shuffle_test", ) + data_integrity_audit = _normalize_result_consistency_data_integrity_audit_config( + getattr(cfg, "data_integrity_audit", None), + f"{prefix}.data_integrity_audit", + ) transaction_cost_robustness = _normalize_transaction_cost_robustness_config( getattr(cfg, 
"transaction_cost_robustness", None), f"{prefix}.transaction_cost_robustness", @@ -926,12 +1037,13 @@ def _normalize_result_consistency_config( outlier_dependency is None and execution_price_variance is None and lookahead_shuffle_test is None + and data_integrity_audit is None and transaction_cost_robustness is None ): raise ValueError( f"Invalid `{prefix}`: expected at least one configured module " "(`outlier_dependency`, `execution_price_variance`, `lookahead_shuffle_test`, " - "or `transaction_cost_robustness`)" + "`data_integrity_audit`, or `transaction_cost_robustness`)" ) min_metric_raw = getattr(cfg, "min_metric", None) min_metric = _coerce_float(min_metric_raw, f"{prefix}.min_metric") if min_metric_raw is not None else None @@ -945,6 +1057,7 @@ def _normalize_result_consistency_config( outlier_dependency=outlier_dependency, execution_price_variance=execution_price_variance, lookahead_shuffle_test=lookahead_shuffle_test, + data_integrity_audit=data_integrity_audit, transaction_cost_robustness=transaction_cost_robustness, ) @@ -971,6 +1084,11 @@ def _apply_result_consistency_defaults(cfg: ResultConsistencyConfig) -> ResultCo if cfg.lookahead_shuffle_test is not None else None ), + data_integrity_audit=( + _apply_result_consistency_data_integrity_audit_defaults(cfg.data_integrity_audit) + if cfg.data_integrity_audit is not None + else None + ), transaction_cost_robustness=( _apply_transaction_cost_robustness_defaults(cfg.transaction_cost_robustness) if cfg.transaction_cost_robustness is not None @@ -1060,6 +1178,10 @@ def _merge_result_consistency_config( getattr(base, "lookahead_shuffle_test", None), getattr(override, "lookahead_shuffle_test", None), ), + data_integrity_audit=_merge_result_consistency_data_integrity_audit_config( + getattr(base, "data_integrity_audit", None), + getattr(override, "data_integrity_audit", None), + ), transaction_cost_robustness=_merge_transaction_cost_robustness_config( getattr(base, "transaction_cost_robustness", None), 
getattr(override, "transaction_cost_robustness", None), @@ -1069,6 +1191,7 @@ def _merge_result_consistency_config( merged.outlier_dependency is None and merged.execution_price_variance is None and merged.lookahead_shuffle_test is None + and merged.data_integrity_audit is None and merged.transaction_cost_robustness is None ): return None @@ -1102,6 +1225,19 @@ def _merge_result_consistency_execution_price_variance_config( ) +def _merge_result_consistency_data_integrity_audit_config( + base: ResultConsistencyDataIntegrityAuditConfig | None, + override: ResultConsistencyDataIntegrityAuditConfig | None, +) -> ResultConsistencyDataIntegrityAuditConfig | None: + if base is None and override is None: + return None + return ResultConsistencyDataIntegrityAuditConfig( + min_overlap_ratio=_merged_field(base, override, "min_overlap_ratio"), + max_median_ohlc_diff_bps=_merged_field(base, override, "max_median_ohlc_diff_bps"), + max_p95_ohlc_diff_bps=_merged_field(base, override, "max_p95_ohlc_diff_bps"), + ) + + def _merge_transaction_cost_breakeven_config( base: ResultConsistencyTransactionCostBreakevenConfig | None, override: ResultConsistencyTransactionCostBreakevenConfig | None, @@ -1499,6 +1635,14 @@ def _apply_resolved_validation_to_collection( global_result_consistency, collection_validation.result_consistency if collection_validation else None, ) + # Special case: data-integrity audit activation is collection-scoped because + # `reference_source` exists only on CollectionConfig. Global validation can + # still define/override thresholds, but enabling the audit requires a + # collection-level reference source. 
+ resolved_result_consistency = _ensure_reference_source_data_integrity_policy( + collection, + resolved_result_consistency, + ) if ( resolved_data_quality is None and resolved_optimization is None @@ -1512,6 +1656,39 @@ def _apply_resolved_validation_to_collection( ) +def _ensure_reference_source_data_integrity_policy( + collection: CollectionConfig, + resolved_result_consistency: ResultConsistencyConfig | None, +) -> ResultConsistencyConfig | None: + """Inject default data-integrity audit only when collection has a reference source. + + Thresholds/rules may come from global validation and collection overrides, + but the audit itself is only meaningful when a collection-level + `reference_source` exists. + """ + if not collection.reference_source: + return resolved_result_consistency + + base_policy = ( + resolved_result_consistency + if resolved_result_consistency is not None + else ResultConsistencyConfig() + ) + if getattr(base_policy, "data_integrity_audit", None) is not None: + return resolved_result_consistency + + with_default_audit = ResultConsistencyConfig( + min_metric=base_policy.min_metric, + min_trades=base_policy.min_trades, + outlier_dependency=base_policy.outlier_dependency, + execution_price_variance=base_policy.execution_price_variance, + lookahead_shuffle_test=base_policy.lookahead_shuffle_test, + transaction_cost_robustness=base_policy.transaction_cost_robustness, + data_integrity_audit=_default_data_integrity_audit_config(), + ) + return _merge_result_consistency_config(with_default_audit, None) + + def resolve_validation_overrides(cfg: Config) -> None: """Resolve effective collection-level validation policies. 
@@ -2070,19 +2247,15 @@ def _parse_result_consistency(raw: Any, prefix: str) -> ResultConsistencyConfig min_value=RESULT_CONSISTENCY_MIN_TRADES_MIN, ) - outlier_dependency_raw = parsed_raw.get("outlier_dependency") - if outlier_dependency_raw is not None and not isinstance(outlier_dependency_raw, dict): - raise ValueError(f"Invalid `{prefix}.outlier_dependency`: expected a mapping") - execution_price_variance_raw = parsed_raw.get("execution_price_variance") - if execution_price_variance_raw is not None and not isinstance(execution_price_variance_raw, dict): - raise ValueError(f"Invalid `{prefix}.execution_price_variance`: expected a mapping") + outlier_dependency_raw = _optional_mapping_field(parsed_raw, prefix, "outlier_dependency") + execution_price_variance_raw = _optional_mapping_field(parsed_raw, prefix, "execution_price_variance") outlier_dependency = ( _parse_result_consistency_outlier_dependency( outlier_dependency_raw, f"{prefix}.outlier_dependency", ) - if isinstance(outlier_dependency_raw, dict) + if outlier_dependency_raw is not None else None ) execution_price_variance = ( @@ -2090,34 +2263,36 @@ def _parse_result_consistency(raw: Any, prefix: str) -> ResultConsistencyConfig execution_price_variance_raw, f"{prefix}.execution_price_variance", ) - if isinstance(execution_price_variance_raw, dict) + if execution_price_variance_raw is not None else None ) - lookahead_shuffle_test_raw = parsed_raw.get("lookahead_shuffle_test") - if lookahead_shuffle_test_raw is not None and not isinstance(lookahead_shuffle_test_raw, dict): - raise ValueError(f"Invalid `{prefix}.lookahead_shuffle_test`: expected a mapping") + lookahead_shuffle_test_raw = _optional_mapping_field(parsed_raw, prefix, "lookahead_shuffle_test") lookahead_shuffle_test = ( _parse_lookahead_shuffle_test( lookahead_shuffle_test_raw, f"{prefix}.lookahead_shuffle_test", ) - if isinstance(lookahead_shuffle_test_raw, dict) + if lookahead_shuffle_test_raw is not None else None ) - 
transaction_cost_robustness_raw = parsed_raw.get("transaction_cost_robustness") - if ( - transaction_cost_robustness_raw is not None - and not isinstance(transaction_cost_robustness_raw, dict) - ): - raise ValueError( - f"Invalid `{prefix}.transaction_cost_robustness`: expected a mapping" + data_integrity_audit_raw = _optional_mapping_field(parsed_raw, prefix, "data_integrity_audit") + data_integrity_audit = ( + _parse_result_consistency_data_integrity_audit( + data_integrity_audit_raw, + f"{prefix}.data_integrity_audit", ) + if data_integrity_audit_raw is not None + else None + ) + transaction_cost_robustness_raw = _optional_mapping_field( + parsed_raw, prefix, "transaction_cost_robustness" + ) transaction_cost_robustness = ( _parse_result_consistency_transaction_cost_robustness( transaction_cost_robustness_raw, f"{prefix}.transaction_cost_robustness", ) - if isinstance(transaction_cost_robustness_raw, dict) + if transaction_cost_robustness_raw is not None else None ) @@ -2128,12 +2303,22 @@ def _parse_result_consistency(raw: Any, prefix: str) -> ResultConsistencyConfig outlier_dependency=outlier_dependency, execution_price_variance=execution_price_variance, lookahead_shuffle_test=lookahead_shuffle_test, + data_integrity_audit=data_integrity_audit, transaction_cost_robustness=transaction_cost_robustness, ), prefix, ) +def _optional_mapping_field(raw: dict[str, Any], prefix: str, key: str) -> dict[str, Any] | None: + value = raw.get(key) + if value is None: + return None + if not isinstance(value, dict): + raise ValueError(f"Invalid `{prefix}.{key}`: expected a mapping") + return cast(dict[str, Any], value) + + def _parse_result_consistency_outlier_dependency( raw: Any, prefix: str ) -> ResultConsistencyOutlierDependencyConfig | None: @@ -2184,6 +2369,36 @@ def _parse_result_consistency_execution_price_variance( ) +def _parse_result_consistency_data_integrity_audit( + raw: Any, + prefix: str, +) -> ResultConsistencyDataIntegrityAuditConfig | None: + if raw is None: 
+ return None + parsed_raw = require_mapping(raw, prefix) + return ResultConsistencyDataIntegrityAuditConfig( + min_overlap_ratio=parse_optional_float( + parsed_raw, + prefix, + "min_overlap_ratio", + min_value=VALIDATION_PROBABILITY_MIN, + max_value=VALIDATION_PROBABILITY_MAX, + ), + max_median_ohlc_diff_bps=parse_optional_float( + parsed_raw, + prefix, + "max_median_ohlc_diff_bps", + min_value=VALIDATION_NON_NEGATIVE_FLOAT_MIN, + ), + max_p95_ohlc_diff_bps=parse_optional_float( + parsed_raw, + prefix, + "max_p95_ohlc_diff_bps", + min_value=VALIDATION_NON_NEGATIVE_FLOAT_MIN, + ), + ) + + def _parse_result_consistency_transaction_cost_breakeven( raw: Any, prefix: str ) -> ResultConsistencyTransactionCostBreakevenConfig | None: @@ -2303,7 +2518,13 @@ def _parse_collections(raw_collections: Any) -> list[CollectionConfig]: name=str(collection_raw["name"]).strip(), source=str(collection_raw["source"]).strip(), symbols=[str(symbol).strip() for symbol in symbols_raw], + reference_source=parse_optional_str( + collection_raw, "reference_source", normalize=False + ), exchange=parse_optional_str(collection_raw, "exchange", normalize=False), + reference_exchange=parse_optional_str( + collection_raw, "reference_exchange", normalize=False + ), currency=parse_optional_str(collection_raw, "currency", normalize=False), quote=parse_optional_str(collection_raw, "quote", normalize=False), fees=( diff --git a/tests/test_backtest_runner.py b/tests/test_backtest_runner.py index 3e566265..1be31b36 100644 --- a/tests/test_backtest_runner.py +++ b/tests/test_backtest_runner.py @@ -26,8 +26,12 @@ from src.config import ( CollectionConfig, Config, + DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT, + DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT, + DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT, OptimizationPolicyConfig, ResultConsistencyConfig, + ResultConsistencyDataIntegrityAuditConfig, ResultConsistencyExecutionPriceVarianceConfig, 
ResultConsistencyTransactionCostBreakevenConfig, ResultConsistencyTransactionCostRobustnessConfig, @@ -292,6 +296,7 @@ def _result_consistency_config(**overrides) -> ResultConsistencyConfig: "outlier_dependency": None, "execution_price_variance": None, "lookahead_shuffle_test": None, + "data_integrity_audit": None, "transaction_cost_robustness": None, } payload.update(overrides) @@ -616,6 +621,11 @@ def test_serialize_result_consistency_profile_keeps_schema_and_key_order(): seed=1337, max_failed_permutations=2, ), + data_integrity_audit=SimpleNamespace( + min_overlap_ratio=0.99, + max_median_ohlc_diff_bps=5.0, + max_p95_ohlc_diff_bps=20.0, + ), transaction_cost_robustness=SimpleNamespace( mode="analytics", stress_multipliers=[2.0, 5.0], @@ -638,6 +648,7 @@ def test_serialize_result_consistency_profile_keeps_schema_and_key_order(): "outlier_dependency", "execution_price_variance", "lookahead_shuffle_test", + "data_integrity_audit", "transaction_cost_robustness", ] assert payload["min_metric"] == pytest.approx(0.5) @@ -654,6 +665,11 @@ def test_serialize_result_consistency_profile_keeps_schema_and_key_order(): "seed": 1337, "max_failed_permutations": 2, } + assert payload["data_integrity_audit"] == { + "min_overlap_ratio": 0.99, + "max_median_ohlc_diff_bps": 5.0, + "max_p95_ohlc_diff_bps": 20.0, + } assert payload["transaction_cost_robustness"] == { "mode": "analytics", "stress_multipliers": [2.0, 5.0], @@ -1851,6 +1867,28 @@ def fetch(self, symbol, timeframe, only_cached=False): monkeypatch.setattr(BacktestRunner, "_make_source", lambda self, col: _Source()) +def _patch_primary_and_reference_sources( + monkeypatch, + *, + primary_df: pd.DataFrame, + reference_df: pd.DataFrame, + reference_source: str = "alphavantage", +) -> None: + class _Source: + def __init__(self, df: pd.DataFrame): + self._df = df + + def fetch(self, symbol, timeframe, only_cached=False): + return self._df.copy() + + def _make_source(self, col): + if col.source == reference_source: + return 
_Source(reference_df) + return _Source(primary_df) + + monkeypatch.setattr(BacktestRunner, "_make_source", _make_source) + + def _lookahead_shuffle_test_config( *, permutations: int = 100, @@ -3274,6 +3312,287 @@ def test_run_all_lookahead_shuffle_test_does_not_mutate_cached_stats_payload( assert post_run_meta["lookahead_shuffle_test"]["is_complete"] is True +def test_run_all_data_integrity_audit_passes_and_attaches_meta(tmp_path, monkeypatch): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + primary = _make_trending_ohlcv(30) + reference = primary.copy() + _patch_primary_and_reference_sources( + monkeypatch, + primary_df=primary, + reference_df=reference, + reference_source="alphavantage", + ) + eval_calls = _patch_pybroker_simulation(monkeypatch) + + results = runner.run_all() + + assert len(results) == 1 + assert eval_calls["count"] == 2 + post_run_meta = results[0].stats.get("post_run_meta") + assert post_run_meta is not None + audit_meta = post_run_meta["data_integrity_audit"] + assert audit_meta["is_complete"] is True + assert audit_meta["status"] == "complete" + assert audit_meta["overlap_ratio"] == pytest.approx(1.0) + assert audit_meta["median_ohlc_diff_bps"] == pytest.approx(0.0) + assert "result_consistency.data_integrity_audit" in runner.active_validation_gates + + +def test_run_all_data_integrity_audit_rejects_on_ohlc_drift(tmp_path, monkeypatch): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + primary = _make_trending_ohlcv(30) + reference = primary.copy() + reference[["Open", "High", "Low", "Close"]] = reference[["Open", "High", "Low", "Close"]] * 1.02 + _patch_primary_and_reference_sources( + monkeypatch, + primary_df=primary, + reference_df=reference, + reference_source="alphavantage", + ) + eval_calls = _patch_pybroker_simulation(monkeypatch) + + results = runner.run_all() + + assert 
results == [] + assert eval_calls["count"] == 2 + assert len(runner.failures) == 1 + failure = runner.failures[0] + assert failure["stage"] == "strategy_validation" + assert "data_integrity_audit_failed(" in failure["error"] + assert "median_ohlc_diff_bps_exceeded" in failure["error"] + + +def test_run_all_data_integrity_audit_rejects_on_low_overlap(tmp_path, monkeypatch): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + primary = _make_trending_ohlcv(30) + reference = primary.iloc[::2].copy() + _patch_primary_and_reference_sources( + monkeypatch, + primary_df=primary, + reference_df=reference, + reference_source="alphavantage", + ) + + results = runner.run_all() + + assert results == [] + assert len(runner.failures) == 1 + failure = runner.failures[0] + assert failure["stage"] == "strategy_validation" + assert "data_integrity_audit_failed(" in failure["error"] + assert "overlap_ratio_below_threshold" in failure["error"] + + +def test_run_all_data_integrity_audit_indeterminate_rejects_and_attaches_meta( + tmp_path, monkeypatch +): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + primary = _make_trending_ohlcv(30) + + class _Source: + def __init__(self, df: pd.DataFrame | None, *, should_fail: bool): + self._df = df + self._should_fail = should_fail + + def fetch(self, symbol, timeframe, only_cached=False): + if self._should_fail: + raise RuntimeError("reference fetch boom") + assert self._df is not None + return self._df.copy() + + def _make_source(self, col): + if col.source == "alphavantage": + return _Source(None, should_fail=True) + return _Source(primary, should_fail=False) + + captured_meta: dict[str, object] = {} + original_attach = runner._attach_post_run_meta + + def _capture_attach(self, outcome, key, meta): + if key == "data_integrity_audit": + captured_meta.update(meta) + return 
original_attach(outcome, key, meta) + + monkeypatch.setattr(BacktestRunner, "_make_source", _make_source) + monkeypatch.setattr(runner, "_attach_post_run_meta", MethodType(_capture_attach, runner)) + _patch_pybroker_simulation(monkeypatch) + + results = runner.run_all() + + assert results == [] + assert len(runner.failures) == 1 + failure = runner.failures[0] + assert failure["stage"] == "strategy_validation" + assert "data_integrity_audit_indeterminate(reason=reference_fetch_failed)" in failure["error"] + assert captured_meta["status"] == "indeterminate" + assert captured_meta["is_complete"] is False + assert captured_meta["reason"] == "reference_fetch_failed" + + +def test_run_all_data_integrity_audit_indeterminate_on_non_finite_divergence( + tmp_path, monkeypatch +): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + primary = _make_trending_ohlcv(30) + reference = primary.copy() + primary[["Open", "High", "Low", "Close"]] = np.nan + reference[["Open", "High", "Low", "Close"]] = np.nan + _patch_primary_and_reference_sources( + monkeypatch, + primary_df=primary, + reference_df=reference, + reference_source="alphavantage", + ) + captured_meta: dict[str, object] = {} + original_attach = runner._attach_post_run_meta + + def _capture_attach(self, outcome, key, meta): + if key == "data_integrity_audit": + captured_meta.update(meta) + return original_attach(outcome, key, meta) + + monkeypatch.setattr(runner, "_attach_post_run_meta", MethodType(_capture_attach, runner)) + _patch_pybroker_simulation(monkeypatch) + + results = runner.run_all() + + assert results == [] + assert len(runner.failures) == 1 + failure = runner.failures[0] + assert failure["stage"] == "strategy_validation" + assert "data_integrity_audit_indeterminate(reason=non_finite_divergence_metrics)" in failure["error"] + assert captured_meta["status"] == "indeterminate" + assert captured_meta["is_complete"] is False + assert 
captured_meta["reason"] == "non_finite_divergence_metrics" + + +def test_run_all_data_integrity_audit_reuses_job_level_cache_across_strategies(tmp_path, monkeypatch): + runner = _make_runner(tmp_path, monkeypatch, patch_source=False) + runner.cfg.collections[0].reference_source = "alphavantage" + + class _AltStrategy(BaseStrategy): + name = "alt" + + def param_grid(self) -> dict[str, list[int]]: + return {} + + def generate_signals(self, df: pd.DataFrame, params: dict) -> tuple[pd.Series, pd.Series]: + entries = pd.Series([True] + [False] * (len(df.index) - 1), index=df.index) + exits = pd.Series([False] * (len(df.index) - 1) + [True], index=df.index) + return entries, exits + + runner.external_index = {"dummy": _DummyStrategy, "alt": _AltStrategy} + primary = _make_trending_ohlcv(30) + reference = primary.copy() + fetch_counts = {"primary": 0, "reference": 0} + + class _Source: + def __init__(self, df: pd.DataFrame, *, is_reference: bool): + self._df = df + self._is_reference = is_reference + + def fetch(self, symbol, timeframe, only_cached=False): + key = "reference" if self._is_reference else "primary" + fetch_counts[key] += 1 + return self._df.copy() + + def _make_source(self, col): + if col.source == "alphavantage": + return _Source(reference, is_reference=True) + return _Source(primary, is_reference=False) + + monkeypatch.setattr(BacktestRunner, "_make_source", _make_source) + _patch_pybroker_simulation(monkeypatch) + + results = runner.run_all() + + assert len(results) == 2 + assert fetch_counts["primary"] == 1 + assert fetch_counts["reference"] == 1 + + +def test_data_integrity_reference_collection_preserves_exchange_for_reference_source(): + collection = CollectionConfig( + name="integration_crypto", + source="ccxt", + symbols=["BTC/USDT"], + reference_source="ccxt", + exchange="binance", + currency=None, + quote="USDT", + fees=0.0, + slippage=0.0, + validation=None, + ) + + reference = BacktestRunner._data_integrity_audit_reference_collection(collection) 
+ + assert reference is not None + assert reference.source == "ccxt" + assert reference.exchange == "binance" + assert reference.reference_source is None + + +def test_data_integrity_reference_collection_uses_reference_exchange_override(): + collection = CollectionConfig( + name="integration_crypto", + source="ccxt", + symbols=["BTC/USDT"], + reference_source="ccxt", + exchange="binance", + reference_exchange="bybit", + currency=None, + quote="USDT", + fees=0.0, + slippage=0.0, + validation=None, + ) + + reference = BacktestRunner._data_integrity_audit_reference_collection(collection) + + assert reference is not None + assert reference.source == "ccxt" + assert reference.exchange == "bybit" + assert reference.reference_source is None + + +def test_data_integrity_threshold_details_uses_defaults_for_none_values(): + policy = ResultConsistencyDataIntegrityAuditConfig( + min_overlap_ratio=None, + max_median_ohlc_diff_bps=None, + max_p95_ohlc_diff_bps=None, + ) + + thresholds = BacktestRunner._data_integrity_threshold_details(policy) + + assert thresholds == { + "min_overlap_ratio": DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT, + "max_median_ohlc_diff_bps": DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT, + "max_p95_ohlc_diff_bps": DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT, + } + + +def test_data_integrity_sanitize_non_finite_metrics_for_json_converts_to_none(): + values = { + "median_ohlc_diff_bps": float("nan"), + "p95_ohlc_diff_bps": float("inf"), + "max_ohlc_diff_bps": 1.25, + } + + sanitized = BacktestRunner._sanitize_non_finite_metrics_for_json(values) + + assert sanitized == { + "median_ohlc_diff_bps": None, + "p95_ohlc_diff_bps": None, + "max_ohlc_diff_bps": 1.25, + } + + def test_transaction_cost_robustness_result_attaches_meta_without_cache_pollution( tmp_path, monkeypatch ): diff --git a/tests/test_config.py b/tests/test_config.py index 047cb411..63ba8527 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -400,6 +400,77 @@ def 
test_load_config_lookahead_shuffle_test_defaults(tmp_path: Path): ) +def test_load_config_reference_source_enables_data_integrity_audit_defaults(tmp_path: Path): + config_text = """ +collections: + - name: test + source: yfinance + reference_source: alphavantage + reference_exchange: bybit + symbols: ['AAPL'] +timeframes: ['1d'] +metric: sharpe +""" + path = tmp_path / "config.yaml" + path.write_text(config_text) + + cfg = load_config(path) + assert cfg.collections[0].reference_source == "alphavantage" + assert cfg.collections[0].reference_exchange == "bybit" + assert cfg.collections[0].validation is not None + assert cfg.collections[0].validation.result_consistency is not None + audit = cfg.collections[0].validation.result_consistency.data_integrity_audit + assert audit is not None + assert audit.min_overlap_ratio == pytest.approx(0.99) + assert audit.max_median_ohlc_diff_bps == pytest.approx(5.0) + assert audit.max_p95_ohlc_diff_bps == pytest.approx(20.0) + + +def test_load_config_data_integrity_audit_rejects_non_mapping(tmp_path: Path): + with pytest.raises( + ValueError, + match=r"validation\.result_consistency\.data_integrity_audit", + ): + _load_from_blocks( + tmp_path, + validation_block={ + "result_consistency": _result_consistency_block(data_integrity_audit=True) + }, + ) + + +def test_load_config_data_integrity_audit_collection_override_inherits_global(tmp_path: Path): + config_text = """ +collections: + - name: test + source: yfinance + reference_source: alphavantage + symbols: ['AAPL'] + validation: + result_consistency: + data_integrity_audit: + min_overlap_ratio: 0.97 +timeframes: ['1d'] +metric: sharpe +validation: + result_consistency: + data_integrity_audit: + max_median_ohlc_diff_bps: 2.0 + max_p95_ohlc_diff_bps: 10.0 +""" + path = tmp_path / "config.yaml" + path.write_text(config_text) + + cfg = load_config(path) + assert cfg.collections[0].validation is not None + assert cfg.collections[0].validation.result_consistency is not None + audit = 
cfg.collections[0].validation.result_consistency.data_integrity_audit + assert audit is not None + assert audit.min_overlap_ratio == pytest.approx(0.97) + assert audit.max_median_ohlc_diff_bps == pytest.approx(2.0) + assert audit.max_p95_ohlc_diff_bps == pytest.approx(10.0) + + def test_load_config_transaction_cost_robustness_inherits_global_overrides(tmp_path: Path): cfg = _load_from_blocks( tmp_path,