
Feat/vd 4355 data integrity audit #159

Open

AlexanderPietsch wants to merge 33 commits into dev from feat/VD-4355-data-integrity-audit

Conversation


@AlexanderPietsch AlexanderPietsch commented Apr 20, 2026

Summary

VD-4355: Data Integrity Audit

This branch introduces a new post-run data integrity audit gate and the supporting validation workflow.

Changes

  • Introduce result_consistency.data_integrity_audit as a new validation feature.
  • Enable audit activation via collection-level reference_source.
  • Add reference-vs-primary OHLC integrity checks (overlap + drift thresholds).
  • Add audit diagnostics in run outputs (post_run_meta) for pass/fail/indeterminate outcomes.
  • Add cache-aware behavior and policy-aware gate metadata for reliable operation.
  • Add and update tests covering the new audit behavior and edge cases.
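
The activation path listed above can be sketched in config terms (thresholds shown are the new defaults; the collection names/sources mirror `config/example.yaml` in the diff and are illustrative):

```yaml
validation:
  result_consistency:
    data_integrity_audit:
      min_overlap_ratio: 0.99          # min shared timestamps between sources
      max_median_ohlc_diff_bps: 5.0    # median OHLC drift tolerance (bps)
      max_p95_ohlc_diff_bps: 20.0      # tail OHLC drift tolerance (bps)

collections:
  - name: crypto
    source: binance
    reference_source: bybit  # presence of this key activates the audit gate
```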

How to Test

  • Run targeted tests:
    • poetry run pytest -q tests/test_backtest_runner.py
    • poetry run pytest -q tests/test_config.py
  • Run integration-style backtest:
    • make run-integration
  • Validate audit gate metadata in latest run:
    • jq '.validation.active_gates' reports/<latest-run>/summary.json
  • Validate result store metadata:
    • sqlite3 .cache/evaluation/result_store.sqlite "select run_id, active_gates_json from run_metadata order by created_at desc limit 1;"

Checklist (KISS)

  • Pre-commit passes locally (pre-commit run --all-files)
  • Tests added/updated where it makes sense (80% cov gate)
  • Docs/README updated if needed
  • No secrets committed; .env values are excluded
  • Backward compatibility considered (configs, CLI flags)

Related Issues/Links

  • Closes #VD-4355
  • References VD-4355 Data Integrity Audit

Note

Medium Risk
Adds a new result-rejection gate that performs additional data fetching and OHLC drift/overlap checks, which can change which strategy results are accepted and increases run-time/IO variability. Risk is contained to validation/config paths and is covered by new unit tests for pass/fail/indeterminate cases and caching.

Overview
RCA: Strategy results could be accepted even when the underlying bar data contained venue/provider-specific bad prints or timestamp gaps, because there was no cross-source integrity check gating results. This allowed silent data issues to propagate into optimization and reporting.

The Fix: Introduces validation.result_consistency.data_integrity_audit, activated per-collection via new collections[].reference_source/reference_exchange, to fetch/canonicalize a secondary reference frame and enforce overlap + OHLC drift thresholds (with indeterminate outcomes also rejecting). The runner caches audit outcomes per job/source-routing/thresholds, reports the gate in active_gates, and persists diagnostics under post_run_meta.data_integrity_audit; config loading/merging now normalizes defaults and validates the new module.
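
The overlap + OHLC drift check described above can be sketched as a standalone snippet (`ohlc_diff_bps` is a hypothetical helper mirroring `_data_integrity_ohlc_diff_metrics`, not the project's actual API):

```python
import numpy as np
import pandas as pd

def ohlc_diff_bps(primary: pd.DataFrame, reference: pd.DataFrame) -> dict[str, float]:
    """Relative OHLC divergence in basis points over the shared timestamps."""
    eps = 1e-12  # guard against division by zero on zero-priced bars
    cols = ["Open", "High", "Low", "Close"]
    overlap = primary.index.intersection(reference.index)
    lhs = primary.loc[overlap, cols].to_numpy(dtype=float)
    rhs = reference.loc[overlap, cols].to_numpy(dtype=float)
    rel_bps = (np.abs(lhs - rhs) / np.maximum(np.abs(rhs), eps)).ravel() * 10_000.0
    return {
        "overlap_ratio": float(len(overlap) / len(primary)),
        "median_ohlc_diff_bps": float(np.nanmedian(rel_bps)),
        "p95_ohlc_diff_bps": float(np.nanpercentile(rel_bps, 95)),
    }

idx = pd.date_range("2026-01-01", periods=4, freq="h")
bars = pd.DataFrame(
    {c: [100.0, 101.0, 102.0, 103.0] for c in ["Open", "High", "Low", "Close"]},
    index=idx,
)
# Reference uniformly 0.05% higher -> ~5 bps drift on every OHLC value.
metrics = ohlc_diff_bps(bars, bars * 1.0005)
```

Against the default gate this sample would pass: full overlap, and median/p95 drift just under the 5.0 bps median threshold.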

The Proof: Adds targeted tests in test_backtest_runner.py and test_config.py covering passing, failing (drift/overlap), indeterminate (fetch failure/non-finite metrics), exchange routing, defaults, and cache reuse, supporting the existing >80% coverage gate.

Telemetry Added: Audit diagnostics are emitted into persisted run metadata via post_run_meta.data_integrity_audit (status, thresholds, overlap stats, drift metrics, failed checks) and surfaced through validation gate reporting (result_consistency.data_integrity_audit).
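
For reviewers, a passing audit's `post_run_meta.data_integrity_audit` entry would look roughly like this (keys follow the `meta` dict built in the runner diff; values are illustrative):

```json
{
  "is_complete": true,
  "status": "complete",
  "source": "binance",
  "reference_source": "bybit",
  "primary_bars": 8760,
  "reference_bars": 8755,
  "overlap_bars": 8741,
  "overlap_ratio": 0.9978,
  "missing_primary_bar_pct": 0.22,
  "min_overlap_ratio": 0.99,
  "max_median_ohlc_diff_bps": 5.0,
  "max_p95_ohlc_diff_bps": 20.0,
  "median_ohlc_diff_bps": 1.4,
  "p95_ohlc_diff_bps": 6.2,
  "max_ohlc_diff_bps": 14.8,
  "failed_checks": []
}
```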

Reviewed by Cursor Bugbot for commit e9e1075.


gemini-code-assist bot left a comment

Code Review

This pull request introduces a data_integrity_audit module to the backtesting validation pipeline. This feature allows for comparing primary data against a secondary reference_source to detect data issues like bad prints or ghost bars by evaluating timestamp overlap and OHLC drift. The implementation includes configuration schema updates, integration into the BacktestRunner strategy validation flow, and new unit tests. Review feedback identifies a logic error where the exchange field is incorrectly preserved when creating the reference collection, potentially causing data to be fetched from the wrong source. Additionally, a performance optimization is suggested to cache audit results per job to prevent redundant data fetching and processing during large parameter sweeps.

Comment threads: src/backtest/runner.py ×3 (one outdated)

cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Reference collection inherits wrong exchange for CCXT sources
    • The reference collection now clears exchange so CCXT references derive their exchange from reference_source (e.g., bybit) instead of inheriting the primary exchange.
Preview (cdddccf042)
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -254,6 +254,14 @@
   - collection-level overrides are supported via `collections[].validation.optimization`
     and are resolved against global `validation.optimization` during config loading.
 - `validation.result_consistency` controls strategy-result concentration checks:
+  - `data_integrity_audit` (optional thresholds module; gate is active when `collections[].reference_source` is set):
+    - purpose: compare canonicalized bars from the primary `source` and a secondary `reference_source`
+      to catch bad prints / ghost bars before accepting strategy results
+    - `min_overlap_ratio` (optional, default `0.99`, `0..1`): minimum timestamp overlap required between sources
+    - `max_median_ohlc_diff_bps` (optional, default `5.0`, `>=0`): maximum allowed median OHLC drift (bps)
+    - `max_p95_ohlc_diff_bps` (optional, default `20.0`, `>=0`): maximum allowed p95 OHLC drift (bps)
+    - action: fixed to `reject_result` when overlap/drift thresholds are breached (or comparison is indeterminate)
+    - diagnostics are attached under `post_run_meta.data_integrity_audit`
   - `outlier_dependency` (optional module; active when configured):
     - `slices` (required, `>=2`): number of equal time-slices used for diagnostics
     - `profit_share_threshold` (required, `0..1`)
@@ -297,7 +305,7 @@
 - `data_validation_gate` can emit `skip_optimization` (job-level optimization disable).
 - `strategy_optimization_gate` can emit `baseline_only` (strategy-level baseline fallback) or `skip_job`.
 - `strategy_validation_gate` can emit `reject_result` for outlier dependency,
-  execution price variance, and lookahead shuffle testing.
+  execution price variance, lookahead shuffle testing, data integrity audit, and transaction-cost robustness.
 
 Numeric config parsing follows `src/config.py` coercion helpers:
 - numeric fields are strict types: use YAML numbers, not quoted numeric strings

diff --git a/config/example.yaml b/config/example.yaml
--- a/config/example.yaml
+++ b/config/example.yaml
@@ -54,6 +54,10 @@
   result_consistency:
     min_metric: 0.5  # fail fast: require at least this metric before expensive checks
     min_trades: 20  # fail fast: require at least this many closed trades
+    data_integrity_audit:
+      min_overlap_ratio: 0.99  # min shared timestamps between source and reference_source
+      max_median_ohlc_diff_bps: 5.0  # median OHLC drift tolerance (bps)
+      max_p95_ohlc_diff_bps: 20.0  # tail OHLC drift tolerance (bps)
     outlier_dependency:
       slices: 5  # split trade history into N equal time-slices for diagnostics
       profit_share_threshold: 0.80
@@ -80,6 +84,7 @@
   # Stocks (large-cap growth)
   - name: stocks_large_cap_growth
     source: yfinance
+    reference_source: twelvedata  # optional golden source for post-run data-integrity audit
     symbols: ["CNDX.L", "AAPL", "MSFT", "NVDA"]
     fees: 0.0005  # approx IBKR
     slippage: 0.0005
@@ -101,6 +106,7 @@
   # Crypto (Binance via ccxt)
   - name: crypto
     source: binance
+    reference_source: bybit  # optional golden source; activates data_integrity_audit defaults if unset
     exchange: binance
     quote: USDT
     symbols: ["BTC/USDT", "ETH/USDT", "BNB/USDT", "SOL/USDT"]

diff --git a/src/backtest/runner.py b/src/backtest/runner.py
--- a/src/backtest/runner.py
+++ b/src/backtest/runner.py
@@ -20,6 +20,7 @@
     CollectionConfig,
     Config,
     ResultConsistencyConfig,
+    ResultConsistencyDataIntegrityAuditConfig,
     ResultConsistencyExecutionPriceVarianceConfig,
     ResultConsistencyTransactionCostBreakevenConfig,
     ResultConsistencyTransactionCostRobustnessConfig,
@@ -235,6 +236,7 @@
         "result_consistency.outlier_dependency",
         "result_consistency.execution_price_variance",
         "result_consistency.lookahead_shuffle_test",
+        "result_consistency.data_integrity_audit",
         "result_consistency.transaction_cost_robustness",
     )
 
@@ -452,6 +454,20 @@
         }
 
     @staticmethod
+    def _serialize_data_integrity_audit_profile(
+        data_integrity_audit: Any,
+    ) -> dict[str, Any] | None:
+        if data_integrity_audit is None:
+            return None
+        return {
+            "min_overlap_ratio": getattr(data_integrity_audit, "min_overlap_ratio", None),
+            "max_median_ohlc_diff_bps": getattr(
+                data_integrity_audit, "max_median_ohlc_diff_bps", None
+            ),
+            "max_p95_ohlc_diff_bps": getattr(data_integrity_audit, "max_p95_ohlc_diff_bps", None),
+        }
+
+    @staticmethod
     def _serialize_transaction_cost_breakeven_profile(
         breakeven: Any,
     ) -> dict[str, Any] | None:
@@ -561,6 +577,9 @@
             "lookahead_shuffle_test": BacktestRunner._serialize_lookahead_shuffle_test_profile(
                 getattr(result_consistency, "lookahead_shuffle_test", None)
             ),
+            "data_integrity_audit": BacktestRunner._serialize_data_integrity_audit_profile(
+                getattr(result_consistency, "data_integrity_audit", None)
+            ),
             "transaction_cost_robustness": BacktestRunner._serialize_transaction_cost_robustness_profile(
                 getattr(result_consistency, "transaction_cost_robustness", None)
             ),
@@ -613,6 +632,8 @@
             active.add("result_consistency.execution_price_variance")
         if getattr(result_consistency, "lookahead_shuffle_test", None) is not None:
             active.add("result_consistency.lookahead_shuffle_test")
+        if getattr(result_consistency, "data_integrity_audit", None) is not None:
+            active.add("result_consistency.data_integrity_audit")
         if getattr(result_consistency, "transaction_cost_robustness", None) is not None:
             active.add("result_consistency.transaction_cost_robustness")
         return active
@@ -1726,6 +1747,19 @@
             return getattr(resolved_rc, "lookahead_shuffle_test", None)
         return None
 
+    def _load_data_integrity_audit_policy(
+        self, collection: CollectionConfig
+    ) -> ResultConsistencyDataIntegrityAuditConfig | None:
+        if not collection.reference_source:
+            return None
+        collection_validation = getattr(collection, "validation", None)
+        resolved_rc: ResultConsistencyConfig | None = (
+            getattr(collection_validation, "result_consistency", None) if collection_validation else None
+        )
+        if resolved_rc is None:
+            return None
+        return getattr(resolved_rc, "data_integrity_audit", None)
+
     def _load_transaction_cost_robustness_policy(
         self, collection: CollectionConfig
     ) -> ResultConsistencyTransactionCostRobustnessConfig | None:
@@ -3910,6 +3944,9 @@
         self._run_lookahead_shuffle_validation(context, plan, outcome, reasons)
         if reasons:
             return self._strategy_validation_reject_or_continue(reasons)
+        self._run_data_integrity_audit_validation(context, outcome, reasons)
+        if reasons:
+            return self._strategy_validation_reject_or_continue(reasons)
         self._run_transaction_cost_robustness_validation(context, plan, outcome, reasons)
         return self._strategy_validation_reject_or_continue(reasons)
 
@@ -3945,6 +3982,198 @@
         if lookahead_reason is not None:
             reasons.append(lookahead_reason)
 
+    def _run_data_integrity_audit_validation(
+        self,
+        context: ValidationContext,
+        outcome: StrategyEvalOutcome,
+        reasons: list[str],
+    ) -> None:
+        policy = self._load_data_integrity_audit_policy(context.job.collection)
+        if policy is None:
+            return
+        audit_reason, audit_meta = self._data_integrity_audit_result(context, policy)
+        self._attach_post_run_meta(outcome, "data_integrity_audit", audit_meta)
+        if audit_reason is not None:
+            reasons.append(audit_reason)
+
+    @staticmethod
+    def _data_integrity_audit_indeterminate(
+        reason: str,
+        *,
+        collection: CollectionConfig,
+        policy: ResultConsistencyDataIntegrityAuditConfig,
+        details: dict[str, Any] | None = None,
+    ) -> tuple[str, dict[str, Any]]:
+        meta: dict[str, Any] = {
+            "is_complete": False,
+            "status": "indeterminate",
+            "reason": reason,
+            "source": collection.source,
+            "reference_source": collection.reference_source,
+            "min_overlap_ratio": policy.min_overlap_ratio,
+            "max_median_ohlc_diff_bps": policy.max_median_ohlc_diff_bps,
+            "max_p95_ohlc_diff_bps": policy.max_p95_ohlc_diff_bps,
+        }
+        if details:
+            meta.update(details)
+        return f"data_integrity_audit_indeterminate(reason={reason})", meta
+
+    @staticmethod
+    def _data_integrity_audit_reference_collection(collection: CollectionConfig) -> CollectionConfig | None:
+        if not collection.reference_source:
+            return None
+        return CollectionConfig(
+            name=collection.name,
+            source=collection.reference_source,
+            symbols=list(collection.symbols),
+            reference_source=None,
+            exchange=None,
+            currency=collection.currency,
+            quote=collection.quote,
+            fees=collection.fees,
+            slippage=collection.slippage,
+            validation=collection.validation,
+        )
+
+    @staticmethod
+    def _data_integrity_ohlc_diff_metrics(
+        primary: pd.DataFrame,
+        reference: pd.DataFrame,
+    ) -> dict[str, float]:
+        eps = 1e-12
+        columns = ["Open", "High", "Low", "Close"]
+        diffs: list[np.ndarray] = []
+        for column in columns:
+            lhs = primary[column].to_numpy(dtype=float)
+            rhs = reference[column].to_numpy(dtype=float)
+            rel = np.abs(lhs - rhs) / np.maximum(np.abs(rhs), eps)
+            diffs.append(rel * 10000.0)
+        all_diffs = np.concatenate(diffs) if diffs else np.array([], dtype=float)
+        if all_diffs.size == 0:
+            return {
+                "median_ohlc_diff_bps": float("nan"),
+                "p95_ohlc_diff_bps": float("nan"),
+                "max_ohlc_diff_bps": float("nan"),
+            }
+        return {
+            "median_ohlc_diff_bps": float(np.nanmedian(all_diffs)),
+            "p95_ohlc_diff_bps": float(np.nanpercentile(all_diffs, 95)),
+            "max_ohlc_diff_bps": float(np.nanmax(all_diffs)),
+        }
+
+    def _data_integrity_audit_result(
+        self,
+        context: ValidationContext,
+        policy: ResultConsistencyDataIntegrityAuditConfig,
+    ) -> tuple[str | None, dict[str, Any]]:
+        validated_data = context.validated_data
+        if validated_data is None:
+            return self._data_integrity_audit_indeterminate(
+                "missing_validated_data",
+                collection=context.job.collection,
+                policy=policy,
+            )
+        reference_collection = self._data_integrity_audit_reference_collection(context.job.collection)
+        if reference_collection is None:
+            return self._data_integrity_audit_indeterminate(
+                "missing_reference_source",
+                collection=context.job.collection,
+                policy=policy,
+            )
+        _, _, _, _, _, _, _, _, _, _, calendar_timezone = self._load_data_quality_policy(
+            context.job.collection
+        )
+        try:
+            reference_source = self._make_source(reference_collection)
+            reference_raw_df = reference_source.fetch(context.job.symbol, context.job.timeframe, only_cached=False)
+            reference_df, reference_canonicalization = self._canonicalize_validation_frame(
+                reference_raw_df,
+                calendar_timezone=calendar_timezone,
+            )
+        except Exception as exc:
+            return self._data_integrity_audit_indeterminate(
+                "reference_fetch_failed",
+                collection=context.job.collection,
+                policy=policy,
+                details={"error": str(exc)},
+            )
+        primary_df = validated_data.raw_df
+        if primary_df.empty or reference_df.empty:
+            return self._data_integrity_audit_indeterminate(
+                "empty_frame",
+                collection=context.job.collection,
+                policy=policy,
+                details={
+                    "primary_bars": int(len(primary_df)),
+                    "reference_bars": int(len(reference_df)),
+                },
+            )
+        required_columns = ["Open", "High", "Low", "Close"]
+        missing_columns = [
+            name
+            for name in required_columns
+            if name not in primary_df.columns or name not in reference_df.columns
+        ]
+        if missing_columns:
+            return self._data_integrity_audit_indeterminate(
+                "missing_ohlc_columns",
+                collection=context.job.collection,
+                policy=policy,
+                details={"missing_columns": missing_columns},
+            )
+        overlap_index = primary_df.index.intersection(reference_df.index)
+        primary_bars = int(len(primary_df))
+        reference_bars = int(len(reference_df))
+        overlap_bars = int(len(overlap_index))
+        overlap_ratio = float(overlap_bars / primary_bars) if primary_bars > 0 else 0.0
+        missing_primary_bar_pct = float((1.0 - overlap_ratio) * 100.0)
+        overlap_primary = primary_df.loc[overlap_index, required_columns]
+        overlap_reference = reference_df.loc[overlap_index, required_columns]
+        divergence = self._data_integrity_ohlc_diff_metrics(overlap_primary, overlap_reference)
+        max_median = float(policy.max_median_ohlc_diff_bps or 0.0)
+        max_p95 = float(policy.max_p95_ohlc_diff_bps or 0.0)
+        min_overlap = float(policy.min_overlap_ratio or 0.0)
+        failed_checks: list[str] = []
+        if overlap_ratio < min_overlap:
+            failed_checks.append(
+                "overlap_ratio_below_threshold("
+                f"required={min_overlap}, available={overlap_ratio}, overlap_bars={overlap_bars}, "
+                f"primary_bars={primary_bars})"
+            )
+        median_diff = divergence["median_ohlc_diff_bps"]
+        if np.isfinite(median_diff) and median_diff > max_median:
+            failed_checks.append(
+                "median_ohlc_diff_bps_exceeded("
+                f"max_allowed={max_median}, available={median_diff})"
+            )
+        p95_diff = divergence["p95_ohlc_diff_bps"]
+        if np.isfinite(p95_diff) and p95_diff > max_p95:
+            failed_checks.append(
+                "p95_ohlc_diff_bps_exceeded("
+                f"max_allowed={max_p95}, available={p95_diff})"
+            )
+        meta: dict[str, Any] = {
+            "is_complete": True,
+            "status": "complete",
+            "source": context.job.collection.source,
+            "reference_source": context.job.collection.reference_source,
+            "primary_bars": primary_bars,
+            "reference_bars": reference_bars,
+            "overlap_bars": overlap_bars,
+            "overlap_ratio": overlap_ratio,
+            "missing_primary_bar_pct": missing_primary_bar_pct,
+            "min_overlap_ratio": min_overlap,
+            "max_median_ohlc_diff_bps": max_median,
+            "max_p95_ohlc_diff_bps": max_p95,
+            "reference_canonicalization": reference_canonicalization,
+            **divergence,
+            "failed_checks": list(failed_checks),
+        }
+        if failed_checks:
+            reason = "data_integrity_audit_failed(" + "; ".join(failed_checks) + ")"
+            return reason, meta
+        return None, meta
+
     def _run_transaction_cost_robustness_validation(
         self,
         context: ValidationContext,

diff --git a/src/config.py b/src/config.py
--- a/src/config.py
+++ b/src/config.py
@@ -22,6 +22,7 @@
     name: str
     source: str  # yfinance, ccxt, custom
     symbols: list[str]
+    reference_source: str | None = None
     exchange: str | None = None  # for ccxt
     currency: str | None = None
     quote: str | None = None  # for ccxt symbols e.g., USDT
@@ -152,6 +153,13 @@
 
 
 @dataclass
+class ResultConsistencyDataIntegrityAuditConfig:
+    min_overlap_ratio: float | None = None
+    max_median_ohlc_diff_bps: float | None = None
+    max_p95_ohlc_diff_bps: float | None = None
+
+
+@dataclass
 class ResultConsistencyConfig:
     min_metric: float | None = None
     min_trades: int | None = None
@@ -159,6 +167,7 @@
     execution_price_variance: ResultConsistencyExecutionPriceVarianceConfig | None = None
     lookahead_shuffle_test: ValidationLookaheadShuffleTestConfig | None = None
     transaction_cost_robustness: ResultConsistencyTransactionCostRobustnessConfig | None = None
+    data_integrity_audit: ResultConsistencyDataIntegrityAuditConfig | None = None
 
 
 @dataclass
@@ -195,6 +204,10 @@
 LOOKAHEAD_SHUFFLE_TEST_SEED_MIN = 0
 LOOKAHEAD_SHUFFLE_TEST_FAILED_PERMUTATIONS_MIN = 0
 LOOKAHEAD_SHUFFLE_TEST_CONFIG_PREFIX = "validation.result_consistency.lookahead_shuffle_test"
+DATA_INTEGRITY_AUDIT_CONFIG_PREFIX = "validation.result_consistency.data_integrity_audit"
+DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT = 0.99
+DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT = 5.0
+DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT = 20.0
 TRANSACTION_COST_ROBUSTNESS_MODE_ANALYTICS = "analytics"
 TRANSACTION_COST_ROBUSTNESS_MODE_ENFORCE = "enforce"
 TRANSACTION_COST_ROBUSTNESS_MODES = {
@@ -591,6 +604,99 @@
     )
 
 
+def _normalize_result_consistency_data_integrity_audit_config(
+    cfg: ResultConsistencyDataIntegrityAuditConfig | None,
+    prefix: str,
+) -> ResultConsistencyDataIntegrityAuditConfig | None:
+    if cfg is None:
+        return None
+    min_overlap_ratio_raw = getattr(cfg, "min_overlap_ratio", None)
+    min_overlap_ratio = (
+        _coerce_float(min_overlap_ratio_raw, f"{prefix}.min_overlap_ratio")
+        if min_overlap_ratio_raw is not None
+        else None
+    )
+    if min_overlap_ratio is not None and not (
+        VALIDATION_PROBABILITY_MIN <= min_overlap_ratio <= VALIDATION_PROBABILITY_MAX
+    ):
+        raise ValueError(
+            f"`{prefix}.min_overlap_ratio` must be between {VALIDATION_PROBABILITY_MIN} and "
+            f"{VALIDATION_PROBABILITY_MAX}"
+        )
+    max_median_ohlc_diff_bps_raw = getattr(cfg, "max_median_ohlc_diff_bps", None)
+    max_median_ohlc_diff_bps = (
+        _coerce_float(max_median_ohlc_diff_bps_raw, f"{prefix}.max_median_ohlc_diff_bps")
+        if max_median_ohlc_diff_bps_raw is not None
+        else None
+    )
+    if (
+        max_median_ohlc_diff_bps is not None
+        and max_median_ohlc_diff_bps < VALIDATION_NON_NEGATIVE_FLOAT_MIN
+    ):
+        raise ValueError(
+            f"`{prefix}.max_median_ohlc_diff_bps` must be >= {VALIDATION_NON_NEGATIVE_FLOAT_MIN}"
+        )
+    max_p95_ohlc_diff_bps_raw = getattr(cfg, "max_p95_ohlc_diff_bps", None)
+    max_p95_ohlc_diff_bps = (
+        _coerce_float(max_p95_ohlc_diff_bps_raw, f"{prefix}.max_p95_ohlc_diff_bps")
+        if max_p95_ohlc_diff_bps_raw is not None
+        else None
+    )
+    if max_p95_ohlc_diff_bps is not None and max_p95_ohlc_diff_bps < VALIDATION_NON_NEGATIVE_FLOAT_MIN:
+        raise ValueError(
+            f"`{prefix}.max_p95_ohlc_diff_bps` must be >= {VALIDATION_NON_NEGATIVE_FLOAT_MIN}"
+        )
+    if (
+        max_median_ohlc_diff_bps is not None
+        and max_p95_ohlc_diff_bps is not None
+        and max_p95_ohlc_diff_bps < max_median_ohlc_diff_bps
+    ):
+        raise ValueError(
+            f"`{prefix}.max_p95_ohlc_diff_bps` must be >= `{prefix}.max_median_ohlc_diff_bps`"
+        )
+    return ResultConsistencyDataIntegrityAuditConfig(
+        min_overlap_ratio=min_overlap_ratio,
+        max_median_ohlc_diff_bps=max_median_ohlc_diff_bps,
+        max_p95_ohlc_diff_bps=max_p95_ohlc_diff_bps,
+    )
+
+
+def _apply_result_consistency_data_integrity_audit_defaults(
+    cfg: ResultConsistencyDataIntegrityAuditConfig,
+) -> ResultConsistencyDataIntegrityAuditConfig:
+    min_overlap_ratio = (
+        cfg.min_overlap_ratio
+        if cfg.min_overlap_ratio is not None
+        else DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT
+    )
+    max_median_ohlc_diff_bps = (
+        cfg.max_median_ohlc_diff_bps
+        if cfg.max_median_ohlc_diff_bps is not None
+        else DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT
+    )
+    max_p95_ohlc_diff_bps = (
+        cfg.max_p95_ohlc_diff_bps
+        if cfg.max_p95_ohlc_diff_bps is not None
+        else DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT
+    )
+    if max_p95_ohlc_diff_bps < max_median_ohlc_diff_bps:
+        raise ValueError(
+            f"`{DATA_INTEGRITY_AUDIT_CONFIG_PREFIX}.max_p95_ohlc_diff_bps` must be >= "
+            f"`{DATA_INTEGRITY_AUDIT_CONFIG_PREFIX}.max_median_ohlc_diff_bps`"
+        )
+    return ResultConsistencyDataIntegrityAuditConfig(
+        min_overlap_ratio=min_overlap_ratio,
+        max_median_ohlc_diff_bps=max_median_ohlc_diff_bps,
+        max_p95_ohlc_diff_bps=max_p95_ohlc_diff_bps,
+    )
+
+
+def _default_data_integrity_audit_config() -> ResultConsistencyDataIntegrityAuditConfig:
+    return _apply_result_consistency_data_integrity_audit_defaults(
+        ResultConsistencyDataIntegrityAuditConfig()
+    )
+
+
 def _normalize_transaction_cost_breakeven_config(
     cfg: ResultConsistencyTransactionCostBreakevenConfig | None,
     prefix: str,
@@ -918,6 +1024,10 @@
         getattr(cfg, "lookahead_shuffle_test", None),
         f"{prefix}.lookahead_shuffle_test",
     )
+    data_integrity_audit = _normalize_result_consistency_data_integrity_audit_config(
+        getattr(cfg, "data_integrity_audit", None),
+        f"{prefix}.data_integrity_audit",
+    )
     transaction_cost_robustness = _normalize_transaction_cost_robustness_config(
         getattr(cfg, "transaction_cost_robustness", None),
         f"{prefix}.transaction_cost_robustness",
@@ -926,12 +1036,13 @@
         outlier_dependency is None
         and execution_price_variance is None
         and lookahead_shuffle_test is None
+        and data_integrity_audit is None
         and transaction_cost_robustness is None
     ):
         raise ValueError(
             f"Invalid `{prefix}`: expected at least one configured module "
             "(`outlier_dependency`, `execution_price_variance`, `lookahead_shuffle_test`, "
-            "or `transaction_cost_robustness`)"
+            "`data_integrity_audit`, or `transaction_cost_robustness`)"
         )
     min_metric_raw = getattr(cfg, "min_metric", None)
     min_metric = _coerce_float(min_metric_raw, f"{prefix}.min_metric") if min_metric_raw is not None else None
@@ -945,6 +1056,7 @@
         outlier_dependency=outlier_dependency,
         execution_price_variance=execution_price_variance,
         lookahead_shuffle_test=lookahead_shuffle_test,
+        data_integrity_audit=data_integrity_audit,
         transaction_cost_robustness=transaction_cost_robustness,
     )
 
@@ -971,6 +1083,11 @@
             if cfg.lookahead_shuffle_test is not None
             else None
         ),
+        data_integrity_audit=(
+            _apply_result_consistency_data_integrity_audit_defaults(cfg.data_integrity_audit)
+            if cfg.data_integrity_audit is not None
+            else None
+        ),
         transaction_cost_robustness=(
             _apply_transaction_cost_robustness_defaults(cfg.transaction_cost_robustness)
             if cfg.transaction_cost_robustness is not None
@@ -1060,6 +1177,10 @@
             getattr(base, "lookahead_shuffle_test", None),
             getattr(override, "lookahead_shuffle_test", None),
         ),
+        data_integrity_audit=_merge_result_consistency_data_integrity_audit_config(
+            getattr(base, "data_integrity_audit", None),
+            getattr(override, "data_integrity_audit", None),
+        ),
         transaction_cost_robustness=_merge_transaction_cost_robustness_config(
             getattr(base, "transaction_cost_robustness", None),
             getattr(override, "transaction_cost_robustness", None),
@@ -1069,6 +1190,7 @@
         merged.outlier_dependency is None
         and merged.execution_price_variance is None
         and merged.lookahead_shuffle_test is None
+        and merged.data_integrity_audit is None
         and merged.transaction_cost_robustness is None
     ):
         return None
@@ -1102,6 +1224,19 @@
     )
 
 
+def _merge_result_consistency_data_integrity_audit_config(
+    base: ResultConsistencyDataIntegrityAuditConfig | None,
+    override: ResultConsistencyDataIntegrityAuditConfig | None,
+) -> ResultConsistencyDataIntegrityAuditConfig | None:
+    if base is None and override is None:
+        return None
+    return ResultConsistencyDataIntegrityAuditConfig(
+        min_overlap_ratio=_merged_field(base, override, "min_overlap_ratio"),
+        max_median_ohlc_diff_bps=_merged_field(base, override, "max_median_ohlc_diff_bps"),
+        max_p95_ohlc_diff_bps=_merged_field(base, override, "max_p95_ohlc_diff_bps"),
+    )
+
+
 def _merge_transaction_cost_breakeven_config(
     base: ResultConsistencyTransactionCostBreakevenConfig | None,
     override: ResultConsistencyTransactionCostBreakevenConfig | None,
@@ -1499,6 +1634,14 @@
         global_result_consistency,
         collection_validation.result_consistency if collection_validation else None,
     )
+    # Special case: data-integrity audit activation is collection-scoped because
+    # `reference_source` exists only on CollectionConfig. Global validation can
+    # still define/override thresholds, but enabling the audit requires a
+    # collection-level reference source.
+    resolved_result_consistency = _ensure_reference_source_data_integrity_policy(
+        collection,
+        resolved_result_consistency,
+    )
     if (
         resolved_data_quality is None
         and resolved_optimization is None
@@ -1512,6 +1655,39 @@
     )
 
 
+def _ensure_reference_source_data_integrity_policy(
+    collection: CollectionConfig,
+    resolved_result_consistency: ResultConsistencyConfig | None,
+) -> ResultConsistencyConfig | None:
+    """Inject default data-integrity audit only when collection has a reference source.
+
+    Thresholds/rules may come from global validation and collection overrides,
+    but the audit itself is only meaningful when a collection-level
+    `reference_source` exists.
+    """
+    if not collection.reference_source:
+        return resolved_result_consistency
+
+    base_policy = (
+        resolved_result_consistency
+        if resolved_result_consistency is not None
+        else ResultConsistencyConfig()
+    )
+    if getattr(base_policy, "data_integrity_audit", None) is not None:
+        return resolved_result_consistency
+
+    with_default_audit = ResultConsistencyConfig(
+        min_metric=base_policy.min_metric,
+        min_trades=base_policy.min_trades,
+        outlier_dependency=base_policy.outlier_dependency,
+        execution_price_variance=base_policy.execution_price_variance,
+        lookahead_shuffle_test=base_policy.lookahead_shuffle_test,
+        transaction_cost_robustness=base_policy.transaction_cost_robustness,
+        data_integrity_audit=_default_data_integrity_audit_config(),
+    )
+    return _merge_result_consistency_config(with_default_audit, None)
+
+
 def resolve_validation_overrides(cfg: Config) -> None:
     """Resolve effective collection-level validation policies.
 
@@ -2104,6 +2280,17 @@
         if isinstance(lookahead_shuffle_test_raw, dict)
         else None
     )
+    data_integrity_audit_raw = parsed_raw.get("data_integrity_audit")
+    if data_integrity_audit_raw is not None and not isinstance(data_integrity_audit_raw, dict):
+        raise ValueError(f"Invalid `{prefix}.data_integrity_audit`: expected a mapping")
+    data_integrity_audit = (
+        _parse_result_consistency_data_integrity_audit(
+            data_integrity_audit_raw,
+            f"{prefix}.data_integrity_audit",
+        )
+        if isinstance(data_integrity_audit_raw, dict)
+        else None
+    )
     transaction_cost_robustness_raw = parsed_raw.get("transaction_cost_robustness")
     if (
         transaction_cost_robustness_raw is not None
@@ -2128,6 +2315,7 @@
             outlier_dependency=outlier_dependency,
             execution_price_variance=execution_price_variance,
             lookahead_shuffle_test=lookahead_shuffle_test,
+            data_integrity_audit=data_integrity_audit,
             transaction_cost_robustness=transaction_cost_robustness,
         ),
         prefix,
@@ -2184,6 +2372,36 @@
     )
 
 
+def _parse_result_consistency_data_integrity_audit(
+    raw: Any,
+    prefix: str,
+) -> ResultConsistencyDataIntegrityAuditConfig | None:
+    if raw is None:
+        return None
+    parsed_raw = require_mapping(raw, prefix)
+    return ResultConsistencyDataIntegrityAuditConfig(
+        min_overlap_ratio=parse_optional_float(
+            parsed_raw,
+            prefix,
+            "min_overlap_ratio",
+            min_value=VALIDATION_PROBABILITY_MIN,
+            max_value=VALIDATION_PROBABILITY_MAX,
+        ),
+        max_median_ohlc_diff_bps=parse_optional_float(
+            parsed_raw,
+            prefix,
+            "max_median_ohlc_diff_bps",
+            min_value=VALIDATION_NON_NEGATIVE_FLOAT_MIN,
+        ),
+        max_p95_ohlc_diff_bps=parse_optional_float(
+            parsed_raw,
+            prefix,
+            "max_p95_ohlc_diff_bps",
+            min_value=VALIDATION_NON_NEGATIVE_FLOAT_MIN,
+        ),
+    )
+
+
 def _parse_result_consistency_transaction_cost_breakeven(
     raw: Any, prefix: str
 ) -> ResultConsistencyTransactionCostBreakevenConfig | None:
@@ -2303,6 +2521,9 @@
                 name=str(collection_raw["name"]).strip(),
                 source=str(collection_raw["source"]).strip(),
                 symbols=[str(symbol).strip() for symbol in symbols_raw],
+                reference_source=parse_optional_str(
+                    collection_raw, "reference_source", normalize=False
+                ),
                 exchange=parse_optional_str(collection_raw, "exchange", normalize=False),
                 currency=parse_optional_str(collection_raw, "currency", normalize=False),
                 quote=parse_optional_str(collection_raw, "quote", normalize=False),

diff --git a/tests/test_backtest_runner.py b/tests/test_backtest_runner.py
--- a/tests/test_backtest_runner.py
+++ b/tests/test_backtest_runner.py
@@ -292,6 +292,7 @@
         "outlier_dependency": None,
         "execution_price_variance": None,
         "lookahead_shuffle_test": None,
+        "data_integrity_audit": None,
         "transaction_cost_robustness": None,
     }
     payload.update(overrides)
@@ -616,6 +617,11 @@
             seed=1337,
             max_failed_permutations=2,
         ),
+        data_integrity_audit=SimpleNamespace(
+            min_overlap_ratio=0.99,
+            max_median_ohlc_diff_bps=5.0,
+            max_p95_ohlc_diff_bps=20.0,
+        ),
         transaction_cost_robustness=SimpleNamespace(
             mode="analytics",
             stress_multipliers=[2.0, 5.0],
@@ -638,6 +644,7 @@
         "outlier_dependency",
         "execution_price_variance",
         "lookahead_shuffle_test",
+        "data_integrity_audit",
         "transaction_cost_robustness",
     ]
     assert payload["min_metric"] == pytest.approx(0.5)
@@ -654,6 +661,11 @@
         "seed": 1337,
         "max_failed_permutations": 2,
     }
+    assert payload["data_integrity_audit"] == {
+        "min_overlap_ratio": 0.99,
+        "max_median_ohlc_diff_bps": 5.0,
+        "max_p95_ohlc_diff_bps": 20.0,
+    }
     assert payload["transaction_cost_robustness"] == {
         "mode": "analytics",
         "stress_multipliers": [2.0, 5.0],
@@ -1851,6 +1863,28 @@
     monkeypatch.setattr(BacktestRunner, "_make_source", lambda self, col: _Source())
 
 
+def _patch_primary_and_reference_sources(
+    monkeypatch,
+    *,
+    primary_df: pd.DataFrame,
+    reference_df: pd.DataFrame,
+    reference_source: str = "alphavantage",
+) -> None:
+    class _Source:
+        def __init__(self, df: pd.DataFrame):
+            self._df = df
+
+        def fetch(self, symbol, timeframe, only_cached=False):
+            return self._df.copy()
+
+    def _make_source(self, col):
+        if col.source == reference_source:
+            return _Source(reference_df)
+        return _Source(primary_df)
+
+    monkeypatch.setattr(BacktestRunner, "_make_source", _make_source)
+
+
 def _lookahead_shuffle_test_config(
     *,
     permutations: int = 100,
@@ -3274,6 +3308,80 @@
     assert post_run_meta["lookahead_shuffle_test"]["is_complete"] is True
 
 
+def test_run_all_data_integrity_audit_passes_and_attaches_meta(tmp_path, monkeypatch):
+    runner = _make_runner(tmp_path, monkeypatch, patch_source=False)
+    runner.cfg.collections[0].reference_source = "alphavantage"
+    primary = _make_trending_ohlcv(30)
+    reference = primary.copy()
+    _patch_primary_and_reference_sources(
+        monkeypatch,
+        primary_df=primary,
+        reference_df=reference,
+        reference_source="alphavantage",
+    )
+    eval_calls = _patch_pybroker_simulation(monkeypatch)
+
+    results = runner.run_all()
+
+    assert len(results) == 1
+    assert eval_calls["count"] == 2
+    post_run_meta = results[0].stats.get("post_run_meta")
... diff truncated: showing 800 of 939 lines


Comment thread src/backtest/runner.py Outdated

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.


Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: `or 0.0` fallback silently disabled the overlap threshold when the configured value was `None`
    • Replaced the `or 0.0` coercions with explicit `None` handling that falls back to the data-integrity default constants, and added a regression test for the `None` case.
Preview (72522c9f75)
diff --git a/README.md b/README.md
--- a/README.md
+++ b/README.md
@@ -254,6 +254,14 @@
   - collection-level overrides are supported via `collections[].validation.optimization`
     and are resolved against global `validation.optimization` during config loading.
 - `validation.result_consistency` controls strategy-result concentration checks:
+  - `data_integrity_audit` (optional thresholds module; gate is active when `collections[].reference_source` is set):
+    - purpose: compare canonicalized bars from the primary `source` and a secondary `reference_source`
+      to catch bad prints / ghost bars before accepting strategy results
+    - `min_overlap_ratio` (optional, default `0.99`, `0..1`): minimum timestamp overlap required between sources
+    - `max_median_ohlc_diff_bps` (optional, default `5.0`, `>=0`): maximum allowed median OHLC drift (bps)
+    - `max_p95_ohlc_diff_bps` (optional, default `20.0`, `>=0`): maximum allowed p95 OHLC drift (bps)
+    - action: fixed to `reject_result` when overlap/drift thresholds are breached (or comparison is indeterminate)
+    - diagnostics are attached under `post_run_meta.data_integrity_audit`
   - `outlier_dependency` (optional module; active when configured):
     - `slices` (required, `>=2`): number of equal time-slices used for diagnostics
     - `profit_share_threshold` (required, `0..1`)
@@ -297,7 +305,7 @@
 - `data_validation_gate` can emit `skip_optimization` (job-level optimization disable).
 - `strategy_optimization_gate` can emit `baseline_only` (strategy-level baseline fallback) or `skip_job`.
 - `strategy_validation_gate` can emit `reject_result` for outlier dependency,
-  execution price variance, and lookahead shuffle testing.
+  execution price variance, lookahead shuffle testing, data integrity audit, and transaction-cost robustness.
 
 Numeric config parsing follows `src/config.py` coercion helpers:
 - numeric fields are strict types: use YAML numbers, not quoted numeric strings
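
The gate semantics documented above condense into a small decision sketch (a simplified illustration, not the runner's code; the defaults are the documented `0.99` / `5.0` / `20.0`, and `reject_result` is the fixed action):

```python
def audit_decision(overlap_ratio: float, median_bps: float, p95_bps: float,
                   *, min_overlap: float = 0.99,
                   max_median_bps: float = 5.0,
                   max_p95_bps: float = 20.0) -> tuple[str, list[str]]:
    # Any breached threshold rejects the strategy result.
    failed = []
    if overlap_ratio < min_overlap:
        failed.append("overlap_ratio_below_threshold")
    if median_bps > max_median_bps:
        failed.append("median_ohlc_diff_bps_exceeded")
    if p95_bps > max_p95_bps:
        failed.append("p95_ohlc_diff_bps_exceeded")
    return ("reject_result" if failed else "pass", failed)
```

For example, `audit_decision(0.995, 1.2, 8.0)` passes, while dropping the overlap ratio to `0.9` rejects the result.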

diff --git a/config/example.yaml b/config/example.yaml
--- a/config/example.yaml
+++ b/config/example.yaml
@@ -54,6 +54,10 @@
   result_consistency:
     min_metric: 0.5  # fail fast: require at least this metric before expensive checks
     min_trades: 20  # fail fast: require at least this many closed trades
+    data_integrity_audit:
+      min_overlap_ratio: 0.99  # min shared timestamps between source and reference_source
+      max_median_ohlc_diff_bps: 5.0  # median OHLC drift tolerance (bps)
+      max_p95_ohlc_diff_bps: 20.0  # tail OHLC drift tolerance (bps)
     outlier_dependency:
       slices: 5  # split trade history into N equal time-slices for diagnostics
       profit_share_threshold: 0.80
@@ -80,6 +84,7 @@
   # Stocks (large-cap growth)
   - name: stocks_large_cap_growth
     source: yfinance
+    reference_source: twelvedata  # optional golden source for post-run data-integrity audit
     symbols: ["CNDX.L", "AAPL", "MSFT", "NVDA"]
     fees: 0.0005  # approx IBKR
     slippage: 0.0005
@@ -101,6 +106,7 @@
   # Crypto (Binance via ccxt)
   - name: crypto
     source: binance
+    reference_source: bybit  # optional golden source; activates data_integrity_audit defaults if unset
     exchange: binance
     quote: USDT
     symbols: ["BTC/USDT", "ETH/USDT", "BNB/USDT", "SOL/USDT"]

diff --git a/src/backtest/runner.py b/src/backtest/runner.py
--- a/src/backtest/runner.py
+++ b/src/backtest/runner.py
@@ -19,7 +19,11 @@
 from ..config import (
     CollectionConfig,
     Config,
+    DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT,
+    DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT,
+    DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT,
     ResultConsistencyConfig,
+    ResultConsistencyDataIntegrityAuditConfig,
     ResultConsistencyExecutionPriceVarianceConfig,
     ResultConsistencyTransactionCostBreakevenConfig,
     ResultConsistencyTransactionCostRobustnessConfig,
@@ -235,6 +239,7 @@
         "result_consistency.outlier_dependency",
         "result_consistency.execution_price_variance",
         "result_consistency.lookahead_shuffle_test",
+        "result_consistency.data_integrity_audit",
         "result_consistency.transaction_cost_robustness",
     )
 
@@ -279,6 +284,10 @@
         self._runtime_signal_error_counts: dict[tuple[str, str, str, str], int] = {}
         self._runtime_signal_error_capped: set[tuple[str, str, str, str]] = set()
         self._strategy_fingerprint_cache: dict[type[BaseStrategy], str] = {}
+        self._data_integrity_audit_cache: dict[
+            tuple[str, str, str, str, str],
+            tuple[str | None, dict[str, Any]],
+        ] = {}
         self.validation_metadata: dict[str, Any] = {}
         self.active_validation_gates: list[str] = []
         self.inactive_validation_gates: list[str] = []
@@ -452,6 +461,20 @@
         }
 
     @staticmethod
+    def _serialize_data_integrity_audit_profile(
+        data_integrity_audit: Any,
+    ) -> dict[str, Any] | None:
+        if data_integrity_audit is None:
+            return None
+        return {
+            "min_overlap_ratio": getattr(data_integrity_audit, "min_overlap_ratio", None),
+            "max_median_ohlc_diff_bps": getattr(
+                data_integrity_audit, "max_median_ohlc_diff_bps", None
+            ),
+            "max_p95_ohlc_diff_bps": getattr(data_integrity_audit, "max_p95_ohlc_diff_bps", None),
+        }
+
+    @staticmethod
     def _serialize_transaction_cost_breakeven_profile(
         breakeven: Any,
     ) -> dict[str, Any] | None:
@@ -561,6 +584,9 @@
             "lookahead_shuffle_test": BacktestRunner._serialize_lookahead_shuffle_test_profile(
                 getattr(result_consistency, "lookahead_shuffle_test", None)
             ),
+            "data_integrity_audit": BacktestRunner._serialize_data_integrity_audit_profile(
+                getattr(result_consistency, "data_integrity_audit", None)
+            ),
             "transaction_cost_robustness": BacktestRunner._serialize_transaction_cost_robustness_profile(
                 getattr(result_consistency, "transaction_cost_robustness", None)
             ),
@@ -599,7 +625,11 @@
         return {"optimization.feasibility"}
 
     @staticmethod
-    def _active_result_consistency_gates(result_consistency: Any) -> set[str]:
+    def _active_result_consistency_gates(
+        result_consistency: Any,
+        *,
+        has_reference_source: bool = False,
+    ) -> set[str]:
         if result_consistency is None:
             return set()
         active: set[str] = set()
@@ -613,6 +643,9 @@
             active.add("result_consistency.execution_price_variance")
         if getattr(result_consistency, "lookahead_shuffle_test", None) is not None:
             active.add("result_consistency.lookahead_shuffle_test")
+        # Data integrity audit activation is collection-scoped via reference_source.
+        if has_reference_source:
+            active.add("result_consistency.data_integrity_audit")
         if getattr(result_consistency, "transaction_cost_robustness", None) is not None:
             active.add("result_consistency.transaction_cost_robustness")
         return active
@@ -629,7 +662,10 @@
             collection_active = self._active_data_quality_gates(collection_dq)
             collection_active.update(self._active_optimization_gates(collection_optimization))
             collection_active.update(
-                self._active_result_consistency_gates(collection_result_consistency)
+                self._active_result_consistency_gates(
+                    collection_result_consistency,
+                    has_reference_source=bool(getattr(collection, "reference_source", None)),
+                )
             )
             active_gates_union.update(collection_active)
             collection_profiles.append(
@@ -1726,6 +1762,19 @@
             return getattr(resolved_rc, "lookahead_shuffle_test", None)
         return None
 
+    def _load_data_integrity_audit_policy(
+        self, collection: CollectionConfig
+    ) -> ResultConsistencyDataIntegrityAuditConfig | None:
+        if not collection.reference_source:
+            return None
+        collection_validation = getattr(collection, "validation", None)
+        resolved_rc: ResultConsistencyConfig | None = (
+            getattr(collection_validation, "result_consistency", None) if collection_validation else None
+        )
+        if resolved_rc is None:
+            return None
+        return getattr(resolved_rc, "data_integrity_audit", None)
+
     def _load_transaction_cost_robustness_policy(
         self, collection: CollectionConfig
     ) -> ResultConsistencyTransactionCostRobustnessConfig | None:
@@ -3910,6 +3959,9 @@
         self._run_lookahead_shuffle_validation(context, plan, outcome, reasons)
         if reasons:
             return self._strategy_validation_reject_or_continue(reasons)
+        self._run_data_integrity_audit_validation(context, outcome, reasons)
+        if reasons:
+            return self._strategy_validation_reject_or_continue(reasons)
         self._run_transaction_cost_robustness_validation(context, plan, outcome, reasons)
         return self._strategy_validation_reject_or_continue(reasons)
 
@@ -3945,6 +3997,293 @@
         if lookahead_reason is not None:
             reasons.append(lookahead_reason)
 
+    def _run_data_integrity_audit_validation(
+        self,
+        context: ValidationContext,
+        outcome: StrategyEvalOutcome,
+        reasons: list[str],
+    ) -> None:
+        policy = self._load_data_integrity_audit_policy(context.job.collection)
+        if policy is None:
+            return
+        cache_key = self._data_integrity_audit_cache_key(context, policy)
+        cached = self._data_integrity_audit_cache.get(cache_key)
+        if cached is None:
+            audit_reason, audit_meta = self._data_integrity_audit_result(context, policy)
+            self._data_integrity_audit_cache[cache_key] = (audit_reason, copy.deepcopy(audit_meta))
+        else:
+            audit_reason, cached_meta = cached
+            audit_meta = copy.deepcopy(cached_meta)
+        self._attach_post_run_meta(outcome, "data_integrity_audit", audit_meta)
+        if audit_reason is not None:
+            reasons.append(audit_reason)
+
+    @staticmethod
+    def _data_integrity_audit_cache_key(
+        context: ValidationContext,
+        _policy: ResultConsistencyDataIntegrityAuditConfig,
+    ) -> tuple[str, str, str, str, str]:
+        return (
+            context.job.collection.name,
+            context.job.symbol,
+            context.job.timeframe,
+            str(context.job.collection.source),
+            str(context.job.collection.reference_source),
+        )
+
+    @staticmethod
+    def _data_integrity_audit_indeterminate(
+        reason: str,
+        *,
+        collection: CollectionConfig,
+        policy: ResultConsistencyDataIntegrityAuditConfig,
+        details: dict[str, Any] | None = None,
+    ) -> tuple[str, dict[str, Any]]:
+        meta: dict[str, Any] = {
+            "is_complete": False,
+            "status": "indeterminate",
+            "reason": reason,
+            "source": collection.source,
+            "reference_source": collection.reference_source,
+            "min_overlap_ratio": policy.min_overlap_ratio,
+            "max_median_ohlc_diff_bps": policy.max_median_ohlc_diff_bps,
+            "max_p95_ohlc_diff_bps": policy.max_p95_ohlc_diff_bps,
+        }
+        if details:
+            meta.update(details)
+        return f"data_integrity_audit_indeterminate(reason={reason})", meta
+
+    @staticmethod
+    def _data_integrity_audit_reference_collection(collection: CollectionConfig) -> CollectionConfig | None:
+        if not collection.reference_source:
+            return None
+        return CollectionConfig(
+            name=collection.name,
+            source=collection.reference_source,
+            symbols=list(collection.symbols),
+            reference_source=None,
+            exchange=None,
+            currency=collection.currency,
+            quote=collection.quote,
+            fees=collection.fees,
+            slippage=collection.slippage,
+            validation=collection.validation,
+        )
+
+    @staticmethod
+    def _data_integrity_ohlc_diff_metrics(
+        primary: pd.DataFrame,
+        reference: pd.DataFrame,
+    ) -> dict[str, float]:
+        eps = 1e-12
+        columns = ["Open", "High", "Low", "Close"]
+        diffs: list[np.ndarray] = []
+        for column in columns:
+            lhs = primary[column].to_numpy(dtype=float)
+            rhs = reference[column].to_numpy(dtype=float)
+            rel = np.abs(lhs - rhs) / np.maximum(np.abs(rhs), eps)
+            diffs.append(rel * 10000.0)
+        all_diffs = np.concatenate(diffs) if diffs else np.array([], dtype=float)
+        if all_diffs.size == 0:
+            return {
+                "median_ohlc_diff_bps": float("nan"),
+                "p95_ohlc_diff_bps": float("nan"),
+                "max_ohlc_diff_bps": float("nan"),
+            }
+        return {
+            "median_ohlc_diff_bps": float(np.nanmedian(all_diffs)),
+            "p95_ohlc_diff_bps": float(np.nanpercentile(all_diffs, 95)),
+            "max_ohlc_diff_bps": float(np.nanmax(all_diffs)),
+        }
+
+    def _data_integrity_audit_result(
+        self,
+        context: ValidationContext,
+        policy: ResultConsistencyDataIntegrityAuditConfig,
+    ) -> tuple[str | None, dict[str, Any]]:
+        validated_data = context.validated_data
+        if validated_data is None:
+            return self._data_integrity_audit_indeterminate(
+                "missing_validated_data",
+                collection=context.job.collection,
+                policy=policy,
+            )
+        reference_outcome = self._load_reference_frame_for_data_integrity(context, policy)
+        if not isinstance(reference_outcome[0], pd.DataFrame):
+            return reference_outcome
+        reference_df, reference_canonicalization = reference_outcome
+        primary_df = validated_data.raw_df
+        invalid_input = self._validate_data_integrity_inputs(primary_df, reference_df, context, policy)
+        if invalid_input is not None:
+            return invalid_input
+        overlap_details = self._data_integrity_overlap_details(primary_df, reference_df)
+        divergence = overlap_details["divergence"]
+        threshold_details = self._data_integrity_threshold_details(policy)
+        failed_checks = self._data_integrity_failed_checks(overlap_details, divergence, threshold_details)
+        meta: dict[str, Any] = {
+            "is_complete": True,
+            "status": "complete",
+            "source": context.job.collection.source,
+            "reference_source": context.job.collection.reference_source,
+            "primary_bars": overlap_details["primary_bars"],
+            "reference_bars": overlap_details["reference_bars"],
+            "overlap_bars": overlap_details["overlap_bars"],
+            "overlap_ratio": overlap_details["overlap_ratio"],
+            "missing_primary_bar_pct": overlap_details["missing_primary_bar_pct"],
+            "min_overlap_ratio": threshold_details["min_overlap_ratio"],
+            "max_median_ohlc_diff_bps": threshold_details["max_median_ohlc_diff_bps"],
+            "max_p95_ohlc_diff_bps": threshold_details["max_p95_ohlc_diff_bps"],
+            "reference_canonicalization": reference_canonicalization,
+            **divergence,
+            "failed_checks": list(failed_checks),
+        }
+        if failed_checks:
+            reason = "data_integrity_audit_failed(" + "; ".join(failed_checks) + ")"
+            return reason, meta
+        return None, meta
+
+    def _load_reference_frame_for_data_integrity(
+        self,
+        context: ValidationContext,
+        policy: ResultConsistencyDataIntegrityAuditConfig,
+    ) -> tuple[pd.DataFrame, dict[str, int]] | tuple[str | None, dict[str, Any]]:
+        reference_collection = self._data_integrity_audit_reference_collection(context.job.collection)
+        if reference_collection is None:
+            return self._data_integrity_audit_indeterminate(
+                "missing_reference_source",
+                collection=context.job.collection,
+                policy=policy,
+            )
+        _, _, _, _, _, _, _, _, _, _, calendar_timezone = self._load_data_quality_policy(
+            context.job.collection
+        )
+        try:
+            reference_source = self._make_source(reference_collection)
+            reference_raw_df = reference_source.fetch(context.job.symbol, context.job.timeframe, only_cached=False)
+            reference_df, reference_canonicalization = self._canonicalize_validation_frame(
+                reference_raw_df,
+                calendar_timezone=calendar_timezone,
+            )
+        except Exception as exc:
+            return self._data_integrity_audit_indeterminate(
+                "reference_fetch_failed",
+                collection=context.job.collection,
+                policy=policy,
+                details={"error": str(exc)},
+            )
+        return reference_df, reference_canonicalization
+
+    def _validate_data_integrity_inputs(
+        self,
+        primary_df: pd.DataFrame,
+        reference_df: pd.DataFrame,
+        context: ValidationContext,
+        policy: ResultConsistencyDataIntegrityAuditConfig,
+    ) -> tuple[str | None, dict[str, Any]] | None:
+        if primary_df.empty or reference_df.empty:
+            return self._data_integrity_audit_indeterminate(
+                "empty_frame",
+                collection=context.job.collection,
+                policy=policy,
+                details={
+                    "primary_bars": int(len(primary_df)),
+                    "reference_bars": int(len(reference_df)),
+                },
+            )
+        required_columns = ["Open", "High", "Low", "Close"]
+        missing_columns = [
+            name
+            for name in required_columns
+            if name not in primary_df.columns or name not in reference_df.columns
+        ]
+        if missing_columns:
+            return self._data_integrity_audit_indeterminate(
+                "missing_ohlc_columns",
+                collection=context.job.collection,
+                policy=policy,
+                details={"missing_columns": missing_columns},
+            )
+        return None
+
+    def _data_integrity_overlap_details(
+        self,
+        primary_df: pd.DataFrame,
+        reference_df: pd.DataFrame,
+    ) -> dict[str, Any]:
+        required_columns = ["Open", "High", "Low", "Close"]
+        overlap_index = primary_df.index.intersection(reference_df.index)
+        primary_bars = int(len(primary_df))
+        reference_bars = int(len(reference_df))
+        overlap_bars = int(len(overlap_index))
+        overlap_ratio = float(overlap_bars / primary_bars) if primary_bars > 0 else 0.0
+        missing_primary_bar_pct = float((1.0 - overlap_ratio) * 100.0)
+        overlap_primary = primary_df.loc[overlap_index, required_columns]
+        overlap_reference = reference_df.loc[overlap_index, required_columns]
+        divergence = self._data_integrity_ohlc_diff_metrics(overlap_primary, overlap_reference)
+        return {
+            "primary_bars": primary_bars,
+            "reference_bars": reference_bars,
+            "overlap_bars": overlap_bars,
+            "overlap_ratio": overlap_ratio,
+            "missing_primary_bar_pct": missing_primary_bar_pct,
+            "divergence": divergence,
+        }
+
+    @staticmethod
+    def _data_integrity_threshold_details(
+        policy: ResultConsistencyDataIntegrityAuditConfig,
+    ) -> dict[str, float]:
+        return {
+            "max_median_ohlc_diff_bps": float(
+                policy.max_median_ohlc_diff_bps
+                if policy.max_median_ohlc_diff_bps is not None
+                else DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT
+            ),
+            "max_p95_ohlc_diff_bps": float(
+                policy.max_p95_ohlc_diff_bps
+                if policy.max_p95_ohlc_diff_bps is not None
+                else DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT
+            ),
+            "min_overlap_ratio": float(
+                policy.min_overlap_ratio
+                if policy.min_overlap_ratio is not None
+                else DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT
+            ),
+        }
+
+    @staticmethod
+    def _data_integrity_failed_checks(
+        overlap_details: dict[str, Any],
+        divergence: dict[str, float],
+        thresholds: dict[str, float],
+    ) -> list[str]:
+        failed_checks: list[str] = []
+        overlap_ratio = float(overlap_details["overlap_ratio"])
+        min_overlap = float(thresholds["min_overlap_ratio"])
+        overlap_bars = int(overlap_details["overlap_bars"])
+        primary_bars = int(overlap_details["primary_bars"])
+        if overlap_ratio < min_overlap:
+            failed_checks.append(
+                "overlap_ratio_below_threshold("
+                f"required={min_overlap}, available={overlap_ratio}, overlap_bars={overlap_bars}, "
+                f"primary_bars={primary_bars})"
+            )
+        median_diff = divergence["median_ohlc_diff_bps"]
+        max_median = float(thresholds["max_median_ohlc_diff_bps"])
+        if np.isfinite(median_diff) and median_diff > max_median:
+            failed_checks.append(
+                "median_ohlc_diff_bps_exceeded("
+                f"max_allowed={max_median}, available={median_diff})"
+            )
+        p95_diff = divergence["p95_ohlc_diff_bps"]
+        max_p95 = float(thresholds["max_p95_ohlc_diff_bps"])
+        if np.isfinite(p95_diff) and p95_diff > max_p95:
+            failed_checks.append(
+                "p95_ohlc_diff_bps_exceeded("
+                f"max_allowed={max_p95}, available={p95_diff})"
+            )
+        return failed_checks
+
     def _run_transaction_cost_robustness_validation(
         self,
         context: ValidationContext,
@@ -4157,6 +4496,7 @@
         self._evaluation_cache_write_failures = 0
         self._runtime_signal_error_counts = {}
         self._runtime_signal_error_capped = set()
+        self._data_integrity_audit_cache = {}
         self._evaluator = None
         self._strategy_overrides = (
             {s.name: s.params for s in self.cfg.strategies} if self.cfg.strategies else {}

diff --git a/src/config.py b/src/config.py
--- a/src/config.py
+++ b/src/config.py
@@ -22,6 +22,7 @@
     name: str
     source: str  # yfinance, ccxt, custom
     symbols: list[str]
+    reference_source: str | None = None
     exchange: str | None = None  # for ccxt
     currency: str | None = None
     quote: str | None = None  # for ccxt symbols e.g., USDT
@@ -152,6 +153,13 @@
 
 
 @dataclass
+class ResultConsistencyDataIntegrityAuditConfig:
+    min_overlap_ratio: float | None = None
+    max_median_ohlc_diff_bps: float | None = None
+    max_p95_ohlc_diff_bps: float | None = None
+
+
+@dataclass
 class ResultConsistencyConfig:
     min_metric: float | None = None
     min_trades: int | None = None
@@ -159,6 +167,7 @@
     execution_price_variance: ResultConsistencyExecutionPriceVarianceConfig | None = None
     lookahead_shuffle_test: ValidationLookaheadShuffleTestConfig | None = None
     transaction_cost_robustness: ResultConsistencyTransactionCostRobustnessConfig | None = None
+    data_integrity_audit: ResultConsistencyDataIntegrityAuditConfig | None = None
 
 
 @dataclass
@@ -195,6 +204,10 @@
 LOOKAHEAD_SHUFFLE_TEST_SEED_MIN = 0
 LOOKAHEAD_SHUFFLE_TEST_FAILED_PERMUTATIONS_MIN = 0
 LOOKAHEAD_SHUFFLE_TEST_CONFIG_PREFIX = "validation.result_consistency.lookahead_shuffle_test"
+DATA_INTEGRITY_AUDIT_CONFIG_PREFIX = "validation.result_consistency.data_integrity_audit"
+DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT = 0.99
+DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT = 5.0
+DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT = 20.0
 TRANSACTION_COST_ROBUSTNESS_MODE_ANALYTICS = "analytics"
 TRANSACTION_COST_ROBUSTNESS_MODE_ENFORCE = "enforce"
 TRANSACTION_COST_ROBUSTNESS_MODES = {
@@ -591,6 +604,99 @@
     )
 
 
+def _normalize_result_consistency_data_integrity_audit_config(
+    cfg: ResultConsistencyDataIntegrityAuditConfig | None,
+    prefix: str,
+) -> ResultConsistencyDataIntegrityAuditConfig | None:
+    if cfg is None:
+        return None
+    min_overlap_ratio_raw = getattr(cfg, "min_overlap_ratio", None)
+    min_overlap_ratio = (
+        _coerce_float(min_overlap_ratio_raw, f"{prefix}.min_overlap_ratio")
+        if min_overlap_ratio_raw is not None
+        else None
+    )
+    if min_overlap_ratio is not None and not (
+        VALIDATION_PROBABILITY_MIN <= min_overlap_ratio <= VALIDATION_PROBABILITY_MAX
+    ):
+        raise ValueError(
+            f"`{prefix}.min_overlap_ratio` must be between {VALIDATION_PROBABILITY_MIN} and "
+            f"{VALIDATION_PROBABILITY_MAX}"
+        )
+    max_median_ohlc_diff_bps_raw = getattr(cfg, "max_median_ohlc_diff_bps", None)
+    max_median_ohlc_diff_bps = (
+        _coerce_float(max_median_ohlc_diff_bps_raw, f"{prefix}.max_median_ohlc_diff_bps")
+        if max_median_ohlc_diff_bps_raw is not None
+        else None
+    )
+    if (
+        max_median_ohlc_diff_bps is not None
+        and max_median_ohlc_diff_bps < VALIDATION_NON_NEGATIVE_FLOAT_MIN
+    ):
+        raise ValueError(
+            f"`{prefix}.max_median_ohlc_diff_bps` must be >= {VALIDATION_NON_NEGATIVE_FLOAT_MIN}"
+        )
+    max_p95_ohlc_diff_bps_raw = getattr(cfg, "max_p95_ohlc_diff_bps", None)
+    max_p95_ohlc_diff_bps = (
+        _coerce_float(max_p95_ohlc_diff_bps_raw, f"{prefix}.max_p95_ohlc_diff_bps")
+        if max_p95_ohlc_diff_bps_raw is not None
+        else None
+    )
+    if max_p95_ohlc_diff_bps is not None and max_p95_ohlc_diff_bps < VALIDATION_NON_NEGATIVE_FLOAT_MIN:
+        raise ValueError(
+            f"`{prefix}.max_p95_ohlc_diff_bps` must be >= {VALIDATION_NON_NEGATIVE_FLOAT_MIN}"
+        )
+    if (
+        max_median_ohlc_diff_bps is not None
+        and max_p95_ohlc_diff_bps is not None
+        and max_p95_ohlc_diff_bps < max_median_ohlc_diff_bps
+    ):
+        raise ValueError(
+            f"`{prefix}.max_p95_ohlc_diff_bps` must be >= `{prefix}.max_median_ohlc_diff_bps`"
+        )
+    return ResultConsistencyDataIntegrityAuditConfig(
+        min_overlap_ratio=min_overlap_ratio,
+        max_median_ohlc_diff_bps=max_median_ohlc_diff_bps,
+        max_p95_ohlc_diff_bps=max_p95_ohlc_diff_bps,
+    )
+
+
+def _apply_result_consistency_data_integrity_audit_defaults(
+    cfg: ResultConsistencyDataIntegrityAuditConfig,
+) -> ResultConsistencyDataIntegrityAuditConfig:
+    min_overlap_ratio = (
+        cfg.min_overlap_ratio
+        if cfg.min_overlap_ratio is not None
+        else DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT
+    )
+    max_median_ohlc_diff_bps = (
+        cfg.max_median_ohlc_diff_bps
+        if cfg.max_median_ohlc_diff_bps is not None
+        else DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT
+    )
+    max_p95_ohlc_diff_bps = (
+        cfg.max_p95_ohlc_diff_bps
+        if cfg.max_p95_ohlc_diff_bps is not None
+        else DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT
+    )
+    if max_p95_ohlc_diff_bps < max_median_ohlc_diff_bps:
+        raise ValueError(
+            f"`{DATA_INTEGRITY_AUDIT_CONFIG_PREFIX}.max_p95_ohlc_diff_bps` must be >= "
+            f"`{DATA_INTEGRITY_AUDIT_CONFIG_PREFIX}.max_median_ohlc_diff_bps`"
+        )
+    return ResultConsistencyDataIntegrityAuditConfig(
+        min_overlap_ratio=min_overlap_ratio,
+        max_median_ohlc_diff_bps=max_median_ohlc_diff_bps,
+        max_p95_ohlc_diff_bps=max_p95_ohlc_diff_bps,
+    )
+
+
+def _default_data_integrity_audit_config() -> ResultConsistencyDataIntegrityAuditConfig:
+    return _apply_result_consistency_data_integrity_audit_defaults(
+        ResultConsistencyDataIntegrityAuditConfig()
+    )
+
+
 def _normalize_transaction_cost_breakeven_config(
     cfg: ResultConsistencyTransactionCostBreakevenConfig | None,
     prefix: str,
@@ -918,6 +1024,10 @@
         getattr(cfg, "lookahead_shuffle_test", None),
         f"{prefix}.lookahead_shuffle_test",
     )
+    data_integrity_audit = _normalize_result_consistency_data_integrity_audit_config(
+        getattr(cfg, "data_integrity_audit", None),
+        f"{prefix}.data_integrity_audit",
+    )
     transaction_cost_robustness = _normalize_transaction_cost_robustness_config(
         getattr(cfg, "transaction_cost_robustness", None),
         f"{prefix}.transaction_cost_robustness",
@@ -926,12 +1036,13 @@
         outlier_dependency is None
         and execution_price_variance is None
         and lookahead_shuffle_test is None
+        and data_integrity_audit is None
         and transaction_cost_robustness is None
     ):
         raise ValueError(
             f"Invalid `{prefix}`: expected at least one configured module "
             "(`outlier_dependency`, `execution_price_variance`, `lookahead_shuffle_test`, "
-            "or `transaction_cost_robustness`)"
+            "`data_integrity_audit`, or `transaction_cost_robustness`)"
         )
     min_metric_raw = getattr(cfg, "min_metric", None)
     min_metric = _coerce_float(min_metric_raw, f"{prefix}.min_metric") if min_metric_raw is not None else None
@@ -945,6 +1056,7 @@
         outlier_dependency=outlier_dependency,
         execution_price_variance=execution_price_variance,
         lookahead_shuffle_test=lookahead_shuffle_test,
+        data_integrity_audit=data_integrity_audit,
         transaction_cost_robustness=transaction_cost_robustness,
     )
 
@@ -971,6 +1083,11 @@
             if cfg.lookahead_shuffle_test is not None
             else None
         ),
+        data_integrity_audit=(
+            _apply_result_consistency_data_integrity_audit_defaults(cfg.data_integrity_audit)
+            if cfg.data_integrity_audit is not None
+            else None
+        ),
         transaction_cost_robustness=(
             _apply_transaction_cost_robustness_defaults(cfg.transaction_cost_robustness)
             if cfg.transaction_cost_robustness is not None
@@ -1060,6 +1177,10 @@
             getattr(base, "lookahead_shuffle_test", None),
             getattr(override, "lookahead_shuffle_test", None),
         ),
+        data_integrity_audit=_merge_result_consistency_data_integrity_audit_config(
+            getattr(base, "data_integrity_audit", None),
+            getattr(override, "data_integrity_audit", None),
+        ),
         transaction_cost_robustness=_merge_transaction_cost_robustness_config(
             getattr(base, "transaction_cost_robustness", None),
             getattr(override, "transaction_cost_robustness", None),
@@ -1069,6 +1190,7 @@
         merged.outlier_dependency is None
         and merged.execution_price_variance is None
         and merged.lookahead_shuffle_test is None
+        and merged.data_integrity_audit is None
         and merged.transaction_cost_robustness is None
     ):
         return None
@@ -1102,6 +1224,19 @@
     )
 
 
+def _merge_result_consistency_data_integrity_audit_config(
+    base: ResultConsistencyDataIntegrityAuditConfig | None,
+    override: ResultConsistencyDataIntegrityAuditConfig | None,
+) -> ResultConsistencyDataIntegrityAuditConfig | None:
+    if base is None and override is None:
+        return None
+    return ResultConsistencyDataIntegrityAuditConfig(
+        min_overlap_ratio=_merged_field(base, override, "min_overlap_ratio"),
+        max_median_ohlc_diff_bps=_merged_field(base, override, "max_median_ohlc_diff_bps"),
+        max_p95_ohlc_diff_bps=_merged_field(base, override, "max_p95_ohlc_diff_bps"),
+    )
+
+
 def _merge_transaction_cost_breakeven_config(
     base: ResultConsistencyTransactionCostBreakevenConfig | None,
     override: ResultConsistencyTransactionCostBreakevenConfig | None,
@@ -1499,6 +1634,14 @@
         global_result_consistency,
         collection_validation.result_consistency if collection_validation else None,
     )
+    # Special case: data-integrity audit activation is collection-scoped because
+    # `reference_source` exists only on CollectionConfig. Global validation can
+    # still define/override thresholds, but enabling the audit requires a
+    # collection-level reference source.
+    resolved_result_consistency = _ensure_reference_source_data_integrity_policy(
+        collection,
+        resolved_result_consistency,
+    )
     if (
         resolved_data_quality is None
         and resolved_optimization is None
@@ -1512,6 +1655,39 @@
     )
 
 
+def _ensure_reference_source_data_integrity_policy(
+    collection: CollectionConfig,
+    resolved_result_consistency: ResultConsistencyConfig | None,
+) -> ResultConsistencyConfig | None:
+    """Inject default data-integrity audit only when collection has a reference source.
+
+    Thresholds/rules may come from global validation and collection overrides,
+    but the audit itself is only meaningful when a collection-level
+    `reference_source` exists.
+    """
+    if not collection.reference_source:
+        return resolved_result_consistency
+
+    base_policy = (
+        resolved_result_consistency
+        if resolved_result_consistency is not None
+        else ResultConsistencyConfig()
+    )
+    if getattr(base_policy, "data_integrity_audit", None) is not None:
+        return resolved_result_consistency
+
+    with_default_audit = ResultConsistencyConfig(
+        min_metric=base_policy.min_metric,
+        min_trades=base_policy.min_trades,
+        outlier_dependency=base_policy.outlier_dependency,
+        execution_price_variance=base_policy.execution_price_variance,
+        lookahead_shuffle_test=base_policy.lookahead_shuffle_test,
+        transaction_cost_robustness=base_policy.transaction_cost_robustness,
+        data_integrity_audit=_default_data_integrity_audit_config(),
+    )
+    return _merge_result_consistency_config(with_default_audit, None)
+
+
 def resolve_validation_overrides(cfg: Config) -> None:
     """Resolve effective collection-level validation policies.
 
@@ -2070,19 +2246,15 @@
         min_value=RESULT_CONSISTENCY_MIN_TRADES_MIN,
     )
 
-    outlier_dependency_raw = parsed_raw.get("outlier_dependency")
-    if outlier_dependency_raw is not None and not isinstance(outlier_dependency_raw, dict):
-        raise ValueError(f"Invalid `{prefix}.outlier_dependency`: expected a mapping")
-    execution_price_variance_raw = parsed_raw.get("execution_price_variance")
-    if execution_price_variance_raw is not None and not isinstance(execution_price_variance_raw, dict):
-        raise ValueError(f"Invalid `{prefix}.execution_price_variance`: expected a mapping")
+    outlier_dependency_raw = _optional_mapping_field(parsed_raw, prefix, "outlier_dependency")
+    execution_price_variance_raw = _optional_mapping_field(parsed_raw, prefix, "execution_price_variance")
 
     outlier_dependency = (
         _parse_result_consistency_outlier_dependency(
             outlier_dependency_raw,
             f"{prefix}.outlier_dependency",
         )
-        if isinstance(outlier_dependency_raw, dict)
+        if outlier_dependency_raw is not None
         else None
     )
     execution_price_variance = (
@@ -2090,34 +2262,36 @@
... diff truncated: showing 800 of 1227 lines
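From the schema additions in the diff above, a minimal configuration that activates the new gate could look like the following sketch (the collection name, symbols, and sources are hypothetical; the threshold values restate the defaults defined in `src/config.py`, written out here only for illustration):

```yaml
collections:
  - name: crypto-majors            # hypothetical collection
    source: ccxt
    reference_source: yfinance     # presence of this field enables the audit
    symbols: [BTC, ETH]
    quote: USDT

validation:
  result_consistency:
    data_integrity_audit:
      min_overlap_ratio: 0.99           # DATA_INTEGRITY_AUDIT_MIN_OVERLAP_RATIO_DEFAULT
      max_median_ohlc_diff_bps: 5.0     # DATA_INTEGRITY_AUDIT_MAX_MEDIAN_OHLC_DIFF_BPS_DEFAULT
      max_p95_ohlc_diff_bps: 20.0       # DATA_INTEGRITY_AUDIT_MAX_P95_OHLC_DIFF_BPS_DEFAULT
```

Omitting the `data_integrity_audit` block entirely still enables the audit with these defaults when `reference_source` is set, via `_ensure_reference_source_data_integrity_policy`.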


Reviewed by Cursor Bugbot for commit bd4939b.

@AlexanderPietsch AlexanderPietsch marked this pull request as ready for review April 20, 2026 02:50
Copilot AI review requested due to automatic review settings April 20, 2026 02:50

Copilot AI left a comment


Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Adds an optional data-integrity audit gate that compares canonicalized OHLCV bars between a collection’s primary source and a new per-collection reference_source, rejecting strategy results when overlap/drift checks fail or are indeterminate.

Changes:

  • Extends config/schema to support collections[].reference_source and validation.result_consistency.data_integrity_audit with default thresholds.
  • Implements a new runner validation gate with job-level caching and attaches diagnostics under post_run_meta.data_integrity_audit.
  • Updates example/docs and adds tests for default injection, pass/fail cases, and caching behavior.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Show a summary per file
| File | Description |
| --- | --- |
| `tests/test_config.py` | Adds config-loading tests for reference_source-driven default audit injection and validation errors. |
| `tests/test_backtest_runner.py` | Adds runner tests for audit pass/fail paths and cache reuse; updates profile serialization assertions. |
| `src/config.py` | Introduces reference_source + data-integrity audit config, parsing/normalization/defaulting, and merge logic. |
| `src/backtest/runner.py` | Implements the audit gate, reference fetch + canonicalization, drift/overlap metrics, caching, and metadata wiring. |
| `config/example.yaml` | Documents new config fields and shows collection examples using reference_source. |
| `README.md` | Documents the new validation module, thresholds, behavior, and diagnostics location. |

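The overlap and drift checks the reviews describe can be sketched as a small standalone function. This is an illustration of the technique under stated assumptions, not the PR's implementation: the column names, the basis-point scaling, and the result shape are guesses, and the real gate additionally handles caching, indeterminate outcomes, and canonicalization.

```python
import numpy as np
import pandas as pd

OHLC_COLS = ["open", "high", "low", "close"]  # assumed canonical column names


def audit_ohlc(primary: pd.DataFrame, reference: pd.DataFrame,
               min_overlap_ratio: float = 0.99,
               max_median_bps: float = 5.0,
               max_p95_bps: float = 20.0) -> dict:
    """Compare two OHLC frames indexed by timestamp.

    Fails when too few primary bars exist in the reference, or when the
    per-bar price drift (in basis points) exceeds median/p95 thresholds.
    """
    common = primary.index.intersection(reference.index)
    overlap_ratio = len(common) / max(len(primary), 1)
    result = {"overlap_ratio": overlap_ratio, "status": "pass"}
    if overlap_ratio < min_overlap_ratio:
        result["status"] = "fail"  # insufficient overlap: drift stats would be unreliable
        return result
    # Absolute relative difference per bar and column, scaled to basis points.
    diffs_bps = (
        (primary.loc[common, OHLC_COLS] - reference.loc[common, OHLC_COLS]).abs()
        / reference.loc[common, OHLC_COLS].abs()
        * 1e4
    ).to_numpy().ravel()
    result["median_bps"] = float(np.median(diffs_bps))
    result["p95_bps"] = float(np.percentile(diffs_bps, 95))
    if result["median_bps"] > max_median_bps or result["p95_bps"] > max_p95_bps:
        result["status"] = "fail"
    return result
```

With identical frames the audit passes with zero drift; scaling the reference by 1% produces roughly 99 bps of drift, well past the 5 bps median default, and the audit fails.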

AlexanderPietsch and others added 2 commits April 20, 2026 11:03
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Signed-off-by: AlexanderPietsch <alexander.pietsch@vollcom-digital.de>
Copilot AI review requested due to automatic review settings April 20, 2026 03:04

Copilot AI left a comment


Pull request overview


Adds an optional, collection-scoped data-integrity audit that compares canonicalized OHLC bars from a collection’s primary source against a new collections[].reference_source, attaching diagnostics and rejecting results when overlap/drift checks fail.

Changes:

  • Extend config/schema + merge/normalization to support collections[].reference_source and validation.result_consistency.data_integrity_audit (with defaults).
  • Add runner-side validation gate that fetches reference bars, computes overlap + OHLC drift, caches audit results, and attaches post_run_meta.data_integrity_audit.
  • Update docs/examples and add tests covering defaults, pass/fail outcomes, overlap edge cases, and cache reuse across strategies.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Show a summary per file
| File | Description |
| --- | --- |
| `src/config.py` | Adds new audit config dataclass, parsing/normalization/default injection, and collection-scoped activation via reference_source. |
| `src/backtest/runner.py` | Implements audit gate execution, reference fetch/canonicalization, overlap/drift metrics, caching, and metadata attachment. |
| `tests/test_config.py` | Adds config parsing/merge tests for audit defaults + validation errors. |
| `tests/test_backtest_runner.py` | Adds integration-style tests for audit acceptance/rejection and cache reuse across strategies. |
| `config/example.yaml` | Documents new config knobs and shows reference_source usage. |
| `README.md` | Documents audit purpose, thresholds, behavior, and diagnostics location. |
Comments suppressed due to low confidence (1)

src/backtest/runner.py:1

  • The comment says we fail fast before expensive shuffle checks, but the data-integrity audit is executed after _run_lookahead_shuffle_validation (and shuffle is typically the expensive step). Either update the comment to reflect the actual ordering rationale, or reorder gates so the cheapest reject conditions run before shuffle when that’s intended.
from __future__ import annotations


AlexanderPietsch and others added 2 commits April 20, 2026 11:12
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Signed-off-by: AlexanderPietsch <alexander.pietsch@vollcom-digital.de>
Copilot AI review requested due to automatic review settings April 20, 2026 03:22

Copilot AI left a comment


Pull request overview


Adds an optional, per-collection “data integrity audit” validation gate that compares primary OHLCV bars against a secondary reference_source, attaches diagnostics to post_run_meta, and can reject results when overlap/drift thresholds are breached or indeterminate.

Changes:

  • Introduces collections[].reference_source and new validation.result_consistency.data_integrity_audit config module with defaults, parsing, normalization, and merging.
  • Implements audit execution + job-level caching in BacktestRunner, plus metadata serialization and active-gate reporting.
  • Adds/updates tests for config behavior, runner accept/reject paths, and cache reuse; updates README + example config.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Show a summary per file
| File | Description |
| --- | --- |
| `src/config.py` | Adds new config dataclass, defaults, parsing/normalization/merge logic, and collection-scoped activation based on reference_source. |
| `src/backtest/runner.py` | Implements the audit gate, caching, metadata attachment, and gate activation/serialization. |
| `tests/test_config.py` | Adds config parsing/default-injection and override-inheritance test cases for the audit module. |
| `tests/test_backtest_runner.py` | Adds runner behavior tests (pass/reject/overlap) and cache reuse coverage; adds helper patching for primary/reference sources. |
| `config/example.yaml` | Documents how to configure thresholds and set reference_source in collections. |
| `README.md` | Documents the new audit gate, thresholds, activation condition, and where diagnostics are stored. |


Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Signed-off-by: AlexanderPietsch <alexander.pietsch@vollcom-digital.de>
Copilot AI review requested due to automatic review settings April 20, 2026 03:38

Copilot AI left a comment


Pull request overview


Adds an optional cross-provider OHLCV “data integrity audit” validation gate that activates per-collection when reference_source is configured, to reject strategy results when overlap/drift thresholds are breached (or comparison is indeterminate).

Changes:

  • Extend config schema to support collections[].reference_source and validation.result_consistency.data_integrity_audit with defaults/merging/validation.
  • Add runner-side audit execution, caching, metadata attachment (post_run_meta.data_integrity_audit), and gate surfacing in validation metadata.
  • Add unit tests plus updates to example config and README documentation.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.

Show a summary per file
| File | Description |
| --- | --- |
| `tests/test_config.py` | Adds config-loading tests for reference-source activation, mapping validation, and global/collection override merging. |
| `tests/test_backtest_runner.py` | Adds runner tests for audit pass/fail, cache reuse, and reference collection construction/threshold defaults. |
| `src/config.py` | Introduces reference_source and the data_integrity_audit config module with parsing/normalization/defaults/merge + activation injection. |
| `src/backtest/runner.py` | Implements the audit gate, job-level cache, reference fetching/canonicalization, overlap+drift checks, and metadata attachment. |
| `config/example.yaml` | Documents new reference_source and audit thresholds in the example config. |
| `README.md` | Documents the audit module/gating behavior and includes it in gate action descriptions. |


Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Signed-off-by: AlexanderPietsch <alexander.pietsch@vollcom-digital.de>
Copilot AI review requested due to automatic review settings April 20, 2026 03:53

Copilot AI left a comment


Pull request overview


Adds an optional data-integrity audit gate that compares OHLCV bars between a collection’s primary source and a configured reference_source, rejecting results when overlap/drift thresholds are violated and attaching diagnostics to post_run_meta.

Changes:

  • Extends config/schema to support collections[].reference_source and validation.result_consistency.data_integrity_audit with defaults/normalization/merging.
  • Implements a new result_consistency.data_integrity_audit validation gate in BacktestRunner, including job-level caching and metadata serialization.
  • Updates example config + README and adds tests for config behavior and pass/fail audit outcomes + cache reuse.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.

Show a summary per file
| File | Description |
| --- | --- |
| `tests/test_config.py` | Adds config-loading tests for reference_source enabling audit defaults and override inheritance. |
| `tests/test_backtest_runner.py` | Adds runner tests for pass/fail audit outcomes, cache reuse across strategies, and helper patching for dual sources. |
| `src/config.py` | Introduces reference_source + audit config dataclass, parsing/normalization/defaults, and auto-injection when reference_source is set. |
| `src/backtest/runner.py` | Implements audit gate execution, caching, reference fetching/canonicalization, drift/overlap computation, and post-run meta attachment. |
| `config/example.yaml` | Documents new audit thresholds and shows reference_source usage in collections. |
| `README.md` | Documents the new audit module, parameters, and emitted diagnostics/gate behavior. |


Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Signed-off-by: AlexanderPietsch <alexander.pietsch@vollcom-digital.de>
Copilot AI review requested due to automatic review settings April 20, 2026 04:05

Copilot AI left a comment


Pull request overview


Adds a new post-run “data integrity audit” validation gate that compares primary vs reference OHLCV to reject inconsistent results, configurable via validation.result_consistency.data_integrity_audit and activated by collections[].reference_source.

Changes:

  • Introduces ResultConsistencyDataIntegrityAuditConfig with defaults, parsing/normalization, and collection-scoped activation via reference_source.
  • Adds runner-side audit execution, caching, metadata serialization, and gate activation reporting.
  • Extends tests/docs/examples to cover config loading, serialization, runtime pass/fail paths, and configuration guidance.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.

Show a summary per file
| File | Description |
| --- | --- |
| `src/config.py` | Adds reference_source, audit config schema, parsing/normalization, and activation logic during validation resolution. |
| `src/backtest/runner.py` | Implements the audit gate execution, caching, metadata emission, and gate activation reporting. |
| `tests/test_config.py` | Adds config-loading tests for reference-source activation and override inheritance. |
| `tests/test_backtest_runner.py` | Adds serialization + runtime gate tests (pass/fail, cache reuse) and helper patching utilities. |
| `config/example.yaml` | Documents how to configure thresholds and enable the audit via reference_source. |
| `README.md` | Documents the new gate, its thresholds, activation semantics, and output diagnostics. |
Comments suppressed due to low confidence (1)

src/backtest/runner.py:1

  • The data integrity audit runs after the lookahead shuffle test, but it’s likely much cheaper than a permutation-based shuffle validation and could reject early (saving expensive work). Consider running _run_data_integrity_audit_validation(...) before _run_lookahead_shuffle_validation(...) (or updating the surrounding comment if the intended cost ordering is different).
from __future__ import annotations


Copilot AI review requested due to automatic review settings April 20, 2026 04:21

Copilot AI left a comment


Pull request overview


Adds a collection-scoped, post-run “data integrity audit” validation gate that compares primary vs reference OHLC bars and rejects results when overlap/drift checks fail or the comparison is indeterminate.

Changes:

  • Extends config schema with collections[].reference_source and validation.result_consistency.data_integrity_audit (defaults + merge/normalize).
  • Implements data-integrity audit execution, caching, diagnostics (post_run_meta), and gate metadata reporting.
  • Adds unit tests for config loading/merging and runner audit pass/fail/indeterminate + cache reuse.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.

Show a summary per file
| File | Description |
| --- | --- |
| `src/config.py` | Adds audit config dataclass, parsing/normalization/defaulting, and collection-scoped activation when reference_source is set. |
| `src/backtest/runner.py` | Implements the audit gate, reference fetching/canonicalization, drift/overlap checks, caching, serialization, and metadata. |
| `tests/test_config.py` | Adds tests ensuring reference_source activates audit defaults and validates schema errors/override merging. |
| `tests/test_backtest_runner.py` | Adds tests for audit pass/fail/indeterminate behavior, cache reuse, and helper utilities. |
| `config/example.yaml` | Documents new configuration knobs and shows reference_source usage in examples. |
| `README.md` | Documents the new data_integrity_audit module, defaults, behavior, and diagnostics output. |


