diff --git a/allways/validator/scoring.py b/allways/validator/scoring.py index a5eda26e..621e88ed 100644 --- a/allways/validator/scoring.py +++ b/allways/validator/scoring.py @@ -9,7 +9,7 @@ from dataclasses import dataclass from enum import IntEnum -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Dict, Iterator, List, Optional, Set, Tuple import bittensor as bt import numpy as np @@ -228,7 +228,7 @@ def merge_replay_events( return events -def replay_crown_time_window( +def _walk_replay_events( store: ValidatorStateStore, event_watcher: ContractEventWatcher, from_chain: str, @@ -236,15 +236,14 @@ def replay_crown_time_window( window_start: int, window_end: int, rewardable_hotkeys: Set[str], -) -> Dict[str, float]: - """Walk the merged event stream, return ``{hotkey: crown_blocks_float}``. - Ties at the same rate split credit evenly. A miner qualifies for crown - at an instant iff they are on the current metagraph, were active at - that instant, not busy, and had a positive rate posted. Active/rate/busy - are evaluated per-block via the replay — a miner's status at scoring - time is irrelevant other than metagraph membership (used to credit the - UID). Collateral-floor invariants are trusted to the contract's active - flag; halt state is handled at ``score_and_reward_miners`` entry.""" +) -> Iterator[Tuple[int, int, List[str], float]]: + """Walks the merged active/busy/rate stream over the window and yields one + record per uniform-state interval: ``(start, end, holders, best_rate)``. + ``holders`` is empty when nobody qualifies for that interval; emit it + anyway so callers that materialize per-block state can encode "no holder" + as absence. Aggregating and per-block consumers both share this generator + so the qualification rule (active, not busy, has rate > 0, lower-rate- + wins-when-reversed) lives in exactly one place.""" rates, busy_count, active_set = reconstruct_window_start_state( store, event_watcher, from_chain, to_chain, window_start, rewardable_hotkeys ) @@ -256,13 +255,9 @@ def replay_crown_time_window( canon_from, _ = canonical_pair(from_chain, to_chain) lower_rate_wins = from_chain != canon_from - crown_blocks: Dict[str, float] = {} - prev_block = window_start - - def credit_interval(interval_start: int, interval_end: int) -> None: - duration = interval_end - interval_start - if duration <= 0: - return + def emit_interval(interval_start: int, interval_end: int) -> Optional[Tuple[int, int, List[str], float]]: + if interval_end <= interval_start: + return None busy_set = {hk for hk, c in busy_count.items() if c > 0} holders = crown_holders_at_instant( rates, @@ -271,11 +266,8 @@ def credit_interval(interval_start: int, interval_end: int) -> None: active=active_set, lower_rate_wins=lower_rate_wins, ) - if not holders: - return - split = duration / len(holders) - for hk in holders: - crown_blocks[hk] = crown_blocks.get(hk, 0.0) + split + best_rate = rates[holders[0]] if holders else 0.0 + return (interval_start, interval_end, holders, best_rate) def apply_event(event: ReplayEvent) -> None: if event.kind is EventKind.RATE: @@ -292,15 +284,70 @@ def apply_event(event: ReplayEvent) -> None: else: active_set.discard(event.hotkey) + prev_block = window_start for event in replay_events: - credit_interval(prev_block, event.block) + interval = emit_interval(prev_block, event.block) + if interval is not None: + yield interval apply_event(event) prev_block = event.block - credit_interval(prev_block, window_end) + interval = emit_interval(prev_block, window_end) + if interval is not None: + yield interval + + +def replay_crown_time_window( + store: ValidatorStateStore, + event_watcher: ContractEventWatcher, + from_chain: str, + to_chain: str, + window_start: int, + window_end: int, + rewardable_hotkeys: Set[str], +) -> Dict[str, float]: + """Walk the merged event stream, return ``{hotkey: crown_blocks_float}``. + Ties at the same rate split credit evenly. A miner qualifies for crown + at an instant iff they are on the current metagraph, were active at + that instant, not busy, and had a positive rate posted. Active/rate/busy + are evaluated per-block via the replay — a miner's status at scoring + time is irrelevant other than metagraph membership (used to credit the + UID). Collateral-floor invariants are trusted to the contract's active + flag; halt state is handled at ``score_and_reward_miners`` entry.""" + crown_blocks: Dict[str, float] = {} + for start, end, holders, _rate in _walk_replay_events( + store, event_watcher, from_chain, to_chain, window_start, window_end, rewardable_hotkeys + ): + if not holders: + continue + split = (end - start) / len(holders) + for hk in holders: + crown_blocks[hk] = crown_blocks.get(hk, 0.0) + split return crown_blocks +def replay_crown_time_window_iter( + store: ValidatorStateStore, + event_watcher: ContractEventWatcher, + from_chain: str, + to_chain: str, + window_start: int, + window_end: int, + rewardable_hotkeys: Set[str], +) -> Iterator[Tuple[int, List[str], float]]: + """Per-block display path. Yields ``(block, holders, rate)`` for every + block in ``[window_start, window_end)``. ``holders`` is empty when nobody + qualifies — caller is responsible for encoding that as absence in any + persisted store. Each held block emits credit ``1.0 / len(holders)`` if + the caller chooses to credit per-block, so SUM(credit) across the same + window matches ``replay_crown_time_window`` bit-for-bit.""" + for start, end, holders, rate in _walk_replay_events( + store, event_watcher, from_chain, to_chain, window_start, window_end, rewardable_hotkeys + ): + for block in range(start, end): + yield (block, holders, rate) + + def crown_holders_at_instant( rates: Dict[str, float], rewardable: Set[str], diff --git a/allways/validator/state_store.py b/allways/validator/state_store.py index 71ec5aa1..55d0e2ac 100644 --- a/allways/validator/state_store.py +++ b/allways/validator/state_store.py @@ -45,19 +45,32 @@ def __init__( self, db_path: Path | str | None = None, current_block_fn: Optional[Callable[[], int]] = None, + readonly: bool = False, ): self.db_path = Path(db_path or Path.home() / '.allways' / 'validator' / 'state.db') - self.db_path.parent.mkdir(parents=True, exist_ok=True) + self.readonly = readonly + if not readonly: + self.db_path.parent.mkdir(parents=True, exist_ok=True) self.lock = threading.Lock() - self.conn: Optional[sqlite3.Connection] = sqlite3.connect(self.db_path, check_same_thread=False) - # busy_timeout must be set before journal_mode: the WAL switch takes a - # brief exclusive lock that a concurrent opener would otherwise hit as - # an immediate "database is locked" error. - self.conn.execute('PRAGMA busy_timeout=5000') - self.conn.execute('PRAGMA journal_mode=WAL') + if readonly: + # External readers (e.g. the alw-utils sync utility) open the + # validator's DB read-only via SQLite URI mode so a stray write + # surfaces as an error instead of corrupting the live state. WAL + # mode is set by the writer; readers do not flip journal modes. + uri = f'file:{self.db_path}?mode=ro' + self.conn: Optional[sqlite3.Connection] = sqlite3.connect(uri, uri=True, check_same_thread=False) + self.conn.execute('PRAGMA busy_timeout=5000') + else: + self.conn = sqlite3.connect(self.db_path, check_same_thread=False) + # busy_timeout must be set before journal_mode: the WAL switch takes a + # brief exclusive lock that a concurrent opener would otherwise hit as + # an immediate "database is locked" error. + self.conn.execute('PRAGMA busy_timeout=5000') + self.conn.execute('PRAGMA journal_mode=WAL') self.conn.row_factory = sqlite3.Row self.current_block_fn = current_block_fn - self.init_db() + if not readonly: + self.init_db() # ─── pending_confirms ─────────────────────────────────────────────── @@ -254,6 +267,32 @@ def get_rate_events_in_range( ).fetchall() return [{'id': r['id'], 'hotkey': r['hotkey'], 'rate': r['rate'], 'block': r['block']} for r in rows] + def get_rate_events_since_id(self, since_id: int) -> List[dict]: + """Rate events with autoincrement ``id`` > ``since_id``, oldest first. + Read-only consumers (the alw-utils sync utility) tail the log by id so + they advance one cursor regardless of direction or hotkey.""" + with self.lock: + conn = self.require_connection() + rows = conn.execute( + """ + SELECT id, hotkey, from_chain, to_chain, rate, block FROM rate_events + WHERE id > ? + ORDER BY id ASC + """, + (since_id,), + ).fetchall() + return [ + { + 'id': r['id'], + 'hotkey': r['hotkey'], + 'from_chain': r['from_chain'], + 'to_chain': r['to_chain'], + 'rate': r['rate'], + 'block': r['block'], + } + for r in rows + ] + # ─── swap_outcomes ────────────────────────────────────────────────── def insert_swap_outcome( diff --git a/tests/validator/__init__.py b/tests/validator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/validator/test_scoring_iter.py b/tests/validator/test_scoring_iter.py new file mode 100644 index 00000000..e6ae6af8 --- /dev/null +++ b/tests/validator/test_scoring_iter.py @@ -0,0 +1,296 @@ +"""Per-block crown replay iter tests — verifies that the iter wrapper and the +aggregate wrapper share the same underlying state machine.""" + +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import pytest + +from allways.validator.scoring import ( + _walk_replay_events, + replay_crown_time_window, + replay_crown_time_window_iter, +) +from allways.validator.state_store import ValidatorStateStore +from tests.test_scoring_v1 import make_watcher, seed_active + + +def _insert_rate( + conn: sqlite3.Connection, hotkey: str, from_chain: str, to_chain: str, rate: float, block: int +) -> None: + conn.execute( + 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', + (hotkey, from_chain, to_chain, rate, block), + ) + + +class TestIterShape: + def test_iter_yields_one_entry_per_block(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + _insert_rate(store.require_connection(), 'hk_a', 'btc', 'tao', 100.0, 0) + store.require_connection().commit() + + records = list( + replay_crown_time_window_iter( + store=store, + event_watcher=watcher, + from_chain='btc', + to_chain='tao', + window_start=100, + window_end=200, + rewardable_hotkeys={'hk_a'}, + ) + ) + + assert len(records) == 100 + assert [r[0] for r in records] == list(range(100, 200)) + assert all(r[1] == ['hk_a'] for r in records) + assert all(r[2] == 100.0 for r in records) + store.close() + + def test_iter_empty_window_yields_nothing(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + records = list( + replay_crown_time_window_iter( + store=store, + event_watcher=watcher, + from_chain='btc', + to_chain='tao', + window_start=500, + window_end=500, + rewardable_hotkeys={'hk_a'}, + ) + ) + assert records == [] + store.close() + + def test_iter_no_rewardable_emits_empty_holders_each_block(self, tmp_path: Path): + """When no hotkey qualifies, every block still emits a record with + empty holders — caller decides how to encode that (we rely on + absence in Postgres, but the generator faithfully reports every + block of the window).""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active=set()) + records = list( + replay_crown_time_window_iter( + store=store, + event_watcher=watcher, + from_chain='btc', + to_chain='tao', + window_start=100, + window_end=110, + rewardable_hotkeys=set(), + ) + ) + assert len(records) == 10 + assert all(r[1] == [] for r in records) + assert all(r[2] == 0.0 for r in records) + store.close() + + +class TestIterAggregateParity: + """SUM(credit) derived from iter must equal replay_crown_time_window totals + bit-for-bit — the validator scoring path and the database mirror are not + just close, they share the underlying ``_walk_replay_events`` generator.""" + + def _credit_from_iter(self, records) -> dict[str, float]: + credit: dict[str, float] = {} + for _block, holders, _rate in records: + if not holders: + continue + per = 1.0 / len(holders) + for hk in holders: + credit[hk] = credit.get(hk, 0.0) + per + return credit + + def test_two_miners_alternate_leadership(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a', 'hk_b'}) + conn = store.require_connection() + _insert_rate(conn, 'hk_a', 'btc', 'tao', 100.0, 0) + _insert_rate(conn, 'hk_b', 'btc', 'tao', 200.0, 0) + _insert_rate(conn, 'hk_a', 'btc', 'tao', 300.0, 600) + conn.commit() + + kwargs = dict( + store=store, + event_watcher=watcher, + from_chain='btc', + to_chain='tao', + window_start=100, + window_end=1100, + rewardable_hotkeys={'hk_a', 'hk_b'}, + ) + aggregate = replay_crown_time_window(**kwargs) + from_iter = self._credit_from_iter(list(replay_crown_time_window_iter(**kwargs))) + + assert aggregate == from_iter == {'hk_a': 500.0, 'hk_b': 500.0} + store.close() + + def test_tie_split_credit_matches(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a', 'hk_b'}) + conn = store.require_connection() + _insert_rate(conn, 'hk_a', 'tao', 'btc', 0.00020, 0) + _insert_rate(conn, 'hk_b', 'tao', 'btc', 0.00020, 0) + conn.commit() + + kwargs = dict( + store=store, + event_watcher=watcher, + from_chain='tao', + to_chain='btc', + window_start=100, + window_end=1100, + rewardable_hotkeys={'hk_a', 'hk_b'}, + ) + aggregate = replay_crown_time_window(**kwargs) + from_iter = self._credit_from_iter(list(replay_crown_time_window_iter(**kwargs))) + assert aggregate == from_iter == {'hk_a': 500.0, 'hk_b': 500.0} + + # Per-block, both miners are listed as holders with rate 0.00020. + records = list(replay_crown_time_window_iter(**kwargs)) + assert all(set(r[1]) == {'hk_a', 'hk_b'} for r in records) + store.close() + + def test_busy_window_runner_up_matches(self, tmp_path: Path): + """A drops crown to B for [400, 800] — both aggregate and iter agree + on the credit split.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a', 'hk_b'}) + conn = store.require_connection() + _insert_rate(conn, 'hk_a', 'btc', 'tao', 300.0, 0) + _insert_rate(conn, 'hk_b', 'btc', 'tao', 200.0, 0) + conn.commit() + watcher.apply_event(400, 'SwapInitiated', {'swap_id': 1, 'miner': 'hk_a'}) + watcher.apply_event(800, 'SwapCompleted', {'swap_id': 1, 'miner': 'hk_a'}) + + kwargs = dict( + store=store, + event_watcher=watcher, + from_chain='btc', + to_chain='tao', + window_start=100, + window_end=1100, + rewardable_hotkeys={'hk_a', 'hk_b'}, + ) + aggregate = replay_crown_time_window(**kwargs) + from_iter = self._credit_from_iter(list(replay_crown_time_window_iter(**kwargs))) + # A held [100, 400] + [800, 1100] = 600 blocks; B held [400, 800] = 400. + assert aggregate == from_iter == {'hk_a': 600.0, 'hk_b': 400.0} + store.close() + + +class TestNoHolderEncoding: + def test_all_busy_emits_empty_holders(self, tmp_path: Path): + """When the best-rate miner is busy and no runner-up qualifies, iter + emits empty-holder records — the sync utility translates those to + "no rows for this block" in the Postgres mirror.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + conn = store.require_connection() + _insert_rate(conn, 'hk_a', 'btc', 'tao', 300.0, 0) + conn.commit() + watcher.apply_event(400, 'SwapInitiated', {'swap_id': 1, 'miner': 'hk_a'}) + watcher.apply_event(800, 'SwapCompleted', {'swap_id': 1, 'miner': 'hk_a'}) + + records = list( + replay_crown_time_window_iter( + store=store, + event_watcher=watcher, + from_chain='btc', + to_chain='tao', + window_start=100, + window_end=1100, + rewardable_hotkeys={'hk_a'}, + ) + ) + busy_block = next(r for r in records if r[0] == 500) + assert busy_block == (500, [], 0.0) + held_block = next(r for r in records if r[0] == 300) + assert held_block == (300, ['hk_a'], 300.0) + store.close() + + def test_active_flag_off_emits_empty_holders(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a'}) + conn = store.require_connection() + _insert_rate(conn, 'hk_a', 'btc', 'tao', 300.0, 0) + conn.commit() + # MinerActivated(active=False) at block 500 + seed_active(watcher, 'hk_a', active=False, block=500) + + records = list( + replay_crown_time_window_iter( + store=store, + event_watcher=watcher, + from_chain='btc', + to_chain='tao', + window_start=100, + window_end=1000, + rewardable_hotkeys={'hk_a'}, + ) + ) + # Pre-deactivation block held; post-deactivation block does not. + assert next(r for r in records if r[0] == 400) == (400, ['hk_a'], 300.0) + assert next(r for r in records if r[0] == 600) == (600, [], 0.0) + store.close() + + +class TestWalkReplayEventsIntervalShape: + def test_intervals_partition_the_window(self, tmp_path: Path): + """The merged walker emits non-overlapping intervals that cover + ``[window_start, window_end)`` exactly.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + watcher = make_watcher(store, active={'hk_a', 'hk_b'}) + conn = store.require_connection() + _insert_rate(conn, 'hk_a', 'btc', 'tao', 100.0, 0) + _insert_rate(conn, 'hk_b', 'btc', 'tao', 200.0, 0) + _insert_rate(conn, 'hk_a', 'btc', 'tao', 300.0, 400) + conn.commit() + + intervals = list( + _walk_replay_events( + store=store, + event_watcher=watcher, + from_chain='btc', + to_chain='tao', + window_start=100, + window_end=1000, + rewardable_hotkeys={'hk_a', 'hk_b'}, + ) + ) + assert intervals[0][0] == 100 + assert intervals[-1][1] == 1000 + for prev, nxt in zip(intervals, intervals[1:]): + assert prev[1] == nxt[0] + store.close() + + +class TestReadOnlyMode: + def test_readonly_can_read_existing_db(self, tmp_path: Path): + writer = ValidatorStateStore(db_path=tmp_path / 'state.db') + _insert_rate(writer.require_connection(), 'hk_a', 'btc', 'tao', 123.0, 7) + writer.require_connection().commit() + writer.close() + + reader = ValidatorStateStore(db_path=tmp_path / 'state.db', readonly=True) + rows = reader.get_rate_events_since_id(0) + assert len(rows) == 1 + assert rows[0]['rate'] == 123.0 + assert rows[0]['hotkey'] == 'hk_a' + reader.close() + + def test_readonly_blocks_writes(self, tmp_path: Path): + writer = ValidatorStateStore(db_path=tmp_path / 'state.db') + writer.close() + + reader = ValidatorStateStore(db_path=tmp_path / 'state.db', readonly=True) + with pytest.raises(sqlite3.OperationalError): + _insert_rate(reader.require_connection(), 'hk_a', 'btc', 'tao', 1.0, 1) + reader.require_connection().commit() + reader.close()