diff --git a/CHANGELOG.md b/CHANGELOG.md index 3210ebb3..64c2cf67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,19 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +### Added + +- **Greedy Scheduler**: Fast greedy scheduling algorithms as an alternative to CP-SAT optimization + - Added `greedy_minimize_time()` for minimal execution time scheduling with ALAP preparation optimization + - Added `greedy_minimize_space()` for minimal qubit usage scheduling + +### Tests + +- **Greedy Scheduler**: Added tests for greedy scheduling algorithms + + ## [0.2.1] - 2026-01-16 ### Added diff --git a/graphqomb/feedforward.py b/graphqomb/feedforward.py index e1f558ef..fe8a542f 100644 --- a/graphqomb/feedforward.py +++ b/graphqomb/feedforward.py @@ -3,6 +3,8 @@ This module provides: - `dag_from_flow`: Construct a directed acyclic graph (DAG) from a flowlike object. +- `inverse_dag_from_dag`: Construct an inverse DAG (node -> dependencies). +- `topo_order_from_inv_dag`: Construct a topological order from an inverse DAG. - `check_dag`: Check if a directed acyclic graph (DAG) does not contain a cycle. - `check_flow`: Check if the flowlike object is causal with respect to the graph state. - `signal_shifting`: Convert the correction maps into more parallel-friendly forms using signal shifting. 
@@ -13,7 +15,7 @@ from collections.abc import Iterable, Mapping from collections.abc import Set as AbstractSet -from graphlib import TopologicalSorter +from graphlib import CycleError, TopologicalSorter from typing import Any, TypeGuard import typing_extensions @@ -21,6 +23,8 @@ from graphqomb.common import Axis, Plane, determine_pauli_axis from graphqomb.graphstate import BaseGraphState, odd_neighbors +TOPO_ORDER_CYCLE_ERROR_MSG = "No nodes can be measured; possible cyclic dependency or incomplete preparation." + def _is_flow(flowlike: Mapping[int, Any]) -> TypeGuard[Mapping[int, int]]: r"""Check if the flowlike object is a flow. @@ -129,6 +133,60 @@ def check_dag(dag: Mapping[int, Iterable[int]]) -> None: raise ValueError(msg) +def inverse_dag_from_dag( + dag: Mapping[int, Iterable[int]], + all_nodes: Iterable[int] | None = None, +) -> dict[int, set[int]]: + r"""Build inverse DAG (node -> dependencies) from parent->children DAG. + + Parameters + ---------- + dag : `collections.abc.Mapping`\[`int`, `collections.abc.Iterable`\[`int`\]\] + DAG represented as parent node -> children. + all_nodes : `collections.abc.Iterable`\[`int`\] | `None`, optional + Optional full node set to include isolated nodes. + + Returns + ------- + `dict`\[`int`, `set`\[`int`\]\] + Inverse DAG represented as node -> dependencies. + """ + nodes = set(all_nodes) if all_nodes is not None else set(dag) + for children in dag.values(): + nodes.update(children) + + inv_dag: dict[int, set[int]] = {node: set() for node in nodes} + for parent, children in dag.items(): + for child in children: + inv_dag[child].add(parent) + + return inv_dag + + +def topo_order_from_inv_dag(inv_dag: Mapping[int, Iterable[int]]) -> list[int]: + r"""Build topological order from an inverse DAG (node -> dependencies). + + Parameters + ---------- + inv_dag : `collections.abc.Mapping`\[`int`, `collections.abc.Iterable`\[`int`\]\] + Inverse DAG where each node maps to the nodes it depends on. 
+ + Returns + ------- + `list`\[`int`\] + Topological order from dependencies to dependents. + + Raises + ------ + RuntimeError + If topological ordering is not possible due to a cycle. + """ + try: + return list(TopologicalSorter(inv_dag).static_order()) + except CycleError as exc: + raise RuntimeError(TOPO_ORDER_CYCLE_ERROR_MSG) from exc + + def check_flow( graph: BaseGraphState, xflow: Mapping[int, int] | Mapping[int, AbstractSet[int]], diff --git a/graphqomb/greedy_scheduler.py b/graphqomb/greedy_scheduler.py new file mode 100644 index 00000000..a4751e3f --- /dev/null +++ b/graphqomb/greedy_scheduler.py @@ -0,0 +1,487 @@ +"""Greedy heuristic scheduler for fast MBQC pattern scheduling. + +This module provides fast greedy scheduling algorithms as an alternative to +CP-SAT based optimization. The greedy algorithms provide approximate solutions +with speedup compared to CP-SAT, making them suitable for large-scale +graphs or when optimality is not critical. + +This module provides: + +- `greedy_minimize_time`: Fast greedy scheduler optimizing for minimal execution time +- `greedy_minimize_space`: Fast greedy scheduler optimizing for minimal qubit usage +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from graphqomb.feedforward import TOPO_ORDER_CYCLE_ERROR_MSG, inverse_dag_from_dag, topo_order_from_inv_dag + +if TYPE_CHECKING: + from collections.abc import Mapping + from collections.abc import Set as AbstractSet + + from graphqomb.graphstate import BaseGraphState + + +def greedy_minimize_time( # noqa: PLR0914 + graph: BaseGraphState, + dag: Mapping[int, AbstractSet[int]], + max_qubit_count: int | None = None, +) -> tuple[dict[int, int], dict[int, int]]: + r"""Fast greedy scheduler optimizing for minimal execution time (makespan). + + This algorithm uses a single slice-by-slice strategy with slack-filling. + If `max_qubit_count` is `None`, it is treated as no active-qubit limit. 
+ + At each time slice, scheduling proceeds in two phases: + + 1. Phase 1 (measurement phase): Measure every currently ready node. + A node is ready when all DAG parents are already measured, all graph + neighbors are prepared, and (for non-input nodes) the node itself is prepared. + 2. Phase 2 (preparation phase): Use remaining qubit capacity to prepare + high-priority unprepared nodes that are likely to unblock future + measurements (slack-filling). + + Parameters + ---------- + graph : `BaseGraphState` + The graph state to schedule + dag : `collections.abc.Mapping`\[`int`, `collections.abc.Set`\[`int`\]\] + The directed acyclic graph representing measurement dependencies + max_qubit_count : `int` | `None`, optional + Maximum allowed number of active qubits. If None, no limit is enforced. + + Returns + ------- + `tuple`\[`dict`\[`int`, `int`\], `dict`\[`int`, `int`\]\] + A tuple of (prepare_time, measure_time) dictionaries + + Raises + ------ + RuntimeError + If the scheduling cannot proceed due to cyclic dependencies + or if the max_qubit_count constraint is too tight to allow any progress. + """ + unmeasured = graph.physical_nodes - graph.output_node_indices.keys() + input_nodes = set(graph.input_node_indices.keys()) + output_nodes = set(graph.output_node_indices.keys()) + + inv_dag = inverse_dag_from_dag(dag, graph.physical_nodes) + + # Cache neighbors to avoid repeated set constructions in tight loops + neighbors_map = {node: graph.neighbors(node) for node in graph.physical_nodes} + + # Single implementation for both bounded and unbounded capacity modes. 
+ prepare_time: dict[int, int] = {} + measure_time: dict[int, int] = {} + + # Make mutable copies + inv_dag_mut: dict[int, set[int]] = {node: set(parents) for node, parents in inv_dag.items()} + unmeasured_mut: set[int] = set(unmeasured) + + prepared: set[int] = set(input_nodes) + alive: set[int] = set(input_nodes) + + effective_max_qubit_count = max_qubit_count if max_qubit_count is not None else len(graph.physical_nodes) + + if len(alive) > effective_max_qubit_count: + msg = "Initial number of active qubits exceeds max_qubit_count." + raise RuntimeError(msg) + + # Compute criticality for prioritizing preparations + criticality = _compute_criticality(dag, output_nodes) + + current_time = 0 + + while unmeasured_mut: + ready_to_measure = _phase1_measure_ready_nodes( + current_time, + dag=dag, + inv_dag=inv_dag_mut, + neighbors_map=neighbors_map, + input_nodes=input_nodes, + prepared=prepared, + alive=alive, + unmeasured=unmeasured_mut, + measure_time=measure_time, + ) + prepared_in_phase2 = _phase2_prepare_nodes_with_slack( + current_time, + physical_nodes=graph.physical_nodes, + max_qubit_count=effective_max_qubit_count, + inv_dag=inv_dag_mut, + neighbors_map=neighbors_map, + prepared=prepared, + alive=alive, + unmeasured=unmeasured_mut, + output_nodes=output_nodes, + criticality=criticality, + prepare_time=prepare_time, + ) + + # Check if we made progress + if not ready_to_measure and not prepared_in_phase2 and unmeasured_mut: + if max_qubit_count is None: + raise RuntimeError(TOPO_ORDER_CYCLE_ERROR_MSG) + # No measurements and no room to prepare under qubit-capacity constraint. + msg = ( + "Cannot schedule more measurements without exceeding max qubit count. Please increase max_qubit_count." + ) + raise RuntimeError(msg) + + current_time += 1 + + # Safety check for infinite loops + if current_time > len(graph.physical_nodes) * 2: + msg = "Scheduling did not converge; possible cyclic dependency." 
+ raise RuntimeError(msg) + + # Apply ALAP post-processing to minimize active volume + prepare_time = alap_prepare_times(graph, measure_time) + + return prepare_time, measure_time + + +def _phase1_measure_ready_nodes( # noqa: PLR0913 + current_time: int, + *, + dag: Mapping[int, AbstractSet[int]], + inv_dag: dict[int, set[int]], + neighbors_map: Mapping[int, AbstractSet[int]], + input_nodes: AbstractSet[int], + prepared: set[int], + alive: set[int], + unmeasured: set[int], + measure_time: dict[int, int], +) -> set[int]: + # Phase 1: measure all currently ready nodes. + ready_to_measure: set[int] = set() + for node in unmeasured: + if inv_dag[node]: + continue + if not neighbors_map[node] <= prepared: + continue + if node not in input_nodes and node not in prepared: + continue + ready_to_measure.add(node) + + for node in ready_to_measure: + measure_time[node] = current_time + unmeasured.remove(node) + alive.discard(node) + for child in dag.get(node, ()): + inv_dag[child].discard(node) + + return ready_to_measure + + +def _phase2_prepare_nodes_with_slack( # noqa: PLR0913 + current_time: int, + *, + physical_nodes: AbstractSet[int], + max_qubit_count: int, + inv_dag: Mapping[int, AbstractSet[int]], + neighbors_map: Mapping[int, AbstractSet[int]], + prepared: set[int], + alive: set[int], + unmeasured: AbstractSet[int], + output_nodes: AbstractSet[int], + criticality: Mapping[int, int], + prepare_time: dict[int, int], +) -> bool: + # Phase 2: fill free qubit capacity with high-priority preparation candidates. 
+ free_capacity = max_qubit_count - len(alive) + if free_capacity <= 0: + return False + + unprepared = physical_nodes - prepared + if not unprepared: + return False + + prep_candidates = _get_prep_candidates_with_priority( + unprepared, + inv_dag, + neighbors_map, + prepared, + unmeasured, + output_nodes, + criticality, + ) + + prepared_in_phase2 = False + for candidate, _score in prep_candidates[:free_capacity]: + prepare_time[candidate] = current_time + prepared.add(candidate) + alive.add(candidate) + prepared_in_phase2 = True + + return prepared_in_phase2 + + +def _compute_criticality( + dag: Mapping[int, AbstractSet[int]], + output_nodes: AbstractSet[int], +) -> dict[int, int]: + # Compute criticality (remaining DAG depth) for each node. + # Nodes with higher criticality should be prioritized for unblocking. + criticality: dict[int, int] = {} + + # For criticality we need children first, so reverse dependency-first topo order. + topo_order = topo_order_from_inv_dag(inverse_dag_from_dag(dag)) + topo_order.reverse() + + for node in topo_order: + children_crits = [criticality.get(c, 0) for c in dag.get(node, ())] + criticality[node] = 1 + max(children_crits, default=0) + + # Output nodes have criticality 0 (they don't need to be measured) + for node in output_nodes: + criticality[node] = 0 + + return criticality + + +def _get_prep_candidates_with_priority( # noqa: PLR0913, PLR0917 + unprepared: AbstractSet[int], + inv_dag: Mapping[int, AbstractSet[int]], + neighbors_map: Mapping[int, AbstractSet[int]], + prepared: AbstractSet[int], + unmeasured: AbstractSet[int], + output_nodes: AbstractSet[int], + criticality: Mapping[int, int], +) -> list[tuple[int, float]]: + # Get preparation candidates sorted by priority score. + # Priority is based on how much preparing a node helps unblock measurements. 
+ # Find nodes that are DAG-ready but blocked by missing neighbors + dag_ready_blocked: set[int] = set() + missing_map: dict[int, set[int]] = {} + + for node in unmeasured: + if inv_dag[node]: + continue # Not DAG-ready + missing = set(neighbors_map[node]) - set(prepared) + # Also check if the node itself needs preparation + if node not in prepared: + missing.add(node) + if missing: + dag_ready_blocked.add(node) + missing_map[node] = missing + + # Score each unprepared node + scores: list[tuple[int, float]] = [] + for candidate in unprepared: + score = 0.0 + for blocked_node in dag_ready_blocked: + if candidate in missing_map[blocked_node]: + crit = criticality.get(blocked_node, 1) + score += crit / len(missing_map[blocked_node]) + + # Apply penalty for output nodes (they stay alive forever) + if candidate in output_nodes: + score *= 0.5 + + scores.append((candidate, score)) + + # Sort by score descending (higher score = higher priority) + scores.sort(key=lambda x: -x[1]) + + return scores + + +def greedy_minimize_space( # noqa: PLR0914 + graph: BaseGraphState, + dag: Mapping[int, AbstractSet[int]], +) -> tuple[dict[int, int], dict[int, int]]: + r"""Fast greedy scheduler optimizing for minimal qubit usage (space). + + This algorithm uses a greedy approach to minimize the number of active + qubits at each time step: + 1. At each time step, select the next node to measure that minimizes the + projected number of alive qubits after any required preparations. + 2. Prepare neighbors of the measured node just before measurement. 
+ + Parameters + ---------- + graph : `BaseGraphState` + The graph state to schedule + dag : `collections.abc.Mapping`\[`int`, `collections.abc.Set`\[`int`\]\] + The directed acyclic graph representing measurement dependencies + + Returns + ------- + `tuple`\[`dict`\[`int`, `int`\], `dict`\[`int`, `int`\]\] + A tuple of (prepare_time, measure_time) dictionaries + + Raises + ------ + RuntimeError + If no nodes can be measured at a given time step, indicating a possible + cyclic dependency or incomplete preparation. + """ + prepare_time: dict[int, int] = {} + measure_time: dict[int, int] = {} + + unmeasured = graph.physical_nodes - graph.output_node_indices.keys() + + inv_dag = inverse_dag_from_dag(dag, graph.physical_nodes) + topo_order = topo_order_from_inv_dag(inv_dag) # from parents to children + topo_rank = {node: i for i, node in enumerate(topo_order)} + + input_nodes = set(graph.input_node_indices.keys()) + prepared: set[int] = set(input_nodes) + alive: set[int] = set(input_nodes) + current_time = 0 + + # Cache neighbors once as the graph is static during scheduling + neighbors_map = {node: graph.neighbors(node) for node in graph.physical_nodes} + + measure_candidates: set[int] = {node for node in unmeasured if not inv_dag[node]} + + while unmeasured: + if not measure_candidates: + raise RuntimeError(TOPO_ORDER_CYCLE_ERROR_MSG) + + # calculate costs and pick the best node to measure + default_rank = len(topo_rank) + candidates = iter(measure_candidates) + best_node = next(candidates) + best_cost = _calc_activate_cost(best_node, neighbors_map, prepared, alive, input_nodes) + best_rank = topo_rank.get(best_node, default_rank) + for node in candidates: + cost = _calc_activate_cost(node, neighbors_map, prepared, alive, input_nodes) + rank = topo_rank.get(node, default_rank) + if cost < best_cost or (cost == best_cost and rank < best_rank): + best_cost = cost + best_rank = rank + best_node = node + + # Prepare neighbors and the node itself (if non-input) at 
current_time + new_neighbors = neighbors_map[best_node] - prepared + needs_self_prep = best_node not in input_nodes and best_node not in prepared + to_prepare = new_neighbors | ({best_node} if needs_self_prep else set()) + needs_prep = bool(to_prepare) + if needs_prep: + for node_to_prep in to_prepare: + prepare_time[node_to_prep] = current_time + prepared.update(to_prepare) + alive.update(to_prepare) + + # Measure at current_time if no prep needed, otherwise at current_time + 1 + meas_time = current_time + 1 if needs_prep else current_time + measure_time[best_node] = meas_time + unmeasured.remove(best_node) + alive.remove(best_node) + + measure_candidates.remove(best_node) + + # Remove measured node from dependencies of all its children in the DAG + for child in dag.get(best_node, ()): + inv_dag[child].remove(best_node) + if not inv_dag[child] and child in unmeasured: + measure_candidates.add(child) + + current_time = meas_time + 1 + + return prepare_time, measure_time + + +def _calc_activate_cost( + node: int, + neighbors_map: Mapping[int, AbstractSet[int]], + prepared: AbstractSet[int], + alive: AbstractSet[int], + input_nodes: AbstractSet[int], +) -> int: + r"""Calculate the projected number of alive qubits if measuring this node next. + + If neighbors or the node itself must be prepared, they become alive at the + current time slice while the node itself remains alive until the next slice. + If no preparation is needed, the node is measured in the current slice and removed. + + Parameters + ---------- + node : `int` + The node to evaluate. + neighbors_map : `collections.abc.Mapping`\[`int`, `collections.abc.Set`\[`int`\]\] + Cached neighbor sets for graph nodes. + prepared : `collections.abc.Set`\[`int`\] + The set of currently prepared nodes. + alive : `collections.abc.Set`\[`int`\] + The set of currently active (prepared but not yet measured) nodes. + input_nodes : `collections.abc.Set`\[`int`\] + The set of input nodes (already prepared at the start). 
+ + Returns + ------- + `int` + The activation cost for the node. + """ + new_neighbors = neighbors_map[node] - prepared + needs_self_prep = node not in input_nodes and node not in prepared + num_to_prepare = len(new_neighbors) + (1 if needs_self_prep else 0) + if num_to_prepare > 0: + return len(alive) + num_to_prepare + # No preparation needed -> node is measured in the current slice, so alive decreases by 1. + return max(len(alive) - 1, 0) + + +def alap_prepare_times( + graph: BaseGraphState, + measure_time: Mapping[int, int], +) -> dict[int, int]: + r"""Recompute preparation times using ALAP (As Late As Possible) strategy. + + Given fixed measurement times, this computes the latest possible preparation + time for each node while respecting the constraint that all neighbors must + be prepared before a node is measured. + + This post-processing reduces active volume (sum of qubit lifetimes) without + changing the measurement schedule or depth. + + Parameters + ---------- + graph : `BaseGraphState` + The graph state + measure_time : `collections.abc.Mapping`\[`int`, `int`\] + Fixed measurement times for non-output nodes + + Returns + ------- + `dict`\[`int`, `int`\] + ALAP preparation times for non-input nodes + """ + input_nodes = set(graph.input_node_indices.keys()) + + # deadline[v] = latest time v can be prepared + deadline: dict[int, int] = {} + + # For each measured node u, all its neighbors must be prepared before meas(u) + for u, meas_u in measure_time.items(): + for neighbor in graph.neighbors(u): + if neighbor in input_nodes: + continue # Input nodes don't need prep + if neighbor not in deadline: + deadline[neighbor] = meas_u - 1 + else: + deadline[neighbor] = min(deadline[neighbor], meas_u - 1) + + # For measured nodes, they must be prepared before their own measurement + for v, meas_v in measure_time.items(): + if v in input_nodes: + continue # Input nodes don't need prep + if v not in deadline: + deadline[v] = meas_v - 1 + else: + deadline[v] = 
min(deadline[v], meas_v - 1) + + # Handle nodes with no deadline yet (output nodes with no measured neighbors) + # These should be prepared at the latest possible time: max(measure_time) - 1 + # or 0 if there are no measurements + makespan = max(measure_time.values(), default=0) + for v in graph.physical_nodes - input_nodes: + if v not in deadline: + # No constraint from neighbors, prep as late as possible + deadline[v] = max(makespan - 1, 0) + + return deadline diff --git a/graphqomb/schedule_solver.py b/graphqomb/schedule_solver.py index ce478c93..0a20f47b 100644 --- a/graphqomb/schedule_solver.py +++ b/graphqomb/schedule_solver.py @@ -37,6 +37,7 @@ class ScheduleConfig: strategy: Strategy max_qubit_count: int | None = None max_time: int | None = None + use_greedy: bool = False @dataclass diff --git a/graphqomb/scheduler.py b/graphqomb/scheduler.py index 2e149857..8021ea3c 100644 --- a/graphqomb/scheduler.py +++ b/graphqomb/scheduler.py @@ -12,6 +12,7 @@ from typing import TYPE_CHECKING, NamedTuple from graphqomb.feedforward import dag_from_flow +from graphqomb.greedy_scheduler import greedy_minimize_space, greedy_minimize_time from graphqomb.schedule_solver import ScheduleConfig, Strategy, solve_schedule if TYPE_CHECKING: @@ -496,14 +497,15 @@ def solve_schedule( config: ScheduleConfig | None = None, timeout: int = 60, ) -> bool: - r"""Compute the schedule using the constraint programming solver. + r"""Compute the schedule using constraint programming or greedy heuristics. Parameters ---------- config : `ScheduleConfig` | `None`, optional - The scheduling configuration. If None, defaults to MINIMIZE_SPACE strategy. + The scheduling configuration. If None, defaults to MINIMIZE_TIME strategy. timeout : `int`, optional - Maximum solve time in seconds, by default 60 + Maximum solve time in seconds for CP-SAT solver, by default 60. + Ignored when use_greedy=True. 
Returns ------- @@ -518,7 +520,17 @@ def solve_schedule( if config is None: config = ScheduleConfig(Strategy.MINIMIZE_TIME) - result = solve_schedule(self.graph, self.dag, config, timeout) + result: tuple[dict[int, int], dict[int, int]] | None + if config.use_greedy: + # Use fast greedy heuristics + if config.strategy == Strategy.MINIMIZE_TIME: + result = greedy_minimize_time(self.graph, self.dag, max_qubit_count=config.max_qubit_count) + else: # Strategy.MINIMIZE_SPACE + result = greedy_minimize_space(self.graph, self.dag) + else: + # Use CP-SAT solver for optimal solution + result = solve_schedule(self.graph, self.dag, config, timeout) + if result is None: return False diff --git a/tests/test_feedforward.py b/tests/test_feedforward.py index b49beb64..6907e99a 100644 --- a/tests/test_feedforward.py +++ b/tests/test_feedforward.py @@ -6,14 +6,17 @@ from graphqomb.circuit import MBQCCircuit, circuit2graph from graphqomb.common import Axis, AxisMeasBasis, Plane, PlannerMeasBasis, Sign from graphqomb.feedforward import ( + TOPO_ORDER_CYCLE_ERROR_MSG, _is_flow, _is_gflow, check_dag, check_flow, dag_from_flow, + inverse_dag_from_dag, pauli_simplification, propagate_correction_map, signal_shifting, + topo_order_from_inv_dag, ) from graphqomb.graphstate import GraphState from graphqomb.qompiler import qompile @@ -96,6 +99,46 @@ def test_check_flow_true_for_acyclic() -> None: check_flow(graphstate, flow) +def test_topo_order_from_inv_dag_basic() -> None: + inv_dag = { + 0: set(), + 1: {0}, + 2: {1}, + } + assert topo_order_from_inv_dag(inv_dag) == [0, 1, 2] + + +def test_inverse_dag_from_dag_basic() -> None: + dag = { + 0: {1, 2}, + 1: {2}, + 2: set(), + } + assert inverse_dag_from_dag(dag) == { + 0: set(), + 1: {0}, + 2: {0, 1}, + } + + +def test_inverse_dag_from_dag_with_all_nodes() -> None: + dag = {0: {1}} + assert inverse_dag_from_dag(dag, all_nodes={0, 1, 2}) == { + 0: set(), + 1: {0}, + 2: set(), + } + + +def test_topo_order_from_inv_dag_cycle_raises() -> None: + 
inv_dag = { + 0: {1}, + 1: {0}, + } + with pytest.raises(RuntimeError, match=TOPO_ORDER_CYCLE_ERROR_MSG): + topo_order_from_inv_dag(inv_dag) + + # Tests for propagate_correction_map diff --git a/tests/test_greedy_scheduler.py b/tests/test_greedy_scheduler.py new file mode 100644 index 00000000..9b64e6f9 --- /dev/null +++ b/tests/test_greedy_scheduler.py @@ -0,0 +1,613 @@ +"""Test greedy scheduling algorithms.""" + +import pytest + +from graphqomb.graphstate import GraphState +from graphqomb.greedy_scheduler import ( + greedy_minimize_space, + greedy_minimize_time, +) +from graphqomb.schedule_solver import ScheduleConfig, Strategy +from graphqomb.scheduler import Scheduler + + +def test_greedy_minimize_time_simple() -> None: + """Test greedy_minimize_time on a simple graph.""" + # Create a simple 3-node chain graph + graph = GraphState() + node0 = graph.add_physical_node() + node1 = graph.add_physical_node() + node2 = graph.add_physical_node() + graph.add_physical_edge(node0, node1) + graph.add_physical_edge(node1, node2) + qindex = 0 + graph.register_input(node0, qindex) + graph.register_output(node2, qindex) + + flow = {node0: {node1}, node1: {node2}} + scheduler = Scheduler(graph, flow) + + # Run greedy scheduler + prepare_time, measure_time = greedy_minimize_time(graph, scheduler.dag) + + # Check that all non-input nodes have preparation times + assert node1 in prepare_time + assert node0 not in prepare_time # Input node should not be prepared + + # Check that all non-output nodes have measurement times + assert node0 in measure_time + assert node1 in measure_time + assert node2 not in measure_time # Output node should not be measured + + # Verify DAG constraints: node0 measured before node1 + assert measure_time[node0] < measure_time[node1] + + +def test_greedy_minimize_space_simple() -> None: + """Test greedy_minimize_space on a simple graph.""" + # Create a simple 3-node chain graph + graph = GraphState() + node0 = graph.add_physical_node() + node1 = 
graph.add_physical_node() + node2 = graph.add_physical_node() + graph.add_physical_edge(node0, node1) + graph.add_physical_edge(node1, node2) + qindex = 0 + graph.register_input(node0, qindex) + graph.register_output(node2, qindex) + + flow = {node0: {node1}, node1: {node2}} + scheduler = Scheduler(graph, flow) + + # Run greedy scheduler + prepare_time, measure_time = greedy_minimize_space(graph, scheduler.dag) + + # Check that all non-input nodes have preparation times + assert node1 in prepare_time + assert node0 not in prepare_time # Input node should not be prepared + + # Check that all non-output nodes have measurement times + assert node0 in measure_time + assert node1 in measure_time + assert node2 not in measure_time # Output node should not be measured + + # Verify DAG constraints + assert measure_time[node0] < measure_time[node1] + + +def _compute_max_alive_qubits( + graph: GraphState, + prepare_time: dict[int, int], + measure_time: dict[int, int], +) -> int: + """Compute the maximum number of alive qubits over time. + + A node is considered alive at time t if: + - It is an input node and t >= -1 and t < measurement time (if any), or + - It has a preparation time p and t >= p and t < measurement time (if any). + + Returns + ------- + int + The maximum number of alive qubits at any time step. 
+ """ + # Determine time range to check + max_t = max(set(prepare_time.values()) | set(measure_time.values()), default=0) + + max_alive = len(graph.input_node_indices) # At least inputs are alive at t = -1 + for t in range(max_t + 1): + alive_nodes: set[int] = set() + for node in graph.physical_nodes: + # Determine preparation time + prep_t = -1 if node in graph.input_node_indices else prepare_time.get(node) + + if prep_t is None or t < prep_t: + continue + + # Determine measurement time (None for outputs or unscheduled) + meas_t = measure_time.get(node) + + if meas_t is None or t < meas_t: + alive_nodes.add(node) + + max_alive = max(max_alive, len(alive_nodes)) + + return max_alive + + +def test_greedy_minimize_time_with_max_qubit_count_respects_limit() -> None: + """Verify that greedy_minimize_time respects max_qubit_count.""" + graph = GraphState() + # chain graph: 0-1-2-3 + n0 = graph.add_physical_node() + n1 = graph.add_physical_node() + n2 = graph.add_physical_node() + n3 = graph.add_physical_node() + graph.add_physical_edge(n0, n1) + graph.add_physical_edge(n1, n2) + graph.add_physical_edge(n2, n3) + + qindex = 0 + graph.register_input(n0, qindex) + graph.register_output(n3, qindex) + + flow = {n0: {n1}, n1: {n2}, n2: {n3}} + scheduler = Scheduler(graph, flow) + + # Set max_qubit_count to 2 (a feasible value for this graph) + prepare_time, measure_time = greedy_minimize_time(graph, scheduler.dag, max_qubit_count=2) + + # Check basic properties + assert n1 in prepare_time + assert n0 not in prepare_time + assert n0 in measure_time + assert n2 in measure_time + assert n3 not in measure_time + + # Verify that the number of alive qubits never exceeds the limit + max_alive = _compute_max_alive_qubits(graph, prepare_time, measure_time) + assert max_alive <= 2 + + +def test_greedy_minimize_time_with_too_small_max_qubit_count_raises() -> None: + """Verify that greedy_minimize_time raises RuntimeError when max_qubit_count is too small.""" + graph = GraphState() + # 
chain graph: 0-1-2 (at least 2 qubits are needed) + n0 = graph.add_physical_node() + n1 = graph.add_physical_node() + n2 = graph.add_physical_node() + graph.add_physical_edge(n0, n1) + graph.add_physical_edge(n1, n2) + + qindex = 0 + graph.register_input(n0, qindex) + graph.register_output(n2, qindex) + + flow = {n0: {n1}, n1: {n2}} + scheduler = Scheduler(graph, flow) + + # max_qubit_count=1 is not feasible, so expect RuntimeError + with pytest.raises(RuntimeError, match="max_qubit_count"): + greedy_minimize_time(graph, scheduler.dag, max_qubit_count=1) + + +def test_greedy_scheduler_via_solve_schedule() -> None: + """Test greedy scheduler through Scheduler.solve_schedule with use_greedy=True.""" + # Create a simple graph + graph = GraphState() + node0 = graph.add_physical_node() + node1 = graph.add_physical_node() + node2 = graph.add_physical_node() + graph.add_physical_edge(node0, node1) + graph.add_physical_edge(node1, node2) + qindex = 0 + graph.register_input(node0, qindex) + graph.register_output(node2, qindex) + + flow = {node0: {node1}, node1: {node2}} + scheduler = Scheduler(graph, flow) + + # Test with greedy MINIMIZE_TIME + config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=True) + success = scheduler.solve_schedule(config) + assert success + + # Verify schedule is valid + scheduler.validate_schedule() + + # Test with greedy MINIMIZE_SPACE + scheduler2 = Scheduler(graph, flow) + config = ScheduleConfig(strategy=Strategy.MINIMIZE_SPACE, use_greedy=True) + success = scheduler2.solve_schedule(config) + assert success + + # Verify schedule is valid + scheduler2.validate_schedule() + + +def test_greedy_vs_cpsat_correctness() -> None: + """Test that greedy scheduler produces valid schedules compared to CP-SAT.""" + # Create a slightly larger graph + graph = GraphState() + nodes = [graph.add_physical_node() for _ in range(5)] + + # Create a chain + for i in range(4): + graph.add_physical_edge(nodes[i], nodes[i + 1]) + + qindex = 0 + 
graph.register_input(nodes[0], qindex) + graph.register_output(nodes[4], qindex) + + flow = {nodes[i]: {nodes[i + 1]} for i in range(4)} + + # Test greedy scheduler + scheduler_greedy = Scheduler(graph, flow) + config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=True) + success_greedy = scheduler_greedy.solve_schedule(config) + assert success_greedy + + # Verify greedy schedule is valid + scheduler_greedy.validate_schedule() + + # Test CP-SAT scheduler + scheduler_cpsat = Scheduler(graph, flow) + config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=False) + success_cpsat = scheduler_cpsat.solve_schedule(config, timeout=10) + assert success_cpsat + + # Verify CP-SAT schedule is valid + scheduler_cpsat.validate_schedule() + + # Both should produce valid schedules + # Note: Greedy may not be optimal, so we don't compare quality here + + +def test_greedy_scheduler_larger_graph() -> None: + """Test greedy scheduler on a larger graph to ensure scalability.""" + # Create a larger graph with branching structure + graph = GraphState() + num_layers = 4 + nodes_per_layer = 3 + + # Build layered graph + all_nodes: list[list[int]] = [] + for layer in range(num_layers): + layer_nodes = [graph.add_physical_node() for _ in range(nodes_per_layer)] + all_nodes.append(layer_nodes) + + # Connect to previous layer (if not first layer) + if layer > 0: + for i, node in enumerate(layer_nodes): + # Connect to corresponding node in previous layer + prev_node = all_nodes[layer - 1][i] + graph.add_physical_edge(prev_node, node) + + # Register inputs (first layer) and outputs (last layer) + for i, node in enumerate(all_nodes[0]): + graph.register_input(node, i) + for i, node in enumerate(all_nodes[-1]): + graph.register_output(node, i) + + # Build flow (simple forward flow) + flow: dict[int, set[int]] = {} + for layer in range(num_layers - 1): + for i, node in enumerate(all_nodes[layer]): + if node not in graph.output_node_indices: + flow[node] = {all_nodes[layer 
+ 1][i]}
+
+    # Test greedy scheduler
+    scheduler = Scheduler(graph, flow)
+    config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=True)
+    success = scheduler.solve_schedule(config)
+    assert success
+
+    # Validate the schedule
+    scheduler.validate_schedule()
+
+    # Check that we got reasonable results
+    assert scheduler.num_slices() > 0
+    assert scheduler.num_slices() <= num_layers * 2  # Reasonable upper bound
+
+
+@pytest.mark.parametrize("strategy", [Strategy.MINIMIZE_TIME, Strategy.MINIMIZE_SPACE])
+def test_greedy_scheduler_both_strategies(strategy: Strategy) -> None:
+    """Test greedy scheduler with both optimization strategies."""
+    # Create a graph
+    graph = GraphState()
+    node0 = graph.add_physical_node()
+    node1 = graph.add_physical_node()
+    node2 = graph.add_physical_node()
+    node3 = graph.add_physical_node()
+    graph.add_physical_edge(node0, node1)
+    graph.add_physical_edge(node1, node2)
+    graph.add_physical_edge(node2, node3)
+    qindex = 0
+    graph.register_input(node0, qindex)
+    graph.register_output(node3, qindex)
+
+    flow = {node0: {node1}, node1: {node2}, node2: {node3}}
+    scheduler = Scheduler(graph, flow)
+
+    # Test with specified strategy
+    config = ScheduleConfig(strategy=strategy, use_greedy=True)
+    success = scheduler.solve_schedule(config)
+    assert success
+
+    # Validate schedule
+    scheduler.validate_schedule()
+
+
+def test_greedy_minimize_space_wrapper() -> None:
+    """Test the greedy_minimize_time and greedy_minimize_space wrapper functions."""
+    # Create a simple graph
+    graph = GraphState()
+    node0 = graph.add_physical_node()
+    node1 = graph.add_physical_node()
+    node2 = graph.add_physical_node()
+    graph.add_physical_edge(node0, node1)
+    graph.add_physical_edge(node1, node2)
+    qindex = 0
+    graph.register_input(node0, qindex)
+    graph.register_output(node2, qindex)
+
+    flow = {node0: {node1}, node1: {node2}}
+    scheduler = Scheduler(graph, flow)
+
+    # Test MINIMIZE_TIME
+    result = greedy_minimize_time(graph, scheduler.dag)
+    assert result is not None
+    
prepare_time, measure_time = result
+    assert len(prepare_time) > 0
+    assert len(measure_time) > 0
+
+    # Test MINIMIZE_SPACE
+    result = greedy_minimize_space(graph, scheduler.dag)
+    assert result is not None
+    prepare_time, measure_time = result
+    assert len(prepare_time) > 0
+    assert len(measure_time) > 0
+
+
+def test_greedy_scheduler_dag_constraints() -> None:
+    """Test that greedy scheduler respects DAG constraints."""
+    # Create a graph with more complex dependencies
+    graph = GraphState()
+    nodes = [graph.add_physical_node() for _ in range(6)]
+
+    # Create edges forming a DAG structure
+    # 0 -> 2 -> 4
+    #      |
+    # 1 -> 3 -> 5
+    graph.add_physical_edge(nodes[0], nodes[2])
+    graph.add_physical_edge(nodes[2], nodes[4])
+    graph.add_physical_edge(nodes[1], nodes[3])
+    graph.add_physical_edge(nodes[3], nodes[5])
+    graph.add_physical_edge(nodes[2], nodes[3])
+
+    graph.register_input(nodes[0], 0)
+    graph.register_input(nodes[1], 1)
+    graph.register_output(nodes[4], 0)
+    graph.register_output(nodes[5], 1)
+
+    # Create flow with dependencies
+    flow = {
+        nodes[0]: {nodes[2]},
+        nodes[1]: {nodes[3]},
+        nodes[2]: {nodes[4]},
+        nodes[3]: {nodes[5], nodes[1]},  # cyclic dependency to test DAG constraint handling
+    }
+
+    scheduler = Scheduler(graph, flow)
+    config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=True)
+
+    # Note: This flow creates a cyclic DAG (nodes 1 and 3 have a circular dependency)
+    # The greedy scheduler should raise RuntimeError for invalid flows
+    with pytest.raises(RuntimeError, match="No nodes can be measured"):
+        scheduler.solve_schedule(config)
+
+
+def test_greedy_scheduler_edge_constraints() -> None:
+    """Test that greedy scheduler respects edge constraints (neighbor preparation)."""
+    # Create a simple graph
+    graph = GraphState()
+    node0 = graph.add_physical_node()
+    node1 = graph.add_physical_node()
+    node2 = graph.add_physical_node()
+    graph.add_physical_edge(node0, node1)
+    graph.add_physical_edge(node1, node2)
+    qindex = 0
+    
graph.register_input(node0, qindex) + graph.register_output(node2, qindex) + + flow = {node0: {node1}, node1: {node2}} + scheduler = Scheduler(graph, flow) + config = ScheduleConfig(strategy=Strategy.MINIMIZE_TIME, use_greedy=True) + success = scheduler.solve_schedule(config) + assert success + + # Validate edge constraints via validate_schedule + scheduler.validate_schedule() + + # Manually check: neighbors must be prepared before measurement + # node0 (input) is prepared at time -1, node1 prepared at some time + # node0 must be measured after node1 is prepared + # This is ensured by the auto-scheduled entanglement times + + # Check that entanglement times were auto-scheduled correctly + edge01 = (node0, node1) + edge12 = (node1, node2) + entangle01 = scheduler.entangle_time[edge01] + entangle12 = scheduler.entangle_time[edge12] + assert entangle01 is not None + assert entangle12 is not None + + # Entanglement must happen before measurement + meas0 = scheduler.measure_time[node0] + meas1 = scheduler.measure_time[node1] + assert meas0 is not None + assert meas1 is not None + assert entangle01 < meas0 + assert entangle12 < meas1 + + +def test_greedy_minimize_time_3x3_grid_optimal() -> None: + """Test that greedy_minimize_time achieves optimal depth on 3x3 grid. + + This is a regression test for the optimization that measures in ASAP order + based on DAG dependencies. With ALAP preparation, nodes are prepared as + late as possible, but depth should still be optimal. + Previously, the greedy algorithm produced depth=4 instead of optimal depth=3. 
+ """ + # Create 3x3 grid graph + # Layout: + # 0 - 3 - 6 + # | | | + # 1 - 4 - 7 + # | | | + # 2 - 5 - 8 + # Inputs: 0, 1, 2 (left column) + # Outputs: 6, 7, 8 (right column) + graph = GraphState() + nodes = [graph.add_physical_node() for _ in range(9)] + + # Horizontal edges + for row in range(3): + for col in range(2): + graph.add_physical_edge(nodes[row + col * 3], nodes[row + (col + 1) * 3]) + + # Vertical edges + for row in range(2): + for col in range(3): + graph.add_physical_edge(nodes[row + col * 3], nodes[row + 1 + col * 3]) + + # Register inputs (left column) and outputs (right column) + for row in range(3): + graph.register_input(nodes[row], row) + graph.register_output(nodes[row + 6], row) + + # Flow: left to right + flow: dict[int, set[int]] = {} + for row in range(3): + flow[nodes[row]] = {nodes[row + 3]} # 0->3, 1->4, 2->5 + flow[nodes[row + 3]] = {nodes[row + 6]} # 3->6, 4->7, 5->8 + + scheduler = Scheduler(graph, flow) + + # Test greedy scheduler (no qubit limit) + prepare_time, measure_time = greedy_minimize_time(graph, scheduler.dag) + + # With ALAP, nodes are prepared as late as possible, not at time=0 + # Check that all non-input nodes have a prepare_time + for node in [3, 4, 5, 6, 7, 8]: + assert node in prepare_time, f"Node {node} should have a prepare_time" + + # Calculate depth + greedy_depth = max(measure_time.values()) + 1 + + # The optimal depth for a 3x3 grid is 3 (same as CP-SAT) + assert greedy_depth == 3, f"Expected depth=3, got depth={greedy_depth}" + + +def test_greedy_minimize_time_alap_preparation() -> None: + """Test that greedy_minimize_time uses ALAP preparation to minimize active volume.""" + graph = GraphState() + # Create a 4-node chain: 0-1-2-3 + n0 = graph.add_physical_node() + n1 = graph.add_physical_node() + n2 = graph.add_physical_node() + n3 = graph.add_physical_node() + graph.add_physical_edge(n0, n1) + graph.add_physical_edge(n1, n2) + graph.add_physical_edge(n2, n3) + + graph.register_input(n0, 0) + 
graph.register_output(n3, 0) + + flow = {n0: {n1}, n1: {n2}, n2: {n3}} + scheduler = Scheduler(graph, flow) + + prepare_time, measure_time = greedy_minimize_time(graph, scheduler.dag) + + # With ALAP, nodes should be prepared as late as possible + # n1 is neighbor of n0, so prep(n1) < meas(n0) + assert prepare_time[n1] == measure_time[n0] - 1 + # n2 is neighbor of n1, so prep(n2) < meas(n1) + assert prepare_time[n2] == measure_time[n1] - 1 + # n3 (output) is neighbor of n2, so prep(n3) < meas(n2) + assert prepare_time[n3] == measure_time[n2] - 1 + + # Input node should not have prepare_time + assert n0 not in prepare_time + + +def test_alap_reduces_active_volume() -> None: + """Test that ALAP preparation reduces active volume compared to ASAP.""" + graph = GraphState() + # Create a chain graph: 0-1-2-3 + n0 = graph.add_physical_node() + n1 = graph.add_physical_node() + n2 = graph.add_physical_node() + n3 = graph.add_physical_node() + graph.add_physical_edge(n0, n1) + graph.add_physical_edge(n1, n2) + graph.add_physical_edge(n2, n3) + graph.register_input(n0, 0) + graph.register_output(n3, 0) + + flow = {n0: {n1}, n1: {n2}, n2: {n3}} + scheduler = Scheduler(graph, flow) + + prepare_time, measure_time = greedy_minimize_time(graph, scheduler.dag) + + # With ALAP: n3 (output) should be prepared as late as possible + # n3 is neighbor of n2, so prep(n3) < meas(n2) + # This should be later than time=0 + assert prepare_time[n3] == measure_time[n2] - 1 + assert prepare_time[n3] > 0 # ALAP should delay preparation + + +def test_alap_preserves_depth() -> None: + """Test that ALAP does not increase depth.""" + # Create a 3x3 grid + graph = GraphState() + nodes = [graph.add_physical_node() for _ in range(9)] + + # Horizontal and vertical edges + for row in range(3): + for col in range(2): + graph.add_physical_edge(nodes[row + col * 3], nodes[row + (col + 1) * 3]) + for row in range(2): + for col in range(3): + graph.add_physical_edge(nodes[row + col * 3], nodes[row + 1 + col * 
3]) + + for row in range(3): + graph.register_input(nodes[row], row) + graph.register_output(nodes[row + 6], row) + + flow: dict[int, set[int]] = {nodes[row]: {nodes[row + 3]} for row in range(3)} + flow.update({nodes[row + 3]: {nodes[row + 6]} for row in range(3)}) + + scheduler = Scheduler(graph, flow) + _, measure_time = greedy_minimize_time(graph, scheduler.dag) + + # Depth should still be optimal (3) + assert max(measure_time.values()) + 1 == 3 + + +def test_greedy_minimize_space_non_input_dag_root() -> None: + """Test greedy_minimize_space handles non-input nodes that are DAG roots. + + This is a regression test for a bug where non-input nodes with no DAG + dependencies and all input neighbors would not be prepared before measurement, + causing a KeyError when removing from the alive set. + """ + graph = GraphState() + n0 = graph.add_physical_node() # input + n1 = graph.add_physical_node() # non-input, DAG root (no feedforward dependency) + n2 = graph.add_physical_node() # input + n3 = graph.add_physical_node() # output + + graph.add_physical_edge(n0, n1) + graph.add_physical_edge(n1, n2) + graph.add_physical_edge(n2, n3) + + graph.register_input(n0, 0) + graph.register_input(n2, 1) + graph.register_output(n3, 0) + + # Empty DAG: no feedforward dependencies + dag: dict[int, set[int]] = {} + + # This should not raise KeyError + prepare_time, measure_time = greedy_minimize_space(graph, dag) + + # Verify that n1 (non-input) was prepared + assert n1 in prepare_time + # Verify that n1 was measured + assert n1 in measure_time + # Verify prepare happens before measure + assert prepare_time[n1] < measure_time[n1] + # Input nodes should not be in prepare_time + assert n0 not in prepare_time + assert n2 not in prepare_time