Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 72 additions & 25 deletions allways/validator/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from dataclasses import dataclass
from enum import IntEnum
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Dict, Iterator, List, Optional, Set, Tuple

import bittensor as bt
import numpy as np
Expand Down Expand Up @@ -228,23 +228,22 @@ def merge_replay_events(
return events


def replay_crown_time_window(
def _walk_replay_events(
store: ValidatorStateStore,
event_watcher: ContractEventWatcher,
from_chain: str,
to_chain: str,
window_start: int,
window_end: int,
rewardable_hotkeys: Set[str],
) -> Dict[str, float]:
"""Walk the merged event stream, return ``{hotkey: crown_blocks_float}``.
Ties at the same rate split credit evenly. A miner qualifies for crown
at an instant iff they are on the current metagraph, were active at
that instant, not busy, and had a positive rate posted. Active/rate/busy
are evaluated per-block via the replay — a miner's status at scoring
time is irrelevant other than metagraph membership (used to credit the
UID). Collateral-floor invariants are trusted to the contract's active
flag; halt state is handled at ``score_and_reward_miners`` entry."""
) -> Iterator[Tuple[int, int, List[str], float]]:
"""Walks the merged active/busy/rate stream over the window and yields one
record per uniform-state interval: ``(start, end, holders, best_rate)``.
``holders`` is empty when nobody qualifies for that interval; emit it
anyway so callers that materialize per-block state can encode "no holder"
as absence. Aggregating and per-block consumers both share this generator
so the qualification rule (active, not busy, has rate > 0, lower-rate-
wins-when-reversed) lives in exactly one place."""
rates, busy_count, active_set = reconstruct_window_start_state(
store, event_watcher, from_chain, to_chain, window_start, rewardable_hotkeys
)
Expand All @@ -256,13 +255,9 @@ def replay_crown_time_window(
canon_from, _ = canonical_pair(from_chain, to_chain)
lower_rate_wins = from_chain != canon_from

crown_blocks: Dict[str, float] = {}
prev_block = window_start

def credit_interval(interval_start: int, interval_end: int) -> None:
duration = interval_end - interval_start
if duration <= 0:
return
def emit_interval(interval_start: int, interval_end: int) -> Optional[Tuple[int, int, List[str], float]]:
if interval_end <= interval_start:
return None
busy_set = {hk for hk, c in busy_count.items() if c > 0}
holders = crown_holders_at_instant(
rates,
Expand All @@ -271,11 +266,8 @@ def credit_interval(interval_start: int, interval_end: int) -> None:
active=active_set,
lower_rate_wins=lower_rate_wins,
)
if not holders:
return
split = duration / len(holders)
for hk in holders:
crown_blocks[hk] = crown_blocks.get(hk, 0.0) + split
best_rate = rates[holders[0]] if holders else 0.0
return (interval_start, interval_end, holders, best_rate)

def apply_event(event: ReplayEvent) -> None:
if event.kind is EventKind.RATE:
Expand All @@ -292,15 +284,70 @@ def apply_event(event: ReplayEvent) -> None:
else:
active_set.discard(event.hotkey)

prev_block = window_start
for event in replay_events:
credit_interval(prev_block, event.block)
interval = emit_interval(prev_block, event.block)
if interval is not None:
yield interval
apply_event(event)
prev_block = event.block

credit_interval(prev_block, window_end)
interval = emit_interval(prev_block, window_end)
if interval is not None:
yield interval


def replay_crown_time_window(
store: ValidatorStateStore,
event_watcher: ContractEventWatcher,
from_chain: str,
to_chain: str,
window_start: int,
window_end: int,
rewardable_hotkeys: Set[str],
) -> Dict[str, float]:
"""Walk the merged event stream, return ``{hotkey: crown_blocks_float}``.
Ties at the same rate split credit evenly. A miner qualifies for crown
at an instant iff they are on the current metagraph, were active at
that instant, not busy, and had a positive rate posted. Active/rate/busy
are evaluated per-block via the replay — a miner's status at scoring
time is irrelevant other than metagraph membership (used to credit the
UID). Collateral-floor invariants are trusted to the contract's active
flag; halt state is handled at ``score_and_reward_miners`` entry."""
crown_blocks: Dict[str, float] = {}
for start, end, holders, _rate in _walk_replay_events(
store, event_watcher, from_chain, to_chain, window_start, window_end, rewardable_hotkeys
):
if not holders:
continue
split = (end - start) / len(holders)
for hk in holders:
crown_blocks[hk] = crown_blocks.get(hk, 0.0) + split
return crown_blocks


def replay_crown_time_window_iter(
store: ValidatorStateStore,
event_watcher: ContractEventWatcher,
from_chain: str,
to_chain: str,
window_start: int,
window_end: int,
rewardable_hotkeys: Set[str],
) -> Iterator[Tuple[int, List[str], float]]:
"""Per-block display path. Yields ``(block, holders, rate)`` for every
block in ``[window_start, window_end)``. ``holders`` is empty when nobody
qualifies — caller is responsible for encoding that as absence in any
persisted store. Each held block emits credit ``1.0 / len(holders)`` if
the caller chooses to credit per-block, so SUM(credit) across the same
window matches ``replay_crown_time_window`` bit-for-bit."""
for start, end, holders, rate in _walk_replay_events(
store, event_watcher, from_chain, to_chain, window_start, window_end, rewardable_hotkeys
):
for block in range(start, end):
yield (block, holders, rate)


def crown_holders_at_instant(
rates: Dict[str, float],
rewardable: Set[str],
Expand Down
55 changes: 47 additions & 8 deletions allways/validator/state_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,32 @@ def __init__(
self,
db_path: Path | str | None = None,
current_block_fn: Optional[Callable[[], int]] = None,
readonly: bool = False,
):
self.db_path = Path(db_path or Path.home() / '.allways' / 'validator' / 'state.db')
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.readonly = readonly
if not readonly:
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.lock = threading.Lock()
self.conn: Optional[sqlite3.Connection] = sqlite3.connect(self.db_path, check_same_thread=False)
# busy_timeout must be set before journal_mode: the WAL switch takes a
# brief exclusive lock that a concurrent opener would otherwise hit as
# an immediate "database is locked" error.
self.conn.execute('PRAGMA busy_timeout=5000')
self.conn.execute('PRAGMA journal_mode=WAL')
if readonly:
# External readers (e.g. the alw-utils sync utility) open the
# validator's DB read-only via SQLite URI mode so a stray write
# surfaces as an error instead of corrupting the live state. WAL
# mode is set by the writer; readers do not flip journal modes.
uri = f'file:{self.db_path}?mode=ro'
self.conn: Optional[sqlite3.Connection] = sqlite3.connect(uri, uri=True, check_same_thread=False)
self.conn.execute('PRAGMA busy_timeout=5000')
else:
self.conn = sqlite3.connect(self.db_path, check_same_thread=False)
# busy_timeout must be set before journal_mode: the WAL switch takes a
# brief exclusive lock that a concurrent opener would otherwise hit as
# an immediate "database is locked" error.
self.conn.execute('PRAGMA busy_timeout=5000')
self.conn.execute('PRAGMA journal_mode=WAL')
self.conn.row_factory = sqlite3.Row
self.current_block_fn = current_block_fn
self.init_db()
if not readonly:
self.init_db()

# ─── pending_confirms ───────────────────────────────────────────────

Expand Down Expand Up @@ -254,6 +267,32 @@ def get_rate_events_in_range(
).fetchall()
return [{'id': r['id'], 'hotkey': r['hotkey'], 'rate': r['rate'], 'block': r['block']} for r in rows]

def get_rate_events_since_id(self, since_id: int) -> List[dict]:
"""Rate events with autoincrement ``id`` > ``since_id``, oldest first.
Read-only consumers (the alw-utils sync utility) tail the log by id so
they advance one cursor regardless of direction or hotkey."""
with self.lock:
conn = self.require_connection()
rows = conn.execute(
"""
SELECT id, hotkey, from_chain, to_chain, rate, block FROM rate_events
WHERE id > ?
ORDER BY id ASC
""",
(since_id,),
).fetchall()
return [
{
'id': r['id'],
'hotkey': r['hotkey'],
'from_chain': r['from_chain'],
'to_chain': r['to_chain'],
'rate': r['rate'],
'block': r['block'],
}
for r in rows
]

# ─── swap_outcomes ──────────────────────────────────────────────────

def insert_swap_outcome(
Expand Down
Empty file added tests/validator/__init__.py
Empty file.
Loading