From 82688d1e43e545ea8b61b4ce4313d8241d36e294 Mon Sep 17 00:00:00 2001 From: Shay Palachy Date: Wed, 10 Jun 2026 15:38:26 +0300 Subject: [PATCH] feat(scrape): month-frequency-weighted, source-balanced batch selection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds --balanced-batch N to the scrape_candidates job for working through the backlog in fixed-size batches that respect a limited scraping budget: - Frequency-weighted across months: months with more prefilter-passing candidates get proportionally more of the batch (largest-remainder / Hamilton apportionment), so busy months (Jan–Mar) are prioritised over quiet ones without starving them. - Source-balanced within each month: a month's allocation is spread round-robin across distinct publication sources/source families, visited in descending scrape-priority order, so one prolific site cannot monopolise a month's slots (also spreads load to reduce per-site rate-limit/block risk). Publication source is derived from the candidate domain (collapsing subdomains onto a family, e.g. sport1.maariv.co.il -> maariv), since source_hints records the discovery engine (exa/brave), not the publisher. New module denbust/discovery/balanced_selection.py: - candidate_source_key / candidate_month - largest_remainder_allocation - select_within_month (round-robin) - plan_balanced_scrape_batch (months -> apportion -> within-month -> top-up) Wiring: - Config.scrape_balanced_batch_size: int | None - CLI: denbust run ... 
--balanced-batch N - _run_candidate_scrape_job: balanced branch draws the batch from the full prefilter-passing pool instead of the priority-ordered limit head Usage: denbust run --dataset news_items --job scrape_candidates \ --config agents/news/local_search_brave_exa.yaml \ --balanced-batch 60 Co-Authored-By: Claude Sonnet 4.6 --- src/denbust/cli.py | 9 + src/denbust/config.py | 1 + src/denbust/discovery/balanced_selection.py | 194 ++++++++++++++++++++ src/denbust/pipeline.py | 47 ++++- tests/unit/test_balanced_selection.py | 168 +++++++++++++++++ tests/unit/test_cli.py | 1 + tests/unit/test_pipeline_core.py | 2 + 7 files changed, 415 insertions(+), 7 deletions(-) create mode 100644 src/denbust/discovery/balanced_selection.py create mode 100644 tests/unit/test_balanced_selection.py diff --git a/src/denbust/cli.py b/src/denbust/cli.py index de01b7c..2c23370 100644 --- a/src/denbust/cli.py +++ b/src/denbust/cli.py @@ -73,6 +73,14 @@ def run( " (scrape_candidates job only).", ), ] = None, + balanced_batch: Annotated[ + int | None, + typer.Option( + "--balanced-batch", + help="Scrape a month-frequency-weighted, source-balanced batch of this size" + " from the full prefilter-passing pool (scrape_candidates job only).", + ), + ] = None, ) -> None: """Run a dataset/job pair through the registry.""" from denbust.pipeline import run_job @@ -84,6 +92,7 @@ def run( job_name=job, days_override=days, scrape_pub_date_from=pub_date_from, + scrape_balanced_batch_size=balanced_batch, ) diff --git a/src/denbust/config.py b/src/denbust/config.py index ce283da..f528860 100644 --- a/src/denbust/config.py +++ b/src/denbust/config.py @@ -437,6 +437,7 @@ class Config(BaseModel): days: int = Field(default=3, ge=1) max_articles: int = Field(default=30, ge=1) scrape_pub_date_from: datetime | None = None + scrape_balanced_batch_size: int | None = Field(default=None, ge=1) keywords: list[str] = Field(default_factory=lambda: DEFAULT_KEYWORDS.copy()) sources: list[SourceConfig] = 
Field(default_factory=list) classifier: ClassifierConfig = Field(default_factory=ClassifierConfig) diff --git a/src/denbust/discovery/balanced_selection.py b/src/denbust/discovery/balanced_selection.py new file mode 100644 index 0000000..0b15229 --- /dev/null +++ b/src/denbust/discovery/balanced_selection.py @@ -0,0 +1,194 @@ +"""Frequency-weighted, source-balanced scrape-batch planning. + +Given a pool of scrape-eligible candidates this module selects a fixed-size +batch that is: + +1. **Frequency-weighted across months** — months with more eligible candidates + receive proportionally more of the batch (largest-remainder apportionment), + so a quiet month is not over-represented relative to a busy one. +2. **Source-balanced within each month** — a month's allocation is spread + round-robin across the distinct publication sources/source families present, + ordered by source scrape priority, so one prolific site cannot monopolise a + month's slots. + +The publication source is derived from the candidate *domain* (not +``source_hints``, which for search-discovered candidates records the discovery +engine — ``exa`` / ``brave`` — rather than the publisher). +""" + +from __future__ import annotations + +from collections import defaultdict +from datetime import datetime + +from denbust.discovery.candidate_filters import normalize_domain +from denbust.discovery.models import PersistentCandidate +from denbust.discovery.scrape_queue import ( + _SOURCE_SCRAPE_PRIORITY, + _backfill_publication_datetime, +) + +# Discovery-engine markers that may appear in ``source_hints``; never a publisher. +_ENGINE_SOURCE_HINTS: frozenset[str] = frozenset({"brave", "exa", "google_cse", "test"}) + +# Registrable domain → canonical source-family key. Subdomains collapse onto +# the family (e.g. ``sport1.maariv.co.il`` and ``maariv.co.il`` both map to +# ``maariv``) so the within-month balancer treats them as one source. 
+_DOMAIN_SOURCE_FAMILIES: dict[str, str] = { + "ynet.co.il": "ynet", + "mako.co.il": "mako", + "ice.co.il": "ice", + "haaretz.co.il": "haaretz", + "maariv.co.il": "maariv", + "walla.co.il": "walla", + "globes.co.il": "globes", + "themarker.com": "themarker", + "israelhayom.co.il": "israelhayom", + "kan.org.il": "kan", + "news1.co.il": "news1", + "calcalist.co.il": "calcalist", +} + + +def candidate_source_key(candidate: PersistentCandidate) -> str: + """Return the canonical publication-source key for *candidate*. + + Resolution order: domain → known family; else the normalized domain; else a + non-engine ``source_hint``; else ``"unknown"``. + """ + domain = normalize_domain(candidate.domain) if candidate.domain else None + if domain: + for base, family in _DOMAIN_SOURCE_FAMILIES.items(): + if domain == base or domain.endswith(f".{base}"): + return family + return domain + for hint in candidate.source_hints: + if hint and hint not in _ENGINE_SOURCE_HINTS: + return hint + return "unknown" + + +def candidate_month(candidate: PersistentCandidate) -> str | None: + """Return the candidate's publication month as ``YYYY-MM`` or ``None``.""" + published = _backfill_publication_datetime(candidate) + return published.strftime("%Y-%m") if published is not None else None + + +def _source_key_priority(source_key: str) -> int: + """Scrape priority for a resolved source-family key (0 when unknown).""" + return _SOURCE_SCRAPE_PRIORITY.get(source_key, 0) + + +def largest_remainder_allocation(weights: dict[str, int], total: int) -> dict[str, int]: + """Apportion *total* across keys proportionally to *weights*. + + Uses the largest-remainder (Hamilton) method so the allocations sum exactly + to *total* (never capped at ``sum(weights)``) while staying as close as + possible to the proportional ideal. 
+ """ + weight_sum = sum(weights.values()) + if weight_sum <= 0 or total <= 0: + return dict.fromkeys(weights, 0) + exact = {key: total * value / weight_sum for key, value in weights.items()} + allocation = {key: int(value) for key, value in exact.items()} + remaining = total - sum(allocation.values()) + if remaining > 0: + order = sorted( + weights, + key=lambda key: (exact[key] - allocation[key], weights[key], key), + reverse=True, + ) + for key in order[:remaining]: + allocation[key] += 1 + return allocation + + +def _order_within_source(candidates: list[PersistentCandidate]) -> list[PersistentCandidate]: + """Order one source's candidates by priority, then recency, then id.""" + return sorted( + candidates, + key=lambda c: ( + -_source_key_priority(candidate_source_key(c)), + -c.last_seen_at.timestamp(), + c.candidate_id, + ), + ) + + +def select_within_month( + candidates: list[PersistentCandidate], + count: int, +) -> list[PersistentCandidate]: + """Pick *count* candidates from one month, round-robin across sources. + + Sources are visited in descending scrape-priority order each round, so + higher-signal publishers get first pick while every source is represented + before any source is drained. 
+ """ + if count <= 0 or not candidates: + return [] + by_source: dict[str, list[PersistentCandidate]] = defaultdict(list) + for candidate in candidates: + by_source[candidate_source_key(candidate)].append(candidate) + queues = {key: _order_within_source(group) for key, group in by_source.items()} + source_order = sorted( + queues, + key=lambda key: (-_source_key_priority(key), -len(queues[key]), key), + ) + selected: list[PersistentCandidate] = [] + while len(selected) < count and any(queues[key] for key in source_order): + for key in source_order: + if queues[key]: + selected.append(queues[key].pop(0)) + if len(selected) >= count: + break + return selected + + +def plan_balanced_scrape_batch( + candidates: list[PersistentCandidate], + *, + batch_size: int, + now: datetime | None = None, +) -> list[PersistentCandidate]: + """Select a month-frequency-weighted, source-balanced batch. + + *candidates* should already be the scrape-eligible (and prefilter-passing) + pool. Candidates with no resolvable publication month are excluded from + balanced selection. Returns at most *batch_size* candidates. 
+ """ + del now # reserved for future recency weighting; selection is order-stable + if batch_size <= 0: + return [] + by_month: dict[str, list[PersistentCandidate]] = defaultdict(list) + for candidate in candidates: + month = candidate_month(candidate) + if month is not None: + by_month[month].append(candidate) + if not by_month: + return [] + + available = sum(len(group) for group in by_month.values()) + target = min(batch_size, available) + weights = {month: len(group) for month, group in by_month.items()} + allocation = largest_remainder_allocation(weights, target) + + selected: list[PersistentCandidate] = [] + selected_ids: set[str] = set() + for month, group in by_month.items(): + picked = select_within_month(group, allocation.get(month, 0)) + selected.extend(picked) + selected_ids.update(c.candidate_id for c in picked) + + # Top-up: a month allocated more than it could supply leaves the batch + # short; backfill from the remaining pool by global scrape priority. + if len(selected) < target: + leftovers = [c for c in candidates if c.candidate_id not in selected_ids] + leftovers = [c for c in leftovers if candidate_month(c) is not None] + for candidate in _order_within_source(leftovers): + if len(selected) >= target: + break + selected.append(candidate) + selected_ids.add(candidate.candidate_id) + + return selected[:batch_size] diff --git a/src/denbust/pipeline.py b/src/denbust/pipeline.py index 16c49ba..8e53875 100644 --- a/src/denbust/pipeline.py +++ b/src/denbust/pipeline.py @@ -42,6 +42,7 @@ plan_backfill_windows, resolve_backfill_request_window, ) +from denbust.discovery.balanced_selection import candidate_month, plan_balanced_scrape_batch from denbust.discovery.base import DiscoveryContext, SourceDiscoveryContext from denbust.discovery.engine_checkpoint import ( cache_path as _engine_cache_path, @@ -67,6 +68,7 @@ from denbust.discovery.scrape_queue import ( SCRAPEABLE_CANDIDATE_STATUSES, CandidateScrapeBatch, + order_scrape_eligible_candidates, 
scrape_candidates, select_backfill_candidates_for_scrape, select_candidates_for_scrape, @@ -1418,25 +1420,47 @@ async def _run_candidate_scrape_job( limit: int, orchestrator: CascadeOrchestrator | None = None, pub_date_from: datetime | None = None, + balanced_batch_size: int | None = None, ) -> tuple[CandidateScrapeBatch, list[PrefilterDecision]]: """Select queued candidates, apply the thin prefilter, and run the scrape-attempt layer. Returns ``(scrape_batch, thin_decisions)``. When *orchestrator* is ``None`` the thin pass is skipped and ``thin_decisions`` is empty. When *pub_date_from* is set only candidates published on or after that date are - considered (targeted / recent-only scrape). + considered (targeted / recent-only scrape). When *balanced_batch_size* is + set the selection is a month-frequency-weighted, source-balanced batch of + that size drawn from the full prefilter-passing pool (instead of the + priority-ordered ``limit`` head); *pub_date_from* then filters by whole month. """ persistence = create_discovery_persistence(config) try: - selected_candidates = select_candidates_for_scrape( - persistence, - limit=limit, - pub_date_from=pub_date_from, - ) + if balanced_batch_size is not None: + pool = persistence.list_candidates(statuses=SCRAPEABLE_CANDIDATE_STATUSES) + eligible = order_scrape_eligible_candidates(pool) + if pub_date_from is not None: + cutoff_month = pub_date_from.strftime("%Y-%m") + eligible = [ + candidate + for candidate in eligible + if (month := candidate_month(candidate)) is not None and month >= cutoff_month + ] + passed_pool, thin_decisions = _thin_pass_prefilter(eligible, orchestrator) + passed_candidates = plan_balanced_scrape_batch( + passed_pool, + batch_size=balanced_batch_size, + ) + else: + selected_candidates = select_candidates_for_scrape( + persistence, + limit=limit, + pub_date_from=pub_date_from, + ) + passed_candidates, thin_decisions = _thin_pass_prefilter( + selected_candidates, orchestrator + ) finally: persistence.close() - passed_candidates, thin_decisions = 
_thin_pass_prefilter(selected_candidates, orchestrator) batch = await _scrape_candidate_batch( config=config, candidates=passed_candidates, @@ -2499,6 +2523,7 @@ async def run_news_scrape_candidates_job( limit=config.max_articles, orchestrator=orchestrator, pub_date_from=config.scrape_pub_date_from, + balanced_batch_size=config.scrape_balanced_batch_size, ) result.raw_article_count = len(scrape_batch.raw_articles) if scrape_batch.errors: @@ -3300,6 +3325,7 @@ def _run_job_from_config( days_override: int | None = None, operational_store: OperationalStore | None = None, scrape_pub_date_from: str | None = None, + scrape_balanced_batch_size: int | None = None, ) -> RunSnapshot: """Shared sync wrapper for CLI-triggered job runs.""" setup_logging() @@ -3318,6 +3344,11 @@ def _run_job_from_config( except ValueError as exc: print(f"Error: --pub-date-from must be an ISO date (YYYY-MM-DD): {exc}") sys.exit(1) + if scrape_balanced_batch_size is not None: + if scrape_balanced_batch_size < 1: + print("Error: --balanced-batch must be a positive integer") + sys.exit(1) + update["scrape_balanced_batch_size"] = scrape_balanced_batch_size if update: config = config.model_copy(update=update) @@ -3394,6 +3425,7 @@ def run_job( days_override: int | None = None, operational_store: OperationalStore | None = None, scrape_pub_date_from: str | None = None, + scrape_balanced_batch_size: int | None = None, ) -> None: """Run a dataset/job pair through the generic registry.""" _run_job_from_config( @@ -3403,6 +3435,7 @@ def run_job( days_override=days_override, operational_store=operational_store, scrape_pub_date_from=scrape_pub_date_from, + scrape_balanced_batch_size=scrape_balanced_batch_size, ) diff --git a/tests/unit/test_balanced_selection.py b/tests/unit/test_balanced_selection.py new file mode 100644 index 0000000..53d8778 --- /dev/null +++ b/tests/unit/test_balanced_selection.py @@ -0,0 +1,168 @@ +"""Unit tests for frequency-weighted, source-balanced scrape batch planning.""" + +from 
__future__ import annotations + +import collections +from datetime import UTC, datetime + +from pydantic import HttpUrl + +from denbust.discovery.balanced_selection import ( + candidate_month, + candidate_source_key, + largest_remainder_allocation, + plan_balanced_scrape_batch, + select_within_month, +) +from denbust.discovery.models import CandidateStatus, PersistentCandidate + + +def make_candidate( + candidate_id: str, + *, + domain: str, + month: str, + last_seen: datetime | None = None, + source_hints: list[str] | None = None, +) -> PersistentCandidate: + """Build a candidate with a publication month and domain.""" + pub = f"{month}-15T08:00:00+00:00" + return PersistentCandidate( + candidate_id=candidate_id, + canonical_url=HttpUrl(f"https://{domain}/article/{candidate_id}"), + current_url=HttpUrl(f"https://{domain}/article/{candidate_id}"), + domain=domain, + titles=["t"], + snippets=["s"], + discovered_via=["brave"], + discovery_queries=["q"], + source_hints=source_hints if source_hints is not None else ["brave"], + first_seen_at=datetime(2026, 1, 1, tzinfo=UTC), + last_seen_at=last_seen or datetime(2026, 1, 1, tzinfo=UTC), + candidate_status=CandidateStatus.NEW, + metadata={"latest_publication_datetime_hint": pub}, + ) + + +def test_candidate_source_key_collapses_subdomains() -> None: + """Subdomains and engine hints resolve to the publisher family.""" + assert candidate_source_key(make_candidate("a", domain="www.ynet.co.il", month="2026-01")) == ( + "ynet" + ) + assert ( + candidate_source_key(make_candidate("b", domain="sport1.maariv.co.il", month="2026-01")) + == "maariv" + ) + assert ( + candidate_source_key(make_candidate("c", domain="canary.haaretz.co.il", month="2026-01")) + == "haaretz" + ) + # Unknown domain falls back to the normalized domain, not the engine hint. 
+ assert ( + candidate_source_key( + make_candidate("d", domain="example.org", month="2026-01", source_hints=["brave"]) + ) + == "example.org" + ) + + +def test_candidate_month_reads_publication_hint() -> None: + """Publication month derives from the latest_publication_datetime_hint.""" + assert candidate_month(make_candidate("a", domain="ynet.co.il", month="2026-03")) == "2026-03" + + +def test_largest_remainder_allocation_sums_to_total() -> None: + """Allocation is proportional and sums exactly to the requested total.""" + weights = {"2026-01": 621, "2026-02": 505, "2026-03": 425, "2026-04": 86, "2026-05": 171} + alloc = largest_remainder_allocation(weights, 60) + assert sum(alloc.values()) == 60 + # Busiest month gets the most; quietest gets the least. + assert alloc["2026-01"] == max(alloc.values()) + assert alloc["2026-04"] == min(alloc.values()) + + +def test_largest_remainder_handles_zero_and_empty() -> None: + """Zero total or zero weights yield an all-zero allocation.""" + assert largest_remainder_allocation({"a": 5}, 0) == {"a": 0} + assert largest_remainder_allocation({"a": 0, "b": 0}, 10) == {"a": 0, "b": 0} + + +def test_select_within_month_spreads_across_sources() -> None: + """Within a month, selection round-robins across sources before draining one.""" + cands = ( + [make_candidate(f"y{i}", domain="ynet.co.il", month="2026-01") for i in range(10)] + + [make_candidate(f"m{i}", domain="mako.co.il", month="2026-01") for i in range(10)] + + [make_candidate(f"h{i}", domain="haaretz.co.il", month="2026-01") for i in range(10)] + ) + picked = select_within_month(cands, 6) + keys = collections.Counter(candidate_source_key(c) for c in picked) + # Six slots across three equally-sized sources → two each, none monopolised. 
+ assert keys == {"ynet": 2, "mako": 2, "haaretz": 2} + + +def test_plan_balanced_scrape_batch_weights_months_and_balances_sources() -> None: + """The batch is frequency-weighted across months and source-balanced within.""" + cands: list[PersistentCandidate] = [] + # Jan: 40 candidates (busy), Feb: 20, Mar: 4 (quiet) — two sources each. + for month, total in [("2026-01", 40), ("2026-02", 20), ("2026-03", 4)]: + for i in range(total): + domain = "ynet.co.il" if i % 2 == 0 else "mako.co.il" + cands.append(make_candidate(f"{month}-{i}", domain=domain, month=month)) + + batch = plan_balanced_scrape_batch(cands, batch_size=16) + assert len(batch) == 16 + by_month = collections.Counter(candidate_month(c) for c in batch) + # 40:20:4 over 16 slots → 10:5:1 (largest remainder). + assert by_month["2026-01"] > by_month["2026-02"] > by_month["2026-03"] + assert sum(by_month.values()) == 16 + # Within the busy month both sources appear. + jan_sources = collections.Counter( + candidate_source_key(c) for c in batch if candidate_month(c) == "2026-01" + ) + assert set(jan_sources) == {"ynet", "mako"} + + +def test_plan_balanced_scrape_batch_caps_at_pool_size() -> None: + """Requesting more than available returns the whole pool, no duplicates.""" + cands = [make_candidate(f"a{i}", domain="ynet.co.il", month="2026-01") for i in range(5)] + batch = plan_balanced_scrape_batch(cands, batch_size=60) + assert len(batch) == 5 + assert len({c.candidate_id for c in batch}) == 5 + + +def test_plan_balanced_scrape_batch_tops_up_when_month_underfills() -> None: + """A one-candidate month next to a large month still yields a full, unique batch.""" + # Month A has 1 candidate (weight 1 → 0 of 10 slots); month B's 30 fill the batch. 
+ cands = [make_candidate("a0", domain="ynet.co.il", month="2026-01")] + cands += [make_candidate(f"b{i}", domain="mako.co.il", month="2026-02") for i in range(30)] + batch = plan_balanced_scrape_batch(cands, batch_size=10) + assert len(batch) == 10 + assert len({c.candidate_id for c in batch}) == 10 + + +def test_plan_balanced_scrape_batch_excludes_undated() -> None: + """Candidates with no publication month are excluded from balanced selection.""" + dated = make_candidate("d", domain="ynet.co.il", month="2026-01") + undated = PersistentCandidate( + candidate_id="u", + canonical_url=HttpUrl("https://ynet.co.il/article/u"), + current_url=HttpUrl("https://ynet.co.il/article/u"), + domain="ynet.co.il", + titles=["t"], + snippets=["s"], + discovered_via=["brave"], + discovery_queries=["q"], + source_hints=["brave"], + first_seen_at=datetime(2026, 1, 1, tzinfo=UTC), + last_seen_at=datetime(2026, 1, 1, tzinfo=UTC), + candidate_status=CandidateStatus.NEW, + metadata={}, + ) + batch = plan_balanced_scrape_batch([dated, undated], batch_size=10) + assert [c.candidate_id for c in batch] == ["d"] + + +def test_plan_balanced_scrape_batch_zero_size_is_empty() -> None: + """A non-positive batch size yields no selection.""" + cands = [make_candidate("a", domain="ynet.co.il", month="2026-01")] + assert plan_balanced_scrape_batch(cands, batch_size=0) == [] diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index cf3014a..813807c 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -71,6 +71,7 @@ def fake_run_job( job_name: JobName, days_override: int | None = None, scrape_pub_date_from: str | None = None, # noqa: ARG001 + scrape_balanced_batch_size: int | None = None, # noqa: ARG001 ) -> None: captured["config_path"] = config_path captured["dataset_name"] = dataset_name diff --git a/tests/unit/test_pipeline_core.py b/tests/unit/test_pipeline_core.py index 50fc56d..1e7c7ef 100644 --- a/tests/unit/test_pipeline_core.py +++ b/tests/unit/test_pipeline_core.py 
@@ -2574,6 +2574,7 @@ async def fake_run_candidate_scrape_job( limit: int, orchestrator: object = None, # noqa: ARG001 pub_date_from: object = None, # noqa: ARG001 + balanced_batch_size: object = None, # noqa: ARG001 ) -> tuple[CandidateScrapeBatch, list[object]]: captured["days"] = config.days captured["limit"] = limit @@ -4059,6 +4060,7 @@ def fake_run_job_from_config(**kwargs: object) -> RunSnapshot: "days_override": 5, "operational_store": None, "scrape_pub_date_from": None, + "scrape_balanced_batch_size": None, } def test_run_job_from_config_passes_operational_store_to_async_runner(