Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/denbust/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ def run(
" (scrape_candidates job only).",
),
] = None,
balanced_batch: Annotated[
int | None,
typer.Option(
"--balanced-batch",
help="Scrape a month-frequency-weighted, source-balanced batch of this size"
" from the full prefilter-passing pool (scrape_candidates job only).",
),
] = None,
) -> None:
"""Run a dataset/job pair through the registry."""
from denbust.pipeline import run_job
Expand All @@ -84,6 +92,7 @@ def run(
job_name=job,
days_override=days,
scrape_pub_date_from=pub_date_from,
scrape_balanced_batch_size=balanced_batch,
)


Expand Down
1 change: 1 addition & 0 deletions src/denbust/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ class Config(BaseModel):
days: int = Field(default=3, ge=1)
max_articles: int = Field(default=30, ge=1)
scrape_pub_date_from: datetime | None = None
scrape_balanced_batch_size: int | None = Field(default=None, ge=1)
keywords: list[str] = Field(default_factory=lambda: DEFAULT_KEYWORDS.copy())
sources: list[SourceConfig] = Field(default_factory=list)
classifier: ClassifierConfig = Field(default_factory=ClassifierConfig)
Expand Down
194 changes: 194 additions & 0 deletions src/denbust/discovery/balanced_selection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""Frequency-weighted, source-balanced scrape-batch planning.

Given a pool of scrape-eligible candidates this module selects a fixed-size
batch that is:

1. **Frequency-weighted across months** — months with more eligible candidates
receive proportionally more of the batch (largest-remainder apportionment),
so a quiet month is not over-represented relative to a busy one.
2. **Source-balanced within each month** — a month's allocation is spread
round-robin across the distinct publication sources/source families present,
ordered by source scrape priority, so one prolific site cannot monopolise a
month's slots.

The publication source is derived from the candidate *domain* (not
``source_hints``, which for search-discovered candidates records the discovery
engine — ``exa`` / ``brave`` — rather than the publisher).
"""

from __future__ import annotations

from collections import defaultdict
from datetime import datetime

from denbust.discovery.candidate_filters import normalize_domain
from denbust.discovery.models import PersistentCandidate
from denbust.discovery.scrape_queue import (
_SOURCE_SCRAPE_PRIORITY,
_backfill_publication_datetime,
)
Comment on lines +26 to +29

# Discovery-engine markers that may appear in ``source_hints``; never a publisher.
_ENGINE_SOURCE_HINTS: frozenset[str] = frozenset({"brave", "exa", "google_cse", "test"})

# Registrable domain → canonical source-family key. Subdomains collapse onto
# the family (e.g. ``sport1.maariv.co.il`` and ``maariv.co.il`` both map to
# ``maariv``) so the within-month balancer treats them as one source.
_DOMAIN_SOURCE_FAMILIES: dict[str, str] = {
"ynet.co.il": "ynet",
"mako.co.il": "mako",
"ice.co.il": "ice",
"haaretz.co.il": "haaretz",
"maariv.co.il": "maariv",
"walla.co.il": "walla",
"globes.co.il": "globes",
"themarker.com": "themarker",
"israelhayom.co.il": "israelhayom",
"kan.org.il": "kan",
"news1.co.il": "news1",
"calcalist.co.il": "calcalist",
}


def candidate_source_key(candidate: PersistentCandidate) -> str:
"""Return the canonical publication-source key for *candidate*.

Resolution order: domain → known family; else the normalized domain; else a
non-engine ``source_hint``; else ``"unknown"``.
"""
domain = normalize_domain(candidate.domain) if candidate.domain else None
if domain:
for base, family in _DOMAIN_SOURCE_FAMILIES.items():
if domain == base or domain.endswith(f".{base}"):
return family
return domain
for hint in candidate.source_hints:
if hint and hint not in _ENGINE_SOURCE_HINTS:
return hint
return "unknown"


def candidate_month(candidate: PersistentCandidate) -> str | None:
"""Return the candidate's publication month as ``YYYY-MM`` or ``None``."""
published = _backfill_publication_datetime(candidate)
return published.strftime("%Y-%m") if published is not None else None


def _source_key_priority(source_key: str) -> int:
"""Scrape priority for a resolved source-family key (0 when unknown)."""
return _SOURCE_SCRAPE_PRIORITY.get(source_key, 0)


