Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 104 additions & 2 deletions predicate/agent_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,7 @@ async def eventually(
min_confidence: float | None = None,
max_snapshot_attempts: int = 3,
snapshot_kwargs: dict[str, Any] | None = None,
snapshot_limit_growth: dict[str, Any] | None = None,
vision_provider: Any | None = None,
vision_system_prompt: str | None = None,
vision_user_prompt: str | None = None,
Expand All @@ -1325,9 +1326,103 @@ async def eventually(
snapshot_attempt = 0
last_outcome = None

# Optional: increase SnapshotOptions.limit across retries to widen element coverage.
#
# This is useful on long / virtualized pages where an initial small limit may miss
# a target element, but taking a "bigger" snapshot is enough to make a deterministic
# predicate pass.
#
# Additive schedule (requested):
# limit(attempt) = min(max_limit, start_limit + step*(attempt-1))
#
# Notes:
# - We clamp to SnapshotOptions Field constraints (1..500).
# - If both snapshot_kwargs["limit"] and snapshot_limit_growth are provided,
# snapshot_limit_growth controls the per-attempt limit (callers can set
# start_limit explicitly if desired).
growth = snapshot_limit_growth or None
growth_apply_on = "only_on_fail"
growth_start: int | None = None
growth_step: int | None = None
growth_max: int | None = None
if isinstance(growth, dict) and growth:
try:
growth_apply_on = str(growth.get("apply_on") or "only_on_fail")
except Exception:
growth_apply_on = "only_on_fail"
try:
v = growth.get("start_limit", None)
growth_start = int(v) if v is not None else None
except Exception:
growth_start = None
try:
v = growth.get("step", None)
growth_step = int(v) if v is not None else None
except Exception:
growth_step = None
try:
v = growth.get("max_limit", None)
growth_max = int(v) if v is not None else None
except Exception:
growth_max = None

# Resolve defaults from runtime + snapshot_kwargs.
if growth and growth_start is None:
try:
if snapshot_kwargs and snapshot_kwargs.get("limit") is not None:
growth_start = int(snapshot_kwargs["limit"])
except Exception:
growth_start = None
if growth and growth_start is None:
try:
growth_start = int(getattr(self.runtime, "_snapshot_options", None).limit) # type: ignore[attr-defined]
except Exception:
growth_start = None
if growth and growth_start is None:
growth_start = 50 # SnapshotOptions default

if growth and growth_step is None:
growth_step = max(1, int(growth_start))
if growth and growth_max is None:
growth_max = 500

def _clamp_limit(n: int) -> int:
if n < 1:
return 1
if n > 500:
return 500
return n

def _limit_for_attempt(attempt_idx_1based: int) -> int:
assert growth_start is not None and growth_step is not None and growth_max is not None
base = int(growth_start) + int(growth_step) * max(0, int(attempt_idx_1based) - 1)
return _clamp_limit(min(int(growth_max), base))

while True:
attempt += 1
await self.runtime.snapshot(**(snapshot_kwargs or {}))

per_attempt_kwargs = dict(snapshot_kwargs or {})
snapshot_limit: int | None = None
if growth:
# Only grow if requested; otherwise fixed start_limit.
apply = growth_apply_on == "all"
if growth_apply_on == "only_on_fail":
# attempt==1 always uses the start_limit; attempt>1 grows (since we'd have
# returned already if the previous attempt passed).
apply = attempt == 1 or (last_outcome is not None and not bool(last_outcome.passed))
if apply:
snapshot_limit = _limit_for_attempt(attempt)
else:
snapshot_limit = _clamp_limit(int(growth_start or 50))
per_attempt_kwargs["limit"] = snapshot_limit
else:
try:
if per_attempt_kwargs.get("limit") is not None:
snapshot_limit = int(per_attempt_kwargs["limit"])
except Exception:
snapshot_limit = None

await self.runtime.snapshot(**per_attempt_kwargs)
snapshot_attempt += 1

# Optional: gate predicate evaluation on snapshot confidence.
Expand Down Expand Up @@ -1372,6 +1467,7 @@ async def eventually(
"eventually": True,
"attempt": attempt,
"snapshot_attempt": snapshot_attempt,
"snapshot_limit": snapshot_limit,
},
)

Expand Down Expand Up @@ -1481,6 +1577,7 @@ async def eventually(
"eventually": True,
"attempt": attempt,
"snapshot_attempt": snapshot_attempt,
"snapshot_limit": snapshot_limit,
"final": True,
"timeout": True,
},
Expand All @@ -1503,7 +1600,12 @@ async def eventually(
required=self.required,
kind="assert",
record_in_step=False,
extra={"eventually": True, "attempt": attempt},
extra={
"eventually": True,
"attempt": attempt,
"snapshot_attempt": snapshot_attempt,
"snapshot_limit": snapshot_limit,
},
)

if last_outcome.passed:
Expand Down
68 changes: 67 additions & 1 deletion predicate/asserts/expect.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class EventuallyConfig:
timeout: float = DEFAULT_TIMEOUT # Max time to wait (seconds)
poll: float = DEFAULT_POLL # Interval between retries (seconds)
max_retries: int = DEFAULT_MAX_RETRIES # Max number of retry attempts
# Optional: increase SnapshotOptions.limit across retries (additive schedule).
# See docs/expand_deterministic_verifications_sdk.md for details.
snapshot_limit_growth: dict[str, Any] | None = None


class ExpectBuilder:
Expand Down Expand Up @@ -514,6 +517,51 @@ async def evaluate(self, ctx: AssertContext, snapshot_fn) -> AssertOutcome:
last_outcome: AssertOutcome | None = None
attempts = 0

growth = self._config.snapshot_limit_growth
growth_apply_on = "only_on_fail"
growth_start: int | None = None
growth_step: int | None = None
growth_max: int | None = None
if isinstance(growth, dict) and growth:
try:
growth_apply_on = str(growth.get("apply_on") or "only_on_fail")
except Exception:
growth_apply_on = "only_on_fail"
try:
v = growth.get("start_limit", None)
growth_start = int(v) if v is not None else None
except Exception:
growth_start = None
try:
v = growth.get("step", None)
growth_step = int(v) if v is not None else None
except Exception:
growth_step = None
try:
v = growth.get("max_limit", None)
growth_max = int(v) if v is not None else None
except Exception:
growth_max = None

if growth and growth_start is None:
growth_start = 50
if growth and growth_step is None:
growth_step = max(1, int(growth_start or 50))
if growth and growth_max is None:
growth_max = 500

def _clamp_limit(n: int) -> int:
if n < 1:
return 1
if n > 500:
return 500
return n

def _limit_for_attempt(attempt_idx_1based: int) -> int:
assert growth_start is not None and growth_step is not None and growth_max is not None
base = int(growth_start) + int(growth_step) * max(0, int(attempt_idx_1based) - 1)
return _clamp_limit(min(int(growth_max), base))

while True:
# Check timeout (higher precedence than max_retries)
elapsed = time.monotonic() - start_time
Expand Down Expand Up @@ -543,7 +591,23 @@ async def evaluate(self, ctx: AssertContext, snapshot_fn) -> AssertOutcome:
# Take fresh snapshot if not first attempt
if attempts > 0:
try:
fresh_snapshot = await snapshot_fn()
# If snapshot_fn supports kwargs (e.g. runtime.snapshot), pass adaptive limit.
snap_limit = None
if growth:
# attempts is 1-based for the snapshot attempt here (attempts>0 means >=2nd try)
attempt_idx = attempts + 1
apply = growth_apply_on == "all" or (
growth_apply_on == "only_on_fail" and last_outcome is not None
)
if apply:
snap_limit = _limit_for_attempt(attempt_idx)
if snap_limit is not None:
try:
fresh_snapshot = await snapshot_fn(limit=int(snap_limit))
except TypeError:
fresh_snapshot = await snapshot_fn()
else:
fresh_snapshot = await snapshot_fn()
ctx = AssertContext(
snapshot=fresh_snapshot,
url=fresh_snapshot.url if fresh_snapshot else ctx.url,
Expand Down Expand Up @@ -588,6 +652,7 @@ def with_eventually(
timeout: float = DEFAULT_TIMEOUT,
poll: float = DEFAULT_POLL,
max_retries: int = DEFAULT_MAX_RETRIES,
snapshot_limit_growth: dict[str, Any] | None = None,
) -> EventuallyWrapper:
"""
Wrap a predicate with retry logic.
Expand Down Expand Up @@ -617,5 +682,6 @@ def with_eventually(
timeout=timeout,
poll=poll,
max_retries=max_retries,
snapshot_limit_growth=snapshot_limit_growth,
)
return EventuallyWrapper(predicate, config)
Loading