def largest_remainder_allocation(weights: dict[str, int], total: int) -> dict[str, int]:
"""Apportion *total* across keys proportionally to *weights*.

Uses the largest-remainder (Hamilton) method so the allocations sum exactly
to ``min(total, sum(weights))`` worth of demand while staying as close as
possible to the proportional ideal.
"""
weight_sum = sum(weights.values())
if weight_sum <= 0 or total <= 0:
return dict.fromkeys(weights, 0)
exact = {key: total * value / weight_sum for key, value in weights.items()}
allocation = {key: int(value) for key, value in exact.items()}
Comment on lines +89 to +93
remaining = total - sum(allocation.values())
if remaining > 0:
order = sorted(
weights,
key=lambda key: (exact[key] - allocation[key], weights[key], key),
reverse=True,
)
for key in order[:remaining]:
allocation[key] += 1
return allocation


def _order_within_source(candidates: list[PersistentCandidate]) -> list[PersistentCandidate]:
"""Order one source's candidates by priority, then recency, then id."""
return sorted(
candidates,
key=lambda c: (
-_source_key_priority(candidate_source_key(c)),
-c.last_seen_at.timestamp(),
c.candidate_id,
),
)


def select_within_month(
candidates: list[PersistentCandidate],
count: int,
) -> list[PersistentCandidate]:
"""Pick *count* candidates from one month, round-robin across sources.

Sources are visited in descending scrape-priority order each round, so
higher-signal publishers get first pick while every source is represented
before any source is drained.
"""
if count <= 0 or not candidates:
return []
by_source: dict[str, list[PersistentCandidate]] = defaultdict(list)
for candidate in candidates:
by_source[candidate_source_key(candidate)].append(candidate)
queues = {key: _order_within_source(group) for key, group in by_source.items()}
source_order = sorted(
queues,
key=lambda key: (-_source_key_priority(key), -len(queues[key]), key),
)
selected: list[PersistentCandidate] = []
while len(selected) < count and any(queues[key] for key in source_order):
for key in source_order:
if queues[key]:
selected.append(queues[key].pop(0))
if len(selected) >= count:
break
return selected


def plan_balanced_scrape_batch(
candidates: list[PersistentCandidate],
*,
batch_size: int,
now: datetime | None = None,
) -> list[PersistentCandidate]:
"""Select a month-frequency-weighted, source-balanced batch.

*candidates* should already be the scrape-eligible (and prefilter-passing)
pool. Candidates with no resolvable publication month are excluded from
balanced selection. Returns at most *batch_size* candidates.
"""
del now # reserved for future recency weighting; selection is order-stable
if batch_size <= 0:
return []
by_month: dict[str, list[PersistentCandidate]] = defaultdict(list)
for candidate in candidates:
month = candidate_month(candidate)
if month is not None:
by_month[month].append(candidate)
if not by_month:
return []

available = sum(len(group) for group in by_month.values())
target = min(batch_size, available)
weights = {month: len(group) for month, group in by_month.items()}
allocation = largest_remainder_allocation(weights, target)

selected: list[PersistentCandidate] = []
selected_ids: set[str] = set()
for month, group in by_month.items():
picked = select_within_month(group, allocation.get(month, 0))
selected.extend(picked)
selected_ids.update(c.candidate_id for c in picked)

# Top-up: a month allocated more than it could supply leaves the batch
# short; backfill from the remaining pool by global scrape priority.
if len(selected) < target:
leftovers = [c for c in candidates if c.candidate_id not in selected_ids]
leftovers = [c for c in leftovers if candidate_month(c) is not None]
for candidate in _order_within_source(leftovers):
if len(selected) >= target:
break
selected.append(candidate)
selected_ids.add(candidate.candidate_id)

return selected[:batch_size]
47 changes: 40 additions & 7 deletions src/denbust/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
plan_backfill_windows,
resolve_backfill_request_window,
)
from denbust.discovery.balanced_selection import candidate_month, plan_balanced_scrape_batch
from denbust.discovery.base import DiscoveryContext, SourceDiscoveryContext
from denbust.discovery.engine_checkpoint import (
cache_path as _engine_cache_path,
Expand All @@ -67,6 +68,7 @@
from denbust.discovery.scrape_queue import (
SCRAPEABLE_CANDIDATE_STATUSES,
CandidateScrapeBatch,
order_scrape_eligible_candidates,
scrape_candidates,
select_backfill_candidates_for_scrape,
select_candidates_for_scrape,
Expand Down Expand Up @@ -1418,25 +1420,47 @@ async def _run_candidate_scrape_job(
limit: int,
orchestrator: CascadeOrchestrator | None = None,
pub_date_from: datetime | None = None,
balanced_batch_size: int | None = None,
) -> tuple[CandidateScrapeBatch, list[PrefilterDecision]]:
"""Select queued candidates, apply the thin prefilter, and run the scrape-attempt layer.

Returns ``(scrape_batch, thin_decisions)``. When *orchestrator* is ``None``
the thin pass is skipped and ``thin_decisions`` is empty. When
*pub_date_from* is set only candidates published on or after that date are
considered (targeted / recent-only scrape).
considered (targeted / recent-only scrape). When *balanced_batch_size* is
set the selection is a month-frequency-weighted, source-balanced batch of
that size drawn from the full prefilter-passing pool (instead of the
priority-ordered ``limit`` head).
"""
persistence = create_discovery_persistence(config)
try:
selected_candidates = select_candidates_for_scrape(
persistence,
limit=limit,
pub_date_from=pub_date_from,
)
if balanced_batch_size is not None:
pool = persistence.list_candidates(statuses=SCRAPEABLE_CANDIDATE_STATUSES)
eligible = order_scrape_eligible_candidates(pool)
if pub_date_from is not None:
cutoff_month = pub_date_from.strftime("%Y-%m")
eligible = [
candidate
for candidate in eligible
if (month := candidate_month(candidate)) is not None and month >= cutoff_month
]
Comment on lines +1440 to +1446
passed_pool, thin_decisions = _thin_pass_prefilter(eligible, orchestrator)
passed_candidates = plan_balanced_scrape_batch(
passed_pool,
batch_size=balanced_batch_size,
)
else:
selected_candidates = select_candidates_for_scrape(
persistence,
limit=limit,
pub_date_from=pub_date_from,
)
passed_candidates, thin_decisions = _thin_pass_prefilter(
selected_candidates, orchestrator
)
finally:
persistence.close()

passed_candidates, thin_decisions = _thin_pass_prefilter(selected_candidates, orchestrator)
batch = await _scrape_candidate_batch(
config=config,
candidates=passed_candidates,
Expand Down Expand Up @@ -2499,6 +2523,7 @@ async def run_news_scrape_candidates_job(
limit=config.max_articles,
orchestrator=orchestrator,
pub_date_from=config.scrape_pub_date_from,
balanced_batch_size=config.scrape_balanced_batch_size,
)
result.raw_article_count = len(scrape_batch.raw_articles)
if scrape_batch.errors:
Expand Down Expand Up @@ -3300,6 +3325,7 @@ def _run_job_from_config(
days_override: int | None = None,
operational_store: OperationalStore | None = None,
scrape_pub_date_from: str | None = None,
scrape_balanced_batch_size: int | None = None,
) -> RunSnapshot:
"""Shared sync wrapper for CLI-triggered job runs."""
setup_logging()
Expand All @@ -3318,6 +3344,11 @@ def _run_job_from_config(
except ValueError as exc:
print(f"Error: --pub-date-from must be an ISO date (YYYY-MM-DD): {exc}")
sys.exit(1)
if scrape_balanced_batch_size is not None:
if scrape_balanced_batch_size < 1:
print("Error: --balanced-batch must be a positive integer")
sys.exit(1)
update["scrape_balanced_batch_size"] = scrape_balanced_batch_size
if update:
config = config.model_copy(update=update)

Expand Down Expand Up @@ -3394,6 +3425,7 @@ def run_job(
days_override: int | None = None,
operational_store: OperationalStore | None = None,
scrape_pub_date_from: str | None = None,
scrape_balanced_batch_size: int | None = None,
) -> None:
"""Run a dataset/job pair through the generic registry."""
_run_job_from_config(
Expand All @@ -3403,6 +3435,7 @@ def run_job(
days_override=days_override,
operational_store=operational_store,
scrape_pub_date_from=scrape_pub_date_from,
scrape_balanced_batch_size=scrape_balanced_batch_size,
)


Expand Down
Loading
Loading